Scheduler v1
All checks were successful
Test Build / test-build (push) Successful in 23s

This commit is contained in:
2025-07-30 18:28:55 +03:00
parent 8e757afe39
commit 0f1c749b33
7 changed files with 457 additions and 159 deletions

356
routes.py
View File

@@ -12,18 +12,17 @@ import gspread
# --- ИМПОРТ РАСШИРЕНИЙ И МОДЕЛЕЙ ---
# Импортируем экземпляры расширений, созданные в app.py
from extensions import db, login_manager
from extensions import db, login_manager, scheduler
# Импортируем наши классы и утилиты
from models import User, UserConfig
from google_sheets import GoogleSheets
from request_module import ReqModule
from utils import get_dates, generate_template_from_preset, render_temp
from utils import calculate_period_dates, get_dates, generate_template_from_preset, render_temp
# --- Создание блюпринта ---
main_bp = Blueprint('main', __name__)
# --- Регистрация обработчиков для расширений ---
@login_manager.user_loader
@@ -60,6 +59,13 @@ def get_user_upload_path(filename=""):
os.makedirs(user_dir, exist_ok=True)
return os.path.join(user_dir, secure_filename(filename))
def _parse_cron_string(cron_str):
"""Парсит строку cron в словарь для APScheduler. Локальная копия для удобства."""
parts = cron_str.split()
if len(parts) != 5:
raise ValueError("Invalid cron string format. Expected 5 parts.")
keys = ['minute', 'hour', 'day', 'month', 'day_of_week']
return {keys[i]: part for i, part in enumerate(parts)}
# --- Маршруты ---
@@ -275,12 +281,22 @@ def mapping_set():
config = g.user_config
try:
new_mappings = {}
# Сохраняем существующие настройки расписания при обновлении отчетов
current_mappings = config.mappings or {}
for sheet in config.sheets:
report_key = f"sheet_{sheet['id']}"
selected_report_id = request.form.get(report_key)
if selected_report_id:
new_mappings[sheet['title']] = selected_report_id
# Получаем существующие данные расписания для этого листа
existing_schedule = current_mappings.get(sheet['title'], {})
new_mappings[sheet['title']] = {
'report_id': selected_report_id,
'schedule_cron': existing_schedule.get('schedule_cron'),
'schedule_period': existing_schedule.get('schedule_period')
}
config.mappings = new_mappings
db.session.commit()
@@ -291,142 +307,210 @@ def mapping_set():
return redirect(url_for('.index'))
def execute_olap_export(user_id, sheet_title, start_date_str=None, end_date_str=None):
"""
Основная логика выгрузки OLAP-отчета. Может вызываться как из эндпоинта, так и из планировщика.
Если start_date_str и end_date_str не переданы, вычисляет их на основе расписания.
"""
app = current_app._get_current_object()
with app.app_context():
user = db.session.get(User, user_id)
if not user:
app.logger.error(f"Task failed: User with ID {user_id} not found.")
return
config = user.config
req_module = None
try:
mappings = config.mappings
mapping_info = mappings.get(sheet_title)
if not mapping_info or not mapping_info.get('report_id'):
raise ValueError(f"No report is assigned to sheet '{sheet_title}'.")
report_id = mapping_info['report_id']
# Если даты не переданы (вызов из планировщика), вычисляем их
if not start_date_str or not end_date_str:
period_key = mapping_info.get('schedule_period')
if not period_key:
raise ValueError(f"Scheduled task for sheet '{sheet_title}' is missing a period setting.")
from_date, to_date = calculate_period_dates(period_key)
app.logger.info(f"Executing scheduled job for user {user_id}, sheet '{sheet_title}', period '{period_key}' ({from_date} to {to_date})")
else:
from_date, to_date = get_dates(start_date_str, end_date_str)
app.logger.info(f"Executing manual job for user {user_id}, sheet '{sheet_title}' ({from_date} to {to_date})")
# Проверка полноты конфигурации
if not all([config.rms_host, config.rms_login, config.rms_password, config.google_cred_file_path, config.google_sheet_url]):
raise ValueError('RMS or Google Sheets configuration is incomplete.')
preset = next((p for p in config.presets if p.get('id') == report_id), None)
if not preset:
raise ValueError(f'Preset with ID "{report_id}" not found in saved configuration.')
template = generate_template_from_preset(preset)
json_body = render_temp(template, {"from_date": from_date, "to_date": to_date})
req_module = ReqModule(config.rms_host, config.rms_login, config.rms_password)
gs_client = GoogleSheets(config.google_cred_file_path, config.google_sheet_url)
if req_module.login():
result = req_module.take_olap(json_body)
# Код обработки данных (остается без изменений)
if 'data' not in result or not isinstance(result['data'], list):
raise ValueError(f'Unexpected response format from RMS for report "{preset.get("name", report_id)}".')
report_data = result['data']
if not report_data:
gs_client.clear_and_write_data(sheet_title, [])
app.logger.warning(f"Report '{preset.get('name', report_id)}' for user {user_id} returned no data. Sheet '{sheet_title}' cleared.")
return
processed_data = []
first_item = report_data[0]
is_pivoted = isinstance(first_item, dict) and 'row' in first_item and 'cells' in first_item
if is_pivoted:
for row_item in report_data:
row_values = row_item.get('row', {})
cells = row_item.get('cells', [])
if not cells:
processed_data.append(row_values.copy())
else:
for cell in cells:
new_flat_row = row_values.copy()
new_flat_row.update(cell.get('col', {}))
new_flat_row.update(cell.get('values', {}))
processed_data.append(new_flat_row)
else:
processed_data = [item for item in report_data if isinstance(item, dict)]
all_keys = set()
for row in processed_data:
all_keys.update(row.keys())
row_group_fields = preset.get('groupByRowFields', [])
col_group_fields = preset.get('groupByColFields', [])
agg_fields = preset.get('aggregateFields', [])
ordered_headers = []
for field in row_group_fields + col_group_fields + agg_fields:
if field in all_keys:
ordered_headers.append(field)
all_keys.remove(field)
ordered_headers.extend(sorted(list(all_keys)))
data_to_insert = [ordered_headers]
for row in processed_data:
row_data = []
for header in ordered_headers:
value = row.get(header, '')
value_str = str(value) if value is not None else ''
if value_str.startswith(('=', '+', '-', '@')):
row_data.append("'" + value_str)
else:
row_data.append(value_str)
data_to_insert.append(row_data)
gs_client.clear_and_write_data(sheet_title, data_to_insert)
app.logger.info(f"Successfully wrote {len(data_to_insert) - 1} rows to sheet '{sheet_title}' for user {user_id}.")
else:
raise Exception('Error authorizing on RMS server when trying to get a report.')
except Exception as e:
app.logger.error(f"Error in execute_olap_export for user {user_id}, sheet '{sheet_title}': {e}", exc_info=True)
finally:
if req_module and req_module.token:
req_module.logout()
@main_bp.route('/render_olap', methods=['POST'])
@login_required
def render_olap():
config = g.user_config
sheet_title = None
req_module = None
sheet_title = next((key for key in request.form if key.startswith('render_')), '').replace('render_', '')
start_date = request.form.get('start_date')
end_date = request.form.get('end_date')
if not sheet_title:
flash(_('Error: Could not determine which sheet to render the report for.'), 'error')
return redirect(url_for('.index'))
if not start_date or not end_date:
flash(_('Error: Start date and end date are required for manual rendering.'), 'error')
return redirect(url_for('.index'))
try:
from_date, to_date = get_dates(request.form.get('start_date'), request.form.get('end_date'))
sheet_title = next((key for key in request.form if key.startswith('render_')), '').replace('render_', '')
if not sheet_title:
flash(_('Error: Could not determine which sheet to render the report for.'), 'error')
return redirect(url_for('.index'))
report_id = config.mappings.get(sheet_title)
if not report_id:
flash(_('Error: No report is assigned to sheet "%(sheet)s".', sheet=sheet_title), 'error')
return redirect(url_for('.index'))
if not all([config.rms_host, config.rms_login, config.rms_password, config.google_cred_file_path, config.google_sheet_url]):
flash(_('Error: RMS or Google Sheets configuration is incomplete.'), 'error')
return redirect(url_for('.index'))
preset = next((p for p in config.presets if p.get('id') == report_id), None)
if not preset:
flash(_('Error: Preset with ID "%(id)s" not found in saved configuration.', id=report_id), 'error')
return redirect(url_for('.index'))
template = generate_template_from_preset(preset)
json_body = render_temp(template, {"from_date": from_date, "to_date": to_date})
req_module = ReqModule(config.rms_host, config.rms_login, config.rms_password)
gs_client = GoogleSheets(config.google_cred_file_path, config.google_sheet_url)
if req_module.login():
result = req_module.take_olap(json_body)
# --- НАЧАЛО НОВОЙ УЛУЧШЕННОЙ ЛОГИКИ ОБРАБОТКИ ДАННЫХ ---
if 'data' not in result or not isinstance(result['data'], list):
flash(_('Error: Unexpected response format from RMS for report "%(name)s".', name=preset.get('name', report_id)), 'error')
current_app.logger.error(f"Unexpected API response for report {report_id} ('{preset.get('name')}'). Response: {result}")
return redirect(url_for('.index'))
report_data = result['data']
# Если отчет пуст, очищаем лист и уведомляем пользователя.
if not report_data:
gs_client.clear_and_write_data(sheet_title, [])
flash(_('Report "%(name)s" returned no data for the selected period. Sheet "%(sheet)s" has been cleared.', name=preset.get('name', report_id), sheet=sheet_title), 'warning')
return redirect(url_for('.index'))
# Здесь будет храниться наш итоговый "плоский" список словарей
processed_data = []
# Проверяем структуру отчета: сводный (pivoted) или простой (flat)
first_item = report_data[0]
is_pivoted = isinstance(first_item, dict) and 'row' in first_item and 'cells' in first_item
if is_pivoted:
current_app.logger.info(f"Processing a pivoted report: {preset.get('name', report_id)}")
# "Разворачиваем" (unpivot) данные в плоский список словарей
for row_item in report_data:
row_values = row_item.get('row', {})
cells = row_item.get('cells', [])
if not cells:
# Обрабатываем строки, у которых может не быть данных в ячейках
processed_data.append(row_values.copy())
else:
for cell in cells:
new_flat_row = row_values.copy()
new_flat_row.update(cell.get('col', {}))
new_flat_row.update(cell.get('values', {}))
processed_data.append(new_flat_row)
else:
current_app.logger.info(f"Processing a simple flat report: {preset.get('name', report_id)}")
# Данные уже в виде плоского списка, просто присваиваем
processed_data = [item for item in report_data if isinstance(item, dict)]
# --- Универсальное формирование заголовков и данных ---
# 1. Собираем все уникальные ключи из всех строк для гарантии целостности.
all_keys = set()
for row in processed_data:
all_keys.update(row.keys())
# 2. Создаем упорядоченный список заголовков для лучшей читаемости.
# Используем поля из пресета для определения логического порядка.
row_group_fields = preset.get('groupByRowFields', [])
col_group_fields = preset.get('groupByColFields', [])
agg_fields = preset.get('aggregateFields', [])
ordered_headers = []
# Сначала добавляем известные поля из пресета в логической последовательности.
for field in row_group_fields + col_group_fields + agg_fields:
if field in all_keys:
ordered_headers.append(field)
all_keys.remove(field)
# Добавляем любые другие (неожиданные) поля, отсортировав их по алфавиту.
ordered_headers.extend(sorted(list(all_keys)))
# 3. Собираем итоговый список списков для Google Sheets, приводя все значения к строкам.
data_to_insert = [ordered_headers]
for row in processed_data:
row_data = []
for header in ordered_headers:
value_str = str(row.get(header, ''))
if value_str.startswith(('=', '+', '-', '@')):
row_data.append("'" + value_str)
else:
row_data.append(value_str)
# Преобразуем None в пустую строку, а все остальное в строковое представление.
# Это предотвращает потенциальные ошибки типов со стороны Google Sheets API.
data_to_insert.append(row_data)
gs_client.clear_and_write_data(sheet_title, data_to_insert)
rows_count = len(data_to_insert) - 1
flash(_('Report "%(name)s" data (%(rows)s rows) successfully written to sheet "%(sheet)s".',
name=preset.get('name', report_id),
rows=rows_count,
sheet=sheet_title), 'success')
else:
flash(_('Error authorizing on RMS server when trying to get a report.'), 'error')
except ValueError as ve:
flash(_('Data Error: %(error)s', error=str(ve)), 'error')
except gspread.exceptions.APIError as api_err:
flash(_('Google API Error accessing sheet "%(sheet)s". Check service account permissions.', sheet=sheet_title or _('Unknown')), 'error')
current_app.logger.error(f"Google API Error for sheet '{sheet_title}': {api_err}", exc_info=True)
# Просто вызываем нашу новую универсальную функцию
execute_olap_export(current_user.id, sheet_title, start_date, end_date)
flash(_('Report generation task for sheet "%(sheet)s" has been started. The data will appear shortly.', sheet=sheet_title), 'success')
except Exception as e:
flash(_('An unexpected error occurred: %(error)s', error=str(e)), 'error')
current_app.logger.error(f"Unexpected error in render_olap: {e}", exc_info=True)
finally:
if req_module and req_module.token:
req_module.logout()
current_app.logger.error(f"Unexpected error in render_olap route: {e}", exc_info=True)
return redirect(url_for('.index'))
@main_bp.route('/save_schedule', methods=['POST'])
@login_required
def save_schedule():
config = g.user_config
try:
updated_mappings = config.mappings or {}
for sheet_title, params in updated_mappings.items():
cron_value = request.form.get(f"cron-{sheet_title}", "").strip()
period_value = request.form.get(f"period-{sheet_title}", "").strip()
# Обработка кастомного периода N дней
if period_value == 'last_N_days':
try:
custom_days = int(request.form.get(f"custom_days-{sheet_title}", 0))
if custom_days > 0:
period_value = f"last_{custom_days}_days"
else:
period_value = "" # Сбрасываем, если введено 0 или некорректное значение
except (ValueError, TypeError):
period_value = ""
params['schedule_cron'] = cron_value if cron_value else None
params['schedule_period'] = period_value if period_value else None
job_id = f"user_{current_user.id}_sheet_{sheet_title}"
# Удаляем старую задачу, если она была
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
current_app.logger.info(f"Removed existing job: {job_id}")
# Добавляем новую задачу, если есть cron-расписание
if cron_value and period_value:
try:
cron_params = _parse_cron_string(cron_value)
scheduler.add_job(
id=job_id,
func=execute_olap_export,
trigger='cron',
args=[current_user.id, sheet_title],
replace_existing=True,
**cron_params
)
current_app.logger.info(f"Added/updated job: {job_id} with schedule '{cron_value}'")
except ValueError as ve:
flash(_('Invalid cron format for sheet "%(sheet)s": %(error)s', sheet=sheet_title, error=ve), 'error')
except Exception as e:
flash(_('Error scheduling job for sheet "%(sheet)s": %(error)s', sheet=sheet_title, error=e), 'error')
config.mappings = updated_mappings
db.session.commit()
flash(_('Schedule settings saved successfully.'), 'success')
except Exception as e:
db.session.rollback()
flash(_('An error occurred while saving the schedule: %(error)s', error=str(e)), 'error')
current_app.logger.error(f"Error in save_schedule: {e}", exc_info=True)
return redirect(url_for('.index'))