-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaudit.py
More file actions
102 lines (77 loc) · 2.91 KB
/
audit.py
File metadata and controls
102 lines (77 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# audit.py (FINAL - IMPROVED FOR KMS SYNC)
from datetime import datetime, timezone
from config import NODE_ID
class AuditLogger:
def __init__(self):
self.node_id = NODE_ID
# -------------------------------
# CORE LOG
# -------------------------------
def log(self, event_type, message, plane="LOCAL"):
timestamp = datetime.now(timezone.utc).isoformat()
print(
f"[{timestamp}] "
f"[NODE={self.node_id}] "
f"[PLANE={plane}] "
f"[{event_type}] "
f"{message}"
)
# -------------------------------
# SYSTEM
# -------------------------------
def system_start(self):
self.log("SYSTEM_START", "Node started", "SYSTEM")
def system_stop(self):
self.log("SYSTEM_STOP", "Node stopped", "SYSTEM")
# -------------------------------
# KEY EVENTS
# -------------------------------
def key_added(self, key_id, origin="LOCAL"):
self.log("KEY_ADDED", f"id={key_id} from {origin}")
def key_served(self, key_id):
self.log("KEY_SERVED", f"id={key_id}", "APP")
def key_used(self, key_id):
self.log("KEY_USED", f"id={key_id}", "APP")
# -------------------------------
# SYNC EVENTS (VERY IMPORTANT)
# -------------------------------
def sync_send(self, key_id, target):
self.log("SYNC_SEND", f"id={key_id} → {target}", "SYNC")
def sync_receive(self, key_id, source):
self.log("SYNC_RECEIVE", f"id={key_id} ← {source}", "SYNC")
def sync_success(self, key_id):
self.log("SYNC_OK", f"id={key_id} matched", "SYNC")
def sync_fail(self, key_id):
self.log("SYNC_FAIL", f"id={key_id} mismatch", "SYNC")
def sync_compare(self, key_id, local_hash, remote_hash):
self.log(
"SYNC_COMPARE",
f"id={key_id} local={local_hash[:6]} remote={remote_hash[:6]}",
"SYNC"
)
def sync_progress(self, key_id):
self.log("SYNC_PROGRESS", f"id={key_id}", "SYNC")
# -------------------------------
# INTER-KMS
# -------------------------------
def key_sent(self, key_id, remote):
self.log("KEY_SENT", f"id={key_id} → {remote}", "INTER-KMS")
def key_received(self, key_id, remote):
self.log("KEY_RECEIVED", f"id={key_id} ← {remote}", "INTER-KMS")
# -------------------------------
# CRYPTO
# -------------------------------
def encrypt(self, key_id, size):
self.log("ENCRYPT", f"id={key_id} bytes={size}", "APP")
def decrypt(self, key_id, size):
self.log("DECRYPT", f"id={key_id} bytes={size}", "APP")
# -------------------------------
# API
# -------------------------------
def api(self, endpoint):
self.log("API", endpoint, "API")
# -------------------------------
# ERROR
# -------------------------------
def error(self, msg):
self.log("ERROR", msg, "ERROR")