import requests
import json
import logging
import datetime as dt
import time
from collections import defaultdict, deque
from urllib.parse import urlparse

logger = logging.getLogger(__name__)


class RateLimiter:
    """
    Simple rate limiter to prevent excessive API calls.
    Tracks requests per domain and enforces delays.
    """

    def __init__(self):
        self.request_times = defaultdict(deque)  # domain -> deque of timestamps
        self.domain_limits = {
            'reddit.com': {'requests': 60, 'window': 60},                   # 60 requests per minute
            'api.stackexchange.com': {'requests': 300, 'window': 86400},    # 300 per day
            'hacker-news.firebaseio.com': {'requests': 300, 'window': 60},  # 300 per minute
            'lobste.rs': {'requests': 30, 'window': 60},                    # 30 per minute
            'default': {'requests': 60, 'window': 60}                       # Default rate limit
        }

    def wait_if_needed(self, url: str):
        """
        Check rate limit and wait if necessary before making a request.

        Args:
            url: The URL being requested
        """
        domain = urlparse(url).netloc
        # Normalize 'www.' prefixes so e.g. 'www.reddit.com' matches the 'reddit.com' limit
        if domain.startswith('www.'):
            domain = domain[4:]
        current_time = time.time()

        # Get rate limit for this domain
        limit_config = self.domain_limits.get(domain, self.domain_limits['default'])
        max_requests = limit_config['requests']
        time_window = limit_config['window']

        # Get request times for this domain
        times = self.request_times[domain]

        # Remove requests outside the time window
        cutoff_time = current_time - time_window
        while times and times[0] < cutoff_time:
            times.popleft()

        # Check if we're at the rate limit
        if len(times) >= max_requests:
            # Calculate how long to wait
            oldest_request = times[0]
            wait_time = time_window - (current_time - oldest_request)
            if wait_time > 0:
                logger.info(f"Rate limit reached for {domain}. Waiting {wait_time:.1f}s")
                time.sleep(wait_time)

        # Record this request
        times.append(current_time)


# Global rate limiter instance
_rate_limiter = RateLimiter()


# A collection of static methods to grab Reddit and Reddit-like data from various sources
class data_methods():

    @staticmethod
    def getData(platform, start_date, end_date, community, max_posts):
        if platform == "reddit":
            return data_methods.fetchers.getRedditData(start_date, end_date, community, max_posts)
        elif platform == "pushshift":
            return data_methods.fetchers.getPushshiftData(start_date, end_date, community, max_posts)
        elif platform == "hackernews":
            return data_methods.fetchers.getHackerNewsData(start_date, end_date, community, max_posts)
        elif platform == "lobsters":
            return data_methods.fetchers.getLobstersData(start_date, end_date, community, max_posts)
        elif platform == "stackexchange":
            return data_methods.fetchers.getStackExchangeData(start_date, end_date, community, max_posts)
        elif platform == "rss":
            return data_methods.fetchers.getRSSData(start_date, end_date, community, max_posts)
        else:
            print("dataGrab.getData: platform not recognized")
            return None

    # ===== ATOMIC UTILITY FUNCTIONS =====
    class utils():
        """Generic utility functions used across all fetchers"""

        @staticmethod
        def http_get_json(url, headers=None, params=None, timeout=30, max_retries=3):
            """
            Generic HTTP GET request that returns JSON with comprehensive error handling.

            Args:
                url: Target URL
                headers: HTTP headers
                params: Query parameters
                timeout: Request timeout in seconds
                max_retries: Maximum number of retry attempts

            Returns:
                JSON response data

            Raises:
                requests.RequestException: On persistent failure after retries
            """
            for attempt in range(max_retries + 1):
                try:
                    # Add retry delay for subsequent attempts
                    if attempt > 0:
                        delay = min(2 ** attempt, 30)  # Exponential backoff, max 30s
                        logger.info(f"Retrying request to {url} in {delay}s (attempt {attempt + 1}/{max_retries + 1})")
                        time.sleep(delay)

                    # Apply rate limiting before making the request
                    _rate_limiter.wait_if_needed(url)

                    response = requests.get(url, headers=headers, params=params, timeout=timeout)

                    # Handle different HTTP status codes
                    if response.status_code == 429:  # Rate limited
                        retry_after = int(response.headers.get('Retry-After', 60))
                        if attempt < max_retries:
                            logger.warning(f"Rate limited. Waiting {retry_after}s before retry")
                            time.sleep(retry_after)
                            continue

                    response.raise_for_status()

                    # Validate JSON response
                    try:
                        json_data = response.json()
                        return json_data
                    except ValueError as e:
                        logger.error(f"Invalid JSON response from {url}: {e}")
                        if attempt < max_retries:
                            continue
                        raise requests.RequestException(f"Invalid JSON response: {e}")

                except requests.exceptions.Timeout:
                    logger.warning(f"Request timeout for {url} (attempt {attempt + 1})")
                    if attempt == max_retries:
                        raise
                except requests.exceptions.ConnectionError:
                    logger.warning(f"Connection error for {url} (attempt {attempt + 1})")
                    if attempt == max_retries:
                        raise
                except requests.exceptions.HTTPError as e:
                    # Don't retry on client errors (4xx) except rate limiting
                    if 400 <= e.response.status_code < 500 and e.response.status_code != 429:
                        logger.error(f"Client error {e.response.status_code} for {url}: {e}")
                        raise
                    logger.warning(f"HTTP error {e.response.status_code} for {url} (attempt {attempt + 1})")
                    if attempt == max_retries:
                        raise
                except Exception as e:
                    logger.error(f"Unexpected error for {url}: {e}")
                    if attempt == max_retries:
                        raise

            raise requests.RequestException(f"Failed to fetch {url} after {max_retries + 1} attempts")

        @staticmethod
        def filter_by_date_range(posts, start_date, end_date):
            """Filter posts by timestamp range"""
            start_ts = int(dt.datetime.fromisoformat(start_date).timestamp())
            end_ts = int(dt.datetime.fromisoformat(end_date).timestamp())
            return [p for p in posts if p and start_ts <= p['timestamp'] <= end_ts]

        @staticmethod
        def convert_iso_to_timestamp(iso_string):
            """Convert ISO format datetime string to Unix timestamp"""
            return int(dt.datetime.fromisoformat(iso_string.replace('Z', '+00:00')).timestamp())

    # ===== URL AND PARAMETER BUILDERS =====
    class builders():
        """Functions to build URLs, headers, and parameters"""

        @staticmethod
        def build_reddit_url(subreddit):
            return f"https://www.reddit.com/r/{subreddit}/new.json"

        @staticmethod
        def build_reddit_headers():
            return {'User-Agent': 'Mozilla/5.0 (compatible; DataCollector/1.0)'}

        @staticmethod
        def build_reddit_params(limit):
            return {'limit': limit}

        @staticmethod
        def build_reddit_search_url(subreddit, start_date, end_date):
            """Build Reddit search URL for time-based queries"""
            return f"https://www.reddit.com/r/{subreddit}/search.json"

        @staticmethod
        def build_reddit_search_params(limit, start_date, end_date):
            """Build search parameters for the Reddit API with time constraints"""
            # Convert date strings to timestamps for the Reddit search query
            try:
                start_ts = int(dt.datetime.fromisoformat(start_date.replace('Z',
                    '+00:00')).timestamp())
                end_ts = int(dt.datetime.fromisoformat(end_date.replace('Z', '+00:00')).timestamp())

                # Note: Reddit's public search does not officially support timestamp range
                # operators, so this query is best-effort; results are filtered by date
                # again client-side via utils.filter_by_date_range after fetching.
                query = f"after:{start_ts} before:{end_ts}"

                return {
                    'q': query,
                    'sort': 'new',
                    'restrict_sr': 'true',  # Restrict to subreddit
                    'limit': limit,
                    't': 'all'              # Time period: all
                }
            except (ValueError, TypeError):
                # Fallback to simple search without time constraints
                return {
                    'q': '*',               # Match all posts
                    'sort': 'new',
                    'restrict_sr': 'true',
                    'limit': limit,
                    't': 'week'             # Default to past week
                }

        @staticmethod
        def build_hackernews_top_stories_url():
            return "https://hacker-news.firebaseio.com/v0/topstories.json"

        @staticmethod
        def build_hackernews_story_url(story_id):
            return f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json"

        @staticmethod
        def build_lobsters_url():
            return "https://lobste.rs/hottest.json"

        @staticmethod
        def build_stackexchange_url():
            return "https://api.stackexchange.com/2.3/questions"

        @staticmethod
        def build_stackexchange_params(site, limit, start_date, end_date):
            start_ts = int(dt.datetime.fromisoformat(start_date).timestamp())
            end_ts = int(dt.datetime.fromisoformat(end_date).timestamp())
            return {
                'site': site,
                'pagesize': limit,
                'fromdate': start_ts,
                'todate': end_ts,
                'sort': 'votes',
                'order': 'desc'
            }

        @staticmethod
        def build_rss_url(feed_url):
            """RSS feeds use the URL directly as provided in config"""
            return feed_url

    # ===== SCHEMA CONVERTERS =====
    class converters():
        """Functions to convert platform-specific data to unified schema"""

        @staticmethod
        def reddit_to_schema(child):
            post = child['data']
            return {
                'platform': 'reddit',
                'id': post.get('id'),
                'title': post.get('title'),
                'author': post.get('author'),
                'timestamp': int(post.get('created_utc', 0)),
                'score': post.get('score', 0),
                'replies': post.get('num_comments', 0),
                'url': post.get('url'),
                'content': post.get('selftext', ''),
                'source': post.get('subreddit'),
                'tags': [post.get('link_flair_text', '')],
                'meta': {'is_self': post.get('is_self', False)}
            }

        @staticmethod
        def hackernews_to_schema(raw, community='front_page'):
            if not raw or raw.get('type') != 'story':
                return None
            return {
                'platform': 'hackernews',
                'id': f"hn_{raw.get('id')}",
                'title': raw.get('title'),
                'author': raw.get('by', 'unknown'),
                'timestamp': int(raw.get('time', 0)),
                'score': raw.get('score', 0),
                'replies': raw.get('descendants', 0),
                'url': raw.get('url', f"https://news.ycombinator.com/item?id={raw.get('id')}"),
                'content': raw.get('text', ''),
                'source': community,  # Label stories with the configured community name
                'tags': ['hackernews'],
                'meta': {}
            }

        @staticmethod
        def lobsters_to_schema(raw):
            submitter = raw.get('submitter_user', 'unknown')
            author = submitter.get('username', 'unknown') if isinstance(submitter, dict) else submitter
            return {
                'platform': 'lobsters',
                'id': f"lob_{raw.get('short_id')}",
                'title': raw.get('title'),
                'author': author,
                'timestamp': data_methods.utils.convert_iso_to_timestamp(raw.get('created_at')),
                'score': raw.get('score', 0),
                'replies': raw.get('comment_count', 0),
                'url': raw.get('url', raw.get('comments_url')),
                'content': raw.get('description', ''),
                'source': 'lobsters',
                'tags': raw.get('tags', []),
                'meta': {}
            }
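
        # Illustrative example (not taken from any live API response) of the unified
        # record shape that every *_to_schema converter in this class returns; the
        # field values below are hypothetical placeholders:
        #
        #   {
        #       'platform': 'hackernews',
        #       'id': 'hn_123456',
        #       'title': 'Example story title',
        #       'author': 'example_user',
        #       'timestamp': 1700000000,
        #       'score': 42,
        #       'replies': 7,
        #       'url': 'https://example.com/article',
        #       'content': '',
        #       'source': 'front_page',
        #       'tags': ['hackernews'],
        #       'meta': {}
        #   }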

        @staticmethod
        def stackexchange_to_schema(raw, community):
            return {
                'platform': 'stackexchange',
                'id': f"se_{raw.get('question_id')}",
                'title': raw.get('title'),
                'author': raw.get('owner', {}).get('display_name', 'unknown'),
                'timestamp': int(raw.get('creation_date', 0)),
                'score': raw.get('score', 0),
                'replies': raw.get('answer_count', 0),
                'url': raw.get('link'),
                'content': '',
                'source': community,
                'tags': raw.get('tags', []),
                'meta': {'view_count': raw.get('view_count', 0)}
            }

        @staticmethod
        def rss_to_schema(entry, feed_url):
            """
            Convert RSS/Atom feed entry to unified schema.
            Supports both RSS 2.0 and Atom formats via feedparser.
            """
            import hashlib
            import re
            from html import unescape

            # Extract link (RSS uses 'link', Atom may use 'links')
            link = entry.get('link', '')
            if not link and 'links' in entry and len(entry['links']) > 0:
                link = entry['links'][0].get('href', '')

            # Generate ID from link or guid
            entry_id = entry.get('id', entry.get('guid', link))
            if not entry_id:
                # Fallback: hash of title + link
                entry_id = hashlib.md5(f"{entry.get('title', '')}{link}".encode()).hexdigest()

            # Clean up ID to make it URL-safe
            safe_id = entry_id.replace('/', '_').replace(':', '_')[:100]

            # Extract timestamp
            timestamp = 0
            if 'published_parsed' in entry and entry['published_parsed']:
                timestamp = int(time.mktime(entry['published_parsed']))
            elif 'updated_parsed' in entry and entry['updated_parsed']:
                timestamp = int(time.mktime(entry['updated_parsed']))

            # Extract author
            author = 'unknown'
            if 'author' in entry:
                author = entry['author']
            elif 'author_detail' in entry:
                author = entry['author_detail'].get('name', 'unknown')

            # Extract content (try summary, then description, then content)
            content = ''
            if 'summary' in entry:
                content = unescape(entry['summary'])
            elif 'description' in entry:
                content = unescape(entry['description'])
            elif 'content' in entry and len(entry['content']) > 0:
                content = unescape(entry['content'][0].get('value', ''))

            # Strip HTML tags for cleaner content
            content = re.sub(r'<[^>]+>', '', content)

            # Extract tags/categories
            tags = []
            if 'tags' in entry:
                tags = [tag.get('term', '') for tag in entry['tags']]

            return {
                'platform': 'rss',
                'id': f"rss_{safe_id}",
                'title': unescape(entry.get('title', 'Untitled')),
                'author': author,
                'timestamp': timestamp,
                'score': 0,    # RSS doesn't have scores
                'replies': 0,  # RSS doesn't track comments
                'url': link,
                'content': content[:1000],  # Limit content length
                'source': feed_url,
                'tags': tags,
                'meta': {
                    'feed_url': feed_url,
                    'guid': entry.get('guid', '')
                }
            }

    # ===== COMMENT FETCHERS =====
    class comment_fetchers():
        """Functions to fetch comments for posts from various platforms"""

        @staticmethod
        def fetch_reddit_comments(post_id, subreddit, max_comments=50):
            """
            Fetch comments for a Reddit post.
            Note: Reddit JSON API has limited comment support without auth.
            Returns list of comment dicts with parent relationships.
""" # Reddit comment API: /r/{subreddit}/comments/{post_id}.json url = f"https://www.reddit.com/r/{subreddit}/comments/{post_id}.json" headers = {'User-Agent': 'Mozilla/5.0 (compatible; DataCollector/1.0)'} try: raw = data_methods.utils.http_get_json(url, headers=headers) # Reddit returns [post_data, comments_data] if len(raw) < 2: return [] comments_data = raw[1]['data']['children'] comments = [] def extract_comment(comment_obj, parent_id=None, depth=0): if comment_obj['kind'] != 't1': # t1 = comment return data = comment_obj['data'] comments.append({ 'id': data.get('id'), 'parent_comment_id': parent_id, 'author': data.get('author', '[deleted]'), 'content': data.get('body', ''), 'timestamp': int(data.get('created_utc', 0)), 'score': data.get('score', 0), 'depth': depth }) # Process replies if 'replies' in data and isinstance(data['replies'], dict): for reply in data['replies']['data']['children']: extract_comment(reply, data.get('id'), depth + 1) # Extract all comments for comment_obj in comments_data: extract_comment(comment_obj, None, 0) return comments[:max_comments] except Exception as e: print(f"Error fetching Reddit comments: {e}") return [] @staticmethod def fetch_hackernews_comments(story_id, max_comments=50): """ Fetch comments for a HackerNews story. HN provides comment IDs in the 'kids' field. """ comments = [] def fetch_comment_recursive(comment_id, parent_id=None, depth=0): if len(comments) >= max_comments: return url = f"https://hacker-news.firebaseio.com/v0/item/{comment_id}.json" try: raw = data_methods.utils.http_get_json(url) if not raw or raw.get('deleted') or raw.get('dead'): return comments.append({ 'id': str(raw.get('id')), 'parent_comment_id': parent_id, 'author': raw.get('by', 'unknown'), 'content': raw.get('text', ''), 'timestamp': int(raw.get('time', 0)), 'score': 0, # HN doesn't provide comment scores via API 'depth': depth }) # Fetch child comments if 'kids' in raw: for kid_id in raw['kids'][:5]: # Limit children fetch_comment_recursive(kid_id, str(raw.get('id')), depth + 1) except Exception as e: print(f"Error fetching HN comment {comment_id}: {e}") # Start with top-level comment IDs from story try: story_url = f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json" story = data_methods.utils.http_get_json(story_url) if 'kids' in story: for kid_id in story['kids'][:10]: # Limit top-level comments fetch_comment_recursive(kid_id, None, 0) except Exception as e: print(f"Error fetching HN story for comments: {e}") return comments @staticmethod def fetch_lobsters_comments(story_id): """ Lobsters provides comments in the story JSON. """ # Lobsters API doesn't easily provide comment trees # Would need to parse HTML or use authenticated API return [] @staticmethod def fetch_stackexchange_comments(question_id, site='stackoverflow'): """ Fetch comments for a StackExchange question and its answers. Uses the public StackExchange API v2.3. 
""" import datetime comments = [] try: # First, get question comments question_comments_url = f"https://api.stackexchange.com/2.3/questions/{question_id}/comments" params = { 'site': site, 'filter': 'default', # Includes basic comment data 'page': 1, 'pagesize': 100 } response = data_methods.utils.http_get_json(question_comments_url, params=params) if response and 'items' in response: for comment in response['items']: comments.append({ 'uuid': f"se_{site}_{comment['comment_id']}", 'platform': 'stackexchange', 'source': site, 'content': comment.get('body', ''), 'author': comment.get('owner', {}).get('display_name', 'Anonymous'), 'timestamp': datetime.datetime.fromtimestamp( comment.get('creation_date', 0) ).isoformat() + 'Z', 'score': comment.get('score', 0), 'parent_post_id': str(question_id), 'parent_comment_uuid': None, # Top-level comment 'depth': 0, 'se_comment_id': comment['comment_id'], 'se_post_id': comment.get('post_id'), 'se_post_type': comment.get('post_type', 'question') }) # Then get answer IDs for this question answers_url = f"https://api.stackexchange.com/2.3/questions/{question_id}/answers" answers_params = { 'site': site, 'filter': 'default', 'page': 1, 'pagesize': 50 } answers_response = data_methods.utils.http_get_json(answers_url, params=answers_params) if answers_response and 'items' in answers_response: # Get comments for each answer for answer in answers_response['items']: answer_id = answer['answer_id'] answer_comments_url = f"https://api.stackexchange.com/2.3/answers/{answer_id}/comments" answer_comments_response = data_methods.utils.http_get_json(answer_comments_url, params=params) if answer_comments_response and 'items' in answer_comments_response: for comment in answer_comments_response['items']: comments.append({ 'uuid': f"se_{site}_{comment['comment_id']}", 'platform': 'stackexchange', 'source': site, 'content': comment.get('body', ''), 'author': comment.get('owner', {}).get('display_name', 'Anonymous'), 'timestamp': datetime.datetime.fromtimestamp( comment.get('creation_date', 0) ).isoformat() + 'Z', 'score': comment.get('score', 0), 'parent_post_id': str(answer_id), 'parent_comment_uuid': None, # SE comments are flat 'depth': 0, 'se_comment_id': comment['comment_id'], 'se_post_id': comment.get('post_id'), 'se_post_type': comment.get('post_type', 'answer') }) return comments[:100] # Limit total comments except Exception as e: print(f"Error fetching StackExchange comments for {question_id} on {site}: {e}") return [] # ===== PLATFORM FETCHERS (ORCHESTRATION) ===== class fetchers(): """Orchestration functions that compose atomic functions""" @staticmethod def getRedditData(start_date, end_date, community, max_posts): # Build request components url = data_methods.builders.build_reddit_url(community) headers = data_methods.builders.build_reddit_headers() params = data_methods.builders.build_reddit_params(max_posts) # Fetch and extract raw = data_methods.utils.http_get_json(url, headers, params) children = raw['data']['children'] # Convert and filter posts = [data_methods.converters.reddit_to_schema(c) for c in children] return data_methods.utils.filter_by_date_range(posts, start_date, end_date) @staticmethod def getPushshiftData(start_date, end_date, community, max_posts): """ Alternative Reddit data collection using official Reddit API. Since Pushshift is deprecated, we use Reddit's native search/listing endpoints. 
""" try: # Use Reddit's native search for historical posts within date range # Build search URL for the specific subreddit and time range url = data_methods.builders.build_reddit_search_url(community, start_date, end_date) headers = data_methods.builders.build_reddit_headers() params = data_methods.builders.build_reddit_search_params(max_posts, start_date, end_date) # Fetch data from Reddit search raw = data_methods.utils.http_get_json(url, headers, params) if not raw or 'data' not in raw or 'children' not in raw['data']: return [] children = raw['data']['children'] # Convert and filter by date range posts = [data_methods.converters.reddit_to_schema(c) for c in children] return data_methods.utils.filter_by_date_range(posts, start_date, end_date) except Exception as e: print(f"Error fetching Reddit search data: {e}") return [] @staticmethod def getHackerNewsData(start_date, end_date, community, max_posts): # Fetch story IDs ids_url = data_methods.builders.build_hackernews_top_stories_url() ids = data_methods.utils.http_get_json(ids_url)[:max_posts] # Fetch individual stories stories = [] for story_id in ids: story_url = data_methods.builders.build_hackernews_story_url(story_id) stories.append(data_methods.utils.http_get_json(story_url)) # Convert and filter posts = [data_methods.converters.hackernews_to_schema(s, community) for s in stories] return data_methods.utils.filter_by_date_range(posts, start_date, end_date) @staticmethod def getLobstersData(start_date, end_date, community, max_posts): # Fetch posts url = data_methods.builders.build_lobsters_url() raw = data_methods.utils.http_get_json(url)[:max_posts] # Convert and filter posts = [data_methods.converters.lobsters_to_schema(r) for r in raw] return data_methods.utils.filter_by_date_range(posts, start_date, end_date) @staticmethod def getStackExchangeData(start_date, end_date, community, max_posts): # Build request components url = data_methods.builders.build_stackexchange_url() params = data_methods.builders.build_stackexchange_params(community, max_posts, start_date, end_date) # Fetch and convert raw = data_methods.utils.http_get_json(url, params=params) return [data_methods.converters.stackexchange_to_schema(q, community) for q in raw.get('items', [])] @staticmethod def getRSSData(start_date, end_date, feed_url, max_posts): """ Fetch and parse RSS/Atom feeds. Requires feedparser library: pip install feedparser """ try: import feedparser except ImportError: print("Error: feedparser not installed. Run: pip install feedparser") return [] try: # Fetch and parse RSS feed feed = feedparser.parse(feed_url) # Check for errors if hasattr(feed, 'bozo') and feed.bozo: print(f"Warning: RSS feed may have issues: {feed.get('bozo_exception', 'Unknown error')}") # Extract entries entries = feed.get('entries', [])[:max_posts] if not entries: print(f"No entries found in RSS feed: {feed_url}") return [] # Convert entries to unified schema posts = [data_methods.converters.rss_to_schema(entry, feed_url) for entry in entries] # Filter by date range return data_methods.utils.filter_by_date_range(posts, start_date, end_date) except Exception as e: print(f"Error fetching RSS feed {feed_url}: {e}") import traceback traceback.print_exc() return []