Files
Synculous-2/core/adaptive_meds.py

429 lines
14 KiB
Python

"""
core/adaptive_meds.py - Adaptive medication timing and nagging logic
This module handles:
- Discord presence tracking for wake detection
- Adaptive medication schedule calculations
- Nagging logic for missed medications
- Quiet hours enforcement
"""
import json
import uuid
from datetime import datetime, timedelta, time, timezone
from typing import Optional, Dict, List, Tuple
import core.postgres as postgres
from core.tz import user_now, user_today_for
def _normalize_time(val):
    """Convert a time-like value to an 'HH:MM' string for use in VARCHAR queries.

    Args:
        val: A ``datetime.time``, a ``datetime.datetime``, ``None``, or any
            other value whose ``str()`` begins with ``HH:MM`` (for example
            ``"09:30"`` or ``"09:30:00"``).

    Returns:
        ``None`` when ``val`` is ``None``; otherwise the first five characters
        of the value's clock representation.
    """
    if val is None:
        return None
    # datetime is not a subclass of time, so it needs its own check; without
    # it str(val)[:5] would return the start of the date (e.g. "2024-").
    if isinstance(val, (time, datetime)):
        return val.strftime("%H:%M")
    return str(val)[:5]
def get_adaptive_settings(user_uuid: str) -> Optional[Dict]:
    """Return the first ``adaptive_med_settings`` row for the user, or None."""
    matches = postgres.select("adaptive_med_settings", {"user_uuid": user_uuid})
    return matches[0] if matches else None
def get_user_presence(user_uuid: str) -> Optional[Dict]:
    """Return the first ``user_presence`` row for the user, or None."""
    matches = postgres.select("user_presence", {"user_uuid": user_uuid})
    return matches[0] if matches else None
def update_user_presence(user_uuid: str, discord_user_id: str, is_online: bool):
    """Record the user's current online/offline state.

    Updates the existing ``user_presence`` row when one exists; otherwise
    inserts a new row with an empty presence history.

    Args:
        user_uuid: User's UUID.
        discord_user_id: Discord user id; only written when a new row is
            created.
        is_online: True when the user is now online, False when offline.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo keeps
    # the value a naive UTC timestamp, exactly what utcnow() produced, so the
    # stored values do not change.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    presence = get_user_presence(user_uuid)

    if presence:
        # Update existing record: only the timestamp for the new state moves.
        updates = {"is_currently_online": is_online, "updated_at": now}
        if is_online:
            updates["last_online_at"] = now
        else:
            updates["last_offline_at"] = now
        postgres.update("user_presence", updates, {"user_uuid": user_uuid})
    else:
        # Create new record
        data = {
            "id": str(uuid.uuid4()),
            "user_uuid": user_uuid,
            "discord_user_id": discord_user_id,
            "is_currently_online": is_online,
            "last_online_at": now if is_online else None,
            "last_offline_at": now if not is_online else None,
            "presence_history": json.dumps([]),
            "updated_at": now,
        }
        postgres.insert("user_presence", data)
def record_presence_event(user_uuid: str, event_type: str, timestamp: datetime):
    """Append a presence event to the user's stored history.

    Does nothing when the user has no ``user_presence`` row.

    Args:
        user_uuid: User's UUID.
        event_type: Event label stored under the ``"type"`` key
            (``calculate_typical_wake_time`` looks for ``"online"``).
        timestamp: When the event happened; stored as an ISO-8601 string.
    """
    presence = get_user_presence(user_uuid)
    if not presence:
        return

    raw_history = presence.get("presence_history")
    history = json.loads(raw_history) if isinstance(raw_history, str) else raw_history
    # A NULL column (or JSON "null") arrives as None; treat it as empty
    # instead of failing on .append().
    history = list(history or [])

    # Add new event
    history.append({"type": event_type, "timestamp": timestamp.isoformat()})

    # Cap the history at the 100 most recent events. No age-based pruning
    # is performed here.
    history = history[-100:]

    postgres.update(
        "user_presence",
        {"presence_history": json.dumps(history)},
        {"user_uuid": user_uuid},
    )
def calculate_typical_wake_time(user_uuid: str) -> Optional[time]:
    """Estimate the user's typical wake time from the stored presence history.

    The estimate is the arithmetic mean (minute resolution) of the
    time-of-day of every ``"online"`` event in the history.

    Returns:
        The averaged time, or None when there is no presence row, fewer than
        three history events of any type, or no ``"online"`` events.
    """
    presence = get_user_presence(user_uuid)
    if not presence:
        return None

    raw_history = presence.get("presence_history")
    history = json.loads(raw_history) if isinstance(raw_history, str) else raw_history
    # A NULL column (or JSON "null") arrives as None; treat it as no history.
    history = history or []
    if len(history) < 3:
        return None

    # NOTE(review): every "online" event is counted, not only the first one of
    # each day, and a linear mean does not wrap around midnight - confirm this
    # is the intended definition of "wake time".
    # NOTE(review): the time-of-day is taken as recorded in the timestamp;
    # presumably UTC - confirm against the caller that writes the history.
    wake_times = [
        datetime.fromisoformat(event["timestamp"]).time()
        for event in history
        if event.get("type") == "online"
    ]
    if not wake_times:
        return None

    # Calculate average wake time (convert to minutes since midnight)
    total_minutes = sum(t.hour * 60 + t.minute for t in wake_times)
    avg_minutes = total_minutes // len(wake_times)
    return time(avg_minutes // 60, avg_minutes % 60)
def detect_wake_event(user_uuid: str, current_time: datetime) -> Optional[datetime]:
    """Return the user's last-online timestamp if it looks like a wake-up.

    A wake-up is reported when the user is currently online and the gap
    between ``last_offline_at`` and ``last_online_at`` exceeds 30 minutes.
    ``current_time`` is accepted for interface compatibility but is not
    consulted.

    Returns:
        ``last_online_at`` for a detected wake event, otherwise None.
    """
    presence = get_user_presence(user_uuid)
    if not presence or not presence.get("is_currently_online"):
        return None

    came_online = presence.get("last_online_at")
    went_offline = presence.get("last_offline_at")
    if not (came_online and went_offline):
        return None

    # Offline for more than 30 minutes (1800 s) counts as having been asleep.
    gap_seconds = (came_online - went_offline).total_seconds()
    return came_online if gap_seconds > 1800 else None
def _coerce_clock_time(value) -> time:
    """Return ``value`` as a ``datetime.time``; accepts time, datetime or 'HH:MM...' text."""
    if isinstance(value, datetime):
        return value.time()
    if isinstance(value, time):
        return value
    return datetime.strptime(str(value)[:5], "%H:%M").time()


def is_quiet_hours(user_uuid: str, check_time: datetime) -> bool:
    """Check whether ``check_time`` falls within the user's quiet hours.

    Both ends of the quiet window are inclusive. A window whose start is
    later than its end is treated as spanning midnight.

    Args:
        user_uuid: User's UUID.
        check_time: Moment to test; only its time-of-day is used.

    Returns:
        True inside the quiet window; False otherwise, including when the
        user has no settings or either bound is unset.
    """
    settings = get_adaptive_settings(user_uuid)
    if not settings:
        return False

    quiet_start = settings.get("quiet_hours_start")
    quiet_end = settings.get("quiet_hours_end")
    if not quiet_start or not quiet_end:
        return False

    # The bounds may arrive as datetime.time or as "HH:MM" text depending on
    # the column type (TODO confirm schema). Comparing a time with a str
    # raises TypeError, so coerce both to time first.
    quiet_start = _coerce_clock_time(quiet_start)
    quiet_end = _coerce_clock_time(quiet_end)

    # NOTE(review): check_time is compared as given - presumably it is already
    # in the user's local timezone; confirm against callers.
    current_time = check_time.time()

    # Handle quiet hours that span midnight
    if quiet_start > quiet_end:
        return current_time >= quiet_start or current_time <= quiet_end
    return quiet_start <= current_time <= quiet_end
def calculate_adjusted_times(
    user_uuid: str, base_times: List[str], wake_time: Optional[datetime] = None
) -> List[Tuple[str, int]]:
    """
    Calculate adjusted medication times based on wake time.

    When adaptive timing is enabled and the user woke up later than the first
    scheduled dose, every dose is shifted by the same number of minutes (the
    delay between the first dose time and the wake time-of-day).

    Args:
        user_uuid: User's UUID
        base_times: List of base times in "HH:MM" format
        wake_time: Optional wake time to use for adjustment. When omitted, it
            is taken from presence detection, then from the typical wake
            time, and finally defaults to 08:00.

    Returns:
        List of (adjusted_time_str, offset_minutes) tuples, one per base time.
        The offset is 0 for every entry when adaptive timing is disabled or
        the user woke before the first dose.
    """
    settings = get_adaptive_settings(user_uuid)
    if not settings or not settings.get("adaptive_timing_enabled"):
        # Return base times with 0 offset
        return [(t, 0) for t in base_times]

    # Nothing to adjust; skip the remaining lookups.
    if not base_times:
        return []

    # Get user's timezone
    prefs = postgres.select("user_preferences", {"user_uuid": user_uuid})
    tz_offset_minutes = prefs[0].get("timezone_offset", 0) if prefs else 0

    # Get current time in user's timezone
    user_current_time = user_now(tz_offset_minutes)
    today = user_current_time.date()

    # Determine wake time: explicit argument, then presence detection, then
    # the historical average, then a fixed 8 AM default.
    if wake_time is None:
        wake_time = detect_wake_event(user_uuid, user_current_time)
    if wake_time is None:
        typical_wake = calculate_typical_wake_time(user_uuid)
        if typical_wake is not None:
            wake_time = datetime.combine(today, typical_wake)
    if wake_time is None:
        wake_time = datetime.combine(today, time(8, 0))

    first_med_time = datetime.strptime(base_times[0], "%H:%M").time()

    # Compare time-of-day only. Subtracting full datetimes would raise
    # TypeError for a timezone-aware wake_time and would produce negative or
    # multi-day offsets when wake_time carries a date other than today
    # (e.g. a stale last-online timestamp).
    # NOTE(review): a wake time coming from presence data is presumably UTC
    # while base_times are user-local - confirm whether a conversion is needed.
    wake_clock = wake_time.time()
    if wake_clock > first_med_time:
        # They woke up after their first med time
        delay = datetime.combine(today, wake_clock) - datetime.combine(
            today, first_med_time
        )
        shift_minutes = int(delay.total_seconds() / 60)
    else:
        shift_minutes = 0

    # Adjust all times by the same shift; strftime wraps past midnight.
    shift = timedelta(minutes=shift_minutes)
    adjusted = []
    for base_time_str in base_times:
        base_clock = datetime.strptime(base_time_str, "%H:%M").time()
        shifted = datetime.combine(today, base_clock) + shift
        adjusted.append((shifted.strftime("%H:%M"), shift_minutes))
    return adjusted
def _min_dose_gap_minutes(times, default=60):
    """Return the smallest gap in minutes between consecutive dose times, or ``default``."""
    if not times or len(times) < 2:
        return default
    minutes = []
    for t in times:
        t = _normalize_time(t)
        if t:
            minutes.append(int(t[:2]) * 60 + int(t[3:5]))
    minutes.sort()
    gaps = [later - earlier for earlier, later in zip(minutes, minutes[1:])]
    return min(gaps) if gaps else default


def _taken_log_matches_dose(logs, today, scheduled_time, proximity_window):
    """Return True if one of today's "taken" logs corresponds to the given dose slot."""
    for log in logs:
        created_at = log.get("created_at")
        if not created_at or created_at.date() != today:
            continue
        log_scheduled_time = log.get("scheduled_time")
        if log_scheduled_time:
            # The log names its slot explicitly: require an exact match.
            if _normalize_time(log_scheduled_time) == scheduled_time:
                return True
        elif scheduled_time:
            # No slot on the log: fall back to how close the log's creation
            # time-of-day is to the scheduled time.
            logged_minutes = created_at.hour * 60 + created_at.minute
            sched_minutes = int(scheduled_time[:2]) * 60 + int(scheduled_time[3:5])
            if abs(logged_minutes - sched_minutes) <= proximity_window:
                return True
    return False


def should_send_nag(
    user_uuid: str, med_id: str, scheduled_time, current_time: datetime
) -> Tuple[bool, str]:
    """
    Determine if we should send a nag notification.

    Checks, in order: the dose is due, settings exist and nagging is enabled,
    it is not quiet hours, the user is online (when presence tracking is
    enabled), a schedule row exists for today and is not already marked
    taken, the nag limit is not reached, the nag interval has elapsed, and no
    "taken" log for today matches this dose.

    Args:
        user_uuid: User's UUID.
        med_id: Medication id.
        scheduled_time: Dose slot as a ``datetime.time`` or "HH:MM" text; may
            be None, in which case the first schedule row for today is used.
        current_time: Current moment. Its time-of-day is compared with the
            scheduled time, and it is subtracted from ``last_nag_at`` (which
            is treated as UTC when naive).

    Returns:
        (should_nag: bool, reason: str)
    """
    scheduled_time = _normalize_time(scheduled_time)

    # Don't nag for doses that aren't due yet
    if scheduled_time:
        due_at = time(int(scheduled_time[:2]), int(scheduled_time[3:5]))
        if current_time.time() < due_at:
            return False, "Not yet due"

    settings = get_adaptive_settings(user_uuid)
    if not settings:
        return False, "No settings"
    if not settings.get("nagging_enabled"):
        return False, "Nagging disabled"

    # Check quiet hours
    if is_quiet_hours(user_uuid, current_time):
        return False, "Quiet hours"

    # Check if user is online (don't nag if offline unless presence tracking disabled)
    presence = get_user_presence(user_uuid)
    if presence and settings.get("presence_tracking_enabled"):
        if not presence.get("is_currently_online"):
            return False, "User offline"

    # Get today's schedule record for this specific time slot
    today = user_today_for(user_uuid)
    query = {"user_uuid": user_uuid, "medication_id": med_id, "adjustment_date": today}
    if scheduled_time is not None:
        query["adjusted_time"] = scheduled_time
    schedules = postgres.select("medication_schedules", query)
    if not schedules:
        return False, "No schedule found"
    schedule = schedules[0]

    # mark_med_taken() records completion on this same row; honour it so a
    # dose already marked taken is never nagged again.
    if schedule.get("status") == "taken":
        return False, "Already taken today"

    # A NULL nag_count is treated as zero rather than failing the comparison.
    nag_count = schedule.get("nag_count") or 0
    max_nags = settings.get("max_nag_count", 4)
    if nag_count >= max_nags:
        return False, f"Max nags reached ({max_nags})"

    # Check if it's time to nag
    last_nag = schedule.get("last_nag_at")
    nag_interval = settings.get("nag_interval_minutes", 15)
    if last_nag:
        # A naive last_nag_at is interpreted as UTC before subtracting.
        if isinstance(last_nag, datetime) and last_nag.tzinfo is None:
            last_nag = last_nag.replace(tzinfo=timezone.utc)
        time_since_last_nag = (current_time - last_nag).total_seconds() / 60
        if time_since_last_nag < nag_interval:
            return False, f"Too soon ({int(time_since_last_nag)} < {nag_interval} min)"

    # Check if this specific dose was already taken today
    logs = postgres.select(
        "med_logs",
        {
            "medication_id": med_id,
            "user_uuid": user_uuid,
            "action": "taken",
        },
    )

    # The proximity window scales with how tightly doses are spaced: half the
    # smallest gap between dose times, but never less than 30 minutes.
    med = postgres.select_one("medications", {"id": med_id})
    dose_interval_minutes = _min_dose_gap_minutes(med.get("times") if med else None)
    proximity_window = max(30, dose_interval_minutes // 2)

    # NOTE(review): created_at is compared with `today` and with the scheduled
    # time as stored - presumably both are in the same timezone; confirm.
    if _taken_log_matches_dose(logs, today, scheduled_time, proximity_window):
        return False, "Already taken today"

    return True, "Time to nag"
def record_nag_sent(user_uuid: str, med_id: str, scheduled_time):
    """Record that a nag was sent for today's schedule row.

    Increments ``nag_count`` and stamps ``last_nag_at`` with the current UTC
    time on the first matching ``medication_schedules`` row. Does nothing
    when no row matches.

    Args:
        user_uuid: User's UUID.
        med_id: Medication id.
        scheduled_time: Dose slot as ``datetime.time`` or "HH:MM" text; when
            None the lookup is not narrowed by ``adjusted_time``.
    """
    scheduled_time = _normalize_time(scheduled_time)
    today = user_today_for(user_uuid)
    query = {"user_uuid": user_uuid, "medication_id": med_id, "adjustment_date": today}
    if scheduled_time is not None:
        query["adjusted_time"] = scheduled_time

    schedules = postgres.select("medication_schedules", query)
    if not schedules:
        return

    schedule = schedules[0]
    # A NULL nag_count is treated as zero so the increment cannot fail.
    new_nag_count = (schedule.get("nag_count") or 0) + 1
    postgres.update(
        "medication_schedules",
        {"nag_count": new_nag_count, "last_nag_at": datetime.now(timezone.utc)},
        {"id": schedule["id"]},
    )
def create_daily_schedule(user_uuid: str, med_id: str, base_times: List[str]):
    """Insert today's schedule rows for a medication, applying adaptive offsets.

    Returns without inserting anything when any schedule row already exists
    for this user, medication and date. Otherwise one "pending" row is
    written per base time, carrying both the base and the adjusted time.
    """
    today = user_today_for(user_uuid)

    # Bail out if today's schedule was already generated.
    already_scheduled = postgres.select(
        "medication_schedules",
        {"user_uuid": user_uuid, "medication_id": med_id, "adjustment_date": today},
    )
    if already_scheduled:
        return

    adjustments = calculate_adjusted_times(user_uuid, base_times)

    # Pair every base time with its (adjusted time, offset) result.
    for original_time, adjustment in zip(base_times, adjustments):
        shifted_time, shift_minutes = adjustment
        row = {
            "id": str(uuid.uuid4()),
            "user_uuid": user_uuid,
            "medication_id": med_id,
            "base_time": original_time,
            "adjusted_time": shifted_time,
            "adjustment_date": today,
            "adjustment_minutes": shift_minutes,
            "nag_count": 0,
            "status": "pending",
        }
        postgres.insert("medication_schedules", row)
def mark_med_taken(user_uuid: str, med_id: str, scheduled_time):
    """Set status "taken" on today's schedule row for the given dose slot.

    The row is matched on user, medication, today's date and the normalized
    ``scheduled_time`` (as ``adjusted_time``).
    """
    slot = _normalize_time(scheduled_time)
    current_day = user_today_for(user_uuid)

    # NOTE(review): unlike the sibling lookups, a None slot is still passed
    # as a filter value here - confirm how postgres.update treats it.
    row_filter = {
        "user_uuid": user_uuid,
        "medication_id": med_id,
        "adjustment_date": current_day,
        "adjusted_time": slot,
    }
    postgres.update("medication_schedules", {"status": "taken"}, row_filter)