import os import socket import logging from functools import wraps from base64 import b64encode from time import sleep from datetime import datetime from typing import List, Dict, Any, Union, TypeVar, Tuple from email.message import EmailMessage from email.policy import default as policy from email.headerregistry import UniqueDateHeader, UniqueAddressHeader, UniqueUnstructuredHeader from email.parser import BytesParser from imaplib import IMAP4_SSL, IMAP4 from Depends.redis_handlers import RedisHandler from Depends.config import EmailConfig, MailReaderMainConfig, MailReader, MailParser, RedisData from Depends.service_handler import MailReaderService # Configure logging logger = logging.getLogger('Email Reader Service') # Type variable for generic function return types T = TypeVar('T') def retry_on_connection_error(max_retries: int = 3, delay: int = 5, backoff: int = 2, exceptions=(Exception,)): """ Retry decorator with exponential backoff for handling connection errors Args: max_retries: Maximum number of retries delay: Initial delay between retries in seconds backoff: Backoff multiplier exceptions: Tuple of exceptions to catch Returns: Decorated function """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): mtries, mdelay = max_retries, delay while mtries > 0: try: return func(*args, **kwargs) except exceptions as e: logger.warning(f"Connection error in {func.__name__}: {str(e)}, retrying in {mdelay}s...") sleep(mdelay) mtries -= 1 mdelay *= backoff # Final attempt return func(*args, **kwargs) return wrapper return decorator class Mails: """Class representing an email with attachments and metadata""" def __init__(self, mail_id: bytes, mail_data: bytes): """ Initialize a mail object Args: mail_id: Unique identifier for the email mail_data: Raw email data """ self.id: bytes = mail_id self.raw_data: bytes = mail_data self.attachments: List[Dict[str, Union[str, bytes]]] = [] self.message: EmailMessage = BytesParser(policy=policy).parsebytes(mail_data) self.subject: UniqueUnstructuredHeader = self.message.get('Subject', '') or '' self.from_: UniqueAddressHeader = self.message.get('From', '') or '' self.to: UniqueAddressHeader = self.message.get('To', '') or '' self.date: UniqueDateHeader = self.message.get('Date', '') or '' self.body_text: str = self._get_body_text() self._extract_attachments() def to_dict(self) -> Dict[str, Any]: """ Convert mail object to dictionary representation Returns: Dictionary representation of mail """ return { 'id': self.id.decode('utf-8'), 'attachments': [{ 'filename': attachment['filename'], 'content_type': attachment['content_type'], 'charset': attachment['charset'], 'data': b64encode(attachment['data']).decode(attachment['charset'], errors='replace') } for attachment in self.attachments], 'subject': str(self.subject), 'from_': { "display_name": self.from_.addresses[0].display_name, "username": self.from_.addresses[0].username, "domain": self.from_.addresses[0].domain, "mail": f"{self.from_.addresses[0].username}@{self.from_.addresses[0].domain}" }, 'to': [ { "display_name": address.display_name, "username": address.username, "domain": address.domain, "mail": f"{address.username}@{address.domain}" } for address in self.to.addresses ], 'date': str(self.date.datetime), 'body_text': str(self.body_text) } def _get_body_text(self) -> str: """ Extract plain text body from email Returns: Plain text body of email """ body = self.message.get_body(preferencelist=('plain',)) if body is not None: return body.get_content() or '' if self.message.is_multipart(): for part in self.message.walk(): if part.get_content_type() == 'text/plain' and (part.get_content_disposition() or '') != 'attachment': try: return part.get_content() or '' except Exception: payload = part.get_payload(decode=True) or b'' return payload.decode(part.get_content_charset() or 'utf-8', errors='replace') else: if self.message.get_content_type() == 'text/plain': try: return self.message.get_content() or '' except Exception: payload = self.message.get_payload(decode=True) or b'' return payload.decode(self.message.get_content_charset() or 'utf-8', errors='replace') return '' def _extract_attachments(self) -> None: """Extract attachments from email""" for part in self.message.walk(): if part.get_content_disposition() == 'attachment': filename = part.get_filename() if not filename: continue data = part.get_payload(decode=True) or b'' charset = part.get_charset() or 'utf-8' self.attachments.append({'filename': filename, 'content_type': part.get_content_type(), 'data': data, 'charset': charset}) def save_attachments(self, folder: str) -> None: """ Save attachments to folder Args: folder: Folder to save attachments to """ os.makedirs(folder, exist_ok=True) for att in self.attachments: with open(os.path.join(folder, att['filename']), 'wb') as f: f.write(att['data']) class EmailReaderService: """Service for reading emails from mailbox with improved connection resilience""" def __init__(self, config: MailReaderMainConfig): """ Initialize email reader service Args: config: Application configuration """ self.email_config = EmailConfig() self.config = config self.mail = None self.data: List[Mails] = [] self.mail_count = 0 self.is_connected = False self.connect_imap() def connect_imap(self) -> bool: """ Establish IMAP connection with retry mechanism Returns: True if connection successful, False otherwise """ try: if self.mail: # Try to close existing connection if any try: self.mail.close() self.mail.logout() except Exception: pass logger.info(f"Connecting to IMAP server {self.email_config.HOST}:{self.email_config.PORT}") self.mail = IMAP4_SSL(self.email_config.HOST, self.email_config.PORT) self.is_connected = True return True except (socket.error, IMAP4.error) as e: logger.error(f"Failed to connect to IMAP server: {str(e)}") self.is_connected = False return False @retry_on_connection_error(max_retries=3, delay=5, exceptions=(socket.error, IMAP4.error, OSError)) def login_and_connect(self) -> bool: """ Login to IMAP server and connect to inbox with retry mechanism Returns: True if login successful, False otherwise Raises: ConnectionError: If connection cannot be established """ if not self.is_connected: if not self.connect_imap(): raise ConnectionError("Cannot establish connection to IMAP server") try: logger.info(f"Logging in as {self.email_config.USERNAME}") self.mail.login(self.email_config.USERNAME, self.email_config.PASSWORD) self._connect_inbox() logger.info("Successfully logged in and connected to inbox") return True except (socket.error, IMAP4.error) as e: logger.error(f"Login failed: {str(e)}") self.is_connected = False raise @retry_on_connection_error(max_retries=2, delay=3, exceptions=(socket.error, IMAP4.error, OSError)) def refresh(self) -> Tuple[List[Mails], int, int]: """ Refresh mail data with connection retry Returns: Tuple of (mail data, mail count, data length) """ try: self.mail_count = self._fetch_count() self.data = self._fetch_all() return self.data, self.mail_count, len(self.data) except (socket.error, IMAP4.error) as e: logger.error(f"Refresh failed, attempting to reconnect: {str(e)}") self.connect_imap() self.login_and_connect() self.mail_count = self._fetch_count() self.data = self._fetch_all() return self.data, self.mail_count, len(self.data) @retry_on_connection_error(max_retries=2, delay=2, exceptions=(socket.error, IMAP4.error)) def _connect_inbox(self) -> None: """ Connect to INBOX with retry mechanism Raises: IMAP4.error: If connection to INBOX fails """ logger.info("Selecting INBOX folder") status, _ = self.mail.select("INBOX") if status != 'OK': error_msg = "Failed to connect to INBOX" logger.error(error_msg) raise IMAP4.error(error_msg) @retry_on_connection_error(max_retries=2, delay=2, exceptions=(socket.error, IMAP4.error)) def _fetch_count(self) -> int: """ Fetch mail count with retry mechanism Returns: Number of emails Raises: IMAP4.error: If fetching mail count fails """ try: status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"') if status != 'OK': raise IMAP4.error("Failed to get mail count") count = len(uids[0].split()) if uids[0] else 0 logger.info(f"Found {count} emails from {self.config.MAILBOX}") return count except (socket.error, IMAP4.error) as e: logger.error(f"Error fetching mail count: {str(e)}") raise @retry_on_connection_error(max_retries=2, delay=2, exceptions=(socket.error, IMAP4.error)) def _fetch_all(self) -> List[Mails]: """ Fetch all mails with retry mechanism Returns: List of mail objects Raises: IMAP4.error: If fetching mails fails """ self.data = [] try: status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"') if status != 'OK': raise IMAP4.error("Mail search failed") if not uids[0]: logger.info("No emails found matching criteria") return self.data uid_list = uids[0].split() logger.info(f"Processing {len(uid_list)} emails") for uid in uid_list: try: status, msg_data = self.mail.uid('fetch', uid, '(RFC822)') if status == 'OK' and msg_data[0] is not None: self.data.append(Mails(uid, msg_data[0][1])) except Exception as e: logger.warning(f"Failed to fetch email with UID {uid}: {str(e)}") continue logger.info(f"Successfully fetched {len(self.data)} emails") return self.data except (socket.error, IMAP4.error) as e: logger.error(f"Error fetching emails: {str(e)}") raise @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error)) def mark_no_attachment(self, uid): """ Move message to no attachment folder with retry mechanism Args: uid: Email UID """ try: # Handle both string and bytes types for logging log_uid = uid if isinstance(uid, bytes): log_uid = uid.decode('utf-8', errors='replace') logger.info(f"Moving email {log_uid} to {self.config.NO_ATTACHMENT_FOLDER} folder") self.mail.uid('COPY', uid, self.config.NO_ATTACHMENT_FOLDER) self.delete(uid) except Exception as e: logger.error(f"Failed to mark email as no attachment: {str(e)}") raise @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error)) def mark_completed(self, uid: bytes): """ Move message to completed folder with retry mechanism Args: uid: Email UID """ try: logger.info(f"Moving email {uid.decode('utf-8', errors='replace')} to {self.config.COMPLETED_FOLDER} folder") self.mail.uid('COPY', uid, self.config.COMPLETED_FOLDER) # self.delete(uid) except Exception as e: logger.error(f"Failed to mark email as completed: {str(e)}") raise @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error)) def delete(self, uid): """ Delete message with retry mechanism Args: uid: Email UID """ try: # Handle both string and bytes types log_uid = uid if isinstance(uid, bytes): log_uid = uid.decode('utf-8', errors='replace') logger.info(f"Marking email {log_uid} for deletion") self.mail.uid('STORE', uid, '+FLAGS', r'(\Deleted)') except Exception as e: logger.error(f"Failed to delete email: {str(e)}") raise @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error)) def commit(self): """ Commit pending operations with retry mechanism Raises: Exception: If commit fails """ try: logger.info("Committing changes (expunge)") self.mail.expunge() except Exception as e: logger.error(f"Failed to commit changes: {str(e)}") raise def logout(self): """Logout from IMAP server""" if self.mail and self.is_connected: try: logger.info("Logging out from IMAP server") self.mail.close() self.mail.logout() self.is_connected = False except Exception as e: logger.warning(f"Logout failed: {str(e)}") @property def count(self): """Get count of emails""" return len(self.data) class EmailServiceRunner: """Runner for email service with improved resilience to connection issues""" def __init__(self, redis_handler: MailReaderService, email_service: EmailReaderService) -> None: """ Initialize email service runner Args: redis_handler: Redis handler for Redis operations email_service: Email service for email operations """ # Use MailReaderService singleton for Redis operations self.redis_handler = redis_handler self.email_service = email_service self.mails = None self.count = 0 self.counter = 0 self.mail_count = 0 self.redis_connected = self._check_redis_connection() def _check_redis_connection(self) -> bool: """ Check if Redis connection is alive using MailReaderService Returns: True if connection is alive, False otherwise """ try: # Use MailReaderService to check connection connection_status = self.redis_handler.ensure_connection() if connection_status: logger.info("Redis connection established via MailReaderService") else: logger.error("Redis connection check failed via MailReaderService") return connection_status except MailReaderService.REDIS_EXCEPTIONS as e: logger.error(f"Redis connection failed: {str(e)}") return False @retry_on_connection_error(max_retries=3, delay=5, exceptions=MailReaderService.REDIS_EXCEPTIONS) def _ensure_redis_connection(self) -> bool: """ Ensure Redis connection is established using MailReaderService Returns: True if connection is established, False otherwise """ if not self.redis_connected: try: self.redis_connected = self.redis_handler.ensure_connection() if not self.redis_connected: raise RedisHandler.REDIS_EXCEPTIONS[0]("Failed to establish Redis connection") except Exception as e: self.redis_connected = False raise return self.redis_connected def fetch_and_set_mails(self): """ Fetch and process emails with improved error handling Raises: Exception: If fetching or processing emails fails """ try: # Reset counters self.count = 0 self.counter = 0 # Ensure Redis connection before proceeding self._ensure_redis_connection() # Refresh email data try: self.mails, self.mail_count, self.count = self.email_service.refresh() except Exception as e: logger.error(f"Failed to refresh emails: {str(e)}") raise # Process each email for mail in self.mails: if not getattr(mail, 'id', None): logger.warning("Skipping email with no ID") continue mail_id = mail.id.decode('utf-8') # check mail has .pdf extension mail_dict = mail.to_dict() try: if mail.attachments: if any([attachment['filename'].lower().endswith('.pdf') for attachment in mail_dict['attachments']]): self.email_service.mark_no_attachment(mail_id) else: self._process_mail_with_attachments(mail, mail_id) else: self.email_service.mark_no_attachment(mail_id) except Exception as e: logger.error(f"Error processing email {mail_id}: {str(e)}") continue try: self.email_service.commit() except Exception as e: logger.error(f"Failed to commit email changes: {str(e)}") timestamp = f"TIMESTAMP: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" logger.info(f"Fetched and processed emails. Read/Total: {self.counter}/{self.count}. {self.count - self.counter} already saved. Mail count: {self.mail_count}") print(f"{timestamp} Mail Reader Service | Fetching and setting mails. Read / Total Mails: {self.counter}/{self.count}. {self.count - self.counter} already saved in redis. Mail Server mail count: {self.mail_count}") except MailReaderService.REDIS_EXCEPTIONS as e: logger.error(f"Redis error in fetch_and_set_mails: {str(e)}") self.redis_connected = False raise except Exception as e: logger.error(f"Unexpected error in fetch_and_set_mails: {str(e)}") raise def _process_mail_with_attachments(self, mail: Mails, mail_id: str): """ Process an email with attachments using MailReaderService Args: mail: Mail object mail_id: Mail ID Raises: Exception: If processing mail fails """ try: mail_to_dict = mail.to_dict() result = self.redis_handler.process_mail( mail_id=mail_id, mail_data=mail_to_dict, service_prefix=self.email_service.config.SERVICE_PREFIX, counter=self.counter ) if result['status'] == 'success': if result['action'] == 'stored_new_mail': self.counter = result.get('counter', self.counter) logger.info(f"Successfully processed new email {mail_id}") elif result['action'] == 'checked_existing_mail': if result.get('is_completed', False): logger.info(f"Marking completed email {mail_id}") self.email_service.mark_completed(mail_id) elif result['status'] == 'error': if result['action'] == 'id_mismatch': logger.error(f"Mail ID mismatch: {mail_id} != {result.get('stored_id')}") raise ValueError("Mail id does not match with id from Redis") else: logger.error(f"Email Service Runner Error processing mail {mail_id}: {result.get('error', 'Unknown error')}") raise Exception(result.get('error', 'Unknown error during mail processing')) else: logger.warning(f"Unexpected result status: {result['status']} for mail {mail_id}") except MailReaderService.REDIS_EXCEPTIONS as e: logger.error(f"Redis error while processing mail {mail_id}: {str(e)}") self.redis_connected = False raise except Exception as e: logger.error(f"Email Service Runner Error processing mail {mail_id}: {str(e)}") raise def drop(self): """Clean up resources""" try: self.email_service.commit() except Exception as e: logger.warning(f"Error during commit on drop: {str(e)}") try: self.email_service.logout() except Exception as e: logger.warning(f"Error during logout on drop: {str(e)}")