diff --git a/filter_pipeline/ai_client.py b/filter_pipeline/ai_client.py
new file mode 100644
index 0000000..eca2311
--- /dev/null
+++ b/filter_pipeline/ai_client.py
@@ -0,0 +1,326 @@
+"""
+AI Client
+OpenRouter API client for content analysis (Llama 70B only).
+"""
+
+import requests
+import logging
+import time
+from typing import Optional, Dict, Any
+
+logger = logging.getLogger(__name__)
+
+
+class OpenRouterClient:
+    """
+    OpenRouter API client for AI-powered content analysis.
+    Uses only Llama 70B for cost efficiency.
+    """
+
+    def __init__(self, api_key: str, model: str = 'meta-llama/llama-3.3-70b-instruct'):
+        """
+        Initialize OpenRouter client.
+
+        Args:
+            api_key: OpenRouter API key
+            model: Model to use (default: Llama 70B)
+        """
+        self.api_key = api_key
+        self.model = model
+        self.base_url = 'https://openrouter.ai/api/v1/chat/completions'
+        self.timeout = 60
+        self.max_retries = 3
+        self.retry_delay = 2  # seconds
+
+    def call_model(
+        self,
+        prompt: str,
+        max_tokens: int = 500,
+        temperature: float = 0.7,
+        system_prompt: Optional[str] = None
+    ) -> str:
+        """
+        Call the AI model with a prompt.
+
+        Args:
+            prompt: User prompt
+            max_tokens: Maximum tokens in response
+            temperature: Sampling temperature (0.0-1.0)
+            system_prompt: Optional system prompt
+
+        Returns:
+            Model response text
+
+        Raises:
+            RuntimeError: If the API call fails after all retries
+        """
+        messages = []
+
+        if system_prompt:
+            messages.append({'role': 'system', 'content': system_prompt})
+
+        messages.append({'role': 'user', 'content': prompt})
+
+        payload = {
+            'model': self.model,
+            'messages': messages,
+            'max_tokens': max_tokens,
+            'temperature': temperature
+        }
+
+        headers = {
+            'Authorization': f'Bearer {self.api_key}',
+            'Content-Type': 'application/json',
+            'HTTP-Referer': 'https://github.com/balanceboard',
+            'X-Title': 'BalanceBoard Filter Pipeline'
+        }
+
+        # Retry loop
+        last_error = None
+        for attempt in range(self.max_retries):
+            try:
+                response = requests.post(
+                    self.base_url,
+                    headers=headers,
+                    json=payload,
+                    timeout=self.timeout
+                )
+
+                response.raise_for_status()
+                data = response.json()
+
+                # Extract response text
+                result = data['choices'][0]['message']['content'].strip()
+
+                logger.debug(f"AI call successful (attempt {attempt + 1})")
+                return result
+
+            except requests.exceptions.RequestException as e:
+                last_error = e
+                logger.warning(f"AI call failed (attempt {attempt + 1}/{self.max_retries}): {e}")
+
+                if attempt < self.max_retries - 1:
+                    time.sleep(self.retry_delay * (attempt + 1))  # Linear backoff (2s, 4s, ...)
+                    continue
+
+        # All retries failed
+        error_msg = f"AI call failed after {self.max_retries} attempts: {last_error}"
+        logger.error(error_msg)
+        raise RuntimeError(error_msg)
+
+    def categorize(self, title: str, content: str, categories: list) -> Dict[str, Any]:
+        """
+        Categorize content into predefined categories.
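+
+        Illustrative call (the returned values are hypothetical; real
+        output depends on the model):
+
+            >>> client.categorize('New CPU announced', 'Chipmaker reveals...',
+            ...                   ['technology', 'news', 'other'])
+            {'category': 'technology', 'confidence': 0.9}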
+
+        Args:
+            title: Post title
+            content: Post content/description
+            categories: List of valid category names
+
+        Returns:
+            Dict with 'category' and 'confidence' keys
+        """
+        category_list = ', '.join(categories)
+
+        prompt = f"""Classify this content into ONE of these categories: {category_list}
+
+Title: {title}
+Content: {content[:500]}
+
+Respond in this EXACT format:
+CATEGORY: [category name]
+CONFIDENCE: [0.0-1.0]"""
+
+        try:
+            response = self.call_model(prompt, max_tokens=20, temperature=0.3)
+
+            # Parse response
+            lines = response.strip().split('\n')
+            category = None
+            confidence = 0.5
+
+            for line in lines:
+                if line.startswith('CATEGORY:'):
+                    category = line.split(':', 1)[1].strip().lower()
+                elif line.startswith('CONFIDENCE:'):
+                    try:
+                        confidence = float(line.split(':', 1)[1].strip())
+                    except ValueError:
+                        confidence = 0.5
+
+            # Validate category
+            if category not in [c.lower() for c in categories]:
+                category = categories[0].lower()  # Default to first category
+
+            return {
+                'category': category,
+                'confidence': confidence
+            }
+
+        except Exception as e:
+            logger.error(f"Categorization failed: {e}")
+            return {
+                'category': categories[0].lower(),
+                'confidence': 0.0
+            }
+
+    def moderate(self, title: str, content: str) -> Dict[str, Any]:
+        """
+        Perform content moderation (safety analysis).
+
+        Args:
+            title: Post title
+            content: Post content/description
+
+        Returns:
+            Dict with moderation flags and scores
+        """
+        prompt = f"""Analyze this content for safety issues.
+
+Title: {title}
+Content: {content[:500]}
+
+Respond in this EXACT format:
+VIOLENCE: [0.0-1.0]
+SEXUAL: [0.0-1.0]
+HATE_SPEECH: [0.0-1.0]
+HARASSMENT: [0.0-1.0]
+IS_SAFE: [YES/NO]"""
+
+        try:
+            response = self.call_model(prompt, max_tokens=50, temperature=0.3)
+
+            # Parse response
+            moderation = {
+                'violence': 0.0,
+                'sexual_content': 0.0,
+                'hate_speech': 0.0,
+                'harassment': 0.0,
+                'is_safe': True
+            }
+
+            lines = response.strip().split('\n')
+            for line in lines:
+                if ':' not in line:
+                    continue
+
+                key, value = line.split(':', 1)
+                key = key.strip().lower()
+                value = value.strip()
+
+                if key == 'violence':
+                    moderation['violence'] = float(value)
+                elif key == 'sexual':
+                    moderation['sexual_content'] = float(value)
+                elif key == 'hate_speech':
+                    moderation['hate_speech'] = float(value)
+                elif key == 'harassment':
+                    moderation['harassment'] = float(value)
+                elif key == 'is_safe':
+                    moderation['is_safe'] = value.upper() == 'YES'
+
+            return moderation
+
+        except Exception as e:
+            logger.error(f"Moderation failed: {e}")
+            return {
+                'violence': 0.0,
+                'sexual_content': 0.0,
+                'hate_speech': 0.0,
+                'harassment': 0.0,
+                'is_safe': True
+            }
+
+    def score_quality(self, title: str, content: str) -> float:
+        """
+        Score content quality (0.0-1.0).
+
+        Args:
+            title: Post title
+            content: Post content/description
+
+        Returns:
+            Quality score (0.0-1.0)
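+
+        Illustrative call (the score is hypothetical):
+
+            >>> client.score_quality('Clear headline', 'Well-sourced article text...')
+            0.8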
+        """
+        prompt = f"""Rate this content's quality on a scale of 0.0 to 1.0.
+
+Consider:
+- Clarity and informativeness
+- Proper grammar and formatting
+- Lack of clickbait or sensationalism
+- Factual tone
+
+Title: {title}
+Content: {content[:500]}
+
+Respond with ONLY a number between 0.0 and 1.0 (e.g., 0.7)"""
+
+        try:
+            response = self.call_model(prompt, max_tokens=10, temperature=0.3)
+
+            # Extract number
+            score = float(response.strip())
+            score = max(0.0, min(1.0, score))  # Clamp to 0-1
+
+            return score
+
+        except Exception as e:
+            logger.error(f"Quality scoring failed: {e}")
+            return 0.5  # Default neutral score
+
+    def analyze_sentiment(self, title: str, content: str) -> Dict[str, Any]:
+        """
+        Analyze sentiment of content.
+
+        Args:
+            title: Post title
+            content: Post content/description
+
+        Returns:
+            Dict with 'sentiment' (positive/neutral/negative) and 'score'
+        """
+        prompt = f"""Analyze the sentiment of this content.
+
+Title: {title}
+Content: {content[:500]}
+
+Respond in this EXACT format:
+SENTIMENT: [positive/neutral/negative]
+SCORE: [-1.0 to 1.0]"""
+
+        try:
+            response = self.call_model(prompt, max_tokens=20, temperature=0.3)
+
+            # Parse response
+            sentiment = 'neutral'
+            score = 0.0
+
+            lines = response.strip().split('\n')
+            for line in lines:
+                if ':' not in line:
+                    continue
+
+                key, value = line.split(':', 1)
+                key = key.strip().lower()
+                value = value.strip()
+
+                if key == 'sentiment':
+                    sentiment = value.lower()
+                elif key == 'score':
+                    try:
+                        score = float(value)
+                        score = max(-1.0, min(1.0, score))
+                    except ValueError:
+                        score = 0.0
+
+            return {
+                'sentiment': sentiment,
+                'score': score
+            }
+
+        except Exception as e:
+            logger.error(f"Sentiment analysis failed: {e}")
+            return {
+                'sentiment': 'neutral',
+                'score': 0.0
+            }
diff --git a/filter_pipeline/plugins/__init__.py b/filter_pipeline/plugins/__init__.py
index 050764f..92217ca 100644
--- a/filter_pipeline/plugins/__init__.py
+++ b/filter_pipeline/plugins/__init__.py
@@ -4,5 +4,7 @@ Pluggable filters for content filtering.
 """
 
 from .base import BaseFilterPlugin
+from .keyword import KeywordFilterPlugin
+from .quality import QualityFilterPlugin
 
-__all__ = ['BaseFilterPlugin']
+__all__ = ['BaseFilterPlugin', 'KeywordFilterPlugin', 'QualityFilterPlugin']
diff --git a/filter_pipeline/plugins/keyword.py b/filter_pipeline/plugins/keyword.py
new file mode 100644
index 0000000..ffc61d6
--- /dev/null
+++ b/filter_pipeline/plugins/keyword.py
@@ -0,0 +1,95 @@
+"""
+Keyword Filter Plugin
+Simple keyword-based filtering.
+"""
+
+import logging
+from typing import Dict, Any, Optional
+
+from .base import BaseFilterPlugin
+
+logger = logging.getLogger(__name__)
+
+
+class KeywordFilterPlugin(BaseFilterPlugin):
+    """
+    Filter posts based on keyword matching.
+
+    Supports:
+    - Blocklist: Reject posts containing blocked keywords
+    - Allowlist: Only allow posts containing allowed keywords
+    - Case-insensitive matching
+    """
+
+    def __init__(self, config: Dict[str, Any]):
+        super().__init__(config)
+
+        self.blocklist = [k.lower() for k in config.get('blocklist', [])]
+        self.allowlist = [k.lower() for k in config.get('allowlist', [])]
+        self.check_title = config.get('check_title', True)
+        self.check_content = config.get('check_content', True)
+
+    def get_name(self) -> str:
+        return "KeywordFilter"
+
+    def should_filter(self, post: Dict[str, Any], context: Optional[Dict] = None) -> bool:
+        """
+        Check if a post should be filtered out based on keywords.
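+
+        Illustrative: with blocklist ['crypto'], a post titled
+        "Crypto markets rally" is filtered out (returns True).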
+
+        Returns:
+            True if the post contains a blocked keyword, or if an allowlist
+            is configured and no allowed keyword is present
+        """
+        text = self._get_text(post)
+
+        # Check blocklist
+        if self.blocklist:
+            for keyword in self.blocklist:
+                if keyword in text:
+                    logger.debug(f"KeywordFilter: Blocked keyword '{keyword}' found")
+                    return True
+
+        # Check allowlist (if specified, at least one keyword must be present)
+        if self.allowlist:
+            found = any(keyword in text for keyword in self.allowlist)
+            if not found:
+                logger.debug("KeywordFilter: No allowed keywords found")
+                return True
+
+        return False
+
+    def score(self, post: Dict[str, Any], context: Optional[Dict] = None) -> float:
+        """
+        Score based on keyword presence.
+
+        Returns:
+            1.0 if allowlist keywords are present, 0.5 neutral,
+            0.0 if a blocklist keyword is present
+        """
+        text = self._get_text(post)
+
+        # Check blocklist
+        if self.blocklist:
+            for keyword in self.blocklist:
+                if keyword in text:
+                    return 0.0
+
+        # Check allowlist
+        if self.allowlist:
+            matches = sum(1 for keyword in self.allowlist if keyword in text)
+            if matches > 0:
+                return min(1.0, 0.5 + (matches * 0.1))
+
+        return 0.5  # Neutral
+
+    def _get_text(self, post: Dict[str, Any]) -> str:
+        """Get searchable text from post"""
+        text_parts = []
+
+        if self.check_title:
+            title = post.get('title', '')
+            text_parts.append(title)
+
+        if self.check_content:
+            content = post.get('content', '')
+            text_parts.append(content)
+
+        return ' '.join(text_parts).lower()
diff --git a/filter_pipeline/plugins/quality.py b/filter_pipeline/plugins/quality.py
new file mode 100644
index 0000000..065f6cb
--- /dev/null
+++ b/filter_pipeline/plugins/quality.py
@@ -0,0 +1,128 @@
+"""
+Quality Filter Plugin
+Filter based on quality metrics (readability, length, etc.).
+"""
+
+import logging
+import re
+from typing import Dict, Any, Optional
+
+from .base import BaseFilterPlugin
+
+logger = logging.getLogger(__name__)
+
+
+class QualityFilterPlugin(BaseFilterPlugin):
+    """
+    Filter posts based on quality metrics.
+
+    Metrics:
+    - Title length (too short or too long)
+    - Content length
+    - Excessive caps (SHOUTING)
+    - Excessive punctuation (!!!)
+    - Clickbait patterns
+    """
+
+    def __init__(self, config: Dict[str, Any]):
+        super().__init__(config)
+
+        self.min_title_length = config.get('min_title_length', 10)
+        self.max_title_length = config.get('max_title_length', 300)
+        self.min_content_length = config.get('min_content_length', 0)
+        self.max_caps_ratio = config.get('max_caps_ratio', 0.5)
+        self.max_exclamation_marks = config.get('max_exclamation_marks', 3)
+
+        # Clickbait patterns
+        self.clickbait_patterns = [
+            r'you won\'t believe',
+            r'shocking',
+            r'doctors hate',
+            r'this one trick',
+            r'number \d+ will',
+            r'what happened next'
+        ]
+
+    def get_name(self) -> str:
+        return "QualityFilter"
+
+    def should_filter(self, post: Dict[str, Any], context: Optional[Dict] = None) -> bool:
+        """
+        Check if a post should be filtered based on quality.
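+
+        Illustrative: the title "WOW!" fails the minimum length check, and
+        "You Won't Believe What Happened Next" matches a clickbait pattern.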
+
+        Returns:
+            True if post fails quality checks
+        """
+        title = post.get('title', '')
+        content = post.get('content', '')
+
+        # Check title length
+        if len(title) < self.min_title_length:
+            logger.debug(f"QualityFilter: Title too short ({len(title)} chars)")
+            return True
+
+        if len(title) > self.max_title_length:
+            logger.debug(f"QualityFilter: Title too long ({len(title)} chars)")
+            return True
+
+        # Check content length (if specified)
+        if self.min_content_length > 0 and len(content) < self.min_content_length:
+            logger.debug(f"QualityFilter: Content too short ({len(content)} chars)")
+            return True
+
+        # Check excessive caps
+        if len(title) > 0:
+            caps_ratio = sum(1 for c in title if c.isupper()) / len(title)
+            if caps_ratio > self.max_caps_ratio and len(title) > 10:
+                logger.debug(f"QualityFilter: Excessive caps ({caps_ratio:.1%})")
+                return True
+
+        # Check excessive exclamation marks
+        exclamations = title.count('!')
+        if exclamations > self.max_exclamation_marks:
+            logger.debug(f"QualityFilter: Excessive exclamations ({exclamations})")
+            return True
+
+        # Check clickbait patterns
+        title_lower = title.lower()
+        for pattern in self.clickbait_patterns:
+            if re.search(pattern, title_lower):
+                logger.debug(f"QualityFilter: Clickbait pattern detected: {pattern}")
+                return True
+
+        return False
+
+    def score(self, post: Dict[str, Any], context: Optional[Dict] = None) -> float:
+        """
+        Score post quality.
+
+        Returns:
+            Quality score 0.0-1.0
+        """
+        title = post.get('title', '')
+        content = post.get('content', '')
+
+        score = 1.0
+
+        # Penalize for short title
+        if len(title) < 20:
+            score -= 0.1
+
+        # Penalize for excessive caps
+        if len(title) > 0:
+            caps_ratio = sum(1 for c in title if c.isupper()) / len(title)
+            if caps_ratio > 0.3:
+                score -= (caps_ratio - 0.3) * 0.5
+
+        # Penalize for exclamation marks
+        exclamations = title.count('!')
+        if exclamations > 0:
+            score -= exclamations * 0.05
+
+        # Bonus for longer content
+        if len(content) > 500:
+            score += 0.1
+        elif len(content) > 200:
+            score += 0.05
+
+        return max(0.0, min(1.0, score))
diff --git a/filter_pipeline/stages/__init__.py b/filter_pipeline/stages/__init__.py
index 401804e..4201b06 100644
--- a/filter_pipeline/stages/__init__.py
+++ b/filter_pipeline/stages/__init__.py
@@ -4,5 +4,9 @@ Sequential processing stages for content filtering.
 """
 
 from .base_stage import BaseStage
+from .categorizer import CategorizerStage
+from .moderator import ModeratorStage
+from .filter import FilterStage
+from .ranker import RankerStage
 
-__all__ = ['BaseStage']
+__all__ = ['BaseStage', 'CategorizerStage', 'ModeratorStage', 'FilterStage', 'RankerStage']
diff --git a/filter_pipeline/stages/categorizer.py b/filter_pipeline/stages/categorizer.py
new file mode 100644
index 0000000..22eb668
--- /dev/null
+++ b/filter_pipeline/stages/categorizer.py
@@ -0,0 +1,167 @@
+"""
+Categorizer Stage
+Detect topics and categories using AI (cached by content hash).
+"""
+
+import logging
+from typing import Dict, Any
+from datetime import datetime
+
+from .base_stage import BaseStage
+from ..models import FilterResult, AIAnalysisResult
+from ..cache import FilterCache
+from ..ai_client import OpenRouterClient
+
+logger = logging.getLogger(__name__)
+
+
+class CategorizerStage(BaseStage):
+    """
+    Stage 1: Categorize content and extract tags.
+
+    Uses AI to detect topics/categories with content-hash based caching.
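+
+    Illustrative usage (the config and cache objects are assumed to be
+    provided by the surrounding pipeline):
+
+        stage = CategorizerStage(config, cache)
+        result = stage.process(post, result)  # result.categories, e.g. ['technology']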
+ """ + + def __init__(self, config, cache: FilterCache): + super().__init__(config, cache) + + # Initialize AI client if enabled + self.ai_client = None + if config.is_ai_enabled(): + api_key = config.get_openrouter_key() + if api_key: + model = config.get_ai_model('cheap') # Use cheap model + self.ai_client = OpenRouterClient(api_key, model) + logger.info("Categorizer: AI client initialized") + else: + logger.warning("Categorizer: AI enabled but no API key found") + + # Default categories + self.default_categories = [ + 'technology', 'programming', 'science', 'news', + 'politics', 'business', 'entertainment', 'sports', 'other' + ] + + def get_name(self) -> str: + return "Categorizer" + + def process(self, post: Dict[str, Any], result: FilterResult) -> FilterResult: + """ + Categorize post and add tags. + + Args: + post: Post data + result: Current FilterResult + + Returns: + Updated FilterResult with categories and tags + """ + title = post.get('title', '') + content = post.get('content', '') + + # Compute content hash for caching + content_hash = self.cache.compute_content_hash(title, content) + result.cache_key = content_hash + + # Try to get cached AI analysis + cached_analysis = self.cache.get_ai_analysis(content_hash) + + if cached_analysis: + # Use cached categorization + result.categories = cached_analysis.categories + result.tags.extend(self._extract_platform_tags(post)) + + logger.debug(f"Categorizer: Cache hit for {content_hash[:8]}...") + return result + + # No cache, need to categorize + if self.ai_client: + categories, category_scores = self._categorize_with_ai(title, content) + else: + # Fallback: Use platform/source as category + categories, category_scores = self._categorize_fallback(post) + + # Store in AI analysis result for caching + ai_analysis = AIAnalysisResult( + content_hash=content_hash, + categories=categories, + category_scores=category_scores, + analyzed_at=datetime.now(), + model_used=self.ai_client.model if self.ai_client else 'fallback' + ) + + # Cache AI analysis + self.cache.set_ai_analysis(content_hash, ai_analysis) + + # Update result + result.categories = categories + result.tags.extend(self._extract_platform_tags(post)) + + logger.debug(f"Categorizer: Analyzed {content_hash[:8]}... -> {categories}") + return result + + def _categorize_with_ai(self, title: str, content: str) -> tuple: + """ + Categorize using AI. + + Returns: + (categories list, category_scores dict) + """ + try: + response = self.ai_client.categorize(title, content, self.default_categories) + + category = response.get('category', 'other') + confidence = response.get('confidence', 0.5) + + categories = [category] + category_scores = {category: confidence} + + return categories, category_scores + + except Exception as e: + logger.error(f"AI categorization failed: {e}") + return ['other'], {'other': 0.0} + + def _categorize_fallback(self, post: Dict[str, Any]) -> tuple: + """ + Fallback categorization using platform/source. 
+
+        Returns:
+            (categories list, category_scores dict)
+        """
+        # Use source as category
+        source = post.get('source', '').lower()
+
+        # Map common sources to categories
+        category_map = {
+            'programming': 'programming',
+            'python': 'programming',
+            'javascript': 'programming',
+            'technology': 'technology',
+            'science': 'science',
+            'politics': 'politics',
+            'worldnews': 'news',
+            'news': 'news'
+        }
+
+        category = category_map.get(source, 'other')
+        return [category], {category: 0.5}
+
+    def _extract_platform_tags(self, post: Dict[str, Any]) -> list:
+        """Extract tags from platform, source, etc."""
+        tags = []
+
+        platform = post.get('platform', '')
+        if platform:
+            tags.append(platform)
+
+        source = post.get('source', '')
+        if source:
+            tags.append(source)
+
+        # Extract existing tags
+        existing_tags = post.get('tags', [])
+        if existing_tags:
+            tags.extend(existing_tags)
+
+        return list(set(tags))  # Remove duplicates
diff --git a/filter_pipeline/stages/filter.py b/filter_pipeline/stages/filter.py
new file mode 100644
index 0000000..72804a0
--- /dev/null
+++ b/filter_pipeline/stages/filter.py
@@ -0,0 +1,171 @@
+"""
+Filter Stage
+Apply filterset rules to posts (no AI needed - fast rule evaluation).
+"""
+
+import logging
+from typing import Dict, Any
+
+from .base_stage import BaseStage
+from ..models import FilterResult
+
+logger = logging.getLogger(__name__)
+
+
+class FilterStage(BaseStage):
+    """
+    Stage 3: Apply filterset rules.
+
+    Evaluates filter conditions from filtersets.json without AI.
+    Fast rule-based filtering.
+    """
+
+    def get_name(self) -> str:
+        return "Filter"
+
+    def process(self, post: Dict[str, Any], result: FilterResult) -> FilterResult:
+        """
+        Apply filterset rules to post.
+
+        Args:
+            post: Post data
+            result: Current FilterResult
+
+        Returns:
+            Updated FilterResult (may be rejected)
+        """
+        # Get filterset configuration
+        filterset = self.config.get_filterset(result.filterset_name)
+
+        if not filterset:
+            logger.warning(f"Filterset '{result.filterset_name}' not found")
+            return result
+
+        # Apply post rules
+        post_rules = filterset.get('post_rules', {})
+        if not self._evaluate_rules(post, result, post_rules):
+            result.passed = False
+            logger.debug(f"Filter: Post {post.get('uuid', '')} rejected by filterset rules")
+            return result
+
+        # Post passed all rules
+        logger.debug(f"Filter: Post {post.get('uuid', '')} passed filterset '{result.filterset_name}'")
+        return result
+
+    def _evaluate_rules(
+        self,
+        post: Dict[str, Any],
+        result: FilterResult,
+        rules: Dict[str, Any]
+    ) -> bool:
+        """
+        Evaluate all rules for a post.
+
+        Returns:
+            True if post passes all rules, False otherwise
+        """
+        for field, condition in rules.items():
+            if not self._evaluate_condition(post, result, field, condition):
+                logger.debug(f"Filter: Failed condition '{field}': {condition}")
+                return False
+
+        return True
+
+    def _evaluate_condition(
+        self,
+        post: Dict[str, Any],
+        result: FilterResult,
+        field: str,
+        condition: Any
+    ) -> bool:
+        """
+        Evaluate a single condition.
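+
+        Illustrative: field 'score' with condition {'min': 10} passes for a
+        post with score 25 and fails for a post with score 3.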
+
+        Supported conditions:
+        - {"equals": value}
+        - {"not_equals": value}
+        - {"in": [values]}
+        - {"not_in": [values]}
+        - {"min": value}
+        - {"max": value}
+        - {"includes_any": [values]}
+        - {"excludes": [values]}
+
+        Args:
+            post: Post data
+            result: FilterResult with moderation data
+            field: Field path (e.g., "score", "moderation.flags.is_safe")
+            condition: Condition dict
+
+        Returns:
+            True if condition passes
+        """
+        # Get field value
+        value = self._get_field_value(post, result, field)
+
+        # Evaluate condition
+        if isinstance(condition, dict):
+            for op, expected in condition.items():
+                if op == 'equals':
+                    if value != expected:
+                        return False
+                elif op == 'not_equals':
+                    if value == expected:
+                        return False
+                elif op == 'in':
+                    if value not in expected:
+                        return False
+                elif op == 'not_in':
+                    if value in expected:
+                        return False
+                elif op == 'min':
+                    # A missing field can never satisfy a numeric bound
+                    if value is None or value < expected:
+                        return False
+                elif op == 'max':
+                    if value is None or value > expected:
+                        return False
+                elif op == 'includes_any':
+                    # Check if any expected value is in the field (for lists)
+                    if not isinstance(value, list):
+                        return False
+                    if not any(item in value for item in expected):
+                        return False
+                elif op == 'excludes':
+                    # Check that none of the excluded values are present
+                    if isinstance(value, list):
+                        if any(item in expected for item in value):
+                            return False
+                    elif value in expected:
+                        return False
+                else:
+                    logger.warning(f"Unknown condition operator: {op}")
+
+        return True
+
+    def _get_field_value(self, post: Dict[str, Any], result: FilterResult, field: str):
+        """
+        Get field value from post or result.
+
+        Supports nested fields like "moderation.flags.is_safe"
+        """
+        parts = field.split('.')
+
+        # Check if field is in moderation data
+        if parts[0] == 'moderation' and result.moderation_data:
+            value = result.moderation_data
+            for part in parts[1:]:
+                if isinstance(value, dict):
+                    value = value.get(part)
+                else:
+                    return None
+            return value
+
+        # Check post data
+        value = post
+        for part in parts:
+            if isinstance(value, dict):
+                value = value.get(part)
+            else:
+                return None
+
+        return value
diff --git a/filter_pipeline/stages/moderator.py b/filter_pipeline/stages/moderator.py
new file mode 100644
index 0000000..504bb7f
--- /dev/null
+++ b/filter_pipeline/stages/moderator.py
@@ -0,0 +1,153 @@
+"""
+Moderator Stage
+Safety and quality analysis using AI (cached by content hash).
+"""
+
+import logging
+from typing import Dict, Any
+from datetime import datetime
+
+from .base_stage import BaseStage
+from ..models import FilterResult, AIAnalysisResult
+from ..cache import FilterCache
+from ..ai_client import OpenRouterClient
+
+logger = logging.getLogger(__name__)
+
+
+class ModeratorStage(BaseStage):
+    """
+    Stage 2: Content moderation and quality analysis.
+
+    Uses AI to analyze safety, quality, and sentiment with content-hash based caching.
+    """
+
+    def __init__(self, config, cache: FilterCache):
+        super().__init__(config, cache)
+
+        # Initialize AI client if enabled
+        self.ai_client = None
+        if config.is_ai_enabled():
+            api_key = config.get_openrouter_key()
+            if api_key:
+                model = config.get_ai_model('cheap')  # Use cheap model
+                self.ai_client = OpenRouterClient(api_key, model)
+                logger.info("Moderator: AI client initialized")
+            else:
+                logger.warning("Moderator: AI enabled but no API key found")
+
+    def get_name(self) -> str:
+        return "Moderator"
+
+    def process(self, post: Dict[str, Any], result: FilterResult) -> FilterResult:
+        """
+        Moderate post for safety and quality.
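+
+        Illustrative outcome (values are hypothetical):
+
+            result.moderation_data             # {'violence': 0.0, ..., 'is_safe': True}
+            result.score_breakdown['quality']  # e.g. 0.7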
+
+        Args:
+            post: Post data
+            result: Current FilterResult
+
+        Returns:
+            Updated FilterResult with moderation data
+        """
+        title = post.get('title', '')
+        content = post.get('content', '')
+
+        # Use existing cache key from Categorizer
+        content_hash = result.cache_key or self.cache.compute_content_hash(title, content)
+
+        # Try to get cached AI analysis
+        cached_analysis = self.cache.get_ai_analysis(content_hash)
+
+        if cached_analysis and cached_analysis.moderation:
+            # Use cached moderation data
+            result.moderation_data = cached_analysis.moderation
+            result.score_breakdown['quality'] = cached_analysis.quality_score
+
+            logger.debug(f"Moderator: Cache hit for {content_hash[:8]}...")
+            return result
+
+        # No cache, need to moderate
+        if self.ai_client:
+            moderation, quality_score, sentiment = self._moderate_with_ai(title, content)
+        else:
+            # Fallback: Safe defaults
+            moderation, quality_score, sentiment = self._moderate_fallback(post)
+
+        # Update or create AI analysis result
+        if cached_analysis:
+            # Update existing analysis with moderation data
+            cached_analysis.moderation = moderation
+            cached_analysis.quality_score = quality_score
+            cached_analysis.sentiment = sentiment.get('sentiment')
+            cached_analysis.sentiment_score = sentiment.get('score', 0.0)
+            ai_analysis = cached_analysis
+        else:
+            # Create new analysis
+            ai_analysis = AIAnalysisResult(
+                content_hash=content_hash,
+                moderation=moderation,
+                quality_score=quality_score,
+                sentiment=sentiment.get('sentiment'),
+                sentiment_score=sentiment.get('score', 0.0),
+                analyzed_at=datetime.now(),
+                model_used=self.ai_client.model if self.ai_client else 'fallback'
+            )
+
+        # Cache AI analysis
+        self.cache.set_ai_analysis(content_hash, ai_analysis)
+
+        # Update result
+        result.moderation_data = moderation
+        result.score_breakdown['quality'] = quality_score
+        result.score_breakdown['sentiment'] = sentiment.get('score', 0.0)
+
+        logger.debug(f"Moderator: Analyzed {content_hash[:8]}... (quality: {quality_score:.2f})")
+        return result
+
+    def _moderate_with_ai(self, title: str, content: str) -> tuple:
+        """
+        Moderate using AI.
+
+        Returns:
+            (moderation dict, quality_score float, sentiment dict)
+        """
+        try:
+            # Run moderation
+            moderation = self.ai_client.moderate(title, content)
+
+            # Run quality scoring
+            quality_score = self.ai_client.score_quality(title, content)
+
+            # Run sentiment analysis
+            sentiment = self.ai_client.analyze_sentiment(title, content)
+
+            return moderation, quality_score, sentiment
+
+        except Exception as e:
+            logger.error(f"AI moderation failed: {e}")
+            return self._moderate_fallback({})
+
+    def _moderate_fallback(self, post: Dict[str, Any]) -> tuple:
+        """
+        Fallback moderation with safe defaults.
+
+        Returns:
+            (moderation dict, quality_score float, sentiment dict)
+        """
+        moderation = {
+            'violence': 0.0,
+            'sexual_content': 0.0,
+            'hate_speech': 0.0,
+            'harassment': 0.0,
+            'is_safe': True
+        }
+
+        quality_score = 0.5  # Neutral quality
+
+        sentiment = {
+            'sentiment': 'neutral',
+            'score': 0.0
+        }
+
+        return moderation, quality_score, sentiment
diff --git a/filter_pipeline/stages/ranker.py b/filter_pipeline/stages/ranker.py
new file mode 100644
index 0000000..69c5232
--- /dev/null
+++ b/filter_pipeline/stages/ranker.py
@@ -0,0 +1,201 @@
+"""
+Ranker Stage
+Score and rank posts based on quality, recency, and source.
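+
+Illustrative weighting (the weights are defined on RankerStage below):
+
+    score = 0.30*quality + 0.25*recency + 0.25*source_tier + 0.20*engagement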
+""" + +import logging +from typing import Dict, Any +from datetime import datetime + +from .base_stage import BaseStage +from ..models import FilterResult + +logger = logging.getLogger(__name__) + + +class RankerStage(BaseStage): + """ + Stage 4: Score and rank posts. + + Combines multiple factors: + - Quality score (from Moderator) + - Recency (how recent the post is) + - Source tier (platform/source reputation) + - User engagement (score, replies) + """ + + def __init__(self, config, cache): + super().__init__(config, cache) + + # Scoring weights + self.weights = { + 'quality': 0.3, + 'recency': 0.25, + 'source_tier': 0.25, + 'engagement': 0.20 + } + + # Source tiers (higher = better) + self.source_tiers = { + 'tier1': ['hackernews', 'arxiv', 'nature', 'science'], + 'tier2': ['reddit', 'stackoverflow', 'github'], + 'tier3': ['twitter', 'medium', 'dev.to'] + } + + def get_name(self) -> str: + return "Ranker" + + def process(self, post: Dict[str, Any], result: FilterResult) -> FilterResult: + """ + Calculate final score for post. + + Args: + post: Post data + result: Current FilterResult + + Returns: + Updated FilterResult with final score + """ + # Calculate component scores + quality_score = self._get_quality_score(result) + recency_score = self._calculate_recency_score(post) + source_score = self._calculate_source_score(post) + engagement_score = self._calculate_engagement_score(post) + + # Store breakdown + result.score_breakdown.update({ + 'quality': quality_score, + 'recency': recency_score, + 'source_tier': source_score, + 'engagement': engagement_score + }) + + # Calculate weighted final score + final_score = ( + quality_score * self.weights['quality'] + + recency_score * self.weights['recency'] + + source_score * self.weights['source_tier'] + + engagement_score * self.weights['engagement'] + ) + + result.score = final_score + + logger.debug( + f"Ranker: Post {post.get('uuid', '')[:8]}... score={final_score:.3f} " + f"(q:{quality_score:.2f}, r:{recency_score:.2f}, s:{source_score:.2f}, e:{engagement_score:.2f})" + ) + + return result + + def _get_quality_score(self, result: FilterResult) -> float: + """Get quality score from Moderator stage""" + return result.score_breakdown.get('quality', 0.5) + + def _calculate_recency_score(self, post: Dict[str, Any]) -> float: + """ + Calculate recency score based on post age. + + Returns: + Score 0.0-1.0 (1.0 = very recent, 0.0 = very old) + """ + timestamp = post.get('timestamp') + if not timestamp: + return 0.5 # Neutral if no timestamp + + try: + # Convert to datetime + if isinstance(timestamp, int): + post_time = datetime.fromtimestamp(timestamp) + else: + post_time = datetime.fromisoformat(str(timestamp)) + + # Calculate age in hours + age_seconds = (datetime.now() - post_time).total_seconds() + age_hours = age_seconds / 3600 + + # Scoring curve + if age_hours < 1: + return 1.0 + elif age_hours < 6: + return 0.9 + elif age_hours < 12: + return 0.75 + elif age_hours < 24: + return 0.6 + elif age_hours < 48: + return 0.4 + elif age_hours < 168: # 1 week + return 0.25 + else: + return 0.1 + + except Exception as e: + logger.debug(f"Error calculating recency: {e}") + return 0.5 + + def _calculate_source_score(self, post: Dict[str, Any]) -> float: + """ + Calculate source tier score. 
+
+        Returns:
+            Score 0.0-1.0 based on source reputation
+        """
+        platform = post.get('platform', '').lower()
+        source = post.get('source', '').lower()
+
+        # Check tier 1
+        if any(t in platform or t in source for t in self.source_tiers['tier1']):
+            return 1.0
+
+        # Check tier 2
+        if any(t in platform or t in source for t in self.source_tiers['tier2']):
+            return 0.7
+
+        # Check tier 3
+        if any(t in platform or t in source for t in self.source_tiers['tier3']):
+            return 0.5
+
+        # Unknown source
+        return 0.3
+
+    def _calculate_engagement_score(self, post: Dict[str, Any]) -> float:
+        """
+        Calculate engagement score based on upvotes/score and comments.
+
+        Returns:
+            Score 0.0-1.0 based on engagement metrics
+        """
+        score = post.get('score', 0)
+        replies = post.get('replies', 0)
+
+        # Normalize scores (logarithmic scale)
+
+        # Score component (0-1); each bracket joins the next continuously
+        if score <= 0:
+            score_component = 0.0
+        elif score < 10:
+            score_component = score / 100  # 0.0-0.1 (meets the 0.1 start of the next bracket)
+        elif score < 100:
+            score_component = 0.1 + (math.log10(score) - 1) * 0.3  # 0.1-0.4
+        elif score < 1000:
+            score_component = 0.4 + (math.log10(score) - 2) * 0.3  # 0.4-0.7
+        else:
+            score_component = min(1.0, 0.7 + (math.log10(score) - 3) * 0.1)  # 0.7-1.0
+
+        # Replies component (0-1)
+        if replies <= 0:
+            replies_component = 0.0
+        elif replies < 5:
+            replies_component = replies / 25  # 0.0-0.2 (meets the next bracket at 5)
+        elif replies < 20:
+            replies_component = 0.2 + (replies - 5) / 15 * 0.3  # 0.2-0.5
+        elif replies < 100:
+            replies_component = 0.5 + (math.log10(replies) - math.log10(20)) / (2 - math.log10(20)) * 0.3  # 0.5-0.8
+        else:
+            replies_component = min(1.0, 0.8 + (math.log10(replies) - 2) * 0.1)  # 0.8-1.0
+
+        # Weighted combination (score matters more than replies)
+        engagement_score = score_component * 0.7 + replies_component * 0.3
+
+        return engagement_score
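+
+# Worked example (illustrative): score=150 gives a component of roughly
+# 0.45 (0.4 + (log10(150) - 2) * 0.3); replies=10 gives 0.30; the combined
+# engagement score is about 0.45 * 0.7 + 0.30 * 0.3 = 0.41.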