Files
balanceboard/polling_service.py
chelsea eead6033e2 Fix polling to use 24-hour window instead of resume feature
- Polling now always collects posts from last 24 hours
- Removes resume feature that created too-narrow time windows
- Fixes issue where polls returned 0 posts due to minutes-wide ranges

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-11 18:23:43 -05:00

221 lines
7.2 KiB
Python

"""
Polling Service
Background service for collecting data from configured sources.
"""
import logging
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, List
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from database import db
from models import PollSource, PollLog
from data_collection import collect_platform, get_collection_sources, load_platform_config
# Module-level logger named after this module; used by PollingService for all
# scheduler and polling diagnostics.
logger = logging.getLogger(__name__)
class PollingService:
    """Background polling service using APScheduler.

    A single interval job (``poll_checker``) fires every minute. Each run
    loads every enabled ``PollSource``, polls the ones whose interval has
    elapsed, and records the outcome in a ``PollLog`` row and on the source
    itself.
    """

    def __init__(self, app=None):
        """Create the scheduler.

        Args:
            app: Optional application object. It can also be attached later
                with ``init_app``; polling is refused while it is unset.
        """
        self.scheduler = BackgroundScheduler()
        self.app = app
        self.storage_dir = 'data'

    def init_app(self, app):
        """Initialize with Flask app.

        Reads ``POLL_STORAGE_DIR`` from ``app.config`` (default ``'data'``).
        """
        self.app = app
        self.storage_dir = app.config.get('POLL_STORAGE_DIR', 'data')

    def start(self):
        """Start the scheduler and register the once-a-minute poll checker.

        Does nothing when the scheduler is already running.
        """
        if self.scheduler.running:
            return

        self.scheduler.start()
        logger.info("Polling scheduler started")

        # Schedule the poll checker to run every minute. replace_existing
        # keeps a single 'poll_checker' job even if one is already stored.
        self.scheduler.add_job(
            func=self._check_and_poll,
            trigger=IntervalTrigger(minutes=1),
            id='poll_checker',
            name='Check and poll sources',
            replace_existing=True
        )
        logger.info("Poll checker job scheduled")

    def stop(self):
        """Stop the scheduler if it is running."""
        if self.scheduler.running:
            self.scheduler.shutdown()
            logger.info("Polling scheduler stopped")

    def _check_and_poll(self):
        """Check which sources need polling and poll them.

        Runs inside an application context. Any exception raised while
        querying or polling is logged and swallowed so the scheduler job
        itself never fails.
        """
        if not self.app:
            logger.error("No app context available")
            return

        with self.app.app_context():
            try:
                # Get all enabled sources
                sources = PollSource.query.filter_by(enabled=True).all()
                for source in sources:
                    if self._should_poll(source):
                        self._poll_source(source)
            except Exception as e:
                logger.error(f"Error in poll checker: {e}")
                logger.error(traceback.format_exc())

    def _should_poll(self, source: "PollSource") -> bool:
        """Determine if a source should be polled now.

        Returns:
            True when the source has never been polled, or when at least
            ``source.poll_interval_minutes`` minutes have passed since
            ``source.last_poll_time`` (compared against ``datetime.utcnow()``).
        """
        if not source.last_poll_time:
            # Never polled, should poll
            return True

        time_since_poll = datetime.utcnow() - source.last_poll_time
        minutes_since_poll = time_since_poll.total_seconds() / 60
        return minutes_since_poll >= source.poll_interval_minutes

    def _poll_source(self, source: "PollSource"):
        """Poll a single source and persist the outcome.

        A ``PollLog`` row is committed with status ``'running'`` before
        collection starts, then updated to ``'success'`` or ``'error'``.
        The source's ``last_poll_*`` fields are updated in both cases, so a
        failing source is retried only after its normal interval.
        """
        logger.info(f"Polling {source.platform}:{source.source_id}")

        # Create poll log
        poll_log = PollLog(
            source_id=source.id,
            started_at=datetime.utcnow(),
            status='running'
        )
        db.session.add(poll_log)
        db.session.commit()

        try:
            # Perform the actual data collection
            result = self._collect_data(source)
            new_posts = result.get('posts_new', 0)

            # Update poll log
            poll_log.completed_at = datetime.utcnow()
            poll_log.status = 'success'
            poll_log.posts_found = result.get('posts_found', 0)
            poll_log.posts_new = new_posts
            poll_log.posts_updated = result.get('posts_updated', 0)

            # Update source
            source.last_poll_time = datetime.utcnow()
            source.last_poll_status = 'success'
            source.last_poll_error = None
            # Treat a missing (NULL) counter as zero instead of raising.
            source.posts_collected = (source.posts_collected or 0) + new_posts
            db.session.commit()

            logger.info(f"Polling completed for {source.platform}:{source.source_id} - "
                        f"{new_posts} new posts")
        except Exception as e:
            error_msg = str(e)
            error_trace = traceback.format_exc()
            logger.error(f"Error polling {source.platform}:{source.source_id}: {error_msg}")
            logger.error(error_trace)

            # Discard any half-applied success updates and reset the session;
            # after a failed commit the session cannot be used until it is
            # rolled back.
            db.session.rollback()

            # Update poll log
            poll_log.completed_at = datetime.utcnow()
            poll_log.status = 'error'
            poll_log.error_message = f"{error_msg}\n\n{error_trace}"

            # Update source
            source.last_poll_time = datetime.utcnow()
            source.last_poll_status = 'error'
            source.last_poll_error = error_msg
            db.session.commit()

    def _collect_data(self, source: "PollSource") -> Dict:
        """Collect data from a source.

        Wraps the existing data_collection.py functionality: loads the
        index and state from ``self.storage_dir``, calls ``collect_platform``
        for the last 24 hours, then saves the index and state back.

        Returns:
            Dict with ``posts_found``, ``posts_new`` and ``posts_updated``.
            Both found and new are set to the value returned by
            ``collect_platform``; ``posts_updated`` is always 0.

        Raises:
            Exception: any error from collection or saving is logged and
                re-raised so that ``_poll_source`` records the poll as
                failed rather than as a success with zero posts.
        """
        from datetime import timedelta
        from data_collection import (
            ensure_directories,
            load_index,
            save_index,
            load_state,
            save_state,
        )

        # Setup directories and load state
        dirs = ensure_directories(self.storage_dir)
        index = load_index(self.storage_dir)
        state = load_state(self.storage_dir)

        # Calculate date range - always use last 24 hours for polling.
        # Don't use the resume feature as it can create too narrow windows.
        # NOTE(review): this uses local time (datetime.now()) while the rest
        # of the class uses utcnow() - confirm which collect_platform expects.
        end_date = datetime.now()
        start_date = end_date - timedelta(hours=24)
        start_iso = start_date.isoformat()
        end_iso = end_date.isoformat()

        try:
            # Call the existing collect_platform function
            posts_collected = collect_platform(
                platform=source.platform,
                community=source.source_id,
                start_date=start_iso,
                end_date=end_iso,
                max_posts=100,  # Default limit
                fetch_comments=True,
                index=index,
                dirs=dirs
            )

            # Save updated index and state
            save_index(index, self.storage_dir)
            state['last_run'] = end_iso
            save_state(state, self.storage_dir)
        except Exception as e:
            logger.error(f"Error in _collect_data: {e}")
            # Propagate: swallowing here made failed polls look successful.
            raise

        return {
            'posts_found': posts_collected,
            'posts_new': posts_collected,
            'posts_updated': 0
        }

    def poll_now(self, source_id: str):
        """Manually trigger polling for a specific source.

        Returns:
            True if the source exists and was polled; False if no app is
            attached or no source with ``source_id`` was found.
        """
        if not self.app:
            logger.error("No app context available")
            return False

        with self.app.app_context():
            source = PollSource.query.get(source_id)
            if source:
                self._poll_source(source)
                return True
            return False

    def get_status(self) -> Dict:
        """Get scheduler status.

        Returns:
            Dict with ``running`` (bool) and ``jobs``, a list of dicts with
            each job's ``id``, ``name`` and ``next_run`` (ISO string, or
            None when the job has no next run time).
        """
        return {
            'running': self.scheduler.running,
            'jobs': [
                {
                    'id': job.id,
                    'name': job.name,
                    'next_run': job.next_run_time.isoformat() if job.next_run_time else None
                }
                for job in self.scheduler.get_jobs()
            ]
        }
# Global polling service instance.
# Created without an app (app=None); init_app() must be called before polling,
# otherwise the poll checker logs "No app context available" and returns.
polling_service = PollingService()