Compare commits


2 Commits

Author SHA1 Message Date
8c1e055a05 Integrate FilterEngine with app.py (Phase 4)
Complete integration of filter pipeline with web application:

App.py Integration:
- Initialize FilterEngine singleton at startup
- Update /api/posts endpoint to use FilterEngine.apply_filterset()
- Apply user's filterset preference from settings
- Sort posts by filter_score (highest first), then timestamp
- Add filter metadata to post responses (filter_score, categories, tags)

Settings Page Updates:
- Dynamically load available filtersets from FilterEngine
- Show filterset descriptions in settings UI
- Validate filterset selection against FilterEngine

Security:
- Update _is_safe_filterset() to use FilterEngine's list
- Dynamic ALLOWED_FILTERSETS from filtersets.json

User Experience:
- Posts automatically filtered based on user preferences
- Quality/relevance scores affect post ordering
- Transparent filter metadata available in API

Caching:
- FilterEngine uses 3-level cache for efficiency
- Cache reused across page loads (5min TTL)
- AI results cached permanently
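
The three cache levels themselves are not shown in this diff; as a minimal sketch of the behavior described above (names here are illustrative, not the repo's FilterCache API), a TTL tier plus a permanent AI tier could look like:

import time

class LayeredCacheSketch:
    """Illustrative only: permanent store for AI results, TTL store for page loads."""

    def __init__(self, ttl_seconds=300):  # 5-minute TTL, as described above
        self.permanent = {}    # AI analyses, keyed by content hash; never expire
        self.short_lived = {}  # key -> (value, expiry); reused across page loads
        self.ttl = ttl_seconds

    def get(self, key):
        if key in self.permanent:
            return self.permanent[key]
        entry = self.short_lived.get(key)
        if entry and entry[1] > time.monotonic():
            return entry[0]
        return None

    def set(self, key, value, permanent=False):
        if permanent:
            self.permanent[key] = value  # AI results cached permanently
        else:
            self.short_lived[key] = (value, time.monotonic() + self.ttl)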

Next Steps:
- Polling service integration
- Database model for persistent results
- UI for cache stats and filter debugging

Related to filtering engine implementation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-11 22:57:18 -05:00
a3ea1e9bdb Add filter pipeline stages and plugins (Phase 2 & 3)
Implements complete content filtering pipeline with AI-powered analysis:

Phase 2 - Pipeline Stages:
- CategorizerStage: AI topic detection with content-hash caching
- ModeratorStage: Safety/quality analysis (violence, hate speech, quality scores)
- FilterStage: Fast rule-based filtering from filtersets.json
- RankerStage: Multi-factor scoring (quality, recency, source tier, engagement)

Phase 3 - Filter Plugins:
- KeywordFilterPlugin: Blocklist/allowlist keyword filtering
- QualityFilterPlugin: Quality metrics (length, caps, clickbait detection)

AI Client:
- OpenRouterClient: Llama 70B integration with retry logic
- Methods: categorize(), moderate(), score_quality(), analyze_sentiment()
- Content-hash based caching for cost efficiency

Pipeline Flow:
Raw Post → Categorizer → Moderator → Filter → Ranker → Scored Post
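
One plausible wiring of that flow (illustrative only; the real stages take a post plus a FilterResult as the diffs below show, and whether rejected posts skip later stages is an assumption here):

def run_pipeline(post, stages):
    # stages: callables applied in order (Categorizer -> Moderator -> Filter -> Ranker)
    result = {'post': post, 'passed': True, 'score': 0.0}
    for stage in stages:
        result = stage(result)
        if not result['passed']:  # assumed: a rejected post skips the remaining stages
            break
    return result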

Key Features:
- All AI results cached permanently by content hash
- Parallel processing support (10 workers; see the sketch after this list)
- Fallback modes when AI disabled
- Comprehensive scoring breakdown
- Plugin architecture for extensibility
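
A hedged sketch of the "10 workers" parallel mode using only the standard library (the repo's actual worker code is not part of this diff):

from concurrent.futures import ThreadPoolExecutor

def process_posts_parallel(posts, process_one, max_workers=10):
    # Fan posts out to a worker pool; results come back in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process_one, posts))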

Related to filtering engine implementation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-11 22:54:38 -05:00
10 changed files with 1305 additions and 31 deletions

app.py

@@ -51,7 +51,7 @@ app.config['AUTH0_CLIENT_SECRET'] = os.getenv('AUTH0_CLIENT_SECRET', '')
app.config['AUTH0_AUDIENCE'] = os.getenv('AUTH0_AUDIENCE', '')
# Configuration constants
-ALLOWED_FILTERSETS = {'no_filter', 'safe_content'}
+# Note: ALLOWED_FILTERSETS will be dynamically loaded from filter_engine
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
UPLOAD_FOLDER = 'static/avatars'
MAX_FILENAME_LENGTH = 100
@@ -82,6 +82,11 @@ from polling_service import polling_service
polling_service.init_app(app)
polling_service.start()
+# Initialize filter engine
+from filter_pipeline import FilterEngine
+filter_engine = FilterEngine.get_instance()
+logger.info(f"FilterEngine initialized with {len(filter_engine.get_available_filtersets())} filtersets")
# Initialize OAuth for Auth0
oauth = OAuth(app)
auth0 = oauth.register(
@@ -105,7 +110,9 @@ def _is_safe_filterset(filterset):
"""Validate filterset name for security"""
if not filterset or not isinstance(filterset, str):
return False
-return filterset in ALLOWED_FILTERSETS and re.match(r'^[a-zA-Z0-9_-]+$', filterset)
+# Check against available filtersets from filter_engine
+allowed_filtersets = set(filter_engine.get_available_filtersets())
+return filterset in allowed_filtersets and re.match(r'^[a-zA-Z0-9_-]+$', filterset)
def _is_safe_path(path):
"""Validate file path for security"""
@@ -348,32 +355,39 @@ def api_posts():
try:
# Load platform configuration
platform_config = load_platform_config()
# Get query parameters
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', DEFAULT_PAGE_SIZE))
community = request.args.get('community', '')
platform = request.args.get('platform', '')
search_query = request.args.get('q', '').lower().strip()
+# Get user's filterset preference (or default to no_filter)
+filterset_name = 'no_filter'
+if current_user.is_authenticated:
+try:
+user_settings = json.loads(current_user.settings) if current_user.settings else {}
+filterset_name = user_settings.get('filter_set', 'no_filter')
+except:
+filterset_name = 'no_filter'
# Use cached data for better performance
cached_posts, cached_comments = _load_posts_cache()
-posts = []
-# Process cached posts
+# Collect raw posts for filtering
+raw_posts = []
for post_uuid, post_data in cached_posts.items():
-# Apply community filter
+# Apply community filter (before filterset)
if community and post_data.get('source', '').lower() != community.lower():
continue
-# Apply platform filter
+# Apply platform filter (before filterset)
if platform and post_data.get('platform', '').lower() != platform.lower():
continue
-# Apply search filter
+# Apply search filter (before filterset)
if search_query:
# Search in title, content, author, and source
title = post_data.get('title', '').lower()
content = post_data.get('content', '').lower()
author = post_data.get('author', '').lower()
@@ -385,17 +399,25 @@ def api_posts():
search_query in source):
continue
-# Get comment count from cache
+raw_posts.append(post_data)
+# Apply filterset using FilterEngine
+filtered_posts = filter_engine.apply_filterset(raw_posts, filterset_name, use_cache=True)
+# Build response posts with metadata
+posts = []
+for post_data in filtered_posts:
+post_uuid = post_data.get('uuid')
comment_count = len(cached_comments.get(post_uuid, []))
# Get proper display name for source
source_display = get_display_name_for_source(
post_data.get('platform', ''),
post_data.get('source', ''),
platform_config
)
-# Create post object with actual title
+# Create post object with filter metadata
post = {
'id': post_uuid,
'title': post_data.get('title', 'Untitled'),
@@ -409,12 +431,16 @@ def api_posts():
'source': post_data.get('source', ''),
'source_display': source_display,
'tags': post_data.get('tags', []),
-'external_url': post_data.get('url', '')
+'external_url': post_data.get('url', ''),
+# Add filter metadata
+'filter_score': post_data.get('_filter_score', 0.5),
+'filter_categories': post_data.get('_filter_categories', []),
+'filter_tags': post_data.get('_filter_tags', [])
}
posts.append(post)
-# Sort by timestamp (newest first)
-posts.sort(key=lambda x: x['timestamp'], reverse=True)
+# Sort by filter score (highest first), then timestamp
+posts.sort(key=lambda x: (x['filter_score'], x['timestamp']), reverse=True)
# Calculate pagination
total_posts = len(posts)
@@ -1090,17 +1116,18 @@ def settings_filters():
current_filter = user_settings.get('filter_set', 'no_filter')
-# Load available filter sets
-filter_sets = {}
-try:
-with open('filtersets.json', 'r') as f:
-filter_sets = json.load(f)
-except:
-filter_sets = {}
+# Load available filter sets from FilterEngine
+filter_sets_list = []
+for filterset_name in filter_engine.get_available_filtersets():
+description = filter_engine.get_filterset_description(filterset_name)
+filter_sets_list.append({
+'name': filterset_name,
+'description': description or f'{filterset_name} filter'
+})
return render_template('settings_filters.html',
user=current_user,
-filter_sets=filter_sets,
+filter_sets=filter_sets_list,
current_filter=current_filter)
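
A hypothetical client-side read of the new metadata (the host and the JSON envelope are assumptions; the field names come from this diff):

import requests

resp = requests.get('http://localhost:5000/api/posts', params={'page': 1})
for post in resp.json().get('posts', []):
    # filter_score drives ordering; categories/tags come from the Categorizer stage
    print(post['filter_score'], post['filter_categories'], post['title'])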

ai_client.py

@@ -0,0 +1,326 @@
"""
AI Client
OpenRouter API client for content analysis (Llama 70B only).
"""
import requests
import logging
import time
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class OpenRouterClient:
"""
OpenRouter API client for AI-powered content analysis.
Uses only Llama 70B for cost efficiency.
"""
def __init__(self, api_key: str, model: str = 'meta-llama/llama-3.3-70b-instruct'):
"""
Initialize OpenRouter client.
Args:
api_key: OpenRouter API key
model: Model to use (default: Llama 70B)
"""
self.api_key = api_key
self.model = model
self.base_url = 'https://openrouter.ai/api/v1/chat/completions'
self.timeout = 60
self.max_retries = 3
self.retry_delay = 2 # seconds
def call_model(
self,
prompt: str,
max_tokens: int = 500,
temperature: float = 0.7,
system_prompt: Optional[str] = None
) -> str:
"""
Call AI model with prompt.
Args:
prompt: User prompt
max_tokens: Maximum tokens in response
temperature: Sampling temperature (0.0-1.0)
system_prompt: Optional system prompt
Returns:
Model response text
Raises:
Exception if API call fails after retries
"""
messages = []
if system_prompt:
messages.append({'role': 'system', 'content': system_prompt})
messages.append({'role': 'user', 'content': prompt})
payload = {
'model': self.model,
'messages': messages,
'max_tokens': max_tokens,
'temperature': temperature
}
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json',
'HTTP-Referer': 'https://github.com/balanceboard',
'X-Title': 'BalanceBoard Filter Pipeline'
}
# Retry loop
last_error = None
for attempt in range(self.max_retries):
try:
response = requests.post(
self.base_url,
headers=headers,
json=payload,
timeout=self.timeout
)
response.raise_for_status()
data = response.json()
# Extract response text
result = data['choices'][0]['message']['content'].strip()
logger.debug(f"AI call successful (attempt {attempt + 1})")
return result
except requests.exceptions.RequestException as e:
last_error = e
logger.warning(f"AI call failed (attempt {attempt + 1}/{self.max_retries}): {e}")
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay * (attempt + 1)) # Backoff grows linearly with each attempt
continue
# All retries failed
error_msg = f"AI call failed after {self.max_retries} attempts: {last_error}"
logger.error(error_msg)
raise Exception(error_msg)
def categorize(self, title: str, content: str, categories: list) -> Dict[str, Any]:
"""
Categorize content into predefined categories.
Args:
title: Post title
content: Post content/description
categories: List of valid category names
Returns:
Dict with 'category' and 'confidence' keys
"""
category_list = ', '.join(categories)
prompt = f"""Classify this content into ONE of these categories: {category_list}
Title: {title}
Content: {content[:500]}
Respond in this EXACT format:
CATEGORY: [category name]
CONFIDENCE: [0.0-1.0]"""
try:
response = self.call_model(prompt, max_tokens=20, temperature=0.3)
# Parse response
lines = response.strip().split('\n')
category = None
confidence = 0.5
for line in lines:
if line.startswith('CATEGORY:'):
category = line.split(':', 1)[1].strip().lower()
elif line.startswith('CONFIDENCE:'):
try:
confidence = float(line.split(':', 1)[1].strip())
except:
confidence = 0.5
# Validate category
if category not in [c.lower() for c in categories]:
category = categories[0].lower() # Default to first category
return {
'category': category,
'confidence': confidence
}
except Exception as e:
logger.error(f"Categorization failed: {e}")
return {
'category': categories[0].lower(),
'confidence': 0.0
}
def moderate(self, title: str, content: str) -> Dict[str, Any]:
"""
Perform content moderation (safety analysis).
Args:
title: Post title
content: Post content/description
Returns:
Dict with moderation flags and scores
"""
prompt = f"""Analyze this content for safety issues.
Title: {title}
Content: {content[:500]}
Respond in this EXACT format:
VIOLENCE: [0.0-1.0]
SEXUAL: [0.0-1.0]
HATE_SPEECH: [0.0-1.0]
HARASSMENT: [0.0-1.0]
IS_SAFE: [YES/NO]"""
try:
response = self.call_model(prompt, max_tokens=50, temperature=0.3)
# Parse response
moderation = {
'violence': 0.0,
'sexual_content': 0.0,
'hate_speech': 0.0,
'harassment': 0.0,
'is_safe': True
}
lines = response.strip().split('\n')
for line in lines:
if ':' not in line:
continue
key, value = line.split(':', 1)
key = key.strip().lower()
value = value.strip()
if key == 'violence':
moderation['violence'] = float(value)
elif key == 'sexual':
moderation['sexual_content'] = float(value)
elif key == 'hate_speech':
moderation['hate_speech'] = float(value)
elif key == 'harassment':
moderation['harassment'] = float(value)
elif key == 'is_safe':
moderation['is_safe'] = value.upper() == 'YES'
return moderation
except Exception as e:
logger.error(f"Moderation failed: {e}")
return {
'violence': 0.0,
'sexual_content': 0.0,
'hate_speech': 0.0,
'harassment': 0.0,
'is_safe': True
}
def score_quality(self, title: str, content: str) -> float:
"""
Score content quality (0.0-1.0).
Args:
title: Post title
content: Post content/description
Returns:
Quality score (0.0-1.0)
"""
prompt = f"""Rate this content's quality on a scale of 0.0 to 1.0.
Consider:
- Clarity and informativeness
- Proper grammar and formatting
- Lack of clickbait or sensationalism
- Factual tone
Title: {title}
Content: {content[:500]}
Respond with ONLY a number between 0.0 and 1.0 (e.g., 0.7)"""
try:
response = self.call_model(prompt, max_tokens=10, temperature=0.3)
# Extract number
score = float(response.strip())
score = max(0.0, min(1.0, score)) # Clamp to 0-1
return score
except Exception as e:
logger.error(f"Quality scoring failed: {e}")
return 0.5 # Default neutral score
def analyze_sentiment(self, title: str, content: str) -> Dict[str, Any]:
"""
Analyze sentiment of content.
Args:
title: Post title
content: Post content/description
Returns:
Dict with 'sentiment' (positive/neutral/negative) and 'score'
"""
prompt = f"""Analyze the sentiment of this content.
Title: {title}
Content: {content[:500]}
Respond in this EXACT format:
SENTIMENT: [positive/neutral/negative]
SCORE: [-1.0 to 1.0]"""
try:
response = self.call_model(prompt, max_tokens=20, temperature=0.3)
# Parse response
sentiment = 'neutral'
score = 0.0
lines = response.strip().split('\n')
for line in lines:
if ':' not in line:
continue
key, value = line.split(':', 1)
key = key.strip().lower()
value = value.strip()
if key == 'sentiment':
sentiment = value.lower()
elif key == 'score':
try:
score = float(value)
score = max(-1.0, min(1.0, score))
except:
score = 0.0
return {
'sentiment': sentiment,
'score': score
}
except Exception as e:
logger.error(f"Sentiment analysis failed: {e}")
return {
'sentiment': 'neutral',
'score': 0.0
}
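
Example use of the client defined above (the API key is a placeholder; the printed shape follows categorize()'s documented return value):

client = OpenRouterClient(api_key='sk-or-...')  # placeholder key
result = client.categorize(
    title='New JIT compiler lands in CPython',
    content='The upcoming release ships an experimental JIT...',
    categories=['technology', 'programming', 'other'],
)
print(result)  # e.g. {'category': 'programming', 'confidence': 0.9}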

__init__.py (filter plugins package)

@@ -4,5 +4,7 @@ Pluggable filters for content filtering.
"""
from .base import BaseFilterPlugin
+from .keyword import KeywordFilterPlugin
+from .quality import QualityFilterPlugin
-__all__ = ['BaseFilterPlugin']
+__all__ = ['BaseFilterPlugin', 'KeywordFilterPlugin', 'QualityFilterPlugin']

keyword.py

@@ -0,0 +1,95 @@
"""
Keyword Filter Plugin
Simple keyword-based filtering.
"""
import logging
from typing import Dict, Any, Optional, List
from .base import BaseFilterPlugin
logger = logging.getLogger(__name__)
class KeywordFilterPlugin(BaseFilterPlugin):
"""
Filter posts based on keyword matching.
Supports:
- Blocklist: Reject posts containing blocked keywords
- Allowlist: Only allow posts containing allowed keywords
- Case-insensitive matching
"""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.blocklist = [k.lower() for k in config.get('blocklist', [])]
self.allowlist = [k.lower() for k in config.get('allowlist', [])]
self.check_title = config.get('check_title', True)
self.check_content = config.get('check_content', True)
def get_name(self) -> str:
return "KeywordFilter"
def should_filter(self, post: Dict[str, Any], context: Optional[Dict] = None) -> bool:
"""
Check if post should be filtered out based on keywords.
Returns:
True if post contains blocked keywords or missing allowed keywords
"""
text = self._get_text(post)
# Check blocklist
if self.blocklist:
for keyword in self.blocklist:
if keyword in text:
logger.debug(f"KeywordFilter: Blocked keyword '{keyword}' found")
return True
# Check allowlist (if specified, at least one keyword must be present)
if self.allowlist:
found = any(keyword in text for keyword in self.allowlist)
if not found:
logger.debug("KeywordFilter: No allowed keywords found")
return True
return False
def score(self, post: Dict[str, Any], context: Optional[Dict] = None) -> float:
"""
Score based on keyword presence.
Returns:
1.0 if allowlist keywords present, 0.5 neutral, 0.0 if blocklist keywords present
"""
text = self._get_text(post)
# Check blocklist
if self.blocklist:
for keyword in self.blocklist:
if keyword in text:
return 0.0
# Check allowlist
if self.allowlist:
matches = sum(1 for keyword in self.allowlist if keyword in text)
if matches > 0:
return min(1.0, 0.5 + (matches * 0.1))
return 0.5 # Neutral
def _get_text(self, post: Dict[str, Any]) -> str:
"""Get searchable text from post"""
text_parts = []
if self.check_title:
title = post.get('title', '')
text_parts.append(title)
if self.check_content:
content = post.get('content', '')
text_parts.append(content)
return ' '.join(text_parts).lower()
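
Example configuration for the plugin above. BaseFilterPlugin is not part of this diff; this assumes its __init__ simply stores the config:

plugin = KeywordFilterPlugin({
    'blocklist': ['giveaway'],
    'allowlist': ['python', 'rust'],
    'check_title': True,
    'check_content': False,  # match against titles only
})
post = {'title': 'Python 3.13 released', 'content': ''}
print(plugin.should_filter(post))  # False: an allowlisted keyword is present
print(plugin.score(post))          # 0.6: neutral 0.5 + one match * 0.1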

quality.py

@@ -0,0 +1,128 @@
"""
Quality Filter Plugin
Filter based on quality metrics (readability, length, etc).
"""
import logging
import re
from typing import Dict, Any, Optional
from .base import BaseFilterPlugin
logger = logging.getLogger(__name__)
class QualityFilterPlugin(BaseFilterPlugin):
"""
Filter posts based on quality metrics.
Metrics:
- Title length (too short or too long)
- Content length
- Excessive caps (SHOUTING)
- Excessive punctuation (!!!)
- Clickbait patterns
"""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.min_title_length = config.get('min_title_length', 10)
self.max_title_length = config.get('max_title_length', 300)
self.min_content_length = config.get('min_content_length', 0)
self.max_caps_ratio = config.get('max_caps_ratio', 0.5)
self.max_exclamation_marks = config.get('max_exclamation_marks', 3)
# Clickbait patterns
self.clickbait_patterns = [
r'you won\'t believe',
r'shocking',
r'doctors hate',
r'this one trick',
r'number \d+ will',
r'what happened next'
]
def get_name(self) -> str:
return "QualityFilter"
def should_filter(self, post: Dict[str, Any], context: Optional[Dict] = None) -> bool:
"""
Check if post should be filtered based on quality.
Returns:
True if post fails quality checks
"""
title = post.get('title', '')
content = post.get('content', '')
# Check title length
if len(title) < self.min_title_length:
logger.debug(f"QualityFilter: Title too short ({len(title)} chars)")
return True
if len(title) > self.max_title_length:
logger.debug(f"QualityFilter: Title too long ({len(title)} chars)")
return True
# Check content length (if specified)
if self.min_content_length > 0 and len(content) < self.min_content_length:
logger.debug(f"QualityFilter: Content too short ({len(content)} chars)")
return True
# Check excessive caps
if len(title) > 0:
caps_ratio = sum(1 for c in title if c.isupper()) / len(title)
if caps_ratio > self.max_caps_ratio and len(title) > 10:
logger.debug(f"QualityFilter: Excessive caps ({caps_ratio:.1%})")
return True
# Check excessive exclamation marks
exclamations = title.count('!')
if exclamations > self.max_exclamation_marks:
logger.debug(f"QualityFilter: Excessive exclamations ({exclamations})")
return True
# Check clickbait patterns
title_lower = title.lower()
for pattern in self.clickbait_patterns:
if re.search(pattern, title_lower):
logger.debug(f"QualityFilter: Clickbait pattern detected: {pattern}")
return True
return False
def score(self, post: Dict[str, Any], context: Optional[Dict] = None) -> float:
"""
Score post quality.
Returns:
Quality score 0.0-1.0
"""
title = post.get('title', '')
content = post.get('content', '')
score = 1.0
# Penalize for short title
if len(title) < 20:
score -= 0.1
# Penalize for excessive caps
if len(title) > 0:
caps_ratio = sum(1 for c in title if c.isupper()) / len(title)
if caps_ratio > 0.3:
score -= (caps_ratio - 0.3) * 0.5
# Penalize for exclamation marks
exclamations = title.count('!')
if exclamations > 0:
score -= exclamations * 0.05
# Bonus for longer content
if len(content) > 500:
score += 0.1
elif len(content) > 200:
score += 0.05
return max(0.0, min(1.0, score))
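
A worked example under the default thresholds (again assuming BaseFilterPlugin.__init__ just stores the config):

plugin = QualityFilterPlugin({})
post = {'title': "WOW!! You won't believe this trick", 'content': 'x' * 250}
print(plugin.should_filter(post))  # True: hits the "you won't believe" clickbait pattern
print(plugin.score(post))          # 0.95: -0.10 for two '!', +0.05 for content over 200 chars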

__init__.py (pipeline stages package)

@@ -4,5 +4,9 @@ Sequential processing stages for content filtering.
"""
from .base_stage import BaseStage
+from .categorizer import CategorizerStage
+from .moderator import ModeratorStage
+from .filter import FilterStage
+from .ranker import RankerStage
-__all__ = ['BaseStage']
+__all__ = ['BaseStage', 'CategorizerStage', 'ModeratorStage', 'FilterStage', 'RankerStage']

categorizer.py

@@ -0,0 +1,167 @@
"""
Categorizer Stage
Detect topics and categories using AI (cached by content hash).
"""
import logging
from typing import Dict, Any
from datetime import datetime
from .base_stage import BaseStage
from ..models import FilterResult, AIAnalysisResult
from ..cache import FilterCache
from ..ai_client import OpenRouterClient
logger = logging.getLogger(__name__)
class CategorizerStage(BaseStage):
"""
Stage 1: Categorize content and extract tags.
Uses AI to detect topics/categories with content-hash based caching.
"""
def __init__(self, config, cache: FilterCache):
super().__init__(config, cache)
# Initialize AI client if enabled
self.ai_client = None
if config.is_ai_enabled():
api_key = config.get_openrouter_key()
if api_key:
model = config.get_ai_model('cheap') # Use cheap model
self.ai_client = OpenRouterClient(api_key, model)
logger.info("Categorizer: AI client initialized")
else:
logger.warning("Categorizer: AI enabled but no API key found")
# Default categories
self.default_categories = [
'technology', 'programming', 'science', 'news',
'politics', 'business', 'entertainment', 'sports', 'other'
]
def get_name(self) -> str:
return "Categorizer"
def process(self, post: Dict[str, Any], result: FilterResult) -> FilterResult:
"""
Categorize post and add tags.
Args:
post: Post data
result: Current FilterResult
Returns:
Updated FilterResult with categories and tags
"""
title = post.get('title', '')
content = post.get('content', '')
# Compute content hash for caching
content_hash = self.cache.compute_content_hash(title, content)
result.cache_key = content_hash
# Try to get cached AI analysis
cached_analysis = self.cache.get_ai_analysis(content_hash)
if cached_analysis:
# Use cached categorization
result.categories = cached_analysis.categories
result.tags.extend(self._extract_platform_tags(post))
logger.debug(f"Categorizer: Cache hit for {content_hash[:8]}...")
return result
# No cache, need to categorize
if self.ai_client:
categories, category_scores = self._categorize_with_ai(title, content)
else:
# Fallback: Use platform/source as category
categories, category_scores = self._categorize_fallback(post)
# Store in AI analysis result for caching
ai_analysis = AIAnalysisResult(
content_hash=content_hash,
categories=categories,
category_scores=category_scores,
analyzed_at=datetime.now(),
model_used=self.ai_client.model if self.ai_client else 'fallback'
)
# Cache AI analysis
self.cache.set_ai_analysis(content_hash, ai_analysis)
# Update result
result.categories = categories
result.tags.extend(self._extract_platform_tags(post))
logger.debug(f"Categorizer: Analyzed {content_hash[:8]}... -> {categories}")
return result
def _categorize_with_ai(self, title: str, content: str) -> tuple:
"""
Categorize using AI.
Returns:
(categories list, category_scores dict)
"""
try:
response = self.ai_client.categorize(title, content, self.default_categories)
category = response.get('category', 'other')
confidence = response.get('confidence', 0.5)
categories = [category]
category_scores = {category: confidence}
return categories, category_scores
except Exception as e:
logger.error(f"AI categorization failed: {e}")
return ['other'], {'other': 0.0}
def _categorize_fallback(self, post: Dict[str, Any]) -> tuple:
"""
Fallback categorization using platform/source.
Returns:
(categories list, category_scores dict)
"""
# Use source as category
source = post.get('source', '').lower()
# Map common sources to categories
category_map = {
'programming': 'programming',
'python': 'programming',
'javascript': 'programming',
'technology': 'technology',
'science': 'science',
'politics': 'politics',
'worldnews': 'news',
'news': 'news'
}
category = category_map.get(source, 'other')
return [category], {category: 0.5}
def _extract_platform_tags(self, post: Dict[str, Any]) -> list:
"""Extract tags from platform, source, etc."""
tags = []
platform = post.get('platform', '')
if platform:
tags.append(platform)
source = post.get('source', '')
if source:
tags.append(source)
# Extract existing tags
existing_tags = post.get('tags', [])
if existing_tags:
tags.extend(existing_tags)
return list(set(tags)) # Remove duplicates
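
compute_content_hash is not included in this diff; a plausible stand-in keyed on title plus content would be:

import hashlib

def compute_content_hash(title: str, content: str) -> str:
    # Identical title/content pairs always produce the same key, which is
    # what lets AI analyses be cached permanently and shared across runs.
    return hashlib.sha256(f'{title}\n{content}'.encode('utf-8')).hexdigest()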

filter.py

@@ -0,0 +1,171 @@
"""
Filter Stage
Apply filterset rules to posts (no AI needed - fast rule evaluation).
"""
import logging
from typing import Dict, Any, List
from .base_stage import BaseStage
from ..models import FilterResult
logger = logging.getLogger(__name__)
class FilterStage(BaseStage):
"""
Stage 3: Apply filterset rules.
Evaluates filter conditions from filtersets.json without AI.
Fast rule-based filtering.
"""
def get_name(self) -> str:
return "Filter"
def process(self, post: Dict[str, Any], result: FilterResult) -> FilterResult:
"""
Apply filterset rules to post.
Args:
post: Post data
result: Current FilterResult
Returns:
Updated FilterResult (may be rejected)
"""
# Get filterset configuration
filterset = self.config.get_filterset(result.filterset_name)
if not filterset:
logger.warning(f"Filterset '{result.filterset_name}' not found")
return result
# Apply post rules
post_rules = filterset.get('post_rules', {})
if not self._evaluate_rules(post, result, post_rules):
result.passed = False
logger.debug(f"Filter: Post {post.get('uuid', '')} rejected by filterset rules")
return result
# Post passed all rules
logger.debug(f"Filter: Post {post.get('uuid', '')} passed filterset '{result.filterset_name}'")
return result
def _evaluate_rules(
self,
post: Dict[str, Any],
result: FilterResult,
rules: Dict[str, Any]
) -> bool:
"""
Evaluate all rules for a post.
Returns:
True if post passes all rules, False otherwise
"""
for field, condition in rules.items():
if not self._evaluate_condition(post, result, field, condition):
logger.debug(f"Filter: Failed condition '{field}': {condition}")
return False
return True
def _evaluate_condition(
self,
post: Dict[str, Any],
result: FilterResult,
field: str,
condition: Any
) -> bool:
"""
Evaluate a single condition.
Supported conditions:
- {"equals": value}
- {"not_equals": value}
- {"in": [values]}
- {"not_in": [values]}
- {"min": value}
- {"max": value}
- {"includes_any": [values]}
- {"excludes": [values]}
Args:
post: Post data
result: FilterResult with moderation data
field: Field path (e.g., "score", "moderation.flags.is_safe")
condition: Condition dict
Returns:
True if condition passes
"""
# Get field value
value = self._get_field_value(post, result, field)
# Evaluate condition
if isinstance(condition, dict):
for op, expected in condition.items():
if op == 'equals':
if value != expected:
return False
elif op == 'not_equals':
if value == expected:
return False
elif op == 'in':
if value not in expected:
return False
elif op == 'not_in':
if value in expected:
return False
elif op == 'min':
if value < expected:
return False
elif op == 'max':
if value > expected:
return False
elif op == 'includes_any':
# Check if any expected value is in the field (for lists)
if not isinstance(value, list):
return False
if not any(item in value for item in expected):
return False
elif op == 'excludes':
# Check that none of the excluded values are present
if isinstance(value, list):
if any(item in expected for item in value):
return False
elif value in expected:
return False
else:
logger.warning(f"Unknown condition operator: {op}")
return True
def _get_field_value(self, post: Dict[str, Any], result: FilterResult, field: str):
"""
Get field value from post or result.
Supports nested fields like "moderation.flags.is_safe"
"""
parts = field.split('.')
# Check if field is in moderation data
if parts[0] == 'moderation' and result.moderation_data:
value = result.moderation_data
for part in parts[1:]:
if isinstance(value, dict):
value = value.get(part)
else:
return None
return value
# Check post data
value = post
for part in parts:
if isinstance(value, dict):
value = value.get(part)
else:
return None
return value
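
For reference, a hypothetical post_rules block from filtersets.json exercising several operators handled above (field names are illustrative; the nested moderation path follows the docstring's example):

safe_content_rules = {
    'score': {'min': 1},                           # numeric threshold
    'platform': {'in': ['reddit', 'hackernews']},  # membership test
    'moderation.flags.is_safe': {'equals': True},  # nested moderation field
    'tags': {'excludes': ['nsfw']},                # list exclusion
}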

moderator.py

@@ -0,0 +1,153 @@
"""
Moderator Stage
Safety and quality analysis using AI (cached by content hash).
"""
import logging
from typing import Dict, Any
from datetime import datetime
from .base_stage import BaseStage
from ..models import FilterResult, AIAnalysisResult
from ..cache import FilterCache
from ..ai_client import OpenRouterClient
logger = logging.getLogger(__name__)
class ModeratorStage(BaseStage):
"""
Stage 2: Content moderation and quality analysis.
Uses AI to analyze safety, quality, and sentiment with content-hash based caching.
"""
def __init__(self, config, cache: FilterCache):
super().__init__(config, cache)
# Initialize AI client if enabled
self.ai_client = None
if config.is_ai_enabled():
api_key = config.get_openrouter_key()
if api_key:
model = config.get_ai_model('cheap') # Use cheap model
self.ai_client = OpenRouterClient(api_key, model)
logger.info("Moderator: AI client initialized")
else:
logger.warning("Moderator: AI enabled but no API key found")
def get_name(self) -> str:
return "Moderator"
def process(self, post: Dict[str, Any], result: FilterResult) -> FilterResult:
"""
Moderate post for safety and quality.
Args:
post: Post data
result: Current FilterResult
Returns:
Updated FilterResult with moderation data
"""
title = post.get('title', '')
content = post.get('content', '')
# Use existing cache key from Categorizer
content_hash = result.cache_key or self.cache.compute_content_hash(title, content)
# Try to get cached AI analysis
cached_analysis = self.cache.get_ai_analysis(content_hash)
if cached_analysis and cached_analysis.moderation:
# Use cached moderation data
result.moderation_data = cached_analysis.moderation
result.score_breakdown['quality'] = cached_analysis.quality_score
logger.debug(f"Moderator: Cache hit for {content_hash[:8]}...")
return result
# No cache, need to moderate
if self.ai_client:
moderation, quality_score, sentiment = self._moderate_with_ai(title, content)
else:
# Fallback: Safe defaults
moderation, quality_score, sentiment = self._moderate_fallback(post)
# Update or create AI analysis result
if cached_analysis:
# Update existing analysis with moderation data
cached_analysis.moderation = moderation
cached_analysis.quality_score = quality_score
cached_analysis.sentiment = sentiment.get('sentiment')
cached_analysis.sentiment_score = sentiment.get('score', 0.0)
ai_analysis = cached_analysis
else:
# Create new analysis
ai_analysis = AIAnalysisResult(
content_hash=content_hash,
moderation=moderation,
quality_score=quality_score,
sentiment=sentiment.get('sentiment'),
sentiment_score=sentiment.get('score', 0.0),
analyzed_at=datetime.now(),
model_used=self.ai_client.model if self.ai_client else 'fallback'
)
# Cache AI analysis
self.cache.set_ai_analysis(content_hash, ai_analysis)
# Update result
result.moderation_data = moderation
result.score_breakdown['quality'] = quality_score
result.score_breakdown['sentiment'] = sentiment.get('score', 0.0)
logger.debug(f"Moderator: Analyzed {content_hash[:8]}... (quality: {quality_score:.2f})")
return result
def _moderate_with_ai(self, title: str, content: str) -> tuple:
"""
Moderate using AI.
Returns:
(moderation dict, quality_score float, sentiment dict)
"""
try:
# Run moderation
moderation = self.ai_client.moderate(title, content)
# Run quality scoring
quality_score = self.ai_client.score_quality(title, content)
# Run sentiment analysis
sentiment = self.ai_client.analyze_sentiment(title, content)
return moderation, quality_score, sentiment
except Exception as e:
logger.error(f"AI moderation failed: {e}")
return self._moderate_fallback({})
def _moderate_fallback(self, post: Dict[str, Any]) -> tuple:
"""
Fallback moderation with safe defaults.
Returns:
(moderation dict, quality_score float, sentiment dict)
"""
moderation = {
'violence': 0.0,
'sexual_content': 0.0,
'hate_speech': 0.0,
'harassment': 0.0,
'is_safe': True
}
quality_score = 0.5 # Neutral quality
sentiment = {
'sentiment': 'neutral',
'score': 0.0
}
return moderation, quality_score, sentiment

ranker.py

@@ -0,0 +1,201 @@
"""
Ranker Stage
Score and rank posts based on quality, recency, and source.
"""
import logging
from typing import Dict, Any
from datetime import datetime
from .base_stage import BaseStage
from ..models import FilterResult
logger = logging.getLogger(__name__)
class RankerStage(BaseStage):
"""
Stage 4: Score and rank posts.
Combines multiple factors:
- Quality score (from Moderator)
- Recency (how recent the post is)
- Source tier (platform/source reputation)
- User engagement (score, replies)
"""
def __init__(self, config, cache):
super().__init__(config, cache)
# Scoring weights
self.weights = {
'quality': 0.3,
'recency': 0.25,
'source_tier': 0.25,
'engagement': 0.20
}
# Source tiers (higher = better)
self.source_tiers = {
'tier1': ['hackernews', 'arxiv', 'nature', 'science'],
'tier2': ['reddit', 'stackoverflow', 'github'],
'tier3': ['twitter', 'medium', 'dev.to']
}
def get_name(self) -> str:
return "Ranker"
def process(self, post: Dict[str, Any], result: FilterResult) -> FilterResult:
"""
Calculate final score for post.
Args:
post: Post data
result: Current FilterResult
Returns:
Updated FilterResult with final score
"""
# Calculate component scores
quality_score = self._get_quality_score(result)
recency_score = self._calculate_recency_score(post)
source_score = self._calculate_source_score(post)
engagement_score = self._calculate_engagement_score(post)
# Store breakdown
result.score_breakdown.update({
'quality': quality_score,
'recency': recency_score,
'source_tier': source_score,
'engagement': engagement_score
})
# Calculate weighted final score
final_score = (
quality_score * self.weights['quality'] +
recency_score * self.weights['recency'] +
source_score * self.weights['source_tier'] +
engagement_score * self.weights['engagement']
)
result.score = final_score
logger.debug(
f"Ranker: Post {post.get('uuid', '')[:8]}... score={final_score:.3f} "
f"(q:{quality_score:.2f}, r:{recency_score:.2f}, s:{source_score:.2f}, e:{engagement_score:.2f})"
)
return result
def _get_quality_score(self, result: FilterResult) -> float:
"""Get quality score from Moderator stage"""
return result.score_breakdown.get('quality', 0.5)
def _calculate_recency_score(self, post: Dict[str, Any]) -> float:
"""
Calculate recency score based on post age.
Returns:
Score 0.0-1.0 (1.0 = very recent, 0.0 = very old)
"""
timestamp = post.get('timestamp')
if not timestamp:
return 0.5 # Neutral if no timestamp
try:
# Convert to datetime
if isinstance(timestamp, (int, float)):
post_time = datetime.fromtimestamp(timestamp)
else:
post_time = datetime.fromisoformat(str(timestamp))
# Calculate age in hours
age_seconds = (datetime.now() - post_time).total_seconds()
age_hours = age_seconds / 3600
# Scoring curve
if age_hours < 1:
return 1.0
elif age_hours < 6:
return 0.9
elif age_hours < 12:
return 0.75
elif age_hours < 24:
return 0.6
elif age_hours < 48:
return 0.4
elif age_hours < 168: # 1 week
return 0.25
else:
return 0.1
except Exception as e:
logger.debug(f"Error calculating recency: {e}")
return 0.5
def _calculate_source_score(self, post: Dict[str, Any]) -> float:
"""
Calculate source tier score.
Returns:
Score 0.0-1.0 based on source reputation
"""
platform = post.get('platform', '').lower()
source = post.get('source', '').lower()
# Check tier 1
if any(t in platform or t in source for t in self.source_tiers['tier1']):
return 1.0
# Check tier 2
if any(t in platform or t in source for t in self.source_tiers['tier2']):
return 0.7
# Check tier 3
if any(t in platform or t in source for t in self.source_tiers['tier3']):
return 0.5
# Unknown source
return 0.3
def _calculate_engagement_score(self, post: Dict[str, Any]) -> float:
"""
Calculate engagement score based on upvotes/score and comments.
Returns:
Score 0.0-1.0 based on engagement metrics
"""
score = post.get('score', 0)
replies = post.get('replies', 0)
# Normalize scores (logarithmic scale)
import math
# Score component (0-1)
if score <= 0:
score_component = 0.0
elif score < 10:
score_component = score / 10 * 0.1 # 0.0-0.1, continuous with the next bracket
elif score < 100:
score_component = 0.1 + (math.log10(score) - 1) * 0.3 # 0.1-0.4
elif score < 1000:
score_component = 0.4 + (math.log10(score) - 2) * 0.3 # 0.4-0.7
else:
score_component = min(1.0, 0.7 + (math.log10(score) - 3) * 0.1) # 0.7-1.0
# Replies component (0-1)
if replies <= 0:
replies_component = 0.0
elif replies < 5:
replies_component = replies / 5 * 0.2 # 0.0-0.2, continuous with the next bracket
elif replies < 20:
replies_component = 0.2 + (replies - 5) / 15 * 0.3 # 0.2-0.5
elif replies < 100:
replies_component = 0.5 + (math.log10(replies) - math.log10(20)) / (2 - math.log10(20)) * 0.3 # 0.5-0.8
else:
replies_component = min(1.0, 0.8 + (math.log10(replies) - 2) * 0.1) # 0.8-1.0
# Weighted combination (score matters more than replies)
engagement_score = score_component * 0.7 + replies_component * 0.3
return engagement_score
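
A worked example of the weighted combination above: quality 0.8, a three-hour-old post (recency 0.9), a tier-2 source (0.7), and engagement 0.46:

weights = {'quality': 0.30, 'recency': 0.25, 'source_tier': 0.25, 'engagement': 0.20}
parts = {'quality': 0.80, 'recency': 0.90, 'source_tier': 0.70, 'engagement': 0.46}
final = sum(parts[k] * weights[k] for k in weights)
print(round(final, 3))  # 0.732 -> becomes the post's filter_score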