from functools import lru_cache import json from flask import ( Flask, make_response, redirect, request, jsonify, render_template, send_from_directory, send_file, ) import os import json import requests from datetime import datetime import dotenv import solana import solana.rpc from solana.rpc.api import Client import solana.rpc.api import solana.utils from solders.pubkey import Pubkey from solders.keypair import Keypair from solana.rpc.types import TokenAccountOpts from pycoingecko import CoinGeckoAPI from cachetools import TTLCache from cachetools import cached import threading import time from solders.transaction import Transaction import spl.token.client import cache import spl import spl.token from spl.token.instructions import create_associated_token_account,mint_to,MintToParams from solana.rpc.commitment import Confirmed from solana.rpc.api import Client from solana.rpc.types import TxOpts from solana.transaction import Transaction dotenv.load_dotenv() app = Flask(__name__) solana_client = Client(os.getenv("SOLANA_URL")) blockFrost_API = os.getenv("BLOCKFROST") stWDBRN_token_mint = Pubkey.from_string( "mNT61ixgiLnggJ4qf5hDCNj7vTiCqnqysosjncrBydf") vault_sol_address = Pubkey.from_string( "NWywvhcqdkJsm1s9VVviPm9UfyDtyCW9t8kDb24PDPN") vault_cardano_address = "stake1uy4qd785pcds7ph2jue2lrhhxa698c5959375lqdv3yphcgwc8qna" fiat = "USD" usd_to_fiat = 1 stablecoins = ["usdc", "usdt", "dai"] coingecko_client = CoinGeckoAPI() def find(name, path): for root, dirs, files in os.walk(path): if name in files: return os.path.join(root, name) # Assets routes @app.route("/assets/") def send_assets(path): if path.endswith(".json"): return send_from_directory( "templates/assets", path, mimetype="application/json" ) if os.path.isfile("templates/assets/" + path): return send_from_directory("templates/assets", path) # Try looking in one of the directories filename: str = path.split("/")[-1] if ( filename.endswith(".png") or filename.endswith(".jpg") or filename.endswith(".jpeg") or filename.endswith(".svg") ): if os.path.isfile("templates/assets/img/" + filename): return send_from_directory("templates/assets/img", filename) if os.path.isfile("templates/assets/img/favicon/" + filename): return send_from_directory("templates/assets/img/favicon", filename) return render_template("404.html"), 404 # region Special routes @app.route("/favicon.png") def faviconPNG(): return send_from_directory("templates/assets/img", "favicon.png") @app.route("/.well-known/") def wellknown(path): # Try to proxy to https://nathan.woodburn.au/.well-known/ req = requests.get(f"https://nathan.woodburn.au/.well-known/{path}") return make_response( req.content, 200, {"Content-Type": req.headers["Content-Type"]} ) # endregion # region Main routes @app.route("/") def index(): tokenSupply = getTokenSupplyString() tokenValue = getTokenPrice() tokens = getTokens() solValue = getSolValue() vaultBalance = getVaultBalance() vaultBalance = "{:.2f}".format(vaultBalance) # For testing # tokenSupply = 20 # tokenValue = 1.01 # tokens = [{"symbol":"stWDBRN","name":"Stake With Us","value":1.01}] # solValue = 10 # vaultBalance = "20.00" pie_chart_data = [(token['symbol'].upper(), token['value'], f"{token['name']}: ${'{:.2f}'.format(token['value'])}") for token in tokens] pie_chart_data.append(("SOL", solValue, f"Solana: ${'{:.2f}'.format(solValue)}")) cardanoBalance = getCardanoValue(vault_cardano_address) cardanoBalance = "{:.2f}".format(cardanoBalance) pie_chart_data.append( ("ADA", cardanoBalance, f"Cardano: ${cardanoBalance}")) return render_template("index.html", value=tokenValue, supply=tokenSupply, vault=vaultBalance) @app.route("/") def catch_all(path: str): if os.path.isfile("templates/" + path): return render_template(path) # Try with .html if os.path.isfile("templates/" + path + ".html"): return render_template(path + ".html") if os.path.isfile("templates/" + path.strip("/") + ".html"): return render_template(path.strip("/") + ".html") # Try to find a file matching if path.count("/") < 1: # Try to find a file matching filename = find(path, "templates") if filename: return send_file(filename) return render_template("404.html"), 404 # endregion # region Solana @cache.file_cache() def get_coin_price(coin_id): try: if coin_id in stablecoins: return 1 * usd_to_fiat price = coingecko_client.get_price(ids=coin_id, vs_currencies=fiat) return price[coin_id][fiat.lower()] except: return 0 @cache.file_cache() def get_token_price(token_address: str): try: price = coingecko_client.get_token_price( id='solana', contract_addresses=token_address, vs_currencies=fiat) return price[token_address][fiat.lower()] except: return 0 @cache.file_cache() def getTokenSupplyString() -> str: supply = solana_client.get_token_supply(stWDBRN_token_mint) return supply.value.ui_amount_string @cache.file_cache() def getTokenSupply() -> int: supply = solana_client.get_token_supply(stWDBRN_token_mint) return supply.value.ui_amount @cache.file_cache() def getSolBalance() -> int: SOLbalance = solana_client.get_balance( vault_sol_address).value / 1000000000 return SOLbalance @cache.file_cache() def getSolValue() -> int: SOLbalance = solana_client.get_balance( vault_sol_address).value / 1000000000 SOLPrice = get_coin_price("solana") return SOLbalance * SOLPrice @cache.file_cache() def getVaultBalance() -> int: # Get balance of vault vaultBalance = 0 vaultBalance += getSolValue() tokens = getTokens() tokenValue = 0 for token in tokens: tokenValue += token["value"] vaultBalance += tokenValue vaultBalance += getCardanoValue(vault_cardano_address) return vaultBalance @cache.file_cache() def getTokens(): programID = Pubkey.from_string( "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") tokenAccounts = solana_client.get_token_accounts_by_owner( vault_sol_address, TokenAccountOpts(program_id=programID) ) tokenAccounts = tokenAccounts.value tokens = [] for tokenAccount in tokenAccounts: pubkey = tokenAccount.pubkey account = solana_client.get_token_account_balance(pubkey) mint = tokenAccount.account.data[:32] mint = Pubkey(mint) # Decode the mint token = { "mint": str(mint), "balance": account.value.ui_amount } token["price"] = get_token_price(token["mint"]) token["value"] = token["price"] * token["balance"] if token["value"] < 0.01: continue data = getTokenData(str(mint)) token["name"] = data["name"] token["symbol"] = data["symbol"] tokens.append(token) return tokens def getTokenData(tokenMint): if not os.path.exists("tokens"): os.makedirs("tokens") if os.path.exists(f"tokens/{tokenMint}.json"): with open(f"tokens/{tokenMint}.json") as f: data = json.load(f) return data else: data = coingecko_client.get_coin_info_from_contract_address_by_id( 'solana', tokenMint) with open(f"tokens/{tokenMint}.json", "w") as f: json.dump(data, f) return data def getTokenPrice(): # Get number of tokens minted supply = getTokenSupply() vaultBalance = getVaultBalance() value = vaultBalance/supply # Round value to 2 decimal places and ensure it shows 2 decimal places value = "{:.2f}".format(value) return value # endregion # region Cardano get_cardano_balance_cache = TTLCache(maxsize=1, ttl=3600) @cache.file_cache() def getCardanoBalance(address: str): # Get balance of cardano address try: response = requests.get(f"https://cardano-mainnet.blockfrost.io/api/v0/accounts/{address}", headers={"project_id": blockFrost_API}) if response.status_code != 200: print("Error getting cardano balance") return 0 data = response.json() if "controlled_amount" in data: return int(data["controlled_amount"]) / 1000000 return 0 except: print("Error getting cardano balance") return 0 def getCardanoValue(address: str): balance = getCardanoBalance(address) price = get_coin_price("cardano") return balance * price # endregion # region API Routes @app.route("/api/v1/token") def api_token(): # Get number of tokens minted supply = getTokenSupply() token = {} # Get balance of vault token["SOL"] = { "name": "Solana", "amount": round(getSolBalance() / supply,4), "value": round(getSolValue() / supply,2) } if token["SOL"]["value"] < 0.01: token["SOL"]["amount"] = 0 token["SOL"]["value"] = 0 tokens = getTokens() for t in tokens: token[t["symbol"].upper()] = t if token[t["symbol"].upper()]["value"]/supply < 0.01: token[t["symbol"].upper()]["amount"] = 0 token[t["symbol"].upper()]["value"] = 0 else: token[t["symbol"].upper()]["amount"] = t["balance"] / supply token[t["symbol"].upper()]["value"] = t["price"] * t["balance"] / supply # Round value to 4 decimal places token[t["symbol"].upper()]["value"] = round(token[t["symbol"].upper()]["value"], 2) token[t["symbol"].upper()]["amount"] = round(token[t["symbol"].upper()]["amount"], 4) # Remove balance key del token[t["symbol"].upper()]["balance"] token["ADA"] = { "name": "Cardano", "amount": round(getCardanoBalance(vault_cardano_address) / supply,4), "value": round(getCardanoValue(vault_cardano_address) / supply,2) } if token["ADA"]["value"] < 0.01: token["ADA"]["amount"] = 0 token["ADA"]["value"] = 0 # For each key add tooltip for key in token: token[key]["tooltip"] = f"{token[key]['amount']} {key} (${token[key]['value']})" token["total"] = { "name": "stWDBRN", "description": "stWDBRN total value (USD)", "amount": 1, "value":float(getTokenPrice()) } return jsonify(token) @app.route("/api/v1/vault") def api_vault(): tokens = getTokens() vaultBalance = getVaultBalance() vaultBalance = "{:.2f}".format(vaultBalance) vault = {} vault["SOL"] = { "name": "Solana", "amount": round(getSolBalance(),4), "value": round(getSolValue(),2) } if vault["SOL"]["value"] < 0.01: vault["SOL"]["amount"] = 0 vault["SOL"]["value"] = 0 vault["ADA"] = { "name": "Cardano", "amount": round(getCardanoBalance(vault_cardano_address),4), "value": round(getCardanoValue(vault_cardano_address),2) } if vault["ADA"]["value"] < 0.01: vault["ADA"]["amount"] = 0 vault["ADA"]["value"] = 0 for t in tokens: vault[t["symbol"].upper()] = t if vault[t["symbol"].upper()]["value"] < 0.01: vault[t["symbol"].upper()]["amount"] = 0 vault[t["symbol"].upper()]["value"] = 0 else: vault[t["symbol"].upper()]["amount"] = t["balance"] vault[t["symbol"].upper()]["value"] = t["price"] * t["balance"] # Round value to 4 decimal places vault[t["symbol"].upper()]["value"] = round(vault[t["symbol"].upper()]["value"], 2) vault[t["symbol"].upper()]["amount"] = round(vault[t["symbol"].upper()]["amount"], 4) # Remove balance key del vault[t["symbol"].upper()]["balance"] # For each key add tooltip for key in vault: vault[key]["tooltip"] = f"{vault[key]['amount']} {key} (${vault[key]['value']})" vault["total"] = { "name": "Vault", "description": "Total Vault value (USD)", "value": vaultBalance } return jsonify(vault) @app.route("/api/v1/deposit",methods=["POST"]) def api_deposit(): # Get authorization header auth = request.headers.get("authorization") if not auth: return jsonify({"error": "Missing authorization header"}), 401 if auth != os.getenv("DEPOSIT_HEADER"): return jsonify({"error": "Invalid authorization header"}), 401 # Get data data = request.get_json() parseDeposit(data) return jsonify(data) def parseDeposit(data): stWDBRN_price = float(getTokenPrice()) for tx in data: if 'nativeTransfers' not in tx: continue if 'tokenTransfers' not in tx: continue # Search for string (TOPUP) if 'TOPUP' in json.dumps(tx): print("TOPUP: Skipping deposit") print(json.dumps(tx, indent=4)) continue for transfer in tx['nativeTransfers']: if transfer['toUserAccount'] != str(vault_sol_address): continue solAmount = transfer['amount'] / 1000000000 # Get USD value solValue = get_coin_price("solana") * solAmount stWDBRN_amount = solValue / stWDBRN_price stWDBRN_amount = round(stWDBRN_amount, 9) mint_stWDBRN(stWDBRN_amount, transfer['fromUserAccount']) for transfer in tx['tokenTransfers']: if transfer['toUserAccount'] != str(vault_sol_address): continue # Get token data token_price = get_token_price(transfer['mint']) USDvalue = transfer['tokenAmount'] * token_price stWDBRN_amount = USDvalue / stWDBRN_price stWDBRN_amount = round(stWDBRN_amount, 9) mint_stWDBRN(stWDBRN_amount, transfer['fromUserAccount']) def mint_stWDBRN(amount, to_user_account): if amount < 0.5: print(f"Skipping minting of {amount} stWDBRN to {to_user_account} as it is less than 0.5", flush=True) return print(f"Minting {amount} stWDBRN to {to_user_account}", flush=True) TOKEN_PROGRAM_ID = Pubkey.from_string("TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb") WALLET = os.getenv("WALLET") walletbytes = [] WALLET = json.loads(WALLET) for i in WALLET: walletbytes.append(int(i)) wallet_keypair = Keypair.from_bytes(walletbytes) token = spl.token.client.Token(solana_client,stWDBRN_token_mint,TOKEN_PROGRAM_ID,wallet_keypair) to_Pubkey = Pubkey.from_string(to_user_account) # Check if account exists account = token.get_accounts_by_owner(to_Pubkey) if len(account.value) > 1: print(f"ERROR getting token account") return if len(account.value) == 0: print("NEED TO MINT ACCOUNT") # Create account to_Pubkey = token.create_account(to_Pubkey) print(f"Created token account {to_Pubkey}") else: to_Pubkey = account.value[0].pubkey print(token.mint_to(to_Pubkey,wallet_keypair,int(amount*10**9))) # endregion # region Error Catching # 404 catch all @app.errorhandler(404) def not_found(e): return render_template("404.html"), 404 # endregion if __name__ == "__main__": app.run(debug=True, port=5000, host="0.0.0.0")