from datetime import datetime, timedelta from handywrapper import api import os import dotenv import requests import re import domainLookup import json import time import subprocess import atexit import signal import sys dotenv.load_dotenv() HSD_API = os.getenv("HSD_API","") HSD_IP = os.getenv("HSD_IP","localhost") HSD_NETWORK = os.getenv("HSD_NETWORK", "main") HSD_WALLET_PORT = 12039 HSD_NODE_PORT = 12037 HSD_NETWORK = HSD_NETWORK.lower() if HSD_NETWORK == "simnet": HSD_WALLET_PORT = 15039 HSD_NODE_PORT = 15037 elif HSD_NETWORK == "testnet": HSD_WALLET_PORT = 13039 HSD_NODE_PORT = 13037 elif HSD_NETWORK == "regtest": HSD_WALLET_PORT = 14039 HSD_NODE_PORT = 14037 HSD_INTERNAL_NODE = os.getenv("INTERNAL_HSD","false").lower() in ["1","true","yes"] if HSD_INTERNAL_NODE: if HSD_API == "": # Use a random API KEY HSD_API = "firewallet-" + str(int(time.time())) HSD_IP = "localhost" SHOW_EXPIRED = os.getenv("SHOW_EXPIRED") if SHOW_EXPIRED is None: SHOW_EXPIRED = False HSD_PROCESS = None # Get hsdconfig.json HSD_CONFIG = {} if not os.path.exists('hsdconfig.json'): # Pull from the latest git response = requests.get("https://git.woodburn.au/nathanwoodburn/firewalletbrowser/raw/branch/main/hsdconfig.json") if response.status_code == 200: with open('hsdconfig.json', 'w') as f: f.write(response.text) HSD_CONFIG = response.json() else: with open('hsdconfig.json') as f: HSD_CONFIG = json.load(f) hsd = api.hsd(HSD_API, HSD_IP, HSD_NODE_PORT) hsw = api.hsw(HSD_API, HSD_IP, HSD_WALLET_PORT) cacheTime = 3600 # Verify the connection response = hsd.getInfo() EXCLUDE = os.getenv("EXCLUDE","primary").split(",") def hsdConnected(): if hsdVersion() == -1: return False return True def hsdVersion(format=True): info = hsd.getInfo() if 'error' in info: return -1 if format: return float('.'.join(info['version'].split(".")[:2])) else: return info['version'] def check_account(cookie: str | None): if cookie is None: return False # Check the account if cookie.count(":") < 1: return False account = cookie.split(":")[0] # Check if the account is valid info = hsw.getAccountInfo(account, 'default') if 'error' in info: return False return account def check_password(cookie: str|None, password: str|None): if cookie is None: return False if password is None: password = "" account = check_account(cookie) if account == False: return False # Check if the password is valid info = hsw.rpc_selectWallet(account) if info['error'] is not None: return False info = hsw.rpc_walletPassphrase(password, 1) if info['error'] is not None: if info['error']['message'] != "Wallet is not encrypted.": return False return True def createWallet(account: str, password: str): if not hsdConnected(): return { "error": { "message": "Node not connected" } } # Create the account # Python wrapper doesn't support this yet response = requests.put(get_wallet_api_url(f"wallet/{account}")) if response.status_code != 200: return { "error": { "message": "Error creating account" } } # Get seed seed = hsw.getMasterHDKey(account) seed = seed['mnemonic']['phrase'] # Encrypt the wallet (python wrapper doesn't support this yet) response = requests.post(get_wallet_api_url(f"/wallet/{account}/passphrase"), json={"passphrase": password}) return { "seed": seed, "account": account, "password": password } def importWallet(account: str, password: str, seed: str): if not hsdConnected(): return { "error": { "message": "Node not connected" } } # Import the wallet data = { "passphrase": password, "mnemonic": seed, } response = requests.put(get_wallet_api_url(f"/wallet/{account}"), json=data) if response.status_code != 200: return { "error": { "message": "Error creating account" } } return { "seed": seed, "account": account, "password": password } def listWallets(): # List the wallets response = hsw.listWallets() # Check if response is json or an array if isinstance(response, list): # Remove excluded wallets response = [wallet for wallet in response if wallet not in EXCLUDE] return response return ['Wallet not connected'] def selectWallet(account: str): # Select wallet response = hsw.rpc_selectWallet(account) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } def getBalance(account: str): # Get the total balance info = hsw.getBalance('default', account) if 'error' in info: return {'available': 0, 'total': 0} total = info['confirmed'] available = total - info['lockedConfirmed'] locked = info['lockedConfirmed'] / 1000000 # Convert to HNS total = total / 1000000 available = available / 1000000 domains = getDomains(account) domainValue = 0 for domain in domains: if domain['state'] == "CLOSED": domainValue += domain['value'] total = total - (domainValue/1000000) locked = locked - (domainValue/1000000) # Only keep 2 decimal places total = round(total, 2) available = round(available, 2) return {'available': available, 'total': total, 'locked': locked} def getBlockHeight(): # Get the block height info = hsd.getInfo() if 'error' in info: return 0 return info['chain']['height'] def getAddress(account: str): # Get the address info = hsw.getAccountInfo(account, 'default') if 'error' in info: return '' return info['receiveAddress'] def getPendingTX(account: str): pending = 0 page = 1 pageSize = 10 while True: txs = getTransactions(account, page, pageSize) page += 1 pendingPage = 0 for tx in txs: if tx['confirmations'] < 1: pending += 1 pendingPage += 1 if pendingPage < pageSize: break return pending def getDomains(account, own=True): if own: response = requests.get(get_wallet_api_url(f"/wallet/{account}/name?own=true")) else: response = requests.get(get_wallet_api_url(f"/wallet/{account}/name")) info = response.json() if SHOW_EXPIRED: return info # Remove any expired domains domains = [] for domain in info: if 'stats' in domain: if 'daysUntilExpire' in domain['stats']: if domain['stats']['daysUntilExpire'] < 0: continue domains.append(domain) return domains def getPageTXCache(account, page, size=100): page = f"{page}-{size}" if not os.path.exists(f'cache'): os.mkdir(f'cache') if not os.path.exists(f'cache/{account}_page.json'): with open(f'cache/{account}_page.json', 'w') as f: f.write('{}') with open(f'cache/{account}_page.json') as f: pageCache = json.load(f) if page in pageCache and pageCache[page]['time'] > int(time.time()) - cacheTime: return pageCache[page]['txid'] return None def pushPageTXCache(account, page, txid, size=100): page = f"{page}-{size}" if not os.path.exists(f'cache/{account}_page.json'): with open(f'cache/{account}_page.json', 'w') as f: f.write('{}') with open(f'cache/{account}_page.json') as f: pageCache = json.load(f) pageCache[page] = { 'time': int(time.time()), 'txid': txid } with open(f'cache/{account}_page.json', 'w') as f: json.dump(pageCache, f, indent=4) return pageCache[page]['txid'] def getTXFromPage(account, page, size=100): if page == 1: return getTransactions(account, 1, size)[-1]['hash'] cached = getPageTXCache(account, page, size) if cached: return getPageTXCache(account, page, size) previous = getTransactions(account, page, size) if len(previous) == 0: return None hash = previous[-1]['hash'] pushPageTXCache(account, page, hash, size) return hash def getTransactions(account, page=1, limit=100): # Get the transactions if hsdVersion() < 7: if page != 1: return [] info = hsw.getWalletTxHistory(account) if 'error' in info: return [] return info[::-1] lastTX = None if page < 1: return [] if page > 1: lastTX = getTXFromPage(account, page-1, limit) if lastTX: response = requests.get(get_wallet_api_url(f"/wallet/{account}/tx/history?reverse=true&limit={limit}&after={lastTX}")) elif page == 1: response = requests.get(get_wallet_api_url(f"/wallet/{account}/tx/history?reverse=true&limit={limit}")) else: return [] if response.status_code != 200: print(response.text) return [] data = response.json() # Refresh the cache if the next page is different nextPage = getPageTXCache(account, page, limit) if nextPage is not None and nextPage != data[-1]['hash']: print(f'Refreshing page {page}') pushPageTXCache(account, page, data[-1]['hash'], limit) return data def getAllTransactions(account): # Get the transactions page = 0 txs = [] while True: txs += getTransactions(account, page, 1000) if len(txs) == 0: break page += 1 return txs def check_address(address: str, allow_name: bool = True, return_address: bool = False): # Check if the address is valid if address.startswith('@'): # Check if the address is a name if not allow_name and not return_address: return 'Invalid address' elif not allow_name and return_address: return False return check_hip2(address[1:]) # Check if the address is a valid HNS address response = requests.post(get_node_api_url(), json={ "method": "validateaddress", "params": [address] }).json() if response['error'] is not None: if return_address: return False return 'Invalid address' if response['result']['isvalid'] == True: if return_address: return address return 'Valid address' if return_address: return False return 'Invalid address' def check_hip2(domain: str): # Check if the domain is valid domain = domain.lower() if re.match(r'^[a-zA-Z0-9\-\.]{1,63}$', domain) is None: return 'Invalid domain' address = domainLookup.hip2(domain) if not address.startswith("Hip2: "): if not check_address(address, False, True): return 'Hip2: Lookup succeeded but address is invalid' return address # Check if DISABLE_WALLETDNS is set if os.getenv("DISABLE_WALLETDNS","").lower() in ["1","true","yes"]: return "No HIP2 record found for this domain" # Try using WALLET TXT record address = domainLookup.wallet_txt(domain) if not address.startswith("hs1"): return "No HIP2 or WALLET record found for this domain" if not check_address(address, False, True): return 'WALLET DNS record found but address is invalid' return address def send(account, address, amount): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if not account_name: return { "error": { "message": "Invalid account" } } response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) # Unlock the account if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_sendToAddress(address, amount) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } return { "tx": response['result'] } def isOwnDomain(account, name: str): # Get domain domain_info = getDomain(name) owner = getAddressFromCoin(domain_info['info']['owner']['hash'],domain_info['info']['owner']['index']) # Select the account hsw.rpc_selectWallet(account) account = hsw.rpc_getAccount(owner) if 'error' in account and account['error'] is not None: return False if 'result' not in account: return False if account['result'] == 'default': return True return False def isOwnPrevout(account, prevout: dict): if 'hash' not in prevout or 'index' not in prevout: return False # Get the address from the prevout address = getAddressFromCoin(prevout['hash'], prevout['index']) # Select the account hsw.rpc_selectWallet(account) account = hsw.rpc_getAccount(address) if 'error' in account and account['error'] is not None: return False if 'result' not in account: return False if account['result'] == 'default': return True return False def getDomain(domain: str): # Get the domain response = hsd.rpc_getNameInfo(domain) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } return response['result'] def getAddressFromCoin(coinhash: str, coinindex = 0): # Get the address from the hash response = requests.get(get_node_api_url(f"coin/{coinhash}/{coinindex}")) if response.status_code != 200: print(f"Error getting address from coin: {response.text}") return "No Owner" data = response.json() if 'address' not in data: print(json.dumps(data, indent=4)) return "No Owner" return data['address'] def renewDomain(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } response = hsw.sendRENEW(account_name, password, domain) return response def getDNS(domain: str): # Get the DNS response = hsd.rpc_getNameResource(domain) if response['error'] is not None: return { "error": response['error']['message'] } if 'result' not in response: return { "error": "No DNS records" } if response['result'] == None: return [] if 'records' not in response['result']: return [] return response['result']['records'] def setDNS(account, domain, records): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } records = json.loads(records) newRecords = [] TXTRecords = [] for record in records: if record['type'] == 'TXT': if 'txt' not in record: TXTRecords.append(record['value']) else: for txt in record['txt']: TXTRecords.append(txt) elif record['type'] == 'NS': if 'value' in record: newRecords.append({ 'type': 'NS', 'ns': record['value'] }) elif 'ns' in record: newRecords.append({ 'type': 'NS', 'ns': record['ns'] }) else: return { 'error': { 'message': 'Invalid NS record' } } elif record['type'] in ['GLUE4', 'GLUE6', "SYNTH4", "SYNTH6"]: newRecords.append({ 'type': record['type'], 'ns': str(record['value']).split(' ')[0], 'address': str(record['value']).split(' ')[1] }) else: newRecords.append(record) if len(TXTRecords) > 0: newRecords.append({ 'type': 'TXT', 'txt': TXTRecords }) data = '{"records":'+str(newRecords).replace("'", "\"")+'}' response = hsw.sendUPDATE(account_name, password, domain, data) return response def register(account, domain): # Maybe add default dns records? return setDNS(account, domain, '[]') def getNodeSync(): response = hsd.getInfo() if 'error' in response: return 0 sync = response['chain']['progress']*100 sync = round(sync, 2) return sync def getWalletStatus(): response = hsw.rpc_getWalletInfo() if 'error' in response and response['error'] != None: return "Error" # return response walletHeight = response['result']['height'] # Get the current block height nodeHeight = getBlockHeight() if walletHeight < nodeHeight: return f"Scanning {walletHeight/nodeHeight*100:.2f}%" elif walletHeight == nodeHeight: return "Ready" else: return "Error wallet ahead of node" def getBids(account, domain="NONE"): if domain == "NONE": response = hsw.getWalletBids(account) else: response = hsw.getWalletBidsByName(domain, account) # Add backup for bids with no value bids = [] for bid in response: if 'value' not in bid: bid['value'] = -1000000 # Backup for older HSD versions if 'height' not in bid: bid['height'] = 0 bids.append(bid) return bids def getReveals(account, domain): return hsw.getWalletRevealsByName(domain, account) def getPendingReveals(account): bids = getBids(account) domains = getDomains(account, False) pending = [] for domain in domains: if domain['state'] == "REVEAL": reveals = getReveals(account, domain['name']) for bid in bids: if bid['name'] == domain['name']: state_found = False for reveal in reveals: if reveal['own'] == True: if bid['value'] == reveal['value']: state_found = True if not state_found: pending.append(bid) return pending def getPendingRedeems(account, password): hsw.rpc_selectWallet(account) hsw.rpc_walletPassphrase(password, 10) tx = hsw.rpc_createREDEEM('', 'default') if tx['error']: return [] pending = [] try: for output in tx['result']['outputs']: if output['covenant']['type'] != 5: continue if output['covenant']['action'] != "REDEEM": continue nameHash = output['covenant']['items'][0] # Try to get the name from hash name = hsd.rpc_getNameByHash(nameHash) if name['error']: pending.append(nameHash) else: pending.append(name['result']) except: print("Failed to parse redeems") return pending def getPendingRegisters(account): bids = getBids(account) domains = getDomains(account, False) pending = [] for domain in domains: if domain['state'] == "CLOSED" and domain['registered'] == False: for bid in bids: if bid['name'] == domain['name']: if bid['value'] == domain['highest']: pending.append(bid) return pending def getPendingFinalizes(account, password): tx = createBatch(f'{account}:{password}', [["FINALIZE"]]) if 'error' in tx: return [] pending = [] try: for output in tx['outputs']: if type(output) != dict: continue if not 'covenant' in output: continue if output['covenant'].get("type") != 10: continue if output['covenant'].get('action') != "FINALIZE": continue nameHash = output['covenant']['items'][0] # Try to get the name from hash name = hsd.rpc_getNameByHash(nameHash) if name['error']: pending.append(nameHash) else: pending.append(name['result']) except: print("Failed to parse finalizes") return pending def getRevealTX(reveal): prevout = reveal['prevout'] hash = prevout['hash'] index = prevout['index'] tx = hsd.getTxByHash(hash) if 'inputs' not in tx: print(f'Something is up with this tx: {hash}') print(tx) print('---') # No idea what happened here # Check if registered? return None return tx['inputs'][index]['prevout']['hash'] def revealAuction(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.sendREVEAL(account_name, password, domain) return response except Exception as e: return { "error": str(e) } def revealAll(account): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: # Try to select and login to the wallet response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } return requests.post(get_wallet_api_url(), json={"method": "sendbatch", "params": [[["REVEAL"]]]}).json() except Exception as e: return { "error": { "message": str(e) } } def redeemAll(account): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: # Try to select and login to the wallet response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } return requests.post(get_wallet_api_url(), json={"method": "sendbatch", "params": [[["REDEEM"]]]}).json() except Exception as e: return { "error": { "message": str(e) } } def registerAll(account): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } # try: domains = getPendingRegisters(account_name) if len(domains) == 0: return { "error": { "message": "Nothing to do." } } batch = [] for domain in domains: batch.append(["UPDATE", domain['name'], {"records": []}]) return sendBatch(account, batch) def finalizeAll(account): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } return sendBatch(account, [["FINALIZE"]]) def rescan_auction(account, domain): # Get height of the start of the auction response = hsw.rpc_selectWallet(account) response = hsd.rpc_getNameInfo(domain) if 'result' not in response: return { "error": "Invalid domain" } if 'height' not in response['result']['info']: return { "error": "Can't find start" } height = response['result']['info']['height']-1 response = hsw.rpc_importName(domain, height) return response def bid(account, domain, bid, blind): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } bid = int(bid)*1000000 lockup = int(blind)*1000000 + bid try: response = hsw.sendBID(account_name, password, domain, bid, lockup) return response except Exception as e: return { "error": { "message": str(e) } } def openAuction(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.sendOPEN(account_name, password, domain) return response except Exception as e: return { "error": { "message": str(e) } } def transfer(account, domain, address): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.sendTRANSFER(account_name, password, domain, address) return response except Exception as e: return { "error": { "message": str(e) } } def finalize(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_sendFINALIZE(domain) return response except Exception as e: return { "error": { "message": str(e) } } def cancelTransfer(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_sendCANCEL(domain) return response except Exception as e: return { "error": { "message": str(e) } } def revoke(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_sendREVOKE(domain) return response except Exception as e: return { "error": { "message": str(e) } } def sendBatch(account, batch): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = requests.post(get_wallet_api_url(), json={ "method": "sendbatch", "params": [batch] }).json() if response['error'] is not None: return response if 'result' not in response: return { "error": { "message": "No result" } } return response['result'] except Exception as e: return { "error": { "message": str(e) } } def createBatch(account, batch): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = requests.post(get_wallet_api_url(), json={ "method": "createbatch", "params": [batch] }).json() if response['error'] is not None: return response if 'result' not in response: return { "error": { "message": "No result" } } return response['result'] except Exception as e: return { "error": { "message": str(e) } } # region Mempool def getMempoolTxs(): # hsd-cli rpc getrawmempool response = hsd.rpc_getRawMemPool() if 'error' in response and response['error'] is not None: return [] return response['result'] if 'result' in response else [] def getMempoolBids(): mempoolTxs = getMempoolTxs() bids = {} for txid in mempoolTxs: tx = hsd.getTxByHash(txid) if 'error' in tx and tx['error'] is not None: print(f"Error getting tx {txid}: {tx['error']}") continue if 'outputs' not in tx: print(f"Error getting outputs for tx {txid}") continue for output in tx['outputs']: if output['covenant']['action'] not in ["BID", "REVEAL"]: continue if output['covenant']['action'] == "REVEAL": # Try to find bid tx from inputs namehash = output['covenant']['items'][0] for txInput in tx['inputs']: if txInput['coin']['covenant']['action'] != "BID": continue if txInput['coin']['covenant']['items'][0] != namehash: continue name = txInput['coin']['covenant']['items'][2] # Convert name from hex to ascii name = bytes.fromhex(name).decode('ascii') bid = { 'txid': txid, 'lockup': txInput['coin']['value'], 'revealed': True, 'height': -1, 'value': output['value'], 'sort_value': txInput['coin']['value'], 'owner': "Unknown" } if name not in bids: bids[name] = [] bids[name].append(bid) continue name = output['covenant']['items'][2] # Convert name from hex to ascii name = bytes.fromhex(name).decode('ascii') if name not in bids: bids[name] = [] bid = { 'txid': txid, 'value': -1000000, # Default value if not found 'lockup': output['value'], 'revealed': False, 'height': -1, 'sort_value': output['value'], 'owner': "Unknown" } bids[name].append(bid) return bids # endregion # region settingsAPIs def rescan(): try: response = hsw.walletRescan(0) return response except Exception as e: return { "error": { "message": str(e) } } def resendTXs(): try: response = hsw.walletResend() return response except Exception as e: return { "error": { "message": str(e) } } def zapTXs(account): age = 60 * 20 # 20 minutes account_name = check_account(account) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = requests.post(get_wallet_api_url(f"/wallet/{account_name}/zap"), json={"age": age, "account": "default" }) return response except Exception as e: return { "error": { "message": str(e) } } def getxPub(account): account_name = check_account(account) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.getAccountInfo(account_name, "default") if 'error' in response: return { "error": { "message": response['error']['message'] } } return response['accountKey'] return response except Exception as e: return { "error": { "message": str(e) } } def signMessage(account, domain, message): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_signMessageWithName(domain, message) return response except Exception as e: return { "error": { "message": str(e) } } def verifyMessageWithName(domain, signature, message): try: response = hsd.rpc_verifyMessageWithName(domain, signature, message) if 'result' in response: return response['result'] return False except Exception as e: return False def verifyMessage(address, signature, message): try: response = hsd.rpc_verifyMessage(address, signature, message) if 'result' in response: return response['result'] return False except Exception as e: return False # endregion def generateReport(account, format="{name},{expiry},{value},{maxBid}"): domains = getDomains(account) lines = [format.replace("{", "").replace("}", "")] for domain in domains: line = format.replace("{name}", domain['name']) expiry = "N/A" expiryBlock = "N/A" if 'daysUntilExpire' in domain['stats']: days = domain['stats']['daysUntilExpire'] # Convert to dateTime expiry = datetime.now() + timedelta(days=days) expiry = expiry.strftime("%d/%m/%Y %H:%M:%S") expiryBlock = str(domain['stats']['renewalPeriodEnd']) line = line.replace("{expiry}", expiry) line = line.replace("{state}", domain['state']) line = line.replace("{expiryBlock}", expiryBlock) line = line.replace("{value}", str(domain['value']/1000000)) line = line.replace("{maxBid}", str(domain['highest']/1000000)) line = line.replace("{openHeight}", str(domain['height'])) lines.append(line) return lines def convertHNS(value: int): return value/1000000 def get_node_api_url(path=''): """Construct a URL for the HSD node API.""" base_url = f"http://x:{HSD_API}@{HSD_IP}:{HSD_NODE_PORT}" if path: # Ensure path starts with a slash if it's not empty if not path.startswith('/'): path = f'/{path}' return f"{base_url}{path}" return base_url def get_wallet_api_url(path=''): """Construct a URL for the HSD wallet API.""" base_url = f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}" if path: # Ensure path starts with a slash if it's not empty if not path.startswith('/'): path = f'/{path}' return f"{base_url}{path}" return base_url # region HSD Internal Node def checkPreRequisites() -> dict[str, bool]: prerequisites = { "node": False, "npm": False, "git": False, "hsd": False } # Check if node is installed and get version nodeSubprocess = subprocess.run(["node", "-v"], capture_output=True, text=True) if nodeSubprocess.returncode == 0: major_version = int(nodeSubprocess.stdout.strip().lstrip('v').split('.')[0]) if major_version >= HSD_CONFIG.get("minNodeVersion", 20): prerequisites["node"] = True # Check if npm is installed npmSubprocess = subprocess.run(["npm", "-v"], capture_output=True, text=True) if npmSubprocess.returncode == 0: major_version = int(npmSubprocess.stdout.strip().split('.')[0]) if major_version >= HSD_CONFIG.get("minNPMVersion", 8): prerequisites["npm"] = True # Check if git is installed gitSubprocess = subprocess.run(["git", "-v"], capture_output=True, text=True) if gitSubprocess.returncode == 0: prerequisites["git"] = True # Check if hsd is installed if os.path.exists("./hsd/bin/hsd"): prerequisites["hsd"] = True return prerequisites def hsdInit(): if not HSD_INTERNAL_NODE: return prerequisites = checkPreRequisites() PREREQ_MESSAGES = { "node": "Install Node.js from https://nodejs.org/en/download (Version >= {minNodeVersion})", "npm": "Install npm (version >= {minNPMVersion}) - usually comes with Node.js", "git": "Install Git from https://git-scm.com/downloads"} # Check if all prerequisites are met (except hsd) if not all(prerequisites[key] for key in prerequisites if key != "hsd"): print("HSD Internal Node prerequisites not met:") for key, value in prerequisites.items(): if not value: print(f" - {key} is missing or does not meet the version requirement.") exit(1) return # Check if hsd is installed if not prerequisites["hsd"]: print("HSD not found, installing...") # If hsd folder exists, remove it if os.path.exists("hsd"): os.rmdir("hsd") # Clone hsd repo gitClone = subprocess.run(["git", "clone", "--depth", "1", "--branch", HSD_CONFIG.get("version", "latest"), "https://github.com/handshake-org/hsd.git", "hsd"], capture_output=True, text=True) if gitClone.returncode != 0: print("Failed to clone hsd repository:") print(gitClone.stderr) exit(1) print("Cloned hsd repository.") # Install hsd dependencies print("Installing hsd dependencies...") npmInstall = subprocess.run(["npm", "install"], cwd="hsd", capture_output=True, text=True) if npmInstall.returncode != 0: print("Failed to install hsd dependencies:") print(npmInstall.stderr) exit(1) print("Installed hsd dependencies.") def hsdStart(): global HSD_PROCESS if not HSD_INTERNAL_NODE: return # Check if hsd was started in the last 30 seconds if os.path.exists("hsd.lock"): lock_time = os.path.getmtime("hsd.lock") if time.time() - lock_time < 30: print("HSD was started recently, skipping start.") return else: os.remove("hsd.lock") print("Starting HSD...") # Create a lock file with open("hsd.lock", "w") as f: f.write(str(time.time())) # Config lookups with defaults chain_migrate = HSD_CONFIG.get("chainMigrate", False) wallet_migrate = HSD_CONFIG.get("walletMigrate", False) spv = HSD_CONFIG.get("spv", False) # Base command cmd = [ "node", "./hsd/bin/hsd", f"--network={HSD_NETWORK}", f"--prefix={os.path.join(os.getcwd(), 'hsd-data')}", f"--api-key={HSD_API}", "--agent=FireWallet", "--http-host=127.0.0.1", "--log-console=false" ] # Conditionally add migration flags if chain_migrate: cmd.append(f"--chain-migrate={chain_migrate}") if wallet_migrate: cmd.append(f"--wallet-migrate={wallet_migrate}") if spv: cmd.append("--spv") # Launch process HSD_PROCESS = subprocess.Popen( cmd, cwd=os.getcwd(), text=True ) print(f"HSD started with PID {HSD_PROCESS.pid}") atexit.register(hsdStop) # Handle Ctrl+C try: signal.signal(signal.SIGINT, lambda s, f: (hsdStop(), sys.exit(0))) signal.signal(signal.SIGTERM, lambda s, f: (hsdStop(), sys.exit(0))) except: pass def hsdStop(): global HSD_PROCESS if HSD_PROCESS is None: return print("Stopping HSD...") # Send SIGINT (like Ctrl+C) HSD_PROCESS.send_signal(signal.SIGINT) try: HSD_PROCESS.wait(timeout=10) # wait for graceful exit print("HSD shut down cleanly.") except subprocess.TimeoutExpired: print("HSD did not exit yet, is it alright???") # Clean up lock file if os.path.exists("hsd.lock"): os.remove("hsd.lock") HSD_PROCESS = None def hsdRestart(): hsdStop() time.sleep(2) hsdStart() checkPreRequisites() hsdInit() hsdStart() # endregion