firewalletbrowser/account.py

1136 lines
30 KiB
Python
Raw Normal View History

2024-01-26 03:51:52 +11:00
from datetime import datetime, timedelta
2023-12-28 13:34:48 +11:00
from handywrapper import api
import os
import dotenv
import requests
2023-12-28 16:04:45 +11:00
import re
import domainLookup
import json
import time
2023-12-28 13:34:48 +11:00
dotenv.load_dotenv()
HSD_API = os.getenv("HSD_API")
HSD_IP = os.getenv("HSD_IP")
2025-01-31 11:01:10 +11:00
if HSD_IP is None:
HSD_IP = "localhost"
HSD_NETWORK = os.getenv("HSD_NETWORK")
2025-01-31 11:01:10 +11:00
HSD_WALLET_PORT = 12039
HSD_NODE_PORT = 12037
if not HSD_NETWORK:
HSD_NETWORK = "main"
else:
HSD_NETWORK = HSD_NETWORK.lower()
if HSD_NETWORK == "simnet":
HSD_WALLET_PORT = 15039
HSD_NODE_PORT = 15037
elif HSD_NETWORK == "testnet":
HSD_WALLET_PORT = 13039
HSD_NODE_PORT = 13037
elif HSD_NETWORK == "regtest":
HSD_WALLET_PORT = 14039
HSD_NODE_PORT = 14037
2024-01-29 15:48:36 +11:00
SHOW_EXPIRED = os.getenv("SHOW_EXPIRED")
if SHOW_EXPIRED is None:
SHOW_EXPIRED = False
2024-01-29 15:48:36 +11:00
2025-01-31 11:01:10 +11:00
hsd = api.hsd(HSD_API,HSD_IP,HSD_NODE_PORT)
hsw = api.hsw(HSD_API,HSD_IP,HSD_WALLET_PORT)
2024-01-29 15:42:54 +11:00
cacheTime = 3600
2023-12-28 13:34:48 +11:00
# Verify the connection
response = hsd.getInfo()
EXCLUDE = ["primary"]
if os.getenv("EXCLUDE") is not None:
EXCLUDE = os.getenv("EXCLUDE").split(",")
2023-12-28 13:34:48 +11:00
def hsdConnected():
if hsdVersion() == -1:
return False
return True
def hsdVersion(format=True):
info = hsd.getInfo()
if 'error' in info:
return -1
if format:
return float('.'.join(info['version'].split(".")[:2]))
else:
return info['version']
2023-12-28 13:34:48 +11:00
def check_account(cookie: str):
2023-12-28 16:04:45 +11:00
if cookie is None:
return False
2023-12-28 13:34:48 +11:00
# Check the account
if cookie.count(":") < 1:
return False
account = cookie.split(":")[0]
# Check if the account is valid
info = hsw.getAccountInfo(account, 'default')
if 'error' in info:
return False
return account
2024-01-26 03:51:52 +11:00
def check_password(cookie: str, password: str):
account = check_account(cookie)
if account == False:
return False
# Check if the password is valid
info = hsw.rpc_selectWallet(account)
if info['error'] is not None:
return False
info = hsw.rpc_walletPassphrase(password,1)
2024-01-26 03:51:52 +11:00
if info['error'] is not None:
if info['error']['message'] != "Wallet is not encrypted.":
return False
2024-01-26 03:51:52 +11:00
return True
def createWallet(account: str, password: str):
if not hsdConnected():
return {
"error": {
"message": "Node not connected"
}
}
# Create the account
# Python wrapper doesn't support this yet
2025-01-31 11:01:10 +11:00
response = requests.put(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}")
if response.status_code != 200:
return {
"error": {
"message": "Error creating account"
}
}
# Get seed
seed = hsw.getMasterHDKey(account)
seed = seed['mnemonic']['phrase']
# Encrypt the wallet (python wrapper doesn't support this yet)
2025-01-31 11:01:10 +11:00
response = requests.post(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}/passphrase",
json={"passphrase": password})
return {
"seed": seed,
"account": account,
"password": password
}
2024-02-04 16:55:28 +11:00
def importWallet(account: str, password: str,seed: str):
if not hsdConnected():
return {
"error": {
"message": "Node not connected"
}
}
2024-02-04 16:55:28 +11:00
# Import the wallet
data = {
"passphrase": password,
"mnemonic": seed,
}
2025-01-31 11:01:10 +11:00
response = requests.put(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}",json=data)
2024-02-04 16:55:28 +11:00
if response.status_code != 200:
return {
"error": {
"message": "Error creating account"
}
}
return {
"seed": seed,
"account": account,
"password": password
}
def listWallets():
# List the wallets
response = hsw.listWallets()
# Check if response is json or an array
if isinstance(response, list):
# Remove excluded wallets
response = [wallet for wallet in response if wallet not in EXCLUDE]
return response
return ['Wallet not connected']
2023-12-28 13:34:48 +11:00
def selectWallet(account: str):
# Select wallet
response = hsw.rpc_selectWallet(account)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
}
}
2023-12-28 13:34:48 +11:00
def getBalance(account: str):
# Get the total balance
info = hsw.getBalance('default',account)
if 'error' in info:
return {'available': 0, 'total': 0}
total = info['confirmed']
available = total - info['lockedConfirmed']
2024-02-13 11:18:16 +11:00
locked = info['lockedConfirmed'] / 1000000
2023-12-28 13:34:48 +11:00
# Convert to HNS
total = total / 1000000
available = available / 1000000
domains = getDomains(account)
domainValue = 0
for domain in domains:
if domain['state'] == "CLOSED":
domainValue += domain['value']
total = total - (domainValue/1000000)
locked = locked - (domainValue/1000000)
2023-12-28 13:34:48 +11:00
# Only keep 2 decimal places
total = round(total, 2)
available = round(available, 2)
2024-02-13 11:18:16 +11:00
return {'available': available, 'total': total, 'locked': locked}
2023-12-28 13:34:48 +11:00
2024-01-26 03:51:52 +11:00
def getBlockHeight():
# Get the block height
info = hsd.getInfo()
if 'error' in info:
return 0
return info['chain']['height']
2023-12-28 16:58:11 +11:00
def getAddress(account: str):
# Get the address
info = hsw.getAccountInfo(account, 'default')
if 'error' in info:
return ''
return info['receiveAddress']
2023-12-28 13:34:48 +11:00
def getPendingTX(account: str):
pending = 0
page = 1
pageSize = 10
while True:
txs = getTransactions(account,page,pageSize)
page+=1
pendingPage = 0
for tx in txs:
if tx['confirmations'] < 1:
pending+=1
pendingPage+=1
if pendingPage < pageSize:
break
2023-12-28 13:34:48 +11:00
return pending
2024-02-13 11:18:16 +11:00
def getDomains(account,own=True):
if own:
2025-01-31 11:01:10 +11:00
response = requests.get(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}/name?own=true")
2024-02-13 11:18:16 +11:00
else:
2025-01-31 11:01:10 +11:00
response = requests.get(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}/name")
2023-12-28 13:34:48 +11:00
info = response.json()
2024-11-21 15:40:02 +11:00
if SHOW_EXPIRED:
2024-11-21 15:40:02 +11:00
return info
# Remove any expired domains
domains = []
for domain in info:
if 'stats' in domain:
if 'daysUntilExpire' in domain['stats']:
if domain['stats']['daysUntilExpire'] < 0:
continue
domains.append(domain)
return domains
2023-12-28 13:34:48 +11:00
def getPageTXCache(account,page,size=100):
page = f"{page}-{size}"
if not os.path.exists(f'cache'):
os.mkdir(f'cache')
if not os.path.exists(f'cache/{account}_page.json'):
with open(f'cache/{account}_page.json', 'w') as f:
f.write('{}')
with open(f'cache/{account}_page.json') as f:
pageCache = json.load(f)
if page in pageCache and pageCache[page]['time'] > int(time.time()) - cacheTime:
return pageCache[page]['txid']
return None
def pushPageTXCache(account,page,txid,size=100):
page = f"{page}-{size}"
if not os.path.exists(f'cache/{account}_page.json'):
with open(f'cache/{account}_page.json', 'w') as f:
f.write('{}')
with open(f'cache/{account}_page.json') as f:
pageCache = json.load(f)
pageCache[page] = {
'time': int(time.time()),
'txid': txid
}
with open(f'cache/{account}_page.json', 'w') as f:
json.dump(pageCache, f,indent=4)
return pageCache[page]['txid']
def getTXFromPage(account,page,size=100):
if page == 1:
return getTransactions(account,1,size)[-1]['hash']
cached = getPageTXCache(account,page,size)
if cached:
return getPageTXCache(account,page,size)
previous = getTransactions(account,page,size)
if len(previous) == 0:
return None
hash = previous[-1]['hash']
pushPageTXCache(account,page,hash,size)
return hash
def getTransactions(account,page=1,limit=100):
2023-12-28 13:34:48 +11:00
# Get the transactions
if hsdVersion() < 7:
if page != 1:
return []
info = hsw.getWalletTxHistory(account)
if 'error' in info:
return []
return info[::-1]
lastTX = None
if page < 1:
return []
if page > 1:
lastTX = getTXFromPage(account,page-1,limit)
if lastTX:
2025-01-31 11:01:10 +11:00
response = requests.get(f'http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}/tx/history?reverse=true&limit={limit}&after={lastTX}')
elif page == 1:
2025-01-31 11:01:10 +11:00
response = requests.get(f'http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account}/tx/history?reverse=true&limit={limit}')
else:
2023-12-28 13:34:48 +11:00
return []
2023-12-28 16:04:45 +11:00
if response.status_code != 200:
print(response.text)
return []
data = response.json()
# Refresh the cache if the next page is different
nextPage = getPageTXCache(account,page,limit)
if nextPage is not None and nextPage != data[-1]['hash']:
print(f'Refreshing page {page}')
pushPageTXCache(account,page,data[-1]['hash'],limit)
return data
def getAllTransactions(account):
# Get the transactions
page = 0
txs = []
while True:
txs += getTransactions(account,page,1000)
if len(txs) == 0:
break
page += 1
return txs
2023-12-28 16:04:45 +11:00
def check_address(address: str, allow_name: bool = True, return_address: bool = False):
# Check if the address is valid
if address.startswith('@'):
# Check if the address is a name
if not allow_name and not return_address:
return 'Invalid address'
elif not allow_name and return_address:
return False
return check_hip2(address[1:])
# Check if the address is a valid HNS address
2025-01-31 11:01:10 +11:00
response = requests.post(f"http://x:{HSD_API}@{HSD_IP}:{HSD_NODE_PORT}",json={
2023-12-28 16:04:45 +11:00
"method": "validateaddress",
"params": [address]
}).json()
if response['error'] is not None:
if return_address:
return False
return 'Invalid address'
if response['result']['isvalid'] == True:
if return_address:
return address
return 'Valid address'
if return_address:
return False
return 'Invalid address'
def check_hip2(domain: str):
# Check if the domain is valid
domain = domain.lower()
if re.match(r'^[a-zA-Z0-9\-\.]{1,63}$', domain) is None:
return 'Invalid address'
address = domainLookup.hip2(domain)
2023-12-28 16:32:20 +11:00
if address.startswith("Hip2: "):
return address
2023-12-28 16:04:45 +11:00
if not check_address(address, False,True):
return 'Hip2: Lookup succeeded but address is invalid'
return address
def send(account,address,amount):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
2023-12-28 16:32:20 +11:00
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
2023-12-28 16:04:45 +11:00
return {
2024-01-26 03:51:52 +11:00
"error": {
"message": response['error']['message']
}
2023-12-28 16:04:45 +11:00
}
2023-12-28 16:32:20 +11:00
response = hsw.rpc_walletPassphrase(password,10)
# Unlock the account
2025-01-31 11:01:10 +11:00
# response = requests.post(f"http://x:{APIKEY}@{ip}:{HSD_WALLET_PORT}/wallet/{account_name}/unlock",
2023-12-28 16:32:20 +11:00
# json={"passphrase": password,"timeout": 10})
if response['error'] is not None:
2023-12-28 16:04:45 +11:00
return {
2024-01-26 03:51:52 +11:00
"error": {
"message": response['error']['message']
}
2023-12-28 16:04:45 +11:00
}
2023-12-28 16:32:20 +11:00
response = hsw.rpc_sendToAddress(address,amount)
if response['error'] is not None:
2023-12-28 16:04:45 +11:00
return {
2024-01-26 03:51:52 +11:00
"error": {
"message": response['error']['message']
}
2023-12-28 16:04:45 +11:00
}
return {
"tx": response['result']
2023-12-28 18:04:38 +11:00
}
def isOwnDomain(account,name: str):
domains = getDomains(account)
for domain in domains:
print(domain)
if domain['name'] == name:
return True
return False
2023-12-28 18:04:38 +11:00
def getDomain(domain: str):
# Get the domain
response = hsd.rpc_getNameInfo(domain)
if response['error'] is not None:
return {
2024-01-26 03:51:52 +11:00
"error": {
"message": response['error']['message']
}
2023-12-28 18:04:38 +11:00
}
2023-12-28 18:31:44 +11:00
return response['result']
def renewDomain(account,domain):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
2024-01-26 03:51:52 +11:00
"error": {
"message": "Invalid account"
}
2023-12-28 18:31:44 +11:00
}
response = hsw.sendRENEW(account_name,password,domain)
return response
def getDNS(domain: str):
# Get the DNS
response = hsd.rpc_getNameResource(domain)
if response['error'] is not None:
return {
"error": response['error']['message']
}
if 'result' not in response:
return {
"error": "No DNS records"
}
2025-01-31 11:01:10 +11:00
if response['result'] == None:
return []
if 'records' not in response['result']:
return []
2023-12-28 18:31:44 +11:00
return response['result']['records']
2023-12-29 11:42:20 +11:00
2024-01-25 23:15:59 +11:00
def setDNS(account,domain,records):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
2024-01-26 03:51:52 +11:00
"error": {
"message": "Invalid account"
}
2024-01-25 23:15:59 +11:00
}
records = json.loads(records)
newRecords = []
TXTRecords = []
for record in records:
if record['type'] == 'TXT':
if 'txt' not in record:
TXTRecords.append(record['value'])
else:
for txt in record['txt']:
TXTRecords.append(txt)
2024-01-25 23:15:59 +11:00
elif record['type'] == 'NS':
newRecords.append({
'type': 'NS',
'ns': record['value']
})
elif record['type'] in ['GLUE4','GLUE6',"SYNTH4","SYNTH6"]:
newRecords.append({
'type': record['type'],
'ns': str(record['value']).split(' ')[0],
'address': str(record['value']).split(' ')[1]
})
else:
newRecords.append(record)
if len(TXTRecords) > 0:
newRecords.append({
'type': 'TXT',
'txt': TXTRecords
2024-02-13 13:41:56 +11:00
})
2024-01-25 23:15:59 +11:00
data = '{"records":'+str(newRecords).replace("'","\"")+'}'
response = hsw.sendUPDATE(account_name,password,domain,data)
return response
def register(account,domain):
# Maybe add default dns records?
return setDNS(account,domain,'[]')
2024-01-25 23:15:59 +11:00
2023-12-29 11:42:20 +11:00
def getNodeSync():
response = hsd.getInfo()
if 'error' in response:
return 0
2023-12-29 13:57:20 +11:00
sync = response['chain']['progress']*100
sync = round(sync, 2)
return sync
2023-12-29 12:42:07 +11:00
2025-01-14 15:54:25 +11:00
def getWalletStatus():
response = hsw.rpc_getWalletInfo()
if 'error' in response and response['error'] != None:
return "Error"
# return response
walletHeight = response['result']['height']
# Get the current block height
nodeHeight = getBlockHeight()
if walletHeight < nodeHeight:
return f"Scanning {walletHeight/nodeHeight*100:.2f}%"
elif walletHeight == nodeHeight:
return "Ready"
else:
return "Error wallet ahead of node"
2024-02-13 11:18:16 +11:00
def getBids(account, domain="NONE"):
if domain == "NONE":
response = hsw.getWalletBids(account)
else:
response = hsw.getWalletBidsByName(domain,account)
# Add backup for bids with no value
bids = []
for bid in response:
if 'value' not in bid:
bid['value'] = -1000000
bids.append(bid)
return bids
2023-12-29 12:42:07 +11:00
2023-12-29 13:57:20 +11:00
def getReveals(account,domain):
return hsw.getWalletRevealsByName(domain,account)
def getPendingReveals(account):
bids = getBids(account)
domains = getDomains(account,False)
pending = []
for domain in domains:
if domain['state'] == "REVEAL":
reveals = getReveals(account,domain['name'])
for bid in bids:
if bid['name'] == domain['name']:
state_found = False
for reveal in reveals:
if reveal['own'] == True:
if bid['value'] == reveal['value']:
state_found = True
if not state_found:
pending.append(bid)
return pending
#! TODO
def getPendingRedeems(account):
bids = getBids(account)
domains = getDomains(account,False)
pending = []
return pending
def getPendingRegisters(account):
bids = getBids(account)
domains = getDomains(account,False)
pending = []
for domain in domains:
if domain['state'] == "CLOSED" and domain['registered'] == False:
for bid in bids:
if bid['name'] == domain['name']:
if bid['value'] == domain['highest']:
pending.append(bid)
return pending
2023-12-29 13:57:20 +11:00
def getRevealTX(reveal):
prevout = reveal['prevout']
hash = prevout['hash']
index = prevout['index']
tx = hsd.getTxByHash(hash)
2025-01-31 11:01:10 +11:00
if 'inputs' not in tx:
print(f'Something is up with this tx: {hash}')
print(tx)
print('---')
# No idea what happened here
# Check if registered?
2025-01-31 11:01:10 +11:00
return None
2023-12-29 14:24:43 +11:00
return tx['inputs'][index]['prevout']['hash']
2023-12-29 13:57:20 +11:00
2023-12-29 14:24:43 +11:00
def revealAuction(account,domain):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
2024-01-26 03:51:52 +11:00
"error": {
"message": "Invalid account"
}
2023-12-29 14:24:43 +11:00
}
try:
response = hsw.sendREVEAL(account_name,password,domain)
return response
except Exception as e:
return {
"error": str(e)
}
2023-12-29 13:57:20 +11:00
2024-02-13 11:18:16 +11:00
def revealAll(account):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
}
}
try:
# Try to select and login to the wallet
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
return
response = hsw.rpc_walletPassphrase(password,10)
if response['error'] is not None:
return
2025-01-31 11:01:10 +11:00
return requests.post(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}",json={"method": "sendbatch","params": [[["REVEAL"]]]}).json()
2024-02-13 11:18:16 +11:00
except Exception as e:
return {
"error": {
"message": str(e)
}
}
def redeemAll(account):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
}
}
try:
# Try to select and login to the wallet
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
return
response = hsw.rpc_walletPassphrase(password,10)
if response['error'] is not None:
return
return requests.post(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}",json={"method": "sendbatch","params": [[["REDEEM"]]]}).json()
except Exception as e:
return {
"error": {
"message": str(e)
}
}
def registerAll(account):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
}
}
# try:
domains = getPendingRegisters(account_name)
if len(domains) == 0:
return {
"error": {
"message": "Nothing to do."
}
}
batch = []
for domain in domains:
batch.append(["UPDATE",domain['name'],{"records":[]}])
return sendBatch(account,batch)
2023-12-29 12:42:07 +11:00
def rescan_auction(account,domain):
# Get height of the start of the auction
response = hsw.rpc_selectWallet(account)
response = hsd.rpc_getNameInfo(domain)
if 'result' not in response:
return {
"error": "Invalid domain"
}
if 'bidPeriodStart' not in response['result']['info']['stats']:
return {
"error": "Not in auction"
}
2023-12-29 13:57:20 +11:00
height = response['result']['info']['height']-1
2023-12-29 12:42:07 +11:00
response = hsw.rpc_importName(domain,height)
return response
def bid(account,domain,bid,blind):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
2024-01-26 03:51:52 +11:00
"error": {
"message": "Invalid account"
}
2023-12-29 12:42:07 +11:00
}
bid = int(bid)*1000000
lockup = int(blind)*1000000 + bid
try:
response = hsw.sendBID(account_name,password,domain,bid,lockup)
return response
except Exception as e:
return {
2024-01-26 03:51:52 +11:00
"error": {
"message": str(e)
}
2023-12-29 12:42:07 +11:00
}
2023-12-29 12:50:15 +11:00
def openAuction(account,domain):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
2024-01-26 03:51:52 +11:00
"error": {
"message": "Invalid account"
}
2023-12-29 12:50:15 +11:00
}
try:
response = hsw.sendOPEN(account_name,password,domain)
return response
except Exception as e:
return {
2024-01-26 03:51:52 +11:00
"error": {
"message": str(e)
}
}
def transfer(account,domain,address):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
}
}
try:
response = hsw.sendTRANSFER(account_name,password,domain,address)
return response
except Exception as e:
return {
"error": {
"message": str(e)
}
}
def finalize(account,domain):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
}
}
try:
2024-02-04 16:55:28 +11:00
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
}
}
response = hsw.rpc_walletPassphrase(password,10)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
}
}
response = hsw.rpc_sendFINALIZE(domain)
2024-01-26 03:51:52 +11:00
return response
except Exception as e:
return {
"error": {
"message": str(e)
}
}
def cancelTransfer(account,domain):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
}
}
try:
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
}
}
response = hsw.rpc_walletPassphrase(password,10)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
}
}
response = hsw.rpc_sendCANCEL(domain)
return response
except Exception as e:
return {
"error": {
"message": str(e)
}
}
def revoke(account,domain):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
}
}
try:
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
}
}
response = hsw.rpc_walletPassphrase(password,10)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
}
}
response = hsw.rpc_sendREVOKE(domain)
return response
2024-01-26 03:51:52 +11:00
except Exception as e:
return {
"error": {
"message": str(e)
}
}
2025-01-27 23:42:46 +11:00
def sendBatch(account, batch):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
}
}
try:
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
}
}
response = hsw.rpc_walletPassphrase(password,10)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
}
}
2025-01-31 11:01:10 +11:00
response = requests.post(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}",json={
2025-01-27 23:42:46 +11:00
"method": "sendbatch",
"params": [batch]
}).json()
if response['error'] is not None:
2025-01-28 13:20:44 +11:00
return response
2025-01-27 23:42:46 +11:00
if 'result' not in response:
return {
"error": {
"message": "No result"
}
}
return response['result']
except Exception as e:
return {
"error": {
"message": str(e)
}
}
#region settingsAPIs
def rescan():
try:
response = hsw.walletRescan(0)
return response
except Exception as e:
return {
"error": {
"message": str(e)
}
}
def resendTXs():
try:
response = hsw.walletResend()
return response
except Exception as e:
return {
"error": {
"message": str(e)
}
}
def zapTXs(account):
age = 60 * 20 # 20 minutes
account_name = check_account(account)
if account_name == False:
return {
"error": {
"message": "Invalid account"
}
}
try:
2025-01-31 11:01:10 +11:00
response = requests.post(f"http://x:{HSD_API}@{HSD_IP}:{HSD_WALLET_PORT}/wallet/{account_name}/zap",
json={"age": age,
"account": "default"
})
return response
except Exception as e:
return {
"error": {
"message": str(e)
}
}
def getxPub(account):
account_name = check_account(account)
if account_name == False:
return {
"error": {
"message": "Invalid account"
}
}
try:
print(account_name)
response = hsw.getAccountInfo(account_name,"default")
if 'error' in response:
return {
"error": {
"message": response['error']['message']
}
}
return response['accountKey']
return response
except Exception as e:
return {
"error": {
"message": str(e)
}
}
def signMessage(account,domain,message):
account_name = check_account(account)
password = ":".join(account.split(":")[1:])
if account_name == False:
return {
"error": {
"message": "Invalid account"
}
}
try:
response = hsw.rpc_selectWallet(account_name)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
}
}
response = hsw.rpc_walletPassphrase(password,10)
if response['error'] is not None:
return {
"error": {
"message": response['error']['message']
}
}
response = hsw.rpc_signMessageWithName(domain,message)
return response
except Exception as e:
return {
"error": {
"message": str(e)
}
}
def verifyMessageWithName(domain,signature,message):
try:
response = hsd.rpc_verifyMessageWithName(domain,signature,message)
if 'result' in response:
return response['result']
return False
except Exception as e:
return False
def verifyMessage(address,signature,message):
try:
response = hsd.rpc_verifyMessage(address,signature,message)
if 'result' in response:
return response['result']
return False
except Exception as e:
return False
#endregion
2024-01-26 03:51:52 +11:00
2024-02-05 22:40:13 +11:00
def generateReport(account,format="{name},{expiry},{value},{maxBid}"):
2024-01-26 03:51:52 +11:00
domains = getDomains(account)
lines = [format.replace("{","").replace("}","")]
for domain in domains:
line = format.replace("{name}",domain['name'])
expiry = "N/A"
expiryBlock = "N/A"
if 'daysUntilExpire' in domain['stats']:
days = domain['stats']['daysUntilExpire']
# Convert to dateTime
expiry = datetime.now() + timedelta(days=days)
expiry = expiry.strftime("%d/%m/%Y %H:%M:%S")
expiryBlock = str(domain['stats']['renewalPeriodEnd'])
line = line.replace("{expiry}",expiry)
line = line.replace("{state}",domain['state'])
line = line.replace("{expiryBlock}",expiryBlock)
line = line.replace("{value}",str(domain['value']/1000000))
line = line.replace("{maxBid}",str(domain['highest']/1000000))
line = line.replace("{openHeight}",str(domain['height']))
lines.append(line)
2024-02-05 22:40:13 +11:00
return lines
def convertHNS(value: int):
return value/1000000