from functools import lru_cache import json from flask import ( Flask, make_response, redirect, request, jsonify, render_template, send_from_directory, send_file, ) import os import json import requests from datetime import datetime import dotenv import solana import solana.rpc from solana.rpc.api import Client import solana.rpc.api from solders.pubkey import Pubkey from solana.rpc.types import TokenAccountOpts from pycoingecko import CoinGeckoAPI from cachetools import TTLCache from cachetools import cached dotenv.load_dotenv() app = Flask(__name__) solana_client = Client(os.environ["SOLANA_URL"]) token = Pubkey.from_string("mNT61ixgiLnggJ4qf5hDCNj7vTiCqnqysosjncrBydf") vault = Pubkey.from_string("NWywvhcqdkJsm1s9VVviPm9UfyDtyCW9t8kDb24PDPN") fiat = "USD" usd_to_fiat = 1 stablecoins = ["usdc", "usdt", "dai"] coingecko_client = CoinGeckoAPI() def find(name, path): for root, dirs, files in os.walk(path): if name in files: return os.path.join(root, name) # Assets routes @app.route("/assets/") def send_assets(path): if path.endswith(".json"): return send_from_directory( "templates/assets", path, mimetype="application/json" ) if os.path.isfile("templates/assets/" + path): return send_from_directory("templates/assets", path) # Try looking in one of the directories filename: str = path.split("/")[-1] if ( filename.endswith(".png") or filename.endswith(".jpg") or filename.endswith(".jpeg") or filename.endswith(".svg") ): if os.path.isfile("templates/assets/img/" + filename): return send_from_directory("templates/assets/img", filename) if os.path.isfile("templates/assets/img/favicon/" + filename): return send_from_directory("templates/assets/img/favicon", filename) return render_template("404.html"), 404 # region Special routes @app.route("/favicon.png") def faviconPNG(): return send_from_directory("templates/assets/img", "favicon.png") @app.route("/.well-known/") def wellknown(path): # Try to proxy to https://nathan.woodburn.au/.well-known/ req = requests.get(f"https://nathan.woodburn.au/.well-known/{path}") return make_response( req.content, 200, {"Content-Type": req.headers["Content-Type"]} ) # endregion # region Main routes @app.route("/") def index(): tokenSupply = getTokenSupplyString() tokenValue = getTokenPrice() tokens = getTokens() solValue = getSolValue() pie_chart_data = [(token['symbol'].upper(), token['value'], f"{token['name']}: ${'{:.2f}'.format(token['value'])}") for token in tokens] pie_chart_data.append(("SOL", solValue, f"Solana: ${"{:.2f}".format(solValue)}")) return render_template("index.html", value=tokenValue, supply=tokenSupply, pie_chart=pie_chart_data) @app.route("/") def catch_all(path: str): if os.path.isfile("templates/" + path): return render_template(path) # Try with .html if os.path.isfile("templates/" + path + ".html"): return render_template(path + ".html") if os.path.isfile("templates/" + path.strip("/") + ".html"): return render_template(path.strip("/") + ".html") # Try to find a file matching if path.count("/") < 1: # Try to find a file matching filename = find(path, "templates") if filename: return send_file(filename) return render_template("404.html"), 404 # endregion # region Solana coin_price_cache = TTLCache(maxsize=100, ttl=3600) @cached(coin_price_cache) def get_coin_price(coin_id): try: if coin_id in stablecoins: return 1 * usd_to_fiat price = coingecko_client.get_price(ids=coin_id, vs_currencies=fiat) return price[coin_id][fiat.lower()] except: return 0 token_price_cache = TTLCache(maxsize=100, ttl=3600) @cached(token_price_cache) def get_token_price(token_address:str): try: price = coingecko_client.get_token_price(id='solana',contract_addresses=token_address,vs_currencies=fiat) return price[token_address][fiat.lower()] except: return 0 token_supply_str_cache = TTLCache(maxsize=1, ttl=3600) @cached(token_supply_str_cache) def getTokenSupplyString() -> str: supply = solana_client.get_token_supply(token) return supply.value.ui_amount_string token_supply_cache = TTLCache(maxsize=1, ttl=3600) @cached(token_supply_cache) def getTokenSupply() -> int: supply = solana_client.get_token_supply(token) return supply.value.ui_amount sol_value_cache = TTLCache(maxsize=1, ttl=3600) @cached(sol_value_cache) def getSolValue() -> int: SOLbalance = solana_client.get_balance(vault).value / 1000000000 SOLPrice = get_coin_price("solana") return SOLbalance * SOLPrice vault_balance_cache = TTLCache(maxsize=1, ttl=3600) @cached(vault_balance_cache) def getVaultBalance() -> int: # Get balance of vault SOLbalance = getSolValue() tokens = getTokens() tokenValue = 0 for token in tokens: tokenValue += token["value"] print(tokens) return SOLbalance + tokenValue get_tokens_cache = TTLCache(maxsize=1, ttl=3600) @cached(get_tokens_cache) def getTokens(): programID = Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") tokenAccounts = solana_client.get_token_accounts_by_owner( vault, TokenAccountOpts(program_id=programID) ) tokenAccounts = tokenAccounts.value tokens = [] for tokenAccount in tokenAccounts: pubkey = tokenAccount.pubkey account = solana_client.get_token_account_balance(pubkey) mint = tokenAccount.account.data[:32] mint = Pubkey(mint) # Decode the mint token = { "mint": str(mint), "balance": account.value.ui_amount } token["price"] = get_token_price(token["mint"]) token["value"] = token["price"] * token["balance"] data = getTokenData(str(mint)) token["name"] = data["name"] token["symbol"] = data["symbol"] tokens.append(token) return tokens def getTokenData(tokenMint): if not os.path.exists("tokens"): os.makedirs("tokens") if os.path.exists(f"tokens/{tokenMint}.json"): with open(f"tokens/{tokenMint}.json") as f: data = json.load(f) return data else: data = coingecko_client.get_coin_info_from_contract_address_by_id('solana', tokenMint) with open(f"tokens/{tokenMint}.json", "w") as f: json.dump(data, f) return data def getTokenPrice(): # Get number of tokens minted supply = getTokenSupply() vaultBalance = getVaultBalance() value = vaultBalance/supply # Round value to 2 decimal places and ensure it shows 2 decimal places value = "{:.2f}".format(value) return value # endregion # region Error Catching # 404 catch all @app.errorhandler(404) def not_found(e): return render_template("404.html"), 404 # endregion if __name__ == "__main__": app.run(debug=True, port=5000, host="0.0.0.0")