"""
Database Models

SQLAlchemy models for the application.
"""

import uuid
from datetime import datetime

from flask_login import UserMixin
from flask_bcrypt import Bcrypt

from database import db

# Initialize bcrypt
bcrypt = Bcrypt()


class User(UserMixin, db.Model):
    """User model with bcrypt password hashing.

    A user authenticates either with a locally stored bcrypt hash
    (``password_hash``) or through OAuth (``auth0_id``); the constructor
    requires at least one of the two.
    """

    __tablename__ = 'users'

    # Primary fields
    id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(128), nullable=True)  # Nullable for OAuth users

    # OAuth fields
    auth0_id = db.Column(db.String(255), unique=True, nullable=True, index=True)

    # User attributes
    is_admin = db.Column(db.Boolean, default=False, nullable=False)
    is_active = db.Column(db.Boolean, default=True, nullable=False)

    # Profile
    profile_picture_url = db.Column(db.String(255), nullable=True)

    # Timestamps
    created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    last_login = db.Column(db.DateTime, nullable=True)

    # User settings (JSON stored as text)
    settings = db.Column(db.Text, default='{}')

    def __init__(self, username, email, password=None, is_admin=False, auth0_id=None):
        """
        Initialize a new user.

        Args:
            username: Unique username (at most 80 characters after stripping)
            email: Unique email address (at most 120 characters after stripping)
            password: Plain text password (will be hashed) - optional for OAuth users
            is_admin: Whether user is admin (default False)
            auth0_id: Auth0 user ID for OAuth users (optional)

        Raises:
            ValueError: If username, email or password is invalid, or if
                neither password nor auth0_id is provided.
        """
        # Validate the *stripped* values: these are what actually get stored,
        # so a whitespace-only username/email must be rejected and surrounding
        # padding must not count against the column length.
        if not isinstance(username, str):
            raise ValueError("Invalid username")
        username = username.strip()
        if not username or len(username) > 80:
            raise ValueError("Invalid username")

        if not isinstance(email, str):
            raise ValueError("Invalid email")
        email = email.strip()
        if not email or len(email) > 120:
            raise ValueError("Invalid email")

        if password is not None and (not isinstance(password, str) or len(password) < 1):
            raise ValueError("Invalid password")

        if password is None and auth0_id is None:
            raise ValueError("Either password or auth0_id must be provided")

        self.id = str(uuid.uuid4())
        self.username = username
        self.email = email.lower()
        self.auth0_id = auth0_id

        # After validation, password is either None or a non-empty string.
        if password is not None:
            self.set_password(password)
        else:
            self.password_hash = None  # OAuth users don't have passwords

        self.is_admin = bool(is_admin)
        self.is_active = True
        self.created_at = datetime.utcnow()

    def set_password(self, password):
        """
        Hash and set user password using bcrypt.

        Args:
            password: Plain text password
        """
        self.password_hash = bcrypt.generate_password_hash(password).decode('utf-8')

    def check_password(self, password) -> bool:
        """
        Verify password against stored hash.

        Args:
            password: Plain text password to check

        Returns:
            True if password matches, False otherwise. Always False when the
            user has no stored hash (OAuth-only account) or password is None.
        """
        # OAuth users are created with password_hash = None; there is nothing
        # to compare against, so the check fails instead of handing None to
        # bcrypt.
        if not self.password_hash or password is None:
            return False
        return bcrypt.check_password_hash(self.password_hash, password)

    def update_last_login(self) -> None:
        """Update the last login timestamp and commit the session."""
        self.last_login = datetime.utcnow()
        db.session.commit()

    def get_id(self):
        """Required by Flask-Login"""
        return self.id

    def __repr__(self) -> str:
        return f'<User {self.username}>'


class Session(db.Model):
    """User session model for tracking active sessions"""

    __tablename__ = 'user_sessions'

    session_id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    user_id = db.Column(db.String(36), db.ForeignKey('users.id'), nullable=False)
    created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    expires_at = db.Column(db.DateTime, nullable=False)

    # Relationship (also exposes ``User.sessions`` through the backref)
    user = db.relationship('User', backref=db.backref('sessions', lazy=True))

    def __repr__(self) -> str:
        return f'<Session {self.session_id}>'


class PollSource(db.Model):
    """Source polling configuration"""

    __tablename__ = 'poll_sources'

    id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    platform = db.Column(db.String(50), nullable=False, index=True)  # reddit, hackernews, etc.
    source_id = db.Column(db.String(100), nullable=False)  # programming, python, etc.
    display_name = db.Column(db.String(200), nullable=False)

    # Polling configuration
    enabled = db.Column(db.Boolean, default=True, nullable=False)
    poll_interval_minutes = db.Column(db.Integer, default=60, nullable=False)  # How often to poll

    # Status tracking
    last_poll_time = db.Column(db.DateTime, nullable=True)
    last_poll_status = db.Column(db.String(50), nullable=True)  # success, error, etc.
    last_poll_error = db.Column(db.Text, nullable=True)
    posts_collected = db.Column(db.Integer, default=0, nullable=False)

    # Metadata
    created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        nullable=False,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )
    created_by = db.Column(db.String(36), db.ForeignKey('users.id'), nullable=True)

    # Unique constraint on platform + source_id
    __table_args__ = (
        db.UniqueConstraint('platform', 'source_id', name='unique_platform_source'),
    )

    def __repr__(self) -> str:
        return f'<PollSource {self.platform}/{self.source_id}>'


class PollLog(db.Model):
    """Log of polling activities"""

    __tablename__ = 'poll_logs'

    id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    source_id = db.Column(
        db.String(36),
        db.ForeignKey('poll_sources.id'),
        nullable=False,
        index=True,
    )

    started_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    completed_at = db.Column(db.DateTime, nullable=True)

    status = db.Column(db.String(50), nullable=False)  # running, success, error
    posts_found = db.Column(db.Integer, default=0)
    posts_new = db.Column(db.Integer, default=0)
    posts_updated = db.Column(db.Integer, default=0)

    error_message = db.Column(db.Text, nullable=True)

    # Relationship (also exposes ``PollSource.logs`` as a dynamic query,
    # ordered newest-first by ``started_at``)
    source = db.relationship(
        'PollSource',
        backref=db.backref(
            'logs',
            lazy='dynamic',
            order_by='PollLog.started_at.desc()',
        ),
    )

    def __repr__(self) -> str:
        return f'<PollLog {self.id} {self.status}>'