import os import json import hashlib import threading from functools import wraps from time import time, sleep def file_cache(ttl=3600): """ Decorator to cache function results in the specified folder with a TTL. Args: folder (str): Directory where cached files will be stored. ttl (int): Time-to-live for the cache in seconds. """ folder="cache" if not os.path.exists(folder): os.makedirs(folder) def refresh_cache(func, cache_file, cache_key, args, kwargs): def refresh_loop(): while True: sleep(ttl) try: if os.path.exists(cache_file): with open(cache_file, "r") as f: cached_data = json.load(f) if time() - cached_data["timestamp"] < ttl: return # Re-compute the result and update the cache result = func(*args, **kwargs) with open(cache_file, "w") as f: json.dump({"timestamp": time(), "result": result}, f) except Exception as e: print(f"Error during cache refresh of {cache_file} (Name: {func.__name__}): {e}",flush=True) threading.Thread(target=refresh_loop).start() def decorator(func): @wraps(func) def wrapper(*args, **kwargs): # Create a unique cache key based on the function name and arguments cache_key = hashlib.md5( f"{func.__name__}-{args}-{kwargs}".encode("utf-8") ).hexdigest() cache_file = os.path.join(folder, f"{cache_key}.json") # Start a background thread for auto-refresh if it doesn't exist if not os.path.exists(cache_file): result = func(*args, **kwargs) with open(cache_file, "w") as f: json.dump({"timestamp": time(), "result": result}, f) # Start refresh thread refresh_cache(func, cache_file, cache_key, args, kwargs) # Check if cache exists and is valid if os.path.exists(cache_file): try: with open(cache_file, "r") as f: cached_data = json.load(f) # Check if the cache has expired if time() - cached_data["timestamp"] < (ttl * 2): refresh_cache(func, cache_file, cache_key, args, kwargs) return cached_data["result"] except (IOError, ValueError, KeyError): pass # In case of error, re-compute the result # If cache is expired or invalid, recompute and update result = func(*args, **kwargs) with open(cache_file, "w") as f: json.dump({"timestamp": time(), "result": result}, f) # Start refresh thread if it doesn't exist refresh_cache(func, cache_file, cache_key, args, kwargs) return result return wrapper return decorator