import time import arrow import os import json import base64 from uuid import uuid4 from datetime import datetime, timedelta from typing import TypeVar from BankServices.ServiceDepends.config import Config from redbox import EmailBox from redbox.query import FROM, UNSEEN, OR, SINCE # Import Redis pub/sub handler from Controllers.Redis.Broadcast.actions import redis_pubsub authorized_iban = Config.AUTHORIZE_IBAN authorized_iban_cleaned = authorized_iban.replace("-", "") # Define Redis channel name REDIS_CHANNEL = "reader" delimiter = "|" # banks_mails = mailbox.search(from_=filter_mail, unseen=True) bununla denemeyin # banks_mails = mailbox.search(FROM(filter_mail) & UNSEEN) T = TypeVar("T") class EmailProcessingContext: """Context manager for email processing that marks emails as unread if an error occurs.""" def __init__(self, email_message, mark_as_read: bool = True): self.email_message = email_message self.mark_as_read = mark_as_read self.success = False def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None or not self.success: # If an exception occurred or processing wasn't successful, mark as unread try: if hasattr(self.email_message, 'mark_as_unread'): self.email_message.mark_as_unread() print(f"[EMAIL_SERVICE] Marked email as UNREAD due to processing error: {exc_val if exc_val else 'Unknown error'}") except Exception as e: print(f"[EMAIL_SERVICE] Failed to mark email as unread: {str(e)}") elif self.mark_as_read: # If processing was successful and mark_as_read is True, ensure it's marked as read try: if hasattr(self.email_message, 'mark_as_read'): self.email_message.mark_as_read() except Exception as e: print(f"[EMAIL_SERVICE] Failed to mark email as read: {str(e)}") return False # Don't suppress exceptions def publish_payload_to_redis( payload, filename: str, mail_info: dict ) -> bool: # Create message document # Use base64 encoding for binary payloads to ensure proper transmission if isinstance(payload, bytes): encoded_payload = base64.b64encode(payload).decode('utf-8') is_base64 = True else: encoded_payload = payload is_base64 = False message = { "filename": filename, "payload": encoded_payload, "is_base64": is_base64, # Flag to indicate if payload is base64 encoded "stage": "red", # Initial stage for the processing chain "created_at": str(arrow.now()), "uuid": str(uuid4()), # Use UUID **mail_info, } # Publish to Redis channel result = redis_pubsub.publisher.publish(REDIS_CHANNEL, message) if result.status: print(f"[EMAIL_SERVICE] Published message with filename: {filename} to channel: {REDIS_CHANNEL}") return True else: print(f"[EMAIL_SERVICE] Publish error: {result.error}") return False def read_email_and_publish_to_redis(email_message, mail_info: dict) -> bool: if email_message.is_multipart(): # Check if email has multipart content for part in email_message.walk(): # Each part can be an attachment content_disposition = part.get("Content-Disposition") if content_disposition and "attachment" in content_disposition: if filename := part.get_filename(): is_iban_in_filename = authorized_iban_cleaned in str(filename) if is_iban_in_filename: if payload := part.get_payload(decode=True): return publish_payload_to_redis( payload=payload, filename=filename, mail_info=mail_info, ) else: # Handle non-multipart email, though this is rare for emails with attachments content_disposition = email_message.get("Content-Disposition") if content_disposition and "attachment" in content_disposition: if filename := email_message.get_filename(): is_iban_in_filename = authorized_iban_cleaned in str(filename) if is_iban_in_filename: payload = email_message.get_payload(decode=True) return publish_payload_to_redis( payload=payload, filename=filename, mail_info=mail_info, ) return False def app(): # Get email configuration host = Config.EMAIL_HOST port = Config.EMAIL_PORT username = Config.EMAIL_USERNAME password = Config.EMAIL_PASSWORD box = EmailBox(host=host, port=port, username=username, password=password) if not box: return Exception("Mailbox not found") box.connect() mail_folders = box.mailfolders filter_mail = OR(FROM(Config.MAILBOX), FROM(Config.MAIN_MAIL)) filter_print = f"{Config.MAILBOX} & {Config.MAIN_MAIL}" # Determine if this is the first run of the day # Store last run date in a file last_run_file = "/tmp/email_service_last_run.json" current_date = datetime.now().strftime("%Y-%m-%d") days_to_check, full_check = 7, 90 # Default to 7 days try: if os.path.exists(last_run_file): with open(last_run_file, 'r') as f: last_run_data = json.load(f) last_run_date = last_run_data.get('last_run_date') # If this is the first run of a new day, check 90 days if last_run_date != current_date: days_to_check = full_check print(f"[EMAIL_SERVICE] First run of the day. Checking emails from the past {days_to_check} days") else: print(f"[EMAIL_SERVICE] Subsequent run today. Checking emails from the past {days_to_check} days") else: # If no last run file exists, this is the first run ever - check 90 days days_to_check = full_check print(f"[EMAIL_SERVICE] First run detected. Checking emails from the past {days_to_check} days") except Exception as e: print(f"[EMAIL_SERVICE] Error reading last run file: {str(e)}. Using default of {days_to_check} days") # Update the last run file try: with open(last_run_file, 'w') as f: json.dump({'last_run_date': current_date}, f) except Exception as e: print(f"[EMAIL_SERVICE] Error writing last run file: {str(e)}") # Calculate the date to check from check_since_date = (datetime.now() - timedelta(days=days_to_check)).strftime("%d-%b-%Y") for folder in mail_folders: if folder.name == "INBOX": # Search for emails since the calculated date banks_mails = folder.search(filter_mail & SINCE(check_since_date)) print( f"[EMAIL_SERVICE] Reading mailbox [{username}] with mail sender [{filter_print}] since {check_since_date} with count: {len(banks_mails)}" ) for banks_mail in banks_mails or []: if email_message := banks_mail.email: # Use context manager to handle errors and mark email as unread if needed with EmailProcessingContext(banks_mail) as ctx: try: headers = {k.lower(): v for k, v in banks_mail.headers.items()} mail_info = { "from": headers["from"], "to": headers["to"], "subject": headers["subject"], "date": str(headers["date"]), } # Process the email and publish to Redis success = read_email_and_publish_to_redis( email_message=email_message, mail_info=mail_info ) # Set success flag for the context manager ctx.success = success if success: print(f"[EMAIL_SERVICE] Successfully processed email with subject: {mail_info['subject']}") else: print(f"[EMAIL_SERVICE] No matching attachments found in email with subject: {mail_info['subject']}") except Exception as e: print(f"[EMAIL_SERVICE] Error processing email: {str(e)}") # The context manager will mark the email as unread if __name__ == "__main__": print("=== Starting Email Service with Redis Pub/Sub ===") print(f"Publishing to channel: {REDIS_CHANNEL}") time.sleep(20) # Wait for 20 seconds to other services to kick in while True: print("\n[EMAIL_SERVICE] Checking for new emails...") app() time.sleep(Config.EMAIL_SLEEP)