Initial commit: BalanceBoard - Reddit-style content aggregator
- Flask-based web application with PostgreSQL
- User authentication and session management
- Content moderation and filtering
- Docker deployment with docker-compose
- Admin interface for content management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
data_collection.py · 390 lines · new file
@@ -0,0 +1,390 @@
#!/usr/bin/env python3
"""
Data Collection Script
Collects posts and comments from multiple platforms with UUID-based storage.
Functional approach - no classes, just functions.
"""

import json
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from data_collection_lib import data_methods

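# On-disk layout (as produced by the storage functions below):
#
#   <storage_dir>/
#       posts/<uuid>.json          one JSON file per post
#       comments/<uuid>.json       one JSON file per comment
#       moderation/<uuid>.json     one moderation stub per post/comment
#       post_index.json            maps "<platform>_<post_id>" -> post UUID
#       collection_state.json      last_run timestamp and batch counters
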
# ===== STORAGE FUNCTIONS =====

def ensure_directories(storage_dir: str) -> Dict[str, Path]:
    """Create and return directory paths"""
    base = Path(storage_dir)

    dirs = {
        'posts': base / 'posts',
        'comments': base / 'comments',
        'moderation': base / 'moderation',
        'base': base
    }

    for path in dirs.values():
        path.mkdir(parents=True, exist_ok=True)

    return dirs


def load_index(storage_dir: str) -> Dict:
    """Load post index from disk"""
    index_file = Path(storage_dir) / 'post_index.json'

    if index_file.exists():
        with open(index_file, 'r') as f:
            index = json.load(f)
        print(f"Loaded index with {len(index)} posts")
        return index

    return {}


def save_index(index: Dict, storage_dir: str):
    """Save post index to disk"""
    index_file = Path(storage_dir) / 'post_index.json'
    with open(index_file, 'w') as f:
        json.dump(index, f, indent=2)


def load_state(storage_dir: str) -> Dict:
    """Load collection state from disk"""
    state_file = Path(storage_dir) / 'collection_state.json'

    if state_file.exists():
        with open(state_file, 'r') as f:
            state = json.load(f)
        print(f"Loaded collection state: {state.get('last_run', 'never')}")
        return state

    return {}


def save_state(state: Dict, storage_dir: str):
    """Save collection state to disk"""
    state_file = Path(storage_dir) / 'collection_state.json'
    with open(state_file, 'w') as f:
        json.dump(state, f, indent=2)


def generate_uuid() -> str:
    """Generate a new UUID"""
    return str(uuid.uuid4())

# ===== MODERATION FUNCTIONS =====

# Every saved post and comment gets a moderation record. "stub-1.0" records
# mark the target as safe and unflagged; they are placeholder values that a
# real moderation pass would presumably replace.

def create_moderation_stub(target_id: str, target_type: str, dirs: Dict) -> str:
    """Create moderation stub file and return UUID"""
    mod_uuid = generate_uuid()

    moderation_data = {
        "target_id": target_id,
        "target_type": target_type,
        "analyzed_at": int(datetime.now().timestamp()),
        "model_version": "stub-1.0",
        "flags": {
            "requires_review": False,
            "is_blocked": False,
            "is_flagged": False,
            "is_safe": True
        }
    }

    mod_file = dirs['moderation'] / f"{mod_uuid}.json"
    with open(mod_file, 'w') as f:
        json.dump(moderation_data, f, indent=2)

    return mod_uuid

# ===== POST FUNCTIONS =====

def save_post(post: Dict, platform: str, index: Dict, dirs: Dict) -> str:
    """Save post to UUID-based file, return UUID"""
    post_id = f"{platform}_{post['id']}"

    # Check if already exists
    if post_id in index:
        return index[post_id]

    # Generate UUID and save
    post_uuid = generate_uuid()
    post['uuid'] = post_uuid
    post['moderation_uuid'] = create_moderation_stub(post_id, 'post', dirs)

    post_file = dirs['posts'] / f"{post_uuid}.json"
    with open(post_file, 'w') as f:
        json.dump(post, f, indent=2)

    # Update index
    index[post_id] = post_uuid

    return post_uuid

# ===== COMMENT FUNCTIONS =====

def save_comment(comment: Dict, post_uuid: str, platform: str, dirs: Dict) -> str:
    """Save comment to UUID-based file, return UUID"""
    comment_uuid = generate_uuid()

    comment['uuid'] = comment_uuid
    comment['post_uuid'] = post_uuid
    comment['platform'] = platform
    comment['moderation_uuid'] = create_moderation_stub(
        f"{platform}_comment_{comment['id']}",
        'comment',
        dirs
    )

    comment_file = dirs['comments'] / f"{comment_uuid}.json"
    with open(comment_file, 'w') as f:
        json.dump(comment, f, indent=2)

    return comment_uuid


def fetch_and_save_comments(post: Dict, platform: str, dirs: Dict, max_comments: int = 50) -> List[str]:
    """Fetch comments for post and save them, return list of UUIDs"""
    comments = []
    post_id = post.get('id')

    # Fetch comments based on platform
    if platform == 'reddit':
        source = post.get('source', '').replace('r/', '')
        comments = data_methods.comment_fetchers.fetch_reddit_comments(post_id, source, max_comments)
    elif platform == 'hackernews':
        if post_id.startswith('hn_'):
            story_id = post_id[3:]
            comments = data_methods.comment_fetchers.fetch_hackernews_comments(story_id, max_comments)

    # Save comments with parent UUID mapping
    # (assumes the fetchers return comments parent-before-child, so a parent's
    # UUID is already in the map when its replies are processed; otherwise the
    # reply's parent_comment_uuid falls back to None)
    comment_uuid_map = {}
    comment_uuids = []
    post_uuid = post.get('uuid')

    for comment in comments:
        # Map parent ID to UUID
        parent_id = comment.get('parent_comment_id')
        if parent_id and parent_id in comment_uuid_map:
            comment['parent_comment_uuid'] = comment_uuid_map[parent_id]
        else:
            comment['parent_comment_uuid'] = None

        # Save comment
        comment_uuid = save_comment(comment, post_uuid, platform, dirs)
        comment_uuid_map[comment['id']] = comment_uuid
        comment_uuids.append(comment_uuid)

    return comment_uuids

# ===== COLLECTION FUNCTIONS =====

def collect_platform(platform: str, community: str, start_date: str, end_date: str,
                     max_posts: int, fetch_comments: bool, index: Dict, dirs: Dict) -> int:
    """Collect posts and comments from a platform, return count of new posts"""
    print(f"\nCollecting from {platform}" + (f"/{community}" if community else ""))

    try:
        # Fetch posts
        new_posts = data_methods.getData(platform, start_date, end_date, community, max_posts)

        if not new_posts:
            print(" No posts retrieved")
            return 0

        print(f" Retrieved {len(new_posts)} posts")

        # Process each post
        added_count = 0
        for post in new_posts:
            post_id = f"{platform}_{post['id']}"

            # Skip if already collected
            if post_id in index:
                continue

            # Save post
            post_uuid = save_post(post, platform, index, dirs)
            added_count += 1

            # Fetch and save comments
            if fetch_comments:
                comment_uuids = fetch_and_save_comments(post, platform, dirs)
                if comment_uuids:
                    print(f" Post {post['id']}: saved {len(comment_uuids)} comments")

        if added_count > 0:
            print(f" Added {added_count} new posts")

        return added_count

    except Exception as e:
        print(f" Error: {e}")
        import traceback
        traceback.print_exc()
        return 0

def calculate_date_range(days_back: int, state: Dict) -> Tuple[str, str]:
    """Calculate start and end dates for collection, considering resume"""
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days_back)

    # Resume from last run if recent
    if state.get('last_run'):
        last_run = datetime.fromisoformat(state['last_run'])
        if (end_date - last_run).total_seconds() < 3600:  # Less than 1 hour ago
            print(f"Last run was {last_run.isoformat()}, resuming from that point")
            start_date = last_run

    return start_date.isoformat(), end_date.isoformat()

def collect_batch(sources: List[Dict], storage_dir: str, days_back: int = 1, fetch_comments: bool = True):
    """Main collection function - orchestrates everything"""

    # Setup
    dirs = ensure_directories(storage_dir)
    index = load_index(storage_dir)
    state = load_state(storage_dir)

    # Calculate date range
    start_iso, end_iso = calculate_date_range(days_back, state)

    print(f"\n{'='*60}")
    print(f"Collection Period: {start_iso} to {end_iso}")
    print(f"Fetch comments: {fetch_comments}")
    print(f"{'='*60}")

    # Collect from each source
    total_new = 0
    for source in sources:
        platform = source['platform']
        community = source.get('community', '')
        max_posts = source.get('max_posts', 100)

        count = collect_platform(
            platform, community, start_iso, end_iso,
            max_posts, fetch_comments, index, dirs
        )
        total_new += count

    # Update and save state
    state['last_run'] = end_iso
    state['total_posts'] = len(index)
    state['last_batch_count'] = total_new

    save_index(index, storage_dir)
    save_state(state, storage_dir)

    print(f"\n{'='*60}")
    print("Collection Complete")
    print(f" New posts this run: {total_new}")
    print(f" Total posts in stash: {len(index)}")
    print(f"{'='*60}\n")

def get_stats(storage_dir: str) -> Dict:
    """Get collection statistics"""
    dirs = ensure_directories(storage_dir)
    index = load_index(storage_dir)
    state = load_state(storage_dir)

    post_count = len(list(dirs['posts'].glob('*.json')))
    comment_count = len(list(dirs['comments'].glob('*.json')))
    moderation_count = len(list(dirs['moderation'].glob('*.json')))

    return {
        'total_posts': post_count,
        'total_comments': comment_count,
        'total_moderation_records': moderation_count,
        'index_entries': len(index),
        'last_run': state.get('last_run', 'never'),
        'storage_dir': storage_dir
    }

def print_stats(storage_dir: str):
    """Print collection statistics"""
    stats = get_stats(storage_dir)

    print(f"\n{'='*60}")
    print("Collection Statistics")
    print(f"{'='*60}")
    print(f"Total posts: {stats['total_posts']}")
    print(f"Total comments: {stats['total_comments']}")
    print(f"Total moderation records: {stats['total_moderation_records']}")
    print(f"Index entries: {stats['index_entries']}")
    print(f"Last run: {stats['last_run']}")
    print(f"Storage: {stats['storage_dir']}")
    print(f"{'='*60}\n")

# ===== MAIN ENTRY POINT =====

def load_platform_config(config_file: str = "./platform_config.json") -> Dict:
    """Load platform configuration from JSON file"""
    try:
        with open(config_file, 'r') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error loading platform config: {e}")
        # Return minimal fallback config
        return {
            "collection_targets": [
                {'platform': 'reddit', 'community': 'python', 'max_posts': 50, 'priority': 'high'},
                {'platform': 'reddit', 'community': 'programming', 'max_posts': 50, 'priority': 'high'},
                {'platform': 'hackernews', 'community': 'front_page', 'max_posts': 50, 'priority': 'high'},
            ]
        }

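# Example platform_config.json (shape inferred from the fallback above and
# from get_collection_sources below; values are illustrative):
#
# {
#     "collection_targets": [
#         {"platform": "reddit", "community": "python", "max_posts": 50, "priority": "high"},
#         {"platform": "hackernews", "community": "front_page", "max_posts": 50, "priority": "low"}
#     ]
# }
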
def get_collection_sources(config: Dict, priority_filter: Optional[str] = None) -> List[Dict]:
    """Extract collection sources from platform config, optionally filtered by priority"""
    sources = []

    for target in config.get('collection_targets', []):
        # Apply priority filter if specified
        if priority_filter and target.get('priority') != priority_filter:
            continue

        sources.append({
            'platform': target['platform'],
            'community': target['community'],
            'max_posts': target['max_posts']
        })

    return sources

def main():
    """Main entry point"""
    storage_dir = "./data"

    # Load platform configuration
    platform_config = load_platform_config()

    # Get collection sources (all priorities for comprehensive collection)
    sources = get_collection_sources(platform_config)

    print(f"Loaded {len(sources)} collection targets from platform configuration")
    for source in sources:
        print(f" - {source['platform']}/{source['community']}: {source['max_posts']} posts")

    # Collect posts and comments
    collect_batch(sources, storage_dir, days_back=1, fetch_comments=True)

    # Print statistics
    print_stats(storage_dir)

if __name__ == "__main__":
    main()
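A minimal usage sketch, assuming data_collection.py is importable as data_collection and that data_collection_lib is available. The source list here is illustrative; it follows the dict shape that collect_batch consumes:

# usage_example.py - hypothetical driver, not part of this commit
from data_collection import collect_batch, print_stats

sources = [
    {'platform': 'reddit', 'community': 'python', 'max_posts': 25},
    {'platform': 'hackernews', 'community': 'front_page', 'max_posts': 25},
]

# Collect the last two days of posts, including comment threads
collect_batch(sources, "./data", days_back=2, fetch_comments=True)

# Report what is now on disk
print_stats("./data")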