""" Multi-Level Caching System Implements 3-tier caching for filter pipeline efficiency. """ import json import hashlib import os import logging from pathlib import Path from typing import Optional, Dict, Any from datetime import datetime, timedelta from .models import AIAnalysisResult, FilterResult logger = logging.getLogger(__name__) class FilterCache: """ Three-level caching system: Level 1: In-memory cache (fastest, TTL-based) Level 2: AI analysis cache (persistent, content-hash based) Level 3: Filterset result cache (persistent, filterset version based) """ def __init__(self, cache_dir: str = 'data/filter_cache'): self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) # Level 1: In-memory cache self.memory_cache: Dict[str, tuple[Any, datetime]] = {} self.memory_ttl = timedelta(minutes=5) # Level 2: AI analysis cache directory self.ai_cache_dir = self.cache_dir / 'ai_analysis' self.ai_cache_dir.mkdir(exist_ok=True) # Level 3: Filterset result cache directory self.filterset_cache_dir = self.cache_dir / 'filtersets' self.filterset_cache_dir.mkdir(exist_ok=True) # ===== Level 1: Memory Cache ===== def get_memory(self, key: str) -> Optional[Any]: """Get from memory cache if not expired""" if key in self.memory_cache: value, timestamp = self.memory_cache[key] if datetime.now() - timestamp < self.memory_ttl: return value else: # Expired, remove del self.memory_cache[key] return None def set_memory(self, key: str, value: Any): """Store in memory cache""" self.memory_cache[key] = (value, datetime.now()) def clear_memory(self): """Clear all memory cache""" self.memory_cache.clear() # ===== Level 2: AI Analysis Cache (Persistent) ===== @staticmethod def compute_content_hash(title: str, content: str) -> str: """Compute SHA-256 hash of content for caching""" text = f"{title}\n{content}".encode('utf-8') return hashlib.sha256(text).hexdigest() def get_ai_analysis(self, content_hash: str) -> Optional[AIAnalysisResult]: """ Get AI analysis result from cache. Args: content_hash: SHA-256 hash of content Returns: AIAnalysisResult if cached, None otherwise """ # Check memory first mem_key = f"ai_{content_hash}" cached = self.get_memory(mem_key) if cached: return cached # Check disk cache_file = self.ai_cache_dir / f"{content_hash}.json" if cache_file.exists(): try: with open(cache_file, 'r') as f: data = json.load(f) result = AIAnalysisResult.from_dict(data) # Store in memory for faster access self.set_memory(mem_key, result) logger.debug(f"AI analysis cache hit for {content_hash[:8]}...") return result except Exception as e: logger.error(f"Error loading AI cache {content_hash}: {e}") return None return None def set_ai_analysis(self, content_hash: str, result: AIAnalysisResult): """ Store AI analysis result in cache (persistent). 
    def set_ai_analysis(self, content_hash: str, result: AIAnalysisResult):
        """
        Store AI analysis result in cache (persistent).

        Args:
            content_hash: SHA-256 hash of content
            result: AIAnalysisResult to cache
        """
        # Store in memory
        mem_key = f"ai_{content_hash}"
        self.set_memory(mem_key, result)

        # Store on disk (persistent)
        cache_file = self.ai_cache_dir / f"{content_hash}.json"
        try:
            with open(cache_file, 'w') as f:
                json.dump(result.to_dict(), f, indent=2)
            logger.debug(f"Cached AI analysis for {content_hash[:8]}...")
        except Exception as e:
            logger.error(f"Error saving AI cache {content_hash}: {e}")

    # ===== Level 3: Filterset Result Cache =====

    def _get_filterset_version(self, filterset_name: str, filtersets_config: Dict) -> str:
        """Get version hash of filterset definition for cache invalidation"""
        filterset_def = filtersets_config.get(filterset_name, {})

        # Include version field if present, otherwise hash the entire definition
        if 'version' in filterset_def:
            return str(filterset_def['version'])

        # Compute hash of filterset definition
        definition_json = json.dumps(filterset_def, sort_keys=True)
        return hashlib.md5(definition_json.encode()).hexdigest()[:8]

    def get_filterset_results(
        self,
        filterset_name: str,
        filterset_version: str,
        max_age_hours: int = 24
    ) -> Optional[Dict[str, FilterResult]]:
        """
        Get cached filterset results.

        Args:
            filterset_name: Name of filterset
            filterset_version: Version hash of filterset definition
            max_age_hours: Maximum age of cache in hours

        Returns:
            Dict mapping post_uuid to FilterResult, or None if cache invalid
        """
        cache_file = self.filterset_cache_dir / f"{filterset_name}_{filterset_version}.json"

        if not cache_file.exists():
            return None

        # Check age
        try:
            file_age = datetime.now() - datetime.fromtimestamp(cache_file.stat().st_mtime)
            if file_age > timedelta(hours=max_age_hours):
                logger.debug(f"Filterset cache expired for {filterset_name}")
                return None

            # Load cache
            with open(cache_file, 'r') as f:
                data = json.load(f)

            # Deserialize FilterResults
            results = {
                uuid: FilterResult.from_dict(result_data)
                for uuid, result_data in data.items()
            }
            logger.info(f"Filterset cache hit for {filterset_name} ({len(results)} results)")
            return results

        except Exception as e:
            logger.error(f"Error loading filterset cache {filterset_name}: {e}")
            return None

    def set_filterset_results(
        self,
        filterset_name: str,
        filterset_version: str,
        results: Dict[str, FilterResult]
    ):
        """
        Store filterset results in cache.

        Args:
            filterset_name: Name of filterset
            filterset_version: Version hash of filterset definition
            results: Dict mapping post_uuid to FilterResult
        """
        cache_file = self.filterset_cache_dir / f"{filterset_name}_{filterset_version}.json"

        try:
            # Serialize FilterResults
            data = {
                uuid: result.to_dict()
                for uuid, result in results.items()
            }
            with open(cache_file, 'w') as f:
                json.dump(data, f, indent=2)
            logger.info(f"Cached {len(results)} filterset results for {filterset_name}")
        except Exception as e:
            logger.error(f"Error saving filterset cache {filterset_name}: {e}")
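    # Typical Level 3 flow, as a sketch for callers. `filtersets_config`,
    # `run_filterset`, and `posts` are hypothetical names from the calling
    # pipeline; the version hash ties cached results to one definition, so
    # editing the filterset naturally produces a fresh cache file:
    #
    #   version = cache._get_filterset_version('tech_news', filtersets_config)
    #   results = cache.get_filterset_results('tech_news', version)
    #   if results is None:
    #       results = run_filterset('tech_news', posts)  # recompute
    #       cache.set_filterset_results('tech_news', version, results)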
    def invalidate_filterset(self, filterset_name: str):
        """
        Invalidate all caches for a filterset (when definition changes).

        Args:
            filterset_name: Name of filterset to invalidate
        """
        pattern = f"{filterset_name}_*.json"
        for cache_file in self.filterset_cache_dir.glob(pattern):
            try:
                cache_file.unlink()
                logger.info(f"Invalidated filterset cache: {cache_file.name}")
            except Exception as e:
                logger.error(f"Error invalidating cache {cache_file}: {e}")

    # ===== Utility Methods =====

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        ai_cache_count = len(list(self.ai_cache_dir.glob('*.json')))
        filterset_cache_count = len(list(self.filterset_cache_dir.glob('*.json')))
        memory_cache_count = len(self.memory_cache)

        return {
            'memory_cache_size': memory_cache_count,
            'ai_cache_size': ai_cache_count,
            'filterset_cache_size': filterset_cache_count,
            'ai_cache_dir': str(self.ai_cache_dir),
            'filterset_cache_dir': str(self.filterset_cache_dir)
        }

    def clear_all(self):
        """Clear all caches (use with caution!)"""
        self.clear_memory()

        # Clear AI cache
        for cache_file in self.ai_cache_dir.glob('*.json'):
            cache_file.unlink()

        # Clear filterset cache
        for cache_file in self.filterset_cache_dir.glob('*.json'):
            cache_file.unlink()

        logger.warning("All filter caches cleared")
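
# A minimal smoke-test sketch of the cache levels. It exercises only the
# pieces that need no AIAnalysisResult/FilterResult instances, and the
# demo cache_dir below is an assumption, not a pipeline path. Because of
# the relative import at the top, run it as a module, e.g.
# `python -m <package>.<this_module>` under whatever package this file
# lives in.
if __name__ == '__main__':
    cache = FilterCache(cache_dir='data/filter_cache_demo')

    # Level 1: memory cache round-trip within the 5-minute TTL
    cache.set_memory('demo_key', {'hits': 1})
    assert cache.get_memory('demo_key') == {'hits': 1}

    # Level 2: content hashing is deterministic, so posts with identical
    # title/content map to the same on-disk cache file
    h1 = FilterCache.compute_content_hash('Title', 'Body text')
    h2 = FilterCache.compute_content_hash('Title', 'Body text')
    assert h1 == h2 and len(h1) == 64  # SHA-256 hex digest

    print(cache.get_cache_stats())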