aboutsummaryrefslogtreecommitdiffstats
path: root/src/server.py
diff options
context:
space:
mode:
authorKyle Javier [kj_sh604]2026-04-04 02:12:07 -0400
committerGitHub2026-04-04 02:12:07 -0400
commitabf56914d2e9b2658b29be96811930ceedac9aa7 (patch)
treea6f1c9486883d5f1f4ecb088a0fb05d2715e4382 /src/server.py
parent9312014ad56607b1b890db261b8079e968d9f656 (diff)
[squash] refactor: security improvements and sane defaults (#1)
Diffstat (limited to 'src/server.py')
-rw-r--r--src/server.py387
1 files changed, 311 insertions, 76 deletions
diff --git a/src/server.py b/src/server.py
index e82406c..bddee31 100644
--- a/src/server.py
+++ b/src/server.py
@@ -6,7 +6,7 @@
import http.server
import json
-import os
+import ipaddress
import re
import secrets
import signal
@@ -21,20 +21,42 @@ from urllib.parse import urlparse, unquote
# config
-PORT = int(os.environ.get("KJ_CLIPBOARD_PORT", 5555))
-BIND = os.environ.get("KJ_CLIPBOARD_BIND", "0.0.0.0")
+PORT = 5555
+BIND = "0.0.0.0"
BASE_DIR = Path(__file__).parent.resolve()
DB_PATH = BASE_DIR / "data" / "kj-clipboard.db"
RANDOM_ID_LENGTH = 40 # random chars after unix epoch
MAX_PASTE_SIZE = 67 * 1024 * 1024 // 10 # 6.7 MiB
MAX_PASSPHRASE_SIZE = 512
MAX_LANGUAGE_SIZE = 32
-MAX_DECRYPT_SIZE = 3 * 1024 * 1024 # headroom for decrypt requests with long passphrases
+# decrypt requests only need id+passphrase json, keep this small against body-flood abuse.
+MAX_DECRYPT_SIZE = 16 * 1024
ID_PATTERN = re.compile(r"^[0-9]{10,}[a-f0-9]{40}$")
LANGUAGE_PATTERN = re.compile(r"^[a-z0-9_.+#-]{1,32}$")
-REQUESTS_PER_WINDOW = int(os.environ.get("KJ_CLIPBOARD_RATE_LIMIT", "60"))
-RATE_WINDOW_SECONDS = int(os.environ.get("KJ_CLIPBOARD_RATE_WINDOW", "60"))
+# per-ip post limit (create/decrypt).
+REQUESTS_PER_WINDOW = 150
+RATE_WINDOW_SECONDS = 60
+
+# sqlite concurrency defaults
+SQLITE_BUSY_TIMEOUT_MS = 2500
+SQLITE_WRITE_RETRIES = 5
+SQLITE_READ_RETRIES = 3
+SQLITE_RETRY_BASE_MS = 20
+SQLITE_CACHE_SIZE_KIB = 6144
+SQLITE_MMAP_SIZE_BYTES = 134217728
+SQLITE_WAL_AUTOCHECKPOINT_PAGES = 2000
+SQLITE_JOURNAL_SIZE_LIMIT_BYTES = 67108864
+SQLITE_SYNCHRONOUS = "NORMAL" # OFF | NORMAL | FULL | EXTRA
+
+# accept short bursts without immediately refusing tcp connections.
+HTTP_REQUEST_QUEUE_SIZE = 64
+
+TRUST_PROXY = False
+TRUSTED_PROXY_IPS = {"127.0.0.1", "::1"}
+# hsts off by default to avoid breaking plain-http setups.
+ENABLE_HSTS = False
+HSTS_MAX_AGE = 31536000
ALLOWED_LANGUAGES = {
"1c",
@@ -371,11 +393,50 @@ _rate_state = {}
# database
+class DatabaseBusyError(RuntimeError):
+ pass
+
+
+def is_sqlite_busy_error(err):
+ msg = str(err).lower()
+ return "database is locked" in msg or "database is busy" in msg
+
+
+def sqlite_retry_sleep(attempt):
+ delay_ms = SQLITE_RETRY_BASE_MS * (2**attempt)
+ jitter_ms = secrets.randbelow(SQLITE_RETRY_BASE_MS + 1)
+ time.sleep(min((delay_ms + jitter_ms) / 1000.0, 1.0))
+
+
+def open_db():
+ conn = sqlite3.connect(
+ str(DB_PATH),
+ timeout=SQLITE_BUSY_TIMEOUT_MS / 1000.0,
+ )
+ conn.execute(f"PRAGMA busy_timeout={SQLITE_BUSY_TIMEOUT_MS}")
+ conn.execute("PRAGMA foreign_keys=ON")
+ conn.execute(f"PRAGMA cache_size=-{SQLITE_CACHE_SIZE_KIB}")
+ conn.execute("PRAGMA temp_store=MEMORY")
+ if SQLITE_MMAP_SIZE_BYTES > 0:
+ try:
+ conn.execute(f"PRAGMA mmap_size={SQLITE_MMAP_SIZE_BYTES}")
+ except sqlite3.DatabaseError:
+ pass
+ # defense-in-depth: ignore if running on an older sqlite without this pragma.
+ try:
+ conn.execute("PRAGMA trusted_schema=OFF")
+ except sqlite3.DatabaseError:
+ pass
+ return conn
+
+
def init_db():
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
- conn = sqlite3.connect(str(DB_PATH))
+ conn = open_db()
conn.execute("PRAGMA journal_mode=WAL")
- conn.execute("PRAGMA synchronous=NORMAL")
+ conn.execute(f"PRAGMA synchronous={SQLITE_SYNCHRONOUS}")
+ conn.execute(f"PRAGMA wal_autocheckpoint={SQLITE_WAL_AUTOCHECKPOINT_PAGES}")
+ conn.execute(f"PRAGMA journal_size_limit={SQLITE_JOURNAL_SIZE_LIMIT_BYTES}")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS pastes (
@@ -404,41 +465,70 @@ def is_valid_paste_id(paste_id):
def save_paste(content, language=None, is_code=False, is_encrypted=False):
"""store a paste in the database, return its id"""
- conn = sqlite3.connect(str(DB_PATH))
- try:
- for _ in range(5):
- paste_id = generate_id()
+ for write_attempt in range(SQLITE_WRITE_RETRIES + 1):
+ conn = open_db()
+ try:
+ # reserve the write lock early to reduce lock thrash under bursty writes.
+ conn.execute("BEGIN IMMEDIATE")
+ for _ in range(5):
+ paste_id = generate_id()
+ try:
+ conn.execute(
+ "INSERT INTO pastes (id, content, language, is_code, is_encrypted, created_at) "
+ "VALUES (?, ?, ?, ?, ?, ?)",
+ (
+ paste_id,
+ content,
+ language,
+ int(is_code),
+ int(is_encrypted),
+ int(time.time()),
+ ),
+ )
+ conn.commit()
+ return paste_id
+ except sqlite3.IntegrityError:
+ continue
+ conn.rollback()
+ raise RuntimeError("failed to generate unique paste id")
+ except sqlite3.OperationalError as err:
try:
- conn.execute(
- "INSERT INTO pastes (id, content, language, is_code, is_encrypted, created_at) "
- "VALUES (?, ?, ?, ?, ?, ?)",
- (
- paste_id,
- content,
- language,
- int(is_code),
- int(is_encrypted),
- int(time.time()),
- ),
- )
- conn.commit()
- return paste_id
- except sqlite3.IntegrityError:
+ conn.rollback()
+ except sqlite3.DatabaseError:
+ pass
+ if is_sqlite_busy_error(err):
+ if write_attempt >= SQLITE_WRITE_RETRIES:
+ raise DatabaseBusyError("database is busy; retry shortly") from err
+ sqlite_retry_sleep(write_attempt)
continue
- raise RuntimeError("failed to generate unique paste id")
- finally:
- conn.close()
+ raise
+ finally:
+ conn.close()
+
+ raise DatabaseBusyError("database is busy; retry shortly")
def get_paste(paste_id):
"""retrieve a paste by id, returns dict or None"""
- conn = sqlite3.connect(str(DB_PATH))
- conn.row_factory = sqlite3.Row
- row = conn.execute("SELECT * FROM pastes WHERE id = ?", (paste_id,)).fetchone()
- conn.close()
- if row:
- return dict(row)
- return None
+ for read_attempt in range(SQLITE_READ_RETRIES + 1):
+ conn = open_db()
+ try:
+ conn.row_factory = sqlite3.Row
+ row = conn.execute("SELECT * FROM pastes WHERE id = ?", (paste_id,)).fetchone()
+ if row:
+ return sanitize_paste_record(dict(row))
+ return None
+ except sqlite3.OperationalError as err:
+ if is_sqlite_busy_error(err):
+ if read_attempt >= SQLITE_READ_RETRIES:
+ raise DatabaseBusyError("database is busy; retry shortly") from err
+ sqlite_retry_sleep(read_attempt)
+ continue
+ raise
+ finally:
+ conn.close()
+
+ raise DatabaseBusyError("database is busy; retry shortly")
# mojicrypt helpers
@@ -487,26 +577,36 @@ def landing_page():
return (BASE_DIR / "index.html").read_text(encoding="utf-8")
-def paste_page(paste):
+def paste_page(paste, csp_nonce):
"""render the view page for a paste"""
paste_id = paste["id"]
content = paste["content"]
- is_code = paste["is_code"]
- is_encrypted = paste["is_encrypted"]
- language = paste["language"] or ""
- created = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime(paste["created_at"]))
+ is_code = coerce_bool_flag(paste.get("is_code", 0))
+ is_encrypted = coerce_bool_flag(paste.get("is_encrypted", 0))
+ language = normalize_language(paste.get("language", "")) or ""
+ created = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime(int(paste["created_at"])))
+
+ escaped_paste_id = html_escape(paste_id)
+ escaped_paste_id_attr = html_escape_attr(paste_id)
+ escaped_language = html_escape(language)
+
+ paste_id_json = json.dumps(paste_id)
+ code_lang_class = f"language-{language}" if language else ""
+ code_lang_class_json = json.dumps(code_lang_class)
+
+ script_nonce_attr = f' nonce="{html_escape_attr(csp_nonce)}"'
if is_encrypted:
# show decrypt form instead of content
content_block = f"""
<form id="decrypt-form">
<p>this paste is password-protected.</p>
- <input type="password" id="decrypt-pass" placeholder="passphrase" required>
+ <input type="password" id="decrypt-pass" placeholder="passphrase" autocomplete="off" required>
<button type="submit" id="decrypt-btn">decrypt</button>
<span id="decrypt-status" style="margin-left:0.5rem;"></span>
</form>
<div id="paste-content" style="display:none;"></div>
- <script>
+ <script{script_nonce_attr}>
document.getElementById("decrypt-form").addEventListener("submit", async function(e) {{
e.preventDefault();
const btn = document.getElementById("decrypt-btn");
@@ -518,11 +618,20 @@ def paste_page(paste):
const resp = await fetch("/api/decrypt", {{
method: "POST",
headers: {{"Content-Type": "application/json"}},
- body: JSON.stringify({{id: "{paste_id}", passphrase: pass}})
+ body: JSON.stringify({{id: {paste_id_json}, passphrase: pass}})
}});
- const data = await resp.json();
- if (data.error) {{
- alert(data.error);
+ let data = null;
+ try {{
+ data = await resp.json();
+ }} catch (_err) {{
+ btn.disabled = false;
+ btn.textContent = "decrypt";
+ status.textContent = "";
+ alert("decrypt request failed");
+ return;
+ }}
+ if (!resp.ok || data.error) {{
+ alert(data && data.error ? data.error : "decrypt failed");
btn.disabled = false;
btn.textContent = "decrypt";
status.textContent = "";
@@ -535,7 +644,7 @@ def paste_page(paste):
{"" if not is_code else f'''
const codeEl = document.createElement("pre");
const codeInner = document.createElement("code");
- codeInner.className = "{("language-" + language) if language else ""}";
+ codeInner.className = {code_lang_class_json};
codeInner.textContent = data.content;
codeEl.appendChild(codeInner);
el.appendChild(codeEl);
@@ -546,10 +655,6 @@ def paste_page(paste):
pre.textContent = data.content;
el.appendChild(pre);
'''}
- // update copy button
- document.getElementById("copy-btn").onclick = function() {{
- copyPaste();
- }};
// keep decrypted text in a runtime-only container for copy.
el.dataset.decryptedContent = data.content;
}});
@@ -557,7 +662,7 @@ def paste_page(paste):
else:
escaped = html_escape(content)
if is_code:
- lang_class = f'class="language-{language}"' if language else ""
+ lang_class = f'class="language-{html_escape_attr(language)}"' if language else ""
content_block = (
f'<pre><code id="paste-code" {lang_class}>{escaped}</code></pre>'
)
@@ -568,8 +673,8 @@ def paste_page(paste):
highlight_js = ""
if is_code:
highlight_css = '<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/styles/vs2015.min.css">'
- highlight_js = """<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js"></script>
-<script>hljs.highlightAll();</script>"""
+ highlight_js = f"""<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js"></script>
+ <script{script_nonce_attr}>hljs.highlightAll();</script>"""
return f"""<!DOCTYPE html>
<html lang="en">
@@ -578,22 +683,22 @@ def paste_page(paste):
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="color-scheme" content="light dark">
<meta name="robots" content="noindex, nofollow">
- <title>kj-clipboard - {paste_id}</title>
+ <title>kj-clipboard - {escaped_paste_id}</title>
<link rel="icon" type="image/svg+xml" href="/favicon.svg">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/kj-sh604/noir.css@latest/out/noir.min.css">
{highlight_css}
</head>
<body>
<h1><a href="/" style="text-decoration:none;color:inherit;">kj-clipboard</a></h1>
- <p class="meta">created {created}{(" · " + language) if language else ""}{" · encrypted" if is_encrypted else ""}</p>
+ <p class="meta">created {created}{(" · " + escaped_language) if language else ""}{" · encrypted" if is_encrypted else ""}</p>
<div class="actions">
- <button id="copy-btn" onclick="copyPaste()">copy to clipboard</button>
- <a href="/raw/{paste_id}">raw</a>
+ <button id="copy-btn" type="button">copy to clipboard</button>
+ <a href="/raw/{escaped_paste_id_attr}">raw</a>
</div>
{content_block}
{highlight_js}
- <script>
- function copyPaste() {{
+ <script{script_nonce_attr}>
+ async function copyPaste() {{
const btn = document.getElementById("copy-btn");
let text = "";
@@ -616,10 +721,15 @@ def paste_page(paste):
return;
}}
- navigator.clipboard.writeText(text);
- btn.textContent = "copied!";
+ try {{
+ await navigator.clipboard.writeText(text);
+ btn.textContent = "copied!";
+ }} catch (_err) {{
+ btn.textContent = "copy failed";
+ }}
setTimeout(() => btn.textContent = "copy to clipboard", 1500);
}}
+ document.getElementById("copy-btn").addEventListener("click", copyPaste);
</script>
</body>
</html>"""
@@ -647,6 +757,7 @@ def not_found_page():
def html_escape(text):
+ text = str(text)
return (
text.replace("&", "&amp;")
.replace("<", "&lt;")
@@ -660,6 +771,57 @@ def html_escape_attr(text):
return html_escape(text).replace("\n", "&#10;").replace("\r", "&#13;")
+def generate_csp_nonce():
+ return secrets.token_urlsafe(18)
+
+
+def coerce_bool_flag(value):
+ try:
+ return bool(int(value))
+ except (TypeError, ValueError):
+ return bool(value)
+
+
+def sanitize_paste_record(row):
+ if not isinstance(row, dict):
+ return None
+
+ paste_id = row.get("id")
+ if not isinstance(paste_id, str) or not is_valid_paste_id(paste_id):
+ return None
+
+ content = row.get("content")
+ if not isinstance(content, str):
+ return None
+
+ language = normalize_language(row.get("language", "")) or ""
+ is_code = coerce_bool_flag(row.get("is_code", 0))
+ is_encrypted = coerce_bool_flag(row.get("is_encrypted", 0))
+
+ try:
+ created_at = int(row.get("created_at", 0))
+ except (TypeError, ValueError):
+ created_at = 0
+
+ now = int(time.time())
+ if created_at < 0:
+ created_at = 0
+ if created_at > now + 315360000:
+ created_at = now
+
+ if not is_code:
+ language = ""
+
+ return {
+ "id": paste_id,
+ "content": content,
+ "language": language,
+ "is_code": int(is_code),
+ "is_encrypted": int(is_encrypted),
+ "created_at": created_at,
+ }
+
+
def normalize_language(value):
"""normalize and validate highlight.js language token"""
if value is None:
@@ -680,12 +842,57 @@ def normalize_language(value):
return lang
+def is_valid_ip(value):
+ try:
+ ipaddress.ip_address(value)
+ return True
+ except ValueError:
+ return False
+
+
def get_client_ip(handler):
- """use x-forwarded-for first when present (nginx reverse proxy)"""
+ """resolve client ip safely, only trusting proxy headers when configured"""
+ remote_ip = handler.client_address[0]
+
+ if not TRUST_PROXY or remote_ip not in TRUSTED_PROXY_IPS:
+ return remote_ip
+
xff = handler.headers.get("X-Forwarded-For", "").strip()
if xff:
- return xff.split(",")[0].strip()
- return handler.client_address[0]
+ candidate = xff.split(",")[0].strip()
+ if is_valid_ip(candidate):
+ return candidate
+
+ xri = handler.headers.get("X-Real-IP", "").strip()
+ if xri and is_valid_ip(xri):
+ return xri
+
+ return remote_ip
+
+
+def is_same_origin_post(handler):
+ # block cross-site browser requests while allowing non-browser clients.
+ sec_fetch_site = handler.headers.get("Sec-Fetch-Site", "").strip().lower()
+ if sec_fetch_site == "cross-site":
+ return False
+
+ origin = handler.headers.get("Origin", "").strip()
+ if not origin:
+ return True
+
+ host = handler.headers.get("Host", "").strip().lower()
+ if not host:
+ return False
+
+ try:
+ parsed_origin = urlparse(origin)
+ except ValueError:
+ return False
+
+ if parsed_origin.scheme not in {"http", "https"}:
+ return False
+
+ return secrets.compare_digest(parsed_origin.netloc.lower(), host)
def is_rate_limited(client_ip):
@@ -711,41 +918,59 @@ def is_rate_limited(client_ip):
class ClipboardHandler(http.server.BaseHTTPRequestHandler):
+ server_version = "kj-clipboard"
+ sys_version = ""
+
def log_message(self, fmt, *args):
client_ip = get_client_ip(self)
sys.stderr.write(
f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {client_ip} - {fmt % args}\n"
)
- def add_security_headers(self):
+ def add_security_headers(self, csp_nonce=None):
self.send_header("X-Content-Type-Options", "nosniff")
self.send_header("X-Frame-Options", "DENY")
self.send_header("Referrer-Policy", "no-referrer")
- self.send_header("Permissions-Policy", "interest-cohort=()")
+ self.send_header(
+ "Permissions-Policy",
+ "camera=(), microphone=(), geolocation=(), gyroscope=(), magnetometer=(), payment=(), usb=()",
+ )
self.send_header("Cross-Origin-Opener-Policy", "same-origin")
self.send_header("Cross-Origin-Resource-Policy", "same-origin")
self.send_header("Cache-Control", "no-store")
- # CSP allows required CDNs and inline scripts currently used in templates.
+
+ if ENABLE_HSTS and HSTS_MAX_AGE > 0:
+ self.send_header(
+ "Strict-Transport-Security",
+ f"max-age={HSTS_MAX_AGE}; includeSubDomains",
+ )
+
+ script_sources = ["'self'", "https://cdnjs.cloudflare.com"]
+ if csp_nonce:
+ script_sources.append(f"'nonce-{csp_nonce}'")
+
+ # keep style-src permissive due inline styles and the shared theme css.
self.send_header(
"Content-Security-Policy",
"default-src 'self'; style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net https://cdnjs.cloudflare.com; "
- "script-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com; img-src 'self' data:; "
- "connect-src 'self'; base-uri 'none'; frame-ancestors 'none'; object-src 'none'; form-action 'self'",
+ f"script-src {' '.join(script_sources)}; img-src 'self' data:; connect-src 'self'; "
+ "font-src 'self' https://cdn.jsdelivr.net https://cdnjs.cloudflare.com; "
+ "base-uri 'none'; frame-ancestors 'none'; object-src 'none'; form-action 'self'",
)
- def send_html(self, code, body):
+ def send_html(self, code, body, csp_nonce=None):
data = body.encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
- self.add_security_headers()
+ self.add_security_headers(csp_nonce=csp_nonce)
self.end_headers()
self.wfile.write(data)
def send_json(self, code, obj):
data = json.dumps(obj, separators=(",", ":")).encode("utf-8")
self.send_response(code)
- self.send_header("Content-Type", "application/json")
+ self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.add_security_headers()
self.end_headers()
@@ -854,12 +1079,19 @@ class ClipboardHandler(http.server.BaseHTTPRequestHandler):
self.send_html(404, not_found_page())
return
- self.send_html(200, paste_page(paste))
+ csp_nonce = generate_csp_nonce()
+ self.send_html(200, paste_page(paste, csp_nonce), csp_nonce=csp_nonce)
+ except DatabaseBusyError:
+ self.send_plain(503, "service busy, retry shortly")
except Exception:
self.send_json(500, {"error": "internal server error"})
def do_POST(self):
try:
+ if not is_same_origin_post(self):
+ self.send_json(403, {"error": "cross-origin request blocked"})
+ return
+
client_ip = get_client_ip(self)
if is_rate_limited(client_ip):
self.send_json(429, {"error": "rate limit exceeded"})
@@ -874,6 +1106,8 @@ class ClipboardHandler(http.server.BaseHTTPRequestHandler):
self.handle_decrypt()
else:
self.send_json(404, {"error": "not found"})
+ except DatabaseBusyError:
+ self.send_json(503, {"error": "database busy, retry shortly"})
except Exception:
self.send_json(500, {"error": "internal server error"})
@@ -983,6 +1217,7 @@ class ClipboardHandler(http.server.BaseHTTPRequestHandler):
class ClipboardHTTPServer(http.server.ThreadingHTTPServer):
daemon_threads = True
allow_reuse_address = True
+ request_queue_size = HTTP_REQUEST_QUEUE_SIZE
def main():