# Source: prod-wag-backend-automate-s.../api_services/api_controllers/email/send_email.py
# (file-viewer metadata: 91 lines, 2.9 KiB, Python)

from redmail import EmailSender
from typing import List, Optional, Dict
from pydantic import BaseModel
from contextlib import contextmanager
from .config import email_configs
class EmailSendModel(BaseModel):
    """Validated description of one outgoing email.

    Only ``subject`` and ``receivers`` are required; every other field has a
    default and may be omitted by the caller.
    """

    # Subject line of the message (required).
    subject: str
    # HTML body; defaults to an empty string.
    html: str = ""
    # Intended recipient addresses (required).
    receivers: List[str]
    # Plain-text body; defaults to an empty string, ``None`` is also accepted.
    text: Optional[str] = ""
    # Carbon-copy recipients, if any.
    cc: Optional[List[str]] = None
    # Blind-carbon-copy recipients, if any.
    bcc: Optional[List[str]] = None
    # Extra message headers, if any.
    headers: Optional[Dict] = None
    # Attachments mapping, if any.
    attachments: Optional[Dict] = None
class EmailSession:
    """Thin wrapper that sends messages through a single email sender object."""

    def __init__(self, email_sender):
        """Store the sender used by :meth:`send`.

        Args:
            email_sender: Object exposing ``connect()`` and ``send(...)``.
        """
        self.email_sender = email_sender

    def send(self, params: EmailSendModel) -> bool:
        """Send one email described by ``params`` using this session.

        Args:
            params: Subject, bodies, cc/bcc, headers and attachments to send.

        Returns:
            ``True`` once the sender accepted the message, ``False`` when
            sending is disabled via ``email_configs.is_send``.

        Raises:
            Exception: Whatever the sender raises while connecting or sending
                is printed and then re-raised unchanged.
        """
        if not email_configs.is_send:
            print("Email sending is disabled", params)
            return False

        # NOTE(review): params.receivers is not used here; every message is
        # addressed to email_configs.USERNAME and the suffix appended to the
        # text shows that same list. Confirm this redirect is intentional.
        receivers = [email_configs.USERNAME]

        # ``text`` is Optional on the model, so guard against None before
        # appending the recipient suffix.
        body_text = f"{params.text or ''} : Gonderilen [{receivers}]"

        try:
            # Connect lazily when nothing has marked this sender as connected.
            if not getattr(self.email_sender, "_connected", False):
                self.email_sender.connect()
                # Remember the connection so later sends reuse it instead of
                # opening a new one each time.
                self.email_sender._connected = True

            self.email_sender.send(
                subject=params.subject,
                receivers=receivers,
                text=body_text,
                html=params.html,
                cc=params.cc,
                bcc=params.bcc,
                headers=params.headers or {},
                attachments=params.attachments or {},
            )
        except Exception as e:
            print(f"Error sending email: {e}")
            raise
        return True
class EmailService:
    """Entry point for opening email sessions and sending messages.

    Calling ``EmailService()`` repeatedly returns the same instance; the
    functionality itself is exposed through class methods.
    """

    _instance = None

    def __new__(cls):
        """Create the shared instance on first use and return it afterwards."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    @contextmanager
    def new_session(cls):
        """Open a connection and yield an :class:`EmailSession` bound to it.

        The sender is built from ``email_configs.as_dict()``. The connection
        is closed when the ``with`` block exits, whether or not the block
        raised.

        Yields:
            EmailSession: Session wrapping the connected sender.

        Raises:
            Exception: A failure while connecting is printed and re-raised.
                Exceptions raised inside the ``with`` block propagate
                unchanged after the connection has been closed.
        """
        email_sender = EmailSender(**email_configs.as_dict())

        # Only connection failures are reported as connection errors; errors
        # raised by the caller's ``with`` body are not relabelled here.
        try:
            email_sender.connect()
        except Exception as e:
            print(f"Error with email connection: {e}")
            raise

        # Flag read by EmailSession.send to decide whether to reconnect.
        email_sender._connected = True
        try:
            yield EmailSession(email_sender)
        finally:
            try:
                email_sender.close()
            except Exception as e:
                # Best effort: a failed close must not mask the body's outcome.
                print(f"Error closing email connection: {e}")
            finally:
                # Reset the flag even when close() itself failed.
                email_sender._connected = False

    @classmethod
    def send_email(cls, session: EmailSession, params: EmailSendModel) -> bool:
        """Send ``params`` through ``session`` and return its result.

        Args:
            session: Session to send with.
            params: Description of the email to send.

        Returns:
            The boolean returned by ``session.send(params)``.
        """
        return session.send(params)