Files
balanceboard/filter_lib.py
chelsea e821a26b48 Initial commit: BalanceBoard - Reddit-style content aggregator
- Flask-based web application with PostgreSQL
- User authentication and session management
- Content moderation and filtering
- Docker deployment with docker-compose
- Admin interface for content management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-11 16:11:13 -05:00

346 lines
11 KiB
Python

"""
Filter Library
Bare bones utilities for filtering posts and comments based on rules.
"""
import json
from pathlib import Path
from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod
class filter_lib:
    """Atomic filter utility functions.

    Every member is a static method; the class only serves as a namespace.
    """

    @staticmethod
    def load_filterset(path: str) -> Dict:
        """
        Load a filterset definition from a JSON file.

        Args:
            path: Location of the JSON file.

        Returns:
            The decoded JSON document.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file does not contain valid JSON.
        """
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def load_data_by_uuid(uuid: str, data_dir: str) -> Optional[Dict]:
        """
        Load ``<data_dir>/<uuid>.json``.

        Args:
            uuid: File stem of the JSON document to load.
            data_dir: Directory that holds the JSON documents.

        Returns:
            The decoded JSON document, or None when the file does not exist.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        # NOTE(review): ``uuid`` is interpolated into the path unchecked; if it
        # can come from untrusted input, validate it before calling this.
        file_path = Path(data_dir) / f"{uuid}.json"
        try:
            # Open directly instead of exists()+open(): no window in which the
            # file can disappear between the check and the read.
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (FileNotFoundError, NotADirectoryError):
            return None

    @staticmethod
    def merge_moderation(item: Dict, moderation_data: Dict) -> Dict:
        """
        Attach moderation data to ``item`` under the ``'moderation'`` key.

        The record is looked up in ``moderation_data`` by the item's
        ``'moderation_uuid'``; when there is no match an empty dict is stored.

        Args:
            item: Post or comment dict. It is modified in place.
            moderation_data: Mapping of moderation UUID to moderation record.

        Returns:
            The same ``item`` object, for call chaining.
        """
        mod_uuid = item.get('moderation_uuid')
        if mod_uuid and mod_uuid in moderation_data:
            item['moderation'] = moderation_data[mod_uuid]
        else:
            item['moderation'] = {}
        return item

    @staticmethod
    def get_nested_value(obj: Dict, path: str) -> Any:
        """
        Read a value from nested dicts using dot notation.

        Example path: ``'moderation.flags.is_safe'``.

        Args:
            obj: Root dict.
            path: Dot-separated sequence of keys.

        Returns:
            The value found at ``path``, or None when any step is missing or
            an intermediate value is not a dict.
        """
        value = obj
        for key in path.split('.'):
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return None
        return value

    @staticmethod
    def evaluate_rule(value: Any, operator: str, target: Any) -> bool:
        """
        Evaluate a single rule of the form ``value <operator> target``.

        Supported operators: equals, not_equals, in, not_in, min, max, after,
        before, contains, excludes, includes, includes_any, min_length,
        max_length.

        Args:
            value: Value taken from the item; None always fails the rule.
            operator: Operator name.
            target: Operand supplied by the rule definition.

        Returns:
            True when the rule passes. Unknown operators and operands whose
            types cannot be compared make the rule fail instead of raising,
            so one malformed record cannot abort a whole filter run.
        """
        if value is None:
            return False
        try:
            return filter_lib._evaluate(value, operator, target)
        except TypeError:
            # e.g. 'abc' >= 3, or membership test against a non-container.
            return False

    @staticmethod
    def _evaluate(value: Any, operator: str, target: Any) -> bool:
        """Operator dispatch for evaluate_rule; may raise TypeError."""
        if operator == 'equals':
            return value == target
        if operator == 'not_equals':
            return value != target
        if operator == 'in':
            return value in target
        if operator == 'not_in':
            return value not in target
        if operator == 'min':
            return value >= target
        if operator == 'max':
            return value <= target
        if operator == 'after':
            return value > target
        if operator == 'before':
            return value < target
        if operator == 'contains':
            return target in value
        if operator == 'excludes':
            if isinstance(value, list) and isinstance(
                    target, (list, tuple, set, frozenset, dict)):
                # Collection target: no element of value may appear in it.
                return not any(item in target for item in value)
            # Scalar (including string) target: plain membership test, so a
            # list entry 'ns' is not mistaken for a hit on target 'nsfw'.
            return target not in value
        if operator == 'includes':
            if isinstance(value, list):
                return target in value
            return False
        if operator == 'includes_any':
            # Special case for topic matching: each side is a list of dicts.
            if isinstance(value, list) and isinstance(target, list):
                for topic_item in value:
                    if not isinstance(topic_item, dict):
                        continue
                    for rule in target:
                        if not isinstance(rule, dict):
                            continue
                        if (topic_item.get('topic') == rule.get('topic') and
                                topic_item.get('confidence', 0) >= rule.get('confidence_min', 0)):
                            return True
            return False
        if operator == 'min_length':
            return len(str(value)) >= target
        if operator == 'max_length':
            return len(str(value)) <= target
        return False

    @staticmethod
    def apply_rules(item: Dict, rules: Dict) -> bool:
        """
        Apply multiple rules to ``item`` with AND logic.

        Rules format: ``{"field.path": {"operator": target}}``. A field may
        carry several operators; all of them must pass.

        Args:
            item: Post or comment dict.
            rules: Rule definitions keyed by dotted field path.

        Returns:
            True when every rule passes, or when ``rules`` is empty.
        """
        if not rules:
            return True  # Empty rules = pass all
        for field_path, rule_def in rules.items():
            value = filter_lib.get_nested_value(item, field_path)
            for operator, target in rule_def.items():
                if not filter_lib.evaluate_rule(value, operator, target):
                    return False
        return True
class CommentFilterMode(ABC):
    """Common interface implemented by every comment filtering strategy."""

    @staticmethod
    @abstractmethod
    def filter(comments: List[Dict], rules: Dict, moderation_data: Dict) -> List[Dict]:
        """
        Select comments according to ``rules`` and ``moderation_data``.

        Abstract: concrete modes provide the implementation.
        """
class TreePruningMode(CommentFilterMode):
    """
    Tree Pruning Filter Mode (Default)

    "Fruit of the poisonous tree": a comment that fails the rules is removed
    together with every reply underneath it.
    """

    @staticmethod
    def filter(comments: List[Dict], rules: Dict, moderation_data: Dict) -> List[Dict]:
        """
        Filter comments using tree pruning.

        Moderation data is merged into each input comment in place, the flat
        list is turned into a reply tree, failing branches are cut, and the
        survivors are returned as a flat list of node copies in depth-first
        order.
        """
        if not comments:
            return []

        for entry in comments:
            filter_lib.merge_moderation(entry, moderation_data)

        forest = TreePruningMode._build_tree(comments)
        survivors = TreePruningMode._prune_tree(forest, rules)
        return TreePruningMode._flatten_tree(survivors)

    @staticmethod
    def _build_tree(comments: List[Dict]) -> List[Dict]:
        """
        Build a nested tree from the flat comment list.

        Each comment is shallow-copied and given a ``'children'`` list.
        Comments whose ``'parent_comment_uuid'`` is empty or not present in
        the list become roots.
        """
        nodes_by_uuid = {}
        for source in comments:
            node = dict(source)
            node['children'] = []
            nodes_by_uuid[source['uuid']] = node

        top_level = []
        for source in comments:
            node = nodes_by_uuid[source['uuid']]
            parent_id = source.get('parent_comment_uuid')
            if parent_id and parent_id in nodes_by_uuid:
                siblings = nodes_by_uuid[parent_id]['children']
            else:
                siblings = top_level
            siblings.append(node)
        return top_level

    @staticmethod
    def _prune_tree(tree: List[Dict], rules: Dict) -> List[Dict]:
        """
        Recursively prune the tree.

        A node that fails the rules is dropped along with its whole subtree;
        a node that passes keeps only the children that pass as well.
        """
        kept = []
        for node in tree:
            if not filter_lib.apply_rules(node, rules):
                # Failing node: skip it, which discards its replies too.
                continue
            replies = node.get('children')
            if replies:
                node['children'] = TreePruningMode._prune_tree(replies, rules)
            kept.append(node)
        return kept

    @staticmethod
    def _flatten_tree(tree: List[Dict]) -> List[Dict]:
        """
        Flatten the tree back to a list in depth-first (parent first) order.

        The ``'children'`` key is removed from every node on the way.
        """
        flat = []
        # Explicit stack; entries are pushed reversed so they pop in order.
        pending = list(reversed(tree))
        while pending:
            node = pending.pop()
            replies = node.pop('children', [])
            flat.append(node)
            if replies:
                pending.extend(reversed(replies))
        return flat
class IndividualFilterMode(CommentFilterMode):
    """
    Individual Filter Mode

    Every comment is judged on its own; a failing parent does not affect
    its replies.
    """

    @staticmethod
    def filter(comments: List[Dict], rules: Dict, moderation_data: Dict) -> List[Dict]:
        """
        Filter comments individually.

        Moderation data is merged into each comment in place before the
        rules are applied; comments that pass are returned in input order.
        """
        # merge_moderation returns the comment it was given, so the merge and
        # the rule check can be chained per element.
        return [
            entry
            for entry in comments
            if filter_lib.apply_rules(
                filter_lib.merge_moderation(entry, moderation_data), rules
            )
        ]
class ScoreBasedFilterMode(CommentFilterMode):
    """
    Score-Based Filter Mode

    Filter comments based on score thresholds, keeping high-quality content.
    """

    # Floor applied when the rules give no explicit ``score.min``.
    _DEFAULT_MIN_SCORE = -1000

    @staticmethod
    def filter(comments: List[Dict], rules: Dict, moderation_data: Dict) -> List[Dict]:
        """
        Filter comments based on the generic rules plus a minimum score.

        Args:
            comments: Comment dicts; moderation data is merged into each one
                in place.
            rules: Rule definitions. ``rules['score']['min']`` sets the score
                floor; without it the floor is -1000.
            moderation_data: Mapping of moderation UUID to moderation record.

        Returns:
            The comments that pass the rules and whose score is at or above
            the floor, in input order. A missing or null score counts as 0
            for the floor check.
        """
        if not comments:
            return []
        # apply_rules accepts empty/None rules; keep the lookups below in
        # step with that instead of crashing on None.
        rules = rules or {}

        # The threshold does not depend on the comment, so resolve it once.
        score_rule = rules.get('score', {})
        if isinstance(score_rule, dict):
            min_score = score_rule.get('min', ScoreBasedFilterMode._DEFAULT_MIN_SCORE)
        else:
            min_score = ScoreBasedFilterMode._DEFAULT_MIN_SCORE

        filtered = []
        for comment in comments:
            filter_lib.merge_moderation(comment, moderation_data)

            # Generic rules first. A 'score' rule is evaluated here as well.
            if not filter_lib.apply_rules(comment, rules):
                continue

            score = comment.get('score')
            if score is None:
                # JSON null would otherwise raise on the comparison below.
                score = 0
            if score >= min_score:
                filtered.append(comment)
        return filtered
class TimeBoundFilterMode(CommentFilterMode):
    """
    Time-Bound Filter Mode

    Filter comments within specific time ranges.
    """

    @staticmethod
    def filter(comments: List[Dict], rules: Dict, moderation_data: Dict) -> List[Dict]:
        """
        Filter comments to those strictly inside the configured time window.

        Args:
            comments: Comment dicts; moderation data is merged into each one
                in place. ``comment['timestamp']`` is read as an ISO-8601
                string (a trailing ``Z`` is accepted).
            rules: Rule definitions. ``rules['timestamp']['after']`` and
                ``rules['timestamp']['before']`` are exclusive ISO-8601
                bounds; every other rule is applied generically.
            moderation_data: Mapping of moderation UUID to moderation record.

        Returns:
            Comments that pass the generic rules and fall inside the window,
            in input order. Comments with an unparseable timestamp are
            skipped. Comments without a timestamp are kept only when the
            rules contain no ``'timestamp'`` entry. If a bound itself cannot
            be parsed, no timestamped comment passes.
        """
        from datetime import datetime

        if not comments:
            return []
        rules = rules or {}

        def parse_iso(raw):
            # Normalise the 'Z' suffix, which fromisoformat() rejects on
            # Python versions before 3.11.
            return datetime.fromisoformat(raw.replace('Z', '+00:00'))

        time_rules = rules.get('timestamp', {})
        generic_rules = rules
        if isinstance(time_rules, dict):
            if 'timestamp' in rules:
                # 'after'/'before' are compared as datetimes below. Passing
                # them to apply_rules too would compare the raw strings,
                # which gives wrong answers when offsets or formats differ
                # (e.g. '...10:00:00-05:00' vs '...11:00:00Z').
                generic_rules = dict(rules)
                generic_rules['timestamp'] = {
                    op: tgt for op, tgt in time_rules.items()
                    if op not in ('after', 'before')
                }
        else:
            time_rules = {}

        # The bounds are the same for every comment: parse them once.
        after_time = None
        before_time = None
        bounds_valid = True
        try:
            if time_rules.get('after'):
                after_time = parse_iso(time_rules['after'])
            if time_rules.get('before'):
                before_time = parse_iso(time_rules['before'])
        except (ValueError, TypeError, AttributeError):
            bounds_valid = False

        filtered = []
        for comment in comments:
            filter_lib.merge_moderation(comment, moderation_data)

            if not filter_lib.apply_rules(comment, generic_rules):
                continue

            timestamp = comment.get('timestamp')
            if not timestamp:
                # No timestamp: include only if no time rules are configured.
                if 'timestamp' not in rules:
                    filtered.append(comment)
                continue

            if not bounds_valid:
                continue

            try:
                comment_time = parse_iso(timestamp)
                if after_time is not None and comment_time <= after_time:
                    continue
                if before_time is not None and comment_time >= before_time:
                    continue
            except (ValueError, TypeError, AttributeError):
                # Malformed or non-string timestamp, or a naive/aware
                # mismatch between comment and bound: skip the comment.
                continue
            filtered.append(comment)
        return filtered
class ContentLengthFilterMode(CommentFilterMode):
    """
    Content Length Filter Mode

    Filter comments based on content length criteria.
    """

    @staticmethod
    def filter(comments: List[Dict], rules: Dict, moderation_data: Dict) -> List[Dict]:
        """
        Filter comments by the generic rules plus the length of their content.

        Args:
            comments: Comment dicts; moderation data is merged into each one
                in place. The length is taken from ``comment['content']``.
            rules: Rule definitions. ``rules['content_length']['min']`` and
                ``['max']`` are inclusive bounds (defaults: 0 and infinity);
                every other rule is applied generically.
            moderation_data: Mapping of moderation UUID to moderation record.

        Returns:
            Comments that pass the generic rules and whose content length is
            within the bounds, in input order. Missing or null content counts
            as length 0.
        """
        if not comments:
            return []
        rules = rules or {}

        length_rules = rules.get('content_length', {})
        generic_rules = rules
        if isinstance(length_rules, dict):
            if 'content_length' in rules:
                # 'min'/'max' are checked against len(content) below.
                # apply_rules would instead look for a 'content_length'
                # field on the comment and reject the comment whenever that
                # field is absent, so keep those two operators out of the
                # generic pass.
                generic_rules = dict(rules)
                generic_rules['content_length'] = {
                    op: tgt for op, tgt in length_rules.items()
                    if op not in ('min', 'max')
                }
        else:
            length_rules = {}

        # Bounds are identical for every comment: resolve them once.
        min_length = length_rules.get('min', 0)
        max_length = length_rules.get('max', float('inf'))

        filtered = []
        for comment in comments:
            filter_lib.merge_moderation(comment, moderation_data)

            if not filter_lib.apply_rules(comment, generic_rules):
                continue

            content = comment.get('content')
            if content is None:
                # JSON null would otherwise make len() raise.
                content = ''
            if min_length <= len(content) <= max_length:
                filtered.append(comment)
        return filtered