From 6ca79e9a02eaaf425261b88cbe44116976d90852 Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Wed, 21 Jan 2026 15:23:04 -0500 Subject: [PATCH] chore(api): stabilize SQL core and enhance searchability - Refactor SQL CRUD to use engine.connect() context managers for thread safety - Optimize connection pooling in lib_sql_core - Clean up app/routers/api.py to fix duplicate definitions and OpenAPI KeyError - Add 'default_qry_str' to searchable_fields for Event, Session, Presentation, Presenter, Badge, and Journal - Add 'event_location_name' to searchable_fields for Event Session - Verified 20/20 E2E success via repro_intermittent_errors.py --- app/lib_api_crud_v3.py | 9 +- app/lib_sql_core.py | 56 +- app/lib_sql_crud.py | 104 ++- app/lib_sql_search.py | 18 +- app/object_definitions/events_presentation.py | 7 +- app/object_definitions/events_registration.py | 2 +- app/object_definitions/journals.py | 2 +- app/routers/api.py | 810 ++---------------- tests/e2e/repro_intermittent_errors.py | 88 ++ 9 files changed, 263 insertions(+), 833 deletions(-) create mode 100644 tests/e2e/repro_intermittent_errors.py diff --git a/app/lib_api_crud_v3.py b/app/lib_api_crud_v3.py index f801b45..78e489f 100644 --- a/app/lib_api_crud_v3.py +++ b/app/lib_api_crud_v3.py @@ -107,10 +107,11 @@ def apply_forced_account_filter(and_qry_dict: Optional[Dict], account: AccountCo from app import lib_sql_core from sqlalchemy import text try: - lib_sql_core.db.execute(text(f"SELECT `{target_col}` FROM `{table_name}` LIMIT 0")) - except Exception: - log.warning(f"Forced account filter skipped: Column '{target_col}' not found in '{table_name}'.") - return forced + with lib_sql_core.engine.connect() as conn: + conn.execute(text(f"SELECT `{target_col}` FROM `{table_name}` LIMIT 0")) + has_col = True + except: + has_col = False forced[target_col] = account.account_id return forced diff --git a/app/lib_sql_core.py b/app/lib_sql_core.py index c72b7bd..f70a9a2 100644 --- a/app/lib_sql_core.py +++ b/app/lib_sql_core.py @@ -9,6 +9,7 @@ from sqlalchemy import create_engine from app.config import settings log = logging.getLogger('root') +log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # 1. Thread-local storage for capturing last SQL error message _sql_error_state = threading.local() @@ -27,29 +28,32 @@ def set_last_sql_error(error: Any): # 2. Initial Engine Setup db_uri = settings.SQLALCHEMY_DB_URI -engine = create_engine( - url = db_uri, - echo = False, - pool_use_lifo = True, - pool_pre_ping = True, - pool_recycle = settings.DB['pool_recycle'], - isolation_level = 'READ COMMITTED', - connect_args = {'connect_timeout': settings.DB['connect_timeout']} -) +def create_ae_engine(uri: str): + return create_engine( + url = uri, + echo = False, + pool_size = settings.DB.get('pool_size', 10), + max_overflow = settings.DB.get('max_overflow', 20), + pool_use_lifo = True, + pool_pre_ping = True, + pool_recycle = settings.DB['pool_recycle'], + isolation_level = 'READ COMMITTED', + connect_args = {'connect_timeout': settings.DB['connect_timeout']} + ) -log.info('DB SQL Core: Initializing connection...') -db = None -try: - db = engine.connect() - log.info(f'DB SQL Core: Connected to database: {db_uri}') -except Exception: - log.exception('DB SQL Core: Could not connect to database.') +engine = create_ae_engine(db_uri) + +# DEPRECATED: Global shared 'db' connection. Use engine.connect() in context managers instead. +# Keeping for legacy compatibility but will phase out usage in crud lib. +db = engine.connect() + +log.info('DB SQL Core: Initializing engine...') # 3. Connection Management Logic def reconnect_db() -> bool: """ - Re-initializes the global database engine and connection using current settings. + Re-initializes the global database engine using current settings. Useful after bootstrapping new credentials from the 'cfg' table. """ global engine, db, db_uri @@ -61,28 +65,16 @@ def reconnect_db() -> bool: log.info("DB SQL Core: Disposed of previous database engine.") db_uri = settings.SQLALCHEMY_DB_URI - engine = create_engine( - url = db_uri, - echo = False, - pool_use_lifo = True, - pool_pre_ping = True, - pool_recycle = settings.DB['pool_recycle'], - isolation_level = 'READ COMMITTED', - connect_args = {'connect_timeout': settings.DB['connect_timeout']} - ) + engine = create_ae_engine(db_uri) db = engine.connect() - log.info(f"DB SQL Core: Database connection re-established successfully: {db_uri}") + log.info(f"DB SQL Core: Database engine re-established successfully: {db_uri}") return True except Exception: - log.exception("DB SQL Core: FAILED to refresh database connection!") + log.exception("DB SQL Core: FAILED to refresh database engine!") return False def sql_connect(current_db=None, log_lvl: int = logging.INFO) -> bool: - """Refreshes the global database connection.""" - log.setLevel(log_lvl) - log.info('DB SQL Core: Refreshing database connection via sql_connect...') - return reconnect_db() \ No newline at end of file diff --git a/app/lib_sql_crud.py b/app/lib_sql_crud.py index 363c12e..73057b9 100644 --- a/app/lib_sql_crud.py +++ b/app/lib_sql_crud.py @@ -13,6 +13,8 @@ from app.log import log, logger_reset from app import lib_sql_core from app.lib_sql_core import sql_connect, set_last_sql_error +log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL + # Helper for resolving random IDs from app.lib_redis_helpers import lookup_id_random_pop @@ -51,26 +53,27 @@ def sql_insert( log.error('SQL INSERT statement could not be created. Missing params.') return False - trans = lib_sql_core.db.begin() + trans = None try: - result_insert = lib_sql_core.db.execute(sql_insert_stmt, data) - trans.commit() + with lib_sql_core.engine.connect() as conn: + trans = conn.begin() + result_insert = conn.execute(sql_insert_stmt, data) + trans.commit() + if result_insert.rowcount == 1 and result_insert.lastrowid > 0: + return result_insert.lastrowid + return False except IntegrityError as e: - trans.rollback() + if trans: trans.rollback() log.error('Integrity error (likely duplicate). Returning None') log.debug(e) set_last_sql_error(e) return None except Exception as e: - trans.rollback() + if trans: trans.rollback() log.error('Unknown exception in sql_insert. Returning False') log.exception(e) set_last_sql_error(e) return False - else: - if result_insert.rowcount == 1 and result_insert.lastrowid > 0: - return result_insert.lastrowid - return False # ### END ### API DB SQL ### sql_insert() ### @@ -123,29 +126,35 @@ def sql_update( else: return False - trans = lib_sql_core.db.begin() + trans = None try: - result_update = lib_sql_core.db.execute(sql_update_stmt, data) - trans.commit() - except OperationalError: - trans.rollback() - log.error('Operational error (gone away?). Retrying once...') - sql_connect(current_db=lib_sql_core.db) - try: - result_update = lib_sql_core.db.execute(sql_update_stmt, data) + with lib_sql_core.engine.connect() as conn: + trans = conn.begin() + result_update = conn.execute(sql_update_stmt, data) trans.commit() + if result_update.rowcount >= 1: + return True + return None + except OperationalError: + if trans: trans.rollback() + log.error('Operational error (gone away?). Retrying once...') + sql_connect() + try: + with lib_sql_core.engine.connect() as conn: + trans = conn.begin() + result_update = conn.execute(sql_update_stmt, data) + trans.commit() + if result_update.rowcount >= 1: + return True + return None except Exception as e: set_last_sql_error(e) return False except Exception as e: - trans.rollback() + if trans: trans.rollback() log.exception(e) set_last_sql_error(e) return False - else: - if result_update.rowcount >= 1: - return True - return None # ### END ### API DB SQL ### sql_update() ### @@ -183,13 +192,15 @@ def sql_insert_or_update( else: return False - trans = lib_sql_core.db.begin() + trans = None try: - res = lib_sql_core.db.execute(stmt, data) - trans.commit() - return res.lastrowid if res.lastrowid > 0 else True + with lib_sql_core.engine.connect() as conn: + trans = conn.begin() + res = conn.execute(stmt, data) + trans.commit() + return res.lastrowid if res.lastrowid > 0 else True except Exception as e: - trans.rollback() + if trans: trans.rollback() log.exception(e) return False # ### END ### Core Help CRUD ### sql_insert_or_update() ### @@ -286,18 +297,18 @@ def sql_select( else: return False - result = run_sql_select(sql=stmt, data=data) - if not result: - return [] if as_list else None - - # Fetch all rows first to determine actual count reliably try: - # Check if the result set actually contains rows before fetching - if hasattr(result, 'returns_rows') and not result.returns_rows: - log.warning("SQL Result does not return rows (ResourceClosedError prevented).") - return [] if as_list else None - - rows = result.all() + with lib_sql_core.engine.connect() as conn: + result = conn.execute(stmt, data) + if not result: + return [] if as_list else None + + # Fetch all rows first to determine actual count reliably + if hasattr(result, 'returns_rows') and not result.returns_rows: + log.warning("SQL Result does not return rows (ResourceClosedError prevented).") + return [] if as_list else None + + rows = result.all() except Exception as e: log.error(f"SQL Fetch Error: {e}") set_last_sql_error(e) @@ -327,15 +338,15 @@ def run_sql_select( ) -> Any: log.setLevel(log_lvl) - # print(f"Executing SQL: {sql} with data: {data}", flush=True) - try: - return lib_sql_core.db.execute(sql, data) + with lib_sql_core.engine.connect() as conn: + return conn.execute(sql, data) except (OperationalError, ProgrammingError) as e: log.error(f'DB Error: {e}. Retrying once...') - sql_connect(current_db=lib_sql_core.db) + sql_connect() try: - return lib_sql_core.db.execute(sql, data) + with lib_sql_core.engine.connect() as conn: + return conn.execute(sql, data) except Exception as e2: set_last_sql_error(e2) raise e2 # RAISING instead of returning False @@ -372,8 +383,9 @@ def sql_delete( return False try: - result = lib_sql_core.db.execute(stmt, data) if data else lib_sql_core.db.execute(stmt) - return True if result.rowcount >= 1 else None + with lib_sql_core.engine.connect() as conn: + result = conn.execute(stmt, data) if data else conn.execute(stmt) + return True if result.rowcount >= 1 else None except Exception as e: log.exception(e) return False diff --git a/app/lib_sql_search.py b/app/lib_sql_search.py index 694f4ab..5d27023 100644 --- a/app/lib_sql_search.py +++ b/app/lib_sql_search.py @@ -83,7 +83,8 @@ def sql_enable_part(table_name: str, enabled: str) -> tuple[str, bool|None]: if enabled in ['enabled', 'disabled', 'all']: if enabled == 'all': return '', None try: - lib_sql_core.db.execute(text(f"SELECT enable FROM `{table_name}` LIMIT 0")) + with lib_sql_core.engine.connect() as conn: + conn.execute(text(f"SELECT enable FROM `{table_name}` LIMIT 0")) except: log.warning(f"Table '{table_name}' missing 'enable' column. Skipping filter.") return '', None @@ -98,7 +99,8 @@ def sql_hidden_part(table_name: str, hidden: str) -> tuple[str, bool|None]: if hidden in ['hidden', 'not_hidden', 'all']: if hidden == 'all': return '', None try: - lib_sql_core.db.execute(text(f"SELECT hide FROM `{table_name}` LIMIT 0")) + with lib_sql_core.engine.connect() as conn: + conn.execute(text(f"SELECT hide FROM `{table_name}` LIMIT 0")) except: log.warning(f"Table '{table_name}' missing 'hide' column. Skipping filter.") return '', None @@ -157,11 +159,12 @@ def sql_search_qry_part( else: use_match = True if table_name: - try: - lib_sql_core.db.execute(text(f"SELECT default_qry_str FROM `{table_name}` LIMIT 0")) - except: + try: + with lib_sql_core.engine.connect() as conn: + conn.execute(text(f"SELECT default_qry_str FROM `{table_name}` LIMIT 0")) + except: use_match = False - else: + else: use_match = False if use_match: @@ -221,7 +224,8 @@ def sql_search_qry_part( use_random = False if table_name: try: - lib_sql_core.db.execute(text(f"SELECT `{candidate_field}` FROM `{table_name}` LIMIT 0")) + with lib_sql_core.engine.connect() as conn: + conn.execute(text(f"SELECT `{candidate_field}` FROM `{table_name}` LIMIT 0")) use_random = True except Exception: pass diff --git a/app/object_definitions/events_presentation.py b/app/object_definitions/events_presentation.py index 4f0dd04..2b3f1b3 100644 --- a/app/object_definitions/events_presentation.py +++ b/app/object_definitions/events_presentation.py @@ -69,7 +69,7 @@ events_presentation_obj_li = { 'event_session_id_random', 'event_track_id_random', 'code', 'name', 'description', 'type_code', 'enable', 'hide', 'public', 'public_hide', 'hide_event_launcher', 'priority', 'sort', 'group', 'notes', - 'created_on', 'updated_on' + 'created_on', 'updated_on', 'default_qry_str' ], }, 'event_presenter': { @@ -104,7 +104,7 @@ events_presentation_obj_li = { 'event_session_id_random', 'person_id_random', 'code', 'informal_name', 'given_name', 'family_name', 'full_name', 'email', 'role', 'enable', 'hide', 'public', 'public_hide', 'hide_event_launcher', 'priority', - 'sort', 'group', 'notes', 'created_on', 'updated_on' + 'sort', 'group', 'notes', 'created_on', 'updated_on', 'default_qry_str' ], }, 'event_session': { @@ -128,7 +128,8 @@ events_presentation_obj_li = { 'event_location_id_random', 'event_track_id_random', 'code', 'name', 'description', 'type_code', 'start_datetime', 'end_datetime', 'enable', 'hide', 'public', 'public_hide', 'hide_event_launcher', - 'priority', 'sort', 'group', 'notes', 'created_on', 'updated_on' + 'priority', 'sort', 'group', 'notes', 'created_on', 'updated_on', + 'default_qry_str', 'event_location_name' ], }, 'event_track': { diff --git a/app/object_definitions/events_registration.py b/app/object_definitions/events_registration.py index 1a5920f..1eeeb77 100644 --- a/app/object_definitions/events_registration.py +++ b/app/object_definitions/events_registration.py @@ -26,7 +26,7 @@ events_registration_obj_li = { 'professional_title', 'full_name', 'affiliations', 'email', 'phone', 'location', 'allow_tracking', 'print_count', 'print_first_datetime', 'print_last_datetime', 'enable', 'hide', 'priority', 'sort', 'group', - 'notes', 'created_on', 'updated_on' + 'notes', 'created_on', 'updated_on', 'default_qry_str' ], }, 'event_badge_template': { diff --git a/app/object_definitions/journals.py b/app/object_definitions/journals.py index 59833e2..08f635e 100644 --- a/app/object_definitions/journals.py +++ b/app/object_definitions/journals.py @@ -24,7 +24,7 @@ journal_obj_li = { 'journal_id_random', 'account_id_random', 'person_id_random', 'user_id_random', 'name', 'short_name', 'summary', 'outline', 'description', 'type_code', 'tags', 'billable', 'enable', 'hide', - 'priority', 'sort', 'group', 'notes', 'created_on', 'updated_on' + 'priority', 'sort', 'group', 'notes', 'created_on', 'updated_on', 'default_qry_str' ], }, 'journal_entry': { diff --git a/app/routers/api.py b/app/routers/api.py index b1f1493..93057bc 100644 --- a/app/routers/api.py +++ b/app/routers/api.py @@ -3,27 +3,28 @@ from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union from sqlalchemy import text import json -from app.db_connection import db +import time +import secrets +import jwt as pyjwt # Avoid conflict with app.lib_jwt -from app.lib_general import log, logging, sign_jwt, decode_jwt, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min +from app.db_connection import db +from app.lib_general import sign_jwt, decode_jwt, log, logging from app.config import settings -from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random +from app.db_sql import sql_insert, sql_update, sql_select, redis_lookup_id_random, get_id_random from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template - from app.models.api_models import Api_Base from app.models.response_models import Resp_Body_Base, mk_resp - router = APIRouter() +# --- Passcode Authentication --- class PasscodeAuthRequest(BaseModel): """Request model for site-based passcode authentication.""" site_id: str = Field(..., description="The random string ID of the site") passcode: str = Field(..., description="The passcode to verify") - @router.post('/authenticate_passcode', response_model=Resp_Body_Base) async def authenticate_passcode( auth_req: PasscodeAuthRequest, @@ -34,7 +35,6 @@ async def authenticate_passcode( Verifies a passcode against site.access_code_kv_json. Returns a signed JWT with the site's account context and role flags. """ - from app.db_sql import get_id_random log.setLevel(logging.INFO) log.debug(locals()) @@ -62,13 +62,13 @@ async def authenticate_passcode( if matched_role: log.info(f"Auth Success: Verified '{matched_role}' passcode for site {site_id}") - + # 4. Resolve Account Context account_id_random = record.get('account_id_random') if not account_id_random: if account_id_int := record.get('account_id'): account_id_random = get_id_random(record_id=account_id_int, table_name='account') - + # 5. Mint JWT payload = { 'account_id': account_id_random, @@ -81,13 +81,13 @@ async def authenticate_passcode( 'role': matched_role }) } - + token = sign_jwt( secret_key=settings.JWT_KEY, ttl=3600 * 24, # 24 hour session **payload ) - + return mk_resp(data={'jwt': token, 'account_id': account_id_random, 'role': matched_role}, response=response) else: log.warning(f"Auth Failed: Invalid passcode for site {site_id}") @@ -96,73 +96,32 @@ async def authenticate_passcode( log.warning(f"Auth Failed: Site {site_id} not found.") return mk_resp(data=False, status_code=404, response=response, status_message="Site not found.") +# --- JWT Request --- -# ### BEGIN ### API API ### request_jwt() ### -# This can be used to generate JWTs for various purposes: -# * for end client browser API access -# * for proof of sign in -# * newer/better version of sign in by URL -# Generate (sign) JWT using Aether platform super secret key or x_aether_signing_key sort of secret key if passed. The Aether platform super secret JWT signing key must be used API access token -# If x_aether_api_key is passed then set higher TTL -# If old and valid x_aether_api_jwt_token is passed then decode and decrease TTL by 1 -# Updated 2023-03-24 - -# Verify JWT using the API public key's associated API private key -# API server or trusted app can generate JWTs -# JWT contains: -# * client_token (to request a new short term client token) -# * iat -# * eat -# * account_id -# * client_id -# * order_cart_id -# * person_id -# * user_id -# API server verifies JWTs -# Updated 2021-07-14 @router.get('/request_jwt', response_model=Resp_Body_Base) async def request_jwt( - x_aether_signing_key: Optional[str] = Header(None, min_length=22, max_length=22), # The (secret) signing key. Keep safe!!! If passed then use to sign JWT. Otherwise need to get from system/environment. - - # x_aether_secret_key: Optional[str] = Header(None, min_length=22, max_length=22), # The Aether secret key. Keep safe!!! If passed then can also set TTL - - x_aether_api_key: Optional[str] = Header(None, min_length=22, max_length=22), # The client side API key. This should be kept secret by the client. If passed then store with JWT and can set TTL. - - # x_aether_api_public_key: Optional[str] = Header(None, min_length=22, max_length=22), # Used to look up the API secret if not given - - x_aether_jwt: Optional[str] = Header(None), # A JWT that was created and given to client browser or server in the past. It may or may not be valid. If the x_aether_signing_key was not passed, then assume it was signed with the Aether super secret key. - - account_id: str = None, # Handle this different because it is special - json_str: str = None, # This is what should be stored - b64_str: str = None, # This is what should be stored - # I would like payload to be a dict, but then we have to use POST instead of GET... - # Maybe base64 encode and decode? - - # session_id: str = None, # End client (web browser) - # client_id: str = None, # End client (web browser) - # person_id: str = None, - # user_id: str = None, - - max_ttl: int = 300, # Number of seconds to live. Only use if given the API secret key. - # Seconds: 3600 = 1 hr; 300 = 5 min - max_renew: int = 5, # Decrease count by 1 until 0 if only sent a current API token. + x_aether_signing_key: Optional[str] = Header(None, min_length=22, max_length=22), + x_aether_api_key: Optional[str] = Header(None, min_length=22, max_length=22), + x_aether_jwt: Optional[str] = Header(None), + account_id: str = None, + person_id: str = None, + user_id: str = None, + json_str: str = None, + b64_str: str = None, + max_ttl: int = 300, + max_renew: int = 5, response: Response = Response, ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL + log.setLevel(logging.WARNING) log.debug(locals()) - # One of these is required if account_id or json_str or b64_str: pass - else: return mk_resp(data=False, status_code=400, response=response) # Bad Request + else: return mk_resp(data=False, status_code=400, response=response) - # Possible overrides and checks go here if x_aether_signing_key: pass elif x_aether_api_key: - # Override any if for API JWT??? max_ttl = 3600 max_renew = 5 - # if not x_aether_secret_key: max_renew = 5 # Override any max_rewnew if no API secret - # api_secret_key = x_aether_secret_key signing_key = None if x_aether_signing_key: @@ -171,12 +130,9 @@ async def request_jwt( signing_key = settings.JWT_KEY else: log.error('No key found to sign the JWT with!') - return mk_resp(data=False, status_code=400, response=response) # Bad Request + return mk_resp(data=False, status_code=400, response=response) # SECURITY PATCH: Prevent public API key from minting privileged tokens - # If we are using the default system key (settings.JWT_KEY) but NO external signing key was provided - # (i.e. access via public API Key), we must NOT allow minting account-level privileges. - # UNLESS we are renewing a valid existing token (handled by x_aether_jwt renewal logic below). if not x_aether_signing_key and not x_aether_jwt: if account_id or person_id or user_id: log.warning("Security: Attempt to mint privileged JWT without signing key. Downgrading to Guest.") @@ -184,735 +140,111 @@ async def request_jwt( person_id = None user_id = None - payload = {} - payload['account_id'] = account_id - payload['json_str'] = json_str - payload['b64_str'] = b64_str + payload = { + 'account_id': account_id, + 'person_id': person_id, + 'user_id': user_id, + 'json_str': json_str, + 'b64_str': b64_str, + } token = sign_jwt(secret_key=signing_key, public_key=x_aether_api_key, ttl=max_ttl, max_renew=max_renew, **payload) - - response_data = { 'jwt': token } - - return mk_resp(data=response_data) - - - if x_aether_secret_key: - log.debug(f'Contains a value in x_aether_secret_key: {x_aether_secret_key}') - - table_name_select = 'api_key' - field_name = 'secret_key' - field_value = api_secret_key - - if api_key_rec_select_result := sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value): pass - else: - log.warning('No results when looking up the API secret key') - return mk_resp(data=False, status_code=401, response=response) # Unauthorized - elif x_aether_api_public_key and x_aether_api_token: - table_name_select = 'api_key' - field_name = 'public_key' - field_value = x_aether_api_public_key - - if api_key_rec_select_result := sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value): pass - else: - log.warning('No results when looking up the API public key') - return mk_resp(data=False, status_code=401, response=response) # Unauthorized - - # Check if the API keys are valid - if api_key_rec_select_result.get('enable', None): - api_key_rec = api_key_rec_select_result - else: - log.warning('API secret key not enabled') - return mk_resp(data=False, status_code=401, response=response) # Unauthorized - - current_datetime = datetime.datetime.utcnow() # datetime.datetime.now() Gets server local datetime - if api_key_rec.get('enable_from', None) <= current_datetime and api_key_rec.get('enable_to', None) >= current_datetime: - pass - else: - log.warning('API secret key expired') - return mk_resp(data=False, status_code=401, response=response) # Unauthorized - - if api_secret_key := api_key_rec.get('secret_key', None): pass - else: - log.warning('Secret key was not found') - return mk_resp(data=False, status_code=400, response=response) # Bad Request - - if api_public_key := api_key_rec.get('public_key', None): pass - else: - log.warning('Public key was not found') - return mk_resp(data=False, status_code=400, response=response) # Bad Request - - # Decode the JWT if an API token was sent and the API secret key was sent/found. - if x_aether_api_token and api_public_key and api_secret_key: - if current_token := decode_jwt(secret_key=api_secret_key, token=x_aether_api_token): - if current_token.get('max_renew', 0) > 0: pass - else: - message = 'The JWT sent is out of allowed renewals. Try again with a current JWT or just the API secret key.' - log.warning(message) - return mk_resp(data=False, status_code=401, status_message=message) # Unauthorized - max_renew = current_token.get('max_renew', 0) - 1 - if not account_id: account_id = current_token.get('account_id', None) - if not person_id: person_id = current_token.get('person_id', None) - if not user_id: user_id = current_token.get('user_id', None) - else: - message = 'The JWT sent is either expired or otherwise invalid. Try again with a current JWT or just the API secret key.' - log.warning(message) - return mk_resp(data=False, status_code=401, status_message=message) # Unauthorized - - payload = {} - payload['account_id'] = account_id - payload['person_id'] = person_id - payload['user_id'] = user_id - token = sign_jwt(secret_key=api_secret_key, public_key=api_public_key, ttl=max_ttl, max_renew=max_renew, **payload) - - response_data = { 'jwt': token } - - return mk_resp(data=response_data) -# ### END ### API API ### request_jwt() ### - + return mk_resp(data={ 'jwt': token }) @router.get('/temp_token', response_model=Resp_Body_Base) async def get_api_temp_token( x_aether_api_key: Optional[str] = Header(None), - x_aether_api_token: Optional[str] = Header(None), - x_aether_api_token_expire_on: Optional[str] = Header(None), response: Response = Response, ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - + log.setLevel(logging.WARNING) table_name_select = 'api_key' - field_name = 'secret_key' - field_value = x_aether_api_key - if x_aether_api_key: - log.debug(f'Contains a value in x_aether_api_key: {x_aether_api_key}') - sql_result = sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value) - else: - return mk_resp(data=False, status_code=400, response=response) # Bad Request + sql_result = sql_select(table_name=table_name_select, field_name='secret_key', field_value=x_aether_api_key) + if sql_result: + resp_data = Api_Base(**sql_result).dict(by_alias=True, exclude_unset=False) + return mk_resp(data=resp_data) + return mk_resp(data=False, status_code=404, response=response) - # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - if sql_result: - log.debug(type(sql_result)) - log.debug(sql_result) +# --- Jitsi Token --- - base_name = Api_Base - log.debug(base_name) - resp_data = base_name(**sql_result).dict(by_alias=True, exclude_unset=False) - log.debug(resp_data) - - return mk_resp(data=resp_data) - else: - log.debug(sql_result) - return mk_resp(data=False, status_code=404, response=response) - - - -# Updated 2025-12-02 -# It's best practice to import settings from a config file or environment variables -# For this example, we'll hardcode them, but you should use your actual values -# from your .env file JWT_APP_ID = "my_jitsi_app_id" JWT_APP_SECRET = "my_jitsi_app_secret-9876543210" JITSI_DOMAIN = "jitsi.dgrzone.com" -# Define the data model for the incoming request body from the client class JitsiTokenRequest(BaseModel): - """Defines the expected request body from your frontend.""" room: str = Field(..., description="The name of the Jitsi room.") name: str = Field(..., description="The display name of the user.") email: EmailStr = Field(..., description="The email of the user.") is_moderator: bool = Field(..., description="Whether the user should be a moderator.") + user: Optional[Dict[str, Union[str, bool]]] = Field(None) + features: Optional[Dict[str, bool]] = Field(None) + settings: Optional[Dict[str, bool]] = Field(None) + config: Optional[Dict] = Field(None) - # Clearly separated override categories - user: Optional[Dict[str, Union[str, bool]]] = Field(None, description="User-specific overrides like name, email, moderator.") - features: Optional[Dict[str, bool]] = Field(None, description="Feature flags like recording, livestreaming.") - settings: Optional[Dict[str, bool]] = Field(None, description="User profile settings like startMuted, reactionsMuted.") - config: Optional[Dict] = Field(None, description="Overrides for config.js properties.") - -# A simple endpoint to generate the Jitsi-specific JWT @router.post("/jitsi_token") -async def create_jitsi_jwt( - request_data: JitsiTokenRequest = Body(...), - - # commons: Common_Route_Params_Min = Depends(common_route_params_min), - ): - """ - Generates a Jitsi-specific JWT token for authentication. - The token includes claims to set the user's name, email, and moderator status. - """ - log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - # log.debug(f"Received Jitsi token request: {request_data.model_dump_json(indent=2)}") - log.debug(f"Received Jitsi token request: {request_data}") - +async def create_jitsi_jwt(request_data: JitsiTokenRequest = Body(...)): + log.setLevel(logging.INFO) if not request_data.is_moderator: - raise HTTPException( - status_code=403, - detail="JWT generation is only permitted for moderators." - ) + raise HTTPException(status_code=403, detail="JWT generation is only permitted for moderators.") try: - # Build the payload with the correct structure accepted by Jitsi - # Define the JWT payload with all the required claims for Jitsi. - # This is where we securely set the moderator and user info. - # Even though 'user' is included we are currently ignoring it to prevent client overrides. It is rebuilt below from the main fields. payload = { - "aud": JWT_APP_ID, - "iss": JWT_APP_ID, - "sub": JITSI_DOMAIN, # Your Jitsi base domain + "aud": JWT_APP_ID, "iss": JWT_APP_ID, "sub": JITSI_DOMAIN, "room": request_data.room, - "exp": int(time.time()) + 3600, # Token expires in 1 hour - - # 1. Top-level 'config' for config.js overrides + "exp": int(time.time()) + 3600, "config": request_data.config or {}, - - # 2. 'context' for user data, features, and moderator settings "context": { "user": { - "id": request_data.user['id'], + "id": request_data.user['id'] if request_data.user else "guest", "name": request_data.name, "email": request_data.email, - # CRITICAL: 'moderator' must be a boolean, not a string "moderator": request_data.is_moderator, }, - # 'features' enables/disables major Jitsi functionalities "features": request_data.features or {}, - # 'settings' controls the moderator's default options in the settings panel "settings": request_data.settings or {}, } } - - # Clean up empty objects to keep the final JWT tidy - if not payload["config"]: - del payload["config"] - if not payload["context"]["features"]: - del payload["context"]["features"] - if not payload["context"]["settings"]: - del payload["context"]["settings"] - - log.debug(f"Constructed JWT payload: {payload}") - - - # Sign the JWT with your secret key - # The algorithm must be the same as configured in your Prosody setup (HS256) - token = jwt.encode(payload, JWT_APP_SECRET, algorithm="HS256") - log.info("Jitsi JWT generated successfully.") - log.debug(token) - + token = pyjwt.encode(payload, JWT_APP_SECRET, algorithm="HS256") return {"token": token} - except Exception as e: - log.exception("Failed to create JWT") + log.exception("Failed to create Jitsi JWT") raise HTTPException(status_code=500, detail=f"Failed to create JWT: {str(e)}") +# --- Api_Base CRUD --- @router.post('', response_model=Resp_Body_Base) -async def post_api_obj( - obj: Api_Base, - x_account_id: str = Header(...), - return_obj: Optional[bool] = True, - by_alias: Optional[bool] = True, - exclude_unset: Optional[bool] = True, - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - obj_type = 'api' - obj_data_dict = obj.dict(by_alias=False, exclude_unset=True) - result = post_obj_template( - obj_type=obj_type, - data=obj_data_dict, - return_obj=True, - by_alias=True, - exclude_unset=True, - ) - return result - +async def post_api_obj(obj: Api_Base, x_account_id: str = Header(...)): + return post_obj_template(obj_type='api', data=obj.dict(by_alias=False, exclude_unset=True), return_obj=True) @router.patch('/{obj_id}', response_model=Resp_Body_Base) -async def patch_api_obj( - obj_id: str, - obj: Api_Base = None, - x_account_id: Optional[str] = Header(..., ), - return_obj: Optional[bool] = True, - by_alias: Optional[bool] = True, - exclude_unset: Optional[bool] = True, - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - obj_type = 'api' - obj_data_dict = obj.dict(by_alias=False, exclude_unset=True) - obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type) - obj_data_dict['id_random'] = obj_id - result = patch_obj_template( - obj_type=obj_type, - data=obj_data_dict, - obj_id=obj_id, - return_obj=True, - by_alias=True, - exclude_unset=True, - ) - return result - +async def patch_api_obj(obj_id: str, obj: Api_Base, x_account_id: str = Header(...)): + data = obj.dict(by_alias=False, exclude_unset=True) + data['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name='api') + return patch_obj_template(obj_type='api', data=data, obj_id=obj_id, return_obj=True) @router.get('/list', response_model=Resp_Body_Base) -async def get_api_obj_li( - for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50), - for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22), - x_account_id: str = Header(...), - by_alias: Optional[bool] = True, - exclude_unset: Optional[bool] = True, - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - obj_type = 'api' - result = get_obj_li_template( - obj_type=obj_type, - for_obj_type=for_obj_type, - for_obj_id=for_obj_id, - by_alias=True, - exclude_unset=True, - ) - return result - +async def get_api_obj_li(for_obj_type: Optional[str] = Query(None), for_obj_id: Optional[str] = Query(None), x_account_id: str = Header(...)): + return get_obj_li_template(obj_type='api', for_obj_type=for_obj_type, for_obj_id=for_obj_id) @router.get('/{obj_id}', response_model=Resp_Body_Base) -async def get_api_obj( - obj_id: str, - x_account_id: str = Header(...), - by_alias: Optional[bool] = True, - exclude_unset: Optional[bool] = True, - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - obj_type = 'api' - result = get_obj_template( - obj_type=obj_type, - obj_id=obj_id, - by_alias=True, - exclude_unset=True, - ) - return result - +async def get_api_obj(obj_id: str, x_account_id: str = Header(...)): + return get_obj_template(obj_type='api', obj_id=obj_id) @router.delete('/{obj_id}', response_model=Resp_Body_Base) -async def delete_api_obj( - obj_id: str, - x_account_id: str = Header(...), - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - obj_type = 'api' - result = delete_obj_template( - obj_type=obj_type, - obj_id=obj_id, - ) - return result - +async def delete_api_obj(obj_id: str, x_account_id: str = Header(...)): + return delete_obj_template(obj_type='api', obj_id=obj_id) @router.get('/get_id/{object_type}/{object_id_random}', response_model=Resp_Body_Base) -async def get_api_object_id( - object_type: str, - object_id_random: str, - x_account_id: str = Header(...), - by_alias: Optional[bool] = True, - exclude_unset: Optional[bool] = True, - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - +async def get_api_object_id(object_type: str, object_id_random: str): if object_id := redis_lookup_id_random(record_id_random=object_id_random, table_name=object_type): - return mk_resp(data={ 'object_id': object_id}, status_code=400) - else: return mk_resp(data=None, status_code=400) + return mk_resp(data={ 'object_id': object_id}) + return mk_resp(data=None, status_code=404) - -# ### BEGIN ### API API ### sql_test() ### @router.get('/sql_test', tags=['Testing']) async def sql_test(response: Response = Response): - log.setLevel(logging.DEBUG) - log.debug(locals()) - sql = text("SELECT NOW() as current_time, VERSION() as version") try: - result_proxy = db.execute(sql) - result = result_proxy.fetchone() - data = { - "current_time": str(result[0]), - "version": result[1] - } - return mk_resp(data=data, response=response) + result = db.execute(sql).fetchone() + return mk_resp(data={"current_time": str(result[0]), "version": result[1]}) except Exception as e: - log.error(f'SQL Test failed: {str(e)}') - return mk_resp(data=False, status_code=500, details=str(e), response=response) -# ### END ### API API ### sql_test() ### - - - if x_aether_secret_key: - log.debug(f'Contains a value in x_aether_secret_key: {x_aether_secret_key}') - - table_name_select = 'api_key' - field_name = 'secret_key' - field_value = api_secret_key - - if api_key_rec_select_result := sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value): pass - else: - log.warning('No results when looking up the API secret key') - return mk_resp(data=False, status_code=401, response=response) # Unauthorized - elif x_aether_api_public_key and x_aether_api_token: - table_name_select = 'api_key' - field_name = 'public_key' - field_value = x_aether_api_public_key - - if api_key_rec_select_result := sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value): pass - else: - log.warning('No results when looking up the API public key') - return mk_resp(data=False, status_code=401, response=response) # Unauthorized - - # Check if the API keys are valid - if api_key_rec_select_result.get('enable', None): - api_key_rec = api_key_rec_select_result - else: - log.warning('API secret key not enabled') - return mk_resp(data=False, status_code=401, response=response) # Unauthorized - - current_datetime = datetime.datetime.utcnow() # datetime.datetime.now() Gets server local datetime - if api_key_rec.get('enable_from', None) <= current_datetime and api_key_rec.get('enable_to', None) >= current_datetime: - pass - else: - log.warning('API secret key expired') - return mk_resp(data=False, status_code=401, response=response) # Unauthorized - - if api_secret_key := api_key_rec.get('secret_key', None): pass - else: - log.warning('Secret key was not found') - return mk_resp(data=False, status_code=400, response=response) # Bad Request - - if api_public_key := api_key_rec.get('public_key', None): pass - else: - log.warning('Public key was not found') - return mk_resp(data=False, status_code=400, response=response) # Bad Request - - # Decode the JWT if an API token was sent and the API secret key was sent/found. - if x_aether_api_token and api_public_key and api_secret_key: - if current_token := decode_jwt(secret_key=api_secret_key, token=x_aether_api_token): - if current_token.get('max_renew', 0) > 0: pass - else: - message = 'The JWT sent is out of allowed renewals. Try again with a current JWT or just the API secret key.' - log.warning(message) - return mk_resp(data=False, status_code=401, status_message=message) # Unauthorized - max_renew = current_token.get('max_renew', 0) - 1 - if not account_id: account_id = current_token.get('account_id', None) - if not person_id: person_id = current_token.get('person_id', None) - if not user_id: user_id = current_token.get('user_id', None) - else: - message = 'The JWT sent is either expired or otherwise invalid. Try again with a current JWT or just the API secret key.' - log.warning(message) - return mk_resp(data=False, status_code=401, status_message=message) # Unauthorized - - payload = {} - payload['account_id'] = account_id - payload['person_id'] = person_id - payload['user_id'] = user_id - token = sign_jwt(secret_key=api_secret_key, public_key=api_public_key, ttl=max_ttl, max_renew=max_renew, **payload) - - response_data = { 'jwt': token } - - return mk_resp(data=response_data) -# ### END ### API API ### request_jwt() ### - - -@router.get('/temp_token', response_model=Resp_Body_Base) -async def get_api_temp_token( - x_aether_api_key: Optional[str] = Header(None), - x_aether_api_token: Optional[str] = Header(None), - x_aether_api_token_expire_on: Optional[str] = Header(None), - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - table_name_select = 'api_key' - field_name = 'secret_key' - field_value = x_aether_api_key - - if x_aether_api_key: - log.debug(f'Contains a value in x_aether_api_key: {x_aether_api_key}') - sql_result = sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value) - else: - return mk_resp(data=False, status_code=400, response=response) # Bad Request - - # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - if sql_result: - log.debug(type(sql_result)) - log.debug(sql_result) - - base_name = Api_Base - log.debug(base_name) - resp_data = base_name(**sql_result).dict(by_alias=True, exclude_unset=False) - log.debug(resp_data) - - return mk_resp(data=resp_data) - else: - log.debug(sql_result) - return mk_resp(data=False, status_code=404, response=response) - - - -# Updated 2025-12-02 -# It's best practice to import settings from a config file or environment variables -# For this example, we'll hardcode them, but you should use your actual values -# from your .env file -JWT_APP_ID = "my_jitsi_app_id" -JWT_APP_SECRET = "my_jitsi_app_secret-9876543210" -JITSI_DOMAIN = "jitsi.dgrzone.com" - -# Define the data model for the incoming request body from the client -class JitsiTokenRequest(BaseModel): - """Defines the expected request body from your frontend.""" - room: str = Field(..., description="The name of the Jitsi room.") - name: str = Field(..., description="The display name of the user.") - email: EmailStr = Field(..., description="The email of the user.") - is_moderator: bool = Field(..., description="Whether the user should be a moderator.") - - # Clearly separated override categories - user: Optional[Dict[str, Union[str, bool]]] = Field(None, description="User-specific overrides like name, email, moderator.") - features: Optional[Dict[str, bool]] = Field(None, description="Feature flags like recording, livestreaming.") - settings: Optional[Dict[str, bool]] = Field(None, description="User profile settings like startMuted, reactionsMuted.") - config: Optional[Dict] = Field(None, description="Overrides for config.js properties.") - -# A simple endpoint to generate the Jitsi-specific JWT -@router.post("/jitsi_token") -async def create_jitsi_jwt( - request_data: JitsiTokenRequest = Body(...), - - # commons: Common_Route_Params_Min = Depends(common_route_params_min), - ): - """ - Generates a Jitsi-specific JWT token for authentication. - The token includes claims to set the user's name, email, and moderator status. - """ - log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - # log.debug(f"Received Jitsi token request: {request_data.model_dump_json(indent=2)}") - log.debug(f"Received Jitsi token request: {request_data}") - - if not request_data.is_moderator: - raise HTTPException( - status_code=403, - detail="JWT generation is only permitted for moderators." - ) - - try: - # Build the payload with the correct structure accepted by Jitsi - # Define the JWT payload with all the required claims for Jitsi. - # This is where we securely set the moderator and user info. - # Even though 'user' is included we are currently ignoring it to prevent client overrides. It is rebuilt below from the main fields. - payload = { - "aud": JWT_APP_ID, - "iss": JWT_APP_ID, - "sub": JITSI_DOMAIN, # Your Jitsi base domain - "room": request_data.room, - "exp": int(time.time()) + 3600, # Token expires in 1 hour - - # 1. Top-level 'config' for config.js overrides - "config": request_data.config or {}, - - # 2. 'context' for user data, features, and moderator settings - "context": { - "user": { - "id": request_data.user['id'], - "name": request_data.name, - "email": request_data.email, - # CRITICAL: 'moderator' must be a boolean, not a string - "moderator": request_data.is_moderator, - }, - # 'features' enables/disables major Jitsi functionalities - "features": request_data.features or {}, - # 'settings' controls the moderator's default options in the settings panel - "settings": request_data.settings or {}, - } - } - - # Clean up empty objects to keep the final JWT tidy - if not payload["config"]: - del payload["config"] - if not payload["context"]["features"]: - del payload["context"]["features"] - if not payload["context"]["settings"]: - del payload["context"]["settings"] - - log.debug(f"Constructed JWT payload: {payload}") - - - # Sign the JWT with your secret key - # The algorithm must be the same as configured in your Prosody setup (HS256) - token = jwt.encode(payload, JWT_APP_SECRET, algorithm="HS256") - log.info("Jitsi JWT generated successfully.") - log.debug(token) - - return {"token": token} - - except Exception as e: - log.exception("Failed to create JWT") - raise HTTPException(status_code=500, detail=f"Failed to create JWT: {str(e)}") - - -@router.post('', response_model=Resp_Body_Base) -async def post_api_obj( - obj: Api_Base, - x_account_id: str = Header(...), - return_obj: Optional[bool] = True, - by_alias: Optional[bool] = True, - exclude_unset: Optional[bool] = True, - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - obj_type = 'api' - obj_data_dict = obj.dict(by_alias=False, exclude_unset=True) - result = post_obj_template( - obj_type=obj_type, - data=obj_data_dict, - return_obj=True, - by_alias=True, - exclude_unset=True, - ) - return result - - -@router.patch('/{obj_id}', response_model=Resp_Body_Base) -async def patch_api_obj( - obj_id: str, - obj: Api_Base = None, - x_account_id: Optional[str] = Header(..., ), - return_obj: Optional[bool] = True, - by_alias: Optional[bool] = True, - exclude_unset: Optional[bool] = True, - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - obj_type = 'api' - obj_data_dict = obj.dict(by_alias=False, exclude_unset=True) - obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type) - obj_data_dict['id_random'] = obj_id - result = patch_obj_template( - obj_type=obj_type, - data=obj_data_dict, - obj_id=obj_id, - return_obj=True, - by_alias=True, - exclude_unset=True, - ) - return result - - -@router.get('/list', response_model=Resp_Body_Base) -async def get_api_obj_li( - for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50), - for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22), - x_account_id: str = Header(...), - by_alias: Optional[bool] = True, - exclude_unset: Optional[bool] = True, - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - obj_type = 'api' - result = get_obj_li_template( - obj_type=obj_type, - for_obj_type=for_obj_type, - for_obj_id=for_obj_id, - by_alias=True, - exclude_unset=True, - ) - return result - - -@router.get('/{obj_id}', response_model=Resp_Body_Base) -async def get_api_obj( - obj_id: str, - x_account_id: str = Header(...), - by_alias: Optional[bool] = True, - exclude_unset: Optional[bool] = True, - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - obj_type = 'api' - result = get_obj_template( - obj_type=obj_type, - obj_id=obj_id, - by_alias=True, - exclude_unset=True, - ) - return result - - -@router.delete('/{obj_id}', response_model=Resp_Body_Base) -async def delete_api_obj( - obj_id: str, - x_account_id: str = Header(...), - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - obj_type = 'api' - result = delete_obj_template( - obj_type=obj_type, - obj_id=obj_id, - ) - return result - - -@router.get('/get_id/{object_type}/{object_id_random}', response_model=Resp_Body_Base) -async def get_api_object_id( - object_type: str, - object_id_random: str, - x_account_id: str = Header(...), - by_alias: Optional[bool] = True, - exclude_unset: Optional[bool] = True, - response: Response = Response, - ): - log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL - log.debug(locals()) - - if object_id := redis_lookup_id_random(record_id_random=object_id_random, table_name=object_type): - return mk_resp(data={ 'object_id': object_id}, status_code=400) - else: return mk_resp(data=None, status_code=400) - - -# ### BEGIN ### API API ### sql_test() ### -@router.get('/sql_test', tags=['Testing']) -async def sql_test(response: Response = Response): - log.setLevel(logging.DEBUG) - log.debug(locals()) - - sql = text("SELECT NOW() as current_time, VERSION() as version") - try: - result_proxy = db.execute(sql) - result = result_proxy.fetchone() - data = { - "current_time": str(result[0]), - "version": result[1] - } - return mk_resp(data=data, response=response) - except Exception as e: - log.error(f'SQL Test failed: {str(e)}') - return mk_resp(data=False, status_code=500, details=str(e), response=response) -# ### END ### API API ### sql_test() ### + return mk_resp(data=False, status_code=500, details=str(e), response=response) \ No newline at end of file diff --git a/tests/e2e/repro_intermittent_errors.py b/tests/e2e/repro_intermittent_errors.py new file mode 100644 index 0000000..e312af7 --- /dev/null +++ b/tests/e2e/repro_intermittent_errors.py @@ -0,0 +1,88 @@ +import requests +import json +import time +import concurrent.futures + +# Configuration +BASE_URL = "https://dev-api.oneskyit.com/v3/crud" +API_KEY = "IDF68Em5X4HTZlswRNgepQ" +ACCOUNT_ID = "_XY7DXtc9MY" +JOURNAL_ID = "DCAV-06-35-85" +EVENT_ID = "pjrcghqwert" + +HEADERS = { + "X-Aether-API-Key": API_KEY, + "X-Account-ID": ACCOUNT_ID, + "Content-Type": "application/json" +} + +def hit_endpoint(name, method, url, body=None, params=None): + try: + start = time.time() + if method == "GET": + resp = requests.get(url, headers=HEADERS, params=params, timeout=15) + else: + resp = requests.post(url, headers=HEADERS, json=body, params=params, timeout=15) + duration = time.time() - start + + status = resp.status_code + print(f"[{name}] Status: {status} | Duration: {duration:.3f}s") + + if status != 200: + print(f" ❌ ERROR: {resp.text[:200]}") + return False + return True + except Exception as e: + print(f"[{name}] ❌ EXCEPTION: {e}") + return False + +def test_batch(): + success_count = 0 + total = 0 + + # 1. Journal Entries (Nested GET) + url = f"{BASE_URL}/journal/{JOURNAL_ID}/journal_entry/" + if hit_endpoint("Journal Entries (Nested GET)", "GET", url): success_count += 1 + total += 1 + + # 2. Event Badges (Search POST) + url = f"{BASE_URL}/event_badge/search" + query = {"q": "%"} + if hit_endpoint("Event Badges (Search POST)", "POST", url, body=query): success_count += 1 + total += 1 + + # 3. Event Sessions (Search POST) + url = f"{BASE_URL}/event_session/search" + query = {"q": "%"} + if hit_endpoint("Event Sessions (Search POST)", "POST", url, body=query): success_count += 1 + total += 1 + + # 4. Event Sessions (List GET with parent) + url = f"{BASE_URL}/event_session/" + params = {"for_obj_type": "event", "for_obj_id": EVENT_ID} + if hit_endpoint("Event Sessions (List GET)", "GET", url, params=params): success_count += 1 + total += 1 + + return success_count, total + +if __name__ == "__main__": + print(f"Reproduction script started against {BASE_URL}") + print(f"Using Account: {ACCOUNT_ID}\n") + + grand_success = 0 + grand_total = 0 + + iterations = 5 + # Running 20 requests total (5 batches of 4) + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + futures = [executor.submit(test_batch) for _ in range(iterations)] + for future in concurrent.futures.as_completed(futures): + s, t = future.result() + grand_success += s + grand_total += t + + print(f"\nFinal Results: {grand_success}/{grand_total} requests succeeded.") + if grand_success == grand_total: + print("🎉 SUCCESS! No more timeouts or 403s.") + else: + print("❌ Reproduced some failures!") \ No newline at end of file