generated from nathanwoodburn/python-webserver-template
Nathan Woodburn
6ff1afd46d
All checks were successful
Build Docker / BuildImage (push) Successful in 51s
274 lines
9.6 KiB
Python
274 lines
9.6 KiB
Python
import json
|
|
import requests
|
|
import os
|
|
import time
|
|
from datetime import datetime
|
|
from price import get_historical_fiat_price
|
|
import dotenv
|
|
|
|
# Load variables from a local .env file (if present) into the process
# environment before any setting below is read.
dotenv.load_dotenv()

# Currency code used when labelling rendered fiat amounts.
# This is None when the "fiat" environment variable is not set.
fiat = os.environ.get("fiat")
|
|
|
|
# Chain Data
# Static metadata describing this chain module.
info = dict(
    name="Solana",
    ticker="SOL",
    description="Solana Chain",
    version="1.0",
    author="Nathan.Woodburn/",
    APIInfo="This chain uses helius RPC service. Please provide a helius API key.",
)
|
|
|
|
|
|
# Token mint address -> price identifier handed to get_historical_fiat_price().
# Mints that are not listed here are reported with an "Unknown" fiat value.
# NOTE(review): the identifiers look like price-API coin ids; confirm that
# "usdc" and "jupiter" resolve to the intended assets in the price module.
known_tokens = {
    "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v": "usdc",
    "cbbtcf3aa214zXHbiAZQwf4122FBYbraNdFqgw4iMij": "coinbase-wrapped-btc",
    "Grass7B4RdKfBCjTKgSqnXkqjwiGvQyFbuSCUJr3XXjs": "grass",
    "J1toso1uCk3RLmjorhTtrVwY9HJ7X8V9yYac6Y7kGCPn": "jito-staked-sol",
    "3NZ9JMVBmGAqocybic2c7LQCJScmgsAZ6vQqTDzcqmJh": "bitcoin",
    "So11111111111111111111111111111111111111112": "solana",
    "27G8MtK7VtTcCHkpASjSDdkWWYfoqT6ggEuKidVJidD4": "jupiter-perpetuals-liquidity-provider-token",
    "JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN": "jupiter",
}
|
|
# Token mint address -> human-readable display name.
# getTokenName() prefers an entry here over the upper-cased price identifier
# from known_tokens.
other_token_names = {
    "jtojtomepa8beP8AuQc6eXt5FriJwfFMwQx2v2f9mCL": "JITO",
    "9YZ2syoQHvMeksp4MYZoYMtLyFWkkyBgAsVuuJzSZwVu": "WDBRNT",
    "J1toso1uCk3RLmjorhTtrVwY9HJ7X8V9yYac6Y7kGCPn": "JitoSOL",
    "8HWnGWTAXiFFtNsZwgk2AyvbUqqt8gcDVVcRVCZCfXC1": "Nathan.Woodburn/",
    "cbbtcf3aa214zXHbiAZQwf4122FBYbraNdFqgw4iMij": "cbBTC",
    "3NZ9JMVBmGAqocybic2c7LQCJScmgsAZ6vQqTDzcqmJh": "wBTC (Wormhole)",
    "So11111111111111111111111111111111111111112": "wSOL",
    "27G8MtK7VtTcCHkpASjSDdkWWYfoqT6ggEuKidVJidD4": "JLP",
    "JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN": "JUP",
}
|
|
|
|
# Base58 alphabet (no 0, O, I or l) used by Solana account addresses.
_BASE58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def validateAddress(address: str):
    """Return True when *address* is a well-formed Solana account address.

    A Solana address is the Base58 encoding of a 32-byte public key, which
    is always 32 to 44 characters long. The check is purely syntactic: it
    does not contact the network or verify that the account exists.

    Args:
        address: Candidate address string.

    Returns:
        True if the string decodes to exactly 32 bytes, otherwise False.
    """
    if not isinstance(address, str) or not 32 <= len(address) <= 44:
        return False

    value = 0
    for char in address:
        digit = _BASE58_ALPHABET.find(char)
        if digit < 0:
            return False
        value = value * 58 + digit

    # Each leading "1" encodes one leading zero byte that the integer value
    # above cannot represent.
    leading_zero_bytes = len(address) - len(address.lstrip("1"))
    payload_bytes = (value.bit_length() + 7) // 8
    return leading_zero_bytes + payload_bytes == 32
|
|
|
|
def importAddress(address: str):
    """Add *address* to the local Solana address store.

    The store is the JSON file ``chain_data/solana.json``, holding a list of
    ``{"address", "txs", "lastSynced"}`` entries. The file and its directory
    are created on first use. Importing an address that is already stored is
    a no-op.

    Args:
        address: Address to track.

    Returns:
        Always True.
    """
    store_path = "chain_data/solana.json"

    addresses = []
    if os.path.exists(store_path):
        with open(store_path, "r") as f:
            addresses = json.load(f)
    else:
        # Without this, the first import fails when the directory is missing.
        os.makedirs(os.path.dirname(store_path), exist_ok=True)

    if any(entry["address"] == address for entry in addresses):
        return True

    addresses.append({"address": address, "txs": [], "lastSynced": 0})
    with open(store_path, "w") as f:
        json.dump(addresses, f)
    return True
|
|
|
|
|
|
def listAddresses():
    """Return the addresses currently held in ``chain_data/solana.json``.

    Returns:
        A list of address strings in stored order, or an empty list when the
        store file does not exist yet (nothing has been imported).
    """
    store_path = "chain_data/solana.json"
    if not os.path.exists(store_path):
        return []

    with open(store_path, "r") as f:
        entries = json.load(f)
    return [entry["address"] for entry in entries]
|
|
|
|
def deleteAddress(address: str):
    """Remove *address* from ``chain_data/solana.json``.

    Args:
        address: Address to stop tracking.

    Returns:
        True if an entry was removed and the file rewritten; False if the
        address was not stored or the store file does not exist.
    """
    store_path = "chain_data/solana.json"
    if not os.path.exists(store_path):
        return False

    with open(store_path, "r") as f:
        addresses = json.load(f)

    remaining = [entry for entry in addresses if entry["address"] != address]
    if len(remaining) == len(addresses):
        return False

    with open(store_path, "w") as f:
        json.dump(remaining, f)
    return True
|
|
|
|
def addAPIKey(key: str):
    """Store the Helius API key in ``api_keys/solana.txt``.

    Surrounding whitespace (for example a pasted trailing newline) is
    removed, because sync() sends the stored key as a URL query parameter.
    The ``api_keys`` directory is created if it does not exist.

    Args:
        key: API key text.

    Returns:
        Always True.
    """
    key_path = "api_keys/solana.txt"
    os.makedirs(os.path.dirname(key_path), exist_ok=True)
    with open(key_path, "w") as f:
        f.write(key.strip())
    return True
|
|
|
|
def sync():
    """Download the transaction history of every stored address from Helius.

    For each address in ``chain_data/solana.json`` the Helius
    ``/v0/addresses/<address>/transactions`` endpoint is paged through,
    using the last signature of each page as the ``before`` cursor, until an
    empty page is returned. The store is then rewritten with the downloaded
    transactions and the current time as ``lastSynced``.

    If the download for an address fails part-way (network error or non-200
    response), the error is printed and the entry already on disk for that
    address is kept unchanged, so a failed sync never replaces stored
    history with partial data.

    Returns:
        False if the address store or the API key file is missing,
        otherwise True.
    """
    store_path = "chain_data/solana.json"
    key_path = "api_keys/solana.txt"

    if not os.path.exists(store_path):
        return False
    with open(store_path, "r") as f:
        addresses = json.load(f)

    # Get the Helius API key
    if not os.path.exists(key_path):
        return False
    with open(key_path, "r") as f:
        # A stray newline in the key file would corrupt the query string.
        apiKey = f.read().strip()

    allTxs = []
    for address in addresses:
        print("Checking address: " + address["address"])
        url = "https://api.helius.xyz/v0/addresses/" + address["address"] + "/transactions/"
        params = {"api-key": apiKey}
        transactions = []
        complete = False

        while True:
            try:
                resp = requests.get(url, params=params, timeout=30)
            except requests.RequestException as err:
                print("Error syncing Solana chain")
                print(err)
                break

            if resp.status_code != 200:
                print("Error syncing Solana chain")
                print(resp.status_code)
                print(resp.text)
                break

            page = resp.json()
            if not page:
                # An empty page marks the end of the history.
                complete = True
                break

            transactions.extend(page)
            params["before"] = page[-1]["signature"]
            print("Checking page...")

        if complete:
            allTxs.append({
                "address": address["address"],
                "txs": transactions,
                "lastSynced": time.time()
            })
        else:
            # Keep what was stored before rather than a truncated history.
            allTxs.append(address)

    with open(store_path, "w") as f:
        json.dump(allTxs, f)

    # Parse transactions. The results are discarded here.
    # NOTE(review): presumably this pre-fetches historical prices so that
    # later rendering is fast - confirm against the price module.
    for address in allTxs:
        for tx in address["txs"]:
            parseTransaction(tx, address["address"])

    return True
|
|
|
|
def getTransactions():
    """Return the parsed transactions of every stored address.

    Reads ``chain_data/solana.json`` and runs parseTransaction() on each
    stored raw transaction, address by address in stored order.

    Returns:
        A flat list of parsed transaction dicts; empty when the store file
        does not exist or holds no transactions.
    """
    store_path = "chain_data/solana.json"
    if not os.path.exists(store_path):
        return []

    with open(store_path, "r") as f:
        addresses = json.load(f)

    return [
        parseTransaction(tx, entry["address"])
        for entry in addresses
        for tx in entry["txs"]
    ]
|
|
|
|
def parseTransaction(tx, address):
    """Summarise one raw transaction from the point of view of *address*.

    Args:
        tx: Raw transaction dict; its "signature", "timestamp", "type" and
            "description" keys are read here, further keys by the helpers.
        address: The tracked address the changes are computed for.

    Returns:
        A dict with the keys "hash", "timestamp", "SOLChange",
        "TokenChange", "FiatChange", "type" and "description".
    """
    nativeDelta = getSOLChange(tx, address)
    tokenDeltas = getTokenChange(tx, address)
    fiatDelta = getFiatChange(tx, address)

    return dict(
        hash=tx["signature"],
        timestamp=tx["timestamp"],
        SOLChange=nativeDelta,
        TokenChange=tokenDeltas,
        FiatChange=fiatDelta,
        type=tx["type"],
        description=tx["description"],
    )
|
|
|
|
def getSOLChange(tx, address):
    """Return the native SOL balance change of *address* in *tx*.

    Args:
        tx: Raw transaction dict. Its "accountData" list holds entries with
            "account" and "nativeBalanceChange" (an amount in lamports).
        address: The tracked address.

    Returns:
        The balance change in SOL as a float, or 0 when the address does
        not appear in the transaction or "accountData" is absent.
    """
    lamports_per_sol = 1_000_000_000
    for account in tx.get("accountData") or []:
        if account["account"] == address:
            # Dividing is correctly rounded; multiplying by the inexact
            # float 1e-9 adds avoidable error (e.g. 1.5000000000000002).
            return account["nativeBalanceChange"] / lamports_per_sol
    return 0
|
|
|
|
def getTokenChange(tx, address):
    """List the token transfers in *tx* that involve *address*.

    Each transfer sent by the address yields an entry with a negated
    "tokenAmount" and the counterparty under "toUserAccount" /
    "toTokenAccount". Each transfer received yields an entry with the
    amount as-is and the counterparty under "fromUserAccount" /
    "fromTokenAccount". A transfer from the address to itself yields both.

    The "fiat" value carries the same sign as "tokenAmount", so tokens sent
    count as a negative fiat change. It stays the string "Unknown" when no
    price is available for the mint.

    Args:
        tx: Raw transaction dict; "tokenTransfers" and "timestamp" are read.
        address: The tracked address.

    Returns:
        A list of change dicts, empty when no transfer involves the address.
    """
    changes = []
    for transfer in tx.get("tokenTransfers") or []:
        sent = transfer["fromUserAccount"] == address
        received = transfer["toUserAccount"] == address
        if not (sent or received):
            continue

        # One price lookup per transfer, shared by both directions.
        fiatValue = get_token_fiat(transfer, address, tx["timestamp"])

        if sent:
            changes.append({
                "userAccount": transfer["fromUserAccount"],
                "tokenAccount": transfer["fromTokenAccount"],
                "tokenAmount": transfer["tokenAmount"] * -1,
                "mint": transfer["mint"],
                "tokenStandard": transfer["tokenStandard"],
                "toUserAccount": transfer["toUserAccount"],
                "toTokenAccount": transfer["toTokenAccount"],
                # Value leaves the account, so the fiat amount is negative.
                "fiat": fiatValue if isinstance(fiatValue, str) else -fiatValue
            })
        if received:
            changes.append({
                "userAccount": transfer["toUserAccount"],
                "tokenAccount": transfer["toTokenAccount"],
                "tokenAmount": transfer["tokenAmount"],
                "mint": transfer["mint"],
                "tokenStandard": transfer["tokenStandard"],
                "fromUserAccount": transfer["fromUserAccount"],
                "fromTokenAccount": transfer["fromTokenAccount"],
                "fiat": fiatValue
            })

    return changes
|
|
|
|
def get_token_fiat(transfer, address, timestamp):
    """Return the fiat value of a token transfer on the day it happened.

    Args:
        transfer: Token transfer dict; "mint" and "tokenAmount" are read.
        address: Not used by this function.
        timestamp: Unix timestamp of the transaction; converted to a
            ``dd-mm-YYYY`` date in the machine's local timezone.

    Returns:
        ``tokenAmount`` multiplied by the historical price of the token, or
        the string "Unknown" when the mint is not in ``known_tokens``.
    """
    day = f"{datetime.fromtimestamp(timestamp):%d-%m-%Y}"
    priceID = known_tokens.get(transfer["mint"])
    if priceID is None:
        return "Unknown"
    return transfer["tokenAmount"] * get_historical_fiat_price(priceID, day)
|
|
|
|
def getFiatChange(tx, address):
    """Return the total fiat value change of *address* caused by *tx*.

    The total is the SOL balance change priced at the historical "solana"
    rate for the transaction's date (local timezone), plus the "fiat" value
    of every token change. Token values are added exactly as
    getTokenChange() reports them; entries valued "Unknown" are skipped.

    Args:
        tx: Raw transaction dict; "timestamp" is read here.
        address: The tracked address.

    Returns:
        The combined fiat change as a number.
    """
    nativeDelta = getSOLChange(tx, address)
    day = f"{datetime.fromtimestamp(tx['timestamp']):%d-%m-%Y}"
    solPrice = get_historical_fiat_price("solana", day)

    tokenTotal = 0
    for change in getTokenChange(tx, address):
        value = change.get("fiat", "Unknown")
        if value == "Unknown":
            continue
        tokenTotal += value

    return (solPrice * nativeDelta) + tokenTotal
|
|
|
|
def getTokenName(token_ID):
    """Return a display name for a token mint address.

    Lookup order: an entry in ``other_token_names`` wins; otherwise the
    upper-cased ``known_tokens`` identifier is used; otherwise the mint
    address itself is returned unchanged.
    """
    if token_ID in other_token_names:
        return other_token_names[token_ID]
    if token_ID in known_tokens:
        return known_tokens[token_ID].upper()
    return token_ID
|
|
def convertTimestamp(timestamp):
    """Format a Unix timestamp as e.g. ``05 Mar 2024 09:41 PM``.

    The timestamp is interpreted in the machine's local timezone.
    """
    moment = datetime.fromtimestamp(timestamp)
    return f"{moment:%d %b %Y %I:%M %p}"
|
|
|
|
def renderSOLChange(solChange):
    """Format a SOL balance change for display.

    Args:
        solChange: Signed change in SOL.

    Returns:
        "0 SOL" for no change; "+ <0.001 SOL" / "- <0.001 SOL" for non-zero
        changes smaller than 0.001 in magnitude; otherwise the value rounded
        to 4 decimals, with an explicit "+" for gains.
    """
    if solChange == 0:
        # An unchanged balance must not be shown as a tiny loss.
        return "0 SOL"
    if abs(solChange) < 0.001:
        return "+ <0.001 SOL" if solChange > 0 else "- <0.001 SOL"
    if solChange > 0:
        return f"+{round(solChange, 4)} SOL"
    return f"{round(solChange, 4)} SOL"
|
|
|
|
def renderFiatChange(fiatChange):
    """Format a fiat value change for display.

    Args:
        fiatChange: Signed fiat amount, or the string "Unknown".

    Returns:
        "Unknown Fiat Value" for "Unknown"; "0 <CUR>" for no change;
        "+ <0.01 <CUR>" / "- <0.01 <CUR>" for non-zero changes smaller than
        0.01 in magnitude; otherwise the value rounded to 2 decimals with an
        explicit "+" for gains. <CUR> is the upper-cased module-level
        ``fiat`` setting and is omitted when that setting is not configured.
    """
    if fiatChange == "Unknown":
        return "Unknown Fiat Value"

    # `fiat` is None when the environment variable is not set; calling
    # .upper() on it directly would raise AttributeError.
    currency = (fiat or "").upper()

    if fiatChange == 0:
        # A zero change must not be shown as a tiny loss.
        amount = "0"
    elif abs(fiatChange) < 0.01:
        amount = "+ <0.01" if fiatChange > 0 else "- <0.01"
    elif fiatChange > 0:
        amount = f"+{round(fiatChange, 2)}"
    else:
        amount = f"{round(fiatChange, 2)}"

    return f"{amount} {currency}".rstrip()
|
|
|
|
|
|
def getTransactionsRender():
    """Render all stored transactions as an HTML fragment.

    The fragment starts with an ``<h3>`` heading giving the number of
    transactions and the summed fiat change (P/L) across all of them,
    followed by one ``<div class='transaction'>`` per displayed transaction.
    Transactions whose absolute fiat change is at most 0.005 and that have
    at most one token change are counted in the heading but not displayed.

    Text that originates from the API (description, signature, token mint)
    is HTML-escaped before being inserted into the markup.

    Returns:
        The HTML fragment as a string.
    """
    # Imported under its own name because `html` reads naturally as a local
    # variable name in rendering code.
    from html import escape

    transactions = getTransactions()
    currency = (fiat or "").upper()
    parts = []

    totalChange = 0
    for tx in transactions:
        totalChange += tx["FiatChange"]
        # Hide dust: negligible fiat change and at most one token change.
        if abs(tx["FiatChange"]) <= 0.005 and len(tx["TokenChange"]) <= 1:
            continue

        # Computed outside the f-string: nesting the same quote type inside
        # an f-string is a SyntaxError before Python 3.12.
        description = escape(tx["description"] or "Unknown TX")
        txHash = escape(str(tx["hash"]), quote=True)

        parts.append("<div class='transaction'>")
        parts.append(
            f"<div class='transaction-description'><a target='_blank' href='https://solscan.io/tx/{txHash}'>{description}</a> ({convertTimestamp(tx['timestamp'])})</div>"
        )
        parts.append(
            f"<div class='transaction-sol-change'>SOL change: {renderSOLChange(tx['SOLChange'])}</div>"
        )
        parts.append("<div class='transaction-token-change'>")
        for tokenChange in tx["TokenChange"]:
            tokenName = escape(str(getTokenName(tokenChange["mint"])))
            parts.append("<div class='token-change'>")
            parts.append(
                f"<div class='token-change-amount'>{tokenChange['tokenAmount']} {tokenName} ({renderFiatChange(tokenChange['fiat'])})</div>"
            )
            parts.append("</div>")
        parts.append("</div>")

        if tx["FiatChange"] != 0:
            parts.append(
                f"<div class='transaction-fiat-change'>{tx['FiatChange']} {currency}</div>"
            )
        parts.append("</div>")

    heading = f"<h3>Transactions: {len(transactions)} (P/L {renderFiatChange(totalChange)})</h3>"
    return heading + "".join(parts)