FireTax/chains/solana.py
Nathan Woodburn e612637589
All checks were successful
Build Docker / BuildImage (push) Successful in 35s
feat: Started on SOL integration
2024-11-28 14:05:27 +11:00

106 lines
2.9 KiB
Python

import json
import requests
import os
import time
# Chain Data
info = {
"name": "Solana",
"ticker": "SOL",
"description": "Solana Chain",
"version": "1.0",
"author": "Nathan.Woodburn/",
"APIInfo": "This chain uses helius RPC service. Please provide a helius API key."
}
def validateAddress(address: str):
return True
def importAddress(address: str):
if not os.path.exists("chain_data/solana.json"):
addresses = [{"address": address,"txs":[],"lastSynced":0}]
with open("chain_data/solana.json", "w") as f:
json.dump(addresses, f)
return True
with open("chain_data/solana.json", "r") as f:
addresses = json.load(f)
print(addresses)
for existingAddress in addresses:
if existingAddress["address"] == address:
return True
addresses.append({"address": address,"txs":[],"lastSynced":0})
with open("chain_data/solana.json", "w") as f:
json.dump(addresses, f)
return True
def listAddresses():
with open("chain_data/solana.json", "r") as f:
addresses = json.load(f)
addresses = [address["address"] for address in addresses]
return addresses
def deleteAddress(address: str):
with open("chain_data/solana.json", "r") as f:
addresses = json.load(f)
for existingAddress in addresses:
if existingAddress["address"] == address:
addresses.remove(existingAddress)
with open("chain_data/solana.json", "w") as f:
json.dump(addresses, f)
return True
return False
def addAPIKey(key: str):
with open("api_keys/solana.txt", "w") as f:
f.write(key)
return True
def sync():
with open("chain_data/solana.json", "r") as f:
addresses = json.load(f)
# Get SOLScan API key
if os.path.exists("api_keys/solana.txt"):
with open("api_keys/solana.txt", "r") as f:
apiKey = f.read()
else:
return False
allTxs = []
for address in addresses:
print("Checking address: " + address["address"])
resp = requests.get("https://api.helius.xyz/v0/addresses/" + address["address"] + "/transactions/?api-key=" + apiKey)
if resp.status_code != 200:
print("Error syncing Solana chain")
print(resp.status_code)
return False
transactions = resp.json()
print(transactions)
allTxs.append({
"address": address["address"],
"txs": transactions,
"lastSynced": time.time()
})
with open("chain_data/solana.json", "w") as f:
json.dump(allTxs, f)
return True
def getTransactions():
with open("chain_data/solana.json", "r") as f:
addresses = json.load(f)
transactions = []
for address in addresses:
for tx in address["txs"]:
transactions.append(tx)
#TODO Parse transactions
return transactions