""" core/adaptive_meds.py - Adaptive medication timing and nagging logic This module handles: - Discord presence tracking for wake detection - Adaptive medication schedule calculations - Nagging logic for missed medications - Quiet hours enforcement """ import json import uuid from datetime import datetime, timedelta, time from typing import Optional, Dict, List, Tuple import core.postgres as postgres from core.tz import user_now, user_today_for def get_adaptive_settings(user_uuid: str) -> Optional[Dict]: """Get user's adaptive medication settings.""" rows = postgres.select("adaptive_med_settings", {"user_uuid": user_uuid}) if rows: return rows[0] return None def get_user_presence(user_uuid: str) -> Optional[Dict]: """Get user's Discord presence data.""" rows = postgres.select("user_presence", {"user_uuid": user_uuid}) if rows: return rows[0] return None def update_user_presence(user_uuid: str, discord_user_id: str, is_online: bool): """Update user's presence status.""" now = datetime.utcnow() presence = get_user_presence(user_uuid) if presence: # Update existing record updates = {"is_currently_online": is_online, "updated_at": now} if is_online: updates["last_online_at"] = now else: updates["last_offline_at"] = now postgres.update("user_presence", updates, {"user_uuid": user_uuid}) else: # Create new record data = { "id": str(uuid.uuid4()), "user_uuid": user_uuid, "discord_user_id": discord_user_id, "is_currently_online": is_online, "last_online_at": now if is_online else None, "last_offline_at": now if not is_online else None, "presence_history": json.dumps([]), "updated_at": now, } postgres.insert("user_presence", data) def record_presence_event(user_uuid: str, event_type: str, timestamp: datetime): """Record a presence event in the history.""" presence = get_user_presence(user_uuid) if not presence: return raw_history = presence.get("presence_history", []) history = json.loads(raw_history) if isinstance(raw_history, str) else raw_history # Add new event history.append({"type": event_type, "timestamp": timestamp.isoformat()}) # Keep only last 7 days of history (up to 100 events) history = history[-100:] postgres.update( "user_presence", {"presence_history": json.dumps(history)}, {"user_uuid": user_uuid}, ) def calculate_typical_wake_time(user_uuid: str) -> Optional[time]: """Calculate user's typical wake time based on presence history.""" presence = get_user_presence(user_uuid) if not presence: return None raw_history = presence.get("presence_history", []) history = json.loads(raw_history) if isinstance(raw_history, str) else raw_history if len(history) < 3: return None # Get all "online" events wake_times = [] for event in history: if event["type"] == "online": ts = datetime.fromisoformat(event["timestamp"]) wake_times.append(ts.time()) if not wake_times: return None # Calculate average wake time (convert to minutes since midnight) total_minutes = sum(t.hour * 60 + t.minute for t in wake_times) avg_minutes = total_minutes // len(wake_times) return time(avg_minutes // 60, avg_minutes % 60) def detect_wake_event(user_uuid: str, current_time: datetime) -> Optional[datetime]: """Detect if user just woke up based on presence change.""" presence = get_user_presence(user_uuid) if not presence: return None # Check if they just came online if presence.get("is_currently_online"): last_online = presence.get("last_online_at") last_offline = presence.get("last_offline_at") if last_online and last_offline: offline_duration = last_online - last_offline # If they were offline for more than 30 minutes, consider it a wake event if offline_duration.total_seconds() > 1800: # 30 minutes return last_online return None def is_quiet_hours(user_uuid: str, check_time: datetime) -> bool: """Check if current time is within user's quiet hours.""" settings = get_adaptive_settings(user_uuid) if not settings: return False quiet_start = settings.get("quiet_hours_start") quiet_end = settings.get("quiet_hours_end") if not quiet_start or not quiet_end: return False current_time = check_time.time() # Handle quiet hours that span midnight if quiet_start > quiet_end: return current_time >= quiet_start or current_time <= quiet_end else: return quiet_start <= current_time <= quiet_end def calculate_adjusted_times( user_uuid: str, base_times: List[str], wake_time: Optional[datetime] = None ) -> List[Tuple[str, int]]: """ Calculate adjusted medication times based on wake time. Args: user_uuid: User's UUID base_times: List of base times in "HH:MM" format wake_time: Optional wake time to use for adjustment Returns: List of (adjusted_time_str, offset_minutes) tuples """ settings = get_adaptive_settings(user_uuid) if not settings or not settings.get("adaptive_timing_enabled"): # Return base times with 0 offset return [(t, 0) for t in base_times] # Get user's timezone prefs = postgres.select("user_preferences", {"user_uuid": user_uuid}) offset_minutes = prefs[0].get("timezone_offset", 0) if prefs else 0 # Get current time in user's timezone user_current_time = user_now(offset_minutes) today = user_current_time.date() # Determine wake time if wake_time is None: # Try to get from presence detection wake_time = detect_wake_event(user_uuid, user_current_time) if wake_time is None: # Use typical wake time if available typical_wake = calculate_typical_wake_time(user_uuid) if typical_wake: wake_time = datetime.combine(today, typical_wake) if wake_time is None: # Default wake time (8 AM) wake_time = datetime.combine(today, time(8, 0)) # Calculate offset from first med time if not base_times: return [] first_med_time = datetime.strptime(base_times[0], "%H:%M").time() first_med_datetime = datetime.combine(today, first_med_time) # Calculate how late they are if wake_time.time() > first_med_time: # They woke up after their first med time offset_minutes = int((wake_time - first_med_datetime).total_seconds() / 60) else: offset_minutes = 0 # Adjust all times adjusted = [] for base_time_str in base_times: base_time = datetime.strptime(base_time_str, "%H:%M").time() base_datetime = datetime.combine(today, base_time) # Add offset adjusted_datetime = base_datetime + timedelta(minutes=offset_minutes) adjusted_time_str = adjusted_datetime.strftime("%H:%M") adjusted.append((adjusted_time_str, offset_minutes)) return adjusted def should_send_nag( user_uuid: str, med_id: str, scheduled_time: str, current_time: datetime ) -> Tuple[bool, str]: """ Determine if we should send a nag notification. Returns: (should_nag: bool, reason: str) """ settings = get_adaptive_settings(user_uuid) if not settings: return False, "No settings" if not settings.get("nagging_enabled"): return False, "Nagging disabled" # Check quiet hours if is_quiet_hours(user_uuid, current_time): return False, "Quiet hours" # Check if user is online (don't nag if offline unless presence tracking disabled) presence = get_user_presence(user_uuid) if presence and settings.get("presence_tracking_enabled"): if not presence.get("is_currently_online"): return False, "User offline" # Get today's schedule record for this specific time slot today = current_time.date() query = {"user_uuid": user_uuid, "medication_id": med_id, "adjustment_date": today} if scheduled_time is not None: query["adjusted_time"] = scheduled_time schedules = postgres.select("medication_schedules", query) if not schedules: return False, "No schedule found" schedule = schedules[0] nag_count = schedule.get("nag_count", 0) max_nags = settings.get("max_nag_count", 4) if nag_count >= max_nags: return False, f"Max nags reached ({max_nags})" # Check if it's time to nag last_nag = schedule.get("last_nag_at") nag_interval = settings.get("nag_interval_minutes", 15) if last_nag: time_since_last_nag = (current_time - last_nag).total_seconds() / 60 if time_since_last_nag < nag_interval: return False, f"Too soon ({int(time_since_last_nag)} < {nag_interval} min)" # Check if this specific dose was already taken today logs = postgres.select( "med_logs", { "medication_id": med_id, "user_uuid": user_uuid, "action": "taken", "scheduled_time": scheduled_time, }, ) # Filter to today's logs for this time slot today_logs = [ log for log in logs if log.get("created_at") and log["created_at"].date() == today ] if today_logs: return False, "Already taken today" return True, "Time to nag" def record_nag_sent(user_uuid: str, med_id: str, scheduled_time: str): """Record that a nag was sent.""" today = user_today_for(user_uuid) query = {"user_uuid": user_uuid, "medication_id": med_id, "adjustment_date": today} if scheduled_time is not None: query["adjusted_time"] = scheduled_time schedules = postgres.select("medication_schedules", query) if schedules: schedule = schedules[0] new_nag_count = schedule.get("nag_count", 0) + 1 postgres.update( "medication_schedules", {"nag_count": new_nag_count, "last_nag_at": datetime.utcnow()}, {"id": schedule["id"]}, ) def create_daily_schedule(user_uuid: str, med_id: str, base_times: List[str]): """Create today's medication schedule with adaptive adjustments.""" today = user_today_for(user_uuid) # Check if schedule already exists existing = postgres.select( "medication_schedules", {"user_uuid": user_uuid, "medication_id": med_id, "adjustment_date": today}, ) if existing: return # Calculate adjusted times adjusted_times = calculate_adjusted_times(user_uuid, base_times) # Create schedule records for each time for base_time, (adjusted_time, offset) in zip(base_times, adjusted_times): data = { "id": str(uuid.uuid4()), "user_uuid": user_uuid, "medication_id": med_id, "base_time": base_time, "adjusted_time": adjusted_time, "adjustment_date": today, "adjustment_minutes": offset, "nag_count": 0, "status": "pending", } postgres.insert("medication_schedules", data) def mark_med_taken(user_uuid: str, med_id: str, scheduled_time: str): """Mark a medication as taken.""" today = user_today_for(user_uuid) postgres.update( "medication_schedules", {"status": "taken"}, { "user_uuid": user_uuid, "medication_id": med_id, "adjustment_date": today, "adjusted_time": scheduled_time, }, )