""" Configuration Loader Loads and validates filter pipeline configuration. """ import json import os import logging from pathlib import Path from typing import Dict, List, Any, Optional logger = logging.getLogger(__name__) class FilterConfig: """Configuration for filter pipeline""" def __init__( self, config_file: str = 'filter_config.json', filtersets_file: str = 'filtersets.json' ): self.config_file = Path(config_file) self.filtersets_file = Path(filtersets_file) # Load configurations self.config = self._load_config() self.filtersets = self._load_filtersets() def _load_config(self) -> Dict: """Load filter_config.json""" if not self.config_file.exists(): logger.warning(f"{self.config_file} not found, using defaults") return self._get_default_config() try: with open(self.config_file, 'r') as f: config = json.load(f) logger.info(f"Loaded filter config from {self.config_file}") return config except Exception as e: logger.error(f"Error loading {self.config_file}: {e}") return self._get_default_config() def _load_filtersets(self) -> Dict: """Load filtersets.json""" if not self.filtersets_file.exists(): logger.error(f"{self.filtersets_file} not found!") return {} try: with open(self.filtersets_file, 'r') as f: filtersets = json.load(f) logger.info(f"Loaded {len(filtersets)} filtersets from {self.filtersets_file}") return filtersets except Exception as e: logger.error(f"Error loading {self.filtersets_file}: {e}") return {} @staticmethod def _get_default_config() -> Dict: """Get default configuration""" return { 'ai': { 'enabled': False, # Disabled by default until API key is configured 'openrouter_key_file': 'openrouter_key.txt', 'models': { 'cheap': 'meta-llama/llama-3.3-70b-instruct', 'smart': 'anthropic/claude-3.5-sonnet' }, 'parallel_workers': 10, 'timeout_seconds': 60 }, 'cache': { 'enabled': True, 'ai_cache_dir': 'data/filter_cache', 'filterset_cache_ttl_hours': 24 }, 'pipeline': { 'default_stages': ['categorizer', 'moderator', 'filter', 'ranker'], 'batch_size': 50, 'enable_parallel': True }, 'output': { 'filtered_dir': 'data/filtered', 'save_rejected': False # Don't save posts that fail filters } } # ===== AI Configuration ===== def is_ai_enabled(self) -> bool: """Check if AI processing is enabled""" return self.config.get('ai', {}).get('enabled', False) def get_openrouter_key(self) -> Optional[str]: """Get OpenRouter API key""" # Try environment variable first key = os.getenv('OPENROUTER_API_KEY') if key: return key # Try key file key_file = self.config.get('ai', {}).get('openrouter_key_file') if key_file and Path(key_file).exists(): try: with open(key_file, 'r') as f: return f.read().strip() except Exception as e: logger.error(f"Error reading API key from {key_file}: {e}") return None def get_ai_model(self, model_type: str = 'cheap') -> str: """Get AI model name for a given type (cheap/smart)""" models = self.config.get('ai', {}).get('models', {}) return models.get(model_type, 'meta-llama/llama-3.3-70b-instruct') def get_parallel_workers(self) -> int: """Get number of parallel workers for AI processing""" return self.config.get('ai', {}).get('parallel_workers', 10) # ===== Cache Configuration ===== def is_cache_enabled(self) -> bool: """Check if caching is enabled""" return self.config.get('cache', {}).get('enabled', True) def get_cache_dir(self) -> str: """Get cache directory path""" return self.config.get('cache', {}).get('ai_cache_dir', 'data/filter_cache') def get_cache_ttl_hours(self) -> int: """Get filterset cache TTL in hours""" return self.config.get('cache', 

    # ===== AI Configuration =====

    def is_ai_enabled(self) -> bool:
        """Check if AI processing is enabled."""
        return self.config.get('ai', {}).get('enabled', False)

    def get_openrouter_key(self) -> Optional[str]:
        """Get the OpenRouter API key."""
        # Try the environment variable first
        key = os.getenv('OPENROUTER_API_KEY')
        if key:
            return key

        # Fall back to the key file
        key_file = self.config.get('ai', {}).get('openrouter_key_file')
        if key_file and Path(key_file).exists():
            try:
                with open(key_file, 'r', encoding='utf-8') as f:
                    return f.read().strip()
            except Exception as e:
                logger.error(f"Error reading API key from {key_file}: {e}")

        return None

    def get_ai_model(self, model_type: str = 'cheap') -> str:
        """Get the AI model name for a given type (cheap/smart)."""
        models = self.config.get('ai', {}).get('models', {})
        return models.get(model_type, 'meta-llama/llama-3.3-70b-instruct')

    def get_parallel_workers(self) -> int:
        """Get the number of parallel workers for AI processing."""
        return self.config.get('ai', {}).get('parallel_workers', 10)

    # ===== Cache Configuration =====

    def is_cache_enabled(self) -> bool:
        """Check if caching is enabled."""
        return self.config.get('cache', {}).get('enabled', True)

    def get_cache_dir(self) -> str:
        """Get the cache directory path."""
        return self.config.get('cache', {}).get('ai_cache_dir', 'data/filter_cache')

    def get_cache_ttl_hours(self) -> int:
        """Get the filterset cache TTL in hours."""
        return self.config.get('cache', {}).get('filterset_cache_ttl_hours', 24)

    # ===== Pipeline Configuration =====

    def get_default_stages(self) -> List[str]:
        """Get the default pipeline stages."""
        return self.config.get('pipeline', {}).get('default_stages', [
            'categorizer', 'moderator', 'filter', 'ranker'
        ])

    def get_batch_size(self) -> int:
        """Get the batch processing size."""
        return self.config.get('pipeline', {}).get('batch_size', 50)

    def is_parallel_enabled(self) -> bool:
        """Check if parallel processing is enabled."""
        return self.config.get('pipeline', {}).get('enable_parallel', True)

    # ===== Filterset Methods =====

    def get_filterset(self, name: str) -> Optional[Dict]:
        """Get a filterset configuration by name."""
        return self.filtersets.get(name)

    def get_filterset_names(self) -> List[str]:
        """Get the list of available filterset names."""
        return list(self.filtersets.keys())

    def get_filterset_version(self, name: str) -> Optional[str]:
        """Get the version of a filterset (used for cache invalidation)."""
        filterset = self.get_filterset(name)
        if not filterset:
            return None

        # Use the explicit version if present
        if 'version' in filterset:
            return str(filterset['version'])

        # Otherwise derive a short content hash of the definition, so any
        # edit to the filterset invalidates cached results
        definition_json = json.dumps(filterset, sort_keys=True)
        return hashlib.md5(definition_json.encode()).hexdigest()[:8]

    # ===== Output Configuration =====

    def get_filtered_dir(self) -> str:
        """Get the directory for filtered posts."""
        return self.config.get('output', {}).get('filtered_dir', 'data/filtered')

    def should_save_rejected(self) -> bool:
        """Check if rejected posts should be saved."""
        return self.config.get('output', {}).get('save_rejected', False)

    # ===== Utility Methods =====

    def reload(self) -> None:
        """Reload configurations from disk."""
        self.config = self._load_config()
        self.filtersets = self._load_filtersets()
        logger.info("Configuration reloaded")

    def get_config_summary(self) -> Dict[str, Any]:
        """Get a summary of the active configuration."""
        return {
            'ai_enabled': self.is_ai_enabled(),
            'cache_enabled': self.is_cache_enabled(),
            'parallel_enabled': self.is_parallel_enabled(),
            'num_filtersets': len(self.filtersets),
            'filterset_names': self.get_filterset_names(),
            'default_stages': self.get_default_stages(),
            'batch_size': self.get_batch_size()
        }
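
# Minimal usage sketch (illustrative, not part of the original module): loads
# the config from the default paths, assuming filter_config.json and
# filtersets.json sit in the working directory, and prints a summary.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    cfg = FilterConfig()  # falls back to built-in defaults if files are missing
    print(json.dumps(cfg.get_config_summary(), indent=2))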