Files
OSIT-AE-API-FastAPI/app/methods/hosted_file_methods.py
2021-07-09 17:34:02 -04:00

306 lines
12 KiB
Python

from __future__ import annotations
import datetime, hashlib, os, pathlib, shutil, time
from fastapi import File, UploadFile
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging
from app.models.hosted_file_models import Hosted_File_Base
# ### BEGIN ### API Hosted File Methods ### create_hosted_file_obj() ###
def create_hosted_file_obj(hosted_file_obj_new:Hosted_File_Base):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# hosted_file_obj_data = hosted_file_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'created_on', 'updated_on'})
hosted_file_obj_data = hosted_file_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'saved', 'already_exists', 'copy_timer', 'created_on', 'updated_on'})
if hosted_file_obj_in_result := sql_insert(data=hosted_file_obj_data, table_name='hosted_file', rm_id_random=True, id_random_length=8): pass
else:
return False
log.debug(hosted_file_obj_in_result)
hosted_file_id = hosted_file_obj_in_result
log.debug(f'Returning the new hosted_file_id: {hosted_file_id}')
return hosted_file_id
# ### END ### API Hosted File Methods ### create_hosted_file_obj() ###
# ### BEGIN ### API Hosted File Methods ### load_hosted_file_obj() ###
def load_hosted_file_obj(
hosted_file_id: int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_hosted_file_link_list: bool = False,
) -> Hosted_File_Base|dict|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if hosted_file_id := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass
else: return False
if hosted_file_rec := sql_select(table_name='v_hosted_file', record_id=hosted_file_id):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(hosted_file_rec)
else:
return False
try:
hosted_file_obj = Hosted_File_Base(**hosted_file_rec)
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(hosted_file_obj)
except ValidationError as e:
log.error(e.json())
return False
# if inc_x:
# x_id = hosted_file_rec.get('x_id', None)
# if x_obj_result := load_x_obj(x_id=x_id):
# x_obj = x_obj_result
# hosted_file_obj.x = x_obj
# else: hosted_file_obj.x = None
if model_as_dict:
return hosted_file_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return hosted_file_obj
# ### END ### API Hosted File Methods ### load_hosted_file_obj() ###
# ### BEGIN ### API Hosted File Route ### get_file_object_hash() ###
async def get_file_object_hash(file_object:File):
#log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# 4096 bytes is the current block size on my workstation and Linode server
# 4096 8192 16384 32768 65536 131072 262144 524288 1048576 bytes
block_size = 131072
hash_value = hashlib.sha256()
timer_start = time.process_time()
for chunk in iter(lambda: file_object.read(block_size), b""):
hash_value.update(chunk)
file_hash = hash_value.hexdigest()
file_object.seek(0) # The file will not properly save if seek is not reset to 0.
timer_end = time.process_time()
elapsed_time = timer_end - timer_start
log.debug(f'Elapsed time: {elapsed_time}')
return file_hash
# ### END ### API Hosted File Route ### get_file_object_hash() ###
# ### BEGIN ### API Hosted File Route ### guess_file_extension() ###
def guess_file_extension(filename:str):
return filename.rsplit('.', 1)[1].lower()
# ### END ### API Hosted File Route ### guess_file_extension() ###
# ### BEGIN ### API Hosted File Route ### allowed_file_extension() ###
def allowed_file_extension(extension:str, extension_list:list):
return extension.lower() in extension_list # app.config['ALLOWED_EXTENSIONS']
# ### END ### API Hosted File Route ### allowed_file_extension() ###
# ### BEGIN ### API Hosted File Route ### save_file() ###
async def save_file(
file: UploadFile,
account_id: int,
account_id_random: str,
link_to_type: str,
link_to_id: int,
link_to_id_random: str,
check_allowed_extension: bool = False,
):
# log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
hosted_file_path = '/home/scott/tmp/hosted_file_dev/'
log.debug(shutil.disk_usage(hosted_file_path))
log.debug(dir(file))
log.debug(f'{file.filename}')
file_info: dict = {}
file_info['saved'] = None
file_info['link_to_type'] = link_to_type
file_info['link_to_id'] = link_to_id
file_info['link_to_id_random'] = link_to_id_random
file_info['filename'] = file.filename
file_info['extension'] = guess_file_extension(filename=file.filename)
if check_allowed_extension:
if allowed_file_extension(extension=file_info['extension'], extension_list=['jpg','png','webp']):
file_info['extension_allowed'] = True
else:
file_info['extension_allowed'] = False
file_info['saved'] = False
return file_info
else:
file_info['extension_allowed'] = None
# There is a difference between Content-Type and MIME type.
# https://stackoverflow.com/questions/3452381/whats-the-difference-of-contenttype-and-mimetype
file_info['content_type'] = file.content_type # might also include charset or other parameters
# file_info['mimetype'] = file.mimetype # This may need to be filled in a different way?
file.file.seek(0, os.SEEK_END)
file_size = file.file.tell()
file.file.seek(0) # The file will not properly save if seek is not reset to 0.
log.debug(file_size)
file_info['size'] = file_size
file_hash = await get_file_object_hash(file.file)
log.debug(file_hash)
file_info['hash_sha256'] = file_hash
# 16384 bytes is the default
# 4096 8192 16384 32768 65536 131072 262144 524288 1048576 bytes
buffer_size = 524288
#f_src = open(file_src, 'rb')
f_src = file.file # Don't need to do open(file_src, 'rb') since it is already "open"
#file_dest = f'{hosted_file_path}{file.filename}'
file_dest = f'{hosted_file_path}{file_hash}.file'
existing_file_check = pathlib.Path(file_dest)
if existing_file_check.exists():
file_info['already_exists'] = True
file_info['copy_timer'] = 0
file_info['saved'] = True
else:
file_info['already_exists'] = False
try:
f_dest = open(file_dest, 'wb')
timer_start = time.process_time()
shutil.copyfileobj(f_src, f_dest, buffer_size)
timer_end = time.process_time()
elapsed_time = timer_end - timer_start
log.debug(f'Elapsed time: {elapsed_time}')
file_info['copy_timer'] = elapsed_time
file_info['saved'] = True
except Exception as e:
log.exception('*** An exception happened. ***')
log.exception(repr(e))
log.exception('***')
log.exception(str(e))
log.exception('^^^ exception ^^^')
file_info['copy_timer'] = 0
file_info['saved'] = False
log.debug(shutil.disk_usage(hosted_file_path))
return file_info
# ### END ### API Hosted File Route ### save_file() ###
# ### BEGIN ### API Hosted File Route ### hosted_file_link() ###
def create_hosted_file_link(
account_id: int|str,
hosted_file_id: int|str,
link_to_type: str,
link_to_id: int|str,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
else: return False
if hosted_file_id := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass
else: return False
if link_to_id := redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type): pass
else: return False
hosted_file_link_data: dict = {}
hosted_file_link_data['account_id'] = account_id
hosted_file_link_data['hosted_file_id'] = hosted_file_id
hosted_file_link_data['link_to_type'] = link_to_type # Should this be renamed to "link_to_type" for clarity?
hosted_file_link_data['link_to_id'] = link_to_id # Should this be renamed to "link_to_id" for clarity?
# NOTE: Currently sql_insert does not handel all successful inserts correctly. If there is not an autonum ID then it will return 0 as the ID.
if hosted_file_link_data_in_result := sql_insert(data=hosted_file_link_data, table_name='hosted_file_link', id_random_length=0): pass # This should be improved
else:
# This should be improved
log.warning('Because the hosted_file_link table does not have a primary autonum this check is incorrect even when successful.')
log.warning('Something may have gone wrong while trying to create the hosted_file_link record.')
log.warning('The hosted_file_link was probably created fine though.')
return False
log.debug(hosted_file_link_data_in_result)
return True
# ### END ### API Hosted File Route ### hosted_file_link() ###
# ### BEGIN ### API Hosted File Methods ### get_hosted_file_rec_list() ###
# This needs to be improved. Currently it does not really do anything.
# Need to allow for list by account? Probably have the same actual hosted file have two hosted_file entries if it was uploaded for two separate accounts.
def get_hosted_file_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'hosted_file_id', `tbl`.id_random AS 'hosted_file_id_random'
FROM `hosted_file` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if hosted_file_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
hosted_file_rec_li = hosted_file_rec_li_result
else:
hosted_file_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(hosted_file_rec_li_result)
return hosted_file_rec_li
# ### END ### API Hosted File Methods ### get_hosted_file_rec_list() ###