Files
stealth/backend/script/DONTRUN_mytest.py
2026-02-27 13:22:19 -03:00

97 lines
3.0 KiB
Python

"""
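bitcoin_rpc.py — Thin wrappers around Bitcoin Core RPC for Python tests.
cli_testnet sends JSON-RPC over HTTP to a public testnet node, cli_mainnet runs
bitcoin-cli on a remote host over SSH, and cli_regtest runs bitcoin-cli -regtest
locally via subprocess.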
"""
import json
import subprocess
import time
import os
import requests
# Executable name of the Bitcoin Core command-line client; used to build the
# command lines in cli_regtest and (through SIGNET_ARGS) cli_mainnet.
CLI = "bitcoin-cli"
# Base argv that cli_mainnet copies before appending its own arguments.
# NOTE(review): despite the name, no -signet flag is included here — confirm
# which network the target node is expected to serve.
SIGNET_ARGS = [CLI]
def cli_testnet(*args, wallet=None):
    """Call the public Bitcoin testnet JSON-RPC endpoint over HTTP.

    The command may be passed as a single space-separated string
    (``cli_testnet("getblockhash 123")``) or as a method followed by separate
    parameters (``cli_testnet("getblockhash", 123)``). Only the first
    argument is split on whitespace; later arguments are forwarded whole.
    With no arguments the method defaults to ``getblockcount``.

    Args:
        *args: RPC method followed by its parameters.
        wallet: Accepted for signature parity with the other ``cli_*``
            helpers; this function does not use it.

    Returns:
        The ``result`` member of the JSON-RPC response.

    Raises:
        RuntimeError: If the HTTP request fails, the body is not valid JSON,
            or the response carries a non-empty ``error`` member.
    """
    url = "https://bitcoin-testnet-rpc.publicnode.com"

    # split() (no separator) drops empty tokens produced by repeated spaces,
    # and the guard avoids an IndexError when the function is called bare.
    tokens = []
    if args:
        tokens.extend(str(args[0]).split())
        tokens.extend(args[1:])
    print(f"args: {tokens}")

    def parse_param(p):
        # Turn "123" / "true" / '{"a": 1}' into real JSON values; anything
        # that is not JSON text (hashes, non-string objects) is sent as-is.
        try:
            return json.loads(p)
        except (json.JSONDecodeError, TypeError):
            return p

    payload = {
        "jsonrpc": "1.0",
        "id": "cli",
        "method": tokens[0] if tokens else "getblockcount",
        "params": [parse_param(p) for p in tokens[1:]],
    }
    print(f"RPC request: {payload}")

    try:
        response = requests.post(url, json=payload, timeout=60)
        response.raise_for_status()
        # response.json() raises ValueError (or a subclass) on a non-JSON body.
        result = response.json()
    except (requests.RequestException, ValueError) as e:
        raise RuntimeError(f"RPC request failed: {str(e)}") from e

    if "error" in result and result["error"]:
        raise RuntimeError(f"bitcoin-cli error: {result['error']}")
    return result.get("result")
def cli_mainnet(*args, wallet=None):
    """Run bitcoin-cli on the remote host over SSH and return its output.

    The remote command is built from the module-level ``SIGNET_ARGS`` base,
    an optional ``-rpcwallet=<wallet>`` flag and the stringified ``args``,
    then executed as ``root@95.111.247.57`` through ``sshpass`` and ``ssh``.

    Args:
        *args: bitcoin-cli arguments (RPC method followed by its parameters).
        wallet: Optional wallet name, forwarded as ``-rpcwallet=<wallet>``.

    Returns:
        Parsed JSON when stdout is valid JSON, the stripped raw string when
        it is not, or ``None`` when stdout is empty.

    Raises:
        RuntimeError: If neither ``SSHPASS`` nor ``SSH_PASS`` is set, or if
            the command exits with a non-zero status.
    """
    import shlex  # local import: only this helper needs shell quoting

    ssh_pass = os.environ.get("SSHPASS") or os.environ.get("SSH_PASS")
    if not ssh_pass:
        raise RuntimeError("Environment variable SSHPASS or SSH_PASS must be set")

    remote_parts = list(SIGNET_ARGS)
    if wallet:
        remote_parts.append(f"-rpcwallet={wallet}")
    remote_parts.extend(str(a) for a in args)
    # ssh hands the command string to the remote shell, so every argument is
    # quoted individually to survive spaces, quotes and JSON punctuation.
    remote_cmd = " ".join(shlex.quote(part) for part in remote_parts)

    # "sshpass -e" reads the password from $SSHPASS, which keeps it out of the
    # process argument list and avoids building a local shell command string.
    argv = ["sshpass", "-e", "ssh", "root@95.111.247.57", remote_cmd]
    env = dict(os.environ, SSHPASS=ssh_pass)
    result = subprocess.run(argv, env=env, capture_output=True, text=True, timeout=60)
    if result.returncode != 0:
        # remote_cmd is already a string; joining it again would space out
        # every character in the message.
        raise RuntimeError(f"bitcoin-cli error: {result.stderr.strip()}\n cmd: {remote_cmd}")

    output = result.stdout.strip()
    if not output:
        return None
    try:
        return json.loads(output)
    except json.JSONDecodeError:
        return output
def cli_regtest(*args, wallet=None):
    """Run a local ``bitcoin-cli -regtest`` command and return its output.

    Args:
        *args: bitcoin-cli arguments; each one is converted with ``str``.
        wallet: Optional wallet name, forwarded as ``-rpcwallet=<wallet>``.

    Returns:
        Parsed JSON when stdout is valid JSON, the stripped raw string when
        it is not, or ``None`` when stdout is empty.

    Raises:
        RuntimeError: If bitcoin-cli exits with a non-zero status.
    """
    argv = [CLI, "-regtest"]
    if wallet:
        argv.append(f"-rpcwallet={wallet}")
    argv += [str(item) for item in args]

    proc = subprocess.run(argv, capture_output=True, text=True, timeout=60)
    if proc.returncode != 0:
        raise RuntimeError(f"bitcoin-cli error: {proc.stderr.strip()}\n cmd: {' '.join(argv)}")

    text = proc.stdout.strip()
    if not text:
        return None
    # Some RPCs print bare strings rather than JSON; hand those back unparsed.
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return text
    return parsed
# Manual check against the testnet endpoint: read the block count, look up the
# hash of the block at that height, then fetch and print that block.
block_count = cli_testnet("getblockcount")
print("testnet:", block_count)
blockhash = cli_testnet("getblockhash {}".format(block_count))
block = cli_testnet("getblock {}".format(blockhash))
print("testnet:", block)
# Equivalent checks for the other helpers, currently disabled:
# print("mainnet:", cli_mainnet("getblockcount"))
# print("regtest:", cli_regtest("getblockcount"))