Nathan Woodburn
1a7b62efa2
All checks were successful
Build Docker / BuildImage (push) Successful in 33s
384 lines
9.4 KiB
Python
384 lines
9.4 KiB
Python
import functools
|
|
import time
|
|
import requests
|
|
import dotenv
|
|
import os
|
|
import json
|
|
import uuid
|
|
import hns
|
|
|
|
# Load settings from a .env file (if present) into the process environment.
dotenv.load_dotenv()

# Connection settings for the HSD node. An unset or empty IP / network falls
# back to a node on localhost and the 'main' network respectively.
HSD_API_KEY = os.getenv('HSD_API_KEY')
HSD_API_IP = os.getenv('HSD_API_IP') or '127.0.0.1'
HSD_API_NETWORK = os.getenv('HSD_API_NETWORK') or 'main'

# Port prefix per network; url() appends one more digit to form the real port.
network_port_int = dict(
    main=1203,
    testnet=1303,
    regtest=1403,
    simnet=1503,
)
|
|
|
|
|
|
# The JSON stores live in the working directory when a `.local` marker exists
# there, and under /data otherwise.
if os.path.exists('.local'):
    account_file, wallet_file = 'accounts.json', 'wallets.json'
else:
    account_file, wallet_file = '/data/accounts.json', '/data/wallets.json'

# Create any missing files: accounts start as an empty object, wallets as an
# empty list.
for _path, _empty in ((account_file, {}), (wallet_file, [])):
    if not os.path.exists(_path):
        with open(_path, 'w') as f:
            json.dump(_empty, f, indent=4)
|
|
|
|
def cache_with_ttl(max_age, maxsize=128, typed=False):
    """Least-recently-used cache decorator with time-based cache invalidation.

    Wall-clock time is split into windows of ``max_age`` seconds and the
    current window number is made part of the cache key. A cached result is
    therefore only reused while the clock stays in the window it was stored
    in: it lives for *at most* ``max_age`` seconds and may expire sooner when
    it was stored late in a window. Results from earlier windows are not
    purged eagerly; they stay in the LRU until ``maxsize`` evicts them.

    Args:
        max_age: Time to live for cached results (in seconds).
        maxsize: Maximum cache size (see `functools.lru_cache`).
        typed: Cache on distinct input types (see `functools.lru_cache`).

    Returns:
        A decorator. The decorated function keeps the metadata of the
        original and exposes ``cache_clear()`` and ``cache_info()`` of the
        underlying LRU cache, so callers can invalidate it after a write.
    """
    def _decorator(fn):
        @functools.lru_cache(maxsize=maxsize, typed=typed)
        def _new(*args, __time_salt, **kwargs):
            # __time_salt only exists to make the cache key time dependent.
            return fn(*args, **kwargs)

        @functools.wraps(fn)
        def _wrapped(*args, **kwargs):
            return _new(*args, **kwargs, __time_salt=int(time.time() / max_age))

        # Surface the LRU controls that the wrapper would otherwise hide.
        _wrapped.cache_clear = _new.cache_clear
        _wrapped.cache_info = _new.cache_info
        return _wrapped

    return _decorator
|
|
|
|
|
|
def url(walletURL:bool) -> str:
    """Return the authenticated base URL of one of the two HSD HTTP endpoints.

    The port is the configured network's prefix from ``network_port_int``
    followed by ``9`` when ``walletURL`` is true and ``7`` otherwise. The
    result always ends with a slash.
    """
    last_digit = '9' if walletURL else '7'
    port_prefix = network_port_int[HSD_API_NETWORK]
    return f'http://x:{HSD_API_KEY}@{HSD_API_IP}:{port_prefix}{last_digit}/'
|
|
|
|
|
|
def rescan() -> dict:
    """POST a rescan request starting at height 0 and return the decoded JSON reply."""
    endpoint = url(True) + 'rescan'
    reply = requests.post(endpoint, json={'height': 0})
    return reply.json()
|
|
|
|
def get_master() -> dict:
    """Fetch the ``master`` resource of one fixed, hard-coded wallet ID as decoded JSON."""
    endpoint = url(True) + 'a61ba47a-54de-43b9-9d22-fcf6827cd5c2/master'
    reply = requests.get(endpoint)
    return reply.json()
|
|
|
|
def get_status() -> dict:
    """GET the root of the non-wallet endpoint and return the decoded JSON reply."""
    reply = requests.get(url(False))
    return reply.json()
|
|
|
|
def get_wallets() -> dict:
    """GET ``wallet`` from the wallet endpoint and return the decoded JSON reply."""
    endpoint = url(True) + 'wallet'
    reply = requests.get(endpoint)
    return reply.json()
|
|
|
|
def create_account() -> dict:
    """Register a new, wallet-less account under a freshly generated UUID.

    Returns:
        ``{'userID': <uuid>}`` once the account has been written to the
        accounts file, or ``{'error': 'Please try again'}`` if the generated
        ID is already present there.
    """
    new_id = str(uuid.uuid4())

    with open(account_file, 'r') as fh:
        known = json.load(fh)

    # A clash with an existing ID is reported instead of overwriting it.
    if new_id in known:
        return {'error': 'Please try again'}

    known[new_id] = {'wallets': []}
    with open(account_file, 'w') as fh:
        json.dump(known, fh, indent=4)

    return {'userID': new_id}
|
|
|
|
def get_account(userID:str) -> dict:
    """Return the stored record for ``userID``, or ``{'error': 'User not found'}``."""
    accounts = get_accounts()
    try:
        return accounts[userID]
    except KeyError:
        return {'error': 'User not found'}
|
|
|
|
def get_accounts() -> dict:
    """Load and return the full contents of the accounts JSON file."""
    with open(account_file, 'r') as fh:
        return json.load(fh)
|
|
|
|
|
|
def import_wallet(name:str,userID:str,xpub:str) -> dict:
    """Create a watch-only wallet from ``xpub`` and attach it to an account.

    The wallet is created on the wallet endpoint under a new UUID, recorded
    in both the accounts file and the wallets file, and a rescan is then
    triggered.

    Returns:
        ``{'walletID': <uuid>}`` on success, otherwise a dict with an
        ``'error'`` message (unknown user, duplicate wallet name for that
        user, UUID clash, or a non-200 reply when creating the wallet).
    """
    accounts = get_accounts()
    if userID not in accounts:
        return {'error': 'User not found'}

    # Wallet names must be unique per account.
    owned = accounts[userID]['wallets']
    if any(entry['name'] == name for entry in owned):
        return {'error': 'Wallet already exists'}

    # The wallet is addressed by a UUID rather than by its display name.
    walletID = str(uuid.uuid4())
    with open(wallet_file, 'r') as fh:
        known_ids = json.load(fh)
    if walletID in known_ids:
        return {'error': 'Please try again'}

    payload = {"watchOnly": True, "accountKey": xpub}
    response = requests.put(url(True)+'wallet/'+walletID, json=payload)
    if response.status_code != 200:
        print(response.text)
        return {'error': 'Error creating wallet'}

    # Persist only after the wallet was created successfully.
    owned.append({'name': name, 'xpub': xpub, 'walletID': walletID})
    with open(account_file, 'w') as fh:
        json.dump(accounts, fh, indent=4)

    known_ids.append(walletID)
    with open(wallet_file, 'w') as fh:
        json.dump(known_ids, fh, indent=4)

    # Rescan the wallet to get the balance
    rescan()

    return {'walletID': walletID}
|
|
|
|
|
|
class _WalletNotFound(Exception):
    """Internal signal for a failed lookup; raised so the miss is never cached."""


@cache_with_ttl(60*60)
def _find_wallet_uuid(userID:str, name:str) -> str:
    """Return the wallet ID stored for (userID, name) or raise _WalletNotFound.

    Only successful lookups are memoised: the underlying LRU cache stores
    return values, not raised exceptions.
    """
    accounts = get_accounts()
    if userID in accounts:
        for wallet in accounts[userID]['wallets']:
            if wallet['name'] == name:
                return wallet['walletID']
    raise _WalletNotFound(name)


def get_wallet_UUID(userID:str, name:str) -> str|None:
    """Resolve a user's wallet name to its wallet UUID.

    Hits are cached for up to an hour. Misses are deliberately not cached,
    so a wallet that is imported after a failed lookup becomes visible
    immediately instead of staying "not found" until the cache rolls over.

    Args:
        userID: Account identifier as stored in the accounts file.
        name: Wallet name within that account.

    Returns:
        The wallet UUID, or ``None`` if the user or the wallet is unknown.
    """
    try:
        return _find_wallet_uuid(userID, name)
    except _WalletNotFound:
        return None
|
|
|
|
def get_wallet(userID:str, name:str) -> dict:
    """Return the wallet endpoint's JSON for a user's wallet, or an error dict if unknown."""
    walletID = get_wallet_UUID(userID, name)
    if walletID:
        endpoint = url(True)+'wallet/'+walletID
        return requests.get(endpoint).json()

    return {'error': 'Wallet not found'}
|
|
|
|
def get_address(userID:str, name:str) -> dict:
    """Return ``{'address': ...}`` with the ``receiveAddress`` of the wallet's default account.

    Returns ``{'error': 'Wallet not found'}`` when the wallet cannot be resolved.
    """
    walletID = get_wallet_UUID(userID, name)
    if not walletID:
        return {'error': 'Wallet not found'}

    endpoint = url(True)+'wallet/'+walletID+'/account/default'
    account = requests.get(endpoint).json()
    return {'address': account['receiveAddress']}
|
|
|
|
def get_balance(userID:str, name:str) -> dict:
    """Report the available and locked balance of a user's wallet.

    "Available" is the confirmed balance of the default account minus the
    value reported by ``get_domains_value``. Each figure is returned twice:
    as the raw number (``*_small``) and divided by 10**6.

    Returns:
        A dict with ``available``, ``available_small``, ``locked`` and
        ``locked_small``, or ``{'error': 'Wallet not found'}``.
    """
    walletID = get_wallet_UUID(userID, name)
    if not walletID:
        return {'error': 'Wallet not found'}

    endpoint = url(True)+'wallet/'+walletID+'/balance?account=default'
    balance = requests.get(endpoint).json()
    domains_value = get_domains_value(userID, name)

    spendable = balance['confirmed']-domains_value
    locked = balance['lockedConfirmed']

    # Convert to human readable
    return {
        'available': spendable/10**6,
        'available_small': spendable,
        'locked': locked/10**6,
        'locked_small': locked,
    }
|
|
|
|
|
|
@cache_with_ttl(60*1)
def get_domains(userID:str, name:str) -> dict:
    """List the names owned by a user's wallet and record their total value.

    Results are cached for up to a minute. On every uncached call the sum of
    the raw ``value`` fields is stored as ``names_value`` on the matching
    wallet entry and the accounts file is rewritten.

    Returns:
        A list of dicts (``name``, ``state``, ``highest``, ``value``,
        ``height``, ``stats``) with ``highest`` and ``value`` divided by
        10**6, or ``{'error': 'Wallet not found'}``. Note that the success
        value is a list despite the ``dict`` annotation.
    """
    walletID = get_wallet_UUID(userID, name)
    if not walletID:
        return {'error': 'Wallet not found'}

    owned = requests.get(url(True)+'wallet/'+walletID+'/name?own=true').json()

    names = []
    names_value = 0
    for entry in owned:
        names_value += entry['value']
        names.append({
            'name': entry['name'],
            'state': entry['state'],
            'highest': entry['highest']/10**6,
            'value': entry['value']/10**6,
            'height': entry['height'],
            'stats': entry['stats']
        })

    # Save name value to accounts
    with open(account_file, 'r') as fh:
        accounts = json.load(fh)

    if userID in accounts:
        for wallet in accounts[userID]['wallets']:
            if wallet['walletID'] == walletID:
                wallet['names_value'] = names_value
                break

    with open(account_file, 'w') as fh:
        json.dump(accounts, fh, indent=4)

    return names
|
|
|
|
|
|
def get_domains_value(userID:str, name:str) -> int:
    """Return the total value of the names owned by a user's wallet.

    The figure is in the same raw units as the ``names_value`` that
    ``get_domains`` stores in the accounts file (i.e. not divided by 10**6),
    which is what ``get_balance`` subtracts from the confirmed balance.

    The stored ``names_value`` is used when present. Otherwise the value is
    derived from ``get_domains``; because that function reports ``value``
    already divided by 10**6, each entry is scaled back before summing so
    both paths agree.

    Returns:
        The total as an int, or ``{'error': 'Wallet not found'}`` when the
        wallet cannot be resolved (kept as a dict for existing callers,
        despite the ``int`` annotation).
    """
    walletID = get_wallet_UUID(userID, name)
    if not walletID:
        return {'error': 'Wallet not found'}

    with open(account_file, 'r') as f:
        accounts = json.load(f)

    if userID in accounts:
        for wallet in accounts[userID]['wallets']:
            if wallet['walletID'] == walletID and 'names_value' in wallet:
                return wallet['names_value']

    domains = get_domains(userID, name)
    if isinstance(domains, dict):
        # get_domains reports failures as an error dict; pass it through
        # rather than iterating over its keys.
        return domains

    # round() undoes the float division performed by get_domains.
    return sum(round(domain['value'] * 10**6) for domain in domains)
|
|
|
|
|
|
def send_to_address(userID:str, name:str, address:str, amount:str) -> dict:
    """Build an unsigned transaction sending ``amount`` to ``address``.

    Args:
        userID: Account identifier.
        name: Wallet name within that account.
        address: Destination address.
        amount: Amount to send, in the same unit as
            ``get_balance()['available']``; parsed with ``float``.

    Returns:
        The wallet endpoint's JSON reply to the ``create`` request on
        success, otherwise a dict with an ``'error'`` message. Malformed or
        NaN amounts yield ``{'error': 'Invalid amount'}`` instead of raising.
    """
    walletID = get_wallet_UUID(userID, name)
    if not walletID:
        return {'error': 'Wallet not found'}

    # Verify amount
    try:
        amount = float(amount)
    except (TypeError, ValueError):
        return {'error': 'Invalid amount'}
    if amount != amount:
        # NaN compares false against everything and would otherwise slip
        # through both the positivity and the balance check.
        return {'error': 'Invalid amount'}
    if amount <= 0:
        return {'error': 'Amount must be greater than 0'}

    balance = get_balance(userID, name)
    if balance['available'] < amount:
        return {'error': 'Insufficient funds'}

    # When less than 0.03 would remain, send the whole available balance and
    # take the fee out of the output instead.
    subtractFee = balance['available'] - amount < 0.03
    if subtractFee:
        amount = balance['available']

    data = {
        'outputs': [
            {
                'address': address,
                'value': hns.to_small(amount)
            }
        ],
        'sign': False,
        'subtractFee': subtractFee
    }

    response = requests.post(url(True)+'wallet/'+walletID+'/create', json=data)
    if response.status_code != 200:
        print(response.text)
        return {'error': 'Error sending'}

    return response.json()
|
|
|
|
@cache_with_ttl(60*1)
def get_transactions(userID:str, name:str) -> dict:
    """Return the decoded ``tx/history`` reply for a user's wallet (cached for up to a minute).

    Returns ``{'error': 'Wallet not found'}`` when the wallet cannot be resolved.
    """
    walletID = get_wallet_UUID(userID, name)
    if walletID:
        endpoint = url(True)+'wallet/'+walletID+'/tx/history'
        return requests.get(endpoint).json()

    return {'error': 'Wallet not found'}
|
|
|
|
|
|
|
|
# region Auctions
|
|
|
|
@cache_with_ttl(60*1)
def get_auctions(userID:str, name:str) -> dict:
    """Return the decoded ``auction`` reply for a user's wallet (cached for up to a minute).

    Returns ``{'error': 'Wallet not found'}`` when the wallet cannot be resolved.
    """
    walletID = get_wallet_UUID(userID, name)
    if walletID:
        endpoint = url(True)+'wallet/'+walletID+'/auction'
        reply = requests.get(endpoint)
        return reply.json()

    return {'error': 'Wallet not found'}
|
|
|
|
|
|
# endregion |