balanceboard/data_collection_lib.py
chelsea 47cca9d45e Add RSS feed support to BalanceBoard
- Implement RSS/Atom feed parser using feedparser library
- Add RSS platform configuration with sample feeds (HN RSS, Lobsters RSS, Reddit RSS)
- Support both RSS 2.0 and Atom formats with automatic detection
- Extract and normalize: title, author, link, content, tags, timestamps
- HTML entity unescaping and tag stripping for clean content
- Fallback handling for missing fields
- Users can add any RSS feed URL as a collection source

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-11 17:20:52 -05:00

745 lines
32 KiB
Python
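A minimal usage sketch of the collection entry point described in the commit message (dates are ISO strings; the "rss" platform takes any feed URL as the community argument). The import path, subreddit, and feed URL here are illustrative assumptions, not values taken from the file:

from data_collection_lib import data_methods

reddit_posts = data_methods.getData("reddit", "2025-10-01", "2025-10-11", "python", max_posts=50)
feed_posts = data_methods.getData("rss", "2025-10-01", "2025-10-11", "https://lobste.rs/rss", max_posts=50)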

import requests
import json
import datetime as dt
import time
from collections import defaultdict, deque


class RateLimiter:
    """
    Simple rate limiter to prevent excessive API calls.
    Tracks requests per domain and enforces delays.
    """
    def __init__(self):
        self.request_times = defaultdict(deque)  # domain -> deque of timestamps
        self.domain_limits = {
            'reddit.com': {'requests': 60, 'window': 60},  # 60 requests per minute
            'api.stackexchange.com': {'requests': 300, 'window': 86400},  # 300 per day
            'hacker-news.firebaseio.com': {'requests': 300, 'window': 60},  # 300 per minute
            'lobste.rs': {'requests': 30, 'window': 60},  # 30 per minute
            'default': {'requests': 60, 'window': 60}  # Default rate limit
        }

    def wait_if_needed(self, url: str):
        """
        Check rate limit and wait if necessary before making request.
        Args:
            url: The URL being requested
        """
        from urllib.parse import urlparse
        domain = urlparse(url).netloc
        current_time = time.time()
        # Get rate limit for this domain
        limit_config = self.domain_limits.get(domain, self.domain_limits['default'])
        max_requests = limit_config['requests']
        time_window = limit_config['window']
        # Get request times for this domain
        times = self.request_times[domain]
        # Remove requests outside the time window
        cutoff_time = current_time - time_window
        while times and times[0] < cutoff_time:
            times.popleft()
        # Check if we're at the rate limit
        if len(times) >= max_requests:
            # Calculate how long to wait
            oldest_request = times[0]
            wait_time = time_window - (current_time - oldest_request)
            if wait_time > 0:
                import logging
                logger = logging.getLogger(__name__)
                logger.info(f"Rate limit reached for {domain}. Waiting {wait_time:.1f}s")
                time.sleep(wait_time)
        # Record this request
        times.append(current_time)


# Global rate limiter instance
_rate_limiter = RateLimiter()
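
# Illustrative use of the limiter (utils.http_get_json below calls wait_if_needed
# automatically before every request, so callers normally never touch it directly):
#   _rate_limiter.wait_if_needed("https://lobste.rs/hottest.json")  # blocks once 30 calls/60s are used
#   response = requests.get("https://lobste.rs/hottest.json")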


# A collection of static methods to grab Reddit and Reddit-like data from various sources.
class data_methods():
    @staticmethod
    def getData(platform, start_date, end_date, community, max_posts):
        if platform == "reddit":
            return data_methods.fetchers.getRedditData(start_date, end_date, community, max_posts)
        elif platform == "pushshift":
            return data_methods.fetchers.getPushshiftData(start_date, end_date, community, max_posts)
        elif platform == "hackernews":
            return data_methods.fetchers.getHackerNewsData(start_date, end_date, community, max_posts)
        elif platform == "lobsters":
            return data_methods.fetchers.getLobstersData(start_date, end_date, community, max_posts)
        elif platform == "stackexchange":
            return data_methods.fetchers.getStackExchangeData(start_date, end_date, community, max_posts)
        elif platform == "rss":
            return data_methods.fetchers.getRSSData(start_date, end_date, community, max_posts)
        else:
            print("data_methods.getData: platform not recognized")
            return None

    # ===== ATOMIC UTILITY FUNCTIONS =====
    class utils():
        """Generic utility functions used across all fetchers"""
        @staticmethod
        def http_get_json(url, headers=None, params=None, timeout=30, max_retries=3):
            """
            Generic HTTP GET request that returns JSON with comprehensive error handling.
            Args:
                url: Target URL
                headers: HTTP headers
                params: Query parameters
                timeout: Request timeout in seconds
                max_retries: Maximum number of retry attempts
            Returns:
                JSON response data
            Raises:
                requests.RequestException: On persistent failure after retries
            """
            import logging
            logger = logging.getLogger(__name__)
            for attempt in range(max_retries + 1):
                try:
                    # Add retry delay for subsequent attempts
                    if attempt > 0:
                        delay = min(2 ** attempt, 30)  # Exponential backoff, max 30s
                        logger.info(f"Retrying request to {url} in {delay}s (attempt {attempt + 1}/{max_retries + 1})")
                        time.sleep(delay)
                    # Apply rate limiting before making the request
                    _rate_limiter.wait_if_needed(url)
                    response = requests.get(url, headers=headers, params=params, timeout=timeout)
                    # Handle different HTTP status codes
                    if response.status_code == 429:  # Rate limited
                        retry_after = int(response.headers.get('Retry-After', 60))
                        if attempt < max_retries:
                            logger.warning(f"Rate limited. Waiting {retry_after}s before retry")
                            time.sleep(retry_after)
                            continue
                    response.raise_for_status()
                    # Validate JSON response
                    try:
                        json_data = response.json()
                        return json_data
                    except ValueError as e:
                        logger.error(f"Invalid JSON response from {url}: {e}")
                        if attempt < max_retries:
                            continue
                        raise requests.RequestException(f"Invalid JSON response: {e}")
                except requests.exceptions.Timeout:
                    logger.warning(f"Request timeout for {url} (attempt {attempt + 1})")
                    if attempt == max_retries:
                        raise
                except requests.exceptions.ConnectionError:
                    logger.warning(f"Connection error for {url} (attempt {attempt + 1})")
                    if attempt == max_retries:
                        raise
                except requests.exceptions.HTTPError as e:
                    # Don't retry on client errors (4xx) except rate limiting
                    if 400 <= e.response.status_code < 500 and e.response.status_code != 429:
                        logger.error(f"Client error {e.response.status_code} for {url}: {e}")
                        raise
                    logger.warning(f"HTTP error {e.response.status_code} for {url} (attempt {attempt + 1})")
                    if attempt == max_retries:
                        raise
                except Exception as e:
                    logger.error(f"Unexpected error for {url}: {e}")
                    if attempt == max_retries:
                        raise
            raise requests.RequestException(f"Failed to fetch {url} after {max_retries + 1} attempts")

        @staticmethod
        def filter_by_date_range(posts, start_date, end_date):
            """Filter posts by timestamp range"""
            start_ts = int(dt.datetime.fromisoformat(start_date).timestamp())
            end_ts = int(dt.datetime.fromisoformat(end_date).timestamp())
            return [p for p in posts if p and start_ts <= p['timestamp'] <= end_ts]

        @staticmethod
        def convert_iso_to_timestamp(iso_string):
            """Convert ISO format datetime string to Unix timestamp"""
            return int(dt.datetime.fromisoformat(iso_string.replace('Z', '+00:00')).timestamp())
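
    # Illustrative flow: http_get_json retries with exponential backoff, honors 429
    # Retry-After, and applies the per-domain rate limiter; filter_by_date_range expects
    # ISO date strings and the unified schema's integer 'timestamp' field, e.g.
    #   raw = data_methods.utils.http_get_json("https://lobste.rs/hottest.json")
    #   posts = [data_methods.converters.lobsters_to_schema(r) for r in raw]
    #   recent = data_methods.utils.filter_by_date_range(posts, "2025-10-01", "2025-10-11")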

    # ===== URL AND PARAMETER BUILDERS =====
    class builders():
        """Functions to build URLs, headers, and parameters"""
        @staticmethod
        def build_reddit_url(subreddit):
            return f"https://www.reddit.com/r/{subreddit}/new.json"

        @staticmethod
        def build_reddit_headers():
            return {'User-Agent': 'Mozilla/5.0 (compatible; DataCollector/1.0)'}

        @staticmethod
        def build_reddit_params(limit):
            return {'limit': limit}

        @staticmethod
        def build_reddit_search_url(subreddit, start_date, end_date):
            """Build Reddit search URL for time-based queries"""
            return f"https://www.reddit.com/r/{subreddit}/search.json"

        @staticmethod
        def build_reddit_search_params(limit, start_date, end_date):
            """Build search parameters for Reddit API with time constraints"""
            import datetime
            # Convert date strings to timestamps for Reddit API
            try:
                start_ts = int(datetime.datetime.fromisoformat(start_date.replace('Z', '+00:00')).timestamp())
                end_ts = int(datetime.datetime.fromisoformat(end_date.replace('Z', '+00:00')).timestamp())
                # Use Reddit's search syntax for time-based queries
                # Reddit search uses 'after:' and 'before:' with timestamps
                query = f"after:{start_ts} before:{end_ts}"
                return {
                    'q': query,
                    'sort': 'new',
                    'restrict_sr': 'true',  # Restrict to subreddit
                    'limit': limit,
                    't': 'all'  # Time period: all
                }
            except (ValueError, TypeError):
                # Fallback to simple search without time constraints
                return {
                    'q': '*',  # Match all posts
                    'sort': 'new',
                    'restrict_sr': 'true',
                    'limit': limit,
                    't': 'week'  # Default to past week
                }

        @staticmethod
        def build_hackernews_top_stories_url():
            return "https://hacker-news.firebaseio.com/v0/topstories.json"

        @staticmethod
        def build_hackernews_story_url(story_id):
            return f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json"

        @staticmethod
        def build_lobsters_url():
            return "https://lobste.rs/hottest.json"

        @staticmethod
        def build_stackexchange_url():
            return "https://api.stackexchange.com/2.3/questions"

        @staticmethod
        def build_stackexchange_params(site, limit, start_date, end_date):
            start_ts = int(dt.datetime.fromisoformat(start_date).timestamp())
            end_ts = int(dt.datetime.fromisoformat(end_date).timestamp())
            return {
                'site': site,
                'pagesize': limit,
                'fromdate': start_ts,
                'todate': end_ts,
                'sort': 'votes',
                'order': 'desc'
            }

        @staticmethod
        def build_rss_url(feed_url):
            """RSS feeds use the URL directly as provided in config"""
            return feed_url
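
    # Illustrative call: build_stackexchange_params("stackoverflow", 50, "2025-10-01", "2025-10-11")
    # returns {'site': 'stackoverflow', 'pagesize': 50, 'fromdate': <unix ts>, 'todate': <unix ts>,
    # 'sort': 'votes', 'order': 'desc'}; the ISO dates are converted using the local timezone.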

    # ===== SCHEMA CONVERTERS =====
    class converters():
        """Functions to convert platform-specific data to unified schema"""
        @staticmethod
        def reddit_to_schema(child):
            post = child['data']
            return {
                'platform': 'reddit',
                'id': post.get('id'),
                'title': post.get('title'),
                'author': post.get('author'),
                'timestamp': int(post.get('created_utc', 0)),
                'score': post.get('score', 0),
                'replies': post.get('num_comments', 0),
                'url': post.get('url'),
                'content': post.get('selftext', ''),
                'source': post.get('subreddit'),
                'tags': [post.get('link_flair_text', '')],
                'meta': {'is_self': post.get('is_self', False)}
            }

        @staticmethod
        def hackernews_to_schema(raw):
            if not raw or raw.get('type') != 'story':
                return None
            return {
                'platform': 'hackernews',
                'id': f"hn_{raw.get('id')}",
                'title': raw.get('title'),
                'author': raw.get('by', 'unknown'),
                'timestamp': int(raw.get('time', 0)),
                'score': raw.get('score', 0),
                'replies': raw.get('descendants', 0),
                'url': raw.get('url', f"https://news.ycombinator.com/item?id={raw.get('id')}"),
                'content': raw.get('text', ''),
                'source': 'hackernews',
                'tags': ['hackernews'],
                'meta': {}
            }

        @staticmethod
        def lobsters_to_schema(raw):
            submitter = raw.get('submitter_user', 'unknown')
            author = submitter.get('username', 'unknown') if isinstance(submitter, dict) else submitter
            return {
                'platform': 'lobsters',
                'id': f"lob_{raw.get('short_id')}",
                'title': raw.get('title'),
                'author': author,
                'timestamp': data_methods.utils.convert_iso_to_timestamp(raw.get('created_at')),
                'score': raw.get('score', 0),
                'replies': raw.get('comment_count', 0),
                'url': raw.get('url', raw.get('comments_url')),
                'content': raw.get('description', ''),
                'source': 'lobsters',
                'tags': raw.get('tags', []),
                'meta': {}
            }

        @staticmethod
        def stackexchange_to_schema(raw, community):
            return {
                'platform': 'stackexchange',
                'id': f"se_{raw.get('question_id')}",
                'title': raw.get('title'),
                'author': raw.get('owner', {}).get('display_name', 'unknown'),
                'timestamp': int(raw.get('creation_date', 0)),
                'score': raw.get('score', 0),
                'replies': raw.get('answer_count', 0),
                'url': raw.get('link'),
                'content': '',
                'source': community,
                'tags': raw.get('tags', []),
                'meta': {'view_count': raw.get('view_count', 0)}
            }

        @staticmethod
        def rss_to_schema(entry, feed_url):
            """
            Convert RSS/Atom feed entry to unified schema.
            Supports both RSS 2.0 and Atom formats via feedparser.
            """
            import hashlib
            from html import unescape
            # Extract link (RSS uses 'link', Atom may use 'links')
            link = entry.get('link', '')
            if not link and 'links' in entry and len(entry['links']) > 0:
                link = entry['links'][0].get('href', '')
            # Generate ID from link or guid
            entry_id = entry.get('id', entry.get('guid', link))
            if not entry_id:
                # Fallback: hash of title + link
                entry_id = hashlib.md5(f"{entry.get('title', '')}{link}".encode()).hexdigest()
            # Clean up ID to make it URL-safe
            safe_id = entry_id.replace('/', '_').replace(':', '_')[:100]
            # Extract timestamp
            timestamp = 0
            if 'published_parsed' in entry and entry['published_parsed']:
                timestamp = int(time.mktime(entry['published_parsed']))
            elif 'updated_parsed' in entry and entry['updated_parsed']:
                timestamp = int(time.mktime(entry['updated_parsed']))
            # Extract author
            author = 'unknown'
            if 'author' in entry:
                author = entry['author']
            elif 'author_detail' in entry:
                author = entry['author_detail'].get('name', 'unknown')
            # Extract content (try summary, then description, then content)
            content = ''
            if 'summary' in entry:
                content = unescape(entry['summary'])
            elif 'description' in entry:
                content = unescape(entry['description'])
            elif 'content' in entry and len(entry['content']) > 0:
                content = unescape(entry['content'][0].get('value', ''))
            # Strip HTML tags for cleaner content
            import re
            content = re.sub(r'<[^>]+>', '', content)
            # Extract tags/categories
            tags = []
            if 'tags' in entry:
                tags = [tag.get('term', '') for tag in entry['tags']]
            return {
                'platform': 'rss',
                'id': f"rss_{safe_id}",
                'title': unescape(entry.get('title', 'Untitled')),
                'author': author,
                'timestamp': timestamp,
                'score': 0,  # RSS doesn't have scores
                'replies': 0,  # RSS doesn't track comments
                'url': link,
                'content': content[:1000],  # Limit content length
                'source': feed_url,
                'tags': tags,
                'meta': {
                    'feed_url': feed_url,
                    'guid': entry.get('guid', '')
                }
            }
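
    # All converters above emit the same unified post schema:
    #   platform: str, id: str, title: str, author: str, timestamp: int (Unix seconds),
    #   score: int, replies: int, url: str, content: str, source: str, tags: list[str],
    #   meta: dict of platform-specific extras (e.g. is_self, view_count, feed_url)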

    # ===== COMMENT FETCHERS =====
    class comment_fetchers():
        """Functions to fetch comments for posts from various platforms"""
        @staticmethod
        def fetch_reddit_comments(post_id, subreddit, max_comments=50):
            """
            Fetch comments for a Reddit post.
            Note: Reddit JSON API has limited comment support without auth.
            Returns list of comment dicts with parent relationships.
            """
            # Reddit comment API: /r/{subreddit}/comments/{post_id}.json
            url = f"https://www.reddit.com/r/{subreddit}/comments/{post_id}.json"
            headers = {'User-Agent': 'Mozilla/5.0 (compatible; DataCollector/1.0)'}
            try:
                raw = data_methods.utils.http_get_json(url, headers=headers)
                # Reddit returns [post_data, comments_data]
                if len(raw) < 2:
                    return []
                comments_data = raw[1]['data']['children']
                comments = []

                def extract_comment(comment_obj, parent_id=None, depth=0):
                    if comment_obj['kind'] != 't1':  # t1 = comment
                        return
                    data = comment_obj['data']
                    comments.append({
                        'id': data.get('id'),
                        'parent_comment_id': parent_id,
                        'author': data.get('author', '[deleted]'),
                        'content': data.get('body', ''),
                        'timestamp': int(data.get('created_utc', 0)),
                        'score': data.get('score', 0),
                        'depth': depth
                    })
                    # Process replies
                    if 'replies' in data and isinstance(data['replies'], dict):
                        for reply in data['replies']['data']['children']:
                            extract_comment(reply, data.get('id'), depth + 1)

                # Extract all comments
                for comment_obj in comments_data:
                    extract_comment(comment_obj, None, 0)
                return comments[:max_comments]
            except Exception as e:
                print(f"Error fetching Reddit comments: {e}")
                return []

        @staticmethod
        def fetch_hackernews_comments(story_id, max_comments=50):
            """
            Fetch comments for a HackerNews story.
            HN provides comment IDs in the 'kids' field.
            """
            comments = []

            def fetch_comment_recursive(comment_id, parent_id=None, depth=0):
                if len(comments) >= max_comments:
                    return
                url = f"https://hacker-news.firebaseio.com/v0/item/{comment_id}.json"
                try:
                    raw = data_methods.utils.http_get_json(url)
                    if not raw or raw.get('deleted') or raw.get('dead'):
                        return
                    comments.append({
                        'id': str(raw.get('id')),
                        'parent_comment_id': parent_id,
                        'author': raw.get('by', 'unknown'),
                        'content': raw.get('text', ''),
                        'timestamp': int(raw.get('time', 0)),
                        'score': 0,  # HN doesn't provide comment scores via API
                        'depth': depth
                    })
                    # Fetch child comments
                    if 'kids' in raw:
                        for kid_id in raw['kids'][:5]:  # Limit children
                            fetch_comment_recursive(kid_id, str(raw.get('id')), depth + 1)
                except Exception as e:
                    print(f"Error fetching HN comment {comment_id}: {e}")

            # Start with top-level comment IDs from story
            try:
                story_url = f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json"
                story = data_methods.utils.http_get_json(story_url)
                if 'kids' in story:
                    for kid_id in story['kids'][:10]:  # Limit top-level comments
                        fetch_comment_recursive(kid_id, None, 0)
            except Exception as e:
                print(f"Error fetching HN story for comments: {e}")
            return comments

        @staticmethod
        def fetch_lobsters_comments(story_id):
            """
            Lobsters provides comments in the story JSON.
            """
            # Lobsters API doesn't easily provide comment trees
            # Would need to parse HTML or use authenticated API
            return []

        @staticmethod
        def fetch_stackexchange_comments(question_id, site='stackoverflow'):
            """
            Fetch comments for a StackExchange question and its answers.
            Uses the public StackExchange API v2.3.
            """
            import datetime
            comments = []
            try:
                # First, get question comments
                question_comments_url = f"https://api.stackexchange.com/2.3/questions/{question_id}/comments"
                params = {
                    'site': site,
                    'filter': 'default',  # Includes basic comment data
                    'page': 1,
                    'pagesize': 100
                }
                response = data_methods.utils.http_get_json(question_comments_url, params=params)
                if response and 'items' in response:
                    for comment in response['items']:
                        comments.append({
                            'uuid': f"se_{site}_{comment['comment_id']}",
                            'platform': 'stackexchange',
                            'source': site,
                            'content': comment.get('body', ''),
                            'author': comment.get('owner', {}).get('display_name', 'Anonymous'),
                            'timestamp': datetime.datetime.fromtimestamp(
                                comment.get('creation_date', 0)
                            ).isoformat() + 'Z',
                            'score': comment.get('score', 0),
                            'parent_post_id': str(question_id),
                            'parent_comment_uuid': None,  # Top-level comment
                            'depth': 0,
                            'se_comment_id': comment['comment_id'],
                            'se_post_id': comment.get('post_id'),
                            'se_post_type': comment.get('post_type', 'question')
                        })
                # Then get answer IDs for this question
                answers_url = f"https://api.stackexchange.com/2.3/questions/{question_id}/answers"
                answers_params = {
                    'site': site,
                    'filter': 'default',
                    'page': 1,
                    'pagesize': 50
                }
                answers_response = data_methods.utils.http_get_json(answers_url, params=answers_params)
                if answers_response and 'items' in answers_response:
                    # Get comments for each answer
                    for answer in answers_response['items']:
                        answer_id = answer['answer_id']
                        answer_comments_url = f"https://api.stackexchange.com/2.3/answers/{answer_id}/comments"
                        answer_comments_response = data_methods.utils.http_get_json(answer_comments_url, params=params)
                        if answer_comments_response and 'items' in answer_comments_response:
                            for comment in answer_comments_response['items']:
                                comments.append({
                                    'uuid': f"se_{site}_{comment['comment_id']}",
                                    'platform': 'stackexchange',
                                    'source': site,
                                    'content': comment.get('body', ''),
                                    'author': comment.get('owner', {}).get('display_name', 'Anonymous'),
                                    'timestamp': datetime.datetime.fromtimestamp(
                                        comment.get('creation_date', 0)
                                    ).isoformat() + 'Z',
                                    'score': comment.get('score', 0),
                                    'parent_post_id': str(answer_id),
                                    'parent_comment_uuid': None,  # SE comments are flat
                                    'depth': 0,
                                    'se_comment_id': comment['comment_id'],
                                    'se_post_id': comment.get('post_id'),
                                    'se_post_type': comment.get('post_type', 'answer')
                                })
                return comments[:100]  # Limit total comments
            except Exception as e:
                print(f"Error fetching StackExchange comments for {question_id} on {site}: {e}")
                return []
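
    # Note: fetch_reddit_comments and fetch_hackernews_comments return flat dicts keyed by
    # id / parent_comment_id / author / content / timestamp (int) / score / depth, while
    # fetch_stackexchange_comments returns richer records (uuid, platform, source,
    # parent_post_id, ...) with ISO-8601 timestamps; callers mixing them must normalize.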

    # ===== PLATFORM FETCHERS (ORCHESTRATION) =====
    class fetchers():
        """Orchestration functions that compose atomic functions"""
        @staticmethod
        def getRedditData(start_date, end_date, community, max_posts):
            # Build request components
            url = data_methods.builders.build_reddit_url(community)
            headers = data_methods.builders.build_reddit_headers()
            params = data_methods.builders.build_reddit_params(max_posts)
            # Fetch and extract
            raw = data_methods.utils.http_get_json(url, headers, params)
            children = raw['data']['children']
            # Convert and filter
            posts = [data_methods.converters.reddit_to_schema(c) for c in children]
            return data_methods.utils.filter_by_date_range(posts, start_date, end_date)

        @staticmethod
        def getPushshiftData(start_date, end_date, community, max_posts):
            """
            Alternative Reddit data collection using official Reddit API.
            Since Pushshift is deprecated, we use Reddit's native search/listing endpoints.
            """
            try:
                # Use Reddit's native search for historical posts within date range
                # Build search URL for the specific subreddit and time range
                url = data_methods.builders.build_reddit_search_url(community, start_date, end_date)
                headers = data_methods.builders.build_reddit_headers()
                params = data_methods.builders.build_reddit_search_params(max_posts, start_date, end_date)
                # Fetch data from Reddit search
                raw = data_methods.utils.http_get_json(url, headers, params)
                if not raw or 'data' not in raw or 'children' not in raw['data']:
                    return []
                children = raw['data']['children']
                # Convert and filter by date range
                posts = [data_methods.converters.reddit_to_schema(c) for c in children]
                return data_methods.utils.filter_by_date_range(posts, start_date, end_date)
            except Exception as e:
                print(f"Error fetching Reddit search data: {e}")
                return []

        @staticmethod
        def getHackerNewsData(start_date, end_date, community, max_posts):
            # Fetch story IDs
            ids_url = data_methods.builders.build_hackernews_top_stories_url()
            ids = data_methods.utils.http_get_json(ids_url)[:max_posts]
            # Fetch individual stories
            stories = []
            for story_id in ids:
                story_url = data_methods.builders.build_hackernews_story_url(story_id)
                stories.append(data_methods.utils.http_get_json(story_url))
            # Convert and filter
            posts = [data_methods.converters.hackernews_to_schema(s) for s in stories]
            return data_methods.utils.filter_by_date_range(posts, start_date, end_date)

        @staticmethod
        def getLobstersData(start_date, end_date, community, max_posts):
            # Fetch posts
            url = data_methods.builders.build_lobsters_url()
            raw = data_methods.utils.http_get_json(url)[:max_posts]
            # Convert and filter
            posts = [data_methods.converters.lobsters_to_schema(r) for r in raw]
            return data_methods.utils.filter_by_date_range(posts, start_date, end_date)

        @staticmethod
        def getStackExchangeData(start_date, end_date, community, max_posts):
            # Build request components
            url = data_methods.builders.build_stackexchange_url()
            params = data_methods.builders.build_stackexchange_params(community, max_posts, start_date, end_date)
            # Fetch and convert
            raw = data_methods.utils.http_get_json(url, params=params)
            return [data_methods.converters.stackexchange_to_schema(q, community) for q in raw.get('items', [])]

        @staticmethod
        def getRSSData(start_date, end_date, feed_url, max_posts):
            """
            Fetch and parse RSS/Atom feeds.
            Requires feedparser library: pip install feedparser
            """
            try:
                import feedparser
            except ImportError:
                print("Error: feedparser not installed. Run: pip install feedparser")
                return []
            try:
                # Fetch and parse RSS feed
                feed = feedparser.parse(feed_url)
                # Check for errors
                if hasattr(feed, 'bozo') and feed.bozo:
                    print(f"Warning: RSS feed may have issues: {feed.get('bozo_exception', 'Unknown error')}")
                # Extract entries
                entries = feed.get('entries', [])[:max_posts]
                if not entries:
                    print(f"No entries found in RSS feed: {feed_url}")
                    return []
                # Convert entries to unified schema
                posts = [data_methods.converters.rss_to_schema(entry, feed_url) for entry in entries]
                # Filter by date range
                return data_methods.utils.filter_by_date_range(posts, start_date, end_date)
            except Exception as e:
                print(f"Error fetching RSS feed {feed_url}: {e}")
                import traceback
                traceback.print_exc()
                return []
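

# Illustrative smoke test: a minimal sketch of how the fetchers above might be exercised
# from the command line. Live network access, the public APIs' current behavior, and the
# example feed URL are assumptions, not guarantees.
if __name__ == "__main__":
    start, end = "2025-10-01", "2025-10-11"
    hn_posts = data_methods.getData("hackernews", start, end, community=None, max_posts=10)
    print(f"hackernews: {len(hn_posts) if hn_posts else 0} posts in range")
    # Any RSS/Atom feed URL can be passed as the community argument (requires feedparser)
    rss_posts = data_methods.getData("rss", start, end, "https://lobste.rs/rss", max_posts=10)
    print(f"rss: {len(rss_posts) if rss_posts else 0} posts in range")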