from __future__ import annotations

import datetime, hashlib, os, pathlib, shutil, time

from fastapi import File, UploadFile
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator

from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging
from app.models.event_file_models import Event_File_Base


# ### BEGIN ### API Event File Methods ### create_event_file_obj() ###
def create_event_file_obj(event_file_obj_new:Event_File_Base):
    """Insert a new ``event_file`` row built from *event_file_obj_new*.

    The model is serialised with ``exclude_unset=True`` and without the
    ``saved``, ``already_exists``, ``copy_timer``, ``created_on`` and
    ``updated_on`` fields, then handed to ``sql_insert`` for the
    ``event_file`` table.

    Args:
        event_file_obj_new: The model instance to persist.

    Returns:
        The truthy value returned by ``sql_insert`` (used here as the new
        event file id), or ``False`` when ``sql_insert`` returns a falsy value.
    """
    # NOTE(review): this mutates the level of the logger imported from
    # app.lib_general. At WARNING, a standard logging.Logger will not emit the
    # debug calls below - confirm whether that is intended.
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Earlier variant only excluded {'created_on', 'updated_on'}.
    event_file_obj_data = event_file_obj_new.dict(
        by_alias=False,
        exclude_defaults=False,
        exclude_unset=True,
        exclude={'saved', 'already_exists', 'copy_timer', 'created_on', 'updated_on'},
    )

    event_file_id = sql_insert(
        data=event_file_obj_data,
        table_name='event_file',
        rm_id_random=True,
        id_random_length=8,
    )
    if not event_file_id:
        return False
    log.debug(event_file_id)

    log.debug(f'Returning the new event_file_id: {event_file_id}')
    return event_file_id
# ### END ### API Event File Methods ### create_event_file_obj() ###


# ### BEGIN ### API Event File Methods ### load_event_file_obj() ###
def load_event_file_obj(
        event_file_id: int|str,
        limit: int = 1000,
        model_as_dict: bool = False,
        enabled: str = 'enabled',  # enabled, disabled, all
        inc_hosted_file: bool = False,
    ) -> Event_File_Base|dict|bool:
    """Load a single event file record and validate it as ``Event_File_Base``.

    Args:
        event_file_id: Identifier passed to ``redis_lookup_id_random`` for the
            ``event_file`` table; the lookup result is used as the record id.
        limit: Accepted for signature parity; not used by this function.
        model_as_dict: When true, return ``event_file_obj.dict(by_alias=True,
            exclude_unset=False)`` instead of the model instance.
        enabled: Accepted for signature parity; not used by this function.
        inc_hosted_file: Not used yet; the code that would attach a related
            object is still commented out below.

    Returns:
        The model (or its dict form), or ``False`` when the id lookup fails,
        no record is found, or the record does not validate.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    event_file_id = redis_lookup_id_random(record_id_random=event_file_id, table_name='event_file')
    if not event_file_id:
        return False

    # NOTE: What table or view should be used here???
    event_file_rec = sql_select(table_name='v_event_file_simple', record_id=event_file_id)
    if not event_file_rec:
        return False
    # The level is re-applied after each helper call, as in the original code
    # (presumably because the helpers change it on the shared logger).
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(event_file_rec)

    try:
        event_file_obj = Event_File_Base(**event_file_rec)
    except ValidationError as e:
        log.error(e.json())
        return False
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(event_file_obj)

    # TODO: inc_hosted_file is not implemented; original sketch kept for reference:
    # if inc_hosted_file:
    #     x_id = event_file_rec.get('x_id', None)
    #     if x_obj_result := load_x_obj(x_id=x_id):
    #         x_obj = x_obj_result
    #         event_file_obj.x = x_obj
    #     else: event_file_obj.x = None

    # model_as_dict = True
    if model_as_dict:
        return event_file_obj.dict(by_alias=True, exclude_unset=False)  # pylint: disable=no-member
    return event_file_obj
# ### END ### API Event File Methods ### load_event_file_obj() ###


# ### BEGIN ### API Event File Methods ### load_event_file_obj_list() ###
def load_event_file_obj_list(
        event_id: int|str|None = None,
        event_session_id: int|str|None = None,
        limit: int = 1000,
        model_as_dict: bool = False,
        enabled: str = 'enabled',  # enabled, disabled, all
        inc_hosted_file: bool = False,
    ) -> list|bool:
    """Load the event files attached to an event or to an event session.

    Exactly one owner is used: ``event_id`` takes precedence over
    ``event_session_id`` when both are truthy. Matching rows are selected from
    ``event_file`` by ``for_type``/``for_id`` and each one is then loaded
    through ``load_event_file_obj``.

    Args:
        event_id: Identifier resolved via ``redis_lookup_id_random`` against
            the ``event`` table.
        event_session_id: Identifier resolved via ``redis_lookup_id_random``
            against the ``event_session`` table.
        limit: When truthy, bound as the SQL ``LIMIT``; when falsy, no limit.
        model_as_dict: Currently has no effect - every item is loaded with
            ``model_as_dict=True`` (kept so existing callers keep getting dicts).
        enabled: ``'enabled'``, ``'disabled'`` or ``'all'``; selects the
            filter on the ``enable`` column.
        inc_hosted_file: Forwarded unchanged to ``load_event_file_obj``.

    Returns:
        A list with one entry per selected row (the loaded dict, or ``None``
        when that row failed to load); an empty list when nothing matched;
        ``False`` when no owner id was given, the owner lookup failed, or
        ``enabled`` is not a recognised value.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    data: dict = {}

    # Resolve which owner (event or event session) the files belong to.
    if event_id:
        for_type = 'event'
        for_id = redis_lookup_id_random(record_id_random=event_id, table_name='event')
    elif event_session_id:
        for_type = 'event_session'
        for_id = redis_lookup_id_random(record_id_random=event_session_id, table_name='event_session')
    else:
        # Previously fell through and crashed with UnboundLocalError while
        # building the SQL; fail the same way as an unsuccessful lookup.
        log.error('load_event_file_obj_list() requires event_id or event_session_id')
        return False
    if not for_id:
        return False

    data['for_type'] = for_type
    data['for_id'] = for_id
    sql_obj_type_id = '`tbl`.for_type = :for_type AND `tbl`.for_id = :for_id'

    if enabled == 'enabled':
        data['enable'] = True
        sql_enabled = 'AND `tbl`.enable = :enable'
    elif enabled == 'disabled':
        data['enable'] = False
        sql_enabled = 'AND `tbl`.enable = :enable'
    elif enabled == 'all':
        sql_enabled = ''
    else:
        # Previously left sql_enabled unbound (UnboundLocalError below).
        log.error(f'Unsupported value for enabled: {enabled!r}')
        return False

    if limit:
        data['limit'] = limit
        sql_limit = 'LIMIT :limit'
    else:
        sql_limit = ''

    # Only constant fragments are interpolated; all values are bound via `data`.
    sql = f"""
        SELECT `tbl`.id AS 'event_file_id', `tbl`.id_random AS 'event_file_id_random'
        FROM `event_file` AS `tbl`
        WHERE {sql_obj_type_id}
            {sql_enabled}
        ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
        {sql_limit};
    """

    event_file_rec_li_result = sql_select(data=data, sql=sql, as_list=True)
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(event_file_rec_li_result)
    if not event_file_rec_li_result:
        return []

    event_file_result_li = []
    for event_file_rec in event_file_rec_li_result:
        event_file_result = load_event_file_obj(
            event_file_id=event_file_rec.get('event_file_id', None),
            limit=limit,
            model_as_dict=True,  # NOTE: This is overriding the model_as_dict param!
            enabled=enabled,
            inc_hosted_file=inc_hosted_file,
        )
        log.debug(event_file_result)
        # Keep one slot per selected row; rows that failed to load become None.
        event_file_result_li.append(event_file_result or None)
    return event_file_result_li
# ### END ### API Event File Methods ### load_event_file_obj_list() ###