"""Index Handshake (HSD) blocks and transactions into a MySQL database.

When run as a script this module:

1. starts a small Flask status server in a background thread,
2. creates the ``blocks`` / ``transactions`` tables if they do not exist,
3. catches the database up to the node's current chain height, and
4. polls the node for new blocks and indexes them as they appear.
"""

import asyncio
import json
import os
import signal
import sys
import threading
from time import sleep

import dotenv
import mysql.connector
import requests
from flask import Flask, jsonify

from indexerClasses import Block, Transaction

dotenv.load_dotenv()

# --- HSD node settings (a non-empty environment variable overrides the default)
HSD_API_KEY = os.getenv("HSD_API_KEY") or ""
# NOTE: this is a str when taken from the environment and an int otherwise;
# it is only ever interpolated into HSD_URL, so both forms work.
HSD_PORT = os.getenv("HSD_PORT") or 12037
HSD_IP = os.getenv("HSD_IP") or "127.0.0.1"
# A full HSD_URL from the environment wins over the URL assembled from parts.
HSD_URL = os.getenv("HSD_URL") or f"http://x:{HSD_API_KEY}@{HSD_IP}:{HSD_PORT}"

# --- MySQL settings (a non-empty environment variable overrides the default)
DB_HOST = os.getenv("DB_HOST") or "localhost"
DB_USER = os.getenv("DB_USER") or "indexer"
# NOTE(review): hard-coded fallback password; set DB_PASSWORD in the
# environment for any real deployment.
DB_PASSWORD = os.getenv("DB_PASSWORD") or "supersecretpassword"
DB_NAME = os.getenv("DB_NAME") or "fireindexer"

# Seconds to wait on any HTTP request to the node before giving up, so an
# unresponsive node cannot block the indexer (or the status endpoint) forever.
REQUEST_TIMEOUT = 30


def _connect():
    """Open and return a new MySQL connection using the DB_* settings."""
    return mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME,
        charset='utf8mb4',
        collation='utf8mb4_unicode_ci',
    )


# MySQL Database Setup
# Shared connection used by setupDB, dbCheck, getBlock, getTransaction and
# getFirstMissingBlock.
# NOTE(review): reads on this connection are never committed or rolled back;
# depending on the server's isolation level, later reads may see a stale
# snapshot - confirm before relying on it for long-running lookups.
db = _connect()


def indexBlock(blockHeight):
    """Fetch the block at ``blockHeight`` from the node and store it.

    Returns:
        0 on success, -1 if the node could not be reached or answered with a
        non-200 status.
    """
    try:
        blockData = requests.get(
            f"{HSD_URL}/block/{blockHeight}", timeout=REQUEST_TIMEOUT
        )
    except requests.RequestException as err:
        # Report an unreachable node the same way as a bad status code
        # instead of letting the exception kill the calling task.
        print(f"Error fetching block {blockHeight}: {err}")
        return -1
    if blockData.status_code != 200:
        print(f"Error fetching block {blockHeight}: {blockData.status_code}")
        return -1
    print(blockHeight, end='', flush=True)
    saveBlock(blockData.json())
    return 0


def _saveTransactionOn(connection, txData, blockHeight):
    """Insert ``txData`` through ``connection`` unless its hash is stored.

    Prints ``*`` when the transaction already exists and ``.`` after a
    successful insert. The connection is committed but left open.
    """
    with connection.cursor() as cursor:
        # Existence check only: avoid pulling the whole row (including the
        # LONGTEXT hex) and never leave unread rows behind.
        cursor.execute(
            "SELECT 1 FROM transactions WHERE hash = %s LIMIT 1",
            (txData["hash"],),
        )
        if cursor.fetchone():
            print('*', end='', flush=True)
            return
        cursor.execute(
            "INSERT INTO transactions (hash, witnessHash, fee, rate, mtime, "
            "block, `index`, version, inputs, outputs, locktime, hex) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
            (
                txData["hash"],
                txData["witnessHash"],
                txData["fee"],
                txData["rate"],
                txData["mtime"],
                blockHeight,
                txData["index"],
                txData["version"],
                json.dumps(txData["inputs"]),
                json.dumps(txData["outputs"]),
                txData["locktime"],
                txData["hex"],
            ),
        )
    connection.commit()
    print('.', end='', flush=True)


def saveTransaction(txData, blockHeight):
    """Store one transaction dict, recording ``blockHeight`` as its block.

    Opens a dedicated connection for the insert and always closes it, also
    when the transaction already exists.
    """
    dbT = _connect()
    try:
        _saveTransactionOn(dbT, txData, blockHeight)
    finally:
        dbT.close()


def saveBlock(blockData):
    """Store every transaction of ``blockData`` and then the block itself.

    Transactions are saved first; the block row stores the JSON list of their
    hashes. Prints ``-`` when a block with the same height already exists and
    an empty line after a successful insert.
    """
    # One connection for the whole block instead of one per transaction.
    dbB = _connect()
    try:
        hashes = []
        for tx in blockData["txs"]:
            _saveTransactionOn(dbB, tx, blockData["height"])
            hashes.append(tx["hash"])

        with dbB.cursor() as cursor:
            cursor.execute(
                "SELECT 1 FROM blocks WHERE height = %s LIMIT 1",
                (blockData["height"],),
            )
            if cursor.fetchone():
                print('-', flush=True)
                return
            cursor.execute(
                "INSERT INTO blocks (hash, height, depth, version, prevBlock, "
                "merkleRoot, witnessRoot, treeRoot, reservedRoot, time, bits, "
                "nonce, extraNonce, mask, txs) VALUES (%s, %s, %s, %s, %s, "
                "%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
                (
                    blockData["hash"],
                    blockData["height"],
                    blockData["depth"],
                    blockData["version"],
                    blockData["prevBlock"],
                    blockData["merkleRoot"],
                    blockData["witnessRoot"],
                    blockData["treeRoot"],
                    blockData["reservedRoot"],
                    blockData["time"],
                    blockData["bits"],
                    blockData["nonce"],
                    blockData["extraNonce"],
                    blockData["mask"],
                    json.dumps(hashes),
                ),
            )
        dbB.commit()
        print('')
    finally:
        dbB.close()


def setupDB():
    """Creates the database tables"""
    with db.cursor() as cursor:
        cursor.execute(
            "CREATE TABLE IF NOT EXISTS blocks (hash VARCHAR(64), "
            "height BIGINT, depth INT, version INT, prevBlock VARCHAR(64), "
            "merkleRoot VARCHAR(64), witnessRoot VARCHAR(64), "
            "treeRoot VARCHAR(64), reservedRoot VARCHAR(64), time INT, "
            "bits INT, nonce BIGINT UNSIGNED, extraNonce VARCHAR(64), "
            "mask VARCHAR(64), txs JSON)"
        )
        cursor.execute(
            "CREATE TABLE IF NOT EXISTS transactions (hash VARCHAR(64), "
            "witnessHash VARCHAR(64), fee BIGINT, rate BIGINT, mtime BIGINT, "
            "block BIGINT, `index` INT, version INT, inputs JSON, "
            "outputs JSON, locktime BIGINT, hex LONGTEXT)"
        )


# Get the newest block height in the database
def getNewestBlock() -> int:
    """Returns the height of the newest block in the database.

    Returns -1 when the ``blocks`` table is empty. Uses its own connection,
    which is always closed before returning.
    """
    dbN = _connect()
    try:
        with dbN.cursor() as cursor:
            cursor.execute(
                "SELECT height FROM blocks ORDER BY height DESC LIMIT 1"
            )
            newestBlock = cursor.fetchone()
        if newestBlock:
            return int(newestBlock[0])
        return -1
    finally:
        dbN.close()


def dbCheck():
    """Print the stored blocks at heights 0-99, stopping at the first gap."""
    # For the first 100 blocks, check for transactions
    for i in range(100):
        with db.cursor() as cursor:
            cursor.execute(
                "SELECT * FROM blocks WHERE height = %s LIMIT 1", (i,)
            )
            block = cursor.fetchone()
        if not block:
            return
        block = Block(block)
        print(block)


def getBlock(height):
    """Return the stored block at ``height`` as a ``Block``, or None."""
    with db.cursor() as cursor:
        cursor.execute(
            "SELECT * FROM blocks WHERE height = %s LIMIT 1", (height,)
        )
        block = cursor.fetchone()
    if not block:
        return None
    return Block(block)


def getTransaction(hash):
    """Return the stored transaction with ``hash`` as a ``Transaction``, or None."""
    with db.cursor() as cursor:
        cursor.execute(
            "SELECT * FROM transactions WHERE hash = %s LIMIT 1", (hash,)
        )
        tx = cursor.fetchone()
    if not tx:
        return None
    return Transaction(tx)


def getNodeHeight():
    """Return the node's current chain height, or -1 if it cannot be fetched."""
    try:
        response = requests.get(HSD_URL, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as err:
        # Keeps the status endpoint and the startup check from crashing when
        # the node is down; callers already treat -1 as "unavailable".
        print(f"Error fetching Node Block: {err}")
        return -1
    if response.status_code != 200:
        print(f"Error fetching Node Block: {response.text}")
        return -1
    info = response.json()
    return info["chain"]["height"]


def getFirstMissingBlock():
    """Return the lowest height (counting from 0) with no row in ``blocks``.

    Returns 0 for an empty table and ``max height + 1`` when there are no
    gaps.
    """
    with db.cursor() as cursor:
        cursor.execute("SELECT height FROM blocks ORDER BY height ASC")
        heights = [row[0] for row in cursor.fetchall()]

    expected = 0
    for height in heights:
        if height > expected:
            return expected
        if height == expected:
            expected += 1
        # height < expected can only be a duplicate row (the list is sorted);
        # skip it rather than reporting a gap that is not there.
    return expected


async def main():
    """Run the block watcher until it is interrupted with Ctrl+C or fails."""
    blockWatcher = BlockWatcher(HSD_URL)
    blockWatcher.start()

    # Handle Ctrl+C interrupt
    def handle_interrupt(sig, frame):
        print("\nReceived interrupt, stopping...")
        blockWatcher.stop()

    signal.signal(signal.SIGINT, handle_interrupt)  # Handle Ctrl+C

    # Keep running until interrupted
    while blockWatcher.running:
        await asyncio.sleep(1)

    print("Closing mempool watcher.", end="")
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    while blockWatcher.closing:
        print(".", end="")
        await asyncio.sleep(1)

    # Surface any exception that ended the watcher instead of dropping it.
    if blockWatcher.task is not None:
        await blockWatcher.task


# region Classes
class BlockWatcher:
    """Polls the node for its chain height and indexes newly seen blocks."""

    def __init__(self, url, check_interval=1):
        self.url = url
        self.checkInterval = check_interval  # seconds between polls
        self.running = True    # cleared by stop() or when the loop ends
        self.closing = False   # True between stop() and the loop finishing
        self.block = 0         # last height indexed; 0 = nothing seen yet
        self.task = None       # asyncio task running loop()

    def start(self):
        """Schedule the polling loop on the running event loop."""
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        self.task = asyncio.create_task(self.loop())

    async def loop(self):
        """Poll every ``checkInterval`` seconds until stopped."""
        try:
            while self.running:
                self._poll()
                await asyncio.sleep(self.checkInterval)
        finally:
            # Always release anyone waiting on running/closing, also when
            # the loop ends because of an unexpected exception.
            self.running = False
            self.closing = False

    def _poll(self):
        """Check the node once and index every block not yet processed."""
        try:
            response = requests.get(self.url, timeout=REQUEST_TIMEOUT)
        except requests.RequestException as err:
            # A node restart or network blip must not end the watcher.
            print(f"Error fetching info: {err}")
            return
        if response.status_code != 200:
            print(f"Error fetching info: {response.status_code}")
            return

        height = response.json()["chain"]["height"]
        if height <= self.block:
            return

        print(f"New block: {height}")
        # On the first observation only the tip is indexed; afterwards every
        # height since the last indexed one is, so that blocks arriving
        # between two polls are not skipped.
        first = height if self.block == 0 else self.block + 1
        for pending in range(first, height + 1):
            if not self.running:
                return
            if indexBlock(pending) != 0:
                print("Error indexing block")
                # self.block is not advanced, so this height is retried on
                # the next poll.
                return
            self.block = pending

    def stop(self):
        """Ask the loop to finish; ``closing`` stays True until it has."""
        # Do not flag "closing" if the loop has already finished, otherwise
        # nothing would ever clear the flag again.
        self.closing = self.task is None or not self.task.done()
        self.running = False


class CatchUp:
    """Indexes every block from ``currentHeight`` up to ``targetHeight``."""

    def __init__(self, currentHeight, targetHeight):
        # Stored as "last indexed height", i.e. one below the first block
        # that still has to be indexed.
        self.currentHeight = currentHeight - 1
        self.targetHeight = targetHeight
        self.running = True
        self.closing = False
        self.interupted = False  # set when Ctrl+C stopped the catch-up
        self.task = None         # asyncio task running loop()

    async def catchUp(self):
        """Run the catch-up loop and wait until it finishes or is interrupted."""
        print(f"Catching up from {self.currentHeight + 1} to {self.targetHeight}")

        def signal_handler(sig, frame):
            self.interupted = True
            self.stop()
            print("\n\nCaught Ctrl+C\n")

        signal.signal(signal.SIGINT, signal_handler)
        # Keep a reference so the task cannot be garbage-collected mid-run.
        self.task = asyncio.create_task(self.loop())
        while self.running:
            await asyncio.sleep(1)
        print("Stopping catch up")
        while self.closing:
            await asyncio.sleep(1)
        print("Stopped catch up")
        # Re-raise anything that ended the loop unexpectedly.
        await self.task

    async def loop(self):
        """Index blocks one by one until caught up, stopped or failing."""
        try:
            while self.running:
                if self.currentHeight >= self.targetHeight:
                    print(f"Caught up to {self.targetHeight}")
                    return
                nextHeight = self.currentHeight + 1
                if indexBlock(nextHeight) != 0:
                    print(f"Error indexing block {nextHeight}")
                    return
                self.currentHeight = nextHeight
                # Yield to the event loop between blocks.
                await asyncio.sleep(0)
        finally:
            # Reset both flags on every exit path so catchUp() cannot wait
            # forever.
            self.running = False
            self.closing = False

    def stop(self):
        """Ask the loop to finish; ``closing`` stays True until it has."""
        self.closing = self.task is None or not self.task.done()
        self.running = False
# endregion


# region Server
app = Flask(__name__)


@app.route("/")
def ping():
    """Report the node's chain height and the newest indexed block height."""
    return jsonify({
        "status": "OK",
        "height": getNodeHeight(),
        "indexed": getNewestBlock(),
    })


def run_server():
    """Run the Flask app on all interfaces, port 3000 (blocks the caller)."""
    app.run(host='0.0.0.0', port=3000)


def start_flask_in_thread():
    """Starts the Flask server in a background thread."""
    flask_thread = threading.Thread(target=run_server)
    # Ensures that the Flask thread will exit when the main program exits
    flask_thread.daemon = True
    flask_thread.start()
# endregion


if __name__ == "__main__":
    # Webserver in background
    start_flask_in_thread()
    setupDB()

    # Check if DB needs to catch up
    newestBlock = getFirstMissingBlock()
    NodeHeight = getNodeHeight()
    if newestBlock == -1:
        print("ERROR GETTING NEWEST BLOCK")
        sys.exit(1)
    if NodeHeight == -1:
        print("ERROR GETTING NODE HEIGHT")
        sys.exit(1)

    if newestBlock < NodeHeight:
        print(f"Database is out of sync. Catching up from {newestBlock} to {NodeHeight}")
        catchUpper = CatchUp(newestBlock, NodeHeight)
        asyncio.run(catchUpper.catchUp())
        if catchUpper.interupted:
            sys.exit(1)

    print("Starting mempool watcher.")
    asyncio.run(main())