fix(sql): handle record_id=0 correctly in CRUD utilities

Updated sql_select, sql_update, and sql_delete to use explicit 'is not None' checks for record_id. This prevents falsy ID values (like 0) from triggering generic table scans or failing to filter, which was causing the config bootstrap to accidentally load record ID 1 when ID 0 was requested.
This commit is contained in:
Scott Idem
2026-02-06 12:53:59 -05:00
parent 37a43babb9
commit 1053d8a81b
2 changed files with 88 additions and 10 deletions

View File

@@ -111,7 +111,7 @@ def sql_update(
if len(sql_set) < 4: if len(sql_set) < 4:
return None return None
if record_id: if record_id is not None:
data['id'] = record_id data['id'] = record_id
sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id') sql_update_stmt = text(f'UPDATE `{table_name}` SET {sql_set} WHERE id = :id')
elif record_id_random: elif record_id_random:
@@ -250,7 +250,7 @@ def sql_select(
order_by_str_li = [f'`{table_name}`.`{k}` {v}' for k, v in order_by_li.items()] order_by_str_li = [f'`{table_name}`.`{k}` {v}' for k, v in order_by_li.items()]
sql_order_by = f"ORDER BY {', '.join(order_by_str_li)}" sql_order_by = f"ORDER BY {', '.join(order_by_str_li)}"
if table_name and not (record_id or record_id_random or field_name or field_value or sql or data): if table_name and record_id is None and not (record_id_random or field_name or field_value or sql or data):
data = {} data = {}
s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None) s_en, d_en = sql_enable_part(table_name, enabled) if enabled else ('', None)
s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None) s_hi, d_hi = sql_hidden_part(table_name, hidden) if hidden else ('', None)
@@ -264,12 +264,12 @@ def sql_select(
stmt = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};") stmt = text(f"SELECT * FROM `{table_name}` WHERE 1=1 {s_search} {s_en} {s_hi} {sql_order_by} {sql_limit_offset};")
elif table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): elif table_name and (record_id is not None or record_id_random) and not (field_name or field_value or sql or data):
data = {'rid': record_id} if record_id else {'ridr': record_id_random} data = {'rid': record_id} if record_id is not None else {'ridr': record_id_random}
where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr" where = f"`{table_name}`.id = :rid" if record_id is not None else f"`{table_name}`.id_random = :ridr"
stmt = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};") stmt = text(f"SELECT * FROM `{table_name}` WHERE {where} {sql_order_by} {sql_limit_offset};")
elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): elif table_name and field_name and field_value and not (record_id is not None or record_id_random or sql or data):
data = {field_name: field_value} data = {field_name: field_value}
s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {}) s_where, d_where = sql_where_qry_part(qry_dict_li) if qry_dict_li else ('', {})
s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {}) s_ft, d_ft = sql_fulltext_qry_part(fulltext_qry_dict) if fulltext_qry_dict else ('', {})
@@ -370,11 +370,11 @@ def sql_delete(
) -> None|bool: ) -> None|bool:
log.setLevel(log_lvl) log.setLevel(log_lvl)
if table_name and (record_id or record_id_random) and not (field_name or field_value or sql or data): if table_name and (record_id is not None or record_id_random) and not (field_name or field_value or sql or data):
data = {'rid': record_id} if record_id else {'ridr': record_id_random} data = {'rid': record_id} if record_id is not None else {'ridr': record_id_random}
where = f"`{table_name}`.id = :rid" if record_id else f"`{table_name}`.id_random = :ridr" where = f"`{table_name}`.id = :rid" if record_id is not None else f"`{table_name}`.id_random = :ridr"
stmt = text(f"DELETE FROM `{table_name}` WHERE {where}") stmt = text(f"DELETE FROM `{table_name}` WHERE {where}")
elif table_name and field_name and field_value and not (record_id or record_id_random or sql or data): elif table_name and field_name and field_value and not (record_id is not None or record_id_random or sql or data):
data = {field_name: field_value} data = {field_name: field_value}
stmt = text(f"DELETE FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name}") stmt = text(f"DELETE FROM `{table_name}` WHERE `{table_name}`.{field_name} = :{field_name}")
elif sql: elif sql:

View File

@@ -0,0 +1,78 @@
import sys
import os
import unittest
from unittest.mock import MagicMock
# Add project root to path
sys.path.append(os.getcwd())
# 1. Mock app.log
mock_log = MagicMock()
def simple_decorator(func):
def wrapper(*args, **kwargs): return func(*args, **kwargs)
return wrapper
mock_log.logger_reset = simple_decorator
sys.modules['app.log'] = mock_log
# 2. Mock app.config with REAL DB credentials for connectivity
mock_config = MagicMock()
mock_settings = MagicMock()
# Credentials extracted from .env
DB_USER = "aether_dev"
DB_PASS = "$1sky.AE_dev.2023"
DB_SERVER = "vpn-db.oneskyit.com"
DB_PORT = "3306"
DB_NAME = "aether_dev"
mock_settings.SQLALCHEMY_DB_URI = f"mysql://{DB_USER}:{DB_PASS}@{DB_SERVER}:{DB_PORT}/{DB_NAME}"
mock_settings.DB = {
"pool_recycle": 1800,
"connect_timeout": 20
}
mock_settings.SMTP = {} # Will be populated during test
mock_config.settings = mock_settings
sys.modules['app.config'] = mock_config
# 3. Import components
from app.lib_email import send_email
from app.db_sql import sql_select
class TestLiveEmail(unittest.TestCase):
def test_live_smtp_connection(self):
"""
Verifies SMTP authentication using credentials from the CFG table.
"""
print("\n--- Testing LIVE SMTP Connection via DB Config (ID 7) ---")
# Fetch real credentials from DB (ID 7 corresponds to your dev environment)
cfg = sql_select(table_name='cfg', record_id=7, as_list=False)
if not cfg:
self.fail("Could not load CFG record 7 from database.")
print(f"SMTP Server: {cfg['smtp_server']}")
print(f"SMTP User: {cfg['smtp_username']}")
# Manually inject settings into the mocked settings object
mock_settings.SMTP = {
'server': cfg['smtp_server'],
'port': str(cfg['smtp_port']),
'username': cfg['smtp_username'],
'password': cfg['smtp_password']
}
# Attempt login validation
result = send_email(
from_email="noreply@oneskyit.com",
to_email="it+tech@oneskyit.com",
subject="Aether System Test: SMTP Validation",
body_html="<p>Validation of the Aether API SMTP configuration.</p>",
test=True # Verify LOGIN but NOT send message
)
print(f"Result: {result}")
self.assertTrue(result, "SMTP Authentication Failed. Check credentials in CFG table ID 7.")
if __name__ == '__main__':
unittest.main()