# File: OSIT-AE-API-FastAPI/app/methods/event_location_methods.py
#
# Source listing metadata at time of export: 407 lines, 18 KiB, Python

from __future__ import annotations
import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging, logger_reset
# from app.methods.event_methods import load_event_obj
# from app.methods.event_device_methods import load_event_device_obj
# from app.methods.event_file_methods import load_event_file_obj
# from app.methods.event_presentation_methods import load_event_presentation_obj
# from app.methods.event_presenter_methods import load_event_presenter_obj
# from app.methods.event_track_methods import load_event_track_obj
from app.models.event_location_models import Event_Location_Base
# ### BEGIN ### API Event Location Methods ### load_event_location_obj() ###
@logger_reset
def load_event_location_obj(
        event_location_id: int|str,
        inc_event_device_list: bool = False,
        inc_event_file_list: bool = False,
        inc_event_presentation_list: bool = False,
        inc_event_presenter_list: bool = False,
        inc_event_session_list: bool = False,
        inc_file_count: bool = False,  # NOTE: file counts are from separate views
        event_file_file_purpose_id: int|None = None,
        event_file_file_purpose: str|None = None,
        event_file_priority: bool|None = None,
        event_file_group: str|None = None,
        enabled: str = 'enabled',  # enabled, disabled, all
        hidden: str = 'hidden',  # hidden, not_hidden, all
        limit: int = 1000,
        by_alias: bool = True,
        exclude_unset: bool = True,
        model_as_dict: bool = False,
    ) -> Event_Location_Base|dict|bool:
    """Load one event location and, on request, its related child lists.

    Args:
        event_location_id: ID of the event location; it is resolved through
            ``redis_lookup_id_random`` before any query is run.
        inc_event_device_list: Only forwarded to ``load_event_presentation_obj``.
        inc_event_file_list: Attach ``event_file_list`` to the location. Also
            forwarded to the presentation and session loaders.
        inc_event_presentation_list: Attach ``event_presentation_list`` (a list
            of dicts) when at least one presentation record is found.
        inc_event_presenter_list: Only forwarded to ``load_event_presentation_obj``.
        inc_event_session_list: Attach ``event_session_list`` to the location.
        inc_file_count: Read from ``v_event_location_w_file_count`` instead of
            ``v_event_location``.
        event_file_file_purpose_id: Forwarded to ``get_event_file_rec_list``.
        event_file_file_purpose: Forwarded to ``get_event_file_rec_list``.
        event_file_priority: Forwarded to ``get_event_file_rec_list``.
        event_file_group: Forwarded to ``get_event_file_rec_list``.
        enabled: ``'enabled'``, ``'disabled'`` or ``'all'``. For the
            presentation query an unrecognised value is logged and treated as
            ``'enabled'``; the value is forwarded unchanged to the other loaders.
        hidden: ``'hidden'``, ``'not_hidden'`` or ``'all'``; forwarded to
            ``get_event_session_rec_list``.
        limit: Row limit forwarded to the file and session lookups.
        by_alias: Used when exporting to a dict and forwarded to the session loader.
        exclude_unset: Used when exporting to a dict and forwarded to the session loader.
        model_as_dict: Return ``event_location_obj.dict(...)`` instead of the model.

    Returns:
        The ``Event_Location_Base`` instance (or its dict form when
        ``model_as_dict`` is true), or ``False`` when the ID cannot be
        resolved, no record is found, or the record fails model validation.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    event_location_id = redis_lookup_id_random(record_id_random=event_location_id, table_name='event_location')
    if not event_location_id:
        return False

    # The file-count columns live in a separate view.
    if inc_file_count:
        log.info('Using view with file count')
        view_name = 'v_event_location_w_file_count'
    else:
        view_name = 'v_event_location'
    event_location_rec = sql_select(table_name=view_name, record_id=event_location_id)
    if not event_location_rec:
        return False
    log.debug(event_location_rec)

    try:
        log.info('Try to apply event location record data to Event_Location_Base...')
        event_location_obj = Event_Location_Base(**event_location_rec)
        log.debug(event_location_obj)
    except ValidationError as e:
        log.error(e.json())
        return False

    # Updated 2021-10-21
    if inc_event_file_list:
        log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        log.info('Need to include event file list...')
        from app.methods.event_file_methods import get_event_file_rec_list, load_event_file_obj
        event_file_rec_list_result = get_event_file_rec_list(
            for_type = 'event_location',
            for_id = event_location_id,
            file_purpose_id = event_file_file_purpose_id,
            file_purpose = event_file_file_purpose,
            priority = event_file_priority,
            group = event_file_group,
            enabled = enabled,
            limit = limit,
        )
        if event_file_rec_list_result:
            event_file_result_list = []
            for event_file_rec in event_file_rec_list_result:
                load_event_file_result = load_event_file_obj(
                    event_file_id = event_file_rec.get('event_file_id', None),
                    enabled = enabled,
                )
                # A file that fails to load keeps its slot as None so the list
                # stays aligned with the record list.
                event_file_result_list.append(load_event_file_result or None)
            log.debug(event_file_result_list)
            event_location_obj.event_file_list = event_file_result_list
        elif isinstance(event_file_rec_list_result, list):
            # An empty list result means "no files"; anything else falsy is
            # recorded as None.
            event_location_obj.event_file_list = []
        else:
            event_location_obj.event_file_list = None

    if inc_event_presentation_list:
        log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        # Imported here (not at module level, where this import is commented
        # out) so the name is actually defined when this branch runs.
        from app.methods.event_presentation_methods import load_event_presentation_obj

        query_params = {'event_location_id': event_location_id}
        if enabled == 'all':
            sql_enabled = ''
        elif enabled == 'disabled':
            query_params['enable'] = False
            sql_enabled = 'AND `event_presentation`.enable = :enable'
        else:
            if enabled != 'enabled':
                # Fall back to the most restrictive filter instead of leaving
                # sql_enabled unbound.
                log.warning(f'Unrecognised "enabled" value: {enabled!r}. Treating as "enabled".')
            query_params['enable'] = True
            sql_enabled = 'AND `event_presentation`.enable = :enable'

        sql = f"""
            SELECT `event_presentation`.id AS 'event_presentation_id', `event_presentation`.id_random AS 'event_presentation_id_random'
            FROM `v_event_presentation` AS `event_presentation`
            WHERE `event_presentation`.event_location_id = :event_location_id
            {sql_enabled}
            ORDER BY `event_presentation`.created_on DESC, `event_presentation`.updated_on DESC;
        """
        log.setLevel(logging.WARNING)  # quieten logging while the query runs
        event_presentation_rec_li_result = sql_select(data=query_params, sql=sql, as_list=True)
        log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        log.debug(event_presentation_rec_li_result)
        if event_presentation_rec_li_result:
            event_presentation_obj_li = []
            for event_presentation_rec in event_presentation_rec_li_result:
                event_presentation_obj = load_event_presentation_obj(
                    event_presentation_id=event_presentation_rec.get('event_presentation_id', None),
                    enabled=enabled,
                    inc_event_device_list=inc_event_device_list,
                    inc_event_file_list=inc_event_file_list,
                    inc_event_presenter_list=inc_event_presenter_list,
                )
                # Presentations that fail to load are skipped (no placeholder).
                if event_presentation_obj:
                    event_presentation_obj_li.append(
                        event_presentation_obj.dict(by_alias=True, exclude_unset=True)
                    )
            log.debug(event_presentation_obj_li)
            event_location_obj.event_presentation_list = event_presentation_obj_li
        log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL

    # Updated 2021-10-09
    if inc_event_session_list:
        log.info('Need to include event session list...')
        log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        from app.methods.event_session_methods import get_event_session_rec_list, load_event_session_obj
        event_session_rec_list_result = get_event_session_rec_list(
            event_location_id = event_location_id,
            enabled = enabled,  # enabled, disabled, all
            approved = 'all',  # approve(d), not_approved, all
            hidden = hidden,  # hidden, not_hidden, all
            review = 'all',  # ready, not_ready, all
            limit = limit,
        )
        if event_session_rec_list_result:
            event_session_result_list = []
            for event_session_rec in event_session_rec_list_result:
                load_event_session_result = load_event_session_obj(
                    event_session_id = event_session_rec.get('event_session_id', None),
                    enabled = enabled,
                    limit = limit,
                    inc_event_file_list = inc_event_file_list,
                    by_alias = by_alias,
                    exclude_unset = exclude_unset,
                )
                # A session that fails to load keeps its slot as None.
                event_session_result_list.append(load_event_session_result or None)
            log.debug(event_session_result_list)
            event_location_obj.event_session_list = event_session_result_list
        elif isinstance(event_session_rec_list_result, list):
            event_location_obj.event_session_list = []
        else:
            event_location_obj.event_session_list = None

    if model_as_dict:
        return event_location_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset)  # pylint: disable=no-member
    return event_location_obj
# ### BEGIN ### API Event Location Methods ### get_event_location_rec_list() ###
# Updated 2021-12-13
@logger_reset
def get_event_location_rec_list(
        event_id: str,
        enabled: str = 'enabled',  # enabled, disabled, all
        hidden: str = 'not_hidden',  # hidden, not_hidden, all
        limit: int = 100,
    ) -> list|bool:
    """Return the ID records of the event locations that belong to an event.

    Args:
        event_id: ID of the event; it is resolved through
            ``redis_lookup_id_random`` before the query is run.
        enabled: ``'enabled'``, ``'disabled'`` or ``'all'``. An unrecognised
            value is logged and treated as ``'enabled'``.
        hidden: ``'hidden'``, ``'not_hidden'`` or ``'all'``. An unrecognised
            value is logged and treated as ``'not_hidden'``.
        limit: Maximum number of rows; a falsy value disables the ``LIMIT`` clause.

    Returns:
        ``False`` when the event ID cannot be resolved; otherwise whatever
        ``sql_select(..., as_list=True)`` returns. Each row selects
        ``event_location_id`` and ``event_location_id_random``.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    event_id = redis_lookup_id_random(record_id_random=event_id, table_name='event')
    if not event_id:
        return False

    data = {'event_id': event_id}

    # Enabled filter. Unknown values fall back to the default instead of
    # leaving sql_enabled unbound.
    if enabled == 'all':
        sql_enabled = ''
    elif enabled == 'disabled':
        data['enable'] = False
        sql_enabled = 'AND `event_location`.enable = :enable'
    else:
        if enabled != 'enabled':
            log.warning(f'Unrecognised "enabled" value: {enabled!r}. Treating as "enabled".')
        data['enable'] = True
        sql_enabled = 'AND `event_location`.enable = :enable'

    # Hidden filter, handled the same way.
    if hidden == 'all':
        sql_hidden = ''
    elif hidden == 'hidden':
        data['hide'] = True
        sql_hidden = 'AND `event_location`.hide = :hide'
    else:
        if hidden != 'not_hidden':
            log.warning(f'Unrecognised "hidden" value: {hidden!r}. Treating as "not_hidden".')
        data['hide'] = False
        sql_hidden = 'AND `event_location`.hide = :hide'

    if limit:
        data['limit'] = limit
        sql_limit = 'LIMIT :limit'
    else:
        sql_limit = ''

    sql = f"""
        SELECT `event_location`.id AS 'event_location_id', `event_location`.id_random AS 'event_location_id_random'
        FROM `event_location` AS `event_location`
        WHERE
        `event_location`.event_id = :event_id
        {sql_enabled}
        {sql_hidden}
        ORDER BY `event_location`.priority DESC, `event_location`.sort ASC, `event_location`.name ASC, `event_location`.created_on DESC, `event_location`.updated_on DESC
        {sql_limit};
    """
    log.debug(sql)

    event_location_rec_li_result = sql_select(data=data, sql=sql, as_list=True)
    if event_location_rec_li_result:
        log.info('Got a list result')
    else:  # [] or False
        log.info('No results or something went wrong')
    log.debug(event_location_rec_li_result)
    # The result is returned unchanged in both cases.
    return event_location_rec_li_result
# ### END ### API Event Location Methods ### get_event_location_rec_list() ###
# ### BEGIN ### API Event Location Methods ### create_update_event_location_obj_v4() ###
# NOTE: This will create or update an event_location.
# Rewritten and updated 2021-08-25
def create_update_event_location_obj_v4(
        event_location_dict_obj: Event_Location_Base|dict,
        event_location_id: int|str = None,
        event_id: int|str = None,
        # event_location_id: int|str = None, # For future?
        # event_track_id: int|str = None, # For future?
        create_sub_obj: bool = False,
        fail_any: bool = False,  # Fail if any thing goes wrong for sub objects
        return_outline: bool = False,
    ) -> int|dict|bool:
    """Create a new event location or update an existing one.

    An update is performed when ``event_location_id`` is given; otherwise a
    new record is inserted and ``event_id`` is required.

    Args:
        event_location_dict_obj: The data to write, as an ``Event_Location_Base``
            or a dict. NOTE: a dict is modified in place (``event_location_id``
            and ``event_id`` keys are set on it); a model instance has its
            ``id`` / ``event_id`` attributes set.
        event_location_id: ID of the location to update. Must resolve through
            ``redis_lookup_id_random`` or the call fails.
        event_id: ID of the owning event. Required for create. For update it
            is optional: when missing, it is looked up with
            ``get_event_id_w_for_type_id`` and the update continues without it
            if that also fails.
        create_sub_obj: Not used by this function.
        fail_any: Forwarded to ``create_update_event_session_obj_v4``; this
            function itself does not abort when a session fails.
        return_outline: Return the outline dict instead of the ID. Also
            forwarded to ``create_update_event_session_obj_v4``.

    Returns:
        ``False`` when a requirement fails, validation fails, or the SQL
        insert/update reports failure. Otherwise the event location ID, or,
        when ``return_outline`` is true, a dict with ``event_location_id``,
        ``event_session_list`` (one entry per processed session) and, if any
        session was processed, ``event_session_id`` (the last processed one).
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    log.info('Checking requirements...')
    if event_location_id:
        log.info(f'Event Location ID passed. Update existing Event Location. Event Location ID: {event_location_id}')
        event_location_id = redis_lookup_id_random(record_id_random=event_location_id, table_name='event_location')
        if not event_location_id:
            log.error('Event Location ID passed but is invalid. Failed requirement.')
            return False

        event_id = redis_lookup_id_random(record_id_random=event_id, table_name='event')
        if not event_id:
            log.error('Missing or invalid Event ID passed. Not required. Ignoring.')
            log.info(f'Event ID: {event_id}')
            log.info('Attempting to get Event ID from related object.')
            from app.methods.event_methods import get_event_id_w_for_type_id
            event_id = get_event_id_w_for_type_id(for_type='event_location', for_id=event_location_id)
            if not event_id:
                # Not fatal: the event ID is optional for updates, so carry on
                # with a falsy event_id (the writes below are guarded).
                log.error('Unable to get Event ID from related object.')
    else:
        log.info('No Event Location ID passed. Create new Event Location. Required: Account ID, Event ID')
        event_id = redis_lookup_id_random(record_id_random=event_id, table_name='event')
        if not event_id:
            log.error('Missing or invalid Event ID passed. Failed requirement.')
            log.info(f'Event ID: {event_id}')
            return False

    log.debug(type(event_location_dict_obj))
    if isinstance(event_location_dict_obj, dict):
        event_location_dict = event_location_dict_obj
        if event_location_id:
            event_location_dict['event_location_id'] = event_location_id
        if event_id:
            event_location_dict['event_id'] = event_id
        try:
            event_location_obj = Event_Location_Base(**event_location_dict)
            log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(event_location_obj)
        except ValidationError as e:
            log.error(e.json())
            return False
    else:
        event_location_obj = event_location_dict_obj
        if event_location_id:
            # NOTE: Can't update the ID alias if it was never set.
            event_location_obj.id = event_location_id
        if event_id:
            event_location_obj.event_id = event_id

    # NOTE(review): event_session_list / event_file_list are not excluded
    # here; presumably sql_update / sql_insert ignore non-column keys - confirm.
    event_location_dict = event_location_obj.dict(
        by_alias=False,
        exclude_defaults=False,
        exclude_unset=True,
        exclude={'event_presentation', 'event_presentation_list', 'created_on', 'updated_on'},
    )

    if event_location_id:
        event_location_dict_up_result = sql_update(data=event_location_dict, table_name='event_location', rm_id_random=True)
        if not event_location_dict_up_result:
            log.warning(f'Event Location not updated. Event Location ID: {event_location_id}')
            log.debug(event_location_dict_up_result)
            return False
        log.debug(event_location_dict_up_result)
    else:
        event_location_dict_in_result = sql_insert(data=event_location_dict, table_name='event_location', rm_id_random=True, id_random_length=8)
        if not event_location_dict_in_result:
            log.warning('Event Location not created.')
            log.debug(event_location_dict_in_result)
            return False
        log.debug(event_location_dict_in_result)
        # The insert result is used as the new record's ID.
        event_location_id = event_location_dict_in_result

    log.debug(event_location_id)
    event_location_outline = {
        'event_location_id': event_location_id,
        'event_session_list': [],
    }

    event_session_list = event_location_obj.event_session_list
    if event_session_list and isinstance(event_session_list, list):
        # Function-scope import, matching the other event_session_methods
        # imports in this module; without it the call below raises NameError.
        from app.methods.event_session_methods import create_update_event_session_obj_v4
        log.info(f'Event Session List was found. Loop through and create a new Event Session for each and link them to the new Event Location. Event Location ID: {event_location_id}')
        for event_session_obj in event_session_list:
            # NOTE: Use object model version because of better type checking and validations
            log.debug(event_session_obj)
            event_session_id = event_session_obj.id or None
            create_update_event_session_obj_result = create_update_event_session_obj_v4(
                event_session_dict_obj = event_session_obj,
                event_session_id = event_session_id,
                event_id = event_id,
                event_location_id = event_location_id,
                fail_any = fail_any,
                return_outline = return_outline,
            )

            session_outline = None
            if isinstance(create_update_event_session_obj_result, dict) and create_update_event_session_obj_result:
                # Presumably the session outline returned because
                # return_outline was forwarded; assumed to carry an
                # 'event_session_id' key like this function's own outline -
                # TODO confirm against create_update_event_session_obj_v4.
                session_outline = create_update_event_session_obj_result
                event_session_id = session_outline.get('event_session_id', event_session_id)
            elif create_update_event_session_obj_result is True:
                pass  # success without a new ID; keep the one we already have
            elif (isinstance(create_update_event_session_obj_result, int)
                    and not isinstance(create_update_event_session_obj_result, bool)):
                # bool is a subclass of int, so it is excluded explicitly;
                # otherwise a False (failure) result would be stored as an ID.
                event_session_id = create_update_event_session_obj_result
            else:
                log.warning(f'Create or Update failed while trying create_update_event_session_obj_v4(): {create_update_event_session_obj_result}')
                event_session_id = None

            # One entry per session, in input order.
            event_location_outline['event_session_list'].append(
                session_outline if session_outline is not None else {'event_session_id': event_session_id}
            )
            # Kept for backward compatibility: the ID of the last processed session.
            event_location_outline['event_session_id'] = event_session_id

    if return_outline:
        log.debug(f'Returning the Event Location Outline: {event_location_outline}')
        return event_location_outline
    log.debug(f'Returning the Event Location ID: {event_location_id}')
    return event_location_id
# ### END ### API Event Location Methods ### create_update_event_location_obj_v4() ###