Implements plugin-based content filtering system with multi-level caching

Core Components:
- FilterEngine: Main orchestrator for content filtering
- FilterCache: 3-level caching (memory, AI results, filterset results)
- FilterConfig: Configuration loader for filter_config.json & filtersets.json
- FilterResult & AIAnalysisResult: Data models for filter results

Architecture:
- BaseStage: Abstract class for pipeline stages
- BaseFilterPlugin: Abstract class for filter plugins
- Multi-threaded parallel processing support
- Content-hash based AI result caching (cost savings)
- Filterset result caching (fast filterset switching)

Configuration:
- filter_config.json: AI models, caching, parallel workers
- Using only Llama 70B for cost efficiency
- Compatible with existing filtersets.json

Integration:
- apply_filterset() API compatible with user preferences
- process_batch() for batch post processing
- Lazy-loaded stages to avoid import errors when AI disabled

Related to issue #8 (filtering engine implementation)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
"""
|
|
Filter Engine
|
|
Main orchestrator for content filtering pipeline.
|
|
"""
|
|
|
|
import logging
|
|
import traceback
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
from .config import FilterConfig
|
|
from .cache import FilterCache
|
|
from .models import FilterResult, ProcessingStatus, AIAnalysisResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FilterEngine:
    """
    Main filter pipeline orchestrator.

    Coordinates multi-stage content filtering with intelligent caching.
    Compatible with user preferences and filterset selections.
    """

    _instance = None

    def __init__(
        self,
        config_file: str = 'filter_config.json',
        filtersets_file: str = 'filtersets.json'
    ):
        """
        Initialize filter engine.

        Args:
            config_file: Path to filter_config.json
            filtersets_file: Path to filtersets.json
        """
        self.config = FilterConfig(config_file, filtersets_file)
        self.cache = FilterCache(self.config.get_cache_dir())

        # Lazy-loaded stages (will be imported when AI is enabled)
        self._stages = None

        logger.info("FilterEngine initialized")
        logger.info(f"Configuration: {self.config.get_config_summary()}")

    @classmethod
    def get_instance(cls) -> 'FilterEngine':
        """Get singleton instance of FilterEngine"""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def _init_stages(self):
        """Initialize pipeline stages (lazy loading)"""
        if self._stages is not None:
            return

        from .stages.categorizer import CategorizerStage
        from .stages.moderator import ModeratorStage
        from .stages.filter import FilterStage
        from .stages.ranker import RankerStage

        # Initialize stages based on configuration
        self._stages = {
            'categorizer': CategorizerStage(self.config, self.cache),
            'moderator': ModeratorStage(self.config, self.cache),
            'filter': FilterStage(self.config, self.cache),
            'ranker': RankerStage(self.config, self.cache)
        }

        logger.info(f"Initialized {len(self._stages)} pipeline stages")
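    # Note: get_instance() builds the singleton with the default config paths above.
    # For non-default locations, construct the engine directly. Illustrative call
    # (the file names shown are just the defaults; any other paths are assumptions):
    #
    #     engine = FilterEngine(config_file='filter_config.json',
    #                           filtersets_file='filtersets.json')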
    def apply_filterset(
        self,
        posts: List[Dict[str, Any]],
        filterset_name: str = 'no_filter',
        use_cache: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Apply filterset to posts (compatible with user preferences).

        This is the main public API used by app.py when loading user feeds.

        Args:
            posts: List of post dictionaries
            filterset_name: Name of filterset from user settings (e.g., 'safe_content')
            use_cache: Whether to use cached results

        Returns:
            List of posts that passed the filter, with score and metadata added
        """
        if not posts:
            return []

        # Validate filterset exists
        filterset = self.config.get_filterset(filterset_name)
        if not filterset:
            logger.warning(f"Filterset '{filterset_name}' not found, using 'no_filter'")
            filterset_name = 'no_filter'

        logger.info(f"Applying filterset '{filterset_name}' to {len(posts)} posts")

        # Check if we have cached filterset results
        if use_cache and self.config.is_cache_enabled():
            filterset_version = self.config.get_filterset_version(filterset_name)
            cached_results = self.cache.get_filterset_results(
                filterset_name,
                filterset_version,
                self.config.get_cache_ttl_hours()
            )

            if cached_results:
                # Filter posts using cached results
                filtered_posts = []
                for post in posts:
                    post_uuid = post.get('uuid')
                    if post_uuid in cached_results:
                        result = cached_results[post_uuid]
                        if result.passed:
                            # Add filter metadata to post
                            post['_filter_score'] = result.score
                            post['_filter_categories'] = result.categories
                            post['_filter_tags'] = result.tags
                            filtered_posts.append(post)

                logger.info(f"Cache hit: {len(filtered_posts)}/{len(posts)} posts passed filter")
                return filtered_posts

        # Cache miss or disabled - process posts through pipeline
        results = self.process_batch(posts, filterset_name)

        # Save to filterset cache
        if self.config.is_cache_enabled():
            filterset_version = self.config.get_filterset_version(filterset_name)
            results_dict = {r.post_uuid: r for r in results}
            self.cache.set_filterset_results(filterset_name, filterset_version, results_dict)

        # Build filtered post list
        filtered_posts = []
        results_by_uuid = {r.post_uuid: r for r in results}

        for post in posts:
            post_uuid = post.get('uuid')
            result = results_by_uuid.get(post_uuid)

            if result and result.passed:
                # Add filter metadata to post
                post['_filter_score'] = result.score
                post['_filter_categories'] = result.categories
                post['_filter_tags'] = result.tags
                filtered_posts.append(post)

        logger.info(f"Processed: {len(filtered_posts)}/{len(posts)} posts passed filter")
        return filtered_posts
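    # Illustrative usage from a feed-loading path such as app.py (a sketch only;
    # the post dicts and the 'safe_content' filterset name are assumptions, while
    # the '_filter_*' keys are the ones added above):
    #
    #     engine = FilterEngine.get_instance()
    #     visible = engine.apply_filterset(posts, filterset_name='safe_content')
    #     for post in visible:
    #         print(post['uuid'], post['_filter_score'], post['_filter_categories'])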
    def process_batch(
        self,
        posts: List[Dict[str, Any]],
        filterset_name: str = 'no_filter'
    ) -> List[FilterResult]:
        """
        Process batch of posts through pipeline.

        Args:
            posts: List of post dictionaries
            filterset_name: Name of filterset to apply

        Returns:
            List of FilterResults, one per post
        """
        if not posts:
            return []

        # Special case: no_filter passes everything with default scores
        if filterset_name == 'no_filter':
            return self._process_no_filter(posts)

        # Initialize stages if needed
        if self.config.is_ai_enabled():
            self._init_stages()

        # If AI is disabled but filterset requires it, fall back to no_filter
        if not self.config.is_ai_enabled() and filterset_name != 'no_filter':
            logger.warning(f"AI disabled but '{filterset_name}' requires AI - falling back to 'no_filter'")
            return self._process_no_filter(posts)

        # Get pipeline stages for this filterset
        stage_names = self._get_stages_for_filterset(filterset_name)

        # Process posts (parallel or sequential based on config)
        if self.config.is_parallel_enabled():
            results = self._process_batch_parallel(posts, filterset_name, stage_names)
        else:
            results = self._process_batch_sequential(posts, filterset_name, stage_names)

        return results
    def _process_no_filter(self, posts: List[Dict[str, Any]]) -> List[FilterResult]:
        """Process posts with no_filter (all pass with default scores)"""
        results = []
        for post in posts:
            result = FilterResult(
                post_uuid=post.get('uuid', ''),
                passed=True,
                score=0.5,  # Neutral score
                categories=[],
                tags=[],
                filterset_name='no_filter',
                processed_at=datetime.now(),
                status=ProcessingStatus.COMPLETED
            )
            results.append(result)

        return results

    def _get_stages_for_filterset(self, filterset_name: str) -> List[str]:
        """Get pipeline stages to run for a filterset"""
        filterset = self.config.get_filterset(filterset_name)

        # Check if filterset specifies custom stages
        if filterset and 'pipeline_stages' in filterset:
            return filterset['pipeline_stages']

        # Use default stages
        return self.config.get_default_stages()
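    # For orientation, a filterset entry in filtersets.json might look like the
    # sketch below. Only 'pipeline_stages' (read here) and 'description' (read by
    # get_filterset_description) appear in this module; the entry name, the stage
    # list, and any other keys are assumptions rather than the actual schema:
    #
    #     "safe_content": {
    #         "description": "Hide flagged or low-quality posts",
    #         "pipeline_stages": ["categorizer", "moderator", "filter", "ranker"]
    #     }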
    def _process_batch_parallel(
        self,
        posts: List[Dict[str, Any]],
        filterset_name: str,
        stage_names: List[str]
    ) -> List[FilterResult]:
        """Process posts in parallel"""
        results = [None] * len(posts)
        workers = self.config.get_parallel_workers()

        def process_single_post(idx_post):
            idx, post = idx_post
            try:
                result = self._process_single_post(post, filterset_name, stage_names)
                return idx, result
            except Exception as e:
                logger.error(f"Error processing post {idx}: {e}")
                logger.error(traceback.format_exc())
                # Return failed result
                return idx, FilterResult(
                    post_uuid=post.get('uuid', ''),
                    passed=False,
                    score=0.0,
                    filterset_name=filterset_name,
                    processed_at=datetime.now(),
                    status=ProcessingStatus.FAILED,
                    error=str(e)
                )

        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = {executor.submit(process_single_post, (i, post)): i
                       for i, post in enumerate(posts)}

            for future in as_completed(futures):
                idx, result = future.result()
                results[idx] = result

        return results

    def _process_batch_sequential(
        self,
        posts: List[Dict[str, Any]],
        filterset_name: str,
        stage_names: List[str]
    ) -> List[FilterResult]:
        """Process posts sequentially"""
        results = []
        for post in posts:
            try:
                result = self._process_single_post(post, filterset_name, stage_names)
                results.append(result)
            except Exception as e:
                logger.error(f"Error processing post: {e}")
                results.append(FilterResult(
                    post_uuid=post.get('uuid', ''),
                    passed=False,
                    score=0.0,
                    filterset_name=filterset_name,
                    processed_at=datetime.now(),
                    status=ProcessingStatus.FAILED,
                    error=str(e)
                ))

        return results
    def _process_single_post(
        self,
        post: Dict[str, Any],
        filterset_name: str,
        stage_names: List[str]
    ) -> FilterResult:
        """
        Process single post through pipeline stages.

        Stages are run sequentially: Categorizer → Moderator → Filter → Ranker
        """
        # Initialize result
        result = FilterResult(
            post_uuid=post.get('uuid', ''),
            passed=True,  # Start as passed, stages can reject
            score=0.5,  # Default score
            filterset_name=filterset_name,
            processed_at=datetime.now(),
            status=ProcessingStatus.PROCESSING
        )

        # Run each stage
        for stage_name in stage_names:
            if stage_name not in self._stages:
                logger.warning(f"Stage '{stage_name}' not found, skipping")
                continue

            stage = self._stages[stage_name]

            if not stage.is_enabled():
                logger.debug(f"Stage '{stage_name}' disabled, skipping")
                continue

            # Process through stage
            try:
                result = stage.process(post, result)

                # If post was rejected by this stage, stop processing
                if not result.passed:
                    logger.debug(f"Post {post.get('uuid', '')} rejected by {stage_name}")
                    break

            except Exception as e:
                logger.error(f"Error in stage '{stage_name}': {e}")
                result.status = ProcessingStatus.FAILED
                result.error = f"{stage_name}: {str(e)}"
                result.passed = False
                break

        # Mark as completed if not failed
        if result.status != ProcessingStatus.FAILED:
            result.status = ProcessingStatus.COMPLETED

        return result
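    # The loop above only assumes each stage object exposes is_enabled() and
    # process(post, result) returning a FilterResult. A minimal custom stage
    # following that contract might look like the sketch below; KeywordBlockStage
    # and the 'text' post key are hypothetical, and BaseStage's real constructor
    # and signatures may differ:
    #
    #     class KeywordBlockStage(BaseStage):
    #         def is_enabled(self) -> bool:
    #             return True
    #
    #         def process(self, post, result):
    #             if 'spam' in (post.get('text') or '').lower():
    #                 result.passed = False
    #             return result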
    # ===== Utility Methods =====

    def get_available_filtersets(self) -> List[str]:
        """Get list of available filterset names (for user settings UI)"""
        return self.config.get_filterset_names()

    def get_filterset_description(self, name: str) -> Optional[str]:
        """Get description of a filterset (for user settings UI)"""
        filterset = self.config.get_filterset(name)
        return filterset.get('description') if filterset else None

    def invalidate_filterset_cache(self, filterset_name: str):
        """Invalidate cache for a filterset (when definition changes)"""
        self.cache.invalidate_filterset(filterset_name)
        logger.info(f"Invalidated cache for filterset '{filterset_name}'")

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        return self.cache.get_cache_stats()

    def reload_config(self):
        """Reload configuration from disk"""
        self.config.reload()
        self._stages = None  # Force re-initialization of stages
        logger.info("Configuration reloaded")
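    # Illustrative maintenance flow when a filterset definition changes on disk
    # (method names are the ones defined above; the ordering and the
    # 'safe_content' name are assumptions):
    #
    #     engine = FilterEngine.get_instance()
    #     engine.reload_config()
    #     engine.invalidate_filterset_cache('safe_content')
    #     stats = engine.get_cache_stats()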