import os
import asyncio

from app.services.mail.IsBank.runner import initialize_service
from app.services.mail.mail_handler import Mails
from app.services.mail.IsBank.params import IsBankConfig
from app.services.common.service_base_async import ServiceBaseAsync
from app.services.types.queue import Enqueue
from app.services.types.mail import MailParsedResult, ProcessMailObject
from app.services.types.task import Job


# Environment-driven settings. They are read here but not referenced by any
# function in this module.
PRODUCE_BURST = int(os.getenv("PRODUCE_BURST", "10"))
PRODUCE_ONCE = os.getenv("PRODUCE_ONCE", "true").lower() == "true"
EVENT_TYPE = os.getenv("EVENT_TYPE", "db-event")

_produced = False
# Seconds every producer/consumer coroutine sleeps before handing control back.
PROCESS_SEC = 10
# Created once at import time and shared by every coroutine below.
email_service = initialize_service()


def generate_unique_with_mail_id(mail_id: str, service_prefix: str):
    """Return the task identifier ``"<service_prefix>_<mail_id>"``."""
    return f"{service_prefix}_{mail_id}"


def process_mail_with_attachments(mail: Mails, mail_id: str, count: int, total: int) -> ProcessMailObject:
    """
    Build the ProcessMailObject that is queued for a mail with attachments.

    Args:
        mail: Mail object; its ``to_dict()`` output and ``attachments`` are
            copied into the result.
        mail_id: Mail ID, also used to derive the task uuid.
        count: Passed through unchanged as ``ProcessMailObject.count``.
        total: Passed through unchanged as ``ProcessMailObject.total``.

    Returns:
        The populated ProcessMailObject.

    Raises:
        Exception: If building the object fails; the original error is
            attached as the cause.
    """
    try:
        mail_to_dict = mail.to_dict()
        task_uuid = generate_unique_with_mail_id(mail_id, IsBankConfig.SERVICE_NAME)
        return ProcessMailObject(
            uuid=task_uuid,
            id=mail_id,
            data=mail_to_dict,
            service=email_service.config.SERVICE_PREFIX,
            count=count,
            total=total,
            attachments=mail.attachments,
        )
    except Exception as e:
        # Chain explicitly so the original traceback stays attached.
        raise Exception(f"Email Service Runner Error processing mail {mail_id}: {str(e)}") from e


# Isbank producer: mail reader
async def produce(svc: ServiceBaseAsync):
    """
    Read mails from the email service and enqueue the ones to be parsed.

    A mail that has attachments, none of whose filenames end in ``.pdf``, is
    wrapped in a ProcessMailObject and enqueued. Every other mail with an ID
    is marked via ``email_service.mark_no_attachment``. Mails without an ID
    are skipped. A failure on one mail triggers ``svc.retry_current()`` and
    processing moves on to the next mail.

    Args:
        svc: Service wrapper used to enqueue, ack and retry.
    """
    mails, count, length = email_service.refresh()
    if not mails:
        await asyncio.sleep(PROCESS_SEC)
        return

    for mail in mails:
        if not getattr(mail, 'id', None):
            print(f"Skipping email with no ID: {mail.subject}")
            continue

        # assumes mail.id is bytes (it is decoded here) — TODO confirm
        mail_id, mail_dict = mail.id.decode('utf-8'), mail.to_dict()
        try:
            if mail.attachments:
                is_attachment_pdf = any(
                    str(attachment['filename']).lower().endswith('.pdf')
                    for attachment in mail_dict['attachments']
                )
                if not is_attachment_pdf:
                    process_mail_object = process_mail_with_attachments(mail, mail_id, count, length)
                    enqueue = Enqueue(
                        task_id=process_mail_object.uuid,
                        payload=process_mail_object.model_dump(),
                        action=IsBankConfig.SERVICE_NAME,
                    )
                    await svc.enqueue(enqueue)
                    await svc.ack_current()
                    print(f"Mail Consumer from parser with attachments : {mail_id}")
                    continue
            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # source. This fallback is taken to apply both to mails without
            # attachments and to mails carrying a PDF — confirm.
            print(f"Mail Consumer from parser with no attachments : {mail_id}")
            email_service.mark_no_attachment(mail_id)
            await svc.ack_current()
        except Exception as e:
            print(f"Error processing email {mail_id}: {str(e)}")
            await svc.retry_current()
            continue

    await asyncio.sleep(PROCESS_SEC)


async def handle_from_parser(svc: ServiceBaseAsync, job):
    """
    Handle a "parser.comment.publish" job: ack it and log its contents.

    Args:
        svc: Service wrapper used to ack the current job.
        job: Mapping of keyword arguments for ``Job``.
    """
    job_model = Job(**job)
    await svc.ack_current()
    print("Mail Consumer from parser :", job_model.model_dump())
    await asyncio.sleep(PROCESS_SEC)
    return


async def handle_parser_excel(svc: ServiceBaseAsync, job):
    """
    Handle a "parser.excel.publish" job.

    When the parsed result's ``send_to`` is "Completed", the mail is marked
    completed in the email service. The job is then acked.

    Args:
        svc: Service wrapper used to ack the current job.
        job: Mapping of keyword arguments for ``Job``; its payload must
            validate as a MailParsedResult.
    """
    job_model = Job(**job)
    parsed_result = MailParsedResult(**job_model.payload)
    if parsed_result.send_to == "Completed":
        print("Mail Consumer from parser excel :", parsed_result.mail_data.id)
        email_service.mark_completed(parsed_result.mail_data.id)
    # NOTE(review): nesting reconstructed from a whitespace-collapsed source.
    # The ack is taken to be unconditional so that results which are not
    # "Completed" are still acknowledged — confirm.
    await svc.ack_current()
    await asyncio.sleep(PROCESS_SEC)
    return


async def consume_default(svc: ServiceBaseAsync, job):
    """
    Fallback consumer: calls ``svc.dlq_current()`` on the job, then logs it.

    Args:
        svc: Service wrapper providing ``dlq_current()``.
        job: Mapping of keyword arguments for ``Job``.
    """
    job_model = Job(**job)
    await svc.dlq_current()
    await asyncio.sleep(PROCESS_SEC)
    print("Mail Consumer default:", job_model.model_dump())
    return


if __name__ == "__main__":
    svc = ServiceBaseAsync(
        produce,
        consume_default,
        handlers={
            "parser.comment.publish": handle_from_parser,
            "parser.excel.publish": handle_parser_excel,
        },
    )
    asyncio.run(svc.run())