""" User Authentication Service Handles user management, authentication, and session management using SQLAlchemy. """ import time import logging from typing import Optional, List from functools import wraps from models import User, db def db_retry(max_retries=3, delay=0.1): """ Decorator to retry database operations with exponential backoff. Args: max_retries: Maximum number of retry attempts delay: Base delay between retries (exponentially increased) """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): logger = logging.getLogger(__name__) for attempt in range(max_retries + 1): try: return func(*args, **kwargs) except Exception as e: # Check if this is a database-related error error_msg = str(e).lower() is_db_error = any(keyword in error_msg for keyword in [ 'connection', 'timeout', 'database', 'postgresql', 'psycopg2', 'server closed', 'lost connection', 'connection reset' ]) if not is_db_error or attempt == max_retries: # Not a retryable error or final attempt db.session.rollback() logger.error(f"Database operation failed: {e}") raise # Retry with exponential backoff retry_delay = delay * (2 ** attempt) logger.warning(f"Database error (attempt {attempt + 1}/{max_retries + 1}): {e}") logger.info(f"Retrying in {retry_delay:.2f}s...") db.session.rollback() time.sleep(retry_delay) return None return wrapper return decorator class UserService: """Service for managing users with SQLAlchemy and PostgreSQL""" def __init__(self): """ Initialize user service. No arguments needed - uses SQLAlchemy db instance from models. """ pass @db_retry(max_retries=3, delay=0.2) def create_user(self, username: str, email: str, password: str = None, is_admin: bool = False, auth0_id: str = None) -> Optional[str]: """ Create a new user. Args: username: Unique username email: Unique email address password: Plain text password (will be hashed with bcrypt) - optional for OAuth is_admin: Whether user is admin auth0_id: Auth0 user ID for OAuth users Returns: User ID if successful, None if error """ try: # Create new user (password is automatically hashed in __init__) user = User( username=username, email=email, password=password, is_admin=is_admin, auth0_id=auth0_id ) # Add to database db.session.add(user) db.session.commit() return user.id except ValueError as e: # Input validation error db.session.rollback() print(f"Validation error creating user: {e}") return None except Exception as e: db.session.rollback() print(f"Error creating user: {e}") return None @db_retry(max_retries=2, delay=0.1) def authenticate(self, username: str, password: str) -> Optional[User]: """ Authenticate user with username/password. Args: username: Username or email password: Plain text password Returns: User object if authenticated, None otherwise """ try: # Query for user by username or email user = User.query.filter( (User.username == username) | (User.email == username) ).first() if not user: return None # Check password using bcrypt if user.check_password(password): # Update last login user.update_last_login() return user return None except Exception as e: print(f"Error authenticating user: {e}") return None @db_retry(max_retries=2, delay=0.1) def get_user_by_id(self, user_id: str) -> Optional[User]: """ Get user by ID. Args: user_id: User UUID Returns: User object if found, None otherwise """ try: return User.query.get(user_id) except Exception as e: print(f"Error getting user: {e}") return None def get_user_by_username(self, username: str) -> Optional[User]: """ Get user by username. Args: username: Username Returns: User object if found, None otherwise """ try: return User.query.filter_by(username=username).first() except Exception as e: print(f"Error getting user by username: {e}") return None def get_user_by_email(self, email: str) -> Optional[User]: """ Get user by email. Args: email: Email address Returns: User object if found, None otherwise """ try: return User.query.filter_by(email=email).first() except Exception as e: print(f"Error getting user by email: {e}") return None def username_exists(self, username: str) -> bool: """ Check if username already exists. Args: username: Username to check Returns: True if username exists, False otherwise """ try: return User.query.filter_by(username=username).first() is not None except Exception as e: print(f"Error checking username: {e}") return False def email_exists(self, email: str) -> bool: """ Check if email already exists. Args: email: Email to check Returns: True if email exists, False otherwise """ try: return User.query.filter_by(email=email).first() is not None except Exception as e: print(f"Error checking email: {e}") return False def get_all_users(self) -> List[User]: """ Get all users (for admin panel). Returns: List of User objects """ try: return User.query.order_by(User.created_at.desc()).all() except Exception as e: print(f"Error getting all users: {e}") return [] def delete_user(self, user_id: str) -> bool: """ Delete a user (admin only). Args: user_id: User ID to delete Returns: True if successful, False otherwise """ try: user = User.query.get(user_id) if user: db.session.delete(user) db.session.commit() return True return False except Exception as e: db.session.rollback() print(f"Error deleting user: {e}") return False def update_user_admin_status(self, user_id: str, is_admin: bool) -> bool: """ Update user's admin status. Args: user_id: User ID is_admin: New admin status Returns: True if successful, False otherwise """ try: user = User.query.get(user_id) if user: user.is_admin = is_admin db.session.commit() return True return False except Exception as e: db.session.rollback() print(f"Error updating admin status: {e}") return False def update_password(self, user_id: str, new_password: str) -> bool: """ Update user's password. Args: user_id: User ID new_password: New plain text password (will be hashed) Returns: True if successful, False otherwise """ try: user = User.query.get(user_id) if user: user.set_password(new_password) db.session.commit() return True return False except Exception as e: db.session.rollback() print(f"Error updating password: {e}") return False def get_user_by_auth0_id(self, auth0_id: str) -> Optional[User]: """ Get user by Auth0 ID. Args: auth0_id: Auth0 user identifier Returns: User object if found, None otherwise """ try: user = User.query.filter_by(auth0_id=auth0_id).first() return user except Exception as e: print(f"Error getting user by Auth0 ID: {e}") return None def link_auth0_account(self, user_id: str, auth0_id: str) -> bool: """ Link an existing user account to Auth0. Args: user_id: Existing user ID auth0_id: Auth0 user identifier Returns: True if successful, False otherwise """ try: user = User.query.filter_by(id=user_id).first() if user: user.auth0_id = auth0_id db.session.commit() return True return False except Exception as e: print(f"Error linking Auth0 account: {e}") return False