updated api python

This commit is contained in:
Berkay 2025-05-11 22:54:30 +03:00
commit c61a09a54a
14 changed files with 812 additions and 0 deletions

13
.gitignore vendored Normal file
View File

@ -0,0 +1,13 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
# idea
.idea/

1
.python-version Normal file
View File

@ -0,0 +1 @@
3.12

View File

@ -0,0 +1,87 @@
import random
from faker import Faker
from faker_commerce import Provider
from Controllers.Postgres.engine import get_db, engine, Base
from Controllers.Postgres.schema import (
User,
Order,
Product,
Supplier,
ShippingAddress,
Category,
)
fake = Faker('en_US')
fake.add_provider(Provider)
num_rows = 1000
# Base.metadata.create_all(engine)
def commit_session_or_rollback(session, list_of_appends):
try:
session.add_all(list_of_appends)
session.commit()
except Exception as err:
print('Alchemy', str(err)[:113])
session.rollback()
raise
with get_db() as session:
# Generate and add unique Products
products = []
for _ in range(num_rows):
products.append(Product(name=fake.unique.ecommerce_name() + str(_), price=round(random.uniform(10, 2000), 2)))
commit_session_or_rollback(session, products)
product_ids = [p.id for p in session.query(Product).all()]
# Generate and add unique Categories
categories = []
for _ in range(num_rows):
categories.append(Category(name=fake.unique.domain_name() + str(_)))
commit_session_or_rollback(session, categories)
category_ids = [c.id for c in session.query(Category).all()]
# Generate and add unique Suppliers
suppliers = []
for _ in range(num_rows):
suppliers.append(Supplier(name=fake.unique.free_email() + str(_), rating=round(random.uniform(2.5, 5.0), 1)))
commit_session_or_rollback(session, suppliers)
supplier_ids = [s.id for s in session.query(Supplier).all()]
# Generate and add unique Shipping Addresses
shipping_addresses = []
for _ in range(num_rows):
shipping_addresses.append(ShippingAddress(
address_line_1=fake.unique.address(),
city=fake.unique.city(),
zip_code=fake.unique.zipcode()
))
commit_session_or_rollback(session, shipping_addresses)
shipping_address_ids = [sa.id for sa in session.query(ShippingAddress).all()]
# Generate and add unique Users
users = []
for _ in range(num_rows):
users.append(User(
username=fake.unique.user_name() + str(_),
account_balance=round(random.uniform(0, 5000), 2),
preferred_category_id=random.choice(category_ids),
last_ordered_product_id=random.choice(product_ids),
supplier_rating_id=random.choice(supplier_ids)
))
commit_session_or_rollback(session, users)
user_ids = [u.id for u in session.query(User).all()]
# Generate and add unique Orders
orders = []
for _ in range(num_rows):
orders.append(Order(
order_date=fake.unique.date(),
total_amount=round(random.uniform(10, 3000), 2),
user_id=random.choice(user_ids),
shipping_address_id=random.choice(shipping_address_ids)
))
commit_session_or_rollback(session, orders)

View File

@ -0,0 +1,203 @@
import time
from typing import Optional
from Controllers.Postgres.engine import get_db, engine, Base
from Controllers.Postgres.schema import (
User,
Order,
Product,
Product,
ShippingAddress,
Category,
)
import Controllers.Postgres.schema as schemasAll
from sqlalchemy import func, inspect
from sqlalchemy.orm import Query
from pydantic import BaseModel, Field
class UserPydantic(BaseModel):
username: str = Field(..., alias='user.username')
account_balance: float = Field(..., alias='user.account_balance')
preferred_category_id: Optional[int] = Field(None, alias='user.preferred_category_id')
last_ordered_product_id: Optional[int] = Field(None, alias='user.last_ordered_product_id')
supplier_rating_id: Optional[int] = Field(None, alias='user.supplier_rating_id')
other_rating_id: Optional[int] = Field(None, alias='product.supplier_rating_id')
id: int = Field(..., alias='user.id')
class Config:
validate_by_name = True
use_enum_values = True
def model_dump(self, *args, **kwargs):
data = super().model_dump(*args, **kwargs)
return {self.__class__.model_fields[field].alias: value for field, value in data.items()}
def print_action_of_inspect():
print('columns inspect',inspect(Order).columns)
print('inspect _init_properties',inspect(Order)._init_properties)
print('inspect relationships',inspect(Order).relationships)
print('inspect all_orm_descriptors',inspect(Order).all_orm_descriptors)
print('primary_keys_full', Order.primary_keys_full)
print('columns',Order.columns)
print('hybrid_properties',Order.hybrid_properties)
print('relations',Order.relations)
print('relations',Order.__mapper__.attrs)
print('settable_attributes',Order.settable_attributes)
def print_dictionary_of_result(list_of_instrumented_attributes, result):
list_of_dictionaries = []
for user_orders_shipping_iter in result:
print_dictionary = dict()
for index, instrumented_attribute_iter in enumerate(list_of_instrumented_attributes):
print_dictionary[str(instrumented_attribute_iter)] = user_orders_shipping_iter[index]
list_of_dictionaries.append(print_dictionary)
return list_of_dictionaries
class ResultQuerySingle:
def __init__(self, query):
self.query = query
def to_dict(self):
if isinstance(self.query, Query):
retrieve_result = self.query.first()
return {f"{retrieve_result.__class__.__name__}.{key}": value for key, value in retrieve_result.to_dict().items()}
raise ValueError("query is not a Query object")
def print_result(self):
print(self.to_dict())
class ResultQueryMultiple:
def __init__(self, query):
self.query = query
def to_dict(self):
if isinstance(self.query, Query):
retrieve_result = self.query.all()
return [{f"{retrieve_result.__class__.__name__}.{key}": value for key, value in retrieve_result.to_dict().items()} for retrieve_result in retrieve_result]
raise ValueError("query is not a Query object")
def print_result(self):
print(self.to_dict())
class ResultQueryJoin:
def __init__(self, list_of_instrumented_attributes, query):
self.list_of_instrumented_attributes = list_of_instrumented_attributes
self.query = query
self.prepared_result = None
def prepare_result(self):
list_of_dictionaries = []
if isinstance(self.query, list):
for user_orders_shipping_iter in self.query:
print_dictionary = dict()
for index, instrumented_attribute_iter in enumerate(self.list_of_instrumented_attributes):
print_dictionary[str(instrumented_attribute_iter)] = user_orders_shipping_iter[index]
list_of_dictionaries.append(print_dictionary)
else:
for user_orders_shipping_iter in self.query.all():
print_dictionary = dict()
for index, instrumented_attribute_iter in enumerate(self.list_of_instrumented_attributes):
print_dictionary[str(instrumented_attribute_iter)] = user_orders_shipping_iter[index]
list_of_dictionaries.append(print_dictionary)
return list_of_dictionaries
def print_result(self):
if self.prepared_result is None:
self.prepared_result = self.prepare_result()
for dictionary_iter in self.prepared_result:
print(dictionary_iter)
def print_query(self):
print(str(self.query))
def test_join_result():
with get_db() as session:
list_of_instrumented_attributes = (User.username, Order.total_amount, ShippingAddress.city)
user_orders_shipping_query = session.query(*list_of_instrumented_attributes).join(Order, User.id == Order.user_id)
user_orders_shipping_query = user_orders_shipping_query.join(ShippingAddress, Order.shipping_address_id == ShippingAddress.id)
result_query = ResultQuery(list_of_instrumented_attributes, user_orders_shipping_query)
result_query.print_result()
result_query.print_query()
def delete_with_session(session):
User.set_session(session)
user_to_delete = User.query.filter(User.username.ilike('%test%'))
user_to_delete.delete()
def create_with_session(session, create_dict: dict):
User.set_session(session)
user_to_create = User(**create_dict)
session.add(user_to_create)
def update_with_session(session, update_dict: dict, find_dict: dict):
User.set_session(session)
user_to_update = User.query.filter_by(**find_dict).first()
for key, value in update_dict.items():
setattr(user_to_update, key, value)
session.add(user_to_update)
def test_create_update_delete():
with get_db() as session:
create_dict = {'username': 'test12_new_created', 'account_balance': 100.65478}
create_dict_2 = {'username': 'test12_new_created_2', 'account_balance': 100.65478}
update_dict = {'account_balance': 222.111}
find_dict = {'username': 'test12_new_created'}
try:
create_with_session(session, create_dict)
create_with_session(session, create_dict_2)
except Exception as create_error:
print('create_error', create_error)
try:
update_with_session(session, update_dict, find_dict)
except Exception as update_error:
print('update_error', update_error)
try:
delete_with_session(session)
except Exception as delete_error:
print('delete_error', delete_error)
print("Pending (New) Objects:")
for obj in session.new:
print(obj)
print("\nDirty (Modified) Objects:")
for obj in session.dirty:
print(obj)
print("\nDeleted Objects:")
for obj in session.deleted:
print(obj)
session.commit()
if __name__ == '__main__':
with get_db() as session:
find_dict = {'username': 'test12_new_created'}
User.set_session(session)
user_filter_all = User.query.filter(User.username.ilike('%test%'), User.account_balance > 100)
user_filter_query = User.query.filter_by(**find_dict)
user_filter_query_result = ResultQuerySingle(user_filter_query).to_dict()
print('user_filter_query_result', user_filter_query_result, type(user_filter_query_result))
validation = UserPydantic(**user_filter_query_result)
print('validation', validation.model_dump())
ResultQueryMultiple(user_filter_all).print_result()
# test_create_update_delete()

View File

@ -0,0 +1,222 @@
import random
from faker import Faker
from faker_commerce import Provider
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey, func, asc, desc
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
fake = Faker('en_US')
fake.add_provider(Provider)
class Product(Base):
__tablename__ = 'products'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
price = Column(Float)
category_id = Column(Integer, ForeignKey('categories.id'))
category = relationship("Category")
class Category(Base):
__tablename__ = 'categories'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
class Supplier(Base):
__tablename__ = 'suppliers'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
rating = Column(Float)
class ShippingAddress(Base):
__tablename__ = 'shipping_addresses'
id = Column(Integer, primary_key=True)
address_line_1 = Column(String, unique=True)
city = Column(String)
zip_code = Column(String)
class Order(Base):
__tablename__ = 'orders'
id = Column(Integer, primary_key=True)
order_date = Column(String)
total_amount = Column(Float)
user_id = Column(Integer, ForeignKey('users.id'))
shipping_address_id = Column(Integer, ForeignKey('shipping_addresses.id'))
user = relationship("User", back_populates="orders")
shipping_address = relationship("ShippingAddress")
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String, unique=True)
account_balance = Column(Float)
preferred_category_id = Column(Integer, ForeignKey('categories.id'))
last_ordered_product_id = Column(Integer, ForeignKey('products.id'))
supplier_rating_id = Column(Integer, ForeignKey('suppliers.id'))
preferred_category = relationship("Category")
last_ordered_product = relationship("Product")
supplier_rating = relationship("Supplier")
orders = relationship("Order", back_populates="user")
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# Add some more sample data with category for Product
if not session.query(Category).first():
session.add_all([
Category(name='Electronics'),
Category(name='Accessories'),
Category(name='Books')
])
session.commit()
category_electronics = session.query(Category).filter_by(name='Electronics').first()
category_accessories = session.query(Category).filter_by(name='Accessories').first()
category_books = session.query(Category).filter_by(name='Books').first()
if not session.query(Product).first():
session.add_all([
Product(name='Laptop', price=1200.50, category=category_electronics),
Product(name='Mouse', price=25.75, category=category_accessories),
Product(name='Keyboard', price=75.20, category=category_accessories),
Product(name='Monitor', price=300.00, category=category_electronics),
Product(name='The Lord of the Rings', price=20.00, category=category_books),
Product(name='Python Crash Course', price=35.00, category=category_books),
Product(name='Webcam', price=50.00, category=category_accessories)
])
session.commit()
if not session.query(User).first():
session.add_all([
User(username='Alice', account_balance=500.00, preferred_category_id=1, last_ordered_product_id=1, supplier_rating_id=None),
User(username='Bob', account_balance=1200.75, preferred_category_id=2, last_ordered_product_id=2, supplier_rating_id=None),
User(username='Charlie', account_balance=75.50, preferred_category_id=1, last_ordered_product_id=3, supplier_rating_id=None),
User(username='Diana', account_balance=200.00, preferred_category_id=None, last_ordered_product_id=None, supplier_rating_id=None),
User(username='Eve', account_balance=1500.00, preferred_category_id=3, last_ordered_product_id=5, supplier_rating_id=None)
])
session.commit()
if not session.query(Order).first():
alice = session.query(User).filter_by(username='Alice').first()
bob = session.query(User).filter_by(username='Bob').first()
charlie = session.query(User).filter_by(username='Charlie').first()
shipping_address1 = ShippingAddress(address_line_1='123 Main St', city='Ankara', zip_code='06500')
shipping_address2 = ShippingAddress(address_line_1='456 Elm Ave', city='Istanbul', zip_code='34000')
eve = session.query(User).filter_by(username='Eve').first()
session.add_all([shipping_address1, shipping_address2])
session.commit()
session.add_all([
Order(order_date='2025-05-08', total_amount=1200.50, user=alice, shipping_address=shipping_address1),
Order(order_date='2025-05-09', total_amount=25.75, user=bob, shipping_address=shipping_address2),
Order(order_date='2025-05-10', total_amount=75.20, user=charlie, shipping_address=shipping_address1),
Order(order_date='2025-05-11', total_amount=300.00, user=alice, shipping_address=shipping_address2),
Order(order_date='2025-05-12', total_amount=20.00, user=eve, shipping_address=shipping_address1),
Order(order_date='2025-05-13', total_amount=35.00, user=bob, shipping_address=shipping_address2)
])
session.commit()
# GROUP BY Examples
# 1. Count the number of products in each category
product_counts_by_category = session.query(Category.name, func.count(Product.id)) \
.join(Product, Product.category_id == Category.id) \
.group_by(Category.name) \
.all()
print("\nProduct Counts by Category:")
for category, count in product_counts_by_category:
print(f"Category: {category}, Count: {count}")
# 2. Find the average price of products in each category
average_price_by_category = session.query(Category.name, func.avg(Product.price)) \
.join(Product, Product.category_id == Category.id) \
.group_by(Category.name) \
.all()
print("\nAverage Price by Category:")
for category, avg_price in average_price_by_category:
print(f"Category: {category}, Average Price: {avg_price:.2f}")
# 3. Find the maximum order amount for each user
max_order_amount_by_user = session.query(User.username, func.max(Order.total_amount)) \
.join(Order, User.id == Order.user_id) \
.group_by(User.username) \
.all()
print("\nMaximum Order Amount by User:")
for user, max_amount in max_order_amount_by_user:
print(f"User: {user}, Max Order Amount: {max_amount:.2f}")
# ORDER BY Examples
# 1. Get all products ordered by price in ascending order
products_ordered_by_price_asc = session.query(Product.name, Product.price) \
.order_by(asc(Product.price)) \
.all()
print("\nProducts Ordered by Price (Ascending):")
for name, price in products_ordered_by_price_asc:
print(f"Product: {name}, Price: {price:.2f}")
# 2. Get all users ordered by account balance in descending order
users_ordered_by_balance_desc = session.query(User.username, User.account_balance) \
.order_by(desc(User.account_balance)) \
.all()
print("\nUsers Ordered by Account Balance (Descending):")
for username, balance in users_ordered_by_balance_desc:
print(f"User: {username}, Balance: {balance:.2f}")
# 3. Get orders ordered by order date
orders_ordered_by_date = session.query(Order.id, Order.order_date) \
.order_by(Order.order_date) \
.all()
print("\nOrders Ordered by Date:")
for order_id, order_date in orders_ordered_by_date:
print(f"Order ID: {order_id}, Date: {order_date}")
# LIMIT Examples
# 1. Get the top 3 most expensive products
top_3_expensive_products = session.query(Product.name, Product.price) \
.order_by(desc(Product.price)) \
.limit(3) \
.all()
print("\nTop 3 Most Expensive Products:")
for name, price in top_3_expensive_products:
print(f"Product: {name}, Price: {price:.2f}")
# 2. Get the first 2 users listed alphabetically by username
first_2_users_alphabetical = session.query(User.username) \
.order_by(asc(User.username)) \
.limit(2) \
.all()
print("\nFirst 2 Users (Alphabetical):")
for user in first_2_users_alphabetical:
print(f"User: {user.username}")
# Combining GROUP BY, ORDER BY, and LIMIT
# 1. Find the category with the fewest products (limit 1)
category_with_fewest_products = session.query(Category.name, func.count(Product.id).label('product_count')) \
.join(Product, Product.category_id == Category.id) \
.group_by(Category.name) \
.order_by(asc('product_count')) \
.limit(1) \
.first()
print("\nCategory with Fewest Products:")
if category_with_fewest_products:
print(f"Category: {category_with_fewest_products.name}, Count: {category_with_fewest_products.product_count}")
# 2. Find the top 2 users with the highest average order amount
top_2_users_highest_avg_order = session.query(User.username, func.avg(Order.total_amount).label('average_order')) \
.join(Order, User.id == Order.user_id) \
.group_by(User.username) \
.order_by(desc('average_order')) \
.limit(2) \
.all()
print("\nTop 2 Users with Highest Average Order Amount:")
for user, avg_order in top_2_users_highest_avg_order:
print(f"User: {user}, Average Order: {avg_order:.2f}")
session.close()

View File

@ -0,0 +1,87 @@
from Controllers.Postgres.engine import get_db, Base
from Controllers.Postgres.schema import (
User,
Order,
Product,
Supplier,
ShippingAddress,
Category,
)
from sqlalchemy import func
import time
with get_db() as session:
start_time = time.time()
# Product table analysis
max_price = session.query(func.max(Product.price)).scalar()
min_price = session.query(func.min(Product.price)).scalar()
avg_price = session.query(func.avg(Product.price)).scalar()
print(f"product_prices Product Prices - Max: {max_price}, Min: {min_price}, Average: {avg_price}")
# Supplier table analysis
max_rating = session.query(func.max(Supplier.rating)).scalar()
min_rating = session.query(func.min(Supplier.rating)).scalar()
avg_rating = session.query(func.avg(Supplier.rating)).scalar()
print(f"supplier_ratings Supplier Ratings - Max: {max_rating}, Min: {min_rating}, Average: {avg_rating}")
# User table analysis
max_balance = session.query(func.max(User.account_balance)).scalar()
min_balance = session.query(func.min(User.account_balance)).scalar()
avg_balance = session.query(func.avg(User.account_balance)).scalar()
print(f"user_balances User Balances - Max: {max_balance}, Min: {min_balance}, Average: {avg_balance}")
# Order table analysis
max_order_amount = session.query(func.max(Order.order_date)).scalar()
min_order_amount = session.query(func.min(Order.total_amount)).scalar()
avg_order_amount = session.query(func.avg(Order.total_amount)).scalar()
print(f"order_amounts Order Amounts - Max: {max_order_amount}, Min: {min_order_amount}, Average: {avg_order_amount}")
# Example complex joins
# Join User with Preferred Category and Last Ordered Product
user_category_product = session.query(User.username, Category.name.label('preferred_category'), Product.name.label('last_product')) \
.join(Category, User.preferred_category_id == Category.id) \
.join(Product, User.last_ordered_product_id == Product.id) \
.all()
print("\nUsers, their Preferred Category, and Last Ordered Product:")
for user, category, product in user_category_product[0:5]:
print(f"user_category_product User: {user}, Preferred Category: {category}, Last Product: {product}")
# Join User with Orders and Shipping Address
user_orders_shipping = session.query(User.username, Order.total_amount, ShippingAddress.city) \
.join(Order, User.id == Order.user_id) \
.join(ShippingAddress, Order.shipping_address_id == ShippingAddress.id) \
.all()
print("\n Users, their Order Amounts, and Shipping City:")
for user, amount, city in user_orders_shipping[0:5]:
print(f"user_orders_shipping User: {user}, Order Amount: {amount}, Shipping City: {city}")
# Join User with Preferred Category and Supplier Rating
user_category_supplier = session.query(User.username, Category.name.label('preferred_category'), Supplier.rating.label('supplier_rating')) \
.join(Category, User.preferred_category_id == Category.id) \
.join(Supplier, User.supplier_rating_id == Supplier.id) \
.all()
print("\nUsers, their Preferred Category, and Supplier Rating:")
for user, category, rating in user_category_supplier[0:5]:
print(f"user_category_supplier User: {user}, Preferred Category: {category}, Supplier Rating: {rating}")
left_join_result = session.query(User.username, Category.name.label('preferred_category')) \
.outerjoin(Category, User.preferred_category_id == Category.id) \
.all()
print("\nLeft Outer Join (All Users and their Preferred Categories):")
for user, category in left_join_result[0:5]:
print(f"left_join_result User: {user}, Preferred Category: {category}")
right_join_result = session.query(User.username, Category.name.label('preferred_category')) \
.outerjoin(User, Category.id == User.preferred_category_id) \
.all()
print("\nRight Outer Join (All Categories and the Users who Prefer Them):")
for user, category in right_join_result[0:5]:
print(f"right_join_result Category: {category}, User: {user}")
end_time = time.time()
print(f"Total time taken: {end_time - start_time} seconds")

View File

@ -0,0 +1,32 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
class Configs(BaseSettings):
"""
Postgresql configuration settings.
"""
DB: str = "sample_db"
USER: str = "sample_user"
PASSWORD: str = "password"
HOST: str = "10.10.2.14"
PORT: int = 5432
ENGINE: str = "postgresql+psycopg2"
POOL_PRE_PING: bool = True
POOL_SIZE: int = 20
MAX_OVERFLOW: int = 10
POOL_RECYCLE: int = 600
POOL_TIMEOUT: int = 30
ECHO: bool = True
@property
def url(self):
"""Generate the database URL."""
return f"{self.ENGINE}://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{self.DB}"
model_config = SettingsConfigDict(env_prefix="POSTGRES_")
# singleton instance of the POSTGRESQL configuration settings
postgres_configs = Configs()
print("url", postgres_configs.url)

View File

@ -0,0 +1,62 @@
from contextlib import contextmanager
from functools import lru_cache
from typing import Generator
from Controllers.Postgres.config import postgres_configs
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session, Session
# Configure the database engine with proper pooling
engine = create_engine(
postgres_configs.url,
pool_pre_ping=True,
pool_size=10, # Reduced from 20 to better match your CPU cores
max_overflow=5, # Reduced from 10 to prevent too many connections
pool_recycle=600, # Keep as is
pool_timeout=30, # Keep as is
echo=False, # Consider setting to False in production
)
Base = declarative_base()
# Create a cached session factory
@lru_cache()
def get_session_factory() -> scoped_session:
"""Create a thread-safe session factory."""
session_local = sessionmaker(
bind=engine,
autocommit=False,
autoflush=False,
expire_on_commit=True, # Prevent expired object issues
)
return scoped_session(session_local)
# Get database session with proper connection management
@contextmanager
def get_db() -> Generator[Session, None, None]:
"""Get database session with proper connection management.
This context manager ensures:
- Proper connection pooling
- Session cleanup
- Connection return to pool
- Thread safety
Yields:
Session: SQLAlchemy session object
"""
session_factory = get_session_factory()
session = session_factory()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
session_factory.remove() # Clean up the session from the registry

View File

@ -0,0 +1,64 @@
from Controllers.Postgres.engine import Base
from sqlalchemy import Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy_mixins.serialize import SerializeMixin
from sqlalchemy_mixins.repr import ReprMixin
from sqlalchemy_mixins.smartquery import SmartQueryMixin
from sqlalchemy_mixins.activerecord import ActiveRecordMixin
class BasicMixin(
Base,
ActiveRecordMixin,
SerializeMixin,
ReprMixin,
SmartQueryMixin,
):
__abstract__ = True
__repr__ = ReprMixin.__repr__
id = mapped_column(Integer, primary_key=True)
class Product(BasicMixin):
__tablename__ = 'products'
name = Column(String, unique=True)
price = Column(Float)
class Category(BasicMixin):
__tablename__ = 'categories'
name = Column(String, unique=True)
class Supplier(BasicMixin):
__tablename__ = 'suppliers'
name = Column(String, unique=True)
rating = Column(Float)
class ShippingAddress(BasicMixin):
__tablename__ = 'shipping_addresses'
address_line_1 = Column(String, unique=True)
city = Column(String)
zip_code = Column(String)
class Order(BasicMixin):
__tablename__ = 'orders'
order_date = Column(String)
total_amount = Column(Float)
user_id = Column(Integer, ForeignKey('users.id'))
shipping_address_id = Column(Integer, ForeignKey('shipping_addresses.id'))
user = relationship("User", back_populates="orders")
shipping_address = relationship("ShippingAddress")
class User(BasicMixin):
__tablename__ = 'users'
username = Column(String, unique=True)
account_balance = Column(Float)
preferred_category_id = Column(Integer, ForeignKey('categories.id'))
last_ordered_product_id = Column(Integer, ForeignKey('products.id'))
supplier_rating_id = Column(Integer, ForeignKey('suppliers.id'))
preferred_category = relationship("Category")
last_ordered_product = relationship("Product")
supplier_rating = relationship("Supplier")
orders = relationship("Order", back_populates="user")

0
Controllers/a.txt Normal file
View File

0
README.md Normal file
View File

7
Readme.md Normal file
View File

@ -0,0 +1,7 @@
CREATE DATABASE sample_db;
CREATE USER sample_user WITH PASSWORD 'password';
GRANT ALL PRIVILEGES ON DATABASE sample_db TO sample_user;
Enter with
psql -U sample_user -d sample_db

6
main.py Normal file
View File

@ -0,0 +1,6 @@
def main():
print("Hello from backendmodule!")
if __name__ == "__main__":
main()

28
pyproject.toml Normal file
View File

@ -0,0 +1,28 @@
[project]
name = "backendmodule"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"alembic>=1.15.2",
"arrow>=1.3.0",
"cryptography>=44.0.2",
"faker>=37.1.0",
"fastapi>=0.115.12",
"pandas>=2.2.3",
"prometheus-fastapi-instrumentator>=7.1.0",
"psycopg2-binary>=2.9.10",
"pydantic-settings>=2.8.1",
"pymongo>=4.11.3",
"pytest>=8.3.5",
"redbox>=0.2.1",
"redis>=5.2.1",
"redmail>=0.6.0",
"requests>=2.32.3",
"sqlalchemy-mixins>=2.0.5",
"textdistance>=4.6.3",
"unidecode>=1.3.8",
"uvicorn>=0.34.0",
]