Files
balanceboard/app.py
chelsea 83dd85ffa3 Add authentication improvements and search functionality
- Implement anonymous access control with ALLOW_ANONYMOUS_ACCESS env var
- Add complete password reset workflow with token-based validation
- Add username recovery functionality for better UX
- Implement full-text search API with relevance scoring and highlighting
- Add Docker compatibility improvements with permission handling and fallback storage
- Add quick stats API for real-time dashboard updates
- Improve security with proper token expiration and input validation
- Add search result pagination and navigation
- Enhance error handling and logging throughout the application

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-11 21:11:03 -05:00

1848 lines
64 KiB
Python

"""
BalanceBoard Web Application
Flask server with user authentication and content serving.
"""
import os
import re
import logging
import time
import datetime
from pathlib import Path
from werkzeug.utils import secure_filename
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, abort, session, jsonify
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from dotenv import load_dotenv
from functools import lru_cache
from collections import defaultdict
from authlib.integrations.flask_client import OAuth
from urllib.parse import quote_plus, urlencode
from database import init_db, db
from models import User, bcrypt
from user_service import UserService
import json
# Load environment variables
# Called before any of the os.getenv() lookups further down in this module.
load_dotenv()
# Configure logging
# Root logging is set to INFO and every record is sent to two handlers:
# a file handler writing to 'app.log' (relative path) and a stream handler.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler()
]
)
# Module-level logger used by the helpers and routes in this file.
logger = logging.getLogger(__name__)
# Initialize Flask app
# Static assets are served from the 'themes' folder and templates from 'templates'.
app = Flask(__name__,
static_folder='themes',
template_folder='templates')
# NOTE(review): when SECRET_KEY is not set in the environment the hard-coded
# development value below is used -- confirm production deployments always set it.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size
# Anonymous access is enabled unless ALLOW_ANONYMOUS_ACCESS is set to something
# other than 'true' (compared case-insensitively); the default is 'true'.
app.config['ALLOW_ANONYMOUS_ACCESS'] = os.getenv('ALLOW_ANONYMOUS_ACCESS', 'true').lower() == 'true'
# Auth0 Configuration
# Each value falls back to an empty string when the variable is not set.
app.config['AUTH0_DOMAIN'] = os.getenv('AUTH0_DOMAIN', '')
app.config['AUTH0_CLIENT_ID'] = os.getenv('AUTH0_CLIENT_ID', '')
app.config['AUTH0_CLIENT_SECRET'] = os.getenv('AUTH0_CLIENT_SECRET', '')
app.config['AUTH0_AUDIENCE'] = os.getenv('AUTH0_AUDIENCE', '')
# Configuration constants
# Filterset names accepted by _is_safe_filterset().
ALLOWED_FILTERSETS = {'no_filter', 'safe_content'}
# Lower-case file extensions accepted by _is_allowed_file().
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
# Presumably the avatar upload destination -- not referenced in the visible part of this file.
UPLOAD_FOLDER = 'static/avatars'
MAX_FILENAME_LENGTH = 100
DEFAULT_PORT = 5021
# Default `per_page` for /api/posts.
DEFAULT_PAGE_SIZE = 20
# Minimum password length enforced by the signup and admin-setup forms.
MIN_PASSWORD_LENGTH = 8
# Used when deriving a username for accounts created through Auth0.
MAX_USERNAME_LENGTH = 80
MAX_EMAIL_LENGTH = 120
# Upper bound on community names accepted by _validate_user_settings().
MAX_COMMUNITY_NAME_LENGTH = 100
# Initialize database
init_db(app)
# Initialize bcrypt
bcrypt.init_app(app)
# Initialize Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
# Unauthenticated requests to @login_required views are sent to the 'login' endpoint
# with the message below.
login_manager.login_view = 'login'
login_manager.login_message = 'Please log in to access this page.'
# Initialize user service
user_service = UserService()
# Initialize polling service
# Imported here rather than at the top of the file; the service is bound to the app
# and started as a side effect of importing this module.
from polling_service import polling_service
polling_service.init_app(app)
polling_service.start()
# Initialize OAuth for Auth0
oauth = OAuth(app)
auth0 = oauth.register(
    'auth0',
    client_id=app.config['AUTH0_CLIENT_ID'],
    client_secret=app.config['AUTH0_CLIENT_SECRET'],
    # OpenID Connect discovery document. The well-known path is spelled with a
    # hyphen ("openid-configuration"); the underscore spelling does not exist,
    # so the provider metadata could never be fetched.
    server_metadata_url=f'https://{app.config["AUTH0_DOMAIN"]}/.well-known/openid-configuration',
    client_kwargs={
        'scope': 'openid profile email',
    },
)
# Cache for posts and comments - improves performance
# post_cache maps post UUID -> post dict; comment_cache maps post UUID -> list of
# comment dicts. Both are filled by _load_posts_cache().
post_cache = {}
comment_cache = defaultdict(list)
# time.time() of the last refresh; 0 forces the next _load_posts_cache() call to reload.
cache_timestamp = 0
CACHE_DURATION = 300 # 5 minutes
# Security helper functions
def _is_safe_filterset(filterset):
    """Return True only for a known, well-formed filterset name.

    Args:
        filterset: Candidate filterset name (for example taken from the URL
            or from stored user settings).

    Returns:
        bool: True when ``filterset`` is a non-empty string that is listed in
        ALLOWED_FILTERSETS and consists solely of ASCII letters, digits,
        ``_`` and ``-``; False otherwise.
    """
    if not filterset or not isinstance(filterset, str):
        return False
    # fullmatch (rather than match + "$") so a trailing newline can never be
    # accepted, and "is not None" so callers get a real bool, not a Match object.
    return (
        filterset in ALLOWED_FILTERSETS
        and re.fullmatch(r'[a-zA-Z0-9_-]+', filterset) is not None
    )
def _is_safe_path(path):
"""Validate file path for security"""
if not path or not isinstance(path, str):
return False
# Check for directory traversal attempts
if '..' in path or path.startswith('/') or '\\' in path:
return False
# Only allow alphanumeric, dots, hyphens, underscores, and forward slashes
return re.match(r'^[a-zA-Z0-9._/-]+$', path) is not None
def _is_allowed_file(filename):
    """Tell whether ``filename`` ends in one of the ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    Names without a dot are rejected.
    """
    if '.' not in filename:
        return False
    _, _, extension = filename.rpartition('.')
    return extension.lower() in ALLOWED_EXTENSIONS
def _iter_json_objects(directory, label):
    """Yield the decoded content of each ``*.json`` file in ``directory`` that holds a JSON object.

    Files that cannot be read, are not valid UTF-8/JSON, or whose top-level
    value is not an object are logged at debug level and skipped.
    ``label`` ("post" / "comment") is only used in those log messages.
    """
    if not directory.exists():
        return
    for json_file in directory.glob('*.json'):
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            logger.debug(f"Error reading {label} file {json_file}: {e}")
            continue
        if not isinstance(data, dict):
            # A list/str/number at top level has no .get(); skip it instead of
            # letting an AttributeError abort the whole refresh.
            logger.debug(f"Skipping {label} file {json_file}: top-level JSON value is not an object")
            continue
        yield data


def _load_posts_cache():
    """Return the post and comment caches, reloading them from disk when stale.

    The caches are reused while they are younger than CACHE_DURATION seconds
    and the post cache is non-empty. Otherwise both are cleared in place and
    rebuilt from ``data/posts/*.json`` and ``data/comments/*.json``.

    Returns:
        tuple: ``(post_cache, comment_cache)`` where ``post_cache`` maps a
        post's ``uuid`` to its dict and ``comment_cache`` maps a ``post_uuid``
        to the list of comment dicts that reference it.
    """
    global post_cache, comment_cache, cache_timestamp
    current_time = time.time()
    if current_time - cache_timestamp < CACHE_DURATION and post_cache:
        return post_cache, comment_cache
    # Clear existing cache (in place, so existing references stay valid)
    post_cache.clear()
    comment_cache.clear()
    # Load all posts; entries without a uuid cannot be addressed and are dropped.
    for post_data in _iter_json_objects(Path('data/posts'), 'post'):
        post_uuid = post_data.get('uuid')
        if post_uuid:
            post_cache[post_uuid] = post_data
    # Load all comments and group by post UUID
    for comment_data in _iter_json_objects(Path('data/comments'), 'comment'):
        post_uuid = comment_data.get('post_uuid')
        if post_uuid:
            comment_cache[post_uuid].append(comment_data)
    cache_timestamp = current_time
    logger.info(f"Cache refreshed: {len(post_cache)} posts, {len(comment_cache)} comment groups")
    return post_cache, comment_cache
def _invalidate_cache():
"""Invalidate the cache to force refresh"""
global cache_timestamp
cache_timestamp = 0
def _validate_user_settings(settings_str):
    """Parse a user-settings JSON string and keep only the recognised, valid fields.

    Args:
        settings_str: JSON text expected to encode an object. Empty/None is
            treated as "no settings".

    Returns:
        dict: A new dict containing at most:
            * ``filter_set`` - kept when it passes _is_safe_filterset();
            * ``communities`` - the list entries that are strings of at most
              MAX_COMMUNITY_NAME_LENGTH characters made of letters, digits,
              ``_`` and ``-``;
            * ``experience`` - the known boolean flags whose value is a bool.
        An empty dict is returned for empty, malformed, or non-object input.
    """
    if not settings_str:
        return {}
    try:
        settings = json.loads(settings_str)
    except (json.JSONDecodeError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON: {e}")
        return {}
    if not isinstance(settings, dict):
        logger.warning("User settings must be a JSON object")
        return {}

    validated = {}

    # Filter set validation
    filter_set = settings.get('filter_set')
    if isinstance(filter_set, str) and _is_safe_filterset(filter_set):
        validated['filter_set'] = filter_set

    # Communities validation
    communities = settings.get('communities')
    if isinstance(communities, list):
        # fullmatch rather than match + "$", which would accept "name\n".
        validated['communities'] = [
            community
            for community in communities
            if isinstance(community, str)
            and len(community) <= MAX_COMMUNITY_NAME_LENGTH
            and re.fullmatch(r'[a-zA-Z0-9_-]+', community)
        ]

    # Experience settings validation
    exp = settings.get('experience')
    if isinstance(exp, dict):
        bool_fields = ['infinite_scroll', 'auto_refresh', 'push_notifications', 'dark_patterns_opt_in']
        validated['experience'] = {
            field: exp[field]
            for field in bool_fields
            if isinstance(exp.get(field), bool)
        }

    return validated
# Add custom Jinja filters
@app.template_filter('nl2br')
def nl2br_filter(text):
    """Jinja filter ``nl2br``: put a ``<br>`` tag in front of every newline.

    Falsy input (None, empty string) is handed back unchanged.
    """
    # NOTE(review): the input is not HTML-escaped here -- confirm how templates
    # combine this filter with autoescaping / ``|safe``.
    return text.replace('\n', '<br>\n') if text else text
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: look the user up through the user service."""
    loaded_user = user_service.get_user_by_id(user_id)
    return loaded_user
# ============================================================
# STATIC CONTENT ROUTES
# ============================================================
@app.before_request
def check_first_user():
    """Before each request, send visitors to admin setup while no user exists.

    The check is skipped for static endpoints, for the login / signup /
    admin-setup / theme / logo endpoints, and for authenticated users.
    A failing user-count query is logged and otherwise ignored.
    """
    exempt_endpoints = ['login', 'signup', 'admin_setup', 'serve_theme', 'serve_logo']
    endpoint = request.endpoint
    if endpoint and (endpoint.startswith('static') or endpoint in exempt_endpoints):
        return None
    # Authenticated users imply that at least one account exists.
    if current_user.is_authenticated:
        return None
    try:
        if User.query.count() == 0:
            return redirect(url_for('admin_setup'))
    except Exception as e:
        # If database is not ready, skip check
        logger.warning(f"Database not ready for user count check: {e}")
    return None
@app.route('/')
def index():
    """Render the dashboard, or redirect to login when anonymous access is off.

    Authenticated users get their stored settings (an empty dict if the stored
    JSON is invalid); anonymous visitors get a fixed set of defaults.
    """
    if not current_user.is_authenticated:
        if not app.config.get('ALLOW_ANONYMOUS_ACCESS', False):
            # Redirect non-authenticated users to login
            return redirect(url_for('login'))
        # Anonymous access allowed - use default settings
        anonymous_settings = {
            'experience': {
                'infinite_scroll': False,
                'auto_refresh': False
            },
            'communities': []
        }
        return render_template('dashboard.html', user_settings=anonymous_settings)

    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (json.JSONDecodeError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
        user_settings = {}
    return render_template('dashboard.html', user_settings=user_settings)
@app.route('/feed/<filterset>')
def feed_content(filterset='no_filter'):
    """Send ``active_html/<filterset>/index.html`` for an allowed filterset.

    Responds with 404 when the filterset is not allowed or when the normalised
    path would leave the ``active_html/`` directory.
    """
    if not _is_safe_filterset(filterset):
        logger.warning(f"Invalid filterset requested: {filterset}")
        abort(404)
    feed_dir = f'active_html/{filterset}'
    # Second line of defence: the normalised path must stay below active_html/.
    normalized = os.path.normpath(f'{feed_dir}/index.html')
    if not normalized.startswith('active_html/'):
        logger.warning(f"Path traversal attempt detected: {filterset}")
        abort(404)
    return send_from_directory(feed_dir, 'index.html')
def load_platform_config():
    """Read ``platform_config.json`` from the working directory.

    Returns the decoded JSON, or an empty skeleton
    (``{"platforms": {}, "collection_targets": []}``) when the file is missing,
    unreadable or not valid JSON; the failure is logged as a warning.
    """
    try:
        with open('platform_config.json', 'r') as config_file:
            loaded = json.load(config_file)
    except (FileNotFoundError, json.JSONDecodeError, IOError) as e:
        logger.warning(f"Could not load platform config: {e}")
        return {"platforms": {}, "collection_targets": []}
    return loaded
def get_display_name_for_source(platform, source, platform_config):
    """Resolve the human-readable name for a post source.

    Resolution order:
      1. no usable config -> ``source`` itself;
      2. platform without community support -> the platform's ``name``
         (or the platform key when no name is configured);
      3. a configured community whose ``id`` equals ``source`` -> its
         ``display_name``;
      4. otherwise the platform ``prefix`` followed by ``source``, or the
         platform name when ``source`` is empty.
    """
    if not platform_config or 'platforms' not in platform_config:
        return source

    info = platform_config['platforms'].get(platform, {})
    if not info.get('supports_communities'):
        # For platforms without communities, use the platform name
        return info.get('name', platform)

    configured = next(
        (entry for entry in info.get('communities', []) if entry['id'] == source),
        None,
    )
    if configured is not None:
        return configured['display_name']

    # Fallback to prefix + source for Reddit-like platforms
    prefix = info.get('prefix', '')
    if source:
        return f"{prefix}{source}"
    return info.get('name', platform)
@app.route('/api/posts')
def api_posts():
    """Return one page of cached posts, newest first, optionally filtered.

    Query parameters:
        page: 1-based page number (default 1; invalid or < 1 values become 1).
        per_page: page size (default DEFAULT_PAGE_SIZE; invalid or < 1 values
            fall back to the default / 1, so the page math can never divide by zero).
        community: keep only posts whose ``source`` equals this (case-insensitive).
        platform: keep only posts whose ``platform`` equals this (case-insensitive).

    Returns:
        A JSON object with ``posts`` and ``pagination``. On an unexpected
        failure the same shape is returned with an ``error`` message and
        HTTP status 500.
    """
    try:
        platform_config = load_platform_config()
        # type=int makes malformed values fall back to the default instead of raising.
        page = max(1, request.args.get('page', 1, type=int))
        per_page = max(1, request.args.get('per_page', DEFAULT_PAGE_SIZE, type=int))
        community = request.args.get('community', '').lower()
        platform = request.args.get('platform', '').lower()

        # Use cached data for better performance
        cached_posts, cached_comments = _load_posts_cache()
        posts = []
        for post_uuid, post_data in cached_posts.items():
            # "or ''" so an explicit null in the JSON cannot break .lower()
            if community and (post_data.get('source') or '').lower() != community:
                continue
            if platform and (post_data.get('platform') or '').lower() != platform:
                continue

            content = post_data.get('content') or ''
            posts.append({
                'id': post_uuid,
                'title': post_data.get('title', 'Untitled'),
                'author': post_data.get('author', 'Unknown'),
                'platform': post_data.get('platform', 'unknown'),
                'score': post_data.get('score', 0),
                'timestamp': post_data.get('timestamp', 0),
                'url': f'/post/{post_uuid}',
                'comments_count': len(cached_comments.get(post_uuid, [])),
                'content_preview': f'{content[:200]}...' if content else '',
                'source': post_data.get('source', ''),
                'source_display': get_display_name_for_source(
                    post_data.get('platform', ''),
                    post_data.get('source', ''),
                    platform_config
                ),
                'tags': post_data.get('tags', []),
                'external_url': post_data.get('url', '')
            })

        # Sort by timestamp (newest first); a missing/null timestamp sorts as 0.
        posts.sort(key=lambda entry: entry['timestamp'] or 0, reverse=True)

        # Calculate pagination
        total_posts = len(posts)
        start_idx = (page - 1) * per_page
        total_pages = (total_posts + per_page - 1) // per_page
        return {
            'posts': posts[start_idx:start_idx + per_page],
            'pagination': {
                'current_page': page,
                'total_pages': total_pages,
                'total_posts': total_posts,
                'per_page': per_page,
                'has_next': page < total_pages,
                'has_prev': page > 1
            }
        }
    except Exception:
        # Log the details server-side; do not echo internals to the client.
        logger.exception("Error loading posts")
        return {
            'posts': [],
            'error': 'Failed to load posts',
            'pagination': {
                'current_page': 1,
                'total_pages': 0,
                'total_posts': 0,
                'per_page': DEFAULT_PAGE_SIZE,
                'has_next': False,
                'has_prev': False
            }
        }, 500
@app.route('/api/platforms')
def api_platforms():
    """Return the platform configuration plus the communities that actually have posts.

    Every ``data/posts/*.json`` file is read and counted per
    ``(platform, source)`` pair; each pair becomes one community entry,
    enriched from the platform configuration when a matching community is
    configured. Entries are ordered by post count, highest first.

    Returns:
        A JSON object with ``platforms``, ``communities`` and
        ``total_communities``. On an unexpected failure the same shape is
        returned empty, with an ``error`` message and HTTP status 500.
    """
    try:
        platform_config = load_platform_config()
        posts_dir = Path('data/posts')

        # Count posts per (platform, source). A tuple key keeps the two parts
        # separate even when a platform name itself contains ':'.
        source_counts = defaultdict(int)
        if posts_dir.exists():
            for post_file in posts_dir.glob('*.json'):
                try:
                    with open(post_file, 'r', encoding='utf-8') as f:
                        post_data = json.load(f)
                except (ValueError, OSError) as e:
                    logger.debug(f"Error reading post file {post_file}: {e}")
                    continue
                if not isinstance(post_data, dict):
                    continue
                key = (post_data.get('platform', 'unknown'), post_data.get('source', ''))
                source_counts[key] += 1

        # Build community list from actual data and platform config
        communities = []
        for (platform, source), count in source_counts.items():
            platform_info = platform_config.get('platforms', {}).get(platform, {})
            community_info = None
            if platform_info.get('supports_communities'):
                community_info = next(
                    (c for c in platform_info.get('communities', []) if c['id'] == source),
                    None,
                )
            if community_info:
                community_entry = {
                    'platform': platform,
                    'id': source,
                    'name': community_info['name'],
                    'display_name': community_info['display_name'],
                    'icon': community_info.get('icon', platform_info.get('icon', '📄')),
                    'count': count,
                    'description': community_info.get('description', '')
                }
            else:
                # Fallback for sources not in config
                display_name = get_display_name_for_source(platform, source, platform_config)
                community_entry = {
                    'platform': platform,
                    'id': source,
                    'name': source or platform,
                    'display_name': display_name,
                    'icon': platform_info.get('icon', '📄'),
                    'count': count,
                    'description': f"Posts from {display_name}"
                }
            communities.append(community_entry)

        # Sort communities by count (most posts first)
        communities.sort(key=lambda entry: entry['count'], reverse=True)
        return {
            'platforms': platform_config.get('platforms', {}),
            'communities': communities,
            'total_communities': len(communities)
        }
    except Exception:
        # Log the details server-side; do not echo internals to the client.
        logger.exception("Error loading platform configuration")
        return {
            'platforms': {},
            'communities': [],
            'total_communities': 0,
            'error': 'Failed to load platform configuration'
        }, 500
@app.route('/api/content-timestamp')
def api_content_timestamp():
    """Report the newest modification time among ``data/posts/*.json``.

    Responds with ``{"timestamp": <mtime>}``; the value is 0 when the
    directory is missing or holds no post files. Any failure is logged and
    answered with a 500 JSON error.
    """
    try:
        posts_dir = Path('data/posts')
        if posts_dir.exists():
            # Seed with 0 so an empty directory still yields a timestamp.
            newest = max([0, *(entry.stat().st_mtime for entry in posts_dir.glob('*.json'))])
        else:
            newest = 0
        return jsonify({'timestamp': newest})
    except Exception as e:
        logger.error(f"Error getting content timestamp: {e}")
        return jsonify({'error': 'Failed to get content timestamp'}), 500
@app.route('/api/stats')
def api_stats():
    """Return quick dashboard statistics.

    Responds with ``{"posts_today": <int>, "total_posts": <int>}`` where
    ``posts_today`` counts cached posts whose ``timestamp`` (seconds since the
    epoch) falls on the current UTC date. Posts with a missing or unusable
    timestamp are not counted as today's. Failures are logged and answered
    with a 500 JSON error.
    """
    # Imported locally: the module-level name ``datetime`` is the *module*, so
    # ``datetime.utcnow()`` raised AttributeError and this endpoint always failed.
    from datetime import datetime, timezone
    try:
        cached_posts, _cached_comments = _load_posts_cache()
        # Both "today" and each post date are taken in UTC so they are comparable.
        today = datetime.now(timezone.utc).date()
        posts_today = 0
        for post_data in cached_posts.values():
            try:
                post_date = datetime.fromtimestamp(
                    post_data.get('timestamp') or 0, tz=timezone.utc
                ).date()
            except (TypeError, ValueError, OverflowError, OSError):
                # Non-numeric or out-of-range timestamp: ignore this post.
                continue
            if post_date == today:
                posts_today += 1
        return jsonify({
            'posts_today': posts_today,
            'total_posts': len(cached_posts)
        })
    except Exception as e:
        logger.error(f"Error getting stats: {e}")
        return jsonify({'error': 'Failed to get stats'}), 500
@app.route('/api/search')
def api_search():
    """Search cached posts by case-insensitive substring match.

    Query parameters:
        q: search text (required; 400 when empty).
        page: 1-based page number (default 1; invalid or < 1 values become 1).
        per_page: page size (default DEFAULT_PAGE_SIZE; invalid or < 1 values
            fall back to the default / 1).

    A post matches when the query occurs in its title, content, author or
    tags. The score is the sum of the weights of the matching fields
    (title 3, author 2, content 1, tags 1). Results are ordered by score,
    then by timestamp, both descending. Each result is a copy of the post
    dict extended with ``id``, ``comment_count``, ``search_score`` and
    ``matched_fields``.
    """
    try:
        query = request.args.get('q', '').strip()
        # type=int makes malformed values fall back to the default instead of raising.
        page = max(1, request.args.get('page', 1, type=int))
        per_page = max(1, request.args.get('per_page', DEFAULT_PAGE_SIZE, type=int))
        if not query:
            return jsonify({
                'error': 'Search query is required'
            }), 400

        cached_posts, cached_comments = _load_posts_cache()
        query_lower = query.lower()
        # (field name reported in matched_fields, weight), in reporting order.
        field_weights = (('title', 3), ('content', 1), ('author', 2), ('tags', 1))

        search_results = []
        for post_uuid, post_data in cached_posts.items():
            tags = post_data.get('tags') or []
            if isinstance(tags, str):
                tags = [tags]
            searchable = {
                'title': str(post_data.get('title') or ''),
                # Stored posts carry their text under 'content'; 'content_preview'
                # is only kept as a fallback for data that already has it.
                'content': str(post_data.get('content') or post_data.get('content_preview') or ''),
                'author': str(post_data.get('author') or ''),
                'tags': ' '.join(str(tag) for tag in tags),
            }
            matched_fields = [
                name for name, _weight in field_weights
                if query_lower in searchable[name].lower()
            ]
            if not matched_fields:
                continue

            search_result = post_data.copy()
            search_result['id'] = post_uuid
            search_result['comment_count'] = len(cached_comments.get(post_uuid, []))
            search_result['search_score'] = sum(
                weight for name, weight in field_weights if name in matched_fields
            )
            search_result['matched_fields'] = matched_fields
            search_results.append(search_result)

        # Sort by search score (descending) and then by timestamp (descending)
        search_results.sort(
            key=lambda result: (-result['search_score'], -(result.get('timestamp') or 0))
        )

        # Apply pagination
        total = len(search_results)
        start_idx = (page - 1) * per_page
        end_idx = start_idx + per_page
        return jsonify({
            'query': query,
            'posts': search_results[start_idx:end_idx],
            'pagination': {
                'current_page': page,
                'per_page': per_page,
                'total_posts': total,
                'total_pages': (total + per_page - 1) // per_page,
                'has_next': end_idx < total,
                'has_prev': page > 1
            }
        })
    except Exception as e:
        logger.error(f"Error in search: {e}")
        return jsonify({'error': 'Failed to perform search'}), 500
@app.route('/post/<post_id>')
def post_detail(post_id):
    """Render the detail page for one cached post together with its comments.

    The post is looked up by ``post_id`` in the post cache; comments are
    ordered by ascending timestamp. The 404 template is rendered when the
    post is unknown or when loading fails unexpectedly.
    """
    try:
        platform_config = load_platform_config()
        cached_posts, cached_comments = _load_posts_cache()
        cached_post = cached_posts.get(post_id)
        if not cached_post:
            return render_template('404.html'), 404

        # Work on a copy: the cached dict is shared by every request and by
        # the other API endpoints, so it must not pick up view-only fields.
        post_data = dict(cached_post)
        post_data['source_display'] = get_display_name_for_source(
            post_data.get('platform', ''),
            post_data.get('source', ''),
            platform_config
        )

        # sorted() builds a new list instead of reordering the cached one in place.
        comments = sorted(
            cached_comments.get(post_id, []),
            key=lambda comment: comment.get('timestamp') or 0
        )

        # Load user settings if authenticated
        user_settings = {}
        if current_user.is_authenticated:
            try:
                user_settings = json.loads(current_user.settings) if current_user.settings else {}
            except (json.JSONDecodeError, TypeError) as e:
                logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
                user_settings = {}
        return render_template('post_detail.html', post=post_data, comments=comments, user_settings=user_settings)
    except Exception:
        logger.exception(f"Error loading post {post_id}")
        return render_template('404.html'), 404
@app.route('/themes/<path:filename>')
def serve_theme(filename):
    """Send a file from the ``themes`` directory, or 404 for an unsafe path."""
    if _is_safe_path(filename) and '..' not in filename:
        return send_from_directory('themes', filename)
    # Anything that fails validation is logged and treated as not found.
    logger.warning(f"Unsafe theme file requested: {filename}")
    abort(404)
@app.route('/logo.png')
def serve_logo():
    """Send ``logo.png`` from the current directory ('.')."""
    logo_response = send_from_directory('.', 'logo.png')
    return logo_response
@app.route('/static/<path:filename>')
def serve_static(filename):
    """Send a file from the ``static`` directory (avatars, etc.), or 404 for an unsafe path."""
    if _is_safe_path(filename) and '..' not in filename:
        return send_from_directory('static', filename)
    # Anything that fails validation is logged and treated as not found.
    logger.warning(f"Unsafe static file requested: {filename}")
    abort(404)
# ============================================================
# AUTHENTICATION ROUTES
# ============================================================
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate submitted credentials.

    Already-authenticated users are redirected to the index. After a
    successful login the user is sent to the ``next`` query parameter, but
    only when it is a local path; anything else falls back to the index so
    the parameter cannot be used to bounce users to another site.
    """
    # Local import: only quote_plus/urlencode are imported at module level.
    from urllib.parse import urlparse

    if current_user.is_authenticated:
        return redirect(url_for('index'))
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        remember = request.form.get('remember', False) == 'on'
        if not user_service:
            flash('User service not available', 'error')
            return render_template('login.html')
        user = user_service.authenticate(username, password)
        if user:
            login_user(user, remember=remember)
            flash(f'Welcome back, {user.username}!', 'success')
            # Redirect to next page or home
            next_page = request.args.get('next') or ''
            target = urlparse(next_page)
            # Accept only "/path" style targets: no scheme, no host, no
            # protocol-relative "//host", no backslash that browsers read as "/".
            is_local_target = (
                next_page.startswith('/')
                and not next_page.startswith('//')
                and '\\' not in next_page
                and not target.scheme
                and not target.netloc
            )
            return redirect(next_page if is_local_target else url_for('index'))
        else:
            flash('Invalid username or password', 'error')
    return render_template('login.html')
@app.route('/forgot_username', methods=['GET', 'POST'])
def forgot_username():
    """Username recovery form.

    GET renders the form. POST looks the submitted email up and flashes a
    message, then redirects to the login page; an empty email or an
    unexpected error re-renders the form with an error message.
    """
    if request.method != 'POST':
        return render_template('forgot_username.html')

    email = request.form.get('email', '').strip()
    if not email:
        flash('Email address is required', 'error')
        return render_template('forgot_username.html')

    try:
        user = User.query.filter_by(email=email).first()
        if user:
            # NOTE(review): this shows the username directly to whoever submitted
            # the email, which differs from the neutral message used for unknown
            # emails -- confirm this disclosure is intended.
            logger.info(f"Username requested for user {user.username} ({email})")
            flash(f'Your username is: {user.username}', 'success')
        else:
            # Don't reveal if email exists for security
            logger.info(f"Username requested for unknown email: {email}")
            flash('If this email address is registered, you will receive your username.', 'success')
        return redirect(url_for('login'))
    except Exception as e:
        logger.error(f"Error in username request: {e}")
        flash('An error occurred. Please try again.', 'error')
        return render_template('forgot_username.html')
@app.route('/forgot_password', methods=['GET', 'POST'])
def forgot_password():
    """Password-reset request form.

    GET renders the form. POST with a registered email stores a fresh reset
    token and its expiry (one hour from now, naive UTC ISO string) in the
    user's settings JSON under ``password_reset_token`` /
    ``password_reset_expires``. The same neutral message is flashed whether
    or not the email is registered, and the user is redirected to the login
    page. An empty email or an unexpected error re-renders the form.
    """
    import secrets
    from datetime import datetime, timedelta, timezone

    if request.method != 'POST':
        return render_template('forgot_password.html')

    email = request.form.get('email', '').strip()
    if not email:
        flash('Email address is required', 'error')
        return render_template('forgot_password.html')

    try:
        user = User.query.filter_by(email=email).first()
        if user:
            # 32 hex characters from the OS CSPRNG. The previous token was a hash
            # of the user id, email and timestamp, all of which can be guessed.
            token = secrets.token_hex(16)
            # Naive UTC, matching how the expiry is compared when the token is redeemed.
            expires_at = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=1)
            user_settings = json.loads(user.settings or '{}')
            user_settings['password_reset_token'] = token
            user_settings['password_reset_expires'] = expires_at.isoformat()
            user.settings = json.dumps(user_settings)
            db.session.commit()
            # Log the reset request for security
            logger.info(f"Password reset requested for user {user.username} ({email})")
            # Note: In production, you would send an actual email with the reset link here
            # Example: send_password_reset_email(user.email, token)
        else:
            logger.info(f"Password reset requested for unknown email: {email}")
        # Identical response in both cases so the form cannot be used to find
        # out which email addresses are registered.
        flash('If this email address is registered, you will receive reset instructions.', 'success')
        return redirect(url_for('login'))
    except Exception as e:
        # Leave the session usable after a failed commit.
        db.session.rollback()
        logger.error(f"Error in password reset request: {e}")
        flash('An error occurred. Please try again.', 'error')
        return render_template('forgot_password.html')
@app.route('/reset_password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    """Password reset page reached through a reset token.

    The token must be 32 lowercase hex characters, must equal the
    ``password_reset_token`` stored in a user's settings JSON, and its
    ``password_reset_expires`` timestamp must be present and in the future.
    GET renders the form; a valid POST sets the new password, removes the
    token from the settings and redirects to login. Any invalid or expired
    token redirects to the forgot-password page.
    """
    import hmac
    from datetime import datetime, timezone

    try:
        # Reject malformed tokens before they get anywhere near a query. The
        # token was previously used as a SQL regular expression, so a caller
        # could supply arbitrary patterns.
        if not re.fullmatch(r'[0-9a-f]{32}', token or ''):
            flash('Invalid or expired reset link.', 'error')
            return redirect(url_for('forgot_password'))

        # Plain substring lookup to find the candidate row; the exact
        # comparison against the stored token below is what authorises it.
        user = User.query.filter(User.settings.contains(token)).first()
        if not user:
            flash('Invalid or expired reset link.', 'error')
            return redirect(url_for('forgot_password'))

        user_settings = json.loads(user.settings or '{}')
        stored_token = user_settings.get('password_reset_token')
        expires_str = user_settings.get('password_reset_expires')
        if not isinstance(stored_token, str) or not hmac.compare_digest(stored_token, token):
            flash('Invalid or expired reset link.', 'error')
            return redirect(url_for('forgot_password'))

        # A token without an expiry is treated as invalid rather than everlasting.
        if not expires_str:
            flash('Invalid or expired reset link.', 'error')
            return redirect(url_for('forgot_password'))
        expires_time = datetime.fromisoformat(expires_str)
        if expires_time.tzinfo is not None:
            expires_time = expires_time.astimezone(timezone.utc).replace(tzinfo=None)
        if datetime.now(timezone.utc).replace(tzinfo=None) > expires_time:
            flash('Reset link has expired.', 'error')
            return redirect(url_for('forgot_password'))

        if request.method == 'POST':
            # Not stripped: login compares the password exactly as typed, so
            # stripping here would store a different password than the user chose.
            password = request.form.get('password', '')
            confirm_password = request.form.get('confirm_password', '')
            if not password or not confirm_password:
                flash('Both password fields are required.', 'error')
                return render_template('reset_password.html', token=token)
            if password != confirm_password:
                flash('Passwords do not match.', 'error')
                return render_template('reset_password.html', token=token)
            if len(password) < MIN_PASSWORD_LENGTH:
                flash(f'Password must be at least {MIN_PASSWORD_LENGTH} characters long.', 'error')
                return render_template('reset_password.html', token=token)

            user.set_password(password)
            # Clear the reset token so the link cannot be used twice.
            user_settings.pop('password_reset_token', None)
            user_settings.pop('password_reset_expires', None)
            user.settings = json.dumps(user_settings)
            db.session.commit()
            logger.info(f"Password reset completed for user {user.username}")
            flash('Your password has been successfully reset. Please log in.', 'success')
            return redirect(url_for('login'))

        return render_template('reset_password.html', token=token)
    except Exception as e:
        # Leave the session usable after a failed commit.
        db.session.rollback()
        logger.error(f"Error in password reset: {e}")
        flash('An error occurred. Please try again.', 'error')
        return redirect(url_for('forgot_password'))
# Auth0 Routes
@app.route('/auth0/login')
def auth0_login():
    """Start the Auth0 login flow, using the absolute callback URL as redirect target."""
    callback_url = url_for('auth0_callback', _external=True)
    return auth0.authorize_redirect(callback_url)
@app.route('/auth0/callback')
def auth0_callback():
    """Handle the Auth0 callback: find, link or create the local user, then log in.

    Lookup order: a user already tied to the Auth0 ``sub``; otherwise an
    existing user with the same email (which is then linked); otherwise a new
    user whose username is derived from the Auth0 profile and made unique
    within MAX_USERNAME_LENGTH. After login the user is sent to a local
    ``next`` path if one was supplied, else to the index. Any failure flashes
    an error and redirects to the login page.
    """
    # Local import: only quote_plus/urlencode are imported at module level.
    from urllib.parse import urlparse

    try:
        token = auth0.authorize_access_token()
        user_info = token.get('userinfo')
        if not user_info:
            user_info = auth0.parse_id_token(token)

        auth0_id = user_info.get('sub')
        email = user_info.get('email')
        # Checked before the email is used below; a profile without an email
        # used to fail on email.split() before this check could run.
        if not auth0_id or not email:
            flash('Unable to get user information from Auth0', 'error')
            return redirect(url_for('login'))
        username = user_info.get('nickname') or user_info.get('preferred_username') or email.split('@')[0]

        user = user_service.get_user_by_auth0_id(auth0_id)
        if user:
            flash(f'Welcome back, {user.username}!', 'success')
        else:
            # Check if user exists with this email (for account linking)
            existing_user = user_service.get_user_by_email(email)
            if existing_user:
                # NOTE(review): linking relies on the email alone; consider
                # requiring the provider's "email_verified" claim -- confirm
                # the claim is available in this tenant before enforcing it.
                user_service.link_auth0_account(existing_user.id, auth0_id)
                user = existing_user
                flash(f'Account linked successfully! Welcome back, {user.username}!', 'success')
            else:
                # Generate a unique username, trimming the base so that the
                # name plus "_<n>" never exceeds MAX_USERNAME_LENGTH.
                unique_username = username[:MAX_USERNAME_LENGTH]
                counter = 1
                while user_service.username_exists(unique_username):
                    suffix = f"_{counter}"
                    unique_username = f"{username[:MAX_USERNAME_LENGTH - len(suffix)]}{suffix}"
                    counter += 1
                user_id = user_service.create_user(
                    username=unique_username,
                    email=email,
                    password=None,  # No password for OAuth users
                    is_admin=False,
                    auth0_id=auth0_id
                )
                user = user_service.get_user_by_id(user_id) if user_id else None
                if not user:
                    flash('Failed to create user account', 'error')
                    return redirect(url_for('login'))
                flash(f'Account created successfully! Welcome, {user.username}!', 'success')

        login_user(user, remember=True)
        # Store Auth0 info in session for future use
        session['auth0_user_info'] = user_info

        # Redirect to next page or home. Only "/path" style targets are
        # accepted: no scheme, no host, no "//host", no backslash.
        next_page = request.args.get('next') or ''
        target = urlparse(next_page)
        is_local_target = (
            next_page.startswith('/')
            and not next_page.startswith('//')
            and '\\' not in next_page
            and not target.scheme
            and not target.netloc
        )
        return redirect(next_page if is_local_target else url_for('index'))
    except Exception as e:
        logger.error(f"Auth0 callback error: {e}")
        flash('Authentication failed. Please try again.', 'error')
        return redirect(url_for('login'))
@app.route('/auth0/logout')
@login_required
def auth0_logout():
    """End the local session, then redirect to the Auth0 logout endpoint.

    Auth0 is asked to return the browser to the absolute index URL.
    """
    # Local state first: wipe the session, then log the user out.
    session.clear()
    logout_user()

    logout_params = {
        'returnTo': url_for('index', _external=True),
        'client_id': app.config['AUTH0_CLIENT_ID']
    }
    query_string = urlencode(logout_params, quote_via=quote_plus)
    return redirect(f'https://{app.config["AUTH0_DOMAIN"]}/v2/logout?' + query_string)
@app.route('/admin-setup', methods=['GET', 'POST'])
def admin_setup():
    """Create the first admin user.

    Only available while no user exists: once any user is present the
    request is redirected to the login page. If the user count cannot be
    determined the request is refused with 503, because creating an admin
    account must not be allowed on the strength of a failed check.
    GET renders the form; a valid POST creates the admin and redirects to login.
    """
    try:
        user_count = User.query.count()
    except Exception as e:
        logger.error(f"Database error checking existing users: {e}")
        # Fail closed: do not expose admin creation when the check itself failed.
        abort(503)
    if user_count > 0:
        flash('Admin user already exists.', 'info')
        return redirect(url_for('login'))

    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        password = request.form.get('password')
        password_confirm = request.form.get('password_confirm')
        # Validation
        if not username or not email or not password:
            flash('All fields are required', 'error')
            return render_template('admin_setup.html')
        if password != password_confirm:
            flash('Passwords do not match', 'error')
            return render_template('admin_setup.html')
        if len(password) < MIN_PASSWORD_LENGTH:
            flash(f'Password must be at least {MIN_PASSWORD_LENGTH} characters', 'error')
            return render_template('admin_setup.html')
        # Create admin user
        user_id = user_service.create_user(username, email, password, is_admin=True)
        if user_id:
            flash('Admin account created successfully! Please log in.', 'success')
            return redirect(url_for('login'))
        flash('Error creating admin account. Please try again.', 'error')
    return render_template('admin_setup.html')
@app.route('/signup', methods=['GET', 'POST'])
def signup():
    """Registration form.

    Authenticated users are redirected to the index. GET renders the form.
    POST runs the checks below in order; the first failing one is flashed and
    the form is rendered again. On success the account is created and the
    user is redirected to the login page.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    if request.method != 'POST':
        return render_template('signup.html')

    form = request.form
    username = form.get('username')
    email = form.get('email')
    password = form.get('password')
    password_confirm = form.get('password_confirm')

    # First failing check wins; later checks are not evaluated.
    problem = None
    if not user_service:
        problem = 'User service not available'
    elif not username or not email or not password:
        problem = 'All fields are required'
    elif password != password_confirm:
        problem = 'Passwords do not match'
    elif len(password) < MIN_PASSWORD_LENGTH:
        problem = f'Password must be at least {MIN_PASSWORD_LENGTH} characters'
    elif user_service.username_exists(username):
        problem = 'Username already taken'
    elif user_service.email_exists(email):
        problem = 'Email already registered'
    if problem is not None:
        flash(problem, 'error')
        return render_template('signup.html')

    # Create user
    if user_service.create_user(username, email, password):
        flash('Account created successfully! Please log in.', 'success')
        return redirect(url_for('login'))
    flash('Error creating account. Please try again.', 'error')
    return render_template('signup.html')
@app.route('/logout')
@login_required
def logout():
    """End the current user's session and return to the index page."""
    logout_user()
    flash('You have been logged out.', 'info')
    home_url = url_for('index')
    return redirect(home_url)
@app.route('/settings')
@login_required
def settings():
    """Main settings page.

    Renders ``settings.html`` with the current user's stored settings and
    the filter sets read from ``filtersets.json``. Both lookups are
    best-effort: malformed or unreadable data is logged and replaced by an
    empty dict so the page still renders.
    """
    # The user's settings are stored as a JSON string; tolerate empty or
    # corrupt values instead of failing the whole page.
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (ValueError, TypeError) as e:
        logger.warning(f"Could not parse settings for user {current_user.id}: {e}")
        user_settings = {}

    # Load available filter sets (path is relative to the working directory).
    try:
        with open('filtersets.json', 'r', encoding='utf-8') as f:
            filter_sets = json.load(f)
    except (OSError, ValueError) as e:
        logger.warning(f"Could not load filtersets.json: {e}")
        filter_sets = {}

    return render_template('settings.html',
                           user=current_user,
                           user_settings=user_settings,
                           filter_sets=filter_sets)
@app.route('/settings/profile', methods=['GET', 'POST'])
@login_required
def settings_profile():
    """Profile settings page.

    GET renders the profile form together with the selectable default
    avatars. POST validates the submitted username/email, optionally
    applies one of the known default avatars, saves the changes and
    redirects to the main settings page. Validation failures re-render the
    form with the same context as a GET.
    """
    # Available default avatars. Defined up front so that validation-error
    # renders receive the same template context as a plain GET.
    default_avatars = [
        {'id': 'default_1', 'name': 'Gradient Blue', 'bg': 'linear-gradient(135deg, #667eea 0%, #764ba2 100%)'},
        {'id': 'default_2', 'name': 'Gradient Green', 'bg': 'linear-gradient(135deg, #4facfe 0%, #00f2fe 100%)'},
        {'id': 'default_3', 'name': 'Gradient Orange', 'bg': 'linear-gradient(135deg, #fa709a 0%, #fee140 100%)'},
        {'id': 'default_4', 'name': 'Gradient Purple', 'bg': 'linear-gradient(135deg, #a8edea 0%, #fed6e3 100%)'},
        {'id': 'default_5', 'name': 'Brand Colors', 'bg': 'linear-gradient(135deg, #4db6ac 0%, #26a69a 100%)'},
        {'id': 'default_6', 'name': 'Sunset', 'bg': 'linear-gradient(135deg, #ff7e5f 0%, #feb47b 100%)'},
    ]

    def render_form():
        return render_template('settings_profile.html',
                               user=current_user,
                               default_avatars=default_avatars)

    if request.method != 'POST':
        return render_form()

    username = request.form.get('username')
    email = request.form.get('email')
    default_avatar = request.form.get('default_avatar')

    # Validation
    if not username or not email:
        flash('Username and email are required', 'error')
        return render_form()

    username_changed = username != current_user.username
    email_changed = email != current_user.email

    # Uniqueness checks need the user service; only require it when a
    # value actually changed.
    if (username_changed or email_changed) and not user_service:
        flash('User service not available', 'error')
        return render_form()

    # Check if username is taken by another user
    if username_changed and user_service.username_exists(username):
        flash('Username already taken', 'error')
        return render_form()

    # Check if email is taken by another user
    if email_changed and user_service.email_exists(email):
        flash('Email already registered', 'error')
        return render_form()

    # Update user
    current_user.username = username
    current_user.email = email

    # Handle default avatar selection. The submitted id is interpolated
    # into a URL, so accept only ids from the known list rather than
    # anything that merely starts with "default_".
    known_avatar_ids = {avatar['id'] for avatar in default_avatars}
    if default_avatar in known_avatar_ids:
        current_user.profile_picture_url = f"/static/default-avatars/{default_avatar}.png"
    elif default_avatar:
        logger.warning(f"Ignoring unknown default avatar id from user {current_user.id}")

    try:
        db.session.commit()
    except Exception as e:
        # Leave the session usable for the rest of the request.
        db.session.rollback()
        logger.error(f"Error saving profile for user {current_user.id}: {e}")
        flash('Error saving profile', 'error')
        return redirect(url_for('settings_profile'))

    flash('Profile updated successfully', 'success')
    return redirect(url_for('settings'))
@app.route('/settings/communities', methods=['GET', 'POST'])
@login_required
def settings_communities():
    """Community/source selection settings.

    GET renders the selection form with the user's currently selected
    communities. POST stores the submitted ``communities`` list under the
    ``communities`` key of the user's JSON settings and redirects to the
    main settings page.
    """
    def load_settings():
        # Stored settings are a JSON string. Fall back to an empty dict
        # when the value is missing, malformed, or not a JSON object.
        try:
            parsed = json.loads(current_user.settings) if current_user.settings else {}
        except (ValueError, TypeError) as e:
            logger.warning(f"Could not parse settings for user {current_user.id}: {e}")
            return {}
        return parsed if isinstance(parsed, dict) else {}

    if request.method == 'POST':
        # Get selected communities
        selected_communities = request.form.getlist('communities')

        # Update communities
        user_settings = load_settings()
        user_settings['communities'] = selected_communities

        # Save settings
        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Community preferences updated', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving community settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')
        return redirect(url_for('settings'))

    # Load current settings
    selected_communities = load_settings().get('communities', [])

    # Available communities
    available_communities = [
        {'id': 'programming', 'name': 'Programming', 'platform': 'reddit'},
        {'id': 'python', 'name': 'Python', 'platform': 'reddit'},
        {'id': 'technology', 'name': 'Technology', 'platform': 'reddit'},
        {'id': 'hackernews', 'name': 'Hacker News', 'platform': 'hackernews'},
        {'id': 'lobsters', 'name': 'Lobsters', 'platform': 'lobsters'},
        {'id': 'stackoverflow', 'name': 'Stack Overflow', 'platform': 'stackoverflow'},
    ]

    return render_template('settings_communities.html',
                           user=current_user,
                           available_communities=available_communities,
                           selected_communities=selected_communities)
@app.route('/settings/filters', methods=['GET', 'POST'])
@login_required
def settings_filters():
    """Filter settings page.

    POST validates the submitted ``filter_set`` with ``_is_safe_filterset``,
    stores it in the user's settings and redirects to the main settings
    page. GET renders the form with the current filter and the filter sets
    read from ``filtersets.json``.
    """
    if request.method == 'POST':
        selected_filter = request.form.get('filter_set', 'no_filter')

        # Load and validate current settings
        user_settings = _validate_user_settings(current_user.settings)

        # Validate new filter setting
        if _is_safe_filterset(selected_filter):
            user_settings['filter_set'] = selected_filter
        else:
            flash('Invalid filter selection', 'error')
            return redirect(url_for('settings'))

        # Save validated settings
        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Filter settings updated successfully', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving filter settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')

        return redirect(url_for('settings'))

    # Load current settings; fall back to an empty dict when the stored
    # value is missing, malformed, or not a JSON object (so .get is safe).
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (ValueError, TypeError) as e:
        logger.warning(f"Could not parse settings for user {current_user.id}: {e}")
        user_settings = {}
    if not isinstance(user_settings, dict):
        user_settings = {}

    current_filter = user_settings.get('filter_set', 'no_filter')

    # Load available filter sets (best-effort; the page renders without them).
    try:
        with open('filtersets.json', 'r', encoding='utf-8') as f:
            filter_sets = json.load(f)
    except (OSError, ValueError) as e:
        logger.warning(f"Could not load filtersets.json: {e}")
        filter_sets = {}

    return render_template('settings_filters.html',
                           user=current_user,
                           filter_sets=filter_sets,
                           current_filter=current_filter)
@app.route('/settings/experience', methods=['GET', 'POST'])
@login_required
def settings_experience():
    """Experience and behavioral settings page - opt-in addictive features.

    POST rebuilds the ``experience`` section of the user's JSON settings
    from the submitted checkboxes (a flag is enabled only when its form
    value is ``'on'``) and redirects to the main settings page. GET renders
    the form with the stored flags, defaulting every flag to False.
    """
    experience_flags = (
        'infinite_scroll',
        'auto_refresh',
        'push_notifications',
        'dark_patterns_opt_in',
    )

    def load_settings():
        # Stored settings are a JSON string. Fall back to an empty dict
        # when the value is missing, malformed, or not a JSON object.
        try:
            parsed = json.loads(current_user.settings) if current_user.settings else {}
        except (ValueError, TypeError) as e:
            logger.warning(f"Could not parse settings for user {current_user.id}: {e}")
            return {}
        return parsed if isinstance(parsed, dict) else {}

    if request.method == 'POST':
        user_settings = load_settings()

        # All features are opt-in: an absent checkbox means False.
        user_settings['experience'] = {
            flag: request.form.get(flag) == 'on' for flag in experience_flags
        }

        # Save settings
        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Experience settings updated successfully', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving experience settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')
        return redirect(url_for('settings'))

    # Load current settings
    experience_settings = load_settings().get(
        'experience', {flag: False for flag in experience_flags}
    )

    return render_template('settings_experience.html',
                           user=current_user,
                           experience_settings=experience_settings)
@app.route('/upload-avatar', methods=['POST'])
@login_required
def upload_avatar():
    """Upload profile picture.

    Validates the uploaded ``avatar`` file (presence, allowed type,
    declared size, sanitized filename), saves it under ``UPLOAD_FOLDER``
    with the user's id as a filename prefix, and stores the resulting URL
    on the current user. Every outcome redirects back to the profile
    settings page with a flash message.
    """
    profile_page = 'settings_profile'

    if 'avatar' not in request.files:
        flash('No file selected', 'error')
        return redirect(url_for(profile_page))

    file = request.files['avatar']
    if file.filename == '':
        flash('No file selected', 'error')
        return redirect(url_for(profile_page))

    # Validate file type and size
    if not _is_allowed_file(file.filename):
        flash('Invalid file type. Please upload PNG, JPG, or GIF', 'error')
        return redirect(url_for(profile_page))

    # Check file size (Flask's MAX_CONTENT_LENGTH handles this too, but
    # double-check). Either value may be missing/None, in which case there
    # is nothing to compare.
    max_bytes = app.config.get('MAX_CONTENT_LENGTH')
    declared_bytes = getattr(file, 'content_length', None)
    if max_bytes and declared_bytes and declared_bytes > max_bytes:
        flash('File too large. Maximum size is 16MB', 'error')
        return redirect(url_for(profile_page))

    # Validate and secure filename
    filename = secure_filename(file.filename)
    if not filename or len(filename) > MAX_FILENAME_LENGTH:
        flash('Invalid filename', 'error')
        return redirect(url_for(profile_page))

    # Add user ID to make filename unique and prevent conflicts
    unique_filename = f"{current_user.id}_{filename}"

    # Ensure upload directory exists and is secure
    upload_dir = os.path.abspath(UPLOAD_FOLDER)
    os.makedirs(upload_dir, exist_ok=True)
    upload_path = os.path.abspath(os.path.join(upload_dir, unique_filename))

    # Final security check - ensure path is within upload directory.
    # commonpath compares whole path components, so a sibling directory
    # that merely shares the prefix (e.g. "uploads_evil") is rejected.
    if os.path.commonpath([upload_dir, upload_path]) != upload_dir:
        logger.warning(f"Path traversal attempt in file upload: {upload_path}")
        flash('Invalid file path', 'error')
        return redirect(url_for(profile_page))

    try:
        file.save(upload_path)
        logger.info(f"File uploaded successfully: {unique_filename} by user {current_user.id}")
    except Exception as e:
        logger.error(f"Error saving uploaded file: {e}")
        flash('Error saving file', 'error')
        return redirect(url_for(profile_page))

    # Update user profile
    try:
        current_user.profile_picture_url = f"/static/avatars/{unique_filename}"
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error updating profile picture for user {current_user.id}: {e}")
        flash('Error saving file', 'error')
        return redirect(url_for(profile_page))

    flash('Profile picture updated successfully', 'success')
    return redirect(url_for(profile_page))
@app.route('/profile')
@login_required
def profile():
    """Render the profile page for the logged-in user."""
    page = render_template('profile.html', user=current_user)
    return page
# ============================================================
# ADMIN ROUTES
# ============================================================
@app.route('/admin')
@login_required
def admin_panel():
    """Admin panel - user management.

    Non-admins, or any request made while the user service is unavailable,
    are sent back to the index with an error flash; otherwise the admin
    template is rendered with all users.
    """
    denial = None
    if not current_user.is_admin:
        denial = 'Access denied. Admin privileges required.'
    elif not user_service:
        denial = 'User service not available'

    if denial is not None:
        flash(denial, 'error')
        return redirect(url_for('index'))

    all_users = user_service.get_all_users()
    return render_template('admin.html', users=all_users)
@app.route('/admin/user/<user_id>/delete', methods=['POST'])
@login_required
def admin_delete_user(user_id):
    """Delete user (admin only).

    ``user_id`` comes from the URL and is therefore always a string. An
    admin cannot delete their own account. All outcomes for admins
    redirect back to the admin panel with a flash message; non-admins are
    redirected to the index.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    if not user_service:
        flash('User service not available', 'error')
        return redirect(url_for('index'))

    # Prevent self-deletion. Compare as strings so the guard still works
    # if the stored id is not a plain str (the URL value always is).
    if str(current_user.id) == str(user_id):
        flash('You cannot delete your own account!', 'error')
        return redirect(url_for('admin_panel'))

    user = user_service.get_user_by_id(user_id)
    if not user:
        flash('User not found', 'error')
        return redirect(url_for('admin_panel'))

    # Capture the name first; the object may be unusable after deletion.
    username = user.username
    if user_service.delete_user(user_id):
        flash(f'User {username} has been deleted.', 'success')
        logger.info(f"Admin {current_user.id} deleted user {username} ({user_id})")
    else:
        flash('Error deleting user', 'error')
        logger.error(f"Failed to delete user {user_id}")

    return redirect(url_for('admin_panel'))
@app.route('/admin/user/<user_id>/toggle-admin', methods=['POST'])
@login_required
def admin_toggle_admin(user_id):
    """Toggle user admin status (admin only).

    ``user_id`` comes from the URL and is therefore always a string. An
    admin cannot change their own status, mirroring the self-deletion
    guard, so an admin cannot lock themselves out by accident.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    if not user_service:
        flash('User service not available', 'error')
        return redirect(url_for('index'))

    # Compare as strings: the URL value is a str, the stored id may not be.
    if str(current_user.id) == str(user_id):
        flash('You cannot change your own admin status!', 'error')
        return redirect(url_for('admin_panel'))

    target_user = user_service.get_user_by_id(user_id)
    if target_user:
        new_status = not target_user.is_admin  # Toggle admin status
        user_service.update_user_admin_status(user_id, new_status)
        flash('Admin status updated', 'success')
        logger.info(f"Admin {current_user.id} set is_admin={new_status} for user {user_id}")
    else:
        flash('User not found', 'error')

    return redirect(url_for('admin_panel'))
# NOTE: The duplicate routes that previously lived here were removed in
# favor of the UUID-based routes above.
@app.route('/admin/regenerate_content', methods=['POST'])
@login_required
def admin_regenerate_content():
    """Regenerate all HTML content (admin only).

    Runs ``generate_html.py`` from the current working directory with the
    running interpreter, using the ``no_filter`` filterset and the
    ``vanilla-js`` theme. On success the application cache is invalidated.
    Admins are always redirected back to the admin panel with a flash
    message describing the outcome.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        # Go straight to the index; bouncing through the admin panel
        # would only add a second "access denied" flash.
        return redirect(url_for('index'))

    import subprocess
    import sys

    try:
        # Secure subprocess execution with absolute paths and validation
        script_path = os.path.abspath('generate_html.py')
        if not os.path.exists(script_path):
            flash('Content generation script not found', 'error')
            return redirect(url_for('admin_panel'))

        # Argument list (no shell) built only from fixed values.
        python_exe = os.path.abspath(sys.executable)
        cmd = [python_exe, script_path, '--filterset', 'no_filter', '--theme', 'vanilla-js']

        # Execute with a timeout; a non-zero exit is handled below rather
        # than raised (check=False).
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd=os.path.abspath('.'),
            timeout=300,  # 5 minute timeout
            check=False
        )

        if result.returncode == 0:
            flash('Content regenerated successfully', 'success')
            logger.info(f"Content regenerated by admin user {current_user.id}")
            # Invalidate cache since content was regenerated
            _invalidate_cache()
        else:
            flash('Error regenerating content', 'error')
            logger.error(f"Content regeneration failed: {result.stderr}")

    except subprocess.TimeoutExpired:
        flash('Content regeneration timed out', 'error')
        logger.error("Content regeneration timed out")
    except Exception as e:
        flash(f'Error regenerating content: {str(e)}', 'error')
        logger.error(f"Content regeneration error: {e}")

    return redirect(url_for('admin_panel'))
@app.route('/admin/clear_cache', methods=['POST'])
@login_required
def admin_clear_cache():
    """Clear application cache (admin only).

    Removes the ``cache`` and ``temp`` directories (relative to the
    working directory) when they exist, then invalidates the application
    cache. Admins are redirected back to the admin panel with a flash
    message describing the outcome.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        # Go straight to the index; bouncing through the admin panel
        # would only add a second "access denied" flash.
        return redirect(url_for('index'))

    import shutil

    try:
        # Clear any cache directories or temp files
        for cache_dir in ('cache', 'temp'):
            if os.path.exists(cache_dir):
                shutil.rmtree(cache_dir)

        # Clear application cache
        _invalidate_cache()

        flash('Cache cleared successfully', 'success')
        logger.info(f"Cache cleared by admin user {current_user.id}")
    except Exception as e:
        flash(f'Error clearing cache: {str(e)}', 'error')
        logger.error(f"Cache clearing error: {e}")

    return redirect(url_for('admin_panel'))
@app.route('/admin/backup_data', methods=['POST'])
@login_required
def admin_backup_data():
    """Create backup of application data (admin only).

    Copies a fixed set of directories and files (those that exist,
    relative to the working directory) into
    ``backups/balanceboard_backup_<timestamp>``. Admins are redirected
    back to the admin panel with a flash message describing the outcome.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        # Go straight to the index; bouncing through the admin panel
        # would only add a second "access denied" flash.
        return redirect(url_for('index'))

    import shutil
    from datetime import datetime

    try:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f'balanceboard_backup_{timestamp}'

        # Create backup directory
        backup_dir = os.path.join('backups', backup_name)
        os.makedirs(backup_dir, exist_ok=True)

        # Copy important directories
        for dir_name in ('data', 'templates', 'themes', 'static'):
            if os.path.exists(dir_name):
                shutil.copytree(dir_name, os.path.join(backup_dir, dir_name))

        # Copy important files
        for file_name in ('app.py', 'models.py', 'database.py', 'filtersets.json'):
            if os.path.exists(file_name):
                shutil.copy2(file_name, backup_dir)

        flash(f'Backup created: {backup_name}', 'success')
        logger.info(f"Backup {backup_name} created by admin user {current_user.id}")
    except Exception as e:
        flash(f'Error creating backup: {str(e)}', 'error')
        logger.error(f"Backup error: {e}")

    return redirect(url_for('admin_panel'))
# ============================================================
# POLLING MANAGEMENT ROUTES
# ============================================================
@app.route('/admin/polling')
@login_required
def admin_polling():
    """Admin polling management page.

    Shows every poll source (ordered by platform, then display name), the
    status reported by the polling service, and the platform config.
    Non-admins are redirected to the index.
    """
    if not current_user.is_admin:
        flash('Access denied. Admin privileges required.', 'error')
        return redirect(url_for('index'))

    from models import PollSource, PollLog
    from polling_service import polling_service

    ordering = (PollSource.platform, PollSource.display_name)
    template_context = {
        # All poll sources
        'sources': PollSource.query.order_by(*ordering).all(),
        # Scheduler status
        'scheduler_status': polling_service.get_status(),
        # Platform config for available sources
        'platform_config': load_platform_config(),
    }
    return render_template('admin_polling.html', **template_context)
@app.route('/admin/polling/add', methods=['POST'])
@login_required
def admin_polling_add():
    """Add a new poll source (admin only).

    Reads the source definition from the submitted form, rejects
    incomplete, non-numeric or duplicate submissions with a flash message,
    and otherwise stores a new ``PollSource`` that starts out disabled.
    Admins are always redirected back to the polling page.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    platform = request.form.get('platform')
    source_id = request.form.get('source_id')
    custom_source_id = request.form.get('custom_source_id')
    display_name = request.form.get('display_name')
    fetch_comments = request.form.get('fetch_comments', 'true') == 'true'
    priority = request.form.get('priority', 'medium')

    # Numeric fields come from user input; report bad values instead of
    # letting int() raise and turn the request into a 500.
    try:
        poll_interval = int(request.form.get('poll_interval', 60))
        max_posts = int(request.form.get('max_posts', 100))
    except (TypeError, ValueError):
        flash('Poll interval and max posts must be whole numbers', 'error')
        return redirect(url_for('admin_polling'))

    if poll_interval < 1 or max_posts < 1:
        flash('Poll interval and max posts must be at least 1', 'error')
        return redirect(url_for('admin_polling'))

    # Use custom source if provided, otherwise use dropdown
    if custom_source_id and custom_source_id.strip():
        source_id = custom_source_id.strip()

    if not platform or not source_id or not display_name:
        flash('Missing required fields', 'error')
        return redirect(url_for('admin_polling'))

    # Check if source already exists
    existing = PollSource.query.filter_by(platform=platform, source_id=source_id).first()
    if existing:
        flash(f'Source {platform}:{source_id} already exists', 'warning')
        return redirect(url_for('admin_polling'))

    # Create new source (disabled by default)
    source = PollSource(
        platform=platform,
        source_id=source_id,
        display_name=display_name,
        poll_interval_minutes=poll_interval,
        max_posts=max_posts,
        fetch_comments=fetch_comments,
        priority=priority,
        enabled=False,
        created_by=current_user.id
    )

    try:
        db.session.add(source)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error adding poll source {platform}:{source_id}: {e}")
        flash('Error adding polling source', 'error')
        return redirect(url_for('admin_polling'))

    flash(f'Added polling source: {display_name}', 'success')
    logger.info(f"Admin {current_user.id} added poll source {platform}:{source_id}")

    return redirect(url_for('admin_polling'))
@app.route('/admin/polling/<source_id>/toggle', methods=['POST'])
@login_required
def admin_polling_toggle(source_id):
    """Flip the ``enabled`` flag of a poll source (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if source:
        source.enabled = not source.enabled
        db.session.commit()

        if source.enabled:
            status = 'enabled'
        else:
            status = 'disabled'
        flash(f'Polling {status} for {source.display_name}', 'success')
    else:
        flash('Source not found', 'error')

    return redirect(url_for('admin_polling'))
@app.route('/admin/polling/<source_id>/update', methods=['POST'])
@login_required
def admin_polling_update(source_id):
    """Update poll source configuration (admin only).

    Only fields present in the submitted form are changed. Numeric fields
    are parsed and validated before the source is touched, so a bad value
    produces a flash message instead of a server error or a partial
    update.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource
    source = PollSource.query.get(source_id)

    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    form = request.form

    # Parse numeric inputs up front; empty/missing values mean "unchanged".
    numeric_updates = {}
    try:
        if form.get('poll_interval'):
            numeric_updates['poll_interval_minutes'] = int(form.get('poll_interval'))
        if form.get('max_posts'):
            numeric_updates['max_posts'] = int(form.get('max_posts'))
    except (TypeError, ValueError):
        flash('Poll interval and max posts must be whole numbers', 'error')
        return redirect(url_for('admin_polling'))

    if any(value < 1 for value in numeric_updates.values()):
        flash('Poll interval and max posts must be at least 1', 'error')
        return redirect(url_for('admin_polling'))

    # Update all configurable fields
    for attribute, value in numeric_updates.items():
        setattr(source, attribute, value)
    if form.get('fetch_comments') is not None:
        source.fetch_comments = form.get('fetch_comments') == 'true'
    if form.get('priority'):
        source.priority = form.get('priority')
    if form.get('display_name'):
        source.display_name = form.get('display_name')

    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error updating poll source {source_id}: {e}")
        flash('Error updating polling source', 'error')
        return redirect(url_for('admin_polling'))

    flash(f'Updated settings for {source.display_name}', 'success')
    return redirect(url_for('admin_polling'))
@app.route('/admin/polling/<source_id>/poll-now', methods=['POST'])
@login_required
def admin_polling_poll_now(source_id):
    """Ask the polling service to poll one source immediately (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource
    from polling_service import polling_service

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
    else:
        # Hand the request to the polling service; any failure it raises
        # is reported to the admin and logged.
        try:
            polling_service.poll_now(source_id)
            flash(f'Polling started for {source.display_name}', 'success')
        except Exception as e:
            flash(f'Error starting poll: {str(e)}', 'error')
            logger.error(f"Error triggering poll for {source_id}: {e}")

    return redirect(url_for('admin_polling'))
@app.route('/admin/polling/<source_id>/delete', methods=['POST'])
@login_required
def admin_polling_delete(source_id):
    """Remove a poll source (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if source:
        # Remember the name before the row is deleted, for the message.
        removed_name = source.display_name
        db.session.delete(source)
        db.session.commit()

        flash(f'Deleted polling source: {removed_name}', 'success')
        logger.info(f"Admin {current_user.id} deleted poll source {source_id}")
    else:
        flash('Source not found', 'error')

    return redirect(url_for('admin_polling'))
@app.route('/admin/polling/<source_id>/logs')
@login_required
def admin_polling_logs(source_id):
    """Show up to 50 log entries for one poll source (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource, PollLog

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    # Capped at 50 rows; the ordering is whatever the ``logs``
    # relationship defines (NOTE(review): presumably newest first - confirm).
    log_entries = source.logs.limit(50).all()

    return render_template('admin_polling_logs.html',
                           source=source,
                           logs=log_entries)
# ============================================================
# ERROR HANDLERS
# ============================================================
@app.errorhandler(404)
def not_found(e):
    """Render the custom page for 404 (not found) errors."""
    body = render_template('404.html')
    return body, 404
@app.errorhandler(500)
def server_error(e):
    """Render the custom page for 500 (internal server) errors."""
    body = render_template('500.html')
    return body, 500
# ============================================================
# INITIALIZATION
# ============================================================
if __name__ == '__main__':
    print("✓ BalanceBoard starting...")
    print("✓ Database: PostgreSQL with SQLAlchemy")
    print("✓ Password hashing: bcrypt")
    print("✓ Authentication: Flask-Login")

    # The server listens on all interfaces, and the Werkzeug debugger lets
    # anyone who can reach it execute arbitrary code. Debug mode is
    # therefore off unless explicitly enabled via FLASK_DEBUG.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=DEFAULT_PORT, debug=debug_enabled)