Compare commits

..

7 Commits

3 changed files with 82 additions and 137 deletions

View File

@@ -228,6 +228,9 @@ class Covenant:
self.renewalCount = self.items[5]
self.blockHash = self.items[6]
# TYPE 11 - REVOKE (Only has namehash and height)
else:
raise ValueError("Invalid data type")

205
main.py
View File

@@ -1,5 +1,6 @@
import json
from clickhouse_driver import Client
import clickhouse_connect
import requests
from time import sleep
import json
@@ -46,7 +47,7 @@ if os.getenv("DB_NAME"):
# Clickhouse Database Setup
dbSave = Client(
dbSave = clickhouse_connect.create_client(
host=DB_HOST,
user=DB_USER,
password=DB_PASSWORD,
@@ -96,27 +97,22 @@ def saveTransactions(txList, blockHeight):
return
# Prepare data for batch insert
txValues = []
for txData in txList:
print('.', end='', flush=True)
txValues.append((
txValues = [
(
txData["hash"], txData["witnessHash"], txData["fee"], txData["rate"],
txData["mtime"], blockHeight, txData["index"], txData["version"],
json.dumps(txData["inputs"]), json.dumps(txData["outputs"]),
txData["locktime"], txData["hex"]
))
)
for txData in txList
]
print(f"Inserting {len(txValues)} transactions...")
return dbSave.insert("transactions", txValues, column_names=[
"hash", "witnessHash", "fee", "rate", "mtime", "block", "tx_index", "version",
"inputs", "outputs", "locktime", "hex"
])
# Bulk insert transactions
query = """
INSERT INTO transactions (hash, witnessHash, fee, rate, mtime, block, tx_index, version,
inputs, outputs, locktime, hex)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE hash=hash
"""
with dbSave.cursor() as cursor:
cursor.executemany(query, txValues)
dbSave.commit()
def saveBlock(blockData):
hashes = [tx["hash"] for tx in blockData["txs"]]
@@ -125,101 +121,68 @@ def saveBlock(blockData):
saveTransactions(blockData["txs"], blockData["height"])
# Insert block if it doesn't exist
query = """
INSERT INTO blocks (hash, height, depth, version, prevBlock, merkleRoot, witnessRoot,
treeRoot, reservedRoot, time, bits, nonce, extraNonce, mask, txs)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE hash=hash
"""
blockValues = (
blockValues = [(
blockData["hash"], blockData["height"], blockData["depth"], blockData["version"],
blockData["prevBlock"], blockData["merkleRoot"], blockData["witnessRoot"],
blockData["treeRoot"], blockData["reservedRoot"], blockData["time"],
blockData["bits"], blockData["nonce"], blockData["extraNonce"],
blockData["mask"], json.dumps(hashes)
)
blockData["mask"], json.dumps(hashes) # Convert tx hashes to JSON string
)]
with dbSave.cursor() as cursor:
cursor.execute(query, blockValues)
dbSave.commit()
print('')
dbSave.insert("blocks", blockValues, column_names=[
"hash", "height", "depth", "version", "prevBlock", "merkleRoot", "witnessRoot",
"treeRoot", "reservedRoot", "time", "bits", "nonce", "extraNonce",
"mask", "txs"
])
def setupDB():
"""Creates the database tables"""
with dbSave.cursor() as cursor:
cursor.execute("CREATE TABLE IF NOT EXISTS blocks ( hash String, height UInt64, depth Int32, version Int32, prevBlock String, merkleRoot String, witnessRoot String, treeRoot String, reservedRoot String, time UInt32, bits Int32, nonce UInt64, extraNonce String, mask String, txs String ) ENGINE = MergeTree() ORDER BY (hash, height)")
cursor.execute("CREATE TABLE IF NOT EXISTS transactions ( hash String, witnessHash String, fee Int64, rate Int64, mtime Int64, block UInt64, tx_index Int32, version Int32, inputs String, outputs String, locktime Int64, hex String ) ENGINE = MergeTree() ORDER BY (hash, block)")
cursor.execute("CREATE TABLE IF NOT EXISTS names ( name String, nameHash String, state String, height UInt64, lastRenewal Int64, owner String, value Int64, highest Int64, data String, transfer Int64, revoked Int64, claimed Int64, renewals Int64, registered UInt8, expired UInt8, weak UInt8, stats String, start String, txs String, bids String ) ENGINE = MergeTree() ORDER BY (name, height)")
print('block saved')
# def setupDB():
# """Creates the database tables"""
# dbSave.execute("CREATE TABLE IF NOT EXISTS blocks ( hash String, height UInt64, depth Int32, version Int32, prevBlock String, merkleRoot String, witnessRoot String, treeRoot String, reservedRoot String, time UInt32, bits Int32, nonce UInt64, extraNonce String, mask String, txs String ) ENGINE = MergeTree() ORDER BY (hash, height)")
# dbSave.execute("CREATE TABLE IF NOT EXISTS transactions ( hash String, witnessHash String, fee Int64, rate Int64, mtime Int64, block UInt64, tx_index Int32, version Int32, inputs String, outputs String, locktime Int64, hex String ) ENGINE = MergeTree() ORDER BY (hash, block)")
# dbSave.execute("CREATE TABLE IF NOT EXISTS names ( name String, nameHash String, state String, height UInt64, lastRenewal Int64, owner String, value Int64, highest Int64, data String, transfer Int64, revoked Int64, claimed Int64, renewals Int64, registered UInt8, expired UInt8, weak UInt8, stats String, start String, txs String, bids String ) ENGINE = MergeTree() ORDER BY (name, height)")
# Get the newest block height in the database
def getNewestBlock() -> int:
"""Returns the height of the newest block in the database"""
dbNB = Client(
host=DB_HOST,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME,
charset='utf8mb4',
collation='utf8mb4_unicode_ci',
)
with dbNB.cursor() as cursor:
cursor.execute("SELECT height FROM blocks ORDER BY height DESC LIMIT 1")
newestBlock = cursor.fetchone()
if newestBlock:
return int(newestBlock[0])
dbNB.close()
return -1
newestBlock = dbGet.query("SELECT height FROM blocks ORDER BY height DESC LIMIT 1").result
return int(newestBlock[0][0]) if newestBlock else -1
def dbCheck():
# For the first 100 blocks, check for transactions
for i in range(100):
with dbGet.cursor() as cursor:
cursor.execute("SELECT * FROM blocks WHERE height = %s", (i,))
block = cursor.fetchone()
block = dbGet.query(f"SELECT * FROM blocks WHERE height = {i}").result
if not block:
return
block = Block(block)
print(block)
print(Block(block[0]))
def getBlock(height) -> Block | None:
with dbGet.cursor() as cursor:
cursor.execute("SELECT * FROM blocks WHERE height = %s", (height,))
block = cursor.fetchone()
if not block:
return None
return Block(block)
"""Fetch a block by height"""
block = dbGet.query(f"SELECT * FROM blocks WHERE height = {height}").result
return Block(block[0]) if block else None
def getTransaction(hash) -> Transaction | None:
with dbGet.cursor() as cursor:
cursor.execute("SELECT * FROM transactions WHERE hash = %s", (hash,))
tx = cursor.fetchone()
if not tx:
return None
return Transaction(tx)
def getTransaction(tx_hash) -> Transaction | None:
"""Fetch a transaction by hash"""
tx = dbGet.query(f"SELECT * FROM transactions WHERE hash = '{tx_hash}'").result
return Transaction(tx[0]) if tx else None
def getTransactions(height) -> list[Transaction] | None:
with dbGet.cursor() as cursor:
cursor.execute("SELECT * FROM transactions WHERE block = %s", (height,))
txs = cursor.fetchall()
if not txs:
return None
# Convert to list of Transaction objects
return [Transaction(tx) for tx in txs]
"""Fetch all transactions for a given block height"""
txs = dbGet.query(f"SELECT * FROM transactions WHERE block = {height}").result
return [Transaction(tx) for tx in txs] if txs else None
def getNameFromHash(nameHash):
# Connect to db
with dbGet.cursor() as cursor:
cursor.execute("SELECT * FROM names WHERE nameHash = %s", (nameHash,))
name = cursor.fetchone()
if not name:
return -1
return Name(name)
"""Fetch a name record by nameHash"""
name = dbGet.query(f"SELECT * FROM names WHERE nameHash = '{nameHash}'").result
return Name(name[0]) if name else -1
def getNamesFromBlock(height):
transactions = getTransactions(height)
@@ -280,37 +243,14 @@ def getNamesFromBlock(height):
json.dumps(nameInfo["bids"])
))
query = """
INSERT INTO names (name, nameHash, state, height, lastRenewal, owner, value, highest, data, transfer, revoked, claimed, renewals, registered, expired, weak, stats, start, txs, bids)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
state=VALUES(state),
height=VALUES(height),
lastRenewal=VALUES(lastRenewal),
owner=VALUES(owner),
value=VALUES(value),
highest=VALUES(highest),
data=VALUES(data),
transfer=VALUES(transfer),
revoked=VALUES(revoked),
claimed=VALUES(claimed),
renewals=VALUES(renewals),
registered=VALUES(registered),
expired=VALUES(expired),
weak=VALUES(weak),
stats=VALUES(stats),
start=VALUES(start),
txs=VALUES(txs),
bids=VALUES(bids)
"""
with dbSave.cursor() as cursor:
cursor.executemany(query, queryData)
dbSave.commit()
dbSave.insert("names", queryData, column_names=[
"name", "nameHash", "state", "height", "lastRenewal", "owner", "value", "highest",
"data", "transfer", "revoked", "claimed", "renewals", "registered", "expired",
"weak", "stats", "start", "txs", "bids"
])
return 0
def getNodeHeight():
response = requests.get(HSD_URL)
if response.status_code != 200:
@@ -319,23 +259,22 @@ def getNodeHeight():
info = response.json()
return info["chain"]["height"]
def getFirstMissingBlock() -> int:
"""Finds the first missing block height in the database."""
def getFirstMissingBlock():
"""Finds missing block heights in the database."""
with dbGet.cursor() as cursor:
cursor.execute("SELECT height FROM blocks ORDER BY height ASC")
heights = [row[0] for row in cursor.fetchall()]
# Fetch all existing block heights in ascending order
result = dbGet.execute("SELECT height FROM blocks ORDER BY height ASC").result
heights = [row[0] for row in result]
if not heights:
return 0
return 0 # No blocks found, start from 0
block = 0
for i in heights:
if i == block:
block += 1
else:
return block
return block
# Find the first missing block height
for expected, actual in enumerate(heights):
if expected != actual:
return expected # First missing height found
return len(heights) # No missing block, return next expected height
async def main():
@@ -394,7 +333,7 @@ class BlockWatcher:
self.block = self.block - 1
else:
# Check if there are any new names
if getNamesFromBlock(height) != 0:
if getNamesFromBlock(height) < 0:
print("Error indexing names")
await asyncio.sleep(self.checkInterval)
@@ -519,7 +458,7 @@ def start_flask_in_thread():
if __name__ == "__main__":
# Webserver in background
start_flask_in_thread()
setupDB()
# setupDB()
# Check if DB needs to catch up
newestBlock = getFirstMissingBlock()
NodeHeight = getNodeHeight()
@@ -538,14 +477,16 @@ if __name__ == "__main__":
if catchUpper.interupted:
sys.exit(1)
print("Starting mempool watcher.")
asyncio.run(main())
# Get names
# namesyncer = NameSyncer(2000, 2025)
# asyncio.run(namesyncer.sync())
# if namesyncer.interupted:
# sys.exit(1)
namesyncer = NameSyncer(2000, 2025)
asyncio.run(namesyncer.sync())
if namesyncer.interupted:
sys.exit(1)
# print("Starting mempool watcher.")
# asyncio.run(main())

View File

@@ -1,4 +1,5 @@
clickhouse-driver
clickhouse-connect
requests
python-dotenv
flask