""" core/snitch.py - Snitch system for medication compliance Handles snitch triggers, contact selection, and notification delivery. """ import json import uuid from datetime import datetime, timedelta, date from typing import Optional, Dict, List, Tuple import core.postgres as postgres import core.notifications as notifications from core.tz import user_today_for def get_snitch_settings(user_uuid: str) -> Optional[Dict]: """Get user's snitch settings.""" rows = postgres.select("snitch_settings", {"user_uuid": user_uuid}) if rows: return rows[0] return None def get_snitch_contacts(user_uuid: str, active_only: bool = True) -> List[Dict]: """Get user's snitch contacts ordered by priority.""" where = {"user_uuid": user_uuid} if active_only: where["is_active"] = True rows = postgres.select("snitch_contacts", where) # Sort by priority (lower = higher priority) return sorted(rows, key=lambda x: x.get("priority", 1)) def get_todays_snitch_count(user_uuid: str) -> int: """Get number of snitches sent today (in the user's local timezone).""" today = user_today_for(user_uuid) # Query snitch log for today rows = postgres.select("snitch_log", {"user_uuid": user_uuid}) # Filter to today's entries today_count = 0 for row in rows: sent_at = row.get("sent_at") if sent_at and hasattr(sent_at, "date") and sent_at.date() == today: today_count += 1 return today_count def get_last_snitch_time(user_uuid: str) -> Optional[datetime]: """Get timestamp of last snitch for cooldown check.""" rows = postgres.select( "snitch_log", {"user_uuid": user_uuid}, order_by="sent_at DESC", limit=1 ) if rows: return rows[0].get("sent_at") return None def check_cooldown(user_uuid: str, cooldown_hours: int) -> bool: """Check if enough time has passed since last snitch.""" last_snitch = get_last_snitch_time(user_uuid) if not last_snitch: return True cooldown_period = timedelta(hours=cooldown_hours) return datetime.utcnow() - last_snitch >= cooldown_period def should_snitch( user_uuid: str, med_id: str, nag_count: int, missed_doses: int = 1 ) -> Tuple[bool, str, Optional[Dict]]: """ Determine if we should trigger a snitch. Returns: (should_snitch: bool, reason: str, settings: Optional[Dict]) """ settings = get_snitch_settings(user_uuid) if not settings: return False, "No snitch settings found", None if not settings.get("snitch_enabled"): return False, "Snitching disabled", settings # Check consent if settings.get("require_consent") and not settings.get("consent_given"): return False, "Consent not given", settings # Check rate limit max_per_day = settings.get("max_snitches_per_day", 2) if get_todays_snitch_count(user_uuid) >= max_per_day: return False, f"Max snitches per day reached ({max_per_day})", settings # Check cooldown cooldown_hours = settings.get("snitch_cooldown_hours", 4) if not check_cooldown(user_uuid, cooldown_hours): return False, f"Cooldown period not elapsed ({cooldown_hours}h)", settings # Check triggers trigger_after_nags = settings.get("trigger_after_nags", 4) trigger_after_doses = settings.get("trigger_after_missed_doses", 1) triggered_by_nags = nag_count >= trigger_after_nags triggered_by_doses = missed_doses >= trigger_after_doses if not triggered_by_nags and not triggered_by_doses: return ( False, f"Triggers not met (nags: {nag_count}/{trigger_after_nags}, doses: {missed_doses}/{trigger_after_doses})", settings, ) # Determine trigger reason if triggered_by_nags and triggered_by_doses: reason = "max_nags_and_missed_doses" elif triggered_by_nags: reason = "max_nags" else: reason = "missed_doses" return True, reason, settings def select_contacts_to_notify(user_uuid: str) -> List[Dict]: """Select which contacts to notify based on priority settings.""" contacts = get_snitch_contacts(user_uuid) if not contacts: return [] # If any contact has notify_all=True, notify all active contacts notify_all = any(c.get("notify_all") for c in contacts) if notify_all: return contacts # Otherwise, notify only the highest priority contact(s) highest_priority = contacts[0].get("priority", 1) return [c for c in contacts if c.get("priority", 1) == highest_priority] def build_snitch_message( user_uuid: str, contact_name: str, med_name: str, nag_count: int, missed_doses: int, trigger_reason: str, typical_schedule: str = "", ) -> str: """Build the snitch notification message.""" # Get user info users = postgres.select("users", {"id": user_uuid}) username = users[0].get("username", "Unknown") if users else "Unknown" message_parts = [ f"🚨 Medication Alert for {username}", "", f"Contact: {contact_name}", f"Medication: {med_name}", f"Issue: Missed dose", ] if nag_count > 0: message_parts.append(f"Reminders sent: {nag_count} times") if missed_doses > 1: message_parts.append(f"Total missed doses today: {missed_doses}") if typical_schedule: message_parts.append(f"Typical schedule: {typical_schedule}") message_parts.extend( [ "", f"Triggered by: {trigger_reason}", f"Time: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}", ] ) return "\n".join(message_parts) def send_snitch( user_uuid: str, med_id: str, med_name: str, nag_count: int, missed_doses: int = 1, trigger_reason: str = "max_nags", ) -> List[Dict]: """ Send snitch notifications to selected contacts. Returns: List of delivery results """ results = [] contacts = select_contacts_to_notify(user_uuid) if not contacts: return [{"success": False, "error": "No contacts configured"}] # Get typical schedule for context meds = postgres.select("medications", {"id": med_id}) typical_times = meds[0].get("times", []) if meds else [] typical_schedule = ", ".join(typical_times) if typical_times else "Not scheduled" for contact in contacts: contact_id = contact.get("id") contact_name = contact.get("contact_name") contact_type = contact.get("contact_type") contact_value = contact.get("contact_value") # Build message message = build_snitch_message( user_uuid, contact_name, med_name, nag_count, missed_doses, trigger_reason, typical_schedule, ) # Send based on contact type delivered = False error_msg = None try: if contact_type == "discord": # Send via Discord DM delivered = _send_discord_snitch(contact_value, message) elif contact_type == "email": # Send via email (requires email setup) delivered = _send_email_snitch(contact_value, message) elif contact_type == "sms": # Send via SMS (requires SMS provider) delivered = _send_sms_snitch(contact_value, message) else: error_msg = f"Unknown contact type: {contact_type}" except Exception as e: error_msg = str(e) delivered = False # Log the snitch log_data = { "id": str(uuid.uuid4()), "user_uuid": user_uuid, "contact_id": contact_id, "medication_id": med_id, "trigger_reason": trigger_reason, "snitch_count_today": get_todays_snitch_count(user_uuid) + 1, "message_content": message, "sent_at": datetime.utcnow(), "delivered": delivered, } postgres.insert("snitch_log", log_data) results.append( { "contact_id": contact_id, "contact_name": contact_name, "contact_type": contact_type, "delivered": delivered, "error": error_msg, } ) return results def _send_discord_snitch(discord_user_id: str, message: str) -> bool: """Send snitch via Discord DM.""" # This will be implemented in the bot # For now, we'll store it to be sent by the bot's presence loop # In a real implementation, you'd use discord.py to send the message import os # Store in a queue for the bot to pick up # Or use the existing notification system if it supports Discord try: # Try to use the existing notification system # This is a placeholder - actual implementation would use discord.py return True except Exception as e: print(f"Error sending Discord snitch: {e}") return False def _send_email_snitch(email: str, message: str) -> bool: """Send snitch via email.""" # Placeholder - requires email provider setup print(f"Would send email to {email}: {message[:50]}...") return True def _send_sms_snitch(phone: str, message: str) -> bool: """Send snitch via SMS.""" # Placeholder - requires SMS provider (Twilio, etc.) print(f"Would send SMS to {phone}: {message[:50]}...") return True def update_consent(user_uuid: str, consent_given: bool): """Update user's snitch consent status.""" data = { "id": str(uuid.uuid4()), "user_uuid": user_uuid, "snitch_enabled": False, # Disabled until fully configured "consent_given": consent_given, "created_at": datetime.utcnow(), "updated_at": datetime.utcnow(), } postgres.upsert( "snitch_settings", data, conflict_columns=["user_uuid"], ) def get_snitch_history(user_uuid: str, days: int = 7) -> List[Dict]: """Get snitch history for the last N days.""" cutoff = datetime.utcnow() - timedelta(days=days) rows = postgres.select("snitch_log", {"user_uuid": user_uuid}) # Filter to recent entries recent = [row for row in rows if row.get("sent_at") and row["sent_at"] >= cutoff] return recent