diff --git a/backend/script/detect.py b/backend/script/detect.py index a09a5a6..59feb99 100644 --- a/backend/script/detect.py +++ b/backend/script/detect.py @@ -29,35 +29,23 @@ from math import log2 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from bitcoin_rpc import cli, get_tx -# ═══════════════════════════════════════════════════════════════════════════════ -# ANSI formatting -# ═══════════════════════════════════════════════════════════════════════════════ -G = "\033[92m"; R_ = "\033[91m"; Y = "\033[93m"; C = "\033[96m" -B = "\033[1m"; DIM = "\033[2m"; RST = "\033[0m" - -FINDING_COUNT = 0 -WARN_COUNT = 0 +FINDINGS = [] +WARNINGS = [] def section(title): - print(f"\n{B}{'━'*78}{RST}") - print(f"{B}{C} {title}{RST}") - print(f"{B}{'━'*78}{RST}") + print(f"[{title}]", file=sys.stderr) def finding(msg): - global FINDING_COUNT - FINDING_COUNT += 1 - print(f" {R_}⚠ FINDING:{RST} {msg}") + FINDINGS.append(msg) def warn(msg): - global WARN_COUNT - WARN_COUNT += 1 - print(f" {Y}⚡ WARNING:{RST} {msg}") + WARNINGS.append(msg) def ok(msg): - print(f" {G}✓{RST} {msg}") + print(f"ok: {msg}", file=sys.stderr) def info(msg): - print(f" {DIM}│{RST} {msg}") + print(f" {msg}", file=sys.stderr) # ═══════════════════════════════════════════════════════════════════════════════ @@ -293,12 +281,21 @@ def detect_01_address_reuse(g: TxGraph): for addr, txids in reused.items(): meta = g.addr_map.get(addr, {}) role = "change" if meta.get("internal") else "receive" - finding(f"Address {addr} ({role}) used in {len(txids)} different transactions") + tx_list = [] for txid in sorted(txids): tx = g.fetch_tx(txid) - confs = tx.get("confirmations", "?") if tx else "?" - info(f"TX {txid[:16]}… ({confs} confirmations)") - info(f"An observer links all {len(txids)} transactions to the same entity.") + tx_list.append({"txid": txid, "confirmations": tx.get("confirmations", 0) if tx else 0}) + finding({ + "type": "ADDRESS_REUSE", + "severity": "HIGH", + "description": f"Address {addr} ({role}) reused across {len(txids)} transactions", + "details": { + "address": addr, + "role": role, + "tx_count": len(txids), + "txids": tx_list, + }, + }) def detect_02_cioh(g: TxGraph): @@ -330,32 +327,26 @@ def detect_02_cioh(g: TxGraph): ownership_pct = n_ours / total_inputs * 100 severity = "CRITICAL" if n_ours == total_inputs else "HIGH" - finding( - f"TX {txid[:16]}… has {total_inputs} inputs, {n_ours} are YOURS " - f"({ownership_pct:.0f}% ownership) [{severity}]" - ) - - # Shape analysis - if total_inputs >= 3 and n_outputs <= 2: - info(f"Consolidation shape: {total_inputs} inputs → {n_outputs} outputs (many→few)") - - # List the linked addresses - linked_addrs = set() - for ia in our_inputs: - linked_addrs.add(ia["address"]) - info(f"CIOH assumption: all {total_inputs} input addresses belong to the same entity.") - if n_ours == total_inputs: - info(f"CONFIRMED: all {n_ours} inputs are derived from your descriptor — this is provably your consolidation.") - else: - info(f"{n_ours}/{total_inputs} inputs are yours; the remaining {len(ext_inputs)} are external.") - info("An observer still assumes all inputs are one entity (CIOH).") - - for ia in our_inputs[:8]: - meta = g.addr_map.get(ia["address"], {}) - role = "change" if meta.get("internal") else "receive" - info(f" YOUR input: {ia['address'][:30]}… ({role}, {ia['value']:.8f} BTC)") - for ia in ext_inputs[:4]: - info(f" EXT input: {ia['address'][:30]}… ({ia['value']:.8f} BTC)") + finding({ + "type": "CIOH", + "severity": severity, + "description": f"TX {txid} merges {n_ours}/{total_inputs} of your inputs ({round(ownership_pct)}% ownership)", + "details": { + "txid": txid, + "total_inputs": total_inputs, + "our_inputs": n_ours, + "external_inputs": len(ext_inputs), + "ownership_pct": round(ownership_pct), + "our_addresses": [ + { + "address": ia["address"], + "role": "change" if g.addr_map.get(ia["address"], {}).get("internal") else "receive", + "amount_btc": round(ia["value"], 8), + } + for ia in our_inputs + ], + }, + }) if not found_any: ok("No multi-input transactions with ≥2 of your addresses detected.") @@ -387,13 +378,22 @@ def detect_03_dust(g: TxGraph): return if found: - finding(f"{len(found)} dust UTXO(s) currently in your wallet") for u in found: sats = int(round(u["amount"] * 1e8)) - label = "STRICT DUST" if sats <= STRICT_DUST else "dust-class" - finding(f" {u['address'][:30]}… = {sats} sats ({label}) — TX {u['txid'][:16]}…") - info("Dust UTXOs can be tracking tokens planted by an adversary (dust attack).") - info("If you spend this alongside a normal UTXO, the attacker links them via CIOH.") + label = "STRICT_DUST" if sats <= STRICT_DUST else "dust-class" + finding({ + "type": "DUST", + "severity": "HIGH" if label == "STRICT_DUST" else "MEDIUM", + "description": f"Dust UTXO at {u['address']} ({sats} sats, {label}, unspent)", + "details": { + "status": "unspent", + "address": u["address"], + "sats": sats, + "label": label, + "txid": u["txid"], + "vout": u["vout"], + }, + }) # Deduplicate historical seen = set() @@ -405,17 +405,20 @@ def detect_03_dust(g: TxGraph): unique_hist.append(h) if unique_hist: - if found: - extra = len(unique_hist) - len(found) - if extra > 0: - info(f"Additionally, {extra} dust outputs were sent to your addresses historically " - f"(already spent).") - else: - finding(f"{len(unique_hist)} dust output(s) were sent to your addresses historically (already spent)") - for h in unique_hist[:5]: - info(f" {h['address'][:30]}… = {h['sats']} sats — TX {h['txid'][:16]}…") - info("Dust UTXOs are tracking tokens planted by an adversary (dust attack).") - info("If spent alongside normal UTXOs, the attacker links them via CIOH.") + current_keys = {(u["txid"], u.get("address", "")) for u in found} + for h in unique_hist: + if (h["txid"], h["address"]) not in current_keys: + finding({ + "type": "DUST", + "severity": "LOW", + "description": f"Historical dust output at {h['address']} ({h['sats']} sats, already spent)", + "details": { + "status": "spent", + "address": h["address"], + "sats": h["sats"], + "txid": h["txid"], + }, + }) def detect_04_dust_spending(g: TxGraph): @@ -442,15 +445,16 @@ def detect_04_dust_spending(g: TxGraph): if dust_inputs and normal_inputs: found_any = True - finding( - f"TX {txid[:16]}… spends {len(dust_inputs)} dust input(s) alongside " - f"{len(normal_inputs)} normal input(s)" - ) - for d in dust_inputs: - info(f" Dust: {d['address'][:30]}… = {int(round(d['value']*1e8))} sats") - for n in normal_inputs: - info(f" Normal: {n['address'][:30]}… = {n['value']:.8f} BTC") - info("A dust attacker can now link your normal UTXO to the dust tracking token via CIOH.") + finding({ + "type": "DUST_SPENDING", + "severity": "HIGH", + "description": f"TX {txid} spends {len(dust_inputs)} dust input(s) alongside {len(normal_inputs)} normal input(s)", + "details": { + "txid": txid, + "dust_inputs": [{"address": d["address"], "sats": int(round(d["value"] * 1e8))} for d in dust_inputs], + "normal_inputs": [{"address": n["address"], "amount_btc": round(n["value"], 8)} for n in normal_inputs], + }, + }) if not found_any: ok("No dust spending mixed with normal inputs detected.") @@ -512,12 +516,16 @@ def detect_05_change_detection(g: TxGraph): if problems: found_any = True - finding(f"TX {txid[:16]}… has identifiable change output(s)") - for p in problems[:6]: - info(p) - for co in our_outs: - info(f" Probable change: {co['address'][:30]}… = {co['value']:.8f} BTC") - info("An observer can distinguish payment from change, tracking your remaining funds.") + finding({ + "type": "CHANGE_DETECTION", + "severity": "MEDIUM", + "description": f"TX {txid} has identifiable change output(s) ({len(problems)} heuristic(s) matched)", + "details": { + "txid": txid, + "reasons": problems[:6], + "change_outputs": [{"address": co["address"], "amount_btc": round(co["value"], 8)} for co in our_outs], + }, + }) if not found_any: ok("No easily identifiable change outputs detected.") @@ -542,14 +550,19 @@ def detect_06_consolidation_origin(g: TxGraph): # Check how many of the consolidation inputs were ours parent_inputs = g.get_input_addresses(utxo["txid"]) our_parent_in = [ia for ia in parent_inputs if g.is_ours(ia["address"])] - finding( - f"UTXO {utxo['txid'][:16]}…:{utxo['vout']} ({utxo['amount']:.8f} BTC) " - f"was born from consolidation ({n_in} inputs → {n_out} output)" - ) - if our_parent_in: - info(f"{len(our_parent_in)}/{n_in} inputs were yours — this was YOUR consolidation.") - info("This UTXO carries the full cluster linkage of all merged inputs.") - info("Anyone who traces back 1 hop sees all the addresses you linked together.") + finding({ + "type": "CONSOLIDATION", + "severity": "MEDIUM", + "description": f"UTXO {utxo['txid']}:{utxo['vout']} ({utxo['amount']:.8f} BTC) born from a {n_in}-input consolidation", + "details": { + "txid": utxo["txid"], + "vout": utxo["vout"], + "amount_btc": round(utxo["amount"], 8), + "consolidation_inputs": n_in, + "consolidation_outputs": n_out, + "our_inputs_in_consolidation": len(our_parent_in), + }, + }) if not found_any: ok("No UTXOs from prior consolidation detected.") @@ -576,12 +589,19 @@ def detect_07_script_type_mixing(g: TxGraph): types.discard("unknown") if len(types) >= 2: found_any = True - finding(f"TX {txid[:16]}… mixes input script types: {types}") - for ia in input_addrs: - mine = "YOURS" if g.is_ours(ia["address"]) else "ext" - info(f" [{mine}] {ia['address'][:30]}… type={g.get_script_type(ia['address'])}") - info("Mixing script types is a strong wallet fingerprint. Most wallets use one type.") - info("This reveals that a single entity controls multiple address families.") + finding({ + "type": "SCRIPT_TYPE_MIXING", + "severity": "HIGH", + "description": f"TX {txid} mixes input script types: {sorted(types)}", + "details": { + "txid": txid, + "script_types": sorted(types), + "inputs": [ + {"address": ia["address"], "script_type": g.get_script_type(ia["address"]), "ours": g.is_ours(ia["address"])} + for ia in input_addrs + ], + }, + }) if not found_any: ok("No script type mixing detected.") @@ -627,11 +647,15 @@ def detect_08_cluster_merge(g: TxGraph): if merged_clusters: found_any = True - finding(f"TX {txid[:16]}… merges UTXOs from different funding chains") - for key, sources in funding_sources.items(): - info(f" Input {key} ← funded by {sources}") - info("Previously separate identity clusters are now permanently linked.") - info("An observer can conclude the same entity controlled both funding paths.") + finding({ + "type": "CLUSTER_MERGE", + "severity": "HIGH", + "description": f"TX {txid} merges UTXOs from {len(funding_sources)} different funding chains", + "details": { + "txid": txid, + "funding_sources": {k: sorted(v) for k, v in funding_sources.items()}, + }, + }) if not found_any: ok("No cross-origin cluster merges detected.") @@ -669,20 +693,29 @@ def detect_09_lookback_depth(g: TxGraph): ok(f"UTXO age spread is small ({spread} blocks). No dormancy pattern.") return - finding(f"UTXO age spread: {spread} blocks between oldest and newest") - info(f"Oldest: {oldest['utxo']['txid'][:16]}… = {oldest['confirmations']} confirmations " - f"({oldest['utxo']['amount']:.8f} BTC)") - info(f"Newest: {newest['utxo']['txid'][:16]}… = {newest['confirmations']} confirmations " - f"({newest['utxo']['amount']:.8f} BTC)") + finding({ + "type": "UTXO_AGE_SPREAD", + "severity": "LOW", + "description": f"UTXO age spread of {spread} blocks between oldest and newest", + "details": { + "spread_blocks": spread, + "oldest": {"txid": oldest["utxo"]["txid"], "confirmations": oldest["confirmations"], "amount_btc": round(oldest["utxo"]["amount"], 8)}, + "newest": {"txid": newest["utxo"]["txid"], "confirmations": newest["confirmations"], "amount_btc": round(newest["utxo"]["amount"], 8)}, + }, + }) - # Flag very old UTXOs OLD_THRESHOLD = 100 # blocks old_utxos = [a for a in aged if a["confirmations"] >= OLD_THRESHOLD] if old_utxos: - warn(f"{len(old_utxos)} UTXO(s) have ≥{OLD_THRESHOLD} confirmations — dormant/hoarded coins pattern") - - info("UTXO age reveals dormancy patterns and can distinguish 'fresh' exchange") - info("withdrawals from aged savings. Spending old + new together worsens this.") + warn({ + "type": "DORMANT_UTXOS", + "severity": "LOW", + "description": f"{len(old_utxos)} UTXO(s) have ≥{OLD_THRESHOLD} confirmations (dormant/hoarded coins pattern)", + "details": { + "count": len(old_utxos), + "threshold_blocks": OLD_THRESHOLD, + }, + }) def detect_10_exchange_origin(g: TxGraph, known_exchange_wallets=None): @@ -756,12 +789,16 @@ def detect_10_exchange_origin(g: TxGraph, known_exchange_wallets=None): if len(signals) >= 2: found_any = True - finding(f"TX {txid[:16]}… looks like an exchange batch withdrawal") - for s in signals: - info(s) - for o in our_outputs: - info(f" You received: {o['address'][:30]}… = {o['value']:.8f} BTC") - info("UTXOs from exchange withdrawals reveal you interacted with that exchange.") + finding({ + "type": "EXCHANGE_ORIGIN", + "severity": "MEDIUM", + "description": f"TX {txid} looks like an exchange batch withdrawal ({len(signals)} signal(s))", + "details": { + "txid": txid, + "signals": signals, + "received_outputs": [{"address": o["address"], "amount_btc": round(o["value"], 8)} for o in our_outputs], + }, + }) if not found_any: ok("No exchange-origin batch patterns detected.") @@ -812,16 +849,17 @@ def detect_11_tainted_utxos(g: TxGraph, known_risky_wallets=None): if tainted and clean: found_any = True taint_pct = len(tainted) / len(input_addrs) * 100 - finding( - f"TX {txid[:16]}… merges {len(tainted)} tainted + {len(clean)} clean inputs " - f"({taint_pct:.0f}% taint)" - ) - for t in tainted: - info(f" TAINTED: {t['address'][:30]}… = {t['value']:.8f} BTC (from risky TX {t['txid'][:16]}…)") - for c in clean[:4]: - info(f" CLEAN: {c['address'][:30]}… = {c['value']:.8f} BTC") - info("Taint propagation: ALL outputs of this TX are now contaminated.") - info("Even clean recipients inherit the taint via the merge.") + finding({ + "type": "TAINTED_UTXO_MERGE", + "severity": "HIGH", + "description": f"TX {txid} merges {len(tainted)} tainted + {len(clean)} clean inputs ({round(taint_pct)}% taint)", + "details": { + "txid": txid, + "tainted_inputs": [{"address": t["address"], "amount_btc": round(t["value"], 8), "source_txid": t["txid"]} for t in tainted], + "clean_inputs": [{"address": c["address"], "amount_btc": round(c["value"], 8)} for c in clean], + "taint_pct": round(taint_pct), + }, + }) # Also check: did we receive directly from a risky source? for txid in g.our_txids: @@ -829,9 +867,15 @@ def detect_11_tainted_utxos(g: TxGraph, known_risky_wallets=None): our_outs = [o for o in g.get_output_addresses(txid) if g.is_ours(o["address"])] if our_outs: found_any = True - warn(f"TX {txid[:16]}… is directly from a known risky source") - for o in our_outs: - info(f" You received: {o['address'][:30]}… = {o['value']:.8f} BTC") + warn({ + "type": "DIRECT_TAINT", + "severity": "HIGH", + "description": f"TX {txid} is directly from a known risky source", + "details": { + "txid": txid, + "received_outputs": [{"address": o["address"], "amount_btc": round(o["value"], 8)} for o in our_outs], + }, + }) if not found_any: ok("No tainted UTXO merges detected.") @@ -1027,14 +1071,15 @@ def detect_12_behavioral_fingerprint(g: TxGraph): ok(f"Analyzed {len(send_txids)} transactions. No strong behavioral fingerprints detected.") return - finding(f"Behavioral fingerprint detected across {len(send_txids)} send transactions") - for p in problems: - warn(p) - - info("") - info(f"Summary: {len(problems)} identifiable pattern(s) found.") - info("Chain analysis firms use exactly these features to cluster wallets.") - info("Even without address reuse, behavioral consistency can re-identify you.") + finding({ + "type": "BEHAVIORAL_FINGERPRINT", + "severity": "MEDIUM", + "description": f"Behavioral fingerprint detected across {len(send_txids)} send transactions ({len(problems)} pattern(s))", + "details": { + "send_tx_count": len(send_txids), + "patterns": problems, + }, + }) # ═══════════════════════════════════════════════════════════════════════════════ @@ -1063,10 +1108,6 @@ def main(): if not args.wallet and not args.descriptors: parser.error("Provide either --wallet or one or more descriptors.") - print(f"\n{B}{'═'*78}{RST}") - print(f"{B}{C} BITCOIN PRIVACY VULNERABILITY DETECTOR{RST}") - print(f"{B}{'═'*78}{RST}") - # ── Step 1: Resolve descriptors ── section("Setup: Resolving Descriptors") descriptors = resolve_descriptors(args) @@ -1107,8 +1148,7 @@ def main(): info(f"Current UTXOs: {len(utxos)}") if not wallet_txs: - print(f"\n {R_}No transactions found for these descriptors.{RST}") - print(f" Make sure you have run reproduce.py first, or the descriptors are correct.\n") + print(json.dumps({"error": "No transactions found for these descriptors."})) return # ── Step 5: Build transaction graph ── @@ -1129,17 +1169,21 @@ def main(): detect_11_tainted_utxos(g, args.known_risky_wallets) detect_12_behavioral_fingerprint(g) - # ── Summary ── - print(f"\n{B}{'═'*78}{RST}") - print(f"{B} SCAN COMPLETE{RST}") - print(f"{'═'*78}") - print(f" {R_}⚠ Findings: {FINDING_COUNT}{RST}") - print(f" {Y}⚡ Warnings: {WARN_COUNT}{RST}") - print(f" Transactions analyzed: {len(g.our_txids)}") - print(f" Addresses derived: {len(addr_map)}") - if FINDING_COUNT == 0 and WARN_COUNT == 0: - print(f" {G}✓ No privacy issues detected.{RST}") - print(f"{'═'*78}\n") + # ── JSON output ── + report = { + "stats": { + "transactions_analyzed": len(g.our_txids), + "addresses_derived": len(addr_map), + }, + "findings": FINDINGS, + "warnings": WARNINGS, + "summary": { + "findings": len(FINDINGS), + "warnings": len(WARNINGS), + "clean": len(FINDINGS) == 0 and len(WARNINGS) == 0, + }, + } + print(json.dumps(report, indent=2)) # Cleanup if not args.wallet and not args.keep_scan_wallet: