""" Filter Engine Main orchestrator for content filtering pipeline. """ import logging import traceback from typing import List, Dict, Any, Optional from datetime import datetime from pathlib import Path from concurrent.futures import ThreadPoolExecutor, as_completed from .config import FilterConfig from .cache import FilterCache from .models import FilterResult, ProcessingStatus, AIAnalysisResult logger = logging.getLogger(__name__) class FilterEngine: """ Main filter pipeline orchestrator. Coordinates multi-stage content filtering with intelligent caching. Compatible with user preferences and filterset selections. """ _instance = None def __init__( self, config_file: str = 'filter_config.json', filtersets_file: str = 'filtersets.json' ): """ Initialize filter engine. Args: config_file: Path to filter_config.json filtersets_file: Path to filtersets.json """ self.config = FilterConfig(config_file, filtersets_file) self.cache = FilterCache(self.config.get_cache_dir()) # Lazy-loaded stages (will be imported when AI is enabled) self._stages = None logger.info("FilterEngine initialized") logger.info(f"Configuration: {self.config.get_config_summary()}") @classmethod def get_instance(cls) -> 'FilterEngine': """Get singleton instance of FilterEngine""" if cls._instance is None: cls._instance = cls() return cls._instance def _init_stages(self): """Initialize pipeline stages (lazy loading)""" if self._stages is not None: return from .stages.categorizer import CategorizerStage from .stages.moderator import ModeratorStage from .stages.filter import FilterStage from .stages.ranker import RankerStage # Initialize stages based on configuration self._stages = { 'categorizer': CategorizerStage(self.config, self.cache), 'moderator': ModeratorStage(self.config, self.cache), 'filter': FilterStage(self.config, self.cache), 'ranker': RankerStage(self.config, self.cache) } logger.info(f"Initialized {len(self._stages)} pipeline stages") def apply_filterset( self, posts: List[Dict[str, Any]], filterset_name: str = 'no_filter', use_cache: bool = True ) -> List[Dict[str, Any]]: """ Apply filterset to posts (compatible with user preferences). This is the main public API used by app.py when loading user feeds. 
    def apply_filterset(
        self,
        posts: List[Dict[str, Any]],
        filterset_name: str = 'no_filter',
        use_cache: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Apply filterset to posts (compatible with user preferences).

        This is the main public API used by app.py when loading user feeds.

        Args:
            posts: List of post dictionaries
            filterset_name: Name of filterset from user settings (e.g., 'safe_content')
            use_cache: Whether to use cached results

        Returns:
            List of posts that passed the filter, with score and metadata added
        """
        if not posts:
            return []

        # Validate filterset exists
        filterset = self.config.get_filterset(filterset_name)
        if not filterset:
            logger.warning(f"Filterset '{filterset_name}' not found, using 'no_filter'")
            filterset_name = 'no_filter'

        logger.info(f"Applying filterset '{filterset_name}' to {len(posts)} posts")

        # Check if we have cached filterset results
        if use_cache and self.config.is_cache_enabled():
            filterset_version = self.config.get_filterset_version(filterset_name)
            cached_results = self.cache.get_filterset_results(
                filterset_name,
                filterset_version,
                self.config.get_cache_ttl_hours()
            )

            if cached_results:
                # Filter posts using cached results
                filtered_posts = []
                for post in posts:
                    post_uuid = post.get('uuid')
                    if post_uuid in cached_results:
                        result = cached_results[post_uuid]
                        if result.passed:
                            # Add filter metadata to post
                            post['_filter_score'] = result.score
                            post['_filter_categories'] = result.categories
                            post['_filter_tags'] = result.tags
                            filtered_posts.append(post)

                logger.info(f"Cache hit: {len(filtered_posts)}/{len(posts)} posts passed filter")
                return filtered_posts

        # Cache miss or disabled - process posts through pipeline
        results = self.process_batch(posts, filterset_name)

        # Save to filterset cache
        if self.config.is_cache_enabled():
            filterset_version = self.config.get_filterset_version(filterset_name)
            results_dict = {r.post_uuid: r for r in results}
            self.cache.set_filterset_results(filterset_name, filterset_version, results_dict)

        # Build filtered post list
        filtered_posts = []
        results_by_uuid = {r.post_uuid: r for r in results}

        for post in posts:
            post_uuid = post.get('uuid')
            result = results_by_uuid.get(post_uuid)

            if result and result.passed:
                # Add filter metadata to post
                post['_filter_score'] = result.score
                post['_filter_categories'] = result.categories
                post['_filter_tags'] = result.tags
                filtered_posts.append(post)

        logger.info(f"Processed: {len(filtered_posts)}/{len(posts)} posts passed filter")
        return filtered_posts
    def process_batch(
        self,
        posts: List[Dict[str, Any]],
        filterset_name: str = 'no_filter'
    ) -> List[FilterResult]:
        """
        Process batch of posts through pipeline.

        Args:
            posts: List of post dictionaries
            filterset_name: Name of filterset to apply

        Returns:
            List of FilterResults for each post
        """
        if not posts:
            return []

        # Special case: no_filter passes everything with default scores
        if filterset_name == 'no_filter':
            return self._process_no_filter(posts)

        # If AI is disabled but the filterset requires it, fall back to no_filter
        if not self.config.is_ai_enabled():
            logger.warning(f"AI disabled but '{filterset_name}' requires AI - falling back to 'no_filter'")
            return self._process_no_filter(posts)

        # Initialize stages if needed (lazy import of AI-backed stages)
        self._init_stages()

        # Get pipeline stages for this filterset
        stage_names = self._get_stages_for_filterset(filterset_name)

        # Process posts (parallel or sequential based on config)
        if self.config.is_parallel_enabled():
            results = self._process_batch_parallel(posts, filterset_name, stage_names)
        else:
            results = self._process_batch_sequential(posts, filterset_name, stage_names)

        return results

    def _process_no_filter(self, posts: List[Dict[str, Any]]) -> List[FilterResult]:
        """Process posts with no_filter (all pass with default scores)"""
        results = []
        for post in posts:
            result = FilterResult(
                post_uuid=post.get('uuid', ''),
                passed=True,
                score=0.5,  # Neutral score
                categories=[],
                tags=[],
                filterset_name='no_filter',
                processed_at=datetime.now(),
                status=ProcessingStatus.COMPLETED
            )
            results.append(result)
        return results

    def _get_stages_for_filterset(self, filterset_name: str) -> List[str]:
        """Get pipeline stages to run for a filterset"""
        filterset = self.config.get_filterset(filterset_name)

        # Check if filterset specifies custom stages
        if filterset and 'pipeline_stages' in filterset:
            return filterset['pipeline_stages']

        # Use default stages
        return self.config.get_default_stages()

    def _process_batch_parallel(
        self,
        posts: List[Dict[str, Any]],
        filterset_name: str,
        stage_names: List[str]
    ) -> List[FilterResult]:
        """Process posts in parallel"""
        results = [None] * len(posts)
        workers = self.config.get_parallel_workers()

        def process_single_post(idx_post):
            idx, post = idx_post
            try:
                result = self._process_single_post(post, filterset_name, stage_names)
                return idx, result
            except Exception as e:
                logger.error(f"Error processing post {idx}: {e}")
                logger.error(traceback.format_exc())
                # Return failed result
                return idx, FilterResult(
                    post_uuid=post.get('uuid', ''),
                    passed=False,
                    score=0.0,
                    filterset_name=filterset_name,
                    processed_at=datetime.now(),
                    status=ProcessingStatus.FAILED,
                    error=str(e)
                )

        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = {
                executor.submit(process_single_post, (i, post)): i
                for i, post in enumerate(posts)
            }

            for future in as_completed(futures):
                idx, result = future.result()
                results[idx] = result

        return results

    def _process_batch_sequential(
        self,
        posts: List[Dict[str, Any]],
        filterset_name: str,
        stage_names: List[str]
    ) -> List[FilterResult]:
        """Process posts sequentially"""
        results = []
        for post in posts:
            try:
                result = self._process_single_post(post, filterset_name, stage_names)
                results.append(result)
            except Exception as e:
                logger.error(f"Error processing post: {e}")
                results.append(FilterResult(
                    post_uuid=post.get('uuid', ''),
                    passed=False,
                    score=0.0,
                    filterset_name=filterset_name,
                    processed_at=datetime.now(),
                    status=ProcessingStatus.FAILED,
                    error=str(e)
                ))
        return results
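    # The pipeline assumes each stage object (CategorizerStage, ModeratorStage,
    # FilterStage, RankerStage) exposes at least the two methods used below:
    #
    #   is_enabled() -> bool                          # per-stage on/off toggle
    #   process(post: dict, result: FilterResult) -> FilterResult
    #
    # A stage signals rejection by returning a result with passed=False, which
    # short-circuits the remaining stages in _process_single_post.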
    def _process_single_post(
        self,
        post: Dict[str, Any],
        filterset_name: str,
        stage_names: List[str]
    ) -> FilterResult:
        """
        Process single post through pipeline stages.

        Stages are run sequentially: Categorizer → Moderator → Filter → Ranker
        """
        # Initialize result
        result = FilterResult(
            post_uuid=post.get('uuid', ''),
            passed=True,  # Start as passed, stages can reject
            score=0.5,  # Default score
            filterset_name=filterset_name,
            processed_at=datetime.now(),
            status=ProcessingStatus.PROCESSING
        )

        # Run each stage
        for stage_name in stage_names:
            if stage_name not in self._stages:
                logger.warning(f"Stage '{stage_name}' not found, skipping")
                continue

            stage = self._stages[stage_name]

            if not stage.is_enabled():
                logger.debug(f"Stage '{stage_name}' disabled, skipping")
                continue

            # Process through stage
            try:
                result = stage.process(post, result)

                # If post was rejected by this stage, stop processing
                if not result.passed:
                    logger.debug(f"Post {post.get('uuid', '')} rejected by {stage_name}")
                    break
            except Exception as e:
                logger.error(f"Error in stage '{stage_name}': {e}")
                result.status = ProcessingStatus.FAILED
                result.error = f"{stage_name}: {str(e)}"
                result.passed = False
                break

        # Mark as completed if not failed
        if result.status != ProcessingStatus.FAILED:
            result.status = ProcessingStatus.COMPLETED

        return result

    # ===== Utility Methods =====

    def get_available_filtersets(self) -> List[str]:
        """Get list of available filterset names (for user settings UI)"""
        return self.config.get_filterset_names()

    def get_filterset_description(self, name: str) -> Optional[str]:
        """Get description of a filterset (for user settings UI)"""
        filterset = self.config.get_filterset(name)
        return filterset.get('description') if filterset else None

    def invalidate_filterset_cache(self, filterset_name: str):
        """Invalidate cache for a filterset (when definition changes)"""
        self.cache.invalidate_filterset(filterset_name)
        logger.info(f"Invalidated cache for filterset '{filterset_name}'")

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        return self.cache.get_cache_stats()

    def reload_config(self):
        """Reload configuration from disk"""
        self.config.reload()
        self._stages = None  # Force re-initialization of stages
        logger.info("Configuration reloaded")
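
# Usage sketch (illustrative, not executed on import): how a caller such as
# app.py might drive the engine. The package path in the import and the
# 'title' field on the post dicts are assumptions; the engine itself only
# relies on each post carrying a 'uuid'.
#
#   from filters.engine import FilterEngine
#
#   engine = FilterEngine.get_instance()
#   posts = [
#       {'uuid': 'a1b2', 'title': 'First post'},
#       {'uuid': 'c3d4', 'title': 'Second post'},
#   ]
#   visible = engine.apply_filterset(posts, filterset_name='safe_content')
#   for post in visible:
#       print(post['uuid'], post['_filter_score'], post['_filter_tags'])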