# Files
# OSIT-AE-API-FastAPI/app/routers/hosted_file.py
#
# 305 lines
# 16 KiB
# Python

from __future__ import annotations
# import datetime, hashlib, os, pathlib, shutil, time
#from datetime import datetime, time, timedelta
from fastapi import APIRouter, Body, Depends, File, Form, Header, HTTPException, Query, Response, status, UploadFile
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from app.lib_general import log, logging
from app.config import settings
from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random
# from .api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from app.methods.hosted_file_methods import create_hosted_file_obj, load_hosted_file_obj, save_file, create_hosted_file_link
from app.models.hosted_file_models import Hosted_File_Base
from app.models.response_models import mk_resp
# Router object that the hosted-file endpoints in this module register on via @router.post(...).
router = APIRouter()
# ### BEGIN ### API Hosted File Route ### shared helpers ###
def _dump_without_ids(hosted_file_obj):
    """Return the hosted-file model as a dict without ``id`` and ``id_random``."""
    return hosted_file_obj.dict(by_alias=True, exclude_unset=True, exclude={'id', 'id_random'})  # pylint: disable=no-member


def _resolve_hosted_file(file_info, *, warn_on_create=False):
    """Find or create the hosted_file record that belongs to ``file_info``.

    Args:
        file_info: Mapping describing one file. The keys read here are
            ``saved``, ``already_exists`` and ``hash_sha256``; the whole
            mapping is also passed to ``Hosted_File_Base(**file_info)`` when
            a model has to be built. ``file_info['id_random']`` is set to
            ``None`` when the file was not saved.
        warn_on_create: When true, emit the extra warning that
            ``upload_files_fake`` logs before creating a brand-new record.

    Returns:
        A ``(hosted_file_id, hosted_file_dict)`` tuple. ``hosted_file_id`` is
        ``None`` when the file was not saved or no record could be created;
        in those cases ``hosted_file_dict`` is built from ``file_info`` alone.
    """
    if not file_info['saved']:
        # Nothing was written to disk, so there is no record to look up or create.
        file_info['id_random'] = None
        return None, _dump_without_ids(Hosted_File_Base(**file_info))

    log.info('Check and create a new host_file object entry...')
    already_exists = file_info['already_exists']

    # The DB is always consulted by hash, whether or not the file was already on disk.
    log.info('Look up in DB based on hash...')
    hosted_file_sel_result = sql_select(
        table_name='hosted_file',
        field_name='hash_sha256',
        field_value=file_info['hash_sha256'],
    )
    log.debug(hosted_file_sel_result)

    if hosted_file_sel_result:
        if not already_exists:
            # Odd... the hash was found in the database, but the file had to be copied again.
            # If this happens then the file on the host server was probably deleted at some point.
            log.warning('Found an existing host_file object_entry in the DB but the file was not found on the server!')
        hosted_file_id = hosted_file_sel_result.get('id_random', None)
        return hosted_file_id, load_hosted_file_obj(hosted_file_id=hosted_file_id, model_as_dict=True)

    if already_exists:
        # SOMETHING WENT WRONG: the file is on disk but the DB has no entry for its hash.
        # Going to try and create a new host_file entry...
        log.warning('For some reason a host_file object entry with the has was not found.')
    elif warn_on_create:
        log.warning('This is sort of normal. The file may have been deleted from the host server...')

    # Create a new host_file object entry and new host_file.id_random.
    hosted_file_obj = Hosted_File_Base(**file_info)
    hosted_file_obj_result = create_hosted_file_obj(hosted_file_obj_new=hosted_file_obj)
    log.debug(hosted_file_obj_result)
    if hosted_file_obj_result:
        # The truthy return value of create_hosted_file_obj is used as the new record's id.
        return hosted_file_obj_result, load_hosted_file_obj(hosted_file_id=hosted_file_obj_result, model_as_dict=True)

    log.warning('For some reason a host_file object entry could not be created.')
    return None, _dump_without_ids(hosted_file_obj)


def _register_hosted_file(file_info, *, account_id, link_to_type, link_to_id, warn_on_create=False):
    """Resolve one file's hosted_file record, annotate it and link it to its target.

    Args:
        file_info: Mapping describing one file (see ``_resolve_hosted_file``);
            ``extension_allowed``, ``already_exists``, ``saved`` and
            ``copy_timer`` are copied from it into the returned dict.
        account_id: Account id as returned by ``redis_lookup_id_random``.
        link_to_type: Table name of the object the file is linked to.
        link_to_id: Target object id as returned by ``redis_lookup_id_random``.
        warn_on_create: Forwarded to ``_resolve_hosted_file``.

    Returns:
        The hosted-file dict for this file, including the four status keys.
    """
    hosted_file_id, hosted_file_dict = _resolve_hosted_file(file_info, warn_on_create=warn_on_create)

    for key in ('extension_allowed', 'already_exists', 'saved', 'copy_timer'):
        hosted_file_dict[key] = file_info[key]
    log.debug(hosted_file_dict)

    # NOTE: Currently sql_insert does not handle all successful inserts correctly. If there is not an autonum ID then it will return 0 as the ID.
    # NOTE(review): hosted_file_id may be None here (file not saved / record not created);
    # the link call is still made, as before - confirm create_hosted_file_link tolerates that.
    link_created = create_hosted_file_link(
        account_id=account_id,
        hosted_file_id=hosted_file_id,
        link_to_type=link_to_type,
        link_to_id=link_to_id,
    )
    if not link_created:
        # This check should be improved
        log.debug('Because the hosted_file_link table does not have a primary autonum this check is incorrect even when successful.')
        log.debug('Something may have gone wrong while trying to create the hosted_file_link record.')
        log.debug('The hosted_file_link was probably created fine though.')
    return hosted_file_dict
# ### END ### API Hosted File Route ### shared helpers ###


# ### BEGIN ### API Hosted File Route ### upload_files() ###
# This just needs to return the correct model for a hosted_file
# Everything else seems to be working well
# Should this also do something with meta data and updating the DB?
@router.post('/upload_files')
async def upload_files(
    file_list: List[UploadFile] = File(...),
    account_id: str = Form(..., min_length=1, max_length=22),
    # filename: Optional[str] = Form(...),
    link_to_type: str = Form(...),
    link_to_id: str = Form(..., min_length=1, max_length=22),
    check_allowed_extension: bool = False,
    # create_hosted_file_link: bool = True,
    x_account_id: str = Header(..., ),
    return_obj: bool = True,
    by_alias: bool = True,
    exclude_unset: bool = True,
):
    """Save uploaded files, ensure each has a hosted_file record, and link it to an object.

    Args:
        file_list: Uploaded files; each one is passed to ``save_file``.
        account_id: Random string id of the account (form field).
        link_to_type: Table name of the object the files are linked to.
        link_to_id: Random string id of that object (form field).
        check_allowed_extension: Forwarded to ``save_file``.
        x_account_id: Required header; not read by this handler.
        return_obj, by_alias, exclude_unset: Accepted but not read by this handler.

    Returns:
        ``mk_resp(data=None, status_code=400)`` when either random id cannot be
        resolved through ``redis_lookup_id_random``; otherwise ``mk_resp`` with
        one hosted-file dict per uploaded file.
    """
    # NOTE(review): `log` is imported at module level, so this level change
    # presumably stays in effect after the request - confirm that is intended.
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    account_id_random = account_id  # This is for the account random str ID
    account_id = redis_lookup_id_random(record_id_random=account_id_random, table_name='account')
    if not account_id:
        return mk_resp(data=None, status_code=400)

    link_to_id_random = link_to_id  # This is for the object random str ID
    link_to_id = redis_lookup_id_random(record_id_random=link_to_id_random, table_name=link_to_type)
    if not link_to_id:
        return mk_resp(data=None, status_code=400)

    hosted_file_list = []
    for file_obj in file_list:
        file_info = await save_file(
            file=file_obj,
            account_id=account_id,
            account_id_random=account_id_random,
            link_to_type=link_to_type,
            link_to_id=link_to_id,
            link_to_id_random=link_to_id_random,
            check_allowed_extension=check_allowed_extension,
        )
        hosted_file_list.append(
            _register_hosted_file(
                file_info,
                account_id=account_id,
                link_to_type=link_to_type,
                link_to_id=link_to_id,
            )
        )

    log.debug(hosted_file_list)
    return mk_resp(data=hosted_file_list)
# ### END ### API Hosted File Route ### upload_files() ###


# ### BEGIN ### API Hosted File Route ### upload_files_fake() ###
# This just needs to return the correct model for a hosted_file
# Everything else seems to be working well
# Should this also do something with meta data and updating the DB?
@router.post('/upload_files/fake')
async def upload_files_fake(
    file_info_li: list,
    account_id: str,
    # filename: Optional[str] = Form(...),
    link_to_type: str,
    link_to_id: str,
    check_allowed_extension: bool = False,
    # create_hosted_file_link: bool = True,
    x_account_id: str = Header(..., ),
    return_obj: bool = True,
    by_alias: bool = True,
    exclude_unset: bool = True,
    response: Response = Response,
):
    """Run the hosted_file bookkeeping of ``upload_files`` on caller-supplied file info.

    No file is saved here: every entry of ``file_info_li`` is used in place of
    the mapping that ``save_file`` returns in ``upload_files``.

    Args:
        file_info_li: List of file-info mappings (keys as read by
            ``_resolve_hosted_file`` / ``_register_hosted_file``).
        account_id: Random string id of the account.
        link_to_type: Table name of the object the files are linked to.
        link_to_id: Random string id of that object.
        x_account_id: Required header; not read by this handler.
        check_allowed_extension, return_obj, by_alias, exclude_unset, response:
            Accepted but not read by this handler.

    Returns:
        ``mk_resp(data=None, status_code=400)`` when either random id cannot be
        resolved through ``redis_lookup_id_random``; otherwise ``mk_resp`` with
        one hosted-file dict per entry of ``file_info_li``.
    """
    # NOTE(review): `log` is imported at module level, so this level change
    # presumably stays in effect after the request - confirm that is intended.
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    log.debug(file_info_li)

    account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
    if not account_id:
        return mk_resp(data=None, status_code=400)

    link_to_id = redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type)
    if not link_to_id:
        return mk_resp(data=None, status_code=400)

    hosted_file_list = [
        _register_hosted_file(
            file_info,
            account_id=account_id,
            link_to_type=link_to_type,
            link_to_id=link_to_id,
            warn_on_create=True,
        )
        for file_info in file_info_li
    ]

    log.debug(hosted_file_list)
    return mk_resp(data=hosted_file_list)
# ### END ### API Hosted File Route ### upload_files_fake() ###
@router.post('/test_uploads')
async def test_upload_files(
    file_list: List[UploadFile],
    # account_id: str = Form(..., min_length=1, max_length=22),
    # filename: Optional[str] = Form(...),
    response: Response = Response,
):
    """Placeholder upload endpoint: log what was received and answer 501.

    The previous body passed ``account_id``, ``link_to_type`` and related
    names to ``save_file`` although this handler defines none of them, so any
    non-empty upload raised ``NameError`` before the 501 response was built.
    Until the account/link form fields are added back to the signature, the
    files are only logged and nothing is saved.

    Args:
        file_list: Uploaded files; only their metadata is logged.
        response: Accepted but not read by this handler.

    Returns:
        ``mk_resp(data=False, status_code=501)``.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    for file_obj in file_list:
        log.debug('Received upload: filename=%s content_type=%s', file_obj.filename, file_obj.content_type)
    return mk_resp(data=False, status_code=501)