fireexplorer-indexer/main.py

494 lines
15 KiB
Python

import json
from clickhouse_driver import Client
import clickhouse_connect
import requests
from time import sleep
import json
import sys
from indexerClasses import Block, Transaction, Input, Output, Covenant,Name
import asyncio
import signal
import dotenv
import os
from flask import Flask, jsonify
import threading
dotenv.load_dotenv()
HSD_API_KEY = ""
HSD_PORT = 12037
HSD_IP = "127.0.0.1"
if os.getenv("HSD_API_KEY"):
HSD_API_KEY = os.getenv("HSD_API_KEY")
if os.getenv("HSD_PORT"):
HSD_PORT = os.getenv("HSD_PORT")
if os.getenv("HSD_IP"):
HSD_IP = os.getenv("HSD_IP")
HSD_URL = f"http://x:{HSD_API_KEY}@{HSD_IP}:{HSD_PORT}"
if os.getenv("HSD_URL"):
HSD_URL = os.getenv("HSD_URL")
DB_HOST = "localhost"
DB_USER = "indexer"
DB_PASSWORD = "supersecretpassword"
DB_NAME = "fireindexer"
if os.getenv("DB_HOST"):
DB_HOST = os.getenv("DB_HOST")
if os.getenv("DB_USER"):
DB_USER = os.getenv("DB_USER")
if os.getenv("DB_PASSWORD"):
DB_PASSWORD = os.getenv("DB_PASSWORD")
if os.getenv("DB_NAME"):
DB_NAME = os.getenv("DB_NAME")
# Clickhouse Database Setup
dbSave = clickhouse_connect.create_client(
host=DB_HOST,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME
)
dbGet = Client(
host=DB_HOST,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME
)
def indexBlock(blockHeight):
"""Indexes a block"""
# Get block data
blockData = requests.get(f"{HSD_URL}/block/{blockHeight}")
if blockData.status_code != 200:
print(f"Error fetching block {blockHeight}: {blockData.status_code}")
return -1
print(blockHeight,end='',flush=True)
saveBlock(blockData.json())
return 0
def getNameFromHash(nameHash):
name = requests.post(HSD_URL, json={"method": "getnamebyhash", "params": [nameHash]})
if name.status_code != 200:
print(f"Error fetching name {nameHash}: {name.status_code}")
return -1
name = name.json()
if not name["result"]:
return -1
name = name["result"]
nameInfo = requests.post(HSD_URL, json={"method": "getnameinfo", "params": [name]})
if nameInfo.status_code != 200:
print(f"Error fetching name info {name}: {nameInfo.status_code}")
return -1
nameInfo = nameInfo.json()
if not nameInfo["result"]:
print(f"Error fetching name info {name}: {nameInfo['error']}")
return -1
return nameInfo["result"]
def saveTransactions(txList, blockHeight):
if not txList:
return
# Prepare data for batch insert
txValues = [
(
txData["hash"], txData["witnessHash"], txData["fee"], txData["rate"],
txData["mtime"], blockHeight, txData["index"], txData["version"],
json.dumps(txData["inputs"]), json.dumps(txData["outputs"]),
txData["locktime"], txData["hex"]
)
for txData in txList
]
print(f"Inserting {len(txValues)} transactions...")
return dbSave.insert("transactions", txValues, column_names=[
"hash", "witnessHash", "fee", "rate", "mtime", "block", "tx_index", "version",
"inputs", "outputs", "locktime", "hex"
])
def saveBlock(blockData):
hashes = [tx["hash"] for tx in blockData["txs"]]
# Bulk save transactions
saveTransactions(blockData["txs"], blockData["height"])
# Insert block if it doesn't exist
blockValues = [(
blockData["hash"], blockData["height"], blockData["depth"], blockData["version"],
blockData["prevBlock"], blockData["merkleRoot"], blockData["witnessRoot"],
blockData["treeRoot"], blockData["reservedRoot"], blockData["time"],
blockData["bits"], blockData["nonce"], blockData["extraNonce"],
blockData["mask"], json.dumps(hashes) # Convert tx hashes to JSON string
)]
dbSave.insert("blocks", blockValues, column_names=[
"hash", "height", "depth", "version", "prevBlock", "merkleRoot", "witnessRoot",
"treeRoot", "reservedRoot", "time", "bits", "nonce", "extraNonce",
"mask", "txs"
])
print('block saved')
# def setupDB():
# """Creates the database tables"""
# dbSave.execute("CREATE TABLE IF NOT EXISTS blocks ( hash String, height UInt64, depth Int32, version Int32, prevBlock String, merkleRoot String, witnessRoot String, treeRoot String, reservedRoot String, time UInt32, bits Int32, nonce UInt64, extraNonce String, mask String, txs String ) ENGINE = MergeTree() ORDER BY (hash, height)")
# dbSave.execute("CREATE TABLE IF NOT EXISTS transactions ( hash String, witnessHash String, fee Int64, rate Int64, mtime Int64, block UInt64, tx_index Int32, version Int32, inputs String, outputs String, locktime Int64, hex String ) ENGINE = MergeTree() ORDER BY (hash, block)")
# dbSave.execute("CREATE TABLE IF NOT EXISTS names ( name String, nameHash String, state String, height UInt64, lastRenewal Int64, owner String, value Int64, highest Int64, data String, transfer Int64, revoked Int64, claimed Int64, renewals Int64, registered UInt8, expired UInt8, weak UInt8, stats String, start String, txs String, bids String ) ENGINE = MergeTree() ORDER BY (name, height)")
# Get the newest block height in the database
def getNewestBlock() -> int:
"""Returns the height of the newest block in the database"""
newestBlock = dbGet.query("SELECT height FROM blocks ORDER BY height DESC LIMIT 1").result
return int(newestBlock[0][0]) if newestBlock else -1
def dbCheck():
# For the first 100 blocks, check for transactions
for i in range(100):
block = dbGet.query(f"SELECT * FROM blocks WHERE height = {i}").result
if not block:
return
print(Block(block[0]))
def getBlock(height) -> Block | None:
"""Fetch a block by height"""
block = dbGet.query(f"SELECT * FROM blocks WHERE height = {height}").result
return Block(block[0]) if block else None
def getTransaction(tx_hash) -> Transaction | None:
"""Fetch a transaction by hash"""
tx = dbGet.query(f"SELECT * FROM transactions WHERE hash = '{tx_hash}'").result
return Transaction(tx[0]) if tx else None
def getTransactions(height) -> list[Transaction] | None:
"""Fetch all transactions for a given block height"""
txs = dbGet.query(f"SELECT * FROM transactions WHERE block = {height}").result
return [Transaction(tx) for tx in txs] if txs else None
def getNameFromHash(nameHash):
"""Fetch a name record by nameHash"""
name = dbGet.query(f"SELECT * FROM names WHERE nameHash = '{nameHash}'").result
return Name(name[0]) if name else -1
def getNamesFromBlock(height):
transactions = getTransactions(height)
if not transactions:
return -1
namesToSave: list[Name] = []
names = []
for tx in transactions:
for output in tx.outputs:
cov = output.covenant
if cov.type == 0: # NONE
continue
# Check if name exists in block
if cov.nameHash in names:
for name in namesToSave:
if name.nameHash == cov.nameHash:
# Remove name from list
namesToSave.remove(name)
# Update name
name.update(cov,tx)
else:
name = getNameFromHash(cov.nameHash)
if name == -1:
# Create new name
name = Name(cov)
name.txs.append(tx.hash)
name.height = height
else:
name.update(cov,tx)
namesToSave.append(name)
queryData = []
for name in namesToSave:
nameInfo = name.toJSON()
queryData.append((
nameInfo["name"],
nameInfo["nameHash"],
nameInfo["state"],
nameInfo["height"],
nameInfo["lastRenewal"],
json.dumps(nameInfo["owner"]),
nameInfo["value"],
nameInfo["highest"],
json.dumps(nameInfo["data"]),
json.dumps(nameInfo["transfer"]),
nameInfo["revoked"],
nameInfo["claimed"],
nameInfo["renewals"],
nameInfo["registered"],
nameInfo["expired"],
nameInfo["weak"],
json.dumps(nameInfo["stats"]),
json.dumps(nameInfo["start"]),
json.dumps(nameInfo["txs"]),
json.dumps(nameInfo["bids"])
))
dbSave.insert("names", queryData, column_names=[
"name", "nameHash", "state", "height", "lastRenewal", "owner", "value", "highest",
"data", "transfer", "revoked", "claimed", "renewals", "registered", "expired",
"weak", "stats", "start", "txs", "bids"
])
return 0
def getNodeHeight():
response = requests.get(HSD_URL)
if response.status_code != 200:
print(f"Error fetching Node Block: {response.text}")
return -1
info = response.json()
return info["chain"]["height"]
def getFirstMissingBlock():
"""Finds missing block heights in the database."""
height = dbGet.execute("SELECT height FROM blocks ORDER BY height ASC")
heights = [row[0] for row in height]
if not heights:
return 0
block = 0
for i in heights:
if i == block:
block += 1
else:
return block
return block
async def main():
blockWatcher = BlockWatcher(HSD_URL)
blockWatcher.start()
# Handle Ctrl+C interrupt
def handle_interrupt(sig, frame):
print("\nReceived interrupt, stopping...")
blockWatcher.stop()
signal.signal(signal.SIGINT, handle_interrupt) # Handle Ctrl+C
# Keep running until interrupted
while blockWatcher.running:
await asyncio.sleep(1)
print("Closing mempool watcher.", end="")
signal.signal(signal.SIGINT, signal.SIG_DFL)
while blockWatcher.closing:
print(".", end="")
await asyncio.sleep(1)
# region Classes
class BlockWatcher:
def __init__(self, url ,check_interval=1):
self.url = url
self.checkInterval = check_interval
self.running = True
self.closing = False
self.block = 0
def start(self):
asyncio.create_task(self.loop())
async def loop(self):
while self.running:
response = requests.get(self.url)
if response.status_code != 200:
print(f"Error fetching info: {response.status_code}")
await asyncio.sleep(self.checkInterval)
continue
info = response.json()
height = info["chain"]["height"]
if height > self.block:
self.block = height
print(f"New block: {height}")
if indexBlock(height) != 0:
print("Error indexing block")
self.block = self.block - 1
else:
# Check if there are any new names
if getNamesFromBlock(height) < 0:
print("Error indexing names")
await asyncio.sleep(self.checkInterval)
self.closing = False
def stop(self):
self.closing = True
self.running = False
class CatchUp:
def __init__(self, currentHeight, targetHeight):
self.currentHeight = currentHeight - 1
self.targetHeight = targetHeight
self.running = True
self.closing = False
self.interupted = False
async def catchUp(self):
print(f"Catching up from {self.currentHeight} to {self.targetHeight}")
def signal_handler(sig, frame):
self.interupted = True
self.stop()
print("\n\nCaught Ctrl+C\n")
signal.signal(signal.SIGINT, signal_handler)
asyncio.create_task(self.loop())
while self.running:
await asyncio.sleep(1)
print("Stopping catch up")
while self.closing:
await asyncio.sleep(1)
print("Stopped catch up")
async def loop(self):
while self.running:
if self.currentHeight >= self.targetHeight:
self.running = False
print(f"Caught up to {self.targetHeight}")
return
if indexBlock(self.currentHeight + 1) != 0:
print(f"Error indexing block {self.currentHeight + 1}")
self.running = False
return
self.currentHeight += 1
self.closing = False
def stop(self):
self.closing = True
self.running = False
class NameSyncer:
def __init__(self, currentHeight, targetHeight):
self.currentHeight = currentHeight - 1
self.targetHeight = targetHeight
self.running = True
self.closing = False
self.interupted = False
async def sync(self):
print(f"Syncing names from {self.currentHeight} to {self.targetHeight}")
def signal_handler(sig, frame):
self.interupted = True
self.stop()
print("\n\nCaught Ctrl+C\n")
signal.signal(signal.SIGINT, signal_handler)
asyncio.create_task(self.loop())
while self.running:
await asyncio.sleep(1)
print("Stopping catch up")
while self.closing:
await asyncio.sleep(1)
print("Stopped catch up")
async def loop(self):
while self.running:
if self.currentHeight >= self.targetHeight:
self.running = False
print(f"Caught up to {self.targetHeight}")
return
if getNamesFromBlock(self.currentHeight + 1) != 0:
print(f"Error indexing names {self.currentHeight + 1}")
self.running = False
return
self.currentHeight += 1
self.closing = False
def stop(self):
self.closing = True
self.running = False
# endregion
# region Server
app = Flask(__name__)
@app.route("/")
def ping():
return jsonify({"status": "OK",
"height": getNodeHeight(),
"indexed": getNewestBlock()
})
def run_server():
app.run(host='0.0.0.0', port=3000)
def start_flask_in_thread():
"""Starts the Flask server in a background thread."""
flask_thread = threading.Thread(target=run_server)
flask_thread.daemon = True # Ensures that the Flask thread will exit when the main program exits
flask_thread.start()
# endregion
if __name__ == "__main__":
# Webserver in background
start_flask_in_thread()
# setupDB()
# Check if DB needs to catch up
newestBlock = getFirstMissingBlock()
NodeHeight = getNodeHeight()
if newestBlock == -1:
print("ERROR GETTING NEWEST BLOCK")
sys.exit(1)
if NodeHeight == -1:
print("ERROR GETTING NODE HEIGHT")
sys.exit(1)
if newestBlock < NodeHeight:
print(f"Database is out of sync. Catching up from {newestBlock} to {NodeHeight}")
catchUpper = CatchUp(newestBlock, NodeHeight)
asyncio.run(catchUpper.catchUp())
if catchUpper.interupted:
sys.exit(1)
# Get names
namesyncer = NameSyncer(2000, 2025)
asyncio.run(namesyncer.sync())
if namesyncer.interupted:
sys.exit(1)
# print("Starting mempool watcher.")
# asyncio.run(main())
print("Finished")