Files
balanceboard/filter_pipeline/config.py
chelsea 94e12041ec Add filter pipeline core infrastructure (Phase 1)
Implements plugin-based content filtering system with multi-level caching:

Core Components:
- FilterEngine: Main orchestrator for content filtering
- FilterCache: 3-level caching (memory, AI results, filterset results)
- FilterConfig: Configuration loader for filter_config.json & filtersets.json
- FilterResult & AIAnalysisResult: Data models for filter results

Architecture:
- BaseStage: Abstract class for pipeline stages
- BaseFilterPlugin: Abstract class for filter plugins
- Multi-threaded parallel processing support
- Content-hash based AI result caching (cost savings)
- Filterset result caching (fast filterset switching)

Configuration:
- filter_config.json: AI models, caching, parallel workers
- Using only Llama 70B for cost efficiency
- Compatible with existing filtersets.json

Integration:
- apply_filterset() API compatible with user preferences
- process_batch() for batch post processing
- Lazy-loaded stages to avoid import errors when AI disabled

Related to issue #8 (filtering engine implementation)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-11 22:46:10 -05:00

207 lines
7.1 KiB
Python

"""
Configuration Loader
Loads and validates filter pipeline configuration.
"""
import json
import os
import logging
from pathlib import Path
from typing import Dict, List, Any, Optional
logger = logging.getLogger(__name__)
class FilterConfig:
"""Configuration for filter pipeline"""
def __init__(
self,
config_file: str = 'filter_config.json',
filtersets_file: str = 'filtersets.json'
):
self.config_file = Path(config_file)
self.filtersets_file = Path(filtersets_file)
# Load configurations
self.config = self._load_config()
self.filtersets = self._load_filtersets()
def _load_config(self) -> Dict:
"""Load filter_config.json"""
if not self.config_file.exists():
logger.warning(f"{self.config_file} not found, using defaults")
return self._get_default_config()
try:
with open(self.config_file, 'r') as f:
config = json.load(f)
logger.info(f"Loaded filter config from {self.config_file}")
return config
except Exception as e:
logger.error(f"Error loading {self.config_file}: {e}")
return self._get_default_config()
def _load_filtersets(self) -> Dict:
"""Load filtersets.json"""
if not self.filtersets_file.exists():
logger.error(f"{self.filtersets_file} not found!")
return {}
try:
with open(self.filtersets_file, 'r') as f:
filtersets = json.load(f)
logger.info(f"Loaded {len(filtersets)} filtersets from {self.filtersets_file}")
return filtersets
except Exception as e:
logger.error(f"Error loading {self.filtersets_file}: {e}")
return {}
@staticmethod
def _get_default_config() -> Dict:
"""Get default configuration"""
return {
'ai': {
'enabled': False, # Disabled by default until API key is configured
'openrouter_key_file': 'openrouter_key.txt',
'models': {
'cheap': 'meta-llama/llama-3.3-70b-instruct',
'smart': 'anthropic/claude-3.5-sonnet'
},
'parallel_workers': 10,
'timeout_seconds': 60
},
'cache': {
'enabled': True,
'ai_cache_dir': 'data/filter_cache',
'filterset_cache_ttl_hours': 24
},
'pipeline': {
'default_stages': ['categorizer', 'moderator', 'filter', 'ranker'],
'batch_size': 50,
'enable_parallel': True
},
'output': {
'filtered_dir': 'data/filtered',
'save_rejected': False # Don't save posts that fail filters
}
}
# ===== AI Configuration =====
def is_ai_enabled(self) -> bool:
"""Check if AI processing is enabled"""
return self.config.get('ai', {}).get('enabled', False)
def get_openrouter_key(self) -> Optional[str]:
"""Get OpenRouter API key"""
# Try environment variable first
key = os.getenv('OPENROUTER_API_KEY')
if key:
return key
# Try key file
key_file = self.config.get('ai', {}).get('openrouter_key_file')
if key_file and Path(key_file).exists():
try:
with open(key_file, 'r') as f:
return f.read().strip()
except Exception as e:
logger.error(f"Error reading API key from {key_file}: {e}")
return None
def get_ai_model(self, model_type: str = 'cheap') -> str:
"""Get AI model name for a given type (cheap/smart)"""
models = self.config.get('ai', {}).get('models', {})
return models.get(model_type, 'meta-llama/llama-3.3-70b-instruct')
def get_parallel_workers(self) -> int:
"""Get number of parallel workers for AI processing"""
return self.config.get('ai', {}).get('parallel_workers', 10)
# ===== Cache Configuration =====
def is_cache_enabled(self) -> bool:
"""Check if caching is enabled"""
return self.config.get('cache', {}).get('enabled', True)
def get_cache_dir(self) -> str:
"""Get cache directory path"""
return self.config.get('cache', {}).get('ai_cache_dir', 'data/filter_cache')
def get_cache_ttl_hours(self) -> int:
"""Get filterset cache TTL in hours"""
return self.config.get('cache', {}).get('filterset_cache_ttl_hours', 24)
# ===== Pipeline Configuration =====
def get_default_stages(self) -> List[str]:
"""Get default pipeline stages"""
return self.config.get('pipeline', {}).get('default_stages', [
'categorizer', 'moderator', 'filter', 'ranker'
])
def get_batch_size(self) -> int:
"""Get batch processing size"""
return self.config.get('pipeline', {}).get('batch_size', 50)
def is_parallel_enabled(self) -> bool:
"""Check if parallel processing is enabled"""
return self.config.get('pipeline', {}).get('enable_parallel', True)
# ===== Filterset Methods =====
def get_filterset(self, name: str) -> Optional[Dict]:
"""Get filterset configuration by name"""
return self.filtersets.get(name)
def get_filterset_names(self) -> List[str]:
"""Get list of available filterset names"""
return list(self.filtersets.keys())
def get_filterset_version(self, name: str) -> Optional[str]:
"""Get version of filterset (for cache invalidation)"""
filterset = self.get_filterset(name)
if not filterset:
return None
# Use explicit version if present
if 'version' in filterset:
return str(filterset['version'])
# Otherwise compute hash of definition
import hashlib
definition_json = json.dumps(filterset, sort_keys=True)
return hashlib.md5(definition_json.encode()).hexdigest()[:8]
# ===== Output Configuration =====
def get_filtered_dir(self) -> str:
"""Get directory for filtered posts"""
return self.config.get('output', {}).get('filtered_dir', 'data/filtered')
def should_save_rejected(self) -> bool:
"""Check if rejected posts should be saved"""
return self.config.get('output', {}).get('save_rejected', False)
# ===== Utility Methods =====
def reload(self):
"""Reload configurations from disk"""
self.config = self._load_config()
self.filtersets = self._load_filtersets()
logger.info("Configuration reloaded")
def get_config_summary(self) -> Dict[str, Any]:
"""Get summary of configuration"""
return {
'ai_enabled': self.is_ai_enabled(),
'cache_enabled': self.is_cache_enabled(),
'parallel_enabled': self.is_parallel_enabled(),
'num_filtersets': len(self.filtersets),
'filterset_names': self.get_filterset_names(),
'default_stages': self.get_default_stages(),
'batch_size': self.get_batch_size()
}