fix(P2): add OperationalError retry to sql_insert, sql_select, sql_insert_or_update
All three were missing the transient-connection retry that sql_update and run_sql_select already had. On OperationalError (stale/dropped connection), each now retries once with a fresh engine.connect() without disposing the pool. IntegrityError (duplicate key, FK violation, NOT NULL) continues to return None without retrying — the same data would fail again and None signals a data conflict to callers, distinct from False (error) or an int (success). sql_insert_or_update retry is safe because ON DUPLICATE KEY UPDATE is idempotent. sql_insert retry is safe because OperationalError means MariaDB rolled back. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -63,11 +63,29 @@ def sql_insert(
|
||||
return result_insert.lastrowid
|
||||
return False
|
||||
except IntegrityError as e:
|
||||
# Data constraint violation (duplicate key, FK mismatch, NOT NULL) — do NOT retry;
|
||||
# the same data would fail again. Return None so callers can distinguish from errors.
|
||||
if trans: trans.rollback()
|
||||
log.error('Integrity error (likely duplicate). Returning None')
|
||||
log.debug(e)
|
||||
set_last_sql_error(e)
|
||||
return None
|
||||
except OperationalError:
|
||||
# Transient connection failure. The broken connection rolls back on MariaDB's side,
|
||||
# so retrying with a fresh connection is safe.
|
||||
if trans: trans.rollback()
|
||||
log.warning('Operational error in sql_insert. Retrying once with fresh connection...')
|
||||
try:
|
||||
with lib_sql_core.engine.connect() as conn:
|
||||
trans = conn.begin()
|
||||
result_insert = conn.execute(sql_insert_stmt, data)
|
||||
trans.commit()
|
||||
if result_insert.rowcount == 1 and result_insert.lastrowid > 0:
|
||||
return result_insert.lastrowid
|
||||
return False
|
||||
except Exception as e:
|
||||
set_last_sql_error(e)
|
||||
return False
|
||||
except Exception as e:
|
||||
if trans: trans.rollback()
|
||||
log.error('Unknown exception in sql_insert. Returning False')
|
||||
@@ -198,6 +216,19 @@ def sql_insert_or_update(
|
||||
res = conn.execute(stmt, data)
|
||||
trans.commit()
|
||||
return res.lastrowid if res.lastrowid > 0 else True
|
||||
except OperationalError:
|
||||
# ON DUPLICATE KEY UPDATE is idempotent — safe to retry.
|
||||
if trans: trans.rollback()
|
||||
log.warning('Operational error in sql_insert_or_update. Retrying once...')
|
||||
try:
|
||||
with lib_sql_core.engine.connect() as conn:
|
||||
trans = conn.begin()
|
||||
res = conn.execute(stmt, data)
|
||||
trans.commit()
|
||||
return res.lastrowid if res.lastrowid > 0 else True
|
||||
except Exception as e:
|
||||
set_last_sql_error(e)
|
||||
return False
|
||||
except Exception as e:
|
||||
if trans: trans.rollback()
|
||||
log.exception(e)
|
||||
@@ -308,6 +339,21 @@ def sql_select(
|
||||
return [] if as_list else None
|
||||
|
||||
rows = result.all()
|
||||
except OperationalError:
|
||||
# Transient connection failure — reads are always safe to retry.
|
||||
log.error('Operational error in sql_select. Retrying once with fresh connection...')
|
||||
try:
|
||||
with lib_sql_core.engine.connect() as conn:
|
||||
result = conn.execute(stmt, data)
|
||||
if not result:
|
||||
return [] if as_list else None
|
||||
if hasattr(result, 'returns_rows') and not result.returns_rows:
|
||||
return [] if as_list else None
|
||||
rows = result.all()
|
||||
except Exception as e:
|
||||
log.error(f"SQL Fetch Error on retry: {e}")
|
||||
set_last_sql_error(e)
|
||||
return False
|
||||
except Exception as e:
|
||||
log.error(f"SQL Fetch Error: {e}")
|
||||
set_last_sql_error(e)
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
|
||||
- [x] **[P1] Remove zombie `db_connection.py` import** — `app/routers/api.py` imports `db` from `app/db_connection.py`, creating a parasitic second SQLAlchemy engine at startup that is never updated by `reconnect_db()` after bootstrap. The imported `db` is only used in a commented-out line (`api.py:268`). Fix: remove the import; delete or archive `db_connection.py`.
|
||||
- [x] **[P1] Fix retry mechanism in `sql_update` / `run_sql_select`** — On `OperationalError`, both call `sql_connect()` → `reconnect_db()` which calls `engine.dispose()`, nuking the entire connection pool mid-flight. Under concurrent requests this kills other in-flight connections. Fix: remove the `sql_connect()` retry call; SQLAlchemy's `pool_pre_ping=True` already handles stale connections — just open a fresh `engine.connect()` for the retry without disposing the pool.
|
||||
- [ ] **[P2] Add retry logic to `sql_insert` and `sql_select`** — Both are missing the `OperationalError` retry that `sql_update` and `run_sql_select` have. A DB blip during an INSERT (scan record, badge log, etc.) fails silently and returns `False` with no recovery attempt.
|
||||
- [x] **[P2] Add retry logic to `sql_insert` and `sql_select`** — Added `OperationalError` retry (single fresh connection attempt) to `sql_insert`, `sql_select`, and `sql_insert_or_update`. `IntegrityError` (duplicate key, FK violation) correctly bypasses retry and returns `None` — retrying the same data would fail again.
|
||||
- [x] **[P3] Guard `db = engine.connect()` in `lib_sql_core.py` with try/except** — Wrapped in try/except; sets `db = None` on failure so Docker startup race no longer crashes the worker.
|
||||
- [ ] **[P3 full]** Migrate `lib_schema_v3.py:39` and `lib_api_crud_v3.py:166` off the global `db` to `engine.connect()` context managers, then remove the global `db` entirely.
|
||||
- [x] **[P4] Expose `pool_size` / `max_overflow` as env vars** — `create_ae_engine()` calls `settings.DB.get('pool_size', 10)` but `settings.DB` property doesn't include those keys, so they're always hardcoded 10/20. Add `AE_DB_POOL_SIZE` / `AE_DB_POOL_MAX_OVERFLOW` to `config.py`.
|
||||
|
||||
Reference in New Issue
Block a user