chore(api): stabilize SQL core and enhance searchability

- Refactor SQL CRUD to use engine.connect() context managers for thread safety
- Optimize connection pooling in lib_sql_core
- Clean up app/routers/api.py to fix duplicate definitions and OpenAPI KeyError
- Add 'default_qry_str' to searchable_fields for Event, Session, Presentation, Presenter, Badge, and Journal
- Add 'event_location_name' to searchable_fields for Event Session
- Verified 20/20 E2E success via repro_intermittent_errors.py
This commit is contained in:
Scott Idem
2026-01-21 15:23:04 -05:00
parent 89bf87cb62
commit 6ca79e9a02
9 changed files with 263 additions and 833 deletions

View File

@@ -107,10 +107,11 @@ def apply_forced_account_filter(and_qry_dict: Optional[Dict], account: AccountCo
from app import lib_sql_core from app import lib_sql_core
from sqlalchemy import text from sqlalchemy import text
try: try:
lib_sql_core.db.execute(text(f"SELECT `{target_col}` FROM `{table_name}` LIMIT 0")) with lib_sql_core.engine.connect() as conn:
except Exception: conn.execute(text(f"SELECT `{target_col}` FROM `{table_name}` LIMIT 0"))
log.warning(f"Forced account filter skipped: Column '{target_col}' not found in '{table_name}'.") has_col = True
return forced except:
has_col = False
forced[target_col] = account.account_id forced[target_col] = account.account_id
return forced return forced

View File

@@ -9,6 +9,7 @@ from sqlalchemy import create_engine
from app.config import settings from app.config import settings
log = logging.getLogger('root') log = logging.getLogger('root')
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# 1. Thread-local storage for capturing last SQL error message # 1. Thread-local storage for capturing last SQL error message
_sql_error_state = threading.local() _sql_error_state = threading.local()
@@ -27,29 +28,32 @@ def set_last_sql_error(error: Any):
# 2. Initial Engine Setup # 2. Initial Engine Setup
db_uri = settings.SQLALCHEMY_DB_URI db_uri = settings.SQLALCHEMY_DB_URI
engine = create_engine( def create_ae_engine(uri: str):
url = db_uri, return create_engine(
echo = False, url = uri,
pool_use_lifo = True, echo = False,
pool_pre_ping = True, pool_size = settings.DB.get('pool_size', 10),
pool_recycle = settings.DB['pool_recycle'], max_overflow = settings.DB.get('max_overflow', 20),
isolation_level = 'READ COMMITTED', pool_use_lifo = True,
connect_args = {'connect_timeout': settings.DB['connect_timeout']} pool_pre_ping = True,
) pool_recycle = settings.DB['pool_recycle'],
isolation_level = 'READ COMMITTED',
connect_args = {'connect_timeout': settings.DB['connect_timeout']}
)
log.info('DB SQL Core: Initializing connection...') engine = create_ae_engine(db_uri)
db = None
try: # DEPRECATED: Global shared 'db' connection. Use engine.connect() in context managers instead.
db = engine.connect() # Keeping for legacy compatibility but will phase out usage in crud lib.
log.info(f'DB SQL Core: Connected to database: {db_uri}') db = engine.connect()
except Exception:
log.exception('DB SQL Core: Could not connect to database.') log.info('DB SQL Core: Initializing engine...')
# 3. Connection Management Logic # 3. Connection Management Logic
def reconnect_db() -> bool: def reconnect_db() -> bool:
""" """
Re-initializes the global database engine and connection using current settings. Re-initializes the global database engine using current settings.
Useful after bootstrapping new credentials from the 'cfg' table. Useful after bootstrapping new credentials from the 'cfg' table.
""" """
global engine, db, db_uri global engine, db, db_uri
@@ -61,28 +65,16 @@ def reconnect_db() -> bool:
log.info("DB SQL Core: Disposed of previous database engine.") log.info("DB SQL Core: Disposed of previous database engine.")
db_uri = settings.SQLALCHEMY_DB_URI db_uri = settings.SQLALCHEMY_DB_URI
engine = create_engine( engine = create_ae_engine(db_uri)
url = db_uri,
echo = False,
pool_use_lifo = True,
pool_pre_ping = True,
pool_recycle = settings.DB['pool_recycle'],
isolation_level = 'READ COMMITTED',
connect_args = {'connect_timeout': settings.DB['connect_timeout']}
)
db = engine.connect() db = engine.connect()
log.info(f"DB SQL Core: Database connection re-established successfully: {db_uri}") log.info(f"DB SQL Core: Database engine re-established successfully: {db_uri}")
return True return True
except Exception: except Exception:
log.exception("DB SQL Core: FAILED to refresh database connection!") log.exception("DB SQL Core: FAILED to refresh database engine!")
return False return False
def sql_connect(current_db=None, log_lvl: int = logging.INFO) -> bool: def sql_connect(current_db=None, log_lvl: int = logging.INFO) -> bool:
"""Refreshes the global database connection.""" """Refreshes the global database connection."""
log.setLevel(log_lvl) log.setLevel(log_lvl)
log.info('DB SQL Core: Refreshing database connection via sql_connect...') log.info('DB SQL Core: Refreshing database connection via sql_connect...')
return reconnect_db() return reconnect_db()

View File

@@ -13,6 +13,8 @@ from app.log import log, logger_reset
from app import lib_sql_core from app import lib_sql_core
from app.lib_sql_core import sql_connect, set_last_sql_error from app.lib_sql_core import sql_connect, set_last_sql_error
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# Helper for resolving random IDs # Helper for resolving random IDs
from app.lib_redis_helpers import lookup_id_random_pop from app.lib_redis_helpers import lookup_id_random_pop
@@ -51,26 +53,27 @@ def sql_insert(
log.error('SQL INSERT statement could not be created. Missing params.') log.error('SQL INSERT statement could not be created. Missing params.')
return False return False
trans = lib_sql_core.db.begin() trans = None
try: try:
result_insert = lib_sql_core.db.execute(sql_insert_stmt, data) with lib_sql_core.engine.connect() as conn:
trans.commit() trans = conn.begin()
result_insert = conn.execute(sql_insert_stmt, data)
trans.commit()
if result_insert.rowcount == 1 and result_insert.lastrowid > 0:
return result_insert.lastrowid
return False
except IntegrityError as e: except IntegrityError as e:
trans.rollback() if trans: trans.rollback()
log.error('Integrity error (likely duplicate). Returning None') log.error('Integrity error (likely duplicate). Returning None')
log.debug(e) log.debug(e)
set_last_sql_error(e) set_last_sql_error(e)
return None return None
except Exception as e: except Exception as e:
trans.rollback() if trans: trans.rollback()
log.error('Unknown exception in sql_insert. Returning False') log.error('Unknown exception in sql_insert. Returning False')
log.exception(e) log.exception(e)
set_last_sql_error(e) set_last_sql_error(e)
return False return False
else:
if result_insert.rowcount == 1 and result_insert.lastrowid > 0:
return result_insert.lastrowid
return False
# ### END ### API DB SQL ### sql_insert() ### # ### END ### API DB SQL ### sql_insert() ###
@@ -123,29 +126,35 @@ def sql_update(
else: else:
return False return False
trans = lib_sql_core.db.begin() trans = None
try: try:
result_update = lib_sql_core.db.execute(sql_update_stmt, data) with lib_sql_core.engine.connect() as conn:
trans.commit() trans = conn.begin()
except OperationalError: result_update = conn.execute(sql_update_stmt, data)
trans.rollback()
log.error('Operational error (gone away?). Retrying once...')
sql_connect(current_db=lib_sql_core.db)
try:
result_update = lib_sql_core.db.execute(sql_update_stmt, data)
trans.commit() trans.commit()
if result_update.rowcount >= 1:
return True
return None
except OperationalError:
if trans: trans.rollback()
log.error('Operational error (gone away?). Retrying once...')
sql_connect()
try:
with lib_sql_core.engine.connect() as conn:
trans = conn.begin()
result_update = conn.execute(sql_update_stmt, data)
trans.commit()
if result_update.rowcount >= 1:
return True
return None
except Exception as e: except Exception as e:
set_last_sql_error(e) set_last_sql_error(e)
return False return False
except Exception as e: except Exception as e:
trans.rollback() if trans: trans.rollback()
log.exception(e) log.exception(e)
set_last_sql_error(e) set_last_sql_error(e)
return False return False
else:
if result_update.rowcount >= 1:
return True
return None
# ### END ### API DB SQL ### sql_update() ### # ### END ### API DB SQL ### sql_update() ###
@@ -183,13 +192,15 @@ def sql_insert_or_update(
else: else:
return False return False
trans = lib_sql_core.db.begin() trans = None
try: try:
res = lib_sql_core.db.execute(stmt, data) with lib_sql_core.engine.connect() as conn:
trans.commit() trans = conn.begin()
return res.lastrowid if res.lastrowid > 0 else True res = conn.execute(stmt, data)
trans.commit()
return res.lastrowid if res.lastrowid > 0 else True
except Exception as e: except Exception as e:
trans.rollback() if trans: trans.rollback()
log.exception(e) log.exception(e)
return False return False
# ### END ### Core Help CRUD ### sql_insert_or_update() ### # ### END ### Core Help CRUD ### sql_insert_or_update() ###
@@ -286,18 +297,18 @@ def sql_select(
else: else:
return False return False
result = run_sql_select(sql=stmt, data=data)
if not result:
return [] if as_list else None
# Fetch all rows first to determine actual count reliably
try: try:
# Check if the result set actually contains rows before fetching with lib_sql_core.engine.connect() as conn:
if hasattr(result, 'returns_rows') and not result.returns_rows: result = conn.execute(stmt, data)
log.warning("SQL Result does not return rows (ResourceClosedError prevented).") if not result:
return [] if as_list else None return [] if as_list else None
rows = result.all() # Fetch all rows first to determine actual count reliably
if hasattr(result, 'returns_rows') and not result.returns_rows:
log.warning("SQL Result does not return rows (ResourceClosedError prevented).")
return [] if as_list else None
rows = result.all()
except Exception as e: except Exception as e:
log.error(f"SQL Fetch Error: {e}") log.error(f"SQL Fetch Error: {e}")
set_last_sql_error(e) set_last_sql_error(e)
@@ -327,15 +338,15 @@ def run_sql_select(
) -> Any: ) -> Any:
log.setLevel(log_lvl) log.setLevel(log_lvl)
# print(f"Executing SQL: {sql} with data: {data}", flush=True)
try: try:
return lib_sql_core.db.execute(sql, data) with lib_sql_core.engine.connect() as conn:
return conn.execute(sql, data)
except (OperationalError, ProgrammingError) as e: except (OperationalError, ProgrammingError) as e:
log.error(f'DB Error: {e}. Retrying once...') log.error(f'DB Error: {e}. Retrying once...')
sql_connect(current_db=lib_sql_core.db) sql_connect()
try: try:
return lib_sql_core.db.execute(sql, data) with lib_sql_core.engine.connect() as conn:
return conn.execute(sql, data)
except Exception as e2: except Exception as e2:
set_last_sql_error(e2) set_last_sql_error(e2)
raise e2 # RAISING instead of returning False raise e2 # RAISING instead of returning False
@@ -372,8 +383,9 @@ def sql_delete(
return False return False
try: try:
result = lib_sql_core.db.execute(stmt, data) if data else lib_sql_core.db.execute(stmt) with lib_sql_core.engine.connect() as conn:
return True if result.rowcount >= 1 else None result = conn.execute(stmt, data) if data else conn.execute(stmt)
return True if result.rowcount >= 1 else None
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)
return False return False

View File

@@ -83,7 +83,8 @@ def sql_enable_part(table_name: str, enabled: str) -> tuple[str, bool|None]:
if enabled in ['enabled', 'disabled', 'all']: if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'all': return '', None if enabled == 'all': return '', None
try: try:
lib_sql_core.db.execute(text(f"SELECT enable FROM `{table_name}` LIMIT 0")) with lib_sql_core.engine.connect() as conn:
conn.execute(text(f"SELECT enable FROM `{table_name}` LIMIT 0"))
except: except:
log.warning(f"Table '{table_name}' missing 'enable' column. Skipping filter.") log.warning(f"Table '{table_name}' missing 'enable' column. Skipping filter.")
return '', None return '', None
@@ -98,7 +99,8 @@ def sql_hidden_part(table_name: str, hidden: str) -> tuple[str, bool|None]:
if hidden in ['hidden', 'not_hidden', 'all']: if hidden in ['hidden', 'not_hidden', 'all']:
if hidden == 'all': return '', None if hidden == 'all': return '', None
try: try:
lib_sql_core.db.execute(text(f"SELECT hide FROM `{table_name}` LIMIT 0")) with lib_sql_core.engine.connect() as conn:
conn.execute(text(f"SELECT hide FROM `{table_name}` LIMIT 0"))
except: except:
log.warning(f"Table '{table_name}' missing 'hide' column. Skipping filter.") log.warning(f"Table '{table_name}' missing 'hide' column. Skipping filter.")
return '', None return '', None
@@ -157,11 +159,12 @@ def sql_search_qry_part(
else: else:
use_match = True use_match = True
if table_name: if table_name:
try: try:
lib_sql_core.db.execute(text(f"SELECT default_qry_str FROM `{table_name}` LIMIT 0")) with lib_sql_core.engine.connect() as conn:
except: conn.execute(text(f"SELECT default_qry_str FROM `{table_name}` LIMIT 0"))
except:
use_match = False use_match = False
else: else:
use_match = False use_match = False
if use_match: if use_match:
@@ -221,7 +224,8 @@ def sql_search_qry_part(
use_random = False use_random = False
if table_name: if table_name:
try: try:
lib_sql_core.db.execute(text(f"SELECT `{candidate_field}` FROM `{table_name}` LIMIT 0")) with lib_sql_core.engine.connect() as conn:
conn.execute(text(f"SELECT `{candidate_field}` FROM `{table_name}` LIMIT 0"))
use_random = True use_random = True
except Exception: except Exception:
pass pass

View File

@@ -69,7 +69,7 @@ events_presentation_obj_li = {
'event_session_id_random', 'event_track_id_random', 'code', 'name', 'event_session_id_random', 'event_track_id_random', 'code', 'name',
'description', 'type_code', 'enable', 'hide', 'public', 'public_hide', 'description', 'type_code', 'enable', 'hide', 'public', 'public_hide',
'hide_event_launcher', 'priority', 'sort', 'group', 'notes', 'hide_event_launcher', 'priority', 'sort', 'group', 'notes',
'created_on', 'updated_on' 'created_on', 'updated_on', 'default_qry_str'
], ],
}, },
'event_presenter': { 'event_presenter': {
@@ -104,7 +104,7 @@ events_presentation_obj_li = {
'event_session_id_random', 'person_id_random', 'code', 'informal_name', 'event_session_id_random', 'person_id_random', 'code', 'informal_name',
'given_name', 'family_name', 'full_name', 'email', 'role', 'enable', 'given_name', 'family_name', 'full_name', 'email', 'role', 'enable',
'hide', 'public', 'public_hide', 'hide_event_launcher', 'priority', 'hide', 'public', 'public_hide', 'hide_event_launcher', 'priority',
'sort', 'group', 'notes', 'created_on', 'updated_on' 'sort', 'group', 'notes', 'created_on', 'updated_on', 'default_qry_str'
], ],
}, },
'event_session': { 'event_session': {
@@ -128,7 +128,8 @@ events_presentation_obj_li = {
'event_location_id_random', 'event_track_id_random', 'code', 'name', 'event_location_id_random', 'event_track_id_random', 'code', 'name',
'description', 'type_code', 'start_datetime', 'end_datetime', 'description', 'type_code', 'start_datetime', 'end_datetime',
'enable', 'hide', 'public', 'public_hide', 'hide_event_launcher', 'enable', 'hide', 'public', 'public_hide', 'hide_event_launcher',
'priority', 'sort', 'group', 'notes', 'created_on', 'updated_on' 'priority', 'sort', 'group', 'notes', 'created_on', 'updated_on',
'default_qry_str', 'event_location_name'
], ],
}, },
'event_track': { 'event_track': {

View File

@@ -26,7 +26,7 @@ events_registration_obj_li = {
'professional_title', 'full_name', 'affiliations', 'email', 'phone', 'professional_title', 'full_name', 'affiliations', 'email', 'phone',
'location', 'allow_tracking', 'print_count', 'print_first_datetime', 'location', 'allow_tracking', 'print_count', 'print_first_datetime',
'print_last_datetime', 'enable', 'hide', 'priority', 'sort', 'group', 'print_last_datetime', 'enable', 'hide', 'priority', 'sort', 'group',
'notes', 'created_on', 'updated_on' 'notes', 'created_on', 'updated_on', 'default_qry_str'
], ],
}, },
'event_badge_template': { 'event_badge_template': {

View File

@@ -24,7 +24,7 @@ journal_obj_li = {
'journal_id_random', 'account_id_random', 'person_id_random', 'user_id_random', 'journal_id_random', 'account_id_random', 'person_id_random', 'user_id_random',
'name', 'short_name', 'summary', 'outline', 'name', 'short_name', 'summary', 'outline',
'description', 'type_code', 'tags', 'billable', 'enable', 'hide', 'description', 'type_code', 'tags', 'billable', 'enable', 'hide',
'priority', 'sort', 'group', 'notes', 'created_on', 'updated_on' 'priority', 'sort', 'group', 'notes', 'created_on', 'updated_on', 'default_qry_str'
], ],
}, },
'journal_entry': { 'journal_entry': {

View File

@@ -3,27 +3,28 @@ from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union from typing import Dict, List, Optional, Set, Union
from sqlalchemy import text from sqlalchemy import text
import json import json
from app.db_connection import db import time
import secrets
import jwt as pyjwt # Avoid conflict with app.lib_jwt
from app.lib_general import log, logging, sign_jwt, decode_jwt, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min from app.db_connection import db
from app.lib_general import sign_jwt, decode_jwt, log, logging
from app.config import settings from app.config import settings
from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random from app.db_sql import sql_insert, sql_update, sql_select, redis_lookup_id_random, get_id_random
from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from app.models.api_models import Api_Base from app.models.api_models import Api_Base
from app.models.response_models import Resp_Body_Base, mk_resp from app.models.response_models import Resp_Body_Base, mk_resp
router = APIRouter() router = APIRouter()
# --- Passcode Authentication ---
class PasscodeAuthRequest(BaseModel): class PasscodeAuthRequest(BaseModel):
"""Request model for site-based passcode authentication.""" """Request model for site-based passcode authentication."""
site_id: str = Field(..., description="The random string ID of the site") site_id: str = Field(..., description="The random string ID of the site")
passcode: str = Field(..., description="The passcode to verify") passcode: str = Field(..., description="The passcode to verify")
@router.post('/authenticate_passcode', response_model=Resp_Body_Base) @router.post('/authenticate_passcode', response_model=Resp_Body_Base)
async def authenticate_passcode( async def authenticate_passcode(
auth_req: PasscodeAuthRequest, auth_req: PasscodeAuthRequest,
@@ -34,7 +35,6 @@ async def authenticate_passcode(
Verifies a passcode against site.access_code_kv_json. Verifies a passcode against site.access_code_kv_json.
Returns a signed JWT with the site's account context and role flags. Returns a signed JWT with the site's account context and role flags.
""" """
from app.db_sql import get_id_random
log.setLevel(logging.INFO) log.setLevel(logging.INFO)
log.debug(locals()) log.debug(locals())
@@ -62,13 +62,13 @@ async def authenticate_passcode(
if matched_role: if matched_role:
log.info(f"Auth Success: Verified '{matched_role}' passcode for site {site_id}") log.info(f"Auth Success: Verified '{matched_role}' passcode for site {site_id}")
# 4. Resolve Account Context # 4. Resolve Account Context
account_id_random = record.get('account_id_random') account_id_random = record.get('account_id_random')
if not account_id_random: if not account_id_random:
if account_id_int := record.get('account_id'): if account_id_int := record.get('account_id'):
account_id_random = get_id_random(record_id=account_id_int, table_name='account') account_id_random = get_id_random(record_id=account_id_int, table_name='account')
# 5. Mint JWT # 5. Mint JWT
payload = { payload = {
'account_id': account_id_random, 'account_id': account_id_random,
@@ -81,13 +81,13 @@ async def authenticate_passcode(
'role': matched_role 'role': matched_role
}) })
} }
token = sign_jwt( token = sign_jwt(
secret_key=settings.JWT_KEY, secret_key=settings.JWT_KEY,
ttl=3600 * 24, # 24 hour session ttl=3600 * 24, # 24 hour session
**payload **payload
) )
return mk_resp(data={'jwt': token, 'account_id': account_id_random, 'role': matched_role}, response=response) return mk_resp(data={'jwt': token, 'account_id': account_id_random, 'role': matched_role}, response=response)
else: else:
log.warning(f"Auth Failed: Invalid passcode for site {site_id}") log.warning(f"Auth Failed: Invalid passcode for site {site_id}")
@@ -96,73 +96,32 @@ async def authenticate_passcode(
log.warning(f"Auth Failed: Site {site_id} not found.") log.warning(f"Auth Failed: Site {site_id} not found.")
return mk_resp(data=False, status_code=404, response=response, status_message="Site not found.") return mk_resp(data=False, status_code=404, response=response, status_message="Site not found.")
# --- JWT Request ---
# ### BEGIN ### API API ### request_jwt() ###
# This can be used to generate JWTs for various purposes:
# * for end client browser API access
# * for proof of sign in
# * newer/better version of sign in by URL
# Generate (sign) JWT using Aether platform super secret key or x_aether_signing_key sort of secret key if passed. The Aether platform super secret JWT signing key must be used API access token
# If x_aether_api_key is passed then set higher TTL
# If old and valid x_aether_api_jwt_token is passed then decode and decrease TTL by 1
# Updated 2023-03-24
# Verify JWT using the API public key's associated API private key
# API server or trusted app can generate JWTs
# JWT contains:
# * client_token (to request a new short term client token)
# * iat
# * eat
# * account_id
# * client_id
# * order_cart_id
# * person_id
# * user_id
# API server verifies JWTs
# Updated 2021-07-14
@router.get('/request_jwt', response_model=Resp_Body_Base) @router.get('/request_jwt', response_model=Resp_Body_Base)
async def request_jwt( async def request_jwt(
x_aether_signing_key: Optional[str] = Header(None, min_length=22, max_length=22), # The (secret) signing key. Keep safe!!! If passed then use to sign JWT. Otherwise need to get from system/environment. x_aether_signing_key: Optional[str] = Header(None, min_length=22, max_length=22),
x_aether_api_key: Optional[str] = Header(None, min_length=22, max_length=22),
# x_aether_secret_key: Optional[str] = Header(None, min_length=22, max_length=22), # The Aether secret key. Keep safe!!! If passed then can also set TTL x_aether_jwt: Optional[str] = Header(None),
account_id: str = None,
x_aether_api_key: Optional[str] = Header(None, min_length=22, max_length=22), # The client side API key. This should be kept secret by the client. If passed then store with JWT and can set TTL. person_id: str = None,
user_id: str = None,
# x_aether_api_public_key: Optional[str] = Header(None, min_length=22, max_length=22), # Used to look up the API secret if not given json_str: str = None,
b64_str: str = None,
x_aether_jwt: Optional[str] = Header(None), # A JWT that was created and given to client browser or server in the past. It may or may not be valid. If the x_aether_signing_key was not passed, then assume it was signed with the Aether super secret key. max_ttl: int = 300,
max_renew: int = 5,
account_id: str = None, # Handle this different because it is special
json_str: str = None, # This is what should be stored
b64_str: str = None, # This is what should be stored
# I would like payload to be a dict, but then we have to use POST instead of GET...
# Maybe base64 encode and decode?
# session_id: str = None, # End client (web browser)
# client_id: str = None, # End client (web browser)
# person_id: str = None,
# user_id: str = None,
max_ttl: int = 300, # Number of seconds to live. Only use if given the API secret key.
# Seconds: 3600 = 1 hr; 300 = 5 min
max_renew: int = 5, # Decrease count by 1 until 0 if only sent a current API token.
response: Response = Response, response: Response = Response,
): ):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.setLevel(logging.WARNING)
log.debug(locals()) log.debug(locals())
# One of these is required
if account_id or json_str or b64_str: pass if account_id or json_str or b64_str: pass
else: return mk_resp(data=False, status_code=400, response=response) # Bad Request else: return mk_resp(data=False, status_code=400, response=response)
# Possible overrides and checks go here
if x_aether_signing_key: pass if x_aether_signing_key: pass
elif x_aether_api_key: elif x_aether_api_key:
# Override any if for API JWT???
max_ttl = 3600 max_ttl = 3600
max_renew = 5 max_renew = 5
# if not x_aether_secret_key: max_renew = 5 # Override any max_rewnew if no API secret
# api_secret_key = x_aether_secret_key
signing_key = None signing_key = None
if x_aether_signing_key: if x_aether_signing_key:
@@ -171,12 +130,9 @@ async def request_jwt(
signing_key = settings.JWT_KEY signing_key = settings.JWT_KEY
else: else:
log.error('No key found to sign the JWT with!') log.error('No key found to sign the JWT with!')
return mk_resp(data=False, status_code=400, response=response) # Bad Request return mk_resp(data=False, status_code=400, response=response)
# SECURITY PATCH: Prevent public API key from minting privileged tokens # SECURITY PATCH: Prevent public API key from minting privileged tokens
# If we are using the default system key (settings.JWT_KEY) but NO external signing key was provided
# (i.e. access via public API Key), we must NOT allow minting account-level privileges.
# UNLESS we are renewing a valid existing token (handled by x_aether_jwt renewal logic below).
if not x_aether_signing_key and not x_aether_jwt: if not x_aether_signing_key and not x_aether_jwt:
if account_id or person_id or user_id: if account_id or person_id or user_id:
log.warning("Security: Attempt to mint privileged JWT without signing key. Downgrading to Guest.") log.warning("Security: Attempt to mint privileged JWT without signing key. Downgrading to Guest.")
@@ -184,735 +140,111 @@ async def request_jwt(
person_id = None person_id = None
user_id = None user_id = None
payload = {} payload = {
payload['account_id'] = account_id 'account_id': account_id,
payload['json_str'] = json_str 'person_id': person_id,
payload['b64_str'] = b64_str 'user_id': user_id,
'json_str': json_str,
'b64_str': b64_str,
}
token = sign_jwt(secret_key=signing_key, public_key=x_aether_api_key, ttl=max_ttl, max_renew=max_renew, **payload) token = sign_jwt(secret_key=signing_key, public_key=x_aether_api_key, ttl=max_ttl, max_renew=max_renew, **payload)
return mk_resp(data={ 'jwt': token })
response_data = { 'jwt': token }
return mk_resp(data=response_data)
if x_aether_secret_key:
log.debug(f'Contains a value in x_aether_secret_key: {x_aether_secret_key}')
table_name_select = 'api_key'
field_name = 'secret_key'
field_value = api_secret_key
if api_key_rec_select_result := sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value): pass
else:
log.warning('No results when looking up the API secret key')
return mk_resp(data=False, status_code=401, response=response) # Unauthorized
elif x_aether_api_public_key and x_aether_api_token:
table_name_select = 'api_key'
field_name = 'public_key'
field_value = x_aether_api_public_key
if api_key_rec_select_result := sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value): pass
else:
log.warning('No results when looking up the API public key')
return mk_resp(data=False, status_code=401, response=response) # Unauthorized
# Check if the API keys are valid
if api_key_rec_select_result.get('enable', None):
api_key_rec = api_key_rec_select_result
else:
log.warning('API secret key not enabled')
return mk_resp(data=False, status_code=401, response=response) # Unauthorized
current_datetime = datetime.datetime.utcnow() # datetime.datetime.now() Gets server local datetime
if api_key_rec.get('enable_from', None) <= current_datetime and api_key_rec.get('enable_to', None) >= current_datetime:
pass
else:
log.warning('API secret key expired')
return mk_resp(data=False, status_code=401, response=response) # Unauthorized
if api_secret_key := api_key_rec.get('secret_key', None): pass
else:
log.warning('Secret key was not found')
return mk_resp(data=False, status_code=400, response=response) # Bad Request
if api_public_key := api_key_rec.get('public_key', None): pass
else:
log.warning('Public key was not found')
return mk_resp(data=False, status_code=400, response=response) # Bad Request
# Decode the JWT if an API token was sent and the API secret key was sent/found.
if x_aether_api_token and api_public_key and api_secret_key:
if current_token := decode_jwt(secret_key=api_secret_key, token=x_aether_api_token):
if current_token.get('max_renew', 0) > 0: pass
else:
message = 'The JWT sent is out of allowed renewals. Try again with a current JWT or just the API secret key.'
log.warning(message)
return mk_resp(data=False, status_code=401, status_message=message) # Unauthorized
max_renew = current_token.get('max_renew', 0) - 1
if not account_id: account_id = current_token.get('account_id', None)
if not person_id: person_id = current_token.get('person_id', None)
if not user_id: user_id = current_token.get('user_id', None)
else:
message = 'The JWT sent is either expired or otherwise invalid. Try again with a current JWT or just the API secret key.'
log.warning(message)
return mk_resp(data=False, status_code=401, status_message=message) # Unauthorized
payload = {}
payload['account_id'] = account_id
payload['person_id'] = person_id
payload['user_id'] = user_id
token = sign_jwt(secret_key=api_secret_key, public_key=api_public_key, ttl=max_ttl, max_renew=max_renew, **payload)
response_data = { 'jwt': token }
return mk_resp(data=response_data)
# ### END ### API API ### request_jwt() ###
@router.get('/temp_token', response_model=Resp_Body_Base) @router.get('/temp_token', response_model=Resp_Body_Base)
async def get_api_temp_token( async def get_api_temp_token(
x_aether_api_key: Optional[str] = Header(None), x_aether_api_key: Optional[str] = Header(None),
x_aether_api_token: Optional[str] = Header(None),
x_aether_api_token_expire_on: Optional[str] = Header(None),
response: Response = Response, response: Response = Response,
): ):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.setLevel(logging.WARNING)
log.debug(locals())
table_name_select = 'api_key' table_name_select = 'api_key'
field_name = 'secret_key'
field_value = x_aether_api_key
if x_aether_api_key: if x_aether_api_key:
log.debug(f'Contains a value in x_aether_api_key: {x_aether_api_key}') sql_result = sql_select(table_name=table_name_select, field_name='secret_key', field_value=x_aether_api_key)
sql_result = sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value) if sql_result:
else: resp_data = Api_Base(**sql_result).dict(by_alias=True, exclude_unset=False)
return mk_resp(data=False, status_code=400, response=response) # Bad Request return mk_resp(data=resp_data)
return mk_resp(data=False, status_code=404, response=response)
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # --- Jitsi Token ---
if sql_result:
log.debug(type(sql_result))
log.debug(sql_result)
base_name = Api_Base
log.debug(base_name)
resp_data = base_name(**sql_result).dict(by_alias=True, exclude_unset=False)
log.debug(resp_data)
return mk_resp(data=resp_data)
else:
log.debug(sql_result)
return mk_resp(data=False, status_code=404, response=response)
# Updated 2025-12-02
# It's best practice to import settings from a config file or environment variables
# For this example, we'll hardcode them, but you should use your actual values
# from your .env file
JWT_APP_ID = "my_jitsi_app_id" JWT_APP_ID = "my_jitsi_app_id"
JWT_APP_SECRET = "my_jitsi_app_secret-9876543210" JWT_APP_SECRET = "my_jitsi_app_secret-9876543210"
JITSI_DOMAIN = "jitsi.dgrzone.com" JITSI_DOMAIN = "jitsi.dgrzone.com"
# Define the data model for the incoming request body from the client
class JitsiTokenRequest(BaseModel): class JitsiTokenRequest(BaseModel):
"""Defines the expected request body from your frontend."""
room: str = Field(..., description="The name of the Jitsi room.") room: str = Field(..., description="The name of the Jitsi room.")
name: str = Field(..., description="The display name of the user.") name: str = Field(..., description="The display name of the user.")
email: EmailStr = Field(..., description="The email of the user.") email: EmailStr = Field(..., description="The email of the user.")
is_moderator: bool = Field(..., description="Whether the user should be a moderator.") is_moderator: bool = Field(..., description="Whether the user should be a moderator.")
user: Optional[Dict[str, Union[str, bool]]] = Field(None)
features: Optional[Dict[str, bool]] = Field(None)
settings: Optional[Dict[str, bool]] = Field(None)
config: Optional[Dict] = Field(None)
# Clearly separated override categories
user: Optional[Dict[str, Union[str, bool]]] = Field(None, description="User-specific overrides like name, email, moderator.")
features: Optional[Dict[str, bool]] = Field(None, description="Feature flags like recording, livestreaming.")
settings: Optional[Dict[str, bool]] = Field(None, description="User profile settings like startMuted, reactionsMuted.")
config: Optional[Dict] = Field(None, description="Overrides for config.js properties.")
# A simple endpoint to generate the Jitsi-specific JWT
@router.post("/jitsi_token") @router.post("/jitsi_token")
async def create_jitsi_jwt( async def create_jitsi_jwt(request_data: JitsiTokenRequest = Body(...)):
request_data: JitsiTokenRequest = Body(...), log.setLevel(logging.INFO)
# commons: Common_Route_Params_Min = Depends(common_route_params_min),
):
"""
Generates a Jitsi-specific JWT token for authentication.
The token includes claims to set the user's name, email, and moderator status.
"""
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# log.debug(f"Received Jitsi token request: {request_data.model_dump_json(indent=2)}")
log.debug(f"Received Jitsi token request: {request_data}")
if not request_data.is_moderator: if not request_data.is_moderator:
raise HTTPException( raise HTTPException(status_code=403, detail="JWT generation is only permitted for moderators.")
status_code=403,
detail="JWT generation is only permitted for moderators."
)
try: try:
# Build the payload with the correct structure accepted by Jitsi
# Define the JWT payload with all the required claims for Jitsi.
# This is where we securely set the moderator and user info.
# Even though 'user' is included we are currently ignoring it to prevent client overrides. It is rebuilt below from the main fields.
payload = { payload = {
"aud": JWT_APP_ID, "aud": JWT_APP_ID, "iss": JWT_APP_ID, "sub": JITSI_DOMAIN,
"iss": JWT_APP_ID,
"sub": JITSI_DOMAIN, # Your Jitsi base domain
"room": request_data.room, "room": request_data.room,
"exp": int(time.time()) + 3600, # Token expires in 1 hour "exp": int(time.time()) + 3600,
# 1. Top-level 'config' for config.js overrides
"config": request_data.config or {}, "config": request_data.config or {},
# 2. 'context' for user data, features, and moderator settings
"context": { "context": {
"user": { "user": {
"id": request_data.user['id'], "id": request_data.user['id'] if request_data.user else "guest",
"name": request_data.name, "name": request_data.name,
"email": request_data.email, "email": request_data.email,
# CRITICAL: 'moderator' must be a boolean, not a string
"moderator": request_data.is_moderator, "moderator": request_data.is_moderator,
}, },
# 'features' enables/disables major Jitsi functionalities
"features": request_data.features or {}, "features": request_data.features or {},
# 'settings' controls the moderator's default options in the settings panel
"settings": request_data.settings or {}, "settings": request_data.settings or {},
} }
} }
token = pyjwt.encode(payload, JWT_APP_SECRET, algorithm="HS256")
# Clean up empty objects to keep the final JWT tidy
if not payload["config"]:
del payload["config"]
if not payload["context"]["features"]:
del payload["context"]["features"]
if not payload["context"]["settings"]:
del payload["context"]["settings"]
log.debug(f"Constructed JWT payload: {payload}")
# Sign the JWT with your secret key
# The algorithm must be the same as configured in your Prosody setup (HS256)
token = jwt.encode(payload, JWT_APP_SECRET, algorithm="HS256")
log.info("Jitsi JWT generated successfully.")
log.debug(token)
return {"token": token} return {"token": token}
except Exception as e: except Exception as e:
log.exception("Failed to create JWT") log.exception("Failed to create Jitsi JWT")
raise HTTPException(status_code=500, detail=f"Failed to create JWT: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to create JWT: {str(e)}")
# --- Api_Base CRUD ---
@router.post('', response_model=Resp_Body_Base) @router.post('', response_model=Resp_Body_Base)
async def post_api_obj( async def post_api_obj(obj: Api_Base, x_account_id: str = Header(...)):
obj: Api_Base, return post_obj_template(obj_type='api', data=obj.dict(by_alias=False, exclude_unset=True), return_obj=True)
x_account_id: str = Header(...),
return_obj: Optional[bool] = True,
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'api'
obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
result = post_obj_template(
obj_type=obj_type,
data=obj_data_dict,
return_obj=True,
by_alias=True,
exclude_unset=True,
)
return result
@router.patch('/{obj_id}', response_model=Resp_Body_Base) @router.patch('/{obj_id}', response_model=Resp_Body_Base)
async def patch_api_obj( async def patch_api_obj(obj_id: str, obj: Api_Base, x_account_id: str = Header(...)):
obj_id: str, data = obj.dict(by_alias=False, exclude_unset=True)
obj: Api_Base = None, data['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name='api')
x_account_id: Optional[str] = Header(..., ), return patch_obj_template(obj_type='api', data=data, obj_id=obj_id, return_obj=True)
return_obj: Optional[bool] = True,
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'api'
obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
obj_data_dict['id_random'] = obj_id
result = patch_obj_template(
obj_type=obj_type,
data=obj_data_dict,
obj_id=obj_id,
return_obj=True,
by_alias=True,
exclude_unset=True,
)
return result
@router.get('/list', response_model=Resp_Body_Base) @router.get('/list', response_model=Resp_Body_Base)
async def get_api_obj_li( async def get_api_obj_li(for_obj_type: Optional[str] = Query(None), for_obj_id: Optional[str] = Query(None), x_account_id: str = Header(...)):
for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50), return get_obj_li_template(obj_type='api', for_obj_type=for_obj_type, for_obj_id=for_obj_id)
for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22),
x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'api'
result = get_obj_li_template(
obj_type=obj_type,
for_obj_type=for_obj_type,
for_obj_id=for_obj_id,
by_alias=True,
exclude_unset=True,
)
return result
@router.get('/{obj_id}', response_model=Resp_Body_Base) @router.get('/{obj_id}', response_model=Resp_Body_Base)
async def get_api_obj( async def get_api_obj(obj_id: str, x_account_id: str = Header(...)):
obj_id: str, return get_obj_template(obj_type='api', obj_id=obj_id)
x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'api'
result = get_obj_template(
obj_type=obj_type,
obj_id=obj_id,
by_alias=True,
exclude_unset=True,
)
return result
@router.delete('/{obj_id}', response_model=Resp_Body_Base) @router.delete('/{obj_id}', response_model=Resp_Body_Base)
async def delete_api_obj( async def delete_api_obj(obj_id: str, x_account_id: str = Header(...)):
obj_id: str, return delete_obj_template(obj_type='api', obj_id=obj_id)
x_account_id: str = Header(...),
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'api'
result = delete_obj_template(
obj_type=obj_type,
obj_id=obj_id,
)
return result
@router.get('/get_id/{object_type}/{object_id_random}', response_model=Resp_Body_Base) @router.get('/get_id/{object_type}/{object_id_random}', response_model=Resp_Body_Base)
async def get_api_object_id( async def get_api_object_id(object_type: str, object_id_random: str):
object_type: str,
object_id_random: str,
x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if object_id := redis_lookup_id_random(record_id_random=object_id_random, table_name=object_type): if object_id := redis_lookup_id_random(record_id_random=object_id_random, table_name=object_type):
return mk_resp(data={ 'object_id': object_id}, status_code=400) return mk_resp(data={ 'object_id': object_id})
else: return mk_resp(data=None, status_code=400) return mk_resp(data=None, status_code=404)
# ### BEGIN ### API API ### sql_test() ###
@router.get('/sql_test', tags=['Testing']) @router.get('/sql_test', tags=['Testing'])
async def sql_test(response: Response = Response): async def sql_test(response: Response = Response):
log.setLevel(logging.DEBUG)
log.debug(locals())
sql = text("SELECT NOW() as current_time, VERSION() as version") sql = text("SELECT NOW() as current_time, VERSION() as version")
try: try:
result_proxy = db.execute(sql) result = db.execute(sql).fetchone()
result = result_proxy.fetchone() return mk_resp(data={"current_time": str(result[0]), "version": result[1]})
data = {
"current_time": str(result[0]),
"version": result[1]
}
return mk_resp(data=data, response=response)
except Exception as e: except Exception as e:
log.error(f'SQL Test failed: {str(e)}') return mk_resp(data=False, status_code=500, details=str(e), response=response)
return mk_resp(data=False, status_code=500, details=str(e), response=response)
# ### END ### API API ### sql_test() ###
if x_aether_secret_key:
log.debug(f'Contains a value in x_aether_secret_key: {x_aether_secret_key}')
table_name_select = 'api_key'
field_name = 'secret_key'
field_value = api_secret_key
if api_key_rec_select_result := sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value): pass
else:
log.warning('No results when looking up the API secret key')
return mk_resp(data=False, status_code=401, response=response) # Unauthorized
elif x_aether_api_public_key and x_aether_api_token:
table_name_select = 'api_key'
field_name = 'public_key'
field_value = x_aether_api_public_key
if api_key_rec_select_result := sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value): pass
else:
log.warning('No results when looking up the API public key')
return mk_resp(data=False, status_code=401, response=response) # Unauthorized
# Check if the API keys are valid
if api_key_rec_select_result.get('enable', None):
api_key_rec = api_key_rec_select_result
else:
log.warning('API secret key not enabled')
return mk_resp(data=False, status_code=401, response=response) # Unauthorized
current_datetime = datetime.datetime.utcnow() # datetime.datetime.now() Gets server local datetime
if api_key_rec.get('enable_from', None) <= current_datetime and api_key_rec.get('enable_to', None) >= current_datetime:
pass
else:
log.warning('API secret key expired')
return mk_resp(data=False, status_code=401, response=response) # Unauthorized
if api_secret_key := api_key_rec.get('secret_key', None): pass
else:
log.warning('Secret key was not found')
return mk_resp(data=False, status_code=400, response=response) # Bad Request
if api_public_key := api_key_rec.get('public_key', None): pass
else:
log.warning('Public key was not found')
return mk_resp(data=False, status_code=400, response=response) # Bad Request
# Decode the JWT if an API token was sent and the API secret key was sent/found.
if x_aether_api_token and api_public_key and api_secret_key:
if current_token := decode_jwt(secret_key=api_secret_key, token=x_aether_api_token):
if current_token.get('max_renew', 0) > 0: pass
else:
message = 'The JWT sent is out of allowed renewals. Try again with a current JWT or just the API secret key.'
log.warning(message)
return mk_resp(data=False, status_code=401, status_message=message) # Unauthorized
max_renew = current_token.get('max_renew', 0) - 1
if not account_id: account_id = current_token.get('account_id', None)
if not person_id: person_id = current_token.get('person_id', None)
if not user_id: user_id = current_token.get('user_id', None)
else:
message = 'The JWT sent is either expired or otherwise invalid. Try again with a current JWT or just the API secret key.'
log.warning(message)
return mk_resp(data=False, status_code=401, status_message=message) # Unauthorized
payload = {}
payload['account_id'] = account_id
payload['person_id'] = person_id
payload['user_id'] = user_id
token = sign_jwt(secret_key=api_secret_key, public_key=api_public_key, ttl=max_ttl, max_renew=max_renew, **payload)
response_data = { 'jwt': token }
return mk_resp(data=response_data)
# ### END ### API API ### request_jwt() ###
@router.get('/temp_token', response_model=Resp_Body_Base)
async def get_api_temp_token(
x_aether_api_key: Optional[str] = Header(None),
x_aether_api_token: Optional[str] = Header(None),
x_aether_api_token_expire_on: Optional[str] = Header(None),
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
table_name_select = 'api_key'
field_name = 'secret_key'
field_value = x_aether_api_key
if x_aether_api_key:
log.debug(f'Contains a value in x_aether_api_key: {x_aether_api_key}')
sql_result = sql_select(table_name=table_name_select, field_name=field_name, field_value=field_value)
else:
return mk_resp(data=False, status_code=400, response=response) # Bad Request
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
if sql_result:
log.debug(type(sql_result))
log.debug(sql_result)
base_name = Api_Base
log.debug(base_name)
resp_data = base_name(**sql_result).dict(by_alias=True, exclude_unset=False)
log.debug(resp_data)
return mk_resp(data=resp_data)
else:
log.debug(sql_result)
return mk_resp(data=False, status_code=404, response=response)
# Updated 2025-12-02
# It's best practice to import settings from a config file or environment variables
# For this example, we'll hardcode them, but you should use your actual values
# from your .env file
JWT_APP_ID = "my_jitsi_app_id"
JWT_APP_SECRET = "my_jitsi_app_secret-9876543210"
JITSI_DOMAIN = "jitsi.dgrzone.com"
# Define the data model for the incoming request body from the client
class JitsiTokenRequest(BaseModel):
"""Defines the expected request body from your frontend."""
room: str = Field(..., description="The name of the Jitsi room.")
name: str = Field(..., description="The display name of the user.")
email: EmailStr = Field(..., description="The email of the user.")
is_moderator: bool = Field(..., description="Whether the user should be a moderator.")
# Clearly separated override categories
user: Optional[Dict[str, Union[str, bool]]] = Field(None, description="User-specific overrides like name, email, moderator.")
features: Optional[Dict[str, bool]] = Field(None, description="Feature flags like recording, livestreaming.")
settings: Optional[Dict[str, bool]] = Field(None, description="User profile settings like startMuted, reactionsMuted.")
config: Optional[Dict] = Field(None, description="Overrides for config.js properties.")
# A simple endpoint to generate the Jitsi-specific JWT
@router.post("/jitsi_token")
async def create_jitsi_jwt(
request_data: JitsiTokenRequest = Body(...),
# commons: Common_Route_Params_Min = Depends(common_route_params_min),
):
"""
Generates a Jitsi-specific JWT token for authentication.
The token includes claims to set the user's name, email, and moderator status.
"""
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# log.debug(f"Received Jitsi token request: {request_data.model_dump_json(indent=2)}")
log.debug(f"Received Jitsi token request: {request_data}")
if not request_data.is_moderator:
raise HTTPException(
status_code=403,
detail="JWT generation is only permitted for moderators."
)
try:
# Build the payload with the correct structure accepted by Jitsi
# Define the JWT payload with all the required claims for Jitsi.
# This is where we securely set the moderator and user info.
# Even though 'user' is included we are currently ignoring it to prevent client overrides. It is rebuilt below from the main fields.
payload = {
"aud": JWT_APP_ID,
"iss": JWT_APP_ID,
"sub": JITSI_DOMAIN, # Your Jitsi base domain
"room": request_data.room,
"exp": int(time.time()) + 3600, # Token expires in 1 hour
# 1. Top-level 'config' for config.js overrides
"config": request_data.config or {},
# 2. 'context' for user data, features, and moderator settings
"context": {
"user": {
"id": request_data.user['id'],
"name": request_data.name,
"email": request_data.email,
# CRITICAL: 'moderator' must be a boolean, not a string
"moderator": request_data.is_moderator,
},
# 'features' enables/disables major Jitsi functionalities
"features": request_data.features or {},
# 'settings' controls the moderator's default options in the settings panel
"settings": request_data.settings or {},
}
}
# Clean up empty objects to keep the final JWT tidy
if not payload["config"]:
del payload["config"]
if not payload["context"]["features"]:
del payload["context"]["features"]
if not payload["context"]["settings"]:
del payload["context"]["settings"]
log.debug(f"Constructed JWT payload: {payload}")
# Sign the JWT with your secret key
# The algorithm must be the same as configured in your Prosody setup (HS256)
token = jwt.encode(payload, JWT_APP_SECRET, algorithm="HS256")
log.info("Jitsi JWT generated successfully.")
log.debug(token)
return {"token": token}
except Exception as e:
log.exception("Failed to create JWT")
raise HTTPException(status_code=500, detail=f"Failed to create JWT: {str(e)}")
@router.post('', response_model=Resp_Body_Base)
async def post_api_obj(
obj: Api_Base,
x_account_id: str = Header(...),
return_obj: Optional[bool] = True,
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'api'
obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
result = post_obj_template(
obj_type=obj_type,
data=obj_data_dict,
return_obj=True,
by_alias=True,
exclude_unset=True,
)
return result
@router.patch('/{obj_id}', response_model=Resp_Body_Base)
async def patch_api_obj(
obj_id: str,
obj: Api_Base = None,
x_account_id: Optional[str] = Header(..., ),
return_obj: Optional[bool] = True,
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'api'
obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
obj_data_dict['id_random'] = obj_id
result = patch_obj_template(
obj_type=obj_type,
data=obj_data_dict,
obj_id=obj_id,
return_obj=True,
by_alias=True,
exclude_unset=True,
)
return result
@router.get('/list', response_model=Resp_Body_Base)
async def get_api_obj_li(
for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50),
for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22),
x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'api'
result = get_obj_li_template(
obj_type=obj_type,
for_obj_type=for_obj_type,
for_obj_id=for_obj_id,
by_alias=True,
exclude_unset=True,
)
return result
@router.get('/{obj_id}', response_model=Resp_Body_Base)
async def get_api_obj(
obj_id: str,
x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'api'
result = get_obj_template(
obj_type=obj_type,
obj_id=obj_id,
by_alias=True,
exclude_unset=True,
)
return result
@router.delete('/{obj_id}', response_model=Resp_Body_Base)
async def delete_api_obj(
obj_id: str,
x_account_id: str = Header(...),
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'api'
result = delete_obj_template(
obj_type=obj_type,
obj_id=obj_id,
)
return result
@router.get('/get_id/{object_type}/{object_id_random}', response_model=Resp_Body_Base)
async def get_api_object_id(
object_type: str,
object_id_random: str,
x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if object_id := redis_lookup_id_random(record_id_random=object_id_random, table_name=object_type):
return mk_resp(data={ 'object_id': object_id}, status_code=400)
else: return mk_resp(data=None, status_code=400)
# ### BEGIN ### API API ### sql_test() ###
@router.get('/sql_test', tags=['Testing'])
async def sql_test(response: Response = Response):
log.setLevel(logging.DEBUG)
log.debug(locals())
sql = text("SELECT NOW() as current_time, VERSION() as version")
try:
result_proxy = db.execute(sql)
result = result_proxy.fetchone()
data = {
"current_time": str(result[0]),
"version": result[1]
}
return mk_resp(data=data, response=response)
except Exception as e:
log.error(f'SQL Test failed: {str(e)}')
return mk_resp(data=False, status_code=500, details=str(e), response=response)
# ### END ### API API ### sql_test() ###

View File

@@ -0,0 +1,88 @@
import requests
import json
import time
import concurrent.futures
# Configuration
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
API_KEY = "IDF68Em5X4HTZlswRNgepQ"
ACCOUNT_ID = "_XY7DXtc9MY"
JOURNAL_ID = "DCAV-06-35-85"
EVENT_ID = "pjrcghqwert"
HEADERS = {
"X-Aether-API-Key": API_KEY,
"X-Account-ID": ACCOUNT_ID,
"Content-Type": "application/json"
}
def hit_endpoint(name, method, url, body=None, params=None):
try:
start = time.time()
if method == "GET":
resp = requests.get(url, headers=HEADERS, params=params, timeout=15)
else:
resp = requests.post(url, headers=HEADERS, json=body, params=params, timeout=15)
duration = time.time() - start
status = resp.status_code
print(f"[{name}] Status: {status} | Duration: {duration:.3f}s")
if status != 200:
print(f" ❌ ERROR: {resp.text[:200]}")
return False
return True
except Exception as e:
print(f"[{name}] ❌ EXCEPTION: {e}")
return False
def test_batch():
success_count = 0
total = 0
# 1. Journal Entries (Nested GET)
url = f"{BASE_URL}/journal/{JOURNAL_ID}/journal_entry/"
if hit_endpoint("Journal Entries (Nested GET)", "GET", url): success_count += 1
total += 1
# 2. Event Badges (Search POST)
url = f"{BASE_URL}/event_badge/search"
query = {"q": "%"}
if hit_endpoint("Event Badges (Search POST)", "POST", url, body=query): success_count += 1
total += 1
# 3. Event Sessions (Search POST)
url = f"{BASE_URL}/event_session/search"
query = {"q": "%"}
if hit_endpoint("Event Sessions (Search POST)", "POST", url, body=query): success_count += 1
total += 1
# 4. Event Sessions (List GET with parent)
url = f"{BASE_URL}/event_session/"
params = {"for_obj_type": "event", "for_obj_id": EVENT_ID}
if hit_endpoint("Event Sessions (List GET)", "GET", url, params=params): success_count += 1
total += 1
return success_count, total
if __name__ == "__main__":
print(f"Reproduction script started against {BASE_URL}")
print(f"Using Account: {ACCOUNT_ID}\n")
grand_success = 0
grand_total = 0
iterations = 5
# Running 20 requests total (5 batches of 4)
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(test_batch) for _ in range(iterations)]
for future in concurrent.futures.as_completed(futures):
s, t = future.result()
grand_success += s
grand_total += t
print(f"\nFinal Results: {grand_success}/{grand_total} requests succeeded.")
if grand_success == grand_total:
print("🎉 SUCCESS! No more timeouts or 403s.")
else:
print("❌ Reproduced some failures!")