|
18 | 18 | ``logs/auditor/<audit_type>__<timestamp>.audit``. |
19 | 19 | """ |
20 | 20 |
|
| 21 | +import uuid |
21 | 22 | import json |
22 | 23 | import gnupg |
23 | 24 |
|
|
32 | 33 | DEFAULT_AUDITOR_OUT_DIR = Path("logs/auditor") |
33 | 34 | DEFAULT_AUDITOR_OUT_DIR.mkdir(parents=True, exist_ok=True) |
34 | 35 |
|
35 | | -# Default gpg auditor public key |
36 | | -AUDITOR_PUB_KEY_DIR = Path("resources/keys") |
37 | | -AUDITOR_PUB_KEY_FILE = AUDITOR_PUB_KEY_DIR / Path("llm-router-auditor-pub.asc") |
38 | | - |
39 | 36 |
|
40 | 37 | class GPGAuditorLogStorage(AuditorLogStorageInterface): |
| 38 | + # Default gpg auditor public key |
| 39 | + AUDITOR_PUB_KEY_DIR = Path("resources/keys") |
| 40 | + AUDITOR_PUB_KEY_FILE = AUDITOR_PUB_KEY_DIR / Path("llm-router-auditor-pub.asc") |
| 41 | + |
41 | 42 | """ |
42 | 43 | GPG‑backed storage for audit logs. |
43 | 44 |
|
@@ -68,14 +69,16 @@ def __init__(self): |
68 | 69 | """ |
69 | 70 |
|
70 | 71 | self._import_result = None |
71 | | - self._gpg = gnupg.GPG(gnupghome=str(AUDITOR_PUB_KEY_DIR)) |
| 72 | + self._gpg = gnupg.GPG(gnupghome=str(self.AUDITOR_PUB_KEY_DIR)) |
72 | 73 | self._gpg.encoding = "utf-8" |
73 | 74 |
|
74 | | - with AUDITOR_PUB_KEY_FILE.open("r", encoding="utf-8") as f: |
| 75 | + self.__verify() |
| 76 | + |
| 77 | + with self.AUDITOR_PUB_KEY_FILE.open("r", encoding="utf-8") as f: |
75 | 78 | self._import_result = self._gpg.import_keys(f.read()) |
76 | 79 | if not self._import_result.count: |
77 | 80 | raise RuntimeError( |
78 | | - f"Failed to import public key from {AUDITOR_PUB_KEY_FILE}" |
| 81 | + f"Failed to import public key from {self.AUDITOR_PUB_KEY_FILE}" |
79 | 82 | ) |
80 | 83 |
|
81 | 84 | def store_log(self, audit_log, audit_type: str): |
@@ -114,3 +117,46 @@ def store_log(self, audit_log, audit_type: str): |
114 | 117 | armor=True, |
115 | 118 | ) |
116 | 119 | f.write(str(encrypted_data)) |
| 120 | + |
| 121 | + def __verify(self): |
| 122 | + """ |
| 123 | + Perform internal sanity checks during initialisation. |
| 124 | +
|
| 125 | + This method verifies that the required GPG public key file exists |
| 126 | + and that the process has write access to ``DEFAULT_AUDITOR_OUT_DIR``. |
| 127 | + It raises an exception if either check fails. |
| 128 | + """ |
| 129 | + self.__check_key_exists() |
| 130 | + self.__check_write_permission() |
| 131 | + |
| 132 | + def __check_key_exists(self): |
| 133 | + """ |
| 134 | + Ensure that the GPG public key file specified by |
| 135 | + ``AUDITOR_PUB_KEY_FILE`` is present on the filesystem. |
| 136 | +
|
| 137 | + Raises |
| 138 | + ------ |
| 139 | + Exception |
| 140 | + If the public key file does not exist. |
| 141 | + """ |
| 142 | + if not self.AUDITOR_PUB_KEY_FILE.exists(): |
| 143 | + raise Exception( |
| 144 | + f"GPG public key {self.AUDITOR_PUB_KEY_FILE} does not exists!" |
| 145 | + ) |
| 146 | + |
| 147 | + def __check_write_permission(self): |
| 148 | + """ |
| 149 | + Verify that the process can write to ``DEFAULT_AUDITOR_OUT_DIR``. |
| 150 | + Writes a temporary file containing the string ``"llm-router"``, |
| 151 | + then removes it. Raises ``PermissionError`` if any step fails. |
| 152 | + """ |
| 153 | + try: |
| 154 | + temp_name = f"perm_check_{uuid.uuid4().hex}.audit" |
| 155 | + temp_path = DEFAULT_AUDITOR_OUT_DIR / temp_name |
| 156 | + with open(temp_path, "wt") as f: |
| 157 | + f.write("llm-router") |
| 158 | + temp_path.unlink() |
| 159 | + except Exception as exc: |
| 160 | + raise PermissionError( |
| 161 | + f"Unable to write to {DEFAULT_AUDITOR_OUT_DIR}: {exc}" |
| 162 | + ) |
0 commit comments