- Docker deployment ready
- Content aggregation and filtering
- User authentication
- Polling service for updates
🤖 Generated with Claude Code
346 lines
11 KiB
Python
346 lines
11 KiB
Python
"""
|
|
Filter Library
|
|
Bare bones utilities for filtering posts and comments based on rules.
|
|
"""
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional
|
|
from abc import ABC, abstractmethod
|
|
|
|
|
|
class filter_lib:
    """Atomic filter utility functions.

    A namespace of stateless static helpers: JSON loading, moderation merging,
    dotted-path lookup, and rule evaluation.
    """

    @staticmethod
    def load_filterset(path: str) -> Dict:
        """Load filterset JSON from file.

        Args:
            path: Location of the JSON document.

        Returns:
            The decoded JSON content.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the content is not valid JSON.
        """
        # Read as UTF-8 explicitly instead of relying on the locale encoding.
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def load_data_by_uuid(uuid: str, data_dir: str) -> Optional[Dict]:
        """Load the single JSON file named ``<uuid>.json`` inside ``data_dir``.

        Args:
            uuid: Base name of the file (without the ``.json`` suffix).
            data_dir: Directory that holds the JSON files.

        Returns:
            The decoded JSON content, or ``None`` when the file does not exist.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        file_path = Path(data_dir) / f"{uuid}.json"
        # Attempt the open directly rather than checking existence first, so a
        # file removed between the check and the open cannot raise.
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (FileNotFoundError, NotADirectoryError):
            return None

    @staticmethod
    def merge_moderation(item: Dict, moderation_data: Dict) -> Dict:
        """Attach the matching moderation record to ``item`` under 'moderation'.

        The record is looked up in ``moderation_data`` by the item's
        'moderation_uuid'. When the item has no (truthy) uuid, or no record is
        found, 'moderation' is set to an empty dict.

        Args:
            item: Post or comment dict; it is modified in place.
            moderation_data: Mapping of moderation uuid to moderation record.
                ``None`` is treated like an empty mapping.

        Returns:
            The same ``item`` object that was passed in.
        """
        mod_uuid = item.get('moderation_uuid')
        if mod_uuid and moderation_data and mod_uuid in moderation_data:
            item['moderation'] = moderation_data[mod_uuid]
        else:
            item['moderation'] = {}
        return item

    @staticmethod
    def get_nested_value(obj: Dict, path: str) -> Any:
        """Get value from nested dict using dot notation (e.g., 'moderation.flags.is_safe').

        Args:
            obj: Dict to walk.
            path: Keys joined by '.'.

        Returns:
            The value found at the end of the path, or ``None`` as soon as a
            key is missing or a non-dict is reached. A stored ``None`` is
            therefore indistinguishable from a missing key.
        """
        value = obj
        for key in path.split('.'):
            if not isinstance(value, dict) or key not in value:
                return None
            value = value[key]
        return value

    @staticmethod
    def evaluate_rule(value: Any, operator: str, target: Any) -> bool:
        """Evaluate single rule: value operator target.

        Supported operators: 'equals', 'not_equals', 'in', 'not_in', 'min',
        'max', 'after', 'before', 'contains', 'excludes', 'includes',
        'includes_any', 'min_length', 'max_length'.

        Args:
            value: Value taken from the item.
            operator: Operator name.
            target: Value the rule compares against.

        Returns:
            The outcome of the comparison. ``False`` is returned when ``value``
            is ``None`` (this also applies to the negative operators), when the
            operator is unknown, and when the operands cannot be compared.
        """
        if value is None:
            return False
        try:
            return filter_lib._evaluate_operator(value, operator, target)
        except TypeError:
            # Incomparable operands (e.g. str vs int for 'min', or a
            # non-container target for 'in') fail the rule instead of aborting
            # the whole filter run.
            return False

    @staticmethod
    def _evaluate_operator(value: Any, operator: str, target: Any) -> bool:
        """Apply ``operator`` to a non-None ``value``; may raise TypeError."""
        if operator == 'equals':
            return value == target
        if operator == 'not_equals':
            return value != target
        if operator == 'in':
            return value in target
        if operator == 'not_in':
            return value not in target
        if operator == 'min':
            return value >= target
        if operator == 'max':
            return value <= target
        if operator == 'after':
            return value > target
        if operator == 'before':
            return value < target
        if operator == 'contains':
            return target in value
        if operator == 'excludes':
            if isinstance(value, list):
                # List value: none of its entries may appear in target.
                return not any(entry in target for entry in value)
            return target not in value
        if operator == 'includes':
            # Only list values can "include" a target.
            return isinstance(value, list) and target in value
        if operator == 'includes_any':
            # Special case for topic matching: both sides are lists of dicts.
            if not (isinstance(value, list) and isinstance(target, list)):
                return False
            return any(
                entry.get('topic') == wanted.get('topic')
                and entry.get('confidence', 0) >= wanted.get('confidence_min', 0)
                for entry in value if isinstance(entry, dict)
                for wanted in target if isinstance(wanted, dict)
            )
        if operator == 'min_length':
            # Length is measured on the string form of the value.
            return len(str(value)) >= target
        if operator == 'max_length':
            return len(str(value)) <= target
        return False

    @staticmethod
    def apply_rules(item: Dict, rules: Dict) -> bool:
        """
        Apply multiple rules to item, return True if all pass (AND logic).

        Rules format: {"field.path": {"operator": value}}

        Args:
            item: Dict the rules are evaluated against.
            rules: Mapping of dotted field path to a dict of operators.

        Returns:
            ``True`` when every operator of every field passes, or when
            ``rules`` is empty; ``False`` at the first failing operator.
        """
        if not rules:
            return True  # Empty rules = pass all

        for field_path, rule_def in rules.items():
            value = filter_lib.get_nested_value(item, field_path)
            # Support multiple operators per field
            for operator, target in rule_def.items():
                if not filter_lib.evaluate_rule(value, operator, target):
                    return False
        return True
|
|
|
|
|
|
class CommentFilterMode(ABC):
    """Common interface shared by every comment filtering strategy."""

    @staticmethod
    @abstractmethod
    def filter(comments: List[Dict], rules: Dict, moderation_data: Dict) -> List[Dict]:
        """Select comments according to rules and moderation data.

        Concrete modes must override this method; the base implementation
        does nothing and yields ``None``.
        """
|
|
|
|
|
|
class TreePruningMode(CommentFilterMode):
    """
    Tree Pruning Filter Mode (Default)

    Fruit of the poisonous tree: if parent fails moderation, remove all children.
    """

    @staticmethod
    def filter(comments: List[Dict], rules: Dict, moderation_data: Dict) -> List[Dict]:
        """
        Filter comments using tree pruning.

        Build tree structure, evaluate from root down, prune toxic branches.

        Args:
            comments: Flat list of comment dicts. Each needs a 'uuid'; replies
                point at their parent through 'parent_comment_uuid'. The dicts
                get a 'moderation' entry merged in place.
            rules: Rules in the format understood by ``filter_lib.apply_rules``.
            moderation_data: Mapping of moderation uuid to moderation record.

        Returns:
            The surviving comments as a flat list in depth-first, parent-first
            order. The returned dicts are shallow copies of the inputs without
            a 'children' key.
        """
        if not comments:
            return []

        # Merge moderation data into comments
        for comment in comments:
            filter_lib.merge_moderation(comment, moderation_data)

        tree = TreePruningMode._build_tree(comments)
        pruned = TreePruningMode._prune_tree(tree, rules)
        return TreePruningMode._flatten_tree(pruned)

    @staticmethod
    def _build_tree(comments: List[Dict]) -> List[Dict]:
        """Build nested tree from flat comment list.

        Every comment is shallow-copied into a node with a fresh 'children'
        list. A comment whose parent uuid is missing, falsy, or not part of
        ``comments`` becomes a root.
        """
        comment_map = {c['uuid']: {**c, 'children': []} for c in comments}

        roots = []
        for comment in comments:
            node = comment_map[comment['uuid']]
            parent_uuid = comment.get('parent_comment_uuid')
            if parent_uuid and parent_uuid in comment_map:
                comment_map[parent_uuid]['children'].append(node)
            else:
                roots.append(node)
        return roots

    @staticmethod
    def _prune_tree(tree: List[Dict], rules: Dict) -> List[Dict]:
        """
        Prune tree level by level.

        If node fails rules, remove it and all children.

        Uses an explicit work list instead of recursion so that very deep
        reply chains cannot exhaust the interpreter's recursion limit.
        """
        pruned = []
        # Each entry pairs the nodes still to check with the list that
        # collects the ones that pass.
        pending = [(tree, pruned)]
        while pending:
            candidates, survivors = pending.pop()
            for node in candidates:
                if not filter_lib.apply_rules(node, rules):
                    # Failing node: it and its whole subtree are discarded.
                    continue
                survivors.append(node)
                children = node.get('children')
                if children:
                    kept_children = []
                    node['children'] = kept_children
                    pending.append((children, kept_children))
        return pruned

    @staticmethod
    def _flatten_tree(tree: List[Dict]) -> List[Dict]:
        """Flatten tree back to list (parent before children, siblings in order).

        The 'children' key is removed from every node that is emitted.
        """
        flat = []
        # Explicit stack; items are pushed reversed so they pop in original order.
        stack = list(reversed(tree))
        while stack:
            node = stack.pop()
            children = node.pop('children', [])
            flat.append(node)
            if children:
                stack.extend(reversed(children))
        return flat
|
|
|
|
|
|
class IndividualFilterMode(CommentFilterMode):
    """
    Individual Filter Mode

    Judges every comment on its own; a rejected parent has no effect on
    its replies.
    """

    @staticmethod
    def filter(comments: List[Dict], rules: Dict, moderation_data: Dict) -> List[Dict]:
        """Return the comments that satisfy ``rules``, in input order.

        Each comment has its moderation record merged in place right before
        the rules are evaluated against it.
        """

        def passes(comment: Dict) -> bool:
            # Moderation must be attached first so rules can reference it.
            filter_lib.merge_moderation(comment, moderation_data)
            return filter_lib.apply_rules(comment, rules)

        return [comment for comment in comments if passes(comment)]
|
|
|
|
|
|
class ScoreBasedFilterMode(CommentFilterMode):
    """
    Score-Based Filter Mode

    Filter comments based on score thresholds, keeping high-quality content.
    """

    # Threshold used when the rules do not define a minimum score.
    _DEFAULT_MIN_SCORE = -1000

    @staticmethod
    def filter(comments: List[Dict], rules: Dict, moderation_data: Dict) -> List[Dict]:
        """Filter comments based on score and rules.

        A comment is kept when it passes ``filter_lib.apply_rules`` and its
        'score' is at least ``rules['score']['min']`` (default -1000). A
        missing or null score counts as 0 for the threshold check.

        Args:
            comments: Comment dicts; 'moderation' is merged into each in place.
            rules: Rules in the format understood by ``filter_lib.apply_rules``;
                may be empty or ``None``.
            moderation_data: Mapping of moderation uuid to moderation record.

        Returns:
            The comments that pass, in input order.
        """
        # The threshold does not depend on the comment, so resolve it once.
        score_rule = (rules or {}).get('score') or {}
        min_score = score_rule.get('min')
        if min_score is None:
            min_score = ScoreBasedFilterMode._DEFAULT_MIN_SCORE

        filtered = []
        for comment in comments:
            filter_lib.merge_moderation(comment, moderation_data)

            # Apply basic rules first
            if not filter_lib.apply_rules(comment, rules):
                continue

            score = comment.get('score')
            if score is None:
                # An explicit null would otherwise make the comparison raise.
                score = 0

            if score >= min_score:
                filtered.append(comment)

        return filtered
|
|
|
|
|
|
class TimeBoundFilterMode(CommentFilterMode):
    """
    Time-Bound Filter Mode

    Filter comments within specific time ranges.
    """

    @staticmethod
    def filter(comments: List[Dict], rules: Dict, moderation_data: Dict) -> List[Dict]:
        """Filter comments within time bounds.

        ``rules['timestamp']`` may carry 'after' and/or 'before' ISO-8601
        strings; both bounds are exclusive. These two operators are compared
        as datetimes here and are kept out of the generic rule pass, which
        would compare the raw strings. All other rules, including any other
        operator on 'timestamp', go through ``filter_lib.apply_rules``.

        Args:
            comments: Comment dicts; 'moderation' is merged into each in place.
            rules: Rules in the format understood by ``filter_lib.apply_rules``;
                may be empty or ``None``.
            moderation_data: Mapping of moderation uuid to moderation record.

        Returns:
            The comments that pass, in input order. Comments with an
            unparseable timestamp are skipped. Comments without a timestamp
            are kept only when ``rules`` has no 'timestamp' entry. If a bound
            in the rules cannot be parsed, nothing is returned.
        """
        from datetime import datetime

        def parse_iso(stamp):
            # Spell a 'Z' suffix as an explicit UTC offset before parsing.
            return datetime.fromisoformat(stamp.replace('Z', '+00:00'))

        rules = rules or {}
        has_time_rules = 'timestamp' in rules
        time_rules = rules.get('timestamp') or {}
        after = time_rules.get('after')
        before = time_rules.get('before')

        # Parse the bounds once instead of once per comment.
        after_time = before_time = None
        bounds_valid = True
        try:
            if after:
                after_time = parse_iso(after)
            if before:
                before_time = parse_iso(before)
        except (ValueError, TypeError, AttributeError):
            bounds_valid = False

        # Rules for the generic pass: everything except the two operators
        # that are evaluated as datetimes below.
        generic_rules = dict(rules)
        if has_time_rules:
            other_ops = {
                op: target for op, target in time_rules.items()
                if op not in ('after', 'before')
            }
            if other_ops:
                generic_rules['timestamp'] = other_ops
            else:
                del generic_rules['timestamp']

        filtered = []
        for comment in comments:
            filter_lib.merge_moderation(comment, moderation_data)

            if not bounds_valid:
                # Unusable bound: no comment can be shown to lie inside it.
                continue

            # Apply basic rules first
            if not filter_lib.apply_rules(comment, generic_rules):
                continue

            timestamp = comment.get('timestamp')
            if not timestamp:
                # No timestamp, include if no time rules
                if not has_time_rules:
                    filtered.append(comment)
                continue

            try:
                comment_time = parse_iso(timestamp)
                if after_time is not None and comment_time <= after_time:
                    continue
                if before_time is not None and comment_time >= before_time:
                    continue
            except (ValueError, TypeError, AttributeError):
                # Skip malformed or non-string timestamps, and timestamps that
                # cannot be compared with the bounds (naive vs. aware).
                continue

            filtered.append(comment)

        return filtered
|
|
|
|
|
|
class ContentLengthFilterMode(CommentFilterMode):
    """
    Content Length Filter Mode

    Filter comments based on content length criteria.
    """

    @staticmethod
    def filter(comments: List[Dict], rules: Dict, moderation_data: Dict) -> List[Dict]:
        """Filter comments based on content length.

        ``rules['content_length']`` may carry 'min' and/or 'max' (inclusive),
        checked against ``len(comment['content'])``. That entry describes a
        derived value, so it is kept out of the generic rule pass, which would
        look for a literal 'content_length' field on the comment. All other
        rules go through ``filter_lib.apply_rules``.

        Args:
            comments: Comment dicts; 'moderation' is merged into each in place.
            rules: Rules in the format understood by ``filter_lib.apply_rules``;
                may be empty or ``None``.
            moderation_data: Mapping of moderation uuid to moderation record.

        Returns:
            The comments that pass, in input order. Missing or null content
            counts as length 0.
        """
        rules = rules or {}
        length_rules = rules.get('content_length') or {}
        min_length = length_rules.get('min', 0)
        max_length = length_rules.get('max', float('inf'))

        # Everything except the derived length rule is a plain field rule.
        field_rules = {
            field: rule_def for field, rule_def in rules.items()
            if field != 'content_length'
        }

        filtered = []
        for comment in comments:
            filter_lib.merge_moderation(comment, moderation_data)

            # Apply basic rules first
            if not filter_lib.apply_rules(comment, field_rules):
                continue

            # A null 'content' is treated like an empty string.
            content = comment.get('content') or ''

            if min_length <= len(content) <= max_length:
                filtered.append(comment)

        return filtered
|