firewalletbrowser/account.py

252 lines
6.1 KiB
Python
Raw Normal View History

2023-12-28 13:34:48 +11:00
from handywrapper import api
import os
import dotenv
import requests
2023-12-28 16:04:45 +11:00
import re
import domainLookup
import json
2023-12-28 13:34:48 +11:00
# Load settings from a local .env file (if any) before reading the environment.
dotenv.load_dotenv()
# API key handed to both handywrapper clients below and embedded in the raw
# HTTP calls further down; os.getenv yields None when "hsd_api" is not set.
APIKEY = os.getenv("hsd_api")
# handywrapper clients. NOTE(review): presumably `hsd` talks to the node and
# `hsw` to the wallet service -- confirm against the handywrapper docs.
hsd = api.hsd(APIKEY)
hsw = api.hsw(APIKEY)
# Verify the connection. This call runs at import time; its result is stored
# in `response` but is not inspected here.
response = hsd.getInfo()
def check_account(cookie: str):
    """Return the account name carried by a session cookie, or False.

    Only the text before the first ``:`` of the cookie is used; it is looked
    up through ``hsw.getAccountInfo``.

    Args:
        cookie: Raw cookie value; may be None.

    Returns:
        The account name when the lookup response has no ``'error'`` key,
        otherwise False (also for a missing cookie or one without ``:``).
    """
    if cookie is None or ":" not in cookie:
        return False

    name = cookie.partition(":")[0]
    # A response containing 'error' means the account could not be looked up.
    lookup = hsw.getAccountInfo(name, 'default')
    return False if 'error' in lookup else name
def getBalance(account: str):
    """Return the confirmed balance of ``account`` in HNS.

    Args:
        account: Account name passed to ``hsw.getBalance``.

    Returns:
        ``{'available': ..., 'total': ...}`` rounded to 2 decimal places.
        ``total`` is the confirmed balance and ``available`` is the confirmed
        balance minus the confirmed locked amount. Both are 0 when the
        response contains an ``'error'`` key.
    """
    reply = hsw.getBalance('default', account)
    if 'error' in reply:
        return {'available': 0, 'total': 0}

    confirmed = reply['confirmed']
    unlocked = confirmed - reply['lockedConfirmed']
    # Divide by 1000000 to convert to HNS, then keep 2 decimal places.
    return {
        'available': round(unlocked / 1000000, 2),
        'total': round(confirmed / 1000000, 2),
    }
2023-12-28 16:58:11 +11:00
def getAddress(account: str):
    """Return the receive address of ``account``.

    Args:
        account: Account name passed to ``hsw.getAccountInfo``.

    Returns:
        The ``'receiveAddress'`` field of the response, or an empty string
        when the response contains an ``'error'`` key.
    """
    details = hsw.getAccountInfo(account, 'default')
    return '' if 'error' in details else details['receiveAddress']
2023-12-28 13:34:48 +11:00
def getPendingTX(account: str):
    """Count the transactions of ``account`` that have no confirmations yet.

    Args:
        account: Identifier passed to ``hsw.getWalletTxHistory``.

    Returns:
        Number of history entries whose ``'confirmations'`` is below 1, or 0
        when the response contains ``'error'``.
    """
    history = hsw.getWalletTxHistory(account)
    if 'error' in history:
        return 0
    return sum(1 for entry in history if entry['confirmations'] < 1)
def getDomains(account):
    """Return the names owned by ``account``.

    The wallet HTTP endpoint is queried directly (with ``own=true``) instead
    of going through the handywrapper client.

    Args:
        account: Wallet identifier inserted into the request path.

    Returns:
        The decoded JSON response, or an empty list when the response is an
        object carrying an ``'error'`` key.
    """
    response = requests.get(f"http://x:{APIKEY}@127.0.0.1:12039/wallet/{account}/name?own=true")
    info = response.json()
    # Match getTransactions: callers iterate the result, so an error payload
    # must not be handed back as if it were the list of domains.
    if isinstance(info, dict) and 'error' in info:
        return []
    return info
def getTransactions(account):
    """Return the transaction history of ``account``.

    Args:
        account: Identifier passed to ``hsw.getWalletTxHistory``.

    Returns:
        The history as returned by the wallet client, or an empty list when
        the response contains ``'error'``.
    """
    history = hsw.getWalletTxHistory(account)
    return [] if 'error' in history else history
def check_address(address: str, allow_name: bool = True, return_address: bool = False):
    """Validate an address or an ``@name`` reference.

    Args:
        address: Either a plain address or ``@<domain>`` to be resolved
            through ``check_hip2``.
        allow_name: When False, any ``@name`` input is rejected.
        return_address: Selects the return convention (see below).

    Returns:
        With ``return_address=True``: the usable address (the input itself,
        or the address a name resolved to), or False when it is invalid or
        the name lookup failed.
        With ``return_address=False``: ``'Valid address'`` or
        ``'Invalid address'`` for plain addresses; for names, whatever
        ``check_hip2`` returns (the resolved address or an error message).
    """
    if address.startswith('@'):
        if not allow_name:
            return False if return_address else 'Invalid address'

        resolved = check_hip2(address[1:])
        # check_hip2 reports failures as strings; a caller asking for an
        # address must get False rather than a truthy error message.
        lookup_failed = resolved == 'Invalid address' or resolved.startswith('Hip2: ')
        if return_address and lookup_failed:
            return False
        return resolved

    # Ask the node whether this is a valid HNS address.
    response = requests.post(f"http://x:{APIKEY}@127.0.0.1:12037", json={
        "method": "validateaddress",
        "params": [address]
    }).json()
    valid = response['error'] is None and response['result']['isvalid'] is True

    if return_address:
        return address if valid else False
    return 'Valid address' if valid else 'Invalid address'


def check_hip2(domain: str):
    """Resolve ``domain`` to an address via ``domainLookup.hip2``.

    Args:
        domain: Domain name without the leading ``@``.

    Returns:
        The resolved address on success. On failure a message string:
        ``'Invalid address'`` when the name has illegal characters or length,
        the ``'Hip2: ...'`` message produced by the lookup, or
        ``'Hip2: Lookup succeeded but address is invalid'``.
    """
    domain = domain.lower()
    # fullmatch (rather than a `$` anchor) so a trailing newline is rejected.
    if re.fullmatch(r'[a-zA-Z0-9\-\.]{1,63}', domain) is None:
        return 'Invalid address'

    address = domainLookup.hip2(domain)
    if address.startswith("Hip2: "):
        # The lookup reports its own failures with this prefix.
        return address

    # Names are not allowed here, so a lookup result cannot chain to another name.
    if not check_address(address, False, True):
        return 'Hip2: Lookup succeeded but address is invalid'
    return address
def send(account,address,amount):
    """Send ``amount`` to ``address`` from the wallet named in ``account``.

    Args:
        account: Session cookie value of the form ``"<name>:<password>"``.
        address: Destination address, passed to ``hsw.rpc_sendToAddress``.
        amount: Amount to send, passed to ``hsw.rpc_sendToAddress`` unchanged.

    Returns:
        ``{"tx": <rpc result>}`` on success, otherwise ``{"error": <message>}``
        from the first step that failed.
    """
    account_name = check_account(account)
    # Same guard as renewDomain/bid: never drive the wallet RPC with an
    # unverified account (this also covers a missing cookie).
    if account_name is False:
        return {
            "error": "Invalid account"
        }
    # Everything after the first ':' is the password (it may contain ':').
    password = account.partition(":")[2]

    response = hsw.rpc_selectWallet(account_name)
    if response['error'] is not None:
        return {
            "error": response['error']['message']
        }

    # Unlock the selected wallet; 10 is the timeout argument of the RPC.
    response = hsw.rpc_walletPassphrase(password,10)
    if response['error'] is not None:
        return {
            "error": response['error']['message']
        }

    response = hsw.rpc_sendToAddress(address,amount)
    if response['error'] is not None:
        return {
            "error": response['error']['message']
        }
    return {
        "tx": response['result']
    }
def getDomain(domain: str):
    """Fetch name information for ``domain`` from the node.

    Args:
        domain: Name passed to ``hsd.rpc_getNameInfo``.

    Returns:
        The ``'result'`` field of the RPC reply, or ``{"error": <message>}``
        when the reply carries an error.
    """
    reply = hsd.rpc_getNameInfo(domain)
    failure = reply['error']
    if failure is None:
        return reply['result']
    return {
        "error": failure['message']
    }
def renewDomain(account,domain):
    """Renew ``domain`` using the wallet named in ``account``.

    Args:
        account: Session cookie value of the form ``"<name>:<password>"``.
        domain: Name to renew.

    Returns:
        The response of ``hsw.sendRENEW``, or ``{"error": "Invalid account"}``
        when the cookie does not identify a valid account.
    """
    account_name = check_account(account)
    if account_name is False:
        return {
            "error": "Invalid account"
        }
    # Derive the password only after validation: a missing cookie (None) is
    # rejected above instead of raising on the string operation here.
    password = account.partition(":")[2]
    return hsw.sendRENEW(account_name,password,domain)
def getDNS(domain: str):
    """Return the DNS records stored for ``domain``.

    Args:
        domain: Name passed to ``hsd.rpc_getNameResource``.

    Returns:
        The ``'records'`` list of the RPC result, an empty list when the
        result is empty or has no records, or ``{"error": <message>}`` when
        the RPC reports an error.
    """
    response = hsd.rpc_getNameResource(domain)
    if response['error'] is not None:
        return {
            "error": response['error']['message']
        }
    resource = response['result']
    # A name with no resource set yields an empty result; report that as
    # "no records" rather than failing on the subscript.
    if not resource:
        return []
    return resource.get('records', [])
2023-12-29 11:42:20 +11:00
def getNodeSync():
    """Return the node's chain progress multiplied by 100.

    Returns:
        ``info['chain']['progress'] * 100`` taken from ``hsd.getInfo()``.
    """
    chain = hsd.getInfo()['chain']
    return chain['progress'] * 100
2023-12-29 12:42:07 +11:00
def getBids(account, domain):
    """Return the wallet's bids on ``domain``.

    Args:
        account: Wallet identifier (second argument of the client call).
        domain: Name whose bids are requested (first argument).

    Returns:
        The response of ``hsw.getWalletBidsByName`` unchanged.
    """
    return hsw.getWalletBidsByName(domain, account)
def rescan_auction(account,domain):
    """Import ``domain`` into the wallet from the start of its bid period.

    Args:
        account: Wallet name passed to ``hsw.rpc_selectWallet``.
        domain: Name whose auction should be rescanned.

    Returns:
        The response of ``hsw.rpc_importName``, or ``{"error": <message>}``
        when the wallet cannot be selected, the node returns no result for
        the name, or the name's stats have no ``'bidPeriodStart'``.
    """
    # importName acts on the selected wallet, so stop if selection failed.
    response = hsw.rpc_selectWallet(account)
    if response['error'] is not None:
        return {
            "error": response['error']['message']
        }

    response = hsd.rpc_getNameInfo(domain)
    result = response.get('result')
    if not result:
        return {
            "error": "Invalid domain"
        }

    # 'info' / 'stats' may be missing or empty; treat that as "not in auction".
    stats = (result.get('info') or {}).get('stats') or {}
    if 'bidPeriodStart' not in stats:
        return {
            "error": "Not in auction"
        }

    # Height of the start of the auction.
    height = stats['bidPeriodStart']
    return hsw.rpc_importName(domain,height)
def bid(account,domain,bid,blind):
    """Place a bid on ``domain`` from the wallet named in ``account``.

    Args:
        account: Session cookie value of the form ``"<name>:<password>"``.
        domain: Name to bid on.
        bid: Bid amount in HNS (number or numeric string; may be fractional).
        blind: Extra blind amount in HNS added on top of the bid.

    Returns:
        The response of ``hsw.sendBID``, or ``{"error": <message>}`` when the
        account is invalid, an amount is not numeric, or the call raises.
    """
    account_name = check_account(account)
    if account_name is False:
        return {
            "error": "Invalid account"
        }
    # Derive the password only after validation so a missing cookie is
    # rejected above instead of raising here.
    password = account.partition(":")[2]

    try:
        # Convert HNS to base units (x1000000). float + round keeps fractional
        # amounts, which int() would truncate or reject outright.
        bid = round(float(bid) * 1000000)
        lockup = round(float(blind) * 1000000) + bid
    except (TypeError, ValueError, OverflowError):
        return {
            "error": "Invalid bid or blind amount"
        }

    try:
        return hsw.sendBID(account_name,password,domain,bid,lockup)
    except Exception as e:
        # Any failure from the wallet client is surfaced to the caller as text.
        return {
            "error": str(e)
        }