from __future__ import annotations import datetime, hashlib, os, pathlib, shutil, time from fastapi import File, UploadFile from typing import Dict, List, Optional, Set, Union from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update from app.lib_general import log, logging from app.models.hosted_file_models import Hosted_File_Base # ### BEGIN ### API Hosted File Methods ### create_hosted_file_obj() ### def create_hosted_file_obj(hosted_file_obj_new:Hosted_File_Base): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) # hosted_file_obj_data = hosted_file_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'created_on', 'updated_on'}) hosted_file_obj_data = hosted_file_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'saved', 'already_exists', 'copy_timer', 'created_on', 'updated_on'}) if hosted_file_obj_in_result := sql_insert(data=hosted_file_obj_data, table_name='hosted_file', rm_id_random=True, id_random_length=8): pass else: return False log.debug(hosted_file_obj_in_result) hosted_file_id = hosted_file_obj_in_result log.debug(f'Returning the new hosted_file_id: {hosted_file_id}') return hosted_file_id # ### END ### API Hosted File Methods ### create_hosted_file_obj() ### # ### BEGIN ### API Hosted File Methods ### load_hosted_file_obj() ### def load_hosted_file_obj( hosted_file_id: int|str, limit: int = 1000, model_as_dict: bool = False, enabled: str = 'enabled', # enabled, disabled, all # inc_x: bool = False, ) -> Hosted_File_Base|dict|bool: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if hosted_file_id := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass else: return False if hosted_file_rec := sql_select(table_name='v_hosted_file', record_id=hosted_file_id): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(hosted_file_rec) else: return False try: hosted_file_obj = Hosted_File_Base(**hosted_file_rec) log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(hosted_file_obj) except ValidationError as e: log.error(e.json()) return False # if inc_x: # x_id = hosted_file_rec.get('x_id', None) # if x_obj_result := load_x_obj(x_id=x_id): # x_obj = x_obj_result # hosted_file_obj.x = x_obj # else: hosted_file_obj.x = None if model_as_dict: return hosted_file_obj.dict(by_alias=True, exclude_unset=True) # pylint: disable=no-member else: return hosted_file_obj # ### END ### API Hosted File Methods ### load_hosted_file_obj() ### # ### BEGIN ### API Hosted File Route ### get_file_object_hash() ### async def get_file_object_hash(file_object:File): #log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) # 4096 bytes is the current block size on my workstation and Linode server # 4096 8192 16384 32768 65536 131072 262144 524288 1048576 bytes block_size = 131072 hash_value = hashlib.sha256() timer_start = time.process_time() for chunk in iter(lambda: file_object.read(block_size), b""): hash_value.update(chunk) file_hash = hash_value.hexdigest() file_object.seek(0) # The file will not properly save if seek is not reset to 0. timer_end = time.process_time() elapsed_time = timer_end - timer_start log.debug(f'Elapsed time: {elapsed_time}') return file_hash # ### END ### API Hosted File Route ### get_file_object_hash() ### # ### BEGIN ### API Hosted File Route ### guess_file_extension() ### def guess_file_extension(filename:str): return filename.rsplit('.', 1)[1].lower() # ### END ### API Hosted File Route ### guess_file_extension() ### # ### BEGIN ### API Hosted File Route ### allowed_file_extension() ### def allowed_file_extension(extension:str, extension_list:list): return extension.lower() in extension_list # app.config['ALLOWED_EXTENSIONS'] # ### END ### API Hosted File Route ### allowed_file_extension() ### # ### BEGIN ### API Hosted File Route ### save_file() ### async def save_file( file: UploadFile, account_id: int, account_id_random: str, for_object_type: str, for_object_id: int, for_object_id_random: str, check_allowed_extension: bool = False, ): # log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) hosted_file_path = '/home/scott/tmp/hosted_file_dev/' log.debug(shutil.disk_usage(hosted_file_path)) log.debug(dir(file)) log.debug(f'{file.filename}') file_info: dict = {} file_info['saved'] = None file_info['for_object_type'] = for_object_type file_info['for_object_id'] = for_object_id file_info['for_object_id_random'] = for_object_id_random file_info['filename'] = file.filename file_info['extension'] = guess_file_extension(filename=file.filename) if check_allowed_extension: if allowed_file_extension(extension=file_info['extension'], extension_list=['jpg','png','webp']): file_info['extension_allowed'] = True else: file_info['extension_allowed'] = False file_info['saved'] = False return file_info else: file_info['extension_allowed'] = None # There is a difference between Content-Type and MIME type. # https://stackoverflow.com/questions/3452381/whats-the-difference-of-contenttype-and-mimetype file_info['content_type'] = file.content_type # might also include charset or other parameters # file_info['mimetype'] = file.mimetype # This may need to be filled in a different way? file.file.seek(0, os.SEEK_END) file_size = file.file.tell() file.file.seek(0) # The file will not properly save if seek is not reset to 0. log.debug(file_size) file_info['size'] = file_size file_hash = await get_file_object_hash(file.file) log.debug(file_hash) file_info['hash_sha256'] = file_hash # 16384 bytes is the default # 4096 8192 16384 32768 65536 131072 262144 524288 1048576 bytes buffer_size = 524288 #f_src = open(file_src, 'rb') f_src = file.file # Don't need to do open(file_src, 'rb') since it is already "open" #file_dest = f'{hosted_file_path}{file.filename}' file_dest = f'{hosted_file_path}{file_hash}.file' existing_file_check = pathlib.Path(file_dest) if existing_file_check.exists(): file_info['already_exists'] = True file_info['copy_timer'] = 0 file_info['saved'] = True else: file_info['already_exists'] = False try: f_dest = open(file_dest, 'wb') timer_start = time.process_time() shutil.copyfileobj(f_src, f_dest, buffer_size) timer_end = time.process_time() elapsed_time = timer_end - timer_start log.debug(f'Elapsed time: {elapsed_time}') file_info['copy_timer'] = elapsed_time file_info['saved'] = True except Exception as e: log.exception('*** An exception happened. ***') log.exception(repr(e)) log.exception('***') log.exception(str(e)) log.exception('^^^ exception ^^^') file_info['copy_timer'] = 0 file_info['saved'] = False log.debug(shutil.disk_usage(hosted_file_path)) return file_info # ### END ### API Hosted File Route ### save_file() ### # ### BEGIN ### API Hosted File Route ### hosted_file_link() ### def create_hosted_file_link( account_id: int|str, hosted_file_id: int|str, for_object_type: str, for_object_id: int|str, # for_object_id_random: str, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass else: return False if hosted_file_id := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass else: return False if for_object_id := redis_lookup_id_random(record_id_random=for_object_id, table_name=for_object_type): pass else: return False hosted_file_link_data: dict = {} hosted_file_link_data['account_id'] = account_id hosted_file_link_data['hosted_file_id'] = hosted_file_id hosted_file_link_data['object_type'] = for_object_type hosted_file_link_data['object_id'] = for_object_id # NOTE: Currently sql_insert does not handel all successful inserts correctly. If there is not an autonum ID then it will return 0 as the ID. if hosted_file_link_data_in_result := sql_insert(data=hosted_file_link_data, table_name='hosted_file_link', id_random_length=0): pass # This should be improved else: # This should be improved log.warning('Because the hosted_file_link table does not have a primary autonum this check is incorrect even when successful.') log.warning('Something may have gone wrong while trying to create the hosted_file_link record.') log.warning('The hosted_file_link was probably created fine though.') return False log.debug(hosted_file_link_data_in_result) return True # ### END ### API Hosted File Route ### hosted_file_link() ###