Files
OSIT-AE-API-FastAPI/app/methods/site_domain_methods.py

196 lines
7.6 KiB
Python

import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_enable_part, sql_insert, sql_limit_offset_part, sql_select, sql_update
from app.lib_general import log, logging, logger_reset
from app.models.site_domain_models import Site_Domain_Base
# ### BEGIN ### API Site Domain Methods ### create_site_domain_obj() ###
def create_site_domain_obj(site_domain_obj_new:Site_Domain_Base):
    """Insert a new ``site_domain`` row built from the given model.

    The model is serialised with ``exclude_unset=True`` and without the
    ``user``, ``created_on`` and ``updated_on`` fields, and the resulting
    dict is handed to ``sql_insert`` for the ``site_domain`` table.

    Returns:
        The truthy value produced by ``sql_insert`` (used here as the new
        site domain id), or ``False`` when no model was supplied or the
        insert returned a falsy result.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Guard: nothing to insert.
    if not site_domain_obj_new:
        return False

    excluded_fields = {'user', 'created_on', 'updated_on'}
    insert_payload = site_domain_obj_new.dict(
        by_alias=False,
        exclude_defaults=False,
        exclude_unset=True,
        exclude=excluded_fields,
    )

    new_site_domain_id = sql_insert(
        data=insert_payload,
        table_name='site_domain',
        rm_id_random=True,
        id_random_length=8,
    )
    # Guard: a falsy insert result is reported to the caller as failure.
    if not new_site_domain_id:
        return False

    log.debug(new_site_domain_id)
    log.debug(f'New site_domain_id: {new_site_domain_id}')
    return new_site_domain_id
# ### END ### API Site Domain Methods ### create_site_domain_obj() ###
# ### BEGIN ### API Site Domain Methods ### load_site_domain_obj() ###
def load_site_domain_obj(
    site_domain_id: int|str,
    enabled: str = 'enabled', # enabled, disabled, all
    limit: int = 100,
    offset: int = 0,
    by_alias: bool = True,
    exclude_unset: bool = True,
    model_as_dict: bool = False,
    # inc_site_domain_list: bool = False,
    ) -> Site_Domain_Base|dict|bool:
    """Load a single site domain record as a model instance or a dict.

    Args:
        site_domain_id: Identifier passed to ``redis_lookup_id_random`` for
            the ``site_domain`` table; the value that helper returns is
            used as the record id for the select.
        enabled: Accepted but currently unused by this function.
        limit: Accepted but currently unused by this function.
        offset: Accepted but currently unused by this function.
        by_alias: Forwarded to ``.dict()`` when ``model_as_dict`` is true.
        exclude_unset: Forwarded to ``.dict()`` when ``model_as_dict`` is true.
        model_as_dict: When true, return ``site_domain_obj.dict(...)``
            instead of the model instance.

    Returns:
        The ``Site_Domain_Base`` instance (or its dict form), or ``False``
        when the id lookup fails, no record is found, or the record does
        not validate against ``Site_Domain_Base``.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    record_id = redis_lookup_id_random(record_id_random=site_domain_id, table_name='site_domain')
    if not record_id:
        return False

    # The record itself is read from `v_site_domain`; `site_domain` is only
    # named for the id lookup above.
    site_domain_rec = sql_select(table_name='v_site_domain', record_id=record_id)
    if not site_domain_rec:
        return False
    log.debug(site_domain_rec)

    try:
        site_domain_obj = Site_Domain_Base(**site_domain_rec)
    except ValidationError as e:
        # Return the failure sentinel here: falling through would reference
        # `site_domain_obj` before assignment and raise UnboundLocalError.
        log.error(e.json())
        return False
    log.debug(site_domain_obj)

    if model_as_dict:
        return site_domain_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
    return site_domain_obj
# ### END ### API Site Domain Methods ### load_site_domain_obj() ###
# ### BEGIN ### API Site Domain Methods ### update_site_domain_obj() ###
def update_site_domain_obj(
    site_domain_id: int|str, # This allows for updating of the id_random value.
    site_domain_obj_up: Site_Domain_Base,
    create_sub_obj: bool = False,
    ) -> bool:
    """Apply the set fields of ``site_domain_obj_up`` to a ``site_domain`` row.

    The given id is passed through ``redis_lookup_id_random``; the value it
    returns is written onto ``site_domain_obj_up.id`` (the caller's object
    is mutated) before the model is serialised with ``exclude_unset=True``
    and without the ``user`` field, then handed to ``sql_update``.

    ``create_sub_obj`` is accepted but not used by this function.

    Returns:
        ``True`` when ``sql_update`` returns a truthy result, ``False`` when
        the id lookup fails or the update result is falsy.
    """
    # NOTE(review): the sibling functions in this module set WARNING here;
    # confirm that DEBUG is intentional for updates.
    log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    resolved_id = redis_lookup_id_random(record_id_random=site_domain_id, table_name='site_domain')
    if not resolved_id:
        return False

    site_domain_obj_up.id = resolved_id
    log.debug(site_domain_obj_up)
    log.debug(site_domain_obj_up.dict(by_alias=False, exclude_unset=True))

    update_payload = site_domain_obj_up.dict(by_alias=False, exclude_unset=True, exclude={'user'})
    log.debug(update_payload)

    update_result = sql_update(data=update_payload, table_name='site_domain', rm_id_random=True)
    # The result is logged on both the success and the failure path.
    log.debug(update_result)
    return bool(update_result)
# ### END ### API Site Domain Methods ### update_site_domain_obj() ###
# ### BEGIN ### API Site Domain Methods ### get_site_domain_rec_list() ###
def get_site_domain_rec_list(
    site_id: int,
    enabled: str = 'enabled', # enabled, disabled, all
    limit: int = 100,
    offset: int = 0,
    ) -> list|bool:
    """List the id / id_random pairs of the site domains belonging to a site.

    ``site_id`` is passed through ``redis_lookup_id_random`` for the ``site``
    table and the returned value is bound as ``:site_id``. The enabled
    filter and the limit/offset clause come from ``sql_enable_part`` and
    ``sql_limit_offset_part`` and are interpolated into the statement.

    Returns:
        The rows returned by ``sql_select`` (``as_list=True``), an empty
        list when that result is falsy, or ``False`` when the site id
        lookup fails.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    resolved_site_id = redis_lookup_id_random(record_id_random=site_id, table_name='site')
    if not resolved_site_id:
        return False

    enabled_clause, enable_value = sql_enable_part(table_name='site_domain', enabled=enabled) # Reasonably safe return str and bool
    limit_clause = sql_limit_offset_part(limit=limit, offset=offset) # Reasonably safe return str
    query_params = {'site_id': resolved_site_id, 'enable': enable_value}

    # The SQL text is kept flush-left so the statement sent to the database
    # is unchanged.
    sql = f"""
SELECT `site_domain`.id AS 'site_domain_id', `site_domain`.id_random AS 'site_domain_id_random'
FROM `site_domain` AS `site_domain`
WHERE
`site_domain`.site_id = :site_id
{enabled_clause}
ORDER BY `site_domain`.fqdn ASC, `site_domain`.access_key ASC, `site_domain`.required_referrer ASC, `site_domain`.created_on DESC, `site_domain`.updated_on DESC
{limit_clause};
"""
    # A falsy select result is normalised to an empty list.
    return sql_select(data=query_params, sql=sql, as_list=True) or []
# ### END ### API Site Domain Methods ### get_site_domain_rec_list() ###
# ### BEGIN ### API Site Domain Methods ### lookup_site_domain_fqdn() ###
def lookup_site_domain_fqdn(
    fqdn: str,
    # TODO: Accept access_key as an argument for validation (str|None)
    # access_key: Optional[str] = None,
    enabled: str = 'enabled', # enabled, disabled, all
    limit: int = 100,
    offset: int = 0,
    ) -> list|bool:
    """List the id / id_random pairs of the site domains matching an FQDN.

    Args:
        fqdn: Value bound to ``:fqdn`` and compared against
            ``site_domain.fqdn`` in ``v_site_domain``.
        enabled: Passed to ``sql_enable_part`` to build the enabled filter.
        limit: Passed to ``sql_limit_offset_part``.
        offset: Passed to ``sql_limit_offset_part``.

    Returns:
        The rows returned by ``sql_select`` (``as_list=True``), or an empty
        list when that result is falsy.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    data = {'fqdn': fqdn}
    # TODO: If access_key is provided, add it to the data dict for SQL parameterization
    # if access_key is not None:
    #     data['access_key'] = access_key
    sql_enabled, data['enable'] = sql_enable_part(table_name='site_domain', enabled=enabled) # Reasonably safe return str and bool
    sql_limit = sql_limit_offset_part(limit=limit, offset=offset) # Reasonably safe return str

    # TODO: Add access_key to WHERE clause if provided, e.g.:
    #   WHERE site_domain.fqdn = :fqdn AND (:access_key IS NULL OR site_domain.access_key = :access_key)
    #
    # The access_key notes live here as Python comments rather than as
    # `--` comments inside the SQL text: a `:access_key` token inside the
    # statement can be picked up as a bind parameter by named-parameter
    # layers (e.g. SQLAlchemy text()), and `data` supplies no value for it.
    # NOTE(review): confirm which layer `sql_select` uses.
    sql = f"""
SELECT `site_domain`.id AS 'site_domain_id', `site_domain`.id_random AS 'site_domain_id_random'
FROM `v_site_domain` AS site_domain
WHERE
site_domain.fqdn = :fqdn
{sql_enabled}
ORDER BY `site_domain`.fqdn ASC, `site_domain`.access_key ASC, `site_domain`.required_referrer ASC, `site_domain`.created_on DESC, `site_domain`.updated_on DESC
{sql_limit};
"""
    # A falsy select result is normalised to an empty list.
    return sql_select(data=data, sql=sql, as_list=True) or []
# ---
# To restore access_key validation:
# 1. Accept access_key as a parameter to this function (and any API endpoint calling it).
# 2. Add access_key to the SQL WHERE clause (see above) so only matching records are returned.
# 3. If access_key is required, return empty or error if not matched.
# 4. Update API docs and tests to reflect the new/required parameter.
# ### END ### API Site Domain Methods ### lookup_site_domain_fqdn() ###