#!/usr/bin/env python3
"""Blockchain Compliance & AML Screening.

Datasets used:
    eth.transfers  eth.dex_swaps  eth.lending  eth.lp_events  eth.mev

Cross-dataset value:
    eth.transfers reveals counterparty concentration — high unique-sender or
    unique-recipient counts are a layering signal; burst activity across short
    windows can indicate structuring.
    eth.dex_swaps surfaces DeFi layering: cross-protocol swaps through many
    pools fragment the transaction trail; high active-blocks-per-swap ratios
    indicate automated behaviour.
    eth.lending flags leverage and liquidation risk — liquidation_call events
    are a strong financial-stress signal; repeated borrow/repay cycles may
    indicate wash activity to manufacture on-chain history.
    eth.lp_events detects wash-trading patterns: frequent mint/burn cycles in
    the same pool inflate apparent volume without economic purpose.
    eth.mev cross-references block-level MEV relay data — addresses appearing
    consistently in MEV-extracted blocks are either MEV bots themselves or
    persistent targets, both of which are elevated-risk profiles.

Together they answer: "Does this wallet show AML red flags — layering,
structuring, wash trading, high-risk counterparties, or automated behaviour?"

Performance note:
    Queries use block_timestamp (transfers/dex_swaps/lending/lp_events) and
    timestamp (mev) as Sneller automatically time-indexes these fields. A
    30-day window prunes transfers ~72%, dex_swaps ~95%, lending/lp_events
    ~92%.

Usage:
    export MICROQUERY_TOKEN=your_token
    python3 blockchain_aml.py --address 0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045
    python3 blockchain_aml.py --address 0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045 --days 30
"""
import argparse
import datetime
import os
import re
import sys
from concurrent.futures import ThreadPoolExecutor

sys.path.insert(0, os.path.dirname(__file__))
from client import MicroqueryClient, QueryError

# 0x-prefixed 20-byte hex address (checksummed or lowercase). Addresses are
# interpolated directly into SQL text below, so this gate doubles as the
# SQL-injection guard — nothing that fails this pattern may reach a query.
_ADDRESS_RE = re.compile(r"0x[0-9a-fA-F]{40}")


def fmt(n: int) -> str:
    """Return *n* with thousands separators (e.g. 1234 -> '1,234')."""
    return f"{n:,}"


def header(title: str) -> None:
    """Print *title* framed by 60-char '=' rules."""
    print(f"\n{'=' * 60}")
    print(f" {title}")
    print(f"{'=' * 60}")


def section(title: str) -> None:
    """Print a '--- title ---' section divider."""
    print(f"\n--- {title} ---")


def sparkline(values: list[int]) -> str:
    """Render *values* as a unicode sparkline, scaled to the max value.

    Empty or all-zero input produces a flat row of the lowest bar.
    """
    if not values or max(values) == 0:
        return "▁" * len(values)
    blocks = "▁▂▃▄▅▆▇█"
    mx = max(values)
    return "".join(blocks[int(v / mx * 7)] for v in values)


def run(address: str, days: int, mq: MicroqueryClient) -> None:
    """Screen *address* for AML red flags over the last *days* days.

    Fires five independent Sneller queries concurrently (transfers, DEX
    swaps, lending, LP events, plus a block-number sample), then a dependent
    MEV cross-reference, and prints a numbered report ending in a risk
    summary.

    Raises:
        ValueError: if *address* is not a 0x-prefixed 40-hex-char address
            (refusing early prevents SQL injection via interpolation).
        QueryError: propagated from the Microquery client.
    """
    if not _ADDRESS_RE.fullmatch(address):
        raise ValueError(f"invalid Ethereum address: {address!r}")

    addr = address.lower()
    cutoff = (datetime.date.today() - datetime.timedelta(days=days)).isoformat()
    # block_timestamp is a Sneller-indexed timestamp field; backtick conversion
    # happens automatically in the proxy via rewriteTimestampLiterals.
    date_filter = f"AND block_timestamp >= '{cutoff}'"
    mev_date_filter = f"AND timestamp >= '{cutoff}'"

    header(f"AML SCREENING: {address[:10]}...{address[-6:]}")
    print(f" Address : {address}")
    print(f" Window : last {days} days (since {cutoff})")

    # 7 workers = 5 phase-1 queries + headroom for the phase-2 MEV lookup.
    with ThreadPoolExecutor(max_workers=7) as ex:
        # ------------------------------------------------------------------ #
        # Phase 1: fire all independent queries together.                    #
        # block_rows is needed for phase 2 (MEV cross-reference), so collect  #
        # it first; the others can be collected after block_rows is known.    #
        # ------------------------------------------------------------------ #

        # Single pass over transfers for both inbound and outbound — halves
        # the bytes scanned vs two separate queries.
        f_transfers = ex.submit(mq.query, "eth", f"""
            SELECT
                CASE WHEN to_address = '{addr}' THEN 'in' ELSE 'out' END AS dir,
                token_address,
                COUNT(*) AS tx_count,
                COUNT(DISTINCT CASE WHEN to_address = '{addr}'
                                    THEN from_address ELSE to_address END) AS counterparties
            FROM transfers
            WHERE (from_address = '{addr}' OR to_address = '{addr}') {date_filter}
            GROUP BY dir, token_address
            ORDER BY dir, tx_count DESC
            LIMIT 16
        """, verbose=True)

        f_swaps = ex.submit(mq.query, "eth", f"""
            SELECT protocol, pool_address,
                   COUNT(*) AS swap_count,
                   COUNT(DISTINCT block_number) AS active_blocks
            FROM dex_swaps
            WHERE (sender = '{addr}' OR recipient = '{addr}') {date_filter}
            GROUP BY protocol, pool_address
            ORDER BY swap_count DESC
            LIMIT 8
        """, verbose=True)

        f_lend = ex.submit(mq.query, "eth", f"""
            SELECT protocol, event_type, asset, COUNT(*) AS event_count
            FROM lending
            WHERE (user = '{addr}' OR on_behalf_of = '{addr}') {date_filter}
            GROUP BY protocol, event_type, asset
            ORDER BY event_count DESC
            LIMIT 10
        """, verbose=True)

        f_lp = ex.submit(mq.query, "eth", f"""
            SELECT pool_address, event_type, COUNT(*) AS event_count
            FROM lp_events
            WHERE (owner = '{addr}' OR sender = '{addr}') {date_filter}
            GROUP BY pool_address, event_type
            ORDER BY event_count DESC
            LIMIT 8
        """, verbose=True)

        # block_rows feeds phase 2 (MEV lookup) — collect it first.
        f_blocks = ex.submit(mq.query, "eth", f"""
            SELECT DISTINCT block_number
            FROM transfers
            WHERE (from_address = '{addr}' OR to_address = '{addr}') {date_filter}
            LIMIT 200
        """, verbose=True)
        block_rows = f_blocks.result()

        # ------------------------------------------------------------------ #
        # Phase 2: MEV lookup depends on block_rows result.                  #
        # ------------------------------------------------------------------ #
        mev_rows = []
        if block_rows:
            # block_number values come from our own query results (integers),
            # so interpolating them into the IN-list is safe.
            block_list = ", ".join(str(r["block_number"]) for r in block_rows)
            mev_rows = ex.submit(mq.query, "eth", f"""
                SELECT relay, COUNT(*) AS mev_blocks,
                       SUM(value_eth) AS total_eth_extracted
                FROM mev
                WHERE block_number IN ({block_list}) {mev_date_filter}
                GROUP BY relay
                ORDER BY mev_blocks DESC
            """, verbose=True).result()

        # Collect remaining phase-1 results (likely already done by now).
        transfer_rows = f_transfers.result()
        in_rows = [{"token_address": r["token_address"],
                    "tx_count": r["tx_count"],
                    "senders": r["counterparties"]}
                   for r in transfer_rows if r["dir"] == "in"]
        out_rows = [{"token_address": r["token_address"],
                     "tx_count": r["tx_count"],
                     "recipients": r["counterparties"]}
                    for r in transfer_rows if r["dir"] == "out"]
        swap_rows = f_swaps.result()
        lend_rows = f_lend.result()
        lp_rows = f_lp.result()

    # ---------------------------------------------------------------------- #
    # 1. Token transfers — inflows and outflows.                             #
    # NOTE: sender/recipient totals sum per-token distinct counts, so an     #
    # address active in several tokens can be counted once per token.        #
    # ---------------------------------------------------------------------- #
    section("1. Counterparty & Transfer Activity")
    total_senders = sum(r["senders"] for r in in_rows)
    total_recipients = sum(r["recipients"] for r in out_rows)
    if in_rows:
        flag = " ⚠ HIGH counterparty count (layering signal)" if total_senders > 100 else ""
        print(f" Received from {total_senders:,} unique senders:{flag}")
        for r in in_rows[:5]:
            tok = r["token_address"]
            print(f" {tok[:10]}...{tok[-4:]} {fmt(r['tx_count'])} inbound txs "
                  f"({r['senders']} senders)")
    else:
        print(" No inbound token transfers found")
    if out_rows:
        flag = " ⚠ HIGH counterparty count (fan-out / structuring signal)" if total_recipients > 100 else ""
        print(f" Sent to {total_recipients:,} unique recipients:{flag}")
        for r in out_rows[:5]:
            tok = r["token_address"]
            print(f" {tok[:10]}...{tok[-4:]} {fmt(r['tx_count'])} outbound txs "
                  f"({r['recipients']} recipients)")
    else:
        print(" No outbound token transfers found")

    # ---------------------------------------------------------------------- #
    # 2. DEX swaps — trading activity.                                       #
    # ---------------------------------------------------------------------- #
    section("2. DEX Activity — Layering Signals")
    if swap_rows:
        total_swaps = sum(r["swap_count"] for r in swap_rows)
        protocols = {r["protocol"] for r in swap_rows if r.get("protocol")}
        cross_protocol = len(protocols) > 1
        flag = " ⚠ cross-protocol layering" if cross_protocol else ""
        print(f" {fmt(total_swaps)} swaps across {len(swap_rows)} pools "
              f"({', '.join(protocols) or 'unknown'}){flag}")
        for r in swap_rows[:5]:
            pool = r["pool_address"]
            # Many swaps per active block suggests a bot, not a human trader.
            ratio = r["swap_count"] / max(r["active_blocks"], 1)
            auto_flag = " ⚠ automated" if ratio > 3 else ""
            print(f" {r.get('protocol','?'):12s} pool {pool[:10]}...{pool[-4:]} "
                  f"{fmt(r['swap_count'])} swaps ({r['active_blocks']} blocks){auto_flag}")
    else:
        print(" No DEX swap activity found")

    # ---------------------------------------------------------------------- #
    # 3. Lending — AAVE / Compound.                                          #
    # ---------------------------------------------------------------------- #
    section("3. Lending — Leverage & Liquidation Risk")
    if lend_rows:
        liq = [r for r in lend_rows if r.get("event_type") == "liquidation_call"]
        liq_flag = f" ⚠ {len(liq)} LIQUIDATION event(s) — financial stress signal" if liq else ""
        print(f" {len(lend_rows)} lending event type(s) across protocols{liq_flag}")
        for r in lend_rows[:6]:
            asset = (r.get("asset") or "")
            print(f" {r.get('protocol','?'):8s} {r.get('event_type','?'):20s} "
                  f"asset {asset[:10]} {fmt(r['event_count'])} events")
    else:
        print(" No lending activity found")

    # ---------------------------------------------------------------------- #
    # 4. LP events — Uniswap V3 liquidity positions.                         #
    # ---------------------------------------------------------------------- #
    section("4. LP Activity — Wash-Trading Indicators")
    if lp_rows:
        pools = len({r["pool_address"] for r in lp_rows})
        mints = sum(r["event_count"] for r in lp_rows if r.get("event_type") == "mint")
        burns = sum(r["event_count"] for r in lp_rows if r.get("event_type") == "burn")
        # Near-balanced mint/burn volume (ratio > 0.7) means positions are
        # opened and closed in lockstep — a wash-trading fingerprint.
        churn_flag = (" ⚠ high mint/burn churn — possible wash-trading"
                      if mints > 0 and burns > 0
                      and min(mints, burns) / max(mints, burns) > 0.7 else "")
        print(f" LP activity in {pools} pool(s) ({mints} mints / {burns} burns){churn_flag}")
        for r in lp_rows[:5]:
            pool = r["pool_address"]
            print(f" {pool[:10]}...{pool[-4:]} {r.get('event_type','?'):12s} "
                  f"{fmt(r['event_count'])} events")
    else:
        print(" No LP activity found")

    # ---------------------------------------------------------------------- #
    # 5. MEV exposure — was this address in MEV-extracted blocks?            #
    # ---------------------------------------------------------------------- #
    section("5. MEV Exposure — Automated / Sophisticated Actor")
    if block_rows:
        if mev_rows:
            total_mev = sum(r["mev_blocks"] for r in mev_rows)
            pct = total_mev / len(block_rows) * 100
            mev_flag = " ⚠ likely MEV bot or persistent MEV target" if pct > 50 else ""
            print(f" {total_mev}/{len(block_rows)} sampled blocks were MEV blocks ({pct:.1f}%){mev_flag}")
            for r in mev_rows[:4]:
                eth_val = r.get("total_eth_extracted") or 0
                print(f" relay={r.get('relay','?'):20s} {fmt(r['mev_blocks'])} blocks "
                      f"{float(eth_val):.4f} ETH extracted")
        else:
            print(f" None of {len(block_rows)} sampled blocks were MEV blocks — no bot signal")
    else:
        print(" No transfer blocks found to cross-reference")

    # ---------------------------------------------------------------------- #
    # Summary — consolidate every red flag raised above.                     #
    # ---------------------------------------------------------------------- #
    section("AML Risk Summary")
    has_liq = any(r.get("event_type") == "liquidation_call" for r in lend_rows)
    protocols = {r["protocol"] for r in swap_rows if r.get("protocol")}
    mev_total = sum(r["mev_blocks"] for r in mev_rows) if mev_rows else 0
    mev_pct = mev_total / len(block_rows) * 100 if block_rows else 0
    flags = []
    if total_senders > 100:
        flags.append("HIGH inbound counterparties (layering)")
    if total_recipients > 100:
        flags.append("HIGH outbound counterparties (structuring/fan-out)")
    if len(protocols) > 1:
        flags.append("cross-protocol DEX activity (layering)")
    if has_liq:
        flags.append("liquidation events (financial stress)")
    if mev_pct > 50:
        flags.append("MEV bot or persistent MEV target (automated actor)")
    if flags:
        print(f" ⚠ Risk flags ({len(flags)}):")
        for f in flags:
            print(f" • {f}")
    else:
        print(" ✓ No significant AML risk flags in this window")

    print(f"\n {mq.cost_summary()}")


def main() -> None:
    """Parse CLI arguments, validate them, and run the screening."""
    parser = argparse.ArgumentParser(description="Blockchain compliance & AML screening")
    parser.add_argument(
        "--address",
        default="0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",
        help="Ethereum address to analyse (default: Vitalik's public address)",
    )
    parser.add_argument("--days", type=int, default=90,
                        help="Lookback window in days (default: 90)")
    parser.add_argument("--token", default=os.environ.get("MICROQUERY_TOKEN"),
                        help="Microquery API token (or set MICROQUERY_TOKEN)")
    args = parser.parse_args()

    if not args.token:
        print("Error: set MICROQUERY_TOKEN or pass --token", file=sys.stderr)
        sys.exit(1)
    # Reject malformed addresses up front: they would otherwise be spliced
    # verbatim into SQL strings (injection risk) and waste a billable query.
    if not _ADDRESS_RE.fullmatch(args.address):
        print(f"Error: invalid Ethereum address: {args.address}", file=sys.stderr)
        sys.exit(1)

    mq = MicroqueryClient(api_key=args.token)
    try:
        run(args.address, args.days, mq)
    except QueryError as e:
        print(f"Query error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()