338 lines
10 KiB
Python
338 lines
10 KiB
Python
"""
core/snitch.py - Snitch system for medication compliance

Handles snitch triggers, contact selection, and notification delivery.
"""
|
|
|
|
import json
|
|
import uuid
|
|
from datetime import datetime, timedelta, date
|
|
from typing import Optional, Dict, List, Tuple
|
|
import core.postgres as postgres
|
|
import core.notifications as notifications
|
|
from core.tz import user_today_for
|
|
|
|
|
|
def get_snitch_settings(user_uuid: str) -> Optional[Dict]:
    """Fetch the snitch settings row for a user.

    Args:
        user_uuid: Value matched against the ``user_uuid`` column of
            ``snitch_settings``.

    Returns:
        The first matching row, or ``None`` when the lookup yields nothing.
    """
    matches = postgres.select("snitch_settings", {"user_uuid": user_uuid})
    return matches[0] if matches else None
|
|
|
|
|
|
def get_snitch_contacts(user_uuid: str, active_only: bool = True) -> List[Dict]:
    """Return the user's snitch contacts, lowest priority number first.

    Args:
        user_uuid: Value matched against the ``user_uuid`` column of
            ``snitch_contacts``.
        active_only: When true, additionally filter on ``is_active = True``.

    Returns:
        A new list of contact rows sorted ascending by ``priority``.
    """

    def _priority_of(contact: Dict):
        # Rows lacking a "priority" key are ranked as priority 1.
        return contact.get("priority", 1)

    if active_only:
        filters = {"user_uuid": user_uuid, "is_active": True}
    else:
        filters = {"user_uuid": user_uuid}

    ordered = list(postgres.select("snitch_contacts", filters))
    # list.sort is stable, so contacts sharing a priority keep their
    # relative order from the query result.
    ordered.sort(key=_priority_of)
    return ordered
|
|
|
|
|
|
def get_todays_snitch_count(user_uuid: str) -> int:
    """Count the user's snitch log entries whose ``sent_at`` date is today.

    "Today" is whatever ``user_today_for(user_uuid)`` returns. Every
    ``snitch_log`` row for the user is fetched and filtered in Python.

    Args:
        user_uuid: Value matched against the ``user_uuid`` column of
            ``snitch_log``.

    Returns:
        Number of rows whose ``sent_at.date()`` equals today's date.
    """
    local_today = user_today_for(user_uuid)
    log_entries = postgres.select("snitch_log", {"user_uuid": user_uuid})

    def _sent_today(entry: Dict) -> bool:
        stamp = entry.get("sent_at")
        # Values without a .date() method (or falsy values) never match.
        # NOTE(review): stamp.date() is taken as stored; if sent_at is kept
        # in UTC this compares a UTC date with a local date -- confirm.
        return bool(stamp) and hasattr(stamp, "date") and stamp.date() == local_today

    return sum(1 for entry in log_entries if _sent_today(entry))
|
|
|
|
|
|
def get_last_snitch_time(user_uuid: str) -> Optional[datetime]:
    """Look up when the most recent snitch was logged for a user.

    Args:
        user_uuid: Value matched against the ``user_uuid`` column of
            ``snitch_log``.

    Returns:
        The ``sent_at`` value of the newest log row (ordered by
        ``sent_at DESC``), or ``None`` when the user has no log rows.
    """
    newest = postgres.select(
        "snitch_log",
        {"user_uuid": user_uuid},
        order_by="sent_at DESC",
        limit=1,
    )
    if not newest:
        return None
    return newest[0].get("sent_at")
|
|
|
|
|
|
def check_cooldown(user_uuid: str, cooldown_hours: int) -> bool:
    """Check whether enough time has passed since the user's last snitch.

    Args:
        user_uuid: User whose most recent snitch timestamp is looked up.
        cooldown_hours: Minimum number of hours required between snitches.

    Returns:
        ``True`` when no previous snitch exists or when at least
        ``cooldown_hours`` have elapsed since the last one; ``False``
        otherwise.
    """
    last_snitch = get_last_snitch_time(user_uuid)
    if not last_snitch:
        return True

    # Subtracting a naive datetime from an aware one raises TypeError, so
    # build "now" with the same awareness as the stored timestamp.
    if last_snitch.tzinfo is not None and last_snitch.utcoffset() is not None:
        now = datetime.now(last_snitch.tzinfo)
    else:
        # Naive timestamps are treated as UTC, as before.
        now = datetime.utcnow()

    return now - last_snitch >= timedelta(hours=cooldown_hours)
|
|
|
|
|
|
def _snitch_setting(settings: Dict, key: str, default: int) -> int:
    """Return ``settings[key]``, using ``default`` when missing or ``None``."""
    value = settings.get(key)
    return default if value is None else value


def should_snitch(
    user_uuid: str, med_id: str, nag_count: int, missed_doses: int = 1
) -> Tuple[bool, str, Optional[Dict]]:
    """
    Determine if we should trigger a snitch.

    Checks run in this order and the first failure is reported: settings
    exist, snitching enabled, consent (when required), daily rate limit,
    cooldown, then the nag/missed-dose triggers.

    Args:
        user_uuid: User whose settings and snitch log are consulted.
        med_id: Currently unused; kept so existing callers keep working.
        nag_count: Number of reminders already sent.
        missed_doses: Number of missed doses.

    Returns:
        (should_snitch: bool, reason: str, settings: Optional[Dict])
        On success ``reason`` is one of ``"max_nags"``, ``"missed_doses"``
        or ``"max_nags_and_missed_doses"``; otherwise it is a
        human-readable explanation of why no snitch is sent.
    """
    settings = get_snitch_settings(user_uuid)

    if not settings:
        return False, "No snitch settings found", None

    if not settings.get("snitch_enabled"):
        return False, "Snitching disabled", settings

    # Check consent
    if settings.get("require_consent") and not settings.get("consent_given"):
        return False, "Consent not given", settings

    # Numeric settings go through _snitch_setting: a key that is present but
    # None would bypass dict.get's default and make the comparisons below
    # raise TypeError.

    # Check rate limit
    max_per_day = _snitch_setting(settings, "max_snitches_per_day", 2)
    if get_todays_snitch_count(user_uuid) >= max_per_day:
        return False, f"Max snitches per day reached ({max_per_day})", settings

    # Check cooldown
    cooldown_hours = _snitch_setting(settings, "snitch_cooldown_hours", 4)
    if not check_cooldown(user_uuid, cooldown_hours):
        return False, f"Cooldown period not elapsed ({cooldown_hours}h)", settings

    # Check triggers
    trigger_after_nags = _snitch_setting(settings, "trigger_after_nags", 4)
    trigger_after_doses = _snitch_setting(settings, "trigger_after_missed_doses", 1)

    triggered_by_nags = nag_count >= trigger_after_nags
    triggered_by_doses = missed_doses >= trigger_after_doses

    if not triggered_by_nags and not triggered_by_doses:
        return (
            False,
            f"Triggers not met (nags: {nag_count}/{trigger_after_nags}, doses: {missed_doses}/{trigger_after_doses})",
            settings,
        )

    # Determine trigger reason
    if triggered_by_nags and triggered_by_doses:
        reason = "max_nags_and_missed_doses"
    elif triggered_by_nags:
        reason = "max_nags"
    else:
        reason = "missed_doses"

    return True, reason, settings
|
|
|
|
|
|
def select_contacts_to_notify(user_uuid: str) -> List[Dict]:
    """Pick the contacts that should receive a snitch for this user.

    Args:
        user_uuid: User whose active contacts are considered.

    Returns:
        An empty list when the user has no active contacts; every active
        contact when at least one of them has a truthy ``notify_all``;
        otherwise only the contact(s) sharing the first (lowest) priority
        value.
    """
    candidates = get_snitch_contacts(user_uuid)
    if not candidates:
        return []

    # A single notify_all flag is enough to fan out to everyone.
    for candidate in candidates:
        if candidate.get("notify_all"):
            return candidates

    # Contacts arrive sorted by priority, so the first one sets the bar.
    top_priority = candidates[0].get("priority", 1)
    chosen = []
    for candidate in candidates:
        if candidate.get("priority", 1) == top_priority:
            chosen.append(candidate)
    return chosen
|
|
|
|
|
|
def build_snitch_message(
    user_uuid: str,
    contact_name: str,
    med_name: str,
    nag_count: int,
    missed_doses: int,
    trigger_reason: str,
    typical_schedule: str = "",
) -> str:
    """Compose the text of a snitch notification.

    Args:
        user_uuid: Matched against ``users.id`` to look up the username;
            "Unknown" is used when no row is found.
        contact_name: Name of the contact being notified.
        med_name: Name of the medication concerned.
        nag_count: Reminders already sent; listed only when above zero.
        missed_doses: Missed doses; listed only when above one.
        trigger_reason: Reason code included in the message footer.
        typical_schedule: Optional schedule text; listed only when non-empty.

    Returns:
        The message as newline-joined lines, ending with a UTC timestamp.
    """
    # Look up the display name of the user the alert is about.
    user_rows = postgres.select("users", {"id": user_uuid})
    if user_rows:
        username = user_rows[0].get("username", "Unknown")
    else:
        username = "Unknown"

    lines = [
        f"🚨 Medication Alert for {username}",
        "",
        f"Contact: {contact_name}",
        f"Medication: {med_name}",
        "Issue: Missed dose",
    ]

    # Optional detail lines, each shown only when it carries information.
    if nag_count > 0:
        lines += [f"Reminders sent: {nag_count} times"]
    if missed_doses > 1:
        lines += [f"Total missed doses today: {missed_doses}"]
    if typical_schedule:
        lines += [f"Typical schedule: {typical_schedule}"]

    timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")
    lines += ["", f"Triggered by: {trigger_reason}", f"Time: {timestamp}"]

    return "\n".join(lines)
|
|
|
|
|
|
def send_snitch(
    user_uuid: str,
    med_id: str,
    med_name: str,
    nag_count: int,
    missed_doses: int = 1,
    trigger_reason: str = "max_nags",
) -> List[Dict]:
    """
    Send snitch notifications to selected contacts.

    For every selected contact a message is built, dispatched according to
    the contact's ``contact_type`` ("discord", "email" or "sms"), and a row
    is inserted into ``snitch_log`` whether or not delivery succeeded.

    Args:
        user_uuid: User the snitch is about.
        med_id: Matched against ``medications.id`` to read the schedule.
        med_name: Medication name shown in the message.
        nag_count: Number of reminders already sent.
        missed_doses: Number of missed doses.
        trigger_reason: Reason code stored in the log and shown in the message.

    Returns:
        List of delivery results, one dict per contact with the keys
        ``contact_id``, ``contact_name``, ``contact_type``, ``delivered``
        and ``error``. When no contacts are selected, a single-element list
        ``[{"success": False, "error": "No contacts configured"}]``.
    """
    results = []
    contacts = select_contacts_to_notify(user_uuid)

    if not contacts:
        return [{"success": False, "error": "No contacts configured"}]

    # Get typical schedule for context
    meds = postgres.select("medications", {"id": med_id})
    typical_times = meds[0].get("times", []) if meds else []
    if not typical_times:
        typical_schedule = "Not scheduled"
    elif isinstance(typical_times, str):
        # Joining a bare string would split it into single characters.
        typical_schedule = typical_times
    else:
        # str.join raises TypeError on non-string entries (e.g. time
        # objects); that would abort the whole snitch before any contact
        # is notified, so stringify each entry first.
        typical_schedule = ", ".join(str(entry) for entry in typical_times)

    for contact in contacts:
        contact_id = contact.get("id")
        contact_name = contact.get("contact_name")
        contact_type = contact.get("contact_type")
        contact_value = contact.get("contact_value")

        # Build message
        message = build_snitch_message(
            user_uuid,
            contact_name,
            med_name,
            nag_count,
            missed_doses,
            trigger_reason,
            typical_schedule,
        )

        # Send based on contact type
        delivered = False
        error_msg = None

        try:
            if contact_type == "discord":
                # Send via Discord DM
                delivered = _send_discord_snitch(contact_value, message)
            elif contact_type == "email":
                # Send via email (requires email setup)
                delivered = _send_email_snitch(contact_value, message)
            elif contact_type == "sms":
                # Send via SMS (requires SMS provider)
                delivered = _send_sms_snitch(contact_value, message)
            else:
                error_msg = f"Unknown contact type: {contact_type}"
        except Exception as e:
            # Deliberately broad: one failing channel must not stop the
            # remaining contacts; the failure is reported in the result.
            error_msg = str(e)
            delivered = False

        # Log the snitch (failed attempts are logged too)
        log_data = {
            "id": str(uuid.uuid4()),
            "user_uuid": user_uuid,
            "contact_id": contact_id,
            "medication_id": med_id,
            "trigger_reason": trigger_reason,
            "snitch_count_today": get_todays_snitch_count(user_uuid) + 1,
            "message_content": message,
            "sent_at": datetime.utcnow(),
            "delivered": delivered,
        }
        postgres.insert("snitch_log", log_data)

        results.append(
            {
                "contact_id": contact_id,
                "contact_name": contact_name,
                "contact_type": contact_type,
                "delivered": delivered,
                "error": error_msg,
            }
        )

    return results
|
|
|
|
|
|
def _send_discord_snitch(discord_user_id: str, message: str) -> bool:
|
|
"""Send snitch via Discord DM."""
|
|
# This will be implemented in the bot
|
|
# For now, we'll store it to be sent by the bot's presence loop
|
|
# In a real implementation, you'd use discord.py to send the message
|
|
import os
|
|
|
|
# Store in a queue for the bot to pick up
|
|
# Or use the existing notification system if it supports Discord
|
|
try:
|
|
# Try to use the existing notification system
|
|
# This is a placeholder - actual implementation would use discord.py
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error sending Discord snitch: {e}")
|
|
return False
|
|
|
|
|
|
def _send_email_snitch(email: str, message: str) -> bool:
|
|
"""Send snitch via email."""
|
|
# Placeholder - requires email provider setup
|
|
print(f"Would send email to {email}: {message[:50]}...")
|
|
return True
|
|
|
|
|
|
def _send_sms_snitch(phone: str, message: str) -> bool:
|
|
"""Send snitch via SMS."""
|
|
# Placeholder - requires SMS provider (Twilio, etc.)
|
|
print(f"Would send SMS to {phone}: {message[:50]}...")
|
|
return True
|
|
|
|
|
|
def update_consent(user_uuid: str, consent_given: bool) -> None:
    """Update user's snitch consent status.

    Creates the user's ``snitch_settings`` row when none exists (with
    snitching disabled until fully configured). For an existing row, only
    ``consent_given`` and ``updated_at`` change.

    Args:
        user_uuid: User whose consent is being recorded.
        consent_given: New consent value.
    """
    now = datetime.utcnow()
    existing = get_snitch_settings(user_uuid) or {}

    # Re-submit the stored id / snitch_enabled / created_at for an existing
    # row. The upsert would otherwise hand the database a fresh id, a new
    # created_at and snitch_enabled=False on every consent change.
    # NOTE(review): assumes postgres.upsert overwrites the supplied columns
    # on conflict -- confirm; re-sending stored values is harmless either way.
    data = {
        "id": existing.get("id") or str(uuid.uuid4()),
        "user_uuid": user_uuid,
        # New rows start disabled until fully configured.
        "snitch_enabled": bool(existing.get("snitch_enabled", False)),
        "consent_given": consent_given,
        "created_at": existing.get("created_at") or now,
        "updated_at": now,
    }
    postgres.upsert(
        "snitch_settings",
        data,
        conflict_columns=["user_uuid"],
    )
|
|
|
|
|
|
def get_snitch_history(user_uuid: str, days: int = 7) -> List[Dict]:
    """Get snitch history for the last N days.

    Every ``snitch_log`` row for the user is fetched and filtered in Python.

    Args:
        user_uuid: Value matched against the ``user_uuid`` column of
            ``snitch_log``.
        days: Size of the look-back window in days.

    Returns:
        Log rows whose ``sent_at`` is set and falls within the window, in
        the order returned by the query.
    """
    window = timedelta(days=days)
    naive_cutoff = datetime.utcnow() - window

    rows = postgres.select("snitch_log", {"user_uuid": user_uuid})

    recent = []
    for row in rows:
        sent_at = row.get("sent_at")
        if not sent_at:
            continue

        # Comparing aware and naive datetimes raises TypeError, so the
        # cutoff is built with the same awareness as the stored timestamp.
        tz = getattr(sent_at, "tzinfo", None)
        if tz is not None and sent_at.utcoffset() is not None:
            cutoff = datetime.now(tz) - window
        else:
            # Naive timestamps are treated as UTC, as before.
            cutoff = naive_cutoff

        if sent_at >= cutoff:
            recent.append(row)

    return recent
|