firewallet-mobile-api/hsd.py
Nathan Woodburn 1a7b62efa2
All checks were successful
Build Docker / BuildImage (push) Successful in 33s
fix: Balance should return hide domain value and fix send max
2024-06-04 15:17:17 +10:00

384 lines
9.4 KiB
Python

import functools
import time
import requests
import dotenv
import os
import json
import uuid
import hns
dotenv.load_dotenv()
HSD_API_KEY = os.getenv('HSD_API_KEY')
HSD_API_IP = os.getenv('HSD_API_IP')
if not HSD_API_IP:
HSD_API_IP = '127.0.0.1'
HSD_API_NETWORK = os.getenv('HSD_API_NETWORK')
if not HSD_API_NETWORK:
HSD_API_NETWORK = 'main'
network_port_int = {
'main': 1203,
'testnet': 1303,
'regtest': 1403,
'simnet': 1503
}
account_file = 'accounts.json'
wallet_file = 'wallets.json'
if not os.path.exists('.local'):
account_file = '/data/accounts.json'
wallet_file = '/data/wallets.json'
# Create any missing files
if not os.path.exists(account_file):
with open(account_file, 'w') as f:
json.dump({}, f, indent=4)
if not os.path.exists(wallet_file):
with open(wallet_file, 'w') as f:
json.dump([], f, indent=4)
def cache_with_ttl(max_age, maxsize=128, typed=False):
"""Least-recently-used cache decorator with time-based cache invalidation.
Args:
max_age: Time to live for cached results (in seconds).
maxsize: Maximum cache size (see `functools.lru_cache`).
typed: Cache on distinct input types (see `functools.lru_cache`).
"""
def _decorator(fn):
@functools.lru_cache(maxsize=maxsize, typed=typed)
def _new(*args, __time_salt, **kwargs):
return fn(*args, **kwargs)
@functools.wraps(fn)
def _wrapped(*args, **kwargs):
return _new(*args, **kwargs, __time_salt=int(time.time() / max_age))
return _wrapped
return _decorator
def url(walletURL:bool) -> str:
if walletURL:
return f'http://x:{HSD_API_KEY}@{HSD_API_IP}:{network_port_int[HSD_API_NETWORK]}9/'
else:
return f'http://x:{HSD_API_KEY}@{HSD_API_IP}:{network_port_int[HSD_API_NETWORK]}7/'
def rescan() -> dict:
return requests.post(url(True)+'rescan', json={'height':0}).json()
def get_master() -> dict:
return requests.get(url(True)+'a61ba47a-54de-43b9-9d22-fcf6827cd5c2/master').json()
def get_status() -> dict:
return requests.get(url(False)).json()
def get_wallets() -> dict:
return requests.get(url(True)+'wallet').json()
def create_account() -> dict:
# Generate a UUID
userID = str(uuid.uuid4())
with open(account_file, 'r') as f:
accounts = json.load(f)
# Check if the user already exists
if userID in accounts:
return {
'error': 'Please try again'
}
accounts[userID] = {
'wallets': []
}
with open(account_file, 'w') as f:
json.dump(accounts, f, indent=4)
return {
'userID': userID
}
def get_account(userID:str) -> dict:
accounts = get_accounts()
if userID in accounts:
return accounts[userID]
else:
return {
'error': 'User not found'
}
def get_accounts() -> dict:
# Read from accounts
with open(account_file, 'r') as f:
accounts = json.load(f)
return accounts
def import_wallet(name:str,userID:str,xpub:str) -> dict:
accounts = get_accounts()
if userID not in accounts:
return {
'error': 'User not found'
}
# Check if the wallet already exists
for wallet in accounts[userID]['wallets']:
if wallet['name'] == name:
return {
'error': 'Wallet already exists'
}
# Create the wallet using a UUID
walletID = str(uuid.uuid4())
with open(wallet_file, 'r') as f:
wallets = json.load(f)
if walletID in wallets:
return {
'error': 'Please try again'
}
wallet = {
'name': name,
'xpub': xpub,
'walletID': walletID
}
wallet_data = {
"watchOnly": True,
"accountKey": xpub,
}
response = requests.put(url(True)+'wallet/'+walletID, json=wallet_data)
if response.status_code != 200:
print(response.text)
return {
'error': 'Error creating wallet'
}
accounts[userID]['wallets'].append(wallet)
with open(account_file, 'w') as f:
json.dump(accounts, f, indent=4)
wallets.append(walletID)
with open(wallet_file, 'w') as f:
json.dump(wallets, f, indent=4)
# Rescan the wallet to get the balance
rescan()
return {
'walletID': walletID
}
@cache_with_ttl(60*60)
def get_wallet_UUID(userID:str, name:str) -> str|None:
accounts = get_accounts()
if userID not in accounts:
return None
walletID = None
for wallet in accounts[userID]['wallets']:
if wallet['name'] == name:
walletID = wallet['walletID']
break
return walletID
def get_wallet(userID:str, name:str) -> dict:
walletID = get_wallet_UUID(userID, name)
if not walletID:
return {
'error': 'Wallet not found'
}
return requests.get(url(True)+'wallet/'+walletID).json()
def get_address(userID:str, name:str) -> dict:
walletID = get_wallet_UUID(userID, name)
if not walletID:
return {
'error': 'Wallet not found'
}
request = requests.get(url(True)+'wallet/'+walletID+'/account/default').json()
return {
'address': request['receiveAddress']
}
def get_balance(userID:str, name:str) -> dict:
walletID = get_wallet_UUID(userID, name)
if not walletID:
return {
'error': 'Wallet not found'
}
balance = requests.get(url(True)+'wallet/'+walletID+'/balance?account=default').json()
domains_value = get_domains_value(userID, name)
# Convert to human readable
return_json = {
'available': (balance['confirmed']-domains_value)/10**6,
'available_small': balance['confirmed']-domains_value,
'locked': balance['lockedConfirmed']/10**6,
'locked_small': balance['lockedConfirmed'],
}
return return_json
@cache_with_ttl(60*1)
def get_domains(userID:str, name:str) -> dict:
walletID = get_wallet_UUID(userID, name)
if not walletID:
return {
'error': 'Wallet not found'
}
request = requests.get(url(True)+'wallet/'+walletID+'/name?own=true').json()
names = []
names_value = 0
for name in request:
names_value += name['value']
names.append({
'name': name['name'],
'state': name['state'],
'highest': name['highest']/10**6,
'value': name['value']/10**6,
'height': name['height'],
'stats': name['stats']
})
# Save name value to accounts
with open(account_file, 'r') as f:
accounts = json.load(f)
for account in accounts:
if account == userID:
wallets = accounts[account]['wallets']
for wallet in wallets:
if wallet['walletID'] == walletID:
wallet['names_value'] = names_value
break
with open(account_file, 'w') as f:
json.dump(accounts, f, indent=4)
return names
def get_domains_value(userID:str, name:str) -> int:
walletID = get_wallet_UUID(userID, name)
if not walletID:
return {
'error': 'Wallet not found'
}
with open(account_file, 'r') as f:
accounts = json.load(f)
for account in accounts:
if account == userID:
wallets = accounts[account]['wallets']
for wallet in wallets:
if wallet['walletID'] == walletID:
if 'names_value' in wallet:
return wallet['names_value']
names = get_domains(userID, name)
names_value = 0
for name in names:
names_value += name['value']
return names_value
def send_to_address(userID:str, name:str, address:str, amount:str) -> dict:
walletID = get_wallet_UUID(userID, name)
if not walletID:
return {
'error': 'Wallet not found'
}
# Verify amount
amount = float(amount)
if amount <= 0:
return {
'error': 'Amount must be greater than 0'
}
balance = get_balance(userID, name)
if balance['available'] < amount:
return {
'error': 'Insufficient funds'
}
subtractFee = False
if balance['available'] - amount < 0.03:
subtractFee = True
amount = balance['available']
data = {
'outputs': [
{
'address': address,
'value': hns.to_small(amount)
}
],
'sign': False,
'subtractFee': subtractFee
}
response = requests.post(url(True)+'wallet/'+walletID+'/create', json=data)
if response.status_code != 200:
print(response.text)
return {
'error': 'Error sending'
}
return response.json()
@cache_with_ttl(60*1)
def get_transactions(userID:str, name:str) -> dict:
walletID = get_wallet_UUID(userID, name)
if not walletID:
return {
'error': 'Wallet not found'
}
request = requests.get(url(True)+'wallet/'+walletID+'/tx/history').json()
return request
# region Auctions
@cache_with_ttl(60*1)
def get_auctions(userID:str, name:str) -> dict:
walletID = get_wallet_UUID(userID, name)
if not walletID:
return {
'error': 'Wallet not found'
}
request = requests.get(url(True)+'wallet/'+walletID+'/auction')
return request.json()
# endregion