import logging
from datetime import datetime
from json import dumps, loads
from uuid import uuid4

from Depends.config import ConfigServices, RedisData, RedisTaskObject, Status
from Depends.redis_handlers import RedisHandler

# Configure logging
logger = logging.getLogger('Service Task Retriever')


class ServiceTaskRetriever:
    """
    Retrieve and update Redis task objects by UUID or mail ID.

    Tasks are stored as JSON strings in a Redis list (``redis_prefix``). Two
    JSON-encoded dictionaries, stored under ``uuid_index_key`` and
    ``mailid_index_key``, map a task UUID / mail ID to the task's position in
    that list so a task can be fetched with a single ``LINDEX``.
    """

    def __init__(self, redis_handler=None):
        """
        Initialize the ServiceTaskRetriever

        Args:
            redis_handler: Optional RedisHandler instance. If not provided
                (or falsy), a new one is created.
        """
        self.redis_handler = redis_handler or RedisHandler()
        self.redis_client = self.redis_handler.redis_client
        self.redis_prefix = ConfigServices.MAIN_TASK_PREFIX
        self.mailid_index_key = ConfigServices.TASK_MAILID_INDEX_PREFIX
        self.uuid_index_key = ConfigServices.TASK_UUID_INDEX_PREFIX

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _set_index_entry(self, index_key, entry_key, index: int) -> None:
        """Store ``entry_key -> index`` in the JSON index dictionary kept under ``index_key``."""
        raw_index = self.redis_handler.get(index_key)
        index_dict = loads(raw_index) if raw_index else {}
        # Keys are always normalized to str because JSON object keys are strings.
        index_dict[str(entry_key)] = index
        self.redis_handler.set(index_key, dumps(index_dict))

    def _write_task_at(self, index_value, task_payload: dict, error_message: str) -> None:
        """Overwrite the list element at ``index_value``; raise ValueError(error_message) on a falsy reply."""
        if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(task_payload)):
            raise ValueError(error_message)

    @staticmethod
    def _valid_service_names() -> list:
        """Return the service names accepted by ``_validate_service_name``."""
        return [
            ConfigServices.SERVICE_PREFIX_MAIL_READER,
            ConfigServices.SERVICE_PREFIX_MAIL_PARSER,
            ConfigServices.SERVICE_PREFIX_FINDER_IBAN,
            ConfigServices.SERVICE_PREFIX_FINDER_COMMENT,
        ]

    def _extract_service_data(self, task_object, service_name: str):
        """
        Return the truthy service payload stored on ``task_object.data``, else None.

        A name is accepted when it is an attribute name of ConfigServices (the
        original rule) or one of the validated service-name values, which are
        the keys ``store_task_with_uuid`` writes service data under.
        """
        is_known = (
            service_name in ConfigServices.__dict__
            or service_name in self._valid_service_names()
        )
        if not is_known:
            return None
        return getattr(task_object.data, service_name, None) or None

    # ------------------------------------------------------------------
    # Index access
    # ------------------------------------------------------------------
    def fetch_all_tasks(self):
        """
        Get all tasks from Redis

        Returns:
            list: List of RedisTaskObject instances
        """
        all_task = self.redis_handler.get_all_tasks()
        return [RedisTaskObject(**loads(task)) for task in all_task]

    def get_index_by_uuid(self, task_uuid: str):
        """
        Get the list index of a task by its UUID

        Args:
            task_uuid: UUID of the task

        Returns:
            The stored index, or None when the index dictionary exists but
            has no entry for this UUID.

        Raises:
            FileNotFoundError: If the UUID index dictionary itself is missing
        """
        uuid_index_data = self.redis_handler.get(self.uuid_index_key)
        if not uuid_index_data:
            raise FileNotFoundError(f"UUID index not found for {task_uuid}")
        # str() mirrors the normalization applied when the entry is written.
        return loads(uuid_index_data).get(str(task_uuid))

    def get_index_by_mail_id(self, mail_id: str):
        """
        Get the list index of a task by its mail ID

        Args:
            mail_id: Mail ID of the task

        Returns:
            The stored index, or None when the index dictionary exists but
            has no entry for this mail ID.

        Raises:
            FileNotFoundError: If the mail ID index dictionary itself is missing
        """
        mail_id_index_data = self.redis_handler.get(self.mailid_index_key)
        if not mail_id_index_data:
            raise FileNotFoundError(f"Mail ID index not found for {mail_id}")
        return loads(mail_id_index_data).get(str(mail_id))

    def set_index_uuid(self, task_uuid: str, index: int):
        """
        Set the list index of a task in the UUID index

        Args:
            task_uuid: UUID of the task
            index: Index of the task in the Redis task list
        """
        # Must target the UUID index; writing to the mail ID index would leave
        # every UUID lookup without an entry.
        self._set_index_entry(self.uuid_index_key, task_uuid, index)

    def set_index_mail_id(self, mail_id: str, index: int):
        """
        Set the list index of a task in the mail ID index

        Args:
            mail_id: Mail ID of the task
            index: Index of the task in the Redis task list
        """
        self._set_index_entry(self.mailid_index_key, mail_id, index)

    def update_mail_id_index(self, task_uuid: str, index: int):
        """
        Re-write the UUID index entry of a task with its currently stored index

        NOTE(review): despite the name, this method receives no mail ID and the
        ``index`` argument is unused; it only re-stores the index already
        registered for ``task_uuid``. Confirm the intended behavior with the
        callers before relying on it.

        Args:
            task_uuid: UUID of the task
            index: Unused (kept for interface compatibility)
        """
        current_index = self.get_index_by_uuid(task_uuid)
        # Index 0 is a valid position, so compare against None explicitly.
        if current_index is not None:
            self.set_index_uuid(task_uuid, current_index)

    def update_uuid_index(self, task_uuid: str, mail_id: str):
        """
        Register a task UUID under the same list index as a mail ID

        Args:
            task_uuid: UUID of the task
            mail_id: Mail ID of the task
        """
        current_index = self.get_index_by_mail_id(mail_id)
        # Index 0 is a valid position, so compare against None explicitly.
        if current_index is not None:
            self.set_index_uuid(task_uuid, current_index)

    # ------------------------------------------------------------------
    # Task retrieval
    # ------------------------------------------------------------------
    def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
        """
        Get a task object directly by its UUID without iteration

        Args:
            task_uuid: UUID of the task to retrieve

        Returns:
            RedisTaskObject: The task object if found

        Raises:
            FileNotFoundError: If the UUID index or task is not found
        """
        index_by_uuid = self.get_index_by_uuid(task_uuid)
        # "is None" rather than truthiness: the first task lives at index 0.
        if index_by_uuid is None:
            raise FileNotFoundError(f"UUID index not found for {task_uuid}")
        task_data = self.redis_client.lindex(self.redis_prefix, int(index_by_uuid))
        if task_data:
            return RedisTaskObject(**loads(task_data))
        raise FileNotFoundError(f"Task not found for UUID: {task_uuid}")

    def get_task_by_mail_id(self, mail_id: str) -> RedisTaskObject:
        """
        Get a task object directly by its mail ID without iteration

        Args:
            mail_id: Mail ID of the task to retrieve

        Returns:
            RedisTaskObject: The task object if found

        Raises:
            FileNotFoundError: If the mail ID index or task is not found
        """
        mail_id_index = self.get_index_by_mail_id(mail_id)
        if mail_id_index is not None:
            task_data = self.redis_client.lindex(self.redis_prefix, int(mail_id_index))
            if task_data:
                return RedisTaskObject(**loads(task_data))
        raise FileNotFoundError(f"Task not found for mail ID: {mail_id}")

    def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
        """
        Get service-specific data from a task by UUID

        Args:
            task_uuid: UUID of the task
            service_name: Name of the service to extract data for

        Returns:
            Any: Service-specific data if found (and truthy)

        Raises:
            FileNotFoundError: If the task or service data is not found
        """
        task_object = self.get_task_by_uuid(task_uuid)
        service_data = self._extract_service_data(task_object, service_name)
        if service_data:
            return service_data
        raise FileNotFoundError(f"Service data '{service_name}' not found in task {task_uuid}")

    def get_service_data_by_mail_id(self, mail_id: str, service_name: str):
        """
        Get service-specific data from a task by mail ID

        Args:
            mail_id: Mail ID of the task
            service_name: Name of the service to extract data for

        Returns:
            Any: Service-specific data if found (and truthy)

        Raises:
            FileNotFoundError: If the task or service data is not found
        """
        task_object = self.get_task_by_mail_id(mail_id)
        service_data = self._extract_service_data(task_object, service_name)
        if service_data:
            return service_data
        raise FileNotFoundError(f"Service data '{service_name}' not found in task for mail ID {mail_id}")

    # ------------------------------------------------------------------
    # Task creation / replacement
    # ------------------------------------------------------------------
    def create_task_with_uuid(self, task_uuid: str, service_name: str, mail_reader: dict, mail_parser: dict) -> bool:
        """
        Create a new task with UUID indexing.
        This method fails if a task with the UUID already exists.

        Args:
            task_uuid: UUID for the task
            service_name: Service name for the task
            mail_reader: Mail payload stored under 'MailReader'; its 'id' key
                is used as the mail ID index key
            mail_parser: Payload stored under 'MailParser'

        Returns:
            bool: True if successful

        Raises:
            ValueError: If the service name is invalid, storage fails, or the
                task already exists
        """
        # Refuse to overwrite: a successful lookup means the UUID is taken.
        try:
            self.get_task_by_uuid(task_uuid)
        except FileNotFoundError:
            pass
        else:
            raise ValueError(f"Task with UUID {task_uuid} already exists. Use store_task_with_uuid to update.")

        self._validate_service_name(service_name)

        # Finder services start empty; reader/parser carry the supplied payloads.
        redis_data = RedisData(
            **{
                'MailReader': mail_reader,
                'MailParser': mail_parser,
                'FinderIban': {},
                'FinderComment': {},
            }
        )
        write_object = RedisTaskObject(
            task=task_uuid,
            data=redis_data,
            completed=False,
            service=service_name,
            status=Status.COMPLETED,
            created_at=datetime.now().isoformat(),
            is_completed=False,
        ).model_dump()

        # RPUSH replies with the new list length, so the new element sits at length - 1.
        redis_write_ = self.redis_client.rpush(self.redis_prefix, dumps(write_object))
        if not redis_write_:
            raise ValueError(f"Failed to write task data to Redis for UUID {task_uuid}")

        index_value = redis_write_ - 1
        self.set_index_mail_id(mail_reader['id'], index_value)
        self.set_index_uuid(task_uuid, index_value)
        return True

    def store_task_with_uuid(self, task_uuid: str, service_name: str, task_data: dict) -> bool:
        """
        Update an existing task with UUID indexing.
        Only the service-specific data is replaced; other service data and the
        task metadata are preserved.

        Args:
            task_uuid: UUID for the task
            service_name: Service name whose data is replaced
            task_data: Dictionary containing service-specific data

        Returns:
            bool: True if successful

        Raises:
            FileNotFoundError: If the task does not exist
            ValueError: If the service name is invalid or storage fails
        """
        self._validate_service_name(service_name)

        # One lookup yields both the list position and the stored payload.
        try:
            index_value, stored_task = self._get_task_index_by_uuid(task_uuid)
        except FileNotFoundError as err:
            raise FileNotFoundError(
                f"Task with UUID {task_uuid} not found. Use create_task_with_uuid to create a new task."
            ) from err
        existing_task = RedisTaskObject(**stored_task)

        # Copy every service payload, then swap in the one being updated.
        data_dict = existing_task.data.model_dump()
        data_dict[service_name] = task_data

        write_object = RedisTaskObject(
            task=task_uuid,
            data=RedisData(**data_dict),
            completed=existing_task.completed,
            service=existing_task.service,
            status=existing_task.status,
            created_at=existing_task.created_at,
            is_completed=existing_task.is_completed,
        ).model_dump()

        self._write_task_at(
            index_value, write_object, f"Failed to update task data in Redis for UUID {task_uuid}"
        )
        return True

    def _get_task_index_by_uuid(self, task_uuid: str) -> tuple:
        """
        Helper method to get task index and raw task data by UUID

        Args:
            task_uuid: UUID of the task

        Returns:
            tuple: (index_value, task_data_dict) where index_value is the index
                in the Redis list and task_data_dict is the deserialized task data

        Raises:
            FileNotFoundError: If the UUID index or task is not found
        """
        uuid_index_data = self.redis_client.get(self.uuid_index_key)
        if not uuid_index_data:
            raise FileNotFoundError(f"UUID index not found for task: {task_uuid}")

        # str() mirrors the normalization applied when the entry is written.
        index_value = loads(uuid_index_data).get(str(task_uuid))
        if index_value is None:
            raise FileNotFoundError(f"Task UUID {task_uuid} not found in index")

        task_data = self.redis_client.lindex(self.redis_prefix, int(index_value))
        if not task_data:
            raise FileNotFoundError(f"No task data found at index {index_value}")
        return index_value, loads(task_data)

    def _validate_service_name(self, service_name: str) -> bool:
        """
        Validate that a service name is one of the ConfigServices service prefixes

        Args:
            service_name: Name of the service to validate

        Returns:
            bool: True if valid

        Raises:
            ValueError: If service name is invalid
        """
        if service_name in self._valid_service_names():
            return True
        raise ValueError(f"Invalid service name: {service_name}")

    # ------------------------------------------------------------------
    # In-place task updates
    # ------------------------------------------------------------------
    def update_task_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False):
        """
        Update the service, status and completed flag of a task by UUID

        The service name is stored as given; it is not validated here.

        Args:
            task_uuid: UUID of the task to update
            service_name: New value for the task's 'service' field
            status: New value for the task's 'status' field
            completed: New value for the task's 'completed' field

        Returns:
            bool: True if successful

        Raises:
            FileNotFoundError: If the task is not found
            ValueError: If the update fails
        """
        index_value, task_object_dict = self._get_task_index_by_uuid(task_uuid)
        task_object_dict.update(service=service_name, status=status, completed=completed)
        self._write_task_at(
            index_value, task_object_dict, f"Failed to write updated task data for UUID {task_uuid}"
        )
        return True

    def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
        """
        Update the status of a task by UUID

        Args:
            task_uuid: UUID of the task to update
            is_completed: Written to both 'is_completed' and 'completed'
            status: New status for the task

        Returns:
            bool: True if successful

        Raises:
            FileNotFoundError: If the task is not found
            ValueError: If the update fails
        """
        index_value, task_object_dict = self._get_task_index_by_uuid(task_uuid)
        task_object_dict.update(is_completed=is_completed, status=status, completed=is_completed)
        self._write_task_at(
            index_value, task_object_dict, f"Failed to write updated task data for UUID {task_uuid}"
        )
        return True

    def update_service_data(self, task_uuid: str, service_name: str, service_data: dict) -> bool:
        """
        Update service-specific data in a task by UUID

        Args:
            task_uuid: UUID of the task to update
            service_name: Name of the service data to update
            service_data: New service data

        Returns:
            bool: True if successful

        Raises:
            FileNotFoundError: If the task is not found
            ValueError: If the update fails or service name is invalid
        """
        self._validate_service_name(service_name)
        index_value, task_object_dict = self._get_task_index_by_uuid(task_uuid)
        # Create the 'data' container on demand for payloads stored without one.
        task_object_dict.setdefault('data', {})[service_name] = service_data
        self._write_task_at(
            index_value, task_object_dict, f"Failed to update service data for UUID {task_uuid}"
        )
        return True


class MailReaderService:
    """
    Entry point for mail-reader Redis operations.

    Implemented as a process-wide singleton that owns a RedisHandler and a
    ServiceTaskRetriever built on top of it; most methods simply delegate to
    the retriever.
    """

    # Singleton instance
    _instance = None
    REDIS_EXCEPTIONS = RedisHandler.REDIS_EXCEPTIONS

    @classmethod
    def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5):
        """
        Handle Redis reconnection by delegating to RedisHandler.handle_reconnection

        Args:
            consecutive_errors: Number of consecutive errors encountered
            max_consecutive_errors: Threshold forwarded to RedisHandler

        Returns:
            tuple: (MailReaderService instance or None, bool indicating if
                extended sleep is needed)
        """
        redis_handler, need_extended_sleep = RedisHandler.handle_reconnection(
            consecutive_errors=consecutive_errors,
            max_consecutive_errors=max_consecutive_errors,
        )
        if redis_handler:
            # Drop the cached singleton so cls() runs __init__ against the new connection.
            cls._instance = None
            return cls(), need_extended_sleep
        return None, need_extended_sleep

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every cls() call; initialize only once per instance.
        if getattr(self, '_initialized', False):
            return
        self.redis_handler = RedisHandler()
        self.service_retriever = ServiceTaskRetriever(self.redis_handler)
        self._initialized = True

    def ensure_connection(self):
        """
        Ensure Redis connection is established

        Returns:
            bool: True if connection is established, False otherwise
        """
        return self.redis_handler.ensure_connection()

    def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
        """
        Get a task object by its UUID

        Args:
            task_uuid: UUID of the task to retrieve

        Returns:
            RedisTaskObject: The task object if found

        Raises:
            FileNotFoundError: If the UUID index or task is not found
        """
        return self.service_retriever.get_task_by_uuid(task_uuid)

    def get_task_by_mail_id(self, mail_id: str) -> RedisTaskObject:
        """
        Get a task object by its mail ID

        Args:
            mail_id: Mail ID of the task to retrieve

        Returns:
            RedisTaskObject: The task object if found

        Raises:
            FileNotFoundError: If the mail ID index or task is not found
        """
        return self.service_retriever.get_task_by_mail_id(mail_id)

    def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
        """
        Get service-specific data from a task by UUID

        Args:
            task_uuid: UUID of the task
            service_name: Name of the service to extract data for

        Returns:
            Any: Service-specific data if found

        Raises:
            FileNotFoundError: If the task or service data is not found
        """
        return self.service_retriever.get_service_data_by_uuid(task_uuid, service_name)

    def get_service_data_by_mail_id(self, mail_id: str, service_name: str):
        """
        Get service-specific data from a task by mail ID

        Args:
            mail_id: Mail ID of the task
            service_name: Name of the service to extract data for

        Returns:
            Any: Service-specific data if found

        Raises:
            FileNotFoundError: If the task or service data is not found
        """
        return self.service_retriever.get_service_data_by_mail_id(mail_id, service_name)

    def store_task_with_uuid(self, task_uuid: str, service_name: str, task_data: dict) -> bool:
        """
        Replace one service's data on an existing task

        Args:
            task_uuid: UUID for the task
            service_name: Service name for the task
            task_data: Dictionary containing service-specific data

        Returns:
            bool: True if successful

        Raises:
            FileNotFoundError: If the task does not exist
            ValueError: If the service name is invalid or storage fails
        """
        return self.service_retriever.store_task_with_uuid(task_uuid, service_name, task_data)

    def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
        """
        Update the status of a task by UUID

        Args:
            task_uuid: UUID of the task to update
            is_completed: Whether the task is completed
            status: New status for the task

        Returns:
            bool: True if successful

        Raises:
            FileNotFoundError: If the task is not found
            ValueError: If the update fails
        """
        return self.service_retriever.update_task_status(task_uuid, is_completed, status)

    @staticmethod
    def _stored_mail_id(task):
        """Return the 'id' of the task's MailReader payload (dict or object), or None if absent."""
        reader = getattr(getattr(task, 'data', None), 'MailReader', None)
        if not reader:
            return None
        if isinstance(reader, dict):
            return reader.get('id')
        return getattr(reader, 'id', None)

    def process_mail(self, mail_id: str, mail_data: dict, service_prefix: str, counter: int) -> dict:
        """
        Process mail data and store it in Redis

        A mail ID that is newly added to the "seen" set gets a fresh task; an
        already-seen mail ID is looked up and its completion state reported.

        Args:
            mail_id: The ID of the mail
            mail_data: Dictionary containing mail data; its 'attachments'
                entry (if any) is stored separately as the parser payload
            service_prefix: Service name for the new task
            counter: Running count of stored mails; returned incremented when
                a new mail is stored

        Returns:
            dict: Result of the operation with 'status' and 'action' keys
        """
        try:
            if self.redis_handler.sadd(f'{ConfigServices.TASK_SEEN_PREFIX}', mail_id):
                counter += 1
                mail_without_attachments = mail_data.copy()
                attachments = mail_without_attachments.pop('attachments', [])
                self.service_retriever.create_task_with_uuid(
                    task_uuid=uuid4().hex,
                    service_name=service_prefix,
                    mail_reader=mail_without_attachments,
                    mail_parser=attachments,
                )
                return {'status': 'success', 'action': 'stored_new_mail', 'counter': counter}

            try:
                task = self.service_retriever.get_task_by_mail_id(mail_id)
            except FileNotFoundError:
                return {'status': 'error', 'action': 'not_found', 'error': f'Mail with ID {mail_id} not found in index'}

            # Sanity check: the task found through the index must belong to this mail.
            # Compared as strings because index keys are stored stringified.
            stored_id = self._stored_mail_id(task) if task is not None else None
            if stored_id is not None and str(stored_id) != str(mail_id):
                return {'status': 'error', 'action': 'id_mismatch', 'stored_id': stored_id}
            return {
                'status': 'success',
                'action': 'checked_existing_mail',
                'is_completed': task.is_completed if task else False,
            }
        except Exception as e:
            # Boundary handler: report any failure to the caller instead of raising.
            logger.error("Mail Reader Service Error processing mail %s: %s", mail_id, e)
            return {'status': 'error', 'action': 'exception', 'error': str(e)}


class MailParserService:
    """
    Mail Parser Service

    Singleton facade over ServiceTaskRetriever used by the mail parser.
    """

    # Singleton instance
    _instance = None
    REDIS_EXCEPTIONS = RedisHandler.REDIS_EXCEPTIONS

    @classmethod
    def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5):
        """
        Handle Redis reconnection by delegating to RedisHandler.handle_reconnection

        Args:
            consecutive_errors: Number of consecutive errors encountered
            max_consecutive_errors: Threshold forwarded to RedisHandler

        Returns:
            tuple: (MailParserService instance or None, bool indicating if
                extended sleep is needed)
        """
        redis_handler, need_extended_sleep = RedisHandler.handle_reconnection(
            consecutive_errors=consecutive_errors,
            max_consecutive_errors=max_consecutive_errors,
        )
        if redis_handler:
            # Drop the cached singleton so cls() runs __init__ against the new connection.
            cls._instance = None
            return cls(), need_extended_sleep
        return None, need_extended_sleep

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every cls() call; initialize only once per instance.
        if getattr(self, '_initialized', False):
            return
        # ensure_connection() and _check_redis_connection() read self.redis_handler,
        # so it must be set here and shared with the retriever.
        self.redis_handler = RedisHandler()
        self.service_retriever = ServiceTaskRetriever(self.redis_handler)
        self._initialized = True

    def fetch_all_tasks(self) -> list[RedisTaskObject]:
        """Return every task stored in Redis as RedisTaskObject instances."""
        return self.service_retriever.fetch_all_tasks()

    def ensure_connection(self):
        """
        Ensure Redis connection is established

        Returns:
            bool: True if connection is established, False otherwise
        """
        return self.redis_handler.ensure_connection()

    def _check_redis_connection(self) -> bool:
        """
        Check if Redis connection is alive using RedisHandler, logging the outcome

        Returns:
            True if connection is alive, False otherwise
        """
        try:
            connection_status = self.redis_handler.ensure_connection()
        except RedisHandler.REDIS_EXCEPTIONS as e:
            logger.error(f"Redis connection failed: {str(e)}")
            return False
        if connection_status:
            logger.info("Redis connection established via RedisHandler")
        else:
            logger.error("Redis connection check failed via RedisHandler")
        return connection_status

    def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
        """
        Get a task object by its UUID

        Args:
            task_uuid: UUID of the task to retrieve

        Returns:
            RedisTaskObject: The task object if found

        Raises:
            FileNotFoundError: If the UUID index or task is not found
        """
        return self.service_retriever.get_task_by_uuid(task_uuid)

    def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
        """
        Get service-specific data from a task by UUID

        Args:
            task_uuid: UUID of the task
            service_name: Name of the service to extract data for

        Returns:
            Any: Service-specific data if found

        Raises:
            FileNotFoundError: If the task or service data is not found
        """
        return self.service_retriever.get_service_data_by_uuid(task_uuid, service_name)

    def update_service_data(self, task_uuid: str, service_name: str, service_data: dict) -> bool:
        """
        Update service-specific data in a task by UUID

        Args:
            task_uuid: UUID of the task to update
            service_name: Name of the service data to update
            service_data: New service data

        Returns:
            bool: True if successful

        Raises:
            FileNotFoundError: If the task is not found
            ValueError: If the update fails or service name is invalid
        """
        return self.service_retriever.update_service_data(task_uuid, service_name, service_data)

    def change_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False) -> bool:
        """
        Update the service, status and completed flag of a task by UUID
        """
        return self.service_retriever.update_task_service(task_uuid, service_name, status, completed)

    def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
        """
        Update the status of a task by UUID
        """
        return self.service_retriever.update_task_status(task_uuid, is_completed, status)