First synculous 2 Big-Pickle pass.

This commit is contained in:
2026-02-12 23:07:48 -06:00
parent 25d05e0e86
commit 3e1134575b
26 changed files with 2729 additions and 59 deletions

View File

@@ -2,8 +2,14 @@
## API route registration — `api/main.py`
Lines 10-11: imported `api.routes.routines` and `api.routes.medications`
Line 16: added both to `ROUTE_MODULES` list so they auto-register on startup
Lines 10-17: imported route modules and added to `ROUTE_MODULES`:
- `api.routes.routines`
- `api.routes.medications`
- `api.routes.routine_steps_extended`
- `api.routes.routine_sessions_extended`
- `api.routes.routine_templates`
- `api.routes.routine_stats`
- `api.routes.routine_tags`
## Bot command registration — `bot/bot.py`
@@ -35,10 +41,72 @@ Updated the `command_parser` system prompt to list the two interaction types
(`routine`, `medication`) and the fields to extract for each. This is what
tells the LLM how to parse natural language into the right action structure.
## What's NOT hooked yet (needs implementation)
## Extended Routines API — New Modules
- `config/schema.sql` — needs tables for routines, routine_steps,
routine_sessions, routine_schedules, medications, med_logs
- The actual body of every API route (all prototyped as `pass`)
- The actual body of both bot command handlers
- The three scheduler check functions
### Routine Steps Extended — `api/routes/routine_steps_extended.py`
- `PUT /api/routines/<id>/steps/<step_id>/instructions` — update step instructions
- `PUT /api/routines/<id>/steps/<step_id>/type` — update step type (timer, checklist, etc)
- `PUT /api/routines/<id>/steps/<step_id>/media` — update media URL
### Routine Sessions Extended — `api/routes/routine_sessions_extended.py`
- `POST /api/sessions/<id>/pause` — pause active session
- `POST /api/sessions/<id>/resume` — resume paused session
- `POST /api/sessions/<id>/abort` — abort with reason
- `POST /api/sessions/<id>/note` — add note to session
- `PUT /api/sessions/<id>/duration` — record actual duration
- `GET /api/sessions/<id>` — get session with notes
### Routine Templates — `api/routes/routine_templates.py`
- `GET /api/templates` — list templates
- `POST /api/templates` — create template
- `GET /api/templates/<id>` — get template with steps
- `POST /api/templates/<id>/clone` — clone to user's routines
- `PUT /api/templates/<id>` — update template
- `DELETE /api/templates/<id>` — delete template
- `POST /api/templates/<id>/steps` — add step to template
### Routine Stats — `api/routes/routine_stats.py`
- `GET /api/routines/<id>/stats` — completion stats
- `GET /api/routines/streaks` — all user streaks
- `GET /api/routines/<id>/streak` — specific routine streak
- `GET /api/routines/weekly-summary` — weekly progress
### Routine Tags — `api/routes/routine_tags.py`
- `GET /api/tags` — list tags
- `POST /api/tags` — create tag
- `DELETE /api/tags/<id>` — delete tag
- `POST /api/routines/<id>/tags` — add tags to routine
- `DELETE /api/routines/<id>/tags/<tag_id>` — remove tag
- `GET /api/routines/<id>/tags` — get routine's tags
## Core Modules — Business Logic
### `core/routines.py`
Shared functions for routine operations:
- `start_session()` — create and start a new session
- `pause_session()` — pause an active session
- `resume_session()` — resume a paused session
- `abort_session()` — abort with reason
- `complete_session()` — mark complete and update streak
- `clone_template()` — clone template to user's routines
- `calculate_streak()` — get current streak
### `core/stats.py`
Statistics calculation functions:
- `get_routine_stats()` — completion rate, avg duration, total time
- `get_user_streaks()` — all streaks for user
- `get_weekly_summary()` — weekly progress summary
- `get_monthly_summary()` — monthly progress summary
## Bot Commands — Extended Routines
New actions added to `bot/commands/routines.py`:
- `pause` — pause current session
- `resume` — resume paused session
- `abort` — abort with reason
- `note` — add note to session
- `stats` — show completion statistics
- `streak` — show current streak
- `templates` — list available templates
- `clone` — clone a template
- `tag` — add tag to routine

View File

@@ -11,10 +11,23 @@ import core.users as users
import core.postgres as postgres
import api.routes.routines as routines_routes
import api.routes.medications as medications_routes
import api.routes.routine_steps_extended as routine_steps_extended_routes
import api.routes.routine_sessions_extended as routine_sessions_extended_routes
import api.routes.routine_templates as routine_templates_routes
import api.routes.routine_stats as routine_stats_routes
import api.routes.routine_tags as routine_tags_routes
app = flask.Flask(__name__)
ROUTE_MODULES = [routines_routes, medications_routes]
ROUTE_MODULES = [
routines_routes,
medications_routes,
routine_steps_extended_routes,
routine_sessions_extended_routes,
routine_templates_routes,
routine_stats_routes,
routine_tags_routes,
]
def register_routes(module):

View File

@@ -3,6 +3,7 @@ Medications API - medication scheduling, logging, and adherence tracking
"""
import os
import uuid
import flask
import jwt
import core.auth as auth
@@ -39,7 +40,8 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
meds = postgres.select("medications", where={"user_uuid": user_uuid}, order_by="name")
return flask.jsonify(meds), 200
@app.route("/api/medications", methods=["POST"])
def api_addMedication():
@@ -48,7 +50,18 @@ def register(app):
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
data = flask.request.get_json()
pass
if not data:
return flask.jsonify({"error": "missing body"}), 400
required = ["name", "dosage", "unit", "frequency"]
missing = [f for f in required if not data.get(f)]
if missing:
return flask.jsonify({"error": f"missing required fields: {', '.join(missing)}"}), 400
data["id"] = str(uuid.uuid4())
data["user_uuid"] = user_uuid
data["times"] = data.get("times", [])
data["active"] = True
med = postgres.insert("medications", data)
return flask.jsonify(med), 201
@app.route("/api/medications/<med_id>", methods=["GET"])
def api_getMedication(med_id):
@@ -56,7 +69,10 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
med = postgres.select_one("medications", {"id": med_id, "user_uuid": user_uuid})
if not med:
return flask.jsonify({"error": "not found"}), 404
return flask.jsonify(med), 200
@app.route("/api/medications/<med_id>", methods=["PUT"])
def api_updateMedication(med_id):
@@ -65,7 +81,17 @@ def register(app):
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
data = flask.request.get_json()
pass
if not data:
return flask.jsonify({"error": "missing body"}), 400
existing = postgres.select_one("medications", {"id": med_id, "user_uuid": user_uuid})
if not existing:
return flask.jsonify({"error": "not found"}), 404
allowed = ["name", "dosage", "unit", "frequency", "times", "notes", "active"]
updates = {k: v for k, v in data.items() if k in allowed}
if not updates:
return flask.jsonify({"error": "no valid fields to update"}), 400
result = postgres.update("medications", updates, {"id": med_id, "user_uuid": user_uuid})
return flask.jsonify(result[0] if result else {}), 200
@app.route("/api/medications/<med_id>", methods=["DELETE"])
def api_deleteMedication(med_id):
@@ -73,7 +99,12 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
existing = postgres.select_one("medications", {"id": med_id, "user_uuid": user_uuid})
if not existing:
return flask.jsonify({"error": "not found"}), 404
postgres.delete("med_logs", {"medication_id": med_id})
postgres.delete("medications", {"id": med_id, "user_uuid": user_uuid})
return flask.jsonify({"deleted": True}), 200
# ── Medication Logging (take / skip / snooze) ─────────────────
@@ -83,8 +114,20 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
med = postgres.select_one("medications", {"id": med_id, "user_uuid": user_uuid})
if not med:
return flask.jsonify({"error": "not found"}), 404
data = flask.request.get_json() or {}
pass
log_entry = {
"id": str(uuid.uuid4()),
"medication_id": med_id,
"user_uuid": user_uuid,
"action": "taken",
"scheduled_time": data.get("scheduled_time"),
"notes": data.get("notes"),
}
log = postgres.insert("med_logs", log_entry)
return flask.jsonify(log), 201
@app.route("/api/medications/<med_id>/skip", methods=["POST"])
def api_skipMedication(med_id):
@@ -92,8 +135,20 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
med = postgres.select_one("medications", {"id": med_id, "user_uuid": user_uuid})
if not med:
return flask.jsonify({"error": "not found"}), 404
data = flask.request.get_json() or {}
pass
log_entry = {
"id": str(uuid.uuid4()),
"medication_id": med_id,
"user_uuid": user_uuid,
"action": "skipped",
"scheduled_time": data.get("scheduled_time"),
"notes": data.get("reason"),
}
log = postgres.insert("med_logs", log_entry)
return flask.jsonify(log), 201
@app.route("/api/medications/<med_id>/snooze", methods=["POST"])
def api_snoozeMedication(med_id):
@@ -101,8 +156,12 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
data = flask.request.get_json()
pass
med = postgres.select_one("medications", {"id": med_id, "user_uuid": user_uuid})
if not med:
return flask.jsonify({"error": "not found"}), 404
data = flask.request.get_json() or {}
minutes = data.get("minutes", 15)
return flask.jsonify({"snoozed_until_minutes": minutes}), 200
# ── Medication Log / History ──────────────────────────────────
@@ -112,7 +171,17 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
med = postgres.select_one("medications", {"id": med_id, "user_uuid": user_uuid})
if not med:
return flask.jsonify({"error": "not found"}), 404
days = flask.request.args.get("days", 30, type=int)
logs = postgres.select(
"med_logs",
where={"medication_id": med_id},
order_by="created_at DESC",
limit=days * 10,
)
return flask.jsonify(logs), 200
@app.route("/api/medications/today", methods=["GET"])
def api_todaysMeds():
@@ -120,7 +189,26 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
meds = postgres.select("medications", where={"user_uuid": user_uuid, "active": True})
from datetime import datetime
today = datetime.now().strftime("%Y-%m-%d")
result = []
for med in meds:
times = med.get("times", [])
taken_times = [
log["scheduled_time"]
for log in postgres.select(
"med_logs",
where={"medication_id": med["id"], "action": "taken"},
)
if log.get("scheduled_time", "").startswith(today)
]
result.append({
"medication": med,
"scheduled_times": times,
"taken_times": taken_times,
})
return flask.jsonify(result), 200
# ── Adherence Stats ───────────────────────────────────────────
@@ -130,7 +218,27 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
days = flask.request.args.get("days", 30, type=int)
meds = postgres.select("medications", where={"user_uuid": user_uuid, "active": True})
result = []
for med in meds:
logs = postgres.select(
"med_logs",
where={"medication_id": med["id"]},
limit=days * 10,
)
taken = sum(1 for log in logs if log.get("action") == "taken")
skipped = sum(1 for log in logs if log.get("action") == "skipped")
total = taken + skipped
adherence = (taken / total * 100) if total > 0 else 0
result.append({
"medication_id": med["id"],
"name": med["name"],
"taken": taken,
"skipped": skipped,
"adherence_percent": round(adherence, 1),
})
return flask.jsonify(result), 200
@app.route("/api/medications/<med_id>/adherence", methods=["GET"])
def api_medAdherence(med_id):
@@ -138,7 +246,26 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
med = postgres.select_one("medications", {"id": med_id, "user_uuid": user_uuid})
if not med:
return flask.jsonify({"error": "not found"}), 404
days = flask.request.args.get("days", 30, type=int)
logs = postgres.select(
"med_logs",
where={"medication_id": med_id},
limit=days * 10,
)
taken = sum(1 for log in logs if log.get("action") == "taken")
skipped = sum(1 for log in logs if log.get("action") == "skipped")
total = taken + skipped
adherence = (taken / total * 100) if total > 0 else 0
return flask.jsonify({
"medication_id": med_id,
"name": med["name"],
"taken": taken,
"skipped": skipped,
"adherence_percent": round(adherence, 1),
}), 200
# ── Refills ───────────────────────────────────────────────────
@@ -148,8 +275,18 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
med = postgres.select_one("medications", {"id": med_id, "user_uuid": user_uuid})
if not med:
return flask.jsonify({"error": "not found"}), 404
data = flask.request.get_json()
pass
if not data:
return flask.jsonify({"error": "missing body"}), 400
allowed = ["quantity_remaining", "refill_date", "pharmacy_notes"]
updates = {k: v for k, v in data.items() if k in allowed}
if not updates:
return flask.jsonify({"error": "no valid fields to update"}), 400
result = postgres.update("medications", updates, {"id": med_id, "user_uuid": user_uuid})
return flask.jsonify(result[0] if result else {}), 200
@app.route("/api/medications/refills-due", methods=["GET"])
def api_refillsDue():
@@ -157,4 +294,19 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
days_ahead = flask.request.args.get("days_ahead", 7, type=int)
from datetime import datetime, timedelta
cutoff = (datetime.now() + timedelta(days=days_ahead)).strftime("%Y-%m-%d")
meds = postgres.select(
"medications",
where={"user_uuid": user_uuid},
)
due = []
for med in meds:
qty = med.get("quantity_remaining")
refill_date = med.get("refill_date")
if qty is not None and qty <= 7:
due.append(med)
elif refill_date and refill_date <= cutoff:
due.append(med)
return flask.jsonify(due), 200

View File

@@ -0,0 +1,149 @@
"""
Routine Sessions Extended API - pause, resume, abort, notes, duration tracking
"""
import os
import uuid
from datetime import datetime
import flask
import jwt
import core.auth as auth
import core.postgres as postgres
def _get_user_uuid(token):
try:
payload = jwt.decode(token, os.getenv("JWT_SECRET"), algorithms=["HS256"])
return payload.get("sub")
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return None
def _auth(request):
header = request.headers.get("Authorization", "")
if not header.startswith("Bearer "):
return None
token = header[7:]
user_uuid = _get_user_uuid(token)
if not user_uuid or not auth.verifyLoginToken(token, userUUID=user_uuid):
return None
return user_uuid
def register(app):
@app.route("/api/sessions/<session_id>/pause", methods=["POST"])
def api_pauseSession(session_id):
"""Pause an active session."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
session = postgres.select_one("routine_sessions", {"id": session_id, "user_uuid": user_uuid})
if not session:
return flask.jsonify({"error": "session not found"}), 404
if session.get("status") != "active":
return flask.jsonify({"error": "session not active"}), 400
result = postgres.update(
"routine_sessions",
{"status": "paused", "paused_at": datetime.now().isoformat()},
{"id": session_id}
)
return flask.jsonify({"status": "paused"}), 200
@app.route("/api/sessions/<session_id>/resume", methods=["POST"])
def api_resumeSession(session_id):
"""Resume a paused session."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
session = postgres.select_one("routine_sessions", {"id": session_id, "user_uuid": user_uuid})
if not session:
return flask.jsonify({"error": "session not found"}), 404
if session.get("status") != "paused":
return flask.jsonify({"error": "session not paused"}), 400
result = postgres.update(
"routine_sessions",
{"status": "active", "paused_at": None},
{"id": session_id}
)
return flask.jsonify({"status": "active"}), 200
@app.route("/api/sessions/<session_id>/abort", methods=["POST"])
def api_abortSession(session_id):
"""Abort a session with reason. Body: {reason: string}"""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
session = postgres.select_one("routine_sessions", {"id": session_id, "user_uuid": user_uuid})
if not session:
return flask.jsonify({"error": "session not found"}), 404
data = flask.request.get_json() or {}
reason = data.get("reason", "Aborted by user")
result = postgres.update(
"routine_sessions",
{"status": "aborted", "abort_reason": reason, "completed_at": datetime.now().isoformat()},
{"id": session_id}
)
return flask.jsonify({"status": "aborted", "reason": reason}), 200
@app.route("/api/sessions/<session_id>/note", methods=["POST"])
def api_addSessionNote(session_id):
"""Add a note to the session. Body: {step_index: int, note: string}"""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
session = postgres.select_one("routine_sessions", {"id": session_id, "user_uuid": user_uuid})
if not session:
return flask.jsonify({"error": "session not found"}), 404
data = flask.request.get_json()
if not data or not data.get("note"):
return flask.jsonify({"error": "missing note"}), 400
note_entry = {
"id": str(uuid.uuid4()),
"session_id": session_id,
"step_index": data.get("step_index"),
"note": data["note"],
}
note = postgres.insert("routine_session_notes", note_entry)
return flask.jsonify(note), 201
@app.route("/api/sessions/<session_id>/duration", methods=["PUT"])
def api_setSessionDuration(session_id):
"""Record actual duration. Body: {actual_duration_minutes: int}"""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
session = postgres.select_one("routine_sessions", {"id": session_id, "user_uuid": user_uuid})
if not session:
return flask.jsonify({"error": "session not found"}), 404
data = flask.request.get_json()
if not data:
return flask.jsonify({"error": "missing body"}), 400
duration = data.get("actual_duration_minutes")
if duration is None:
return flask.jsonify({"error": "missing actual_duration_minutes"}), 400
result = postgres.update(
"routine_sessions",
{"actual_duration_minutes": duration},
{"id": session_id}
)
return flask.jsonify(result[0] if result else {}), 200
@app.route("/api/sessions/<session_id>", methods=["GET"])
def api_getSessionDetails(session_id):
"""Get session details with all notes."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
session = postgres.select_one("routine_sessions", {"id": session_id, "user_uuid": user_uuid})
if not session:
return flask.jsonify({"error": "session not found"}), 404
notes = postgres.select("routine_session_notes", {"session_id": session_id}, order_by="created_at")
routine = postgres.select_one("routines", {"id": session["routine_id"]})
steps = postgres.select("routine_steps", {"routine_id": session["routine_id"]}, order_by="position")
return flask.jsonify({
"session": session,
"routine": routine,
"steps": steps,
"notes": notes,
}), 200

154
api/routes/routine_stats.py Normal file
View File

@@ -0,0 +1,154 @@
"""
Routine Stats API - completion statistics and streaks
"""
import os
from datetime import datetime, timedelta
import flask
import jwt
import core.auth as auth
import core.postgres as postgres
def _get_user_uuid(token):
try:
payload = jwt.decode(token, os.getenv("JWT_SECRET"), algorithms=["HS256"])
return payload.get("sub")
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return None
def _auth(request):
header = request.headers.get("Authorization", "")
if not header.startswith("Bearer "):
return None
token = header[7:]
user_uuid = _get_user_uuid(token)
if not user_uuid or not auth.verifyLoginToken(token, userUUID=user_uuid):
return None
return user_uuid
def register(app):
@app.route("/api/routines/<routine_id>/stats", methods=["GET"])
def api_routineStats(routine_id):
"""Get completion stats for a routine. Query: ?days=30"""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "routine not found"}), 404
days = flask.request.args.get("days", 30, type=int)
sessions = postgres.select(
"routine_sessions",
where={"routine_id": routine_id},
limit=days,
)
completed = sum(1 for s in sessions if s.get("status") == "completed")
aborted = sum(1 for s in sessions if s.get("status") == "aborted")
total_duration = sum(
s.get("actual_duration_minutes", 0) or 0
for s in sessions
if s.get("actual_duration_minutes")
)
avg_duration = total_duration / completed if completed > 0 else 0
completion_rate = (completed / len(sessions) * 100) if sessions else 0
return flask.jsonify({
"routine_id": routine_id,
"routine_name": routine["name"],
"period_days": days,
"total_sessions": len(sessions),
"completed": completed,
"aborted": aborted,
"completion_rate_percent": round(completion_rate, 1),
"avg_duration_minutes": round(avg_duration, 1),
"total_time_minutes": total_duration,
}), 200
@app.route("/api/routines/streaks", methods=["GET"])
def api_userStreaks():
"""Get all streaks for the user."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
streaks = postgres.select("routine_streaks", where={"user_uuid": user_uuid})
routines = postgres.select("routines", where={"user_uuid": user_uuid})
routine_map = {r["id"]: r["name"] for r in routines}
result = []
for streak in streaks:
result.append({
"routine_id": streak["routine_id"],
"routine_name": routine_map.get(streak["routine_id"], "Unknown"),
"current_streak": streak["current_streak"],
"longest_streak": streak["longest_streak"],
"last_completed_date": streak.get("last_completed_date"),
})
return flask.jsonify(result), 200
@app.route("/api/routines/<routine_id>/streak", methods=["GET"])
def api_routineStreak(routine_id):
"""Get streak for a specific routine."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "routine not found"}), 404
streak = postgres.select_one("routine_streaks", {"routine_id": routine_id, "user_uuid": user_uuid})
if not streak:
return flask.jsonify({
"routine_id": routine_id,
"routine_name": routine["name"],
"current_streak": 0,
"longest_streak": 0,
"last_completed_date": None,
}), 200
return flask.jsonify({
"routine_id": routine_id,
"routine_name": routine["name"],
"current_streak": streak["current_streak"],
"longest_streak": streak["longest_streak"],
"last_completed_date": streak.get("last_completed_date"),
}), 200
@app.route("/api/routines/weekly-summary", methods=["GET"])
def api_weeklySummary():
"""Get weekly progress summary."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routines = postgres.select("routines", where={"user_uuid": user_uuid})
routine_ids = [r["id"] for r in routines]
if not routine_ids:
return flask.jsonify({
"total_completed": 0,
"total_time_minutes": 0,
"routines_started": 0,
"routines": [],
}), 200
week_ago = (datetime.now() - timedelta(days=7)).isoformat()
sessions = postgres.select("routine_sessions", where={"user_uuid": user_uuid})
week_sessions = [s for s in sessions if s.get("created_at") and str(s["created_at"]) >= week_ago]
completed = [s for s in week_sessions if s.get("status") == "completed"]
total_time = sum(
s.get("actual_duration_minutes", 0) or 0
for s in completed
if s.get("actual_duration_minutes")
)
routine_summaries = []
for routine in routines:
r_sessions = [s for s in week_sessions if s.get("routine_id") == routine["id"]]
r_completed = sum(1 for s in r_sessions if s.get("status") == "completed")
routine_summaries.append({
"routine_id": routine["id"],
"name": routine["name"],
"completed_this_week": r_completed,
})
return flask.jsonify({
"total_completed": len(completed),
"total_time_minutes": total_time,
"routines_started": len(set(s.get("routine_id") for s in week_sessions)),
"routines": routine_summaries,
}), 200

View File

@@ -0,0 +1,94 @@
"""
Routine Steps Extended API - instructions, step types, and media for steps
"""
import os
import uuid
import flask
import jwt
import core.auth as auth
import core.postgres as postgres
def _get_user_uuid(token):
try:
payload = jwt.decode(token, os.getenv("JWT_SECRET"), algorithms=["HS256"])
return payload.get("sub")
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return None
def _auth(request):
header = request.headers.get("Authorization", "")
if not header.startswith("Bearer "):
return None
token = header[7:]
user_uuid = _get_user_uuid(token)
if not user_uuid or not auth.verifyLoginToken(token, userUUID=user_uuid):
return None
return user_uuid
def register(app):
@app.route("/api/routines/<routine_id>/steps/<step_id>/instructions", methods=["PUT"])
def api_updateStepInstructions(routine_id, step_id):
"""Update step instructions. Body: {instructions: string}"""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "routine not found"}), 404
step = postgres.select_one("routine_steps", {"id": step_id, "routine_id": routine_id})
if not step:
return flask.jsonify({"error": "step not found"}), 404
data = flask.request.get_json()
if not data:
return flask.jsonify({"error": "missing body"}), 400
instructions = data.get("instructions")
if instructions is None:
return flask.jsonify({"error": "missing instructions"}), 400
result = postgres.update("routine_steps", {"instructions": instructions}, {"id": step_id})
return flask.jsonify(result[0] if result else {}), 200
@app.route("/api/routines/<routine_id>/steps/<step_id>/type", methods=["PUT"])
def api_updateStepType(routine_id, step_id):
"""Update step type. Body: {step_type: 'generic'|'timer'|'checklist'|'meditation'|'exercise'}"""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "routine not found"}), 404
step = postgres.select_one("routine_steps", {"id": step_id, "routine_id": routine_id})
if not step:
return flask.jsonify({"error": "step not found"}), 404
data = flask.request.get_json()
if not data:
return flask.jsonify({"error": "missing body"}), 400
step_type = data.get("step_type")
allowed_types = ["generic", "timer", "checklist", "meditation", "exercise"]
if step_type not in allowed_types:
return flask.jsonify({"error": f"invalid step_type. allowed: {allowed_types}"}), 400
result = postgres.update("routine_steps", {"step_type": step_type}, {"id": step_id})
return flask.jsonify(result[0] if result else {}), 200
@app.route("/api/routines/<routine_id>/steps/<step_id>/media", methods=["PUT"])
def api_updateStepMedia(routine_id, step_id):
"""Update step media URL. Body: {media_url: string}"""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "routine not found"}), 404
step = postgres.select_one("routine_steps", {"id": step_id, "routine_id": routine_id})
if not step:
return flask.jsonify({"error": "step not found"}), 404
data = flask.request.get_json()
if not data:
return flask.jsonify({"error": "missing body"}), 400
media_url = data.get("media_url")
result = postgres.update("routine_steps", {"media_url": media_url}, {"id": step_id})
return flask.jsonify(result[0] if result else {}), 200

125
api/routes/routine_tags.py Normal file
View File

@@ -0,0 +1,125 @@
"""
Routine Tags API - structured categories for routines
"""
import os
import uuid
import flask
import jwt
import core.auth as auth
import core.postgres as postgres
def _get_user_uuid(token):
try:
payload = jwt.decode(token, os.getenv("JWT_SECRET"), algorithms=["HS256"])
return payload.get("sub")
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return None
def _auth(request):
header = request.headers.get("Authorization", "")
if not header.startswith("Bearer "):
return None
token = header[7:]
user_uuid = _get_user_uuid(token)
if not user_uuid or not auth.verifyLoginToken(token, userUUID=user_uuid):
return None
return user_uuid
def register(app):
@app.route("/api/tags", methods=["GET"])
def api_listTags():
"""List all available tags."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
tags = postgres.select("routine_tags", order_by="name")
return flask.jsonify(tags), 200
@app.route("/api/tags", methods=["POST"])
def api_createTag():
"""Create a new tag. Body: {name, color?}"""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
data = flask.request.get_json()
if not data or not data.get("name"):
return flask.jsonify({"error": "missing required field: name"}), 400
existing = postgres.select_one("routine_tags", {"name": data["name"]})
if existing:
return flask.jsonify({"error": "tag already exists"}), 409
tag = {
"id": str(uuid.uuid4()),
"name": data["name"],
"color": data.get("color", "#888888"),
}
result = postgres.insert("routine_tags", tag)
return flask.jsonify(result), 201
@app.route("/api/tags/<tag_id>", methods=["DELETE"])
def api_deleteTag(tag_id):
"""Delete a tag."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
tag = postgres.select_one("routine_tags", {"id": tag_id})
if not tag:
return flask.jsonify({"error": "tag not found"}), 404
postgres.delete("routine_routine_tags", {"tag_id": tag_id})
postgres.delete("routine_tags", {"id": tag_id})
return flask.jsonify({"deleted": True}), 200
@app.route("/api/routines/<routine_id>/tags", methods=["POST"])
def api_addTagToRoutine(routine_id):
"""Add tags to a routine. Body: {tag_ids: [uuid, ...]}"""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "routine not found"}), 404
data = flask.request.get_json()
if not data or not data.get("tag_ids"):
return flask.jsonify({"error": "missing tag_ids"}), 400
for tag_id in data["tag_ids"]:
existing_link = postgres.select_one("routine_routine_tags", {"routine_id": routine_id, "tag_id": tag_id})
if not existing_link:
postgres.insert("routine_routine_tags", {"routine_id": routine_id, "tag_id": tag_id})
tags = postgres.execute("""
SELECT t.* FROM routine_tags t
JOIN routine_routine_tags rt ON t.id = rt.tag_id
WHERE rt.routine_id = %s
""", {"routine_id": routine_id})
return flask.jsonify(tags), 200
@app.route("/api/routines/<routine_id>/tags/<tag_id>", methods=["DELETE"])
def api_removeTagFromRoutine(routine_id, tag_id):
"""Remove a tag from a routine."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "routine not found"}), 404
postgres.delete("routine_routine_tags", {"routine_id": routine_id, "tag_id": tag_id})
return flask.jsonify({"removed": True}), 200
@app.route("/api/routines/<routine_id>/tags", methods=["GET"])
def api_getRoutineTags(routine_id):
"""Get tags for a routine."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "routine not found"}), 404
tags = postgres.execute("""
SELECT t.* FROM routine_tags t
JOIN routine_routine_tags rt ON t.id = rt.tag_id
WHERE rt.routine_id = %s
""", {"routine_id": routine_id})
return flask.jsonify(tags), 200

View File

@@ -0,0 +1,168 @@
"""
Routine Templates API - pre-built routines that users can clone
"""
import os
import uuid
import flask
import jwt
import core.auth as auth
import core.postgres as postgres
def _get_user_uuid(token):
try:
payload = jwt.decode(token, os.getenv("JWT_SECRET"), algorithms=["HS256"])
return payload.get("sub")
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return None
def _auth(request):
header = request.headers.get("Authorization", "")
if not header.startswith("Bearer "):
return None
token = header[7:]
user_uuid = _get_user_uuid(token)
if not user_uuid or not auth.verifyLoginToken(token, userUUID=user_uuid):
return None
return user_uuid
def register(app):
@app.route("/api/templates", methods=["GET"])
def api_listTemplates():
"""List all available templates."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
templates = postgres.select("routine_templates", order_by="name")
for template in templates:
steps = postgres.select("routine_template_steps", {"template_id": template["id"]}, order_by="position")
template["step_count"] = len(steps)
return flask.jsonify(templates), 200
@app.route("/api/templates", methods=["POST"])
def api_createTemplate():
"""Create a new template (admin only in production). Body: {name, description?, icon?}"""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
data = flask.request.get_json()
if not data:
return flask.jsonify({"error": "missing body"}), 400
if not data.get("name"):
return flask.jsonify({"error": "missing required field: name"}), 400
data["id"] = str(uuid.uuid4())
data["created_by_admin"] = False
template = postgres.insert("routine_templates", data)
return flask.jsonify(template), 201
@app.route("/api/templates/<template_id>", methods=["GET"])
def api_getTemplate(template_id):
"""Get a template with its steps."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
template = postgres.select_one("routine_templates", {"id": template_id})
if not template:
return flask.jsonify({"error": "template not found"}), 404
steps = postgres.select("routine_template_steps", {"template_id": template_id}, order_by="position")
return flask.jsonify({"template": template, "steps": steps}), 200
@app.route("/api/templates/<template_id>/clone", methods=["POST"])
def api_cloneTemplate(template_id):
"""Clone a template to user's routines."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
template = postgres.select_one("routine_templates", {"id": template_id})
if not template:
return flask.jsonify({"error": "template not found"}), 404
template_steps = postgres.select("routine_template_steps", {"template_id": template_id}, order_by="position")
new_routine = {
"id": str(uuid.uuid4()),
"user_uuid": user_uuid,
"name": template["name"],
"description": template.get("description"),
"icon": template.get("icon"),
}
routine = postgres.insert("routines", new_routine)
for step in template_steps:
new_step = {
"id": str(uuid.uuid4()),
"routine_id": routine["id"],
"name": step["name"],
"instructions": step.get("instructions"),
"step_type": step.get("step_type", "generic"),
"duration_minutes": step.get("duration_minutes"),
"media_url": step.get("media_url"),
"position": step["position"],
}
postgres.insert("routine_steps", new_step)
return flask.jsonify(routine), 201
@app.route("/api/templates/<template_id>", methods=["PUT"])
def api_updateTemplate(template_id):
"""Update a template."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
template = postgres.select_one("routine_templates", {"id": template_id})
if not template:
return flask.jsonify({"error": "template not found"}), 404
data = flask.request.get_json()
if not data:
return flask.jsonify({"error": "missing body"}), 400
allowed = ["name", "description", "icon"]
updates = {k: v for k, v in data.items() if k in allowed}
if not updates:
return flask.jsonify({"error": "no valid fields to update"}), 400
result = postgres.update("routine_templates", updates, {"id": template_id})
return flask.jsonify(result[0] if result else {}), 200
@app.route("/api/templates/<template_id>", methods=["DELETE"])
def api_deleteTemplate(template_id):
"""Delete a template."""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
template = postgres.select_one("routine_templates", {"id": template_id})
if not template:
return flask.jsonify({"error": "template not found"}), 404
postgres.delete("routine_template_steps", {"template_id": template_id})
postgres.delete("routine_templates", {"id": template_id})
return flask.jsonify({"deleted": True}), 200
@app.route("/api/templates/<template_id>/steps", methods=["POST"])
def api_addTemplateStep(template_id):
"""Add a step to a template. Body: {name, instructions?, step_type?, duration_minutes?, position?}"""
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
template = postgres.select_one("routine_templates", {"id": template_id})
if not template:
return flask.jsonify({"error": "template not found"}), 404
data = flask.request.get_json()
if not data or not data.get("name"):
return flask.jsonify({"error": "missing required field: name"}), 400
max_pos = postgres.select(
"routine_template_steps",
{"template_id": template_id},
order_by="position DESC",
limit=1,
)
next_pos = (max_pos[0]["position"] + 1) if max_pos else 1
step = {
"id": str(uuid.uuid4()),
"template_id": template_id,
"name": data["name"],
"instructions": data.get("instructions"),
"step_type": data.get("step_type", "generic"),
"duration_minutes": data.get("duration_minutes"),
"media_url": data.get("media_url"),
"position": data.get("position", next_pos),
}
result = postgres.insert("routine_template_steps", step)
return flask.jsonify(result), 201

View File

@@ -5,6 +5,7 @@ Routines have ordered steps. Users start sessions to walk through them.
"""
import os
import uuid
import flask
import jwt
import core.auth as auth
@@ -41,7 +42,8 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
routines = postgres.select("routines", where={"user_uuid": user_uuid}, order_by="name")
return flask.jsonify(routines), 200
@app.route("/api/routines", methods=["POST"])
def api_createRoutine():
@@ -50,7 +52,14 @@ def register(app):
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
data = flask.request.get_json()
pass
if not data:
return flask.jsonify({"error": "missing body"}), 400
if not data.get("name"):
return flask.jsonify({"error": "missing required field: name"}), 400
data["id"] = str(uuid.uuid4())
data["user_uuid"] = user_uuid
routine = postgres.insert("routines", data)
return flask.jsonify(routine), 201
@app.route("/api/routines/<routine_id>", methods=["GET"])
def api_getRoutine(routine_id):
@@ -58,7 +67,15 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "not found"}), 404
steps = postgres.select(
"routine_steps",
where={"routine_id": routine_id},
order_by="position",
)
return flask.jsonify({"routine": routine, "steps": steps}), 200
@app.route("/api/routines/<routine_id>", methods=["PUT"])
def api_updateRoutine(routine_id):
@@ -67,7 +84,17 @@ def register(app):
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
data = flask.request.get_json()
pass
if not data:
return flask.jsonify({"error": "missing body"}), 400
existing = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not existing:
return flask.jsonify({"error": "not found"}), 404
allowed = ["name", "description", "icon"]
updates = {k: v for k, v in data.items() if k in allowed}
if not updates:
return flask.jsonify({"error": "no valid fields to update"}), 400
result = postgres.update("routines", updates, {"id": routine_id, "user_uuid": user_uuid})
return flask.jsonify(result[0] if result else {}), 200
@app.route("/api/routines/<routine_id>", methods=["DELETE"])
def api_deleteRoutine(routine_id):
@@ -75,7 +102,14 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
existing = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not existing:
return flask.jsonify({"error": "not found"}), 404
postgres.delete("routine_sessions", {"routine_id": routine_id})
postgres.delete("routine_steps", {"routine_id": routine_id})
postgres.delete("routine_schedules", {"routine_id": routine_id})
postgres.delete("routines", {"id": routine_id, "user_uuid": user_uuid})
return flask.jsonify({"deleted": True}), 200
# ── Steps CRUD ────────────────────────────────────────────────
@@ -85,7 +119,15 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "not found"}), 404
steps = postgres.select(
"routine_steps",
where={"routine_id": routine_id},
order_by="position",
)
return flask.jsonify(steps), 200
@app.route("/api/routines/<routine_id>/steps", methods=["POST"])
def api_addStep(routine_id):
@@ -93,8 +135,30 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "not found"}), 404
data = flask.request.get_json()
pass
if not data:
return flask.jsonify({"error": "missing body"}), 400
if not data.get("name"):
return flask.jsonify({"error": "missing required field: name"}), 400
max_pos = postgres.select(
"routine_steps",
where={"routine_id": routine_id},
order_by="position DESC",
limit=1,
)
next_pos = (max_pos[0]["position"] + 1) if max_pos else 1
step = {
"id": str(uuid.uuid4()),
"routine_id": routine_id,
"name": data["name"],
"duration_minutes": data.get("duration_minutes"),
"position": data.get("position", next_pos),
}
result = postgres.insert("routine_steps", step)
return flask.jsonify(result), 201
@app.route("/api/routines/<routine_id>/steps/<step_id>", methods=["PUT"])
def api_updateStep(routine_id, step_id):
@@ -102,8 +166,21 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "not found"}), 404
data = flask.request.get_json()
pass
if not data:
return flask.jsonify({"error": "missing body"}), 400
existing = postgres.select_one("routine_steps", {"id": step_id, "routine_id": routine_id})
if not existing:
return flask.jsonify({"error": "step not found"}), 404
allowed = ["name", "duration_minutes", "position"]
updates = {k: v for k, v in data.items() if k in allowed}
if not updates:
return flask.jsonify({"error": "no valid fields to update"}), 400
result = postgres.update("routine_steps", updates, {"id": step_id, "routine_id": routine_id})
return flask.jsonify(result[0] if result else {}), 200
@app.route("/api/routines/<routine_id>/steps/<step_id>", methods=["DELETE"])
def api_deleteStep(routine_id, step_id):
@@ -111,7 +188,14 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "not found"}), 404
existing = postgres.select_one("routine_steps", {"id": step_id, "routine_id": routine_id})
if not existing:
return flask.jsonify({"error": "step not found"}), 404
postgres.delete("routine_steps", {"id": step_id, "routine_id": routine_id})
return flask.jsonify({"deleted": True}), 200
@app.route("/api/routines/<routine_id>/steps/reorder", methods=["PUT"])
def api_reorderSteps(routine_id):
@@ -119,8 +203,21 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "not found"}), 404
data = flask.request.get_json()
pass
if not data or not data.get("step_ids"):
return flask.jsonify({"error": "missing step_ids"}), 400
step_ids = data["step_ids"]
for i, step_id in enumerate(step_ids):
postgres.update("routine_steps", {"position": i + 1}, {"id": step_id, "routine_id": routine_id})
steps = postgres.select(
"routine_steps",
where={"routine_id": routine_id},
order_by="position",
)
return flask.jsonify(steps), 200
# ── Routine Sessions (active run-through) ─────────────────────
@@ -130,7 +227,28 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "not found"}), 404
active = postgres.select_one("routine_sessions", {"user_uuid": user_uuid, "status": "active"})
if active:
return flask.jsonify({"error": "already have active session", "session_id": active["id"]}), 409
steps = postgres.select(
"routine_steps",
where={"routine_id": routine_id},
order_by="position",
)
if not steps:
return flask.jsonify({"error": "no steps in routine"}), 400
session = {
"id": str(uuid.uuid4()),
"routine_id": routine_id,
"user_uuid": user_uuid,
"status": "active",
"current_step_index": 0,
}
result = postgres.insert("routine_sessions", session)
return flask.jsonify({"session": result, "current_step": steps[0]}), 201
@app.route("/api/sessions/active", methods=["GET"])
def api_getActiveSession():
@@ -138,7 +256,17 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
session = postgres.select_one("routine_sessions", {"user_uuid": user_uuid, "status": "active"})
if not session:
return flask.jsonify({"error": "no active session"}), 404
routine = postgres.select_one("routines", {"id": session["routine_id"]})
steps = postgres.select(
"routine_steps",
where={"routine_id": session["routine_id"]},
order_by="position",
)
current_step = steps[session["current_step_index"]] if session["current_step_index"] < len(steps) else None
return flask.jsonify({"session": session, "routine": routine, "current_step": current_step}), 200
@app.route("/api/sessions/<session_id>/complete-step", methods=["POST"])
def api_completeStep(session_id):
@@ -146,8 +274,23 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
data = flask.request.get_json()
pass
session = postgres.select_one("routine_sessions", {"id": session_id, "user_uuid": user_uuid})
if not session:
return flask.jsonify({"error": "not found"}), 404
if session["status"] != "active":
return flask.jsonify({"error": "session not active"}), 400
data = flask.request.get_json() or {}
steps = postgres.select(
"routine_steps",
where={"routine_id": session["routine_id"]},
order_by="position",
)
next_index = session["current_step_index"] + 1
if next_index >= len(steps):
postgres.update("routine_sessions", {"status": "completed"}, {"id": session_id})
return flask.jsonify({"session": {"status": "completed"}, "next_step": None}), 200
postgres.update("routine_sessions", {"current_step_index": next_index}, {"id": session_id})
return flask.jsonify({"session": {"current_step_index": next_index}, "next_step": steps[next_index]}), 200
@app.route("/api/sessions/<session_id>/skip-step", methods=["POST"])
def api_skipStep(session_id):
@@ -155,8 +298,22 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
data = flask.request.get_json()
pass
session = postgres.select_one("routine_sessions", {"id": session_id, "user_uuid": user_uuid})
if not session:
return flask.jsonify({"error": "not found"}), 404
if session["status"] != "active":
return flask.jsonify({"error": "session not active"}), 400
steps = postgres.select(
"routine_steps",
where={"routine_id": session["routine_id"]},
order_by="position",
)
next_index = session["current_step_index"] + 1
if next_index >= len(steps):
postgres.update("routine_sessions", {"status": "completed"}, {"id": session_id})
return flask.jsonify({"session": {"status": "completed"}, "next_step": None}), 200
postgres.update("routine_sessions", {"current_step_index": next_index}, {"id": session_id})
return flask.jsonify({"session": {"current_step_index": next_index}, "next_step": steps[next_index]}), 200
@app.route("/api/sessions/<session_id>/cancel", methods=["POST"])
def api_cancelSession(session_id):
@@ -164,7 +321,11 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
session = postgres.select_one("routine_sessions", {"id": session_id, "user_uuid": user_uuid})
if not session:
return flask.jsonify({"error": "not found"}), 404
postgres.update("routine_sessions", {"status": "cancelled"}, {"id": session_id})
return flask.jsonify({"session": {"status": "cancelled"}}), 200
# ── Routine History / Stats ───────────────────────────────────
@@ -174,7 +335,17 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "not found"}), 404
days = flask.request.args.get("days", 7, type=int)
sessions = postgres.select(
"routine_sessions",
where={"routine_id": routine_id},
order_by="created_at DESC",
limit=days,
)
return flask.jsonify(sessions), 200
# ── Routine Scheduling ────────────────────────────────────────
@@ -184,8 +355,26 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "not found"}), 404
data = flask.request.get_json()
pass
if not data:
return flask.jsonify({"error": "missing body"}), 400
existing = postgres.select_one("routine_schedules", {"routine_id": routine_id})
schedule_data = {
"routine_id": routine_id,
"days": data.get("days", []),
"time": data.get("time"),
"remind": data.get("remind", True),
}
if existing:
result = postgres.update("routine_schedules", schedule_data, {"routine_id": routine_id})
return flask.jsonify(result[0] if result else {}), 200
else:
schedule_data["id"] = str(uuid.uuid4())
result = postgres.insert("routine_schedules", schedule_data)
return flask.jsonify(result), 201
@app.route("/api/routines/<routine_id>/schedule", methods=["GET"])
def api_getRoutineSchedule(routine_id):
@@ -193,7 +382,13 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "not found"}), 404
schedule = postgres.select_one("routine_schedules", {"routine_id": routine_id})
if not schedule:
return flask.jsonify({"error": "no schedule set"}), 404
return flask.jsonify(schedule), 200
@app.route("/api/routines/<routine_id>/schedule", methods=["DELETE"])
def api_deleteRoutineSchedule(routine_id):
@@ -201,4 +396,8 @@ def register(app):
user_uuid = _auth(flask.request)
if not user_uuid:
return flask.jsonify({"error": "unauthorized"}), 401
pass
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return flask.jsonify({"error": "not found"}), 404
postgres.delete("routine_schedules", {"routine_id": routine_id})
return flask.jsonify({"deleted": True}), 200

View File

@@ -2,6 +2,7 @@
Medications command handler - bot-side hooks for medication management
"""
import asyncio
from bot.command_registry import register_module
import ai.parser as ai_parser
@@ -11,8 +12,92 @@ async def handle_medication(message, session, parsed):
token = session["token"]
user_uuid = session["user_uuid"]
# TODO: wire up API calls per action
pass
if action == "list":
resp, status = api_request("get", "/api/medications", token)
if status == 200:
meds = resp if isinstance(resp, list) else []
if not meds:
await message.channel.send("You don't have any medications yet.")
else:
lines = [f"- **{m['name']}**: {m['dosage']} {m['unit']} ({m.get('frequency', 'n/a')})" for m in meds]
await message.channel.send("**Your medications:**\n" + "\n".join(lines))
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to fetch medications')}")
elif action == "add":
name = parsed.get("name")
dosage = parsed.get("dosage")
unit = parsed.get("unit", "mg")
frequency = parsed.get("frequency", "daily")
times = parsed.get("times", ["08:00"])
if not name or not dosage:
await message.channel.send("Please provide medication name and dosage.")
return
data = {"name": name, "dosage": dosage, "unit": unit, "frequency": frequency, "times": times}
resp, status = api_request("post", "/api/medications", token, data)
if status == 201:
await message.channel.send(f"Added **{name}** to your medications!")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to add medication')}")
elif action == "take":
med_id = parsed.get("medication_id")
if not med_id:
await message.channel.send("Which medication did you take?")
return
resp, status = api_request("post", f"/api/medications/{med_id}/take", token, {})
if status == 201:
await message.channel.send("Logged it! Great job staying on track.")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to log')}")
elif action == "skip":
med_id = parsed.get("medication_id")
reason = parsed.get("reason", "Skipped by user")
if not med_id:
await message.channel.send("Which medication are you skipping?")
return
resp, status = api_request("post", f"/api/medications/{med_id}/skip", token, {"reason": reason})
if status == 201:
await message.channel.send("OK, noted.")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to log')}")
elif action == "adherence":
med_id = parsed.get("medication_id")
if med_id:
resp, status = api_request("get", f"/api/medications/{med_id}/adherence", token)
else:
resp, status = api_request("get", "/api/medications/adherence", token)
if status == 200:
if isinstance(resp, list):
lines = [f"- {m['name']}: {m['adherence_percent']}% adherence" for m in resp]
await message.channel.send("**Adherence:**\n" + "\n".join(lines))
else:
await message.channel.send(f"**{resp.get('name')}**: {resp.get('adherence_percent')}% adherence")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to fetch adherence')}")
else:
await message.channel.send(f"Unknown action: {action}. Try: list, add, take, skip, or adherence.")
def api_request(method, endpoint, token, data=None):
import requests
import os
API_URL = os.getenv("API_URL", "http://app:5000")
url = f"{API_URL}{endpoint}"
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
try:
resp = getattr(requests, method)(url, headers=headers, json=data, timeout=10)
try:
return resp.json(), resp.status_code
except ValueError:
return {}, resp.status_code
except requests.RequestException:
return {"error": "API unavailable"}, 503
def validate_medication_json(data):

View File

@@ -11,8 +11,266 @@ async def handle_routine(message, session, parsed):
token = session["token"]
user_uuid = session["user_uuid"]
# TODO: wire up API calls per action
pass
if action == "list":
resp, status = api_request("get", "/api/routines", token)
if status == 200:
routines = resp if isinstance(resp, list) else []
if not routines:
await message.channel.send("You don't have any routines yet.")
else:
lines = [f"- **{r['name']}**: {r.get('description', 'No description')}" for r in routines]
await message.channel.send("**Your routines:**\n" + "\n".join(lines))
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to fetch routines')}")
elif action == "create":
name = parsed.get("name")
description = parsed.get("description")
if not name:
await message.channel.send("Please provide a routine name.")
return
data = {"name": name, "description": description or ""}
resp, status = api_request("post", "/api/routines", token, data)
if status == 201:
await message.channel.send(f"Created routine **{name}**!")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to create routine')}")
elif action == "start":
routine_id = parsed.get("routine_id")
if not routine_id:
await message.channel.send("Which routine would you like to start?")
return
resp, status = api_request("post", f"/api/routines/{routine_id}/start", token)
if status == 201:
step = resp.get("current_step", {})
await message.channel.send(f"Started! First step: **{step.get('name', 'Unknown')}**")
elif status == 409:
await message.channel.send(f"You already have an active session: {resp.get('error', '')}")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to start routine')}")
elif action == "complete":
session_resp, _ = api_request("get", "/api/sessions/active", token)
if "session" not in session_resp:
await message.channel.send("No active routine session.")
return
session_id = session_resp["session"]["id"]
resp, status = api_request("post", f"/api/sessions/{session_id}/complete-step", token)
if status == 200:
if resp.get("next_step"):
await message.channel.send(f"Done! Next: **{resp['next_step'].get('name', 'Unknown')}**")
else:
await message.channel.send("Completed all steps! Great job!")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to complete step')}")
elif action == "skip":
session_resp, _ = api_request("get", "/api/sessions/active", token)
if "session" not in session_resp:
await message.channel.send("No active routine session.")
return
session_id = session_resp["session"]["id"]
resp, status = api_request("post", f"/api/sessions/{session_id}/skip-step", token)
if status == 200:
if resp.get("next_step"):
await message.channel.send(f"Skipped! Next: **{resp['next_step'].get('name', 'Unknown')}**")
else:
await message.channel.send("All steps skipped! Routine ended.")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to skip step')}")
elif action == "cancel":
session_resp, _ = api_request("get", "/api/sessions/active", token)
if "session" not in session_resp:
await message.channel.send("No active routine session to cancel.")
return
session_id = session_resp["session"]["id"]
resp, status = api_request("post", f"/api/sessions/{session_id}/cancel", token)
if status == 200:
await message.channel.send("Routine cancelled.")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to cancel')}")
elif action == "history":
routine_id = parsed.get("routine_id")
if not routine_id:
await message.channel.send("Which routine's history?")
return
resp, status = api_request("get", f"/api/routines/{routine_id}/history", token)
if status == 200:
sessions = resp if isinstance(resp, list) else []
if not sessions:
await message.channel.send("No history yet.")
else:
lines = [f"- {s.get('status', 'unknown')} on {s.get('created_at', '')}" for s in sessions[:5]]
await message.channel.send("**Recent sessions:**\n" + "\n".join(lines))
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to fetch history')}")
elif action == "pause":
session_resp, _ = api_request("get", "/api/sessions/active", token)
if "session" not in session_resp:
await message.channel.send("No active routine session to pause.")
return
session_id = session_resp["session"]["id"]
resp, status = api_request("post", f"/api/sessions/{session_id}/pause", token)
if status == 200:
await message.channel.send("Routine paused. Say 'resume' when ready to continue.")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to pause')}")
elif action == "resume":
session_resp, _ = api_request("get", "/api/sessions/active", token)
if "session" not in session_resp:
await message.channel.send("You don't have a paused session.")
return
resp, status = api_request("post", f"/api/sessions/{session_resp['session']['id']}/resume", token)
if status == 200:
await message.channel.send("Resumed! Let's keep going.")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to resume')}")
elif action == "abort":
session_resp, _ = api_request("get", "/api/sessions/active", token)
if "session" not in session_resp:
await message.channel.send("No active routine session to abort.")
return
session_id = session_resp["session"]["id"]
reason = parsed.get("reason", "Aborted by user")
resp, status = api_request("post", f"/api/sessions/{session_id}/abort", token, {"reason": reason})
if status == 200:
await message.channel.send("Routine aborted. No worries, you can try again later!")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to abort')}")
elif action == "note":
session_resp, _ = api_request("get", "/api/sessions/active", token)
if "session" not in session_resp:
await message.channel.send("No active routine session.")
return
session_id = session_resp["session"]["id"]
note = parsed.get("note")
if not note:
await message.channel.send("What note would you like to add?")
return
step_index = session_resp.get("session", {}).get("current_step_index", 0)
resp, status = api_request("post", f"/api/sessions/{session_id}/note", token, {"step_index": step_index, "note": note})
if status == 201:
await message.channel.send(f"Note saved: {note}")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to save note')}")
elif action == "stats":
routine_id = parsed.get("routine_id")
if routine_id:
resp, status = api_request("get", f"/api/routines/{routine_id}/stats", token)
else:
await message.channel.send("Which routine's stats? (Please specify routine)")
return
if status == 200:
await message.channel.send(
f"**{resp.get('routine_name')} Stats:**\n"
f"- Completion rate: {resp.get('completion_rate_percent')}%\n"
f"- Completed: {resp.get('completed')}/{resp.get('total_sessions')}\n"
f"- Avg duration: {resp.get('avg_duration_minutes')} min"
)
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to fetch stats')}")
elif action == "streak":
routine_id = parsed.get("routine_id")
if routine_id:
resp, status = api_request("get", f"/api/routines/{routine_id}/streak", token)
else:
resp, status = api_request("get", "/api/routines/streaks", token)
if status == 200:
streaks = resp if isinstance(resp, list) else []
if not streaks:
await message.channel.send("No streaks yet. Complete routines to build streaks!")
else:
lines = [f"- {s.get('routine_name')}: {s.get('current_streak')} day streak" for s in streaks]
await message.channel.send("**Your streaks:**\n" + "\n".join(lines))
return
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to fetch streaks')}")
return
if status == 200:
await message.channel.send(
f"**{resp.get('routine_name')}**\n"
f"Current streak: {resp.get('current_streak')} days\n"
f"Longest streak: {resp.get('longest_streak')} days"
)
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to fetch streak')}")
elif action == "templates":
resp, status = api_request("get", "/api/templates", token)
if status == 200:
templates = resp if isinstance(resp, list) else []
if not templates:
await message.channel.send("No templates available yet.")
else:
lines = [f"- **{t['name']}**: {t.get('description', 'No description')}" for t in templates[:10]]
await message.channel.send("**Available templates:**\n" + "\n".join(lines))
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to fetch templates')}")
elif action == "clone":
template_id = parsed.get("template_id")
if not template_id:
await message.channel.send("Which template would you like to clone?")
return
resp, status = api_request("post", f"/api/templates/{template_id}/clone", token)
if status == 201:
await message.channel.send(f"Cloned! Created routine: **{resp.get('name')}**")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to clone template')}")
elif action == "tag":
routine_id = parsed.get("routine_id")
tag_name = parsed.get("tag")
if not routine_id or not tag_name:
await message.channel.send("Please specify routine and tag.")
return
tags_resp, _ = api_request("get", "/api/tags", token)
existing_tag = next((t for t in tags_resp if t.get("name", "").lower() == tag_name.lower()), None)
if existing_tag:
tag_id = existing_tag["id"]
else:
tag_resp, tag_status = api_request("post", "/api/tags", token, {"name": tag_name})
if tag_status == 201:
tag_id = tag_resp["id"]
else:
await message.channel.send(f"Error creating tag: {tag_resp.get('error')}")
return
resp, status = api_request("post", f"/api/routines/{routine_id}/tags", token, {"tag_ids": [tag_id]})
if status == 200:
await message.channel.send(f"Added tag **{tag_name}** to routine!")
else:
await message.channel.send(f"Error: {resp.get('error', 'Failed to add tag')}")
else:
await message.channel.send(f"Unknown action: {action}. Try: list, create, start, complete, skip, cancel, history, pause, resume, abort, note, stats, streak, templates, clone, or tag.")
def api_request(method, endpoint, token, data=None):
import requests
import os
API_URL = os.getenv("API_URL", "http://app:5000")
url = f"{API_URL}{endpoint}"
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
try:
resp = getattr(requests, method)(url, headers=headers, json=data, timeout=10)
try:
return resp.json(), resp.status_code
except ValueError:
return {}, resp.status_code
except requests.RequestException:
return {"error": "API unavailable"}, 503
def validate_routine_json(data):

11
config/.env Normal file
View File

@@ -0,0 +1,11 @@
DISCORD_BOT_TOKEN=MTQ2NzYwMTc2ODM0NjE2MTE3Mw.G7BKQ-.kivCRj7mOl6aS5VyX4RW9hirqzm7qJ8nJOVMpE
API_URL=http://app:5000
DB_HOST=db
DB_PORT=5432
DB_NAME=app
DB_USER=app
DB_PASS=y8Khu7pJQZq6ywFDIJiqpx4zYmclHGHw
JWT_SECRET=bf773b4562221bef4d304ae5752a68931382ea3e98fe38394a098f73e0c776e1
OPENROUTER_API_KEY=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
OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
AI_CONFIG_PATH=/app/ai/ai_config.json

20
config/.env.example Normal file
View File

@@ -0,0 +1,20 @@
# Discord Bot
DISCORD_BOT_TOKEN=your_discord_bot_token_here
# API
API_URL=http://app:5000
# Database
DB_HOST=db
DB_PORT=5432
DB_NAME=app
DB_USER=app
DB_PASS=your_db_password_here
# JWT
JWT_SECRET=your_jwt_secret_here
# AI / OpenRouter
OPENROUTER_API_KEY=your_openrouter_api_key_here
OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
AI_CONFIG_PATH=/app/ai/ai_config.json

30
config/schema.sql Normal file
View File

@@ -0,0 +1,30 @@
-- Users table (minimal)
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY,
username VARCHAR(255) UNIQUE NOT NULL,
password_hashed BYTEA NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Notifications table
CREATE TABLE IF NOT EXISTS notifications (
id UUID PRIMARY KEY,
user_uuid UUID REFERENCES users(id) ON DELETE CASCADE UNIQUE,
discord_webhook VARCHAR(500),
discord_enabled BOOLEAN DEFAULT FALSE,
ntfy_topic VARCHAR(255),
ntfy_enabled BOOLEAN DEFAULT FALSE,
last_message_sent TIMESTAMP,
current_notification_status VARCHAR(50) DEFAULT 'inactive',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Add your domain tables below
-- Example:
-- CREATE TABLE IF NOT EXISTS examples (
-- id UUID PRIMARY KEY,
-- user_uuid UUID REFERENCES users(id) ON DELETE CASCADE,
-- name VARCHAR(255) NOT NULL,
-- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
-- );

224
core/routines.py Normal file
View File

@@ -0,0 +1,224 @@
"""
core/routines.py - Shared business logic for routines
This module contains reusable functions for routine operations
that can be called from API routes, bot commands, or scheduler.
"""
import uuid
from datetime import datetime, date
import core.postgres as postgres
def start_session(routine_id, user_uuid):
"""
Create and start a new routine session.
Returns the session object or None if routine not found.
"""
routine = postgres.select_one("routines", {"id": routine_id, "user_uuid": user_uuid})
if not routine:
return None
active_session = postgres.select_one(
"routine_sessions",
{"user_uuid": user_uuid, "status": "active"}
)
if active_session:
return {"error": "already_active", "session_id": active_session["id"]}
steps = postgres.select(
"routine_steps",
{"routine_id": routine_id},
order_by="position"
)
if not steps:
return {"error": "no_steps"}
session = {
"id": str(uuid.uuid4()),
"routine_id": routine_id,
"user_uuid": user_uuid,
"status": "active",
"current_step_index": 0,
}
result = postgres.insert("routine_sessions", session)
return {"session": result, "current_step": steps[0]}
def pause_session(session_id, user_uuid):
"""Pause an active session."""
session = postgres.select_one(
"routine_sessions",
{"id": session_id, "user_uuid": user_uuid}
)
if not session:
return None
if session.get("status") != "active":
return {"error": "not_active"}
result = postgres.update(
"routine_sessions",
{"status": "paused", "paused_at": datetime.now().isoformat()},
{"id": session_id}
)
return result
def resume_session(session_id, user_uuid):
"""Resume a paused session."""
session = postgres.select_one(
"routine_sessions",
{"id": session_id, "user_uuid": user_uuid}
)
if not session:
return None
if session.get("status") != "paused":
return {"error": "not_paused"}
result = postgres.update(
"routine_sessions",
{"status": "active", "paused_at": None},
{"id": session_id}
)
return result
def abort_session(session_id, user_uuid, reason=None):
"""Abort a session with optional reason."""
session = postgres.select_one(
"routine_sessions",
{"id": session_id, "user_uuid": user_uuid}
)
if not session:
return None
result = postgres.update(
"routine_sessions",
{
"status": "aborted",
"abort_reason": reason or "Aborted by user",
"completed_at": datetime.now().isoformat()
},
{"id": session_id}
)
return result
def complete_session(session_id, user_uuid):
"""Mark a session as completed and update streak."""
session = postgres.select_one(
"routine_sessions",
{"id": session_id, "user_uuid": user_uuid}
)
if not session:
return None
completed_at = datetime.now()
result = postgres.update(
"routine_sessions",
{"status": "completed", "completed_at": completed_at.isoformat()},
{"id": session_id}
)
_update_streak(user_uuid, session["routine_id"])
return result
def clone_template(template_id, user_uuid):
"""Clone a template to user's routines."""
template = postgres.select_one("routine_templates", {"id": template_id})
if not template:
return None
template_steps = postgres.select(
"routine_template_steps",
{"template_id": template_id},
order_by="position"
)
new_routine = {
"id": str(uuid.uuid4()),
"user_uuid": user_uuid,
"name": template["name"],
"description": template.get("description"),
"icon": template.get("icon"),
}
routine = postgres.insert("routines", new_routine)
for step in template_steps:
new_step = {
"id": str(uuid.uuid4()),
"routine_id": routine["id"],
"name": step["name"],
"instructions": step.get("instructions"),
"step_type": step.get("step_type", "generic"),
"duration_minutes": step.get("duration_minutes"),
"media_url": step.get("media_url"),
"position": step["position"],
}
postgres.insert("routine_steps", new_step)
return routine
def _update_streak(user_uuid, routine_id):
"""Update streak after completing a session. Resets if day was missed."""
today = date.today()
streak = postgres.select_one(
"routine_streaks",
{"user_uuid": user_uuid, "routine_id": routine_id}
)
if not streak:
new_streak = {
"id": str(uuid.uuid4()),
"user_uuid": user_uuid,
"routine_id": routine_id,
"current_streak": 1,
"longest_streak": 1,
"last_completed_date": today.isoformat(),
}
return postgres.insert("routine_streaks", new_streak)
last_completed = streak.get("last_completed_date")
if last_completed:
if isinstance(last_completed, str):
last_completed = date.fromisoformat(last_completed)
days_diff = (today - last_completed).days
if days_diff == 0:
return streak
elif days_diff == 1:
new_streak = streak["current_streak"] + 1
else:
new_streak = 1
else:
new_streak = 1
longest = max(streak["longest_streak"], new_streak)
postgres.update(
"routine_streaks",
{
"current_streak": new_streak,
"longest_streak": longest,
"last_completed_date": today.isoformat(),
},
{"id": streak["id"]}
)
return streak
def calculate_streak(user_uuid, routine_id):
"""Get current streak for a routine."""
streak = postgres.select_one(
"routine_streaks",
{"user_uuid": user_uuid, "routine_id": routine_id}
)
return streak
def get_active_session(user_uuid):
"""Get user's currently active session."""
return postgres.select_one(
"routine_sessions",
{"user_uuid": user_uuid, "status": "active"}
)

160
core/stats.py Normal file
View File

@@ -0,0 +1,160 @@
"""
core/stats.py - Statistics calculations for routines
This module contains functions for calculating routine statistics,
completion rates, streaks, and weekly summaries.
"""
from datetime import datetime, timedelta, date
import core.postgres as postgres
def get_routine_stats(routine_id, user_uuid, days=30):
"""
Get completion statistics for a routine over a period.
Returns dict with completion_rate, avg_duration, total_time, etc.
"""
sessions = postgres.select(
"routine_sessions",
{"routine_id": routine_id, "user_uuid": user_uuid},
limit=days * 3,
)
completed = sum(1 for s in sessions if s.get("status") == "completed")
aborted = sum(1 for s in sessions if s.get("status") == "aborted")
total_duration = sum(
s.get("actual_duration_minutes", 0) or 0
for s in sessions
if s.get("actual_duration_minutes")
)
avg_duration = total_duration / completed if completed > 0 else 0
completion_rate = (completed / len(sessions) * 100) if sessions else 0
return {
"total_sessions": len(sessions),
"completed": completed,
"aborted": aborted,
"completion_rate_percent": round(completion_rate, 1),
"avg_duration_minutes": round(avg_duration, 1),
"total_time_minutes": total_duration,
}
def get_user_streaks(user_uuid):
"""
Get all streaks for a user across all routines.
Returns list of streak objects with routine names.
"""
streaks = postgres.select("routine_streaks", {"user_uuid": user_uuid})
routines = postgres.select("routines", {"user_uuid": user_uuid})
routine_map = {r["id"]: r["name"] for r in routines}
result = []
for streak in streaks:
result.append({
"routine_id": streak["routine_id"],
"routine_name": routine_map.get(streak["routine_id"], "Unknown"),
"current_streak": streak["current_streak"],
"longest_streak": streak["longest_streak"],
"last_completed_date": streak.get("last_completed_date"),
})
return result
def get_weekly_summary(user_uuid):
"""
Get weekly progress summary for a user.
Returns total completed, total time, routines started, per-routine breakdown.
"""
routines = postgres.select("routines", {"user_uuid": user_uuid})
if not routines:
return {
"total_completed": 0,
"total_time_minutes": 0,
"routines_started": 0,
"routines": [],
}
week_ago = datetime.now() - timedelta(days=7)
sessions = postgres.select("routine_sessions", {"user_uuid": user_uuid})
week_sessions = [
s for s in sessions
if s.get("created_at") and s["created_at"] >= week_ago
]
completed = [s for s in week_sessions if s.get("status") == "completed"]
total_time = sum(
s.get("actual_duration_minutes", 0) or 0
for s in completed
if s.get("actual_duration_minutes")
)
routine_summaries = []
for routine in routines:
r_sessions = [s for s in week_sessions if s.get("routine_id") == routine["id"]]
r_completed = sum(1 for s in r_sessions if s.get("status") == "completed")
routine_summaries.append({
"routine_id": routine["id"],
"name": routine["name"],
"completed_this_week": r_completed,
})
return {
"total_completed": len(completed),
"total_time_minutes": total_time,
"routines_started": len(set(s.get("routine_id") for s in week_sessions)),
"routines": routine_summaries,
}
def calculate_completion_rate(sessions, completed_only=True):
"""Calculate completion rate from a list of sessions."""
if not sessions:
return 0.0
if completed_only:
completed = sum(1 for s in sessions if s.get("status") == "completed")
return (completed / len(sessions)) * 100
return 0.0
def get_monthly_summary(user_uuid, year=None, month=None):
"""
Get monthly progress summary.
Defaults to current month if year/month not specified.
"""
if year is None or month is None:
now = datetime.now()
year = now.year
month = now.month
start_date = datetime(year, month, 1)
if month == 12:
end_date = datetime(year + 1, 1, 1)
else:
end_date = datetime(year, month + 1, 1)
sessions = postgres.select("routine_sessions", {"user_uuid": user_uuid})
month_sessions = [
s for s in sessions
if s.get("created_at") and start_date <= s["created_at"] < end_date
]
completed = [s for s in month_sessions if s.get("status") == "completed"]
total_time = sum(
s.get("actual_duration_minutes", 0) or 0
for s in completed
if s.get("actual_duration_minutes")
)
return {
"year": year,
"month": month,
"total_sessions": len(month_sessions),
"completed": len(completed),
"total_time_minutes": total_time,
}

View File

@@ -5,3 +5,5 @@ PyJWT>=2.8.0
discord.py>=2.3.0
openai>=1.0.0
requests>=2.31.0
pytest>=7.0.0
pytest-asyncio>=0.21.0

View File

@@ -7,6 +7,10 @@ Override poll_callback() with your domain-specific logic.
import os
import time
import logging
from datetime import datetime
import core.postgres as postgres
import core.notifications as notifications
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -16,24 +20,77 @@ POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", 60))
def check_medication_reminders():
"""Check for medications due now and send notifications."""
# TODO: query medications table for doses due within the poll window
# TODO: cross-ref med_logs to skip already-taken doses
# TODO: send via core.notifications._sendToEnabledChannels()
pass
try:
meds = postgres.select("medications", where={"active": True})
now = datetime.now()
current_time = now.strftime("%H:%M")
today = now.strftime("%Y-%m-%d")
for med in meds:
times = med.get("times", [])
if current_time not in times:
continue
logs = postgres.select(
"med_logs",
where={"medication_id": med["id"]},
)
already_taken = any(
log.get("action") == "taken"
and log.get("scheduled_time", "").startswith(today)
for log in logs
)
if already_taken:
continue
user_settings = notifications.getNotificationSettings(med["user_uuid"])
if user_settings:
msg = f"Time to take {med['name']} ({med['dosage']} {med['unit']})"
notifications._sendToEnabledChannels(user_settings, msg)
except Exception as e:
logger.error(f"Error checking medication reminders: {e}")
def check_routine_reminders():
"""Check for scheduled routines due now and send notifications."""
# TODO: query routine_schedules for routines due within the poll window
# TODO: send via core.notifications._sendToEnabledChannels()
pass
try:
now = datetime.now()
current_time = now.strftime("%H:%M")
current_day = now.strftime("%a").lower()
schedules = postgres.select("routine_schedules", where={"remind": True})
for schedule in schedules:
if current_time != schedule.get("time"):
continue
days = schedule.get("days", [])
if current_day not in days:
continue
routine = postgres.select_one("routines", {"id": schedule["routine_id"]})
if not routine:
continue
user_settings = notifications.getNotificationSettings(routine["user_uuid"])
if user_settings:
msg = f"Time to start your routine: {routine['name']}"
notifications._sendToEnabledChannels(user_settings, msg)
except Exception as e:
logger.error(f"Error checking routine reminders: {e}")
def check_refills():
"""Check for medications running low on refills."""
# TODO: query medications where quantity_remaining is low
# TODO: send refill reminder via notifications
pass
try:
meds = postgres.select("medications")
for med in meds:
qty = med.get("quantity_remaining")
if qty is not None and qty <= 7:
user_settings = notifications.getNotificationSettings(med["user_uuid"])
if user_settings:
msg = f"Low on {med['name']}: only {qty} doses remaining. Time to refill!"
notifications._sendToEnabledChannels(user_settings, msg)
except Exception as e:
logger.error(f"Error checking refills: {e}")
def poll_callback():

2
tests/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# tests/__init__.py
# Test package

2
tests/api/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# tests/api/__init__.py
# API tests package

View File

@@ -0,0 +1,130 @@
"""
Tests for Routine Steps Extended API
"""
import pytest
import requests
import uuid
@pytest.mark.api
class TestRoutineStepsExtended:
"""Tests for routine_steps_extended.py endpoints."""
def test_update_step_instructions_success(self, api_base_url, auth_headers, test_routine_uuid, test_step_uuid, sample_routine_data, sample_step_data, db_helper):
"""Test updating step instructions successfully."""
# Setup: create routine and step
routine = db_helper.create_routine(sample_routine_data)
step = db_helper.create_step({**sample_step_data, "routine_id": routine["id"]})
response = requests.put(
f"{api_base_url}/api/routines/{routine['id']}/steps/{step['id']}/instructions",
headers=auth_headers,
json={"instructions": "New instructions for step"}
)
assert response.status_code == 200
data = response.json()
assert data["instructions"] == "New instructions for step"
def test_update_step_instructions_unauthorized(self, api_base_url, test_routine_uuid, test_step_uuid):
"""Test updating step instructions without auth."""
response = requests.put(
f"{api_base_url}/api/routines/{test_routine_uuid}/steps/{test_step_uuid}/instructions",
json={"instructions": "Test"}
)
assert response.status_code == 401
def test_update_step_instructions_missing_body(self, api_base_url, auth_headers, sample_routine_data, sample_step_data, db_helper):
"""Test updating step instructions with missing body."""
routine = db_helper.create_routine(sample_routine_data)
step = db_helper.create_step({**sample_step_data, "routine_id": routine["id"]})
response = requests.put(
f"{api_base_url}/api/routines/{routine['id']}/steps/{step['id']}/instructions",
headers=auth_headers
)
assert response.status_code == 400
def test_update_step_type_success(self, api_base_url, auth_headers, sample_routine_data, sample_step_data, db_helper):
"""Test updating step type successfully."""
routine = db_helper.create_routine(sample_routine_data)
step = db_helper.create_step({**sample_step_data, "routine_id": routine["id"]})
response = requests.put(
f"{api_base_url}/api/routines/{routine['id']}/steps/{step['id']}/type",
headers=auth_headers,
json={"step_type": "timer"}
)
assert response.status_code == 200
data = response.json()
assert data["step_type"] == "timer"
def test_update_step_type_invalid_type(self, api_base_url, auth_headers, sample_routine_data, sample_step_data, db_helper):
"""Test updating step type with invalid type."""
routine = db_helper.create_routine(sample_routine_data)
step = db_helper.create_step({**sample_step_data, "routine_id": routine["id"]})
response = requests.put(
f"{api_base_url}/api/routines/{routine['id']}/steps/{step['id']}/type",
headers=auth_headers,
json={"step_type": "invalid_type"}
)
assert response.status_code == 400
def test_update_step_type_all_valid_types(self, api_base_url, auth_headers, sample_routine_data, sample_step_data, db_helper):
"""Test all valid step types."""
routine = db_helper.create_routine(sample_routine_data)
step = db_helper.create_step({**sample_step_data, "routine_id": routine["id"]})
valid_types = ["generic", "timer", "checklist", "meditation", "exercise"]
for step_type in valid_types:
response = requests.put(
f"{api_base_url}/api/routines/{routine['id']}/steps/{step['id']}/type",
headers=auth_headers,
json={"step_type": step_type}
)
assert response.status_code == 200
def test_update_step_media_success(self, api_base_url, auth_headers, sample_routine_data, sample_step_data, db_helper):
"""Test updating step media URL successfully."""
routine = db_helper.create_routine(sample_routine_data)
step = db_helper.create_step({**sample_step_data, "routine_id": routine["id"]})
response = requests.put(
f"{api_base_url}/api/routines/{routine['id']}/steps/{step['id']}/media",
headers=auth_headers,
json={"media_url": "https://example.com/audio.mp3"}
)
assert response.status_code == 200
data = response.json()
assert data["media_url"] == "https://example.com/audio.mp3"
def test_update_step_media_empty(self, api_base_url, auth_headers, sample_routine_data, sample_step_data, db_helper):
"""Test updating step media with empty URL."""
routine = db_helper.create_routine(sample_routine_data)
step = db_helper.create_step({**sample_step_data, "routine_id": routine["id"]})
response = requests.put(
f"{api_base_url}/api/routines/{routine['id']}/steps/{step['id']}/media",
headers=auth_headers,
json={"media_url": ""}
)
assert response.status_code == 200
def test_update_step_not_found(self, api_base_url, auth_headers, test_routine_uuid):
"""Test updating non-existent step."""
response = requests.put(
f"{api_base_url}/api/routines/{test_routine_uuid}/steps/{uuid.uuid4()}/instructions",
headers=auth_headers,
json={"instructions": "Test"}
)
assert response.status_code == 404

199
tests/conftest.py Normal file
View File

@@ -0,0 +1,199 @@
"""
conftest.py - pytest fixtures and configuration
"""
import os
import sys
import pytest
import uuid
from datetime import datetime, date
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Set test environment variables
os.environ.setdefault("DB_HOST", "localhost")
os.environ.setdefault("DB_PORT", "5432")
os.environ.setdefault("DB_NAME", "app")
os.environ.setdefault("DB_USER", "app")
os.environ.setdefault("DB_PASS", "app")
os.environ.setdefault("JWT_SECRET", "test-secret-key")
@pytest.fixture
def test_user_uuid():
"""Generate a test user UUID."""
return str(uuid.uuid4())
@pytest.fixture
def test_routine_uuid():
"""Generate a test routine UUID."""
return str(uuid.uuid4())
@pytest.fixture
def test_step_uuid():
"""Generate a test step UUID."""
return str(uuid.uuid4())
@pytest.fixture
def test_session_uuid():
"""Generate a test session UUID."""
return str(uuid.uuid4())
@pytest.fixture
def test_tag_uuid():
"""Generate a test tag UUID."""
return str(uuid.uuid4())
@pytest.fixture
def test_template_uuid():
"""Generate a test template UUID."""
return str(uuid.uuid4())
@pytest.fixture
def sample_routine_data(test_user_uuid):
"""Sample routine data for testing."""
return {
"id": str(uuid.uuid4()),
"user_uuid": test_user_uuid,
"name": "Test Routine",
"description": "A test routine",
"icon": "test",
"created_at": datetime.now(),
}
@pytest.fixture
def sample_step_data(test_routine_uuid):
"""Sample step data for testing."""
return {
"id": str(uuid.uuid4()),
"routine_id": test_routine_uuid,
"name": "Test Step",
"instructions": "Do something",
"step_type": "generic",
"duration_minutes": 5,
"media_url": "https://example.com/media.mp3",
"position": 1,
"created_at": datetime.now(),
}
@pytest.fixture
def sample_session_data(test_routine_uuid, test_user_uuid):
"""Sample session data for testing."""
return {
"id": str(uuid.uuid4()),
"routine_id": test_routine_uuid,
"user_uuid": test_user_uuid,
"status": "active",
"current_step_index": 0,
"created_at": datetime.now(),
}
@pytest.fixture
def sample_tag_data():
"""Sample tag data for testing."""
return {
"id": str(uuid.uuid4()),
"name": "morning",
"color": "#FF0000",
}
@pytest.fixture
def sample_template_data():
"""Sample template data for testing."""
return {
"id": str(uuid.uuid4()),
"name": "Morning Routine",
"description": "Start your day right",
"icon": "sun",
"created_by_admin": False,
"created_at": datetime.now(),
}
@pytest.fixture
def mock_auth_header():
"""Mock authorization header for testing."""
import jwt
token = jwt.encode({"sub": str(uuid.uuid4())}, "test-secret-key", algorithm="HS256")
return {"Authorization": f"Bearer {token}"}
@pytest.fixture
def api_base_url():
"""Base URL for API tests."""
return os.environ.get("API_URL", "http://localhost:8080")
@pytest.fixture
def auth_token(test_user_uuid):
"""Generate a valid auth token for testing."""
import jwt
return jwt.encode({"sub": test_user_uuid}, "test-secret-key", algorithm="HS256")
@pytest.fixture
def auth_headers(auth_token):
"""Authorization headers with valid token."""
return {"Authorization": f"Bearer {auth_token}", "Content-Type": "application/json"}
def pytest_configure(config):
"""Configure pytest with custom markers."""
config.addinivalue_line("markers", "api: mark test as API test")
config.addinivalue_line("markers", "core: mark test as core module test")
config.addinivalue_line("markers", "integration: mark test as integration test")
config.addinivalue_line("markers", "slow: mark test as slow running")
@pytest.fixture
def db_helper():
"""Fixture that provides a DBHelper instance."""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class DBHelper:
def __init__(self):
import core.postgres as postgres
self.postgres = postgres
def create_user(self, data=None):
if data is None:
data = {}
user = {
"id": data.get("id", str(uuid.uuid4())),
"username": data.get("username", f"testuser_{uuid.uuid4().hex[:8]}"),
"password_hashed": data.get("password_hashed", "$2b$12$test"),
}
return self.postgres.insert("users", user)
def create_routine(self, data):
return self.postgres.insert("routines", data)
def create_step(self, data):
return self.postgres.insert("routine_steps", data)
def create_session(self, data):
return self.postgres.insert("routine_sessions", data)
def create_tag(self, data):
return self.postgres.insert("routine_tags", data)
def create_template(self, data):
return self.postgres.insert("routine_templates", data)
def create_streak(self, data):
return self.postgres.insert("routine_streaks", data)
return DBHelper()

2
tests/core/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# tests/core/__init__.py
# Core module tests package

78
tests/db_helper.py Normal file
View File

@@ -0,0 +1,78 @@
"""
Database helper for tests - provides fixtures for creating test data
"""
import uuid
import pytest
import sys
import os
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
class DBHelper:
"""Helper class for creating test data in the database."""
def __init__(self):
import core.postgres as postgres
self.postgres = postgres
def create_user(self, data=None):
"""Create a test user."""
if data is None:
data = {}
user = {
"id": data.get("id", str(uuid.uuid4())),
"username": data.get("username", f"testuser_{uuid.uuid4().hex[:8]}"),
"password_hashed": data.get("password_hashed", "$2b$12$test"),
}
return self.postgres.insert("users", user)
def create_routine(self, data):
"""Create a test routine."""
return self.postgres.insert("routines", data)
def create_step(self, data):
"""Create a test routine step."""
return self.postgres.insert("routine_steps", data)
def create_session(self, data):
"""Create a test routine session."""
return self.postgres.insert("routine_sessions", data)
def create_tag(self, data):
"""Create a test tag."""
return self.postgres.insert("routine_tags", data)
def create_template(self, data):
"""Create a test template."""
return self.postgres.insert("routine_templates", data)
def create_template_step(self, data):
"""Create a test template step."""
return self.postgres.insert("routine_template_steps", data)
def create_streak(self, data):
"""Create a test streak."""
return self.postgres.insert("routine_streaks", data)
def cleanup(self, user_uuid):
"""Clean up test data for a user."""
# Delete in reverse order of dependencies
self.postgres.delete("routine_streaks", {"user_uuid": user_uuid})
self.postgres.delete("routine_session_notes", {})
self.postgres.delete("routine_sessions", {"user_uuid": user_uuid})
self.postgres.delete("routine_steps", {})
self.postgres.delete("routine_routine_tags", {})
self.postgres.delete("routine_schedules", {})
self.postgres.delete("routine_template_steps", {})
self.postgres.delete("routine_templates", {})
self.postgres.delete("routines", {"user_uuid": user_uuid})
self.postgres.delete("users", {"id": user_uuid})
@pytest.fixture
def db_helper():
"""Fixture that provides a DBHelper instance."""
return DBHelper()

View File

@@ -0,0 +1,2 @@
# tests/integration/__init__.py
# Integration tests package

286
tests/test_routines_api.py Normal file
View File

@@ -0,0 +1,286 @@
"""
Comprehensive test script for Extended Routines API
This script can be run manually to test all endpoints.
Usage: python test_routines_api.py
"""
import requests
import json
import sys
import uuid
import time
BASE_URL = "http://localhost:8080"
# Test user credentials
TEST_USERNAME = "testuser"
TEST_PASSWORD = "testpass123"
def get_token():
"""Login and get auth token."""
resp = requests.post(f"{BASE_URL}/api/login", json={
"username": TEST_USERNAME,
"password": TEST_PASSWORD
})
if resp.status_code == 200:
return resp.json()["token"]
print(f"Login failed: {resp.text}")
return None
def make_request(method, endpoint, token, data=None):
"""Make authenticated API request."""
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
url = f"{BASE_URL}{endpoint}"
resp = requests.request(method, url, headers=headers, json=data)
return resp
def test_medications_crud(token):
"""Test medications CRUD operations."""
print("\n=== Testing Medications CRUD ===")
# List medications
resp = make_request("GET", "/api/medications", token)
print(f"GET /api/medications: {resp.status_code}")
# Add medication
med_data = {
"name": f"Test Med {uuid.uuid4().hex[:6]}",
"dosage": "100",
"unit": "mg",
"frequency": "daily",
"times": ["08:00"]
}
resp = make_request("POST", "/api/medications", token, med_data)
print(f"POST /api/medications: {resp.status_code}")
med_id = resp.json().get("id")
# Get medication
if med_id:
resp = make_request("GET", f"/api/medications/{med_id}", token)
print(f"GET /api/medications/{med_id}: {resp.status_code}")
# Update medication
resp = make_request("PUT", f"/api/medications/{med_id}", token, {"notes": "test note"})
print(f"PUT /api/medications/{med_id}: {resp.status_code}")
# Take medication
resp = make_request("POST", f"/api/medications/{med_id}/take", token, {"scheduled_time": "08:00"})
print(f"POST /api/medications/{med_id}/take: {resp.status_code}")
# Get adherence
resp = make_request("GET", f"/api/medications/{med_id}/adherence", token)
print(f"GET /api/medications/{med_id}/adherence: {resp.status_code}")
# Delete medication
resp = make_request("DELETE", f"/api/medications/{med_id}", token)
print(f"DELETE /api/medications/{med_id}: {resp.status_code}")
def test_routines_crud(token):
"""Test routines CRUD operations."""
print("\n=== Testing Routines CRUD ===")
# List routines
resp = make_request("GET", "/api/routines", token)
print(f"GET /api/routines: {resp.status_code}")
# Create routine
routine_data = {
"name": f"Test Routine {uuid.uuid4().hex[:6]}",
"description": "A test routine"
}
resp = make_request("POST", "/api/routines", token, routine_data)
print(f"POST /api/routines: {resp.status_code}")
routine_id = resp.json().get("id")
if routine_id:
# Get routine
resp = make_request("GET", f"/api/routines/{routine_id}", token)
print(f"GET /api/routines/{routine_id}: {resp.status_code}")
# Add step
step_data = {
"name": "Test Step",
"duration_minutes": 5
}
resp = make_request("POST", f"/api/routines/{routine_id}/steps", token, step_data)
print(f"POST /api/routines/{routine_id}/steps: {resp.status_code}")
step_id = resp.json().get("id")
if step_id:
# Update step instructions
resp = make_request("PUT", f"/api/routines/{routine_id}/steps/{step_id}/instructions",
token, {"instructions": "Do this step carefully"})
print(f"PUT /steps/{step_id}/instructions: {resp.status_code}")
# Update step type
resp = make_request("PUT", f"/api/routines/{routine_id}/steps/{step_id}/type",
token, {"step_type": "timer"})
print(f"PUT /steps/{step_id}/type: {resp.status_code}")
# Update step media
resp = make_request("PUT", f"/api/routines/{routine_id}/steps/{step_id}/media",
token, {"media_url": "https://example.com/audio.mp3"})
print(f"PUT /steps/{step_id}/media: {resp.status_code}")
# Start session
resp = make_request("POST", f"/api/routines/{routine_id}/start", token)
print(f"POST /routines/{routine_id}/start: {resp.status_code}")
session_id = resp.json().get("session", {}).get("id")
if session_id:
# Complete step
resp = make_request("POST", f"/api/sessions/{session_id}/complete-step", token)
print(f"POST /sessions/{session_id}/complete-step: {resp.status_code}")
# Pause session (if still active)
resp = make_request("POST", f"/api/routines/{routine_id}/start", token)
if resp.status_code == 201:
session_id2 = resp.json().get("session", {}).get("id")
resp = make_request("POST", f"/api/sessions/{session_id2}/pause", token)
print(f"POST /sessions/{session_id2}/pause: {resp.status_code}")
# Resume session
resp = make_request("POST", f"/api/sessions/{session_id2}/resume", token)
print(f"POST /sessions/{session_id2}/resume: {resp.status_code}")
# Abort session
resp = make_request("POST", f"/api/sessions/{session_id2}/abort", token, {"reason": "Test abort"})
print(f"POST /sessions/{session_id2}/abort: {resp.status_code}")
# Get session details
resp = make_request("GET", f"/api/sessions/{session_id}", token)
print(f"GET /sessions/{session_id}: {resp.status_code}")
# Get stats
resp = make_request("GET", f"/api/routines/{routine_id}/stats", token)
print(f"GET /routines/{routine_id}/stats: {resp.status_code}")
# Get streak
resp = make_request("GET", f"/api/routines/{routine_id}/streak", token)
print(f"GET /routines/{routine_id}/streak: {resp.status_code}")
# Delete routine
resp = make_request("DELETE", f"/api/routines/{routine_id}", token)
print(f"DELETE /api/routines/{routine_id}: {resp.status_code}")
def test_templates(token):
"""Test template operations."""
print("\n=== Testing Templates ===")
# List templates
resp = make_request("GET", "/api/templates", token)
print(f"GET /api/templates: {resp.status_code}")
# Create template
template_data = {
"name": f"Test Template {uuid.uuid4().hex[:6]}",
"description": "A test template"
}
resp = make_request("POST", "/api/templates", token, template_data)
print(f"POST /api/templates: {resp.status_code}")
template_id = resp.json().get("id")
if template_id:
# Get template
resp = make_request("GET", f"/api/templates/{template_id}", token)
print(f"GET /api/templates/{template_id}: {resp.status_code}")
# Add template step
step_data = {"name": "Template Step 1"}
resp = make_request("POST", f"/api/templates/{template_id}/steps", token, step_data)
print(f"POST /templates/{template_id}/steps: {resp.status_code}")
# Clone template
resp = make_request("POST", f"/api/templates/{template_id}/clone", token)
print(f"POST /templates/{template_id}/clone: {resp.status_code}")
# Delete template
resp = make_request("DELETE", f"/api/templates/{template_id}", token)
print(f"DELETE /api/templates/{template_id}: {resp.status_code}")
def test_tags(token):
"""Test tag operations."""
print("\n=== Testing Tags ===")
# List tags
resp = make_request("GET", "/api/tags", token)
print(f"GET /api/tags: {resp.status_code}")
# Create tag
tag_data = {
"name": f"testtag_{uuid.uuid4().hex[:6]}",
"color": "#FF0000"
}
resp = make_request("POST", "/api/tags", token, tag_data)
print(f"POST /api/tags: {resp.status_code}")
tag_id = resp.json().get("id")
# Get streaks
resp = make_request("GET", "/api/routines/streaks", token)
print(f"GET /api/routines/streaks: {resp.status_code}")
# Get weekly summary
resp = make_request("GET", "/api/routines/weekly-summary", token)
print(f"GET /api/routines/weekly-summary: {resp.status_code}")
if tag_id:
# Delete tag
resp = make_request("DELETE", f"/api/tags/{tag_id}", token)
print(f"DELETE /api/tags/{tag_id}: {resp.status_code}")
def test_auth_errors(token):
"""Test authentication errors."""
print("\n=== Testing Auth Errors ===")
# Request without token
resp = requests.get(f"{BASE_URL}/api/medications")
print(f"GET /api/medications (no token): {resp.status_code}")
# Request with invalid token
resp = requests.get(f"{BASE_URL}/api/medications",
headers={"Authorization": "Bearer invalid_token"})
print(f"GET /api/medications (invalid token): {resp.status_code}")
def main():
"""Run all tests."""
print("=" * 50)
print("Extended Routines API - Comprehensive Test")
print("=" * 50)
# Wait for API to be ready
print("\nWaiting for API...")
for i in range(10):
try:
resp = requests.get(f"{BASE_URL}/health")
if resp.status_code == 200:
print("API is ready!")
break
except:
pass
time.sleep(1)
# Get auth token
token = get_token()
if not token:
print("Failed to get auth token")
sys.exit(1)
print(f"Got auth token: {token[:20]}...")
# Run tests
test_auth_errors(token)
test_medications_crud(token)
test_routines_crud(token)
test_templates(token)
test_tags(token)
print("\n" + "=" * 50)
print("All tests completed!")
print("=" * 50)
if __name__ == "__main__":
main()