#!/usr/bin/env python3
"""
Data Collection Script

Collects posts and comments from multiple platforms with UUID-based storage.
Functional approach - no classes, just functions.
"""

import json
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Optional, Tuple

from data_collection_lib import data_methods


# ===== STORAGE FUNCTIONS =====

def ensure_directories(storage_dir: str) -> Dict[str, Path]:
    """Create and return directory paths"""
    base = Path(storage_dir)
    dirs = {
        'posts': base / 'posts',
        'comments': base / 'comments',
        'moderation': base / 'moderation',
        'base': base
    }
    for path in dirs.values():
        path.mkdir(parents=True, exist_ok=True)
    return dirs


def load_index(storage_dir: str) -> Dict:
    """Load post index from disk"""
    index_file = Path(storage_dir) / 'post_index.json'
    if index_file.exists():
        with open(index_file, 'r') as f:
            index = json.load(f)
        print(f"Loaded index with {len(index)} posts")
        return index
    return {}


def save_index(index: Dict, storage_dir: str):
    """Save post index to disk"""
    index_file = Path(storage_dir) / 'post_index.json'
    with open(index_file, 'w') as f:
        json.dump(index, f, indent=2)


def load_state(storage_dir: str) -> Dict:
    """Load collection state from disk"""
    state_file = Path(storage_dir) / 'collection_state.json'
    if state_file.exists():
        with open(state_file, 'r') as f:
            state = json.load(f)
        print(f"Loaded collection state: {state.get('last_run', 'never')}")
        return state
    return {}


def save_state(state: Dict, storage_dir: str):
    """Save collection state to disk"""
    state_file = Path(storage_dir) / 'collection_state.json'
    with open(state_file, 'w') as f:
        json.dump(state, f, indent=2)


def generate_uuid() -> str:
    """Generate a new UUID"""
    return str(uuid.uuid4())


# ===== MODERATION FUNCTIONS =====

def create_moderation_stub(target_id: str, target_type: str, dirs: Dict) -> str:
    """Create moderation stub file and return UUID"""
    mod_uuid = generate_uuid()
    moderation_data = {
        "target_id": target_id,
        "target_type": target_type,
        "analyzed_at": int(datetime.now().timestamp()),
        "model_version": "stub-1.0",
        "flags": {
            "requires_review": False,
            "is_blocked": False,
            "is_flagged": False,
            "is_safe": True
        }
    }
    mod_file = dirs['moderation'] / f"{mod_uuid}.json"
    with open(mod_file, 'w') as f:
        json.dump(moderation_data, f, indent=2)
    return mod_uuid


# ===== POST FUNCTIONS =====

def save_post(post: Dict, platform: str, index: Dict, dirs: Dict) -> str:
    """Save post to UUID-based file, return UUID"""
    post_id = f"{platform}_{post['id']}"

    # Check if already exists
    if post_id in index:
        return index[post_id]

    # Generate UUID and save
    post_uuid = generate_uuid()
    post['uuid'] = post_uuid
    post['moderation_uuid'] = create_moderation_stub(post_id, 'post', dirs)

    post_file = dirs['posts'] / f"{post_uuid}.json"
    with open(post_file, 'w') as f:
        json.dump(post, f, indent=2)

    # Update index
    index[post_id] = post_uuid
    return post_uuid


# ===== COMMENT FUNCTIONS =====

def save_comment(comment: Dict, post_uuid: str, platform: str, dirs: Dict) -> str:
    """Save comment to UUID-based file, return UUID"""
    comment_uuid = generate_uuid()
    comment['uuid'] = comment_uuid
    comment['post_uuid'] = post_uuid
    comment['platform'] = platform
    comment['moderation_uuid'] = create_moderation_stub(
        f"{platform}_comment_{comment['id']}", 'comment', dirs
    )

    comment_file = dirs['comments'] / f"{comment_uuid}.json"
    with open(comment_file, 'w') as f:
        json.dump(comment, f, indent=2)
    return comment_uuid
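
# Illustrative only: reading saved comments back from disk.
# `load_comments_for_post` is a hypothetical helper, not called anywhere in the
# pipeline; it is a minimal sketch assuming only the layout save_comment writes
# (comments/<uuid>.json files carrying a 'post_uuid' field).

def load_comments_for_post(post_uuid: str, storage_dir: str) -> List[Dict]:
    """Return all saved comments belonging to one post (illustrative sketch)."""
    comments = []
    for comment_file in (Path(storage_dir) / 'comments').glob('*.json'):
        with open(comment_file, 'r') as f:
            comment = json.load(f)
        # Comments reference their parent post via the 'post_uuid' field
        if comment.get('post_uuid') == post_uuid:
            comments.append(comment)
    return comments
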
def fetch_and_save_comments(post: Dict, platform: str, dirs: Dict,
                            max_comments: int = 50) -> List[str]:
    """Fetch comments for post and save them, return list of UUIDs"""
    comments = []
    post_id = post.get('id')

    # Fetch comments based on platform
    if platform == 'reddit':
        source = post.get('source', '').replace('r/', '')
        comments = data_methods.comment_fetchers.fetch_reddit_comments(post_id, source, max_comments)
    elif platform == 'hackernews':
        if post_id.startswith('hn_'):
            story_id = post_id[3:]
            comments = data_methods.comment_fetchers.fetch_hackernews_comments(story_id, max_comments)

    # Save comments with parent UUID mapping
    comment_uuid_map = {}
    comment_uuids = []
    post_uuid = post.get('uuid')

    for comment in comments:
        # Map parent ID to UUID
        parent_id = comment.get('parent_comment_id')
        if parent_id and parent_id in comment_uuid_map:
            comment['parent_comment_uuid'] = comment_uuid_map[parent_id]
        else:
            comment['parent_comment_uuid'] = None

        # Save comment
        comment_uuid = save_comment(comment, post_uuid, platform, dirs)
        comment_uuid_map[comment['id']] = comment_uuid
        comment_uuids.append(comment_uuid)

    return comment_uuids


# ===== COLLECTION FUNCTIONS =====

def collect_platform(platform: str, community: str, start_date: str, end_date: str,
                     max_posts: int, fetch_comments: bool, index: Dict, dirs: Dict) -> int:
    """Collect posts and comments from a platform, return count of new posts"""
    print(f"\nCollecting from {platform}" + (f"/{community}" if community else ""))

    try:
        # Fetch posts
        new_posts = data_methods.getData(platform, start_date, end_date, community, max_posts)

        if not new_posts:
            print("  No posts retrieved")
            return 0

        print(f"  Retrieved {len(new_posts)} posts")

        # Process each post
        added_count = 0
        for post in new_posts:
            post_id = f"{platform}_{post['id']}"

            # Skip if already collected
            if post_id in index:
                continue

            # Save post
            post_uuid = save_post(post, platform, index, dirs)
            added_count += 1

            # Fetch and save comments
            if fetch_comments:
                comment_uuids = fetch_and_save_comments(post, platform, dirs)
                if comment_uuids:
                    print(f"  Post {post['id']}: saved {len(comment_uuids)} comments")

        if added_count > 0:
            print(f"  Added {added_count} new posts")
        return added_count

    except Exception as e:
        print(f"  Error: {e}")
        import traceback
        traceback.print_exc()
        return 0


def calculate_date_range(days_back: int, state: Dict) -> Tuple[str, str]:
    """Calculate start and end dates for collection, considering resume"""
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days_back)

    # Resume from last run if recent
    if state.get('last_run'):
        last_run = datetime.fromisoformat(state['last_run'])
        if (end_date - last_run).total_seconds() < 3600:  # Less than 1 hour ago
            print(f"Last run was {last_run.isoformat()}, resuming from that point")
            start_date = last_run

    return start_date.isoformat(), end_date.isoformat()
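
# Illustrative only: how the resume rule above behaves, shown with a synthetic
# state dict (values invented for the example):
#
#   state = {'last_run': (datetime.now() - timedelta(minutes=30)).isoformat()}
#   start_iso, end_iso = calculate_date_range(days_back=1, state=state)
#   # start_iso == state['last_run'], because the last run was under an hour
#   # ago; with an older last_run, start_iso falls back to now - days_back.
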
def collect_batch(sources: List[Dict], storage_dir: str, days_back: int = 1,
                  fetch_comments: bool = True):
    """Main collection function - orchestrates everything"""
    # Setup
    dirs = ensure_directories(storage_dir)
    index = load_index(storage_dir)
    state = load_state(storage_dir)

    # Calculate date range
    start_iso, end_iso = calculate_date_range(days_back, state)

    print(f"\n{'='*60}")
    print(f"Collection Period: {start_iso} to {end_iso}")
    print(f"Fetch comments: {fetch_comments}")
    print(f"{'='*60}")

    # Collect from each source
    total_new = 0
    for source in sources:
        platform = source['platform']
        community = source.get('community', '')
        max_posts = source.get('max_posts', 100)

        count = collect_platform(
            platform, community, start_iso, end_iso,
            max_posts, fetch_comments, index, dirs
        )
        total_new += count

    # Update and save state
    state['last_run'] = end_iso
    state['total_posts'] = len(index)
    state['last_batch_count'] = total_new

    save_index(index, storage_dir)
    save_state(state, storage_dir)

    print(f"\n{'='*60}")
    print("Collection Complete")
    print(f"  New posts this run: {total_new}")
    print(f"  Total posts in stash: {len(index)}")
    print(f"{'='*60}\n")


def get_stats(storage_dir: str) -> Dict:
    """Get collection statistics"""
    dirs = ensure_directories(storage_dir)
    index = load_index(storage_dir)
    state = load_state(storage_dir)

    post_count = len(list(dirs['posts'].glob('*.json')))
    comment_count = len(list(dirs['comments'].glob('*.json')))
    moderation_count = len(list(dirs['moderation'].glob('*.json')))

    return {
        'total_posts': post_count,
        'total_comments': comment_count,
        'total_moderation_records': moderation_count,
        'index_entries': len(index),
        'last_run': state.get('last_run', 'never'),
        'storage_dir': storage_dir
    }


def print_stats(storage_dir: str):
    """Print collection statistics"""
    stats = get_stats(storage_dir)
    print(f"\n{'='*60}")
    print("Collection Statistics")
    print(f"{'='*60}")
    print(f"Total posts: {stats['total_posts']}")
    print(f"Total comments: {stats['total_comments']}")
    print(f"Total moderation records: {stats['total_moderation_records']}")
    print(f"Index entries: {stats['index_entries']}")
    print(f"Last run: {stats['last_run']}")
    print(f"Storage: {stats['storage_dir']}")
    print(f"{'='*60}\n")


# ===== MAIN ENTRY POINT =====

def load_platform_config(config_file: str = "./platform_config.json") -> Dict:
    """Load platform configuration from JSON file"""
    try:
        with open(config_file, 'r') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error loading platform config: {e}")
        # Return minimal fallback config
        return {
            "collection_targets": [
                {'platform': 'reddit', 'community': 'python', 'max_posts': 50, 'priority': 'high'},
                {'platform': 'reddit', 'community': 'programming', 'max_posts': 50, 'priority': 'high'},
                {'platform': 'hackernews', 'community': 'front_page', 'max_posts': 50, 'priority': 'high'},
            ]
        }


def get_collection_sources(config: Dict, priority_filter: Optional[str] = None) -> List[Dict]:
    """Extract collection sources from platform config, optionally filtered by priority"""
    sources = []
    for target in config.get('collection_targets', []):
        # Apply priority filter if specified
        if priority_filter and target.get('priority') != priority_filter:
            continue
        sources.append({
            'platform': target['platform'],
            'community': target['community'],
            'max_posts': target['max_posts']
        })
    return sources


def main():
    """Main entry point"""
    storage_dir = "./data"

    # Load platform configuration
    platform_config = load_platform_config()

    # Get collection sources (all priorities for comprehensive collection)
    sources = get_collection_sources(platform_config)
    print(f"Loaded {len(sources)} collection targets from platform configuration")
    for source in sources:
        print(f"  - {source['platform']}/{source['community']}: {source['max_posts']} posts")

    # Collect posts and comments
    collect_batch(sources, storage_dir, days_back=1, fetch_comments=True)

    # Print statistics
    print_stats(storage_dir)


if __name__ == "__main__":
    main()
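
# Example platform_config.json (illustrative; it mirrors the fallback config in
# load_platform_config). 'platform', 'community', and 'max_posts' are required
# by get_collection_sources; 'priority' is only consulted when a
# priority_filter is passed.
#
# {
#   "collection_targets": [
#     {"platform": "reddit", "community": "python", "max_posts": 50, "priority": "high"},
#     {"platform": "hackernews", "community": "front_page", "max_posts": 50, "priority": "high"}
#   ]
# }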