chelsea 94e12041ec Add filter pipeline core infrastructure (Phase 1)
Implements a plugin-based content filtering system with multi-level caching:

Core Components:
- FilterEngine: Main orchestrator for content filtering
- FilterCache: 3-level caching (memory, AI results, filterset results)
- FilterConfig: Configuration loader for filter_config.json & filtersets.json
- FilterResult & AIAnalysisResult: Data models for filter results

Architecture:
- BaseStage: Abstract class for pipeline stages
- BaseFilterPlugin: Abstract class for filter plugins
- Multi-threaded parallel processing support
- Content-hash-based AI result caching for cost savings (see the sketch after this list)
- Filterset result caching (fast filterset switching)
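
A minimal sketch of the content-hash keying, assuming the cache hashes normalized post text with SHA-256 (the real scheme lives in FilterCache and may differ):

    import hashlib

    def content_cache_key(text: str) -> str:
        # Hypothetical helper: normalize whitespace so trivial edits
        # don't invalidate cached AI results.
        normalized = ' '.join(text.split())
        return hashlib.sha256(normalized.encode('utf-8')).hexdigest()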

Configuration:
- filter_config.json: AI models, caching, parallel workers
- Using only Llama 70B for cost efficiency
- Compatible with existing filtersets.json

Integration:
- apply_filterset() API compatible with user preferences (usage sketch below)
- process_batch() for batch post processing
- Lazy-loaded stages to avoid import errors when AI is disabled
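
Illustrative usage only; the import paths, constructor, and post shape below are assumptions, not the committed API:

    from filter_pipeline.engine import FilterEngine    # hypothetical import path
    from filter_pipeline.config import FilterConfig    # hypothetical import path

    posts = [{'uuid': 'abc-123', 'text': '...'}]       # assumed post shape
    engine = FilterEngine(FilterConfig())              # assumed constructor
    results = engine.apply_filterset('my_filterset', posts)
    for r in results:
        print(r.post_uuid, r.passed, r.score)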

Related to issue #8 (filtering engine implementation)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-11 22:46:10 -05:00


"""
Filter Pipeline Models
Data models for filter results and processing status.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime
from enum import Enum
class ProcessingStatus(Enum):
"""Status of content processing"""
PENDING = 'pending'
PROCESSING = 'processing'
COMPLETED = 'completed'
FAILED = 'failed'
CACHED = 'cached'


@dataclass
class FilterResult:
    """
    Result of the filtering pipeline for a single post.

    Attributes:
        post_uuid: Unique identifier for the post
        passed: Whether the post passed the filter
        score: Relevance/quality score (0.0-1.0)
        categories: Detected categories/topics
        tags: Additional tags applied
        moderation_data: Safety and quality analysis results
        filterset_name: Name of the filterset applied
        cache_key: Content hash for caching
        processed_at: Timestamp of processing
        status: Processing status
        error: Error message if failed
    """
    post_uuid: str
    passed: bool
    score: float
    categories: List[str] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)
    moderation_data: Dict[str, Any] = field(default_factory=dict)
    filterset_name: str = 'no_filter'
    cache_key: Optional[str] = None
    processed_at: Optional[datetime] = None
    status: ProcessingStatus = ProcessingStatus.PENDING
    error: Optional[str] = None
    # Detailed scoring breakdown
    score_breakdown: Dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> Dict:
        """Convert to a dictionary for JSON serialization."""
        return {
            'post_uuid': self.post_uuid,
            'passed': self.passed,
            'score': self.score,
            'categories': self.categories,
            'tags': self.tags,
            'moderation_data': self.moderation_data,
            'filterset_name': self.filterset_name,
            'cache_key': self.cache_key,
            'processed_at': self.processed_at.isoformat() if self.processed_at else None,
            'status': self.status.value if isinstance(self.status, ProcessingStatus) else self.status,
            'error': self.error,
            'score_breakdown': self.score_breakdown,
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'FilterResult':
        """Create an instance from a dictionary."""
        data = dict(data)  # copy so the caller's dict is not mutated
        # Handle datetime deserialization
        if data.get('processed_at') and isinstance(data['processed_at'], str):
            data['processed_at'] = datetime.fromisoformat(data['processed_at'])
        # Handle enum deserialization
        if data.get('status') and isinstance(data['status'], str):
            data['status'] = ProcessingStatus(data['status'])
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
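

# Illustrative round-trip for FilterResult (values are hypothetical):
#     result = FilterResult(post_uuid='abc-123', passed=True, score=0.82,
#                           status=ProcessingStatus.COMPLETED)
#     restored = FilterResult.from_dict(result.to_dict())
#     assert restored.status is ProcessingStatus.COMPLETED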


@dataclass
class AIAnalysisResult:
    """
    Result of AI analysis (categorization, moderation, etc.).

    Cached separately from FilterResult so it can be reused across filtersets.
    """
    content_hash: str
    categories: List[str] = field(default_factory=list)
    category_scores: Dict[str, float] = field(default_factory=dict)
    moderation: Dict[str, Any] = field(default_factory=dict)
    quality_score: float = 0.5
    sentiment: Optional[str] = None
    sentiment_score: float = 0.0
    analyzed_at: Optional[datetime] = None
    model_used: Optional[str] = None

    def to_dict(self) -> Dict:
        """Convert to a dictionary for JSON serialization."""
        return {
            'content_hash': self.content_hash,
            'categories': self.categories,
            'category_scores': self.category_scores,
            'moderation': self.moderation,
            'quality_score': self.quality_score,
            'sentiment': self.sentiment,
            'sentiment_score': self.sentiment_score,
            'analyzed_at': self.analyzed_at.isoformat() if self.analyzed_at else None,
            'model_used': self.model_used,
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'AIAnalysisResult':
        """Create an instance from a dictionary."""
        data = dict(data)  # copy so the caller's dict is not mutated
        if data.get('analyzed_at') and isinstance(data['analyzed_at'], str):
            data['analyzed_at'] = datetime.fromisoformat(data['analyzed_at'])
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
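

if __name__ == '__main__':
    # Minimal smoke test of the serialization round-trip.
    # Illustrative only: the values below are hypothetical, not from the repo.
    ai = AIAnalysisResult(
        content_hash='0' * 64,            # stand-in for a real SHA-256 digest
        categories=['tech'],
        category_scores={'tech': 0.9},
        quality_score=0.8,
        analyzed_at=datetime.now(),
        model_used='llama-70b',           # single model named in the commit message
    )
    restored = AIAnalysisResult.from_dict(ai.to_dict())
    assert restored.content_hash == ai.content_hash
    assert restored.analyzed_at == ai.analyzed_at
    print('round-trip OK for', restored.model_used)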