from functools import lru_cache import json from flask import ( Flask, make_response, redirect, request, jsonify, render_template, send_from_directory, send_file, ) import os import json import requests from datetime import datetime import dotenv import solana import solana.rpc from solana.rpc.api import Client import solana.rpc.api import solana.utils from solders.pubkey import Pubkey from solders.keypair import Keypair from solana.rpc.types import TokenAccountOpts from pycoingecko import CoinGeckoAPI from cachetools import TTLCache from cachetools import cached import threading import time from solders.transaction import Transaction import spl.token.client import cache import spl import spl.token from spl.token.instructions import create_associated_token_account,mint_to,MintToParams from solana.rpc.commitment import Confirmed from solana.rpc.api import Client from solana.rpc.types import TxOpts from solana.transaction import Transaction import asyncio dotenv.load_dotenv() app = Flask(__name__) solana_client = Client(os.getenv("SOLANA_URL")) blockFrost_API = os.getenv("BLOCKFROST") stWDBRN_token_mint = Pubkey.from_string( "mNT61ixgiLnggJ4qf5hDCNj7vTiCqnqysosjncrBydf") vault_sol_address = Pubkey.from_string( "NWywvhcqdkJsm1s9VVviPm9UfyDtyCW9t8kDb24PDPN") vault_cardano_address = "stake1uy4qd785pcds7ph2jue2lrhhxa698c5959375lqdv3yphcgwc8qna" vault_sui_address = "0x7e4fa1592e4fad084789f9fe1a4d7631a2e6477b658e777ae95351681bcbe8da" fiat = "USD" stablecoins = ["usdc", "usdt", "dai"] usd_to_aud_backup = 1.56 coingecko_client = CoinGeckoAPI() def find(name, path): for root, dirs, files in os.walk(path): if name in files: return os.path.join(root, name) # Assets routes @app.route("/assets/") def send_assets(path): if path.endswith(".json"): return send_from_directory( "templates/assets", path, mimetype="application/json" ) if os.path.isfile("templates/assets/" + path): return send_from_directory("templates/assets", path) # Try looking in one of the directories filename: str = path.split("/")[-1] if ( filename.endswith(".png") or filename.endswith(".jpg") or filename.endswith(".jpeg") or filename.endswith(".svg") ): if os.path.isfile("templates/assets/img/" + filename): return send_from_directory("templates/assets/img", filename) if os.path.isfile("templates/assets/img/favicon/" + filename): return send_from_directory("templates/assets/img/favicon", filename) return render_template("404.html"), 404 # region Special routes @app.route("/favicon.png") def faviconPNG(): return send_from_directory("templates/assets/img", "favicon.png") @app.route("/.well-known/") def wellknown(path): # Try to proxy to https://nathan.woodburn.au/.well-known/ req = requests.get(f"https://nathan.woodburn.au/.well-known/{path}") return make_response( req.content, 200, {"Content-Type": req.headers["Content-Type"]} ) # endregion # region Main routes @app.route("/") def index(): tokenSupply = getTokenSupplyString() tokenValue = getTokenPrice() vaultBalance = getVaultBalance() vaultBalance = "{:.2f}".format(vaultBalance) return render_template("index.html", value=tokenValue, supply=tokenSupply, vault=vaultBalance, vault_aud=usd_to_aud(vaultBalance), value_aud=usd_to_aud(tokenValue)) @app.route("/embed") def embed(): tokenSupply = getTokenSupplyString() tokenValue = getTokenPrice() vaultBalance = getVaultBalance() vaultBalance = "{:.2f}".format(vaultBalance) return render_template("embed.html", value=tokenValue, supply=tokenSupply, vault=vaultBalance, vault_aud=usd_to_aud(vaultBalance), value_aud=usd_to_aud(tokenValue)) @app.route("/") def catch_all(path: str): if os.path.isfile("templates/" + path): return render_template(path) # Try with .html if os.path.isfile("templates/" + path + ".html"): return render_template(path + ".html") if os.path.isfile("templates/" + path.strip("/") + ".html"): return render_template(path.strip("/") + ".html") # Try to find a file matching if path.count("/") < 1: # Try to find a file matching filename = find(path, "templates") if filename: return send_file(filename) return render_template("404.html"), 404 # endregion # region Solana @cache.file_cache() def get_coin_price(coin_id): try: if coin_id in stablecoins: return 1 price = coingecko_client.get_price(ids=coin_id, vs_currencies=fiat) return price[coin_id][fiat.lower()] except: return 0 @cache.file_cache() def get_token_price(token_address: str, chain='solana'): try: price = coingecko_client.get_token_price( id=chain, contract_addresses=token_address, vs_currencies=fiat) return price[token_address][fiat.lower()] except: return 0 def getTokenSupplyString() -> str: supply = getTokenSupply() return "{:.2f}".format(supply) @cache.file_cache(120) def getTokenSupply() -> int: supply = solana_client.get_token_supply(stWDBRN_token_mint) return supply.value.ui_amount @cache.file_cache() def getSolBalance() -> int: SOLbalance = solana_client.get_balance( vault_sol_address).value / 1000000000 return SOLbalance def getSolValue() -> int: SOLbalance = getSolBalance() SOLPrice = get_coin_price("solana") return SOLbalance * SOLPrice def getVaultBalance() -> int: # Get balance of vault vaultBalance = 0 vaultBalance += getSolValue() tokens = getTokens() tokenValue = 0 for token in tokens: tokenValue += token["value"] vaultBalance += tokenValue vaultBalance += getCardanoValue(vault_cardano_address) vaultBalance += getOtherInvestmentsValue() return vaultBalance @cache.file_cache(120) def getTokens(chain:str=None): tokens = [] if chain == "solana" or chain == None: programID = Pubkey.from_string( "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") tokenAccounts = solana_client.get_token_accounts_by_owner( vault_sol_address, TokenAccountOpts(program_id=programID) ) tokenAccounts = tokenAccounts.value for tokenAccount in tokenAccounts: pubkey = tokenAccount.pubkey account = solana_client.get_token_account_balance(pubkey) mint = tokenAccount.account.data[:32] mint = Pubkey(mint) # Decode the mint token = { "mint": str(mint), "balance": account.value.ui_amount } token["price"] = get_token_price(token["mint"]) token["value"] = token["price"] * token["balance"] if token["value"] < 0.01: continue data = getTokenData(str(mint)) token["name"] = data["name"] token["symbol"] = data["symbol"] tokens.append(token) if chain == "sui" or chain == None: # Get SUI tokens tokens.extend(getSuiTokens(vault_sui_address)) return tokens def getTokens_nocache(chain:str=None): tokens = [] if chain == "solana" or chain == None: programID = Pubkey.from_string( "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") tokenAccounts = solana_client.get_token_accounts_by_owner( vault_sol_address, TokenAccountOpts(program_id=programID) ) tokenAccounts = tokenAccounts.value for tokenAccount in tokenAccounts: pubkey = tokenAccount.pubkey account = solana_client.get_token_account_balance(pubkey) mint = tokenAccount.account.data[:32] mint = Pubkey(mint) # Decode the mint token = { "mint": str(mint), "balance": account.value.ui_amount } token["price"] = get_token_price(token["mint"]) token["value"] = token["price"] * token["balance"] if token["value"] < 0.01: continue data = getTokenData(str(mint)) token["name"] = data["name"] token["symbol"] = data["symbol"] tokens.append(token) if chain == "sui" or chain == None: # Get SUI tokens tokens.extend(getSuiTokens(vault_sui_address)) return tokens def getTokenData(tokenMint, chain='solana'): if not os.path.exists("tokens"): os.makedirs("tokens") if os.path.exists(f"tokens/{tokenMint}.json"): with open(f"tokens/{tokenMint}.json") as f: data = json.load(f) return data else: data = coingecko_client.get_coin_info_from_contract_address_by_id( chain, tokenMint) with open(f"tokens/{tokenMint}.json", "w") as f: json.dump(data, f) return data def getTokenPrice(): # Get number of tokens minted supply = getTokenSupply() vaultBalance = getVaultBalance() value = vaultBalance/supply # Round value to 2 decimal places and ensure it shows 2 decimal places value = "{:.2f}".format(value) return value # endregion # region Cardano get_cardano_balance_cache = TTLCache(maxsize=1, ttl=3600) @cache.file_cache(300) def getCardanoBalance(address: str): # Get balance of cardano address if not os.path.exists("cache/cardano_balance.json"): with open("cache/cardano_balance.json", "w") as f: json.dump({"balance": 0}, f) last = 0 with open("cache/cardano_balance.json") as f: data = json.load(f) last = data["balance"] try: response = requests.get(f"https://cardano-mainnet.blockfrost.io/api/v0/accounts/{address}", headers={"project_id": blockFrost_API}) if response.status_code != 200: print("Error getting cardano balance",flush=True) # Get last known balance return last data = response.json() if "controlled_amount" in data: with open("cache/cardano_balance.json", "w") as f: json.dump({"balance": int(data["controlled_amount"]) / 1000000}, f) return int(data["controlled_amount"]) / 1000000 return last except: print("Error getting cardano balance",flush=True) return last def getCardanoValue(address: str): balance = getCardanoBalance(address) price = get_coin_price("cardano") return balance * price # endregion # region Sui @cache.file_cache(120) def getSuiTokens(address: str): url = "https://fullnode.mainnet.sui.io/" # Define the payload for the RPC call payload = { "jsonrpc": "2.0", "id": 1, "method": "suix_getAllBalances", "params": [ address ] } headers = { "Content-Type": "application/json" } # Make the POST request tokens = [] try: response = requests.post(url, json=payload, headers=headers) response.raise_for_status() # Raise an HTTPError for bad responses result = response.json() for coin in result['result']: token = { "mint": coin['coinType'], "balance": int(coin['totalBalance'])/10**9, } token["price"] = get_token_price(token["mint"], 'sui') token["value"] = token["price"] * token["balance"] if token["value"] < 0.01: continue data = getTokenData(str(token["mint"]), 'sui') token["name"] = data["name"] token["symbol"] = data["symbol"] tokens.append(token) except requests.exceptions.RequestException as e: print(f"An error occurred: {e}") return tokens def getSuiTokens_nocache(address: str): url = "https://fullnode.mainnet.sui.io/" # Define the payload for the RPC call payload = { "jsonrpc": "2.0", "id": 1, "method": "suix_getAllBalances", "params": [ address ] } headers = { "Content-Type": "application/json" } # Make the POST request tokens = [] try: response = requests.post(url, json=payload, headers=headers) response.raise_for_status() # Raise an HTTPError for bad responses result = response.json() for coin in result['result']: token = { "mint": coin['coinType'], "balance": int(coin['totalBalance'])/10**9, } token["price"] = get_token_price(token["mint"], 'sui') token["value"] = token["price"] * token["balance"] if token["value"] < 0.01: continue data = getTokenData(str(token["mint"]), 'sui') token["name"] = data["name"] token["symbol"] = data["symbol"] tokens.append(token) except requests.exceptions.RequestException as e: print(f"An error occurred: {e}") return tokens # endregion # region Other Investments @cache.file_cache(60) def getOtherInvestments(): data = requests.get("https://cloud.woodburn.au/s/stwdbrn_other/download/other_investments.json") return data.json() def getOtherInvestmentTypes(): data = getOtherInvestments() types = {} for investment in data: if investment["type"] not in types: types[investment["type"]] = { "name": investment["type"], "description": investment["type"], "value": 0, "amount": 0, } types[investment["type"]]["value"] += investment["value"] types[investment["type"]]["amount"] += 1 return types def getOtherInvestmentsValue(): data = getOtherInvestments() value = 0 for investment in data: value += investment["value"] return value # endregion # region API Routes @app.route("/api/v1/tokens") def api_tokens(): tokens = getTokens("solana") for t in tokens: t["url"] = f"https://explorer.solana.com/address/{t['mint']}" return jsonify(tokens) @app.route("/api/v1/token") def api_token(): # Get number of tokens minted supply = getTokenSupply() token = {} # Get balance of vault token["SOL"] = { "name": "Solana", "amount": round(getSolBalance() / supply,4), "value": round(getSolValue() / supply,2) } if token["SOL"]["value"] < 0.01: token["SOL"]["amount"] = 0 token["SOL"]["value"] = 0 tokens = getTokens() for t in tokens: token[t["symbol"].upper()] = t if token[t["symbol"].upper()]["value"]/supply < 0.01: token[t["symbol"].upper()]["amount"] = 0 token[t["symbol"].upper()]["value"] = 0 else: token[t["symbol"].upper()]["amount"] = t["balance"] / supply token[t["symbol"].upper()]["value"] = t["price"] * t["balance"] / supply # Round value to 4 decimal places token[t["symbol"].upper()]["value"] = round(token[t["symbol"].upper()]["value"], 2) token[t["symbol"].upper()]["amount"] = round(token[t["symbol"].upper()]["amount"], 4) # Remove balance key del token[t["symbol"].upper()]["balance"] token["ADA"] = { "name": "Cardano", "amount": round(getCardanoBalance(vault_cardano_address) / supply,4), "value": round(getCardanoValue(vault_cardano_address) / supply,2) } if token["ADA"]["value"] < 0.01: token["ADA"]["amount"] = 0 token["ADA"]["value"] = 0 # For each key add tooltip for key in token: token[key]["tooltip"] = f"{token[key]['amount']} {key} (${token[key]['value']})" other_investment_types = getOtherInvestmentTypes() for investment_type in other_investment_types: token[investment_type] = { "name": f'{other_investment_types[investment_type]["name"]} Positions', "description": other_investment_types[investment_type]["description"], "value": round(other_investment_types[investment_type]["value"] / supply,2), "amount": other_investment_types[investment_type]["amount"], "tooltip": f"{other_investment_types[investment_type]['amount']} Positions (${other_investment_types[investment_type]['value']})" } token["total"] = { "name": "stWDBRN", "description": "stWDBRN total value (USD)", "amount": 1, "value":float(getTokenPrice()) } return jsonify(token) @app.route("/api/v1/vault") def api_vault(): tokens = getTokens() vaultBalance = getVaultBalance() vaultBalance = "{:.2f}".format(vaultBalance) vault = {} vault["SOL"] = { "name": "Solana", "amount": round(getSolBalance(),4), "value": round(getSolValue(),2) } if vault["SOL"]["value"] < 0.01: vault["SOL"]["amount"] = 0 vault["SOL"]["value"] = 0 vault["ADA"] = { "name": "Cardano", "amount": round(getCardanoBalance(vault_cardano_address),4), "value": round(getCardanoValue(vault_cardano_address),2) } if vault["ADA"]["value"] < 0.01: vault["ADA"]["amount"] = 0 vault["ADA"]["value"] = 0 for t in tokens: vault[t["symbol"].upper()] = t if vault[t["symbol"].upper()]["value"] < 0.01: vault[t["symbol"].upper()]["amount"] = 0 vault[t["symbol"].upper()]["value"] = 0 else: vault[t["symbol"].upper()]["amount"] = t["balance"] vault[t["symbol"].upper()]["value"] = t["price"] * t["balance"] # Round value to 4 decimal places vault[t["symbol"].upper()]["value"] = round(vault[t["symbol"].upper()]["value"], 2) vault[t["symbol"].upper()]["amount"] = round(vault[t["symbol"].upper()]["amount"], 4) # Remove balance key del vault[t["symbol"].upper()]["balance"] # For each key add tooltip for key in vault: vault[key]["tooltip"] = f"{vault[key]['amount']} {key} (${vault[key]['value']})" other_investment_types = getOtherInvestmentTypes() for investment_type in other_investment_types: vault[investment_type] = { "name": f'{other_investment_types[investment_type]["name"]} Positions', "description": other_investment_types[investment_type]["description"], "value": other_investment_types[investment_type]["value"], "amount": other_investment_types[investment_type]["amount"], "tooltip": f"{other_investment_types[investment_type]['amount']} Positions (${'{:.2f}'.format(other_investment_types[investment_type]['value'])})" } vault["total"] = { "name": "Vault", "description": "Total Vault value (USD)", "value": vaultBalance } return jsonify(vault) @app.route("/api/v1/aud/") def api_aud(aud): aud = float(aud) usd = aud / aud_to_usd_rate() # Calculate the number of stWDBRN tokens price = float(getTokenPrice()) return jsonify({ "aud": aud, "usd": round(usd, 2), "stWDBRN": round(usd / price,2) }) def usd_to_aud(usd): usd = float(usd) value = usd * aud_to_usd_rate() # Round value to 2 decimal places and ensure it shows 2 decimal places value = "{:.2f}".format(value) return value @cache.file_cache(21600) def aud_to_usd_rate(): api_key = os.getenv("API_LAYER") resp = requests.get(f"https://apilayer.net/api/live?access_key={api_key}¤cies=AUD&source=USD&format=1") if resp.status_code != 200: print("Error getting AUD price") return usd_to_aud_backup data = resp.json() if "quotes" not in data: print("Error getting AUD price") return usd_to_aud_backup if "USDAUD" not in data["quotes"]: print("Error getting AUD price") return usd_to_aud_backup return data["quotes"]["USDAUD"] @app.route("/api/v1/other") @app.route("/api/v1/defi") def api_other_investments(): data = getOtherInvestments() return jsonify(data) @app.route("/api/v1/deposit",methods=["POST"]) def api_deposit(): # Get authorization header auth = request.headers.get("authorization") if not auth: return jsonify({"error": "Missing authorization header"}), 401 if auth != os.getenv("DEPOSIT_HEADER"): return jsonify({"error": "Invalid authorization header"}), 401 # Get data data = request.get_json() parseDeposit(data) return jsonify(data) def parseDeposit(data): for tx in data: if 'nativeTransfers' not in tx: continue if 'tokenTransfers' not in tx: continue # Skip memo txs if 'Memo' in json.dumps(tx): print(f"Skipping deposit as it contains memo: {tx['description']}") continue signature = tx['signature'] if not os.path.exists(f"cache/txs.json"): with open(f"cache/txs.json", "w") as f: json.dump([], f) with open(f"cache/txs.json") as f: txs = json.load(f) if signature in txs: print(f"Skipping duplicate tx: {signature}") continue txs.append(signature) with open(f"cache/txs.json", "w") as f: json.dump(txs, f) for transfer in tx['nativeTransfers']: if transfer['toUserAccount'] != str(vault_sol_address): continue solAmount = transfer['amount'] / 1000000000 # Get USD value solValue = get_coin_price("solana") * solAmount usd_amount = solValue usd_amount = round(usd_amount, 9) asyncio.run(mint_stWDBRN(usd_amount, transfer['fromUserAccount'])) for transfer in tx['tokenTransfers']: if transfer['toUserAccount'] != str(vault_sol_address): continue # Get token data token_price = get_token_price(transfer['mint']) USDvalue = transfer['tokenAmount'] * token_price usd_amount = USDvalue usd_amount = round(usd_amount, 9) asyncio.run(mint_stWDBRN(usd_amount, transfer['fromUserAccount'])) def stWDBRN_nocache(): supply = solana_client.get_token_supply(stWDBRN_token_mint) supply = supply.value.ui_amount vaultBalance = 0 SOLbalance = solana_client.get_balance( vault_sol_address).value / 1000000000 SOLPrice = get_coin_price("solana") vaultBalance += SOLbalance * SOLPrice tokens = getTokens_nocache() tokenValue = 0 for token in tokens: tokenValue += token["value"] vaultBalance += tokenValue vaultBalance += getCardanoValue(vault_cardano_address) vaultBalance += getOtherInvestmentsValue() stWDBRN_price = vaultBalance/supply return stWDBRN_price async def mint_stWDBRN(USD_amount, to_user_account): if USD_amount < 0.5: print(f"Skipping minting of {USD_amount} USD to {to_user_account} as it is less than 0.5", flush=True) return # Get stWDBRN price no cache stWDBRN_price = stWDBRN_nocache() amount = USD_amount / stWDBRN_price # Small fee for minting if amount > 10: amount = amount - 0.05 print(f"Minting {amount} stWDBRN to {to_user_account}", flush=True) TOKEN_PROGRAM_ID = Pubkey.from_string("TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb") WALLET = os.getenv("WALLET") walletbytes = [] WALLET = json.loads(WALLET) for i in WALLET: walletbytes.append(int(i)) wallet_keypair = Keypair.from_bytes(walletbytes) token = spl.token.client.Token(solana_client,stWDBRN_token_mint,TOKEN_PROGRAM_ID,wallet_keypair) to_Pubkey = Pubkey.from_string(to_user_account) # Check if account exists account = token.get_accounts_by_owner(to_Pubkey) if len(account.value) > 1: print(f"ERROR getting token account") return if len(account.value) == 0: print("NEED TO MINT ACCOUNT") # Create account to_Pubkey = token.create_account(to_Pubkey) print(f"Created token account {to_Pubkey}") else: to_Pubkey = account.value[0].pubkey print(token.mint_to(to_Pubkey,wallet_keypair,int(amount*10**9))) # endregion # region Error Catching # 404 catch all @app.errorhandler(404) def not_found(e): return render_template("404.html"), 404 # endregion if __name__ == "__main__": app.run(debug=True, port=5000, host="0.0.0.0")