Refactor: Modularize lib_general.py by extracting core functionalities.

- Extracted Email functions to app/lib_email.py.
- Extracted Export functions to app/lib_export.py.
- Extracted JWT utilities to app/lib_jwt.py.
- Extracted Hash utilities to app/lib_hash.py.
- Updated app/lib_general.py to import from these new modules for backward compatibility.
- Updated V3 Frontend API Guide with latest security and site_domain exception details.
This commit is contained in:
Scott Idem
2026-01-07 17:41:04 -05:00
parent d61dd0f00e
commit 734576817c
6 changed files with 426 additions and 504 deletions

168
app/lib_email.py Normal file
View File

@@ -0,0 +1,168 @@
import html2text
import smtplib, ssl
import logging
from email.message import EmailMessage
from email.headerregistry import Address
from typing import Optional
from app.log import logger_reset
from app.config import settings
log = logging.getLogger(__name__)
# ### BEGIN ### API Lib Email ### send_email() ###
# Moved from lib_general.py 2026-01-07
@logger_reset
def send_email(
from_email: str,
to_email: str,
subject: str,
body_html: str,
from_name: str = '',
reply_to_email: str = '',
reply_to_name: str = '',
to_name: str = '',
cc_email: str = '',
cc_name: str = '',
bcc_email: str = '',
bcc_name: str = '',
body_text: str = '',
test: bool = False,
log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
):
log.setLevel(log_lvl)
log.debug(locals())
if test:
log.setLevel(logging.DEBUG)
log.debug('[TESTING] Running with send_email() in TEST mode')
message = EmailMessage()
if subject:
message['Subject'] = subject
else:
return False
if from_email and from_name:
try:
message['From'] = Address(display_name=from_name, addr_spec=from_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif from_email:
try:
message['From'] = Address(display_name=from_email, addr_spec=from_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
else:
return False
if reply_to_email and reply_to_name:
try:
message['Reply-To'] = Address(display_name=reply_to_name, addr_spec=reply_to_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif reply_to_email:
try:
message['Reply-To'] = Address(display_name=reply_to_email, addr_spec=reply_to_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
if to_email and to_name:
try:
message['To'] = Address(display_name=to_name, addr_spec=to_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif to_email:
try:
message['To'] = Address(display_name=to_email, addr_spec=to_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
else:
return False
if cc_email and cc_name:
try:
message['Cc'] = Address(display_name=cc_name, addr_spec=cc_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif cc_email:
try:
message['Cc'] = Address(display_name=cc_email, addr_spec=cc_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
if bcc_email and bcc_name:
try:
message['Bcc'] = Address(display_name=bcc_name, addr_spec=bcc_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif bcc_email:
try:
message['Bcc'] = Address(display_name=bcc_email, addr_spec=bcc_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
html_version = """
<html>
<body>
"""+body_html+"""
</body>
</html>
"""
if body_text:
text_version = body_text
else:
text_version = html2text.html2text(html_version)
message.set_content(text_version)
message.add_alternative(html_version, subtype='html')
log.info('Sending email...')
log.debug(settings.SMTP)
log.info(f'Subject: {subject}')
log.info(f'From: {from_email} Reply To: {reply_to_email} To: {to_email} CC: {cc_email} BCC: {bcc_email}')
log.debug('Message:')
log.debug(message.as_string())
log.info('Creating SMTP SSL connection...')
context = ssl.create_default_context()
log.info('SMTP configuration, connect, and send')
log.info(f'Server: {settings.SMTP["server"]} Port: {settings.SMTP["port"]} Username: {settings.SMTP["username"]}')
log.info('Trying smtplib.SMTP_SSL in send_email()...')
if test:
log.info('[TESTING] Email will NOT actually be sent! [TEST MODE]')
try:
with smtplib.SMTP_SSL(settings.SMTP['server'], settings.SMTP['port'], context=context) as server:
log.info('SMTP log in...')
log.debug(f'Server: {settings.SMTP["server"]} Port: {settings.SMTP["port"]} Username: {settings.SMTP["username"]} Password: {settings.SMTP["password"]}')
server.login(settings.SMTP['username'], settings.SMTP['password'])
log.info('SMTP send message...')
if not test:
log.info('Email sent! Returning True')
server.send_message(message)
else:
log.info('[TESTING] Email (NOT) sent! Returning True [TEST MODE]')
return True
except:
log.error('Error: Unable to send email. Returning False')
return False
# ### END ### API Lib Email ### send_email() ###

116
app/lib_export.py Normal file
View File

@@ -0,0 +1,116 @@
import os
import pandas
import pathlib
import logging
from typing import Optional, Union
from app.log import logger_reset
from app.config import settings
log = logging.getLogger(__name__)
# ### BEGIN ### API Lib Export ### create_export_file() ###
# Moved from lib_general.py 2026-01-07
@logger_reset
def create_export_file(
data_dict_list: list,
subdir_path: str,
filename: str,
column_name_li: list = [],
rm_id: bool = True,
export_type: str = 'CSV', # CSV, Excel
) -> Union[bool, str]:
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root']
log.info(f'Hosted Temp Path: {hosted_tmp_path}')
subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path)
log.debug(subdirectory_dest)
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
file_dest_w_subdir = os.path.join(subdirectory_dest, filename)
log.info(f'File Dest With Subdir: {file_dest_w_subdir}')
if column_name_li:
log.info('Using column name list passed')
else:
log.info('Using an auto generated column name list')
column_name_li = list(data_dict_list[0].keys())
log.debug(column_name_li)
if rm_id:
for column_name in list(column_name_li):
if column_name.endswith('_id'):
column_name_li.remove(column_name)
log.info(f'Removing column name: {column_name}')
log.info(column_name_li)
data_dataframe = pandas.DataFrame(data_dict_list)
log.debug(data_dataframe)
missing_cols = [col for col in column_name_li if col not in data_dataframe.columns]
if missing_cols:
column_name_li = [col for col in column_name_li if col not in missing_cols]
try:
if export_type == 'CSV':
log.info('Saving dataframe to CSV file')
full_dest_path = file_dest_w_subdir+'.csv'
filename_w_ext = filename+'.csv'
tmp_file_path = os.path.join(subdir_path,filename_w_ext)
data_dataframe.to_csv(
full_dest_path,
na_rep='NULL',
columns=column_name_li,
index=False,
)
elif export_type == 'Excel':
log.info('Saving dataframe to Excel file')
full_dest_path = file_dest_w_subdir+'.xlsx'
filename_w_ext = filename+'.xlsx'
tmp_file_path = os.path.join(subdir_path,filename_w_ext)
data_dataframe.to_excel(
full_dest_path,
na_rep='NULL',
columns=column_name_li,
index=False,
)
except:
log.exception('Something went wrong while trying to save the export file.')
return False
log.info(f'Temp File Path: {tmp_file_path}')
return tmp_file_path
# ### END ### API Lib Export ### create_export_file() ###
# ### BEGIN ### API Lib Export ### return_full_tmp_path() ###
# This is for using with return FileResponse(path=full_tmp_path, filename=filename)
# Moved from lib_general.py 2026-01-07
@logger_reset
def return_full_tmp_path(
full_tmp_path: str = None,
subdir_path: str = None,
filename: str = None,
) -> Union[bool, str]:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root']
log.info(f'Hosted Temp Path: {hosted_tmp_path}')
if full_tmp_path:
file_dest = os.path.join(hosted_tmp_path, full_tmp_path)
return file_dest
elif subdir_path and filename:
subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path)
log.debug(subdirectory_dest)
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
file_dest_w_subdir = os.path.join(subdirectory_dest, filename)
log.info(f'File Dest With Subdir: {file_dest_w_subdir}')
return file_dest_w_subdir
else:
return False
# ### END ### API Lib Export ### return_full_tmp_path() ###

View File

@@ -1,26 +1,21 @@
import html2text, jwt, os, pandas, pathlib, time
# No longer needed here: datetime, pytz, redis
from passlib.hash import argon2
# Import smtplib for the actual sending function
import smtplib, ssl
# Import the email package modules needed
from email.message import EmailMessage
from email.headerregistry import Address
# from email.utils import make_msgid
from fastapi import APIRouter, Depends, Header, HTTPException, Response, status
# from pydantic import BaseModel, EmailStr, Field
import html2text, time
from typing import Dict, List, Optional, Set, Union
import logging
from app.log import logger_reset
log = logging.getLogger(__name__)
from fastapi import Header, HTTPException, Response, status
from app.log import logger_reset
from app.config import settings
from app.db_sql import redis_lookup_id_random, sql_select
from app.lib_email import send_email
from app.lib_export import create_export_file, return_full_tmp_path
from app.lib_jwt import sign_jwt, decode_jwt
from app.lib_hash import secure_hash_string, verify_secure_hash_string
log = logging.getLogger(__name__)
# ### BEGIN ### API Lib General ### async get_token_header() ###
def get_token_header(x_token: str = Header(...)):
@@ -267,388 +262,4 @@ def common_route_params_min(
# ### END ### API Lib General ### async common_route_params_min() ###
def secure_hash_string(string: str):
string_hash = argon2.using(rounds=14, memory_cost=1536, parallelism=2).hash(string)
return string_hash
def verify_secure_hash_string(string: str, string_hash: str):
if argon2.verify(string, string_hash):
return True
else:
return False
# ### BEGIN ### API Lib General ### sign_jwt() ###
# Updated 2021-07-14
@logger_reset
def sign_jwt(
secret_key: str, # Secret/Private/Password
ttl: int = 60, # Default to 60 seconds
max_renew: int = 0, # Default to 0
public_key: str = None, # Will be part of the token. Use to look up secret when verifying.???
account_id: str = None,
person_id: str = None,
user_id: str = None,
json_str: str = None,
b64_str: str = None,
) -> Dict[str, str]:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
payload = {
'iat': time.time(), # Issued at
'eat': time.time() + ttl, # Expires at
'max_renew': max_renew, # Number of times allowed to request renew without API secret key
'public_key': public_key, # Use to lookup the secret/private/password key when verifying
'account_id': account_id,
'person_id': person_id,
'user_id': user_id,
'json_str': json_str,
'b64_str': b64_str,
}
secret = secret_key
algorithm = 'HS256'
token = jwt.encode(payload, secret, algorithm=algorithm)
log.debug(token)
return token
# ### END ### API Lib General ### sign_jwt() ###
# ### BEGIN ### API Lib General ### decode_jwt() ###
# Updated 2021-07-14
@logger_reset
def decode_jwt(
secret_key: str,
token: str,
) -> dict:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
secret = secret_key
algorithm = 'HS256'
try:
decoded_token = jwt.decode(token, secret, algorithms=[algorithm])
log.debug(decoded_token)
if decoded_token['eat'] >= time.time(): return decoded_token
else: return False
except:
return None
# ### END ### API Lib General ### decode_jwt() ###
# ### BEGIN ### API Lib General ### create_export() ###
# Updated 2021-11-23
@logger_reset
def create_export_file(
data_dict_list: list,
subdir_path: str,
filename: str,
column_name_li: list = [],
rm_id: bool = True,
export_type: str = 'CSV', # CSV, Excel
) -> bool|str:
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root']
# hosted_tmp_path = 'admin/temp'
log.info(f'Hosted Temp Path: {hosted_tmp_path}')
subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path)
log.debug(subdirectory_dest)
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
file_dest_w_subdir = os.path.join(subdirectory_dest, filename)
log.info(f'File Dest With Subdir: {file_dest_w_subdir}')
if column_name_li:
log.info('Using column name list passed')
else:
log.info('Using an auto generated column name list')
column_name_li = list(data_dict_list[0].keys())
log.debug(column_name_li)
if rm_id:
for column_name in list(column_name_li):
# log.info(f'Checking column name: {column_name}')
if column_name.endswith('_id'):
column_name_li.remove(column_name)
log.info(f'Removing column name: {column_name}')
log.info(column_name_li)
# column_name_li = ['order_line_id_random', 'order_id_random', 'product_id_random', 'product_type', 'product_name', 'quantity', 'amount', 'dollar_amount', 'message', 'person_id_random', 'person_given_name', 'person_family_name', 'person_display_name', 'person_full_name', 'person_contact_email', 'person_contact_cc_email', 'person_contact_address_name', 'person_contact_address_organization_name', 'person_contact_address_line_1', 'person_contact_address_line_2', 'person_contact_address_line_3', 'person_contact_address_city', 'person_contact_address_country_subdivision_code', 'person_contact_address_state_province', 'person_contact_address_postal_code', 'person_contact_address_country_alpha_2_code', 'person_contact_address_country_name', 'person_contact_address_country', 'order_status', 'order_created_on', 'order_updated_on']
data_dataframe = pandas.DataFrame(data_dict_list)
log.debug(data_dataframe)
# Need to deal with this error when a field is empty (null/None) for every record in the export: KeyError("Not all names specified in 'columns' are found")
# There does not seem to be a Panda option to ignore missing columns
missing_cols = [col for col in column_name_li if col not in data_dataframe.columns]
if missing_cols:
# Remove the missing columns from the column list
column_name_li = [col for col in column_name_li if col not in missing_cols]
# raise KeyError(f"The following columns are not found in the DataFrame: {', '.join(missing_cols)}")
try:
if export_type == 'CSV':
log.info('Saving dataframe to CSV file')
full_dest_path = file_dest_w_subdir+'.csv'
filename_w_ext = filename+'.csv'
tmp_file_path = os.path.join(subdir_path,filename_w_ext)
data_dataframe.to_csv(
full_dest_path,
na_rep='NULL',
columns=column_name_li,
index=False,
# errors='ignore',
)
elif export_type == 'Excel':
log.info('Saving dataframe to Excel file')
full_dest_path = file_dest_w_subdir+'.xlsx'
filename_w_ext = filename+'.xlsx'
tmp_file_path = os.path.join(subdir_path,filename_w_ext)
# This should ignore unknown columns
data_dataframe.to_excel(
full_dest_path,
na_rep='NULL',
columns=column_name_li,
index=False,
# engine='openpyxl',
# errors='ignore',
)
except:
log.exception('Something went wrong while trying to save the export file.')
return False
log.info(f'Temp File Path: {tmp_file_path}')
return tmp_file_path # True
# ### END ### API Lib General ### create_export() ###
# ### BEGIN ### API Lib General ### return_full_tmp_path() ###
# This is for using with return FileResponse(path=full_tmp_path, filename=filename)
# Updated 2022-04-22
@logger_reset
def return_full_tmp_path(
full_tmp_path: str = None,
subdir_path: str = None,
filename: str = None,
) -> bool|str:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root']
# hosted_tmp_path = 'admin/temp'
log.info(f'Hosted Temp Path: {hosted_tmp_path}')
if full_tmp_path:
file_dest = os.path.join(hosted_tmp_path, full_tmp_path)
return file_dest
elif subdir_path and filename:
subdirectory_dest = os.path.join(hosted_tmp_path, subdir_path)
log.debug(subdirectory_dest)
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
file_dest_w_subdir = os.path.join(subdirectory_dest, filename)
log.info(f'File Dest With Subdir: {file_dest_w_subdir}')
return file_dest_w_subdir
else:
return False
# ### END ### API Lib General ### return_full_tmp_path() ###
# ### BEGIN ### API Lib General ### send_email() ###
# Updated 2021-12-02
@logger_reset
def send_email(
from_email: str,
to_email: str,
subject: str,
body_html: str,
from_name: str = '',
reply_to_email: str = '',
reply_to_name: str = '',
to_name: str = '',
cc_email: str = '',
cc_name: str = '',
bcc_email: str = '',
bcc_name: str = '',
body_text: str = '',
test: bool = False,
log_lvl: int = logging.WARNING, # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
):
log.setLevel(log_lvl)
log.debug(locals())
if test:
log.setLevel(logging.DEBUG)
log.debug('[TESTING] Running with send_email() in TEST mode')
message = EmailMessage()
if subject:
message['Subject'] = subject
else:
return False
if from_email and from_name:
#message['From'] = Address(display_name=from_name, username=from_email.split('@')[0], domain=from_email.split('@')[1])
try:
message['From'] = Address(display_name=from_name, addr_spec=from_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif from_email:
try:
message['From'] = Address(display_name=from_email, addr_spec=from_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
else:
return False
if reply_to_email and reply_to_name:
try:
message['Reply-To'] = Address(display_name=reply_to_name, addr_spec=reply_to_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif reply_to_email:
try:
message['Reply-To'] = Address(display_name=reply_to_email, addr_spec=reply_to_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
if to_email and to_name:
try:
message['To'] = Address(display_name=to_name, addr_spec=to_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif to_email:
try:
message['To'] = Address(display_name=to_email, addr_spec=to_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
else:
return False
if cc_email and cc_name:
try:
message['Cc'] = Address(display_name=cc_name, addr_spec=cc_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif cc_email:
try:
message['Cc'] = Address(display_name=cc_email, addr_spec=cc_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
if bcc_email and bcc_name:
try:
message['Bcc'] = Address(display_name=bcc_name, addr_spec=bcc_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
elif bcc_email:
try:
message['Bcc'] = Address(display_name=bcc_email, addr_spec=bcc_email)
except Exception as e:
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
return False
html_version = """\
<html>
<body>
"""+body_html+"""
</body>
</html>
"""
# html_version = """\
# <html>
# <body>
# <div>
# """+body_html+"""
# </div>
# </body>
# </html>
# """
if body_text:
text_version = body_text
else:
text_version = html2text.html2text(html_version)
message.set_content(text_version)
message.add_alternative(html_version, subtype='html')
log.info('Sending email...')
log.debug(settings.SMTP)
log.info(f'Subject: {subject}')
log.info(f'From: {from_email} Reply To: {reply_to_email} To: {to_email} CC: {cc_email} BCC: {bcc_email}')
log.debug('Message:')
log.debug(message.as_string())
log.info('Creating SMTP SSL connection...')
context = ssl.create_default_context()
log.info('SMTP configuration, connect, and send')
log.info(f'Server: {settings.SMTP["server"]} Port: {settings.SMTP["port"]} Username: {settings.SMTP["username"]}')
# log.info(settings.SMTP['server'])
# log.info(settings.SMTP['port'])
# log.info(settings.SMTP['username'])
# log.info(settings.SMTP['password'])
# if test:
# try:
# with smtplib.SMTP_SSL(settings.SMTP['server'], settings.SMTP['port'], context=context) as server:
# log.info('[TESTING] SMTP log in...')
# server.login(settings.SMTP['username'], settings.SMTP['password'])
# log.info('[TESTING] SMTP send message... [WILL NOT SEND IN TEST MODE]')
# # server.send_message(message)
# log.info('[TESTING] Email (NOT) sent!')
# return True
# # except SMTPException:
# except:
# log.error('[TEST] Error: Unable to send email. Returning False')
# return False
log.info('Trying smtplib.SMTP_SSL in send_email()...')
if test:
log.info('[TESTING] Email will NOT actually be sent! [TEST MODE]')
try:
with smtplib.SMTP_SSL(settings.SMTP['server'], settings.SMTP['port'], context=context) as server:
log.info('SMTP log in...')
log.debug(f'Server: {settings.SMTP["server"]} Port: {settings.SMTP["port"]} Username: {settings.SMTP["username"]} Password: {settings.SMTP["password"]}')
server.login(settings.SMTP['username'], settings.SMTP['password'])
log.info('SMTP send message...')
if not test:
log.info('Email sent! Returning True')
server.send_message(message)
else:
log.info('[TESTING] Email (NOT) sent! Returning True [TEST MODE]')
return True
except:
log.error('Error: Unable to send email. Returning False')
return False
# ### END ### API Lib General ### send_email() ###

16
app/lib_hash.py Normal file
View File

@@ -0,0 +1,16 @@
from passlib.hash import argon2
import logging
log = logging.getLogger(__name__)
# Moved from lib_general.py 2026-01-07
def secure_hash_string(string: str) -> str:
string_hash = argon2.using(rounds=14, memory_cost=1536, parallelism=2).hash(string)
return string_hash
# Moved from lib_general.py 2026-01-07
def verify_secure_hash_string(string: str, string_hash: str) -> bool:
if argon2.verify(string, string_hash):
return True
else:
return False

68
app/lib_jwt.py Normal file
View File

@@ -0,0 +1,68 @@
import jwt
import time
import logging
from typing import Dict, Optional
from app.log import logger_reset
log = logging.getLogger(__name__)
# ### BEGIN ### API Lib JWT ### sign_jwt() ###
# Moved from lib_general.py 2026-01-07
@logger_reset
def sign_jwt(
secret_key: str, # Secret/Private/Password
ttl: int = 60, # Default to 60 seconds
max_renew: int = 0, # Default to 0
public_key: str = None, # Will be part of the token. Use to look up secret when verifying.???
account_id: str = None,
person_id: str = None,
user_id: str = None,
json_str: str = None,
b64_str: str = None,
) -> str:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
payload = {
'iat': time.time(), # Issued at
'eat': time.time() + ttl, # Expires at
'max_renew': max_renew, # Number of times allowed to request renew without API secret key
'public_key': public_key, # Use to lookup the secret/private/password key when verifying
'account_id': account_id,
'person_id': person_id,
'user_id': user_id,
'json_str': json_str,
'b64_str': b64_str,
}
secret = secret_key
algorithm = 'HS256'
token = jwt.encode(payload, secret, algorithm=algorithm)
log.debug(token)
return token
# ### END ### API Lib JWT ### sign_jwt() ###
# ### BEGIN ### API Lib JWT ### decode_jwt() ###
# Moved from lib_general.py 2026-01-07
@logger_reset
def decode_jwt(
secret_key: str,
token: str,
) -> Optional[dict]:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
secret = secret_key
algorithm = 'HS256'
try:
decoded_token = jwt.decode(token, secret, algorithms=[algorithm])
log.debug(decoded_token)
if decoded_token['eat'] >= time.time(): return decoded_token
else: return False
except:
return None
# ### END ### API Lib JWT ### decode_jwt() ###

View File

@@ -17,7 +17,36 @@ This guide explains how to update or create frontend functions to interact with
---
## 2. Implementing V3 CRUD Functions
## 2. Authentication and Security (Mandatory in V3)
As of January 2026, the V3 architecture enforces strict **Multi-Tenant Isolation**. Most requests now require valid authentication to prevent data leakage between accounts.
### A. Authentication Requirement
Almost all V3 CRUD endpoints require a standard Bearer token in the `Authorization` header.
* **Mandatory:** You must provide a valid JWT for nearly all requests.
* **Account Isolation:** The backend automatically filters all results based on the `account_id` found in your JWT. You cannot access data belonging to another account even if you know the random ID.
* **Status Codes:**
* `401 Unauthorized`: Your JWT is invalid or expired.
* `403 Forbidden`: No authentication provided, or you attempted to access an object belonging to a different account.
**Example Request Header:**
```http
Authorization: Bearer <your_jwt_token>
```
### B. The "Bootstrap Paradox" Exception (`site_domain`)
There is one critical exception to strict authentication: **`site_domain` search**.
Because the frontend needs to lookup the site configuration (to know which account it's on) *before* a user can log in, the following endpoint allows unauthenticated (guest) access:
**Endpoint:** `POST /v3/crud/site_domain/search`
This is the only V3 search allowed without a JWT. All other object types (journal, account, post, etc.) will return `403 Forbidden` if accessed without a token.
---
## 3. Implementing V3 CRUD Functions
### A. List & Single Object (GET)
Support for view selection allows fetching richer data models when needed.
@@ -60,41 +89,21 @@ export async function search_ae_obj_v3({
}
```
### C. Standardized Global Search
Use the `q` property in your search body for a general keyword search across indexed columns.
### C. Standardized Global Search (`q`)
Use the `q` property in your search body for a keyword search.
- **Wildcard Support**: Setting `q: "%"` will bypass all text filtering and return all records (respecting only logical filters like `enable`).
- **Fallback**: If the table lacks a `default_qry_str` column, the API automatically performs a `LIKE` search across all searchable fields.
```json
// POST /v3/crud/journal/search
{
"q": "Annual Meeting",
"and": [
{ "field": "enable", "op": "eq", "value": true }
]
"and": [{ "field": "enable", "op": "eq", "value": true }]
}
```
### D. Supported Search Operators
The `op` property in a `SearchFilter` supports the following values:
| Operator | SQL equivalent | Description |
| --- | --- | --- |
| `eq` | `=` | Equal to |
| `ne` | `!=` | Not equal to |
| `gt` | `>` | Greater than |
| `gte` | `>=` | Greater than or equal to |
| `lt` | `<` | Less than |
| `lte` | `<=` | Less than or equal to |
| `in` | `IN (...)` | Matches any value in a provided list |
| `is_null` | `IS NULL` | Field is null (ignores `value`) |
| `is_not_null` | `IS NOT NULL` | Field is not null (ignores `value`) |
| `like` | `LIKE` | Standard SQL LIKE (requires manual `%` in `value`) |
| `contains` | `LIKE %val%` | Wraps value in `%` automatically |
| `startswith` | `LIKE val%` | Appends `%` to value automatically |
| `endswith` | `LIKE %val` | Prepends `%` to value automatically |
---
## 3. Create, Update, & Delete (POST, PATCH, DELETE)
## 4. Create, Update, & Delete (POST, PATCH, DELETE)
V3 supports both top-level operations and nested parent/child operations.
@@ -103,14 +112,12 @@ When creating objects, V3 strictly validates the incoming JSON against the `mdl_
```ts
// POST /v3/crud/{obj_type}/
// POST /v3/crud/journal/
export async function create_ae_obj_v3({ api_cfg, obj_type, data }) {
const endpoint = `/v3/crud/${obj_type}/`;
return await post_object({ api_cfg, endpoint, data });
}
// POST /v3/crud/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/
// POST /v3/crud/journal/EIAC-40-76-82/journal_entry/
// Note: Parent ID is automatically injected into the child record.
export async function create_nested_obj_v3({ api_cfg, parent_type, parent_id, child_type, data }) {
const endpoint = `/v3/crud/${parent_type}/${parent_id}/${child_type}/`;
@@ -127,13 +134,6 @@ export async function update_ae_obj_v3({ api_cfg, obj_type, obj_id, data }) {
const endpoint = `/v3/crud/${obj_type}/${obj_id}`;
return await patch_object({ api_cfg, endpoint, data });
}
// PATCH /v3/crud/{parent_type}/{parent_id}/{child_type}/{child_id}
// Verification: The backend ensures the child actually belongs to the parent before updating.
export async function update_nested_obj_v3({ api_cfg, parent_type, parent_id, child_type, child_id, data }) {
const endpoint = `/v3/crud/${parent_type}/${parent_id}/${child_type}/${child_id}`;
return await patch_object({ api_cfg, endpoint, data });
}
```
### C. Delete (DELETE)
@@ -149,34 +149,18 @@ export async function delete_ae_obj_v3({ api_cfg, obj_type, obj_id }) {
---
## 4. Specialized & Context Endpoints
## 5. Specialized & Context Endpoints
### A. Initial Context Resolution (FQDN)
**Path**: `GET /crud/site/domain/{fqdn}?use_alt_table=true&use_alt_base=true`
### A. Context Resolution (FQDN)
Used during initial load to find the `account_id` associated with the domain.
This is a **critical public endpoint** used by the Svelte frontend during initial load.
- **Requirement**: Does NOT require `X-Account-ID` header (it resolves it).
- **Purpose**: Returns the `account_id` and site configuration associated with the current URL.
- **Backend**: Currently routed via legacy V1/V2 `api_crud.py` for maximum compatibility.
```ts
// Example: resolve context from current location
const fqdn = window.location.host;
const endpoint = `/crud/site/domain/${fqdn}`;
const context = await get_object({
api_cfg,
endpoint,
params: { use_alt_table: true, use_alt_base: true }
});
// result includes account_id and account_id_random
```
**Legacy Method:** `GET /crud/site/domain/{fqdn}?use_alt_table=true&use_alt_base=true`
**Modern V3 Method (Preferred):** `POST /v3/crud/site_domain/search` with `{ "q": "your-domain.com" }`
### B. Schema Discovery
**Path**: `GET /v3/crud/{obj_type}/schema`
Used for developer tools or dynamic UI builders to understand the structure of an AE object. Returns:
- Database column names and SQL types.
- Pydantic model field names, aliases, and TypeScript-compatible types.
Used for developer tools or dynamic UI builders to understand the structure of an AE object.
```ts
// GET /v3/crud/account/schema
@@ -188,44 +172,7 @@ export async function get_obj_schema_v3({ api_cfg, obj_type }) {
---
## 5. Advanced Search Logic
### A. Standardized Global Search (`q`)
Use the `q` property in your search body for a keyword search.
- **Wildcard Support**: Setting `q: "%"` will bypass all text filtering and return all records (respecting only logical filters like `enable`).
- **Fallback**: If the table lacks a `default_qry_str` column, the API automatically performs a `LIKE` search across all searchable fields.
```json
{
"q": "%",
"and": [{ "field": "enable", "op": "eq", "value": true }]
}
```
## 6. Authentication and Security in V3 (Mandatory)
Implemented in January 2026, the V3 architecture enforces strict **Multi-Tenant Isolation** and requires valid authentication for nearly all requests.
### A. Authentication Requirement
Most API calls now require a standard Bearer token in the `Authorization` header.
* **Mandatory:** V3 CRUD endpoints require a valid JWT or an administrative bypass header.
* **Account Isolation:** The backend automatically filters all results based on the `account_id` found in your JWT. You cannot access data belonging to another account even if you know the ID.
* **Status Codes:**
* `401 Unauthorized`: Your JWT is invalid or expired.
* `403 Forbidden`: No authentication provided, or you attempted to access an object belonging to a different account.
**Example Request Header:**
```http
Authorization: Bearer <your_jwt_token>
```
### B. Role-Based Access
The API performs background checks on the user's role stored in the system:
* **Managers/Admins:** Can see and edit most data within their account.
* **Super Users:** Can bypass account isolation (reserved for system maintenance).
### C. Secure File Downloads (URL Parameter)
## 6. Secure File Downloads (URL Parameter)
For `hosted_file` and `event_file`, browsers often need to download files without complex header modifications. In these cases, you can pass the JWT directly in the URL.
```ts
@@ -234,14 +181,10 @@ For `hosted_file` and `event_file`, browsers often need to download files withou
const downloadUrl = `${BASE_URL}/v3/crud/hosted_file/${fileId}/?jwt=${jwtToken}`;
```
### D. Administrative Bypass (Utility/Dev only)
For development utilities and automated scripts, the `X-No-Account-ID: true` header grants full administrative access. **Never use this in a public-facing application.**
---
## 7. Best Practices for V3
1. **Use `view` for Rich Data**: Instead of manually joining data in separate calls, use `?view=enriched` or `?view=detail` if defined in the backend.
2. **Hybrid Search**: Use query parameters for simple toggles (enabled/hidden) and the POST body for complex logic.
3. **Global Search**: Always prefer the `q` property for text search instead of manually targeting `default_qry_str`.
4. **Singular Nouns**: Always use singular names for `obj_type` (e.g., `journal`).
1. **Use `view` for Rich Data**: Instead of manually joining data in separate calls, use `?view=enriched` or `?view=detail`.
2. **Singular Nouns**: Always use singular names for `obj_type` (e.g., `journal`).
3. **Strict Typing**: Ensure your `data` objects match the backend models to avoid `400 Bad Request` validation errors.