# Synculous-2/bot/commands/medications.py
# 2026-02-16 13:16:18 -06:00
# 755 lines · 27 KiB · Python

"""
Medications command handler - bot-side hooks for medication management
"""
import asyncio
import re
from datetime import datetime, timedelta, timezone
from bot.command_registry import register_module
import ai.parser as ai_parser
def _hhmm_to_minutes(time_str):
    """Convert an ``"HH:MM"`` string to minutes after midnight.

    Raises:
        ValueError: if the string does not consist of exactly two integer
            fields, or the fields fall outside 00:00-23:59.
        AttributeError: if ``time_str`` has no ``split`` method (not a string).
    """
    hour, minute = map(int, time_str.split(":"))
    if not (0 <= hour < 24 and 0 <= minute < 60):
        raise ValueError(f"time out of range: {time_str!r}")
    return hour * 60 + minute


def _resolve_tzinfo(user_tz):
    """Turn ``user_tz`` into a tzinfo usable by ``datetime.now``.

    ``None`` means UTC. A number is treated as an offset in minutes using the
    same convention as ``_parse_timezone`` (positive = behind UTC). Anything
    else is assumed to already be a tzinfo object and is returned unchanged.
    """
    if user_tz is None:
        return timezone.utc
    if isinstance(user_tz, (int, float)):
        try:
            # Positive offsets are *behind* UTC, hence the sign flip.
            return timezone(timedelta(minutes=-user_tz))
        except (ValueError, OverflowError):
            # Offsets of 24h or more cannot be represented; fall back to UTC.
            return timezone.utc
    return user_tz


def _get_nearest_scheduled_time(times, user_tz=None, now=None):
    """Pick the scheduled dose time that best matches "right now".

    Selection happens in two steps:

    1. Prefer the time closest to now, provided it lies within a 4 hour
       window. The distance is measured around the clock, so at 00:10 a
       23:50 dose counts as 20 minutes away rather than 23h40m.
    2. Otherwise fall back to the most recent scheduled time in the past,
       treating times later than now as having occurred yesterday.

    Args:
        times: List of time strings like ["08:00", "20:00"]. Entries that
            cannot be parsed as HH:MM are ignored.
        user_tz: tzinfo object for the user, or a numeric offset in minutes
            (positive = behind UTC, as returned by ``_parse_timezone``).
            Defaults to UTC.
        now: Optional datetime to use instead of the current time; only its
            ``hour`` and ``minute`` are read and ``user_tz`` is then ignored.
            Mainly useful for deterministic testing.

    Returns:
        The chosen entry of ``times`` (the original string), or None when
        ``times`` is empty or contains no parseable entry.
    """
    if not times:
        return None

    if now is None:
        now = datetime.now(_resolve_tzinfo(user_tz))
    now_minutes = now.hour * 60 + now.minute

    minutes_per_day = 24 * 60
    window_minutes = 4 * 60

    # Parse once; both selection steps work on the same candidate list.
    candidates = []
    for time_str in times:
        try:
            candidates.append((_hhmm_to_minutes(time_str), time_str))
        except (ValueError, AttributeError):
            continue
    if not candidates:
        return None

    # Step 1: closest time within the window, measured around the clock.
    best_time = None
    best_diff = float("inf")
    for minutes, time_str in candidates:
        diff = abs(minutes - now_minutes)
        diff = min(diff, minutes_per_day - diff)
        if diff <= window_minutes and diff < best_diff:
            best_diff = diff
            best_time = time_str
    if best_time is not None:
        return best_time

    # Step 2: most recent past time. The modulo maps "later today" onto
    # "yesterday", so the smallest elapsed value is the latest past dose.
    _, most_recent = min(
        candidates,
        key=lambda entry: (now_minutes - entry[0]) % minutes_per_day,
    )
    return most_recent
async def _get_user_timezone(bot, user_id):
    """Return the user's stored timezone, prompting by DM when none is set.

    Args:
        bot: Object exposing ``api.get_user_data(user_id)`` and
            ``send_dm(user_id, text)`` coroutines.
        user_id: Identifier passed through to both bot calls.

    Returns:
        The value stored under the ``'timezone'`` key of the user's data, or
        None when no timezone is stored. In the None case a DM asking for the
        timezone has been sent; this function does not wait for the reply, so
        the caller is responsible for handling the user's answer.
    """
    user_data = await bot.api.get_user_data(user_id)
    if user_data and user_data.get("timezone"):
        return user_data["timezone"]

    # No timezone on record: ask for it and let the caller pick up the reply.
    prompt = (
        "🕐 I don't have your timezone set yet. Could you please tell me your timezone?\n\n"
        "You can provide it in various formats:\n"
        "- Timezone name (e.g., 'America/New_York', 'Europe/London')\n"
        "- UTC offset (e.g., 'UTC+2', '-05:00')\n"
        "- Common abbreviations (e.g., 'EST', 'PST')\n\n"
        "Please reply with your timezone and I'll set it up for you!"
    )
    await bot.send_dm(user_id, prompt)
    return None
def _parse_timezone(user_input):
    """Parse a timezone string to an offset in minutes.

    Accepted forms (case-insensitive, surrounding whitespace ignored):
      - a US/UTC abbreviation from the table below ("PST", "EDT", "UTC", ...)
      - a signed or unsigned hour offset, optionally with minutes and an
        optional "UTC"/"GMT" prefix ("UTC-8", "-8", "+5:30", "GMT +2")

    Examples:
        "UTC-8" -> 480 (8 hours behind UTC)
        "-8" -> 480
        "PST" -> 480
        "EST" -> 300
        "+5:30" -> -330

    Returns:
        Offset in minutes (positive = behind UTC), or None if the input is
        not a string, is not in a recognised format, or describes an offset
        that cannot exist (minutes >= 60, or more than 14 hours from UTC).
    """
    if not isinstance(user_input, str):
        return None
    text = user_input.strip().upper()

    # Common timezone abbreviations (minutes behind UTC).
    tz_map = {
        "PST": 480,
        "PDT": 420,
        "MST": 420,
        "MDT": 360,
        "CST": 360,
        "CDT": 300,
        "EST": 300,
        "EDT": 240,
        "GMT": 0,
        "UTC": 0,
    }
    if text in tz_map:
        return tz_map[text]

    # Optional UTC/GMT prefix, optional sign, hours, optional ":minutes".
    match = re.fullmatch(
        r"(?:UTC|GMT)?\s*([+-])?\s*(\d+)(?:\s*:\s*(\d+))?", text
    )
    if not match:
        return None

    sign = -1 if match.group(1) == "-" else 1
    hours = int(match.group(2))
    minutes = int(match.group(3)) if match.group(3) else 0

    # Real-world offsets span UTC-12:00 .. UTC+14:00; anything beyond 14 hours
    # (or with a minute field of 60+) is a typo rather than a timezone.
    total = hours * 60 + minutes
    if minutes >= 60 or total > 14 * 60:
        return None

    # "UTC-8" means 8 hours BEHIND UTC, which this module stores as +480,
    # so the user's sign is inverted.
    return total * sign * -1
async def _get_scheduled_time_from_context(message, med_name):
    """Recover a dose's scheduled time from a recent reminder in the channel.

    Walks up to 10 entries of ``message.channel.history`` and inspects those
    written by a bot author (excluding ``message`` itself). A reminder is
    recognised in one of two ways:

    * it contains "Time to take {med_name} ... · HH:MM", or
    * it contains "Time to take <word> (...) · HH:MM" where <word> and
      ``med_name`` contain one another, ignoring case.

    Any exception raised along the way is reported on stdout and swallowed.

    Returns the "HH:MM" text of the first matching reminder, else None.
    """
    try:
        print(
            f"[DEBUG] Looking for reminder for '{med_name}' in channel history",
            flush=True,
        )
        async for candidate in message.channel.history(limit=10):
            # Ignore the triggering message and anything not sent by a bot.
            if candidate.id == message.id or not candidate.author.bot:
                continue

            text = candidate.content
            print(f"[DEBUG] Checking bot message: {text[:100]}...", flush=True)

            # Preferred: the reminder names this exact medication.
            exact = re.search(
                rf"Time to take {re.escape(med_name)}.*?·\s*(\d{{1,2}}:\d{{2}})",
                text,
                re.IGNORECASE,
            )
            if exact:
                found = exact.group(1)
                print(f"[DEBUG] Found scheduled time: {found}", flush=True)
                return found

            # Fallback: "Time to take (name) (dosage) · time" with a name
            # that overlaps the requested one.
            loose = re.search(
                r"Time to take\s+(\w+)\s+\(.*?\)\s+·\s*(\d{1,2}:\d{2})",
                text,
                re.IGNORECASE,
            )
            if not loose:
                continue
            reminder_name = loose.group(1)
            names_overlap = (
                med_name.lower() in reminder_name.lower()
                or reminder_name.lower() in med_name.lower()
            )
            if names_overlap:
                found = loose.group(2)
                print(
                    f"[DEBUG] Found scheduled time via partial match: {found}",
                    flush=True,
                )
                return found

        print(f"[DEBUG] No reminder found for '{med_name}'", flush=True)
    except Exception as e:
        print(f"[DEBUG] Error in _get_scheduled_time_from_context: {e}", flush=True)
    return None
def _pending_confirmations(session):
    """Return the session's pending-confirmation dict, creating it on first use."""
    return session.setdefault("pending_confirmations", {})


def _error_text(resp, fallback):
    """Return the API's ``error`` message, or ``fallback`` for non-dict bodies."""
    if isinstance(resp, dict):
        return resp.get("error", fallback)
    return fallback


def _lookup_timezone_offset(token):
    """Read ``timezone_offset`` from /api/preferences; None when unset or on failure."""
    resp, status = api_request("get", "/api/preferences", token)
    if status == 200 and isinstance(resp, dict):
        return resp.get("timezone_offset")
    return None


def _offset_to_tzinfo(offset_minutes):
    """Convert a stored minute offset to a tzinfo, or None if it is unusable.

    NOTE(review): assumes the stored offset follows ``_parse_timezone``'s
    convention (positive = behind UTC) - confirm against the preferences API.
    """
    try:
        return timezone(timedelta(minutes=-offset_minutes))
    except (TypeError, ValueError, OverflowError):
        return None


async def _medication_list(message, session, token, parsed):
    """List all of the user's medications."""
    resp, status = api_request("get", "/api/medications", token)
    if status != 200:
        error = _error_text(resp, "Failed to fetch medications")
        await message.channel.send(f"Error: {error}")
        return
    meds = resp if isinstance(resp, list) else []
    if not meds:
        await message.channel.send("You don't have any medications yet.")
        return
    lines = [
        f"- **{m['name']}**: {m['dosage']} {m['unit']} ({m.get('frequency', 'n/a')})"
        for m in meds
    ]
    await message.channel.send("**Your medications:**\n" + "\n".join(lines))


async def _medication_add(message, session, token, parsed):
    """Add a medication, optionally parking it in the session for confirmation."""
    name = parsed.get("name")
    dosage = parsed.get("dosage")
    unit = parsed.get("unit", "mg")
    frequency = parsed.get("frequency", "daily")
    times = parsed.get("times", ["08:00"])
    days_of_week = parsed.get("days_of_week", [])
    interval_days = parsed.get("interval_days")

    if not name or not dosage:
        await message.channel.send("Please provide medication name and dosage.")
        return

    if parsed.get("needs_confirmation", False):
        # Park the request; it is completed when the user replies "yes".
        # NOTE(review): unlike the delete confirmation, this payload carries no
        # "interaction_type" key - confirm the confirmation handler copes.
        _pending_confirmations(session)[f"med_add_{name}"] = {
            "action": "add",
            "name": name,
            "dosage": dosage,
            "unit": unit,
            "frequency": frequency,
            "times": times,
            "days_of_week": days_of_week,
            "interval_days": interval_days,
        }
        # Avoid printing the literal text "None" when no prompt was supplied.
        confirmation_prompt = (
            parsed.get("confirmation_prompt") or "Please confirm this medication:"
        )
        schedule_desc = _format_schedule(
            frequency, times, days_of_week, interval_days
        )
        await message.channel.send(
            f"{confirmation_prompt}\n\n"
            f"**Details:**\n"
            f"- Name: {name}\n"
            f"- Dosage: {dosage} {unit}\n"
            f"- Schedule: {schedule_desc}\n\n"
            f"Reply **yes** to confirm, or **no** to cancel."
        )
        return

    await _add_medication(
        message,
        token,
        name,
        dosage,
        unit,
        frequency,
        times,
        days_of_week,
        interval_days,
    )


async def _medication_take(message, session, token, parsed):
    """Log a dose as taken, attaching the scheduled time it belongs to."""
    med_id, name, found = await _find_medication_by_name(
        message, token, parsed.get("medication_id"), parsed.get("name")
    )
    if not found:
        return
    if not med_id:
        await message.channel.send("Which medication did you take?")
        return

    # The offset is read from the preferences API here because
    # _get_user_timezone(bot, user_id) takes a bot object and user id, neither
    # of which this handler has.
    timezone_offset = _lookup_timezone_offset(token)
    if timezone_offset is None:
        # Remember what the user was doing so it can resume once the
        # timezone has been supplied.
        _pending_confirmations(session)["timezone"] = {
            "action": "take",
            "medication_id": med_id,
            "name": name,
        }
        await message.channel.send(
            "📍 I need to know your timezone to track medications correctly.\n\n"
            'What timezone are you in? (e.g., "UTC-8", "PST", "EST", "+1")'
        )
        return

    # Prefer the time printed in a recent reminder; otherwise derive it from
    # the medication's schedule.
    scheduled_time = await _get_scheduled_time_from_context(message, name)
    if not scheduled_time:
        med_resp, med_status = api_request(
            "get", f"/api/medications/{med_id}", token
        )
        if med_status == 200 and isinstance(med_resp, dict):
            times = med_resp.get("times", [])
            # datetime.now() needs a tzinfo, not a bare minute offset.
            scheduled_time = _get_nearest_scheduled_time(
                times, _offset_to_tzinfo(timezone_offset)
            )

    request_body = {}
    if scheduled_time:
        request_body["scheduled_time"] = scheduled_time
    resp, status = api_request(
        "post", f"/api/medications/{med_id}/take", token, request_body
    )
    if status != 201:
        error = _error_text(resp, "Failed to log")
        await message.channel.send(f"Error: {error}")
    elif scheduled_time:
        await message.channel.send(
            f"✅ Logged **{name}** for {scheduled_time}! Great job staying on track."
        )
    else:
        await message.channel.send(
            f"Logged **{name}**! Great job staying on track."
        )


async def _medication_skip(message, session, token, parsed):
    """Log a dose as skipped, with an optional reason."""
    reason = parsed.get("reason", "Skipped by user")
    med_id, name, found = await _find_medication_by_name(
        message, token, parsed.get("medication_id"), parsed.get("name")
    )
    if not found:
        return
    if not med_id:
        await message.channel.send("Which medication are you skipping?")
        return

    scheduled_time = await _get_scheduled_time_from_context(message, name)
    request_body = {"reason": reason}
    if scheduled_time:
        request_body["scheduled_time"] = scheduled_time
    resp, status = api_request(
        "post", f"/api/medications/{med_id}/skip", token, request_body
    )
    if status == 201:
        await message.channel.send(f"Skipped **{name}**. {reason}")
    else:
        error = _error_text(resp, "Failed to log")
        await message.channel.send(f"Error: {error}")


async def _medication_today(message, session, token, parsed):
    """Show today's schedule with a per-dose status marker."""
    resp, status = api_request("get", "/api/medications/today", token)
    if status != 200:
        error = _error_text(resp, "Failed to fetch today's schedule")
        await message.channel.send(f"Error: {error}")
        return
    meds = resp if isinstance(resp, list) else []
    if not meds:
        await message.channel.send("No medications scheduled for today!")
        return

    lines = []
    for item in meds:
        med = item.get("medication", {})
        scheduled = item.get("scheduled_times", [])
        taken = item.get("taken_times", [])
        skipped = item.get("skipped_times", [])
        med_name = med.get("name", "Unknown")
        dosage = f"{med.get('dosage', '')} {med.get('unit', '')}".strip()

        if item.get("is_prn", False):
            lines.append(f"💊 **{med_name}** {dosage} (as needed)")
        elif scheduled:
            for slot in scheduled:
                # Each state gets its own marker so taken and still-pending
                # doses can be told apart.
                if slot in taken:
                    status_icon = "✅"
                elif slot in skipped:
                    status_icon = "⏭️"
                else:
                    status_icon = "⏳"
                lines.append(f"{status_icon} **{med_name}** {dosage} at {slot}")
        else:
            lines.append(f"💊 **{med_name}** {dosage}")

    await message.channel.send("**Today's Medications:**\n" + "\n".join(lines))


async def _medication_refills(message, session, token, parsed):
    """Show medications the API reports as due for a refill."""
    resp, status = api_request("get", "/api/medications/refills-due", token)
    if status != 200:
        error = _error_text(resp, "Failed to fetch refills")
        await message.channel.send(f"Error: {error}")
        return
    meds = resp if isinstance(resp, list) else []
    if not meds:
        await message.channel.send("No refills due soon! 🎉")
        return

    lines = []
    for med in meds:
        name = med.get("name", "Unknown")
        qty = med.get("quantity_remaining")
        refill_date = med.get("refill_date")
        if qty is not None and qty <= 7:
            lines.append(f"⚠️ **{name}**: Only {qty} remaining")
        elif refill_date:
            lines.append(f"📅 **{name}**: Refill due by {refill_date}")
        else:
            # The API listed it as due but gave no detail; still mention it
            # so the reply is never a bare header.
            lines.append(f"📅 **{name}**: Refill due soon")
    await message.channel.send("**Refills Due:**\n" + "\n".join(lines))


async def _medication_snooze(message, session, token, parsed):
    """Snooze a medication reminder for a number of minutes (default 15)."""
    minutes = parsed.get("minutes", 15)
    med_id, name, found = await _find_medication_by_name(
        message, token, parsed.get("medication_id"), parsed.get("name")
    )
    if not found:
        return
    if not med_id:
        await message.channel.send("Which medication reminder should I snooze?")
        return

    resp, status = api_request(
        "post", f"/api/medications/{med_id}/snooze", token, {"minutes": minutes}
    )
    if status == 200:
        await message.channel.send(f"⏰ Snoozed **{name}** for {minutes} minutes.")
    else:
        error = _error_text(resp, "Failed to snooze")
        await message.channel.send(f"Error: {error}")


async def _medication_adherence(message, session, token, parsed):
    """Report adherence for one medication, or for all when none is named."""
    med_id = parsed.get("medication_id")
    name = parsed.get("name")
    if name and not med_id:
        med_id, name, found = await _find_medication_by_name(
            message, token, None, name
        )
        if not found:
            return

    if med_id:
        resp, status = api_request(
            "get", f"/api/medications/{med_id}/adherence", token
        )
    else:
        resp, status = api_request("get", "/api/medications/adherence", token)

    if status != 200:
        error = _error_text(resp, "Failed to fetch adherence")
        await message.channel.send(f"Error: {error}")
        return

    if isinstance(resp, list):
        lines = []
        for m in resp:
            adherence = m.get("adherence_percent")
            if adherence is not None:
                lines.append(
                    f"- **{m['name']}**: {adherence}% adherence ({m.get('taken', 0)}/{m.get('expected', 0)} doses)"
                )
            else:
                lines.append(
                    f"- **{m['name']}**: PRN medication (no adherence tracking)"
                )
        await message.channel.send("**Adherence Stats:**\n" + "\n".join(lines))
        return

    adherence = resp.get("adherence_percent")
    if adherence is not None:
        await message.channel.send(
            f"**{resp.get('name')}**:\n"
            f"- Adherence: {adherence}%\n"
            f"- Taken: {resp.get('taken', 0)}/{resp.get('expected', 0)} doses\n"
            f"- Skipped: {resp.get('skipped', 0)}"
        )
    else:
        await message.channel.send(
            f"**{resp.get('name')}**: PRN medication (no adherence tracking)"
        )


async def _medication_delete(message, session, token, parsed):
    """Delete a medication, asking for confirmation unless already confirmed."""
    med_id, name, found = await _find_medication_by_name(
        message, token, parsed.get("medication_id"), parsed.get("name")
    )
    if not found:
        return
    if not med_id:
        await message.channel.send("Which medication should I delete?")
        return

    if parsed.get("needs_confirmation", True):
        _pending_confirmations(session)[f"med_delete_{name}"] = {
            "action": "delete",
            "interaction_type": "medication",
            "medication_id": med_id,
            "name": name,
            "needs_confirmation": False,  # Skip confirmation next time
        }
        await message.channel.send(
            f"⚠️ Are you sure you want to delete **{name}**?\n\n"
            f"This will also delete all logs for this medication.\n\n"
            f"Reply **yes** to confirm deletion, or **no** to cancel."
        )
        return

    resp, status = api_request("delete", f"/api/medications/{med_id}", token)
    if status == 200:
        await message.channel.send(f"🗑️ Deleted **{name}** and all its logs.")
    else:
        error = _error_text(resp, "Failed to delete medication")
        await message.channel.send(f"Error: {error}")


# Maps the parsed "action" value to the coroutine that implements it.
_MEDICATION_ACTIONS = {
    "list": _medication_list,
    "add": _medication_add,
    "take": _medication_take,
    "skip": _medication_skip,
    "today": _medication_today,
    "refills": _medication_refills,
    "snooze": _medication_snooze,
    "adherence": _medication_adherence,
    "delete": _medication_delete,
}


async def handle_medication(message, session, parsed):
    """Dispatch a parsed medication command and reply in the message's channel.

    Args:
        message: Incoming chat message; replies go to ``message.channel.send``.
        session: Mutable per-user session dict. ``session["token"]`` is used
            to authenticate API calls, and ``session["pending_confirmations"]``
            is created/updated when an action must wait for a follow-up reply.
        parsed: Dict produced by the command parser. ``parsed["action"]``
            selects the behaviour (list, add, take, skip, today, refills,
            snooze, adherence, delete); the remaining keys are action-specific.

    Returns:
        None. All results are reported to the user as channel messages.

    Raises:
        KeyError: if ``session`` has no ``"token"`` entry.
    """
    action = parsed.get("action", "unknown")
    token = session["token"]

    # Non-string actions cannot be valid and may not even be hashable.
    handler = _MEDICATION_ACTIONS.get(action) if isinstance(action, str) else None
    if handler is None:
        await message.channel.send(
            f"Unknown action: {action}. Try: list, add, delete, take, skip, today, refills, snooze, or adherence."
        )
        return
    await handler(message, session, token, parsed)
async def _add_medication(
    message, token, name, dosage, unit, frequency, times, days_of_week, interval_days
):
    """Create a medication through the API and report the outcome in the channel.

    ``days_of_week`` and ``interval_days`` are only sent when truthy. A 201
    response yields a confirmation message with the formatted schedule; any
    other status yields an error message, with a friendlier hint when the API
    error text contains "invalid input syntax".
    """
    payload = {
        "name": name,
        "dosage": dosage,
        "unit": unit,
        "frequency": frequency,
        "times": times,
    }
    # Optional scheduling fields are omitted entirely when empty/None.
    optional = {"days_of_week": days_of_week, "interval_days": interval_days}
    payload.update({key: value for key, value in optional.items() if value})

    resp, status = api_request("post", "/api/medications", token, payload)

    if status == 201:
        summary = _format_schedule(frequency, times, days_of_week, interval_days)
        await message.channel.send(
            f"✅ Added **{name}** ({dosage} {unit}) - {summary}"
        )
        return

    error_msg = resp.get("error", "Failed to add medication")
    if "invalid input syntax" in error_msg.lower():
        reply = "Error: I couldn't understand the schedule format. Try saying it differently, like 'every day at 9am' or 'on Monday and Wednesday at 8pm'."
    else:
        reply = f"Error: {error_msg}"
    await message.channel.send(reply)
async def _find_medication_by_name(message, token, med_id, name):
    """Resolve a medication to ``(med_id, name, found)``.

    * A truthy ``med_id`` is returned as-is with ``found=True``.
    * With neither id nor name, ``(None, None, True)`` is returned so the
      caller can ask the user which medication they meant.
    * Otherwise the user's medications are fetched and matched by name,
      case-insensitively: exact matches first, then substring matches in
      either direction. Exactly one match resolves; zero or several matches
      (or an API failure) send an explanatory message and return
      ``(None, None, False)``.
    """
    if med_id:
        return med_id, name, True
    if not name:
        return None, None, True  # Will prompt user later

    resp, status = api_request("get", "/api/medications", token)
    if status != 200:
        await message.channel.send("Error looking up medication. Please try again.")
        return None, None, False

    meds = resp if isinstance(resp, list) else []

    def _is_exact(med):
        return med["name"].lower() == name.lower()

    def _is_partial(med):
        return name.lower() in med["name"].lower() or med["name"].lower() in name.lower()

    candidates = list(filter(_is_exact, meds))
    if not candidates:
        candidates = list(filter(_is_partial, meds))

    if len(candidates) == 1:
        only = candidates[0]
        return only["id"], only["name"], True

    if candidates:
        # Ambiguous: show up to five options and let the user narrow it down.
        numbered = "\n".join(
            f"{index + 1}. {med['name']}" for index, med in enumerate(candidates[:5])
        )
        await message.channel.send(
            f"🔍 I found multiple medications matching '{name}':\n"
            + numbered
            + "\n\nPlease specify which one (e.g., 'take 1' or say the full name)."
        )
        return None, None, False

    # Nothing matched: remind the user what they do have (first ten).
    known = "\n".join(f"- {med['name']}" for med in meds[:10])
    await message.channel.send(
        f"❌ I couldn't find a medication called '{name}'.\n\nYour medications:\n"
        + known
    )
    return None, None, False
def _format_schedule(frequency, times, days_of_week, interval_days):
    """Build a human-readable description of a medication schedule.

    Args:
        frequency: Schedule kind. "daily", "twice_daily", "specific_days",
            "every_n_days" and "as_needed" get dedicated wording; any other
            value is echoed verbatim.
        times: Iterable of time strings (e.g. ["08:00", "20:00"]), a single
            time string, or None/empty when no times are known.
        days_of_week: Iterable of day names used for "specific_days";
            None/empty is rendered as "certain days".
        interval_days: Day count used for "every_n_days".

    Returns:
        A phrase such as "daily at 08:00" or "on Mon, Wed at 09:00". The
        " at ..." suffix is left out when there are no times, so the text
        never ends in a dangling "at ".
    """
    if frequency == "as_needed":
        return "as needed"

    # A bare string would otherwise be joined character by character.
    if isinstance(times, str):
        times = [times]
    # str() tolerates non-string entries; `or []` tolerates None.
    joined_times = ", ".join(str(t) for t in (times or []))
    at_clause = f" at {joined_times}" if joined_times else ""

    if frequency == "daily":
        return f"daily{at_clause}"
    if frequency == "twice_daily":
        return f"twice daily{at_clause}"
    if frequency == "specific_days":
        if isinstance(days_of_week, str):
            days_of_week = [days_of_week]
        days_str = ", ".join(str(d) for d in (days_of_week or [])) or "certain days"
        return f"on {days_str}{at_clause}"
    if frequency == "every_n_days":
        return f"every {interval_days} days{at_clause}"
    return f"{frequency}{at_clause}"
def api_request(method, endpoint, token, data=None):
    """Send an authenticated JSON request to the backend API.

    Args:
        method: Name of the ``requests`` function to call ("get", "post",
            "delete", ...).
        endpoint: Path appended to the ``API_URL`` environment variable
            (default "http://app:5000").
        token: Bearer token placed in the Authorization header.
        data: Optional JSON-serialisable request body.

    Returns:
        ``(body, status_code)``. ``body`` is the decoded JSON response, or
        ``{}`` when the response is not valid JSON. If the request itself
        fails, ``({"error": "API unavailable"}, 503)`` is returned.
    """
    import os
    import requests

    base_url = os.getenv("API_URL", "http://app:5000")
    target = f"{base_url}{endpoint}"
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }

    try:
        send = getattr(requests, method)
        response = send(target, headers=request_headers, json=data, timeout=10)
        try:
            body = response.json()
        except ValueError:
            # Non-JSON body: still surface the status code.
            body = {}
        return body, response.status_code
    except requests.RequestException:
        return {"error": "API unavailable"}, 503
def validate_medication_json(data):
    """Validate a parsed medication command and return a list of problems.

    A non-dict payload is rejected outright. A dict that carries an "error"
    key is accepted as-is; otherwise the only requirement is an "action" key.
    An empty list means the payload is acceptable.
    """
    if not isinstance(data, dict):
        return ["Response must be a JSON object"]
    if "error" in data or "action" in data:
        return []
    return ["Missing required field: action"]
# Import-time side effects: expose this module under the "medication" key,
# both as a command handler and as a validator for parsed AI output.
register_module("medication", handle_medication)
ai_parser.register_validator("medication", validate_medication_json)