""" bot.py - Discord bot client with session management and command routing Features: - Login flow with username/password - Session management with JWT tokens - AI-powered command parsing via registry - Background task loop for polling - JurySystem DBT integration for mental health support """ import discord from discord.ext import tasks import os import sys import json import base64 import requests import bcrypt import pickle import numpy as np from openai import OpenAI from bot.command_registry import get_handler, list_registered, register_module import ai.parser as ai_parser import bot.commands.routines # noqa: F401 - registers handler import bot.commands.medications # noqa: F401 - registers handler import bot.commands.knowledge # noqa: F401 - registers handler DISCORD_BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN") API_URL = os.getenv("API_URL", "http://app:5000") user_sessions = {} login_state = {} message_history = {} user_cache = {} CACHE_FILE = "/app/user_cache.pkl" intents = discord.Intents.default() intents.message_content = True intents.presences = True intents.members = True client = discord.Client(intents=intents) # ==================== JurySystem Integration ==================== class SimpleVectorStore: """A simple in-memory vector store using NumPy.""" def __init__(self): self.vectors = [] self.metadata = [] def add(self, vectors, metadatas): self.vectors.extend(vectors) self.metadata.extend(metadatas) def search(self, query_vector, top_k=5): if not self.vectors: return [] query_vec = np.array(query_vector) doc_vecs = np.array(self.vectors) norms = np.linalg.norm(doc_vecs, axis=1) valid_indices = norms > 0 scores = np.zeros(len(doc_vecs)) dot_products = np.dot(doc_vecs, query_vec) scores[valid_indices] = dot_products[valid_indices] / ( norms[valid_indices] * np.linalg.norm(query_vec) ) top_indices = np.argsort(scores)[-top_k:][::-1] results = [] for idx in top_indices: results.append({"metadata": self.metadata[idx], "score": scores[idx]}) return results class JurySystem: """DBT Knowledge Base Query System""" def __init__(self): config_path = os.getenv("CONFIG_PATH", "config.json") kb_path = os.getenv( "KNOWLEDGE_BASE_PATH", "bot/data/dbt_knowledge.embeddings.json" ) with open(config_path, "r") as f: self.config = json.load(f) self.client = OpenAI( base_url="https://openrouter.ai/api/v1", api_key=self.config["openrouter_api_key"], ) self.vector_store = SimpleVectorStore() self._load_knowledge_base(kb_path) def _load_knowledge_base(self, kb_path): print(f"Loading DBT knowledge base from {kb_path}...") try: with open(kb_path, "r", encoding="utf-8") as f: data = json.load(f) vectors = [] metadata = [] for item in data: vectors.append(item["embedding"]) metadata.append( {"id": item["id"], "source": item["source"], "text": item["text"]} ) self.vector_store.add(vectors, metadata) print(f"Loaded {len(vectors)} chunks into DBT vector store.") except Exception as e: print(f"Error loading DBT knowledge base: {e}") raise def _retrieve_sync(self, query_text, top_k=5): """Embed query and search vector store. Returns list of chunk dicts.""" response = self.client.embeddings.create( model="qwen/qwen3-embedding-8b", input=query_text ) query_emb = response.data[0].embedding return self.vector_store.search(query_emb, top_k=top_k) async def retrieve(self, query_text, top_k=5): """Async retrieval — returns list of {metadata, score} dicts.""" import asyncio return await asyncio.to_thread(self._retrieve_sync, query_text, top_k) async def query(self, query_text): """Query the DBT knowledge base (legacy path, kept for compatibility).""" try: context_chunks = await self.retrieve(query_text) if not context_chunks: return "I couldn't find relevant DBT information for that query." context_text = "\n\n---\n\n".join( [chunk["metadata"]["text"] for chunk in context_chunks] ) system_prompt = """You are a helpful DBT (Dialectical Behavior Therapy) assistant. Use the provided context from the DBT Skills Training Handouts to answer the user's question. If the answer is not in the context, say you don't know based on the provided text. Be concise, compassionate, and practical.""" from ai.jury_council import generate_rag_answer return await generate_rag_answer(query_text, context_text, system_prompt) except Exception as e: return f"Error querying DBT knowledge base: {e}" # Initialize JurySystem jury_system = None try: jury_system = JurySystem() print("✓ JurySystem (DBT) initialized successfully") except Exception as e: print(f"⚠ JurySystem initialization failed: {e}") # ==================== Original Bot Functions ==================== def decodeJwtPayload(token): payload = token.split(".")[1] payload += "=" * (4 - len(payload) % 4) return json.loads(base64.urlsafe_b64decode(payload)) def apiRequest(method, endpoint, token=None, data=None, _retried=False): url = f"{API_URL}{endpoint}" headers = {"Content-Type": "application/json"} if token: headers["Authorization"] = f"Bearer {token}" try: resp = getattr(requests, method)(url, headers=headers, json=data, timeout=10) # Auto-refresh on 401 using refresh token if resp.status_code == 401 and not _retried: new_token = _try_refresh_token_for_session(token) if new_token: return apiRequest(method, endpoint, token=new_token, data=data, _retried=True) try: return resp.json(), resp.status_code except ValueError: return {}, resp.status_code except requests.RequestException: return {"error": "API unavailable"}, 503 def _try_refresh_token_for_session(expired_token): """Find the discord user with this token and refresh it using their refresh token.""" for discord_id, session in user_sessions.items(): if session.get("token") == expired_token: refresh_token = session.get("refresh_token") if not refresh_token: # Check cache for refresh token cached = getCachedUser(discord_id) if cached: refresh_token = cached.get("refresh_token") if refresh_token: result, status = apiRequest("post", "/api/refresh", data={"refresh_token": refresh_token}, _retried=True) if status == 200 and "token" in result: new_token = result["token"] session["token"] = new_token # Update cache cached = getCachedUser(discord_id) or {} cached["refresh_token"] = refresh_token setCachedUser(discord_id, cached) return new_token return None def loadCache(): try: if os.path.exists(CACHE_FILE): with open(CACHE_FILE, "rb") as f: global user_cache user_cache = pickle.load(f) print(f"Loaded cache for {len(user_cache)} users") except Exception as e: print(f"Error loading cache: {e}") def saveCache(): try: with open(CACHE_FILE, "wb") as f: pickle.dump(user_cache, f) except Exception as e: print(f"Error saving cache: {e}") def hashPassword(password): return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def verifyPassword(password, hashed): return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8")) def getCachedUser(discord_id): return user_cache.get(discord_id) def setCachedUser(discord_id, user_data): user_cache[discord_id] = user_data saveCache() def negotiateToken(discord_id, username, password): cached = getCachedUser(discord_id) # Try refresh token first (avoids sending password) if cached and cached.get("refresh_token"): result, status = apiRequest( "post", "/api/refresh", data={"refresh_token": cached["refresh_token"]}, _retried=True, ) if status == 200 and "token" in result: token = result["token"] payload = decodeJwtPayload(token) user_uuid = payload["sub"] cached["user_uuid"] = user_uuid setCachedUser(discord_id, cached) return token, user_uuid # Fall back to password login, always request refresh token (trust_device) login_data = {"username": username, "password": password, "trust_device": True} if ( cached and cached.get("username") == username and cached.get("hashed_password") and verifyPassword(password, cached.get("hashed_password")) ): result, status = apiRequest("post", "/api/login", data=login_data, _retried=True) if status == 200 and "token" in result: token = result["token"] payload = decodeJwtPayload(token) user_uuid = payload["sub"] setCachedUser( discord_id, { "hashed_password": cached["hashed_password"], "user_uuid": user_uuid, "username": username, "refresh_token": result.get("refresh_token"), }, ) return token, user_uuid return None, None result, status = apiRequest("post", "/api/login", data=login_data, _retried=True) if status == 200 and "token" in result: token = result["token"] payload = decodeJwtPayload(token) user_uuid = payload["sub"] setCachedUser( discord_id, { "hashed_password": hashPassword(password), "user_uuid": user_uuid, "username": username, "refresh_token": result.get("refresh_token"), }, ) return token, user_uuid return None, None async def handleAuthFailure(message): discord_id = message.author.id user_sessions.pop(discord_id, None) await message.channel.send( "Your session has expired. Send any message to log in again." ) async def handleLoginStep(message): discord_id = message.author.id state = login_state[discord_id] if state["step"] == "username": state["username"] = message.content.strip() state["step"] = "password" await message.channel.send("Password?") elif state["step"] == "password": username = state["username"] password = message.content.strip() del login_state[discord_id] token, user_uuid = negotiateToken(discord_id, username, password) if token and user_uuid: user_sessions[discord_id] = { "token": token, "user_uuid": user_uuid, "username": username, } registered = ", ".join(list_registered()) or "none" await message.channel.send( f"Welcome back **{username}**!\n\n" f"Registered modules: {registered}\n\n" f"Send 'help' for available commands." ) else: await message.channel.send( "Invalid credentials. Send any message to try again." ) async def sendHelpMessage(message): help_msg = """**🤖 Synculous Bot - Natural Language Commands** Just talk to me naturally! Here are some examples: **💊 Medications:** • "add lsd 50 mcg every tuesday at 4:20pm" • "take my wellbutrin" • "what meds do i have today?" • "show my refills" • "snooze my reminder for 30 minutes" • "check adherence" **📋 Routines:** • "create morning routine with brush teeth, shower, eat" • "start my morning routine" • "done" (complete current step) • "skip" (skip current step) • "pause/resume" (pause or continue) • "what steps are in my routine?" • "schedule workout for monday wednesday friday at 7am" • "show my stats" **🧠 DBT Support:** • "how do I use distress tolerance?" • "explain radical acceptance" • "give me a DBT skill for anger" • "what are the TIPP skills?" **💡 Tips:** • I understand natural language, typos, and slang • If I'm unsure, I'll ask for clarification • For important actions, I'll ask you to confirm with "yes" or "no" • When you're in a routine, shortcuts like "done", "skip", "pause" work automatically""" await message.channel.send(help_msg) async def checkActiveSession(session): """Check if user has an active routine session and return details.""" token = session.get("token") if not token: return None resp, status = apiRequest("get", "/api/sessions/active", token) if status == 200 and "session" in resp: return resp return None async def handleConfirmation(message, session): """Handle yes/no confirmation responses. Returns True if handled.""" discord_id = message.author.id user_input = message.content.lower().strip() if "pending_confirmations" not in session: return False pending = session["pending_confirmations"] if not pending: return False confirmation_id = list(pending.keys())[-1] confirmation_data = pending[confirmation_id] if user_input in ("yes", "y", "yeah", "sure", "ok", "confirm"): del pending[confirmation_id] interaction_type = confirmation_data.get("interaction_type") handler = get_handler(interaction_type) if handler: fake_parsed = confirmation_data.copy() fake_parsed["needs_confirmation"] = False await handler(message, session, fake_parsed) return True elif user_input in ("no", "n", "nah", "cancel", "abort"): del pending[confirmation_id] await message.channel.send("❌ Cancelled.") return True return False async def handleActiveSessionShortcuts(message, session, active_session): """Handle shortcuts like 'done', 'skip', 'next' when in active session.""" user_input = message.content.lower().strip() shortcuts = { "done": ("routine", "complete"), "finished": ("routine", "complete"), "complete": ("routine", "complete"), "next": ("routine", "complete"), "skip": ("routine", "skip"), "pass": ("routine", "skip"), "pause": ("routine", "pause"), "hold": ("routine", "pause"), "resume": ("routine", "resume"), "continue": ("routine", "resume"), "stop": ("routine", "cancel"), "quit": ("routine", "cancel"), "abort": ("routine", "abort"), } if user_input in shortcuts: interaction_type, action = shortcuts[user_input] handler = get_handler(interaction_type) if handler: fake_parsed = {"action": action} await handler(message, session, fake_parsed) return True return False async def handleDBTQuery(message): """Handle DBT-related queries using JurySystem + jury council pipeline.""" if not jury_system: return False # Keywords that indicate a DBT query dbt_keywords = [ "dbt", "distress tolerance", "emotion regulation", "interpersonal effectiveness", "mindfulness", "radical acceptance", "wise mind", "tipp", "dearman", "check the facts", "opposite action", "cope ahead", "abc please", "stop skill", "pros and cons", "half smile", "willing hands", ] user_input_lower = message.content.lower() is_dbt_query = any(keyword in user_input_lower for keyword in dbt_keywords) if not is_dbt_query: return False from ai.jury_council import ( generate_search_questions, run_jury_filter, generate_rag_answer, split_for_discord, ) async with message.channel.typing(): # Step 1: Generate candidate questions via Qwen Nitro (fallback: qwen3-235b) candidates, gen_error = await generate_search_questions(message.content) if gen_error: await message.channel.send(f"⚠️ **Question generator failed:** {gen_error}") return True # Step 2: Jury council filters candidates → safe question JSON list jury_result = await run_jury_filter(candidates, message.content) breakdown = jury_result.format_breakdown() # Always show the jury deliberation (verbose, as requested) for chunk in split_for_discord(breakdown): await message.channel.send(chunk) if jury_result.has_error: return True if not jury_result.safe_questions: return True await message.channel.send("🔍 Searching knowledge base with approved questions...") # Step 3: Multi-query retrieval — deduplicated by chunk ID seen_ids = set() context_chunks = [] for q in jury_result.safe_questions: results = await jury_system.retrieve(q) for r in results: chunk_id = r["metadata"].get("id") if chunk_id not in seen_ids: seen_ids.add(chunk_id) context_chunks.append(r["metadata"]["text"]) if not context_chunks: await message.channel.send("⚠️ No relevant content found in the knowledge base.") return True context = "\n\n---\n\n".join(context_chunks) # Step 4: Generate answer with qwen3-235b system_prompt = """You are a helpful mental health support assistant with expertise in DBT (Dialectical Behavior Therapy). Use the provided context to answer the user's question accurately and compassionately. If the answer is not in the context, say so — do not invent information. Be concise, practical, and supportive.""" answer = await generate_rag_answer(message.content, context, system_prompt) await message.channel.send(f"🧠 **Response:**\n{answer}") return True async def routeCommand(message): discord_id = message.author.id session = user_sessions[discord_id] user_input = message.content.lower() if "help" in user_input or "what can i say" in user_input: await sendHelpMessage(message) return # Check for active session first active_session = await checkActiveSession(session) # Handle confirmation responses confirmation_handled = await handleConfirmation(message, session) if confirmation_handled: return # Handle shortcuts when in active session if active_session: shortcut_handled = await handleActiveSessionShortcuts( message, session, active_session ) if shortcut_handled: return # Check for DBT queries dbt_handled = await handleDBTQuery(message) if dbt_handled: return async with message.channel.typing(): history = message_history.get(discord_id, []) # Add context about active session to help AI understand context = "" if active_session: session_data = active_session.get("session", {}) routine_name = session_data.get("routine_name", "a routine") current_step = session_data.get("current_step_index", 0) + 1 total_steps = active_session.get("total_steps", 0) context = f"\n[Context: User is currently in active session for '{routine_name}', on step {current_step} of {total_steps}. They can say 'done', 'skip', 'pause', 'resume', or 'stop'.]" parsed = await ai_parser.parse( message.content + context, "command_parser", history=history ) if discord_id not in message_history: message_history[discord_id] = [] message_history[discord_id].append((message.content, parsed)) message_history[discord_id] = message_history[discord_id][-5:] if "needs_clarification" in parsed: await message.channel.send( f"I'm not quite sure what you mean. {parsed['needs_clarification']}" ) return if "error" in parsed: await message.channel.send( f"I had trouble understanding that: {parsed['error']}" ) return interaction_type = parsed.get("interaction_type") handler = get_handler(interaction_type) if handler: await handler(message, session, parsed) else: registered = ", ".join(list_registered()) or "none" await message.channel.send( f"Unknown command type '{interaction_type}'. Registered modules: {registered}" ) def _restore_sessions_from_cache(): """Try to restore user sessions from cached refresh tokens on startup.""" restored = 0 for discord_id, cached in user_cache.items(): refresh_token = cached.get("refresh_token") if not refresh_token: continue result, status = apiRequest( "post", "/api/refresh", data={"refresh_token": refresh_token}, _retried=True, ) if status == 200 and "token" in result: token = result["token"] payload = decodeJwtPayload(token) user_uuid = payload["sub"] user_sessions[discord_id] = { "token": token, "user_uuid": user_uuid, "username": cached.get("username", ""), "refresh_token": refresh_token, } restored += 1 if restored: print(f"Restored {restored} user session(s) from cache") @client.event async def on_ready(): print(f"Bot logged in as {client.user}") loadCache() _restore_sessions_from_cache() backgroundLoop.start() @client.event async def on_message(message): if message.author == client.user: return if not isinstance(message.channel, discord.DMChannel): return discord_id = message.author.id if discord_id in login_state: await handleLoginStep(message) return if discord_id not in user_sessions: login_state[discord_id] = {"step": "username"} await message.channel.send("Welcome! Send your username to log in.") return await routeCommand(message) @tasks.loop(seconds=60) async def backgroundLoop(): """Override this in your domain module or extend as needed.""" pass @backgroundLoop.before_loop async def beforeBackgroundLoop(): await client.wait_until_ready() # ==================== Discord Presence Tracking ==================== async def update_presence_tracking(): """Track Discord presence for users with presence tracking enabled.""" print(f"[DEBUG] update_presence_tracking() called", flush=True) try: import core.adaptive_meds as adaptive_meds import core.postgres as postgres print(f"[DEBUG] Running presence tracking. Guilds: {len(client.guilds)}", flush=True) for guild in client.guilds: print(f"[DEBUG] Guild: {guild.name} ({guild.id}) - Members: {guild.member_count}") # Get all users with presence tracking enabled settings = postgres.select( "adaptive_med_settings", {"presence_tracking_enabled": True} ) print(f"[DEBUG] Found {len(settings)} users with presence tracking enabled") for setting in settings: user_uuid = setting.get("user_uuid") # Get user's Discord ID from notifications table notif_settings = postgres.select("notifications", {"user_uuid": user_uuid}) if not notif_settings: continue discord_user_id = notif_settings[0].get("discord_user_id") print(f"[DEBUG] Looking for Discord user: {discord_user_id}", flush=True) if not discord_user_id: print(f"[DEBUG] No Discord ID for user {user_uuid}", flush=True) continue # Get the member from a shared guild (needed for presence data) try: member = None target_id = int(discord_user_id) # Search through all guilds the bot is in for guild in client.guilds: member = guild.get_member(target_id) print(f"[DEBUG] Checked guild {guild.name}, member: {member}", flush=True) if member: break if not member: print(f"[DEBUG] User {discord_user_id} not found in any shared guild", flush=True) continue # Check if user is online is_online = member.status != discord.Status.offline print(f"[DEBUG] User status: {member.status}, is_online: {is_online}", flush=True) # Get current presence from DB presence = adaptive_meds.get_user_presence(user_uuid) was_online = presence.get("is_currently_online") if presence else False print(f"[DEBUG] Previous state: {was_online}, Current: {is_online}", flush=True) # Update presence if changed if is_online != was_online: adaptive_meds.update_user_presence( user_uuid, discord_user_id, is_online ) # Record the event from datetime import datetime event_type = "online" if is_online else "offline" adaptive_meds.record_presence_event( user_uuid, event_type, datetime.utcnow() ) print( f"Presence update: User {user_uuid} is now {'online' if is_online else 'offline'}" ) except Exception as e: print(f"Error tracking presence for user {user_uuid}: {e}") except Exception as e: print(f"Error in presence tracking loop: {e}") @tasks.loop(seconds=30) async def presenceTrackingLoop(): """Track Discord presence every 30 seconds.""" try: await update_presence_tracking() except Exception as e: print(f"[ERROR] presenceTrackingLoop failed: {e}", flush=True) import traceback traceback.print_exc() @presenceTrackingLoop.before_loop async def beforePresenceTrackingLoop(): await client.wait_until_ready() @tasks.loop(seconds=30) async def snitchCheckLoop(): """Check for pending snitch notifications and send them.""" try: import core.snitch as snitch_core import core.postgres as postgres from datetime import datetime, timedelta # Get pending snitches from the last 5 minutes that haven't been sent cutoff = datetime.utcnow() - timedelta(minutes=5) pending_snitches = postgres.select("snitch_log", where={"delivered": False}) for snitch in pending_snitches: sent_at = snitch.get("sent_at") if not sent_at or sent_at < cutoff: continue contact_id = snitch.get("contact_id") if not contact_id: continue # Get contact details contacts = postgres.select("snitch_contacts", {"id": contact_id}) if not contacts: continue contact = contacts[0] if contact.get("contact_type") != "discord": continue discord_user_id = contact.get("contact_value") message = snitch.get("message_content", "Snitch notification") try: # Send Discord DM user = await client.fetch_user(int(discord_user_id)) if user: await user.send(message) # Mark as delivered postgres.update( "snitch_log", {"delivered": True}, {"id": snitch.get("id")} ) print( f"Snitch sent to {contact.get('contact_name')} (Discord: {discord_user_id})" ) except Exception as e: print(f"Error sending snitch to {discord_user_id}: {e}") except Exception as e: print(f"Error in snitch check loop: {e}") @snitchCheckLoop.before_loop async def beforeSnitchCheckLoop(): await client.wait_until_ready() @client.event async def on_ready(): print(f"Bot logged in as {client.user}", flush=True) print(f"Connected to {len(client.guilds)} guilds", flush=True) loadCache() backgroundLoop.start() presenceTrackingLoop.start() print(f"[DEBUG] Presence tracking loop started", flush=True) snitchCheckLoop.start() if __name__ == "__main__": client.run(DISCORD_BOT_TOKEN)