Files
OSIT-AE-API-FastAPI/app/methods/event_device_methods.py

224 lines
8.9 KiB
Python

import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_enable_part, sql_insert, sql_limit_offset_part, sql_select, sql_update
from app.lib_general import log, logging, logger_reset
from app.methods.event_cfg_methods import load_event_cfg_obj
from app.methods.event_location_methods import load_event_location_obj
from app.models.common_field_schema import default_num_bytes
from app.models.event_cfg_models import Event_Cfg_Base
from app.models.event_device_models import Event_Device_Base
from app.models.event_location_models import Event_Location_Base
# ### BEGIN ### API Event Device Methods ### load_event_device_obj() ###
# Updated 2022-03-09
@logger_reset
def load_event_device_obj(
event_device_id: int,
enabled: str = 'enabled', # enabled, disabled, all
limit: int = 100,
offset: int = 0,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
inc_event_cfg: bool = False,
inc_event_location: bool = False,
) -> Event_Device_Base|dict|bool:
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if event_device_rec := sql_select(table_name='v_event_device', record_id=event_device_id): pass
else: return False
log.debug(event_device_rec)
try:
event_device_obj = Event_Device_Base(**event_device_rec)
except ValidationError as e:
log.error(e.json())
return False
log.debug(event_device_obj)
# Updated 2022-03-09
if inc_event_cfg:
log.info('Need to include event configuration...')
event_id = event_device_rec.get('event_id', None)
log.debug(event_id)
if event_cfg_result := load_event_cfg_obj(
event_id = event_id,
):
event_device_obj.event_cfg = event_cfg_result
else: event_device_obj.event_cfg = {} # None
# Updated 2022-03-09
if inc_event_location:
log.info('Need to include event location...')
event_location_id = event_device_rec.get('event_location_id', None)
log.debug(event_location_id)
if event_location_result := load_event_location_obj(
event_location_id = event_location_id,
):
event_device_obj.event_location = event_location_result
else: event_device_obj.event_location = {} # None
if model_as_dict:
return event_device_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return event_device_obj
# ### END ### API Event Device Methods ### load_event_device_obj() ###
# ### BEGIN ### API Event Device Methods ### get_event_device_rec_list() ###
# Updated 2022-03-09
@logger_reset
def get_event_device_rec_list(
for_type: str, # 'account' is a special case
for_id: str,
enabled: str = 'enabled', # enabled, disabled, all
limit: int = 100,
offset: int = 0,
) -> list|bool:
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_id := redis_lookup_id_random(record_id_random=for_id, table_name=for_type): pass
else: return False
if for_type == 'account':
sql_for_type_id = f'`event_device`.account_id = :for_id'
elif for_type == 'event':
sql_for_type_id = f'`event_device`.event_id = :for_id'
elif for_type == 'event_location':
sql_for_type_id = f'`event_device`.event_location_id = :for_id'
else: return False
data = {}
data['for_type'] = for_type
data['for_id'] = for_id
sql_enabled, data['enable'] = sql_enable_part(table_name='event_device', enabled=enabled) # Reasonably safe return str and bool
sql_limit = sql_limit_offset_part(limit=limit, offset=offset) # Reasonably safe return str
sql = f"""
SELECT `event_device`.id AS 'event_device_id', `event_device`.id_random AS 'event_device_id_random'
FROM `v_event_device` AS `event_device`
WHERE
{sql_for_type_id}
{sql_enabled}
ORDER BY `event_device`.created_on DESC, `event_device`.updated_on DESC
{sql_limit};
"""
log.debug(sql)
if event_device_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
event_device_rec_li = event_device_rec_li_result
else: # [] or False
event_device_rec_li = event_device_rec_li_result
log.debug(event_device_rec_li_result)
return event_device_rec_li
# ### END ### API Event Device Methods ### get_event_device_rec_list() ###
# ### BEGIN ### API Event Device Methods ### create_update_event_device_obj() ###
# Updated 2022-03-09
def create_update_event_device_obj(
event_device_dict_obj: Event_Device_Base|dict,
event_device_id: int|str = None,
event_id: int|str = None,
event_location_id: int|str = None,
create_sub_obj: bool = False,
fail_any: bool = False, # Fail if any thing goes wrong for sub objects
return_outline: bool = False,
) -> int|bool:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# ### SECTION ### Secondary data validation
log.info('Checking requirements...')
if event_device_id:
log.info(f'Event Device ID passed. Update existing Event Device. Event Device ID: {event_device_id}')
if event_device_id := redis_lookup_id_random(record_id_random=event_device_id, table_name='event_device'): pass
else:
log.error('Event Device ID passed but is invalid. Failed requirement.')
return False
else:
log.info('No Event Device ID passed. Create new Event Device. Required: Event ID; Optional: Event Location ID')
if event_id := redis_lookup_id_random(record_id_random=event_id, table_name='event'): pass
else:
log.error('Missing or invalid Event ID passed. Failed requirement.')
log.info(f'Event ID: {event_id}')
return False
if event_location_id := redis_lookup_id_random(record_id_random=event_location_id, table_name='event_location'): pass
elif event_location_id is None: pass
else:
log.warning('Missing or invalid Event Location ID passed. Failed requirement.')
log.info(f'Event Location ID: {event_location_id}')
return False
log.info('Create dictionary or Pydantic object')
log.debug(type(event_device_dict_obj))
if isinstance(event_device_dict_obj, dict):
event_device_dict = event_device_dict_obj
if event_device_id:
event_device_dict['event_device_id'] = event_device_id
if event_id:
event_dict['event_id'] = event_id
if event_location_id:
event_location_dict['event_location_id'] = event_location_id
try:
event_device_obj = Event_Device_Base(**event_device_dict)
except ValidationError as e:
log.error(e.json())
return False
else:
event_device_obj = event_device_dict_obj
if event_device_id:
# NOTE: Can't update the ID alias if it was never set.
event_device_obj.id = event_device_id
if event_id:
event_device_obj.event_id = event_id
if event_location_id:
event_device_obj.event_location_id = event_location_id
log.debug(event_device_obj)
event_device_dict = event_device_obj.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'event_cfg', 'event_location', 'account_id', 'account_id_random', 'created_on', 'updated_on'})
# ### SECTION ### Process data
if event_device_id:
if event_device_dict_up_result := sql_update(data=event_device_dict, table_name='event_device', rm_id_random=True): pass
else:
log.warning(f'Event Device not updated. Event Device ID: {event_device_id}')
log.debug(event_device_dict_up_result)
return False
log.debug(event_device_dict_up_result)
else:
if event_device_dict_in_result := sql_insert(data=event_device_dict, table_name='event_device', rm_id_random=True, id_random_length=8): pass
else:
log.warning(f'Event Device not created.')
log.debug(event_device_dict_in_result)
return False
log.debug(event_device_dict_in_result)
event_device_id = event_device_dict_in_result
event_device_outline = {}
event_device_outline['event_device_id'] = event_device_id
if return_outline:
log.debug(f'Returning the Event Device Outline: {event_device_outline}')
return event_device_outline
else:
log.debug(f'Returning the Event Device ID: {event_device_id}')
return event_device_id
# ### END ### API Event Device Methods ### create_update_event_device_obj() ###