- Flask-based web application with PostgreSQL - User authentication and session management - Content moderation and filtering - Docker deployment with docker-compose - Admin interface for content management 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
341 lines
9.7 KiB
Python
341 lines
9.7 KiB
Python
"""
|
|
User Authentication Service
|
|
Handles user management, authentication, and session management using SQLAlchemy.
|
|
"""
|
|
|
|
import time
|
|
import logging
|
|
from typing import Optional, List
|
|
from functools import wraps
|
|
from models import User, db
|
|
|
|
|
|
def db_retry(max_retries=3, delay=0.1):
|
|
"""
|
|
Decorator to retry database operations with exponential backoff.
|
|
|
|
Args:
|
|
max_retries: Maximum number of retry attempts
|
|
delay: Base delay between retries (exponentially increased)
|
|
"""
|
|
def decorator(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
logger = logging.getLogger(__name__)
|
|
|
|
for attempt in range(max_retries + 1):
|
|
try:
|
|
return func(*args, **kwargs)
|
|
|
|
except Exception as e:
|
|
# Check if this is a database-related error
|
|
error_msg = str(e).lower()
|
|
is_db_error = any(keyword in error_msg for keyword in [
|
|
'connection', 'timeout', 'database', 'postgresql', 'psycopg2',
|
|
'server closed', 'lost connection', 'connection reset'
|
|
])
|
|
|
|
if not is_db_error or attempt == max_retries:
|
|
# Not a retryable error or final attempt
|
|
db.session.rollback()
|
|
logger.error(f"Database operation failed: {e}")
|
|
raise
|
|
|
|
# Retry with exponential backoff
|
|
retry_delay = delay * (2 ** attempt)
|
|
logger.warning(f"Database error (attempt {attempt + 1}/{max_retries + 1}): {e}")
|
|
logger.info(f"Retrying in {retry_delay:.2f}s...")
|
|
|
|
db.session.rollback()
|
|
time.sleep(retry_delay)
|
|
|
|
return None
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
class UserService:
|
|
"""Service for managing users with SQLAlchemy and PostgreSQL"""
|
|
|
|
def __init__(self):
|
|
"""
|
|
Initialize user service.
|
|
No arguments needed - uses SQLAlchemy db instance from models.
|
|
"""
|
|
pass
|
|
|
|
@db_retry(max_retries=3, delay=0.2)
|
|
def create_user(self, username: str, email: str, password: str = None, is_admin: bool = False, auth0_id: str = None) -> Optional[str]:
|
|
"""
|
|
Create a new user.
|
|
|
|
Args:
|
|
username: Unique username
|
|
email: Unique email address
|
|
password: Plain text password (will be hashed with bcrypt) - optional for OAuth
|
|
is_admin: Whether user is admin
|
|
auth0_id: Auth0 user ID for OAuth users
|
|
|
|
Returns:
|
|
User ID if successful, None if error
|
|
"""
|
|
try:
|
|
# Create new user (password is automatically hashed in __init__)
|
|
user = User(
|
|
username=username,
|
|
email=email,
|
|
password=password,
|
|
is_admin=is_admin,
|
|
auth0_id=auth0_id
|
|
)
|
|
|
|
# Add to database
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
|
|
return user.id
|
|
|
|
except ValueError as e:
|
|
# Input validation error
|
|
db.session.rollback()
|
|
print(f"Validation error creating user: {e}")
|
|
return None
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
print(f"Error creating user: {e}")
|
|
return None
|
|
|
|
@db_retry(max_retries=2, delay=0.1)
|
|
def authenticate(self, username: str, password: str) -> Optional[User]:
|
|
"""
|
|
Authenticate user with username/password.
|
|
|
|
Args:
|
|
username: Username or email
|
|
password: Plain text password
|
|
|
|
Returns:
|
|
User object if authenticated, None otherwise
|
|
"""
|
|
try:
|
|
# Query for user by username or email
|
|
user = User.query.filter(
|
|
(User.username == username) | (User.email == username)
|
|
).first()
|
|
|
|
if not user:
|
|
return None
|
|
|
|
# Check password using bcrypt
|
|
if user.check_password(password):
|
|
# Update last login
|
|
user.update_last_login()
|
|
return user
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"Error authenticating user: {e}")
|
|
return None
|
|
|
|
@db_retry(max_retries=2, delay=0.1)
|
|
def get_user_by_id(self, user_id: str) -> Optional[User]:
|
|
"""
|
|
Get user by ID.
|
|
|
|
Args:
|
|
user_id: User UUID
|
|
|
|
Returns:
|
|
User object if found, None otherwise
|
|
"""
|
|
try:
|
|
return User.query.get(user_id)
|
|
except Exception as e:
|
|
print(f"Error getting user: {e}")
|
|
return None
|
|
|
|
def get_user_by_username(self, username: str) -> Optional[User]:
|
|
"""
|
|
Get user by username.
|
|
|
|
Args:
|
|
username: Username
|
|
|
|
Returns:
|
|
User object if found, None otherwise
|
|
"""
|
|
try:
|
|
return User.query.filter_by(username=username).first()
|
|
except Exception as e:
|
|
print(f"Error getting user by username: {e}")
|
|
return None
|
|
|
|
def get_user_by_email(self, email: str) -> Optional[User]:
|
|
"""
|
|
Get user by email.
|
|
|
|
Args:
|
|
email: Email address
|
|
|
|
Returns:
|
|
User object if found, None otherwise
|
|
"""
|
|
try:
|
|
return User.query.filter_by(email=email).first()
|
|
except Exception as e:
|
|
print(f"Error getting user by email: {e}")
|
|
return None
|
|
|
|
def username_exists(self, username: str) -> bool:
|
|
"""
|
|
Check if username already exists.
|
|
|
|
Args:
|
|
username: Username to check
|
|
|
|
Returns:
|
|
True if username exists, False otherwise
|
|
"""
|
|
try:
|
|
return User.query.filter_by(username=username).first() is not None
|
|
except Exception as e:
|
|
print(f"Error checking username: {e}")
|
|
return False
|
|
|
|
def email_exists(self, email: str) -> bool:
|
|
"""
|
|
Check if email already exists.
|
|
|
|
Args:
|
|
email: Email to check
|
|
|
|
Returns:
|
|
True if email exists, False otherwise
|
|
"""
|
|
try:
|
|
return User.query.filter_by(email=email).first() is not None
|
|
except Exception as e:
|
|
print(f"Error checking email: {e}")
|
|
return False
|
|
|
|
def get_all_users(self) -> List[User]:
|
|
"""
|
|
Get all users (for admin panel).
|
|
|
|
Returns:
|
|
List of User objects
|
|
"""
|
|
try:
|
|
return User.query.order_by(User.created_at.desc()).all()
|
|
except Exception as e:
|
|
print(f"Error getting all users: {e}")
|
|
return []
|
|
|
|
def delete_user(self, user_id: str) -> bool:
|
|
"""
|
|
Delete a user (admin only).
|
|
|
|
Args:
|
|
user_id: User ID to delete
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
try:
|
|
user = User.query.get(user_id)
|
|
if user:
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
print(f"Error deleting user: {e}")
|
|
return False
|
|
|
|
def update_user_admin_status(self, user_id: str, is_admin: bool) -> bool:
|
|
"""
|
|
Update user's admin status.
|
|
|
|
Args:
|
|
user_id: User ID
|
|
is_admin: New admin status
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
try:
|
|
user = User.query.get(user_id)
|
|
if user:
|
|
user.is_admin = is_admin
|
|
db.session.commit()
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
print(f"Error updating admin status: {e}")
|
|
return False
|
|
|
|
def update_password(self, user_id: str, new_password: str) -> bool:
|
|
"""
|
|
Update user's password.
|
|
|
|
Args:
|
|
user_id: User ID
|
|
new_password: New plain text password (will be hashed)
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
try:
|
|
user = User.query.get(user_id)
|
|
if user:
|
|
user.set_password(new_password)
|
|
db.session.commit()
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
print(f"Error updating password: {e}")
|
|
return False
|
|
|
|
def get_user_by_auth0_id(self, auth0_id: str) -> Optional[User]:
|
|
"""
|
|
Get user by Auth0 ID.
|
|
|
|
Args:
|
|
auth0_id: Auth0 user identifier
|
|
|
|
Returns:
|
|
User object if found, None otherwise
|
|
"""
|
|
try:
|
|
user = User.query.filter_by(auth0_id=auth0_id).first()
|
|
return user
|
|
except Exception as e:
|
|
print(f"Error getting user by Auth0 ID: {e}")
|
|
return None
|
|
|
|
def link_auth0_account(self, user_id: str, auth0_id: str) -> bool:
|
|
"""
|
|
Link an existing user account to Auth0.
|
|
|
|
Args:
|
|
user_id: Existing user ID
|
|
auth0_id: Auth0 user identifier
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
try:
|
|
user = User.query.filter_by(id=user_id).first()
|
|
if user:
|
|
user.auth0_id = auth0_id
|
|
db.session.commit()
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
print(f"Error linking Auth0 account: {e}")
|
|
return False
|