""" BalanceBoard Web Application Flask server with user authentication and content serving. """ import os import re import logging import time from pathlib import Path from werkzeug.utils import secure_filename from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, abort, session, jsonify from flask_login import LoginManager, login_user, logout_user, login_required, current_user from dotenv import load_dotenv from functools import lru_cache from collections import defaultdict from authlib.integrations.flask_client import OAuth from urllib.parse import quote_plus, urlencode from database import init_db, db from models import User, bcrypt from user_service import UserService import json # Load environment variables load_dotenv() # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('app.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Initialize Flask app app = Flask(__name__, static_folder='themes', template_folder='templates') app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production') app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size app.config['ALLOW_ANONYMOUS_ACCESS'] = os.getenv('ALLOW_ANONYMOUS_ACCESS', 'true').lower() == 'true' # Application branding configuration app.config['APP_NAME'] = os.getenv('APP_NAME', 'BalanceBoard') app.config['LOGO_PATH'] = os.getenv('LOGO_PATH', 'logo.png') # Auth0 Configuration app.config['AUTH0_DOMAIN'] = os.getenv('AUTH0_DOMAIN', '') app.config['AUTH0_CLIENT_ID'] = os.getenv('AUTH0_CLIENT_ID', '') app.config['AUTH0_CLIENT_SECRET'] = os.getenv('AUTH0_CLIENT_SECRET', '') app.config['AUTH0_AUDIENCE'] = os.getenv('AUTH0_AUDIENCE', '') # Configuration constants # Note: ALLOWED_FILTERSETS will be dynamically loaded from filter_engine ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'} UPLOAD_FOLDER = 'static/avatars' MAX_FILENAME_LENGTH = 100 DEFAULT_PORT = 5021 DEFAULT_PAGE_SIZE = 20 MIN_PASSWORD_LENGTH = 8 MAX_USERNAME_LENGTH = 80 MAX_EMAIL_LENGTH = 120 MAX_COMMUNITY_NAME_LENGTH = 100 # Initialize database init_db(app) # Initialize bcrypt bcrypt.init_app(app) # Initialize Flask-Login login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' login_manager.login_message = 'Please log in to access this page.' # Initialize user service user_service = UserService() # Initialize polling service from polling_service import polling_service polling_service.init_app(app) polling_service.start() # Initialize filter engine from filter_pipeline import FilterEngine filter_engine = FilterEngine.get_instance() logger.info(f"FilterEngine initialized with {len(filter_engine.get_available_filtersets())} filtersets") # Initialize OAuth for Auth0 oauth = OAuth(app) auth0 = oauth.register( 'auth0', client_id=app.config['AUTH0_CLIENT_ID'], client_secret=app.config['AUTH0_CLIENT_SECRET'], server_metadata_url=f'https://{app.config["AUTH0_DOMAIN"]}/.well-known/openid_configuration', client_kwargs={ 'scope': 'openid profile email', } ) # Cache for posts and comments - improves performance post_cache = {} comment_cache = defaultdict(list) cache_timestamp = 0 CACHE_DURATION = 300 # 5 minutes # Security helper functions def _is_safe_filterset(filterset): """Validate filterset name for security""" if not filterset or not isinstance(filterset, str): return False # Check against available filtersets from filter_engine allowed_filtersets = set(filter_engine.get_available_filtersets()) return filterset in allowed_filtersets and re.match(r'^[a-zA-Z0-9_-]+$', filterset) def _is_safe_path(path): """Validate file path for security""" if not path or not isinstance(path, str): return False # Check for directory traversal attempts if '..' in path or path.startswith('/') or '\\' in path: return False # Only allow alphanumeric, dots, hyphens, underscores, and forward slashes return re.match(r'^[a-zA-Z0-9._/-]+$', path) is not None def _is_allowed_file(filename): """Check if file extension is allowed""" return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def _load_posts_cache(): """Load and cache posts data for better performance""" global post_cache, comment_cache, cache_timestamp current_time = time.time() if current_time - cache_timestamp < CACHE_DURATION and post_cache: return post_cache, comment_cache # Clear existing cache post_cache.clear() comment_cache.clear() posts_dir = Path('data/posts') comments_dir = Path('data/comments') # Load all posts if posts_dir.exists(): for post_file in posts_dir.glob('*.json'): try: with open(post_file, 'r') as f: post_data = json.load(f) post_uuid = post_data.get('uuid') if post_uuid: post_cache[post_uuid] = post_data except (json.JSONDecodeError, IOError) as e: logger.debug(f"Error reading post file {post_file}: {e}") continue # Load all comments and group by post UUID if comments_dir.exists(): for comment_file in comments_dir.glob('*.json'): try: with open(comment_file, 'r') as f: comment_data = json.load(f) post_uuid = comment_data.get('post_uuid') if post_uuid: comment_cache[post_uuid].append(comment_data) except (json.JSONDecodeError, IOError) as e: logger.debug(f"Error reading comment file {comment_file}: {e}") continue cache_timestamp = current_time logger.info(f"Cache refreshed: {len(post_cache)} posts, {len(comment_cache)} comment groups") return post_cache, comment_cache def _invalidate_cache(): """Invalidate the cache to force refresh""" global cache_timestamp cache_timestamp = 0 def _validate_user_settings(settings_str): """Validate and sanitize user settings JSON""" try: if not settings_str: return {} settings = json.loads(settings_str) if not isinstance(settings, dict): logger.warning("User settings must be a JSON object") return {} # Validate specific fields validated = {} # Filter set validation if 'filter_set' in settings: filter_set = settings['filter_set'] if isinstance(filter_set, str) and _is_safe_filterset(filter_set): validated['filter_set'] = filter_set # Communities validation if 'communities' in settings: communities = settings['communities'] if isinstance(communities, list): # Validate each community name safe_communities = [] for community in communities: if isinstance(community, str) and len(community) <= MAX_COMMUNITY_NAME_LENGTH and re.match(r'^[a-zA-Z0-9_-]+$', community): safe_communities.append(community) validated['communities'] = safe_communities # Experience settings validation if 'experience' in settings: exp = settings['experience'] if isinstance(exp, dict): safe_exp = {} bool_fields = ['infinite_scroll', 'auto_refresh', 'push_notifications', 'dark_patterns_opt_in', 'time_filter_enabled'] for field in bool_fields: if field in exp and isinstance(exp[field], bool): safe_exp[field] = exp[field] # Handle time_filter_days as integer if 'time_filter_days' in exp and isinstance(exp['time_filter_days'], int) and exp['time_filter_days'] > 0: safe_exp['time_filter_days'] = exp['time_filter_days'] validated['experience'] = safe_exp return validated except (json.JSONDecodeError, TypeError) as e: logger.warning(f"Invalid user settings JSON: {e}") return {} # Add custom Jinja filters @app.template_filter('nl2br') def nl2br_filter(text): """Convert newlines to
tags""" if not text: return text return text.replace('\n', '
\n') @login_manager.user_loader def load_user(user_id): """Load user by ID for Flask-Login""" return user_service.get_user_by_id(user_id) # ============================================================ # STATIC CONTENT ROUTES # ============================================================ @app.before_request def check_first_user(): """Check if any users exist, redirect to admin creation if not""" # Skip for static files and auth routes if request.endpoint and ( request.endpoint.startswith('static') or request.endpoint in ['login', 'signup', 'admin_setup', 'serve_theme', 'serve_logo'] ): return # Skip if user is already authenticated if current_user.is_authenticated: return # Check if any users exist try: user_count = User.query.count() if user_count == 0: return redirect(url_for('admin_setup')) except Exception as e: # If database is not ready, skip check logger.warning(f"Database not ready for user count check: {e}") pass def calculate_quick_stats(): """Calculate quick stats for dashboard""" from datetime import datetime, timedelta cached_posts, _ = _load_posts_cache() # Calculate posts from today (last 24 hours) now = datetime.utcnow() today_start = now - timedelta(hours=24) today_timestamp = today_start.timestamp() posts_today = sum(1 for post in cached_posts.values() if post.get('timestamp', 0) >= today_timestamp) return { 'posts_today': posts_today, 'total_posts': len(cached_posts) } @app.route('/') def index(): """Serve the main feed page""" # Calculate stats quick_stats = calculate_quick_stats() if current_user.is_authenticated: # Load user settings try: user_settings = json.loads(current_user.settings) if current_user.settings else {} except (json.JSONDecodeError, TypeError) as e: logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}") user_settings = {} return render_template('dashboard.html', user_settings=user_settings, quick_stats=quick_stats) else: # Check if anonymous access is allowed if app.config.get('ALLOW_ANONYMOUS_ACCESS', False): # Anonymous mode - allow browsing with default settings user_settings = { 'filter_set': 'no_filter', 'communities': [], 'experience': { 'infinite_scroll': False, 'auto_refresh': False, 'push_notifications': False, 'dark_patterns_opt_in': False, 'time_filter_enabled': False, 'time_filter_days': 7 } } return render_template('dashboard.html', user_settings=user_settings, anonymous=True, quick_stats=quick_stats) else: # Redirect non-authenticated users to login return redirect(url_for('login')) @app.route('/feed/') def feed_content(filterset='no_filter'): """Serve filtered feed content""" # Validate filterset to prevent directory traversal if not _is_safe_filterset(filterset): logger.warning(f"Invalid filterset requested: {filterset}") abort(404) # Additional path validation safe_path = os.path.normpath(f'active_html/{filterset}/index.html') if not safe_path.startswith('active_html/'): logger.warning(f"Path traversal attempt detected: {filterset}") abort(404) return send_from_directory(f'active_html/{filterset}', 'index.html') def load_platform_config(): """Load platform configuration""" try: with open('platform_config.json', 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError, IOError) as e: logger.warning(f"Could not load platform config: {e}") return {"platforms": {}, "collection_targets": []} def get_display_name_for_source(platform, source, platform_config): """Get proper display name for a source based on platform""" if not platform_config or 'platforms' not in platform_config: return source platform_info = platform_config['platforms'].get(platform, {}) # For platforms with communities, find the community info if platform_info.get('supports_communities'): for community in platform_info.get('communities', []): if community['id'] == source: return community['display_name'] # Fallback to prefix + source for Reddit-like platforms prefix = platform_info.get('prefix', '') return f"{prefix}{source}" if source else platform_info.get('name', platform) else: # For platforms without communities, use the platform name return platform_info.get('name', platform) @app.route('/api/posts') def api_posts(): """API endpoint to get posts data with pagination and filtering""" try: # Load platform configuration platform_config = load_platform_config() # Get query parameters page = int(request.args.get('page', 1)) per_page = int(request.args.get('per_page', DEFAULT_PAGE_SIZE)) community = request.args.get('community', '') platform = request.args.get('platform', '') search_query = request.args.get('q', '').lower().strip() filter_override = request.args.get('filter', '') # Get user's filterset preference, community selections, and time filter filterset_name = 'no_filter' user_communities = [] time_filter_enabled = False time_filter_days = 7 if current_user.is_authenticated: try: user_settings = json.loads(current_user.settings) if current_user.settings else {} filterset_name = user_settings.get('filter_set', 'no_filter') user_communities = user_settings.get('communities', []) experience_settings = user_settings.get('experience', {}) time_filter_enabled = experience_settings.get('time_filter_enabled', False) time_filter_days = experience_settings.get('time_filter_days', 7) except: filterset_name = 'no_filter' user_communities = [] time_filter_enabled = False time_filter_days = 7 # Override filterset if specified in request (for sidebar filter switching) if filter_override and _is_safe_filterset(filter_override): filterset_name = filter_override # Use cached data for better performance cached_posts, cached_comments = _load_posts_cache() # Calculate time filter cutoff if enabled time_cutoff = None if time_filter_enabled: from datetime import datetime, timedelta cutoff_date = datetime.utcnow() - timedelta(days=time_filter_days) time_cutoff = cutoff_date.timestamp() # Collect raw posts for filtering raw_posts = [] for post_uuid, post_data in cached_posts.items(): # Apply time filter first if enabled if time_filter_enabled and time_cutoff: post_timestamp = post_data.get('timestamp', 0) if post_timestamp < time_cutoff: continue # Apply community filter (before filterset) if community and post_data.get('source', '').lower() != community.lower(): continue # Apply platform filter (before filterset) if platform and post_data.get('platform', '').lower() != platform.lower(): continue # Apply user's community preferences (before filterset) if user_communities: post_source = post_data.get('source', '').lower() post_platform = post_data.get('platform', '').lower() post_id = post_data.get('id', '').lower() # Check if this post matches any of the user's selected communities matches_community = False for selected_community in user_communities: selected_community = selected_community.lower() # Enhanced matching logic: # 1. Exact source match # 2. Platform match (for generic selections like 'hackernews') # 3. Partial match in post ID for platform-specific communities # 4. Handle current data mismatch where HN posts have source="hackernews" if (post_source == selected_community or post_platform == selected_community or (selected_community in post_source) or (selected_community in post_id) or (post_platform == 'hackernews' and selected_community in ['front_page', 'ask', 'show', 'hackernews'])): matches_community = True break if not matches_community: continue # Apply search filter (before filterset) if search_query: title = post_data.get('title', '').lower() content = post_data.get('content', '').lower() author = post_data.get('author', '').lower() source = post_data.get('source', '').lower() if not (search_query in title or search_query in content or search_query in author or search_query in source): continue raw_posts.append(post_data) # Apply filterset using FilterEngine filtered_posts = filter_engine.apply_filterset(raw_posts, filterset_name, use_cache=True) # Build response posts with metadata posts = [] for post_data in filtered_posts: post_uuid = post_data.get('uuid') comment_count = len(cached_comments.get(post_uuid, [])) # Get proper display name for source source_display = get_display_name_for_source( post_data.get('platform', ''), post_data.get('source', ''), platform_config ) # Create post object with filter metadata post = { 'id': post_uuid, 'title': post_data.get('title', 'Untitled'), 'author': post_data.get('author', 'Unknown'), 'platform': post_data.get('platform', 'unknown'), 'score': post_data.get('score', 0), 'timestamp': post_data.get('timestamp', 0), 'url': f'/post/{post_uuid}', 'comments_count': comment_count, 'content_preview': (post_data.get('content', '') or '')[:200] + '...' if post_data.get('content') else '', 'source': post_data.get('source', ''), 'source_display': source_display, 'tags': post_data.get('tags', []), 'external_url': post_data.get('url', ''), # Add filter metadata 'filter_score': post_data.get('_filter_score', 0.5), 'filter_categories': post_data.get('_filter_categories', []), 'filter_tags': post_data.get('_filter_tags', []) } posts.append(post) # Sort by filter score (highest first), then timestamp posts.sort(key=lambda x: (x['filter_score'], x['timestamp']), reverse=True) # Calculate pagination total_posts = len(posts) start_idx = (page - 1) * per_page end_idx = start_idx + per_page paginated_posts = posts[start_idx:end_idx] total_pages = (total_posts + per_page - 1) // per_page has_next = page < total_pages has_prev = page > 1 return { 'posts': paginated_posts, 'pagination': { 'current_page': page, 'total_pages': total_pages, 'total_posts': total_posts, 'per_page': per_page, 'has_next': has_next, 'has_prev': has_prev } } except Exception as e: print(f"Error loading posts: {e}") return {'posts': [], 'error': str(e), 'pagination': {'current_page': 1, 'total_pages': 0, 'total_posts': 0, 'per_page': DEFAULT_PAGE_SIZE, 'has_next': False, 'has_prev': False}} @app.route('/api/platforms') def api_platforms(): """API endpoint to get platform configuration and available communities""" try: platform_config = load_platform_config() # Build community list for filtering UI communities = [] posts_dir = Path('data/posts') source_counts = {} # Count posts per source to show actual available communities for post_file in posts_dir.glob('*.json'): try: with open(post_file, 'r') as f: post_data = json.load(f) platform = post_data.get('platform', 'unknown') source = post_data.get('source', '') key = f"{platform}:{source}" source_counts[key] = source_counts.get(key, 0) + 1 except: continue # Build community list from actual data and platform config for key, count in source_counts.items(): platform, source = key.split(':', 1) # Get display info from platform config platform_info = platform_config.get('platforms', {}).get(platform, {}) community_info = None if platform_info.get('supports_communities'): for community in platform_info.get('communities', []): if community['id'] == source: community_info = community break # Create community entry if community_info: community_entry = { 'platform': platform, 'id': source, 'name': community_info['name'], 'display_name': community_info['display_name'], 'icon': community_info.get('icon', platform_info.get('icon', '📄')), 'count': count, 'description': community_info.get('description', '') } else: # Fallback for sources not in config display_name = get_display_name_for_source(platform, source, platform_config) community_entry = { 'platform': platform, 'id': source, 'name': source or platform, 'display_name': display_name, 'icon': platform_info.get('icon', '📄'), 'count': count, 'description': f"Posts from {display_name}" } communities.append(community_entry) # Sort communities by count (most posts first) communities.sort(key=lambda x: x['count'], reverse=True) return { 'platforms': platform_config.get('platforms', {}), 'communities': communities, 'total_communities': len(communities) } except Exception as e: print(f"Error loading platform configuration: {e}") return { 'platforms': {}, 'communities': [], 'total_communities': 0, 'error': str(e) } @app.route('/api/content-timestamp') def api_content_timestamp(): """API endpoint to get the last content update timestamp for auto-refresh""" try: posts_dir = Path('data/posts') if not posts_dir.exists(): return jsonify({'timestamp': 0}) # Get the most recent modification time of any post file latest_mtime = 0 for post_file in posts_dir.glob('*.json'): mtime = post_file.stat().st_mtime if mtime > latest_mtime: latest_mtime = mtime return jsonify({'timestamp': latest_mtime}) except Exception as e: logger.error(f"Error getting content timestamp: {e}") return jsonify({'error': 'Failed to get content timestamp'}), 500 @app.route('/api/bookmark', methods=['POST']) @login_required def api_bookmark(): """Toggle bookmark status for a post""" try: from models import Bookmark data = request.get_json() if not data or 'post_uuid' not in data: return jsonify({'error': 'Missing post_uuid'}), 400 post_uuid = data['post_uuid'] if not post_uuid: return jsonify({'error': 'Invalid post_uuid'}), 400 # Check if bookmark already exists existing_bookmark = Bookmark.query.filter_by( user_id=current_user.id, post_uuid=post_uuid ).first() if existing_bookmark: # Remove bookmark db.session.delete(existing_bookmark) db.session.commit() return jsonify({'bookmarked': False, 'message': 'Bookmark removed'}) else: # Add bookmark - get post data for caching cached_posts, _ = _load_posts_cache() post_data = cached_posts.get(post_uuid, {}) bookmark = Bookmark( user_id=current_user.id, post_uuid=post_uuid, title=post_data.get('title', ''), platform=post_data.get('platform', ''), source=post_data.get('source', '') ) db.session.add(bookmark) db.session.commit() return jsonify({'bookmarked': True, 'message': 'Bookmark added'}) except Exception as e: logger.error(f"Error toggling bookmark: {e}") return jsonify({'error': 'Failed to toggle bookmark'}), 500 @app.route('/api/bookmarks') @login_required def api_bookmarks(): """Get user's bookmarks""" try: from models import Bookmark page = int(request.args.get('page', 1)) per_page = int(request.args.get('per_page', DEFAULT_PAGE_SIZE)) # Get user's bookmarks with pagination bookmarks_query = Bookmark.query.filter_by(user_id=current_user.id).order_by(Bookmark.created_at.desc()) total_bookmarks = bookmarks_query.count() bookmarks = bookmarks_query.offset((page - 1) * per_page).limit(per_page).all() # Load current posts cache to get updated data cached_posts, cached_comments = _load_posts_cache() # Build response bookmark_posts = [] for bookmark in bookmarks: # Try to get current post data, fallback to cached data post_data = cached_posts.get(bookmark.post_uuid) if post_data: # Post still exists in current data comment_count = len(cached_comments.get(bookmark.post_uuid, [])) post = { 'id': bookmark.post_uuid, 'title': post_data.get('title', bookmark.title or 'Untitled'), 'author': post_data.get('author', 'Unknown'), 'platform': post_data.get('platform', bookmark.platform or 'unknown'), 'score': post_data.get('score', 0), 'timestamp': post_data.get('timestamp', 0), 'url': f'/post/{bookmark.post_uuid}', 'comments_count': comment_count, 'content_preview': (post_data.get('content', '') or '')[:200] + '...' if post_data.get('content') else '', 'source': post_data.get('source', bookmark.source or ''), 'bookmarked_at': bookmark.created_at.isoformat(), 'external_url': post_data.get('url', '') } else: # Post no longer in current data, use cached bookmark data post = { 'id': bookmark.post_uuid, 'title': bookmark.title or 'Untitled', 'author': 'Unknown', 'platform': bookmark.platform or 'unknown', 'score': 0, 'timestamp': 0, 'url': f'/post/{bookmark.post_uuid}', 'comments_count': 0, 'content_preview': 'Content no longer available', 'source': bookmark.source or '', 'bookmarked_at': bookmark.created_at.isoformat(), 'external_url': '', 'archived': True # Mark as archived } bookmark_posts.append(post) total_pages = (total_bookmarks + per_page - 1) // per_page has_next = page < total_pages has_prev = page > 1 return jsonify({ 'posts': bookmark_posts, 'pagination': { 'current_page': page, 'total_pages': total_pages, 'total_posts': total_bookmarks, 'per_page': per_page, 'has_next': has_next, 'has_prev': has_prev } }) except Exception as e: logger.error(f"Error getting bookmarks: {e}") return jsonify({'error': 'Failed to get bookmarks'}), 500 @app.route('/api/bookmark-status/') @login_required def api_bookmark_status(post_uuid): """Check if a post is bookmarked by current user""" try: from models import Bookmark bookmark = Bookmark.query.filter_by( user_id=current_user.id, post_uuid=post_uuid ).first() return jsonify({'bookmarked': bookmark is not None}) except Exception as e: logger.error(f"Error checking bookmark status: {e}") return jsonify({'error': 'Failed to check bookmark status'}), 500 @app.route('/api/filters') def api_filters(): """API endpoint to get available filters""" try: filters = [] # Get current user's filter preference current_filter = 'no_filter' if current_user.is_authenticated: try: user_settings = json.loads(current_user.settings) if current_user.settings else {} current_filter = user_settings.get('filter_set', 'no_filter') except: pass # Get available filtersets from filter engine for filterset_name in filter_engine.get_available_filtersets(): filterset_config = filter_engine.config.get_filterset(filterset_name) if filterset_config: # Map filter names to icons and display names icon_map = { 'no_filter': '🌐', 'safe_content': '✅', 'tech_only': '💻', 'high_quality': '⭐', 'custom_example': '🎯' } name_map = { 'no_filter': 'All Content', 'safe_content': 'Safe Content', 'tech_only': 'Tech Only', 'high_quality': 'High Quality', 'custom_example': 'Custom Example' } filters.append({ 'id': filterset_name, 'name': name_map.get(filterset_name, filterset_name.replace('_', ' ').title()), 'description': filterset_config.get('description', ''), 'icon': icon_map.get(filterset_name, '🔧'), 'active': filterset_name == current_filter }) return jsonify({'filters': filters}) except Exception as e: logger.error(f"Error getting filters: {e}") return jsonify({'error': 'Failed to get filters'}), 500 @app.route('/bookmarks') @login_required def bookmarks(): """Bookmarks page""" return render_template('bookmarks.html', user=current_user) def build_comment_tree(comments): """Build a hierarchical comment tree from flat comment list""" # Create lookup dict by UUID comment_dict = {c['uuid']: {**c, 'replies': []} for c in comments} # Build tree structure root_comments = [] for comment in comments: parent_uuid = comment.get('parent_comment_uuid') if parent_uuid and parent_uuid in comment_dict: # Add as reply to parent comment_dict[parent_uuid]['replies'].append(comment_dict[comment['uuid']]) else: # Top-level comment root_comments.append(comment_dict[comment['uuid']]) # Sort at each level by timestamp def sort_tree(comments_list): comments_list.sort(key=lambda x: x.get('timestamp', 0)) for comment in comments_list: if comment.get('replies'): sort_tree(comment['replies']) sort_tree(root_comments) return root_comments @app.route('/post/') def post_detail(post_id): """Serve individual post detail page with modern theme""" try: # Load platform configuration platform_config = load_platform_config() # Use cached data for better performance cached_posts, cached_comments = _load_posts_cache() # Get post data from cache post_data = cached_posts.get(post_id) if not post_data: return render_template('404.html'), 404 # Add source display name post_data['source_display'] = get_display_name_for_source( post_data.get('platform', ''), post_data.get('source', ''), platform_config ) # Get comments from cache comments_flat = cached_comments.get(post_id, []) logger.info(f"Loading post {post_id}: found {len(comments_flat)} comments") # Build comment tree comments = build_comment_tree(comments_flat) # Load user settings if authenticated user_settings = {} if current_user.is_authenticated: try: user_settings = json.loads(current_user.settings) if current_user.settings else {} except: user_settings = {} return render_template('post_detail.html', post=post_data, comments=comments, user_settings=user_settings) except Exception as e: print(f"Error loading post {post_id}: {e}") return render_template('404.html'), 404 @app.route('/themes/') def serve_theme(filename): """Serve theme files (CSS, JS)""" # Validate filename to prevent directory traversal if not _is_safe_path(filename) or '..' in filename: logger.warning(f"Unsafe theme file requested: {filename}") abort(404) return send_from_directory('themes', filename) @app.route('/logo.png') def serve_logo(): """Serve configurable logo""" logo_path = app.config['LOGO_PATH'] # If it's just a filename, serve from current directory if '/' not in logo_path: return send_from_directory('.', logo_path) else: # If it's a full path, split directory and filename directory = os.path.dirname(logo_path) filename = os.path.basename(logo_path) return send_from_directory(directory, filename) @app.route('/static/') def serve_static(filename): """Serve static files (avatars, etc.)""" # Validate filename to prevent directory traversal if not _is_safe_path(filename) or '..' in filename: logger.warning(f"Unsafe static file requested: {filename}") abort(404) return send_from_directory('static', filename) # ============================================================ # AUTHENTICATION ROUTES # ============================================================ @app.route('/login', methods=['GET', 'POST']) def login(): """Login page""" if current_user.is_authenticated: return redirect(url_for('index')) # Check if Auth0 is configured auth0_configured = bool(app.config.get('AUTH0_DOMAIN') and app.config.get('AUTH0_CLIENT_ID')) if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') remember = request.form.get('remember', False) == 'on' if not user_service: flash('User service not available', 'error') return render_template('login.html', auth0_configured=auth0_configured) user = user_service.authenticate(username, password) if user: login_user(user, remember=remember) flash(f'Welcome back, {user.username}!', 'success') # Redirect to next page or home next_page = request.args.get('next') return redirect(next_page) if next_page else redirect(url_for('index')) else: flash('Invalid username or password', 'error') return render_template('login.html', auth0_configured=auth0_configured) @app.route('/password-reset-request', methods=['GET', 'POST']) def password_reset_request(): """Request a password reset""" if current_user.is_authenticated: return redirect(url_for('index')) if request.method == 'POST': email = request.form.get('email', '').strip().lower() if not email: flash('Please enter your email address', 'error') return render_template('password_reset_request.html') # Find user by email user = User.query.filter_by(email=email).first() # Always show success message for security (don't reveal if email exists) flash('If an account exists with that email, a password reset link has been sent.', 'success') if user and user.password_hash: # Only send reset if user has a password (not OAuth only) # Generate reset token token = user.generate_reset_token() # Build reset URL reset_url = url_for('password_reset', token=token, _external=True) # Log the reset URL (in production, this would be emailed) logger.info(f"Password reset requested for {email}. Reset URL: {reset_url}") # For now, also flash it for development (remove in production) flash(f'Reset link (development only): {reset_url}', 'info') return redirect(url_for('login')) return render_template('password_reset_request.html') @app.route('/password-reset/', methods=['GET', 'POST']) def password_reset(token): """Reset password with token""" if current_user.is_authenticated: return redirect(url_for('index')) # Find user by token user = User.query.filter_by(reset_token=token).first() if not user or not user.verify_reset_token(token): flash('Invalid or expired reset token', 'error') return redirect(url_for('login')) if request.method == 'POST': password = request.form.get('password', '') confirm_password = request.form.get('confirm_password', '') if not password or len(password) < 6: flash('Password must be at least 6 characters', 'error') return render_template('password_reset.html') if password != confirm_password: flash('Passwords do not match', 'error') return render_template('password_reset.html') # Set new password user.set_password(password) user.clear_reset_token() flash('Your password has been reset successfully. You can now log in.', 'success') return redirect(url_for('login')) return render_template('password_reset.html') # Auth0 Routes @app.route('/auth0/login') def auth0_login(): """Redirect to Auth0 for authentication""" # Check if Auth0 is configured if not app.config.get('AUTH0_DOMAIN') or not app.config.get('AUTH0_CLIENT_ID'): flash('Auth0 authentication is not configured. Please use email/password login or contact the administrator.', 'error') return redirect(url_for('login')) try: redirect_uri = url_for('auth0_callback', _external=True) return auth0.authorize_redirect(redirect_uri) except Exception as e: logger.error(f"Auth0 login error: {e}") flash('Auth0 authentication failed. Please use email/password login.', 'error') return redirect(url_for('login')) @app.route('/auth0/callback') def auth0_callback(): """Handle Auth0 callback and create/login user""" try: # Get the access token from Auth0 token = auth0.authorize_access_token() # Get user info from Auth0 user_info = token.get('userinfo') if not user_info: user_info = auth0.parse_id_token(token) # Extract user details auth0_id = user_info.get('sub') email = user_info.get('email') username = user_info.get('nickname') or user_info.get('preferred_username') or email.split('@')[0] if not auth0_id or not email: flash('Unable to get user information from Auth0', 'error') return redirect(url_for('login')) # Check if user exists with this Auth0 ID user = user_service.get_user_by_auth0_id(auth0_id) if not user: # Check if user exists with this email (for account linking) existing_user = user_service.get_user_by_email(email) if existing_user: # Link existing account to Auth0 user_service.link_auth0_account(existing_user.id, auth0_id) user = existing_user flash(f'Account linked successfully! Welcome back, {user.username}!', 'success') else: # Create new user # Generate unique username if needed base_username = username[:MAX_USERNAME_LENGTH-3] # Leave room for suffix unique_username = base_username counter = 1 while user_service.username_exists(unique_username): unique_username = f"{base_username}_{counter}" counter += 1 user_id = user_service.create_user( username=unique_username, email=email, password=None, # No password for OAuth users is_admin=False, auth0_id=auth0_id ) if user_id: user = user_service.get_user_by_id(user_id) flash(f'Account created successfully! Welcome, {user.username}!', 'success') else: flash('Failed to create user account', 'error') return redirect(url_for('login')) else: flash(f'Welcome back, {user.username}!', 'success') # Log in the user if user: login_user(user, remember=True) # Store Auth0 info in session for future use session['auth0_user_info'] = user_info # Redirect to next page or home next_page = request.args.get('next') return redirect(next_page) if next_page else redirect(url_for('index')) except Exception as e: logger.error(f"Auth0 callback error: {e}") flash('Authentication failed. Please try again.', 'error') return redirect(url_for('login')) @app.route('/auth0/logout') @login_required def auth0_logout(): """Logout from Auth0 and local session""" # Clear session session.clear() logout_user() # Build Auth0 logout URL domain = app.config['AUTH0_DOMAIN'] client_id = app.config['AUTH0_CLIENT_ID'] return_to = url_for('index', _external=True) logout_url = f'https://{domain}/v2/logout?' + urlencode({ 'returnTo': return_to, 'client_id': client_id }, quote_via=quote_plus) return redirect(logout_url) @app.route('/admin-setup', methods=['GET', 'POST']) def admin_setup(): """Create first admin user""" # Check if users already exist try: user_count = User.query.count() if user_count > 0: flash('Admin user already exists.', 'info') return redirect(url_for('login')) except Exception as e: logger.warning(f"Database error checking existing users: {e}") pass if request.method == 'POST': username = request.form.get('username') email = request.form.get('email') password = request.form.get('password') password_confirm = request.form.get('password_confirm') # Validation if not username or not email or not password: flash('All fields are required', 'error') return render_template('admin_setup.html') if password != password_confirm: flash('Passwords do not match', 'error') return render_template('admin_setup.html') if len(password) < MIN_PASSWORD_LENGTH: flash(f'Password must be at least {MIN_PASSWORD_LENGTH} characters', 'error') return render_template('admin_setup.html') # Create admin user user_id = user_service.create_user(username, email, password, is_admin=True) if user_id: flash('Admin account created successfully! Please log in.', 'success') return redirect(url_for('login')) else: flash('Error creating admin account. Please try again.', 'error') return render_template('admin_setup.html') @app.route('/signup', methods=['GET', 'POST']) def signup(): """Signup page""" if current_user.is_authenticated: return redirect(url_for('index')) if request.method == 'POST': username = request.form.get('username') email = request.form.get('email') password = request.form.get('password') password_confirm = request.form.get('password_confirm') if not user_service: flash('User service not available', 'error') return render_template('signup.html') # Validation if not username or not email or not password: flash('All fields are required', 'error') return render_template('signup.html') if password != password_confirm: flash('Passwords do not match', 'error') return render_template('signup.html') if len(password) < MIN_PASSWORD_LENGTH: flash(f'Password must be at least {MIN_PASSWORD_LENGTH} characters', 'error') return render_template('signup.html') if user_service.username_exists(username): flash('Username already taken', 'error') return render_template('signup.html') if user_service.email_exists(email): flash('Email already registered', 'error') return render_template('signup.html') # Create user user_id = user_service.create_user(username, email, password) if user_id: flash('Account created successfully! Please log in.', 'success') return redirect(url_for('login')) else: flash('Error creating account. Please try again.', 'error') return render_template('signup.html') @app.route('/logout') @login_required def logout(): """Logout current user""" logout_user() flash('You have been logged out.', 'info') return redirect(url_for('index')) @app.route('/settings') @login_required def settings(): """Main settings page""" # Load user settings try: user_settings = json.loads(current_user.settings) if current_user.settings else {} except: user_settings = {} # Load available filter sets try: with open('filtersets.json', 'r') as f: filter_sets = json.load(f) except: filter_sets = {} return render_template('settings.html', user=current_user, user_settings=user_settings, filter_sets=filter_sets) @app.route('/settings/profile', methods=['GET', 'POST']) @login_required def settings_profile(): """Profile settings page""" if request.method == 'POST': username = request.form.get('username') email = request.form.get('email') default_avatar = request.form.get('default_avatar') # Validation if not username or not email: flash('Username and email are required', 'error') return render_template('settings_profile.html', user=current_user) # Check if username is taken by another user if username != current_user.username and user_service.username_exists(username): flash('Username already taken', 'error') return render_template('settings_profile.html', user=current_user) # Check if email is taken by another user if email != current_user.email and user_service.email_exists(email): flash('Email already registered', 'error') return render_template('settings_profile.html', user=current_user) # Update user current_user.username = username current_user.email = email # Handle default avatar selection if default_avatar and default_avatar.startswith('default_'): current_user.profile_picture_url = f"/static/default-avatars/{default_avatar}.png" db.session.commit() flash('Profile updated successfully', 'success') return redirect(url_for('settings')) # Available default avatars default_avatars = [ {'id': 'default_1', 'name': 'Gradient Blue', 'bg': 'linear-gradient(135deg, #667eea 0%, #764ba2 100%)'}, {'id': 'default_2', 'name': 'Gradient Green', 'bg': 'linear-gradient(135deg, #4facfe 0%, #00f2fe 100%)'}, {'id': 'default_3', 'name': 'Gradient Orange', 'bg': 'linear-gradient(135deg, #fa709a 0%, #fee140 100%)'}, {'id': 'default_4', 'name': 'Gradient Purple', 'bg': 'linear-gradient(135deg, #a8edea 0%, #fed6e3 100%)'}, {'id': 'default_5', 'name': 'Brand Colors', 'bg': 'linear-gradient(135deg, #4db6ac 0%, #26a69a 100%)'}, {'id': 'default_6', 'name': 'Sunset', 'bg': 'linear-gradient(135deg, #ff7e5f 0%, #feb47b 100%)'}, ] return render_template('settings_profile.html', user=current_user, default_avatars=default_avatars) @app.route('/settings/communities', methods=['GET', 'POST']) @login_required def settings_communities(): """Community/source selection settings""" if request.method == 'POST': # Get selected communities selected_communities = request.form.getlist('communities') # Load current settings try: user_settings = json.loads(current_user.settings) if current_user.settings else {} except: user_settings = {} # Update communities user_settings['communities'] = selected_communities # Save settings current_user.settings = json.dumps(user_settings) db.session.commit() flash('Community preferences updated', 'success') return redirect(url_for('settings')) # Load current settings try: user_settings = json.loads(current_user.settings) if current_user.settings else {} selected_communities = user_settings.get('communities', []) except: selected_communities = [] # Get available communities from platform config and collection targets available_communities = [] # Load platform configuration with error handling try: platform_config = load_platform_config() if not platform_config: platform_config = {"platforms": {}, "collection_targets": []} except Exception as e: logger.error(f"Error loading platform config: {e}") platform_config = {"platforms": {}, "collection_targets": []} # Get enabled communities from collection_targets (what's actually being crawled) enabled_communities = set() try: for target in platform_config.get('collection_targets', []): if 'platform' in target and 'community' in target: enabled_communities.add((target['platform'], target['community'])) except Exception as e: logger.error(f"Error processing collection_targets: {e}") # Build community list from platform config for communities that are enabled try: for platform_name, platform_info in platform_config.get('platforms', {}).items(): if not isinstance(platform_info, dict): continue communities = platform_info.get('communities', []) if not isinstance(communities, list): continue for community_info in communities: try: if not isinstance(community_info, dict): continue # Only include communities that are in collection_targets if (platform_name, community_info['id']) in enabled_communities: available_communities.append({ 'id': community_info['id'], 'name': community_info['name'], 'display_name': community_info.get('display_name', community_info['name']), 'platform': platform_name, 'icon': community_info.get('icon', platform_info.get('icon', '📄')), 'description': community_info.get('description', '') }) except Exception as e: logger.error(f"Error processing community {community_info}: {e}") continue except Exception as e: logger.error(f"Error building community list: {e}") logger.info(f"Found {len(available_communities)} available communities") return render_template('settings_communities.html', user=current_user, available_communities=available_communities, selected_communities=selected_communities) @app.route('/settings/filters', methods=['GET', 'POST']) @login_required def settings_filters(): """Filter settings page""" if request.method == 'POST': selected_filter = request.form.get('filter_set', 'no_filter') # Load and validate current settings user_settings = _validate_user_settings(current_user.settings) # Validate new filter setting if _is_safe_filterset(selected_filter): user_settings['filter_set'] = selected_filter else: flash('Invalid filter selection', 'error') return redirect(url_for('settings')) # Save validated settings try: current_user.settings = json.dumps(user_settings) db.session.commit() flash('Filter settings updated successfully', 'success') except Exception as e: db.session.rollback() logger.error(f"Error saving filter settings for user {current_user.id}: {e}") flash('Error saving settings', 'error') return redirect(url_for('settings')) # Load current settings try: user_settings = json.loads(current_user.settings) if current_user.settings else {} except: user_settings = {} current_filter = user_settings.get('filter_set', 'no_filter') # Load available filter sets from FilterEngine as a dictionary filter_sets = {} for filterset_name in filter_engine.get_available_filtersets(): filter_sets[filterset_name] = filter_engine.config.get_filterset(filterset_name) return render_template('settings_filters.html', user=current_user, filter_sets=filter_sets, current_filter=current_filter) @app.route('/settings/experience', methods=['GET', 'POST']) @login_required def settings_experience(): """Experience and behavioral settings page - opt-in addictive features""" if request.method == 'POST': # Load current settings try: user_settings = json.loads(current_user.settings) if current_user.settings else {} except: user_settings = {} # Get experience settings with defaults (all opt-in, so default to False) user_settings['experience'] = { 'infinite_scroll': request.form.get('infinite_scroll') == 'on', 'auto_refresh': request.form.get('auto_refresh') == 'on', 'push_notifications': request.form.get('push_notifications') == 'on', 'dark_patterns_opt_in': request.form.get('dark_patterns_opt_in') == 'on', 'time_filter_enabled': request.form.get('time_filter_enabled') == 'on', 'time_filter_days': int(request.form.get('time_filter_days', 7)) } # Save settings current_user.settings = json.dumps(user_settings) db.session.commit() flash('Experience settings updated successfully', 'success') return redirect(url_for('settings')) # Load current settings try: user_settings = json.loads(current_user.settings) if current_user.settings else {} except: user_settings = {} experience_settings = user_settings.get('experience', { 'infinite_scroll': False, 'auto_refresh': False, 'push_notifications': False, 'dark_patterns_opt_in': False, 'time_filter_enabled': False, 'time_filter_days': 7 }) return render_template('settings_experience.html', user=current_user, experience_settings=experience_settings) @app.route('/upload-avatar', methods=['POST']) @login_required def upload_avatar(): """Upload profile picture""" try: # Debug logging logger.info(f"Avatar upload attempt by user {current_user.id} ({current_user.username})") logger.debug(f"Request files: {list(request.files.keys())}") logger.debug(f"Request form: {dict(request.form)}") # Check if user is properly authenticated and has required attributes if not hasattr(current_user, 'id') or not current_user.id: logger.error("User missing ID attribute") flash('Authentication error. Please log in again.', 'error') return redirect(url_for('login')) if not hasattr(current_user, 'username') or not current_user.username: logger.error("User missing username attribute") flash('User profile incomplete. Please update your profile.', 'error') return redirect(url_for('settings_profile')) # Check for file in request if 'avatar' not in request.files: logger.warning("No avatar file in request") flash('No file selected', 'error') return redirect(url_for('settings_profile')) file = request.files['avatar'] if file.filename == '': logger.warning("Empty filename provided") flash('No file selected', 'error') return redirect(url_for('settings_profile')) logger.info(f"Processing file: {file.filename}") # Validate file type and size if not _is_allowed_file(file.filename): logger.warning(f"Invalid file type: {file.filename}") flash('Invalid file type. Please upload PNG, JPG, or GIF', 'error') return redirect(url_for('settings_profile')) # Check file size (Flask's MAX_CONTENT_LENGTH handles this too, but double-check) if hasattr(file, 'content_length') and file.content_length > app.config.get('MAX_CONTENT_LENGTH', 16*1024*1024): logger.warning(f"File too large: {file.content_length}") flash('File too large. Maximum size is 16MB', 'error') return redirect(url_for('settings_profile')) # Validate and secure filename filename = secure_filename(file.filename) if not filename or len(filename) > MAX_FILENAME_LENGTH: logger.warning(f"Invalid filename after sanitization: {filename}") flash('Invalid filename', 'error') return redirect(url_for('settings_profile')) # Add user ID to make filename unique and prevent conflicts unique_filename = f"{current_user.id}_{filename}" logger.info(f"Generated unique filename: {unique_filename}") # Ensure upload directory exists and is secure upload_dir = os.path.abspath(UPLOAD_FOLDER) os.makedirs(upload_dir, exist_ok=True) upload_path = os.path.join(upload_dir, unique_filename) # Final security check - ensure path is within upload directory if not os.path.abspath(upload_path).startswith(upload_dir): logger.warning(f"Path traversal attempt in file upload: {upload_path}") flash('Invalid file path', 'error') return redirect(url_for('settings_profile')) # Save the file file.save(upload_path) logger.info(f"File saved successfully: {upload_path}") # Update user profile with database error handling old_avatar_url = current_user.profile_picture_url current_user.profile_picture_url = f"/static/avatars/{unique_filename}" db.session.commit() logger.info(f"User profile updated successfully for {current_user.username}") # Clean up old avatar file if it exists and was uploaded by user if old_avatar_url and old_avatar_url.startswith('/static/avatars/') and current_user.id in old_avatar_url: try: old_file_path = os.path.join(upload_dir, os.path.basename(old_avatar_url)) if os.path.exists(old_file_path): os.remove(old_file_path) logger.info(f"Cleaned up old avatar: {old_file_path}") except Exception as e: logger.warning(f"Could not clean up old avatar: {e}") flash('Profile picture updated successfully', 'success') return redirect(url_for('settings_profile')) except Exception as e: logger.error(f"Unexpected error in avatar upload: {e}") db.session.rollback() flash('An unexpected error occurred. Please try again.', 'error') return redirect(url_for('settings_profile')) @app.route('/profile') @login_required def profile(): """User profile page""" return render_template('profile.html', user=current_user) # ============================================================ # ADMIN ROUTES # ============================================================ @app.route('/admin') @login_required def admin_panel(): """Admin panel - user management""" if not current_user.is_admin: flash('Access denied. Admin privileges required.', 'error') return redirect(url_for('index')) if not user_service: flash('User service not available', 'error') return redirect(url_for('index')) users = user_service.get_all_users() return render_template('admin.html', users=users) @app.route('/admin/user//delete', methods=['POST']) @login_required def admin_delete_user(user_id): """Delete user (admin only)""" if not current_user.is_admin: flash('Access denied', 'error') return redirect(url_for('index')) # Prevent self-deletion if current_user.id == user_id: flash('You cannot delete your own account!', 'error') return redirect(url_for('admin_panel')) user = user_service.get_user_by_id(user_id) if user: username = user.username if user_service.delete_user(user_id): flash(f'User {username} has been deleted.', 'success') logger.info(f"Admin {current_user.id} deleted user {username} ({user_id})") else: flash('Error deleting user', 'error') logger.error(f"Failed to delete user {user_id}") else: flash('User not found', 'error') return redirect(url_for('admin_panel')) @app.route('/admin/user//toggle-admin', methods=['POST']) @login_required def admin_toggle_admin(user_id): """Toggle user admin status""" if not current_user.is_admin: flash('Access denied', 'error') return redirect(url_for('index')) target_user = user_service.get_user_by_id(user_id) if target_user: new_status = not target_user.is_admin # Toggle admin status user_service.update_user_admin_status(user_id, new_status) flash('Admin status updated', 'success') else: flash('User not found', 'error') return redirect(url_for('admin_panel')) # This route is duplicate - removed in favor of the UUID-based route above # This route is duplicate - removed in favor of the UUID-based route above @app.route('/admin/regenerate_content', methods=['POST']) @login_required def admin_regenerate_content(): """Regenerate all HTML content""" if not current_user.is_admin: flash('Access denied', 'error') return redirect(url_for('admin_panel')) try: import subprocess import shlex # Secure subprocess execution with absolute paths and validation script_path = os.path.abspath('generate_html.py') if not os.path.exists(script_path): flash('Content generation script not found', 'error') return redirect(url_for('admin_panel')) # Use absolute python path and validate arguments python_exe = os.path.abspath(os.sys.executable) cmd = [python_exe, script_path, '--filterset', 'no_filter', '--theme', 'vanilla-js'] # Execute with timeout and security restrictions result = subprocess.run( cmd, capture_output=True, text=True, cwd=os.path.abspath('.'), timeout=300, # 5 minute timeout check=False ) if result.returncode == 0: flash('Content regenerated successfully', 'success') logger.info(f"Content regenerated by admin user {current_user.id}") # Invalidate cache since content was regenerated _invalidate_cache() else: flash('Error regenerating content', 'error') logger.error(f"Content regeneration failed: {result.stderr}") except subprocess.TimeoutExpired: flash('Content regeneration timed out', 'error') logger.error("Content regeneration timed out") except Exception as e: flash(f'Error regenerating content: {str(e)}', 'error') logger.error(f"Content regeneration error: {e}") return redirect(url_for('admin_panel')) @app.route('/admin/clear_cache', methods=['POST']) @login_required def admin_clear_cache(): """Clear application cache""" if not current_user.is_admin: flash('Access denied', 'error') return redirect(url_for('admin_panel')) try: # Clear any cache directories or temp files import shutil import os cache_dirs = ['cache', 'temp'] for cache_dir in cache_dirs: if os.path.exists(cache_dir): shutil.rmtree(cache_dir) # Clear application cache _invalidate_cache() flash('Cache cleared successfully', 'success') logger.info(f"Cache cleared by admin user {current_user.id}") except Exception as e: flash(f'Error clearing cache: {str(e)}', 'error') logger.error(f"Cache clearing error: {e}") return redirect(url_for('admin_panel')) @app.route('/admin/backup_data', methods=['POST']) @login_required def admin_backup_data(): """Create backup of application data""" if not current_user.is_admin: flash('Access denied', 'error') return redirect(url_for('admin_panel')) try: import shutil import os from datetime import datetime timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') backup_name = f'balanceboard_backup_{timestamp}' # Create backup directory backup_dir = f'backups/{backup_name}' os.makedirs(backup_dir, exist_ok=True) # Copy important directories dirs_to_backup = ['data', 'templates', 'themes', 'static'] for dir_name in dirs_to_backup: if os.path.exists(dir_name): shutil.copytree(dir_name, f'{backup_dir}/{dir_name}') # Copy important files files_to_backup = ['app.py', 'models.py', 'database.py', 'filtersets.json'] for file_name in files_to_backup: if os.path.exists(file_name): shutil.copy2(file_name, backup_dir) flash(f'Backup created: {backup_name}', 'success') except Exception as e: flash(f'Error creating backup: {str(e)}', 'error') return redirect(url_for('admin_panel')) # ============================================================ # POLLING MANAGEMENT ROUTES # ============================================================ @app.route('/admin/polling') @login_required def admin_polling(): """Admin polling management page""" if not current_user.is_admin: flash('Access denied. Admin privileges required.', 'error') return redirect(url_for('index')) from models import PollSource, PollLog from polling_service import polling_service # Get all poll sources with recent logs sources = PollSource.query.order_by(PollSource.platform, PollSource.display_name).all() # Get scheduler status scheduler_status = polling_service.get_status() # Load platform config for available sources platform_config = load_platform_config() return render_template('admin_polling.html', sources=sources, scheduler_status=scheduler_status, platform_config=platform_config) @app.route('/admin/polling/add', methods=['POST']) @login_required def admin_polling_add(): """Add a new poll source""" if not current_user.is_admin: flash('Access denied', 'error') return redirect(url_for('index')) from models import PollSource platform = request.form.get('platform') source_id = request.form.get('source_id') custom_source_id = request.form.get('custom_source_id') display_name = request.form.get('display_name') poll_interval = int(request.form.get('poll_interval', 60)) max_posts = int(request.form.get('max_posts', 100)) fetch_comments = request.form.get('fetch_comments', 'true') == 'true' priority = request.form.get('priority', 'medium') # Use custom source if provided, otherwise use dropdown if custom_source_id and custom_source_id.strip(): source_id = custom_source_id.strip() if not platform or not source_id or not display_name: flash('Missing required fields', 'error') return redirect(url_for('admin_polling')) # Check if source already exists existing = PollSource.query.filter_by(platform=platform, source_id=source_id).first() if existing: flash(f'Source {platform}:{source_id} already exists', 'warning') return redirect(url_for('admin_polling')) # Create new source (disabled by default) source = PollSource( platform=platform, source_id=source_id, display_name=display_name, poll_interval_minutes=poll_interval, max_posts=max_posts, fetch_comments=fetch_comments, priority=priority, enabled=False, created_by=current_user.id ) db.session.add(source) db.session.commit() flash(f'Added polling source: {display_name}', 'success') logger.info(f"Admin {current_user.id} added poll source {platform}:{source_id}") return redirect(url_for('admin_polling')) @app.route('/admin/polling//toggle', methods=['POST']) @login_required def admin_polling_toggle(source_id): """Toggle a poll source on/off""" if not current_user.is_admin: flash('Access denied', 'error') return redirect(url_for('index')) from models import PollSource source = PollSource.query.get(source_id) if not source: flash('Source not found', 'error') return redirect(url_for('admin_polling')) source.enabled = not source.enabled db.session.commit() status = 'enabled' if source.enabled else 'disabled' flash(f'Polling {status} for {source.display_name}', 'success') return redirect(url_for('admin_polling')) @app.route('/admin/polling//update', methods=['POST']) @login_required def admin_polling_update(source_id): """Update poll source configuration""" if not current_user.is_admin: flash('Access denied', 'error') return redirect(url_for('index')) from models import PollSource source = PollSource.query.get(source_id) if not source: flash('Source not found', 'error') return redirect(url_for('admin_polling')) # Update all configurable fields if request.form.get('poll_interval'): source.poll_interval_minutes = int(request.form.get('poll_interval')) if request.form.get('max_posts'): source.max_posts = int(request.form.get('max_posts')) if request.form.get('fetch_comments') is not None: source.fetch_comments = request.form.get('fetch_comments') == 'true' if request.form.get('priority'): source.priority = request.form.get('priority') if request.form.get('display_name'): source.display_name = request.form.get('display_name') db.session.commit() flash(f'Updated settings for {source.display_name}', 'success') return redirect(url_for('admin_polling')) @app.route('/admin/polling//poll-now', methods=['POST']) @login_required def admin_polling_poll_now(source_id): """Manually trigger polling for a source""" if not current_user.is_admin: flash('Access denied', 'error') return redirect(url_for('index')) from models import PollSource from polling_service import polling_service source = PollSource.query.get(source_id) if not source: flash('Source not found', 'error') return redirect(url_for('admin_polling')) # Trigger polling in background try: polling_service.poll_now(source_id) flash(f'Polling started for {source.display_name}', 'success') except Exception as e: flash(f'Error starting poll: {str(e)}', 'error') logger.error(f"Error triggering poll for {source_id}: {e}") return redirect(url_for('admin_polling')) @app.route('/admin/polling//delete', methods=['POST']) @login_required def admin_polling_delete(source_id): """Delete a poll source""" if not current_user.is_admin: flash('Access denied', 'error') return redirect(url_for('index')) from models import PollSource source = PollSource.query.get(source_id) if not source: flash('Source not found', 'error') return redirect(url_for('admin_polling')) display_name = source.display_name db.session.delete(source) db.session.commit() flash(f'Deleted polling source: {display_name}', 'success') logger.info(f"Admin {current_user.id} deleted poll source {source_id}") return redirect(url_for('admin_polling')) @app.route('/admin/polling//logs') @login_required def admin_polling_logs(source_id): """View logs for a specific poll source""" if not current_user.is_admin: flash('Access denied', 'error') return redirect(url_for('index')) from models import PollSource, PollLog source = PollSource.query.get(source_id) if not source: flash('Source not found', 'error') return redirect(url_for('admin_polling')) # Get recent logs (limit to 50) logs = source.logs.limit(50).all() return render_template('admin_polling_logs.html', source=source, logs=logs) # ============================================================ # TEMPLATE CONTEXT PROCESSORS # ============================================================ @app.context_processor def inject_app_config(): """Inject app configuration into all templates""" return { 'APP_NAME': app.config['APP_NAME'] } # ============================================================ # ERROR HANDLERS # ============================================================ @app.errorhandler(404) def not_found(e): """404 page""" return render_template('404.html'), 404 @app.errorhandler(500) def server_error(e): """500 page""" return render_template('500.html'), 500 # ============================================================ # INITIALIZATION # ============================================================ if __name__ == '__main__': print("✓ BalanceBoard starting...") print("✓ Database: PostgreSQL with SQLAlchemy") print("✓ Password hashing: bcrypt") print("✓ Authentication: Flask-Login") app.run(host='0.0.0.0', port=DEFAULT_PORT, debug=True)