diff --git a/AllConfigs/SqlDatabase/configs.py b/AllConfigs/SqlDatabase/configs.py index 7478ba7..f33684b 100644 --- a/AllConfigs/SqlDatabase/configs.py +++ b/AllConfigs/SqlDatabase/configs.py @@ -9,3 +9,9 @@ class WagDatabase: PASSWORD: str = "berkay_wag_user_password" DATABASE_NAME: str = "wag_database" DATABASE_URL: str = f"{SQL}://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DATABASE_NAME}" + + +class PaginateConfig: + DEFAULT_SIZE = 10 + MIN_SIZE = 10 + MAX_SIZE = 50 diff --git a/ApiEvents/AuthServiceApi/events/auth/auth.py b/ApiEvents/AuthServiceApi/events/auth/auth.py index 3ba9eb5..c323246 100644 --- a/ApiEvents/AuthServiceApi/events/auth/auth.py +++ b/ApiEvents/AuthServiceApi/events/auth/auth.py @@ -88,8 +88,15 @@ class AuthenticationLoginEventMethods(MethodToEvent): # Return response with token and headers return { - **token, - "headers": dict(request.headers), + "completed": True, + "message": "User is logged in successfully", + "access_token": token.get("access_token"), + "refresh_token": token.get("refresher_token"), + "access_object": { + "user_type": token.get("user_type"), + "companies_list": token.get("companies_list"), + }, + "user": token.get("user"), } @@ -176,6 +183,9 @@ class AuthenticationSelectEventMethods(MethodToEvent): # Get reachable events reachable_event_codes = Event2Employee.get_event_codes(employee_id=employee.id) + reachable_event_endpoints = Event2Employee.get_event_endpoints( + employee_id=employee.id + ) # Get staff and duties staff = Staff.filter_one(Staff.id == employee.staff_id, db=db_session).data @@ -206,6 +216,7 @@ class AuthenticationSelectEventMethods(MethodToEvent): employee_id=employee.id, employee_uu_id=employee.uu_id.__str__(), reachable_event_codes=reachable_event_codes, + reachable_event_endpoints=reachable_event_endpoints, ) try: # Update Redis update_token = TokenService.update_token_at_redis( @@ -246,6 +257,10 @@ class AuthenticationSelectEventMethods(MethodToEvent): reachable_event_codes = Event2Occupant.get_event_codes( build_living_space_id=selected_build_living_space.id ) + reachable_event_endpoints = Event2Occupant.get_event_endpoints( + build_living_space_id=selected_build_living_space.id + ) + occupant_type = OccupantTypes.filter_one( OccupantTypes.id == selected_build_living_space.occupant_type_id, db=db, @@ -289,6 +304,7 @@ class AuthenticationSelectEventMethods(MethodToEvent): responsible_company_id=company_related.id, responsible_company_uuid=company_related.uu_id.__str__(), reachable_event_codes=reachable_event_codes, + reachable_event_endpoints=reachable_event_endpoints, ) try: # Update Redis diff --git a/ApiEvents/AuthServiceApi/events/auth/endpoints.py b/ApiEvents/AuthServiceApi/events/auth/endpoints.py index 887170c..6f1d3fb 100644 --- a/ApiEvents/AuthServiceApi/events/auth/endpoints.py +++ b/ApiEvents/AuthServiceApi/events/auth/endpoints.py @@ -69,9 +69,10 @@ async def authentication_select_company_or_occupant_type( request=request, data=data, token_dict=auth_dict ): if data.is_employee: - return {"selected_company": data.company_uu_id} + return {"selected_company": data.company_uu_id, "completed": True} elif data.is_occupant: - return {"selected_occupant": data.build_living_space_uu_id} + return {"selected_occupant": data.build_living_space_uu_id, "completed": True} + return {"completed": False, "selected_company": None, "selected_occupant": None} @endpoint_wrapper("/authentication/login") @@ -87,10 +88,9 @@ async def authentication_login_with_domain_and_creds( ) -@endpoint_wrapper("/authentication/check") +@endpoint_wrapper("/authentication/valid") async def authentication_check_token_is_valid( request: "Request", - data: EndpointBaseRequestModel, ) -> Dict[str, Any]: """ Check if a token is valid. @@ -99,12 +99,10 @@ async def authentication_check_token_is_valid( access_token = TokenService.get_access_token_from_request(request=request) if TokenService.get_object_via_access_key(access_token=access_token): return { - "status": True, "message": "Access Token is valid", } except HTTPException: return { - "status": False, "message": "Access Token is NOT valid", } @@ -261,9 +259,9 @@ AUTH_CONFIG = RouteFactoryConfig( ), EndpointFactoryConfig( url_prefix=prefix, - url_endpoint="/check", - url_of_endpoint="/authentication/check", - endpoint="/check", + url_endpoint="/valid", + url_of_endpoint="/authentication/valid", + endpoint="/valid", method="GET", summary="Check access token is valid", description="Check access token is valid", diff --git a/ApiEvents/EventServiceApi/events/account/endpoints.py b/ApiEvents/EventServiceApi/events/account/endpoints.py index 53f5672..a6b0f2a 100644 --- a/ApiEvents/EventServiceApi/events/account/endpoints.py +++ b/ApiEvents/EventServiceApi/events/account/endpoints.py @@ -81,9 +81,9 @@ ACCOUNT_RECORDS_CONFIG = RouteFactoryConfig( endpoints=[ EndpointFactoryConfig( url_prefix=prefix, - url_endpoint="/address/list", - url_of_endpoint="/account/records/address/list", - endpoint="/address/list", + url_endpoint="/list", + url_of_endpoint=f"{prefix}/list", + endpoint="/list", method="POST", summary="List Active/Delete/Confirm Address", description="List Active/Delete/Confirm Address", @@ -93,9 +93,9 @@ ACCOUNT_RECORDS_CONFIG = RouteFactoryConfig( ), EndpointFactoryConfig( url_prefix=prefix, - url_endpoint="/address/create", - url_of_endpoint="/account/records/address/create", - endpoint="/address/create", + url_endpoint="/create", + url_of_endpoint=f"{prefix}/create", + endpoint="/create", method="POST", summary="Create Address with given auth levels", description="Create Address with given auth levels", @@ -105,9 +105,9 @@ ACCOUNT_RECORDS_CONFIG = RouteFactoryConfig( ), EndpointFactoryConfig( url_prefix=prefix, - url_endpoint="/address/search", - url_of_endpoint="/account/records/address/search", - endpoint="/address/search", + url_endpoint="/search", + url_of_endpoint=f"{prefix}/search", + endpoint="/search", method="POST", summary="Search Address with given auth levels", description="Search Address with given auth levels", @@ -117,9 +117,9 @@ ACCOUNT_RECORDS_CONFIG = RouteFactoryConfig( ), EndpointFactoryConfig( url_prefix=prefix, - url_endpoint="/address/{address_uu_id}", - url_of_endpoint="/account/records/address/{address_uu_id}", - endpoint="/address/{address_uu_id}", + url_endpoint="/{address_uu_id}", + url_of_endpoint="{prefix}/" + "{address_uu_id}", + endpoint="/{address_uu_id}", method="PUT", summary="Update Address with given auth levels", description="Update Address with given auth levels", diff --git a/ApiEvents/EventServiceApi/events/address/endpoints.py b/ApiEvents/EventServiceApi/events/address/endpoints.py new file mode 100644 index 0000000..d9205d8 --- /dev/null +++ b/ApiEvents/EventServiceApi/events/address/endpoints.py @@ -0,0 +1,112 @@ +""" +Account records endpoint configurations. + +""" + +from ApiEvents.abstract_class import ( + RouteFactoryConfig, + EndpointFactoryConfig, + endpoint_wrapper, +) +from ApiEvents.base_request_model import EndpointBaseRequestModel + +from Services.PostgresDb.Models.alchemy_response import DictJsonResponse +from fastapi import Request, Path, Body + + +@endpoint_wrapper("/address/list") +async def address_list(request: "Request", data: EndpointBaseRequestModel): + """Handle address list endpoint.""" + auth_dict = address_list.auth + code_dict = getattr(address_list, "func_code", {"function_code": None}) + return {"auth_dict": auth_dict, "code_dict": code_dict, "data": data} + + +@endpoint_wrapper("/address/create") +async def address_create(request: "Request", data: EndpointBaseRequestModel): + """Handle address creation endpoint.""" + return { + "data": data, + "request": str(request.headers), + "request_url": str(request.url), + "request_base_url": str(request.base_url), + } + + +@endpoint_wrapper("/address/update/{address_uu_id}") +async def address_update( + request: Request, + address_uu_id: str = Path(..., description="UUID of the address to update"), + request_data: EndpointBaseRequestModel = Body(..., description="Request body"), +): + """ + Handle address update endpoint. + + Args: + request: FastAPI request object + address_uu_id: UUID of the address to update + request_data: Request body containing updated address data + + Returns: + DictJsonResponse: Response containing updated address info + """ + auth_dict = address_update.auth + return DictJsonResponse( + data={ + "address_uu_id": address_uu_id, + "data": request_data.root, + "request": str(request.headers), + "request_url": str(request.url), + "request_base_url": str(request.base_url), + } + ) + + +prefix = "/address" + + +# Address Router Configuration +ADDRESS_CONFIG = RouteFactoryConfig( + name="address", + prefix=prefix, + tags=["Address"], + include_in_schema=True, + endpoints=[ + EndpointFactoryConfig( + url_prefix=prefix, + url_endpoint="/list", + url_of_endpoint=f"{prefix}/list", + endpoint="/list", + method="POST", + summary="List Active/Delete/Confirm Address", + description="List Active/Delete/Confirm Address", + is_auth_required=True, + is_event_required=True, + endpoint_function=address_list, + ), + EndpointFactoryConfig( + url_prefix=prefix, + url_endpoint="/create", + url_of_endpoint=f"{prefix}/create", + endpoint="/create", + method="POST", + summary="Create Address with given auth levels", + description="Create Address with given auth levels", + is_auth_required=False, + is_event_required=False, + endpoint_function=address_create, + ), + EndpointFactoryConfig( + url_prefix=prefix, + url_endpoint="/{address_uu_id}", + url_of_endpoint="{prefix}/" + "{address_uu_id}", + endpoint="/{address_uu_id}", + method="PUT", + summary="Update Address with given auth levels", + description="Update Address with given auth levels", + is_auth_required=True, + is_event_required=True, + endpoint_function=address_update, + ), + ], +).as_dict() diff --git a/ApiEvents/EventServiceApi/events/address/endpoint.py b/ApiEvents/EventServiceApi/events/building/build_area.py similarity index 100% rename from ApiEvents/EventServiceApi/events/address/endpoint.py rename to ApiEvents/EventServiceApi/events/building/build_area.py diff --git a/ApiEvents/ValidationServiceApi/events/validation/__init__.py b/ApiEvents/EventServiceApi/events/building/build_parts.py similarity index 100% rename from ApiEvents/ValidationServiceApi/events/validation/__init__.py rename to ApiEvents/EventServiceApi/events/building/build_parts.py diff --git a/ApiEvents/EventServiceApi/events/building/build_sites.py b/ApiEvents/EventServiceApi/events/building/build_sites.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/building/build_types.py b/ApiEvents/EventServiceApi/events/building/build_types.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/building/building.py b/ApiEvents/EventServiceApi/events/building/building.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/building/living_spaces.py b/ApiEvents/EventServiceApi/events/building/living_spaces.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/company/company.py b/ApiEvents/EventServiceApi/events/company/company.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/company/department.py b/ApiEvents/EventServiceApi/events/company/department.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/company/duties.py b/ApiEvents/EventServiceApi/events/company/duties.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/company/duty.py b/ApiEvents/EventServiceApi/events/company/duty.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/company/employee.py b/ApiEvents/EventServiceApi/events/company/employee.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/company/staff.py b/ApiEvents/EventServiceApi/events/company/staff.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/decision_book/book_payment.py b/ApiEvents/EventServiceApi/events/decision_book/book_payment.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/decision_book/decision_book.py b/ApiEvents/EventServiceApi/events/decision_book/decision_book.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/decision_book/decision_book_items.py b/ApiEvents/EventServiceApi/events/decision_book/decision_book_items.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/decision_book/decision_book_items_debits.py b/ApiEvents/EventServiceApi/events/decision_book/decision_book_items_debits.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/decision_book/decision_book_person.py b/ApiEvents/EventServiceApi/events/decision_book/decision_book_person.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/decision_book/invitations.py b/ApiEvents/EventServiceApi/events/decision_book/invitations.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/identity/people.py b/ApiEvents/EventServiceApi/events/identity/people.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/identity/users.py b/ApiEvents/EventServiceApi/events/identity/users.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/project_decision_book/project_decision_book.py b/ApiEvents/EventServiceApi/events/project_decision_book/project_decision_book.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/project_decision_book/project_decision_book_items.py b/ApiEvents/EventServiceApi/events/project_decision_book/project_decision_book_items.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/EventServiceApi/events/project_decision_book/project_decision_book_person.py b/ApiEvents/EventServiceApi/events/project_decision_book/project_decision_book_person.py new file mode 100644 index 0000000..e69de29 diff --git a/ApiEvents/TemplateServiceApi/endpoint/endpoint.py b/ApiEvents/TemplateServiceApi/endpoint/endpoints.py similarity index 100% rename from ApiEvents/TemplateServiceApi/endpoint/endpoint.py rename to ApiEvents/TemplateServiceApi/endpoint/endpoints.py diff --git a/ApiEvents/ValidationServiceApi/events/available/endpoints.py b/ApiEvents/ValidationServiceApi/events/available/endpoints.py new file mode 100644 index 0000000..9c4d88b --- /dev/null +++ b/ApiEvents/ValidationServiceApi/events/available/endpoints.py @@ -0,0 +1,121 @@ +from typing import TYPE_CHECKING, Dict, Any, Union + +from ApiEvents.base_request_model import DictRequestModel, EndpointBaseRequestModel +from ApiEvents.abstract_class import ( + RouteFactoryConfig, + EndpointFactoryConfig, + endpoint_wrapper, +) +from ApiValidations.Custom.token_objects import EmployeeTokenObject, OccupantTokenObject +from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi +from ApiLibrary.common.line_number import get_line_number_for_error + + +if TYPE_CHECKING: + from fastapi import Request, HTTPException, status, Body + + +# Type aliases for common types +prefix = "/available" + + +async def check_endpoints_available( + request: "Request" +) -> Dict[str, Any]: + """ + Check if endpoints are available. + """ + auth_dict = check_endpoints_available.auth + selection_of_user = None + if auth_dict.is_occupant: + selection_of_user = auth_dict.selected_occupant + else: + selection_of_user = auth_dict.selected_company + if not selection_of_user: + raise HTTPExceptionApi( + error_code="", + lang=auth_dict.lang, + loc=get_line_number_for_error(), + sys_msg="User selection not found", + ) + return {"reachable_event_endpoints": selection_of_user.reachable_event_endpoints} + + +async def check_endpoint_available( + request: "Request", + data: EndpointBaseRequestModel, +) -> Dict[str, Any]: + """ + Check if endpoints are available. + """ + auth_dict = check_endpoint_available.auth + print("data", data) + data_dict = data.data + endpoint_asked = data_dict.get("endpoint", None) + + if not endpoint_asked: + raise HTTPExceptionApi( + error_code="", + lang=auth_dict.lang, + loc=get_line_number_for_error(), + sys_msg="Endpoint not found", + ) + + selection_of_user = None + if auth_dict.is_occupant: + selection_of_user = auth_dict.selected_occupant + else: + selection_of_user = auth_dict.selected_company + if not selection_of_user: + raise HTTPExceptionApi( + error_code="", + lang=auth_dict.lang, + loc=get_line_number_for_error(), + sys_msg="User selection not found", + ) + + if endpoint_asked not in selection_of_user.reachable_event_endpoints: + raise HTTPExceptionApi( + error_code="", + lang=auth_dict.lang, + loc=get_line_number_for_error(), + sys_msg="Endpoint not found", + ) + return { + "endpoint": endpoint_asked, + "status": "OK" + } + + +AVAILABLE_CONFIG = RouteFactoryConfig( + name="available_endpoints", + prefix=prefix, + tags=["Available Endpoints"], + include_in_schema=True, + endpoints=[ + EndpointFactoryConfig( + url_prefix=prefix, + url_endpoint="/endpoints", + url_of_endpoint=f"{prefix}/endpoints", + endpoint="/endpoints", + method="POST", + summary="Retrieve all endpoints available for user", + description="", + is_auth_required=True, # Needs token_dict + is_event_required=False, + endpoint_function=check_endpoints_available, + ), + EndpointFactoryConfig( + url_prefix=prefix, + url_endpoint="/endpoint", + url_of_endpoint=f"{prefix}/endpoint", + endpoint="/endpoint", + method="POST", + summary="Retrieve an endpoint available for user", + description="", + is_auth_required=True, # Needs token_dict + is_event_required=False, + endpoint_function=check_endpoint_available, + ), + ], +).as_dict() diff --git a/ApiEvents/ValidationServiceApi/events/available/models.py b/ApiEvents/ValidationServiceApi/events/available/models.py new file mode 100644 index 0000000..ec9c70e --- /dev/null +++ b/ApiEvents/ValidationServiceApi/events/available/models.py @@ -0,0 +1,325 @@ +""" + request models. +""" + +from typing import TYPE_CHECKING, Dict, Any, Literal, Optional, TypedDict, Union +from pydantic import BaseModel, Field, model_validator, RootModel, ConfigDict +from ApiEvents.base_request_model import BaseRequestModel, DictRequestModel +from ApiValidations.Custom.token_objects import EmployeeTokenObject, OccupantTokenObject +from ApiValidations.Request.base_validations import ListOptions +from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi +from Schemas.identity.identity import ( + AddressPostcode, + Addresses, + RelationshipEmployee2PostCode, +) + + +if TYPE_CHECKING: + from fastapi import Request + + +class AddressListEventMethods(MethodToEvent): + + event_type = "SELECT" + event_description = "List Address records" + event_category = "Address" + + __event_keys__ = { + "9c251d7d-da70-4d63-a72c-e69c26270442": "address_list_super_user", + "52afe375-dd95-4f4b-aaa2-4ec61bc6de52": "address_list_employee", + } + __event_validation__ = { + "9c251d7d-da70-4d63-a72c-e69c26270442": ListAddressResponse, + "52afe375-dd95-4f4b-aaa2-4ec61bc6de52": ListAddressResponse, + } + + @classmethod + def address_list_super_user( + cls, + list_options: ListOptions, + token_dict: Union[EmployeeTokenObject, OccupantTokenObject], + ): + db = RelationshipEmployee2PostCode.new_session() + post_code_list = RelationshipEmployee2PostCode.filter_all( + RelationshipEmployee2PostCode.company_id + == token_dict.selected_company.company_id, + db=db, + ).data + post_code_id_list = [post_code.member_id for post_code in post_code_list] + if not post_code_id_list: + raise HTTPExceptionApi( + status_code=404, + detail="User has no post code registered. User can not list addresses.", + ) + get_street_ids = [ + street_id[0] + for street_id in AddressPostcode.select_only( + AddressPostcode.id.in_(post_code_id_list), + select_args=[AddressPostcode.street_id], + order_by=AddressPostcode.street_id.desc(), + ).data + ] + if not get_street_ids: + raise HTTPExceptionApi( + status_code=404, + detail="User has no street registered. User can not list addresses.", + ) + Addresses.pre_query = Addresses.filter_all( + Addresses.street_id.in_(get_street_ids), + ).query + Addresses.filter_attr = list_options + records = Addresses.filter_all().data + return + # return AlchemyJsonResponse( + # completed=True, message="List Address records", result=records + # ) + + @classmethod + def address_list_employee( + cls, + list_options: ListOptions, + token_dict: Union[EmployeeTokenObject, OccupantTokenObject], + ): + Addresses.filter_attr = list_options + Addresses.pre_query = Addresses.filter_all( + Addresses.street_id.in_(get_street_ids), + ) + records = Addresses.filter_all().data + return + # return AlchemyJsonResponse( + # completed=True, message="List Address records", result=records + # ) + + +class AddressCreateEventMethods(MethodToEvent): + + event_type = "CREATE" + event_description = "" + event_category = "" + + __event_keys__ = { + "ffdc445f-da10-4ce4-9531-d2bdb9a198ae": "create_address", + } + __event_validation__ = { + "ffdc445f-da10-4ce4-9531-d2bdb9a198ae": InsertAddress, + } + + @classmethod + def create_address( + cls, + data: InsertAddress, + token_dict: Union[EmployeeTokenObject, OccupantTokenObject], + ): + post_code = AddressPostcode.filter_one( + AddressPostcode.uu_id == data.post_code_uu_id, + ).data + if not post_code: + raise HTTPExceptionApi( + status_code=404, + detail="Post code not found. User can not create address without post code.", + ) + + data_dict = data.excluded_dump() + data_dict["street_id"] = post_code.street_id + data_dict["street_uu_id"] = str(post_code.street_uu_id) + del data_dict["post_code_uu_id"] + address = Addresses.find_or_create(**data_dict) + address.save() + address.update(is_confirmed=True) + address.save() + return AlchemyJsonResponse( + completed=True, + message="Address created successfully", + result=address.get_dict(), + ) + + +class AddressSearchEventMethods(MethodToEvent): + """Event methods for searching addresses. + + This class handles address search functionality including text search + and filtering. + """ + + event_type = "SEARCH" + event_description = "Search for addresses using text and filters" + event_category = "Address" + + __event_keys__ = { + "e0ac1269-e9a7-4806-9962-219ac224b0d0": "search_address", + } + __event_validation__ = { + "e0ac1269-e9a7-4806-9962-219ac224b0d0": SearchAddress, + } + + @classmethod + def _build_order_clause( + cls, filter_list: Dict[str, Any], schemas: List[str], filter_table: Any + ) -> Any: + """Build the ORDER BY clause for the query. + + Args: + filter_list: Dictionary of filter options + schemas: List of available schema fields + filter_table: SQLAlchemy table to query + + Returns: + SQLAlchemy order_by clause + """ + # Default to ordering by UUID if field not in schema + if filter_list.get("order_field") not in schemas: + filter_list["order_field"] = "uu_id" + else: + # Extract table and field from order field + table_name, field_name = str(filter_list.get("order_field")).split(".") + filter_table = getattr(databases.sql_models, table_name) + filter_list["order_field"] = field_name + + # Build order clause + field = getattr(filter_table, filter_list.get("order_field")) + return ( + field.desc() + if str(filter_list.get("order_type"))[0] == "d" + else field.asc() + ) + + @classmethod + def _format_record(cls, record: Any, schemas: List[str]) -> Dict[str, str]: + """Format a database record into a dictionary. + + Args: + record: Database record to format + schemas: List of schema fields + + Returns: + Formatted record dictionary + """ + result = {} + for index, schema in enumerate(schemas): + value = str(record[index]) + # Special handling for UUID fields + if "uu_id" in value: + value = str(value) + result[schema] = value + return result + + @classmethod + def search_address( + cls, + data: SearchAddress, + token_dict: Union[EmployeeTokenObject, OccupantTokenObject], + ) -> JSONResponse: + """Search for addresses using text search and filters. + + Args: + data: Search parameters including text and filters + token_dict: Authentication token + + Returns: + JSON response with search results + + Raises: + HTTPExceptionApi: If search fails + """ + try: + # Start performance measurement + start_time = perf_counter() + + # Get initial query + search_result = AddressStreet.search_address_text(search_text=data.search) + if not search_result: + raise HTTPExceptionApi( + status_code=status.HTTP_404_NOT_FOUND, + detail="No addresses found matching search criteria", + ) + + query = search_result.get("query") + schemas = search_result.get("schema") + + # Apply filters + filter_list = data.list_options.dump() + filter_table = AddressStreet + + # Build and apply order clause + order = cls._build_order_clause(filter_list, schemas, filter_table) + + # Apply pagination + page_size = int(filter_list.get("size")) + offset = (int(filter_list.get("page")) - 1) * page_size + + # Execute query + query = ( + query.order_by(order) + .limit(page_size) + .offset(offset) + .populate_existing() + ) + records = list(query.all()) + + # Format results + results = [cls._format_record(record, schemas) for record in records] + + # Log performance + duration = perf_counter() - start_time + print(f"Address search completed in {duration:.3f}s") + + return AlchemyJsonResponse( + completed=True, message="Address search results", result=results + ) + + except HTTPExceptionApi as e: + # Re-raise HTTP exceptions + raise e + except Exception as e: + # Log and wrap other errors + print(f"Address search error: {str(e)}") + raise HTTPExceptionApi( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to search addresses", + ) from e + + +class AddressUpdateEventMethods(MethodToEvent): + + event_type = "UPDATE" + event_description = "" + event_category = "" + + __event_keys__ = { + "1f9c3a9c-e5bd-4dcd-9b9a-3742d7e03a27": "update_address", + } + __event_validation__ = { + "1f9c3a9c-e5bd-4dcd-9b9a-3742d7e03a27": UpdateAddress, + } + + @classmethod + def update_address( + cls, + address_uu_id: str, + data: UpdateAddress, + token_dict: Union[EmployeeTokenObject, OccupantTokenObject], + ): + if isinstance(token_dict, EmployeeTokenObject): + address = Addresses.filter_one( + Addresses.uu_id == address_uu_id, + ).data + if not address: + raise HTTPExceptionApi( + status_code=404, + detail=f"Address not found. User can not update with given address uuid : {address_uu_id}", + ) + + data_dict = data.excluded_dump() + updated_address = address.update(**data_dict) + updated_address.save() + return AlchemyJsonResponse( + completed=True, + message="Address updated successfully", + result=updated_address.get_dict(), + ) + elif isinstance(token_dict, OccupantTokenObject): + raise HTTPExceptionApi( + status_code=403, + detail="Occupant can not update address.", + ) diff --git a/ApiEvents/ValidationServiceApi/events/validation/endpoints.py b/ApiEvents/ValidationServiceApi/events/validation/endpoints.py index 6c62f35..cf8b9d1 100644 --- a/ApiEvents/ValidationServiceApi/events/validation/endpoints.py +++ b/ApiEvents/ValidationServiceApi/events/validation/endpoints.py @@ -81,7 +81,7 @@ VALIDATION_CONFIG_MAIN =RouteFactoryConfig( method="POST", summary="Select company or occupant type", description="Select company or occupant type", - is_auth_required=False, # Needs token_dict + is_auth_required=True, # Needs token_dict is_event_required=False, endpoint_function=validations_validations_select, ), @@ -93,7 +93,7 @@ VALIDATION_CONFIG_MAIN =RouteFactoryConfig( method="POST", summary="Select company or occupant type", description="Select company or occupant type", - is_auth_required=False, # Needs token_dict + is_auth_required=True, # Needs token_dict is_event_required=False, endpoint_function=validations_headers_select, ), @@ -105,7 +105,7 @@ VALIDATION_CONFIG_MAIN =RouteFactoryConfig( method="POST", summary="Select company or occupant type", description="Select company or occupant type", - is_auth_required=False, # Needs token_dict + is_auth_required=True, # Needs token_dict is_event_required=False, endpoint_function=validations_validations_and_headers_select, ), diff --git a/ApiEvents/ValidationServiceApi/events/validation/validation.py b/ApiEvents/ValidationServiceApi/events/validation/validation.py index 00e8511..806f3eb 100644 --- a/ApiEvents/ValidationServiceApi/events/validation/validation.py +++ b/ApiEvents/ValidationServiceApi/events/validation/validation.py @@ -75,7 +75,7 @@ class ValidationsBoth(MethodToEvent): return { "headers": validation.headers, "validation": validation.validation, - "language_models": language_model_all, + # "language_models": language_model_all, } diff --git a/ApiEvents/ValidationServiceApi/route_configs.py b/ApiEvents/ValidationServiceApi/route_configs.py index c9c8f95..f3845bf 100644 --- a/ApiEvents/ValidationServiceApi/route_configs.py +++ b/ApiEvents/ValidationServiceApi/route_configs.py @@ -7,9 +7,10 @@ to be used by the dynamic route creation system. from typing import Dict, List, Any, TypeVar from .events.validation.endpoints import VALIDATION_CONFIG +from .events.available.endpoints import AVAILABLE_CONFIG # Registry of all route configurations -ROUTE_CONFIGS = [VALIDATION_CONFIG] +ROUTE_CONFIGS = [VALIDATION_CONFIG, AVAILABLE_CONFIG] def get_route_configs() -> List[Dict[str, Any]]: diff --git a/ApiLibrary/__init__.py b/ApiLibrary/__init__.py index b85b70c..9da6e7d 100644 --- a/ApiLibrary/__init__.py +++ b/ApiLibrary/__init__.py @@ -1,9 +1,16 @@ -from ApiLibrary.date_time_actions.date_functions import DateTimeLocal, system_arrow +from ApiLibrary.date_time_actions.date_functions import ( + DateTimeLocal, + system_arrow, + client_arrow, +) from ApiLibrary.extensions.select import SelectActionWithEmployee, SelectAction +from ApiLibrary.common.line_number import get_line_number_for_error __all__ = [ "DateTimeLocal", "system_arrow", + "client_arrow", + "get_line_number_for_error", "SelectActionWithEmployee", "SelectAction", ] diff --git a/ApiValidations/Custom/token_objects.py b/ApiValidations/Custom/token_objects.py index 4054d34..4b3a24f 100644 --- a/ApiValidations/Custom/token_objects.py +++ b/ApiValidations/Custom/token_objects.py @@ -59,7 +59,7 @@ class OccupantToken(BaseModel): responsible_employee_uuid: Optional[str] = None reachable_event_codes: Optional[list[str]] = None # ID list of reachable modules - # reachable_event_list_uu_id: Optional[list] = None # UUID list of reachable modules + reachable_event_endpoints: Optional[list[str]] = None class CompanyToken(BaseModel): # Required Company Object for an employee @@ -82,7 +82,7 @@ class CompanyToken(BaseModel): # Required Company Object for an employee bulk_duties_id: int reachable_event_codes: Optional[list[str]] = None # ID list of reachable modules - # reachable_event_list_uu_id: Optional[list] = None # UUID list of reachable modules + reachable_event_endpoints: Optional[list[str]] = None class OccupantTokenObject(ApplicationToken): diff --git a/DockerApiServices/AllApiNeeds/middleware/token_event_middleware.py b/DockerApiServices/AllApiNeeds/middleware/token_event_middleware.py index e60e523..b39591a 100644 --- a/DockerApiServices/AllApiNeeds/middleware/token_event_middleware.py +++ b/DockerApiServices/AllApiNeeds/middleware/token_event_middleware.py @@ -218,9 +218,9 @@ class TokenEventMiddleware: Returns: Callable: The wrapped function with both auth and event handling """ - # First apply authentication - authenticated_func = MiddlewareModule.auth_required(func) - + # # First apply authentication + # authenticated_func = MiddlewareModule.auth_required(func) + authenticated_func = func @wraps(authenticated_func) async def wrapper(request: Request, *args, **kwargs) -> Dict[str, Any]: diff --git a/Schemas/event/event.py b/Schemas/event/event.py index 1479466..c2befca 100644 --- a/Schemas/event/event.py +++ b/Schemas/event/event.py @@ -278,6 +278,42 @@ class Event2Employee(CrudCollection): active_events.extend(events_extra) return [event.function_code for event in active_events] + @classmethod + def get_event_endpoints(cls, employee_id: int) -> list: + from Schemas import EndpointRestriction + db = cls.new_session() + employee_events = cls.filter_all( + cls.employee_id == employee_id, + db=db, + ).data + active_event_ids = Service2Events.filter_all( + Service2Events.service_id.in_( + [event.event_service_id for event in employee_events] + ), + db=db, + system=True, + ).data + active_events = Events.filter_all( + Events.id.in_([event.event_id for event in active_event_ids]), + db=db, + ).data + if extra_events := Event2EmployeeExtra.filter_all( + Event2EmployeeExtra.employee_id == employee_id, + db=db, + ).data: + events_extra = Events.filter_all( + Events.id.in_([event.event_id for event in extra_events]), + db=db, + ).data + active_events.extend(events_extra) + endpoint_restrictions = EndpointRestriction.filter_all( + EndpointRestriction.id.in_( + [event.endpoint_id for event in active_events] + ), + db=db, + ).data + return [event.endpoint_name for event in endpoint_restrictions] + class Event2Occupant(CrudCollection): """ @@ -342,6 +378,41 @@ class Event2Occupant(CrudCollection): active_events.extend(events_extra) return [event.function_code for event in active_events] + @classmethod + def get_event_endpoints(cls, build_living_space_id) -> list: + from Schemas import EndpointRestriction + db = cls.new_session() + occupant_events = cls.filter_all( + cls.build_living_space_id == build_living_space_id, + db=db, + ).data + active_event_ids = Service2Events.filter_all( + Service2Events.service_id.in_( + [event.event_service_id for event in occupant_events] + ), + db=db, + system=True, + ).data + active_events = Events.filter_all( + Events.id.in_([event.event_id for event in active_event_ids]), + db=db, + ).data + if extra_events := Event2OccupantExtra.filter_all( + Event2OccupantExtra.build_living_space_id == build_living_space_id, + db=db, + ).data: + events_extra = Events.filter_all( + Events.id.in_([event.event_id for event in extra_events]), + db=db, + ).data + active_events.extend(events_extra) + endpoint_restrictions = EndpointRestriction.filter_all( + EndpointRestriction.id.in_( + [event.endpoint_id for event in active_events] + ), + db=db, + ).data + return [event.endpoint_name for event in endpoint_restrictions] class ModulePrice(CrudCollection): """ diff --git a/Services/PostgresDb/Models/core_alchemy.py b/Services/PostgresDb/Models/core_alchemy.py new file mode 100644 index 0000000..dbcb1d8 --- /dev/null +++ b/Services/PostgresDb/Models/core_alchemy.py @@ -0,0 +1,148 @@ +from typing import Type, TypeVar + +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.orm import Session +from ApiLibrary import get_line_number_for_error +from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi + +# Type variable for class methods returning self +T = TypeVar("T", bound="FilterAttributes") + + +class BaseAlchemyModel: + """ + Controller of alchemy to database transactions. + Query: Query object for model + Session: Session object for model + Actions: save, flush, rollback, commit + """ + __abstract__ = True + + @classmethod + def new_session(cls) -> Session: + """Get database session.""" + from Services.PostgresDb.database import get_db + with get_db() as session: + return session + + @classmethod + def flush(cls: Type[T], db: Session) -> T: + """ + Flush the current session to the database. + + Args: + db: Database session + + Returns: + Self instance + + Raises: + HTTPException: If database operation fails + """ + try: + db.flush() + return cls + except SQLAlchemyError as e: + raise HTTPExceptionApi( + error_code="HTTP_304_NOT_MODIFIED", + lang=cls.lang or "tr", + loc=get_line_number_for_error(), + sys_msg=str(e), + ) + + def destroy(self: Type[T], db: Session) -> None: + """ + Delete the record from the database. + + Args: + db: Database session + """ + db.delete(self) + + @classmethod + def save_via_metadata(cls: Type[T], db: Session) -> None: + """ + Save or rollback based on metadata. + + Args: + db: Database session + + Raises: + HTTPException: If save operation fails + """ + try: + if cls.is_created: + db.commit() + db.flush() + db.rollback() + except SQLAlchemyError as e: + raise HTTPExceptionApi( + error_code="HTTP_304_NOT_MODIFIED", + lang=cls.lang or "tr", + loc=get_line_number_for_error(), + sys_msg=str(e), + ) + + @classmethod + def save(cls: Type[T], db: Session) -> None: + """ + Commit changes to database. + + Args: + db: Database session + + Raises: + HTTPException: If commit fails + """ + try: + db.commit() + except SQLAlchemyError as e: + raise HTTPExceptionApi( + error_code="HTTP_304_NOT_MODIFIED", + lang=cls.lang or "tr", + loc=get_line_number_for_error(), + sys_msg=str(e), + ) + except Exception as e: + raise HTTPExceptionApi( + error_code="HTTP_500_INTERNAL_SERVER_ERROR", + lang=cls.lang or "tr", + loc=get_line_number_for_error(), + sys_msg=str(e), + ) + + @classmethod + def save_and_confirm(cls: Type[T], db: Session) -> None: + """ + Save changes and mark record as confirmed. + + Args: + db: Database session + + Raises: + HTTPException: If operation fails + """ + try: + cls.save(db) + cls.update(db, is_confirmed=True) + cls.save(db) + except SQLAlchemyError as e: + raise HTTPExceptionApi( + error_code="HTTP_304_NOT_MODIFIED", + lang=cls.lang or "tr", + loc=get_line_number_for_error(), + sys_msg=str(e), + ) + + @classmethod + def rollback(cls: Type[T], db: Session) -> None: + """ + Rollback current transaction. + + Args: + db: Database session + """ + db.rollback() + + + diff --git a/Services/PostgresDb/Models/crud_alchemy.py b/Services/PostgresDb/Models/crud_alchemy.py new file mode 100644 index 0000000..f67720d --- /dev/null +++ b/Services/PostgresDb/Models/crud_alchemy.py @@ -0,0 +1,395 @@ +import datetime + +from decimal import Decimal +from typing import Any, Dict, List, Optional +from sqlalchemy import TIMESTAMP, NUMERIC +from sqlalchemy.orm import Session, Mapped +from pydantic import BaseModel + +from ApiLibrary import system_arrow, get_line_number_for_error, client_arrow +from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi + +from Services.PostgresDb.Models.core_alchemy import BaseAlchemyModel +from Services.PostgresDb.Models.system_fields import SystemFields + + +class MetaDataRow(BaseModel): + created: Optional[bool] = False + message: Optional[str] = None + error_case: Optional[str] = None + + +class Credentials(BaseModel): + person_id: int + person_name: str + + +class CrudActions(SystemFields): + + @classmethod + def extract_system_fields(cls, filter_kwargs: dict, create: bool = True) -> Dict[str, Any]: + """ + Remove system-managed fields from input dictionary. + + Args: + filter_kwargs: Input dictionary of fields + create: If True, use creation field list, else use update field list + + Returns: + Dictionary with system fields removed + """ + system_fields = filter_kwargs.copy() + extract_fields = ( + cls.__system__fields__create__ if create else cls.__system__fields__update__ + ) + for field in extract_fields: + system_fields.pop(field, None) + return system_fields + + @classmethod + def remove_non_related_inputs(cls, kwargs: Dict[str, Any]) -> Dict[str, Any]: + """ + Filter out inputs that don't correspond to model fields. + + Args: + kwargs: Dictionary of field names and values + + Returns: + Dictionary containing only valid model fields + """ + return { + key: value + for key, value in kwargs.items() + if key in cls.columns + cls.hybrid_properties + cls.settable_relations + } + + + + @classmethod + def iterate_over_variables(cls, val: Any, key: str) -> tuple[bool, Optional[Any]]: + """ + Process a field value based on its type and convert it to the appropriate format. + + Args: + val: Field value + key: Field name + + Returns: + Tuple of (should_include, processed_value) + """ + key_ = cls.__annotations__.get(key, None) + is_primary = key in cls.primary_keys + row_attr = bool(getattr(getattr(cls, key), "foreign_keys", None)) + + # Skip primary keys and foreign keys + if is_primary or row_attr: + return False, None + + # Handle None values + if val is None: + return True, None + + # Special handling for UUID fields + if str(key[-5:]).lower() == "uu_id": + return True, str(val) + + # Handle typed fields + if key_: + if key_ == Mapped[int]: + return True, int(val) + elif key_ == Mapped[bool]: + return True, bool(val) + elif key_ == Mapped[float] or key_ == Mapped[NUMERIC]: + return True, round(float(val), 3) + elif key_ == Mapped[TIMESTAMP]: + return True, str( + system_arrow.get(str(val)).format("YYYY-MM-DD HH:mm:ss ZZ") + ) + elif key_ == Mapped[str]: + return True, str(val) + + # Handle based on Python types + else: + if isinstance(val, datetime.datetime): + return True, str( + system_arrow.get(str(val)).format("YYYY-MM-DD HH:mm:ss ZZ") + ) + elif isinstance(val, bool): + return True, bool(val) + elif isinstance(val, (float, Decimal)): + return True, round(float(val), 3) + elif isinstance(val, int): + return True, int(val) + elif isinstance(val, str): + return True, str(val) + elif val is None: + return True, None + + return False, None + + def get_dict( + self, + exclude: Optional[List[str]] = None, + include: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """ + Convert model instance to dictionary with customizable fields. + + Args: + exclude: List of fields to exclude + include: List of fields to include (takes precedence over exclude) + + Returns: + Dictionary representation of the model + """ + return_dict: Dict[str, Any] = {} + + if include: + # Handle explicitly included fields + exclude_list = [ + element + for element in self.__system_default_model__ + if str(element)[-2:] == "id" and str(element)[-5:].lower() == "uu_id" + ] + columns_include_list = list(set(include).difference(set(exclude_list))) + columns_include_list.extend(["uu_id"]) + + for key in columns_include_list: + val = getattr(self, key) + correct, value_of_database = self.iterate_over_variables(val, key) + if correct: + return_dict[key] = value_of_database + + elif exclude: + # Handle explicitly excluded fields + exclude.extend( + list( + set(getattr(self, "__exclude__fields__", []) or []).difference( + exclude + ) + ) + ) + exclude.extend( + [ + element + for element in self.__system_default_model__ + if str(element)[-2:] == "id" + ] + ) + + columns_excluded_list = list(set(self.columns).difference(set(exclude))) + columns_excluded_list.extend(["uu_id", "active"]) + + for key in columns_excluded_list: + val = getattr(self, key) + correct, value_of_database = self.iterate_over_variables(val, key) + if correct: + return_dict[key] = value_of_database + else: + # Handle default field selection + exclude_list = ( + getattr(self, "__exclude__fields__", []) or [] + ) + list(self.__system_default_model__) + columns_list = list(set(self.columns).difference(set(exclude_list))) + columns_list = [col for col in columns_list if str(col)[-2:] != "id"] + columns_list.extend( + [col for col in self.columns if str(col)[-5:].lower() == "uu_id"] + ) + + for remove_field in self.__system_default_model__: + if remove_field in columns_list: + columns_list.remove(remove_field) + + for key in columns_list: + val = getattr(self, key) + correct, value_of_database = self.iterate_over_variables(val, key) + if correct: + return_dict[key] = value_of_database + + return return_dict + + +class CRUDModel(BaseAlchemyModel, CrudActions): + + __abstract__ = True + + meta_data: MetaDataRow + creds: Credentials = None + + @property + def is_created(self): + return self.meta_data.created + + @classmethod + def create_credentials(cls, record_created) -> None: + """ + Save user credentials for tracking. + + Args: + record_created: Record that created or updated + """ + + if getattr(cls.creds, "person_id", None) and getattr( + cls.creds, "person_name", None + ): + record_created.created_by_id = cls.creds.person_id + record_created.created_by = cls.creds.person_name + return + + @classmethod + def update_metadata(cls, created: bool, error_case: str = None, message: str = None) -> None: + cls.meta_data = MetaDataRow( + created=created, + error_case=error_case, + message=message + ) + + @classmethod + def raise_exception(cls): + raise HTTPExceptionApi( + error_code=cls.meta_data.error_case, + lang=cls.lang, + loc=get_line_number_for_error(), + sys_msg=cls.meta_data.message + ) + + @classmethod + def create_or_abort(cls, db: Session, **kwargs): + """ + Create a new record or abort if it already exists. + + Args: + db: Database session + **kwargs: Record fields + + Returns: + New record if successfully created + """ + check_kwargs = cls.extract_system_fields(kwargs) + + # Search for existing record + query = db.query(cls).filter( + cls.expiry_ends > str(system_arrow.now()), + cls.expiry_starts <= str(system_arrow.now()), + ) + + for key, value in check_kwargs.items(): + if hasattr(cls, key): + query = query.filter(getattr(cls, key) == value) + + already_record = query.first() + # Handle existing record + if already_record: + if already_record.deleted: + cls.update_metadata(created=False, error_case="DeletedRecord") + cls.raise_exception() + elif not already_record.is_confirmed: + cls.update_metadata(created=False, error_case="IsNotConfirmed") + cls.raise_exception() + cls.update_metadata(created=False, error_case="AlreadyExists") + cls.raise_exception() + + # Create new record + check_kwargs = cls.remove_non_related_inputs(check_kwargs) + created_record = cls() + for key, value in check_kwargs.items(): + setattr(created_record, key, value) + cls.create_credentials(created_record) + db.add(created_record) + db.flush() + cls.update_metadata(created=True) + return created_record + + @classmethod + def find_or_create(cls, db: Session, **kwargs): + """ + Find an existing record matching the criteria or create a new one. + + Args: + db: Database session + **kwargs: Search/creation criteria + + Returns: + Existing or newly created record + """ + check_kwargs = cls.extract_system_fields(kwargs) + + # Search for existing record + query = db.query(cls).filter( + cls.expiry_ends > str(system_arrow.now()), + cls.expiry_starts <= str(system_arrow.now()), + ) + + for key, value in check_kwargs.items(): + if hasattr(cls, key): + query = query.filter(getattr(cls, key) == value) + + already_record = query.first() + # Handle existing record + if already_record: + if already_record.deleted: + cls.update_metadata(created=False, error_case="DeletedRecord") + return already_record + elif not already_record.is_confirmed: + cls.update_metadata(created=False, error_case="IsNotConfirmed") + return already_record + cls.update_metadata(created=False, error_case="AlreadyExists") + return already_record + + # Create new record + check_kwargs = cls.remove_non_related_inputs(check_kwargs) + created_record = cls() + for key, value in check_kwargs.items(): + setattr(created_record, key, value) + cls.create_credentials(created_record) + db.add(created_record) + db.flush() + cls.update_metadata(created=True) + return created_record + + def update(self, db: Session, **kwargs): + """ + Update the record with new values. + + Args: + db: Database session + **kwargs: Fields to update + + Returns: + Updated record + + Raises: + ValueError: If attempting to update is_confirmed with other fields + """ + check_kwargs = self.remove_non_related_inputs(kwargs) + check_kwargs = self.extract_system_fields(check_kwargs, create=False) + + for key, value in check_kwargs.items(): + setattr(self, key, value) + + self.update_credentials(kwargs=kwargs) + db.flush() + return self + + def update_credentials(self, **kwargs) -> None: + """ + Save user credentials for tracking. + + Args: + record_updated: Record that created or updated + """ + # Update confirmation or modification tracking + is_confirmed_argument = kwargs.get("is_confirmed", None) + + if is_confirmed_argument and not len(kwargs) == 1: + raise ValueError("Confirm field cannot be updated with other fields") + + if is_confirmed_argument: + if getattr(self.creds, "person_id", None) and getattr(self.creds, "person_name", None): + self.confirmed_by_id = self.creds.person_id + self.confirmed_by = self.creds.person_name + else: + if getattr(self.creds, "person_id", None) and getattr(self.creds, "person_name", None): + self.updated_by_id = self.creds.person_id + self.updated_by = self.creds.person_name + return diff --git a/Services/PostgresDb/Models/filter_functions.py b/Services/PostgresDb/Models/filter_functions.py index efc62bc..11df4d4 100644 --- a/Services/PostgresDb/Models/filter_functions.py +++ b/Services/PostgresDb/Models/filter_functions.py @@ -6,420 +6,83 @@ including pagination, ordering, and complex query building. """ from __future__ import annotations -from typing import Any, Dict, List, Optional, Type, TypeVar, Union, Tuple, Protocol -from dataclasses import dataclass -from json import dumps +from typing import Any, TypeVar, Type -from sqlalchemy import BinaryExpression, desc, asc -from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import Query, Session from sqlalchemy.sql.elements import BinaryExpression +from sqlalchemy_mixins.smartquery import SmartQueryMixin + +from Services.PostgresDb.Models.response import PostgresResponse +from Services.PostgresDb.Models_old.base_model import BaseModel from ApiLibrary import system_arrow -from ApiLibrary.common.line_number import get_line_number_for_error -from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi -from Services.PostgresDb.Models.response import PostgresResponse -# Type variable for class methods returning self + T = TypeVar("T", bound="FilterAttributes") -class HTTPException(Exception): - """Base exception for HTTP errors.""" - - def __init__(self, status_code: str, detail: str): - self.status_code = status_code - self.detail = detail - super().__init__(detail) - - -class HTTPStatus(Protocol): - """Protocol defining required HTTP status codes.""" - - HTTP_400_BAD_REQUEST: str - HTTP_404_NOT_FOUND: str - HTTP_304_NOT_MODIFIED: str - - -@dataclass -class FilterConfig: - """Configuration for filtering and pagination.""" - - page: int = 1 - size: int = 10 - order_field: str = "id" - order_type: str = "asc" - include_joins: List[str] = None - query: Dict[str, Any] = None - - def __post_init__(self): - """Initialize default values for None fields.""" - self.include_joins = self.include_joins or [] - self.query = self.query or {} - - -class QueryConfig: - """Configuration for query building and execution.""" - - def __init__( - self, - pre_query: Optional[Query] = None, - filter_config: Optional[FilterConfig] = None, - http_exception: Optional[Type[HTTPException]] = HTTPException, - status: Optional[Type[HTTPStatus]] = None, - ): - self.pre_query = pre_query - self.filter_config = filter_config or FilterConfig() - self.http_exception = http_exception - self.status = status - self.total_count: Optional[int] = None - - def update_filter_config(self, **kwargs) -> None: - """Update filter configuration parameters.""" - for key, value in kwargs.items(): - if hasattr(self.filter_config, key): - setattr(self.filter_config, key, value) - - def set_total_count(self, count: int) -> None: - """Set the total count of records.""" - self.total_count = count - - -class FilterAttributes: - """ - Advanced filtering capabilities for SQLAlchemy models. - - Features: - - Pagination and ordering - - Complex query building - - Active/deleted/confirmed status filtering - - Expiry date handling - - Transaction management - - Usage: - # Initialize configuration - config = QueryConfig(filter_config=FilterConfig(page=1, size=10)) - - # Create model with configuration - class User(FilterAttributes): - query_config = config - - # Filter multiple records - users = User.filter_by_all(db, name="John").data - - # Update configuration - User.query_config.update_filter_config(page=2, size=20) - next_users = User.filter_all(db).data - """ +class ArgumentModel: __abstract__ = True - # Class-level configuration - query_config: QueryConfig = QueryConfig() - - @classmethod - def flush(cls: Type[T], db: Session) -> T: - """ - Flush the current session to the database. - - Args: - db: Database session - - Returns: - Self instance - - Raises: - HTTPException: If database operation fails - """ - try: - db.flush() - return cls - except SQLAlchemyError as e: - raise HTTPExceptionApi( - error_code="HTTP_304_NOT_MODIFIED", - lang=cls.lang or "tr", - loc=get_line_number_for_error(), - sys_msg=str(e), - ) - - @classmethod - def destroy(cls: Type[T], db: Session) -> None: - """ - Delete the record from the database. - - Args: - db: Database session - """ - db.delete(cls) - db.commit() - - @classmethod - def save_via_metadata(cls: Type[T], db: Session) -> None: - """ - Save or rollback based on metadata. - - Args: - db: Database session - - Raises: - HTTPException: If save operation fails - """ - try: - meta_data = getattr(cls, "meta_data", {}) - if meta_data.get("created", False): - db.commit() - db.rollback() - except SQLAlchemyError as e: - raise HTTPExceptionApi( - error_code="HTTP_304_NOT_MODIFIED", - lang=cls.lang or "tr", - loc=get_line_number_for_error(), - sys_msg=str(e), - ) - - @classmethod - def save(cls: Type[T], db: Session) -> None: - """ - Commit changes to database. - - Args: - db: Database session - - Raises: - HTTPException: If commit fails - """ - try: - db.commit() - except SQLAlchemyError as e: - raise HTTPExceptionApi( - error_code="HTTP_304_NOT_MODIFIED", - lang=cls.lang or "tr", - loc=get_line_number_for_error(), - sys_msg=str(e), - ) - - @classmethod - def rollback(cls: Type[T], db: Session) -> None: - """ - Rollback current transaction. - - Args: - db: Database session - """ - db.rollback() - - @classmethod - def save_and_confirm(cls: Type[T], db: Session) -> None: - """ - Save changes and mark record as confirmed. - - Args: - db: Database session - - Raises: - HTTPException: If operation fails - """ - try: - cls.save(db) - cls.update(db, is_confirmed=True) - cls.save(db) - except SQLAlchemyError as e: - raise HTTPExceptionApi( - error_code="HTTP_304_NOT_MODIFIED", - lang=cls.lang or "tr", - loc=get_line_number_for_error(), - sys_msg=str(e), - ) - @classmethod def _query(cls: Type[T], db: Session) -> Query: - """ - Get base query for model. - - Args: - db: Database session - - Returns: - SQLAlchemy Query object - """ + """Returns the query to use in the model.""" return ( - cls.query_config.pre_query if cls.query_config.pre_query else db.query(cls) + cls.pre_query if cls.pre_query else db.query(cls) ) @classmethod - def add_query_to_filter( - cls: Type[T], query: Query, filter_list: Dict[str, Any] - ) -> Query: - """ - Add pagination and ordering to query. - - Args: - query: Base query - filter_list: Dictionary containing pagination and ordering parameters - - Returns: - Modified query with pagination and ordering - """ - order_field = getattr(cls, filter_list.get("order_field")) - order_func = desc if str(filter_list.get("order_type"))[0] == "d" else asc - - return ( - query.order_by(order_func(order_field)) - .limit(filter_list.get("size")) - .offset((filter_list.get("page") - 1) * filter_list.get("size")) - .populate_existing() + def add_new_arg_to_args(cls: Type[T], args_list, argument, value): + new_arg_list = list( + set( + args_ + for args_ in list(args_list) + if isinstance(args_, BinaryExpression) + ) ) - - @classmethod - def get_filter_attributes(cls) -> Dict[str, Any]: - """ - Get filter configuration from attributes. - - Returns: - Dictionary containing pagination and filtering parameters - """ - return { - "page": getattr(cls.query_config.filter_config, "page", 1), - "size": getattr(cls.query_config.filter_config, "size", 10), - "order_field": getattr(cls.query_config.filter_config, "order_field", "id"), - "order_type": getattr(cls.query_config.filter_config, "order_type", "asc"), - "include_joins": getattr( - cls.query_config.filter_config, "include_joins", [] - ), - "query": getattr(cls.query_config.filter_config, "query", {}), - } - - @classmethod - def add_new_arg_to_args( - cls, - args_list: Tuple[BinaryExpression, ...], - argument: str, - value: BinaryExpression, - ) -> Tuple[BinaryExpression, ...]: - """ - Add new argument to filter arguments if not exists. - - Args: - args_list: Current filter arguments - argument: Argument name to check - value: New argument to add - - Returns: - Updated argument tuple - """ - new_args = [arg for arg in args_list if isinstance(arg, BinaryExpression)] arg_left = lambda arg_obj: getattr(getattr(arg_obj, "left", None), "key", None) - - if not any(arg_left(arg) == argument for arg in new_args): - new_args.append(value) - - return tuple(new_args) + # arg_right = lambda arg_obj: getattr(getattr(arg_obj, "right", None), "value", None) + if not any(True for arg in new_arg_list if arg_left(arg_obj=arg) == argument): + new_arg_list.append(value) + return tuple(new_arg_list) @classmethod - def get_not_expired_query_arg( - cls, args: Tuple[BinaryExpression, ...] - ) -> Tuple[BinaryExpression, ...]: - """ - Add expiry date conditions to query. - - Args: - args: Current query arguments - - Returns: - Updated arguments with expiry conditions - """ - current_time = str(system_arrow.now()) - args = cls.add_new_arg_to_args( - args, "expiry_ends", cls.expiry_ends > current_time - ) - args = cls.add_new_arg_to_args( - args, "expiry_starts", cls.expiry_starts <= current_time - ) - return args + def get_not_expired_query_arg(cls: Type[T], arg): + """Add expiry_starts and expiry_ends to the query.""" + starts = cls.expiry_starts <= str(system_arrow.now()) + ends = cls.expiry_ends > str(system_arrow.now()) + arg = cls.add_new_arg_to_args(arg, "expiry_ends", ends) + arg = cls.add_new_arg_to_args(arg, "expiry_starts", starts) + return arg @classmethod - def get_active_and_confirmed_query_arg( - cls, args: Tuple[BinaryExpression, ...] - ) -> Tuple[BinaryExpression, ...]: - """ - Add status conditions to query. + def get_active_and_confirmed_query_arg(cls: Type[T], arg): + """Add active and confirmed to the query.""" + arg = cls.add_new_arg_to_args(arg, "is_confirmed", cls.is_confirmed == True) + arg = cls.add_new_arg_to_args(arg, "active", cls.active == True) + arg = cls.add_new_arg_to_args(arg, "deleted", cls.deleted == False) + return arg - Args: - args: Current query arguments - Returns: - Updated arguments with status conditions - """ - args = cls.add_new_arg_to_args(args, "is_confirmed", cls.is_confirmed == True) - args = cls.add_new_arg_to_args(args, "active", cls.active == True) - args = cls.add_new_arg_to_args(args, "deleted", cls.deleted == False) - return args +class QueryModel(ArgumentModel, BaseModel, SmartQueryMixin): + + pre_query = None + __abstract__ = True @classmethod - def select_only( - cls: Type[T], - db: Session, - *args: BinaryExpression, - select_args: List[Any], - order_by: Optional[Any] = None, - limit: Optional[int] = None, - system: bool = False, - ) -> PostgresResponse: - """ - Select specific columns from filtered query. - - Args: - db: Database session - args: Filter conditions - select_args: Columns to select - order_by: Optional ordering - limit: Optional result limit - system: If True, skip status filtering - - Returns: - Query response with selected columns - """ - if not system: - args = cls.get_active_and_confirmed_query_arg(args) - args = cls.get_not_expired_query_arg(args) - - query = cls._query(db).filter(*args).with_entities(*select_args) - cls.query_config.set_total_count(query.count()) - - if order_by is not None: - query = query.order_by(order_by) - if limit: - query = query.limit(limit) - - return PostgresResponse(query=query, first=False) + def produce_query_to_add(cls: Type[T], filter_list): + if filter_list.get("query"): + for smart_iter in cls.filter_expr(**filter_list["query"]): + if key := getattr(getattr(smart_iter, "left", None), "key", None): + args = cls.add_new_arg_to_args(args, key, smart_iter) @classmethod - def filter_by_all( - cls: Type[T], db: Session, system: bool = False, **kwargs - ) -> PostgresResponse: - """ - Filter multiple records by keyword arguments. - - Args: - db: Database session - system: If True, skip status filtering - **kwargs: Filter criteria - - Returns: - Query response with matching records - """ - if "is_confirmed" not in kwargs and not system: - kwargs["is_confirmed"] = True - kwargs.pop("system", None) - - query = cls._query(db).filter_by(**kwargs) - cls.query_config.set_total_count(query.count()) - - if cls.query_config.filter_config: - filter_list = cls.get_filter_attributes() - query = cls.add_query_to_filter(query, filter_list) - - return PostgresResponse(query=query, first=False) + def convert( + cls: Type[T], smart_options: dict, validate_model: Any = None + ) -> tuple[BinaryExpression]: + if not validate_model: + return tuple(cls.filter_expr(**smart_options)) @classmethod def filter_by_one( @@ -439,53 +102,16 @@ class FilterAttributes: if "is_confirmed" not in kwargs and not system: kwargs["is_confirmed"] = True kwargs.pop("system", None) - query = cls._query(db).filter_by(**kwargs) - cls.query_config.set_total_count(1) - - return PostgresResponse(query=query, first=True) - - @classmethod - def filter_all( - cls: Type[T], *args: Any, db: Session, system: bool = False - ) -> PostgresResponse: - """ - Filter multiple records by expressions. - - Args: - db: Database session - args: Filter expressions - system: If True, skip status filtering - - Returns: - Query response with matching records - """ - if not system: - args = cls.get_active_and_confirmed_query_arg(args) - args = cls.get_not_expired_query_arg(args) - - filter_list = cls.get_filter_attributes() - if filter_list.get("query"): - for smart_iter in cls.filter_expr(**filter_list["query"]): - if key := getattr(getattr(smart_iter, "left", None), "key", None): - args = cls.add_new_arg_to_args(args, key, smart_iter) - - query = cls._query(db) - cls.query_config.set_total_count(query.count()) - query = query.filter(*args) - - if cls.query_config.filter_config: - query = cls.add_query_to_filter(query, filter_list) - - return PostgresResponse(query=query, first=False) + return PostgresResponse(pre_query=cls._query(db), query=query, is_array=False) @classmethod def filter_one( - cls: Type[T], - *args: Any, - db: Session, - system: bool = False, - expired: bool = False, + cls: Type[T], + *args: Any, + db: Session, + system: bool = False, + expired: bool = False, ) -> PostgresResponse: """ Filter single record by expressions. @@ -501,35 +127,61 @@ class FilterAttributes: """ if not system: args = cls.get_active_and_confirmed_query_arg(args) - if not expired: - args = cls.get_not_expired_query_arg(args) - + if not expired: + args = cls.get_not_expired_query_arg(args) query = cls._query(db).filter(*args) - cls.query_config.set_total_count(1) + return PostgresResponse(pre_query=cls._query(db), query=query, is_array=False) - return PostgresResponse(query=query, first=True) - # @classmethod - # def raise_http_exception( - # cls, - # status_code: str, - # error_case: str, - # data: Dict[str, Any], - # message: str, - # ) -> None: - # """ - # Raise HTTP exception with formatted error details. + @classmethod + def filter_all_system( + cls: Type[T], *args: BinaryExpression, db: Session + ) -> PostgresResponse: + """ + Filter multiple records by expressions without status filtering. - # Args: - # status_code: HTTP status code string - # error_case: Error type - # data: Additional error data - # message: Error message + Args: + db: Database session + args: Filter expressions - # Raises: - # HTTPException: With formatted error details - # """ - # raise HTTPExceptionApi( - # error_code="HTTP_304_NOT_MODIFIED", - # lang=cls.lang or "tr", loc=get_line_number_for_error() - # ) + Returns: + Query response with matching records + """ + query = cls._query(db) + query = query.filter(*args) + return PostgresResponse(pre_query=cls._query(db), query=query, is_array=True) + + @classmethod + def filter_all( + cls: Type[T], *args: Any, db: Session + ) -> PostgresResponse: + """ + Filter multiple records by expressions. + + Args: + db: Database session + args: Filter expressions + Returns: + Query response with matching records + """ + args = cls.get_active_and_confirmed_query_arg(args) + args = cls.get_not_expired_query_arg(args) + query = cls._query(db).filter(*args) + return PostgresResponse(pre_query=cls._query(db), query=query, is_array=True) + + @classmethod + def filter_by_all_system( + cls: Type[T], db: Session, **kwargs + ) -> PostgresResponse: + """ + Filter multiple records by keyword arguments. + + Args: + db: Database session + **kwargs: Filter criteria + + Returns: + Query response with matching records + """ + query = cls._query(db).filter_by(**kwargs) + return PostgresResponse(pre_query=cls._query(db), query=query, is_array=True) diff --git a/Services/PostgresDb/Models/language_alchemy.py b/Services/PostgresDb/Models/language_alchemy.py new file mode 100644 index 0000000..10416d0 --- /dev/null +++ b/Services/PostgresDb/Models/language_alchemy.py @@ -0,0 +1,7 @@ + + + +class LanguageModel: + __language_model__ = None + + diff --git a/Services/PostgresDb/Models/mixin.py b/Services/PostgresDb/Models/mixin.py new file mode 100644 index 0000000..1770a2e --- /dev/null +++ b/Services/PostgresDb/Models/mixin.py @@ -0,0 +1,176 @@ +from sqlalchemy import ( + TIMESTAMP, + NUMERIC, + func, + text, + UUID, + String, + Integer, + Boolean, + SmallInteger, +) +from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy_mixins.serialize import SerializeMixin +from sqlalchemy_mixins.repr import ReprMixin + +from Services.PostgresDb.Models.crud_alchemy import CRUDModel +from Services.PostgresDb.Models.filter_functions import QueryModel + + +class BasicMixin(CRUDModel, QueryModel): + + __abstract__ = True + __repr__ = ReprMixin.__repr__ + + +class CrudMixin(BasicMixin, SerializeMixin, ReprMixin): + """ + Base mixin providing CRUD operations and common fields for PostgreSQL models. + + Features: + - Automatic timestamps (created_at, updated_at) + - Soft delete capability + - User tracking (created_by, updated_by) + - Data serialization + - Multi-language support + """ + + __abstract__ = True + + + # Primary and reference fields + id: Mapped[int] = mapped_column(Integer, primary_key=True) + uu_id: Mapped[str] = mapped_column( + UUID, + server_default=text("gen_random_uuid()"), + index=True, + unique=True, + comment="Unique identifier UUID", + ) + + # Common timestamp fields for all models + expiry_starts: Mapped[TIMESTAMP] = mapped_column( + type_=TIMESTAMP(timezone=True), + server_default=func.now(), + nullable=False, + comment="Record validity start timestamp", + ) + expiry_ends: Mapped[TIMESTAMP] = mapped_column( + type_=TIMESTAMP(timezone=True), + default="2099-12-31", + server_default="2099-12-31", + comment="Record validity end timestamp", + ) + + +class BaseCollection(CrudMixin): + """Base model class with minimal fields.""" + + __abstract__ = True + __repr__ = ReprMixin.__repr__ + + +class CrudCollection(CrudMixin): + """ + Full-featured model class with all common fields. + + Includes: + - UUID and reference ID + - Timestamps + - User tracking + - Confirmation status + - Soft delete + - Notification flags + """ + + __abstract__ = True + __repr__ = ReprMixin.__repr__ + + ref_id: Mapped[str] = mapped_column( + String(100), nullable=True, index=True, comment="External reference ID" + ) + + # Timestamps + created_at: Mapped[TIMESTAMP] = mapped_column( + TIMESTAMP(timezone=True), + server_default=func.now(), + nullable=False, + index=True, + comment="Record creation timestamp", + ) + updated_at: Mapped[TIMESTAMP] = mapped_column( + TIMESTAMP(timezone=True), + server_default=func.now(), + onupdate=func.now(), + nullable=False, + index=True, + comment="Last update timestamp", + ) + + # Cryptographic and user tracking + cryp_uu_id: Mapped[str] = mapped_column( + String, nullable=True, index=True, comment="Cryptographic UUID" + ) + created_by: Mapped[str] = mapped_column( + String, nullable=True, comment="Creator name" + ) + created_by_id: Mapped[int] = mapped_column( + Integer, nullable=True, comment="Creator ID" + ) + updated_by: Mapped[str] = mapped_column( + String, nullable=True, comment="Last modifier name" + ) + updated_by_id: Mapped[int] = mapped_column( + Integer, nullable=True, comment="Last modifier ID" + ) + confirmed_by: Mapped[str] = mapped_column( + String, nullable=True, comment="Confirmer name" + ) + confirmed_by_id: Mapped[int] = mapped_column( + Integer, nullable=True, comment="Confirmer ID" + ) + + # Status flags + is_confirmed: Mapped[bool] = mapped_column( + Boolean, server_default="0", comment="Record confirmation status" + ) + replication_id: Mapped[int] = mapped_column( + SmallInteger, server_default="0", comment="Replication identifier" + ) + deleted: Mapped[bool] = mapped_column( + Boolean, server_default="0", comment="Soft delete flag" + ) + active: Mapped[bool] = mapped_column( + Boolean, server_default="1", comment="Record active status" + ) + is_notification_send: Mapped[bool] = mapped_column( + Boolean, server_default="0", comment="Notification sent flag" + ) + is_email_send: Mapped[bool] = mapped_column( + Boolean, server_default="0", comment="Email sent flag" + ) + + # @classmethod + # def retrieve_language_model(cls, lang: str, response_model: Any) -> Dict[str, str]: + # """ + # Retrieve language-specific model headers and validation messages. + # + # Args: + # lang: Language code + # response_model: Model containing language annotations + # + # Returns: + # Dictionary of field names to localized headers + # """ + # headers_and_validation = {} + # __language_model__ = getattr(cls.__language_model__, lang, "tr") + # + # for field in response_model.__annotations__.keys(): + # headers_and_validation[field] = getattr( + # __language_model__, field, "Lang Not found" + # ) + # + # return headers_and_validation + + + diff --git a/Services/PostgresDb/Models/pagination.py b/Services/PostgresDb/Models/pagination.py new file mode 100644 index 0000000..a159b98 --- /dev/null +++ b/Services/PostgresDb/Models/pagination.py @@ -0,0 +1,176 @@ +from __future__ import annotations +from typing import Any, Dict, Optional, Union +from sqlalchemy import desc, asc +from pydantic import BaseModel +from AllConfigs.SqlDatabase.configs import PaginateConfig +from Services.PostgresDb.Models.response import PostgresResponse + + +class PaginationConfig(BaseModel): + """ + Configuration for pagination settings. + + Attributes: + page: Current page number (default: 1) + size: Items per page (default: 10) + order_field: Field to order by (default: "id") + order_type: Order direction (default: "asc") + """ + + page: int = 1 + size: int = 10 + order_field: Optional[Union[tuple[str], list[str]]] = None + order_type: Optional[Union[tuple[str], list[str]]] = None + + def __init__(self, **data): + super().__init__(**data) + if self.order_field is None: + self.order_field = ["uu_id"] + if self.order_type is None: + self.order_type = ["asc"] + + +class Pagination: + """ + Handles pagination logic for query results. + + Manages page size, current page, ordering, and calculates total pages + and items based on the data source. + + Attributes: + DEFAULT_SIZE: Default number of items per page (10) + MIN_SIZE: Minimum allowed page size (10) + MAX_SIZE: Maximum allowed page size (40) + """ + + DEFAULT_SIZE = PaginateConfig.DEFAULT_SIZE + MIN_SIZE = PaginateConfig.MIN_SIZE + MAX_SIZE = PaginateConfig.MAX_SIZE + + def __init__(self, data: PostgresResponse): + self.data = data + self.size: int = self.DEFAULT_SIZE + self.page: int = 1 + self.orderField: Optional[Union[tuple[str], list[str]]] = ["uu_id"] + self.orderType: Optional[Union[tuple[str], list[str]]] = ["asc"] + self.page_count: int = 1 + self.total_count: int = 0 + self.all_count: int = 0 + self.total_pages: int = 1 + self._update_page_counts() + + def change(self, **kwargs) -> None: + """Update pagination settings from config.""" + config = PaginationConfig(**kwargs) + self.size = ( + config.size + if self.MIN_SIZE <= config.size <= self.MAX_SIZE + else self.DEFAULT_SIZE + ) + self.page = config.page + self.orderField = config.order_field + self.orderType = config.order_type + self._update_page_counts() + + def feed(self, data: PostgresResponse) -> None: + """Calculate pagination based on data source.""" + self.data = data + self._update_page_counts() + + def _update_page_counts(self) -> None: + """Update page counts and validate current page.""" + if self.data: + self.total_count = self.data.count + self.all_count = self.data.total_count + + self.size = ( + self.size + if self.MIN_SIZE <= self.size <= self.MAX_SIZE + else self.DEFAULT_SIZE + ) + self.total_pages = max(1, (self.total_count + self.size - 1) // self.size) + self.page = max(1, min(self.page, self.total_pages)) + self.page_count = ( + self.total_count % self.size + if self.page == self.total_pages and self.total_count % self.size + else self.size + ) + + def refresh(self) -> None: + """Reset pagination state to defaults.""" + self._update_page_counts() + + def reset(self) -> None: + """Reset pagination state to defaults.""" + self.size = self.DEFAULT_SIZE + self.page = 1 + self.orderField = "uu_id" + self.orderType = "asc" + + def as_dict(self) -> Dict[str, Any]: + """Convert pagination state to dictionary format.""" + self.refresh() + return { + "size": self.size, + "page": self.page, + "allCount": self.all_count, + "totalCount": self.total_count, + "totalPages": self.total_pages, + "pageCount": self.page_count, + "order_field": self.orderField, + "order_type": self.orderType, + } + + +class PaginationResult: + """ + Result of a paginated query. + + Contains the query result and pagination state. + data: PostgresResponse of query results + pagination: Pagination state + + Attributes: + _query: Original query object + pagination: Pagination state + """ + + def __init__(self, data: PostgresResponse, pagination: Pagination): + self._query = data.query + self.pagination = pagination + self.response_type = data.is_list + self.limit = self.pagination.size + self.offset = self.pagination.size * (self.pagination.page - 1) + self.order_by = self.pagination.orderField + + def dynamic_order_by(self): + """ + Dynamically order a query by multiple fields. + Returns: + Ordered query object. + """ + if not len(self.order_by) == len(self.pagination.orderType): + raise ValueError( + "Order by fields and order types must have the same length." + ) + order_criteria = zip(self.order_by, self.pagination.orderType) + for field, direction in order_criteria: + if hasattr(self._query.column_descriptions[0]['entity'], field): + if direction.lower().startswith("d"): + self._query = self._query.order_by( + desc(getattr(self._query.column_descriptions[0]['entity'], field)) + ) + else: + self._query = self._query.order_by( + asc(getattr(self._query.column_descriptions[0]['entity'], field)) + ) + return self._query + + @property + def data(self) -> Union[list | dict]: + """Get query object.""" + query_ordered = self.dynamic_order_by() + query_paginated = query_ordered.limit(self.limit).offset(self.offset) + queried_data = query_paginated.all() if self.response_type else query_paginated.first() + return [result.get_dict() for result in queried_data] if self.response_type else queried_data.get_dict() + diff --git a/Services/PostgresDb/Models/response.py b/Services/PostgresDb/Models/response.py index 60d8cf0..a84e731 100644 --- a/Services/PostgresDb/Models/response.py +++ b/Services/PostgresDb/Models/response.py @@ -5,7 +5,7 @@ This module provides a wrapper class for SQLAlchemy query results, adding convenience methods for accessing data and managing query state. """ -from typing import Any, Dict, List, Optional, TypeVar, Generic, Union +from typing import Any, Dict, Optional, TypeVar, Generic, Union from sqlalchemy.orm import Query T = TypeVar("T") @@ -17,30 +17,59 @@ class PostgresResponse(Generic[T]): Attributes: query: SQLAlchemy query object - first: Whether to return first result only - data: Query results (lazy loaded) - count: Total count of results + metadata: Additional metadata for the query Properties: - all: All results as list - first_item: First result only + count: Total count of results + query: Get query object + as_dict: Convert response to dictionary format """ def __init__( - self, - query: Query, - first: bool = False, - status: bool = True, - message: str = "", - error: Optional[str] = None, + self, + pre_query: Query, + query: Query, + is_array: bool = True, + metadata: Any = None, ): + self._is_list = is_array self._query = query - self._first = first - self.status = status - self.message = message - self.error = error - self._data: Optional[Union[List[T], T]] = None + self._pre_query = pre_query self._count: Optional[int] = None + self.metadata = metadata + + @property + def data(self) -> Union[T, list[T]]: + """Get query results.""" + if not self.is_list: + first_item = self._query.first() + return first_item if first_item else None + return self._query.all() if self._query.all() else [] + + @property + def data_as_dict(self) -> Union[Dict[str, Any], list[Dict[str, Any]]]: + """Get query results as dictionary.""" + if self.is_list: + first_item = self._query.first() + return first_item.get_dict() if first_item.first() else None + all_items = self._query.all() + return [result.get_dict() for result in all_items] if all_items else [] + + @property + def total_count(self) -> int: + """Lazy load and return total count of results.""" + if self.is_list: + return self._pre_query.count() if self._pre_query else 0 + return 1 + + @property + def count(self) -> int: + """Lazy load and return total count of results.""" + if self.is_list and self._count is None: + self._count = self._query.count() + elif not self.is_list: + self._count = 1 + return self._count @property def query(self) -> Query: @@ -48,43 +77,15 @@ class PostgresResponse(Generic[T]): return self._query @property - def data(self) -> Union[List[T], T, None]: - """ - Lazy load and return query results. - Returns first item if first=True, otherwise returns all results. - """ - if self._data is None: - results = self._query.all() - self._data = results[0] if self._first and results else results - return self._data - - @property - def count(self) -> int: - """Lazy load and return total count of results.""" - if self._count is None: - self._count = self._query.count() - return self._count - - @property - def all(self) -> List[T]: - """Get all results as list.""" - return ( - self.data - if isinstance(self.data, list) - else [self.data] if self.data else [] - ) - - @property - def first(self) -> Optional[T]: - """Get first result only.""" - return self.data if self._first else (self.data[0] if self.data else None) + def is_list(self) -> bool: + """Check if response is a list.""" + return self._is_list def as_dict(self) -> Dict[str, Any]: """Convert response to dictionary format.""" return { - "status": self.status, - "message": self.message, - "data": self.data, + "metadata": self.metadata, + "is_list": self._is_list, + "query": self.query, "count": self.count, - "error": self.error, } diff --git a/Services/PostgresDb/Models/system_fields.py b/Services/PostgresDb/Models/system_fields.py new file mode 100644 index 0000000..29d5875 --- /dev/null +++ b/Services/PostgresDb/Models/system_fields.py @@ -0,0 +1,53 @@ + + +class SystemFields: + + __abstract__ = True + + # System fields that should be handled automatically during creation + __system__fields__create__ = ( + "created_at", + "updated_at", + "cryp_uu_id", + "created_by", + "created_by_id", + "updated_by", + "updated_by_id", + "replication_id", + "confirmed_by", + "confirmed_by_id", + "is_confirmed", + "deleted", + "active", + "is_notification_send", + "is_email_send", + ) + + # System fields that should be handled automatically during updates + __system__fields__update__ = ( + "cryp_uu_id", + "created_at", + "updated_at", + "created_by", + "created_by_id", + "confirmed_by", + "confirmed_by_id", + "updated_by", + "updated_by_id", + "replication_id", + ) + + # Default fields to exclude from serialization + __system_default_model__ = ( + "cryp_uu_id", + "is_confirmed", + "deleted", + "is_notification_send", + "replication_id", + "is_email_send", + "confirmed_by_id", + "confirmed_by", + "updated_by_id", + "created_by_id", + ) + diff --git a/Services/PostgresDb/Models/token.py b/Services/PostgresDb/Models/token.py new file mode 100644 index 0000000..e60832b --- /dev/null +++ b/Services/PostgresDb/Models/token.py @@ -0,0 +1,39 @@ +from typing import TypeVar, Dict, Any +from dataclasses import dataclass +from ApiLibrary import get_line_number_for_error +from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi + +# Type variable for class methods returning self +T = TypeVar("T", bound="FilterAttributes") + + +@dataclass +class TokenModel: + lang: str + credentials: Dict[str, str] + timezone: str + + def __post_init__(self): + self.lang = str(self.lang or "tr").lower() + self.credentials = self.credentials or {} + if 'GMT' in self.timezone: + raise HTTPExceptionApi( + error_code="HTTP_400_BAD_REQUEST", + lang=self.lang, + loc=get_line_number_for_error(), + sys_msg="Invalid timezone format", + ) + + @classmethod + def set_user_define_properties(cls, token: Any) -> None: + """ + Set user-specific properties from the authentication token. + + Args: + token: Authentication token containing user preferences + """ + from ApiLibrary.date_time_actions.date_functions import DateTimeLocal + + cls.credentials = token.credentials + cls.client_arrow = DateTimeLocal(is_client=True, timezone=token.timezone) + cls.lang = str(token.lang).lower() diff --git a/Services/PostgresDb/Models/alchemy_response.py b/Services/PostgresDb/Models_old/alchemy_response.py similarity index 99% rename from Services/PostgresDb/Models/alchemy_response.py rename to Services/PostgresDb/Models_old/alchemy_response.py index ca4314c..78b282f 100644 --- a/Services/PostgresDb/Models/alchemy_response.py +++ b/Services/PostgresDb/Models_old/alchemy_response.py @@ -276,7 +276,6 @@ class AlchemyJsonResponse(BaseJsonResponse[T]): pagination = instance._create_pagination() data = [instance._transform_data(item.get_dict()) for item in result.data] pagination.feed(data) - return instance._format_response(pagination, data) diff --git a/Services/PostgresDb/Models/base_model.py b/Services/PostgresDb/Models_old/base_model.py similarity index 100% rename from Services/PostgresDb/Models/base_model.py rename to Services/PostgresDb/Models_old/base_model.py diff --git a/Services/PostgresDb/Models_old/filter_functions.py b/Services/PostgresDb/Models_old/filter_functions.py new file mode 100644 index 0000000..efc62bc --- /dev/null +++ b/Services/PostgresDb/Models_old/filter_functions.py @@ -0,0 +1,535 @@ +""" +Advanced filtering functionality for SQLAlchemy models. + +This module provides a comprehensive set of filtering capabilities for SQLAlchemy models, +including pagination, ordering, and complex query building. +""" + +from __future__ import annotations +from typing import Any, Dict, List, Optional, Type, TypeVar, Union, Tuple, Protocol +from dataclasses import dataclass +from json import dumps + +from sqlalchemy import BinaryExpression, desc, asc +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.orm import Query, Session +from sqlalchemy.sql.elements import BinaryExpression + +from ApiLibrary import system_arrow +from ApiLibrary.common.line_number import get_line_number_for_error +from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi +from Services.PostgresDb.Models.response import PostgresResponse + +# Type variable for class methods returning self +T = TypeVar("T", bound="FilterAttributes") + + +class HTTPException(Exception): + """Base exception for HTTP errors.""" + + def __init__(self, status_code: str, detail: str): + self.status_code = status_code + self.detail = detail + super().__init__(detail) + + +class HTTPStatus(Protocol): + """Protocol defining required HTTP status codes.""" + + HTTP_400_BAD_REQUEST: str + HTTP_404_NOT_FOUND: str + HTTP_304_NOT_MODIFIED: str + + +@dataclass +class FilterConfig: + """Configuration for filtering and pagination.""" + + page: int = 1 + size: int = 10 + order_field: str = "id" + order_type: str = "asc" + include_joins: List[str] = None + query: Dict[str, Any] = None + + def __post_init__(self): + """Initialize default values for None fields.""" + self.include_joins = self.include_joins or [] + self.query = self.query or {} + + +class QueryConfig: + """Configuration for query building and execution.""" + + def __init__( + self, + pre_query: Optional[Query] = None, + filter_config: Optional[FilterConfig] = None, + http_exception: Optional[Type[HTTPException]] = HTTPException, + status: Optional[Type[HTTPStatus]] = None, + ): + self.pre_query = pre_query + self.filter_config = filter_config or FilterConfig() + self.http_exception = http_exception + self.status = status + self.total_count: Optional[int] = None + + def update_filter_config(self, **kwargs) -> None: + """Update filter configuration parameters.""" + for key, value in kwargs.items(): + if hasattr(self.filter_config, key): + setattr(self.filter_config, key, value) + + def set_total_count(self, count: int) -> None: + """Set the total count of records.""" + self.total_count = count + + +class FilterAttributes: + """ + Advanced filtering capabilities for SQLAlchemy models. + + Features: + - Pagination and ordering + - Complex query building + - Active/deleted/confirmed status filtering + - Expiry date handling + - Transaction management + + Usage: + # Initialize configuration + config = QueryConfig(filter_config=FilterConfig(page=1, size=10)) + + # Create model with configuration + class User(FilterAttributes): + query_config = config + + # Filter multiple records + users = User.filter_by_all(db, name="John").data + + # Update configuration + User.query_config.update_filter_config(page=2, size=20) + next_users = User.filter_all(db).data + """ + + __abstract__ = True + + # Class-level configuration + query_config: QueryConfig = QueryConfig() + + @classmethod + def flush(cls: Type[T], db: Session) -> T: + """ + Flush the current session to the database. + + Args: + db: Database session + + Returns: + Self instance + + Raises: + HTTPException: If database operation fails + """ + try: + db.flush() + return cls + except SQLAlchemyError as e: + raise HTTPExceptionApi( + error_code="HTTP_304_NOT_MODIFIED", + lang=cls.lang or "tr", + loc=get_line_number_for_error(), + sys_msg=str(e), + ) + + @classmethod + def destroy(cls: Type[T], db: Session) -> None: + """ + Delete the record from the database. + + Args: + db: Database session + """ + db.delete(cls) + db.commit() + + @classmethod + def save_via_metadata(cls: Type[T], db: Session) -> None: + """ + Save or rollback based on metadata. + + Args: + db: Database session + + Raises: + HTTPException: If save operation fails + """ + try: + meta_data = getattr(cls, "meta_data", {}) + if meta_data.get("created", False): + db.commit() + db.rollback() + except SQLAlchemyError as e: + raise HTTPExceptionApi( + error_code="HTTP_304_NOT_MODIFIED", + lang=cls.lang or "tr", + loc=get_line_number_for_error(), + sys_msg=str(e), + ) + + @classmethod + def save(cls: Type[T], db: Session) -> None: + """ + Commit changes to database. + + Args: + db: Database session + + Raises: + HTTPException: If commit fails + """ + try: + db.commit() + except SQLAlchemyError as e: + raise HTTPExceptionApi( + error_code="HTTP_304_NOT_MODIFIED", + lang=cls.lang or "tr", + loc=get_line_number_for_error(), + sys_msg=str(e), + ) + + @classmethod + def rollback(cls: Type[T], db: Session) -> None: + """ + Rollback current transaction. + + Args: + db: Database session + """ + db.rollback() + + @classmethod + def save_and_confirm(cls: Type[T], db: Session) -> None: + """ + Save changes and mark record as confirmed. + + Args: + db: Database session + + Raises: + HTTPException: If operation fails + """ + try: + cls.save(db) + cls.update(db, is_confirmed=True) + cls.save(db) + except SQLAlchemyError as e: + raise HTTPExceptionApi( + error_code="HTTP_304_NOT_MODIFIED", + lang=cls.lang or "tr", + loc=get_line_number_for_error(), + sys_msg=str(e), + ) + + @classmethod + def _query(cls: Type[T], db: Session) -> Query: + """ + Get base query for model. + + Args: + db: Database session + + Returns: + SQLAlchemy Query object + """ + return ( + cls.query_config.pre_query if cls.query_config.pre_query else db.query(cls) + ) + + @classmethod + def add_query_to_filter( + cls: Type[T], query: Query, filter_list: Dict[str, Any] + ) -> Query: + """ + Add pagination and ordering to query. + + Args: + query: Base query + filter_list: Dictionary containing pagination and ordering parameters + + Returns: + Modified query with pagination and ordering + """ + order_field = getattr(cls, filter_list.get("order_field")) + order_func = desc if str(filter_list.get("order_type"))[0] == "d" else asc + + return ( + query.order_by(order_func(order_field)) + .limit(filter_list.get("size")) + .offset((filter_list.get("page") - 1) * filter_list.get("size")) + .populate_existing() + ) + + @classmethod + def get_filter_attributes(cls) -> Dict[str, Any]: + """ + Get filter configuration from attributes. + + Returns: + Dictionary containing pagination and filtering parameters + """ + return { + "page": getattr(cls.query_config.filter_config, "page", 1), + "size": getattr(cls.query_config.filter_config, "size", 10), + "order_field": getattr(cls.query_config.filter_config, "order_field", "id"), + "order_type": getattr(cls.query_config.filter_config, "order_type", "asc"), + "include_joins": getattr( + cls.query_config.filter_config, "include_joins", [] + ), + "query": getattr(cls.query_config.filter_config, "query", {}), + } + + @classmethod + def add_new_arg_to_args( + cls, + args_list: Tuple[BinaryExpression, ...], + argument: str, + value: BinaryExpression, + ) -> Tuple[BinaryExpression, ...]: + """ + Add new argument to filter arguments if not exists. + + Args: + args_list: Current filter arguments + argument: Argument name to check + value: New argument to add + + Returns: + Updated argument tuple + """ + new_args = [arg for arg in args_list if isinstance(arg, BinaryExpression)] + arg_left = lambda arg_obj: getattr(getattr(arg_obj, "left", None), "key", None) + + if not any(arg_left(arg) == argument for arg in new_args): + new_args.append(value) + + return tuple(new_args) + + @classmethod + def get_not_expired_query_arg( + cls, args: Tuple[BinaryExpression, ...] + ) -> Tuple[BinaryExpression, ...]: + """ + Add expiry date conditions to query. + + Args: + args: Current query arguments + + Returns: + Updated arguments with expiry conditions + """ + current_time = str(system_arrow.now()) + args = cls.add_new_arg_to_args( + args, "expiry_ends", cls.expiry_ends > current_time + ) + args = cls.add_new_arg_to_args( + args, "expiry_starts", cls.expiry_starts <= current_time + ) + return args + + @classmethod + def get_active_and_confirmed_query_arg( + cls, args: Tuple[BinaryExpression, ...] + ) -> Tuple[BinaryExpression, ...]: + """ + Add status conditions to query. + + Args: + args: Current query arguments + + Returns: + Updated arguments with status conditions + """ + args = cls.add_new_arg_to_args(args, "is_confirmed", cls.is_confirmed == True) + args = cls.add_new_arg_to_args(args, "active", cls.active == True) + args = cls.add_new_arg_to_args(args, "deleted", cls.deleted == False) + return args + + @classmethod + def select_only( + cls: Type[T], + db: Session, + *args: BinaryExpression, + select_args: List[Any], + order_by: Optional[Any] = None, + limit: Optional[int] = None, + system: bool = False, + ) -> PostgresResponse: + """ + Select specific columns from filtered query. + + Args: + db: Database session + args: Filter conditions + select_args: Columns to select + order_by: Optional ordering + limit: Optional result limit + system: If True, skip status filtering + + Returns: + Query response with selected columns + """ + if not system: + args = cls.get_active_and_confirmed_query_arg(args) + args = cls.get_not_expired_query_arg(args) + + query = cls._query(db).filter(*args).with_entities(*select_args) + cls.query_config.set_total_count(query.count()) + + if order_by is not None: + query = query.order_by(order_by) + if limit: + query = query.limit(limit) + + return PostgresResponse(query=query, first=False) + + @classmethod + def filter_by_all( + cls: Type[T], db: Session, system: bool = False, **kwargs + ) -> PostgresResponse: + """ + Filter multiple records by keyword arguments. + + Args: + db: Database session + system: If True, skip status filtering + **kwargs: Filter criteria + + Returns: + Query response with matching records + """ + if "is_confirmed" not in kwargs and not system: + kwargs["is_confirmed"] = True + kwargs.pop("system", None) + + query = cls._query(db).filter_by(**kwargs) + cls.query_config.set_total_count(query.count()) + + if cls.query_config.filter_config: + filter_list = cls.get_filter_attributes() + query = cls.add_query_to_filter(query, filter_list) + + return PostgresResponse(query=query, first=False) + + @classmethod + def filter_by_one( + cls: Type[T], db: Session, system: bool = False, **kwargs + ) -> PostgresResponse: + """ + Filter single record by keyword arguments. + + Args: + db: Database session + system: If True, skip status filtering + **kwargs: Filter criteria + + Returns: + Query response with single record + """ + if "is_confirmed" not in kwargs and not system: + kwargs["is_confirmed"] = True + kwargs.pop("system", None) + + query = cls._query(db).filter_by(**kwargs) + cls.query_config.set_total_count(1) + + return PostgresResponse(query=query, first=True) + + @classmethod + def filter_all( + cls: Type[T], *args: Any, db: Session, system: bool = False + ) -> PostgresResponse: + """ + Filter multiple records by expressions. + + Args: + db: Database session + args: Filter expressions + system: If True, skip status filtering + + Returns: + Query response with matching records + """ + if not system: + args = cls.get_active_and_confirmed_query_arg(args) + args = cls.get_not_expired_query_arg(args) + + filter_list = cls.get_filter_attributes() + if filter_list.get("query"): + for smart_iter in cls.filter_expr(**filter_list["query"]): + if key := getattr(getattr(smart_iter, "left", None), "key", None): + args = cls.add_new_arg_to_args(args, key, smart_iter) + + query = cls._query(db) + cls.query_config.set_total_count(query.count()) + query = query.filter(*args) + + if cls.query_config.filter_config: + query = cls.add_query_to_filter(query, filter_list) + + return PostgresResponse(query=query, first=False) + + @classmethod + def filter_one( + cls: Type[T], + *args: Any, + db: Session, + system: bool = False, + expired: bool = False, + ) -> PostgresResponse: + """ + Filter single record by expressions. + + Args: + db: Database session + args: Filter expressions + system: If True, skip status filtering + expired: If True, include expired records + + Returns: + Query response with single record + """ + if not system: + args = cls.get_active_and_confirmed_query_arg(args) + if not expired: + args = cls.get_not_expired_query_arg(args) + + query = cls._query(db).filter(*args) + cls.query_config.set_total_count(1) + + return PostgresResponse(query=query, first=True) + + # @classmethod + # def raise_http_exception( + # cls, + # status_code: str, + # error_case: str, + # data: Dict[str, Any], + # message: str, + # ) -> None: + # """ + # Raise HTTP exception with formatted error details. + + # Args: + # status_code: HTTP status code string + # error_case: Error type + # data: Additional error data + # message: Error message + + # Raises: + # HTTPException: With formatted error details + # """ + # raise HTTPExceptionApi( + # error_code="HTTP_304_NOT_MODIFIED", + # lang=cls.lang or "tr", loc=get_line_number_for_error() + # ) diff --git a/Services/PostgresDb/Models/mixins.py b/Services/PostgresDb/Models_old/mixins.py similarity index 100% rename from Services/PostgresDb/Models/mixins.py rename to Services/PostgresDb/Models_old/mixins.py diff --git a/Services/PostgresDb/Models/query.py b/Services/PostgresDb/Models_old/query.py similarity index 100% rename from Services/PostgresDb/Models/query.py rename to Services/PostgresDb/Models_old/query.py diff --git a/Services/PostgresDb/Models_old/response.py b/Services/PostgresDb/Models_old/response.py new file mode 100644 index 0000000..60d8cf0 --- /dev/null +++ b/Services/PostgresDb/Models_old/response.py @@ -0,0 +1,90 @@ +""" +Response handler for PostgreSQL query results. + +This module provides a wrapper class for SQLAlchemy query results, +adding convenience methods for accessing data and managing query state. +""" + +from typing import Any, Dict, List, Optional, TypeVar, Generic, Union +from sqlalchemy.orm import Query + +T = TypeVar("T") + + +class PostgresResponse(Generic[T]): + """ + Wrapper for PostgreSQL/SQLAlchemy query results. + + Attributes: + query: SQLAlchemy query object + first: Whether to return first result only + data: Query results (lazy loaded) + count: Total count of results + + Properties: + all: All results as list + first_item: First result only + """ + + def __init__( + self, + query: Query, + first: bool = False, + status: bool = True, + message: str = "", + error: Optional[str] = None, + ): + self._query = query + self._first = first + self.status = status + self.message = message + self.error = error + self._data: Optional[Union[List[T], T]] = None + self._count: Optional[int] = None + + @property + def query(self) -> Query: + """Get query object.""" + return self._query + + @property + def data(self) -> Union[List[T], T, None]: + """ + Lazy load and return query results. + Returns first item if first=True, otherwise returns all results. + """ + if self._data is None: + results = self._query.all() + self._data = results[0] if self._first and results else results + return self._data + + @property + def count(self) -> int: + """Lazy load and return total count of results.""" + if self._count is None: + self._count = self._query.count() + return self._count + + @property + def all(self) -> List[T]: + """Get all results as list.""" + return ( + self.data + if isinstance(self.data, list) + else [self.data] if self.data else [] + ) + + @property + def first(self) -> Optional[T]: + """Get first result only.""" + return self.data if self._first else (self.data[0] if self.data else None) + + def as_dict(self) -> Dict[str, Any]: + """Convert response to dictionary format.""" + return { + "status": self.status, + "message": self.message, + "data": self.data, + "count": self.count, + "error": self.error, + } diff --git a/Services/PostgresDb/how_to.py b/Services/PostgresDb/how_to.py index e69de29..a1858df 100644 --- a/Services/PostgresDb/how_to.py +++ b/Services/PostgresDb/how_to.py @@ -0,0 +1,81 @@ + +from Schemas import AddressNeighborhood +from Services.PostgresDb.Models.crud_alchemy import Credentials +from Services.PostgresDb.Models.mixin import BasicMixin +from Services.PostgresDb.Models.pagination import Pagination, PaginationResult + + +listing = False +creating = False +updating = True + +new_session = AddressNeighborhood.new_session() +new_session_test = AddressNeighborhood.new_session() + +BasicMixin.creds = Credentials(person_id=10, person_name='Berkay Super User') + + +if listing: + """List Options and Queries """ + AddressNeighborhood.pre_query = AddressNeighborhood.filter_all( + AddressNeighborhood.neighborhood_code.icontains('10'), + db=new_session, + ).query + query_of_list_options = { + "neighborhood_name__ilike": "A%", + "neighborhood_code__contains": "3", + } + address_neighborhoods = AddressNeighborhood.filter_all( + *AddressNeighborhood.convert(query_of_list_options), + db=new_session, + ) + pagination = Pagination(data=address_neighborhoods) + pagination.page = 9 + pagination.size = 10 + pagination.orderField = ['type_code','neighborhood_code'] + pagination.orderType = ['desc', 'asc'] + + pagination_result = PaginationResult(data=address_neighborhoods, pagination=pagination) + print(pagination_result.pagination.as_dict()) + print(pagination_result.data) + +if creating: + """Create Queries """ + find_or_create = AddressNeighborhood.find_or_create( + neighborhood_code='100', + neighborhood_name='Test', + locality_id=15334, + db=new_session, + ) + find_or_create.save_via_metadata(db=new_session) + find_or_create.destroy(db=new_session) + find_or_create.save_via_metadata(db=new_session) + find_or_create = AddressNeighborhood.find_or_create( + neighborhood_code='100', + neighborhood_name='Test', + locality_id=15334, + db=new_session, + ) + find_or_create.save_via_metadata(db=new_session) + +if updating: + """Update Queries """ + + query_of_list_options = { + "uu_id": str("33a89767-d2dc-4531-8f66-7b650e22a8a7"), + } + print('query_of_list_options', query_of_list_options) + address_neighborhoods_one = AddressNeighborhood.filter_one( + *AddressNeighborhood.convert(query_of_list_options), + db=new_session, + ).data + address_neighborhoods_one.update( + neighborhood_name='Test 44', + db=new_session, + ) + address_neighborhoods_one.save(db=new_session) + address_neighborhoods_one = AddressNeighborhood.filter_one( + *AddressNeighborhood.convert(query_of_list_options), + db=new_session, + ).data_as_dict + print('address_neighborhoods_one', address_neighborhoods_one)