1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
|
#!/usr/bin/env python3
import os
import re
import sqlite3
import subprocess
import sys
import uuid
from pathlib import Path
from typing import Callable, Optional, Protocol
from email_validator import EmailNotValidError, validate_email
# fixed challenge text used for passphrase verification.
# we store the encrypted challenge output instead of storing passwords.
AUTH_CHALLENGE = "SHIM_AUTH_VALID"
# handle-style usernames: 2-64 characters drawn from a-z, 0-9, "_", "." and "-".
HANDLE_RE = re.compile(r"^[a-z0-9_.-]{2,64}$")
# lowercase hyphenated uuid text whose version digit is 1-5 and whose variant
# digit is 8, 9, a or b.
UUID_RE = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
)
# zero-argument callable that returns a sqlite3 connection.
ConnectFn = Callable[[], sqlite3.Connection]
class AuthBackend(Protocol):
    """structural interface an auth backend has to provide

    any object exposing these five methods with matching signatures can be
    used as a backend. the mutating methods report their outcome as an
    (ok, message) pair instead of raising.
    """

    def bootstrap_required(self) -> bool:
        """return True while the backend still needs its first user"""

    def create_user(self, username: str, password: str, role: str = "user") -> tuple[bool, str]:
        """create a user and return (ok, message)"""

    def authenticate(self, username: str, password: str) -> Optional[sqlite3.Row]:
        """return the matching user row for valid credentials, otherwise None"""

    def update_username(self, user_uuid: str, new_username: str) -> tuple[bool, str]:
        """rename the user identified by user_uuid and return (ok, message)"""

    def update_password(self, user_uuid: str, new_password: str) -> tuple[bool, str]:
        """replace the password of the user identified by user_uuid and return (ok, message)"""
def normalize_username(username: str) -> str:
    """return username with surrounding whitespace removed and letters lowercased

    a missing value (None or "") normalizes to "", the same way normalize_uuid
    treats missing input, so username validation can reject it with a regular
    error message instead of an AttributeError being raised here.
    """
    return (username or "").strip().lower()
def normalize_uuid(value: str) -> Optional[str]:
    """return value trimmed and lowercased when it matches UUID_RE, otherwise None

    a falsy value (None or "") is treated as an empty string and therefore
    yields None.
    """
    cleaned = (value or "").strip().lower()
    return cleaned if UUID_RE.fullmatch(cleaned) else None
def is_valid_email_username(value: str) -> bool:
    """return True when value is an email address usable as a stored username

    values longer than 254 characters are rejected before any parsing. the
    remaining candidates go through validate_email with deliverability
    checks, smtputf8 and display names all switched off.
    """
    if len(value) > 254:
        return False

    options = {
        "check_deliverability": False,
        "allow_smtputf8": False,
        "allow_display_name": False,
    }
    try:
        result = validate_email(value, **options)
    except EmailNotValidError:
        return False

    # keep stored usernames predictable - only accept already normalized forms.
    is_already_normalized = result.normalized == value
    return is_already_normalized
def looks_like_python_script(path: Path) -> bool:
    """return True when the file at path starts with a shebang line mentioning python

    at most the first 200 characters of the first line are inspected,
    case-insensitively. a file that cannot be opened or read counts as
    "not a python script".
    """
    try:
        with open(path, "r", encoding="utf-8", errors="ignore") as handle:
            shebang = handle.readline(200).lower()
    except OSError:
        return False

    if not shebang.startswith("#!"):
        return False
    return "python" in shebang
class LocalMojicryptAuthBackend:
    """default auth backend using sqlite and local mojicrypt challenge encryption

    passwords are never stored. the fixed AUTH_CHALLENGE text is encrypted
    with the password as passphrase and the resulting blob is saved; a login
    succeeds when decrypting that blob with the supplied password yields
    AUTH_CHALLENGE again.

    NOTE(review): the database methods use ``with self.connect_db() as conn``.
    for sqlite3 connections that block only commits or rolls back the
    transaction, it does not close the connection - confirm that connect_db
    hands out connections which are closed or reused elsewhere.
    """

    # upper bound, in seconds, for a single mojicrypt invocation.
    MOJICRYPT_TIMEOUT_SECONDS = 90

    def __init__(self, connect_db: ConnectFn, mojicrypt_bin: Path):
        """remember the connection factory and the path of the mojicrypt executable"""
        self.connect_db = connect_db
        self.mojicrypt_bin = mojicrypt_bin
        # description of the most recent mojicrypt failure; empty after a success.
        self.last_error = ""

    def bootstrap_required(self) -> bool:
        """return True when the users table holds no rows"""
        with self.connect_db() as conn:
            # the row is read by column name, so connect_db is assumed to set a
            # name-addressable row factory such as sqlite3.Row - TODO confirm.
            row = conn.execute("SELECT COUNT(*) AS count FROM users").fetchone()
            return int(row["count"]) == 0

    def create_user(self, username: str, password: str, role: str = "user") -> tuple[bool, str]:
        """validate the input, encrypt the challenge and insert a new user row

        returns (True, "user created") on success, otherwise (False, reason).
        the username is normalized before validation and storage.
        """
        username = normalize_username(username)
        username_error = self._validate_username(username)
        if username_error:
            return False, username_error
        password_error = self._validate_password(password)
        if password_error:
            return False, password_error
        if role not in {"admin", "user"}:
            return False, "invalid role"

        # store an encrypted challenge, never the raw password.
        encrypted = self.encrypt_password(password)
        if not encrypted:
            return False, "mojicrypt encryption failed"

        try:
            with self.connect_db() as conn:
                conn.execute(
                    """
                    INSERT INTO users (user_uuid, username, role, encrypted_challenge, created_at)
                    VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
                    """,
                    (str(uuid.uuid4()), username, role, encrypted),
                )
            return True, "user created"
        except sqlite3.IntegrityError:
            # presumably raised by a unique constraint on username - verify
            # against the table definition.
            return False, "username already exists"

    def update_username(self, user_uuid: str, new_username: str) -> tuple[bool, str]:
        """rename the user identified by user_uuid

        returns (True, "username updated") on success, otherwise (False, reason)
        for an invalid id, an invalid or taken username, or an unknown user.
        """
        normalized_user_uuid = normalize_uuid(user_uuid)
        if normalized_user_uuid is None:
            return False, "invalid user id"
        normalized = normalize_username(new_username)
        username_error = self._validate_username(normalized)
        if username_error:
            return False, username_error

        try:
            with self.connect_db() as conn:
                cursor = conn.execute(
                    "UPDATE users SET username = ? WHERE user_uuid = ?",
                    (normalized, normalized_user_uuid),
                )
                if cursor.rowcount == 0:
                    return False, "user not found"
        except sqlite3.IntegrityError:
            return False, "username already exists"
        return True, "username updated"

    def update_password(self, user_uuid: str, new_password: str) -> tuple[bool, str]:
        """replace the stored challenge blob of the user identified by user_uuid

        returns (True, "password updated") on success, otherwise (False, reason).
        """
        normalized_user_uuid = normalize_uuid(user_uuid)
        if normalized_user_uuid is None:
            return False, "invalid user id"
        password_error = self._validate_password(new_password)
        if password_error:
            return False, password_error

        # this rotates the encrypted challenge blob for future logins.
        encrypted = self.encrypt_password(new_password)
        if not encrypted:
            return False, "mojicrypt encryption failed"

        with self.connect_db() as conn:
            cursor = conn.execute(
                "UPDATE users SET encrypted_challenge = ? WHERE user_uuid = ?",
                (encrypted, normalized_user_uuid),
            )
            if cursor.rowcount == 0:
                return False, "user not found"
        return True, "password updated"

    def authenticate(self, username: str, password: str) -> Optional[sqlite3.Row]:
        """return the user row when password decrypts the stored challenge, else None

        the username is normalized before the lookup. the returned row carries
        the id, username, role and encrypted_challenge columns.
        """
        username = normalize_username(username)
        with self.connect_db() as conn:
            user = conn.execute(
                """
                SELECT id, username, role, encrypted_challenge
                FROM users
                WHERE username = ?
                """,
                (username,),
            ).fetchone()
        if not user:
            return None
        if not self.verify_password(user["encrypted_challenge"], password):
            return None
        return user

    def encrypt_password(self, password: str) -> Optional[str]:
        """encrypt AUTH_CHALLENGE with password as passphrase; None when mojicrypt fails"""
        return self._run_mojicrypt("encrypt", AUTH_CHALLENGE, password)

    def _validate_username(self, username: str) -> Optional[str]:
        """return an error message for an unacceptable username, or None when it is fine

        accepted are ascii-only values of at least 2 characters that either
        match HANDLE_RE or pass is_valid_email_username.
        """
        if len(username) < 2:
            return "username must be at least 2 characters"
        if not username.isascii():
            return "username must use ascii characters only"
        if HANDLE_RE.fullmatch(username):
            return None
        if is_valid_email_username(username):
            return None
        return (
            "username must be a handle [a-z0-9_.-] (2-64 chars) or a valid email"
        )

    def _validate_password(self, password: str) -> Optional[str]:
        """return an error message when password is shorter than 2 characters, else None"""
        if len(password) < 2:
            return "password must be at least 2 characters"
        return None

    def verify_password(self, encrypted_blob: str, password: str) -> bool:
        """return True when decrypting encrypted_blob with password yields AUTH_CHALLENGE"""
        decrypted = self._run_mojicrypt("decrypt", encrypted_blob, password)
        return decrypted == AUTH_CHALLENGE

    def _mojicrypt_commands(self, command: str, payload: str, passphrase: str) -> list[list[str]]:
        """build the argv lists to attempt, in order; empty when nothing can be run"""
        # NOTE(review): the passphrase travels as a command-line argument, which
        # other local processes may be able to observe - consider a stdin or
        # environment based option if mojicrypt offers one.
        mojicrypt_args = [str(self.mojicrypt_bin), "-p", passphrase, command, payload]
        commands = []
        # try direct execution first, then python fallback when needed.
        if os.access(self.mojicrypt_bin, os.X_OK):
            commands.append(mojicrypt_args)
        # some systems do not expose a plain `python` binary for shebang execution.
        # retrying with the active interpreter avoids false failures for vendored scripts.
        if sys.executable and looks_like_python_script(self.mojicrypt_bin):
            commands.append([sys.executable, *mojicrypt_args])
        return commands

    def _run_mojicrypt(self, command: str, payload: str, passphrase: str) -> Optional[str]:
        """run mojicrypt and return its stripped stdout, or None when every attempt fails

        command is the mojicrypt sub-command ("encrypt" or "decrypt" in this
        class), payload its input text and passphrase the value given to -p.
        on failure self.last_error describes the last attempt; on success it
        is left empty, even when an earlier attempt failed.
        """
        self.last_error = ""
        if not self.mojicrypt_bin.exists():
            self.last_error = f"binary not found at {self.mojicrypt_bin}"
            return None

        commands_to_try = self._mojicrypt_commands(command, payload, passphrase)
        if not commands_to_try:
            self.last_error = "binary is not executable"
            return None

        for cmd in commands_to_try:
            try:
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=self.MOJICRYPT_TIMEOUT_SECONDS,
                )
            except subprocess.TimeoutExpired:
                self.last_error = "command timed out"
                continue
            except (OSError, ValueError) as exc:
                # OSError: the program could not be started (missing shebang
                # interpreter, permission denied, exec format error).
                # ValueError: unusable arguments such as an embedded NUL byte,
                # or output that cannot be decoded as text.
                # either way treat it as a failed attempt instead of crashing.
                self.last_error = str(exc)
                continue

            if result.returncode == 0:
                # drop any message left behind by an earlier failed attempt.
                self.last_error = ""
                return result.stdout.strip()

            stderr = (result.stderr or "").strip()
            self.last_error = stderr or f"command exited with status {result.returncode}"
        return None
|