Files
balanceboard/app.py
chelsea 5d6da930df Investigate comments loading issue (Issue #4)
- Added debug logging to post_detail route to track comment loading
- Created migration script for poll source fields (max_posts, fetch_comments, priority)
- Migration adds default values to ensure comments are fetched

The issue may be that existing poll sources in the database don't have the fetch_comments field.
Migration needs to be run on server with database access.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-11 19:43:55 -05:00

1639 lines
56 KiB
Python

"""
BalanceBoard Web Application
Flask server with user authentication and content serving.
"""
import os
import re
import logging
import time
from pathlib import Path
from werkzeug.utils import secure_filename
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, abort, session, jsonify
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from dotenv import load_dotenv
from functools import lru_cache
from collections import defaultdict
from authlib.integrations.flask_client import OAuth
from urllib.parse import quote_plus, urlencode
from database import init_db, db
from models import User, bcrypt
from user_service import UserService
import json
# Load environment variables
# Runs before every os.getenv() call below so values supplied through
# python-dotenv are visible to the configuration that follows.
load_dotenv()
# Configure logging
# Records at INFO and above go to both the file 'app.log' and a stream handler.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Initialize Flask app
# The app's static folder is 'themes' and templates are read from 'templates'.
app = Flask(__name__,
static_folder='themes',
template_folder='templates')
# NOTE(review): the fallback SECRET_KEY is a fixed string visible in source;
# SECRET_KEY should be set in the environment for any real deployment.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size
# Auth0 Configuration
# Each value defaults to an empty string when the environment variable is unset.
app.config['AUTH0_DOMAIN'] = os.getenv('AUTH0_DOMAIN', '')
app.config['AUTH0_CLIENT_ID'] = os.getenv('AUTH0_CLIENT_ID', '')
app.config['AUTH0_CLIENT_SECRET'] = os.getenv('AUTH0_CLIENT_SECRET', '')
app.config['AUTH0_AUDIENCE'] = os.getenv('AUTH0_AUDIENCE', '')
# Configuration constants
# ALLOWED_FILTERSETS is the whitelist checked by _is_safe_filterset();
# ALLOWED_EXTENSIONS is the whitelist checked by _is_allowed_file().
ALLOWED_FILTERSETS = {'no_filter', 'safe_content'}
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
UPLOAD_FOLDER = 'static/avatars'
MAX_FILENAME_LENGTH = 100
DEFAULT_PORT = 5021
DEFAULT_PAGE_SIZE = 20
MIN_PASSWORD_LENGTH = 8
MAX_USERNAME_LENGTH = 80
MAX_EMAIL_LENGTH = 120
MAX_COMMUNITY_NAME_LENGTH = 100
# Initialize database
init_db(app)
# Initialize bcrypt
bcrypt.init_app(app)
# Initialize Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
# Unauthenticated requests to @login_required views are sent to the 'login' endpoint.
login_manager.login_view = 'login'
login_manager.login_message = 'Please log in to access this page.'
# Initialize user service
user_service = UserService()
# Initialize polling service
# NOTE: imported and started at module import time, so importing this module
# has the side effect of starting the polling service.
from polling_service import polling_service
polling_service.init_app(app)
polling_service.start()
# Initialize OAuth for Auth0
oauth = OAuth(app)
auth0 = oauth.register(
    'auth0',
    client_id=app.config['AUTH0_CLIENT_ID'],
    client_secret=app.config['AUTH0_CLIENT_SECRET'],
    # The OpenID Connect discovery document is published at
    # /.well-known/openid-configuration (hyphen, not underscore); the
    # underscore spelling is not a valid discovery endpoint.
    server_metadata_url=f'https://{app.config["AUTH0_DOMAIN"]}/.well-known/openid-configuration',
    client_kwargs={
        # Request the ID token plus basic profile and e-mail claims.
        'scope': 'openid profile email',
    },
)
# Cache for posts and comments - improves performance
# post_cache is filled by _load_posts_cache() with post UUID -> post data;
# comment_cache groups comment data under the UUID of the post it belongs to.
post_cache = {}
comment_cache = defaultdict(list)
# time.time() value of the last refresh; 0 means "never loaded / invalidated".
cache_timestamp = 0
CACHE_DURATION = 300 # 5 minutes
# Security helper functions
def _is_safe_filterset(filterset):
"""Validate filterset name for security"""
if not filterset or not isinstance(filterset, str):
return False
return filterset in ALLOWED_FILTERSETS and re.match(r'^[a-zA-Z0-9_-]+$', filterset)
def _is_safe_path(path):
"""Validate file path for security"""
if not path or not isinstance(path, str):
return False
# Check for directory traversal attempts
if '..' in path or path.startswith('/') or '\\' in path:
return False
# Only allow alphanumeric, dots, hyphens, underscores, and forward slashes
return re.match(r'^[a-zA-Z0-9._/-]+$', path) is not None
def _is_allowed_file(filename):
"""Check if file extension is allowed"""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def _load_posts_cache():
    """Return ``(post_cache, comment_cache)``, reloading from disk when stale.

    The cached data is reused while it is younger than CACHE_DURATION seconds
    and at least one post is cached. Otherwise both caches are cleared and
    rebuilt from ``data/posts/*.json`` (keyed by each post's ``uuid``) and
    ``data/comments/*.json`` (grouped by each comment's ``post_uuid``).

    Files that cannot be read, are not valid UTF-8 JSON, or do not contain a
    JSON object are skipped with a debug log entry so that one bad file can
    no longer abort the whole refresh.
    """
    global post_cache, comment_cache, cache_timestamp

    def _read_json_dict(json_path, label):
        """Return the JSON object stored in *json_path*, or None if unusable."""
        try:
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(json_path, 'r', encoding='utf-8') as f:
                payload = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.debug(f"Error reading {label} file {json_path}: {e}")
            return None
        if not isinstance(payload, dict):
            logger.debug(f"Error reading {label} file {json_path}: not a JSON object")
            return None
        return payload

    current_time = time.time()
    if current_time - cache_timestamp < CACHE_DURATION and post_cache:
        return post_cache, comment_cache

    # Clear existing cache (in place, so existing references stay valid)
    post_cache.clear()
    comment_cache.clear()

    posts_dir = Path('data/posts')
    comments_dir = Path('data/comments')

    # Load all posts
    if posts_dir.exists():
        for post_file in posts_dir.glob('*.json'):
            post_data = _read_json_dict(post_file, 'post')
            if post_data is None:
                continue
            post_uuid = post_data.get('uuid')
            if post_uuid:
                post_cache[post_uuid] = post_data

    # Load all comments and group by post UUID
    if comments_dir.exists():
        for comment_file in comments_dir.glob('*.json'):
            comment_data = _read_json_dict(comment_file, 'comment')
            if comment_data is None:
                continue
            post_uuid = comment_data.get('post_uuid')
            if post_uuid:
                comment_cache[post_uuid].append(comment_data)

    cache_timestamp = current_time
    logger.info(f"Cache refreshed: {len(post_cache)} posts, {len(comment_cache)} comment groups")
    return post_cache, comment_cache
def _invalidate_cache():
"""Invalidate the cache to force refresh"""
global cache_timestamp
cache_timestamp = 0
def _validate_user_settings(settings_str):
"""Validate and sanitize user settings JSON"""
try:
if not settings_str:
return {}
settings = json.loads(settings_str)
if not isinstance(settings, dict):
logger.warning("User settings must be a JSON object")
return {}
# Validate specific fields
validated = {}
# Filter set validation
if 'filter_set' in settings:
filter_set = settings['filter_set']
if isinstance(filter_set, str) and _is_safe_filterset(filter_set):
validated['filter_set'] = filter_set
# Communities validation
if 'communities' in settings:
communities = settings['communities']
if isinstance(communities, list):
# Validate each community name
safe_communities = []
for community in communities:
if isinstance(community, str) and len(community) <= MAX_COMMUNITY_NAME_LENGTH and re.match(r'^[a-zA-Z0-9_-]+$', community):
safe_communities.append(community)
validated['communities'] = safe_communities
# Experience settings validation
if 'experience' in settings:
exp = settings['experience']
if isinstance(exp, dict):
safe_exp = {}
bool_fields = ['infinite_scroll', 'auto_refresh', 'push_notifications', 'dark_patterns_opt_in']
for field in bool_fields:
if field in exp and isinstance(exp[field], bool):
safe_exp[field] = exp[field]
validated['experience'] = safe_exp
return validated
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"Invalid user settings JSON: {e}")
return {}
# Add custom Jinja filters
@app.template_filter('nl2br')
def nl2br_filter(text):
    """Jinja filter: put a ``<br>`` tag in front of every newline in *text*.

    Falsy input (None, empty string) is returned unchanged.
    """
    # NOTE(review): the result is a plain str and *text* is not HTML-escaped
    # here - confirm how templates combine this filter with autoescape/|safe.
    return text.replace('\n', '<br>\n') if text else text
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: look the user up through the user service."""
    account = user_service.get_user_by_id(user_id)
    return account
# ============================================================
# STATIC CONTENT ROUTES
# ============================================================
@app.before_request
def check_first_user():
    """Before each request, send visitors to admin setup while no user exists.

    The check is skipped for endpoints whose name starts with 'static', for
    the auth/setup/theme/logo endpoints, and for authenticated users. If the
    user count cannot be queried the request simply proceeds.
    """
    exempt_endpoints = ('login', 'signup', 'admin_setup', 'serve_theme', 'serve_logo')
    endpoint = request.endpoint
    if endpoint and (endpoint.startswith('static') or endpoint in exempt_endpoints):
        return None

    # Logged-in users imply at least one account exists.
    if current_user.is_authenticated:
        return None

    try:
        if User.query.count() == 0:
            return redirect(url_for('admin_setup'))
    except Exception as e:
        # If database is not ready, skip check
        logger.warning(f"Database not ready for user count check: {e}")
    return None
@app.route('/')
def index():
    """Render the dashboard for the current visitor.

    Anonymous visitors get a fixed set of default settings and the template
    flag ``anonymous=True``; logged-in users get their stored settings (or an
    empty dict when the stored JSON is missing or invalid).
    """
    if not current_user.is_authenticated:
        # Anonymous mode - allow browsing with default settings
        anonymous_defaults = {
            'filter_set': 'no_filter',
            'communities': [],
            'experience': {
                'infinite_scroll': False,
                'auto_refresh': False,
                'push_notifications': False,
                'dark_patterns_opt_in': False
            }
        }
        return render_template('dashboard.html', user_settings=anonymous_defaults, anonymous=True)

    try:
        stored = current_user.settings
        user_settings = json.loads(stored) if stored else {}
    except (json.JSONDecodeError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
        user_settings = {}
    return render_template('dashboard.html', user_settings=user_settings)
@app.route('/feed/<filterset>')
def feed_content(filterset='no_filter'):
    """Send the pre-rendered ``index.html`` of the requested filter set.

    Responds with 404 when the name fails _is_safe_filterset() or when the
    normalised path would leave the ``active_html/`` directory.
    """
    if not _is_safe_filterset(filterset):
        logger.warning(f"Invalid filterset requested: {filterset}")
        abort(404)

    feed_dir = f'active_html/{filterset}'

    # Second line of defence: the normalised path must stay under active_html/.
    normalised = os.path.normpath(f'{feed_dir}/index.html')
    if not normalised.startswith('active_html/'):
        logger.warning(f"Path traversal attempt detected: {filterset}")
        abort(404)

    return send_from_directory(feed_dir, 'index.html')
def load_platform_config():
    """Load ``platform_config.json`` from the current working directory.

    Returns:
        The parsed JSON content, or the empty default
        ``{"platforms": {}, "collection_targets": []}`` (with a warning
        logged) when the file is missing, unreadable, not valid UTF-8, or not
        valid JSON.
    """
    try:
        # JSON is UTF-8 by specification; reading with the locale's default
        # encoding can corrupt or reject non-ASCII values such as emoji icons.
        with open('platform_config.json', 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # OSError covers FileNotFoundError/IOError; ValueError covers
        # json.JSONDecodeError and UnicodeDecodeError.
        logger.warning(f"Could not load platform config: {e}")
        return {"platforms": {}, "collection_targets": []}
def get_display_name_for_source(platform, source, platform_config):
    """Resolve the human-readable label for a post's source.

    * Without a usable config (falsy or lacking 'platforms'), *source* itself
      is returned.
    * For platforms flagged ``supports_communities``: the matching community's
      ``display_name`` if one has ``id == source``; otherwise the platform
      ``prefix`` followed by *source*; or, when *source* is empty, the
      platform name.
    * For every other platform: the platform's ``name`` (falling back to the
      raw *platform* key).
    """
    if not platform_config or 'platforms' not in platform_config:
        return source

    info = platform_config['platforms'].get(platform, {})
    platform_label = info.get('name', platform)

    # Platforms without communities are always shown under their own name.
    if not info.get('supports_communities'):
        return platform_label

    known = next(
        (entry for entry in info.get('communities', []) if entry['id'] == source),
        None,
    )
    if known is not None:
        return known['display_name']

    # Fallback to prefix + source for Reddit-like platforms
    if source:
        return f"{info.get('prefix', '')}{source}"
    return platform_label
@app.route('/api/posts')
def api_posts():
    """API endpoint returning cached posts with filtering and pagination.

    Query parameters:
        page      - 1-based page number (default 1; non-numeric or < 1 -> 1)
        per_page  - page size (default DEFAULT_PAGE_SIZE; non-numeric -> the
                    default, values < 1 -> 1)
        community - keep only posts whose ``source`` matches (case-insensitive)
        platform  - keep only posts whose ``platform`` matches (case-insensitive)

    Returns a dict with ``posts`` (newest first) and a ``pagination`` summary.
    On an unexpected error the same shape is returned with an empty post list
    and an ``error`` message.
    """
    try:
        # Load platform configuration
        platform_config = load_platform_config()

        # type=int makes the lookup fall back to the default instead of
        # raising on non-numeric input; clamp so 0/negative values cannot
        # produce odd slices or a division by zero below.
        page = max(1, request.args.get('page', 1, type=int))
        per_page = max(1, request.args.get('per_page', DEFAULT_PAGE_SIZE, type=int))
        community = request.args.get('community', '').lower()
        platform = request.args.get('platform', '').lower()

        # Use cached data for better performance
        cached_posts, cached_comments = _load_posts_cache()

        posts = []
        for post_uuid, post_data in cached_posts.items():
            # "or ''" guards against explicit JSON nulls in stored posts.
            if community and (post_data.get('source') or '').lower() != community:
                continue
            if platform and (post_data.get('platform') or '').lower() != platform:
                continue

            # Get comment count from cache
            comment_count = len(cached_comments.get(post_uuid, []))

            # Get proper display name for source
            source_display = get_display_name_for_source(
                post_data.get('platform', ''),
                post_data.get('source', ''),
                platform_config
            )

            content = post_data.get('content') or ''
            posts.append({
                'id': post_uuid,
                'title': post_data.get('title', 'Untitled'),
                'author': post_data.get('author', 'Unknown'),
                'platform': post_data.get('platform', 'unknown'),
                'score': post_data.get('score', 0),
                'timestamp': post_data.get('timestamp', 0),
                'url': f'/post/{post_uuid}',
                'comments_count': comment_count,
                'content_preview': content[:200] + '...' if content else '',
                'source': post_data.get('source', ''),
                'source_display': source_display,
                'tags': post_data.get('tags', []),
                'external_url': post_data.get('url', '')
            })

        # Sort by timestamp (newest first); a null timestamp sorts as 0.
        posts.sort(key=lambda x: x['timestamp'] or 0, reverse=True)

        # Calculate pagination
        total_posts = len(posts)
        start_idx = (page - 1) * per_page
        paginated_posts = posts[start_idx:start_idx + per_page]
        total_pages = (total_posts + per_page - 1) // per_page

        return {
            'posts': paginated_posts,
            'pagination': {
                'current_page': page,
                'total_pages': total_pages,
                'total_posts': total_posts,
                'per_page': per_page,
                'has_next': page < total_pages,
                'has_prev': page > 1
            }
        }
    except Exception as e:
        # Route through the module logger (file + stream) rather than print().
        logger.error(f"Error loading posts: {e}")
        return {'posts': [], 'error': str(e), 'pagination': {'current_page': 1, 'total_pages': 0, 'total_posts': 0, 'per_page': DEFAULT_PAGE_SIZE, 'has_next': False, 'has_prev': False}}
@app.route('/api/platforms')
def api_platforms():
    """API endpoint listing platforms and the communities that have posts.

    Every ``data/posts/*.json`` file is read and posts are counted per
    (platform, source) pair. Each pair becomes one community entry, enriched
    from the platform configuration when a matching community is configured,
    and the entries are returned sorted by post count (highest first).

    On an unexpected error an empty result with an ``error`` message is
    returned.
    """
    try:
        platform_config = load_platform_config()
        platforms = platform_config.get('platforms', {})

        posts_dir = Path('data/posts')

        # Count posts per (platform, source). A tuple key avoids the ambiguity
        # of joining with ':' and splitting again when a platform name itself
        # contains a colon.
        source_counts = {}
        for post_file in posts_dir.glob('*.json'):
            try:
                with open(post_file, 'r', encoding='utf-8') as f:
                    post_data = json.load(f)
            except (OSError, ValueError) as e:
                # Unreadable or malformed file: skip it, but leave a trace.
                logger.debug(f"Error reading post file {post_file}: {e}")
                continue
            if not isinstance(post_data, dict):
                continue
            key = (
                str(post_data.get('platform', 'unknown')),
                str(post_data.get('source', '')),
            )
            source_counts[key] = source_counts.get(key, 0) + 1

        # Build community list from actual data and platform config
        communities = []
        for (platform, source), count in source_counts.items():
            platform_info = platforms.get(platform, {})

            community_info = None
            if platform_info.get('supports_communities'):
                for community in platform_info.get('communities', []):
                    if community['id'] == source:
                        community_info = community
                        break

            if community_info:
                community_entry = {
                    'platform': platform,
                    'id': source,
                    'name': community_info['name'],
                    'display_name': community_info['display_name'],
                    'icon': community_info.get('icon', platform_info.get('icon', '📄')),
                    'count': count,
                    'description': community_info.get('description', '')
                }
            else:
                # Fallback for sources not in config
                display_name = get_display_name_for_source(platform, source, platform_config)
                community_entry = {
                    'platform': platform,
                    'id': source,
                    'name': source or platform,
                    'display_name': display_name,
                    'icon': platform_info.get('icon', '📄'),
                    'count': count,
                    'description': f"Posts from {display_name}"
                }
            communities.append(community_entry)

        # Sort communities by count (most posts first)
        communities.sort(key=lambda x: x['count'], reverse=True)

        return {
            'platforms': platforms,
            'communities': communities,
            'total_communities': len(communities)
        }
    except Exception as e:
        logger.error(f"Error loading platform configuration: {e}")
        return {
            'platforms': {},
            'communities': [],
            'total_communities': 0,
            'error': str(e)
        }
@app.route('/api/content-timestamp')
def api_content_timestamp():
    """API endpoint reporting the newest modification time among post files.

    Responds with ``{'timestamp': 0}`` when ``data/posts`` does not exist or
    holds no ``*.json`` file, and with a 500 JSON error if the lookup fails.
    """
    try:
        posts_dir = Path('data/posts')
        if not posts_dir.exists():
            return jsonify({'timestamp': 0})

        # Seed with 0 so an empty directory still yields a timestamp of 0.
        mtimes = [entry.stat().st_mtime for entry in posts_dir.glob('*.json')]
        newest = max([0, *mtimes])
        return jsonify({'timestamp': newest})
    except Exception as e:
        logger.error(f"Error getting content timestamp: {e}")
        return jsonify({'error': 'Failed to get content timestamp'}), 500
@app.route('/post/<post_id>')
def post_detail(post_id):
    """Serve the detail page for one post together with its comments.

    The post and its comments come from the in-memory cache. The post is
    annotated with a ``source_display`` label and the comments are ordered by
    timestamp (oldest first). Both are prepared on copies so the shared cache
    is never modified by a page view.

    Renders ``404.html`` with status 404 when the post is unknown or when an
    unexpected error occurs while preparing the page.
    """
    try:
        # Load platform configuration
        platform_config = load_platform_config()

        # Use cached data for better performance
        cached_posts, cached_comments = _load_posts_cache()

        cached_post = cached_posts.get(post_id)
        if not cached_post:
            return render_template('404.html'), 404

        # Shallow copy: adding a display field must not leak into the cache.
        post_data = dict(cached_post)
        post_data['source_display'] = get_display_name_for_source(
            post_data.get('platform', ''),
            post_data.get('source', ''),
            platform_config
        )

        # sorted() returns a new list instead of reordering the cached one;
        # a missing or null timestamp sorts as 0.
        comments = sorted(
            cached_comments.get(post_id, []),
            key=lambda comment: comment.get('timestamp') or 0,
        )
        logger.info(f"Loading post {post_id}: found {len(comments)} comments")

        # Load user settings if authenticated
        user_settings = {}
        if current_user.is_authenticated:
            try:
                user_settings = json.loads(current_user.settings) if current_user.settings else {}
            except (ValueError, TypeError) as e:
                logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
                user_settings = {}

        return render_template('post_detail.html', post=post_data, comments=comments, user_settings=user_settings)
    except Exception as e:
        logger.error(f"Error loading post {post_id}: {e}")
        return render_template('404.html'), 404
@app.route('/themes/<path:filename>')
def serve_theme(filename):
    """Send a file from the ``themes`` directory, or 404 for unsafe names."""
    # A name is rejected when it fails the path whitelist or contains '..'.
    rejected = not _is_safe_path(filename) or '..' in filename
    if rejected:
        logger.warning(f"Unsafe theme file requested: {filename}")
        abort(404)
    return send_from_directory('themes', filename)
@app.route('/logo.png')
def serve_logo():
    """Send ``logo.png`` from the application's current directory."""
    logo_response = send_from_directory('.', 'logo.png')
    return logo_response
@app.route('/static/<path:filename>')
def serve_static(filename):
    """Send a file from the ``static`` directory, or 404 for unsafe names."""
    # A name is rejected when it fails the path whitelist or contains '..'.
    rejected = not _is_safe_path(filename) or '..' in filename
    if rejected:
        logger.warning(f"Unsafe static file requested: {filename}")
        abort(404)
    return send_from_directory('static', filename)
# ============================================================
# AUTHENTICATION ROUTES
# ============================================================
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Login page.

    GET renders the form. POST authenticates the submitted credentials via
    the user service; on success the user is logged in and redirected to the
    ``next`` query parameter when it is a safe same-site path, otherwise to
    the index page. Already authenticated users are sent to the index.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        remember = request.form.get('remember', False) == 'on'

        if not user_service:
            flash('User service not available', 'error')
            return render_template('login.html')

        user = user_service.authenticate(username, password)
        if user:
            login_user(user, remember=remember)
            flash(f'Welcome back, {user.username}!', 'success')

            # 'next' is attacker-controllable, so only follow it when it is a
            # plain same-site path: one leading '/', not '//' (scheme-relative
            # URL), no backslashes and no whitespace/control characters, which
            # some browsers normalise into an off-site URL.
            next_page = request.args.get('next') or ''
            is_local_path = (
                next_page.startswith('/')
                and not next_page.startswith('//')
                and '\\' not in next_page
                and not any(ch.isspace() or ord(ch) < 0x20 for ch in next_page)
            )
            return redirect(next_page if is_local_path else url_for('index'))

        flash('Invalid username or password', 'error')

    return render_template('login.html')
@app.route('/password-reset-request', methods=['GET', 'POST'])
def password_reset_request():
    """Request a password reset.

    GET renders the request form. POST always shows the same neutral
    confirmation, whether or not the e-mail belongs to an account. When the
    account exists and has a local password, a reset token is generated and
    the reset URL is written to the application log. The URL is additionally
    flashed to the browser only while the app runs in debug mode.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        email = request.form.get('email', '').strip().lower()
        if not email:
            flash('Please enter your email address', 'error')
            return render_template('password_reset_request.html')

        # Find user by email
        user = User.query.filter_by(email=email).first()

        # Always show success message for security (don't reveal if email exists)
        flash('If an account exists with that email, a password reset link has been sent.', 'success')

        if user and user.password_hash:  # Only send reset if user has a password (not OAuth only)
            token = user.generate_reset_token()
            reset_url = url_for('password_reset', token=token, _external=True)

            # Log the reset URL (in production, this would be emailed)
            logger.info(f"Password reset requested for {email}. Reset URL: {reset_url}")

            # Showing the link to the requester hands the account to anyone
            # who knows the e-mail address and also reveals that the address
            # is registered, so restrict it to debug (development) runs.
            if app.debug:
                flash(f'Reset link (development only): {reset_url}', 'info')

        return redirect(url_for('login'))

    return render_template('password_reset_request.html')
@app.route('/password-reset/<token>', methods=['GET', 'POST'])
def password_reset(token):
    """Reset a password using a previously issued token.

    The token must belong to a user and pass ``verify_reset_token``; otherwise
    the visitor is redirected to the login page. On POST the new password must
    be at least MIN_PASSWORD_LENGTH characters (the same rule as signup and
    admin setup) and match its confirmation; it is then stored and the token
    cleared.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    # Find user by token
    user = User.query.filter_by(reset_token=token).first()
    if not user or not user.verify_reset_token(token):
        flash('Invalid or expired reset token', 'error')
        return redirect(url_for('login'))

    if request.method == 'POST':
        password = request.form.get('password', '')
        confirm_password = request.form.get('confirm_password', '')

        # Enforce the application-wide minimum instead of a separate,
        # weaker hard-coded limit.
        if not password or len(password) < MIN_PASSWORD_LENGTH:
            flash(f'Password must be at least {MIN_PASSWORD_LENGTH} characters', 'error')
            return render_template('password_reset.html')

        if password != confirm_password:
            flash('Passwords do not match', 'error')
            return render_template('password_reset.html')

        # Set new password
        user.set_password(password)
        user.clear_reset_token()

        flash('Your password has been reset successfully. You can now log in.', 'success')
        return redirect(url_for('login'))

    return render_template('password_reset.html')
# Auth0 Routes
@app.route('/auth0/login')
def auth0_login():
    """Start the Auth0 login flow, using the absolute URL of auth0_callback as redirect target."""
    callback_url = url_for('auth0_callback', _external=True)
    return auth0.authorize_redirect(callback_url)
@app.route('/auth0/callback')
def auth0_callback():
    """Handle the Auth0 callback and log the corresponding local user in.

    Resolution order for the local account:
      1. a user already linked to the Auth0 ``sub`` identifier;
      2. an existing user with the same e-mail, which is then linked;
      3. a newly created user with a unique username and no local password.

    After login the visitor is redirected to the ``next`` query parameter
    when it is a safe same-site path, otherwise to the index page. Any
    failure ends on the login page with an error message.
    """
    try:
        # Get the access token from Auth0
        token = auth0.authorize_access_token()

        # Get user info from Auth0 (fall back to parsing the ID token)
        user_info = token.get('userinfo') or auth0.parse_id_token(token)

        auth0_id = user_info.get('sub')
        email = user_info.get('email')

        # Validate before touching `email`: deriving the username from a
        # missing e-mail would raise before this message could be shown.
        if not auth0_id or not email:
            flash('Unable to get user information from Auth0', 'error')
            return redirect(url_for('login'))

        username = user_info.get('nickname') or user_info.get('preferred_username') or email.split('@')[0]

        # Check if user exists with this Auth0 ID
        user = user_service.get_user_by_auth0_id(auth0_id)

        if not user:
            # Check if user exists with this email (for account linking)
            # NOTE(review): linking relies on the e-mail claim alone - confirm
            # whether `email_verified` should be required before linking.
            existing_user = user_service.get_user_by_email(email)

            if existing_user:
                # Link existing account to Auth0
                user_service.link_auth0_account(existing_user.id, auth0_id)
                user = existing_user
                flash(f'Account linked successfully! Welcome back, {user.username}!', 'success')
            else:
                # Create new user with a unique username
                base_username = username[:MAX_USERNAME_LENGTH - 3]  # Leave room for suffix
                unique_username = base_username
                counter = 1
                while user_service.username_exists(unique_username):
                    suffix = f"_{counter}"
                    # Trim further when the suffix outgrows the reserved room
                    # so the result never exceeds MAX_USERNAME_LENGTH.
                    unique_username = f"{base_username[:MAX_USERNAME_LENGTH - len(suffix)]}{suffix}"
                    counter += 1

                user_id = user_service.create_user(
                    username=unique_username,
                    email=email,
                    password=None,  # No password for OAuth users
                    is_admin=False,
                    auth0_id=auth0_id
                )

                if user_id:
                    user = user_service.get_user_by_id(user_id)
                    flash(f'Account created successfully! Welcome, {user.username}!', 'success')
                else:
                    flash('Failed to create user account', 'error')
                    return redirect(url_for('login'))
        else:
            flash(f'Welcome back, {user.username}!', 'success')

        # Log in the user
        if user:
            login_user(user, remember=True)

            # Store Auth0 info in session for future use
            session['auth0_user_info'] = user_info

            # Only follow 'next' when it is a plain same-site path (one
            # leading '/', no '//', backslash, whitespace or control chars);
            # anything else could redirect the user to another site.
            next_page = request.args.get('next') or ''
            is_local_path = (
                next_page.startswith('/')
                and not next_page.startswith('//')
                and '\\' not in next_page
                and not any(ch.isspace() or ord(ch) < 0x20 for ch in next_page)
            )
            return redirect(next_page if is_local_path else url_for('index'))

    except Exception as e:
        logger.error(f"Auth0 callback error: {e}")
        flash('Authentication failed. Please try again.', 'error')

    return redirect(url_for('login'))
@app.route('/auth0/logout')
@login_required
def auth0_logout():
    """End the local session, then redirect to Auth0's ``/v2/logout`` endpoint.

    The Auth0 URL carries ``returnTo`` (absolute index URL) and ``client_id``.
    """
    # Drop everything stored locally before leaving for Auth0.
    session.clear()
    logout_user()

    query = urlencode(
        {
            'returnTo': url_for('index', _external=True),
            'client_id': app.config['AUTH0_CLIENT_ID'],
        },
        quote_via=quote_plus,
    )
    return redirect(f"https://{app.config['AUTH0_DOMAIN']}/v2/logout?" + query)
@app.route('/admin-setup', methods=['GET', 'POST'])
def admin_setup():
    """Create the first admin account.

    Redirects to login once any user exists. GET renders the setup form; POST
    validates the form (all fields present, passwords match, minimum length)
    and creates the user with ``is_admin=True``.
    """
    try:
        if User.query.count() > 0:
            flash('Admin user already exists.', 'info')
            return redirect(url_for('login'))
    except Exception as e:
        # A failing count is only logged; setup continues.
        logger.warning(f"Database error checking existing users: {e}")

    if request.method != 'POST':
        return render_template('admin_setup.html')

    form = request.form
    username = form.get('username')
    email = form.get('email')
    password = form.get('password')
    password_confirm = form.get('password_confirm')

    # First failing rule wins, in the same order the checks are listed.
    problem = None
    if not (username and email and password):
        problem = 'All fields are required'
    elif password != password_confirm:
        problem = 'Passwords do not match'
    elif len(password) < MIN_PASSWORD_LENGTH:
        problem = f'Password must be at least {MIN_PASSWORD_LENGTH} characters'

    if problem is not None:
        flash(problem, 'error')
        return render_template('admin_setup.html')

    if user_service.create_user(username, email, password, is_admin=True):
        flash('Admin account created successfully! Please log in.', 'success')
        return redirect(url_for('login'))

    flash('Error creating admin account. Please try again.', 'error')
    return render_template('admin_setup.html')
@app.route('/signup', methods=['GET', 'POST'])
def signup():
    """Signup page.

    GET renders the form. POST validates the submission (all fields present,
    passwords match, minimum length, username and e-mail not yet taken) and
    creates the account, after which the visitor is sent to the login page.
    Authenticated users are redirected to the index.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('signup.html')

    form = request.form
    username = form.get('username')
    email = form.get('email')
    password = form.get('password')
    password_confirm = form.get('password_confirm')

    # The first failing rule determines the message; later rules (including
    # the user-service lookups) are not evaluated once one has failed.
    problem = None
    if not user_service:
        problem = 'User service not available'
    elif not (username and email and password):
        problem = 'All fields are required'
    elif password != password_confirm:
        problem = 'Passwords do not match'
    elif len(password) < MIN_PASSWORD_LENGTH:
        problem = f'Password must be at least {MIN_PASSWORD_LENGTH} characters'
    elif user_service.username_exists(username):
        problem = 'Username already taken'
    elif user_service.email_exists(email):
        problem = 'Email already registered'

    if problem is not None:
        flash(problem, 'error')
        return render_template('signup.html')

    if user_service.create_user(username, email, password):
        flash('Account created successfully! Please log in.', 'success')
        return redirect(url_for('login'))

    flash('Error creating account. Please try again.', 'error')
    return render_template('signup.html')
@app.route('/logout')
@login_required
def logout():
    """Log the current user out, show a notice, and return to the index page."""
    logout_user()
    flash('You have been logged out.', 'info')
    home_url = url_for('index')
    return redirect(home_url)
@app.route('/settings')
@login_required
def settings():
    """Main settings page.

    Passes the current user's stored settings and the contents of
    ``filtersets.json`` to the template. Either falls back to an empty dict
    (with a warning logged) when it cannot be read or parsed.
    """
    # Load user settings
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (ValueError, TypeError) as e:
        # Narrow handler: a bare except would also swallow SystemExit and
        # KeyboardInterrupt and hide unrelated programming errors.
        logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
        user_settings = {}

    # Load available filter sets
    try:
        with open('filtersets.json', 'r', encoding='utf-8') as f:
            filter_sets = json.load(f)
    except (OSError, ValueError) as e:
        logger.warning(f"Could not load filter sets: {e}")
        filter_sets = {}

    return render_template('settings.html',
                           user=current_user,
                           user_settings=user_settings,
                           filter_sets=filter_sets)
@app.route('/settings/profile', methods=['GET', 'POST'])
@login_required
def settings_profile():
    """Profile settings page.

    GET renders the form. POST updates username and e-mail after checking
    that both are present and not used by another account, and optionally
    switches the profile picture to one of the built-in default avatars.
    Every render of the form, including the error paths, receives the list
    of default avatars.
    """
    # Available default avatars
    default_avatars = [
        {'id': 'default_1', 'name': 'Gradient Blue', 'bg': 'linear-gradient(135deg, #667eea 0%, #764ba2 100%)'},
        {'id': 'default_2', 'name': 'Gradient Green', 'bg': 'linear-gradient(135deg, #4facfe 0%, #00f2fe 100%)'},
        {'id': 'default_3', 'name': 'Gradient Orange', 'bg': 'linear-gradient(135deg, #fa709a 0%, #fee140 100%)'},
        {'id': 'default_4', 'name': 'Gradient Purple', 'bg': 'linear-gradient(135deg, #a8edea 0%, #fed6e3 100%)'},
        {'id': 'default_5', 'name': 'Brand Colors', 'bg': 'linear-gradient(135deg, #4db6ac 0%, #26a69a 100%)'},
        {'id': 'default_6', 'name': 'Sunset', 'bg': 'linear-gradient(135deg, #ff7e5f 0%, #feb47b 100%)'},
    ]

    def render_form():
        """Render the profile form with the full template context."""
        return render_template('settings_profile.html', user=current_user, default_avatars=default_avatars)

    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        default_avatar = request.form.get('default_avatar')

        # Validation
        if not username or not email:
            flash('Username and email are required', 'error')
            return render_form()

        # Check if username is taken by another user
        if username != current_user.username and user_service.username_exists(username):
            flash('Username already taken', 'error')
            return render_form()

        # Check if email is taken by another user
        if email != current_user.email and user_service.email_exists(email):
            flash('Email already registered', 'error')
            return render_form()

        # Update user
        current_user.username = username
        current_user.email = email

        # Only accept ids of avatars that actually exist; the submitted value
        # is interpolated into a URL, so arbitrary 'default_*' text must not
        # pass through.
        known_avatar_ids = {avatar['id'] for avatar in default_avatars}
        if default_avatar in known_avatar_ids:
            current_user.profile_picture_url = f"/static/default-avatars/{default_avatar}.png"

        try:
            db.session.commit()
        except Exception as e:
            # Roll back so the session stays usable for later requests.
            db.session.rollback()
            logger.error(f"Error updating profile for user {current_user.id}: {e}")
            flash('Error updating profile', 'error')
            return render_form()

        flash('Profile updated successfully', 'success')
        return redirect(url_for('settings'))

    return render_form()
@app.route('/settings/communities', methods=['GET', 'POST'])
@login_required
def settings_communities():
    """Community/source selection settings.

    GET renders the selection form with the user's current choices. POST
    stores the submitted ``communities`` list in the user's settings JSON.
    Stored settings that are missing, unparsable, or not a JSON object are
    treated as empty.
    """
    # Load current settings
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
        user_settings = {}
    if not isinstance(user_settings, dict):
        # Valid JSON that is not an object (e.g. a list) cannot hold settings.
        user_settings = {}

    if request.method == 'POST':
        # Update communities with the submitted selection
        user_settings['communities'] = request.form.getlist('communities')

        # Save settings, keeping the session usable if the commit fails
        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving community settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')
            return redirect(url_for('settings'))

        flash('Community preferences updated', 'success')
        return redirect(url_for('settings'))

    selected_communities = user_settings.get('communities', [])

    # Available communities
    available_communities = [
        {'id': 'programming', 'name': 'Programming', 'platform': 'reddit'},
        {'id': 'python', 'name': 'Python', 'platform': 'reddit'},
        {'id': 'technology', 'name': 'Technology', 'platform': 'reddit'},
        {'id': 'hackernews', 'name': 'Hacker News', 'platform': 'hackernews'},
        {'id': 'lobsters', 'name': 'Lobsters', 'platform': 'lobsters'},
        {'id': 'stackoverflow', 'name': 'Stack Overflow', 'platform': 'stackoverflow'},
    ]

    return render_template('settings_communities.html',
                           user=current_user,
                           available_communities=available_communities,
                           selected_communities=selected_communities)
@app.route('/settings/filters', methods=['GET', 'POST'])
@login_required
def settings_filters():
    """Filter settings page.

    POST validates the submitted ``filter_set`` and stores it in the user's
    (validated) settings. GET renders the form with the current filter and
    the filter sets read from ``filtersets.json``; unreadable or invalid data
    falls back to defaults.
    """
    if request.method == 'POST':
        selected_filter = request.form.get('filter_set', 'no_filter')

        # Load and validate current settings
        user_settings = _validate_user_settings(current_user.settings)

        # Validate new filter setting
        if _is_safe_filterset(selected_filter):
            user_settings['filter_set'] = selected_filter
        else:
            flash('Invalid filter selection', 'error')
            return redirect(url_for('settings'))

        # Save validated settings
        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Filter settings updated successfully', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving filter settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')

        return redirect(url_for('settings'))

    # Load current settings
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
        user_settings = {}
    if not isinstance(user_settings, dict):
        # Valid JSON that is not an object would break the .get() below.
        user_settings = {}

    current_filter = user_settings.get('filter_set', 'no_filter')

    # Load available filter sets
    try:
        with open('filtersets.json', 'r', encoding='utf-8') as f:
            filter_sets = json.load(f)
    except (OSError, ValueError) as e:
        logger.warning(f"Could not load filter sets: {e}")
        filter_sets = {}

    return render_template('settings_filters.html',
                           user=current_user,
                           filter_sets=filter_sets,
                           current_filter=current_filter)
@app.route('/settings/experience', methods=['GET', 'POST'])
@login_required
def settings_experience():
    """Experience and behavioral settings page - opt-in addictive features.

    GET renders the form with the stored toggles (all False when none are
    stored). POST replaces the ``experience`` section of the user's settings
    with the submitted checkboxes; an unchecked box is stored as False.
    Stored settings that are missing, unparsable, or not a JSON object are
    treated as empty.
    """
    # Load current settings
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
        user_settings = {}
    if not isinstance(user_settings, dict):
        # Valid JSON that is not an object (e.g. a list) cannot hold settings.
        user_settings = {}

    if request.method == 'POST':
        # Checkboxes submit 'on' only when ticked (all opt-in, default False)
        user_settings['experience'] = {
            'infinite_scroll': request.form.get('infinite_scroll') == 'on',
            'auto_refresh': request.form.get('auto_refresh') == 'on',
            'push_notifications': request.form.get('push_notifications') == 'on',
            'dark_patterns_opt_in': request.form.get('dark_patterns_opt_in') == 'on'
        }

        # Save settings, keeping the session usable if the commit fails
        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving experience settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')
            return redirect(url_for('settings'))

        flash('Experience settings updated successfully', 'success')
        return redirect(url_for('settings'))

    experience_settings = user_settings.get('experience', {
        'infinite_scroll': False,
        'auto_refresh': False,
        'push_notifications': False,
        'dark_patterns_opt_in': False
    })

    return render_template('settings_experience.html',
                           user=current_user,
                           experience_settings=experience_settings)
@app.route('/upload-avatar', methods=['POST'])
@login_required
def upload_avatar():
    """Upload profile picture.

    Validates the uploaded ``avatar`` file (presence, allowed type, size,
    sanitised filename), saves it under ``UPLOAD_FOLDER`` with the user's id
    prefixed to the filename, and records the resulting URL on the user.
    Every outcome redirects back to the profile settings page with a flash
    message.
    """
    if 'avatar' not in request.files:
        flash('No file selected', 'error')
        return redirect(url_for('settings_profile'))

    file = request.files['avatar']
    if file.filename == '':
        flash('No file selected', 'error')
        return redirect(url_for('settings_profile'))

    # Validate file type and size
    if not _is_allowed_file(file.filename):
        flash('Invalid file type. Please upload PNG, JPG, or GIF', 'error')
        return redirect(url_for('settings_profile'))

    # Check file size (Flask's MAX_CONTENT_LENGTH handles this too, but double-check)
    if hasattr(file, 'content_length') and (file.content_length or 0) > app.config['MAX_CONTENT_LENGTH']:
        flash('File too large. Maximum size is 16MB', 'error')
        return redirect(url_for('settings_profile'))

    # Validate and secure filename
    filename = secure_filename(file.filename)
    if not filename or len(filename) > MAX_FILENAME_LENGTH:
        flash('Invalid filename', 'error')
        return redirect(url_for('settings_profile'))

    # Add user ID to make filename unique and prevent conflicts
    unique_filename = f"{current_user.id}_{filename}"

    upload_dir = os.path.abspath(UPLOAD_FOLDER)
    upload_path = os.path.abspath(os.path.join(upload_dir, unique_filename))

    # Final security check - ensure path is within upload directory.
    # commonpath compares whole path components; a plain startswith() would
    # also accept a sibling such as "<upload_dir>_evil/...".
    try:
        inside_upload_dir = os.path.commonpath([upload_dir, upload_path]) == upload_dir
    except ValueError:
        # Raised e.g. when the paths are on different drives
        inside_upload_dir = False
    if not inside_upload_dir:
        logger.warning(f"Path traversal attempt in file upload: {upload_path}")
        flash('Invalid file path', 'error')
        return redirect(url_for('settings_profile'))

    try:
        os.makedirs(upload_dir, exist_ok=True)
        file.save(upload_path)
        logger.info(f"File uploaded successfully: {unique_filename} by user {current_user.id}")
    except Exception as e:
        logger.error(f"Error saving uploaded file: {e}")
        flash('Error saving file', 'error')
        return redirect(url_for('settings_profile'))

    # Update user profile
    try:
        current_user.profile_picture_url = f"/static/avatars/{unique_filename}"
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error updating profile picture for user {current_user.id}: {e}")
        flash('Error updating profile picture', 'error')
        return redirect(url_for('settings_profile'))

    flash('Profile picture updated successfully', 'success')
    return redirect(url_for('settings_profile'))
@app.route('/profile')
@login_required
def profile():
    """Render the profile page for the currently logged-in user."""
    template_context = {'user': current_user}
    return render_template('profile.html', **template_context)
# ============================================================
# ADMIN ROUTES
# ============================================================
@app.route('/admin')
@login_required
def admin_panel():
    """Admin landing page listing every user.

    Non-admins, and requests made while ``user_service`` is unavailable, are
    sent back to the index page with an error message.
    """
    refusal = None
    if not current_user.is_admin:
        refusal = 'Access denied. Admin privileges required.'
    elif not user_service:
        refusal = 'User service not available'

    if refusal is not None:
        flash(refusal, 'error')
        return redirect(url_for('index'))

    return render_template('admin.html', users=user_service.get_all_users())
@app.route('/admin/user/<user_id>/delete', methods=['POST'])
@login_required
def admin_delete_user(user_id):
    """Delete user (admin only).

    Args:
        user_id: Identifier of the user to delete, taken from the URL (always
            a string).

    Returns:
        A redirect to the admin panel, or to the index page when the caller
        is not an admin or the user service is unavailable.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    if not user_service:
        flash('User service not available', 'error')
        return redirect(url_for('index'))

    # Prevent self-deletion. Compare as strings: the URL value is a str, so a
    # non-str primary key would otherwise never compare equal.
    if str(current_user.id) == str(user_id):
        flash('You cannot delete your own account!', 'error')
        return redirect(url_for('admin_panel'))

    user = user_service.get_user_by_id(user_id)
    if not user:
        flash('User not found', 'error')
        return redirect(url_for('admin_panel'))

    # Capture the name first; the object may be unusable once deleted
    username = user.username
    if user_service.delete_user(user_id):
        flash(f'User {username} has been deleted.', 'success')
        logger.info(f"Admin {current_user.id} deleted user {username} ({user_id})")
    else:
        flash('Error deleting user', 'error')
        logger.error(f"Failed to delete user {user_id}")
    return redirect(url_for('admin_panel'))
@app.route('/admin/user/<user_id>/toggle-admin', methods=['POST'])
@login_required
def admin_toggle_admin(user_id):
    """Toggle user admin status (admin only).

    Args:
        user_id: Identifier of the user whose admin flag is flipped, taken
            from the URL.

    Returns:
        A redirect to the admin panel, or to the index page when the caller
        is not an admin or the user service is unavailable.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    # Same availability guard as the admin panel; without it a missing
    # service surfaces as an AttributeError / HTTP 500.
    if not user_service:
        flash('User service not available', 'error')
        return redirect(url_for('index'))

    target_user = user_service.get_user_by_id(user_id)
    if not target_user:
        flash('User not found', 'error')
        return redirect(url_for('admin_panel'))

    new_status = not target_user.is_admin  # Toggle admin status
    # NOTE(review): the return value of update_user_admin_status is not
    # checked because its contract is not visible here - confirm and handle
    # failure if it reports one.
    user_service.update_user_admin_status(user_id, new_status)
    flash('Admin status updated', 'success')
    logger.info(f"Admin {current_user.id} set admin status of user {user_id} to {new_status}")
    return redirect(url_for('admin_panel'))
# NOTE: a duplicate of this route was removed in favor of the UUID-based route above.
@app.route('/admin/regenerate_content', methods=['POST'])
@login_required
def admin_regenerate_content():
    """Regenerate all HTML content (admin only).

    Runs ``generate_html.py`` with the current Python interpreter as a child
    process (argument list, no shell, 5 minute timeout). On success the
    application cache is invalidated. Always redirects to the admin panel
    with a flash message describing the outcome.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    # Imported outside the try block so the `except subprocess.TimeoutExpired`
    # clause below can always resolve the name.
    import subprocess
    import sys

    try:
        # Resolve the script against the current working directory
        script_path = os.path.abspath('generate_html.py')
        if not os.path.exists(script_path):
            flash('Content generation script not found', 'error')
            return redirect(url_for('admin_panel'))

        # Fixed argument list: nothing here comes from the request
        python_exe = os.path.abspath(sys.executable)
        cmd = [python_exe, script_path, '--filterset', 'no_filter', '--theme', 'vanilla-js']

        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd=os.path.abspath('.'),
            timeout=300,  # 5 minute timeout
            check=False,  # non-zero exit is reported below instead of raising
        )

        if result.returncode == 0:
            flash('Content regenerated successfully', 'success')
            logger.info(f"Content regenerated by admin user {current_user.id}")
            # Invalidate cache since content was regenerated
            _invalidate_cache()
        else:
            flash('Error regenerating content', 'error')
            logger.error(f"Content regeneration failed: {result.stderr}")
    except subprocess.TimeoutExpired:
        flash('Content regeneration timed out', 'error')
        logger.error("Content regeneration timed out")
    except Exception as e:
        flash(f'Error regenerating content: {str(e)}', 'error')
        logger.error(f"Content regeneration error: {e}")

    return redirect(url_for('admin_panel'))
@app.route('/admin/clear_cache', methods=['POST'])
@login_required
def admin_clear_cache():
    """Remove the on-disk cache directories and reset the application cache.

    Admin only. Deletes the ``cache`` and ``temp`` directories when they
    exist, then calls ``_invalidate_cache()``. Always redirects to the admin
    panel with a flash message describing the outcome.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    try:
        import shutil
        import os

        # Drop any cache directories or temp files that are present
        for stale_dir in ['cache', 'temp']:
            if not os.path.exists(stale_dir):
                continue
            shutil.rmtree(stale_dir)

        # Reset the in-application cache as well
        _invalidate_cache()
        flash('Cache cleared successfully', 'success')
        logger.info(f"Cache cleared by admin user {current_user.id}")
    except Exception as err:
        flash(f'Error clearing cache: {str(err)}', 'error')
        logger.error(f"Cache clearing error: {err}")

    return redirect(url_for('admin_panel'))
@app.route('/admin/backup_data', methods=['POST'])
@login_required
def admin_backup_data():
    """Create backup of application data (admin only).

    Copies the ``data``, ``templates``, ``themes`` and ``static`` directories
    and a fixed list of source/config files (whichever exist) into a new
    timestamped directory under ``backups/``. Always redirects to the admin
    panel with a flash message describing the outcome.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    try:
        import shutil
        from datetime import datetime

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f'balanceboard_backup_{timestamp}'

        # Create backup directory
        backup_dir = os.path.join('backups', backup_name)
        os.makedirs(backup_dir, exist_ok=True)

        # Copy important directories
        for dir_name in ('data', 'templates', 'themes', 'static'):
            if os.path.exists(dir_name):
                shutil.copytree(dir_name, os.path.join(backup_dir, dir_name))

        # Copy important files
        for file_name in ('app.py', 'models.py', 'database.py', 'filtersets.json'):
            if os.path.exists(file_name):
                shutil.copy2(file_name, backup_dir)

        flash(f'Backup created: {backup_name}', 'success')
        logger.info(f"Backup {backup_name} created by admin user {current_user.id}")
    except Exception as e:
        flash(f'Error creating backup: {str(e)}', 'error')
        # Record the failure server-side too, like the other admin actions
        logger.error(f"Backup creation error: {e}")

    return redirect(url_for('admin_panel'))
# ============================================================
# POLLING MANAGEMENT ROUTES
# ============================================================
@app.route('/admin/polling')
@login_required
def admin_polling():
    """Polling management dashboard (admin only).

    Renders ``admin_polling.html`` with every poll source ordered by platform
    and display name, the polling scheduler's status, and the platform
    configuration.
    """
    if not current_user.is_admin:
        flash('Access denied. Admin privileges required.', 'error')
        return redirect(url_for('index'))

    from models import PollSource, PollLog
    from polling_service import polling_service

    ordered_sources = PollSource.query.order_by(PollSource.platform, PollSource.display_name)
    template_context = {
        # All configured poll sources
        'sources': ordered_sources.all(),
        # Current scheduler state as reported by the polling service
        'scheduler_status': polling_service.get_status(),
        # Platform config describing the sources that can be added
        'platform_config': load_platform_config(),
    }
    return render_template('admin_polling.html', **template_context)
@app.route('/admin/polling/add', methods=['POST'])
@login_required
def admin_polling_add():
    """Add a new poll source (admin only).

    Reads the source definition from the submitted form, rejects incomplete,
    non-numeric, non-positive or duplicate entries with a flash message, and
    otherwise stores a new ``PollSource`` that starts out disabled. Always
    redirects to the polling admin page (or the index for non-admins).
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    platform = request.form.get('platform')
    source_id = request.form.get('source_id')
    custom_source_id = request.form.get('custom_source_id')
    display_name = request.form.get('display_name')
    fetch_comments = request.form.get('fetch_comments', 'true') == 'true'
    priority = request.form.get('priority', 'medium')

    # Numeric fields come straight from the form: a blank value falls back to
    # the default, and anything non-numeric is reported instead of raising.
    try:
        poll_interval = int(request.form.get('poll_interval') or 60)
        max_posts = int(request.form.get('max_posts') or 100)
    except (TypeError, ValueError):
        flash('Poll interval and max posts must be whole numbers', 'error')
        return redirect(url_for('admin_polling'))
    if poll_interval < 1 or max_posts < 1:
        flash('Poll interval and max posts must be positive', 'error')
        return redirect(url_for('admin_polling'))

    # Use custom source if provided, otherwise use dropdown
    if custom_source_id and custom_source_id.strip():
        source_id = custom_source_id.strip()

    if not platform or not source_id or not display_name:
        flash('Missing required fields', 'error')
        return redirect(url_for('admin_polling'))

    # Check if source already exists
    existing = PollSource.query.filter_by(platform=platform, source_id=source_id).first()
    if existing:
        flash(f'Source {platform}:{source_id} already exists', 'warning')
        return redirect(url_for('admin_polling'))

    # Create new source (disabled by default)
    source = PollSource(
        platform=platform,
        source_id=source_id,
        display_name=display_name,
        poll_interval_minutes=poll_interval,
        max_posts=max_posts,
        fetch_comments=fetch_comments,
        priority=priority,
        enabled=False,
        created_by=current_user.id,
    )

    # The existence check above can race with a concurrent insert, so a
    # failing commit is rolled back and reported rather than left to 500.
    try:
        db.session.add(source)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error adding poll source {platform}:{source_id}: {e}")
        flash('Error adding polling source', 'error')
        return redirect(url_for('admin_polling'))

    flash(f'Added polling source: {display_name}', 'success')
    logger.info(f"Admin {current_user.id} added poll source {platform}:{source_id}")
    return redirect(url_for('admin_polling'))
@app.route('/admin/polling/<source_id>/toggle', methods=['POST'])
@login_required
def admin_polling_toggle(source_id):
    """Flip the ``enabled`` flag of one poll source (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    polling_page = url_for('admin_polling')
    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(polling_page)

    # Invert the current state and persist it
    source.enabled = not source.enabled
    db.session.commit()

    if source.enabled:
        status = 'enabled'
    else:
        status = 'disabled'
    flash(f'Polling {status} for {source.display_name}', 'success')
    return redirect(polling_page)
@app.route('/admin/polling/<source_id>/update', methods=['POST'])
@login_required
def admin_polling_update(source_id):
    """Update poll source configuration (admin only).

    Only fields present (and non-empty) in the submitted form are changed.
    Numeric fields are validated before anything is modified, so an invalid
    value leaves the source untouched.

    Args:
        source_id: Primary key of the ``PollSource`` to update, from the URL.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    form = request.form

    # Parse numeric inputs up front; bad input is reported instead of raising
    try:
        poll_interval = int(form.get('poll_interval')) if form.get('poll_interval') else None
        max_posts = int(form.get('max_posts')) if form.get('max_posts') else None
    except (TypeError, ValueError):
        flash('Poll interval and max posts must be whole numbers', 'error')
        return redirect(url_for('admin_polling'))
    if (poll_interval is not None and poll_interval < 1) or (max_posts is not None and max_posts < 1):
        flash('Poll interval and max posts must be positive', 'error')
        return redirect(url_for('admin_polling'))

    # Update all configurable fields that were supplied
    if poll_interval is not None:
        source.poll_interval_minutes = poll_interval
    if max_posts is not None:
        source.max_posts = max_posts
    if form.get('fetch_comments') is not None:
        source.fetch_comments = form.get('fetch_comments') == 'true'
    if form.get('priority'):
        source.priority = form.get('priority')
    if form.get('display_name'):
        source.display_name = form.get('display_name')

    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error updating poll source {source_id}: {e}")
        flash('Error updating polling source', 'error')
        return redirect(url_for('admin_polling'))

    flash(f'Updated settings for {source.display_name}', 'success')
    return redirect(url_for('admin_polling'))
@app.route('/admin/polling/<source_id>/poll-now', methods=['POST'])
@login_required
def admin_polling_poll_now(source_id):
    """Ask the polling service to poll one source immediately (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource
    from polling_service import polling_service

    polling_page = url_for('admin_polling')
    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(polling_page)

    # Hand the request to the polling service and report the outcome
    try:
        polling_service.poll_now(source_id)
        flash(f'Polling started for {source.display_name}', 'success')
    except Exception as err:
        flash(f'Error starting poll: {str(err)}', 'error')
        logger.error(f"Error triggering poll for {source_id}: {err}")

    return redirect(polling_page)
@app.route('/admin/polling/<source_id>/delete', methods=['POST'])
@login_required
def admin_polling_delete(source_id):
    """Remove one poll source from the database (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    polling_page = url_for('admin_polling')
    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(polling_page)

    # Remember the name for the confirmation message before the row is gone
    removed_name = source.display_name
    db.session.delete(source)
    db.session.commit()

    flash(f'Deleted polling source: {removed_name}', 'success')
    logger.info(f"Admin {current_user.id} deleted poll source {source_id}")
    return redirect(polling_page)
@app.route('/admin/polling/<source_id>/logs')
@login_required
def admin_polling_logs(source_id):
    """Show up to 50 log entries for one poll source (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource, PollLog

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    # Cap the result set at 50 rows
    log_entries = source.logs.limit(50).all()
    return render_template('admin_polling_logs.html', source=source, logs=log_entries)
# ============================================================
# ERROR HANDLERS
# ============================================================
@app.errorhandler(404)
def not_found(e):
    """Render the custom page for HTTP 404 responses."""
    page = render_template('404.html')
    return page, 404
@app.errorhandler(500)
def server_error(e):
    """Render the custom page for HTTP 500 responses."""
    page = render_template('500.html')
    return page, 500
# ============================================================
# INITIALIZATION
# ============================================================
if __name__ == '__main__':
    print("✓ BalanceBoard starting...")
    print("✓ Database: PostgreSQL with SQLAlchemy")
    print("✓ Password hashing: bcrypt")
    print("✓ Authentication: Flask-Login")

    # The server binds to every interface, so the interactive debugger (which
    # allows arbitrary code execution) must be opt-in rather than always on.
    # Set FLASK_DEBUG=1 for local development.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=DEFAULT_PORT, debug=debug_enabled)