import functools import time import requests import dotenv import os import json import uuid import hns dotenv.load_dotenv() HSD_API_KEY = os.getenv('HSD_API_KEY') HSD_API_IP = os.getenv('HSD_API_IP') if not HSD_API_IP: HSD_API_IP = '127.0.0.1' HSD_API_NETWORK = os.getenv('HSD_API_NETWORK') if not HSD_API_NETWORK: HSD_API_NETWORK = 'main' network_port_int = { 'main': 1203, 'testnet': 1303, 'regtest': 1403, 'simnet': 1503 } account_file = 'accounts.json' wallet_file = 'wallets.json' if not os.path.exists('.local'): account_file = '/data/accounts.json' wallet_file = '/data/wallets.json' # Create any missing files if not os.path.exists(account_file): with open(account_file, 'w') as f: json.dump({}, f, indent=4) if not os.path.exists(wallet_file): with open(wallet_file, 'w') as f: json.dump([], f, indent=4) def cache_with_ttl(max_age, maxsize=128, typed=False): """Least-recently-used cache decorator with time-based cache invalidation. Args: max_age: Time to live for cached results (in seconds). maxsize: Maximum cache size (see `functools.lru_cache`). typed: Cache on distinct input types (see `functools.lru_cache`). """ def _decorator(fn): @functools.lru_cache(maxsize=maxsize, typed=typed) def _new(*args, __time_salt, **kwargs): return fn(*args, **kwargs) @functools.wraps(fn) def _wrapped(*args, **kwargs): return _new(*args, **kwargs, __time_salt=int(time.time() / max_age)) return _wrapped return _decorator def url(walletURL:bool) -> str: if walletURL: return f'http://x:{HSD_API_KEY}@{HSD_API_IP}:{network_port_int[HSD_API_NETWORK]}9/' else: return f'http://x:{HSD_API_KEY}@{HSD_API_IP}:{network_port_int[HSD_API_NETWORK]}7/' def rescan() -> dict: return requests.post(url(True)+'rescan', json={'height':0}).json() def get_master() -> dict: return requests.get(url(True)+'a61ba47a-54de-43b9-9d22-fcf6827cd5c2/master').json() def get_status() -> dict: return requests.get(url(False)).json() def get_wallets() -> dict: return requests.get(url(True)+'wallet').json() def create_account() -> dict: # Generate a UUID userID = str(uuid.uuid4()) with open(account_file, 'r') as f: accounts = json.load(f) # Check if the user already exists if userID in accounts: return { 'error': 'Please try again' } accounts[userID] = { 'wallets': [] } with open(account_file, 'w') as f: json.dump(accounts, f, indent=4) return { 'userID': userID } def get_account(userID:str) -> dict: accounts = get_accounts() if userID in accounts: return accounts[userID] else: return { 'error': 'User not found' } def get_accounts() -> dict: # Read from accounts with open(account_file, 'r') as f: accounts = json.load(f) return accounts def import_wallet(name:str,userID:str,xpub:str) -> dict: accounts = get_accounts() if userID not in accounts: return { 'error': 'User not found' } # Check if the wallet already exists for wallet in accounts[userID]['wallets']: if wallet['name'] == name: return { 'error': 'Wallet already exists' } # Create the wallet using a UUID walletID = str(uuid.uuid4()) with open(wallet_file, 'r') as f: wallets = json.load(f) if walletID in wallets: return { 'error': 'Please try again' } wallet = { 'name': name, 'xpub': xpub, 'walletID': walletID } wallet_data = { "watchOnly": True, "accountKey": xpub, } response = requests.put(url(True)+'wallet/'+walletID, json=wallet_data) if response.status_code != 200: print(response.text) return { 'error': 'Error creating wallet' } accounts[userID]['wallets'].append(wallet) with open(account_file, 'w') as f: json.dump(accounts, f, indent=4) wallets.append(walletID) with open(wallet_file, 'w') as f: json.dump(wallets, f, indent=4) # Rescan the wallet to get the balance rescan() return { 'walletID': walletID } @cache_with_ttl(60*60) def get_wallet_UUID(userID:str, name:str) -> str|None: accounts = get_accounts() if userID not in accounts: return None walletID = None for wallet in accounts[userID]['wallets']: if wallet['name'] == name: walletID = wallet['walletID'] break return walletID def get_wallet(userID:str, name:str) -> dict: walletID = get_wallet_UUID(userID, name) if not walletID: return { 'error': 'Wallet not found' } return requests.get(url(True)+'wallet/'+walletID).json() def get_address(userID:str, name:str) -> dict: walletID = get_wallet_UUID(userID, name) if not walletID: return { 'error': 'Wallet not found' } request = requests.get(url(True)+'wallet/'+walletID+'/account/default').json() return { 'address': request['receiveAddress'] } def get_balance(userID:str, name:str) -> dict: walletID = get_wallet_UUID(userID, name) if not walletID: return { 'error': 'Wallet not found' } balance = requests.get(url(True)+'wallet/'+walletID+'/balance?account=default').json() domains_value = get_domains_value(userID, name) # Convert to human readable return_json = { 'available': (balance['confirmed']-domains_value)/10**6, 'available_small': balance['confirmed']-domains_value, 'locked': balance['lockedConfirmed']/10**6, 'locked_small': balance['lockedConfirmed'], } return return_json @cache_with_ttl(60*1) def get_domains(userID:str, name:str) -> dict: walletID = get_wallet_UUID(userID, name) if not walletID: return { 'error': 'Wallet not found' } request = requests.get(url(True)+'wallet/'+walletID+'/name?own=true').json() names = [] names_value = 0 for name in request: names_value += name['value'] names.append({ 'name': name['name'], 'state': name['state'], 'highest': name['highest']/10**6, 'value': name['value']/10**6, 'height': name['height'], 'stats': name['stats'] }) # Save name value to accounts with open(account_file, 'r') as f: accounts = json.load(f) for account in accounts: if account == userID: wallets = accounts[account]['wallets'] for wallet in wallets: if wallet['walletID'] == walletID: wallet['names_value'] = names_value break with open(account_file, 'w') as f: json.dump(accounts, f, indent=4) return names def get_domains_value(userID:str, name:str) -> int: walletID = get_wallet_UUID(userID, name) if not walletID: return { 'error': 'Wallet not found' } with open(account_file, 'r') as f: accounts = json.load(f) for account in accounts: if account == userID: wallets = accounts[account]['wallets'] for wallet in wallets: if wallet['walletID'] == walletID: if 'names_value' in wallet: return wallet['names_value'] names = get_domains(userID, name) names_value = 0 for name in names: names_value += name['value'] return names_value def send_to_address(userID:str, name:str, address:str, amount:str) -> dict: walletID = get_wallet_UUID(userID, name) if not walletID: return { 'error': 'Wallet not found' } # Verify amount amount = float(amount) if amount <= 0: return { 'error': 'Amount must be greater than 0' } balance = get_balance(userID, name) if balance['available'] < amount: return { 'error': 'Insufficient funds' } subtractFee = False if balance['available'] - amount < 0.03: subtractFee = True amount = balance['available'] data = { 'outputs': [ { 'address': address, 'value': hns.to_small(amount) } ], 'sign': False, 'subtractFee': subtractFee } response = requests.post(url(True)+'wallet/'+walletID+'/create', json=data) if response.status_code != 200: print(response.text) return { 'error': 'Error sending' } return response.json() @cache_with_ttl(60*1) def get_transactions(userID:str, name:str) -> dict: walletID = get_wallet_UUID(userID, name) if not walletID: return { 'error': 'Wallet not found' } request = requests.get(url(True)+'wallet/'+walletID+'/tx/history').json() return request # region Auctions @cache_with_ttl(60*1) def get_auctions(userID:str, name:str) -> dict: walletID = get_wallet_UUID(userID, name) if not walletID: return { 'error': 'Wallet not found' } request = requests.get(url(True)+'wallet/'+walletID+'/auction') return request.json() # endregion