generated from nathanwoodburn/python-webserver-template
fix: Split line in string literal
All checks were successful
Build Docker / BuildImage (push) Successful in 31s
All checks were successful
Build Docker / BuildImage (push) Successful in 31s
This commit is contained in:
parent
e8d5dc8f9f
commit
feba858628
55
cache.py
55
cache.py
@ -1,10 +1,11 @@
|
|||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import threading
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from time import time
|
from time import time, sleep
|
||||||
|
|
||||||
def file_cache(folder="cache", ttl=300):
|
def file_cache(folder="cache", ttl=3600):
|
||||||
"""
|
"""
|
||||||
Decorator to cache function results in the specified folder with a TTL.
|
Decorator to cache function results in the specified folder with a TTL.
|
||||||
|
|
||||||
@ -15,6 +16,29 @@ def file_cache(folder="cache", ttl=300):
|
|||||||
if not os.path.exists(folder):
|
if not os.path.exists(folder):
|
||||||
os.makedirs(folder)
|
os.makedirs(folder)
|
||||||
|
|
||||||
|
def refresh_cache(func, cache_file, cache_key, args, kwargs):
|
||||||
|
def refresh_loop():
|
||||||
|
while True:
|
||||||
|
sleep(ttl)
|
||||||
|
try:
|
||||||
|
# Check if the cache is less than half the ttl (wait for half ttl to pass)
|
||||||
|
if os.path.exists(cache_file):
|
||||||
|
with open(cache_file, "r") as f:
|
||||||
|
cached_data = json.load(f)
|
||||||
|
if time() - cached_data["timestamp"] < ttl / 2:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Re-compute the result and update the cache
|
||||||
|
print(f"Refreshing cache for {func.__name__}...")
|
||||||
|
result = func(*args, **kwargs)
|
||||||
|
with open(cache_file, "w") as f:
|
||||||
|
json.dump({"timestamp": time(), "result": result}, f)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error during cache refresh: {e}")
|
||||||
|
|
||||||
|
threading.Thread(target=refresh_loop, daemon=True).start()
|
||||||
|
|
||||||
|
|
||||||
def decorator(func):
|
def decorator(func):
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
@ -23,7 +47,17 @@ def file_cache(folder="cache", ttl=300):
|
|||||||
f"{func.__name__}-{args}-{kwargs}".encode("utf-8")
|
f"{func.__name__}-{args}-{kwargs}".encode("utf-8")
|
||||||
).hexdigest()
|
).hexdigest()
|
||||||
cache_file = os.path.join(folder, f"{cache_key}.json")
|
cache_file = os.path.join(folder, f"{cache_key}.json")
|
||||||
|
|
||||||
|
# Start a background thread for auto-refresh if it doesn't exist
|
||||||
|
if not os.path.exists(cache_file):
|
||||||
|
print(f"Creating cache for {func.__name__} with auto-refresh.")
|
||||||
|
result = func(*args, **kwargs)
|
||||||
|
with open(cache_file, "w") as f:
|
||||||
|
json.dump({"timestamp": time(), "result": result}, f)
|
||||||
|
|
||||||
|
# Start refresh thread
|
||||||
|
refresh_cache(func, cache_file, cache_key, args, kwargs)
|
||||||
|
|
||||||
# Check if cache exists and is valid
|
# Check if cache exists and is valid
|
||||||
if os.path.exists(cache_file):
|
if os.path.exists(cache_file):
|
||||||
try:
|
try:
|
||||||
@ -34,15 +68,14 @@ def file_cache(folder="cache", ttl=300):
|
|||||||
return cached_data["result"]
|
return cached_data["result"]
|
||||||
except (IOError, ValueError, KeyError):
|
except (IOError, ValueError, KeyError):
|
||||||
pass # In case of error, re-compute the result
|
pass # In case of error, re-compute the result
|
||||||
|
|
||||||
# Call the function and cache the result
|
|
||||||
result = func(*args, **kwargs)
|
|
||||||
try:
|
|
||||||
with open(cache_file, "w") as f:
|
|
||||||
json.dump({"timestamp": time(), "result": result}, f)
|
|
||||||
except (IOError, TypeError) as e:
|
|
||||||
print(f"Warning: Could not cache result: {e}")
|
|
||||||
|
|
||||||
|
# If cache is expired or invalid, recompute and update
|
||||||
|
result = func(*args, **kwargs)
|
||||||
|
with open(cache_file, "w") as f:
|
||||||
|
json.dump({"timestamp": time(), "result": result}, f)
|
||||||
|
|
||||||
|
# Start refresh thread if it doesn't exist
|
||||||
|
refresh_cache(func, cache_file, cache_key, args, kwargs)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
@ -116,10 +116,8 @@ def index():
|
|||||||
# solValue = 10
|
# solValue = 10
|
||||||
# vaultBalance = "20.00"
|
# vaultBalance = "20.00"
|
||||||
|
|
||||||
pie_chart_data = [(token['symbol'].upper(), token['value'], f"{token['name']}: ${
|
pie_chart_data = [(token['symbol'].upper(), token['value'], f"{token['name']}: ${'{:.2f}'.format(token['value'])}") for token in tokens]
|
||||||
'{:.2f}'.format(token['value'])}") for token in tokens]
|
pie_chart_data.append(("SOL", solValue, f"Solana: ${'{:.2f}'.format(solValue)}"))
|
||||||
pie_chart_data.append(
|
|
||||||
("SOL", solValue, f"Solana: ${'{:.2f}'.format(solValue)}"))
|
|
||||||
|
|
||||||
cardanoBalance = getCardanoValue(vault_cardano_address)
|
cardanoBalance = getCardanoValue(vault_cardano_address)
|
||||||
cardanoBalance = "{:.2f}".format(cardanoBalance)
|
cardanoBalance = "{:.2f}".format(cardanoBalance)
|
||||||
@ -282,8 +280,7 @@ get_cardano_balance_cache = TTLCache(maxsize=1, ttl=3600)
|
|||||||
def getCardanoBalance(address: str):
|
def getCardanoBalance(address: str):
|
||||||
# Get balance of cardano address
|
# Get balance of cardano address
|
||||||
try:
|
try:
|
||||||
response = requests.get(f"https://cardano-mainnet.blockfrost.io/api/v0/accounts/{
|
response = requests.get(f"https://cardano-mainnet.blockfrost.io/api/v0/accounts/{address}", headers={"project_id": blockFrost_API})
|
||||||
address}", headers={"project_id": blockFrost_API})
|
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
print("Error getting cardano balance")
|
print("Error getting cardano balance")
|
||||||
return 0
|
return 0
|
||||||
|
Loading…
Reference in New Issue
Block a user