Files
Synculous-2/core/snitch.py

338 lines
10 KiB
Python

"""
core/snitch.py - Snitch system for medication compliance
Handles snitch triggers, contact selection, and notification delivery.
"""
import json
import uuid
from datetime import datetime, timedelta, date
from typing import Optional, Dict, List, Tuple
import core.postgres as postgres
import core.notifications as notifications
from core.tz import user_today_for
def get_snitch_settings(user_uuid: str) -> Optional[Dict]:
"""Get user's snitch settings."""
rows = postgres.select("snitch_settings", {"user_uuid": user_uuid})
if rows:
return rows[0]
return None
def get_snitch_contacts(user_uuid: str, active_only: bool = True) -> List[Dict]:
"""Get user's snitch contacts ordered by priority."""
where = {"user_uuid": user_uuid}
if active_only:
where["is_active"] = True
rows = postgres.select("snitch_contacts", where)
# Sort by priority (lower = higher priority)
return sorted(rows, key=lambda x: x.get("priority", 1))
def get_todays_snitch_count(user_uuid: str) -> int:
"""Get number of snitches sent today (in the user's local timezone)."""
today = user_today_for(user_uuid)
# Query snitch log for today
rows = postgres.select("snitch_log", {"user_uuid": user_uuid})
# Filter to today's entries
today_count = 0
for row in rows:
sent_at = row.get("sent_at")
if sent_at and hasattr(sent_at, "date") and sent_at.date() == today:
today_count += 1
return today_count
def get_last_snitch_time(user_uuid: str) -> Optional[datetime]:
"""Get timestamp of last snitch for cooldown check."""
rows = postgres.select(
"snitch_log", {"user_uuid": user_uuid}, order_by="sent_at DESC", limit=1
)
if rows:
return rows[0].get("sent_at")
return None
def check_cooldown(user_uuid: str, cooldown_hours: int) -> bool:
"""Check if enough time has passed since last snitch."""
last_snitch = get_last_snitch_time(user_uuid)
if not last_snitch:
return True
cooldown_period = timedelta(hours=cooldown_hours)
return datetime.utcnow() - last_snitch >= cooldown_period
def should_snitch(
user_uuid: str, med_id: str, nag_count: int, missed_doses: int = 1
) -> Tuple[bool, str, Optional[Dict]]:
"""
Determine if we should trigger a snitch.
Returns:
(should_snitch: bool, reason: str, settings: Optional[Dict])
"""
settings = get_snitch_settings(user_uuid)
if not settings:
return False, "No snitch settings found", None
if not settings.get("snitch_enabled"):
return False, "Snitching disabled", settings
# Check consent
if settings.get("require_consent") and not settings.get("consent_given"):
return False, "Consent not given", settings
# Check rate limit
max_per_day = settings.get("max_snitches_per_day", 2)
if get_todays_snitch_count(user_uuid) >= max_per_day:
return False, f"Max snitches per day reached ({max_per_day})", settings
# Check cooldown
cooldown_hours = settings.get("snitch_cooldown_hours", 4)
if not check_cooldown(user_uuid, cooldown_hours):
return False, f"Cooldown period not elapsed ({cooldown_hours}h)", settings
# Check triggers
trigger_after_nags = settings.get("trigger_after_nags", 4)
trigger_after_doses = settings.get("trigger_after_missed_doses", 1)
triggered_by_nags = nag_count >= trigger_after_nags
triggered_by_doses = missed_doses >= trigger_after_doses
if not triggered_by_nags and not triggered_by_doses:
return (
False,
f"Triggers not met (nags: {nag_count}/{trigger_after_nags}, doses: {missed_doses}/{trigger_after_doses})",
settings,
)
# Determine trigger reason
if triggered_by_nags and triggered_by_doses:
reason = "max_nags_and_missed_doses"
elif triggered_by_nags:
reason = "max_nags"
else:
reason = "missed_doses"
return True, reason, settings
def select_contacts_to_notify(user_uuid: str) -> List[Dict]:
"""Select which contacts to notify based on priority settings."""
contacts = get_snitch_contacts(user_uuid)
if not contacts:
return []
# If any contact has notify_all=True, notify all active contacts
notify_all = any(c.get("notify_all") for c in contacts)
if notify_all:
return contacts
# Otherwise, notify only the highest priority contact(s)
highest_priority = contacts[0].get("priority", 1)
return [c for c in contacts if c.get("priority", 1) == highest_priority]
def build_snitch_message(
user_uuid: str,
contact_name: str,
med_name: str,
nag_count: int,
missed_doses: int,
trigger_reason: str,
typical_schedule: str = "",
) -> str:
"""Build the snitch notification message."""
# Get user info
users = postgres.select("users", {"id": user_uuid})
username = users[0].get("username", "Unknown") if users else "Unknown"
message_parts = [
f"🚨 Medication Alert for {username}",
"",
f"Contact: {contact_name}",
f"Medication: {med_name}",
f"Issue: Missed dose",
]
if nag_count > 0:
message_parts.append(f"Reminders sent: {nag_count} times")
if missed_doses > 1:
message_parts.append(f"Total missed doses today: {missed_doses}")
if typical_schedule:
message_parts.append(f"Typical schedule: {typical_schedule}")
message_parts.extend(
[
"",
f"Triggered by: {trigger_reason}",
f"Time: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}",
]
)
return "\n".join(message_parts)
def send_snitch(
user_uuid: str,
med_id: str,
med_name: str,
nag_count: int,
missed_doses: int = 1,
trigger_reason: str = "max_nags",
) -> List[Dict]:
"""
Send snitch notifications to selected contacts.
Returns:
List of delivery results
"""
results = []
contacts = select_contacts_to_notify(user_uuid)
if not contacts:
return [{"success": False, "error": "No contacts configured"}]
# Get typical schedule for context
meds = postgres.select("medications", {"id": med_id})
typical_times = meds[0].get("times", []) if meds else []
typical_schedule = ", ".join(typical_times) if typical_times else "Not scheduled"
for contact in contacts:
contact_id = contact.get("id")
contact_name = contact.get("contact_name")
contact_type = contact.get("contact_type")
contact_value = contact.get("contact_value")
# Build message
message = build_snitch_message(
user_uuid,
contact_name,
med_name,
nag_count,
missed_doses,
trigger_reason,
typical_schedule,
)
# Send based on contact type
delivered = False
error_msg = None
try:
if contact_type == "discord":
# Send via Discord DM
delivered = _send_discord_snitch(contact_value, message)
elif contact_type == "email":
# Send via email (requires email setup)
delivered = _send_email_snitch(contact_value, message)
elif contact_type == "sms":
# Send via SMS (requires SMS provider)
delivered = _send_sms_snitch(contact_value, message)
else:
error_msg = f"Unknown contact type: {contact_type}"
except Exception as e:
error_msg = str(e)
delivered = False
# Log the snitch
log_data = {
"id": str(uuid.uuid4()),
"user_uuid": user_uuid,
"contact_id": contact_id,
"medication_id": med_id,
"trigger_reason": trigger_reason,
"snitch_count_today": get_todays_snitch_count(user_uuid) + 1,
"message_content": message,
"sent_at": datetime.utcnow(),
"delivered": delivered,
}
postgres.insert("snitch_log", log_data)
results.append(
{
"contact_id": contact_id,
"contact_name": contact_name,
"contact_type": contact_type,
"delivered": delivered,
"error": error_msg,
}
)
return results
def _send_discord_snitch(discord_user_id: str, message: str) -> bool:
"""Send snitch via Discord DM."""
# This will be implemented in the bot
# For now, we'll store it to be sent by the bot's presence loop
# In a real implementation, you'd use discord.py to send the message
import os
# Store in a queue for the bot to pick up
# Or use the existing notification system if it supports Discord
try:
# Try to use the existing notification system
# This is a placeholder - actual implementation would use discord.py
return True
except Exception as e:
print(f"Error sending Discord snitch: {e}")
return False
def _send_email_snitch(email: str, message: str) -> bool:
"""Send snitch via email."""
# Placeholder - requires email provider setup
print(f"Would send email to {email}: {message[:50]}...")
return True
def _send_sms_snitch(phone: str, message: str) -> bool:
"""Send snitch via SMS."""
# Placeholder - requires SMS provider (Twilio, etc.)
print(f"Would send SMS to {phone}: {message[:50]}...")
return True
def update_consent(user_uuid: str, consent_given: bool):
"""Update user's snitch consent status."""
data = {
"id": str(uuid.uuid4()),
"user_uuid": user_uuid,
"snitch_enabled": False, # Disabled until fully configured
"consent_given": consent_given,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
}
postgres.upsert(
"snitch_settings",
data,
conflict_columns=["user_uuid"],
)
def get_snitch_history(user_uuid: str, days: int = 7) -> List[Dict]:
"""Get snitch history for the last N days."""
cutoff = datetime.utcnow() - timedelta(days=days)
rows = postgres.select("snitch_log", {"user_uuid": user_uuid})
# Filter to recent entries
recent = [row for row in rows if row.get("sent_at") and row["sent_at"] >= cutoff]
return recent