Implements plugin-based content filtering system with multi-level caching.

Core Components:
- FilterEngine: Main orchestrator for content filtering
- FilterCache: 3-level caching (memory, AI results, filterset results)
- FilterConfig: Configuration loader for filter_config.json & filtersets.json
- FilterResult & AIAnalysisResult: Data models for filter results

Architecture:
- BaseStage: Abstract class for pipeline stages
- BaseFilterPlugin: Abstract class for filter plugins
- Multi-threaded parallel processing support
- Content-hash based AI result caching (cost savings)
- Filterset result caching (fast filterset switching)

Configuration:
- filter_config.json: AI models, caching, parallel workers
- Uses only Llama 70B for cost efficiency
- Compatible with existing filtersets.json

Integration:
- apply_filterset() API compatible with user preferences
- process_batch() for batch post processing
- Lazy-loaded stages to avoid import errors when AI is disabled

Related to issue #8 (filtering engine implementation)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
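The module below defines the FilterResult, AIAnalysisResult, and ProcessingStatus models from that design. As a minimal sketch of the intended call pattern (the import path, constructor, and exact signatures are assumptions inferred from the commit message, not from this file):

    from filter_pipeline.engine import FilterEngine  # import path assumed

    engine = FilterEngine(config_path='filter_config.json')

    # Filter a single post against a named filterset; returns a FilterResult
    result = engine.apply_filterset(post, filterset_name='default')

    # Filter many posts at once; returns FilterResult objects
    results = engine.process_batch(posts, filterset_name='default')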
"""
|
|
Filter Pipeline Models
|
|
Data models for filter results and processing status.
|
|
"""
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Dict, List, Optional, Any
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
|
|
|
|
class ProcessingStatus(Enum):
|
|
"""Status of content processing"""
|
|
PENDING = 'pending'
|
|
PROCESSING = 'processing'
|
|
COMPLETED = 'completed'
|
|
FAILED = 'failed'
|
|
CACHED = 'cached'


@dataclass
class FilterResult:
    """
    Result of the filtering pipeline for a single post.

    Attributes:
        post_uuid: Unique identifier for the post
        passed: Whether the post passed the filter
        score: Relevance/quality score (0.0-1.0)
        categories: Detected categories/topics
        tags: Additional tags applied
        moderation_data: Safety and quality analysis results
        filterset_name: Name of the filterset applied
        cache_key: Content hash used for caching
        processed_at: Timestamp of processing
        status: Processing status
        error: Error message if processing failed
    """
    post_uuid: str
    passed: bool
    score: float
    categories: List[str] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)
    moderation_data: Dict[str, Any] = field(default_factory=dict)
    filterset_name: str = 'no_filter'
    cache_key: Optional[str] = None
    processed_at: Optional[datetime] = None
    status: ProcessingStatus = ProcessingStatus.PENDING
    error: Optional[str] = None

    # Detailed scoring breakdown
    score_breakdown: Dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> Dict:
        """Convert to a dictionary for JSON serialization"""
        return {
            'post_uuid': self.post_uuid,
            'passed': self.passed,
            'score': self.score,
            'categories': self.categories,
            'tags': self.tags,
            'moderation_data': self.moderation_data,
            'filterset_name': self.filterset_name,
            'cache_key': self.cache_key,
            'processed_at': self.processed_at.isoformat() if self.processed_at else None,
            'status': self.status.value if isinstance(self.status, ProcessingStatus) else self.status,
            'error': self.error,
            'score_breakdown': self.score_breakdown,
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'FilterResult':
        """Create an instance from a dictionary"""
        # Copy first so deserialization does not mutate the caller's dict
        data = dict(data)

        # Handle datetime deserialization
        if data.get('processed_at') and isinstance(data['processed_at'], str):
            data['processed_at'] = datetime.fromisoformat(data['processed_at'])

        # Handle enum deserialization
        if data.get('status') and isinstance(data['status'], str):
            data['status'] = ProcessingStatus(data['status'])

        # Drop unknown keys so payloads from other versions still load
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
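
    # Illustrative round trip (a sketch; the UUID and scores below are made up):
    #
    #   import json
    #
    #   result = FilterResult(
    #       post_uuid='123e4567-e89b-12d3-a456-426614174000',
    #       passed=True,
    #       score=0.87,
    #       categories=['tech'],
    #       status=ProcessingStatus.COMPLETED,
    #       processed_at=datetime.now(),
    #   )
    #   payload = json.dumps(result.to_dict())   # enum/datetime already stringified
    #   restored = FilterResult.from_dict(json.loads(payload))
    #   assert restored.status is ProcessingStatus.COMPLETED
    #   assert isinstance(restored.processed_at, datetime)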


@dataclass
class AIAnalysisResult:
    """
    Result of AI analysis (categorization, moderation, etc.).
    Cached separately from FilterResult so it can be reused across filtersets.
    """
    content_hash: str
    categories: List[str] = field(default_factory=list)
    category_scores: Dict[str, float] = field(default_factory=dict)
    moderation: Dict[str, Any] = field(default_factory=dict)
    quality_score: float = 0.5
    sentiment: Optional[str] = None
    sentiment_score: float = 0.0
    analyzed_at: Optional[datetime] = None
    model_used: Optional[str] = None

    def to_dict(self) -> Dict:
        """Convert to a dictionary for JSON serialization"""
        return {
            'content_hash': self.content_hash,
            'categories': self.categories,
            'category_scores': self.category_scores,
            'moderation': self.moderation,
            'quality_score': self.quality_score,
            'sentiment': self.sentiment,
            'sentiment_score': self.sentiment_score,
            'analyzed_at': self.analyzed_at.isoformat() if self.analyzed_at else None,
            'model_used': self.model_used,
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'AIAnalysisResult':
        """Create an instance from a dictionary"""
        # Copy first so deserialization does not mutate the caller's dict
        data = dict(data)

        if data.get('analyzed_at') and isinstance(data['analyzed_at'], str):
            data['analyzed_at'] = datetime.fromisoformat(data['analyzed_at'])

        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
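
    # Illustrative caching pattern (a sketch; the hashing scheme, cache object,
    # and analysis call are assumptions based on the commit message, not this
    # module):
    #
    #   import hashlib
    #
    #   content_hash = hashlib.sha256(post_text.encode('utf-8')).hexdigest()
    #   cached = ai_cache.get(content_hash)        # hypothetical FilterCache lookup
    #   if cached is None:
    #       analysis = run_ai_analysis(post_text)  # hypothetical model call
    #       ai_cache.set(content_hash, analysis.to_dict())
    #   else:
    #       analysis = AIAnalysisResult.from_dict(cached)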