Add filter pipeline core infrastructure (Phase 1)

Implements a plugin-based content filtering system with multi-level caching:

Core Components:
- FilterEngine: Main orchestrator for content filtering
- FilterCache: 3-level caching (memory, AI results, filterset results); interface sketched after this list
- FilterConfig: Configuration loader for filter_config.json & filtersets.json
- FilterResult & AIAnalysisResult: Data models for filter results
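
A minimal sketch of the FilterCache surface and the FilterResult model listed above, inferred from how engine.py (included below) constructs and reads them; cache.py and models.py are not part of this excerpt, so field names, types, and signatures here are best-effort assumptions:

# Sketch only: inferred from FilterEngine's usage; the real cache.py and
# models.py added by this commit may differ in detail.
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

class ProcessingStatus(Enum):
    PROCESSING = 'processing'
    COMPLETED = 'completed'
    FAILED = 'failed'

@dataclass
class FilterResult:
    post_uuid: str
    passed: bool
    score: float
    filterset_name: str
    processed_at: datetime
    status: ProcessingStatus
    categories: List[str] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)
    error: Optional[str] = None

class FilterCache:
    """Interface the engine relies on (memory, AI-result, and filterset-result levels)."""

    def __init__(self, cache_dir: str): ...

    def get_filterset_results(self, name: str, version: str,
                              ttl_hours: int) -> Optional[Dict[str, FilterResult]]:
        """Cached results keyed by post UUID, or None/empty on a miss or expiry."""

    def set_filterset_results(self, name: str, version: str,
                              results: Dict[str, FilterResult]) -> None: ...

    def invalidate_filterset(self, name: str) -> None: ...

    def get_cache_stats(self) -> Dict[str, Any]: ...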

Architecture:
- BaseStage: Abstract class for pipeline stages
- BaseFilterPlugin: Abstract class for filter plugins (both sketched after this list)
- Multi-threaded parallel processing support
- Content-hash-based AI result caching (skips repeat AI calls on unchanged content, saving cost)
- Filterset result caching (fast filterset switching)
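
A minimal sketch of the BaseStage and BaseFilterPlugin contracts named above, assuming the interface implied by how FilterEngine drives stages (constructor taking config and cache, is_enabled(), process(post, result)); the plugin hook name is an illustrative assumption, since the plugin base classes are not shown in this excerpt:

# Sketch only: the stage interface mirrors the calls FilterEngine makes;
# BaseFilterPlugin's evaluate() hook is a hypothetical name.
from abc import ABC, abstractmethod
from typing import Any, Dict

class BaseStage(ABC):
    """One pipeline step (categorizer, moderator, filter, ranker)."""

    def __init__(self, config, cache):
        self.config = config
        self.cache = cache

    def is_enabled(self) -> bool:
        """Stages can be toggled via configuration; enabled by default."""
        return True

    @abstractmethod
    def process(self, post: Dict[str, Any], result: 'FilterResult') -> 'FilterResult':
        """Inspect the post and return the updated (possibly rejected) result."""

class BaseFilterPlugin(ABC):
    """A pluggable rule consulted by the filter stage."""

    @abstractmethod
    def evaluate(self, post: Dict[str, Any]) -> float:
        """Return this plugin's score contribution for the post."""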

Configuration:
- filter_config.json: AI models, caching, parallel workers (accessors sketched after this list)
- Uses only Llama 70B for cost efficiency
- Compatible with existing filtersets.json
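
For reference, the configuration surface that FilterEngine consumes; every call below appears in engine.py further down, while the exact filter_config.json keys behind them are not shown in this excerpt:

# Every accessor here is called by FilterEngine (see engine.py below); how they
# map onto filter_config.json keys is not visible in this commit excerpt.
from filter_pipeline.config import FilterConfig

config = FilterConfig('filter_config.json', 'filtersets.json')
print(config.get_config_summary())            # logged at engine startup
if config.is_ai_enabled():                    # gates the AI-backed stages
    workers = config.get_parallel_workers()   # ThreadPoolExecutor size
if config.is_cache_enabled():
    cache_dir = config.get_cache_dir()        # where FilterCache persists results
    ttl_hours = config.get_cache_ttl_hours()  # expiry for cached filterset results
default_stages = config.get_default_stages()  # used when a filterset names no stages
safe = config.get_filterset('safe_content')   # definition loaded from filtersets.json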

Integration:
- apply_filterset() API compatible with user preferences (usage sketch after this list)
- process_batch() for batch post processing
- Lazy-loaded stages to avoid import errors when AI is disabled
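
A hedged usage sketch of these integration points, following the public API defined in engine.py below; the post payloads are illustrative placeholders rather than the app's real feed data:

# Usage sketch based on the public API in engine.py; the 'text' field and the
# post contents are illustrative placeholders.
from filter_pipeline.engine import FilterEngine

engine = FilterEngine.get_instance()

posts = [
    {'uuid': 'a1', 'text': 'example post one'},
    {'uuid': 'a2', 'text': 'example post two'},
]

# Main entry point when loading a user feed: returns only passing posts,
# annotated with _filter_score, _filter_categories and _filter_tags.
visible = engine.apply_filterset(posts, filterset_name='safe_content')

# Lower-level batch API: one FilterResult per post, including failures.
results = engine.process_batch(posts, filterset_name='no_filter')
for r in results:
    print(r.post_uuid, r.passed, r.score)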

Related to issue #8 (filtering engine implementation)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
commit 94e12041ec (parent 07df6d8f0a), 2025-10-11 22:46:10 -05:00
10 changed files with 1143 additions and 0 deletions

filter_pipeline/engine.py (new file)

@@ -0,0 +1,376 @@
"""
Filter Engine
Main orchestrator for content filtering pipeline.
"""
import logging
import traceback
from typing import List, Dict, Any, Optional
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from .config import FilterConfig
from .cache import FilterCache
from .models import FilterResult, ProcessingStatus, AIAnalysisResult
logger = logging.getLogger(__name__)
class FilterEngine:
"""
Main filter pipeline orchestrator.
Coordinates multi-stage content filtering with intelligent caching.
Compatible with user preferences and filterset selections.
"""
_instance = None
def __init__(
self,
config_file: str = 'filter_config.json',
filtersets_file: str = 'filtersets.json'
):
"""
Initialize filter engine.
Args:
config_file: Path to filter_config.json
filtersets_file: Path to filtersets.json
"""
self.config = FilterConfig(config_file, filtersets_file)
self.cache = FilterCache(self.config.get_cache_dir())
# Lazy-loaded stages (will be imported when AI is enabled)
self._stages = None
logger.info("FilterEngine initialized")
logger.info(f"Configuration: {self.config.get_config_summary()}")
@classmethod
def get_instance(cls) -> 'FilterEngine':
"""Get singleton instance of FilterEngine"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def _init_stages(self):
"""Initialize pipeline stages (lazy loading)"""
if self._stages is not None:
return
from .stages.categorizer import CategorizerStage
from .stages.moderator import ModeratorStage
from .stages.filter import FilterStage
from .stages.ranker import RankerStage
# Initialize stages based on configuration
self._stages = {
'categorizer': CategorizerStage(self.config, self.cache),
'moderator': ModeratorStage(self.config, self.cache),
'filter': FilterStage(self.config, self.cache),
'ranker': RankerStage(self.config, self.cache)
}
logger.info(f"Initialized {len(self._stages)} pipeline stages")
def apply_filterset(
self,
posts: List[Dict[str, Any]],
filterset_name: str = 'no_filter',
use_cache: bool = True
) -> List[Dict[str, Any]]:
"""
Apply filterset to posts (compatible with user preferences).
This is the main public API used by app.py when loading user feeds.
Args:
posts: List of post dictionaries
filterset_name: Name of filterset from user settings (e.g., 'safe_content')
use_cache: Whether to use cached results
Returns:
List of posts that passed the filter, with score and metadata added
"""
if not posts:
return []
# Validate filterset exists
filterset = self.config.get_filterset(filterset_name)
if not filterset:
logger.warning(f"Filterset '{filterset_name}' not found, using 'no_filter'")
filterset_name = 'no_filter'
logger.info(f"Applying filterset '{filterset_name}' to {len(posts)} posts")
# Check if we have cached filterset results
if use_cache and self.config.is_cache_enabled():
filterset_version = self.config.get_filterset_version(filterset_name)
cached_results = self.cache.get_filterset_results(
filterset_name,
filterset_version,
self.config.get_cache_ttl_hours()
)
if cached_results:
# Filter posts using cached results
filtered_posts = []
for post in posts:
post_uuid = post.get('uuid')
if post_uuid in cached_results:
result = cached_results[post_uuid]
if result.passed:
# Add filter metadata to post
post['_filter_score'] = result.score
post['_filter_categories'] = result.categories
post['_filter_tags'] = result.tags
filtered_posts.append(post)
logger.info(f"Cache hit: {len(filtered_posts)}/{len(posts)} posts passed filter")
return filtered_posts
# Cache miss or disabled - process posts through pipeline
results = self.process_batch(posts, filterset_name)
# Save to filterset cache
if self.config.is_cache_enabled():
filterset_version = self.config.get_filterset_version(filterset_name)
results_dict = {r.post_uuid: r for r in results}
self.cache.set_filterset_results(filterset_name, filterset_version, results_dict)
# Build filtered post list
filtered_posts = []
results_by_uuid = {r.post_uuid: r for r in results}
for post in posts:
post_uuid = post.get('uuid')
result = results_by_uuid.get(post_uuid)
if result and result.passed:
# Add filter metadata to post
post['_filter_score'] = result.score
post['_filter_categories'] = result.categories
post['_filter_tags'] = result.tags
filtered_posts.append(post)
logger.info(f"Processed: {len(filtered_posts)}/{len(posts)} posts passed filter")
return filtered_posts
def process_batch(
self,
posts: List[Dict[str, Any]],
filterset_name: str = 'no_filter'
) -> List[FilterResult]:
"""
Process batch of posts through pipeline.
Args:
posts: List of post dictionaries
filterset_name: Name of filterset to apply
Returns:
List of FilterResults for each post
"""
if not posts:
return []
# Special case: no_filter passes everything with default scores
if filterset_name == 'no_filter':
return self._process_no_filter(posts)
# Initialize stages if needed
if self.config.is_ai_enabled():
self._init_stages()
# If AI is disabled but filterset requires it, fall back to no_filter
if not self.config.is_ai_enabled() and filterset_name != 'no_filter':
logger.warning(f"AI disabled but '{filterset_name}' requires AI - falling back to 'no_filter'")
return self._process_no_filter(posts)
# Get pipeline stages for this filterset
stage_names = self._get_stages_for_filterset(filterset_name)
# Process posts (parallel or sequential based on config)
if self.config.is_parallel_enabled():
results = self._process_batch_parallel(posts, filterset_name, stage_names)
else:
results = self._process_batch_sequential(posts, filterset_name, stage_names)
return results
def _process_no_filter(self, posts: List[Dict[str, Any]]) -> List[FilterResult]:
"""Process posts with no_filter (all pass with default scores)"""
results = []
for post in posts:
result = FilterResult(
post_uuid=post.get('uuid', ''),
passed=True,
score=0.5, # Neutral score
categories=[],
tags=[],
filterset_name='no_filter',
processed_at=datetime.now(),
status=ProcessingStatus.COMPLETED
)
results.append(result)
return results
def _get_stages_for_filterset(self, filterset_name: str) -> List[str]:
"""Get pipeline stages to run for a filterset"""
filterset = self.config.get_filterset(filterset_name)
# Check if filterset specifies custom stages
if filterset and 'pipeline_stages' in filterset:
return filterset['pipeline_stages']
# Use default stages
return self.config.get_default_stages()
def _process_batch_parallel(
self,
posts: List[Dict[str, Any]],
filterset_name: str,
stage_names: List[str]
) -> List[FilterResult]:
"""Process posts in parallel"""
results = [None] * len(posts)
workers = self.config.get_parallel_workers()
def process_single_post(idx_post):
idx, post = idx_post
try:
result = self._process_single_post(post, filterset_name, stage_names)
return idx, result
except Exception as e:
logger.error(f"Error processing post {idx}: {e}")
logger.error(traceback.format_exc())
# Return failed result
return idx, FilterResult(
post_uuid=post.get('uuid', ''),
passed=False,
score=0.0,
filterset_name=filterset_name,
processed_at=datetime.now(),
status=ProcessingStatus.FAILED,
error=str(e)
)
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(process_single_post, (i, post)): i
for i, post in enumerate(posts)}
for future in as_completed(futures):
idx, result = future.result()
results[idx] = result
return results
def _process_batch_sequential(
self,
posts: List[Dict[str, Any]],
filterset_name: str,
stage_names: List[str]
) -> List[FilterResult]:
"""Process posts sequentially"""
results = []
for post in posts:
try:
result = self._process_single_post(post, filterset_name, stage_names)
results.append(result)
except Exception as e:
logger.error(f"Error processing post: {e}")
results.append(FilterResult(
post_uuid=post.get('uuid', ''),
passed=False,
score=0.0,
filterset_name=filterset_name,
processed_at=datetime.now(),
status=ProcessingStatus.FAILED,
error=str(e)
))
return results
def _process_single_post(
self,
post: Dict[str, Any],
filterset_name: str,
stage_names: List[str]
) -> FilterResult:
"""
Process single post through pipeline stages.
Stages are run sequentially: Categorizer → Moderator → Filter → Ranker
"""
# Initialize result
result = FilterResult(
post_uuid=post.get('uuid', ''),
passed=True, # Start as passed, stages can reject
score=0.5, # Default score
filterset_name=filterset_name,
processed_at=datetime.now(),
status=ProcessingStatus.PROCESSING
)
# Run each stage
for stage_name in stage_names:
if stage_name not in self._stages:
logger.warning(f"Stage '{stage_name}' not found, skipping")
continue
stage = self._stages[stage_name]
if not stage.is_enabled():
logger.debug(f"Stage '{stage_name}' disabled, skipping")
continue
# Process through stage
try:
result = stage.process(post, result)
# If post was rejected by this stage, stop processing
if not result.passed:
logger.debug(f"Post {post.get('uuid', '')} rejected by {stage_name}")
break
except Exception as e:
logger.error(f"Error in stage '{stage_name}': {e}")
result.status = ProcessingStatus.FAILED
result.error = f"{stage_name}: {str(e)}"
result.passed = False
break
# Mark as completed if not failed
if result.status != ProcessingStatus.FAILED:
result.status = ProcessingStatus.COMPLETED
return result
# ===== Utility Methods =====
def get_available_filtersets(self) -> List[str]:
"""Get list of available filterset names (for user settings UI)"""
return self.config.get_filterset_names()
def get_filterset_description(self, name: str) -> Optional[str]:
"""Get description of a filterset (for user settings UI)"""
filterset = self.config.get_filterset(name)
return filterset.get('description') if filterset else None
def invalidate_filterset_cache(self, filterset_name: str):
"""Invalidate cache for a filterset (when definition changes)"""
self.cache.invalidate_filterset(filterset_name)
logger.info(f"Invalidated cache for filterset '{filterset_name}'")
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics"""
return self.cache.get_cache_stats()
def reload_config(self):
"""Reload configuration from disk"""
self.config.reload()
self._stages = None # Force re-initialization of stages
logger.info("Configuration reloaded")