From e5f88f2eb4eb205710276aa3811a831e7baea6c1 Mon Sep 17 00:00:00 2001 From: berkay Date: Tue, 22 Apr 2025 11:10:29 +0300 Subject: [PATCH] black shift --- .../abstracts/default_validations.py | 24 +-- ApiDefaults/create_app.py | 13 +- .../DealerService/init_applications.py | 1 - .../IdentityService/Endpoints/people/route.py | 6 +- .../IdentityService/Endpoints/routes.py | 6 +- .../IdentityService/Endpoints/user/route.py | 16 +- .../IdentityService/Events/people/cluster.py | 17 +- .../IdentityService/Events/user/cluster.py | 17 +- .../Events/user/supers_events.py | 2 + ApiServices/TemplateService/create_app.py | 2 +- .../TemplateService/events/__init__.py | 7 +- .../events/template/cluster.py | 6 +- .../initializer/event_clusters.py | 7 +- BankServices/EmailService/app.py | 96 ++++++----- BankServices/ParserService/app.py | 110 ++++++++----- BankServices/RoutineEmailService/app.py | 71 ++++---- BankServices/SenderService/app.py | 98 ++++++----- BankServices/ServiceDepends/config.py | 2 +- BankServices/WriterService/app.py | 72 ++++---- Controllers/Mongo/database.py | 98 ++++++----- Controllers/Mongo/implementations.py | 137 +++++++++------- Controllers/Mongo/local_test.py | 25 +-- Controllers/Postgres/config.py | 2 +- Controllers/Postgres/crud.py | 4 +- Controllers/Postgres/implementations.py | 32 ++-- Controllers/Redis/Broadcast/actions.py | 154 +++++++++--------- .../Redis/Broadcast/implementations.py | 79 +++++---- Controllers/Redis/connection.py | 6 +- Controllers/Redis/implementations.py | 78 +++++---- Schemas/building/build.py | 4 +- 30 files changed, 671 insertions(+), 521 deletions(-) diff --git a/ApiControllers/abstracts/default_validations.py b/ApiControllers/abstracts/default_validations.py index 20070a8..d5e6a23 100644 --- a/ApiControllers/abstracts/default_validations.py +++ b/ApiControllers/abstracts/default_validations.py @@ -23,26 +23,26 @@ class CommonHeaders(BaseModel): tz: str = Header(None, alias="timezone"), ): token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None) - + # Extract operation_id from the route operation_id = None - if hasattr(request.scope.get('route'), 'operation_id'): - operation_id = request.scope.get('route').operation_id - + if hasattr(request.scope.get("route"), "operation_id"): + operation_id = request.scope.get("route").operation_id + return cls( - language=language, - domain=domain, - timezone=tz, - token=token, - request=request, + language=language, + domain=domain, + timezone=tz, + token=token, + request=request, response=response, - operation_id=operation_id + operation_id=operation_id, ) - + def get_headers_dict(self): """Convert the headers to a dictionary format used in the application""" import uuid - + return { "language": self.language or "", "domain": self.domain or "", diff --git a/ApiDefaults/create_app.py b/ApiDefaults/create_app.py index bcee2eb..1b3159a 100644 --- a/ApiDefaults/create_app.py +++ b/ApiDefaults/create_app.py @@ -5,13 +5,18 @@ from fastapi.responses import RedirectResponse def create_events_if_any_cluster_set(): - import Events + import Events + if not Events.__all__: return - - router_cluster_stack: list[RouterCluster] = [getattr(Events, e, None) for e in Events.__all__] + + router_cluster_stack: list[RouterCluster] = [ + getattr(Events, e, None) for e in Events.__all__ + ] for router_cluster in router_cluster_stack: - event_cluster_stack: list[EventCluster] = list(router_cluster.event_clusters.values()) + event_cluster_stack: list[EventCluster] = list( + router_cluster.event_clusters.values() + ) for 
event_cluster in event_cluster_stack: print(f"Creating event:", event_cluster.name) try: diff --git a/ApiServices/DealerService/init_applications.py b/ApiServices/DealerService/init_applications.py index 767de93..6eb3a6b 100644 --- a/ApiServices/DealerService/init_applications.py +++ b/ApiServices/DealerService/init_applications.py @@ -200,7 +200,6 @@ def init_applications_for_tenant(super_user: BuildLivingSpace, db_session=None) application_type="Dash", description="Individual Page for tenant account view", ), - ] for list_of_created_app in list_of_created_apps: diff --git a/ApiServices/IdentityService/Endpoints/people/route.py b/ApiServices/IdentityService/Endpoints/people/route.py index 256c96b..3d04612 100644 --- a/ApiServices/IdentityService/Endpoints/people/route.py +++ b/ApiServices/IdentityService/Endpoints/people/route.py @@ -22,7 +22,7 @@ def people_route_list( List people endpoint """ token_object = TokenProvider.get_dict_from_redis(token=headers.token) - event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object) + event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object) event_key = TokenProvider.retrieve_event_codes(**event_founder_dict) FoundCluster = PeopleRouterCluster.get_event_cluster("PeopleList") event_cluster_matched = FoundCluster.match_event(event_key=event_key) @@ -41,7 +41,7 @@ def people_route_create( Create people endpoint """ token_object = TokenProvider.get_dict_from_redis(token=headers.token) - event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object) + event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object) event_key = TokenProvider.retrieve_event_codes(**event_founder_dict) FoundCluster = PeopleRouterCluster.get_event_cluster("PeopleCreate") event_cluster_matched = FoundCluster.match_event(event_key=event_key) @@ -60,7 +60,7 @@ def people_route_update( Update people endpoint """ token_object = TokenProvider.get_dict_from_redis(token=headers.token) - event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object) + event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object) event_key = TokenProvider.retrieve_event_codes(**event_founder_dict) FoundCluster = PeopleRouterCluster.get_event_cluster("PeopleUpdate") event_cluster_matched = FoundCluster.match_event(event_key=event_key) diff --git a/ApiServices/IdentityService/Endpoints/routes.py b/ApiServices/IdentityService/Endpoints/routes.py index 2125826..df1570b 100644 --- a/ApiServices/IdentityService/Endpoints/routes.py +++ b/ApiServices/IdentityService/Endpoints/routes.py @@ -4,10 +4,8 @@ from fastapi import APIRouter def get_routes() -> list[APIRouter]: from .people.route import people_route from .user.route import user_route - return [ - user_route, - people_route - ] + + return [user_route, people_route] def get_safe_endpoint_urls() -> list[tuple[str, str]]: diff --git a/ApiServices/IdentityService/Endpoints/user/route.py b/ApiServices/IdentityService/Endpoints/user/route.py index f919bfb..76aa906 100644 --- a/ApiServices/IdentityService/Endpoints/user/route.py +++ b/ApiServices/IdentityService/Endpoints/user/route.py @@ -28,7 +28,9 @@ def user_list_route( endpoint_code = "5bc09312-d3f2-4f47-baba-17c928706da8" token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None) token_object = TokenProvider.get_dict_from_redis(token=token) - event_key = TokenProvider.retrieve_event_codes(endpoint_code=endpoint_code, token=token_object) + event_key = 
TokenProvider.retrieve_event_codes( + endpoint_code=endpoint_code, token=token_object + ) headers = { "language": language or "", "domain": domain or "", @@ -36,7 +38,9 @@ def user_list_route( "tz": tz or "GMT+3", "token": token, } - event_cluster_matched = UserRouterCluster.get_event_cluster("UserList").match_event(event_key=event_key) + event_cluster_matched = UserRouterCluster.get_event_cluster("UserList").match_event( + event_key=event_key + ) response.headers["X-Header"] = "Test Header GET" if runner_callable := event_cluster_matched.event_callable(): return runner_callable @@ -71,7 +75,9 @@ def user_create_route( "tz": tz or "GMT+3", "token": token, } - event_cluster_matched = UserRouterCluster.get_event_cluster("UserCreate").match_event(event_key=event_key) + event_cluster_matched = UserRouterCluster.get_event_cluster( + "UserCreate" + ).match_event(event_key=event_key) response.headers["X-Header"] = "Test Header POST" if runner_callable := event_cluster_matched.event_callable(): return runner_callable @@ -100,7 +106,9 @@ def user_update_route(request: Request, response: Response): "tz": tz or "GMT+3", "token": token, } - event_cluster_matched = UserRouterCluster.get_event_cluster("UserUpdate").match_event(event_key=event_key) + event_cluster_matched = UserRouterCluster.get_event_cluster( + "UserUpdate" + ).match_event(event_key=event_key) response.headers["X-Header"] = "Test Header POST" if runner_callable := event_cluster_matched.event_callable(): return runner_callable diff --git a/ApiServices/IdentityService/Events/people/cluster.py b/ApiServices/IdentityService/Events/people/cluster.py index 92de56f..fbd2519 100644 --- a/ApiServices/IdentityService/Events/people/cluster.py +++ b/ApiServices/IdentityService/Events/people/cluster.py @@ -2,31 +2,24 @@ from ApiControllers.abstracts.event_clusters import EventCluster, RouterCluster from .supers_events import ( SupersPeopleCreateEvent, SupersPeopleUpdateEvent, - SupersPeopleListEvent + SupersPeopleListEvent, ) -PeopleRouterCluster = RouterCluster( - name="PeopleRouterCluster" -) +PeopleRouterCluster = RouterCluster(name="PeopleRouterCluster") PeopleEventClusterList = EventCluster( - name="PeopleList", - endpoint_uu_id="f102db46-031a-43e4-966a-dae6896f985b" + name="PeopleList", endpoint_uu_id="f102db46-031a-43e4-966a-dae6896f985b" ) PeopleEventClusterList.add_event(SupersPeopleListEvent) PeopleEventClusterCreate = EventCluster( - name="PeopleCreate", - endpoint_uu_id="eb465fde-337f-4b81-94cf-28c6d4f2b1b6" + name="PeopleCreate", endpoint_uu_id="eb465fde-337f-4b81-94cf-28c6d4f2b1b6" ) PeopleEventClusterCreate.add_event(SupersPeopleCreateEvent) PeopleEventClusterUpdate = EventCluster( - name="PeopleUpdate", - endpoint_uu_id="c9e5ba69-6915-43f5-8f9c-a5c2aa865b89" + name="PeopleUpdate", endpoint_uu_id="c9e5ba69-6915-43f5-8f9c-a5c2aa865b89" ) PeopleEventClusterUpdate.add_event(SupersPeopleUpdateEvent) PeopleRouterCluster.set_event_cluster(PeopleEventClusterList) PeopleRouterCluster.set_event_cluster(PeopleEventClusterCreate) PeopleRouterCluster.set_event_cluster(PeopleEventClusterUpdate) - - diff --git a/ApiServices/IdentityService/Events/user/cluster.py b/ApiServices/IdentityService/Events/user/cluster.py index cc44565..0ad56c0 100644 --- a/ApiServices/IdentityService/Events/user/cluster.py +++ b/ApiServices/IdentityService/Events/user/cluster.py @@ -1,29 +1,24 @@ from ApiControllers.abstracts.event_clusters import EventCluster, RouterCluster from .supers_events import ( - SuperUsersListEvent, - SuperUsersCreateEvent, + 
SuperUsersListEvent, + SuperUsersCreateEvent, SuperUsersUpdateEvent, ) -UserRouterCluster = RouterCluster( - name="UserRouterCluster" -) +UserRouterCluster = RouterCluster(name="UserRouterCluster") UserEventClusterList = EventCluster( - name="UserList", - endpoint_uu_id="5bc09312-d3f2-4f47-baba-17c928706da8" + name="UserList", endpoint_uu_id="5bc09312-d3f2-4f47-baba-17c928706da8" ) UserEventClusterList.add_event(SuperUsersListEvent) UserEventClusterCreate = EventCluster( - name="UserCreate", - endpoint_uu_id="08d4b572-1584-47bb-aa42-8d068e5514e7" + name="UserCreate", endpoint_uu_id="08d4b572-1584-47bb-aa42-8d068e5514e7" ) UserEventClusterCreate.add_event(SuperUsersCreateEvent) UserEventClusterUpdate = EventCluster( - name="UserUpdate", - endpoint_uu_id="b641236a-928d-4f19-a1d2-5edf611d1e56" + name="UserUpdate", endpoint_uu_id="b641236a-928d-4f19-a1d2-5edf611d1e56" ) UserEventClusterUpdate.add_event(SuperUsersUpdateEvent) diff --git a/ApiServices/IdentityService/Events/user/supers_events.py b/ApiServices/IdentityService/Events/user/supers_events.py index e560a5f..87fd284 100644 --- a/ApiServices/IdentityService/Events/user/supers_events.py +++ b/ApiServices/IdentityService/Events/user/supers_events.py @@ -71,6 +71,7 @@ def supers_users_create_callable(): }, } + SuperUsersCreateEvent.event_callable = supers_users_create_callable # Update endpoint @@ -96,4 +97,5 @@ def supers_users_update_callable(): }, } + SuperUsersUpdateEvent.event_callable = supers_users_update_callable diff --git a/ApiServices/TemplateService/create_app.py b/ApiServices/TemplateService/create_app.py index 46bcebf..f3571d5 100644 --- a/ApiServices/TemplateService/create_app.py +++ b/ApiServices/TemplateService/create_app.py @@ -11,7 +11,7 @@ from config import api_config def create_events_if_any_cluster_set(): - from events import retrieve_all_clusters + from events import retrieve_all_clusters for event_cluster in retrieve_all_clusters(): for event in event_cluster.retrieve_all_event_clusters: diff --git a/ApiServices/TemplateService/events/__init__.py b/ApiServices/TemplateService/events/__init__.py index 3f80219..6bcf85d 100644 --- a/ApiServices/TemplateService/events/__init__.py +++ b/ApiServices/TemplateService/events/__init__.py @@ -1,10 +1,9 @@ -from .template.cluster import ( - TemplateEventClusterSet -) +from .template.cluster import TemplateEventClusterSet __all__ = [ "TemplateEventClusterSet", ] + def retrieve_all_clusters(): - return [TemplateEventClusterSet] \ No newline at end of file + return [TemplateEventClusterSet] diff --git a/ApiServices/TemplateService/events/template/cluster.py b/ApiServices/TemplateService/events/template/cluster.py index 6753615..5f38a61 100644 --- a/ApiServices/TemplateService/events/template/cluster.py +++ b/ApiServices/TemplateService/events/template/cluster.py @@ -1,4 +1,7 @@ -from ApiServices.TemplateService.initializer.event_clusters import EventCluster, SetEventCluster +from ApiServices.TemplateService.initializer.event_clusters import ( + EventCluster, + SetEventCluster, +) TemplateEventCluster = EventCluster( @@ -12,4 +15,3 @@ OtherTemplateEventCluster = EventCluster( TemplateEventClusterSet = SetEventCluster() TemplateEventClusterSet.add_event_cluster(TemplateEventCluster) TemplateEventClusterSet.add_event_cluster(OtherTemplateEventCluster) - diff --git a/ApiServices/TemplateService/initializer/event_clusters.py b/ApiServices/TemplateService/initializer/event_clusters.py index 63dd576..66dc092 100644 --- a/ApiServices/TemplateService/initializer/event_clusters.py +++ 
b/ApiServices/TemplateService/initializer/event_clusters.py @@ -6,6 +6,7 @@ class EventCluster: """ EventCluster """ + def __init__(self, endpoint_uu_id: str): self.endpoint_uu_id = endpoint_uu_id self.events = [] @@ -96,13 +97,17 @@ class SetEventCluster: """ SetEventCluster """ + list_of_event_clusters: list[EventCluster] = [] def add_event_cluster(self, event_cluster: EventCluster): """ Add an event cluster to the set """ - endpoint_uu_id_list = [event_cluster_uuid.endpoint_uu_id for event_cluster_uuid in self.list_of_event_clusters] + endpoint_uu_id_list = [ + event_cluster_uuid.endpoint_uu_id + for event_cluster_uuid in self.list_of_event_clusters + ] if event_cluster.endpoint_uu_id not in endpoint_uu_id_list: self.list_of_event_clusters.append(event_cluster) diff --git a/BankServices/EmailService/app.py b/BankServices/EmailService/app.py index d90361a..e625dc7 100644 --- a/BankServices/EmailService/app.py +++ b/BankServices/EmailService/app.py @@ -30,46 +30,46 @@ T = TypeVar("T") class EmailProcessingContext: """Context manager for email processing that marks emails as unread if an error occurs.""" - + def __init__(self, email_message, mark_as_read: bool = True): self.email_message = email_message self.mark_as_read = mark_as_read self.success = False - + def __enter__(self): return self - + def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None or not self.success: # If an exception occurred or processing wasn't successful, mark as unread try: - if hasattr(self.email_message, 'mark_as_unread'): + if hasattr(self.email_message, "mark_as_unread"): self.email_message.mark_as_unread() - print(f"[EMAIL_SERVICE] Marked email as UNREAD due to processing error: {exc_val if exc_val else 'Unknown error'}") + print( + f"[EMAIL_SERVICE] Marked email as UNREAD due to processing error: {exc_val if exc_val else 'Unknown error'}" + ) except Exception as e: print(f"[EMAIL_SERVICE] Failed to mark email as unread: {str(e)}") elif self.mark_as_read: # If processing was successful and mark_as_read is True, ensure it's marked as read try: - if hasattr(self.email_message, 'mark_as_read'): + if hasattr(self.email_message, "mark_as_read"): self.email_message.mark_as_read() except Exception as e: print(f"[EMAIL_SERVICE] Failed to mark email as read: {str(e)}") return False # Don't suppress exceptions -def publish_payload_to_redis( - payload, filename: str, mail_info: dict -) -> bool: +def publish_payload_to_redis(payload, filename: str, mail_info: dict) -> bool: # Create message document # Use base64 encoding for binary payloads to ensure proper transmission if isinstance(payload, bytes): - encoded_payload = base64.b64encode(payload).decode('utf-8') + encoded_payload = base64.b64encode(payload).decode("utf-8") is_base64 = True else: encoded_payload = payload is_base64 = False - + message = { "filename": filename, "payload": encoded_payload, @@ -79,12 +79,14 @@ def publish_payload_to_redis( "uuid": str(uuid4()), # Use UUID **mail_info, } - + # Publish to Redis channel result = redis_pubsub.publisher.publish(REDIS_CHANNEL, message) - + if result.status: - print(f"[EMAIL_SERVICE] Published message with filename: {filename} to channel: {REDIS_CHANNEL}") + print( + f"[EMAIL_SERVICE] Published message with filename: {filename} to channel: {REDIS_CHANNEL}" + ) return True else: print(f"[EMAIL_SERVICE] Publish error: {result.error}") @@ -126,7 +128,7 @@ def app(): port = Config.EMAIL_PORT username = Config.EMAIL_USERNAME password = Config.EMAIL_PASSWORD - + box = EmailBox(host=host, port=port, 
username=username, password=password) if not box: return Exception("Mailbox not found") @@ -136,41 +138,51 @@ def app(): filter_mail = OR(FROM(Config.MAILBOX), FROM(Config.MAIN_MAIL)) filter_print = f"{Config.MAILBOX} & {Config.MAIN_MAIL}" - # Determine if this is the first run of the day + # Determine if this is the first run of the day # Store last run date in a file last_run_file = "/tmp/email_service_last_run.json" current_date = datetime.now().strftime("%Y-%m-%d") days_to_check, full_check = 7, 90 # Default to 7 days - + try: if os.path.exists(last_run_file): - with open(last_run_file, 'r') as f: + with open(last_run_file, "r") as f: last_run_data = json.load(f) - last_run_date = last_run_data.get('last_run_date') - + last_run_date = last_run_data.get("last_run_date") + # If this is the first run of a new day, check 90 days if last_run_date != current_date: days_to_check = full_check - print(f"[EMAIL_SERVICE] First run of the day. Checking emails from the past {days_to_check} days") + print( + f"[EMAIL_SERVICE] First run of the day. Checking emails from the past {days_to_check} days" + ) else: - print(f"[EMAIL_SERVICE] Subsequent run today. Checking emails from the past {days_to_check} days") + print( + f"[EMAIL_SERVICE] Subsequent run today. Checking emails from the past {days_to_check} days" + ) else: # If no last run file exists, this is the first run ever - check 90 days days_to_check = full_check - print(f"[EMAIL_SERVICE] First run detected. Checking emails from the past {days_to_check} days") + print( + f"[EMAIL_SERVICE] First run detected. Checking emails from the past {days_to_check} days" + ) except Exception as e: - print(f"[EMAIL_SERVICE] Error reading last run file: {str(e)}. Using default of {days_to_check} days") - + print( + f"[EMAIL_SERVICE] Error reading last run file: {str(e)}. 
Using default of {days_to_check} days" + ) + # Update the last run file try: - with open(last_run_file, 'w') as f: - json.dump({'last_run_date': current_date}, f) + with open(last_run_file, "w") as f: + json.dump({"last_run_date": current_date}, f) except Exception as e: print(f"[EMAIL_SERVICE] Error writing last run file: {str(e)}") - + # Calculate the date to check from - check_since_date = (datetime.now() - timedelta(days=days_to_check)).strftime("%d-%b-%Y") - + check_since_date = (datetime.now() - timedelta(days=days_to_check)).strftime( + "%d-%b-%Y" + ) + for folder in mail_folders: if folder.name == "INBOX": # Search for emails since the calculated date @@ -184,27 +196,33 @@ def app(): # Use context manager to handle errors and mark email as unread if needed with EmailProcessingContext(banks_mail) as ctx: try: - headers = {k.lower(): v for k, v in banks_mail.headers.items()} + headers = { + k.lower(): v for k, v in banks_mail.headers.items() + } mail_info = { "from": headers["from"], "to": headers["to"], "subject": headers["subject"], "date": str(headers["date"]), } - + # Process the email and publish to Redis success = read_email_and_publish_to_redis( email_message=email_message, mail_info=mail_info ) - + # Set success flag for the context manager ctx.success = success - + if success: - print(f"[EMAIL_SERVICE] Successfully processed email with subject: {mail_info['subject']}") + print( + f"[EMAIL_SERVICE] Successfully processed email with subject: {mail_info['subject']}" + ) else: - print(f"[EMAIL_SERVICE] No matching attachments found in email with subject: {mail_info['subject']}") - + print( + f"[EMAIL_SERVICE] No matching attachments found in email with subject: {mail_info['subject']}" + ) + except Exception as e: print(f"[EMAIL_SERVICE] Error processing email: {str(e)}") # The context manager will mark the email as unread @@ -213,8 +231,8 @@ def app(): if __name__ == "__main__": print("=== Starting Email Service with Redis Pub/Sub ===") print(f"Publishing to channel: {REDIS_CHANNEL}") - time.sleep(20) # Wait for 20 seconds to other services to kick in - + time.sleep(20) # Wait for 20 seconds to other services to kick in + while True: print("\n[EMAIL_SERVICE] Checking for new emails...") app() diff --git a/BankServices/ParserService/app.py b/BankServices/ParserService/app.py index 4aff3b8..a3d6e32 100644 --- a/BankServices/ParserService/app.py +++ b/BankServices/ParserService/app.py @@ -18,14 +18,16 @@ REDIS_CHANNEL_OUT = "parser" # Publish to Parser Service channel delimiter = "|" -def publish_parsed_data_to_redis(data, collected_data_dict: list[dict], filename: str) -> bool: +def publish_parsed_data_to_redis( + data, collected_data_dict: list[dict], filename: str +) -> bool: """Publish parsed data to Redis. 
- + Args: data: Original message data from Redis collected_data_dict: Parsed data from Excel file filename: Name of the processed file - + Returns: bool: Success status """ @@ -40,16 +42,18 @@ def publish_parsed_data_to_redis(data, collected_data_dict: list[dict], filename else: message["parsed"] = None message["stage"] = "not found" # Mark as 'not found' if parsing failed - + # Add processing timestamp message["parsed_at"] = str(arrow.now()) message["filename"] = filename - + # Publish to Redis channel result = redis_pubsub.publisher.publish(REDIS_CHANNEL_OUT, message) - + if result.status: - print(f"[PARSER_SERVICE] Published parsed data for {filename} with stage: {message['stage']}") + print( + f"[PARSER_SERVICE] Published parsed data for {filename} with stage: {message['stage']}" + ) return True else: print(f"[PARSER_SERVICE] Publish error: {result.error}") @@ -58,10 +62,10 @@ def publish_parsed_data_to_redis(data, collected_data_dict: list[dict], filename def parse_excel_file(excel_frame: DataFrame) -> list[dict]: """Parse Excel file data. - + Args: excel_frame: DataFrame containing Excel data - + Returns: list[dict]: List of parsed data dictionaries """ @@ -76,13 +80,17 @@ def parse_excel_file(excel_frame: DataFrame) -> list[dict]: dict( iban=str(iban), bank_date=arrow.get( - datetime.datetime.strptime(str(row[1]), "%d/%m/%Y-%H:%M:%S") + datetime.datetime.strptime( + str(row[1]), "%d/%m/%Y-%H:%M:%S" + ) ).__str__(), channel_branch=unidecode(str(row[3])), currency_value=( float(str(row[4]).replace(",", "")) if row[4] else 0 ), - balance=float(str(row[5]).replace(",", "")) if row[5] else 0, + balance=( + float(str(row[5]).replace(",", "")) if row[5] else 0 + ), additional_balance=( float(str(row[6]).replace(",", "")) if row[6] else 0 ), @@ -92,7 +100,9 @@ def parse_excel_file(excel_frame: DataFrame) -> list[dict]: bank_reference_code=str(row[15]), ) ) - print(f"[PARSER_SERVICE] Successfully parsed {len(data_list)} records from Excel file") + print( + f"[PARSER_SERVICE] Successfully parsed {len(data_list)} records from Excel file" + ) except Exception as e: print(f"[PARSER_SERVICE] Error parsing Excel file: {str(e)}") return data_list @@ -100,13 +110,13 @@ def parse_excel_file(excel_frame: DataFrame) -> list[dict]: def process_message(message): """Process a message from Redis. 
- + Args: message: Message data from Redis subscriber """ # Extract the message data data = message["data"] - + # If data is a string, parse it as JSON if isinstance(data, str): try: @@ -114,7 +124,7 @@ def process_message(message): except json.JSONDecodeError as e: print(f"[PARSER_SERVICE] Error parsing message data: {e}") return - + # Check if stage is 'red' before processing if data.get("stage") == "red": try: @@ -122,91 +132,109 @@ def process_message(message): payload = data.get("payload") is_base64 = data.get("is_base64", False) print(f"[PARSER_SERVICE] Processing file: {filename}") - + # Handle base64-encoded payload if is_base64 and isinstance(payload, str): try: # Decode base64 string to bytes payload = base64.b64decode(payload) - print(f"[PARSER_SERVICE] Successfully decoded base64 payload, size: {len(payload)} bytes") + print( + f"[PARSER_SERVICE] Successfully decoded base64 payload, size: {len(payload)} bytes" + ) except Exception as e: print(f"[PARSER_SERVICE] Error decoding base64 payload: {str(e)}") # Convert regular string payload to bytes if needed elif isinstance(payload, str): - payload = payload.encode('utf-8') - + payload = payload.encode("utf-8") + # Create an in-memory file-like object and try multiple approaches excel_frame = None errors = [] - + # Save payload to a temporary file for debugging if needed temp_file_path = f"/tmp/{filename}" try: - with open(temp_file_path, 'wb') as f: + with open(temp_file_path, "wb") as f: f.write(payload) - print(f"[PARSER_SERVICE] Saved payload to {temp_file_path} for debugging") + print( + f"[PARSER_SERVICE] Saved payload to {temp_file_path} for debugging" + ) except Exception as e: print(f"[PARSER_SERVICE] Could not save debug file: {str(e)}") - + # Try different approaches to read the Excel file approaches = [ # Approach 1: Try xlrd for .xls files - lambda: DataFrame(read_excel(io.BytesIO(payload), engine='xlrd')) if filename.lower().endswith('.xls') else None, + lambda: ( + DataFrame(read_excel(io.BytesIO(payload), engine="xlrd")) + if filename.lower().endswith(".xls") + else None + ), # Approach 2: Try openpyxl for .xlsx files - lambda: DataFrame(read_excel(io.BytesIO(payload), engine='openpyxl')) if filename.lower().endswith('.xlsx') else None, + lambda: ( + DataFrame(read_excel(io.BytesIO(payload), engine="openpyxl")) + if filename.lower().endswith(".xlsx") + else None + ), # Approach 3: Try xlrd with explicit sheet name - lambda: DataFrame(read_excel(io.BytesIO(payload), engine='xlrd', sheet_name=0)), + lambda: DataFrame( + read_excel(io.BytesIO(payload), engine="xlrd", sheet_name=0) + ), # Approach 4: Try with temporary file lambda: DataFrame(read_excel(temp_file_path)), ] - + # Try each approach until one works for i, approach in enumerate(approaches): try: result = approach() if result is not None: excel_frame = result - print(f"[PARSER_SERVICE] Successfully read Excel file using approach {i+1}") + print( + f"[PARSER_SERVICE] Successfully read Excel file using approach {i+1}" + ) break except Exception as e: errors.append(f"Approach {i+1}: {str(e)}") - + # If all approaches failed, raise an exception if excel_frame is None: error_details = "\n".join(errors) - raise Exception(f"Failed to read Excel file using all approaches:\n{error_details}") - + raise Exception( + f"Failed to read Excel file using all approaches:\n{error_details}" + ) + # Extract data from the Excel file collected_data_dict = parse_excel_file(excel_frame) - + # Publish parsed data to Redis publish_parsed_data_to_redis( - data=data, - 
collected_data_dict=collected_data_dict, - filename=filename + data=data, collected_data_dict=collected_data_dict, filename=filename ) except Exception as e: print(f"[PARSER_SERVICE] Error processing message: {str(e)}") else: - print(f"[PARSER_SERVICE] Skipped message with UUID: {data.get('uuid')} (stage is not 'red')") + print( + f"[PARSER_SERVICE] Skipped message with UUID: {data.get('uuid')} (stage is not 'red')" + ) def app(): """Main application function.""" print("[PARSER_SERVICE] Starting Parser Service") - + # Subscribe to the input channel result = redis_pubsub.subscriber.subscribe(REDIS_CHANNEL_IN, process_message) - + if result.status: print(f"[PARSER_SERVICE] Subscribed to channel: {REDIS_CHANNEL_IN}") else: print(f"[PARSER_SERVICE] Subscribe error: {result.error}") return - + # Start listening for messages listen_result = redis_pubsub.subscriber.start_listening(in_thread=True) - + if listen_result.status: print("[PARSER_SERVICE] Listening for messages") else: @@ -217,7 +245,7 @@ def app(): if __name__ == "__main__": # Initialize the app once app() - + # Keep the main thread alive try: while True: diff --git a/BankServices/RoutineEmailService/app.py b/BankServices/RoutineEmailService/app.py index 04dd111..d6d04db 100644 --- a/BankServices/RoutineEmailService/app.py +++ b/BankServices/RoutineEmailService/app.py @@ -11,13 +11,13 @@ def render_email_template( ) -> str: """ Render the HTML email template with the provided data. - + Args: headers: List of column headers for the table rows: List of data rows for the table balance_error: Flag indicating if there's a balance discrepancy bank_balance: Current bank balance formatted as string - + Returns: Rendered HTML template as string """ @@ -25,7 +25,7 @@ def render_email_template( # Look for template in ServiceDepends directory env = Environment(loader=FileSystemLoader("/")) template = env.get_template("template_accounts.html") - + # Render template with variables return template.render( headers=headers, @@ -35,7 +35,7 @@ def render_email_template( today=str(arrow.now().date()), ) except Exception as e: - print('Exception render template:',e) + print("Exception render template:", e) err = e raise @@ -43,25 +43,25 @@ def render_email_template( def send_email_to_given_address(send_to: str, html_template: str) -> bool: """ Send email with the rendered HTML template to the specified address. - + Args: send_to: Email address of the recipient html_template: Rendered HTML template content - + Returns: Boolean indicating if the email was sent successfully """ today = arrow.now() subject = f"{str(today.date())} Gunes Apt. Cari Durum Bilgilendirme Raporu" - + # Create email parameters using EmailSendModel email_params = EmailSendModel( subject=subject, html=html_template, receivers=[send_to], - text=f"Gunes Apt. Cari Durum Bilgilendirme Raporu - {today.date()}" + text=f"Gunes Apt. Cari Durum Bilgilendirme Raporu - {today.date()}", ) - + try: # Use the context manager to handle connection errors with EmailService.new_session() as email_session: @@ -69,63 +69,74 @@ def send_email_to_given_address(send_to: str, html_template: str) -> bool: EmailService.send_email(email_session, email_params) return True except Exception as e: - print(f'Exception send email: {e}') + print(f"Exception send email: {e}") return False def set_account_records_to_send_email() -> bool: """ Retrieve account records from the database, format them, and send an email report. 
- + Usage: from app import set_account_records_to_send_email - + Returns: Boolean indicating if the process completed successfully """ # Get database session and retrieve records with AccountRecords.new_session() as db_session: account_records_query = AccountRecords.filter_all(db=db_session).query - + # Get the 3 most recent records account_records: List[AccountRecords] | [] = ( account_records_query.order_by( - AccountRecords.bank_date.desc(), - AccountRecords.bank_reference_code.desc() + AccountRecords.bank_date.desc(), + AccountRecords.bank_reference_code.desc(), ) .limit(3) .all() ) - + # Check if we have enough records if len(account_records) < 2: return False - + # Check for balance discrepancy first_record, second_record = account_records[0], account_records[1] - expected_second_balance = first_record.bank_balance - first_record.currency_value + expected_second_balance = ( + first_record.bank_balance - first_record.currency_value + ) balance_error = expected_second_balance != second_record.bank_balance - + if balance_error: return False # Format rows for the email template list_of_rows = [] for record in account_records: - list_of_rows.append([ - record.bank_date.strftime("%d/%m/%Y %H:%M"), - record.process_comment, - f"{record.currency_value:,.2f}", - f"{record.bank_balance:,.2f}" - ]) + list_of_rows.append( + [ + record.bank_date.strftime("%d/%m/%Y %H:%M"), + record.process_comment, + f"{record.currency_value:,.2f}", + f"{record.bank_balance:,.2f}", + ] + ) # Get the most recent bank balance - last_bank_balance = sorted(account_records, key=lambda x: x.bank_date, reverse=True)[0].bank_balance + last_bank_balance = sorted( + account_records, key=lambda x: x.bank_date, reverse=True + )[0].bank_balance # Define headers for the table - headers = ["Ulaştığı Tarih", "Banka Transaksiyonu Ek Bilgi", "Aktarım Değeri", "Banka Bakiyesi"] - + headers = [ + "Ulaştığı Tarih", + "Banka Transaksiyonu Ek Bilgi", + "Aktarım Değeri", + "Banka Bakiyesi", + ] + # Recipient email address send_to = "karatay@mehmetkaratay.com.tr" - + # Render email template html_template = render_email_template( headers=headers, @@ -133,7 +144,7 @@ def set_account_records_to_send_email() -> bool: balance_error=balance_error, bank_balance=f"{last_bank_balance:,.2f}", ) - + # Send the email return send_email_to_given_address(send_to=send_to, html_template=html_template) diff --git a/BankServices/SenderService/app.py b/BankServices/SenderService/app.py index 4c837ce..e0afdb1 100644 --- a/BankServices/SenderService/app.py +++ b/BankServices/SenderService/app.py @@ -12,13 +12,13 @@ def render_email_template( ) -> str: """ Render the HTML email template with the provided data. 
- + Args: headers: List of column headers for the table rows: List of data rows for the table balance_error: Flag indicating if there's a balance discrepancy bank_balance: Current bank balance formatted as string - + Returns: Rendered HTML template as string """ @@ -26,7 +26,7 @@ def render_email_template( # Look for template in ServiceDepends directory env = Environment(loader=FileSystemLoader("/")) template = env.get_template("template_accounts.html") - + return template.render( headers=headers, rows=rows, @@ -35,32 +35,34 @@ def render_email_template( today=str(arrow.now().date()), ) except Exception as e: - print(f'Template rendering failed: {e}') + print(f"Template rendering failed: {e}") raise -def send_email_to_given_address(send_to: str, html_template: str, count_of_records: int) -> bool: +def send_email_to_given_address( + send_to: str, html_template: str, count_of_records: int +) -> bool: """ Send email with the rendered HTML template to the specified address. - + Args: send_to: Email address of the recipient html_template: Rendered HTML template content - + Returns: Boolean indicating if the email was sent successfully """ today = arrow.now() subject = f"{str(today.date())} Gunes Apt. Cari Durum Kayıt Giriş Raporu" - + # Create email parameters using EmailSendModel email_params = EmailSendModel( subject=subject + f" ({count_of_records} kayıt)", html=html_template, receivers=[send_to], - text=f"Gunes Apt. Cari Durum Kayıt Giriş Raporu - {today.date()}" + text=f"Gunes Apt. Cari Durum Kayıt Giriş Raporu - {today.date()}", ) - + try: # Use the context manager to handle connection errors with EmailService.new_session() as email_session: @@ -69,17 +71,17 @@ def send_email_to_given_address(send_to: str, html_template: str, count_of_recor print(f"Email successfully sent to: {send_to}") return True except Exception as e: - print(f'Failed to send email: {e}') + print(f"Failed to send email: {e}") return False def process_unsent_email_records() -> bool: """ Process account records that haven't been emailed yet. - + Finds records with is_email_send=False, formats them into an email, sends the email, and updates the records as sent if successful. 
- + Returns: bool: True if email was sent successfully, False otherwise """ @@ -87,42 +89,55 @@ def process_unsent_email_records() -> bool: # Use the context manager to handle database connections with AccountRecords.new_session() as db_session: # Query un-sent mail rows - with limit for display only - account_records_query = AccountRecords.filter_all( - AccountRecords.is_email_send == False, - db=db_session, - ).query.order_by(AccountRecords.bank_date.asc()).limit(20) - + account_records_query = ( + AccountRecords.filter_all( + AccountRecords.is_email_send == False, + db=db_session, + ) + .query.order_by(AccountRecords.bank_date.asc()) + .limit(20) + ) + account_records: List[AccountRecords] = account_records_query.all() if not account_records: print("No unsent email records found") return False - + # Get the IDs of the records we're processing record_ids = [record.id for record in account_records] print(f"Found {len(account_records)} unsent email records") - + # Format rows for the email template list_of_rows = [] for record in account_records: - list_of_rows.append([ - record.bank_date.strftime("%d/%m/%Y %H:%M"), - record.process_comment, - f"{record.currency_value:,.2f}", - f"{record.bank_balance:,.2f}" - ]) + list_of_rows.append( + [ + record.bank_date.strftime("%d/%m/%Y %H:%M"), + record.process_comment, + f"{record.currency_value:,.2f}", + f"{record.bank_balance:,.2f}", + ] + ) # Reverse list by date list_of_rows = list_of_rows[::-1] # Get the most recent bank balance - last_bank_balance = sorted(account_records, key=lambda x: x.bank_date, reverse=True)[0].bank_balance - + last_bank_balance = sorted( + account_records, key=lambda x: x.bank_date, reverse=True + )[0].bank_balance + # Define headers for the table - headers = ["Ulaştığı Tarih", "Banka Transaksiyonu Ek Bilgi", "Aktarım Değeri", "Banka Bakiyesi"] - + headers = [ + "Ulaştığı Tarih", + "Banka Transaksiyonu Ek Bilgi", + "Aktarım Değeri", + "Banka Bakiyesi", + ] + # Recipient email address send_to = "karatay@mehmetkaratay.com.tr" - + # Render email template html_template = render_email_template( headers=headers, @@ -130,32 +145,35 @@ def process_unsent_email_records() -> bool: balance_error=False, bank_balance=f"{last_bank_balance:,.2f}", ) - + # Send the email - if send_email_to_given_address(send_to=send_to, html_template=html_template, count_of_records=len(list_of_rows)): + if send_email_to_given_address( + send_to=send_to, + html_template=html_template, + count_of_records=len(list_of_rows), + ): # Create a new query without limit for updating update_query = AccountRecords.filter_all( - AccountRecords.id.in_(record_ids), - db=db_session + AccountRecords.id.in_(record_ids), db=db_session ).query - + # Update records as sent update_query.update({"is_email_send": True}, synchronize_session=False) AccountRecords.save(db_session) print(f"Successfully marked {len(account_records)} records as sent") return True - + print("Email sending failed, records not updated") return False except Exception as e: - print(f'Error processing unsent email records: {e}') + print(f"Error processing unsent email records: {e}") return False if __name__ == "__main__": print("Starting Email Sender Service") - + while True: try: result = process_unsent_email_records() @@ -165,7 +183,7 @@ if __name__ == "__main__": print("No emails sent in this iteration") except Exception as e: print(f"Unexpected error in main loop: {e}") - + # Sleep for 60 seconds before next check print("Sleeping for 60 seconds") time.sleep(60) diff --git 
a/BankServices/ServiceDepends/config.py b/BankServices/ServiceDepends/config.py index 1fd6924..c53e2a4 100644 --- a/BankServices/ServiceDepends/config.py +++ b/BankServices/ServiceDepends/config.py @@ -22,7 +22,7 @@ class Config: class EmailConfig: - EMAIL_HOST: str = os.getenv("EMAIL_HOST", "10.10.2.34") + EMAIL_HOST: str = os.getenv("EMAIL_HOST", "10.10.2.34") EMAIL_USERNAME: str = Config.EMAIL_SENDER_USERNAME EMAIL_PASSWORD: str = Config.EMAIL_PASSWORD EMAIL_PORT: int = Config.EMAIL_SEND_PORT diff --git a/BankServices/WriterService/app.py b/BankServices/WriterService/app.py index 374dec0..e94657f 100644 --- a/BankServices/WriterService/app.py +++ b/BankServices/WriterService/app.py @@ -18,28 +18,30 @@ delimiter = "|" def publish_written_data_to_redis(data: Dict[str, Any], file_name: str) -> bool: """Publish written data status to Redis. - + Args: data: Original message data from Redis file_name: Name of the processed file - + Returns: bool: Success status """ # Create a copy of the original message to preserve metadata message = data.copy() if isinstance(data, dict) else {} - + # Update stage to 'written' message["stage"] = "written" - + # Add processing timestamp message["written_at"] = str(arrow.now()) - + # Publish to Redis channel result = redis_pubsub.publisher.publish(REDIS_CHANNEL_OUT, message) - + if result.status: - print(f"[WRITER_SERVICE] Published written status for {file_name} with stage: written") + print( + f"[WRITER_SERVICE] Published written status for {file_name} with stage: written" + ) return True else: print(f"[WRITER_SERVICE] Publish error: {result.error}") @@ -48,10 +50,10 @@ def publish_written_data_to_redis(data: Dict[str, Any], file_name: str) -> bool: def write_parsed_data_to_account_records(data_dict: dict, file_name: str) -> bool: """Write parsed data to account records database. 
- + Args: data_dict: Parsed data dictionary - + Returns: bool: True if record was created or already exists, False on error """ @@ -61,8 +63,8 @@ def write_parsed_data_to_account_records(data_dict: dict, file_name: str) -> boo data_dict["bank_balance"] = data_dict.pop("balance") data_dict["import_file_name"] = file_name data_dict = BankReceive(**data_dict).model_dump() - print('data_dict', data_dict) - + print("data_dict", data_dict) + # Process date fields bank_date = arrow.get(str(data_dict["bank_date"])) data_dict["bank_date_w"] = bank_date.weekday() @@ -70,7 +72,7 @@ def write_parsed_data_to_account_records(data_dict: dict, file_name: str) -> boo data_dict["bank_date_d"] = bank_date.day data_dict["bank_date_y"] = bank_date.year data_dict["bank_date"] = str(bank_date) - + # Add build information if available if build_iban := BuildIbans.filter_by_one( iban=data_dict["iban"], db=db_session @@ -81,7 +83,7 @@ def write_parsed_data_to_account_records(data_dict: dict, file_name: str) -> boo "build_uu_id": build_iban.build_uu_id, } ) - + # Create new record or find existing one using specific fields for matching new_account_record = AccountRecords.find_or_create( db=db_session, @@ -90,16 +92,20 @@ def write_parsed_data_to_account_records(data_dict: dict, file_name: str) -> boo AccountRecords.bank_date, AccountRecords.iban, AccountRecords.bank_reference_code, - AccountRecords.bank_balance - ] + AccountRecords.bank_balance, + ], ) if new_account_record.meta_data.created: new_account_record.is_confirmed = True new_account_record.save(db=db_session) - print(f"[WRITER_SERVICE] Created new record in database: {new_account_record.id}") + print( + f"[WRITER_SERVICE] Created new record in database: {new_account_record.id}" + ) return True else: - print(f"[WRITER_SERVICE] Record already exists in database: {new_account_record.id}") + print( + f"[WRITER_SERVICE] Record already exists in database: {new_account_record.id}" + ) return False except Exception as e: print(f"[WRITER_SERVICE] Error writing to database: {str(e)}") @@ -108,13 +114,13 @@ def write_parsed_data_to_account_records(data_dict: dict, file_name: str) -> boo def process_message(message): """Process a message from Redis. 
- + Args: message: Message data from Redis subscriber """ # Extract the message data data = message["data"] - + # If data is a string, parse it as JSON if isinstance(data, str): try: @@ -122,51 +128,55 @@ def process_message(message): except json.JSONDecodeError as e: print(f"[WRITER_SERVICE] Error parsing message data: {e}") return - + # Check if stage is 'parsed' before processing if data.get("stage") == "parsed": try: file_name = data.get("filename") parsed_data = data.get("parsed") - + print(f"[WRITER_SERVICE] Processing file: {file_name}") - + if not parsed_data: print(f"[WRITER_SERVICE] No parsed data found for {file_name}") return - + # Process each parsed data item success = True for item in parsed_data: - result = write_parsed_data_to_account_records(data_dict=item, file_name=file_name) + result = write_parsed_data_to_account_records( + data_dict=item, file_name=file_name + ) if not result: success = False - + # Publish status update to Redis if all records were processed if success: publish_written_data_to_redis(data=data, file_name=file_name) except Exception as e: print(f"[WRITER_SERVICE] Error processing message: {str(e)}") else: - print(f"[WRITER_SERVICE] Skipped message with UUID: {data.get('uuid')} (stage is not 'parsed')") + print( + f"[WRITER_SERVICE] Skipped message with UUID: {data.get('uuid')} (stage is not 'parsed')" + ) def app(): """Main application function.""" print("[WRITER_SERVICE] Starting Writer Service") - + # Subscribe to the input channel result = redis_pubsub.subscriber.subscribe(REDIS_CHANNEL_IN, process_message) - + if result.status: print(f"[WRITER_SERVICE] Subscribed to channel: {REDIS_CHANNEL_IN}") else: print(f"[WRITER_SERVICE] Subscribe error: {result.error}") return - + # Start listening for messages listen_result = redis_pubsub.subscriber.start_listening(in_thread=True) - + if listen_result.status: print("[WRITER_SERVICE] Listening for messages") else: @@ -177,7 +187,7 @@ def app(): if __name__ == "__main__": # Initialize the app once app() - + # Keep the main thread alive try: while True: diff --git a/Controllers/Mongo/database.py b/Controllers/Mongo/database.py index 1b42c99..95fbe8c 100644 --- a/Controllers/Mongo/database.py +++ b/Controllers/Mongo/database.py @@ -63,7 +63,7 @@ class MongoDBHandler: if not hasattr(self, "_initialized") or not self._initialized: self._debug_mode = debug_mode self._mock_mode = mock_mode - + if mock_mode: # In mock mode, we don't need a real connection string self.uri = "mongodb://mock:27017/mockdb" @@ -76,7 +76,7 @@ class MongoDBHandler: # Use the configured connection string with authentication self.uri = mongo_configs.url print(f"Connecting to MongoDB: {self.uri}") - + # Define MongoDB client options with increased timeouts for better reliability self.client_options = { "maxPoolSize": 5, @@ -132,11 +132,13 @@ class CollectionContext: # If we're in mock mode, return a mock collection immediately if self.db_handler._mock_mode: return self._create_mock_collection() - + try: # Create a new client connection - self.client = MongoClient(self.db_handler.uri, **self.db_handler.client_options) - + self.client = MongoClient( + self.db_handler.uri, **self.db_handler.client_options + ) + if self.db_handler._debug_mode: # In debug mode, we explicitly use the configured DB db_name = mongo_configs.DB @@ -148,7 +150,7 @@ class CollectionContext: except Exception: db_name = mongo_configs.DB print(f"Using fallback database '{db_name}'") - + self.collection = self.client[db_name][self.collection_name] # Enhance collection methods 
with retry capabilities @@ -159,13 +161,17 @@ class CollectionContext: if "Authentication failed" in str(e): print(f"MongoDB authentication error: {e}") print("Attempting to reconnect with direct connection...") - + try: # Try a direct connection without authentication for testing direct_uri = f"mongodb://{mongo_configs.HOST}:{mongo_configs.PORT}/{mongo_configs.DB}" print(f"Trying direct connection: {direct_uri}") - self.client = MongoClient(direct_uri, **self.db_handler.client_options) - self.collection = self.client[mongo_configs.DB][self.collection_name] + self.client = MongoClient( + direct_uri, **self.db_handler.client_options + ) + self.collection = self.client[mongo_configs.DB][ + self.collection_name + ] self._add_retry_capabilities() return self.collection except Exception as inner_e: @@ -181,54 +187,56 @@ class CollectionContext: if self.client: self.client.close() self.client = None - + return self._create_mock_collection() - + def _create_mock_collection(self): """ Create a mock collection for testing or graceful degradation. This prevents the application from crashing when MongoDB is unavailable. - + Returns: A mock MongoDB collection with simulated behaviors """ from unittest.mock import MagicMock - + if self.db_handler._mock_mode: print(f"MOCK MODE: Using mock collection '{self.collection_name}'") else: - print(f"Using mock MongoDB collection '{self.collection_name}' for graceful degradation") - + print( + f"Using mock MongoDB collection '{self.collection_name}' for graceful degradation" + ) + # Create in-memory storage for this mock collection - if not hasattr(self.db_handler, '_mock_storage'): + if not hasattr(self.db_handler, "_mock_storage"): self.db_handler._mock_storage = {} - + if self.collection_name not in self.db_handler._mock_storage: self.db_handler._mock_storage[self.collection_name] = [] - + mock_collection = MagicMock() mock_data = self.db_handler._mock_storage[self.collection_name] - + # Define behavior for find operations def mock_find(query=None, *args, **kwargs): # Simple implementation that returns all documents return mock_data - + def mock_find_one(query=None, *args, **kwargs): # Simple implementation that returns the first matching document if not mock_data: return None return mock_data[0] - + def mock_insert_one(document, *args, **kwargs): # Add _id if not present - if '_id' not in document: - document['_id'] = f"mock_id_{len(mock_data)}" + if "_id" not in document: + document["_id"] = f"mock_id_{len(mock_data)}" mock_data.append(document) result = MagicMock() - result.inserted_id = document['_id'] + result.inserted_id = document["_id"] return result - + def mock_insert_many(documents, *args, **kwargs): inserted_ids = [] for doc in documents: @@ -237,40 +245,40 @@ class CollectionContext: result = MagicMock() result.inserted_ids = inserted_ids return result - + def mock_update_one(query, update, *args, **kwargs): result = MagicMock() result.modified_count = 1 return result - + def mock_update_many(query, update, *args, **kwargs): result = MagicMock() result.modified_count = len(mock_data) return result - + def mock_delete_one(query, *args, **kwargs): result = MagicMock() result.deleted_count = 1 if mock_data: mock_data.pop(0) # Just remove the first item for simplicity return result - + def mock_delete_many(query, *args, **kwargs): count = len(mock_data) mock_data.clear() result = MagicMock() result.deleted_count = count return result - + def mock_count_documents(query, *args, **kwargs): return len(mock_data) - + def mock_aggregate(pipeline, *args, 
**kwargs): return [] - + def mock_create_index(keys, **kwargs): return f"mock_index_{keys}" - + # Assign the mock implementations mock_collection.find.side_effect = mock_find mock_collection.find_one.side_effect = mock_find_one @@ -283,10 +291,10 @@ class CollectionContext: mock_collection.count_documents.side_effect = mock_count_documents mock_collection.aggregate.side_effect = mock_aggregate mock_collection.create_index.side_effect = mock_create_index - + # Add retry capabilities to the mock collection self._add_retry_capabilities_to_mock(mock_collection) - + self.collection = mock_collection return self.collection @@ -322,17 +330,25 @@ class CollectionContext: """ Add retry capabilities to mock collection methods. This is a simplified version that just wraps the mock methods. - + Args: mock_collection: The mock collection to enhance """ # List of common MongoDB collection methods to add retry capabilities to methods = [ - 'insert_one', 'insert_many', 'find_one', 'find', - 'update_one', 'update_many', 'delete_one', 'delete_many', - 'replace_one', 'count_documents', 'aggregate' + "insert_one", + "insert_many", + "find_one", + "find", + "update_one", + "update_many", + "delete_one", + "delete_many", + "replace_one", + "count_documents", + "aggregate", ] - + # Add retry decorator to each method for method_name in methods: if hasattr(mock_collection, method_name): @@ -340,7 +356,7 @@ class CollectionContext: setattr( mock_collection, method_name, - retry_operation(max_retries=1, retry_interval=0)(original_method) + retry_operation(max_retries=1, retry_interval=0)(original_method), ) def __exit__(self, exc_type, exc_val, exc_tb): diff --git a/Controllers/Mongo/implementations.py b/Controllers/Mongo/implementations.py index 596a604..7e29752 100644 --- a/Controllers/Mongo/implementations.py +++ b/Controllers/Mongo/implementations.py @@ -22,7 +22,7 @@ def test_basic_crud_operations(): # First, clear any existing data users_collection.delete_many({}) print("Cleared existing data") - + # Insert multiple documents insert_result = users_collection.insert_many( [ @@ -58,7 +58,7 @@ def test_basic_crud_operations(): condition2 = admin_users and admin_users[0].get("username") == "jane" condition3 = update_result.modified_count == 2 condition4 = delete_result.deleted_count == 1 - + print(f"Condition 1 (admin count): {condition1}") print(f"Condition 2 (admin is jane): {condition2}") print(f"Condition 3 (updated 2 users): {condition3}") @@ -80,7 +80,7 @@ def test_nested_documents(): # Clear any existing data products_collection.delete_many({}) print("Cleared existing data") - + # Insert a product with nested data insert_result = products_collection.insert_one( { @@ -110,15 +110,17 @@ def test_nested_documents(): print(f"Found updated laptop: {updated_laptop is not None}") if updated_laptop: print(f"Updated laptop specs: {updated_laptop.get('specs')}") - if 'specs' in updated_laptop: + if "specs" in updated_laptop: print(f"Updated RAM: {updated_laptop['specs'].get('ram')}") # Check each condition separately condition1 = laptop is not None - condition2 = laptop and laptop.get('specs', {}).get('ram') == "16GB" + condition2 = laptop and laptop.get("specs", {}).get("ram") == "16GB" condition3 = update_result.modified_count == 1 - condition4 = updated_laptop and updated_laptop.get('specs', {}).get('ram') == "32GB" - + condition4 = ( + updated_laptop and updated_laptop.get("specs", {}).get("ram") == "32GB" + ) + print(f"Condition 1 (laptop found): {condition1}") print(f"Condition 2 (original RAM is 16GB): 
{condition2}") print(f"Condition 3 (update modified 1 doc): {condition3}") @@ -140,7 +142,7 @@ def test_array_operations(): # Clear any existing data orders_collection.delete_many({}) print("Cleared existing data") - + # Insert an order with array of items insert_result = orders_collection.insert_one( { @@ -170,10 +172,12 @@ def test_array_operations(): # Verify the update updated_order = orders_collection.find_one({"order_id": "ORD001"}) print(f"Found updated order: {updated_order is not None}") - + if updated_order: - print(f"Number of items in order: {len(updated_order.get('items', []))}") - items = updated_order.get('items', []) + print( + f"Number of items in order: {len(updated_order.get('items', []))}" + ) + items = updated_order.get("items", []) if items: last_item = items[-1] if items else None print(f"Last item in order: {last_item}") @@ -181,9 +185,13 @@ def test_array_operations(): # Check each condition separately condition1 = len(laptop_orders) == 1 condition2 = update_result.modified_count == 1 - condition3 = updated_order and len(updated_order.get('items', [])) == 3 - condition4 = updated_order and updated_order.get('items', []) and updated_order['items'][-1].get('product') == "Keyboard" - + condition3 = updated_order and len(updated_order.get("items", [])) == 3 + condition4 = ( + updated_order + and updated_order.get("items", []) + and updated_order["items"][-1].get("product") == "Keyboard" + ) + print(f"Condition 1 (found 1 laptop order): {condition1}") print(f"Condition 2 (update modified 1 doc): {condition2}") print(f"Condition 3 (order has 3 items): {condition3}") @@ -205,7 +213,7 @@ def test_aggregation(): # Clear any existing data sales_collection.delete_many({}) print("Cleared existing data") - + # Insert sample sales data insert_result = sales_collection.insert_many( [ @@ -219,13 +227,13 @@ def test_aggregation(): # Calculate total sales by product - use a simpler aggregation pipeline pipeline = [ {"$match": {}}, # Match all documents - {"$group": {"_id": "$product", "total": {"$sum": "$amount"}}} + {"$group": {"_id": "$product", "total": {"$sum": "$amount"}}}, ] - + # Execute the aggregation sales_summary = list(sales_collection.aggregate(pipeline)) print(f"Aggregation returned {len(sales_summary)} results") - + # Print the results for debugging for item in sales_summary: print(f"Product: {item.get('_id')}, Total: {item.get('total')}") @@ -233,7 +241,8 @@ def test_aggregation(): # Check each condition separately condition1 = len(sales_summary) == 3 condition2 = any( - item.get("_id") == "Laptop" and abs(item.get("total", 0) - 999.99) < 0.01 + item.get("_id") == "Laptop" + and abs(item.get("total", 0) - 999.99) < 0.01 for item in sales_summary ) condition3 = any( @@ -241,10 +250,11 @@ def test_aggregation(): for item in sales_summary ) condition4 = any( - item.get("_id") == "Keyboard" and abs(item.get("total", 0) - 59.99) < 0.01 + item.get("_id") == "Keyboard" + and abs(item.get("total", 0) - 59.99) < 0.01 for item in sales_summary ) - + print(f"Condition 1 (3 summary items): {condition1}") print(f"Condition 2 (laptop total correct): {condition2}") print(f"Condition 3 (mouse total correct): {condition3}") @@ -325,35 +335,37 @@ def test_complex_queries(): # Update with multiple conditions - split into separate operations for better compatibility # First set the discount products_collection.update_many( - {"price": {"$lt": 100}, "in_stock": True}, - {"$set": {"discount": 0.1}} + {"price": {"$lt": 100}, "in_stock": True}, {"$set": {"discount": 0.1}} ) - + # Then 
update the price update_result = products_collection.update_many( - {"price": {"$lt": 100}, "in_stock": True}, - {"$inc": {"price": -10}} + {"price": {"$lt": 100}, "in_stock": True}, {"$inc": {"price": -10}} ) # Verify the update updated_product = products_collection.find_one({"name": "Cheap Mouse"}) - + # Print debug information print(f"Found expensive electronics: {len(expensive_electronics)}") if expensive_electronics: - print(f"First expensive product: {expensive_electronics[0].get('name')}") + print( + f"First expensive product: {expensive_electronics[0].get('name')}" + ) print(f"Modified count: {update_result.modified_count}") if updated_product: print(f"Updated product price: {updated_product.get('price')}") print(f"Updated product discount: {updated_product.get('discount')}") - + # More flexible verification with approximate float comparison success = ( len(expensive_electronics) >= 1 - and expensive_electronics[0].get("name") in ["Expensive Laptop", "Laptop"] + and expensive_electronics[0].get("name") + in ["Expensive Laptop", "Laptop"] and update_result.modified_count >= 1 and updated_product is not None - and updated_product.get("discount", 0) > 0 # Just check that discount exists and is positive + and updated_product.get("discount", 0) + > 0 # Just check that discount exists and is positive ) print(f"Test {'passed' if success else 'failed'}") return success @@ -368,48 +380,51 @@ def run_concurrent_operation_test(num_threads=100): import time import uuid from concurrent.futures import ThreadPoolExecutor - + print(f"\nStarting concurrent operation test with {num_threads} threads...") - + # Results tracking results = {"passed": 0, "failed": 0, "errors": []} results_lock = threading.Lock() - + def worker(thread_id): # Create a unique collection name for this thread collection_name = f"concurrent_test_{thread_id}" - + try: # Generate unique data for this thread unique_id = str(uuid.uuid4()) - + with mongo_handler.collection(collection_name) as collection: # Insert a document - collection.insert_one({ - "thread_id": thread_id, - "uuid": unique_id, - "timestamp": time.time() - }) - + collection.insert_one( + { + "thread_id": thread_id, + "uuid": unique_id, + "timestamp": time.time(), + } + ) + # Find the document doc = collection.find_one({"thread_id": thread_id}) - + # Update the document collection.update_one( - {"thread_id": thread_id}, - {"$set": {"updated": True}} + {"thread_id": thread_id}, {"$set": {"updated": True}} ) - + # Verify update updated_doc = collection.find_one({"thread_id": thread_id}) - + # Clean up collection.delete_many({"thread_id": thread_id}) - - success = (doc is not None and - updated_doc is not None and - updated_doc.get("updated") is True) - + + success = ( + doc is not None + and updated_doc is not None + and updated_doc.get("updated") is True + ) + # Update results with thread safety with results_lock: if success: @@ -421,15 +436,15 @@ def run_concurrent_operation_test(num_threads=100): with results_lock: results["failed"] += 1 results["errors"].append(f"Thread {thread_id} exception: {str(e)}") - + # Create and start threads using a thread pool start_time = time.time() with ThreadPoolExecutor(max_workers=num_threads) as executor: futures = [executor.submit(worker, i) for i in range(num_threads)] - + # Calculate execution time execution_time = time.time() - start_time - + # Print results print(f"\nConcurrent Operation Test Results:") print(f"Total threads: {num_threads}") @@ -437,14 +452,16 @@ def run_concurrent_operation_test(num_threads=100): 
print(f"Failed: {results['failed']}") print(f"Execution time: {execution_time:.2f} seconds") print(f"Operations per second: {num_threads / execution_time:.2f}") - + if results["failed"] > 0: print("\nErrors:") - for error in results["errors"][:10]: # Show only first 10 errors to avoid flooding output + for error in results["errors"][ + :10 + ]: # Show only first 10 errors to avoid flooding output print(f"- {error}") if len(results["errors"]) > 10: print(f"- ... and {len(results['errors']) - 10} more errors") - + return results["failed"] == 0 @@ -493,10 +510,10 @@ def run_all_tests(): if __name__ == "__main__": mongo_handler = MongoDBHandler() - + # Run standard tests first passed, failed = run_all_tests() - + # If all tests pass, run the concurrent operation test if failed == 0: run_concurrent_operation_test(10000) diff --git a/Controllers/Mongo/local_test.py b/Controllers/Mongo/local_test.py index 9735201..3ab62a6 100644 --- a/Controllers/Mongo/local_test.py +++ b/Controllers/Mongo/local_test.py @@ -1,14 +1,16 @@ """ Test script for MongoDB handler with a local MongoDB instance. """ + import os from Controllers.Mongo.database import MongoDBHandler, CollectionContext from datetime import datetime + # Create a custom handler class for local testing class LocalMongoDBHandler(MongoDBHandler): """A MongoDB handler for local testing without authentication.""" - + def __init__(self): """Initialize with a direct MongoDB URI.""" self._initialized = False @@ -22,6 +24,7 @@ class LocalMongoDBHandler(MongoDBHandler): } self._initialized = True + # Create a custom handler for local testing def create_local_handler(): """Create a MongoDB handler for local testing.""" @@ -29,35 +32,36 @@ def create_local_handler(): handler = LocalMongoDBHandler() return handler + def test_connection_monitoring(): """Test connection monitoring with the MongoDB handler.""" print("\nTesting connection monitoring...") - + # Create a local handler local_handler = create_local_handler() - + # Add connection tracking to the handler local_handler._open_connections = 0 - + # Modify the CollectionContext class to track connections original_enter = CollectionContext.__enter__ original_exit = CollectionContext.__exit__ - + def tracked_enter(self): result = original_enter(self) self.db_handler._open_connections += 1 print(f"Connection opened. Total open: {self.db_handler._open_connections}") return result - + def tracked_exit(self, exc_type, exc_val, exc_tb): self.db_handler._open_connections -= 1 print(f"Connection closed. 
Total open: {self.db_handler._open_connections}") return original_exit(self, exc_type, exc_val, exc_tb) - + # Apply the tracking methods CollectionContext.__enter__ = tracked_enter CollectionContext.__exit__ = tracked_exit - + try: # Test with multiple operations for i in range(3): @@ -72,18 +76,19 @@ def test_connection_monitoring(): print(f"Operation failed: {e}") except Exception as e: print(f"Connection failed: {e}") - + # Final connection count print(f"\nFinal open connections: {local_handler._open_connections}") if local_handler._open_connections == 0: print("✅ All connections were properly closed") else: print(f"❌ {local_handler._open_connections} connections remain open") - + finally: # Restore original methods CollectionContext.__enter__ = original_enter CollectionContext.__exit__ = original_exit + if __name__ == "__main__": test_connection_monitoring() diff --git a/Controllers/Postgres/config.py b/Controllers/Postgres/config.py index ddea49d..e3b3772 100644 --- a/Controllers/Postgres/config.py +++ b/Controllers/Postgres/config.py @@ -38,4 +38,4 @@ class Configs(BaseSettings): # singleton instance of the POSTGRESQL configuration settings postgres_configs = Configs() -print('url', postgres_configs.url) +print("url", postgres_configs.url) diff --git a/Controllers/Postgres/crud.py b/Controllers/Postgres/crud.py index 0ea5499..686e4f0 100644 --- a/Controllers/Postgres/crud.py +++ b/Controllers/Postgres/crud.py @@ -240,10 +240,10 @@ class CRUDModel: exclude_args = exclude_args or [] exclude_args = [exclude_arg.key for exclude_arg in exclude_args] - + include_args = include_args or [] include_args = [include_arg.key for include_arg in include_args] - + # If include_args is provided, only use those fields for matching # Otherwise, use all fields except those in exclude_args for key, value in kwargs.items(): diff --git a/Controllers/Postgres/implementations.py b/Controllers/Postgres/implementations.py index a1df328..1b9f7eb 100644 --- a/Controllers/Postgres/implementations.py +++ b/Controllers/Postgres/implementations.py @@ -478,46 +478,48 @@ def run_simple_concurrent_test(num_threads=10): import time import random from concurrent.futures import ThreadPoolExecutor - + print(f"\nStarting simple concurrent test with {num_threads} threads...") - + # Results tracking results = {"passed": 0, "failed": 0, "errors": []} results_lock = threading.Lock() - + def worker(thread_id): try: # Simple query to test connection pooling with EndpointRestriction.new_session() as db_session: # Just run a simple count query count_query = db_session.query(EndpointRestriction).count() - + # Small delay to simulate work time.sleep(random.uniform(0.01, 0.05)) - + # Simple success criteria success = count_query >= 0 - + # Update results with thread safety with results_lock: if success: results["passed"] += 1 else: results["failed"] += 1 - results["errors"].append(f"Thread {thread_id} failed to get count") + results["errors"].append( + f"Thread {thread_id} failed to get count" + ) except Exception as e: with results_lock: results["failed"] += 1 results["errors"].append(f"Thread {thread_id} exception: {str(e)}") - + # Create and start threads using a thread pool start_time = time.time() with ThreadPoolExecutor(max_workers=num_threads) as executor: futures = [executor.submit(worker, i) for i in range(num_threads)] - + # Calculate execution time execution_time = time.time() - start_time - + # Print results print(f"\nConcurrent Operation Test Results:") print(f"Total threads: {num_threads}") @@ -525,21 +527,23 @@ def 
run_simple_concurrent_test(num_threads=10): print(f"Failed: {results['failed']}") print(f"Execution time: {execution_time:.2f} seconds") print(f"Operations per second: {num_threads / execution_time:.2f}") - + if results["failed"] > 0: print("\nErrors:") - for error in results["errors"][:10]: # Show only first 10 errors to avoid flooding output + for error in results["errors"][ + :10 + ]: # Show only first 10 errors to avoid flooding output print(f"- {error}") if len(results["errors"]) > 10: print(f"- ... and {len(results['errors']) - 10} more errors") - + return results["failed"] == 0 if __name__ == "__main__": generate_table_in_postgres() passed, failed = run_all_tests() - + # If all tests pass, run the simple concurrent test if failed == 0: run_simple_concurrent_test(100) diff --git a/Controllers/Redis/Broadcast/actions.py b/Controllers/Redis/Broadcast/actions.py index 50fc43f..756e9f3 100644 --- a/Controllers/Redis/Broadcast/actions.py +++ b/Controllers/Redis/Broadcast/actions.py @@ -8,17 +8,17 @@ from Controllers.Redis.response import RedisResponse class RedisPublisher: """Redis Publisher class for broadcasting messages to channels.""" - + def __init__(self, redis_client=redis_cli): self.redis_client = redis_client - + def publish(self, channel: str, message: Union[Dict, List, str]) -> RedisResponse: """Publish a message to a Redis channel. - + Args: channel: The channel to publish to message: The message to publish (will be JSON serialized if dict or list) - + Returns: RedisResponse with status and message """ @@ -26,113 +26,124 @@ class RedisPublisher: # Convert dict/list to JSON string if needed if isinstance(message, (dict, list)): message = json.dumps(message) - + # Publish the message recipient_count = self.redis_client.publish(channel, message) - + return RedisResponse( status=True, message=f"Message published successfully to {channel}.", - data={"recipients": recipient_count} + data={"recipients": recipient_count}, ) except Exception as e: return RedisResponse( status=False, message=f"Failed to publish message to {channel}.", - error=str(e) + error=str(e), ) class RedisSubscriber: """Redis Subscriber class for listening to channels.""" - + def __init__(self, redis_client=redis_cli): self.redis_client = redis_client self.pubsub = self.redis_client.pubsub() self.active_threads = {} - - def subscribe(self, channel: str, callback: Callable[[Dict], None]) -> RedisResponse: + + def subscribe( + self, channel: str, callback: Callable[[Dict], None] + ) -> RedisResponse: """Subscribe to a Redis channel with a callback function. - + Args: channel: The channel to subscribe to callback: Function to call when a message is received - + Returns: RedisResponse with status and message """ try: # Subscribe to the channel self.pubsub.subscribe(**{channel: self._message_handler(callback)}) - + return RedisResponse( - status=True, - message=f"Successfully subscribed to {channel}." + status=True, message=f"Successfully subscribed to {channel}." ) except Exception as e: return RedisResponse( - status=False, - message=f"Failed to subscribe to {channel}.", - error=str(e) + status=False, message=f"Failed to subscribe to {channel}.", error=str(e) ) - - def psubscribe(self, pattern: str, callback: Callable[[Dict], None]) -> RedisResponse: + + def psubscribe( + self, pattern: str, callback: Callable[[Dict], None] + ) -> RedisResponse: """Subscribe to Redis channels matching a pattern. 
- + Args: pattern: The pattern to subscribe to (e.g., 'user.*') callback: Function to call when a message is received - + Returns: RedisResponse with status and message """ try: # Subscribe to the pattern self.pubsub.psubscribe(**{pattern: self._message_handler(callback)}) - + return RedisResponse( - status=True, - message=f"Successfully pattern-subscribed to {pattern}." + status=True, message=f"Successfully pattern-subscribed to {pattern}." ) except Exception as e: return RedisResponse( status=False, message=f"Failed to pattern-subscribe to {pattern}.", - error=str(e) + error=str(e), ) - + def _message_handler(self, callback: Callable[[Dict], None]): """Create a message handler function for the subscription.""" + def handler(message): # Skip subscription confirmation messages - if message['type'] in ('subscribe', 'psubscribe'): + if message["type"] in ("subscribe", "psubscribe"): return - + # Parse JSON if the message is a JSON string - data = message['data'] + data = message["data"] if isinstance(data, bytes): - data = data.decode('utf-8') + data = data.decode("utf-8") try: data = json.loads(data) except json.JSONDecodeError: # Not JSON, keep as is pass - + # Call the callback with the message data - callback({ - 'channel': message.get('channel', b'').decode('utf-8') if isinstance(message.get('channel', b''), bytes) else message.get('channel', ''), - 'pattern': message.get('pattern', b'').decode('utf-8') if isinstance(message.get('pattern', b''), bytes) else message.get('pattern', ''), - 'data': data - }) - + callback( + { + "channel": ( + message.get("channel", b"").decode("utf-8") + if isinstance(message.get("channel", b""), bytes) + else message.get("channel", "") + ), + "pattern": ( + message.get("pattern", b"").decode("utf-8") + if isinstance(message.get("pattern", b""), bytes) + else message.get("pattern", "") + ), + "data": data, + } + ) + return handler - + def start_listening(self, in_thread: bool = True) -> RedisResponse: """Start listening for messages on subscribed channels. - + Args: in_thread: If True, start listening in a separate thread - + Returns: RedisResponse with status and message """ @@ -140,50 +151,41 @@ class RedisSubscriber: if in_thread: thread = Thread(target=self._listen_thread, daemon=True) thread.start() - self.active_threads['listener'] = thread + self.active_threads["listener"] = thread return RedisResponse( - status=True, - message="Listening thread started successfully." + status=True, message="Listening thread started successfully." ) else: # This will block the current thread self._listen_thread() return RedisResponse( - status=True, - message="Listening started successfully (blocking)." + status=True, message="Listening started successfully (blocking)." ) except Exception as e: return RedisResponse( - status=False, - message="Failed to start listening.", - error=str(e) + status=False, message="Failed to start listening.", error=str(e) ) - + def _listen_thread(self): """Thread function for listening to messages.""" self.pubsub.run_in_thread(sleep_time=0.01) - + def stop_listening(self) -> RedisResponse: """Stop listening for messages.""" try: self.pubsub.close() - return RedisResponse( - status=True, - message="Successfully stopped listening." 
- ) + return RedisResponse(status=True, message="Successfully stopped listening.") except Exception as e: return RedisResponse( - status=False, - message="Failed to stop listening.", - error=str(e) + status=False, message="Failed to stop listening.", error=str(e) ) - + def unsubscribe(self, channel: Optional[str] = None) -> RedisResponse: """Unsubscribe from a channel or all channels. - + Args: channel: The channel to unsubscribe from, or None for all channels - + Returns: RedisResponse with status and message """ @@ -194,24 +196,21 @@ class RedisSubscriber: else: self.pubsub.unsubscribe() message = "Successfully unsubscribed from all channels." - - return RedisResponse( - status=True, - message=message - ) + + return RedisResponse(status=True, message=message) except Exception as e: return RedisResponse( status=False, message=f"Failed to unsubscribe from {'channel' if channel else 'all channels'}.", - error=str(e) + error=str(e), ) - + def punsubscribe(self, pattern: Optional[str] = None) -> RedisResponse: """Unsubscribe from a pattern or all patterns. - + Args: pattern: The pattern to unsubscribe from, or None for all patterns - + Returns: RedisResponse with status and message """ @@ -222,24 +221,21 @@ class RedisSubscriber: else: self.pubsub.punsubscribe() message = "Successfully unsubscribed from all patterns." - - return RedisResponse( - status=True, - message=message - ) + + return RedisResponse(status=True, message=message) except Exception as e: return RedisResponse( status=False, message=f"Failed to unsubscribe from {'pattern' if pattern else 'all patterns'}.", - error=str(e) + error=str(e), ) class RedisPubSub: """Singleton class that provides both publisher and subscriber functionality.""" - + _instance = None - + def __new__(cls): if cls._instance is None: cls._instance = super(RedisPubSub, cls).__new__(cls) diff --git a/Controllers/Redis/Broadcast/implementations.py b/Controllers/Redis/Broadcast/implementations.py index f1fc236..34a336f 100644 --- a/Controllers/Redis/Broadcast/implementations.py +++ b/Controllers/Redis/Broadcast/implementations.py @@ -15,6 +15,7 @@ CHANNEL_WRITER = "chain:writer" # Flag to control the demo running = True + def generate_mock_data(): """Generate a mock message with UUID, timestamp, and sample data.""" return { @@ -24,40 +25,43 @@ def generate_mock_data(): "data": { "value": f"Sample data {int(time.time())}", "status": "new", - "counter": 0 - } + "counter": 0, + }, } + def reader_function(): """ First function in the chain. Generates mock data and publishes to the reader channel. """ print("[READER] Function started") - + while running: # Generate mock data message = generate_mock_data() start_time = time.time() message["start_time"] = start_time - + # Publish to reader channel result = redis_pubsub.publisher.publish(CHANNEL_READER, message) - + if result.status: print(f"[READER] {time.time():.6f} | Published UUID: {message['uuid']}") else: print(f"[READER] Publish error: {result.error}") - + # Wait before generating next message time.sleep(2) + def processor_function(): """ Second function in the chain. Subscribes to reader channel, processes messages, and publishes to processor channel. 
""" print("[PROCESSOR] Function started") + def on_reader_message(message): # The message structure from the subscriber has 'data' containing our actual message # If data is a string, parse it as JSON @@ -68,47 +72,51 @@ def processor_function(): except json.JSONDecodeError as e: print(f"[PROCESSOR] Error parsing message data: {e}") return - + # Check if stage is 'red' before processing if data.get("stage") == "red": # Process the message data["processor_timestamp"] = datetime.now().isoformat() data["data"]["status"] = "processed" data["data"]["counter"] += 1 - + # Update stage to 'processed' data["stage"] = "processed" - + # Add some processing metadata data["processing"] = { "duration_ms": 150, # Mock processing time - "processor_id": "main-processor" + "processor_id": "main-processor", } - + # Publish to processor channel result = redis_pubsub.publisher.publish(CHANNEL_PROCESSOR, data) - + if result.status: - print(f"[PROCESSOR] {time.time():.6f} | Received UUID: {data['uuid']} | Published UUID: {data['uuid']}") + print( + f"[PROCESSOR] {time.time():.6f} | Received UUID: {data['uuid']} | Published UUID: {data['uuid']}" + ) else: print(f"[PROCESSOR] Publish error: {result.error}") else: print(f"[PROCESSOR] Skipped message: {data['uuid']} (stage is not 'red')") - + # Subscribe to reader channel result = redis_pubsub.subscriber.subscribe(CHANNEL_READER, on_reader_message) - + if result.status: print(f"[PROCESSOR] Subscribed to channel: {CHANNEL_READER}") else: print(f"[PROCESSOR] Subscribe error: {result.error}") + def writer_function(): """ Third function in the chain. Subscribes to processor channel and performs final processing. """ print("[WRITER] Function started") + def on_processor_message(message): # The message structure from the subscriber has 'data' containing our actual message # If data is a string, parse it as JSON @@ -119,42 +127,45 @@ def writer_function(): except json.JSONDecodeError as e: print(f"[WRITER] Error parsing message data: {e}") return - + # Check if stage is 'processed' before processing if data.get("stage") == "processed": # Process the message data["writer_timestamp"] = datetime.now().isoformat() data["data"]["status"] = "completed" data["data"]["counter"] += 1 - + # Update stage to 'completed' data["stage"] = "completed" - + # Add some writer metadata - data["storage"] = { - "location": "main-db", - "partition": "events-2025-04" - } - + data["storage"] = {"location": "main-db", "partition": "events-2025-04"} + # Calculate elapsed time if start_time is available current_time = time.time() elapsed_ms = "" if "start_time" in data: - elapsed_ms = f" | Elapsed: {(current_time - data['start_time']) * 1000:.2f}ms" - + elapsed_ms = ( + f" | Elapsed: {(current_time - data['start_time']) * 1000:.2f}ms" + ) + # Optionally publish to writer channel for any downstream listeners result = redis_pubsub.publisher.publish(CHANNEL_WRITER, data) - + if result.status: - print(f"[WRITER] {current_time:.6f} | Received UUID: {data['uuid']} | Published UUID: {data['uuid']}{elapsed_ms}") + print( + f"[WRITER] {current_time:.6f} | Received UUID: {data['uuid']} | Published UUID: {data['uuid']}{elapsed_ms}" + ) else: print(f"[WRITER] Publish error: {result.error}") else: - print(f"[WRITER] Skipped message: {data['uuid']} (stage is not 'processed')") - + print( + f"[WRITER] Skipped message: {data['uuid']} (stage is not 'processed')" + ) + # Subscribe to processor channel result = redis_pubsub.subscriber.subscribe(CHANNEL_PROCESSOR, on_processor_message) - + if result.status: 
print(f"[WRITER] Subscribed to channel: {CHANNEL_PROCESSOR}") else: @@ -167,18 +178,18 @@ def run_demo(): print("Chain: READER → PROCESSOR → WRITER") print(f"Channels: {CHANNEL_READER} → {CHANNEL_PROCESSOR} → {CHANNEL_WRITER}") print("Format: [SERVICE] TIMESTAMP | Received/Published UUID | [Elapsed time]") - + # Start the Redis subscriber listening thread redis_pubsub.subscriber.start_listening() - + # Start processor and writer functions (these subscribe to channels) processor_function() writer_function() - + # Create a thread for the reader function (this publishes messages) reader_thread = Thread(target=reader_function, daemon=True) reader_thread.start() - + # Keep the main thread alive try: while True: diff --git a/Controllers/Redis/connection.py b/Controllers/Redis/connection.py index 73b974b..7cb48f8 100644 --- a/Controllers/Redis/connection.py +++ b/Controllers/Redis/connection.py @@ -41,15 +41,15 @@ class RedisConn: # Add connection pooling settings if not provided if "max_connections" not in self.config: self.config["max_connections"] = 50 # Increased for better concurrency - + # Add connection timeout settings if "health_check_interval" not in self.config: self.config["health_check_interval"] = 30 # Health check every 30 seconds - + # Add retry settings for operations if "retry_on_timeout" not in self.config: self.config["retry_on_timeout"] = True - + # Add connection pool settings for better performance if "socket_keepalive" not in self.config: self.config["socket_keepalive"] = True diff --git a/Controllers/Redis/implementations.py b/Controllers/Redis/implementations.py index 9d108d4..05c30b7 100644 --- a/Controllers/Redis/implementations.py +++ b/Controllers/Redis/implementations.py @@ -113,56 +113,58 @@ def run_all_examples() -> None: def run_concurrent_test(num_threads=100): """Run a comprehensive concurrent test with multiple threads to verify Redis connection handling.""" - print(f"\nStarting comprehensive Redis concurrent test with {num_threads} threads...") - + print( + f"\nStarting comprehensive Redis concurrent test with {num_threads} threads..." 
+ ) + # Results tracking with detailed metrics results = { - "passed": 0, - "failed": 0, + "passed": 0, + "failed": 0, "retried": 0, "errors": [], "operation_times": [], "retry_count": 0, "max_retries": 3, - "retry_delay": 0.1 + "retry_delay": 0.1, } results_lock = threading.Lock() - + def worker(thread_id): # Track operation timing start_time = time.time() retry_count = 0 success = False error_message = None - + while retry_count <= results["max_retries"] and not success: try: # Generate unique key for this thread unique_id = str(uuid.uuid4())[:8] full_key = f"test:concurrent:{thread_id}:{unique_id}" - + # Simple string operations instead of JSON test_value = f"test-value-{thread_id}-{time.time()}" - + # Set data in Redis with pipeline for efficiency from Controllers.Redis.database import redis_cli - + # Use pipeline to reduce network overhead with redis_cli.pipeline() as pipe: pipe.set(full_key, test_value) pipe.get(full_key) pipe.delete(full_key) results_list = pipe.execute() - + # Check results set_ok = results_list[0] retrieved_value = results_list[1] if isinstance(retrieved_value, bytes): - retrieved_value = retrieved_value.decode('utf-8') - + retrieved_value = retrieved_value.decode("utf-8") + # Verify data success = set_ok and retrieved_value == test_value - + if success: break else: @@ -170,26 +172,28 @@ def run_concurrent_test(num_threads=100): retry_count += 1 with results_lock: results["retry_count"] += 1 - time.sleep(results["retry_delay"] * (2 ** retry_count)) # Exponential backoff - + time.sleep( + results["retry_delay"] * (2**retry_count) + ) # Exponential backoff + except Exception as e: error_message = str(e) retry_count += 1 with results_lock: results["retry_count"] += 1 - + # Check if it's a connection error and retry if "Too many connections" in str(e) or "Connection" in str(e): # Exponential backoff for connection issues - backoff_time = results["retry_delay"] * (2 ** retry_count) + backoff_time = results["retry_delay"] * (2**retry_count) time.sleep(backoff_time) else: # For other errors, use a smaller delay time.sleep(results["retry_delay"]) - + # Record operation time operation_time = time.time() - start_time - + # Update results with results_lock: if success: @@ -200,26 +204,30 @@ def run_concurrent_test(num_threads=100): else: results["failed"] += 1 if error_message: - results["errors"].append(f"Thread {thread_id} failed after {retry_count} retries: {error_message}") + results["errors"].append( + f"Thread {thread_id} failed after {retry_count} retries: {error_message}" + ) else: - results["errors"].append(f"Thread {thread_id} failed after {retry_count} retries with unknown error") - + results["errors"].append( + f"Thread {thread_id} failed after {retry_count} retries with unknown error" + ) + # Create and start threads using a thread pool start_time = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: futures = [executor.submit(worker, i) for i in range(num_threads)] concurrent.futures.wait(futures) - + # Calculate execution time and performance metrics execution_time = time.time() - start_time ops_per_second = num_threads / execution_time if execution_time > 0 else 0 - + # Calculate additional metrics if we have successful operations avg_op_time = 0 min_op_time = 0 max_op_time = 0 p95_op_time = 0 - + if results["operation_times"]: avg_op_time = sum(results["operation_times"]) / len(results["operation_times"]) min_op_time = min(results["operation_times"]) @@ -227,8 +235,12 @@ def run_concurrent_test(num_threads=100): # 
Calculate 95th percentile sorted_times = sorted(results["operation_times"]) p95_index = int(len(sorted_times) * 0.95) - p95_op_time = sorted_times[p95_index] if p95_index < len(sorted_times) else sorted_times[-1] - + p95_op_time = ( + sorted_times[p95_index] + if p95_index < len(sorted_times) + else sorted_times[-1] + ) + # Print detailed results print("\nConcurrent Redis Test Results:") print(f"Total threads: {num_threads}") @@ -237,17 +249,17 @@ def run_concurrent_test(num_threads=100): print(f"Operations with retries: {results['retried']}") print(f"Total retry attempts: {results['retry_count']}") print(f"Success rate: {(results['passed'] / num_threads) * 100:.2f}%") - + print("\nPerformance Metrics:") print(f"Total execution time: {execution_time:.2f} seconds") print(f"Operations per second: {ops_per_second:.2f}") - + if results["operation_times"]: print(f"Average operation time: {avg_op_time * 1000:.2f} ms") print(f"Minimum operation time: {min_op_time * 1000:.2f} ms") print(f"Maximum operation time: {max_op_time * 1000:.2f} ms") print(f"95th percentile operation time: {p95_op_time * 1000:.2f} ms") - + # Print errors (limited to 10 for readability) if results["errors"]: print("\nErrors:") @@ -255,7 +267,7 @@ def run_concurrent_test(num_threads=100): print(f"- {error}") if len(results["errors"]) > 10: print(f"- ... and {len(results['errors']) - 10} more errors") - + # Return results for potential further analysis return results @@ -263,6 +275,6 @@ def run_concurrent_test(num_threads=100): if __name__ == "__main__": # Run basic examples run_all_examples() - + # Run enhanced concurrent test run_concurrent_test(10000) diff --git a/Schemas/building/build.py b/Schemas/building/build.py index a83c656..a41c288 100644 --- a/Schemas/building/build.py +++ b/Schemas/building/build.py @@ -351,9 +351,7 @@ class BuildParts(CrudCollection): if build_type := BuildTypes.filter_by_one( system=True, id=self.part_type_id, db=db ).data: - return ( - f"{str(build_type.type_name).upper()} : {str(self.part_no).upper()}" - ) + return f"{str(build_type.type_name).upper()} : {str(self.part_no).upper()}" return f"Undefined:{str(build_type.type_name).upper()}"
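
Note on the style shift itself: the hunks above only touch quoting, wrapping, and trailing commas, which is what a mechanical Black pass (default 88-column line length) produces. A minimal before/after sketch of that shift, using a made-up publish() stub rather than any repo API:

def publish(**kwargs):
    # Stand-in stub so the snippet runs; not a repo API.
    return kwargs


# Before the Black pass: single quotes, the call packed onto one long line.
result = publish(channel='chain:reader', payload={'status': 'new', 'counter': 0}, retries=3, raise_on_error=True)

# After `black .` with the default 88-column limit: double quotes, one argument
# per line once the call no longer fits, and a trailing comma after the last one.
result = publish(
    channel="chain:reader",
    payload={"status": "new", "counter": 0},
    retries=3,
    raise_on_error=True,
)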
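
On the mock-collection changes in Controllers/Mongo/database.py: each method is wrapped with retry_operation(max_retries=1, retry_interval=0). The decorator itself sits outside these hunks, so the following is only a sketch of a compatible shape under that calling convention, not the repo's implementation:

import time
from functools import wraps


def retry_operation(max_retries=3, retry_interval=0.5):
    # Shape matches how the reformatted code applies it:
    # retry_operation(max_retries=1, retry_interval=0)(original_method)
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                # The repo version may narrow this to PyMongo errors.
                except Exception as exc:
                    last_exc = exc
                    if attempt < max_retries:
                        time.sleep(retry_interval * (2**attempt))
            raise last_exc

        return wrapper

    return decorator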
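
The concurrent Redis test in Controllers/Redis/implementations.py batches each worker's set/get/delete through one pipeline to cut network round trips. The same round trip in isolation, assuming a local Redis and decode_responses=True in place of the manual bytes decode the test performs:

import redis

# Assumes a local, unauthenticated Redis; the test itself uses the shared
# redis_cli configured under Controllers/Redis.
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

key, value = "test:concurrent:0:deadbeef", "test-value-0"

with r.pipeline() as pipe:
    # Queue all three commands, send them in one round trip, read three replies.
    pipe.set(key, value)
    pipe.get(key)
    pipe.delete(key)
    set_ok, fetched, deleted = pipe.execute()

assert set_ok and fetched == value and deleted == 1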
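
The broadcast demo's reader → processor → writer chain ultimately rests on redis-py's pub/sub primitives (pubsub.subscribe with a per-channel handler plus run_in_thread), which the RedisSubscriber class wraps. A stand-alone sketch of one hand-off, with the channel name taken from the demo and everything else assumed:

import json
import time

import redis

CHANNEL_READER = "chain:reader"

# Assumes a local Redis; the repo builds its client in Controllers/Redis instead.
r = redis.Redis(host="localhost", port=6379, decode_responses=True)


def on_reader_message(message):
    # Skip subscription confirmations, as the handler in Broadcast/actions.py does.
    if message["type"] != "message":
        return
    data = json.loads(message["data"])
    print("received", data["uuid"], "stage:", data.get("stage"))


pubsub = r.pubsub()
pubsub.subscribe(**{CHANNEL_READER: on_reader_message})
listener = pubsub.run_in_thread(sleep_time=0.01, daemon=True)

time.sleep(0.1)  # give the worker thread a moment before publishing
r.publish(CHANNEL_READER, json.dumps({"uuid": "demo-1", "stage": "red"}))

time.sleep(0.5)  # let the handler run, then shut the listener down
listener.stop()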