import datetime from typing import Dict, List, Optional, Set, Union from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator from app.db_sql import redis_lookup_id_random, sql_enable_part, sql_insert, sql_limit_offset_part, sql_select, sql_update from app.lib_general import log, logging, logger_reset # from app.methods.event_methods import load_event_obj from app.methods.event_file_methods import load_event_file_obj_list from app.methods.event_location_methods import load_event_location_obj, get_event_location_rec_list from app.methods.event_person_methods import load_event_person_obj, update_event_person_obj from app.methods.event_presentation_methods import create_event_presentation_obj, create_update_event_presentation_obj_v4, load_event_presentation_obj, update_event_presentation_obj_v3 # from app.methods.event_presenter_methods import load_event_presenter_obj from app.methods.person_methods import load_person_obj # from app.methods.user_methods import load_user_obj from app.models.event_session_models import Event_Session_Base # ### BEGIN ### API Event Session Methods ### load_event_session_obj() ### # Updated 2022-09-23 @logger_reset def load_event_session_obj( event_session_id: int|str, enabled: str = 'enabled', # enabled, disabled, all hidden: str = 'not_hidden', # hidden, not_hidden, all inc_file_count: bool = False, # NOTE: file counts are from separate views event_file_file_purpose_id: int = None, event_file_file_purpose: str = None, event_file_priority: bool = None, event_file_group: str = None, inc_address: bool = False, inc_contact: bool = False, inc_event_abstract_list: bool = False, inc_event_badge_list: bool = False, inc_event_device_list: bool = False, inc_event_file_list: bool = False, inc_event_file_internal_use_list: bool = False, inc_event_location: bool = False, inc_event_location_list: bool = False, inc_event_person: bool = False, inc_event_person_profile: bool = False, inc_event_person_list: bool = False, inc_event_presentation_list: bool = False, inc_event_presenter_cat: bool = False, # Concatenate presenter names inc_event_presenter_list: bool = False, inc_event_registration_list: bool = False, inc_event_track: bool = False, inc_hosted_file: bool = False, inc_poc_event_person: bool = False, inc_person: bool = False, inc_user: bool = False, limit: int = 1000, offset: int = 0, by_alias: bool = True, exclude_unset: bool = True, model_as_dict: bool = False, ) -> Event_Session_Base|bool: log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if event_session_id := redis_lookup_id_random(record_id_random=event_session_id, table_name='event_session'): pass else: return False if inc_file_count: log.info('Using view with file count') if event_session_rec := sql_select(table_name='v_event_session_w_file_count', record_id=event_session_id): pass else: return False else: if event_session_rec := sql_select(table_name='v_event_session', record_id=event_session_id): pass else: return False log.debug(event_session_rec) try: event_session_obj = Event_Session_Base(**event_session_rec) log.debug(event_session_obj) except ValidationError as e: log.error(e.json()) return False account_id = event_session_rec.get('account_id', None) event_id = event_session_rec.get('event_id', None) event_location_id = event_session_rec.get('event_location_id', None) event_track_id = event_session_rec.get('event_track_id', None) poc_event_person_id = event_session_rec.get('poc_event_person_id', None) #if inc_event: pass if inc_event_abstract_list: pass if inc_event_badge_list: pass if inc_event_device_list: pass # Updated 2021-10-21 if inc_event_file_list: log.info('Need to include event file list...') from app.methods.event_file_methods import get_event_file_rec_list, load_event_file_obj if event_file_rec_list_result := get_event_file_rec_list( for_type = 'event_session', for_id = event_session_id, file_purpose_id = event_file_file_purpose_id, file_purpose = event_file_file_purpose, internal_use = False, priority = event_file_priority, group = event_file_group, enabled = enabled, limit = limit, ): event_file_result_list = [] for event_file_rec in event_file_rec_list_result: if load_event_file_result := load_event_file_obj( event_file_id = event_file_rec.get('event_file_id', None), enabled = enabled, inc_hosted_file = inc_hosted_file, ): event_file_result_list.append(load_event_file_result) else: event_file_result_list.append(None) log.debug(event_file_result_list) event_session_obj.event_file_list = event_file_result_list elif isinstance(event_file_rec_list_result, list): event_session_obj.event_file_list = [] else: event_session_obj.event_file_list = None if inc_event_file_internal_use_list: log.info('Need to include event file internal use list...') from app.methods.event_file_methods import get_event_file_rec_list, load_event_file_obj if event_file_rec_list_result := get_event_file_rec_list( for_type = 'event_session', for_id = event_session_id, file_purpose_id = event_file_file_purpose_id, file_purpose = event_file_file_purpose, internal_use = True, priority = event_file_priority, group = event_file_group, enabled = enabled, limit = limit, ): event_file_result_list = [] for event_file_rec in event_file_rec_list_result: if load_event_file_result := load_event_file_obj( event_file_id = event_file_rec.get('event_file_id', None), enabled = enabled, inc_hosted_file = inc_hosted_file, ): event_file_result_list.append(load_event_file_result) else: event_file_result_list.append(None) log.debug(event_file_result_list) event_session_obj.event_file_internal_use_list = event_file_result_list elif isinstance(event_file_rec_list_result, list): event_session_obj.event_file_internal_use_list = [] else: event_session_obj.event_file_internal_use_list = None log.debug(f'Get event location? Include Event Location: {inc_event_location} Event Location ID: {event_location_id}') if inc_event_location and event_location_id: log.info('Need to include event location...') if event_location_obj := load_event_location_obj( event_location_id = event_location_id, enabled = enabled, ): event_session_obj.event_location = event_location_obj.dict(by_alias=True, exclude_unset=True) else: event_session_obj.event_location = None else: event_session_obj.event_location = None # Updated 2022-09-20 if inc_event_location_list: # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.info('Need to include event location list...') if event_location_rec_list_result := get_event_location_rec_list( event_id = event_id, enabled = enabled, # enabled, disabled, all hidden = 'all', # hidden, not_hidden, all limit = limit, ): event_location_result_list = [] for event_location_rec in event_location_rec_list_result: if load_event_location_result := load_event_location_obj( event_location_id = event_location_rec.get('event_location_id', None), enabled = enabled, limit = limit, ): event_location_result_list.append(load_event_location_result) else: event_location_result_list.append(None) log.debug(event_location_result_list) event_session_obj.event_location_list = event_location_result_list elif isinstance(event_location_rec_list_result, list): event_session_obj.event_location_list = [] else: event_session_obj.event_location_list = None if inc_event_person_list: pass if inc_event_presentation_list: log.info('Need to include event presentation list...') data = {} data['event_session_id'] = event_session_id if hidden in ['hidden', 'not_hidden', 'all']: if hidden == 'hidden': data['hide'] = True sql_hidden = f'AND `event_presentation`.hide = :hide' elif hidden == 'not_hidden': data['hide'] = False sql_hidden = f'AND `event_presentation`.hide = :hide' elif hidden == 'all': sql_hidden = '' if enabled in ['enabled', 'disabled', 'all']: if enabled == 'enabled': data['enable'] = True sql_enabled = f'AND `event_presentation`.enable = :enable' elif enabled == 'disabled': data['enable'] = False sql_enabled = f'AND `event_presentation`.enable = :enable' elif enabled == 'all': sql_enabled = '' # else: event_obj['event_session'] = None # if limit: # data['limit'] = limit # sql_limit = f'LIMIT :limit' # else: # sql_limit = '' sql = f""" SELECT `event_presentation`.id AS 'event_presentation_id', `event_presentation`.id_random AS 'event_presentation_id_random' FROM `event_presentation` AS `event_presentation` WHERE `event_presentation`.event_session_id = :event_session_id {sql_hidden} {sql_enabled} ORDER BY `event_presentation`.priority DESC, -`event_presentation`.sort DESC, `event_presentation`.start_datetime ASC, `event_presentation`.name ASC, `event_presentation`.created_on DESC, `event_presentation`.updated_on DESC; """ if event_presentation_rec_li_result := sql_select(data=data, sql=sql, as_list=True): log.debug(event_presentation_rec_li_result) event_presentation_obj_li = [] for event_presentation_rec in event_presentation_rec_li_result: event_presentation_id = event_presentation_rec.get('event_presentation_id', None) if event_presentation_obj := load_event_presentation_obj( event_presentation_id = event_presentation_id, enabled = enabled, # review = review, # approved = approved, hidden = hidden, inc_file_count = inc_file_count, inc_address = inc_address, inc_contact = inc_contact, inc_event_abstract_list = inc_event_abstract_list, inc_event_device_list = inc_event_device_list, inc_event_file_list = inc_event_file_list, inc_event_person = inc_event_person, inc_event_person_profile = inc_event_person_profile, inc_event_person_list = inc_event_person_list, inc_event_presenter_list = inc_event_presenter_list, inc_person = inc_person, inc_user = inc_user ): data = event_presentation_obj.dict(by_alias=True, exclude_unset=True) event_presentation_obj_li.append(data) log.debug(event_presentation_obj_li) event_session_obj.event_presentation_list = event_presentation_obj_li else: log.debug(event_presentation_rec_li_result) event_session_obj.event_presentation_list = [] if inc_event_presenter_cat: # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.info(f'Need to get event presenter cat list for session. Event Session ID: {event_session_id}') data = {} data['event_session_id'] = event_session_id sql = f""" SELECT event_session_id, GROUP_CONCAT(given_name, ' ', family_name SEPARATOR '; ') AS 'event_presenter_names' FROM event_presenter WHERE event_session_id = :event_session_id GROUP BY event_session_id LIMIT 1 """ if event_presenter_cat_rec_result := sql_select(data=data, sql=sql): log.debug(event_presenter_cat_rec_result.get('event_presenter_names', None)) event_session_obj.event_presenter_cat = event_presenter_cat_rec_result.get('event_presenter_names', None) else: log.warning(event_presenter_cat_rec_result) event_session_obj.event_presenter_cat = None if inc_event_presenter_list: pass if inc_poc_event_person: poc_event_person_obj = load_event_person_obj( event_person_id = poc_event_person_id, inc_address = inc_address, inc_contact = inc_contact, inc_person = inc_person, inc_user = inc_user, ) log.debug(poc_event_person_obj) event_session_obj.poc_event_person = poc_event_person_obj log.debug(event_session_obj) if model_as_dict: return event_session_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member else: return event_session_obj # ### END ### API Event Session Methods ### load_event_session_obj() ### # ### BEGIN ### API Event Session Methods ### get_event_session_rec_list() ### # Updated 2022-09-22 @logger_reset def get_event_session_rec_list( event_id: str = None, event_location_id: str = None, event_track_id: str = None, enabled: str = 'enabled', # enabled, disabled, all approved: str = 'all', # approved, not_approved, all hidden: str = 'not_hidden', # hidden, not_hidden, all review: str = 'all', # ready, not_ready, all limit: int = 100, offset: int = 0, ) -> list|bool: log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if event_id := redis_lookup_id_random(record_id_random=event_id, table_name='event'): pass # else: return False if event_location_id := redis_lookup_id_random(record_id_random=event_location_id, table_name='event_location'): pass # else: return False data = {} if event_id: data[f'event_id'] = event_id sql_where_type_id = f'`event_session`.event_id = :event_id' elif event_location_id: data[f'event_location_id'] = event_location_id sql_where_type_id = f'`event_session`.event_location_id = :event_location_id' if review in ['ready', 'not_ready', 'all']: if review == 'ready': data['review'] = True sql_review = f'AND `event_session`.review = :review' elif review == 'not_ready': data['review'] = False sql_review = f'AND `event_session`.review = :review' elif review == 'all': sql_review = '' if approved in ['approved', 'not_approved', 'all']: if approved == 'approved': data['approve'] = True sql_approved = f'AND `event_session`.approve = :approve' elif approved == 'not_approved': data['approve'] = False sql_approved = f'AND `event_session`.approve = :approve' elif approved == 'all': sql_approved = '' if hidden in ['hidden', 'not_hidden', 'all']: if hidden == 'hidden': data['hide'] = True sql_hidden = f'AND `event_session`.hide = :hide' elif hidden == 'not_hidden': data['hide'] = False sql_hidden = f'AND `event_session`.hide = :hide' elif hidden == 'all': sql_hidden = '' if enabled in ['enabled', 'disabled', 'all']: if enabled == 'enabled': data['enable'] = True sql_enabled = f'AND `event_session`.enable = :enable' elif enabled == 'disabled': data['enable'] = False sql_enabled = f'AND `event_session`.enable = :enable' elif enabled == 'all': sql_enabled = '' if limit: data['limit'] = limit sql_limit = f'LIMIT :limit' else: sql_limit = '' sql = f""" SELECT `event_session`.id AS 'event_session_id', `event_session`.id_random AS 'event_session_id_random' FROM `event_session` AS `event_session` WHERE {sql_where_type_id} {sql_review} {sql_approved} {sql_hidden} {sql_enabled} ORDER BY `event_session`.priority DESC, -`event_session`.sort DESC, `event_session`.start_datetime ASC, `event_session`.name ASC, `event_session`.created_on DESC, `event_session`.updated_on DESC {sql_limit}; """ log.debug(sql) if event_session_rec_li_result := sql_select(data=data, sql=sql, as_list=True): log.info('Got a list result') event_session_rec_li = event_session_rec_li_result else: # [] or False log.info('No results or something went wrong') event_session_rec_li = event_session_rec_li_result log.debug(event_session_rec_li_result) return event_session_rec_li # ### END ### API Event Session Methods ### get_event_session_rec_list() ### # ### BEGIN ### API Event Session Methods ### create_update_event_session_obj_v4() ### # NOTE: This will create or update a event_session. # Rewrite and updated 2021-08-25 @logger_reset def create_update_event_session_obj_v4( event_session_dict_obj: Event_Session_Base|dict, event_session_id: int|str = None, event_id: int|str = None, event_location_id: int|str = None, # For future? # event_track_id: int|str = None, # For future? create_sub_obj: bool = False, fail_any: bool = False, # Fail if any thing goes wrong for sub objects return_outline: bool = False, ) -> int|bool: log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) log.info('Checking requirements...') if event_session_id: log.info(f'Event Session ID passed. Update existing Event Session. Event Session ID: {event_session_id}') if event_session_id := redis_lookup_id_random(record_id_random=event_session_id, table_name='event_session'): pass else: log.error('Event Session ID passed but is invalid. Failed requirement.') return False if event_id := redis_lookup_id_random(record_id_random=event_id, table_name='event'): pass else: log.error('Missing or invalid Event ID passed. Not required. Ignoring.') # log.info(f'Event ID: {event_id}') # log.info('Attempting to get Event ID from related object.') # from app.methods.event_methods import get_event_id_w_for_type_id # if event_id := get_event_id_w_for_type_id(for_type='event_session', for_id=event_session_id): pass # else: # log.error('Unable to get Event ID from related object.') # False else: log.info('No Event Session ID passed. Create new Event Session. Required: Account ID, Event ID') if event_id := redis_lookup_id_random(record_id_random=event_id, table_name='event'): pass else: log.error('Missing or invalid Event ID passed. Failed requirement.') log.info(f'Event ID: {event_id}') return False log.debug(type(event_session_dict_obj)) if isinstance(event_session_dict_obj, dict): event_session_dict = event_session_dict_obj if event_session_id: event_session_dict['event_session_id'] = event_session_id if event_id: event_session_dict['event_id'] = event_id try: event_session_obj = Event_Session_Base(**event_session_dict) log.warning(event_session_obj) except ValidationError as e: log.error(e.json()) return False else: event_session_obj = event_session_dict_obj if event_session_id: # NOTE: Can't update the ID alias if it was never set. event_session_obj.id = event_session_id if event_id: event_session_obj.event_id = event_id event_session_dict = event_session_obj.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'event_presentation', 'event_presentation_list', 'created_on', 'updated_on'}) if event_session_id: if event_session_dict_up_result := sql_update(data=event_session_dict, table_name='event_session', rm_id_random=True): pass else: log.warning(f'Event Session not updated. Event Session ID: {event_session_id}') log.debug(event_session_dict_up_result) return False log.debug(event_session_dict_up_result) else: if event_session_dict_in_result := sql_insert(data=event_session_dict, table_name='event_session', rm_id_random=True, id_random_length=8): pass else: log.warning(f'Event Session not created.') log.debug(event_session_dict_in_result) return False log.debug(event_session_dict_in_result) event_session_id = event_session_dict_in_result log.debug(event_session_id) event_session_outline = {} event_session_outline['event_session_id'] = event_session_id event_session_outline['event_presentation_list'] = [] if event_session_obj.event_presentation_list and isinstance(event_session_obj.event_presentation_list, list): log.info(f'Event Presentation List was found. Loop through and create a new Event Presentation for each and link them to the new Event Session. Event Session ID: {event_session_id}') for event_presentation_obj in event_session_obj.event_presentation_list: # NOTE: Use object model version because of better type checking and validations log.debug(event_presentation_obj) if event_presentation_id := event_presentation_obj.id: pass else: event_presentation_id = None # event_presentation_obj.event_id = event_id # event_presentation_obj.event_session_id = event_session_id create_update_event_presentation_obj_result = create_update_event_presentation_obj_v4( event_presentation_dict_obj = event_presentation_obj, event_presentation_id = event_presentation_id, event_id = event_id, # event_location_id = event_location_id, event_session_id = event_session_id, # event_track_id = event_track_id, fail_any = fail_any, return_outline = return_outline, ) if isinstance(create_update_event_presentation_obj_result, int): event_presentation_id = create_update_event_presentation_obj_result elif create_update_event_presentation_obj_result == True: pass else: log.warning(f'Create or Update failed while trying create_update_event_presentation_obj_v4(): {create_update_event_presentation_obj_result}') event_presentation_id = None event_session_outline['event_presentation_id'] = event_presentation_id if return_outline: log.debug(f'Returning the Event Session Outline: {event_session_outline}') return event_session_outline else: log.debug(f'Returning the Event Session ID: {event_session_id}') return event_session_id # ### END ### API Event Session Methods ### create_update_event_session_obj_v4() ### # ### BEGIN ### API Event Session Methods ### create_event_session_obj() ### # Updated 2022-04-12 @logger_reset def create_event_session_obj( event_id: int|str, event_session_obj_new: Event_Session_Base, create_sub_obj: bool = False, fail_any: bool = False, # Fail if any thing goes wrong for sub objects ) -> bool: log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if event_id := redis_lookup_id_random(record_id_random=event_id, table_name='event'): pass else: return False log.debug(type(event_session_obj_new)) if isinstance(event_session_obj_new, dict): event_session_dict = event_session_obj_new try: event_session_obj = Event_Session_Base(**event_session_dict) except ValidationError as e: log.error(e.json()) return False log.debug(event_session_obj) else: event_session_dict = event_session_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'event_presentation', 'event_presentation_list', 'event_presenter', 'event_presenter_list', 'created_on', 'updated_on'}) event_session_dict['event_id'] = event_id log.debug(event_session_dict) if event_session_obj_in_result := sql_insert(data=event_session_dict, table_name='event_session', rm_id_random=True, id_random_length=8): pass else: log.warning(f'Event Session not created.') log.debug(event_session_obj_in_result) return False event_session_id = event_session_obj_in_result return_dict = {} return_dict['event_session_id'] = None return_dict['event_presentation_list'] = [] if event_session_dict.get('event_presentation_list') and isinstance(event_session_dict.get('event_presentation_list'), list): log.info(f'Event Presentation List was found. Loop through and create a new Event Presentation for each and link them to the new Event Session. Event Session ID: {event_session_id}') for event_presentation_obj in event_session_dict.get('event_presentation_list'): # NOTE: This does not account for an edge case where the presentation already exists. Possibly as part of another session. if create_event_presentation_obj_result := create_event_presentation_obj( event_session_id = event_session_id, event_presentation_obj_new = event_presentation_obj, create_sub_obj = create_sub_obj, fail_any = fail_any, ): if isinstance(create_event_presentation_obj_result, int): event_presentation_id = create_event_presentation_obj_result log.info(f'Event Presentation created. Event Presentation ID: {event_presentation_id}') else: log.warning(f'Event Presentation not created. Event Session ID: {event_session_id}') log.debug(create_event_presentation_obj_result) event_presentation_id = None if fail_any: return False else: log.warning(f'Event Presentation not created. Event Session ID: {event_session_id}') log.debug(create_event_presentation_obj_result) event_presentation_id = None if fail_any: return False return_dict['event_presentation_list'].append(event_presentation_id) else: log.info('Event Presentation List not found') pass log.info(f'The Event Session has been created. Event Session ID: {event_session_id}') return event_session_id # ### END ### API Event Session Methods ### create_event_session_obj() ### # ### BEGIN ### API Event Session Methods ### update_event_session_obj_v3() ### # Updated 2022-04-12 @logger_reset def update_event_session_obj_v3( event_session_id: int|str, event_session_obj_exist: Event_Session_Base, create_sub_obj: bool = False, fail_any: bool = False, # Fail if any thing goes wrong for sub objects ) -> bool: log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if event_session_id := redis_lookup_id_random(record_id_random=event_session_id, table_name='event_session'): pass else: return False log.debug(type(event_session_obj_exist)) if isinstance(event_session_obj_exist, dict): event_session_dict = event_session_obj_exist try: event_session_obj = Event_Session_Base(**event_session_obj_exist) except ValidationError as e: log.error(e.json()) return False log.debug(event_session_obj) else: event_session_dict = event_session_obj_exist.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'event_presentation', 'event_presentation_list', 'event_presenter', 'event_presenter_list', 'created_on', 'updated_on'}) # Can't update the event_session_id alias if the .id was never set. # event_session_obj.event_session_id = event_session_id if not event_session_obj.id: event_session_obj.id = event_session_id # event_presenter_dict['event_id'] = event_id # if event_track_id: # event_presenter_dict['for_type'] = 'event_track' # event_presenter_dict['for_id'] = event_track_id # event_presenter_dict['event_track_id'] = event_track_id log.debug(event_session_dict) if event_session_obj_up_result := sql_update(data=event_session_dict, table_name='event_session', record_id=event_session_id, rm_id_random=True): pass else: log.warning(f'Event Session not updated.') log.debug(event_session_obj_up_result) return False return_dict = {} return_dict['event_session_id'] = event_session_id return_dict['event_presentation_list'] = [] if event_session_obj.event_presentation_list and isinstance(event_session_obj.event_presentation_list, list): for event_presentation_obj_unknown in event_session_obj.event_presentation_list: log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(event_presentation_obj_unknown) if event_presentation_id := event_presentation_obj_unknown.get('event_presentation_id_random', None): if update_event_presentation_obj_result := update_event_presentation_obj_v3( event_presentation_id = event_presentation_id, event_presentation_obj_exist = event_presentation_obj_unknown, create_sub_obj = create_sub_obj, fail_any = fail_any, ): event_presentation_id = update_event_presentation_obj_result log.info(f'Event Presentation updated. Event Presentation ID: {event_presentation_id}') else: log.warning(f'Event Presentation not updated. Event Session ID: {event_session_id}') log.debug(update_event_presentation_obj_result) event_presentation_id = None if fail_any: return False if isinstance(update_event_presentation_obj_result, int): event_presentation_id = update_event_presentation_obj_result log.info(f'Event Presentation updated. Event Presentation ID: {event_presentation_id}') else: log.warning(f'Event Presentation not updated. Event Session ID: {event_session_id}') log.debug(update_event_presentation_obj_result) event_presentation_id = None if fail_any: return False else: log.info(f'No Event Presentation ID found.') if create_event_presentation_obj_result := create_event_presentation_obj( event_session_id = event_session_id, event_presentation_obj_new = event_presentation_obj_unknown, create_sub_obj = create_sub_obj, fail_any = fail_any, ): if isinstance(create_event_presentation_obj_result, int): event_presentation_id = create_event_presentation_obj_result log.info(f'Event Presentation created. Event Presentation ID: {event_presentation_id}') else: log.warning(f'Event Presentation not created. Event Session ID: {event_session_id}') log.debug(create_event_presentation_obj_result) event_presentation_id = None if fail_any: return False else: log.warning(f'Event Presentation not created. Event Session ID: {event_session_id}') log.debug(create_event_presentation_obj_result) event_presentation_id = None if fail_any: return False return_dict['event_presentation_list'].append(event_presentation_id) else: log.info('Event Presentation List not found or not in a list.') pass log.info(f'The event session has been updated. Event Session ID: {event_session_id}') return True # ### END ### API Event Session Methods ### update_event_session_obj_v3() ### # ### BEGIN ### API Event Session Methods ### update_event_session_obj() ### # This will be taken over by _exist version # Updated 2022-04-12 @logger_reset def update_event_session_obj( event_session_id: int|str, # Ideally the int ID should be passed. This allows for updating of the id_random value. event_session_obj_up: Event_Session_Base, create_sub_obj: bool = False, ) -> bool: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if event_session_id := redis_lookup_id_random(record_id_random=event_session_id, table_name='event_session'): pass else: return False event_session_obj_up.id = event_session_id log.debug(event_session_obj_up) # log.debug(event_session_obj_up.dict(by_alias=True, exclude_unset=True)) log.debug(event_session_obj_up.dict(by_alias=False, exclude_unset=True)) # log.debug(event_session_obj_up.dict(by_alias=False, exclude_unset=False)) if event_session_obj_up.poc_event_person_id and event_session_obj_up.poc_event_person: log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL poc_event_person_id = event_session_obj_up.poc_event_person_id poc_event_person_obj_up = event_session_obj_up.poc_event_person log.debug(poc_event_person_id) log.debug(poc_event_person_obj_up) if poc_event_person_obj_up_result := update_event_person_obj( event_person_id=poc_event_person_id, event_person_obj_up=poc_event_person_obj_up, create_sub_obj=create_sub_obj, ): log.debug(poc_event_person_obj_up_result) else: log.debug(poc_event_person_obj_up_result) return False elif event_session_obj_up.poc_event_person and not event_session_obj_up.poc_event_person.id: # NOTE: This will blindly create a new poc_event_person even if there was one associated but the event_session.poc_event_person_id was not found. poc_event_person_obj_in = event_session_obj_up.poc_event_person log.debug(poc_event_person_obj_in) if poc_event_person_obj_in_result := create_event_person_obj(event_person_obj_new=poc_event_person_obj_in): # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(poc_event_person_obj_in_result) event_session_obj_up.poc_event_person_id = poc_event_person_obj_in_result else: # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(poc_event_person_obj_in_result) return False if event_session_obj_up.event_presentation_id and event_session_obj_up.event_presentation: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL event_presentation_id = event_session_obj_up.event_presentation_id event_presentation_obj_up = event_session_obj_up.event_presentation log.debug(event_presentation_id) log.debug(event_presentation_obj_up) if event_presentation_obj_up_result := update_event_presentation_obj( event_presentation_id=event_presentation_id, event_presentation_obj_up=event_presentation_obj_up, create_sub_obj=create_sub_obj, ): log.debug(event_presentation_obj_up_result) else: log.debug(event_presentation_obj_up_result) return False elif event_session_obj_up.event_presentation and not event_session_obj_up.event_presentation.id: # NOTE: This will blindly create a new event_presentation even if there was one associated but the event_session.event_presentation_id was not found. event_presentation_obj_in = event_session_obj_up.event_presentation log.debug(event_presentation_obj_in) if event_presentation_obj_in_result := create_event_presentation_obj(event_session_id=event_session, event_presentation_obj_new=event_presentation_obj_in): # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(event_presentation_obj_in_result) event_session_obj_up.event_presentation_id = event_presentation_obj_in_result else: # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(event_presentation_obj_in_result) return False event_session_dict_up = event_session_obj_up.dict(by_alias=False, exclude_unset=True, exclude={'event_abstract', 'event_abstract_list', 'event_file_list', 'event_person', 'event_presentation', 'event_session', 'person', 'user'}) log.debug(event_session_dict_up) if event_session_obj_up_result := sql_update(data=event_session_dict_up, table_name='event_session', rm_id_random=True): log.debug(event_session_obj_up_result) return True else: log.debug(event_session_obj_up_result) return False # ### END ### API Session Methods ### update_event_session_obj() ###