Web client: trustDevice now defaults to true so a refresh token is always issued on login, preventing deauth after the 1-hour access token expiry. Users can still uncheck the box on shared devices. Bot: cache file path is now env-configurable (BOT_CACHE_FILE) and defaults to /app/cache/user_cache.pkl. Docker Compose mounts a named volume at /app/cache so the session cache survives container restarts. saveCache() now creates the directory if it doesn't exist. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
876 lines
29 KiB
Python
876 lines
29 KiB
Python
"""
|
|
bot.py - Discord bot client with session management and command routing
|
|
|
|
Features:
|
|
- Login flow with username/password
|
|
- Session management with JWT tokens
|
|
- AI-powered command parsing via registry
|
|
- Background task loop for polling
|
|
- JurySystem DBT integration for mental health support
|
|
"""
|
|
|
|
import discord
|
|
from discord.ext import tasks
|
|
import os
|
|
import sys
|
|
import json
|
|
import base64
|
|
import requests
|
|
import bcrypt
|
|
import pickle
|
|
import numpy as np
|
|
from openai import OpenAI
|
|
|
|
from bot.command_registry import get_handler, list_registered, register_module
|
|
import ai.parser as ai_parser
|
|
import bot.commands.routines # noqa: F401 - registers handler
|
|
import bot.commands.medications # noqa: F401 - registers handler
|
|
import bot.commands.knowledge # noqa: F401 - registers handler
|
|
|
|
DISCORD_BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN")
|
|
API_URL = os.getenv("API_URL", "http://app:5000")
|
|
|
|
user_sessions = {}
|
|
login_state = {}
|
|
message_history = {}
|
|
user_cache = {}
|
|
CACHE_FILE = os.getenv("BOT_CACHE_FILE", "/app/cache/user_cache.pkl")
|
|
|
|
intents = discord.Intents.default()
|
|
intents.message_content = True
|
|
intents.presences = True
|
|
intents.members = True
|
|
|
|
client = discord.Client(intents=intents)
|
|
|
|
|
|
# ==================== JurySystem Integration ====================
|
|
|
|
|
|
class SimpleVectorStore:
|
|
"""A simple in-memory vector store using NumPy."""
|
|
|
|
def __init__(self):
|
|
self.vectors = []
|
|
self.metadata = []
|
|
|
|
def add(self, vectors, metadatas):
|
|
self.vectors.extend(vectors)
|
|
self.metadata.extend(metadatas)
|
|
|
|
def search(self, query_vector, top_k=5):
|
|
if not self.vectors:
|
|
return []
|
|
|
|
query_vec = np.array(query_vector)
|
|
doc_vecs = np.array(self.vectors)
|
|
norms = np.linalg.norm(doc_vecs, axis=1)
|
|
valid_indices = norms > 0
|
|
scores = np.zeros(len(doc_vecs))
|
|
dot_products = np.dot(doc_vecs, query_vec)
|
|
scores[valid_indices] = dot_products[valid_indices] / (
|
|
norms[valid_indices] * np.linalg.norm(query_vec)
|
|
)
|
|
top_indices = np.argsort(scores)[-top_k:][::-1]
|
|
|
|
results = []
|
|
for idx in top_indices:
|
|
results.append({"metadata": self.metadata[idx], "score": scores[idx]})
|
|
return results
|
|
|
|
|
|
class JurySystem:
|
|
"""DBT Knowledge Base Query System"""
|
|
|
|
def __init__(self):
|
|
config_path = os.getenv("CONFIG_PATH", "config.json")
|
|
kb_path = os.getenv(
|
|
"KNOWLEDGE_BASE_PATH", "bot/data/dbt_knowledge.embeddings.json"
|
|
)
|
|
|
|
with open(config_path, "r") as f:
|
|
self.config = json.load(f)
|
|
|
|
self.client = OpenAI(
|
|
base_url="https://openrouter.ai/api/v1",
|
|
api_key=self.config["openrouter_api_key"],
|
|
)
|
|
self.vector_store = SimpleVectorStore()
|
|
self._load_knowledge_base(kb_path)
|
|
|
|
def _load_knowledge_base(self, kb_path):
|
|
print(f"Loading DBT knowledge base from {kb_path}...")
|
|
try:
|
|
with open(kb_path, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
vectors = []
|
|
metadata = []
|
|
for item in data:
|
|
vectors.append(item["embedding"])
|
|
metadata.append(
|
|
{"id": item["id"], "source": item["source"], "text": item["text"]}
|
|
)
|
|
self.vector_store.add(vectors, metadata)
|
|
print(f"Loaded {len(vectors)} chunks into DBT vector store.")
|
|
except Exception as e:
|
|
print(f"Error loading DBT knowledge base: {e}")
|
|
raise
|
|
|
|
def _retrieve_sync(self, query_text, top_k=5):
|
|
"""Embed query and search vector store. Returns list of chunk dicts."""
|
|
response = self.client.embeddings.create(
|
|
model="qwen/qwen3-embedding-8b", input=query_text
|
|
)
|
|
query_emb = response.data[0].embedding
|
|
return self.vector_store.search(query_emb, top_k=top_k)
|
|
|
|
async def retrieve(self, query_text, top_k=5):
|
|
"""Async retrieval — returns list of {metadata, score} dicts."""
|
|
import asyncio
|
|
return await asyncio.to_thread(self._retrieve_sync, query_text, top_k)
|
|
|
|
async def query(self, query_text):
|
|
"""Query the DBT knowledge base (legacy path, kept for compatibility)."""
|
|
try:
|
|
context_chunks = await self.retrieve(query_text)
|
|
if not context_chunks:
|
|
return "I couldn't find relevant DBT information for that query."
|
|
|
|
context_text = "\n\n---\n\n".join(
|
|
[chunk["metadata"]["text"] for chunk in context_chunks]
|
|
)
|
|
|
|
system_prompt = """You are a helpful DBT (Dialectical Behavior Therapy) assistant.
|
|
Use the provided context from the DBT Skills Training Handouts to answer the user's question.
|
|
If the answer is not in the context, say you don't know based on the provided text.
|
|
Be concise, compassionate, and practical."""
|
|
|
|
from ai.jury_council import generate_rag_answer
|
|
return await generate_rag_answer(query_text, context_text, system_prompt)
|
|
except Exception as e:
|
|
return f"Error querying DBT knowledge base: {e}"
|
|
|
|
|
|
# Initialize JurySystem
|
|
jury_system = None
|
|
try:
|
|
jury_system = JurySystem()
|
|
print("✓ JurySystem (DBT) initialized successfully")
|
|
except Exception as e:
|
|
print(f"⚠ JurySystem initialization failed: {e}")
|
|
|
|
|
|
# ==================== Original Bot Functions ====================
|
|
|
|
|
|
def decodeJwtPayload(token):
|
|
payload = token.split(".")[1]
|
|
payload += "=" * (4 - len(payload) % 4)
|
|
return json.loads(base64.urlsafe_b64decode(payload))
|
|
|
|
|
|
def apiRequest(method, endpoint, token=None, data=None, _retried=False):
|
|
url = f"{API_URL}{endpoint}"
|
|
headers = {"Content-Type": "application/json"}
|
|
if token:
|
|
headers["Authorization"] = f"Bearer {token}"
|
|
try:
|
|
resp = getattr(requests, method)(url, headers=headers, json=data, timeout=10)
|
|
# Auto-refresh on 401 using refresh token
|
|
if resp.status_code == 401 and not _retried:
|
|
new_token = _try_refresh_token_for_session(token)
|
|
if new_token:
|
|
return apiRequest(method, endpoint, token=new_token, data=data, _retried=True)
|
|
try:
|
|
return resp.json(), resp.status_code
|
|
except ValueError:
|
|
return {}, resp.status_code
|
|
except requests.RequestException:
|
|
return {"error": "API unavailable"}, 503
|
|
|
|
|
|
def _try_refresh_token_for_session(expired_token):
|
|
"""Find the discord user with this token and refresh it using their refresh token."""
|
|
for discord_id, session in user_sessions.items():
|
|
if session.get("token") == expired_token:
|
|
refresh_token = session.get("refresh_token")
|
|
if not refresh_token:
|
|
# Check cache for refresh token
|
|
cached = getCachedUser(discord_id)
|
|
if cached:
|
|
refresh_token = cached.get("refresh_token")
|
|
if refresh_token:
|
|
result, status = apiRequest("post", "/api/refresh",
|
|
data={"refresh_token": refresh_token},
|
|
_retried=True)
|
|
if status == 200 and "token" in result:
|
|
new_token = result["token"]
|
|
session["token"] = new_token
|
|
# Update cache
|
|
cached = getCachedUser(discord_id) or {}
|
|
cached["refresh_token"] = refresh_token
|
|
setCachedUser(discord_id, cached)
|
|
return new_token
|
|
return None
|
|
|
|
|
|
def loadCache():
|
|
try:
|
|
if os.path.exists(CACHE_FILE):
|
|
with open(CACHE_FILE, "rb") as f:
|
|
global user_cache
|
|
user_cache = pickle.load(f)
|
|
print(f"Loaded cache for {len(user_cache)} users")
|
|
except Exception as e:
|
|
print(f"Error loading cache: {e}")
|
|
|
|
|
|
def saveCache():
|
|
try:
|
|
os.makedirs(os.path.dirname(CACHE_FILE), exist_ok=True)
|
|
with open(CACHE_FILE, "wb") as f:
|
|
pickle.dump(user_cache, f)
|
|
except Exception as e:
|
|
print(f"Error saving cache: {e}")
|
|
|
|
|
|
def hashPassword(password):
|
|
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
|
|
|
|
|
|
def verifyPassword(password, hashed):
|
|
return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8"))
|
|
|
|
|
|
def getCachedUser(discord_id):
|
|
return user_cache.get(discord_id)
|
|
|
|
|
|
def setCachedUser(discord_id, user_data):
|
|
user_cache[discord_id] = user_data
|
|
saveCache()
|
|
|
|
|
|
def negotiateToken(discord_id, username, password):
|
|
cached = getCachedUser(discord_id)
|
|
|
|
# Try refresh token first (avoids sending password)
|
|
if cached and cached.get("refresh_token"):
|
|
result, status = apiRequest(
|
|
"post", "/api/refresh",
|
|
data={"refresh_token": cached["refresh_token"]},
|
|
_retried=True,
|
|
)
|
|
if status == 200 and "token" in result:
|
|
token = result["token"]
|
|
payload = decodeJwtPayload(token)
|
|
user_uuid = payload["sub"]
|
|
cached["user_uuid"] = user_uuid
|
|
setCachedUser(discord_id, cached)
|
|
return token, user_uuid
|
|
|
|
# Fall back to password login, always request refresh token (trust_device)
|
|
login_data = {"username": username, "password": password, "trust_device": True}
|
|
|
|
if (
|
|
cached
|
|
and cached.get("username") == username
|
|
and cached.get("hashed_password")
|
|
and verifyPassword(password, cached.get("hashed_password"))
|
|
):
|
|
result, status = apiRequest("post", "/api/login", data=login_data, _retried=True)
|
|
if status == 200 and "token" in result:
|
|
token = result["token"]
|
|
payload = decodeJwtPayload(token)
|
|
user_uuid = payload["sub"]
|
|
setCachedUser(
|
|
discord_id,
|
|
{
|
|
"hashed_password": cached["hashed_password"],
|
|
"user_uuid": user_uuid,
|
|
"username": username,
|
|
"refresh_token": result.get("refresh_token"),
|
|
},
|
|
)
|
|
return token, user_uuid
|
|
return None, None
|
|
|
|
result, status = apiRequest("post", "/api/login", data=login_data, _retried=True)
|
|
if status == 200 and "token" in result:
|
|
token = result["token"]
|
|
payload = decodeJwtPayload(token)
|
|
user_uuid = payload["sub"]
|
|
setCachedUser(
|
|
discord_id,
|
|
{
|
|
"hashed_password": hashPassword(password),
|
|
"user_uuid": user_uuid,
|
|
"username": username,
|
|
"refresh_token": result.get("refresh_token"),
|
|
},
|
|
)
|
|
return token, user_uuid
|
|
return None, None
|
|
|
|
|
|
async def handleAuthFailure(message):
|
|
discord_id = message.author.id
|
|
user_sessions.pop(discord_id, None)
|
|
await message.channel.send(
|
|
"Your session has expired. Send any message to log in again."
|
|
)
|
|
|
|
|
|
async def handleLoginStep(message):
|
|
discord_id = message.author.id
|
|
state = login_state[discord_id]
|
|
|
|
if state["step"] == "username":
|
|
state["username"] = message.content.strip()
|
|
state["step"] = "password"
|
|
await message.channel.send("Password?")
|
|
|
|
elif state["step"] == "password":
|
|
username = state["username"]
|
|
password = message.content.strip()
|
|
del login_state[discord_id]
|
|
|
|
token, user_uuid = negotiateToken(discord_id, username, password)
|
|
|
|
if token and user_uuid:
|
|
user_sessions[discord_id] = {
|
|
"token": token,
|
|
"user_uuid": user_uuid,
|
|
"username": username,
|
|
}
|
|
registered = ", ".join(list_registered()) or "none"
|
|
await message.channel.send(
|
|
f"Welcome back **{username}**!\n\n"
|
|
f"Registered modules: {registered}\n\n"
|
|
f"Send 'help' for available commands."
|
|
)
|
|
else:
|
|
await message.channel.send(
|
|
"Invalid credentials. Send any message to try again."
|
|
)
|
|
|
|
|
|
async def sendHelpMessage(message):
|
|
help_msg = """**🤖 Synculous Bot - Natural Language Commands**
|
|
|
|
Just talk to me naturally! Here are some examples:
|
|
|
|
**💊 Medications:**
|
|
• "add lsd 50 mcg every tuesday at 4:20pm"
|
|
• "take my wellbutrin"
|
|
• "what meds do i have today?"
|
|
• "show my refills"
|
|
• "snooze my reminder for 30 minutes"
|
|
• "check adherence"
|
|
|
|
**📋 Routines:**
|
|
• "create morning routine with brush teeth, shower, eat"
|
|
• "start my morning routine"
|
|
• "done" (complete current step)
|
|
• "skip" (skip current step)
|
|
• "pause/resume" (pause or continue)
|
|
• "what steps are in my routine?"
|
|
• "schedule workout for monday wednesday friday at 7am"
|
|
• "show my stats"
|
|
|
|
**🧠 DBT Support:**
|
|
• "how do I use distress tolerance?"
|
|
• "explain radical acceptance"
|
|
• "give me a DBT skill for anger"
|
|
• "what are the TIPP skills?"
|
|
|
|
**💡 Tips:**
|
|
• I understand natural language, typos, and slang
|
|
• If I'm unsure, I'll ask for clarification
|
|
• For important actions, I'll ask you to confirm with "yes" or "no"
|
|
• When you're in a routine, shortcuts like "done", "skip", "pause" work automatically"""
|
|
await message.channel.send(help_msg)
|
|
|
|
|
|
async def checkActiveSession(session):
|
|
"""Check if user has an active routine session and return details."""
|
|
token = session.get("token")
|
|
if not token:
|
|
return None
|
|
|
|
resp, status = apiRequest("get", "/api/sessions/active", token)
|
|
if status == 200 and "session" in resp:
|
|
return resp
|
|
return None
|
|
|
|
|
|
async def handleConfirmation(message, session):
|
|
"""Handle yes/no confirmation responses. Returns True if handled."""
|
|
discord_id = message.author.id
|
|
user_input = message.content.lower().strip()
|
|
|
|
if "pending_confirmations" not in session:
|
|
return False
|
|
|
|
pending = session["pending_confirmations"]
|
|
if not pending:
|
|
return False
|
|
|
|
confirmation_id = list(pending.keys())[-1]
|
|
confirmation_data = pending[confirmation_id]
|
|
|
|
if user_input in ("yes", "y", "yeah", "sure", "ok", "confirm"):
|
|
del pending[confirmation_id]
|
|
interaction_type = confirmation_data.get("interaction_type")
|
|
handler = get_handler(interaction_type)
|
|
|
|
if handler:
|
|
fake_parsed = confirmation_data.copy()
|
|
fake_parsed["needs_confirmation"] = False
|
|
await handler(message, session, fake_parsed)
|
|
return True
|
|
|
|
elif user_input in ("no", "n", "nah", "cancel", "abort"):
|
|
del pending[confirmation_id]
|
|
await message.channel.send("❌ Cancelled.")
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
async def handleActiveSessionShortcuts(message, session, active_session):
|
|
"""Handle shortcuts like 'done', 'skip', 'next' when in active session."""
|
|
user_input = message.content.lower().strip()
|
|
|
|
shortcuts = {
|
|
"done": ("routine", "complete"),
|
|
"finished": ("routine", "complete"),
|
|
"complete": ("routine", "complete"),
|
|
"next": ("routine", "complete"),
|
|
"skip": ("routine", "skip"),
|
|
"pass": ("routine", "skip"),
|
|
"pause": ("routine", "pause"),
|
|
"hold": ("routine", "pause"),
|
|
"resume": ("routine", "resume"),
|
|
"continue": ("routine", "resume"),
|
|
"stop": ("routine", "cancel"),
|
|
"quit": ("routine", "cancel"),
|
|
"abort": ("routine", "abort"),
|
|
}
|
|
|
|
if user_input in shortcuts:
|
|
interaction_type, action = shortcuts[user_input]
|
|
handler = get_handler(interaction_type)
|
|
if handler:
|
|
fake_parsed = {"action": action}
|
|
await handler(message, session, fake_parsed)
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
async def handleDBTQuery(message):
|
|
"""Handle DBT-related queries using JurySystem + jury council pipeline."""
|
|
if not jury_system:
|
|
return False
|
|
|
|
# Keywords that indicate a DBT query
|
|
dbt_keywords = [
|
|
"dbt",
|
|
"distress tolerance",
|
|
"emotion regulation",
|
|
"interpersonal effectiveness",
|
|
"mindfulness",
|
|
"radical acceptance",
|
|
"wise mind",
|
|
"tipp",
|
|
"dearman",
|
|
"check the facts",
|
|
"opposite action",
|
|
"cope ahead",
|
|
"abc please",
|
|
"stop skill",
|
|
"pros and cons",
|
|
"half smile",
|
|
"willing hands",
|
|
]
|
|
|
|
user_input_lower = message.content.lower()
|
|
is_dbt_query = any(keyword in user_input_lower for keyword in dbt_keywords)
|
|
|
|
if not is_dbt_query:
|
|
return False
|
|
|
|
from ai.jury_council import (
|
|
generate_search_questions,
|
|
run_jury_filter,
|
|
generate_rag_answer,
|
|
split_for_discord,
|
|
)
|
|
|
|
async with message.channel.typing():
|
|
# Step 1: Generate candidate questions via Qwen Nitro (fallback: qwen3-235b)
|
|
candidates, gen_error = await generate_search_questions(message.content)
|
|
if gen_error:
|
|
await message.channel.send(f"⚠️ **Question generator failed:** {gen_error}")
|
|
return True
|
|
|
|
# Step 2: Jury council filters candidates → safe question JSON list
|
|
jury_result = await run_jury_filter(candidates, message.content)
|
|
breakdown = jury_result.format_breakdown()
|
|
|
|
# Always show the jury deliberation (verbose, as requested)
|
|
for chunk in split_for_discord(breakdown):
|
|
await message.channel.send(chunk)
|
|
|
|
if jury_result.has_error:
|
|
return True
|
|
|
|
if not jury_result.safe_questions:
|
|
return True
|
|
|
|
await message.channel.send("🔍 Searching knowledge base with approved questions...")
|
|
|
|
# Step 3: Multi-query retrieval — deduplicated by chunk ID
|
|
seen_ids = set()
|
|
context_chunks = []
|
|
for q in jury_result.safe_questions:
|
|
results = await jury_system.retrieve(q)
|
|
for r in results:
|
|
chunk_id = r["metadata"].get("id")
|
|
if chunk_id not in seen_ids:
|
|
seen_ids.add(chunk_id)
|
|
context_chunks.append(r["metadata"]["text"])
|
|
|
|
if not context_chunks:
|
|
await message.channel.send("⚠️ No relevant content found in the knowledge base.")
|
|
return True
|
|
|
|
context = "\n\n---\n\n".join(context_chunks)
|
|
|
|
# Step 4: Generate answer with qwen3-235b
|
|
system_prompt = """You are a helpful mental health support assistant with expertise in DBT (Dialectical Behavior Therapy).
|
|
Use the provided context to answer the user's question accurately and compassionately.
|
|
If the answer is not in the context, say so — do not invent information.
|
|
Be concise, practical, and supportive."""
|
|
|
|
answer = await generate_rag_answer(message.content, context, system_prompt)
|
|
|
|
await message.channel.send(f"🧠 **Response:**\n{answer}")
|
|
return True
|
|
|
|
|
|
async def routeCommand(message):
|
|
discord_id = message.author.id
|
|
session = user_sessions[discord_id]
|
|
user_input = message.content.lower()
|
|
|
|
if "help" in user_input or "what can i say" in user_input:
|
|
await sendHelpMessage(message)
|
|
return
|
|
|
|
# Check for active session first
|
|
active_session = await checkActiveSession(session)
|
|
|
|
# Handle confirmation responses
|
|
confirmation_handled = await handleConfirmation(message, session)
|
|
if confirmation_handled:
|
|
return
|
|
|
|
# Handle shortcuts when in active session
|
|
if active_session:
|
|
shortcut_handled = await handleActiveSessionShortcuts(
|
|
message, session, active_session
|
|
)
|
|
if shortcut_handled:
|
|
return
|
|
|
|
# Check for DBT queries
|
|
dbt_handled = await handleDBTQuery(message)
|
|
if dbt_handled:
|
|
return
|
|
|
|
async with message.channel.typing():
|
|
history = message_history.get(discord_id, [])
|
|
|
|
# Add context about active session to help AI understand
|
|
context = ""
|
|
if active_session:
|
|
session_data = active_session.get("session", {})
|
|
routine_name = session_data.get("routine_name", "a routine")
|
|
current_step = session_data.get("current_step_index", 0) + 1
|
|
total_steps = active_session.get("total_steps", 0)
|
|
context = f"\n[Context: User is currently in active session for '{routine_name}', on step {current_step} of {total_steps}. They can say 'done', 'skip', 'pause', 'resume', or 'stop'.]"
|
|
|
|
parsed = await ai_parser.parse(
|
|
message.content + context, "command_parser", history=history
|
|
)
|
|
|
|
if discord_id not in message_history:
|
|
message_history[discord_id] = []
|
|
message_history[discord_id].append((message.content, parsed))
|
|
message_history[discord_id] = message_history[discord_id][-5:]
|
|
|
|
if "needs_clarification" in parsed:
|
|
await message.channel.send(
|
|
f"I'm not quite sure what you mean. {parsed['needs_clarification']}"
|
|
)
|
|
return
|
|
|
|
if "error" in parsed:
|
|
await message.channel.send(
|
|
f"I had trouble understanding that: {parsed['error']}"
|
|
)
|
|
return
|
|
|
|
interaction_type = parsed.get("interaction_type")
|
|
handler = get_handler(interaction_type)
|
|
|
|
if handler:
|
|
await handler(message, session, parsed)
|
|
else:
|
|
registered = ", ".join(list_registered()) or "none"
|
|
await message.channel.send(
|
|
f"Unknown command type '{interaction_type}'. Registered modules: {registered}"
|
|
)
|
|
|
|
|
|
def _restore_sessions_from_cache():
|
|
"""Try to restore user sessions from cached refresh tokens on startup."""
|
|
restored = 0
|
|
for discord_id, cached in user_cache.items():
|
|
refresh_token = cached.get("refresh_token")
|
|
if not refresh_token:
|
|
continue
|
|
result, status = apiRequest(
|
|
"post", "/api/refresh",
|
|
data={"refresh_token": refresh_token},
|
|
_retried=True,
|
|
)
|
|
if status == 200 and "token" in result:
|
|
token = result["token"]
|
|
payload = decodeJwtPayload(token)
|
|
user_uuid = payload["sub"]
|
|
user_sessions[discord_id] = {
|
|
"token": token,
|
|
"user_uuid": user_uuid,
|
|
"username": cached.get("username", ""),
|
|
"refresh_token": refresh_token,
|
|
}
|
|
restored += 1
|
|
if restored:
|
|
print(f"Restored {restored} user session(s) from cache")
|
|
|
|
|
|
@client.event
|
|
async def on_ready():
|
|
print(f"Bot logged in as {client.user}")
|
|
loadCache()
|
|
_restore_sessions_from_cache()
|
|
backgroundLoop.start()
|
|
|
|
|
|
@client.event
|
|
async def on_message(message):
|
|
if message.author == client.user:
|
|
return
|
|
if not isinstance(message.channel, discord.DMChannel):
|
|
return
|
|
|
|
discord_id = message.author.id
|
|
|
|
if discord_id in login_state:
|
|
await handleLoginStep(message)
|
|
return
|
|
|
|
if discord_id not in user_sessions:
|
|
login_state[discord_id] = {"step": "username"}
|
|
await message.channel.send("Welcome! Send your username to log in.")
|
|
return
|
|
|
|
await routeCommand(message)
|
|
|
|
|
|
@tasks.loop(seconds=60)
|
|
async def backgroundLoop():
|
|
"""Override this in your domain module or extend as needed."""
|
|
pass
|
|
|
|
|
|
@backgroundLoop.before_loop
|
|
async def beforeBackgroundLoop():
|
|
await client.wait_until_ready()
|
|
|
|
|
|
# ==================== Discord Presence Tracking ====================
|
|
|
|
|
|
async def update_presence_tracking():
|
|
"""Track Discord presence for users with presence tracking enabled."""
|
|
print(f"[DEBUG] update_presence_tracking() called", flush=True)
|
|
try:
|
|
import core.adaptive_meds as adaptive_meds
|
|
import core.postgres as postgres
|
|
|
|
print(f"[DEBUG] Running presence tracking. Guilds: {len(client.guilds)}", flush=True)
|
|
for guild in client.guilds:
|
|
print(f"[DEBUG] Guild: {guild.name} ({guild.id}) - Members: {guild.member_count}")
|
|
|
|
# Get all users with presence tracking enabled
|
|
settings = postgres.select(
|
|
"adaptive_med_settings", {"presence_tracking_enabled": True}
|
|
)
|
|
|
|
print(f"[DEBUG] Found {len(settings)} users with presence tracking enabled")
|
|
|
|
for setting in settings:
|
|
user_uuid = setting.get("user_uuid")
|
|
|
|
# Get user's Discord ID from notifications table
|
|
notif_settings = postgres.select("notifications", {"user_uuid": user_uuid})
|
|
if not notif_settings:
|
|
continue
|
|
|
|
discord_user_id = notif_settings[0].get("discord_user_id")
|
|
print(f"[DEBUG] Looking for Discord user: {discord_user_id}", flush=True)
|
|
if not discord_user_id:
|
|
print(f"[DEBUG] No Discord ID for user {user_uuid}", flush=True)
|
|
continue
|
|
|
|
# Get the member from a shared guild (needed for presence data)
|
|
try:
|
|
member = None
|
|
target_id = int(discord_user_id)
|
|
|
|
# Search through all guilds the bot is in
|
|
for guild in client.guilds:
|
|
member = guild.get_member(target_id)
|
|
print(f"[DEBUG] Checked guild {guild.name}, member: {member}", flush=True)
|
|
if member:
|
|
break
|
|
|
|
if not member:
|
|
print(f"[DEBUG] User {discord_user_id} not found in any shared guild", flush=True)
|
|
continue
|
|
|
|
# Check if user is online
|
|
is_online = member.status != discord.Status.offline
|
|
print(f"[DEBUG] User status: {member.status}, is_online: {is_online}", flush=True)
|
|
|
|
# Get current presence from DB
|
|
presence = adaptive_meds.get_user_presence(user_uuid)
|
|
was_online = presence.get("is_currently_online") if presence else False
|
|
print(f"[DEBUG] Previous state: {was_online}, Current: {is_online}", flush=True)
|
|
|
|
# Update presence if changed
|
|
if is_online != was_online:
|
|
adaptive_meds.update_user_presence(
|
|
user_uuid, discord_user_id, is_online
|
|
)
|
|
|
|
# Record the event
|
|
from datetime import datetime
|
|
|
|
event_type = "online" if is_online else "offline"
|
|
adaptive_meds.record_presence_event(
|
|
user_uuid, event_type, datetime.utcnow()
|
|
)
|
|
|
|
print(
|
|
f"Presence update: User {user_uuid} is now {'online' if is_online else 'offline'}"
|
|
)
|
|
|
|
except Exception as e:
|
|
print(f"Error tracking presence for user {user_uuid}: {e}")
|
|
|
|
except Exception as e:
|
|
print(f"Error in presence tracking loop: {e}")
|
|
|
|
|
|
@tasks.loop(seconds=30)
|
|
async def presenceTrackingLoop():
|
|
"""Track Discord presence every 30 seconds."""
|
|
try:
|
|
await update_presence_tracking()
|
|
except Exception as e:
|
|
print(f"[ERROR] presenceTrackingLoop failed: {e}", flush=True)
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
@presenceTrackingLoop.before_loop
|
|
async def beforePresenceTrackingLoop():
|
|
await client.wait_until_ready()
|
|
|
|
|
|
@tasks.loop(seconds=30)
|
|
async def snitchCheckLoop():
|
|
"""Check for pending snitch notifications and send them."""
|
|
try:
|
|
import core.snitch as snitch_core
|
|
import core.postgres as postgres
|
|
from datetime import datetime, timedelta
|
|
|
|
# Get pending snitches from the last 5 minutes that haven't been sent
|
|
cutoff = datetime.utcnow() - timedelta(minutes=5)
|
|
pending_snitches = postgres.select("snitch_log", where={"delivered": False})
|
|
|
|
for snitch in pending_snitches:
|
|
sent_at = snitch.get("sent_at")
|
|
if not sent_at or sent_at < cutoff:
|
|
continue
|
|
|
|
contact_id = snitch.get("contact_id")
|
|
if not contact_id:
|
|
continue
|
|
|
|
# Get contact details
|
|
contacts = postgres.select("snitch_contacts", {"id": contact_id})
|
|
if not contacts:
|
|
continue
|
|
|
|
contact = contacts[0]
|
|
if contact.get("contact_type") != "discord":
|
|
continue
|
|
|
|
discord_user_id = contact.get("contact_value")
|
|
message = snitch.get("message_content", "Snitch notification")
|
|
|
|
try:
|
|
# Send Discord DM
|
|
user = await client.fetch_user(int(discord_user_id))
|
|
if user:
|
|
await user.send(message)
|
|
# Mark as delivered
|
|
postgres.update(
|
|
"snitch_log", {"delivered": True}, {"id": snitch.get("id")}
|
|
)
|
|
print(
|
|
f"Snitch sent to {contact.get('contact_name')} (Discord: {discord_user_id})"
|
|
)
|
|
except Exception as e:
|
|
print(f"Error sending snitch to {discord_user_id}: {e}")
|
|
|
|
except Exception as e:
|
|
print(f"Error in snitch check loop: {e}")
|
|
|
|
|
|
@snitchCheckLoop.before_loop
|
|
async def beforeSnitchCheckLoop():
|
|
await client.wait_until_ready()
|
|
|
|
|
|
@client.event
|
|
async def on_ready():
|
|
print(f"Bot logged in as {client.user}", flush=True)
|
|
print(f"Connected to {len(client.guilds)} guilds", flush=True)
|
|
loadCache()
|
|
backgroundLoop.start()
|
|
presenceTrackingLoop.start()
|
|
print(f"[DEBUG] Presence tracking loop started", flush=True)
|
|
snitchCheckLoop.start()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
client.run(DISCORD_BOT_TOKEN)
|