generated from nathanwoodburn/python-webserver-template
Nathan Woodburn
7aebf2b92f
All checks were successful
Build Docker / BuildImage (push) Successful in 34s
442 lines
12 KiB
Python
442 lines
12 KiB
Python
from functools import lru_cache
|
|
import json
|
|
from flask import (
|
|
Flask,
|
|
make_response,
|
|
redirect,
|
|
request,
|
|
jsonify,
|
|
render_template,
|
|
send_from_directory,
|
|
send_file,
|
|
)
|
|
import os
|
|
import json
|
|
import requests
|
|
from datetime import datetime
|
|
import dotenv
|
|
import solana
|
|
import solana.rpc
|
|
from solana.rpc.api import Client
|
|
import solana.rpc.api
|
|
from solders.pubkey import Pubkey
|
|
from solana.rpc.types import TokenAccountOpts
|
|
from pycoingecko import CoinGeckoAPI
|
|
from cachetools import TTLCache
|
|
from cachetools import cached
|
|
import threading
|
|
import time
|
|
import cache
|
|
|
|
dotenv.load_dotenv()
|
|
|
|
app = Flask(__name__)
|
|
|
|
solana_client = Client(os.getenv("SOLANA_URL"))
|
|
blockFrost_API = os.getenv("BLOCKFROST")
|
|
stWDBRN_token_mint = Pubkey.from_string(
|
|
"mNT61ixgiLnggJ4qf5hDCNj7vTiCqnqysosjncrBydf")
|
|
vault_sol_address = Pubkey.from_string(
|
|
"NWywvhcqdkJsm1s9VVviPm9UfyDtyCW9t8kDb24PDPN")
|
|
vault_cardano_address = "stake1uy4qd785pcds7ph2jue2lrhhxa698c5959375lqdv3yphcgwc8qna"
|
|
|
|
fiat = "USD"
|
|
usd_to_fiat = 1
|
|
stablecoins = ["usdc", "usdt", "dai"]
|
|
|
|
coingecko_client = CoinGeckoAPI()
|
|
|
|
|
|
def find(name, path):
|
|
for root, dirs, files in os.walk(path):
|
|
if name in files:
|
|
return os.path.join(root, name)
|
|
|
|
# Assets routes
|
|
|
|
|
|
@app.route("/assets/<path:path>")
|
|
def send_assets(path):
|
|
if path.endswith(".json"):
|
|
return send_from_directory(
|
|
"templates/assets", path, mimetype="application/json"
|
|
)
|
|
|
|
if os.path.isfile("templates/assets/" + path):
|
|
return send_from_directory("templates/assets", path)
|
|
|
|
# Try looking in one of the directories
|
|
filename: str = path.split("/")[-1]
|
|
if (
|
|
filename.endswith(".png")
|
|
or filename.endswith(".jpg")
|
|
or filename.endswith(".jpeg")
|
|
or filename.endswith(".svg")
|
|
):
|
|
if os.path.isfile("templates/assets/img/" + filename):
|
|
return send_from_directory("templates/assets/img", filename)
|
|
if os.path.isfile("templates/assets/img/favicon/" + filename):
|
|
return send_from_directory("templates/assets/img/favicon", filename)
|
|
|
|
return render_template("404.html"), 404
|
|
|
|
|
|
# region Special routes
|
|
@app.route("/favicon.png")
|
|
def faviconPNG():
|
|
return send_from_directory("templates/assets/img", "favicon.png")
|
|
|
|
|
|
@app.route("/.well-known/<path:path>")
|
|
def wellknown(path):
|
|
# Try to proxy to https://nathan.woodburn.au/.well-known/
|
|
req = requests.get(f"https://nathan.woodburn.au/.well-known/{path}")
|
|
return make_response(
|
|
req.content, 200, {"Content-Type": req.headers["Content-Type"]}
|
|
)
|
|
|
|
|
|
# endregion
|
|
|
|
|
|
# region Main routes
|
|
@app.route("/")
|
|
def index():
|
|
tokenSupply = getTokenSupplyString()
|
|
tokenValue = getTokenPrice()
|
|
tokens = getTokens()
|
|
solValue = getSolValue()
|
|
vaultBalance = getVaultBalance()
|
|
vaultBalance = "{:.2f}".format(vaultBalance)
|
|
|
|
# For testing
|
|
# tokenSupply = 20
|
|
# tokenValue = 1.01
|
|
# tokens = [{"symbol":"stWDBRN","name":"Stake With Us","value":1.01}]
|
|
# solValue = 10
|
|
# vaultBalance = "20.00"
|
|
|
|
pie_chart_data = [(token['symbol'].upper(), token['value'], f"{token['name']}: ${'{:.2f}'.format(token['value'])}") for token in tokens]
|
|
pie_chart_data.append(("SOL", solValue, f"Solana: ${'{:.2f}'.format(solValue)}"))
|
|
|
|
cardanoBalance = getCardanoValue(vault_cardano_address)
|
|
cardanoBalance = "{:.2f}".format(cardanoBalance)
|
|
pie_chart_data.append(
|
|
("ADA", cardanoBalance, f"Cardano: ${cardanoBalance}"))
|
|
|
|
return render_template("index.html", value=tokenValue, supply=tokenSupply, vault=vaultBalance)
|
|
|
|
|
|
@app.route("/<path:path>")
|
|
def catch_all(path: str):
|
|
if os.path.isfile("templates/" + path):
|
|
return render_template(path)
|
|
|
|
# Try with .html
|
|
if os.path.isfile("templates/" + path + ".html"):
|
|
return render_template(path + ".html")
|
|
|
|
if os.path.isfile("templates/" + path.strip("/") + ".html"):
|
|
return render_template(path.strip("/") + ".html")
|
|
|
|
# Try to find a file matching
|
|
if path.count("/") < 1:
|
|
# Try to find a file matching
|
|
filename = find(path, "templates")
|
|
if filename:
|
|
return send_file(filename)
|
|
|
|
return render_template("404.html"), 404
|
|
|
|
# endregion
|
|
|
|
|
|
# region Solana
|
|
@cache.file_cache()
|
|
def get_coin_price(coin_id):
|
|
|
|
try:
|
|
if coin_id in stablecoins:
|
|
return 1 * usd_to_fiat
|
|
|
|
price = coingecko_client.get_price(ids=coin_id, vs_currencies=fiat)
|
|
|
|
return price[coin_id][fiat.lower()]
|
|
except:
|
|
return 0
|
|
|
|
@cache.file_cache()
|
|
def get_token_price(token_address: str):
|
|
try:
|
|
price = coingecko_client.get_token_price(
|
|
id='solana', contract_addresses=token_address, vs_currencies=fiat)
|
|
return price[token_address][fiat.lower()]
|
|
except:
|
|
return 0
|
|
|
|
@cache.file_cache()
|
|
def getTokenSupplyString() -> str:
|
|
supply = solana_client.get_token_supply(stWDBRN_token_mint)
|
|
return supply.value.ui_amount_string
|
|
|
|
@cache.file_cache()
|
|
def getTokenSupply() -> int:
|
|
supply = solana_client.get_token_supply(stWDBRN_token_mint)
|
|
return supply.value.ui_amount
|
|
|
|
@cache.file_cache()
|
|
def getSolBalance() -> int:
|
|
SOLbalance = solana_client.get_balance(
|
|
vault_sol_address).value / 1000000000
|
|
return SOLbalance
|
|
|
|
@cache.file_cache()
|
|
def getSolValue() -> int:
|
|
SOLbalance = solana_client.get_balance(
|
|
vault_sol_address).value / 1000000000
|
|
SOLPrice = get_coin_price("solana")
|
|
return SOLbalance * SOLPrice
|
|
|
|
@cache.file_cache()
|
|
def getVaultBalance() -> int:
|
|
# Get balance of vault
|
|
vaultBalance = 0
|
|
vaultBalance += getSolValue()
|
|
tokens = getTokens()
|
|
tokenValue = 0
|
|
for token in tokens:
|
|
tokenValue += token["value"]
|
|
|
|
vaultBalance += tokenValue
|
|
vaultBalance += getCardanoValue(vault_cardano_address)
|
|
|
|
return vaultBalance
|
|
|
|
@cache.file_cache()
|
|
def getTokens():
|
|
programID = Pubkey.from_string(
|
|
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
|
|
|
|
tokenAccounts = solana_client.get_token_accounts_by_owner(
|
|
vault_sol_address,
|
|
TokenAccountOpts(program_id=programID)
|
|
)
|
|
|
|
tokenAccounts = tokenAccounts.value
|
|
tokens = []
|
|
|
|
for tokenAccount in tokenAccounts:
|
|
pubkey = tokenAccount.pubkey
|
|
account = solana_client.get_token_account_balance(pubkey)
|
|
mint = tokenAccount.account.data[:32]
|
|
mint = Pubkey(mint)
|
|
# Decode the mint
|
|
token = {
|
|
"mint": str(mint),
|
|
"balance": account.value.ui_amount
|
|
}
|
|
|
|
token["price"] = get_token_price(token["mint"])
|
|
token["value"] = token["price"] * token["balance"]
|
|
|
|
if token["value"] < 0.01:
|
|
continue
|
|
|
|
data = getTokenData(str(mint))
|
|
token["name"] = data["name"]
|
|
token["symbol"] = data["symbol"]
|
|
tokens.append(token)
|
|
|
|
return tokens
|
|
|
|
def getTokenData(tokenMint):
|
|
if not os.path.exists("tokens"):
|
|
os.makedirs("tokens")
|
|
|
|
if os.path.exists(f"tokens/{tokenMint}.json"):
|
|
with open(f"tokens/{tokenMint}.json") as f:
|
|
data = json.load(f)
|
|
return data
|
|
else:
|
|
data = coingecko_client.get_coin_info_from_contract_address_by_id(
|
|
'solana', tokenMint)
|
|
with open(f"tokens/{tokenMint}.json", "w") as f:
|
|
json.dump(data, f)
|
|
return data
|
|
|
|
|
|
def getTokenPrice():
|
|
# Get number of tokens minted
|
|
supply = getTokenSupply()
|
|
vaultBalance = getVaultBalance()
|
|
value = vaultBalance/supply
|
|
# Round value to 2 decimal places and ensure it shows 2 decimal places
|
|
value = "{:.2f}".format(value)
|
|
|
|
return value
|
|
|
|
|
|
# endregion
|
|
|
|
# region Cardano
|
|
get_cardano_balance_cache = TTLCache(maxsize=1, ttl=3600)
|
|
|
|
|
|
@cache.file_cache()
|
|
def getCardanoBalance(address: str):
|
|
# Get balance of cardano address
|
|
try:
|
|
response = requests.get(f"https://cardano-mainnet.blockfrost.io/api/v0/accounts/{address}", headers={"project_id": blockFrost_API})
|
|
if response.status_code != 200:
|
|
print("Error getting cardano balance")
|
|
return 0
|
|
data = response.json()
|
|
if "controlled_amount" in data:
|
|
return int(data["controlled_amount"]) / 1000000
|
|
return 0
|
|
|
|
except:
|
|
print("Error getting cardano balance")
|
|
return 0
|
|
|
|
|
|
def getCardanoValue(address: str):
|
|
balance = getCardanoBalance(address)
|
|
price = get_coin_price("cardano")
|
|
return balance * price
|
|
|
|
# endregion
|
|
|
|
# region API Routes
|
|
|
|
@app.route("/api/v1/token")
|
|
def api_token():
|
|
# Get number of tokens minted
|
|
supply = getTokenSupply()
|
|
token = {}
|
|
# Get balance of vault
|
|
token["SOL"] = {
|
|
"name": "Solana",
|
|
"amount": round(getSolBalance() / supply,4),
|
|
"value": round(getSolValue() / supply,2)
|
|
}
|
|
if token["SOL"]["value"] < 0.01:
|
|
token["SOL"]["amount"] = 0
|
|
token["SOL"]["value"] = 0
|
|
|
|
tokens = getTokens()
|
|
for t in tokens:
|
|
token[t["symbol"].upper()] = t
|
|
if token[t["symbol"].upper()]["value"]/supply < 0.01:
|
|
token[t["symbol"].upper()]["amount"] = 0
|
|
token[t["symbol"].upper()]["value"] = 0
|
|
else:
|
|
token[t["symbol"].upper()]["amount"] = t["balance"] / supply
|
|
token[t["symbol"].upper()]["value"] = t["price"] * t["balance"] / supply
|
|
# Round value to 4 decimal places
|
|
token[t["symbol"].upper()]["value"] = round(token[t["symbol"].upper()]["value"], 2)
|
|
token[t["symbol"].upper()]["amount"] = round(token[t["symbol"].upper()]["amount"], 4)
|
|
|
|
# Remove balance key
|
|
del token[t["symbol"].upper()]["balance"]
|
|
|
|
|
|
|
|
token["ADA"] = {
|
|
"name": "Cardano",
|
|
"amount": round(getCardanoBalance(vault_cardano_address) / supply,4),
|
|
"value": round(getCardanoValue(vault_cardano_address) / supply,2)
|
|
}
|
|
if token["ADA"]["value"] < 0.01:
|
|
token["ADA"]["amount"] = 0
|
|
token["ADA"]["value"] = 0
|
|
|
|
# For each key add tooltip
|
|
for key in token:
|
|
token[key]["tooltip"] = f"{token[key]['amount']} {key} (${token[key]['value']})"
|
|
|
|
|
|
token["total"] = {
|
|
"name": "stWDBRN",
|
|
"description": "stWDBRN total value (USD)",
|
|
"amount": 1,
|
|
"value":float(getTokenPrice())
|
|
}
|
|
|
|
|
|
return jsonify(token)
|
|
|
|
@app.route("/api/v1/vault")
|
|
def api_vault():
|
|
tokens = getTokens()
|
|
vaultBalance = getVaultBalance()
|
|
vaultBalance = "{:.2f}".format(vaultBalance)
|
|
|
|
|
|
vault = {}
|
|
vault["SOL"] = {
|
|
"name": "Solana",
|
|
"amount": round(getSolBalance(),4),
|
|
"value": round(getSolValue(),2)
|
|
}
|
|
if vault["SOL"]["value"] < 0.01:
|
|
vault["SOL"]["amount"] = 0
|
|
vault["SOL"]["value"] = 0
|
|
|
|
vault["ADA"] = {
|
|
"name": "Cardano",
|
|
"amount": round(getCardanoBalance(vault_cardano_address),4),
|
|
"value": round(getCardanoValue(vault_cardano_address),2)
|
|
}
|
|
if vault["ADA"]["value"] < 0.01:
|
|
vault["ADA"]["amount"] = 0
|
|
vault["ADA"]["value"] = 0
|
|
|
|
for t in tokens:
|
|
vault[t["symbol"].upper()] = t
|
|
if vault[t["symbol"].upper()]["value"] < 0.01:
|
|
vault[t["symbol"].upper()]["amount"] = 0
|
|
vault[t["symbol"].upper()]["value"] = 0
|
|
else:
|
|
vault[t["symbol"].upper()]["amount"] = t["balance"]
|
|
vault[t["symbol"].upper()]["value"] = t["price"] * t["balance"]
|
|
# Round value to 4 decimal places
|
|
vault[t["symbol"].upper()]["value"] = round(vault[t["symbol"].upper()]["value"], 2)
|
|
vault[t["symbol"].upper()]["amount"] = round(vault[t["symbol"].upper()]["amount"], 4)
|
|
|
|
# Remove balance key
|
|
del vault[t["symbol"].upper()]["balance"]
|
|
|
|
# For each key add tooltip
|
|
for key in vault:
|
|
vault[key]["tooltip"] = f"{vault[key]['amount']} {key} (${vault[key]['value']})"
|
|
|
|
vault["total"] = {
|
|
"name": "Vault",
|
|
"description": "Total Vault value (USD)",
|
|
"value": vaultBalance
|
|
}
|
|
|
|
return jsonify(vault)
|
|
|
|
|
|
|
|
|
|
# endregion
|
|
|
|
|
|
# region Error Catching
|
|
# 404 catch all
|
|
|
|
|
|
@app.errorhandler(404)
|
|
def not_found(e):
|
|
return render_template("404.html"), 404
|
|
|
|
|
|
# endregion
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app.run(debug=True, port=5000, host="0.0.0.0")
|