Files
olaper/routes.py
SERTY 0f1c749b33
All checks were successful
Test Build / test-build (push) Successful in 23s
Scheduler v1
2025-07-30 18:28:55 +03:00

516 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# routes.py
import os
import json
import shutil
from flask import (
Blueprint, render_template, request, redirect, url_for, flash, g, session, current_app
)
from flask_login import login_user, login_required, logout_user, current_user
from flask_babel import _
from werkzeug.utils import secure_filename
import gspread
# --- ИМПОРТ РАСШИРЕНИЙ И МОДЕЛЕЙ ---
# Импортируем экземпляры расширений, созданные в app.py
from extensions import db, login_manager, scheduler
# Импортируем наши классы и утилиты
from models import User, UserConfig
from google_sheets import GoogleSheets
from request_module import ReqModule
from utils import calculate_period_dates, get_dates, generate_template_from_preset, render_temp
# --- Создание блюпринта ---
main_bp = Blueprint('main', __name__)
# --- Регистрация обработчиков для расширений ---
@login_manager.user_loader
def load_user(user_id):
"""Загружает пользователя из БД для управления сессией."""
return db.session.get(User, int(user_id))
@main_bp.before_app_request
def load_user_specific_data():
"""Загружает конфигурацию пользователя в глобальный объект `g` для текущего запроса."""
g.user_config = None
if current_user.is_authenticated:
g.user_config = get_user_config()
# --- Вспомогательные функции, специфичные для маршрутов ---
def get_user_config():
"""Получает конфиг для текущего пользователя, создавая его при необходимости."""
if not current_user.is_authenticated:
return None
config = UserConfig.query.filter_by(user_id=current_user.id).first()
if not config:
config = UserConfig(user_id=current_user.id)
db.session.add(config)
return config
def get_user_upload_path(filename=""):
"""Возвращает путь для загрузки файлов для текущего пользователя."""
if not current_user.is_authenticated:
return None
user_dir = os.path.join(current_app.config['DATA_DIR'], str(current_user.id))
os.makedirs(user_dir, exist_ok=True)
return os.path.join(user_dir, secure_filename(filename))
def _parse_cron_string(cron_str):
"""Парсит строку cron в словарь для APScheduler. Локальная копия для удобства."""
parts = cron_str.split()
if len(parts) != 5:
raise ValueError("Invalid cron string format. Expected 5 parts.")
keys = ['minute', 'hour', 'day', 'month', 'day_of_week']
return {keys[i]: part for i, part in enumerate(parts)}
# --- Маршруты ---
@main_bp.route('/language/<language>')
def set_language(language=None):
session['language'] = language
return redirect(request.referrer or url_for('.index'))
@main_bp.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('.index'))
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user is None or not user.check_password(password):
flash(_('Invalid username or password'), 'error')
return redirect(url_for('.login'))
login_user(user, remember=request.form.get('remember'))
flash(_('Login successful!'), 'success')
next_page = request.args.get('next')
return redirect(next_page or url_for('.index'))
return render_template('login.html')
@main_bp.route('/register', methods=['GET', 'POST'])
def register():
if current_user.is_authenticated:
return redirect(url_for('.index'))
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
if not username or not password:
flash(_('Username and password are required.'), 'error')
return redirect(url_for('.register'))
if User.query.filter_by(username=username).first():
flash(_('Username already exists.'), 'error')
return redirect(url_for('.register'))
user = User(username=username)
user.set_password(password)
user.config = UserConfig()
db.session.add(user)
try:
db.session.commit()
flash(_('Registration successful! Please log in.'), 'success')
return redirect(url_for('.login'))
except Exception as e:
db.session.rollback()
flash(_('An error occurred during registration. Please try again.'), 'error')
return redirect(url_for('.register'))
return render_template('register.html')
@main_bp.route('/logout')
@login_required
def logout():
logout_user()
flash(_('You have been logged out.'), 'success')
return redirect(url_for('.index'))
@main_bp.route('/')
@login_required
def index():
config = g.user_config
return render_template(
'index.html',
rms_config=config.get_rms_dict(),
google_config=config.get_google_dict(),
presets=config.presets,
sheets=config.sheets,
mappings=config.mappings,
client_email=config.google_client_email
)
@main_bp.route('/configure_rms', methods=['POST'])
@login_required
def configure_rms():
config = g.user_config
try:
host = request.form.get('host', '').strip()
login = request.form.get('login', '').strip()
password = request.form.get('password', '')
if not config.rms_password and not password:
flash(_('Password is required for the first time.'), 'error')
return redirect(url_for('.index'))
if not host or not login:
flash(_('Host and Login fields must be filled.'), 'error')
return redirect(url_for('.index'))
effective_password = password if password else config.rms_password
req_module = ReqModule(host, login, effective_password)
if req_module.login():
presets_data = req_module.take_presets()
req_module.logout()
config.rms_host = host
config.rms_login = login
if password:
config.rms_password = password
config.presets = presets_data
db.session.commit()
flash(_('Successfully authorized on RMS server. Received %(num)s presets.', num=len(presets_data)), 'success')
else:
flash(_('Authorization error on RMS server. Check host, login or password.'), 'error')
except Exception as e:
db.session.rollback()
flash(_('Error configuring RMS: %(error)s', error=str(e)), 'error')
return redirect(url_for('.index'))
@main_bp.route('/upload_credentials', methods=['POST'])
@login_required
def upload_credentials():
config = g.user_config
if 'cred_file' not in request.files or request.files['cred_file'].filename == '':
flash(_('No file was selected.'), 'error')
return redirect(url_for('.index'))
cred_file = request.files['cred_file']
filename = cred_file.filename
# Получаем путь для сохранения файла в папке пользователя
user_cred_path = get_user_upload_path(filename)
temp_path = None
try:
# Сначала сохраняем файл во временную директорию для проверки
temp_dir = os.path.join(current_app.config['DATA_DIR'], "temp")
os.makedirs(temp_dir, exist_ok=True)
temp_path = os.path.join(temp_dir, f"temp_{current_user.id}_{filename}")
cred_file.save(temp_path)
with open(temp_path, 'r', encoding='utf-8') as f:
cred_data = json.load(f)
client_email = cred_data.get('client_email')
if not client_email:
flash(_('Could not find client_email in the credentials file.'), 'error')
# Не забываем удалить временный файл при ошибке
if os.path.exists(temp_path):
os.remove(temp_path)
return redirect(url_for('.index'))
# Если все хорошо, перемещаем файл из временной папки в постоянную
shutil.move(temp_path, user_cred_path)
# Сохраняем путь к файлу и email в базу данных
config.google_cred_file_path = user_cred_path
config.google_client_email = client_email
config.sheets = [] # Сбрасываем список листов при смене credentials
db.session.commit()
flash(_('Credentials file successfully uploaded. Email: %(email)s', email=client_email), 'success')
except json.JSONDecodeError:
flash(_('Error: Uploaded file is not a valid JSON.'), 'error')
except Exception as e:
db.session.rollback()
flash(_('Error processing credentials: %(error)s', error=str(e)), 'error')
finally:
# Гарантированно удаляем временный файл, если он еще существует
if temp_path and os.path.exists(temp_path):
os.remove(temp_path)
return redirect(url_for('.index'))
@main_bp.route('/configure_google', methods=['POST'])
@login_required
def configure_google():
config = g.user_config
sheet_url = request.form.get('sheet_url', '').strip()
if not sheet_url:
flash(_('Sheet URL must be provided.'), 'error')
return redirect(url_for('.index'))
config.google_sheet_url = sheet_url
cred_path = config.google_cred_file_path
if not cred_path or not os.path.isfile(cred_path):
flash(_('Please upload a valid credentials file first.'), 'warning')
config.sheets = []
db.session.commit()
return redirect(url_for('.index'))
try:
gs_client = GoogleSheets(cred_path, sheet_url)
sheets_data = gs_client.get_sheets()
config.sheets = sheets_data
db.session.commit()
flash(_('Successfully connected to Google Sheets. Found %(num)s sheets. Settings saved.', num=len(sheets_data)), 'success')
except Exception as e:
db.session.rollback()
config.sheets = []
flash(_('Error connecting to Google Sheets: %(error)s. Check the URL and service account permissions.', error=str(e)), 'error')
try:
db.session.commit()
except Exception:
db.session.rollback()
return redirect(url_for('.index'))
@main_bp.route('/mapping_set', methods=['POST'])
@login_required
def mapping_set():
config = g.user_config
try:
new_mappings = {}
# Сохраняем существующие настройки расписания при обновлении отчетов
current_mappings = config.mappings or {}
for sheet in config.sheets:
report_key = f"sheet_{sheet['id']}"
selected_report_id = request.form.get(report_key)
if selected_report_id:
# Получаем существующие данные расписания для этого листа
existing_schedule = current_mappings.get(sheet['title'], {})
new_mappings[sheet['title']] = {
'report_id': selected_report_id,
'schedule_cron': existing_schedule.get('schedule_cron'),
'schedule_period': existing_schedule.get('schedule_period')
}
config.mappings = new_mappings
db.session.commit()
flash(_('Mappings updated successfully.'), 'success')
except Exception as e:
db.session.rollback()
flash(_('Error updating mappings: %(error)s', error=str(e)), 'error')
return redirect(url_for('.index'))
def execute_olap_export(user_id, sheet_title, start_date_str=None, end_date_str=None):
"""
Основная логика выгрузки OLAP-отчета. Может вызываться как из эндпоинта, так и из планировщика.
Если start_date_str и end_date_str не переданы, вычисляет их на основе расписания.
"""
app = current_app._get_current_object()
with app.app_context():
user = db.session.get(User, user_id)
if not user:
app.logger.error(f"Task failed: User with ID {user_id} not found.")
return
config = user.config
req_module = None
try:
mappings = config.mappings
mapping_info = mappings.get(sheet_title)
if not mapping_info or not mapping_info.get('report_id'):
raise ValueError(f"No report is assigned to sheet '{sheet_title}'.")
report_id = mapping_info['report_id']
# Если даты не переданы (вызов из планировщика), вычисляем их
if not start_date_str or not end_date_str:
period_key = mapping_info.get('schedule_period')
if not period_key:
raise ValueError(f"Scheduled task for sheet '{sheet_title}' is missing a period setting.")
from_date, to_date = calculate_period_dates(period_key)
app.logger.info(f"Executing scheduled job for user {user_id}, sheet '{sheet_title}', period '{period_key}' ({from_date} to {to_date})")
else:
from_date, to_date = get_dates(start_date_str, end_date_str)
app.logger.info(f"Executing manual job for user {user_id}, sheet '{sheet_title}' ({from_date} to {to_date})")
# Проверка полноты конфигурации
if not all([config.rms_host, config.rms_login, config.rms_password, config.google_cred_file_path, config.google_sheet_url]):
raise ValueError('RMS or Google Sheets configuration is incomplete.')
preset = next((p for p in config.presets if p.get('id') == report_id), None)
if not preset:
raise ValueError(f'Preset with ID "{report_id}" not found in saved configuration.')
template = generate_template_from_preset(preset)
json_body = render_temp(template, {"from_date": from_date, "to_date": to_date})
req_module = ReqModule(config.rms_host, config.rms_login, config.rms_password)
gs_client = GoogleSheets(config.google_cred_file_path, config.google_sheet_url)
if req_module.login():
result = req_module.take_olap(json_body)
# Код обработки данных (остается без изменений)
if 'data' not in result or not isinstance(result['data'], list):
raise ValueError(f'Unexpected response format from RMS for report "{preset.get("name", report_id)}".')
report_data = result['data']
if not report_data:
gs_client.clear_and_write_data(sheet_title, [])
app.logger.warning(f"Report '{preset.get('name', report_id)}' for user {user_id} returned no data. Sheet '{sheet_title}' cleared.")
return
processed_data = []
first_item = report_data[0]
is_pivoted = isinstance(first_item, dict) and 'row' in first_item and 'cells' in first_item
if is_pivoted:
for row_item in report_data:
row_values = row_item.get('row', {})
cells = row_item.get('cells', [])
if not cells:
processed_data.append(row_values.copy())
else:
for cell in cells:
new_flat_row = row_values.copy()
new_flat_row.update(cell.get('col', {}))
new_flat_row.update(cell.get('values', {}))
processed_data.append(new_flat_row)
else:
processed_data = [item for item in report_data if isinstance(item, dict)]
all_keys = set()
for row in processed_data:
all_keys.update(row.keys())
row_group_fields = preset.get('groupByRowFields', [])
col_group_fields = preset.get('groupByColFields', [])
agg_fields = preset.get('aggregateFields', [])
ordered_headers = []
for field in row_group_fields + col_group_fields + agg_fields:
if field in all_keys:
ordered_headers.append(field)
all_keys.remove(field)
ordered_headers.extend(sorted(list(all_keys)))
data_to_insert = [ordered_headers]
for row in processed_data:
row_data = []
for header in ordered_headers:
value = row.get(header, '')
value_str = str(value) if value is not None else ''
if value_str.startswith(('=', '+', '-', '@')):
row_data.append("'" + value_str)
else:
row_data.append(value_str)
data_to_insert.append(row_data)
gs_client.clear_and_write_data(sheet_title, data_to_insert)
app.logger.info(f"Successfully wrote {len(data_to_insert) - 1} rows to sheet '{sheet_title}' for user {user_id}.")
else:
raise Exception('Error authorizing on RMS server when trying to get a report.')
except Exception as e:
app.logger.error(f"Error in execute_olap_export for user {user_id}, sheet '{sheet_title}': {e}", exc_info=True)
finally:
if req_module and req_module.token:
req_module.logout()
@main_bp.route('/render_olap', methods=['POST'])
@login_required
def render_olap():
sheet_title = next((key for key in request.form if key.startswith('render_')), '').replace('render_', '')
start_date = request.form.get('start_date')
end_date = request.form.get('end_date')
if not sheet_title:
flash(_('Error: Could not determine which sheet to render the report for.'), 'error')
return redirect(url_for('.index'))
if not start_date or not end_date:
flash(_('Error: Start date and end date are required for manual rendering.'), 'error')
return redirect(url_for('.index'))
try:
# Просто вызываем нашу новую универсальную функцию
execute_olap_export(current_user.id, sheet_title, start_date, end_date)
flash(_('Report generation task for sheet "%(sheet)s" has been started. The data will appear shortly.', sheet=sheet_title), 'success')
except Exception as e:
flash(_('An unexpected error occurred: %(error)s', error=str(e)), 'error')
current_app.logger.error(f"Unexpected error in render_olap route: {e}", exc_info=True)
return redirect(url_for('.index'))
@main_bp.route('/save_schedule', methods=['POST'])
@login_required
def save_schedule():
config = g.user_config
try:
updated_mappings = config.mappings or {}
for sheet_title, params in updated_mappings.items():
cron_value = request.form.get(f"cron-{sheet_title}", "").strip()
period_value = request.form.get(f"period-{sheet_title}", "").strip()
# Обработка кастомного периода N дней
if period_value == 'last_N_days':
try:
custom_days = int(request.form.get(f"custom_days-{sheet_title}", 0))
if custom_days > 0:
period_value = f"last_{custom_days}_days"
else:
period_value = "" # Сбрасываем, если введено 0 или некорректное значение
except (ValueError, TypeError):
period_value = ""
params['schedule_cron'] = cron_value if cron_value else None
params['schedule_period'] = period_value if period_value else None
job_id = f"user_{current_user.id}_sheet_{sheet_title}"
# Удаляем старую задачу, если она была
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
current_app.logger.info(f"Removed existing job: {job_id}")
# Добавляем новую задачу, если есть cron-расписание
if cron_value and period_value:
try:
cron_params = _parse_cron_string(cron_value)
scheduler.add_job(
id=job_id,
func=execute_olap_export,
trigger='cron',
args=[current_user.id, sheet_title],
replace_existing=True,
**cron_params
)
current_app.logger.info(f"Added/updated job: {job_id} with schedule '{cron_value}'")
except ValueError as ve:
flash(_('Invalid cron format for sheet "%(sheet)s": %(error)s', sheet=sheet_title, error=ve), 'error')
except Exception as e:
flash(_('Error scheduling job for sheet "%(sheet)s": %(error)s', sheet=sheet_title, error=e), 'error')
config.mappings = updated_mappings
db.session.commit()
flash(_('Schedule settings saved successfully.'), 'success')
except Exception as e:
db.session.rollback()
flash(_('An error occurred while saving the schedule: %(error)s', error=str(e)), 'error')
current_app.logger.error(f"Error in save_schedule: {e}", exc_info=True)
return redirect(url_for('.index'))