balanceboard/filter_pipeline/cache.py
chelsea 94e12041ec Add filter pipeline core infrastructure (Phase 1)
Implements a plugin-based content filtering system with multi-level caching:

Core Components:
- FilterEngine: Main orchestrator for content filtering
- FilterCache: 3-level caching (memory, AI results, filterset results)
- FilterConfig: Configuration loader for filter_config.json & filtersets.json
- FilterResult & AIAnalysisResult: Data models for filter results

Architecture:
- BaseStage: Abstract class for pipeline stages
- BaseFilterPlugin: Abstract class for filter plugins
- Multi-threaded parallel processing support
- Content-hash-based AI result caching (avoids repeat AI calls on unchanged content, cutting cost)
- Filterset result caching (fast filterset switching)

Configuration:
- filter_config.json: AI models, caching, parallel workers (see the sketch after this list)
- Using only Llama 70B for cost efficiency
- Compatible with existing filtersets.json
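
For orientation, filter_config.json might look roughly like the sketch below,
written as an annotated Python dict so the assumptions can be labeled inline
(every key name here is inferred from the bullets above, not read from the
actual file):

    # Hypothetical shape of filter_config.json; all key names are assumptions
    FILTER_CONFIG_SKETCH = {
        "ai": {
            "enabled": True,
            "models": ["llama-70b"],            # commit notes only Llama 70B is used
        },
        "caching": {
            "cache_dir": "data/filter_cache",   # matches FilterCache's default below
            "memory_ttl_minutes": 5,            # matches FilterCache.memory_ttl below
        },
        "parallel_workers": 4,                  # worker count is a guess
    }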

Integration:
- apply_filterset() API compatible with user preferences (see the call sketch below)
- process_batch() for batch post processing
- Lazy-loaded stages to avoid import errors when AI is disabled
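
A minimal call sketch for this integration surface (illustrative only: the
module path, constructor, and argument shapes are assumptions; only the names
FilterEngine, apply_filterset, and process_batch come from this commit):

    from balanceboard.filter_pipeline.engine import FilterEngine  # assumed module path

    engine = FilterEngine()                        # assumed to pick up filter_config.json
    posts = load_posts()                           # hypothetical helper returning post dicts
    filtered = engine.apply_filterset("default", posts)   # filterset name and args assumed
    results = engine.process_batch(posts)          # batch post processing entry point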

Related to issue #8 (filtering engine implementation)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-11 22:46:10 -05:00

260 lines
8.6 KiB
Python

"""
Multi-Level Caching System
Implements 3-tier caching for filter pipeline efficiency.
"""
import json
import hashlib
import os
import logging
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from .models import AIAnalysisResult, FilterResult

logger = logging.getLogger(__name__)


class FilterCache:
    """
    Three-level caching system:
    Level 1: In-memory cache (fastest, TTL-based)
    Level 2: AI analysis cache (persistent, content-hash based)
    Level 3: Filterset result cache (persistent, filterset version based)
    """

    def __init__(self, cache_dir: str = 'data/filter_cache'):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Level 1: In-memory cache
        self.memory_cache: Dict[str, tuple[Any, datetime]] = {}
        self.memory_ttl = timedelta(minutes=5)

        # Level 2: AI analysis cache directory
        self.ai_cache_dir = self.cache_dir / 'ai_analysis'
        self.ai_cache_dir.mkdir(exist_ok=True)

        # Level 3: Filterset result cache directory
        self.filterset_cache_dir = self.cache_dir / 'filtersets'
        self.filterset_cache_dir.mkdir(exist_ok=True)

    # ===== Level 1: Memory Cache =====

    def get_memory(self, key: str) -> Optional[Any]:
        """Get from memory cache if not expired"""
        if key in self.memory_cache:
            value, timestamp = self.memory_cache[key]
            if datetime.now() - timestamp < self.memory_ttl:
                return value
            else:
                # Expired, remove
                del self.memory_cache[key]
        return None

    def set_memory(self, key: str, value: Any):
        """Store in memory cache"""
        self.memory_cache[key] = (value, datetime.now())

    def clear_memory(self):
        """Clear all memory cache"""
        self.memory_cache.clear()

    # ===== Level 2: AI Analysis Cache (Persistent) =====

    @staticmethod
    def compute_content_hash(title: str, content: str) -> str:
        """Compute SHA-256 hash of content for caching"""
        text = f"{title}\n{content}".encode('utf-8')
        return hashlib.sha256(text).hexdigest()

    def get_ai_analysis(self, content_hash: str) -> Optional[AIAnalysisResult]:
        """
        Get AI analysis result from cache.

        Args:
            content_hash: SHA-256 hash of content

        Returns:
            AIAnalysisResult if cached, None otherwise
        """
        # Check memory first
        mem_key = f"ai_{content_hash}"
        cached = self.get_memory(mem_key)
        if cached:
            return cached

        # Check disk
        cache_file = self.ai_cache_dir / f"{content_hash}.json"
        if cache_file.exists():
            try:
                with open(cache_file, 'r') as f:
                    data = json.load(f)
                result = AIAnalysisResult.from_dict(data)
                # Store in memory for faster access
                self.set_memory(mem_key, result)
                logger.debug(f"AI analysis cache hit for {content_hash[:8]}...")
                return result
            except Exception as e:
                logger.error(f"Error loading AI cache {content_hash}: {e}")
                return None

        return None

    def set_ai_analysis(self, content_hash: str, result: AIAnalysisResult):
        """
        Store AI analysis result in cache (persistent).

        Args:
            content_hash: SHA-256 hash of content
            result: AIAnalysisResult to cache
        """
        # Store in memory
        mem_key = f"ai_{content_hash}"
        self.set_memory(mem_key, result)

        # Store on disk (persistent)
        cache_file = self.ai_cache_dir / f"{content_hash}.json"
        try:
            with open(cache_file, 'w') as f:
                json.dump(result.to_dict(), f, indent=2)
            logger.debug(f"Cached AI analysis for {content_hash[:8]}...")
        except Exception as e:
            logger.error(f"Error saving AI cache {content_hash}: {e}")

    # ===== Level 3: Filterset Result Cache =====

    def _get_filterset_version(self, filterset_name: str, filtersets_config: Dict) -> str:
        """Get version hash of filterset definition for cache invalidation"""
        filterset_def = filtersets_config.get(filterset_name, {})

        # Include version field if present, otherwise hash the entire definition
        if 'version' in filterset_def:
            return str(filterset_def['version'])

        # Compute hash of filterset definition
        definition_json = json.dumps(filterset_def, sort_keys=True)
        return hashlib.md5(definition_json.encode()).hexdigest()[:8]

    def get_filterset_results(
        self,
        filterset_name: str,
        filterset_version: str,
        max_age_hours: int = 24
    ) -> Optional[Dict[str, FilterResult]]:
        """
        Get cached filterset results.

        Args:
            filterset_name: Name of filterset
            filterset_version: Version hash of filterset definition
            max_age_hours: Maximum age of cache in hours

        Returns:
            Dict mapping post_uuid to FilterResult, or None if cache invalid
        """
        cache_file = self.filterset_cache_dir / f"{filterset_name}_{filterset_version}.json"
        if not cache_file.exists():
            return None

        # Check age
        try:
            file_age = datetime.now() - datetime.fromtimestamp(cache_file.stat().st_mtime)
            if file_age > timedelta(hours=max_age_hours):
                logger.debug(f"Filterset cache expired for {filterset_name}")
                return None

            # Load cache
            with open(cache_file, 'r') as f:
                data = json.load(f)

            # Deserialize FilterResults
            results = {
                uuid: FilterResult.from_dict(result_data)
                for uuid, result_data in data.items()
            }
            logger.info(f"Filterset cache hit for {filterset_name} ({len(results)} results)")
            return results
        except Exception as e:
            logger.error(f"Error loading filterset cache {filterset_name}: {e}")
            return None

    def set_filterset_results(
        self,
        filterset_name: str,
        filterset_version: str,
        results: Dict[str, FilterResult]
    ):
        """
        Store filterset results in cache.

        Args:
            filterset_name: Name of filterset
            filterset_version: Version hash of filterset definition
            results: Dict mapping post_uuid to FilterResult
        """
        cache_file = self.filterset_cache_dir / f"{filterset_name}_{filterset_version}.json"
        try:
            # Serialize FilterResults
            data = {
                uuid: result.to_dict()
                for uuid, result in results.items()
            }
            with open(cache_file, 'w') as f:
                json.dump(data, f, indent=2)
            logger.info(f"Cached {len(results)} filterset results for {filterset_name}")
        except Exception as e:
            logger.error(f"Error saving filterset cache {filterset_name}: {e}")

    def invalidate_filterset(self, filterset_name: str):
        """
        Invalidate all caches for a filterset (when definition changes).

        Args:
            filterset_name: Name of filterset to invalidate
        """
        pattern = f"{filterset_name}_*.json"
        for cache_file in self.filterset_cache_dir.glob(pattern):
            try:
                cache_file.unlink()
                logger.info(f"Invalidated filterset cache: {cache_file.name}")
            except Exception as e:
                logger.error(f"Error invalidating cache {cache_file}: {e}")

    # ===== Utility Methods =====

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        ai_cache_count = len(list(self.ai_cache_dir.glob('*.json')))
        filterset_cache_count = len(list(self.filterset_cache_dir.glob('*.json')))
        memory_cache_count = len(self.memory_cache)

        return {
            'memory_cache_size': memory_cache_count,
            'ai_cache_size': ai_cache_count,
            'filterset_cache_size': filterset_cache_count,
            'ai_cache_dir': str(self.ai_cache_dir),
            'filterset_cache_dir': str(self.filterset_cache_dir)
        }

    def clear_all(self):
        """Clear all caches (use with caution!)"""
        self.clear_memory()

        # Clear AI cache
        for cache_file in self.ai_cache_dir.glob('*.json'):
            cache_file.unlink()

        # Clear filterset cache
        for cache_file in self.filterset_cache_dir.glob('*.json'):
            cache_file.unlink()

        logger.warning("All filter caches cleared")