140 lines
5.0 KiB
Python
140 lines
5.0 KiB
Python
import json
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Dict
|
|
|
|
import requests
|
|
|
|
from Memory import MemoryManager
|
|
|
|
|
|
# ntfy connection settings, read once from the environment at import time.

# Base URL of the ntfy server. When unset (None) or empty,
# NotificationDispatcher.handleAiResponse returns without sending anything.
ntfyBaseUrl = os.getenv("NTFY_BASE_URL")

# Optional fixed topic. When it contains non-whitespace text it is used for
# every reminder and the template below is ignored.
ntfyFixedTopic = os.getenv("NTFY_TOPIC")

# str.format template used to derive a topic when no fixed topic is set;
# the `{userId}` placeholder receives the resolved user name.
ntfyTopicTemplate = os.getenv("NTFY_TOPIC_TEMPLATE", "adhdbot-{userId}")

# Optional access token; when set it is sent as `Authorization: Bearer <token>`.
ntfyAuthToken = os.getenv("NTFY_AUTH_TOKEN")
|
|
|
|
|
|
class NotificationDispatcher:
    """Lightweight bridge to ntfy for reminder payloads.

    All methods are static; configuration comes from the module-level
    ``ntfyBaseUrl``, ``ntfyFixedTopic``, ``ntfyTopicTemplate`` and
    ``ntfyAuthToken`` settings.
    """

    @staticmethod
    def handleAiResponse(userId: str, aiResponseText: str):
        """Send an ntfy reminder for every ``schedule_reminder`` action found.

        JSON candidates are extracted from ``aiResponseText`` by
        ``MemoryManager.collectJsonCandidates``. Candidates that fail to
        decode, are not JSON objects, or carry a different ``action`` are
        skipped. Does nothing when ``ntfyBaseUrl`` is not configured.

        Args:
            userId: User the reminders belong to; forwarded to sendReminder.
            aiResponseText: Raw model output that may embed JSON payloads.
        """
        if not ntfyBaseUrl:
            return
        candidates = MemoryManager.collectJsonCandidates(aiResponseText)
        for candidate in candidates:
            try:
                payload = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            # Valid JSON may still be a list/string/number; only objects
            # can carry an "action" key.
            if not isinstance(payload, dict):
                continue
            if payload.get("action") != "schedule_reminder":
                continue
            reminder = payload.get("reminder")
            if not isinstance(reminder, dict):
                # Missing or malformed reminder: fall back to an empty one so
                # the default message is still delivered.
                reminder = {}
            NotificationDispatcher.sendReminder(userId, reminder)

    @staticmethod
    def sendReminder(userId: str, reminder: Dict[str, Any]):
        """Publish one reminder to the ntfy topic resolved for the user.

        The body is the reminder's ``message`` (or ``details``), followed by a
        ``Due: ...`` line when the trigger carries a value. Failures are
        reported with ``print`` and never raised to the caller. If the first
        POST raises ``UnicodeEncodeError``, it is retried once with the title
        header rebuilt by stripUnicodeHeaders.

        Args:
            userId: Preferred user name; when empty, ``reminder["user"]``,
                then ``reminder["metadata"]["user"]``, then ``"user"`` is used.
            reminder: Reminder payload; its ``trigger`` may be normalised in
                place by normalizeTrigger.
        """
        metadata = reminder.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}
        actualUser = userId or reminder.get("user") or metadata.get("user") or "user"
        topic = NotificationDispatcher.buildTopic(actualUser)
        url = NotificationDispatcher.buildTopicUrl(topic)
        if not url:
            # No topic or no base URL configured: nothing to publish to.
            return

        message_text = str(
            reminder.get("message") or reminder.get("details") or "Reminder from ADHDbot"
        ).strip()
        if not message_text:
            message_text = "Reminder from ADHDbot"

        due_text = NotificationDispatcher.normalizeTrigger(reminder)
        body_lines = [message_text]
        if due_text:
            body_lines.append(f"Due: {due_text}")
        body = "\n".join(body_lines)

        headers = NotificationDispatcher.buildHeaders(message_text)
        payload = body.encode("utf-8")

        try:
            NotificationDispatcher.postToNtfy(url, payload, headers)
            print(f"[notify] Sent reminder to ntfy topic '{topic}'")
        except UnicodeEncodeError:
            # A header value could not be encoded for the wire; retry once
            # with the title header rebuilt as ASCII.
            safe_headers = NotificationDispatcher.stripUnicodeHeaders(headers)
            try:
                NotificationDispatcher.postToNtfy(url, payload, safe_headers)
                print(f"[notify] Sent reminder to ntfy topic '{topic}' (header sanitized)")
            except (UnicodeEncodeError, requests.RequestException) as error:
                print(f"[notify] Failed to send reminder after sanitizing headers: {error}")
        except requests.RequestException as error:
            print(f"[notify] Failed to send reminder: {error}")

    @staticmethod
    def buildTopicUrl(topic: str) -> str:
        """Join the configured base URL and ``topic`` into a publish URL.

        Returns:
            ``"<base>/<topic>"`` with exactly one slash between the parts, or
            ``""`` when the topic is empty (or only slashes) or ``ntfyBaseUrl``
            is not configured.
        """
        if not topic:
            return ""
        # ntfyBaseUrl is None when NTFY_BASE_URL is unset; treat it as empty
        # instead of failing on None.rstrip().
        base = (ntfyBaseUrl or "").rstrip("/")
        topicSlug = topic.lstrip("/")
        if not base or not topicSlug:
            return ""
        return f"{base}/{topicSlug}"

    @staticmethod
    def buildTopic(actualUser: str) -> str:
        """Resolve the ntfy topic for ``actualUser``.

        A non-blank ``ntfyFixedTopic`` wins. Otherwise ``ntfyTopicTemplate``
        is formatted with ``userId=actualUser``; if the template cannot be
        formatted it is returned verbatim.

        Returns:
            The topic name, or ``""`` when neither setting provides one.
        """
        fixed = (ntfyFixedTopic or "").strip()
        if fixed:
            return fixed
        template = ntfyTopicTemplate or ""
        if not template:
            return ""
        try:
            return template.format(userId=actualUser)
        except (KeyError, IndexError, ValueError, AttributeError, TypeError):
            # Unknown placeholder, positional field, malformed braces, or an
            # attribute/index lookup on userId: use the raw template.
            return template

    @staticmethod
    def buildHeaders(message_text: str) -> Dict[str, str]:
        """Build the HTTP headers for a publish request.

        Returns:
            A dict with an ASCII ``Title`` derived from ``message_text`` and,
            when ``ntfyAuthToken`` is set, a bearer ``Authorization`` header.
        """
        headers = {"Title": NotificationDispatcher.buildTitleHeader(message_text)}
        if ntfyAuthToken:
            headers["Authorization"] = f"Bearer {ntfyAuthToken}"
        return headers

    @staticmethod
    def stripUnicodeHeaders(headers: Dict[str, str]) -> Dict[str, str]:
        """Return a copy of ``headers`` whose title is guaranteed ASCII.

        Any header named ``title`` (case-insensitive) is rebuilt through
        buildTitleHeader; other headers are copied unchanged. A default
        ``Title`` is added when none is present.
        """
        safe_headers: Dict[str, str] = {}
        for key, value in headers.items():
            if key.lower() == "title":
                safe_headers[key] = NotificationDispatcher.buildTitleHeader(value or "")
                continue
            safe_headers[key] = value
        safe_headers.setdefault("Title", "Reminder from ADHDbot")
        return safe_headers

    @staticmethod
    def postToNtfy(url: str, payload: bytes, headers: Dict[str, str]):
        """POST ``payload`` to ``url`` with a 10-second timeout.

        Raises:
            requests.RequestException: On connection errors, timeouts, or a
                non-success HTTP status (via ``raise_for_status``).
        """
        response = requests.post(url, data=payload, headers=headers, timeout=10)
        response.raise_for_status()

    @staticmethod
    def buildTitleHeader(text: str) -> str:
        """Derive a single-line ASCII title of at most 120 characters.

        Whitespace runs (including line breaks) are collapsed to single
        spaces, because a CR/LF inside a header value makes the request
        invalid. Non-ASCII characters are dropped.

        Returns:
            The sanitised title, or ``"Reminder from ADHDbot"`` when nothing
            printable remains.
        """
        # Collapse first so line breaks and non-ASCII spaces become plain
        # spaces rather than being deleted by the ASCII filter below.
        collapsed = " ".join((text or "").split())
        snippet = collapsed[:120]
        ascii_only = snippet.encode("ascii", "ignore").decode("ascii")
        # Dropping characters can leave doubled or edge spaces; tidy again.
        title = " ".join(ascii_only.split())
        return title or "Reminder from ADHDbot"

    @staticmethod
    def normalizeTrigger(reminder: Dict[str, Any]):
        """Return the reminder's trigger time as an ISO-8601 string.

        Reads ``reminder["trigger"]["value"]``. A trailing ``Z`` is accepted
        as UTC, and a timestamp without an offset is assumed to be UTC. On
        success the normalised string is also written back to
        ``reminder["trigger"]["value"]``.

        Returns:
            The normalised timestamp; the original value as a string when it
            is not a valid ISO-8601 timestamp; or ``""`` when there is no
            trigger value or the trigger is not a dict.
        """
        trigger = reminder.get("trigger") or {}
        if not isinstance(trigger, dict):
            # Malformed trigger (e.g. a bare string): no "value" to read.
            return ""
        value = trigger.get("value")
        if not value:
            return ""
        try:
            text = str(value).strip()
            if text.endswith("Z"):
                # Rewrite the "Z" suffix as an explicit offset so that
                # fromisoformat accepts it on every supported Python version.
                text = text[:-1] + "+00:00"
            parsed = datetime.fromisoformat(text)
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            normalized = parsed.isoformat()
            trigger["value"] = normalized
            reminder["trigger"] = trigger
            return normalized
        except ValueError:
            return str(value)
|