All checks were successful
Build Docker / Build Image (push) Successful in 1m16s
810 lines
20 KiB
810 lines
20 KiB
from datetime import datetime, timedelta
from handywrapper import api
import os
import dotenv
import requests
import re
import domainLookup
import json
APIKEY = os.getenv("hsd_api")
ip = os.getenv("hsd_ip")
if ip is None:
ip = "localhost"
show_expired = os.getenv("show_expired")
if show_expired is None:
show_expired = False
hsd = api.hsd(APIKEY,ip)
hsw = api.hsw(APIKEY,ip)
# Verify the connection
response = hsd.getInfo()
def check_account(cookie: str):
if cookie is None:
return False
# Check the account
if cookie.count(":") < 1:
return False
account = cookie.split(":")[0]
# Check if the account is valid
info = hsw.getAccountInfo(account, 'default')
if 'error' in info:
return False
return account
def check_password(cookie: str, password: str):
account = check_account(cookie)
if account == False:
return False
# Check if the password is valid
info = hsw.rpc_selectWallet(account)
if info['error'] is not None:
return False
info = hsw.rpc_walletPassphrase(password,10)
if info['error'] is not None:
return False
return True
def createWallet(account: str, password: str):
# Create the account
# Python wrapper doesn't support this yet
response = requests.put(f"http://x:{APIKEY}@{ip}:12039/wallet/{account}")
if response.status_code != 200:
return {
"error": {
"message": "Error creating account"
# Get seed
seed = hsw.getMasterHDKey(account)
seed = seed['mnemonic']['phrase']
# Encrypt the wallet (python wrapper doesn't support this yet)
response ="http://x:{APIKEY}@{ip}:12039/wallet/{account}/passphrase",
json={"passphrase": password})
return {
"seed": seed,
"account": account,
"password": password
def importWallet(account: str, password: str,seed: str):
# Import the wallet
data = {
"passphrase": password,
"mnemonic": seed,
response = requests.put(f"http://x:{APIKEY}@{ip}:12039/wallet/{account}",json=data)
if response.status_code != 200:
return {
"error": {
"message": "Error creating account"
return {
"seed": seed,
"account": account,
"password": password
def listWallets():
# List the wallets
response = hsw.listWallets()
# Check if response is json or an array
if isinstance(response, list):
return response
return ['Wallet not connected']
def getBalance(account: str):
# Get the total balance
info = hsw.getBalance('default',account)
if 'error' in info:
return {'available': 0, 'total': 0}
total = info['confirmed']
available = total - info['lockedConfirmed']
locked = info['lockedConfirmed'] / 1000000
# Convert to HNS
total = total / 1000000
available = available / 1000000
domains = getDomains(account)
domainValue = 0
for domain in domains:
if domain['state'] == "CLOSED":
domainValue += domain['value']
total = total - (domainValue/1000000)
locked = locked - (domainValue/1000000)
# Only keep 2 decimal places
total = round(total, 2)
available = round(available, 2)
return {'available': available, 'total': total, 'locked': locked}
def getBlockHeight():
# Get the block height
info = hsd.getInfo()
if 'error' in info:
return 0
return info['chain']['height']
def getAddress(account: str):
# Get the address
info = hsw.getAccountInfo(account, 'default')
if 'error' in info:
return ''
return info['receiveAddress']
def getPendingTX(account: str):
# Get the pending transactions
info = hsw.getWalletTxHistory(account)
if 'error' in info:
return 0
pending = 0
for tx in info:
if tx['confirmations'] < 1:
pending += 1
return pending
def getDomains(account,own=True):
if own:
response = requests.get(f"http://x:{APIKEY}@{ip}:12039/wallet/{account}/name?own=true")
response = requests.get(f"http://x:{APIKEY}@{ip}:12039/wallet/{account}/name")
info = response.json()
if show_expired:
return info
# Remove any expired domains
domains = []
for domain in info:
if 'stats' in domain:
if 'daysUntilExpire' in domain['stats']:
if domain['stats']['daysUntilExpire'] < 0:
return domains
def getTransactions(account):
# Get the transactions
info = hsw.getWalletTxHistory(account)
if 'error' in info:
return []
return info
def check_address(address: str, allow_name: bool = True, return_address: bool = False):
# Check if the address is valid
if address.startswith('@'):
# Check if the address is a name
if not allow_name and not return_address:
return 'Invalid address'
elif not allow_name and return_address:
return False
return check_hip2(address[1:])
# Check if the address is a valid HNS address
response ="http://x:{APIKEY}@{ip}:12037",json={
"method": "validateaddress",
"params": [address]
if response['error'] is not None:
if return_address:
return False
return 'Invalid address'
if response['result']['isvalid'] == True:
if return_address:
return address
return 'Valid address'
if return_address:
return False
return 'Invalid address'
def check_hip2(domain: str):
# Check if the domain is valid
domain = domain.lower()
if re.match(r'^[a-zA-Z0-9\-\.]{1,63}$', domain) is None:
return 'Invalid address'
address = domainLookup.hip2(domain)
if address.startswith("Hip2: "):
return address
if not check_address(address, False,True):
return 'Hip2: Lookup succeeded but address is invalid'
return address
def send(account,address,amount):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
response = hsw.rpc_walletPassphrase(password,10)
# Unlock the account
# response ="http://x:{APIKEY}@{ip}:12039/wallet/{account_name}/unlock",
# json={"passphrase": password,"timeout": 10})
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
response = hsw.rpc_sendToAddress(address,amount)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
return {
"tx": response['result']
def getDomain(domain: str):
# Get the domain
response = hsd.rpc_getNameInfo(domain)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
return response['result']
def renewDomain(account,domain):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
response = hsw.sendRENEW(account_name,password,domain)
return response
def getDNS(domain: str):
# Get the DNS
response = hsd.rpc_getNameResource(domain)
if response['error'] is not None:
return {
"error": response['error']['message']
if 'result' not in response:
return {
"error": "No DNS records"
if 'records' not in response['result']:
return []
return response['result']['records']
def setDNS(account,domain,records):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
records = json.loads(records)
newRecords = []
TXTRecords = []
for record in records:
if record['type'] == 'TXT':
if 'txt' not in record:
for txt in record['txt']:
elif record['type'] == 'NS':
'type': 'NS',
'ns': record['value']
elif record['type'] in ['GLUE4','GLUE6',"SYNTH4","SYNTH6"]:
'type': record['type'],
'ns': str(record['value']).split(' ')[0],
'address': str(record['value']).split(' ')[1]
if len(TXTRecords) > 0:
'type': 'TXT',
'txt': TXTRecords
data = '{"records":'+str(newRecords).replace("'","\"")+'}'
response = hsw.sendUPDATE(account_name,password,domain,data)
return response
def getNodeSync():
response = hsd.getInfo()
if 'error' in response:
return 0
sync = response['chain']['progress']*100
sync = round(sync, 2)
return sync
def getWalletStatus():
response = hsw.rpc_getWalletInfo()
if 'error' in response and response['error'] != None:
return "Error"
# return response
walletHeight = response['result']['height']
# Get the current block height
nodeHeight = getBlockHeight()
if walletHeight < nodeHeight:
return f"Scanning {walletHeight/nodeHeight*100:.2f}%"
elif walletHeight == nodeHeight:
return "Ready"
return "Error wallet ahead of node"
def getBids(account, domain="NONE"):
if domain == "NONE":
return hsw.getWalletBids(account)
response = hsw.getWalletBidsByName(domain,account)
return response
def getReveals(account,domain):
return hsw.getWalletRevealsByName(domain,account)
def getRevealTX(reveal):
prevout = reveal['prevout']
hash = prevout['hash']
index = prevout['index']
tx = hsd.getTxByHash(hash)
return tx['inputs'][index]['prevout']['hash']
def revealAuction(account,domain):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
response = hsw.sendREVEAL(account_name,password,domain)
return response
except Exception as e:
return {
"error": str(e)
def revealAll(account):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
# Try to select and login to the wallet
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
response = hsw.rpc_walletPassphrase(password,10)
if response['error'] is not None:
# Try to send the batch of all renew, reveal and redeem actions
return"http://x:{APIKEY}@{ip}:12039",json={"method": "sendbatch","params": [[["REVEAL"]]]}).json()
except Exception as e:
return {
"error": {
"message": str(e)
def rescan_auction(account,domain):
# Get height of the start of the auction
response = hsw.rpc_selectWallet(account)
response = hsd.rpc_getNameInfo(domain)
if 'result' not in response:
return {
"error": "Invalid domain"
if 'bidPeriodStart' not in response['result']['info']['stats']:
return {
"error": "Not in auction"
height = response['result']['info']['height']-1
response = hsw.rpc_importName(domain,height)
return response
def bid(account,domain,bid,blind):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
bid = int(bid)*1000000
lockup = int(blind)*1000000 + bid
response = hsw.sendBID(account_name,password,domain,bid,lockup)
return response
except Exception as e:
return {
"error": {
"message": str(e)
def openAuction(account,domain):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
response = hsw.sendOPEN(account_name,password,domain)
return response
except Exception as e:
return {
"error": {
"message": str(e)
def transfer(account,domain,address):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
response = hsw.sendTRANSFER(account_name,password,domain,address)
return response
except Exception as e:
return {
"error": {
"message": str(e)
def finalize(account,domain):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
response = hsw.rpc_walletPassphrase(password,10)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
response = hsw.rpc_sendFINALIZE(domain)
return response
except Exception as e:
return {
"error": {
"message": str(e)
def cancelTransfer(account,domain):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
response = hsw.rpc_walletPassphrase(password,10)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
response = hsw.rpc_sendCANCEL(domain)
return response
except Exception as e:
return {
"error": {
"message": str(e)
def revoke(account,domain):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
response = hsw.rpc_walletPassphrase(password,10)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
response = hsw.rpc_sendREVOKE(domain)
return response
except Exception as e:
return {
"error": {
"message": str(e)
#region settingsAPIs
def rescan():
response = hsw.walletRescan(0)
return response
except Exception as e:
return {
"error": {
"message": str(e)
def resendTXs():
response = hsw.walletResend()
return response
except Exception as e:
return {
"error": {
"message": str(e)
def zapTXs(account):
age = 60 * 20 # 20 minutes
account_name = check_account(account)
if account_name == False:
return {
"error": {
"message": "Invalid account"
response ="http://x:{APIKEY}@{ip}:12039/wallet/{account_name}/zap",
json={"age": age,
"account": "default"
return response
except Exception as e:
return {
"error": {
"message": str(e)
def getxPub(account):
account_name = check_account(account)
if account_name == False:
return {
"error": {
"message": "Invalid account"
response = hsw.getAccountInfo(account_name,"default")
if 'error' in response:
return {
"error": {
"message": response['error']['message']
return response['accountKey']
return response
except Exception as e:
return {
"error": {
"message": str(e)
def signMessage(account,domain,message):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
response = hsw.rpc_walletPassphrase(password,10)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
response = hsw.rpc_signMessageWithName(domain,message)
return response
except Exception as e:
return {
"error": {
"message": str(e)
def generateReport(account,format="{name},{expiry},{value},{maxBid}"):
domains = getDomains(account)
lines = [format.replace("{","").replace("}","")]
for domain in domains:
line = format.replace("{name}",domain['name'])
expiry = "N/A"
expiryBlock = "N/A"
if 'daysUntilExpire' in domain['stats']:
days = domain['stats']['daysUntilExpire']
# Convert to dateTime
expiry = + timedelta(days=days)
expiry = expiry.strftime("%d/%m/%Y %H:%M:%S")
expiryBlock = str(domain['stats']['renewalPeriodEnd'])
line = line.replace("{expiry}",expiry)
line = line.replace("{state}",domain['state'])
line = line.replace("{expiryBlock}",expiryBlock)
line = line.replace("{value}",str(domain['value']/1000000))
line = line.replace("{maxBid}",str(domain['highest']/1000000))
line = line.replace("{openHeight}",str(domain['height']))
return lines
def convertHNS(value: int):
return value/1000000 |