#!/usr/bin/env python3
"""
Software Dependency Security Audit

Datasets used:
  nvd.cve
  osv.advisories
  eol.cycles

Cross-dataset value:
  NVD/CVE gives you the official CVSS-scored vulnerability record — severity,
  attack vector, scope, and affected version ranges. OSV maps the same
  vulnerabilities directly to package ecosystem version strings (npm, PyPI,
  Go, Maven, Rust) — answering "is THIS exact version affected?" EOL.date
  tells you whether the runtime itself is end-of-life, which means no future
  security patches regardless of current CVE exposure.

  Together they answer: "For this dependency list, what is vulnerable right
  now, how severe is it, is there a fixed version, and is the underlying
  runtime still receiving security patches?"

Usage:
  export MICROQUERY_TOKEN=your_token

  # Check a specific package
  python3 security_audit.py --package requests --ecosystem PyPI --version 2.28.0

  # Check a runtime version
  python3 security_audit.py --runtime python --cycle 3.8

  # Check multiple packages from a requirements-style list
  python3 security_audit.py --file requirements.txt
"""

import argparse
import os
import re
import sys
from datetime import date

from client import MicroqueryClient, QueryError

SEVERITY_EMOJI = {"CRITICAL": "🔴", "HIGH": "🟠", "MEDIUM": "🟡", "LOW": "🟢"}
SEVERITY_ORDER = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1, "": 0}

# Upper bound on the number of CVE IDs placed in a single NVD IN (...) list.
MAX_CVE_LOOKUPS = 20

# "name==version" pins, optionally with extras ("name[extra]==1.0") and with
# whitespace around the operator; environment markers after ";" are ignored.
_PINNED_REQUIREMENT = re.compile(
    r"^([A-Za-z0-9_\-\.]+)\s*(?:\[[^\]]*\])?\s*==\s*([^\s;]+)"
)

# Attack Vector metric inside a CVSS vector string (e.g. ".../AV:N/AC:L/...").
_ATTACK_VECTOR = re.compile(r"AV:([NALP])")
_ATTACK_VECTOR_LABELS = {
    "N": "NETWORK",
    "A": "ADJACENT",
    "L": "LOCAL",
    "P": "PHYSICAL",
}


def header(title: str) -> None:
    """Print *title* framed by two 60-character '=' rules."""
    print(f"\n{'='*60}")
    print(f" {title}")
    print(f"{'='*60}")


def section(title: str) -> None:
    """Print *title* as a '--- title ---' sub-heading."""
    print(f"\n--- {title} ---")


def _sql_literal(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled, which is the standard SQL escape.
    NOTE(review): assumes the query backend follows standard SQL quoting;
    switch to bound parameters if the client offers them.
    """
    return "'" + str(value).replace("'", "''") + "'"


def parse_requirements(path: str) -> list[tuple[str, str]]:
    """Parse a pip requirements file into (package, version) pairs.

    Only exact pins (``name==version``) are returned. Blank lines, comment
    lines and every other kind of requirement line are skipped. Extras
    (``name[extra]==1.0``) are accepted and dropped from the package name.

    Raises:
        OSError: if *path* cannot be opened.
    """
    results = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            m = _PINNED_REQUIREMENT.match(line)
            if m:
                results.append((m.group(1), m.group(2)))
    return results


def _collect_cve_ids(osv_rows) -> list[str]:
    """Return the distinct ``CVE-*`` aliases of *osv_rows*, in first-seen order."""
    seen = {}
    for row in osv_rows:
        for alias in (row.get("aliases") or []):
            if alias.startswith("CVE-"):
                seen.setdefault(alias, None)
    return list(seen)


def audit_package(pkg: str, ecosystem: str, version: "str | None", mq: MicroqueryClient) -> None:
    """Print OSV advisories for a package and the NVD CVSS detail of their CVEs.

    Args:
        pkg: Package name, matched exactly against ``package_name``.
        ecosystem: Ecosystem name, matched exactly against ``ecosystem``.
        version: Only shown in the section heading; the advisory query is
            NOT filtered by version, so every advisory for the package is
            listed.
        mq: Client used to run the ``osv`` and ``nvd`` queries.

    Returns early, without querying NVD, when OSV has no advisories.
    """
    # ------------------------------------------------------------------ #
    # OSV — ecosystem-aware vulnerability lookup                          #
    # ------------------------------------------------------------------ #
    section(f"OSV Advisories — {ecosystem}/{pkg} {version or '(any version)'}")

    # advisories is denormalized: one row per affected package/ecosystem
    # severity_score is a CVSS vector string for CVSS_V3 advisories
    osv_rows = mq.query("osv", f"""
        SELECT id, summary, severity_score, aliases, fixed, published
        FROM advisories
        WHERE package_name = {_sql_literal(pkg)}
          AND ecosystem = {_sql_literal(ecosystem)}
        ORDER BY published DESC
        LIMIT 15
    """, verbose=True)

    if not osv_rows:
        print(f" ✓ No OSV advisories found for '{pkg}' in {ecosystem}")
        return

    print(f" {len(osv_rows)} advisories for '{pkg}' in {ecosystem}:")
    for r in osv_rows:
        # Show at most two aliases per advisory to keep the listing compact.
        aliases = ", ".join((r.get("aliases") or [])[:2])
        fixed = r.get("fixed") or "—"
        summary = (r.get("summary") or "")[:50]
        print(f" {r['id']:<30} fixed:{str(fixed):<12} {summary}")
        if aliases:
            print(f" aliases: {aliases}")

    # ------------------------------------------------------------------ #
    # NVD — CVSS scores for the CVE aliases                               #
    # ------------------------------------------------------------------ #
    section("NVD/CVE Detail — CVSS scores")

    # De-duplicated so the same CVE shared by several advisories is only
    # looked up once and does not eat into the lookup cap.
    cve_ids = _collect_cve_ids(osv_rows)
    if not cve_ids:
        print(" No CVE aliases found in OSV advisories")
        return

    cve_list = ", ".join(_sql_literal(c) for c in cve_ids[:MAX_CVE_LOOKUPS])

    # NVD schema: nested metrics; use array paths for CVSS data
    nvd_rows = mq.query("nvd", f"""
        SELECT id,
               metrics.cvssMetricV31[0].cvssData.baseScore AS cvss_score,
               metrics.cvssMetricV31[0].cvssData.baseSeverity AS severity,
               metrics.cvssMetricV31[0].cvssData.vectorString AS vector,
               descriptions[0].value AS description
        FROM cve
        WHERE id IN ({cve_list})
        ORDER BY cvss_score DESC
    """, verbose=True)

    if not nvd_rows:
        print(" No NVD records found for these CVEs")
        return

    for r in nvd_rows:
        score = r.get("cvss_score") or 0
        sev = r.get("severity") or "?"
        icon = SEVERITY_EMOJI.get(sev.upper(), "⚪")
        desc = (r.get("description") or "")[:70]
        vector = r.get("vector") or ""
        print(f" {icon} {r['id']:<20} CVSS {score:<4} {sev:<8} {desc}")
        av_match = _ATTACK_VECTOR.search(vector)
        if av_match:
            av_label = _ATTACK_VECTOR_LABELS.get(av_match.group(1), "")
            print(f" Attack vector: {av_label}")


def audit_runtime(runtime: str, cycle: str, mq: MicroqueryClient) -> None:
    """Print the end-of-life status of one runtime release cycle.

    Args:
        runtime: Product slug, matched exactly against ``product``.
        cycle: Release cycle, matched exactly against ``cycle``.
        mq: Client used to run the ``eol`` queries.

    When the cycle is not found, up to ten cycles recorded for the product
    are listed instead.
    """
    # ------------------------------------------------------------------ #
    # EOL — is this runtime version still receiving security patches?     #
    # ------------------------------------------------------------------ #
    section(f"End-of-Life Status — {runtime} {cycle}")

    eol_rows = mq.query("eol", f"""
        SELECT product, cycle, eol, lts, support, "latest", latest_date
        FROM cycles
        WHERE product = {_sql_literal(runtime)}
          AND cycle = {_sql_literal(cycle)}
        LIMIT 1
    """, verbose=True)

    if eol_rows:
        r = eol_rows[0]
        eol_val = r.get("eol")
        lts_val = r.get("lts")

        # `eol` is handled as either a boolean flag (possibly as the strings
        # "true"/"false") or a date-like value.
        if eol_val in (True, "true"):
            print(f" ⛔ {runtime} {cycle} is END-OF-LIFE (no date recorded)")
        elif eol_val and eol_val not in (False, "false"):
            try:
                # str() first: the value may arrive as a non-string (e.g. a
                # date object), which cannot be sliced directly.
                eol_date = date.fromisoformat(str(eol_val)[:10])
            except ValueError:
                print(f" EOL: {eol_val}")
            else:
                today = date.today()
                if eol_date < today:
                    days_past = (today - eol_date).days
                    print(f" ⛔ {runtime} {cycle} reached EOL {eol_val} ({days_past} days ago)")
                else:
                    days_left = (eol_date - today).days
                    print(f" ⚠ {runtime} {cycle} EOL in {days_left} days ({eol_val})")
        else:
            print(f" ✓ {runtime} {cycle} is still supported (EOL: {eol_val})")

        if lts_val and lts_val not in (False, "false"):
            print(f" LTS: {lts_val}")
        print(f" Latest patch: {r.get('latest')} ({r.get('latest_date')})")
        # NOTE: no comparison against the caller's installed patch level is
        # made here; only the latest known patch is reported.
    else:
        # List available cycles
        all_cycles = mq.query("eol", f"""
            SELECT cycle, eol, lts
            FROM cycles
            WHERE product = {_sql_literal(runtime)}
            LIMIT 10
        """, verbose=True)
        if all_cycles:
            print(f" '{cycle}' not found. Available cycles for {runtime}:")
            for r in all_cycles:
                print(f" {r['cycle']:<10} eol={r['eol']}")
        else:
            print(f" Product '{runtime}' not found in EOL database")


def main() -> None:
    """Parse the command line and run the requested audits.

    Exits with status 1 when no token is available, when the requirements
    file cannot be read, or when a query fails; argparse usage errors exit
    through ``parser.error``.
    """
    parser = argparse.ArgumentParser(description="Software dependency security audit")
    parser.add_argument("--package", help="Package name (e.g. requests)")
    parser.add_argument("--ecosystem", default="PyPI",
                        help="Ecosystem (PyPI, npm, Go, Maven, crates.io)")
    parser.add_argument("--version", help="Package version (e.g. 2.28.0)")
    parser.add_argument("--runtime", help="Runtime product slug (e.g. python, nodejs)")
    parser.add_argument("--cycle", help="Runtime cycle (e.g. 3.8, 20.04)")
    parser.add_argument("--file", help="requirements.txt to audit")
    parser.add_argument("--token", default=os.environ.get("MICROQUERY_TOKEN"),
                        help="Microquery API token (or set MICROQUERY_TOKEN)")
    args = parser.parse_args()

    if not args.token:
        print("Error: set MICROQUERY_TOKEN or pass --token", file=sys.stderr)
        sys.exit(1)

    if not any([args.package, args.runtime, args.file]):
        parser.error("Provide at least one of --package, --runtime, or --file")

    # A runtime audit needs both values; giving only one used to be
    # silently ignored, producing an "audit" that checked nothing.
    if bool(args.runtime) != bool(args.cycle):
        parser.error("--runtime and --cycle must be given together")

    mq = MicroqueryClient(api_key=args.token)

    try:
        header("SOFTWARE SECURITY AUDIT")

        if args.file:
            try:
                packages = parse_requirements(args.file)
            except OSError as e:
                print(f"Error: cannot read {args.file}: {e}", file=sys.stderr)
                sys.exit(1)
            if not packages:
                print(f"No pinned packages found in {args.file}")
            for pkg, ver in packages:
                audit_package(pkg, args.ecosystem, ver, mq)

        if args.package:
            audit_package(args.package, args.ecosystem, args.version, mq)

        if args.runtime and args.cycle:
            audit_runtime(args.runtime, args.cycle, mq)

        header("SUMMARY")
        print(mq.cost_summary())

    except QueryError as e:
        print(f"Query error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()