diff options
| author | Kyle Javier [kj_sh604] | 2026-04-04 02:12:07 -0400 |
|---|---|---|
| committer | GitHub | 2026-04-04 02:12:07 -0400 |
| commit | abf56914d2e9b2658b29be96811930ceedac9aa7 (patch) | |
| tree | a6f1c9486883d5f1f4ecb088a0fb05d2715e4382 /src/server.py | |
| parent | 9312014ad56607b1b890db261b8079e968d9f656 (diff) | |
[squash] refactor: security improvements and sane defaults (#1)
Diffstat (limited to 'src/server.py')
| -rw-r--r-- | src/server.py | 387 |
1 files changed, 311 insertions, 76 deletions
diff --git a/src/server.py b/src/server.py index e82406c..bddee31 100644 --- a/src/server.py +++ b/src/server.py @@ -6,7 +6,7 @@ import http.server import json -import os +import ipaddress import re import secrets import signal @@ -21,20 +21,42 @@ from urllib.parse import urlparse, unquote # config -PORT = int(os.environ.get("KJ_CLIPBOARD_PORT", 5555)) -BIND = os.environ.get("KJ_CLIPBOARD_BIND", "0.0.0.0") +PORT = 5555 +BIND = "0.0.0.0" BASE_DIR = Path(__file__).parent.resolve() DB_PATH = BASE_DIR / "data" / "kj-clipboard.db" RANDOM_ID_LENGTH = 40 # random chars after unix epoch MAX_PASTE_SIZE = 67 * 1024 * 1024 // 10 # 6.7 MiB MAX_PASSPHRASE_SIZE = 512 MAX_LANGUAGE_SIZE = 32 -MAX_DECRYPT_SIZE = 3 * 1024 * 1024 # headroom for decrypt requests with long passphrases +# decrypt requests only need id+passphrase json, keep this small against body-flood abuse. +MAX_DECRYPT_SIZE = 16 * 1024 ID_PATTERN = re.compile(r"^[0-9]{10,}[a-f0-9]{40}$") LANGUAGE_PATTERN = re.compile(r"^[a-z0-9_.+#-]{1,32}$") -REQUESTS_PER_WINDOW = int(os.environ.get("KJ_CLIPBOARD_RATE_LIMIT", "60")) -RATE_WINDOW_SECONDS = int(os.environ.get("KJ_CLIPBOARD_RATE_WINDOW", "60")) +# per-ip post limit (create/decrypt). +REQUESTS_PER_WINDOW = 150 +RATE_WINDOW_SECONDS = 60 + +# sqlite concurrency defaults +SQLITE_BUSY_TIMEOUT_MS = 2500 +SQLITE_WRITE_RETRIES = 5 +SQLITE_READ_RETRIES = 3 +SQLITE_RETRY_BASE_MS = 20 +SQLITE_CACHE_SIZE_KIB = 6144 +SQLITE_MMAP_SIZE_BYTES = 134217728 +SQLITE_WAL_AUTOCHECKPOINT_PAGES = 2000 +SQLITE_JOURNAL_SIZE_LIMIT_BYTES = 67108864 +SQLITE_SYNCHRONOUS = "NORMAL" # OFF | NORMAL | FULL | EXTRA + +# accept short bursts without immediately refusing tcp connections. +HTTP_REQUEST_QUEUE_SIZE = 64 + +TRUST_PROXY = False +TRUSTED_PROXY_IPS = {"127.0.0.1", "::1"} +# hsts off by default to avoid breaking plain-http setups. +ENABLE_HSTS = False +HSTS_MAX_AGE = 31536000 ALLOWED_LANGUAGES = { "1c", @@ -371,11 +393,50 @@ _rate_state = {} # database +class DatabaseBusyError(RuntimeError): + pass + + +def is_sqlite_busy_error(err): + msg = str(err).lower() + return "database is locked" in msg or "database is busy" in msg + + +def sqlite_retry_sleep(attempt): + delay_ms = SQLITE_RETRY_BASE_MS * (2**attempt) + jitter_ms = secrets.randbelow(SQLITE_RETRY_BASE_MS + 1) + time.sleep(min((delay_ms + jitter_ms) / 1000.0, 1.0)) + + +def open_db(): + conn = sqlite3.connect( + str(DB_PATH), + timeout=SQLITE_BUSY_TIMEOUT_MS / 1000.0, + ) + conn.execute(f"PRAGMA busy_timeout={SQLITE_BUSY_TIMEOUT_MS}") + conn.execute("PRAGMA foreign_keys=ON") + conn.execute(f"PRAGMA cache_size=-{SQLITE_CACHE_SIZE_KIB}") + conn.execute("PRAGMA temp_store=MEMORY") + if SQLITE_MMAP_SIZE_BYTES > 0: + try: + conn.execute(f"PRAGMA mmap_size={SQLITE_MMAP_SIZE_BYTES}") + except sqlite3.DatabaseError: + pass + # defense-in-depth: ignore if running on an older sqlite without this pragma. + try: + conn.execute("PRAGMA trusted_schema=OFF") + except sqlite3.DatabaseError: + pass + return conn + + def init_db(): DB_PATH.parent.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect(str(DB_PATH)) + conn = open_db() conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA synchronous=NORMAL") + conn.execute(f"PRAGMA synchronous={SQLITE_SYNCHRONOUS}") + conn.execute(f"PRAGMA wal_autocheckpoint={SQLITE_WAL_AUTOCHECKPOINT_PAGES}") + conn.execute(f"PRAGMA journal_size_limit={SQLITE_JOURNAL_SIZE_LIMIT_BYTES}") conn.execute( """ CREATE TABLE IF NOT EXISTS pastes ( @@ -404,41 +465,70 @@ def is_valid_paste_id(paste_id): def save_paste(content, language=None, is_code=False, is_encrypted=False): """store a paste in the database, return its id""" - conn = sqlite3.connect(str(DB_PATH)) - try: - for _ in range(5): - paste_id = generate_id() + for write_attempt in range(SQLITE_WRITE_RETRIES + 1): + conn = open_db() + try: + # reserve the write lock early to reduce lock thrash under bursty writes. + conn.execute("BEGIN IMMEDIATE") + for _ in range(5): + paste_id = generate_id() + try: + conn.execute( + "INSERT INTO pastes (id, content, language, is_code, is_encrypted, created_at) " + "VALUES (?, ?, ?, ?, ?, ?)", + ( + paste_id, + content, + language, + int(is_code), + int(is_encrypted), + int(time.time()), + ), + ) + conn.commit() + return paste_id + except sqlite3.IntegrityError: + continue + conn.rollback() + raise RuntimeError("failed to generate unique paste id") + except sqlite3.OperationalError as err: try: - conn.execute( - "INSERT INTO pastes (id, content, language, is_code, is_encrypted, created_at) " - "VALUES (?, ?, ?, ?, ?, ?)", - ( - paste_id, - content, - language, - int(is_code), - int(is_encrypted), - int(time.time()), - ), - ) - conn.commit() - return paste_id - except sqlite3.IntegrityError: + conn.rollback() + except sqlite3.DatabaseError: + pass + if is_sqlite_busy_error(err): + if write_attempt >= SQLITE_WRITE_RETRIES: + raise DatabaseBusyError("database is busy; retry shortly") from err + sqlite_retry_sleep(write_attempt) continue - raise RuntimeError("failed to generate unique paste id") - finally: - conn.close() + raise + finally: + conn.close() + + raise DatabaseBusyError("database is busy; retry shortly") def get_paste(paste_id): """retrieve a paste by id, returns dict or None""" - conn = sqlite3.connect(str(DB_PATH)) - conn.row_factory = sqlite3.Row - row = conn.execute("SELECT * FROM pastes WHERE id = ?", (paste_id,)).fetchone() - conn.close() - if row: - return dict(row) - return None + for read_attempt in range(SQLITE_READ_RETRIES + 1): + conn = open_db() + try: + conn.row_factory = sqlite3.Row + row = conn.execute("SELECT * FROM pastes WHERE id = ?", (paste_id,)).fetchone() + if row: + return sanitize_paste_record(dict(row)) + return None + except sqlite3.OperationalError as err: + if is_sqlite_busy_error(err): + if read_attempt >= SQLITE_READ_RETRIES: + raise DatabaseBusyError("database is busy; retry shortly") from err + sqlite_retry_sleep(read_attempt) + continue + raise + finally: + conn.close() + + raise DatabaseBusyError("database is busy; retry shortly") # mojicrypt helpers @@ -487,26 +577,36 @@ def landing_page(): return (BASE_DIR / "index.html").read_text(encoding="utf-8") -def paste_page(paste): +def paste_page(paste, csp_nonce): """render the view page for a paste""" paste_id = paste["id"] content = paste["content"] - is_code = paste["is_code"] - is_encrypted = paste["is_encrypted"] - language = paste["language"] or "" - created = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime(paste["created_at"])) + is_code = coerce_bool_flag(paste.get("is_code", 0)) + is_encrypted = coerce_bool_flag(paste.get("is_encrypted", 0)) + language = normalize_language(paste.get("language", "")) or "" + created = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime(int(paste["created_at"]))) + + escaped_paste_id = html_escape(paste_id) + escaped_paste_id_attr = html_escape_attr(paste_id) + escaped_language = html_escape(language) + + paste_id_json = json.dumps(paste_id) + code_lang_class = f"language-{language}" if language else "" + code_lang_class_json = json.dumps(code_lang_class) + + script_nonce_attr = f' nonce="{html_escape_attr(csp_nonce)}"' if is_encrypted: # show decrypt form instead of content content_block = f""" <form id="decrypt-form"> <p>this paste is password-protected.</p> - <input type="password" id="decrypt-pass" placeholder="passphrase" required> + <input type="password" id="decrypt-pass" placeholder="passphrase" autocomplete="off" required> <button type="submit" id="decrypt-btn">decrypt</button> <span id="decrypt-status" style="margin-left:0.5rem;"></span> </form> <div id="paste-content" style="display:none;"></div> - <script> + <script{script_nonce_attr}> document.getElementById("decrypt-form").addEventListener("submit", async function(e) {{ e.preventDefault(); const btn = document.getElementById("decrypt-btn"); @@ -518,11 +618,20 @@ def paste_page(paste): const resp = await fetch("/api/decrypt", {{ method: "POST", headers: {{"Content-Type": "application/json"}}, - body: JSON.stringify({{id: "{paste_id}", passphrase: pass}}) + body: JSON.stringify({{id: {paste_id_json}, passphrase: pass}}) }}); - const data = await resp.json(); - if (data.error) {{ - alert(data.error); + let data = null; + try {{ + data = await resp.json(); + }} catch (_err) {{ + btn.disabled = false; + btn.textContent = "decrypt"; + status.textContent = ""; + alert("decrypt request failed"); + return; + }} + if (!resp.ok || data.error) {{ + alert(data && data.error ? data.error : "decrypt failed"); btn.disabled = false; btn.textContent = "decrypt"; status.textContent = ""; @@ -535,7 +644,7 @@ def paste_page(paste): {"" if not is_code else f''' const codeEl = document.createElement("pre"); const codeInner = document.createElement("code"); - codeInner.className = "{("language-" + language) if language else ""}"; + codeInner.className = {code_lang_class_json}; codeInner.textContent = data.content; codeEl.appendChild(codeInner); el.appendChild(codeEl); @@ -546,10 +655,6 @@ def paste_page(paste): pre.textContent = data.content; el.appendChild(pre); '''} - // update copy button - document.getElementById("copy-btn").onclick = function() {{ - copyPaste(); - }}; // keep decrypted text in a runtime-only container for copy. el.dataset.decryptedContent = data.content; }}); @@ -557,7 +662,7 @@ def paste_page(paste): else: escaped = html_escape(content) if is_code: - lang_class = f'class="language-{language}"' if language else "" + lang_class = f'class="language-{html_escape_attr(language)}"' if language else "" content_block = ( f'<pre><code id="paste-code" {lang_class}>{escaped}</code></pre>' ) @@ -568,8 +673,8 @@ def paste_page(paste): highlight_js = "" if is_code: highlight_css = '<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/styles/vs2015.min.css">' - highlight_js = """<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js"></script> -<script>hljs.highlightAll();</script>""" + highlight_js = f"""<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js"></script> + <script{script_nonce_attr}>hljs.highlightAll();</script>""" return f"""<!DOCTYPE html> <html lang="en"> @@ -578,22 +683,22 @@ def paste_page(paste): <meta name="viewport" content="width=device-width, initial-scale=1"> <meta name="color-scheme" content="light dark"> <meta name="robots" content="noindex, nofollow"> - <title>kj-clipboard - {paste_id}</title> + <title>kj-clipboard - {escaped_paste_id}</title> <link rel="icon" type="image/svg+xml" href="/favicon.svg"> <link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/kj-sh604/noir.css@latest/out/noir.min.css"> {highlight_css} </head> <body> <h1><a href="/" style="text-decoration:none;color:inherit;">kj-clipboard</a></h1> - <p class="meta">created {created}{(" · " + language) if language else ""}{" · encrypted" if is_encrypted else ""}</p> + <p class="meta">created {created}{(" · " + escaped_language) if language else ""}{" · encrypted" if is_encrypted else ""}</p> <div class="actions"> - <button id="copy-btn" onclick="copyPaste()">copy to clipboard</button> - <a href="/raw/{paste_id}">raw</a> + <button id="copy-btn" type="button">copy to clipboard</button> + <a href="/raw/{escaped_paste_id_attr}">raw</a> </div> {content_block} {highlight_js} - <script> - function copyPaste() {{ + <script{script_nonce_attr}> + async function copyPaste() {{ const btn = document.getElementById("copy-btn"); let text = ""; @@ -616,10 +721,15 @@ def paste_page(paste): return; }} - navigator.clipboard.writeText(text); - btn.textContent = "copied!"; + try {{ + await navigator.clipboard.writeText(text); + btn.textContent = "copied!"; + }} catch (_err) {{ + btn.textContent = "copy failed"; + }} setTimeout(() => btn.textContent = "copy to clipboard", 1500); }} + document.getElementById("copy-btn").addEventListener("click", copyPaste); </script> </body> </html>""" @@ -647,6 +757,7 @@ def not_found_page(): def html_escape(text): + text = str(text) return ( text.replace("&", "&") .replace("<", "<") @@ -660,6 +771,57 @@ def html_escape_attr(text): return html_escape(text).replace("\n", " ").replace("\r", " ") +def generate_csp_nonce(): + return secrets.token_urlsafe(18) + + +def coerce_bool_flag(value): + try: + return bool(int(value)) + except (TypeError, ValueError): + return bool(value) + + +def sanitize_paste_record(row): + if not isinstance(row, dict): + return None + + paste_id = row.get("id") + if not isinstance(paste_id, str) or not is_valid_paste_id(paste_id): + return None + + content = row.get("content") + if not isinstance(content, str): + return None + + language = normalize_language(row.get("language", "")) or "" + is_code = coerce_bool_flag(row.get("is_code", 0)) + is_encrypted = coerce_bool_flag(row.get("is_encrypted", 0)) + + try: + created_at = int(row.get("created_at", 0)) + except (TypeError, ValueError): + created_at = 0 + + now = int(time.time()) + if created_at < 0: + created_at = 0 + if created_at > now + 315360000: + created_at = now + + if not is_code: + language = "" + + return { + "id": paste_id, + "content": content, + "language": language, + "is_code": int(is_code), + "is_encrypted": int(is_encrypted), + "created_at": created_at, + } + + def normalize_language(value): """normalize and validate highlight.js language token""" if value is None: @@ -680,12 +842,57 @@ def normalize_language(value): return lang +def is_valid_ip(value): + try: + ipaddress.ip_address(value) + return True + except ValueError: + return False + + def get_client_ip(handler): - """use x-forwarded-for first when present (nginx reverse proxy)""" + """resolve client ip safely, only trusting proxy headers when configured""" + remote_ip = handler.client_address[0] + + if not TRUST_PROXY or remote_ip not in TRUSTED_PROXY_IPS: + return remote_ip + xff = handler.headers.get("X-Forwarded-For", "").strip() if xff: - return xff.split(",")[0].strip() - return handler.client_address[0] + candidate = xff.split(",")[0].strip() + if is_valid_ip(candidate): + return candidate + + xri = handler.headers.get("X-Real-IP", "").strip() + if xri and is_valid_ip(xri): + return xri + + return remote_ip + + +def is_same_origin_post(handler): + # block cross-site browser requests while allowing non-browser clients. + sec_fetch_site = handler.headers.get("Sec-Fetch-Site", "").strip().lower() + if sec_fetch_site == "cross-site": + return False + + origin = handler.headers.get("Origin", "").strip() + if not origin: + return True + + host = handler.headers.get("Host", "").strip().lower() + if not host: + return False + + try: + parsed_origin = urlparse(origin) + except ValueError: + return False + + if parsed_origin.scheme not in {"http", "https"}: + return False + + return secrets.compare_digest(parsed_origin.netloc.lower(), host) def is_rate_limited(client_ip): @@ -711,41 +918,59 @@ def is_rate_limited(client_ip): class ClipboardHandler(http.server.BaseHTTPRequestHandler): + server_version = "kj-clipboard" + sys_version = "" + def log_message(self, fmt, *args): client_ip = get_client_ip(self) sys.stderr.write( f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {client_ip} - {fmt % args}\n" ) - def add_security_headers(self): + def add_security_headers(self, csp_nonce=None): self.send_header("X-Content-Type-Options", "nosniff") self.send_header("X-Frame-Options", "DENY") self.send_header("Referrer-Policy", "no-referrer") - self.send_header("Permissions-Policy", "interest-cohort=()") + self.send_header( + "Permissions-Policy", + "camera=(), microphone=(), geolocation=(), gyroscope=(), magnetometer=(), payment=(), usb=()", + ) self.send_header("Cross-Origin-Opener-Policy", "same-origin") self.send_header("Cross-Origin-Resource-Policy", "same-origin") self.send_header("Cache-Control", "no-store") - # CSP allows required CDNs and inline scripts currently used in templates. + + if ENABLE_HSTS and HSTS_MAX_AGE > 0: + self.send_header( + "Strict-Transport-Security", + f"max-age={HSTS_MAX_AGE}; includeSubDomains", + ) + + script_sources = ["'self'", "https://cdnjs.cloudflare.com"] + if csp_nonce: + script_sources.append(f"'nonce-{csp_nonce}'") + + # keep style-src permissive due inline styles and the shared theme css. self.send_header( "Content-Security-Policy", "default-src 'self'; style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://cdnjs.cloudflare.com; " - "script-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com; img-src 'self' data:; " - "connect-src 'self'; base-uri 'none'; frame-ancestors 'none'; object-src 'none'; form-action 'self'", + f"script-src {' '.join(script_sources)}; img-src 'self' data:; connect-src 'self'; " + "font-src 'self' https://cdn.jsdelivr.net https://cdnjs.cloudflare.com; " + "base-uri 'none'; frame-ancestors 'none'; object-src 'none'; form-action 'self'", ) - def send_html(self, code, body): + def send_html(self, code, body, csp_nonce=None): data = body.encode("utf-8") self.send_response(code) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(data))) - self.add_security_headers() + self.add_security_headers(csp_nonce=csp_nonce) self.end_headers() self.wfile.write(data) def send_json(self, code, obj): data = json.dumps(obj, separators=(",", ":")).encode("utf-8") self.send_response(code) - self.send_header("Content-Type", "application/json") + self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.add_security_headers() self.end_headers() @@ -854,12 +1079,19 @@ class ClipboardHandler(http.server.BaseHTTPRequestHandler): self.send_html(404, not_found_page()) return - self.send_html(200, paste_page(paste)) + csp_nonce = generate_csp_nonce() + self.send_html(200, paste_page(paste, csp_nonce), csp_nonce=csp_nonce) + except DatabaseBusyError: + self.send_plain(503, "service busy, retry shortly") except Exception: self.send_json(500, {"error": "internal server error"}) def do_POST(self): try: + if not is_same_origin_post(self): + self.send_json(403, {"error": "cross-origin request blocked"}) + return + client_ip = get_client_ip(self) if is_rate_limited(client_ip): self.send_json(429, {"error": "rate limit exceeded"}) @@ -874,6 +1106,8 @@ class ClipboardHandler(http.server.BaseHTTPRequestHandler): self.handle_decrypt() else: self.send_json(404, {"error": "not found"}) + except DatabaseBusyError: + self.send_json(503, {"error": "database busy, retry shortly"}) except Exception: self.send_json(500, {"error": "internal server error"}) @@ -983,6 +1217,7 @@ class ClipboardHandler(http.server.BaseHTTPRequestHandler): class ClipboardHTTPServer(http.server.ThreadingHTTPServer): daemon_threads = True allow_reuse_address = True + request_queue_size = HTTP_REQUEST_QUEUE_SIZE def main(): |
