"""
Filter Library

Bare bones utilities for filtering posts and comments based on rules.

Rules are plain dicts of the form ``{"field.path": {"operator": target}}``.
``filter_lib`` holds the atomic helpers; the ``CommentFilterMode`` subclasses
combine them into whole-list filtering strategies.
"""

import json
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple


class filter_lib:
    """Atomic filter utility functions"""

    @staticmethod
    def load_filterset(path: str) -> Dict:
        """Load filterset JSON from file.

        Args:
            path: Location of the JSON file.

        Returns:
            The decoded JSON document.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def load_data_by_uuid(uuid: str, data_dir: str) -> Optional[Dict]:
        """Load single JSON file by UUID.

        Args:
            uuid: Base name (without ``.json``) of the file to read.
            data_dir: Directory that contains the file.

        Returns:
            The decoded JSON document, or ``None`` when ``<uuid>.json`` does
            not exist in ``data_dir``.
        """
        file_path = Path(data_dir) / f"{uuid}.json"
        # EAFP: a separate exists() check could race with a concurrent delete.
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    @staticmethod
    def merge_moderation(item: Dict, moderation_data: Dict) -> Dict:
        """Merge item with its moderation data by UUID.

        The item is modified in place: its ``'moderation'`` key is set to the
        entry of ``moderation_data`` keyed by the item's ``'moderation_uuid'``,
        or to ``{}`` when there is no such entry.

        Args:
            item: Post or comment dict; mutated.
            moderation_data: Mapping of moderation UUID to moderation record.
                ``None`` is treated as an empty mapping.

        Returns:
            The same ``item`` object, for convenience.
        """
        mod_uuid = item.get('moderation_uuid')
        if mod_uuid and moderation_data and mod_uuid in moderation_data:
            item['moderation'] = moderation_data[mod_uuid]
        else:
            item['moderation'] = {}
        return item

    @staticmethod
    def get_nested_value(obj: Dict, path: str) -> Any:
        """Get value from nested dict using dot notation.

        Example: ``'moderation.flags.is_safe'``.

        Returns:
            The value found at ``path``, or ``None`` as soon as a path segment
            is missing or an intermediate value is not a dict.
        """
        current = obj
        for key in path.split('.'):
            if not isinstance(current, dict) or key not in current:
                return None
            current = current[key]
        return current

    @staticmethod
    def evaluate_rule(value: Any, operator: str, target: Any) -> bool:
        """Evaluate single rule: value operator target.

        Supported operators: ``equals``, ``not_equals``, ``in``, ``not_in``,
        ``min``, ``max``, ``after``, ``before``, ``contains``, ``excludes``,
        ``includes``, ``includes_any``, ``min_length``, ``max_length``.

        Returns:
            ``True`` when the rule holds. ``False`` when it does not, when
            ``value`` is ``None``, when the operator is unknown, or when the
            value and target cannot be compared (for example ordering a
            string against a number).
        """
        if value is None:
            return False
        try:
            return bool(filter_lib._compare(value, operator, target))
        except TypeError:
            # Incomparable types: fail this rule for this item instead of
            # aborting the whole filtering run.
            return False

    @staticmethod
    def _compare(value: Any, operator: str, target: Any) -> bool:
        """Apply ``operator`` to a non-None ``value``; may raise TypeError."""
        if operator == 'equals':
            return value == target
        if operator == 'not_equals':
            return value != target
        if operator == 'in':
            return value in target
        if operator == 'not_in':
            return value not in target
        if operator == 'min':
            return value >= target
        if operator == 'max':
            return value <= target
        if operator == 'after':
            return value > target
        if operator == 'before':
            return value < target
        if operator == 'contains':
            return target in value
        if operator == 'excludes':
            if isinstance(value, list):
                # No element of the value may appear in the target.
                return not any(item in target for item in value)
            return target not in value
        if operator == 'includes':
            return isinstance(value, list) and target in value
        if operator == 'includes_any':
            return filter_lib._includes_any_topic(value, target)
        if operator == 'min_length':
            return len(str(value)) >= target
        if operator == 'max_length':
            return len(str(value)) <= target
        return False

    @staticmethod
    def _includes_any_topic(value: Any, target: Any) -> bool:
        """Special case for topic matching.

        True when any dict in ``value`` has the same ``'topic'`` as a dict in
        ``target`` and a ``'confidence'`` (default 0) of at least that
        target's ``'confidence_min'`` (default 0). Non-dict entries on either
        side are skipped.
        """
        if not (isinstance(value, list) and isinstance(target, list)):
            return False
        wanted = [rule for rule in target if isinstance(rule, dict)]
        for topic_item in value:
            if not isinstance(topic_item, dict):
                continue
            for rule in wanted:
                if (topic_item.get('topic') == rule.get('topic')
                        and topic_item.get('confidence', 0)
                        >= rule.get('confidence_min', 0)):
                    return True
        return False

    @staticmethod
    def apply_rules(item: Dict, rules: Dict) -> bool:
        """
        Apply multiple rules to item, return True if all pass (AND logic).

        Rules format: {"field.path": {"operator": value}}

        A field may carry several operators; every one of them must hold.
        Empty or missing rules pass every item.
        """
        if not rules:
            return True  # Empty rules = pass all

        for field_path, rule_def in rules.items():
            value = filter_lib.get_nested_value(item, field_path)
            # Support multiple operators per field
            for operator, target in rule_def.items():
                if not filter_lib.evaluate_rule(value, operator, target):
                    return False
        return True

    @staticmethod
    def _split_field_rules(rules: Optional[Dict], field: str,
                           operators: Tuple[str, ...]) -> Tuple[Dict, Dict]:
        """Separate the operators a filter mode evaluates itself.

        Returns ``(generic_rules, own_rules)``. ``own_rules`` holds the
        entries of ``rules[field]`` whose operator is listed in ``operators``.
        ``generic_rules`` is a copy of ``rules`` without those entries, to be
        passed to ``apply_rules``. ``rules`` itself is not modified.
        """
        if not rules:
            return {}, {}
        rule_def = rules.get(field)
        if not isinstance(rule_def, dict):
            return rules, {}

        own = {op: rule_def[op] for op in operators if op in rule_def}
        remaining = {op: tgt for op, tgt in rule_def.items()
                     if op not in operators}
        generic = {key: val for key, val in rules.items() if key != field}
        if remaining:
            generic[field] = remaining
        return generic, own


class CommentFilterMode(ABC):
    """Abstract base class for comment filtering modes"""

    @staticmethod
    @abstractmethod
    def filter(comments: List[Dict], rules: Dict,
               moderation_data: Dict) -> List[Dict]:
        """Filter comments based on rules and moderation data.

        Override in subclasses.
        """
        pass


class TreePruningMode(CommentFilterMode):
    """
    Tree Pruning Filter Mode (Default)

    Fruit of the poisonous tree: if parent fails moderation, remove all
    children.
    """

    @staticmethod
    def filter(comments: List[Dict], rules: Dict,
               moderation_data: Dict) -> List[Dict]:
        """
        Filter comments using tree pruning.

        Build tree structure, evaluate from root down, prune toxic branches.

        Moderation data is merged into the given comment dicts in place. The
        returned list contains shallow copies of the surviving comments in
        depth-first (parent before replies) order.
        """
        if not comments:
            return []

        # Merge moderation data into comments
        for comment in comments:
            filter_lib.merge_moderation(comment, moderation_data)

        tree = TreePruningMode._build_tree(comments)
        pruned = TreePruningMode._prune_tree(tree, rules)
        return TreePruningMode._flatten_tree(pruned)

    @staticmethod
    def _build_tree(comments: List[Dict]) -> List[Dict]:
        """Build nested tree from flat comment list.

        Each node is a shallow copy of a comment with a ``'children'`` list.
        A comment whose ``'parent_comment_uuid'`` is empty or not present in
        ``comments`` becomes a root.
        """
        comment_map = {c['uuid']: {**c, 'children': []} for c in comments}

        roots = []
        for comment in comments:
            node = comment_map[comment['uuid']]
            parent_uuid = comment.get('parent_comment_uuid')
            if parent_uuid and parent_uuid in comment_map:
                comment_map[parent_uuid]['children'].append(node)
            else:
                roots.append(node)
        return roots

    @staticmethod
    def _prune_tree(tree: List[Dict], rules: Dict) -> List[Dict]:
        """
        Prune tree. If node fails rules, remove it and all children.

        Uses an explicit work stack rather than recursion so that very deep
        reply chains cannot exceed the interpreter recursion limit.
        """
        pruned: List[Dict] = []
        # Each entry pairs a list of candidate nodes with the list that
        # collects the ones that survive.
        pending = [(tree, pruned)]
        while pending:
            nodes, kept = pending.pop()
            for node in nodes:
                if not filter_lib.apply_rules(node, rules):
                    # Node fails: it and its whole subtree are discarded.
                    continue
                children = node.get('children')
                if children:
                    surviving_children: List[Dict] = []
                    node['children'] = surviving_children
                    pending.append((children, surviving_children))
                kept.append(node)
        return pruned

    @staticmethod
    def _flatten_tree(tree: List[Dict]) -> List[Dict]:
        """Flatten tree back to list (pre-order), dropping ``'children'``."""
        flat = []
        # Reversed pushes make the stack pop nodes in their original order.
        stack = list(reversed(tree))
        while stack:
            node = stack.pop()
            children = node.pop('children', [])
            flat.append(node)
            stack.extend(reversed(children))
        return flat


class IndividualFilterMode(CommentFilterMode):
    """
    Individual Filter Mode

    Each comment evaluated independently, no tree pruning.
    """

    @staticmethod
    def filter(comments: List[Dict], rules: Dict,
               moderation_data: Dict) -> List[Dict]:
        """Filter comments individually.

        Moderation data is merged into every comment in place; the comments
        that satisfy all rules are returned in their original order.
        """
        for comment in comments:
            filter_lib.merge_moderation(comment, moderation_data)
        return [c for c in comments if filter_lib.apply_rules(c, rules)]


class ScoreBasedFilterMode(CommentFilterMode):
    """
    Score-Based Filter Mode

    Filter comments based on score thresholds, keeping high-quality content.
    """

    # Threshold used when the rules carry no ``score.min`` entry.
    DEFAULT_MIN_SCORE = -1000

    @staticmethod
    def filter(comments: List[Dict], rules: Dict,
               moderation_data: Dict) -> List[Dict]:
        """Filter comments based on score and rules.

        ``rules['score']['min']`` is evaluated here rather than by the
        generic rule engine, so a comment with no ``'score'`` is treated as
        having score 0 instead of being rejected outright. All other rules
        (including other operators on ``score``) go through
        ``filter_lib.apply_rules``.
        """
        generic_rules, score_rules = filter_lib._split_field_rules(
            rules, 'score', ('min',))
        min_score = score_rules.get('min')
        if min_score is None:
            min_score = ScoreBasedFilterMode.DEFAULT_MIN_SCORE

        filtered = []
        for comment in comments:
            filter_lib.merge_moderation(comment, moderation_data)

            if not filter_lib.apply_rules(comment, generic_rules):
                continue

            score = comment.get('score')
            if score is None:
                score = 0
            try:
                if score >= min_score:
                    filtered.append(comment)
            except TypeError:
                # Non-numeric score or threshold: cannot qualify.
                continue
        return filtered


class TimeBoundFilterMode(CommentFilterMode):
    """
    Time-Bound Filter Mode

    Filter comments within specific time ranges.
    """

    @staticmethod
    def _parse_time(raw: Any) -> datetime:
        """Parse an ISO-8601 string into a timezone-aware datetime.

        A trailing ``Z`` is accepted as UTC. Values without an offset are
        treated as UTC so that they can be compared with aware values.

        Raises:
            TypeError: If ``raw`` is not a string.
            ValueError: If ``raw`` is not a valid ISO-8601 timestamp.
        """
        if not isinstance(raw, str):
            raise TypeError('timestamp must be a string')
        parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @staticmethod
    def filter(comments: List[Dict], rules: Dict,
               moderation_data: Dict) -> List[Dict]:
        """Filter comments within time bounds.

        ``rules['timestamp']['after']`` and ``['before']`` are exclusive
        bounds compared as datetimes (not as raw strings), so timestamps
        with different UTC offsets compare correctly. All other rules go
        through ``filter_lib.apply_rules``.

        Comments with an unparsable timestamp are skipped. Comments without
        a timestamp are kept only when ``rules`` has no ``'timestamp'``
        entry. If a bound itself cannot be parsed, nothing matches.
        """
        rules = rules or {}
        generic_rules, time_rules = filter_lib._split_field_rules(
            rules, 'timestamp', ('after', 'before'))
        has_time_rules = 'timestamp' in rules

        # Parse the bounds once; they do not change per comment.
        after_time = before_time = None
        bounds_valid = True
        try:
            if time_rules.get('after'):
                after_time = TimeBoundFilterMode._parse_time(
                    time_rules['after'])
            if time_rules.get('before'):
                before_time = TimeBoundFilterMode._parse_time(
                    time_rules['before'])
        except (ValueError, TypeError):
            bounds_valid = False

        filtered = []
        for comment in comments:
            filter_lib.merge_moderation(comment, moderation_data)

            if not bounds_valid:
                continue
            if not filter_lib.apply_rules(comment, generic_rules):
                continue

            timestamp = comment.get('timestamp')
            if not timestamp:
                # No timestamp, include if no time rules
                if not has_time_rules:
                    filtered.append(comment)
                continue

            try:
                comment_time = TimeBoundFilterMode._parse_time(timestamp)
            except (ValueError, TypeError):
                # Skip malformed timestamps
                continue

            if after_time is not None and comment_time <= after_time:
                continue
            if before_time is not None and comment_time >= before_time:
                continue
            filtered.append(comment)
        return filtered


class ContentLengthFilterMode(CommentFilterMode):
    """
    Content Length Filter Mode

    Filter comments based on content length criteria.
    """

    @staticmethod
    def filter(comments: List[Dict], rules: Dict,
               moderation_data: Dict) -> List[Dict]:
        """Filter comments based on content length.

        ``rules['content_length']`` may carry ``'min'`` and/or ``'max'``
        (both inclusive). They are checked against ``len(comment['content'])``
        here, not by the generic rule engine, because comments need not have
        a ``content_length`` field. Missing or ``None`` content counts as
        length 0. All other rules go through ``filter_lib.apply_rules``.
        """
        generic_rules, length_rules = filter_lib._split_field_rules(
            rules, 'content_length', ('min', 'max'))
        min_length = length_rules.get('min', 0)
        max_length = length_rules.get('max', float('inf'))

        filtered = []
        for comment in comments:
            filter_lib.merge_moderation(comment, moderation_data)

            if not filter_lib.apply_rules(comment, generic_rules):
                continue

            content = comment.get('content') or ''
            if min_length <= len(content) <= max_length:
                filtered.append(comment)
        return filtered