Files
OSIT-AE-API-FastAPI/app/methods/event_methods.py
2022-08-17 18:49:37 -04:00

1029 lines
43 KiB
Python

import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging, logger_reset
from app.methods.address_methods import load_address_obj
from app.methods.contact_methods import load_contact_obj
from app.methods.event_cfg_methods import create_update_event_cfg_obj_v4, load_event_cfg_obj
from app.methods.event_location_methods import get_event_location_rec_list, load_event_location_obj
from app.methods.event_session_methods import get_event_session_rec_list, load_event_session_obj
from app.methods.person_methods import create_update_person_obj_v4b, load_person_obj
# from app.methods.user_methods import create_user_obj, load_user_obj, update_user_obj
from app.models.common_field_schema import default_num_bytes
from app.models.event_models import Event_Base
from app.models.event_cfg_models import Event_Cfg_Base
# ### BEGIN ### API Event Methods ### load_event_obj() ###
def _load_child_obj_list(rec_list, id_key, load_obj, **load_kwargs):
    """Load one child object for every record in ``rec_list``.

    ``load_obj`` is called once per record with ``id_key`` set to the record's
    value for that key, plus ``load_kwargs``.

    Returns:
        * a list with one entry per record (``None`` where the loader returned
          a falsy value) when ``rec_list`` is non-empty,
        * ``[]`` when ``rec_list`` is an empty list,
        * ``None`` for any other falsy ``rec_list`` (e.g. ``False``).
    """
    if rec_list:
        return [
            load_obj(**{id_key: rec.get(id_key, None)}, **load_kwargs) or None
            for rec in rec_list
        ]
    if isinstance(rec_list, list):
        return []
    return None


@logger_reset
def load_event_obj(
    event_id: int|str,
    enabled: str = 'enabled',  # enabled, disabled, all
    approved: str = 'all',  # approved, not_approved, all
    hidden: str = 'not_hidden',  # hidden, not_hidden, all
    review: str = 'all',  # ready, not_ready, all
    inc_file_count: bool = False,  # NOTE: file counts are from separate views
    event_file_file_purpose_id: int = None,
    event_file_file_purpose: str = None,
    event_file_priority: bool = None,
    event_file_group: str = None,
    inc_address: bool = False,  # Loads address_location and under contact(s)
    inc_contact: bool = False,  # Loads all 3 contacts
    inc_event_abstract_list: bool = False,
    inc_event_badge_list: bool = False,
    inc_event_cfg: bool = False,
    inc_event_device_list: bool = False,
    inc_event_exhibit_list: bool = False,
    inc_event_file_list: bool = False,
    inc_event_location: bool = False,  # For event_session child object
    inc_event_location_list: bool = False,
    inc_event_person_list: bool = False,
    inc_event_presentation_list: bool = False,
    inc_event_presenter_cat: bool = False,  # For event_session child object
    inc_event_presenter_list: bool = False,
    inc_event_registration_cfg: bool = False,
    inc_event_registration_list: bool = False,
    inc_event_session_list: bool = False,
    inc_event_track: bool = False,  # For event_session child object
    inc_event_track_list: bool = False,
    inc_order_list: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    inc_poc_event_person: bool = False,
    inc_product: bool = False,
    inc_product_list: bool = False,
    limit: int = 1000,
    offset: int = 0,
    by_alias: bool = True,
    exclude_unset: bool = True,
    model_as_dict: bool = False,
    badge_type_code: str|None = None,  # Passed to get_event_badge_rec_list()
) -> Event_Base|bool:
    """Load one event as an ``Event_Base`` model, optionally with child objects.

    The record is read from the ``v_event`` view, or from
    ``v_event_w_file_count`` when ``inc_file_count`` is set. Each ``inc_*``
    flag handled below attaches the matching child object or list to the
    returned model.

    Args:
        event_id: Event ID; it is passed through ``redis_lookup_id_random()``
            before use.
        enabled, approved, hidden, review: Filter values forwarded to the
            child list queries and loaders.
        event_file_*: Filters forwarded to ``get_event_file_rec_list()``.
        badge_type_code: Forwarded to ``get_event_badge_rec_list()``. The
            badge query previously referenced this name without it being
            defined, which raised ``NameError``.
        limit, offset: Forwarded to the child list queries (``offset`` is only
            used by the badge list query).
        by_alias, exclude_unset, model_as_dict: Forwarded to the child loaders
            that accept them.

    Returns:
        The populated ``Event_Base`` object, or ``False`` when the ID lookup
        fails, no record is found, or the record fails model validation.

    NOTE: ``inc_event_abstract_list``, ``inc_event_person_list``,
    ``inc_event_registration_list``, ``inc_event_track_list``,
    ``inc_event_track``, ``inc_order_list``, ``inc_organization``,
    ``inc_product`` and ``inc_product_list`` are accepted but nothing is
    loaded for them in this function yet.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    event_id = redis_lookup_id_random(record_id_random=event_id, table_name='event')
    if not event_id:
        return False

    if inc_file_count:
        log.info('Using view with file count')
        view_name = 'v_event_w_file_count'
    else:
        view_name = 'v_event'
    event_rec = sql_select(table_name=view_name, record_id=event_id)
    if not event_rec:
        return False
    log.debug(event_rec)

    try:
        event_obj = Event_Base(**event_rec)
    except ValidationError as e:
        log.error(e.json())
        return False
    log.debug(event_obj)

    # Updated 2021-06-30
    if inc_address:  # This address is directly linked from the event record.
        log.info('Need to include event location address data...')
        address_location_id = event_rec.get('address_location_id', None)  # Should this be location_address_id? Reverse the field naming?
        log.debug(address_location_id)
        event_obj.address_location = load_address_obj(
            address_id = address_location_id,
            limit = limit,
            by_alias = by_alias,
            exclude_unset = exclude_unset,
            model_as_dict = model_as_dict,
            enabled = enabled,
        ) or None

    # Updated 2021-06-30
    if inc_contact:  # Just load all 3 of the contacts
        log.info('Need to include event contacts (3x) data...')
        for contact_field in ('contact_1', 'contact_2', 'contact_3'):
            contact_id = event_rec.get(f'{contact_field}_id', None)
            log.debug(contact_id)
            contact_result = load_contact_obj(
                contact_id = contact_id,
                limit = limit,
                by_alias = by_alias,
                exclude_unset = exclude_unset,
                model_as_dict = model_as_dict,
                enabled = enabled,
                inc_address = inc_address,
            )
            setattr(event_obj, contact_field, contact_result or None)

    # Updated 2022-06-14
    if inc_event_badge_list:
        log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        log.info('Need to include event badge list...')
        # Imported here rather than at module level, as in the original code.
        from app.methods.event_badge_methods import get_event_badge_rec_list, load_event_badge_obj
        event_obj.event_badge_list = _load_child_obj_list(
            get_event_badge_rec_list(
                event_id = event_id,
                badge_type_code = badge_type_code,
                enabled = enabled,
                limit = limit,
                offset = offset,
            ),
            'event_badge_id',
            load_event_badge_obj,
            enabled = enabled,
        )
        log.debug(event_obj.event_badge_list)

    # Updated 2022-03-09
    if inc_event_cfg:
        log.info('Need to include event configuration...')
        # A missing configuration is represented as an empty dict, not None.
        event_obj.event_cfg = load_event_cfg_obj(
            event_id = event_id,
            inc_event_registration_cfg = inc_event_registration_cfg,
            by_alias = by_alias,
            exclude_unset = exclude_unset,
            model_as_dict = model_as_dict,
        ) or {}
        log.debug(event_obj.event_cfg)

    # Updated 2022-04-17
    if inc_event_device_list:
        log.info('Need to include event device list...')
        from app.methods.event_device_methods import get_event_device_rec_list, load_event_device_obj
        event_obj.event_device_list = _load_child_obj_list(
            get_event_device_rec_list(
                for_type = 'event',
                for_id = event_id,
                enabled = enabled,
            ),
            'event_device_id',
            load_event_device_obj,
            enabled = enabled,
        )
        log.debug(event_obj.event_device_list)

    # Updated 2022-03-30
    if inc_event_exhibit_list:
        log.info('Need to include event exhibit list...')
        from app.methods.event_exhibit_methods import get_event_exhibit_rec_list, load_event_exhibit_obj
        event_obj.event_exhibit_list = _load_child_obj_list(
            get_event_exhibit_rec_list(
                event_id = event_id,
                enabled = enabled,
            ),
            'event_exhibit_id',
            load_event_exhibit_obj,
            enabled = enabled,
        )
        log.debug(event_obj.event_exhibit_list)

    # Updated 2021-10-21
    if inc_event_file_list:
        log.info('Need to include event file list...')
        from app.methods.event_file_methods import get_event_file_rec_list, load_event_file_obj
        event_obj.event_file_list = _load_child_obj_list(
            get_event_file_rec_list(
                for_type = 'event',
                for_id = event_id,
                file_purpose_id = event_file_file_purpose_id,
                file_purpose = event_file_file_purpose,
                priority = event_file_priority,
                group = event_file_group,
                enabled = enabled,
                limit = limit,
            ),
            'event_file_id',
            load_event_file_obj,
            enabled = enabled,
        )
        log.debug(event_obj.event_file_list)

    # Updated 2021-09-29
    if inc_event_location_list:
        log.info('Need to include event location list...')
        event_obj.event_location_list = _load_child_obj_list(
            get_event_location_rec_list(
                event_id = event_id,
                enabled = enabled,  # enabled, disabled, all
                hidden = 'all',  # hidden, not_hidden, all
                limit = limit,
            ),
            'event_location_id',
            load_event_location_obj,
            enabled = enabled,
            inc_event_file_list = inc_event_file_list,
            inc_event_presentation_list = inc_event_presentation_list,
            inc_event_presenter_list = inc_event_presenter_list,
            inc_event_session_list = inc_event_session_list,
            limit = limit,
            by_alias = by_alias,
            exclude_unset = exclude_unset,
        )
        log.debug(event_obj.event_location_list)

    # Updated 2021-09-28
    if inc_event_session_list:
        log.info('Need to include event session list...')
        event_obj.event_session_list = _load_child_obj_list(
            get_event_session_rec_list(
                event_id = event_id,
                enabled = enabled,  # enabled, disabled, all
                approved = approved,  # approve(d), not_approved, all
                hidden = hidden,  # hidden, not_hidden, all
                review = review,  # ready, not_ready, all
                limit = limit,
            ),
            'event_session_id',
            load_event_session_obj,
            enabled = enabled,
            limit = limit,
            inc_event_file_list = inc_event_file_list,
            inc_event_location = inc_event_location,
            inc_event_presentation_list = inc_event_presentation_list,
            inc_event_presenter_cat = inc_event_presenter_cat,
            inc_event_presenter_list = inc_event_presenter_list,
            inc_person = inc_person,
            inc_poc_event_person = inc_poc_event_person,
            by_alias = by_alias,
            exclude_unset = exclude_unset,
        )
        log.debug(event_obj.event_session_list)

    if inc_poc_event_person:
        log.info('Need to include event poc event person data...')
        log.info('OR??? Need to include event poc person data...?')
        # NOTE(review): the poc_event_person_id is handed to the *person* loader,
        # exactly as before; confirm this is the intended loader.
        poc_event_person_obj = load_person_obj(
            person_id = event_rec.get('poc_event_person_id', None),
        )
        log.debug(poc_event_person_obj)
        event_obj.poc_event_person = poc_event_person_obj

    log.debug(event_obj)
    return event_obj
# ### END ### API Event Methods ### load_event_obj() ###
# ### BEGIN ### API Event Methods ### get_event_rec_list() ###
# Updated 2021-12-13
@logger_reset
def get_event_rec_list(
    account_id: str = None,
    organization_id: str = None,
    person_id: str = None,
    user_id: str = None,
    limit: int = 500,
    enabled: str = 'enabled',  # enabled, disabled, all
    hidden: str = 'not_hidden',  # hidden, not_hidden, all
    archived: str = 'not_archived',  # archived, not_archived, all
    conference: bool = False,  # If it is a conference then organization, person, and user are queried as participants (not the owner/organizer)
) -> list|bool:
    """Select ``event_id`` / ``event_id_random`` records from the ``event`` table.

    Exactly one ID is used as the filter, in this priority order: account,
    organization, person, user.

    * With ``account_id``, or when ``conference`` is false, the ID is matched
      against the corresponding column of ``event``.
    * When ``conference`` is true and no account ID is available, the ID is
      matched through a join on ``event_person`` instead.

    Args:
        account_id, organization_id, person_id, user_id: Candidate filter IDs;
            each is passed through ``redis_lookup_id_random()`` before use.
        limit: Maximum number of rows; a falsy value removes the LIMIT clause.
        enabled: ``'enabled'``, ``'disabled'`` or ``'all'``.
        hidden: ``'hidden'``, ``'not_hidden'`` (also matches NULL) or ``'all'``.
        archived: ``'archived'``, ``'not_archived'`` (also matches NULL) or ``'all'``.
        conference: Value required of ``event.conference``; also selects the
            participant join described above.

    Returns:
        The result of ``sql_select(..., as_list=True)``, or ``False`` when no
        ID was supplied, none could be resolved, or a filter value is not one
        of the accepted strings.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if not (account_id or organization_id or person_id or user_id):
        return False

    account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
    organization_id = redis_lookup_id_random(record_id_random=organization_id, table_name='organization')
    person_id = redis_lookup_id_random(record_id_random=person_id, table_name='person')
    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')

    # Without at least one resolved ID no query can be built below.
    if not (account_id or organization_id or person_id or user_id):
        log.warning('None of the supplied IDs could be resolved')
        return False

    data = {
        'account_id': account_id,
        'organization_id': organization_id,
        'person_id': person_id,
        'user_id': user_id,
        'conference': bool(conference),
    }

    if archived == 'archived':
        data['archive'] = True
        sql_archived = 'AND `event`.archive = :archive'
    elif archived == 'not_archived':
        data['archive'] = False
        sql_archived = 'AND (`event`.archive = :archive OR `event`.archive IS NULL)'
    elif archived == 'all':
        sql_archived = ''
    else:
        # Previously this emitted ":archive" with no bound value.
        log.warning(f'Invalid archived value: {archived}')
        return False

    if hidden == 'hidden':
        data['hide'] = True
        sql_hidden = 'AND `event`.hide = :hide'
    elif hidden == 'not_hidden':
        data['hide'] = False
        sql_hidden = 'AND (`event`.hide = :hide OR `event`.hide IS NULL)'
    elif hidden == 'all':
        sql_hidden = ''
    else:
        log.warning(f'Invalid hidden value: {hidden}')
        return False

    # The same clause applies in both cases; only the bound value differs.
    sql_conference = 'AND `event`.conference = :conference'

    if enabled == 'enabled':
        data['enable'] = True
        sql_enabled = 'AND `event`.enable = :enable'
    elif enabled == 'disabled':
        data['enable'] = False
        sql_enabled = 'AND `event`.enable = :enable'
    elif enabled == 'all':
        sql_enabled = ''
    else:
        log.warning(f'Invalid enabled value: {enabled}')
        return False

    if limit:
        data['limit'] = limit
        sql_limit = 'LIMIT :limit'
    else:
        sql_limit = ''

    if account_id or not conference:
        # Owner/organizer query: match the ID column on the event itself.
        if account_id:
            sql_where_type_id = '`event`.account_id = :account_id'
        elif organization_id:
            sql_where_type_id = '`event`.organization_id = :organization_id'
        elif person_id:
            sql_where_type_id = '`event`.person_id = :person_id'
        else:
            sql_where_type_id = '`event`.user_id = :user_id'
        sql = f"""
            SELECT `event`.id AS 'event_id', `event`.id_random AS 'event_id_random'
            FROM `event` AS `event`
            WHERE
            {sql_where_type_id}
            {sql_archived}
            {sql_hidden}
            {sql_enabled}
            {sql_conference}
            ORDER BY `event`.created_on DESC, `event`.updated_on DESC
            {sql_limit};
        """
    else:
        # Conference without an account: organization, person, and user are
        # queried as participants (not the owner/organizer).
        if organization_id:  # Not sure if this makes sense?
            sql_inner_join = '`organization` ON event_person.organization_id = organization.id AND organization.id AND organization.id = :organization_id'
        elif person_id:
            sql_inner_join = '`person` ON event_person.person_id = person.id AND person.id AND person.id = :person_id'
        else:
            sql_inner_join = '`user` ON event_person.user_id = user.id AND user.id AND user.id = :user_id'
        sql = f"""
            SELECT `event`.id AS 'event_id', `event`.id_random AS 'event_id_random'
            FROM `event` AS `event`
            INNER JOIN `event_person` ON event.id = event_person.event_id
            /*INNER JOIN `person` ON event_person.person_id = person.id*/
            INNER JOIN {sql_inner_join}
            WHERE 1=1
            {sql_archived}
            {sql_hidden}
            {sql_enabled}
            {sql_conference}
            ORDER BY `event`.created_on DESC, `event`.updated_on DESC
            {sql_limit};
        """
    log.debug(sql)

    event_rec_li = sql_select(data=data, sql=sql, as_list=True)
    if event_rec_li:
        log.info('Got a list result')
    else:  # [] or False
        log.info('No results or something went wrong')
    log.debug(event_rec_li)
    return event_rec_li
# ### END ### API Event Methods ### get_event_rec_list() ###
# ### BEGIN ### API Event Methods ### get_event_meeting_rec_list() ###
# Updated 2021-12-13
@logger_reset
def get_event_meeting_rec_list(
    account_id: str = None,
    organization_id: str = None,
    person_id: str = None,
    user_id: str = None,
    limit: int = 500,
    enabled: str = 'enabled',  # enabled, disabled, all
    hidden: str = 'not_hidden',  # hidden, not_hidden, all
    archived: str = 'not_archived',  # archived, not_archived, all
) -> list|bool:
    """Select full event records from the ``v_event`` view for one owner ID.

    Exactly one ID is used as the filter, in this priority order: account,
    organization, person, user. No ``conference`` filter is applied.

    This function used to reference a ``conference`` variable that is not
    defined here, which raised ``NameError`` whenever no account ID was
    available. Every ID is now matched against its column on the event, and
    the participant-join branch that depended on ``conference`` is removed.

    Args:
        account_id, organization_id, person_id, user_id: Candidate filter IDs;
            each is passed through ``redis_lookup_id_random()`` before use.
        limit: Maximum number of rows; a falsy value removes the LIMIT clause.
        enabled: ``'enabled'``, ``'disabled'`` or ``'all'``.
        hidden: ``'hidden'``, ``'not_hidden'`` (also matches NULL) or ``'all'``.
        archived: ``'archived'``, ``'not_archived'`` (also matches NULL) or ``'all'``.

    Returns:
        The result of ``sql_select(..., as_list=True)``, or ``False`` when no
        ID was supplied, none could be resolved, or a filter value is not one
        of the accepted strings.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if not (account_id or organization_id or person_id or user_id):
        return False

    account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
    organization_id = redis_lookup_id_random(record_id_random=organization_id, table_name='organization')
    person_id = redis_lookup_id_random(record_id_random=person_id, table_name='person')
    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')

    # Without at least one resolved ID no WHERE clause can be built below.
    if not (account_id or organization_id or person_id or user_id):
        log.warning('None of the supplied IDs could be resolved')
        return False

    data = {
        'account_id': account_id,
        'organization_id': organization_id,
        'person_id': person_id,
        'user_id': user_id,
    }

    if archived == 'archived':
        data['archive'] = True
        sql_archived = 'AND `event`.archive = :archive'
    elif archived == 'not_archived':
        data['archive'] = False
        sql_archived = 'AND (`event`.archive = :archive OR `event`.archive IS NULL)'
    elif archived == 'all':
        sql_archived = ''
    else:
        # Previously this emitted ":archive" with no bound value.
        log.warning(f'Invalid archived value: {archived}')
        return False

    if hidden == 'hidden':
        data['hide'] = True
        sql_hidden = 'AND `event`.hide = :hide'
    elif hidden == 'not_hidden':
        data['hide'] = False
        sql_hidden = 'AND (`event`.hide = :hide OR `event`.hide IS NULL)'
    elif hidden == 'all':
        sql_hidden = ''
    else:
        log.warning(f'Invalid hidden value: {hidden}')
        return False

    if enabled == 'enabled':
        data['enable'] = True
        sql_enabled = 'AND `event`.enable = :enable'
    elif enabled == 'disabled':
        data['enable'] = False
        sql_enabled = 'AND `event`.enable = :enable'
    elif enabled == 'all':
        sql_enabled = ''
    else:
        log.warning(f'Invalid enabled value: {enabled}')
        return False

    if limit:
        data['limit'] = limit
        sql_limit = 'LIMIT :limit'
    else:
        sql_limit = ''

    if account_id:
        sql_where_type_id = '`event`.account_id = :account_id'
    elif organization_id:
        sql_where_type_id = '`event`.organization_id = :organization_id'
    elif person_id:
        sql_where_type_id = '`event`.person_id = :person_id'
    else:
        sql_where_type_id = '`event`.user_id = :user_id'

    sql = f"""
        SELECT *
        FROM `v_event` AS `event`
        WHERE
        {sql_where_type_id}
        {sql_archived}
        {sql_hidden}
        {sql_enabled}
        ORDER BY `event`.priority DESC, `event`.sort ASC, `event`.updated_on DESC, `event`.created_on DESC
        {sql_limit};
    """
    log.debug(sql)

    # NOTE(review): as_list=True added to match the other list queries in this
    # module and the list|bool return annotation; confirm that callers do not
    # rely on a bare record being returned for a single match.
    event_rec_li = sql_select(data=data, sql=sql, as_list=True)
    if event_rec_li:
        log.info('Got a list result')
    else:  # [] or False
        log.info('No results or something went wrong')
    log.debug(event_rec_li)
    return event_rec_li
# ### END ### API Event Methods ### get_event_meeting_rec_list() ###
# ### BEGIN ### API Event Methods ### load_event_obj_list() ###
def load_event_obj_list(
    account_id: int|str|None = None,
    user_id: int|str|None = None,
    limit: int = 1000,
    model_as_dict: bool = False,
    enabled: str = 'enabled',  # enabled, disabled, all
    inc_contact_1: bool = False,
    inc_contact_2: bool = False,
    inc_contact_3: bool = False,
    inc_event_abstract_list: bool = False,
    inc_event_badge_list: bool = False,
    inc_event_cfg: bool = False,
    inc_event_device_list: bool = False,
    inc_event_exhibit_list: bool = False,
    inc_event_file_list: bool = False,
    inc_event_location: bool = False,  # For event_session child object
    inc_event_location_list: bool = False,
    inc_event_person_list: bool = False,
    inc_event_presentation_list: bool = False,
    inc_event_presenter_cat: bool = False,  # For event_session child object
    inc_event_presenter_list: bool = False,
    inc_event_registration_cfg: bool = False,
    inc_event_registration_list: bool = False,
    inc_event_session_list: bool = False,
    inc_event_track: bool = False,  # For event_session child object
    inc_event_track_list: bool = False,
    inc_address_location: bool = False,
    inc_poc_event_person: bool = False,
    inc_person: bool = False,
) -> list|bool:
    """Load every event of an account (or, failing that, a user) as objects.

    The event IDs are selected from the ``event`` table, newest first, and
    each one is loaded with ``load_event_obj()``.

    Args:
        account_id: Account to filter on; takes precedence over ``user_id``.
        user_id: User to filter on when no ``account_id`` is given.
        limit: Maximum number of events; also forwarded to ``load_event_obj()``.
            A falsy value removes the LIMIT clause.
        model_as_dict: Forwarded to ``load_event_obj()``.
        enabled: ``'enabled'``, ``'disabled'`` or ``'all'``.
        inc_*: Only ``inc_event_exhibit_list``, ``inc_event_file_list``,
            ``inc_event_location_list``, ``inc_event_person_list``,
            ``inc_event_presentation_list``, ``inc_event_presenter_list``,
            ``inc_event_registration_list``, ``inc_event_session_list`` and
            ``inc_event_track_list`` are forwarded to ``load_event_obj()``;
            the remaining flags are accepted but currently unused.

    Returns:
        A list with one entry per selected event (``None`` where loading
        failed), ``[]`` when the query gave no result, or ``False`` when
        neither ID was supplied, the supplied ID could not be resolved, or
        ``enabled`` is not an accepted value.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    data = {}
    if account_id:
        account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
        if not account_id:
            return False
        data['account_id'] = account_id
        sql_obj_type_id = '`tbl`.account_id = :account_id'
    elif user_id:
        user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
        if not user_id:
            return False
        data['user_id'] = user_id
        sql_obj_type_id = '`tbl`.user_id = :user_id'
    else:
        # Previously this fell through and failed on the undefined WHERE clause.
        log.warning('An account_id or user_id is required')
        return False

    if enabled == 'enabled':
        data['enable'] = True
        sql_enabled = 'AND `tbl`.enable = :enable'
    elif enabled == 'disabled':
        data['enable'] = False
        sql_enabled = 'AND `tbl`.enable = :enable'
    elif enabled == 'all':
        sql_enabled = ''
    else:
        # Previously this left sql_enabled undefined.
        log.warning(f'Invalid enabled value: {enabled}')
        return False

    if limit:
        data['limit'] = limit
        sql_limit = 'LIMIT :limit'
    else:
        sql_limit = ''

    sql = f"""
        SELECT `tbl`.id AS 'event_id', `tbl`.id_random AS 'event_id_random'
        FROM `event` AS `tbl`
        WHERE
        {sql_obj_type_id}
        {sql_enabled}
        ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
        {sql_limit};
    """

    event_rec_li_result = sql_select(data=data, sql=sql, as_list=True)
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(event_rec_li_result)
    if not event_rec_li_result:
        return []

    event_result_li = []
    for event_rec in event_rec_li_result:
        event_result = load_event_obj(
            event_id = event_rec.get('event_id', None),
            limit = limit,
            model_as_dict = model_as_dict,
            enabled = enabled,
            inc_event_exhibit_list = inc_event_exhibit_list,
            inc_event_file_list = inc_event_file_list,
            inc_event_location_list = inc_event_location_list,
            inc_event_person_list = inc_event_person_list,
            inc_event_presentation_list = inc_event_presentation_list,
            inc_event_presenter_list = inc_event_presenter_list,
            inc_event_registration_list = inc_event_registration_list,
            inc_event_session_list = inc_event_session_list,
            inc_event_track_list = inc_event_track_list,
        )
        log.debug(event_result)
        # Keep one slot per selected record; failed loads become None.
        event_result_li.append(event_result or None)
    log.debug(event_result_li)
    return event_result_li
# ### END ### API Event Methods ### load_event_obj_list() ###
# ### BEGIN ### API Event Person Methods ### get_event_id_w_for_type_id() ###
# Updated 2021-08-25
def get_event_id_w_for_type_id(
    for_type: str,
    for_id: int|str,
) -> bool|int|None:
    """Return the ``event_id`` column of one record in the ``for_type`` table.

    Args:
        for_type: Name of the table to read. It is placed directly into the
            SQL text, so it must be a plain ASCII identifier.
        for_id: ID of the record; it is passed through
            ``redis_lookup_id_random()`` before use.

    Returns:
        The record's ``event_id`` when it is truthy; ``False`` when
        ``for_type`` is not a valid identifier, the ID lookup fails, or the
        record has no ``event_id``; ``None`` when the query gives no result.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # A table name cannot be bound as a query parameter, so reject anything
    # that is not a simple identifier before it reaches the SQL text.
    if not (isinstance(for_type, str) and for_type.isascii() and for_type.isidentifier()):
        log.warning(f'Invalid for_type: {for_type!r}')
        return False

    for_id = redis_lookup_id_random(record_id_random=for_id, table_name=for_type)
    if not for_id:
        return False

    data = {
        'for_type': for_type,
        'for_id': for_id,
    }
    # The table name is backtick-quoted so reserved words are usable as names.
    sql = f"""
        SELECT `for`.id AS 'for_id', `for`.id_random AS 'for_id_random', `for`.event_id AS event_id
        FROM `{for_type}` AS `for`
        WHERE `for`.id = :for_id
        LIMIT 1;
    """

    for_data_result = sql_select(data=data, sql=sql)
    if not for_data_result:
        return None
    log.debug(for_data_result)
    return for_data_result.get('event_id', None) or False
# ### END ### API Event Person Methods ### get_event_id_w_for_type_id() ###
# ### BEGIN ### API Event Methods ### get_account_id_w_event_id() ###
# Updated 2021-08-24
def get_account_id_w_event_id(
    event_id: int|str,
) -> bool|int|None:
    """Return the ``account_id`` column of one record in the ``event`` table.

    Args:
        event_id: Event ID; it is passed through ``redis_lookup_id_random()``
            before use.

    Returns:
        The event's ``account_id`` when it is truthy; ``False`` when the ID
        lookup fails or the record has no ``account_id``; ``None`` when the
        query gives no result.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    event_id = redis_lookup_id_random(record_id_random=event_id, table_name='event')
    if not event_id:
        return False

    data = {'event_id': event_id}
    sql = f"""
        SELECT `event`.id AS 'event_id', `event`.id_random AS 'event_id_random', `event`.account_id AS account_id
        FROM `event` AS `event`
        WHERE `event`.id = :event_id
        LIMIT 1;
    """

    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    event_row = sql_select(data=data, sql=sql)
    if not event_row:
        return None

    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(event_row)
    return event_row.get('account_id', None) or False
# ### END ### API Event Methods ### get_account_id_w_event_id() ###
# ### BEGIN ### API Event Methods ### update_event_obj() ###
# Updated 2022-08-15
# NOTE: This has been partially moved to v4 to allow for create_update conversion. 2021-09
# NOTE: Commented out sections for create/update poc_person and user. 2022-08
def update_event_obj(
    event_id: int|str,  # Ideally the int ID should be passed. This allows for updating of the id_random value.
    event_obj_up: Event_Base|dict,  # NOTE: rename from event_obj_up to event_dict_obj
    account_id: int|str|None = None,
    create_sub_obj: bool = False,
    fail_any: bool = False,  # Fail if any thing goes wrong for sub objects
    return_outline: bool = False,
) -> bool|int|dict:
    """Update an event record and, when present, its event configuration.

    Args:
        event_id: Event ID; it is passed through ``redis_lookup_id_random()``
            before use.
        event_obj_up: The new event data, either an ``Event_Base`` object or a
            dict that is validated into one. A dict is copied, so the caller's
            dict is not modified.
        account_id: Currently overwritten with the result of
            ``get_account_id_w_event_id()`` for the event.
        create_sub_obj: Currently unused in this function.
        fail_any: Forwarded to ``create_update_event_cfg_obj_v4()``.
        return_outline: When true, return the outline dict instead of the ID;
            also forwarded to ``create_update_event_cfg_obj_v4()``.

    Returns:
        The event ID, or ``{'event_id': <id>}`` when ``return_outline`` is
        true. ``False`` when the ID lookup, validation, the SQL update, or the
        event configuration update fails.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    log.info('Checking requirements...')

    event_id = redis_lookup_id_random(record_id_random=event_id, table_name='event')
    if not event_id:
        return False

    # The account is read from the stored event; this replaces the argument.
    account_id = get_account_id_w_event_id(event_id=event_id)

    log.debug(type(event_obj_up))  # NOTE: rename from event_obj_up to event_dict_obj
    if isinstance(event_obj_up, dict):
        # Work on a shallow copy so the caller's dict is left untouched.
        event_dict = dict(event_obj_up)
        if event_id:
            event_dict['event_id'] = event_id
        if account_id:
            event_dict['account_id'] = account_id
        try:
            event_obj = Event_Base(**event_dict)
            log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(event_obj)
        except ValidationError as e:
            log.error(e.json())
            return False
    else:
        event_obj = event_obj_up
        if event_id:
            # NOTE: Can't update the ID alias if it was never set.
            event_obj.id = event_id
        if account_id:
            event_obj.account_id = account_id

    # Child objects and timestamps are not written with the event record.
    # 'event_device_list' and 'poc_event_person' are included because
    # load_event_obj() sets both on Event_Base objects.
    # NOTE this exclude list should be reviewed again
    event_dict = event_obj.dict(
        by_alias=False,
        exclude_defaults=False,
        exclude_unset=True,
        exclude={
            'address_location',
            'contact_1',
            'contact_2',
            'contact_3',
            'event_cfg',
            'event_abstract_list',
            'event_badge_list',
            'event_device_list',
            'event_exhibit_list',
            'event_file_list',
            'event_location_list',
            'event_person_list',
            'event_presentation_list',
            'event_presenter_list',
            'event_session_list',
            'event_track_list',
            'poc_event_person',
            'poc_person',
            'created_on',
            'updated_on',
        },
    )

    if event_id:
        event_dict_up_result = sql_update(data=event_dict, table_name='event', rm_id_random=True)
        log.debug(event_dict_up_result)
        if not event_dict_up_result:
            log.warning(f'Event not updated. Event ID: {event_id}')
            return False
    else:
        # NOTE: Unreachable for now because a falsy event_id returns above.
        # This section is for future use when creating create_update_event_obj_vX.
        event_dict_in_result = sql_insert(data=event_dict, table_name='event', rm_id_random=True, id_random_length=default_num_bytes)
        log.debug(event_dict_in_result)
        if not event_dict_in_result:
            log.warning(f'Event not created.')
            return False
        event_id = event_dict_in_result

    event_outline = {'event_id': event_id}

    log.debug(event_obj)
    log.debug(event_obj.dict(by_alias=False, exclude_unset=True))

    if event_obj.event_cfg:
        event_cfg_result = create_update_event_cfg_obj_v4(
            event_cfg_dict_obj = event_obj.event_cfg,
            event_id = event_id,
            fail_any = fail_any,
            return_outline = return_outline,
        )
        log.debug(event_cfg_result)
        if not event_cfg_result:
            return False

    # NOTE: The create/update handling for poc_person and user sub objects was
    # disabled in 2022-08 and its commented-out code has been removed here;
    # it remains available in version control history.

    if return_outline:
        log.debug(f'Returning the Event Outline: {event_outline}')
        return event_outline
    log.debug(f'Returning the Event ID: {event_id}')
    return event_id
# ### END ### API Event Methods ### update_event_obj() ###