- Update admin_polling_add to accept max_posts, fetch_comments, priority - Enhance admin_polling_update to modify all configurable fields - Support editing display_name, interval, max_posts, fetch_comments, priority 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1556 lines
54 KiB
Python
1556 lines
54 KiB
Python
"""
BalanceBoard Web Application

Flask server with user authentication and content serving.
"""
|
|
|
|
import os
|
|
import re
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
from werkzeug.utils import secure_filename
|
|
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, abort, session, jsonify
|
|
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
|
|
from dotenv import load_dotenv
|
|
from functools import lru_cache
|
|
from collections import defaultdict
|
|
from authlib.integrations.flask_client import OAuth
|
|
from urllib.parse import quote_plus, urlencode
|
|
|
|
from database import init_db, db
|
|
from models import User, bcrypt
|
|
from user_service import UserService
|
|
import json
|
|
|
|
# Load environment variables (python-dotenv) before any os.getenv() call below.
load_dotenv()

# Root logging configuration: INFO and above, sent both to app.log and to a
# default stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler('app.log'), logging.StreamHandler()],
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
|
|
# Flask application: static assets are served from themes/, templates are
# loaded from templates/.
app = Flask(__name__, static_folder='themes', template_folder='templates')

app.config.update(
    # NOTE(review): the fallback key is a well-known placeholder; SECRET_KEY
    # should always be provided through the environment outside development.
    SECRET_KEY=os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production'),
    MAX_CONTENT_LENGTH=16 * 1024 * 1024,  # 16MB max file size
    # Auth0 configuration; every value falls back to an empty string when unset.
    AUTH0_DOMAIN=os.getenv('AUTH0_DOMAIN', ''),
    AUTH0_CLIENT_ID=os.getenv('AUTH0_CLIENT_ID', ''),
    AUTH0_CLIENT_SECRET=os.getenv('AUTH0_CLIENT_SECRET', ''),
    AUTH0_AUDIENCE=os.getenv('AUTH0_AUDIENCE', ''),
)
|
|
|
|
# ---- Configuration constants ----

# Filterset names that may be requested or stored in user settings.
ALLOWED_FILTERSETS = {'no_filter', 'safe_content'}

# Upload handling.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
UPLOAD_FOLDER = 'static/avatars'
MAX_FILENAME_LENGTH = 100

# Server / pagination defaults.
DEFAULT_PORT = 5021
DEFAULT_PAGE_SIZE = 20

# Validation limits for account and settings fields.
MIN_PASSWORD_LENGTH = 8
MAX_USERNAME_LENGTH = 80
MAX_EMAIL_LENGTH = 120
MAX_COMMUNITY_NAME_LENGTH = 100
|
|
|
|
# Bind the database and bcrypt extensions to the application.
init_db(app)
bcrypt.init_app(app)

# Flask-Login: anonymous users hitting a protected view are sent to the
# 'login' endpoint with the message below.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Please log in to access this page.'

# Service object used by the routes for user lookup, creation and authentication.
user_service = UserService()

# Polling service: imported here (after the app exists), bound to the app and
# started as a side effect of importing this module.
from polling_service import polling_service
polling_service.init_app(app)
polling_service.start()
|
|
|
|
# Initialize OAuth for Auth0
oauth = OAuth(app)
auth0 = oauth.register(
    'auth0',
    client_id=app.config['AUTH0_CLIENT_ID'],
    client_secret=app.config['AUTH0_CLIENT_SECRET'],
    # OIDC discovery document. The well-known path is spelled with a hyphen
    # ("openid-configuration"); the underscore variant is not a valid
    # discovery endpoint, so metadata loading would fail.
    server_metadata_url=f'https://{app.config["AUTH0_DOMAIN"]}/.well-known/openid-configuration',
    client_kwargs={
        'scope': 'openid profile email',
    },
)
|
|
|
|
# In-memory cache of post and comment JSON, refreshed by _load_posts_cache().
post_cache = {}                    # post uuid -> post data
comment_cache = defaultdict(list)  # post uuid -> list of comment data
cache_timestamp = 0                # time.time() of the last refresh (0 = never / invalidated)
CACHE_DURATION = 5 * 60            # seconds a refresh stays valid (5 minutes)
|
|
|
|
# Security helper functions
|
|
def _is_safe_filterset(filterset):
    """Return True only when *filterset* is a whitelisted filterset name.

    Args:
        filterset: Candidate name, typically taken from a URL or form field.

    Returns:
        bool: True if the value is a non-empty string, is listed in
        ALLOWED_FILTERSETS and consists solely of ``[a-zA-Z0-9_-]``.
    """
    if not filterset or not isinstance(filterset, str):
        return False
    if filterset not in ALLOWED_FILTERSETS:
        return False
    # fullmatch (rather than match with '$') also rejects a trailing newline,
    # and the explicit comparison guarantees a real bool for callers.
    return re.fullmatch(r'[a-zA-Z0-9_-]+', filterset) is not None
|
|
|
|
def _is_safe_path(path):
|
|
"""Validate file path for security"""
|
|
if not path or not isinstance(path, str):
|
|
return False
|
|
# Check for directory traversal attempts
|
|
if '..' in path or path.startswith('/') or '\\' in path:
|
|
return False
|
|
# Only allow alphanumeric, dots, hyphens, underscores, and forward slashes
|
|
return re.match(r'^[a-zA-Z0-9._/-]+$', path) is not None
|
|
|
|
def _is_allowed_file(filename):
    """Tell whether *filename* carries an extension listed in ALLOWED_EXTENSIONS.

    The extension is whatever follows the last dot, compared case-insensitively.
    Names without any dot are rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
|
|
|
|
def _iter_json_objects(directory, label):
    """Yield every JSON object (dict) stored in a ``*.json`` file of *directory*.

    Unreadable, malformed or non-object files are logged and skipped so one
    bad file cannot break a whole cache refresh.
    """
    if not directory.exists():
        return
    for json_file in directory.glob('*.json'):
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            logger.warning(f"Error reading {label} file {json_file}: {e}")
            continue
        if not isinstance(data, dict):
            logger.warning(f"Skipping {label} file {json_file}: top-level JSON value is not an object")
            continue
        yield data


def _load_posts_cache():
    """Load and cache posts and comments for better performance.

    The cache is reused while it is younger than CACHE_DURATION seconds and
    contains at least one post; otherwise ``data/posts`` and ``data/comments``
    are re-read from disk.

    Returns:
        tuple: ``(post_cache, comment_cache)`` - the module-level dict mapping
        post uuid to post data, and the defaultdict mapping post uuid to the
        list of that post's comments.
    """
    global post_cache, comment_cache, cache_timestamp

    current_time = time.time()
    if current_time - cache_timestamp < CACHE_DURATION and post_cache:
        return post_cache, comment_cache

    # Rebuild in place so references already handed out keep pointing at the
    # same objects.
    post_cache.clear()
    comment_cache.clear()

    for post_data in _iter_json_objects(Path('data/posts'), 'post'):
        post_uuid = post_data.get('uuid')
        if post_uuid:
            post_cache[post_uuid] = post_data

    # Group comments by the uuid of the post they belong to.
    for comment_data in _iter_json_objects(Path('data/comments'), 'comment'):
        post_uuid = comment_data.get('post_uuid')
        if post_uuid:
            comment_cache[post_uuid].append(comment_data)

    cache_timestamp = current_time
    logger.info(f"Cache refreshed: {len(post_cache)} posts, {len(comment_cache)} comment groups")
    return post_cache, comment_cache
|
|
|
|
def _invalidate_cache():
|
|
"""Invalidate the cache to force refresh"""
|
|
global cache_timestamp
|
|
cache_timestamp = 0
|
|
|
|
def _validate_user_settings(settings_str):
    """Validate and sanitize a user's settings JSON string.

    Only the keys known to this function survive validation:

    * ``filter_set``  - kept when it passes ``_is_safe_filterset``.
    * ``communities`` - list filtered down to string names of at most
      MAX_COMMUNITY_NAME_LENGTH characters matching ``[a-zA-Z0-9_-]+``.
    * ``experience``  - dict reduced to the known boolean flags.

    Args:
        settings_str: JSON text as stored on the user record (may be empty/None).

    Returns:
        dict: The sanitized settings; an empty dict when the input is empty,
        is not valid JSON, or is not a JSON object.
    """
    if not settings_str:
        return {}

    try:
        settings = json.loads(settings_str)
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON: {e}")
        return {}

    if not isinstance(settings, dict):
        logger.warning("User settings must be a JSON object")
        return {}

    validated = {}

    # Filter set validation
    filter_set = settings.get('filter_set')
    if isinstance(filter_set, str) and _is_safe_filterset(filter_set):
        validated['filter_set'] = filter_set

    # Communities validation. fullmatch is used because '^...$' with
    # re.match would let a name ending in a newline through.
    communities = settings.get('communities')
    if isinstance(communities, list):
        validated['communities'] = [
            community
            for community in communities
            if isinstance(community, str)
            and len(community) <= MAX_COMMUNITY_NAME_LENGTH
            and re.fullmatch(r'[a-zA-Z0-9_-]+', community)
        ]

    # Experience settings validation: keep only genuine booleans for known flags.
    exp = settings.get('experience')
    if isinstance(exp, dict):
        bool_fields = ['infinite_scroll', 'auto_refresh', 'push_notifications', 'dark_patterns_opt_in']
        validated['experience'] = {
            field: exp[field]
            for field in bool_fields
            if field in exp and isinstance(exp[field], bool)
        }

    return validated
|
|
|
|
# Add custom Jinja filters
|
|
@app.template_filter('nl2br')
def nl2br_filter(text):
    """Jinja filter: put a ``<br>`` tag in front of every newline in *text*.

    Falsy input (``None``, empty string) is handed back unchanged.
    NOTE(review): the result is a plain string, so whether the tags render
    depends on how the templates treat autoescaping - confirm at call sites.
    """
    return text.replace('\n', '<br>\n') if text else text
|
|
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: resolve *user_id* through the user service."""
    user = user_service.get_user_by_id(user_id)
    return user
|
|
|
|
|
|
# ============================================================
|
|
# STATIC CONTENT ROUTES
|
|
# ============================================================
|
|
|
|
@app.before_request
def check_first_user():
    """Send visitors to the admin setup page while no user account exists.

    Runs before every request. Static endpoints, the auth/setup endpoints and
    already-authenticated users are left alone. Returns a redirect response
    when the user table is empty, otherwise None so the request proceeds.
    """
    exempt_endpoints = ('login', 'signup', 'admin_setup', 'serve_theme', 'serve_logo')
    endpoint = request.endpoint

    # Skip for static files and auth routes
    if endpoint and (endpoint.startswith('static') or endpoint in exempt_endpoints):
        return None

    # Skip if user is already authenticated
    if current_user.is_authenticated:
        return None

    try:
        if User.query.count() == 0:
            return redirect(url_for('admin_setup'))
    except Exception as e:
        # If the database is not ready, skip the check rather than fail the request.
        logger.warning(f"Database not ready for user count check: {e}")

    return None
|
|
|
|
|
|
@app.route('/')
def index():
    """Render the dashboard for signed-in users; everyone else goes to login."""
    # Redirect non-authenticated users to login
    if not current_user.is_authenticated:
        return redirect(url_for('login'))

    # Parse the stored settings JSON, falling back to no settings when it is bad.
    raw_settings = current_user.settings
    try:
        user_settings = json.loads(raw_settings) if raw_settings else {}
    except (json.JSONDecodeError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
        user_settings = {}

    return render_template('dashboard.html', user_settings=user_settings)
|
|
|
|
|
|
@app.route('/feed/<filterset>')
def feed_content(filterset='no_filter'):
    """Serve the pre-rendered feed page for a whitelisted filterset.

    Args:
        filterset: Name of the filterset directory under ``active_html``.

    Returns:
        The ``index.html`` of that filterset. Aborts with 404 when the name
        is not an allowed filterset or resolves outside ``active_html``.
    """
    # Validate filterset to prevent directory traversal
    if not _is_safe_filterset(filterset):
        logger.warning(f"Invalid filterset requested: {filterset}")
        abort(404)

    # Defence in depth: the normalised directory must still sit under
    # active_html. os.sep is used because normpath emits the platform's
    # separator, so a hard-coded '/' prefix never matches on Windows.
    base_dir = 'active_html'
    feed_dir = os.path.normpath(os.path.join(base_dir, filterset))
    if not feed_dir.startswith(base_dir + os.sep):
        logger.warning(f"Path traversal attempt detected: {filterset}")
        abort(404)

    return send_from_directory(feed_dir, 'index.html')
|
|
|
|
def load_platform_config():
    """Read ``platform_config.json`` from the working directory.

    Returns:
        The parsed JSON content, or ``{"platforms": {}, "collection_targets": []}``
        when the file is missing, unreadable or not valid JSON (a warning is
        logged in that case).
    """
    try:
        with open('platform_config.json', 'r') as config_file:
            config = json.load(config_file)
    except (OSError, json.JSONDecodeError) as e:
        logger.warning(f"Could not load platform config: {e}")
        return {"platforms": {}, "collection_targets": []}
    return config
|
|
|
|
|
|
def get_display_name_for_source(platform, source, platform_config):
    """Work out the human-readable name for a post's source.

    Args:
        platform: Platform key looked up in ``platform_config['platforms']``.
        source: Community/source identifier within that platform.
        platform_config: Parsed platform configuration.

    Returns:
        * *source* unchanged when the config is empty or has no ``platforms``.
        * The platform's ``name`` (or the platform key) when the platform does
          not support communities.
        * The configured ``display_name`` of the matching community, else
          ``prefix + source``, else the platform name when *source* is empty.
    """
    if not platform_config or 'platforms' not in platform_config:
        return source

    info = platform_config['platforms'].get(platform, {})

    # Platforms without communities are simply shown under their own name.
    if not info.get('supports_communities'):
        return info.get('name', platform)

    # Prefer an explicitly configured community entry.
    for entry in info.get('communities', []):
        if entry['id'] == source:
            return entry['display_name']

    # Fallback to prefix + source for Reddit-like platforms
    prefix = info.get('prefix', '')
    if source:
        return f"{prefix}{source}"
    return info.get('name', platform)
|
|
|
|
|
|
@app.route('/api/posts')
def api_posts():
    """API endpoint returning cached posts with filtering and pagination.

    Query parameters:
        page: 1-based page number (default 1; invalid or < 1 falls back to 1).
        per_page: Page size (default DEFAULT_PAGE_SIZE; invalid or < 1 falls
            back to the default).
        community: Optional case-insensitive match on the post's ``source``.
        platform: Optional case-insensitive match on the post's ``platform``.

    Returns:
        dict: ``{'posts': [...], 'pagination': {...}}`` with posts sorted
        newest first. On failure the same shape is returned with an empty
        post list and an ``error`` message (status code left unchanged for
        existing clients).
    """
    try:
        platform_config = load_platform_config()

        # type=int makes Werkzeug return the default instead of raising when
        # the value is not an integer.
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', DEFAULT_PAGE_SIZE, type=int)
        if page < 1:
            page = 1
        if per_page < 1:
            # Also prevents the division by zero in the page-count formula.
            per_page = DEFAULT_PAGE_SIZE

        # Lower-case the filters once instead of once per post.
        community = request.args.get('community', '').lower()
        platform = request.args.get('platform', '').lower()

        cached_posts, cached_comments = _load_posts_cache()

        posts = []
        for post_uuid, post_data in cached_posts.items():
            post_source = post_data.get('source') or ''
            post_platform = post_data.get('platform') or ''

            if community and post_source.lower() != community:
                continue
            if platform and post_platform.lower() != platform:
                continue

            # Only add an ellipsis when the content was actually truncated.
            content = post_data.get('content') or ''
            preview = content[:200] + '...' if len(content) > 200 else content

            posts.append({
                'id': post_uuid,
                'title': post_data.get('title', 'Untitled'),
                'author': post_data.get('author', 'Unknown'),
                'platform': post_data.get('platform', 'unknown'),
                'score': post_data.get('score', 0),
                'timestamp': post_data.get('timestamp', 0),
                'url': f'/post/{post_uuid}',
                'comments_count': len(cached_comments.get(post_uuid, [])),
                'content_preview': preview,
                'source': post_data.get('source', ''),
                'source_display': get_display_name_for_source(
                    post_platform, post_source, platform_config
                ),
                'tags': post_data.get('tags', []),
                'external_url': post_data.get('url', ''),
            })

        # Sort by timestamp (newest first); a null timestamp sorts as 0.
        posts.sort(key=lambda p: p['timestamp'] or 0, reverse=True)

        total_posts = len(posts)
        start_idx = (page - 1) * per_page
        paginated_posts = posts[start_idx:start_idx + per_page]
        total_pages = (total_posts + per_page - 1) // per_page

        return {
            'posts': paginated_posts,
            'pagination': {
                'current_page': page,
                'total_pages': total_pages,
                'total_posts': total_posts,
                'per_page': per_page,
                'has_next': page < total_pages,
                'has_prev': page > 1,
            },
        }

    except Exception:
        # Log the details server-side; do not leak exception text to clients.
        logger.exception("Error loading posts")
        return {
            'posts': [],
            'error': 'Failed to load posts',
            'pagination': {
                'current_page': 1,
                'total_pages': 0,
                'total_posts': 0,
                'per_page': DEFAULT_PAGE_SIZE,
                'has_next': False,
                'has_prev': False,
            },
        }
|
|
|
|
|
|
@app.route('/api/platforms')
def api_platforms():
    """API endpoint listing platform configuration and available communities.

    Communities are derived from the posts that actually exist: each distinct
    (platform, source) pair in the post cache becomes one entry, enriched with
    display information from the platform config when available.

    Returns:
        dict: ``{'platforms': ..., 'communities': [...], 'total_communities': n}``
        with communities sorted by post count (descending). On failure the
        same shape is returned empty, plus an ``error`` message.
    """
    try:
        platform_config = load_platform_config()
        platforms = platform_config.get('platforms', {})

        # Count posts per (platform, source) from the shared cache, the same
        # data /api/posts serves, instead of re-reading every file per request.
        # Tuple keys avoid the ambiguity of splitting a "platform:source" string.
        cached_posts, _cached_comments = _load_posts_cache()
        source_counts = defaultdict(int)
        for post_data in cached_posts.values():
            platform = post_data.get('platform') or 'unknown'
            source = post_data.get('source') or ''
            source_counts[(platform, source)] += 1

        communities = []
        for (platform, source), count in source_counts.items():
            platform_info = platforms.get(platform, {})
            default_icon = platform_info.get('icon', '📄')

            community_info = None
            if platform_info.get('supports_communities'):
                community_info = next(
                    (c for c in platform_info.get('communities', []) if c.get('id') == source),
                    None,
                )

            if community_info:
                communities.append({
                    'platform': platform,
                    'id': source,
                    'name': community_info.get('name', source),
                    'display_name': community_info.get('display_name', source),
                    'icon': community_info.get('icon', default_icon),
                    'count': count,
                    'description': community_info.get('description', ''),
                })
            else:
                # Fallback for sources not in config
                display_name = get_display_name_for_source(platform, source, platform_config)
                communities.append({
                    'platform': platform,
                    'id': source,
                    'name': source or platform,
                    'display_name': display_name,
                    'icon': default_icon,
                    'count': count,
                    'description': f"Posts from {display_name}",
                })

        # Sort communities by count (most posts first)
        communities.sort(key=lambda entry: entry['count'], reverse=True)

        return {
            'platforms': platforms,
            'communities': communities,
            'total_communities': len(communities),
        }

    except Exception:
        # Log the details server-side; do not leak exception text to clients.
        logger.exception("Error loading platform configuration")
        return {
            'platforms': {},
            'communities': [],
            'total_communities': 0,
            'error': 'Failed to load platform configuration',
        }
|
|
|
|
|
|
@app.route('/api/content-timestamp')
def api_content_timestamp():
    """Report the newest modification time among the post files.

    Responds with ``{'timestamp': <mtime>}``; the value is 0 when
    ``data/posts`` does not exist or holds no ``*.json`` file. Any failure
    yields a 500 response with an ``error`` message.
    """
    try:
        posts_dir = Path('data/posts')
        if not posts_dir.exists():
            newest = 0
        else:
            mtimes = [post_file.stat().st_mtime for post_file in posts_dir.glob('*.json')]
            # Seeding with 0 keeps the "no files" result identical to the loop form.
            newest = max([0, *mtimes])
        return jsonify({'timestamp': newest})
    except Exception as e:
        logger.error(f"Error getting content timestamp: {e}")
        return jsonify({'error': 'Failed to get content timestamp'}), 500
|
|
|
|
|
|
@app.route('/post/<post_id>')
def post_detail(post_id):
    """Serve the detail page for a single post.

    Args:
        post_id: uuid of the post, as used for keys of the post cache.

    Returns:
        The rendered ``post_detail.html`` with the post, its comments sorted
        by timestamp and the viewer's settings; the 404 page (status 404)
        when the post is unknown or rendering fails.
    """
    try:
        platform_config = load_platform_config()
        cached_posts, cached_comments = _load_posts_cache()

        cached_post = cached_posts.get(post_id)
        if not cached_post:
            return render_template('404.html'), 404

        # Work on a copy: the cached dict is shared between requests and must
        # not pick up per-request fields.
        post_data = dict(cached_post)
        post_data['source_display'] = get_display_name_for_source(
            post_data.get('platform', ''),
            post_data.get('source', ''),
            platform_config,
        )

        # sorted() leaves the cached comment list untouched.
        comments = sorted(
            cached_comments.get(post_id, []),
            key=lambda comment: comment.get('timestamp', 0),
        )

        # Load user settings if authenticated
        user_settings = {}
        if current_user.is_authenticated:
            try:
                user_settings = json.loads(current_user.settings) if current_user.settings else {}
            except (ValueError, TypeError) as e:
                logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
                user_settings = {}

        return render_template(
            'post_detail.html',
            post=post_data,
            comments=comments,
            user_settings=user_settings,
        )

    except Exception:
        logger.exception(f"Error loading post {post_id}")
        return render_template('404.html'), 404
|
|
|
|
|
|
@app.route('/themes/<path:filename>')
def serve_theme(filename):
    """Deliver a theme asset (CSS, JS) from the ``themes`` directory.

    Requests whose filename fails the path-safety check are logged and
    answered with 404.
    """
    path_is_safe = _is_safe_path(filename) and '..' not in filename
    if path_is_safe:
        return send_from_directory('themes', filename)

    # Reject anything that looks like a directory traversal attempt.
    logger.warning(f"Unsafe theme file requested: {filename}")
    abort(404)
|
|
|
|
|
|
@app.route('/logo.png')
def serve_logo():
    """Deliver ``logo.png`` from the current directory."""
    logo_directory = '.'
    return send_from_directory(logo_directory, 'logo.png')
|
|
|
|
@app.route('/static/<path:filename>')
def serve_static(filename):
    """Deliver a file (avatars, etc.) from the ``static`` directory.

    Requests whose filename fails the path-safety check are logged and
    answered with 404.
    """
    path_is_safe = _is_safe_path(filename) and '..' not in filename
    if path_is_safe:
        return send_from_directory('static', filename)

    # Reject anything that looks like a directory traversal attempt.
    logger.warning(f"Unsafe static file requested: {filename}")
    abort(404)
|
|
|
|
|
|
# ============================================================
|
|
# AUTHENTICATION ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Login page.

    GET renders the form. POST authenticates the submitted ``username`` and
    ``password`` through the user service; on success the user is logged in
    and redirected to the ``next`` query parameter when it is a local path,
    otherwise to the index page.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        remember = request.form.get('remember', False) == 'on'

        if not user_service:
            flash('User service not available', 'error')
            return render_template('login.html')

        user = user_service.authenticate(username, password)

        if user:
            login_user(user, remember=remember)
            flash(f'Welcome back, {user.username}!', 'success')

            # Only follow "next" when it is a same-site path. Absolute URLs,
            # protocol-relative ("//host", "/\host") targets and values with
            # control characters could send the user to an attacker's site.
            next_page = request.args.get('next') or ''
            next_is_local = (
                next_page.startswith('/')
                and not next_page.startswith(('//', '/\\'))
                and not any(ch < ' ' for ch in next_page)
            )
            return redirect(next_page if next_is_local else url_for('index'))

        flash('Invalid username or password', 'error')

    return render_template('login.html')
|
|
|
|
|
|
# Auth0 Routes
|
|
@app.route('/auth0/login')
def auth0_login():
    """Start the Auth0 login flow.

    Builds the absolute URL of the ``auth0_callback`` endpoint and hands it
    to the Auth0 client as the redirect target.
    """
    callback_url = url_for('auth0_callback', _external=True)
    return auth0.authorize_redirect(callback_url)
|
|
|
|
|
|
@app.route('/auth0/callback')
def auth0_callback():
    """Handle the Auth0 callback and create, link or log in the local user.

    Flow:
        1. Exchange the callback for a token and read the user info.
        2. Look the user up by Auth0 id; failing that, link an existing
           account with the same (verified) email, or create a new account
           with a unique username.
        3. Log the user in and redirect to a local ``next`` path or the index.

    Any exception is logged and the visitor is returned to the login page.
    """
    try:
        # Get the access token from Auth0
        token = auth0.authorize_access_token()

        # Get user info from Auth0
        user_info = token.get('userinfo')
        if not user_info:
            user_info = auth0.parse_id_token(token)

        auth0_id = user_info.get('sub')
        email = user_info.get('email')

        # Check the required fields before deriving a username from the
        # email, otherwise a missing email raises on .split().
        if not auth0_id or not email:
            flash('Unable to get user information from Auth0', 'error')
            return redirect(url_for('login'))

        username = (
            user_info.get('nickname')
            or user_info.get('preferred_username')
            or email.split('@')[0]
        )

        # Check if user exists with this Auth0 ID
        user = user_service.get_user_by_auth0_id(auth0_id)

        if user:
            flash(f'Welcome back, {user.username}!', 'success')
        else:
            # Check if user exists with this email (for account linking)
            existing_user = user_service.get_user_by_email(email)

            if existing_user:
                # Linking by email alone lets anyone who registers an
                # unverified address at the identity provider take over the
                # matching local account, so require a verified email.
                if not user_info.get('email_verified'):
                    logger.warning(f"Refused to link Auth0 identity with unverified email to user {existing_user.id}")
                    flash('Please verify your email address before linking your account.', 'error')
                    return redirect(url_for('login'))

                user_service.link_auth0_account(existing_user.id, auth0_id)
                user = existing_user
                flash(f'Account linked successfully! Welcome back, {user.username}!', 'success')
            else:
                # Create new user with a unique username. The base is
                # shortened per suffix so the result never exceeds
                # MAX_USERNAME_LENGTH, however large the counter grows.
                unique_username = username[:MAX_USERNAME_LENGTH]
                counter = 1
                while user_service.username_exists(unique_username):
                    suffix = f"_{counter}"
                    unique_username = username[:MAX_USERNAME_LENGTH - len(suffix)] + suffix
                    counter += 1

                user_id = user_service.create_user(
                    username=unique_username,
                    email=email,
                    password=None,  # No password for OAuth users
                    is_admin=False,
                    auth0_id=auth0_id,
                )

                if not user_id:
                    flash('Failed to create user account', 'error')
                    return redirect(url_for('login'))

                user = user_service.get_user_by_id(user_id)
                flash(f'Account created successfully! Welcome, {user.username}!', 'success')

        # Log in the user
        if user:
            login_user(user, remember=True)

            # Store Auth0 info in session for future use
            session['auth0_user_info'] = user_info

        # Only follow "next" when it is a same-site path (no absolute or
        # protocol-relative URL, no control characters).
        next_page = request.args.get('next') or ''
        next_is_local = (
            next_page.startswith('/')
            and not next_page.startswith(('//', '/\\'))
            and not any(ch < ' ' for ch in next_page)
        )
        return redirect(next_page if next_is_local else url_for('index'))

    except Exception as e:
        logger.error(f"Auth0 callback error: {e}")
        flash('Authentication failed. Please try again.', 'error')

    return redirect(url_for('login'))
|
|
|
|
|
|
@app.route('/auth0/logout')
@login_required
def auth0_logout():
    """End the local session, then send the browser to Auth0's logout URL.

    The Auth0 URL carries ``returnTo`` (absolute URL of the index page) and
    the configured ``client_id``.
    """
    # Drop local state first.
    session.clear()
    logout_user()

    # Query string for the Auth0 logout endpoint.
    query = urlencode(
        {
            'returnTo': url_for('index', _external=True),
            'client_id': app.config['AUTH0_CLIENT_ID'],
        },
        quote_via=quote_plus,
    )
    auth0_domain = app.config['AUTH0_DOMAIN']
    return redirect(f'https://{auth0_domain}/v2/logout?' + query)
|
|
|
|
|
|
@app.route('/admin-setup', methods=['GET', 'POST'])
def admin_setup():
    """First-run page that creates the initial admin account.

    If at least one user already exists the visitor is redirected to login.
    POST validates the submitted fields and creates an admin user through
    the user service; GET (or a failed POST) renders the setup form.
    """
    # Refuse setup once any user exists.
    try:
        if User.query.count() > 0:
            flash('Admin user already exists.', 'info')
            return redirect(url_for('login'))
    except Exception as e:
        # NOTE(review): a failing count query lets the request continue to
        # the creation form - confirm this fail-open behaviour is intended.
        logger.warning(f"Database error checking existing users: {e}")

    if request.method != 'POST':
        return render_template('admin_setup.html')

    form = request.form
    username = form.get('username')
    email = form.get('email')
    password = form.get('password')
    password_confirm = form.get('password_confirm')

    # Validation: the first failing rule decides the message.
    problem = None
    if not (username and email and password):
        problem = 'All fields are required'
    elif password != password_confirm:
        problem = 'Passwords do not match'
    elif len(password) < MIN_PASSWORD_LENGTH:
        problem = f'Password must be at least {MIN_PASSWORD_LENGTH} characters'

    if problem is not None:
        flash(problem, 'error')
        return render_template('admin_setup.html')

    # Create admin user
    if user_service.create_user(username, email, password, is_admin=True):
        flash('Admin account created successfully! Please log in.', 'success')
        return redirect(url_for('login'))

    flash('Error creating admin account. Please try again.', 'error')
    return render_template('admin_setup.html')
|
|
|
|
|
|
@app.route('/signup', methods=['GET', 'POST'])
def signup():
    """Account registration page.

    Signed-in users are redirected to the index. POST validates the form
    (required fields, matching passwords, minimum password length, unused
    username and email) and creates the account; GET or a failed POST
    renders the signup form.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('signup.html')

    form = request.form
    username = form.get('username')
    email = form.get('email')
    password = form.get('password')
    password_confirm = form.get('password_confirm')

    # Validation: checks run in order and the first failure decides the message.
    problem = None
    if not user_service:
        problem = 'User service not available'
    elif not (username and email and password):
        problem = 'All fields are required'
    elif password != password_confirm:
        problem = 'Passwords do not match'
    elif len(password) < MIN_PASSWORD_LENGTH:
        problem = f'Password must be at least {MIN_PASSWORD_LENGTH} characters'
    elif user_service.username_exists(username):
        problem = 'Username already taken'
    elif user_service.email_exists(email):
        problem = 'Email already registered'

    if problem is not None:
        flash(problem, 'error')
        return render_template('signup.html')

    # Create user
    if user_service.create_user(username, email, password):
        flash('Account created successfully! Please log in.', 'success')
        return redirect(url_for('login'))

    flash('Error creating account. Please try again.', 'error')
    return render_template('signup.html')
|
|
|
|
|
|
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and return to the index page."""
    logout_user()
    flash('You have been logged out.', 'info')
    index_url = url_for('index')
    return redirect(index_url)
|
|
|
|
|
|
@app.route('/settings')
@login_required
def settings():
    """Main settings page.

    Renders ``settings.html`` with the current user, the user's parsed
    settings and the filter sets read from ``filtersets.json``. Both fall
    back to an empty dict when they cannot be loaded.
    """
    # Load user settings (same handling as the index view).
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
        user_settings = {}

    # Load available filter sets. ValueError covers malformed JSON and
    # undecodable bytes; OSError covers a missing or unreadable file.
    try:
        with open('filtersets.json', 'r', encoding='utf-8') as f:
            filter_sets = json.load(f)
    except (OSError, ValueError) as e:
        logger.warning(f"Could not load filter sets: {e}")
        filter_sets = {}

    return render_template(
        'settings.html',
        user=current_user,
        user_settings=user_settings,
        filter_sets=filter_sets,
    )
|
|
|
|
|
|
@app.route('/settings/profile', methods=['GET', 'POST'])
@login_required
def settings_profile():
    """Profile settings page.

    GET renders the profile form. POST updates the username, email and,
    when one of the known default avatars was chosen, the profile picture
    URL, then redirects to the main settings page. Validation or save
    errors re-render the form with a flashed message.
    """
    # Available default avatars. Defined up front so every render of the
    # template (including validation failures) receives the list.
    default_avatars = [
        {'id': 'default_1', 'name': 'Gradient Blue', 'bg': 'linear-gradient(135deg, #667eea 0%, #764ba2 100%)'},
        {'id': 'default_2', 'name': 'Gradient Green', 'bg': 'linear-gradient(135deg, #4facfe 0%, #00f2fe 100%)'},
        {'id': 'default_3', 'name': 'Gradient Orange', 'bg': 'linear-gradient(135deg, #fa709a 0%, #fee140 100%)'},
        {'id': 'default_4', 'name': 'Gradient Purple', 'bg': 'linear-gradient(135deg, #a8edea 0%, #fed6e3 100%)'},
        {'id': 'default_5', 'name': 'Brand Colors', 'bg': 'linear-gradient(135deg, #4db6ac 0%, #26a69a 100%)'},
        {'id': 'default_6', 'name': 'Sunset', 'bg': 'linear-gradient(135deg, #ff7e5f 0%, #feb47b 100%)'},
    ]

    def render_form():
        return render_template('settings_profile.html', user=current_user, default_avatars=default_avatars)

    if request.method != 'POST':
        return render_form()

    username = request.form.get('username')
    email = request.form.get('email')
    default_avatar = request.form.get('default_avatar')

    # Validation
    if not username or not email:
        flash('Username and email are required', 'error')
        return render_form()

    # Check if username is taken by another user
    if username != current_user.username and user_service.username_exists(username):
        flash('Username already taken', 'error')
        return render_form()

    # Check if email is taken by another user
    if email != current_user.email and user_service.email_exists(email):
        flash('Email already registered', 'error')
        return render_form()

    current_user.username = username
    current_user.email = email

    # Accept only known avatar ids: the value comes from the form and is
    # interpolated into a URL, so an arbitrary "default_..." string must not
    # be stored.
    known_avatar_ids = {avatar['id'] for avatar in default_avatars}
    if default_avatar in known_avatar_ids:
        current_user.profile_picture_url = f"/static/default-avatars/{default_avatar}.png"

    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error saving profile for user {current_user.id}: {e}")
        flash('Error saving profile', 'error')
        return render_form()

    flash('Profile updated successfully', 'success')
    return redirect(url_for('settings'))
|
|
|
|
|
|
@app.route('/settings/communities', methods=['GET', 'POST'])
@login_required
def settings_communities():
    """Community/source selection settings.

    GET renders the selection form with the user's current choices. POST
    stores the submitted ``communities`` values (after name validation) in
    the user's settings JSON and redirects to the main settings page.
    """
    # Load current settings; anything that is not a JSON object is treated
    # as "no settings" so the code below can rely on a dict.
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
        user_settings = {}
    if not isinstance(user_settings, dict):
        user_settings = {}

    if request.method == 'POST':
        # Apply the same naming rule as _validate_user_settings so values
        # saved here survive later validation of the settings blob.
        selected_communities = [
            community
            for community in request.form.getlist('communities')
            if len(community) <= MAX_COMMUNITY_NAME_LENGTH
            and re.fullmatch(r'[a-zA-Z0-9_-]+', community)
        ]
        user_settings['communities'] = selected_communities

        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Community preferences updated', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving community settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')

        return redirect(url_for('settings'))

    selected_communities = user_settings.get('communities', [])

    # Available communities
    available_communities = [
        {'id': 'programming', 'name': 'Programming', 'platform': 'reddit'},
        {'id': 'python', 'name': 'Python', 'platform': 'reddit'},
        {'id': 'technology', 'name': 'Technology', 'platform': 'reddit'},
        {'id': 'hackernews', 'name': 'Hacker News', 'platform': 'hackernews'},
        {'id': 'lobsters', 'name': 'Lobsters', 'platform': 'lobsters'},
        {'id': 'stackoverflow', 'name': 'Stack Overflow', 'platform': 'stackoverflow'},
    ]

    return render_template(
        'settings_communities.html',
        user=current_user,
        available_communities=available_communities,
        selected_communities=selected_communities,
    )
|
|
|
|
|
|
@app.route('/settings/filters', methods=['GET', 'POST'])
@login_required
def settings_filters():
    """Filter settings page.

    GET renders the available filter sets and the user's current choice
    (``no_filter`` when none is stored). POST validates the submitted
    ``filter_set``, stores it in the user's validated settings and
    redirects to the main settings page.
    """
    # Both paths start from validated settings, which are always a dict.
    user_settings = _validate_user_settings(current_user.settings)

    if request.method == 'POST':
        selected_filter = request.form.get('filter_set', 'no_filter')

        # Validate new filter setting
        if not _is_safe_filterset(selected_filter):
            flash('Invalid filter selection', 'error')
            return redirect(url_for('settings'))
        user_settings['filter_set'] = selected_filter

        # Save validated settings
        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Filter settings updated successfully', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving filter settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')

        return redirect(url_for('settings'))

    current_filter = user_settings.get('filter_set', 'no_filter')

    # Load available filter sets. ValueError covers malformed JSON and
    # undecodable bytes; OSError covers a missing or unreadable file.
    try:
        with open('filtersets.json', 'r', encoding='utf-8') as f:
            filter_sets = json.load(f)
    except (OSError, ValueError) as e:
        logger.warning(f"Could not load filter sets: {e}")
        filter_sets = {}

    return render_template(
        'settings_filters.html',
        user=current_user,
        filter_sets=filter_sets,
        current_filter=current_filter,
    )
|
|
|
|
|
|
@app.route('/settings/experience', methods=['GET', 'POST'])
@login_required
def settings_experience():
    """Experience and behavioral settings page - opt-in addictive features.

    POST stores four boolean flags (checkbox value ``'on'`` means enabled)
    under the ``experience`` key of the user's JSON settings. GET renders the
    form with the stored flags; every flag defaults to ``False``.
    """
    experience_keys = (
        'infinite_scroll',
        'auto_refresh',
        'push_notifications',
        'dark_patterns_opt_in',
    )

    # Parse stored settings once; corrupt or non-dict JSON means "no settings".
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (TypeError, ValueError):
        user_settings = {}
    if not isinstance(user_settings, dict):
        user_settings = {}

    if request.method == 'POST':
        # All features are opt-in: an absent checkbox means False.
        user_settings['experience'] = {
            key: request.form.get(key) == 'on' for key in experience_keys
        }

        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Experience settings updated successfully', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving experience settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')

        return redirect(url_for('settings'))

    # Overlay stored values on the defaults so a partially populated record
    # still gives the template every key.
    experience_settings = {key: False for key in experience_keys}
    stored = user_settings.get('experience')
    if isinstance(stored, dict):
        experience_settings.update(stored)

    return render_template('settings_experience.html',
                           user=current_user,
                           experience_settings=experience_settings)
|
|
|
|
|
|
@app.route('/upload-avatar', methods=['POST'])
@login_required
def upload_avatar():
    """Upload profile picture.

    Validates the uploaded ``avatar`` file (presence, allowed type, filename),
    saves it under ``UPLOAD_FOLDER`` as ``<user id>_<sanitised filename>`` and
    stores the resulting URL on the current user. Every outcome redirects
    back to the profile settings page with a flash message.
    """
    if 'avatar' not in request.files:
        flash('No file selected', 'error')
        return redirect(url_for('settings_profile'))

    file = request.files['avatar']
    if file.filename == '':
        flash('No file selected', 'error')
        return redirect(url_for('settings_profile'))

    # Validate file type and size
    if not _is_allowed_file(file.filename):
        flash('Invalid file type. Please upload PNG, JPG, or GIF', 'error')
        return redirect(url_for('settings_profile'))

    # Best-effort size check; Flask's MAX_CONTENT_LENGTH is the real limit.
    if hasattr(file, 'content_length') and file.content_length > app.config['MAX_CONTENT_LENGTH']:
        flash('File too large. Maximum size is 16MB', 'error')
        return redirect(url_for('settings_profile'))

    # Validate and secure filename
    filename = secure_filename(file.filename)
    if not filename or len(filename) > MAX_FILENAME_LENGTH:
        flash('Invalid filename', 'error')
        return redirect(url_for('settings_profile'))

    # Add user ID to make filename unique and prevent conflicts
    unique_filename = f"{current_user.id}_{filename}"

    # Ensure upload directory exists
    upload_dir = os.path.abspath(UPLOAD_FOLDER)
    os.makedirs(upload_dir, exist_ok=True)

    upload_path = os.path.abspath(os.path.join(upload_dir, unique_filename))

    # Final security check - ensure path is within upload directory.
    # commonpath compares whole path components, so a sibling such as
    # "<upload_dir>_evil" is rejected (a plain startswith() would accept it).
    if os.path.commonpath([upload_dir, upload_path]) != upload_dir:
        logger.warning(f"Path traversal attempt in file upload: {upload_path}")
        flash('Invalid file path', 'error')
        return redirect(url_for('settings_profile'))

    try:
        file.save(upload_path)
        logger.info(f"File uploaded successfully: {unique_filename} by user {current_user.id}")
    except Exception as e:
        logger.error(f"Error saving uploaded file: {e}")
        flash('Error saving file', 'error')
        return redirect(url_for('settings_profile'))

    # Update user profile; roll back so a failed commit does not poison the
    # session for the rest of the request.
    try:
        current_user.profile_picture_url = f"/static/avatars/{unique_filename}"
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error updating profile picture for user {current_user.id}: {e}")
        flash('Error updating profile picture', 'error')
        return redirect(url_for('settings_profile'))

    flash('Profile picture updated successfully', 'success')
    return redirect(url_for('settings_profile'))
|
|
|
|
|
|
@app.route('/profile')
@login_required
def profile():
    """Render the profile page for the logged-in user."""
    template_context = {'user': current_user}
    return render_template('profile.html', **template_context)
|
|
|
|
|
|
# ============================================================
|
|
# ADMIN ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/admin')
@login_required
def admin_panel():
    """Render the admin user-management page.

    Non-admins, or any request made while the user service is unavailable,
    are sent back to the index page with an error flash.
    """
    if not current_user.is_admin:
        denial_message = 'Access denied. Admin privileges required.'
    elif not user_service:
        denial_message = 'User service not available'
    else:
        all_users = user_service.get_all_users()
        return render_template('admin.html', users=all_users)

    flash(denial_message, 'error')
    return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/admin/user/<user_id>/delete', methods=['POST'])
@login_required
def admin_delete_user(user_id):
    """Delete user (admin only).

    Args:
        user_id: Identifier of the user to delete, taken from the URL.

    Redirects to the admin panel with a flash message describing the result;
    non-admins are redirected to the index page.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    # Same availability guard as admin_panel.
    if not user_service:
        flash('User service not available', 'error')
        return redirect(url_for('index'))

    # Prevent self-deletion. The URL value is always a string, so compare as
    # strings to stay correct when the stored id is not a str.
    if str(current_user.id) == str(user_id):
        flash('You cannot delete your own account!', 'error')
        return redirect(url_for('admin_panel'))

    user = user_service.get_user_by_id(user_id)
    if not user:
        flash('User not found', 'error')
        return redirect(url_for('admin_panel'))

    # Capture the name first; the object may be unusable after deletion.
    username = user.username
    if user_service.delete_user(user_id):
        flash(f'User {username} has been deleted.', 'success')
        logger.info(f"Admin {current_user.id} deleted user {username} ({user_id})")
    else:
        flash('Error deleting user', 'error')
        logger.error(f"Failed to delete user {user_id}")

    return redirect(url_for('admin_panel'))
|
|
|
|
|
|
@app.route('/admin/user/<user_id>/toggle-admin', methods=['POST'])
@login_required
def admin_toggle_admin(user_id):
    """Toggle user admin status.

    Args:
        user_id: Identifier of the user whose admin flag is flipped, taken
            from the URL.

    Redirects to the admin panel with a flash message; non-admins are
    redirected to the index page.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    # Same availability guard as admin_panel.
    if not user_service:
        flash('User service not available', 'error')
        return redirect(url_for('index'))

    target_user = user_service.get_user_by_id(user_id)

    if target_user:
        new_status = not target_user.is_admin  # Toggle admin status
        user_service.update_user_admin_status(user_id, new_status)
        flash('Admin status updated', 'success')
        # Privilege changes are audit-relevant, like deletions.
        logger.info(f"Admin {current_user.id} set is_admin={new_status} for user {user_id}")
    else:
        flash('User not found', 'error')

    return redirect(url_for('admin_panel'))
|
|
|
|
|
|
# This route was a duplicate; it was removed in favor of the UUID-based route above.
|
|
|
|
|
|
# This route was a duplicate; it was removed in favor of the UUID-based route above.
|
|
|
|
|
|
@app.route('/admin/regenerate_content', methods=['POST'])
@login_required
def admin_regenerate_content():
    """Regenerate all HTML content.

    Runs ``generate_html.py`` (resolved against the current working
    directory) with the running interpreter, a fixed argument list and a
    5 minute timeout. On success the application cache is invalidated.
    Always redirects to the admin panel with a flash message.
    """
    import subprocess
    import sys

    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    try:
        # Secure subprocess execution with absolute paths and validation
        script_path = os.path.abspath('generate_html.py')
        if not os.path.exists(script_path):
            flash('Content generation script not found', 'error')
            return redirect(url_for('admin_panel'))

        # Fixed argument list (no shell, no user input). sys.executable
        # replaces the undocumented os.sys alias.
        python_exe = os.path.abspath(sys.executable)
        cmd = [python_exe, script_path, '--filterset', 'no_filter', '--theme', 'vanilla-js']

        # Execute with timeout; a non-zero exit is handled below, not raised.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd=os.path.abspath('.'),
            timeout=300,  # 5 minute timeout
            check=False
        )

        if result.returncode == 0:
            flash('Content regenerated successfully', 'success')
            logger.info(f"Content regenerated by admin user {current_user.id}")
            # Invalidate cache since content was regenerated
            _invalidate_cache()
        else:
            flash('Error regenerating content', 'error')
            logger.error(f"Content regeneration failed: {result.stderr}")

    except subprocess.TimeoutExpired:
        flash('Content regeneration timed out', 'error')
        logger.error("Content regeneration timed out")
    except Exception as e:
        flash(f'Error regenerating content: {str(e)}', 'error')
        logger.error(f"Content regeneration error: {e}")

    return redirect(url_for('admin_panel'))
|
|
|
|
|
|
@app.route('/admin/clear_cache', methods=['POST'])
@login_required
def admin_clear_cache():
    """Clear application cache.

    Removes the ``cache`` and ``temp`` directories (relative to the current
    working directory) if they exist, then invalidates the in-process cache.
    Always redirects to the admin panel with a flash message.
    """
    import shutil

    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    try:
        # Clear any cache directories or temp files. isdir (rather than
        # exists) avoids calling rmtree on a plain file with the same name.
        for cache_dir in ('cache', 'temp'):
            if os.path.isdir(cache_dir):
                shutil.rmtree(cache_dir)

        # Clear application cache
        _invalidate_cache()

        flash('Cache cleared successfully', 'success')
        logger.info(f"Cache cleared by admin user {current_user.id}")
    except Exception as e:
        flash(f'Error clearing cache: {str(e)}', 'error')
        logger.error(f"Cache clearing error: {e}")

    return redirect(url_for('admin_panel'))
|
|
|
|
|
|
@app.route('/admin/backup_data', methods=['POST'])
@login_required
def admin_backup_data():
    """Create backup of application data.

    Copies a fixed set of directories and files (those that exist, relative
    to the current working directory) into
    ``backups/balanceboard_backup_<timestamp>``. Always redirects to the
    admin panel with a flash message.
    """
    import shutil
    from datetime import datetime

    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    try:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f'balanceboard_backup_{timestamp}'

        # Create backup directory
        backup_dir = os.path.join('backups', backup_name)
        os.makedirs(backup_dir, exist_ok=True)

        # Copy important directories
        dirs_to_backup = ['data', 'templates', 'themes', 'static']
        for dir_name in dirs_to_backup:
            if os.path.isdir(dir_name):
                shutil.copytree(dir_name, os.path.join(backup_dir, dir_name))

        # Copy important files
        files_to_backup = ['app.py', 'models.py', 'database.py', 'filtersets.json']
        for file_name in files_to_backup:
            if os.path.isfile(file_name):
                shutil.copy2(file_name, backup_dir)

        flash(f'Backup created: {backup_name}', 'success')
        logger.info(f"Backup {backup_name} created by admin user {current_user.id}")
    except Exception as e:
        flash(f'Error creating backup: {str(e)}', 'error')
        # Logged like the other admin actions so failures are traceable.
        logger.error(f"Backup creation error: {e}")

    return redirect(url_for('admin_panel'))
|
|
|
|
|
|
# ============================================================
|
|
# POLLING MANAGEMENT ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/admin/polling')
@login_required
def admin_polling():
    """Render the polling management page for administrators.

    The template receives every poll source (ordered by platform, then
    display name), the scheduler status and the platform configuration.
    """
    if not current_user.is_admin:
        flash('Access denied. Admin privileges required.', 'error')
        return redirect(url_for('index'))

    from models import PollSource, PollLog
    from polling_service import polling_service

    ordering = (PollSource.platform, PollSource.display_name)
    template_context = {
        # All poll sources
        'sources': PollSource.query.order_by(*ordering).all(),
        # Current scheduler status
        'scheduler_status': polling_service.get_status(),
        # Platform config describing the available sources
        'platform_config': load_platform_config(),
    }
    return render_template('admin_polling.html', **template_context)
|
|
|
|
|
|
@app.route('/admin/polling/add', methods=['POST'])
@login_required
def admin_polling_add():
    """Add a new poll source.

    Reads ``platform``, ``source_id`` (or ``custom_source_id``, which takes
    precedence when non-blank), ``display_name``, ``poll_interval``,
    ``max_posts``, ``fetch_comments`` and ``priority`` from the form. The new
    source is created disabled. Always redirects to the polling admin page
    with a flash message; non-admins are redirected to the index page.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    form = request.form
    platform = (form.get('platform') or '').strip()
    source_id = (form.get('source_id') or '').strip()
    custom_source_id = (form.get('custom_source_id') or '').strip()
    display_name = (form.get('display_name') or '').strip()
    fetch_comments = form.get('fetch_comments', 'true') == 'true'
    priority = form.get('priority', 'medium')

    # Numeric fields: a blank value falls back to the default, anything
    # non-numeric is reported to the admin instead of raising a 500.
    try:
        poll_interval = int(form.get('poll_interval') or 60)
        max_posts = int(form.get('max_posts') or 100)
    except ValueError:
        flash('Poll interval and max posts must be whole numbers', 'error')
        return redirect(url_for('admin_polling'))

    if poll_interval < 1 or max_posts < 1:
        flash('Poll interval and max posts must be positive', 'error')
        return redirect(url_for('admin_polling'))

    # Use custom source if provided, otherwise use dropdown
    if custom_source_id:
        source_id = custom_source_id

    if not platform or not source_id or not display_name:
        flash('Missing required fields', 'error')
        return redirect(url_for('admin_polling'))

    # Check if source already exists
    existing = PollSource.query.filter_by(platform=platform, source_id=source_id).first()
    if existing:
        flash(f'Source {platform}:{source_id} already exists', 'warning')
        return redirect(url_for('admin_polling'))

    # Create new source (disabled by default)
    source = PollSource(
        platform=platform,
        source_id=source_id,
        display_name=display_name,
        poll_interval_minutes=poll_interval,
        max_posts=max_posts,
        fetch_comments=fetch_comments,
        priority=priority,
        enabled=False,
        created_by=current_user.id
    )

    try:
        db.session.add(source)
        db.session.commit()
    except Exception as e:
        # Roll back so the session stays usable after a failed insert.
        db.session.rollback()
        logger.error(f"Error adding poll source {platform}:{source_id}: {e}")
        flash('Error adding polling source', 'error')
        return redirect(url_for('admin_polling'))

    flash(f'Added polling source: {display_name}', 'success')
    logger.info(f"Admin {current_user.id} added poll source {platform}:{source_id}")

    return redirect(url_for('admin_polling'))
|
|
|
|
|
|
@app.route('/admin/polling/<source_id>/toggle', methods=['POST'])
@login_required
def admin_polling_toggle(source_id):
    """Flip the enabled flag of one poll source (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    poll_source = PollSource.query.get(source_id)
    if poll_source:
        poll_source.enabled = not poll_source.enabled
        db.session.commit()

        state_label = 'enabled' if poll_source.enabled else 'disabled'
        flash(f'Polling {state_label} for {poll_source.display_name}', 'success')
    else:
        flash('Source not found', 'error')

    return redirect(url_for('admin_polling'))
|
|
|
|
|
|
@app.route('/admin/polling/<source_id>/update', methods=['POST'])
@login_required
def admin_polling_update(source_id):
    """Update poll source configuration.

    Args:
        source_id: Primary key of the poll source, taken from the URL.

    Only fields present (and non-empty) in the form are changed:
    ``poll_interval``, ``max_posts``, ``fetch_comments``, ``priority`` and
    ``display_name``. Always redirects to the polling admin page with a flash
    message; non-admins are redirected to the index page.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    form = request.form

    # Validate numeric fields before touching the model so bad input yields
    # a flash message rather than a 500 and a half-applied update.
    numeric_updates = {}
    for form_key, attr_name in (('poll_interval', 'poll_interval_minutes'),
                                ('max_posts', 'max_posts')):
        raw_value = form.get(form_key)
        if not raw_value:
            continue
        try:
            parsed = int(raw_value)
        except ValueError:
            flash('Poll interval and max posts must be whole numbers', 'error')
            return redirect(url_for('admin_polling'))
        if parsed < 1:
            flash('Poll interval and max posts must be positive', 'error')
            return redirect(url_for('admin_polling'))
        numeric_updates[attr_name] = parsed

    # Update all configurable fields
    for attr_name, value in numeric_updates.items():
        setattr(source, attr_name, value)

    if form.get('fetch_comments') is not None:
        source.fetch_comments = form.get('fetch_comments') == 'true'

    if form.get('priority'):
        source.priority = form.get('priority')

    if form.get('display_name'):
        source.display_name = form.get('display_name')

    try:
        db.session.commit()
    except Exception as e:
        # Roll back so the session stays usable after a failed update.
        db.session.rollback()
        logger.error(f"Error updating poll source {source_id}: {e}")
        flash('Error updating settings', 'error')
        return redirect(url_for('admin_polling'))

    flash(f'Updated settings for {source.display_name}', 'success')

    return redirect(url_for('admin_polling'))
|
|
|
|
|
|
@app.route('/admin/polling/<source_id>/poll-now', methods=['POST'])
@login_required
def admin_polling_poll_now(source_id):
    """Ask the polling service to poll one source immediately (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource
    from polling_service import polling_service

    polling_page = url_for('admin_polling')

    target_source = PollSource.query.get(source_id)
    if not target_source:
        flash('Source not found', 'error')
        return redirect(polling_page)

    # Any failure from the service is reported to the admin and logged.
    try:
        polling_service.poll_now(source_id)
        flash(f'Polling started for {target_source.display_name}', 'success')
    except Exception as exc:
        flash(f'Error starting poll: {exc!s}', 'error')
        logger.error(f"Error triggering poll for {source_id}: {exc}")

    return redirect(polling_page)
|
|
|
|
|
|
@app.route('/admin/polling/<source_id>/delete', methods=['POST'])
@login_required
def admin_polling_delete(source_id):
    """Remove one poll source from the database (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    doomed_source = PollSource.query.get(source_id)
    if doomed_source:
        # Remember the name before the row is deleted.
        removed_name = doomed_source.display_name
        db.session.delete(doomed_source)
        db.session.commit()

        flash(f'Deleted polling source: {removed_name}', 'success')
        logger.info(f"Admin {current_user.id} deleted poll source {source_id}")
    else:
        flash('Source not found', 'error')

    return redirect(url_for('admin_polling'))
|
|
|
|
|
|
@app.route('/admin/polling/<source_id>/logs')
@login_required
def admin_polling_logs(source_id):
    """Show up to 50 log entries for one poll source (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource, PollLog

    poll_source = PollSource.query.get(source_id)
    if poll_source:
        # Cap the query at 50 rows.
        recent_logs = poll_source.logs.limit(50).all()
        return render_template('admin_polling_logs.html',
                               source=poll_source,
                               logs=recent_logs)

    flash('Source not found', 'error')
    return redirect(url_for('admin_polling'))
|
|
|
|
|
|
# ============================================================
|
|
# ERROR HANDLERS
|
|
# ============================================================
|
|
|
|
@app.errorhandler(404)
def not_found(e):
    """Render the custom 404 page with a 404 status code."""
    page = render_template('404.html')
    return page, 404
|
|
|
|
|
|
@app.errorhandler(500)
def server_error(e):
    """Render the custom 500 page with a 500 status code."""
    page = render_template('500.html')
    return page, 500
|
|
|
|
|
|
# ============================================================
|
|
# INITIALIZATION
|
|
# ============================================================
|
|
|
|
if __name__ == '__main__':
    print("✓ BalanceBoard starting...")
    print("✓ Database: PostgreSQL with SQLAlchemy")
    print("✓ Password hashing: bcrypt")
    print("✓ Authentication: Flask-Login")

    # The server binds to all interfaces, so the interactive debugger must
    # not be on by default: it lets anyone who can reach the port run
    # arbitrary code. Opt in explicitly with FLASK_DEBUG=1 for local work.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')

    app.run(host='0.0.0.0', port=DEFAULT_PORT, debug=debug_enabled)
|