====== LLM Bot Framework - Complete Technical Documentation ======
===== Introduction =====
The LLM Bot Framework is a template for building Discord bots powered by Large Language Models (LLMs) with natural language command parsing. It provides a complete architecture for creating domain-specific bots that can understand and execute user commands expressed in natural language.
The framework demonstrates a modular, layered architecture that separates concerns cleanly:
- AI Layer - Natural language parsing via LLM
- Bot Layer - Discord client with session management
- API Layer - RESTful endpoints with JWT authentication
- Core Layer - Database, authentication, notifications
- Scheduler Layer - Background task processing
===== Architecture Overview =====
==== The Big Picture ====
At its core, the framework implements a request-response flow that transforms natural language into structured actions:
flowchart TD
A["User Message (Discord DM)"] --> B["bot/bot.py"]
B --> C["ai/parser.py (LLM)"]
C --> D["Structured JSON"]
D --> E["command_registry"]
E --> F["Domain Handler"]
F --> G["API Request (HTTP)"]
G --> H["api/main.py (Flask)"]
H --> I["core/ modules"]
I --> J[("PostgreSQL")]
J -.-> I
I -.-> H
H -.-> G
G -.-> F
F -.-> B
B -.-> A
==== Global Architecture ====
flowchart TB
subgraph external ["External Services"]
DISCORD["Discord API"]
OPENROUTER["OpenRouter API"]
end
subgraph docker ["Docker Compose"]
subgraph bot_svc ["bot service"]
BOT["bot/bot.py"]
REG["command_registry.py"]
CMDS["commands/example.py"]
end
subgraph app_svc ["app service (port 8080)"]
FLASK["api/main.py (Flask)"]
ROUTES["api/routes/example.py"]
end
subgraph core_svc ["core/"]
AUTH["auth.py"]
USERS["users.py"]
NOTIF["notifications.py"]
PG["postgres.py"]
end
subgraph ai_svc ["ai/"]
PARSER["parser.py"]
CONFIG["ai_config.json"]
end
subgraph sched_svc ["scheduler service"]
DAEMON["daemon.py"]
end
DB[("PostgreSQL")]
end
DISCORD <--> BOT
BOT --> PARSER
PARSER --> OPENROUTER
PARSER --> CONFIG
BOT --> REG
REG --> CMDS
CMDS --> PARSER
BOT -- "HTTP" --> FLASK
FLASK --> ROUTES
FLASK --> AUTH
FLASK --> USERS
ROUTES --> AUTH
ROUTES --> PG
AUTH --> USERS
AUTH --> PG
USERS --> PG
NOTIF --> PG
PG --> DB
DAEMON --> PG
==== Docker Service Orchestration ====
flowchart LR
DB[("db\nPostgreSQL:16\nport 5432")] -- "healthcheck:\npg_isready" --> DB
DB -- "service_healthy" --> APP["app\nFlask API\nport 8080:5000"]
DB -- "service_healthy" --> SCHED["scheduler\ndaemon.py"]
APP -- "service_started" --> BOT["bot\nbot.bot"]
ENV[".env\n(DB_PASS)"] -.-> DB
CENV["config/.env\n(all vars)"] -.-> APP
CENV -.-> BOT
CENV -.-> SCHED
===== Core Layer =====
==== core/postgres.py - Generic PostgreSQL CRUD ====
This module provides a database abstraction layer that eliminates the need to write raw SQL for common operations. It uses parameterized queries throughout to prevent SQL injection.
=== Configuration ===
Connection settings are pulled from environment variables:
^ Variable ^ Default ^ Description ^
| DB_HOST | localhost | PostgreSQL server hostname |
| DB_PORT | 5432 | PostgreSQL server port |
| DB_NAME | app | Database name |
| DB_USER | app | Database user |
| DB_PASS | (empty) | Database password |
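For local development these variables typically live in ''config/.env''. A minimal sketch (the values are illustrative placeholders, not shipped defaults; ''db'' assumes the Compose service name from the orchestration diagram above):

```shell
# config/.env - example values only; adjust for your deployment
DB_HOST=db          # inside Docker Compose, the service name, not localhost
DB_PORT=5432
DB_NAME=app
DB_USER=app
DB_PASS=change-me
```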
=== Internal Functions ===
_get_config() - Returns a dictionary of connection parameters
def _get_config():
    return {
        "host": os.environ.get("DB_HOST", "localhost"),
        "port": int(os.environ.get("DB_PORT", 5432)),
        "dbname": os.environ.get("DB_NAME", "app"),
        "user": os.environ.get("DB_USER", "app"),
        "password": os.environ.get("DB_PASS", ""),
    }
This function centralizes database configuration, making it easy to test with different databases or override settings.
_safe_id(name) - Validates and escapes SQL identifiers
def _safe_id(name):
    if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", name):
        raise ValueError(f"Invalid SQL identifier: {name}")
    return f'"{name}"'
Critical for security. This function:
- Validates that the identifier contains only alphanumeric characters and underscores
- Requires the identifier to start with a letter or underscore
- Wraps the identifier in double quotes for PostgreSQL
This prevents SQL injection through table or column names. Without this, a malicious input like ''users; DROP TABLE users;--'' could execute destructive commands.
_build_where(where, prefix="") - Constructs WHERE clauses from dictionaries
def _build_where(where, prefix=""):
    clauses = []
    params = {}
    for i, (col, val) in enumerate(where.items()):
        param_name = f"{prefix}{col}_{i}"
        safe_col = _safe_id(col)
        # ... handles various conditions
This function transforms Python dictionaries into SQL WHERE clauses:
Simple equality:
{"username": "john"}
→ WHERE "username" = %(username_0)s
Comparison operators:
{"age": (">", 18)}
→ WHERE "age" > %(age_0)s
Supported operators: =, !=, <, >, <=, >=, LIKE, ILIKE, IN, IS, IS NOT
IN clause:
{"status": ("IN", ["active", "pending"])}
→ WHERE "status" IN (%(status_0_0)s, %(status_0_1)s)
NULL checks:
{"deleted_at": None}
→ WHERE "deleted_at" IS NULL
{"deleted_at": ("IS NOT", None)}
→ WHERE "deleted_at" IS NOT NULL
The ''prefix'' parameter prevents parameter name collisions when the same column appears in multiple parts of a query (e.g., in both SET and WHERE clauses).
=== Connection Management ===
flowchart TD
CRUD["CRUD function\n(insert/select/update/delete)"] --> GC["get_cursor(dict_cursor)"]
GC --> GCONN["get_connection()"]
GCONN --> CONNECT["psycopg2.connect(**_get_config())"]
CONNECT --> YIELD_CONN["yield conn"]
YIELD_CONN --> CURSOR["conn.cursor(RealDictCursor)"]
CURSOR --> YIELD_CUR["yield cursor"]
YIELD_CUR --> EXEC["cur.execute(query, params)"]
EXEC --> SUCCESS{"Success?"}
SUCCESS -- "Yes" --> COMMIT["conn.commit()"]
SUCCESS -- "Exception" --> ROLLBACK["conn.rollback()"]
COMMIT --> CLOSE["conn.close()"]
ROLLBACK --> CLOSE
get_connection() - Context manager for database connections
@contextmanager
def get_connection():
    conn = psycopg2.connect(**_get_config())
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
This context manager ensures:
- Automatic connection opening when entering the context
- Automatic commit on successful completion
- Automatic rollback on any exception
- Guaranteed cleanup - connection always closes
get_cursor(dict_cursor=True) - Context manager for cursors
@contextmanager
def get_cursor(dict_cursor=True):
    with get_connection() as conn:
        factory = psycopg2.extras.RealDictCursor if dict_cursor else None
        cur = conn.cursor(cursor_factory=factory)
        try:
            yield cur
        finally:
            cur.close()
By default, returns a ''RealDictCursor'' which returns rows as dictionaries instead of tuples, making results easier to work with:
# With dict cursor:
{"id": "abc123", "username": "john"}
# Without dict cursor:
("abc123", "john")
=== CRUD Operations ===
insert(table, data) - Insert a single row
def insert(table, data):
    columns = list(data.keys())
    placeholders = [f"%({col})s" for col in columns]
    safe_cols = [_safe_id(c) for c in columns]
    query = f"""
        INSERT INTO {_safe_id(table)}
        ({", ".join(safe_cols)})
        VALUES ({", ".join(placeholders)})
        RETURNING *
    """
    with get_cursor() as cur:
        cur.execute(query, data)
        return dict(cur.fetchone()) if cur.rowcount else None
Example usage:
user = postgres.insert("users", {
    "id": str(uuid.uuid4()),
    "username": "john",
    "password_hashed": hashed_pw
})
# Returns the inserted row with all fields
The ''RETURNING *'' clause returns the complete inserted row, including any auto-generated fields like ''created_at''.
select(table, where=None, order_by=None, limit=None, offset=None) - Query rows
def select(table, where=None, order_by=None, limit=None, offset=None):
    query = f"SELECT * FROM {_safe_id(table)}"
    params = {}
    if where:
        clauses, params = _build_where(where)
        query += f" WHERE {clauses}"
    if order_by:
        if isinstance(order_by, list):
            order_by = ", ".join(order_by)
        query += f" ORDER BY {order_by}"
    if limit is not None:
        query += f" LIMIT {int(limit)}"
    if offset is not None:
        query += f" OFFSET {int(offset)}"
    with get_cursor() as cur:
        cur.execute(query, params)
        return [dict(row) for row in cur.fetchall()]
Examples:
# Get all users
all_users = postgres.select("users")

# Get users with filtering
active_users = postgres.select("users",
    where={"status": "active"},
    order_by="created_at DESC",
    limit=10
)

# Complex filtering
adults = postgres.select("users",
    where={"age": (">=", 18), "status": "active"}
)
select_one(table, where) - Query a single row
def select_one(table, where):
    results = select(table, where=where, limit=1)
    return results[0] if results else None
Convenience method that returns ''None'' if no row found, instead of an empty list:
user = postgres.select_one("users", {"username": "john"})
if user:
    print(user["id"])
update(table, data, where) - Update rows
def update(table, data, where):
    set_columns = list(data.keys())
    set_clause = ", ".join(f"{_safe_id(col)} = %(set_{col})s" for col in set_columns)
    params = {f"set_{col}": val for col, val in data.items()}
    where_clause, where_params = _build_where(where, prefix="where_")
    params.update(where_params)
    query = f"""
        UPDATE {_safe_id(table)}
        SET {set_clause}
        WHERE {where_clause}
        RETURNING *
    """
    with get_cursor() as cur:
        cur.execute(query, params)
        return [dict(row) for row in cur.fetchall()]
The ''prefix'' parameter prevents parameter name collisions. Example:
updated = postgres.update(
    "users",
    {"status": "inactive"},
    {"id": user_uuid}
)
Returns all updated rows (useful when updating multiple rows).
delete(table, where) - Delete rows
def delete(table, where):
    where_clause, params = _build_where(where)
    query = f"""
        DELETE FROM {_safe_id(table)}
        WHERE {where_clause}
        RETURNING *
    """
    with get_cursor() as cur:
        cur.execute(query, params)
        return [dict(row) for row in cur.fetchall()]
Returns deleted rows for confirmation/auditing:
deleted = postgres.delete("users", {"id": user_uuid})
print(f"Deleted {len(deleted)} user(s)")
=== Utility Functions ===
count(table, where=None) - Count rows
def count(table, where=None):
    query = f"SELECT COUNT(*) as count FROM {_safe_id(table)}"
    params = {}
    if where:
        clauses, params = _build_where(where)
        query += f" WHERE {clauses}"
    with get_cursor() as cur:
        cur.execute(query, params)
        return cur.fetchone()["count"]
exists(table, where) - Check if rows exist
def exists(table, where):
    return count(table, where) > 0
upsert(table, data, conflict_columns) - Insert or update
def upsert(table, data, conflict_columns):
    columns = list(data.keys())
    placeholders = [f"%({col})s" for col in columns]
    safe_cols = [_safe_id(c) for c in columns]
    conflict_cols = [_safe_id(c) for c in conflict_columns]
    update_cols = [c for c in columns if c not in conflict_columns]
    update_clause = ", ".join(
        f"{_safe_id(c)} = EXCLUDED.{_safe_id(c)}" for c in update_cols
    )
    query = f"""
        INSERT INTO {_safe_id(table)}
        ({", ".join(safe_cols)})
        VALUES ({", ".join(placeholders)})
        ON CONFLICT ({", ".join(conflict_cols)})
        DO UPDATE SET {update_clause}
        RETURNING *
    """
Example: Create or update user settings
settings = postgres.upsert(
    "notifications",
    {
        "user_uuid": user_uuid,
        "discord_enabled": True,
        "discord_webhook": "https://..."
    },
    conflict_columns=["user_uuid"]
)
insert_many(table, rows) - Bulk insert
def insert_many(table, rows):
    if not rows:
        return 0
    columns = list(rows[0].keys())
    safe_cols = [_safe_id(c) for c in columns]
    query = f"""
        INSERT INTO {_safe_id(table)}
        ({", ".join(safe_cols)})
        VALUES %s
    """
    template = f"({', '.join(f'%({col})s' for col in columns)})"
    with get_cursor() as cur:
        psycopg2.extras.execute_values(
            cur, query, rows, template=template, page_size=100
        )
        return cur.rowcount
Uses ''execute_values'' for efficient bulk inserts (up to 100 rows per batch):
rows = [
    {"id": str(uuid.uuid4()), "name": "Task 1"},
    {"id": str(uuid.uuid4()), "name": "Task 2"},
    {"id": str(uuid.uuid4()), "name": "Task 3"},
]
count = postgres.insert_many("tasks", rows)
execute(query, params=None) - Execute raw SQL
def execute(query, params=None):
    with get_cursor() as cur:
        cur.execute(query, params or {})
        if cur.description:
            return [dict(row) for row in cur.fetchall()]
        return cur.rowcount
For complex queries that don't fit the CRUD pattern:
results = postgres.execute("""
    SELECT u.username, COUNT(t.id) as task_count
    FROM users u
    LEFT JOIN tasks t ON t.user_uuid = u.id
    GROUP BY u.username
    HAVING COUNT(t.id) > %(min_count)s
""", {"min_count": 5})
table_exists(table) - Check if table exists
get_columns(table) - Get table schema information
==== core/auth.py - JWT Authentication ====
This module handles user authentication using JWT (JSON Web Tokens) with bcrypt password hashing.
=== Token Management ===
flowchart LR
subgraph getLoginToken
A["username, password"] --> B["users.getUserUUID()"]
B --> C["getUserpasswordHash()"]
C --> D["bcrypt.checkpw()"]
D -- "match" --> E["jwt.encode(payload)"]
D -- "no match" --> F["return False"]
E --> G["return JWT token"]
end
subgraph verifyLoginToken
H["token, username/userUUID"] --> I{"username\nprovided?"}
I -- "Yes" --> J["users.getUserUUID()"]
J --> K["jwt.decode()"]
I -- "No" --> K
K -- "valid" --> L{"sub == userUUID?"}
L -- "Yes" --> M["return True"]
L -- "No" --> N["return False"]
K -- "expired/invalid" --> N
end
getLoginToken(username, password) - Generate JWT on successful login
def getLoginToken(username, password):
    userUUID = users.getUserUUID(username)
    if userUUID:
        formatted_pass = password.encode("utf-8")
        users_hashed_pw = getUserpasswordHash(userUUID)
        if bcrypt.checkpw(formatted_pass, users_hashed_pw):
            payload = {
                "sub": userUUID,
                "name": users.getUserFirstName(userUUID),
                "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1),
            }
            return jwt.encode(payload, os.getenv("JWT_SECRET"), algorithm="HS256")
    return False
The JWT payload contains:
- sub (subject) - User's UUID, used for identification
- name - User's display name
- exp (expiration) - Token expires in 1 hour
verifyLoginToken(login_token, username=False, userUUID=False) - Validate JWT
def verifyLoginToken(login_token, username=False, userUUID=False):
    if username:
        userUUID = users.getUserUUID(username)
    if userUUID:
        try:
            decoded_token = jwt.decode(
                login_token, os.getenv("JWT_SECRET"), algorithms=["HS256"]
            )
            if decoded_token.get("sub") == str(userUUID):
                return True
            return False
        except (ExpiredSignatureError, InvalidTokenError):
            return False
    return False
Validates that:
- The token is properly signed
- The token hasn't expired
- The token belongs to the claimed user
=== Password Management ===
getUserpasswordHash(userUUID) - Retrieve stored password hash
def getUserpasswordHash(userUUID):
    user = postgres.select_one("users", {"id": userUUID})
    if user:
        pw_hash = user.get("password_hashed")
        if isinstance(pw_hash, memoryview):
            return bytes(pw_hash)
        return pw_hash
    return None
Handles the case where PostgreSQL returns ''BYTEA'' as a ''memoryview'' object.
flowchart LR
A["unregisterUser(userUUID, password)"] --> B["getUserpasswordHash()"]
B --> C["postgres.select_one('users')"]
C --> D{"hash found?"}
D -- "No" --> E["return False"]
D -- "Yes" --> F["bcrypt.checkpw()"]
F -- "match" --> G["users.deleteUser()"]
F -- "no match" --> E
G --> H["postgres.delete('users')"]
unregisterUser(userUUID, password) - Delete user account with password confirmation
def unregisterUser(userUUID, password):
    pw_hash = getUserpasswordHash(userUUID)
    if not pw_hash:
        return False
    if bcrypt.checkpw(password.encode("utf-8"), pw_hash):
        return users.deleteUser(userUUID)
    return False
Requires password re-entry to prevent unauthorized account deletion.
==== core/users.py - User Management ====
This module provides CRUD operations for users with validation and security considerations.
=== Query Functions ===
getUserUUID(username) - Get UUID from username
def getUserUUID(username):
    userRecord = postgres.select_one("users", {"username": username})
    if userRecord:
        return userRecord["id"]
    return False
getUserFirstName(userUUID) - Get user's display name
def getUserFirstName(userUUID):
    userRecord = postgres.select_one("users", {"id": userUUID})
    if userRecord:
        return userRecord.get("username")
    return None
isUsernameAvailable(username) - Check username uniqueness
def isUsernameAvailable(username):
    return not postgres.exists("users", {"username": username})
doesUserUUIDExist(userUUID) - Verify UUID exists
def doesUserUUIDExist(userUUID):
    return postgres.exists("users", {"id": userUUID})
=== Mutation Functions ===
flowchart TD
REG["registerUser(username, password)"] --> AVAIL["isUsernameAvailable()"]
AVAIL --> EXISTS["postgres.exists('users')"]
EXISTS -- "taken" --> RET_F["return False"]
EXISTS -- "available" --> HASH["bcrypt.hashpw(password, gensalt())"]
HASH --> CREATE["createUser(user_data)"]
CREATE --> VALIDATE["validateUser()"]
VALIDATE -- "invalid" --> RAISE["raise ValueError"]
VALIDATE -- "valid" --> INSERT["postgres.insert('users')"]
INSERT --> RET_T["return True"]
UPD["updateUser(userUUID, data)"] --> LOOKUP["postgres.select_one('users')"]
LOOKUP -- "not found" --> RET_F2["return False"]
LOOKUP -- "found" --> FILTER["filter blocked fields\n(id, password_hashed, created_at)"]
FILTER --> UPD_DB["postgres.update('users')"]
DEL["deleteUser(userUUID)"] --> LOOKUP2["postgres.select_one('users')"]
LOOKUP2 -- "not found" --> RET_F3["return False"]
LOOKUP2 -- "found" --> DEL_DB["postgres.delete('users')"]
registerUser(username, password, data=None) - Create new user
def registerUser(username, password, data=None):
    if isUsernameAvailable(username):
        hashed_pass = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
        user_data = {
            "id": str(uuid.uuid4()),
            "username": username,
            "password_hashed": hashed_pass,
        }
        if data:
            user_data.update(data)
        createUser(user_data)
        return True
    return False
Uses ''bcrypt.gensalt()'' to generate a unique salt for each password.
updateUser(userUUID, data_dict) - Update user fields
def updateUser(userUUID, data_dict):
    user = postgres.select_one("users", {"id": userUUID})
    if not user:
        return False
    blocked = {"id", "password_hashed", "created_at"}
    allowed = set(user.keys()) - blocked
    updates = {k: v for k, v in data_dict.items() if k in allowed}
    if not updates:
        return False
    postgres.update("users", updates, {"id": userUUID})
    return True
Blocked fields prevent modification of:
- ''id'' - Primary key should never change
- ''password_hashed'' - Use ''changePassword()'' instead
- ''created_at'' - Audit field should be immutable
changePassword(userUUID, new_password) - Securely update password
def changePassword(userUUID, new_password):
    user = postgres.select_one("users", {"id": userUUID})
    if not user:
        return False
    hashed = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt())
    postgres.update("users", {"password_hashed": hashed}, {"id": userUUID})
    return True
deleteUser(userUUID) - Remove user record
def deleteUser(userUUID):
    user = postgres.select_one("users", {"id": userUUID})
    if not user:
        return False
    postgres.delete("users", {"id": userUUID})
    return True
=== Internal Functions ===
createUser(data_dict) - Internal user creation with validation
def createUser(data_dict):
    user_schema = {
        "id": None,
        "username": None,
        "password_hashed": None,
        "created_at": None,
    }
    for key in user_schema:
        if key in data_dict:
            user_schema[key] = data_dict[key]
    is_valid, errors = validateUser(user_schema)
    if not is_valid:
        raise ValueError(f"Invalid user data: {', '.join(errors)}")
    postgres.insert("users", user_schema)
validateUser(user) - Ensure required fields present
def validateUser(user):
    required = ["id", "username", "password_hashed"]
    missing = [f for f in required if f not in user or user[f] is None]
    if missing:
        return False, missing
    return True, []
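Exercising the validator directly shows that a ''None'' value counts as missing, not just an absent key (self-contained copy of the function above, with hypothetical input):

```python
def validateUser(user):
    required = ["id", "username", "password_hashed"]
    # A field is "missing" when absent OR present but None
    missing = [f for f in required if f not in user or user[f] is None]
    if missing:
        return False, missing
    return True, []

ok, errors = validateUser({"id": "abc", "username": "john", "password_hashed": None})
print(ok, errors)  # False ['password_hashed']
```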
==== core/notifications.py - Multi-Channel Notifications ====
This module provides notification routing to multiple channels (Discord webhooks, ntfy).
flowchart TD
SEND["_sendToEnabledChannels(settings, message)"]
SEND --> CHK_D{"discord_enabled\nand webhook set?"}
CHK_D -- "Yes" --> DISC["discord.send(webhook_url, message)"]
CHK_D -- "No" --> CHK_N
DISC --> CHK_N{"ntfy_enabled\nand topic set?"}
CHK_N -- "Yes" --> NTFY["ntfy.send(topic, message)"]
CHK_N -- "No" --> RESULT
NTFY --> RESULT["return True if any succeeded"]
DISC -- "POST webhook_url\n{content: message}" --> DISCORD_API["Discord Webhook\n(expects 204)"]
NTFY -- "POST ntfy.sh/topic\nmessage body" --> NTFY_API["ntfy.sh\n(expects 200)"]
GET["getNotificationSettings(userUUID)"] --> DB_SEL["postgres.select_one('notifications')"]
SET["setNotificationSettings(userUUID, data)"] --> DB_CHK{"existing\nrecord?"}
DB_CHK -- "Yes" --> DB_UPD["postgres.update('notifications')"]
DB_CHK -- "No" --> DB_INS["postgres.insert('notifications')"]
=== Notification Channels ===
discord.send(webhook_url, message) - Send via Discord webhook
class discord:
    @staticmethod
    def send(webhook_url, message):
        try:
            response = requests.post(webhook_url, json={"content": message})
            return response.status_code == 204
        except requests.RequestException:
            return False
Discord webhooks return ''204 No Content'' on success.
ntfy.send(topic, message) - Send via ntfy.sh
class ntfy:
    @staticmethod
    def send(topic, message):
        try:
            response = requests.post(
                f"https://ntfy.sh/{topic}", data=message.encode("utf-8")
            )
            return response.status_code == 200
        except requests.RequestException:
            return False
ntfy.sh is a free push notification service. Users subscribe to topics.
=== Settings Management ===
getNotificationSettings(userUUID) - Retrieve user notification config
def getNotificationSettings(userUUID):
    settings = postgres.select_one("notifications", {"user_uuid": userUUID})
    if not settings:
        return False
    return settings
setNotificationSettings(userUUID, data_dict) - Update notification config
def setNotificationSettings(userUUID, data_dict):
    existing = postgres.select_one("notifications", {"user_uuid": userUUID})
    allowed = [
        "discord_webhook",
        "discord_enabled",
        "ntfy_topic",
        "ntfy_enabled",
    ]
    updates = {k: v for k, v in data_dict.items() if k in allowed}
    if not updates:
        return False
    if existing:
        postgres.update("notifications", updates, {"user_uuid": userUUID})
    else:
        updates["id"] = str(uuid.uuid4())
        updates["user_uuid"] = userUUID
        postgres.insert("notifications", updates)
    return True
Implements an upsert pattern - updates if exists, inserts if not.
_sendToEnabledChannels(notif_settings, message) - Route to all enabled channels
def _sendToEnabledChannels(notif_settings, message):
    sent = False
    if notif_settings.get("discord_enabled") and notif_settings.get("discord_webhook"):
        if discord.send(notif_settings["discord_webhook"], message):
            sent = True
    if notif_settings.get("ntfy_enabled") and notif_settings.get("ntfy_topic"):
        if ntfy.send(notif_settings["ntfy_topic"], message):
            sent = True
    return sent
Returns ''True'' if any channel succeeded, allowing partial failures.
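The routing logic is easy to verify in isolation by substituting stub senders for the real HTTP channels. A sketch; ''StubChannel'', ''send_to_enabled_channels'', and the webhook URL are illustrative, not part of the framework:

```python
class StubChannel:
    """Stand-in for the discord/ntfy classes: records sends instead of doing HTTP."""
    def __init__(self, succeed=True):
        self.succeed = succeed
        self.sent = []

    def send(self, target, message):
        self.sent.append((target, message))
        return self.succeed

def send_to_enabled_channels(settings, message, discord, ntfy):
    # Same shape as _sendToEnabledChannels, with channels injected for testing
    sent = False
    if settings.get("discord_enabled") and settings.get("discord_webhook"):
        if discord.send(settings["discord_webhook"], message):
            sent = True
    if settings.get("ntfy_enabled") and settings.get("ntfy_topic"):
        if ntfy.send(settings["ntfy_topic"], message):
            sent = True
    return sent

discord_stub = StubChannel()
ntfy_stub = StubChannel(succeed=False)  # simulate one channel failing
settings = {
    "discord_enabled": True, "discord_webhook": "https://discord.example/webhook",
    "ntfy_enabled": True, "ntfy_topic": "alerts",
}
result = send_to_enabled_channels(settings, "task due", discord_stub, ntfy_stub)
print(result)  # True - Discord succeeded even though ntfy failed
```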
===== AI Layer =====
==== ai/parser.py - LLM-Powered JSON Parser ====
This is the heart of the natural language interface. It transforms user messages into structured JSON using an LLM, with automatic retry and validation.
=== Configuration Loading ===
CONFIG_PATH = os.environ.get(
    "AI_CONFIG_PATH", os.path.join(os.path.dirname(__file__), "ai_config.json")
)
with open(CONFIG_PATH, "r") as f:
    AI_CONFIG = json.load(f)
Configuration is loaded once at module import time.
=== OpenAI Client Initialization ===
client = OpenAI(
    api_key=os.getenv("OPENROUTER_API_KEY"),
    base_url=os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1"),
)
Uses OpenRouter for model flexibility - can use any OpenAI-compatible model including:
- OpenAI models (GPT-4, GPT-3.5)
- Anthropic models (Claude)
- Open-source models (Llama, Qwen, Mistral)
=== Internal Functions ===
_extract_json_from_text(text) - Extract JSON from reasoning model output
def _extract_json_from_text(text):
    match = re.search(r"```json\s*(\{.*?\})\s*```", text, re.DOTALL)
    if match:
        return match.group(1)
    match = re.search(r"(\{[^{}]*\})", text, re.DOTALL)
    if match:
        return match.group(1)
    return None
Some reasoning models (like Qwen with thinking) output reasoning before the JSON. This extracts:
- JSON inside markdown code blocks: ''%%```json {...}```%%''
- First JSON object in text: ''{...}''
_call_llm(system_prompt, user_prompt) - Execute LLM request
def _call_llm(system_prompt, user_prompt):
    try:
        response = client.chat.completions.create(
            model=AI_CONFIG["model"],
            max_tokens=AI_CONFIG.get("max_tokens", 8192),
            timeout=AI_CONFIG["validation"]["timeout_seconds"],
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
        )
        msg = response.choices[0].message
        text = msg.content.strip() if msg.content else ""
        if text:
            return text
        reasoning = getattr(msg, "reasoning", None)
        if reasoning:
            extracted = _extract_json_from_text(reasoning)
            if extracted:
                return extracted
        return None
    except Exception as e:
        print(f"LLM error: {type(e).__name__}: {e}", flush=True)
        return None
Handles both:
- Standard responses - JSON in ''message.content''
- Reasoning responses - JSON extracted from ''message.reasoning''
=== Main Parsing Function ===
flowchart TD
START["parse(user_input, interaction_type)"] --> RETRY_CHK{"retry_count\n>= max_retries?"}
RETRY_CHK -- "Yes" --> ERR_MAX["return {error: 'Failed after N retries'}"]
RETRY_CHK -- "No" --> PROMPT["Build prompt from\nai_config.json template"]
PROMPT --> HISTORY["Inject last 3 conversation turns"]
HISTORY --> LLM["_call_llm(system, user_prompt)"]
LLM --> LLM_CHK{"Response\nreceived?"}
LLM_CHK -- "No" --> ERR_UNAVAIL["return {error: 'AI unavailable'}"]
LLM_CHK -- "Yes" --> JSON_PARSE["json.loads(response)"]
JSON_PARSE -- "JSONDecodeError" --> RETRY_JSON["parse(..., retry+1,\nerrors=['not valid JSON'])"]
JSON_PARSE -- "Success" --> VALIDATE{"Custom validator\nregistered?"}
VALIDATE -- "No" --> RETURN["return parsed JSON"]
VALIDATE -- "Yes" --> RUN_VAL["validator(parsed)"]
RUN_VAL --> VAL_CHK{"Validation\nerrors?"}
VAL_CHK -- "No" --> RETURN
VAL_CHK -- "Yes" --> RETRY_VAL["parse(..., retry+1,\nerrors=validation_errors)"]
parse(user_input, interaction_type, retry_count=0, errors=None, history=None)
def parse(user_input, interaction_type, retry_count=0, errors=None, history=None):
    if retry_count >= AI_CONFIG["validation"]["max_retries"]:
        return {
            "error": f"Failed to parse after {retry_count} retries",
            "user_input": user_input,
        }
Parameters:
- ''user_input'' - Raw user message
- ''interaction_type'' - Key in config prompts (e.g., ''command_parser'')
- ''retry_count'' - Internal counter for automatic retry
- ''errors'' - Previous validation errors (for retry context)
- ''history'' - List of (message, parsed_result) tuples
Retry Logic:
try:
    parsed = json.loads(response_text)
except json.JSONDecodeError:
    return parse(
        user_input,
        interaction_type,
        retry_count + 1,
        ["Response was not valid JSON"],
        history=history,
    )
When JSON parsing fails, the function recursively calls itself with incremented retry count, passing the error to give the LLM context to fix its output.
Validation Integration:
validator = AI_CONFIG["validation"].get("validators", {}).get(interaction_type)
if validator:
    validation_errors = validator(parsed)
    if validation_errors:
        return parse(
            user_input,
            interaction_type,
            retry_count + 1,
            validation_errors,
            history=history,
        )
Custom validators are called after successful JSON parsing. If validation fails, the parser retries with the validation errors as context.
Conversation History:
history_context = "No previous context"
if history and len(history) > 0:
    history_lines = []
    for i, (msg, result) in enumerate(history[-3:]):
        history_lines.append(f"{i + 1}. User: {msg}")
        if isinstance(result, dict) and not result.get("error"):
            history_lines.append(f"   Parsed: {json.dumps(result)}")
        else:
            history_lines.append(f"   Parsed: {result}")
    history_context = "\n".join(history_lines)
The last 3 conversation turns are included for context, enabling:
- Pronoun resolution ("Add it to my list")
- Follow-up commands ("Change the second one")
- Clarification handling
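A standalone sketch of the same formatting logic shows what the LLM actually sees (''build_history_context'' and the sample turns are illustrative names, not framework API):

```python
import json

def build_history_context(history):
    # Mirrors the parser's history formatting: last 3 turns, numbered,
    # successful parses serialized as JSON for the prompt
    if not history:
        return "No previous context"
    lines = []
    for i, (msg, result) in enumerate(history[-3:]):
        lines.append(f"{i + 1}. User: {msg}")
        if isinstance(result, dict) and not result.get("error"):
            lines.append(f"   Parsed: {json.dumps(result)}")
        else:
            lines.append(f"   Parsed: {result}")
    return "\n".join(lines)

history = [
    ("add milk to my list", {"action": "add", "item": "milk"}),
    ("remove it", {"action": "remove", "item": "milk"}),
]
print(build_history_context(history))
```

Seeing the prior parsed JSON is what lets the model resolve "it" in the second turn to "milk".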
=== Validator Registration ===
register_validator(interaction_type, validator_fn)
def register_validator(interaction_type, validator_fn):
    if "validators" not in AI_CONFIG["validation"]:
        AI_CONFIG["validation"]["validators"] = {}
    AI_CONFIG["validation"]["validators"][interaction_type] = validator_fn
Domain modules register their validators:
def validate_example_json(data):
    errors = []
    if "action" not in data:
        errors.append("Missing required field: action")
    return errors

ai_parser.register_validator("example", validate_example_json)
==== ai/ai_config.json - AI Configuration ====
{
    "model": "qwen/qwen3-next-80b-a3b-thinking:nitro",
    "max_tokens": 8192,
    "prompts": {
        "command_parser": {
            "system": "...",
            "user_template": "..."
        }
    },
    "validation": {
        "max_retries": 3,
        "timeout_seconds": 15,
        "validators": {}
    }
}
Configuration Fields:
^ Field ^ Purpose ^
| model | OpenRouter model identifier |
| max_tokens | Maximum response length |
| prompts | Prompt templates by interaction type |
| validation.max_retries | Retry attempts on failure |
| validation.timeout_seconds | LLM request timeout |
| validation.validators | Runtime-registered validators |
Prompt Structure:
Each prompt has:
- ''system'' - System prompt defining the AI's role
- ''user_template'' - Template with ''{user_input}'' and ''{history_context}'' placeholders
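The ''{user_input}''/''{history_context}'' syntax suggests plain ''str.format'' substitution; a sketch under that assumption, with a hypothetical template (the real prompt text lives in ''ai_config.json''):

```python
# Hypothetical user_template - the shipped config elides the real prompt text
user_template = (
    "Conversation so far:\n{history_context}\n\n"
    "Parse this message into JSON:\n{user_input}"
)

prompt = user_template.format(
    history_context="No previous context",
    user_input="add milk to my shopping list",
)
print(prompt)
```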
===== Bot Layer =====
==== bot/bot.py - Discord Client ====
This is the Discord bot client that manages user sessions and routes commands.
=== Global State ===
user_sessions = {} # discord_id → {token, user_uuid, username}
login_state = {} # discord_id → {step, username}
message_history = {} # discord_id → [(msg, parsed), ...]
user_cache = {} # discord_id → {hashed_password, user_uuid, username}
CACHE_FILE = "/app/user_cache.pkl"
user_sessions - Active authenticated sessions
login_state - Tracks multi-step login flow
message_history - Last 5 messages per user for context
user_cache - Persisted credentials for auto-login
flowchart TD
MSG["on_message(message)"] --> SELF{"Own\nmessage?"}
SELF -- "Yes" --> IGNORE["ignore"]
SELF -- "No" --> DM{"Is DM\nchannel?"}
DM -- "No" --> IGNORE
DM -- "Yes" --> LOGIN_CHK{"In\nlogin_state?"}
LOGIN_CHK -- "Yes" --> HANDLE_LOGIN["handleLoginStep()"]
LOGIN_CHK -- "No" --> SESSION_CHK{"Has\nsession?"}
SESSION_CHK -- "No" --> START_LOGIN["Start login flow\n(ask for username)"]
SESSION_CHK -- "Yes" --> ROUTE["routeCommand()"]
ROUTE --> HELP_CHK{"'help' in\nmessage?"}
HELP_CHK -- "Yes" --> HELP["sendHelpMessage()"]
HELP_CHK -- "No" --> PARSE["ai_parser.parse()"]
PARSE --> CLARIFY{"needs_\nclarification?"}
CLARIFY -- "Yes" --> ASK["Ask user to clarify"]
CLARIFY -- "No" --> ERROR_CHK{"error in\nparsed?"}
ERROR_CHK -- "Yes" --> SHOW_ERR["Show error"]
ERROR_CHK -- "No" --> HANDLER["get_handler(interaction_type)"]
HANDLER -- "found" --> EXEC["handler(message, session, parsed)"]
HANDLER -- "not found" --> UNKNOWN["'Unknown command type'"]
=== Discord Client Setup ===
intents = discord.Intents.default()
intents.message_content = True
client = discord.Client(intents=intents)
''message_content'' intent is required to read message text.
=== Utility Functions ===
decodeJwtPayload(token) - Decode JWT without verification
def decodeJwtPayload(token):
    payload = token.split(".")[1]
    payload += "=" * (-len(payload) % 4)  # restore base64url padding
    return json.loads(base64.urlsafe_b64decode(payload))
Used to extract ''user_uuid'' from tokens. Note: This does not verify the signature; verification happens server-side.
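The decode step can be checked with a hand-built (unsigned) token. A self-contained sketch; ''encode_segment'' and the sample claims are illustrative:

```python
import base64
import json

def decode_jwt_payload(token):
    # Same idea as decodeJwtPayload: take the middle segment, re-pad, decode
    payload = token.split(".")[1]
    payload += "=" * (-len(payload) % 4)  # JWT segments drop base64url padding
    return json.loads(base64.urlsafe_b64decode(payload))

def encode_segment(obj):
    # Build one base64url segment the way JWT libraries do (padding stripped)
    raw = base64.urlsafe_b64encode(json.dumps(obj).encode())
    return raw.decode().rstrip("=")

token = ".".join([
    encode_segment({"alg": "HS256", "typ": "JWT"}),
    encode_segment({"sub": "abc-123", "name": "john"}),
    "fake-signature",  # never checked here - decoding is not verifying
])
print(decode_jwt_payload(token)["sub"])  # abc-123
```

This is exactly why the bot can read ''sub'' locally while still relying on the API to reject forged tokens.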
apiRequest(method, endpoint, token=None, data=None) - Make HTTP requests to API
def apiRequest(method, endpoint, token=None, data=None):
    url = f"{API_URL}{endpoint}"
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    try:
        resp = getattr(requests, method)(url, headers=headers, json=data, timeout=10)
        try:
            return resp.json(), resp.status_code
        except ValueError:
            return {}, resp.status_code
    except requests.RequestException:
        return {"error": "API unavailable"}, 503
Returns a tuple of (response_data, status_code) for easy handling.
=== Cache Management ===
loadCache() - Load persisted user credentials
def loadCache():
    try:
        if os.path.exists(CACHE_FILE):
            with open(CACHE_FILE, "rb") as f:
                global user_cache
                user_cache = pickle.load(f)
            print(f"Loaded cache for {len(user_cache)} users")
    except Exception as e:
        print(f"Error loading cache: {e}")
saveCache() - Persist user credentials
def saveCache():
try:
with open(CACHE_FILE, "wb") as f:
pickle.dump(user_cache, f)
except Exception as e:
print(f"Error saving cache: {e}")
Why cache credentials?
- Users don't need to re-login every session
- Passwords are hashed locally for verification
- New tokens are fetched automatically
hashPassword() / verifyPassword() - Local password handling
def hashPassword(password):
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def verifyPassword(password, hashed):
return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8"))
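The framework uses bcrypt, but the hash-then-verify round trip can be illustrated dependency-free with the standard library's PBKDF2 (a sketch of the pattern, not the actual implementation):

```python
import hashlib
import hmac
import os

def hash_password(password: str) -> str:
    # Random salt + slow key derivation, analogous to bcrypt.hashpw
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)
    return salt.hex() + ":" + digest.hex()

def verify_password(password: str, stored: str) -> bool:
    # Recompute with the stored salt, then compare in constant time
    salt_hex, digest_hex = stored.split(":")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), 100_000
    )
    return hmac.compare_digest(candidate.hex(), digest_hex)

stored = hash_password("hunter2")
print(verify_password("hunter2", stored))  # True
print(verify_password("wrong", stored))    # False
```

Either way, the salt travels with the hash, so verification never needs the plaintext stored anywhere.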
=== Authentication Flow ===
flowchart TD
START["negotiateToken(discord_id, username, password)"] --> CACHE{"Cached user\nfound?"}
CACHE -- "Yes" --> VERIFY["verifyPassword(password, cached.hashed_password)"]
VERIFY -- "Match" --> API_CACHED["POST /api/login"]
VERIFY -- "Mismatch" --> API_FRESH["POST /api/login"]
CACHE -- "No" --> API_FRESH
API_CACHED -- "200 + token" --> DECODE_C["decodeJwtPayload(token)"]
DECODE_C --> UPDATE_C["Update cache\n(keep existing hash)"]
UPDATE_C --> RET_OK["return (token, user_uuid)"]
API_FRESH -- "200 + token" --> DECODE_F["decodeJwtPayload(token)"]
DECODE_F --> UPDATE_F["Cache new credentials\n(hashPassword(password))"]
UPDATE_F --> RET_OK
API_CACHED -- "failure" --> RET_FAIL["return (None, None)"]
API_FRESH -- "failure" --> RET_FAIL
negotiateToken(discord_id, username, password) - Get token with caching
def negotiateToken(discord_id, username, password):
cached = getCachedUser(discord_id)
if (
cached
and cached.get("username") == username
and verifyPassword(password, cached.get("hashed_password"))
):
result, status = apiRequest(
"post", "/api/login", data={"username": username, "password": password}
)
if status == 200 and "token" in result:
token = result["token"]
payload = decodeJwtPayload(token)
user_uuid = payload["sub"]
setCachedUser(
discord_id,
{
"hashed_password": cached["hashed_password"],
"user_uuid": user_uuid,
"username": username,
},
)
return token, user_uuid
return None, None
result, status = apiRequest(
"post", "/api/login", data={"username": username, "password": password}
)
if status == 200 and "token" in result:
token = result["token"]
payload = decodeJwtPayload(token)
user_uuid = payload["sub"]
setCachedUser(
discord_id,
{
"hashed_password": hashPassword(password),
"user_uuid": user_uuid,
"username": username,
},
)
return token, user_uuid
return None, None
Flow:
- Check if user has cached credentials
- If cached password matches, fetch new token
- If no cache or mismatch, authenticate normally
- Cache credentials for future sessions
handleAuthFailure(message) - Handle expired sessions
async def handleAuthFailure(message):
discord_id = message.author.id
user_sessions.pop(discord_id, None)
await message.channel.send(
"Your session has expired. Send any message to log in again."
)
=== Login Flow ===
handleLoginStep(message) - Multi-step login process
async def handleLoginStep(message):
discord_id = message.author.id
state = login_state[discord_id]
if state["step"] == "username":
state["username"] = message.content.strip()
state["step"] = "password"
await message.channel.send("Password?")
elif state["step"] == "password":
username = state["username"]
password = message.content.strip()
del login_state[discord_id]
token, user_uuid = negotiateToken(discord_id, username, password)
if token and user_uuid:
user_sessions[discord_id] = {
"token": token,
"user_uuid": user_uuid,
"username": username,
}
registered = ", ".join(list_registered()) or "none"
await message.channel.send(
f"Welcome back **{username}**!\n\n"
f"Registered modules: {registered}\n\n"
f"Send 'help' for available commands."
)
else:
await message.channel.send(
"Invalid credentials. Send any message to try again."
)
State Machine:
stateDiagram-v2
[*] --> AskUsername: First message (no session)
AskUsername --> AskPassword: User sends username
AskPassword --> Authenticated: negotiateToken() succeeds
AskPassword --> [*]: Invalid credentials\n(send any message to retry)
Authenticated --> routeCommand: Subsequent messages
=== Command Routing ===
routeCommand(message) - Parse and route commands
async def routeCommand(message):
discord_id = message.author.id
session = user_sessions[discord_id]
user_input = message.content.lower()
if "help" in user_input or "what can i say" in user_input:
await sendHelpMessage(message)
return
async with message.channel.typing():
history = message_history.get(discord_id, [])
parsed = ai_parser.parse(message.content, "command_parser", history=history)
if discord_id not in message_history:
message_history[discord_id] = []
message_history[discord_id].append((message.content, parsed))
message_history[discord_id] = message_history[discord_id][-5:]
if "needs_clarification" in parsed:
await message.channel.send(
f"I'm not quite sure what you mean. {parsed['needs_clarification']}"
)
return
if "error" in parsed:
await message.channel.send(
f"I had trouble understanding that: {parsed['error']}"
)
return
interaction_type = parsed.get("interaction_type")
handler = get_handler(interaction_type)
if handler:
await handler(message, session, parsed)
else:
registered = ", ".join(list_registered()) or "none"
await message.channel.send(
f"Unknown command type '{interaction_type}'. Registered modules: {registered}"
)
Flow:
- Check for help request
- Show typing indicator while LLM processes
- Parse with AI, including conversation history
- Update message history (keep last 5)
- Handle clarification requests
- Handle parsing errors
- Route to appropriate handler
- Handle unknown interaction types
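The history bookkeeping in steps 3-4 is a simple bounded-list pattern, shown here in isolation (''message_history'' keyed by Discord user ID, keeping only the last five ''(input, parsed)'' pairs):

```python
message_history = {}

def record_exchange(discord_id, content, parsed, limit=5):
    # Append the (user input, parsed result) pair, then trim to the newest `limit`
    history = message_history.setdefault(discord_id, [])
    history.append((content, parsed))
    message_history[discord_id] = history[-limit:]

for i in range(8):
    record_exchange(42, f"msg {i}", {"interaction_type": "example"})

print(len(message_history[42]))   # 5
print(message_history[42][0][0])  # msg 3
```

Capping the history keeps the LLM prompt small while still giving the parser enough context to resolve follow-ups like "delete that one".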
=== Discord Event Handlers ===
on_ready() - Bot startup
@client.event
async def on_ready():
print(f"Bot logged in as {client.user}")
loadCache()
backgroundLoop.start()
on_message(message) - Message handler
@client.event
async def on_message(message):
if message.author == client.user:
return
if not isinstance(message.channel, discord.DMChannel):
return
discord_id = message.author.id
if discord_id in login_state:
await handleLoginStep(message)
return
if discord_id not in user_sessions:
login_state[discord_id] = {"step": "username"}
await message.channel.send("Welcome! Send your username to log in.")
return
await routeCommand(message)
Filters:
- Ignore own messages
- Only respond to DMs (not server channels)
=== Background Tasks ===
backgroundLoop() - Scheduled task execution
@tasks.loop(seconds=60)
async def backgroundLoop():
"""Override this in your domain module or extend as needed."""
pass
@backgroundLoop.before_loop
async def beforeBackgroundLoop():
await client.wait_until_ready()
Runs every 60 seconds. Override for domain-specific polling.
==== bot/command_registry.py - Module Registration ====
flowchart TD
subgraph "Module Registration (at import time)"
EX["commands/example.py"] -- "register_module('example', handle_example)" --> REG["COMMAND_MODULES dict"]
EX -- "register_validator('example', validate_fn)" --> VAL["AI_CONFIG validators dict"]
HABIT["commands/habits.py"] -- "register_module('habit', handle_habit)" --> REG
HABIT -- "register_validator('habit', validate_fn)" --> VAL
end
subgraph "Runtime Dispatch"
PARSED["parsed JSON\n{interaction_type: 'example'}"] --> GET["get_handler('example')"]
GET --> REG
REG --> HANDLER["handle_example(message, session, parsed)"]
end
This module provides a simple registry pattern for command handlers.
COMMAND_MODULES = {}
def register_module(interaction_type, handler):
COMMAND_MODULES[interaction_type] = handler
def get_handler(interaction_type):
return COMMAND_MODULES.get(interaction_type)
def list_registered():
return list(COMMAND_MODULES.keys())
Why a registry?
- Decouples command handling from bot logic
- Domain modules self-register
- Easy to add/remove modules without touching core code
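The registry's dispatch behaviour is easy to exercise in isolation:

```python
COMMAND_MODULES = {}

def register_module(interaction_type, handler):
    COMMAND_MODULES[interaction_type] = handler

def get_handler(interaction_type):
    return COMMAND_MODULES.get(interaction_type)

def list_registered():
    return list(COMMAND_MODULES.keys())

async def handle_example(message, session, parsed):
    pass  # placeholder handler for the demo

register_module("example", handle_example)

print(get_handler("example") is handle_example)  # True
print(get_handler("unknown"))                    # None -- bot reports unknown type
print(list_registered())                         # ['example']
```

Because ''get_handler'' returns ''None'' for unregistered types, the bot can fall back to the "Unknown command type" message without any special-casing.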
==== bot/commands/example.py - Example Command Module ====
This demonstrates the pattern for creating domain modules.
from bot.command_registry import register_module
import ai.parser as ai_parser
async def handle_example(message, session, parsed):
action = parsed.get("action", "unknown")
token = session["token"]
user_uuid = session["user_uuid"]
if action == "check":
await message.channel.send(
f"Checking example items for {session['username']}..."
)
elif action == "add":
item_name = parsed.get("item_name", "unnamed")
await message.channel.send(f"Adding example item: **{item_name}**")
else:
await message.channel.send(f"Unknown example action: {action}")
def validate_example_json(data):
errors = []
if not isinstance(data, dict):
return ["Response must be a JSON object"]
if "error" in data:
return []
if "action" not in data:
errors.append("Missing required field: action")
action = data.get("action")
if action == "add" and "item_name" not in data:
errors.append("Missing required field for add: item_name")
return errors
register_module("example", handle_example)
ai_parser.register_validator("example", validate_example_json)
Pattern:
- Define async handler function
- Define validation function
- Register both at module load time
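Exercising the validator against a few payloads shows exactly what the parser layer will reject (same function as above, reproduced so the snippet runs standalone):

```python
def validate_example_json(data):
    errors = []
    if not isinstance(data, dict):
        return ["Response must be a JSON object"]
    if "error" in data:
        return []  # error payloads pass through untouched
    if "action" not in data:
        errors.append("Missing required field: action")
    action = data.get("action")
    if action == "add" and "item_name" not in data:
        errors.append("Missing required field for add: item_name")
    return errors

print(validate_example_json({"action": "check"}))                     # []
print(validate_example_json({"action": "add"}))                       # missing item_name
print(validate_example_json({"action": "add", "item_name": "milk"}))  # []
print(validate_example_json("not json"))                              # not a dict
```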
===== API Layer =====
==== api/main.py - Flask Application ====
This is the REST API server providing endpoints for authentication, user management, and domain operations.
=== Route Registration ===
ROUTE_MODULES = []
def register_routes(module):
"""Register a routes module. Module should have a register(app) function."""
ROUTE_MODULES.append(module)
Modules are registered before app startup:
if __name__ == "__main__":
for module in ROUTE_MODULES:
if hasattr(module, "register"):
module.register(app)
app.run(host="0.0.0.0", port=5000)
=== Authentication Endpoints ===
POST /api/register - Create new user
@app.route("/api/register", methods=["POST"])
def api_register():
data = flask.request.get_json()
username = data.get("username")
password = data.get("password")
if not username or not password:
return flask.jsonify({"error": "username and password required"}), 400
result = users.registerUser(username, password, data)
if result:
return flask.jsonify({"success": True}), 201
else:
return flask.jsonify({"error": "username taken"}), 409
Response Codes:
- ''201'' - User created successfully
- ''400'' - Missing required fields
- ''409'' - Username already exists
POST /api/login - Authenticate user
@app.route("/api/login", methods=["POST"])
def api_login():
data = flask.request.get_json()
username = data.get("username")
password = data.get("password")
if not username or not password:
return flask.jsonify({"error": "username and password required"}), 400
token = auth.getLoginToken(username, password)
if token:
return flask.jsonify({"token": token}), 200
else:
return flask.jsonify({"error": "invalid credentials"}), 401
Response Codes:
- ''200'' - Returns JWT token
- ''400'' - Missing required fields
- ''401'' - Invalid credentials
=== User Endpoints ===
GET /api/getUserUUID/<username> - Get user's UUID
@app.route("/api/getUserUUID/<username>", methods=["GET"])
def api_getUserUUID(username):
header = flask.request.headers.get("Authorization", "")
if not header.startswith("Bearer "):
return flask.jsonify({"error": "missing token"}), 401
token = header[7:]
if auth.verifyLoginToken(token, username):
return flask.jsonify(users.getUserUUID(username)), 200
else:
return flask.jsonify({"error": "unauthorized"}), 401
GET /api/user/<userUUID> - Get user details
@app.route("/api/user/<userUUID>", methods=["GET"])
def api_getUser(userUUID):
# ... auth check ...
user = postgres.select_one("users", {"id": userUUID})
if user:
user.pop("password_hashed", None) # Never return password hash
return flask.jsonify(user), 200
else:
return flask.jsonify({"error": "user not found"}), 404
Security note: ''password_hashed'' is explicitly removed before returning.
PUT /api/user/<userUUID> - Update user
@app.route("/api/user/<userUUID>", methods=["PUT"])
def api_updateUser(userUUID):
# ... auth check ...
data = flask.request.get_json()
result = users.updateUser(userUUID, data)
if result:
return flask.jsonify({"success": True}), 200
else:
return flask.jsonify({"error": "no valid fields to update"}), 400
DELETE /api/user/<userUUID> - Delete user
@app.route("/api/user/<userUUID>", methods=["DELETE"])
def api_deleteUser(userUUID):
# ... auth check ...
data = flask.request.get_json()
password = data.get("password")
if not password:
return flask.jsonify(
{"error": "password required for account deletion"}
), 400
result = auth.unregisterUser(userUUID, password)
if result:
return flask.jsonify({"success": True}), 200
else:
return flask.jsonify({"error": "invalid password"}), 401
Security: Requires password confirmation for deletion.
=== Health Check ===
GET /health - Service health
@app.route("/health", methods=["GET"])
def health_check():
return flask.jsonify({"status": "ok"}), 200
Used by Docker health checks and load balancers.
==== api/routes/example.py - Example Route Module ====
def _get_user_uuid(token):
"""Decode JWT to extract user UUID. Returns None on failure."""
try:
payload = jwt.decode(token, os.getenv("JWT_SECRET"), algorithms=["HS256"])
return payload.get("sub")
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return None
def register(app):
@app.route("/api/example", methods=["GET"])
def api_listExamples():
header = flask.request.headers.get("Authorization", "")
if not header.startswith("Bearer "):
return flask.jsonify({"error": "missing token"}), 401
token = header[7:]
user_uuid = _get_user_uuid(token)
if not user_uuid or not auth.verifyLoginToken(token, userUUID=user_uuid):
return flask.jsonify({"error": "unauthorized"}), 401
items = postgres.select("examples")
return flask.jsonify(items), 200
@app.route("/api/example", methods=["POST"])
def api_addExample():
# ... similar pattern ...
data = flask.request.get_json()
item = postgres.insert("examples", data)
return flask.jsonify(item), 201
Authentication Pattern:
- Extract Bearer token from Authorization header
- Decode token to extract user UUID
- Verify token belongs to that user via ''verifyLoginToken''
- Return ''401 Unauthorized'' if invalid
- Proceed with request if valid
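Steps 1 and 2 of the pattern reduce to a small helper that can be tested on its own (a sketch; the routes above inline this logic):

```python
def extract_bearer_token(header: str):
    """Return the token from an 'Authorization: Bearer <token>' header, or None."""
    if not header.startswith("Bearer "):
        return None
    token = header[len("Bearer "):]
    return token or None  # empty token is as bad as a missing one

print(extract_bearer_token("Bearer eyJhbGciOiJIUzI1NiJ9.x.y"))  # the raw token
print(extract_bearer_token("Basic dXNlcjpwYXNz"))               # None
print(extract_bearer_token(""))                                 # None
```

Returning ''None'' for every malformed case lets route handlers collapse all failures into a single ''401'' branch.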
===== Scheduler Layer =====
==== scheduler/daemon.py - Background Task Daemon ====
flowchart TD
START["daemon_loop()"] --> POLL["poll_callback()"]
POLL --> SLEEP["time.sleep(POLL_INTERVAL)"]
SLEEP --> POLL
POLL -- "Exception" --> LOG["logger.error()"]
LOG --> SLEEP

This module provides a simple polling loop for background tasks.
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", 60))
def poll_callback():
"""
Override this function with your domain logic.
Called every POLL_INTERVAL seconds.
"""
pass
def daemon_loop():
logger.info("Scheduler daemon starting")
while True:
try:
poll_callback()
except Exception as e:
logger.error(f"Poll callback error: {e}")
time.sleep(POLL_INTERVAL)
Usage:
Override ''poll_callback()'' in your implementation:
scheduler/daemon.py (customized)
def poll_callback():
# Check for due tasks
tasks = postgres.select("tasks", where={"due_at": ("<=", datetime.now())})
for task in tasks:
settings = notifications.getNotificationSettings(task["user_uuid"])
notifications._sendToEnabledChannels(
settings,
f"Task due: {task['name']}"
)
Error Handling: Exceptions are caught and logged, allowing the daemon to continue running.
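The catch-and-continue behaviour can be demonstrated with a bounded variant of the loop (the real daemon loops forever; ''max_iterations'' exists only so this sketch terminates):

```python
import logging
import time

logger = logging.getLogger("scheduler")
POLL_INTERVAL = 0.01  # shortened for the demo

calls = []

def poll_callback():
    calls.append(len(calls))
    if len(calls) == 2:
        raise RuntimeError("simulated failure")  # second poll blows up

def daemon_loop(max_iterations):
    for _ in range(max_iterations):
        try:
            poll_callback()
        except Exception as e:
            logger.error(f"Poll callback error: {e}")  # logged, never fatal
        time.sleep(POLL_INTERVAL)

daemon_loop(4)
print(len(calls))  # 4 -- the loop survived the exception on call 2
```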
===== Configuration =====
==== config/schema.sql - Database Schema ====
erDiagram
users {
UUID id PK
VARCHAR username UK "NOT NULL"
BYTEA password_hashed "NOT NULL"
TIMESTAMP created_at "DEFAULT NOW()"
}
notifications {
UUID id PK
UUID user_uuid FK,UK "REFERENCES users(id)"
VARCHAR discord_webhook
BOOLEAN discord_enabled "DEFAULT FALSE"
VARCHAR ntfy_topic
BOOLEAN ntfy_enabled "DEFAULT FALSE"
TIMESTAMP last_message_sent
VARCHAR current_notification_status "DEFAULT 'inactive'"
TIMESTAMP created_at "DEFAULT NOW()"
TIMESTAMP updated_at "DEFAULT NOW()"
}
users ||--o| notifications : "ON DELETE CASCADE"
-- Users table (minimal)
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY,
username VARCHAR(255) UNIQUE NOT NULL,
password_hashed BYTEA NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Notifications table
CREATE TABLE IF NOT EXISTS notifications (
id UUID PRIMARY KEY,
user_uuid UUID REFERENCES users(id) ON DELETE CASCADE UNIQUE,
discord_webhook VARCHAR(500),
discord_enabled BOOLEAN DEFAULT FALSE,
ntfy_topic VARCHAR(255),
ntfy_enabled BOOLEAN DEFAULT FALSE,
last_message_sent TIMESTAMP,
current_notification_status VARCHAR(50) DEFAULT 'inactive',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Design Notes:
- ''UUID'' primary keys for security (not auto-increment)
- ''ON DELETE CASCADE'' automatically removes related records
- ''BYTEA'' for password hashes (binary data)
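With UUID primary keys, identifiers are typically generated application-side rather than by the database, e.g.:

```python
import uuid

# Random v4 UUID: unguessable, unlike a sequential integer ID
new_user_id = str(uuid.uuid4())
print(len(new_user_id))  # 36 (8-4-4-4-12 hex groups plus hyphens)
```

This is why enumeration attacks against ''/api/user/<userUUID>'' are impractical: there is no "next" ID to probe.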
==== config/.env.example - Environment Variables ====
# Discord Bot
DISCORD_BOT_TOKEN=your_discord_bot_token_here
# API
API_URL=http://app:5000
# Database
DB_HOST=db
DB_PORT=5432
DB_NAME=app
DB_USER=app
DB_PASS=your_db_password_here
# JWT
JWT_SECRET=your_jwt_secret_here
# AI / OpenRouter
OPENROUTER_API_KEY=your_openrouter_api_key_here
OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
AI_CONFIG_PATH=/app/ai/ai_config.json
===== Docker Deployment =====
==== Dockerfile ====
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "-m", "api.main"]
Base Image: ''python:3.11-slim'' for minimal footprint
Default Command: Runs the Flask API via ''python -m api.main'' (ensures ''/app'' is on sys.path for correct module resolution)
==== docker-compose.yml ====
services:
db:
image: postgres:16
environment:
POSTGRES_DB: app
POSTGRES_USER: app
POSTGRES_PASSWORD: ${DB_PASS}
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
- ./config/schema.sql:/docker-entrypoint-initdb.d/schema.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U app"]
interval: 5s
timeout: 5s
retries: 5
app:
build: .
ports:
- "8080:5000"
env_file: config/.env
depends_on:
db:
condition: service_healthy
scheduler:
build: .
command: ["python", "-m", "scheduler.daemon"]
env_file: config/.env
depends_on:
db:
condition: service_healthy
bot:
build: .
command: ["python", "-m", "bot.bot"]
env_file: config/.env
depends_on:
app:
condition: service_started
volumes:
pgdata:
Services:
^ Service ^ Purpose ^ Dependencies ^
| db | PostgreSQL database | None |
| app | Flask API (port 8080) | db (healthy) |
| scheduler | Background tasks | db (healthy) |
| bot | Discord client | app |
Health Checks:
- ''db'' uses PostgreSQL's built-in ''pg_isready''
- Other services wait for their dependencies
Environment Files:
- ''config/.env'' - Loaded by app, bot, and scheduler services
- Root ''.env'' - Provides ''DB_PASS'' for docker-compose variable substitution (''${DB_PASS}'' in the db service)
Volume Mounts:
- ''pgdata'' - Persistent database storage
- ''schema.sql'' - Auto-runs on first startup
===== Data Flow Examples =====
==== Example 1: User Login ====
sequenceDiagram
actor User
participant Bot as bot.py
participant API as api/main.py
participant Auth as core/auth.py
participant DB as PostgreSQL

User->>Bot: DM "username123"
Bot->>Bot: login_state[id] = {step: "username"}
Bot->>User: "Password?"
User->>Bot: DM "mypassword"
Bot->>API: POST /api/login {username, password}
API->>Auth: getLoginToken("username123", "mypassword")
Auth->>DB: SELECT * FROM users WHERE username='username123'
DB-->>Auth: user record (with hashed pw)
Auth->>Auth: bcrypt.checkpw(password, hash)
Auth-->>API: JWT token
API-->>Bot: {token: "eyJ..."} 200
Bot->>Bot: Store session {token, user_uuid, username}
Bot->>User: "Welcome back username123!"
==== Example 2: Natural Language Command ====
sequenceDiagram
actor User
participant Bot as bot.py
participant Parser as ai/parser.py
participant LLM as OpenRouter
participant Registry as command_registry
participant Handler as Domain Handler
participant API as api/main.py
participant DB as PostgreSQL

User->>Bot: DM "add a task to buy groceries"
Bot->>Bot: Show typing indicator
Bot->>Parser: parse("add a task...", "command_parser", history)
Parser->>LLM: chat.completions.create(system + user prompt)
LLM-->>Parser: {"interaction_type":"task","action":"add","task_name":"buy groceries"}
Parser->>Parser: json.loads() + validate
Parser-->>Bot: parsed JSON
Bot->>Bot: Update message_history (keep last 5)
Bot->>Registry: get_handler("task")
Registry-->>Bot: handle_task function
Bot->>Handler: handle_task(message, session, parsed)
Handler->>API: POST /api/tasks {name: "buy groceries"}
API->>DB: INSERT INTO tasks...
DB-->>API: inserted row
API-->>Handler: {id, name, ...} 201
Handler->>User: "Added task: **buy groceries**"
==== Example 3: API Request with Authentication ====
sequenceDiagram
actor Client as External Client
participant Flask as api/main.py
participant Route as routes/example.py
participant Auth as core/auth.py
participant PG as core/postgres.py
participant DB as PostgreSQL

Client->>Flask: POST /api/tasks<br/>Authorization: Bearer <jwt>
Flask->>Route: Route handler
Route->>Route: _get_user_uuid(token)<br/>jwt.decode() → sub claim
Route->>Auth: verifyLoginToken(token, userUUID)
Auth->>Auth: jwt.decode() + check sub == userUUID
Auth-->>Route: True
Route->>PG: insert("tasks", data)
PG->>DB: INSERT INTO tasks... RETURNING *
DB-->>PG: inserted row
PG-->>Route: {id, name, ...}
Route-->>Client: 201 {id, name, ...}
===== Security Considerations =====
==== SQL Injection Prevention ====
All database queries use parameterized queries:
Safe - parameterized
cur.execute("SELECT * FROM users WHERE id = %(id)s", {"id": user_id})
Unsafe - string interpolation (NOT used)
cur.execute(f"SELECT * FROM users WHERE id = '{user_id}'")
The ''_safe_id()'' function validates SQL identifiers.
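The same principle can be demonstrated with the standard library's sqlite3 (placeholder syntax is ''?'' there instead of psycopg's ''%(id)s'', but the mechanism is identical): a hostile value stays inert data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id TEXT, username TEXT)")

hostile = "alice'; DROP TABLE users; --"
# The driver binds the value; it is never spliced into the SQL text
conn.execute("INSERT INTO users VALUES (?, ?)", ("u1", hostile))

row = conn.execute("SELECT username FROM users WHERE id = ?", ("u1",)).fetchone()
print(row[0] == hostile)  # True -- stored verbatim, never executed as SQL
```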
==== Password Storage ====
Passwords are:
- Hashed with bcrypt (adaptive hash function)
- Salted automatically by bcrypt
- Never stored in plain text
- Never returned in API responses
==== JWT Security ====
- Tokens expire after 1 hour
- Signed with secret key (''JWT_SECRET'')
- Contain minimal claims (user UUID, username)
- Verified on every request
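Under the hood, HS256 verification is just an HMAC comparison over the first two token segments; a dependency-free sketch (the framework itself uses a JWT library):

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def make_token(claims: dict, secret: bytes) -> str:
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode()
    signature = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    return f"{header}.{payload}.{signature}"

def verify_token(token: str, secret: bytes) -> bool:
    header, payload, signature = token.split(".")
    signing_input = f"{header}.{payload}".encode()
    expected = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    return hmac.compare_digest(signature, expected)

token = make_token({"sub": "user-uuid-here"}, b"JWT_SECRET")
print(verify_token(token, b"JWT_SECRET"))    # True
print(verify_token(token, b"wrong-secret"))  # False
```

This also shows why leaking ''JWT_SECRET'' is catastrophic: anyone holding it can mint valid tokens for any user.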
==== Rate Limiting ====
Not implemented in this template. Consider adding:
- Rate limiting on API endpoints
- Login attempt throttling
- Command cooldown per user
===== Extending the Framework =====
==== Adding a New Domain Module ====
Step 1: Create database table
Edit ''config/schema.sql'':
CREATE TABLE IF NOT EXISTS habits (
id UUID PRIMARY KEY,
user_uuid UUID REFERENCES users(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
streak INT DEFAULT 0,
last_completed DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Step 2: Create API routes
Create ''api/routes/habits.py'':
import os
import flask
import jwt
import core.auth as auth
import core.postgres as postgres
import uuid
def _get_user_uuid(token):
"""Decode JWT to extract user UUID. Returns None on failure."""
try:
payload = jwt.decode(token, os.getenv("JWT_SECRET"), algorithms=["HS256"])
return payload.get("sub")
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return None
def register(app):
@app.route("/api/habits", methods=["GET"])
def api_listHabits():
header = flask.request.headers.get("Authorization", "")
if not header.startswith("Bearer "):
return flask.jsonify({"error": "missing token"}), 401
token = header[7:]
user_uuid = _get_user_uuid(token)
if not user_uuid or not auth.verifyLoginToken(token, userUUID=user_uuid):
return flask.jsonify({"error": "unauthorized"}), 401
items = postgres.select("habits", where={"user_uuid": user_uuid})
return flask.jsonify(items), 200
@app.route("/api/habits", methods=["POST"])
def api_addHabit():
# ... similar pattern ...
pass
Register in ''api/main.py'':
import api.routes.habits as habits_routes
register_routes(habits_routes)
Step 3: Create bot commands
Create ''bot/commands/habits.py'':
from bot.command_registry import register_module
import ai.parser as ai_parser
from bot.bot import apiRequest
async def handle_habit(message, session, parsed):
action = parsed.get("action")
token = session["token"]
if action == "list":
result, status = apiRequest("get", "/api/habits", token)
if status == 200:
lines = [f"- {h['name']} (streak: {h['streak']})" for h in result]
await message.channel.send("Your habits:\n" + "\n".join(lines))
elif action == "add":
name = parsed.get("habit_name")
result, status = apiRequest("post", "/api/habits", token, {"name": name})
if status == 201:
await message.channel.send(f"Created habit: **{name}**")
def validate_habit_json(data):
errors = []
if "action" not in data:
errors.append("Missing required field: action")
return errors
register_module("habit", handle_habit)
ai_parser.register_validator("habit", validate_habit_json)
Step 4: Add AI prompts
Edit ''ai/ai_config.json'':
{
"prompts": {
"command_parser": { ... },
"habit_parser": {
"system": "You parse habit tracking commands...",
"user_template": "Parse: '{user_input}'. Return JSON with action (list/add/complete) and habit_name."
}
}
}
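Assuming the parser fills ''user_template'' via ''str.format'' (the ''{user_input}'' placeholder suggests this, but check ''ai/parser.py'' for the exact mechanism), the substitution looks like:

```python
# Hypothetical template matching the habit_parser entry above
user_template = (
    "Parse: '{user_input}'. Return JSON with action (list/add/complete) "
    "and habit_name."
)
prompt = user_template.format(user_input="add a habit to stretch daily")
print(prompt)
```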
===== Troubleshooting =====
==== Common Issues ====
Bot not responding to DMs
- Verify ''message_content'' intent is enabled in Discord Developer Portal
- Check bot has permission to read messages
API returning 401 Unauthorized
- Verify token is valid and not expired
- Check Authorization header format: ''Bearer <token>''
AI parser failing
- Verify OpenRouter API key is valid
- Check model is available
- Review prompts in ''ai_config.json''
Database connection errors
- Verify PostgreSQL is running
- Check environment variables match Docker config
==== Debugging Tips ====
Enable verbose logging:
import logging
logging.basicConfig(level=logging.DEBUG)
Test database connection:
from core import postgres
print(postgres.select("users"))
Test AI parser:
import ai.parser as ai_parser
result = ai_parser.parse("add task buy milk", "command_parser")
print(result)
===== Conclusion =====
The LLM Bot Framework provides a solid foundation for building AI-powered Discord bots. Its modular architecture allows developers to:
- Add new domains without modifying core code
- Use any OpenAI-compatible LLM
- Deploy easily with Docker
- Scale with PostgreSQL
Key design decisions:
- Separation of concerns - Bot, API, and core logic are independent
- Configuration-driven - AI behavior is customizable via JSON
- Security-first - Parameterized queries, hashed passwords, JWT auth
- Developer experience - Clear patterns for extending the framework
The framework demonstrates how LLMs can bridge the gap between natural language and structured database operations, enabling conversational interfaces that feel intuitive to users.