From 6a01b9e450ecccdbf0e2283e975951c3d578da50 Mon Sep 17 00:00:00 2001 From: Renato Britto Date: Fri, 27 Feb 2026 00:24:31 -0300 Subject: [PATCH] feat: add miner and setup.sh --- backend/script/miner | 604 ++++++++++++++++++++++++++++++++++++++++ backend/script/setup.sh | 185 ++++++++++++ 2 files changed, 789 insertions(+) create mode 100755 backend/script/miner create mode 100755 backend/script/setup.sh diff --git a/backend/script/miner b/backend/script/miner new file mode 100755 index 0000000..f46d88b --- /dev/null +++ b/backend/script/miner @@ -0,0 +1,604 @@ +#!/usr/bin/env python3 +# Copyright (c) 2020-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +import argparse +import json +import logging +import math +import os +import re +import shlex +import sys +import time +import subprocess + +PATH_BASE_CONTRIB_SIGNET = os.path.abspath(os.path.dirname(os.path.realpath(__file__))) +PATH_BASE_TEST_FUNCTIONAL = os.path.abspath(os.path.join(PATH_BASE_CONTRIB_SIGNET, "..", "..", "test", "functional")) +sys.path.insert(0, PATH_BASE_TEST_FUNCTIONAL) + +from test_framework.blocktools import get_witness_script, script_BIP34_coinbase_height, SIGNET_HEADER # noqa: E402 +from test_framework.messages import CBlock, CBlockHeader, COutPoint, CTransaction, CTxIn, CTxInWitness, CTxOut, from_binary, from_hex, ser_string, ser_uint256, tx_from_hex, MAX_SEQUENCE_NONFINAL # noqa: E402 +from test_framework.psbt import PSBT, PSBTMap, PSBT_GLOBAL_UNSIGNED_TX, PSBT_IN_FINAL_SCRIPTSIG, PSBT_IN_FINAL_SCRIPTWITNESS, PSBT_IN_NON_WITNESS_UTXO, PSBT_IN_SIGHASH_TYPE # noqa: E402 +from test_framework.script import CScript, CScriptOp # noqa: E402 + +logging.basicConfig( + format='%(asctime)s %(levelname)s %(message)s', + level=logging.INFO, + datefmt='%Y-%m-%d %H:%M:%S') + +PSBT_SIGNET_BLOCK = b"\xfc\x06signetb" # proprietary PSBT global field holding the block being signed +RE_MULTIMINER = re.compile(r"^(\d+)(-(\d+))?/(\d+)$") + +def signet_txs(block, challenge): + # assumes signet solution has not been added yet so does not need + # to be removed + + txs = block.vtx[:] + txs[0] = CTransaction(txs[0]) + txs[0].vout[-1].scriptPubKey += CScriptOp.encode_op_pushdata(SIGNET_HEADER) + hashes = [] + for tx in txs: + hashes.append(ser_uint256(tx.txid_int)) + mroot = block.get_merkle_root(hashes) + + sd = b"" + sd += block.nVersion.to_bytes(4, "little", signed=True) + sd += ser_uint256(block.hashPrevBlock) + sd += ser_uint256(mroot) + sd += block.nTime.to_bytes(4, "little") + + to_spend = CTransaction() + to_spend.version = 0 + to_spend.nLockTime = 0 + to_spend.vin = [CTxIn(COutPoint(0, 0xFFFFFFFF), b"\x00" + CScriptOp.encode_op_pushdata(sd), 0)] + to_spend.vout = [CTxOut(0, challenge)] + + spend = CTransaction() + spend.version = 0 + spend.nLockTime = 0 + spend.vin = [CTxIn(COutPoint(to_spend.txid_int, 0), b"", 0)] + spend.vout = [CTxOut(0, b"\x6a")] + + return spend, to_spend + +def decode_challenge_psbt(b64psbt): + psbt = PSBT.from_base64(b64psbt) + + assert len(psbt.tx.vin) == 1 + assert len(psbt.tx.vout) == 1 + assert PSBT_SIGNET_BLOCK in psbt.g.map + return psbt + +def get_block_from_psbt(psbt): + return from_binary(CBlock, psbt.g.map[PSBT_SIGNET_BLOCK]) + +def get_solution_from_psbt(psbt, emptyok=False): + scriptSig = psbt.i[0].map.get(PSBT_IN_FINAL_SCRIPTSIG, b"") + scriptWitness = psbt.i[0].map.get(PSBT_IN_FINAL_SCRIPTWITNESS, b"\x00") + if emptyok and len(scriptSig) == 0 and scriptWitness == b"\x00": + return None + return ser_string(scriptSig) + scriptWitness + +def finish_block(block, signet_solution, grind_cmd): + if signet_solution is None: + pass # Don't need to add a signet commitment if there's no signet signature needed + else: + block.vtx[0].vout[-1].scriptPubKey += CScriptOp.encode_op_pushdata(SIGNET_HEADER + signet_solution) + block.hashMerkleRoot = block.calc_merkle_root() + if grind_cmd is None: + block.solve() + else: + headhex = CBlockHeader.serialize(block).hex() + cmd = shlex.split(grind_cmd) + [headhex] + newheadhex = subprocess.run(cmd, stdout=subprocess.PIPE, input=b"", check=True).stdout.strip() + newhead = from_hex(CBlockHeader(), newheadhex.decode('utf8')) + block.nNonce = newhead.nNonce + return block + +def new_block(tmpl, reward_spk, *, blocktime=None, poolid=None): + scriptSig = script_BIP34_coinbase_height(tmpl["height"]) + if poolid is not None: + scriptSig = CScript(b"" + scriptSig + CScriptOp.encode_op_pushdata(poolid)) + + cbtx = CTransaction() + cbtx.nLockTime = tmpl["height"] - 1 + cbtx.vin = [CTxIn(COutPoint(0, 0xffffffff), scriptSig, MAX_SEQUENCE_NONFINAL)] + cbtx.vout = [CTxOut(tmpl["coinbasevalue"], reward_spk)] + cbtx.vin[0].nSequence = 2**32-2 + + block = CBlock() + block.nVersion = tmpl["version"] + block.hashPrevBlock = int(tmpl["previousblockhash"], 16) + block.nTime = tmpl["curtime"] if blocktime is None else blocktime + if block.nTime < tmpl["mintime"]: + block.nTime = tmpl["mintime"] + block.nBits = int(tmpl["bits"], 16) + block.nNonce = 0 + block.vtx = [cbtx] + [tx_from_hex(t["data"]) for t in tmpl["transactions"]] + + witnonce = 0 + witroot = block.calc_witness_merkle_root() + cbwit = CTxInWitness() + cbwit.scriptWitness.stack = [ser_uint256(witnonce)] + block.vtx[0].wit.vtxinwit = [cbwit] + block.vtx[0].vout.append(CTxOut(0, bytes(get_witness_script(witroot, witnonce)))) + + block.hashMerkleRoot = block.calc_merkle_root() + + return block + +def generate_psbt(block, signet_spk): + signet_spk_bin = bytes.fromhex(signet_spk) + signme, spendme = signet_txs(block, signet_spk_bin) + psbt = PSBT() + psbt.g = PSBTMap( {PSBT_GLOBAL_UNSIGNED_TX: signme.serialize(), + PSBT_SIGNET_BLOCK: block.serialize() + } ) + psbt.i = [ PSBTMap( {PSBT_IN_NON_WITNESS_UTXO: spendme.serialize(), + PSBT_IN_SIGHASH_TYPE: bytes([1,0,0,0])}) + ] + psbt.o = [ PSBTMap() ] + return psbt.to_base64() + +def get_poolid(args): + if args.poolid is not None: + return args.poolid.encode('utf8') + elif args.poolnum is not None: + return b"/signet:%d/" % (args.poolnum) + else: + return None + +def get_reward_addr_spk(args, height): + assert args.address is not None or args.descriptor is not None + + if hasattr(args, "reward_spk"): + return args.address, args.reward_spk + + if args.address is not None: + reward_addr = args.address + elif '*' not in args.descriptor: + reward_addr = args.address = json.loads(args.bcli("deriveaddresses", args.descriptor))[0] + else: + remove = [k for k in args.derived_addresses.keys() if k+20 <= height] + for k in remove: + del args.derived_addresses[k] + if height not in args.derived_addresses: + addrs = json.loads(args.bcli("deriveaddresses", args.descriptor, "[%d,%d]" % (height, height+20))) + for k, a in enumerate(addrs): + args.derived_addresses[height+k] = a + reward_addr = args.derived_addresses[height] + + reward_spk = bytes.fromhex(json.loads(args.bcli("getaddressinfo", reward_addr))["scriptPubKey"]) + if args.address is not None: + # will always be the same, so cache + args.reward_spk = reward_spk + + return reward_addr, reward_spk + +def do_genpsbt(args): + poolid = get_poolid(args) + tmpl = json.load(sys.stdin) + signet_spk = tmpl["signet_challenge"] + _, reward_spk = get_reward_addr_spk(args, tmpl["height"]) + block = new_block(tmpl, reward_spk, poolid=poolid) + psbt = generate_psbt(block, signet_spk) + print(psbt) + +def do_solvepsbt(args): + psbt = decode_challenge_psbt(sys.stdin.read()) + block = get_block_from_psbt(psbt) + signet_solution = get_solution_from_psbt(psbt, emptyok=True) + block = finish_block(block, signet_solution, args.grind_cmd) + print(block.serialize().hex()) + +def nbits_to_target(nbits): + shift = (nbits >> 24) & 0xff + return (nbits & 0x00ffffff) * 2**(8*(shift - 3)) + +def target_to_nbits(target): + tstr = "{0:x}".format(target) + if len(tstr) < 6: + tstr = ("000000"+tstr)[-6:] + if len(tstr) % 2 != 0: + tstr = "0" + tstr + if int(tstr[0],16) >= 0x8: + # avoid "negative" + tstr = "00" + tstr + fix = int(tstr[:6], 16) + sz = len(tstr)//2 + if tstr[6:] != "0"*(sz*2-6): + fix += 1 + + return int("%02x%06x" % (sz,fix), 16) + +def seconds_to_hms(s): + if s == 0: + return "0s" + neg = (s < 0) + if neg: + s = -s + out = "" + if s % 60 > 0: + out = "%ds" % (s % 60) + s //= 60 + if s % 60 > 0: + out = "%dm%s" % (s % 60, out) + s //= 60 + if s > 0: + out = "%dh%s" % (s, out) + if neg: + out = "-" + out + return out + +def trivial_challenge(spkhex): + """ + BIP325 allows omitting the signet commitment when scriptSig and + scriptWitness are both empty. This is the case for trivial + challenges such as OP_TRUE or a single data push. + """ + spk = bytes.fromhex(spkhex) + if len(spk) == 1 and 0x51 <= spk[0] <= 0x60: + # OP_TRUE/OP_1...OP_16 + return True + elif 2 <= len(spk) <= 76 and spk[0] + 1 == len(spk): + # Single fixed push of 1-75 bytes + return True + return False + +class Generate: + INTERVAL = 600.0*2016/2015 # 10 minutes, adjusted for the off-by-one bug + + + def __init__(self, multiminer=None, ultimate_target=None, poisson=False, max_interval=1800, + standby_delay=0, backup_delay=0, set_block_time=None, + poolid=None): + if multiminer is None: + multiminer = (0, 1, 1) + (self.multi_low, self.multi_high, self.multi_period) = multiminer + self.ultimate_target = ultimate_target + self.poisson = poisson + self.max_interval = max_interval + self.standby_delay = standby_delay + self.backup_delay = backup_delay + self.set_block_time = set_block_time + self.poolid = poolid + + def next_block_delta(self, last_nbits, last_hash): + # strategy: + # 1) work out how far off our desired target we are + # 2) cap it to a factor of 4 since that's the best we can do in a single retarget period + # 3) use that to work out the desired average interval in this retarget period + # 4) if doing poisson, use the last hash to pick a uniformly random number in [0,1), and work out a random multiplier to vary the average by + # 5) cap the resulting interval between 1 second and 1 hour to avoid extremes + + current_target = nbits_to_target(last_nbits) + retarget_factor = self.ultimate_target / current_target + retarget_factor = max(0.25, min(retarget_factor, 4.0)) + + avg_interval = self.INTERVAL * retarget_factor + + if self.poisson: + det_rand = int(last_hash[-8:], 16) * 2**-32 + this_interval_variance = -math.log1p(-det_rand) + else: + this_interval_variance = 1 + + this_interval = avg_interval * this_interval_variance + this_interval = max(1, min(this_interval, self.max_interval)) + + return this_interval + + def next_block_is_mine(self, last_hash): + det_rand = int(last_hash[-16:-8], 16) + return self.multi_low <= (det_rand % self.multi_period) < self.multi_high + + def next_block_time(self, now, bestheader, is_first_block): + if self.set_block_time is not None: + logging.debug("Setting start time to %d", self.set_block_time) + self.mine_time = self.set_block_time + self.action_time = now + self.is_mine = True + elif bestheader["height"] == 0: + time_delta = self.INTERVAL * 100 # plenty of time to mine 100 blocks + logging.info("Backdating time for first block to %d minutes ago" % (time_delta/60)) + self.mine_time = now - time_delta + self.action_time = now + self.is_mine = True + else: + time_delta = self.next_block_delta(int(bestheader["bits"], 16), bestheader["hash"]) + self.mine_time = bestheader["time"] + time_delta + + self.is_mine = self.next_block_is_mine(bestheader["hash"]) + + self.action_time = self.mine_time + if not self.is_mine: + self.action_time += self.backup_delay + + if self.standby_delay > 0: + self.action_time += self.standby_delay + elif is_first_block: + # for non-standby, always mine immediately on startup, + # even if the next block shouldn't be ours + self.action_time = now + + # don't want fractional times so round down + self.mine_time = int(self.mine_time) + self.action_time = int(self.action_time) + + # can't mine a block 2h in the future; 1h55m for some safety + self.action_time = max(self.action_time, self.mine_time - 6900) + + def gbt(self, bcli, bestblockhash, now): + tmpl = json.loads(bcli("getblocktemplate", '{"rules":["signet","segwit"]}')) + if tmpl["previousblockhash"] != bestblockhash: + logging.warning("GBT based off unexpected block (%s not %s), retrying", tmpl["previousblockhash"], bci["bestblockhash"]) + time.sleep(1) + return None + + if tmpl["mintime"] > self.mine_time: + logging.info("Updating block time from %d to %d", self.mine_time, tmpl["mintime"]) + self.mine_time = tmpl["mintime"] + if self.mine_time > now: + logging.error("GBT mintime is in the future: %d is %d seconds later than %d", self.mine_time, (self.mine_time-now), now) + return None + + return tmpl + + def mine(self, bcli, grind_cmd, tmpl, reward_spk): + block = new_block(tmpl, reward_spk, blocktime=self.mine_time, poolid=self.poolid) + + signet_spk = tmpl["signet_challenge"] + if trivial_challenge(signet_spk): + signet_solution = None + else: + psbt = generate_psbt(block, signet_spk) + input_stream = os.linesep.join([psbt, "true", "ALL"]).encode('utf8') + psbt_signed = json.loads(bcli("-stdin", "walletprocesspsbt", input=input_stream)) + if not psbt_signed.get("complete",False): + logging.debug("Generated PSBT: %s" % (psbt,)) + sys.stderr.write("PSBT signing failed\n") + return None + psbt = decode_challenge_psbt(psbt_signed["psbt"]) + signet_solution = get_solution_from_psbt(psbt) + + return finish_block(block, signet_solution, grind_cmd) + +def do_generate(args): + if args.set_block_time is not None: + max_blocks = 1 + elif args.max_blocks is not None: + if args.max_blocks < 1: + logging.error("--max_blocks must specify a positive integer") + return 1 + max_blocks = args.max_blocks + elif args.ongoing: + max_blocks = None + else: + max_blocks = 1 + + if args.set_block_time is not None and args.set_block_time < 0: + args.set_block_time = time.time() + logging.info("Treating negative block time as current time (%d)" % (args.set_block_time)) + + if args.min_nbits: + args.nbits = "1e0377ae" + logging.info("Using nbits=%s" % (args.nbits)) + + if args.set_block_time is None: + if args.nbits is None or len(args.nbits) != 8: + logging.error("Must specify --nbits (use calibrate command to determine value)") + return 1 + + if args.multiminer is None: + my_blocks = (0,1,1) + else: + if not args.ongoing: + logging.error("Cannot specify --multiminer without --ongoing") + return 1 + m = RE_MULTIMINER.match(args.multiminer) + if m is None: + logging.error("--multiminer argument must be k/m or j-k/m") + return 1 + start,_,stop,total = m.groups() + if stop is None: + stop = start + start, stop, total = map(int, (start, stop, total)) + if stop < start or start <= 0 or total < stop or total == 0: + logging.error("Inconsistent values for --multiminer") + return 1 + my_blocks = (start-1, stop, total) + + if args.max_interval < 960: + logging.error("--max-interval must be at least 960 (16 minutes)") + return 1 + + poolid = get_poolid(args) + + ultimate_target = nbits_to_target(int(args.nbits,16)) + + gen = Generate(multiminer=my_blocks, ultimate_target=ultimate_target, poisson=args.poisson, max_interval=args.max_interval, + standby_delay=args.standby_delay, backup_delay=args.backup_delay, set_block_time=args.set_block_time, poolid=poolid) + + mined_blocks = 0 + bestheader = {"hash": None} + lastheader = None + while max_blocks is None or mined_blocks < max_blocks: + + # current status? + bci = json.loads(args.bcli("getblockchaininfo")) + + if bestheader["hash"] != bci["bestblockhash"]: + bestheader = json.loads(args.bcli("getblockheader", bci["bestblockhash"])) + + if lastheader is None: + lastheader = bestheader["hash"] + elif bestheader["hash"] != lastheader: + next_delta = gen.next_block_delta(int(bestheader["bits"], 16), bestheader["hash"]) + next_delta += bestheader["time"] - time.time() + next_is_mine = gen.next_block_is_mine(bestheader["hash"]) + logging.info("Received new block at height %d; next in %s (%s)", bestheader["height"], seconds_to_hms(next_delta), ("mine" if next_is_mine else "backup")) + lastheader = bestheader["hash"] + + # when is the next block due to be mined? + now = time.time() + gen.next_block_time(now, bestheader, (mined_blocks == 0)) + + # ready to go? otherwise sleep and check for new block + if now < gen.action_time: + sleep_for = min(gen.action_time - now, 60) + if gen.mine_time < now: + # someone else might have mined the block, + # so check frequently, so we don't end up late + # mining the next block if it's ours + sleep_for = min(20, sleep_for) + minestr = "mine" if gen.is_mine else "backup" + logging.debug("Sleeping for %s, next block due in %s (%s)" % (seconds_to_hms(sleep_for), seconds_to_hms(gen.mine_time - now), minestr)) + time.sleep(sleep_for) + continue + + # gbt + tmpl = gen.gbt(args.bcli, bci["bestblockhash"], now) + if tmpl is None: + continue + + logging.debug("GBT template: %s", tmpl) + + # address for reward + reward_addr, reward_spk = get_reward_addr_spk(args, tmpl["height"]) + + # mine block + logging.debug("Mining block delta=%s start=%s mine=%s", seconds_to_hms(gen.mine_time-bestheader["time"]), gen.mine_time, gen.is_mine) + mined_blocks += 1 + block = gen.mine(args.bcli, args.grind_cmd, tmpl, reward_spk) + if block is None: + return 1 + + # submit block + r = args.bcli("-stdin", "submitblock", input=block.serialize().hex().encode('utf8')) + + # report + bstr = "block" if gen.is_mine else "backup block" + + next_delta = gen.next_block_delta(block.nBits, block.hash_hex) + next_delta += block.nTime - time.time() + next_is_mine = gen.next_block_is_mine(block.hash_hex) + + logging.debug("Block hash %s payout to %s", block.hash_hex, reward_addr) + logging.info("Mined %s at height %d; next in %s (%s)", bstr, tmpl["height"], seconds_to_hms(next_delta), ("mine" if next_is_mine else "backup")) + if r != "": + logging.warning("submitblock returned %s for height %d hash %s", r, tmpl["height"], block.hash_hex) + lastheader = block.hash_hex + +def do_calibrate(args): + if args.nbits is not None and args.seconds is not None: + sys.stderr.write("Can only specify one of --nbits or --seconds\n") + return 1 + if args.nbits is not None and len(args.nbits) != 8: + sys.stderr.write("Must specify 8 hex digits for --nbits\n") + return 1 + + TRIALS = 600 # gets variance down pretty low + TRIAL_BITS = 0x1e3ea75f # takes about 5m to do 600 trials + + header = CBlockHeader() + header.nBits = TRIAL_BITS + targ = nbits_to_target(header.nBits) + + start = time.time() + count = 0 + for i in range(TRIALS): + header.nTime = i + header.nNonce = 0 + headhex = header.serialize().hex() + cmd = shlex.split(args.grind_cmd) + [headhex] + newheadhex = subprocess.run(cmd, stdout=subprocess.PIPE, input=b"", check=True).stdout.strip() + + avg = (time.time() - start) * 1.0 / TRIALS + + if args.nbits is not None: + want_targ = nbits_to_target(int(args.nbits,16)) + want_time = avg*targ/want_targ + else: + want_time = args.seconds if args.seconds is not None else 25 + want_targ = int(targ*(avg/want_time)) + + print("nbits=%08x for %ds average mining time" % (target_to_nbits(want_targ), want_time)) + return 0 + +def bitcoin_cli(basecmd, args, **kwargs): + cmd = basecmd + ["-signet"] + args + logging.debug("Calling bitcoin-cli: %r", cmd) + out = subprocess.run(cmd, stdout=subprocess.PIPE, **kwargs, check=True).stdout + if isinstance(out, bytes): + out = out.decode('utf8') + return out.strip() + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--cli", default="bitcoin-cli", type=str, help="bitcoin-cli command") + parser.add_argument("--debug", action="store_true", help="Print debugging info") + parser.add_argument("--quiet", action="store_true", help="Only print warnings/errors") + + cmds = parser.add_subparsers(help="sub-commands") + genpsbt = cmds.add_parser("genpsbt", help="Generate a block PSBT for signing") + genpsbt.set_defaults(fn=do_genpsbt) + + solvepsbt = cmds.add_parser("solvepsbt", help="Solve a signed block PSBT") + solvepsbt.set_defaults(fn=do_solvepsbt) + + generate = cmds.add_parser("generate", help="Mine blocks") + generate.set_defaults(fn=do_generate) + howmany = generate.add_mutually_exclusive_group() + howmany.add_argument("--ongoing", action="store_true", help="Keep mining blocks") + howmany.add_argument("--max-blocks", default=None, type=int, help="Max blocks to mine (default=1)") + howmany.add_argument("--set-block-time", default=None, type=int, help="Set block time (unix timestamp); implies --max-blocks=1") + nbit_target = generate.add_mutually_exclusive_group() + nbit_target.add_argument("--nbits", default=None, type=str, help="Target nBits (specify difficulty)") + nbit_target.add_argument("--min-nbits", action="store_true", help="Target minimum nBits (use min difficulty)") + generate.add_argument("--poisson", action="store_true", help="Simulate randomised block times") + generate.add_argument("--multiminer", default=None, type=str, help="Specify which set of blocks to mine (eg: 1-40/100 for the first 40%%, 2/3 for the second 3rd)") + generate.add_argument("--backup-delay", default=300, type=int, help="Seconds to delay before mining blocks reserved for other miners (default=300)") + generate.add_argument("--standby-delay", default=0, type=int, help="Seconds to delay before mining blocks (default=0)") + generate.add_argument("--max-interval", default=1800, type=int, help="Maximum interblock interval (seconds)") + + calibrate = cmds.add_parser("calibrate", help="Calibrate difficulty") + calibrate.set_defaults(fn=do_calibrate) + calibrate_by = calibrate.add_mutually_exclusive_group() + calibrate_by.add_argument("--nbits", type=str, default=None) + calibrate_by.add_argument("--seconds", type=int, default=None) + + for sp in [genpsbt, generate]: + payto = sp.add_mutually_exclusive_group(required=True) + payto.add_argument("--address", default=None, type=str, help="Address for block reward payment") + payto.add_argument("--descriptor", default=None, type=str, help="Descriptor for block reward payment") + pool = sp.add_mutually_exclusive_group() + pool.add_argument("--poolnum", default=None, type=int, help="Identify blocks that you mine") + pool.add_argument("--poolid", default=None, type=str, help="Identify blocks that you mine (eg: /signet:1/)") + + for sp in [solvepsbt, generate, calibrate]: + sp.add_argument("--grind-cmd", default=None, type=str, required=(sp==calibrate), help="Command to grind a block header for proof-of-work") + + args = parser.parse_args(sys.argv[1:]) + + args.bcli = lambda *a, input=b"", **kwargs: bitcoin_cli(shlex.split(args.cli), list(a), input=input, **kwargs) + + if hasattr(args, "address") and hasattr(args, "descriptor"): + args.derived_addresses = {} + + if args.debug: + logging.getLogger().setLevel(logging.DEBUG) + elif args.quiet: + logging.getLogger().setLevel(logging.WARNING) + else: + logging.getLogger().setLevel(logging.INFO) + + if hasattr(args, "fn"): + return args.fn(args) + else: + logging.error("Must specify command") + return 1 + +if __name__ == "__main__": + main() diff --git a/backend/script/setup.sh b/backend/script/setup.sh new file mode 100755 index 0000000..25be195 --- /dev/null +++ b/backend/script/setup.sh @@ -0,0 +1,185 @@ +#!/usr/bin/env bash +# ============================================================================= +# setup.sh — Bootstrap Bitcoin Core regtest for privacy vulnerability testing +# ============================================================================= +# Reproduces the full environment: +# • Writes ~/.bitcoin/bitcoin.conf (regtest, txindex, dustrelayfee, etc.) +# • Stops any running bitcoind (both regtest and signet) +# • Optionally wipes the regtest data dir (pass --fresh to start from block 0) +# • Starts: bitcoind -daemon -regtest +# • Creates wallets: miner alice bob carol exchange risky +# • Mines 110 blocks so coinbases mature and miner has spendable BTC +# +# Usage: +# ./setup.sh # keep existing chain state, reload wallets +# ./setup.sh --fresh # wipe regtest, start from genesis +# ============================================================================= +set -euo pipefail + +# ─── Colours ────────────────────────────────────────────────────────────────── +G="\033[92m"; Y="\033[93m"; R="\033[91m"; B="\033[1m"; C="\033[96m"; RST="\033[0m" +ok() { echo -e " ${G}✓${RST} $*"; } +info() { echo -e " ${Y}ℹ${RST} $*"; } +err() { echo -e " ${R}✗${RST} $*"; exit 1; } + +# ─── Config ─────────────────────────────────────────────────────────────────── +BITCOIN_CONF="${HOME}/.bitcoin/bitcoin.conf" +REGTEST_DIR="${HOME}/.bitcoin/regtest" +WALLETS=(miner alice bob carol exchange risky) +INITIAL_BLOCKS=110 # must be >100 so coinbases mature +MINER_FUND_BTC=500 # approximate, depends on block subsidy + +# ─── Parse args ─────────────────────────────────────────────────────────────── +FRESH=0 +for arg in "$@"; do + [[ "$arg" == "--fresh" ]] && FRESH=1 +done + +echo "" +echo -e "${B}${C}══════════════════════════════════════════════════════════${RST}" +echo -e "${B}${C} Bitcoin Regtest Setup — privacy vulnerability harness${RST}" +echo -e "${B}${C}══════════════════════════════════════════════════════════${RST}" +[[ $FRESH -eq 1 ]] && echo -e " ${Y}Mode: FRESH — regtest chain will be wiped${RST}" + +# ─── 1. Stop running daemons ────────────────────────────────────────────────── +echo "" +echo -e "${B}Step 1: Stop any running bitcoind${RST}" + +# Try to stop regtest instance (port 18443) +if bitcoin-cli -regtest stop 2>/dev/null; then + ok "Stopped regtest bitcoind" + sleep 2 +else + info "No regtest bitcoind running (or already stopped)" +fi + +# Try to stop signet instance (port 38332) if one is running +if bitcoin-cli -signet stop 2>/dev/null; then + ok "Stopped signet bitcoind" + sleep 2 +else + info "No signet bitcoind running" +fi + +# Hard-kill any remaining bitcoind processes +if pgrep -x bitcoind > /dev/null 2>&1; then + info "Hard-killing remaining bitcoind processes …" + pkill -x bitcoind || true + sleep 2 +fi + +# ─── 2. Write bitcoin.conf ──────────────────────────────────────────────────── +echo "" +echo -e "${B}Step 2: Write ${BITCOIN_CONF}${RST}" +mkdir -p "$(dirname "$BITCOIN_CONF")" + +cat > "$BITCOIN_CONF" << 'EOF' +# Bitcoin Core configuration +# Network: regtest (local testing only) +regtest=1 +txindex=1 + +[regtest] +# Fee policy — needed so wallets can broadcast without estimatefee data +fallbackfee=0.00010 + +# Allow outputs as small as 1 sat (needed for dust-attack reproduction) +dustrelayfee=0.00000001 + +# Accept non-standard transactions (needed for some test scenarios) +acceptnonstdtxn=1 + +# Enable RPC server +server=1 +EOF + +ok "Wrote bitcoin.conf" + +# ─── 3. Optionally wipe regtest chain ───────────────────────────────────────── +if [[ $FRESH -eq 1 ]]; then + echo "" + echo -e "${B}Step 3: Wipe regtest data dir${RST}" + rm -rf "$REGTEST_DIR" + ok "Wiped ${REGTEST_DIR}" +else + echo "" + info "Step 3: Keeping existing regtest chain (use --fresh to wipe)" +fi + +# ─── 4. Start bitcoind ──────────────────────────────────────────────────────── +echo "" +echo -e "${B}Step 4: Start bitcoind -daemon -regtest${RST}" +bitcoind -daemon -regtest +ok "bitcoind launched" + +# Wait for RPC to become ready +echo -n " … waiting for RPC" +for i in $(seq 1 30); do + sleep 1 + echo -n "." + if bitcoin-cli -regtest getblockchaininfo > /dev/null 2>&1; then + echo "" + ok "RPC ready after ${i}s" + break + fi + if [[ $i -eq 30 ]]; then + echo "" + err "bitcoind did not respond within 30s — check logs at ${REGTEST_DIR}/debug.log" + fi +done + +BLOCKS=$(bitcoin-cli -regtest getblockcount) +info "Chain height: ${BLOCKS} blocks" + +# ─── 5. Create / load wallets ───────────────────────────────────────────────── +echo "" +echo -e "${B}Step 5: Create wallets${RST}" +for w in "${WALLETS[@]}"; do + if bitcoin-cli -regtest createwallet "$w" 2>/dev/null | grep -q '"name"'; then + ok "Created wallet: ${w}" + else + # Wallet DB already exists on disk — just load it + if bitcoin-cli -regtest loadwallet "$w" 2>/dev/null | grep -q '"name"'; then + ok "Loaded existing wallet: ${w}" + else + # Already loaded (returned error -35) + info "Wallet already loaded: ${w}" + fi + fi +done + +# ─── 6. Mine initial blocks (only if fresh or chain has <110 blocks) ────────── +echo "" +echo -e "${B}Step 6: Mine initial blocks${RST}" +BLOCKS=$(bitcoin-cli -regtest getblockcount) + +if [[ $BLOCKS -lt $INITIAL_BLOCKS ]]; then + NEED=$(( INITIAL_BLOCKS - BLOCKS )) + info "At block ${BLOCKS}, need ${NEED} more to reach ${INITIAL_BLOCKS}" + MINER_ADDR=$(bitcoin-cli -regtest -rpcwallet=miner getnewaddress "" bech32) + bitcoin-cli -regtest generatetoaddress "$NEED" "$MINER_ADDR" > /dev/null + BLOCKS=$(bitcoin-cli -regtest getblockcount) + ok "Mined to block ${BLOCKS}" +else + ok "Already at block ${BLOCKS} — no mining needed" +fi + +MINER_BAL=$(bitcoin-cli -regtest -rpcwallet=miner getbalance) +ok "Miner balance: ${MINER_BAL} BTC" + +# ─── 7. Summary ─────────────────────────────────────────────────────────────── +echo "" +echo -e "${B}${C}══════════════════════════════════════════════════════════${RST}" +echo -e "${B} Setup complete!${RST}" +echo -e "${B}${C}══════════════════════════════════════════════════════════${RST}" +echo -e " Chain: ${G}regtest${RST}" +echo -e " Blocks: ${G}$(bitcoin-cli -regtest getblockcount)${RST}" +echo -e " Wallets: ${G}${WALLETS[*]}${RST}" +echo "" +echo -e " Next steps:" +echo -e " python3 reproduce.py # create 12 vulnerability scenarios" +echo -e " python3 detect.py --wallet alice \\" +echo -e " --known-risky-wallets risky \\" +echo -e " --known-exchange-wallets exchange" +echo -e " python3 verify.py --fresh # full automated proof" +echo ""