fireexplorer-indexer/main.py

374 lines
11 KiB
Python

import json
import mysql.connector
import requests
from time import sleep
import json
import sys
from indexerClasses import Block, Transaction
import asyncio
import signal
import dotenv
import os
from flask import Flask, jsonify
import threading
# Load variables through python-dotenv before the settings below are read.
dotenv.load_dotenv()

# HSD node settings: a non-empty environment variable replaces the default.
HSD_API_KEY = os.getenv("HSD_API_KEY") or ""
HSD_PORT = os.getenv("HSD_PORT") or 12037
HSD_IP = os.getenv("HSD_IP") or "127.0.0.1"
# A complete HSD_URL from the environment wins over the URL assembled from
# the individual parts above.
HSD_URL = os.getenv("HSD_URL") or f"http://x:{HSD_API_KEY}@{HSD_IP}:{HSD_PORT}"

# MySQL settings, overridable the same way.
DB_HOST = os.getenv("DB_HOST") or "localhost"
DB_USER = os.getenv("DB_USER") or "indexer"
DB_PASSWORD = os.getenv("DB_PASSWORD") or "supersecretpassword"
DB_NAME = os.getenv("DB_NAME") or "fireindexer"
# MySQL Database Setup
# Two connections opened with identical settings: dbSave is used by the
# insert/DDL helpers in this file, dbGet by the read helpers.
_DB_SETTINGS = {
    "host": DB_HOST,
    "user": DB_USER,
    "password": DB_PASSWORD,
    "database": DB_NAME,
    "charset": 'utf8mb4',
    "collation": 'utf8mb4_unicode_ci',
}
dbSave = mysql.connector.connect(**_DB_SETTINGS)
dbGet = mysql.connector.connect(**_DB_SETTINGS)
def indexBlock(blockHeight):
    """Fetch one block from the HSD node and store it in the database.

    Args:
        blockHeight: Height of the block requested from ``{HSD_URL}/block/``.

    Returns:
        0 when the block was fetched and passed to ``saveBlock``; -1 when the
        request could not be completed or the node answered with a status
        other than 200.
    """
    try:
        # Without a timeout a stalled node would block the indexer forever.
        blockData = requests.get(f"{HSD_URL}/block/{blockHeight}", timeout=30)
    except requests.RequestException as err:
        # Report network failures through the same -1 code callers already check.
        print(f"Error fetching block {blockHeight}: {err}")
        return -1
    if blockData.status_code != 200:
        print(f"Error fetching block {blockHeight}: {blockData.status_code}")
        return -1
    # Progress output: the height, followed by one dot per transaction
    # (printed by saveTransactions) and a newline (printed by saveBlock).
    print(blockHeight, end='', flush=True)
    saveBlock(blockData.json())
    return 0
def saveTransactions(txList, blockHeight):
    """Insert all transactions of one block with a single batched statement.

    Args:
        txList: Transaction dicts; each must provide the keys read below.
        blockHeight: Value stored in every row's ``block`` column.

    Nothing is written when ``txList`` is empty. One ``.`` is printed per
    transaction as a progress marker.
    """
    if not txList:
        return

    def toRow(tx):
        # Print the progress dot first, then flatten the dict into column order.
        print('.', end='', flush=True)
        return (
            tx["hash"], tx["witnessHash"], tx["fee"], tx["rate"],
            tx["mtime"], blockHeight, tx["index"], tx["version"],
            json.dumps(tx["inputs"]), json.dumps(tx["outputs"]),
            tx["locktime"], tx["hex"],
        )

    rows = [toRow(tx) for tx in txList]

    insertSql = """
    INSERT INTO transactions (hash, witnessHash, fee, rate, mtime, block, `index`, version,
    inputs, outputs, locktime, hex)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE hash=hash
    """
    with dbSave.cursor() as cursor:
        cursor.executemany(insertSql, rows)
    dbSave.commit()
def saveBlock(blockData):
    """Store a block and its transactions.

    Args:
        blockData: Block dict; must provide ``txs`` (a list of transaction
            dicts) and the header fields read below.

    The transactions are written first through ``saveTransactions``; the
    block row then stores the list of transaction hashes as JSON in ``txs``.
    """
    txHashes = [tx["hash"] for tx in blockData["txs"]]

    # Bulk save transactions
    saveTransactions(blockData["txs"], blockData["height"])

    # Header fields, in the same order as the column list of the INSERT.
    headerFields = (
        "hash", "height", "depth", "version", "prevBlock", "merkleRoot",
        "witnessRoot", "treeRoot", "reservedRoot", "time", "bits", "nonce",
        "extraNonce", "mask",
    )
    row = tuple(blockData[field] for field in headerFields) + (json.dumps(txHashes),)

    insertSql = """
    INSERT INTO blocks (hash, height, depth, version, prevBlock, merkleRoot, witnessRoot,
    treeRoot, reservedRoot, time, bits, nonce, extraNonce, mask, txs)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE hash=hash
    """
    with dbSave.cursor() as cursor:
        cursor.execute(insertSql, row)
    dbSave.commit()
    # Terminates the progress line started by indexBlock.
    print('')
def setupDB():
    """Create the ``blocks`` and ``transactions`` tables if they do not exist.

    ``hash`` is declared as the primary key of both tables. The inserts in
    this file end with ``ON DUPLICATE KEY UPDATE hash=hash``; that clause can
    only skip an already-stored row when the table has a unique key for the
    new row to collide with. Without one, re-indexing a block silently adds
    duplicate rows.

    NOTE: ``CREATE TABLE IF NOT EXISTS`` leaves an existing table untouched,
    so tables created before the key was introduced need a manual migration.
    """
    createBlocks = (
        "CREATE TABLE IF NOT EXISTS blocks ("
        "hash VARCHAR(64) PRIMARY KEY, height BIGINT, depth INT, version INT, "
        "prevBlock VARCHAR(64), merkleRoot VARCHAR(64), witnessRoot VARCHAR(64), "
        "treeRoot VARCHAR(64), reservedRoot VARCHAR(64), time INT, bits INT, "
        "nonce BIGINT UNSIGNED, extraNonce VARCHAR(64), mask VARCHAR(64), txs JSON)"
    )
    createTransactions = (
        "CREATE TABLE IF NOT EXISTS transactions ("
        "hash VARCHAR(64) PRIMARY KEY, witnessHash VARCHAR(64), fee BIGINT, "
        "rate BIGINT, mtime BIGINT, block BIGINT, `index` INT, version INT, "
        "inputs JSON, outputs JSON, locktime BIGINT, hex LONGTEXT)"
    )
    with dbSave.cursor() as cursor:
        cursor.execute(createBlocks)
        cursor.execute(createTransactions)
# Get the newest block height in the database
def getNewestBlock() -> int:
    """Return the height of the newest block in the database.

    A dedicated connection is opened for every call instead of reusing the
    module-level ones (presumably because this is called from the Flask
    route, which runs in its own thread - confirm). The connection is always
    closed before returning, including when a row was found.

    Returns:
        The largest stored ``height``, or -1 when the table holds no block.
    """
    dbNB = mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME,
        charset='utf8mb4',
        collation='utf8mb4_unicode_ci',
    )
    try:
        with dbNB.cursor() as cursor:
            cursor.execute("SELECT height FROM blocks ORDER BY height DESC LIMIT 1")
            newestBlock = cursor.fetchone()
    finally:
        # Previously skipped on the success path, leaking one connection per call.
        dbNB.close()
    # The height column is nullable, so guard against a NULL value as well.
    if newestBlock and newestBlock[0] is not None:
        return int(newestBlock[0])
    return -1
def dbCheck():
    """Print the stored blocks for heights 0-99.

    Stops at the first height for which no row is found.
    """
    lookupSql = "SELECT * FROM blocks WHERE height = %s"
    for height in range(100):
        with dbGet.cursor() as cursor:
            cursor.execute(lookupSql, (height,))
            row = cursor.fetchone()
        if not row:
            return
        print(Block(row))
def getBlock(height):
    """Load the block stored at ``height``.

    Returns:
        A ``Block`` built from the first matching row, or ``None`` when no
        row has that height.
    """
    with dbGet.cursor() as cursor:
        cursor.execute("SELECT * FROM blocks WHERE height = %s", (height,))
        row = cursor.fetchone()
    return Block(row) if row else None
def getTransaction(hash):
    """Load the transaction stored under ``hash``.

    Returns:
        A ``Transaction`` built from the first matching row, or ``None`` when
        no row has that hash.
    """
    with dbGet.cursor() as cursor:
        cursor.execute("SELECT * FROM transactions WHERE hash = %s", (hash,))
        row = cursor.fetchone()
    return Transaction(row) if row else None
def getNodeHeight():
    """Ask the HSD node for its current chain height.

    Returns:
        ``info["chain"]["height"]`` from the node's root endpoint, or -1 when
        the request could not be completed or the node answered with a status
        other than 200.
    """
    try:
        # Without a timeout a stalled node would block the caller (startup
        # code and the Flask status route) indefinitely.
        response = requests.get(HSD_URL, timeout=30)
    except requests.RequestException as err:
        # Report network failures through the same -1 code callers already check.
        print(f"Error fetching Node Block: {err}")
        return -1
    if response.status_code != 200:
        print(f"Error fetching Node Block: {response.text}")
        return -1
    info = response.json()
    return info["chain"]["height"]
def getFirstMissingBlock():
    """Return the lowest block height that is not stored in the database.

    Heights are read in ascending order and compared against a counter that
    starts at 0; the first height that skips ahead of the counter marks the
    gap.

    Returns:
        The first missing height: 0 for an empty table, or one past the
        highest stored height when there is no gap.
    """
    with dbGet.cursor() as cursor:
        cursor.execute("SELECT height FROM blocks ORDER BY height ASC")
        heights = [row[0] for row in cursor.fetchall()]

    expected = 0
    for height in heights:
        # The height column is neither unique nor NOT NULL, so repeated or
        # NULL values can occur. They must be skipped, otherwise a duplicate
        # row would be mistaken for a gap.
        if height is None or height < expected:
            continue
        if height != expected:
            break
        expected += 1
    return expected
async def main():
    """Run a ``BlockWatcher`` against ``HSD_URL`` until interrupted.

    Ctrl+C asks the watcher to stop. The coroutine then restores the default
    SIGINT handler and waits, printing one dot per second, until the watcher
    clears its ``closing`` flag.
    """
    watcher = BlockWatcher(HSD_URL)
    watcher.start()

    def onInterrupt(sig, frame):
        # Handle Ctrl+C interrupt
        print("\nReceived interrupt, stopping...")
        watcher.stop()

    signal.signal(signal.SIGINT, onInterrupt)

    # Keep running until interrupted
    while True:
        if not watcher.running:
            break
        await asyncio.sleep(1)

    print("Closing mempool watcher.", end="")
    # From here on a second Ctrl+C terminates the process the default way.
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    while True:
        if not watcher.closing:
            break
        print(".", end="")
        await asyncio.sleep(1)
# region Classes
class BlockWatcher:
    """Poll the HSD node and index blocks as the chain height grows.

    Attributes read by callers:
        running: True until ``stop()`` is called or the polling loop ends.
        closing: True between ``stop()`` and the moment the loop has exited.
        block: Highest height this watcher has indexed (0 = none yet).
    """

    def __init__(self, url, check_interval=1):
        """Remember the node URL and the pause (seconds) between polls."""
        self.url = url
        self.checkInterval = check_interval
        self.running = True
        self.closing = False
        self.block = 0
        self._task = None

    def start(self):
        """Schedule the polling loop on the running event loop."""
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        self._task = asyncio.create_task(self.loop())

    async def loop(self):
        """Poll the node until stopped, indexing every newly seen height."""
        try:
            while self.running:
                try:
                    response = requests.get(self.url, timeout=30)
                except requests.RequestException as err:
                    # A temporarily unreachable node must not kill the task;
                    # just retry on the next poll.
                    print(f"Error fetching info: {err}")
                    await asyncio.sleep(self.checkInterval)
                    continue
                if response.status_code != 200:
                    print(f"Error fetching info: {response.status_code}")
                    await asyncio.sleep(self.checkInterval)
                    continue
                info = response.json()
                height = info["chain"]["height"]
                if height > self.block:
                    # Until this watcher has indexed something, only the tip
                    # is fetched. Afterwards every height since the last
                    # success is indexed, so blocks that arrive between two
                    # polls are not skipped.
                    firstHeight = self.block + 1 if self.block > 0 else height
                    for nextHeight in range(firstHeight, height + 1):
                        if not self.running:
                            break
                        print(f"New block: {nextHeight}")
                        if indexBlock(nextHeight) != 0:
                            print("Error indexing block")
                            # self.block is left unchanged so the failed
                            # height is retried on the next poll.
                            break
                        self.block = nextHeight
                await asyncio.sleep(self.checkInterval)
        finally:
            # Reset both flags on every exit path (including an unexpected
            # exception) so code waiting on them cannot spin forever.
            self.running = False
            self.closing = False

    def stop(self):
        """Ask the polling loop to finish after its current iteration."""
        self.closing = True
        self.running = False
class CatchUp:
    """Index every block from a start height up to a target height.

    Attributes read by callers:
        running: True while the indexing loop should continue.
        closing: True between ``stop()`` and the moment the loop has exited.
        interupted: True when catching up was cancelled with Ctrl+C.
    """

    def __init__(self, currentHeight, targetHeight):
        """Prepare to index ``currentHeight`` .. ``targetHeight`` inclusive."""
        # Stored as "last height already done"; the loop indexes
        # currentHeight + 1 on each iteration.
        self.currentHeight = currentHeight - 1
        self.targetHeight = targetHeight
        self.running = True
        self.closing = False
        self.interupted = False

    async def catchUp(self):
        """Run the indexing loop and wait until it finished or was cancelled."""
        print(f"Catching up from {self.currentHeight} to {self.targetHeight}")

        def signal_handler(sig, frame):
            self.interupted = True
            self.stop()
            print("\n\nCaught Ctrl+C\n")

        signal.signal(signal.SIGINT, signal_handler)
        # Keep a reference: the event loop only holds weak references to tasks.
        task = asyncio.create_task(self.loop())
        # The done() checks keep these waits from spinning forever if the
        # flags are changed after the task has already finished.
        while self.running and not task.done():
            await asyncio.sleep(1)
        print("Stopping catch up")
        while self.closing and not task.done():
            await asyncio.sleep(1)
        # Surface any exception raised inside the loop instead of dropping it.
        await task
        print("Stopped catch up")

    async def loop(self):
        """Index blocks one by one until the target is reached or an error occurs."""
        try:
            while self.running:
                if self.currentHeight >= self.targetHeight:
                    print(f"Caught up to {self.targetHeight}")
                    return
                if indexBlock(self.currentHeight + 1) != 0:
                    print(f"Error indexing block {self.currentHeight + 1}")
                    return
                self.currentHeight += 1
                # indexBlock is blocking; yield so other coroutines get a turn.
                await asyncio.sleep(0)
        finally:
            # Reset both flags on every exit path - early return or exception
            # - so catchUp() cannot wait on a stale flag.
            self.running = False
            self.closing = False

    def stop(self):
        """Ask the indexing loop to finish after the current block."""
        self.closing = True
        self.running = False
# endregion
# region Server
app = Flask(__name__)


@app.route("/")
def ping():
    """Status endpoint: report the node height and the newest indexed height."""
    status = {
        "status": "OK",
        "height": getNodeHeight(),
        "indexed": getNewestBlock(),
    }
    return jsonify(status)
def run_server():
    """Serve the Flask app on port 3000, listening on all interfaces."""
    app.run(port=3000, host='0.0.0.0')
def start_flask_in_thread():
    """Start the Flask server in a background thread."""
    # A daemon thread does not keep the process alive once the main program exits.
    threading.Thread(target=run_server, daemon=True).start()
# endregion
if __name__ == "__main__":
    # Webserver in background
    start_flask_in_thread()
    setupDB()

    # Check if DB needs to catch up
    newestBlock = getFirstMissingBlock()
    NodeHeight = getNodeHeight()
    startupChecks = (
        (newestBlock, "ERROR GETTING NEWEST BLOCK"),
        (NodeHeight, "ERROR GETTING NODE HEIGHT"),
    )
    for checkedValue, failureMessage in startupChecks:
        # -1 is the error code used by the height helpers.
        if checkedValue == -1:
            print(failureMessage)
            sys.exit(1)

    if newestBlock < NodeHeight:
        print(f"Database is out of sync. Catching up from {newestBlock} to {NodeHeight}")
        catchUpper = CatchUp(newestBlock, NodeHeight)
        asyncio.run(catchUpper.catchUp())
        # Ctrl+C during catch-up ends the program instead of starting the watcher.
        if catchUpper.interupted:
            sys.exit(1)

    print("Starting mempool watcher.")
    asyncio.run(main())