# stWDBRN/server.py

2024-12-04 18:59:04 +11:00
from functools import lru_cache
2024-12-04 16:35:47 +11:00
import json
from flask import (
Flask,
make_response,
redirect,
request,
jsonify,
render_template,
send_from_directory,
send_file,
)
import os
import json
import requests
from datetime import datetime
import dotenv
2024-12-04 18:59:04 +11:00
import solana
import solana.rpc
from solana.rpc.api import Client
import solana.rpc.api
2024-12-05 19:20:47 +11:00
import solana.utils
2024-12-04 18:59:04 +11:00
from solders.pubkey import Pubkey
2024-12-05 19:20:47 +11:00
from solders.keypair import Keypair
2024-12-04 18:59:04 +11:00
from solana.rpc.types import TokenAccountOpts
from pycoingecko import CoinGeckoAPI
from cachetools import TTLCache
from cachetools import cached
2024-12-05 12:53:05 +11:00
import threading
import time
2024-12-05 19:20:47 +11:00
from solders.transaction import Transaction
import spl.token.client
2024-12-05 12:53:05 +11:00
import cache
2024-12-05 19:20:47 +11:00
import spl
import spl.token
from spl.token.instructions import create_associated_token_account,mint_to,MintToParams
from solana.rpc.commitment import Confirmed
from solana.rpc.api import Client
from solana.rpc.types import TxOpts
from solana.transaction import Transaction
2024-12-04 16:35:47 +11:00
# Load variables from a local .env file before the os.getenv() lookups below.
dotenv.load_dotenv()
app = Flask(__name__)

# External service handles; both values are read from the environment.
solana_client = Client(os.getenv("SOLANA_URL"))
blockFrost_API = os.getenv("BLOCKFROST")

# Addresses of the token mint and of the vault wallets valued by this module.
stWDBRN_token_mint = Pubkey.from_string("mNT61ixgiLnggJ4qf5hDCNj7vTiCqnqysosjncrBydf")
vault_sol_address = Pubkey.from_string("NWywvhcqdkJsm1s9VVviPm9UfyDtyCW9t8kDb24PDPN")
vault_cardano_address = "stake1uy4qd785pcds7ph2jue2lrhhxa698c5959375lqdv3yphcgwc8qna"

# Pricing configuration: quote currency, the multiplier applied to stablecoin
# prices, and the coin ids that are priced at a fixed 1 * usd_to_fiat.
fiat = "USD"
usd_to_fiat = 1
stablecoins = ["usdc", "usdt", "dai"]
coingecko_client = CoinGeckoAPI()
2024-12-04 16:35:47 +11:00
def find(name, path):
    """Return the path of the first file named ``name`` found under ``path``.

    The tree is traversed with ``os.walk``; ``None`` is returned when no
    directory contains a file with exactly that name.
    """
    matches = (
        os.path.join(folder, name)
        for folder, _subdirs, filenames in os.walk(path)
        if name in filenames
    )
    return next(matches, None)
# Assets routes
2024-12-05 12:53:05 +11:00
2024-12-04 16:35:47 +11:00
@app.route("/assets/<path:path>")
def send_assets(path):
    """Serve a file from ``templates/assets``.

    JSON files are sent with an explicit ``application/json`` mimetype. When
    the requested path does not exist, image requests (.png/.jpg/.jpeg/.svg)
    are retried by bare filename in the ``img`` and ``img/favicon``
    directories. Anything else yields the 404 page.
    """
    assets_dir = "templates/assets"
    if path.endswith(".json"):
        return send_from_directory(assets_dir, path, mimetype="application/json")

    if os.path.isfile("templates/assets/" + path):
        return send_from_directory(assets_dir, path)

    # Fall back to the image directories, matching on the filename only.
    filename: str = path.split("/")[-1]
    if filename.endswith((".png", ".jpg", ".jpeg", ".svg")):
        for image_dir in ("templates/assets/img", "templates/assets/img/favicon"):
            if os.path.isfile(image_dir + "/" + filename):
                return send_from_directory(image_dir, filename)

    return render_template("404.html"), 404
# region Special routes
@app.route("/favicon.png")
def faviconPNG():
    """Serve ``favicon.png`` from the image assets directory."""
    image_dir = "templates/assets/img"
    return send_from_directory(image_dir, "favicon.png")
@app.route("/.well-known/<path:path>")
def wellknown(path):
    """Proxy ``/.well-known/<path>`` from https://nathan.woodburn.au.

    The upstream body is returned with status 200 and the upstream
    ``Content-Type``. If the upstream cannot be reached within the timeout a
    502 response is returned instead of letting the exception surface as a 500.
    """
    try:
        # A timeout keeps a slow upstream from tying up a worker indefinitely.
        upstream = requests.get(
            f"https://nathan.woodburn.au/.well-known/{path}", timeout=10
        )
    except requests.RequestException as exc:
        print(f"Error proxying .well-known/{path}: {exc}", flush=True)
        return make_response("Upstream unavailable", 502)

    # Not every response carries a Content-Type header; fall back to a generic one.
    content_type = upstream.headers.get("Content-Type", "application/octet-stream")
    return make_response(upstream.content, 200, {"Content-Type": content_type})
# endregion
# region Main routes
@app.route("/")
def index():
    """Render the landing page.

    The template receives the token price (``value``), the formatted token
    supply (``supply``) and the vault total formatted to two decimals
    (``vault``). The per-asset chart data that used to be assembled here was
    never handed to the template, so it is no longer computed.
    """
    tokenSupply = getTokenSupplyString()
    tokenValue = getTokenPrice()
    vaultBalance = "{:.2f}".format(getVaultBalance())
    return render_template(
        "index.html", value=tokenValue, supply=tokenSupply, vault=vaultBalance
    )
2024-12-04 16:35:47 +11:00
@app.route("/<path:path>")
def catch_all(path: str):
    """Serve any template (or file under ``templates``) matching ``path``.

    Lookup order: the path as given, the path with ``.html`` appended, the
    slash-stripped path with ``.html`` appended, and finally (for paths with
    no slash) the first file of that name anywhere under ``templates``.
    Unmatched paths get the 404 page.
    """
    # A ".." segment can satisfy os.path.isfile() for files outside the
    # templates directory and then make render_template() raise, turning the
    # request into a 500. Treat such paths as not found up front.
    if ".." in path.replace("\\", "/").split("/"):
        return render_template("404.html"), 404

    for candidate in (path, path + ".html", path.strip("/") + ".html"):
        if os.path.isfile("templates/" + candidate):
            return render_template(candidate)

    # Bare filenames may live in any sub-directory of templates.
    if path.count("/") < 1:
        filename = find(path, "templates")
        if filename:
            return send_file(filename)

    return render_template("404.html"), 404
# endregion
2024-12-04 18:59:04 +11:00
# region Solana
2024-12-05 12:53:05 +11:00
@cache.file_cache()
def get_coin_price(coin_id):
    """Return the price of ``coin_id`` in the configured ``fiat`` currency.

    Ids listed in ``stablecoins`` are priced at ``1 * usd_to_fiat`` without a
    lookup. Any failure while querying CoinGecko is logged and reported as a
    price of 0 (best-effort, as before).
    """
    if coin_id in stablecoins:
        return 1 * usd_to_fiat

    try:
        price = coingecko_client.get_price(ids=coin_id, vs_currencies=fiat)
        return price[coin_id][fiat.lower()]
    except Exception as exc:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate.
        print(f"Error getting price for {coin_id}: {exc}", flush=True)
        return 0
2024-12-05 12:53:05 +11:00
@cache.file_cache()
def get_token_price(token_address: str):
    """Return the price of a Solana token, by mint address, in ``fiat``.

    Any failure while querying CoinGecko (including an unknown token) is
    logged and reported as a price of 0 (best-effort, as before).
    """
    try:
        price = coingecko_client.get_token_price(
            id='solana', contract_addresses=token_address, vs_currencies=fiat)
        return price[token_address][fiat.lower()]
    except Exception as exc:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate.
        print(f"Error getting token price for {token_address}: {exc}", flush=True)
        return 0
def getTokenSupplyString() -> str:
    """Return the stWDBRN supply formatted with exactly two decimals."""
    return "{:.2f}".format(getTokenSupply())
2024-12-04 18:59:04 +11:00
2024-12-05 12:53:05 +11:00
@cache.file_cache()
def getTokenSupply() -> float:
    """Return the stWDBRN supply as reported by the RPC node's ``ui_amount``.

    The value is the UI amount of ``get_token_supply`` for the stWDBRN mint,
    which is a float rather than the ``int`` previously annotated.
    """
    response = solana_client.get_token_supply(stWDBRN_token_mint)
    return response.value.ui_amount
2024-12-05 13:38:29 +11:00
@cache.file_cache()
def getSolBalance() -> float:
    """Return the vault's SOL balance.

    The raw balance reported by the RPC node is divided by 1,000,000,000, so
    the result is a float (the previous ``int`` annotation was wrong).
    """
    raw_balance = solana_client.get_balance(vault_sol_address).value
    return raw_balance / 1000000000
2024-12-05 12:53:05 +11:00
@cache.file_cache()
def getSolValue() -> float:
    """Return the fiat value of the vault's SOL balance.

    The raw balance is divided by 1,000,000,000 and multiplied by the
    ``"solana"`` price from ``get_coin_price``. The result is a float (the
    previous ``int`` annotation was wrong).
    """
    sol_balance = solana_client.get_balance(vault_sol_address).value / 1000000000
    sol_price = get_coin_price("solana")
    return sol_balance * sol_price
2024-12-05 12:53:05 +11:00
@cache.file_cache()
def getVaultBalance() -> float:
    """Return the total fiat value held by the vault.

    The total is the sum of the SOL value, the ``value`` of every entry
    returned by ``getTokens()`` and the value of the Cardano stake address.
    """
    sol_value = getSolValue()
    token_value = sum(token["value"] for token in getTokens())
    cardano_value = getCardanoValue(vault_cardano_address)
    return sol_value + token_value + cardano_value
2024-12-04 18:59:04 +11:00
2024-12-05 12:53:05 +11:00
@cache.file_cache()
def getTokens():
    """Return the vault's token holdings worth at least 0.01 in ``fiat``.

    Each entry is a dict with the keys ``mint``, ``balance``, ``price``,
    ``value``, ``name`` and ``symbol``. Holdings valued below 0.01 are
    skipped before any metadata lookup is made for them.
    """
    programID = Pubkey.from_string(
        "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
    response = solana_client.get_token_accounts_by_owner(
        vault_sol_address,
        TokenAccountOpts(program_id=programID)
    )

    tokens = []
    for tokenAccount in response.value:
        account = solana_client.get_token_account_balance(tokenAccount.pubkey)
        # The first 32 bytes of the account data are decoded as the mint address.
        mint = str(Pubkey(tokenAccount.account.data[:32]))

        # ui_amount can be None; treat that as an empty balance instead of
        # failing on the multiplication below.
        balance = account.value.ui_amount or 0
        price = get_token_price(mint)
        value = price * balance
        if value < 0.01:
            continue

        data = getTokenData(mint)
        tokens.append({
            "mint": mint,
            "balance": balance,
            "price": price,
            "value": value,
            "name": data["name"],
            "symbol": data["symbol"],
        })
    return tokens
def getTokenData(tokenMint):
    """Return CoinGecko metadata for a Solana token, cached on disk.

    Metadata is stored as ``tokens/<tokenMint>.json``. A readable cache file
    is returned as-is; otherwise the data is fetched from CoinGecko, written
    to the cache and returned. A cache file that does not contain valid JSON
    (for example after an interrupted write) is ignored and refreshed.
    """
    cache_path = f"tokens/{tokenMint}.json"
    # exist_ok avoids the check-then-create race between concurrent requests.
    os.makedirs("tokens", exist_ok=True)

    if os.path.exists(cache_path):
        try:
            with open(cache_path, encoding="utf-8") as f:
                return json.load(f)
        except ValueError:
            # json.JSONDecodeError is a ValueError; fall through and refetch.
            print(f"Ignoring corrupt token cache {cache_path}", flush=True)

    data = coingecko_client.get_coin_info_from_contract_address_by_id(
        'solana', tokenMint)
    with open(cache_path, "w", encoding="utf-8") as f:
        json.dump(data, f)
    return data
2024-12-04 18:59:04 +11:00
def getTokenPrice():
    """Return the value of one stWDBRN as a string with two decimals.

    The price is the vault balance divided by the token supply. When the
    supply is zero or unavailable the price is reported as ``"0.00"`` instead
    of raising on the division.
    """
    supply = getTokenSupply()
    if not supply:
        return "0.00"
    return "{:.2f}".format(getVaultBalance() / supply)
# endregion
2024-12-05 12:26:58 +11:00
# region Cardano
get_cardano_balance_cache = TTLCache(maxsize=1, ttl=3600)
2024-12-05 12:53:05 +11:00
@cache.file_cache()
def getCardanoBalance(address: str):
    """Return the balance controlled by a Cardano stake address.

    The Blockfrost ``accounts/<address>`` endpoint is queried and its
    ``controlled_amount`` is divided by 1,000,000. Any failure (network
    error, non-200 status, missing field, malformed body) is logged and
    reported as 0.
    """
    try:
        # A timeout keeps a stalled Blockfrost call from blocking the request.
        response = requests.get(
            f"https://cardano-mainnet.blockfrost.io/api/v0/accounts/{address}",
            headers={"project_id": blockFrost_API},
            timeout=10,
        )
        if response.status_code != 200:
            print("Error getting cardano balance")
            return 0

        data = response.json()
        if "controlled_amount" in data:
            return int(data["controlled_amount"]) / 1000000
        return 0
    except Exception as exc:
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate.
        print("Error getting cardano balance")
        print(exc, flush=True)
        return 0
2024-12-05 12:53:05 +11:00
2024-12-05 12:26:58 +11:00
def getCardanoValue(address: str):
    """Return the fiat value of the balance held by a Cardano stake address."""
    ada_amount = getCardanoBalance(address)
    return ada_amount * get_coin_price("cardano")
# endregion
2024-12-05 13:38:29 +11:00
# region API Routes
@app.route("/api/v1/token")
def api_token():
    """Return the assets backing a single stWDBRN token as JSON.

    Every vault holding is divided by the token supply. Each asset entry has
    an ``amount`` (rounded to 4 decimals), a ``value`` (rounded to 2
    decimals) and a ``tooltip``; entries worth less than 0.01 per token are
    zeroed. A final ``total`` entry carries the price of one token.
    """
    supply = getTokenSupply()
    token = {}

    token["SOL"] = {
        "name": "Solana",
        "amount": round(getSolBalance() / supply, 4),
        "value": round(getSolValue() / supply, 2)
    }
    if token["SOL"]["value"] < 0.01:
        token["SOL"]["amount"] = 0
        token["SOL"]["value"] = 0

    for t in getTokens():
        # Work on a copy: getTokens() is cached, so editing its dicts in place
        # could corrupt values seen by later callers.
        entry = dict(t)
        if t["value"] / supply < 0.01:
            entry["amount"] = 0
            entry["value"] = 0
        else:
            entry["amount"] = round(t["balance"] / supply, 4)
            entry["value"] = round(t["price"] * t["balance"] / supply, 2)
        # The raw vault balance is not part of the per-token response.
        del entry["balance"]
        token[t["symbol"].upper()] = entry

    token["ADA"] = {
        "name": "Cardano",
        "amount": round(getCardanoBalance(vault_cardano_address) / supply, 4),
        "value": round(getCardanoValue(vault_cardano_address) / supply, 2)
    }
    if token["ADA"]["value"] < 0.01:
        token["ADA"]["amount"] = 0
        token["ADA"]["value"] = 0

    # Tooltips are added before "total" so that entry does not get one.
    for key in token:
        token[key]["tooltip"] = f"{token[key]['amount']} {key} (${token[key]['value']})"

    token["total"] = {
        "name": "stWDBRN",
        "description": "stWDBRN total value (USD)",
        "amount": 1,
        "value": float(getTokenPrice())
    }
    return jsonify(token)
@app.route("/api/v1/vault")
def api_vault():
    """Return the full contents of the vault as JSON.

    Each asset entry has an ``amount`` (rounded to 4 decimals), a ``value``
    (rounded to 2 decimals) and a ``tooltip``; entries worth less than 0.01
    are zeroed. A final ``total`` entry carries the vault value as a string
    formatted to two decimals.
    """
    tokens = getTokens()
    vaultBalance = "{:.2f}".format(getVaultBalance())
    vault = {}

    vault["SOL"] = {
        "name": "Solana",
        "amount": round(getSolBalance(), 4),
        "value": round(getSolValue(), 2)
    }
    if vault["SOL"]["value"] < 0.01:
        vault["SOL"]["amount"] = 0
        vault["SOL"]["value"] = 0

    vault["ADA"] = {
        "name": "Cardano",
        "amount": round(getCardanoBalance(vault_cardano_address), 4),
        "value": round(getCardanoValue(vault_cardano_address), 2)
    }
    if vault["ADA"]["value"] < 0.01:
        vault["ADA"]["amount"] = 0
        vault["ADA"]["value"] = 0

    for t in tokens:
        # Work on a copy: getTokens() is cached, so editing its dicts in place
        # could corrupt values seen by later callers.
        entry = dict(t)
        if t["value"] < 0.01:
            entry["amount"] = 0
            entry["value"] = 0
        else:
            entry["amount"] = round(t["balance"], 4)
            entry["value"] = round(t["price"] * t["balance"], 2)
        # "amount" replaces the raw "balance" key in the response.
        del entry["balance"]
        vault[t["symbol"].upper()] = entry

    # Tooltips are added before "total" so that entry does not get one.
    for key in vault:
        vault[key]["tooltip"] = f"{vault[key]['amount']} {key} (${vault[key]['value']})"

    vault["total"] = {
        "name": "Vault",
        "description": "Total Vault value (USD)",
        "value": vaultBalance
    }
    return jsonify(vault)
2024-12-05 19:20:47 +11:00
@app.route("/api/v1/deposit", methods=["POST"])
def api_deposit():
    """Receive a batch of transactions and mint stWDBRN for vault deposits.

    The request must carry an ``authorization`` header equal to the
    ``DEPOSIT_HEADER`` environment variable and a JSON array body. The parsed
    body is passed to ``parseDeposit`` and echoed back as JSON.

    Returns 401 when the header is missing or wrong and 400 when the body is
    not a JSON array.
    """
    import hmac

    auth = request.headers.get("authorization")
    if not auth:
        return jsonify({"error": "Missing authorization header"}), 401

    expected = os.getenv("DEPOSIT_HEADER")
    # compare_digest runs in constant time, so response timing does not leak
    # how much of the secret matched. An unset secret rejects everything.
    if not expected or not hmac.compare_digest(
        auth.encode("utf-8"), expected.encode("utf-8")
    ):
        return jsonify({"error": "Invalid authorization header"}), 401

    data = request.get_json()
    # parseDeposit iterates over transactions; reject anything else up front
    # rather than failing part-way through the minting logic.
    if not isinstance(data, list):
        return jsonify({"error": "Expected a JSON array of transactions"}), 400

    parseDeposit(data)
    return jsonify(data)
def parseDeposit(data):
    """Mint stWDBRN for every transfer into the vault found in ``data``.

    ``data`` is an iterable of transaction dicts. Transactions lacking
    ``nativeTransfers`` or ``tokenTransfers``, or whose JSON contains the text
    ``Memo``, are skipped. For each native or token transfer whose
    ``toUserAccount`` is the vault, the transferred value is converted to
    stWDBRN at the current token price and minted to ``fromUserAccount``.
    """
    stWDBRN_price = float(getTokenPrice())
    if stWDBRN_price <= 0:
        # Dividing by a zero price would raise; without a price no amount can
        # be derived, so nothing is minted.
        print("Skipping deposits as stWDBRN price is not positive", flush=True)
        return

    vault_address = str(vault_sol_address)
    for tx in data:
        if 'nativeTransfers' not in tx or 'tokenTransfers' not in tx:
            continue

        # Skip memo txs
        if 'Memo' in json.dumps(tx):
            print("Skipping deposit as it contains memo")
            print(json.dumps(tx, indent=4))
            continue

        for transfer in tx['nativeTransfers']:
            if transfer['toUserAccount'] != vault_address:
                continue
            solAmount = transfer['amount'] / 1000000000
            solValue = get_coin_price("solana") * solAmount
            stWDBRN_amount = round(solValue / stWDBRN_price, 9)
            mint_stWDBRN(stWDBRN_amount, transfer['fromUserAccount'])

        for transfer in tx['tokenTransfers']:
            if transfer['toUserAccount'] != vault_address:
                continue
            token_price = get_token_price(transfer['mint'])
            USDvalue = transfer['tokenAmount'] * token_price
            stWDBRN_amount = round(USDvalue / stWDBRN_price, 9)
            mint_stWDBRN(stWDBRN_amount, transfer['fromUserAccount'])
def mint_stWDBRN(amount, to_user_account):
    """Mint ``amount`` stWDBRN to the token account of ``to_user_account``.

    Amounts below 0.5 are skipped. The signing keypair is read from the
    ``WALLET`` environment variable (a JSON array of byte values). If the
    recipient has no stWDBRN token account one is created; if more than one
    is found nothing is minted.

    ``amount`` is scaled by 10**9 to obtain the integer passed to ``mint_to``
    (assumes the mint uses 9 decimals - confirm against the mint).
    """
    if amount < 0.5:
        print(f"Skipping minting of {amount} stWDBRN to {to_user_account} as it is less than 0.5", flush=True)
        return
    print(f"Minting {amount} stWDBRN to {to_user_account}", flush=True)

    TOKEN_PROGRAM_ID = Pubkey.from_string("TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb")
    walletbytes = [int(value) for value in json.loads(os.getenv("WALLET"))]
    wallet_keypair = Keypair.from_bytes(walletbytes)
    token = spl.token.client.Token(solana_client, stWDBRN_token_mint, TOKEN_PROGRAM_ID, wallet_keypair)
    owner = Pubkey.from_string(to_user_account)

    # Find (or create) the recipient's token account for this mint.
    accounts = token.get_accounts_by_owner(owner).value
    if len(accounts) > 1:
        print("ERROR getting token account")
        return
    if len(accounts) == 0:
        print("NEED TO MINT ACCOUNT")
        destination = token.create_account(owner)
        print(f"Created token account {destination}")
    else:
        destination = accounts[0].pubkey

    # round() rather than int(): float error in amount * 10**9 can land just
    # below the intended integer, and truncation would mint one unit short.
    base_units = round(amount * 10**9)
    print(token.mint_to(destination, wallet_keypair, base_units))
2024-12-05 13:38:29 +11:00
# endregion
2024-12-04 16:35:47 +11:00
# region Error Catching
# 404 catch all
2024-12-05 12:53:05 +11:00
2024-12-04 16:35:47 +11:00
@app.errorhandler(404)
def not_found(e):
    """Render the custom 404 page for any request Flask reports as not found."""
    page = render_template("404.html")
    return page, 404
# endregion
2024-12-05 12:53:05 +11:00
2024-12-04 16:35:47 +11:00
if __name__ == "__main__":
    # The interactive debugger lets anyone who can reach the server execute
    # code, and this server binds to every interface. Keep debug off unless
    # explicitly requested with FLASK_DEBUG=1/true/yes.
    debug_enabled = os.getenv("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(debug=debug_enabled, port=5000, host="0.0.0.0")