"""
BalanceBoard Web Application
Flask server with user authentication and content serving.
"""
import os
import re
import logging
import time
from pathlib import Path
from werkzeug.utils import secure_filename
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, abort, session, jsonify
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from dotenv import load_dotenv
from functools import lru_cache
from collections import defaultdict
from authlib.integrations.flask_client import OAuth
from urllib.parse import quote_plus, urlencode
from database import init_db, db
from models import User, bcrypt
from user_service import UserService
import json
# Load environment variables (python-dotenv) before the os.getenv() lookups below
load_dotenv()
# Configure logging: INFO and above, sent to both app.log and a stream handler
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# Initialize Flask app; static_folder points at themes/, templates at templates/
app = Flask(__name__,
            static_folder='themes',
            template_folder='templates')
# NOTE(review): the fallback below is a fixed, well-known value - make sure
# SECRET_KEY is always provided by the environment outside development.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size
# Auth0 Configuration (every value falls back to an empty string when unset)
app.config['AUTH0_DOMAIN'] = os.getenv('AUTH0_DOMAIN', '')
app.config['AUTH0_CLIENT_ID'] = os.getenv('AUTH0_CLIENT_ID', '')
app.config['AUTH0_CLIENT_SECRET'] = os.getenv('AUTH0_CLIENT_SECRET', '')
app.config['AUTH0_AUDIENCE'] = os.getenv('AUTH0_AUDIENCE', '')
# Configuration constants
# Allowlist consulted by _is_safe_filterset()
ALLOWED_FILTERSETS = {'no_filter', 'safe_content'}
# Lower-case file extensions accepted by _is_allowed_file()
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
# Directory where uploaded avatars are written
UPLOAD_FOLDER = 'static/avatars'
MAX_FILENAME_LENGTH = 100
DEFAULT_PORT = 5021
# Default per_page value for /api/posts
DEFAULT_PAGE_SIZE = 20
MIN_PASSWORD_LENGTH = 8
MAX_USERNAME_LENGTH = 80
MAX_EMAIL_LENGTH = 120
MAX_COMMUNITY_NAME_LENGTH = 100
# Initialize database
init_db(app)
# Initialize bcrypt
bcrypt.init_app(app)
# Initialize Flask-Login; unauthenticated requests to @login_required views
# are sent to the 'login' endpoint with the message below
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Please log in to access this page.'
# Initialize user service
user_service = UserService()
# Initialize polling service
# NOTE(review): imported here rather than at the top of the file - presumably
# to avoid a circular import; confirm before moving it.
from polling_service import polling_service
polling_service.init_app(app)
polling_service.start()
# Initialize OAuth for Auth0
oauth = OAuth(app)
auth0 = oauth.register(
    'auth0',
    client_id=app.config['AUTH0_CLIENT_ID'],
    client_secret=app.config['AUTH0_CLIENT_SECRET'],
    # The OpenID Connect discovery document is published under
    # /.well-known/openid-configuration (hyphen, not underscore).
    server_metadata_url=f'https://{app.config["AUTH0_DOMAIN"]}/.well-known/openid-configuration',
    client_kwargs={
        'scope': 'openid profile email',
    }
)
# Cache for posts and comments - improves performance
# Module-level, in-process state that _load_posts_cache() fills and returns.
# Maps post uuid -> post dict loaded from data/posts/*.json
post_cache = {}
# Maps post uuid -> list of comment dicts loaded from data/comments/*.json
comment_cache = defaultdict(list)
# time.time() of the last refresh; _invalidate_cache() resets it to 0
cache_timestamp = 0
CACHE_DURATION = 300 # 5 minutes
# Security helper functions
def _is_safe_filterset(filterset):
"""Validate filterset name for security"""
if not filterset or not isinstance(filterset, str):
return False
return filterset in ALLOWED_FILTERSETS and re.match(r'^[a-zA-Z0-9_-]+$', filterset)
def _is_safe_path(path):
"""Validate file path for security"""
if not path or not isinstance(path, str):
return False
# Check for directory traversal attempts
if '..' in path or path.startswith('/') or '\\' in path:
return False
# Only allow alphanumeric, dots, hyphens, underscores, and forward slashes
return re.match(r'^[a-zA-Z0-9._/-]+$', path) is not None
def _is_allowed_file(filename):
"""Check if file extension is allowed"""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def _read_json_objects(directory):
    """Yield every JSON object (dict) stored in a ``*.json`` file of *directory*.

    Unreadable, undecodable or non-object files are logged and skipped so one
    bad file cannot abort a whole cache refresh.
    """
    if not directory.exists():
        return
    for json_file in directory.glob('*.json'):
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError
            logger.warning(f"Error reading JSON file {json_file}: {e}")
            continue
        if not isinstance(data, dict):
            logger.warning(f"Skipping {json_file}: top-level JSON value is not an object")
            continue
        yield data


def _load_posts_cache():
    """Return ``(post_cache, comment_cache)``, reloading from disk when stale.

    The cache is reused while it is non-empty and younger than CACHE_DURATION
    seconds. Otherwise both containers are cleared in place and refilled from
    data/posts/*.json (keyed by the post's 'uuid') and data/comments/*.json
    (grouped by the comment's 'post_uuid'). Entries without the relevant key
    are ignored.
    """
    global cache_timestamp
    current_time = time.time()
    if post_cache and current_time - cache_timestamp < CACHE_DURATION:
        return post_cache, comment_cache

    # Clear in place so other references to the containers stay valid
    post_cache.clear()
    comment_cache.clear()

    for post_data in _read_json_objects(Path('data/posts')):
        post_uuid = post_data.get('uuid')
        if post_uuid:
            post_cache[post_uuid] = post_data

    for comment_data in _read_json_objects(Path('data/comments')):
        post_uuid = comment_data.get('post_uuid')
        if post_uuid:
            comment_cache[post_uuid].append(comment_data)

    cache_timestamp = current_time
    logger.info(f"Cache refreshed: {len(post_cache)} posts, {len(comment_cache)} comment groups")
    return post_cache, comment_cache
def _invalidate_cache():
"""Invalidate the cache to force refresh"""
global cache_timestamp
cache_timestamp = 0
def _validate_user_settings(settings_str):
"""Validate and sanitize user settings JSON"""
try:
if not settings_str:
return {}
settings = json.loads(settings_str)
if not isinstance(settings, dict):
logger.warning("User settings must be a JSON object")
return {}
# Validate specific fields
validated = {}
# Filter set validation
if 'filter_set' in settings:
filter_set = settings['filter_set']
if isinstance(filter_set, str) and _is_safe_filterset(filter_set):
validated['filter_set'] = filter_set
# Communities validation
if 'communities' in settings:
communities = settings['communities']
if isinstance(communities, list):
# Validate each community name
safe_communities = []
for community in communities:
if isinstance(community, str) and len(community) <= MAX_COMMUNITY_NAME_LENGTH and re.match(r'^[a-zA-Z0-9_-]+$', community):
safe_communities.append(community)
validated['communities'] = safe_communities
# Experience settings validation
if 'experience' in settings:
exp = settings['experience']
if isinstance(exp, dict):
safe_exp = {}
bool_fields = ['infinite_scroll', 'auto_refresh', 'push_notifications', 'dark_patterns_opt_in']
for field in bool_fields:
if field in exp and isinstance(exp[field], bool):
safe_exp[field] = exp[field]
validated['experience'] = safe_exp
return validated
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"Invalid user settings JSON: {e}")
return {}
# Add custom Jinja filters
@app.template_filter('nl2br')
def nl2br_filter(text):
    """Convert newlines to <br> tags.

    Each newline is replaced by '<br>' followed by the newline itself.
    Falsy input (None, '') is returned unchanged.
    """
    if not text:
        return text
    return text.replace('\n', '<br>\n')
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: look the user up through the user service."""
    user = user_service.get_user_by_id(user_id)
    return user
# ============================================================
# STATIC CONTENT ROUTES
# ============================================================
@app.before_request
def check_first_user():
    """Send visitors to the admin setup page while the user table is empty.

    Nothing happens for static endpoints, for the auth/setup/theme/logo
    endpoints listed below, or for authenticated users. If the user count
    cannot be read, the problem is logged and the request continues.
    """
    exempt_endpoints = ['login', 'signup', 'admin_setup', 'serve_theme', 'serve_logo']
    endpoint = request.endpoint
    if endpoint and (endpoint.startswith('static') or endpoint in exempt_endpoints):
        return None

    # Logged-in users imply that at least one account exists
    if current_user.is_authenticated:
        return None

    try:
        if User.query.count() == 0:
            return redirect(url_for('admin_setup'))
    except Exception as e:
        # If database is not ready, skip check
        logger.warning(f"Database not ready for user count check: {e}")
    return None
@app.route('/')
def index():
    """Render the dashboard for the current visitor.

    Anonymous visitors get default settings (no filter, no communities, every
    experience flag off) plus ``anonymous=True``. Authenticated users get
    their stored settings, or an empty dict when the stored JSON is invalid.
    """
    if not current_user.is_authenticated:
        # Anonymous mode - allow browsing with default settings
        experience_flags = ('infinite_scroll', 'auto_refresh', 'push_notifications', 'dark_patterns_opt_in')
        anonymous_settings = {
            'filter_set': 'no_filter',
            'communities': [],
            'experience': dict.fromkeys(experience_flags, False),
        }
        return render_template('dashboard.html', user_settings=anonymous_settings, anonymous=True)

    stored = current_user.settings
    try:
        user_settings = json.loads(stored) if stored else {}
    except (json.JSONDecodeError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
        user_settings = {}
    return render_template('dashboard.html', user_settings=user_settings)
@app.route('/feed/')
@app.route('/feed/<filterset>')
def feed_content(filterset='no_filter'):
    """Serve the pre-rendered feed page for a filterset.

    ``/feed/`` serves the default 'no_filter' feed and ``/feed/<filterset>``
    serves the named one. Names rejected by _is_safe_filterset() result in
    a 404.
    """
    # Validate filterset to prevent directory traversal
    if not _is_safe_filterset(filterset):
        logger.warning(f"Invalid filterset requested: {filterset}")
        abort(404)

    # Additional path validation. The expected prefix is built with os.sep
    # because normpath() returns the platform's native separators.
    feed_root = 'active_html'
    safe_path = os.path.normpath(os.path.join(feed_root, filterset, 'index.html'))
    if not safe_path.startswith(feed_root + os.sep):
        logger.warning(f"Path traversal attempt detected: {filterset}")
        abort(404)

    return send_from_directory(f'active_html/{filterset}', 'index.html')
def load_platform_config():
    """Read platform_config.json from the working directory.

    Returns the parsed JSON, or an empty configuration
    (``{"platforms": {}, "collection_targets": []}``) when the file is
    missing, unreadable or not valid JSON; the failure is logged.
    """
    try:
        with open('platform_config.json', 'r') as config_file:
            config = json.load(config_file)
    except (FileNotFoundError, json.JSONDecodeError, IOError) as e:
        logger.warning(f"Could not load platform config: {e}")
        return {"platforms": {}, "collection_targets": []}
    return config
def get_display_name_for_source(platform, source, platform_config):
    """Return the human-readable name for a post's source.

    Args:
        platform: Platform key looked up in ``platform_config['platforms']``.
        source: Community/source id within that platform (may be empty).
        platform_config: Parsed platform configuration, or a falsy value.

    Returns:
        - *source* unchanged when the configuration has no 'platforms' key.
        - For platforms with 'supports_communities': the matching community's
          'display_name'; otherwise the platform 'prefix' followed by
          *source*, or the platform name when *source* is empty.
        - For all other platforms: the platform 'name', falling back to the
          *platform* key itself.
    """
    if not platform_config or 'platforms' not in platform_config:
        return source

    platform_info = platform_config['platforms'].get(platform, {})
    platform_name = platform_info.get('name', platform)

    # For platforms without communities, use the platform name
    if not platform_info.get('supports_communities'):
        return platform_name

    # Incomplete community entries (no 'id' or no 'display_name') are skipped
    # instead of raising KeyError, so one bad config entry cannot break callers.
    for community in platform_info.get('communities', []):
        if 'id' in community and community['id'] == source and 'display_name' in community:
            return community['display_name']

    # Fallback to prefix + source for Reddit-like platforms
    if source:
        return f"{platform_info.get('prefix', '')}{source}"
    return platform_name
@app.route('/api/posts')
def api_posts():
    """API endpoint to get posts data with pagination and filtering.

    Query parameters:
        page: 1-based page number (default 1).
        per_page: page size (default DEFAULT_PAGE_SIZE).
        community: case-insensitive filter on the post's 'source'.
        platform: case-insensitive filter on the post's 'platform'.

    Missing, non-numeric or non-positive ``page``/``per_page`` values fall
    back to their defaults. Posts are returned newest first together with a
    'pagination' object. On an unexpected failure the error is logged and an
    empty result with an 'error' key is returned.
    """
    def _positive_int(raw_value, default):
        # Fall back to the default for missing, non-numeric or < 1 values;
        # this also prevents a division by zero in the page-count maths.
        try:
            number = int(raw_value)
        except (TypeError, ValueError):
            return default
        return number if number >= 1 else default

    page = _positive_int(request.args.get('page'), 1)
    per_page = _positive_int(request.args.get('per_page'), DEFAULT_PAGE_SIZE)
    community = request.args.get('community', '').lower()
    platform = request.args.get('platform', '').lower()

    try:
        platform_config = load_platform_config()
        # Use cached data for better performance
        cached_posts, cached_comments = _load_posts_cache()

        posts = []
        for post_uuid, post_data in cached_posts.items():
            # 'or' guards against explicit nulls in the stored JSON
            post_source = post_data.get('source') or ''
            post_platform = post_data.get('platform') or ''
            if community and post_source.lower() != community:
                continue
            if platform and post_platform.lower() != platform:
                continue

            content = post_data.get('content') or ''
            # Only add an ellipsis when the content was actually truncated
            content_preview = content[:200] + '...' if len(content) > 200 else content

            posts.append({
                'id': post_uuid,
                'title': post_data.get('title', 'Untitled'),
                'author': post_data.get('author', 'Unknown'),
                'platform': post_data.get('platform', 'unknown'),
                'score': post_data.get('score', 0),
                'timestamp': post_data.get('timestamp', 0),
                'url': f'/post/{post_uuid}',
                'comments_count': len(cached_comments.get(post_uuid, [])),
                'content_preview': content_preview,
                'source': post_data.get('source', ''),
                'source_display': get_display_name_for_source(
                    post_data.get('platform', ''),
                    post_data.get('source', ''),
                    platform_config
                ),
                'tags': post_data.get('tags', []),
                'external_url': post_data.get('url', '')
            })

        # Sort by timestamp (newest first); a null timestamp sorts as 0
        posts.sort(key=lambda item: item['timestamp'] or 0, reverse=True)

        # Calculate pagination
        total_posts = len(posts)
        total_pages = (total_posts + per_page - 1) // per_page
        start_idx = (page - 1) * per_page

        return {
            'posts': posts[start_idx:start_idx + per_page],
            'pagination': {
                'current_page': page,
                'total_pages': total_pages,
                'total_posts': total_posts,
                'per_page': per_page,
                'has_next': page < total_pages,
                'has_prev': page > 1
            }
        }
    except Exception:
        # Details go to the log; the client only gets a generic message
        logger.exception("Error loading posts")
        return {
            'posts': [],
            'error': 'Failed to load posts',
            'pagination': {
                'current_page': 1,
                'total_pages': 0,
                'total_posts': 0,
                'per_page': DEFAULT_PAGE_SIZE,
                'has_next': False,
                'has_prev': False
            }
        }
@app.route('/api/platforms')
def api_platforms():
    """API endpoint to get platform configuration and available communities.

    Counts the posts in data/posts/*.json per (platform, source) pair and
    returns one community entry per pair, enriched from the platform
    configuration where a matching community is configured. Entries are
    sorted by post count, highest first. Unreadable or malformed post files
    are skipped. On an unexpected failure the error is logged and an empty
    result with an 'error' key is returned.
    """
    try:
        platform_config = load_platform_config()
        platforms = platform_config.get('platforms', {})

        # Count posts per source to show actual available communities.
        # Tuple keys avoid mis-splitting names that contain ':'.
        source_counts = defaultdict(int)
        for post_file in Path('data/posts').glob('*.json'):
            try:
                with open(post_file, 'r', encoding='utf-8') as f:
                    post_data = json.load(f)
            except (ValueError, OSError) as e:
                logger.warning(f"Skipping unreadable post file {post_file}: {e}")
                continue
            if not isinstance(post_data, dict):
                continue
            platform = str(post_data.get('platform', 'unknown'))
            source = str(post_data.get('source', ''))
            source_counts[(platform, source)] += 1

        # Build community list from actual data and platform config
        communities = []
        for (platform, source), count in source_counts.items():
            platform_info = platforms.get(platform, {})
            default_icon = platform_info.get('icon', '📄')
            display_name = get_display_name_for_source(platform, source, platform_config)

            community_info = None
            if platform_info.get('supports_communities'):
                community_info = next(
                    (c for c in platform_info.get('communities', []) if c.get('id') == source),
                    None
                )

            if community_info:
                entry = {
                    'platform': platform,
                    'id': source,
                    # .get() with fallbacks: an incomplete config entry must
                    # not turn the whole response into an error
                    'name': community_info.get('name', source or platform),
                    'display_name': community_info.get('display_name', display_name),
                    'icon': community_info.get('icon', default_icon),
                    'count': count,
                    'description': community_info.get('description', '')
                }
            else:
                # Fallback for sources not in config
                entry = {
                    'platform': platform,
                    'id': source,
                    'name': source or platform,
                    'display_name': display_name,
                    'icon': default_icon,
                    'count': count,
                    'description': f"Posts from {display_name}"
                }
            communities.append(entry)

        # Sort communities by count (most posts first)
        communities.sort(key=lambda entry: entry['count'], reverse=True)

        return {
            'platforms': platforms,
            'communities': communities,
            'total_communities': len(communities)
        }
    except Exception:
        # Details go to the log; the client only gets a generic message
        logger.exception("Error loading platform configuration")
        return {
            'platforms': {},
            'communities': [],
            'total_communities': 0,
            'error': 'Failed to load platform configuration'
        }
@app.route('/api/content-timestamp')
def api_content_timestamp():
    """Report the newest modification time among data/posts/*.json.

    Responds with ``{'timestamp': <mtime>}``; the value is 0 when the
    directory is missing or holds no post files. Any failure is logged and
    answered with a 500 JSON error.
    """
    try:
        posts_dir = Path('data/posts')
        if not posts_dir.exists():
            return jsonify({'timestamp': 0})

        # Seeding the candidates with 0 keeps the result at 0 when no file
        # has a larger modification time
        modification_times = [0] + [post_file.stat().st_mtime for post_file in posts_dir.glob('*.json')]
        return jsonify({'timestamp': max(modification_times)})
    except Exception as e:
        logger.error(f"Error getting content timestamp: {e}")
        return jsonify({'error': 'Failed to get content timestamp'}), 500
@app.route('/post/<post_id>')
def post_detail(post_id):
    """Serve individual post detail page with modern theme.

    Looks the post and its comments up in the posts cache, adds a
    'source_display' name and renders post_detail.html with the comments
    sorted by timestamp (oldest first). Unknown posts, and any unexpected
    error while building the page, render 404.html with status 404.
    """
    try:
        platform_config = load_platform_config()
        # Use cached data for better performance
        cached_posts, cached_comments = _load_posts_cache()

        post_data = cached_posts.get(post_id)
        if not post_data:
            return render_template('404.html'), 404

        # Work on copies so the shared cache entries are never mutated
        post = dict(post_data)
        post['source_display'] = get_display_name_for_source(
            post_data.get('platform', ''),
            post_data.get('source', ''),
            platform_config
        )
        comments = sorted(
            cached_comments.get(post_id, []),
            key=lambda comment: comment.get('timestamp', 0)
        )

        # Load user settings if authenticated
        user_settings = {}
        if current_user.is_authenticated:
            try:
                user_settings = json.loads(current_user.settings) if current_user.settings else {}
            except (json.JSONDecodeError, TypeError):
                user_settings = {}

        return render_template('post_detail.html', post=post, comments=comments, user_settings=user_settings)
    except Exception:
        logger.exception(f"Error loading post {post_id}")
        return render_template('404.html'), 404
@app.route('/themes/<path:filename>')
def serve_theme(filename):
    """Serve theme files (CSS, JS) from the themes directory.

    The ``path`` converter lets *filename* contain sub-directories. Paths
    rejected by _is_safe_path() result in a 404.
    """
    # Validate filename to prevent directory traversal
    if not _is_safe_path(filename) or '..' in filename:
        logger.warning(f"Unsafe theme file requested: {filename}")
        abort(404)
    return send_from_directory('themes', filename)
@app.route('/logo.png')
def serve_logo():
    """Return logo.png, served relative to '.'."""
    logo_directory, logo_name = '.', 'logo.png'
    return send_from_directory(logo_directory, logo_name)
@app.route('/static/<path:filename>')
def serve_static(filename):
    """Serve static files (avatars, etc.) from the static directory.

    The ``path`` converter lets *filename* contain sub-directories. Paths
    rejected by _is_safe_path() result in a 404.
    """
    # NOTE(review): the app is created with static_folder='themes', so Flask
    # may already register its own /static/<path:filename> rule - confirm
    # which of the two rules actually handles these requests.
    # Validate filename to prevent directory traversal
    if not _is_safe_path(filename) or '..' in filename:
        logger.warning(f"Unsafe static file requested: {filename}")
        abort(404)
    return send_from_directory('static', filename)
# ============================================================
# AUTHENTICATION ROUTES
# ============================================================
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Login page.

    GET renders the form. POST authenticates the submitted username and
    password through the user service, logs the user in and redirects to the
    ``next`` query parameter when it is a local path, otherwise to the index
    page. Already authenticated users are redirected to the index page.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        remember = request.form.get('remember', False) == 'on'

        if not user_service:
            flash('User service not available', 'error')
            return render_template('login.html')

        user = user_service.authenticate(username, password)
        if user:
            login_user(user, remember=remember)
            flash(f'Welcome back, {user.username}!', 'success')

            # 'next' comes from the query string and is attacker-controllable.
            # Only follow it when it is a path on this site: it must start
            # with a single '/' and contain no backslash (browsers treat
            # '//host' and '/\host' as external URLs).
            next_page = request.args.get('next') or ''
            is_local_path = (
                next_page.startswith('/')
                and not next_page.startswith('//')
                and '\\' not in next_page
            )
            return redirect(next_page if is_local_path else url_for('index'))

        flash('Invalid username or password', 'error')

    return render_template('login.html')
@app.route('/password-reset-request', methods=['GET', 'POST'])
def password_reset_request():
    """Request a password reset.

    GET renders the request form. POST always flashes the same neutral
    message, and - when an account with a local password exists for the
    email - generates a reset token and logs the reset URL. The URL is only
    shown in the browser while the app runs in debug mode.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        email = request.form.get('email', '').strip().lower()
        if not email:
            flash('Please enter your email address', 'error')
            return render_template('password_reset_request.html')

        # Find user by email
        user = User.query.filter_by(email=email).first()

        # Always show success message for security (don't reveal if email exists)
        flash('If an account exists with that email, a password reset link has been sent.', 'success')

        if user and user.password_hash:  # Only send reset if user has a password (not OAuth only)
            token = user.generate_reset_token()
            reset_url = url_for('password_reset', token=token, _external=True)

            # Log the reset URL (in production, this would be emailed)
            logger.info(f"Password reset requested for {email}. Reset URL: {reset_url}")

            # Showing the link to whoever submitted the form would let anyone
            # reset any account and would reveal which emails are registered,
            # so it is restricted to debug mode.
            if app.debug:
                flash(f'Reset link (development only): {reset_url}', 'info')

        return redirect(url_for('login'))

    return render_template('password_reset_request.html')
@app.route('/password-reset/<token>', methods=['GET', 'POST'])
def password_reset(token):
    """Reset password with token.

    The token from the URL must belong to a user and pass
    ``verify_reset_token``; otherwise the visitor is sent back to the login
    page. POST validates the new password (minimum MIN_PASSWORD_LENGTH
    characters, matching confirmation), stores it and clears the token.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    # Find user by token
    user = User.query.filter_by(reset_token=token).first()
    if not user or not user.verify_reset_token(token):
        flash('Invalid or expired reset token', 'error')
        return redirect(url_for('login'))

    if request.method == 'POST':
        password = request.form.get('password', '')
        confirm_password = request.form.get('confirm_password', '')

        # Same minimum length as signup and admin setup
        if not password or len(password) < MIN_PASSWORD_LENGTH:
            flash(f'Password must be at least {MIN_PASSWORD_LENGTH} characters', 'error')
            return render_template('password_reset.html')

        if password != confirm_password:
            flash('Passwords do not match', 'error')
            return render_template('password_reset.html')

        # Set new password
        # NOTE(review): no commit happens here - presumably set_password() /
        # clear_reset_token() persist the change themselves; confirm in models.
        user.set_password(password)
        user.clear_reset_token()

        flash('Your password has been reset successfully. You can now log in.', 'success')
        return redirect(url_for('login'))

    return render_template('password_reset.html')
# Auth0 Routes
@app.route('/auth0/login')
def auth0_login():
    """Start the Auth0 login flow, using the absolute auth0_callback URL as redirect URI."""
    callback_url = url_for('auth0_callback', _external=True)
    return auth0.authorize_redirect(callback_url)
@app.route('/auth0/callback')
def auth0_callback():
    """Handle Auth0 callback and create/login user.

    Exchanges the callback for a token, reads the user info ('sub', 'email',
    optional 'nickname'/'preferred_username') and then:
    - logs in the user already linked to that Auth0 id, or
    - links the Auth0 id to an existing account with the same email, or
    - creates a new account with a unique username.
    Finally redirects to the ``next`` query parameter when it is a local
    path, otherwise to the index page. Any failure is logged and the visitor
    is sent back to the login page.
    """
    try:
        # Get the access token from Auth0
        token = auth0.authorize_access_token()

        # Get user info from Auth0
        user_info = token.get('userinfo')
        if not user_info:
            user_info = auth0.parse_id_token(token)

        auth0_id = user_info.get('sub')
        email = user_info.get('email')

        # Validate before deriving the username: email.split() below would
        # raise on a missing email.
        if not auth0_id or not email:
            flash('Unable to get user information from Auth0', 'error')
            return redirect(url_for('login'))

        username = (
            user_info.get('nickname')
            or user_info.get('preferred_username')
            or email.split('@')[0]
        )

        # Check if user exists with this Auth0 ID
        user = user_service.get_user_by_auth0_id(auth0_id)

        if user:
            flash(f'Welcome back, {user.username}!', 'success')
        else:
            # Check if user exists with this email (for account linking)
            # NOTE(review): linking trusts the email reported by the identity
            # provider - consider requiring a verified email before linking.
            existing_user = user_service.get_user_by_email(email)
            if existing_user:
                user_service.link_auth0_account(existing_user.id, auth0_id)
                user = existing_user
                flash(f'Account linked successfully! Welcome back, {user.username}!', 'success')
            else:
                # Generate unique username if needed
                base_username = username[:MAX_USERNAME_LENGTH - 3]  # Leave room for suffix
                unique_username = base_username
                counter = 1
                while user_service.username_exists(unique_username):
                    suffix = f"_{counter}"
                    # Trim further when the suffix grows so the result never
                    # exceeds MAX_USERNAME_LENGTH
                    unique_username = f"{base_username[:MAX_USERNAME_LENGTH - len(suffix)]}{suffix}"
                    counter += 1

                user_id = user_service.create_user(
                    username=unique_username,
                    email=email,
                    password=None,  # No password for OAuth users
                    is_admin=False,
                    auth0_id=auth0_id
                )
                if not user_id:
                    flash('Failed to create user account', 'error')
                    return redirect(url_for('login'))

                user = user_service.get_user_by_id(user_id)
                flash(f'Account created successfully! Welcome, {user.username}!', 'success')

        # Log in the user
        if user:
            login_user(user, remember=True)
            # Store Auth0 info in session for future use
            session['auth0_user_info'] = user_info

        # Only follow 'next' when it is a path on this site (single leading
        # '/', no backslash); anything else could redirect to another host.
        next_page = request.args.get('next') or ''
        is_local_path = (
            next_page.startswith('/')
            and not next_page.startswith('//')
            and '\\' not in next_page
        )
        return redirect(next_page if is_local_path else url_for('index'))
    except Exception as e:
        logger.error(f"Auth0 callback error: {e}")
        flash('Authentication failed. Please try again.', 'error')
        return redirect(url_for('login'))
@app.route('/auth0/logout')
@login_required
def auth0_logout():
    """End the local session, then redirect to Auth0's logout endpoint.

    The Auth0 URL carries the configured client id and the absolute index
    URL as the ``returnTo`` target.
    """
    # Drop local state first
    session.clear()
    logout_user()

    auth0_domain = app.config['AUTH0_DOMAIN']
    query_string = urlencode(
        {
            'returnTo': url_for('index', _external=True),
            'client_id': app.config['AUTH0_CLIENT_ID'],
        },
        quote_via=quote_plus,
    )
    return redirect(f'https://{auth0_domain}/v2/logout?' + query_string)
@app.route('/admin-setup', methods=['GET', 'POST'])
def admin_setup():
    """Set up the very first (admin) account.

    Redirects to the login page when any user already exists. GET shows the
    form; POST validates the fields and creates the admin through the user
    service. A database error during the existence check is logged and the
    request continues.
    """
    # Refuse once any account exists
    try:
        if User.query.count() > 0:
            flash('Admin user already exists.', 'info')
            return redirect(url_for('login'))
    except Exception as e:
        logger.warning(f"Database error checking existing users: {e}")

    if request.method != 'POST':
        return render_template('admin_setup.html')

    form = request.form
    username = form.get('username')
    email = form.get('email')
    password = form.get('password')
    password_confirm = form.get('password_confirm')

    # First failing rule wins
    if not username or not email or not password:
        problem = 'All fields are required'
    elif password != password_confirm:
        problem = 'Passwords do not match'
    elif len(password) < MIN_PASSWORD_LENGTH:
        problem = f'Password must be at least {MIN_PASSWORD_LENGTH} characters'
    else:
        problem = None

    if problem is not None:
        flash(problem, 'error')
        return render_template('admin_setup.html')

    if user_service.create_user(username, email, password, is_admin=True):
        flash('Admin account created successfully! Please log in.', 'success')
        return redirect(url_for('login'))

    flash('Error creating admin account. Please try again.', 'error')
    return render_template('admin_setup.html')
@app.route('/signup', methods=['GET', 'POST'])
def signup():
    """Self-service account registration.

    Authenticated users are redirected to the index page. GET shows the
    form; POST validates the submitted fields, rejects taken usernames and
    emails, and creates the account through the user service.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('signup.html')

    form = request.form
    username = form.get('username')
    email = form.get('email')
    password = form.get('password')
    password_confirm = form.get('password_confirm')

    # First failing rule wins; the service lookups only run once the cheaper
    # checks have passed
    if not user_service:
        problem = 'User service not available'
    elif not username or not email or not password:
        problem = 'All fields are required'
    elif password != password_confirm:
        problem = 'Passwords do not match'
    elif len(password) < MIN_PASSWORD_LENGTH:
        problem = f'Password must be at least {MIN_PASSWORD_LENGTH} characters'
    elif user_service.username_exists(username):
        problem = 'Username already taken'
    elif user_service.email_exists(email):
        problem = 'Email already registered'
    else:
        problem = None

    if problem is not None:
        flash(problem, 'error')
        return render_template('signup.html')

    if user_service.create_user(username, email, password):
        flash('Account created successfully! Please log in.', 'success')
        return redirect(url_for('login'))

    flash('Error creating account. Please try again.', 'error')
    return render_template('signup.html')
@app.route('/logout')
@login_required
def logout():
    """Log the current user out, flash a notice and return to the index page."""
    logout_user()
    flash('You have been logged out.', 'info')
    home_url = url_for('index')
    return redirect(home_url)
@app.route('/settings')
@login_required
def settings():
    """Main settings page.

    Renders settings.html with the current user's stored settings and the
    filter sets from filtersets.json. Invalid settings JSON or a missing or
    invalid filtersets.json each fall back to an empty dict.
    """
    # Load user settings
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (json.JSONDecodeError, TypeError):
        user_settings = {}

    # Load available filter sets
    try:
        with open('filtersets.json', 'r') as f:
            filter_sets = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers JSONDecodeError and text decoding errors
        logger.warning(f"Could not load filtersets.json: {e}")
        filter_sets = {}

    return render_template('settings.html',
                           user=current_user,
                           user_settings=user_settings,
                           filter_sets=filter_sets)
@app.route('/settings/profile', methods=['GET', 'POST'])
@login_required
def settings_profile():
    """Profile settings page.

    GET renders the profile form with the list of default avatars. POST
    validates the submitted username and email (both required, not used by
    another account), optionally switches the profile picture to one of the
    default avatars, saves the changes and redirects to the settings page.
    Validation failures re-render the form with the same context as GET.
    """
    # Available default avatars
    default_avatars = [
        {'id': 'default_1', 'name': 'Gradient Blue', 'bg': 'linear-gradient(135deg, #667eea 0%, #764ba2 100%)'},
        {'id': 'default_2', 'name': 'Gradient Green', 'bg': 'linear-gradient(135deg, #4facfe 0%, #00f2fe 100%)'},
        {'id': 'default_3', 'name': 'Gradient Orange', 'bg': 'linear-gradient(135deg, #fa709a 0%, #fee140 100%)'},
        {'id': 'default_4', 'name': 'Gradient Purple', 'bg': 'linear-gradient(135deg, #a8edea 0%, #fed6e3 100%)'},
        {'id': 'default_5', 'name': 'Brand Colors', 'bg': 'linear-gradient(135deg, #4db6ac 0%, #26a69a 100%)'},
        {'id': 'default_6', 'name': 'Sunset', 'bg': 'linear-gradient(135deg, #ff7e5f 0%, #feb47b 100%)'},
    ]

    def _render_form():
        # Every render passes the avatar list so the form looks the same
        # after a validation error as on the initial GET
        return render_template('settings_profile.html', user=current_user, default_avatars=default_avatars)

    if request.method != 'POST':
        return _render_form()

    username = request.form.get('username')
    email = request.form.get('email')
    default_avatar = request.form.get('default_avatar')

    # Validation
    if not username or not email:
        flash('Username and email are required', 'error')
        return _render_form()

    # Check if username is taken by another user
    if username != current_user.username and user_service.username_exists(username):
        flash('Username already taken', 'error')
        return _render_form()

    # Check if email is taken by another user
    if email != current_user.email and user_service.email_exists(email):
        flash('Email already registered', 'error')
        return _render_form()

    # Update user
    current_user.username = username
    current_user.email = email

    # Only accept ids from the list above; the value is interpolated into a
    # URL path, so arbitrary form input must not reach it
    if default_avatar in {avatar['id'] for avatar in default_avatars}:
        current_user.profile_picture_url = f"/static/default-avatars/{default_avatar}.png"

    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error saving profile for user {current_user.id}: {e}")
        flash('Error saving profile', 'error')
        return _render_form()

    flash('Profile updated successfully', 'success')
    return redirect(url_for('settings'))
@app.route('/settings/communities', methods=['GET', 'POST'])
@login_required
def settings_communities():
    """Community/source selection settings.

    GET renders the selection form with the user's current choices. POST
    stores the submitted community ids in the user's settings under
    'communities' and redirects to the settings page. Submitted ids that are
    too long or contain characters other than letters, digits, '_' and '-'
    are dropped.
    """
    # Load current settings; anything unparsable or not a JSON object is
    # treated as empty
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (json.JSONDecodeError, TypeError):
        user_settings = {}
    if not isinstance(user_settings, dict):
        user_settings = {}

    if request.method == 'POST':
        # Same acceptance rule as _validate_user_settings() applies to
        # community names
        user_settings['communities'] = [
            community for community in request.form.getlist('communities')
            if len(community) <= MAX_COMMUNITY_NAME_LENGTH
            and re.fullmatch(r'[a-zA-Z0-9_-]+', community)
        ]

        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Community preferences updated', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving community settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')
        return redirect(url_for('settings'))

    selected_communities = user_settings.get('communities', [])

    # Available communities
    available_communities = [
        {'id': 'programming', 'name': 'Programming', 'platform': 'reddit'},
        {'id': 'python', 'name': 'Python', 'platform': 'reddit'},
        {'id': 'technology', 'name': 'Technology', 'platform': 'reddit'},
        {'id': 'hackernews', 'name': 'Hacker News', 'platform': 'hackernews'},
        {'id': 'lobsters', 'name': 'Lobsters', 'platform': 'lobsters'},
        {'id': 'stackoverflow', 'name': 'Stack Overflow', 'platform': 'stackoverflow'},
    ]

    return render_template('settings_communities.html',
                           user=current_user,
                           available_communities=available_communities,
                           selected_communities=selected_communities)
@app.route('/settings/filters', methods=['GET', 'POST'])
@login_required
def settings_filters():
    """Filter settings page.

    GET renders the form with the filter sets from filtersets.json and the
    user's current choice (default 'no_filter'). POST validates the chosen
    filter set with _is_safe_filterset(), stores it in the user's validated
    settings and redirects to the settings page.
    """
    if request.method == 'POST':
        selected_filter = request.form.get('filter_set', 'no_filter')

        # Load and validate current settings
        user_settings = _validate_user_settings(current_user.settings)

        # Validate new filter setting
        if not _is_safe_filterset(selected_filter):
            flash('Invalid filter selection', 'error')
            return redirect(url_for('settings'))
        user_settings['filter_set'] = selected_filter

        # Save validated settings
        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Filter settings updated successfully', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving filter settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')
        return redirect(url_for('settings'))

    # Load current settings; anything unparsable or not a JSON object is
    # treated as empty
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (json.JSONDecodeError, TypeError):
        user_settings = {}
    if not isinstance(user_settings, dict):
        user_settings = {}
    current_filter = user_settings.get('filter_set', 'no_filter')

    # Load available filter sets
    try:
        with open('filtersets.json', 'r') as f:
            filter_sets = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers JSONDecodeError and text decoding errors
        logger.warning(f"Could not load filtersets.json: {e}")
        filter_sets = {}

    return render_template('settings_filters.html',
                           user=current_user,
                           filter_sets=filter_sets,
                           current_filter=current_filter)
@app.route('/settings/experience', methods=['GET', 'POST'])
@login_required
def settings_experience():
    """Experience and behavioral settings page - opt-in addictive features.

    GET renders the form with the user's current experience flags (all False
    when none are stored). POST stores the four flags - True only for
    checkboxes submitted as 'on' - under 'experience' in the user's settings
    and redirects to the settings page.
    """
    experience_fields = ('infinite_scroll', 'auto_refresh', 'push_notifications', 'dark_patterns_opt_in')

    # Load current settings; anything unparsable or not a JSON object is
    # treated as empty
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (json.JSONDecodeError, TypeError):
        user_settings = {}
    if not isinstance(user_settings, dict):
        user_settings = {}

    if request.method == 'POST':
        # All features are opt-in: an unchecked box is absent from the form
        # and therefore stored as False
        user_settings['experience'] = {
            field: request.form.get(field) == 'on' for field in experience_fields
        }

        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Experience settings updated successfully', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving experience settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')
        return redirect(url_for('settings'))

    experience_settings = user_settings.get('experience', dict.fromkeys(experience_fields, False))

    return render_template('settings_experience.html',
                           user=current_user,
                           experience_settings=experience_settings)
@app.route('/upload-avatar', methods=['POST'])
@login_required
def upload_avatar():
    """Upload profile picture.

    Expects a multipart form field named ``avatar``. The file is validated,
    stored under ``UPLOAD_FOLDER`` as ``<user id>_<sanitized filename>`` and
    the user's ``profile_picture_url`` is pointed at it. Every outcome
    (success or failure) flashes a message and redirects back to the
    profile settings page.
    """
    profile_page = url_for('settings_profile')

    file = request.files.get('avatar')
    if file is None or file.filename == '':
        flash('No file selected', 'error')
        return redirect(profile_page)

    # Validate file type
    # NOTE(review): _is_allowed_file is defined elsewhere in this file;
    # presumably it checks the extension against ALLOWED_EXTENSIONS - confirm.
    if not _is_allowed_file(file.filename):
        flash('Invalid file type. Please upload PNG, JPG, or GIF', 'error')
        return redirect(profile_page)

    # Check file size (Flask's MAX_CONTENT_LENGTH handles this too, but double-check)
    if hasattr(file, 'content_length') and file.content_length > app.config['MAX_CONTENT_LENGTH']:
        flash('File too large. Maximum size is 16MB', 'error')
        return redirect(profile_page)

    # Validate and secure filename
    filename = secure_filename(file.filename)
    if not filename or len(filename) > MAX_FILENAME_LENGTH:
        flash('Invalid filename', 'error')
        return redirect(profile_page)

    # Sanitizing can change the name (e.g. strip leading dots), so re-check
    # the extension on the name that will actually be written to disk.
    if not _is_allowed_file(filename):
        flash('Invalid file type. Please upload PNG, JPG, or GIF', 'error')
        return redirect(profile_page)

    # Add user ID to make filename unique and prevent conflicts
    unique_filename = f"{current_user.id}_{filename}"

    # Ensure upload directory exists
    upload_dir = os.path.abspath(UPLOAD_FOLDER)
    os.makedirs(upload_dir, exist_ok=True)
    upload_path = os.path.abspath(os.path.join(upload_dir, unique_filename))

    # Final security check - ensure path is within upload directory.
    # commonpath compares whole path components, so a sibling directory that
    # merely shares a name prefix with upload_dir is not accepted.
    if os.path.commonpath([upload_dir, upload_path]) != upload_dir:
        logger.warning(f"Path traversal attempt in file upload: {upload_path}")
        flash('Invalid file path', 'error')
        return redirect(profile_page)

    try:
        file.save(upload_path)
        logger.info(f"File uploaded successfully: {unique_filename} by user {current_user.id}")
    except Exception as e:
        logger.error(f"Error saving uploaded file: {e}")
        flash('Error saving file', 'error')
        return redirect(profile_page)

    # Update user profile
    try:
        current_user.profile_picture_url = f"/static/avatars/{unique_filename}"
        db.session.commit()
    except Exception as e:
        # Keep the session usable for later requests after a failed commit.
        db.session.rollback()
        logger.error(f"Error updating profile picture for user {current_user.id}: {e}")
        flash('Error saving file', 'error')
        return redirect(profile_page)

    flash('Profile picture updated successfully', 'success')
    return redirect(profile_page)
@app.route('/profile')
@login_required
def profile():
    """Render ``profile.html`` for the logged-in user.

    The template receives the current user under the name ``user``.
    """
    template_context = {'user': current_user}
    return render_template('profile.html', **template_context)
# ============================================================
# ADMIN ROUTES
# ============================================================
@app.route('/admin')
@login_required
def admin_panel():
    """Admin panel - user management.

    Renders ``admin.html`` with every user returned by
    ``user_service.get_all_users()``. Non-admins, or any request made while
    ``user_service`` is falsy, get an error flash and are sent to the index.
    """
    # Work out whether (and why) the request must be turned away.
    refusal = None
    if not current_user.is_admin:
        refusal = 'Access denied. Admin privileges required.'
    elif not user_service:
        refusal = 'User service not available'

    if refusal is not None:
        flash(refusal, 'error')
        return redirect(url_for('index'))

    return render_template('admin.html', users=user_service.get_all_users())
@app.route('/admin/user/<user_id>/delete', methods=['POST'])
@login_required
def admin_delete_user(user_id):
    """Delete user (admin only).

    Args:
        user_id: Identifier of the user to delete, taken from the URL as a
            string. NOTE(review): presumably a UUID string - confirm against
            the User model.

    Returns:
        A redirect to the admin panel (or to the index when the caller is
        not an admin or the user service is unavailable), with the outcome
        reported through a flash message.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    # Same guard as admin_panel: without the service nothing below can work.
    if not user_service:
        flash('User service not available', 'error')
        return redirect(url_for('index'))

    # Prevent self-deletion. Compare as strings so an int/UUID primary key
    # still matches the string captured from the URL.
    if str(current_user.id) == str(user_id):
        flash('You cannot delete your own account!', 'error')
        return redirect(url_for('admin_panel'))

    user = user_service.get_user_by_id(user_id)
    if not user:
        flash('User not found', 'error')
        return redirect(url_for('admin_panel'))

    # Capture the name first; the object may be unusable after deletion.
    username = user.username
    if user_service.delete_user(user_id):
        flash(f'User {username} has been deleted.', 'success')
        logger.info(f"Admin {current_user.id} deleted user {username} ({user_id})")
    else:
        flash('Error deleting user', 'error')
        logger.error(f"Failed to delete user {user_id}")

    return redirect(url_for('admin_panel'))
@app.route('/admin/user/<user_id>/toggle-admin', methods=['POST'])
@login_required
def admin_toggle_admin(user_id):
    """Toggle user admin status (admin only).

    Args:
        user_id: Identifier of the user whose admin flag is flipped, taken
            from the URL as a string. NOTE(review): presumably a UUID
            string - confirm against the User model.

    Returns:
        A redirect to the admin panel (or to the index when the caller is
        not an admin or the user service is unavailable), with the outcome
        reported through a flash message.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    # Same guard as admin_panel: without the service nothing below can work.
    if not user_service:
        flash('User service not available', 'error')
        return redirect(url_for('index'))

    target_user = user_service.get_user_by_id(user_id)
    if target_user:
        new_status = not target_user.is_admin  # Toggle admin status
        user_service.update_user_admin_status(user_id, new_status)
        flash('Admin status updated', 'success')
        # Privilege changes are audited like deletions are.
        logger.info(f"Admin {current_user.id} set admin status of user {user_id} to {new_status}")
    else:
        flash('User not found', 'error')

    return redirect(url_for('admin_panel'))
# NOTE: The duplicate routes that used to live here were removed in favor of the UUID-based routes above.
@app.route('/admin/regenerate_content', methods=['POST'])
@login_required
def admin_regenerate_content():
    """Regenerate all HTML content (admin only).

    Runs ``generate_html.py`` from the current working directory with the
    running Python interpreter, using the ``no_filter`` filterset and the
    ``vanilla-js`` theme. On a zero exit code the application cache is
    invalidated. The outcome is reported through a flash message and the
    caller is redirected to the admin panel.
    """
    # Imported ahead of the try block so the ``except
    # subprocess.TimeoutExpired`` clause can always resolve the name.
    import subprocess
    import sys

    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    try:
        # Absolute paths so the child process does not depend on PATH lookup.
        script_path = os.path.abspath('generate_html.py')
        if not os.path.exists(script_path):
            flash('Content generation script not found', 'error')
            return redirect(url_for('admin_panel'))

        python_exe = os.path.abspath(sys.executable)
        # Argument list (no shell): every argument is a fixed literal.
        cmd = [python_exe, script_path, '--filterset', 'no_filter', '--theme', 'vanilla-js']

        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd=os.path.abspath('.'),
            timeout=300,  # 5 minute timeout
            check=False,  # the return code is inspected below instead
        )

        if result.returncode == 0:
            flash('Content regenerated successfully', 'success')
            logger.info(f"Content regenerated by admin user {current_user.id}")
            # Invalidate cache since content was regenerated
            _invalidate_cache()
        else:
            flash('Error regenerating content', 'error')
            logger.error(f"Content regeneration failed: {result.stderr}")
    except subprocess.TimeoutExpired:
        flash('Content regeneration timed out', 'error')
        logger.error("Content regeneration timed out")
    except Exception as e:
        flash(f'Error regenerating content: {str(e)}', 'error')
        logger.error(f"Content regeneration error: {e}")

    return redirect(url_for('admin_panel'))
@app.route('/admin/clear_cache', methods=['POST'])
@login_required
def admin_clear_cache():
    """Clear application cache (admin only).

    Removes the ``cache`` and ``temp`` directories when they exist, then
    calls ``_invalidate_cache()``. The outcome is flashed and the caller is
    redirected to the admin panel.
    """
    admin_page = url_for('admin_panel')

    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(admin_page)

    try:
        import shutil
        import os

        # Remove on-disk cache/temp directories; absent ones are skipped.
        # ``filter`` is lazy, so each existence check runs right before the
        # corresponding removal.
        for stale_dir in filter(os.path.exists, ('cache', 'temp')):
            shutil.rmtree(stale_dir)

        # Drop the application cache as well.
        _invalidate_cache()

        flash('Cache cleared successfully', 'success')
        logger.info(f"Cache cleared by admin user {current_user.id}")
    except Exception as exc:
        flash(f'Error clearing cache: {str(exc)}', 'error')
        logger.error(f"Cache clearing error: {exc}")

    return redirect(admin_page)
@app.route('/admin/backup_data', methods=['POST'])
@login_required
def admin_backup_data():
    """Create backup of application data (admin only).

    Copies the ``data``, ``templates``, ``themes`` and ``static`` directories
    and a fixed list of project files into a new
    ``backups/balanceboard_backup_<timestamp>`` directory. Entries that do
    not exist are skipped. The outcome is flashed and logged, and the caller
    is redirected to the admin panel.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    try:
        import shutil
        from datetime import datetime

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f'balanceboard_backup_{timestamp}'

        # Create backup directory
        backup_dir = f'backups/{backup_name}'
        os.makedirs(backup_dir, exist_ok=True)

        # Copy important directories
        for dir_name in ('data', 'templates', 'themes', 'static'):
            if os.path.exists(dir_name):
                shutil.copytree(dir_name, f'{backup_dir}/{dir_name}')

        # Copy important files (copy2 also preserves file metadata)
        for file_name in ('app.py', 'models.py', 'database.py', 'filtersets.json'):
            if os.path.exists(file_name):
                shutil.copy2(file_name, backup_dir)

        flash(f'Backup created: {backup_name}', 'success')
        logger.info(f"Backup {backup_name} created by admin user {current_user.id}")
    except Exception as e:
        flash(f'Error creating backup: {str(e)}', 'error')
        # Record the failure server-side too, like the other admin routes do.
        logger.error(f"Backup creation error: {e}")

    return redirect(url_for('admin_panel'))
# ============================================================
# POLLING MANAGEMENT ROUTES
# ============================================================
@app.route('/admin/polling')
@login_required
def admin_polling():
    """Admin polling management page.

    Renders ``admin_polling.html`` with all poll sources (ordered by
    platform, then display name), the polling scheduler status and the
    platform configuration. Non-admins are redirected to the index.
    """
    if not current_user.is_admin:
        flash('Access denied. Admin privileges required.', 'error')
        return redirect(url_for('index'))

    from models import PollSource, PollLog
    from polling_service import polling_service

    ordering = (PollSource.platform, PollSource.display_name)
    page_context = {
        # Every configured poll source
        'sources': PollSource.query.order_by(*ordering).all(),
        # Current scheduler state as reported by the polling service
        'scheduler_status': polling_service.get_status(),
        # Platform config describing the available sources
        'platform_config': load_platform_config(),
    }
    return render_template('admin_polling.html', **page_context)
@app.route('/admin/polling/add', methods=['POST'])
@login_required
def admin_polling_add():
    """Add a new poll source (admin only).

    Reads ``platform``, ``source_id`` (or ``custom_source_id``, which wins
    when non-blank), ``display_name``, ``poll_interval``, ``max_posts``,
    ``fetch_comments`` and ``priority`` from the submitted form and creates
    a disabled ``PollSource``. Invalid input, duplicates and database
    failures are reported through a flash message; every path redirects
    back to the polling admin page (non-admins go to the index).
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    form = request.form
    platform = form.get('platform')
    source_id = form.get('source_id')
    custom_source_id = form.get('custom_source_id')
    display_name = form.get('display_name')
    fetch_comments = form.get('fetch_comments', 'true') == 'true'
    priority = form.get('priority', 'medium')

    # Form values are untrusted text: a blank or non-numeric value must yield
    # a validation message rather than an unhandled ValueError (HTTP 500).
    try:
        poll_interval = int(form.get('poll_interval', 60))
        max_posts = int(form.get('max_posts', 100))
    except (TypeError, ValueError):
        flash('Poll interval and max posts must be whole numbers', 'error')
        return redirect(url_for('admin_polling'))

    # Use custom source if provided, otherwise use dropdown
    if custom_source_id and custom_source_id.strip():
        source_id = custom_source_id.strip()

    if not platform or not source_id or not display_name:
        flash('Missing required fields', 'error')
        return redirect(url_for('admin_polling'))

    # Check if source already exists
    existing = PollSource.query.filter_by(platform=platform, source_id=source_id).first()
    if existing:
        flash(f'Source {platform}:{source_id} already exists', 'warning')
        return redirect(url_for('admin_polling'))

    # Create new source (disabled by default)
    source = PollSource(
        platform=platform,
        source_id=source_id,
        display_name=display_name,
        poll_interval_minutes=poll_interval,
        max_posts=max_posts,
        fetch_comments=fetch_comments,
        priority=priority,
        enabled=False,
        created_by=current_user.id,
    )

    try:
        db.session.add(source)
        db.session.commit()
    except Exception as e:
        # Keep the session usable for later requests after a failed insert.
        db.session.rollback()
        logger.error(f"Error adding poll source {platform}:{source_id}: {e}")
        flash('Error adding polling source', 'error')
        return redirect(url_for('admin_polling'))

    flash(f'Added polling source: {display_name}', 'success')
    logger.info(f"Admin {current_user.id} added poll source {platform}:{source_id}")
    return redirect(url_for('admin_polling'))
@app.route('/admin/polling/<source_id>/toggle', methods=['POST'])
@login_required
def admin_polling_toggle(source_id):
    """Toggle a poll source on/off (admin only).

    Args:
        source_id: Primary key of the ``PollSource`` to toggle, taken from
            the URL as a string. NOTE(review): confirm the key type against
            the PollSource model.

    Returns:
        A redirect to the polling admin page (or to the index for
        non-admins), with the outcome reported through a flash message.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    source.enabled = not source.enabled
    db.session.commit()

    status = 'enabled' if source.enabled else 'disabled'
    flash(f'Polling {status} for {source.display_name}', 'success')
    return redirect(url_for('admin_polling'))
@app.route('/admin/polling/<source_id>/update', methods=['POST'])
@login_required
def admin_polling_update(source_id):
    """Update poll source configuration (admin only).

    Only the form fields that are actually submitted are changed:
    ``poll_interval``, ``max_posts``, ``fetch_comments``, ``priority`` and
    ``display_name``.

    Args:
        source_id: Primary key of the ``PollSource`` to update, taken from
            the URL as a string. NOTE(review): confirm the key type against
            the PollSource model.

    Returns:
        A redirect to the polling admin page (or to the index for
        non-admins), with the outcome reported through a flash message.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    form = request.form

    # Parse the numeric fields before touching the model, so bad input
    # neither raises an unhandled ValueError nor leaves it half-updated.
    raw_interval = form.get('poll_interval')
    raw_max_posts = form.get('max_posts')
    try:
        poll_interval = int(raw_interval) if raw_interval else None
        max_posts = int(raw_max_posts) if raw_max_posts else None
    except ValueError:
        flash('Poll interval and max posts must be whole numbers', 'error')
        return redirect(url_for('admin_polling'))

    # Update all configurable fields that were submitted
    if poll_interval is not None:
        source.poll_interval_minutes = poll_interval
    if max_posts is not None:
        source.max_posts = max_posts
    if form.get('fetch_comments') is not None:
        source.fetch_comments = form.get('fetch_comments') == 'true'
    if form.get('priority'):
        source.priority = form.get('priority')
    if form.get('display_name'):
        source.display_name = form.get('display_name')

    db.session.commit()
    flash(f'Updated settings for {source.display_name}', 'success')
    return redirect(url_for('admin_polling'))
@app.route('/admin/polling/<source_id>/poll-now', methods=['POST'])
@login_required
def admin_polling_poll_now(source_id):
    """Manually trigger polling for a source (admin only).

    Args:
        source_id: Primary key of the ``PollSource`` to poll, taken from the
            URL as a string. NOTE(review): confirm the key type against the
            PollSource model.

    Returns:
        A redirect to the polling admin page (or to the index for
        non-admins), with the outcome reported through a flash message.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource
    from polling_service import polling_service

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    # Hand the source to the polling service; any failure to start is
    # reported to the admin and logged rather than surfacing as a 500.
    try:
        polling_service.poll_now(source_id)
        flash(f'Polling started for {source.display_name}', 'success')
    except Exception as e:
        flash(f'Error starting poll: {str(e)}', 'error')
        logger.error(f"Error triggering poll for {source_id}: {e}")

    return redirect(url_for('admin_polling'))
@app.route('/admin/polling/<source_id>/delete', methods=['POST'])
@login_required
def admin_polling_delete(source_id):
    """Delete a poll source (admin only).

    Args:
        source_id: Primary key of the ``PollSource`` to delete, taken from
            the URL as a string. NOTE(review): confirm the key type against
            the PollSource model.

    Returns:
        A redirect to the polling admin page (or to the index for
        non-admins), with the outcome reported through a flash message.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    # Capture the name first; it is needed for the message after deletion.
    display_name = source.display_name
    try:
        db.session.delete(source)
        db.session.commit()
    except Exception as e:
        # Keep the session usable for later requests after a failed delete.
        db.session.rollback()
        logger.error(f"Error deleting poll source {source_id}: {e}")
        flash('Error deleting polling source', 'error')
        return redirect(url_for('admin_polling'))

    flash(f'Deleted polling source: {display_name}', 'success')
    logger.info(f"Admin {current_user.id} deleted poll source {source_id}")
    return redirect(url_for('admin_polling'))
@app.route('/admin/polling/<source_id>/logs')
@login_required
def admin_polling_logs(source_id):
    """View logs for a specific poll source (admin only).

    Args:
        source_id: Primary key of the ``PollSource`` whose logs are shown,
            taken from the URL as a string. NOTE(review): confirm the key
            type against the PollSource model.

    Returns:
        The rendered ``admin_polling_logs.html`` page, or a redirect with an
        error flash when the caller is not an admin or the source is
        missing.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    # At most 50 log rows, in whatever order the ``logs`` relationship yields.
    logs = source.logs.limit(50).all()

    return render_template('admin_polling_logs.html',
                           source=source,
                           logs=logs)
# ============================================================
# ERROR HANDLERS
# ============================================================
@app.errorhandler(404)
def not_found(e):
    """Render ``404.html`` with a 404 status code.

    ``e`` is the error object supplied to the handler; it is not used.
    """
    page = render_template('404.html')
    return page, 404
@app.errorhandler(500)
def server_error(e):
    """Render ``500.html`` with a 500 status code.

    ``e` ` is the error object supplied to the handler; it is not used.
    """
    page = render_template('500.html')
    return page, 500
# ============================================================
# INITIALIZATION
# ============================================================
if __name__ == '__main__':
    # Debug mode turns on the Werkzeug interactive debugger, which lets any
    # client that can reach the server run arbitrary Python. Because the
    # server binds to all interfaces (0.0.0.0), debug is now opt-in via the
    # FLASK_DEBUG environment variable instead of being hard-coded on.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in {'1', 'true', 'yes', 'on'}

    print("✓ BalanceBoard starting...")
    print("✓ Database: PostgreSQL with SQLAlchemy")
    print("✓ Password hashing: bcrypt")
    print("✓ Authentication: Flask-Login")
    app.run(host='0.0.0.0', port=DEFAULT_PORT, debug=debug_enabled)