Files
olaper/routes.py
SERTY ddd0ffbcb0
All checks were successful
Test Build / test-build (push) Successful in 3s
vv
2025-07-26 05:56:11 +03:00

432 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# routes.py
import os
import json
import shutil
from flask import (
Blueprint, render_template, request, redirect, url_for, flash, g, session, current_app
)
from flask_login import login_user, login_required, logout_user, current_user
from flask_babel import _
from werkzeug.utils import secure_filename
import gspread
# --- ИМПОРТ РАСШИРЕНИЙ И МОДЕЛЕЙ ---
# Импортируем экземпляры расширений, созданные в app.py
from extensions import db, login_manager
# Импортируем наши классы и утилиты
from models import User, UserConfig
from google_sheets import GoogleSheets
from request_module import ReqModule
from utils import get_dates, generate_template_from_preset, render_temp
# --- Создание блюпринта ---
main_bp = Blueprint('main', __name__)
# --- Регистрация обработчиков для расширений ---
@login_manager.user_loader
def load_user(user_id):
"""Загружает пользователя из БД для управления сессией."""
return db.session.get(User, int(user_id))
@main_bp.before_app_request
def load_user_specific_data():
"""Загружает конфигурацию пользователя в глобальный объект `g` для текущего запроса."""
g.user_config = None
if current_user.is_authenticated:
g.user_config = get_user_config()
# --- Вспомогательные функции, специфичные для маршрутов ---
def get_user_config():
"""Получает конфиг для текущего пользователя, создавая его при необходимости."""
if not current_user.is_authenticated:
return None
config = UserConfig.query.filter_by(user_id=current_user.id).first()
if not config:
config = UserConfig(user_id=current_user.id)
db.session.add(config)
return config
def get_user_upload_path(filename=""):
"""Возвращает путь для загрузки файлов для текущего пользователя."""
if not current_user.is_authenticated:
return None
user_dir = os.path.join(current_app.config['DATA_DIR'], str(current_user.id))
os.makedirs(user_dir, exist_ok=True)
return os.path.join(user_dir, secure_filename(filename))
# --- Маршруты ---
@main_bp.route('/language/<language>')
def set_language(language=None):
session['language'] = language
return redirect(request.referrer or url_for('.index'))
@main_bp.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('.index'))
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user is None or not user.check_password(password):
flash(_('Invalid username or password'), 'error')
return redirect(url_for('.login'))
login_user(user, remember=request.form.get('remember'))
flash(_('Login successful!'), 'success')
next_page = request.args.get('next')
return redirect(next_page or url_for('.index'))
return render_template('login.html')
@main_bp.route('/register', methods=['GET', 'POST'])
def register():
if current_user.is_authenticated:
return redirect(url_for('.index'))
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
if not username or not password:
flash(_('Username and password are required.'), 'error')
return redirect(url_for('.register'))
if User.query.filter_by(username=username).first():
flash(_('Username already exists.'), 'error')
return redirect(url_for('.register'))
user = User(username=username)
user.set_password(password)
user.config = UserConfig()
db.session.add(user)
try:
db.session.commit()
flash(_('Registration successful! Please log in.'), 'success')
return redirect(url_for('.login'))
except Exception as e:
db.session.rollback()
flash(_('An error occurred during registration. Please try again.'), 'error')
return redirect(url_for('.register'))
return render_template('register.html')
@main_bp.route('/logout')
@login_required
def logout():
logout_user()
flash(_('You have been logged out.'), 'success')
return redirect(url_for('.index'))
@main_bp.route('/')
@login_required
def index():
config = g.user_config
return render_template(
'index.html',
rms_config=config.get_rms_dict(),
google_config=config.get_google_dict(),
presets=config.presets,
sheets=config.sheets,
mappings=config.mappings,
client_email=config.google_client_email
)
@main_bp.route('/configure_rms', methods=['POST'])
@login_required
def configure_rms():
config = g.user_config
try:
host = request.form.get('host', '').strip()
login = request.form.get('login', '').strip()
password = request.form.get('password', '')
if not config.rms_password and not password:
flash(_('Password is required for the first time.'), 'error')
return redirect(url_for('.index'))
if not host or not login:
flash(_('Host and Login fields must be filled.'), 'error')
return redirect(url_for('.index'))
effective_password = password if password else config.rms_password
req_module = ReqModule(host, login, effective_password)
if req_module.login():
presets_data = req_module.take_presets()
req_module.logout()
config.rms_host = host
config.rms_login = login
if password:
config.rms_password = password
config.presets = presets_data
db.session.commit()
flash(_('Successfully authorized on RMS server. Received %(num)s presets.', num=len(presets_data)), 'success')
else:
flash(_('Authorization error on RMS server. Check host, login or password.'), 'error')
except Exception as e:
db.session.rollback()
flash(_('Error configuring RMS: %(error)s', error=str(e)), 'error')
return redirect(url_for('.index'))
@main_bp.route('/upload_credentials', methods=['POST'])
@login_required
def upload_credentials():
config = g.user_config
if 'cred_file' not in request.files or request.files['cred_file'].filename == '':
flash(_('No file was selected.'), 'error')
return redirect(url_for('.index'))
cred_file = request.files['cred_file']
filename = cred_file.filename
# Получаем путь для сохранения файла в папке пользователя
user_cred_path = get_user_upload_path(filename)
temp_path = None
try:
# Сначала сохраняем файл во временную директорию для проверки
temp_dir = os.path.join(current_app.config['DATA_DIR'], "temp")
os.makedirs(temp_dir, exist_ok=True)
temp_path = os.path.join(temp_dir, f"temp_{current_user.id}_{filename}")
cred_file.save(temp_path)
with open(temp_path, 'r', encoding='utf-8') as f:
cred_data = json.load(f)
client_email = cred_data.get('client_email')
if not client_email:
flash(_('Could not find client_email in the credentials file.'), 'error')
# Не забываем удалить временный файл при ошибке
if os.path.exists(temp_path):
os.remove(temp_path)
return redirect(url_for('.index'))
# Если все хорошо, перемещаем файл из временной папки в постоянную
shutil.move(temp_path, user_cred_path)
# Сохраняем путь к файлу и email в базу данных
config.google_cred_file_path = user_cred_path
config.google_client_email = client_email
config.sheets = [] # Сбрасываем список листов при смене credentials
db.session.commit()
flash(_('Credentials file successfully uploaded. Email: %(email)s', email=client_email), 'success')
except json.JSONDecodeError:
flash(_('Error: Uploaded file is not a valid JSON.'), 'error')
except Exception as e:
db.session.rollback()
flash(_('Error processing credentials: %(error)s', error=str(e)), 'error')
finally:
# Гарантированно удаляем временный файл, если он еще существует
if temp_path and os.path.exists(temp_path):
os.remove(temp_path)
return redirect(url_for('.index'))
@main_bp.route('/configure_google', methods=['POST'])
@login_required
def configure_google():
config = g.user_config
sheet_url = request.form.get('sheet_url', '').strip()
if not sheet_url:
flash(_('Sheet URL must be provided.'), 'error')
return redirect(url_for('.index'))
config.google_sheet_url = sheet_url
cred_path = config.google_cred_file_path
if not cred_path or not os.path.isfile(cred_path):
flash(_('Please upload a valid credentials file first.'), 'warning')
config.sheets = []
db.session.commit()
return redirect(url_for('.index'))
try:
gs_client = GoogleSheets(cred_path, sheet_url)
sheets_data = gs_client.get_sheets()
config.sheets = sheets_data
db.session.commit()
flash(_('Successfully connected to Google Sheets. Found %(num)s sheets. Settings saved.', num=len(sheets_data)), 'success')
except Exception as e:
db.session.rollback()
config.sheets = []
flash(_('Error connecting to Google Sheets: %(error)s. Check the URL and service account permissions.', error=str(e)), 'error')
try:
db.session.commit()
except Exception:
db.session.rollback()
return redirect(url_for('.index'))
@main_bp.route('/mapping_set', methods=['POST'])
@login_required
def mapping_set():
config = g.user_config
try:
new_mappings = {}
for sheet in config.sheets:
report_key = f"sheet_{sheet['id']}"
selected_report_id = request.form.get(report_key)
if selected_report_id:
new_mappings[sheet['title']] = selected_report_id
config.mappings = new_mappings
db.session.commit()
flash(_('Mappings updated successfully.'), 'success')
except Exception as e:
db.session.rollback()
flash(_('Error updating mappings: %(error)s', error=str(e)), 'error')
return redirect(url_for('.index'))
@main_bp.route('/render_olap', methods=['POST'])
@login_required
def render_olap():
config = g.user_config
sheet_title = None
req_module = None
try:
from_date, to_date = get_dates(request.form.get('start_date'), request.form.get('end_date'))
sheet_title = next((key for key in request.form if key.startswith('render_')), '').replace('render_', '')
if not sheet_title:
flash(_('Error: Could not determine which sheet to render the report for.'), 'error')
return redirect(url_for('.index'))
report_id = config.mappings.get(sheet_title)
if not report_id:
flash(_('Error: No report is assigned to sheet "%(sheet)s".', sheet=sheet_title), 'error')
return redirect(url_for('.index'))
if not all([config.rms_host, config.rms_login, config.rms_password, config.google_cred_file_path, config.google_sheet_url]):
flash(_('Error: RMS or Google Sheets configuration is incomplete.'), 'error')
return redirect(url_for('.index'))
preset = next((p for p in config.presets if p.get('id') == report_id), None)
if not preset:
flash(_('Error: Preset with ID "%(id)s" not found in saved configuration.', id=report_id), 'error')
return redirect(url_for('.index'))
template = generate_template_from_preset(preset)
json_body = render_temp(template, {"from_date": from_date, "to_date": to_date})
req_module = ReqModule(config.rms_host, config.rms_login, config.rms_password)
gs_client = GoogleSheets(config.google_cred_file_path, config.google_sheet_url)
if req_module.login():
result = req_module.take_olap(json_body)
# --- НАЧАЛО НОВОЙ УЛУЧШЕННОЙ ЛОГИКИ ОБРАБОТКИ ДАННЫХ ---
if 'data' not in result or not isinstance(result['data'], list):
flash(_('Error: Unexpected response format from RMS for report "%(name)s".', name=preset.get('name', report_id)), 'error')
current_app.logger.error(f"Unexpected API response for report {report_id} ('{preset.get('name')}'). Response: {result}")
return redirect(url_for('.index'))
report_data = result['data']
# Если отчет пуст, очищаем лист и уведомляем пользователя.
if not report_data:
gs_client.clear_and_write_data(sheet_title, [])
flash(_('Report "%(name)s" returned no data for the selected period. Sheet "%(sheet)s" has been cleared.', name=preset.get('name', report_id), sheet=sheet_title), 'warning')
return redirect(url_for('.index'))
# Здесь будет храниться наш итоговый "плоский" список словарей
processed_data = []
# Проверяем структуру отчета: сводный (pivoted) или простой (flat)
first_item = report_data[0]
is_pivoted = isinstance(first_item, dict) and 'row' in first_item and 'cells' in first_item
if is_pivoted:
current_app.logger.info(f"Processing a pivoted report: {preset.get('name', report_id)}")
# "Разворачиваем" (unpivot) данные в плоский список словарей
for row_item in report_data:
row_values = row_item.get('row', {})
cells = row_item.get('cells', [])
if not cells:
# Обрабатываем строки, у которых может не быть данных в ячейках
processed_data.append(row_values.copy())
else:
for cell in cells:
new_flat_row = row_values.copy()
new_flat_row.update(cell.get('col', {}))
new_flat_row.update(cell.get('values', {}))
processed_data.append(new_flat_row)
else:
current_app.logger.info(f"Processing a simple flat report: {preset.get('name', report_id)}")
# Данные уже в виде плоского списка, просто присваиваем
processed_data = [item for item in report_data if isinstance(item, dict)]
# --- Универсальное формирование заголовков и данных ---
# 1. Собираем все уникальные ключи из всех строк для гарантии целостности.
all_keys = set()
for row in processed_data:
all_keys.update(row.keys())
# 2. Создаем упорядоченный список заголовков для лучшей читаемости.
# Используем поля из пресета для определения логического порядка.
row_group_fields = preset.get('groupByRowFields', [])
col_group_fields = preset.get('groupByColFields', [])
agg_fields = preset.get('aggregateFields', [])
ordered_headers = []
# Сначала добавляем известные поля из пресета в логической последовательности.
for field in row_group_fields + col_group_fields + agg_fields:
if field in all_keys:
ordered_headers.append(field)
all_keys.remove(field)
# Добавляем любые другие (неожиданные) поля, отсортировав их по алфавиту.
ordered_headers.extend(sorted(list(all_keys)))
# 3. Собираем итоговый список списков для Google Sheets, приводя все значения к строкам.
data_to_insert = [ordered_headers]
for row in processed_data:
row_data = []
for header in ordered_headers:
value_str = str(row.get(header, ''))
if value_str.startswith(('=', '+', '-', '@')):
row_data.append("'" + value_str)
else:
row_data.append(value_str)
# Преобразуем None в пустую строку, а все остальное в строковое представление.
# Это предотвращает потенциальные ошибки типов со стороны Google Sheets API.
data_to_insert.append(row_data)
gs_client.clear_and_write_data(sheet_title, data_to_insert)
rows_count = len(data_to_insert) - 1
flash(_('Report "%(name)s" data (%(rows)s rows) successfully written to sheet "%(sheet)s".',
name=preset.get('name', report_id),
rows=rows_count,
sheet=sheet_title), 'success')
else:
flash(_('Error authorizing on RMS server when trying to get a report.'), 'error')
except ValueError as ve:
flash(_('Data Error: %(error)s', error=str(ve)), 'error')
except gspread.exceptions.APIError as api_err:
flash(_('Google API Error accessing sheet "%(sheet)s". Check service account permissions.', sheet=sheet_title or _('Unknown')), 'error')
current_app.logger.error(f"Google API Error for sheet '{sheet_title}': {api_err}", exc_info=True)
except Exception as e:
flash(_('An unexpected error occurred: %(error)s', error=str(e)), 'error')
current_app.logger.error(f"Unexpected error in render_olap: {e}", exc_info=True)
finally:
if req_module and req_module.token:
req_module.logout()
return redirect(url_for('.index'))