import core.users as users import core.postgres as postgres import bcrypt import jwt from jwt.exceptions import ExpiredSignatureError, InvalidTokenError import datetime import os REFRESH_TOKEN_SECRET = None def _get_refresh_secret(): global REFRESH_TOKEN_SECRET if REFRESH_TOKEN_SECRET is None: REFRESH_TOKEN_SECRET = os.getenv("JWT_SECRET", "") + "_refresh" return REFRESH_TOKEN_SECRET def verifyLoginToken(login_token, username=False, userUUID=False): if username: userUUID = users.getUserUUID(username) if userUUID: try: decoded_token = jwt.decode( login_token, os.getenv("JWT_SECRET"), algorithms=["HS256"] ) if decoded_token.get("sub") == str(userUUID): return True return False except (ExpiredSignatureError, InvalidTokenError): return False return False def getUserpasswordHash(userUUID): user = postgres.select_one("users", {"id": userUUID}) if user: pw_hash = user.get("password_hashed") if isinstance(pw_hash, memoryview): return bytes(pw_hash) return pw_hash return None def getLoginToken(username, password): userUUID = users.getUserUUID(username) if userUUID: formatted_pass = password.encode("utf-8") users_hashed_pw = getUserpasswordHash(userUUID) if bcrypt.checkpw(formatted_pass, users_hashed_pw): payload = { "sub": userUUID, "name": users.getUserFirstName(userUUID), "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1), } return jwt.encode(payload, os.getenv("JWT_SECRET"), algorithm="HS256") return False def createRefreshToken(userUUID): """Create a long-lived refresh token (30 days).""" payload = { "sub": str(userUUID), "type": "refresh", "exp": datetime.datetime.utcnow() + datetime.timedelta(days=30), } return jwt.encode(payload, _get_refresh_secret(), algorithm="HS256") def refreshAccessToken(refresh_token): """Validate a refresh token and return a new access token + user_uuid. Returns (access_token, user_uuid) or (None, None).""" try: decoded = jwt.decode( refresh_token, _get_refresh_secret(), algorithms=["HS256"] ) if decoded.get("type") != "refresh": return None, None user_uuid = decoded.get("sub") if not user_uuid: return None, None # Verify user still exists user = postgres.select_one("users", {"id": user_uuid}) if not user: return None, None # Create new access token payload = { "sub": user_uuid, "name": user.get("first_name", ""), "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1), } access_token = jwt.encode(payload, os.getenv("JWT_SECRET"), algorithm="HS256") return access_token, user_uuid except (ExpiredSignatureError, InvalidTokenError): return None, None def unregisterUser(userUUID, password): pw_hash = getUserpasswordHash(userUUID) if not pw_hash: return False if bcrypt.checkpw(password.encode("utf-8"), pw_hash): return users.deleteUser(userUUID) return False