from redmail import EmailSender from typing import List, Optional, Dict from pydantic import BaseModel from contextlib import contextmanager from .config import email_configs class EmailSendModel(BaseModel): subject: str html: str = "" receivers: List[str] text: Optional[str] = "" cc: Optional[List[str]] = None bcc: Optional[List[str]] = None headers: Optional[Dict] = None attachments: Optional[Dict] = None class EmailSession: def __init__(self, email_sender): self.email_sender = email_sender def send(self, params: EmailSendModel) -> bool: """Send email using this session.""" if not email_configs.is_send: print("Email sending is disabled", params) return False receivers = [email_configs.USERNAME] # Ensure connection is established before sending try: # Check if connection exists, if not establish it if not hasattr(self.email_sender, '_connected') or not self.email_sender._connected: self.email_sender.connect() self.email_sender.send( subject=params.subject, receivers=receivers, text=params.text + f" : Gonderilen [{str(receivers)}]", html=params.html, cc=params.cc, bcc=params.bcc, headers=params.headers or {}, attachments=params.attachments or {}, ) return True except Exception as e: print(f"Error sending email: {e}") raise class EmailService: _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(EmailService, cls).__new__(cls) return cls._instance @classmethod @contextmanager def new_session(cls): """Create and yield a new email session with active connection.""" email_sender = EmailSender(**email_configs.as_dict()) session = EmailSession(email_sender) connection_established = False try: # Establish connection and set flag email_sender.connect() # Set a flag to track connection state email_sender._connected = True connection_established = True yield session except Exception as e: print(f"Error with email connection: {e}") raise finally: # Only close if connection was successfully established if connection_established: try: email_sender.close() email_sender._connected = False except Exception as e: print(f"Error closing email connection: {e}") @classmethod def send_email(cls, session: EmailSession, params: EmailSendModel) -> bool: """Send email using the provided session.""" return session.send(params)