ROW_NUMBER() was partitioning by `group`, collapsing all 12 US/* timezones (which share group="United States") down to a single record. Partitioning by `name` correctly deduplicates by timezone identity while still preserving the object > account > global override hierarchy. Priority-only list now returns the expected 72 entries. Adds a regression test asserting all 12 US/* timezones are present in the full list. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
97 lines
2.9 KiB
Python
97 lines
2.9 KiB
Python
import logging
|
|
from typing import List, Optional
|
|
from sqlalchemy import text
|
|
from app.lib_sql_core import engine
|
|
from app.lib_general_v3 import AccountContext
|
|
|
|
# Module-level logger, named after this module so log records can be filtered by origin.
log = logging.getLogger(__name__)
|
|
|
|
def get_lookup_list_v3(
    lu_type: str,
    account_ctx: AccountContext,
    for_type: Optional[str] = None,
    for_id: Optional[int] = None,
    include_disabled: bool = False,
    whitelist: Optional[List[str]] = None,
    only_priority: bool = False
) -> List[dict]:
    """
    Retrieve a ranked, deduplicated list of lookup records.

    Rows are read from the table/view named ``v_lu_v3_<lu_type>`` and
    deduplicated per ``name``. When several rows share a name, the winner is
    chosen in this order: Object Override (matching ``for_type``/``for_id``)
    > Account Override (matching ``account_ctx.account_id``) > Global Default
    (``account_id IS NULL``), with ``created_on DESC`` as the tie-breaker.

    Args:
        lu_type: Suffix of the lookup table/view. Must consist only of ASCII
            letters, digits and underscores because it is interpolated into
            the SQL identifier.
        account_ctx: Context object; only ``account_id`` is read here.
        for_type: Object type used to match object-level overrides.
        for_id: Object id used to match object-level overrides.
        include_disabled: When False (default), only rows with ``enable = 1``
            are returned.
        whitelist: Optional list of ``group`` values; when non-empty, only
            rows whose ``group`` is in the list are considered for ranking.
            An empty list or None applies no group filter.
        only_priority: When True, only rows with ``priority = 1`` are returned.

    Returns:
        A list of row dicts ordered by priority (desc), sort (desc), then
        name (asc). Returns an empty list if ``lu_type`` is invalid or if the
        query fails; the failure is logged rather than raised.
    """
    # Function-scope imports keep this block self-contained.
    import re
    from sqlalchemy import bindparam

    # The table name cannot be sent as a bound parameter, so restrict it to a
    # safe identifier alphabet before interpolating it into the statement.
    if not isinstance(lu_type, str) or not re.fullmatch(r"[A-Za-z0-9_]+", lu_type):
        log.error(f"Error in get_lookup_list_v3: invalid lu_type {lu_type!r}")
        return []

    table_name = f"v_lu_v3_{lu_type}"

    # ROW_NUMBER() partitioned by `name` resolves the override hierarchy:
    # exactly one row per name gets rank_priority = 1.
    sql = f"""
        SELECT * FROM (
            SELECT *,
                ROW_NUMBER() OVER (
                    PARTITION BY `name`
                    ORDER BY
                        (for_type = :for_type AND for_id = :for_id) DESC,
                        (account_id = :account_id) DESC,
                        created_on DESC
                ) as rank_priority
            FROM `{table_name}`
            WHERE ((for_type = :for_type AND for_id = :for_id)
               OR account_id = :account_id
               OR account_id IS NULL)
    """

    if whitelist:
        sql += " AND `group` IN :whitelist"

    sql += """
        ) AS ranked
        WHERE rank_priority = 1
    """

    # These filters run AFTER ranking on purpose: if the winning row for a
    # name is disabled / non-priority, the name is dropped entirely instead
    # of falling back to a lower-ranked row.
    if not include_disabled:
        sql += " AND enable = 1"

    if only_priority:
        sql += " AND priority = 1"

    sql += " ORDER BY COALESCE(priority, 0) DESC, COALESCE(sort, 0) DESC, name ASC"

    params = {
        "account_id": account_ctx.account_id,
        "for_type": for_type,
        "for_id": for_id,
    }

    try:
        stmt = text(sql)
        if whitelist:
            # An expanding bind parameter renders one placeholder per element,
            # so the IN clause does not rely on driver-specific tuple handling.
            stmt = stmt.bindparams(bindparam("whitelist", expanding=True))
            params["whitelist"] = list(whitelist)

        with engine.connect() as conn:
            result = conn.execute(stmt, params)
            return [dict(row._mapping) for row in result]
    except Exception as e:
        # Best-effort contract: callers receive an empty list on failure.
        # log.exception keeps the same message but also records the traceback.
        log.exception("Error in get_lookup_list_v3: %s", e)
        return []
|
|
|
|
def resolve_lookup_v3(
    lu_type: str,
    query: str,
    account_ctx: AccountContext,
    identity_fields: List[str]
) -> Optional[dict]:
    """
    Resolve a query string to a single lookup record.

    The full ranked list from ``get_lookup_list_v3(lu_type, account_ctx)`` is
    scanned in order; the first record for which any of ``identity_fields``
    equals the query (case-insensitively, after the query is stripped of
    surrounding whitespace) is returned. Because the list is already ordered,
    the first hit is the highest-priority match.

    Args:
        lu_type: Lookup type forwarded to ``get_lookup_list_v3``.
        query: Text to resolve. ``None`` or a blank string resolves to
            nothing; other non-string values are compared via ``str()``,
            mirroring how stored field values are compared.
        account_ctx: Account context forwarded to ``get_lookup_list_v3``.
        identity_fields: Names of the record keys to compare against.

    Returns:
        The first matching record dict, or ``None`` when nothing matches.
    """
    if query is None:
        return None

    query_clean = str(query).strip().lower()

    # A blank query or an empty field list can never match (falsy field
    # values are skipped below), so avoid the database round-trip entirely.
    if not query_clean or not identity_fields:
        return None

    # Simple implementation: fetch the full ranked list and scan it.
    # For large tables (like timezones) a dedicated SQL query may be faster.
    full_list = get_lookup_list_v3(lu_type, account_ctx)

    for item in full_list:
        for field in identity_fields:
            val = item.get(field)
            # Falsy values (None, "", 0) are never treated as identities.
            if val and str(val).lower() == query_clean:
                return item

    return None
|