304 lines
16 KiB
Python
304 lines
16 KiB
Python
from __future__ import annotations
|
|
# import datetime, hashlib, os, pathlib, shutil, time
|
|
#from datetime import datetime, time, timedelta
|
|
from fastapi import APIRouter, Body, Depends, File, Form, Header, HTTPException, Query, Response, status, UploadFile
|
|
from pydantic import BaseModel, EmailStr, Field
|
|
from typing import Dict, List, Optional, Set, Union
|
|
|
|
from app.lib_general import log, logging
|
|
from app.config import settings
|
|
from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random
|
|
|
|
# from .api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
|
|
|
|
from app.methods.hosted_file_methods import create_hosted_file_obj, load_hosted_file_obj, save_file, create_hosted_file_link
|
|
|
|
from app.models.hosted_file_models import Hosted_File_Base
|
|
from app.models.response_models import mk_resp
|
|
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Route ### upload_files() ###
|
|
# This just needs to return the correct model for a hosted_file
|
|
# Everything else seems to be working well
|
|
# Should this also do something with meta data and updating the DB?
|
|
@router.post('/upload_files')
async def upload_files(
        file_list: List[UploadFile] = File(...),
        account_id: str = Form(..., min_length=1, max_length=22),
        for_object_type: str = Form(...),
        for_object_id: str = Form(..., min_length=1, max_length=22),
        check_allowed_extension: bool = False,
        # NOTE: do not add a parameter named create_hosted_file_link here; it would shadow the imported function used below.
        x_account_id: str = Header(..., ),
        return_obj: bool = True,
        by_alias: bool = True,
        exclude_unset: bool = True,
    ):
    """Save the uploaded files and return one hosted_file dict per file.

    For every upload the file is stored with ``save_file``; the matching
    ``hosted_file`` record is then looked up by ``hash_sha256`` and created
    when it is missing. Each returned dict also carries the
    ``extension_allowed``, ``already_exists``, ``saved`` and ``copy_timer``
    values reported by ``save_file``. When a hosted_file ID is available a
    hosted_file_link to the target object is created.

    Parameters:
        file_list: The uploaded files.
        account_id: Random string ID of the account.
        for_object_type: Table name of the object the files belong to.
        for_object_id: Random string ID of the object the files belong to.
        check_allowed_extension: Passed through to ``save_file``.
        x_account_id: Required request header; not read by this function.
        return_obj, by_alias, exclude_unset: Accepted but not read by this
            function.

    Returns:
        ``mk_resp(data=<list of hosted_file dicts>)`` on success, or
        ``mk_resp(data=None, status_code=400)`` when either random ID cannot
        be resolved.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Keep the caller-supplied random IDs; the lookups return the IDs used for the DB calls.
    account_id_random = account_id
    account_id = redis_lookup_id_random(record_id_random=account_id_random, table_name='account')
    if not account_id:
        return mk_resp(data=None, status_code=400)

    # NOTE(review): for_object_type comes straight from the request and is used as a table name.
    # Confirm that redis_lookup_id_random / create_hosted_file_link restrict it to known tables.
    for_object_id_random = for_object_id
    for_object_id = redis_lookup_id_random(record_id_random=for_object_id_random, table_name=for_object_type)
    if not for_object_id:
        return mk_resp(data=None, status_code=400)

    hosted_file_list = []
    for file_obj in file_list:
        file_info = await save_file(
            file=file_obj,
            account_id=account_id,
            account_id_random=account_id_random,
            for_object_type=for_object_type,
            for_object_id=for_object_id,
            for_object_id_random=for_object_id_random,
            check_allowed_extension=check_allowed_extension,
        )

        hosted_file_id = None
        hosted_file_dict = None
        hosted_file_obj = None

        if file_info['saved']:
            # Whether or not the file was already on the server, the DB record is found by its hash.
            hosted_file_sel_result = sql_select(
                table_name='hosted_file',
                field_name='hash_sha256',
                field_value=file_info['hash_sha256'],
            )
            if hosted_file_sel_result:
                if not file_info['already_exists']:
                    # The hash is in the DB but the file had to be copied again,
                    # so the file on the host server was probably deleted at some point.
                    log.warning('Found an existing host_file object_entry in the DB but the file was not found on the server!')
                hosted_file_id = hosted_file_sel_result.get('id_random', None)
            else:
                if file_info['already_exists']:
                    # The file is on the server but has no DB record; try to create one.
                    log.warning('For some reason a host_file object entry with the has was not found.')
                hosted_file_obj = Hosted_File_Base(**file_info)
                hosted_file_obj_result = create_hosted_file_obj(hosted_file_obj_new=hosted_file_obj)
                if hosted_file_obj_result:
                    hosted_file_id = hosted_file_obj_result
                else:
                    log.warning('For some reason a host_file object entry could not be created.')
                log.debug(hosted_file_obj_result)
            log.debug(hosted_file_sel_result)

            if hosted_file_id is not None:
                hosted_file_dict = load_hosted_file_obj(hosted_file_id=hosted_file_id, model_as_dict=True)
                if not hosted_file_dict:
                    log.warning('The host_file object entry could not be loaded; returning the upload info instead.')
        else:
            file_info['id_random'] = None

        if not hosted_file_dict:
            # No stored record could be loaded: describe the file from the upload info alone.
            if hosted_file_obj is None:
                hosted_file_obj = Hosted_File_Base(**file_info)
            hosted_file_dict = hosted_file_obj.dict(by_alias=True, exclude_unset=True, exclude={'id', 'id_random'})  # pylint: disable=no-member

        hosted_file_dict['extension_allowed'] = file_info['extension_allowed']
        hosted_file_dict['already_exists'] = file_info['already_exists']
        hosted_file_dict['saved'] = file_info['saved']
        hosted_file_dict['copy_timer'] = file_info['copy_timer']
        hosted_file_list.append(hosted_file_dict)

        if hosted_file_id is None:
            # There is no hosted_file record to link to, so do not create a link.
            continue

        # NOTE: Currently sql_insert does not handle all successful inserts correctly.
        # If there is not an autonum ID then it will return 0 as the ID, so a falsy
        # result here does not reliably mean the link was not created.
        link_created = create_hosted_file_link(
            account_id=account_id,
            hosted_file_id=hosted_file_id,
            for_object_type=for_object_type,
            for_object_id=for_object_id,
        )
        if not link_created:
            log.debug('Because the hosted_file_link table does not have a primary autonum this check is incorrect even when successful.')
            log.debug('Something may have gone wrong while trying to create the hosted_file_link record.')
            log.debug('The hosted_file_link was probably created fine though.')

    log.debug(hosted_file_list)
    return mk_resp(data=hosted_file_list)
|
|
# ### END ### API Hosted File Route ### upload_files() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Route ### upload_files_fake() ###
|
|
# This just needs to return the correct model for a hosted_file
|
|
# Everything else seems to be working well
|
|
# Should this also do something with meta data and updating the DB?
|
|
@router.post('/upload_files/fake')
async def upload_files_fake(
        file_info_li: list,
        account_id: str,
        for_object_type: str,
        for_object_id: str,
        check_allowed_extension: bool = False,
        # NOTE: do not add a parameter named create_hosted_file_link here; it would shadow the imported function used below.
        x_account_id: str = Header(..., ),
        return_obj: bool = True,
        by_alias: bool = True,
        exclude_unset: bool = True,
    ):
    """Run the hosted_file bookkeeping of an upload without saving any file.

    ``file_info_li`` takes the place of the ``save_file`` results: each entry
    is read as a mapping with the keys ``saved``, ``already_exists``,
    ``hash_sha256``, ``extension_allowed`` and ``copy_timer``, and is also
    unpacked into ``Hosted_File_Base``. For each entry the matching
    ``hosted_file`` record is looked up by hash and created when missing, and
    a hosted_file_link is created when a hosted_file ID is available.

    Parameters:
        file_info_li: Pre-built file info entries, one per pretend upload.
        account_id: Random string ID of the account.
        for_object_type: Table name of the object the files belong to.
        for_object_id: Random string ID of the object the files belong to.
        x_account_id: Required request header; not read by this function.
        check_allowed_extension, return_obj, by_alias, exclude_unset:
            Accepted but not read by this function.

    Returns:
        ``mk_resp(data=<list of hosted_file dicts>)`` on success, or
        ``mk_resp(data=None, status_code=400)`` when either random ID cannot
        be resolved.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    log.debug(file_info_li)

    # Only the resolved IDs are needed below; the random IDs are not used again after the lookups.
    account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
    if not account_id:
        return mk_resp(data=None, status_code=400)

    # NOTE(review): for_object_type comes straight from the request and is used as a table name.
    # Confirm that redis_lookup_id_random / create_hosted_file_link restrict it to known tables.
    for_object_id = redis_lookup_id_random(record_id_random=for_object_id, table_name=for_object_type)
    if not for_object_id:
        return mk_resp(data=None, status_code=400)

    hosted_file_list = []
    for file_info in file_info_li:
        hosted_file_id = None
        hosted_file_dict = None
        hosted_file_obj = None

        if file_info['saved']:
            log.info('Check and create a new host_file object entry...')

            # Whether or not the file was already on the server, the DB record is found by its hash.
            log.info('Look up in DB based on hash...')
            hosted_file_sel_result = sql_select(
                table_name='hosted_file',
                field_name='hash_sha256',
                field_value=file_info['hash_sha256'],
            )
            if hosted_file_sel_result:
                if not file_info['already_exists']:
                    # The hash is in the DB but the file had to be copied again,
                    # so the file on the host server was probably deleted at some point.
                    log.warning('Found an existing host_file object_entry in the DB but the file was not found on the server!')
                hosted_file_id = hosted_file_sel_result.get('id_random', None)
            else:
                if file_info['already_exists']:
                    # The file is on the server but has no DB record; try to create one.
                    log.warning('For some reason a host_file object entry with the has was not found.')
                else:
                    # Neither on the server nor in the DB: a new record is expected here.
                    log.warning('This is sort of normal. The file may have been deleted from the host server...')
                hosted_file_obj = Hosted_File_Base(**file_info)
                hosted_file_obj_result = create_hosted_file_obj(hosted_file_obj_new=hosted_file_obj)
                if hosted_file_obj_result:
                    hosted_file_id = hosted_file_obj_result
                else:
                    log.warning('For some reason a host_file object entry could not be created.')
                log.debug(hosted_file_obj_result)
            log.debug(hosted_file_sel_result)

            if hosted_file_id is not None:
                hosted_file_dict = load_hosted_file_obj(hosted_file_id=hosted_file_id, model_as_dict=True)
                if not hosted_file_dict:
                    log.warning('The host_file object entry could not be loaded; returning the upload info instead.')
        else:
            file_info['id_random'] = None

        if not hosted_file_dict:
            # No stored record could be loaded: describe the file from the supplied info alone.
            if hosted_file_obj is None:
                hosted_file_obj = Hosted_File_Base(**file_info)
            hosted_file_dict = hosted_file_obj.dict(by_alias=True, exclude_unset=True, exclude={'id', 'id_random'})  # pylint: disable=no-member

        hosted_file_dict['extension_allowed'] = file_info['extension_allowed']
        hosted_file_dict['already_exists'] = file_info['already_exists']
        hosted_file_dict['saved'] = file_info['saved']
        hosted_file_dict['copy_timer'] = file_info['copy_timer']
        log.debug(hosted_file_dict)
        hosted_file_list.append(hosted_file_dict)

        if hosted_file_id is None:
            # There is no hosted_file record to link to, so do not create a link.
            continue

        # NOTE: Currently sql_insert does not handle all successful inserts correctly.
        # If there is not an autonum ID then it will return 0 as the ID, so a falsy
        # result here does not reliably mean the link was not created.
        link_created = create_hosted_file_link(
            account_id=account_id,
            hosted_file_id=hosted_file_id,
            for_object_type=for_object_type,
            for_object_id=for_object_id,
        )
        if not link_created:
            log.debug('Because the hosted_file_link table does not have a primary autonum this check is incorrect even when successful.')
            log.debug('Something may have gone wrong while trying to create the hosted_file_link record.')
            log.debug('The hosted_file_link was probably created fine though.')

    log.debug(hosted_file_list)
    return mk_resp(data=hosted_file_list)
|
|
# ### END ### API Hosted File Route ### upload_files_fake() ###
|
|
|
|
|
|
|
|
|
|
|
|
@router.post('/test_uploads')
async def test_upload_files(
        file_list: List[UploadFile],
        account_id: Optional[str] = Form(None, min_length=1, max_length=22),
        for_object_type: Optional[str] = Form(None),
        for_object_id: Optional[str] = Form(None, min_length=1, max_length=22),
        check_allowed_extension: bool = False,
    ):
    """Debug endpoint: pass each upload through ``save_file`` and log the result.

    The values handed to ``save_file`` were previously undefined names, so any
    request containing a file raised ``NameError``. They are now optional
    request fields, resolved with ``redis_lookup_id_random`` in the same way
    ``upload_files`` resolves them.

    Parameters:
        file_list: The uploaded files.
        account_id: Random string ID of the account.
        for_object_type: Table name of the object the files belong to.
        for_object_id: Random string ID of the object the files belong to.
        check_allowed_extension: Passed through to ``save_file``.

    Returns:
        ``mk_resp(data=None, status_code=400)`` when a field is missing or an
        ID cannot be resolved; otherwise ``mk_resp(data=False, status_code=501)``
        because the endpoint does not return any upload data yet.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # save_file needs all three values, so reject the request when any is missing.
    if not (account_id and for_object_type and for_object_id):
        return mk_resp(data=None, status_code=400)

    # Keep the caller-supplied random IDs; the lookups return the IDs used for the DB calls.
    account_id_random = account_id
    account_id = redis_lookup_id_random(record_id_random=account_id_random, table_name='account')
    if not account_id:
        return mk_resp(data=None, status_code=400)

    for_object_id_random = for_object_id
    for_object_id = redis_lookup_id_random(record_id_random=for_object_id_random, table_name=for_object_type)
    if not for_object_id:
        return mk_resp(data=None, status_code=400)

    for file_obj in file_list:
        file_info = await save_file(
            file=file_obj,
            account_id=account_id,
            account_id_random=account_id_random,
            for_object_type=for_object_type,
            for_object_id=for_object_id,
            for_object_id_random=for_object_id_random,
            check_allowed_extension=check_allowed_extension,
        )
        log.debug(file_info)

    return mk_resp(data=False, status_code=501)