Synculous-2/DOCUMENTATION.md
2026-02-12 22:11:52 -06:00
====== LLM Bot Framework - Complete Technical Documentation ======

===== Introduction =====

The LLM Bot Framework is a template for building Discord bots powered by Large Language Models (LLMs) with natural language command parsing. It provides a complete architecture for creating domain-specific bots that can understand and execute user commands expressed in natural language.

The framework demonstrates a modular, layered architecture that separates concerns cleanly:

  • AI Layer - Natural language parsing via LLM
  • Bot Layer - Discord client with session management
  • API Layer - RESTful endpoints with JWT authentication
  • Core Layer - Database, authentication, notifications
  • Scheduler Layer - Background task processing

===== Architecture Overview =====

==== The Big Picture ====

At its core, the framework implements a request-response flow that transforms natural language into structured actions:

flowchart TD
    A["User Message (Discord DM)"] --> B["bot/bot.py"]
    B --> C["ai/parser.py (LLM)"]
    C --> D["Structured JSON"]
    D --> E["command_registry"]
    E --> F["Domain Handler"]
    F --> G["API Request (HTTP)"]
    G --> H["api/main.py (Flask)"]
    H --> I["core/ modules"]
    I --> J[("PostgreSQL")]
    J -.-> I
    I -.-> H
    H -.-> G
    G -.-> F
    F -.-> B
    B -.-> A

==== Global Architecture ====

flowchart TB
    subgraph external ["External Services"]
        DISCORD["Discord API"]
        OPENROUTER["OpenRouter API"]
    end
subgraph docker ["Docker Compose"]
    subgraph bot_svc ["bot service"]
        BOT["bot/bot.py"]
        REG["command_registry.py"]
        CMDS["commands/example.py"]
    end

    subgraph app_svc ["app service (port 8080)"]
        FLASK["api/main.py (Flask)"]
        ROUTES["api/routes/example.py"]
    end

    subgraph core_svc ["core/"]
        AUTH["auth.py"]
        USERS["users.py"]
        NOTIF["notifications.py"]
        PG["postgres.py"]
    end

    subgraph ai_svc ["ai/"]
        PARSER["parser.py"]
        CONFIG["ai_config.json"]
    end

    subgraph sched_svc ["scheduler service"]
        DAEMON["daemon.py"]
    end

    DB[("PostgreSQL")]
end

DISCORD <--> BOT
BOT --> PARSER
PARSER --> OPENROUTER
PARSER --> CONFIG
BOT --> REG
REG --> CMDS
CMDS --> PARSER
BOT -- "HTTP" --> FLASK
FLASK --> ROUTES
FLASK --> AUTH
FLASK --> USERS
ROUTES --> AUTH
ROUTES --> PG
AUTH --> USERS
AUTH --> PG
USERS --> PG
NOTIF --> PG
PG --> DB
DAEMON --> PG

==== Docker Service Orchestration ====

flowchart LR
    DB[("db\nPostgreSQL:16\nport 5432")] -- "healthcheck:\npg_isready" --> DB
    DB -- "service_healthy" --> APP["app\nFlask API\nport 8080:5000"]
    DB -- "service_healthy" --> SCHED["scheduler\ndaemon.py"]
    APP -- "service_started" --> BOT["bot\nbot.bot"]
    ENV[".env\n(DB_PASS)"] -.-> DB
    CENV["config/.env\n(all vars)"] -.-> APP
    CENV -.-> BOT
    CENV -.-> SCHED

===== Core Layer =====

==== core/postgres.py - Generic PostgreSQL CRUD ====

This module provides a database abstraction layer that eliminates the need to write raw SQL for common operations. It uses parameterized queries throughout to prevent SQL injection.

=== Configuration ===

Connection settings are pulled from environment variables:

^ Variable ^ Default ^ Description ^
| DB_HOST | localhost | PostgreSQL server hostname |
| DB_PORT | 5432 | PostgreSQL server port |
| DB_NAME | app | Database name |
| DB_USER | app | Database user |
| DB_PASS | (empty) | Database password |
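In the Docker Compose setup these are typically supplied through the env files shown in the orchestration diagram (''config/.env''). A minimal illustrative fragment — all values are placeholders:

```
DB_HOST=db
DB_PORT=5432
DB_NAME=app
DB_USER=app
DB_PASS=change-me
```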

=== Internal Functions ===

_get_config() - Returns a dictionary of connection parameters

def _get_config():
    return {
        "host": os.environ.get("DB_HOST", "localhost"),
        "port": int(os.environ.get("DB_PORT", 5432)),
        "dbname": os.environ.get("DB_NAME", "app"),
        "user": os.environ.get("DB_USER", "app"),
        "password": os.environ.get("DB_PASS", ""),
    }

This function centralizes database configuration, making it easy to test with different databases or override settings.

_safe_id(name) - Validates and escapes SQL identifiers

def _safe_id(name):
    if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", name):
        raise ValueError(f"Invalid SQL identifier: {name}")
    return f'"{name}"'

Critical for security. This function:

  • Validates that the identifier contains only alphanumeric characters and underscores
  • Requires the identifier to start with a letter or underscore
  • Wraps the identifier in double quotes for PostgreSQL

This prevents SQL injection through table or column names. Without this, a malicious input like ''users; DROP TABLE users;--'' could execute destructive commands.

_build_where(where, prefix="") - Constructs WHERE clauses from dictionaries

def _build_where(where, prefix=""):
    clauses = []
    params = {}
    for i, (col, val) in enumerate(where.items()):
        param_name = f"{prefix}{col}_{i}"
        safe_col = _safe_id(col)
        # ... handles various conditions

This function transforms Python dictionaries into SQL WHERE clauses:

Simple equality: {"username": "john"}

→ WHERE "username" = %(username_0)s

Comparison operators: {"age": (">", 18)}

→ WHERE "age" > %(age_0)s

Supported operators: =, !=, <, >, <=, >=, LIKE, ILIKE, IN, IS, IS NOT

IN clause: {"status": ("IN", ["active", "pending"])}

→ WHERE "status" IN (%(status_0_0)s, %(status_0_1)s)

NULL checks: {"deleted_at": None}

→ WHERE "deleted_at" IS NULL

{"deleted_at": ("IS NOT", None)}

→ WHERE "deleted_at" IS NOT NULL

The ''prefix'' parameter prevents parameter name collisions when the same column appears in multiple parts of a query (e.g., in both SET and WHERE clauses).
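The mapping rules above can be sketched in isolation. The following is a simplified re-implementation for illustration only (the real ''_build_where'' in core/postgres.py also validates identifiers via ''_safe_id'' and handles ''IN''/''IS''):

```python
# Simplified sketch of the where-dict -> SQL mapping described above.
# Covers equality, comparison-operator tuples, and NULL; illustration only.
def build_where_sketch(where, prefix=""):
    clauses, params = [], {}
    for i, (col, val) in enumerate(where.items()):
        name = f"{prefix}{col}_{i}"
        if val is None:                  # {"deleted_at": None} -> IS NULL
            clauses.append(f'"{col}" IS NULL')
        elif isinstance(val, tuple):     # {"age": (">", 18)} -> "age" > %(age_0)s
            op, v = val
            clauses.append(f'"{col}" {op} %({name})s')
            params[name] = v
        else:                            # {"username": "john"} -> "username" = %(username_0)s
            clauses.append(f'"{col}" = %({name})s')
            params[name] = val
    return " AND ".join(clauses), params

clause, params = build_where_sketch({"age": (">", 18), "status": "active"})
# clause: '"age" > %(age_0)s AND "status" = %(status_1)s'
```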

=== Connection Management ===

flowchart TD
    CRUD["CRUD function\n(insert/select/update/delete)"] --> GC["get_cursor(dict_cursor)"]
    GC --> GCONN["get_connection()"]
    GCONN --> CONNECT["psycopg2.connect(**_get_config())"]
    CONNECT --> YIELD_CONN["yield conn"]
    YIELD_CONN --> CURSOR["conn.cursor(RealDictCursor)"]
    CURSOR --> YIELD_CUR["yield cursor"]
    YIELD_CUR --> EXEC["cur.execute(query, params)"]
    EXEC --> SUCCESS{"Success?"}
    SUCCESS -- "Yes" --> COMMIT["conn.commit()"]
    SUCCESS -- "Exception" --> ROLLBACK["conn.rollback()"]
    COMMIT --> CLOSE["conn.close()"]
    ROLLBACK --> CLOSE

get_connection() - Context manager for database connections

@contextmanager
def get_connection():
    conn = psycopg2.connect(**_get_config())
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

This context manager ensures:

  • Automatic connection opening when entering the context
  • Automatic commit on successful completion
  • Automatic rollback on any exception
  • Guaranteed cleanup - connection always closes
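The commit/rollback guarantee can be exercised with a stand-in connection object. ''FakeConn'' and ''managed'' below are illustrative only, not part of the framework; the real manager wraps ''psycopg2.connect'':

```python
from contextlib import contextmanager

class FakeConn:
    """Stand-in for a psycopg2 connection, recording what happens to it."""
    def __init__(self):
        self.events = []
    def commit(self):   self.events.append("commit")
    def rollback(self): self.events.append("rollback")
    def close(self):    self.events.append("close")

@contextmanager
def managed(conn):
    # Same shape as get_connection(), minus the actual psycopg2.connect()
    try:
        yield conn
        conn.commit()          # success path
    except Exception:
        conn.rollback()        # any exception path
        raise
    finally:
        conn.close()           # always runs

ok = FakeConn()
with managed(ok):
    pass
# ok.events == ["commit", "close"]

bad = FakeConn()
try:
    with managed(bad):
        raise RuntimeError("boom")
except RuntimeError:
    pass
# bad.events == ["rollback", "close"]
```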

get_cursor(dict_cursor=True) - Context manager for cursors

@contextmanager
def get_cursor(dict_cursor=True):
    with get_connection() as conn:
        factory = psycopg2.extras.RealDictCursor if dict_cursor else None
        cur = conn.cursor(cursor_factory=factory)
        try:
            yield cur
        finally:
            cur.close()

By default, returns a ''RealDictCursor'' which returns rows as dictionaries instead of tuples, making results easier to work with:

With dict cursor:

{"id": "abc123", "username": "john"}

Without dict cursor:

("abc123", "john")

=== CRUD Operations ===

insert(table, data) - Insert a single row

def insert(table, data):
    columns = list(data.keys())
    placeholders = [f"%({col})s" for col in columns]
    safe_cols = [_safe_id(c) for c in columns]
    query = f"""
        INSERT INTO {_safe_id(table)}
        ({", ".join(safe_cols)})
        VALUES ({", ".join(placeholders)})
        RETURNING *
    """
    with get_cursor() as cur:
        cur.execute(query, data)
        return dict(cur.fetchone()) if cur.rowcount else None

Example usage:

user = postgres.insert("users", {
    "id": str(uuid.uuid4()),
    "username": "john",
    "password_hashed": hashed_pw,
})

This returns the inserted row with all fields.

The ''RETURNING *'' clause returns the complete inserted row, including any auto-generated fields like ''created_at''.

select(table, where=None, order_by=None, limit=None, offset=None) - Query rows

def select(table, where=None, order_by=None, limit=None, offset=None):
    query = f"SELECT * FROM {_safe_id(table)}"
    params = {}
    if where:
        clauses, params = _build_where(where)
        query += f" WHERE {clauses}"
    if order_by:
        if isinstance(order_by, list):
            order_by = ", ".join(order_by)
        query += f" ORDER BY {order_by}"
    if limit is not None:
        query += f" LIMIT {int(limit)}"
    if offset is not None:
        query += f" OFFSET {int(offset)}"

    with get_cursor() as cur:
        cur.execute(query, params)
        return [dict(row) for row in cur.fetchall()]

Examples:

Get all users

all_users = postgres.select("users")

Get users with filtering

active_users = postgres.select(
    "users",
    where={"status": "active"},
    order_by="created_at DESC",
    limit=10,
)

Complex filtering

adults = postgres.select(
    "users",
    where={"age": (">=", 18), "status": "active"},
)

select_one(table, where) - Query a single row

def select_one(table, where):
    results = select(table, where=where, limit=1)
    return results[0] if results else None

Convenience method that returns ''None'' if no row is found, instead of an empty list:

user = postgres.select_one("users", {"username": "john"})
if user:
    print(user["id"])

update(table, data, where) - Update rows

def update(table, data, where):
    set_columns = list(data.keys())
    set_clause = ", ".join(f"{_safe_id(col)} = %(set_{col})s" for col in set_columns)
    params = {f"set_{col}": val for col, val in data.items()}

    where_clause, where_params = _build_where(where, prefix="where_")
    params.update(where_params)

    query = f"""
        UPDATE {_safe_id(table)}
        SET {set_clause}
        WHERE {where_clause}
        RETURNING *
    """
    with get_cursor() as cur:
        cur.execute(query, params)
        return [dict(row) for row in cur.fetchall()]

The ''prefix'' parameter prevents parameter name collisions. Example:

updated = postgres.update(
    "users",
    {"status": "inactive"},
    {"id": user_uuid},
)

Returns all updated rows (useful when updating multiple rows).

delete(table, where) - Delete rows

def delete(table, where):
    where_clause, params = _build_where(where)
    query = f"""
        DELETE FROM {_safe_id(table)}
        WHERE {where_clause}
        RETURNING *
    """
    with get_cursor() as cur:
        cur.execute(query, params)
        return [dict(row) for row in cur.fetchall()]

Returns deleted rows for confirmation/auditing:

deleted = postgres.delete("users", {"id": user_uuid})
print(f"Deleted {len(deleted)} user(s)")

=== Utility Functions ===

count(table, where=None) - Count rows

def count(table, where=None):
    query = f"SELECT COUNT(*) as count FROM {_safe_id(table)}"
    params = {}
    if where:
        clauses, params = _build_where(where)
        query += f" WHERE {clauses}"
    with get_cursor() as cur:
        cur.execute(query, params)
        return cur.fetchone()["count"]

exists(table, where) - Check if rows exist

def exists(table, where):
    return count(table, where) > 0

upsert(table, data, conflict_columns) - Insert or update

def upsert(table, data, conflict_columns):
    columns = list(data.keys())
    placeholders = [f"%({col})s" for col in columns]
    safe_cols = [_safe_id(c) for c in columns]
    conflict_cols = [_safe_id(c) for c in conflict_columns]

    update_cols = [c for c in columns if c not in conflict_columns]
    update_clause = ", ".join(
        f"{_safe_id(c)} = EXCLUDED.{_safe_id(c)}" for c in update_cols
    )

    query = f"""
        INSERT INTO {_safe_id(table)}
        ({", ".join(safe_cols)})
        VALUES ({", ".join(placeholders)})
        ON CONFLICT ({", ".join(conflict_cols)})
        DO UPDATE SET {update_clause}
        RETURNING *
    """
    with get_cursor() as cur:
        cur.execute(query, data)
        return dict(cur.fetchone()) if cur.rowcount else None

Example: Create or update user settings

settings = postgres.upsert(
    "notifications",
    {
        "user_uuid": user_uuid,
        "discord_enabled": True,
        "discord_webhook": "https://...",
    },
    conflict_columns=["user_uuid"],
)

insert_many(table, rows) - Bulk insert

def insert_many(table, rows):
    if not rows:
        return 0
    columns = list(rows[0].keys())
    safe_cols = [_safe_id(c) for c in columns]
    query = f"""
        INSERT INTO {_safe_id(table)}
        ({", ".join(safe_cols)})
        VALUES %s
    """
    template = f"({', '.join(f'%({col})s' for col in columns)})"
    with get_cursor() as cur:
        psycopg2.extras.execute_values(
            cur, query, rows, template=template, page_size=100
        )
        return cur.rowcount

Uses ''execute_values'' for efficient bulk inserts (up to 100 rows per batch):

rows = [
    {"id": str(uuid.uuid4()), "name": "Task 1"},
    {"id": str(uuid.uuid4()), "name": "Task 2"},
    {"id": str(uuid.uuid4()), "name": "Task 3"},
]
count = postgres.insert_many("tasks", rows)

execute(query, params=None) - Execute raw SQL

def execute(query, params=None):
    with get_cursor() as cur:
        cur.execute(query, params or {})
        if cur.description:
            return [dict(row) for row in cur.fetchall()]
        return cur.rowcount

For complex queries that don't fit the CRUD pattern (note the psycopg2 named-parameter style, ''%(name)s''):

results = postgres.execute("""
    SELECT u.username, COUNT(t.id) as task_count
    FROM users u
    LEFT JOIN tasks t ON t.user_uuid = u.id
    GROUP BY u.username
    HAVING COUNT(t.id) > %(min_count)s
""", {"min_count": 5})

table_exists(table) - Check if table exists

get_columns(table) - Get table schema information

==== core/auth.py - JWT Authentication ====

This module handles user authentication using JWT (JSON Web Tokens) with bcrypt password hashing.

=== Token Management ===

flowchart LR
    subgraph getLoginToken
        A["username, password"] --> B["users.getUserUUID()"]
        B --> C["getUserpasswordHash()"]
        C --> D["bcrypt.checkpw()"]
        D -- "match" --> E["jwt.encode(payload)"]
        D -- "no match" --> F["return False"]
        E --> G["return JWT token"]
    end
subgraph verifyLoginToken
    H["token, username/userUUID"] --> I{"username\nprovided?"}
    I -- "Yes" --> J["users.getUserUUID()"]
    J --> K["jwt.decode()"]
    I -- "No" --> K
    K -- "valid" --> L{"sub == userUUID?"}
    L -- "Yes" --> M["return True"]
    L -- "No" --> N["return False"]
    K -- "expired/invalid" --> N
end

getLoginToken(username, password) - Generate JWT on successful login

def getLoginToken(username, password):
    userUUID = users.getUserUUID(username)
    if userUUID:
        formatted_pass = password.encode("utf-8")
        users_hashed_pw = getUserpasswordHash(userUUID)
        if bcrypt.checkpw(formatted_pass, users_hashed_pw):
            payload = {
                "sub": userUUID,
                "name": users.getUserFirstName(userUUID),
                "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1),
            }
            return jwt.encode(payload, os.getenv("JWT_SECRET"), algorithm="HS256")
    return False

The JWT payload contains:

  • sub (subject) - User's UUID, used for identification
  • name - User's display name
  • exp (expiration) - Token expires in 1 hour

verifyLoginToken(login_token, username=False, userUUID=False) - Validate JWT

def verifyLoginToken(login_token, username=False, userUUID=False):
    if username:
        userUUID = users.getUserUUID(username)
    if userUUID:
        try:
            decoded_token = jwt.decode(
                login_token, os.getenv("JWT_SECRET"), algorithms=["HS256"]
            )
            if decoded_token.get("sub") == str(userUUID):
                return True
            return False
        except (ExpiredSignatureError, InvalidTokenError):
            return False
    return False

Validates that:

  1. The token is properly signed
  2. The token hasn't expired
  3. The token belongs to the claimed user
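The three checks can be illustrated with a stdlib-only HS256 token. This sketch shows the JWT mechanics (signature, expiry, subject); it is not the jwt library used by the framework, and ''make_token''/''verify_token'' are hypothetical names:

```python
import base64, hashlib, hmac, json, time

def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def make_token(sub, secret, ttl=3600):
    header  = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url(json.dumps({"sub": sub, "exp": int(time.time()) + ttl}).encode())
    signing_input = f"{header}.{payload}".encode()
    sig = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    return f"{header}.{payload}.{sig}"

def verify_token(token, sub, secret):
    header, payload, sig = token.split(".")
    signing_input = f"{header}.{payload}".encode()
    expected = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    if not hmac.compare_digest(sig, expected):   # 1. properly signed?
        return False
    claims = json.loads(base64.urlsafe_b64decode(payload + "=" * (-len(payload) % 4)))
    if claims["exp"] < time.time():              # 2. not expired?
        return False
    return claims["sub"] == sub                  # 3. belongs to claimed user?

token = make_token("user-uuid", b"secret")
# verify_token(token, "user-uuid", b"secret") -> True
# verify_token(token, "other-uuid", b"secret") -> False
```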

=== Password Management ===

getUserpasswordHash(userUUID) - Retrieve stored password hash

def getUserpasswordHash(userUUID):
    user = postgres.select_one("users", {"id": userUUID})
    if user:
        pw_hash = user.get("password_hashed")
        if isinstance(pw_hash, memoryview):
            return bytes(pw_hash)
        return pw_hash
    return None

Handles the case where PostgreSQL returns ''BYTEA'' as a ''memoryview'' object.

flowchart LR
    A["unregisterUser(userUUID, password)"] --> B["getUserpasswordHash()"]
    B --> C["postgres.select_one('users')"]
    C --> D{"hash found?"}
    D -- "No" --> E["return False"]
    D -- "Yes" --> F["bcrypt.checkpw()"]
    F -- "match" --> G["users.deleteUser()"]
    F -- "no match" --> E
    G --> H["postgres.delete('users')"]

unregisterUser(userUUID, password) - Delete user account with password confirmation

def unregisterUser(userUUID, password):
    pw_hash = getUserpasswordHash(userUUID)
    if not pw_hash:
        return False
    if bcrypt.checkpw(password.encode("utf-8"), pw_hash):
        return users.deleteUser(userUUID)
    return False

Requires password re-entry to prevent unauthorized account deletion.

==== core/users.py - User Management ====

This module provides CRUD operations for users with validation and security considerations.

=== Query Functions ===

getUserUUID(username) - Get UUID from username

def getUserUUID(username):
    userRecord = postgres.select_one("users", {"username": username})
    if userRecord:
        return userRecord["id"]
    return False

getUserFirstName(userUUID) - Get user's display name

def getUserFirstName(userUUID):
    userRecord = postgres.select_one("users", {"id": userUUID})
    if userRecord:
        return userRecord.get("username")
    return None

isUsernameAvailable(username) - Check username uniqueness

def isUsernameAvailable(username):
    return not postgres.exists("users", {"username": username})

doesUserUUIDExist(userUUID) - Verify UUID exists

def doesUserUUIDExist(userUUID):
    return postgres.exists("users", {"id": userUUID})

=== Mutation Functions ===

flowchart TD
    REG["registerUser(username, password)"] --> AVAIL["isUsernameAvailable()"]
    AVAIL --> EXISTS["postgres.exists('users')"]
    EXISTS -- "taken" --> RET_F["return False"]
    EXISTS -- "available" --> HASH["bcrypt.hashpw(password, gensalt())"]
    HASH --> CREATE["createUser(user_data)"]
    CREATE --> VALIDATE["validateUser()"]
    VALIDATE -- "invalid" --> RAISE["raise ValueError"]
    VALIDATE -- "valid" --> INSERT["postgres.insert('users')"]
    INSERT --> RET_T["return True"]
UPD["updateUser(userUUID, data)"] --> LOOKUP["postgres.select_one('users')"]
LOOKUP -- "not found" --> RET_F2["return False"]
LOOKUP -- "found" --> FILTER["filter blocked fields\n(id, password_hashed, created_at)"]
FILTER --> UPD_DB["postgres.update('users')"]

DEL["deleteUser(userUUID)"] --> LOOKUP2["postgres.select_one('users')"]
LOOKUP2 -- "not found" --> RET_F3["return False"]
LOOKUP2 -- "found" --> DEL_DB["postgres.delete('users')"]

registerUser(username, password, data=None) - Create new user

def registerUser(username, password, data=None):
    if isUsernameAvailable(username):
        hashed_pass = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
        user_data = {
            "id": str(uuid.uuid4()),
            "username": username,
            "password_hashed": hashed_pass,
        }
        if data:
            user_data.update(data)
        createUser(user_data)
        return True
    return False

Uses ''bcrypt.gensalt()'' to generate a unique salt for each password.

updateUser(userUUID, data_dict) - Update user fields

def updateUser(userUUID, data_dict):
    user = postgres.select_one("users", {"id": userUUID})
    if not user:
        return False
    blocked = {"id", "password_hashed", "created_at"}
    allowed = set(user.keys()) - blocked
    updates = {k: v for k, v in data_dict.items() if k in allowed}
    if not updates:
        return False
    postgres.update("users", updates, {"id": userUUID})
    return True

Blocked fields prevent modification of:

  • ''id'' - Primary key should never change
  • ''password_hashed'' - Use ''changePassword()'' instead
  • ''created_at'' - Audit field should be immutable
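The allow-list filtering reduces to a small dictionary comprehension and can be exercised standalone (''filter_updates'' is an illustrative name, not a framework function):

```python
# Sketch of the update allow-list described above: anything in BLOCKED
# is silently dropped before the UPDATE is built.
BLOCKED = {"id", "password_hashed", "created_at"}

def filter_updates(existing_row, data):
    allowed = set(existing_row) - BLOCKED
    return {k: v for k, v in data.items() if k in allowed}

row = {"id": "u1", "username": "john", "password_hashed": b"...", "created_at": "2026-01-01"}
updates = filter_updates(row, {"username": "johnny", "id": "evil", "password_hashed": b"x"})
# updates == {"username": "johnny"}
```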

changePassword(userUUID, new_password) - Securely update password

def changePassword(userUUID, new_password):
    user = postgres.select_one("users", {"id": userUUID})
    if not user:
        return False
    hashed = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt())
    postgres.update("users", {"password_hashed": hashed}, {"id": userUUID})
    return True

deleteUser(userUUID) - Remove user record

def deleteUser(userUUID):
    user = postgres.select_one("users", {"id": userUUID})
    if not user:
        return False
    postgres.delete("users", {"id": userUUID})
    return True

=== Internal Functions ===

createUser(data_dict) - Internal user creation with validation

def createUser(data_dict):
    user_schema = {
        "id": None,
        "username": None,
        "password_hashed": None,
        "created_at": None,
    }
    for key in user_schema:
        if key in data_dict:
            user_schema[key] = data_dict[key]

    is_valid, errors = validateUser(user_schema)
    if not is_valid:
        raise ValueError(f"Invalid user data: {', '.join(errors)}")

    postgres.insert("users", user_schema)

validateUser(user) - Ensure required fields present

def validateUser(user):
    required = ["id", "username", "password_hashed"]
    missing = [f for f in required if f not in user or user[f] is None]
    if missing:
        return False, missing
    return True, []

==== core/notifications.py - Multi-Channel Notifications ====

This module provides notification routing to multiple channels (Discord webhooks, ntfy).

flowchart TD
    SEND["_sendToEnabledChannels(settings, message)"]
    SEND --> CHK_D{"discord_enabled\nand webhook set?"}
    CHK_D -- "Yes" --> DISC["discord.send(webhook_url, message)"]
    CHK_D -- "No" --> CHK_N
    DISC --> CHK_N{"ntfy_enabled\nand topic set?"}
    CHK_N -- "Yes" --> NTFY["ntfy.send(topic, message)"]
    CHK_N -- "No" --> RESULT
    NTFY --> RESULT["return True if any succeeded"]
DISC -- "POST webhook_url\n{content: message}" --> DISCORD_API["Discord Webhook\n(expects 204)"]
NTFY -- "POST ntfy.sh/topic\nmessage body" --> NTFY_API["ntfy.sh\n(expects 200)"]

GET["getNotificationSettings(userUUID)"] --> DB_SEL["postgres.select_one('notifications')"]
SET["setNotificationSettings(userUUID, data)"] --> DB_CHK{"existing\nrecord?"}
DB_CHK -- "Yes" --> DB_UPD["postgres.update('notifications')"]
DB_CHK -- "No" --> DB_INS["postgres.insert('notifications')"]

=== Notification Channels ===

discord.send(webhook_url, message) - Send via Discord webhook

class discord:
    @staticmethod
    def send(webhook_url, message):
        try:
            response = requests.post(webhook_url, json={"content": message})
            return response.status_code == 204
        except:
            return False

Discord webhooks return ''204 No Content'' on success.

ntfy.send(topic, message) - Send via ntfy.sh

class ntfy:
    @staticmethod
    def send(topic, message):
        try:
            response = requests.post(
                f"https://ntfy.sh/{topic}", data=message.encode("utf-8")
            )
            return response.status_code == 200
        except:
            return False

ntfy.sh is a free push notification service. Users subscribe to topics.

=== Settings Management ===

getNotificationSettings(userUUID) - Retrieve user notification config

def getNotificationSettings(userUUID):
    settings = postgres.select_one("notifications", {"user_uuid": userUUID})
    if not settings:
        return False
    return settings

setNotificationSettings(userUUID, data_dict) - Update notification config

def setNotificationSettings(userUUID, data_dict):
    existing = postgres.select_one("notifications", {"user_uuid": userUUID})
    allowed = [
        "discord_webhook",
        "discord_enabled",
        "ntfy_topic",
        "ntfy_enabled",
    ]
    updates = {k: v for k, v in data_dict.items() if k in allowed}
    if not updates:
        return False
    if existing:
        postgres.update("notifications", updates, {"user_uuid": userUUID})
    else:
        updates["id"] = str(uuid.uuid4())
        updates["user_uuid"] = userUUID
        postgres.insert("notifications", updates)
    return True

Implements an upsert pattern - updates if exists, inserts if not.

_sendToEnabledChannels(notif_settings, message) - Route to all enabled channels

def _sendToEnabledChannels(notif_settings, message):
    sent = False

    if notif_settings.get("discord_enabled") and notif_settings.get("discord_webhook"):
        if discord.send(notif_settings["discord_webhook"], message):
            sent = True

    if notif_settings.get("ntfy_enabled") and notif_settings.get("ntfy_topic"):
        if ntfy.send(notif_settings["ntfy_topic"], message):
            sent = True

    return sent

Returns ''True'' if any channel succeeded, allowing partial failures.
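The partial-failure semantics can be checked without touching the network by injecting the channel functions. This re-implementation with injected senders is illustrative only; the real function calls the ''discord'' and ''ntfy'' classes directly:

```python
# Same routing logic as _sendToEnabledChannels, with channels injected
# as plain callables so failures can be simulated.
def send_to_enabled_channels(settings, message, discord_send, ntfy_send):
    sent = False
    if settings.get("discord_enabled") and settings.get("discord_webhook"):
        if discord_send(settings["discord_webhook"], message):
            sent = True
    if settings.get("ntfy_enabled") and settings.get("ntfy_topic"):
        if ntfy_send(settings["ntfy_topic"], message):
            sent = True
    return sent

settings = {"discord_enabled": True, "discord_webhook": "https://...",
            "ntfy_enabled": True, "ntfy_topic": "alerts"}

# Discord fails, ntfy succeeds -> overall result is still True
result = send_to_enabled_channels(settings, "hi",
                                  discord_send=lambda *_: False,
                                  ntfy_send=lambda *_: True)
# result == True
```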

===== AI Layer =====

==== ai/parser.py - LLM-Powered JSON Parser ====

This is the heart of the natural language interface. It transforms user messages into structured JSON using an LLM, with automatic retry and validation.

=== Configuration Loading ===

CONFIG_PATH = os.environ.get(
    "AI_CONFIG_PATH", os.path.join(os.path.dirname(__file__), "ai_config.json")
)

with open(CONFIG_PATH, "r") as f:
    AI_CONFIG = json.load(f)

Configuration is loaded once at module import time.

=== OpenAI Client Initialization ===

client = OpenAI(
    api_key=os.getenv("OPENROUTER_API_KEY"),
    base_url=os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1"),
)

Uses OpenRouter for model flexibility - can use any OpenAI-compatible model including:

  • OpenAI models (GPT-4, GPT-3.5)
  • Anthropic models (Claude)
  • Open-source models (Llama, Qwen, Mistral)

=== Internal Functions ===

_extract_json_from_text(text) - Extract JSON from reasoning model output

def _extract_json_from_text(text):
    match = re.search(r"```json\s*(\{.*?\})\s*```", text, re.DOTALL)
    if match:
        return match.group(1)
    match = re.search(r"(\{[^{}]*\})", text, re.DOTALL)
    if match:
        return match.group(1)
    return None

Some reasoning models (like Qwen with thinking) output reasoning before the JSON. This extracts:

  1. JSON inside markdown code blocks: ''%%json {...} %%''
  2. First JSON object in text: ''{...}''
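Both extraction paths can be exercised directly. The function body mirrors the one above; the backtick fence marker is assembled with string multiplication here only so the example itself can be displayed inside a code block:

```python
import re

TICKS = "`" * 3  # literal ``` assembled to avoid nesting fences in this doc

def extract_json_from_text(text):
    # 1. JSON inside a markdown code fence
    match = re.search(TICKS + r"json\s*(\{.*?\})\s*" + TICKS, text, re.DOTALL)
    if match:
        return match.group(1)
    # 2. First bare JSON object in the text
    match = re.search(r"(\{[^{}]*\})", text, re.DOTALL)
    if match:
        return match.group(1)
    return None

fenced = f'Reasoning...\n{TICKS}json\n{{"action": "add"}}\n{TICKS}'
bare = 'I think the answer is {"action": "list"} based on the request.'
# extract_json_from_text(fenced) -> '{"action": "add"}'
# extract_json_from_text(bare)   -> '{"action": "list"}'
```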

flowchart LR
    CALL["_call_llm(system, user)"] --> API["OpenAI client\nchat.completions.create()"]
    API --> MSG["response.choices[0].message"]
    MSG --> CHK_C{"msg.content\nnon-empty?"}
    CHK_C -- "Yes" --> RET_TEXT["return content"]
    CHK_C -- "No" --> CHK_R{"msg.reasoning\nexists?"}
    CHK_R -- "Yes" --> EXTRACT["_extract_json_from_text()"]
    CHK_R -- "No" --> RET_NONE["return None"]
    EXTRACT --> RET_JSON["return extracted JSON"]
    API -- "Exception" --> LOG["print error"] --> RET_NONE

_call_llm(system_prompt, user_prompt) - Execute LLM request

def _call_llm(system_prompt, user_prompt):
    try:
        response = client.chat.completions.create(
            model=AI_CONFIG["model"],
            max_tokens=AI_CONFIG.get("max_tokens", 8192),
            timeout=AI_CONFIG["validation"]["timeout_seconds"],
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
        )
        msg = response.choices[0].message
        text = msg.content.strip() if msg.content else ""
        if text:
            return text
        reasoning = getattr(msg, "reasoning", None)
        if reasoning:
            extracted = _extract_json_from_text(reasoning)
            if extracted:
                return extracted
        return None
    except Exception as e:
        print(f"LLM error: {type(e).__name__}: {e}", flush=True)
        return None

Handles both:

  • Standard responses - JSON in ''message.content''
  • Reasoning responses - JSON extracted from ''message.reasoning''

=== Main Parsing Function ===

flowchart TD
    START["parse(user_input, interaction_type)"] --> RETRY_CHK{"retry_count\n>= max_retries?"}
    RETRY_CHK -- "Yes" --> ERR_MAX["return {error: 'Failed after N retries'}"]
    RETRY_CHK -- "No" --> PROMPT["Build prompt from\nai_config.json template"]
    PROMPT --> HISTORY["Inject last 3 conversation turns"]
    HISTORY --> LLM["_call_llm(system, user_prompt)"]
    LLM --> LLM_CHK{"Response\nreceived?"}
    LLM_CHK -- "No" --> ERR_UNAVAIL["return {error: 'AI unavailable'}"]
    LLM_CHK -- "Yes" --> JSON_PARSE["json.loads(response)"]
    JSON_PARSE -- "JSONDecodeError" --> RETRY_JSON["parse(..., retry+1,\nerrors=['not valid JSON'])"]
    JSON_PARSE -- "Success" --> VALIDATE{"Custom validator\nregistered?"}
    VALIDATE -- "No" --> RETURN["return parsed JSON"]
    VALIDATE -- "Yes" --> RUN_VAL["validator(parsed)"]
    RUN_VAL --> VAL_CHK{"Validation\nerrors?"}
    VAL_CHK -- "No" --> RETURN
    VAL_CHK -- "Yes" --> RETRY_VAL["parse(..., retry+1,\nerrors=validation_errors)"]

parse(user_input, interaction_type, retry_count=0, errors=None, history=None)

def parse(user_input, interaction_type, retry_count=0, errors=None, history=None):
    if retry_count >= AI_CONFIG["validation"]["max_retries"]:
        return {
            "error": f"Failed to parse after {retry_count} retries",
            "user_input": user_input,
        }

Parameters:

  • ''user_input'' - Raw user message
  • ''interaction_type'' - Key in config prompts (e.g., ''command_parser'')
  • ''retry_count'' - Internal counter for automatic retry
  • ''errors'' - Previous validation errors (for retry context)
  • ''history'' - List of (message, parsed_result) tuples

Retry Logic:

try:
    parsed = json.loads(response_text)
except json.JSONDecodeError:
    return parse(
        user_input,
        interaction_type,
        retry_count + 1,
        ["Response was not valid JSON"],
        history=history,
    )

When JSON parsing fails, the function recursively calls itself with incremented retry count, passing the error to give the LLM context to fix its output.
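The bounded recursion is easiest to see with the LLM stubbed out. ''parse_with_retry'' and ''flaky_llm'' below are illustrative names; the sketch reproduces only the retry shape of the real ''parse()'':

```python
import json

MAX_RETRIES = 3

def parse_with_retry(call_llm, user_input, retry_count=0, errors=None):
    # Same shape as ai/parser.parse(): bail out after max retries,
    # otherwise re-invoke with the failure fed back as context.
    if retry_count >= MAX_RETRIES:
        return {"error": f"Failed to parse after {retry_count} retries"}
    response = call_llm(user_input, errors or [])
    try:
        return json.loads(response)
    except json.JSONDecodeError:
        return parse_with_retry(call_llm, user_input, retry_count + 1,
                                ["Response was not valid JSON"])

attempts = []
def flaky_llm(user_input, errors):
    attempts.append(errors)
    # Returns garbage first, valid JSON once it "sees" the error feedback.
    return '{"action": "add"}' if errors else "not json"

result = parse_with_retry(flaky_llm, "add milk")
# result == {"action": "add"}, reached on the second attempt
```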

Validation Integration:

validator = AI_CONFIG["validation"].get("validators", {}).get(interaction_type)
if validator:
    validation_errors = validator(parsed)
    if validation_errors:
        return parse(
            user_input,
            interaction_type,
            retry_count + 1,
            validation_errors,
            history=history,
        )

Custom validators are called after successful JSON parsing. If validation fails, the parser retries with the validation errors as context.

Conversation History:

history_context = "No previous context"
if history and len(history) > 0:
    history_lines = []
    for i, (msg, result) in enumerate(history[-3:]):
        history_lines.append(f"{i + 1}. User: {msg}")
        if isinstance(result, dict) and not result.get("error"):
            history_lines.append(f"   Parsed: {json.dumps(result)}")
        else:
            history_lines.append(f"   Parsed: {result}")
    history_context = "\n".join(history_lines)

The last 3 conversation turns are included for context, enabling:

  • Pronoun resolution ("Add it to my list")
  • Follow-up commands ("Change the second one")
  • Clarification handling

=== Validator Registration ===

register_validator(interaction_type, validator_fn)

def register_validator(interaction_type, validator_fn):
    if "validators" not in AI_CONFIG["validation"]:
        AI_CONFIG["validation"]["validators"] = {}
    AI_CONFIG["validation"]["validators"][interaction_type] = validator_fn

Domain modules register their validators:

def validate_example_json(data):
    errors = []
    if "action" not in data:
        errors.append("Missing required field: action")
    return errors

ai_parser.register_validator("example", validate_example_json)

==== ai/ai_config.json - AI Configuration ====

{
    "model": "qwen/qwen3-next-80b-a3b-thinking:nitro",
    "max_tokens": 8192,
    "prompts": {
        "command_parser": {
            "system": "...",
            "user_template": "..."
        }
    },
    "validation": {
        "max_retries": 3,
        "timeout_seconds": 15,
        "validators": {}
    }
}

Configuration Fields:

^ Field ^ Purpose ^
| model | OpenRouter model identifier |
| max_tokens | Maximum response length |
| prompts | Prompt templates by interaction type |
| validation.max_retries | Retry attempts on failure |
| validation.timeout_seconds | LLM request timeout |
| validation.validators | Runtime-registered validators |

Prompt Structure:

Each prompt has:

  • ''system'' - System prompt defining the AI's role
  • ''user_template'' - Template with ''{user_input}'' and ''{history_context}'' placeholders
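Putting the two pieces together, a prompt entry might look like the following. Only the field names come from the schema above; the prompt wording is illustrative:

```json
{
  "prompts": {
    "command_parser": {
      "system": "You convert user requests into JSON commands. Respond with JSON only.",
      "user_template": "Conversation so far:\n{history_context}\n\nUser message: {user_input}"
    }
  }
}
```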

===== Bot Layer =====

==== bot/bot.py - Discord Client ====

This is the Discord bot client that manages user sessions and routes commands.

=== Global State ===

user_sessions = {}    # discord_id → {token, user_uuid, username}
login_state = {}      # discord_id → {step, username}
message_history = {}  # discord_id → [(msg, parsed), ...]
user_cache = {}       # discord_id → {hashed_password, user_uuid, username}
CACHE_FILE = "/app/user_cache.pkl"

user_sessions - Active authenticated sessions

login_state - Tracks multi-step login flow

message_history - Last 5 messages per user for context

user_cache - Persisted credentials for auto-login

flowchart TD
    MSG["on_message(message)"] --> SELF{"Own\nmessage?"}
    SELF -- "Yes" --> IGNORE["ignore"]
    SELF -- "No" --> DM{"Is DM\nchannel?"}
    DM -- "No" --> IGNORE
    DM -- "Yes" --> LOGIN_CHK{"In\nlogin_state?"}
    LOGIN_CHK -- "Yes" --> HANDLE_LOGIN["handleLoginStep()"]
    LOGIN_CHK -- "No" --> SESSION_CHK{"Has\nsession?"}
    SESSION_CHK -- "No" --> START_LOGIN["Start login flow\n(ask for username)"]
    SESSION_CHK -- "Yes" --> ROUTE["routeCommand()"]
ROUTE --> HELP_CHK{"'help' in\nmessage?"}
HELP_CHK -- "Yes" --> HELP["sendHelpMessage()"]
HELP_CHK -- "No" --> PARSE["ai_parser.parse()"]
PARSE --> CLARIFY{"needs_\nclarification?"}
CLARIFY -- "Yes" --> ASK["Ask user to clarify"]
CLARIFY -- "No" --> ERROR_CHK{"error in\nparsed?"}
ERROR_CHK -- "Yes" --> SHOW_ERR["Show error"]
ERROR_CHK -- "No" --> HANDLER["get_handler(interaction_type)"]
HANDLER -- "found" --> EXEC["handler(message, session, parsed)"]
HANDLER -- "not found" --> UNKNOWN["'Unknown command type'"]

=== Discord Client Setup ===

intents = discord.Intents.default()
intents.message_content = True

client = discord.Client(intents=intents)

''message_content'' intent is required to read message text.

=== Utility Functions ===

decodeJwtPayload(token) - Decode JWT without verification

def decodeJwtPayload(token):
    payload = token.split(".")[1]
    payload += "=" * (4 - len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))

Used to extract ''user_uuid'' from tokens. Note: This does not verify the signature; verification happens server-side.
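
The padding trick works because JWT segments are base64url-encoded with the ''='' padding stripped; restoring it lets the standard library decode the payload. A self-contained sketch (the token and its claims below are invented for illustration):

```python
import base64
import json

def decode_jwt_payload(token):
    # Take the middle (payload) segment and restore base64url padding.
    payload = token.split(".")[1]
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))

# Build a fake token with a hypothetical payload to show the round trip;
# the header and signature segments are irrelevant for payload decoding.
claims = {"sub": "123e4567-e89b-12d3-a456-426614174000", "username": "demo"}
segment = base64.urlsafe_b64encode(json.dumps(claims).encode()).decode().rstrip("=")
fake_token = f"header.{segment}.signature"

decoded = decode_jwt_payload(fake_token)
```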

apiRequest(method, endpoint, token=None, data=None) - Make HTTP requests to the API

def apiRequest(method, endpoint, token=None, data=None):
    url = f"{API_URL}{endpoint}"
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    try:
        resp = getattr(requests, method)(url, headers=headers, json=data, timeout=10)
        try:
            return resp.json(), resp.status_code
        except ValueError:
            return {}, resp.status_code
    except requests.RequestException:
        return {"error": "API unavailable"}, 503

Returns a tuple of (response_data, status_code) for easy handling.

=== Cache Management ===

loadCache() - Load persisted user credentials

def loadCache():
    global user_cache
    try:
        if os.path.exists(CACHE_FILE):
            with open(CACHE_FILE, "rb") as f:
                user_cache = pickle.load(f)
            print(f"Loaded cache for {len(user_cache)} users")
    except Exception as e:
        print(f"Error loading cache: {e}")

saveCache() - Persist user credentials

def saveCache():
    try:
        with open(CACHE_FILE, "wb") as f:
            pickle.dump(user_cache, f)
    except Exception as e:
        print(f"Error saving cache: {e}")

Why cache credentials?

  • Users don't need to re-login every session
  • Passwords are hashed locally for verification
  • New tokens are fetched automatically

hashPassword() / verifyPassword() - Local password handling

def hashPassword(password):
    return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")

def verifyPassword(password, hashed):
    return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8"))

=== Authentication Flow ===

flowchart TD
    START["negotiateToken(discord_id, username, password)"] --> CACHE{"Cached user\nfound?"}
    CACHE -- "Yes" --> VERIFY["verifyPassword(password, cached.hashed_password)"]
    VERIFY -- "Match" --> API_CACHED["POST /api/login"]
    VERIFY -- "Mismatch" --> API_FRESH["POST /api/login"]
    CACHE -- "No" --> API_FRESH
API_CACHED -- "200 + token" --> DECODE_C["decodeJwtPayload(token)"]
DECODE_C --> UPDATE_C["Update cache\n(keep existing hash)"]
UPDATE_C --> RET_OK["return (token, user_uuid)"]

API_FRESH -- "200 + token" --> DECODE_F["decodeJwtPayload(token)"]
DECODE_F --> UPDATE_F["Cache new credentials\n(hashPassword(password))"]
UPDATE_F --> RET_OK

API_CACHED -- "failure" --> RET_FAIL["return (None, None)"]
API_FRESH -- "failure" --> RET_FAIL

negotiateToken(discord_id, username, password) - Get token with caching

def negotiateToken(discord_id, username, password):
    cached = getCachedUser(discord_id)
    if (
        cached
        and cached.get("username") == username
        and verifyPassword(password, cached.get("hashed_password"))
    ):
        result, status = apiRequest(
            "post", "/api/login", data={"username": username, "password": password}
        )
        if status == 200 and "token" in result:
            token = result["token"]
            payload = decodeJwtPayload(token)
            user_uuid = payload["sub"]
            setCachedUser(
                discord_id,
                {
                    "hashed_password": cached["hashed_password"],
                    "user_uuid": user_uuid,
                    "username": username,
                },
            )
            return token, user_uuid
        return None, None

result, status = apiRequest(
    "post", "/api/login", data={"username": username, "password": password}
)
if status == 200 and "token" in result:
    token = result["token"]
    payload = decodeJwtPayload(token)
    user_uuid = payload["sub"]
    setCachedUser(
        discord_id,
        {
            "hashed_password": hashPassword(password),
            "user_uuid": user_uuid,
            "username": username,
        },
    )
    return token, user_uuid
return None, None

Flow:

  1. Check if user has cached credentials
  2. If cached password matches, fetch new token
  3. If no cache or mismatch, authenticate normally
  4. Cache credentials for future sessions

handleAuthFailure(message) - Handle expired sessions

async def handleAuthFailure(message):
    discord_id = message.author.id
    user_sessions.pop(discord_id, None)
    await message.channel.send(
        "Your session has expired. Send any message to log in again."
    )

=== Login Flow ===

handleLoginStep(message) - Multi-step login process

async def handleLoginStep(message):
    discord_id = message.author.id
    state = login_state[discord_id]

if state["step"] == "username":
    state["username"] = message.content.strip()
    state["step"] = "password"
    await message.channel.send("Password?")

elif state["step"] == "password":
    username = state["username"]
    password = message.content.strip()
    del login_state[discord_id]

    token, user_uuid = negotiateToken(discord_id, username, password)

    if token and user_uuid:
        user_sessions[discord_id] = {
            "token": token,
            "user_uuid": user_uuid,
            "username": username,
        }
        registered = ", ".join(list_registered()) or "none"
        await message.channel.send(
            f"Welcome back **{username}**!\n\n"
            f"Registered modules: {registered}\n\n"
            f"Send 'help' for available commands."
        )
    else:
        await message.channel.send(
            "Invalid credentials. Send any message to try again."
        )

State Machine:

stateDiagram-v2
    [*] --> AskUsername: First message (no session)
    AskUsername --> AskPassword: User sends username
    AskPassword --> Authenticated: negotiateToken() succeeds
    AskPassword --> [*]: Invalid credentials\n(send any message to retry)
    Authenticated --> routeCommand: Subsequent messages

=== Command Routing ===

routeCommand(message) - Parse and route commands

async def routeCommand(message):
    discord_id = message.author.id
    session = user_sessions[discord_id]
    user_input = message.content.lower()

if "help" in user_input or "what can i say" in user_input:
    await sendHelpMessage(message)
    return

async with message.channel.typing():
    history = message_history.get(discord_id, [])
    parsed = ai_parser.parse(message.content, "command_parser", history=history)

    if discord_id not in message_history:
        message_history[discord_id] = []
    message_history[discord_id].append((message.content, parsed))
    message_history[discord_id] = message_history[discord_id][-5:]

if "needs_clarification" in parsed:
    await message.channel.send(
        f"I'm not quite sure what you mean. {parsed['needs_clarification']}"
    )
    return

if "error" in parsed:
    await message.channel.send(
        f"I had trouble understanding that: {parsed['error']}"
    )
    return

interaction_type = parsed.get("interaction_type")
handler = get_handler(interaction_type)

if handler:
    await handler(message, session, parsed)
else:
    registered = ", ".join(list_registered()) or "none"
    await message.channel.send(
        f"Unknown command type '{interaction_type}'. Registered modules: {registered}"
    )

Flow:

  1. Check for help request
  2. Show typing indicator while LLM processes
  3. Parse with AI, including conversation history
  4. Update message history (keep last 5)
  5. Handle clarification requests
  6. Handle parsing errors
  7. Route to appropriate handler
  8. Handle unknown interaction types

=== Discord Event Handlers ===

on_ready() - Bot startup

@client.event
async def on_ready():
    print(f"Bot logged in as {client.user}")
    loadCache()
    backgroundLoop.start()

on_message(message) - Message handler

@client.event
async def on_message(message):
    if message.author == client.user:
        return
    if not isinstance(message.channel, discord.DMChannel):
        return

discord_id = message.author.id

if discord_id in login_state:
    await handleLoginStep(message)
    return

if discord_id not in user_sessions:
    login_state[discord_id] = {"step": "username"}
    await message.channel.send("Welcome! Send your username to log in.")
    return

await routeCommand(message)

Filters:

  • Ignore own messages
  • Only respond to DMs (not server channels)

=== Background Tasks ===

backgroundLoop() - Scheduled task execution

@tasks.loop(seconds=60)
async def backgroundLoop():
    """Override this in your domain module or extend as needed."""
    pass

@backgroundLoop.before_loop
async def beforeBackgroundLoop():
    await client.wait_until_ready()

Runs every 60 seconds. Override for domain-specific polling.

==== bot/command_registry.py - Module Registration ====

flowchart TD
    subgraph "Module Registration (at import time)"
        EX["commands/example.py"] -- "register_module('example', handle_example)" --> REG["COMMAND_MODULES dict"]
        EX -- "register_validator('example', validate_fn)" --> VAL["AI_CONFIG validators dict"]
        HABIT["commands/habits.py"] -- "register_module('habit', handle_habit)" --> REG
        HABIT -- "register_validator('habit', validate_fn)" --> VAL
    end
subgraph "Runtime Dispatch"
    PARSED["parsed JSON\n{interaction_type: 'example'}"] --> GET["get_handler('example')"]
    GET --> REG
    REG --> HANDLER["handle_example(message, session, parsed)"]
end

This module provides a simple registry pattern for command handlers.

COMMAND_MODULES = {}

def register_module(interaction_type, handler):
    COMMAND_MODULES[interaction_type] = handler

def get_handler(interaction_type):
    return COMMAND_MODULES.get(interaction_type)

def list_registered():
    return list(COMMAND_MODULES.keys())

Why a registry?

  • Decouples command handling from bot logic
  • Domain modules self-register
  • Easy to add/remove modules without touching core code
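
The registration and dispatch steps can be exercised end-to-end in isolation; ''handle_greeting'' below is a hypothetical module handler, not part of the template:

```python
# Minimal stand-alone sketch of the registry pattern in command_registry.py.
COMMAND_MODULES = {}

def register_module(interaction_type, handler):
    COMMAND_MODULES[interaction_type] = handler

def get_handler(interaction_type):
    return COMMAND_MODULES.get(interaction_type)

def list_registered():
    return list(COMMAND_MODULES.keys())

# A hypothetical module registers itself at import time...
def handle_greeting(message, session, parsed):
    return f"hello {session['username']}"

register_module("greeting", handle_greeting)

# ...and the bot later dispatches on the parsed interaction_type.
handler = get_handler("greeting")
result = handler(None, {"username": "demo"}, {"interaction_type": "greeting"})
```

An unregistered type simply yields ''None'' from ''get_handler'', which the bot turns into an "Unknown command type" reply.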

==== bot/commands/example.py - Example Command Module ====

This demonstrates the pattern for creating domain modules.

from bot.command_registry import register_module
import ai.parser as ai_parser

async def handle_example(message, session, parsed):
    action = parsed.get("action", "unknown")
    token = session["token"]
    user_uuid = session["user_uuid"]

if action == "check":
    await message.channel.send(
        f"Checking example items for {session['username']}..."
    )
elif action == "add":
    item_name = parsed.get("item_name", "unnamed")
    await message.channel.send(f"Adding example item: **{item_name}**")
else:
    await message.channel.send(f"Unknown example action: {action}")

def validate_example_json(data):
    errors = []

if not isinstance(data, dict):
    return ["Response must be a JSON object"]

if "error" in data:
    return []

if "action" not in data:
    errors.append("Missing required field: action")

action = data.get("action")

if action == "add" and "item_name" not in data:
    errors.append("Missing required field for add: item_name")

return errors

register_module("example", handle_example)
ai_parser.register_validator("example", validate_example_json)

Pattern:

  1. Define async handler function
  2. Define validation function
  3. Register both at module load time
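
The validator convention (return a list of error strings; an empty list means valid) can be seen with a hypothetical ''validate_note_json'' for an invented "note" module:

```python
# Hypothetical validator following the framework's convention: take the parsed
# JSON, return a list of error strings, and treat an empty list as "valid".
def validate_note_json(data):
    errors = []
    if not isinstance(data, dict):
        return ["Response must be a JSON object"]
    if "error" in data:        # LLM-reported errors pass through untouched
        return []
    if "action" not in data:
        errors.append("Missing required field: action")
    if data.get("action") == "add" and "text" not in data:
        errors.append("Missing required field for add: text")
    return errors

ok = validate_note_json({"action": "add", "text": "buy milk"})
bad = validate_note_json({"action": "add"})
```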

===== API Layer =====

==== api/main.py - Flask Application ====

This is the REST API server providing endpoints for authentication, user management, and domain operations.

=== Route Registration ===

ROUTE_MODULES = []

def register_routes(module):
    """Register a routes module. Module should have a register(app) function."""
    ROUTE_MODULES.append(module)

Modules are registered before app startup:

if __name__ == "__main__":
    for module in ROUTE_MODULES:
        if hasattr(module, "register"):
            module.register(app)
    app.run(host="0.0.0.0", port=5000)

flowchart LR
    subgraph "API Auth Pattern (all protected routes)"
        REQ["Incoming Request"] --> HDR{"Authorization\nheader?"}
        HDR -- "Missing/invalid" --> R401A["401 missing token"]
        HDR -- "Bearer token" --> DECODE["_get_user_uuid(token)\nor extract from URL"]
        DECODE --> VERIFY["auth.verifyLoginToken\n(token, userUUID)"]
        VERIFY -- "True" --> HANDLER["Route handler logic"]
        VERIFY -- "False" --> R401B["401 unauthorized"]
    end

=== Authentication Endpoints ===

POST /api/register - Create new user

@app.route("/api/register", methods=["POST"])
def api_register():
    data = flask.request.get_json()
    username = data.get("username")
    password = data.get("password")
    if not username or not password:
        return flask.jsonify({"error": "username and password required"}), 400
    result = users.registerUser(username, password, data)
    if result:
        return flask.jsonify({"success": True}), 201
    else:
        return flask.jsonify({"error": "username taken"}), 409

Response Codes:

  • ''201'' - User created successfully
  • ''400'' - Missing required fields
  • ''409'' - Username already exists

POST /api/login - Authenticate user

@app.route("/api/login", methods=["POST"])
def api_login():
    data = flask.request.get_json()
    username = data.get("username")
    password = data.get("password")
    if not username or not password:
        return flask.jsonify({"error": "username and password required"}), 400
    token = auth.getLoginToken(username, password)
    if token:
        return flask.jsonify({"token": token}), 200
    else:
        return flask.jsonify({"error": "invalid credentials"}), 401

Response Codes:

  • ''200'' - Returns JWT token
  • ''400'' - Missing required fields
  • ''401'' - Invalid credentials

=== User Endpoints ===

GET /api/getUserUUID/<username> - Get user's UUID

@app.route("/api/getUserUUID/<username>", methods=["GET"])
def api_getUserUUID(username):
    header = flask.request.headers.get("Authorization", "")
    if not header.startswith("Bearer "):
        return flask.jsonify({"error": "missing token"}), 401
    token = header[7:]
    if auth.verifyLoginToken(token, username):
        return flask.jsonify(users.getUserUUID(username)), 200
    else:
        return flask.jsonify({"error": "unauthorized"}), 401

GET /api/user/<userUUID> - Get user details

@app.route("/api/user/<userUUID>", methods=["GET"])
def api_getUser(userUUID):
    # ... auth check ...
    user = postgres.select_one("users", {"id": userUUID})
    if user:
        user.pop("password_hashed", None)  # Never return password hash
        return flask.jsonify(user), 200
    else:
        return flask.jsonify({"error": "user not found"}), 404

Security note: ''password_hashed'' is explicitly removed before returning.

PUT /api/user/<userUUID> - Update user

@app.route("/api/user/<userUUID>", methods=["PUT"])
def api_updateUser(userUUID):
    # ... auth check ...
    data = flask.request.get_json()
    result = users.updateUser(userUUID, data)
    if result:
        return flask.jsonify({"success": True}), 200
    else:
        return flask.jsonify({"error": "no valid fields to update"}), 400

DELETE /api/user/<userUUID> - Delete user

@app.route("/api/user/<userUUID>", methods=["DELETE"])
def api_deleteUser(userUUID):
    # ... auth check ...
    data = flask.request.get_json()
    password = data.get("password")
    if not password:
        return flask.jsonify({"error": "password required for account deletion"}), 400
    result = auth.unregisterUser(userUUID, password)
    if result:
        return flask.jsonify({"success": True}), 200
    else:
        return flask.jsonify({"error": "invalid password"}), 401

Security: Requires password confirmation for deletion.

=== Health Check ===

GET /health - Service health

@app.route("/health", methods=["GET"])
def health_check():
    return flask.jsonify({"status": "ok"}), 200

Used by Docker health checks and load balancers.

==== api/routes/example.py - Example Route Module ====

def _get_user_uuid(token):
    """Decode JWT to extract user UUID. Returns None on failure."""
    try:
        payload = jwt.decode(token, os.getenv("JWT_SECRET"), algorithms=["HS256"])
        return payload.get("sub")
    except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
        return None

def register(app):
    @app.route("/api/example", methods=["GET"])
    def api_listExamples():
        header = flask.request.headers.get("Authorization", "")
        if not header.startswith("Bearer "):
            return flask.jsonify({"error": "missing token"}), 401
        token = header[7:]

    user_uuid = _get_user_uuid(token)
    if not user_uuid or not auth.verifyLoginToken(token, userUUID=user_uuid):
        return flask.jsonify({"error": "unauthorized"}), 401

    items = postgres.select("examples")
    return flask.jsonify(items), 200

@app.route("/api/example", methods=["POST"])
def api_addExample():
    # ... similar pattern ...
    data = flask.request.get_json()
    item = postgres.insert("examples", data)
    return flask.jsonify(item), 201

Authentication Pattern:

  1. Extract Bearer token from Authorization header
  2. Decode token to extract user UUID
  3. Verify token belongs to that user via ''verifyLoginToken''
  4. Return ''401 Unauthorized'' if invalid
  5. Proceed with request if valid
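
For intuition, here is a minimal sketch of what steps 2-3 amount to under HS256, using only the standard library. The real code delegates to PyJWT (''jwt.decode''); ''SECRET'' and all helper names below are illustrative:

```python
import base64
import hashlib
import hmac
import json

SECRET = b"demo-secret"  # stands in for JWT_SECRET; illustrative only

def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).decode().rstrip("=")

def sign_hs256(claims: dict) -> str:
    # Build header.payload.signature the way an HS256 JWT library does.
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode()
    sig = _b64url(hmac.new(SECRET, signing_input, hashlib.sha256).digest())
    return f"{header}.{payload}.{sig}"

def verify_token_for_user(token: str, user_uuid: str) -> bool:
    # Mirrors the API pattern: the signature must check out AND the sub
    # claim must match the UUID the request claims to act on behalf of.
    header, payload, sig = token.split(".")
    signing_input = f"{header}.{payload}".encode()
    expected = _b64url(hmac.new(SECRET, signing_input, hashlib.sha256).digest())
    if not hmac.compare_digest(sig, expected):
        return False
    padded = payload + "=" * (-len(payload) % 4)
    claims = json.loads(base64.urlsafe_b64decode(padded))
    return claims.get("sub") == user_uuid

token = sign_hs256({"sub": "uuid-123"})
```

Note that step 3 is what stops a valid token for user A from being replayed against user B's resources.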

===== Scheduler Layer =====

==== scheduler/daemon.py - Background Task Daemon ====

flowchart TD
    START["daemon_loop()"] --> POLL["poll_callback()"]
    POLL --> SLEEP["time.sleep(POLL_INTERVAL)"]
    SLEEP --> POLL
    POLL -- "Exception" --> LOG["logger.error()"]
    LOG --> SLEEP

This module provides a simple polling loop for background tasks.

POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", 60))

def poll_callback():
    """
    Override this function with your domain logic.
    Called every POLL_INTERVAL seconds.
    """
    pass

def daemon_loop():
    logger.info("Scheduler daemon starting")
    while True:
        try:
            poll_callback()
        except Exception as e:
            logger.error(f"Poll callback error: {e}")
        time.sleep(POLL_INTERVAL)

Usage: Override ''poll_callback()'' in your implementation:

scheduler/daemon.py (customized)

def poll_callback():
    # Check for due tasks
    tasks = postgres.select("tasks", where={"due_at": ("<=", datetime.now())})
    for task in tasks:
        settings = notifications.getNotificationSettings(task["user_uuid"])
        notifications._sendToEnabledChannels(settings, f"Task due: {task['name']}")

Error Handling: Exceptions are caught and logged, allowing the daemon to continue running.
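
The catch-log-continue contract can be sketched with a bounded, testable variant of the loop; the names and the injected ''sleep'' function are illustrative, not part of the template:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("scheduler")

def run_polls(poll_callback, iterations, sleep=lambda s: None, interval=60):
    # Bounded variant of daemon_loop() for testing: sleep is injected so a
    # test can run iterations instantly, and exceptions are logged without
    # killing the loop -- the same contract as the real daemon.
    for _ in range(iterations):
        try:
            poll_callback()
        except Exception as e:
            logger.error(f"Poll callback error: {e}")
        sleep(interval)

calls = []
def flaky_poll():
    # Simulated domain callback that fails once, then recovers.
    calls.append(1)
    if len(calls) == 2:
        raise RuntimeError("transient failure")

run_polls(flaky_poll, iterations=4)
```

Despite the exception on the second iteration, all four polls run.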

===== Configuration =====

==== config/schema.sql - Database Schema ====

erDiagram
    users {
        UUID id PK
        VARCHAR username UK "NOT NULL"
        BYTEA password_hashed "NOT NULL"
        TIMESTAMP created_at "DEFAULT NOW()"
    }
notifications {
    UUID id PK
    UUID user_uuid FK,UK "REFERENCES users(id)"
    VARCHAR discord_webhook
    BOOLEAN discord_enabled "DEFAULT FALSE"
    VARCHAR ntfy_topic
    BOOLEAN ntfy_enabled "DEFAULT FALSE"
    TIMESTAMP last_message_sent
    VARCHAR current_notification_status "DEFAULT 'inactive'"
    TIMESTAMP created_at "DEFAULT NOW()"
    TIMESTAMP updated_at "DEFAULT NOW()"
}

users ||--o| notifications : "ON DELETE CASCADE"
-- Users table (minimal)
CREATE TABLE IF NOT EXISTS users (
    id UUID PRIMARY KEY,
    username VARCHAR(255) UNIQUE NOT NULL,
    password_hashed BYTEA NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Notifications table
CREATE TABLE IF NOT EXISTS notifications (
    id UUID PRIMARY KEY,
    user_uuid UUID UNIQUE REFERENCES users(id) ON DELETE CASCADE,
    discord_webhook VARCHAR(500),
    discord_enabled BOOLEAN DEFAULT FALSE,
    ntfy_topic VARCHAR(255),
    ntfy_enabled BOOLEAN DEFAULT FALSE,
    last_message_sent TIMESTAMP,
    current_notification_status VARCHAR(50) DEFAULT 'inactive',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Design Notes:

  • ''UUID'' primary keys for security (not auto-increment)
  • ''ON DELETE CASCADE'' automatically removes related records
  • ''BYTEA'' for password hashes (binary data)
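
The security argument for UUID keys: two freshly generated v4 UUIDs share no predictable relationship, unlike auto-increment IDs where knowing one row's ID reveals its neighbors. With Python's standard library:

```python
import uuid

# uuid4() draws 122 random bits; consecutive values carry no ordering
# information an attacker could use to enumerate other users' records.
a = uuid.uuid4()
b = uuid.uuid4()
```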

==== config/.env.example - Environment Variables ====

# Discord Bot
DISCORD_BOT_TOKEN=your_discord_bot_token_here

# API
API_URL=http://app:5000

# Database
DB_HOST=db
DB_PORT=5432
DB_NAME=app
DB_USER=app
DB_PASS=your_db_password_here

# JWT
JWT_SECRET=your_jwt_secret_here

# AI / OpenRouter
OPENROUTER_API_KEY=your_openrouter_api_key_here
OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
AI_CONFIG_PATH=/app/ai/ai_config.json

===== Docker Deployment =====

==== Dockerfile ====

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "-m", "api.main"]

Base Image: ''python:3.11-slim'' for minimal footprint

Default Command: Runs the Flask API via ''python -m api.main'' (ensures ''/app'' is on sys.path for correct module resolution)

==== docker-compose.yml ====

services:
  db:
    image: postgres:16
    environment:
      POSTGRES_DB: app
      POSTGRES_USER: app
      POSTGRES_PASSWORD: ${DB_PASS}
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data
      - ./config/schema.sql:/docker-entrypoint-initdb.d/schema.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U app"]
      interval: 5s
      timeout: 5s
      retries: 5

  app:
    build: .
    ports:
      - "8080:5000"
    env_file: config/.env
    depends_on:
      db:
        condition: service_healthy

  scheduler:
    build: .
    command: ["python", "-m", "scheduler.daemon"]
    env_file: config/.env
    depends_on:
      db:
        condition: service_healthy

  bot:
    build: .
    command: ["python", "-m", "bot.bot"]
    env_file: config/.env
    depends_on:
      app:
        condition: service_started

volumes:
  pgdata:

Services:

^ Service ^ Purpose ^ Dependencies ^
| db | PostgreSQL database | None |
| app | Flask API (port 8080) | db (healthy) |
| scheduler | Background tasks | db (healthy) |
| bot | Discord client | app |

Health Checks:

  • ''db'' uses PostgreSQL's built-in ''pg_isready''
  • Other services wait for their dependencies

Environment Files:

  • ''config/.env'' - Loaded by app, bot, and scheduler services
  • Root ''.env'' - Provides ''DB_PASS'' for docker-compose variable substitution (''${DB_PASS}'' in the db service)

Volume Mounts:

  • ''pgdata'' - Persistent database storage
  • ''schema.sql'' - Auto-runs on first startup

===== Data Flow Examples =====

==== Example 1: User Login ====

sequenceDiagram
    actor User
    participant Bot as bot.py
    participant API as api/main.py
    participant Auth as core/auth.py
    participant DB as PostgreSQL
User->>Bot: DM "username123"
Bot->>Bot: login_state[id] = {step: "username"}
Bot->>User: "Password?"
User->>Bot: DM "mypassword"
Bot->>API: POST /api/login {username, password}
API->>Auth: getLoginToken("username123", "mypassword")
Auth->>DB: SELECT * FROM users WHERE username='username123'
DB-->>Auth: user record (with hashed pw)
Auth->>Auth: bcrypt.checkpw(password, hash)
Auth-->>API: JWT token
API-->>Bot: {token: "eyJ..."} 200
Bot->>Bot: Store session {token, user_uuid, username}
Bot->>User: "Welcome back username123!"

==== Example 2: Natural Language Command ====

sequenceDiagram
    actor User
    participant Bot as bot.py
    participant Parser as ai/parser.py
    participant LLM as OpenRouter
    participant Registry as command_registry
    participant Handler as Domain Handler
    participant API as api/main.py
    participant DB as PostgreSQL
User->>Bot: DM "add a task to buy groceries"
Bot->>Bot: Show typing indicator
Bot->>Parser: parse("add a task...", "command_parser", history)
Parser->>LLM: chat.completions.create(system + user prompt)
LLM-->>Parser: {"interaction_type":"task","action":"add","task_name":"buy groceries"}
Parser->>Parser: json.loads() + validate
Parser-->>Bot: parsed JSON
Bot->>Bot: Update message_history (keep last 5)
Bot->>Registry: get_handler("task")
Registry-->>Bot: handle_task function
Bot->>Handler: handle_task(message, session, parsed)
Handler->>API: POST /api/tasks {name: "buy groceries"}
API->>DB: INSERT INTO tasks...
DB-->>API: inserted row
API-->>Handler: {id, name, ...} 201
Handler->>User: "Added task: **buy groceries**"

==== Example 3: API Request with Authentication ====

sequenceDiagram
    actor Client as External Client
    participant Flask as api/main.py
    participant Route as routes/example.py
    participant Auth as core/auth.py
    participant PG as core/postgres.py
    participant DB as PostgreSQL
Client->>Flask: POST /api/tasks<br/>Authorization: Bearer <jwt>
Flask->>Route: Route handler
Route->>Route: _get_user_uuid(token)<br/>jwt.decode() → sub claim
Route->>Auth: verifyLoginToken(token, userUUID)
Auth->>Auth: jwt.decode() + check sub == userUUID
Auth-->>Route: True
Route->>PG: insert("tasks", data)
PG->>DB: INSERT INTO tasks... RETURNING *
DB-->>PG: inserted row
PG-->>Route: {id, name, ...}
Route-->>Client: 201 {id, name, ...}

===== Security Considerations =====

==== SQL Injection Prevention ====

All database queries use parameterized queries:

Safe - parameterized

cur.execute("SELECT * FROM users WHERE id = %(id)s", {"id": user_id})

Unsafe - string interpolation (NOT used)

cur.execute(f"SELECT * FROM users WHERE id = '{user_id}'")

The ''_safe_id()'' function validates SQL identifiers.
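
The reason such a guard exists: parameterized queries protect *values*, but table and column names cannot be query parameters, so they must be whitelisted before interpolation. The actual ''_safe_id()'' lives in ''core/postgres.py''; this regex and the names below are a plausible sketch, not the real implementation:

```python
import re

# Accept only bare SQL identifiers: a letter or underscore followed by
# letters, digits, or underscores. Anything else (quotes, semicolons,
# whitespace) is rejected before it can reach the SQL string.
_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def safe_id(name: str) -> str:
    if not _IDENT.match(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name

table = safe_id("users")

try:
    safe_id("users; DROP TABLE users--")
    rejected = False
except ValueError:
    rejected = True
```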

==== Password Storage ====

Passwords are:

  • Hashed with bcrypt (adaptive hash function)
  • Salted automatically by bcrypt
  • Never stored in plain text
  • Never returned in API responses

==== JWT Security ====

  • Tokens expire after 1 hour
  • Signed with secret key (''JWT_SECRET'')
  • Contain minimal data (UUID, name)
  • Verified on every request

==== Rate Limiting ====

Not implemented in this template. Consider adding:

  • Rate limiting on API endpoints
  • Login attempt throttling
  • Command cooldown per user
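
A per-user command cooldown, for instance, needs only a timestamp map. This sketch is illustrative and not part of the template; the clock is passed in explicitly so the logic is testable without real waiting:

```python
# Reject a command if the same user issued one less than `cooldown`
# seconds ago; otherwise record the new timestamp and allow it.
_last_command = {}

def allow_command(user_id, now, cooldown=2.0):
    last = _last_command.get(user_id)
    if last is not None and now - last < cooldown:
        return False
    _last_command[user_id] = now
    return True

first = allow_command("u1", now=100.0)   # first command: allowed
burst = allow_command("u1", now=101.0)   # 1s later: inside cooldown
later = allow_command("u1", now=103.0)   # 3s after first: allowed again
```

In the bot, ''now'' would be ''time.monotonic()'' and the check would sit at the top of ''routeCommand''.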

===== Extending the Framework =====

==== Adding a New Domain Module ====

Step 1: Create database table

Edit ''config/schema.sql'':

CREATE TABLE IF NOT EXISTS habits (
    id UUID PRIMARY KEY,
    user_uuid UUID REFERENCES users(id) ON DELETE CASCADE,
    name VARCHAR(255) NOT NULL,
    streak INT DEFAULT 0,
    last_completed DATE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Step 2: Create API routes

Create ''api/routes/habits.py'':

import os
import uuid

import flask
import jwt

import core.auth as auth
import core.postgres as postgres

def _get_user_uuid(token):
    """Decode JWT to extract user UUID. Returns None on failure."""
    try:
        payload = jwt.decode(token, os.getenv("JWT_SECRET"), algorithms=["HS256"])
        return payload.get("sub")
    except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
        return None

def register(app):
    @app.route("/api/habits", methods=["GET"])
    def api_listHabits():
        header = flask.request.headers.get("Authorization", "")
        if not header.startswith("Bearer "):
            return flask.jsonify({"error": "missing token"}), 401
        token = header[7:]

    user_uuid = _get_user_uuid(token)
    if not user_uuid or not auth.verifyLoginToken(token, userUUID=user_uuid):
        return flask.jsonify({"error": "unauthorized"}), 401

    items = postgres.select("habits", where={"user_uuid": user_uuid})
    return flask.jsonify(items), 200

@app.route("/api/habits", methods=["POST"])
def api_addHabit():
    # ... similar pattern ...
    pass

Register in ''api/main.py'':

import api.routes.habits as habits_routes
register_routes(habits_routes)

Step 3: Create bot commands

Create ''bot/commands/habits.py'':

from bot.command_registry import register_module
import ai.parser as ai_parser
from bot.bot import apiRequest

async def handle_habit(message, session, parsed):
    action = parsed.get("action")
    token = session["token"]

if action == "list":
    result, status = apiRequest("get", "/api/habits", token)
    if status == 200:
        lines = [f"- {h['name']} (streak: {h['streak']})" for h in result]
        await message.channel.send("Your habits:\n" + "\n".join(lines))

elif action == "add":
    name = parsed.get("habit_name")
    result, status = apiRequest("post", "/api/habits", token, {"name": name})
    if status == 201:
        await message.channel.send(f"Created habit: **{name}**")

def validate_habit_json(data):
    errors = []
    if "action" not in data:
        errors.append("Missing required field: action")
    return errors

register_module("habit", handle_habit)
ai_parser.register_validator("habit", validate_habit_json)

Step 4: Add AI prompts

Edit ''ai/ai_config.json'':

{
  "prompts": {
    "command_parser": { ... },
    "habit_parser": {
      "system": "You parse habit tracking commands...",
      "user_template": "Parse: \"{user_input}\". Return JSON with action (list/add/complete) and habit_name."
    }
  }
}

===== Troubleshooting =====

==== Common Issues ====

Bot not responding to DMs

  • Verify ''message_content'' intent is enabled in Discord Developer Portal
  • Check bot has permission to read messages

API returning 401 Unauthorized

  • Verify token is valid and not expired
  • Check Authorization header format: ''Bearer <token>''

AI parser failing

  • Verify OpenRouter API key is valid
  • Check model is available
  • Review prompts in ''ai_config.json''

Database connection errors

  • Verify PostgreSQL is running
  • Check environment variables match Docker config

==== Debugging Tips ====

Enable verbose logging:

import logging
logging.basicConfig(level=logging.DEBUG)

Test the database connection:

from core import postgres
print(postgres.select("users"))

Test the AI parser:

import ai.parser as ai_parser
result = ai_parser.parse("add task buy milk", "command_parser")
print(result)

===== Conclusion =====

The LLM Bot Framework provides a solid foundation for building AI-powered Discord bots. Its modular architecture allows developers to:

  • Add new domains without modifying core code
  • Use any OpenAI-compatible LLM
  • Deploy easily with Docker
  • Scale with PostgreSQL

Key design decisions:

  • Separation of concerns - Bot, API, and core logic are independent
  • Configuration-driven - AI behavior is customizable via JSON
  • Security-first - Parameterized queries, hashed passwords, JWT auth
  • Developer experience - Clear patterns for extending the framework

The framework demonstrates how LLMs can bridge the gap between natural language and structured database operations, enabling conversational interfaces that feel intuitive to users.