#!/usr/bin/env python3 """ U.S. Macro Economy Dashboard Datasets used: fred.series fred.catalog Five-section snapshot of the U.S. economy pulled from the Federal Reserve Economic Database (FRED). All ~20 series are fetched in parallel, with year-partition pruning limiting each scan to the requested window. 1. Monetary Policy — Fed Funds Rate, IORB, SOFR, Fed balance sheet 2. Yield Curve — 2Y/5Y/10Y/30Y Treasuries + inversion spreads 3. Inflation — CPI, Core CPI, PCE, Core PCE, PPI, 10Y breakeven 4. Labour Market — Unemployment, nonfarm payrolls, initial claims 5. Housing & Growth — Housing starts, building permits, real GDP --search uses fred.catalog (539K series) to find series by keyword. Usage: export MICROQUERY_TOKEN=your_token python3 macro_economy.py python3 macro_economy.py --since 2020 python3 macro_economy.py --search "consumer price" python3 macro_economy.py --search "housing starts" """ import argparse import os import sys from concurrent.futures import ThreadPoolExecutor from client import MicroqueryClient, QueryError # ── helpers ────────────────────────────────────────────────────────────────── def to_float(v): try: return float(v) except (TypeError, ValueError): return None def fmt_val(v: float) -> str: if abs(v) >= 100_000: return f"{v:,.0f}" if abs(v) >= 1_000: return f"{v:,.1f}" if abs(v) >= 10: return f"{v:.2f}" return f"{v:.3f}" def header(title: str): print(f"\n{'='*60}") print(f" {title}") print(f"{'='*60}") def section(title: str): print(f"\n--- {title} ---") def latest(rows): """Return (float_val, date_str) for the most recent non-missing observation.""" for r in rows: v = to_float(r.get("val")) if v is not None: return v, r.get("obs_date", "")[:7] return None, "" def trend(rows, n: int = 3) -> str: """Short trend string comparing latest to n non-missing observations ago.""" vals = [] for r in rows: v = to_float(r.get("val")) if v is not None: vals.append(v) if len(vals) > n: break if len(vals) < 2: return "" delta = vals[0] - vals[-1] sign = "+" if delta >= 0 else "" return f" ({sign}{fmt_val(delta)} vs {n}p ago)" def fetch(mq, sid, since_year, limit=6): return mq.query("fred", f""" SELECT obs_date, "value" AS val FROM series WHERE series_id = '{sid}' AND obs_year >= {since_year} AND "value" != '.' ORDER BY obs_date DESC LIMIT {limit} """, verbose=True) # ── search mode ─────────────────────────────────────────────────────────────── def run_search(keyword: str, mq: MicroqueryClient): header(f"FRED CATALOG SEARCH: {keyword!r}") rows = mq.query("fred", f""" SELECT series_id, title, units, frequency, observation_end, popularity FROM catalog WHERE title ~ '(?i){keyword}' ORDER BY popularity DESC LIMIT 15 """, verbose=True) if not rows: print(" No matching series found.") else: print(f" {'SERIES_ID':<22} {'FREQ':<5} {'LAST_OBS':<11} TITLE") print(f" {'-'*21} {'-'*4} {'-'*10} {'-'*44}") for r in rows: sid = (r.get("series_id") or "")[:21] freq = (r.get("frequency") or "")[:4] end = (r.get("observation_end") or "")[:10] title = (r.get("title") or "")[:55] print(f" {sid:<22} {freq:<5} {end:<11} {title}") print(f"\n{mq.cost_summary()}") # ── dashboard mode ──────────────────────────────────────────────────────────── POLICY = { "DFF": "Fed Funds Rate (%)", "IORB": "Interest on Reserves (%)", "SOFR": "SOFR (%)", "WALCL": "Fed Balance Sheet (M USD)", } CURVE = { "DGS2": "2-Year Treasury (%)", "DGS5": "5-Year Treasury (%)", "DGS10": "10-Year Treasury (%)", "DGS30": "30-Year Treasury (%)", "T10Y2Y": "10Y minus 2Y spread", "T10Y3M": "10Y minus 3M spread", } INFLATION = { "CPIAUCSL": "CPI All Items (index)", "CPILFESL": "Core CPI ex food/energy (index)", "PCEPI": "PCE Price Index (index)", "PCEPILFE": "Core PCE (index)", "T10YIE": "10Y Breakeven Inflation (%)", "PPIFGS": "PPI Finished Goods (index)", } LABOR = { "UNRATE": "Unemployment Rate (%)", "PAYEMS": "Nonfarm Payrolls (000s)", "ICSA": "Initial Jobless Claims", } HOUSING = { "HOUST": "Housing Starts (000s ann.)", "PERMIT": "Building Permits (000s ann.)", "GDPC1": "Real GDP (B chained 2017 USD)", } ALL_SERIES = {**POLICY, **CURVE, **INFLATION, **LABOR, **HOUSING} def run_dashboard(since_year: int, mq: MicroqueryClient): header("U.S. MACRO ECONOMY DASHBOARD") print(f" Source: FRED (Federal Reserve Economic Data)") print(f" Window: obs_year >= {since_year}") with ThreadPoolExecutor(max_workers=16) as ex: futures = {sid: ex.submit(fetch, mq, sid, since_year) for sid in ALL_SERIES} results = {sid: f.result() for sid, f in futures.items()} def row(sid, label, inversion_flag=False): v, d = latest(results[sid]) if v is None: return t = trend(results[sid]) inv = " *** INVERTED ***" if inversion_flag and v < 0 else "" print(f" {label:<38} {fmt_val(v):>10} [{d}]{t}{inv}") section("1. Monetary Policy") for sid, label in POLICY.items(): row(sid, label) section("2. Yield Curve") for sid, label in CURVE.items(): inversion = sid in ("T10Y2Y", "T10Y3M") row(sid, label, inversion_flag=inversion) section("3. Inflation") for sid, label in INFLATION.items(): row(sid, label) section("4. Labour Market") for sid, label in LABOR.items(): row(sid, label) section("5. Housing & Growth") for sid, label in HOUSING.items(): row(sid, label) section("6. Recession Signals") for sid in ("T10Y2Y", "T10Y3M"): label = CURVE[sid] v, d = latest(results[sid]) if v is not None: status = "INVERTED (recession signal)" if v < 0 else "positive" print(f" {label}: {v:+.2f} [{d}] → {status}") unrate = [to_float(r.get("val")) for r in results.get("UNRATE", []) if to_float(r.get("val")) is not None] if len(unrate) >= 2: low = min(unrate) rise = unrate[0] - low if rise >= 0.5: print(f" Unemployment +{rise:.1f}pp above recent low → Sahm-Rule watch") header("SUMMARY") print(mq.cost_summary()) # ── main ────────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="U.S. macro economy dashboard (FRED)") parser.add_argument("--since", type=int, default=2022, help="Show data from this year onward (default: 2022)") parser.add_argument("--search", metavar="KEYWORD", help="Search FRED catalog for series by keyword") parser.add_argument("--token", default=os.environ.get("MICROQUERY_TOKEN"), help="Microquery API token (or set MICROQUERY_TOKEN)") args = parser.parse_args() if not args.token: print("Error: set MICROQUERY_TOKEN or pass --token", file=sys.stderr) sys.exit(1) mq = MicroqueryClient(api_key=args.token) try: if args.search: run_search(args.search, mq) else: run_dashboard(args.since, mq) except QueryError as e: print(f"Query error: {e}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()