from datetime import datetime, timedelta from handywrapper import api import os import dotenv import requests import re import domainLookup import json import time import concurrent.futures dotenv.load_dotenv() HSD_API = os.getenv("HSD_API") HSD_IP = os.getenv("HSD_IP") if HSD_IP is None: HSD_IP = "localhost" HSD_NETWORK = os.getenv("HSD_NETWORK") HSD_WALLET_PORT = 12039 HSD_NODE_PORT = 12037 if not HSD_NETWORK: HSD_NETWORK = "main" else: HSD_NETWORK = HSD_NETWORK.lower() if HSD_NETWORK == "simnet": HSD_WALLET_PORT = 15039 HSD_NODE_PORT = 15037 elif HSD_NETWORK == "testnet": HSD_WALLET_PORT = 13039 HSD_NODE_PORT = 13037 elif HSD_NETWORK == "regtest": HSD_WALLET_PORT = 14039 HSD_NODE_PORT = 14037 SHOW_EXPIRED = os.getenv("SHOW_EXPIRED") if SHOW_EXPIRED is None: SHOW_EXPIRED = False hsd = api.hsd(HSD_API, HSD_IP, HSD_NODE_PORT) hsw = api.hsw(HSD_API, HSD_IP, HSD_WALLET_PORT) cacheTime = 3600 # Verify the connection response = hsd.getInfo() EXCLUDE = ["primary"] if os.getenv("EXCLUDE") is not None: EXCLUDE = os.getenv("EXCLUDE").split(",") def hsdConnected(): if hsdVersion() == -1: return False return True def hsdVersion(format=True): info = hsd.getInfo() if 'error' in info: return -1 if format: return float('.'.join(info['version'].split(".")[:2])) else: return info['version'] def check_account(cookie: str): if cookie is None: return False # Check the account if cookie.count(":") < 1: return False account = cookie.split(":")[0] # Check if the account is valid info = hsw.getAccountInfo(account, 'default') if 'error' in info: return False return account def check_password(cookie: str, password: str): account = check_account(cookie) if account == False: return False # Check if the password is valid info = hsw.rpc_selectWallet(account) if info['error'] is not None: return False info = hsw.rpc_walletPassphrase(password, 1) if info['error'] is not None: if info['error']['message'] != "Wallet is not encrypted.": return False return True def createWallet(account: str, password: str): if not hsdConnected(): return { "error": { "message": "Node not connected" } } # Create the account # Python wrapper doesn't support this yet response = requests.put( f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}") if response.status_code != 200: return { "error": { "message": "Error creating account" } } # Get seed seed = hsw.getMasterHDKey(account) seed = seed['mnemonic']['phrase'] # Encrypt the wallet (python wrapper doesn't support this yet) response = requests.post(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}/passphrase", json={"passphrase": password}) return { "seed": seed, "account": account, "password": password } def importWallet(account: str, password: str, seed: str): if not hsdConnected(): return { "error": { "message": "Node not connected" } } # Import the wallet data = { "passphrase": password, "mnemonic": seed, } response = requests.put( f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}", json=data) if response.status_code != 200: return { "error": { "message": "Error creating account" } } return { "seed": seed, "account": account, "password": password } def listWallets(): # List the wallets response = hsw.listWallets() # Check if response is json or an array if isinstance(response, list): # Remove excluded wallets response = [wallet for wallet in response if wallet not in EXCLUDE] return response return ['Wallet not connected'] def selectWallet(account: str): # Select wallet response = hsw.rpc_selectWallet(account) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } def getBalance(account: str): # Get the total balance info = hsw.getBalance('default', account) if 'error' in info: return {'available': 0, 'total': 0} total = info['confirmed'] available = total - info['lockedConfirmed'] locked = info['lockedConfirmed'] / 1000000 # Convert to HNS total = total / 1000000 available = available / 1000000 domains = getDomains(account) domainValue = 0 for domain in domains: if domain['state'] == "CLOSED": domainValue += domain['value'] total = total - (domainValue/1000000) locked = locked - (domainValue/1000000) # Only keep 2 decimal places total = round(total, 2) available = round(available, 2) return {'available': available, 'total': total, 'locked': locked} def getBlockHeight(): # Get the block height info = hsd.getInfo() if 'error' in info: return 0 return info['chain']['height'] def getAddress(account: str): # Get the address info = hsw.getAccountInfo(account, 'default') if 'error' in info: return '' return info['receiveAddress'] def getPendingTX(account: str): pending = 0 page = 1 pageSize = 10 while True: txs = getTransactions(account, page, pageSize) page += 1 pendingPage = 0 for tx in txs: if tx['confirmations'] < 1: pending += 1 pendingPage += 1 if pendingPage < pageSize: break return pending def getDomains(account, own=True): if own: response = requests.get( f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}/name?own=true") else: response = requests.get( f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}/name") info = response.json() if SHOW_EXPIRED: return info # Remove any expired domains domains = [] for domain in info: if 'stats' in domain: if 'daysUntilExpire' in domain['stats']: if domain['stats']['daysUntilExpire'] < 0: continue domains.append(domain) return domains def getPageTXCache(account, page, size=100): page = f"{page}-{size}" if not os.path.exists(f'cache'): os.mkdir(f'cache') if not os.path.exists(f'cache/{account}_page.json'): with open(f'cache/{account}_page.json', 'w') as f: f.write('{}') with open(f'cache/{account}_page.json') as f: pageCache = json.load(f) if page in pageCache and pageCache[page]['time'] > int(time.time()) - cacheTime: return pageCache[page]['txid'] return None def pushPageTXCache(account, page, txid, size=100): page = f"{page}-{size}" if not os.path.exists(f'cache/{account}_page.json'): with open(f'cache/{account}_page.json', 'w') as f: f.write('{}') with open(f'cache/{account}_page.json') as f: pageCache = json.load(f) pageCache[page] = { 'time': int(time.time()), 'txid': txid } with open(f'cache/{account}_page.json', 'w') as f: json.dump(pageCache, f, indent=4) return pageCache[page]['txid'] def getTXFromPage(account, page, size=100): if page == 1: return getTransactions(account, 1, size)[-1]['hash'] cached = getPageTXCache(account, page, size) if cached: return getPageTXCache(account, page, size) previous = getTransactions(account, page, size) if len(previous) == 0: return None hash = previous[-1]['hash'] pushPageTXCache(account, page, hash, size) return hash def getTransactions(account, page=1, limit=100): # Get the transactions if hsdVersion() < 7: if page != 1: return [] info = hsw.getWalletTxHistory(account) if 'error' in info: return [] return info[::-1] lastTX = None if page < 1: return [] if page > 1: lastTX = getTXFromPage(account, page-1, limit) if lastTX: response = requests.get( f'http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}/tx/history?reverse=true&limit={limit}&after={lastTX}') elif page == 1: response = requests.get( f'http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}/tx/history?reverse=true&limit={limit}') else: return [] if response.status_code != 200: print(response.text) return [] data = response.json() # Refresh the cache if the next page is different nextPage = getPageTXCache(account, page, limit) if nextPage is not None and nextPage != data[-1]['hash']: print(f'Refreshing page {page}') pushPageTXCache(account, page, data[-1]['hash'], limit) return data def getAllTransactions(account): # Get the transactions page = 0 txs = [] while True: txs += getTransactions(account, page, 1000) if len(txs) == 0: break page += 1 return txs def check_address(address: str, allow_name: bool = True, return_address: bool = False): # Check if the address is valid if address.startswith('@'): # Check if the address is a name if not allow_name and not return_address: return 'Invalid address' elif not allow_name and return_address: return False return check_hip2(address[1:]) # Check if the address is a valid HNS address response = requests.post(f"http://x:{HSD_API}@{HSD_IP}:{HSD_NODE_PORT}", json={ "method": "validateaddress", "params": [address] }).json() if response['error'] is not None: if return_address: return False return 'Invalid address' if response['result']['isvalid'] == True: if return_address: return address return 'Valid address' if return_address: return False return 'Invalid address' def check_hip2(domain: str): # Check if the domain is valid domain = domain.lower() if re.match(r'^[a-zA-Z0-9\-\.]{1,63}$', domain) is None: return 'Invalid domain' address = domainLookup.hip2(domain) if address.startswith("Hip2: "): return address if not check_address(address, False, True): return 'Hip2: Lookup succeeded but address is invalid' return address def send(account, address, amount): account_name = check_account(account) password = ":".join(account.split(":")[1:]) response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) # Unlock the account # response = requests.post(f"http://x:{APIKEY}@{ip}:{HSD_WALLET_PORT}/wallet/{account_name}/unlock", # json={"passphrase": password,"timeout": 10}) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_sendToAddress(address, amount) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } return { "tx": response['result'] } def isOwnDomain(account, name: str): domains = getDomains(account) for domain in domains: if domain['name'] == name: return True return False def getDomain(domain: str): # Get the domain response = hsd.rpc_getNameInfo(domain) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } return response['result'] def getAddressFromCoin(coinhash: str, coinindex = 0): # Get the address from the hash response = requests.get(f"http://x:{HSD_API}@{HSD_IP}:{HSD_NODE_PORT}/coin/{coinhash}/{coinindex}") if response.status_code != 200: return { "error": { "message": "Error getting address from coin" } } data = response.json() if 'address' not in data: return { "error": { "message": "Error getting address from coin" } } return data['address'] def renewDomain(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } response = hsw.sendRENEW(account_name, password, domain) return response def getDNS(domain: str): # Get the DNS response = hsd.rpc_getNameResource(domain) if response['error'] is not None: return { "error": response['error']['message'] } if 'result' not in response: return { "error": "No DNS records" } if response['result'] == None: return [] if 'records' not in response['result']: return [] return response['result']['records'] def setDNS(account, domain, records): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } records = json.loads(records) newRecords = [] TXTRecords = [] for record in records: if record['type'] == 'TXT': if 'txt' not in record: TXTRecords.append(record['value']) else: for txt in record['txt']: TXTRecords.append(txt) elif record['type'] == 'NS': if 'value' in record: newRecords.append({ 'type': 'NS', 'ns': record['value'] }) elif 'ns' in record: newRecords.append({ 'type': 'NS', 'ns': record['ns'] }) else: return { 'error': { 'message': 'Invalid NS record' } } elif record['type'] in ['GLUE4', 'GLUE6', "SYNTH4", "SYNTH6"]: newRecords.append({ 'type': record['type'], 'ns': str(record['value']).split(' ')[0], 'address': str(record['value']).split(' ')[1] }) else: newRecords.append(record) if len(TXTRecords) > 0: newRecords.append({ 'type': 'TXT', 'txt': TXTRecords }) data = '{"records":'+str(newRecords).replace("'", "\"")+'}' response = hsw.sendUPDATE(account_name, password, domain, data) return response def register(account, domain): # Maybe add default dns records? return setDNS(account, domain, '[]') def getNodeSync(): response = hsd.getInfo() if 'error' in response: return 0 sync = response['chain']['progress']*100 sync = round(sync, 2) return sync def getWalletStatus(): response = hsw.rpc_getWalletInfo() if 'error' in response and response['error'] != None: return "Error" # return response walletHeight = response['result']['height'] # Get the current block height nodeHeight = getBlockHeight() if walletHeight < nodeHeight: return f"Scanning {walletHeight/nodeHeight*100:.2f}%" elif walletHeight == nodeHeight: return "Ready" else: return "Error wallet ahead of node" # Add a simple cache for bid data _bid_cache = {} _bid_cache_time = {} _cache_duration = 300 # Increased cache duration to 5 minutes for bids # Add domain info cache _domain_info_cache = {} _domain_info_time = {} _domain_info_duration = 600 # Cache domain info for 10 minutes # Add wallet authentication cache _wallet_auth_cache = {} _wallet_auth_time = {} _wallet_auth_duration = 300 # Increased to 5 minutes for wallet auth def getBids(account, domain="NONE"): cache_key = f"{account}:{domain}" current_time = time.time() # Return cached data if available and fresh if cache_key in _bid_cache and current_time - _bid_cache_time.get(cache_key, 0) < _cache_duration: return _bid_cache[cache_key] try: if domain == "NONE": response = hsw.getWalletBids(account) else: response = hsw.getWalletBidsByName(domain, account) # Add backup for bids with no value bids = [] for bid in response: if 'value' not in bid: bid['value'] = -1000000 # Backup for older HSD versions if 'height' not in bid: bid['height'] = 0 bids.append(bid) # Cache the results _bid_cache[cache_key] = bids _bid_cache_time[cache_key] = current_time return bids except Exception as e: print(f"Error fetching bids: {str(e)}") return [] def _fetch_domain_info(domain): """Helper function to fetch domain info with caching""" current_time = time.time() # Check cache first if (domain in _domain_info_cache and current_time - _domain_info_time.get(domain, 0) < _domain_info_duration): return _domain_info_cache[domain] # Fetch domain info domain_info = getDomain(domain) # Store in cache _domain_info_cache[domain] = domain_info _domain_info_time[domain] = current_time return domain_info def _fetch_domain_batch(domains, max_workers=10): """Fetch multiple domains in parallel""" results = {} with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: # Create a mapping of futures to domains future_to_domain = {executor.submit(_fetch_domain_info, domain): domain for domain in domains} # Process as they complete for future in concurrent.futures.as_completed(future_to_domain): domain = future_to_domain[future] try: results[domain] = future.result() except Exception as e: print(f"Error fetching domain {domain}: {str(e)}") results[domain] = {"error": str(e)} return results def getPossibleOutbids(account): # Get all bids bids = getBids(account) if not bids or 'error' in bids: return [] # Get current height current_height = getBlockHeight() # Sort out bids older than 720 blocks and extract domain names filtered_bids = [] domains_to_check = set() for bid in bids: if (current_height - bid['height']) <= 720: filtered_bids.append(bid) domains_to_check.add(bid['name']) if not domains_to_check: return [] # Fetch all domain info in parallel domain_info_map = _fetch_domain_batch(domains_to_check) # Pre-filter domains in bidding state bidding_domains = { domain: info for domain, info in domain_info_map.items() if ('info' in info and 'state' in info['info'] and info['info']['state'] == "BIDDING") } # Process the results possible_outbids = [] processed_domains = set() # Group bids by domain name for efficient processing bids_by_domain = {} for bid in filtered_bids: domain = bid['name'] if domain not in bids_by_domain: bids_by_domain[domain] = [] bids_by_domain[domain].append(bid) # Analyze each domain in bidding state for domain, info in bidding_domains.items(): if domain in processed_domains or domain not in bids_by_domain: continue processed_domains.add(domain) # Get all bids for this domain in one call domain_bids = getBids(account, domain) # Find the highest bid we've made our_highest_bid = max( (bid['value'] for bid in domain_bids if bid.get("own", False)), default=0 ) # Quick check if any unrevealed bids could outbid us if any( bid["lockup"] > our_highest_bid for bid in domain_bids if not bid.get("own", False) and bid.get('value', 0) == -1000000 ): possible_outbids.append(domain) return possible_outbids def getReveals(account, domain): return hsw.getWalletRevealsByName(domain, account) def getPendingReveals(account): bids = getBids(account) # Only get domains in REVEAL state to reduce API calls domains = [d for d in getDomains(account, False) if d['state'] == "REVEAL"] if not domains: # Early return if no domains in REVEAL state return [] pending = [] # Create a dictionary for O(1) lookups domain_names = {domain['name']: domain for domain in domains} # Group bids by name to batch process reveals bids_by_name = {} for bid in bids: if bid['name'] in domain_names: if bid['name'] not in bids_by_name: bids_by_name[bid['name']] = [] bids_by_name[bid['name']].append(bid) # Fetch reveals for each domain once reveals_by_name = {} for domain_name in bids_by_name: reveals_by_name[domain_name] = getReveals(account, domain_name) # Check each bid against the reveals for domain_name, domain_bids in bids_by_name.items(): domain_reveals = reveals_by_name[domain_name] for bid in domain_bids: # Check if this bid has been revealed bid_revealed = any( reveal['own'] == True and bid['value'] == reveal['value'] for reveal in domain_reveals ) if not bid_revealed: pending.append(bid) return pending def getPendingRedeems(account, password): hsw.rpc_selectWallet(account) hsw.rpc_walletPassphrase(password, 10) tx = hsw.rpc_createREDEEM('', 'default') if tx['error']: return [] pending = [] try: # Collect all nameHashes first name_hashes = [] for output in tx['result']['outputs']: if output['covenant']['type'] != 5: continue if output['covenant']['action'] != "REDEEM": continue name_hashes.append(output['covenant']['items'][0]) # Batch processing name hashes name_lookup = {} for name_hash in name_hashes: name = hsd.rpc_getNameByHash(name_hash) if name['error']: pending.append(name_hash) else: pending.append(name['result']) name_lookup[name_hash] = name['result'] except Exception as e: print(f"Failed to parse redeems: {str(e)}") return pending def getPendingRegisters(account): bids = getBids(account) domains = getDomains(account, False) # Create dictionaries for O(1) lookups bids_by_name = {} for bid in bids: if bid['name'] not in bids_by_name: bids_by_name[bid['name']] = [] bids_by_name[bid['name']].append(bid) pending = [] for domain in domains: if domain['state'] == "CLOSED" and domain['registered'] == False: if domain['name'] in bids_by_name: for bid in bids_by_name[domain['name']]: if bid['value'] == domain['highest']: pending.append(bid) return pending def getPendingFinalizes(account, password): tx = createBatch(f'{account}:{password}', [["FINALIZE"]]) if 'error' in tx: return [] pending = [] try: # Collect all nameHashes first name_hashes = [] for output in tx['outputs']: if output['covenant']['type'] != 10: continue if output['covenant']['action'] != "FINALIZE": continue name_hashes.append(output['covenant']['items'][0]) # Batch lookup for name hashes for name_hash in name_hashes: name = hsd.rpc_getNameByHash(name_hash) if name['error']: pending.append(name_hash) else: pending.append(name['result']) except Exception as e: print(f"Failed to parse finalizes: {str(e)}") return pending def getRevealTX(reveal): prevout = reveal['prevout'] hash = prevout['hash'] index = prevout['index'] tx = hsd.getTxByHash(hash) if 'inputs' not in tx: print(f'Something is up with this tx: {hash}') print(tx) print('---') # No idea what happened here # Check if registered? return None return tx['inputs'][index]['prevout']['hash'] def revealAuction(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.sendREVEAL(account_name, password, domain) return response except Exception as e: return { "error": str(e) } def revealAll(account): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: # Try to select and login to the wallet response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } return requests.post(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}", json={"method": "sendbatch", "params": [[["REVEAL"]]]}).json() except Exception as e: return { "error": { "message": str(e) } } def redeemAll(account): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: # Try to select and login to the wallet response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } return requests.post(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}", json={"method": "sendbatch", "params": [[["REDEEM"]]]}).json() except Exception as e: return { "error": { "message": str(e) } } def registerAll(account): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } # try: domains = getPendingRegisters(account_name) if len(domains) == 0: return { "error": { "message": "Nothing to do." } } batch = [] for domain in domains: batch.append(["UPDATE", domain['name'], {"records": []}]) return sendBatch(account, batch) def finalizeAll(account): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } return sendBatch(account, [["FINALIZE"]]) def rescan_auction(account, domain): # Get height of the start of the auction response = hsw.rpc_selectWallet(account) response = hsd.rpc_getNameInfo(domain) if 'result' not in response: return { "error": "Invalid domain" } if 'bidPeriodStart' not in response['result']['info']['stats']: return { "error": "Not in auction" } height = response['result']['info']['height']-1 response = hsw.rpc_importName(domain, height) return response def bid(account, domain, bid, blind): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } bid = int(bid)*1000000 lockup = int(blind)*1000000 + bid try: response = hsw.sendBID(account_name, password, domain, bid, lockup) return response except Exception as e: return { "error": { "message": str(e) } } def openAuction(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.sendOPEN(account_name, password, domain) return response except Exception as e: return { "error": { "message": str(e) } } def transfer(account, domain, address): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.sendTRANSFER(account_name, password, domain, address) return response except Exception as e: return { "error": { "message": str(e) } } def finalize(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_sendFINALIZE(domain) return response except Exception as e: return { "error": { "message": str(e) } } def cancelTransfer(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_sendCANCEL(domain) return response except Exception as e: return { "error": { "message": str(e) } } def revoke(account, domain): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_sendREVOKE(domain) return response except Exception as e: return { "error": { "message": str(e) } } def _prepare_wallet_for_batch(account_name, password): """Helper function to prepare wallet for batch operations with caching""" cache_key = f"{account_name}:{password}" current_time = time.time() # Return cached authentication if available and fresh if (cache_key in _wallet_auth_cache and current_time - _wallet_auth_time.get(cache_key, 0) < _wallet_auth_duration): return _wallet_auth_cache[cache_key] # Select and unlock wallet result = {'success': False, 'error': None} # Try to select the wallet select_response = hsw.rpc_selectWallet(account_name) if select_response['error'] is not None: result['error'] = {"message": select_response['error']['message']} return result # Try to unlock the wallet unlock_response = hsw.rpc_walletPassphrase(password, 30) # Increased timeout to reduce future unlocks if (unlock_response['error'] is not None and unlock_response['error']['message'] != "Wallet is not encrypted."): result['error'] = {"message": unlock_response['error']['message']} return result # Authentication successful result['success'] = True # Cache the authentication result _wallet_auth_cache[cache_key] = result _wallet_auth_time[cache_key] = current_time return result def _execute_batch_operation(account_name, batch, operation_type="sendbatch"): """Execute a batch operation with the specified wallet""" # Make the batch request try: response = requests.post( f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}", json={"method": operation_type, "params": [batch]}, timeout=30 # Add timeout to prevent hanging ).json() if response['error'] is not None: return response if 'result' not in response: return {"error": {"message": "No result"}} return response['result'] except Exception as e: return {"error": {"message": str(e)}} def sendBatch(account, batch): account_name = check_account(account) if account_name == False: return {"error": {"message": "Invalid account"}} password = ":".join(account.split(":")[1:]) # Prepare the wallet (this uses caching) auth_result = _prepare_wallet_for_batch(account_name, password) if not auth_result['success']: return auth_result['error'] # Execute the batch operation return _execute_batch_operation(account_name, batch, "sendbatch") def createBatch(account, batch): account_name = check_account(account) if account_name == False: return {"error": {"message": "Invalid account"}} password = ":".join(account.split(":")[1:]) # Prepare the wallet (this uses caching) auth_result = _prepare_wallet_for_batch(account_name, password) if not auth_result['success']: return auth_result['error'] # Execute the batch operation return _execute_batch_operation(account_name, batch, "createbatch") # region settingsAPIs def rescan(): try: response = hsw.walletRescan(0) return response except Exception as e: return { "error": { "message": str(e) } } def resendTXs(): try: response = hsw.walletResend() return response except Exception as e: return { "error": { "message": str(e) } } def zapTXs(account): age = 60 * 20 # 20 minutes account_name = check_account(account) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = requests.post(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account_name}/zap", json={"age": age, "account": "default" }) return response except Exception as e: return { "error": { "message": str(e) } } def getxPub(account): account_name = check_account(account) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.getAccountInfo(account_name, "default") if 'error' in response: return { "error": { "message": response['error']['message'] } } return response['accountKey'] return response except Exception as e: return { "error": { "message": str(e) } } def signMessage(account, domain, message): account_name = check_account(account) password = ":".join(account.split(":")[1:]) if account_name == False: return { "error": { "message": "Invalid account" } } try: response = hsw.rpc_selectWallet(account_name) if response['error'] is not None: return { "error": { "message": response['error']['message'] } } response = hsw.rpc_walletPassphrase(password, 10) if response['error'] is not None: if response['error']['message'] != "Wallet is not encrypted.": return { "error": { "message": response['error']['message'] } } response = hsw.rpc_signMessageWithName(domain, message) return response except Exception as e: return { "error": { "message": str(e) } } def verifyMessageWithName(domain, signature, message): try: response = hsd.rpc_verifyMessageWithName(domain, signature, message) if 'result' in response: return response['result'] return False except Exception as e: return False def verifyMessage(address, signature, message): try: response = hsd.rpc_verifyMessage(address, signature, message) if 'result' in response: return response['result'] return False except Exception as e: return False # endregion def generateReport(account, format="{name},{expiry},{value},{maxBid}"): domains = getDomains(account) lines = [format.replace("{", "").replace("}", "")] for domain in domains: line = format.replace("{name}", domain['name']) expiry = "N/A" expiryBlock = "N/A" if 'daysUntilExpire' in domain['stats']: days = domain['stats']['daysUntilExpire'] # Convert to dateTime expiry = datetime.now() + timedelta(days=days) expiry = expiry.strftime("%d/%m/%Y %H:%M:%S") expiryBlock = str(domain['stats']['renewalPeriodEnd']) line = line.replace("{expiry}", expiry) line = line.replace("{state}", domain['state']) line = line.replace("{expiryBlock}", expiryBlock) line = line.replace("{value}", str(domain['value']/1000000)) line = line.replace("{maxBid}", str(domain['highest']/1000000)) line = line.replace("{openHeight}", str(domain['height'])) lines.append(line) return lines def convertHNS(value: int): return value/1000000