"""
Polling Service

Background service for collecting data from configured sources.
"""

import logging
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, List

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

from database import db
from models import PollSource, PollLog
from data_collection import collect_platform, get_collection_sources, load_platform_config

logger = logging.getLogger(__name__)


class PollingService:
    """Background polling service using APScheduler.

    A single interval job runs every minute, looks up all enabled
    ``PollSource`` rows and polls those whose interval has elapsed. Each
    poll attempt is recorded in a ``PollLog`` row.
    """

    def __init__(self, app=None):
        """Create the service.

        Args:
            app: Optional Flask application. When omitted, call
                :meth:`init_app` before the scheduler fires.
        """
        self.scheduler = BackgroundScheduler()
        self.app = app
        self.storage_dir = 'data'

    def init_app(self, app):
        """Bind the service to a Flask app and read its storage directory.

        Args:
            app: Flask application; ``POLL_STORAGE_DIR`` is read from its
                config and defaults to ``'data'``.
        """
        self.app = app
        self.storage_dir = app.config.get('POLL_STORAGE_DIR', 'data')

    def start(self):
        """Start the scheduler and register the once-a-minute poll checker.

        Does nothing when the scheduler is already running.
        """
        if self.scheduler.running:
            return

        self.scheduler.start()
        logger.info("Polling scheduler started")

        # Schedule the poll checker to run every minute.
        # replace_existing keeps the job unique under its fixed id.
        self.scheduler.add_job(
            func=self._check_and_poll,
            trigger=IntervalTrigger(minutes=1),
            id='poll_checker',
            name='Check and poll sources',
            replace_existing=True,
        )
        logger.info("Poll checker job scheduled")

    def stop(self):
        """Shut the scheduler down if it is running."""
        if self.scheduler.running:
            self.scheduler.shutdown()
            logger.info("Polling scheduler stopped")

    def _check_and_poll(self):
        """Poll every enabled source whose poll interval has elapsed.

        Runs inside the Flask application context. Any exception is logged
        with its traceback and swallowed so the scheduler job keeps running.
        """
        if not self.app:
            logger.error("No app context available")
            return

        with self.app.app_context():
            try:
                # Get all enabled sources
                sources = PollSource.query.filter_by(enabled=True).all()
                for source in sources:
                    if self._should_poll(source):
                        self._poll_source(source)
            except Exception as e:
                # logger.exception attaches the traceback to the same record.
                logger.exception(f"Error in poll checker: {e}")

    def _should_poll(self, source: PollSource) -> bool:
        """Return True when ``source`` is due for polling.

        A source that has never been polled is always due; otherwise it is
        due once ``poll_interval_minutes`` have passed since
        ``last_poll_time`` (compared against ``datetime.utcnow()``).
        """
        if not source.last_poll_time:
            # Never polled, should poll
            return True

        elapsed = datetime.utcnow() - source.last_poll_time
        minutes_since_poll = elapsed.total_seconds() / 60
        return minutes_since_poll >= source.poll_interval_minutes

    def _poll_source(self, source: PollSource):
        """Poll a single source and record the outcome.

        A ``PollLog`` row with status ``'running'`` is committed first. On
        success the log and the source are updated with the collected
        counts; on any exception the session is rolled back and both are
        marked ``'error'`` with the message (and traceback on the log).

        Args:
            source: The source to poll.
        """
        # Capture the label up front so error logging never has to touch
        # ORM attributes while the session is in a failed state.
        label = f"{source.platform}:{source.source_id}"
        logger.info(f"Polling {label}")

        # Create poll log
        poll_log = PollLog(
            source_id=source.id,
            started_at=datetime.utcnow(),
            status='running',
        )
        db.session.add(poll_log)
        db.session.commit()

        try:
            # Perform the actual data collection
            result = self._collect_data(source)
            posts_new = result.get('posts_new', 0)
            finished_at = datetime.utcnow()

            # Update poll log
            poll_log.completed_at = finished_at
            poll_log.status = 'success'
            poll_log.posts_found = result.get('posts_found', 0)
            poll_log.posts_new = posts_new
            poll_log.posts_updated = result.get('posts_updated', 0)

            # Update source
            source.last_poll_time = finished_at
            source.last_poll_status = 'success'
            source.last_poll_error = None
            # Treat a NULL counter as zero instead of raising TypeError.
            source.posts_collected = (source.posts_collected or 0) + posts_new

            db.session.commit()
            logger.info(f"Polling completed for {label} - "
                        f"{posts_new} new posts")

        except Exception as e:
            error_msg = str(e)
            error_trace = traceback.format_exc()

            # Discard any half-applied success updates and recover the
            # session if the failure came from the commit itself.
            db.session.rollback()

            logger.error(f"Error polling {label}: {error_msg}")
            logger.error(error_trace)

            failed_at = datetime.utcnow()

            # Update poll log
            poll_log.completed_at = failed_at
            poll_log.status = 'error'
            poll_log.error_message = f"{error_msg}\n\n{error_trace}"

            # Update source
            source.last_poll_time = failed_at
            source.last_poll_status = 'error'
            source.last_poll_error = error_msg

            try:
                db.session.commit()
            except Exception:
                # Recording the failure is best effort; leave the session
                # usable for the next source.
                logger.exception(f"Could not record poll failure for {label}")
                db.session.rollback()

    def _collect_data(self, source: PollSource) -> Dict:
        """Collect data from a source.

        Wraps the existing data_collection.py functionality: loads the
        index and state from ``self.storage_dir``, runs ``collect_platform``
        over the last 24 hours, then saves the index and state.

        Args:
            source: The source whose platform, id and limits are used.

        Returns:
            Dict with ``posts_found``, ``posts_new`` and ``posts_updated``.
            ``posts_found`` and ``posts_new`` are both the value returned
            by ``collect_platform``; ``posts_updated`` is always 0.

        Raises:
            Exception: Anything raised by the data_collection helpers is
                propagated so the caller can record the poll as failed.
        """
        from data_collection import ensure_directories, load_index, save_index, load_state, save_state
        from datetime import datetime, timedelta

        # Setup directories and load state
        dirs = ensure_directories(self.storage_dir)
        index = load_index(self.storage_dir)
        state = load_state(self.storage_dir)

        # Calculate date range - always use last 24 hours for polling.
        # Don't use the resume feature as it can create too narrow windows.
        # NOTE(review): this uses local time while the rest of the module
        # uses utcnow() - confirm which one collect_platform expects.
        end_date = datetime.now()
        start_date = end_date - timedelta(hours=24)
        start_iso = start_date.isoformat()
        end_iso = end_date.isoformat()

        # Call the existing collect_platform function using source settings
        posts_collected = collect_platform(
            platform=source.platform,
            community=source.source_id,
            start_date=start_iso,
            end_date=end_iso,
            max_posts=source.max_posts or 100,
            fetch_comments=getattr(source, 'fetch_comments', True),
            index=index,
            dirs=dirs,
        )

        # Save updated index and state
        save_index(index, self.storage_dir)
        state['last_run'] = end_iso
        save_state(state, self.storage_dir)

        return {
            'posts_found': posts_collected,
            'posts_new': posts_collected,
            'posts_updated': 0,
        }

    def poll_now(self, source_id: str):
        """Manually trigger polling for a specific source.

        Args:
            source_id: Primary key of the ``PollSource`` to poll.

        Returns:
            True when the source exists and a poll was attempted, False
            when no app is bound or the source does not exist.
        """
        if not self.app:
            logger.error("No app context available")
            return False

        with self.app.app_context():
            source = PollSource.query.get(source_id)
            if not source:
                return False
            self._poll_source(source)
            return True

    def get_status(self) -> Dict:
        """Get scheduler status.

        Returns:
            Dict with ``running`` (bool) and ``jobs``, a list of dicts
            holding each job's ``id``, ``name`` and ``next_run`` (ISO
            string, or None when the job has no next run time).
        """
        return {
            'running': self.scheduler.running,
            'jobs': [
                {
                    'id': job.id,
                    'name': job.name,
                    'next_run': job.next_run_time.isoformat() if job.next_run_time else None,
                }
                for job in self.scheduler.get_jobs()
            ],
        }


# Global polling service instance
polling_service = PollingService()