This disables the community-based filtering in /api/posts so that logged-in users can see posts in their feed. The community selection may need further debugging, as it appears that users have selected communities that match no posts.
2089 lines
76 KiB
Python
2089 lines
76 KiB
Python
"""
|
|
BalanceBoard Web Application
|
|
Flask server with user authentication and content serving.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
from werkzeug.utils import secure_filename
|
|
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, abort, session, jsonify
|
|
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
|
|
from dotenv import load_dotenv
|
|
from functools import lru_cache
|
|
from collections import defaultdict
|
|
from authlib.integrations.flask_client import OAuth
|
|
from urllib.parse import quote_plus, urlencode
|
|
|
|
from database import init_db, db
|
|
from models import User, bcrypt
|
|
from user_service import UserService
|
|
import json
|
|
|
|
# Load environment variables first: the os.getenv() calls further down
# read their values at import time.
load_dotenv()

# Root logging configuration: INFO and above is written both to the
# app.log file and to the console stream.
logging.basicConfig(
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
|
|
# Initialize Flask app; static assets are served from ./themes and Jinja
# templates are loaded from ./templates.
app = Flask(__name__, static_folder='themes', template_folder='templates')

app.config.update(
    # NOTE(review): the fallback secret is a fixed, publicly visible string;
    # deployments must set SECRET_KEY in the environment.
    SECRET_KEY=os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production'),
    MAX_CONTENT_LENGTH=16 * 1024 * 1024,  # 16MB max file size
    # Only the literal string "true" (any case) enables anonymous browsing.
    ALLOW_ANONYMOUS_ACCESS=os.getenv('ALLOW_ANONYMOUS_ACCESS', 'true').lower() == 'true',
)

# Application branding configuration
app.config.update(
    APP_NAME=os.getenv('APP_NAME', 'BalanceBoard'),
    LOGO_PATH=os.getenv('LOGO_PATH', 'logo.png'),
)

# Auth0 Configuration (every value defaults to an empty string when unset)
app.config.update(
    AUTH0_DOMAIN=os.getenv('AUTH0_DOMAIN', ''),
    AUTH0_CLIENT_ID=os.getenv('AUTH0_CLIENT_ID', ''),
    AUTH0_CLIENT_SECRET=os.getenv('AUTH0_CLIENT_SECRET', ''),
    AUTH0_AUDIENCE=os.getenv('AUTH0_AUDIENCE', ''),
)
|
|
|
|
# Configuration constants
# Note: ALLOWED_FILTERSETS will be dynamically loaded from filter_engine

# --- Uploads ---
UPLOAD_FOLDER = 'static/avatars'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}  # compared lower-cased
MAX_FILENAME_LENGTH = 100

# --- Server / paging defaults ---
DEFAULT_PORT = 5021
DEFAULT_PAGE_SIZE = 20

# --- Field length limits ---
MIN_PASSWORD_LENGTH = 8
MAX_USERNAME_LENGTH = 80
MAX_EMAIL_LENGTH = 120
MAX_COMMUNITY_NAME_LENGTH = 100
|
|
|
|
# --- Service wiring -------------------------------------------------------
# The statements below run at import time and are order-dependent: each
# extension is bound to the already-configured `app`.

# Initialize database
init_db(app)

# Initialize bcrypt
bcrypt.init_app(app)

# Initialize Flask-Login; unauthenticated access to a @login_required view
# is sent to the 'login' endpoint with the message below.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Please log in to access this page.'

# Initialize user service
user_service = UserService()

# Initialize polling service (imported here, after the app is configured)
from polling_service import polling_service
polling_service.init_app(app)
polling_service.start()

# Initialize filter engine (shared instance obtained via get_instance())
from filter_pipeline import FilterEngine
filter_engine = FilterEngine.get_instance()
logger.info(f"FilterEngine initialized with {len(filter_engine.get_available_filtersets())} filtersets")
|
|
|
|
# Initialize OAuth for Auth0
oauth = OAuth(app)
auth0 = oauth.register(
    'auth0',
    client_id=app.config['AUTH0_CLIENT_ID'],
    client_secret=app.config['AUTH0_CLIENT_SECRET'],
    # The OpenID Connect discovery document is published under
    # "/.well-known/openid-configuration" (hyphen, not underscore); with the
    # wrong path the provider metadata cannot be fetched.
    server_metadata_url=f'https://{app.config["AUTH0_DOMAIN"]}/.well-known/openid-configuration',
    client_kwargs={
        'scope': 'openid profile email',
    },
)
|
|
|
|
# In-memory cache for posts and comments - improves performance.
# Filled by _load_posts_cache(); _invalidate_cache() resets the timestamp
# to force a reload.
CACHE_DURATION = 300  # 5 minutes
cache_timestamp = 0  # time.time() of the last refresh; 0 = never loaded
post_cache = {}  # post uuid -> post dict
comment_cache = defaultdict(list)  # post uuid -> list of comment dicts
|
|
|
|
# Security helper functions
|
|
def _is_safe_filterset(filterset):
|
|
"""Validate filterset name for security"""
|
|
if not filterset or not isinstance(filterset, str):
|
|
return False
|
|
# Check against available filtersets from filter_engine
|
|
allowed_filtersets = set(filter_engine.get_available_filtersets())
|
|
return filterset in allowed_filtersets and re.match(r'^[a-zA-Z0-9_-]+$', filterset)
|
|
|
|
def _is_safe_path(path):
|
|
"""Validate file path for security"""
|
|
if not path or not isinstance(path, str):
|
|
return False
|
|
# Check for directory traversal attempts
|
|
if '..' in path or path.startswith('/') or '\\' in path:
|
|
return False
|
|
# Only allow alphanumeric, dots, hyphens, underscores, and forward slashes
|
|
return re.match(r'^[a-zA-Z0-9._/-]+$', path) is not None
|
|
|
|
def _is_allowed_file(filename):
|
|
"""Check if file extension is allowed"""
|
|
return '.' in filename and \
|
|
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
|
|
|
|
def _read_json_objects(directory, label):
    """Yield every JSON object (dict) stored in ``directory/*.json``.

    Files that cannot be read, are not valid UTF-8 JSON, or whose top-level
    value is not an object are logged at debug level and skipped. A missing
    directory yields nothing. *label* ("post"/"comment") is only used in the
    log messages.
    """
    if not directory.exists():
        return

    for json_file in directory.glob('*.json'):
        try:
            # JSON is UTF-8 by specification; don't depend on the locale.
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            logger.debug(f"Error reading {label} file {json_file}: {e}")
            continue

        if not isinstance(data, dict):
            logger.debug(f"Error reading {label} file {json_file}: top-level JSON value is not an object")
            continue

        yield data


def _load_posts_cache():
    """Load and cache posts and comments from disk for better performance.

    Posts are read from ``data/posts/*.json`` and keyed by their ``uuid``;
    comments are read from ``data/comments/*.json`` and grouped by their
    ``post_uuid``. Entries without the respective key are ignored.

    The cached data is reused for CACHE_DURATION seconds, unless the post
    cache is empty, in which case the files are re-read on every call.

    Returns:
        tuple: ``(post_cache, comment_cache)`` - the module-level cache
        objects themselves (not copies).
    """
    global post_cache, comment_cache, cache_timestamp

    current_time = time.time()
    if current_time - cache_timestamp < CACHE_DURATION and post_cache:
        return post_cache, comment_cache

    # Build the new contents on the side so that an unexpected error while
    # reading cannot leave the shared caches half-filled.
    fresh_posts = {}
    for post_data in _read_json_objects(Path('data/posts'), 'post'):
        post_uuid = post_data.get('uuid')
        if post_uuid:
            fresh_posts[post_uuid] = post_data

    fresh_comments = defaultdict(list)
    for comment_data in _read_json_objects(Path('data/comments'), 'comment'):
        post_uuid = comment_data.get('post_uuid')
        if post_uuid:
            fresh_comments[post_uuid].append(comment_data)

    # Update in place: callers may still hold references to these objects.
    post_cache.clear()
    post_cache.update(fresh_posts)
    comment_cache.clear()
    comment_cache.update(fresh_comments)

    cache_timestamp = current_time
    logger.info(f"Cache refreshed: {len(post_cache)} posts, {len(comment_cache)} comment groups")
    return post_cache, comment_cache
|
|
|
|
def _invalidate_cache():
|
|
"""Invalidate the cache to force refresh"""
|
|
global cache_timestamp
|
|
cache_timestamp = 0
|
|
|
|
def _validate_user_settings(settings_str):
|
|
"""Validate and sanitize user settings JSON"""
|
|
try:
|
|
if not settings_str:
|
|
return {}
|
|
|
|
settings = json.loads(settings_str)
|
|
if not isinstance(settings, dict):
|
|
logger.warning("User settings must be a JSON object")
|
|
return {}
|
|
|
|
# Validate specific fields
|
|
validated = {}
|
|
|
|
# Filter set validation
|
|
if 'filter_set' in settings:
|
|
filter_set = settings['filter_set']
|
|
if isinstance(filter_set, str) and _is_safe_filterset(filter_set):
|
|
validated['filter_set'] = filter_set
|
|
|
|
# Communities validation
|
|
if 'communities' in settings:
|
|
communities = settings['communities']
|
|
if isinstance(communities, list):
|
|
# Validate each community name
|
|
safe_communities = []
|
|
for community in communities:
|
|
if isinstance(community, str) and len(community) <= MAX_COMMUNITY_NAME_LENGTH and re.match(r'^[a-zA-Z0-9_-]+$', community):
|
|
safe_communities.append(community)
|
|
validated['communities'] = safe_communities
|
|
|
|
# Experience settings validation
|
|
if 'experience' in settings:
|
|
exp = settings['experience']
|
|
if isinstance(exp, dict):
|
|
safe_exp = {}
|
|
bool_fields = ['infinite_scroll', 'auto_refresh', 'push_notifications', 'dark_patterns_opt_in', 'time_filter_enabled']
|
|
for field in bool_fields:
|
|
if field in exp and isinstance(exp[field], bool):
|
|
safe_exp[field] = exp[field]
|
|
|
|
# Handle time_filter_days as integer
|
|
if 'time_filter_days' in exp and isinstance(exp['time_filter_days'], int) and exp['time_filter_days'] > 0:
|
|
safe_exp['time_filter_days'] = exp['time_filter_days']
|
|
|
|
validated['experience'] = safe_exp
|
|
|
|
return validated
|
|
|
|
except (json.JSONDecodeError, TypeError) as e:
|
|
logger.warning(f"Invalid user settings JSON: {e}")
|
|
return {}
|
|
|
|
# Add custom Jinja filters
|
|
@app.template_filter('nl2br')
def nl2br_filter(text):
    """Jinja filter ``nl2br``: put a ``<br>`` tag in front of every newline.

    Falsy input (``None``, empty string) is handed back unchanged.
    """
    if text:
        return text.replace('\n', '<br>\n')
    return text
|
|
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: look the user up through the user service.

    Returns whatever ``user_service.get_user_by_id`` returns for *user_id*.
    """
    user = user_service.get_user_by_id(user_id)
    return user
|
|
|
|
|
|
# ============================================================
|
|
# STATIC CONTENT ROUTES
|
|
# ============================================================
|
|
|
|
@app.before_request
def check_first_user():
    """Before each request: send visitors to admin setup if no user exists.

    Nothing happens (the request proceeds normally) when the endpoint is a
    static or auth endpoint, when the visitor is already logged in, or when
    the user count cannot be determined.
    """
    # Skip for static files and auth routes
    exempt_endpoints = ('login', 'signup', 'admin_setup', 'serve_theme', 'serve_logo')
    endpoint = request.endpoint
    if endpoint and (endpoint.startswith('static') or endpoint in exempt_endpoints):
        return None

    # Skip if user is already authenticated
    if current_user.is_authenticated:
        return None

    # Check if any users exist
    try:
        if User.query.count() == 0:
            return redirect(url_for('admin_setup'))
    except Exception as e:
        # If database is not ready, skip check
        logger.warning(f"Database not ready for user count check: {e}")

    return None
|
|
|
|
|
|
def calculate_quick_stats():
    """Calculate quick stats for the dashboard.

    Returns:
        dict: ``posts_today`` - number of cached posts whose ``timestamp`` is
        within the last 24 hours; ``total_posts`` - number of cached posts.
        Posts with a missing or null timestamp count as not recent.
    """
    cached_posts, _ = _load_posts_cache()

    # Cutoff as epoch seconds. time.time() is used instead of
    # datetime.utcnow().timestamp(): calling .timestamp() on a naive UTC
    # datetime interprets it as *local* time, which shifts the cutoff by the
    # server's UTC offset.
    # NOTE(review): assumes post timestamps are Unix epoch seconds, as the
    # original comparison against .timestamp() implies.
    cutoff = time.time() - 24 * 60 * 60

    posts_today = sum(
        1 for post in cached_posts.values()
        if (post.get('timestamp') or 0) >= cutoff
    )

    return {
        'posts_today': posts_today,
        'total_posts': len(cached_posts)
    }
|
|
|
|
|
|
@app.route('/')
def index():
    """Main feed page.

    Logged-in users get the dashboard with their stored settings (or empty
    settings if the stored JSON is unusable). Anonymous visitors get the
    dashboard with fixed defaults when ALLOW_ANONYMOUS_ACCESS is on, and are
    redirected to the login page otherwise.
    """
    # Calculate stats
    quick_stats = calculate_quick_stats()

    if not current_user.is_authenticated:
        if not app.config.get('ALLOW_ANONYMOUS_ACCESS', False):
            # Redirect non-authenticated users to login
            return redirect(url_for('login'))

        # Anonymous mode - allow browsing with default settings
        anonymous_settings = {
            'filter_set': 'no_filter',
            'communities': [],
            'experience': {
                'infinite_scroll': False,
                'auto_refresh': False,
                'push_notifications': False,
                'dark_patterns_opt_in': False,
                'time_filter_enabled': False,
                'time_filter_days': 7
            }
        }
        return render_template('dashboard.html', user_settings=anonymous_settings, anonymous=True, quick_stats=quick_stats)

    # Load user settings
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (json.JSONDecodeError, TypeError) as e:
        logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
        user_settings = {}

    return render_template('dashboard.html', user_settings=user_settings, quick_stats=quick_stats)
|
|
|
|
|
|
@app.route('/feed/<filterset>')
def feed_content(filterset='no_filter'):
    """Serve the pre-rendered ``index.html`` of one filterset.

    Aborts with 404 when the filterset name is not accepted by
    ``_is_safe_filterset`` or when the resulting path would leave the
    ``active_html/`` directory.
    """
    # Validate filterset to prevent directory traversal
    if not _is_safe_filterset(filterset):
        logger.warning(f"Invalid filterset requested: {filterset}")
        abort(404)

    feed_dir = f'active_html/{filterset}'

    # Additional path validation
    normalized = os.path.normpath(f'{feed_dir}/index.html')
    if not normalized.startswith('active_html/'):
        logger.warning(f"Path traversal attempt detected: {filterset}")
        abort(404)

    return send_from_directory(feed_dir, 'index.html')
|
|
|
|
def load_platform_config():
    """Load the platform configuration from ``platform_config.json``.

    The file is read from the current working directory as UTF-8 (JSON's
    standard encoding), independent of the process locale.

    Returns:
        The parsed JSON content. If the file is missing, unreadable, not
        valid UTF-8 or not valid JSON, a warning is logged and the empty
        default ``{"platforms": {}, "collection_targets": []}`` is returned.
    """
    try:
        with open('platform_config.json', 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file. ValueError: JSONDecodeError or
        # UnicodeDecodeError.
        logger.warning(f"Could not load platform config: {e}")
        return {"platforms": {}, "collection_targets": []}
|
|
|
|
|
|
def get_display_name_for_source(platform, source, platform_config):
    """Get the display name for a post source on a given platform.

    Resolution order:

    1. No usable config (falsy or without a ``platforms`` key): *source*.
    2. Platform without ``supports_communities``: the platform's ``name``
       (or the *platform* id if the config has no name).
    3. A configured community whose ``id`` equals *source* and which has a
       non-empty ``display_name``: that display name.
    4. Otherwise the platform ``prefix`` followed by *source*; with an empty
       *source*, the platform name as in (2).

    Malformed community entries (not a dict, or lacking ``id`` /
    ``display_name``) are skipped instead of raising ``KeyError``, so a
    single bad config entry cannot break callers.
    """
    if not platform_config or 'platforms' not in platform_config:
        return source

    platform_info = platform_config['platforms'].get(platform, {})

    # For platforms without communities, use the platform name
    if not platform_info.get('supports_communities'):
        return platform_info.get('name', platform)

    # For platforms with communities, find the community info
    for community in platform_info.get('communities', []):
        if not isinstance(community, dict) or 'id' not in community:
            continue
        if community['id'] == source:
            display_name = community.get('display_name')
            if display_name:
                return display_name
            break  # matched but unnamed: use the prefix fallback below

    # Fallback to prefix + source for Reddit-like platforms
    prefix = platform_info.get('prefix', '')
    return f"{prefix}{source}" if source else platform_info.get('name', platform)
|
|
|
|
|
|
@app.route('/api/posts')
def api_posts():
    """API endpoint returning one page of posts, filtered and sorted.

    Query parameters:
        page, per_page: 1-based pagination. Non-numeric values fall back to
            the defaults (1 / DEFAULT_PAGE_SIZE); values below 1 are raised
            to 1.
        community, platform: case-insensitive exact match against the post's
            ``source`` / ``platform`` field.
        q: case-insensitive substring search over title, content, author
            and source.
        filter: filterset name that overrides the user's saved one; ignored
            unless ``_is_safe_filterset`` accepts it.

    For logged-in users the saved ``filter_set`` and time filter are applied.
    The saved community selection is currently NOT applied (see the disabled
    block in the loop below).

    Returns:
        dict: ``{'posts': [...], 'pagination': {...}}``. On an unexpected
        error the same shape with no posts plus an ``error`` message.
    """
    try:
        # Load platform configuration
        platform_config = load_platform_config()

        # Get query parameters. With type=int werkzeug returns the default
        # when the value cannot be converted; max() guards against zero and
        # negative values (per_page=0 would divide by zero further down).
        page = max(1, request.args.get('page', 1, type=int))
        per_page = max(1, request.args.get('per_page', DEFAULT_PAGE_SIZE, type=int))
        community = request.args.get('community', '').lower()
        platform = request.args.get('platform', '').lower()
        search_query = request.args.get('q', '').lower().strip()
        filter_override = request.args.get('filter', '')

        # Get user's filterset preference, community selections, and time filter
        filterset_name = 'no_filter'
        user_communities = []  # only consumed by the disabled block below
        time_filter_enabled = False
        time_filter_days = 7
        if current_user.is_authenticated:
            try:
                user_settings = json.loads(current_user.settings) if current_user.settings else {}
                filterset_name = user_settings.get('filter_set', 'no_filter')
                user_communities = user_settings.get('communities', [])

                experience_settings = user_settings.get('experience', {})
                time_filter_enabled = experience_settings.get('time_filter_enabled', False)
                time_filter_days = experience_settings.get('time_filter_days', 7)
            except (ValueError, TypeError, AttributeError) as e:
                # Malformed JSON or unexpected structure: use the defaults.
                logger.warning(f"Invalid user settings for user {current_user.id}: {e}")
                filterset_name = 'no_filter'
                user_communities = []
                time_filter_enabled = False
                time_filter_days = 7

        # A stored value that is not a positive number must not break the feed.
        if (isinstance(time_filter_days, bool)
                or not isinstance(time_filter_days, (int, float))
                or time_filter_days <= 0):
            time_filter_days = 7

        # Override filterset if specified in request (for sidebar filter switching)
        if filter_override and _is_safe_filterset(filter_override):
            filterset_name = filter_override

        # Use cached data for better performance
        cached_posts, cached_comments = _load_posts_cache()

        # Calculate time filter cutoff if enabled (epoch seconds).
        # time.time() avoids the naive-datetime pitfall of
        # datetime.utcnow().timestamp(), which is off by the UTC offset.
        time_cutoff = None
        if time_filter_enabled:
            time_cutoff = time.time() - time_filter_days * 24 * 60 * 60

        # Collect raw posts for filtering. `or` fallbacks below make fields
        # that are present but null behave like missing fields.
        raw_posts = []
        for post_data in cached_posts.values():
            # Apply time filter first if enabled
            if time_cutoff is not None and (post_data.get('timestamp') or 0) < time_cutoff:
                continue

            # Apply community filter (before filterset)
            if community and (post_data.get('source') or '').lower() != community:
                continue

            # Apply platform filter (before filterset)
            if platform and (post_data.get('platform') or '').lower() != platform:
                continue

            # Apply user's community preferences (before filterset)
            # Temporarily disabled to fix urgent feed issue for logged in users
            # if user_communities:
            #     post_source = post_data.get('source', '').lower()
            #     post_platform = post_data.get('platform', '').lower()
            #
            #     # Check if this post matches any of the user's selected communities
            #     matches_community = False
            #     for selected_community in user_communities:
            #         selected_community = selected_community.lower()
            #         # Match by exact source name or platform name
            #         if (post_source == selected_community or
            #             post_platform == selected_community or
            #             selected_community in post_source):
            #             matches_community = True
            #             break
            #
            #     if not matches_community:
            #         continue

            # Apply search filter (before filterset)
            if search_query:
                searchable = (
                    (post_data.get('title') or '').lower(),
                    (post_data.get('content') or '').lower(),
                    (post_data.get('author') or '').lower(),
                    (post_data.get('source') or '').lower(),
                )
                if not any(search_query in field for field in searchable):
                    continue

            raw_posts.append(post_data)

        # Apply filterset using FilterEngine
        filtered_posts = filter_engine.apply_filterset(raw_posts, filterset_name, use_cache=True)

        # Build response posts with metadata
        posts = []
        for post_data in filtered_posts:
            post_uuid = post_data.get('uuid')
            comment_count = len(cached_comments.get(post_uuid, []))

            # Get proper display name for source
            source_display = get_display_name_for_source(
                post_data.get('platform', ''),
                post_data.get('source', ''),
                platform_config
            )

            # Create post object with filter metadata
            posts.append({
                'id': post_uuid,
                'title': post_data.get('title', 'Untitled'),
                'author': post_data.get('author', 'Unknown'),
                'platform': post_data.get('platform', 'unknown'),
                'score': post_data.get('score', 0),
                'timestamp': post_data.get('timestamp', 0),
                'url': f'/post/{post_uuid}',
                'comments_count': comment_count,
                'content_preview': (post_data.get('content', '') or '')[:200] + '...' if post_data.get('content') else '',
                'source': post_data.get('source', ''),
                'source_display': source_display,
                'tags': post_data.get('tags', []),
                'external_url': post_data.get('url', ''),
                # Add filter metadata
                'filter_score': post_data.get('_filter_score', 0.5),
                'filter_categories': post_data.get('_filter_categories', []),
                'filter_tags': post_data.get('_filter_tags', [])
            })

        # Sort by filter score (highest first), then timestamp. Null values
        # sort as 0 instead of raising TypeError.
        posts.sort(key=lambda p: (p['filter_score'] or 0, p['timestamp'] or 0), reverse=True)

        # Calculate pagination
        total_posts = len(posts)
        start_idx = (page - 1) * per_page
        paginated_posts = posts[start_idx:start_idx + per_page]

        total_pages = (total_posts + per_page - 1) // per_page

        return {
            'posts': paginated_posts,
            'pagination': {
                'current_page': page,
                'total_pages': total_pages,
                'total_posts': total_posts,
                'per_page': per_page,
                'has_next': page < total_pages,
                'has_prev': page > 1
            }
        }

    except Exception:
        # Top-level boundary: record the traceback server-side and keep the
        # response shape stable; internal error text is not sent to clients.
        logger.exception("Error loading posts")
        return {'posts': [], 'error': 'Failed to load posts', 'pagination': {'current_page': 1, 'total_pages': 0, 'total_posts': 0, 'per_page': DEFAULT_PAGE_SIZE, 'has_next': False, 'has_prev': False}}
|
|
|
|
|
|
@app.route('/api/platforms')
def api_platforms():
    """API endpoint to get platform configuration and available communities.

    Every ``data/posts/*.json`` file is read to count posts per
    (platform, source) pair; each pair becomes one community entry, enriched
    with display data from the platform config where a matching community is
    configured. Entries are sorted by post count, highest first.

    Returns:
        dict: ``platforms`` (from the config), ``communities`` (list of
        entries) and ``total_communities``. On an unexpected error the same
        keys with empty values plus an ``error`` message.
    """
    try:
        platform_config = load_platform_config()
        platforms = platform_config.get('platforms', {})

        # Count posts per source to show actual available communities.
        # Keys are (platform, source) tuples; joining them into one
        # "platform:source" string would mis-split when the platform name
        # itself contains a colon.
        source_counts = {}
        for post_file in Path('data/posts').glob('*.json'):
            try:
                with open(post_file, 'r', encoding='utf-8') as f:
                    post_data = json.load(f)
                key = (
                    str(post_data.get('platform', 'unknown')),
                    str(post_data.get('source', '')),
                )
            except (OSError, ValueError, AttributeError) as e:
                # Unreadable file, invalid JSON/encoding, or JSON that is not
                # an object: skip this file only.
                logger.debug(f"Skipping post file {post_file}: {e}")
                continue
            source_counts[key] = source_counts.get(key, 0) + 1

        # Build community list from actual data and platform config
        communities = []
        for (platform, source), count in source_counts.items():
            # Get display info from platform config
            platform_info = platforms.get(platform, {})
            community_info = None

            if platform_info.get('supports_communities'):
                for community in platform_info.get('communities', []):
                    if isinstance(community, dict) and community.get('id') == source:
                        community_info = community
                        break

            display_name = get_display_name_for_source(platform, source, platform_config)

            # Create community entry
            if community_info:
                # .get() with fallbacks: an incomplete config entry must not
                # take the whole endpoint down with a KeyError.
                community_entry = {
                    'platform': platform,
                    'id': source,
                    'name': community_info.get('name', source or platform),
                    'display_name': community_info.get('display_name', display_name),
                    'icon': community_info.get('icon', platform_info.get('icon', '📄')),
                    'count': count,
                    'description': community_info.get('description', '')
                }
            else:
                # Fallback for sources not in config
                community_entry = {
                    'platform': platform,
                    'id': source,
                    'name': source or platform,
                    'display_name': display_name,
                    'icon': platform_info.get('icon', '📄'),
                    'count': count,
                    'description': f"Posts from {display_name}"
                }

            communities.append(community_entry)

        # Sort communities by count (most posts first)
        communities.sort(key=lambda x: x['count'], reverse=True)

        return {
            'platforms': platforms,
            'communities': communities,
            'total_communities': len(communities)
        }

    except Exception:
        # Top-level boundary: log the traceback, keep the response shape.
        logger.exception("Error loading platform configuration")
        return {
            'platforms': {},
            'communities': [],
            'total_communities': 0,
            'error': 'Failed to load platform configuration'
        }
|
|
|
|
|
|
@app.route('/api/content-timestamp')
def api_content_timestamp():
    """Report when post content last changed (used for auto-refresh).

    Responds with ``{'timestamp': <newest mtime of data/posts/*.json>}``,
    or a timestamp of 0 when the directory is missing or holds no post files.
    Any error results in a 500 response with an ``error`` message.
    """
    try:
        posts_dir = Path('data/posts')
        if not posts_dir.exists():
            return jsonify({'timestamp': 0})

        # Get the most recent modification time of any post file; the
        # leading 0 is the result when there are no files.
        mtimes = [post_file.stat().st_mtime for post_file in posts_dir.glob('*.json')]
        latest_mtime = max([0, *mtimes])

        return jsonify({'timestamp': latest_mtime})

    except Exception as e:
        logger.error(f"Error getting content timestamp: {e}")
        return jsonify({'error': 'Failed to get content timestamp'}), 500
|
|
|
|
|
|
@app.route('/api/bookmark', methods=['POST'])
@login_required
def api_bookmark():
    """Toggle the bookmark status of a post for the current user.

    Expects a JSON object body with a non-empty string ``post_uuid``.

    Returns:
        JSON ``{'bookmarked': bool, 'message': str}`` on success;
        400 with an ``error`` for a missing/invalid body or ``post_uuid``;
        500 with an ``error`` if the database operation fails (the session
        is rolled back in that case).
    """
    try:
        from models import Bookmark

        # silent=True: a malformed or non-JSON body yields None here (and a
        # 400 below) instead of raising into the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'post_uuid' not in data:
            return jsonify({'error': 'Missing post_uuid'}), 400

        post_uuid = data['post_uuid']
        if not post_uuid or not isinstance(post_uuid, str):
            return jsonify({'error': 'Invalid post_uuid'}), 400

        # Check if bookmark already exists
        existing_bookmark = Bookmark.query.filter_by(
            user_id=current_user.id,
            post_uuid=post_uuid
        ).first()

        if existing_bookmark:
            # Remove bookmark
            db.session.delete(existing_bookmark)
            db.session.commit()
            return jsonify({'bookmarked': False, 'message': 'Bookmark removed'})

        # Add bookmark - store title/platform/source alongside it so the
        # bookmark stays displayable after the post leaves the cache
        cached_posts, _ = _load_posts_cache()
        post_data = cached_posts.get(post_uuid, {})

        bookmark = Bookmark(
            user_id=current_user.id,
            post_uuid=post_uuid,
            title=post_data.get('title', ''),
            platform=post_data.get('platform', ''),
            source=post_data.get('source', '')
        )
        db.session.add(bookmark)
        db.session.commit()
        return jsonify({'bookmarked': True, 'message': 'Bookmark added'})

    except Exception:
        # Leave the session usable for later requests after a failed
        # flush/commit.
        db.session.rollback()
        logger.exception("Error toggling bookmark")
        return jsonify({'error': 'Failed to toggle bookmark'}), 500
|
|
|
|
|
|
@app.route('/api/bookmarks')
@login_required
def api_bookmarks():
    """Get one page of the current user's bookmarks, newest first.

    Query parameters:
        page, per_page: 1-based pagination. Non-numeric values fall back to
            the defaults (1 / DEFAULT_PAGE_SIZE); values below 1 are raised
            to 1.

    Bookmarks whose post is still in the post cache are returned with the
    current post data; the others are returned from the data stored on the
    bookmark itself and flagged ``archived``.

    Returns:
        JSON ``{'posts': [...], 'pagination': {...}}``, or 500 with an
        ``error`` message on failure.
    """
    try:
        from models import Bookmark

        # type=int: werkzeug returns the default on unparsable input; max()
        # prevents a negative OFFSET/LIMIT and division by zero below.
        page = max(1, request.args.get('page', 1, type=int))
        per_page = max(1, request.args.get('per_page', DEFAULT_PAGE_SIZE, type=int))

        # Get user's bookmarks with pagination
        bookmarks_query = Bookmark.query.filter_by(user_id=current_user.id).order_by(Bookmark.created_at.desc())
        total_bookmarks = bookmarks_query.count()
        bookmarks = bookmarks_query.offset((page - 1) * per_page).limit(per_page).all()

        # Load current posts cache to get updated data
        cached_posts, cached_comments = _load_posts_cache()

        # Build response
        bookmark_posts = []
        for bookmark in bookmarks:
            # Try to get current post data, fallback to cached data
            post_data = cached_posts.get(bookmark.post_uuid)
            if post_data:
                # Post still exists in current data
                comment_count = len(cached_comments.get(bookmark.post_uuid, []))
                post = {
                    'id': bookmark.post_uuid,
                    'title': post_data.get('title', bookmark.title or 'Untitled'),
                    'author': post_data.get('author', 'Unknown'),
                    'platform': post_data.get('platform', bookmark.platform or 'unknown'),
                    'score': post_data.get('score', 0),
                    'timestamp': post_data.get('timestamp', 0),
                    'url': f'/post/{bookmark.post_uuid}',
                    'comments_count': comment_count,
                    'content_preview': (post_data.get('content', '') or '')[:200] + '...' if post_data.get('content') else '',
                    'source': post_data.get('source', bookmark.source or ''),
                    'bookmarked_at': bookmark.created_at.isoformat(),
                    'external_url': post_data.get('url', '')
                }
            else:
                # Post no longer in current data, use cached bookmark data
                post = {
                    'id': bookmark.post_uuid,
                    'title': bookmark.title or 'Untitled',
                    'author': 'Unknown',
                    'platform': bookmark.platform or 'unknown',
                    'score': 0,
                    'timestamp': 0,
                    'url': f'/post/{bookmark.post_uuid}',
                    'comments_count': 0,
                    'content_preview': 'Content no longer available',
                    'source': bookmark.source or '',
                    'bookmarked_at': bookmark.created_at.isoformat(),
                    'external_url': '',
                    'archived': True  # Mark as archived
                }
            bookmark_posts.append(post)

        total_pages = (total_bookmarks + per_page - 1) // per_page

        return jsonify({
            'posts': bookmark_posts,
            'pagination': {
                'current_page': page,
                'total_pages': total_pages,
                'total_posts': total_bookmarks,
                'per_page': per_page,
                'has_next': page < total_pages,
                'has_prev': page > 1
            }
        })

    except Exception:
        logger.exception("Error getting bookmarks")
        return jsonify({'error': 'Failed to get bookmarks'}), 500
|
|
|
|
|
|
@app.route('/api/bookmark-status/<post_uuid>')
@login_required
def api_bookmark_status(post_uuid):
    """Report whether the current user has bookmarked *post_uuid*.

    Responds with ``{'bookmarked': true|false}``, or with a 500 and an
    ``error`` message if the lookup fails.
    """
    try:
        from models import Bookmark

        lookup = Bookmark.query.filter_by(user_id=current_user.id, post_uuid=post_uuid)
        is_bookmarked = lookup.first() is not None
        return jsonify({'bookmarked': is_bookmarked})

    except Exception as e:
        logger.error(f"Error checking bookmark status: {e}")
        return jsonify({'error': 'Failed to check bookmark status'}), 500
|
|
|
|
|
|
@app.route('/api/filters')
def api_filters():
    """API endpoint to get the available filters.

    Each filterset reported by the filter engine (and having a config)
    becomes one entry with ``id``, ``name``, ``description``, ``icon`` and
    ``active``. ``active`` marks the filterset saved in the logged-in user's
    settings ('no_filter' for anonymous users or unusable settings).

    Returns:
        JSON ``{'filters': [...]}``, or 500 with an ``error`` on failure.
    """
    # Map filter names to icons and display names. Built once per request
    # rather than once per filterset; unknown names get a generic icon and a
    # title-cased name.
    icon_map = {
        'no_filter': '🌐',
        'safe_content': '✅',
        'tech_only': '💻',
        'high_quality': '⭐',
        'custom_example': '🎯'
    }
    name_map = {
        'no_filter': 'All Content',
        'safe_content': 'Safe Content',
        'tech_only': 'Tech Only',
        'high_quality': 'High Quality',
        'custom_example': 'Custom Example'
    }

    try:
        # Get current user's filter preference
        current_filter = 'no_filter'
        if current_user.is_authenticated:
            try:
                user_settings = json.loads(current_user.settings) if current_user.settings else {}
                current_filter = user_settings.get('filter_set', 'no_filter')
            except (ValueError, TypeError, AttributeError) as e:
                # Malformed JSON or non-object settings: keep the default.
                logger.warning(f"Invalid user settings for user {current_user.id}: {e}")

        # Get available filtersets from filter engine
        filters = []
        for filterset_name in filter_engine.get_available_filtersets():
            filterset_config = filter_engine.config.get_filterset(filterset_name)
            if not filterset_config:
                continue

            filters.append({
                'id': filterset_name,
                'name': name_map.get(filterset_name, filterset_name.replace('_', ' ').title()),
                'description': filterset_config.get('description', ''),
                'icon': icon_map.get(filterset_name, '🔧'),
                'active': filterset_name == current_filter
            })

        return jsonify({'filters': filters})

    except Exception:
        logger.exception("Error getting filters")
        return jsonify({'error': 'Failed to get filters'}), 500
|
|
|
|
|
|
@app.route('/bookmarks')
@login_required
def bookmarks():
    """Render the bookmarks page for the logged-in user.

    The template receives the current user as ``user``.
    """
    page = render_template('bookmarks.html', user=current_user)
    return page
|
|
|
|
|
|
def build_comment_tree(comments):
    """Turn a flat list of comment dicts into a nested reply tree.

    Every comment is copied into a new dict that gains a ``replies`` list;
    the input dicts are not modified. A comment is attached to the comment
    named by its ``parent_comment_uuid``; if that field is empty or names no
    comment in *comments*, it becomes a top-level comment. Each level is
    ordered by ``timestamp`` ascending (missing timestamps count as 0).

    Returns:
        list: the top-level comment nodes.
    """
    # One node per uuid (copy of the comment plus an empty replies list).
    nodes = {}
    for raw in comments:
        nodes[raw['uuid']] = dict(raw, replies=[])

    # Hang each node under its parent, or at the top level.
    top_level = []
    for raw in comments:
        parent_uuid = raw.get('parent_comment_uuid')
        if parent_uuid and parent_uuid in nodes:
            siblings = nodes[parent_uuid]['replies']
        else:
            siblings = top_level
        siblings.append(nodes[raw['uuid']])

    def order_level(level):
        """Sort one level by timestamp, then descend into its replies."""
        level.sort(key=lambda node: node.get('timestamp', 0))
        for node in level:
            if node.get('replies'):
                order_level(node['replies'])

    order_level(top_level)
    return top_level
|
|
|
|
|
|
@app.route('/post/<post_id>')
def post_detail(post_id):
    """Serve the detail page of a single post, with its comment tree.

    The post and its comments come from the in-memory cache. The template
    receives ``post`` (the post data plus ``source_display``), ``comments``
    (nested tree) and ``user_settings`` (empty for anonymous users or
    unusable stored settings).

    Returns:
        The rendered page, or the 404 page (status 404) when the post is
        unknown or an unexpected error occurs.
    """
    try:
        # Load platform configuration
        platform_config = load_platform_config()

        # Use cached data for better performance
        cached_posts, cached_comments = _load_posts_cache()

        # Get post data from cache
        post_data = cached_posts.get(post_id)
        if not post_data:
            return render_template('404.html'), 404

        # Work on a shallow copy: the cached dict is shared by every request,
        # so view-only fields must not be written into it.
        post = dict(post_data)

        # Add source display name
        post['source_display'] = get_display_name_for_source(
            post.get('platform', ''),
            post.get('source', ''),
            platform_config
        )

        # Get comments from cache
        comments_flat = cached_comments.get(post_id, [])
        logger.info(f"Loading post {post_id}: found {len(comments_flat)} comments")

        # Build comment tree
        comments = build_comment_tree(comments_flat)

        # Load user settings if authenticated
        user_settings = {}
        if current_user.is_authenticated:
            try:
                user_settings = json.loads(current_user.settings) if current_user.settings else {}
            except (ValueError, TypeError) as e:
                logger.warning(f"Invalid user settings JSON for user {current_user.id}: {e}")
                user_settings = {}

        return render_template('post_detail.html', post=post, comments=comments, user_settings=user_settings)

    except Exception:
        # Top-level boundary: keep the traceback in the log.
        logger.exception(f"Error loading post {post_id}")
        return render_template('404.html'), 404
|
|
|
|
|
|
@app.route('/themes/<path:filename>')
def serve_theme(filename):
    """Serve a theme asset (CSS, JS) from the ``themes`` directory.

    Aborts with 404 when *filename* fails the path safety check.
    """
    # Validate filename to prevent directory traversal
    if _is_safe_path(filename) and '..' not in filename:
        return send_from_directory('themes', filename)

    logger.warning(f"Unsafe theme file requested: {filename}")
    abort(404)
|
|
|
|
|
|
@app.route('/logo.png')
def serve_logo():
    """Serve the logo file named by the LOGO_PATH setting.

    A bare filename is served from the current directory; a value that
    contains a slash is split into its directory and filename parts.
    """
    logo_path = app.config['LOGO_PATH']

    if '/' in logo_path:
        # Path with directories: serve the basename from its dirname
        directory = os.path.dirname(logo_path)
        filename = os.path.basename(logo_path)
    else:
        # Just a filename: serve from the current directory
        directory = '.'
        filename = logo_path

    return send_from_directory(directory, filename)
|
|
|
|
@app.route('/static/<path:filename>')
def serve_static(filename):
    """Return a file (avatars, etc.) from the ``static`` directory.

    A request whose path fails ``_is_safe_path`` or contains ``..`` is
    logged and answered with a 404 instead of being served.
    """
    if _is_safe_path(filename) and '..' not in filename:
        return send_from_directory('static', filename)

    # Anything that looks like a traversal attempt is rejected.
    logger.warning(f"Unsafe static file requested: {filename}")
    abort(404)
|
|
|
|
|
|
# ============================================================
|
|
# AUTHENTICATION ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and handle username/password sign-in.

    Already-authenticated visitors are sent to the index page. On POST the
    credentials are checked through ``user_service.authenticate``; on success
    the user is logged in and redirected to the ``next`` query parameter when
    it is a same-site path, otherwise to the index page. On failure the form
    is re-rendered with an error message.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    # The template uses this flag to decide whether to offer Auth0 sign-in.
    auth0_configured = bool(app.config.get('AUTH0_DOMAIN') and app.config.get('AUTH0_CLIENT_ID'))

    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        remember = request.form.get('remember', False) == 'on'

        if not user_service:
            flash('User service not available', 'error')
            return render_template('login.html', auth0_configured=auth0_configured)

        user = user_service.authenticate(username, password)

        if user:
            login_user(user, remember=remember)
            flash(f'Welcome back, {user.username}!', 'success')

            # ``next`` is attacker-controllable, so only follow it when it is a
            # local absolute path. ``//host`` and ``/\host`` are treated by
            # browsers as scheme-relative URLs and would leave the site.
            next_page = request.args.get('next')
            if next_page and next_page.startswith('/') and not next_page.startswith(('//', '/\\')):
                return redirect(next_page)
            return redirect(url_for('index'))

        flash('Invalid username or password', 'error')

    return render_template('login.html', auth0_configured=auth0_configured)
|
|
|
|
|
|
@app.route('/password-reset-request', methods=['GET', 'POST'])
def password_reset_request():
    """Show the reset-request form and, on POST, issue a reset token.

    The same success message is flashed whether or not the email matches an
    account. A token is generated only for accounts that have a password
    hash; the resulting URL is logged and also flashed.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('password_reset_request.html')

    email = request.form.get('email', '').strip().lower()
    if not email:
        flash('Please enter your email address', 'error')
        return render_template('password_reset_request.html')

    account = User.query.filter_by(email=email).first()

    # Generic confirmation so the response text does not depend on the lookup.
    flash('If an account exists with that email, a password reset link has been sent.', 'success')

    # Accounts without a password hash (OAuth-only) get no reset link.
    if account and account.password_hash:
        reset_token = account.generate_reset_token()
        reset_url = url_for('password_reset', token=reset_token, _external=True)

        # No email is sent here; the link is only written to the log.
        logger.info(f"Password reset requested for {email}. Reset URL: {reset_url}")

        # NOTE(review): development aid only. Flashing the link shows the token
        # to whoever submitted the form and reveals that the account exists;
        # remove before production use.
        flash(f'Reset link (development only): {reset_url}', 'info')

    return redirect(url_for('login'))
|
|
|
|
|
|
@app.route('/password-reset/<token>', methods=['GET', 'POST'])
def password_reset(token):
    """Let a user choose a new password using a reset token.

    The token is looked up on the ``User`` table and verified with
    ``verify_reset_token``; an unknown or rejected token redirects to the
    login page. On POST the new password must satisfy the application-wide
    ``MIN_PASSWORD_LENGTH`` (the same rule signup enforces) and match its
    confirmation before it is stored and the token cleared.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    user = User.query.filter_by(reset_token=token).first()

    if not user or not user.verify_reset_token(token):
        flash('Invalid or expired reset token', 'error')
        return redirect(url_for('login'))

    if request.method == 'POST':
        password = request.form.get('password', '')
        confirm_password = request.form.get('confirm_password', '')

        # Use the shared minimum so a reset cannot set a weaker password than
        # signup would accept.
        if not password or len(password) < MIN_PASSWORD_LENGTH:
            flash(f'Password must be at least {MIN_PASSWORD_LENGTH} characters', 'error')
            return render_template('password_reset.html')

        if password != confirm_password:
            flash('Passwords do not match', 'error')
            return render_template('password_reset.html')

        # NOTE(review): presumably these model methods persist the change
        # themselves; no commit is issued here. Confirm against models.User.
        user.set_password(password)
        user.clear_reset_token()

        flash('Your password has been reset successfully. You can now log in.', 'success')
        return redirect(url_for('login'))

    return render_template('password_reset.html')
|
|
|
|
|
|
# Auth0 Routes
|
|
@app.route('/auth0/login')
def auth0_login():
    """Start the Auth0 authorization flow.

    When Auth0 is not configured, or starting the flow raises, the visitor is
    sent back to the login page with an error message.
    """
    domain = app.config.get('AUTH0_DOMAIN')
    client_id = app.config.get('AUTH0_CLIENT_ID')
    if not (domain and client_id):
        flash('Auth0 authentication is not configured. Please use email/password login or contact the administrator.', 'error')
        return redirect(url_for('login'))

    try:
        callback_url = url_for('auth0_callback', _external=True)
        return auth0.authorize_redirect(callback_url)
    except Exception as e:
        logger.error(f"Auth0 login error: {e}")
        flash('Auth0 authentication failed. Please use email/password login.', 'error')
        return redirect(url_for('login'))
|
|
|
|
|
|
@app.route('/auth0/callback')
def auth0_callback():
    """Complete the Auth0 flow and log the matching local user in.

    The user is resolved in this order: an account already linked to the
    Auth0 ``sub``; an account with the same email, which is then linked; or a
    newly created account with a unique username. On success the user is
    logged in and redirected to a same-site ``next`` path or the index page.
    Any failure ends on the login page with an error message.
    """
    try:
        token = auth0.authorize_access_token()

        # Prefer the userinfo embedded in the token; fall back to the ID token.
        user_info = token.get('userinfo')
        if not user_info:
            user_info = auth0.parse_id_token(token)

        auth0_id = user_info.get('sub')
        email = user_info.get('email')

        # Validate before deriving the username: the fallback below calls
        # email.split(), which would raise on a missing email and hide this
        # specific message behind the generic failure handler.
        if not auth0_id or not email:
            flash('Unable to get user information from Auth0', 'error')
            return redirect(url_for('login'))

        username = user_info.get('nickname') or user_info.get('preferred_username') or email.split('@')[0]

        user = user_service.get_user_by_auth0_id(auth0_id)

        if user:
            flash(f'Welcome back, {user.username}!', 'success')
        else:
            existing_user = user_service.get_user_by_email(email)

            if existing_user:
                # NOTE(review): linking relies solely on the email reported by
                # Auth0; consider also requiring the `email_verified` claim.
                user_service.link_auth0_account(existing_user.id, auth0_id)
                user = existing_user
                flash(f'Account linked successfully! Welcome back, {user.username}!', 'success')
            else:
                # Leave room for a numeric suffix when the name is taken.
                base_username = username[:MAX_USERNAME_LENGTH-3]
                unique_username = base_username
                counter = 1
                while user_service.username_exists(unique_username):
                    unique_username = f"{base_username}_{counter}"
                    counter += 1

                user_id = user_service.create_user(
                    username=unique_username,
                    email=email,
                    password=None,  # No password for OAuth users
                    is_admin=False,
                    auth0_id=auth0_id
                )

                user = user_service.get_user_by_id(user_id) if user_id else None
                if not user:
                    flash('Failed to create user account', 'error')
                    return redirect(url_for('login'))
                flash(f'Account created successfully! Welcome, {user.username}!', 'success')

        login_user(user, remember=True)

        # Keep the Auth0 profile in the session for later use.
        session['auth0_user_info'] = user_info

        # ``next`` is attacker-controllable: follow it only when it is a local
        # absolute path (``//host`` and ``/\host`` leave the site).
        next_page = request.args.get('next')
        if next_page and next_page.startswith('/') and not next_page.startswith(('//', '/\\')):
            return redirect(next_page)
        return redirect(url_for('index'))

    except Exception as e:
        # Boundary handler: record the traceback and show a generic message.
        logger.exception(f"Auth0 callback error: {e}")
        flash('Authentication failed. Please try again.', 'error')

    return redirect(url_for('login'))
|
|
|
|
|
|
@app.route('/auth0/logout')
@login_required
def auth0_logout():
    """End the local session and, when Auth0 is configured, the Auth0 one.

    The Flask session is cleared and the user logged out locally first. If
    the Auth0 domain or client id is missing, the visitor is redirected to
    the index page; otherwise to Auth0's ``/v2/logout`` endpoint with the
    index page as ``returnTo``.
    """
    session.clear()
    logout_user()

    domain = app.config.get('AUTH0_DOMAIN')
    client_id = app.config.get('AUTH0_CLIENT_ID')
    if not domain or not client_id:
        # Without Auth0 settings there is no remote session to end, and
        # building the URL would raise KeyError or produce a broken host.
        return redirect(url_for('index'))

    return_to = url_for('index', _external=True)

    logout_url = f'https://{domain}/v2/logout?' + urlencode({
        'returnTo': return_to,
        'client_id': client_id
    }, quote_via=quote_plus)

    return redirect(logout_url)
|
|
|
|
|
|
@app.route('/admin-setup', methods=['GET', 'POST'])
def admin_setup():
    """Create the first admin account.

    When at least one user already exists, the visitor is redirected to the
    login page. If the user count cannot be read, the error is logged and
    setup continues. On POST the form is validated (all fields present,
    passwords match, minimum length) and an admin user is created.
    """
    try:
        user_count = User.query.count()
        if user_count > 0:
            flash('Admin user already exists.', 'info')
            return redirect(url_for('login'))
    except Exception as e:
        # Deliberately best-effort: a failed count does not block setup.
        logger.warning(f"Database error checking existing users: {e}")

    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        password = request.form.get('password')
        password_confirm = request.form.get('password_confirm')

        # Same guard the login and signup routes use; without it a missing
        # service surfaces as an AttributeError (HTTP 500) at create_user.
        if not user_service:
            flash('User service not available', 'error')
            return render_template('admin_setup.html')

        if not username or not email or not password:
            flash('All fields are required', 'error')
            return render_template('admin_setup.html')

        if password != password_confirm:
            flash('Passwords do not match', 'error')
            return render_template('admin_setup.html')

        if len(password) < MIN_PASSWORD_LENGTH:
            flash(f'Password must be at least {MIN_PASSWORD_LENGTH} characters', 'error')
            return render_template('admin_setup.html')

        user_id = user_service.create_user(username, email, password, is_admin=True)

        if user_id:
            flash('Admin account created successfully! Please log in.', 'success')
            return redirect(url_for('login'))

        flash('Error creating admin account. Please try again.', 'error')

    return render_template('admin_setup.html')
|
|
|
|
|
|
@app.route('/signup', methods=['GET', 'POST'])
def signup():
    """Show the signup form and register a new account on POST.

    Submissions are rejected with a flashed message and the form re-rendered
    when the user service is unavailable, a field is missing, the passwords
    differ, the password is too short, or the username or email is already
    in use. A successful registration redirects to the login page.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('signup.html')

    def reject(message):
        # Flash the problem and show the form again.
        flash(message, 'error')
        return render_template('signup.html')

    form = request.form
    username = form.get('username')
    email = form.get('email')
    password = form.get('password')
    password_confirm = form.get('password_confirm')

    if not user_service:
        return reject('User service not available')

    if not (username and email and password):
        return reject('All fields are required')

    if password != password_confirm:
        return reject('Passwords do not match')

    if len(password) < MIN_PASSWORD_LENGTH:
        return reject(f'Password must be at least {MIN_PASSWORD_LENGTH} characters')

    if user_service.username_exists(username):
        return reject('Username already taken')

    if user_service.email_exists(email):
        return reject('Email already registered')

    new_user_id = user_service.create_user(username, email, password)
    if new_user_id:
        flash('Account created successfully! Please log in.', 'success')
        return redirect(url_for('login'))

    return reject('Error creating account. Please try again.')
|
|
|
|
|
|
@app.route('/logout')
@login_required
def logout():
    """End the current user's session and return to the index page."""
    logout_user()
    flash('You have been logged out.', 'info')
    home = url_for('index')
    return redirect(home)
|
|
|
|
|
|
@app.route('/settings')
@login_required
def settings():
    """Render the main settings page.

    Passes the user's stored settings (parsed from JSON) and the filter sets
    read from ``filtersets.json`` to the template. Either value falls back
    to an empty dict when it cannot be loaded.
    """
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers a stored
        # value that is not a string.
        user_settings = {}

    try:
        with open('filtersets.json', 'r', encoding='utf-8') as f:
            filter_sets = json.load(f)
    except (OSError, ValueError) as e:
        # A missing or malformed file should not break the settings page.
        logger.warning(f"Could not load filtersets.json: {e}")
        filter_sets = {}

    return render_template('settings.html',
                           user=current_user,
                           user_settings=user_settings,
                           filter_sets=filter_sets)
|
|
|
|
|
|
@app.route('/settings/profile', methods=['GET', 'POST'])
@login_required
def settings_profile():
    """Show and update the current user's profile (username, email, avatar).

    On POST the username and email are required and must not belong to
    another account. A default avatar is applied only when the submitted id
    is one of the avatars offered by this page. Every render of the form,
    including validation failures, receives the ``default_avatars`` list.
    """
    default_avatars = [
        {'id': 'default_1', 'name': 'Gradient Blue', 'bg': 'linear-gradient(135deg, #667eea 0%, #764ba2 100%)'},
        {'id': 'default_2', 'name': 'Gradient Green', 'bg': 'linear-gradient(135deg, #4facfe 0%, #00f2fe 100%)'},
        {'id': 'default_3', 'name': 'Gradient Orange', 'bg': 'linear-gradient(135deg, #fa709a 0%, #fee140 100%)'},
        {'id': 'default_4', 'name': 'Gradient Purple', 'bg': 'linear-gradient(135deg, #a8edea 0%, #fed6e3 100%)'},
        {'id': 'default_5', 'name': 'Brand Colors', 'bg': 'linear-gradient(135deg, #4db6ac 0%, #26a69a 100%)'},
        {'id': 'default_6', 'name': 'Sunset', 'bg': 'linear-gradient(135deg, #ff7e5f 0%, #feb47b 100%)'},
    ]
    valid_avatar_ids = {avatar['id'] for avatar in default_avatars}

    def render_form():
        # Single render path so error pages get the avatar list as well.
        return render_template('settings_profile.html', user=current_user, default_avatars=default_avatars)

    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        default_avatar = request.form.get('default_avatar')

        if not username or not email:
            flash('Username and email are required', 'error')
            return render_form()

        if username != current_user.username and user_service.username_exists(username):
            flash('Username already taken', 'error')
            return render_form()

        if email != current_user.email and user_service.email_exists(email):
            flash('Email already registered', 'error')
            return render_form()

        current_user.username = username
        current_user.email = email

        # The id becomes part of a stored URL, so accept only known ids
        # rather than anything that merely starts with "default_".
        if default_avatar in valid_avatar_ids:
            current_user.profile_picture_url = f"/static/default-avatars/{default_avatar}.png"

        try:
            db.session.commit()
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving profile for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')
            return render_form()

        flash('Profile updated successfully', 'success')
        return redirect(url_for('settings'))

    return render_form()
|
|
|
|
|
|
@app.route('/settings/communities', methods=['GET', 'POST'])
@login_required
def settings_communities():
    """Show and store the user's community/source selection.

    POST saves the submitted ``communities`` list into the user's settings
    JSON. GET lists the communities from the platform config that also
    appear in ``collection_targets``, together with the current selection.
    """
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (TypeError, ValueError):
        user_settings = {}
    if not isinstance(user_settings, dict):
        # Valid JSON that is not an object cannot hold keyed preferences.
        user_settings = {}

    if request.method == 'POST':
        user_settings['communities'] = request.form.getlist('communities')

        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Community preferences updated', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving community settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')

        return redirect(url_for('settings'))

    selected_communities = user_settings.get('communities', [])

    platform_config = load_platform_config()

    # (platform, community) pairs listed as collection targets.
    enabled_communities = {
        (target.get('platform'), target.get('community'))
        for target in platform_config.get('collection_targets', [])
    }

    # Offer only configured communities that are also collection targets.
    available_communities = []
    for platform_name, platform_info in platform_config.get('platforms', {}).items():
        for community_info in platform_info.get('communities', []):
            if (platform_name, community_info['id']) not in enabled_communities:
                continue
            available_communities.append({
                'id': community_info['id'],
                'name': community_info['name'],
                'display_name': community_info.get('display_name', community_info['name']),
                'platform': platform_name,
                'icon': community_info.get('icon', platform_info.get('icon', '📄')),
                'description': community_info.get('description', '')
            })

    return render_template('settings_communities.html',
                           user=current_user,
                           available_communities=available_communities,
                           selected_communities=selected_communities)
|
|
|
|
|
|
@app.route('/settings/filters', methods=['GET', 'POST'])
@login_required
def settings_filters():
    """Show and store the user's active filter set.

    POST accepts the ``filter_set`` form value only when ``_is_safe_filterset``
    approves it, then saves it into the user's settings. GET renders the
    filter sets reported by ``filter_engine`` with the current selection,
    which defaults to ``no_filter``.
    """
    if request.method == 'POST':
        selected_filter = request.form.get('filter_set', 'no_filter')

        user_settings = _validate_user_settings(current_user.settings)

        if _is_safe_filterset(selected_filter):
            user_settings['filter_set'] = selected_filter
        else:
            flash('Invalid filter selection', 'error')
            return redirect(url_for('settings'))

        try:
            current_user.settings = json.dumps(user_settings)
            db.session.commit()
            flash('Filter settings updated successfully', 'success')
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving filter settings for user {current_user.id}: {e}")
            flash('Error saving settings', 'error')

        return redirect(url_for('settings'))

    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (TypeError, ValueError):
        # Malformed or non-string stored settings: fall back to defaults.
        user_settings = {}
    if not isinstance(user_settings, dict):
        user_settings = {}

    current_filter = user_settings.get('filter_set', 'no_filter')

    # Map each available filter-set name to its configuration.
    filter_sets = {
        filterset_name: filter_engine.config.get_filterset(filterset_name)
        for filterset_name in filter_engine.get_available_filtersets()
    }

    return render_template('settings_filters.html',
                           user=current_user,
                           filter_sets=filter_sets,
                           current_filter=current_filter)
|
|
|
|
|
|
@app.route('/settings/experience', methods=['GET', 'POST'])
@login_required
def settings_experience():
    """Show and store the opt-in experience settings.

    POST reads each checkbox (``'on'`` means enabled) plus the
    ``time_filter_days`` number and saves them under the ``experience`` key
    of the user's settings. A non-numeric day count falls back to 7 and the
    value is never stored below 1. GET renders the stored values, defaulting
    to everything off and a 7-day window.
    """
    try:
        user_settings = json.loads(current_user.settings) if current_user.settings else {}
    except (TypeError, ValueError):
        user_settings = {}
    if not isinstance(user_settings, dict):
        user_settings = {}

    if request.method == 'POST':
        # Form input is untrusted: a blank or non-numeric value must not
        # raise and turn the request into an HTTP 500.
        try:
            time_filter_days = int(request.form.get('time_filter_days', 7))
        except (TypeError, ValueError):
            time_filter_days = 7
        time_filter_days = max(1, time_filter_days)

        # All features are opt-in, so an unchecked box is stored as False.
        user_settings['experience'] = {
            'infinite_scroll': request.form.get('infinite_scroll') == 'on',
            'auto_refresh': request.form.get('auto_refresh') == 'on',
            'push_notifications': request.form.get('push_notifications') == 'on',
            'dark_patterns_opt_in': request.form.get('dark_patterns_opt_in') == 'on',
            'time_filter_enabled': request.form.get('time_filter_enabled') == 'on',
            'time_filter_days': time_filter_days
        }

        current_user.settings = json.dumps(user_settings)
        db.session.commit()

        flash('Experience settings updated successfully', 'success')
        return redirect(url_for('settings'))

    experience_settings = user_settings.get('experience', {
        'infinite_scroll': False,
        'auto_refresh': False,
        'push_notifications': False,
        'dark_patterns_opt_in': False,
        'time_filter_enabled': False,
        'time_filter_days': 7
    })

    return render_template('settings_experience.html',
                           user=current_user,
                           experience_settings=experience_settings)
|
|
|
|
|
|
@app.route('/upload-avatar', methods=['POST'])
@login_required
def upload_avatar():
    """Store an uploaded profile picture and point the user's profile at it.

    The upload is rejected with a flashed message when the file is missing,
    has a disallowed type, is too large, or has an unusable name. Accepted
    files are saved under ``UPLOAD_FOLDER`` as ``<user id>_<sanitized name>``.
    After the profile is committed, the user's previous uploaded avatar is
    removed unless it is the same file that was just written. Any unexpected
    error rolls back the session and redirects to the profile settings.
    """
    try:
        logger.info(f"Avatar upload attempt by user {current_user.id} ({current_user.username})")
        logger.debug(f"Request files: {list(request.files.keys())}")
        logger.debug(f"Request form: {dict(request.form)}")

        if not hasattr(current_user, 'id') or not current_user.id:
            logger.error("User missing ID attribute")
            flash('Authentication error. Please log in again.', 'error')
            return redirect(url_for('login'))

        if not hasattr(current_user, 'username') or not current_user.username:
            logger.error("User missing username attribute")
            flash('User profile incomplete. Please update your profile.', 'error')
            return redirect(url_for('settings_profile'))

        if 'avatar' not in request.files:
            logger.warning("No avatar file in request")
            flash('No file selected', 'error')
            return redirect(url_for('settings_profile'))

        uploaded = request.files['avatar']
        if not uploaded.filename:
            logger.warning("Empty filename provided")
            flash('No file selected', 'error')
            return redirect(url_for('settings_profile'))

        logger.info(f"Processing file: {uploaded.filename}")

        if not _is_allowed_file(uploaded.filename):
            logger.warning(f"Invalid file type: {uploaded.filename}")
            flash('Invalid file type. Please upload PNG, JPG, or GIF', 'error')
            return redirect(url_for('settings_profile'))

        # Secondary size check; MAX_CONTENT_LENGTH already bounds the request.
        max_bytes = app.config.get('MAX_CONTENT_LENGTH', 16*1024*1024)
        if (getattr(uploaded, 'content_length', 0) or 0) > max_bytes:
            logger.warning(f"File too large: {uploaded.content_length}")
            flash('File too large. Maximum size is 16MB', 'error')
            return redirect(url_for('settings_profile'))

        filename = secure_filename(uploaded.filename)
        if not filename or len(filename) > MAX_FILENAME_LENGTH:
            logger.warning(f"Invalid filename after sanitization: {filename}")
            flash('Invalid filename', 'error')
            return redirect(url_for('settings_profile'))

        # Prefix with the user id so different users cannot collide.
        owner_prefix = f"{current_user.id}_"
        unique_filename = f"{owner_prefix}{filename}"
        logger.info(f"Generated unique filename: {unique_filename}")

        upload_dir = os.path.abspath(UPLOAD_FOLDER)
        os.makedirs(upload_dir, exist_ok=True)

        upload_path = os.path.abspath(os.path.join(upload_dir, unique_filename))

        # The file must land directly inside the upload directory. Comparing
        # the parent directory avoids the false positives of a plain string
        # prefix test (e.g. a sibling directory sharing the same prefix).
        if os.path.dirname(upload_path) != upload_dir:
            logger.warning(f"Path traversal attempt in file upload: {upload_path}")
            flash('Invalid file path', 'error')
            return redirect(url_for('settings_profile'))

        uploaded.save(upload_path)
        logger.info(f"File saved successfully: {upload_path}")

        old_avatar_url = current_user.profile_picture_url
        current_user.profile_picture_url = f"/static/avatars/{unique_filename}"

        db.session.commit()
        logger.info(f"User profile updated successfully for {current_user.username}")

        # Remove the previous upload only if it belongs to this user (by the
        # id prefix) AND is a different file; otherwise re-uploading a file
        # with the same name would delete the image that was just saved.
        if old_avatar_url and old_avatar_url.startswith('/static/avatars/'):
            old_basename = os.path.basename(old_avatar_url)
            old_file_path = os.path.abspath(os.path.join(upload_dir, old_basename))
            if old_basename.startswith(owner_prefix) and old_file_path != upload_path:
                try:
                    if os.path.exists(old_file_path):
                        os.remove(old_file_path)
                        logger.info(f"Cleaned up old avatar: {old_file_path}")
                except Exception as e:
                    # Cleanup is best-effort; the upload itself succeeded.
                    logger.warning(f"Could not clean up old avatar: {e}")

        flash('Profile picture updated successfully', 'success')
        return redirect(url_for('settings_profile'))

    except Exception as e:
        # Boundary handler: keep the traceback and leave the session usable.
        logger.exception(f"Unexpected error in avatar upload: {e}")
        db.session.rollback()
        flash('An unexpected error occurred. Please try again.', 'error')
        return redirect(url_for('settings_profile'))
|
|
|
|
|
|
@app.route('/profile')
@login_required
def profile():
    """Render the profile page for the logged-in user."""
    viewer = current_user
    return render_template('profile.html', user=viewer)
|
|
|
|
|
|
# ============================================================
|
|
# ADMIN ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/admin')
@login_required
def admin_panel():
    """Render the admin user-management page.

    Non-admins, and requests made while the user service is unavailable, are
    redirected to the index page with an error message.
    """
    denial = None
    if not current_user.is_admin:
        denial = 'Access denied. Admin privileges required.'
    elif not user_service:
        denial = 'User service not available'

    if denial is not None:
        flash(denial, 'error')
        return redirect(url_for('index'))

    return render_template('admin.html', users=user_service.get_all_users())
|
|
|
|
|
|
@app.route('/admin/user/<user_id>/delete', methods=['POST'])
@login_required
def admin_delete_user(user_id):
    """Delete a user account (admin only).

    Admins cannot delete their own account. The outcome is flashed and the
    request always ends with a redirect: to the index page for non-admins or
    when the user service is unavailable, to the admin panel otherwise.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    if not user_service:
        flash('User service not available', 'error')
        return redirect(url_for('index'))

    # ``user_id`` arrives from the URL as a string; normalise both sides so
    # the self-deletion guard holds whatever type the model's id has.
    if str(current_user.id) == str(user_id):
        flash('You cannot delete your own account!', 'error')
        return redirect(url_for('admin_panel'))

    user = user_service.get_user_by_id(user_id)
    if not user:
        flash('User not found', 'error')
        return redirect(url_for('admin_panel'))

    # Capture the name first: the object may be unusable after deletion.
    username = user.username
    if user_service.delete_user(user_id):
        flash(f'User {username} has been deleted.', 'success')
        logger.info(f"Admin {current_user.id} deleted user {username} ({user_id})")
    else:
        flash('Error deleting user', 'error')
        logger.error(f"Failed to delete user {user_id}")

    return redirect(url_for('admin_panel'))
|
|
|
|
|
|
@app.route('/admin/user/<user_id>/toggle-admin', methods=['POST'])
@login_required
def admin_toggle_admin(user_id):
    """Flip the admin flag of the given user (admin only)."""
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    account = user_service.get_user_by_id(user_id)

    if not account:
        flash('User not found', 'error')
    else:
        # Store the opposite of the current flag.
        user_service.update_user_admin_status(user_id, not account.is_admin)
        flash('Admin status updated', 'success')

    return redirect(url_for('admin_panel'))
|
|
|
|
|
|
# This route is duplicate - removed in favor of the UUID-based route above
|
|
|
|
|
|
# This route is duplicate - removed in favor of the UUID-based route above
|
|
|
|
|
|
@app.route('/admin/regenerate_content', methods=['POST'])
@login_required
def admin_regenerate_content():
    """Run ``generate_html.py`` to regenerate all HTML content (admin only).

    The script is run with the current interpreter, a fixed argument list
    and a five-minute timeout. On success the application cache is
    invalidated. The outcome is flashed and the request redirects to the
    admin panel.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    import subprocess
    import sys

    try:
        script_path = os.path.abspath('generate_html.py')
        if not os.path.exists(script_path):
            flash('Content generation script not found', 'error')
            return redirect(url_for('admin_panel'))

        # Use sys.executable directly; ``os.sys`` only works because the os
        # module happens to import sys internally.
        python_exe = os.path.abspath(sys.executable)
        # Fixed argument list, no shell: nothing from the request reaches the
        # command line.
        cmd = [python_exe, script_path, '--filterset', 'no_filter', '--theme', 'vanilla-js']

        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd=os.path.abspath('.'),
            timeout=300,  # 5 minute timeout
            check=False
        )

        if result.returncode == 0:
            flash('Content regenerated successfully', 'success')
            logger.info(f"Content regenerated by admin user {current_user.id}")
            # Cached data is stale once the content has been rebuilt.
            _invalidate_cache()
        else:
            flash('Error regenerating content', 'error')
            logger.error(f"Content regeneration failed: {result.stderr}")

    except subprocess.TimeoutExpired:
        flash('Content regeneration timed out', 'error')
        logger.error("Content regeneration timed out")
    except Exception as e:
        flash(f'Error regenerating content: {str(e)}', 'error')
        logger.error(f"Content regeneration error: {e}")

    return redirect(url_for('admin_panel'))
|
|
|
|
|
|
@app.route('/admin/clear_cache', methods=['POST'])
@login_required
def admin_clear_cache():
    """Delete the ``cache`` and ``temp`` directories and reset the app cache.

    Admin only. The outcome is flashed and the request redirects to the
    admin panel.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    try:
        import shutil

        # Remove whichever on-disk cache directories currently exist.
        for stale_dir in ('cache', 'temp'):
            if not os.path.exists(stale_dir):
                continue
            shutil.rmtree(stale_dir)

        # Then drop the application's own cache.
        _invalidate_cache()

        flash('Cache cleared successfully', 'success')
        logger.info(f"Cache cleared by admin user {current_user.id}")
    except Exception as e:
        flash(f'Error clearing cache: {str(e)}', 'error')
        logger.error(f"Cache clearing error: {e}")

    return redirect(url_for('admin_panel'))
|
|
|
|
|
|
@app.route('/admin/backup_data', methods=['POST'])
@login_required
def admin_backup_data():
    """Copy application data into a timestamped directory under ``backups``.

    Admin only. The ``data``, ``templates``, ``themes`` and ``static``
    directories and a fixed list of files are copied when they exist. The
    outcome is flashed and the request redirects to the admin panel.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('admin_panel'))

    try:
        import shutil
        from datetime import datetime

        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f'balanceboard_backup_{stamp}'

        target_root = f'backups/{backup_name}'
        os.makedirs(target_root, exist_ok=True)

        # Whole directories first...
        for source_dir in ('data', 'templates', 'themes', 'static'):
            if os.path.exists(source_dir):
                shutil.copytree(source_dir, f'{target_root}/{source_dir}')

        # ...then the individual files.
        for source_file in ('app.py', 'models.py', 'database.py', 'filtersets.json'):
            if os.path.exists(source_file):
                shutil.copy2(source_file, target_root)

        flash(f'Backup created: {backup_name}', 'success')
    except Exception as e:
        flash(f'Error creating backup: {str(e)}', 'error')

    return redirect(url_for('admin_panel'))
|
|
|
|
|
|
# ============================================================
|
|
# POLLING MANAGEMENT ROUTES
|
|
# ============================================================
|
|
|
|
@app.route('/admin/polling')
@login_required
def admin_polling():
    """Render the polling management page (admin only).

    The template receives all poll sources ordered by platform and display
    name, the polling service status, and the platform configuration.
    """
    if not current_user.is_admin:
        flash('Access denied. Admin privileges required.', 'error')
        return redirect(url_for('index'))

    from models import PollSource, PollLog
    from polling_service import polling_service

    ordered_sources = (
        PollSource.query
        .order_by(PollSource.platform, PollSource.display_name)
        .all()
    )

    return render_template('admin_polling.html',
                           sources=ordered_sources,
                           scheduler_status=polling_service.get_status(),
                           platform_config=load_platform_config())
|
|
|
|
|
|
@app.route('/admin/polling/add', methods=['POST'])
@login_required
def admin_polling_add():
    """Create a new poll source from the submitted form (admin only).

    A non-blank ``custom_source_id`` overrides the dropdown ``source_id``.
    The request is rejected with a flashed message when a required field is
    missing, a numeric field is not a positive whole number, or the
    platform/source pair already exists. New sources are created disabled.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    platform = request.form.get('platform')
    source_id = request.form.get('source_id')
    custom_source_id = request.form.get('custom_source_id')
    display_name = request.form.get('display_name')
    fetch_comments = request.form.get('fetch_comments', 'true') == 'true'
    priority = request.form.get('priority', 'medium')

    # Form input is untrusted: a blank or non-numeric value must produce a
    # validation message, not an unhandled ValueError (HTTP 500).
    try:
        poll_interval = int(request.form.get('poll_interval', 60))
        max_posts = int(request.form.get('max_posts', 100))
    except (TypeError, ValueError):
        flash('Poll interval and max posts must be whole numbers', 'error')
        return redirect(url_for('admin_polling'))

    if poll_interval < 1 or max_posts < 1:
        flash('Poll interval and max posts must be at least 1', 'error')
        return redirect(url_for('admin_polling'))

    # A custom id, when provided, takes precedence over the dropdown value.
    if custom_source_id and custom_source_id.strip():
        source_id = custom_source_id.strip()

    if not platform or not source_id or not display_name:
        flash('Missing required fields', 'error')
        return redirect(url_for('admin_polling'))

    existing = PollSource.query.filter_by(platform=platform, source_id=source_id).first()
    if existing:
        flash(f'Source {platform}:{source_id} already exists', 'warning')
        return redirect(url_for('admin_polling'))

    # New sources start disabled.
    source = PollSource(
        platform=platform,
        source_id=source_id,
        display_name=display_name,
        poll_interval_minutes=poll_interval,
        max_posts=max_posts,
        fetch_comments=fetch_comments,
        priority=priority,
        enabled=False,
        created_by=current_user.id
    )

    try:
        db.session.add(source)
        db.session.commit()
    except Exception as e:
        # Leave the session usable and report the failure to the admin.
        db.session.rollback()
        logger.error(f"Error adding poll source {platform}:{source_id}: {e}")
        flash('Error adding polling source', 'error')
        return redirect(url_for('admin_polling'))

    flash(f'Added polling source: {display_name}', 'success')
    logger.info(f"Admin {current_user.id} added poll source {platform}:{source_id}")

    return redirect(url_for('admin_polling'))
|
|
|
|
|
|
@app.route('/admin/polling/<source_id>/toggle', methods=['POST'])
@login_required
def admin_polling_toggle(source_id):
    """Flip the ``enabled`` flag of one poll source.

    Non-admins are sent to the index. Otherwise the result (new state or
    "not found") is flashed and the user returns to the polling page.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    target = PollSource.query.get(source_id)
    if target:
        target.enabled = not target.enabled
        db.session.commit()

        # Report the state the source ended up in
        if target.enabled:
            state_label = 'enabled'
        else:
            state_label = 'disabled'
        flash(f'Polling {state_label} for {target.display_name}', 'success')
    else:
        flash('Source not found', 'error')

    return redirect(url_for('admin_polling'))
|
|
|
|
|
|
@app.route('/admin/polling/<source_id>/update', methods=['POST'])
@login_required
def admin_polling_update(source_id):
    """Update the configurable settings of an existing poll source.

    Only fields present in the form are changed: ``poll_interval``,
    ``max_posts``, ``fetch_comments``, ``priority`` and ``display_name``.
    Numeric fields are validated before anything is modified, so an invalid
    value leaves the source untouched and is reported via flash instead of
    raising.

    Non-admins are redirected to the index; all other outcomes redirect
    back to the polling page.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    form = request.form
    raw_interval = form.get('poll_interval')
    raw_max_posts = form.get('max_posts')

    # Non-numeric input previously raised ValueError out of int() (HTTP 500).
    # Parse both values up front so a bad one cannot cause a partial update.
    try:
        new_interval = int(raw_interval) if raw_interval else None
        new_max_posts = int(raw_max_posts) if raw_max_posts else None
    except ValueError:
        flash('Poll interval and max posts must be whole numbers', 'error')
        return redirect(url_for('admin_polling'))

    # Update all configurable fields that were actually submitted
    if new_interval is not None:
        source.poll_interval_minutes = new_interval

    if new_max_posts is not None:
        source.max_posts = new_max_posts

    # Presence check rather than truthiness: any submitted value other than
    # 'true' turns comment fetching off.
    if form.get('fetch_comments') is not None:
        source.fetch_comments = form.get('fetch_comments') == 'true'

    if form.get('priority'):
        source.priority = form.get('priority')

    if form.get('display_name'):
        source.display_name = form.get('display_name')

    db.session.commit()
    flash(f'Updated settings for {source.display_name}', 'success')

    return redirect(url_for('admin_polling'))
|
|
|
|
|
|
@app.route('/admin/polling/<source_id>/poll-now', methods=['POST'])
@login_required
def admin_polling_poll_now(source_id):
    """Ask the polling service to poll one source immediately.

    Non-admins are sent to the index. Any exception raised while starting
    the poll is flashed to the admin and logged; the request always ends
    with a redirect to the polling page.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource
    from polling_service import polling_service

    target = PollSource.query.get(source_id)
    if target:
        # NOTE(review): the original comment says this runs in the background;
        # that depends on polling_service.poll_now - confirm there.
        try:
            polling_service.poll_now(source_id)
            flash(f'Polling started for {target.display_name}', 'success')
        except Exception as exc:
            flash(f'Error starting poll: {str(exc)}', 'error')
            logger.error(f"Error triggering poll for {source_id}: {exc}")
    else:
        flash('Source not found', 'error')

    return redirect(url_for('admin_polling'))
|
|
|
|
|
|
@app.route('/admin/polling/<source_id>/delete', methods=['POST'])
@login_required
def admin_polling_delete(source_id):
    """Remove a poll source and record which admin deleted it.

    Non-admins are sent to the index; otherwise the outcome is flashed and
    the user is redirected to the polling page.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource

    doomed = PollSource.query.get(source_id)
    if doomed:
        # Capture the name before the row is deleted, for the flash message
        removed_name = doomed.display_name
        db.session.delete(doomed)
        db.session.commit()

        flash(f'Deleted polling source: {removed_name}', 'success')
        logger.info(f"Admin {current_user.id} deleted poll source {source_id}")
    else:
        flash('Source not found', 'error')

    return redirect(url_for('admin_polling'))
|
|
|
|
|
|
@app.route('/admin/polling/<source_id>/logs')
@login_required
def admin_polling_logs(source_id):
    """Show up to 50 log entries for a single poll source.

    Non-admins are sent to the index; an unknown source id flashes an error
    and redirects to the polling page.
    """
    if not current_user.is_admin:
        flash('Access denied', 'error')
        return redirect(url_for('index'))

    from models import PollSource, PollLog

    source = PollSource.query.get(source_id)
    if not source:
        flash('Source not found', 'error')
        return redirect(url_for('admin_polling'))

    # Capped at 50 rows. NOTE(review): which rows count as "recent" depends on
    # the ordering configured on the PollSource.logs relationship - confirm.
    log_entries = source.logs.limit(50).all()

    page_context = {'source': source, 'logs': log_entries}
    return render_template('admin_polling_logs.html', **page_context)
|
|
|
|
|
|
# ============================================================
# TEMPLATE CONTEXT PROCESSORS
# ============================================================
|
|
|
|
@app.context_processor
def inject_app_config():
    """Expose the configured application name to every template as APP_NAME."""
    branding = dict(APP_NAME=app.config['APP_NAME'])
    return branding
|
|
|
|
|
|
# ============================================================
# ERROR HANDLERS
# ============================================================
|
|
|
|
@app.errorhandler(404)
def not_found(e):
    """Serve the custom 404 template with a 404 status code."""
    page_body = render_template('404.html')
    return page_body, 404
|
|
|
|
|
|
@app.errorhandler(500)
def server_error(e):
    """Serve the custom 500 template with a 500 status code."""
    page_body = render_template('500.html')
    return page_body, 500
|
|
|
|
|
|
# ============================================================
# INITIALIZATION
# ============================================================
|
|
|
|
if __name__ == '__main__':
    print("✓ BalanceBoard starting...")
    print("✓ Database: PostgreSQL with SQLAlchemy")
    print("✓ Password hashing: bcrypt")
    print("✓ Authentication: Flask-Login")

    # The server binds to every interface, and Flask's debug mode enables the
    # Werkzeug interactive debugger, which lets anyone who can reach it run
    # arbitrary code. Debug is therefore opt-in via FLASK_DEBUG rather than
    # hard-coded on.
    debug_enabled = os.getenv('FLASK_DEBUG', 'false').lower() in ('1', 'true')

    app.run(host='0.0.0.0', port=DEFAULT_PORT, debug=debug_enabled)
|