cd /news/developer-tools/gist-6a5e632583c67dadf84d68d339cdf79… · home › topics › developer-tools › article
[ARTICLE · art-26126] src=gist.github.com pub= topic=developer-tools verified=true sentiment=· neutral

gist:6a5e632583c67dadf84d68d339cdf799

A developer built an AUR malware monitor that performs static analysis on Arch User Repository packages. The tool fetches AUR metadata, applies regex pre-filters, and retrieves git diffs via cgit to identify potentially malicious packages without executing any code. It stages suspicious candidates for human review or automated classification.

read 26 min · published Jun 13, 2026

| #!/usr/bin/env python3 | | | """AUR malware monitor β€” candidate dropper. | | | Static analysis only. Never runs makepkg, never sources PKGBUILDs, never | | | executes anything fetched. Fetches AUR bulk metadata, selects newly created / | | | recently modified packages, runs a cheap offline regex pre-filter, fetches the | | | git diff of each candidate over cgit (HTTP, no clones), and stages the diff + | | | metadata into per-candidate folders for a human (or claude -p) to classify. | | | See design.md for the threat model and pipeline. | | | """ | | | from future import annotations | | | import argparse | | | import gzip | | | import json | | | import re | | | import shutil | | | import subprocess | | | import sys | | | import time | | | from concurrent.futures import ThreadPoolExecutor | | | from dataclasses import dataclass, field | | | from datetime import datetime, timezone | | | from pathlib import Path | | | import requests | | | # Known-bad indicators. If any appears in a candidate's diff it's marked | | | # malicious outright β€” no need to spend a classification call. (atomic-lockfile | | | # + execa were the payload pulled by the 2026 ledger-udev-bin specimen.) | | | KNOWN_BAD_IOCS = [ | | | "atomic-lockfile", | | | ] | | | # Verdicts the analyst may return, ordered clean -> worst. Anything at or above | | | # ALERT_THRESHOLD rings the terminal bell. | | | VERDICT_ORDER = ["clean", "review", "suspicious", "malicious"] | | | ALERT_THRESHOLD = "suspicious" | | | VERDICT_RANK = {v: i for i, v in enumerate(VERDICT_ORDER)} | | | USER_AGENT = "aur-malware-monitor/0.1 (static-analysis; +https://github.com/local/aur-monitor)" | | | META_URL = "https://aur.archlinux.org/packages-meta-ext-v1.json.gz" | | | # cgit patch view: latest commit as a git-format patch, including its SHA in the | | | # "From <sha>" header. 
# The pkgbase and id2 lower bound are passed to the cgit patch endpoint as query
# params so requests URL-encodes them (pkgbases can contain +, which would
# otherwise be read as a space).
CGIT_PATCH_URL = "https://aur.archlinux.org/cgit/aur.git/patch/"
PKG_PAGE_URL = "https://aur.archlinux.org/packages/{name}"

STATE_DIR = Path.home() / ".cache" / "aur-watch"
STATE_FILE = STATE_DIR / "state.json"

MAX_DIFF_BYTES = 512 * 1024  # cap on a single staged diff
TRUNCATION_MARKER = "\n\n[... diff truncated ...]\n"

# --- Regex pre-filter (cheap, offline) -------------------------------------
# Each rule is (signal_name, compiled_regex). A hit prioritises the candidate
# for review and is recorded in meta.json. See design.md "Regex pre-filter".
BRAND_KEYWORDS = (
    "firefox|chrome|brave|librewolf|zen|tor|vivaldi|opera|telegram|"
    "signal|discord|spotify|zoom|vscode"
)

PREFILTER_RULES: list[tuple[str, re.Pattern]] = [
    ("pipe-to-shell", re.compile(r"(curl|wget|fetch)[^\n]*\|\s*(bash|sh|zsh)", re.I)),
    ("exec-process-substitution", re.compile(r"(bash|sh)\s+<\(\s*(curl|wget)", re.I)),
    ("python-remote-exec", re.compile(
        r"python[0-9.]*\s+-c[^\n]*(urlopen|requests\.get|urllib)", re.I)),
    ("base64-to-shell", re.compile(r"base64\s+-d[^\n]*\|\s*(ba)?sh", re.I)),
    ("hardcoded-ip-port", re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}:\d+")),
    ("drop-host-or-paste", re.compile(
        r"pastebin|gist\.githubusercontent|0x0\.st|transfer\.sh|anonfiles|"
        r"file\.io|ngrok|bit\.ly|tinyurl", re.I)),
    ("tmp-write-exec", re.compile(r"chmod\s+\+x[^\n]*/tmp", re.I)),
    ("tmp-unit", re.compile(r"(systemd|cron|timer)[^\n]*/tmp", re.I)),
    ("sudo", re.compile(r"\bsudo\b")),
    ("long-base64-or-hex", re.compile(r"[A-Za-z0-9+/]{120,}={0,2}")),
    # npm install / npx pulling packages at build or install time fetches
    # and runs arbitrary remote code — flag it even for ostensibly non-JS
    # packages (e.g. a "udev rules" pkg that npm-installs in a /tmp hook).
    ("npm-install-exec", re.compile(r"\b(npm\s+(install|i|exec)|npx)\b", re.I)),
]

# Match a brand keyword only as a whole token (bounded by start/end or a
# non-alphanumeric separator), so "tor" doesn't fire inside "aligator".
BRAND_RE = re.compile(rf"(?:^|[^a-z0-9])(?:{BRAND_KEYWORDS})(?:[^a-z0-9]|$)", re.I)
BIN_SUFFIX_RE = re.compile(r"-bin$", re.I)
PATCH_HINT_RE = re.compile(r"-(patch|fix|patched|mod|hotfix)\b", re.I)


# --- Data model -------------------------------------------------------------

@dataclass
class Candidate:
    """One AUR package selected for inspection, plus its triage signals."""

    pkgbase: str
    name: str
    maintainer: str | None
    num_votes: int
    popularity: float
    first_submitted: int
    last_modified: int
    version: str
    reason: str  # "new" | "modified" | "since-hours"
    signals: list[str] = field(default_factory=list)
    priority: int = 0


# --- State ------------------------------------------------------------------

def load_state() -> dict:
    """Return the persisted run state, or a fresh default.

    A missing, unreadable, malformed, or non-object state file is treated as
    "no state" rather than an error.
    """
    if STATE_FILE.exists():
        try:
            loaded = json.loads(STATE_FILE.read_text(encoding="utf-8"))
        except (ValueError, OSError):  # ValueError covers JSON + decode errors
            loaded = None
        if isinstance(loaded, dict):
            return loaded
    # seen maps pkgbase -> {"version": str, "sha": str}
    return {"last_run": 0, "etag": None, "seen": {}}


def save_state(state: dict) -> None:
    """Write *state* as JSON to STATE_FILE, creating STATE_DIR if needed."""
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(json.dumps(state, indent=2), encoding="utf-8")


# --- Fetch ------------------------------------------------------------------

def session() -> requests.Session:
    """Return a requests session that identifies itself with USER_AGENT."""
    s = requests.Session()
    s.headers["User-Agent"] = USER_AGENT
    return s


def fetch_metadata(s: requests.Session, etag: str | None) -> tuple[list[dict] | None, str | None]:
    """Return (entries, new_etag). entries is None if 304 Not Modified.

    Raises requests.RequestException on network/HTTP failure.
    """
    headers = {}
    if etag:
        headers["If-None-Match"] = etag
    resp = s.get(META_URL, headers=headers, timeout=60)
    if resp.status_code == 304:
        return None, etag
    resp.raise_for_status()
    raw = gzip.decompress(resp.content)
    return json.loads(raw), resp.headers.get("ETag", etag)


SHA_HEADER_RE = re.compile(r"^From ([0-9a-f]{7,40}) ", re.M)


def fetch_diff(s: requests.Session, pkgbase: str, since_sha: str | None
               ) -> tuple[str, str | None]:
    """Fetch the git diff for a package over cgit.

    Returns (diff_text, new_head_sha). If since_sha is given, the diff spans
    since_sha..HEAD; otherwise it's the latest commit (or, for a brand-new
    package, its initial commit — full file contents as additions).
    new_head_sha is None when no "From <sha>" header is found in the patch.

    Raises requests.RequestException on network/HTTP failure.
    """
    params = {"h": pkgbase}
    if since_sha:
        params["id2"] = since_sha
    resp = s.get(CGIT_PATCH_URL, params=params, timeout=60)
    resp.raise_for_status()
    text = resp.text
    m = SHA_HEADER_RE.search(text)
    new_sha = m.group(1) if m else None
    return text, new_sha


# --- Candidate selection ----------------------------------------------------

def select_candidates(entries: list[dict], last_run: int, since_cutoff: int | None) -> list[Candidate]:
    """Pick archive entries created or modified after the effective cutoff.

    Entries whose pkgbase is empty or could not be a single directory name are
    skipped: the pkgbase comes from remote metadata and is later used as a
    folder name under the staging directory.
    """
    # Lower bound a package must beat to qualify. last_run drives the
    # incremental trigger; since_cutoff (if set) tightens it to a recent window.
    cutoff = max(last_run, since_cutoff or 0)
    cands: list[Candidate] = []
    for e in entries:
        first = e.get("FirstSubmitted", 0) or 0
        modified = e.get("LastModified", 0) or 0
        is_new = first > cutoff
        is_mod = modified > cutoff
        if not (is_new or is_mod):
            continue
        pkgbase = e.get("PackageBase") or e.get("Name", "")
        if not pkgbase or "/" in pkgbase or "\\" in pkgbase or pkgbase in (".", ".."):
            continue
        cands.append(Candidate(
            pkgbase=pkgbase,
            name=e.get("Name", ""),
            maintainer=e.get("Maintainer"),
            num_votes=e.get("NumVotes", 0) or 0,
            popularity=float(e.get("Popularity", 0) or 0),
            first_submitted=first,
            last_modified=modified,
            version=e.get("Version", ""),
            reason="new" if is_new else "modified",
        ))
    return cands


def rank_metadata(c: Candidate) -> None:
    """Apply metadata-only signals and a priority score (higher = check first)."""
    prio = 0
    if c.reason == "new":
        prio += 2
    if c.num_votes == 0:
        prio += 1
    if c.popularity < 0.01:
        prio += 1
    if BRAND_RE.search(c.name) and c.reason == "new":
        c.signals.append("brand-name-new")
        prio += 3
    if BIN_SUFFIX_RE.search(c.name) and PATCH_HINT_RE.search(c.name):
        c.signals.append("bin-patch-suffix")
        prio += 3
    c.priority = prio


def run_prefilter(text: str) -> list[str]:
    """Return the names of all PREFILTER_RULES whose regex matches *text*."""
    return [name for name, rx in PREFILTER_RULES if rx.search(text)]


# --- Staging ----------------------------------------------------------------

PROMPT_TEXT = """\
You are a supply-chain security analyst reviewing the git diff of one Arch Linux
AUR package for malware. Judge ONLY the code in the diff — what it would do if
built or installed. The package's name, age, vote count, or popularity is NOT
evidence of anything; ignore it. For a brand-new package the diff is the initial
commit (the full PKGBUILD/scripts as + additions); otherwise it is just the
change. Focus on added lines (+).

Flag a diff only for concrete malicious behaviour:
- remote fetch-and-execute (curl|bash, bash <(curl), python -c fetching,
  base64 -d | sh, eval of downloaded data)
- hardcoded IP:port / C2; sources from paste sites, raw gists, IP URLs,
  file-drop hosts, URL shorteners, ngrok
- write + chmod +x + exec in /tmp; systemd unit / cron / timer pointing at
  dropped files; sudo
- obfuscation (long base64/hex, \\x escapes)
- a binary for a well-known app sourced from a non-official domain
  (typosquat / masquerade)
- installing/running unrelated packages (e.g. npm install/ npx) in build()
  or an install hook

Most diffs are ordinary packaging and are clean. Do NOT downgrade a clean diff
to review just because the package is new, unpopular, or sparsely documented —
that is not a security signal. Legitimate -bin packages pulling from the
vendor's official release URL are clean.

Verdict scale (pick the lowest that fits):
- clean      : nothing concerning in the diff. This is the common case.
- review     : a specific line is genuinely ambiguous and a human should look —
               not "could be anything", but "this exact thing might be bad".
- suspicious : a concrete pattern above is present and not clearly legitimate.
- malicious  : clear fetch-and-execute, C2, or obfuscated payload.

Respond with ONLY a JSON object (no prose, no markdown fences). For a clean
diff, leave iocs/reasons/evidence empty:
{
  "verdict": "clean | review | suspicious | malicious",
  "confidence": 0.0,
  "iocs": ["130.162.225.47:8080", "https://..."],
  "reasons": ["one-line findings tied to a specific line"],
  "evidence": [{"snippet": "exact line", "why": "what's wrong"}]
}
"""

# JSON schema for the verdict, used to grammar-constrain the local llama-server
# so it can only emit schema-valid JSON — small quantized models otherwise tend
# to ignore the "respond with JSON" instruction and parrot the diff or loop.
VERDICT_SCHEMA = {
    "type": "object",
    "properties": {
        "verdict": {"type": "string", "enum": VERDICT_ORDER},
        "confidence": {"type": "number"},
        "iocs": {"type": "array", "items": {"type": "string"}},
        "reasons": {"type": "array", "items": {"type": "string"}},
        "evidence": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "snippet": {"type": "string"},
                    "why": {"type": "string"},
                },
                "required": ["snippet", "why"],
            },
        },
    },
    "required": ["verdict", "confidence", "reasons"],
}


def stage(out_dir: Path, candidate: Candidate, diff: str, new_sha: str | None) -> None:
    """Write change.diff and meta.json for *candidate* under out_dir/<pkgbase>/."""
    cdir = out_dir / candidate.pkgbase
    cdir.mkdir(parents=True, exist_ok=True)
    (cdir / "change.diff").write_text(diff, encoding="utf-8")
    meta = {
        "pkgbase": candidate.pkgbase,
        "name": candidate.name,
        "maintainer": candidate.maintainer,
        "num_votes": candidate.num_votes,
        "popularity": candidate.popularity,
        "first_submitted": iso(candidate.first_submitted),
        "last_modified": iso(candidate.last_modified),
        "version": candidate.version,
        "reason": candidate.reason,
        "head_sha": new_sha,
        "prefilter_signals": candidate.signals,
        "priority": candidate.priority,
        "aur_page": PKG_PAGE_URL.format(name=candidate.name),
    }
    (cdir / "meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8")


def iso(ts: int) -> str:
    """Format a Unix timestamp as an ISO-8601 UTC string ("" for 0/None)."""
    if not ts:
        return ""
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()


def _truncate_utf8(text: str, limit: int) -> str:
    """Cap *text* at *limit* UTF-8 bytes, appending TRUNCATION_MARKER if cut.

    The cut is made on the encoded bytes (a partially cut trailing character is
    dropped), so the kept part never exceeds the byte cap even for multi-byte
    content.
    """
    raw = text.encode("utf-8", "replace")
    if len(raw) <= limit:
        return text
    return raw[:limit].decode("utf-8", "ignore") + TRUNCATION_MARKER


# --- Classification (claude -p) --------------------------------------------

_FENCE_RE = re.compile(r"```(?:json)?\s*(\{.*?\})\s*```", re.S)


def _extract_json(text: str) -> dict | None:
    """Pull the first JSON object out of the model's output, fences or not.

    Uses the real JSON decoder from each "{" onward instead of counting braces,
    so braces inside string values (e.g. an evidence snippet "build() {") do
    not derail parsing. Returns None when no object can be decoded.
    """
    text = text.strip()
    fence = _FENCE_RE.search(text)
    if fence:
        text = fence.group(1)
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            obj = None
        if isinstance(obj, dict):
            return obj
        start = text.find("{", start + 1)
    return None


def match_known_bad(diff: str) -> list[str]:
    """Return any known-bad IOCs present in the diff (case-insensitive)."""
    low = diff.lower()
    return [ioc for ioc in KNOWN_BAD_IOCS if ioc.lower() in low]


def trim_diff(diff: str) -> str:
    """Shrink a unified diff for classification, keeping the signal.

    Removes unchanged context lines (those starting with a single space),
    keeping only added/removed lines plus the structural headers that say which
    file and hunk a change belongs to. The full diff stays on disk untouched —
    this only affects what we hand to the model.
    """
    # Drop unchanged context lines (single leading space). Keep everything
    # else: headers, hunk markers (@@), file markers (+++/---), and the
    # actual +/- changes.
    kept = [line for line in diff.splitlines() if not line.startswith(" ")]
    return "\n".join(kept) + "\n"


def build_payload(name: str, diff: str) -> str:
    """The model-facing payload. PROMPT_TEXT comes first and is byte-identical
    across candidates, so a prefix-caching backend (claude -p, or llama-server
    with cache_prompt) reuses it and only reprocesses the per-candidate tail.

    We send only the package name (so the model can spot typosquats), not the
    reputation metadata (votes/popularity/dates) — that biased the model into
    downgrading clean diffs to "review". The staged change.diff stays full; only
    what we hand the model is trimmed.
    """
    return (
        f"{PROMPT_TEXT}\n\n"
        f"--- package name ---\n{name}\n\n"
        f"--- git diff (unchanged context lines removed) ---\n{trim_diff(diff)}\n"
    )


class ClassifyError(Exception):
    """A backend call failed; the candidate gets an "error" verdict."""


def call_claude(payload: str, model: str, timeout: int) -> str:
    """Run claude -p; return raw stdout. Raises ClassifyError on failure."""
    cmd = ["claude", "-p", "--model", model]
    try:
        proc = subprocess.run(
            cmd, input=payload, capture_output=True, text=True, timeout=timeout)
    except FileNotFoundError:
        raise ClassifyError("claude CLI not found on PATH") from None
    except subprocess.TimeoutExpired:
        raise ClassifyError(f"claude timed out after {timeout}s") from None
    if proc.returncode != 0:
        err = (proc.stderr or proc.stdout or "").strip().splitlines()
        raise ClassifyError(f"claude exited {proc.returncode}: {err[-1] if err else '?'}")
    return proc.stdout


def call_llama(payload: str, url: str, timeout: int) -> str:
    """POST to a llama-server /completion endpoint; return generated text.

    cache_prompt: true tells llama-server to keep the KV cache for the shared
    prefix (PROMPT_TEXT) across requests, so repeat candidates only reprocess
    their diff. json_schema grammar-constrains the output so the model can
    only emit schema-valid JSON — small quantized models otherwise tend to
    ignore the instruction, parrot the diff, and loop. Run the server with e.g.:

        taskset -c 0,2,4,6,8,10 llama-server \\
            -m ~/models/gemma4-26b/gemma-4-26B-A4B-it-qat-UD-Q4_K_XL.gguf \\
            -t 6 -c 8192 --swa-full --cache-reuse 256 --host 127.0.0.1 --port 8080

    Raises ClassifyError on any transport failure or non-JSON response body.
    """
    body = {
        "prompt": payload,
        "cache_prompt": True,  # reuse KV cache for the shared PROMPT_TEXT prefix
        "json_schema": VERDICT_SCHEMA,  # force schema-valid JSON output
        "temperature": 0,
        "n_predict": 512,
    }
    try:
        resp = requests.post(url.rstrip("/") + "/completion", json=body, timeout=timeout)
        resp.raise_for_status()
        # Decode inside the try: a non-JSON body must surface as ClassifyError,
        # not as an uncaught exception that aborts the whole cycle.
        data = resp.json()
    except requests.RequestException as exc:
        raise ClassifyError(f"llama-server request failed: {exc}") from exc
    except ValueError as exc:
        raise ClassifyError(f"llama-server returned a non-JSON body: {exc}") from exc
    content = data.get("content", "") if isinstance(data, dict) else ""
    return content if isinstance(content, str) else ""


def classify(cdir: Path, backend: str, model: str, llama_url: str,
             timeout: int = 180) -> dict:
    """Classify one staged candidate folder; return a verdict dict.

    A diff containing a known-bad IOC is marked malicious immediately, with no
    model call. Otherwise the configured backend (claude | llama) judges the
    diff. On any failure we return a verdict of "error" so the caller flags it
    for manual review rather than silently dropping it. Pre-filter signals are
    carried onto the verdict so they stay visible in the summary even when the
    model rates the diff clean.
    """
    meta = json.loads((cdir / "meta.json").read_text(encoding="utf-8"))
    diff = (cdir / "change.diff").read_text(encoding="utf-8")
    signals = meta.get("prefilter_signals") or []

    hits = match_known_bad(diff)
    if hits:
        return {
            "verdict": "malicious",
            "confidence": 1.0,
            "iocs": hits,
            "reasons": [f"known-bad indicator in diff: {', '.join(hits)}"],
            "evidence": [],
            "source": "known-bad-ioc",
            "prefilter_signals": signals,
        }

    payload = build_payload(meta.get("name", cdir.name), diff)
    try:
        if backend == "llama":
            raw = call_llama(payload, llama_url, timeout)
        else:
            raw = call_claude(payload, model, timeout)
    except ClassifyError as exc:
        return {"verdict": "error", "reasons": [str(exc)], "prefilter_signals": signals}

    verdict = _extract_json(raw)
    if verdict is None:
        # Keep the full model output so the failure is debuggable: it lands in
        # the candidate's verdict.json, and a snippet shows in the summary.
        snippet = " ".join(raw.split())[:200]
        return {"verdict": "error",
                "reasons": [f"could not parse JSON from {backend} output: {snippet}"
                            if snippet else f"{backend} returned empty output"],
                "raw_output": raw,
                "prefilter_signals": signals}

    verdict.setdefault("verdict", "error")
    if verdict["verdict"] not in VERDICT_ORDER:
        # The model is free-form on the claude backend, so "reasons" may be
        # missing or not a list; normalise before appending our note.
        reasons = verdict.get("reasons")
        if not isinstance(reasons, list):
            reasons = [] if reasons is None else [str(reasons)]
        reasons.append(f"unexpected verdict value: {verdict['verdict']!r}")
        verdict["reasons"] = reasons
        verdict["verdict"] = "error"
    verdict["prefilter_signals"] = signals
    return verdict


def is_bad(verdict: str) -> bool:
    """True if *verdict* is at or above ALERT_THRESHOLD; unknown verdicts are not."""
    return VERDICT_RANK.get(verdict, -1) >= VERDICT_RANK[ALERT_THRESHOLD]


def summarize_and_alert(results: list[dict]) -> bool:
    """Print a verdict table sorted worst-first; ring the bell on bad ones.

    results is a list of {pkgbase, verdict, reasons, ...}. Returns True if
    anything is at or above the alert threshold (suspicious/malicious).
    """
    if not results:
        print("No new candidates this cycle.", file=sys.stderr)
        return False

    def sort_key(r: dict) -> tuple[int, str]:
        # Verdicts outside VERDICT_ORDER (i.e. "error") rank above malicious,
        # so unclassified candidates lead the table and can't be overlooked.
        rank = VERDICT_RANK.get(r["verdict"], len(VERDICT_ORDER))
        return (-rank, r["pkgbase"])

    results = sorted(results, key=sort_key)
    bad = [r for r in results if is_bad(r["verdict"])]
    errors = [r for r in results if r["verdict"] == "error"]
    # Candidates the offline regex pre-filter flagged but the model did not rate
    # bad — easy to overlook, so call them out separately.
    flagged_clean = [r for r in results
                     if r.get("prefilter_signals") and not is_bad(r["verdict"])
                     and r["verdict"] != "error"]

    print("\n=== verdict summary ===", file=sys.stderr)
    for r in results:
        sig = r.get("prefilter_signals") or []
        if is_bad(r["verdict"]):
            mark = "!!"
        elif r["verdict"] == "error":
            mark = "??"
        elif sig:
            mark = "* "  # clean-ish, but regex flagged it
        else:
            mark = "  "
        reasons = r.get("reasons") or []
        if isinstance(reasons, str):  # tolerate a bare string from the model
            reasons = [reasons]
        note = "; ".join(str(x) for x in reasons)
        if sig:
            note = (note + " " if note else "") + f"[regex: {','.join(sig)}]"
        print(f" {mark} {r['verdict']:10s} {r['pkgbase']:40s} {note[:120]}", file=sys.stderr)

    if bad:
        # Ring the terminal bell once per bad candidate (capped), and shout.
        sys.stderr.write("\a" * min(len(bad), 5))
        sys.stderr.flush()
        names = ", ".join(r["pkgbase"] for r in bad)
        print(f"\n*** ALERT: {len(bad)} candidate(s) >= {ALERT_THRESHOLD}: {names} ***",
              file=sys.stderr)
    if flagged_clean:
        names = ", ".join(r["pkgbase"] for r in flagged_clean)
        print(f"(*) {len(flagged_clean)} candidate(s) regex-flagged but rated "
              f"{'/'.join(sorted({r['verdict'] for r in flagged_clean}))} — worth a glance: "
              f"{names}", file=sys.stderr)
    if errors:
        print(f"({len(errors)} candidate(s) could not be classified — review manually)",
              file=sys.stderr)
    if not bad and not errors and not flagged_clean:
        print("All clear.", file=sys.stderr)
    return bool(bad)


# --- Pipeline ---------------------------------------------------------------

def _next_last_run(entries: list[dict], unprocessed: list[Candidate], now: int,
                   previous: int) -> int:
    """Compute the last_run watermark to persist after a cycle.

    The watermark follows the newest timestamp actually present in the archive
    (never the wall clock): the archive is a snapshot, so a package modified
    after the snapshot but before "now" would otherwise fall below the next
    cutoff and never be selected. It is then pulled back to just before any
    candidate we could not process this run, so that candidate is selected
    again next time (already-staged packages are skipped via state["seen"]).
    """
    newest = max(
        (max(e.get("FirstSubmitted") or 0, e.get("LastModified") or 0) for e in entries),
        default=0,
    )
    mark = max(previous, min(now, newest)) if newest else previous
    for c in unprocessed:
        mark = min(mark, max(c.first_submitted, c.last_modified) - 1)
    return max(int(mark), 0)


def run_once(args) -> list[Path]:
    """Fetch metadata, select + stage candidates. Returns staged folder paths."""
    state = {"last_run": 0, "etag": None, "seen": {}} if args.no_state else load_state()
    last_run = state.get("last_run", 0)
    now = int(time.time())

    since_cutoff = (now - args.since_hours * 3600) if args.since_hours else None
    if last_run == 0 and since_cutoff is None:
        # First run with no window would select the entire AUR. Default to 24h.
        since_cutoff = now - 24 * 3600
        print("First run and no --since-hours: defaulting to last 24h.", file=sys.stderr)

    s = session()
    print("Fetching metadata archive...", file=sys.stderr)
    # --force skips the stored ETag so the server can't 304 us, while still
    # using/updating last_run and seen (unlike --no-state).
    etag = None if args.force else state.get("etag")
    entries, new_etag = fetch_metadata(s, etag)
    if entries is None:
        print("Metadata unchanged since last run (304); use --force to refetch.",
              file=sys.stderr)
        return []
    print(f" {len(entries)} packages in archive.", file=sys.stderr)

    cands = select_candidates(entries, last_run, since_cutoff)
    for c in cands:
        rank_metadata(c)
    cands.sort(key=lambda c: c.priority, reverse=True)
    print(f"Selected {len(cands)} candidate(s) (new/modified/since-window).", file=sys.stderr)

    if args.dry_run:
        for c in cands[:args.max_candidates]:
            print(f" [{c.priority:2d}] {c.reason:9s} {c.pkgbase} "
                  f"votes={c.num_votes} pop={c.popularity:.3f} signals={c.signals}")
        return []

    args.out.mkdir(parents=True, exist_ok=True)
    (args.out / "PROMPT.md").write_text(PROMPT_TEXT, encoding="utf-8")

    seen = state.get("seen", {})
    # Candidates whose diff we still need: skip already-seen exact versions and
    # collapse split packages (several names, one pkgbase = one git repo).
    # The cap is applied AFTER this filter so already-seen packages can't use
    # up the budget and starve genuinely new ones.
    unseen: list[Candidate] = []
    queued: set[str] = set()
    for c in cands:
        if c.pkgbase in queued:
            continue
        prev = seen.get(c.pkgbase) or {}
        if prev.get("version") == c.version and not args.no_state:
            continue
        queued.add(c.pkgbase)
        unseen.append(c)
    todo = unseen[:args.max_candidates]
    deferred = unseen[args.max_candidates:]
    if deferred:
        print(f" {len(deferred)} candidate(s) over --max-candidates deferred to the next run.",
              file=sys.stderr)

    def fetch(c: Candidate) -> tuple[Candidate, str | None, str | None, str | None]:
        """Worker: fetch one diff. Returns (candidate, diff, sha, error)."""
        prev = seen.get(c.pkgbase) or {}
        since_sha = None if args.no_state else prev.get("sha")
        try:
            diff, new_sha = fetch_diff(s, c.pkgbase, since_sha)
            return c, diff, new_sha, None
        except requests.RequestException as exc:
            return c, None, None, str(exc)

    # Fetch diffs concurrently (bounded). The pool size is the politeness knob —
    # a handful of parallel requests, not a flood.
    print(f"Fetching {len(todo)} diff(s) with {args.workers} worker(s)...", file=sys.stderr)
    fetched: list[tuple[Candidate, str | None, str | None, str | None]] = []
    if todo:
        with ThreadPoolExecutor(max_workers=max(1, args.workers)) as pool:
            fetched.extend(pool.map(fetch, todo))

    # Process results sequentially: prefilter, stage, index. Preserve priority
    # order (todo was already sorted; pool.map keeps input order).
    index: list[dict] = []
    staged_dirs: list[Path] = []
    failed: list[Candidate] = []
    flagged = 0
    for c, diff, new_sha, err in fetched:
        if err is not None or diff is None:
            print(f" ! {c.pkgbase}: diff fetch failed: {err}", file=sys.stderr)
            failed.append(c)
            continue
        diff = _truncate_utf8(diff, MAX_DIFF_BYTES)
        c.signals.extend(run_prefilter(diff))
        if c.signals:
            flagged += 1
        stage(args.out, c, diff, new_sha)
        staged_dirs.append(args.out / c.pkgbase)
        seen[c.pkgbase] = {"version": c.version, "sha": new_sha}
        index.append({
            "pkgbase": c.pkgbase,
            "name": c.name,
            "priority": c.priority,
            "reason": c.reason,
            "signals": c.signals,
        })

    index.sort(key=lambda d: (bool(d["signals"]), d["priority"]), reverse=True)
    (args.out / "candidates.json").write_text(json.dumps(index, indent=2), encoding="utf-8")

    unprocessed = failed + deferred
    state["last_run"] = _next_last_run(entries, unprocessed, now, last_run)
    # With work still outstanding, forget the ETag so the next run can't be
    # short-circuited by a 304 before it retries.
    state["etag"] = None if unprocessed else new_etag
    state["seen"] = seen
    if not args.no_state:
        save_state(state)

    print(f"Staged {len(staged_dirs)} candidate(s) into {args.out}/ "
          f"({flagged} with pre-filter hits).", file=sys.stderr)
    return staged_dirs


def classify_staged(staged_dirs: list[Path], backend: str, model: str,
                    llama_url: str) -> list[dict]:
    """Classify each staged folder with the chosen backend; write verdict.json."""
    results: list[dict] = []
    for i, cdir in enumerate(staged_dirs, 1):
        pkgbase = cdir.name
        print(f" [{i}/{len(staged_dirs)}] classifying {pkgbase} ...", file=sys.stderr)
        verdict = classify(cdir, backend, model, llama_url)
        verdict["pkgbase"] = pkgbase
        (cdir / "verdict.json").write_text(json.dumps(verdict, indent=2), encoding="utf-8")
        results.append(verdict)
    return results


# --- Main -------------------------------------------------------------------

def cycle(args) -> bool:
    """One full pass: fetch -> stage -> classify -> summarize. Returns True if bad."""
    staged = run_once(args)
    if args.dry_run:
        return False
    if not staged:
        print("No new candidates to classify.", file=sys.stderr)
        return False
    if args.no_classify:
        print(f"\nStaged only (--no-classify). Point Claude at {args.out}/:\n"
              f" cd {args.out} && claude -p \"$(cat PROMPT.md)\"", file=sys.stderr)
        return False
    results = classify_staged(staged, args.backend, args.model, args.llama_url)
    return summarize_and_alert(results)


def main(argv: list[str] | None = None) -> int:
    """CLI entry point. Returns the process exit code.

    Single-shot mode returns 2 when any candidate is at or above the alert
    threshold and 0 otherwise; --loop mode runs until Ctrl-C and returns 0.
    """
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--out", type=Path, default=Path("staging"),
                    help="staging directory for candidate folders (default: ./staging)")
    ap.add_argument("--since-hours", type=float, default=None,
                    help="also include packages created/modified within the last N hours")
    ap.add_argument("--max-candidates", type=int, default=200,
                    help="cap on candidates to fetch diffs for (default: 200)")
    ap.add_argument("--workers", type=int, default=4,
                    help="concurrent diff downloads (default: 4; keep it modest "
                         "to be polite to the AUR)")
    ap.add_argument("--backend", choices=("claude", "llama"), default="claude",
                    help="classifier backend: claude -p, or a local llama-server (default: claude)")
    ap.add_argument("--model", default="claude-haiku-4-5",
                    help="model for the claude backend (default: claude-haiku-4-5)")
    ap.add_argument("--llama-url", default="http://127.0.0.1:8080",
                    help="llama-server base URL for the llama backend "
                         "(default: http://127.0.0.1:8080)")
    ap.add_argument("--no-classify", action="store_true",
                    help="stage only; don't classify (review manually later)")
    ap.add_argument("--loop", action="store_true",
                    help="run continuously, sleeping --interval-hours between cycles")
    ap.add_argument("--interval-hours", type=float, default=1.0,
                    help="hours to sleep between cycles in --loop mode (default: 1)")
    ap.add_argument("--no-state", action="store_true",
                    help="ignore/don't update state; diff against latest commit only")
    ap.add_argument("--force", action="store_true",
                    help="skip the stored ETag and refetch metadata even if "
                         "unchanged (still uses/updates last_run and seen)")
    ap.add_argument("--dry-run", action="store_true",
                    help="select + rank from metadata only; don't fetch diffs or stage")
    args = ap.parse_args(argv)

    if (not args.no_classify and not args.dry_run and args.backend == "claude"
            and shutil.which("claude") is None):
        print("warning: claude not found on PATH — classification will report "
              "errors. Use --no-classify to stage only, or --backend llama.",
              file=sys.stderr)

    if not args.loop:
        bad = cycle(args)
        # Non-zero exit if anything is suspicious/malicious (for cron/alerting).
        return 2 if bad else 0

    interval = args.interval_hours * 3600
    print(f"Loop mode: cycle every {args.interval_hours}h. Ctrl-C to stop.", file=sys.stderr)
    # The KeyboardInterrupt handler wraps the whole loop so Ctrl-C exits cleanly
    # whether it arrives mid-cycle or during the sleep.
    try:
        while True:
            ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"\n========== cycle @ {ts} ==========", file=sys.stderr)
            try:
                cycle(args)
            except requests.RequestException as exc:
                print(f"cycle failed (network): {exc} — retrying next interval", file=sys.stderr)
            except Exception as exc:  # keep the loop alive across unexpected errors
                print(f"cycle failed: {exc!r} — retrying next interval", file=sys.stderr)
            time.sleep(interval)
    except KeyboardInterrupt:
        print("\nStopped.", file=sys.stderr)
        return 0


if __name__ == "__main__":
    raise SystemExit(main())

── more in #developer-tools 4 stories · sorted by recency
sponsored brought to you by zahid.host 4,200+ EU-deployed projects
reading about agents? ship yours in a single git push.

Run your AI side-project on zahid.host

EU-based hosting, git-push deploys, automatic HTTPS, no cold starts. Free tier with a custom domain β€” perfect for shipping the agent you just read about.

$git push zahid main
→ Live at https://your-agent.zahid.host ✓
Get free account → Pricing
from €0/mo · no card required
LIVE [news/gist-6a5e632583c67da…] indexed:0 read:26min 2026-06-13 · —