From 94e12041ec473d9a6400b6768c7c6c150302cb6c Mon Sep 17 00:00:00 2001 From: chelsea Date: Sat, 11 Oct 2025 22:46:10 -0500 Subject: [PATCH] Add filter pipeline core infrastructure (Phase 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements plugin-based content filtering system with multi-level caching: Core Components: - FilterEngine: Main orchestrator for content filtering - FilterCache: 3-level caching (memory, AI results, filterset results) - FilterConfig: Configuration loader for filter_config.json & filtersets.json - FilterResult & AIAnalysisResult: Data models for filter results Architecture: - BaseStage: Abstract class for pipeline stages - BaseFilterPlugin: Abstract class for filter plugins - Multi-threaded parallel processing support - Content-hash based AI result caching (cost savings) - Filterset result caching (fast filterset switching) Configuration: - filter_config.json: AI models, caching, parallel workers - Using only Llama 70B for cost efficiency - Compatible with existing filtersets.json Integration: - apply_filterset() API compatible with user preferences - process_batch() for batch post processing - Lazy-loaded stages to avoid import errors when AI disabled Related to issue #8 (filtering engine implementation) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- filter_config.json | 27 ++ filter_pipeline/__init__.py | 10 + filter_pipeline/cache.py | 259 ++++++++++++++++++ filter_pipeline/config.py | 206 +++++++++++++++ filter_pipeline/engine.py | 376 +++++++++++++++++++++++++++ filter_pipeline/models.py | 121 +++++++++ filter_pipeline/plugins/__init__.py | 8 + filter_pipeline/plugins/base.py | 66 +++++ filter_pipeline/stages/__init__.py | 8 + filter_pipeline/stages/base_stage.py | 62 +++++ 10 files changed, 1143 insertions(+) create mode 100644 filter_config.json create mode 100644 filter_pipeline/__init__.py create mode 100644 filter_pipeline/cache.py create mode 100644 filter_pipeline/config.py create mode 100644 filter_pipeline/engine.py create mode 100644 filter_pipeline/models.py create mode 100644 filter_pipeline/plugins/__init__.py create mode 100644 filter_pipeline/plugins/base.py create mode 100644 filter_pipeline/stages/__init__.py create mode 100644 filter_pipeline/stages/base_stage.py diff --git a/filter_config.json b/filter_config.json new file mode 100644 index 0000000..7b19b2e --- /dev/null +++ b/filter_config.json @@ -0,0 +1,27 @@ +{ + "ai": { + "enabled": false, + "openrouter_key_file": "openrouter_key.txt", + "models": { + "cheap": "meta-llama/llama-3.3-70b-instruct", + "smart": "meta-llama/llama-3.3-70b-instruct" + }, + "parallel_workers": 10, + "timeout_seconds": 60, + "note": "Using only Llama 70B for cost efficiency" + }, + "cache": { + "enabled": true, + "ai_cache_dir": "data/filter_cache", + "filterset_cache_ttl_hours": 24 + }, + "pipeline": { + "default_stages": ["categorizer", "moderator", "filter", "ranker"], + "batch_size": 50, + "enable_parallel": true + }, + "output": { + "filtered_dir": "data/filtered", + "save_rejected": false + } +} diff --git a/filter_pipeline/__init__.py b/filter_pipeline/__init__.py new file mode 100644 index 0000000..8e88f62 --- /dev/null +++ b/filter_pipeline/__init__.py @@ -0,0 +1,10 @@ +""" +Filter Pipeline Package +Content filtering, categorization, and ranking system for BalanceBoard. 
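+
+Typical usage (illustrative sketch; assumes `posts` is a list of post dicts
+with a 'uuid' key, as consumed by FilterEngine.apply_filterset):
+
+    from filter_pipeline import FilterEngine
+
+    engine = FilterEngine.get_instance()
+    visible = engine.apply_filterset(posts, filterset_name='no_filter')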
+""" + +from .engine import FilterEngine +from .models import FilterResult, ProcessingStatus + +__all__ = ['FilterEngine', 'FilterResult', 'ProcessingStatus'] +__version__ = '1.0.0' diff --git a/filter_pipeline/cache.py b/filter_pipeline/cache.py new file mode 100644 index 0000000..7209af4 --- /dev/null +++ b/filter_pipeline/cache.py @@ -0,0 +1,259 @@ +""" +Multi-Level Caching System +Implements 3-tier caching for filter pipeline efficiency. +""" + +import json +import hashlib +import os +import logging +from pathlib import Path +from typing import Optional, Dict, Any +from datetime import datetime, timedelta +from .models import AIAnalysisResult, FilterResult + +logger = logging.getLogger(__name__) + + +class FilterCache: + """ + Three-level caching system: + Level 1: In-memory cache (fastest, TTL-based) + Level 2: AI analysis cache (persistent, content-hash based) + Level 3: Filterset result cache (persistent, filterset version based) + """ + + def __init__(self, cache_dir: str = 'data/filter_cache'): + self.cache_dir = Path(cache_dir) + self.cache_dir.mkdir(parents=True, exist_ok=True) + + # Level 1: In-memory cache + self.memory_cache: Dict[str, tuple[Any, datetime]] = {} + self.memory_ttl = timedelta(minutes=5) + + # Level 2: AI analysis cache directory + self.ai_cache_dir = self.cache_dir / 'ai_analysis' + self.ai_cache_dir.mkdir(exist_ok=True) + + # Level 3: Filterset result cache directory + self.filterset_cache_dir = self.cache_dir / 'filtersets' + self.filterset_cache_dir.mkdir(exist_ok=True) + + # ===== Level 1: Memory Cache ===== + + def get_memory(self, key: str) -> Optional[Any]: + """Get from memory cache if not expired""" + if key in self.memory_cache: + value, timestamp = self.memory_cache[key] + if datetime.now() - timestamp < self.memory_ttl: + return value + else: + # Expired, remove + del self.memory_cache[key] + return None + + def set_memory(self, key: str, value: Any): + """Store in memory cache""" + self.memory_cache[key] = (value, datetime.now()) + + def clear_memory(self): + """Clear all memory cache""" + self.memory_cache.clear() + + # ===== Level 2: AI Analysis Cache (Persistent) ===== + + @staticmethod + def compute_content_hash(title: str, content: str) -> str: + """Compute SHA-256 hash of content for caching""" + text = f"{title}\n{content}".encode('utf-8') + return hashlib.sha256(text).hexdigest() + + def get_ai_analysis(self, content_hash: str) -> Optional[AIAnalysisResult]: + """ + Get AI analysis result from cache. + + Args: + content_hash: SHA-256 hash of content + + Returns: + AIAnalysisResult if cached, None otherwise + """ + # Check memory first + mem_key = f"ai_{content_hash}" + cached = self.get_memory(mem_key) + if cached: + return cached + + # Check disk + cache_file = self.ai_cache_dir / f"{content_hash}.json" + if cache_file.exists(): + try: + with open(cache_file, 'r') as f: + data = json.load(f) + result = AIAnalysisResult.from_dict(data) + + # Store in memory for faster access + self.set_memory(mem_key, result) + + logger.debug(f"AI analysis cache hit for {content_hash[:8]}...") + return result + except Exception as e: + logger.error(f"Error loading AI cache {content_hash}: {e}") + return None + + return None + + def set_ai_analysis(self, content_hash: str, result: AIAnalysisResult): + """ + Store AI analysis result in cache (persistent). 
+ + Args: + content_hash: SHA-256 hash of content + result: AIAnalysisResult to cache + """ + # Store in memory + mem_key = f"ai_{content_hash}" + self.set_memory(mem_key, result) + + # Store on disk (persistent) + cache_file = self.ai_cache_dir / f"{content_hash}.json" + try: + with open(cache_file, 'w') as f: + json.dump(result.to_dict(), f, indent=2) + logger.debug(f"Cached AI analysis for {content_hash[:8]}...") + except Exception as e: + logger.error(f"Error saving AI cache {content_hash}: {e}") + + # ===== Level 3: Filterset Result Cache ===== + + def _get_filterset_version(self, filterset_name: str, filtersets_config: Dict) -> str: + """Get version hash of filterset definition for cache invalidation""" + filterset_def = filtersets_config.get(filterset_name, {}) + # Include version field if present, otherwise hash the entire definition + if 'version' in filterset_def: + return str(filterset_def['version']) + + # Compute hash of filterset definition + definition_json = json.dumps(filterset_def, sort_keys=True) + return hashlib.md5(definition_json.encode()).hexdigest()[:8] + + def get_filterset_results( + self, + filterset_name: str, + filterset_version: str, + max_age_hours: int = 24 + ) -> Optional[Dict[str, FilterResult]]: + """ + Get cached filterset results. + + Args: + filterset_name: Name of filterset + filterset_version: Version hash of filterset definition + max_age_hours: Maximum age of cache in hours + + Returns: + Dict mapping post_uuid to FilterResult, or None if cache invalid + """ + cache_file = self.filterset_cache_dir / f"{filterset_name}_{filterset_version}.json" + + if not cache_file.exists(): + return None + + # Check age + try: + file_age = datetime.now() - datetime.fromtimestamp(cache_file.stat().st_mtime) + if file_age > timedelta(hours=max_age_hours): + logger.debug(f"Filterset cache expired for {filterset_name}") + return None + + # Load cache + with open(cache_file, 'r') as f: + data = json.load(f) + + # Deserialize FilterResults + results = { + uuid: FilterResult.from_dict(result_data) + for uuid, result_data in data.items() + } + + logger.info(f"Filterset cache hit for {filterset_name} ({len(results)} results)") + return results + + except Exception as e: + logger.error(f"Error loading filterset cache {filterset_name}: {e}") + return None + + def set_filterset_results( + self, + filterset_name: str, + filterset_version: str, + results: Dict[str, FilterResult] + ): + """ + Store filterset results in cache. + + Args: + filterset_name: Name of filterset + filterset_version: Version hash of filterset definition + results: Dict mapping post_uuid to FilterResult + """ + cache_file = self.filterset_cache_dir / f"{filterset_name}_{filterset_version}.json" + + try: + # Serialize FilterResults + data = { + uuid: result.to_dict() + for uuid, result in results.items() + } + + with open(cache_file, 'w') as f: + json.dump(data, f, indent=2) + + logger.info(f"Cached {len(results)} filterset results for {filterset_name}") + except Exception as e: + logger.error(f"Error saving filterset cache {filterset_name}: {e}") + + def invalidate_filterset(self, filterset_name: str): + """ + Invalidate all caches for a filterset (when definition changes). 
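+        Deletes every cached result file for this filterset, across all
+        version hashes. Example (illustrative):
+
+            cache.invalidate_filterset('safe_content')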
+ + Args: + filterset_name: Name of filterset to invalidate + """ + pattern = f"{filterset_name}_*.json" + for cache_file in self.filterset_cache_dir.glob(pattern): + try: + cache_file.unlink() + logger.info(f"Invalidated filterset cache: {cache_file.name}") + except Exception as e: + logger.error(f"Error invalidating cache {cache_file}: {e}") + + # ===== Utility Methods ===== + + def get_cache_stats(self) -> Dict[str, Any]: + """Get cache statistics""" + ai_cache_count = len(list(self.ai_cache_dir.glob('*.json'))) + filterset_cache_count = len(list(self.filterset_cache_dir.glob('*.json'))) + memory_cache_count = len(self.memory_cache) + + return { + 'memory_cache_size': memory_cache_count, + 'ai_cache_size': ai_cache_count, + 'filterset_cache_size': filterset_cache_count, + 'ai_cache_dir': str(self.ai_cache_dir), + 'filterset_cache_dir': str(self.filterset_cache_dir) + } + + def clear_all(self): + """Clear all caches (use with caution!)""" + self.clear_memory() + + # Clear AI cache + for cache_file in self.ai_cache_dir.glob('*.json'): + cache_file.unlink() + + # Clear filterset cache + for cache_file in self.filterset_cache_dir.glob('*.json'): + cache_file.unlink() + + logger.warning("All filter caches cleared") diff --git a/filter_pipeline/config.py b/filter_pipeline/config.py new file mode 100644 index 0000000..e7db7f0 --- /dev/null +++ b/filter_pipeline/config.py @@ -0,0 +1,206 @@ +""" +Configuration Loader +Loads and validates filter pipeline configuration. +""" + +import json +import os +import logging +from pathlib import Path +from typing import Dict, List, Any, Optional + +logger = logging.getLogger(__name__) + + +class FilterConfig: + """Configuration for filter pipeline""" + + def __init__( + self, + config_file: str = 'filter_config.json', + filtersets_file: str = 'filtersets.json' + ): + self.config_file = Path(config_file) + self.filtersets_file = Path(filtersets_file) + + # Load configurations + self.config = self._load_config() + self.filtersets = self._load_filtersets() + + def _load_config(self) -> Dict: + """Load filter_config.json""" + if not self.config_file.exists(): + logger.warning(f"{self.config_file} not found, using defaults") + return self._get_default_config() + + try: + with open(self.config_file, 'r') as f: + config = json.load(f) + logger.info(f"Loaded filter config from {self.config_file}") + return config + except Exception as e: + logger.error(f"Error loading {self.config_file}: {e}") + return self._get_default_config() + + def _load_filtersets(self) -> Dict: + """Load filtersets.json""" + if not self.filtersets_file.exists(): + logger.error(f"{self.filtersets_file} not found!") + return {} + + try: + with open(self.filtersets_file, 'r') as f: + filtersets = json.load(f) + logger.info(f"Loaded {len(filtersets)} filtersets from {self.filtersets_file}") + return filtersets + except Exception as e: + logger.error(f"Error loading {self.filtersets_file}: {e}") + return {} + + @staticmethod + def _get_default_config() -> Dict: + """Get default configuration""" + return { + 'ai': { + 'enabled': False, # Disabled by default until API key is configured + 'openrouter_key_file': 'openrouter_key.txt', + 'models': { + 'cheap': 'meta-llama/llama-3.3-70b-instruct', + 'smart': 'anthropic/claude-3.5-sonnet' + }, + 'parallel_workers': 10, + 'timeout_seconds': 60 + }, + 'cache': { + 'enabled': True, + 'ai_cache_dir': 'data/filter_cache', + 'filterset_cache_ttl_hours': 24 + }, + 'pipeline': { + 'default_stages': ['categorizer', 'moderator', 'filter', 'ranker'], + 
'batch_size': 50, + 'enable_parallel': True + }, + 'output': { + 'filtered_dir': 'data/filtered', + 'save_rejected': False # Don't save posts that fail filters + } + } + + # ===== AI Configuration ===== + + def is_ai_enabled(self) -> bool: + """Check if AI processing is enabled""" + return self.config.get('ai', {}).get('enabled', False) + + def get_openrouter_key(self) -> Optional[str]: + """Get OpenRouter API key""" + # Try environment variable first + key = os.getenv('OPENROUTER_API_KEY') + if key: + return key + + # Try key file + key_file = self.config.get('ai', {}).get('openrouter_key_file') + if key_file and Path(key_file).exists(): + try: + with open(key_file, 'r') as f: + return f.read().strip() + except Exception as e: + logger.error(f"Error reading API key from {key_file}: {e}") + + return None + + def get_ai_model(self, model_type: str = 'cheap') -> str: + """Get AI model name for a given type (cheap/smart)""" + models = self.config.get('ai', {}).get('models', {}) + return models.get(model_type, 'meta-llama/llama-3.3-70b-instruct') + + def get_parallel_workers(self) -> int: + """Get number of parallel workers for AI processing""" + return self.config.get('ai', {}).get('parallel_workers', 10) + + # ===== Cache Configuration ===== + + def is_cache_enabled(self) -> bool: + """Check if caching is enabled""" + return self.config.get('cache', {}).get('enabled', True) + + def get_cache_dir(self) -> str: + """Get cache directory path""" + return self.config.get('cache', {}).get('ai_cache_dir', 'data/filter_cache') + + def get_cache_ttl_hours(self) -> int: + """Get filterset cache TTL in hours""" + return self.config.get('cache', {}).get('filterset_cache_ttl_hours', 24) + + # ===== Pipeline Configuration ===== + + def get_default_stages(self) -> List[str]: + """Get default pipeline stages""" + return self.config.get('pipeline', {}).get('default_stages', [ + 'categorizer', 'moderator', 'filter', 'ranker' + ]) + + def get_batch_size(self) -> int: + """Get batch processing size""" + return self.config.get('pipeline', {}).get('batch_size', 50) + + def is_parallel_enabled(self) -> bool: + """Check if parallel processing is enabled""" + return self.config.get('pipeline', {}).get('enable_parallel', True) + + # ===== Filterset Methods ===== + + def get_filterset(self, name: str) -> Optional[Dict]: + """Get filterset configuration by name""" + return self.filtersets.get(name) + + def get_filterset_names(self) -> List[str]: + """Get list of available filterset names""" + return list(self.filtersets.keys()) + + def get_filterset_version(self, name: str) -> Optional[str]: + """Get version of filterset (for cache invalidation)""" + filterset = self.get_filterset(name) + if not filterset: + return None + + # Use explicit version if present + if 'version' in filterset: + return str(filterset['version']) + + # Otherwise compute hash of definition + import hashlib + definition_json = json.dumps(filterset, sort_keys=True) + return hashlib.md5(definition_json.encode()).hexdigest()[:8] + + # ===== Output Configuration ===== + + def get_filtered_dir(self) -> str: + """Get directory for filtered posts""" + return self.config.get('output', {}).get('filtered_dir', 'data/filtered') + + def should_save_rejected(self) -> bool: + """Check if rejected posts should be saved""" + return self.config.get('output', {}).get('save_rejected', False) + + # ===== Utility Methods ===== + + def reload(self): + """Reload configurations from disk""" + self.config = self._load_config() + self.filtersets = self._load_filtersets() + 
logger.info("Configuration reloaded") + + def get_config_summary(self) -> Dict[str, Any]: + """Get summary of configuration""" + return { + 'ai_enabled': self.is_ai_enabled(), + 'cache_enabled': self.is_cache_enabled(), + 'parallel_enabled': self.is_parallel_enabled(), + 'num_filtersets': len(self.filtersets), + 'filterset_names': self.get_filterset_names(), + 'default_stages': self.get_default_stages(), + 'batch_size': self.get_batch_size() + } diff --git a/filter_pipeline/engine.py b/filter_pipeline/engine.py new file mode 100644 index 0000000..e79e5a4 --- /dev/null +++ b/filter_pipeline/engine.py @@ -0,0 +1,376 @@ +""" +Filter Engine +Main orchestrator for content filtering pipeline. +""" + +import logging +import traceback +from typing import List, Dict, Any, Optional +from datetime import datetime +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor, as_completed + +from .config import FilterConfig +from .cache import FilterCache +from .models import FilterResult, ProcessingStatus, AIAnalysisResult + +logger = logging.getLogger(__name__) + + +class FilterEngine: + """ + Main filter pipeline orchestrator. + + Coordinates multi-stage content filtering with intelligent caching. + Compatible with user preferences and filterset selections. + """ + + _instance = None + + def __init__( + self, + config_file: str = 'filter_config.json', + filtersets_file: str = 'filtersets.json' + ): + """ + Initialize filter engine. + + Args: + config_file: Path to filter_config.json + filtersets_file: Path to filtersets.json + """ + self.config = FilterConfig(config_file, filtersets_file) + self.cache = FilterCache(self.config.get_cache_dir()) + + # Lazy-loaded stages (will be imported when AI is enabled) + self._stages = None + + logger.info("FilterEngine initialized") + logger.info(f"Configuration: {self.config.get_config_summary()}") + + @classmethod + def get_instance(cls) -> 'FilterEngine': + """Get singleton instance of FilterEngine""" + if cls._instance is None: + cls._instance = cls() + return cls._instance + + def _init_stages(self): + """Initialize pipeline stages (lazy loading)""" + if self._stages is not None: + return + + from .stages.categorizer import CategorizerStage + from .stages.moderator import ModeratorStage + from .stages.filter import FilterStage + from .stages.ranker import RankerStage + + # Initialize stages based on configuration + self._stages = { + 'categorizer': CategorizerStage(self.config, self.cache), + 'moderator': ModeratorStage(self.config, self.cache), + 'filter': FilterStage(self.config, self.cache), + 'ranker': RankerStage(self.config, self.cache) + } + + logger.info(f"Initialized {len(self._stages)} pipeline stages") + + def apply_filterset( + self, + posts: List[Dict[str, Any]], + filterset_name: str = 'no_filter', + use_cache: bool = True + ) -> List[Dict[str, Any]]: + """ + Apply filterset to posts (compatible with user preferences). + + This is the main public API used by app.py when loading user feeds. 
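+
+        Example (illustrative sketch; assumes posts were already loaded):
+
+            engine = FilterEngine.get_instance()
+            feed = engine.apply_filterset(posts, filterset_name='safe_content')
+            feed.sort(key=lambda p: p['_filter_score'], reverse=True)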
+ + Args: + posts: List of post dictionaries + filterset_name: Name of filterset from user settings (e.g., 'safe_content') + use_cache: Whether to use cached results + + Returns: + List of posts that passed the filter, with score and metadata added + """ + if not posts: + return [] + + # Validate filterset exists + filterset = self.config.get_filterset(filterset_name) + if not filterset: + logger.warning(f"Filterset '{filterset_name}' not found, using 'no_filter'") + filterset_name = 'no_filter' + + logger.info(f"Applying filterset '{filterset_name}' to {len(posts)} posts") + + # Check if we have cached filterset results + if use_cache and self.config.is_cache_enabled(): + filterset_version = self.config.get_filterset_version(filterset_name) + cached_results = self.cache.get_filterset_results( + filterset_name, + filterset_version, + self.config.get_cache_ttl_hours() + ) + + if cached_results: + # Filter posts using cached results + filtered_posts = [] + for post in posts: + post_uuid = post.get('uuid') + if post_uuid in cached_results: + result = cached_results[post_uuid] + if result.passed: + # Add filter metadata to post + post['_filter_score'] = result.score + post['_filter_categories'] = result.categories + post['_filter_tags'] = result.tags + filtered_posts.append(post) + + logger.info(f"Cache hit: {len(filtered_posts)}/{len(posts)} posts passed filter") + return filtered_posts + + # Cache miss or disabled - process posts through pipeline + results = self.process_batch(posts, filterset_name) + + # Save to filterset cache + if self.config.is_cache_enabled(): + filterset_version = self.config.get_filterset_version(filterset_name) + results_dict = {r.post_uuid: r for r in results} + self.cache.set_filterset_results(filterset_name, filterset_version, results_dict) + + # Build filtered post list + filtered_posts = [] + results_by_uuid = {r.post_uuid: r for r in results} + + for post in posts: + post_uuid = post.get('uuid') + result = results_by_uuid.get(post_uuid) + + if result and result.passed: + # Add filter metadata to post + post['_filter_score'] = result.score + post['_filter_categories'] = result.categories + post['_filter_tags'] = result.tags + filtered_posts.append(post) + + logger.info(f"Processed: {len(filtered_posts)}/{len(posts)} posts passed filter") + return filtered_posts + + def process_batch( + self, + posts: List[Dict[str, Any]], + filterset_name: str = 'no_filter' + ) -> List[FilterResult]: + """ + Process batch of posts through pipeline. 
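+        Unlike apply_filterset(), this returns one FilterResult per input
+        post, including rejected ones. Example (illustrative):
+
+            results = engine.process_batch(posts, filterset_name='no_filter')
+            rejected = [r for r in results if not r.passed]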
+ + Args: + posts: List of post dictionaries + filterset_name: Name of filterset to apply + + Returns: + List of FilterResults for each post + """ + if not posts: + return [] + + # Special case: no_filter passes everything with default scores + if filterset_name == 'no_filter': + return self._process_no_filter(posts) + + # Initialize stages if needed + if self.config.is_ai_enabled(): + self._init_stages() + + # If AI is disabled but filterset requires it, fall back to no_filter + if not self.config.is_ai_enabled() and filterset_name != 'no_filter': + logger.warning(f"AI disabled but '{filterset_name}' requires AI - falling back to 'no_filter'") + return self._process_no_filter(posts) + + # Get pipeline stages for this filterset + stage_names = self._get_stages_for_filterset(filterset_name) + + # Process posts (parallel or sequential based on config) + if self.config.is_parallel_enabled(): + results = self._process_batch_parallel(posts, filterset_name, stage_names) + else: + results = self._process_batch_sequential(posts, filterset_name, stage_names) + + return results + + def _process_no_filter(self, posts: List[Dict[str, Any]]) -> List[FilterResult]: + """Process posts with no_filter (all pass with default scores)""" + results = [] + for post in posts: + result = FilterResult( + post_uuid=post.get('uuid', ''), + passed=True, + score=0.5, # Neutral score + categories=[], + tags=[], + filterset_name='no_filter', + processed_at=datetime.now(), + status=ProcessingStatus.COMPLETED + ) + results.append(result) + + return results + + def _get_stages_for_filterset(self, filterset_name: str) -> List[str]: + """Get pipeline stages to run for a filterset""" + filterset = self.config.get_filterset(filterset_name) + + # Check if filterset specifies custom stages + if filterset and 'pipeline_stages' in filterset: + return filterset['pipeline_stages'] + + # Use default stages + return self.config.get_default_stages() + + def _process_batch_parallel( + self, + posts: List[Dict[str, Any]], + filterset_name: str, + stage_names: List[str] + ) -> List[FilterResult]: + """Process posts in parallel""" + results = [None] * len(posts) + workers = self.config.get_parallel_workers() + + def process_single_post(idx_post): + idx, post = idx_post + try: + result = self._process_single_post(post, filterset_name, stage_names) + return idx, result + except Exception as e: + logger.error(f"Error processing post {idx}: {e}") + logger.error(traceback.format_exc()) + # Return failed result + return idx, FilterResult( + post_uuid=post.get('uuid', ''), + passed=False, + score=0.0, + filterset_name=filterset_name, + processed_at=datetime.now(), + status=ProcessingStatus.FAILED, + error=str(e) + ) + + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = {executor.submit(process_single_post, (i, post)): i + for i, post in enumerate(posts)} + + for future in as_completed(futures): + idx, result = future.result() + results[idx] = result + + return results + + def _process_batch_sequential( + self, + posts: List[Dict[str, Any]], + filterset_name: str, + stage_names: List[str] + ) -> List[FilterResult]: + """Process posts sequentially""" + results = [] + for post in posts: + try: + result = self._process_single_post(post, filterset_name, stage_names) + results.append(result) + except Exception as e: + logger.error(f"Error processing post: {e}") + results.append(FilterResult( + post_uuid=post.get('uuid', ''), + passed=False, + score=0.0, + filterset_name=filterset_name, + processed_at=datetime.now(), + 
status=ProcessingStatus.FAILED, + error=str(e) + )) + + return results + + def _process_single_post( + self, + post: Dict[str, Any], + filterset_name: str, + stage_names: List[str] + ) -> FilterResult: + """ + Process single post through pipeline stages. + + Stages are run sequentially: Categorizer → Moderator → Filter → Ranker + """ + # Initialize result + result = FilterResult( + post_uuid=post.get('uuid', ''), + passed=True, # Start as passed, stages can reject + score=0.5, # Default score + filterset_name=filterset_name, + processed_at=datetime.now(), + status=ProcessingStatus.PROCESSING + ) + + # Run each stage + for stage_name in stage_names: + if stage_name not in self._stages: + logger.warning(f"Stage '{stage_name}' not found, skipping") + continue + + stage = self._stages[stage_name] + + if not stage.is_enabled(): + logger.debug(f"Stage '{stage_name}' disabled, skipping") + continue + + # Process through stage + try: + result = stage.process(post, result) + + # If post was rejected by this stage, stop processing + if not result.passed: + logger.debug(f"Post {post.get('uuid', '')} rejected by {stage_name}") + break + + except Exception as e: + logger.error(f"Error in stage '{stage_name}': {e}") + result.status = ProcessingStatus.FAILED + result.error = f"{stage_name}: {str(e)}" + result.passed = False + break + + # Mark as completed if not failed + if result.status != ProcessingStatus.FAILED: + result.status = ProcessingStatus.COMPLETED + + return result + + # ===== Utility Methods ===== + + def get_available_filtersets(self) -> List[str]: + """Get list of available filterset names (for user settings UI)""" + return self.config.get_filterset_names() + + def get_filterset_description(self, name: str) -> Optional[str]: + """Get description of a filterset (for user settings UI)""" + filterset = self.config.get_filterset(name) + return filterset.get('description') if filterset else None + + def invalidate_filterset_cache(self, filterset_name: str): + """Invalidate cache for a filterset (when definition changes)""" + self.cache.invalidate_filterset(filterset_name) + logger.info(f"Invalidated cache for filterset '{filterset_name}'") + + def get_cache_stats(self) -> Dict[str, Any]: + """Get cache statistics""" + return self.cache.get_cache_stats() + + def reload_config(self): + """Reload configuration from disk""" + self.config.reload() + self._stages = None # Force re-initialization of stages + logger.info("Configuration reloaded") diff --git a/filter_pipeline/models.py b/filter_pipeline/models.py new file mode 100644 index 0000000..08abb4c --- /dev/null +++ b/filter_pipeline/models.py @@ -0,0 +1,121 @@ +""" +Filter Pipeline Models +Data models for filter results and processing status. +""" + +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Any +from datetime import datetime +from enum import Enum + + +class ProcessingStatus(Enum): + """Status of content processing""" + PENDING = 'pending' + PROCESSING = 'processing' + COMPLETED = 'completed' + FAILED = 'failed' + CACHED = 'cached' + + +@dataclass +class FilterResult: + """ + Result of filtering pipeline for a single post. 
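+    Serializes to and from plain dicts for JSON caching. Round trip
+    (illustrative sketch):
+
+        restored = FilterResult.from_dict(result.to_dict())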
+ + Attributes: + post_uuid: Unique identifier for the post + passed: Whether post passed the filter + score: Relevance/quality score (0.0-1.0) + categories: Detected categories/topics + tags: Additional tags applied + moderation_data: Safety and quality analysis results + filterset_name: Name of filterset applied + cache_key: Content hash for caching + processed_at: Timestamp of processing + status: Processing status + error: Error message if failed + """ + post_uuid: str + passed: bool + score: float + categories: List[str] = field(default_factory=list) + tags: List[str] = field(default_factory=list) + moderation_data: Dict[str, Any] = field(default_factory=dict) + filterset_name: str = 'no_filter' + cache_key: Optional[str] = None + processed_at: Optional[datetime] = None + status: ProcessingStatus = ProcessingStatus.PENDING + error: Optional[str] = None + + # Detailed scoring breakdown + score_breakdown: Dict[str, float] = field(default_factory=dict) + + def to_dict(self) -> Dict: + """Convert to dictionary for JSON serialization""" + return { + 'post_uuid': self.post_uuid, + 'passed': self.passed, + 'score': self.score, + 'categories': self.categories, + 'tags': self.tags, + 'moderation_data': self.moderation_data, + 'filterset_name': self.filterset_name, + 'cache_key': self.cache_key, + 'processed_at': self.processed_at.isoformat() if self.processed_at else None, + 'status': self.status.value if isinstance(self.status, ProcessingStatus) else self.status, + 'error': self.error, + 'score_breakdown': self.score_breakdown + } + + @classmethod + def from_dict(cls, data: Dict) -> 'FilterResult': + """Create from dictionary""" + # Handle datetime deserialization + if data.get('processed_at') and isinstance(data['processed_at'], str): + data['processed_at'] = datetime.fromisoformat(data['processed_at']) + + # Handle enum deserialization + if data.get('status') and isinstance(data['status'], str): + data['status'] = ProcessingStatus(data['status']) + + return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__}) + + +@dataclass +class AIAnalysisResult: + """ + Result of AI analysis (categorization, moderation, etc). + Cached separately from FilterResult for reuse across filtersets. 
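+
+    Keyed by content hash, so one analysis can serve every filterset that
+    inspects the same post. Construction example (illustrative; field values
+    are placeholders):
+
+        AIAnalysisResult(content_hash=h, categories=['tech'], quality_score=0.8)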
+ """ + content_hash: str + categories: List[str] = field(default_factory=list) + category_scores: Dict[str, float] = field(default_factory=dict) + moderation: Dict[str, Any] = field(default_factory=dict) + quality_score: float = 0.5 + sentiment: Optional[str] = None + sentiment_score: float = 0.0 + analyzed_at: Optional[datetime] = None + model_used: Optional[str] = None + + def to_dict(self) -> Dict: + """Convert to dictionary for JSON serialization""" + return { + 'content_hash': self.content_hash, + 'categories': self.categories, + 'category_scores': self.category_scores, + 'moderation': self.moderation, + 'quality_score': self.quality_score, + 'sentiment': self.sentiment, + 'sentiment_score': self.sentiment_score, + 'analyzed_at': self.analyzed_at.isoformat() if self.analyzed_at else None, + 'model_used': self.model_used + } + + @classmethod + def from_dict(cls, data: Dict) -> 'AIAnalysisResult': + """Create from dictionary""" + if data.get('analyzed_at') and isinstance(data['analyzed_at'], str): + data['analyzed_at'] = datetime.fromisoformat(data['analyzed_at']) + + return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__}) diff --git a/filter_pipeline/plugins/__init__.py b/filter_pipeline/plugins/__init__.py new file mode 100644 index 0000000..050764f --- /dev/null +++ b/filter_pipeline/plugins/__init__.py @@ -0,0 +1,8 @@ +""" +Filter Plugins +Pluggable filters for content filtering. +""" + +from .base import BaseFilterPlugin + +__all__ = ['BaseFilterPlugin'] diff --git a/filter_pipeline/plugins/base.py b/filter_pipeline/plugins/base.py new file mode 100644 index 0000000..82b5f67 --- /dev/null +++ b/filter_pipeline/plugins/base.py @@ -0,0 +1,66 @@ +""" +Base Filter Plugin +Abstract base class for all filter plugins. +""" + +from abc import ABC, abstractmethod +from typing import Dict, Any, Optional + + +class BaseFilterPlugin(ABC): + """ + Abstract base class for filter plugins. + + Plugins can be used within stages to implement specific filtering logic. + Examples: keyword filtering, AI-based filtering, quality scoring, etc. + """ + + def __init__(self, config: Dict[str, Any]): + """ + Initialize plugin. + + Args: + config: Plugin configuration dictionary + """ + self.config = config + self.enabled = config.get('enabled', True) + + @abstractmethod + def should_filter(self, post: Dict[str, Any], context: Optional[Dict] = None) -> bool: + """ + Determine if post should be filtered OUT. + + Args: + post: Post data dictionary + context: Optional context from previous stages + + Returns: + True if post should be filtered OUT (rejected), False to keep it + """ + pass + + @abstractmethod + def score(self, post: Dict[str, Any], context: Optional[Dict] = None) -> float: + """ + Calculate relevance/quality score for post. + + Args: + post: Post data dictionary + context: Optional context from previous stages + + Returns: + Score from 0.0 (lowest) to 1.0 (highest) + """ + pass + + @abstractmethod + def get_name(self) -> str: + """Get plugin name for logging""" + pass + + def is_enabled(self) -> bool: + """Check if plugin is enabled""" + return self.enabled + + def __repr__(self) -> str: + return f"<{self.get_name()} enabled={self.enabled}>" diff --git a/filter_pipeline/stages/__init__.py b/filter_pipeline/stages/__init__.py new file mode 100644 index 0000000..401804e --- /dev/null +++ b/filter_pipeline/stages/__init__.py @@ -0,0 +1,8 @@ +""" +Pipeline Stages +Sequential processing stages for content filtering. 
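+
+A stage subclasses BaseStage and implements process() and get_name(). Minimal
+sketch (illustrative; PassThroughStage is not part of this patch):
+
+    class PassThroughStage(BaseStage):
+        def process(self, post, result):
+            return result
+
+        def get_name(self):
+            return 'pass_through'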
+""" + +from .base_stage import BaseStage + +__all__ = ['BaseStage'] diff --git a/filter_pipeline/stages/base_stage.py b/filter_pipeline/stages/base_stage.py new file mode 100644 index 0000000..a35feea --- /dev/null +++ b/filter_pipeline/stages/base_stage.py @@ -0,0 +1,62 @@ +""" +Base Stage +Abstract base class for all pipeline stages. +""" + +from abc import ABC, abstractmethod +from typing import List, Dict, Any +from ..models import FilterResult + + +class BaseStage(ABC): + """ + Abstract base class for pipeline stages. + + Each stage processes posts sequentially and can modify FilterResults. + Stages are executed in order: Categorizer → Moderator → Filter → Ranker + """ + + def __init__(self, config: Dict[str, Any], cache: Any): + """ + Initialize stage. + + Args: + config: Configuration dictionary for this stage + cache: FilterCache instance + """ + self.config = config + self.cache = cache + self.enabled = config.get('enabled', True) + + @abstractmethod + def process( + self, + post: Dict[str, Any], + result: FilterResult + ) -> FilterResult: + """ + Process a single post and update its FilterResult. + + Args: + post: Post data dictionary + result: Current FilterResult for this post + + Returns: + Updated FilterResult + + Raises: + Exception if processing fails + """ + pass + + @abstractmethod + def get_name(self) -> str: + """Get stage name for logging""" + pass + + def is_enabled(self) -> bool: + """Check if stage is enabled""" + return self.enabled + + def __repr__(self) -> str: + return f"<{self.get_name()} enabled={self.enabled}>"