""" BalanceBoard Web Application Flask server with user authentication and content serving. """ import os import re import logging import time from pathlib import Path from werkzeug.utils import secure_filename from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, abort, session, jsonify from flask_login import LoginManager, login_user, logout_user, login_required, current_user from dotenv import load_dotenv from functools import lru_cache from collections import defaultdict from authlib.integrations.flask_client import OAuth from urllib.parse import quote_plus, urlencode from database import init_db, db from models import User, bcrypt from user_service import UserService import json # Load environment variables load_dotenv() # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('app.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Initialize Flask app app = Flask(__name__, static_folder='themes', template_folder='templates') app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production') app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size # Auth0 Configuration app.config['AUTH0_DOMAIN'] = os.getenv('AUTH0_DOMAIN', '') app.config['AUTH0_CLIENT_ID'] = os.getenv('AUTH0_CLIENT_ID', '') app.config['AUTH0_CLIENT_SECRET'] = os.getenv('AUTH0_CLIENT_SECRET', '') app.config['AUTH0_AUDIENCE'] = os.getenv('AUTH0_AUDIENCE', '') # Configuration constants ALLOWED_FILTERSETS = {'no_filter', 'safe_content'} ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'} UPLOAD_FOLDER = 'static/avatars' MAX_FILENAME_LENGTH = 100 DEFAULT_PORT = 5021 DEFAULT_PAGE_SIZE = 20 MIN_PASSWORD_LENGTH = 8 MAX_USERNAME_LENGTH = 80 MAX_EMAIL_LENGTH = 120 MAX_COMMUNITY_NAME_LENGTH = 100 # Initialize database init_db(app) # Initialize bcrypt bcrypt.init_app(app) # Initialize Flask-Login 
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Please log in to access this page.'

# Initialize user service
user_service = UserService()

# Initialize polling service
from polling_service import polling_service
polling_service.init_app(app)
polling_service.start()

# Initialize OAuth for Auth0
oauth = OAuth(app)
auth0 = oauth.register(
    'auth0',
    client_id=app.config['AUTH0_CLIENT_ID'],
    client_secret=app.config['AUTH0_CLIENT_SECRET'],
    # The OIDC discovery document is published under
    # ".well-known/openid-configuration" (hyphen, not underscore).
    server_metadata_url=f'https://{app.config["AUTH0_DOMAIN"]}/.well-known/openid-configuration',
    client_kwargs={
        'scope': 'openid profile email',
    }
)

# Cache for posts and comments - improves performance
post_cache = {}
comment_cache = defaultdict(list)
cache_timestamp = 0
CACHE_DURATION = 300  # 5 minutes

# Patterns shared by the validators below. They are applied with fullmatch()
# because "$" in re.match() would also accept a trailing newline.
_SAFE_NAME_RE = re.compile(r'[a-zA-Z0-9_-]+')
_SAFE_PATH_RE = re.compile(r'[a-zA-Z0-9._/-]+')

# Built-in avatars offered on the profile settings page.
DEFAULT_AVATARS = [
    {'id': 'default_1', 'name': 'Gradient Blue', 'bg': 'linear-gradient(135deg, #667eea 0%, #764ba2 100%)'},
    {'id': 'default_2', 'name': 'Gradient Green', 'bg': 'linear-gradient(135deg, #4facfe 0%, #00f2fe 100%)'},
    {'id': 'default_3', 'name': 'Gradient Orange', 'bg': 'linear-gradient(135deg, #fa709a 0%, #fee140 100%)'},
    {'id': 'default_4', 'name': 'Gradient Purple', 'bg': 'linear-gradient(135deg, #a8edea 0%, #fed6e3 100%)'},
    {'id': 'default_5', 'name': 'Brand Colors', 'bg': 'linear-gradient(135deg, #4db6ac 0%, #26a69a 100%)'},
    {'id': 'default_6', 'name': 'Sunset', 'bg': 'linear-gradient(135deg, #ff7e5f 0%, #feb47b 100%)'},
]


# Security helper functions
def _is_safe_filterset(filterset):
    """Return True only for a whitelisted, well-formed filterset name."""
    if not filterset or not isinstance(filterset, str):
        return False
    return filterset in ALLOWED_FILTERSETS and _SAFE_NAME_RE.fullmatch(filterset) is not None


def _is_safe_path(path):
    """Return True when ``path`` is a relative path with no traversal parts.

    Only alphanumerics, dots, hyphens, underscores and forward slashes are
    accepted; absolute paths, backslashes and ".." are rejected.
    """
    if not path or not isinstance(path, str):
        return False
    # Check for directory traversal attempts
    if '..' in path or path.startswith('/') or '\\' in path:
        return False
    return _SAFE_PATH_RE.fullmatch(path) is not None


def _is_allowed_file(filename):
    """Check whether the file extension is one of ALLOWED_EXTENSIONS."""
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def _safe_redirect_target(target):
    """Return ``target`` if it points at this host, otherwise None.

    Used for the user-supplied ``next`` parameter so that a crafted login
    link cannot bounce the user to a foreign site.
    """
    from urllib.parse import urljoin, urlparse

    if not target or not isinstance(target, str):
        return None
    # Browsers treat backslashes like slashes and drop control characters,
    # which can turn an apparently relative path into an absolute URL.
    if '\\' in target or any(ord(ch) < 32 for ch in target):
        return None
    own_host = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))
    if resolved.scheme in ('http', 'https') and resolved.netloc == own_host.netloc:
        return target
    return None


def _read_json_file(path):
    """Parse the JSON file at ``path`` (UTF-8) and return the decoded value.

    Raises OSError when the file cannot be read and ValueError (including
    json.JSONDecodeError / UnicodeDecodeError) when it cannot be decoded.
    """
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)


def _load_posts_cache():
    """Return ``(posts_by_uuid, comments_by_post_uuid)``, reloading if stale.

    The data is re-read from ``data/posts`` and ``data/comments`` when the
    cache is older than CACHE_DURATION seconds or holds no posts. Unreadable
    or malformed files are skipped.
    """
    global post_cache, comment_cache, cache_timestamp

    current_time = time.time()
    if current_time - cache_timestamp < CACHE_DURATION and post_cache:
        return post_cache, comment_cache

    # Build into fresh containers and swap them in at the end, so a request
    # running concurrently never observes a half-filled cache.
    fresh_posts = {}
    fresh_comments = defaultdict(list)

    posts_dir = Path('data/posts')
    comments_dir = Path('data/comments')

    # Load all posts
    if posts_dir.exists():
        for post_file in posts_dir.glob('*.json'):
            try:
                post_data = _read_json_file(post_file)
            except (ValueError, OSError) as e:
                logger.debug(f"Error reading post file {post_file}: {e}")
                continue
            if not isinstance(post_data, dict):
                continue
            post_uuid = post_data.get('uuid')
            if post_uuid:
                fresh_posts[post_uuid] = post_data

    # Load all comments and group by post UUID
    if comments_dir.exists():
        for comment_file in comments_dir.glob('*.json'):
            try:
                comment_data = _read_json_file(comment_file)
            except (ValueError, OSError) as e:
                logger.debug(f"Error reading comment file {comment_file}: {e}")
                continue
            if not isinstance(comment_data, dict):
                continue
            post_uuid = comment_data.get('post_uuid')
            if post_uuid:
                fresh_comments[post_uuid].append(comment_data)

    post_cache = fresh_posts
    comment_cache = fresh_comments
    cache_timestamp = current_time
    logger.info(f"Cache refreshed: {len(post_cache)} posts, {len(comment_cache)} comment groups")
    return post_cache, comment_cache


def _invalidate_cache():
    """Invalidate the cache to force a refresh on the next access."""
    global cache_timestamp
    cache_timestamp = 0


def _is_valid_community_name(name):
    """Return True for a community name that is safe to persist."""
    return (
        isinstance(name, str)
        and len(name) <= MAX_COMMUNITY_NAME_LENGTH
        and _SAFE_NAME_RE.fullmatch(name) is not None
    )


def _validate_user_settings(settings_str):
    """Parse a settings JSON string and keep only the recognised, valid fields.

    Returns a dict containing at most ``filter_set``, ``communities`` and
    ``experience``; anything malformed is dropped. Invalid JSON or a
    non-object document yields an empty dict.
    """
    try:
        if not settings_str:
            return {}

        settings = json.loads(settings_str)
        if not isinstance(settings, dict):
            logger.warning("User settings must be a JSON object")
            return {}

        validated = {}

        # Filter set validation
        if 'filter_set' in settings:
            filter_set = settings['filter_set']
            if isinstance(filter_set, str) and _is_safe_filterset(filter_set):
                validated['filter_set'] = filter_set

        # Communities validation
        if 'communities' in settings:
            communities = settings['communities']
            if isinstance(communities, list):
                validated['communities'] = [
                    community for community in communities
                    if _is_valid_community_name(community)
                ]

        # Experience settings validation: only known boolean flags survive
        if 'experience' in settings:
            exp = settings['experience']
            if isinstance(exp, dict):
                bool_fields = ['infinite_scroll', 'auto_refresh', 'push_notifications', 'dark_patterns_opt_in']
                validated['experience'] = {
                    field: exp[field]
                    for field in bool_fields
                    if field in exp and isinstance(exp[field], bool)
                }

        return validated

    except (json.JSONDecodeError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON: {e}")
        return {}


def _current_user_settings():
    """Return the logged-in user's settings as a dict ({} when unusable)."""
    raw_settings = current_user.settings
    if not raw_settings:
        return {}
    try:
        parsed = json.loads(raw_settings)
    except (json.JSONDecodeError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
        return {}
    # Callers index into the result, so anything but an object is unusable.
    return parsed if isinstance(parsed, dict) else {}


def _load_filter_sets():
    """Return the contents of filtersets.json, or {} when unavailable."""
    try:
        return _read_json_file('filtersets.json')
    except (ValueError, OSError) as e:
        logger.warning(f"Could not load filter sets: {e}")
        return {}


def _commit_or_rollback(action):
    """Commit the DB session; on failure roll back, log, and return False.

    ``action`` is a short description used in the log message.
    """
    try:
        db.session.commit()
        return True
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error {action} for user {current_user.id}: {e}")
        return False


# Add custom Jinja filters
@app.template_filter('nl2br')
def nl2br_filter(text):
    """Convert newlines to <br> tags"""
    if not text:
        return text
    return text.replace('\n', '<br>\n')


@login_manager.user_loader
def load_user(user_id):
    """Load user by ID for Flask-Login"""
    return user_service.get_user_by_id(user_id)


# ============================================================
# STATIC CONTENT ROUTES
# ============================================================

@app.before_request
def check_first_user():
    """Redirect to admin creation while the user table is empty."""
    # Skip for static files and auth routes
    if request.endpoint and (
        request.endpoint.startswith('static') or
        request.endpoint in ['login', 'signup', 'admin_setup', 'serve_theme', 'serve_logo']
    ):
        return

    # Skip if user is already authenticated
    if current_user.is_authenticated:
        return

    # Check if any users exist
    try:
        user_count = User.query.count()
        if user_count == 0:
            return redirect(url_for('admin_setup'))
    except Exception as e:
        # If database is not ready, skip check
        logger.warning(f"Database not ready for user count check: {e}")


@app.route('/')
def index():
    """Serve the dashboard to logged-in users; send everyone else to login."""
    if not current_user.is_authenticated:
        return redirect(url_for('login'))
    return render_template('dashboard.html', user_settings=_current_user_settings())


@app.route('/feed/')
@app.route('/feed/<filterset>')
def feed_content(filterset='no_filter'):
    """Serve the pre-rendered feed for ``filterset`` (default: no_filter)."""
    # Validate filterset to prevent directory traversal
    if not _is_safe_filterset(filterset):
        logger.warning(f"Invalid filterset requested: {filterset}")
        abort(404)

    # Additional path validation
    safe_path = os.path.normpath(f'active_html/{filterset}/index.html')
    if not safe_path.startswith('active_html/'):
        logger.warning(f"Path traversal attempt detected: {filterset}")
        abort(404)

    return send_from_directory(f'active_html/{filterset}', 'index.html')


def load_platform_config():
    """Load platform_config.json; return an empty configuration on failure."""
    try:
        return _read_json_file('platform_config.json')
    except (ValueError, OSError) as e:
        logger.warning(f"Could not load platform config: {e}")
        return {"platforms": {}, "collection_targets": []}


def get_display_name_for_source(platform, source, platform_config):
    """Return the human-readable name of ``source`` on ``platform``.

    Platforms that support communities use the configured community display
    name, falling back to ``prefix + source``; all other platforms use the
    platform name. Without a usable config, ``source`` is returned as-is.
    """
    if not platform_config or 'platforms' not in platform_config:
        return source

    platform_info = platform_config['platforms'].get(platform, {})

    if not platform_info.get('supports_communities'):
        # For platforms without communities, use the platform name
        return platform_info.get('name', platform)

    for community in platform_info.get('communities', []):
        if community.get('id') == source:
            return community.get('display_name', source)

    # Fallback to prefix + source for Reddit-like platforms
    prefix = platform_info.get('prefix', '')
    return f"{prefix}{source}" if source else platform_info.get('name', platform)


@app.route('/api/posts')
def api_posts():
    """Return one page of posts as JSON, newest first.

    Query parameters: ``page`` and ``per_page`` (invalid or non-positive
    values fall back to 1 / DEFAULT_PAGE_SIZE), plus optional
    case-insensitive ``community`` and ``platform`` filters.
    """
    try:
        platform_config = load_platform_config()

        # type=int makes Flask fall back to the default on non-numeric input.
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', DEFAULT_PAGE_SIZE, type=int)
        if page < 1:
            page = 1
        if per_page < 1:
            # Zero would divide by zero below; negatives slice backwards.
            per_page = DEFAULT_PAGE_SIZE
        community = request.args.get('community', '').lower()
        platform = request.args.get('platform', '').lower()

        # Use cached data for better performance
        cached_posts, cached_comments = _load_posts_cache()

        posts = []
        for post_uuid, post_data in cached_posts.items():
            # Apply community filter
            if community and post_data.get('source', '').lower() != community:
                continue

            # Apply platform filter
            if platform and post_data.get('platform', '').lower() != platform:
                continue

            content = post_data.get('content') or ''
            # Only mark the preview as truncated when text was actually cut.
            preview = content[:200] + ('...' if len(content) > 200 else '')

            posts.append({
                'id': post_uuid,
                'title': post_data.get('title', 'Untitled'),
                'author': post_data.get('author', 'Unknown'),
                'platform': post_data.get('platform', 'unknown'),
                'score': post_data.get('score', 0),
                'timestamp': post_data.get('timestamp', 0),
                'url': f'/post/{post_uuid}',
                'comments_count': len(cached_comments.get(post_uuid, [])),
                'content_preview': preview,
                'source': post_data.get('source', ''),
                'source_display': get_display_name_for_source(
                    post_data.get('platform', ''),
                    post_data.get('source', ''),
                    platform_config
                ),
                'tags': post_data.get('tags', []),
                'external_url': post_data.get('url', '')
            })

        # Sort by timestamp (newest first); a null timestamp counts as 0.
        posts.sort(key=lambda p: p['timestamp'] or 0, reverse=True)

        # Calculate pagination
        total_posts = len(posts)
        start_idx = (page - 1) * per_page
        paginated_posts = posts[start_idx:start_idx + per_page]
        total_pages = (total_posts + per_page - 1) // per_page

        return {
            'posts': paginated_posts,
            'pagination': {
                'current_page': page,
                'total_pages': total_pages,
                'total_posts': total_posts,
                'per_page': per_page,
                'has_next': page < total_pages,
                'has_prev': page > 1
            }
        }

    except Exception as e:
        logger.exception(f"Error loading posts: {e}")
        return {'posts': [], 'error': str(e), 'pagination': {'current_page': 1, 'total_pages': 0, 'total_posts': 0, 'per_page': DEFAULT_PAGE_SIZE, 'has_next': False, 'has_prev': False}}


@app.route('/api/platforms')
def api_platforms():
    """Return the platform configuration plus communities that have posts.

    Communities are derived from the post files on disk, enriched with the
    platform configuration, and sorted by post count (descending).
    """
    try:
        platform_config = load_platform_config()
        platforms = platform_config.get('platforms', {})

        # Count posts per (platform, source). A tuple key keeps the two
        # parts separate even if a platform name contains a colon.
        source_counts = {}
        for post_file in Path('data/posts').glob('*.json'):
            try:
                post_data = _read_json_file(post_file)
                key = (str(post_data.get('platform', 'unknown')), str(post_data.get('source', '')))
            except (ValueError, OSError, AttributeError):
                # Unreadable, malformed, or not a JSON object: skip the file.
                continue
            source_counts[key] = source_counts.get(key, 0) + 1

        # Build community list from actual data and platform config
        communities = []
        for (platform, source), count in source_counts.items():
            platform_info = platforms.get(platform, {})

            community_info = None
            if platform_info.get('supports_communities'):
                for community in platform_info.get('communities', []):
                    if community.get('id') == source:
                        community_info = community
                        break

            if community_info:
                community_entry = {
                    'platform': platform,
                    'id': source,
                    'name': community_info.get('name', source),
                    'display_name': community_info.get('display_name', source),
                    'icon': community_info.get('icon', platform_info.get('icon', '📄')),
                    'count': count,
                    'description': community_info.get('description', '')
                }
            else:
                # Fallback for sources not in config
                display_name = get_display_name_for_source(platform, source, platform_config)
                community_entry = {
                    'platform': platform,
                    'id': source,
                    'name': source or platform,
                    'display_name': display_name,
                    'icon': platform_info.get('icon', '📄'),
                    'count': count,
                    'description': f"Posts from {display_name}"
                }

            communities.append(community_entry)

        # Sort communities by count (most posts first)
        communities.sort(key=lambda c: c['count'], reverse=True)

        return {
            'platforms': platforms,
            'communities': communities,
            'total_communities': len(communities)
        }

    except Exception as e:
        logger.exception(f"Error loading platform configuration: {e}")
        return {
            'platforms': {},
            'communities': [],
            'total_communities': 0,
            'error': str(e)
        }


@app.route('/api/content-timestamp')
def api_content_timestamp():
    """Return the newest post-file modification time (0 when there is none)."""
    try:
        posts_dir = Path('data/posts')
        if not posts_dir.exists():
            return jsonify({'timestamp': 0})

        latest_mtime = max(
            (post_file.stat().st_mtime for post_file in posts_dir.glob('*.json')),
            default=0,
        )
        return jsonify({'timestamp': latest_mtime})

    except Exception as e:
        logger.error(f"Error getting content timestamp: {e}")
        return jsonify({'error': 'Failed to get content timestamp'}), 500


@app.route('/post/<post_id>')
def post_detail(post_id):
    """Serve individual post detail page with modern theme"""
    try:
        platform_config = load_platform_config()

        # Use cached data for better performance
        cached_posts, cached_comments = _load_posts_cache()

        post_data = cached_posts.get(post_id)
        if not post_data:
            return render_template('404.html'), 404

        # Work on copies: the cached objects are shared between requests.
        post = dict(post_data)
        post['source_display'] = get_display_name_for_source(
            post.get('platform', ''),
            post.get('source', ''),
            platform_config
        )

        # Comments in chronological order
        comments = sorted(
            cached_comments.get(post_id, []),
            key=lambda c: c.get('timestamp', 0),
        )

        # Load user settings if authenticated
        user_settings = _current_user_settings() if current_user.is_authenticated else {}

        return render_template('post_detail.html', post=post, comments=comments, user_settings=user_settings)

    except Exception as e:
        logger.exception(f"Error loading post {post_id}: {e}")
        return render_template('404.html'), 404


@app.route('/themes/<path:filename>')
def serve_theme(filename):
    """Serve theme files (CSS, JS)"""
    # Validate filename to prevent directory traversal
    if not _is_safe_path(filename) or '..' in filename:
        logger.warning(f"Unsafe theme file requested: {filename}")
        abort(404)
    return send_from_directory('themes', filename)


@app.route('/logo.png')
def serve_logo():
    """Serve logo"""
    return send_from_directory('.', 'logo.png')


@app.route('/static/<path:filename>')
def serve_static(filename):
    """Serve static files (avatars, etc.)"""
    # Validate filename to prevent directory traversal
    if not _is_safe_path(filename) or '..' in filename:
        logger.warning(f"Unsafe static file requested: {filename}")
        abort(404)
    return send_from_directory('static', filename)


# ============================================================
# AUTHENTICATION ROUTES
# ============================================================

@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate submitted credentials."""
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        remember = request.form.get('remember', False) == 'on'

        if not user_service:
            flash('User service not available', 'error')
            return render_template('login.html')

        # Missing fields can never authenticate; skip the service call.
        user = user_service.authenticate(username, password) if username and password else None

        if user:
            login_user(user, remember=remember)
            flash(f'Welcome back, {user.username}!', 'success')

            # Only follow "next" when it stays on this host (open redirect).
            next_page = _safe_redirect_target(request.args.get('next'))
            return redirect(next_page) if next_page else redirect(url_for('index'))
        else:
            flash('Invalid username or password', 'error')

    return render_template('login.html')


# Auth0 Routes
@app.route('/auth0/login')
def auth0_login():
    """Redirect to Auth0 for authentication"""
    redirect_uri = url_for('auth0_callback', _external=True)
    return auth0.authorize_redirect(redirect_uri)


@app.route('/auth0/callback')
def auth0_callback():
    """Handle the Auth0 callback: link or create the local user, then log in.

    Lookup order: existing Auth0 ID, then existing e-mail (account linking),
    otherwise a new non-admin user is created with a unique username.
    """
    try:
        # Get the access token from Auth0
        token = auth0.authorize_access_token()

        # Get user info from Auth0
        user_info = token.get('userinfo')
        if not user_info:
            user_info = auth0.parse_id_token(token)

        # Extract user details
        auth0_id = user_info.get('sub')
        email = user_info.get('email')

        # Must be checked before deriving a username from the e-mail address.
        if not auth0_id or not email:
            flash('Unable to get user information from Auth0', 'error')
            return redirect(url_for('login'))

        username = user_info.get('nickname') or user_info.get('preferred_username') or email.split('@')[0]

        # Check if user exists with this Auth0 ID
        user = user_service.get_user_by_auth0_id(auth0_id)

        if not user:
            # Check if user exists with this email (for account linking)
            existing_user = user_service.get_user_by_email(email)

            if existing_user:
                # Link existing account to Auth0
                user_service.link_auth0_account(existing_user.id, auth0_id)
                user = existing_user
                flash(f'Account linked successfully! Welcome back, {user.username}!', 'success')
            else:
                # Generate unique username if needed
                base_username = username[:MAX_USERNAME_LENGTH - 3]  # Leave room for suffix
                unique_username = base_username
                counter = 1
                while user_service.username_exists(unique_username):
                    suffix = f"_{counter}"
                    # Trim further once the suffix outgrows the reserved room.
                    unique_username = f"{base_username[:MAX_USERNAME_LENGTH - len(suffix)]}{suffix}"
                    counter += 1

                user_id = user_service.create_user(
                    username=unique_username,
                    email=email,
                    password=None,  # No password for OAuth users
                    is_admin=False,
                    auth0_id=auth0_id
                )

                if user_id:
                    user = user_service.get_user_by_id(user_id)
                    flash(f'Account created successfully! Welcome, {user.username}!', 'success')
                else:
                    flash('Failed to create user account', 'error')
                    return redirect(url_for('login'))
        else:
            flash(f'Welcome back, {user.username}!', 'success')

        # Log in the user
        if user:
            login_user(user, remember=True)

            # Store Auth0 info in session for future use
            session['auth0_user_info'] = user_info

        # Only follow "next" when it stays on this host (open redirect).
        next_page = _safe_redirect_target(request.args.get('next'))
        return redirect(next_page) if next_page else redirect(url_for('index'))

    except Exception as e:
        logger.error(f"Auth0 callback error: {e}")
        flash('Authentication failed. Please try again.', 'error')
        return redirect(url_for('login'))


@app.route('/auth0/logout')
@login_required
def auth0_logout():
    """Logout from Auth0 and local session"""
    # Clear session
    session.clear()
    logout_user()

    # Build Auth0 logout URL
    domain = app.config['AUTH0_DOMAIN']
    client_id = app.config['AUTH0_CLIENT_ID']
    return_to = url_for('index', _external=True)

    logout_url = f'https://{domain}/v2/logout?' + urlencode({
        'returnTo': return_to,
        'client_id': client_id
    }, quote_via=quote_plus)

    return redirect(logout_url)


@app.route('/admin-setup', methods=['GET', 'POST'])
def admin_setup():
    """Create the first admin user; refused once any user exists."""
    # Check if users already exist
    try:
        user_count = User.query.count()
        if user_count > 0:
            flash('Admin user already exists.', 'info')
            return redirect(url_for('login'))
    except Exception as e:
        # Best effort: the form is still shown when the count is unavailable.
        logger.warning(f"Database error checking existing users: {e}")

    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        password = request.form.get('password')
        password_confirm = request.form.get('password_confirm')

        # Validation
        if not username or not email or not password:
            flash('All fields are required', 'error')
            return render_template('admin_setup.html')

        if password != password_confirm:
            flash('Passwords do not match', 'error')
            return render_template('admin_setup.html')

        if len(password) < MIN_PASSWORD_LENGTH:
            flash(f'Password must be at least {MIN_PASSWORD_LENGTH} characters', 'error')
            return render_template('admin_setup.html')

        # Create admin user
        user_id = user_service.create_user(username, email, password, is_admin=True)

        if user_id:
            flash('Admin account created successfully! Please log in.', 'success')
            return redirect(url_for('login'))
        else:
            flash('Error creating admin account. Please try again.', 'error')

    return render_template('admin_setup.html')


@app.route('/signup', methods=['GET', 'POST'])
def signup():
    """Show the signup form and create a regular user account."""
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        password = request.form.get('password')
        password_confirm = request.form.get('password_confirm')

        if not user_service:
            flash('User service not available', 'error')
            return render_template('signup.html')

        # Validation
        if not username or not email or not password:
            flash('All fields are required', 'error')
            return render_template('signup.html')

        if password != password_confirm:
            flash('Passwords do not match', 'error')
            return render_template('signup.html')

        if len(password) < MIN_PASSWORD_LENGTH:
            flash(f'Password must be at least {MIN_PASSWORD_LENGTH} characters', 'error')
            return render_template('signup.html')

        if user_service.username_exists(username):
            flash('Username already taken', 'error')
            return render_template('signup.html')

        if user_service.email_exists(email):
            flash('Email already registered', 'error')
            return render_template('signup.html')

        # Create user
        user_id = user_service.create_user(username, email, password)

        if user_id:
            flash('Account created successfully! Please log in.', 'success')
            return redirect(url_for('login'))
        else:
            flash('Error creating account. Please try again.', 'error')

    return render_template('signup.html')


@app.route('/logout')
@login_required
def logout():
    """Logout current user"""
    logout_user()
    flash('You have been logged out.', 'info')
    return redirect(url_for('index'))


@app.route('/settings')
@login_required
def settings():
    """Main settings page"""
    return render_template('settings.html',
                           user=current_user,
                           user_settings=_current_user_settings(),
                           filter_sets=_load_filter_sets())


@app.route('/settings/profile', methods=['GET', 'POST'])
@login_required
def settings_profile():
    """Profile settings page: change username, e-mail and default avatar."""

    def render_form():
        # Every render path needs the avatar list, including error pages.
        return render_template('settings_profile.html',
                               user=current_user,
                               default_avatars=DEFAULT_AVATARS)

    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        default_avatar = request.form.get('default_avatar')

        # Validation
        if not username or not email:
            flash('Username and email are required', 'error')
            return render_form()

        # Check if username is taken by another user
        if username != current_user.username and user_service.username_exists(username):
            flash('Username already taken', 'error')
            return render_form()

        # Check if email is taken by another user
        if email != current_user.email and user_service.email_exists(email):
            flash('Email already registered', 'error')
            return render_form()

        # Update user
        current_user.username = username
        current_user.email = email

        # Only known avatar ids may end up in the stored URL.
        if default_avatar in {avatar['id'] for avatar in DEFAULT_AVATARS}:
            current_user.profile_picture_url = f"/static/default-avatars/{default_avatar}.png"

        if not _commit_or_rollback('updating profile'):
            flash('Error saving settings', 'error')
            return render_form()

        flash('Profile updated successfully', 'success')
        return redirect(url_for('settings'))

    return render_form()


@app.route('/settings/communities', methods=['GET', 'POST'])
@login_required
def settings_communities():
    """Community/source selection settings"""
    if request.method == 'POST':
        # Apply the same rules as _validate_user_settings before persisting.
        selected_communities = [
            name for name in request.form.getlist('communities')
            if _is_valid_community_name(name)
        ]

        user_settings = _current_user_settings()
        user_settings['communities'] = selected_communities
        current_user.settings = json.dumps(user_settings)

        if _commit_or_rollback('saving community preferences'):
            flash('Community preferences updated', 'success')
        else:
            flash('Error saving settings', 'error')
        return redirect(url_for('settings'))

    selected_communities = _current_user_settings().get('communities', [])

    # Available communities
    available_communities = [
        {'id': 'programming', 'name': 'Programming', 'platform': 'reddit'},
        {'id': 'python', 'name': 'Python', 'platform': 'reddit'},
        {'id': 'technology', 'name': 'Technology', 'platform': 'reddit'},
        {'id': 'hackernews', 'name': 'Hacker News', 'platform': 'hackernews'},
        {'id': 'lobsters', 'name': 'Lobsters', 'platform': 'lobsters'},
        {'id': 'stackoverflow', 'name': 'Stack Overflow', 'platform': 'stackoverflow'},
    ]

    return render_template('settings_communities.html',
                           user=current_user,
                           available_communities=available_communities,
                           selected_communities=selected_communities)


@app.route('/settings/filters', methods=['GET', 'POST'])
@login_required
def settings_filters():
    """Filter settings page"""
    if request.method == 'POST':
        selected_filter = request.form.get('filter_set', 'no_filter')

        # Load and validate current settings
        user_settings = _validate_user_settings(current_user.settings)

        # Validate new filter setting
        if not _is_safe_filterset(selected_filter):
            flash('Invalid filter selection', 'error')
            return redirect(url_for('settings'))
        user_settings['filter_set'] = selected_filter

        # Save validated settings
        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Filter settings updated successfully', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving filter settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')
        return redirect(url_for('settings'))

    current_filter = _current_user_settings().get('filter_set', 'no_filter')

    return render_template('settings_filters.html',
                           user=current_user,
                           filter_sets=_load_filter_sets(),
                           current_filter=current_filter)


@app.route('/settings/experience', methods=['GET', 'POST'])
@login_required
def settings_experience():
    """Experience and behavioral settings page - opt-in addictive features"""
    if request.method == 'POST':
        user_settings = _current_user_settings()

        # All features are opt-in: an unchecked box is absent from the form.
        user_settings['experience'] = {
            'infinite_scroll': request.form.get('infinite_scroll') == 'on',
            'auto_refresh': request.form.get('auto_refresh') == 'on',
            'push_notifications': request.form.get('push_notifications') == 'on',
            'dark_patterns_opt_in': request.form.get('dark_patterns_opt_in') == 'on'
        }
        current_user.settings = json.dumps(user_settings)

        if _commit_or_rollback('saving experience settings'):
            flash('Experience settings updated successfully', 'success')
        else:
            flash('Error saving settings', 'error')
        return redirect(url_for('settings'))

    experience_settings = _current_user_settings().get('experience', {
        'infinite_scroll': False,
        'auto_refresh': False,
        'push_notifications': False,
        'dark_patterns_opt_in': False
    })

    return render_template('settings_experience.html',
                           user=current_user,
                           experience_settings=experience_settings)


@app.route('/upload-avatar', methods=['POST'])
@login_required
def upload_avatar():
    """Store an uploaded profile picture and point the user's profile at it."""
    if 'avatar' not in request.files:
        flash('No file selected', 'error')
        return redirect(url_for('settings_profile'))

    file = request.files['avatar']
    if file.filename == '':
        flash('No file selected', 'error')
        return redirect(url_for('settings_profile'))

    # Validate file type and size
    if not _is_allowed_file(file.filename):
        flash('Invalid file type. Please upload PNG, JPG, or GIF', 'error')
        return redirect(url_for('settings_profile'))

    # Check file size (Flask's MAX_CONTENT_LENGTH handles this too, but double-check)
    if hasattr(file, 'content_length') and file.content_length > app.config['MAX_CONTENT_LENGTH']:
        flash('File too large. Maximum size is 16MB', 'error')
        return redirect(url_for('settings_profile'))

    # Validate and secure filename; sanitising may alter the name, so the
    # extension is checked again on the result.
    filename = secure_filename(file.filename)
    if not filename or len(filename) > MAX_FILENAME_LENGTH or not _is_allowed_file(filename):
        flash('Invalid filename', 'error')
        return redirect(url_for('settings_profile'))

    # Add user ID to make filename unique and prevent conflicts
    unique_filename = f"{current_user.id}_{filename}"

    # Ensure upload directory exists and is secure
    upload_dir = os.path.abspath(UPLOAD_FOLDER)
    os.makedirs(upload_dir, exist_ok=True)

    upload_path = os.path.abspath(os.path.join(upload_dir, unique_filename))

    # Final security check. commonpath compares whole path components, so a
    # sibling such as ".../avatars_evil" cannot pass as a prefix match.
    if os.path.commonpath([upload_dir, upload_path]) != upload_dir:
        logger.warning(f"Path traversal attempt in file upload: {upload_path}")
        flash('Invalid file path', 'error')
        return redirect(url_for('settings_profile'))

    try:
        file.save(upload_path)
        logger.info(f"File uploaded successfully: {unique_filename} by user {current_user.id}")
    except Exception as e:
        logger.error(f"Error saving uploaded file: {e}")
        flash('Error saving file', 'error')
        return redirect(url_for('settings_profile'))

    # Update user profile
    current_user.profile_picture_url = f"/static/avatars/{unique_filename}"
    if _commit_or_rollback('updating profile picture'):
        flash('Profile picture updated successfully', 'success')
    else:
        flash('Error saving settings', 'error')
    return redirect(url_for('settings_profile'))


@app.route('/profile')
@login_required
def profile():
    """User profile page"""
    return render_template('profile.html', user=current_user)


# ============================================================
# ADMIN ROUTES
# ============================================================

@app.route('/admin')
@login_required
def admin_panel():
    """Admin panel - user management"""
    if not current_user.is_admin:
        flash('Access denied. Admin privileges required.', 'error')
        return redirect(url_for('index'))

    if not user_service:
        flash('User service not available', 'error')
        return redirect(url_for('index'))

    users = user_service.get_all_users()
    return render_template('admin.html', users=users)


@app.route('/admin/user/<user_id>/delete', methods=['POST'])
@login_required
def admin_delete_user(user_id):
    """Delete user (admin only)"""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    # Prevent self-deletion. The URL value is always a string, so both sides
    # are normalised before comparing.
    if str(current_user.id) == str(user_id):
        flash('You cannot delete your own account!', 'error')
        return redirect(url_for('admin_panel'))

    user = user_service.get_user_by_id(user_id)
    if user:
        username = user.username
        if user_service.delete_user(user_id):
            flash(f'User {username} has been deleted.', 'success')
            logger.info(f"Admin {current_user.id} deleted user {username} ({user_id})")
        else:
            flash('Error deleting user', 'error')
            logger.error(f"Failed to delete user {user_id}")
    else:
        flash('User not found', 'error')

    return redirect(url_for('admin_panel'))


@app.route('/admin/user/<user_id>/toggle-admin', methods=['POST'])
@login_required
def admin_toggle_admin(user_id):
    """Toggle user admin status"""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    target_user = user_service.get_user_by_id(user_id)
    if target_user:
        new_status = not target_user.is_admin  # Toggle admin status
        user_service.update_user_admin_status(user_id, new_status)
        flash('Admin status updated', 'success')
    else:
        flash('User not found', 'error')

    return redirect(url_for('admin_panel'))


# This route is duplicate - removed in favor of the UUID-based route above


@app.route('/admin/regenerate_content', methods=['POST'])
@login_required
def admin_regenerate_content():
    """Run generate_html.py to regenerate all HTML content (admin only)."""
    import subprocess
    import sys

    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    try:
        # Secure subprocess execution with absolute paths and validation
        script_path = os.path.abspath('generate_html.py')
        if not os.path.exists(script_path):
            flash('Content generation script not found', 'error')
            return redirect(url_for('admin_panel'))

        # Fixed argument list (no shell), run with the current interpreter.
        python_exe = os.path.abspath(sys.executable)
        cmd = [python_exe, script_path, '--filterset', 'no_filter', '--theme', 'vanilla-js']

        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd=os.path.abspath('.'),
            timeout=300,  # 5 minute timeout
            check=False
        )

        if result.returncode == 0:
            flash('Content regenerated successfully', 'success')
            logger.info(f"Content regenerated by admin user {current_user.id}")
            # Invalidate cache since content was regenerated
            _invalidate_cache()
        else:
            flash('Error regenerating content', 'error')
            logger.error(f"Content regeneration failed: {result.stderr}")

    except subprocess.TimeoutExpired:
        flash('Content regeneration timed out', 'error')
        logger.error("Content regeneration timed out")
    except Exception as e:
        flash(f'Error regenerating content: {str(e)}', 'error')
        logger.error(f"Content regeneration error: {e}")

    return redirect(url_for('admin_panel'))


@app.route('/admin/clear_cache', methods=['POST'])
@login_required
def admin_clear_cache():
    """Delete the cache/temp directories and reset the in-memory cache."""
    import shutil

    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    try:
        # Clear any cache directories or temp files
        for cache_dir in ('cache', 'temp'):
            if os.path.exists(cache_dir):
                shutil.rmtree(cache_dir)

        # Clear application cache
        _invalidate_cache()

        flash('Cache cleared successfully', 'success')
        logger.info(f"Cache cleared by admin user {current_user.id}")
    except Exception as e:
        flash(f'Error clearing cache: {str(e)}', 'error')
        logger.error(f"Cache clearing error: {e}")

    return redirect(url_for('admin_panel'))
@app.route('/admin/backup_data', methods=['POST'])
@login_required
def admin_backup_data():
    """Copy application data and key source files into a timestamped backup (admin only).

    Creates ``backups/balanceboard_backup_<timestamp>`` and copies the
    ``data``, ``templates``, ``themes`` and ``static`` directories plus a
    fixed list of files into it, skipping anything that does not exist.

    Returns:
        A redirect to the admin panel, or to the index page for non-admins.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        # Non-admins cannot see the admin panel, so send them to the index
        # directly like the other admin views do.
        return redirect(url_for('index'))

    import shutil
    from datetime import datetime

    try:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f'balanceboard_backup_{timestamp}'

        # Create backup directory
        backup_dir = f'backups/{backup_name}'
        os.makedirs(backup_dir, exist_ok=True)

        # Copy important directories
        for dir_name in ('data', 'templates', 'themes', 'static'):
            if os.path.exists(dir_name):
                shutil.copytree(dir_name, f'{backup_dir}/{dir_name}')

        # Copy important files
        for file_name in ('app.py', 'models.py', 'database.py', 'filtersets.json'):
            if os.path.exists(file_name):
                shutil.copy2(file_name, backup_dir)

        flash(f'Backup created: {backup_name}', 'success')
        logger.info(f"Backup {backup_name} created by admin user {current_user.id}")
    except Exception as e:
        flash(f'Error creating backup: {str(e)}', 'error')
        logger.error(f"Backup creation error: {e}")

    return redirect(url_for('admin_panel'))


# ============================================================
# POLLING MANAGEMENT ROUTES
# ============================================================

def _parse_poll_interval(raw_value, default=None):
    """Convert a submitted poll interval into a positive number of minutes.

    Args:
        raw_value: Raw form value (a string, or None when the field is absent).
        default: Value returned when the field is absent or blank.

    Returns:
        The interval as an int of at least 1, or ``default`` for absent or
        blank input.

    Raises:
        ValueError: If the value is not an integer or is smaller than 1.
    """
    text = '' if raw_value is None else str(raw_value).strip()
    if not text:
        return default
    minutes = int(text)
    if minutes < 1:
        raise ValueError(f'poll interval must be at least 1 minute, got {minutes}')
    return minutes


@app.route('/admin/polling')
@login_required
def admin_polling():
    """Render the polling management page (admin only).

    Passes all poll sources (ordered by platform, then display name), the
    scheduler status and the platform configuration to the template.

    Returns:
        The rendered ``admin_polling.html`` template, or a redirect to the
        index page for non-admins.
    """
    if not current_user.is_admin:
        flash('Access denied. Admin privileges required.', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    sources = PollSource.query.order_by(PollSource.platform, PollSource.display_name).all()

    return render_template(
        'admin_polling.html',
        sources=sources,
        scheduler_status=polling_service.get_status(),
        platform_config=load_platform_config(),
    )


@app.route('/admin/polling/add', methods=['POST'])
@login_required
def admin_polling_add():
    """Create a new poll source from the submitted form (admin only).

    Expects the form fields ``platform``, ``source_id``, ``display_name`` and
    optionally ``poll_interval`` (minutes, defaults to 60).

    Returns:
        A redirect to the polling page, or to the index page for non-admins.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    platform = (request.form.get('platform') or '').strip()
    source_id = (request.form.get('source_id') or '').strip()
    display_name = (request.form.get('display_name') or '').strip()

    if not platform or not source_id or not display_name:
        flash('Missing required fields', 'error')
        return redirect(url_for('admin_polling'))

    # Malformed numbers are reported to the admin instead of crashing the
    # request with an unhandled ValueError.
    try:
        poll_interval = _parse_poll_interval(request.form.get('poll_interval'), default=60)
    except ValueError:
        flash('Poll interval must be a positive whole number of minutes', 'error')
        return redirect(url_for('admin_polling'))

    # Check if source already exists
    existing = PollSource.query.filter_by(platform=platform, source_id=source_id).first()
    if existing:
        flash(f'Source {platform}:{source_id} already exists', 'warning')
        return redirect(url_for('admin_polling'))

    # Create new source
    source = PollSource(
        platform=platform,
        source_id=source_id,
        display_name=display_name,
        poll_interval_minutes=poll_interval,
        enabled=True,
        created_by=current_user.id
    )
    db.session.add(source)
    db.session.commit()

    flash(f'Added polling source: {display_name}', 'success')
    logger.info(f"Admin {current_user.id} added poll source {platform}:{source_id}")

    return redirect(url_for('admin_polling'))


# NOTE(review): the per-source routes below use the default (string) URL
# converter for `source_id` - switch to `<int:source_id>` if PollSource uses
# integer primary keys.
@app.route('/admin/polling/<source_id>/toggle', methods=['POST'])
@login_required
def admin_polling_toggle(source_id):
    """Enable or disable a poll source (admin only).

    Args:
        source_id: Primary key of the poll source, taken from the URL.

    Returns:
        A redirect to the polling page, or to the index page for non-admins.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    source.enabled = not source.enabled
    db.session.commit()

    status = 'enabled' if source.enabled else 'disabled'
    flash(f'Polling {status} for {source.display_name}', 'success')

    return redirect(url_for('admin_polling'))


@app.route('/admin/polling/<source_id>/update', methods=['POST'])
@login_required
def admin_polling_update(source_id):
    """Update the polling interval of a poll source (admin only).

    Args:
        source_id: Primary key of the poll source, taken from the URL.

    Returns:
        A redirect to the polling page, or to the index page for non-admins.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    try:
        poll_interval = _parse_poll_interval(request.form.get('poll_interval'))
    except ValueError:
        flash('Poll interval must be a positive whole number of minutes', 'error')
        return redirect(url_for('admin_polling'))

    # A blank field leaves the stored interval untouched.
    if poll_interval is not None:
        source.poll_interval_minutes = poll_interval

    db.session.commit()
    flash(f'Updated interval for {source.display_name}', 'success')

    return redirect(url_for('admin_polling'))


@app.route('/admin/polling/<source_id>/poll-now', methods=['POST'])
@login_required
def admin_polling_poll_now(source_id):
    """Manually trigger polling for a source (admin only).

    Args:
        source_id: Primary key of the poll source, taken from the URL.

    Returns:
        A redirect to the polling page, or to the index page for non-admins.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    # Hand the request to the polling service; failures are shown to the admin.
    try:
        polling_service.poll_now(source_id)
        flash(f'Polling started for {source.display_name}', 'success')
    except Exception as e:
        flash(f'Error starting poll: {str(e)}', 'error')
        logger.error(f"Error triggering poll for {source_id}: {e}")

    return redirect(url_for('admin_polling'))


@app.route('/admin/polling/<source_id>/delete', methods=['POST'])
@login_required
def admin_polling_delete(source_id):
    """Delete a poll source (admin only).

    Args:
        source_id: Primary key of the poll source, taken from the URL.

    Returns:
        A redirect to the polling page, or to the index page for non-admins.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    # Keep the name for the confirmation message shown after the delete.
    display_name = source.display_name
    db.session.delete(source)
    db.session.commit()

    flash(f'Deleted polling source: {display_name}', 'success')
    logger.info(f"Admin {current_user.id} deleted poll source {source_id}")

    return redirect(url_for('admin_polling'))


@app.route('/admin/polling/<source_id>/logs')
@login_required
def admin_polling_logs(source_id):
    """Show up to 50 log entries for a poll source (admin only).

    Args:
        source_id: Primary key of the poll source, taken from the URL.

    Returns:
        The rendered ``admin_polling_logs.html`` template, or a redirect when
        the caller is not an admin or the source does not exist.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    # Limit to 50 entries.
    # NOTE(review): presumably the `logs` relationship is ordered newest
    # first - confirm in the model, no ordering is applied here.
    logs = source.logs.limit(50).all()

    return render_template('admin_polling_logs.html', source=source, logs=logs)


# ============================================================
# ERROR HANDLERS
# ============================================================

@app.errorhandler(404)
def not_found(e):
    """Render the custom 404 page."""
    return render_template('404.html'), 404


@app.errorhandler(500)
def server_error(e):
    """Render the custom 500 page."""
    return render_template('500.html'), 500


# ============================================================
# INITIALIZATION
# ============================================================

if __name__ == '__main__':
    # The server binds to every interface, and the interactive debugger lets
    # anyone who can reach it execute code. Debug mode is therefore opt-in
    # through the FLASK_DEBUG environment variable instead of always on.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in {'1', 'true', 'yes', 'on'}

    print("✓ BalanceBoard starting...")
    print("✓ Database: PostgreSQL with SQLAlchemy")
    print("✓ Password hashing: bcrypt")
    print("✓ Authentication: Flask-Login")

    app.run(host='0.0.0.0', port=DEFAULT_PORT, debug=debug_enabled)