alcehmy and event functions updated
This commit is contained in:
@@ -10,7 +10,6 @@ from sqlalchemy import (
|
||||
Boolean,
|
||||
TIMESTAMP,
|
||||
Numeric,
|
||||
Identity,
|
||||
UUID,
|
||||
)
|
||||
|
||||
@@ -23,9 +22,7 @@ class AccountBooks(CrudCollection):
|
||||
country: Mapped[str] = mapped_column(String, nullable=False)
|
||||
branch_type: Mapped[str] = mapped_column(SmallInteger, server_default="0")
|
||||
|
||||
company_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("companies.id"), nullable=False
|
||||
)
|
||||
company_id: Mapped[int] = mapped_column(ForeignKey("companies.id"), nullable=False)
|
||||
company_uu_id: Mapped[UUID] = mapped_column(String, nullable=False)
|
||||
branch_id: Mapped[int] = mapped_column(ForeignKey("companies.id"))
|
||||
branch_uu_id: Mapped[UUID] = mapped_column(String, comment="Branch UU ID")
|
||||
@@ -466,9 +463,7 @@ class AccountRecords(CrudCollection):
|
||||
send_person_uu_id = mapped_column(
|
||||
String, nullable=True, comment="Send Person UU ID"
|
||||
)
|
||||
approving_accounting_person: Mapped[int] = mapped_column(
|
||||
ForeignKey("people.id")
|
||||
)
|
||||
approving_accounting_person: Mapped[int] = mapped_column(ForeignKey("people.id"))
|
||||
approving_accounting_person_uu_id = mapped_column(
|
||||
String, nullable=True, comment="Approving Accounting Person UU ID"
|
||||
)
|
||||
@@ -537,7 +532,7 @@ class AccountRecords(CrudCollection):
|
||||
"comment": "Bank Records that are related to building and financial transactions"
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
# def payment_budget_record_close(self):
|
||||
# from database_sql_models import (
|
||||
# DecisionBookProjectPaymentsMaster,
|
||||
|
||||
@@ -16,7 +16,6 @@ from sqlalchemy import (
|
||||
TIMESTAMP,
|
||||
Text,
|
||||
Numeric,
|
||||
Identity,
|
||||
)
|
||||
|
||||
from databases.sql_models.core_mixin import CrudCollection
|
||||
@@ -331,25 +330,27 @@ class Build(CrudCollection, SelectActionWithEmployee):
|
||||
data_dict["build_types_id"] = build_type.id
|
||||
del data_dict["build_types_uu_id"]
|
||||
build_created = cls.find_or_create(**data_dict)
|
||||
if not build_created.is_found:
|
||||
cls.__many__table__.find_or_create(
|
||||
company_id=token.selected_company.company_id,
|
||||
employee_id=token.selected_company.employee_id,
|
||||
member_id=build_created.id,
|
||||
is_confirmed=True,
|
||||
)
|
||||
cls.__many__table__.find_or_create(
|
||||
company_id=token.selected_company.company_id,
|
||||
employee_id=token.selected_company.employee_id,
|
||||
member_id=build_created.id,
|
||||
is_confirmed=True,
|
||||
)
|
||||
return build_created
|
||||
|
||||
@classmethod
|
||||
def update_action(cls, data: UpdateBuild, build_uu_id: str, token):
|
||||
from databases import Addresses
|
||||
|
||||
data_dict = data.excluded_dump()
|
||||
if data.official_address_uu_id:
|
||||
official_address = Addresses.find_one(uu_id=data.address_uu_id)
|
||||
data_dict["address_id"] = official_address.id
|
||||
official_address = Addresses.filter_one(
|
||||
Addresses.uu_id==data.address_uu_id
|
||||
).data
|
||||
data_dict["address_id"] = official_address.id if official_address else None
|
||||
del data_dict["address_uu_id"]
|
||||
if build_to_update := cls.find_one(uu_id=build_uu_id, person_id=token.id):
|
||||
if build_to_update := cls.filter_one(
|
||||
cls.uu_id==build_uu_id, cls.person_id==token.id
|
||||
).data:
|
||||
return build_to_update.update(**data_dict)
|
||||
|
||||
@property
|
||||
@@ -489,7 +490,9 @@ class BuildParts(CrudCollection):
|
||||
"Check with your supervisor.",
|
||||
)
|
||||
|
||||
if build_types := BuildTypes.find_one(uu_id=data.build_part_type_uu_id):
|
||||
if build_types := BuildTypes.filter_one(
|
||||
BuildTypes.uu_id==data.build_part_type_uu_id
|
||||
).data:
|
||||
part_direction = ApiEnumDropdown.get_by_uuid(
|
||||
uuid=str(data.part_direction_uu_id)
|
||||
)
|
||||
@@ -516,13 +519,13 @@ class BuildParts(CrudCollection):
|
||||
|
||||
if not data_dict["part_gross_size"]:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_418_IM_A_TEAPOT,
|
||||
status_code=status.HTTP_406_NOT_ACCEPTABLE,
|
||||
detail="Part Gross Size can not be empty.",
|
||||
)
|
||||
|
||||
if not data_dict["part_net_size"]:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_418_IM_A_TEAPOT,
|
||||
status_code=status.HTTP_406_NOT_ACCEPTABLE,
|
||||
detail="Part Net Size can not be empty.",
|
||||
)
|
||||
pt = int(data_dict["part_net_size"])
|
||||
|
||||
@@ -26,7 +26,6 @@ from sqlalchemy import (
|
||||
Text,
|
||||
Numeric,
|
||||
Integer,
|
||||
Identity,
|
||||
)
|
||||
from sqlalchemy.orm import mapped_column, Mapped, relationship
|
||||
|
||||
@@ -406,15 +405,17 @@ class BuildDecisionBookPerson(CrudCollection):
|
||||
if person_occupants := BuildDecisionBookPersonOccupants.find_or_create(
|
||||
**book_dict
|
||||
):
|
||||
decision_book = BuildDecisionBook.find_one(
|
||||
id=self.build_decision_book_id, active=True, is_confirmed=True
|
||||
)
|
||||
decision_book = BuildDecisionBook.filter_one(
|
||||
BuildDecisionBook.id==self.build_decision_book_id,
|
||||
BuildDecisionBook.active==True,
|
||||
BuildDecisionBook.is_confirmed==True
|
||||
).data
|
||||
person_occupants.update(
|
||||
expiry_starts=decision_book.expiry_starts,
|
||||
expiry_ends=decision_book.expiry_ends,
|
||||
)
|
||||
if not person_occupants.is_found and build_living_space_id:
|
||||
related_service = Services.find_one(
|
||||
if build_living_space_id:
|
||||
related_service = Services.filter_by_one(
|
||||
related_responsibility=str(occupant_type.occupant_code),
|
||||
active=True,
|
||||
is_confirmed=True,
|
||||
@@ -425,9 +426,11 @@ class BuildDecisionBookPerson(CrudCollection):
|
||||
detail=f"Service is not found for {occupant_type.occupant_code}",
|
||||
)
|
||||
|
||||
decision_build = Build.find_one(
|
||||
id=decision_book.build_id, active=True, is_confirmed=True
|
||||
)
|
||||
decision_build = Build.filter_one(
|
||||
Build.id==decision_book.build_id,
|
||||
Build.active==True,
|
||||
Build.is_confirmed==True
|
||||
).data
|
||||
management_room = decision_build.management_room
|
||||
if not management_room:
|
||||
raise HTTPException(
|
||||
@@ -435,8 +438,16 @@ class BuildDecisionBookPerson(CrudCollection):
|
||||
detail=f"Management Room is not found in {decision_build.build_name}",
|
||||
)
|
||||
|
||||
living_space = BuildLivingSpace.find_one(
|
||||
id=build_living_space_id, active=True, is_confirmed=True
|
||||
living_space = BuildLivingSpace.filter_one(
|
||||
BuildLivingSpace.id==build_living_space_id,
|
||||
BuildLivingSpace.active==True,
|
||||
BuildLivingSpace.is_confirmed==True
|
||||
).data
|
||||
expiry_ends = str(
|
||||
system_arrow.get(decision_book.meeting_date).shift(hours=23)
|
||||
)
|
||||
expiry_starts = str(
|
||||
system_arrow.get(decision_book.meeting_date)
|
||||
)
|
||||
related_living_space = BuildLivingSpace.find_or_create(
|
||||
build_parts_id=management_room.id,
|
||||
@@ -445,40 +456,36 @@ class BuildDecisionBookPerson(CrudCollection):
|
||||
occupant_type_uu_id=str(occupant_type.uu_id),
|
||||
person_id=living_space.person_id,
|
||||
person_uu_id=str(living_space.person_uu_id),
|
||||
expiry_starts=system_arrow.get(
|
||||
decision_book.meeting_date
|
||||
).__str__(),
|
||||
expiry_ends=system_arrow.get(decision_book.meeting_date)
|
||||
.shift(hours=23)
|
||||
.__str__(),
|
||||
expiry_starts=expiry_starts,
|
||||
expiry_ends=expiry_ends,
|
||||
is_confirmed=True,
|
||||
active=True,
|
||||
)
|
||||
expires_at = str(
|
||||
system_arrow.get(decision_book.meeting_date).shift(days=15)
|
||||
)
|
||||
ServiceBindOccupantEventMethods.bind_services_occupant_system(
|
||||
build_living_space_id=related_living_space.id,
|
||||
service_id=related_service.id,
|
||||
expires_at=str(
|
||||
system_arrow.get(decision_book.meeting_date).shift(days=15)
|
||||
),
|
||||
expires_at=expires_at,
|
||||
)
|
||||
return person_occupants
|
||||
return
|
||||
|
||||
def get_occupant_types(self):
|
||||
if occupants := BuildDecisionBookPersonOccupants.filter_active(
|
||||
if occupants := BuildDecisionBookPersonOccupants.filter_all(
|
||||
BuildDecisionBookPersonOccupants.build_decision_book_person_id == self.id,
|
||||
filter_records=False,
|
||||
):
|
||||
return occupants.data
|
||||
return
|
||||
|
||||
def check_occupant_type(self, occupant_type):
|
||||
book_person_occupant_type = BuildDecisionBookPersonOccupants.find_one(
|
||||
build_decision_book_person_id=self.id,
|
||||
occupant_type_id=occupant_type.id,
|
||||
active=True,
|
||||
is_confirmed=True,
|
||||
)
|
||||
book_person_occupant_type = BuildDecisionBookPersonOccupants.filter_one(
|
||||
BuildDecisionBookPersonOccupants.build_decision_book_person_id==self.id,
|
||||
BuildDecisionBookPersonOccupants.occupant_type_id==occupant_type.id,
|
||||
BuildDecisionBookPersonOccupants.active==True,
|
||||
BuildDecisionBookPersonOccupants.is_confirmed==True,
|
||||
).data
|
||||
if not book_person_occupant_type:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
@@ -839,9 +846,7 @@ class BuildDecisionBookPayments(CrudCollection):
|
||||
String, nullable=True, comment="Build Part UUID"
|
||||
)
|
||||
|
||||
budget_records_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("account_records.id")
|
||||
)
|
||||
budget_records_id: Mapped[int] = mapped_column(ForeignKey("account_records.id"))
|
||||
budget_records_uu_id: Mapped[str] = mapped_column(
|
||||
String, nullable=True, comment="Budget UUID"
|
||||
)
|
||||
@@ -936,9 +941,7 @@ class BuildDecisionBookLegal(CrudCollection):
|
||||
ForeignKey("people.id"), nullable=False
|
||||
)
|
||||
resp_attorney_uu_id = mapped_column(String, nullable=True, comment="Attorney UUID")
|
||||
resp_attorney_company_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("companies.id")
|
||||
)
|
||||
resp_attorney_company_id: Mapped[int] = mapped_column(ForeignKey("companies.id"))
|
||||
resp_attorney_company_uu_id = mapped_column(
|
||||
String, nullable=True, comment="Company UUID"
|
||||
)
|
||||
|
||||
@@ -18,9 +18,7 @@ class Departments(CrudCollection):
|
||||
)
|
||||
department_description: Mapped[str] = mapped_column(String, server_default="")
|
||||
|
||||
company_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("companies.id"), nullable=False
|
||||
)
|
||||
company_id: Mapped[int] = mapped_column(ForeignKey("companies.id"), nullable=False)
|
||||
company_uu_id: Mapped[str] = mapped_column(
|
||||
String, nullable=False, comment="Company UUID"
|
||||
)
|
||||
|
||||
@@ -3,7 +3,6 @@ from sqlalchemy import (
|
||||
ForeignKey,
|
||||
Index,
|
||||
Numeric,
|
||||
Identity,
|
||||
)
|
||||
from sqlalchemy.orm import mapped_column, Mapped
|
||||
from databases.sql_models.core_mixin import CrudCollection
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
import datetime
|
||||
from decimal import Decimal
|
||||
|
||||
from sqlalchemy import (
|
||||
TIMESTAMP,
|
||||
NUMERIC,
|
||||
func,
|
||||
Identity,
|
||||
text,
|
||||
UUID,
|
||||
String,
|
||||
Integer,
|
||||
Boolean,
|
||||
SmallInteger,
|
||||
)
|
||||
|
||||
from sqlalchemy.orm import (
|
||||
Mapped,
|
||||
mapped_column,
|
||||
@@ -63,6 +65,18 @@ class CrudMixin(Base, SmartQueryMixin, SessionMixin, FilterAttributes):
|
||||
"updated_by_id",
|
||||
"replication_id",
|
||||
)
|
||||
__system_default_model__ = [
|
||||
"cryp_uu_id",
|
||||
"is_confirmed",
|
||||
"deleted",
|
||||
"is_notification_send",
|
||||
"replication_id",
|
||||
"is_email_send",
|
||||
"confirmed_by_id",
|
||||
"confirmed_by",
|
||||
"updated_by_id",
|
||||
"created_by_id",
|
||||
]
|
||||
|
||||
creds: Credentials = None # The credentials to use in the model.
|
||||
client_arrow: DateTimeLocal = None # The arrow to use in the model.
|
||||
@@ -74,6 +88,17 @@ class CrudMixin(Base, SmartQueryMixin, SessionMixin, FilterAttributes):
|
||||
TIMESTAMP, default="2099-12-31", server_default="2099-12-31"
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def remove_non_related_inputs(cls, kwargs):
|
||||
"""
|
||||
Removes the non-related inputs from the given attributes.
|
||||
"""
|
||||
return {
|
||||
key: value
|
||||
for key, value in kwargs.items()
|
||||
if key in cls.columns + cls.hybrid_properties + cls.settable_relations
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def extract_system_fields(cls, filter_kwargs: dict, create: bool = True):
|
||||
"""
|
||||
@@ -87,6 +112,47 @@ class CrudMixin(Base, SmartQueryMixin, SessionMixin, FilterAttributes):
|
||||
system_fields.pop(field, None)
|
||||
return system_fields
|
||||
|
||||
@classmethod
|
||||
def iterate_over_variables(cls, val, key):
|
||||
key_ = cls.__annotations__.get(key, None)
|
||||
is_primary, value_type = key in cls.primary_keys, type(val)
|
||||
row_attr = bool(getattr(getattr(cls, key), "foreign_keys", None))
|
||||
if is_primary or row_attr and key_ == Mapped[int]:
|
||||
return None
|
||||
if key_:
|
||||
if key_ == Mapped[int]:
|
||||
return int(val) if val else None
|
||||
elif key_ == Mapped[bool]:
|
||||
return bool(val) if val else None
|
||||
elif key_ == Mapped[float] or key_ == Mapped[NUMERIC]:
|
||||
return round(float(val), 3) if val else None
|
||||
elif key_ == Mapped[int]:
|
||||
return int(val) if val else None
|
||||
elif key_ == Mapped[TIMESTAMP]:
|
||||
formatted_date = client_arrow.get(str(val)).format(
|
||||
"DD-MM-YYYY HH:mm:ss"
|
||||
)
|
||||
return str(formatted_date) if val else None
|
||||
else:
|
||||
if isinstance(val, datetime.datetime):
|
||||
formatted_date = client_arrow.get(str(val)).format(
|
||||
"DD-MM-YYYY HH:mm:ss"
|
||||
)
|
||||
print(key, "isinstance(value_type, datetime) | ", formatted_date)
|
||||
return str(formatted_date) if val else None
|
||||
elif isinstance(value_type, bool):
|
||||
return bool(val) if val else None
|
||||
elif isinstance(value_type, float) or isinstance(value_type, Decimal):
|
||||
return round(float(val), 3) if val else None
|
||||
elif isinstance(value_type, int):
|
||||
return int(val) if val else None
|
||||
elif isinstance(value_type, str):
|
||||
return str(val) if val else None
|
||||
elif isinstance(value_type, type(None)):
|
||||
return None
|
||||
|
||||
return str(val) if val else None
|
||||
|
||||
@classmethod
|
||||
def find_or_create(cls, **kwargs):
|
||||
from api_library.date_time_actions.date_functions import system_arrow
|
||||
@@ -97,19 +163,18 @@ class CrudMixin(Base, SmartQueryMixin, SessionMixin, FilterAttributes):
|
||||
is_found can be used to check if the record was found or created.
|
||||
"""
|
||||
check_kwargs = cls.extract_system_fields(kwargs)
|
||||
cls.pre_query = cls.query.filter(
|
||||
system_arrow.get(cls.expiry_ends).date() < system_arrow.now().date()
|
||||
)
|
||||
already_record = cls.filter_by_one(**check_kwargs)
|
||||
cls.pre_query = cls.query.filter(cls.expiry_ends < system_arrow.now().date())
|
||||
already_record = cls.filter_by_one(**check_kwargs, system=True)
|
||||
cls.pre_query = None
|
||||
if already_record := already_record.data:
|
||||
if already_record.is_deleted:
|
||||
if already_record.deleted:
|
||||
cls.raise_http_exception(
|
||||
status_code="HTTP_406_NOT_ACCEPTABLE",
|
||||
error_case="DeletedRecord",
|
||||
data=check_kwargs,
|
||||
message="Record exits but is deleted. Contact with authorized user",
|
||||
)
|
||||
elif already_record.is_confirmed:
|
||||
elif not already_record.is_confirmed:
|
||||
cls.raise_http_exception(
|
||||
status_code="HTTP_406_NOT_ACCEPTABLE",
|
||||
error_case="IsNotConfirmed",
|
||||
@@ -122,6 +187,7 @@ class CrudMixin(Base, SmartQueryMixin, SessionMixin, FilterAttributes):
|
||||
data=check_kwargs,
|
||||
message="Record already exits. Refresh data and try again",
|
||||
)
|
||||
check_kwargs = cls.remove_non_related_inputs(check_kwargs)
|
||||
created_record = cls()
|
||||
for key, value in check_kwargs.items():
|
||||
setattr(created_record, key, value)
|
||||
@@ -130,28 +196,8 @@ class CrudMixin(Base, SmartQueryMixin, SessionMixin, FilterAttributes):
|
||||
cls.created_by = cls.creds.person_name
|
||||
return created_record
|
||||
|
||||
@classmethod
|
||||
def iterate_over_variables(cls, val, key):
|
||||
key_ = cls.__annotations__.get(key, None)
|
||||
is_primary, is_foreign_key = key in cls.primary_keys and bool(getattr(cls, key).foreign_keys)
|
||||
if is_primary or is_foreign_key and key_ == Mapped[int]:
|
||||
return None
|
||||
elif key_ == Mapped[UUID]:
|
||||
return str(val) if val else None
|
||||
elif key_ == Mapped[int]:
|
||||
return int(val) if val else None
|
||||
elif key_ == Mapped[bool]:
|
||||
return bool(val) if val else None
|
||||
elif key_ == Mapped[float] or key_ == Mapped[NUMERIC]:
|
||||
return float(val) if val else None
|
||||
elif key_ == Mapped[int]:
|
||||
return int(val) if val else None
|
||||
elif key_ == Mapped[TIMESTAMP]:
|
||||
return str(client_arrow.get(val).format("DD-MM-YYYY HH:mm:ss")) if val else None
|
||||
return str(val) if val else None
|
||||
|
||||
|
||||
def update(self, **kwargs):
|
||||
check_kwargs = self.remove_non_related_inputs(kwargs)
|
||||
"""Updates the record with the given attributes."""
|
||||
is_confirmed_argument = kwargs.get("is_confirmed", None)
|
||||
if is_confirmed_argument and not len(kwargs) == 1:
|
||||
@@ -161,7 +207,7 @@ class CrudMixin(Base, SmartQueryMixin, SessionMixin, FilterAttributes):
|
||||
data=kwargs,
|
||||
message="Confirm field can not be updated with other fields",
|
||||
)
|
||||
check_kwargs = self.extract_system_fields(kwargs, create=False)
|
||||
check_kwargs = self.extract_system_fields(check_kwargs, create=False)
|
||||
for key, value in check_kwargs.items():
|
||||
setattr(self, key, value)
|
||||
|
||||
@@ -178,24 +224,48 @@ class CrudMixin(Base, SmartQueryMixin, SessionMixin, FilterAttributes):
|
||||
self, exclude: list = None, include: list = None, include_joins: list = None
|
||||
):
|
||||
return_dict = {}
|
||||
if exclude:
|
||||
exclude.extend(list(set(self.__exclude__fields__).difference(exclude)))
|
||||
else:
|
||||
exclude = self.__exclude__fields__
|
||||
include = include or []
|
||||
if include:
|
||||
include.extend(["uu_id", "active"])
|
||||
include = list(set(include).difference(self.__exclude__fields__))
|
||||
for key, val in self.to_dict().items():
|
||||
if key in include:
|
||||
if value_of_database := self.iterate_over_variables(val, key):
|
||||
return_dict[key] = value_of_database
|
||||
exclude_list = [
|
||||
element
|
||||
for element in self.__system_default_model__
|
||||
if str(element)[-2:] == "id"
|
||||
]
|
||||
columns_include_list = list(set(include).difference(set(exclude_list)))
|
||||
columns_include_list.extend(["uu_id", "active"])
|
||||
for key in list(columns_include_list):
|
||||
val = getattr(self, key)
|
||||
value_of_database = self.iterate_over_variables(val, key)
|
||||
if value_of_database is not None:
|
||||
return_dict[key] = value_of_database
|
||||
elif exclude:
|
||||
exclude.extend(
|
||||
list(set(self.__exclude__fields__ or []).difference(exclude))
|
||||
)
|
||||
for i in self.__system_default_model__:
|
||||
print("i", str(i)[-2:])
|
||||
exclude.extend(
|
||||
[
|
||||
element
|
||||
for element in self.__system_default_model__
|
||||
if str(element)[-2:] == "id"
|
||||
]
|
||||
)
|
||||
columns_excluded_list = set(self.columns).difference(set(exclude))
|
||||
for key in list(columns_excluded_list):
|
||||
val = getattr(self, key)
|
||||
value_of_database = self.iterate_over_variables(val, key)
|
||||
if value_of_database is not None:
|
||||
return_dict[key] = value_of_database
|
||||
else:
|
||||
exclude.extend(["is_confirmed", "deleted", "cryp_uu_id"])
|
||||
for key, val in self.to_dict().items():
|
||||
if key not in exclude:
|
||||
if value_of_database := self.iterate_over_variables(val, key):
|
||||
return_dict[key] = value_of_database
|
||||
exclude_list = (
|
||||
self.__exclude__fields__ or [] + self.__system_default_model__
|
||||
)
|
||||
columns_list = list(set(self.columns).difference(set(exclude_list)))
|
||||
for key in list(columns_list):
|
||||
val = getattr(self, key)
|
||||
value_of_database = self.iterate_over_variables(val, key)
|
||||
if value_of_database is not None:
|
||||
return_dict[key] = value_of_database
|
||||
|
||||
all_arguments = [
|
||||
record
|
||||
@@ -218,15 +288,15 @@ class CrudMixin(Base, SmartQueryMixin, SessionMixin, FilterAttributes):
|
||||
return_dict[all_argument] = (
|
||||
populate_arg.get_dict() if populate_arg else []
|
||||
)
|
||||
return return_dict
|
||||
return dict(sorted(return_dict.items(), reverse=False))
|
||||
|
||||
|
||||
class BaseMixin(CrudMixin, ReprMixin, SerializeMixin):
|
||||
class BaseMixin(CrudMixin, ReprMixin, SerializeMixin, FilterAttributes):
|
||||
|
||||
__abstract__ = True
|
||||
|
||||
|
||||
class BaseCollection(CrudMixin, BaseMixin):
|
||||
class BaseCollection(BaseMixin):
|
||||
|
||||
__abstract__ = True
|
||||
__repr__ = ReprMixin.__repr__
|
||||
@@ -234,14 +304,14 @@ class BaseCollection(CrudMixin, BaseMixin):
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
|
||||
|
||||
class CrudCollection(CrudMixin, BaseMixin, SmartQueryMixin):
|
||||
class CrudCollection(BaseMixin, SmartQueryMixin):
|
||||
|
||||
__abstract__ = True
|
||||
__repr__ = ReprMixin.__repr__
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
uu_id: Mapped[UUID] = mapped_column(
|
||||
UUID, server_default=func.text("gen_random_uuid()"), index=True, unique=True
|
||||
UUID, server_default=text("gen_random_uuid()"), index=True, unique=True
|
||||
)
|
||||
|
||||
ref_id: Mapped[UUID] = mapped_column(String(100), nullable=True, index=True)
|
||||
|
||||
@@ -8,7 +8,6 @@ from sqlalchemy import (
|
||||
Boolean,
|
||||
Integer,
|
||||
Index,
|
||||
Identity,
|
||||
)
|
||||
from sqlalchemy.orm import mapped_column, Mapped
|
||||
|
||||
@@ -92,9 +91,7 @@ class Services(CrudCollection):
|
||||
__tablename__ = "services"
|
||||
__exclude__fields__ = []
|
||||
|
||||
module_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("modules.id"), nullable=False
|
||||
)
|
||||
module_id: Mapped[int] = mapped_column(ForeignKey("modules.id"), nullable=False)
|
||||
module_uu_id: Mapped[str] = mapped_column(
|
||||
String, nullable=False, comment="Module UUID"
|
||||
)
|
||||
@@ -118,9 +115,7 @@ class Service2Events(CrudCollection):
|
||||
__tablename__ = "services2events"
|
||||
__exclude__fields__ = []
|
||||
|
||||
service_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("services.id"), nullable=False
|
||||
)
|
||||
service_id: Mapped[int] = mapped_column(ForeignKey("services.id"), nullable=False)
|
||||
service_uu_id = mapped_column(String, nullable=False, comment="Service UUID")
|
||||
event_id: Mapped[int] = mapped_column(ForeignKey("events.id"), nullable=False)
|
||||
event_uu_id = mapped_column(String, nullable=False, comment="Event UUID")
|
||||
@@ -148,9 +143,9 @@ class Event2Employee(CrudCollection):
|
||||
|
||||
@classmethod
|
||||
def get_event_id_by_employee_id(cls, employee_id) -> (list, list):
|
||||
active_events = cls.filter_active(cls.employee_id == employee_id)
|
||||
active_events = cls.filter_all(cls.employee_id == employee_id)
|
||||
active_events_id = [event.event_id for event in active_events.data]
|
||||
active_events = Events.filter_active(Events.id.in_(active_events_id))
|
||||
active_events = Events.filter_all(Events.id.in_(active_events_id))
|
||||
active_events_uu_id = [str(event.uu_id) for event in active_events.data]
|
||||
return active_events_id, active_events_uu_id
|
||||
|
||||
@@ -205,13 +200,9 @@ class ModulePrice(CrudCollection):
|
||||
__exclude__fields__ = []
|
||||
|
||||
campaign_code = mapped_column(String, nullable=False, comment="Campaign Code")
|
||||
module_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("modules.id"), nullable=False
|
||||
)
|
||||
module_id: Mapped[int] = mapped_column(ForeignKey("modules.id"), nullable=False)
|
||||
module_uu_id = mapped_column(String, nullable=False, comment="Module UUID")
|
||||
service_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("services.id"), nullable=False
|
||||
)
|
||||
service_id: Mapped[int] = mapped_column(ForeignKey("services.id"), nullable=False)
|
||||
service_uu_id = mapped_column(String, nullable=False, comment="Service UUID")
|
||||
event_id: Mapped[int] = mapped_column(ForeignKey("events.id"), nullable=False)
|
||||
event_uu_id = mapped_column(String, nullable=False, comment="Event UUID")
|
||||
|
||||
@@ -20,7 +20,6 @@ from sqlalchemy import (
|
||||
Integer,
|
||||
Text,
|
||||
or_,
|
||||
Identity,
|
||||
)
|
||||
from sqlalchemy.orm import mapped_column, relationship, Mapped
|
||||
|
||||
@@ -425,9 +424,7 @@ class RelationshipEmployee2PostCode(CrudCollection):
|
||||
company_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("companies.id"), nullable=True
|
||||
) # 1, 2, 3
|
||||
employee_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("employees.id"), nullable=False
|
||||
)
|
||||
employee_id: Mapped[int] = mapped_column(ForeignKey("employees.id"), nullable=False)
|
||||
member_id: Mapped[int] = mapped_column(
|
||||
ForeignKey("address_postcode.id"), nullable=False
|
||||
)
|
||||
@@ -465,13 +462,25 @@ class Addresses(CrudCollection):
|
||||
__tablename__ = "addresses"
|
||||
__exclude__fields__ = []
|
||||
|
||||
build_number: Mapped[str] = mapped_column(String(24), nullable=False, comment="Build Number")
|
||||
door_number: Mapped[str] = mapped_column(String(24), nullable=True, comment="Door Number")
|
||||
floor_number: Mapped[str] = mapped_column(String(24), nullable=True, comment="Floor Number")
|
||||
build_number: Mapped[str] = mapped_column(
|
||||
String(24), nullable=False, comment="Build Number"
|
||||
)
|
||||
door_number: Mapped[str] = mapped_column(
|
||||
String(24), nullable=True, comment="Door Number"
|
||||
)
|
||||
floor_number: Mapped[str] = mapped_column(
|
||||
String(24), nullable=True, comment="Floor Number"
|
||||
)
|
||||
|
||||
comment_address: Mapped[str] = mapped_column(String, nullable=False, comment="Address")
|
||||
letter_address: Mapped[str] = mapped_column(String, nullable=False, comment="Address")
|
||||
short_letter_address: Mapped[str] = mapped_column(String, nullable=False, comment="Address")
|
||||
comment_address: Mapped[str] = mapped_column(
|
||||
String, nullable=False, comment="Address"
|
||||
)
|
||||
letter_address: Mapped[str] = mapped_column(
|
||||
String, nullable=False, comment="Address"
|
||||
)
|
||||
short_letter_address: Mapped[str] = mapped_column(
|
||||
String, nullable=False, comment="Address"
|
||||
)
|
||||
|
||||
latitude: Mapped[float] = mapped_column(Numeric(20, 12), server_default="0")
|
||||
longitude: Mapped[float] = mapped_column(Numeric(20, 12), server_default="0")
|
||||
@@ -483,9 +492,10 @@ class Addresses(CrudCollection):
|
||||
|
||||
@classmethod
|
||||
def list_via_employee(cls, token_dict, filter_expr):
|
||||
post_code_list = RelationshipEmployee2PostCode.filter_active(
|
||||
post_code_list = RelationshipEmployee2PostCode.filter_all(
|
||||
RelationshipEmployee2PostCode.employee_id
|
||||
== token_dict.selected_company.employee_id,
|
||||
RelationshipEmployee2PostCode.active==True,
|
||||
).data
|
||||
post_code_id_list = [post_code.member_id for post_code in post_code_list]
|
||||
if not post_code_id_list:
|
||||
@@ -493,10 +503,10 @@ class Addresses(CrudCollection):
|
||||
status_code=404,
|
||||
detail="User has no post code registered. User can not list addresses.",
|
||||
)
|
||||
cls.pre_query = Addresses.filter_active(
|
||||
cls.post_code_id.in_(post_code_id_list)
|
||||
).query
|
||||
return cls.filter_active(*filter_expr)
|
||||
cls.pre_query = cls.filter_active(cls.post_code_id.in_(post_code_id_list)).query
|
||||
filter_cls = cls.filter_all(*filter_expr)
|
||||
cls.pre_query = None
|
||||
return filter_cls
|
||||
|
||||
# buildings: Mapped["Build"] = relationship(
|
||||
# "Build", back_populates="addresses", foreign_keys="Build.address_id"
|
||||
|
||||
@@ -4,7 +4,6 @@ from sqlalchemy import (
|
||||
UUID,
|
||||
String,
|
||||
text,
|
||||
Identity,
|
||||
)
|
||||
from sqlalchemy.orm import (
|
||||
Mapped,
|
||||
@@ -40,22 +39,26 @@ class ApiEnumDropdown(BaseCollection):
|
||||
if search := cls.query.filter(
|
||||
cls.enum_class.in_(["DebitTypes"]),
|
||||
cls.uu_id == search_uu_id,
|
||||
cls.active == True,
|
||||
).first():
|
||||
return search
|
||||
elif search_debit:
|
||||
if search := cls.query.filter(
|
||||
cls.enum_class.in_(["DebitTypes"]),
|
||||
cls.key == search_debit,
|
||||
cls.active == True,
|
||||
).first():
|
||||
return search
|
||||
return cls.query.filter(
|
||||
cls.enum_class.in_(["DebitTypes"]),
|
||||
cls.active == True,
|
||||
).all()
|
||||
|
||||
@classmethod
|
||||
def get_due_types(cls):
|
||||
if due_list := cls.filter_active(
|
||||
cls.enum_class == "BuildDuesTypes", cls.key.in_(["BDT-A", "BDT-D"])
|
||||
if due_list := cls.filter_all(
|
||||
cls.enum_class == "BuildDuesTypes", cls.key.in_(["BDT-A", "BDT-D"]),
|
||||
cls.active == True,
|
||||
).data:
|
||||
return [due.uu_id.__str__() for due in due_list]
|
||||
raise HTTPException(
|
||||
@@ -69,16 +72,19 @@ class ApiEnumDropdown(BaseCollection):
|
||||
if search := cls.query.filter(
|
||||
cls.enum_class.in_(["BuildDuesTypes"]),
|
||||
cls.uu_id == search_uu_id,
|
||||
cls.active == True,
|
||||
).first():
|
||||
return search
|
||||
elif search_management:
|
||||
if search := cls.query.filter(
|
||||
cls.enum_class.in_(["BuildDuesTypes"]),
|
||||
cls.key == search_management,
|
||||
cls.active == True,
|
||||
).first():
|
||||
return search
|
||||
return cls.query.filter(
|
||||
cls.enum_class.in_(["BuildDuesTypes"]),
|
||||
cls.active == True,
|
||||
).all()
|
||||
|
||||
def get_enum_dict(self):
|
||||
@@ -92,7 +98,9 @@ class ApiEnumDropdown(BaseCollection):
|
||||
|
||||
@classmethod
|
||||
def uuid_of_enum(cls, enum_class: str, key: str):
|
||||
return str(getattr(cls.find_one(enum_class=enum_class, key=key), "uu_id", None))
|
||||
return str(getattr(cls.filter_one(
|
||||
cls.enum_class==enum_class, cls.key==key
|
||||
).data, "uu_id", None))
|
||||
|
||||
|
||||
ApiEnumDropdown.set_session(ApiEnumDropdown.__session__)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from sqlalchemy import SQLColumnExpression
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from api_validations.validations_request import ListOptions
|
||||
@@ -90,24 +91,51 @@ class FilterAttributes:
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get_not_expired_query_arg(cls, *arg, expired=True):
|
||||
def add_new_arg_to_args(cls, args_list, argument, value):
|
||||
new_arg_list = list(
|
||||
args_ for args_ in list(args_list) if isinstance(args_, SQLColumnExpression)
|
||||
)
|
||||
if not any(True for arg in new_arg_list if arg.left.key == argument):
|
||||
new_arg_list.append(value)
|
||||
return tuple(new_arg_list)
|
||||
|
||||
@classmethod
|
||||
def get_not_expired_query_arg(cls, arg):
|
||||
"""Add expiry_starts and expiry_ends to the query."""
|
||||
from api_library.date_time_actions.date_functions import system_arrow
|
||||
|
||||
if expired:
|
||||
arg_add = (
|
||||
*arg[0],
|
||||
system_arrow.get(cls.expiry_ends) >= system_arrow.now(),
|
||||
system_arrow.now() >= system_arrow.get(cls.expiry_starts),
|
||||
)
|
||||
return arg_add
|
||||
return arg[0]
|
||||
arg = cls.add_new_arg_to_args(
|
||||
arg, "expiry_ends", cls.expiry_ends >= str(system_arrow.now())
|
||||
)
|
||||
arg = cls.add_new_arg_to_args(
|
||||
arg, "expiry_starts", cls.expiry_starts <= str(system_arrow.now())
|
||||
)
|
||||
return arg
|
||||
|
||||
@classmethod
|
||||
def filter_by_all(cls, **kwargs):
|
||||
def select_only(
|
||||
cls, *args, select_args: list, order_by=None, limit=None, system=False
|
||||
):
|
||||
if not system:
|
||||
args = cls.add_new_arg_to_args(
|
||||
args, "is_confirmed", cls.is_confirmed == True
|
||||
)
|
||||
args = cls.get_not_expired_query_arg(args)
|
||||
query = cls._query().filter(*args).with_entities(*select_args)
|
||||
if order_by is not None:
|
||||
query = query.order_by(order_by)
|
||||
if limit:
|
||||
query = query.limit(limit)
|
||||
return AlchemyResponse(query=query, first=False)
|
||||
|
||||
@classmethod
|
||||
def filter_by_all(cls, system=False, **kwargs):
|
||||
"""
|
||||
Filters all the records regardless of is_deleted, is_confirmed.
|
||||
"""
|
||||
if "is_confirmed" not in kwargs and not system:
|
||||
kwargs["is_confirmed"] = True
|
||||
kwargs.pop("system", None)
|
||||
query = cls._query().filter_by(**kwargs)
|
||||
if cls.filter_attr:
|
||||
filter_list = cls.get_filter_attributes()
|
||||
@@ -116,18 +144,27 @@ class FilterAttributes:
|
||||
return AlchemyResponse(query=query, first=False)
|
||||
|
||||
@classmethod
|
||||
def filter_by_one(cls, **kwargs):
|
||||
def filter_by_one(cls, system=False, **kwargs):
|
||||
"""
|
||||
Filters one record regardless of is_deleted, is_confirmed.
|
||||
"""
|
||||
if "is_confirmed" not in kwargs and not system:
|
||||
kwargs["is_confirmed"] = True
|
||||
kwargs.pop("system", None)
|
||||
query = cls._query().filter_by(**kwargs)
|
||||
return AlchemyResponse(query=query, first=True)
|
||||
|
||||
@classmethod
|
||||
def filter_all(cls, *args, expired: bool = False):
|
||||
def filter_all(cls, *args, system=False):
|
||||
"""
|
||||
Filters all the records regardless of is_deleted, is_confirmed.
|
||||
"""
|
||||
if not system:
|
||||
args = cls.add_new_arg_to_args(
|
||||
args, "is_confirmed", cls.is_confirmed == True
|
||||
)
|
||||
args = cls.get_not_expired_query_arg(args)
|
||||
|
||||
query = cls._query().filter(*args)
|
||||
if cls.filter_attr:
|
||||
filter_list = cls.get_filter_attributes()
|
||||
@@ -136,12 +173,16 @@ class FilterAttributes:
|
||||
return AlchemyResponse(query=query, first=False)
|
||||
|
||||
@classmethod
|
||||
def filter_one(cls, *args, expired: bool = False):
|
||||
def filter_one(cls, *args, system=False, expired: bool = False):
|
||||
"""
|
||||
Filters one record regardless of is_deleted, is_confirmed.
|
||||
"""
|
||||
arg = cls.get_not_expired_query_arg(args, expired=expired)
|
||||
query = cls._query().filter(*arg)
|
||||
if not system:
|
||||
args = cls.add_new_arg_to_args(
|
||||
args, "is_confirmed", cls.is_confirmed == True
|
||||
)
|
||||
args = cls.get_not_expired_query_arg(args)
|
||||
query = cls._query().filter(*args)
|
||||
return AlchemyResponse(query=query, first=True)
|
||||
|
||||
@classmethod
|
||||
|
||||
Reference in New Issue
Block a user