Files
OSIT-AE-API-FastAPI/app/methods/event_location_methods.py
2021-10-22 04:07:41 -04:00

270 lines
12 KiB
Python

from __future__ import annotations
import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging
# from app.methods.event_methods import load_event_obj
# from app.methods.event_device_methods import load_event_device_obj
# from app.methods.event_file_methods import load_event_file_obj
# from app.methods.event_presentation_methods import load_event_presentation_obj
# from app.methods.event_presenter_methods import load_event_presenter_obj
# from app.methods.event_track_methods import load_event_track_obj
from app.models.event_location_models import Event_Location_Base
# ### BEGIN ### API Event Location Methods ### load_event_location_obj() ###
def load_event_location_obj(
        event_location_id: int|str,
        inc_event_device_list: bool = False,
        inc_event_file_list: bool = False,
        inc_event_presentation_list: bool = False,
        inc_event_presenter_list: bool = False,
        inc_event_session_list: bool = False,
        inc_file_count: bool = False,  # NOTE: file counts are from separate views
        event_file_file_purpose_id: int|None = None,
        event_file_file_purpose: str|None = None,
        event_file_priority: bool|None = None,
        event_file_group: str|None = None,
        enabled: str = 'enabled',  # enabled, disabled, all
        hidden: str = 'hidden',  # hidden, not_hidden, all
        limit: int = 1000,
        by_alias: bool = True,
        exclude_unset: bool = True,
        model_as_dict: bool = False,
    ) -> Event_Location_Base|dict|bool:
    """Load one event location and, optionally, its related child lists.

    The given id is resolved through ``redis_lookup_id_random`` and the record
    is read from the ``v_event_location`` view, then validated into an
    ``Event_Location_Base`` model. Child lists are attached to the model only
    when the matching ``inc_*`` flag is set.

    Args:
        event_location_id: Id passed to ``redis_lookup_id_random`` for the
            ``event_location`` table.
        inc_event_device_list: Only forwarded to ``load_event_presentation_obj``.
        inc_event_file_list: Attach ``event_file_list``; also forwarded to the
            presentation and session loaders.
        inc_event_presentation_list: Attach ``event_presentation_list``.
        inc_event_presenter_list: Only forwarded to ``load_event_presentation_obj``.
        inc_event_session_list: Attach ``event_session_list``.
        inc_file_count: Accepted for interface compatibility; not used here.
        event_file_file_purpose_id: Forwarded to ``get_event_file_rec_list``
            as ``file_purpose_id``.
        event_file_file_purpose: Forwarded as ``file_purpose``.
        event_file_priority: Forwarded as ``priority``.
        event_file_group: Forwarded as ``group``.
        enabled: ``'enabled'``, ``'disabled'`` or ``'all'``; forwarded to the
            child loaders and used to filter the presentation query.
        hidden: ``'hidden'``, ``'not_hidden'`` or ``'all'``; only forwarded to
            ``get_event_session_rec_list``.
        limit: Forwarded to the file and session list/loader calls.
        by_alias: Used for the final ``.dict()`` call and forwarded to
            ``load_event_session_obj``.
        exclude_unset: Used for the final ``.dict()`` call and forwarded to
            ``load_event_session_obj``.
        model_as_dict: Return ``event_location_obj.dict(...)`` instead of the
            model instance.

    Returns:
        The ``Event_Location_Base`` instance (or its dict form when
        ``model_as_dict`` is true), or ``False`` when the id cannot be
        resolved, the record is not found, or model validation fails.

    Raises:
        ValueError: If ``inc_event_presentation_list`` is set and ``enabled``
            is not one of the three supported values (previously this
            surfaced as an accidental ``UnboundLocalError``).
    """
    # NOTE(review): the level of the `log` object imported from app.lib_general
    # is changed at several points below and is not restored on return. The
    # calls are kept exactly where the original had them -- confirm whether
    # this is intentional or leftover debugging.
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    event_location_id = redis_lookup_id_random(record_id_random=event_location_id, table_name='event_location')
    if not event_location_id:
        return False

    event_location_rec = sql_select(table_name='v_event_location', record_id=event_location_id)
    if not event_location_rec:
        return False
    log.debug(event_location_rec)

    try:
        log.info('Try to apply event location record data to Event_Location_Base...')
        event_location_obj = Event_Location_Base(**event_location_rec)
        log.debug(event_location_obj)
    except ValidationError as e:
        log.error(e.json())
        return False

    # --- Event files -------------------------------------------------------
    # Updated 2021-10-21
    if inc_event_file_list:
        log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        log.info('Need to include event file list...')
        # Imported locally, as elsewhere in this function (the matching
        # top-level imports are commented out in this module).
        from app.methods.event_file_methods import get_event_file_rec_list, load_event_file_obj

        event_file_rec_list_result = get_event_file_rec_list(
            for_type = 'event_location',
            for_id = event_location_id,
            file_purpose_id = event_file_file_purpose_id,
            file_purpose = event_file_file_purpose,
            priority = event_file_priority,
            group = event_file_group,
            enabled = enabled,
            limit = limit,
        )
        if event_file_rec_list_result:
            # A file that fails to load keeps its position as a None entry.
            event_file_result_list = [
                load_event_file_obj(
                    event_file_id = event_file_rec.get('event_file_id', None),
                    enabled = enabled,
                ) or None
                for event_file_rec in event_file_rec_list_result
            ]
            log.debug(event_file_result_list)
            event_location_obj.event_file_list = event_file_result_list
        elif isinstance(event_file_rec_list_result, list):
            # Empty list result: no files for this location.
            event_location_obj.event_file_list = []
        else:
            # Any other falsy result (e.g. False/None).
            event_location_obj.event_file_list = None

    # --- Event presentations -----------------------------------------------
    if inc_event_presentation_list:
        log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        # FIX: the original called load_event_presentation_obj without any
        # import in scope (the top-level import is commented out), which
        # raised NameError. Module path taken from that commented-out import.
        from app.methods.event_presentation_methods import load_event_presentation_obj

        # Map the textual filter to the bound value; None means "no filter".
        enabled_flags = {'enabled': True, 'disabled': False, 'all': None}
        if enabled not in enabled_flags:
            raise ValueError(f"enabled must be 'enabled', 'disabled' or 'all', got {enabled!r}")

        sql_params = {'event_location_id': event_location_id}
        enable_flag = enabled_flags[enabled]
        if enable_flag is None:
            sql_enabled = ''
        else:
            sql_params['enable'] = enable_flag
            sql_enabled = 'AND `event_presentation`.enable = :enable'

        sql = f"""
            SELECT `event_presentation`.id AS 'event_presentation_id', `event_presentation`.id_random AS 'event_presentation_id_random'
            FROM `v_event_presentation` AS `event_presentation`
            WHERE `event_presentation`.event_location_id = :event_location_id
            {sql_enabled}
            ORDER BY `event_presentation`.created_on DESC, `event_presentation`.updated_on DESC;
        """
        log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        event_presentation_rec_li_result = sql_select(data=sql_params, sql=sql, as_list=True)
        if event_presentation_rec_li_result:
            log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(event_presentation_rec_li_result)
            event_presentation_obj_li = []
            for event_presentation_rec in event_presentation_rec_li_result:
                event_presentation_obj = load_event_presentation_obj(
                    event_presentation_id=event_presentation_rec.get('event_presentation_id', None),
                    enabled=enabled,
                    inc_event_device_list=inc_event_device_list,
                    inc_event_file_list=inc_event_file_list,
                    inc_event_presenter_list=inc_event_presenter_list,
                )
                # Presentations that fail to load are skipped (no None entry),
                # and each one is stored in dict form.
                if event_presentation_obj:
                    event_presentation_obj_li.append(
                        event_presentation_obj.dict(by_alias=True, exclude_unset=True)
                    )
            log.debug(event_presentation_obj_li)
            event_location_obj.event_presentation_list = event_presentation_obj_li
        else:
            # NOTE: unlike the file and session lists, the attribute is left
            # untouched when the query returns nothing.
            log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(event_presentation_rec_li_result)
        log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL

    # --- Event sessions ----------------------------------------------------
    # Updated 2021-10-09
    if inc_event_session_list:
        log.info('Need to include event session list...')
        log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        from app.methods.event_session_methods import get_event_session_rec_list, load_event_session_obj

        event_session_rec_list_result = get_event_session_rec_list(
            event_location_id = event_location_id,
            enabled = enabled,  # enabled, disabled, all
            approved = 'all',  # approve(d), not_approved, all
            hidden = hidden,  # hidden, not_hidden, all
            review = 'all',  # ready, not_ready, all
            limit = limit,
        )
        if event_session_rec_list_result:
            # A session that fails to load keeps its position as a None entry.
            event_session_result_list = [
                load_event_session_obj(
                    event_session_id = event_session_rec.get('event_session_id', None),
                    enabled = enabled,
                    limit = limit,
                    inc_event_file_list = inc_event_file_list,
                    by_alias = by_alias,
                    exclude_unset = exclude_unset,
                ) or None
                for event_session_rec in event_session_rec_list_result
            ]
            log.debug(event_session_result_list)
            event_location_obj.event_session_list = event_session_result_list
        elif isinstance(event_session_rec_list_result, list):
            event_location_obj.event_session_list = []
        else:
            event_location_obj.event_session_list = None

    if model_as_dict:
        return event_location_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset)  # pylint: disable=no-member
    return event_location_obj
# ### BEGIN ### API Event Location Methods ### get_event_location_rec_list() ###
def get_event_location_rec_list(
        event_id: str,
        enabled: str = 'enabled',  # enabled, disabled, all
        hidden: str = 'not_hidden',  # hidden, not_hidden, all
        limit: int = 100,
    ) -> list|bool:
    """Return the id rows of the event locations that belong to an event.

    The given id is resolved through ``redis_lookup_id_random`` for the
    ``event`` table, then the ``event_location`` table is queried for rows
    with that ``event_id``, newest first (``created_on`` then ``updated_on``,
    both descending).

    Args:
        event_id: Id passed to ``redis_lookup_id_random``.
        enabled: ``'enabled'`` / ``'disabled'`` filter on the ``enable``
            column; ``'all'`` applies no filter.
        hidden: ``'hidden'`` / ``'not_hidden'`` filter on the ``hide``
            column; ``'all'`` applies no filter.
        limit: Maximum number of rows; a falsy value omits the LIMIT clause.

    Returns:
        ``False`` when the event id cannot be resolved; otherwise the result
        of ``sql_select(..., as_list=True)``, or an empty list when that
        result is falsy. Each row selects ``event_location_id`` and
        ``event_location_id_random``.

    Raises:
        ValueError: If ``enabled`` or ``hidden`` is not one of the supported
            values (previously this surfaced as an accidental
            ``UnboundLocalError`` while building the SQL).
    """
    # NOTE(review): this changes the level of the `log` object imported from
    # app.lib_general and does not restore it; kept as in the original.
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    event_id = redis_lookup_id_random(record_id_random=event_id, table_name='event')
    if not event_id:
        return False

    # Map each textual filter to its bound value; None means "no filter".
    enabled_flags = {'enabled': True, 'disabled': False, 'all': None}
    hidden_flags = {'hidden': True, 'not_hidden': False, 'all': None}
    if enabled not in enabled_flags:
        raise ValueError(f"enabled must be 'enabled', 'disabled' or 'all', got {enabled!r}")
    if hidden not in hidden_flags:
        raise ValueError(f"hidden must be 'hidden', 'not_hidden' or 'all', got {hidden!r}")

    # event_id is always truthy here, so the WHERE clause always has its
    # leading condition and the optional "AND ..." fragments stay valid SQL.
    sql_params = {'event_id': event_id}

    enable_flag = enabled_flags[enabled]
    if enable_flag is None:
        sql_enabled = ''
    else:
        sql_params['enable'] = enable_flag
        sql_enabled = 'AND `event_location`.enable = :enable'

    hide_flag = hidden_flags[hidden]
    if hide_flag is None:
        sql_hidden = ''
    else:
        sql_params['hide'] = hide_flag
        sql_hidden = 'AND `event_location`.hide = :hide'

    if limit:
        sql_params['limit'] = limit
        sql_limit = 'LIMIT :limit'
    else:
        sql_limit = ''

    sql = f"""
        SELECT `event_location`.id AS 'event_location_id', `event_location`.id_random AS 'event_location_id_random'
        FROM `event_location` AS `event_location`
        WHERE
        `event_location`.event_id = :event_id
        {sql_enabled}
        {sql_hidden}
        ORDER BY `event_location`.created_on DESC, `event_location`.updated_on DESC
        {sql_limit};
    """

    event_location_rec_li_result = sql_select(data=sql_params, sql=sql, as_list=True)
    # Normalise any falsy result (empty list, None, False) to an empty list.
    event_location_rec_li = event_location_rec_li_result or []

    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(event_location_rec_li_result)
    log.debug(type(event_location_rec_li))
    log.debug(len(event_location_rec_li))
    return event_location_rec_li
# ### END ### API Event Location Methods ### get_event_location_rec_list() ###