#!/usr/bin/env python3
"""
Company Due Diligence Report

Datasets used:
    sec.edgar
    fec.contributions
    sanctions.entities
    fred.series

Cross-dataset value:
    SEC EDGAR gives you audited financials — revenue trend, profitability,
    debt load. FEC contributions expose political donation patterns by
    company insiders/PACs — who they're funding, which committees, which
    cycles. Sanctions screening catches counterparty risk that financial
    statements never show. FRED provides the macro backdrop that makes
    financial trends interpretable — interest rate environment, GDP growth,
    inflation context for the reporting periods.

    Together they answer: "Is this company financially healthy, politically
    exposed, compliance-clean, and operating in a favorable macro
    environment?"

Usage:
    export MICROQUERY_TOKEN=your_token
    python3 company_diligence.py --company "Apple" --ticker AAPL
    python3 company_diligence.py --company "Lockheed Martin" --ticker LMT
"""

import argparse
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

from client import MicroqueryClient, QueryError

# Revenue is reported under different XBRL concepts depending on the filer;
# they are tried in this order and the first one with rows wins.
_REV_CONCEPTS = [
    "RevenueFromContractWithCustomerExcludingAssessedTax",
    "Revenues",
    "SalesRevenueNet",
]

# Balance-sheet / income concepts fetched in a single query, mapped to the
# label printed in the report (dict order is the print order).
_METRIC_LABELS = {
    "NetIncomeLoss": "Net income",
    "LongTermDebt": "Long-term debt",
    "CashAndCashEquivalentsAtCarryingValue": "Cash",
}

# FRED series id -> label printed in the macro section.
_MACRO_SERIES = {
    "DFF": "Fed Funds Rate (%)",
    "CPIAUCSL": "CPI (index)",
    "GDP": "GDP ($B)",
    "UNRATE": "Unemployment (%)",
}


def header(title: str) -> None:
    """Print *title* framed above and below by a 60-character ``=`` rule."""
    rule = "=" * 60
    print(f"\n{rule}")
    print(f" {title}")
    print(rule)


def section(title: str) -> None:
    """Print a ``--- title ---`` sub-heading preceded by a blank line."""
    print(f"\n--- {title} ---")


def _print_financials(company: str, entity_rows: list, cik, rev_rows: list,
                      metrics: list) -> None:
    """Print section 1: matched SEC entity, revenue trend and key metrics."""
    section("1. Financial Health (SEC EDGAR)")
    if not entity_rows:
        print(f" No SEC EDGAR records found for '{company}'")
        return

    best = entity_rows[0]
    print(f" Entity: {best['company']} CIK: {cik}")

    if rev_rows:
        # The same fiscal period can come back more than once; keep the
        # first row per period (rows arrive ordered by period DESC).
        seen = set()
        deduped = []
        for row in rev_rows:
            if row["period"] not in seen:
                seen.add(row["period"])
                deduped.append(row)
        print(" Annual revenue trend (10-K):")
        for row in deduped[:5]:
            rev_b = row["revenue"] / 1e9 if row["revenue"] else 0
            print(f" {row['period'][:4]} ${rev_b:>8.1f}B")

    for concept, label in _METRIC_LABELS.items():
        vals = [row for row in metrics if row["concept"] == concept]
        if vals:
            latest = vals[0]
            v_b = latest["val"] / 1e9 if latest["val"] else 0
            print(f" {label} ({latest['period'][:4]}): ${v_b:.1f}B")


def _print_political(company: str, fec_rows: list) -> None:
    """Print section 2: FEC contribution totals per cycle and committee type."""
    section("2. Political Exposure (FEC Campaign Finance)")
    if not fec_rows:
        print(f" No FEC contribution records found for '{company}'")
        return

    cycles: dict = {}
    for row in fec_rows:
        cycles.setdefault(row["cycle"], []).append(row)

    print(f" Contributions from employees listing employer as '{company}':")
    for cycle in sorted(cycles, reverse=True)[:5]:
        rows = cycles[cycle]
        # Treat a missing SUM() as zero, matching how the SEC section treats
        # missing values, so one empty group cannot abort the whole report.
        total_cycle = sum(row["total"] or 0 for row in rows)
        contribs = sum(row["contributions"] or 0 for row in rows)
        top_types = sorted(rows, key=lambda x: -(x["total"] or 0))[:3]
        type_str = " ".join(
            f"{row['cmte_type']}:${(row['total'] or 0)/1e3:.0f}k"
            for row in top_types
        )
        print(f" {cycle} ${total_cycle/1e3:.0f}k ({contribs} contributions) {type_str}")


def _print_sanctions(sanctions_rows: list, alt_rows: list) -> None:
    """Print section 3: primary-name and alias matches from the sanctions data.

    The "no matches" line is printed only when neither the primary-name query
    nor the alias query returned rows, so the report never shows a clean
    result immediately followed by an alias warning.
    """
    section("3. Sanctions Screening (U.S. CSL)")
    if sanctions_rows:
        print(f" ⚠ {len(sanctions_rows)} match(es) found on sanctions lists:")
        for row in sanctions_rows:
            progs = ", ".join(row.get("programs") or [])
            print(f" {row['name']} [{row['source'][:40]}] {progs}")
    elif not alt_rows:
        print(" ✓ No matches on U.S. Consolidated Screening List")

    if alt_rows:
        print(f" ⚠ {len(alt_rows)} match(es) via alias/alternate name:")
        for row in alt_rows:
            print(f" {row['name']} [{row['source'][:40]}]")


def _print_macro(fred_results: dict) -> None:
    """Print section 4: latest value of each macro series plus its change.

    The change is measured against the observation three rows back (or the
    oldest row available when fewer were returned). It is omitted when there
    is only one observation or the comparison value is not numeric.
    """
    section("4. Macro Context (FRED — recent quarters)")
    for sid, label in _MACRO_SERIES.items():
        vals = fred_results.get(sid, [])
        if not vals:
            continue

        latest = vals[0]
        try:
            lv = float(latest["val"])
        except (TypeError, ValueError):
            # Latest observation is not numeric; nothing meaningful to print.
            continue

        trend = ""
        if len(vals) > 1:  # a single row has no prior value to compare with
            prev = vals[min(3, len(vals) - 1)]
            try:
                delta = lv - float(prev["val"])
            except (TypeError, ValueError):
                delta = None
            if delta is not None:
                trend = f" ({'+' if delta >= 0 else ''}{delta:.2f} vs prior)"

        print(f" {label:<30} {lv:>8.2f} [{latest['obs_date'][:7]}]{trend}")


def run(company: str, ticker: "str | None", mq: MicroqueryClient) -> None:
    """Query all four datasets for *company* and print the diligence report.

    Args:
        company: Company name; used as a case-insensitive regex in every
            name-based query and echoed in the report.
        ticker: Accepted for CLI compatibility; no query in this function
            uses it.
        mq: Client whose ``query(dataset, sql, verbose=True)`` is called for
            each dataset and whose ``cost_summary()`` is printed at the end.

    Exceptions raised by ``mq.query`` propagate to the caller, including
    those re-raised from worker threads via ``Future.result()``.
    """
    # A single quote would terminate the SQL string literal; replacing it
    # with the regex wildcard keeps the pattern matching the same names.
    # NOTE(review): other regex metacharacters in the name are passed through
    # unescaped — confirm the engine's escaping rules before tightening this.
    name_pat = company.replace("'", ".")

    header(f"DUE DILIGENCE: {company.upper()}")

    with ThreadPoolExecutor(max_workers=8) as ex:
        # Queries that do not depend on the CIK are submitted up front so
        # they run in the background while the SEC lookup proceeds.
        f_fec = ex.submit(mq.query, "fec", f"""
            SELECT cycle, cmte_type,
                   SUM(transaction_amt) AS total,
                   COUNT(*) AS contributions
            FROM contributions
            WHERE employer ~ '(?i){name_pat}'
              AND cycle >= 2022
            GROUP BY cycle, cmte_type
            ORDER BY cycle DESC, total DESC
            LIMIT 20
        """, verbose=True)

        f_sanctions = ex.submit(mq.query, "sanctions", f"""
            SELECT name, type, source, programs, start_date, addresses
            FROM entities
            WHERE name ~ '(?i){name_pat}'
            LIMIT 10
        """, verbose=True)

        f_alt = ex.submit(mq.query, "sanctions", f"""
            SELECT name, type, source, programs
            FROM entities, UNNEST(alt_names) AS alt
            WHERE alt ~ '(?i){name_pat}'
            LIMIT 5
        """, verbose=True)

        f_fred = {
            sid: ex.submit(mq.query, "fred", f"""
                SELECT series_id, obs_date, "value" AS val
                FROM series
                WHERE series_id = '{sid}'
                  AND obs_year >= 2020
                  AND "value" != '.'
                ORDER BY obs_date DESC
                LIMIT 16
            """, verbose=True)
            for sid in _MACRO_SERIES
        }

        # The entity lookup runs on the calling thread: the two follow-up
        # SEC queries need the CIK it returns.
        entity_rows = mq.query("sec", f"""
            SELECT company, cik, COUNT(*) AS filings
            FROM edgar
            WHERE company ~ '(?i){name_pat}'
            GROUP BY company, cik
            ORDER BY filings DESC
            LIMIT 5
        """, verbose=True)

        rev_rows = []
        metrics = []
        cik = None

        if entity_rows:
            cik = entity_rows[0]["cik"]

            # One query covers all metric concepts, another all revenue
            # concept variants; both are submitted before either is awaited.
            metric_in = ", ".join(f"'{c}'" for c in _METRIC_LABELS)
            f_metrics = ex.submit(mq.query, "sec", f"""
                SELECT concept, val, "end" AS period
                FROM edgar
                WHERE cik = {cik}
                  AND concept IN ({metric_in})
                  AND form = '10-K'
                  AND fp = 'FY'
                ORDER BY period DESC
                LIMIT 30
            """, verbose=True)

            rev_in = ", ".join(f"'{c}'" for c in _REV_CONCEPTS)
            f_rev = ex.submit(mq.query, "sec", f"""
                SELECT "end" AS period, val AS revenue, concept
                FROM edgar
                WHERE cik = {cik}
                  AND concept IN ({rev_in})
                  AND form = '10-K'
                  AND fp = 'FY'
                ORDER BY period DESC
                LIMIT {6 * len(_REV_CONCEPTS)}
            """, verbose=True)

            metrics = f_metrics.result()
            rev_rows_all = f_rev.result()

            # Use the first revenue concept (in preference order) that the
            # filer actually reported.
            for concept in _REV_CONCEPTS:
                rows = [r for r in rev_rows_all if r["concept"] == concept]
                if rows:
                    rev_rows = rows
                    break

        # Collect the background results before leaving the executor.
        fec_rows = f_fec.result()
        sanctions_rows = f_sanctions.result()
        alt_rows = f_alt.result()
        fred_results = {sid: fut.result() for sid, fut in f_fred.items()}

    # Print the sections in report order, independent of completion order.
    _print_financials(company, entity_rows, cik, rev_rows, metrics)
    _print_political(company, fec_rows)
    _print_sanctions(sanctions_rows, alt_rows)
    _print_macro(fred_results)

    header("SUMMARY")
    print(mq.cost_summary())


def main() -> None:
    """Parse CLI arguments, build the client and run the report.

    Exits with status 1 when no API token is available or when a
    ``QueryError`` is raised while running the report.
    """
    parser = argparse.ArgumentParser(description="Company due diligence report")
    parser.add_argument("--company", required=True,
                        help="Company name (e.g. 'Apple')")
    parser.add_argument("--ticker", default=None,
                        help="Ticker symbol (optional)")
    parser.add_argument("--token", default=os.environ.get("MICROQUERY_TOKEN"),
                        help="Microquery API token (or set MICROQUERY_TOKEN)")
    args = parser.parse_args()

    if not args.token:
        print("Error: set MICROQUERY_TOKEN or pass --token", file=sys.stderr)
        sys.exit(1)

    mq = MicroqueryClient(api_key=args.token)
    try:
        run(args.company, args.ticker, mq)
    except QueryError as e:
        print(f"Query error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()