Files
olaper/utils.py
SERTY 0a17b31c06
All checks were successful
Test Build / test-build (push) Successful in 25s
added scheduler v2
2025-07-31 03:50:08 +03:00

202 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
from jinja2 import Template
from datetime import datetime, date, timedelta
from dateutil.relativedelta import relativedelta
# Настройка логирования
logger = logging.getLogger(__name__)
def generate_template_from_preset(preset):
"""
Генерирует один шаблон запроса OLAP на основе пресета из RMS API.
Функция заменяет существующие фильтры по дате на универсальные плейсхолдеры
Jinja2 (`{{ from_date }}` и `{{ to_date }}`).
Args:
preset (dict): Словарь с пресетом OLAP-отчета из API RMS.
Returns:
dict: Словарь, представляющий шаблон запроса OLAP, готовый для рендеринга.
Raises:
ValueError: Если пресет некорректен (не словарь или отсутствуют ключи).
Exception: Другие непредвиденные ошибки при обработке.
"""
if not isinstance(preset, dict):
logger.error("Ошибка генерации шаблона: входной 'preset' не является словарем.")
raise ValueError("Пресет должен быть словарем.")
required_keys = ["reportType", "groupByRowFields", "aggregateFields", "filters"]
if not all(k in preset for k in required_keys):
logger.error(f"Ошибка генерации шаблона: пресет {preset.get('id', 'N/A')} не содержит необходимых ключей.")
raise ValueError("Пресет не содержит ключей: reportType, groupByRowFields, aggregateFields, filters.")
try:
# Создаем глубокую копию, чтобы не изменять оригинальный объект пресета
template = json.loads(json.dumps(preset))
# Удаляем ненужные для запроса поля, которые приходят из API
template.pop('id', None)
template.pop('name', None)
current_filters = template.get("filters", {})
filters_to_remove = []
# Находим и запоминаем все существующие фильтры типа DateRange для удаления
for key, value in current_filters.items():
if isinstance(value, dict) and value.get("filterType") == "DateRange":
filters_to_remove.append(key)
for key in filters_to_remove:
del current_filters[key]
logger.debug(f"Удален существующий DateRange фильтр '{key}' из пресета {preset.get('id', 'N/A')}.")
# Определяем правильный ключ для фильтра по дате на основе типа отчета
report_type = template["reportType"]
date_filter_key = None
if report_type in ["SALES", "DELIVERIES"]:
# Для отчетов по продажам и доставкам используется "OpenDate.Typed"
date_filter_key = "OpenDate.Typed"
elif report_type == "TRANSACTIONS":
# Для отчетов по проводкам используется "DateTime.DateTyped"
date_filter_key = "DateTime.DateTyped"
else:
logger.warning(
f"Для типа отчета '{report_type}' (пресет {preset.get('id', 'N/A')}) нет стандартного ключа даты. "
f"Фильтр по дате не будет добавлен автоматически."
)
if date_filter_key:
logger.debug(f"Для отчета {report_type} будет использован фильтр '{date_filter_key}'.")
current_filters[date_filter_key] = {
"filterType": "DateRange",
"from": "{{ from_date }}",
"to": "{{ to_date }}",
"includeLow": True,
"includeHigh": True
}
logger.info(f"Шаблон для пресета {preset.get('id', 'N/A')} ('{preset.get('name', '')}') успешно сгенерирован с фильтром даты.")
template["filters"] = current_filters
return template
except Exception as e:
logger.error(f"Непредвиденная ошибка при генерации шаблона из пресета {preset.get('id', 'N/A')}: {str(e)}", exc_info=True)
raise
def render_temp(template_dict, context):
"""
Рендерит шаблон (представленный словарем) с использованием Jinja2,
подставляя значения из контекста (например, даты).
Args:
template_dict (dict): Словарь-шаблон OLAP-запроса.
context (dict): Словарь с переменными для рендеринга (например, {'from_date': '...', 'to_date': '...'}).
Returns:
dict: Словарь с отрендеренным OLAP-запросом, готовый к отправке.
Raises:
Exception: Ошибки при рендеринге или парсинге JSON.
"""
try:
# Преобразуем словарь шаблона в строку JSON
template_str = json.dumps(template_dict)
# Рендерим строку с помощью Jinja, подставляя переменные из context
rendered_str = Template(template_str).render(context)
# Преобразуем отрендеренную строку обратно в словарь Python
rendered_dict = json.loads(rendered_str)
logger.info('Шаблон OLAP-запроса успешно отрендерен с датами.')
return rendered_dict
except Exception as e:
logger.error(f"Ошибка рендеринга шаблона: {str(e)}", exc_info=True)
raise
def get_dates(start_date, end_date):
"""
Проверяет и форматирует даты.
Args:
start_date (str): Дата начала в формате 'YYYY-MM-DD'.
end_date (str): Дата окончания в формате 'YYYY-MM-DD'.
Returns:
tuple: Кортеж (start_date, end_date), если даты корректны.
Raises:
ValueError: Если формат дат некорректен или дата начала позже даты окончания.
"""
date_format = "%Y-%m-%d"
try:
start = datetime.strptime(start_date, date_format)
end = datetime.strptime(end_date, date_format)
except (ValueError, TypeError):
logger.error(f"Некорректный формат или тип дат: start='{start_date}', end='{end_date}'. Ожидается YYYY-MM-DD.")
raise ValueError("Некорректный формат даты. Используйте YYYY-MM-DD.")
if start > end:
logger.error(f"Дата начала '{start_date}' не может быть позже даты окончания '{end_date}'.")
raise ValueError("Дата начала не может быть позже даты окончания.")
return start_date, end_date
def _parse_cron_string(cron_str):
"""Парсит строку cron в словарь для APScheduler."""
if not isinstance(cron_str, str):
raise TypeError("cron_str must be a string.")
parts = cron_str.split()
if len(parts) != 5:
raise ValueError("Invalid cron string format. Expected 5 parts.")
keys = ['minute', 'hour', 'day', 'month', 'day_of_week']
return {keys[i]: part for i, part in enumerate(parts)}
def calculate_period_dates(period_key):
"""
Вычисляет начальную и конечную даты на основе строкового ключа.
Возвращает кортеж (start_date_str, end_date_str) в формате YYYY-MM-DD.
"""
if not period_key:
raise ValueError("Period key cannot be empty.")
today = date.today()
yesterday = today - timedelta(days=1)
date_format = "%Y-%m-%d"
# За прошлую неделю (с пн по вс)
if period_key == 'previous_week':
start_of_last_week = today - timedelta(days=today.weekday() + 7)
end_of_last_week = start_of_last_week + timedelta(days=6)
return start_of_last_week.strftime(date_format), end_of_last_week.strftime(date_format)
# За последние 7 дней (не включая сегодня)
if period_key == 'last_7_days':
start_date = today - timedelta(days=7)
return start_date.strftime(date_format), yesterday.strftime(date_format)
# За прошлый месяц
if period_key == 'previous_month':
last_month = today - relativedelta(months=1)
start_date = last_month.replace(day=1)
end_date = start_date + relativedelta(months=1) - timedelta(days=1)
return start_date.strftime(date_format), end_date.strftime(date_format)
# За текущий месяц (до вчера)
if period_key == 'current_month':
start_date = today.replace(day=1)
return start_date.strftime(date_format), yesterday.strftime(date_format)
# Динамический ключ "За последние N дней"
if period_key.startswith('last_') and period_key.endswith('_days'):
try:
days = int(period_key.split('_')[1])
if days <= 0:
raise ValueError("Number of days must be positive.")
start_date = today - timedelta(days=days)
return start_date.strftime(date_format), yesterday.strftime(date_format)
except (ValueError, IndexError):
raise ValueError(f"Invalid dynamic period key: {period_key}")
raise ValueError(f"Unknown period key: {period_key}")