diff --git a/backend/script/README.md b/backend/script/README.md new file mode 100644 index 0000000..7f3efbf --- /dev/null +++ b/backend/script/README.md @@ -0,0 +1,29 @@ +pass: "aW2u~fYiuLu3)%a" + +METAMASK: +1.twenty +2.series +3.camera +4.invite +5.dismiss +6.gentle +7.dose +8.hotel +9.circle +10.eight +11.rotate +12.assault + +ENKRYPT: +damage +scare +aerobic +eagle +club +typical +cricket +kick +jaguar +paddle +void +dinner \ No newline at end of file diff --git a/backend/script/__pycache__/bitcoin_rpc.cpython-310.pyc b/backend/script/__pycache__/bitcoin_rpc.cpython-310.pyc new file mode 100644 index 0000000..7ebb809 Binary files /dev/null and b/backend/script/__pycache__/bitcoin_rpc.cpython-310.pyc differ diff --git a/backend/script/bitcoin_rpc.py b/backend/script/bitcoin_rpc.py new file mode 100644 index 0000000..cada361 --- /dev/null +++ b/backend/script/bitcoin_rpc.py @@ -0,0 +1,142 @@ +""" +bitcoin_rpc.py — Thin wrapper around bitcoin-cli for Python tests. +Uses subprocess calls to bitcoin-cli -regtest. 
+""" + +import json +import subprocess +import time +import os + +CLI = "bitcoin-cli" +SIGNET_ARGS = [CLI, "-regtest"] + +def cli(*args, wallet=None): + """Call bitcoin-cli -regtest [wallet] and return parsed JSON or string.""" + cmd = list(SIGNET_ARGS) + if wallet: + cmd.append(f"-rpcwallet={wallet}") + cmd.extend(str(a) for a in args) + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) + if result.returncode != 0: + raise RuntimeError(f"bitcoin-cli error: {result.stderr.strip()}\n cmd: {' '.join(cmd)}") + + output = result.stdout.strip() + if not output: + return None + try: + return json.loads(output) + except json.JSONDecodeError: + return output + + +def mine_blocks(n=1): + """Mine n blocks on regtest using generatetoaddress.""" + miner_addr = cli("getnewaddress", "", "bech32", wallet="miner") + cli("generatetoaddress", n, miner_addr) + return int(cli("getblockcount")) + + +def fund_wallet(wallet_name, amount=1.0, from_wallet="miner"): + """Send `amount` BTC from `from_wallet` to a new address in `wallet_name`.""" + addr = cli("getnewaddress", "", "bech32", wallet=wallet_name) + txid = cli("sendtoaddress", addr, f"{amount:.8f}", wallet=from_wallet) + return txid, addr + + +def wait_for_mempool_empty(timeout=60): + """Wait until mempool is empty (all txs mined).""" + for _ in range(timeout * 2): + info = cli("getmempoolinfo") + if info["size"] == 0: + return True + time.sleep(0.5) + return False + + +def get_tx(txid): + """Get decoded transaction.""" + return cli("getrawtransaction", txid, "true") + + +def get_utxos(wallet_name, min_conf=0): + """List unspent outputs for a wallet.""" + return cli("listunspent", min_conf, wallet=wallet_name) + + +def get_balance(wallet_name): + """Get wallet balance.""" + return float(cli("getbalance", wallet=wallet_name)) + + +def send_raw(hex_tx): + """Broadcast a raw transaction.""" + return cli("sendrawtransaction", hex_tx) + + +def create_funded_psbt(wallet_name, inputs, outputs, options=None): + 
"""Create a funded PSBT.""" + args = ["walletcreatefundedpsbt", json.dumps(inputs), json.dumps(outputs), 0] + if options: + args.append(json.dumps(options)) + return cli(*args, wallet=wallet_name) + + +def process_psbt(wallet_name, psbt): + """Sign a PSBT.""" + return cli("walletprocesspsbt", psbt, wallet=wallet_name) + + +def finalize_psbt(psbt): + """Finalize a PSBT.""" + return cli("finalizepsbt", psbt) + + +def decode_psbt(psbt): + """Decode a PSBT.""" + return cli("decodepsbt", psbt) + + +def create_raw_tx(inputs, outputs): + """Create a raw transaction.""" + return cli("createrawtransaction", json.dumps(inputs), json.dumps(outputs)) + + +def sign_raw_tx(wallet_name, hex_tx): + """Sign a raw transaction.""" + return cli("signrawtransactionwithwallet", hex_tx, wallet=wallet_name) + + +def decode_raw_tx(hex_tx): + """Decode a raw transaction.""" + return cli("decoderawtransaction", hex_tx) + + +def get_block_count(): + """Get current block height.""" + return int(cli("getblockcount")) + + +def get_new_address(wallet_name, addr_type="bech32"): + """Get a new address.""" + return cli("getnewaddress", "", addr_type, wallet=wallet_name) + + +def send_to_address(wallet_name, address, amount): + """Send BTC to an address.""" + return cli("sendtoaddress", address, f"{amount:.8f}", wallet=wallet_name) + + +if __name__ == "__main__": + print("Testing RPC connection...") + info = cli("getblockchaininfo") + print(f" Chain: {info['chain']}") + print(f" Blocks: {info['blocks']}") + + for w in ["miner", "alice", "bob", "carol", "exchange", "risky"]: + try: + bal = get_balance(w) + print(f" {w}: {bal} BTC") + except Exception as e: + print(f" {w}: ERROR - {e}") diff --git a/backend/script/detect.py b/backend/script/detect.py new file mode 100644 index 0000000..36314fb --- /dev/null +++ b/backend/script/detect.py @@ -0,0 +1,1141 @@ +#!/usr/bin/env python3 +""" +detect.py +========= +Blockchain privacy vulnerability detector. 
+ +INPUT: One or more output descriptors (or --wallet to read them). +OUTPUT: Every privacy vulnerability found for that descriptor's address set. + +The detector creates a temporary watch-only wallet, imports descriptors with +a full rescan, then analyses all historical transactions touching any derived +address. It never scans the entire chain — only transactions the wallet knows. + +Usage: + python3 detect.py --wallet alice + python3 detect.py "wpkh([fp/84h/1h/0h]tpub.../0/*)#checksum" "wpkh([fp/84h/1h/0h]tpub.../1/*)#checksum" + python3 detect.py --wallet alice --known-risky-wallets risky --known-exchange-wallets exchange +""" + +import sys +import os +import json +import time +import hashlib +import argparse +from collections import defaultdict +from math import log2 + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from bitcoin_rpc import cli, get_tx + +# ═══════════════════════════════════════════════════════════════════════════════ +# ANSI formatting +# ═══════════════════════════════════════════════════════════════════════════════ +G = "\033[92m"; R_ = "\033[91m"; Y = "\033[93m"; C = "\033[96m" +B = "\033[1m"; DIM = "\033[2m"; RST = "\033[0m" + +FINDING_COUNT = 0 +WARN_COUNT = 0 + +def section(title): + print(f"\n{B}{'━'*78}{RST}") + print(f"{B}{C} {title}{RST}") + print(f"{B}{'━'*78}{RST}") + +def finding(msg): + global FINDING_COUNT + FINDING_COUNT += 1 + print(f" {R_}⚠ FINDING:{RST} {msg}") + +def warn(msg): + global WARN_COUNT + WARN_COUNT += 1 + print(f" {Y}⚡ WARNING:{RST} {msg}") + +def ok(msg): + print(f" {G}✓{RST} {msg}") + +def info(msg): + print(f" {DIM}│{RST} {msg}") + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 1. 
WALLET + ADDRESS RESOLUTION +# ═══════════════════════════════════════════════════════════════════════════════ + +def resolve_descriptors(args): + """Get the descriptor list from args: either --wallet or positional descriptors.""" + descs = [] + if args.wallet: + result = cli("listdescriptors", wallet=args.wallet) + for d in result["descriptors"]: + descs.append({ + "desc": d["desc"], + "internal": d.get("internal", False), + "active": d.get("active", True), + "range_end": d.get("range", [0, 999])[1] if isinstance(d.get("range"), list) else d.get("range", 999), + }) + else: + for d in args.descriptors: + descs.append({ + "desc": d, + "internal": "/1/*" in d, + "active": True, + "range_end": 999, + }) + return descs + + +def derive_all_addresses(descriptors): + """Derive addresses from all descriptors, return {address -> (desc_type, internal, index)}.""" + addr_map = {} # address -> metadata + for dinfo in descriptors: + desc = dinfo["desc"] + rng = min(dinfo["range_end"], 999) + # Detect descriptor type + dtype = "unknown" + if desc.startswith("wpkh("): dtype = "p2wpkh" + elif desc.startswith("tr("): dtype = "p2tr" + elif desc.startswith("sh(wpkh("): dtype = "p2sh-p2wpkh" + elif desc.startswith("pkh("): dtype = "p2pkh" + + try: + addrs = cli("deriveaddresses", desc, f"[0,{rng}]") + if addrs: + for i, a in enumerate(addrs): + addr_map[a] = { + "type": dtype, + "internal": dinfo["internal"], + "index": i, + } + except Exception as e: + info(f"Could not derive from {desc[:40]}…: {e}") + return addr_map + + +def build_scan_wallet(descriptors, wallet_name="_detect_scan"): + """Create a temporary watch-only wallet with descriptors, do full rescan.""" + # Clean up if exists + try: + cli("unloadwallet", wallet_name) + except Exception: + pass + + try: + cli("createwallet", wallet_name, "true", "true", "", "false", "true") + except Exception: + try: + cli("loadwallet", wallet_name) + except Exception: + pass + + import_batch = [] + for d in descriptors: + 
import_batch.append({ + "desc": d["desc"], + "timestamp": 0, # full rescan + "internal": d["internal"], + "active": d["active"], + "range": [0, d["range_end"]], + }) + + result = cli("importdescriptors", json.dumps(import_batch), wallet=wallet_name) + # Check results + for r in (result or []): + if not r.get("success"): + info(f"Import warning: {r.get('error', {}).get('message', 'unknown')}") + + return wallet_name + + +def get_all_transactions(wallet_name, count=10000): + """Get full transaction history for the wallet.""" + txs = cli("listtransactions", "*", count, 0, "true", wallet=wallet_name) + return txs or [] + + +def get_all_utxos(wallet_name): + """Get all UTXOs (confirmed and unconfirmed).""" + return cli("listunspent", 0, 9999999, wallet=wallet_name) or [] + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 2. TRANSACTION GRAPH BUILDER +# ═══════════════════════════════════════════════════════════════════════════════ + +class TxGraph: + """Indexed view of all transactions touching our address set.""" + + def __init__(self, addr_map, wallet_txs, utxos): + self.addr_map = addr_map # {address -> metadata} + self.our_addrs = set(addr_map.keys()) + self.utxos = utxos # current UTXOs + self.tx_cache = {} # txid -> decoded tx + self.our_txids = set() # txids we participate in + + # Index: address -> list of (txid, direction, value) + self.addr_txs = defaultdict(list) # address -> [{txid, direction, amount}] + # Index: txid -> list of our addresses involved + self.tx_addrs = defaultdict(set) + + # Build from wallet tx list + for wtx in wallet_txs: + txid = wtx.get("txid", "") + addr = wtx.get("address", "") + cat = wtx.get("category", "") # send/receive + amount = wtx.get("amount", 0) + if txid: + self.our_txids.add(txid) + if addr and txid: + self.addr_txs[addr].append({ + "txid": txid, "category": cat, "amount": amount, + "confirmations": wtx.get("confirmations", 0), + "blockheight": wtx.get("blockheight", 0), + }) + 
self.tx_addrs[txid].add(addr) + + def fetch_tx(self, txid): + """Get decoded transaction (cached).""" + if txid not in self.tx_cache: + try: + self.tx_cache[txid] = get_tx(txid) + except Exception: + return None + return self.tx_cache[txid] + + def get_input_addresses(self, txid): + """Get all input addresses for a transaction.""" + tx = self.fetch_tx(txid) + if not tx: + return [] + addrs = [] + for vin in tx.get("vin", []): + if vin.get("coinbase"): + continue + parent = self.fetch_tx(vin["txid"]) + if parent: + vout_data = parent["vout"][vin["vout"]] + addr = vout_data.get("scriptPubKey", {}).get("address", "") + value = vout_data.get("value", 0) + addrs.append({"address": addr, "value": value, "txid": vin["txid"], "vout": vin["vout"]}) + return addrs + + def get_output_addresses(self, txid): + """Get all output addresses for a transaction.""" + tx = self.fetch_tx(txid) + if not tx: + return [] + addrs = [] + for vout in tx.get("vout", []): + addr = vout.get("scriptPubKey", {}).get("address", "") + addrs.append({ + "address": addr, + "value": vout["value"], + "n": vout["n"], + "type": vout.get("scriptPubKey", {}).get("type", "unknown"), + }) + return addrs + + def is_ours(self, address): + return address in self.our_addrs + + def get_script_type(self, address): + """Return the script type metadata for one of our addresses.""" + meta = self.addr_map.get(address) + if meta: + return meta["type"] + # Heuristic from prefix (supports mainnet, testnet/signet, regtest) + if address.startswith(("tb1q", "bc1q", "bcrt1q")): + return "p2wpkh" + if address.startswith(("tb1p", "bc1p", "bcrt1p")): + return "p2tr" + if address.startswith(("2", "3")): + return "p2sh-p2wpkh" + return "unknown" + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 3. VULNERABILITY DETECTORS +# +# Each detector receives the TxGraph and reports findings. 
+# ═══════════════════════════════════════════════════════════════════════════════ + +def detect_01_address_reuse(g: TxGraph): + """Detect addresses that appear as recipients in multiple transactions.""" + section("1 · Address Reuse") + reused = {} + for addr in g.our_addrs: + # Count distinct TXIDs where this address received funds + receive_txids = set() + for entry in g.addr_txs.get(addr, []): + if entry["category"] == "receive": + receive_txids.add(entry["txid"]) + if len(receive_txids) >= 2: + reused[addr] = receive_txids + + if not reused: + ok("No address reuse detected.") + return + + for addr, txids in reused.items(): + meta = g.addr_map.get(addr, {}) + role = "change" if meta.get("internal") else "receive" + finding(f"Address {addr} ({role}) used in {len(txids)} different transactions") + for txid in sorted(txids): + tx = g.fetch_tx(txid) + confs = tx.get("confirmations", "?") if tx else "?" + info(f"TX {txid[:16]}… ({confs} confirmations)") + info(f"An observer links all {len(txids)} transactions to the same entity.") + + +def detect_02_cioh(g: TxGraph): + """Detect multi-input transactions (CIOH) and verify input ownership.""" + section("2 · Common Input Ownership Heuristic (CIOH)") + found_any = False + + for txid in g.our_txids: + tx = g.fetch_tx(txid) + if not tx or len(tx.get("vin", [])) < 2: + continue + + input_addrs = g.get_input_addresses(txid) + if len(input_addrs) < 2: + continue + + # Classify inputs: ours vs external + our_inputs = [ia for ia in input_addrs if g.is_ours(ia["address"])] + ext_inputs = [ia for ia in input_addrs if not g.is_ours(ia["address"])] + total_inputs = len(input_addrs) + n_ours = len(our_inputs) + + if n_ours < 2: + # Only 1 of ours — CIOH doesn't expose us + continue + + found_any = True + n_outputs = len(tx.get("vout", [])) + ownership_pct = n_ours / total_inputs * 100 + + severity = "CRITICAL" if n_ours == total_inputs else "HIGH" + finding( + f"TX {txid[:16]}… has {total_inputs} inputs, {n_ours} are YOURS " + 
f"({ownership_pct:.0f}% ownership) [{severity}]" + ) + + # Shape analysis + if total_inputs >= 3 and n_outputs <= 2: + info(f"Consolidation shape: {total_inputs} inputs → {n_outputs} outputs (many→few)") + + # List the linked addresses + linked_addrs = set() + for ia in our_inputs: + linked_addrs.add(ia["address"]) + info(f"CIOH assumption: all {total_inputs} input addresses belong to the same entity.") + if n_ours == total_inputs: + info(f"CONFIRMED: all {n_ours} inputs are derived from your descriptor — this is provably your consolidation.") + else: + info(f"{n_ours}/{total_inputs} inputs are yours; the remaining {len(ext_inputs)} are external.") + info("An observer still assumes all inputs are one entity (CIOH).") + + for ia in our_inputs[:8]: + meta = g.addr_map.get(ia["address"], {}) + role = "change" if meta.get("internal") else "receive" + info(f" YOUR input: {ia['address'][:30]}… ({role}, {ia['value']:.8f} BTC)") + for ia in ext_inputs[:4]: + info(f" EXT input: {ia['address'][:30]}… ({ia['value']:.8f} BTC)") + + if not found_any: + ok("No multi-input transactions with ≥2 of your addresses detected.") + + +def detect_03_dust(g: TxGraph): + """Detect dust UTXOs (current and historical).""" + section("3 · Dust UTXO Detection") + DUST_SATS = 1000 + STRICT_DUST = 546 + + found = [] + for utxo in g.utxos: + sats = int(round(utxo["amount"] * 1e8)) + if sats <= DUST_SATS and g.is_ours(utxo.get("address", "")): + found.append(utxo) + + # Also check historical: any tx that sent dust to our addresses + hist_dust = [] + for txid in g.our_txids: + outputs = g.get_output_addresses(txid) + for out in outputs: + sats = int(round(out["value"] * 1e8)) + if sats <= DUST_SATS and g.is_ours(out["address"]): + hist_dust.append({"txid": txid, "address": out["address"], "sats": sats}) + + if not found and not hist_dust: + ok("No dust UTXOs detected.") + return + + if found: + finding(f"{len(found)} dust UTXO(s) currently in your wallet") + for u in found: + sats = 
int(round(u["amount"] * 1e8)) + label = "STRICT DUST" if sats <= STRICT_DUST else "dust-class" + finding(f" {u['address'][:30]}… = {sats} sats ({label}) — TX {u['txid'][:16]}…") + info("Dust UTXOs can be tracking tokens planted by an adversary (dust attack).") + info("If you spend this alongside a normal UTXO, the attacker links them via CIOH.") + + # Deduplicate historical + seen = set() + unique_hist = [] + for h in hist_dust: + key = (h["txid"], h["address"]) + if key not in seen: + seen.add(key) + unique_hist.append(h) + + if unique_hist: + if found: + extra = len(unique_hist) - len(found) + if extra > 0: + info(f"Additionally, {extra} dust outputs were sent to your addresses historically " + f"(already spent).") + else: + finding(f"{len(unique_hist)} dust output(s) were sent to your addresses historically (already spent)") + for h in unique_hist[:5]: + info(f" {h['address'][:30]}… = {h['sats']} sats — TX {h['txid'][:16]}…") + info("Dust UTXOs are tracking tokens planted by an adversary (dust attack).") + info("If spent alongside normal UTXOs, the attacker links them via CIOH.") + + +def detect_04_dust_spending(g: TxGraph): + """Detect transactions that spend dust alongside normal inputs.""" + section("4 · Dust Spent with Normal Inputs") + DUST_SATS = 1000 + found_any = False + + for txid in g.our_txids: + input_addrs = g.get_input_addresses(txid) + if not input_addrs or len(input_addrs) < 2: + continue + + dust_inputs = [] + normal_inputs = [] + for ia in input_addrs: + if not g.is_ours(ia["address"]): + continue + sats = int(round(ia["value"] * 1e8)) + if sats <= DUST_SATS: + dust_inputs.append(ia) + elif sats > 10000: # > 10k sats = clearly normal + normal_inputs.append(ia) + + if dust_inputs and normal_inputs: + found_any = True + finding( + f"TX {txid[:16]}… spends {len(dust_inputs)} dust input(s) alongside " + f"{len(normal_inputs)} normal input(s)" + ) + for d in dust_inputs: + info(f" Dust: {d['address'][:30]}… = {int(round(d['value']*1e8))} sats") + 
for n in normal_inputs: + info(f" Normal: {n['address'][:30]}… = {n['value']:.8f} BTC") + info("A dust attacker can now link your normal UTXO to the dust tracking token via CIOH.") + + if not found_any: + ok("No dust spending mixed with normal inputs detected.") + + +def detect_05_change_detection(g: TxGraph): + """Detect transactions where change output is easily distinguishable.""" + section("5 · Probable Change Output Detection") + found_any = False + + for txid in g.our_txids: + tx = g.fetch_tx(txid) + if not tx: + continue + outputs = g.get_output_addresses(txid) + input_addrs = g.get_input_addresses(txid) + if not outputs or len(outputs) < 2: + continue + + # We only care about sends (where at least 1 input is ours) + our_in = [ia for ia in input_addrs if g.is_ours(ia["address"])] + if not our_in: + continue + + # Identify which outputs are ours (change) vs external (payment) + our_outs = [o for o in outputs if g.is_ours(o["address"])] + ext_outs = [o for o in outputs if not g.is_ours(o["address"])] + + if not our_outs or not ext_outs: + continue # can't distinguish change if all outputs are ours or all external + + # Check change-detection heuristics + problems = [] + + for change in our_outs: + ch_sats = int(round(change["value"] * 1e8)) + ch_round = ch_sats % 100000 == 0 or ch_sats % 1000000 == 0 + + for payment in ext_outs: + pay_sats = int(round(payment["value"] * 1e8)) + pay_round = pay_sats % 100000 == 0 or pay_sats % 1000000 == 0 + + # Heuristic 1: payment is round, change is not + if pay_round and not ch_round: + problems.append(f"Round payment ({pay_sats} sats) vs non-round change ({ch_sats} sats)") + + # Heuristic 2: change has same script type as input + in_types = set(g.get_script_type(ia["address"]) for ia in our_in) + ch_type = g.get_script_type(change["address"]) + if ch_type in in_types and change["type"] != payment["type"]: + problems.append( + f"Change script type ({change['type']}) matches input type — different from payment 
({payment['type']})" + ) + + # Heuristic 3: change address is internal (derivation /1/*) + ch_meta = g.addr_map.get(change["address"], {}) + if ch_meta.get("internal"): + problems.append("Change uses an internal (BIP-44 /1/*) derivation path — standard wallet change pattern") + + if problems: + found_any = True + finding(f"TX {txid[:16]}… has identifiable change output(s)") + for p in problems[:6]: + info(p) + for co in our_outs: + info(f" Probable change: {co['address'][:30]}… = {co['value']:.8f} BTC") + info("An observer can distinguish payment from change, tracking your remaining funds.") + + if not found_any: + ok("No easily identifiable change outputs detected.") + + +def detect_06_consolidation_origin(g: TxGraph): + """Detect UTXOs that originate from a prior consolidation transaction.""" + section("6 · UTXOs from Prior Consolidation") + CONSOLIDATION_THRESHOLD = 3 # ≥3 inputs with ≤2 outputs = consolidation + found_any = False + + for utxo in g.utxos: + if not g.is_ours(utxo.get("address", "")): + continue + parent = g.fetch_tx(utxo["txid"]) + if not parent: + continue + n_in = len(parent.get("vin", [])) + n_out = len(parent.get("vout", [])) + if n_in >= CONSOLIDATION_THRESHOLD and n_out <= 2: + found_any = True + # Check how many of the consolidation inputs were ours + parent_inputs = g.get_input_addresses(utxo["txid"]) + our_parent_in = [ia for ia in parent_inputs if g.is_ours(ia["address"])] + finding( + f"UTXO {utxo['txid'][:16]}…:{utxo['vout']} ({utxo['amount']:.8f} BTC) " + f"was born from consolidation ({n_in} inputs → {n_out} output)" + ) + if our_parent_in: + info(f"{len(our_parent_in)}/{n_in} inputs were yours — this was YOUR consolidation.") + info("This UTXO carries the full cluster linkage of all merged inputs.") + info("Anyone who traces back 1 hop sees all the addresses you linked together.") + + if not found_any: + ok("No UTXOs from prior consolidation detected.") + + +def detect_07_script_type_mixing(g: TxGraph): + """Detect transactions 
mixing different script types in inputs.""" + section("7 · Script Type Mixing in Inputs") + found_any = False + + for txid in g.our_txids: + input_addrs = g.get_input_addresses(txid) + if len(input_addrs) < 2: + continue + + our_in = [ia for ia in input_addrs if g.is_ours(ia["address"])] + if len(our_in) < 2: + continue + + types = set() + for ia in input_addrs: + types.add(g.get_script_type(ia["address"])) + + types.discard("unknown") + if len(types) >= 2: + found_any = True + finding(f"TX {txid[:16]}… mixes input script types: {types}") + for ia in input_addrs: + mine = "YOURS" if g.is_ours(ia["address"]) else "ext" + info(f" [{mine}] {ia['address'][:30]}… type={g.get_script_type(ia['address'])}") + info("Mixing script types is a strong wallet fingerprint. Most wallets use one type.") + info("This reveals that a single entity controls multiple address families.") + + if not found_any: + ok("No script type mixing detected.") + + +def detect_08_cluster_merge(g: TxGraph): + """Detect transactions that merge UTXOs from different funding sources (clusters).""" + section("8 · Cluster Merge (Cross-Origin Input Mixing)") + found_any = False + + for txid in g.our_txids: + input_addrs = g.get_input_addresses(txid) + if len(input_addrs) < 2: + continue + + our_in = [ia for ia in input_addrs if g.is_ours(ia["address"])] + if len(our_in) < 2: + continue + + # Trace each of our inputs one hop back to find their funding sources + funding_sources = {} # our_input_txid:vout -> set of grandparent source txids + for ia in our_in: + parent_tx = g.fetch_tx(ia["txid"]) + if not parent_tx: + continue + gp_sources = set() + for p_vin in parent_tx.get("vin", []): + if p_vin.get("coinbase"): + gp_sources.add("coinbase") + else: + gp_sources.add(p_vin["txid"][:16]) + funding_sources[f"{ia['txid'][:16]}:{ia['vout']}"] = gp_sources + + # Check if funding sources differ + all_sources = list(funding_sources.values()) + if len(all_sources) >= 2: + # Are the source sets disjoint? 
(different clusters) + merged_clusters = False + for i in range(len(all_sources)): + for j in range(i + 1, len(all_sources)): + if all_sources[i].isdisjoint(all_sources[j]): + merged_clusters = True + + if merged_clusters: + found_any = True + finding(f"TX {txid[:16]}… merges UTXOs from different funding chains") + for key, sources in funding_sources.items(): + info(f" Input {key} ← funded by {sources}") + info("Previously separate identity clusters are now permanently linked.") + info("An observer can conclude the same entity controlled both funding paths.") + + if not found_any: + ok("No cross-origin cluster merges detected.") + + +def detect_09_lookback_depth(g: TxGraph): + """Detect UTXOs with significantly different ages (dormancy patterns).""" + section("9 · UTXO Age / Lookback Depth") + + if not g.utxos: + ok("No UTXOs to analyze.") + return + + our_utxos = [u for u in g.utxos if g.is_ours(u.get("address", ""))] + if not our_utxos: + ok("No UTXOs belonging to the descriptor.") + return + + # Get confirmation counts + aged = [] + for u in our_utxos: + confs = u.get("confirmations", 0) + aged.append({"utxo": u, "confirmations": confs}) + + if len(aged) < 2: + ok("Only one UTXO, no age comparison possible.") + return + + aged.sort(key=lambda x: x["confirmations"], reverse=True) + oldest = aged[0] + newest = aged[-1] + spread = oldest["confirmations"] - newest["confirmations"] + + if spread < 10: + ok(f"UTXO age spread is small ({spread} blocks). 
No dormancy pattern.") + return + + finding(f"UTXO age spread: {spread} blocks between oldest and newest") + info(f"Oldest: {oldest['utxo']['txid'][:16]}… = {oldest['confirmations']} confirmations " + f"({oldest['utxo']['amount']:.8f} BTC)") + info(f"Newest: {newest['utxo']['txid'][:16]}… = {newest['confirmations']} confirmations " + f"({newest['utxo']['amount']:.8f} BTC)") + + # Flag very old UTXOs + OLD_THRESHOLD = 100 # blocks + old_utxos = [a for a in aged if a["confirmations"] >= OLD_THRESHOLD] + if old_utxos: + warn(f"{len(old_utxos)} UTXO(s) have ≥{OLD_THRESHOLD} confirmations — dormant/hoarded coins pattern") + + info("UTXO age reveals dormancy patterns and can distinguish 'fresh' exchange") + info("withdrawals from aged savings. Spending old + new together worsens this.") + + +def detect_10_exchange_origin(g: TxGraph, known_exchange_wallets=None): + """Detect UTXOs that likely originated from exchange batch withdrawals.""" + section("10 · Probable Exchange Origin") + + # Build set of known exchange txids if wallet names provided + exchange_txids = set() + if known_exchange_wallets: + for ew in known_exchange_wallets: + try: + etxs = cli("listtransactions", "*", 10000, 0, "true", wallet=ew) + for etx in (etxs or []): + if etx.get("txid"): + exchange_txids.add(etx["txid"]) + except Exception: + pass + + BATCH_THRESHOLD = 5 # ≥5 outputs = likely batch withdrawal + found_any = False + + for txid in g.our_txids: + tx = g.fetch_tx(txid) + if not tx: + continue + + n_out = len(tx.get("vout", [])) + if n_out < BATCH_THRESHOLD: + continue + + # Check: do we RECEIVE in this tx? 
(we're a recipient, not sender) + our_inputs = [ia for ia in g.get_input_addresses(txid) if g.is_ours(ia["address"])] + our_outputs = [o for o in g.get_output_addresses(txid) if g.is_ours(o["address"])] + + if our_inputs: + # We're a sender in a many-output TX — that's OUR batch, not exchange + continue + + if not our_outputs: + continue + + # Heuristics for exchange batch + signals = [] + + # 1. High output count + signals.append(f"High output count: {n_out}") + + # 2. Many unique addresses + unique_addrs = set() + for vout in tx["vout"]: + a = vout.get("scriptPubKey", {}).get("address", "") + if a: + unique_addrs.add(a) + if len(unique_addrs) >= BATCH_THRESHOLD: + signals.append(f"{len(unique_addrs)} unique recipient addresses") + + # 3. Known exchange wallet + if txid in exchange_txids: + signals.append("TX matches known exchange wallet history") + + # 4. Large input relative to individual outputs + input_addrs = g.get_input_addresses(txid) + input_total = sum(ia["value"] for ia in input_addrs) + output_vals = sorted(v.get("value", 0) for v in tx["vout"]) + if output_vals: + median_out = output_vals[len(output_vals) // 2] + if median_out > 0: + ratio = input_total / median_out + if ratio > 10: + signals.append(f"Input/median-output ratio: {ratio:.0f}x (hot wallet pattern)") + + if len(signals) >= 2: + found_any = True + finding(f"TX {txid[:16]}… looks like an exchange batch withdrawal") + for s in signals: + info(s) + for o in our_outputs: + info(f" You received: {o['address'][:30]}… = {o['value']:.8f} BTC") + info("UTXOs from exchange withdrawals reveal you interacted with that exchange.") + + if not found_any: + ok("No exchange-origin batch patterns detected.") + + +def detect_11_tainted_utxos(g: TxGraph, known_risky_wallets=None): + """Detect UTXOs that have taint from known risky sources.""" + section("11 · Tainted UTXOs / Risky Source Exposure") + + if not known_risky_wallets: + info("No --known-risky-wallets provided. 
Skipping taint analysis.") + info("(Provide wallet names to enable: --known-risky-wallets risky)") + ok("Taint detection requires known-risky wallet metadata.") + return + + # Build set of risky TXIDs + risky_txids = set() + for rw in known_risky_wallets: + try: + rtxs = cli("listtransactions", "*", 10000, 0, "true", wallet=rw) + for rtx in (rtxs or []): + if rtx.get("txid"): + risky_txids.add(rtx["txid"]) + except Exception: + info(f"Could not read wallet '{rw}'") + + if not risky_txids: + info("No transactions found in risky wallets.") + return + + found_any = False + + for txid in g.our_txids: + input_addrs = g.get_input_addresses(txid) + our_in = [ia for ia in input_addrs if g.is_ours(ia["address"])] + if not our_in or len(input_addrs) < 2: + continue + + tainted = [] + clean = [] + for ia in input_addrs: + # An input is tainted if its funding TX is in a risky wallet's history + if ia["txid"] in risky_txids: + tainted.append(ia) + else: + clean.append(ia) + + if tainted and clean: + found_any = True + taint_pct = len(tainted) / len(input_addrs) * 100 + finding( + f"TX {txid[:16]}… merges {len(tainted)} tainted + {len(clean)} clean inputs " + f"({taint_pct:.0f}% taint)" + ) + for t in tainted: + info(f" TAINTED: {t['address'][:30]}… = {t['value']:.8f} BTC (from risky TX {t['txid'][:16]}…)") + for c in clean[:4]: + info(f" CLEAN: {c['address'][:30]}… = {c['value']:.8f} BTC") + info("Taint propagation: ALL outputs of this TX are now contaminated.") + info("Even clean recipients inherit the taint via the merge.") + + # Also check: did we receive directly from a risky source? 
+ for txid in g.our_txids: + if txid in risky_txids: + our_outs = [o for o in g.get_output_addresses(txid) if g.is_ours(o["address"])] + if our_outs: + found_any = True + warn(f"TX {txid[:16]}… is directly from a known risky source") + for o in our_outs: + info(f" You received: {o['address'][:30]}… = {o['value']:.8f} BTC") + + if not found_any: + ok("No tainted UTXO merges detected.") + + +def detect_12_behavioral_fingerprint(g: TxGraph): + """ + Analyze the descriptor's transaction set for patterns that make the user + identifiable through behavioral consistency. + + We evaluate OBJECTIVE, measurable features that chain analysis firms + actually use to cluster and fingerprint wallets. + """ + section("12 · Behavioral Fingerprint Analysis") + + # Collect send transactions (where we have inputs) + send_txids = [] + for txid in g.our_txids: + input_addrs = g.get_input_addresses(txid) + our_in = [ia for ia in input_addrs if g.is_ours(ia["address"])] + if our_in: + send_txids.append(txid) + + if len(send_txids) < 3: + ok(f"Only {len(send_txids)} send transactions — not enough data for fingerprinting.") + return + + # ── Feature extraction ── + output_counts = [] + payment_amounts_sats = [] + change_amounts_sats = [] + input_script_types = [] + output_script_types = [] + rbf_signals = [] + locktime_values = [] + fee_rates = [] # sat/vB + n_inputs_list = [] + uses_round_amounts = 0 + total_payments = 0 + change_address_types_used = set() + payment_address_types_used = set() + version_numbers = set() + + for txid in send_txids: + tx = g.fetch_tx(txid) + if not tx: + continue + + n_in = len(tx.get("vin", [])) + n_out = len(tx.get("vout", [])) + n_inputs_list.append(n_in) + output_counts.append(n_out) + + # Version + version_numbers.add(tx.get("version", 2)) + + # Locktime + locktime_values.append(tx.get("locktime", 0)) + + # RBF signalling + for vin in tx.get("vin", []): + seq = vin.get("sequence", 0xffffffff) + rbf_signals.append(seq < 0xfffffffe) + + # Input script types 
+ for ia in g.get_input_addresses(txid): + if g.is_ours(ia["address"]): + input_script_types.append(g.get_script_type(ia["address"])) + + # Output analysis + outputs = g.get_output_addresses(txid) + for out in outputs: + sats = int(round(out["value"] * 1e8)) + if g.is_ours(out["address"]): + # Change output + change_amounts_sats.append(sats) + change_address_types_used.add(out["type"]) + else: + # Payment output + payment_amounts_sats.append(sats) + output_script_types.append(out["type"]) + payment_address_types_used.add(out["type"]) + total_payments += 1 + if sats > 0 and (sats % 100000 == 0 or sats % 1000000 == 0): + uses_round_amounts += 1 + + # Fee rate + if "vsize" in tx and tx["vsize"] > 0: + # Compute fee from inputs - outputs + in_total = sum(ia["value"] for ia in g.get_input_addresses(txid)) + out_total = sum(v.get("value", 0) for v in tx["vout"]) + fee_sats = int(round((in_total - out_total) * 1e8)) + if fee_sats > 0: + fee_rates.append(fee_sats / tx["vsize"]) + + # ── Analysis ── + problems = [] + + # 1. Round amount usage pattern + if total_payments > 0: + round_pct = uses_round_amounts / total_payments * 100 + if round_pct > 60: + problems.append( + f"Round payment amounts: {round_pct:.0f}% of payments are round numbers. " + "This is a distinctive behavioral pattern that aids clustering." + ) + + # 2. Consistent output count (always 2 outputs = simple spend pattern) + if output_counts: + avg_outs = sum(output_counts) / len(output_counts) + if all(c == output_counts[0] for c in output_counts) and len(output_counts) >= 3: + problems.append( + f"Uniform output count: all {len(output_counts)} send TXs have exactly " + f"{output_counts[0]} outputs. Consistent structure aids fingerprinting." + ) + + # 3. Script type consistency or mixing + input_types_set = set(input_script_types) + if len(input_types_set) > 1: + problems.append( + f"Mixed input script types used across TXs: {input_types_set}. " + "Mixing address families is rare and highly identifying." 
+ ) + elif len(input_types_set) == 1 and input_script_types: + t = input_types_set.pop() + if t == "p2pkh": + problems.append( + f"All inputs use legacy P2PKH — a very uncommon script type today. " + "This alone narrows your anonymity set significantly." + ) + + # 4. RBF signaling consistency + if rbf_signals: + rbf_pct = sum(rbf_signals) / len(rbf_signals) * 100 + if rbf_pct == 100: + problems.append( + f"RBF always enabled: 100% of inputs signal replace-by-fee. " + "While increasingly common, it's a distinguishing feature vs non-RBF wallets." + ) + elif rbf_pct == 0: + problems.append( + "RBF never enabled: 0% of inputs signal replace-by-fee. " + "This is uncommon in modern wallets and distinguishes your software." + ) + + # 5. Locktime pattern + if locktime_values: + nonzero_lt = [lt for lt in locktime_values if lt > 0] + if len(nonzero_lt) == len(locktime_values) and len(locktime_values) >= 3: + problems.append( + "Anti-fee-sniping locktime always set — consistent with Bitcoin Core / Electrum. " + "Absence or presence of this reveals your wallet software." + ) + elif not nonzero_lt and len(locktime_values) >= 3: + problems.append( + "Locktime always 0 — no anti-fee-sniping. " + "This distinguishes your wallet from Bitcoin Core / Electrum defaults." + ) + + # 6. Fee rate consistency + if len(fee_rates) >= 3: + avg_fee = sum(fee_rates) / len(fee_rates) + if avg_fee > 0: + variance = sum((f - avg_fee) ** 2 for f in fee_rates) / len(fee_rates) + stddev = variance ** 0.5 + cv = stddev / avg_fee # coefficient of variation + if cv < 0.15: + problems.append( + f"Very consistent fee rate: avg {avg_fee:.1f} sat/vB ± {stddev:.1f} " + f"(CV={cv:.2f}). Low variance suggests fixed-fee-rate wallet configuration." + ) + + # 7. 
Change address type pattern + if change_address_types_used and payment_address_types_used: + if change_address_types_used != payment_address_types_used: + # This leaks which outputs are change + problems.append( + f"Change uses different script type ({change_address_types_used}) " + f"than payments ({payment_address_types_used}) — trivially identifies change outputs." + ) + + # 8. Input count pattern (always 1 input = no consolidation; always many = distinctive) + if n_inputs_list and len(n_inputs_list) >= 3: + if all(n == 1 for n in n_inputs_list): + pass # normal, not distinctive + elif all(n == n_inputs_list[0] for n in n_inputs_list) and n_inputs_list[0] > 1: + problems.append( + f"Always uses exactly {n_inputs_list[0]} inputs per TX — unusual and identifying." + ) + + # ── Report ── + if not problems: + ok(f"Analyzed {len(send_txids)} transactions. No strong behavioral fingerprints detected.") + return + + finding(f"Behavioral fingerprint detected across {len(send_txids)} send transactions") + for p in problems: + warn(p) + + info("") + info(f"Summary: {len(problems)} identifiable pattern(s) found.") + info("Chain analysis firms use exactly these features to cluster wallets.") + info("Even without address reuse, behavioral consistency can re-identify you.") + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 4. 
MAIN +# ═══════════════════════════════════════════════════════════════════════════════ + +def main(): + parser = argparse.ArgumentParser( + description="Detect Bitcoin privacy vulnerabilities from output descriptors.", + epilog="Examples:\n" + " python3 detect.py --wallet alice\n" + ' python3 detect.py --wallet alice --known-risky-wallets risky\n' + ' python3 detect.py "wpkh(tpub.../0/*)#chk" "wpkh(tpub.../1/*)#chk"\n', + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("descriptors", nargs="*", help="Output descriptors to scan") + parser.add_argument("--wallet", "-w", help="Read descriptors from an existing wallet") + parser.add_argument("--known-risky-wallets", nargs="*", default=None, + help="Wallet names whose TXIDs are considered tainted") + parser.add_argument("--known-exchange-wallets", nargs="*", default=None, + help="Wallet names whose TXIDs are considered exchange-origin") + parser.add_argument("--keep-scan-wallet", action="store_true", + help="Don't delete the temporary scan wallet after running") + args = parser.parse_args() + + if not args.wallet and not args.descriptors: + parser.error("Provide either --wallet or one or more descriptors.") + + print(f"\n{B}{'═'*78}{RST}") + print(f"{B}{C} BITCOIN PRIVACY VULNERABILITY DETECTOR{RST}") + print(f"{B}{'═'*78}{RST}") + + # ── Step 1: Resolve descriptors ── + section("Setup: Resolving Descriptors") + descriptors = resolve_descriptors(args) + info(f"Found {len(descriptors)} descriptors") + for d in descriptors: + dtype = d["desc"].split("(")[0] + role = "internal/change" if d["internal"] else "external/receive" + info(f" {dtype:15} {role:20} range [0..{d['range_end']}]") + + # ── Step 2: Derive all addresses ── + section("Setup: Deriving Addresses") + addr_map = derive_all_addresses(descriptors) + info(f"Derived {len(addr_map)} addresses across all descriptor types") + + # Count by type + type_counts = defaultdict(int) + for meta in addr_map.values(): + 
type_counts[meta["type"]] += 1 + for t, c in sorted(type_counts.items()): + info(f" {t}: {c} addresses") + + # ── Step 3: Build watch-only wallet ── + section("Setup: Building Scan Wallet") + scan_wallet = "_detect_scan" + if args.wallet: + # If they gave us a wallet, just use it directly — faster, no rescan needed + scan_wallet = args.wallet + info(f"Using existing wallet '{scan_wallet}' directly (no rescan needed)") + else: + scan_wallet = build_scan_wallet(descriptors) + info(f"Created temporary watch-only wallet '{scan_wallet}' with full rescan") + + # ── Step 4: Gather transaction history ── + section("Setup: Loading Transaction History") + wallet_txs = get_all_transactions(scan_wallet) + utxos = get_all_utxos(scan_wallet) + info(f"Transaction history: {len(wallet_txs)} entries") + info(f"Current UTXOs: {len(utxos)}") + + if not wallet_txs: + print(f"\n {R_}No transactions found for these descriptors.{RST}") + print(f" Make sure you have run reproduce.py first, or the descriptors are correct.\n") + return + + # ── Step 5: Build transaction graph ── + g = TxGraph(addr_map, wallet_txs, utxos) + info(f"Unique transaction IDs: {len(g.our_txids)}") + + # ── Step 6: Run all detectors ── + detect_01_address_reuse(g) + detect_02_cioh(g) + detect_03_dust(g) + detect_04_dust_spending(g) + detect_05_change_detection(g) + detect_06_consolidation_origin(g) + detect_07_script_type_mixing(g) + detect_08_cluster_merge(g) + detect_09_lookback_depth(g) + detect_10_exchange_origin(g, args.known_exchange_wallets) + detect_11_tainted_utxos(g, args.known_risky_wallets) + detect_12_behavioral_fingerprint(g) + + # ── Summary ── + print(f"\n{B}{'═'*78}{RST}") + print(f"{B} SCAN COMPLETE{RST}") + print(f"{'═'*78}") + print(f" {R_}⚠ Findings: {FINDING_COUNT}{RST}") + print(f" {Y}⚡ Warnings: {WARN_COUNT}{RST}") + print(f" Transactions analyzed: {len(g.our_txids)}") + print(f" Addresses derived: {len(addr_map)}") + if FINDING_COUNT == 0 and WARN_COUNT == 0: + print(f" {G}✓ No privacy 
issues detected.{RST}") + print(f"{'═'*78}\n") + + # Cleanup + if not args.wallet and not args.keep_scan_wallet: + try: + cli("unloadwallet", "_detect_scan") + except Exception: + pass + + +if __name__ == "__main__": + main() diff --git a/backend/script/mine_blocks.sh b/backend/script/mine_blocks.sh new file mode 100755 index 0000000..4253aba --- /dev/null +++ b/backend/script/mine_blocks.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash +# mine_blocks.sh — Mine N blocks on the custom Signet +set -euo pipefail + +N="${1:-1}" +source "$HOME/.bitcoin/signet_keys.env" + +MINER="/home/renato/Desktop/bitcoin/bitcoin/contrib/signet/miner" +GRIND="bitcoin-util grind" +CLI="bitcoin-cli -signet" + +CURRENT=$($CLI getblockcount) +TARGET=$((CURRENT + N)) +echo "Mining $N blocks (from $CURRENT to $TARGET)..." + +BLOCK_TIME=$(date +%s) +for i in $(seq 1 $N); do + BLOCK_TIME=$((BLOCK_TIME + 1)) + $MINER \ + --cli="bitcoin-cli -rpcwallet=miner" \ + generate \ + --grind-cmd="$GRIND" \ + --address="$MINER_ADDR" \ + --min-nbits \ + --set-block-time="$BLOCK_TIME" \ + 2>&1 >/dev/null +done +echo "Done. Block height: $($CLI getblockcount)" diff --git a/backend/script/openconf.sh b/backend/script/openconf.sh new file mode 100755 index 0000000..04deef3 --- /dev/null +++ b/backend/script/openconf.sh @@ -0,0 +1 @@ +code ~/.bitcoin/bitcoin.conf \ No newline at end of file diff --git a/backend/script/reproduce.py b/backend/script/reproduce.py new file mode 100644 index 0000000..eeb1449 --- /dev/null +++ b/backend/script/reproduce.py @@ -0,0 +1,418 @@ +#!/usr/bin/env python3 +""" +reproduce.py +============ +Reproduces 12 Bitcoin privacy vulnerabilities on a local custom Signet. +Each run creates NEW on-chain transactions that exhibit the vulnerability. +No detection logic — that lives in detect.py. 
+ +Usage: + python3 reproduce.py # Create all 12 vulnerability scenarios + python3 reproduce.py -k 3 # Create only vulnerability 3 +""" + +import sys +import os +import json +import time + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from bitcoin_rpc import ( + cli, mine_blocks, get_tx, get_utxos, get_balance, + get_new_address, send_to_address, create_raw_tx, sign_raw_tx, + send_raw, get_block_count, create_funded_psbt, + process_psbt, finalize_psbt, +) + +# ═══════════════════════════════════════════════════════════════════════════════ +# Formatting helpers +# ═══════════════════════════════════════════════════════════════════════════════ +G = "\033[92m"; Y = "\033[93m"; C = "\033[96m"; B = "\033[1m"; R = "\033[0m" + +def header(num, title): + print(f"\n{'═'*78}") + print(f"{B}{C} REPRODUCE {num}: {title}{R}") + print(f"{'═'*78}") + +def ok(msg): + print(f" {G}✓{R} {msg}") + +def info(msg): + print(f" {Y}ℹ{R} {msg}") + +def ensure_funds(wallet, min_btc=0.5): + bal = get_balance(wallet) + if bal < min_btc: + addr = get_new_address(wallet, "bech32") + send_to_address("miner", addr, min_btc + 0.5) + mine_blocks(1) + +def mine_and_confirm(): + mine_blocks(1) + time.sleep(0.5) + +# ═══════════════════════════════════════════════════════════════════════════════ +# 1. Address Reuse +# ═══════════════════════════════════════════════════════════════════════════════ +def reproduce_01(): + header(1, "Address Reuse") + ensure_funds("bob", 1.0) + reused_addr = get_new_address("alice", "bech32") + txid1 = send_to_address("bob", reused_addr, 0.01) + txid2 = send_to_address("bob", reused_addr, 0.02) + mine_and_confirm() + ok(f"Sent to same address {reused_addr} twice: TX {txid1[:16]}… and {txid2[:16]}…") + +# ═══════════════════════════════════════════════════════════════════════════════ +# 2. 
Multi-input / CIOH +# ═══════════════════════════════════════════════════════════════════════════════ +def reproduce_02(): + header(2, "Multi-input / CIOH (Common Input Ownership Heuristic)") + ensure_funds("bob", 2.0) + for _ in range(5): + addr = get_new_address("alice", "bech32") + send_to_address("bob", addr, 0.005) + mine_and_confirm() + + utxos = get_utxos("alice", 1) + small = [u for u in utxos if 0.004 < u["amount"] < 0.006][:5] + if len(small) < 2: + info("Not enough small UTXOs, skipping consolidation step") + return + inputs = [{"txid": u["txid"], "vout": u["vout"]} for u in small] + dest = get_new_address("bob", "bech32") + total = sum(u["amount"] for u in small) + psbt_result = create_funded_psbt( + "alice", inputs, [{dest: round(total - 0.001, 8)}], + {"subtractFeeFromOutputs": [0], "add_inputs": False} + ) + signed = process_psbt("alice", psbt_result["psbt"]) + final = finalize_psbt(signed["psbt"]) + txid = send_raw(final["hex"]) + mine_and_confirm() + ok(f"Consolidated {len(small)} inputs in TX {txid[:16]}… (CIOH trigger)") + +# ═══════════════════════════════════════════════════════════════════════════════ +# 3. 
Dust UTXO Detection +# ═══════════════════════════════════════════════════════════════════════════════ +def reproduce_03(): + header(3, "Dust UTXO Detection") + ensure_funds("bob", 1.0) + dust1 = get_new_address("alice", "bech32") + dust2 = get_new_address("alice", "bech32") + bob_utxos = get_utxos("bob", 1) + big = max(bob_utxos, key=lambda u: u["amount"]) + change = get_new_address("bob", "bech32") + change_amt = round(big["amount"] - 0.00001000 - 0.00000546 - 0.0001, 8) + raw = create_raw_tx( + [{"txid": big["txid"], "vout": big["vout"]}], + [{dust1: 0.00001000}, {dust2: 0.00000546}, {change: change_amt}] + ) + signed = sign_raw_tx("bob", raw) + txid = send_raw(signed["hex"]) + mine_and_confirm() + ok(f"Created 1000-sat and 546-sat dust outputs to Alice in TX {txid[:16]}…") + +# ═══════════════════════════════════════════════════════════════════════════════ +# 4. Spending Dust with Normal Inputs +# ═══════════════════════════════════════════════════════════════════════════════ +def reproduce_04(): + header(4, "Spending Dust with Normal Inputs") + ensure_funds("alice", 0.5) + utxos = get_utxos("alice", 1) + dust_utxos = [u for u in utxos if u["amount"] <= 0.00001] + normal_utxos = [u for u in utxos if u["amount"] > 0.001] + + if not dust_utxos: + info("No dust UTXOs, creating one first…") + ensure_funds("bob", 1.0) + a = get_new_address("alice", "bech32") + bu = get_utxos("bob", 1) + big = max(bu, key=lambda u: u["amount"]) + ch = get_new_address("bob", "bech32") + raw = create_raw_tx( + [{"txid": big["txid"], "vout": big["vout"]}], + [{a: 0.00001000}, {ch: round(big["amount"] - 0.00001 - 0.0001, 8)}] + ) + signed = sign_raw_tx("bob", raw) + send_raw(signed["hex"]) + mine_and_confirm() + utxos = get_utxos("alice", 1) + dust_utxos = [u for u in utxos if u["amount"] <= 0.00001] + normal_utxos = [u for u in utxos if u["amount"] > 0.001] + + if not normal_utxos: + ensure_funds("alice", 0.5) + mine_and_confirm() + utxos = get_utxos("alice", 1) + normal_utxos = [u for 
u in utxos if u["amount"] > 0.001] + + dust = dust_utxos[0] + normal = normal_utxos[0] + dest = get_new_address("bob", "bech32") + total = dust["amount"] + normal["amount"] + raw = create_raw_tx( + [{"txid": dust["txid"], "vout": dust["vout"]}, + {"txid": normal["txid"], "vout": normal["vout"]}], + [{dest: round(total - 0.0001, 8)}] + ) + signed = sign_raw_tx("alice", raw) + txid = send_raw(signed["hex"]) + mine_and_confirm() + ok(f"Spent dust ({int(dust['amount']*1e8)} sats) + normal ({normal['amount']:.8f}) together in TX {txid[:16]}…") + +# ═══════════════════════════════════════════════════════════════════════════════ +# 5. Change Detection +# ═══════════════════════════════════════════════════════════════════════════════ +def reproduce_05(): + header(5, "Change Detection — Round Payment") + ensure_funds("alice", 1.0) + bob_addr = get_new_address("bob", "bech32") + txid = send_to_address("alice", bob_addr, 0.05) + mine_and_confirm() + ok(f"Alice paid Bob 0.05 BTC (round amount) in TX {txid[:16]}… — change output is obvious") + +# ═══════════════════════════════════════════════════════════════════════════════ +# 6. 
# Consolidation Origin
# ═══════════════════════════════════════════════════════════════════════════════
def reproduce_06():
    """Consolidate several small UTXOs into one, then spend the result onward."""
    header(6, "Consolidation Origin")
    ensure_funds("bob", 2.0)
    for _ in range(4):
        send_to_address("bob", get_new_address("alice", "bech32"), 0.003)
    mine_and_confirm()

    def pick_small():
        # Up to 4 UTXOs in the 0.002–0.004 BTC band
        return [u for u in get_utxos("alice", 1) if 0.002 < u["amount"] < 0.004][:4]

    small = pick_small()
    if len(small) < 3:
        info(f"Only {len(small)} small UTXOs, creating more…")
        for _ in range(4):
            send_to_address("bob", get_new_address("alice", "bech32"), 0.003)
        mine_and_confirm()
        small = pick_small()

    inputs = [{"txid": u["txid"], "vout": u["vout"]} for u in small]
    consol_addr = get_new_address("alice", "bech32")
    total = sum(u["amount"] for u in small)
    raw = create_raw_tx(inputs, [{consol_addr: round(total - 0.0001, 8)}])
    signed = sign_raw_tx("alice", raw)
    consol_txid = send_raw(signed["hex"])
    mine_and_confirm()
    ok(f"Consolidated {len(small)} UTXOs → 1 in TX {consol_txid[:16]}…")

    # Now spend the consolidated output
    consolidated = [u for u in get_utxos("alice", 1) if u["txid"] == consol_txid]
    if consolidated:
        spend = consolidated[0]
        dest = get_new_address("carol", "bech32")
        raw = create_raw_tx(
            [{"txid": spend["txid"], "vout": spend["vout"]}],
            [{dest: round(spend["amount"] - 0.0001, 8)}]
        )
        signed = sign_raw_tx("alice", raw)
        txid2 = send_raw(signed["hex"])
        mine_and_confirm()
        ok(f"Spent consolidated UTXO in TX {txid2[:16]}… — carries full cluster history")

# ═══════════════════════════════════════════════════════════════════════════════
# 7. Script Type Mixing
# ═══════════════════════════════════════════════════════════════════════════════
def reproduce_07():
    """Spend a P2WPKH input and a P2TR input in the same transaction."""
    header(7, "Script Type Mixing")
    ensure_funds("bob", 2.0)
    wpkh = get_new_address("alice", "bech32")
    tr = get_new_address("alice", "bech32m")
    send_to_address("bob", wpkh, 0.005)
    send_to_address("bob", tr, 0.005)
    mine_and_confirm()

    utxos = get_utxos("alice", 1)
    taproot_prefixes = ("tb1p", "bc1p", "bcrt1p")
    segwit_v0_prefixes = ("tb1q", "bc1q", "bcrt1q")

    def is_wpkh(addr):
        return bool(addr) and not addr.startswith(taproot_prefixes) and addr.startswith(segwit_v0_prefixes)

    def is_tr(addr):
        return bool(addr) and addr.startswith(taproot_prefixes)

    wu = next((u for u in utxos if is_wpkh(u.get("address", "")) and u["amount"] >= 0.004), None)
    tu = next((u for u in utxos if is_tr(u.get("address", "")) and u["amount"] >= 0.004), None)
    if wu is None or tu is None:
        info("Could not find both UTXO types")
        return
    dest = get_new_address("bob", "bech32")
    total = wu["amount"] + tu["amount"]
    raw = create_raw_tx(
        [{"txid": wu["txid"], "vout": wu["vout"]},
         {"txid": tu["txid"], "vout": tu["vout"]}],
        [{dest: round(total - 0.0002, 8)}]
    )
    signed = sign_raw_tx("alice", raw)
    txid = send_raw(signed["hex"])
    mine_and_confirm()
    ok(f"Mixed P2WPKH + P2TR inputs in TX {txid[:16]}… — script type fingerprint")

# ═══════════════════════════════════════════════════════════════════════════════
# 8.
# Cluster Merge
# ═══════════════════════════════════════════════════════════════════════════════
def reproduce_08():
    """Merge UTXOs originating from two unrelated clusters (Bob and Carol)."""
    header(8, "Cluster Merge")
    ensure_funds("bob", 2.0)
    ensure_funds("carol", 2.0)
    a_addr = get_new_address("alice", "bech32")
    b_addr = get_new_address("alice", "bech32")
    txid_a = send_to_address("bob", a_addr, 0.004)
    txid_b = send_to_address("carol", b_addr, 0.004)
    mine_and_confirm()

    utxos = get_utxos("alice", 1)

    def locate(txid, addr):
        # Prefer a txid match; fall back to the receive address.
        hit = next((u for u in utxos if u["txid"] == txid), None)
        if hit is None:
            hit = next((u for u in utxos if u.get("address") == addr), None)
        return hit

    ua = locate(txid_a, a_addr)
    ub = locate(txid_b, b_addr)
    if ua is None or ub is None:
        info("Could not find both cluster UTXOs")
        return
    dest = get_new_address("bob", "bech32")
    total = ua["amount"] + ub["amount"]
    raw = create_raw_tx(
        [{"txid": ua["txid"], "vout": ua["vout"]},
         {"txid": ub["txid"], "vout": ub["vout"]}],
        [{dest: round(total - 0.0002, 8)}]
    )
    signed = sign_raw_tx("alice", raw)
    txid = send_raw(signed["hex"])
    mine_and_confirm()
    ok(f"Merged Bob-cluster and Carol-cluster UTXOs in TX {txid[:16]}…")

# ═══════════════════════════════════════════════════════════════════════════════
# 9. Lookback Depth
# ═══════════════════════════════════════════════════════════════════════════════
def reproduce_09():
    """Give Alice one well-aged UTXO and one brand-new UTXO."""
    header(9, "Lookback Depth / UTXO Age")
    old_addr = get_new_address("alice", "bech32")
    send_to_address("miner", old_addr, 0.01)
    mine_blocks(20)
    new_addr = get_new_address("alice", "bech32")
    send_to_address("miner", new_addr, 0.01)
    mine_and_confirm()
    ok(f"Created old UTXO (20+ blocks ago) and new UTXO (just now) for Alice")

# ═══════════════════════════════════════════════════════════════════════════════
# 10. Exchange Origin
# ═══════════════════════════════════════════════════════════════════════════════
def reproduce_10():
    """Simulate an exchange batch withdrawal paying eight recipients at once."""
    header(10, "Exchange Origin — Batch Withdrawal")
    ensure_funds("exchange", 5.0)
    recipients = ["alice", "bob", "carol", "alice", "bob", "carol", "alice", "bob"]
    batch = {}
    for i, wallet_name in enumerate(recipients):
        batch[get_new_address(wallet_name, "bech32")] = round(0.01 + i * 0.001, 8)
    txid = cli("sendmany", "", json.dumps(batch), wallet="exchange")
    mine_and_confirm()
    ok(f"Exchange batch withdrawal to 8 recipients in TX {txid[:16]}…")

# ═══════════════════════════════════════════════════════════════════════════════
# 11. Tainted UTXOs
# ═══════════════════════════════════════════════════════════════════════════════
def reproduce_11():
    """Spend a risky-origin UTXO together with a clean one — taint propagation."""
    header(11, "Tainted UTXOs / Dirty Money")
    ensure_funds("risky", 2.0)
    ensure_funds("bob", 1.0)
    ta = get_new_address("alice", "bech32")
    taint_txid = send_to_address("risky", ta, 0.01)
    ca = get_new_address("alice", "bech32")
    clean_txid = send_to_address("bob", ca, 0.01)
    mine_and_confirm()

    utxos = get_utxos("alice", 1)

    def locate(txid, addr):
        # Prefer a txid match; fall back to the receive address.
        hit = next((u for u in utxos if u["txid"] == txid), None)
        if hit is None:
            hit = next((u for u in utxos if u.get("address") == addr), None)
        return hit

    tu = locate(taint_txid, ta)
    cu = locate(clean_txid, ca)
    if tu is None or cu is None:
        info("Could not locate tainted + clean UTXOs")
        return
    dest = get_new_address("carol", "bech32")
    total = tu["amount"] + cu["amount"]
    raw = create_raw_tx(
        [{"txid": tu["txid"], "vout": tu["vout"]},
         {"txid": cu["txid"], "vout": cu["vout"]}],
        [{dest: round(total - 0.0002, 8)}]
    )
    signed = sign_raw_tx("alice", raw)
    txid = send_raw(signed["hex"])
    mine_and_confirm()
    ok(f"Merged tainted + clean UTXOs in TX {txid[:16]}… — taint propagation")

# ═══════════════════════════════════════════════════════════════════════════════
# 12.
# Behavioral Fingerprinting
# ═══════════════════════════════════════════════════════════════════════════════
def reproduce_12():
    """Create contrasting, recognizable spending patterns for Alice and Bob."""
    header(12, "Behavioral Fingerprinting")
    ensure_funds("alice", 3.0)
    ensure_funds("bob", 3.0)

    info("Alice's pattern: round amounts, always bech32…")
    for i in range(5):
        send_to_address("alice", get_new_address("carol", "bech32"), 0.01 * (i + 1))

    mine_and_confirm()

    info("Bob's pattern: odd amounts, mixed address types…")
    for i in range(5):
        addr_type = "bech32m" if i % 2 == 0 else "bech32"
        dest = get_new_address("carol", addr_type)
        send_to_address("bob", dest, round(0.00723 * (i + 1) + 0.00011, 8))

    mine_and_confirm()
    ok("Created distinguishable behavioral patterns for Alice and Bob")


# ═══════════════════════════════════════════════════════════════════════════════
# Main
# ═══════════════════════════════════════════════════════════════════════════════
# (number, display name, callable) — order defines execution order
ALL = [
    (1, "Address Reuse", reproduce_01),
    (2, "Multi-input / CIOH", reproduce_02),
    (3, "Dust UTXO Detection", reproduce_03),
    (4, "Dust Spending w/ Normal", reproduce_04),
    (5, "Change Detection", reproduce_05),
    (6, "Consolidation Origin", reproduce_06),
    (7, "Script Type Mixing", reproduce_07),
    (8, "Cluster Merge", reproduce_08),
    (9, "Lookback Depth", reproduce_09),
    (10, "Exchange Origin", reproduce_10),
    (11, "Tainted UTXOs", reproduce_11),
    (12, "Behavioral Fingerprint", reproduce_12),
]

def main():
    """Run every scenario, or only the one selected with `-k <num>`."""
    filt = None
    if "-k" in sys.argv:
        idx = sys.argv.index("-k")
        if idx + 1 < len(sys.argv):
            filt = sys.argv[idx + 1]

    print(f"\n{B}{'═'*78}{R}")
    print(f"{B}{C} REPRODUCE — Bitcoin Privacy Vulnerabilities{R}")
    print(f"{B}{C} Custom Signet — {get_block_count()} blocks{R}")
    print(f"{B}{'═'*78}{R}")

    for num, name, fn in ALL:
        if filt and str(num) != filt:
            continue
        try:
            fn()
        except Exception as e:
            print(f" \033[91m✗ ERROR in {name}: {e}\033[0m")
            import traceback; traceback.print_exc()

print(f"\n{B}{'═'*78}{R}") + print(f" {G}Done. All vulnerability scenarios have been created on-chain.{R}") + print(f" Now run: python3 detect.py ") + print(f"{B}{'═'*78}{R}\n") + +if __name__ == "__main__": + main() diff --git a/backend/script/run_all.sh b/backend/script/run_all.sh new file mode 100755 index 0000000..a88533d --- /dev/null +++ b/backend/script/run_all.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash +# run_all.sh — Setup custom signet and run all 12 vulnerability tests +set -euo pipefail + +cd "$(dirname "$0")" + +echo "╔══════════════════════════════════════════════════════════════╗" +echo "║ Bitcoin Privacy Vulnerability Suite — Full Run ║" +echo "╚══════════════════════════════════════════════════════════════╝" +echo "" + +# Step 1: Setup signet (if not already running) +if bitcoin-cli -signet getblockchaininfo &>/dev/null; then + HEIGHT=$(bitcoin-cli -signet getblockcount) + echo "✓ Custom Signet already running at block $HEIGHT" + + # Check wallets exist + WALLETS=$(bitcoin-cli -signet listwallets 2>/dev/null) + if echo "$WALLETS" | grep -q "alice"; then + echo "✓ Wallets already created" + else + echo "⚠ Wallets not found. Running setup..." + bash setup_signet.sh + fi +else + echo "Starting custom Signet setup..." + bash setup_signet.sh +fi + +echo "" +echo "Running vulnerability tests..." 
+echo "" + +# Step 2: Run all tests +python3 test_vulnerabilities.py "$@" diff --git a/backend/script/setup_signet.sh b/backend/script/setup_signet.sh new file mode 100755 index 0000000..80d7c5c --- /dev/null +++ b/backend/script/setup_signet.sh @@ -0,0 +1,294 @@ +#!/usr/bin/env bash +# ============================================================================= +# setup_signet.sh — Bootstrap a private custom Signet for vulnerability testing +# ============================================================================= +set -euo pipefail + +DATADIR="$HOME/.bitcoin" +SIGNET_DIR="$DATADIR/signet" +MINER="/home/renato/Desktop/bitcoin/bitcoin/contrib/signet/miner" +GRIND="bitcoin-util grind" +CLI="bitcoin-cli" + +echo "============================================" +echo " STEP 0: Cleanup previous state" +echo "============================================" +bitcoin-cli stop 2>/dev/null || true +bitcoin-cli -signet stop 2>/dev/null || true +sleep 3 + +# Remove old signet data but keep blocks/chainstate for mainnet untouched +rm -rf "$SIGNET_DIR" +rm -f "$DATADIR/bitcoin.conf" + +echo "============================================" +echo " STEP 1: Generate Signet challenge key" +echo "============================================" +rm -f "$DATADIR/bitcoin.conf" + +# Generate key pair using Python + bitcoin-cli (no wallet needed) +KEYPAIR=$(python3 -c " +import hashlib, os, struct + +# Generate a random 32-byte private key +privkey_bytes = os.urandom(32) + +# secp256k1 parameters +P = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F +A = 0 +B = 7 +Gx = 0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798 +Gy = 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8 +N = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 + +def modinv(a, m): + g, x, _ = extended_gcd(a % m, m) + return x % m + +def extended_gcd(a, b): + if a == 0: + return b, 0, 1 + g, x, y = extended_gcd(b % a, a) + return g, y - (b // a) * x, x + 
+def point_add(p1, p2): + if p1 is None: return p2 + if p2 is None: return p1 + x1, y1 = p1 + x2, y2 = p2 + if x1 == x2 and y1 != y2: + return None + if x1 == x2: + lam = (3 * x1 * x1 + A) * modinv(2 * y1, P) % P + else: + lam = (y2 - y1) * modinv(x2 - x1, P) % P + x3 = (lam * lam - x1 - x2) % P + y3 = (lam * (x1 - x3) - y1) % P + return (x3, y3) + +def scalar_mult(k, point): + result = None + addend = point + while k: + if k & 1: + result = point_add(result, addend) + addend = point_add(addend, addend) + k >>= 1 + return result + +privkey_int = int.from_bytes(privkey_bytes, 'big') % N +if privkey_int == 0: + privkey_int = 1 +privkey_bytes = privkey_int.to_bytes(32, 'big') + +pub = scalar_mult(privkey_int, (Gx, Gy)) +pubkey_bytes = b'\x02' + pub[0].to_bytes(32, 'big') if pub[1] % 2 == 0 else b'\x03' + pub[0].to_bytes(32, 'big') + +# WIF encode (testnet/signet = 0xEF prefix) +wif_payload = b'\xef' + privkey_bytes + b'\x01' # compressed +checksum = hashlib.sha256(hashlib.sha256(wif_payload).digest()).digest()[:4] +import base64 +# base58 encoding +ALPHABET = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' +num = int.from_bytes(wif_payload + checksum, 'big') +b58 = '' +while num > 0: + num, rem = divmod(num, 58) + b58 = ALPHABET[rem] + b58 +for byte in (wif_payload + checksum): + if byte == 0: + b58 = '1' + b58 + else: + break + +print(f'{b58} {pubkey_bytes.hex()}') +") + +PRIVKEY=$(echo "$KEYPAIR" | awk '{print $1}') +PUBKEY=$(echo "$KEYPAIR" | awk '{print $2}') + +# Build 1-of-1 multisig signet challenge: OP_1 <33-byte pubkey> OP_1 OP_CHECKMULTISIG +SIGNETCHALLENGE="5121${PUBKEY}51ae" + +echo "" +echo ">>> Private key (WIF): $PRIVKEY" +echo ">>> Public key: $PUBKEY" +echo ">>> Signet challenge: $SIGNETCHALLENGE" +echo "" + +# Save keys for later use +cat > "$DATADIR/signet_keys.env" < "$DATADIR/bitcoin.conf" </dev/null 2>&1; then + echo " Signet node ready" + break + fi + echo " Waiting for node to start... 
($i)" + sleep 2 +done + +echo "Node info:" +$CLI -signet getblockchaininfo | python3 -c " +import sys, json +d = json.load(sys.stdin) +print(f' Chain: {d[\"chain\"]}') +print(f' Blocks: {d[\"blocks\"]}') +" + +echo "" +echo "============================================" +echo " STEP 4: Create wallets" +echo "============================================" +WALLETS=("miner" "alice" "bob" "carol" "exchange" "risky") + +for w in "${WALLETS[@]}"; do + echo -n " Creating wallet '$w'... " + $CLI -signet -named createwallet wallet_name="$w" descriptors=true 2>&1 | grep -o '"name": "[^"]*"' || echo "(exists or loaded)" +done + +echo " Loaded wallets:" +$CLI -signet listwallets + +echo "" +echo "============================================" +echo " STEP 5: Import Signet challenge key into miner wallet" +echo "============================================" +# For descriptor wallets, we need to import with the PRIVATE key in the descriptor +# Import both combo (for general key use) and multi(1,...) (for signet challenge signing) +COMBO_INFO=$($CLI -signet getdescriptorinfo "combo($PRIVKEY)") +COMBO_CHECKSUM=$(echo "$COMBO_INFO" | python3 -c "import sys,json; print(json.load(sys.stdin)['checksum'])") +COMBO_DESC="combo($PRIVKEY)#$COMBO_CHECKSUM" + +MULTI_INFO=$($CLI -signet getdescriptorinfo "multi(1,$PRIVKEY)") +MULTI_CHECKSUM=$(echo "$MULTI_INFO" | python3 -c "import sys,json; print(json.load(sys.stdin)['checksum'])") +MULTI_DESC="multi(1,$PRIVKEY)#$MULTI_CHECKSUM" + +echo " Importing combo and multi(1,...) descriptors..." 
+ +$CLI -signet -rpcwallet=miner importdescriptors "[{\"desc\": \"$COMBO_DESC\", \"timestamp\": \"now\"}, {\"desc\": \"$MULTI_DESC\", \"timestamp\": \"now\"}]" | python3 -c " +import sys, json +r = json.load(sys.stdin) +for i, item in enumerate(r): + s = item.get('success', False) + print(f' Import {i+1} success: {s}') + if not s: + print(f' Error: {item.get(\"error\", \"unknown\")}') +" + +echo "" +echo "============================================" +echo " STEP 6: Mine initial blocks (110 blocks)" +echo "============================================" + +MINER_ADDR=$($CLI -signet -rpcwallet=miner getnewaddress "" bech32) +echo " Mining to: $MINER_ADDR" + +echo " Mining 110 blocks (this may take a minute)..." +BLOCK_TIME=$(date +%s) +for i in $(seq 1 110); do + BLOCK_TIME=$((BLOCK_TIME + 1)) + $MINER \ + --cli="$CLI -rpcwallet=miner" \ + generate \ + --grind-cmd="$GRIND" \ + --address="$MINER_ADDR" \ + --min-nbits \ + --set-block-time="$BLOCK_TIME" \ + 2>&1 >/dev/null + if [ $((i % 10)) -eq 0 ]; then + echo " Mined block $i / 110" + fi +done + +HEIGHT=$($CLI -signet getblockcount) +echo " Block height: $HEIGHT" + +echo "" +echo "============================================" +echo " STEP 7: Fund wallets" +echo "============================================" + +MINER_BAL=$($CLI -signet -rpcwallet=miner getbalance) +echo " Miner balance: $MINER_BAL BTC" + +# Fund each wallet +for w in alice bob carol exchange risky; do + ADDR=$($CLI -signet -rpcwallet=$w getnewaddress "" bech32) + TXID=$($CLI -signet -rpcwallet=miner sendtoaddress "$ADDR" 10.0) + echo " Funded $w with 10 BTC (txid: ${TXID:0:16}...)" +done + +echo " Mining 6 more blocks to confirm funding..." 
+BLOCK_TIME=$(($(date +%s) + 200)) +for i in $(seq 1 6); do + BLOCK_TIME=$((BLOCK_TIME + 1)) + $MINER \ + --cli="$CLI -rpcwallet=miner" \ + generate \ + --grind-cmd="$GRIND" \ + --address="$MINER_ADDR" \ + --min-nbits \ + --set-block-time="$BLOCK_TIME" \ + 2>&1 >/dev/null +done + +echo "" +echo " Final balances:" +for w in miner alice bob carol exchange risky; do + BAL=$($CLI -signet -rpcwallet=$w getbalance) + echo " $w: $BAL BTC" +done + +echo "" +echo "============================================" +echo " SETUP COMPLETE" +echo "============================================" +echo "" +echo " Custom Signet is running with:" +echo " - 6 wallets (miner, alice, bob, carol, exchange, risky)" +echo " - Each funded with 10 BTC" +echo " - txindex=1 for historical lookups" +echo " - acceptnonstdtxn=1 for dust experiments" +echo "" +echo " Signet keys saved to: $DATADIR/signet_keys.env" +echo " To mine more blocks, use: ./mine_blocks.sh " +echo "" + +# Save mining address for later mining +echo "MINER_ADDR=$MINER_ADDR" >> "$DATADIR/signet_keys.env" diff --git a/backend/script/test_vulnerabilities.py b/backend/script/test_vulnerabilities.py new file mode 100644 index 0000000..1993142 --- /dev/null +++ b/backend/script/test_vulnerabilities.py @@ -0,0 +1,1079 @@ +#!/usr/bin/env python3 +""" +test_vulnerabilities.py +======================= +Reproduces and verifies 12 Bitcoin privacy vulnerabilities on a local custom Signet. + +Each test: + 1. Creates the vulnerability scenario using real Bitcoin transactions + 2. Analyzes the on-chain data to DETECT the vulnerability + 3. 
Asserts the detection is correct (proving the vulnerability exists)

Usage:
    python3 test_vulnerabilities.py        # Run all tests
    python3 test_vulnerabilities.py -k 1   # Run test for vulnerability 1
"""

import sys
import os
import json
import time
import math
from collections import defaultdict

# Add project dir to path so the sibling bitcoin_rpc module imports
# regardless of the current working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from bitcoin_rpc import (
    cli, mine_blocks, get_tx, get_utxos, get_balance,
    get_new_address, send_to_address, create_raw_tx, sign_raw_tx,
    send_raw, decode_raw_tx, get_block_count, create_funded_psbt,
    process_psbt, finalize_psbt,
)

# ═══════════════════════════════════════════════════════════════════════════════
# ANSI colors for output
# ═══════════════════════════════════════════════════════════════════════════════
GREEN = "\033[92m"
RED = "\033[91m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
BOLD = "\033[1m"
RESET = "\033[0m"

# Global pass/fail counters, mutated only by check().
PASS_COUNT = 0
FAIL_COUNT = 0


def header(num, title):
    """Print a banner announcing vulnerability test number `num` with `title`."""
    print(f"\n{'═'*78}")
    print(f"{BOLD}{CYAN} VULNERABILITY {num}: {title}{RESET}")
    print(f"{'═'*78}")


def check(condition, msg):
    """Record and print a PASS/FAIL line for `condition`; return `condition`.

    Increments the module-level PASS_COUNT/FAIL_COUNT so the suite can
    summarize results at exit.
    """
    global PASS_COUNT, FAIL_COUNT
    if condition:
        PASS_COUNT += 1
        print(f" {GREEN}✓ PASS:{RESET} {msg}")
    else:
        FAIL_COUNT += 1
        print(f" {RED}✗ FAIL:{RESET} {msg}")
    return condition


def info(msg):
    """Print an informational (non-assertive) line."""
    print(f" {YELLOW}ℹ{RESET} {msg}")


def ensure_funds(wallet, min_btc=0.5):
    """Ensure wallet has at least min_btc, fund from miner if needed."""
    bal = get_balance(wallet)
    if bal < min_btc:
        addr = get_new_address(wallet, "bech32")
        # Top up slightly above the minimum so fees don't dip us back under.
        send_to_address("miner", addr, min_btc + 0.1)
        mine_blocks(1)


def mine_and_confirm():
    """Mine 1 block to confirm pending transactions."""
    mine_blocks(1)
    # Brief pause so bitcoind's wallet state catches up with the new block.
    time.sleep(1)
# ═══════════════════════════════════════════════════════════════════════════════
# VULNERABILITY 1: Address Reuse (Reutilização de endereços)
# ═══════════════════════════════════════════════════════════════════════════════
def test_01_address_reuse():
    """Reproduce and detect address reuse: one Alice address is paid twice."""
    header(1, "Address Reuse (Reutilização de endereços)")

    ensure_funds("bob", 1.0)

    # REPRODUCE: Generate ONE address for Alice, receive payments multiple times
    reused_addr = get_new_address("alice", "bech32")
    info(f"Alice's reused address: {reused_addr}")

    txid1 = send_to_address("bob", reused_addr, 0.01)
    txid2 = send_to_address("bob", reused_addr, 0.02)
    info(f"TX1: {txid1[:16]}... (0.01 BTC)")
    info(f"TX2: {txid2[:16]}... (0.02 BTC)")

    mine_and_confirm()

    # DETECT: Find the same address appearing as output in multiple transactions
    tx1 = get_tx(txid1)
    tx2 = get_tx(txid2)

    addr_occurrences = defaultdict(list)
    for tx_data, txid in [(tx1, txid1), (tx2, txid2)]:
        for vout in tx_data["vout"]:
            addr = vout.get("scriptPubKey", {}).get("address", "")
            if addr:
                addr_occurrences[addr].append(txid)

    # Check: reused_addr appears in outputs of BOTH transactions
    reuse_count = len(addr_occurrences.get(reused_addr, []))
    check(reuse_count >= 2,
          f"Address {reused_addr[:20]}... found in {reuse_count} distinct transactions (need ≥2)")

    # Show the privacy impact
    info(f"PRIVACY IMPACT: An observer can link TX1 and TX2 to the same entity")
    info(f" because the same address {reused_addr[:20]}... receives funds in both")

    return True


# ═══════════════════════════════════════════════════════════════════════════════
# VULNERABILITY 2: Multi-input Transactions (Consolidation / CIOH)
# ═══════════════════════════════════════════════════════════════════════════════
def test_02_consolidation_cioh():
    """Reproduce the common-input-ownership heuristic via a 5-in-1 consolidation."""
    header(2, "Multi-input Transactions (Consolidation / CIOH)")

    ensure_funds("bob", 2.0)

    # REPRODUCE: Create 5 separate UTXOs for Alice, then spend them all at once
    alice_addrs = []
    for i in range(5):
        addr = get_new_address("alice", "bech32")
        send_to_address("bob", addr, 0.005)
        alice_addrs.append(addr)
        info(f"UTXO {i+1}: 0.005 BTC -> {addr[:20]}...")

    mine_and_confirm()

    # Select all Alice's UTXOs explicitly
    utxos = get_utxos("alice", 1)
    small_utxos = [u for u in utxos if 0.004 < u["amount"] < 0.006]
    info(f"Found {len(small_utxos)} small UTXOs to consolidate")

    # Build consolidation TX using PSBT
    inputs = [{"txid": u["txid"], "vout": u["vout"]} for u in small_utxos[:5]]
    dest_addr = get_new_address("bob", "bech32")

    total_input = sum(u["amount"] for u in small_utxos[:5])
    send_amount = round(total_input - 0.001, 8)  # leave fee

    # add_inputs=False pins the input set so the wallet cannot add extras;
    # subtractFeeFromOutputs keeps the amounts consistent with the fee.
    psbt_result = create_funded_psbt(
        "alice",
        inputs,
        [{dest_addr: send_amount}],
        {"subtractFeeFromOutputs": [0], "add_inputs": False}
    )
    psbt = psbt_result["psbt"]
    signed = process_psbt("alice", psbt)
    final = finalize_psbt(signed["psbt"])
    txid = send_raw(final["hex"])
    info(f"Consolidation TX: {txid[:16]}...")

    mine_and_confirm()

    # DETECT: Transaction with N≥2 inputs = CIOH trigger
    tx = get_tx(txid)
    num_inputs = len(tx["vin"])
    num_outputs = len(tx["vout"])

    check(num_inputs >= 2,
          f"Transaction has {num_inputs} inputs (CIOH: all inputs assumed same owner)")
    check(num_inputs >= 3 and num_outputs <= 2,
          f"Consolidation shape: {num_inputs} inputs → {num_outputs} outputs (many→few)")

    info(f"PRIVACY IMPACT: All {num_inputs} input addresses are now linked as same entity")
    for vin in tx["vin"]:
        parent_tx = get_tx(vin["txid"])
        addr = parent_tx["vout"][vin["vout"]]["scriptPubKey"].get("address", "?")
        info(f" Linked address: {addr[:25]}...")

    return True


# ═══════════════════════════════════════════════════════════════════════════════
# VULNERABILITY 3: Dust UTXO Detection (Detecção de UTXOs dust)
# ═══════════════════════════════════════════════════════════════════════════════
def test_03_dust_detection():
    """Create dust-sized outputs with a raw tx, then detect them by value scan."""
    header(3, "Dust UTXO Detection (Detecção de UTXOs dust)")

    ensure_funds("bob", 1.0)

    # REPRODUCE: Create very small UTXOs (dust-class)
    # Standard dust threshold for P2WPKH is ~294 sats at default relay fee
    # We'll create UTXOs of 546 sats (0.00000546) and 1000 sats (0.00001000)
    alice_dust_addr1 = get_new_address("alice", "bech32")
    alice_dust_addr2 = get_new_address("alice", "bech32")
    info(f"Dust target address 1: {alice_dust_addr1[:20]}...")
    info(f"Dust target address 2: {alice_dust_addr2[:20]}...")

    # Use raw tx to create precise dust amounts
    bob_utxos = get_utxos("bob", 1)
    big_utxo = max(bob_utxos, key=lambda u: u["amount"])
    info(f"Using Bob's UTXO: {big_utxo['amount']} BTC")

    change_addr = get_new_address("bob", "bech32")
    change_amount = round(big_utxo["amount"] - 0.00001000 - 0.00000546 - 0.0001, 8)

    raw_tx = create_raw_tx(
        [{"txid": big_utxo["txid"], "vout": big_utxo["vout"]}],
        [
            {alice_dust_addr1: 0.00001000},  # 1000 sats - dust-class
            {alice_dust_addr2: 0.00000546},  # 546 sats - at dust threshold
            {change_addr: change_amount},
        ]
    )
    signed = sign_raw_tx("bob", raw_tx)
    txid = send_raw(signed["hex"])
    info(f"Dust TX: {txid[:16]}...")

    mine_and_confirm()

    # DETECT: Scan outputs for values below dust threshold
    tx = get_tx(txid)
    DUST_THRESHOLD_SATS = 1000  # Conservative: anything ≤ 1000 sats is "dust-class"
    STRICT_DUST_SATS = 546      # Bitcoin Core's strict P2WPKH dust limit

    dust_outputs = []
    for vout in tx["vout"]:
        # round() before int() avoids float truncation (e.g. 545.9999 -> 545)
        value_sats = int(round(vout["value"] * 1e8))
        if value_sats <= DUST_THRESHOLD_SATS:
            dust_outputs.append({
                "vout_n": vout["n"],
                "value_sats": value_sats,
                "address": vout["scriptPubKey"].get("address", "?"),
                "is_strict_dust": value_sats <= STRICT_DUST_SATS,
            })

    check(len(dust_outputs) >= 2,
          f"Found {len(dust_outputs)} dust outputs (≤{DUST_THRESHOLD_SATS} sats)")

    strict_dust = [d for d in dust_outputs if d["is_strict_dust"]]
    check(len(strict_dust) >= 1,
          f"Found {len(strict_dust)} outputs at/below strict dust threshold (≤{STRICT_DUST_SATS} sats)")

    for d in dust_outputs:
        info(f" Dust output #{d['vout_n']}: {d['value_sats']} sats -> {d['address'][:20]}... "
             f"({'STRICT DUST' if d['is_strict_dust'] else 'dust-class'})")

    info("PRIVACY IMPACT: Dust UTXOs can be used as tracking tokens (dust attacks)")
    return True


# ═══════════════════════════════════════════════════════════════════════════════
# VULNERABILITY 4: Spending Dust with Other Inputs
# ═══════════════════════════════════════════════════════════════════════════════
def test_04_dust_spending():
    """Spend a dust UTXO together with a normal one, proving dust-attack linkage."""
    header(4, "Spending Dust UTXOs with Other Inputs")

    ensure_funds("alice", 1.0)

    # REPRODUCE: Alice has dust UTXOs from test 3, plus normal UTXOs
    # Spend a dust UTXO together with a normal UTXO
    utxos = get_utxos("alice", 1)

    dust_utxos = [u for u in utxos if u["amount"] <= 0.00001]
    normal_utxos = [u for u in utxos if u["amount"] > 0.001]

    if not dust_utxos:
        info("No dust UTXOs found, creating one...")
        # Create a dust UTXO for alice
        ensure_funds("bob", 1.0)
        alice_addr = get_new_address("alice", "bech32")
        bob_utxos = get_utxos("bob", 1)
        big_utxo = max(bob_utxos, key=lambda u: u["amount"])
        change_addr = get_new_address("bob", "bech32")
        change_amount = round(big_utxo["amount"] - 0.00001000 - 0.0001, 8)
        raw_tx = create_raw_tx(
            [{"txid": big_utxo["txid"], "vout": big_utxo["vout"]}],
            [{alice_addr: 0.00001000}, {change_addr: change_amount}]
        )
        signed = sign_raw_tx("bob", raw_tx)
        send_raw(signed["hex"])
        mine_and_confirm()
        utxos = get_utxos("alice", 1)
        dust_utxos = [u for u in utxos if u["amount"] <= 0.00001]
        normal_utxos = [u for u in utxos if u["amount"] > 0.001]

    if not normal_utxos:
        info("No normal UTXOs found, creating one...")
        ensure_funds("alice", 0.5)
        mine_and_confirm()
        utxos = get_utxos("alice", 1)
        normal_utxos = [u for u in utxos if u["amount"] > 0.001]

    # FIX: previously dust_utxos[0]/normal_utxos[0] raised IndexError when the
    # fallback creation failed; bail out like tests 8 and 11 do instead.
    if not dust_utxos or not normal_utxos:
        info("Could not obtain both a dust UTXO and a normal UTXO")
        return False

    dust = dust_utxos[0]
    normal = normal_utxos[0]

    info(f"Dust UTXO: {dust['amount']:.8f} BTC ({int(dust['amount']*1e8)} sats)")
    info(f"Normal UTXO: {normal['amount']:.8f} BTC")

    # Spend both together
    dest_addr = get_new_address("bob", "bech32")
    total = dust["amount"] + normal["amount"]
    send_amt = round(total - 0.0001, 8)

    raw_tx = create_raw_tx(
        [
            {"txid": dust["txid"], "vout": dust["vout"]},
            {"txid": normal["txid"], "vout": normal["vout"]},
        ],
        [{dest_addr: send_amt}]
    )
    signed = sign_raw_tx("alice", raw_tx)
    txid = send_raw(signed["hex"])
    info(f"Dust-spend TX: {txid[:16]}...")

    mine_and_confirm()

    # DETECT: A tx with inputs mixing dust and non-dust
    tx = get_tx(txid)
    input_values = []
    for vin in tx["vin"]:
        parent = get_tx(vin["txid"])
        val = parent["vout"][vin["vout"]]["value"]
        input_values.append(val)

    dust_inputs = [v for v in input_values if v <= 0.00001]
    non_dust_inputs = [v for v in input_values if v > 0.001]

    check(len(dust_inputs) >= 1 and len(non_dust_inputs) >= 1,
          f"TX mixes {len(dust_inputs)} dust input(s) with {len(non_dust_inputs)} normal input(s)")

    info("PRIVACY IMPACT: Dust attack succeeds—the dust sender can now link")
    info(" Alice's normal UTXO to the dust tracking token via CIOH")
    return True


# ═══════════════════════════════════════════════════════════════════════════════
# VULNERABILITY 5: Change Detection (Detecção provável de troco)
# ═══════════════════════════════════════════════════════════════════════════════
def test_05_change_detection():
    """Pay a round amount and detect the change output by standard heuristics."""
    header(5, "Probable Change Detection (Detecção provável de troco)")

    ensure_funds("alice", 1.0)

    # REPRODUCE: Alice pays Bob a round amount; wallet auto-creates change
    bob_addr = get_new_address("bob", "bech32")
    txid = send_to_address("alice", bob_addr, 0.05)  # Round payment
    info(f"Payment TX: {txid[:16]}...")

    mine_and_confirm()

    # DETECT: Heuristic change detection
    tx = get_tx(txid)

    payment_output = None
    change_candidate = None

    for vout in tx["vout"]:
        addr = vout["scriptPubKey"].get("address", "")
        value = vout["value"]
        value_sats = int(round(value * 1e8))

        # Heuristic 1: Round amount = payment (not change)
        is_round = (value_sats % 100000 == 0) or (value_sats % 1000000 == 0)

        # Heuristic 2: Recipient address
        is_to_bob = (addr == bob_addr)

        if is_to_bob or is_round:
            payment_output = {"n": vout["n"], "value": value, "addr": addr, "round": is_round}
        else:
            change_candidate = {"n": vout["n"], "value": value, "addr": addr, "round": is_round}

    # FIX: the f-string messages were evaluated eagerly, so a None result
    # raised TypeError inside the check() call instead of recording a FAIL.
    # Build the message only when the corresponding output exists.
    payment_msg = (
        f"Payment output detected: {payment_output['value']:.8f} BTC (round={payment_output['round']})"
        if payment_output is not None else "Payment output detected: none found"
    )
    check(payment_output is not None, payment_msg)

    change_msg = (
        f"Change candidate detected: {change_candidate['value']:.8f} BTC (non-round amount)"
        if change_candidate is not None else "Change candidate detected: none found"
    )
    check(change_candidate is not None, change_msg)

    if payment_output and change_candidate:
        # Verify: change output should be the "odd" amount
        check(not change_candidate["round"],
              f"Change has non-round value ({int(change_candidate['value']*1e8)} sats) — strong change indicator")

        # Heuristic 3: Same script type as input
        input_tx = get_tx(tx["vin"][0]["txid"])
        input_type = input_tx["vout"][tx["vin"][0]["vout"]]["scriptPubKey"]["type"]
        change_type = tx["vout"][change_candidate["n"]]["scriptPubKey"]["type"]
        check(input_type == change_type,
              f"Change has same script type as input ({change_type}) — another strong indicator")

    info("PRIVACY IMPACT: Observer can distinguish payment from change,")
    info(" identifying the sender's change address and tracking their funds")
    return True
# ═══════════════════════════════════════════════════════════════════════════════
# VULNERABILITY 6: UTXOs from Prior Consolidation
# ═══════════════════════════════════════════════════════════════════════════════
def test_06_consolidation_origin():
    """Spend a consolidated output and detect its many-in/few-out parent shape."""
    header(6, "UTXOs Originating from Prior Consolidation")

    ensure_funds("bob", 2.0)

    # REPRODUCE: Step 1 - Create a consolidation transaction for Alice
    for i in range(4):
        addr = get_new_address("alice", "bech32")
        send_to_address("bob", addr, 0.003)

    mine_and_confirm()

    # Consolidate
    utxos = get_utxos("alice", 1)
    small_utxos = [u for u in utxos if 0.002 < u["amount"] < 0.004][:4]

    if len(small_utxos) < 2:
        info(f"Not enough small UTXOs ({len(small_utxos)}), creating more...")
        for i in range(4):
            addr = get_new_address("alice", "bech32")
            send_to_address("bob", addr, 0.003)
        mine_and_confirm()
        utxos = get_utxos("alice", 1)
        small_utxos = [u for u in utxos if 0.002 < u["amount"] < 0.004][:4]

    inputs = [{"txid": u["txid"], "vout": u["vout"]} for u in small_utxos]
    consolidation_addr = get_new_address("alice", "bech32")
    total = sum(u["amount"] for u in small_utxos)
    send_amt = round(total - 0.0001, 8)

    raw_tx = create_raw_tx(inputs, [{consolidation_addr: send_amt}])
    signed = sign_raw_tx("alice", raw_tx)
    consolidation_txid = send_raw(signed["hex"])
    info(f"Consolidation TX: {consolidation_txid[:16]}... ({len(inputs)} inputs → 1 output)")

    mine_and_confirm()

    # Step 2 - Spend the consolidated output
    utxos = get_utxos("alice", 1)
    consolidated = [u for u in utxos if u["txid"] == consolidation_txid]

    # FIX: previously a missing consolidated UTXO silently skipped the whole
    # detection step; fail explicitly, consistent with tests 8 and 11.
    if not consolidated:
        info("Consolidated output not found — cannot verify consolidation origin")
        return False

    dest = get_new_address("carol", "bech32")
    spend_amt = round(consolidated[0]["amount"] - 0.0001, 8)
    raw_tx = create_raw_tx(
        [{"txid": consolidated[0]["txid"], "vout": consolidated[0]["vout"]}],
        [{dest: spend_amt}]
    )
    signed = sign_raw_tx("alice", raw_tx)
    spend_txid = send_raw(signed["hex"])
    info(f"Spend TX: {spend_txid[:16]}...")
    mine_and_confirm()

    # DETECT: Check if input's parent tx has consolidation shape
    spend_tx = get_tx(spend_txid)
    parent_txid = spend_tx["vin"][0]["txid"]
    parent_tx = get_tx(parent_txid)
    parent_inputs = len(parent_tx["vin"])
    parent_outputs = len(parent_tx["vout"])

    is_from_consolidation = parent_inputs >= 3 and parent_outputs <= 2

    check(is_from_consolidation,
          f"UTXO parent has consolidation shape: {parent_inputs} inputs → {parent_outputs} output(s)")
    check(parent_inputs >= 3,
          f"Parent tx has {parent_inputs} inputs (threshold: ≥3 = consolidation)")

    info("PRIVACY IMPACT: UTXOs born from consolidation carry the full")
    info(" cluster linkage of ALL inputs that were merged")
    return True


# ═══════════════════════════════════════════════════════════════════════════════
# VULNERABILITY 7: Script Type Inconsistency / Mixing
# ═══════════════════════════════════════════════════════════════════════════════
def test_07_script_type_mixing():
    """Spend P2WPKH and P2TR UTXOs together and detect heterogeneous script types."""
    header(7, "Script Type Inconsistency / Mixing")

    ensure_funds("bob", 2.0)

    # REPRODUCE: Create UTXOs of different script types for Alice
    wpkh_addr = get_new_address("alice", "bech32")   # P2WPKH (bc1q...)
    tr_addr = get_new_address("alice", "bech32m")    # P2TR (bc1p...)

    info(f"P2WPKH address: {wpkh_addr[:20]}...")
    info(f"P2TR address: {tr_addr[:20]}...")

    send_to_address("bob", wpkh_addr, 0.005)
    send_to_address("bob", tr_addr, 0.005)
    mine_and_confirm()

    # Now spend both in the same transaction
    utxos = get_utxos("alice", 1)

    # Signet/testnet bech32 addresses are prefixed tb1q (P2WPKH) / tb1p (P2TR).
    wpkh_utxo = None
    tr_utxo = None
    for u in utxos:
        if u.get("address", "").startswith("tb1q") and u["amount"] >= 0.004 and not wpkh_utxo:
            wpkh_utxo = u
        elif u.get("address", "").startswith("tb1p") and u["amount"] >= 0.004 and not tr_utxo:
            tr_utxo = u

    if not wpkh_utxo or not tr_utxo:
        # Fallback: classify by the UTXO's output descriptor instead
        for u in utxos:
            desc = u.get("desc", "")
            if "wpkh" in desc and u["amount"] >= 0.004 and not wpkh_utxo:
                wpkh_utxo = u
            elif "tr(" in desc and u["amount"] >= 0.004 and not tr_utxo:
                tr_utxo = u

    if not wpkh_utxo or not tr_utxo:
        info("Could not find both UTXO types, listing available:")
        for u in utxos:
            info(f" {u.get('address','?')[:25]}... = {u['amount']} ({u.get('desc','?')[:20]})")
        info("Skipping mixed-input test, testing output-side mixing instead...")

        # Output-side mixing: pay to P2WPKH and change to P2TR
        if utxos:
            dest_wpkh = get_new_address("bob", "bech32")
            dest_tr = get_new_address("bob", "bech32m")
            u = utxos[0]
            half = round(u["amount"] / 2 - 0.00005, 8)
            raw_tx = create_raw_tx(
                [{"txid": u["txid"], "vout": u["vout"]}],
                [{dest_wpkh: half}, {dest_tr: half}]
            )
            signed = sign_raw_tx("alice", raw_tx)
            txid = send_raw(signed["hex"])
            mine_and_confirm()
            tx = get_tx(txid)
            output_types = set()
            for vout in tx["vout"]:
                output_types.add(vout["scriptPubKey"]["type"])
            check(len(output_types) >= 2,
                  f"Output script types: {output_types} — heterogeneous outputs")
        return True

    info(f"P2WPKH UTXO: {wpkh_utxo['amount']} BTC at {wpkh_utxo.get('address','?')[:20]}...")
    info(f"P2TR UTXO: {tr_utxo['amount']} BTC at {tr_utxo.get('address','?')[:20]}...")

    dest = get_new_address("bob", "bech32")
    total = wpkh_utxo["amount"] + tr_utxo["amount"]
    send_amt = round(total - 0.0002, 8)

    raw_tx = create_raw_tx(
        [
            {"txid": wpkh_utxo["txid"], "vout": wpkh_utxo["vout"]},
            {"txid": tr_utxo["txid"], "vout": tr_utxo["vout"]},
        ],
        [{dest: send_amt}]
    )
    signed = sign_raw_tx("alice", raw_tx)
    txid = send_raw(signed["hex"])
    info(f"Mixed-type TX: {txid[:16]}...")

    mine_and_confirm()

    # DETECT: Check if inputs have different script types
    tx = get_tx(txid)
    input_types = set()
    for vin in tx["vin"]:
        parent = get_tx(vin["txid"])
        script_type = parent["vout"][vin["vout"]]["scriptPubKey"]["type"]
        input_types.add(script_type)
        info(f" Input type: {script_type}")

    check(len(input_types) >= 2,
          f"Input script types: {input_types} — heterogeneous (fingerprint!)")

    info("PRIVACY IMPACT: Mixing script types is a behavioral fingerprint")
    info(" and reveals the wallet controls both address families")
    return True


# ═══════════════════════════════════════════════════════════════════════════════
# VULNERABILITY 8: Merging Previously Separate UTXO Clusters
# ═══════════════════════════════════════════════════════════════════════════════
def test_08_cluster_merge():
    """Merge UTXOs funded by two unrelated senders and detect the cluster merge."""
    header(8, "Merging Previously Separate UTXO Clusters")

    ensure_funds("bob", 2.0)
    ensure_funds("carol", 2.0)

    # REPRODUCE: Create two separate clusters for Alice
    # Cluster A: from Bob
    cluster_a_addr = get_new_address("alice", "bech32")
    txid_a = send_to_address("bob", cluster_a_addr, 0.004)
    info(f"Cluster A (from Bob): {cluster_a_addr[:20]}... = 0.004 BTC")

    # Cluster B: from Carol
    cluster_b_addr = get_new_address("alice", "bech32")
    txid_b = send_to_address("carol", cluster_b_addr, 0.004)
    info(f"Cluster B (from Carol): {cluster_b_addr[:20]}... = 0.004 BTC")

    mine_and_confirm()

    # Find the specific UTXOs
    utxos = get_utxos("alice", 1)
    utxo_a = next((u for u in utxos if u["txid"] == txid_a), None)
    utxo_b = next((u for u in utxos if u["txid"] == txid_b), None)

    if not utxo_a or not utxo_b:
        info("Searching for UTXOs by address...")
        utxo_a = next((u for u in utxos if u.get("address") == cluster_a_addr), None)
        utxo_b = next((u for u in utxos if u.get("address") == cluster_b_addr), None)

    if not utxo_a or not utxo_b:
        info("Could not locate both cluster UTXOs")
        return False

    # MERGE: Spend one from each cluster together
    dest = get_new_address("bob", "bech32")
    total = utxo_a["amount"] + utxo_b["amount"]
    send_amt = round(total - 0.0002, 8)

    raw_tx = create_raw_tx(
        [
            {"txid": utxo_a["txid"], "vout": utxo_a["vout"]},
            {"txid": utxo_b["txid"], "vout": utxo_b["vout"]},
        ],
        [{dest: send_amt}]
    )
    signed = sign_raw_tx("alice", raw_tx)
    merge_txid = send_raw(signed["hex"])
    info(f"Cluster merge TX: {merge_txid[:16]}...")

    mine_and_confirm()

    # DETECT: Check if inputs come from different source clusters
    merge_tx = get_tx(merge_txid)
    source_txids = [vin["txid"] for vin in merge_tx["vin"]]

    # Trace each input to its source
    sources = {}
    for vin in merge_tx["vin"]:
        parent = get_tx(vin["txid"])
        # Who funded this? Check the inputs of the parent tx
        if parent["vin"][0].get("coinbase"):
            sources[vin["txid"]] = "coinbase"
        else:
            grandparent_txid = parent["vin"][0]["txid"]
            # NOTE(review): the original also fetched the grandparent tx here
            # without using the result; the RPC round-trip is dropped.
            sources[vin["txid"]] = grandparent_txid[:16]

    distinct_sources = len(set(sources.values()))
    check(len(source_txids) >= 2,
          f"Merge TX has {len(source_txids)} inputs from different funding transactions")

    check(distinct_sources >= 2 or len(source_txids) >= 2,
          f"Inputs trace to {distinct_sources} distinct source chains — clusters merged!")

    info("PRIVACY IMPACT: Previously separate identity clusters (Bob-linked")
    info(" and Carol-linked) are now permanently merged into one cluster")
    return True


# ═══════════════════════════════════════════════════════════════════════════════
# VULNERABILITY 9: UTXO Historical Depth (Lookback Depth)
# ═══════════════════════════════════════════════════════════════════════════════
def test_09_lookback_depth():
    """Age one UTXO 20 blocks and show confirmation depth exposes dormancy."""
    header(9, "UTXO Historical Depth (Lookback Depth)")

    ensure_funds("alice", 1.0)

    # REPRODUCE: Create an "old" UTXO and let it age many blocks
    old_addr = get_new_address("alice", "bech32")
    old_txid = send_to_address("miner", old_addr, 0.01)
    info(f"Old UTXO created: {old_txid[:16]}...")

    mine_blocks(20)  # Age it 20 blocks
    info("Mined 20 blocks to age the UTXO")

    # Create a "new" UTXO
    new_addr = get_new_address("alice", "bech32")
    new_txid = send_to_address("miner", new_addr, 0.01)
    info(f"New UTXO created: {new_txid[:16]}...")

    mine_and_confirm()

    # DETECT: Compare confirmation depths
    old_tx = get_tx(old_txid)
    new_tx = get_tx(new_txid)

    old_confs = old_tx.get("confirmations", 0)
    new_confs = new_tx.get("confirmations", 0)

    check(old_confs > new_confs + 10,
          f"Old UTXO: {old_confs} confirmations vs New UTXO: {new_confs} confirmations (diff={old_confs - new_confs})")

    # Ancestor chain analysis
    def trace_depth(txid, max_depth=10):
        """Walk back through the transaction chain."""
        depth = 0
        current_txid = txid
        chain = [current_txid[:16]]
        for _ in range(max_depth):
            tx = get_tx(current_txid)
            if tx["vin"][0].get("coinbase"):
                chain.append("COINBASE")
                break
            current_txid = tx["vin"][0]["txid"]
            chain.append(current_txid[:16])
            depth += 1
        return depth, chain

    old_depth, old_chain = trace_depth(old_txid)
    new_depth, new_chain = trace_depth(new_txid)

    info(f"Old UTXO chain depth: {old_depth} hops: {' → '.join(old_chain[:5])}")
    info(f"New UTXO chain depth: {new_depth} hops: {' → '.join(new_chain[:5])}")

    check(old_confs >= 15,
          f"Old UTXO has ≥15 confirmations ({old_confs}) — detectable age pattern")

    info("PRIVACY IMPACT: UTXO age reveals dormancy patterns, coin hoarding,")
    info(" or can distinguish 'fresh' exchange withdrawals from aged savings")
    return True


# ═══════════════════════════════════════════════════════════════════════════════
# VULNERABILITY 10: Probable Exchange Origin
# ═══════════════════════════════════════════════════════════════════════════════
def test_10_exchange_origin():
    """Simulate an exchange batch withdrawal and detect its on-chain signature."""
    header(10, "Identification of Probable Exchange Origin")

    ensure_funds("exchange", 5.0)

    # REPRODUCE: Simulate exchange batch withdrawal (many outputs)
    batch_outputs = {}
    recipients = []
    for i in range(8):
        # Send to alice, bob, carol in round-robin plus random wallets
        wallets = ["alice", "bob", "carol", "alice", "bob", "carol", "alice", "bob"]
        addr = get_new_address(wallets[i], "bech32")
        batch_outputs[addr] = round(0.01 + (i * 0.001), 8)
        recipients.append((wallets[i], addr[:15]))

    info(f"Exchange batch withdrawal: {len(batch_outputs)} recipients")
    for w, a in recipients:
        info(f" → {w}: {a}...")

    # Use sendmany for batch
    txid = cli("sendmany", "", json.dumps(batch_outputs), wallet="exchange")
    info(f"Batch TX: {txid[:16]}...")

    mine_and_confirm()

    # DETECT: Analyze the transaction for exchange-like patterns
    tx = get_tx(txid)
    num_outputs = len(tx["vout"])
    num_inputs = len(tx["vin"])

    # Exchange heuristics:
    # 1. High output count (batching)
    is_batch = num_outputs >= 5
    check(is_batch,
          f"High output count: {num_outputs} outputs (≥5 = likely batch withdrawal)")

    # 2. Round-ish payment amounts (exchanges often use round amounts)
    # NOTE(review): round_outputs is informational only; no check() asserts it.
    round_outputs = 0
    for vout in tx["vout"]:
        sats = int(round(vout["value"] * 1e8))
        if sats % 100000 == 0 or sats % 10000 == 0:
            round_outputs += 1

    # 3. Large input(s) relative to individual outputs
    input_total = 0
    for vin in tx["vin"]:
        parent = get_tx(vin["txid"])
        input_total += parent["vout"][vin["vout"]]["value"]

    # Exclude the largest output (likely change) — look at median payment
    output_vals = sorted([v["value"] for v in tx["vout"]])
    median_output = output_vals[len(output_vals) // 2]
    ratio = input_total / median_output if median_output > 0 else 0

    check(ratio > 3,
          f"Input/median-output ratio: {ratio:.1f}x (high ratio suggests large hot wallet)")

    # 4. Many unique recipient addresses
    unique_addrs = set()
    for vout in tx["vout"]:
        addr = vout["scriptPubKey"].get("address", "")
        if addr:
            unique_addrs.add(addr)

    check(len(unique_addrs) >= 5,
          f"Unique recipient addresses: {len(unique_addrs)} (many = batch pattern)")

    info("PRIVACY IMPACT: UTXOs from exchange withdrawals reveal the user")
    info(" interacted with that exchange, enabling entity-linking")
    return True
    # Many unique recipient addresses
    # (continuation of test_10's DETECT phase: a batch withdrawal pays many
    # distinct recipients in one transaction — count the unique ones)
    unique_addrs = set()
    for vout in tx["vout"]:
        addr = vout["scriptPubKey"].get("address", "")
        if addr:
            unique_addrs.add(addr)

    check(len(unique_addrs) >= 5,
          f"Unique recipient addresses: {len(unique_addrs)} (many = batch pattern)")

    info("PRIVACY IMPACT: UTXOs from exchange withdrawals reveal the user")
    info(" interacted with that exchange, enabling entity-linking")
    return True


# ═══════════════════════════════════════════════════════════════════════════════
# VULNERABILITY 11: UTXOs from Risk Sources ("Dirty Money")
# ═══════════════════════════════════════════════════════════════════════════════
def test_11_tainted_utxos():
    """Reproduce and detect taint propagation.

    Scenario: the "risky" wallet (a known bad actor) pays Alice; Alice then
    merges that tainted UTXO with a clean one (from Bob) into a single
    payment to Carol. Detection: any input whose parent txid appears in the
    risky wallet's history marks the whole transaction — and thus all of its
    outputs — as tainted.

    Returns True when the scenario was built and the detection checks ran,
    False if the two UTXOs could not be located.
    """
    header(11, 'UTXOs from Risk Sources ("Dirty Money" / Taint)')

    # ensure_funds / get_new_address / send_to_address are sibling helpers
    # defined earlier in this file (not visible in this chunk).
    ensure_funds("risky", 2.0)
    ensure_funds("alice", 1.0)

    # REPRODUCE: "risky" (known bad actor) sends to Alice
    alice_tainted_addr = get_new_address("alice", "bech32")
    taint_txid = send_to_address("risky", alice_tainted_addr, 0.01)
    info(f"Taint TX (risky → alice): {taint_txid[:16]}...")

    # Also give Alice a clean UTXO
    alice_clean_addr = get_new_address("alice", "bech32")
    clean_txid = send_to_address("bob", alice_clean_addr, 0.01)
    info(f"Clean TX (bob → alice): {clean_txid[:16]}...")

    mine_and_confirm()

    # Step 2: Alice consolidates tainted + clean (taint propagation!)
    utxos = get_utxos("alice", 1)
    tainted_utxo = next((u for u in utxos if u["txid"] == taint_txid), None)
    clean_utxo = next((u for u in utxos if u["txid"] == clean_txid), None)

    # Fallback: if txid lookup failed (e.g. change outputs confuse matching),
    # locate the two UTXOs by the addresses we generated above.
    if not tainted_utxo or not clean_utxo:
        info("Locating UTXOs by address...")
        tainted_utxo = next((u for u in utxos if u.get("address") == alice_tainted_addr), None)
        clean_utxo = next((u for u in utxos if u.get("address") == alice_clean_addr), None)

    if not tainted_utxo or not clean_utxo:
        info("Could not find both UTXOs")
        return False

    # Merge tainted + clean
    dest = get_new_address("carol", "bech32")
    total = tainted_utxo["amount"] + clean_utxo["amount"]
    # Leave 0.0002 BTC as fee; round to 8 decimals (satoshi precision).
    send_amt = round(total - 0.0002, 8)

    raw_tx = create_raw_tx(
        [
            {"txid": tainted_utxo["txid"], "vout": tainted_utxo["vout"]},
            {"txid": clean_utxo["txid"], "vout": clean_utxo["vout"]},
        ],
        [{dest: send_amt}]
    )
    signed = sign_raw_tx("alice", raw_tx)
    merge_txid = send_raw(signed["hex"])
    info(f"Taint merge TX: {merge_txid[:16]}...")

    mine_and_confirm()

    # DETECT: Taint analysis
    # Build set of TXIDs that originated from the "risky" wallet
    # NOTE(review): listtransactions is capped at the 100 most recent entries
    # here — assumes the risky wallet's history fits; confirm for long runs.
    risky_txids = set()
    risky_txs = cli("listtransactions", "*", 100, 0, wallet="risky")
    for rtx in risky_txs:
        if rtx.get("txid"):
            risky_txids.add(rtx["txid"])

    merge_tx = get_tx(merge_txid)

    tainted_inputs = 0
    clean_inputs = 0
    for vin in merge_tx["vin"]:
        parent_txid = vin["txid"]
        # A parent TX is tainted if it appears in risky wallet's history
        is_tainted = parent_txid in risky_txids
        if is_tainted:
            tainted_inputs += 1
            info(f" Input from {parent_txid[:16]}... — TAINTED (from risky source)")
        else:
            clean_inputs += 1
            info(f" Input from {parent_txid[:16]}... — CLEAN")

    check(tainted_inputs >= 1,
          f"Found {tainted_inputs} tainted input(s) in the merge transaction")
    check(tainted_inputs >= 1 and clean_inputs >= 1,
          f"TAINT PROPAGATION: {tainted_inputs} tainted + {clean_inputs} clean merged → all outputs tainted")

    # Taint scoring
    taint_ratio = tainted_inputs / (tainted_inputs + clean_inputs) if (tainted_inputs + clean_inputs) > 0 else 0
    info(f" Taint ratio: {taint_ratio:.0%} of inputs from risky sources")

    info("PRIVACY IMPACT: Merging tainted + clean funds contaminates ALL outputs")
    info(" Carol now receives 'dirty' coins even though she dealt with Alice")
    return True


# ═══════════════════════════════════════════════════════════════════════════════
# VULNERABILITY 12: Behavioral Fingerprinting
# ═══════════════════════════════════════════════════════════════════════════════
def test_12_behavioral_fingerprint():
    """Reproduce and detect behavioral fingerprinting.

    Scenario: Alice and Bob each send 5 transactions with deliberately
    different habits (round vs odd amounts, single vs mixed script types).
    Detection: extract per-user feature vectors (round-payment ratio, output
    script types, output counts, RBF signalling, feerate) and assert the two
    profiles are distinguishable.

    Returns True when the scenario was built and the checks ran.
    """
    header(12, "Behavioral Fingerprinting")

    ensure_funds("alice", 3.0)
    ensure_funds("bob", 3.0)

    # REPRODUCE: Create distinctive transaction patterns for Alice vs Bob
    alice_txids = []
    bob_txids = []

    info("Creating Alice's transactions (consistent behavioral pattern)...")
    # Alice's pattern: always round payments, always bech32, always ~same fee
    for i in range(5):
        dest = get_new_address("carol", "bech32")  # Alice always pays to bech32
        amount = 0.01 * (i + 1)  # Always round amounts
        txid = send_to_address("alice", dest, amount)
        alice_txids.append(txid)
        info(f" Alice TX {i+1}: {amount:.8f} BTC → bech32")

    mine_and_confirm()

    info("Creating Bob's transactions (different behavioral pattern)...")
    # Bob's pattern: odd amounts, mixes address types
    for i in range(5):
        addr_type = "bech32m" if i % 2 == 0 else "bech32"  # Bob mixes types
        dest = get_new_address("carol", addr_type)
        amount = 0.00723 * (i + 1) + 0.00011  # Odd amounts
        amount = round(amount, 8)
        txid = send_to_address("bob", dest, amount)
        bob_txids.append(txid)
        info(f" Bob TX {i+1}: {amount:.8f} BTC → {addr_type}")

    mine_and_confirm()

    # DETECT: Extract behavioral features and distinguish users
    def extract_features(txids, label):
        """Build a feature dict (lists of observations) for one user's txids."""
        features = {
            "label": label,
            "output_counts": [],
            "has_round_payment": [],
            "output_types": [],
            "feerate_estimates": [],
            "rbf_signals": [],
        }
        for txid in txids:
            tx = get_tx(txid)
            if not tx:
                continue

            # Output count
            features["output_counts"].append(len(tx["vout"]))

            # Round payment detection
            for vout in tx["vout"]:
                # Convert BTC float to integer satoshis before the modulo test.
                sats = int(round(vout["value"] * 1e8))
                is_round = sats % 100000 == 0 or sats % 1000000 == 0
                features["has_round_payment"].append(is_round)

            # Output script types
            for vout in tx["vout"]:
                features["output_types"].append(vout["scriptPubKey"]["type"])

            # RBF signaling (sequence < 0xfffffffe)
            for vin in tx["vin"]:
                seq = vin.get("sequence", 0xffffffff)
                features["rbf_signals"].append(seq < 0xfffffffe)

            # Fee estimation (size * feerate)
            # NOTE(review): "fee" is only present for some RPC verbosity
            # levels / wallet txs — guarded by the `in` checks, so absent
            # fields are simply skipped.
            if "vsize" in tx and "fee" in tx:
                feerate = abs(tx.get("fee", 0)) / tx["vsize"] * 1e8  # sat/vB
                features["feerate_estimates"].append(feerate)

        return features

    alice_features = extract_features(alice_txids, "alice")
    bob_features = extract_features(bob_txids, "bob")

    # Analysis
    # max(..., 1) guards against division by zero on empty feature lists.
    alice_round_ratio = sum(alice_features["has_round_payment"]) / max(len(alice_features["has_round_payment"]), 1)
    bob_round_ratio = sum(bob_features["has_round_payment"]) / max(len(bob_features["has_round_payment"]), 1)

    alice_type_set = set(alice_features["output_types"])
    bob_type_set = set(bob_features["output_types"])

    info(f"\n {'Feature':<30} {'Alice':<25} {'Bob':<25}")
    info(f" {'─'*80}")
    info(f" {'Round payment ratio':<30} {alice_round_ratio:<25.0%} {bob_round_ratio:<25.0%}")
    info(f" {'Output types used':<30} {str(alice_type_set):<25} {str(bob_type_set):<25}")
    info(f" {'Avg output count':<30} "
         f"{sum(alice_features['output_counts'])/max(len(alice_features['output_counts']),1):<25.1f} "
         f"{sum(bob_features['output_counts'])/max(len(bob_features['output_counts']),1):<25.1f}")

    alice_rbf = sum(alice_features["rbf_signals"]) / max(len(alice_features["rbf_signals"]), 1)
    bob_rbf = sum(bob_features["rbf_signals"]) / max(len(bob_features["rbf_signals"]), 1)
    info(f" {'RBF signal ratio':<30} {alice_rbf:<25.0%} {bob_rbf:<25.0%}")

    # Distinguishability test: any one strongly-differing feature suffices.
    features_differ = (
        abs(alice_round_ratio - bob_round_ratio) > 0.3 or
        alice_type_set != bob_type_set or
        abs(alice_rbf - bob_rbf) > 0.3
    )

    check(features_differ,
          "Behavioral features DIFFER between Alice and Bob — fingerprinting possible")

    check(alice_round_ratio > bob_round_ratio,
          f"Alice uses more round amounts ({alice_round_ratio:.0%}) than Bob ({bob_round_ratio:.0%})")

    # Check if Bob mixes script types more
    bob_mixes = len(bob_type_set) >= 2
    check(bob_mixes or alice_type_set != bob_type_set,
          f"Script type diversity differs: Alice={alice_type_set}, Bob={bob_type_set}")

    info("\nPRIVACY IMPACT: Consistent behavioral patterns allow re-identification")
    info(" of the same entity across transactions even without address reuse")
    return True


# ═══════════════════════════════════════════════════════════════════════════════
# MAIN: Run all tests
# ═══════════════════════════════════════════════════════════════════════════════
def main():
    """Run all (or one filtered) vulnerability tests and print a summary.

    CLI: an optional bare number or ``-k <number>`` runs only that test.
    Returns True when no check failed (FAIL_COUNT == 0).
    """
    # BOLD/CYAN/GREEN/RED/RESET and PASS_COUNT/FAIL_COUNT are module-level
    # definitions earlier in this file (not visible in this chunk).
    print(f"\n{BOLD}{'═'*78}{RESET}")
    print(f"{BOLD}{CYAN} BITCOIN PRIVACY VULNERABILITY TEST SUITE{RESET}")
    print(f"{BOLD}{CYAN} Custom Signet — {get_block_count()} blocks{RESET}")
    print(f"{BOLD}{'═'*78}{RESET}")

    # Check which test to run
    test_filter = None
    if len(sys.argv) > 1:
        for arg in sys.argv[1:]:
            if arg == "-k" and sys.argv.index(arg) + 1 < len(sys.argv):
                test_filter = sys.argv[sys.argv.index(arg) + 1]
            elif arg.isdigit():
                test_filter = arg

    # (number, display name, test function) — run in order.
    tests = [
        (1, "Address Reuse", test_01_address_reuse),
        (2, "Multi-input / CIOH", test_02_consolidation_cioh),
        (3, "Dust UTXO Detection", test_03_dust_detection),
        (4, "Dust Spending w/ Normal", test_04_dust_spending),
        (5, "Change Detection", test_05_change_detection),
        (6, "Consolidation Origin", test_06_consolidation_origin),
        (7, "Script Type Mixing", test_07_script_type_mixing),
        (8, "Cluster Merge", test_08_cluster_merge),
        (9, "Lookback Depth", test_09_lookback_depth),
        (10, "Exchange Origin", test_10_exchange_origin),
        (11, "Tainted UTXOs", test_11_tainted_utxos),
        (12, "Behavioral Fingerprint", test_12_behavioral_fingerprint),
    ]

    results = {}
    for num, name, func in tests:
        if test_filter and str(num) != test_filter:
            continue
        try:
            result = func()
            results[num] = "PASS" if result else "FAIL"
        except Exception as e:
            # A crashing test is recorded but does not stop the suite.
            results[num] = f"ERROR: {e}"
            print(f" {RED}✗ ERROR:{RESET} {e}")
            import traceback
            traceback.print_exc()

    # Summary
    print(f"\n{'═'*78}")
    print(f"{BOLD} TEST SUMMARY{RESET}")
    print(f"{'═'*78}")
    for num, name, _ in tests:
        if num in results:
            status = results[num]
            color = GREEN if status == "PASS" else RED
            print(f" {color}{'✓' if status=='PASS' else '✗'}{RESET} Vulnerability {num:2d}: {name:<35} [{status}]")

    print(f"\n {GREEN}Passed checks: {PASS_COUNT}{RESET}")
    print(f" {RED}Failed checks: {FAIL_COUNT}{RESET}")
    print(f" Total: {PASS_COUNT + FAIL_COUNT}")
    print()

    return FAIL_COUNT == 0


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)
diff --git a/backend/script/verify.py b/backend/script/verify.py
new file mode 100644
index 0000000..6fac3f5
--- /dev/null
+++ b/backend/script/verify.py
@@ -0,0 +1,258 @@
#!/usr/bin/env python3
"""
verify.py
=========
End-to-end proof that detect.py catches every vulnerability that reproduce.py
creates — on a REGTEST chain.

Steps:
  1. Wipe & restart regtest
  2. Create wallets, fund miner
  3. Run reproduce.py (create all 12 vulnerability scenarios)
  4. Run detect.py --wallet alice (capture output)
  5. Parse output and assert every detector (1–12) produced ≥1 finding
  6. Print a 12-row proof table

Usage:
    python3 verify.py
"""

import subprocess
import sys
import os
import re
import time

# Directory this script lives in — reproduce.py / detect.py are siblings.
DIR = os.path.dirname(os.path.abspath(__file__))
WALLETS = ["miner", "alice", "bob", "carol", "exchange", "risky"]

# ANSI color escape codes for terminal output.
G = "\033[92m"
R = "\033[91m"
B = "\033[1m"
C = "\033[96m"
Y = "\033[93m"
RST = "\033[0m"


def run(cmd, check=True, timeout=300):
    """Run a shell command, return stdout."""
    # NOTE(review): shell=True with an interpolated string — acceptable here
    # because every cmd is built from constants in this script, never from
    # external input.
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
    if check and result.returncode != 0:
        print(f" {R}FAIL:{RST} {cmd}")
        print(f" stderr: {result.stderr.strip()}")
        sys.exit(1)
    return result.stdout.strip()


def btc(cmd):
    """Run a bitcoin-cli command against the regtest node; return stdout."""
    return run(f"bitcoin-cli -regtest {cmd}")


def btcw(wallet, cmd):
    """Run a bitcoin-cli command against a specific regtest wallet."""
    return run(f"bitcoin-cli -regtest -rpcwallet={wallet} {cmd}")


def banner(msg):
    """Print a bold cyan section banner."""
    print(f"\n{B}{C}{'═' * 70}{RST}")
    print(f"{B}{C} {msg}{RST}")
    print(f"{B}{C}{'═' * 70}{RST}")


# ─────────────────────────────────────────────────────────────────────────────
# Step 1: Fresh regtest
# ─────────────────────────────────────────────────────────────────────────────
def setup_regtest():
    """Wipe the regtest datadir, restart bitcoind, create wallets, mine 110 blocks.

    Exits the process (via run()/sys.exit) on any failure. 110 blocks makes
    the first 10 coinbases mature (100-block coinbase maturity).
    """
    banner("Step 1: Fresh regtest chain")
    # Stop if running
    run("bitcoin-cli -regtest stop 2>/dev/null || true", check=False)
    time.sleep(2)

    # Wipe
    # NOTE(review): destructive — removes the whole regtest datadir under
    # ~/.bitcoin; intended, since this script owns the regtest environment.
    run("rm -rf ~/.bitcoin/regtest")
    print(" ✓ Wiped regtest datadir")

    # Ensure bitcoin.conf exists with regtest settings
    # NOTE(review): overwrites any existing ~/.bitcoin/bitcoin.conf.
    conf = os.path.expanduser("~/.bitcoin/bitcoin.conf")
    with open(conf, "w") as f:
        f.write("regtest=1\ntxindex=1\n\n[regtest]\n"
                "fallbackfee=0.00010\ndustrelayfee=0.00000001\n"
                "acceptnonstdtxn=1\nserver=1\n")
    print(" ✓ Wrote bitcoin.conf")

    # Start
    run("bitcoind -regtest -daemon")
    # Wait for RPC to become ready
    print(" … waiting for bitcoind RPC …", end="", flush=True)
    for i in range(30):
        time.sleep(1)
        res = subprocess.run("bitcoin-cli -regtest getblockchaininfo",
                             shell=True, capture_output=True, text=True, timeout=10)
        if res.returncode == 0:
            print(f" ready after {i+1}s")
            break
    else:
        # for/else: only reached when the loop never broke (RPC never came up).
        print(f"\n {R}ERROR: bitcoind didn't start after 30s{RST}")
        sys.exit(1)
    print(" ✓ bitcoind started")

    # Create wallets
    for w in WALLETS:
        btc(f'createwallet "{w}"')
    print(f" ✓ Created wallets: {', '.join(WALLETS)}")

    # Mine 110 blocks to get mature coinbases
    addr = btcw("miner", 'getnewaddress "" bech32')
    btc(f"generatetoaddress 110 {addr}")
    balance = btcw("miner", "getbalance")
    print(f" ✓ Mined 110 blocks — miner balance: {balance} BTC")


# ─────────────────────────────────────────────────────────────────────────────
# Step 2: Reproduce
# ─────────────────────────────────────────────────────────────────────────────
def run_reproduce():
    """Run reproduce.py as a subprocess; exit on failure; return its stdout."""
    banner("Step 2: Run reproduce.py (create 12 vulnerability scenarios)")
    result = subprocess.run(
        [sys.executable, os.path.join(DIR, "reproduce.py")],
        capture_output=True, text=True, timeout=300,
    )
    if result.returncode != 0:
        print(f" {R}reproduce.py FAILED:{RST}")
        print(result.stderr)
        sys.exit(1)

    # Count successes
    # NOTE(review): assumes reproduce.py prints "✓" per created scenario.
    successes = result.stdout.count("✓")
    print(f" ✓ reproduce.py completed — {successes} scenario(s) created")
    # Print abbreviated output
    for line in result.stdout.split("\n"):
        if "✓" in line or "REPRODUCE" in line:
            print(f" {line.strip()}")
    return result.stdout


# ─────────────────────────────────────────────────────────────────────────────
# Step 3: Detect
# ─────────────────────────────────────────────────────────────────────────────
def run_detect():
    """Run detect.py against the alice wallet; exit on failure; return stdout."""
    banner("Step 3: Run detect.py --wallet alice")
    result = subprocess.run(
        [sys.executable, os.path.join(DIR, "detect.py"),
         "--wallet", "alice",
         "--known-risky-wallets", "risky",
         "--known-exchange-wallets", "exchange"],
        capture_output=True, text=True, timeout=300,
    )
    if result.returncode != 0:
        print(f" {R}detect.py FAILED:{RST}")
        print(result.stderr)
        sys.exit(1)
    print(f" ✓ detect.py completed")
    return result.stdout


# ─────────────────────────────────────────────────────────────────────────────
# Step 4: Parse & verify
# ─────────────────────────────────────────────────────────────────────────────
# Detector id → (display name, literal section-header marker expected in
# detect.py's output). NOTE(review): the r"" values are used with plain `in`
# substring matching below, not as regexes.
DETECTORS = {
    1: ("Address Reuse", r"1 · Address Reuse"),
    2: ("CIOH", r"2 · Common Input Ownership"),
    3: ("Dust UTXO Detection", r"3 · Dust UTXO Detection"),
    4: ("Dust Spent with Normal", r"4 · Dust Spent with Normal"),
    5: ("Change Output Detection", r"5 · Probable Change Output"),
    6: ("Consolidation Origin", r"6 · UTXOs from Prior Consolidation"),
    7: ("Script Type Mixing", r"7 · Script Type Mixing"),
    8: ("Cluster Merge", r"8 · Cluster Merge"),
    9: ("UTXO Age / Lookback", r"9 · UTXO Age"),
    10: ("Exchange Origin", r"10 · Probable Exchange Origin"),
    11: ("Tainted UTXOs", r"11 · Tainted UTXOs"),
    12: ("Behavioral Fingerprint", r"12 · Behavioral Fingerprint"),
}


def parse_and_verify(detect_output):
    """Split detect.py output into detector sections and verify each fired.

    A detector "fired" when its section contains at least one FINDING or
    WARNING line. Prints a 12-row proof table and returns True iff all
    detectors fired.
    """
    banner("Step 4: Verification — does detect catch every reproduced vulnerability?")

    # Split output into sections per detector
    lines = detect_output.split("\n")
    results = {}
    current_id = None

    for line in lines:
        # Check if this line starts a detector section
        for did, (name, pattern) in DETECTORS.items():
            if pattern in line:
                current_id = did
                results[did] = {"findings": 0, "warnings": 0, "lines": []}
                break
        # Count findings/warnings within current section
        if current_id is not None:
            if "FINDING" in line:
                results[current_id]["findings"] += 1
            if "WARNING" in line:
                results[current_id]["warnings"] += 1
            results[current_id]["lines"].append(line)

    # Also parse the summary line
    total_findings = 0
    total_warnings = 0
    m = re.search(r"Findings:\s+(\d+)", detect_output)
    if m:
        total_findings = int(m.group(1))
    m = re.search(r"Warnings:\s+(\d+)", detect_output)
    if m:
        total_warnings = int(m.group(1))

    # ── Print proof table ──
    print()
    print(f" {'#':>3} {'Detector':<30} {'Findings':>8} {'Warnings':>8} {'Status'}")
    print(f" {'─'*3} {'─'*30} {'─'*8} {'─'*8} {'─'*8}")

    all_pass = True
    for did in sorted(DETECTORS.keys()):
        name = DETECTORS[did][0]
        r = results.get(did, {"findings": 0, "warnings": 0})
        f_count = r["findings"]
        w_count = r["warnings"]
        detected = f_count > 0 or w_count > 0
        status = f"{G}PASS ✓{RST}" if detected else f"{R}FAIL ✗{RST}"
        if not detected:
            all_pass = False
        print(f" {did:>3} {name:<30} {f_count:>8} {w_count:>8} {status}")

    print(f" {'─'*3} {'─'*30} {'─'*8} {'─'*8} {'─'*8}")
    print(f" {'':>3} {'TOTAL':<30} {total_findings:>8} {total_warnings:>8}")
    print()

    if all_pass:
        print(f" {G}{B}═══ ALL 12 DETECTORS FIRED — PROOF COMPLETE ═══{RST}")
        print(f" {G}Every reproduced vulnerability was caught by detect.py on regtest.{RST}")
    else:
        failed = [did for did in DETECTORS if results.get(did, {}).get("findings", 0) == 0
                  and results.get(did, {}).get("warnings", 0) == 0]
        print(f" {R}{B}═══ FAILURE — {len(failed)} detector(s) did not fire ═══{RST}")
        for did in failed:
            print(f" {R} Detector {did}: {DETECTORS[did][0]}{RST}")

    return all_pass


# ─────────────────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────────────────
def main():
    """Orchestrate the four steps; exit 0 iff every detector fired."""
    print(f"\n{B}{'═' * 70}{RST}")
    print(f"{B}{C} VERIFY: reproduce → detect end-to-end proof on REGTEST{RST}")
    print(f"{B}{'═' * 70}{RST}")

    setup_regtest()
    run_reproduce()
    detect_output = run_detect()
    passed = parse_and_verify(detect_output)

    print()
    sys.exit(0 if passed else 1)


if __name__ == "__main__":
    main()