postgres added

This commit is contained in:
berkay 2025-01-13 20:55:05 +03:00
parent 8b263a3a5c
commit 3bc0146767
15 changed files with 1120 additions and 37 deletions

View File

@ -1,6 +1,7 @@
from redmail import EmailSender
from AllConfigs.Email.configs import EmailConfig
from AllConfigs.Email.email_send_model import EmailSendModel
from redmail import EmailSender
email_sender = EmailSender(**EmailConfig.as_dict())

View File

@ -0,0 +1,68 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, TypeVar, Union
from sqlalchemy import Column, DateTime, Integer, inspect
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import Session
from Services.Postgres.database import Base, get_db
T = TypeVar('T')
class PostgresBase(Base):
"""Base class for all PostgreSQL models."""
__abstract__ = True
id = Column(Integer, primary_key=True, index=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
deleted_at = Column(DateTime, nullable=True)
@declared_attr
def __tablename__(cls) -> str:
"""Generate table name automatically from class name."""
return cls.__name__.lower()
def as_dict(self) -> Dict[str, Any]:
"""Convert model instance to dictionary."""
return {c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs}
@classmethod
def filter_non_deleted(cls, db: Session):
"""Filter out soft-deleted records."""
return db.query(cls).filter(cls.deleted_at.is_(None))
@classmethod
def get_by_id(cls, db: Session, id: int) -> Optional['PostgresBase']:
"""Get record by ID if not deleted."""
return cls.filter_non_deleted(db).filter(cls.id == id).first()
def soft_delete(self) -> None:
"""Soft delete the record."""
self.deleted_at = datetime.utcnow()
with get_db() as db:
self.save(db)
def save(self, db: Session) -> None:
"""Save the current record."""
if not self.id:
db.add(self)
db.commit()
db.refresh(self)
def update(self, **kwargs: Dict[str, Any]) -> None:
"""Update record with given values."""
for key, value in kwargs.items():
if hasattr(self, key):
setattr(self, key, value)
with get_db() as db:
self.save(db)
@classmethod
def create(cls, **kwargs: Dict[str, Any]) -> 'PostgresBase':
"""Create a new record."""
instance = cls(**kwargs)
with get_db() as db:
instance.save(db)
return instance

View File

@ -0,0 +1,120 @@
from contextlib import contextmanager
from typing import Any, Dict, Optional, Generator
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
class BaseModel:
"""Base model class with common utility functions."""
@contextmanager
def db_session(self) -> Generator[Session, None, None]:
"""Context manager for database session."""
session = self.get_session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
def update(self, **kwargs: Dict[str, Any]) -> "BaseModel":
"""Update model instance with given attributes."""
with self.db_session() as session:
# Remove unrelated fields
check_kwargs = self.remove_non_related_inputs(kwargs)
# Handle confirmation logic
is_confirmed_argument = kwargs.get("is_confirmed", None)
if is_confirmed_argument and not len(kwargs) == 1:
self.raise_http_exception(
status_code="HTTP_406_NOT_ACCEPTABLE",
error_case="ConfirmError",
data=kwargs,
message="Confirm field cannot be updated with other fields",
)
# Process system fields
check_kwargs = self.extract_system_fields(check_kwargs, create=False)
# Update attributes
for key, value in check_kwargs.items():
setattr(self, key, value)
# Handle user tracking
if hasattr(self, "creds"):
person_id = getattr(self.creds, "person_id", None)
person_name = getattr(self.creds, "person_name", None)
if person_id and person_name:
if is_confirmed_argument:
self.confirmed_by_id = self.creds.get("person_id", "Unknown")
self.confirmed_by = self.creds.get("person_name", "Unknown")
else:
self.updated_by_id = self.creds.get("person_id", "Unknown")
self.updated_by = self.creds.get("person_name", "Unknown")
session.add(self)
session.flush()
return self
@classmethod
@contextmanager
def create_with_session(
cls, **kwargs: Dict[str, Any]
) -> Generator["BaseModel", None, None]:
"""Create new instance with session management."""
instance = cls()
session = instance.get_session()
try:
check_kwargs = cls.remove_non_related_inputs(instance, kwargs)
check_kwargs = cls.extract_system_fields(
instance, check_kwargs, create=True
)
for key, value in check_kwargs.items():
setattr(instance, key, value)
if hasattr(instance, "creds"):
person_id = getattr(instance.creds, "person_id", None)
person_name = getattr(instance.creds, "person_name", None)
if person_id and person_name:
instance.created_by_id = instance.creds.get("person_id", "Unknown")
instance.created_by = instance.creds.get("person_name", "Unknown")
session.add(instance)
session.flush()
yield instance
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# @router.put("/users/{user_id}")
# async def update_user(
# user_id: str,
# update_data: Dict[str, Any],
# db: Session = Depends(get_db_session)
# ):
# with db_session() as session:
# user = session.query(User).filter(User.id == user_id).first()
# if not user:
# raise HTTPException(status_code=404, detail="User not found")
#
# updated_user = user.update(**update_data)
# return updated_user
#
#
# @router.post("/users")
# async def create_user(
# user_data: Dict[str, Any],
# db: Session = Depends(get_db_session)
# ):
# with User.create_with_session(**user_data) as new_user:
# return new_user

View File

@ -0,0 +1,216 @@
"""
How to Use PostgreSQL Models
This module provides examples of how to use the base models and database sessions
effectively in your application.
"""
from datetime import datetime
from typing import Optional, List
from sqlalchemy import String, Integer
from sqlalchemy.orm import Mapped, mapped_column
from Services.Postgres.Models.my_base_model import CrudCollection
from Services.Postgres.database import get_db
# Example Model Definition
class User(CrudCollection):
"""Example user model demonstrating CrudCollection usage."""
__tablename__ = "users"
# Additional fields (id and other common fields come from CrudCollection)
username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
email: Mapped[str] = mapped_column(String(100), unique=True)
age: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
# Example Usage
def example_create():
"""Example of creating a new record."""
with get_db() as db:
# Create a new user
user = User.find_or_create(
db,
username="john_doe",
email="john@example.com",
age=30
)
db.commit()
return user
def example_batch_create():
"""Example of creating multiple records in a single transaction."""
with get_db() as db:
try:
# Create multiple users in one transaction
users = []
for i in range(3):
user = User.find_or_create(
db,
username=f"user_{i}",
email=f"user_{i}@example.com"
)
users.append(user)
db.commit()
return users
except Exception:
db.rollback()
raise
def example_update():
"""Example of updating a record."""
with get_db() as db:
# Find user and update
user = db.query(User).filter(User.username == "john_doe").first()
if user:
user.update(
db,
email="john.doe@newdomain.com",
age=31
)
db.commit()
return user
def example_soft_delete():
"""Example of soft deleting a record."""
with get_db() as db:
user = db.query(User).filter(User.username == "john_doe").first()
if user:
# This will set deleted=True instead of actually deleting the record
user.update(db, deleted=True)
db.commit()
def example_query():
"""Example of querying records."""
with get_db() as db:
# Get active (non-deleted) users
active_users = (
db.query(User)
.filter(
User.active == True,
User.deleted == False,
User.age >= 18
)
.order_by(User.created_at.desc())
.all()
)
return active_users
def example_complex_transaction():
"""Example of a complex transaction with multiple operations."""
with get_db() as db:
try:
# Multiple operations in single transaction
user = User.find_or_create(
db,
username="new_user",
email="new@example.com"
)
# Update existing user
other_user = db.query(User).filter(User.username == "old_user").first()
if other_user:
other_user.update(db, email="updated@example.com")
# Soft delete another user
deleted_user = db.query(User).filter(User.username == "to_delete").first()
if deleted_user:
deleted_user.update(db, deleted=True)
# Commit all changes at once
db.commit()
except Exception:
# Rollback all changes if any operation fails
db.rollback()
raise
def example_serialization():
"""Example of serializing records to dictionaries."""
with get_db() as db:
user = db.query(User).first()
if user:
# Get all fields except specified ones
dict_with_excludes = user.get_dict(
exclude=["created_at", "updated_at"]
)
# Get only specified fields
dict_with_includes = user.get_dict(
include=["id", "username", "email"]
)
return {
"excluded": dict_with_excludes,
"included": dict_with_includes
}
def example_confirmation():
"""Example of confirming a record."""
with get_db() as db:
user = db.query(User).filter(User.username == "pending_user").first()
if user:
# Only update confirmation status
user.update(db, is_confirmed=True)
db.commit()
return user
# Example of error handling
def example_error_handling():
"""Example of proper error handling."""
with get_db() as db:
try:
# Attempt to create user
user = User.find_or_create(
db,
username="existing_user", # This might cause unique constraint violation
email="exists@example.com"
)
db.commit()
return {"status": "success", "user": user.get_dict()}
except Exception as e:
db.rollback()
return {
"status": "error",
"message": str(e),
"error_type": e.__class__.__name__
}
# Example of working with dates
def example_date_handling():
"""Example of working with dates and expiry."""
with get_db() as db:
# Find records valid at current time
current_users = (
db.query(User)
.filter(
User.expiry_starts <= datetime.utcnow(),
User.expiry_ends > datetime.utcnow()
)
.all()
)
# Set expiry for a user
user = db.query(User).first()
if user:
user.update(
db,
expiry_ends=datetime(2024, 12, 31, 23, 59, 59)
)
db.commit()
return current_users

View File

@ -0,0 +1,534 @@
"""
PostgreSQL Base Models Module
This module provides base classes for PostgreSQL models with common functionality such as:
- CRUD operations with session management
- Soft delete capability
- Automatic timestamps
- User tracking (created_by, updated_by)
- Data serialization
- Multi-language support
"""
import datetime
from decimal import Decimal
from typing import Any, Dict, List, Optional, Type, TypeVar, Union, cast
from sqlalchemy import (
TIMESTAMP,
NUMERIC,
func,
text,
UUID,
String,
Integer,
Boolean,
SmallInteger,
)
from sqlalchemy.orm import (
Mapped,
mapped_column,
Session
)
from sqlalchemy_mixins.serialize import SerializeMixin
from sqlalchemy_mixins.repr import ReprMixin
from sqlalchemy_mixins.smartquery import SmartQueryMixin
from api_library import DateTimeLocal, system_arrow
from Services.Postgres.database import Base, get_db
# Type variable for class methods returning self
T = TypeVar('T', bound='CrudMixin')
class CrudMixin(Base, SmartQueryMixin, SerializeMixin):
"""
Base mixin providing CRUD operations and common fields for PostgreSQL models.
Features:
- Automatic timestamps (created_at, updated_at)
- Soft delete capability
- User tracking (created_by, updated_by)
- Data serialization
- Multi-language support
"""
__abstract__ = True
# System fields that should be handled automatically during creation
__system__fields__create__ = (
"created_at", "updated_at", "cryp_uu_id",
"created_by", "created_by_id", "updated_by", "updated_by_id",
"replication_id", "confirmed_by", "confirmed_by_id",
"is_confirmed", "deleted", "active",
"is_notification_send", "is_email_send",
)
# System fields that should be handled automatically during updates
__system__fields__update__ = (
"cryp_uu_id", "created_at", "updated_at",
"created_by", "created_by_id", "confirmed_by", "confirmed_by_id",
"updated_by", "updated_by_id", "replication_id",
)
# Default fields to exclude from serialization
__system_default_model__ = [
"cryp_uu_id", "is_confirmed", "deleted",
"is_notification_send", "replication_id", "is_email_send",
"confirmed_by_id", "confirmed_by", "updated_by_id", "created_by_id",
]
# User credentials and preferences
creds = None
lang: str = "tr"
client_arrow: Optional[DateTimeLocal] = None
valid_record_dict: Dict[str, bool] = {"active": True, "deleted": False}
meta_data: Dict[str, Any] = {}
# Common timestamp fields for all models
expiry_starts: Mapped[TIMESTAMP] = mapped_column(
TIMESTAMP(timezone=True),
server_default=func.now(),
nullable=False,
comment="Record validity start timestamp"
)
expiry_ends: Mapped[TIMESTAMP] = mapped_column(
TIMESTAMP(timezone=True),
default="2099-12-31",
server_default="2099-12-31",
comment="Record validity end timestamp"
)
@classmethod
def set_user_define_properties(cls, token: Any) -> None:
"""
Set user-specific properties from the authentication token.
Args:
token: Authentication token containing user preferences
"""
cls.creds = token.credentials
cls.client_arrow = DateTimeLocal(is_client=True, timezone=token.timezone)
cls.lang = str(token.lang).lower()
@classmethod
def remove_non_related_inputs(cls, kwargs: Dict[str, Any]) -> Dict[str, Any]:
"""
Filter out inputs that don't correspond to model fields.
Args:
kwargs: Dictionary of field names and values
Returns:
Dictionary containing only valid model fields
"""
return {
key: value
for key, value in kwargs.items()
if key in cls.columns + cls.hybrid_properties + cls.settable_relations
}
@classmethod
def extract_system_fields(cls, filter_kwargs: dict, create: bool = True) -> Dict[str, Any]:
"""
Remove system-managed fields from input dictionary.
Args:
filter_kwargs: Input dictionary of fields
create: If True, use creation field list, else use update field list
Returns:
Dictionary with system fields removed
"""
system_fields = filter_kwargs.copy()
extract_fields = (
cls.__system__fields__create__ if create
else cls.__system__fields__update__
)
for field in extract_fields:
system_fields.pop(field, None)
return system_fields
@classmethod
def iterate_over_variables(cls, val: Any, key: str) -> tuple[bool, Optional[Any]]:
"""
Process a field value based on its type and convert it to the appropriate format.
Args:
val: Field value
key: Field name
Returns:
Tuple of (should_include, processed_value)
"""
key_ = cls.__annotations__.get(key, None)
is_primary = key in cls.primary_keys
row_attr = bool(getattr(getattr(cls, key), "foreign_keys", None))
# Skip primary keys and foreign keys
if is_primary or row_attr:
return False, None
# Handle None values
if val is None:
return True, None
# Special handling for UUID fields
if str(key[-5:]).lower() == "uu_id":
return True, str(val)
# Handle typed fields
if key_:
if key_ == Mapped[int]:
return True, int(val)
elif key_ == Mapped[bool]:
return True, bool(val)
elif key_ == Mapped[float] or key_ == Mapped[NUMERIC]:
return True, round(float(val), 3)
elif key_ == Mapped[TIMESTAMP]:
return True, str(
cls.client_arrow.get(str(val)).format("DD-MM-YYYY HH:mm:ss +0")
)
elif key_ == Mapped[str]:
return True, str(val)
# Handle based on Python types
else:
if isinstance(val, datetime.datetime):
return True, str(
cls.client_arrow.get(str(val)).format("DD-MM-YYYY HH:mm:ss +0")
)
elif isinstance(val, bool):
return True, bool(val)
elif isinstance(val, (float, Decimal)):
return True, round(float(val), 3)
elif isinstance(val, int):
return True, int(val)
elif isinstance(val, str):
return True, str(val)
elif val is None:
return True, None
return False, None
@classmethod
def find_or_create(cls: Type[T], db: Session, **kwargs) -> T:
"""
Find an existing record matching the criteria or create a new one.
Args:
db: Database session
**kwargs: Search/creation criteria
Returns:
Existing or newly created record
"""
check_kwargs = cls.extract_system_fields(kwargs)
# Search for existing record
query = db.query(cls).filter(
cls.expiry_ends > str(system_arrow.now()),
cls.expiry_starts <= str(system_arrow.now()),
)
for key, value in check_kwargs.items():
if hasattr(cls, key):
query = query.filter(getattr(cls, key) == value)
already_record = query.first()
# Handle existing record
if already_record:
if already_record.deleted:
already_record.meta_data = {
"created": False,
"error_case": "DeletedRecord",
"message": "",
}
return already_record
elif not already_record.is_confirmed:
already_record.meta_data = {
"created": False,
"error_case": "IsNotConfirmed",
"message": "",
}
return already_record
already_record.meta_data = {
"created": False,
"error_case": "AlreadyExists",
"message": "",
}
return already_record
# Create new record
check_kwargs = cls.remove_non_related_inputs(check_kwargs)
created_record = cls()
for key, value in check_kwargs.items():
setattr(created_record, key, value)
if getattr(cls.creds, "person_id", None) and getattr(cls.creds, "person_name", None):
created_record.created_by_id = cls.creds.person_id
created_record.created_by = cls.creds.person_name
db.add(created_record)
db.flush()
created_record.meta_data = {"created": True, "error_case": None, "message": ""}
return created_record
def update(self, db: Session, **kwargs) -> 'CrudMixin':
"""
Update the record with new values.
Args:
db: Database session
**kwargs: Fields to update
Returns:
Updated record
Raises:
ValueError: If attempting to update is_confirmed with other fields
"""
check_kwargs = self.remove_non_related_inputs(kwargs)
is_confirmed_argument = kwargs.get("is_confirmed", None)
if is_confirmed_argument and not len(kwargs) == 1:
raise ValueError("Confirm field cannot be updated with other fields")
check_kwargs = self.extract_system_fields(check_kwargs, create=False)
for key, value in check_kwargs.items():
setattr(self, key, value)
# Update confirmation or modification tracking
if is_confirmed_argument:
if getattr(self.creds, "person_id", None) and getattr(self.creds, "person_name", None):
self.confirmed_by_id = self.creds.person_id
self.confirmed_by = self.creds.person_name
else:
if getattr(self.creds, "person_id", None) and getattr(self.creds, "person_name", None):
self.updated_by_id = self.creds.person_id
self.updated_by = self.creds.person_name
db.flush()
return self
def get_dict(
self,
exclude: Optional[List[str]] = None,
include: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""
Convert model instance to dictionary with customizable fields.
Args:
exclude: List of fields to exclude
include: List of fields to include (takes precedence over exclude)
Returns:
Dictionary representation of the model
"""
return_dict: Dict[str, Any] = {}
if include:
# Handle explicitly included fields
exclude_list = [
element
for element in self.__system_default_model__
if str(element)[-2:] == "id" and str(element)[-5:].lower() == "uu_id"
]
columns_include_list = list(set(include).difference(set(exclude_list)))
columns_include_list.extend(["uu_id"])
for key in columns_include_list:
val = getattr(self, key)
correct, value_of_database = self.iterate_over_variables(val, key)
if correct:
return_dict[key] = value_of_database
elif exclude:
# Handle explicitly excluded fields
exclude.extend(list(set(getattr(self, '__exclude__fields__', []) or []).difference(exclude)))
exclude.extend([
element
for element in self.__system_default_model__
if str(element)[-2:] == "id"
])
columns_excluded_list = list(set(self.columns).difference(set(exclude)))
columns_excluded_list.extend(["uu_id", "active"])
for key in columns_excluded_list:
val = getattr(self, key)
correct, value_of_database = self.iterate_over_variables(val, key)
if correct:
return_dict[key] = value_of_database
else:
# Handle default field selection
exclude_list = (getattr(self, '__exclude__fields__', []) or []) + self.__system_default_model__
columns_list = list(set(self.columns).difference(set(exclude_list)))
columns_list = [col for col in columns_list if str(col)[-2:] != "id"]
columns_list.extend([col for col in self.columns if str(col)[-5:].lower() == "uu_id"])
for remove_field in self.__system_default_model__:
if remove_field in columns_list:
columns_list.remove(remove_field)
for key in columns_list:
val = getattr(self, key)
correct, value_of_database = self.iterate_over_variables(val, key)
if correct:
return_dict[key] = value_of_database
return return_dict
class BaseCollection(CrudMixin, ReprMixin):
"""Base model class with minimal fields."""
__abstract__ = True
__repr__ = ReprMixin.__repr__
id: Mapped[int] = mapped_column(primary_key=True)
class CrudCollection(CrudMixin, SmartQueryMixin):
"""
Full-featured model class with all common fields.
Includes:
- UUID and reference ID
- Timestamps
- User tracking
- Confirmation status
- Soft delete
- Notification flags
"""
__abstract__ = True
__repr__ = ReprMixin.__repr__
# Primary and reference fields
id: Mapped[int] = mapped_column(primary_key=True)
uu_id: Mapped[str] = mapped_column(
UUID,
server_default=text("gen_random_uuid()"),
index=True,
unique=True,
comment="Unique identifier UUID"
)
ref_id: Mapped[str] = mapped_column(
String(100),
nullable=True,
index=True,
comment="External reference ID"
)
# Timestamps
created_at: Mapped[TIMESTAMP] = mapped_column(
TIMESTAMP(timezone=True),
server_default=func.now(),
nullable=False,
index=True,
comment="Record creation timestamp"
)
updated_at: Mapped[TIMESTAMP] = mapped_column(
TIMESTAMP(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False,
index=True,
comment="Last update timestamp"
)
# Cryptographic and user tracking
cryp_uu_id: Mapped[str] = mapped_column(
String,
nullable=True,
index=True,
comment="Cryptographic UUID"
)
created_by: Mapped[str] = mapped_column(
String,
nullable=True,
comment="Creator name"
)
created_by_id: Mapped[int] = mapped_column(
Integer,
nullable=True,
comment="Creator ID"
)
updated_by: Mapped[str] = mapped_column(
String,
nullable=True,
comment="Last modifier name"
)
updated_by_id: Mapped[int] = mapped_column(
Integer,
nullable=True,
comment="Last modifier ID"
)
confirmed_by: Mapped[str] = mapped_column(
String,
nullable=True,
comment="Confirmer name"
)
confirmed_by_id: Mapped[int] = mapped_column(
Integer,
nullable=True,
comment="Confirmer ID"
)
# Status flags
is_confirmed: Mapped[bool] = mapped_column(
Boolean,
server_default="0",
comment="Record confirmation status"
)
replication_id: Mapped[int] = mapped_column(
SmallInteger,
server_default="0",
comment="Replication identifier"
)
deleted: Mapped[bool] = mapped_column(
Boolean,
server_default="0",
comment="Soft delete flag"
)
active: Mapped[bool] = mapped_column(
Boolean,
server_default="1",
comment="Record active status"
)
is_notification_send: Mapped[bool] = mapped_column(
Boolean,
server_default="0",
comment="Notification sent flag"
)
is_email_send: Mapped[bool] = mapped_column(
Boolean,
server_default="0",
comment="Email sent flag"
)
@classmethod
def retrieve_language_model(cls, lang: str, response_model: Any) -> Dict[str, str]:
"""
Retrieve language-specific model headers and validation messages.
Args:
lang: Language code
response_model: Model containing language annotations
Returns:
Dictionary of field names to localized headers
"""
headers_and_validation = {}
__language_model__ = getattr(cls.__language_model__, lang, "tr")
for field in response_model.__annotations__.keys():
headers_and_validation[field] = getattr(__language_model__, field, "Lang Not found")
return headers_and_validation

View File

@ -0,0 +1,43 @@
from typing import Any, List, Optional, TypeVar, Union
from sqlalchemy.orm import Query
from sqlalchemy.orm.session import Session
T = TypeVar('T')
class QueryResponse:
"""Handler for SQLAlchemy query results with error handling."""
def __init__(self, db: Session, query: Query, first: bool = False):
self.db = db
self.first = first
self.__query = query
def get(self, index: int) -> Optional[T]:
"""Get item at specific index if it exists."""
count = self.count
if count and not index > count:
return self.data[index - 1]
return None
@property
def data(self) -> Union[Optional[T], List[T]]:
"""Get query results with error handling."""
try:
if self.first:
return self.__query.first()
return self.__query.all()
except Exception as e:
# Handle any database errors by rolling back
self.db.rollback()
return None if self.first else []
@property
def count(self) -> int:
"""Get total count of query results."""
return self.__query.count()
@property
def query(self) -> Query:
"""Get the underlying SQLAlchemy query."""
return self.__query

View File

@ -0,0 +1,49 @@
from typing import Any, Dict, List, Optional, TypeVar, Generic
T = TypeVar('T')
class PostgresResponse(Generic[T]):
"""Base class for Postgres response handling."""
def __init__(
self,
status: bool,
message: str,
data: Optional[T] = None,
error: Optional[str] = None,
):
self.status = status
self.message = message
self.data = data
self.error = error
if isinstance(data, dict):
self.data_type = "dict"
elif isinstance(data, list):
self.data_type = "list"
else:
self.data_type = None
def as_dict(self) -> Dict[str, Any]:
"""Convert response to dictionary format."""
return {
"status": self.status,
"message": self.message,
"data": self.data,
"dataType": self.data_type,
"error": self.error,
}
@property
def all(self) -> Optional[List[T]]:
"""Get all data items if data is a list."""
if isinstance(self.data, list):
return self.data
return None
@property
def first(self) -> Optional[T]:
"""Get first data item if data is a list."""
if isinstance(self.data, list) and self.data:
return self.data[0]
return None

View File

View File

@ -0,0 +1,32 @@
from contextlib import contextmanager
from typing import Generator
from AllConfigs.SqlDatabase.configs import WagDatabase
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
engine_config: dict[str, object] = {
"url": WagDatabase.DATABASE_URL,
"pool_size": 20,
"max_overflow": 10,
"echo": True,
"echo_pool": True,
"isolation_level": "READ COMMITTED",
"pool_pre_ping": True,
}
engine = create_engine(**engine_config)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
Base = declarative_base()
@contextmanager
def get_db() -> Generator[Session, None, None]:
"""Get database session with context management."""
db = SessionLocal()
try:
yield db
finally:
db.close()

View File

@ -69,7 +69,9 @@ class RedisActions:
return arrow.now().shift(seconds=expiry_time).format(MainConfig.DATETIME_FORMAT)
@classmethod
def get_json(cls, list_keys: List[Union[str, bytes]]) -> RedisResponse:
def get_json(
cls, list_keys: List[Union[Optional[str], Optional[bytes]]]
) -> RedisResponse:
"""Get JSON values from Redis using pattern matching."""
try:
list_of_rows = []
@ -80,8 +82,10 @@ class RedisActions:
redis_row = RedisRow()
redis_row.set_key(key=row)
redis_row.expires_at = cls.resolve_expires_at(redis_row=redis_row)
redis_row.feed(redis_cli.get(redis_row.redis_key))
redis_value = redis_cli.get(redis_row.redis_key)
redis_row.feed(redis_value)
list_of_rows.append(redis_row)
return RedisResponse(
status=True,
message="Value is get successfully.",

View File

@ -27,22 +27,38 @@ class RedisRow:
@classmethod
def regex(cls, list_keys: List[Union[str, bytes]]) -> str:
"""Generate Redis search pattern from list of keys."""
search_regex = ""
for key, list_key in enumerate(list_keys):
if not list_key:
continue
"""Generate Redis search pattern from list of keys.
list_key = (
list_key.decode() if isinstance(list_key, bytes) else str(list_key)
)
if key == 0:
search_regex += f"{list_key}{cls.delimiter}*"
elif key == len(list_keys) - 1:
search_regex += f"*{cls.delimiter}{list_key}"
else:
search_regex += f"*{cls.delimiter}{list_key}{cls.delimiter}*"
return search_regex
Example:
list_keys = [None, "example1", "example2"]
Result: "*:example1:example2"
"""
# First create string with dash separators
temp_str = "-"
for list_key in list_keys:
if list_key:
list_key = (
list_key.decode() if isinstance(list_key, bytes) else str(list_key)
)
temp_str += f"{list_key}-"
# Remove redundant dashes
temp_str = temp_str.strip("-")
# If no valid keys, return empty string
if not temp_str:
return ""
# Replace dashes with delimiter
result = temp_str.replace("-", cls.delimiter)
# Add wildcard at start if first item was None
if list_keys and list_keys[0] is None:
result = f"*{cls.delimiter}{result}"
else:
result = f"{result}"
return result
@classmethod
def parse(cls) -> List[str]:

View File

@ -25,20 +25,26 @@ class RedisResponse:
self.error = error
def as_dict(self) -> Dict:
return {
data = self.all
main_dict = {
"status": self.status,
"message": self.message,
"data": self.data,
"count": self.count,
"dataType": self.data_type,
"error": self.error,
}
if isinstance(data, RedisRow):
return {"data": {data.keys: data.data}, **main_dict}
elif isinstance(data, list):
return {"data": {row.keys: row.data for row in data}, **main_dict}
@property
def all(self) -> Union[Optional[List[RedisRow]]]:
return self.data
return self.data or []
@property
def count(self) -> int:
return len(self.all)
@property
def first(self) -> Union[RedisRow, None]:
if self.data:
return self.data[0]
return None
return self.data[0] if self.data else None

View File

@ -1,4 +1,5 @@
from redis import Redis
from AllConfigs.Redis.configs import WagRedis

View File

@ -1,10 +1,7 @@
import secrets
import uuid
from Services.Redis import (
RedisActions,
AccessToken
)
from Services.Redis import RedisActions, AccessToken
first_user = AccessToken(
accessToken=secrets.token_urlsafe(90),
@ -39,4 +36,4 @@ set_response_second = RedisActions.set_json(
search_keys = [None, set_response_first_json["uu_id"]]
get_response = RedisActions.get_json(list_keys=search_keys)
print("get_response", [data.expires_at for data in get_response.all])
# print("get_response", [data.expires_at for data in get_response.all])

10
test.py
View File

@ -1,10 +1,7 @@
import secrets
import uuid
from Services.Redis import (
RedisActions,
AccessToken
)
from Services.Redis import RedisActions, AccessToken
first_user = AccessToken(
accessToken=secrets.token_urlsafe(90),
@ -36,7 +33,6 @@ set_response_second = RedisActions.set_json(
value=set_response_second_json,
expires={"seconds": 190},
)
search_keys = [None, set_response_first_json["uu_id"]]
search_keys = [None, "*a*"]
get_response = RedisActions.get_json(list_keys=search_keys)
print("get_response", [data.expires_at for data in get_response.all])
print("get_response.all", get_response.as_dict()["data"].values())