mirror of
https://github.com/LORDBABUINO/stealth.git
synced 2026-04-26 23:49:59 -07:00
1080 lines
44 KiB
Python
1080 lines
44 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
test_vulnerabilities.py
|
||
=======================
|
||
Reproduces and verifies 12 Bitcoin privacy vulnerabilities on a local custom Signet.
|
||
|
||
Each test:
|
||
1. Creates the vulnerability scenario using real Bitcoin transactions
|
||
2. Analyzes the on-chain data to DETECT the vulnerability
|
||
3. Asserts the detection is correct (proving the vulnerability exists)
|
||
|
||
Usage:
|
||
python3 test_vulnerabilities.py # Run all tests
|
||
python3 test_vulnerabilities.py -k 1 # Run test for vulnerability 1
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import json
|
||
import time
|
||
import math
|
||
from collections import defaultdict
|
||
|
||
# Add project dir to path
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
from bitcoin_rpc import (
|
||
cli, mine_blocks, get_tx, get_utxos, get_balance,
|
||
get_new_address, send_to_address, create_raw_tx, sign_raw_tx,
|
||
send_raw, decode_raw_tx, get_block_count, create_funded_psbt,
|
||
process_psbt, finalize_psbt,
|
||
)
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# ANSI colors for output
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
GREEN = "\033[92m"
|
||
RED = "\033[91m"
|
||
YELLOW = "\033[93m"
|
||
CYAN = "\033[96m"
|
||
BOLD = "\033[1m"
|
||
RESET = "\033[0m"
|
||
|
||
PASS_COUNT = 0
|
||
FAIL_COUNT = 0
|
||
|
||
|
||
def header(num, title):
|
||
print(f"\n{'═'*78}")
|
||
print(f"{BOLD}{CYAN} VULNERABILITY {num}: {title}{RESET}")
|
||
print(f"{'═'*78}")
|
||
|
||
|
||
def check(condition, msg):
|
||
global PASS_COUNT, FAIL_COUNT
|
||
if condition:
|
||
PASS_COUNT += 1
|
||
print(f" {GREEN}✓ PASS:{RESET} {msg}")
|
||
else:
|
||
FAIL_COUNT += 1
|
||
print(f" {RED}✗ FAIL:{RESET} {msg}")
|
||
return condition
|
||
|
||
|
||
def info(msg):
|
||
print(f" {YELLOW}ℹ{RESET} {msg}")
|
||
|
||
|
||
def ensure_funds(wallet, min_btc=0.5):
|
||
"""Ensure wallet has at least min_btc, fund from miner if needed."""
|
||
bal = get_balance(wallet)
|
||
if bal < min_btc:
|
||
addr = get_new_address(wallet, "bech32")
|
||
send_to_address("miner", addr, min_btc + 0.1)
|
||
mine_blocks(1)
|
||
|
||
|
||
def mine_and_confirm():
|
||
"""Mine 1 block to confirm pending transactions."""
|
||
mine_blocks(1)
|
||
time.sleep(1)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# VULNERABILITY 1: Address Reuse (Reutilização de endereços)
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def test_01_address_reuse():
|
||
header(1, "Address Reuse (Reutilização de endereços)")
|
||
|
||
ensure_funds("bob", 1.0)
|
||
|
||
# REPRODUCE: Generate ONE address for Alice, receive payments multiple times
|
||
reused_addr = get_new_address("alice", "bech32")
|
||
info(f"Alice's reused address: {reused_addr}")
|
||
|
||
txid1 = send_to_address("bob", reused_addr, 0.01)
|
||
txid2 = send_to_address("bob", reused_addr, 0.02)
|
||
info(f"TX1: {txid1[:16]}... (0.01 BTC)")
|
||
info(f"TX2: {txid2[:16]}... (0.02 BTC)")
|
||
|
||
mine_and_confirm()
|
||
|
||
# DETECT: Find the same address appearing as output in multiple transactions
|
||
tx1 = get_tx(txid1)
|
||
tx2 = get_tx(txid2)
|
||
|
||
addr_occurrences = defaultdict(list)
|
||
for tx_data, txid in [(tx1, txid1), (tx2, txid2)]:
|
||
for vout in tx_data["vout"]:
|
||
addr = vout.get("scriptPubKey", {}).get("address", "")
|
||
if addr:
|
||
addr_occurrences[addr].append(txid)
|
||
|
||
# Check: reused_addr appears in outputs of BOTH transactions
|
||
reuse_count = len(addr_occurrences.get(reused_addr, []))
|
||
check(reuse_count >= 2,
|
||
f"Address {reused_addr[:20]}... found in {reuse_count} distinct transactions (need ≥2)")
|
||
|
||
# Show the privacy impact
|
||
info(f"PRIVACY IMPACT: An observer can link TX1 and TX2 to the same entity")
|
||
info(f" because the same address {reused_addr[:20]}... receives funds in both")
|
||
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# VULNERABILITY 2: Multi-input Transactions (Consolidation / CIOH)
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def test_02_consolidation_cioh():
|
||
header(2, "Multi-input Transactions (Consolidation / CIOH)")
|
||
|
||
ensure_funds("bob", 2.0)
|
||
|
||
# REPRODUCE: Create 5 separate UTXOs for Alice, then spend them all at once
|
||
alice_addrs = []
|
||
for i in range(5):
|
||
addr = get_new_address("alice", "bech32")
|
||
send_to_address("bob", addr, 0.005)
|
||
alice_addrs.append(addr)
|
||
info(f"UTXO {i+1}: 0.005 BTC -> {addr[:20]}...")
|
||
|
||
mine_and_confirm()
|
||
|
||
# Select all Alice's UTXOs explicitly
|
||
utxos = get_utxos("alice", 1)
|
||
small_utxos = [u for u in utxos if 0.004 < u["amount"] < 0.006]
|
||
info(f"Found {len(small_utxos)} small UTXOs to consolidate")
|
||
|
||
# Build consolidation TX using PSBT
|
||
inputs = [{"txid": u["txid"], "vout": u["vout"]} for u in small_utxos[:5]]
|
||
dest_addr = get_new_address("bob", "bech32")
|
||
|
||
total_input = sum(u["amount"] for u in small_utxos[:5])
|
||
send_amount = round(total_input - 0.001, 8) # leave fee
|
||
|
||
psbt_result = create_funded_psbt(
|
||
"alice",
|
||
inputs,
|
||
[{dest_addr: send_amount}],
|
||
{"subtractFeeFromOutputs": [0], "add_inputs": False}
|
||
)
|
||
psbt = psbt_result["psbt"]
|
||
signed = process_psbt("alice", psbt)
|
||
final = finalize_psbt(signed["psbt"])
|
||
txid = send_raw(final["hex"])
|
||
info(f"Consolidation TX: {txid[:16]}...")
|
||
|
||
mine_and_confirm()
|
||
|
||
# DETECT: Transaction with N≥2 inputs = CIOH trigger
|
||
tx = get_tx(txid)
|
||
num_inputs = len(tx["vin"])
|
||
num_outputs = len(tx["vout"])
|
||
|
||
check(num_inputs >= 2,
|
||
f"Transaction has {num_inputs} inputs (CIOH: all inputs assumed same owner)")
|
||
check(num_inputs >= 3 and num_outputs <= 2,
|
||
f"Consolidation shape: {num_inputs} inputs → {num_outputs} outputs (many→few)")
|
||
|
||
info(f"PRIVACY IMPACT: All {num_inputs} input addresses are now linked as same entity")
|
||
for vin in tx["vin"]:
|
||
parent_tx = get_tx(vin["txid"])
|
||
addr = parent_tx["vout"][vin["vout"]]["scriptPubKey"].get("address", "?")
|
||
info(f" Linked address: {addr[:25]}...")
|
||
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# VULNERABILITY 3: Dust UTXO Detection (Detecção de UTXOs dust)
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def test_03_dust_detection():
|
||
header(3, "Dust UTXO Detection (Detecção de UTXOs dust)")
|
||
|
||
ensure_funds("bob", 1.0)
|
||
|
||
# REPRODUCE: Create very small UTXOs (dust-class)
|
||
# Standard dust threshold for P2WPKH is ~294 sats at default relay fee
|
||
# We'll create UTXOs of 546 sats (0.00000546) and 1000 sats (0.00001000)
|
||
alice_dust_addr1 = get_new_address("alice", "bech32")
|
||
alice_dust_addr2 = get_new_address("alice", "bech32")
|
||
info(f"Dust target address 1: {alice_dust_addr1[:20]}...")
|
||
info(f"Dust target address 2: {alice_dust_addr2[:20]}...")
|
||
|
||
# Use raw tx to create precise dust amounts
|
||
bob_utxos = get_utxos("bob", 1)
|
||
big_utxo = max(bob_utxos, key=lambda u: u["amount"])
|
||
info(f"Using Bob's UTXO: {big_utxo['amount']} BTC")
|
||
|
||
change_addr = get_new_address("bob", "bech32")
|
||
change_amount = round(big_utxo["amount"] - 0.00001000 - 0.00000546 - 0.0001, 8)
|
||
|
||
raw_tx = create_raw_tx(
|
||
[{"txid": big_utxo["txid"], "vout": big_utxo["vout"]}],
|
||
[
|
||
{alice_dust_addr1: 0.00001000}, # 1000 sats - dust-class
|
||
{alice_dust_addr2: 0.00000546}, # 546 sats - at dust threshold
|
||
{change_addr: change_amount},
|
||
]
|
||
)
|
||
signed = sign_raw_tx("bob", raw_tx)
|
||
txid = send_raw(signed["hex"])
|
||
info(f"Dust TX: {txid[:16]}...")
|
||
|
||
mine_and_confirm()
|
||
|
||
# DETECT: Scan outputs for values below dust threshold
|
||
tx = get_tx(txid)
|
||
DUST_THRESHOLD_SATS = 1000 # Conservative: anything ≤ 1000 sats is "dust-class"
|
||
STRICT_DUST_SATS = 546 # Bitcoin Core's strict P2WPKH dust limit
|
||
|
||
dust_outputs = []
|
||
for vout in tx["vout"]:
|
||
value_sats = int(round(vout["value"] * 1e8))
|
||
if value_sats <= DUST_THRESHOLD_SATS:
|
||
dust_outputs.append({
|
||
"vout_n": vout["n"],
|
||
"value_sats": value_sats,
|
||
"address": vout["scriptPubKey"].get("address", "?"),
|
||
"is_strict_dust": value_sats <= STRICT_DUST_SATS,
|
||
})
|
||
|
||
check(len(dust_outputs) >= 2,
|
||
f"Found {len(dust_outputs)} dust outputs (≤{DUST_THRESHOLD_SATS} sats)")
|
||
|
||
strict_dust = [d for d in dust_outputs if d["is_strict_dust"]]
|
||
check(len(strict_dust) >= 1,
|
||
f"Found {len(strict_dust)} outputs at/below strict dust threshold (≤{STRICT_DUST_SATS} sats)")
|
||
|
||
for d in dust_outputs:
|
||
info(f" Dust output #{d['vout_n']}: {d['value_sats']} sats -> {d['address'][:20]}... "
|
||
f"({'STRICT DUST' if d['is_strict_dust'] else 'dust-class'})")
|
||
|
||
info("PRIVACY IMPACT: Dust UTXOs can be used as tracking tokens (dust attacks)")
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# VULNERABILITY 4: Spending Dust with Other Inputs
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def test_04_dust_spending():
|
||
header(4, "Spending Dust UTXOs with Other Inputs")
|
||
|
||
ensure_funds("alice", 1.0)
|
||
|
||
# REPRODUCE: Alice has dust UTXOs from test 3, plus normal UTXOs
|
||
# Spend a dust UTXO together with a normal UTXO
|
||
utxos = get_utxos("alice", 1)
|
||
|
||
dust_utxos = [u for u in utxos if u["amount"] <= 0.00001]
|
||
normal_utxos = [u for u in utxos if u["amount"] > 0.001]
|
||
|
||
if not dust_utxos:
|
||
info("No dust UTXOs found, creating one...")
|
||
# Create a dust UTXO for alice
|
||
ensure_funds("bob", 1.0)
|
||
alice_addr = get_new_address("alice", "bech32")
|
||
bob_utxos = get_utxos("bob", 1)
|
||
big_utxo = max(bob_utxos, key=lambda u: u["amount"])
|
||
change_addr = get_new_address("bob", "bech32")
|
||
change_amount = round(big_utxo["amount"] - 0.00001000 - 0.0001, 8)
|
||
raw_tx = create_raw_tx(
|
||
[{"txid": big_utxo["txid"], "vout": big_utxo["vout"]}],
|
||
[{alice_addr: 0.00001000}, {change_addr: change_amount}]
|
||
)
|
||
signed = sign_raw_tx("bob", raw_tx)
|
||
send_raw(signed["hex"])
|
||
mine_and_confirm()
|
||
utxos = get_utxos("alice", 1)
|
||
dust_utxos = [u for u in utxos if u["amount"] <= 0.00001]
|
||
normal_utxos = [u for u in utxos if u["amount"] > 0.001]
|
||
|
||
if not normal_utxos:
|
||
info("No normal UTXOs found, creating one...")
|
||
ensure_funds("alice", 0.5)
|
||
mine_and_confirm()
|
||
utxos = get_utxos("alice", 1)
|
||
normal_utxos = [u for u in utxos if u["amount"] > 0.001]
|
||
|
||
dust = dust_utxos[0]
|
||
normal = normal_utxos[0]
|
||
|
||
info(f"Dust UTXO: {dust['amount']:.8f} BTC ({int(dust['amount']*1e8)} sats)")
|
||
info(f"Normal UTXO: {normal['amount']:.8f} BTC")
|
||
|
||
# Spend both together
|
||
dest_addr = get_new_address("bob", "bech32")
|
||
total = dust["amount"] + normal["amount"]
|
||
send_amt = round(total - 0.0001, 8)
|
||
|
||
raw_tx = create_raw_tx(
|
||
[
|
||
{"txid": dust["txid"], "vout": dust["vout"]},
|
||
{"txid": normal["txid"], "vout": normal["vout"]},
|
||
],
|
||
[{dest_addr: send_amt}]
|
||
)
|
||
signed = sign_raw_tx("alice", raw_tx)
|
||
txid = send_raw(signed["hex"])
|
||
info(f"Dust-spend TX: {txid[:16]}...")
|
||
|
||
mine_and_confirm()
|
||
|
||
# DETECT: A tx with inputs mixing dust and non-dust
|
||
tx = get_tx(txid)
|
||
input_values = []
|
||
for vin in tx["vin"]:
|
||
parent = get_tx(vin["txid"])
|
||
val = parent["vout"][vin["vout"]]["value"]
|
||
input_values.append(val)
|
||
|
||
dust_inputs = [v for v in input_values if v <= 0.00001]
|
||
non_dust_inputs = [v for v in input_values if v > 0.001]
|
||
|
||
check(len(dust_inputs) >= 1 and len(non_dust_inputs) >= 1,
|
||
f"TX mixes {len(dust_inputs)} dust input(s) with {len(non_dust_inputs)} normal input(s)")
|
||
|
||
info("PRIVACY IMPACT: Dust attack succeeds—the dust sender can now link")
|
||
info(" Alice's normal UTXO to the dust tracking token via CIOH")
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# VULNERABILITY 5: Change Detection (Detecção provável de troco)
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def test_05_change_detection():
|
||
header(5, "Probable Change Detection (Detecção provável de troco)")
|
||
|
||
ensure_funds("alice", 1.0)
|
||
|
||
# REPRODUCE: Alice pays Bob a round amount; wallet auto-creates change
|
||
bob_addr = get_new_address("bob", "bech32")
|
||
txid = send_to_address("alice", bob_addr, 0.05) # Round payment
|
||
info(f"Payment TX: {txid[:16]}...")
|
||
|
||
mine_and_confirm()
|
||
|
||
# DETECT: Heuristic change detection
|
||
tx = get_tx(txid)
|
||
|
||
payment_output = None
|
||
change_candidate = None
|
||
|
||
for vout in tx["vout"]:
|
||
addr = vout["scriptPubKey"].get("address", "")
|
||
value = vout["value"]
|
||
value_sats = int(round(value * 1e8))
|
||
|
||
# Heuristic 1: Round amount = payment (not change)
|
||
is_round = (value_sats % 100000 == 0) or (value_sats % 1000000 == 0)
|
||
|
||
# Heuristic 2: Recipient address
|
||
is_to_bob = (addr == bob_addr)
|
||
|
||
if is_to_bob or is_round:
|
||
payment_output = {"n": vout["n"], "value": value, "addr": addr, "round": is_round}
|
||
else:
|
||
change_candidate = {"n": vout["n"], "value": value, "addr": addr, "round": is_round}
|
||
|
||
check(payment_output is not None,
|
||
f"Payment output detected: {payment_output['value']:.8f} BTC (round={payment_output['round']})")
|
||
|
||
check(change_candidate is not None,
|
||
f"Change candidate detected: {change_candidate['value']:.8f} BTC (non-round amount)")
|
||
|
||
if payment_output and change_candidate:
|
||
# Verify: change output should be the "odd" amount
|
||
check(not change_candidate["round"],
|
||
f"Change has non-round value ({int(change_candidate['value']*1e8)} sats) — strong change indicator")
|
||
|
||
# Heuristic 3: Same script type as input
|
||
input_tx = get_tx(tx["vin"][0]["txid"])
|
||
input_type = input_tx["vout"][tx["vin"][0]["vout"]]["scriptPubKey"]["type"]
|
||
change_type = tx["vout"][change_candidate["n"]]["scriptPubKey"]["type"]
|
||
check(input_type == change_type,
|
||
f"Change has same script type as input ({change_type}) — another strong indicator")
|
||
|
||
info("PRIVACY IMPACT: Observer can distinguish payment from change,")
|
||
info(" identifying the sender's change address and tracking their funds")
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# VULNERABILITY 6: UTXOs from Prior Consolidation
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def test_06_consolidation_origin():
|
||
header(6, "UTXOs Originating from Prior Consolidation")
|
||
|
||
ensure_funds("bob", 2.0)
|
||
|
||
# REPRODUCE: Step 1 - Create a consolidation transaction for Alice
|
||
for i in range(4):
|
||
addr = get_new_address("alice", "bech32")
|
||
send_to_address("bob", addr, 0.003)
|
||
|
||
mine_and_confirm()
|
||
|
||
# Consolidate
|
||
utxos = get_utxos("alice", 1)
|
||
small_utxos = [u for u in utxos if 0.002 < u["amount"] < 0.004][:4]
|
||
|
||
if len(small_utxos) < 2:
|
||
info(f"Not enough small UTXOs ({len(small_utxos)}), creating more...")
|
||
for i in range(4):
|
||
addr = get_new_address("alice", "bech32")
|
||
send_to_address("bob", addr, 0.003)
|
||
mine_and_confirm()
|
||
utxos = get_utxos("alice", 1)
|
||
small_utxos = [u for u in utxos if 0.002 < u["amount"] < 0.004][:4]
|
||
|
||
inputs = [{"txid": u["txid"], "vout": u["vout"]} for u in small_utxos]
|
||
consolidation_addr = get_new_address("alice", "bech32")
|
||
total = sum(u["amount"] for u in small_utxos)
|
||
send_amt = round(total - 0.0001, 8)
|
||
|
||
raw_tx = create_raw_tx(inputs, [{consolidation_addr: send_amt}])
|
||
signed = sign_raw_tx("alice", raw_tx)
|
||
consolidation_txid = send_raw(signed["hex"])
|
||
info(f"Consolidation TX: {consolidation_txid[:16]}... ({len(inputs)} inputs → 1 output)")
|
||
|
||
mine_and_confirm()
|
||
|
||
# Step 2 - Spend the consolidated output
|
||
utxos = get_utxos("alice", 1)
|
||
consolidated = [u for u in utxos if u["txid"] == consolidation_txid]
|
||
|
||
if consolidated:
|
||
dest = get_new_address("carol", "bech32")
|
||
spend_amt = round(consolidated[0]["amount"] - 0.0001, 8)
|
||
raw_tx = create_raw_tx(
|
||
[{"txid": consolidated[0]["txid"], "vout": consolidated[0]["vout"]}],
|
||
[{dest: spend_amt}]
|
||
)
|
||
signed = sign_raw_tx("alice", raw_tx)
|
||
spend_txid = send_raw(signed["hex"])
|
||
info(f"Spend TX: {spend_txid[:16]}...")
|
||
mine_and_confirm()
|
||
|
||
# DETECT: Check if input's parent tx has consolidation shape
|
||
spend_tx = get_tx(spend_txid)
|
||
parent_txid = spend_tx["vin"][0]["txid"]
|
||
parent_tx = get_tx(parent_txid)
|
||
parent_inputs = len(parent_tx["vin"])
|
||
parent_outputs = len(parent_tx["vout"])
|
||
|
||
is_from_consolidation = parent_inputs >= 3 and parent_outputs <= 2
|
||
|
||
check(is_from_consolidation,
|
||
f"UTXO parent has consolidation shape: {parent_inputs} inputs → {parent_outputs} output(s)")
|
||
check(parent_inputs >= 3,
|
||
f"Parent tx has {parent_inputs} inputs (threshold: ≥3 = consolidation)")
|
||
|
||
info("PRIVACY IMPACT: UTXOs born from consolidation carry the full")
|
||
info(" cluster linkage of ALL inputs that were merged")
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# VULNERABILITY 7: Script Type Inconsistency / Mixing
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def test_07_script_type_mixing():
|
||
header(7, "Script Type Inconsistency / Mixing")
|
||
|
||
ensure_funds("bob", 2.0)
|
||
|
||
# REPRODUCE: Create UTXOs of different script types for Alice
|
||
wpkh_addr = get_new_address("alice", "bech32") # P2WPKH (bc1q...)
|
||
tr_addr = get_new_address("alice", "bech32m") # P2TR (bc1p...)
|
||
|
||
info(f"P2WPKH address: {wpkh_addr[:20]}...")
|
||
info(f"P2TR address: {tr_addr[:20]}...")
|
||
|
||
send_to_address("bob", wpkh_addr, 0.005)
|
||
send_to_address("bob", tr_addr, 0.005)
|
||
mine_and_confirm()
|
||
|
||
# Now spend both in the same transaction
|
||
utxos = get_utxos("alice", 1)
|
||
|
||
wpkh_utxo = None
|
||
tr_utxo = None
|
||
for u in utxos:
|
||
if u.get("address", "").startswith("tb1q") and u["amount"] >= 0.004 and not wpkh_utxo:
|
||
wpkh_utxo = u
|
||
elif u.get("address", "").startswith("tb1p") and u["amount"] >= 0.004 and not tr_utxo:
|
||
tr_utxo = u
|
||
|
||
if not wpkh_utxo or not tr_utxo:
|
||
# Fallback: try with desc type
|
||
for u in utxos:
|
||
desc = u.get("desc", "")
|
||
if "wpkh" in desc and u["amount"] >= 0.004 and not wpkh_utxo:
|
||
wpkh_utxo = u
|
||
elif "tr(" in desc and u["amount"] >= 0.004 and not tr_utxo:
|
||
tr_utxo = u
|
||
|
||
if not wpkh_utxo or not tr_utxo:
|
||
info("Could not find both UTXO types, listing available:")
|
||
for u in utxos:
|
||
info(f" {u.get('address','?')[:25]}... = {u['amount']} ({u.get('desc','?')[:20]})")
|
||
info("Skipping mixed-input test, testing output-side mixing instead...")
|
||
|
||
# Output-side mixing: pay to P2WPKH and change to P2TR
|
||
if utxos:
|
||
dest_wpkh = get_new_address("bob", "bech32")
|
||
dest_tr = get_new_address("bob", "bech32m")
|
||
u = utxos[0]
|
||
half = round(u["amount"] / 2 - 0.00005, 8)
|
||
raw_tx = create_raw_tx(
|
||
[{"txid": u["txid"], "vout": u["vout"]}],
|
||
[{dest_wpkh: half}, {dest_tr: half}]
|
||
)
|
||
signed = sign_raw_tx("alice", raw_tx)
|
||
txid = send_raw(signed["hex"])
|
||
mine_and_confirm()
|
||
tx = get_tx(txid)
|
||
output_types = set()
|
||
for vout in tx["vout"]:
|
||
output_types.add(vout["scriptPubKey"]["type"])
|
||
check(len(output_types) >= 2,
|
||
f"Output script types: {output_types} — heterogeneous outputs")
|
||
return True
|
||
|
||
info(f"P2WPKH UTXO: {wpkh_utxo['amount']} BTC at {wpkh_utxo.get('address','?')[:20]}...")
|
||
info(f"P2TR UTXO: {tr_utxo['amount']} BTC at {tr_utxo.get('address','?')[:20]}...")
|
||
|
||
dest = get_new_address("bob", "bech32")
|
||
total = wpkh_utxo["amount"] + tr_utxo["amount"]
|
||
send_amt = round(total - 0.0002, 8)
|
||
|
||
raw_tx = create_raw_tx(
|
||
[
|
||
{"txid": wpkh_utxo["txid"], "vout": wpkh_utxo["vout"]},
|
||
{"txid": tr_utxo["txid"], "vout": tr_utxo["vout"]},
|
||
],
|
||
[{dest: send_amt}]
|
||
)
|
||
signed = sign_raw_tx("alice", raw_tx)
|
||
txid = send_raw(signed["hex"])
|
||
info(f"Mixed-type TX: {txid[:16]}...")
|
||
|
||
mine_and_confirm()
|
||
|
||
# DETECT: Check if inputs have different script types
|
||
tx = get_tx(txid)
|
||
input_types = set()
|
||
for vin in tx["vin"]:
|
||
parent = get_tx(vin["txid"])
|
||
script_type = parent["vout"][vin["vout"]]["scriptPubKey"]["type"]
|
||
input_types.add(script_type)
|
||
info(f" Input type: {script_type}")
|
||
|
||
check(len(input_types) >= 2,
|
||
f"Input script types: {input_types} — heterogeneous (fingerprint!)")
|
||
|
||
info("PRIVACY IMPACT: Mixing script types is a behavioral fingerprint")
|
||
info(" and reveals the wallet controls both address families")
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# VULNERABILITY 8: Merging Previously Separate UTXO Clusters
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def test_08_cluster_merge():
|
||
header(8, "Merging Previously Separate UTXO Clusters")
|
||
|
||
ensure_funds("bob", 2.0)
|
||
ensure_funds("carol", 2.0)
|
||
|
||
# REPRODUCE: Create two separate clusters for Alice
|
||
# Cluster A: from Bob
|
||
cluster_a_addr = get_new_address("alice", "bech32")
|
||
txid_a = send_to_address("bob", cluster_a_addr, 0.004)
|
||
info(f"Cluster A (from Bob): {cluster_a_addr[:20]}... = 0.004 BTC")
|
||
|
||
# Cluster B: from Carol
|
||
cluster_b_addr = get_new_address("alice", "bech32")
|
||
txid_b = send_to_address("carol", cluster_b_addr, 0.004)
|
||
info(f"Cluster B (from Carol): {cluster_b_addr[:20]}... = 0.004 BTC")
|
||
|
||
mine_and_confirm()
|
||
|
||
# Find the specific UTXOs
|
||
utxos = get_utxos("alice", 1)
|
||
utxo_a = next((u for u in utxos if u["txid"] == txid_a), None)
|
||
utxo_b = next((u for u in utxos if u["txid"] == txid_b), None)
|
||
|
||
if not utxo_a or not utxo_b:
|
||
info("Searching for UTXOs by address...")
|
||
utxo_a = next((u for u in utxos if u.get("address") == cluster_a_addr), None)
|
||
utxo_b = next((u for u in utxos if u.get("address") == cluster_b_addr), None)
|
||
|
||
if not utxo_a or not utxo_b:
|
||
info("Could not locate both cluster UTXOs")
|
||
return False
|
||
|
||
# MERGE: Spend one from each cluster together
|
||
dest = get_new_address("bob", "bech32")
|
||
total = utxo_a["amount"] + utxo_b["amount"]
|
||
send_amt = round(total - 0.0002, 8)
|
||
|
||
raw_tx = create_raw_tx(
|
||
[
|
||
{"txid": utxo_a["txid"], "vout": utxo_a["vout"]},
|
||
{"txid": utxo_b["txid"], "vout": utxo_b["vout"]},
|
||
],
|
||
[{dest: send_amt}]
|
||
)
|
||
signed = sign_raw_tx("alice", raw_tx)
|
||
merge_txid = send_raw(signed["hex"])
|
||
info(f"Cluster merge TX: {merge_txid[:16]}...")
|
||
|
||
mine_and_confirm()
|
||
|
||
# DETECT: Check if inputs come from different source clusters
|
||
merge_tx = get_tx(merge_txid)
|
||
source_txids = [vin["txid"] for vin in merge_tx["vin"]]
|
||
|
||
# Trace each input to its source
|
||
sources = {}
|
||
for vin in merge_tx["vin"]:
|
||
parent = get_tx(vin["txid"])
|
||
# Who funded this? Check the inputs of the parent tx
|
||
if parent["vin"][0].get("coinbase"):
|
||
sources[vin["txid"]] = "coinbase"
|
||
else:
|
||
grandparent_txid = parent["vin"][0]["txid"]
|
||
grandparent = get_tx(grandparent_txid)
|
||
# Check which wallet owned the input
|
||
sources[vin["txid"]] = grandparent_txid[:16]
|
||
|
||
distinct_sources = len(set(sources.values()))
|
||
check(len(source_txids) >= 2,
|
||
f"Merge TX has {len(source_txids)} inputs from different funding transactions")
|
||
|
||
check(distinct_sources >= 2 or len(source_txids) >= 2,
|
||
f"Inputs trace to {distinct_sources} distinct source chains — clusters merged!")
|
||
|
||
info("PRIVACY IMPACT: Previously separate identity clusters (Bob-linked")
|
||
info(" and Carol-linked) are now permanently merged into one cluster")
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# VULNERABILITY 9: UTXO Historical Depth (Lookback Depth)
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def test_09_lookback_depth():
|
||
header(9, "UTXO Historical Depth (Lookback Depth)")
|
||
|
||
ensure_funds("alice", 1.0)
|
||
|
||
# REPRODUCE: Create an "old" UTXO and let it age many blocks
|
||
old_addr = get_new_address("alice", "bech32")
|
||
old_txid = send_to_address("miner", old_addr, 0.01)
|
||
info(f"Old UTXO created: {old_txid[:16]}...")
|
||
|
||
mine_blocks(20) # Age it 20 blocks
|
||
info("Mined 20 blocks to age the UTXO")
|
||
|
||
# Create a "new" UTXO
|
||
new_addr = get_new_address("alice", "bech32")
|
||
new_txid = send_to_address("miner", new_addr, 0.01)
|
||
info(f"New UTXO created: {new_txid[:16]}...")
|
||
|
||
mine_and_confirm()
|
||
|
||
# DETECT: Compare confirmation depths
|
||
old_tx = get_tx(old_txid)
|
||
new_tx = get_tx(new_txid)
|
||
|
||
old_confs = old_tx.get("confirmations", 0)
|
||
new_confs = new_tx.get("confirmations", 0)
|
||
|
||
check(old_confs > new_confs + 10,
|
||
f"Old UTXO: {old_confs} confirmations vs New UTXO: {new_confs} confirmations (diff={old_confs - new_confs})")
|
||
|
||
# Ancestor chain analysis
|
||
def trace_depth(txid, max_depth=10):
|
||
"""Walk back through the transaction chain."""
|
||
depth = 0
|
||
current_txid = txid
|
||
chain = [current_txid[:16]]
|
||
for _ in range(max_depth):
|
||
tx = get_tx(current_txid)
|
||
if tx["vin"][0].get("coinbase"):
|
||
chain.append("COINBASE")
|
||
break
|
||
current_txid = tx["vin"][0]["txid"]
|
||
chain.append(current_txid[:16])
|
||
depth += 1
|
||
return depth, chain
|
||
|
||
old_depth, old_chain = trace_depth(old_txid)
|
||
new_depth, new_chain = trace_depth(new_txid)
|
||
|
||
info(f"Old UTXO chain depth: {old_depth} hops: {' → '.join(old_chain[:5])}")
|
||
info(f"New UTXO chain depth: {new_depth} hops: {' → '.join(new_chain[:5])}")
|
||
|
||
check(old_confs >= 15,
|
||
f"Old UTXO has ≥15 confirmations ({old_confs}) — detectable age pattern")
|
||
|
||
info("PRIVACY IMPACT: UTXO age reveals dormancy patterns, coin hoarding,")
|
||
info(" or can distinguish 'fresh' exchange withdrawals from aged savings")
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# VULNERABILITY 10: Probable Exchange Origin
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def test_10_exchange_origin():
|
||
header(10, "Identification of Probable Exchange Origin")
|
||
|
||
ensure_funds("exchange", 5.0)
|
||
|
||
# REPRODUCE: Simulate exchange batch withdrawal (many outputs)
|
||
batch_outputs = {}
|
||
recipients = []
|
||
for i in range(8):
|
||
# Send to alice, bob, carol in round-robin plus random wallets
|
||
wallets = ["alice", "bob", "carol", "alice", "bob", "carol", "alice", "bob"]
|
||
addr = get_new_address(wallets[i], "bech32")
|
||
batch_outputs[addr] = round(0.01 + (i * 0.001), 8)
|
||
recipients.append((wallets[i], addr[:15]))
|
||
|
||
info(f"Exchange batch withdrawal: {len(batch_outputs)} recipients")
|
||
for w, a in recipients:
|
||
info(f" → {w}: {a}...")
|
||
|
||
# Use sendmany for batch
|
||
txid = cli("sendmany", "", json.dumps(batch_outputs), wallet="exchange")
|
||
info(f"Batch TX: {txid[:16]}...")
|
||
|
||
mine_and_confirm()
|
||
|
||
# DETECT: Analyze the transaction for exchange-like patterns
|
||
tx = get_tx(txid)
|
||
num_outputs = len(tx["vout"])
|
||
num_inputs = len(tx["vin"])
|
||
|
||
# Exchange heuristics:
|
||
# 1. High output count (batching)
|
||
is_batch = num_outputs >= 5
|
||
check(is_batch,
|
||
f"High output count: {num_outputs} outputs (≥5 = likely batch withdrawal)")
|
||
|
||
# 2. Round-ish payment amounts (exchanges often use round amounts)
|
||
round_outputs = 0
|
||
for vout in tx["vout"]:
|
||
sats = int(round(vout["value"] * 1e8))
|
||
if sats % 100000 == 0 or sats % 10000 == 0:
|
||
round_outputs += 1
|
||
|
||
# 3. Large input(s) relative to individual outputs
|
||
input_total = 0
|
||
for vin in tx["vin"]:
|
||
parent = get_tx(vin["txid"])
|
||
input_total += parent["vout"][vin["vout"]]["value"]
|
||
|
||
# Exclude the largest output (likely change) — look at median payment
|
||
output_vals = sorted([v["value"] for v in tx["vout"]])
|
||
median_output = output_vals[len(output_vals) // 2]
|
||
ratio = input_total / median_output if median_output > 0 else 0
|
||
|
||
check(ratio > 3,
|
||
f"Input/median-output ratio: {ratio:.1f}x (high ratio suggests large hot wallet)")
|
||
|
||
# 4. Many unique recipient addresses
|
||
unique_addrs = set()
|
||
for vout in tx["vout"]:
|
||
addr = vout["scriptPubKey"].get("address", "")
|
||
if addr:
|
||
unique_addrs.add(addr)
|
||
|
||
check(len(unique_addrs) >= 5,
|
||
f"Unique recipient addresses: {len(unique_addrs)} (many = batch pattern)")
|
||
|
||
info("PRIVACY IMPACT: UTXOs from exchange withdrawals reveal the user")
|
||
info(" interacted with that exchange, enabling entity-linking")
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# VULNERABILITY 11: UTXOs from Risk Sources ("Dirty Money")
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def test_11_tainted_utxos():
|
||
header(11, 'UTXOs from Risk Sources ("Dirty Money" / Taint)')
|
||
|
||
ensure_funds("risky", 2.0)
|
||
ensure_funds("alice", 1.0)
|
||
|
||
# REPRODUCE: "risky" (known bad actor) sends to Alice
|
||
alice_tainted_addr = get_new_address("alice", "bech32")
|
||
taint_txid = send_to_address("risky", alice_tainted_addr, 0.01)
|
||
info(f"Taint TX (risky → alice): {taint_txid[:16]}...")
|
||
|
||
# Also give Alice a clean UTXO
|
||
alice_clean_addr = get_new_address("alice", "bech32")
|
||
clean_txid = send_to_address("bob", alice_clean_addr, 0.01)
|
||
info(f"Clean TX (bob → alice): {clean_txid[:16]}...")
|
||
|
||
mine_and_confirm()
|
||
|
||
# Step 2: Alice consolidates tainted + clean (taint propagation!)
|
||
utxos = get_utxos("alice", 1)
|
||
tainted_utxo = next((u for u in utxos if u["txid"] == taint_txid), None)
|
||
clean_utxo = next((u for u in utxos if u["txid"] == clean_txid), None)
|
||
|
||
if not tainted_utxo or not clean_utxo:
|
||
info("Locating UTXOs by address...")
|
||
tainted_utxo = next((u for u in utxos if u.get("address") == alice_tainted_addr), None)
|
||
clean_utxo = next((u for u in utxos if u.get("address") == alice_clean_addr), None)
|
||
|
||
if not tainted_utxo or not clean_utxo:
|
||
info("Could not find both UTXOs")
|
||
return False
|
||
|
||
# Merge tainted + clean
|
||
dest = get_new_address("carol", "bech32")
|
||
total = tainted_utxo["amount"] + clean_utxo["amount"]
|
||
send_amt = round(total - 0.0002, 8)
|
||
|
||
raw_tx = create_raw_tx(
|
||
[
|
||
{"txid": tainted_utxo["txid"], "vout": tainted_utxo["vout"]},
|
||
{"txid": clean_utxo["txid"], "vout": clean_utxo["vout"]},
|
||
],
|
||
[{dest: send_amt}]
|
||
)
|
||
signed = sign_raw_tx("alice", raw_tx)
|
||
merge_txid = send_raw(signed["hex"])
|
||
info(f"Taint merge TX: {merge_txid[:16]}...")
|
||
|
||
mine_and_confirm()
|
||
|
||
# DETECT: Taint analysis
|
||
# Build set of TXIDs that originated from the "risky" wallet
|
||
risky_txids = set()
|
||
risky_txs = cli("listtransactions", "*", 100, 0, wallet="risky")
|
||
for rtx in risky_txs:
|
||
if rtx.get("txid"):
|
||
risky_txids.add(rtx["txid"])
|
||
|
||
merge_tx = get_tx(merge_txid)
|
||
|
||
tainted_inputs = 0
|
||
clean_inputs = 0
|
||
for vin in merge_tx["vin"]:
|
||
parent_txid = vin["txid"]
|
||
# A parent TX is tainted if it appears in risky wallet's history
|
||
is_tainted = parent_txid in risky_txids
|
||
if is_tainted:
|
||
tainted_inputs += 1
|
||
info(f" Input from {parent_txid[:16]}... — TAINTED (from risky source)")
|
||
else:
|
||
clean_inputs += 1
|
||
info(f" Input from {parent_txid[:16]}... — CLEAN")
|
||
|
||
check(tainted_inputs >= 1,
|
||
f"Found {tainted_inputs} tainted input(s) in the merge transaction")
|
||
check(tainted_inputs >= 1 and clean_inputs >= 1,
|
||
f"TAINT PROPAGATION: {tainted_inputs} tainted + {clean_inputs} clean merged → all outputs tainted")
|
||
|
||
# Taint scoring
|
||
taint_ratio = tainted_inputs / (tainted_inputs + clean_inputs) if (tainted_inputs + clean_inputs) > 0 else 0
|
||
info(f" Taint ratio: {taint_ratio:.0%} of inputs from risky sources")
|
||
|
||
info("PRIVACY IMPACT: Merging tainted + clean funds contaminates ALL outputs")
|
||
info(" Carol now receives 'dirty' coins even though she dealt with Alice")
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# VULNERABILITY 12: Behavioral Fingerprinting
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def test_12_behavioral_fingerprint():
|
||
header(12, "Behavioral Fingerprinting")
|
||
|
||
ensure_funds("alice", 3.0)
|
||
ensure_funds("bob", 3.0)
|
||
|
||
# REPRODUCE: Create distinctive transaction patterns for Alice vs Bob
|
||
alice_txids = []
|
||
bob_txids = []
|
||
|
||
info("Creating Alice's transactions (consistent behavioral pattern)...")
|
||
# Alice's pattern: always round payments, always bech32, always ~same fee
|
||
for i in range(5):
|
||
dest = get_new_address("carol", "bech32") # Alice always pays to bech32
|
||
amount = 0.01 * (i + 1) # Always round amounts
|
||
txid = send_to_address("alice", dest, amount)
|
||
alice_txids.append(txid)
|
||
info(f" Alice TX {i+1}: {amount:.8f} BTC → bech32")
|
||
|
||
mine_and_confirm()
|
||
|
||
info("Creating Bob's transactions (different behavioral pattern)...")
|
||
# Bob's pattern: odd amounts, mixes address types
|
||
for i in range(5):
|
||
addr_type = "bech32m" if i % 2 == 0 else "bech32" # Bob mixes types
|
||
dest = get_new_address("carol", addr_type)
|
||
amount = 0.00723 * (i + 1) + 0.00011 # Odd amounts
|
||
amount = round(amount, 8)
|
||
txid = send_to_address("bob", dest, amount)
|
||
bob_txids.append(txid)
|
||
info(f" Bob TX {i+1}: {amount:.8f} BTC → {addr_type}")
|
||
|
||
mine_and_confirm()
|
||
|
||
# DETECT: Extract behavioral features and distinguish users
|
||
def extract_features(txids, label):
|
||
features = {
|
||
"label": label,
|
||
"output_counts": [],
|
||
"has_round_payment": [],
|
||
"output_types": [],
|
||
"feerate_estimates": [],
|
||
"rbf_signals": [],
|
||
}
|
||
for txid in txids:
|
||
tx = get_tx(txid)
|
||
if not tx:
|
||
continue
|
||
|
||
# Output count
|
||
features["output_counts"].append(len(tx["vout"]))
|
||
|
||
# Round payment detection
|
||
for vout in tx["vout"]:
|
||
sats = int(round(vout["value"] * 1e8))
|
||
is_round = sats % 100000 == 0 or sats % 1000000 == 0
|
||
features["has_round_payment"].append(is_round)
|
||
|
||
# Output script types
|
||
for vout in tx["vout"]:
|
||
features["output_types"].append(vout["scriptPubKey"]["type"])
|
||
|
||
# RBF signaling (sequence < 0xfffffffe)
|
||
for vin in tx["vin"]:
|
||
seq = vin.get("sequence", 0xffffffff)
|
||
features["rbf_signals"].append(seq < 0xfffffffe)
|
||
|
||
# Fee estimation (size * feerate)
|
||
if "vsize" in tx and "fee" in tx:
|
||
feerate = abs(tx.get("fee", 0)) / tx["vsize"] * 1e8 # sat/vB
|
||
features["feerate_estimates"].append(feerate)
|
||
|
||
return features
|
||
|
||
alice_features = extract_features(alice_txids, "alice")
|
||
bob_features = extract_features(bob_txids, "bob")
|
||
|
||
# Analysis
|
||
alice_round_ratio = sum(alice_features["has_round_payment"]) / max(len(alice_features["has_round_payment"]), 1)
|
||
bob_round_ratio = sum(bob_features["has_round_payment"]) / max(len(bob_features["has_round_payment"]), 1)
|
||
|
||
alice_type_set = set(alice_features["output_types"])
|
||
bob_type_set = set(bob_features["output_types"])
|
||
|
||
info(f"\n {'Feature':<30} {'Alice':<25} {'Bob':<25}")
|
||
info(f" {'─'*80}")
|
||
info(f" {'Round payment ratio':<30} {alice_round_ratio:<25.0%} {bob_round_ratio:<25.0%}")
|
||
info(f" {'Output types used':<30} {str(alice_type_set):<25} {str(bob_type_set):<25}")
|
||
info(f" {'Avg output count':<30} "
|
||
f"{sum(alice_features['output_counts'])/max(len(alice_features['output_counts']),1):<25.1f} "
|
||
f"{sum(bob_features['output_counts'])/max(len(bob_features['output_counts']),1):<25.1f}")
|
||
|
||
alice_rbf = sum(alice_features["rbf_signals"]) / max(len(alice_features["rbf_signals"]), 1)
|
||
bob_rbf = sum(bob_features["rbf_signals"]) / max(len(bob_features["rbf_signals"]), 1)
|
||
info(f" {'RBF signal ratio':<30} {alice_rbf:<25.0%} {bob_rbf:<25.0%}")
|
||
|
||
# Distinguishability test
|
||
features_differ = (
|
||
abs(alice_round_ratio - bob_round_ratio) > 0.3 or
|
||
alice_type_set != bob_type_set or
|
||
abs(alice_rbf - bob_rbf) > 0.3
|
||
)
|
||
|
||
check(features_differ,
|
||
"Behavioral features DIFFER between Alice and Bob — fingerprinting possible")
|
||
|
||
check(alice_round_ratio > bob_round_ratio,
|
||
f"Alice uses more round amounts ({alice_round_ratio:.0%}) than Bob ({bob_round_ratio:.0%})")
|
||
|
||
# Check if Bob mixes script types more
|
||
bob_mixes = len(bob_type_set) >= 2
|
||
check(bob_mixes or alice_type_set != bob_type_set,
|
||
f"Script type diversity differs: Alice={alice_type_set}, Bob={bob_type_set}")
|
||
|
||
info("\nPRIVACY IMPACT: Consistent behavioral patterns allow re-identification")
|
||
info(" of the same entity across transactions even without address reuse")
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# MAIN: Run all tests
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
def main():
|
||
print(f"\n{BOLD}{'═'*78}{RESET}")
|
||
print(f"{BOLD}{CYAN} BITCOIN PRIVACY VULNERABILITY TEST SUITE{RESET}")
|
||
print(f"{BOLD}{CYAN} Custom Signet — {get_block_count()} blocks{RESET}")
|
||
print(f"{BOLD}{'═'*78}{RESET}")
|
||
|
||
# Check which test to run
|
||
test_filter = None
|
||
if len(sys.argv) > 1:
|
||
for arg in sys.argv[1:]:
|
||
if arg == "-k" and sys.argv.index(arg) + 1 < len(sys.argv):
|
||
test_filter = sys.argv[sys.argv.index(arg) + 1]
|
||
elif arg.isdigit():
|
||
test_filter = arg
|
||
|
||
tests = [
|
||
(1, "Address Reuse", test_01_address_reuse),
|
||
(2, "Multi-input / CIOH", test_02_consolidation_cioh),
|
||
(3, "Dust UTXO Detection", test_03_dust_detection),
|
||
(4, "Dust Spending w/ Normal", test_04_dust_spending),
|
||
(5, "Change Detection", test_05_change_detection),
|
||
(6, "Consolidation Origin", test_06_consolidation_origin),
|
||
(7, "Script Type Mixing", test_07_script_type_mixing),
|
||
(8, "Cluster Merge", test_08_cluster_merge),
|
||
(9, "Lookback Depth", test_09_lookback_depth),
|
||
(10, "Exchange Origin", test_10_exchange_origin),
|
||
(11, "Tainted UTXOs", test_11_tainted_utxos),
|
||
(12, "Behavioral Fingerprint", test_12_behavioral_fingerprint),
|
||
]
|
||
|
||
results = {}
|
||
for num, name, func in tests:
|
||
if test_filter and str(num) != test_filter:
|
||
continue
|
||
try:
|
||
result = func()
|
||
results[num] = "PASS" if result else "FAIL"
|
||
except Exception as e:
|
||
results[num] = f"ERROR: {e}"
|
||
print(f" {RED}✗ ERROR:{RESET} {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
# Summary
|
||
print(f"\n{'═'*78}")
|
||
print(f"{BOLD} TEST SUMMARY{RESET}")
|
||
print(f"{'═'*78}")
|
||
for num, name, _ in tests:
|
||
if num in results:
|
||
status = results[num]
|
||
color = GREEN if status == "PASS" else RED
|
||
print(f" {color}{'✓' if status=='PASS' else '✗'}{RESET} Vulnerability {num:2d}: {name:<35} [{status}]")
|
||
|
||
print(f"\n {GREEN}Passed checks: {PASS_COUNT}{RESET}")
|
||
print(f" {RED}Failed checks: {FAIL_COUNT}{RESET}")
|
||
print(f" Total: {PASS_COUNT + FAIL_COUNT}")
|
||
print()
|
||
|
||
return FAIL_COUNT == 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
success = main()
|
||
sys.exit(0 if success else 1)
|