FireTax/chains/solana.py
Nathan Woodburn 6ff1afd46d
All checks were successful
Build Docker / BuildImage (push) Successful in 51s
feat: Add initial solana integration
2024-11-28 17:26:04 +11:00

274 lines
9.6 KiB
Python

import json
import requests
import os
import time
from datetime import datetime
from price import get_historical_fiat_price
import dotenv
dotenv.load_dotenv()
fiat = os.getenv("fiat")
# Chain Data
info = {
"name": "Solana",
"ticker": "SOL",
"description": "Solana Chain",
"version": "1.0",
"author": "Nathan.Woodburn/",
"APIInfo": "This chain uses helius RPC service. Please provide a helius API key."
}
known_tokens = {
"EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v":"usdc",
"cbbtcf3aa214zXHbiAZQwf4122FBYbraNdFqgw4iMij":"coinbase-wrapped-btc",
"Grass7B4RdKfBCjTKgSqnXkqjwiGvQyFbuSCUJr3XXjs":"grass",
"J1toso1uCk3RLmjorhTtrVwY9HJ7X8V9yYac6Y7kGCPn":"jito-staked-sol",
"3NZ9JMVBmGAqocybic2c7LQCJScmgsAZ6vQqTDzcqmJh":"bitcoin",
"So11111111111111111111111111111111111111112":"solana",
"27G8MtK7VtTcCHkpASjSDdkWWYfoqT6ggEuKidVJidD4":"jupiter-perpetuals-liquidity-provider-token",
"JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN":"jupiter"
}
other_token_names = {
"jtojtomepa8beP8AuQc6eXt5FriJwfFMwQx2v2f9mCL":"JITO",
"9YZ2syoQHvMeksp4MYZoYMtLyFWkkyBgAsVuuJzSZwVu":"WDBRNT",
"J1toso1uCk3RLmjorhTtrVwY9HJ7X8V9yYac6Y7kGCPn":"JitoSOL",
"8HWnGWTAXiFFtNsZwgk2AyvbUqqt8gcDVVcRVCZCfXC1":"Nathan.Woodburn/",
"cbbtcf3aa214zXHbiAZQwf4122FBYbraNdFqgw4iMij":"cbBTC",
"3NZ9JMVBmGAqocybic2c7LQCJScmgsAZ6vQqTDzcqmJh":"wBTC (Wormhole)",
"So11111111111111111111111111111111111111112":"wSOL",
"27G8MtK7VtTcCHkpASjSDdkWWYfoqT6ggEuKidVJidD4":"JLP",
"JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN":"JUP"
}
def validateAddress(address: str):
return True
def importAddress(address: str):
if not os.path.exists("chain_data/solana.json"):
addresses = [{"address": address,"txs":[],"lastSynced":0}]
with open("chain_data/solana.json", "w") as f:
json.dump(addresses, f)
return True
with open("chain_data/solana.json", "r") as f:
addresses = json.load(f)
print(addresses)
for existingAddress in addresses:
if existingAddress["address"] == address:
return True
addresses.append({"address": address,"txs":[],"lastSynced":0})
with open("chain_data/solana.json", "w") as f:
json.dump(addresses, f)
return True
def listAddresses():
with open("chain_data/solana.json", "r") as f:
addresses = json.load(f)
addresses = [address["address"] for address in addresses]
return addresses
def deleteAddress(address: str):
with open("chain_data/solana.json", "r") as f:
addresses = json.load(f)
for existingAddress in addresses:
if existingAddress["address"] == address:
addresses.remove(existingAddress)
with open("chain_data/solana.json", "w") as f:
json.dump(addresses, f)
return True
return False
def addAPIKey(key: str):
with open("api_keys/solana.txt", "w") as f:
f.write(key)
return True
def sync():
with open("chain_data/solana.json", "r") as f:
addresses = json.load(f)
# Get SOLScan API key
if os.path.exists("api_keys/solana.txt"):
with open("api_keys/solana.txt", "r") as f:
apiKey = f.read()
else:
return False
allTxs = []
for address in addresses:
print("Checking address: " + address["address"])
transactions = []
resp = requests.get("https://api.helius.xyz/v0/addresses/" + address["address"] + "/transactions/?api-key=" + apiKey)
while True:
if resp.status_code != 200:
print("Error syncing Solana chain")
print(resp.status_code)
print(resp.text)
break
if len(resp.json()) == 0:
break
transactions.extend(resp.json())
resp = requests.get("https://api.helius.xyz/v0/addresses/" + address["address"] + "/transactions/?api-key=" + apiKey + "&before=" + resp.json()[-1]["signature"])
print("Checking page...")
allTxs.append({
"address": address["address"],
"txs": transactions,
"lastSynced": time.time()
})
with open("chain_data/solana.json", "w") as f:
json.dump(allTxs, f)
# Parse transactions
for address in allTxs:
for tx in address["txs"]:
parseTransaction(tx,address["address"])
return True
def getTransactions():
with open("chain_data/solana.json", "r") as f:
addresses = json.load(f)
transactions = []
for address in addresses:
for tx in address["txs"]:
transactions.append(parseTransaction(tx,address["address"]))
return transactions
def parseTransaction(tx,address):
solChange = getSOLChange(tx,address)
tokenChange = getTokenChange(tx, address)
fiatChange = getFiatChange(tx, address)
return {
"hash": tx["signature"],
"timestamp": tx["timestamp"],
"SOLChange": solChange,
"TokenChange": tokenChange,
"FiatChange": fiatChange,
"type": tx["type"],
"description": tx["description"]
}
def getSOLChange(tx,address):
for account in tx["accountData"]:
if account["account"] == address:
return account["nativeBalanceChange"] * 0.000000001
return 0
def getTokenChange(tx,address):
changes = []
for transfer in tx["tokenTransfers"]:
if transfer["fromUserAccount"] == address:
changes.append({
"userAccount": transfer["fromUserAccount"],
"tokenAccount": transfer["fromTokenAccount"],
"tokenAmount": transfer["tokenAmount"] * -1,
"mint": transfer["mint"],
"tokenStandard": transfer["tokenStandard"],
"toUserAccount": transfer["toUserAccount"],
"toTokenAccount": transfer["toTokenAccount"],
"fiat": get_token_fiat(transfer,address,tx["timestamp"])
})
if transfer["toUserAccount"] == address:
changes.append({
"userAccount": transfer["toUserAccount"],
"tokenAccount": transfer["toTokenAccount"],
"tokenAmount": transfer["tokenAmount"],
"mint": transfer["mint"],
"tokenStandard": transfer["tokenStandard"],
"fromUserAccount": transfer["fromUserAccount"],
"fromTokenAccount": transfer["fromTokenAccount"],
"fiat": get_token_fiat(transfer,address,tx["timestamp"])
})
return changes
def get_token_fiat(transfer,address,timestamp):
date = datetime.fromtimestamp(timestamp).strftime('%d-%m-%Y')
tokenID = transfer["mint"]
if tokenID in known_tokens:
tokenID = known_tokens[tokenID]
return transfer["tokenAmount"] * get_historical_fiat_price(tokenID, date)
return "Unknown"
def getFiatChange(tx,address):
solChange = getSOLChange(tx,address)
timestamp = tx["timestamp"]
date = datetime.fromtimestamp(timestamp).strftime('%d-%m-%Y')
fiat_rate = get_historical_fiat_price("solana", date)
tokenChanges = getTokenChange(tx, address)
tokenFiatChange = 0
for tokenChange in tokenChanges:
if 'fiat' in tokenChange:
if tokenChange["fiat"] != "Unknown":
tokenFiatChange += tokenChange["fiat"]
return (fiat_rate * solChange) + tokenFiatChange
def getTokenName(token_ID):
name = token_ID
if token_ID in known_tokens:
name = known_tokens[token_ID].upper()
if token_ID in other_token_names:
name = other_token_names[token_ID]
return name
def convertTimestamp(timestamp):
return datetime.fromtimestamp(timestamp).strftime('%d %b %Y %I:%M %p')
def renderSOLChange(solChange):
if solChange > 0:
if solChange < 0.001:
return f"+ <0.001 SOL"
return f"+{round(solChange,4)} SOL"
if solChange > -0.001:
return f"- <0.001 SOL"
return f"{round(solChange,4)} SOL"
def renderFiatChange(fiatChange):
if fiatChange == "Unknown":
return "Unknown Fiat Value"
if fiatChange > 0:
if fiatChange < 0.01:
return f"+ <0.01 {fiat.upper()}"
return f"+{round(fiatChange,2)} {fiat.upper()}"
if fiatChange > -0.01:
return f"- <0.01 {fiat.upper()}"
return f"{round(fiatChange,2)} {fiat.upper()}"
def getTransactionsRender():
transactions = getTransactions()
html = ""
totalChange = 0
for tx in transactions:
totalChange += tx["FiatChange"]
# If fiat change <= 0.001 then don't show
if abs(tx["FiatChange"]) <= 0.005 and len(tx["TokenChange"]) <= 1:
continue
html += f"<div class='transaction'>"
html += f"<div class='transaction-description'><a target='_blank' href='https://solscan.io/tx/{tx['hash']}'>{tx['description'] or "Unknown TX"}</a> ({convertTimestamp(tx['timestamp'])})</div>"
html += f"<div class='transaction-sol-change'>SOL change: {renderSOLChange(tx['SOLChange'])}</div>"
html += f"<div class='transaction-token-change'>"
for tokenChange in tx["TokenChange"]:
html += f"<div class='token-change'>"
html += f"<div class='token-change-amount'>{tokenChange['tokenAmount']} {getTokenName(tokenChange['mint'])} ({renderFiatChange(tokenChange['fiat'])})</div>"
html += f"</div>"
html += f"</div>"
if tx["FiatChange"] != 0:
html += f"<div class='transaction-fiat-change'>{tx['FiatChange']} {fiat.upper()}</div>"
html += f"</div>"
return f"<h3>Transactions: {len(transactions)} (P/L {renderFiatChange(totalChange)})</h3>" + html