feat: Update to clickhouse db

This commit is contained in:
Nathan Woodburn 2025-02-08 14:04:25 +11:00
parent 3b6830e216
commit e30ced94c3
Signed by: nathanwoodburn
GPG Key ID: 203B000478AD0EF1
3 changed files with 443 additions and 27 deletions

View File

@ -42,6 +42,25 @@ class Block:
def __str__(self): def __str__(self):
return f"Block {self.height}" return f"Block {self.height}"
def toJSON(self) -> dict:
return {
"hash": self.hash,
"height": self.height,
"depth": self.depth,
"version": self.version,
"prevBlock": self.prevBlock,
"merkleRoot": self.merkleRoot,
"witnessRoot": self.witnessRoot,
"treeRoot": self.treeRoot,
"reservedRoot": self.reservedRoot,
"time": self.time,
"bits": self.bits,
"nonce": self.nonce,
"extraNonce": self.extraNonce,
"mask": self.mask,
"txs": self.txs
}
class Transaction: class Transaction:
def __init__(self, data): def __init__(self, data):
if isinstance(data, dict): if isinstance(data, dict):
@ -67,10 +86,10 @@ class Transaction:
self.index = data[6] self.index = data[6]
self.version = data[7] self.version = data[7]
# Load inputs with Input class # Load inputs with Input class
self.inputs = [] self.inputs: list[Input] = []
for input in json.loads(data[8]): for input in json.loads(data[8]):
self.inputs.append(Input(input)) self.inputs.append(Input(input))
self.outputs = [] self.outputs: list[Output] = []
for output in json.loads(data[9]): for output in json.loads(data[9]):
self.outputs.append(Output(output)) self.outputs.append(Output(output))
self.locktime = data[10] self.locktime = data[10]
@ -81,6 +100,22 @@ class Transaction:
def __str__(self): def __str__(self):
return f"Transaction {self.hash}" return f"Transaction {self.hash}"
def toJSON(self) -> dict:
return {
"hash": self.hash,
"witnessHash": self.witnessHash,
"fee": self.fee,
"rate": self.rate,
"mtime": self.mtime,
"block": self.block,
"index": self.index,
"version": self.version,
"inputs": [input.toJSON() for input in self.inputs],
"outputs": [output.toJSON() for output in self.outputs],
"locktime": self.locktime,
"hex": self.hex
}
class Input: class Input:
def __init__(self, data): def __init__(self, data):
@ -88,12 +123,26 @@ class Input:
self.prevout = data["prevout"] self.prevout = data["prevout"]
self.witness = data["witness"] self.witness = data["witness"]
self.sequence = data["sequence"] self.sequence = data["sequence"]
self.coin = Coin(data["coin"]) self.address = None
self.coin = None
if "address" in data:
self.address = data["address"]
if "coin" in data:
self.coin = Coin(data["coin"])
else: else:
raise ValueError("Invalid data type") raise ValueError("Invalid data type")
def __str__(self): def __str__(self):
return f"Input {self.prevout['hash']} {self.coin}" return f"Input {self.prevout['hash']} {self.coin}"
def toJSON(self) -> dict:
return {
"prevout": self.prevout,
"witness": self.witness,
"sequence": self.sequence,
"address": self.address,
"coin": self.coin.toJSON() if self.coin else None
}
class Output: class Output:
@ -108,6 +157,20 @@ class Output:
def __str__(self): def __str__(self):
return f"Output {self.value} {self.address} {self.covenant}" return f"Output {self.value} {self.address} {self.covenant}"
def toJSON(self) -> dict:
return {
"value": self.value,
"address": self.address,
"covenant": self.covenant.toJSON()
}
def hex_to_ascii(hex_string):
# Convert the hex string to bytes
bytes_obj = bytes.fromhex(hex_string)
# Decode the bytes object to an ASCII string
ascii_string = bytes_obj.decode('ascii')
return ascii_string
class Covenant: class Covenant:
def __init__(self, data): def __init__(self, data):
@ -115,11 +178,72 @@ class Covenant:
self.type = data["type"] self.type = data["type"]
self.action = data["action"] self.action = data["action"]
self.items = data["items"] self.items = data["items"]
self.nameHash = None
self.height = None
self.name = None
self.flags = None
self.hash = None
self.nonce = None
self.recordData = None
self.blockHash = None
self.version = None
self.Address = None
self.claimHeight = None
self.renewalCount = None
if self.type > 0: # All but NONE
self.nameHash = self.items[0]
self.height = self.items[1]
if self.type == 1: # CLAIM
self.flags = self.items[3]
if self.type in [1,2,3]: # CLAIM, OPEN, BID
self.name = hex_to_ascii(self.items[2])
if self.type == 3: # BID
self.hash = self.items[3]
if self.type == 4: # REVEAL
self.nonce = self.items[2]
if self.type in [6,7]: # REGISTER, UPDATE
self.recordData = self.items[2]
if self.type == 6: # REGISTER
self.blockHash = self.items[3]
if self.type == 8: # RENEW
self.blockHash = self.items[2]
if self.type == 9: # TRANSFER
self.version = self.items[2]
self.Address = self.items[3]
if self.type == 10: # FINALIZE
self.name = hex_to_ascii(self.items[2])
self.flags = self.items[3]
self.claimHeight= self.items[4]
self.renewalCount = self.items[5]
self.blockHash = self.items[6]
else: else:
raise ValueError("Invalid data type") raise ValueError("Invalid data type")
def __str__(self): def __str__(self):
return f"Covenant {self.type} {self.action}" return self.toString()
def toString(self):
return self.action
def toJSON(self) -> dict:
return {
"type": self.type,
"action": self.action,
"items": self.items
}
class Coin: class Coin:
def __init__(self, data): def __init__(self, data):
@ -134,4 +258,118 @@ class Coin:
raise ValueError("Invalid data type") raise ValueError("Invalid data type")
def __str__(self): def __str__(self):
return f"Coin {self.value} {self.address} {self.covenant}" return f"Coin {self.value} {self.address} {self.covenant}"
def toJSON(self) -> dict:
return {
"version": self.version,
"height": self.height,
"value": self.value,
"address": self.address,
"covenant": self.covenant.toJSON(),
"coinbase": self.coinbase
}
class Bid:
def __init__(self, covenant: Covenant, tx: Transaction):
self.name = covenant.name
self.nameHash = covenant.nameHash
self.height = tx.block
self.tx: Transaction = tx
self.bidHash = covenant.hash
self.bid = covenant
self.value = 0
self.blind = 0
class Name:
def __init__(self, data):
self.name = None
self.nameHash = None
self.state = "CLOSED"
self.height = 0
self.lastRenewal = 0
self.owner = None
self.value = 0
self.highest = 0
self.data = None
self.transfer = 0
self.revoked = 0
self.claimed = 0
self.renewals = 0
self.registered = False
self.expired = False
self.weak = False
self.stats = None
self.start = None
self.txs = []
self.bids = []
if isinstance(data, Covenant):
if not data.type in [1,2]:
print(data.type)
raise ValueError("Invalid covenant type")
self.name = data.name
self.nameHash = data.nameHash
self.height = data.height
if data.type == 2: # OPEN
self.state = "OPEN"
elif isinstance(data, dict):
for key, value in data.items():
setattr(self, key, value)
elif isinstance(data, list) or isinstance(data, tuple):
for key, value in zip(self.__dict__.keys(), data):
setattr(self, key, value)
else:
raise ValueError("Invalid data type")
def __str__(self):
return self.name
def update(self, covenant: Covenant, tx: Transaction):
if covenant.type == 0: # NONE
return
if covenant.type == 1: # CLAIM
self.state = "CLOSED"
self.claimed += 1
if covenant.type == 2: # OPEN
self.state = "OPEN"
if self.height == 0:
self.height = covenant.height
if covenant.type == 3: # BID
bid: Bid = Bid(covenant, tx)
print(covenant.toJSON())
def toJSON(self) -> dict:
return {
"name": self.name,
"nameHash": self.nameHash,
"state": self.state,
"height": self.height,
"lastRenewal": self.lastRenewal,
"owner": self.owner,
"value": self.value,
"highest": self.highest,
"data": self.data,
"transfer": self.transfer,
"revoked": self.revoked,
"claimed": self.claimed,
"renewals": self.renewals,
"registered": self.registered,
"expired": self.expired,
"weak": self.weak,
"stats": self.stats,
"start": self.start,
"txs": self.txs,
"bids": self.bids
}

220
main.py
View File

@ -1,10 +1,10 @@
import json import json
import mysql.connector from clickhouse_driver import Client
import requests import requests
from time import sleep from time import sleep
import json import json
import sys import sys
from indexerClasses import Block, Transaction from indexerClasses import Block, Transaction, Input, Output, Covenant,Name
import asyncio import asyncio
import signal import signal
import dotenv import dotenv
@ -45,23 +45,19 @@ if os.getenv("DB_NAME"):
DB_NAME = os.getenv("DB_NAME") DB_NAME = os.getenv("DB_NAME")
# MySQL Database Setup # Clickhouse Database Setup
dbSave = mysql.connector.connect( dbSave = Client(
host=DB_HOST, host=DB_HOST,
user=DB_USER, user=DB_USER,
password=DB_PASSWORD, password=DB_PASSWORD,
database=DB_NAME, database=DB_NAME
charset='utf8mb4',
collation='utf8mb4_unicode_ci',
) )
dbGet = mysql.connector.connect( dbGet = Client(
host=DB_HOST, host=DB_HOST,
user=DB_USER, user=DB_USER,
password=DB_PASSWORD, password=DB_PASSWORD,
database=DB_NAME, database=DB_NAME
charset='utf8mb4',
collation='utf8mb4_unicode_ci',
) )
def indexBlock(blockHeight): def indexBlock(blockHeight):
@ -76,6 +72,25 @@ def indexBlock(blockHeight):
return 0 return 0
def getNameFromHash(nameHash):
name = requests.post(HSD_URL, json={"method": "getnamebyhash", "params": [nameHash]})
if name.status_code != 200:
print(f"Error fetching name {nameHash}: {name.status_code}")
return -1
name = name.json()
if not name["result"]:
return -1
name = name["result"]
nameInfo = requests.post(HSD_URL, json={"method": "getnameinfo", "params": [name]})
if nameInfo.status_code != 200:
print(f"Error fetching name info {name}: {nameInfo.status_code}")
return -1
nameInfo = nameInfo.json()
if not nameInfo["result"]:
print(f"Error fetching name info {name}: {nameInfo['error']}")
return -1
return nameInfo["result"]
def saveTransactions(txList, blockHeight): def saveTransactions(txList, blockHeight):
if not txList: if not txList:
return return
@ -134,14 +149,15 @@ def saveBlock(blockData):
def setupDB(): def setupDB():
"""Creates the database tables""" """Creates the database tables"""
with dbSave.cursor() as cursor: with dbSave.cursor() as cursor:
cursor.execute("CREATE TABLE IF NOT EXISTS blocks (hash VARCHAR(64), height BIGINT, depth INT, version INT, prevBlock VARCHAR(64), merkleRoot VARCHAR(64), witnessRoot VARCHAR(64), treeRoot VARCHAR(64), reservedRoot VARCHAR(64), time INT, bits INT, nonce BIGINT UNSIGNED, extraNonce VARCHAR(64), mask VARCHAR(64), txs JSON)") cursor.execute("CREATE TABLE IF NOT EXISTS blocks ( hash String, height UInt64, depth Int32, version Int32, prevBlock String, merkleRoot String, witnessRoot String, treeRoot String, reservedRoot String, time UInt32, bits Int32, nonce UInt64, extraNonce String, mask String, txs String ) ENGINE = MergeTree() ORDER BY (hash, height)")
cursor.execute("CREATE TABLE IF NOT EXISTS transactions (hash VARCHAR(64), witnessHash VARCHAR(64), fee BIGINT, rate BIGINT, mtime BIGINT, block BIGINT, `index` INT, version INT, inputs JSON, outputs JSON, locktime BIGINT, hex LONGTEXT)") cursor.execute("CREATE TABLE IF NOT EXISTS transactions ( hash String, witnessHash String, fee Int64, rate Int64, mtime Int64, block UInt64, index Int32, version Int32, inputs String, outputs String, locktime Int64, hex String ) ENGINE = MergeTree() ORDER BY (hash, block)")
cursor.execute("CREATE TABLE IF NOT EXISTS names ( name String, nameHash String, state String, height UInt64, lastRenewal Int64, owner String, value Int64, highest Int64, data String, transfer Int64, revoked Int64, claimed Int64, renewals Int64, registered UInt8, expired UInt8, weak UInt8, stats String, start String, txs String, bids String ) ENGINE = MergeTree() ORDER BY (name, height)")
# Get the newest block height in the database # Get the newest block height in the database
def getNewestBlock() -> int: def getNewestBlock() -> int:
"""Returns the height of the newest block in the database""" """Returns the height of the newest block in the database"""
dbNB = mysql.connector.connect( dbNB = Client(
host=DB_HOST, host=DB_HOST,
user=DB_USER, user=DB_USER,
password=DB_PASSWORD, password=DB_PASSWORD,
@ -170,7 +186,7 @@ def dbCheck():
print(block) print(block)
def getBlock(height): def getBlock(height) -> Block | None:
with dbGet.cursor() as cursor: with dbGet.cursor() as cursor:
cursor.execute("SELECT * FROM blocks WHERE height = %s", (height,)) cursor.execute("SELECT * FROM blocks WHERE height = %s", (height,))
block = cursor.fetchone() block = cursor.fetchone()
@ -178,7 +194,7 @@ def getBlock(height):
return None return None
return Block(block) return Block(block)
def getTransaction(hash): def getTransaction(hash) -> Transaction | None:
with dbGet.cursor() as cursor: with dbGet.cursor() as cursor:
cursor.execute("SELECT * FROM transactions WHERE hash = %s", (hash,)) cursor.execute("SELECT * FROM transactions WHERE hash = %s", (hash,))
tx = cursor.fetchone() tx = cursor.fetchone()
@ -186,6 +202,112 @@ def getTransaction(hash):
return None return None
return Transaction(tx) return Transaction(tx)
def getTransactions(height) -> list[Transaction] | None:
with dbGet.cursor() as cursor:
cursor.execute("SELECT * FROM transactions WHERE block = %s", (height,))
txs = cursor.fetchall()
if not txs:
return None
# Convert to list of Transaction objects
return [Transaction(tx) for tx in txs]
def getNameFromHash(nameHash):
# Connect to db
with dbGet.cursor() as cursor:
cursor.execute("SELECT * FROM names WHERE nameHash = %s", (nameHash,))
name = cursor.fetchone()
if not name:
return -1
return Name(name)
def getNamesFromBlock(height):
transactions = getTransactions(height)
if not transactions:
return -1
namesToSave: list[Name] = []
names = []
for tx in transactions:
for output in tx.outputs:
cov = output.covenant
if cov.type == 0: # NONE
continue
# Check if name exists in block
if cov.nameHash in names:
for name in namesToSave:
if name.nameHash == cov.nameHash:
name.update(cov,tx)
name.txs.append(tx.hash)
namesToSave.remove(name)
else:
name = getNameFromHash(cov.nameHash)
if name == -1:
# Create new name
name = Name(cov)
name.txs.append(tx.hash)
name.height = height
else:
name.update(cov,tx)
name.txs.append(tx.hash)
namesToSave.append(name)
queryData = []
for name in namesToSave:
nameInfo = name.toJSON()
queryData.append((
nameInfo["name"],
nameInfo["nameHash"],
nameInfo["state"],
nameInfo["height"],
nameInfo["lastRenewal"],
json.dumps(nameInfo["owner"]),
nameInfo["value"],
nameInfo["highest"],
json.dumps(nameInfo["data"]),
json.dumps(nameInfo["transfer"]),
nameInfo["revoked"],
nameInfo["claimed"],
nameInfo["renewals"],
nameInfo["registered"],
nameInfo["expired"],
nameInfo["weak"],
json.dumps(nameInfo["stats"]),
json.dumps(nameInfo["start"]),
json.dumps(nameInfo["txs"]),
json.dumps(nameInfo["bids"])
))
query = """
INSERT INTO names (name, nameHash, state, height, lastRenewal, owner, value, highest, data, transfer, revoked, claimed, renewals, registered, expired, weak, stats, start, txs, bids)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
state=VALUES(state),
height=VALUES(height),
lastRenewal=VALUES(lastRenewal),
owner=VALUES(owner),
value=VALUES(value),
highest=VALUES(highest),
data=VALUES(data),
transfer=VALUES(transfer),
revoked=VALUES(revoked),
claimed=VALUES(claimed),
renewals=VALUES(renewals),
registered=VALUES(registered),
expired=VALUES(expired),
weak=VALUES(weak),
stats=VALUES(stats),
start=VALUES(start),
txs=VALUES(txs),
bids=VALUES(bids)
"""
with dbSave.cursor() as cursor:
cursor.executemany(query, queryData)
dbSave.commit()
return 0
def getNodeHeight(): def getNodeHeight():
@ -212,7 +334,6 @@ def getFirstMissingBlock():
block += 1 block += 1
else: else:
return block return block
return block return block
@ -270,6 +391,10 @@ class BlockWatcher:
if indexBlock(height) != 0: if indexBlock(height) != 0:
print("Error indexing block") print("Error indexing block")
self.block = self.block - 1 self.block = self.block - 1
else:
# Check if there are any new names
if getNamesFromBlock(height) != 0:
print("Error indexing names")
await asyncio.sleep(self.checkInterval) await asyncio.sleep(self.checkInterval)
@ -322,6 +447,50 @@ class CatchUp:
def stop(self): def stop(self):
self.closing = True self.closing = True
self.running = False self.running = False
class NameSyncer:
def __init__(self, currentHeight, targetHeight):
self.currentHeight = currentHeight - 1
self.targetHeight = targetHeight
self.running = True
self.closing = False
self.interupted = False
async def sync(self):
print(f"Syncing names from {self.currentHeight} to {self.targetHeight}")
def signal_handler(sig, frame):
self.interupted = True
self.stop()
print("\n\nCaught Ctrl+C\n")
signal.signal(signal.SIGINT, signal_handler)
asyncio.create_task(self.loop())
while self.running:
await asyncio.sleep(1)
print("Stopping catch up")
while self.closing:
await asyncio.sleep(1)
print("Stopped catch up")
async def loop(self):
while self.running:
if self.currentHeight >= self.targetHeight:
self.running = False
print(f"Caught up to {self.targetHeight}")
return
if getNamesFromBlock(self.currentHeight + 1) != 0:
print(f"Error indexing names {self.currentHeight + 1}")
self.running = False
return
self.currentHeight += 1
self.closing = False
def stop(self):
self.closing = True
self.running = False
# endregion # endregion
@ -367,8 +536,17 @@ if __name__ == "__main__":
asyncio.run(catchUpper.catchUp()) asyncio.run(catchUpper.catchUp())
if catchUpper.interupted: if catchUpper.interupted:
sys.exit(1) sys.exit(1)
print("Starting mempool watcher.") print("Starting mempool watcher.")
asyncio.run(main()) asyncio.run(main())
# Get names
# namesyncer = NameSyncer(2000, 2025)
# asyncio.run(namesyncer.sync())
# if namesyncer.interupted:
# sys.exit(1)
print("Finished")

View File

@ -1,4 +1,4 @@
mysql-connector-python clickhouse-driver
requests requests
python-dotenv python-dotenv
flask flask