from datetime import datetime, time, timedelta from fastapi import APIRouter, Depends, Header, HTTPException, status from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union from ..lib_general import * from ..log import * from app.config import settings from app.db import * from app.redis import * from .user_models import * #import logging router = APIRouter() @router.post( "/", response_model=UserOut, response_model_exclude_unset=True, summary='Create a new user account', status_code=status.HTTP_201_CREATED ) async def create_user(user: UserIn, x_account_id: str = Header(...)): """ Create a new user account """ user = dict(user) table_name = 'user' # Look up the user['account_id_random'] and match to a record ID from Redis if account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id): user['account_id'] = account_id user.pop('account_id_random') else: print('Something went wrong with the id_random lookup.') raise HTTPException(status_code=500) if result := sql_insert(table_name=table_name, record=user, id_random_length=16): print(type(result)) if type(result) == int: # isinstance(result, int): # Select the new record to return as a response. if new_user := dict(sql_select(table_name=table_name, record_id=result)): return new_user else: print('New user record was not found.') raise HTTPException(status_code=400) else: print('There is likely a duplicate record. A new record was not created.') raise HTTPException(status_code=400) else: print('No user record was not created') raise HTTPException(status_code=400) #@router.patch('/{id_random}', response_model=UserOut, dependencies=[Depends(get_account_header)]) #async def update_user(id_random: str, user: UserIn, x_account_id: str = Header(...)): #async def update_user(id_random: str, user: UserIn): @router.patch( '/{id_random}', response_model=UserOut, summary='Update a user account' ) async def update_user(id_random: str, user: UserIn, x_account_id: str = Depends(get_account_header)): """ Update a user account """ user = {} user['id_random'] = id_random user['account_id_random'] = x_account_id user['username'] = 'Scott.Idem' user['name'] = 'Scott Idem' user['email'] = 'Scott.Idem@oneskyit.com' user['created_on'] = datetime.now() user['super'] = True return user @router.delete('/{id_random}', response_model=bool) async def delete_user(id_random: str, x_account_id: str = Depends(get_account_header)): """ Delete a user account """ return True return False @router.get('/', response_model=List[UserOut]) @router.get('/list_all', response_model=List[UserOut]) async def list_users(): """ Get a list of users """ log.setLevel(logging.DEBUG) log.debug(str(locals().keys())+' | '+str(locals().values())) log.debug(locals()) #log.setLevel(logging.INFO) #log.info(None) log.setLevel(logging.WARNING) print('***') log.debug('This is debug') # 10 DEBUG log.info('This is info') # 20 INFO log.warn('This is warn') # 30 WARNING log.warning('This is a warning') # 30 WARNING log.error('This is an error') # 40 ERROR log.exception('This is an exception') # 40 ERROR log.critical('This is critical') # 50 CRITICAL users = [{'username': 'test.user.1'}, {'username': 'test.user.2'}, {'username': 'Scott.Idem'}] print('Getting all users...') sql = """ SELECT * FROM `user` /*WHERE id=1*/ """ records = sql_select(sql=sql, as_list=True) #records = sql_select(table_name='user') if records: print('Got the user list') return records else: print('No user records found') raise HTTPException(status_code=404) @router.get('/{username}') async def get_user_username(username: str, x_account_id: str = Header(...)): return {'username': username} #@router.get('/me') #async def get_user_current(): #user_out: UserOut #return {'username': 'test.user'}