Creating templates different types of objects or resources.

This commit is contained in:
Scott Idem
2020-09-14 16:37:46 -04:00
parent eeedc2ad6f
commit d3b6f46368
6 changed files with 244 additions and 56 deletions

View File

@@ -2,3 +2,4 @@ uvicorn
fastapi[all]
SQLAlchemy
mysqlclient
redis

View File

@@ -1,3 +1,5 @@
import secrets
from app.config import settings
from sqlalchemy import create_engine, text
@@ -19,16 +21,20 @@ db = engine.connect()
# Insert a new record with values given.
def sql_insert(table_name=None, record=None, sql=None, data=None):
def sql_insert(table_name=None, record=None, sql=None, data=None, id_random_length=None):
print('** sql_insert() ***')
if table_name and record:
#record.pop('id')
if id_random_length in [8, 16]:
record['id_random'] = secrets.token_urlsafe(id_random_length)
fields = []
values = []
for key, value in record.items():
if key != 'id': # A special exception for the id auto increment field.
fields.append('`'+str(key)+'`')
values.append(':'+str(key))
#if key != 'id': # A special exception for the id auto increment field.
fields.append('`'+str(key)+'`')
values.append(':'+str(key))
fields_string = ', '.join(fields)
values_string = ', '.join(values)
@@ -157,15 +163,17 @@ def sql_select(sql=None, data=None, table_name=None, record_id=None, record_id_r
else:
if result.rowcount == 1 and as_list:
print('Single as list')
record = result.fetchall()
return record
record = dict(result.fetchone())
return [record]
elif result.rowcount == 1 and not as_list:
print('Single as single')
record = result.fetchone()
#record = result.fetchone()
record = dict(result.fetchone())
return record
elif result.rowcount > 1:
print('List as list')
records = result.fetchall()
#records = result.fetchall()
records = [dict(u) for u in result.fetchall()]
return records
else:
return False

41
app/redis.py Normal file
View File

@@ -0,0 +1,41 @@
from app.db import *
import redis
from datetime import timedelta
# Attempt to look up id_random key
# If success then return the id number
# If not success and there is a table_name then check the database table passed
# If found in database table then store in Redis
def redis_lookup_id_random(record_id_random=None, table_name=None):
print('*** redis_lookup_id_random() ***')
r = redis.Redis(host='localhost', port=6379, db=7, password=None, decode_responses=True)
key_name = 'record_id:'+record_id_random
record_id = r.get(key_name)
#print('Record ID? '+str(record_id))
if record_id:
print('TTL for: '+key_name+' : '+str(record_id)+' is '+str(r.ttl(key_name))+' seconds')
return record_id
elif table_name:
data = { 'id_random': record_id_random }
sql = """
SELECT id
FROM `"""+table_name+"""` AS `table`
WHERE table.id_random = :id_random
"""
if select_results := sql_select(table_name=table_name, record_id_random=record_id_random): # sql_select(sql=sql, data=data)
#print('Record ID random found: '+str(select_results['id']))
record_id = select_results['id']
r.setex(key_name, timedelta(minutes=2), value=record_id)
return record_id
else:
#print('Record ID random was not found')
return False
else:
print('Missing table_name to select from for id_random')
return False
#return False

113
app/routers/crud.py Normal file
View File

@@ -0,0 +1,113 @@
from datetime import datetime, time, timedelta
from fastapi import APIRouter, Depends, Header, HTTPException, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from ..lib_general import *
from app.config import settings
from app.db import *
from .user_models import *
#import logging
router = APIRouter()
@router.post(
"/",
response_model=UserOut,
summary='Create a new user account',
status_code=status.HTTP_201_CREATED
)
async def create_user(user: UserIn, x_account_id: str = Header(...)):
"""
Create a new user account
"""
user = {}
user['account_id_random'] = x_account_id
user['username'] = 'Scott.Idem'
user['name'] = 'Scott Idem'
user['email'] = 'Scott.Idem@oneskyit.com'
return user
#@router.patch('/{id_random}', response_model=UserOut, dependencies=[Depends(get_account_header)])
#async def update_user(id_random: str, user: UserIn, x_account_id: str = Header(...)):
#async def update_user(id_random: str, user: UserIn):
@router.patch(
'/{id_random}',
response_model=UserOut,
summary='Update a user account'
)
async def update_user(id_random: str, user: UserIn, x_account_id: str = Depends(get_account_header)):
"""
Update a user account
"""
user = {}
user['id_random'] = id_random
user['account_id_random'] = x_account_id
user['username'] = 'Scott.Idem'
user['name'] = 'Scott Idem'
user['email'] = 'Scott.Idem@oneskyit.com'
user['created_on'] = datetime.now()
user['super'] = True
return user
@router.delete('/{id_random}', response_model=bool)
async def delete_user(id_random: str, x_account_id: str = Depends(get_account_header)):
"""
Delete a user account
"""
return True
return False
@router.get('{object}/', response_model=List[UserOut])
@router.get('{object}/list_all', response_model=List[UserOut])
async def list_users():
"""
Get a list of users
"""
print(settings.APP_NAME)
users = [{'username': 'test.user.1'}, {'username': 'test.user.2'}, {'username': 'Scott.Idem'}]
print('Getting all users...')
sql = """
SELECT *
FROM `user`
WHERE id=1
"""
records = sql_select(sql=sql, as_list=True)
#records = sql_select(table_name='user')
if records:
print('Got the user list')
return records
else:
print('No user records found')
raise HTTPException(status_code=404)
@router.get('/{username}')
async def get_user_username(username: str, x_account_id: str = Header(...)):
return {'username': username}
#@router.get('/me')
#async def get_user_current():
#user_out: UserOut
#return {'username': 'test.user'}

View File

@@ -0,0 +1,45 @@
from datetime import datetime, time, timedelta
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
class UserBase(BaseModel):
#id_random: str = None # This should not be None. It is required.
account_id_random: str = None # This should not be None. It is required.
username: str = Field(None, example='New.User', min_length=3, max_length=100)
name: Optional[str] = None
email: EmailStr
email_verified: Optional[bool] = None
enable: Optional[bool] = None
enable_from: Optional[datetime] = None
enable_to: Optional[datetime] = None
super: Optional[bool] = None
manager: Optional[bool] = None
administrator: Optional[bool] = None
verified: Optional[bool] = None
notes: Optional[str] = None
class UserIn(UserBase):
#id_random: str = None
password: str = Field(None, example='My Difficult Password!', min_length=10)
class UserOut(UserBase):
id_random: str = None
password_set_on: Optional[datetime] = None
password_reset_token: Optional[str] = None
password_reset_expire_on: Optional[datetime] = None
logged_in_on: Optional[datetime] = None
last_activity_on: Optional[datetime] = None
created_on: datetime
update_on: Optional[datetime] = None
class UserInDB(UserBase):
hashed_password: str
password_set_on: Optional[datetime] = None
password_reset_token: Optional[str] = None
password_reset_expire_on: Optional[datetime] = None
logged_in_on: Optional[datetime] = None
last_activity_on: Optional[datetime] = None

View File

@@ -5,57 +5,19 @@ from typing import Dict, List, Optional, Set, Union
from ..lib_general import *
from app.config import settings
from app.db import *
from app.redis import *
from .user_models import *
#import logging
router = APIRouter()
class UserBase(BaseModel):
id_random: str = None # This should not be None. It is required.
account_id_random: str = None # This should not be None. It is required.
username: str = Field(None, example='New.User', min_length=3, max_length=100)
name: Optional[str] = None
email: EmailStr
email_verified: Optional[bool] = None
enable: Optional[bool] = None
enable_from: Optional[datetime] = None
enable_to: Optional[datetime] = None
super: Optional[bool] = None
manager: Optional[bool] = None
administrator: Optional[bool] = None
verified: Optional[bool] = None
class UserIn(UserBase):
password: str = Field(None, example='My Difficult Password!', min_length=10)
class UserOut(UserBase):
password_set_on: Optional[datetime] = None
password_reset_token: Optional[str] = None
password_reset_expire_on: Optional[datetime] = None
logged_in_on: Optional[datetime] = None
last_activity_on: Optional[datetime] = None
created_on: datetime
update_on: Optional[datetime] = None
class UserInDB(UserBase):
hashed_password: str
password_set_on: Optional[datetime] = None
password_reset_token: Optional[str] = None
password_reset_expire_on: Optional[datetime] = None
logged_in_on: Optional[datetime] = None
last_activity_on: Optional[datetime] = None
@router.post(
"/",
response_model=UserOut,
response_model_exclude_unset=True,
summary='Create a new user account',
status_code=status.HTTP_201_CREATED
)
@@ -63,14 +25,32 @@ async def create_user(user: UserIn, x_account_id: str = Header(...)):
"""
Create a new user account
"""
user = dict(user)
table_name = 'user'
user = {}
user['account_id_random'] = x_account_id
user['username'] = 'Scott.Idem'
user['name'] = 'Scott Idem'
user['email'] = 'Scott.Idem@oneskyit.com'
# Look up the user['account_id_random'] and match to a record ID from Redis
if account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
user['account_id'] = account_id
user.pop('account_id_random')
else:
print('Something went wrong with the id_random lookup.')
raise HTTPException(status_code=500)
return user
if result := sql_insert(table_name=table_name, record=user, id_random_length=16):
print(type(result))
if type(result) == int: # isinstance(result, int):
# Select the new record to return as a response.
if new_user := dict(sql_select(table_name=table_name, record_id=result)):
return new_user
else:
print('New user record was not found.')
raise HTTPException(status_code=400)
else:
print('There is likely a duplicate record. A new record was not created.')
raise HTTPException(status_code=400)
else:
print('No user record was not created')
raise HTTPException(status_code=400)
#@router.patch('/{id_random}', response_model=UserOut, dependencies=[Depends(get_account_header)])
@@ -125,7 +105,7 @@ async def list_users():
sql = """
SELECT *
FROM `user`
WHERE id=1
/*WHERE id=1*/
"""
records = sql_select(sql=sql, as_list=True)