Working on the basic SQL select API CRUD and lots of models

This commit is contained in:
Scott Idem
2021-03-08 15:53:39 -05:00
parent c7f2b16feb
commit 3d5fafc4bf
36 changed files with 2570 additions and 863 deletions

201
app/db.py
View File

@@ -1,201 +0,0 @@
import secrets
from datetime import timedelta
from app.config import settings
from .log import *
from sqlalchemy import create_engine, text
from sqlalchemy.exc import IntegrityError, OperationalError
#from sqlalchemy.ext.declarative import declarative_base
#from sqlalchemy.orm import sessionmaker, session
db_uri = settings.SQLALCHEMY_DATABASE_URI
connection_string = db_uri
engine = create_engine(name_or_url=connection_string, pool_size=10, pool_recycle=120, pool_pre_ping=True, echo=True, echo_pool=True, isolation_level='READ COMMITTED')
# NOTE: The default isolation_level is 'REPEATABLE READ'. This can sometimes not show updated data.
#SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
#Base = declarative_base()
db = engine.connect()
# Dependency
#def get_db():
#db = SessionLocal()
#try:
#yield db
#finally:
#db.close()
# Insert a new record with values given.
def sql_insert(table_name=None, record=None, sql=None, data=None, id_random_length=None):
print('** sql_insert() ***')
if table_name and record:
#record.pop('id')
if id_random_length in [8, 16]:
record['id_random'] = secrets.token_urlsafe(id_random_length)
fields = []
values = []
for key, value in record.items():
#if key != 'id': # A special exception for the id auto increment field.
fields.append('`'+str(key)+'`')
values.append(':'+str(key))
fields_string = ', '.join(fields)
values_string = ', '.join(values)
sql_insert = text(
"""
INSERT INTO `"""+table_name+"""` ("""+fields_string+""") VALUES ("""+values_string+""");
"""
)
elif table_name:
sql_insert = text(
"""
INSERT INTO `"""+table_name+"""` () VALUES ();
"""
)
elif sql:
sql_insert = text(sql)
else:
print('One or more required fields are missing')
return False
trans = db.begin()
try:
if record:
result_insert = db.execute(sql_insert, record)
else:
result_insert = db.execute(sql_insert)
trans.commit()
except OperationalError as e:
trans.rollback()
print('*** An exception happened: OperationalError ***')
print('* This is likely because a field that does not exist. *')
print(repr(e))
print('***')
print(str(e))
print('^^^ exception ^^^')
return False
except IntegrityError as e:
trans.rollback()
print('*** An exception happened: IntegrityError ***')
print('* This is likely because of a duplicate entry for a primary or unique field. *')
print(repr(e))
print('***')
print(str(e))
print('^^^ exception ^^^')
return True # NOTE: This is returning True even though there was an exception
except Exception as e:
trans.rollback()
print('*** An exception happened: catch all ***')
print(repr(e))
print('***')
print(str(e))
print('^^^ exception ^^^')
return False
else:
record_id = result_insert.lastrowid
if record_id == 0:
#print('******')
#print(dir(result_insert))
#print('******')
#print(vars(result_insert))
#print('******')
return True
else:
return record_id
# NOTE: Select records using custom SQL SELECT statements.
def sql_select(sql=None, data=None, table_name=None, record_id=None, record_id_random=None, field_name=None, field_value=None, as_list=False):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
custom_sql = None
if record_id and table_name:
sql = text(
"""
SELECT *
FROM `"""+table_name+"""`
WHERE `"""+table_name+"""`.id = :record_id
"""
)
elif record_id_random and table_name:
sql = text(
"""
SELECT *
FROM `"""+table_name+"""`
WHERE `"""+table_name+"""`.id_random = :record_id_random
"""
)
elif field_name and field_value and table_name:
sql = text(
"""
SELECT *
FROM `"""+table_name+"""`
WHERE `"""+table_name+"""`."""+field_name+""" = :field_value
"""
)
data = {}
data[field_name] = field_value
elif table_name:
sql = text(
"""
SELECT *
FROM `"""+table_name+"""`
"""
)
elif sql:
log.info('SQL found')
custom_sql = True
sql = text(sql)
else:
log.warn('One or more required fields are missing')
return False
try:
if not custom_sql:
log.info('Executing a simple SQL select with no extra data dict...')
result = db.execute(sql, record_id=record_id, record_id_random=record_id_random, table_name=table_name, field_name=field_name, field_value=field_value)
elif custom_sql and data:
log.info('Executing a custom SQL select and including the data dict...')
result = db.execute(sql, data)
elif custom_sql:
log.info('Executing a custom SQL select with no extra data dict...')
result = db.execute(sql)
except Exception as e:
log.info('An exception happened. Returning False.')
log.error(repr(e))
log.error('***')
log.error(str(e))
log.error('^^^ exception ^^^')
return False
else:
if result.rowcount == 1 and as_list:
log.info('Found one record. Returning as a list.')
record = dict(result.fetchone())
return [record]
elif result.rowcount == 1 and not as_list:
log.info('Found one record. Returning as a dict.')
#record = result.fetchone()
record = dict(result.fetchone())
return record
elif result.rowcount > 1:
log.info('Found more than one record. Returning as a list of dicts.')
#records = result.fetchall()
records = [dict(u) for u in result.fetchall()]
return records
elif as_list:
log.info('No records found. Returning as a list.')
return [None]
else:
log.info('No records found. Returning None.')
return None

View File

@@ -1,5 +1,8 @@
from __future__ import annotations
import secrets
from app.config import settings
from .log import *
#from .lib_general import lookup_id_random_pop
from sqlalchemy import create_engine, text
from sqlalchemy.exc import IntegrityError, OperationalError
@@ -19,10 +22,83 @@ db = engine.connect()
# ### BEGIN ### Core Help CRUD ### sql_insert() ###
def sql_insert(sql=None, data=None, table_name=None, rm_id_random=None, id_random_length=None):
log.setLevel(logging.ERROR) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
def sql_insert(sql:str=None, data:dict=None, table_name:str=None, id_random_length:int=8):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if sql:
sql_insert = text(sql)
elif table_name and data:
#if rm_id_random:
#data = lookup_id_random_pop(obj_data=data)
if not data.get('id_random', None) and id_random_length:
data['id_random'] = secrets.token_urlsafe(id_random_length)
log.debug(data)
fields = []
values = []
for key, value in data.items():
if key != 'id': # A special exception for the id auto increment field.
fields.append('`'+str(key)+'`')
values.append(':'+str(key))
fields_string = ', '.join(fields)
values_string = ', '.join(values)
log.debug(fields_string)
log.debug(values_string)
field_list = []
for key, value in data.items():
if key != 'id': # Creating a special exception for the id field.
field_list.append('`'+str(key) + '` = :' + str(key))
set_values_string = ', '.join(field_list)
sql_insert = text(f"""
INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string});
"""
)
print(sql_insert)
log.debug(f"""
INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string});
"""
)
trans = db.begin()
try:
result_insert = db.execute(sql_insert, data)
trans.commit()
except Exception as e:
trans.rollback()
log.exception('*** An exception happened. ***')
log.exception(repr(e))
log.exception('***')
log.exception(str(e))
log.exception('^^^ exception ^^^')
return False
else:
log.debug(result_insert)
log.debug(result_insert.rowcount)
if result_insert.rowcount == 1 and result_insert.lastrowid > 0: # insert
log.info('Insert record')
log.debug(result_insert.lastrowid)
record_id = result_insert.lastrowid
return record_id
#elif result_insert.rowcount == 1 and result_insert.lastrowid == 0: # update with no change
#log.info('Update record with no change')
#return True
#elif result_insert.rowcount == 2 and result_insert.lastrowid > 0: # update with change
#log.info('Update record with changes')
#record_id = result_insert.lastrowid
#return record_id
else:
log.debug(result_insert)
log.debug(vars(result_insert))
log.debug(dir(result_insert))
log.debug(result_insert.rowcount) # returns 1 on insert and 2 on update with change
log.debug(result_insert.lastrowid) # returns last row ID on insert and update with a change and returns 0 if nothing changed
return False
return False
# ### END ### Core Help CRUD ### sql_insert() ###
@@ -71,14 +147,12 @@ def sql_insert_or_update(sql:str=None, data:dict=None, table_name:str=None, rm_i
field_list.append('`'+str(key) + '` = :' + str(key))
set_values_string = ', '.join(field_list)
sql_insert_or_update = text(
f"""
sql_insert_or_update = text(f"""
INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string})
ON DUPLICATE KEY UPDATE
{set_values_string}
;
"""
)
""")
log.debug(f"""
INSERT INTO `{table_name}` ({fields_string}) VALUES ({values_string})
@@ -497,3 +571,4 @@ def sql_result_proxy_to_dict_simple(result_proxy=None):
record[key] = value
return record
return False

View File

@@ -1,3 +1,4 @@
from __future__ import annotations
import datetime, redis
#from datetime import datetime, time, timedelta
@@ -6,7 +7,7 @@ from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from .log import *
from .db_sql import *
from .db_sql import sql_select
async def get_token_header(x_token: str = Header(...)):
@@ -15,8 +16,9 @@ async def get_token_header(x_token: str = Header(...)):
async def get_account_header(x_account_id: str = Header(...)):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
log.setLevel(logging.WARNING) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
print('get_account_header(): '+x_account_id)
if len(x_account_id):
@@ -117,3 +119,120 @@ def redis_lookup_id_random(record_id_random=None, table_name=None):
log.setLevel(logging.ERROR) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.error('We should not be here. Something unexpected happened.')
return False # Just in case
# Look up and resolve id_random values to their id
# Remove the unneeded *_id_random key from the dict
def lookup_id_random_pop(obj_data=None):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if 'account_id_random' in obj_data:
obj_data['account_id'] = redis_lookup_id_random(record_id_random=obj_data['account_id_random'], table_name='account')
obj_data.pop('account_id_random')
if 'address_id_random' in obj_data:
obj_data['address_id'] = redis_lookup_id_random(record_id_random=obj_data['address_id_random'], table_name='address')
obj_data.pop('address_id_random')
if 'address_location_id_random' in obj_data:
obj_data['address_location_id'] = redis_lookup_id_random(record_id_random=obj_data['address_location_id_random'], table_name='address')
obj_data.pop('address_location_id_random')
if 'archive_id_random' in obj_data:
obj_data['archive_id'] = redis_lookup_id_random(record_id_random=obj_data['archive_id_random'], table_name='archive')
obj_data.pop('archive_id_random')
if 'contact_id_random' in obj_data:
obj_data['contact_id'] = redis_lookup_id_random(record_id_random=obj_data['contact_id_random'], table_name='contact')
obj_data.pop('contact_id_random')
if 'event_id_random' in obj_data:
obj_data['event_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_id_random', None), table_name='event')
obj_data.pop('event_id_random')
if 'event_exhibit_id_random' in obj_data:
obj_data['event_exhibit_id'] = redis_lookup_id_random(record_id_random=obj_data.get('event_exhibit_id_random', None), table_name='event_exhibit')
obj_data.pop('event_exhibit_id_random')
if 'hosted_file_id_random' in obj_data:
obj_data['hosted_file_id'] = redis_lookup_id_random(record_id_random=obj_data.get('hosted_file_id_random', None), table_name='hosted_file')
obj_data.pop('hosted_file_id_random')
if 'order_id_random' in obj_data:
obj_data['order_id'] = redis_lookup_id_random(record_id_random=obj_data.get('order_id_random', None), table_name='order')
obj_data.pop('order_id_random')
if 'order_line_id_random' in obj_data:
obj_data['order_line_id'] = redis_lookup_id_random(record_id_random=obj_data.get('order_line_id_random', None), table_name='order_line')
obj_data.pop('order_line_id_random')
if 'order_cart_id_random' in obj_data:
obj_data['order_cart_id'] = redis_lookup_id_random(record_id_random=obj_data.get('order_cart_id_random', None), table_name='order_cart')
obj_data.pop('order_cart_id_random')
if 'order_cart_line_id_random' in obj_data:
obj_data['order_cart_line_id'] = redis_lookup_id_random(record_id_random=obj_data.get('order_cart_line_id_random', None), table_name='order_cart_line')
obj_data.pop('order_cart_line_id_random')
if 'organization_id_random' in obj_data:
obj_data['organization_id'] = redis_lookup_id_random(record_id_random=obj_data.get('organization_id_random', None), table_name='organization')
obj_data.pop('organization_id_random')
if 'person_id_random' in obj_data:
obj_data['person_id'] = redis_lookup_id_random(record_id_random=obj_data['person_id_random'], table_name='person')
obj_data.pop('person_id_random')
if 'post_id_random' in obj_data:
obj_data['post_id'] = redis_lookup_id_random(record_id_random=obj_data.get('post_id_random', None), table_name='post')
obj_data.pop('post_id_random')
if 'product_id_random' in obj_data:
obj_data['product_id'] = redis_lookup_id_random(record_id_random=obj_data['product_id_random'], table_name='product')
obj_data.pop('product_id_random')
if 'user_id_random' in obj_data:
obj_data['user_id'] = redis_lookup_id_random(record_id_random=obj_data['user_id_random'], table_name='user')
obj_data.pop('user_id_random')
if 'for_type' in obj_data and 'for_id_random' in obj_data:
obj_data['for_id'] = redis_lookup_id_random(record_id_random=obj_data.get('for_id_random', None), table_name=obj_data.get('for_type', None))
obj_data.pop('for_id_random')
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(obj_data)
elif 'for_id_random' in obj_data:
# In case for_id_random was passed without for_type
log.warn('for_id_random was passed without for_type')
obj_data.pop('for_id_random')
if 'object_type' in obj_data and 'object_id_random' in obj_data:
obj_data['object_id'] = redis_lookup_id_random(record_id_random=obj_data.get('object_id_random', None), table_name=obj_data.get('object_type', None))
obj_data.pop('object_id_random')
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(obj_data)
elif 'object_id_random' in obj_data:
# In case object_id_random was passed without object_type
log.warn('object_id_random was passed without object_type')
obj_data.pop('object_id_random')
if 'to_object_type' in obj_data and 'to_object_id_random' in obj_data:
obj_data['to_object_id'] = redis_lookup_id_random(record_id_random=obj_data.get('to_object_id_random', None), table_name=obj_data.get('to_object_type', None))
obj_data.pop('to_object_id_random')
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(obj_data)
elif 'to_object_id_random' in obj_data:
# In case to_object_id_random was passed without to_object_type
log.warn('to_object_id_random was passed without to_object_type')
obj_data.pop('to_object_id_random')
if 'from_object_type' in obj_data and 'from_object_id_random' in obj_data:
obj_data['from_object_id'] = redis_lookup_id_random(record_id_random=obj_data.get('from_object_id_random', None), table_name=obj_data.get('from_object_type', None))
obj_data.pop('from_object_id_random')
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(obj_data)
elif 'from_object_id_random' in obj_data:
# In case from_object_id_random was passed without from_object_type
log.warn('from_object_id_random was passed without from_object_type')
obj_data.pop('from_object_id_random')
return obj_data

View File

@@ -18,15 +18,12 @@ from .lib_general import *
from .log import *
# Import the routers here first:
from .routers import api_crud, items, journals, users, websockets
from .routers import api_crud, address, websockets # , items, journals, users
# TEST TEST TEST
print('**** Calling db_sql.py ... ****')
#from .db_sql import engine, SessionLocal, Base
from .db_sql import db
print('**** Called db_sql.py ****')
# TEST TEST TEST
print('### **** *** ** * The Aether FastAPI API app is starting... * ** *** **** ###')
#log = logging.getLogger('root')
@@ -53,37 +50,23 @@ app.include_router(
prefix='/crud',
tags=['CRUD'],
#dependencies=[Depends(get_token_header)],
#responses={404: {'description': 'Not found'}},
)
app.include_router(
items.router,
prefix='/item',
tags=['Items'],
#dependencies=[Depends(get_token_header)],
#responses={404: {'description': 'Not found'}},
)
app.include_router(
journals.router,
prefix='/journal',
tags=['Journals'],
#dependencies=[Depends(get_token_header)],
#dependencies=[Depends(get_account_header)],
#responses={404: {'description': 'Not found'}},
)
app.include_router(
users.router,
prefix='/user',
tags=['Users'],
address.router,
prefix='/address',
tags=['Address'],
#dependencies=[Depends(get_token_header)],
#dependencies=[Depends(get_account_header)],
#responses={404: {'description': 'Not found'}},
)
app.include_router(
websockets.router,
#prefix='/websocket',
tags=['Websockets'],
#dependencies=[Depends(get_token_header)],
#dependencies=[Depends(get_account_header)],
#responses={404: {'description': 'Not found'}},
)

View File

@@ -0,0 +1,116 @@
from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
class Account_Cfg_Base(BaseModel):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
id_random: Optional[str] = Field(
**base_fields['account_cfg_id_random'],
alias='account_cfg_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='account_cfg_id'
)
account_id_random: Optional[str]
account_id: Optional[int]
show_user_availability: Optional[bool]
show_person_create: Optional[bool]
person_create_label: Optional[str]
show_person_view: Optional[bool]
person_view_label: Optional[str]
show_person_load: Optional[bool]
person_load_label: Optional[str]
show_cart: Optional[bool]
cart_label: Optional[str]
default_no_reply_email: Optional[str]
default_no_reply_name: Optional[str]
default_reply_to_email: Optional[str]
default_reply_to_name: Optional[str]
confirm_email: Optional[str]
confirm_name: Optional[str]
help_event_email: Optional[str]
help_event_name: Optional[str]
help_general_email: Optional[str]
help_general_name: Optional[str]
help_member_email: Optional[str]
help_member_name: Optional[str]
help_tech_email: Optional[str]
help_tech_name: Optional[str]
order_header: Optional[str]
order_thanks: Optional[str]
order_message: Optional[str]
order_footer: Optional[str]
order_fundraising_thanks: Optional[str]
order_fundraising_message: Optional[str]
fundraising_message: Optional[str]
post_rules: Optional[str]
post_comment_rules: Optional[str]
show_post_title: Optional[bool]
show_post_comment_title: Optional[bool]
hide_posts_after: Optional[int] # Should posts be singular?
delete_posts_after: Optional[int] # Should posts be singular?
stripe_api_key: Optional[str]
stripe_publishable_key: Optional[str]
stripe_account_id: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('account_cfg_id_random', always=True)
def account_cfg_id_random_copy(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def account_cfg_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
log.debug(values['id_random'])
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='account_cfg')
return None
@validator('account_id', always=True)
def account_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['account_id_random']:
return redis_lookup_id_random(record_id_random=values['account_id_random'], table_name='account')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
Account_Cfg_Base.update_forward_refs()

View File

@@ -0,0 +1,63 @@
from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
class Account_Base(BaseModel):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
id_random: Optional[str] = Field(
**base_fields['account_id_random'],
alias='account_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='account_id'
)
code: Optional[str]
name: Optional[str]
description: Optional[str]
enable: Optional[bool]
enable_from: Optional[datetime.datetime] = None
enable_to: Optional[datetime.datetime] = None
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('account_id_random', always=True)
def account_id_random_copy(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def account_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
log.debug(values['id_random'])
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='account')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
Account_Base.update_forward_refs()

View File

@@ -6,6 +6,7 @@ from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationEr
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
#from .account_model import Account_Base

View File

@@ -0,0 +1 @@

View File

@@ -23,6 +23,7 @@ base_fields = {}
#base_fields['id_random'] = xxx_id_random_field_schema_default
base_fields['obj_id_random'] = xxx_id_random_field_schema # General or generic object_id_random
base_fields['account_id_random'] = xxx_id_random_field_schema
base_fields['account_cfg_id_random'] = xxx_id_random_field_schema
base_fields['address_id_random'] = xxx_id_random_field_schema
base_fields['archive_id_random'] = xxx_id_random_field_schema
base_fields['contact_id_random'] = xxx_id_random_field_schema

View File

@@ -6,6 +6,7 @@ from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationEr
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
#from .account_model import Account_Base
from .address_model import Address_Base

View File

@@ -2,9 +2,12 @@ from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from .common_field_schema import base_fields
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
class Core_Object_Base(BaseModel):

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
class Hosted_File_Base(BaseModel):
app.logger.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(locals())
id_random: Optional[str] = Field(
**base_fields['hosted_file_id_random'],
alias='hosted_file_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='hosted_file_id'
)
#account_id_random: Optional[str]
#account_id: Optional[int]
hash_sha256: Optional[str]
title: Optional[str]
description: Optional[str]
version: Optional[int]
directory_path: Optional[str]
filename: Optional[str]
extension: Optional[str]
content_type: Optional[str]
mimetype: Optional[str]
size: Optional[int] # In bytes
cloud_storage: Optional[str]
owner_user_id: Optional[int]
group_user_id: Optional[str]
package_name: Optional[str]
#metadata: Optional[str]
#hide: Optional[int]
#priority: Optional[int]
#sort: Optional[int]
#group: Optional[str]
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('hosted_file_id_random', always=True)
def hosted_file_id_random_copy(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def hosted_file_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
app.logger.debug(values['id_random'])
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='hosted_file')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
Hosted_File_Base.update_forward_refs()

View File

@@ -0,0 +1,470 @@
from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
from .address_model import Address_Base
from .contact_model import Contact_Base
from .organization_model import Organization_Base
from .person_model import Person_Base
from .user_model import User_Base
class Membership_Cfg_Base(BaseModel):
app.logger.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(locals())
account_name: Optional[str]
cycle_type: Optional[str]
membership_length: Optional[int] = Field(0, ge=0, lt=150)
prorate: Optional[bool] = False
calendar_year_start_buffer_days: Optional[int] = Field(0, ge=0, lt=150)
calendar_year_start_buffer_on: Optional[datetime.datetime] = None
calendar_year_start_on: Optional[datetime.datetime] = None
calendar_year_end_on: Optional[datetime.datetime] = None
calendar_year_end_buffer_days: Optional[int] = Field(0, ge=0, lt=150)
calendar_year_end_buffer_on: Optional[datetime.datetime] = None
enable_privacy_view: Optional[bool] = False
accept_message: Optional[str]
reject_message: Optional[str]
renew_message: Optional[str]
#cust_membership_profile: Optional[str] # list of dicts outlining custom membership profile fields for client
cust_membership_profile: Optional[Json] = '[]' # list of dicts outlining custom membership profile fields for client
default_no_reply_email: Optional[str]
default_no_reply_name: Optional[str]
confirm_email: Optional[str]
confirm_name: Optional[str]
Membership_Cfg_Base.update_forward_refs()
class Membership_Profile_Base(BaseModel):
app.logger.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(locals())
# if TYPE_CHECKING:
# from .supporting_core_models import Address_Base, Contact_Base, Organization_Base, User_Base
# from .person_model import Person_Base
# from .address_model import Address_Base
# from .contact_model import Contact_Base
# from .organization_model import Organization_Base
# from .person_model import Person_Base
# from .user_model import User_Base
id_random: Optional[str] = Field(
**base_fields['membership_profile_id_random'],
alias='membership_profile_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='membership_profile_id'
)
membership_id_random: Optional[str]
membership_id: Optional[int]
person_id_random: Optional[str]
person_id: Optional[int]
user_id_random: Optional[str]
user_id: Optional[int]
organization_id_random: Optional[str]
organization_id: Optional[int]
contact_id_random: Optional[str]
contact_id: Optional[int]
address_id_random: Optional[str]
address_id: Optional[int]
display_name: Optional[str]
email: Optional[str]
email_allowed: Optional[bool]
email_newsletter: Optional[bool]
mail_newsletter: Optional[bool]
show_online: Optional[bool]
show_printed: Optional[bool]
biography: Optional[str]
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
address: 'Optional[Address_Base]' = Address_Base()
contact: 'Optional[Contact_Base]' = Contact_Base()
organization: 'Optional[Organization_Base]' = Organization_Base()
person: 'Optional[Person_Base]' = Person_Base()
user: 'Optional[User_Base]' = User_Base()
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('membership_profile_id_random', always=True)
def membership_profile_id_random_copy(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def membership_profile_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
app.logger.debug(values['id_random'])
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='membership_profile')
return None
@validator('membership_id', always=True)
def membership_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['membership_id_random']:
return redis_lookup_id_random(record_id_random=values['membership_id_random'], table_name='membership')
return None
@validator('person_id', always=True)
def person_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['person_id_random']:
return redis_lookup_id_random(record_id_random=values['person_id_random'], table_name='person')
return None
@validator('user_id', always=True)
def user_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['user_id_random']:
return redis_lookup_id_random(record_id_random=values['user_id_random'], table_name='user')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
Membership_Profile_Base.update_forward_refs()
class Membership_Base(BaseModel):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
#from .supporting_core_models import User_Base
# from .account_model import Account_Base
# from .person_model import Person_Base
# from .user_model import User_Base
id_random: Optional[str] = Field(
**base_fields['membership_id_random'],
alias='membership_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='membership_id'
)
account_id_random: Optional[str]
account_id: Optional[int] # NOTE: This is not really optional
person_id_random: Optional[str]
person_id: Optional[int]
user_id_random: Optional[str]
user_id: Optional[int]
level_number: Optional[int] = Field(0, ge=0, lt=150)
level_name: Optional[str]
product_id: Optional[int]
type_id: Optional[int]
type_name: Optional[str]
category_id: Optional[int]
category_name: Optional[str]
division_id: Optional[int]
division_name: Optional[str]
status_id: Optional[int]
status_name: Optional[str]
application_start_on: Optional[datetime.datetime] = None
approved_on: Optional[datetime.datetime] = None
first_start_on: Optional[datetime.datetime] = None
start_buffer_on: Optional[datetime.datetime] = None
start_on: Optional[datetime.datetime] = None
end_on: Optional[datetime.datetime] = None
end_buffer_on: Optional[datetime.datetime] = None
flag: Optional[bool]
flag_message: Optional[str]
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
#account: Optional[Account_Base] = Account_Base()
profile: 'Optional[Membership_Profile_Base]' = Membership_Profile_Base()
person: 'Optional[Person_Base]' = Person_Base()
user: 'Optional[User_Base]' = User_Base()
cfg: 'Optional[Membership_Cfg_Base]' = Membership_Cfg_Base()
cust_profile: Optional[dict] = {}
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('order_id_random', always=True)
def order_id_random_copy(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def membership_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='membership')
return None
@validator('account_id', always=True)
def account_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['account_id_random']:
return redis_lookup_id_random(record_id_random=values['account_id_random'], table_name='account')
return None
@validator('person_id', always=True)
def person_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['person_id_random']:
return redis_lookup_id_random(record_id_random=values['person_id_random'], table_name='person')
return None
@validator('user_id', always=True)
def user_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['user_id_random']:
return redis_lookup_id_random(record_id_random=values['user_id_random'], table_name='user')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
# Membership_Base.update_forward_refs()
# ### BEGIN ### API Membership Model ### save_membership_obj() ###
def save_membership_obj(order_obj_new:Membership_Base=None):
app.logger.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(locals())
if not order_obj_new:
return False
app.logger.debug(order_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True))
order_line_obj_li_curr = [] # Initialize to store order_line list
if order_obj_new.id_random:
app.logger.info(f'An order.id {order_obj_new.id} or order.id_random {order_obj_new.id_random} was included. We can update an existing order.')
app.logger.info(f'Get the current order_line list to compare with what was sent...')
data = {}
data['order_id_random'] = order_obj_new.id_random
if order_line_rec_li_curr := sql_select(table_name='v_order_line', data=data, rm_id_random=True, as_list=True):
#app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(order_line_rec_li_curr)
for order_line_rec in order_line_rec_li_curr:
try:
order_line_obj = Order_Line_Base(**order_line_rec)
app.logger.debug(order_line_obj)
except ValidationError as e:
app.logger.error(e.json())
order_line_obj_li_curr.append(order_line_obj)
else:
app.logger.info(f'No order_line records were found')
elif order_obj_new.account_id_random and (order_obj_new.person_id_random or order_obj_new.user_id_random):
app.logger.info(f'An account.id_random {order_obj_new.account_id_random} was passed. And either a person.id_random {order_obj_new.person_id_random} or user.id_random {order_obj_new.user_id_random} was passed. We can create a new order.')
# Because there was not an order ID, assume there are no order lines yet. So no look up.
else:
app.logger.info('Either an order ID is required to update an order or an account ID along with a person ID or user ID is required to create an order.')
return False
#app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(order_line_obj_li_curr)
if repl_order_line_list: # This will remove any order line list items not sent with the new order information.
app.logger.info('Removing any order line list items not sent with the new order information...')
for index, order_line_obj_curr in enumerate(order_line_obj_li_curr):
app.logger.info(f'Current: order line ID={order_line_obj_curr.id_random} and product ID= {order_line_obj_curr.product_id_random}')
matched_product_id = False
for order_line_obj_new in order_obj_new.order_line_list:
app.logger.debug(f'Checking new: product ID={order_line_obj_new.product_id_random}')
if order_line_obj_curr.product_id_random == order_line_obj_new.product_id_random:
matched_product_id = True
app.logger.debug(f'Matched: product ID={order_line_obj_new.product_id_random}')
break
else:
app.logger.debug(f'No match: product ID={order_line_obj_new.product_id_random}')
if not matched_product_id: # Was not found in the new order line list sent
app.logger.info(f'Current order line product ID did not match any of the new list. DELETE order line ID {order_line_obj_curr.id_random} with product ID {order_line_obj_curr.product_id_random}')
if order_line_del_result := sql_delete(table_name='order_line', record_id_random=order_line_obj_curr.id_random):
app.logger.info(f'Deleted record and now pop the current list item {index}...')
order_line_obj_li_curr.pop(index)
else:
app.logger.info(f'Current order line product ID matched. Keeping order line ID {order_line_obj_curr.id_random} with product ID {order_line_obj_curr.product_id_random}')
# NOTE: That this current order line item will be updated below.
app.logger.debug(order_line_obj_li_curr)
#app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.info('Loop through the line list that was sent and compare with what was pulled from the DB')
# Loop through the new line list that was sent and compare with the current line list that was pulled from the DB
# Only insert if a product ID does not match
# Only update if a product ID does match
for order_line_obj_new in order_obj_new.order_line_list:
app.logger.info(f'New: order line ID={order_line_obj_new.id_random} and product ID= {order_line_obj_new.product_id_random}')
matched_product_id = False
for index, order_line_obj_curr in enumerate(order_line_obj_li_curr):
app.logger.debug(f'Checking current: product ID={order_line_obj_curr.product_id_random}')
if order_line_obj_new.product_id_random == order_line_obj_curr.product_id_random:
matched_product_id = True
app.logger.debug(f'Matched: product ID={order_line_obj_curr.product_id_random}')
app.logger.info(f'Updating the current line item with the new line item.')
order_line_obj_new.id_random = order_line_obj_curr.id_random
order_line_obj_new.id = order_line_obj_curr.id
order_line_obj_li_curr[index] = order_line_obj_new
break
else:
app.logger.debug(f'No match: product ID={order_line_obj_curr.product_id_random}')
if not matched_product_id: # Was not found in the current order line list that was pulled from the DB
app.logger.info(f'New order line product ID did not match any of the current list. Append order line ID {order_line_obj_new.id_random} with product ID {order_line_obj_new.product_id_random}')
app.logger.info('Append to current list...')
order_line_obj_li_curr.append(order_line_obj_new) # These will be inserted/updated below
# Save merged current and new list to the new order object
order_obj_new.order_line_list = order_line_obj_li_curr
app.logger.debug(order_obj_new)
# Final loop through to get the new order totals
# Calculate totals
order_total_amount:int = 0
order_total_quantity:int = 0
for order_line_obj_new in order_obj_new.order_line_list:
order_total_amount += order_line_obj_new.quantity * order_line_obj_new.amount
order_total_quantity += order_line_obj_new.quantity
order_obj_new.total_bill = order_total_amount # "amount" is used by order_cart and Stripe
order_obj_new.total_quantity = order_total_quantity
order_obj_new.balance = order_total_amount - order_obj_new.total_paid
app.logger.debug(order_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_line_list', 'cfg', 'created_on', 'updated_on'}))
order_obj_data = order_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_line_list', 'cfg', 'created_on', 'updated_on'})
# SQL INSERT or UPDATE the order record
app.logger.info('SQL INSERT or UPDATE the order record')
if order_obj_resp := sql_insert_or_update(data=order_obj_data, table_name='order', rm_id_random=True, id_random_length=8): pass
else: return False
#app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(order_obj_resp)
if isinstance(order_obj_resp, bool) and order_obj_resp:
if order_id := order_obj_new.id: pass
elif order_id := order_obj_new.id_random: pass
elif isinstance(order_obj_resp, int):
order_id = order_obj_resp
else:
return False
app.logger.debug(f'Order ID={order_id}')
# Loop through the order_line list to SQL INSERT or UPDATE the records
app.logger.info('Loop through the order_line list to SQL INSERT or UPDATE the records')
for order_line_obj_new in order_obj_new.order_line_list:
app.logger.info(f"New order_line: order_line_id_random={order_line_obj_new.id_random}; product_id_random={order_line_obj_new.product_id_random}")
app.logger.debug(order_line_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=False, exclude={'order_line_id_random', 'product_type_id', 'product_type', 'created_on', 'updated_on'}))
order_line_obj_data = order_line_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_line_id_random', 'product_type_id', 'product_type', 'created_on', 'updated_on'})
order_line_obj_data['order_id'] = order_id
if order_line_obj_resp := sql_insert_or_update(sql=None, data=order_line_obj_data, table_name='order_line', rm_id_random=True, id_random_length=8): pass
else: return False
app.logger.debug(order_line_obj_resp)
return order_id
# ### END ### API Membership Model ### save_membership_obj() ###
# ### BEGIN ### API Membership Model ### get_membership_obj() ###
def get_membership_obj(membership_id=None, inc_membership_profile=None, inc_membership_cfg=None, inc_cust_profile=None):
app.logger.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(locals())
if membership_id := redis_lookup_id_random(record_id_random=membership_id, table_name='membership'): pass
else:
return False
if membership_rec := sql_select(table_name='v_membership', record_id=membership_id):
#app.logger.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(membership_rec)
if inc_membership_profile:
if membership_profile_rec := sql_select(table_name='v_membership_profile', field_name='membership_id', field_value=membership_id):
#app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(membership_profile_rec)
membership_rec['profile'] = membership_profile_rec
if inc_membership_cfg:
if membership_cfg_rec := sql_select(table_name='v_membership_cfg', field_name='account_id', field_value=membership_rec.get('account_id', None)):
#app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(membership_cfg_rec)
membership_rec['cfg'] = membership_cfg_rec
if inc_cust_profile:
account_code = membership_rec.get('account_code', None)
table_name = f'c_{account_code}_membership_profile'
if cust_profile_rec := sql_select(table_name=table_name, field_name='membership_id', field_value=membership_id):
#app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(cust_profile_rec)
membership_rec['cust_profile'] = cust_profile_rec
#app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(membership_rec)
else:
return False
try:
membership_obj = Membership_Base(**membership_rec)
app.logger.debug(membership_obj)
except ValidationError as e:
app.logger.error(e.json())
return membership_obj
# ### END ### API Membership Model ### get_membership_obj() ###

View File

@@ -0,0 +1,330 @@
from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
class Order_Cart_Cfg_Base(BaseModel):
app.logger.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(locals())
account_name: Optional[str]
show_cart: Optional[bool]
cart_label: Optional[str]
class Order_Cart_Line_Base(BaseModel):
app.logger.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(locals())
id_random: Optional[str] = Field(
**base_fields['order_cart_line_id_random'],
alias='order_cart_line_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='order_cart_line_id'
)
#order_cart_line_id_random: Optional[str]
order_cart_id_random: Optional[str]
order_cart_id: Optional[int]
product_id_random: str
product_id: Optional[int]
product_type_id: Optional[int]
product_type: Optional[str]
product_name: Optional[str]
product_description: Optional[str]
product_unit_price: Optional[int] = Field(0, ge=0, lt=1500000)
product_max_quantity: Optional[int] = Field(0, ge=0, lt=150)
quantity: int = Field(0, ge=0, lt=150)
amount: int = Field(0, ge=0, lt=1500000)
recurring: Optional[bool] = False
message: Optional[str]
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('order_cart_line_id_random', always=True)
def order_cart_line_id_random_copy(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def order_cart_line_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
app.logger.debug(values['id_random'])
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='order_cart_line')
return None
@validator('order_cart_id', always=True)
def order_cart_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['order_cart_id_random']:
return redis_lookup_id_random(record_id_random=values['order_cart_id_random'], table_name='order_cart')
return None
@validator('product_id', always=True)
def product_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['product_id_random']:
return redis_lookup_id_random(record_id_random=values['product_id_random'], table_name='product')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
class Order_Cart_Base(BaseModel):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
id_random: Optional[str] = Field(
**base_fields['order_cart_id_random'],
alias='order_cart_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='order_cart_id'
)
#order_cart_id_random: Optional[str]
#order_cart_id: Optional[int]
account_id_random: Optional[str]
account_id: Optional[int] # NOTE: This is not really optional
person_id_random: Optional[str]
person_id: Optional[int]
user_id_random: Optional[str]
user_id: Optional[int]
order_id_random: Optional[str]
order_id: Optional[int]
total_quantity: Optional[int] = Field(0, ge=0, lt=150)
total_amount: Optional[int] = Field(0, ge=0, lt=1500000)
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
order_cart_line_list: List[Order_Cart_Line_Base] = []
cfg: Optional[Order_Cart_Cfg_Base] = Order_Cart_Cfg_Base()
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('order_cart_id_random', always=True)
def order_cart_id_random_copy(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def order_cart_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='order_cart')
return None
@validator('account_id', always=True)
def account_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['account_id_random']:
return redis_lookup_id_random(record_id_random=values['account_id_random'], table_name='account')
return None
@validator('person_id', always=True)
def person_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['person_id_random']:
return redis_lookup_id_random(record_id_random=values['person_id_random'], table_name='person')
return None
@validator('user_id', always=True)
def user_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['user_id_random']:
return redis_lookup_id_random(record_id_random=values['user_id_random'], table_name='user')
return None
@validator('order_id', always=True)
def order_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if 'order_id_random' in values and values['order_id_random']:
return redis_lookup_id_random(record_id_random=values['order_id_random'], table_name='order')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
# ### BEGIN ### API Order Cart Model ### save_order_cart_obj() ###
def save_order_cart_obj(order_cart_obj_new=None):
app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(locals())
if not order_cart_obj_new:
return False
#app.logger.debug(order_cart_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True))
# Get the current order_cart_line list to compare with what was sent
data = {}
data['order_cart_id_random'] = order_cart_obj_new.id_random
if order_cart_line_rec_li_curr := sql_select(table_name='v_order_cart_line', data=data, rm_id_random=True, as_list=True):
#app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
#app.logger.debug(order_cart_line_rec_li_curr)
order_cart_line_obj_li_curr = []
for order_cart_line_rec in order_cart_line_rec_li_curr:
try:
order_cart_line_obj = Order_Cart_Line_Base(**order_cart_line_rec)
app.logger.debug(order_cart_line_obj)
except ValidationError as e:
app.logger.error(e.json())
order_cart_line_obj_li_curr.append(order_cart_line_obj)
#app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(order_cart_line_obj_li_curr)
else:
#app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(order_cart_line_rec_li_curr)
order_cart_line_obj_li_curr = []
# Loop through the line list that was sent and compare with what was pulled from the DB
# Only insert if a product ID does not match
# Only update if a product ID does match
for order_cart_line_obj_new in order_cart_obj_new.order_cart_line_list:
app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
if not any(order_cart_line_obj_curr.product_id_random == order_cart_line_obj_new.product_id_random for order_cart_line_obj_curr in order_cart_line_obj_li_curr):
# Need to append to current list
app.logger.info('Need to append to current list')
order_cart_line_obj_li_curr.append(order_cart_line_obj_new)
else:
# Need to update a current list item ... loop through to find
app.logger.info('Need to update a current list item ... loop through to find')
for index, order_cart_line_obj_curr in enumerate(order_cart_line_obj_li_curr):
app.logger.info(index)
if order_cart_line_obj_new.product_id_random == order_cart_line_obj_curr.product_id_random:
app.logger.info(f'Match: {order_cart_line_obj_curr.product_id_random}')
order_cart_line_obj_new.id_random = order_cart_line_obj_curr.id_random
order_cart_line_obj_li_curr[index] = order_cart_line_obj_new
else:
app.logger.info(f'Not a match: {order_cart_line_obj_curr.product_id_random}')
# Save merged current and new list to the new order cart object
order_cart_obj_new.order_cart_line_list = order_cart_line_obj_li_curr
app.logger.debug(order_cart_obj_new)
# Final loop through to get the new order totals
# Calculate totals
order_cart_total_amount = 0
order_cart_total_quantity = 0
for order_cart_line_obj_curr in order_cart_line_obj_li_curr:
order_cart_total_amount += order_cart_line_obj_curr.quantity * order_cart_line_obj_curr.amount
order_cart_total_quantity += order_cart_line_obj_curr.quantity
order_cart_obj_new.total_amount = order_cart_total_amount
order_cart_obj_new.total_quantity = order_cart_total_quantity
app.logger.debug(order_cart_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_cart_id_random', 'order_cart_line_list', 'cfg', 'created_on', 'updated_on'}))
order_cart_obj_data = order_cart_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_cart_id_random', 'order_cart_line_list', 'cfg', 'created_on', 'updated_on'})
# SQL INSERT or UPDATE the order_cart record
app.logger.info('SQL INSERT or UPDATE the order_cart record')
if order_cart_obj_resp := sql_insert_or_update(sql=None, data=order_cart_obj_data, table_name='order_cart', rm_id_random=True, id_random_length=8): pass
else: return False
app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(order_cart_obj_resp)
if isinstance(order_cart_obj_resp, bool) and order_cart_obj_resp:
if order_cart_id := order_cart_obj_new.id: pass
elif order_cart_id := order_cart_obj_new.id_random: pass
elif isinstance(order_cart_obj_resp, int):
order_cart_id = order_cart_obj_resp
else:
return False
# Loop through the order_cart_line list to SQL INSERT or UPDATE the records
app.logger.info('Loop through the order_cart_line list to SQL INSERT or UPDATE the records')
for order_cart_line_obj in order_cart_obj_new.order_cart_line_list:
app.logger.debug(f"--- {order_cart_line_obj}")
app.logger.debug(order_cart_line_obj.dict(by_alias=False, exclude_defaults=False, exclude_unset=False, exclude={}))
order_cart_line_obj_data = order_cart_line_obj.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'product_type_id', 'product_type', 'product_name', 'product_description', 'product_unit_price', 'product_max_quantity', 'order_cart_line_id_random', 'created_on', 'updated_on'})
order_cart_line_obj_data['order_cart_id'] = order_cart_id
if order_cart_line_obj_resp := sql_insert_or_update(sql=None, data=order_cart_line_obj_data, table_name='order_cart_line', rm_id_random=True, id_random_length=8): pass
else: return False
app.logger.debug(order_cart_line_obj_resp)
return order_cart_id
# ### END ### API Order Cart Model ### save_order_cart_obj() ###
# ### BEGIN ### API Order Cart Model ### get_order_cart_obj() ###
def get_order_cart_obj(order_cart_id=None, inc_order_cart_line_li=None, inc_order_cart_cfg=None):
app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(locals())
if order_cart_id := redis_lookup_id_random(record_id_random=order_cart_id, table_name='order_cart'): pass
else:
return False
if order_cart_rec := sql_select(table_name='v_order_cart', record_id=order_cart_id):
#app.logger.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(order_cart_rec)
if inc_order_cart_line_li:
order_cart_line_data = {}
order_cart_line_data['order_cart_id'] = order_cart_id
if order_cart_line_rec_li := sql_select(table_name='v_order_cart_line', data=order_cart_line_data, as_list=True):
order_cart_rec['order_cart_line_list'] = order_cart_line_rec_li
if inc_order_cart_cfg:
if order_cart_cfg_rec := sql_select(table_name='v_account_cfg_detail', field_name='account_id', field_value=order_cart_rec.get('account_id', None)):
order_cart_rec['cfg'] = order_cart_cfg_rec
app.logger.debug(order_cart_rec)
else:
return False
try:
order_cart_obj = Order_Cart_Base(**order_cart_rec)
app.logger.debug(order_cart_obj)
except ValidationError as e:
app.logger.error(e.json())
return order_cart_obj
# ### END ### API Order Cart Model ### get_order_cart_obj() ###

209
app/models/order_methods.py Normal file
View File

@@ -0,0 +1,209 @@
from __future__ import annotations
import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from .log import *
from .db_sql import sql_select
from .order_model import Order_Base
from .person_model import Person_Base
from .user_model import User_Base
#from myapp.lib_general import *
#from myapp.help_crud import *
# ### BEGIN ### API Order Model ### save_order_obj() ###
def save_order_obj(order_obj_new:Order_Base=None, repl_order_line_list:bool=False):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if not order_obj_new:
return False
log.debug(order_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True))
order_line_obj_li_curr = [] # Initialize to store order_line list
if order_obj_new.id_random:
log.info(f'An order.id {order_obj_new.id} or order.id_random {order_obj_new.id_random} was included. We can update an existing order.')
log.info(f'Get the current order_line list to compare with what was sent...')
data = {}
data['order_id_random'] = order_obj_new.id_random
if order_line_rec_li_curr := sql_select(table_name='v_order_line', data=data, rm_id_random=True, as_list=True):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(order_line_rec_li_curr)
for order_line_rec in order_line_rec_li_curr:
try:
order_line_obj = Order_Line_Base(**order_line_rec)
log.debug(order_line_obj)
except ValidationError as e:
log.error(e.json())
order_line_obj_li_curr.append(order_line_obj)
else:
log.info(f'No order_line records were found')
elif order_obj_new.account_id_random and (order_obj_new.person_id_random or order_obj_new.user_id_random):
log.info(f'An account.id_random {order_obj_new.account_id_random} was passed. And either a person.id_random {order_obj_new.person_id_random} or user.id_random {order_obj_new.user_id_random} was passed. We can create a new order.')
# Because there was not an order ID, assume there are no order lines yet. So no look up.
else:
log.info('Either an order ID is required to update an order or an account ID along with a person ID or user ID is required to create an order.')
return False
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(order_line_obj_li_curr)
if repl_order_line_list: # This will remove any order line list items not sent with the new order information.
log.info('Removing any order line list items not sent with the new order information...')
for index, order_line_obj_curr in enumerate(order_line_obj_li_curr):
log.info(f'Current: order line ID={order_line_obj_curr.id_random} and product ID= {order_line_obj_curr.product_id_random}')
matched_product_id = False
for order_line_obj_new in order_obj_new.order_line_list:
log.debug(f'Checking new: product ID={order_line_obj_new.product_id_random}')
if order_line_obj_curr.product_id_random == order_line_obj_new.product_id_random:
matched_product_id = True
log.debug(f'Matched: product ID={order_line_obj_new.product_id_random}')
break
else:
log.debug(f'No match: product ID={order_line_obj_new.product_id_random}')
if not matched_product_id: # Was not found in the new order line list sent
log.info(f'Current order line product ID did not match any of the new list. DELETE order line ID {order_line_obj_curr.id_random} with product ID {order_line_obj_curr.product_id_random}')
if order_line_del_result := sql_delete(table_name='order_line', record_id_random=order_line_obj_curr.id_random):
log.info(f'Deleted record and now pop the current list item {index}...')
order_line_obj_li_curr.pop(index)
else:
log.info(f'Current order line product ID matched. Keeping order line ID {order_line_obj_curr.id_random} with product ID {order_line_obj_curr.product_id_random}')
# NOTE: That this current order line item will be updated below.
log.debug(order_line_obj_li_curr)
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.info('Loop through the line list that was sent and compare with what was pulled from the DB')
# Loop through the new line list that was sent and compare with the current line list that was pulled from the DB
# Only insert if a product ID does not match
# Only update if a product ID does match
for order_line_obj_new in order_obj_new.order_line_list:
log.info(f'New: order line ID={order_line_obj_new.id_random} and product ID= {order_line_obj_new.product_id_random}')
matched_product_id = False
for index, order_line_obj_curr in enumerate(order_line_obj_li_curr):
log.debug(f'Checking current: product ID={order_line_obj_curr.product_id_random}')
if order_line_obj_new.product_id_random == order_line_obj_curr.product_id_random:
matched_product_id = True
log.debug(f'Matched: product ID={order_line_obj_curr.product_id_random}')
log.info(f'Updating the current line item with the new line item.')
order_line_obj_new.id_random = order_line_obj_curr.id_random
order_line_obj_new.id = order_line_obj_curr.id
order_line_obj_li_curr[index] = order_line_obj_new
break
else:
log.debug(f'No match: product ID={order_line_obj_curr.product_id_random}')
if not matched_product_id: # Was not found in the current order line list that was pulled from the DB
log.info(f'New order line product ID did not match any of the current list. Append order line ID {order_line_obj_new.id_random} with product ID {order_line_obj_new.product_id_random}')
log.info('Append to current list...')
order_line_obj_li_curr.append(order_line_obj_new) # These will be inserted/updated below
# Save merged current and new list to the new order object
order_obj_new.order_line_list = order_line_obj_li_curr
log.debug(order_obj_new)
# Final loop through to get the new order totals
# Calculate totals
order_total_amount:int = 0
order_total_quantity:int = 0
for order_line_obj_new in order_obj_new.order_line_list:
order_total_amount += order_line_obj_new.quantity * order_line_obj_new.amount
order_total_quantity += order_line_obj_new.quantity
order_obj_new.total_bill = order_total_amount # "amount" is used by order_cart and Stripe
order_obj_new.total_quantity = order_total_quantity
order_obj_new.balance = order_total_amount - order_obj_new.total_paid
log.debug(order_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_line_list', 'cfg', 'created_on', 'updated_on'}))
order_obj_data = order_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_line_list', 'cfg', 'created_on', 'updated_on'})
# SQL INSERT or UPDATE the order record
log.info('SQL INSERT or UPDATE the order record')
if order_obj_resp := sql_insert_or_update(data=order_obj_data, table_name='order', rm_id_random=True, id_random_length=8): pass
else: return False
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(order_obj_resp)
if isinstance(order_obj_resp, bool) and order_obj_resp:
if order_id := order_obj_new.id: pass
elif order_id := order_obj_new.id_random: pass
elif isinstance(order_obj_resp, int):
order_id = order_obj_resp
else:
return False
log.debug(f'Order ID={order_id}')
# Loop through the order_line list to SQL INSERT or UPDATE the records
log.info('Loop through the order_line list to SQL INSERT or UPDATE the records')
for order_line_obj_new in order_obj_new.order_line_list:
log.info(f"New order_line: order_line_id_random={order_line_obj_new.id_random}; product_id_random={order_line_obj_new.product_id_random}")
log.debug(order_line_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=False, exclude={'order_line_id_random', 'product_type_id', 'product_type', 'created_on', 'updated_on'}))
order_line_obj_data = order_line_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_line_id_random', 'product_type_id', 'product_type', 'created_on', 'updated_on'})
order_line_obj_data['order_id'] = order_id
if order_line_obj_resp := sql_insert_or_update(sql=None, data=order_line_obj_data, table_name='order_line', rm_id_random=True, id_random_length=8): pass
else: return False
log.debug(order_line_obj_resp)
return order_id
# ### END ### API Order Model ### save_order_obj() ###
# ### BEGIN ### API Order Model ### get_order_obj() ###
def get_order_obj(order_id=None, inc_order_line_li=None, inc_order_cfg=None, inc_person_obj=None, inc_user_obj=None):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if order_id := redis_lookup_id_random(record_id_random=order_id, table_name='order'): pass
else:
return False
if order_rec := sql_select(table_name='v_order', record_id=order_id):
#log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(order_rec)
if inc_order_line_li:
order_line_data = {}
order_line_data['order_id'] = order_id
if order_line_rec_li := sql_select(table_name='v_order_line', data=order_line_data, as_list=True):#, field_name='order_id', field_value=order_id):
log.debug(order_line_rec_li)
order_rec['order_line_list'] = order_line_rec_li
if inc_order_cfg:
if order_cfg_rec := sql_select(table_name='v_account_cfg_detail', field_name='account_id', field_value=order_rec.get('account_id', None)):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(order_cfg_rec)
order_rec['cfg'] = order_cfg_rec
if inc_person_obj:
if person_rec := sql_select(table_name='v_person', field_name='person_id', field_value=order_rec.get('person_id', None)):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(person_rec)
order_rec['person'] = person_rec
log.debug(order_rec)
else:
return False
try:
order_obj = Order_Base(**order_rec)
log.debug(order_obj)
except ValidationError as e:
log.error(e.json())
return order_obj
# ### END ### API Order Model ### get_order_obj() ###

197
app/models/order_model.py Normal file
View File

@@ -0,0 +1,197 @@
from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
#from .supporting_core_models import *
from .person_model import Person_Base
from .user_model import User_Base
class Order_Cfg_Base(BaseModel):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
account_name: Optional[str]
default_no_reply_email: Optional[str]
default_no_reply_name: Optional[str]
confirm_email: Optional[str]
confirm_name: Optional[str]
order_header: Optional[str]
order_thanks: Optional[str]
order_message: Optional[str]
order_footer: Optional[str]
order_fundraising_thanks: Optional[str]
order_fundraising_message: Optional[str]
class Order_Line_Base(BaseModel):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
id_random: Optional[str] = Field(
**base_fields['order_line_id_random'],
alias='order_line_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='order_line_id'
)
#order_line_id_random: Optional[str]
order_id_random: Optional[str]
order_id: Optional[int]
product_id_random: str
product_id: Optional[int]
product_type_id: Optional[int]
product_type: Optional[str]
name: Optional[str]
description: Optional[str]
quantity: int = Field(0, ge=0, lt=150)
amount: int = Field(0, ge=0, lt=1500000)
recurring: Optional[bool] = False
message: Optional[str]
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('order_line_id_random', always=True)
def order_line_id_random_copy(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def order_line_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
log.debug(values['id_random'])
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='order_line')
return None
@validator('order_id', always=True)
def order_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['order_id_random']:
return redis_lookup_id_random(record_id_random=values['order_id_random'], table_name='order')
return None
@validator('product_id', always=True)
def product_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['product_id_random']:
return redis_lookup_id_random(record_id_random=values['product_id_random'], table_name='product')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
class Order_Base(BaseModel):
log.setLevel(logging.WARNING)
log.debug(locals())
# from .person_model import Person_Base
# from .user_model import User_Base
id_random: Optional[str] = Field(
**base_fields['order_id_random'],
alias='order_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='order_id'
)
#order_id_random: Optional[str]
#order_id: Optional[int]
account_id_random: Optional[str]
account_id: Optional[int] # NOTE: This is not really optional
person_id_random: Optional[str]
person_id: Optional[int]
user_id_random: Optional[str]
user_id: Optional[int]
total_quantity: Optional[int] = Field(0, ge=0, lt=150)
total_bill: Optional[int] = Field(0, ge=0, lt=1500000) # NOTE: This is total_amount in the order_cart
total_paid: Optional[int] = Field(0, ge=0, lt=1500000)
balance: Optional[int] = Field(0, ge=-1500000, lt=1500000) # Balance needs to be calculated
status: Optional[str]
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
order_line_list: List[Order_Line_Base] = []
cfg: Optional[Order_Cfg_Base] = Order_Cfg_Base()
person: Optional[Person_Base] = Person_Base()
user: Optional[User_Base] = User_Base()
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('order_id_random', always=True)
def order_id_random_copy(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def order_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='order')
return None
@validator('account_id', always=True)
def account_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['account_id_random']:
return redis_lookup_id_random(record_id_random=values['account_id_random'], table_name='account')
return None
@validator('person_id', always=True)
def person_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['person_id_random']:
return redis_lookup_id_random(record_id_random=values['person_id_random'], table_name='person')
return None
@validator('user_id', always=True)
def user_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['user_id_random']:
return redis_lookup_id_random(record_id_random=values['user_id_random'], table_name='user')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields

View File

@@ -0,0 +1,135 @@
from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
#from .account_model import Account_Base
from .contact_model import Contact_Base
#from .person_model import Person_Base
#from .user_model import User_Base
class Organization_Base(BaseModel):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
#from .account_model import Account_Base
#from .contact_model import Contact_Base
#from .person_model import Person_Base
#from .user_model import User_Base
id_random: Optional[str] = Field(
**base_fields['organization_id_random'],
alias='organization_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='organization_id'
)
account_id_random: Optional[str]
account_id: Optional[int]
contact_id_random: Optional[str]
contact_id: Optional[int]
person_id_random: Optional[str]
person_id: Optional[int]
user_id_random: Optional[str]
user_id: Optional[int]
name: Optional[str]
tagline: Optional[str]
description: Optional[str]
company: Optional[bool]
nonprofit: Optional[bool]
industry: Optional[int]
start_date: Optional[datetime.datetime] = None
end_date: Optional[datetime.datetime] = None
path_logo: Optional[str]
logo_bg_color: Optional[str]
path_thumbnail: Optional[str]
thumbnail_bg_color: Optional[str]
priority: Optional[int]
sort: Optional[int]
group: Optional[str]
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
#account: Optional[Account_Base] = Account_Base()
contact: Optional[Contact_Base] = Contact_Base()
#person: Optional[Person_Base] = Person_Base()
#user: Optional[User_Base] = User_Base()
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('organization_id_random', always=True)
def organization_id_random_copy(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def organization_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
log.debug(values['id_random'])
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='organization')
return None
@validator('account_id', always=True)
def account_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['account_id_random']:
return redis_lookup_id_random(record_id_random=values['account_id_random'], table_name='account')
return None
@validator('contact_id', always=True)
def contact_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['contact_id_random']:
return redis_lookup_id_random(record_id_random=values['contact_id_random'], table_name='contact')
return None
@validator('person_id', always=True)
def person_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['person_id_random']:
return redis_lookup_id_random(record_id_random=values['person_id_random'], table_name='person')
return None
@validator('user_id', always=True)
def user_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['user_id_random']:
return redis_lookup_id_random(record_id_random=values['user_id_random'], table_name='user')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
#if TYPE_CHECKING:
#from .supporting_core_models import Address_Base, Contact_Base, Person_Base, User_Base
Organization_Base.update_forward_refs()

71
app/models/page_model.py Normal file
View File

@@ -0,0 +1,71 @@
from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
class Page_Base(BaseModel):
app.logger.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(locals())
id_random: Optional[str] = Field(
**base_fields['page_id_random'],
alias='page_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='page_id'
)
account_id_random: Optional[str]
account_id: Optional[int]
alias: Optional[str]
name: Optional[str]
enable: Optional[bool]
enable_from: Optional[datetime.datetime] = None
enable_to: Optional[datetime.datetime] = None
title: Optional[str]
body: Optional[str]
style_href: Optional[str]
script_src: Optional[str]
authentication_required: Optional[bool]
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('page_id_random', always=True)
def page_id_random_copy(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def page_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
app.logger.debug(values['id_random'])
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='page')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
Page_Base.update_forward_refs()

122
app/models/person_model.py Normal file
View File

@@ -0,0 +1,122 @@
from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
# from .account_model import Account_Base
from .contact_model import Contact_Base
from .organization_model import Organization_Base
# from .user_model import User_Base
class Person_Base(BaseModel):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
#from .account_model import Account_Base
#from .contact_model import Contact_Base
#from .organization_model import Organization_Base
#from .user_model import User_Base
#from .organization_model import Organization_Base
#if TYPE_CHECKING:
#from .supporting_core_models import Address_Base, Contact_Base, Organization_Base, Person_Base, User_Base
#from .supporting_core_models import Address_Base, Contact_Base, Organization_Base, User_Base
id_random: Optional[str] = Field(
**base_fields['person_id_random'],
alias='person_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='person_id'
)
user_id_random: Optional[str]
user_id: Optional[int]
organization_id_random: Optional[str]
organization_id: Optional[int]
contact_id_random: Optional[str]
contact_id: Optional[int]
given_name: Optional[str]
family_name: Optional[str]
middle_name: Optional[str]
prefix: Optional[str]
suffix: Optional[str]
full_name: Optional[str]
informal_name: Optional[str]
title: Optional[str]
organization_name: Optional[str]
tagline: Optional[str]
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
contact: Optional[Contact_Base] = Contact_Base()
organization: Optional[Organization_Base] = Organization_Base()
#user: Optional[User_Base] = User_Base()
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('person_id_random', always=True)
def person_id_random_copy(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def person_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
log.debug(values['id_random'])
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='person')
return None
@validator('user_id', always=True)
def user_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['user_id_random']:
return redis_lookup_id_random(record_id_random=values['user_id_random'], table_name='user')
return None
@validator('organization_id', always=True)
def organization_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['organization_id_random']:
return redis_lookup_id_random(record_id_random=values['organization_id_random'], table_name='organization')
return None
@validator('contact_id', always=True)
def contact_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['contact_id_random']:
return redis_lookup_id_random(record_id_random=values['contact_id_random'], table_name='contact')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
#from .supporting_core_models import Address_Base, Contact_Base, Organization_Base, User_Base
Person_Base.update_forward_refs()

View File

@@ -0,0 +1,92 @@
from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
class Product_Base(BaseModel):
app.logger.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(locals())
id_random: Optional[str] = Field(
**base_fields['product_id_random'],
alias='product_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='product_id'
)
account_id_random: Optional[str]
account_id: Optional[int]
type_id: Optional[int]
type_name: Optional[str] = Field(
alias='type'
)
name: Optional[str]
description: Optional[str]
image_url: Optional[str]
image_small_url: Optional[str]
unit_price: int = Field(0, ge=0, lt=1500000)
tax_rate: Optional[int]
lu_vat_id: Optional[int]
vat_rate: Optional[int]
max_quantity: Optional[int] = Field(0, ge=0, lt=150)
recurring: Optional[bool] = False
recurring_period: Optional[int]
recurring_lu_unit_id: Optional[int]
recurring_unit: Optional[str]
enable: Optional[bool]
enable_from: Optional[datetime.datetime] = None
enable_to: Optional[datetime.datetime] = None
lu_account_code_id: Optional[int]
lu_account_code_deferred_id: Optional[int]
metadata: Optional[str]
hide: Optional[int]
priority: Optional[int]
sort: Optional[int]
group: Optional[str]
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('product_id_random', always=True)
def product_id_random_copy(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def product_id_lookup(cls, v, values, **kwargs):
app.logger.setLevel(logging.WARNING)
app.logger.debug(locals())
if values['id_random']:
app.logger.debug(values['id_random'])
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='product')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
Product_Base.update_forward_refs()

View File

@@ -2,13 +2,15 @@ from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields
from app.config import settings
from .common_field_schema import base_fields, default_num_bytes
# The pydantic BaseModel to help make consistent REST responses - STI 2021-03-05
class Resp_Body_Base(BaseModel):

View File

@@ -0,0 +1,74 @@
from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
class Site_Domain_Base(BaseModel):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
id_random: Optional[str] = Field(
**base_fields['site_domain_id_random'],
alias='site_domain_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='site_domain_id'
)
site_id_random: Optional[str]
site_id: Optional[int]
fqdn: Optional[str]
# restrict_access: Optional[bool]
access_key: Optional[str]
required_referrer: Optional[bool]
valid_for: Optional[int] # number of hours
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('site_domain_id_random', always=True)
def site_domain_id_random_copy(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def site_domain_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
log.debug(values['id_random'])
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='site_domain')
return None
@validator('site_id', always=True)
def site_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['site_id_random']:
return redis_lookup_id_random(record_id_random=values['site_id_random'], table_name='site')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
Site_Domain_Base.update_forward_refs()

115
app/models/site_model.py Normal file
View File

@@ -0,0 +1,115 @@
from __future__ import annotations
import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
class Site_Base(BaseModel):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
id_random: Optional[str] = Field(
**base_fields['site_id_random'],
alias='site_id_random',
default_factory=lambda:secrets.token_urlsafe(default_num_bytes),
)
id: Optional[int] = Field(
#alias='site_id'
)
account_id_random: Optional[str]
account_id: Optional[int]
code: Optional[str]
# fqdn_1: Optional[str]
# fqdn_2: Optional[str]
# fqdn_3: Optional[str]
name: Optional[str]
description: Optional[str]
restrict_access: Optional[bool]
access_key: Optional[str]
enable: Optional[bool]
enable_from: Optional[datetime.datetime] = None
enable_to: Optional[datetime.datetime] = None
path_logo: Optional[str]
logo_bg_color: Optional[str]
path_banner_image: Optional[str]
banner_bg_color: Optional[str]
path_background_image: Optional[str]
background_bg_color: Optional[str]
path_html_menu: Optional[str]
title: Optional[str]
html_menu: Optional[str]
html_header: Optional[str]
html_header_h1: Optional[str]
html_header_h2: Optional[str]
html_banner: Optional[str]
html_root_body: Optional[str]
html_tagline: Optional[str]
logo_filename: Optional[str]
banner_image_filename: Optional[str]
banner_html: Optional[str]
site_title: Optional[str]
site_menu_html_path: Optional[str]
site_header_h1: Optional[str]
site_header_h2: Optional[str]
site_body: Optional[str]
site_tagline: Optional[str]
style_href: Optional[str]
script_src: Optional[str]
google_tracking_id: Optional[str]
notes: Optional[str]
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
#@validator('site_id_random', always=True)
def site_id_random_copy(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
return values['id_random']
return None
@validator('id', always=True)
def site_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['id_random']:
log.debug(values['id_random'])
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='site')
return None
@validator('account_id', always=True)
def account_id_lookup(cls, v, values, **kwargs):
log.setLevel(logging.WARNING)
log.debug(locals())
if values['account_id_random']:
return redis_lookup_id_random(record_id_random=values['account_id_random'], table_name='account')
return None
class Config:
underscore_attrs_are_private = True
fields = base_fields
Site_Base.update_forward_refs()

View File

@@ -6,6 +6,7 @@ from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationEr
from ..lib_general import *
from ..log import *
from .common_field_schema import base_fields, default_num_bytes
#from .account_model import Account_Base
from .contact_model import Contact_Base

56
app/routers/address.py Normal file
View File

@@ -0,0 +1,56 @@
import datetime
#from datetime import datetime, time, timedelta
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from ..lib_general import *
from ..log import *
from app.config import settings
from app.db_sql import *
from ..models.address_model import *
from ..models.response_model import *
router = APIRouter()
# Working on the address API CRUD - STI 2021-03-08
@router.post('')
async def post_address_obj(
address: Address_Base,
x_account_id: str = Header(...),
return_obj: Optional[bool] = True,
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
table_name_insert = 'address'
obj_data_dict = address.dict(by_alias=False, exclude_unset=True)
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(obj_data_dict)
obj_data = lookup_id_random_pop(obj_data_dict)
base_name = Address_Base
if sql_insert_result := sql_insert(table_name=table_name_insert, data=obj_data):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(sql_insert_result)
obj_id = sql_insert_result
else:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(sql_insert_result)
return mk_resp(data=False, status_code=400)
table_name_select = 'v_address'
sql_select_result = sql_select(table_name=table_name_select, record_id=obj_id)
log.debug(sql_select_result)
resp_data = base_name(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
return mk_resp(data=resp_data)

View File

@@ -1,93 +1,194 @@
import datetime
#from datetime import datetime, time, timedelta
from fastapi import APIRouter, Depends, Header, HTTPException, Query, status
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from ..lib_general import *
from ..log import *
from app.config import settings
from app.db import *
#from .journal_models import *
#from .user_models import *
from .user_model import *
from .response_model import *
from app.db_sql import *
from ..models.account_model import *
from ..models.account_cfg_model import *
from ..models.address_model import *
from ..models.contact_model import *
from ..models.order_model import *
from ..models.site_model import *
from ..models.site_domain_model import *
from ..models.user_model import *
from ..models.response_model import *
obj_type_li = {}
obj_type_li['account'] = 'v_account'
obj_type_li['activity_log'] = 'activity_log'
obj_type_li['address'] = 'v_address'
obj_type_li['archive'] = 'v_archive'
obj_type_li['archive_content'] = 'v_archive_content'
obj_type_li['contact'] = 'v_contact'
obj_type_li['event'] = 'v_event'
obj_type_li['event_badge'] = 'v_event_badge'
obj_type_li['event_exhibit'] = 'v_event_exhibit'
obj_type_li['event_location'] = 'v_event_location'
obj_type_li['event_presentation'] = 'v_event_presentation'
obj_type_li['event_presenter'] = 'v_event_presenter'
obj_type_li['event_session'] = 'v_event_session'
obj_type_li['event_track'] = 'v_event_track'
obj_type_li['hosted_file'] = 'v_hosted_file'
obj_type_li['journal'] = 'v_journal'
obj_type_li['log'] = 'log' #'v_log'
obj_type_li['log_client_viewing'] = 'log_client_viewing'
obj_type_li['message'] = 'message' #'v_message'
obj_type_li['order'] = 'v_order'
obj_type_li['order_cart'] = 'v_order_cart'
obj_type_li['order_cart_line'] = 'v_order_cart_line'
obj_type_li['order_line'] = 'v_order_line'
obj_type_li['order_transaction'] = 'order_transaction'
obj_type_li['organization'] = 'v_organization'
obj_type_li['page'] = 'page'
obj_type_li['person'] = 'v_person'
obj_type_li['post'] = 'v_post_detail'
obj_type_li['post_comment'] = 'v_post_comment_detail'
obj_type_li['product'] = 'v_product'
obj_type_li['site'] = 'v_site'
obj_type_li['site_domain'] = 'v_site_domain'
obj_type_li['user'] = 'v_user'
obj_type_li['lu_education_degree'] = 'lu_education_degree'
obj_type_li['lu_education_level'] = 'lu_education_level'
obj_type_li['lu_html_color'] = 'lu_html_color'
obj_type_li['lu_time_zone'] = 'v_lu_time_zone'
obj_type_li['lu_user_status'] = 'lu_user_status'
#obj_type_li['cfg_flask'] = {'table_name': 'cfg_flask', 'base_name': Cfg_Flask_Base}
obj_type_li['stripe_customer'] = 'stripe_customer'
obj_type_li['stripe_log'] = 'stripe_log'
#obj_type_li['api_client_token'] = {'table_name': 'api_client_token', 'base_name': Api_Client_Token_Base}
#obj_type_li['api_key'] = {'table_name': 'api_key', 'base_name': Api_Key_Base}
#obj_type_li['api_token'] = {'table_name': 'api_token', 'base_name': Api_Token_Base}
obj_type_li['account'] = {'table_name': 'account', 'base_name': Account_Base}
obj_type_li['account_cfg'] = {'table_name': 'v_account_cfg_detail', 'base_name': Account_Cfg_Base} # NOTE check view name: *_detail?
#obj_type_li['activity_log'] = {'table_name': 'activity_log', 'base_name': Activity_Log_Base}
obj_type_li['address'] = {'table_name': 'v_address', 'base_name': Address_Base}
obj_type_li['archive'] = {'table_name': 'v_archive', 'base_name': Archive_Base}
obj_type_li['archive_content'] = {'table_name': 'v_archive_content', 'base_name': Archive_Content_Base}
obj_type_li['change_log'] = {'table_name': 'change_log', 'base_name': Change_Log_Base}
obj_type_li['contact'] = {'table_name': 'v_contact', 'base_name': Contact_Base}
obj_type_li['cont_edu_cert'] = {'table_name': 'cont_edu_cert', 'base_name': Cont_Edu_Cert_Base}
obj_type_li['event'] = {'table_name': 'v_event', 'base_name': Event_Base}
obj_type_li['event_badge'] = {'table_name': 'event_badge', 'base_name': Event_Badge_Base}
obj_type_li['event_badge_log'] = {'table_name': 'event_badge_log', 'base_name': Event_Badge_Log_Base}
obj_type_li['event_badge_template'] = {'table_name': 'event_badge_template', 'base_name': Event_Badge_Template_Base}
obj_type_li['event_device'] = {'table_name': 'event_device', 'base_name': Event_Device_Base}
obj_type_li['event_exhibit'] = {'table_name': 'v_event_exhibit', 'base_name': Event_Exhibit_Base} # NOTE check view name: *_detail?
obj_type_li['event_file'] = {'table_name': 'v_event_file', 'base_name': Event_File_Base} # Should this eventually be changed to event_hosted_file
obj_type_li['event_location'] = {'table_name': 'v_event_location', 'base_name': Event_Location_Base}
obj_type_li['event_presentation'] = {'table_name': 'v_event_presentation', 'base_name': Event_Presentation_Base}
obj_type_li['event_presenter'] = {'table_name': 'v_event_presenter', 'base_name': Event_Presenter_Base}
obj_type_li['event_registration'] = {'table_name': 'v_event_registration', 'base_name': Event_Registration_Base}
obj_type_li['event_session'] = {'table_name': 'v_event_session', 'base_name': Event_Session_Base}
obj_type_li['event_track'] = {'table_name': 'v_event_track', 'base_name': Event_Track_Base}
obj_type_li['hosted_file'] = {'table_name': 'hosted_file', 'base_name': Hosted_File_Base}
obj_type_li['hosted_file_link'] = {'table_name': 'hosted_file_link', 'base_name': Hosted_File_Link_Base}
obj_type_li['journal'] = {'table_name': 'v_journal', 'base_name': Journal_Base}
obj_type_li['journal_entry'] = {'table_name': 'v_journal_entry', 'base_name': Journal_Entry_Base}
obj_type_li['log'] = {'table_name': 'log', 'base_name': Log_Base} #'v_log'
obj_type_li['log_client_viewing'] = {'table_name': 'log_client_viewing', 'base_name': Log_Client_Viewing_Base}
obj_type_li['membership'] = {'table_name': 'v_membership', 'base_name': Membership_Base}
obj_type_li['membership_cfg'] = {'table_name': 'v_membership_cfg', 'base_name': Membership_Cfg_Base}
obj_type_li['message'] = {'table_name': 'message', 'base_name': Message_Base} #'v_message'
obj_type_li['order'] = {'table_name': 'v_order', 'base_name': Order_Base}
obj_type_li['order_cart'] = {'table_name': 'v_order_cart', 'base_name': Order_Cart_Base}
obj_type_li['order_cart_line'] = {'table_name': 'v_order_cart_line', 'base_name': Order_Cart_Line_Base}
obj_type_li['order_line'] = {'table_name': 'v_order_line', 'base_name': Order_Line_Base}
obj_type_li['order_transaction'] = {'table_name': 'order_transaction', 'base_name': Order_Transaction_Base}
obj_type_li['organization'] = {'table_name': 'v_organization', 'base_name': Organization_Base}
obj_type_li['page'] = {'table_name': 'page', 'base_name': Page_Base}
obj_type_li['person'] = {'table_name': 'v_person', 'base_name': Person_Base}
obj_type_li['post'] = {'table_name': 'v_post_detail', 'base_name': Post_Base} # NOTE check view name: *_detail?
obj_type_li['post_comment'] = {'table_name': 'v_post_comment_detail', 'base_name': Post_Comment_Base} # NOTE check view name: *_detail?
obj_type_li['product'] = {'table_name': 'v_product', 'base_name': Product_Base}
obj_type_li['site'] = {'table_name': 'site', 'base_name': Site_Base}
obj_type_li['site_domain'] = {'table_name': 'v_site_domain', 'base_name': Site_Domain_Base} # NOTE check view name: *_detail?
obj_type_li['user'] = {'table_name': 'v_user', 'base_name': User_Base}
obj_type_li['user_role'] = {'table_name': 'v_user_role', 'base_name': User_Role_Base} # NOTE check view name: *_detail?
obj_type_li['lu_country'] = {'table_name': 'lu_country', 'base_name': Lu_Country_Base}
obj_type_li['lu_country_subdivision'] = {'table_name': 'lu_country_subdivision', 'base_name': Lu_Country_Subdivision_Base}
obj_type_li['lu_education_degree'] = {'table_name': 'lu_education_degree', 'base_name': Lu_Education_Degree_Base}
obj_type_li['lu_education_level'] = {'table_name': 'lu_education_level', 'base_name': Lu_Education_Level_Base}
obj_type_li['lu_ethnicity'] = {'table_name': 'lu_ethnicity', 'base_name': Lu_Ethnicity_Base}
obj_type_li['lu_file_purpose'] = {'table_name': 'lu_file_purpose', 'base_name': Lu_File_Purpose_Base}
obj_type_li['lu_gender'] = {'table_name': 'lu_gender', 'base_name': Lu_Gender_Base}
obj_type_li['lu_html_color'] = {'table_name': 'lu_html_color', 'base_name': Lu_Html_Color_Base}
obj_type_li['lu_media_type'] = {'table_name': 'lu_media_type', 'base_name': Lu_Media_Type_Base}
obj_type_li['lu_membership_status'] = {'table_name': 'lu_membership_status', 'base_name': Lu_Membership_Status_Base}
obj_type_li['lu_membership_type'] = {'table_name': 'lu_membership_type', 'base_name': Lu_Membership_Type_Base}
obj_type_li['lu_order_status'] = {'table_name': 'lu_order_status', 'base_name': Lu_Order_Status_Base}
obj_type_li['lu_post_topic'] = {'table_name': 'lu_post_topic', 'base_name': Lu_Post_Topic_Base}
obj_type_li['lu_product_type'] = {'table_name': 'lu_product_type', 'base_name': Lu_Product_Type_Base}
obj_type_li['lu_pronoun'] = {'table_name': 'lu_pronoun', 'base_name': Lu_Pronoun_Base}
obj_type_li['lu_race'] = {'table_name': 'lu_race', 'base_name': Lu_Race_Base}
obj_type_li['lu_time_zone'] = {'table_name': 'v_lu_time_zone', 'base_name': Lu_Time_Zone_Base}
obj_type_li['lu_user_role'] = {'table_name': 'lu_user_role', 'base_name': Lu_User_Role_Base}
obj_type_li['lu_user_status'] = {'table_name': 'lu_user_status', 'base_name': Lu_User_Status_Base}
obj_type_li['stripe_customer'] = {'table_name': 'stripe_customer', 'base_name': Stripe_Customer_Base}
obj_type_li['stripe_log'] = {'table_name': 'stripe_log', 'base_name': Stipe_Log_Base}
# obj_type_li['c_idda_membership_profile'] = {'table_name': 'c_idda_membership_profile', 'base_name': C_Idda_Membership_Profile_Base}
# obj_type_li['c_osit_demo_membership_profile'] = {'table_name': 'c_osit_demo_membership_profile', 'base_name': C_Osit_Demo_Membership_Profile_Base}
router = APIRouter()
# Working on the basic API CRUD - STI 2021-03-05
# Working on the basic API CRUD - STI 2021-03-08
#@router.get('/{object_l1}/list')
@router.get('/{object_l1}/{object_id}/list')
@router.get('/{object_l1}/{object_l2}/{object_id}/list')
@router.get('/{object_l1}/{object_l2}/{object_id}/{object_l3}/list')
async def get_obj_li(object_l1: str=None, object_l2: str=None, object_l3: str=None, object_id: str=None, x_account_id: str = Header(...)):
response_data = {}
response_data['object_l1'] = object_l1
response_data['object_l2'] = object_l2
response_data['object_l3'] = object_l3
response_data['object_id'] = object_id
response_data['list'] = 'li'
@router.get('/{obj_type_l1}/list')
@router.get('/{obj_type_l1}/{obj_type_l2}/list')
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/list')
async def get_obj_li(obj_type_l1: str=None,
obj_type_l2: str=None,
obj_type_l3: str=None,
obj_id: str=None,
for_obj_type: Optional[str] = Query(None, max_length=50),
for_obj_id: Optional[str] = Query(None, max_length=22),
x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
sql_result = sql_select(table_name='user', record_id=1)
debug_data = {}
debug_data['obj_type_l1'] = obj_type_l1
debug_data['obj_type_l2'] = obj_type_l2
debug_data['obj_type_l3'] = obj_type_l3
debug_data['obj_id'] = obj_id
debug_data['for_obj_type'] = for_obj_type
debug_data['for_obj_id'] = for_obj_id
response_data['sql_result'] = sql_result
log.debug(debug_data)
return response_data
if obj_type_l1 and obj_type_l2 and obj_type_l3:
obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}'
if obj_name in obj_type_li:
pass
else:
return mk_resp(data=False, status_code=400)
elif obj_type_l1 and obj_type_l2:
obj_name = f'{obj_type_l1}_{obj_type_l2}'
if obj_name in obj_type_li:
pass
else:
return mk_resp(data=False, status_code=400)
elif obj_type_l1:
obj_name = f'{obj_type_l1}'
if obj_name in obj_type_li:
pass
else:
return mk_resp(data=False, status_code=400)
else:
log.warning('We should not be here')
return mk_resp(data=False, status_code=400)
table_name = obj_type_li[obj_name]['table_name']
if for_obj_type and for_obj_id:
for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
#data = {}
#data[f'{for_obj_type}_id'] = for_obj_id
field_name = f'{for_obj_type}_id'
sql_result = sql_select(table_name=table_name, field_name=f'{for_obj_type}_id', field_value=for_obj_id)
else:
sql_result = sql_select(table_name=table_name)
log.debug(sql_result)
base_name = obj_type_li[obj_name]['base_name']
resp_data_li = []
for record in sql_result:
resp_data = base_name(**record).dict(by_alias=by_alias, exclude_unset=exclude_unset)
resp_data_li.append(resp_data)
return mk_resp(data=resp_data_li)
#@router.get('/{obj_type_l1}/{obj_id_int}')
@router.get('/{obj_type_l1}/{obj_id}')
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_id}')
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}')
async def get_obj(obj_type_l1: str=None, obj_type_l2: str=None, obj_type_l3: str=None, obj_id: str=None, x_account_id: str = Header(...),
async def get_obj(obj_type_l1: str=None,
obj_type_l2: str=None,
obj_type_l3: str=None,
obj_id: str=None,
for_obj_type: Optional[str] = Query(None, max_length=50),
for_obj_id: Optional[str] = Query(None, max_length=22),
x_account_id: str = Header(...),
qry_str: Optional[str] = Query(None, max_length=50),
qry_int: Optional[int] = None,
by_alias: Optional[bool] = True,
@@ -101,36 +202,75 @@ async def get_obj(obj_type_l1: str=None, obj_type_l2: str=None, obj_type_l3: str
debug_data['obj_type_l2'] = obj_type_l2
debug_data['obj_type_l3'] = obj_type_l3
debug_data['obj_id'] = obj_id
#debug_data['object_id_int'] = object_id_int
#debug_data['object_id_rand'] = object_id_rand
debug_data['for_obj_type'] = for_obj_type
debug_data['for_obj_id'] = for_obj_id
log.debug(debug_data)
if obj_type_l1 and obj_type_l2 and obj_type_l3:
obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}'
if obj_name in obj_type_li:
table_name = obj_type_li[obj_name]
#table_name = obj_type_li[obj_name]
#table_name = obj_type_li[obj_name]['table_name']
pass
else:
return mk_resp(data=False, status_code=400)
elif obj_type_l1 and obj_type_l2:
obj_name = f'{obj_type_l1}_{obj_type_l2}'
if obj_name in obj_type_li:
table_name = obj_type_li[obj_name]
#table_name = obj_type_li[obj_name]['table_name']
pass
else:
return mk_resp(data=False, status_code=400)
elif obj_type_l1:
obj_name = f'{obj_type_l1}'
if obj_name in obj_type_li:
table_name = obj_type_li[obj_name]
#table_name = obj_type_li[obj_name]['table_name']
pass
else:
return mk_resp(data=False, status_code=400)
else:
log.warning('We should not be here')
return mk_resp(data=False, status_code=400)
table_name = obj_type_li[obj_name]['table_name']
# NOTE: Add a check for the object ID... assuming it is a random ID string for now.
sql_result = sql_select(table_name=table_name, record_id_random=obj_id)
log.debug(sql_result)
resp_data = User_Base(**sql_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
return mk_resp(data=resp_data)#, details=debug_data)
base_name = obj_type_li[obj_name]['base_name']
resp_data = base_name(**sql_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
return mk_resp(data=resp_data) #, details=debug_data)
# GET: get object item or list linked to object
@router.get('/{obj_type_l1}/{obj_id}')
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_id}')
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}')
def obj_type_li_for_type_for_id(obj_type_l1: str=None,
obj_type_l2: str=None,
obj_type_l3: str=None,
obj_id: str=None,
x_account_id: str = Header(...),
for_obj_type: Optional[str] = Query(None, max_length=50),
for_obj_id: Optional[str] = Query(None, max_length=22),
):
app.logger.setLevel(logging.DEBUG) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
app.logger.debug(locals())
for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
sql = """
SELECT *
FROM `v_address` AS address
WHERE address.account_id = :account_id
"""
response = sql_select_for_rest(data=data, table_name=None, sql=sql, model=None, iso_dates_times=True, resource_ref=True)
return jsonify(response), response['meta']['status_code']

View File

@@ -1,113 +0,0 @@
from datetime import datetime, time, timedelta
from fastapi import APIRouter, Depends, Header, HTTPException, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from ..lib_general import *
from app.config import settings
from app.db import *
from .user_models import *
#import logging
router = APIRouter()
@router.post(
"/",
response_model=UserOut,
summary='Create a new user account',
status_code=status.HTTP_201_CREATED
)
async def create_user(user: UserIn, x_account_id: str = Header(...)):
"""
Create a new user account
"""
user = {}
user['account_id_random'] = x_account_id
user['username'] = 'Scott.Idem'
user['name'] = 'Scott Idem'
user['email'] = 'Scott.Idem@oneskyit.com'
return user
#@router.patch('/{id_random}', response_model=UserOut, dependencies=[Depends(get_account_header)])
#async def update_user(id_random: str, user: UserIn, x_account_id: str = Header(...)):
#async def update_user(id_random: str, user: UserIn):
@router.patch(
'/{id_random}',
response_model=UserOut,
summary='Update a user account'
)
async def update_user(id_random: str, user: UserIn, x_account_id: str = Depends(get_account_header)):
"""
Update a user account
"""
user = {}
user['id_random'] = id_random
user['account_id_random'] = x_account_id
user['username'] = 'Scott.Idem'
user['name'] = 'Scott Idem'
user['email'] = 'Scott.Idem@oneskyit.com'
user['created_on'] = datetime.now()
user['super'] = True
return user
@router.delete('/{id_random}', response_model=bool)
async def delete_user(id_random: str, x_account_id: str = Depends(get_account_header)):
"""
Delete a user account
"""
return True
return False
@router.get('{object}/', response_model=List[UserOut])
@router.get('{object}/list_all', response_model=List[UserOut])
async def list_users():
"""
Get a list of users
"""
print(settings.APP_NAME)
users = [{'username': 'test.user.1'}, {'username': 'test.user.2'}, {'username': 'Scott.Idem'}]
print('Getting all users...')
sql = """
SELECT *
FROM `user`
WHERE id=1
"""
records = sql_select(sql=sql, as_list=True)
#records = sql_select(table_name='user')
if records:
print('Got the user list')
return records
else:
print('No user records found')
raise HTTPException(status_code=404)
@router.get('/{username}')
async def get_user_username(username: str, x_account_id: str = Header(...)):
return {'username': username}
#@router.get('/me')
#async def get_user_current():
#user_out: UserOut
#return {'username': 'test.user'}

View File

@@ -1,49 +0,0 @@
from datetime import datetime, time, timedelta
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from ..lib_general import *
from ..log import *
from app.config import settings
from app.db import *
router = APIRouter()
class Image(BaseModel):
url: str
name: str
class Item(BaseModel):
name: str
description: Optional[str] = Field(None, example='A very nice Item')
price: float
tax: Optional[float] = None
is_offer: Optional[bool] = None
#tags: List[str] = [] # not unique tags
tags: Set[str] = set() # unique tags
image: Optional[Image] = None # one image
images: Optional[List[Image]] = None # or as a list of images
@router.get('/')
async def read_items():
return [{'name': 'Item Foo'}, {'name': 'item Bar'}]
@router.get('/{item_id}')
async def read_item(item_id: str):
return {'name': 'Fake Specific Item', 'item_id': item_id}
@router.put(
'/{item_id}',
tags=['Extra Tag'],
responses={403: {'description': 'Operation forbidden'}},
)
async def update_item(item_id: str):
if item_id != 'foo':
raise HTTPException(status_code=403, detail='You can only update the item: foo')
return {'item_id': item_id, 'name': 'The Fighters'}

View File

@@ -1,40 +0,0 @@
from datetime import datetime, time, timedelta
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
class JournalBase(BaseModel):
#id_random: str = None # This should not be None. It is required.
#id_random: str = Field(None, example='iyOrkTnHEuyYUNeePbEdIg', min_length=11, max_length=22)
account_id_random: str = None # This should not be None. It is required.
user_id_random: str = Field(None, example='iyOrkTnHEuyYUNeePbEdIg', min_length=11, max_length=22)
default_private: Optional[bool] = None
default_public: Optional[bool] = None
default_personal: Optional[bool] = None
default_professional: Optional[bool] = None
private_passcode: str = Field(None, example='my passcode', min_length=3, max_length=20)
title: str = Field(None, example='The Journal Title', min_length=3, max_length=200)
summary: Optional[str] = None
hide: Optional[bool] = None
status: Optional[int] = None
archive_on: Optional[datetime] = None
archive: Optional[bool] = None
priority: Optional[bool] = None
sort: Optional[int] = None
group: Optional[str] = None
notes: Optional[str] = None
class JournalIn(JournalBase):
id_random: str = Field(None, example='iyOrkTnHEuyYUNeePbEdIg', min_length=11, max_length=22)
class JournalOut(JournalBase):
id_random: str = Field(None, example='iyOrkTnHEuyYUNeePbEdIg', min_length=11, max_length=22)
created_on: datetime
update_on: Optional[datetime] = None

View File

@@ -1,159 +0,0 @@
from datetime import datetime, time, timedelta
from fastapi import APIRouter, Depends, Header, HTTPException, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from ..lib_general import *
from ..log import *
from app.config import settings
from app.db import *
from .journal_models import *
router = APIRouter()
@router.post(
"/",
response_model=JournalOut,
response_model_exclude_unset=True,
summary='Create a new journal account',
status_code=status.HTTP_201_CREATED
)
async def create_journal(journal: JournalIn, x_account_id: str = Header(...)):
"""
Create a new journal account
"""
journal = dict(journal)
table_name = 'journal'
# Look up the journal['account_id_random'] and match to a record ID from Redis
if account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
journal['account_id'] = account_id
journal.pop('account_id_random')
else:
print('Something went wrong with the id_random lookup.')
raise HTTPException(status_code=500)
if result := sql_insert(table_name=table_name, record=journal, id_random_length=16):
print(type(result))
if type(result) == int: # isinstance(result, int):
# Select the new record to return as a response.
if new_journal := dict(sql_select(table_name=table_name, record_id=result)):
return new_journal
else:
print('New journal record was not found.')
raise HTTPException(status_code=400)
else:
print('There is likely a duplicate record. A new record was not created.')
raise HTTPException(status_code=400)
else:
print('No journal record was not created')
raise HTTPException(status_code=400)
#@router.patch('/{id_random}', response_model=JournalOut, dependencies=[Depends(get_account_header)])
#async def update_journal(id_random: str, journal: JournalIn, x_account_id: str = Header(...)):
#async def update_journal(id_random: str, journal: JournalIn):
@router.patch(
'/{id_random}',
response_model=JournalOut,
summary='Update a journal account'
)
async def update_journal(id_random: str, journal: JournalIn, x_account_id: str = Depends(get_account_header)):
"""
Update a journal account
"""
journal = {}
journal['id_random'] = id_random
journal['account_id_random'] = x_account_id
journal['title'] = 'tit'
journal['summary'] = 'sum'
#journal['created_on'] = datetime.now()
journal['default_private'] = True
journal['default_public'] = False
journal['default_personal'] = False
journal['default_professional'] = False
return journal
@router.delete('/{id_random}', response_model=bool)
async def delete_journal(id_random: str, x_account_id: str = Depends(get_account_header)):
"""
Delete a journal account
"""
return True
return False
@router.get('/', response_model=List[JournalOut])
@router.get('/list_all', response_model=List[JournalOut])
async def list_journals():
"""
Get a list of journals
"""
log.setLevel(logging.DEBUG)
log.debug(str(locals().keys())+' | '+str(locals().values()))
log.debug(locals())
journals = [{'journalname': 'test.journal.1'}, {'journalname': 'test.journal.2'}, {'journalname': 'Scott.Idem'}]
log.info('Getting all journals...')
sql = """
SELECT *
FROM `journal`
/*WHERE id=1*/
"""
#records = sql_select(sql=sql, as_list=True)
records = sql_select(table_name='v_journal', as_list=True)
if records:
log.info('Got the journal list')
return records
else:
log.info('No journal records found')
raise HTTPException(status_code=404)
@router.get(
'/{journal_id_random}',
response_model=JournalOut,
summary='Get a journal with an id (id_random)'
)
async def get_journal_id(journal_id_random: str, x_account_id: str = Header(...)):
"""
Get a journal with an id (id_random)
"""
log.setLevel(logging.WARN) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
#journal['account_id'] = account_id
#journal.pop('account_id_random')
pass
else:
log.warning('Something went wrong with the id_random lookup.')
raise HTTPException(status_code=500)
if journal_id := redis_lookup_id_random(table_name='journal', record_id_random=journal_id_random):
#journal['journal_id'] = journal_id
#journal.pop('account_id_random')
pass
else:
log.warning('Something went wrong with the id_random lookup.')
raise HTTPException(status_code=500)
record = sql_select(table_name='v_journal', record_id=journal_id)
if record:
log.info('Got the journal')
return record
else:
log.info('No journal record found')
raise HTTPException(status_code=404)

View File

@@ -1,45 +0,0 @@
from datetime import datetime, time, timedelta
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
class UserBase(BaseModel):
#id_random: str = None # This should not be None. It is required.
account_id_random: str = None # This should not be None. It is required.
username: str = Field(None, example='New.User', min_length=3, max_length=100)
name: Optional[str] = None
email: EmailStr
email_verified: Optional[bool] = None
enable: Optional[bool] = None
enable_from: Optional[datetime] = None
enable_to: Optional[datetime] = None
super: Optional[bool] = None
manager: Optional[bool] = None
administrator: Optional[bool] = None
verified: Optional[bool] = None
notes: Optional[str] = None
class UserIn(UserBase):
#id_random: str = None
password: str = Field(None, example='My Difficult Password!', min_length=10)
class UserOut(UserBase):
id_random: str = None
password_set_on: Optional[datetime] = None
password_reset_token: Optional[str] = None
password_reset_expire_on: Optional[datetime] = None
logged_in_on: Optional[datetime] = None
last_activity_on: Optional[datetime] = None
created_on: datetime
update_on: Optional[datetime] = None
class UserInDB(UserBase):
hashed_password: str
password_set_on: Optional[datetime] = None
password_reset_token: Optional[str] = None
password_reset_expire_on: Optional[datetime] = None
logged_in_on: Optional[datetime] = None
last_activity_on: Optional[datetime] = None

View File

@@ -1,149 +0,0 @@
from datetime import datetime, time, timedelta
from fastapi import APIRouter, Depends, Header, HTTPException, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from ..lib_general import *
from ..log import *
from app.config import settings
from app.db import *
from .user_models import *
router = APIRouter()
@router.post(
"/",
response_model=UserOut,
response_model_exclude_unset=True,
summary='Create a new user account',
status_code=status.HTTP_201_CREATED
)
async def create_user(user: UserIn, x_account_id: str = Header(...)):
"""
Create a new user account
"""
user = dict(user)
table_name = 'user'
# Look up the user['account_id_random'] and match to a record ID from Redis
if account_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
user['account_id'] = account_id
user.pop('account_id_random')
else:
print('Something went wrong with the id_random lookup.')
raise HTTPException(status_code=500)
if result := sql_insert(table_name=table_name, record=user, id_random_length=16):
print(type(result))
if type(result) == int: # isinstance(result, int):
# Select the new record to return as a response.
if new_user := dict(sql_select(table_name=table_name, record_id=result)):
return new_user
else:
print('New user record was not found.')
raise HTTPException(status_code=400)
else:
print('There is likely a duplicate record. A new record was not created.')
raise HTTPException(status_code=400)
else:
print('No user record was not created')
raise HTTPException(status_code=400)
#@router.patch('/{id_random}', response_model=UserOut, dependencies=[Depends(get_account_header)])
#async def update_user(id_random: str, user: UserIn, x_account_id: str = Header(...)):
#async def update_user(id_random: str, user: UserIn):
@router.patch(
'/{id_random}',
response_model=UserOut,
summary='Update a user account'
)
async def update_user(id_random: str, user: UserIn, x_account_id: str = Depends(get_account_header)):
"""
Update a user account
"""
log.setLevel(logging.DEBUG)
log.debug(locals())
user = {}
user['id_random'] = id_random
user['account_id_random'] = x_account_id
user['username'] = 'Scott.Idem'
user['name'] = 'Scott Idem'
user['email'] = 'Scott.Idem@oneskyit.com'
user['created_on'] = datetime.now()
user['super'] = True
return user
@router.delete('/{id_random}', response_model=bool)
async def delete_user(id_random: str, x_account_id: str = Depends(get_account_header)):
"""
Delete a user account
"""
log.setLevel(logging.DEBUG)
log.debug(locals())
return True
return False
@router.get('/', response_model=List[UserOut])
@router.get('/list_all', response_model=List[UserOut])
async def list_users(x_account: str = Depends(get_account_header)):
"""
Get a list of users
"""
log.setLevel(logging.DEBUG)
log.debug(locals())
if x_account['id']:
log.info('The x-account-id was given and is not empty...')
sql = """
SELECT *
FROM `user`
WHERE account_id = :account_id
"""
records = sql_select(table_name='user', field_name='account_id', field_value=x_account['id'], as_list=True)
elif x_account['id'] is None:
log.info('The x-account-id was given, but is empty...')
sql = """
SELECT *
FROM `user`
"""
records = sql_select(table_name='user', as_list=True)
if records:
log.info('Returning a user list...')
return records
else:
log.info('No user records found...')
raise HTTPException(status_code=404)
@router.get('/{username}')
async def get_user_username(username: str, x_account: str = Depends(get_account_header)):
log.setLevel(logging.DEBUG)
log.debug(locals())
data = {}
data['username'] = username
if x_account['id']:
sql = """
SELECT *
FROM `user`
WHERE account_id = :account_id AND username=:username
"""
data['account_id'] = x_account['id']
elif x_account['id'] is None:
sql = """
SELECT *
FROM `user`
WHERE (account_id IS NULL OR account_id = "") AND username=:username
"""
record = sql_select(sql=sql, data=data)
return record

View File

@@ -6,7 +6,7 @@ import aioredis, asyncio, json
from ..lib_general import *
from ..log import *
from app.config import settings
from app.db import *
from app.db_sql import *
router = APIRouter()