firewallet-mobile-api/hsd.py
Nathan Woodburn f9af1b2606
All checks were successful
Build Docker / BuildImage (push) Successful in 45s
feat: Add initial watchonly server files
2024-06-03 12:22:28 +10:00

346 lines
8.2 KiB
Python

from functools import cache
import requests
import dotenv
import os
import json
import uuid
import hns
# Load variables from a .env file (if present) into the environment.
dotenv.load_dotenv()

# HSD connection settings. Host and network fall back to defaults when the
# corresponding variable is unset or empty.
HSD_API_KEY = os.getenv('HSD_API_KEY')
HSD_API_IP = os.getenv('HSD_API_IP') or '127.0.0.1'
HSD_API_NETWORK = os.getenv('HSD_API_NETWORK') or 'main'

# First four digits of the API port for each network; url() appends the
# final digit.
network_port_int = {
    'main': 1203,
    'testnet': 1303,
    'regtest': 1403,
    'simnet': 1503
}

# Relative paths are used when a '.local' marker exists in the working
# directory; otherwise the files are expected under /data.
# NOTE(review): the functions visible in this file open 'accounts.json' and
# 'wallets.json' directly and never reference account_file / wallet_file.
if os.path.exists('.local'):
    account_file = 'accounts.json'
    wallet_file = 'wallets.json'
else:
    account_file = '/data/accounts.json'
    wallet_file = '/data/wallets.json'

# Create any missing files: accounts start as an empty object, wallets as an
# empty list.
if not os.path.exists('accounts.json'):
    with open('accounts.json', 'w') as f:
        json.dump({}, f, indent=4)

if not os.path.exists('wallets.json'):
    with open('wallets.json', 'w') as f:
        json.dump([], f, indent=4)
def url(walletURL: bool) -> str:
    """Return the base URL, credentials included, for an HSD HTTP endpoint.

    The port is the configured network's prefix from ``network_port_int``
    followed by ``9`` when *walletURL* is truthy (used for the ``wallet/...``
    calls in this file) or ``7`` otherwise.

    Raises:
        KeyError: if ``HSD_API_NETWORK`` is not a key of ``network_port_int``.
    """
    last_digit = '9' if walletURL else '7'
    port_prefix = network_port_int[HSD_API_NETWORK]
    return f'http://x:{HSD_API_KEY}@{HSD_API_IP}:{port_prefix}{last_digit}/'
def rescan() -> dict:
    """POST a rescan request starting at height 0 and return the JSON reply."""
    endpoint = url(True) + 'rescan'
    reply = requests.post(endpoint, json={'height': 0})
    return reply.json()
def get_master() -> dict:
    """GET the ``master`` resource of one fixed ID and return the JSON reply."""
    # NOTE(review): the ID is hard-coded and, unlike the other wallet calls in
    # this file, the path has no 'wallet/' prefix -- confirm this is intended.
    endpoint = url(True) + 'a61ba47a-54de-43b9-9d22-fcf6827cd5c2/master'
    reply = requests.get(endpoint)
    return reply.json()
def get_status() -> dict:
    """GET the root of the non-wallet endpoint and return the JSON reply."""
    reply = requests.get(url(False))
    return reply.json()
def get_wallets() -> dict:
    """GET ``wallet`` from the wallet endpoint and return the JSON reply."""
    endpoint = url(True) + 'wallet'
    reply = requests.get(endpoint)
    return reply.json()
def create_account() -> dict:
    """Register a new account under a freshly generated UUID.

    The account is stored in ``accounts.json`` with an empty wallet list.

    Returns:
        ``{'userID': <uuid>}`` on success, or ``{'error': 'Please try again'}``
        if the generated ID is already present in the file.
    """
    new_id = str(uuid.uuid4())

    with open('accounts.json', 'r') as fh:
        known = json.load(fh)

    # A collision is not retried here; the caller is asked to call again.
    if new_id in known:
        return {'error': 'Please try again'}

    known[new_id] = {'wallets': []}
    with open('accounts.json', 'w') as fh:
        json.dump(known, fh, indent=4)

    return {'userID': new_id}
def get_account(userID: str) -> dict:
    """Return the stored record for *userID*.

    Returns ``{'error': 'User not found'}`` when the ID is not a key of the
    accounts file.
    """
    accounts = get_accounts()
    if userID not in accounts:
        return {'error': 'User not found'}
    return accounts[userID]
def get_accounts() -> dict:
    """Load and return the full contents of ``accounts.json``."""
    with open('accounts.json', 'r') as fh:
        return json.load(fh)
def import_wallet(name: str, userID: str, xpub: str) -> dict:
    """Create a watch-only wallet from *xpub* and attach it to *userID*.

    Steps, in order: validate the user and the wallet name, pick a fresh UUID
    as the wallet ID, create the wallet through the HSD wallet endpoint,
    record it in ``accounts.json`` and ``wallets.json``, then trigger a
    rescan.

    Returns:
        ``{'walletID': <uuid>}`` on success, otherwise a dict with an
        ``error`` message (unknown user, duplicate name, ID collision, or a
        non-200 response when creating the wallet).
    """
    accounts = get_accounts()
    if userID not in accounts:
        return {'error': 'User not found'}

    # Wallet names must be unique per user.
    owned = accounts[userID]['wallets']
    if any(entry['name'] == name for entry in owned):
        return {'error': 'Wallet already exists'}

    # Create the wallet using a UUID
    walletID = str(uuid.uuid4())
    with open('wallets.json', 'r') as fh:
        known_ids = json.load(fh)
    if walletID in known_ids:
        return {'error': 'Please try again'}

    payload = {
        "watchOnly": True,
        "accountKey": xpub,
    }
    response = requests.put(url(True) + 'wallet/' + walletID, json=payload)
    if response.status_code != 200:
        print(response.text)
        return {'error': 'Error creating wallet'}

    # Persist only after the wallet was created successfully.
    owned.append({
        'name': name,
        'xpub': xpub,
        'walletID': walletID
    })
    with open('accounts.json', 'w') as fh:
        json.dump(accounts, fh, indent=4)

    known_ids.append(walletID)
    with open('wallets.json', 'w') as fh:
        json.dump(known_ids, fh, indent=4)

    # Rescan the wallet to get the balance
    rescan()
    return {'walletID': walletID}
def get_wallet_UUID(userID: str, name: str) -> str | None:
    """Return the wallet ID stored for wallet *name* of user *userID*.

    Returns ``None`` when the user is unknown or has no wallet with that name.

    The lookup is deliberately not memoized. ``functools.cache`` also
    remembers ``None`` results, so a name queried before ``import_wallet``
    registered it would stay "not found" for the lifetime of the process.
    Reading the accounts file on every call keeps the answer current.
    """
    accounts = get_accounts()
    if userID not in accounts:
        return None

    for wallet in accounts[userID]['wallets']:
        if wallet['name'] == name:
            return wallet['walletID']
    return None
def get_wallet(userID: str, name: str) -> dict:
    """Fetch wallet *name* of *userID* from the wallet endpoint.

    Returns the decoded JSON reply, or ``{'error': 'Wallet not found'}`` when
    no wallet ID is registered for that user/name pair.
    """
    walletID = get_wallet_UUID(userID, name)
    if not walletID:
        return {'error': 'Wallet not found'}

    endpoint = url(True) + 'wallet/' + walletID
    return requests.get(endpoint).json()
def get_address(userID: str, name: str) -> dict:
    """Return the receive address of the wallet's ``default`` account.

    Returns ``{'address': ...}`` taken from the ``receiveAddress`` field of
    the reply, or ``{'error': 'Wallet not found'}`` when no wallet ID is
    registered for that user/name pair.

    Raises:
        KeyError: if the reply has no ``receiveAddress`` field.
    """
    walletID = get_wallet_UUID(userID, name)
    if not walletID:
        return {'error': 'Wallet not found'}

    endpoint = url(True) + 'wallet/' + walletID + '/account/default'
    account_info = requests.get(endpoint).json()
    return {'address': account_info['receiveAddress']}
def get_balance(userID: str, name: str) -> dict:
    """Return the confirmed and locked balance of a wallet's default account.

    The locked amount is the reply's ``lockedConfirmed`` minus the value
    reported by ``get_domains_value``. Each figure is returned twice: the
    ``*_small`` keys hold the raw API units, and the plain keys hold that
    value divided by 10**6.

    Returns ``{'error': 'Wallet not found'}`` when no wallet ID is registered
    for that user/name pair.
    """
    walletID = get_wallet_UUID(userID, name)
    if not walletID:
        return {'error': 'Wallet not found'}

    endpoint = url(True) + 'wallet/' + walletID + '/balance?account=default'
    raw = requests.get(endpoint).json()
    names_locked = get_domains_value(userID, name)

    confirmed = raw['confirmed']
    locked = raw['lockedConfirmed'] - names_locked

    # Convert to human readable
    return {
        'available': confirmed / 10**6,
        'available_small': confirmed,
        'locked': locked / 10**6,
        'locked_small': locked,
    }
@cache
def get_domains(userID: str, name: str) -> dict:
    """List the names owned by a wallet and record their total value.

    Requests ``wallet/<id>/name?own=true`` and returns a list of dicts with
    the keys ``name``, ``state``, ``highest``, ``value``, ``height`` and
    ``stats``; ``highest`` and ``value`` are divided by 10**6. The sum of the
    raw (undivided) ``value`` fields is written to the wallet's
    ``names_value`` entry in ``accounts.json``.

    Returns ``{'error': 'Wallet not found'}`` when no wallet ID is registered
    for that user/name pair.

    NOTE(review): the result is memoized per (userID, name) for the life of
    the process, so later calls neither re-query the names nor rewrite
    ``names_value`` -- confirm that this staleness is acceptable.
    """
    walletID = get_wallet_UUID(userID, name)
    if not walletID:
        return {'error': 'Wallet not found'}

    owned = requests.get(url(True) + 'wallet/' + walletID + '/name?own=true').json()

    names_value = sum(entry['value'] for entry in owned)
    names = [
        {
            'name': entry['name'],
            'state': entry['state'],
            'highest': entry['highest'] / 10**6,
            'value': entry['value'] / 10**6,
            'height': entry['height'],
            'stats': entry['stats'],
        }
        for entry in owned
    ]

    # Save name value to accounts
    with open('accounts.json', 'r') as fh:
        accounts = json.load(fh)

    if userID in accounts:
        for wallet in accounts[userID]['wallets']:
            if wallet['walletID'] == walletID:
                wallet['names_value'] = names_value
                break

    # The file is rewritten even when no matching wallet entry was updated.
    with open('accounts.json', 'w') as fh:
        json.dump(accounts, fh, indent=4)

    return names
def get_domains_value(userID: str, name: str) -> int:
    """Return the total value of the names owned by a wallet, in raw units.

    The value is read from the wallet's ``names_value`` entry in
    ``accounts.json`` when present. Otherwise the names are fetched through
    ``get_domains`` and summed.

    ``get_domains`` reports each ``value`` already divided by 10**6, while
    ``names_value`` (and the caller, ``get_balance``) work in raw units. The
    fallback therefore scales each value back up before summing, so both
    paths return the same unit.

    Returns:
        The summed value as an ``int``. When no wallet ID is registered for
        the user/name pair, ``{'error': 'Wallet not found'}`` is returned
        instead (kept for backward compatibility despite the annotation).
    """
    walletID = get_wallet_UUID(userID, name)
    if not walletID:
        return {
            'error': 'Wallet not found'
        }

    with open('accounts.json', 'r') as f:
        accounts = json.load(f)

    for wallet in accounts.get(userID, {}).get('wallets', []):
        if wallet['walletID'] == walletID and 'names_value' in wallet:
            return wallet['names_value']

    # Nothing persisted yet: derive the total from the names list, undoing
    # the /10**6 conversion applied by get_domains. round() recovers the
    # original integer from the float quotient.
    domains = get_domains(userID, name)
    return sum(round(domain['value'] * 10**6) for domain in domains)
def send_to_address(userID: str, name: str, address: str, amount: str) -> dict:
    """Ask the wallet endpoint to create an unsigned transaction.

    The transaction sends *amount* to *address* from wallet *name* of
    *userID*, via ``wallet/<id>/create`` with ``sign`` set to ``False``.

    Args:
        userID: Account the wallet belongs to.
        name: Wallet name within that account.
        address: Destination address.
        amount: Amount to send, as a number or numeric string, in the same
            unit as ``get_balance()['available']``.

    Returns:
        The decoded JSON reply on success, otherwise a dict with an ``error``
        message (unknown wallet, malformed or non-positive amount,
        insufficient funds, or a non-200 response).
    """
    walletID = get_wallet_UUID(userID, name)
    if not walletID:
        return {
            'error': 'Wallet not found'
        }

    # Verify amount. Malformed input is reported as an error dict like every
    # other failure in this module, instead of raising out of the call.
    try:
        amount = float(amount)
    except (TypeError, ValueError):
        return {
            'error': 'Invalid amount'
        }

    # Written as `not amount > 0` so that NaN is rejected too. NaN compares
    # false against everything, so it would slip through `amount <= 0` and
    # the balance check below.
    if not amount > 0:
        return {
            'error': 'Amount must be greater than 0'
        }

    balance = get_balance(userID, name)
    if balance['available'] < amount:
        return {
            'error': 'Insufficient funds'
        }

    data = {
        'outputs': [
            {
                'address': address,
                'value': hns.to_small(amount)
            }
        ],
        'sign': False
    }
    response = requests.post(url(True) + 'wallet/' + walletID + '/create', json=data)
    if response.status_code != 200:
        print(response.text)
        return {
            'error': 'Error sending'
        }
    return response.json()
# region Auctions
@cache
def get_auctions(userID: str, name: str) -> dict:
    """Return the JSON reply of ``wallet/<id>/auction`` for a wallet.

    Returns ``{'error': 'Wallet not found'}`` when no wallet ID is registered
    for that user/name pair.

    NOTE(review): the result is memoized per (userID, name) for the life of
    the process, so auction data is never refreshed after the first call --
    confirm that this is intended.
    """
    walletID = get_wallet_UUID(userID, name)
    if not walletID:
        return {'error': 'Wallet not found'}

    endpoint = url(True) + 'wallet/' + walletID + '/auction'
    reply = requests.get(endpoint)
    return reply.json()
# endregion