production-evyos-systems-an.../ServicesRunner/Depends/mail_handler.py

607 lines
23 KiB
Python

import os
import socket
import logging
from functools import wraps
from base64 import b64encode
from time import sleep
from datetime import datetime
from typing import List, Dict, Any, Union, TypeVar, Tuple
from email.message import EmailMessage
from email.policy import default as policy
from email.headerregistry import UniqueDateHeader, UniqueAddressHeader, UniqueUnstructuredHeader
from email.parser import BytesParser
from imaplib import IMAP4_SSL, IMAP4
from Depends.redis_handlers import RedisHandler
from Depends.config import ConfigServices, EmailConfig, MailReaderMainConfig, MailReader, MailParser, RedisData, Status
from Depends.service_handler import MailReaderService
# Type variable for generic function return types
T = TypeVar('T')
# Logger shared by every component defined in this module
logger = logging.getLogger('Email Reader Service')
def retry_on_connection_error(max_retries: int = 3, delay: int = 5, backoff: int = 2, exceptions=(Exception,)):
    """
    Build a decorator that re-invokes the wrapped callable when it raises.

    The call is attempted up to ``max_retries`` times inside a guarded loop.
    After each failure matching ``exceptions``, a warning is logged and the
    wrapper sleeps; the pause is multiplied by ``backoff`` after every failure.
    One final, unguarded attempt follows the loop, so its exception (if any)
    reaches the caller.

    Args:
        max_retries: Number of guarded attempts made before the final one
        delay: Seconds to sleep after the first failure
        backoff: Factor applied to the sleep after every failure
        exceptions: Tuple of exception types that trigger a retry
    Returns:
        Decorator adding the retry behaviour to a function
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            pause = delay
            for _attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions as err:
                    logger.warning(f"Connection error in {func.__name__}: {str(err)}, retrying in {pause}s...")
                    sleep(pause)
                    pause *= backoff
            # Guarded attempts exhausted: this last call propagates any error.
            return func(*args, **kwargs)
        return wrapper
    return decorator
class Mails:
    """
    A single email fetched from the mailbox, parsed into headers, body and attachments.

    Everything is extracted eagerly in ``__init__``; ``to_dict`` produces the
    plain-dict representation handed to the rest of the service.
    """

    def __init__(self, mail_id: bytes, mail_data: bytes):
        """
        Initialize a mail object
        Args:
            mail_id: Unique identifier for the email (the IMAP UID bytes)
            mail_data: Raw email data (RFC 822 bytes)
        """
        self.id: bytes = mail_id
        self.raw_data: bytes = mail_data
        self.attachments: List[Dict[str, Union[str, bytes]]] = []
        self.message: EmailMessage = BytesParser(policy=policy).parsebytes(mail_data)
        # Absent or empty headers are stored as '' instead of a header object;
        # to_dict() copes with both forms.
        self.subject: Union[UniqueUnstructuredHeader, str] = self.message.get('Subject', '') or ''
        self.from_: Union[UniqueAddressHeader, str] = self.message.get('From', '') or ''
        self.to: Union[UniqueAddressHeader, str] = self.message.get('To', '') or ''
        self.date: Union[UniqueDateHeader, str] = self.message.get('Date', '') or ''
        self.body_text: str = self._get_body_text()
        self._extract_attachments()

    @staticmethod
    def _address_to_dict(address) -> Dict[str, str]:
        """Convert an ``email.headerregistry.Address`` into the dict shape used by to_dict()."""
        return {
            "display_name": address.display_name,
            "username": address.username,
            "domain": address.domain,
            "mail": f"{address.username}@{address.domain}",
        }

    @staticmethod
    def _header_addresses(header) -> tuple:
        """Return the addresses of an address header, or () when the header is missing or empty."""
        return tuple(getattr(header, 'addresses', None) or ())

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert mail object to dictionary representation
        A missing or unparsable From/To/Date header yields an all-empty sender
        dict, an empty recipient list, or an empty date string respectively,
        instead of raising.
        Returns:
            Dictionary representation of mail (attachment data as base64 text)
        """
        senders = self._header_addresses(self.from_)
        if senders:
            sender = self._address_to_dict(senders[0])
        else:
            sender = {"display_name": "", "username": "", "domain": "", "mail": ""}
        # DateHeader.datetime is None when the header value could not be parsed.
        sent_at = getattr(self.date, 'datetime', None)
        return {
            'id': self.id.decode('utf-8'),
            'attachments': [{
                'filename': attachment['filename'],
                'content_type': attachment['content_type'],
                'charset': attachment['charset'],
                # Base64 output is always ASCII, whatever the attachment's own charset.
                'data': b64encode(attachment['data']).decode('ascii'),
            } for attachment in self.attachments],
            'subject': str(self.subject),
            'from_': sender,
            'to': [self._address_to_dict(address) for address in self._header_addresses(self.to)],
            'date': str(sent_at) if sent_at is not None else '',
            'body_text': str(self.body_text),
        }

    @staticmethod
    def _part_text(part) -> str:
        """
        Decode a text part to str, tolerating broken or unknown charsets.
        Falls back to decoding the raw payload with the declared charset, and to
        UTF-8 when that charset is unknown to Python, replacing undecodable bytes.
        """
        try:
            return part.get_content() or ''
        except Exception:
            payload = part.get_payload(decode=True) or b''
            charset = part.get_content_charset() or 'utf-8'
            try:
                return payload.decode(charset, errors='replace')
            except LookupError:
                return payload.decode('utf-8', errors='replace')

    def _get_body_text(self) -> str:
        """
        Extract plain text body from email
        Returns:
            Plain text body of email, or '' when no text/plain part exists
        """
        body = self.message.get_body(preferencelist=('plain',))
        if body is not None:
            return self._part_text(body)
        if not self.message.is_multipart():
            # A single-part message is used as-is when it is text/plain, even if
            # it is marked as an attachment (get_body() skips that case).
            if self.message.get_content_type() == 'text/plain':
                return self._part_text(self.message)
            return ''
        for part in self.message.walk():
            if part.get_content_type() == 'text/plain' and (part.get_content_disposition() or '') != 'attachment':
                return self._part_text(part)
        return ''

    def _extract_attachments(self) -> None:
        """Collect every named part whose Content-Disposition is 'attachment'."""
        for part in self.message.walk():
            if part.get_content_disposition() != 'attachment':
                continue
            filename = part.get_filename()
            if not filename:
                continue
            self.attachments.append({
                'filename': filename,
                'content_type': part.get_content_type(),
                'data': part.get_payload(decode=True) or b'',
                # get_charset() returns a Charset object (or None) rather than the
                # declared charset; get_content_charset() returns the str parameter.
                'charset': part.get_content_charset() or 'utf-8',
            })

    def save_attachments(self, folder: str) -> None:
        """
        Save attachments to folder
        Attachment filenames come from the untrusted email, so any directory
        components are stripped to keep every file inside ``folder``.
        Args:
            folder: Folder to save attachments to (created if missing)
        """
        os.makedirs(folder, exist_ok=True)
        for att in self.attachments:
            safe_name = os.path.basename(str(att['filename']).replace('\\', '/'))
            if safe_name in ('', '.', '..'):
                continue
            with open(os.path.join(folder, safe_name), 'wb') as f:
                f.write(att['data'])
class EmailReaderService:
    """Service for reading emails from mailbox with improved connection resilience"""

    def __init__(self, config: MailReaderMainConfig):
        """
        Initialize email reader service
        Opens the IMAP connection immediately; authentication happens separately
        in login_and_connect().
        Args:
            config: Application configuration
        """
        self.email_config = EmailConfig()
        self.config = config
        self.mail = None
        self.data: List[Mails] = []
        self.mail_count = 0
        self.is_connected = False
        self.connect_imap()

    def _discard_connection(self) -> None:
        """
        Best-effort CLOSE and LOGOUT of the current IMAP connection.
        The two steps are attempted independently: CLOSE fails when no mailbox is
        selected, and that must not skip LOGOUT (which would leak the socket).
        """
        for step in (self.mail.close, self.mail.logout):
            try:
                step()
            except Exception:
                pass

    def connect_imap(self) -> bool:
        """
        Establish a fresh IMAP connection, discarding any previous one
        Returns:
            True if connection successful, False otherwise
        """
        try:
            if self.mail:
                self._discard_connection()
            logger.info(f"Connecting to IMAP server {self.email_config.HOST}:{self.email_config.PORT}")
            self.mail = IMAP4_SSL(self.email_config.HOST, self.email_config.PORT)
            self.is_connected = True
            return True
        except (socket.error, IMAP4.error) as e:
            logger.error(f"Failed to connect to IMAP server: {str(e)}")
            self.is_connected = False
            return False

    @retry_on_connection_error(max_retries=3, delay=5, exceptions=(socket.error, IMAP4.error, OSError))
    def login_and_connect(self) -> bool:
        """
        Login to IMAP server and select INBOX, with retry
        Returns:
            True if login successful
        Raises:
            ConnectionError: If connection cannot be established
            IMAP4.error / socket.error: If login or INBOX selection keeps failing
        """
        if not self.is_connected:
            if not self.connect_imap():
                raise ConnectionError("Cannot establish connection to IMAP server")
        try:
            logger.info(f"Logging in as {self.email_config.USERNAME}")
            self.mail.login(self.email_config.USERNAME, self.email_config.PASSWORD)
            self._connect_inbox()
            logger.info("Successfully logged in and connected to inbox")
            return True
        except (socket.error, IMAP4.error) as e:
            logger.error(f"Login failed: {str(e)}")
            # Forces the next retry to open a new connection before logging in.
            self.is_connected = False
            raise

    @retry_on_connection_error(max_retries=2, delay=3, exceptions=(socket.error, IMAP4.error, OSError))
    def refresh(self) -> Tuple[List[Mails], int, int]:
        """
        Refresh mail data, reconnecting and logging in again on IMAP/socket errors
        Returns:
            Tuple of (mail data, mail count, data length)
        """
        try:
            self.mail_count = self._fetch_count()
            self.data = self._fetch_all()
            return self.data, self.mail_count, len(self.data)
        except (socket.error, IMAP4.error) as e:
            logger.error(f"Refresh failed, attempting to reconnect: {str(e)}")
            self.connect_imap()
            self.login_and_connect()
            self.mail_count = self._fetch_count()
            self.data = self._fetch_all()
            return self.data, self.mail_count, len(self.data)

    @retry_on_connection_error(max_retries=2, delay=2, exceptions=(socket.error, IMAP4.error))
    def _connect_inbox(self) -> None:
        """
        Select the INBOX folder, with retry
        Raises:
            IMAP4.error: If the server does not answer OK to SELECT
        """
        logger.info("Selecting INBOX folder")
        status, _ = self.mail.select("INBOX")
        if status != 'OK':
            error_msg = "Failed to connect to INBOX"
            logger.error(error_msg)
            raise IMAP4.error(error_msg)

    def _sort_uids(self, error_msg: str) -> List[bytes]:
        """
        Run UID SORT (newest first) for messages FROM config.MAILBOX
        Args:
            error_msg: Message of the IMAP4.error raised when the server does not answer OK
        Returns:
            List of matching UIDs (bytes), possibly empty
        """
        status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"')
        if status != 'OK':
            raise IMAP4.error(error_msg)
        # imaplib yields [None] when the server sent no untagged SORT data.
        return uids[0].split() if uids and uids[0] else []

    @retry_on_connection_error(max_retries=2, delay=2, exceptions=(socket.error, IMAP4.error))
    def _fetch_count(self) -> int:
        """
        Fetch the number of matching emails, with retry
        Returns:
            Number of emails
        Raises:
            IMAP4.error: If fetching mail count fails
        """
        try:
            count = len(self._sort_uids("Failed to get mail count"))
            logger.info(f"Found {count} emails from {self.config.MAILBOX}")
            return count
        except (socket.error, IMAP4.error) as e:
            logger.error(f"Error fetching mail count: {str(e)}")
            raise

    @staticmethod
    def _raw_message(msg_data) -> Union[bytes, None]:
        """Return the message literal from an imaplib FETCH response, or None if absent."""
        for item in msg_data or ():
            # Literal responses arrive as (b'<seq> (UID .. RFC822 {n}', b'<raw bytes>').
            if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], bytes):
                return item[1]
        return None

    @retry_on_connection_error(max_retries=2, delay=2, exceptions=(socket.error, IMAP4.error))
    def _fetch_all(self) -> List[Mails]:
        """
        Fetch and parse all matching mails, with retry
        Per-message parse/fetch problems are logged and skipped; connection-level
        failures propagate so the retry/reconnect logic can take over.
        Returns:
            List of mail objects
        Raises:
            IMAP4.error / socket.error: If the search fails or the connection drops
        """
        self.data = []
        try:
            uid_list = self._sort_uids("Mail search failed")
            if not uid_list:
                logger.info("No emails found matching criteria")
                return self.data
            logger.info(f"Processing {len(uid_list)} emails")
            for uid in uid_list:
                try:
                    status, msg_data = self.mail.uid('fetch', uid, '(RFC822)')
                    raw = self._raw_message(msg_data) if status == 'OK' else None
                    if raw is None:
                        logger.warning(f"No message data returned for UID {uid}")
                        continue
                    self.data.append(Mails(uid, raw))
                except (socket.error, IMAP4.abort):
                    # The connection is gone: every remaining FETCH would fail too.
                    raise
                except Exception as e:
                    logger.warning(f"Failed to fetch email with UID {uid}: {str(e)}")
                    continue
            logger.info(f"Successfully fetched {len(self.data)} emails")
            return self.data
        except (socket.error, IMAP4.error) as e:
            logger.error(f"Error fetching emails: {str(e)}")
            raise

    def _uid_transfer(self, command: str, progress: str, action: str, uid: Union[str, bytes], folder: str) -> bool:
        """
        Run a UID MOVE/COPY of one message into ``folder`` and expunge on success.
        Args:
            command: IMAP UID sub-command ('MOVE' or 'COPY')
            progress: Verb used in the progress log line ('Moving' / 'Copying')
            action: Verb used in the failure log line ('move' / 'copy')
            uid: Email UID
            folder: Destination folder
        Returns:
            True when the server answered OK, False otherwise (errors are logged, not raised)
        """
        try:
            log_uid = uid
            if isinstance(uid, bytes):
                log_uid = uid.decode('utf-8', errors='replace')
            elif isinstance(uid, str):
                uid = uid.encode('utf-8')
            logger.info(f"{progress} email {log_uid} to {folder} folder")
            status, response = self.mail.uid(command, uid, folder)
            if status != 'OK':
                # imaplib returns NO responses instead of raising; treat them as failures.
                logger.error(f"Failed to {action} email {log_uid} to {folder}: server answered {status} {response}")
                return False
            self.commit()
            return True
        except Exception as e:
            logger.error(f"Failed to {action} email to folder: {str(e)}")
            return False

    # NOTE: exceptions are swallowed inside, so the retry decorator rarely fires here.
    @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
    def move_to_folder(self, uid: Union[str, bytes], folder: str):
        """
        Move message to folder
        Args:
            uid: Email UID
            folder: Destination folder
        Returns:
            True on success, False on any failure (logged)
        """
        return self._uid_transfer('MOVE', 'Moving', 'move', uid, folder)

    @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
    def copy_to_folder(self, uid: Union[str, bytes], folder: str):
        """
        Copy message to folder
        Args:
            uid: Email UID
            folder: Destination folder
        Returns:
            True on success, False on any failure (logged)
        """
        return self._uid_transfer('COPY', 'Copying', 'copy', uid, folder)

    @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
    def mark_no_attachment(self, uid: Union[str, bytes]):
        """
        Move message to config.NO_ATTACHMENT_FOLDER
        Args:
            uid: Email UID
        Returns:
            Result of move_to_folder()
        """
        return self.move_to_folder(uid, self.config.NO_ATTACHMENT_FOLDER)

    @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
    def mark_completed(self, uid: Union[str, bytes]):
        """
        Move message to config.COMPLETED_FOLDER
        Args:
            uid: Email UID
        Returns:
            Result of move_to_folder()
        """
        return self.move_to_folder(uid, self.config.COMPLETED_FOLDER)

    @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
    def delete(self, uid):
        """
        Flag message as \\Deleted (removed on the next commit/expunge), with retry
        Args:
            uid: Email UID (str or bytes)
        """
        try:
            log_uid = uid
            if isinstance(uid, bytes):
                log_uid = uid.decode('utf-8', errors='replace')
            logger.info(f"Marking email {log_uid} for deletion")
            self.mail.uid('STORE', uid, '+FLAGS', r'(\Deleted)')
        except Exception as e:
            logger.error(f"Failed to delete email: {str(e)}")
            raise

    @retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
    def commit(self):
        """
        Commit pending operations (EXPUNGE), with retry
        Raises:
            Exception: If commit fails
        """
        try:
            logger.info("Committing changes (expunge)")
            self.mail.expunge()
        except Exception as e:
            logger.error(f"Failed to commit changes: {str(e)}")
            raise

    def logout(self):
        """Close the selected mailbox and log out; failures are logged, not raised"""
        if not (self.mail and self.is_connected):
            return
        logger.info("Logging out from IMAP server")
        try:
            self.mail.close()
        except Exception as e:
            # CLOSE needs a selected mailbox; LOGOUT must still be attempted.
            logger.warning(f"Close before logout failed: {str(e)}")
        try:
            self.mail.logout()
        except Exception as e:
            logger.warning(f"Logout failed: {str(e)}")
        # Connection state is unusable either way; force a reconnect next time.
        self.is_connected = False

    @property
    def count(self):
        """Number of emails fetched by the last refresh"""
        return len(self.data)
class EmailServiceRunner:
    """Runner for email service with improved resilience to connection issues"""

    def __init__(self, redis_handler: MailReaderService, email_service: EmailReaderService) -> None:
        """
        Initialize email service runner
        Args:
            redis_handler: Redis handler for Redis operations
            email_service: Email service for email operations
        """
        # All Redis operations go through the MailReaderService instance.
        self.redis_handler = redis_handler
        self.email_service = email_service
        self.mails = None
        self.count = 0
        self.counter = 0
        self.mail_count = 0
        self.redis_connected = self._check_redis_connection()

    def _check_redis_connection(self) -> bool:
        """
        Check if Redis connection is alive using MailReaderService
        Returns:
            True if connection is alive, False otherwise
        """
        try:
            connection_status = self.redis_handler.ensure_connection()
            if connection_status:
                logger.info("Redis connection established via MailReaderService")
            else:
                logger.error("Redis connection check failed via MailReaderService")
            return connection_status
        except MailReaderService.REDIS_EXCEPTIONS as e:
            logger.error(f"Redis connection failed: {str(e)}")
            return False

    @retry_on_connection_error(max_retries=3, delay=5, exceptions=MailReaderService.REDIS_EXCEPTIONS)
    def _ensure_redis_connection(self) -> bool:
        """
        Ensure Redis connection is established using MailReaderService
        Returns:
            True if connection is established
        Raises:
            RedisHandler.REDIS_EXCEPTIONS[0]: If the connection cannot be (re)established
        """
        if not self.redis_connected:
            try:
                self.redis_connected = self.redis_handler.ensure_connection()
                if not self.redis_connected:
                    raise RedisHandler.REDIS_EXCEPTIONS[0]("Failed to establish Redis connection")
            except Exception:
                self.redis_connected = False
                raise
        return self.redis_connected

    @staticmethod
    def _has_pdf_attachment(mail: Mails) -> bool:
        """Return True when any attachment filename ends in '.pdf' (case-insensitive)."""
        return any(attachment['filename'].lower().endswith('.pdf') for attachment in mail.attachments)

    def _route_mail(self, mail: Mails, mail_id: str) -> None:
        """
        Decide what happens to one mail: process it or move it to the no-attachment folder.
        Args:
            mail: Mail object
            mail_id: Mail ID (decoded UID)
        """
        # NOTE(review): mails carrying any .pdf attachment are routed to the
        # no-attachment folder rather than processed -- confirm this is intended.
        if not mail.attachments or self._has_pdf_attachment(mail):
            self.email_service.mark_no_attachment(mail_id)
        else:
            self._process_mail_with_attachments(mail, mail_id)

    def fetch_and_set_mails(self):
        """
        Fetch emails, route each one, then expunge
        A failure on a single mail is logged and skipped so the rest of the batch
        is still handled.
        Raises:
            Exception: If Redis is unavailable or the mail refresh fails
        """
        try:
            # Reset counters
            self.count = 0
            self.counter = 0
            # Ensure Redis connection before proceeding
            self._ensure_redis_connection()
            try:
                self.mails, self.mail_count, self.count = self.email_service.refresh()
            except Exception as e:
                logger.error(f"Failed to refresh emails: {str(e)}")
                raise
            for mail in self.mails:
                if not getattr(mail, 'id', None):
                    logger.warning("Skipping email with no ID")
                    continue
                mail_id = mail.id.decode('utf-8')
                try:
                    self._route_mail(mail, mail_id)
                except Exception as e:
                    logger.error(f"Error processing email {mail_id}: {str(e)}")
                    continue
            try:
                self.email_service.commit()
            except Exception as e:
                logger.error(f"Failed to commit email changes: {str(e)}")
            timestamp = f"TIMESTAMP: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
            logger.info(f"Fetched and processed emails. Read/Total: {self.counter}/{self.count}. {self.count - self.counter} already saved. Mail count: {self.mail_count}")
            print(f"{timestamp} Mail Reader Service | Fetching and setting mails. Read / Total Mails: {self.counter}/{self.count}. {self.count - self.counter} already saved in redis. Mail Server mail count: {self.mail_count}")
        except MailReaderService.REDIS_EXCEPTIONS as e:
            logger.error(f"Redis error in fetch_and_set_mails: {str(e)}")
            self.redis_connected = False
            raise
        except Exception as e:
            logger.error(f"Unexpected error in fetch_and_set_mails: {str(e)}")
            raise

    def _process_mail_with_attachments(self, mail: Mails, mail_id: str):
        """
        Hand a mail with attachments to MailReaderService and update its state
        When process_mail returns a task UUID the mail-reader step is marked
        COMPLETED; otherwise the mail is moved to the completed folder and popped
        from Redis once the handler reports it ready to delete.
        Args:
            mail: Mail object
            mail_id: Mail ID
        Raises:
            Exception: If processing mail fails
        """
        try:
            mail_to_dict = mail.to_dict()
            task_uuid, self.counter = self.redis_handler.process_mail(
                mail_id=mail_id, mail_data=mail_to_dict, service_prefix=self.email_service.config.SERVICE_PREFIX, counter=self.counter
            )
            if task_uuid:
                self.redis_handler.change_service(
                    task_uuid=task_uuid, service_name=ConfigServices.SERVICE_PREFIX_MAIL_READER, status=Status.COMPLETED, completed=True
                )
            elif self.redis_handler.check_mail_is_ready_to_delete(mail_id):
                self.email_service.mark_completed(mail_id)
                self.redis_handler.pop_mail(mail_id)
        except MailReaderService.REDIS_EXCEPTIONS as e:
            logger.error(f"Redis error while processing mail {mail_id}: {str(e)}")
            self.redis_connected = False
            raise
        except Exception as e:
            logger.error(f"Email Service Runner Error processing mail {mail_id}: {str(e)}")
            raise

    def drop(self):
        """Clean up resources: expunge pending changes, then log out (best-effort)"""
        try:
            self.email_service.commit()
        except Exception as e:
            logger.warning(f"Error during commit on drop: {str(e)}")
        try:
            self.email_service.logout()
        except Exception as e:
            logger.warning(f"Error during logout on drop: {str(e)}")