- New router: app/routers/api_v3_actions_event_exhibit.py
- GET /v3/action/event_exhibit/{exhibit_id}/tracking_export
- Full V3 auth (x-aether-api-key + account context)
- Multi-tenant ownership check via check_account_access
- Permission gate: leads_api_access flag OR manager-level access
- Returns CSV or XLSX file attachment (return_file=false for JSON)
- Flattens responses_json custom Q&A columns; strips HTML from exhibitor_notes
- Exports all records regardless of hidden/enabled state
- Registered in registry.py under prefix /v3/action/event_exhibit
- New E2E test: tests/e2e/test_e2e_v3_action_event_exhibit_tracking_export.py
- 7/7 tests passing against dev-api.oneskyit.com
- Docs: GUIDE__AE_API_V3_for_Frontend.md — new Section 7 covering endpoint
usage, columns, leads_api_access dual-purpose (3rd-party API + UI export gate)
- Docs: tests/README.md — added test to table and when-to-run matrix
233 lines
8.8 KiB
Python
233 lines
8.8 KiB
Python
import datetime
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import re
|
|
from typing import Optional
|
|
|
|
import pandas
|
|
from fastapi import APIRouter, Depends, HTTPException, Path, Query, status
|
|
from fastapi.responses import FileResponse
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
from app.config import settings
|
|
from app.db_sql import redis_lookup_id_random, sql_select
|
|
from app.lib_api_crud_v3 import check_account_access
|
|
from app.lib_export import create_export_file, return_full_tmp_path
|
|
from app.lib_general_v3 import AccountContext, get_account_context
|
|
from app.methods.event_exhibit_tracking_methods import (
|
|
get_event_exhibit_tracking_rec_list,
|
|
load_event_exhibit_tracking_obj,
|
|
)
|
|
from app.models.response_models import mk_resp
|
|
|
|
"""
|
|
Aether API V3 - Event Exhibit Action Router
|
|
---------------------------------------------
|
|
Handles specialized actions for the Event Exhibit module, such as
|
|
exporting tracking (lead) data for exhibitors.
|
|
"""
|
|
|
|
router = APIRouter()
|
|
|
|
# --- Helpers ---
|
|
|
|
_HTML_TAG_RE = re.compile(r'<[^>]+>')
|
|
|
|
|
|
def _strip_html(text: Optional[str]) -> Optional[str]:
|
|
if not text:
|
|
return text
|
|
return _HTML_TAG_RE.sub('', text)
|
|
|
|
|
|
def _flatten_responses(responses_json: Optional[dict]) -> dict:
|
|
"""
|
|
Flatten responses_json into key→value pairs for CSV/Excel export.
|
|
New format: { question_code: { response: <value> } } → value = inner['response']
|
|
Legacy format: { label: <scalar> } → value = scalar
|
|
"""
|
|
if not responses_json:
|
|
return {}
|
|
flat = {}
|
|
for key, value in responses_json.items():
|
|
if isinstance(value, dict):
|
|
flat[key] = value.get('response')
|
|
else:
|
|
flat[key] = value
|
|
return flat
|
|
|
|
|
|
# --- Routes ---
|
|
|
|
@router.get('/{exhibit_id}/tracking_export')
|
|
async def export_exhibit_tracking(
|
|
exhibit_id: str = Path(..., min_length=11, max_length=22),
|
|
file_type: str = Query('CSV', regex=r'^(CSV|XLSX)$'),
|
|
return_file: bool = Query(True),
|
|
account: AccountContext = Depends(get_account_context),
|
|
):
|
|
"""
|
|
V3 Action: Export all tracking (lead capture) records for an exhibit.
|
|
|
|
Auth: Requires `leads_api_access == True` on the exhibit OR manager-level account access.
|
|
Returns a CSV or XLSX file attachment.
|
|
"""
|
|
# 1. Resolve random ID → internal integer
|
|
exhibit_int_id = redis_lookup_id_random(record_id_random=exhibit_id, table_name='event_exhibit')
|
|
if not exhibit_int_id:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Exhibit not found.')
|
|
|
|
# 2. Load exhibit record for ownership + permission checks
|
|
exhibit_rec = sql_select(table_name='v_event_exhibit', record_id=exhibit_int_id)
|
|
if not exhibit_rec:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Exhibit not found.')
|
|
|
|
# 3. Multi-tenant ownership check
|
|
if not check_account_access(exhibit_rec, account, 'event_exhibit'):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail='Access denied: this exhibit belongs to a different account.',
|
|
)
|
|
|
|
# 4. Permission: leads_api_access flag OR manager-level access
|
|
if not exhibit_rec.get('leads_api_access') and not account.manager:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail='Access denied: leads API access is not enabled for this exhibit.',
|
|
)
|
|
|
|
# 5. Fetch all tracking records — no hidden/enabled filter, full export
|
|
tracking_rec_list = get_event_exhibit_tracking_rec_list(
|
|
event_exhibit_id=exhibit_int_id,
|
|
hidden='all',
|
|
enabled='all',
|
|
limit=1500,
|
|
)
|
|
if tracking_rec_list is False:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail='Failed to retrieve tracking records.',
|
|
)
|
|
|
|
# 6. Build export rows
|
|
data_rows = []
|
|
response_keys: list = [] # ordered unique custom-question column names
|
|
|
|
for rec in (tracking_rec_list or []):
|
|
tracking_obj = load_event_exhibit_tracking_obj(
|
|
event_exhibit_tracking_id=rec.get('event_exhibit_tracking_id'),
|
|
)
|
|
if not tracking_obj:
|
|
continue
|
|
|
|
# Apply exhibitor-side field overrides (override values replace badge defaults)
|
|
if tracking_obj.event_badge_full_name_override:
|
|
tracking_obj.event_badge_full_name = tracking_obj.event_badge_full_name_override
|
|
if tracking_obj.event_badge_professional_title_override:
|
|
tracking_obj.event_badge_professional_title = tracking_obj.event_badge_professional_title_override
|
|
if tracking_obj.event_badge_affiliations_override:
|
|
tracking_obj.event_badge_affiliations = tracking_obj.event_badge_affiliations_override
|
|
if tracking_obj.event_badge_email_override:
|
|
tracking_obj.event_badge_email = tracking_obj.event_badge_email_override
|
|
if tracking_obj.event_badge_location_override:
|
|
tracking_obj.event_badge_location = tracking_obj.event_badge_location_override
|
|
|
|
# Flatten custom Q&A responses and collect column keys (order-preserving dedup)
|
|
responses = _flatten_responses(tracking_obj.responses_json)
|
|
for key in responses:
|
|
if key not in response_keys:
|
|
response_keys.append(key)
|
|
|
|
row = {
|
|
'event_exhibit_tracking_id': tracking_obj.event_exhibit_tracking_id,
|
|
'created_on': tracking_obj.created_on,
|
|
'updated_on': tracking_obj.updated_on,
|
|
'event_exhibit_name': tracking_obj.event_exhibit_name,
|
|
'event_badge_full_name': tracking_obj.event_badge_full_name,
|
|
'event_badge_email': tracking_obj.event_badge_email,
|
|
'event_badge_professional_title': tracking_obj.event_badge_professional_title,
|
|
'event_badge_affiliations': tracking_obj.event_badge_affiliations,
|
|
'event_badge_location': tracking_obj.event_badge_location,
|
|
'event_badge_country': tracking_obj.event_badge_country,
|
|
'external_person_id': tracking_obj.external_person_id,
|
|
'exhibitor_notes': _strip_html(tracking_obj.exhibitor_notes),
|
|
'priority': tracking_obj.priority,
|
|
'enable': tracking_obj.enable,
|
|
'hide': tracking_obj.hide,
|
|
**responses,
|
|
}
|
|
data_rows.append(row)
|
|
|
|
# 7. Determine file format
|
|
export_type = 'Excel' if file_type == 'XLSX' else 'CSV'
|
|
content_type = (
|
|
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
|
|
if file_type == 'XLSX' else 'text/csv'
|
|
)
|
|
datetime_str = datetime.datetime.utcnow().strftime('%Y-%m-%d_%H%M')
|
|
filename = f'leads_export_{datetime_str}'
|
|
ext = '.xlsx' if export_type == 'Excel' else '.csv'
|
|
filename_w_ext = filename + ext
|
|
|
|
fixed_columns = [
|
|
'event_exhibit_tracking_id',
|
|
'created_on',
|
|
'updated_on',
|
|
'event_exhibit_name',
|
|
'event_badge_full_name',
|
|
'event_badge_email',
|
|
'event_badge_professional_title',
|
|
'event_badge_affiliations',
|
|
'event_badge_location',
|
|
'event_badge_country',
|
|
'external_person_id',
|
|
'exhibitor_notes',
|
|
'priority',
|
|
'enable',
|
|
'hide',
|
|
]
|
|
column_name_li = fixed_columns + response_keys
|
|
|
|
# 8. Handle empty result — write headers-only file
|
|
if not data_rows:
|
|
hosted_tmp_path = settings.FILES_PATH['hosted_tmp_root']
|
|
subdir = os.path.join(hosted_tmp_path, 'event_exhibit')
|
|
pathlib.Path(subdir).mkdir(parents=True, exist_ok=True)
|
|
full_path = os.path.join(subdir, filename_w_ext)
|
|
df = pandas.DataFrame(columns=fixed_columns)
|
|
if export_type == 'CSV':
|
|
df.to_csv(full_path, index=False)
|
|
else:
|
|
df.to_excel(full_path, index=False)
|
|
if return_file:
|
|
return FileResponse(path=full_path, filename=filename_w_ext, media_type=content_type)
|
|
return mk_resp(data=[], tmp_file_path=filename_w_ext)
|
|
|
|
# 9. Generate the export file
|
|
tmp_file_path = create_export_file(
|
|
data_dict_list=data_rows,
|
|
column_name_li=column_name_li,
|
|
subdir_path='event_exhibit',
|
|
filename=filename,
|
|
rm_id=False,
|
|
export_type=export_type,
|
|
)
|
|
if not tmp_file_path:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail='Failed to generate export file.',
|
|
)
|
|
|
|
if return_file:
|
|
full_path = return_full_tmp_path(full_tmp_path=tmp_file_path)
|
|
if not full_path:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail='Export file not found after creation.',
|
|
)
|
|
return FileResponse(path=full_path, filename=filename_w_ext, media_type=content_type)
|
|
|
|
return mk_resp(data=data_rows, tmp_file_path=tmp_file_path)
|