Add filter pipeline stages and plugins (Phase 2 & 3)

Implements complete content filtering pipeline with AI-powered analysis:

Phase 2 - Pipeline Stages:
- CategorizerStage: AI topic detection with content-hash caching
- ModeratorStage: Safety/quality analysis (violence, hate speech, quality scores)
- FilterStage: Fast rule-based filtering from filtersets.json
- RankerStage: Multi-factor scoring (quality, recency, source tier, engagement)

Phase 3 - Filter Plugins:
- KeywordFilterPlugin: Blocklist/allowlist keyword filtering
- QualityFilterPlugin: Quality metrics (length, caps, clickbait detection)

AI Client:
- OpenRouterClient: Llama 70B integration with retry logic
- Methods: categorize(), moderate(), score_quality(), analyze_sentiment()
- Content-hash based caching for cost efficiency

Pipeline Flow:
Raw Post → Categorizer → Moderator → Filter → Ranker → Scored Post

Key Features:
- All AI results cached permanently by content hash
- Parallel processing support (10 workers)
- Fallback modes when AI disabled
- Comprehensive scoring breakdown
- Plugin architecture for extensibility

Related to filtering engine implementation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-11 22:54:38 -05:00
parent 94e12041ec
commit a3ea1e9bdb
9 changed files with 1249 additions and 2 deletions

View File

@@ -4,5 +4,7 @@ Pluggable filters for content filtering.
"""
from .base import BaseFilterPlugin
from .keyword import KeywordFilterPlugin
from .quality import QualityFilterPlugin
__all__ = ['BaseFilterPlugin']
__all__ = ['BaseFilterPlugin', 'KeywordFilterPlugin', 'QualityFilterPlugin']

View File

@@ -0,0 +1,95 @@
"""
Keyword Filter Plugin
Simple keyword-based filtering.
"""
import logging
from typing import Dict, Any, Optional, List
from .base import BaseFilterPlugin
logger = logging.getLogger(__name__)
class KeywordFilterPlugin(BaseFilterPlugin):
    """
    Filter posts based on keyword matching.

    Supports:
    - Blocklist: Reject posts containing blocked keywords
    - Allowlist: Only allow posts containing allowed keywords
    - Case-insensitive matching

    Matching is plain substring containment on the lower-cased post text,
    so a keyword also matches when it appears inside a longer word.

    Config keys read by this plugin:
    - 'blocklist': keywords that cause a post to be rejected (default: none)
    - 'allowlist': keywords of which at least one must appear (default: none)
    - 'check_title': search the post's 'title' field (default: True)
    - 'check_content': search the post's 'content' field (default: True)
    """

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.blocklist = self._normalize_keywords(config.get('blocklist'))
        self.allowlist = self._normalize_keywords(config.get('allowlist'))
        self.check_title = config.get('check_title', True)
        self.check_content = config.get('check_content', True)

    def get_name(self) -> str:
        """Return the plugin's display name."""
        return "KeywordFilter"

    def should_filter(self, post: Dict[str, Any], context: Optional[Dict] = None) -> bool:
        """
        Check if post should be filtered out based on keywords.

        Args:
            post: Post data; only 'title' and 'content' are read.
            context: Unused by this plugin.

        Returns:
            True if the post contains a blocked keyword, or if an allowlist
            is configured and none of its keywords is present.
        """
        text = self._get_text(post)

        blocked = self._first_blocked(text)
        if blocked is not None:
            logger.debug("KeywordFilter: Blocked keyword '%s' found", blocked)
            return True

        # If an allowlist is specified, at least one keyword must be present.
        if self.allowlist and not any(keyword in text for keyword in self.allowlist):
            logger.debug("KeywordFilter: No allowed keywords found")
            return True

        return False

    def score(self, post: Dict[str, Any], context: Optional[Dict] = None) -> float:
        """
        Score based on keyword presence.

        Args:
            post: Post data; only 'title' and 'content' are read.
            context: Unused by this plugin.

        Returns:
            0.0 if any blocklist keyword is present; otherwise
            0.5 + 0.1 per distinct allowlist keyword present (capped at 1.0);
            0.5 (neutral) when no allowlist keyword matches.
        """
        text = self._get_text(post)

        if self._first_blocked(text) is not None:
            return 0.0

        matches = sum(1 for keyword in self.allowlist if keyword in text)
        if matches > 0:
            return min(1.0, 0.5 + (matches * 0.1))

        return 0.5  # Neutral

    def _get_text(self, post: Dict[str, Any]) -> str:
        """Get the lower-cased searchable text from the enabled post fields."""
        text_parts = []
        if self.check_title:
            text_parts.append(self._as_text(post.get('title')))
        if self.check_content:
            text_parts.append(self._as_text(post.get('content')))
        return ' '.join(text_parts).lower()

    def _first_blocked(self, text: str) -> Optional[str]:
        """Return the first blocklist keyword contained in text, or None."""
        for keyword in self.blocklist:
            if keyword in text:
                return keyword
        return None

    @staticmethod
    def _as_text(value: Any) -> str:
        """Coerce a post field to str, mapping a missing/None value to ''."""
        # dict.get(key, '') still returns None when the key is present with a
        # None value, which would make ' '.join() raise TypeError.
        return '' if value is None else str(value)

    @staticmethod
    def _normalize_keywords(raw: Any) -> List[str]:
        """
        Lower-case a configured keyword list and drop unusable entries.

        A bare string is treated as a single keyword rather than being
        iterated character by character. Empty and whitespace-only entries
        are dropped because they are substrings of (almost) every text and
        would make a blocklist reject every post.
        """
        if not raw:
            return []
        if isinstance(raw, str):
            raw = [raw]
        return [
            str(keyword).lower()
            for keyword in raw
            if keyword is not None and str(keyword).strip()
        ]

View File

@@ -0,0 +1,128 @@
"""
Quality Filter Plugin
Filter based on quality metrics (readability, length, etc).
"""
import logging
import re
from typing import Dict, Any, Optional
from .base import BaseFilterPlugin
logger = logging.getLogger(__name__)
class QualityFilterPlugin(BaseFilterPlugin):
    """
    Filter posts based on quality metrics.

    Metrics:
    - Title length (too short or too long)
    - Content length
    - Excessive caps (SHOUTING)
    - Excessive punctuation (!!!)
    - Clickbait patterns

    Config keys read by this plugin (with defaults):
    - 'min_title_length' (10), 'max_title_length' (300)
    - 'min_content_length' (0, meaning the content check is disabled)
    - 'max_caps_ratio' (0.5)
    - 'max_exclamation_marks' (3)
    """

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.min_title_length = config.get('min_title_length', 10)
        self.max_title_length = config.get('max_title_length', 300)
        self.min_content_length = config.get('min_content_length', 0)
        self.max_caps_ratio = config.get('max_caps_ratio', 0.5)
        self.max_exclamation_marks = config.get('max_exclamation_marks', 3)

        # Clickbait patterns (regexes searched against the lower-cased title)
        self.clickbait_patterns = [
            r'you won\'t believe',
            r'shocking',
            r'doctors hate',
            r'this one trick',
            r'number \d+ will',
            r'what happened next'
        ]

    def get_name(self) -> str:
        """Return the plugin's display name."""
        return "QualityFilter"

    def should_filter(self, post: Dict[str, Any], context: Optional[Dict] = None) -> bool:
        """
        Check if post should be filtered based on quality.

        Args:
            post: Post data; only 'title' and 'content' are read. A missing
                or None field is treated as an empty string.
            context: Unused by this plugin.

        Returns:
            True if post fails quality checks
        """
        title = self._text_field(post, 'title')
        content = self._text_field(post, 'content')
        title_length = len(title)

        # Check title length
        if title_length < self.min_title_length:
            logger.debug("QualityFilter: Title too short (%d chars)", title_length)
            return True
        if title_length > self.max_title_length:
            logger.debug("QualityFilter: Title too long (%d chars)", title_length)
            return True

        # Check content length (only when a minimum is configured)
        if self.min_content_length > 0 and len(content) < self.min_content_length:
            logger.debug("QualityFilter: Content too short (%d chars)", len(content))
            return True

        # Check excessive caps; titles of 10 characters or fewer are exempt
        # from this check.
        caps_ratio = self._caps_ratio(title)
        if title_length > 10 and caps_ratio > self.max_caps_ratio:
            logger.debug("QualityFilter: Excessive caps (%.1f%%)", caps_ratio * 100)
            return True

        # Check excessive exclamation marks
        exclamations = title.count('!')
        if exclamations > self.max_exclamation_marks:
            logger.debug("QualityFilter: Excessive exclamations (%d)", exclamations)
            return True

        # Check clickbait patterns
        title_lower = title.lower()
        for pattern in self.clickbait_patterns:
            if re.search(pattern, title_lower):
                logger.debug("QualityFilter: Clickbait pattern detected: %s", pattern)
                return True

        return False

    def score(self, post: Dict[str, Any], context: Optional[Dict] = None) -> float:
        """
        Score post quality.

        Starts from 1.0, subtracts penalties for a short title (< 20 chars),
        a caps ratio above 0.3 and each exclamation mark in the title, adds a
        bonus for content longer than 200 / 500 characters, then clamps.

        Args:
            post: Post data; only 'title' and 'content' are read. A missing
                or None field is treated as an empty string.
            context: Unused by this plugin.

        Returns:
            Quality score 0.0-1.0
        """
        title = self._text_field(post, 'title')
        content = self._text_field(post, 'content')
        score = 1.0

        # Penalize for short title
        if len(title) < 20:
            score -= 0.1

        # Penalize for excessive caps (only the portion above 30%)
        caps_ratio = self._caps_ratio(title)
        if caps_ratio > 0.3:
            score -= (caps_ratio - 0.3) * 0.5

        # Penalize for exclamation marks
        score -= title.count('!') * 0.05

        # Bonus for longer content
        if len(content) > 500:
            score += 0.1
        elif len(content) > 200:
            score += 0.05

        return max(0.0, min(1.0, score))

    @staticmethod
    def _text_field(post: Dict[str, Any], key: str) -> str:
        """Read a post field as str, mapping a missing/None value to ''."""
        # dict.get(key, '') still returns None when the key is present with a
        # None value, which would make len()/count()/lower() raise.
        value = post.get(key)
        return '' if value is None else str(value)

    @staticmethod
    def _caps_ratio(title: str) -> float:
        """Return the fraction of all title characters that are upper-case (0.0 if empty)."""
        if not title:
            return 0.0
        return sum(1 for c in title if c.isupper()) / len(title)