stWDBRN/server.py
Nathan Woodburn 606d31ba17
All checks were successful
Build Docker / BuildImage (push) Successful in 32s
fix: Add tx signature verification
2024-12-10 11:20:35 +11:00

673 lines
20 KiB
Python

from functools import lru_cache
import json
from flask import (
Flask,
make_response,
redirect,
request,
jsonify,
render_template,
send_from_directory,
send_file,
)
import os
import json
import requests
from datetime import datetime
import dotenv
import solana
import solana.rpc
from solana.rpc.api import Client
import solana.rpc.api
import solana.utils
from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solana.rpc.types import TokenAccountOpts
from pycoingecko import CoinGeckoAPI
from cachetools import TTLCache
from cachetools import cached
import threading
import time
from solders.transaction import Transaction
import spl.token.client
import cache
import spl
import spl.token
from spl.token.instructions import create_associated_token_account,mint_to,MintToParams
from solana.rpc.commitment import Confirmed
from solana.rpc.api import Client
from solana.rpc.types import TxOpts
from solana.transaction import Transaction
# Load variables from a local .env file into the process environment before
# any os.getenv() call below reads them.
dotenv.load_dotenv()
app = Flask(__name__)
# RPC client for Solana; the endpoint URL comes from the SOLANA_URL variable.
solana_client = Client(os.getenv("SOLANA_URL"))
# Sent as the "project_id" header when querying the Blockfrost Cardano API.
blockFrost_API = os.getenv("BLOCKFROST")
# Mint address of the stWDBRN token (used for supply lookups and minting).
stWDBRN_token_mint = Pubkey.from_string(
"mNT61ixgiLnggJ4qf5hDCNj7vTiCqnqysosjncrBydf")
# Solana address of the vault whose SOL and token balances are valued.
vault_sol_address = Pubkey.from_string(
"NWywvhcqdkJsm1s9VVviPm9UfyDtyCW9t8kDb24PDPN")
# Cardano account queried through Blockfrost for the vault's ADA balance.
vault_cardano_address = "stake1uy4qd785pcds7ph2jue2lrhhxa698c5959375lqdv3yphcgwc8qna"
# Sui address queried through the Sui JSON-RPC API for the vault's coins.
vault_sui_address = "0x7e4fa1592e4fad084789f9fe1a4d7631a2e6477b658e777ae95351681bcbe8da"
# Quote currency passed to CoinGecko as ``vs_currencies``.
fiat = "USD"
# Multiplier applied to the fixed price of 1 used for stablecoins.
usd_to_fiat = 1
# Coin ids that get_coin_price() values at 1 * usd_to_fiat without an API call.
stablecoins = ["usdc", "usdt", "dai"]
coingecko_client = CoinGeckoAPI()
def find(name, path):
    """Return the path of the first file called *name* found under *path*.

    The tree is walked with ``os.walk``; the search stops at the first
    directory that contains a matching file name. ``None`` is returned when
    no directory contains such a file.
    """
    matches = (
        os.path.join(folder, name)
        for folder, _subdirs, filenames in os.walk(path)
        if name in filenames
    )
    return next(matches, None)
# Assets routes
@app.route("/assets/<path:path>")
def send_assets(path):
    """Serve a static asset from ``templates/assets``.

    JSON files are always sent with an ``application/json`` mimetype. Other
    files are sent directly when they exist at the requested location. Image
    files (png/jpg/jpeg/svg) that are not found there are looked up by bare
    file name in the ``img`` and ``img/favicon`` directories. Anything else
    yields the 404 page.
    """
    assets_dir = "templates/assets"

    if path.endswith(".json"):
        return send_from_directory(assets_dir, path, mimetype="application/json")

    if os.path.isfile("templates/assets/" + path):
        return send_from_directory(assets_dir, path)

    # Fall back to searching the image directories by file name only.
    filename: str = path.rsplit("/", 1)[-1]
    if filename.endswith((".png", ".jpg", ".jpeg", ".svg")):
        for image_dir in ("templates/assets/img", "templates/assets/img/favicon"):
            if os.path.isfile(image_dir + "/" + filename):
                return send_from_directory(image_dir, filename)

    return render_template("404.html"), 404
# region Special routes
@app.route("/favicon.png")
def faviconPNG():
    """Serve ``favicon.png`` from the image assets directory."""
    image_dir = "templates/assets/img"
    icon_name = "favicon.png"
    return send_from_directory(image_dir, icon_name)
@app.route("/.well-known/<path:path>")
def wellknown(path):
    """Proxy ``/.well-known/<path>`` to https://nathan.woodburn.au/.well-known/.

    The upstream body, status code and content type are relayed to the
    caller. A bounded timeout keeps a slow upstream from tying up a worker
    indefinitely; when the upstream cannot be reached a 502 is returned.
    """
    try:
        upstream = requests.get(
            f"https://nathan.woodburn.au/.well-known/{path}", timeout=10
        )
    except requests.exceptions.RequestException as e:
        print(f"Error proxying .well-known/{path}: {e}", flush=True)
        return make_response("Upstream unavailable", 502)

    # Not every response carries a Content-Type header; fall back to a
    # generic binary type instead of raising KeyError.
    content_type = upstream.headers.get("Content-Type", "application/octet-stream")
    # Relay the real status so upstream 404s are not reported as 200.
    return make_response(
        upstream.content, upstream.status_code, {"Content-Type": content_type}
    )
# endregion
# region Main routes
@app.route("/")
def index():
    """Render the landing page.

    The template receives the stWDBRN price (``value``), the formatted token
    supply (``supply``) and the vault total formatted to two decimals
    (``vault``).
    """
    tokenSupply = getTokenSupplyString()
    tokenValue = getTokenPrice()
    # getVaultBalance() already gathers the SOL and token values itself, so
    # no separate getTokens()/getSolValue() calls are needed here.
    vaultBalance = "{:.2f}".format(getVaultBalance())
    # NOTE: a per-asset pie chart used to be assembled here; the breakdown is
    # served by the /api/v1/token and /api/v1/vault endpoints instead.
    return render_template("index.html", value=tokenValue, supply=tokenSupply, vault=vaultBalance)
@app.route("/<path:path>")
def catch_all(path: str):
    """Resolve an arbitrary URL path to a template or file under ``templates``.

    Tries, in order: the path itself, the path with ``.html`` appended, and
    the slash-stripped path with ``.html`` appended. For paths without any
    ``/`` a file of that name is searched for anywhere below ``templates``
    and sent as-is. Falls back to the 404 page.
    """
    candidates = (path, path + ".html", path.strip("/") + ".html")
    for template_name in candidates:
        if os.path.isfile("templates/" + template_name):
            return render_template(template_name)

    # Only bare file names (no directory component) are searched recursively.
    if "/" not in path:
        located = find(path, "templates")
        if located:
            return send_file(located)

    return render_template("404.html"), 404
# endregion
# region Solana
@cache.file_cache()
def get_coin_price(coin_id):
    """Return the price of a CoinGecko coin id in the configured fiat.

    Coins listed in ``stablecoins`` are valued at ``1 * usd_to_fiat`` without
    contacting the API. Any lookup failure is logged and reported as a price
    of 0 so that callers can keep summing balances.
    """
    if coin_id in stablecoins:
        return 1 * usd_to_fiat
    try:
        price = coingecko_client.get_price(ids=coin_id, vs_currencies=fiat)
        return price[coin_id][fiat.lower()]
    except Exception as e:
        # Best-effort: stay non-fatal, but do not swallow KeyboardInterrupt /
        # SystemExit the way a bare ``except:`` would.
        print(f"Error getting price for {coin_id}: {e}", flush=True)
        return 0
@cache.file_cache()
def get_token_price(token_address: str, chain='solana'):
    """Return the fiat price of a token identified by its contract address.

    Args:
        token_address: Contract/mint address passed to CoinGecko.
        chain: CoinGecko platform id the address belongs to.

    Returns:
        The price in the configured fiat, or 0 when the lookup fails (the
        failure is logged).
    """
    try:
        price = coingecko_client.get_token_price(
            id=chain, contract_addresses=token_address, vs_currencies=fiat)
        return price[token_address][fiat.lower()]
    except Exception as e:
        # Best-effort: unknown tokens simply count as worthless, but
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        print(f"Error getting price for token {token_address}: {e}", flush=True)
        return 0
def getTokenSupplyString() -> str:
    """Return the stWDBRN supply formatted with two decimal places."""
    return f"{getTokenSupply():.2f}"
@cache.file_cache(60)
def getTokenSupply() -> float:
    """Return the stWDBRN supply as reported by the Solana RPC.

    The value is the ``ui_amount`` of the mint's token supply, i.e. a
    decimal-adjusted (fractional) number rather than an integer count.
    """
    supply = solana_client.get_token_supply(stWDBRN_token_mint)
    return supply.value.ui_amount
@cache.file_cache()
def getSolBalance() -> float:
    """Return the vault's native SOL balance in whole SOL."""
    # The RPC reports the balance in its smallest unit; 10**9 of them per SOL.
    lamports_per_sol = 1_000_000_000
    lamports = solana_client.get_balance(vault_sol_address).value
    return lamports / lamports_per_sol
def getSolValue() -> float:
    """Return the fiat value of the vault's SOL balance (balance * price)."""
    SOLbalance = getSolBalance()
    SOLPrice = get_coin_price("solana")
    return SOLbalance * SOLPrice
def getVaultBalance() -> float:
    """Return the total fiat value held by the vault.

    The total is the sum of the SOL value, the value of every token returned
    by ``getTokens()``, the Cardano value and the other-investments value.
    """
    vaultBalance = 0
    vaultBalance += getSolValue()
    vaultBalance += sum(token["value"] for token in getTokens())
    vaultBalance += getCardanoValue(vault_cardano_address)
    vaultBalance += getOtherInvestmentsValue()
    return vaultBalance
@cache.file_cache(300)
def getTokens(chain: str = None):
    """Return the vault's token holdings worth at least 0.01 in fiat.

    Args:
        chain: ``"solana"`` or ``"sui"`` to restrict the result to one chain;
            ``None`` (the default) includes both.

    Returns:
        A list of dicts with the keys ``mint``, ``balance``, ``price``,
        ``value``, ``name`` and ``symbol``.
    """
    tokens = []
    if chain is None or chain == "solana":
        programID = Pubkey.from_string(
            "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
        tokenAccounts = solana_client.get_token_accounts_by_owner(
            vault_sol_address,
            TokenAccountOpts(program_id=programID)
        ).value
        for tokenAccount in tokenAccounts:
            account = solana_client.get_token_account_balance(tokenAccount.pubkey)
            # The mint address is taken from the first 32 bytes of the
            # token account's data.
            mint = Pubkey(tokenAccount.account.data[:32])
            token = {
                "mint": str(mint),
                # ui_amount may be missing; treat that as an empty balance
                # instead of failing on the multiplication below.
                "balance": account.value.ui_amount or 0,
            }
            token["price"] = get_token_price(token["mint"])
            token["value"] = token["price"] * token["balance"]
            # Skip dust before paying for a metadata lookup.
            if token["value"] < 0.01:
                continue
            data = getTokenData(str(mint))
            token["name"] = data["name"]
            token["symbol"] = data["symbol"]
            tokens.append(token)
    if chain is None or chain == "sui":
        tokens.extend(getSuiTokens(vault_sui_address))
    return tokens
def getTokenData(tokenMint, chain='solana'):
    """Return CoinGecko metadata for a token, cached on disk.

    Metadata is stored as ``tokens/<tokenMint>.json``. A readable cache file
    is returned as-is; otherwise the data is fetched from CoinGecko for the
    given ``chain`` and written to the cache.

    Args:
        tokenMint: Contract/mint address of the token.
        chain: CoinGecko platform id used for the lookup on a cache miss.
    """
    # exist_ok avoids the check-then-create race between concurrent requests.
    os.makedirs("tokens", exist_ok=True)
    cache_path = f"tokens/{tokenMint}.json"
    try:
        with open(cache_path) as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        # Missing or truncated cache entry: fall through and refetch.
        pass
    data = coingecko_client.get_coin_info_from_contract_address_by_id(
        chain, tokenMint)
    with open(cache_path, "w") as f:
        json.dump(data, f)
    return data
def getTokenPrice():
    """Return the value of one stWDBRN as a string with two decimals.

    The price is the total vault balance divided by the token supply.
    """
    minted = getTokenSupply()
    total_value = getVaultBalance()
    return f"{total_value / minted:.2f}"
# endregion
# region Cardano
# NOTE(review): not referenced anywhere else in the visible source; kept in
# case code outside this view relies on it.
get_cardano_balance_cache = TTLCache(maxsize=1, ttl=3600)


@cache.file_cache(300)
def getCardanoBalance(address: str):
    """Return the ADA controlled by a Cardano account, via Blockfrost.

    Args:
        address: Account address placed in the Blockfrost accounts URL.

    Returns:
        The ``controlled_amount`` divided by 1,000,000, or 0 when the request
        fails, returns a non-200 status or lacks that field.
    """
    try:
        response = requests.get(
            f"https://cardano-mainnet.blockfrost.io/api/v0/accounts/{address}",
            headers={"project_id": blockFrost_API},
            # Without a timeout a stalled upstream would block the request
            # handler indefinitely.
            timeout=15,
        )
        if response.status_code != 200:
            print("Error getting cardano balance")
            return 0
        data = response.json()
        if "controlled_amount" in data:
            return int(data["controlled_amount"]) / 1000000
        return 0
    except Exception:
        # Best-effort: report 0 on any failure, but let KeyboardInterrupt /
        # SystemExit propagate (a bare ``except:`` would swallow them).
        print("Error getting cardano balance")
        return 0
def getCardanoValue(address: str):
    """Return the fiat value of the ADA held by *address* (balance * price)."""
    return getCardanoBalance(address) * get_coin_price("cardano")
# endregion
# region Sui
@cache.file_cache(300)
def getSuiTokens(address: str):
    """Return the Sui coins held by *address* that are worth at least 0.01.

    Balances are fetched with the ``suix_getAllBalances`` JSON-RPC method.
    Each entry has the keys ``mint`` (the coin type), ``balance``, ``price``,
    ``value``, ``name`` and ``symbol``. On a request failure the tokens
    collected so far (possibly none) are returned.
    """
    url = "https://fullnode.mainnet.sui.io/"
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "suix_getAllBalances",
        "params": [
            address
        ]
    }
    headers = {
        "Content-Type": "application/json"
    }
    tokens = []
    try:
        # Bounded wait so a stalled full node cannot hang the caller.
        response = requests.post(url, json=payload, headers=headers, timeout=15)
        response.raise_for_status()  # Raise an HTTPError for bad responses
        result = response.json()
        if 'result' not in result:
            # JSON-RPC failures arrive with HTTP 200 and no "result" member.
            print(f"An error occurred: {result.get('error')}")
        for coin in result.get('result', []):
            token = {
                "mint": coin['coinType'],
                # NOTE(review): every coin is scaled by 10**9; confirm that
                # all coin types held by the vault use 9 decimals.
                "balance": int(coin['totalBalance'])/10**9,
            }
            token["price"] = get_token_price(token["mint"], 'sui')
            token["value"] = token["price"] * token["balance"]
            if token["value"] < 0.01:
                continue
            data = getTokenData(str(token["mint"]), 'sui')
            token["name"] = data["name"]
            token["symbol"] = data["symbol"]
            tokens.append(token)
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
    return tokens
# endregion
# region Other Investments
@cache.file_cache(300)
def getOtherInvestments():
    """Download and return the list of other (off-chain/DeFi) investments.

    Raises:
        requests.exceptions.RequestException: if the download times out or
            the server answers with an error status, so that an error page is
            never parsed (or cached) as investment data.
    """
    response = requests.get(
        "https://cloud.woodburn.au/s/stwdbrn_other/download/other_investments.json",
        timeout=15,
    )
    response.raise_for_status()
    return response.json()
def getOtherInvestmentTypes():
    """Group the other investments by their ``type``.

    Returns a dict keyed by type; each entry carries the type as ``name`` and
    ``description``, the summed ``value`` and the number of positions as
    ``amount``.
    """
    summary = {}
    for entry in getOtherInvestments():
        kind = entry["type"]
        bucket = summary.setdefault(kind, {
            "name": kind,
            "description": kind,
            "value": 0,
            "amount": 0,
        })
        bucket["value"] += entry["value"]
        bucket["amount"] += 1
    return summary
def getOtherInvestmentsValue():
    """Return the summed ``value`` of all other investments."""
    return sum(entry["value"] for entry in getOtherInvestments())
# endregion
# region API Routes
@app.route("/api/v1/tokens")
def api_tokens():
    """Return the vault's Solana tokens as JSON, each with an explorer URL."""
    solana_tokens = getTokens("solana")
    for entry in solana_tokens:
        entry.update(url=f"https://explorer.solana.com/address/{entry['mint']}")
    return jsonify(solana_tokens)
@app.route("/api/v1/token")
def api_token():
    """Return, as JSON, what a single stWDBRN token is backed by.

    Every asset amount and value is divided by the token supply. Entries
    whose per-token value is below 0.01 are reported with amount and value 0.
    The response also contains one entry per other-investment type and a
    ``total`` entry holding the token price.
    """
    supply = getTokenSupply()
    breakdown = {}

    sol_entry = {
        "name": "Solana",
        "amount": round(getSolBalance() / supply, 4),
        "value": round(getSolValue() / supply, 2),
    }
    if sol_entry["value"] < 0.01:
        sol_entry["amount"] = 0
        sol_entry["value"] = 0
    breakdown["SOL"] = sol_entry

    for held in getTokens():
        breakdown[held["symbol"].upper()] = held
        if held["value"] / supply < 0.01:
            share_amount, share_value = 0, 0
        else:
            share_amount = held["balance"] / supply
            share_value = held["price"] * held["balance"] / supply
        # Values are rounded to 2 decimals, amounts to 4.
        held["value"] = round(share_value, 2)
        held["amount"] = round(share_amount, 4)
        # The raw vault-wide balance is not part of the per-token view.
        del held["balance"]

    ada_entry = {
        "name": "Cardano",
        "amount": round(getCardanoBalance(vault_cardano_address) / supply, 4),
        "value": round(getCardanoValue(vault_cardano_address) / supply, 2),
    }
    if ada_entry["value"] < 0.01:
        ada_entry["amount"] = 0
        ada_entry["value"] = 0
    breakdown["ADA"] = ada_entry

    # Tooltips are attached to the asset entries collected so far.
    for key, entry in breakdown.items():
        entry["tooltip"] = f"{entry['amount']} {key} (${entry['value']})"

    for type_name, info in getOtherInvestmentTypes().items():
        breakdown[type_name] = {
            "name": f'{info["name"]} Positions',
            "description": info["description"],
            "value": round(info["value"] / supply, 2),
            "amount": info["amount"],
            "tooltip": f"{info['amount']} Positions (${info['value']})",
        }

    breakdown["total"] = {
        "name": "stWDBRN",
        "description": "stWDBRN total value (USD)",
        "amount": 1,
        "value": float(getTokenPrice()),
    }
    return jsonify(breakdown)
@app.route("/api/v1/vault")
def api_vault():
    """Return, as JSON, everything the vault holds and its total value.

    Asset entries worth less than 0.01 are reported with amount and value 0.
    The response also contains one entry per other-investment type and a
    ``total`` entry whose value is the vault balance formatted to two
    decimals.
    """
    tokens = getTokens()
    vaultBalance = f"{getVaultBalance():.2f}"
    holdings = {}

    sol_entry = {
        "name": "Solana",
        "amount": round(getSolBalance(), 4),
        "value": round(getSolValue(), 2),
    }
    if sol_entry["value"] < 0.01:
        sol_entry["amount"] = 0
        sol_entry["value"] = 0
    holdings["SOL"] = sol_entry

    ada_entry = {
        "name": "Cardano",
        "amount": round(getCardanoBalance(vault_cardano_address), 4),
        "value": round(getCardanoValue(vault_cardano_address), 2),
    }
    if ada_entry["value"] < 0.01:
        ada_entry["amount"] = 0
        ada_entry["value"] = 0
    holdings["ADA"] = ada_entry

    for held in tokens:
        holdings[held["symbol"].upper()] = held
        if held["value"] < 0.01:
            held_amount, held_value = 0, 0
        else:
            held_amount = held["balance"]
            held_value = held["price"] * held["balance"]
        # Values are rounded to 2 decimals, amounts to 4.
        held["value"] = round(held_value, 2)
        held["amount"] = round(held_amount, 4)
        # "amount" replaces the raw balance key in the response.
        del held["balance"]

    # Tooltips are attached to the asset entries collected so far.
    for key, entry in holdings.items():
        entry["tooltip"] = f"{entry['amount']} {key} (${entry['value']})"

    for type_name, info in getOtherInvestmentTypes().items():
        holdings[type_name] = {
            "name": f'{info["name"]} Positions',
            "description": info["description"],
            "value": info["value"],
            "amount": info["amount"],
            "tooltip": f"{info['amount']} Positions (${info['value']})",
        }

    holdings["total"] = {
        "name": "Vault",
        "description": "Total Vault value (USD)",
        "value": vaultBalance,
    }
    return jsonify(holdings)
@app.route("/api/v1/other")
@app.route("/api/v1/defi")
def api_other_investments():
    """Return the raw list of other investments as JSON."""
    return jsonify(getOtherInvestments())
@app.route("/api/v1/deposit",methods=["POST"])
def api_deposit():
    """Webhook endpoint that processes deposit transactions.

    The ``authorization`` header must equal the ``DEPOSIT_HEADER``
    environment variable. The JSON body must be a list of transaction
    objects, which is handed to ``parseDeposit`` and echoed back.

    Returns:
        401 when the header is missing or wrong, 400 when the body is not a
        JSON array, otherwise the submitted data as JSON.
    """
    import hmac

    auth = request.headers.get("authorization")
    if not auth:
        return jsonify({"error": "Missing authorization header"}), 401
    expected = os.getenv("DEPOSIT_HEADER")
    # Constant-time comparison so the secret cannot be probed through
    # response timing; bytes are used because compare_digest rejects
    # non-ASCII str arguments. An unset/empty secret rejects everything.
    if not expected or not hmac.compare_digest(auth.encode(), expected.encode()):
        return jsonify({"error": "Invalid authorization header"}), 401

    data = request.get_json()
    # parseDeposit iterates over transactions; anything but a list would be
    # silently misinterpreted (e.g. iterating the keys of an object).
    if not isinstance(data, list):
        return jsonify({"error": "Expected a JSON array of transactions"}), 400
    parseDeposit(data)
    return jsonify(data)
def _claim_signature(signature, ledger_path="cache/txs.json"):
    """Record *signature* in the processed-transactions ledger.

    Returns True when the signature was not seen before (and is now stored),
    False when it is already present in the ledger.
    """
    os.makedirs(os.path.dirname(ledger_path), exist_ok=True)
    try:
        with open(ledger_path) as f:
            seen = json.load(f)
    except FileNotFoundError:
        seen = []
    if signature in seen:
        return False
    seen.append(signature)
    with open(ledger_path, "w") as f:
        json.dump(seen, f)
    return True


def parseDeposit(data):
    """Mint stWDBRN for every transfer into the vault found in *data*.

    Args:
        data: Iterable of transaction dicts. Only transactions carrying both
            ``nativeTransfers`` and ``tokenTransfers`` are considered;
            transactions whose JSON contains ``Memo`` and transactions whose
            ``signature`` was already processed are skipped.

    For each native or token transfer addressed to the vault, the deposited
    fiat value is converted to stWDBRN at the current token price and minted
    to the sender.
    """
    stWDBRN_price = float(getTokenPrice())
    if stWDBRN_price <= 0:
        # Bail out before any signature is recorded: dividing by a zero price
        # would crash after the transaction was already marked as processed.
        print("Skipping deposits as the stWDBRN price is not positive", flush=True)
        return

    vault_address = str(vault_sol_address)
    for tx in data:
        if 'nativeTransfers' not in tx:
            continue
        if 'tokenTransfers' not in tx:
            continue
        # Skip memo txs
        if 'Memo' in json.dumps(tx):
            print(f"Skipping deposit as it contains memo: {tx.get('description')}")
            continue

        signature = tx['signature']
        if not _claim_signature(signature):
            print(f"Skipping duplicate tx: {signature}")
            continue

        for transfer in tx['nativeTransfers']:
            if transfer['toUserAccount'] != vault_address:
                continue
            # Native amounts are divided by 10**9 to obtain whole SOL.
            solAmount = transfer['amount'] / 1000000000
            solValue = get_coin_price("solana") * solAmount
            stWDBRN_amount = round(solValue / stWDBRN_price, 9)
            mint_stWDBRN(stWDBRN_amount, transfer['fromUserAccount'])

        for transfer in tx['tokenTransfers']:
            if transfer['toUserAccount'] != vault_address:
                continue
            token_price = get_token_price(transfer['mint'])
            USDvalue = transfer['tokenAmount'] * token_price
            stWDBRN_amount = round(USDvalue / stWDBRN_price, 9)
            mint_stWDBRN(stWDBRN_amount, transfer['fromUserAccount'])
def mint_stWDBRN(amount, to_user_account):
    """Mint *amount* stWDBRN to the token account of *to_user_account*.

    Amounts below 0.5 are skipped. Amounts above 10 are reduced by a 0.05
    minting fee. The mint authority keypair is read from the ``WALLET``
    environment variable (a JSON array of byte values). If the recipient has
    no token account one is created; if it has more than one, nothing is
    minted.

    Args:
        amount: Number of stWDBRN (in whole tokens) to mint.
        to_user_account: Base58 address of the recipient wallet.
    """
    if amount < 0.5:
        print(f"Skipping minting of {amount} stWDBRN to {to_user_account} as it is less than 0.5", flush=True)
        return
    # Small fee for minting
    if amount > 10:
        amount = amount - 0.05
    print(f"Minting {amount} stWDBRN to {to_user_account}", flush=True)

    TOKEN_PROGRAM_ID = Pubkey.from_string("TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb")
    walletbytes = [int(i) for i in json.loads(os.getenv("WALLET"))]
    wallet_keypair = Keypair.from_bytes(walletbytes)
    token = spl.token.client.Token(solana_client,stWDBRN_token_mint,TOKEN_PROGRAM_ID,wallet_keypair)

    to_Pubkey = Pubkey.from_string(to_user_account)
    # Check if account exists
    account = token.get_accounts_by_owner(to_Pubkey)
    if len(account.value) > 1:
        print("ERROR getting token account")
        return
    if len(account.value) == 0:
        print("NEED TO MINT ACCOUNT")
        # Create account
        to_Pubkey = token.create_account(to_Pubkey)
        print(f"Created token account {to_Pubkey}")
    else:
        to_Pubkey = account.value[0].pubkey

    # Convert to base units (9 decimals). round() rather than int() so that
    # binary float error (e.g. 0.57 * 10**9 == 569999999.9999999) does not
    # truncate the minted amount by one unit.
    base_units = round(amount * 10**9)
    print(token.mint_to(to_Pubkey,wallet_keypair,base_units))
# endregion
# region Error Catching
# 404 catch all
@app.errorhandler(404)
def not_found(e):
    """Render the custom 404 page for any unmatched request."""
    page = render_template("404.html")
    return page, 404
# endregion
if __name__ == "__main__":
    # The interactive debugger allows arbitrary code execution, so it must
    # not be enabled unconditionally while listening on all interfaces.
    # Opt in explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.getenv("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(debug=debug_enabled, port=5000, host="0.0.0.0")