from datetime import datetime, timedelta from handywrapper import api import os import dotenv import requests import re import domainLookup import json import time import subprocess import atexit import signal import sys import threading import sqlite3 from functools import wraps dotenv.load_dotenv() HSD_API = os.getenv("HSD_API","") HSD_IP = os.getenv("HSD_IP","localhost") HSD_NETWORK = os.getenv("HSD_NETWORK", "main") HSD_WALLET_PORT = 12039 HSD_NODE_PORT = 12037 HSD_NETWORK = HSD_NETWORK.lower() if HSD_NETWORK == "simnet": HSD_WALLET_PORT = 15039 HSD_NODE_PORT = 15037 elif HSD_NETWORK == "testnet": HSD_WALLET_PORT = 13039 HSD_NODE_PORT = 13037 elif HSD_NETWORK == "regtest": HSD_WALLET_PORT = 14039 HSD_NODE_PORT = 14037 HSD_INTERNAL_NODE = os.getenv("INTERNAL_HSD","false").lower() in ["1","true","yes"] if HSD_INTERNAL_NODE: if HSD_API == "": # Use a random API KEY HSD_API = "firewallet-" + str(int(time.time())) HSD_IP = "localhost" SHOW_EXPIRED = os.getenv("SHOW_EXPIRED") if SHOW_EXPIRED is None: SHOW_EXPIRED = False HSD_PROCESS = None SPV_MODE = None # Get hsdconfig.json HSD_CONFIG = { "version": "v8.0.0", "chainMigrate": 4, "walletMigrate": 7, "minNodeVersion": 20, "minNpmVersion": 8, "spv": False, "flags": [ "--agent=FireWallet" ] } CACHE_TTL = int(os.getenv("CACHE_TTL",90)) if not os.path.exists('hsdconfig.json'): with open('hsdconfig.json', 'w') as f: f.write(json.dumps(HSD_CONFIG, indent=4)) else: with open('hsdconfig.json') as f: hsdConfigTMP = json.load(f) for key in hsdConfigTMP: HSD_CONFIG[key] = hsdConfigTMP[key] hsd = api.hsd(HSD_API, HSD_IP, HSD_NODE_PORT) hsw = api.hsw(HSD_API, HSD_IP, HSD_WALLET_PORT) cacheTime = 3600 # Verify the connection response = hsd.getInfo() EXCLUDE = os.getenv("EXCLUDE","primary").split(",") def hsdConnected(): if hsdVersion() == -1: return False return True def hsdVersion(format=True): info = hsd.getInfo() if 'error' in info: return -1 # Check if SPV mode is enabled global SPV_MODE if info.get('chain',{}).get('options',{}).get('spv',False): SPV_MODE = True else: SPV_MODE = False if format: return float('.'.join(info['version'].split(".")[:2])) else: return info['version'] def check_account(cookie: str | None): if cookie is None: return False # Check the account if cookie.count(":") < 1: return False account = cookie.split(":")[0] # Check if the account is valid info = hsw.getAccountInfo(account, 'default') if 'error' in info: return False return account def check_password(cookie: str|None, password: str|None): if cookie is None: return False if password is None: password = "" account = check_account(cookie) if account == False: return False # Check if the password is valid info = hsw.rpc_selectWallet(account) if info['error'] is not None: return False info = hsw.rpc_walletPassphrase(password, 1) if info['error'] is not None: if info['error']['message'] != "Wallet is not encrypted.": return False return True def createWallet(account: str, password: str): if not hsdConnected(): return { "error": { "message": "Node not connected" } } # Create the account # Python wrapper doesn't support this yet response = requests.put(get_wallet_api_url(f"wallet/{account}")) if response.status_code != 200: return { "error": { "message": "Error creating account" } } # Get seed seed = hsw.getMasterHDKey(account) seed = seed['mnemonic']['phrase'] # Encrypt the wallet (python wrapper doesn't support this yet) response = requests.post(get_wallet_api_url(f"/wallet/{account}/passphrase"), json={"passphrase": password}) return { "seed": seed, "account": account, "password": password } def importWallet(account: str, password: str, seed: str): if not hsdConnected(): return { "error": { "message": "Node not connected" } } # Import the wallet data = { "passphrase": password, "mnemonic": seed, } response = requests.put(get_wallet_api_url(f"/wallet/{account}"), json=data) if response.status_code != 200: return { "error": { "message": "Error creating account" } } return { "seed": seed, "account": account, "password": password } def listWallets(): # List the wallets response = hsw.listWallets() # Check if response is json or an array if isinstance(response, list): # Remove excluded wallets response = [wallet for wallet in response if wallet not in EXCLUDE] return response return ['Wallet not connected'] def selectWallet(account: str): # Select wallet response = hsw.rpc_selectWallet(account) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } def init_domain_db(): """Initialize the SQLite database for domain cache.""" os.makedirs('cache', exist_ok=True) db_path = os.path.join('cache', 'domains.db') conn = sqlite3.connect(db_path) cursor = conn.cursor() # Create the domains table if it doesn't exist cursor.execute(''' CREATE TABLE IF NOT EXISTS domains ( name TEXT PRIMARY KEY, info TEXT, last_updated INTEGER ) ''') conn.commit() conn.close() def getCachedDomains(): """Get cached domain information from SQLite database.""" init_domain_db() # Ensure DB exists db_path = os.path.join('cache', 'domains.db') conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row # This allows accessing columns by name cursor = conn.cursor() # Get all domains from the database cursor.execute('SELECT name, info, last_updated FROM domains') rows = cursor.fetchall() # Convert to dictionary format domain_cache = {} for row in rows: try: domain_cache[row['name']] = json.loads(row['info']) domain_cache[row['name']]['last_updated'] = row['last_updated'] except json.JSONDecodeError: print(f"Error parsing cached data for domain {row['name']}") conn.close() return domain_cache ACTIVE_DOMAIN_UPDATES = set() # Track domains being updated DOMAIN_UPDATE_LOCK = threading.Lock() # For thread-safe access to ACTIVE_DOMAIN_UPDATES def update_domain_cache(domain_names: list): """Fetch domain info and update the SQLite cache.""" if not domain_names: return # Filter out domains that are already being updated domains_to_update = [] with DOMAIN_UPDATE_LOCK: for domain in domain_names: if domain not in ACTIVE_DOMAIN_UPDATES: ACTIVE_DOMAIN_UPDATES.add(domain) domains_to_update.append(domain) if not domains_to_update: # All requested domains are already being updated return try: # Initialize database init_domain_db() db_path = os.path.join('cache', 'domains.db') conn = sqlite3.connect(db_path) cursor = conn.cursor() for domain_name in domains_to_update: try: # Get domain info from node domain_info = getDomain(domain_name) if 'error' in domain_info or not domain_info.get('info'): print(f"Failed to get info for domain {domain_name}: {domain_info.get('error', 'Unknown error')}", flush=True) continue # Update or insert into database now = int(time.time()) serialized_info = json.dumps(domain_info) cursor.execute( 'INSERT OR REPLACE INTO domains (name, info, last_updated) VALUES (?, ?, ?)', (domain_name, serialized_info, now) ) print(f"Updated cache for domain {domain_name}") except Exception as e: print(f"Error updating cache for domain {domain_name}: {str(e)}") finally: # Always remove from active set, even if there was an error with DOMAIN_UPDATE_LOCK: if domain_name in ACTIVE_DOMAIN_UPDATES: ACTIVE_DOMAIN_UPDATES.remove(domain_name) # Commit all changes at once conn.commit() conn.close() except Exception as e: print(f"Error updating domain cache: {str(e)}", flush=True) # Make sure to clean up the active set on any exception with DOMAIN_UPDATE_LOCK: for domain in domains_to_update: if domain in ACTIVE_DOMAIN_UPDATES: ACTIVE_DOMAIN_UPDATES.remove(domain) print("Updated cache for domains") def getBalance(account: str): # Get the total balance info = hsw.getBalance('default', account) if 'error' in info: return {'available': 0, 'total': 0} total = info['confirmed'] available = total - info['lockedConfirmed'] locked = info['lockedConfirmed'] / 1000000 # Convert to HNS total = total / 1000000 available = available / 1000000 domains = getDomains(account) domainValue = 0 domains_to_update = [] # Track domains that need cache updates if isSPV(): # Initialize database if needed init_domain_db() # Connect to the database directly for efficient querying db_path = os.path.join('cache', 'domains.db') conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() now = int(time.time()) cache_cutoff = now - (CACHE_TTL * 86400) # Cache TTL in days for domain in domains: domain_name = domain['name'] # Check if domain is in cache and still fresh cursor.execute( 'SELECT info, last_updated FROM domains WHERE name = ?', (domain_name,) ) row = cursor.fetchone() # Only add domain for update if: # 1. Not in cache or stale # 2. Not currently being updated by another thread with DOMAIN_UPDATE_LOCK: if (not row or row['last_updated'] < cache_cutoff) and domain_name not in ACTIVE_DOMAIN_UPDATES: domains_to_update.append(domain_name) continue # Use the cached info try: if row: # Make sure we have data domain_info = json.loads(row['info']) if domain_info.get('info', {}).get('state', "") == "CLOSED": domainValue += domain_info.get('info', {}).get('value', 0) except json.JSONDecodeError: # Only add for update if not already being updated with DOMAIN_UPDATE_LOCK: if domain_name not in ACTIVE_DOMAIN_UPDATES: domains_to_update.append(domain_name) conn.close() else: for domain in domains: if domain['state'] == "CLOSED": domainValue += domain['value'] # Start background thread to update cache for missing domains if domains_to_update: thread = threading.Thread( target=update_domain_cache, args=(domains_to_update,), daemon=True ) thread.start() total = total - (domainValue/1000000) locked = locked - (domainValue/1000000) # Only keep 2 decimal places total = round(total, 2) available = round(available, 2) return {'available': available, 'total': total, 'locked': locked} def getBlockHeight(): # Get the block height info = hsd.getInfo() if 'error' in info: return 0 return info['chain']['height'] def getAddress(account: str): # Get the address info = hsw.getAccountInfo(account, 'default') if 'error' in info: return '' return info['receiveAddress'] def getPendingTX(account: str): pending = 0 page = 1 pageSize = 10 while True: txs = getTransactions(account, page, pageSize) page += 1 pendingPage = 0 for tx in txs: if tx['confirmations'] < 1: pending += 1 pendingPage += 1 if pendingPage < pageSize: break return pending def getDomains(account, own=True): if own: response = requests.get(get_wallet_api_url(f"/wallet/{account}/name?own=true")) else: response = requests.get(get_wallet_api_url(f"/wallet/{account}/name")) info = response.json() if SHOW_EXPIRED: return info # Remove any expired domains domains = [] for domain in info: if 'stats' in domain: if 'daysUntilExpire' in domain['stats']: if domain['stats']['daysUntilExpire'] < 0: continue domains.append(domain) return domains def getPageTXCache(account, page, size=100): page = f"{page}-{size}" if not os.path.exists(f'cache'): os.mkdir(f'cache') if not os.path.exists(f'cache/{account}_page.json'): with open(f'cache/{account}_page.json', 'w') as f: f.write('{}') with open(f'cache/{account}_page.json') as f: pageCache = json.load(f) if page in pageCache and pageCache[page]['time'] > int(time.time()) - cacheTime: return pageCache[page]['txid'] return None def pushPageTXCache(account, page, txid, size=100): page = f"{page}-{size}" if not os.path.exists(f'cache/{account}_page.json'): with open(f'cache/{account}_page.json', 'w') as f: f.write('{}') with open(f'cache/{account}_page.json') as f: pageCache = json.load(f) pageCache[page] = { 'time': int(time.time()), 'txid': txid } with open(f'cache/{account}_page.json', 'w') as f: json.dump(pageCache, f, indent=4) return pageCache[page]['txid'] def getTXFromPage(account, page, size=100): if page == 1: return getTransactions(account, 1, size)[-1]['hash'] cached = getPageTXCache(account, page, size) if cached: return getPageTXCache(account, page, size) previous = getTransactions(account, page, size) if len(previous) == 0: return None hash = previous[-1]['hash'] pushPageTXCache(account, page, hash, size) return hash def getTransactions(account, page=1, limit=100): # Get the transactions if hsdVersion() < 7: if page != 1: return [] info = hsw.getWalletTxHistory(account) if 'error' in info: return [] return info[::-1] lastTX = None if page < 1: return [] if page > 1: lastTX = getTXFromPage(account, page-1, limit) if lastTX: response = requests.get(get_wallet_api_url(f"/wallet/{account}/tx/history?reverse=true&limit={limit}&after={lastTX}")) elif page == 1: response = requests.get(get_wallet_api_url(f"/wallet/{account}/tx/history?reverse=true&limit={limit}")) else: return [] if response.status_code != 200: print(response.text) return [] data = response.json() # Refresh the cache if the next page is different nextPage = getPageTXCache(account, page, limit) if nextPage is not None and nextPage != data[-1]['hash']: print(f'Refreshing page {page}') pushPageTXCache(account, page, data[-1]['hash'], limit) return data def getAllTransactions(account): # Get the transactions page = 0 txs = [] while True: txs += getTransactions(account, page, 1000) if len(txs) == 0: break page += 1 return txs def check_address(address: str, allow_name: bool = True, return_address: bool = False): # Check if the address is valid if address.startswith('@'): # Check if the address is a name if not allow_name and not return_address: return 'Invalid address' elif not allow_name and return_address: return False return check_hip2(address[1:]) # Check if the address is a valid HNS address response = requests.post(get_node_api_url(), json={ "method": "validateaddress", "params": [address] }).json() if response['error'] is not None: if return_address: return False return 'Invalid address' if response['result']['isvalid'] == True: if return_address: return address return 'Valid address' if return_address: return False return 'Invalid address' def check_hip2(domain: str): # Check if the domain is valid domain = domain.lower() if re.match(r'^[a-zA-Z0-9\-\.]{1,63}$', domain) is None: return 'Invalid domain' address = domainLookup.hip2(domain) if not address.startswith("Hip2: "): if not check_address(address, False, True): return 'Hip2: Lookup succeeded but address is invalid' return address # Check if DISABLE_WALLETDNS is set if os.getenv("DISABLE_WALLETDNS","").lower() in ["1","true","yes"]: return "No HIP2 record found for this domain" # Try using WALLET TXT record address = domainLookup.wallet_txt(domain) if not address.startswith("hs1"): return "No HIP2 or WALLET record found for this domain" if not check_address(address, False, True): return 'WALLET DNS record found but address is invalid' return address def send(account, address, amount): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if not account_name: return { "error": { "message": "Invalid account" } } response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) # Unlock the account if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_sendToAddress(address, amount) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } return { "tx": response['result'] } def isOwnDomain(account, name: str): # Get domain domain_info = getDomain(name) if 'info' not in domain_info or domain_info['info'] is None: return False if 'owner' not in domain_info['info']: return False owner = getAddressFromCoin(domain_info['info']['owner'].get("hash"),domain_info['info']['owner'].get("index")) # Select the account hsw.rpc_selectWallet(account) account = hsw.rpc_getAccount(owner) if 'error' in account and account['error'] is not None: return False if 'result' not in account: return False if account['result'] == 'default': return True return False def isOwnPrevout(account, prevout: dict): if 'hash' not in prevout or 'index' not in prevout: return False # Get the address from the prevout address = getAddressFromCoin(prevout['hash'], prevout['index']) # Select the account hsw.rpc_selectWallet(account) account = hsw.rpc_getAccount(address) if 'error' in account and account['error'] is not None: return False if 'result' not in account: return False if account['result'] == 'default': return True return False def getDomain(domain: str): if isSPV(): response = requests.get(f"https://hsd.hns.au/api/v1/name/{domain}").json() if 'error' in response: return { "error": { "message": response['error'] } } return response # Get the domain response = hsd.rpc_getNameInfo(domain) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } return response['result'] def isKnownDomain(domain: str) -> bool: # Get the domain response = hsd.rpc_getNameInfo(domain) if response['error'] is not None: return False if response['result'] is None or response['result'].get('info') is None: return False return True def getAddressFromCoin(coinhash: str, coinindex = 0): # Get the address from the hash response = requests.get(get_node_api_url(f"coin/{coinhash}/{coinindex}")) if response.status_code != 200: print(f"Error getting address from coin") return "No Owner" data = response.json() if 'address' not in data: print(json.dumps(data, indent=4)) return "No Owner" return data['address'] def renewDomain(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } response = hsw.sendRENEW(account_name, password, domain) return response def getDNS(domain: str): # Get the DNS response = hsd.rpc_getNameResource(domain) if response['error'] is not None: return { "error": response['error']['message'] } if 'result' not in response: return { "error": "No DNS records" } if response['result'] == None: return [] if 'records' not in response['result']: return [] return response['result']['records'] def setDNS(account, domain, records): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } records = json.loads(records) newRecords = [] TXTRecords = [] for record in records: if record['type'] == 'TXT': if 'txt' not in record: TXTRecords.append(record['value']) else: for txt in record['txt']: TXTRecords.append(txt) elif record['type'] == 'NS': if 'value' in record: newRecords.append({ 'type': 'NS', 'ns': record['value'] }) elif 'ns' in record: newRecords.append({ 'type': 'NS', 'ns': record['ns'] }) else: return { 'error': { 'message': 'Invalid NS record' } } elif record['type'] in ['GLUE4', 'GLUE6', "SYNTH4", "SYNTH6"]: newRecords.append({ 'type': record['type'], 'ns': str(record['value']).split(' ')[0], 'address': str(record['value']).split(' ')[1] }) else: newRecords.append(record) if len(TXTRecords) > 0: newRecords.append({ 'type': 'TXT', 'txt': TXTRecords }) data = '{"records":'+str(newRecords).replace("'", "\"")+'}' response = hsw.sendUPDATE(account_name, password, domain, data) return response def register(account, domain): # Maybe add default dns records? return setDNS(account, domain, '[]') def getNodeSync(): response = hsd.getInfo() if 'error' in response: return 0 sync = response['chain']['progress']*100 sync = round(sync, 2) return sync def getWalletStatus(): response = hsw.rpc_getWalletInfo() if 'error' in response and response['error'] != None: return "Error" # return response walletHeight = response['result']['height'] # Get the current block height nodeHeight = getBlockHeight() if walletHeight < nodeHeight: return f"Scanning {walletHeight/nodeHeight*100:.2f}%" elif walletHeight == nodeHeight: return "Ready" else: return "Error wallet ahead of node" def getBids(account, domain="NONE"): if domain == "NONE": response = hsw.getWalletBids(account) else: response = hsw.getWalletBidsByName(domain, account) # Add backup for bids with no value bids = [] for bid in response: if 'value' not in bid: bid['value'] = -1000000 # Backup for older HSD versions if 'height' not in bid: bid['height'] = 0 bids.append(bid) return bids def getReveals(account, domain): return hsw.getWalletRevealsByName(domain, account) def getPendingReveals(account): bids = getBids(account) domains = getDomains(account, False) pending = [] for domain in domains: if domain['state'] == "REVEAL": reveals = getReveals(account, domain['name']) for bid in bids: if bid['name'] == domain['name']: state_found = False for reveal in reveals: if reveal['own'] == True: if bid['value'] == reveal['value']: state_found = True if not state_found: pending.append(bid) return pending def getPendingRedeems(account, password): hsw.rpc_selectWallet(account) hsw.rpc_walletPassphrase(password, 10) tx = hsw.rpc_createREDEEM('', 'default') if tx['error']: return [] pending = [] try: for output in tx['result']['outputs']: if output['covenant']['type'] != 5: continue if output['covenant']['action'] != "REDEEM": continue nameHash = output['covenant']['items'][0] # Try to get the name from hash name = hsd.rpc_getNameByHash(nameHash) if name['error']: pending.append(nameHash) else: pending.append(name['result']) except: print("Failed to parse redeems") return pending def getPendingRegisters(account): bids = getBids(account) domains = getDomains(account, False) pending = [] for domain in domains: if domain['state'] == "CLOSED" and domain['registered'] == False: for bid in bids: if bid['name'] == domain['name']: if bid['value'] == domain['highest']: # Double check the domain is actually in the node if isKnownDomain(domain['name']): pending.append(bid) return pending def getPendingFinalizes(account, password): tx = createBatch(f'{account}:{password}', [["FINALIZE"]]) if 'error' in tx: return [] pending = [] try: for output in tx['outputs']: if type(output) != dict: continue if not 'covenant' in output: continue if output['covenant'].get("type") != 10: continue if output['covenant'].get('action') != "FINALIZE": continue nameHash = output['covenant']['items'][0] # Try to get the name from hash name = hsd.rpc_getNameByHash(nameHash) if name['error']: pending.append(nameHash) else: pending.append(name['result']) except: print("Failed to parse finalizes") return pending def getRevealTX(reveal): prevout = reveal['prevout'] hash = prevout['hash'] index = prevout['index'] tx = hsd.getTxByHash(hash) if 'inputs' not in tx: print(f'Something is up with this tx: {hash}') print(tx) print('---') # No idea what happened here # Check if registered? return None return tx['inputs'][index]['prevout']['hash'] def revealAuction(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.sendREVEAL(account_name, password, domain) return response except Exception as e: return { "error": str(e) } def revealAll(account): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: # Try to select and login to the wallet response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } return requests.post(get_wallet_api_url(), json={"method": "sendbatch", "params": [[["REVEAL"]]]}).json() except Exception as e: return { "error": { "message": str(e) } } def redeemAll(account): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: # Try to select and login to the wallet response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } return requests.post(get_wallet_api_url(), json={"method": "sendbatch", "params": [[["REDEEM"]]]}).json() except Exception as e: return { "error": { "message": str(e) } } def registerAll(account): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } # try: domains = getPendingRegisters(account_name) if len(domains) == 0: return { "error": { "message": "Nothing to do." } } batch = [] for domain in domains: batch.append(["UPDATE", domain['name'], {"records": []}]) return sendBatch(account, batch) def finalizeAll(account): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } return sendBatch(account, [["FINALIZE"]]) def rescan_auction(account, domain): # Get height of the start of the auction response = hsw.rpc_selectWallet(account) response = hsd.rpc_getNameInfo(domain) if 'result' not in response: return { "error": "Invalid domain" } if 'height' not in response['result']['info']: return { "error": "Can't find start" } height = response['result']['info']['height']-1 response = hsw.rpc_importName(domain, height) return response def bid(account, domain, bid, blind): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } bid = int(bid)*1000000 lockup = int(blind)*1000000 + bid try: response = hsw.sendBID(account_name, password, domain, bid, lockup) return response except Exception as e: return { "error": { "message": str(e) } } def openAuction(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.sendOPEN(account_name, password, domain) return response except Exception as e: return { "error": { "message": str(e) } } def transfer(account, domain, address): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.sendTRANSFER(account_name, password, domain, address) return response except Exception as e: return { "error": { "message": str(e) } } def finalize(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_sendFINALIZE(domain) return response except Exception as e: return { "error": { "message": str(e) } } def cancelTransfer(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_sendCANCEL(domain) return response except Exception as e: return { "error": { "message": str(e) } } def revoke(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_sendREVOKE(domain) return response except Exception as e: return { "error": { "message": str(e) } } def sendBatch(account, batch): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = requests.post(get_wallet_api_url(), json={ "method": "sendbatch", "params": [batch] }).json() if response['error'] is not None: return response if 'result' not in response: return { "error": { "message": "No result" } } return response['result'] except Exception as e: return { "error": { "message": str(e) } } def createBatch(account, batch): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = requests.post(get_wallet_api_url(), json={ "method": "createbatch", "params": [batch] }).json() if response['error'] is not None: return response if 'result' not in response: return { "error": { "message": "No result" } } return response['result'] except Exception as e: return { "error": { "message": str(e) } } # region Mempool def getMempoolTxs(): # hsd-cli rpc getrawmempool response = hsd.rpc_getRawMemPool() if 'error' in response and response['error'] is not None: return [] return response['result'] if 'result' in response else [] def getMempoolBids(): mempoolTxs = getMempoolTxs() bids = {} for txid in mempoolTxs: tx = hsd.getTxByHash(txid) if 'error' in tx and tx['error'] is not None: print(f"Error getting tx {txid}: {tx['error']}") continue if 'outputs' not in tx: print(f"Error getting outputs for tx {txid}") continue for output in tx['outputs']: if output['covenant']['action'] not in ["BID", "REVEAL"]: continue if output['covenant']['action'] == "REVEAL": # Try to find bid tx from inputs namehash = output['covenant']['items'][0] for txInput in tx['inputs']: if txInput['coin']['covenant']['action'] != "BID": continue if txInput['coin']['covenant']['items'][0] != namehash: continue name = txInput['coin']['covenant']['items'][2] # Convert name from hex to ascii name = bytes.fromhex(name).decode('ascii') bid = { 'txid': txid, 'lockup': txInput['coin']['value'], 'revealed': True, 'height': -1, 'value': output['value'], 'sort_value': txInput['coin']['value'], 'owner': "Unknown" } if name not in bids: bids[name] = [] bids[name].append(bid) continue name = output['covenant']['items'][2] # Convert name from hex to ascii name = bytes.fromhex(name).decode('ascii') if name not in bids: bids[name] = [] bid = { 'txid': txid, 'value': -1000000, # Default value if not found 'lockup': output['value'], 'revealed': False, 'height': -1, 'sort_value': output['value'], 'owner': "Unknown" } bids[name].append(bid) return bids # endregion # region settingsAPIs def rescan(): try: response = hsw.walletRescan(0) return response except Exception as e: return { "error": { "message": str(e) } } def resendTXs(): try: response = hsw.walletResend() return response except Exception as e: return { "error": { "message": str(e) } } def zapTXs(account): age = 60 * 20 # 20 minutes account_name = check_account(account) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = requests.post(get_wallet_api_url(f"/wallet/{account_name}/zap"), json={"age": age, "account": "default" }) return response except Exception as e: return { "error": { "message": str(e) } } def getxPub(account): account_name = check_account(account) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.getAccountInfo(account_name, "default") if 'error' in response: return { "error": { "message": response['error']['message'] } } return response['accountKey'] return response except Exception as e: return { "error": { "message": str(e) } } def signMessage(account, domain, message): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_signMessageWithName(domain, message) return response except Exception as e: return { "error": { "message": str(e) } } def verifyMessageWithName(domain, signature, message): try: response = hsd.rpc_verifyMessageWithName(domain, signature, message) if 'result' in response: return response['result'] return False except Exception as e: return False def verifyMessage(address, signature, message): try: response = hsd.rpc_verifyMessage(address, signature, message) if 'result' in response: return response['result'] return False except Exception as e: return False # endregion def generateReport(account, format="{name},{expiry},{value},{maxBid}"): domains = getDomains(account) lines = [format.replace("{", "").replace("}", "")] for domain in domains: line = format.replace("{name}", domain['name']) expiry = "N/A" expiryBlock = "N/A" if 'daysUntilExpire' in domain['stats']: days = domain['stats']['daysUntilExpire'] # Convert to dateTime expiry = datetime.now() + timedelta(days=days) expiry = expiry.strftime("%d/%m/%Y %H:%M:%S") expiryBlock = str(domain['stats']['renewalPeriodEnd']) line = line.replace("{expiry}", expiry) line = line.replace("{state}", domain['state']) line = line.replace("{expiryBlock}", expiryBlock) line = line.replace("{value}", str(domain['value']/1000000)) line = line.replace("{maxBid}", str(domain['highest']/1000000)) line = line.replace("{openHeight}", str(domain['height'])) lines.append(line) return lines def convertHNS(value: int): return value/1000000 SPV_EXTERNAL_ROUTES = [ "name", "coin", "tx", "block" ] def get_node_api_url(path=''): """Construct a URL for the HSD node API.""" base_url = f"http://x:{HSD_API}@{HSD_IP}:{HSD_NODE_PORT}" if isSPV() and any(path.startswith(route) for route in SPV_EXTERNAL_ROUTES): # If in SPV mode and the path is one of the external routes, use the external API base_url = f"https://hsd.hns.au/api/v1" if path: # Ensure path starts with a slash if it's not empty if not path.startswith('/'): path = f'/{path}' return f"{base_url}{path}" return base_url def get_wallet_api_url(path=''): """Construct a URL for the HSD wallet API.""" base_url = f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}" if path: # Ensure path starts with a slash if it's not empty if not path.startswith('/'): path = f'/{path}' return f"{base_url}{path}" return base_url def isSPV() -> bool: global SPV_MODE if SPV_MODE is None: info = hsd.getInfo() if 'error' in info: return False # Check if SPV mode is enabled if info.get('chain',{}).get('options',{}).get('spv',False): SPV_MODE = True else: SPV_MODE = False return SPV_MODE # region HSD Internal Node def checkPreRequisites() -> dict[str, bool]: prerequisites = { "node": False, "npm": False, "git": False, "hsd": False } # Check if node is installed and get version nodeSubprocess = subprocess.run(["node", "-v"], capture_output=True, text=True) if nodeSubprocess.returncode == 0: major_version = int(nodeSubprocess.stdout.strip().lstrip('v').split('.')[0]) if major_version >= HSD_CONFIG.get("minNodeVersion", 20): prerequisites["node"] = True # Check if npm is installed npmSubprocess = subprocess.run(["npm", "-v"], capture_output=True, text=True) if npmSubprocess.returncode == 0: major_version = int(npmSubprocess.stdout.strip().split('.')[0]) if major_version >= HSD_CONFIG.get("minNPMVersion", 8): prerequisites["npm"] = True # Check if git is installed gitSubprocess = subprocess.run(["git", "-v"], capture_output=True, text=True) if gitSubprocess.returncode == 0: prerequisites["git"] = True # Check if hsd is installed if os.path.exists("./hsd/bin/hsd"): prerequisites["hsd"] = True return prerequisites def hsdInit(): if not HSD_INTERNAL_NODE: return prerequisites = checkPreRequisites() PREREQ_MESSAGES = { "node": "Install Node.js from https://nodejs.org/en/download (Version >= {minNodeVersion})", "npm": "Install npm (version >= {minNPMVersion}) - usually comes with Node.js", "git": "Install Git from https://git-scm.com/downloads"} # Check if all prerequisites are met (except hsd) if not all(prerequisites[key] for key in prerequisites if key != "hsd"): print("HSD Internal Node prerequisites not met:") for key, value in prerequisites.items(): if not value: print(f" - {key} is missing or does not meet the version requirement.") exit(1) return # Check if hsd is installed if not prerequisites["hsd"]: print("HSD not found, installing...") # If hsd folder exists, remove it if os.path.exists("hsd"): os.rmdir("hsd") # Clone hsd repo gitClone = subprocess.run(["git", "clone", "--depth", "1", "--branch", HSD_CONFIG.get("version", "latest"), "https://github.com/handshake-org/hsd.git", "hsd"], capture_output=True, text=True) if gitClone.returncode != 0: print("Failed to clone hsd repository:") print(gitClone.stderr) exit(1) print("Cloned hsd repository.") # Install hsd dependencies print("Installing hsd dependencies...") npmInstall = subprocess.run(["npm", "install"], cwd="hsd", capture_output=True, text=True) if npmInstall.returncode != 0: print("Failed to install hsd dependencies:") print(npmInstall.stderr) exit(1) print("Installed hsd dependencies.") def hsdStart(): global HSD_PROCESS global SPV_MODE if not HSD_INTERNAL_NODE: return # Check if hsd was started in the last 30 seconds if os.path.exists("hsd.lock"): lock_time = os.path.getmtime("hsd.lock") if time.time() - lock_time < 30: print("HSD was started recently, skipping start.") return else: os.remove("hsd.lock") print("Starting HSD...") # Create a lock file with open("hsd.lock", "w") as f: f.write(str(time.time())) # Config lookups with defaults chain_migrate = HSD_CONFIG.get("chainMigrate", False) wallet_migrate = HSD_CONFIG.get("walletMigrate", False) spv = HSD_CONFIG.get("spv", False) prefix = HSD_CONFIG.get("prefix", os.path.join(os.getcwd(), "hsd-data")) # Base command cmd = [ "node", "./hsd/bin/hsd", f"--network={HSD_NETWORK}", f"--prefix={prefix}", f"--api-key={HSD_API}", "--http-host=127.0.0.1", "--log-console=false" ] # Conditionally add migration flags if chain_migrate: cmd.append(f"--chain-migrate={chain_migrate}") if wallet_migrate: cmd.append(f"--wallet-migrate={wallet_migrate}") SPV_MODE = spv if spv: cmd.append("--spv") # Add flags if len(HSD_CONFIG.get("flags",[])) > 0: for flag in HSD_CONFIG.get("flags",[]): cmd.append(flag) # Launch process HSD_PROCESS = subprocess.Popen( cmd, cwd=os.getcwd(), text=True ) print(f"HSD started with PID {HSD_PROCESS.pid}") atexit.register(hsdStop) # Handle Ctrl+C try: signal.signal(signal.SIGINT, lambda s, f: (hsdStop(), sys.exit(0))) signal.signal(signal.SIGTERM, lambda s, f: (hsdStop(), sys.exit(0))) except: pass def hsdStop(): global HSD_PROCESS if HSD_PROCESS is None: return print("Stopping HSD...") # Send SIGINT (like Ctrl+C) HSD_PROCESS.send_signal(signal.SIGINT) try: HSD_PROCESS.wait(timeout=10) # wait for graceful exit print("HSD shut down cleanly.") except subprocess.TimeoutExpired: print("HSD did not exit yet, is it alright???") # Clean up lock file if os.path.exists("hsd.lock"): os.remove("hsd.lock") HSD_PROCESS = None def hsdRestart(): hsdStop() time.sleep(2) hsdStart() checkPreRequisites() hsdInit() hsdStart() # endregion