# Source file: OSIT-AE-API-FastAPI/app/methods/event_session_methods.py
# Snapshot: 2021-10-10 01:21:59 -04:00
#
# 773 lines
# 36 KiB
# Python

from __future__ import annotations
import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging
# from app.methods.event_methods import load_event_obj
from app.methods.event_file_methods import load_event_file_obj_list
from app.methods.event_location_methods import load_event_location_obj
from app.methods.event_person_methods import load_event_person_obj, update_event_person_obj
from app.methods.event_presentation_methods import create_event_presentation_obj, create_update_event_presentation_obj_v4, load_event_presentation_obj, update_event_presentation_obj_v3
# from app.methods.event_presenter_methods import load_event_presenter_obj
from app.methods.person_methods import load_person_obj
# from app.methods.user_methods import load_user_obj
from app.models.event_session_models import Event_Session_Base
# ### BEGIN ### API Event Session Methods ### load_event_session_obj() ###
def load_event_session_obj(
    event_session_id: int|str,
    enabled: str = 'enabled',  # enabled, disabled, all
    hidden: str = 'not_hidden',  # hidden, not_hidden, all
    inc_file_count: bool = False,  # NOTE: file counts are from separate views
    event_file_file_purpose_id: int = None,
    event_file_file_purpose: str = None,
    event_file_priority: bool = None,
    event_file_group: str = None,
    inc_address: bool = False,
    inc_contact: bool = False,
    inc_event_abstract_list: bool = False,
    inc_event_badge_list: bool = False,
    inc_event_device_list: bool = False,
    inc_event_file_list: bool = False,
    inc_event_location: bool = False,
    inc_event_person: bool = False,
    inc_event_person_profile: bool = False,
    inc_event_person_list: bool = False,
    inc_event_presentation_list: bool = False,
    inc_event_presenter_cat: bool = False,  # Concatenate presenter names
    inc_event_presenter_list: bool = False,
    inc_event_registration_list: bool = False,
    inc_event_track: bool = False,
    inc_hosted_file: bool = False,
    inc_poc_event_person: bool = False,
    inc_person: bool = False,
    inc_user: bool = False,
    limit: int = 1000,
    by_alias: bool = True,
    exclude_unset: bool = True,
    model_as_dict: bool = False,
) -> Event_Session_Base|dict|bool:
    """Load one event session and, optionally, its related objects.

    The session row is read from ``v_event_session`` (or from
    ``v_event_session_w_file_count`` when ``inc_file_count`` is set) and
    validated into an ``Event_Session_Base``. The ``inc_*`` flags that are
    implemented here attach related data to the model:

    * ``inc_event_file_list``         -> ``event_file_list``
    * ``inc_event_location``          -> ``event_location`` (as a dict)
    * ``inc_event_presentation_list`` -> ``event_presentation_list`` (dicts)
    * ``inc_event_presenter_cat``     -> ``event_presenter_cat``
    * ``inc_poc_event_person``        -> ``poc_event_person``

    The abstract, badge, device, person-list and presenter-list flags are
    accepted but not acted on at the session level; some of them are forwarded
    to ``load_event_presentation_obj``. ``inc_event_registration_list`` and
    ``inc_event_track`` are not used in this function at all.

    Returns:
        The ``Event_Session_Base`` instance, or its ``.dict()`` form when
        ``model_as_dict`` is true. ``False`` when the ID cannot be resolved,
        the row is not found, validation fails, or (with
        ``inc_event_presentation_list``) ``hidden`` / ``enabled`` is not one
        of the documented options.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    event_session_id = redis_lookup_id_random(record_id_random=event_session_id, table_name='event_session')
    if not event_session_id:
        return False

    if inc_file_count:
        log.info('Using view with file count')
        view_name = 'v_event_session_w_file_count'
    else:
        view_name = 'v_event_session'
    event_session_rec = sql_select(table_name=view_name, record_id=event_session_id)
    if not event_session_rec:
        return False
    log.debug(event_session_rec)

    try:
        event_session_obj = Event_Session_Base(**event_session_rec)
        log.debug(event_session_obj)
    except ValidationError as e:
        log.error(e.json())
        return False

    event_location_id = event_session_rec.get('event_location_id', None)
    poc_event_person_id = event_session_rec.get('poc_event_person_id', None)

    if inc_event_file_list:
        log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        log.info('Need to include event file list...')
        # Function-scope import kept from the original code.
        # NOTE(review): presumably this avoids a circular import - confirm.
        from app.methods.event_file_methods import get_event_file_rec_list, load_event_file_obj
        event_file_rec_list_result = get_event_file_rec_list(
            for_type = 'event_session',
            for_id = event_session_id,
            file_purpose_id = event_file_file_purpose_id,
            file_purpose = event_file_file_purpose,
            priority = event_file_priority,
            group = event_file_group,
            enabled = enabled,
            limit = limit,
        )
        if event_file_rec_list_result:
            # A file that fails to load keeps its slot in the list as None.
            event_file_result_list = [
                load_event_file_obj(
                    event_file_id = event_file_rec.get('event_file_id', None),
                    enabled = enabled,
                    limit = limit,
                    inc_hosted_file = inc_hosted_file,
                ) or None
                for event_file_rec in event_file_rec_list_result
            ]
            log.debug(event_file_result_list)
            event_session_obj.event_file_list = event_file_result_list
        elif isinstance(event_file_rec_list_result, list):
            # An empty list means "no files".
            event_session_obj.event_file_list = []
        else:
            # Any other falsy result is stored as None.
            event_session_obj.event_file_list = None

    # event_location is always assigned, so it also appears (as None) when it
    # was not requested or the session has no location.
    event_session_obj.event_location = None
    if inc_event_location and event_location_id:
        log.info('Need to include event location...')
        event_location_obj = load_event_location_obj(
            event_location_id = event_location_id,
            enabled = enabled,
        )
        if event_location_obj:
            event_session_obj.event_location = event_location_obj.dict(by_alias=True, exclude_unset=True)

    if inc_event_presentation_list:
        # Map each documented option to the flag bound into the query;
        # None means "do not filter on this column".
        hide_by_option = {'hidden': True, 'not_hidden': False, 'all': None}
        enable_by_option = {'enabled': True, 'disabled': False, 'all': None}
        if hidden not in hide_by_option or enabled not in enable_by_option:
            # Previously this fell through with sql_hidden / sql_enabled
            # unbound and crashed with UnboundLocalError.
            log.error(f'Invalid filter option. hidden: {hidden!r}, enabled: {enabled!r}')
            return False

        log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        log.info('Need to include event presentation list...')

        sql_params = {'event_session_id': event_session_id}
        sql_hidden = ''
        if hide_by_option[hidden] is not None:
            sql_params['hide'] = hide_by_option[hidden]
            sql_hidden = 'AND `event_presentation`.hide = :hide'
        sql_enabled = ''
        if enable_by_option[enabled] is not None:
            sql_params['enable'] = enable_by_option[enabled]
            sql_enabled = 'AND `event_presentation`.enable = :enable'

        sql = f"""
            SELECT `event_presentation`.id AS 'event_presentation_id', `event_presentation`.id_random AS 'event_presentation_id_random'
            FROM `event_presentation` AS `event_presentation`
            WHERE `event_presentation`.event_session_id = :event_session_id
                {sql_hidden}
                {sql_enabled}
            ORDER BY `event_presentation`.priority DESC, `event_presentation`.created_on DESC, `event_presentation`.updated_on DESC;
        """
        event_presentation_rec_li_result = sql_select(data=sql_params, sql=sql, as_list=True)
        log.debug(event_presentation_rec_li_result)
        event_presentation_obj_li = []
        for event_presentation_rec in event_presentation_rec_li_result or []:
            event_presentation_id = event_presentation_rec.get('event_presentation_id', None)
            event_presentation_obj = load_event_presentation_obj(
                event_presentation_id = event_presentation_id,
                enabled = enabled,
                hidden = hidden,
                inc_file_count = inc_file_count,
                inc_address = inc_address,
                inc_contact = inc_contact,
                inc_event_abstract_list = inc_event_abstract_list,
                inc_event_device_list = inc_event_device_list,
                inc_event_file_list = inc_event_file_list,
                inc_event_person = inc_event_person,
                inc_event_person_profile = inc_event_person_profile,
                inc_event_person_list = inc_event_person_list,
                inc_event_presenter_list = inc_event_presenter_list,
                inc_person = inc_person,
                inc_user = inc_user,
            )
            if event_presentation_obj:
                event_presentation_obj_li.append(event_presentation_obj.dict(by_alias=True, exclude_unset=True))
            else:
                # Presentations that fail to load are left out of the list.
                log.warning(f'Event Presentation not loaded. Event Presentation ID: {event_presentation_id}')
        log.debug(event_presentation_obj_li)
        event_session_obj.event_presentation_list = event_presentation_obj_li
        log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL

    if inc_event_presenter_cat:
        sql = """
            SELECT event_session_id, GROUP_CONCAT(given_name, ' ', family_name SEPARATOR '; ') AS 'event_presenter_names'
            FROM event_presenter
            WHERE event_session_id = :event_session_id
            GROUP BY event_session_id
            LIMIT 1
        """
        event_presenter_cat_rec_result = sql_select(data={'event_session_id': event_session_id}, sql=sql)
        if event_presenter_cat_rec_result:
            event_session_obj.event_presenter_cat = event_presenter_cat_rec_result.get('event_presenter_names', None)
        else:
            log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(event_presenter_cat_rec_result)
            event_session_obj.event_presenter_cat = None

    if inc_poc_event_person:
        poc_event_person_obj = None
        if poc_event_person_id:
            # Skip the lookup when the session has no point of contact.
            # A failed load is stored as None, like event_location above.
            poc_event_person_obj = load_event_person_obj(
                event_person_id = poc_event_person_id,
                inc_address = inc_address,
                inc_contact = inc_contact,
                inc_person = inc_person,
                inc_user = inc_user,
            ) or None
        log.debug(poc_event_person_obj)
        event_session_obj.poc_event_person = poc_event_person_obj

    log.debug(event_session_obj)
    if model_as_dict:
        return event_session_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset)  # pylint: disable=no-member
    return event_session_obj
# ### END ### API Event Session Methods ### load_event_session_obj() ###
# ### BEGIN ### API Event Session Methods ### get_event_session_rec_list() ###
def get_event_session_rec_list(
    event_id: str = None,
    event_location_id: str = None,
    event_track_id: str = None,
    enabled: str = 'enabled',  # enabled, disabled, all
    approved: str = 'all',  # approved, not_approved, all
    hidden: str = 'not_hidden',  # hidden, not_hidden, all
    review: str = 'all',  # ready, not_ready, all
    limit: int = 100,
) -> list|bool:
    """List the IDs of the event sessions of an event or an event location.

    The query is scoped by ``event_id`` when it resolves, otherwise by
    ``event_location_id``. ``event_track_id`` is accepted but not used by this
    function. Each of ``review`` / ``approved`` / ``hidden`` / ``enabled``
    adds a boolean column filter unless it is ``'all'``.

    Returns:
        Whatever ``sql_select(..., as_list=True)`` returns for the columns
        ``event_session_id`` and ``event_session_id_random`` (newest first),
        or ``[]`` when that result is falsy. ``False`` when neither scope ID
        resolves or a filter argument is not one of its documented options
        (both cases used to crash with ``UnboundLocalError``).
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Both lookups are optional on their own; at least one has to resolve.
    event_id = redis_lookup_id_random(record_id_random=event_id, table_name='event')
    event_location_id = redis_lookup_id_random(record_id_random=event_location_id, table_name='event_location')

    data = {}
    if event_id:
        data['event_id'] = event_id
        sql_where_type_id = '`event_session`.event_id = :event_id'
    elif event_location_id:
        data['event_location_id'] = event_location_id
        sql_where_type_id = '`event_session`.event_location_id = :event_location_id'
    else:
        log.error('Missing or invalid Event ID and Event Location ID. One of them is required.')
        return False

    # (argument name, value passed, column and bind name, option -> flag).
    # A flag of None means "do not filter on this column".
    flag_filters = (
        ('review', review, 'review', {'ready': True, 'not_ready': False, 'all': None}),
        ('approved', approved, 'approve', {'approved': True, 'not_approved': False, 'all': None}),
        ('hidden', hidden, 'hide', {'hidden': True, 'not_hidden': False, 'all': None}),
        ('enabled', enabled, 'enable', {'enabled': True, 'disabled': False, 'all': None}),
    )
    sql_flag_clauses = []
    for arg_name, option, column, flag_by_option in flag_filters:
        if option not in flag_by_option:
            # Fail rather than silently dropping a filter the caller asked for.
            log.error(f'Invalid value for {arg_name}: {option!r}')
            return False
        flag = flag_by_option[option]
        if flag is None:
            continue
        data[column] = flag
        sql_flag_clauses.append(f'AND `event_session`.{column} = :{column}')
    sql_flags = '\n            '.join(sql_flag_clauses)

    if limit:
        data['limit'] = limit
        sql_limit = 'LIMIT :limit'
    else:
        sql_limit = ''

    sql = f"""
        SELECT `event_session`.id AS 'event_session_id', `event_session`.id_random AS 'event_session_id_random'
        FROM `event_session` AS `event_session`
        WHERE
            {sql_where_type_id}
            {sql_flags}
        ORDER BY `event_session`.created_on DESC, `event_session`.updated_on DESC
        {sql_limit};
    """
    log.debug(sql)

    event_session_rec_li_result = sql_select(data=data, sql=sql, as_list=True)
    # Any falsy result (no rows, or a failed query) becomes an empty list.
    event_session_rec_li = event_session_rec_li_result if event_session_rec_li_result else []
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(event_session_rec_li_result)
    log.debug(type(event_session_rec_li))
    log.debug(len(event_session_rec_li))
    return event_session_rec_li
# ### END ### API Event Session Methods ### get_event_session_rec_list() ###
# ### BEGIN ### API Event Session Methods ### create_update_event_session_obj_v4() ###
# NOTE: This will create or update a event_session.
# Rewrite and updated 2021-08-25
def create_update_event_session_obj_v4(
    event_session_dict_obj: Event_Session_Base|dict,
    event_session_id: int|str = None,
    event_id: int|str = None,
    # event_location_id: int|str = None, # For future?
    # event_track_id: int|str = None, # For future?
    create_sub_obj: bool = False,
    fail_any: bool = False,  # Fail if any thing goes wrong for sub objects
    return_outline: bool = False,
) -> int|dict|bool:
    """Create a new event session, or update an existing one.

    With ``event_session_id`` the existing session is updated; ``event_id`` is
    then optional and, when it does not resolve, is looked up from the session
    itself. Without ``event_session_id`` a new session is inserted and a valid
    ``event_id`` is required.

    Every entry of ``event_presentation_list`` on the session is then passed
    to ``create_update_event_presentation_obj_v4`` linked to this session.
    ``create_sub_obj`` is accepted but not used by this function.

    Returns:
        ``False`` on failure: an invalid ID, a validation error, a failed
        insert/update, or - only when ``fail_any`` is set - a failed
        presentation. Otherwise the event session ID, or when
        ``return_outline`` is set a dict with ``event_session_id``,
        ``event_presentation_list`` (one entry per processed presentation)
        and, if any presentation was processed, ``event_presentation_id``
        (the last one, kept for backward compatibility).
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    log.info('Checking requirements...')
    if event_session_id:
        log.info(f'Event Session ID passed. Update existing Event Session. Event Session ID: {event_session_id}')
        event_session_id = redis_lookup_id_random(record_id_random=event_session_id, table_name='event_session')
        if not event_session_id:
            log.error('Event Session ID passed but is invalid. Failed requirement.')
            return False
        resolved_event_id = redis_lookup_id_random(record_id_random=event_id, table_name='event')
        if not resolved_event_id:
            log.error('Missing or invalid Event ID passed. Not required. Ignoring.')
            log.info(f'Event ID: {event_id}')
            log.info('Attempting to get Event ID from related object.')
            from app.methods.event_methods import get_event_id_w_for_type_id
            resolved_event_id = get_event_id_w_for_type_id(for_type='event_session', for_id=event_session_id)
            if not resolved_event_id:
                # The Event ID is optional for an update, so carry on without
                # it. The original had a bare `False` expression here, which
                # did nothing; the fallback is now explicit.
                log.error('Unable to get Event ID from related object.')
                resolved_event_id = None
        event_id = resolved_event_id
    else:
        log.info('No Event Session ID passed. Create new Event Session. Required: Account ID, Event ID')
        resolved_event_id = redis_lookup_id_random(record_id_random=event_id, table_name='event')
        if not resolved_event_id:
            log.error('Missing or invalid Event ID passed. Failed requirement.')
            log.info(f'Event ID: {event_id}')
            return False
        event_id = resolved_event_id

    log.debug(type(event_session_dict_obj))
    if isinstance(event_session_dict_obj, dict):
        # Work on a shallow copy so the caller's dict is not modified.
        event_session_dict = dict(event_session_dict_obj)
        if event_session_id:
            event_session_dict['event_session_id'] = event_session_id
        if event_id:
            event_session_dict['event_id'] = event_id
        try:
            event_session_obj = Event_Session_Base(**event_session_dict)
            log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(event_session_obj)
        except ValidationError as e:
            log.error(e.json())
            return False
    else:
        event_session_obj = event_session_dict_obj
        if event_session_id:
            # NOTE: Can't update the ID alias if it was never set.
            event_session_obj.id = event_session_id
        if event_id:
            event_session_obj.event_id = event_id

    # Only the session's own fields are written; the presentations are
    # handled separately below.
    event_session_dict = event_session_obj.dict(
        by_alias=False,
        exclude_defaults=False,
        exclude_unset=True,
        exclude={'event_presentation', 'event_presentation_list', 'created_on', 'updated_on'},
    )

    if event_session_id:
        event_session_dict_up_result = sql_update(data=event_session_dict, table_name='event_session', rm_id_random=True)
        log.debug(event_session_dict_up_result)
        if not event_session_dict_up_result:
            log.warning(f'Event Session not updated. Event Session ID: {event_session_id}')
            return False
    else:
        event_session_dict_in_result = sql_insert(data=event_session_dict, table_name='event_session', rm_id_random=True, id_random_length=8)
        log.debug(event_session_dict_in_result)
        if not event_session_dict_in_result:
            log.warning('Event Session not created.')
            return False
        event_session_id = event_session_dict_in_result
    log.debug(event_session_id)

    event_session_outline = {
        'event_session_id': event_session_id,
        'event_presentation_list': [],
    }

    event_presentation_list = event_session_obj.event_presentation_list
    if event_presentation_list and isinstance(event_presentation_list, list):
        log.info(f'Event Presentation List was found. Loop through and create a new Event Presentation for each and link them to the new Event Session. Event Session ID: {event_session_id}')
        for event_presentation_obj in event_presentation_list:
            # NOTE: Use object model version because of better type checking and validations
            log.debug(event_presentation_obj)
            event_presentation_id = event_presentation_obj.id or None
            result = create_update_event_presentation_obj_v4(
                event_presentation_dict_obj = event_presentation_obj,
                event_presentation_id = event_presentation_id,
                event_id = event_id,
                event_session_id = event_session_id,
                fail_any = fail_any,
                return_outline = return_outline,
            )

            outline_entry = None
            if isinstance(result, bool):
                # bool is a subclass of int, so it has to be checked first:
                # True keeps the ID we already had, False is a failure.
                succeeded = result
            elif isinstance(result, int):
                succeeded = True
                event_presentation_id = result
            elif isinstance(result, dict) and result:
                # NOTE(review): assumes that with return_outline the child
                # returns an outline dict carrying 'event_presentation_id' -
                # confirm against create_update_event_presentation_obj_v4().
                succeeded = True
                outline_entry = result
                event_presentation_id = result.get('event_presentation_id', event_presentation_id)
            else:
                succeeded = False

            if not succeeded:
                log.warning(f'Create or Update failed while trying create_update_event_presentation_obj_v4(): {result}')
                event_presentation_id = None
                if fail_any:
                    return False

            event_session_outline['event_presentation_list'].append(
                outline_entry if outline_entry is not None else event_presentation_id
            )
            # Legacy key: holds the last processed presentation only.
            event_session_outline['event_presentation_id'] = event_presentation_id

    if return_outline:
        log.debug(f'Returning the Event Session Outline: {event_session_outline}')
        return event_session_outline
    log.debug(f'Returning the Event Session ID: {event_session_id}')
    return event_session_id
# ### END ### API Event Session Methods ### create_update_event_session_obj_v4() ###
# ### BEGIN ### API Event Session Methods ### create_event_session_obj() ###
# Updated 2021-08-25
def create_event_session_obj(
    event_id: int|str,
    event_session_obj_new: Event_Session_Base,
    create_sub_obj: bool = False,
    fail_any: bool = False,  # Fail if any thing goes wrong for sub objects
) -> int|bool:
    """Insert a new event session and create its presentations.

    ``event_session_obj_new`` may be an ``Event_Session_Base`` or a plain dict
    (which is validated into one). The resolved ``event_id`` is written onto
    the inserted row. Each entry of ``event_presentation_list`` is then
    created with ``create_event_presentation_obj`` and linked to the new
    session.

    Returns:
        The value returned by ``sql_insert`` for the new session (used as its
        ID), or ``False`` when the event ID is invalid, validation fails, the
        insert fails, or - with ``fail_any`` - a presentation cannot be
        created.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    event_id = redis_lookup_id_random(record_id_random=event_id, table_name='event')
    if not event_id:
        return False

    log.debug(type(event_session_obj_new))
    if isinstance(event_session_obj_new, dict):
        try:
            event_session_obj_new = Event_Session_Base(**event_session_obj_new)
            log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(event_session_obj_new)
        except ValidationError as e:
            log.error(e.json())
            return False

    event_session_obj_data = event_session_obj_new.dict(
        by_alias=False,
        exclude_defaults=False,
        exclude_unset=True,
        exclude={'event_presentation', 'event_presentation_list', 'event_presenter', 'event_presenter_list', 'created_on', 'updated_on'},
    )
    # The event ID was validated above but never applied, so the session was
    # only linked to the event if the caller had also set it on the object.
    event_session_obj_data['event_id'] = event_id
    log.debug(event_session_obj_data)

    event_session_obj_in_result = sql_insert(data=event_session_obj_data, table_name='event_session', rm_id_random=True, id_random_length=8)
    if not event_session_obj_in_result:
        log.warning('Event Session not created.')
        log.debug(event_session_obj_in_result)
        return False
    event_session_id = event_session_obj_in_result

    created_event_presentation_ids = []  # One entry per presentation; None marks a failure.
    event_presentation_list = event_session_obj_new.event_presentation_list
    if event_presentation_list and isinstance(event_presentation_list, list):
        log.info(f'Event Presentation List was found. Loop through and create a new Event Presentation for each and link them to the new Event Session. Event Session ID: {event_session_id}')
        for event_presentation_obj_new in event_presentation_list:
            # NOTE: This does not account for an edge case where the presentation already exists. Possibly as part of another session.
            create_event_presentation_obj_result = create_event_presentation_obj(
                event_session_id = event_session_id,
                event_presentation_obj_new = event_presentation_obj_new,
                create_sub_obj = create_sub_obj,
                fail_any = fail_any,
            )
            if create_event_presentation_obj_result and isinstance(create_event_presentation_obj_result, int):
                event_presentation_id = create_event_presentation_obj_result
                log.info(f'Event Presentation created. Event Presentation ID: {event_presentation_id}')
            else:
                # Falsy results and truthy non-integer results are both failures.
                log.warning(f'Event Presentation not created. Event Session ID: {event_session_id}')
                log.debug(create_event_presentation_obj_result)
                event_presentation_id = None
                if fail_any:
                    return False
            created_event_presentation_ids.append(event_presentation_id)
        log.debug(created_event_presentation_ids)
    else:
        log.info('Event Presentation List not found')

    log.info(f'The Event Session has been created. Event Session ID: {event_session_id}')
    return event_session_id
# ### END ### API Event Session Methods ### create_event_session_obj() ###
# ### BEGIN ### API Event Session Methods ### update_event_session_obj_v3() ###
# Updated 2021-08-25
def update_event_session_obj_v3(
    event_session_id: int|str,
    event_session_obj_exist: Event_Session_Base,
    create_sub_obj: bool = False,
    fail_any: bool = False,  # Fail if any thing goes wrong for sub objects
) -> bool:
    """Update an existing event session and create or update its presentations.

    ``event_session_obj_exist`` may be an ``Event_Session_Base`` or a plain
    dict (which is validated into one). After the session row is updated,
    each entry of ``event_presentation_list`` is updated through
    ``update_event_presentation_obj_v3`` when it carries an ID, and created
    through ``create_event_presentation_obj`` otherwise.

    Returns:
        ``True`` once the session row is updated. ``False`` when the session
        ID is invalid, validation fails, the update fails, or - with
        ``fail_any`` - a presentation cannot be created or updated.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    event_session_id = redis_lookup_id_random(record_id_random=event_session_id, table_name='event_session')
    if not event_session_id:
        return False

    log.debug(type(event_session_obj_exist))
    if isinstance(event_session_obj_exist, dict):
        try:
            event_session_obj_exist = Event_Session_Base(**event_session_obj_exist)
            log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(event_session_obj_exist)
        except ValidationError as e:
            log.error(e.json())
            return False

    # Can't update the event_session_id alias if the .id was never set.
    if not event_session_obj_exist.id:
        event_session_obj_exist.id = event_session_id

    event_session_obj_data = event_session_obj_exist.dict(
        by_alias=False,
        exclude_defaults=False,
        exclude_unset=True,
        exclude={'event_presentation', 'event_presentation_list', 'event_presenter', 'event_presenter_list', 'created_on', 'updated_on'},
    )
    log.debug(event_session_obj_data)

    event_session_obj_up_result = sql_update(data=event_session_obj_data, table_name='event_session', rm_id_random=True)
    if not event_session_obj_up_result:
        log.warning('Event Session not updated.')
        log.debug(event_session_obj_up_result)
        return False

    processed_event_presentation_ids = []  # One entry per presentation; None marks a failure.
    event_presentation_list = event_session_obj_exist.event_presentation_list
    if event_presentation_list and isinstance(event_presentation_list, list):
        for event_presentation_obj_unknown in event_presentation_list:
            log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(event_presentation_obj_unknown)

            # Entries may be plain dicts or model instances; only dicts have
            # .get(). For models the ID is read from .id, as
            # create_update_event_session_obj_v4() does.
            if isinstance(event_presentation_obj_unknown, dict):
                event_presentation_id = event_presentation_obj_unknown.get('event_presentation_id_random', None)
            else:
                event_presentation_id = getattr(event_presentation_obj_unknown, 'id', None)

            if event_presentation_id:
                update_event_presentation_obj_result = update_event_presentation_obj_v3(
                    event_presentation_id = event_presentation_id,
                    event_presentation_obj_exist = event_presentation_obj_unknown,
                    create_sub_obj = create_sub_obj,
                    fail_any = fail_any,
                )
                if update_event_presentation_obj_result:
                    # bool is a subclass of int: a plain True only signals
                    # success, so keep the ID we already have in that case.
                    if isinstance(update_event_presentation_obj_result, int) and not isinstance(update_event_presentation_obj_result, bool):
                        event_presentation_id = update_event_presentation_obj_result
                    log.info(f'Event Presentation updated. Event Presentation ID: {event_presentation_id}')
                else:
                    # The original re-checked isinstance(result, int) after
                    # this branch, which relabelled False as "updated".
                    log.warning(f'Event Presentation not updated. Event Session ID: {event_session_id}')
                    log.debug(update_event_presentation_obj_result)
                    event_presentation_id = None
                    if fail_any:
                        return False
            else:
                log.info('No Event Presentation ID found.')
                create_event_presentation_obj_result = create_event_presentation_obj(
                    event_session_id = event_session_id,
                    event_presentation_obj_new = event_presentation_obj_unknown,
                    create_sub_obj = create_sub_obj,
                    fail_any = fail_any,
                )
                if create_event_presentation_obj_result and isinstance(create_event_presentation_obj_result, int):
                    event_presentation_id = create_event_presentation_obj_result
                    log.info(f'Event Presentation created. Event Presentation ID: {event_presentation_id}')
                else:
                    log.warning(f'Event Presentation not created. Event Session ID: {event_session_id}')
                    log.debug(create_event_presentation_obj_result)
                    event_presentation_id = None
                    if fail_any:
                        return False
            processed_event_presentation_ids.append(event_presentation_id)
        log.debug(processed_event_presentation_ids)
    else:
        log.info('Event Presentation List not found or not in a list.')

    log.info(f'The event session has been updated. Event Session ID: {event_session_id}')
    return True
# ### END ### API Event Session Methods ### update_event_session_obj_v3() ###
# ### BEGIN ### API Event Session Methods ### update_event_session_obj() ###
# This will be taken over by _exist version
def update_event_session_obj(
    event_session_id: int|str,  # Ideally the int ID should be passed. This allows for updating of the id_random value.
    event_session_obj_up: Event_Session_Base,
    create_sub_obj: bool = False,
) -> bool:
    """Update an event session together with its point of contact and presentation.

    Older update path (the comment above the function says the ``_exist``
    version is meant to take over). The point-of-contact event person and the
    single ``event_presentation`` on the object are each updated when their ID
    is set, or created when the sub-object has no ID yet. The session row is
    updated last.

    Returns:
        ``True`` when the session row was updated. ``False`` when the session
        ID is invalid or any create/update step fails.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    event_session_id = redis_lookup_id_random(record_id_random=event_session_id, table_name='event_session')
    if not event_session_id:
        return False

    event_session_obj_up.id = event_session_id
    log.debug(event_session_obj_up)
    log.debug(event_session_obj_up.dict(by_alias=False, exclude_unset=True))

    # --- Point of contact (event person) ---
    if event_session_obj_up.poc_event_person_id and event_session_obj_up.poc_event_person:
        log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        poc_event_person_id = event_session_obj_up.poc_event_person_id
        poc_event_person_obj_up = event_session_obj_up.poc_event_person
        log.debug(poc_event_person_id)
        log.debug(poc_event_person_obj_up)
        poc_event_person_obj_up_result = update_event_person_obj(
            event_person_id=poc_event_person_id,
            event_person_obj_up=poc_event_person_obj_up,
            create_sub_obj=create_sub_obj,
        )
        log.debug(poc_event_person_obj_up_result)
        if not poc_event_person_obj_up_result:
            return False
    elif event_session_obj_up.poc_event_person and not event_session_obj_up.poc_event_person.id:
        # NOTE: This will blindly create a new poc_event_person even if there was one associated but the event_session.poc_event_person_id was not found.
        # create_event_person_obj was never imported by this module, so this
        # branch used to raise NameError.
        # NOTE(review): assumed to live next to load/update_event_person_obj
        # in event_person_methods - confirm.
        from app.methods.event_person_methods import create_event_person_obj
        poc_event_person_obj_in = event_session_obj_up.poc_event_person
        log.debug(poc_event_person_obj_in)
        poc_event_person_obj_in_result = create_event_person_obj(event_person_obj_new=poc_event_person_obj_in)
        log.debug(poc_event_person_obj_in_result)
        if not poc_event_person_obj_in_result:
            return False
        event_session_obj_up.poc_event_person_id = poc_event_person_obj_in_result

    # --- Single event presentation ---
    if event_session_obj_up.event_presentation_id and event_session_obj_up.event_presentation:
        log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        event_presentation_id = event_session_obj_up.event_presentation_id
        event_presentation_obj_up = event_session_obj_up.event_presentation
        log.debug(event_presentation_id)
        log.debug(event_presentation_obj_up)
        # The original called update_event_presentation_obj(), which this
        # module does not import (NameError). The imported _v3 updater is
        # used instead.
        # NOTE(review): confirm _v3 accepts the same object here.
        event_presentation_obj_up_result = update_event_presentation_obj_v3(
            event_presentation_id=event_presentation_id,
            event_presentation_obj_exist=event_presentation_obj_up,
            create_sub_obj=create_sub_obj,
        )
        log.debug(event_presentation_obj_up_result)
        if not event_presentation_obj_up_result:
            return False
    elif event_session_obj_up.event_presentation and not event_session_obj_up.event_presentation.id:
        # NOTE: This will blindly create a new event_presentation even if there was one associated but the event_session.event_presentation_id was not found.
        event_presentation_obj_in = event_session_obj_up.event_presentation
        log.debug(event_presentation_obj_in)
        # The original passed the undefined name `event_session` here.
        event_presentation_obj_in_result = create_event_presentation_obj(
            event_session_id=event_session_id,
            event_presentation_obj_new=event_presentation_obj_in,
        )
        log.debug(event_presentation_obj_in_result)
        if not event_presentation_obj_in_result:
            return False
        event_session_obj_up.event_presentation_id = event_presentation_obj_in_result

    # --- Session row ---
    # NOTE(review): 'poc_event_person' and 'event_presentation_list' are not
    # in the exclude set, so they reach sql_update() if set - confirm that is
    # intended.
    event_session_dict_up = event_session_obj_up.dict(
        by_alias=False,
        exclude_unset=True,
        exclude={'event_abstract', 'event_abstract_list', 'event_file_list', 'event_person', 'event_presentation', 'event_session', 'person', 'user'},
    )
    log.debug(event_session_dict_up)

    event_session_obj_up_result = sql_update(data=event_session_dict_up, table_name='event_session', rm_id_random=True)
    log.debug(event_session_obj_up_result)
    return bool(event_session_obj_up_result)
# ### END ### API Event Session Methods ### update_event_session_obj() ###