From e30ced94c34e625fd784c7424a08bfa54787ff6a Mon Sep 17 00:00:00 2001 From: Nathan Woodburn Date: Sat, 8 Feb 2025 14:04:25 +1100 Subject: [PATCH] feat: Update to clickhouse db --- indexerClasses.py | 248 +++++++++++++++++++++++++++++++++++++++++++++- main.py | 220 ++++++++++++++++++++++++++++++++++++---- requirements.txt | 2 +- 3 files changed, 443 insertions(+), 27 deletions(-) diff --git a/indexerClasses.py b/indexerClasses.py index fd1c6a3..657e7e0 100644 --- a/indexerClasses.py +++ b/indexerClasses.py @@ -42,6 +42,25 @@ class Block: def __str__(self): return f"Block {self.height}" + def toJSON(self) -> dict: + return { + "hash": self.hash, + "height": self.height, + "depth": self.depth, + "version": self.version, + "prevBlock": self.prevBlock, + "merkleRoot": self.merkleRoot, + "witnessRoot": self.witnessRoot, + "treeRoot": self.treeRoot, + "reservedRoot": self.reservedRoot, + "time": self.time, + "bits": self.bits, + "nonce": self.nonce, + "extraNonce": self.extraNonce, + "mask": self.mask, + "txs": self.txs + } + class Transaction: def __init__(self, data): if isinstance(data, dict): @@ -67,10 +86,10 @@ class Transaction: self.index = data[6] self.version = data[7] # Load inputs with Input class - self.inputs = [] + self.inputs: list[Input] = [] for input in json.loads(data[8]): self.inputs.append(Input(input)) - self.outputs = [] + self.outputs: list[Output] = [] for output in json.loads(data[9]): self.outputs.append(Output(output)) self.locktime = data[10] @@ -81,6 +100,22 @@ class Transaction: def __str__(self): return f"Transaction {self.hash}" + def toJSON(self) -> dict: + return { + "hash": self.hash, + "witnessHash": self.witnessHash, + "fee": self.fee, + "rate": self.rate, + "mtime": self.mtime, + "block": self.block, + "index": self.index, + "version": self.version, + "inputs": [input.toJSON() for input in self.inputs], + "outputs": [output.toJSON() for output in self.outputs], + "locktime": self.locktime, + "hex": self.hex + } + class Input: def __init__(self, data): @@ -88,12 +123,26 @@ class Input: self.prevout = data["prevout"] self.witness = data["witness"] self.sequence = data["sequence"] - self.coin = Coin(data["coin"]) + self.address = None + self.coin = None + if "address" in data: + self.address = data["address"] + if "coin" in data: + self.coin = Coin(data["coin"]) else: raise ValueError("Invalid data type") def __str__(self): return f"Input {self.prevout['hash']} {self.coin}" + + def toJSON(self) -> dict: + return { + "prevout": self.prevout, + "witness": self.witness, + "sequence": self.sequence, + "address": self.address, + "coin": self.coin.toJSON() if self.coin else None + } class Output: @@ -108,6 +157,20 @@ class Output: def __str__(self): return f"Output {self.value} {self.address} {self.covenant}" + def toJSON(self) -> dict: + return { + "value": self.value, + "address": self.address, + "covenant": self.covenant.toJSON() + } + + +def hex_to_ascii(hex_string): + # Convert the hex string to bytes + bytes_obj = bytes.fromhex(hex_string) + # Decode the bytes object to an ASCII string + ascii_string = bytes_obj.decode('ascii') + return ascii_string class Covenant: def __init__(self, data): @@ -115,11 +178,72 @@ class Covenant: self.type = data["type"] self.action = data["action"] self.items = data["items"] + self.nameHash = None + self.height = None + self.name = None + self.flags = None + self.hash = None + self.nonce = None + self.recordData = None + self.blockHash = None + self.version = None + self.Address = None + self.claimHeight = None + self.renewalCount = None + + + if self.type > 0: # All but NONE + self.nameHash = self.items[0] + self.height = self.items[1] + + if self.type == 1: # CLAIM + self.flags = self.items[3] + + if self.type in [1,2,3]: # CLAIM, OPEN, BID + self.name = hex_to_ascii(self.items[2]) + + if self.type == 3: # BID + self.hash = self.items[3] + + if self.type == 4: # REVEAL + self.nonce = self.items[2] + + if self.type in [6,7]: # REGISTER, UPDATE + self.recordData = self.items[2] + + if self.type == 6: # REGISTER + self.blockHash = self.items[3] + + if self.type == 8: # RENEW + self.blockHash = self.items[2] + + if self.type == 9: # TRANSFER + self.version = self.items[2] + self.Address = self.items[3] + + if self.type == 10: # FINALIZE + self.name = hex_to_ascii(self.items[2]) + self.flags = self.items[3] + self.claimHeight= self.items[4] + self.renewalCount = self.items[5] + self.blockHash = self.items[6] + else: raise ValueError("Invalid data type") def __str__(self): - return f"Covenant {self.type} {self.action}" + return self.toString() + + def toString(self): + return self.action + + def toJSON(self) -> dict: + return { + "type": self.type, + "action": self.action, + "items": self.items + } + class Coin: def __init__(self, data): @@ -134,4 +258,118 @@ class Coin: raise ValueError("Invalid data type") def __str__(self): - return f"Coin {self.value} {self.address} {self.covenant}" \ No newline at end of file + return f"Coin {self.value} {self.address} {self.covenant}" + + + def toJSON(self) -> dict: + return { + "version": self.version, + "height": self.height, + "value": self.value, + "address": self.address, + "covenant": self.covenant.toJSON(), + "coinbase": self.coinbase + } + +class Bid: + def __init__(self, covenant: Covenant, tx: Transaction): + self.name = covenant.name + self.nameHash = covenant.nameHash + self.height = tx.block + self.tx: Transaction = tx + self.bidHash = covenant.hash + self.bid = covenant + self.value = 0 + self.blind = 0 + + + + + +class Name: + def __init__(self, data): + self.name = None + self.nameHash = None + self.state = "CLOSED" + self.height = 0 + self.lastRenewal = 0 + self.owner = None + self.value = 0 + self.highest = 0 + self.data = None + self.transfer = 0 + self.revoked = 0 + self.claimed = 0 + self.renewals = 0 + self.registered = False + self.expired = False + self.weak = False + self.stats = None + self.start = None + self.txs = [] + self.bids = [] + + if isinstance(data, Covenant): + if not data.type in [1,2]: + print(data.type) + raise ValueError("Invalid covenant type") + + self.name = data.name + self.nameHash = data.nameHash + self.height = data.height + + if data.type == 2: # OPEN + self.state = "OPEN" + + elif isinstance(data, dict): + for key, value in data.items(): + setattr(self, key, value) + elif isinstance(data, list) or isinstance(data, tuple): + for key, value in zip(self.__dict__.keys(), data): + setattr(self, key, value) + else: + raise ValueError("Invalid data type") + + def __str__(self): + return self.name + + + def update(self, covenant: Covenant, tx: Transaction): + if covenant.type == 0: # NONE + return + if covenant.type == 1: # CLAIM + self.state = "CLOSED" + self.claimed += 1 + if covenant.type == 2: # OPEN + self.state = "OPEN" + if self.height == 0: + self.height = covenant.height + if covenant.type == 3: # BID + bid: Bid = Bid(covenant, tx) + print(covenant.toJSON()) + + + + def toJSON(self) -> dict: + return { + "name": self.name, + "nameHash": self.nameHash, + "state": self.state, + "height": self.height, + "lastRenewal": self.lastRenewal, + "owner": self.owner, + "value": self.value, + "highest": self.highest, + "data": self.data, + "transfer": self.transfer, + "revoked": self.revoked, + "claimed": self.claimed, + "renewals": self.renewals, + "registered": self.registered, + "expired": self.expired, + "weak": self.weak, + "stats": self.stats, + "start": self.start, + "txs": self.txs, + "bids": self.bids + } \ No newline at end of file diff --git a/main.py b/main.py index f8c6cb8..9828eea 100644 --- a/main.py +++ b/main.py @@ -1,10 +1,10 @@ import json -import mysql.connector +from clickhouse_driver import Client import requests from time import sleep import json import sys -from indexerClasses import Block, Transaction +from indexerClasses import Block, Transaction, Input, Output, Covenant,Name import asyncio import signal import dotenv @@ -45,23 +45,19 @@ if os.getenv("DB_NAME"): DB_NAME = os.getenv("DB_NAME") -# MySQL Database Setup -dbSave = mysql.connector.connect( +# Clickhouse Database Setup +dbSave = Client( host=DB_HOST, user=DB_USER, password=DB_PASSWORD, - database=DB_NAME, - charset='utf8mb4', - collation='utf8mb4_unicode_ci', + database=DB_NAME ) -dbGet = mysql.connector.connect( +dbGet = Client( host=DB_HOST, user=DB_USER, password=DB_PASSWORD, - database=DB_NAME, - charset='utf8mb4', - collation='utf8mb4_unicode_ci', + database=DB_NAME ) def indexBlock(blockHeight): @@ -76,6 +72,25 @@ def indexBlock(blockHeight): return 0 +def getNameFromHash(nameHash): + name = requests.post(HSD_URL, json={"method": "getnamebyhash", "params": [nameHash]}) + if name.status_code != 200: + print(f"Error fetching name {nameHash}: {name.status_code}") + return -1 + name = name.json() + if not name["result"]: + return -1 + name = name["result"] + nameInfo = requests.post(HSD_URL, json={"method": "getnameinfo", "params": [name]}) + if nameInfo.status_code != 200: + print(f"Error fetching name info {name}: {nameInfo.status_code}") + return -1 + nameInfo = nameInfo.json() + if not nameInfo["result"]: + print(f"Error fetching name info {name}: {nameInfo['error']}") + return -1 + return nameInfo["result"] + def saveTransactions(txList, blockHeight): if not txList: return @@ -134,14 +149,15 @@ def saveBlock(blockData): def setupDB(): """Creates the database tables""" with dbSave.cursor() as cursor: - cursor.execute("CREATE TABLE IF NOT EXISTS blocks (hash VARCHAR(64), height BIGINT, depth INT, version INT, prevBlock VARCHAR(64), merkleRoot VARCHAR(64), witnessRoot VARCHAR(64), treeRoot VARCHAR(64), reservedRoot VARCHAR(64), time INT, bits INT, nonce BIGINT UNSIGNED, extraNonce VARCHAR(64), mask VARCHAR(64), txs JSON)") - cursor.execute("CREATE TABLE IF NOT EXISTS transactions (hash VARCHAR(64), witnessHash VARCHAR(64), fee BIGINT, rate BIGINT, mtime BIGINT, block BIGINT, `index` INT, version INT, inputs JSON, outputs JSON, locktime BIGINT, hex LONGTEXT)") + cursor.execute("CREATE TABLE IF NOT EXISTS blocks ( hash String, height UInt64, depth Int32, version Int32, prevBlock String, merkleRoot String, witnessRoot String, treeRoot String, reservedRoot String, time UInt32, bits Int32, nonce UInt64, extraNonce String, mask String, txs String ) ENGINE = MergeTree() ORDER BY (hash, height)") + cursor.execute("CREATE TABLE IF NOT EXISTS transactions ( hash String, witnessHash String, fee Int64, rate Int64, mtime Int64, block UInt64, index Int32, version Int32, inputs String, outputs String, locktime Int64, hex String ) ENGINE = MergeTree() ORDER BY (hash, block)") + cursor.execute("CREATE TABLE IF NOT EXISTS names ( name String, nameHash String, state String, height UInt64, lastRenewal Int64, owner String, value Int64, highest Int64, data String, transfer Int64, revoked Int64, claimed Int64, renewals Int64, registered UInt8, expired UInt8, weak UInt8, stats String, start String, txs String, bids String ) ENGINE = MergeTree() ORDER BY (name, height)") # Get the newest block height in the database def getNewestBlock() -> int: """Returns the height of the newest block in the database""" - dbNB = mysql.connector.connect( + dbNB = Client( host=DB_HOST, user=DB_USER, password=DB_PASSWORD, @@ -170,7 +186,7 @@ def dbCheck(): print(block) -def getBlock(height): +def getBlock(height) -> Block | None: with dbGet.cursor() as cursor: cursor.execute("SELECT * FROM blocks WHERE height = %s", (height,)) block = cursor.fetchone() @@ -178,7 +194,7 @@ def getBlock(height): return None return Block(block) -def getTransaction(hash): +def getTransaction(hash) -> Transaction | None: with dbGet.cursor() as cursor: cursor.execute("SELECT * FROM transactions WHERE hash = %s", (hash,)) tx = cursor.fetchone() @@ -186,6 +202,112 @@ def getTransaction(hash): return None return Transaction(tx) + +def getTransactions(height) -> list[Transaction] | None: + with dbGet.cursor() as cursor: + cursor.execute("SELECT * FROM transactions WHERE block = %s", (height,)) + txs = cursor.fetchall() + if not txs: + return None + # Convert to list of Transaction objects + return [Transaction(tx) for tx in txs] + +def getNameFromHash(nameHash): + # Connect to db + with dbGet.cursor() as cursor: + cursor.execute("SELECT * FROM names WHERE nameHash = %s", (nameHash,)) + name = cursor.fetchone() + if not name: + return -1 + return Name(name) + +def getNamesFromBlock(height): + transactions = getTransactions(height) + if not transactions: + return -1 + + namesToSave: list[Name] = [] + names = [] + + for tx in transactions: + for output in tx.outputs: + cov = output.covenant + if cov.type == 0: # NONE + continue + # Check if name exists in block + if cov.nameHash in names: + for name in namesToSave: + if name.nameHash == cov.nameHash: + name.update(cov,tx) + name.txs.append(tx.hash) + namesToSave.remove(name) + else: + name = getNameFromHash(cov.nameHash) + if name == -1: + # Create new name + name = Name(cov) + name.txs.append(tx.hash) + name.height = height + else: + name.update(cov,tx) + name.txs.append(tx.hash) + namesToSave.append(name) + + queryData = [] + for name in namesToSave: + nameInfo = name.toJSON() + queryData.append(( + nameInfo["name"], + nameInfo["nameHash"], + nameInfo["state"], + nameInfo["height"], + nameInfo["lastRenewal"], + json.dumps(nameInfo["owner"]), + nameInfo["value"], + nameInfo["highest"], + json.dumps(nameInfo["data"]), + json.dumps(nameInfo["transfer"]), + nameInfo["revoked"], + nameInfo["claimed"], + nameInfo["renewals"], + nameInfo["registered"], + nameInfo["expired"], + nameInfo["weak"], + json.dumps(nameInfo["stats"]), + json.dumps(nameInfo["start"]), + json.dumps(nameInfo["txs"]), + json.dumps(nameInfo["bids"]) + )) + + query = """ + INSERT INTO names (name, nameHash, state, height, lastRenewal, owner, value, highest, data, transfer, revoked, claimed, renewals, registered, expired, weak, stats, start, txs, bids) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + state=VALUES(state), + height=VALUES(height), + lastRenewal=VALUES(lastRenewal), + owner=VALUES(owner), + value=VALUES(value), + highest=VALUES(highest), + data=VALUES(data), + transfer=VALUES(transfer), + revoked=VALUES(revoked), + claimed=VALUES(claimed), + renewals=VALUES(renewals), + registered=VALUES(registered), + expired=VALUES(expired), + weak=VALUES(weak), + stats=VALUES(stats), + start=VALUES(start), + txs=VALUES(txs), + bids=VALUES(bids) + """ + with dbSave.cursor() as cursor: + cursor.executemany(query, queryData) + dbSave.commit() + + return 0 + def getNodeHeight(): @@ -212,7 +334,6 @@ def getFirstMissingBlock(): block += 1 else: return block - return block @@ -270,6 +391,10 @@ class BlockWatcher: if indexBlock(height) != 0: print("Error indexing block") self.block = self.block - 1 + else: + # Check if there are any new names + if getNamesFromBlock(height) != 0: + print("Error indexing names") await asyncio.sleep(self.checkInterval) @@ -322,6 +447,50 @@ class CatchUp: def stop(self): self.closing = True self.running = False + +class NameSyncer: + def __init__(self, currentHeight, targetHeight): + self.currentHeight = currentHeight - 1 + self.targetHeight = targetHeight + self.running = True + self.closing = False + self.interupted = False + + async def sync(self): + print(f"Syncing names from {self.currentHeight} to {self.targetHeight}") + def signal_handler(sig, frame): + self.interupted = True + self.stop() + print("\n\nCaught Ctrl+C\n") + + signal.signal(signal.SIGINT, signal_handler) + asyncio.create_task(self.loop()) + while self.running: + await asyncio.sleep(1) + print("Stopping catch up") + while self.closing: + await asyncio.sleep(1) + print("Stopped catch up") + + + + async def loop(self): + while self.running: + if self.currentHeight >= self.targetHeight: + self.running = False + print(f"Caught up to {self.targetHeight}") + return + + if getNamesFromBlock(self.currentHeight + 1) != 0: + print(f"Error indexing names {self.currentHeight + 1}") + self.running = False + return + self.currentHeight += 1 + self.closing = False + + def stop(self): + self.closing = True + self.running = False # endregion @@ -367,8 +536,17 @@ if __name__ == "__main__": asyncio.run(catchUpper.catchUp()) if catchUpper.interupted: sys.exit(1) - - - + print("Starting mempool watcher.") - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) + + # Get names + # namesyncer = NameSyncer(2000, 2025) + # asyncio.run(namesyncer.sync()) + # if namesyncer.interupted: + # sys.exit(1) + + + + + print("Finished") \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index e48e790..2ec733f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -mysql-connector-python +clickhouse-driver requests python-dotenv flask \ No newline at end of file