"""
BalanceBoard Web Application
Flask server with user authentication and content serving.
"""
import os
import re
import logging
import time
from pathlib import Path
from werkzeug.utils import secure_filename
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, abort, session, jsonify
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from dotenv import load_dotenv
from functools import lru_cache
from collections import defaultdict
from authlib.integrations.flask_client import OAuth
from urllib.parse import quote_plus, urlencode
from database import init_db, db
from models import User, bcrypt
from user_service import UserService
import json
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Initialize Flask app
app = Flask(__name__,
static_folder='themes',
template_folder='templates')
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size
app.config['ALLOW_ANONYMOUS_ACCESS'] = os.getenv('ALLOW_ANONYMOUS_ACCESS', 'true').lower() == 'true'
# Application branding configuration
app.config['APP_NAME'] = os.getenv('APP_NAME', 'BalanceBoard')
app.config['LOGO_PATH'] = os.getenv('LOGO_PATH', 'logo.png')
# Auth0 Configuration
app.config['AUTH0_DOMAIN'] = os.getenv('AUTH0_DOMAIN', '')
app.config['AUTH0_CLIENT_ID'] = os.getenv('AUTH0_CLIENT_ID', '')
app.config['AUTH0_CLIENT_SECRET'] = os.getenv('AUTH0_CLIENT_SECRET', '')
app.config['AUTH0_AUDIENCE'] = os.getenv('AUTH0_AUDIENCE', '')
# Configuration constants
# Note: ALLOWED_FILTERSETS will be dynamically loaded from filter_engine
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
UPLOAD_FOLDER = 'static/avatars'
MAX_FILENAME_LENGTH = 100
DEFAULT_PORT = 5021
DEFAULT_PAGE_SIZE = 20
MIN_PASSWORD_LENGTH = 8
MAX_USERNAME_LENGTH = 80
MAX_EMAIL_LENGTH = 120
MAX_COMMUNITY_NAME_LENGTH = 100
# Initialize database
init_db(app)
# Initialize bcrypt
bcrypt.init_app(app)
# Initialize Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Please log in to access this page.'
# Initialize user service
user_service = UserService()
# Initialize polling service
from polling_service import polling_service
polling_service.init_app(app)
polling_service.start()
# Initialize filter engine
from filter_pipeline import FilterEngine
filter_engine = FilterEngine.get_instance()
logger.info(f"FilterEngine initialized with {len(filter_engine.get_available_filtersets())} filtersets")
# Initialize OAuth for Auth0
oauth = OAuth(app)
auth0 = oauth.register(
'auth0',
client_id=app.config['AUTH0_CLIENT_ID'],
client_secret=app.config['AUTH0_CLIENT_SECRET'],
server_metadata_url=f'https://{app.config["AUTH0_DOMAIN"]}/.well-known/openid_configuration',
client_kwargs={
'scope': 'openid profile email',
}
)
# Cache for posts and comments - improves performance
post_cache = {}
comment_cache = defaultdict(list)
cache_timestamp = 0
CACHE_DURATION = 300 # 5 minutes
# Security helper functions
def _is_safe_filterset(filterset):
"""Validate filterset name for security"""
if not filterset or not isinstance(filterset, str):
return False
# Check against available filtersets from filter_engine
allowed_filtersets = set(filter_engine.get_available_filtersets())
return filterset in allowed_filtersets and re.match(r'^[a-zA-Z0-9_-]+$', filterset)
def _is_safe_path(path):
"""Validate file path for security"""
if not path or not isinstance(path, str):
return False
# Check for directory traversal attempts
if '..' in path or path.startswith('/') or '\\' in path:
return False
# Only allow alphanumeric, dots, hyphens, underscores, and forward slashes
return re.match(r'^[a-zA-Z0-9._/-]+$', path) is not None
def _is_allowed_file(filename):
"""Check if file extension is allowed"""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def _load_posts_cache():
"""Load and cache posts data for better performance"""
global post_cache, comment_cache, cache_timestamp
current_time = time.time()
if current_time - cache_timestamp < CACHE_DURATION and post_cache:
return post_cache, comment_cache
# Clear existing cache
post_cache.clear()
comment_cache.clear()
posts_dir = Path('data/posts')
comments_dir = Path('data/comments')
# Load all posts
if posts_dir.exists():
for post_file in posts_dir.glob('*.json'):
try:
with open(post_file, 'r') as f:
post_data = json.load(f)
post_uuid = post_data.get('uuid')
if post_uuid:
post_cache[post_uuid] = post_data
except (json.JSONDecodeError, IOError) as e:
logger.debug(f"Error reading post file {post_file}: {e}")
continue
# Load all comments and group by post UUID
if comments_dir.exists():
for comment_file in comments_dir.glob('*.json'):
try:
with open(comment_file, 'r') as f:
comment_data = json.load(f)
post_uuid = comment_data.get('post_uuid')
if post_uuid:
comment_cache[post_uuid].append(comment_data)
except (json.JSONDecodeError, IOError) as e:
logger.debug(f"Error reading comment file {comment_file}: {e}")
continue
cache_timestamp = current_time
logger.info(f"Cache refreshed: {len(post_cache)} posts, {len(comment_cache)} comment groups")
return post_cache, comment_cache
def _invalidate_cache():
"""Invalidate the cache to force refresh"""
global cache_timestamp
cache_timestamp = 0
def _validate_user_settings(settings_str):
"""Validate and sanitize user settings JSON"""
try:
if not settings_str:
return {}
settings = json.loads(settings_str)
if not isinstance(settings, dict):
logger.warning("User settings must be a JSON object")
return {}
# Validate specific fields
validated = {}
# Filter set validation
if 'filter_set' in settings:
filter_set = settings['filter_set']
if isinstance(filter_set, str) and _is_safe_filterset(filter_set):
validated['filter_set'] = filter_set
# Communities validation
if 'communities' in settings:
communities = settings['communities']
if isinstance(communities, list):
# Validate each community name
safe_communities = []
for community in communities:
if isinstance(community, str) and len(community) <= MAX_COMMUNITY_NAME_LENGTH and re.match(r'^[a-zA-Z0-9_-]+$', community):
safe_communities.append(community)
validated['communities'] = safe_communities
# Experience settings validation
if 'experience' in settings:
exp = settings['experience']
if isinstance(exp, dict):
safe_exp = {}
bool_fields = ['infinite_scroll', 'auto_refresh', 'push_notifications', 'dark_patterns_opt_in', 'time_filter_enabled']
for field in bool_fields:
if field in exp and isinstance(exp[field], bool):
safe_exp[field] = exp[field]
# Handle time_filter_days as integer
if 'time_filter_days' in exp and isinstance(exp['time_filter_days'], int) and exp['time_filter_days'] > 0:
safe_exp['time_filter_days'] = exp['time_filter_days']
validated['experience'] = safe_exp
return validated
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"Invalid user settings JSON: {e}")
return {}
# Add custom Jinja filters
@app.template_filter('nl2br')
def nl2br_filter(text):
"""Convert newlines to
tags"""
if not text:
return text
return text.replace('\n', '
\n')
@login_manager.user_loader
def load_user(user_id):
"""Load user by ID for Flask-Login"""
return user_service.get_user_by_id(user_id)
# ============================================================
# STATIC CONTENT ROUTES
# ============================================================
@app.before_request
def check_first_user():
"""Check if any users exist, redirect to admin creation if not"""
# Skip for static files and auth routes
if request.endpoint and (
request.endpoint.startswith('static') or
request.endpoint in ['login', 'signup', 'admin_setup', 'serve_theme', 'serve_logo']
):
return
# Skip if user is already authenticated
if current_user.is_authenticated:
return
# Check if any users exist
try:
user_count = User.query.count()
if user_count == 0:
return redirect(url_for('admin_setup'))
except Exception as e:
# If database is not ready, skip check
logger.warning(f"Database not ready for user count check: {e}")
pass
def calculate_quick_stats():
"""Calculate quick stats for dashboard"""
from datetime import datetime, timedelta
cached_posts, _ = _load_posts_cache()
# Calculate posts from today (last 24 hours)
now = datetime.utcnow()
today_start = now - timedelta(hours=24)
today_timestamp = today_start.timestamp()
posts_today = sum(1 for post in cached_posts.values()
if post.get('timestamp', 0) >= today_timestamp)
return {
'posts_today': posts_today,
'total_posts': len(cached_posts)
}
@app.route('/')
def index():
"""Serve the main feed page"""
# Calculate stats
quick_stats = calculate_quick_stats()
if current_user.is_authenticated:
# Load user settings
try:
user_settings = json.loads(current_user.settings) if current_user.settings else {}
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
user_settings = {}
return render_template('dashboard.html', user_settings=user_settings, quick_stats=quick_stats)
else:
# Check if anonymous access is allowed
if app.config.get('ALLOW_ANONYMOUS_ACCESS', False):
# Anonymous mode - allow browsing with default settings
user_settings = {
'filter_set': 'no_filter',
'communities': [],
'experience': {
'infinite_scroll': False,
'auto_refresh': False,
'push_notifications': False,
'dark_patterns_opt_in': False,
'time_filter_enabled': False,
'time_filter_days': 7
}
}
return render_template('dashboard.html', user_settings=user_settings, anonymous=True, quick_stats=quick_stats)
else:
# Redirect non-authenticated users to login
return redirect(url_for('login'))
@app.route('/feed/')
def feed_content(filterset='no_filter'):
"""Serve filtered feed content"""
# Validate filterset to prevent directory traversal
if not _is_safe_filterset(filterset):
logger.warning(f"Invalid filterset requested: {filterset}")
abort(404)
# Additional path validation
safe_path = os.path.normpath(f'active_html/{filterset}/index.html')
if not safe_path.startswith('active_html/'):
logger.warning(f"Path traversal attempt detected: {filterset}")
abort(404)
return send_from_directory(f'active_html/{filterset}', 'index.html')
def load_platform_config():
"""Load platform configuration"""
try:
with open('platform_config.json', 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError, IOError) as e:
logger.warning(f"Could not load platform config: {e}")
return {"platforms": {}, "collection_targets": []}
def get_display_name_for_source(platform, source, platform_config):
"""Get proper display name for a source based on platform"""
if not platform_config or 'platforms' not in platform_config:
return source
platform_info = platform_config['platforms'].get(platform, {})
# For platforms with communities, find the community info
if platform_info.get('supports_communities'):
for community in platform_info.get('communities', []):
if community['id'] == source:
return community['display_name']
# Fallback to prefix + source for Reddit-like platforms
prefix = platform_info.get('prefix', '')
return f"{prefix}{source}" if source else platform_info.get('name', platform)
else:
# For platforms without communities, use the platform name
return platform_info.get('name', platform)
@app.route('/api/posts')
def api_posts():
"""API endpoint to get posts data with pagination and filtering"""
try:
# Load platform configuration
platform_config = load_platform_config()
# Get query parameters
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', DEFAULT_PAGE_SIZE))
community = request.args.get('community', '')
platform = request.args.get('platform', '')
search_query = request.args.get('q', '').lower().strip()
filter_override = request.args.get('filter', '')
# Get user's filterset preference, community selections, and time filter
filterset_name = 'no_filter'
user_communities = []
time_filter_enabled = False
time_filter_days = 7
if current_user.is_authenticated:
try:
user_settings = json.loads(current_user.settings) if current_user.settings else {}
filterset_name = user_settings.get('filter_set', 'no_filter')
user_communities = user_settings.get('communities', [])
experience_settings = user_settings.get('experience', {})
time_filter_enabled = experience_settings.get('time_filter_enabled', False)
time_filter_days = experience_settings.get('time_filter_days', 7)
except:
filterset_name = 'no_filter'
user_communities = []
time_filter_enabled = False
time_filter_days = 7
# Override filterset if specified in request (for sidebar filter switching)
if filter_override and _is_safe_filterset(filter_override):
filterset_name = filter_override
# Use cached data for better performance
cached_posts, cached_comments = _load_posts_cache()
# Calculate time filter cutoff if enabled
time_cutoff = None
if time_filter_enabled:
from datetime import datetime, timedelta
cutoff_date = datetime.utcnow() - timedelta(days=time_filter_days)
time_cutoff = cutoff_date.timestamp()
# Collect raw posts for filtering
raw_posts = []
for post_uuid, post_data in cached_posts.items():
# Apply time filter first if enabled
if time_filter_enabled and time_cutoff:
post_timestamp = post_data.get('timestamp', 0)
if post_timestamp < time_cutoff:
continue
# Apply community filter (before filterset)
if community and post_data.get('source', '').lower() != community.lower():
continue
# Apply platform filter (before filterset)
if platform and post_data.get('platform', '').lower() != platform.lower():
continue
# Apply user's community preferences (before filterset)
if user_communities:
post_source = post_data.get('source', '').lower()
post_platform = post_data.get('platform', '').lower()
# Check if this post matches any of the user's selected communities
matches_community = False
for selected_community in user_communities:
selected_community = selected_community.lower()
# Match by exact source name or platform name
if (post_source == selected_community or
post_platform == selected_community or
selected_community in post_source):
matches_community = True
break
if not matches_community:
continue
# Apply search filter (before filterset)
if search_query:
title = post_data.get('title', '').lower()
content = post_data.get('content', '').lower()
author = post_data.get('author', '').lower()
source = post_data.get('source', '').lower()
if not (search_query in title or
search_query in content or
search_query in author or
search_query in source):
continue
raw_posts.append(post_data)
# Apply filterset using FilterEngine
filtered_posts = filter_engine.apply_filterset(raw_posts, filterset_name, use_cache=True)
# Build response posts with metadata
posts = []
for post_data in filtered_posts:
post_uuid = post_data.get('uuid')
comment_count = len(cached_comments.get(post_uuid, []))
# Get proper display name for source
source_display = get_display_name_for_source(
post_data.get('platform', ''),
post_data.get('source', ''),
platform_config
)
# Create post object with filter metadata
post = {
'id': post_uuid,
'title': post_data.get('title', 'Untitled'),
'author': post_data.get('author', 'Unknown'),
'platform': post_data.get('platform', 'unknown'),
'score': post_data.get('score', 0),
'timestamp': post_data.get('timestamp', 0),
'url': f'/post/{post_uuid}',
'comments_count': comment_count,
'content_preview': (post_data.get('content', '') or '')[:200] + '...' if post_data.get('content') else '',
'source': post_data.get('source', ''),
'source_display': source_display,
'tags': post_data.get('tags', []),
'external_url': post_data.get('url', ''),
# Add filter metadata
'filter_score': post_data.get('_filter_score', 0.5),
'filter_categories': post_data.get('_filter_categories', []),
'filter_tags': post_data.get('_filter_tags', [])
}
posts.append(post)
# Sort by filter score (highest first), then timestamp
posts.sort(key=lambda x: (x['filter_score'], x['timestamp']), reverse=True)
# Calculate pagination
total_posts = len(posts)
start_idx = (page - 1) * per_page
end_idx = start_idx + per_page
paginated_posts = posts[start_idx:end_idx]
total_pages = (total_posts + per_page - 1) // per_page
has_next = page < total_pages
has_prev = page > 1
return {
'posts': paginated_posts,
'pagination': {
'current_page': page,
'total_pages': total_pages,
'total_posts': total_posts,
'per_page': per_page,
'has_next': has_next,
'has_prev': has_prev
}
}
except Exception as e:
print(f"Error loading posts: {e}")
return {'posts': [], 'error': str(e), 'pagination': {'current_page': 1, 'total_pages': 0, 'total_posts': 0, 'per_page': DEFAULT_PAGE_SIZE, 'has_next': False, 'has_prev': False}}
@app.route('/api/platforms')
def api_platforms():
"""API endpoint to get platform configuration and available communities"""
try:
platform_config = load_platform_config()
# Build community list for filtering UI
communities = []
posts_dir = Path('data/posts')
source_counts = {}
# Count posts per source to show actual available communities
for post_file in posts_dir.glob('*.json'):
try:
with open(post_file, 'r') as f:
post_data = json.load(f)
platform = post_data.get('platform', 'unknown')
source = post_data.get('source', '')
key = f"{platform}:{source}"
source_counts[key] = source_counts.get(key, 0) + 1
except:
continue
# Build community list from actual data and platform config
for key, count in source_counts.items():
platform, source = key.split(':', 1)
# Get display info from platform config
platform_info = platform_config.get('platforms', {}).get(platform, {})
community_info = None
if platform_info.get('supports_communities'):
for community in platform_info.get('communities', []):
if community['id'] == source:
community_info = community
break
# Create community entry
if community_info:
community_entry = {
'platform': platform,
'id': source,
'name': community_info['name'],
'display_name': community_info['display_name'],
'icon': community_info.get('icon', platform_info.get('icon', '📄')),
'count': count,
'description': community_info.get('description', '')
}
else:
# Fallback for sources not in config
display_name = get_display_name_for_source(platform, source, platform_config)
community_entry = {
'platform': platform,
'id': source,
'name': source or platform,
'display_name': display_name,
'icon': platform_info.get('icon', '📄'),
'count': count,
'description': f"Posts from {display_name}"
}
communities.append(community_entry)
# Sort communities by count (most posts first)
communities.sort(key=lambda x: x['count'], reverse=True)
return {
'platforms': platform_config.get('platforms', {}),
'communities': communities,
'total_communities': len(communities)
}
except Exception as e:
print(f"Error loading platform configuration: {e}")
return {
'platforms': {},
'communities': [],
'total_communities': 0,
'error': str(e)
}
@app.route('/api/content-timestamp')
def api_content_timestamp():
"""API endpoint to get the last content update timestamp for auto-refresh"""
try:
posts_dir = Path('data/posts')
if not posts_dir.exists():
return jsonify({'timestamp': 0})
# Get the most recent modification time of any post file
latest_mtime = 0
for post_file in posts_dir.glob('*.json'):
mtime = post_file.stat().st_mtime
if mtime > latest_mtime:
latest_mtime = mtime
return jsonify({'timestamp': latest_mtime})
except Exception as e:
logger.error(f"Error getting content timestamp: {e}")
return jsonify({'error': 'Failed to get content timestamp'}), 500
@app.route('/api/bookmark', methods=['POST'])
@login_required
def api_bookmark():
"""Toggle bookmark status for a post"""
try:
from models import Bookmark
data = request.get_json()
if not data or 'post_uuid' not in data:
return jsonify({'error': 'Missing post_uuid'}), 400
post_uuid = data['post_uuid']
if not post_uuid:
return jsonify({'error': 'Invalid post_uuid'}), 400
# Check if bookmark already exists
existing_bookmark = Bookmark.query.filter_by(
user_id=current_user.id,
post_uuid=post_uuid
).first()
if existing_bookmark:
# Remove bookmark
db.session.delete(existing_bookmark)
db.session.commit()
return jsonify({'bookmarked': False, 'message': 'Bookmark removed'})
else:
# Add bookmark - get post data for caching
cached_posts, _ = _load_posts_cache()
post_data = cached_posts.get(post_uuid, {})
bookmark = Bookmark(
user_id=current_user.id,
post_uuid=post_uuid,
title=post_data.get('title', ''),
platform=post_data.get('platform', ''),
source=post_data.get('source', '')
)
db.session.add(bookmark)
db.session.commit()
return jsonify({'bookmarked': True, 'message': 'Bookmark added'})
except Exception as e:
logger.error(f"Error toggling bookmark: {e}")
return jsonify({'error': 'Failed to toggle bookmark'}), 500
@app.route('/api/bookmarks')
@login_required
def api_bookmarks():
"""Get user's bookmarks"""
try:
from models import Bookmark
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', DEFAULT_PAGE_SIZE))
# Get user's bookmarks with pagination
bookmarks_query = Bookmark.query.filter_by(user_id=current_user.id).order_by(Bookmark.created_at.desc())
total_bookmarks = bookmarks_query.count()
bookmarks = bookmarks_query.offset((page - 1) * per_page).limit(per_page).all()
# Load current posts cache to get updated data
cached_posts, cached_comments = _load_posts_cache()
# Build response
bookmark_posts = []
for bookmark in bookmarks:
# Try to get current post data, fallback to cached data
post_data = cached_posts.get(bookmark.post_uuid)
if post_data:
# Post still exists in current data
comment_count = len(cached_comments.get(bookmark.post_uuid, []))
post = {
'id': bookmark.post_uuid,
'title': post_data.get('title', bookmark.title or 'Untitled'),
'author': post_data.get('author', 'Unknown'),
'platform': post_data.get('platform', bookmark.platform or 'unknown'),
'score': post_data.get('score', 0),
'timestamp': post_data.get('timestamp', 0),
'url': f'/post/{bookmark.post_uuid}',
'comments_count': comment_count,
'content_preview': (post_data.get('content', '') or '')[:200] + '...' if post_data.get('content') else '',
'source': post_data.get('source', bookmark.source or ''),
'bookmarked_at': bookmark.created_at.isoformat(),
'external_url': post_data.get('url', '')
}
else:
# Post no longer in current data, use cached bookmark data
post = {
'id': bookmark.post_uuid,
'title': bookmark.title or 'Untitled',
'author': 'Unknown',
'platform': bookmark.platform or 'unknown',
'score': 0,
'timestamp': 0,
'url': f'/post/{bookmark.post_uuid}',
'comments_count': 0,
'content_preview': 'Content no longer available',
'source': bookmark.source or '',
'bookmarked_at': bookmark.created_at.isoformat(),
'external_url': '',
'archived': True # Mark as archived
}
bookmark_posts.append(post)
total_pages = (total_bookmarks + per_page - 1) // per_page
has_next = page < total_pages
has_prev = page > 1
return jsonify({
'posts': bookmark_posts,
'pagination': {
'current_page': page,
'total_pages': total_pages,
'total_posts': total_bookmarks,
'per_page': per_page,
'has_next': has_next,
'has_prev': has_prev
}
})
except Exception as e:
logger.error(f"Error getting bookmarks: {e}")
return jsonify({'error': 'Failed to get bookmarks'}), 500
@app.route('/api/bookmark-status/')
@login_required
def api_bookmark_status(post_uuid):
"""Check if a post is bookmarked by current user"""
try:
from models import Bookmark
bookmark = Bookmark.query.filter_by(
user_id=current_user.id,
post_uuid=post_uuid
).first()
return jsonify({'bookmarked': bookmark is not None})
except Exception as e:
logger.error(f"Error checking bookmark status: {e}")
return jsonify({'error': 'Failed to check bookmark status'}), 500
@app.route('/api/filters')
def api_filters():
"""API endpoint to get available filters"""
try:
filters = []
# Get current user's filter preference
current_filter = 'no_filter'
if current_user.is_authenticated:
try:
user_settings = json.loads(current_user.settings) if current_user.settings else {}
current_filter = user_settings.get('filter_set', 'no_filter')
except:
pass
# Get available filtersets from filter engine
for filterset_name in filter_engine.get_available_filtersets():
filterset_config = filter_engine.config.get_filterset(filterset_name)
if filterset_config:
# Map filter names to icons and display names
icon_map = {
'no_filter': '🌐',
'safe_content': '✅',
'tech_only': '💻',
'high_quality': '⭐',
'custom_example': '🎯'
}
name_map = {
'no_filter': 'All Content',
'safe_content': 'Safe Content',
'tech_only': 'Tech Only',
'high_quality': 'High Quality',
'custom_example': 'Custom Example'
}
filters.append({
'id': filterset_name,
'name': name_map.get(filterset_name, filterset_name.replace('_', ' ').title()),
'description': filterset_config.get('description', ''),
'icon': icon_map.get(filterset_name, '🔧'),
'active': filterset_name == current_filter
})
return jsonify({'filters': filters})
except Exception as e:
logger.error(f"Error getting filters: {e}")
return jsonify({'error': 'Failed to get filters'}), 500
@app.route('/bookmarks')
@login_required
def bookmarks():
"""Bookmarks page"""
return render_template('bookmarks.html', user=current_user)
def build_comment_tree(comments):
"""Build a hierarchical comment tree from flat comment list"""
# Create lookup dict by UUID
comment_dict = {c['uuid']: {**c, 'replies': []} for c in comments}
# Build tree structure
root_comments = []
for comment in comments:
parent_uuid = comment.get('parent_comment_uuid')
if parent_uuid and parent_uuid in comment_dict:
# Add as reply to parent
comment_dict[parent_uuid]['replies'].append(comment_dict[comment['uuid']])
else:
# Top-level comment
root_comments.append(comment_dict[comment['uuid']])
# Sort at each level by timestamp
def sort_tree(comments_list):
comments_list.sort(key=lambda x: x.get('timestamp', 0))
for comment in comments_list:
if comment.get('replies'):
sort_tree(comment['replies'])
sort_tree(root_comments)
return root_comments
@app.route('/post/')
def post_detail(post_id):
"""Serve individual post detail page with modern theme"""
try:
# Load platform configuration
platform_config = load_platform_config()
# Use cached data for better performance
cached_posts, cached_comments = _load_posts_cache()
# Get post data from cache
post_data = cached_posts.get(post_id)
if not post_data:
return render_template('404.html'), 404
# Add source display name
post_data['source_display'] = get_display_name_for_source(
post_data.get('platform', ''),
post_data.get('source', ''),
platform_config
)
# Get comments from cache
comments_flat = cached_comments.get(post_id, [])
logger.info(f"Loading post {post_id}: found {len(comments_flat)} comments")
# Build comment tree
comments = build_comment_tree(comments_flat)
# Load user settings if authenticated
user_settings = {}
if current_user.is_authenticated:
try:
user_settings = json.loads(current_user.settings) if current_user.settings else {}
except:
user_settings = {}
return render_template('post_detail.html', post=post_data, comments=comments, user_settings=user_settings)
except Exception as e:
print(f"Error loading post {post_id}: {e}")
return render_template('404.html'), 404
@app.route('/themes/')
def serve_theme(filename):
"""Serve theme files (CSS, JS)"""
# Validate filename to prevent directory traversal
if not _is_safe_path(filename) or '..' in filename:
logger.warning(f"Unsafe theme file requested: {filename}")
abort(404)
return send_from_directory('themes', filename)
@app.route('/logo.png')
def serve_logo():
"""Serve configurable logo"""
logo_path = app.config['LOGO_PATH']
# If it's just a filename, serve from current directory
if '/' not in logo_path:
return send_from_directory('.', logo_path)
else:
# If it's a full path, split directory and filename
directory = os.path.dirname(logo_path)
filename = os.path.basename(logo_path)
return send_from_directory(directory, filename)
@app.route('/static/')
def serve_static(filename):
"""Serve static files (avatars, etc.)"""
# Validate filename to prevent directory traversal
if not _is_safe_path(filename) or '..' in filename:
logger.warning(f"Unsafe static file requested: {filename}")
abort(404)
return send_from_directory('static', filename)
# ============================================================
# AUTHENTICATION ROUTES
# ============================================================
@app.route('/login', methods=['GET', 'POST'])
def login():
"""Login page"""
if current_user.is_authenticated:
return redirect(url_for('index'))
# Check if Auth0 is configured
auth0_configured = bool(app.config.get('AUTH0_DOMAIN') and app.config.get('AUTH0_CLIENT_ID'))
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
remember = request.form.get('remember', False) == 'on'
if not user_service:
flash('User service not available', 'error')
return render_template('login.html', auth0_configured=auth0_configured)
user = user_service.authenticate(username, password)
if user:
login_user(user, remember=remember)
flash(f'Welcome back, {user.username}!', 'success')
# Redirect to next page or home
next_page = request.args.get('next')
return redirect(next_page) if next_page else redirect(url_for('index'))
else:
flash('Invalid username or password', 'error')
return render_template('login.html', auth0_configured=auth0_configured)
@app.route('/password-reset-request', methods=['GET', 'POST'])
def password_reset_request():
"""Request a password reset"""
if current_user.is_authenticated:
return redirect(url_for('index'))
if request.method == 'POST':
email = request.form.get('email', '').strip().lower()
if not email:
flash('Please enter your email address', 'error')
return render_template('password_reset_request.html')
# Find user by email
user = User.query.filter_by(email=email).first()
# Always show success message for security (don't reveal if email exists)
flash('If an account exists with that email, a password reset link has been sent.', 'success')
if user and user.password_hash: # Only send reset if user has a password (not OAuth only)
# Generate reset token
token = user.generate_reset_token()
# Build reset URL
reset_url = url_for('password_reset', token=token, _external=True)
# Log the reset URL (in production, this would be emailed)
logger.info(f"Password reset requested for {email}. Reset URL: {reset_url}")
# For now, also flash it for development (remove in production)
flash(f'Reset link (development only): {reset_url}', 'info')
return redirect(url_for('login'))
return render_template('password_reset_request.html')
@app.route('/password-reset/', methods=['GET', 'POST'])
def password_reset(token):
"""Reset password with token"""
if current_user.is_authenticated:
return redirect(url_for('index'))
# Find user by token
user = User.query.filter_by(reset_token=token).first()
if not user or not user.verify_reset_token(token):
flash('Invalid or expired reset token', 'error')
return redirect(url_for('login'))
if request.method == 'POST':
password = request.form.get('password', '')
confirm_password = request.form.get('confirm_password', '')
if not password or len(password) < 6:
flash('Password must be at least 6 characters', 'error')
return render_template('password_reset.html')
if password != confirm_password:
flash('Passwords do not match', 'error')
return render_template('password_reset.html')
# Set new password
user.set_password(password)
user.clear_reset_token()
flash('Your password has been reset successfully. You can now log in.', 'success')
return redirect(url_for('login'))
return render_template('password_reset.html')
# Auth0 Routes
@app.route('/auth0/login')
def auth0_login():
"""Redirect to Auth0 for authentication"""
# Check if Auth0 is configured
if not app.config.get('AUTH0_DOMAIN') or not app.config.get('AUTH0_CLIENT_ID'):
flash('Auth0 authentication is not configured. Please use email/password login or contact the administrator.', 'error')
return redirect(url_for('login'))
try:
redirect_uri = url_for('auth0_callback', _external=True)
return auth0.authorize_redirect(redirect_uri)
except Exception as e:
logger.error(f"Auth0 login error: {e}")
flash('Auth0 authentication failed. Please use email/password login.', 'error')
return redirect(url_for('login'))
@app.route('/auth0/callback')
def auth0_callback():
"""Handle Auth0 callback and create/login user"""
try:
# Get the access token from Auth0
token = auth0.authorize_access_token()
# Get user info from Auth0
user_info = token.get('userinfo')
if not user_info:
user_info = auth0.parse_id_token(token)
# Extract user details
auth0_id = user_info.get('sub')
email = user_info.get('email')
username = user_info.get('nickname') or user_info.get('preferred_username') or email.split('@')[0]
if not auth0_id or not email:
flash('Unable to get user information from Auth0', 'error')
return redirect(url_for('login'))
# Check if user exists with this Auth0 ID
user = user_service.get_user_by_auth0_id(auth0_id)
if not user:
# Check if user exists with this email (for account linking)
existing_user = user_service.get_user_by_email(email)
if existing_user:
# Link existing account to Auth0
user_service.link_auth0_account(existing_user.id, auth0_id)
user = existing_user
flash(f'Account linked successfully! Welcome back, {user.username}!', 'success')
else:
# Create new user
# Generate unique username if needed
base_username = username[:MAX_USERNAME_LENGTH-3] # Leave room for suffix
unique_username = base_username
counter = 1
while user_service.username_exists(unique_username):
unique_username = f"{base_username}_{counter}"
counter += 1
user_id = user_service.create_user(
username=unique_username,
email=email,
password=None, # No password for OAuth users
is_admin=False,
auth0_id=auth0_id
)
if user_id:
user = user_service.get_user_by_id(user_id)
flash(f'Account created successfully! Welcome, {user.username}!', 'success')
else:
flash('Failed to create user account', 'error')
return redirect(url_for('login'))
else:
flash(f'Welcome back, {user.username}!', 'success')
# Log in the user
if user:
login_user(user, remember=True)
# Store Auth0 info in session for future use
session['auth0_user_info'] = user_info
# Redirect to next page or home
next_page = request.args.get('next')
return redirect(next_page) if next_page else redirect(url_for('index'))
except Exception as e:
logger.error(f"Auth0 callback error: {e}")
flash('Authentication failed. Please try again.', 'error')
return redirect(url_for('login'))
@app.route('/auth0/logout')
@login_required
def auth0_logout():
"""Logout from Auth0 and local session"""
# Clear session
session.clear()
logout_user()
# Build Auth0 logout URL
domain = app.config['AUTH0_DOMAIN']
client_id = app.config['AUTH0_CLIENT_ID']
return_to = url_for('index', _external=True)
logout_url = f'https://{domain}/v2/logout?' + urlencode({
'returnTo': return_to,
'client_id': client_id
}, quote_via=quote_plus)
return redirect(logout_url)
@app.route('/admin-setup', methods=['GET', 'POST'])
def admin_setup():
"""Create first admin user"""
# Check if users already exist
try:
user_count = User.query.count()
if user_count > 0:
flash('Admin user already exists.', 'info')
return redirect(url_for('login'))
except Exception as e:
logger.warning(f"Database error checking existing users: {e}")
pass
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
password_confirm = request.form.get('password_confirm')
# Validation
if not username or not email or not password:
flash('All fields are required', 'error')
return render_template('admin_setup.html')
if password != password_confirm:
flash('Passwords do not match', 'error')
return render_template('admin_setup.html')
if len(password) < MIN_PASSWORD_LENGTH:
flash(f'Password must be at least {MIN_PASSWORD_LENGTH} characters', 'error')
return render_template('admin_setup.html')
# Create admin user
user_id = user_service.create_user(username, email, password, is_admin=True)
if user_id:
flash('Admin account created successfully! Please log in.', 'success')
return redirect(url_for('login'))
else:
flash('Error creating admin account. Please try again.', 'error')
return render_template('admin_setup.html')
@app.route('/signup', methods=['GET', 'POST'])
def signup():
"""Signup page"""
if current_user.is_authenticated:
return redirect(url_for('index'))
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
password_confirm = request.form.get('password_confirm')
if not user_service:
flash('User service not available', 'error')
return render_template('signup.html')
# Validation
if not username or not email or not password:
flash('All fields are required', 'error')
return render_template('signup.html')
if password != password_confirm:
flash('Passwords do not match', 'error')
return render_template('signup.html')
if len(password) < MIN_PASSWORD_LENGTH:
flash(f'Password must be at least {MIN_PASSWORD_LENGTH} characters', 'error')
return render_template('signup.html')
if user_service.username_exists(username):
flash('Username already taken', 'error')
return render_template('signup.html')
if user_service.email_exists(email):
flash('Email already registered', 'error')
return render_template('signup.html')
# Create user
user_id = user_service.create_user(username, email, password)
if user_id:
flash('Account created successfully! Please log in.', 'success')
return redirect(url_for('login'))
else:
flash('Error creating account. Please try again.', 'error')
return render_template('signup.html')
@app.route('/logout')
@login_required
def logout():
"""Logout current user"""
logout_user()
flash('You have been logged out.', 'info')
return redirect(url_for('index'))
@app.route('/settings')
@login_required
def settings():
"""Main settings page"""
# Load user settings
try:
user_settings = json.loads(current_user.settings) if current_user.settings else {}
except:
user_settings = {}
# Load available filter sets
try:
with open('filtersets.json', 'r') as f:
filter_sets = json.load(f)
except:
filter_sets = {}
return render_template('settings.html',
user=current_user,
user_settings=user_settings,
filter_sets=filter_sets)
@app.route('/settings/profile', methods=['GET', 'POST'])
@login_required
def settings_profile():
"""Profile settings page"""
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
default_avatar = request.form.get('default_avatar')
# Validation
if not username or not email:
flash('Username and email are required', 'error')
return render_template('settings_profile.html', user=current_user)
# Check if username is taken by another user
if username != current_user.username and user_service.username_exists(username):
flash('Username already taken', 'error')
return render_template('settings_profile.html', user=current_user)
# Check if email is taken by another user
if email != current_user.email and user_service.email_exists(email):
flash('Email already registered', 'error')
return render_template('settings_profile.html', user=current_user)
# Update user
current_user.username = username
current_user.email = email
# Handle default avatar selection
if default_avatar and default_avatar.startswith('default_'):
current_user.profile_picture_url = f"/static/default-avatars/{default_avatar}.png"
db.session.commit()
flash('Profile updated successfully', 'success')
return redirect(url_for('settings'))
# Available default avatars
default_avatars = [
{'id': 'default_1', 'name': 'Gradient Blue', 'bg': 'linear-gradient(135deg, #667eea 0%, #764ba2 100%)'},
{'id': 'default_2', 'name': 'Gradient Green', 'bg': 'linear-gradient(135deg, #4facfe 0%, #00f2fe 100%)'},
{'id': 'default_3', 'name': 'Gradient Orange', 'bg': 'linear-gradient(135deg, #fa709a 0%, #fee140 100%)'},
{'id': 'default_4', 'name': 'Gradient Purple', 'bg': 'linear-gradient(135deg, #a8edea 0%, #fed6e3 100%)'},
{'id': 'default_5', 'name': 'Brand Colors', 'bg': 'linear-gradient(135deg, #4db6ac 0%, #26a69a 100%)'},
{'id': 'default_6', 'name': 'Sunset', 'bg': 'linear-gradient(135deg, #ff7e5f 0%, #feb47b 100%)'},
]
return render_template('settings_profile.html', user=current_user, default_avatars=default_avatars)
@app.route('/settings/communities', methods=['GET', 'POST'])
@login_required
def settings_communities():
"""Community/source selection settings"""
if request.method == 'POST':
# Get selected communities
selected_communities = request.form.getlist('communities')
# Load current settings
try:
user_settings = json.loads(current_user.settings) if current_user.settings else {}
except:
user_settings = {}
# Update communities
user_settings['communities'] = selected_communities
# Save settings
current_user.settings = json.dumps(user_settings)
db.session.commit()
flash('Community preferences updated', 'success')
return redirect(url_for('settings'))
# Load current settings
try:
user_settings = json.loads(current_user.settings) if current_user.settings else {}
selected_communities = user_settings.get('communities', [])
except:
selected_communities = []
# Get available communities from platform config and collection targets
available_communities = []
# Load platform configuration
platform_config = load_platform_config()
# Get enabled communities from collection_targets (what's actually being crawled)
enabled_communities = set()
for target in platform_config.get('collection_targets', []):
enabled_communities.add((target['platform'], target['community']))
# Build community list from platform config for communities that are enabled
for platform_name, platform_info in platform_config.get('platforms', {}).items():
for community_info in platform_info.get('communities', []):
# Only include communities that are in collection_targets
if (platform_name, community_info['id']) in enabled_communities:
available_communities.append({
'id': community_info['id'],
'name': community_info['name'],
'display_name': community_info.get('display_name', community_info['name']),
'platform': platform_name,
'icon': community_info.get('icon', platform_info.get('icon', '📄')),
'description': community_info.get('description', '')
})
return render_template('settings_communities.html',
user=current_user,
available_communities=available_communities,
selected_communities=selected_communities)
@app.route('/settings/filters', methods=['GET', 'POST'])
@login_required
def settings_filters():
"""Filter settings page"""
if request.method == 'POST':
selected_filter = request.form.get('filter_set', 'no_filter')
# Load and validate current settings
user_settings = _validate_user_settings(current_user.settings)
# Validate new filter setting
if _is_safe_filterset(selected_filter):
user_settings['filter_set'] = selected_filter
else:
flash('Invalid filter selection', 'error')
return redirect(url_for('settings'))
# Save validated settings
try:
current_user.settings = json.dumps(user_settings)
db.session.commit()
flash('Filter settings updated successfully', 'success')
except Exception as e:
db.session.rollback()
logger.error(f"Error saving filter settings for user {current_user.id}: {e}")
flash('Error saving settings', 'error')
return redirect(url_for('settings'))
# Load current settings
try:
user_settings = json.loads(current_user.settings) if current_user.settings else {}
except:
user_settings = {}
current_filter = user_settings.get('filter_set', 'no_filter')
# Load available filter sets from FilterEngine as a dictionary
filter_sets = {}
for filterset_name in filter_engine.get_available_filtersets():
filter_sets[filterset_name] = filter_engine.config.get_filterset(filterset_name)
return render_template('settings_filters.html',
user=current_user,
filter_sets=filter_sets,
current_filter=current_filter)
@app.route('/settings/experience', methods=['GET', 'POST'])
@login_required
def settings_experience():
"""Experience and behavioral settings page - opt-in addictive features"""
if request.method == 'POST':
# Load current settings
try:
user_settings = json.loads(current_user.settings) if current_user.settings else {}
except:
user_settings = {}
# Get experience settings with defaults (all opt-in, so default to False)
user_settings['experience'] = {
'infinite_scroll': request.form.get('infinite_scroll') == 'on',
'auto_refresh': request.form.get('auto_refresh') == 'on',
'push_notifications': request.form.get('push_notifications') == 'on',
'dark_patterns_opt_in': request.form.get('dark_patterns_opt_in') == 'on',
'time_filter_enabled': request.form.get('time_filter_enabled') == 'on',
'time_filter_days': int(request.form.get('time_filter_days', 7))
}
# Save settings
current_user.settings = json.dumps(user_settings)
db.session.commit()
flash('Experience settings updated successfully', 'success')
return redirect(url_for('settings'))
# Load current settings
try:
user_settings = json.loads(current_user.settings) if current_user.settings else {}
except:
user_settings = {}
experience_settings = user_settings.get('experience', {
'infinite_scroll': False,
'auto_refresh': False,
'push_notifications': False,
'dark_patterns_opt_in': False,
'time_filter_enabled': False,
'time_filter_days': 7
})
return render_template('settings_experience.html',
user=current_user,
experience_settings=experience_settings)
@app.route('/upload-avatar', methods=['POST'])
@login_required
def upload_avatar():
"""Upload profile picture"""
if 'avatar' not in request.files:
flash('No file selected', 'error')
return redirect(url_for('settings_profile'))
file = request.files['avatar']
if file.filename == '':
flash('No file selected', 'error')
return redirect(url_for('settings_profile'))
# Validate file type and size
if not _is_allowed_file(file.filename):
flash('Invalid file type. Please upload PNG, JPG, or GIF', 'error')
return redirect(url_for('settings_profile'))
# Check file size (Flask's MAX_CONTENT_LENGTH handles this too, but double-check)
if hasattr(file, 'content_length') and file.content_length > app.config['MAX_CONTENT_LENGTH']:
flash('File too large. Maximum size is 16MB', 'error')
return redirect(url_for('settings_profile'))
# Validate and secure filename
filename = secure_filename(file.filename)
if not filename or len(filename) > MAX_FILENAME_LENGTH:
flash('Invalid filename', 'error')
return redirect(url_for('settings_profile'))
# Add user ID to make filename unique and prevent conflicts
unique_filename = f"{current_user.id}_{filename}"
# Ensure upload directory exists and is secure
upload_dir = os.path.abspath(UPLOAD_FOLDER)
os.makedirs(upload_dir, exist_ok=True)
upload_path = os.path.join(upload_dir, unique_filename)
# Final security check - ensure path is within upload directory
if not os.path.abspath(upload_path).startswith(upload_dir):
logger.warning(f"Path traversal attempt in file upload: {upload_path}")
flash('Invalid file path', 'error')
return redirect(url_for('settings_profile'))
try:
file.save(upload_path)
logger.info(f"File uploaded successfully: {unique_filename} by user {current_user.id}")
except Exception as e:
logger.error(f"Error saving uploaded file: {e}")
flash('Error saving file', 'error')
return redirect(url_for('settings_profile'))
# Update user profile
current_user.profile_picture_url = f"/static/avatars/{unique_filename}"
db.session.commit()
flash('Profile picture updated successfully', 'success')
return redirect(url_for('settings_profile'))
@app.route('/profile')
@login_required
def profile():
"""User profile page"""
return render_template('profile.html', user=current_user)
# ============================================================
# ADMIN ROUTES
# ============================================================
@app.route('/admin')
@login_required
def admin_panel():
"""Admin panel - user management"""
if not current_user.is_admin:
flash('Access denied. Admin privileges required.', 'error')
return redirect(url_for('index'))
if not user_service:
flash('User service not available', 'error')
return redirect(url_for('index'))
users = user_service.get_all_users()
return render_template('admin.html', users=users)
@app.route('/admin/user//delete', methods=['POST'])
@login_required
def admin_delete_user(user_id):
"""Delete user (admin only)"""
if not current_user.is_admin:
flash('Access denied', 'error')
return redirect(url_for('index'))
# Prevent self-deletion
if current_user.id == user_id:
flash('You cannot delete your own account!', 'error')
return redirect(url_for('admin_panel'))
user = user_service.get_user_by_id(user_id)
if user:
username = user.username
if user_service.delete_user(user_id):
flash(f'User {username} has been deleted.', 'success')
logger.info(f"Admin {current_user.id} deleted user {username} ({user_id})")
else:
flash('Error deleting user', 'error')
logger.error(f"Failed to delete user {user_id}")
else:
flash('User not found', 'error')
return redirect(url_for('admin_panel'))
@app.route('/admin/user//toggle-admin', methods=['POST'])
@login_required
def admin_toggle_admin(user_id):
"""Toggle user admin status"""
if not current_user.is_admin:
flash('Access denied', 'error')
return redirect(url_for('index'))
target_user = user_service.get_user_by_id(user_id)
if target_user:
new_status = not target_user.is_admin # Toggle admin status
user_service.update_user_admin_status(user_id, new_status)
flash('Admin status updated', 'success')
else:
flash('User not found', 'error')
return redirect(url_for('admin_panel'))
# This route is duplicate - removed in favor of the UUID-based route above
# This route is duplicate - removed in favor of the UUID-based route above
@app.route('/admin/regenerate_content', methods=['POST'])
@login_required
def admin_regenerate_content():
"""Regenerate all HTML content"""
if not current_user.is_admin:
flash('Access denied', 'error')
return redirect(url_for('admin_panel'))
try:
import subprocess
import shlex
# Secure subprocess execution with absolute paths and validation
script_path = os.path.abspath('generate_html.py')
if not os.path.exists(script_path):
flash('Content generation script not found', 'error')
return redirect(url_for('admin_panel'))
# Use absolute python path and validate arguments
python_exe = os.path.abspath(os.sys.executable)
cmd = [python_exe, script_path, '--filterset', 'no_filter', '--theme', 'vanilla-js']
# Execute with timeout and security restrictions
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd=os.path.abspath('.'),
timeout=300, # 5 minute timeout
check=False
)
if result.returncode == 0:
flash('Content regenerated successfully', 'success')
logger.info(f"Content regenerated by admin user {current_user.id}")
# Invalidate cache since content was regenerated
_invalidate_cache()
else:
flash('Error regenerating content', 'error')
logger.error(f"Content regeneration failed: {result.stderr}")
except subprocess.TimeoutExpired:
flash('Content regeneration timed out', 'error')
logger.error("Content regeneration timed out")
except Exception as e:
flash(f'Error regenerating content: {str(e)}', 'error')
logger.error(f"Content regeneration error: {e}")
return redirect(url_for('admin_panel'))
@app.route('/admin/clear_cache', methods=['POST'])
@login_required
def admin_clear_cache():
"""Clear application cache"""
if not current_user.is_admin:
flash('Access denied', 'error')
return redirect(url_for('admin_panel'))
try:
# Clear any cache directories or temp files
import shutil
import os
cache_dirs = ['cache', 'temp']
for cache_dir in cache_dirs:
if os.path.exists(cache_dir):
shutil.rmtree(cache_dir)
# Clear application cache
_invalidate_cache()
flash('Cache cleared successfully', 'success')
logger.info(f"Cache cleared by admin user {current_user.id}")
except Exception as e:
flash(f'Error clearing cache: {str(e)}', 'error')
logger.error(f"Cache clearing error: {e}")
return redirect(url_for('admin_panel'))
@app.route('/admin/backup_data', methods=['POST'])
@login_required
def admin_backup_data():
"""Create backup of application data"""
if not current_user.is_admin:
flash('Access denied', 'error')
return redirect(url_for('admin_panel'))
try:
import shutil
import os
from datetime import datetime
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_name = f'balanceboard_backup_{timestamp}'
# Create backup directory
backup_dir = f'backups/{backup_name}'
os.makedirs(backup_dir, exist_ok=True)
# Copy important directories
dirs_to_backup = ['data', 'templates', 'themes', 'static']
for dir_name in dirs_to_backup:
if os.path.exists(dir_name):
shutil.copytree(dir_name, f'{backup_dir}/{dir_name}')
# Copy important files
files_to_backup = ['app.py', 'models.py', 'database.py', 'filtersets.json']
for file_name in files_to_backup:
if os.path.exists(file_name):
shutil.copy2(file_name, backup_dir)
flash(f'Backup created: {backup_name}', 'success')
except Exception as e:
flash(f'Error creating backup: {str(e)}', 'error')
return redirect(url_for('admin_panel'))
# ============================================================
# POLLING MANAGEMENT ROUTES
# ============================================================
@app.route('/admin/polling')
@login_required
def admin_polling():
"""Admin polling management page"""
if not current_user.is_admin:
flash('Access denied. Admin privileges required.', 'error')
return redirect(url_for('index'))
from models import PollSource, PollLog
from polling_service import polling_service
# Get all poll sources with recent logs
sources = PollSource.query.order_by(PollSource.platform, PollSource.display_name).all()
# Get scheduler status
scheduler_status = polling_service.get_status()
# Load platform config for available sources
platform_config = load_platform_config()
return render_template('admin_polling.html',
sources=sources,
scheduler_status=scheduler_status,
platform_config=platform_config)
@app.route('/admin/polling/add', methods=['POST'])
@login_required
def admin_polling_add():
"""Add a new poll source"""
if not current_user.is_admin:
flash('Access denied', 'error')
return redirect(url_for('index'))
from models import PollSource
platform = request.form.get('platform')
source_id = request.form.get('source_id')
custom_source_id = request.form.get('custom_source_id')
display_name = request.form.get('display_name')
poll_interval = int(request.form.get('poll_interval', 60))
max_posts = int(request.form.get('max_posts', 100))
fetch_comments = request.form.get('fetch_comments', 'true') == 'true'
priority = request.form.get('priority', 'medium')
# Use custom source if provided, otherwise use dropdown
if custom_source_id and custom_source_id.strip():
source_id = custom_source_id.strip()
if not platform or not source_id or not display_name:
flash('Missing required fields', 'error')
return redirect(url_for('admin_polling'))
# Check if source already exists
existing = PollSource.query.filter_by(platform=platform, source_id=source_id).first()
if existing:
flash(f'Source {platform}:{source_id} already exists', 'warning')
return redirect(url_for('admin_polling'))
# Create new source (disabled by default)
source = PollSource(
platform=platform,
source_id=source_id,
display_name=display_name,
poll_interval_minutes=poll_interval,
max_posts=max_posts,
fetch_comments=fetch_comments,
priority=priority,
enabled=False,
created_by=current_user.id
)
db.session.add(source)
db.session.commit()
flash(f'Added polling source: {display_name}', 'success')
logger.info(f"Admin {current_user.id} added poll source {platform}:{source_id}")
return redirect(url_for('admin_polling'))
@app.route('/admin/polling//toggle', methods=['POST'])
@login_required
def admin_polling_toggle(source_id):
"""Toggle a poll source on/off"""
if not current_user.is_admin:
flash('Access denied', 'error')
return redirect(url_for('index'))
from models import PollSource
source = PollSource.query.get(source_id)
if not source:
flash('Source not found', 'error')
return redirect(url_for('admin_polling'))
source.enabled = not source.enabled
db.session.commit()
status = 'enabled' if source.enabled else 'disabled'
flash(f'Polling {status} for {source.display_name}', 'success')
return redirect(url_for('admin_polling'))
@app.route('/admin/polling//update', methods=['POST'])
@login_required
def admin_polling_update(source_id):
"""Update poll source configuration"""
if not current_user.is_admin:
flash('Access denied', 'error')
return redirect(url_for('index'))
from models import PollSource
source = PollSource.query.get(source_id)
if not source:
flash('Source not found', 'error')
return redirect(url_for('admin_polling'))
# Update all configurable fields
if request.form.get('poll_interval'):
source.poll_interval_minutes = int(request.form.get('poll_interval'))
if request.form.get('max_posts'):
source.max_posts = int(request.form.get('max_posts'))
if request.form.get('fetch_comments') is not None:
source.fetch_comments = request.form.get('fetch_comments') == 'true'
if request.form.get('priority'):
source.priority = request.form.get('priority')
if request.form.get('display_name'):
source.display_name = request.form.get('display_name')
db.session.commit()
flash(f'Updated settings for {source.display_name}', 'success')
return redirect(url_for('admin_polling'))
@app.route('/admin/polling//poll-now', methods=['POST'])
@login_required
def admin_polling_poll_now(source_id):
"""Manually trigger polling for a source"""
if not current_user.is_admin:
flash('Access denied', 'error')
return redirect(url_for('index'))
from models import PollSource
from polling_service import polling_service
source = PollSource.query.get(source_id)
if not source:
flash('Source not found', 'error')
return redirect(url_for('admin_polling'))
# Trigger polling in background
try:
polling_service.poll_now(source_id)
flash(f'Polling started for {source.display_name}', 'success')
except Exception as e:
flash(f'Error starting poll: {str(e)}', 'error')
logger.error(f"Error triggering poll for {source_id}: {e}")
return redirect(url_for('admin_polling'))
@app.route('/admin/polling//delete', methods=['POST'])
@login_required
def admin_polling_delete(source_id):
"""Delete a poll source"""
if not current_user.is_admin:
flash('Access denied', 'error')
return redirect(url_for('index'))
from models import PollSource
source = PollSource.query.get(source_id)
if not source:
flash('Source not found', 'error')
return redirect(url_for('admin_polling'))
display_name = source.display_name
db.session.delete(source)
db.session.commit()
flash(f'Deleted polling source: {display_name}', 'success')
logger.info(f"Admin {current_user.id} deleted poll source {source_id}")
return redirect(url_for('admin_polling'))
@app.route('/admin/polling//logs')
@login_required
def admin_polling_logs(source_id):
"""View logs for a specific poll source"""
if not current_user.is_admin:
flash('Access denied', 'error')
return redirect(url_for('index'))
from models import PollSource, PollLog
source = PollSource.query.get(source_id)
if not source:
flash('Source not found', 'error')
return redirect(url_for('admin_polling'))
# Get recent logs (limit to 50)
logs = source.logs.limit(50).all()
return render_template('admin_polling_logs.html',
source=source,
logs=logs)
# ============================================================
# TEMPLATE CONTEXT PROCESSORS
# ============================================================
@app.context_processor
def inject_app_config():
"""Inject app configuration into all templates"""
return {
'APP_NAME': app.config['APP_NAME']
}
# ============================================================
# ERROR HANDLERS
# ============================================================
@app.errorhandler(404)
def not_found(e):
"""404 page"""
return render_template('404.html'), 404
@app.errorhandler(500)
def server_error(e):
"""500 page"""
return render_template('500.html'), 500
# ============================================================
# INITIALIZATION
# ============================================================
if __name__ == '__main__':
print("✓ BalanceBoard starting...")
print("✓ Database: PostgreSQL with SQLAlchemy")
print("✓ Password hashing: bcrypt")
print("✓ Authentication: Flask-Login")
app.run(host='0.0.0.0', port=DEFAULT_PORT, debug=True)