Files
OSIT-AE-API-FastAPI/app/methods/event_file_methods.py
Scott Idem 17a627a981 feat: Implement Event File Hosted Data Fix and API Guide Update
Address critical data visibility issues for Event Files and enhance frontend documentation.

This commit resolves the persistent problem where top-level hosted file convenience fields
(e.g., , , ) were
returning as  in V3 Event File API responses, even when .

Key changes include:
- Refactored  Pydantic model:
    - Removed redundant  definitions from top-level hosted file convenience fields,
      allowing direct mapping from SQL view columns.
    - Simplified  to focus solely on conditionally loading the nested
       object, as top-level fields are now populated directly by Pydantic
      from the  view.
    - Added comprehensive comments to clarify data flow, Pydantic's behavior, and the
      expected origin of these convenience fields from SQL views.
- Updated :
    - Introduced a new section detailing how to retrieve Event File data, including the
      use of  to get both top-level convenience fields and a nested
       object.
    - Clarified all ID references as random string IDs.
    - Renumbered the troubleshooting section.
- Copied updated guide to .
- Continued ID Vision compliance audit, ensuring consistent handling of random string IDs
  across various core and event models (Account, Address, Contact, DataStore, Event Badge Template).
- Consolidated ID Vision E2E tests and updated related documentation.
- Minor updates to  and
  to support Event File data retrieval with .
2026-02-19 15:22:17 -05:00

315 lines
12 KiB
Python

import datetime, hashlib, os, pathlib, shutil, time
from fastapi import File, UploadFile
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import get_id_random, redis_lookup_id_random, sql_delete, sql_enable_part, sql_insert, sql_insert_or_update, sql_limit_offset_part, sql_select, sql_update
# from app.lib_general import log, logging, logger_reset
from app.log import log, logging, logger_reset
from app.methods.hosted_file_methods import load_hosted_file_obj
from app.models.event_file_models import Event_File_Base
# ### BEGIN ### API Event File Methods ### create_event_file_obj() ###
@logger_reset
def create_event_file_obj(event_file_obj_new: Event_File_Base):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
try:
# event_file_obj_data = event_file_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'created_on', 'updated_on'})
event_file_obj_data = event_file_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'saved', 'already_exists', 'copy_timer', 'created_on', 'updated_on'})
except Exception as e:
log.exception(e)
log.debug(event_file_obj_data)
if event_file_obj_in_result := sql_insert_or_update(data=event_file_obj_data, table_name='event_file', rm_id_random=True): pass
else:
return False
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(event_file_obj_in_result)
event_file_id = event_file_obj_in_result
log.debug(f'Returning the new event_file_id: {event_file_id}')
return event_file_id
# ### END ### API Event File Methods ### create_event_file_obj() ###
# ### BEGIN ### API Event File Methods ### load_event_file_obj() ###
@logger_reset
def load_event_file_obj(
event_file_id: int|str,
model_as_dict: bool = False, # This was defaulted to True 2022-03-07
by_alias: bool = True,
exclude_unset: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_hosted_file: bool = False,
) -> Event_File_Base|dict|bool:
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if event_file_id := redis_lookup_id_random(record_id_random=event_file_id, table_name='event_file'): pass
else: return False
# NOTE: What table or view should be used here???
if event_file_rec := sql_select(table_name='v_event_file_simple', record_id=event_file_id):
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(event_file_rec)
else:
return False
if event_file_rec.get('for_type') and event_file_rec.get('for_id') and not event_file_rec.get('for_id_random'):
event_file_rec['for_id_random'] = get_id_random(event_file_rec.get('for_id'), table_name=event_file_rec.get('for_type'))
hosted_file_id = event_file_rec.get('hosted_file_id', None)
try:
event_file_obj = Event_File_Base(**event_file_rec)
except ValidationError as e:
log.error(e.json())
return False
log.debug(event_file_obj)
if inc_hosted_file and hosted_file_id:
log.info('Need to include hosted file...')
if hosted_file_obj := load_hosted_file_obj(
hosted_file_id = hosted_file_id,
enabled = enabled,
):
event_file_obj.hosted_file = hosted_file_obj
# Explicitly populate convenience fields from hosted_file_obj
if hosted_file_obj.hash_sha256:
event_file_obj.hosted_file_hash_sha256 = hosted_file_obj.hash_sha256
if hosted_file_obj.subdirectory_path:
event_file_obj.hosted_file_subdirectory_path = hosted_file_obj.subdirectory_path
if hosted_file_obj.content_type:
event_file_obj.hosted_file_content_type = hosted_file_obj.content_type
if hosted_file_obj.size:
event_file_obj.hosted_file_size = str(hosted_file_obj.size) # Ensure it's a string as per model definition
else:
event_file_obj.hosted_file = {}
else:
event_file_obj.hosted_file = None
log.debug(event_file_obj)
if model_as_dict:
return event_file_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return event_file_obj
# ### END ### API Event File Methods ### load_event_file_obj() ###
# ### BEGIN ### API Event File Methods ### get_event_file_rec_list() ###
# Updated 2021-09-10
@logger_reset
def get_event_file_rec_list(
for_type: str, # NOTE: This is not for_obj_type because the field name is actually for_type
for_id: int|str, # NOTE: This is not for_obj_id because the field name is actually for_id
file_purpose_id: int = None, # NOTE: Not prefixed with lu_
file_purpose: str = None,
internal_use: bool = None, # Default to False instead of None
priority: bool = None,
group: str = None,
# event_id: str = None,
# event_person_id: str = None,
# event_presentation_id: str = None,
# event_presenter_id: str = None,
# event_session_id: str = None,
enabled: str = 'enabled', # enabled, disabled, all
limit: int = 100,
offset: int = 0,
) -> list|bool:
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_id := redis_lookup_id_random(record_id_random=for_id, table_name=for_type): pass
else: return False
data = {}
data['for_type'] = for_type
data['for_id'] = for_id
data['file_purpose_id'] = file_purpose_id
data['file_purpose'] = file_purpose
data['internal_use'] = internal_use
data['priority'] = priority
# data['sort'] = sort
data['group'] = group # Same or similar as file purpose?
sql_for_type_id = f'`event_file`.for_type = :for_type AND `event_file`.for_id = :for_id'
if file_purpose_id:
sql_file_purpose_id = f'AND `event_file`.lu_file_purpose_id = :file_purpose_id'
else:
sql_file_purpose_id = ''
if file_purpose:
sql_file_purpose = f'AND `event_file`.file_purpose = :file_purpose'
else:
sql_file_purpose = ''
if internal_use:
sql_internal_use = f'AND `event_file`.internal_use = 1'
elif internal_use is False:
sql_internal_use = f'AND (`event_file`.internal_use IS NULL OR `event_file`.internal_use = 0)'
else:
sql_internal_use = ''
if priority:
sql_priority = f'AND `event_file`.priority = :priority'
else:
sql_priority = ''
if group:
sql_group = f'AND `event_file`.group = :group'
else:
sql_group = ''
sql_enabled, data['enable'] = sql_enable_part(table_name='event_file', enabled=enabled) # Reasonably safe return str and bool
sql_limit = sql_limit_offset_part(limit=limit, offset=offset) # Reasonably safe return str
sql = f"""
SELECT `event_file`.id AS 'event_file_id', `event_file`.id_random AS 'event_file_id_random'
FROM `event_file` AS `event_file`
WHERE
{sql_for_type_id}
{sql_file_purpose_id}
{sql_file_purpose}
{sql_internal_use}
{sql_priority}
{sql_group}
{sql_enabled}
ORDER BY `event_file`.priority DESC, -`event_file`.sort DESC, `event_file`.created_on DESC, `event_file`.updated_on DESC, `event_file`.filename ASC, `event_file`.extension ASC
{sql_limit};
"""
log.debug(sql)
if event_file_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
event_file_rec_li = event_file_rec_li_result
else:
event_file_rec_li = []
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(event_file_rec_li_result)
log.debug(type(event_file_rec_li))
log.debug(len(event_file_rec_li))
return event_file_rec_li
# ### END ### API Event File Methods ### get_event_file_rec_list() ###
# ### BEGIN ### API Event File Methods ### load_event_file_obj_list() ###
@logger_reset
def load_event_file_obj_list(
event_id: int|str|None = None,
event_session_id: int|str|None = None,
inc_hosted_file: bool = False,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
limit: int = 1000,
offset: int = 0,
) -> list|bool:
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
data: dict = {}
if event_id:
if event_id := redis_lookup_id_random(record_id_random=event_id, table_name='event'): pass
else: return False
data['for_type'] = 'event'
data['for_id'] = event_id
sql_obj_type_id = f'`tbl`.for_type = :for_type AND `tbl`.for_id = :for_id'
elif event_session_id:
if event_session_id := redis_lookup_id_random(record_id_random=event_session_id, table_name='event_session'): pass
else: return False
data['for_type'] = 'event_session'
data['for_id'] = event_session_id
sql_obj_type_id = f'`tbl`.for_type = :for_type AND `tbl`.for_id = :for_id'
sql_enabled, data['enable'] = sql_enable_part(table_name='event_file', enabled=enabled) # Reasonably safe return str and bool
sql_limit = sql_limit_offset_part(limit=limit, offset=offset) # Reasonably safe return str
sql = f"""
SELECT `tbl`.id AS 'event_file_id', `tbl`.id_random AS 'event_file_id_random'
FROM `event_file` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if event_file_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(event_file_rec_li_result)
event_file_result_li = []
for event_file_rec in event_file_rec_li_result:
event_file_id = event_file_rec.get('event_file_id', None)
if event_file_result := load_event_file_obj(
event_file_id = event_file_id,
model_as_dict = True, # NOTE: This is overriding the model_as_dict param!
enabled = enabled,
inc_hosted_file = inc_hosted_file,
):
log.debug(event_file_result)
event_file_result_li.append(event_file_result)
else:
log.debug(event_file_result)
event_file_result_li.append(None)
# log.debug(event_file_result_li)
else:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(event_file_rec_li_result)
event_file_result_li = []
return event_file_result_li
# ### END ### API Event Methods ### load_event_file_obj_list() ###
# ### BEGIN ### API Event File Methods ### handle_delete_event_file() ###
# Updated 2022-08-18
@logger_reset
def handle_delete_event_file(
event_file_id: int|str,
link_to_type: str = None,
link_to_id: int|str = None,
rm_all_links: bool = False,
rm_orphan: bool = False,
):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
# else: return False
if event_file_id := redis_lookup_id_random(record_id_random=event_file_id, table_name='event_file'): pass
else: return False
# log.debug(event_file_id)
# try:
# print('Trying to SQL DELETE!!!!')
# event_file_delete_result = sql_delete(table_name='event_file', record_id=event_file_id)
# print('SQL DELETE ran')
# log.debug(event_file_delete_result)
# except e:
# log.exception(e)
if event_file_delete_result := sql_delete(table_name='event_file', record_id=event_file_id, log_lvl=logging.DEBUG):
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.info(f'Deleted Event File ID: {event_file_id}')
return True
else:
log.error(f'Something went wrong will trying to delete Event File ID: {event_file_id}')
return False