diff --git a/.gitignore b/.gitignore index 1301a42..9838fc3 100644 --- a/.gitignore +++ b/.gitignore @@ -57,6 +57,7 @@ report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json env .env ServicesRunner/AccountRecordServices/Finder/Iban/.prisma-cache +ServicesRunner/AccountRecordServices/Finder/Comment/.prisma-cache venv/ .vscode/ __pycache__/ diff --git a/=1.3.8 b/=1.3.8 new file mode 100644 index 0000000..e69de29 diff --git a/ServicesRunner/AccountRecordServices/Finder/Comment/.dockerignore b/ServicesRunner/AccountRecordServices/Finder/Comment/.dockerignore new file mode 100644 index 0000000..f0d05fd --- /dev/null +++ b/ServicesRunner/AccountRecordServices/Finder/Comment/.dockerignore @@ -0,0 +1,14 @@ +__pycache__/ +*.pyc +*.pyo +*.pyd +*.db +*.sqlite3 +*.log +*.env +venv/ +.env.* +node_modules/ +.prisma/ +.prisma-cache/ +ServicesRunnner/AccountRecordServices/Test/venv/ diff --git a/ServicesRunner/AccountRecordServices/Finder/Comment/Dockerfile b/ServicesRunner/AccountRecordServices/Finder/Comment/Dockerfile new file mode 100644 index 0000000..f5b816a --- /dev/null +++ b/ServicesRunner/AccountRecordServices/Finder/Comment/Dockerfile @@ -0,0 +1,22 @@ +FROM python:3.12-slim + +ENV PYTHONUNBUFFERED=1 +ENV PYTHONDONTWRITEBYTECODE=1 +ENV VIRTUAL_ENV=/opt/venv +ENV PRISMA_SCHEMA_PATH=/app/Depends/schema.prisma +ENV PATH="$VIRTUAL_ENV/bin:$PATH" +ENV PYTHONPATH=/app + +RUN apt-get update && apt-get install -y --no-install-recommends gcc curl && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY ServicesRunner/Depends/ /app/Depends/ +COPY ServicesRunner/AccountRecordServices/Finder/Comment /app/ + +COPY ServicesRunner/requirements.txt /app/requirements.txt +COPY ServicesRunner/AccountRecordServices/Finder/Comment/entrypoint.sh /entrypoint.sh + +RUN chmod +x /entrypoint.sh + +CMD ["/entrypoint.sh"] \ No newline at end of file diff --git a/ServicesRunner/AccountRecordServices/Finder/Comment/app.py b/ServicesRunner/AccountRecordServices/Finder/Comment/app.py new file mode 100644 index 0000000..78f7c3b --- /dev/null +++ b/ServicesRunner/AccountRecordServices/Finder/Comment/app.py @@ -0,0 +1,174 @@ +import time +import arrow +import pprint + +from json import dumps + +from decimal import Decimal +from pydantic import BaseModel +from datetime import datetime +from typing import Optional + +from Depends.prisma_client import PrismaService +from Depends.service_handler import ProcessCommentFinderService +from Depends.config import ConfigServices, MailSendModel, RedisMailSender, Status, RedisTaskObject, FinderComment + + +class BankReceive(BaseModel): + import_file_name: str + iban: str + bank_date: datetime + channel_branch: str + currency: Optional[str] = "TL" + currency_value: Decimal + bank_balance: Decimal + additional_balance: Decimal + process_name: str + process_type: str + process_comment: str + bank_reference_code: str + bank_date_w: int + bank_date_m: int + bank_date_d: int + bank_date_y: int + + +def check_task_belong_to_this_service(task: RedisTaskObject): + if not task.service == ConfigServices.SERVICE_PREFIX_FINDER_IBAN: + return False + if not task.completed: + return False + if task.is_completed: + return False + if not task.data: + return False + return True + + +def write_account_records_row_from_finder_comment(finder_comments: list[FinderComment], prisma_service: PrismaService, saved_list_of_account_records: dict): + finder_comments = list(finder_comments) + for finder_comment in finder_comments: + bank_date = arrow.get(finder_comment.bank_date).replace(tzinfo='GMT+3').datetime + bank_receive_record = BankReceive( + import_file_name=finder_comment.filename, iban=finder_comment.iban, bank_date=bank_date, channel_branch=finder_comment.channel_branch, currency="TL", currency_value=finder_comment.currency_value, + bank_balance=finder_comment.balance, additional_balance=finder_comment.additional_balance, process_name=finder_comment.process_name, process_type=finder_comment.process_type, + process_comment=finder_comment.process_comment, bank_reference_code=finder_comment.bank_reference_code, build_id=finder_comment.build_id, build_uu_id=finder_comment.build_uu_id, + decision_book_id=finder_comment.decision_book_id, decision_book_uu_id=finder_comment.decision_book_uu_id, bank_date_w=bank_date.weekday(), bank_date_m=bank_date.month, + bank_date_d=bank_date.day, bank_date_y=bank_date.year + ) + account_record_found = prisma_service.find_first(table="account_records", query={"iban": bank_receive_record.iban, "bank_reference_code": bank_receive_record.bank_reference_code, + "bank_date": bank_receive_record.bank_date, "bank_balance": bank_receive_record.bank_balance, "currency_value": bank_receive_record.currency_value}, + select={"id": True, "iban": True, "bank_reference_code": True, "bank_date": True, "bank_balance": True} + ) + if not account_record_found: + created_account_record = prisma_service.create(table="account_records", data=bank_receive_record.dict(), select={"id": True, "iban": True, "bank_reference_code": True, "bank_date": True, "bank_balance": True} ) + if created_account_record['build_id'] in saved_list_of_account_records.keys(): + saved_list_of_account_records[created_account_record['build_id']] = [*saved_list_of_account_records[created_account_record['build_id']], created_account_record] + else: + saved_list_of_account_records[created_account_record['build_id']] = [created_account_record] + return saved_list_of_account_records + + +def enclose_task_and_send_mail_to_build_manager(prisma_service: PrismaService, saved_list_of_account_records: dict, process_comment_finder_service: ProcessCommentFinderService, task: RedisTaskObject): + """ + Enclose task and send mail to build manager + """ + if not saved_list_of_account_records: + return + today = arrow.now().to('GMT+3').datetime + for build_id, saved_list_of_account_record in saved_list_of_account_records.items(): + build_manager_occupant_type = prisma_service.find_first(table="occupant_types", query={"occupant_code":"BU-MNG", "is_confirmed": True, "active": True}) + living_space = prisma_service.find_first( + table="build_living_space", query={"build_id": build_id, "occupant_type_id": build_manager_occupant_type['id'], "expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}}) + build = prisma_service.find_first(table="builds", query={"id": build_id}) + person = prisma_service.find_first(table="people", query={"id": living_space['person_id']}) + user = prisma_service.find_first(table="users", query={"person_id": person['id']}) + send_object = MailSendModel( + receivers=[user.email], data=saved_list_of_account_record, template_name=ConfigServices.TEMPLATE_ACCOUNT_RECORDS, + subject=f"{build['name']} Cari Durum Bilgilendirme Raporu - {today.strftime('%d/%m/%Y %H:%M')}", + ) + set_mail_object = RedisMailSender(task=task, data=send_object, service=ConfigServices.SERVICE_PREFIX_MAIL_SENDER, status=Status.PENDING, completed=False, created_at=today.strftime('%Y-%m-%d %H:%M:%S')) + process_comment_finder_service.service_retriever.redis_client.set(ConfigServices.SERVICE_PREFIX_MAIL_SENDER, dumps(set_mail_object.dict())) + return + + +if __name__ == "__main__": + + prisma_service = PrismaService() + process_comment_finder_service = ProcessCommentFinderService() + print("Process Comment service started") + try: + print("Process Comment service started sleeping for 5 seconds") + while True: + time.sleep(5) + saved_list_of_account_records = dict() + tasks = process_comment_finder_service.fetch_all_tasks() + for task in tasks: + if not check_task_belong_to_this_service(task): + continue + write_account_records_row_from_finder_comment(finder_comments=task.data.FinderComment, prisma_service=prisma_service, saved_list_of_account_records=saved_list_of_account_records) + process_comment_finder_service.update_task_status(task_uuid=task.task, is_completed=True, status=Status.COMPLETED) + process_comment_finder_service.delete_task(task_uuid=task.task) + except Exception as e: + raise + finally: + prisma_service.disconnect() + + +def fix_account_records_bank_date(prisma_service: PrismaService, bank_receive_record: BankReceive): + account_record_from_other_fields = prisma_service.find_first( + table="account_records", + query={ + "iban": bank_receive_record.iban, + "bank_reference_code": bank_receive_record.bank_reference_code, + "bank_balance": bank_receive_record.bank_balance, + "currency_value": bank_receive_record.currency_value, + # "process_comment": {"contains": str(bank_receive_record.process_comment), "mode": "insensitive"}, + }, + select={ + "id": True, "iban": True, "bank_reference_code": True, "bank_date": True, + "bank_balance": True, "currency_value": True, "process_comment": True + } + ) + if account_record_from_other_fields: + prisma_service.update( + table="account_records", where={"id": account_record_from_other_fields['id']}, data={"bank_date": bank_receive_record.bank_date}, + ) + if not account_record_from_other_fields: + pprint.pprint({"not_found_bank_receive_record": bank_receive_record}) + # prisma_service.update( + # table="account_records", where={"id": account_record_from_other_fields['id']}, data={"bank_date": bank_receive_record.bank_date}, + # ) + # from_database = arrow.get(account_record_from_other_fields['bank_date']).to('GMT+3').datetime + # print('old date', from_database, " - new date ", bank_receive_record.bank_date) + + +def commented_out_code(): + account_record_found = None + + old_bank_date=arrow.get(finder_comment.bank_date).datetime + + if not account_record_found: + account_record_found_with_old_date = prisma_service.find_first( + table="account_records", + query={ + "iban": bank_receive_record.iban, "bank_reference_code": bank_receive_record.bank_reference_code, + "bank_date": old_bank_date, "bank_balance": bank_receive_record.bank_balance, + }, + ) + if account_record_found_with_old_date: + prisma_service.update( + table="account_records", where={"id": account_record_found_with_old_date.id}, data={"bank_date": bank_receive_record.bank_date}, + ) + if account_record_found: + print('-' * 150) + pprint.pprint( + { + "account_record_found": dict(account_record_found), + "bank_receive_record": bank_receive_record.dict(), + "bank_receive_record.bank_date": bank_receive_record.bank_date, + "account_record_found.bank_date": account_record_found["bank_date"], + } + ) + print('-' * 150) + return diff --git a/ServicesRunner/AccountRecordServices/Finder/Comment/entrypoint.sh b/ServicesRunner/AccountRecordServices/Finder/Comment/entrypoint.sh new file mode 100644 index 0000000..8afa504 --- /dev/null +++ b/ServicesRunner/AccountRecordServices/Finder/Comment/entrypoint.sh @@ -0,0 +1,19 @@ +#!/bin/sh + +VENV_PATH="/opt/venv" +REQUIREMENTS_PATH="/app/requirements.txt" +SCHEMA_PATH="/app/Depends/schema.prisma" +PRISMA_BINARY_PATH="/root/.cache/prisma-python/binaries" + +if [ ! -x "$VENV_PATH/bin/python" ]; then + python -m venv "$VENV_PATH" + "$VENV_PATH/bin/pip" install pip --upgrade + "$VENV_PATH/bin/pip" install --no-cache-dir -r "$REQUIREMENTS_PATH" + "$VENV_PATH/bin/prisma" generate --schema "$SCHEMA_PATH" +fi + +if ! find "$PRISMA_BINARY_PATH" -type f -name "prisma-query-engine-debian-openssl-3.0.x" | grep -q .; then + "$VENV_PATH/bin/prisma" py fetch +fi + +exec "$VENV_PATH/bin/python" -u app.py diff --git a/ServicesRunner/AccountRecordServices/Finder/Iban/app.py b/ServicesRunner/AccountRecordServices/Finder/Iban/app.py index bed5dd6..26d95b8 100644 --- a/ServicesRunner/AccountRecordServices/Finder/Iban/app.py +++ b/ServicesRunner/AccountRecordServices/Finder/Iban/app.py @@ -1,74 +1,108 @@ -import uvloop -import asyncio -import sys -import signal import time +import arrow + +from pydantic import BaseModel from datetime import datetime -from Depends.prisma_client import prisma_client, disconnect_prisma -asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) +from Depends.prisma_client import PrismaService +from Depends.service_handler import IbanFinderService +from Depends.config import ConfigServices, Status, FinderIban, RedisTaskObject -# sys.stdout.reconfigure(line_buffering=True) # alternatif: python -u veya PYTHONUNBUFFERED=1 +class IbanRecord(BaseModel): + id: int + uu_id: str + iban: str + build_id: int + build_uu_id: str + expiry_starts: datetime + expiry_ends: datetime -async def tick(): - start = time.time() - print(f"[{datetime.now()}] Attempting database query...") - async with prisma_client() as db: - rows = await db.account_records.find_many( - take=5, skip=0, order=[{"bank_date": "desc"}] - ) - print(f"[{datetime.now()}] Query completed in {time.time()-start:.2f}s") - - for i, r in enumerate(rows): - # Dilersen burada formatı değiştir - print(f" Row: {i} | id={r.id} bank_date={r.bank_date} currency_value={r.currency_value}") - print("-" * 80) +class DecisionBookRecord(BaseModel): + id: int + uu_id: str + build_id: int + build_uu_id: str + expiry_starts: datetime + expiry_ends: datetime -async def service(): - print(f"[{datetime.now()}] IBAN Finder service starting") - try: - iteration = 0 - while True: - iteration += 1 - print(f"\n[{datetime.now()}] Loop iteration {iteration}") - try: - await tick() - except Exception as e: - print(f"[{datetime.now()}] Error in service tick: {e}") - await asyncio.sleep(1) # bloklamayan bekleme - finally: - # Her durumda DB'yi temiz kapat - await disconnect_prisma() - print(f"[{datetime.now()}] Cleaning up database connection...") +def check_task_belong_to_this_service(task: RedisTaskObject): + if not task.service == ConfigServices.SERVICE_PREFIX_MAIL_PARSER: + return False + if not task.completed: + return False + if not task.data: + return False + return True -async def _graceful_shutdown(sig: signal.Signals): - print(f"\n[{datetime.now()}] Shutting down due to signal: {sig.name}") - # Burada istersen tüm pending task'leri iptal edebilirsin: - # for t in asyncio.all_tasks(): - # if t is not asyncio.current_task(): - # t.cancel() - await disconnect_prisma() +def extract_build_iban_from_task(task: RedisTaskObject, finder_iban: FinderIban, write_object: dict) -> tuple[bool, dict]: + bank_date = arrow.get(finder_iban.bank_date).datetime + iban_record_db = prisma_service.find_first( + table="build_ibans", + query={ + "active": True, "deleted": False, "is_confirmed": True, "iban": finder_iban.iban, + "expiry_starts": {"lte": bank_date}, "expiry_ends": {"gte": bank_date}, + }, + select={"id": None, "uu_id": None, "iban": None, "build_id": None, "build_uu_id": None, "expiry_starts": None, "expiry_ends": None} + ) + if iban_record_db: + iban_record = IbanRecord(**iban_record_db) + write_object["build_id"] = iban_record.build_id + write_object["build_uu_id"] = iban_record.build_uu_id + return True, write_object + return False, write_object -def _install_signal_handlers(loop: asyncio.AbstractEventLoop): - # Linux/Unix: SIGINT (Ctrl+C) ve SIGTERM (docker stop) için kibar kapanış - for s in (signal.SIGINT, signal.SIGTERM): - loop.add_signal_handler(s, lambda s=s: asyncio.create_task(_graceful_shutdown(s))) +def extract_decision_book_from_task(write_object: dict) -> tuple[bool, dict]: + bank_date = arrow.get(write_object["bank_date"]).datetime + decision_book_record_db = prisma_service.find_first( + table="build_decision_book", + query={ + "active": True, "deleted": False, "is_confirmed": True, "build_id": write_object["build_id"], + "expiry_starts": {"lte": bank_date}, "expiry_ends": {"gte": bank_date}, + }, + select={"id": None, "uu_id": None, "build_id": None, "build_uu_id": None, "expiry_starts": None, "expiry_ends": None} + ) + if decision_book_record_db: + decision_book_record = DecisionBookRecord(**decision_book_record_db) + write_object["build_decision_book_id"] = decision_book_record.id + write_object["build_decision_book_uu_id"] = decision_book_record.uu_id + return True, write_object + return False, write_object -async def main(): - loop = asyncio.get_running_loop() - try: - _install_signal_handlers(loop) - except NotImplementedError: - # (Gerekirse Windows vs., ama sen Linux/Docker kullanıyorsun) - pass - await service() - if __name__ == "__main__": - # uvloop policy zaten yukarıda set edildi; burada normal asyncio.run kullanıyoruz - asyncio.run(main()) + + prisma_service = PrismaService() + iban_finder_service = IbanFinderService() + print("Find Build Iban service started") + try: + print("Find Build Iban service started sleeping for 5 seconds") + while True: + time.sleep(5) + tasks = iban_finder_service.fetch_all_tasks() + for task in tasks: + if not check_task_belong_to_this_service(task): + continue + if list(task.data.FinderIban): + finder_iban_list = [] + for finder_iban in list(task.data.FinderIban): + write_object = finder_iban.dict() + is_build_found, is_decision_book_found = False, False + is_build_found, write_object = extract_build_iban_from_task(task, finder_iban, write_object) + if is_build_found: + is_decision_book_found, write_object = extract_decision_book_from_task(write_object) + if is_build_found or is_decision_book_found: + finder_iban_list.append(write_object) + if finder_iban_list: + iban_finder_service.update_service_data(task.task, ConfigServices.SERVICE_PREFIX_FINDER_COMMENT, finder_iban_list) + iban_finder_service.change_service(task.task, ConfigServices.SERVICE_PREFIX_FINDER_IBAN, Status.COMPLETED, True) + continue + iban_finder_service.change_service(task.task, ConfigServices.SERVICE_PREFIX_FINDER_IBAN, Status.FAILED, True) + except Exception as e: + raise + finally: + prisma_service.disconnect() diff --git a/ServicesRunner/AccountRecordServices/Finder/Iban/entrypoint.sh b/ServicesRunner/AccountRecordServices/Finder/Iban/entrypoint.sh index 940e6db..8afa504 100644 --- a/ServicesRunner/AccountRecordServices/Finder/Iban/entrypoint.sh +++ b/ServicesRunner/AccountRecordServices/Finder/Iban/entrypoint.sh @@ -7,6 +7,7 @@ PRISMA_BINARY_PATH="/root/.cache/prisma-python/binaries" if [ ! -x "$VENV_PATH/bin/python" ]; then python -m venv "$VENV_PATH" + "$VENV_PATH/bin/pip" install pip --upgrade "$VENV_PATH/bin/pip" install --no-cache-dir -r "$REQUIREMENTS_PATH" "$VENV_PATH/bin/prisma" generate --schema "$SCHEMA_PATH" fi diff --git a/ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank/app.py b/ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank/app.py index 5e4146a..dfc10fb 100644 --- a/ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank/app.py +++ b/ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank/app.py @@ -152,12 +152,11 @@ if __name__ == "__main__": # Process each task for active_task in all_tasks: - if active_task.service == ConfigServices.SERVICE_PREFIX_MAIL_PARSER and active_task.completed: - # logger.info(f"Task {active_task.task} is already processed.") + if active_task.service == ConfigServices.SERVICE_PREFIX_MAIL_READER and active_task.completed: + logger.info(f"Processing task {active_task.task}") + parser.process_task(active_task) + else: continue - - logger.info(f"Processing task {active_task.task}") - parser.process_task(active_task) else: logger.info("No tasks found to process") diff --git a/ServicesRunner/AccountRecordServices/Reader/Banks/IsBank/app.py b/ServicesRunner/AccountRecordServices/Reader/Banks/IsBank/app.py index a091ef5..ac9762b 100644 --- a/ServicesRunner/AccountRecordServices/Reader/Banks/IsBank/app.py +++ b/ServicesRunner/AccountRecordServices/Reader/Banks/IsBank/app.py @@ -68,102 +68,69 @@ def initialize_service(): if __name__ == "__main__": logger.info("Starting IsBank Email Service") print(f"Starting Service Mail Reader.") - - # Initialize service runner = initialize_service() - - # Configurable parameters - normal_sleep_time = 10 # seconds between normal operations - error_sleep_time = 30 # seconds to wait after an error before retrying - max_consecutive_errors = 5 # maximum number of consecutive errors before longer pause - extended_error_sleep = 120 # seconds to wait after hitting max consecutive errors + normal_sleep_time = 10 + error_sleep_time = 30 + max_consecutive_errors = 5 + extended_error_sleep = 120 consecutive_errors = 0 - - # Main service loop + while True: try: - # Main processing + print("Fetching and setting mails...") runner.fetch_and_set_mails() - - # Reset error counter on success if consecutive_errors > 0: logger.info(f"Service recovered after {consecutive_errors} consecutive errors") consecutive_errors = 0 - - # Normal operation sleep sleep(normal_sleep_time) - except MailReaderService.REDIS_EXCEPTIONS as e: - # Redis-specific errors consecutive_errors += 1 logger.error(f"Redis error (attempt {consecutive_errors}): {str(e)}") - - # Use centralized reconnection handler from RedisHandler redis_handler, need_extended_sleep = MailReaderService.handle_reconnection( consecutive_errors=consecutive_errors, max_consecutive_errors=max_consecutive_errors ) - if redis_handler: - # Update runner's redis handler with the new instance runner.redis_handler = redis_handler - runner.redis_connected = False # Will trigger reconnection on next cycle - - # Sleep based on error count + runner.redis_connected = False if need_extended_sleep: sleep(extended_error_sleep) else: sleep(error_sleep_time) - except socket.error as e: - # Email connection errors consecutive_errors += 1 logger.error(f"Email connection error (attempt {consecutive_errors}): {str(e)}") - - # Try to re-establish email connection try: logger.info("Attempting to re-establish email connection...") - # Create new email service directly email_service = EmailReaderService(IsBankConfig()) email_service.login_and_connect() - - # Create new runner with existing Redis handler and new email service - redis_handler = runner.redis_handler # Preserve existing Redis handler + redis_handler = runner.redis_handler runner = EmailServiceRunner(redis_handler=redis_handler, email_service=email_service) logger.info("Successfully re-established email connection") except Exception as email_retry_error: logger.error(f"Failed to re-establish email connection: {str(email_retry_error)}") - # Determine sleep time based on consecutive errors if consecutive_errors >= max_consecutive_errors: logger.warning(f"Hit {max_consecutive_errors} consecutive email errors, taking longer pause") sleep(extended_error_sleep) else: sleep(error_sleep_time) - except Exception as e: - # Any other unexpected errors consecutive_errors += 1 logger.error(f"Unexpected error (attempt {consecutive_errors}): {str(e)}") - - # For any other error, try to reinitialize everything after some delay if consecutive_errors >= max_consecutive_errors: logger.warning(f"Hit {max_consecutive_errors} consecutive errors, reinitializing service") try: - # Try to clean up existing connections try: runner.drop() except Exception as cleanup_error: logger.warning(f"Error during cleanup: {str(cleanup_error)}") - - # Reinitialize the service directly redis_handler = MailReaderService() email_service = EmailReaderService(IsBankConfig()) email_service.login_and_connect() runner = EmailServiceRunner(redis_handler=redis_handler, email_service=email_service) - if runner: logger.info("Successfully reinitialized email service runner") - consecutive_errors = 0 # Reset counter after reinitialization + consecutive_errors = 0 else: logger.error("Failed to reinitialize email service runner") except Exception as reinit_error: @@ -171,6 +138,5 @@ if __name__ == "__main__": sleep(extended_error_sleep) else: - # For fewer consecutive errors, just retry the current runner print(f"Error: {str(e)}") sleep(error_sleep_time) diff --git a/ServicesRunner/Depends/config.py b/ServicesRunner/Depends/config.py index a8c24a1..96a5587 100644 --- a/ServicesRunner/Depends/config.py +++ b/ServicesRunner/Depends/config.py @@ -1,5 +1,5 @@ import os - +from re import TEMPLATE from pydantic import BaseModel from typing import Any, List, Optional, Union @@ -29,20 +29,34 @@ class MailParser(BaseModel): charset: str data: str - class FinderIban(BaseModel): - ... + + filename: str + iban: str + bank_date: str + channel_branch: str + currency_value: float + balance: float + additional_balance: float + process_name: str + process_type: str + process_comment: str + bank_reference_code: str -class FinderComment(BaseModel): - ... +class FinderComment(FinderIban): + build_id: Optional[int] = None + build_uu_id: Optional[str] = None + decision_book_id: Optional[int] = None + decision_book_uu_id: Optional[str] = None + class RedisData(BaseModel): MailReader: MailReader MailParser: List[MailParser] - FinderIban: FinderIban | Any - FinderComment: FinderComment | Any + FinderIban: List[FinderIban] + FinderComment: List[FinderComment] class Status: @@ -61,6 +75,24 @@ class RedisTaskObject(BaseModel): created_at: str is_completed: bool + +class MailSendModel(BaseModel): + receivers: List[str] + subject: str + template_name: str + data: dict + + +class RedisMailSender(BaseModel): + task: RedisTaskObject + data: MailSendModel + completed: bool + service: str + status: str + created_at: str + completed: bool + + class EmailConfig: HOST: str = os.getenv("EMAIL_HOST", "10.10.2.34") @@ -105,12 +137,16 @@ class ConfigServices: TASK_MAILID_INDEX_PREFIX: str = "BANK:SERVICES:TASK:MAILID" TASK_UUID_INDEX_PREFIX: str = "BANK:SERVICES:TASK:UUID" TASK_SEEN_PREFIX: str = "BANK:SERVICES:TASK:SEEN" + TASK_DELETED_PREFIX: str = "BANK:SERVICES:TASK:DELETED" SERVICE_PREFIX_MAIL_READER: str = "MailReader" SERVICE_PREFIX_MAIL_PARSER: str = "MailParser" SERVICE_PREFIX_FINDER_IBAN: str = "FinderIban" SERVICE_PREFIX_FINDER_COMMENT: str = "FinderComment" + SERVICE_PREFIX_MAIL_SENDER: str = "MailSender" + TEMPLATE_ACCOUNT_RECORDS: str = "template_accounts.html" + paramsRedisData = Union[MailReader, MailParser, FinderIban, FinderComment] diff --git a/ServicesRunner/Depends/mail_handler.py b/ServicesRunner/Depends/mail_handler.py index 33b9a01..c67f259 100644 --- a/ServicesRunner/Depends/mail_handler.py +++ b/ServicesRunner/Depends/mail_handler.py @@ -15,7 +15,7 @@ from email.parser import BytesParser from imaplib import IMAP4_SSL, IMAP4 from Depends.redis_handlers import RedisHandler -from Depends.config import EmailConfig, MailReaderMainConfig, MailReader, MailParser, RedisData +from Depends.config import ConfigServices, EmailConfig, MailReaderMainConfig, MailReader, MailParser, RedisData, Status from Depends.service_handler import MailReaderService # Configure logging @@ -328,41 +328,70 @@ class EmailReaderService: raise @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error)) - def mark_no_attachment(self, uid): + def move_to_folder(self, uid: Union[str, bytes], folder: str): + """ + Move message to folder with retry mechanism + + Args: + uid: Email UID + folder: Destination folder + """ + try: + log_uid = uid + if isinstance(uid, bytes): + log_uid = uid.decode('utf-8', errors='replace') + elif isinstance(uid, str): + uid = uid.encode('utf-8') + logger.info(f"Moving email {log_uid} to {folder} folder") + self.mail.uid('MOVE', uid, folder) + self.commit() + return True + except Exception as e: + logger.error(f"Failed to move email to folder: {str(e)}") + return False + + @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error)) + def copy_to_folder(self, uid: Union[str, bytes], folder: str): + """ + Copy message to folder with retry mechanism + + Args: + uid: Email UID + folder: Destination folder + """ + try: + log_uid = uid + if isinstance(uid, bytes): + log_uid = uid.decode('utf-8', errors='replace') + elif isinstance(uid, str): + uid = uid.encode('utf-8') + logger.info(f"Copying email {log_uid} to {folder} folder") + self.mail.uid('COPY', uid, folder) + self.commit() + return True + except Exception as e: + logger.error(f"Failed to copy email to folder: {str(e)}") + return False + + @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error)) + def mark_no_attachment(self, uid: Union[str, bytes]): """ Move message to no attachment folder with retry mechanism Args: uid: Email UID """ - try: - # Handle both string and bytes types for logging - log_uid = uid - if isinstance(uid, bytes): - log_uid = uid.decode('utf-8', errors='replace') - - logger.info(f"Moving email {log_uid} to {self.config.NO_ATTACHMENT_FOLDER} folder") - self.mail.uid('COPY', uid, self.config.NO_ATTACHMENT_FOLDER) - self.delete(uid) - except Exception as e: - logger.error(f"Failed to mark email as no attachment: {str(e)}") - raise + self.move_to_folder(uid, self.config.NO_ATTACHMENT_FOLDER) @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error)) - def mark_completed(self, uid: bytes): + def mark_completed(self, uid: Union[str, bytes]): """ Move message to completed folder with retry mechanism Args: uid: Email UID """ - try: - logger.info(f"Moving email {uid.decode('utf-8', errors='replace')} to {self.config.COMPLETED_FOLDER} folder") - self.mail.uid('COPY', uid, self.config.COMPLETED_FOLDER) - # self.delete(uid) - except Exception as e: - logger.error(f"Failed to mark email as completed: {str(e)}") - raise + self.move_to_folder(uid, self.config.COMPLETED_FOLDER) @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error)) def delete(self, uid): @@ -499,7 +528,6 @@ class EmailServiceRunner: if not getattr(mail, 'id', None): logger.warning("Skipping email with no ID") continue - mail_id = mail.id.decode('utf-8') # check mail has .pdf extension @@ -545,26 +573,18 @@ class EmailServiceRunner: """ try: mail_to_dict = mail.to_dict() - result = self.redis_handler.process_mail( + task_uuid, self.counter = self.redis_handler.process_mail( mail_id=mail_id, mail_data=mail_to_dict, service_prefix=self.email_service.config.SERVICE_PREFIX, counter=self.counter ) - if result['status'] == 'success': - if result['action'] == 'stored_new_mail': - self.counter = result.get('counter', self.counter) - logger.info(f"Successfully processed new email {mail_id}") - elif result['action'] == 'checked_existing_mail': - if result.get('is_completed', False): - logger.info(f"Marking completed email {mail_id}") - self.email_service.mark_completed(mail_id) - elif result['status'] == 'error': - if result['action'] == 'id_mismatch': - logger.error(f"Mail ID mismatch: {mail_id} != {result.get('stored_id')}") - raise ValueError("Mail id does not match with id from Redis") - else: - logger.error(f"Email Service Runner Error processing mail {mail_id}: {result.get('error', 'Unknown error')}") - raise Exception(result.get('error', 'Unknown error during mail processing')) + if task_uuid: + self.redis_handler.change_service( + task_uuid=task_uuid, service_name=ConfigServices.SERVICE_PREFIX_MAIL_READER, status=Status.COMPLETED, completed=True + ) else: - logger.warning(f"Unexpected result status: {result['status']} for mail {mail_id}") + if self.redis_handler.check_mail_is_ready_to_delete(mail_id): + self.email_service.mark_completed(mail_id) + self.redis_handler.pop_mail(mail_id) + except MailReaderService.REDIS_EXCEPTIONS as e: logger.error(f"Redis error while processing mail {mail_id}: {str(e)}") self.redis_connected = False diff --git a/ServicesRunner/Depends/prisma_client.py b/ServicesRunner/Depends/prisma_client.py index 06caf6b..55ac71c 100644 --- a/ServicesRunner/Depends/prisma_client.py +++ b/ServicesRunner/Depends/prisma_client.py @@ -1,49 +1,289 @@ -from prisma import Prisma +import asyncio +import time +import logging +import uvloop +import threading + +from datetime import datetime +from typing import Optional, AsyncGenerator, Protocol, Any from contextlib import asynccontextmanager -from typing import AsyncGenerator, Optional +from prisma import Prisma -# Singleton pattern for Prisma client -_prisma_client: Optional[Prisma] = None -async def get_prisma_client() -> Prisma: - """ - Get or initialize the Prisma client singleton. - - Returns: - Prisma: The initialized Prisma client instance - """ - global _prisma_client - - if _prisma_client is None: - _prisma_client = Prisma() - await _prisma_client.connect() - - return _prisma_client +logger = logging.getLogger("prisma-service") -@asynccontextmanager -async def prisma_client() -> AsyncGenerator[Prisma, None]: - """ - Context manager for Prisma client operations. - - Yields: - Prisma: The initialized Prisma client instance - - Example: - ```python - async with prisma_client() as db: - users = await db.user.find_many() - ``` - """ - client = await get_prisma_client() - try: - yield client - except Exception as e: - print(f"Database operation error: {e}") - raise -async def disconnect_prisma(): - """Disconnect the Prisma client when shutting down the application.""" - global _prisma_client - if _prisma_client is not None: - await _prisma_client.disconnect() - _prisma_client = None +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("httpcore").setLevel(logging.WARNING) + + +class BaseModelClient(Protocol): + + async def find_many(self, **kwargs) -> list[Any]: ... + async def find_first(self, **kwargs) -> Any: ... + async def find_first_or_raise(self, **kwargs) -> Any: ... + async def find_unique(self, **kwargs) -> Any: ... + async def find_unique_or_raise(self, **kwargs) -> Any: ... + async def create(self, **kwargs) -> Any: ... + async def update(self, **kwargs) -> Any: ... + async def delete(self, **kwargs) -> Any: ... + async def delete_many(self, **kwargs) -> Any: ... + + +class PrismaService: + + def __init__(self) -> None: + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._thread: Optional[threading.Thread] = None + self._client: Optional[Prisma] = None + self._start_loop_thread() + + def _loop_runner(self) -> None: + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + try: + self._loop.run_forever() + finally: + self._loop.close() + + def _submit(self, coro): + if self._loop is None or not self._loop.is_running(): + raise RuntimeError("PrismaService event loop is not running.") + fut = asyncio.run_coroutine_threadsafe(coro, self._loop) + return fut.result() + + async def _aconnect(self) -> Prisma: + if self._client is not None: + return self._client + logger.info("Connecting Prisma client...") + client = Prisma() + await client.connect() + self._client = client + logger.info("Prisma client connected.") + return self._client + + async def _adisconnect(self) -> None: + if self._client is not None: + logger.info("Disconnecting Prisma client...") + try: + await self._client.disconnect() + finally: + self._client = None + logger.info("Prisma client disconnected.") + + @asynccontextmanager + async def _asession(self) -> AsyncGenerator[Prisma, None]: + yield await self._aconnect() + + def _start_loop_thread(self) -> None: + t = threading.Thread(target=self._loop_runner, name="PrismaLoop", daemon=True) + t.start() + self._thread = t + while self._loop is None: + time.sleep(0.005) + + async def _connect(self) -> Prisma: + if self._client is not None: + return self._client + async with self._lock: + if self._client is None: + logger.info("Connecting Prisma client...") + client = Prisma() + await client.connect() + self._client = client + logger.info("Prisma client connected.") + return self._client + + async def _disconnect(self) -> None: + async with self._lock: + if self._client is not None: + try: + logger.info("Disconnecting Prisma client...") + await self._client.disconnect() + logger.info("Prisma client disconnected.") + finally: + self._client = None + + @asynccontextmanager + async def _session(self) -> AsyncGenerator[Prisma, None]: + client = await self._connect() + try: + yield client + except Exception: + logger.exception("Database operation error") + raise + + def _run(self, coro): + try: + asyncio.get_running_loop() + raise RuntimeError("Async run is not allowed. Use sync methods instead.") + except RuntimeError as e: + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + with asyncio.Runner() as runner: + return runner.run(coro) + + async def _a_find_many(self, table: str, query: Optional[dict] = None, take: int = None, skip: int = None, + order: Optional[list[dict]] = None, select: Optional[dict] = None, include: Optional[dict] = None + ) -> list[dict]: + start = time.time() + async with self._asession() as db: + table_selected: BaseModelClient = getattr(db, table, None) + if not table_selected: + raise ValueError(f"Table {table} not found") + rows = await table_selected.find_many(where=query, take=take, skip=skip, order=order or [], select=select, include=include) + # print(f"[{datetime.now()}] Find many query completed in {time.time() - start:.2f}s") + return rows + + async def _a_find_first(self, table: str, query: Optional[dict] = None, order : Optional[list[dict]] = None, include: Optional[dict] = None) -> Any: + start = time.time() + async with self._asession() as db: + table_selected: BaseModelClient = getattr(db, table, None) + if not table_selected: + raise ValueError(f"Table {table} not found") + result = await table_selected.find_first(where=query, order=order or [], include=include) + # print(f"[{datetime.now()}] Find first query completed in {time.time() - start:.2f}s") + return result + + async def _a_find_first_or_throw(self, table: str, query: Optional[dict] = None, order : Optional[list[dict]] = None, + include: Optional[dict] = None + ) -> Any: + start = time.time() + async with self._asession() as db: + table_selected: BaseModelClient = getattr(db, table, None) + if not table_selected: + raise ValueError(f"Table {table} not found") + result = await table_selected.find_first_or_raise(where=query, order=order or [], include=include) + # print(f"[{datetime.now()}] Find first or throw query completed in {time.time() - start:.2f}s") + return result + + async def _a_create(self, table: str, data: dict, include: Optional[dict] = None) -> Any: + start = time.time() + async with self._asession() as db: + table_selected: BaseModelClient = getattr(db, table, None) + if not table_selected: + raise ValueError(f"Table {table} not found") + result = await table_selected.create(data=data, include=include) + # print(f"[{datetime.now()}] Create operation completed in {time.time() - start:.2f}s") + return result + + async def _a_update(self, table: str, where: dict, data: dict, include: Optional[dict] = None) -> Any: + start = time.time() + async with self._asession() as db: + table_selected: BaseModelClient = getattr(db, table, None) + if not table_selected: + raise ValueError(f"Table {table} not found") + result = await table_selected.update(where=where, data=data, include=include) + # print(f"[{datetime.now()}] Update operation completed in {time.time() - start:.2f}s") + return result + + async def _a_delete(self, table: str, where: dict, include: Optional[dict] = None) -> Any: + start = time.time() + async with self._asession() as db: + table_selected: BaseModelClient = getattr(db, table, None) + if not table_selected: + raise ValueError(f"Table {table} not found") + result = await table_selected.delete(where=where, include=include) + # print(f"[{datetime.now()}] Delete operation completed in {time.time() - start:.2f}s") + return result + + async def _a_delete_many(self, table: str, where: dict, include: Optional[dict] = None): + start = time.time() + async with self._asession() as db: + table_selected: BaseModelClient = getattr(db, table, None) + if not table_selected: + raise ValueError(f"Table {table} not found") + result = await table_selected.delete_many(where=where, include=include) + # print(f"[{datetime.now()}] Delete many operation completed in {time.time() - start:.2f}s") + return result + + async def _a_find_unique(self, table: str, query: dict, include: Optional[dict] = None) -> Any: + start = time.time() + async with self._asession() as db: + table_selected: BaseModelClient = getattr(db, table, None) + if not table_selected: + raise ValueError(f"Table {table} not found") + result = await table_selected.find_unique(where=query, include=include) + # print(f"[{datetime.now()}] Find unique query completed in {time.time() - start:.2f}s") + return result + + async def _a_find_unique_or_throw(self, table: str, query: dict, include: Optional[dict] = None) -> Any: + start = time.time() + async with self._asession() as db: + table_selected: BaseModelClient = getattr(db, table, None) + if not table_selected: + raise ValueError(f"Table {table} not found") + result = await table_selected.find_unique_or_raise(where=query, include=include) + # print(f"[{datetime.now()}] Find unique or throw query completed in {time.time() - start:.2f}s") + return result + + def find_unique_or_throw(self, table: str, query: dict, select: Optional[dict] = None, include: Optional[dict] = None): + result = self._submit(self._a_find_unique_or_throw(table=table, query=query, include=include)) + if select: + result = {k: v for k, v in result if k in select} + return result + + def find_unique(self, table: str, query: dict, select: Optional[dict] = None, include: Optional[dict] = None): + result = self._submit(self._a_find_unique(table=table, query=query, include=include)) + if select and result: + result = {k: v for k, v in result if k in select} + return result + + def find_many( + self, table: str, query: Optional[dict] = None, take: int = None, skip: int = None, + order: Optional[list[dict]] = None, select: Optional[dict] = None, include: Optional[dict] = None + ): + result = self._submit(self._a_find_many(table=table, query=query, take=take, skip=skip, order=order, select=select, include=include)) + if select and result: + result = [{k: v for k, v in item.items() if k in select} for item in result] + return result + + def create(self, table: str, data: dict, select: Optional[dict] = None, include: Optional[dict] = None): + result = self._submit(self._a_create(table=table, data=data, include=include)) + if select and result: + result = {k: v for k, v in result if k in select} + return result + + def find_first_or_throw(self, table: str, query: Optional[dict] = None, + order: Optional[list[dict]] = None, select: Optional[dict] = None, include: Optional[dict] = None + ): + result = self._submit(self._a_find_first_or_throw(table=table, query=query, order=order, include=include)) + if select and result: + result = {k: v for k, v in result if k in select} + return result + + def find_first(self, table: str, query: Optional[dict] = None, select: Optional[dict] = None, order: Optional[list[dict]] = None, include: Optional[dict] = None): + result = self._submit(self._a_find_first(table=table, query=query, order=order, include=include)) + if select and result: + result = {k: v for k, v in result if k in select} + return result + + def update(self, table: str, where: dict, data: dict, select: Optional[dict] = None, include: Optional[dict] = None): + result = self._submit(self._a_update(table=table, where=where, data=data, include=include)) + if select and result: + result = {k: v for k, v in result if k in select} + return result + + def delete(self, table: str, where: dict, select: Optional[dict] = None, include: Optional[dict] = None): + result = self._submit(self._a_delete(table=table, where=where, select=select, include=include)) + if select and result: + result = {k: v for k, v in result if k in select} + return result + + def delete_many(self, table: str, where: dict, select: Optional[dict] = None, include: Optional[dict] = None): + result = self._submit(self._a_delete_many(table=table, where=where, select=select, include=include)) + if select and result: + result = [{k: v for k, v in item if k in select} for item in result] + return result + + def disconnect(self) -> None: + try: + self._submit(self._adisconnect()) + finally: + if self._loop and self._loop.is_running(): + self._loop.call_soon_threadsafe(self._loop.stop) + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=2.0) + self._loop = None + self._thread = None diff --git a/ServicesRunner/Depends/redis_handlers.py b/ServicesRunner/Depends/redis_handlers.py index 6aed166..f21426a 100644 --- a/ServicesRunner/Depends/redis_handlers.py +++ b/ServicesRunner/Depends/redis_handlers.py @@ -1,5 +1,6 @@ import logging +from json import loads, dumps from contextlib import contextmanager from time import sleep from redis import Redis, RedisError, ConnectionError as RedisConnectionError @@ -90,6 +91,10 @@ class RedisHandler: def sadd(self, key: str, value): """Add a value to a Redis set""" return self.redis_client.sadd(key, value) + + def ismember(self, key: str, value): + """Check if a value is a member of a Redis set""" + return self.redis_client.sismember(key, value) def get(self, key: str): """Get a value from Redis by key""" @@ -98,7 +103,15 @@ class RedisHandler: def set(self, key: str, value): """Set a key-value pair in Redis""" return self.redis_client.set(key, value) - + + def delete_value(self, key: str, value): + """Delete a value from a Redis value by finding key""" + get_redis = self.get(key) + if get_redis: + get_redis: dict = loads(get_redis) + get_redis.pop(value) + self.set(key, dumps(get_redis)) + def rpush(self, key: str, value): """Append a value to a Redis list""" return self.redis_client.rpush(key, value) @@ -107,9 +120,13 @@ class RedisHandler: """Get an element from a Redis list by its index""" return self.redis_client.lindex(key, index) - def spop(self, key: str, value): - """Remove and return a random member from a Redis set""" - return self.redis_client.spop(key, value) + def spop(self, key: str, count=1): + """Remove and return random members from a Redis set""" + return self.redis_client.spop(key, count) + + def srem(self, key: str, value): + """Remove a specific member from a Redis set""" + return self.redis_client.srem(key, value) def get_all_tasks(self): """Get all keys matching the task prefix pattern""" diff --git a/ServicesRunner/Depends/service_handler.py b/ServicesRunner/Depends/service_handler.py index 7ba9be8..e27394a 100644 --- a/ServicesRunner/Depends/service_handler.py +++ b/ServicesRunner/Depends/service_handler.py @@ -1,12 +1,15 @@ import logging +from time import sleep from json import loads, dumps from uuid import uuid4 from datetime import datetime + from Depends.config import Status, ConfigServices, RedisTaskObject, RedisData from Depends.redis_handlers import RedisHandler +from redis import Redis +from redis.exceptions import WatchError, ResponseError -# Configure logging logger = logging.getLogger('Service Task Retriever') @@ -15,6 +18,7 @@ class ServiceTaskRetriever: Class for retrieving and updating Redis task objects by UUID or mail ID. Provides direct access to task objects and service-specific data without iteration. """ + SENTINEL = "__DEL__SENTINEL__" def __init__(self, redis_handler=None): """ @@ -28,7 +32,7 @@ class ServiceTaskRetriever: else: self.redis_handler = RedisHandler() - self.redis_client = self.redis_handler.redis_client + self.redis_client: Redis = self.redis_handler.redis_client self.redis_prefix = ConfigServices.MAIN_TASK_PREFIX self.mailid_index_key = ConfigServices.TASK_MAILID_INDEX_PREFIX self.uuid_index_key = ConfigServices.TASK_UUID_INDEX_PREFIX @@ -89,13 +93,13 @@ class ServiceTaskRetriever: task_uuid: UUID of the task index: Index of the task """ - already_dict = self.redis_handler.get(self.mailid_index_key) + already_dict = self.redis_handler.get(self.uuid_index_key) if already_dict: already_dict = loads(already_dict) already_dict[str(task_uuid)] = index - self.redis_handler.set(self.mailid_index_key, dumps(already_dict)) + self.redis_handler.set(self.uuid_index_key, dumps(already_dict)) else: - self.redis_handler.set(self.mailid_index_key, dumps({str(task_uuid): index})) + self.redis_handler.set(self.uuid_index_key, dumps({str(task_uuid): index})) def set_index_mail_id(self, mail_id: str, index: int): """ @@ -135,6 +139,65 @@ class ServiceTaskRetriever: if get_index_by_mail_id := self.get_index_by_mail_id(mail_id): self.set_index_uuid(task_uuid, get_index_by_mail_id) + def delete_task(self, task_uuid: str, max_retries: int = 20, base_sleep: float = 0.01): + """ + Delete a task object by its UUID + Args: + task_uuid: UUID of the task + max_retries: Maximum number of retries + """ + for attempt in range(max_retries): + try: + with self.redis_client.pipeline() as pipe: + pipe.watch(ConfigServices.MAIN_TASK_PREFIX, ConfigServices.TASK_UUID_INDEX_PREFIX, ConfigServices.TASK_MAILID_INDEX_PREFIX) + raw_uuid = pipe.get(ConfigServices.TASK_UUID_INDEX_PREFIX) + raw_mail = pipe.get(ConfigServices.TASK_MAILID_INDEX_PREFIX) + llen = pipe.llen(ConfigServices.MAIN_TASK_PREFIX) + if not llen: + pipe.unwatch() + return False + uuid_map = loads(raw_uuid.decode()) if raw_uuid else {} + mail_map = loads(raw_mail.decode()) if raw_mail else {} + if task_uuid not in uuid_map: + pipe.unwatch() + return False + index = int(uuid_map[task_uuid]) + if index < 0: + index = int(llen) + index + if index < 0 or index >= int(llen): + pipe.unwatch() + return False + uuid_key_to_del = next((k for k, v in uuid_map.items() if int(v) == index), None) + mail_key_to_del = next((k for k, v in mail_map.items() if int(v) == index), None) + dup_uuid_count = sum(1 for v in uuid_map.values() if int(v) == index) + dup_mail_count = sum(1 for v in mail_map.values() if int(v) == index) + if dup_uuid_count > 1: + pass + if dup_mail_count > 1: + pass + if uuid_key_to_del is not None: + uuid_map.pop(uuid_key_to_del, None) + if mail_key_to_del is not None: + mail_map.pop(mail_key_to_del, None) + for k, v in list(uuid_map.items()): + if int(v) > index: uuid_map[k] = int(v) - 1 + for k, v in list(mail_map.items()): + if int(v) > index: mail_map[k] = int(v) - 1 + sentinel = f"__DEL__{uuid4()}__" + pipe.multi() + pipe.lset(ConfigServices.MAIN_TASK_PREFIX, index, sentinel) + pipe.lrem(ConfigServices.MAIN_TASK_PREFIX, 1, sentinel) + pipe.set(ConfigServices.TASK_UUID_INDEX_PREFIX, dumps(uuid_map)) + pipe.set(ConfigServices.TASK_MAILID_INDEX_PREFIX, dumps(mail_map)) + pipe.execute() + mail_key_to_del = int(mail_key_to_del) + self.redis_client.sadd(ConfigServices.TASK_DELETED_PREFIX, mail_key_to_del) + return True + except (WatchError, ResponseError): + sleep(base_sleep * (1.5 ** attempt)) + continue + return False + def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject: """ Get a task object directly by its UUID without iteration @@ -248,7 +311,7 @@ class ServiceTaskRetriever: self._validate_service_name(service_name) # Create new RedisData with proper defaults for all services - data_dict = {'MailReader': None, 'MailParser': [], 'FinderIban': {}, 'FinderComment': {}} + data_dict = {'MailReader': None, 'MailParser': [], 'FinderIban': [], 'FinderComment': []} # Set the actual service data data_dict['MailReader'] = mail_reader data_dict['MailParser'] = mail_parser @@ -268,7 +331,7 @@ class ServiceTaskRetriever: ) # Convert to dict for serialization - write_object = write_object.model_dump() + write_object = write_object.dict() # Push new task to Redis list redis_write_ = self.redis_client.rpush(self.redis_prefix, dumps(write_object)) @@ -428,7 +491,6 @@ class ServiceTaskRetriever: if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(task_object_dict)): raise ValueError(f"Failed to write updated task data for UUID {task_uuid}") return True - def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool: """ @@ -655,6 +717,23 @@ class MailReaderService: """ return self.service_retriever.update_task_status(task_uuid, is_completed, status) + def change_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False): + """ + Change the service of a task by UUID + + Args: + task_uuid: UUID of the task to update + service_name: Name of the service to update + + Returns: + bool: True if successful + + Raises: + FileNotFoundError: If the task is not found + ValueError: If the update fails + """ + return self.service_retriever.update_task_service(task_uuid, service_name, status, completed) + def process_mail(self, mail_id: str, mail_data: dict, service_prefix: str, counter: int) -> dict: """ Process mail data and store it in Redis @@ -675,20 +754,42 @@ class MailReaderService: attachments = mail_without_attachments.pop('attachments', []) create_task = dict(task_uuid=task_uuid, service_name=service_prefix, mail_reader=mail_without_attachments, mail_parser=attachments) self.service_retriever.create_task_with_uuid(**create_task) - return {'status': 'success', 'action': 'stored_new_mail', 'counter': counter} - else: - try: - task = self.service_retriever.get_task_by_mail_id(mail_id) - if task is None and task.data and task.data.MailReader: - stored_id = task.data.MailReader.id - if stored_id != mail_id: - return {'status': 'error', 'action': 'id_mismatch', 'stored_id': stored_id} - return {'status': 'success', 'action': 'checked_existing_mail', 'is_completed': task.is_completed if task else False} - except FileNotFoundError: - return {'status': 'error', 'action': 'not_found', 'error': f'Mail with ID {mail_id} not found in index'} + return task_uuid, counter except Exception as e: logger.error(f"Mail Reader Service Error processing mail {mail_id}: {str(e)}") - return {'status': 'error', 'action': 'exception', 'error': str(e)} + return None, counter + + def pop_mail(self, mail_id: str): + """ + Pop a mail from Redis + + Args: + mail_id: ID of the mail to pop + + Returns: + bool: True if successful + + Raises: + FileNotFoundError: If the mail is not found + ValueError: If the pop fails + """ + try: + if self.redis_handler.ismember(f'{ConfigServices.TASK_SEEN_PREFIX}', int(mail_id)): + self.redis_handler.srem(f'{ConfigServices.TASK_SEEN_PREFIX}', int(mail_id)) + return True + return False + except Exception as e: + logger.error(f"Mail Reader Service Error popping mail {int(mail_id)}: {str(e)}") + return False + + def check_mail_is_ready_to_delete(self, mail_id: str): + try: + if self.redis_handler.ismember(f'{ConfigServices.TASK_DELETED_PREFIX}', int(mail_id)): + return True + return False + except Exception as e: + logger.error(f"Mail Reader Service Error checking mail {int(mail_id)}: {str(e)}") + return False class MailParserService: @@ -832,4 +933,228 @@ class MailParserService: """ Update the status of a task by UUID """ - return self.service_retriever.update_task_status(task_uuid, is_completed, status) \ No newline at end of file + return self.service_retriever.update_task_status(task_uuid, is_completed, status) + + +class IbanFinderService: + """ + Iban Finder Service + """ + + # Singleton instance + _instance = None + REDIS_EXCEPTIONS = RedisHandler.REDIS_EXCEPTIONS + + def __init__(self): + if hasattr(self, '_initialized') and self._initialized: + return + self.service_retriever = ServiceTaskRetriever() + self._initialized = True + + def fetch_all_tasks(self) -> list[RedisTaskObject]: + return self.service_retriever.fetch_all_tasks() + + def ensure_connection(self): + """ + Ensure Redis connection is established + + Returns: + bool: True if connection is established, False otherwise + """ + return self.redis_handler.ensure_connection() + + def _check_redis_connection(self) -> bool: + """ + Check if Redis connection is alive using RedisHandler + + Returns: + True if connection is alive, False otherwise + """ + try: + # Use RedisHandler to check connection + connection_status = self.redis_handler.ensure_connection() + if connection_status: + logger.info("Redis connection established via RedisHandler") + else: + logger.error("Redis connection check failed via RedisHandler") + return connection_status + except RedisHandler.REDIS_EXCEPTIONS as e: + logger.error(f"Redis connection failed: {str(e)}") + return False + + def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject: + """ + Get a task object by its UUID + + Args: + task_uuid: UUID of the task to retrieve + + Returns: + RedisTaskObject: The task object if found + + Raises: + FileNotFoundError: If the UUID index or task is not found + """ + return self.service_retriever.get_task_by_uuid(task_uuid) + + def get_service_data_by_uuid(self, task_uuid: str, service_name: str): + """ + Get service-specific data from a task by UUID + + Args: + task_uuid: UUID of the task + service_name: Name of the service to extract data for + + Returns: + Any: Service-specific data if found + + Raises: + FileNotFoundError: If the task or service data is not found + """ + return self.service_retriever.get_service_data_by_uuid(task_uuid, service_name) + + def update_service_data(self, task_uuid: str, service_name: str, service_data: dict) -> bool: + """ + Update service-specific data in a task by UUID + + Args: + task_uuid: UUID of the task to update + service_name: Name of the service data to update + service_data: New service data + + Returns: + bool: True if successful + + Raises: + FileNotFoundError: If the task is not found + ValueError: If the update fails or service name is invalid + """ + return self.service_retriever.update_service_data(task_uuid, service_name, service_data) + + def change_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False) -> bool: + """ + Update the service of a task by UUID + """ + return self.service_retriever.update_task_service(task_uuid, service_name, status, completed) + + def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool: + """ + Update the status of a task by UUID + """ + return self.service_retriever.update_task_status(task_uuid, is_completed, status) + + +class ProcessCommentFinderService: + """ + Process Comment Finder Service + """ + + # Singleton instance + _instance = None + REDIS_EXCEPTIONS = RedisHandler.REDIS_EXCEPTIONS + + def __init__(self): + if hasattr(self, '_initialized') and self._initialized: + return + self.service_retriever = ServiceTaskRetriever() + self._initialized = True + + def fetch_all_tasks(self) -> list[RedisTaskObject]: + return self.service_retriever.fetch_all_tasks() + + def ensure_connection(self): + """ + Ensure Redis connection is established + + Returns: + bool: True if connection is established, False otherwise + """ + return self.redis_handler.ensure_connection() + + def _check_redis_connection(self) -> bool: + """ + Check if Redis connection is alive using RedisHandler + + Returns: + True if connection is alive, False otherwise + """ + try: + # Use RedisHandler to check connection + connection_status = self.redis_handler.ensure_connection() + if connection_status: + logger.info("Redis connection established via RedisHandler") + else: + logger.error("Redis connection check failed via RedisHandler") + return connection_status + except RedisHandler.REDIS_EXCEPTIONS as e: + logger.error(f"Redis connection failed: {str(e)}") + return False + + def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject: + """ + Get a task object by its UUID + + Args: + task_uuid: UUID of the task to retrieve + + Returns: + RedisTaskObject: The task object if found + + Raises: + FileNotFoundError: If the UUID index or task is not found + """ + return self.service_retriever.get_task_by_uuid(task_uuid) + + def get_service_data_by_uuid(self, task_uuid: str, service_name: str): + """ + Get service-specific data from a task by UUID + + Args: + task_uuid: UUID of the task + service_name: Name of the service to extract data for + + Returns: + Any: Service-specific data if found + + Raises: + FileNotFoundError: If the task or service data is not found + """ + return self.service_retriever.get_service_data_by_uuid(task_uuid, service_name) + + def update_service_data(self, task_uuid: str, service_name: str, service_data: dict) -> bool: + """ + Update service-specific data in a task by UUID + + Args: + task_uuid: UUID of the task to update + service_name: Name of the service data to update + service_data: New service data + + Returns: + bool: True if successful + + Raises: + FileNotFoundError: If the task is not found + ValueError: If the update fails or service name is invalid + """ + return self.service_retriever.update_service_data(task_uuid, service_name, service_data) + + def change_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False) -> bool: + """ + Update the service of a task by UUID + """ + return self.service_retriever.update_task_service(task_uuid, service_name, status, completed) + + def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool: + """ + Update the status of a task by UUID + """ + return self.service_retriever.update_task_status(task_uuid, is_completed, status) + + def delete_task(self, task_uuid: str, max_retries: int = 5): + """ + Delete a task object by its UUID + """ + return self.service_retriever.delete_task(task_uuid, max_retries) + + diff --git a/ServicesRunner/Depends/template_accounts.html b/ServicesRunner/Depends/template_accounts.html new file mode 100644 index 0000000..67c03d1 --- /dev/null +++ b/ServicesRunner/Depends/template_accounts.html @@ -0,0 +1,54 @@ + + + + + + Gelen Banka Kayıtları + + + +

Günaydın, Admin

+
+

Banka Kayıtları : {{today}}

+

Son Bakiye : {{bank_balance}}

+

{{"Status : İkinci Bakiye Hatalı" if balance_error else "Status :OK"}}

+ + + + {% for header in headers %} + + {% endfor %} + + + + {% for row in rows %} + + {% for cell in row %} + + {% endfor %} + + {% endfor %} + +
{{ header }}
{{ cell }}
+

Teşekkür ederiz,
Evyos Yönetim
Saygılarımızla

+ + \ No newline at end of file diff --git a/ServicesRunner/requirements.txt b/ServicesRunner/requirements.txt index 7309a58..d5ad949 100644 --- a/ServicesRunner/requirements.txt +++ b/ServicesRunner/requirements.txt @@ -1,4 +1,6 @@ prisma==0.9.1 -python-dotenv==1.0.0 asyncio==3.4.3 -uvloop>=0.19 \ No newline at end of file +uvloop>=0.19 +redis>=6.4.0 +unidecode>=1.3.8 +arrow>=1.3.0 \ No newline at end of file diff --git a/docker-compose.bank.yml b/docker-compose.bank.yml index f2e29a5..0287c54 100644 --- a/docker-compose.bank.yml +++ b/docker-compose.bank.yml @@ -21,6 +21,26 @@ services: # timeout: 10s # retries: 3 + # prisma_studio: + # image: node:18 + # working_dir: /app + # # volumes: + # # - ./ServicesRunner/Depends:/app + # ports: + # - "5555:5555" + # entrypoint: [ "/bin/sh", "-c" ] + # command: | + # "npx prisma studio --schema=/app/schema.prisma" + # depends_on: + # - prisma_service_test + # networks: + # - bank-services-network + # logging: + # driver: "json-file" + # options: + # max-size: "10m" + # max-file: "3" + prisma_service_iban: container_name: prisma_service_iban build: @@ -42,39 +62,27 @@ services: timeout: 10s retries: 3 - # prisma_studio: - # image: node:18 - # working_dir: /app - # # volumes: - # # - ./ServicesRunner/Depends:/app - # ports: - # - "5555:5555" - # entrypoint: [ "/bin/sh", "-c" ] - # command: | - # "npx prisma studio --schema=/app/schema.prisma" - # depends_on: - # - prisma_service_test - # networks: - # - bank-services-network - # logging: - # driver: "json-file" - # options: - # max-size: "10m" - # max-file: "3" - # finder_payments: - # container_name: finder_payments - # env_file: - # - api_env.env - # build: - # context: . - # dockerfile: ServicesBank/Finder/Payment/Dockerfile - # networks: - # - bank-services-network - # logging: - # driver: "json-file" - # options: - # max-size: "10m" - # max-file: "3" + prisma_service_process_comment: + container_name: prisma_service_process_comment + build: + context: . + dockerfile: ServicesRunner/AccountRecordServices/Finder/Comment/Dockerfile + networks: + - bank-services-network + volumes: + - ./ServicesRunner/AccountRecordServices/Finder/Comment/venv:/opt/venv + - ./ServicesRunner/AccountRecordServices/Finder/Comment/.prisma-cache:/root/.cache/prisma-python + restart: on-failure + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" + healthcheck: + test: [ "CMD", "/opt/venv/bin/python", "-c", "import asyncio; from ServicesRunner.Depends.prisma_client import get_prisma_client; asyncio.run(get_prisma_client())" ] + interval: 15s + timeout: 10s + retries: 3 isbank_email_reader: container_name: isbank_email_reader @@ -112,6 +120,22 @@ services: options: max-size: "10m" max-file: "3" + # finder_payments: + # container_name: finder_payments + # env_file: + # - api_env.env + # build: + # context: . + # dockerfile: ServicesBank/Finder/Payment/Dockerfile + # networks: + # - bank-services-network + # logging: + # driver: "json-file" + # options: + # max-size: "10m" + # max-file: "3" + + networks: bank-services-network: