import os, pathlib, qrcode, time from fastapi import APIRouter, Body, Depends, File, Form, Header, HTTPException, Path, Query, Response, status, UploadFile from fastapi.responses import FileResponse from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union from app.lib_general import log, logging, common_route_params, Common_Route_Params from app.config import settings from app.db_sql import redis_lookup_id_random # from app.methods.hosted_file_methods import create_hosted_file_obj, load_hosted_file_obj, save_file, create_hosted_file_link # from app.models.hosted_file_models import Hosted_File_Base from app.models.response_models import Resp_Body_Base, mk_resp router = APIRouter() # ### BEGIN ### API QR ### get_qr() ### # Updated 2022-07-06 @router.get('/qr/{account_id}/{qr_id}', response_model=Resp_Body_Base) @router.get('/qr/{qr_id}', response_model=Resp_Body_Base) async def get_qr( # account_id: str = Path(min_length=11, max_length=22), qr_id: str = Path(min_length=11, max_length=22), account_id: str = Path(min_length=11, max_length=22), regen: bool = False, qr_type: str = Query(None, min_length=1, max_length=10), filename: str = Query(None, min_length=4, max_length=255), n: str = Query('', max_length=100), fn: str = Query('', max_length=100), title: str = Query('', max_length=100), org: str = Query('', max_length=255), url: str = Query('', max_length=500), email: str = Query('', max_length=255), tel: str = Query('', max_length=25), adr: str = Query('', max_length=200), adr_poa: str = Query('', max_length=100), # Address Post Office Address adr_ext: str = Query('', max_length=100), # Address Extended Address adr_str: str = Query('', max_length=100), # Address Street adr_loc: str = Query('', max_length=100), # Address Locality adr_reg: str = Query('', max_length=100), # Address Region adr_postal: str = Query('', max_length=100), # Address Postal Code adr_country: str = Query('', max_length=100), # Address Country obj_type: str = Query(None, max_length=100), obj_id: str = Query(None, max_length=100), key: str = Query(None, max_length=200), val: str = Query(None, max_length=1000), js: str = Query(None, max_length=1000), str: str = Query(None, max_length=1000), return_file: bool = False, # commons: Common_Route_Params = Depends(common_route_params), x_account_id: str = Header(None, min_length=11, max_length=22), response: Response = Response, ): log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) # if commons.x_account_id_random: # account_id_random = commons.x_account_id_random if x_account_id: account_id_random = x_account_id elif account_id: account_id_random = account_id # ### SECTION ### Secondary data validation if account_id := redis_lookup_id_random(record_id_random=account_id_random, table_name='account'): pass else: return mk_resp(data=None, status_code=404, response=response, status_message='The account ID was invalid or not found.') # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # time.sleep(1.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING hosted_tmp_root_path = settings.FILES_PATH['hosted_tmp_root'] log.info(f'Hosted Tmp Root Path: {hosted_tmp_root_path}') if os.path.exists(hosted_tmp_root_path): log.info(f'Hosted tmp root path found: {hosted_tmp_root_path}') else: log.error(f'Hosted tmp root path not found: {hosted_tmp_root_path}') return mk_resp(data=False, status_code=400, response=response) # Bad Request hosted_tmp_root_path_w_subdir = os.path.join(hosted_tmp_root_path, account_id_random) if os.path.exists(hosted_tmp_root_path_w_subdir): log.info(f'Hosted tmp root path with subdirectory found: {hosted_tmp_root_path_w_subdir}') else: log.info(f'Hosted tmp root path with subdirectory not found: {hosted_tmp_root_path_w_subdir}; Creating...') pathlib.Path(hosted_tmp_root_path_w_subdir).mkdir(parents=True, exist_ok=True) hosted_tmp_root_path_w_subdir_qr_id = os.path.join(hosted_tmp_root_path_w_subdir, qr_id) if regen: pass elif os.path.exists(hosted_tmp_root_path_w_subdir_qr_id): if return_file or filename: if not filename: filename = f'qr_{account_id_random}.png' return FileResponse(hosted_tmp_root_path_w_subdir_qr_id, filename=filename) else: return mk_resp(data=True, response=response) else: log.error(f'The QR file was not found on the server. Account ID: {account_id_random}; QR ID: {qr_id}; Going to create a new one.') # return mk_resp(data=False, status_code=404, response=commons.response, status_message='The QR file was not found on the server.') # Not Found qr_data = None if qr_type: log.info(f'Found QR Type: {qr_type}') if qr_type == 'mecard': # Use MeCard (QR code) format # https://en.wikipedia.org/wiki/MeCard_(QR_code) qr_data = f'MECARD:N:{n};EMAIL:{email};ADR:{adr};;' #qr_data = 'MECARD:N:'+event_badge.given_name+' '+event_badge.family_name+';EMAIL:'+event_badge.email+';ADR:'+event_badge.city+' '+event_badge.state_province+' '+event_badge.country+';;' log.debug(qr_data) elif qr_type == 'vcard': # Use VCard format # https://en.wikipedia.org/wiki/VCard # qr_data = f'BEGIN:VCARD VERSION:3.0\nN:{n}\nFN:{fn}\nORG:{org}\nURL:{url}\nEMAIL:{email}\nTEL:TYPE=VOICE:{tel}\nADR:TYPE=postal:{adr_poa};{adr_ext};{adr_str};{adr_loc};{adr_reg};{adr_postal};{adr_country}\nEND:VCARD' # qr_data = f'BEGIN:VCARD VERSION:3.0\nN:{n}\nFN:{fn}\nORG:{org}\nURL:{url}\nEMAIL:{email}\nTEL:{tel}\nADR:{adr_poa};{adr_ext};{adr_str};{adr_loc};{adr_reg};{adr_postal};{adr_country}\nEND:VCARD' qr_data = f'BEGIN:VCARD VERSION:3.0\nN:{n}\nFN:{fn}\nORG:{org}\nEMAIL:{email}\n' if url: qr_data = f'{qr_data}URL:{url}\n' if tel: qr_data = f'{qr_data}TEL:{tel}\n' if adr_loc: qr_data = f'{qr_data}ADR:{adr_poa};{adr_ext};{adr_str};{adr_loc};{adr_reg};{adr_postal};{adr_country}\n' qr_data = f'{qr_data}END:VCARD' log.debug(qr_data) elif qr_type == 'obj': qr_data = f'OBJ:ot:{obj_type},oi:{obj_id}' # NOTE: These are not quoted values "". Key value below still is quoted. log.debug(qr_data) elif qr_type == 'kv': qr_data = f'KV:k:"{key}",v:"{val}"' elif qr_type == 'js': # qr_data_dict = {'example':'example JSON'} # json_str = json.dumps(qr_data_dict) # qr_data = f'JS:{json_str}' qr_data = f'JS:{js}' elif qr_type == 'str': qr_data = str else: # json_str = request.json() # string = json_str.get('STR', None) qr_data = None log.debug(qr_data) # Create qr code instance qr = qrcode.QRCode( version = 1, error_correction = qrcode.constants.ERROR_CORRECT_M, box_size = 10, border = 1, ) # The data that you want to store # Add data qr.add_data(qr_data) qr.make(fit=True) # Create an image from the QR Code instance img = qr.make_image() # Save it somewhere, change the extension as needed: img.save(f'{hosted_tmp_root_path_w_subdir_qr_id}') # .png ??? if os.path.exists(hosted_tmp_root_path_w_subdir_qr_id): if return_file or filename: if not filename: filename = f'qr_{account_id_random}.png' return FileResponse(hosted_tmp_root_path_w_subdir_qr_id, filename=filename) else: return mk_resp(data=True, response=response) else: log.error(f'The QR file was not found on the server and could not be created. Account ID: {account_id_random}; QR ID: {qr_id}') return mk_resp(data=False, status_code=404, response=response, status_message='The QR file was not found on the server and could not be created.') # Not Found # ### END ### API QR ### get_qr() ###