"""Flask site and JSON API for the stWDBRN vault.

Serves the static assets and HTML templates, and exposes
``/api/v1/token`` and ``/api/v1/vault``, which report the vault's Solana,
SPL-token and Cardano holdings priced through CoinGecko.
"""

import json
import os
import threading
import time
from datetime import datetime
from functools import lru_cache

import dotenv
import requests
import solana
import solana.rpc
import solana.rpc.api
from cachetools import TTLCache
from cachetools import cached
from flask import (
    Flask,
    make_response,
    redirect,
    request,
    jsonify,
    render_template,
    send_from_directory,
    send_file,
)
from pycoingecko import CoinGeckoAPI
from solana.rpc.api import Client
from solana.rpc.types import TokenAccountOpts
from solders.pubkey import Pubkey

import cache

dotenv.load_dotenv()

app = Flask(__name__)

solana_client = Client(os.getenv("SOLANA_URL"))
blockFrost_API = os.getenv("BLOCKFROST")

stWDBRN_token_mint = Pubkey.from_string(
    "mNT61ixgiLnggJ4qf5hDCNj7vTiCqnqysosjncrBydf")
vault_sol_address = Pubkey.from_string(
    "NWywvhcqdkJsm1s9VVviPm9UfyDtyCW9t8kDb24PDPN")
vault_cardano_address = "stake1uy4qd785pcds7ph2jue2lrhhxa698c5959375lqdv3yphcgwc8qna"

fiat = "USD"
usd_to_fiat = 1
stablecoins = ["usdc", "usdt", "dai"]
coingecko_client = CoinGeckoAPI()

# Solana RPC balances are reported in lamports; Blockfrost amounts in lovelace.
LAMPORTS_PER_SOL = 1000000000
LOVELACE_PER_ADA = 1000000

# Seconds to wait on outbound HTTP calls so a stalled upstream cannot hang
# a request worker indefinitely.
REQUEST_TIMEOUT = 10

# Image extensions that send_assets() will look up in the img directories.
IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".svg")


def find(name, path):
    """Return the path of the first file called *name* under *path*, or None."""
    for root, dirs, files in os.walk(path):
        if name in files:
            return os.path.join(root, name)
    return None


# Assets routes
@app.route("/assets/<path:path>")
def send_assets(path):
    """Serve a file from ``templates/assets``.

    JSON files are sent with an explicit ``application/json`` mimetype.
    Images that are not found at the requested location are looked up by
    bare filename in ``img/`` and ``img/favicon/``. Anything else yields
    the 404 page.
    """
    if path.endswith(".json"):
        return send_from_directory(
            "templates/assets", path, mimetype="application/json"
        )

    if os.path.isfile("templates/assets/" + path):
        return send_from_directory("templates/assets", path)

    # Try looking in one of the directories
    filename: str = path.split("/")[-1]
    if filename.endswith(IMAGE_EXTENSIONS):
        if os.path.isfile("templates/assets/img/" + filename):
            return send_from_directory("templates/assets/img", filename)
        if os.path.isfile("templates/assets/img/favicon/" + filename):
            return send_from_directory("templates/assets/img/favicon", filename)

    return render_template("404.html"), 404


# region Special routes
@app.route("/favicon.png")
def faviconPNG():
    """Serve the site favicon."""
    return send_from_directory("templates/assets/img", "favicon.png")


@app.route("/.well-known/<path:path>")
def wellknown(path):
    """Proxy ``/.well-known/*`` requests to nathan.woodburn.au.

    The upstream body is relayed with status 200 regardless of the upstream
    status code (unchanged from the original behaviour).
    """
    # Try to proxy to https://nathan.woodburn.au/.well-known/
    req = requests.get(
        f"https://nathan.woodburn.au/.well-known/{path}",
        timeout=REQUEST_TIMEOUT,
    )
    # Upstream may omit Content-Type; fall back instead of raising KeyError.
    content_type = req.headers.get("Content-Type", "application/octet-stream")
    return make_response(req.content, 200, {"Content-Type": content_type})


# endregion


# region Main routes
@app.route("/")
def index():
    """Render the landing page with token price, supply and vault balance."""
    tokenSupply = getTokenSupplyString()
    tokenValue = getTokenPrice()
    vaultBalance = "{:.2f}".format(getVaultBalance())

    return render_template(
        "index.html", value=tokenValue, supply=tokenSupply, vault=vaultBalance
    )


@app.route("/<path:path>")
def catch_all(path: str):
    """Render a template (or send a file) matching *path*, else the 404 page.

    Tried in order: the exact template name, the name with ``.html``
    appended, the slash-stripped name with ``.html`` appended, and finally,
    for paths without a slash, any file of that name anywhere under
    ``templates``.
    """
    if os.path.isfile("templates/" + path):
        return render_template(path)

    # Try with .html
    if os.path.isfile("templates/" + path + ".html"):
        return render_template(path + ".html")

    if os.path.isfile("templates/" + path.strip("/") + ".html"):
        return render_template(path.strip("/") + ".html")

    # Try to find a file matching
    if path.count("/") < 1:
        filename = find(path, "templates")
        if filename:
            return send_file(filename)

    return render_template("404.html"), 404


# endregion


# region Solana
# NOTE: every function decorated with cache.file_cache() below is wrapped by
# the project-local ``cache`` module; its caching semantics are defined there.
@cache.file_cache()
def get_coin_price(coin_id):
    """Return the price of CoinGecko coin *coin_id* in ``fiat``.

    Coins listed in ``stablecoins`` are priced at ``usd_to_fiat`` without a
    lookup. Any lookup failure is reported as a price of 0.
    """
    try:
        if coin_id in stablecoins:
            return 1 * usd_to_fiat

        price = coingecko_client.get_price(ids=coin_id, vs_currencies=fiat)
        return price[coin_id][fiat.lower()]
    except Exception:
        # Best effort: an unknown coin or API failure is priced at 0.
        return 0


@cache.file_cache()
def get_token_price(token_address: str):
    """Return the price in ``fiat`` of the Solana token at *token_address*.

    Any lookup failure is reported as a price of 0.
    """
    try:
        price = coingecko_client.get_token_price(
            id='solana', contract_addresses=token_address, vs_currencies=fiat)
        return price[token_address][fiat.lower()]
    except Exception:
        # Best effort: an unlisted token or API failure is priced at 0.
        return 0


@cache.file_cache()
def getTokenSupplyString() -> str:
    """Return the stWDBRN supply as the RPC's UI amount string."""
    supply = solana_client.get_token_supply(stWDBRN_token_mint)
    return supply.value.ui_amount_string


@cache.file_cache()
def getTokenSupply() -> float:
    """Return the stWDBRN supply as the RPC's numeric UI amount."""
    supply = solana_client.get_token_supply(stWDBRN_token_mint)
    return supply.value.ui_amount


@cache.file_cache()
def getSolBalance() -> float:
    """Return the vault's SOL balance (lamports converted to SOL)."""
    return solana_client.get_balance(vault_sol_address).value / LAMPORTS_PER_SOL


@cache.file_cache()
def getSolValue() -> float:
    """Return the fiat value of the vault's SOL balance."""
    sol_balance = (
        solana_client.get_balance(vault_sol_address).value / LAMPORTS_PER_SOL
    )
    sol_price = get_coin_price("solana")
    return sol_balance * sol_price


@cache.file_cache()
def getVaultBalance() -> float:
    """Return the total fiat value of the vault: SOL + SPL tokens + Cardano."""
    vaultBalance = 0
    vaultBalance += getSolValue()
    vaultBalance += sum(token["value"] for token in getTokens())
    vaultBalance += getCardanoValue(vault_cardano_address)
    return vaultBalance


@cache.file_cache()
def getTokens():
    """Return the vault's SPL token holdings worth at least 0.01 in fiat.

    Each entry is a dict with the keys ``mint``, ``balance``, ``price``,
    ``value``, ``name`` and ``symbol``.
    """
    # SPL Token program ID
    programID = Pubkey.from_string(
        "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
    tokenAccounts = solana_client.get_token_accounts_by_owner(
        vault_sol_address, TokenAccountOpts(program_id=programID)
    ).value

    tokens = []
    for tokenAccount in tokenAccounts:
        account = solana_client.get_token_account_balance(tokenAccount.pubkey)
        # Decode the mint: the first 32 bytes of the account data
        mint = Pubkey(tokenAccount.account.data[:32])

        token = {
            "mint": str(mint),
            "balance": account.value.ui_amount,
        }
        token["price"] = get_token_price(token["mint"])
        token["value"] = token["price"] * token["balance"]

        # Skip dust before paying for a metadata lookup.
        if token["value"] < 0.01:
            continue

        data = getTokenData(str(mint))
        token["name"] = data["name"]
        token["symbol"] = data["symbol"]

        tokens.append(token)

    return tokens


def getTokenData(tokenMint):
    """Return CoinGecko metadata for the Solana token *tokenMint*.

    The response is stored in ``tokens/<mint>.json`` on first lookup and
    read back from that file on later calls.
    """
    os.makedirs("tokens", exist_ok=True)
    cache_path = f"tokens/{tokenMint}.json"

    if os.path.exists(cache_path):
        with open(cache_path, encoding="utf-8") as f:
            return json.load(f)

    data = coingecko_client.get_coin_info_from_contract_address_by_id(
        'solana', tokenMint)
    with open(cache_path, "w", encoding="utf-8") as f:
        json.dump(data, f)
    return data


def _per_share(amount, supply):
    """Return *amount* divided by *supply*, or 0 when supply is zero/missing."""
    return amount / supply if supply else 0


def getTokenPrice():
    """Return the value of one stWDBRN as a string with two decimals.

    The value is the vault balance divided by the token supply; a zero or
    missing supply yields ``"0.00"`` instead of raising.
    """
    # Get number of tokens minted
    supply = getTokenSupply()
    vaultBalance = getVaultBalance()

    # Round value to 2 decimal places and ensure it shows 2 decimal places
    return "{:.2f}".format(_per_share(vaultBalance, supply))


# endregion

# region Cardano
get_cardano_balance_cache = TTLCache(maxsize=1, ttl=3600)


@cache.file_cache()
def getCardanoBalance(address: str):
    """Return the ADA controlled by stake *address* according to Blockfrost.

    Returns 0 on a non-200 response, a missing ``controlled_amount`` field,
    or any request/parsing error.
    """
    try:
        response = requests.get(
            f"https://cardano-mainnet.blockfrost.io/api/v0/accounts/{address}",
            headers={"project_id": blockFrost_API},
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            print("Error getting cardano balance")
            return 0

        data = response.json()
        if "controlled_amount" in data:
            return int(data["controlled_amount"]) / LOVELACE_PER_ADA
        return 0
    except Exception:
        print("Error getting cardano balance")
        return 0


def getCardanoValue(address: str):
    """Return the fiat value of the ADA controlled by *address*."""
    balance = getCardanoBalance(address)
    price = get_coin_price("cardano")
    return balance * price


# endregion

# region API Routes
def _native_entry(name, amount, value):
    """Build the API entry for a native coin, zeroing it below 0.01 value.

    The threshold is applied to the value after rounding to two decimals.
    """
    entry = {
        "name": name,
        "amount": round(amount, 4),
        "value": round(value, 2),
    }
    if entry["value"] < 0.01:
        entry["amount"] = 0
        entry["value"] = 0
    return entry


def _token_entry(token, amount, value):
    """Build the API entry for an SPL token without mutating *token*.

    The entry carries every key of *token* except ``balance``, plus
    ``amount`` and ``value``; both are zeroed when the unrounded *value* is
    below 0.01.
    """
    entry = {key: val for key, val in token.items() if key != "balance"}
    if value < 0.01:
        entry["amount"] = 0
        entry["value"] = 0
    else:
        entry["amount"] = round(amount, 4)
        entry["value"] = round(value, 2)
    return entry


def _add_tooltips(entries):
    """Attach an ``"<amount> <KEY> ($<value>)"`` tooltip to every entry."""
    for key, entry in entries.items():
        entry["tooltip"] = f"{entry['amount']} {key} (${entry['value']})"


@app.route("/api/v1/token")
def api_token():
    """Return, as JSON, the holdings backing a single stWDBRN token.

    Every amount and value is the vault figure divided by the token supply.
    """
    # Get number of tokens minted
    supply = getTokenSupply()

    token = {}
    token["SOL"] = _native_entry(
        "Solana",
        _per_share(getSolBalance(), supply),
        _per_share(getSolValue(), supply),
    )

    for t in getTokens():
        token[t["symbol"].upper()] = _token_entry(
            t,
            _per_share(t["balance"], supply),
            _per_share(t["price"] * t["balance"], supply),
        )

    token["ADA"] = _native_entry(
        "Cardano",
        _per_share(getCardanoBalance(vault_cardano_address), supply),
        _per_share(getCardanoValue(vault_cardano_address), supply),
    )

    # Tooltips go on the holdings only; "total" is added afterwards.
    _add_tooltips(token)

    token["total"] = {
        "name": "stWDBRN",
        "description": "stWDBRN total value (USD)",
        "amount": 1,
        "value": float(getTokenPrice()),
    }

    return jsonify(token)


@app.route("/api/v1/vault")
def api_vault():
    """Return, as JSON, the vault's total holdings and their fiat value."""
    tokens = getTokens()
    vaultBalance = "{:.2f}".format(getVaultBalance())

    vault = {}
    vault["SOL"] = _native_entry("Solana", getSolBalance(), getSolValue())
    vault["ADA"] = _native_entry(
        "Cardano",
        getCardanoBalance(vault_cardano_address),
        getCardanoValue(vault_cardano_address),
    )

    for t in tokens:
        vault[t["symbol"].upper()] = _token_entry(
            t, t["balance"], t["price"] * t["balance"]
        )

    # Tooltips go on the holdings only; "total" is added afterwards.
    _add_tooltips(vault)

    vault["total"] = {
        "name": "Vault",
        "description": "Total Vault value (USD)",
        "value": vaultBalance,
    }

    return jsonify(vault)


# endregion


# region Error Catching
# 404 catch all
@app.errorhandler(404)
def not_found(e):
    """Render the custom 404 page for any unmatched route."""
    return render_template("404.html"), 404


# endregion
if __name__ == "__main__":
    # NOTE(review): debug=True together with host="0.0.0.0" exposes the
    # Werkzeug debugger on every interface; use only for local development.
    app.run(debug=True, port=5000, host="0.0.0.0")