updated prisma service async runner
This commit is contained in:
835
ServicesRunner/Depends/service_handler.py
Normal file
835
ServicesRunner/Depends/service_handler.py
Normal file
@@ -0,0 +1,835 @@
|
||||
import logging
|
||||
|
||||
from json import loads, dumps
|
||||
from uuid import uuid4
|
||||
from datetime import datetime
|
||||
from Depends.config import Status, ConfigServices, RedisTaskObject, RedisData
|
||||
from Depends.redis_handlers import RedisHandler
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger('Service Task Retriever')
|
||||
|
||||
|
||||
class ServiceTaskRetriever:
|
||||
"""
|
||||
Class for retrieving and updating Redis task objects by UUID or mail ID.
|
||||
Provides direct access to task objects and service-specific data without iteration.
|
||||
"""
|
||||
|
||||
def __init__(self, redis_handler=None):
|
||||
"""
|
||||
Initialize the ServiceTaskRetriever
|
||||
|
||||
Args:
|
||||
redis_handler: Optional RedisHandler instance. If not provided, a new one will be created.
|
||||
"""
|
||||
if redis_handler:
|
||||
self.redis_handler = redis_handler
|
||||
else:
|
||||
self.redis_handler = RedisHandler()
|
||||
|
||||
self.redis_client = self.redis_handler.redis_client
|
||||
self.redis_prefix = ConfigServices.MAIN_TASK_PREFIX
|
||||
self.mailid_index_key = ConfigServices.TASK_MAILID_INDEX_PREFIX
|
||||
self.uuid_index_key = ConfigServices.TASK_UUID_INDEX_PREFIX
|
||||
|
||||
def fetch_all_tasks(self):
|
||||
"""
|
||||
Get all tasks from Redis
|
||||
|
||||
Returns:
|
||||
list: List of task objects
|
||||
"""
|
||||
all_task = self.redis_handler.get_all_tasks()
|
||||
return [RedisTaskObject(**loads(task)) for task in all_task]
|
||||
|
||||
def get_index_by_uuid(self, task_uuid: str):
|
||||
"""
|
||||
Get the index of a task by its UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
|
||||
Returns:
|
||||
int: Index of the task if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the UUID index or task is not found
|
||||
"""
|
||||
uuid_index_data = self.redis_handler.get(self.uuid_index_key)
|
||||
if uuid_index_data:
|
||||
uuid_index_dict = loads(uuid_index_data)
|
||||
return uuid_index_dict.get(task_uuid, None)
|
||||
raise FileNotFoundError(f"UUID index not found for {task_uuid}")
|
||||
|
||||
def get_index_by_mail_id(self, mail_id: str):
|
||||
"""
|
||||
Get the index of a task by its mail ID
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task
|
||||
|
||||
Returns:
|
||||
int: Index of the task if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the mail ID index or task is not found
|
||||
"""
|
||||
mail_id_index_data = self.redis_handler.get(self.mailid_index_key)
|
||||
if mail_id_index_data:
|
||||
mail_id_index_dict = loads(mail_id_index_data)
|
||||
return mail_id_index_dict.get(str(mail_id), None)
|
||||
raise FileNotFoundError(f"Mail ID index not found for {mail_id}")
|
||||
|
||||
def set_index_uuid(self, task_uuid: str, index: int):
|
||||
"""
|
||||
Set the index of a task by its mail ID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
index: Index of the task
|
||||
"""
|
||||
already_dict = self.redis_handler.get(self.mailid_index_key)
|
||||
if already_dict:
|
||||
already_dict = loads(already_dict)
|
||||
already_dict[str(task_uuid)] = index
|
||||
self.redis_handler.set(self.mailid_index_key, dumps(already_dict))
|
||||
else:
|
||||
self.redis_handler.set(self.mailid_index_key, dumps({str(task_uuid): index}))
|
||||
|
||||
def set_index_mail_id(self, mail_id: str, index: int):
|
||||
"""
|
||||
Set the index of a task by its mail ID
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task
|
||||
index: Index of the task
|
||||
"""
|
||||
already_dict = self.redis_handler.get(self.mailid_index_key)
|
||||
if already_dict:
|
||||
already_dict = loads(already_dict)
|
||||
already_dict[str(mail_id)] = index
|
||||
self.redis_handler.set(self.mailid_index_key, dumps(already_dict))
|
||||
else:
|
||||
self.redis_handler.set(self.mailid_index_key, dumps({str(mail_id): index}))
|
||||
|
||||
def update_mail_id_index(self, task_uuid: str, index: int):
|
||||
"""
|
||||
Update the mail ID index with the same index as UUID index
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task
|
||||
task_uuid: UUID of the task
|
||||
"""
|
||||
if get_index_by_uuid := self.get_index_by_uuid(task_uuid):
|
||||
self.set_index_uuid(task_uuid, get_index_by_uuid)
|
||||
|
||||
def update_uuid_index(self, task_uuid: str, mail_id: str):
|
||||
"""
|
||||
Update the UUID index with the same index as mail ID index
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
mail_id: Mail ID of the task
|
||||
"""
|
||||
if get_index_by_mail_id := self.get_index_by_mail_id(mail_id):
|
||||
self.set_index_uuid(task_uuid, get_index_by_mail_id)
|
||||
|
||||
def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
|
||||
"""
|
||||
Get a task object directly by its UUID without iteration
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to retrieve
|
||||
|
||||
Returns:
|
||||
RedisTaskObject: The task object if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the UUID index or task is not found
|
||||
"""
|
||||
index_by_uuid = self.get_index_by_uuid(task_uuid)
|
||||
if not index_by_uuid:
|
||||
raise FileNotFoundError(f"UUID index not found for {task_uuid}")
|
||||
if task_data := self.redis_client.lindex(self.redis_prefix, int(index_by_uuid)):
|
||||
return RedisTaskObject(**loads(task_data))
|
||||
raise FileNotFoundError(f"Task not found for UUID: {task_uuid}")
|
||||
|
||||
def get_task_by_mail_id(self, mail_id: str) -> RedisTaskObject:
|
||||
"""
|
||||
Get a task object directly by its mail ID without iteration
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task to retrieve
|
||||
|
||||
Returns:
|
||||
RedisTaskObject: The task object if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the mail ID index or task is not found
|
||||
"""
|
||||
mail_id_index = self.get_index_by_mail_id(mail_id)
|
||||
if mail_id_index is not None:
|
||||
if task_data := self.redis_client.lindex(self.redis_prefix, int(mail_id_index)):
|
||||
return RedisTaskObject(**loads(task_data))
|
||||
raise FileNotFoundError(f"Task not found for mail ID: {mail_id}")
|
||||
|
||||
def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
|
||||
"""
|
||||
Get service-specific data from a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
service_name: Name of the service to extract data for
|
||||
|
||||
Returns:
|
||||
Any: Service-specific data if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task or service data is not found
|
||||
"""
|
||||
task_object = self.get_task_by_uuid(task_uuid)
|
||||
|
||||
# Extract service data
|
||||
for attr in ConfigServices.__dict__:
|
||||
if attr == service_name:
|
||||
service_data = getattr(task_object.data, attr, None)
|
||||
if service_data:
|
||||
return service_data
|
||||
raise FileNotFoundError(f"Service data '{service_name}' not found in task {task_uuid}")
|
||||
|
||||
def get_service_data_by_mail_id(self, mail_id: str, service_name: str):
|
||||
"""
|
||||
Get service-specific data from a task by mail ID
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task
|
||||
service_name: Name of the service to extract data for
|
||||
|
||||
Returns:
|
||||
Any: Service-specific data if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task or service data is not found
|
||||
"""
|
||||
task_object = self.get_task_by_mail_id(mail_id)
|
||||
for attr in ConfigServices.__dict__:
|
||||
if attr == service_name:
|
||||
service_data = getattr(task_object.data, attr, None)
|
||||
if service_data:
|
||||
return service_data
|
||||
raise FileNotFoundError(f"Service data '{service_name}' not found in task for mail ID {mail_id}")
|
||||
|
||||
def create_task_with_uuid(self, task_uuid: str, service_name: str, mail_reader: dict, mail_parser: dict) -> bool:
|
||||
"""
|
||||
Create a new task with UUID indexing. This method will fail if a task with the UUID already exists.
|
||||
|
||||
Args:
|
||||
task_uuid: UUID for the task
|
||||
service_name: Service name for the task
|
||||
task_data: Dictionary containing task data
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
ValueError: If the task data is invalid, storage fails, or task already exists
|
||||
"""
|
||||
# Check if task with this UUID already exists
|
||||
try:
|
||||
existing_task = self.get_task_by_uuid(task_uuid)
|
||||
# If we get here, task exists
|
||||
raise ValueError(f"Task with UUID {task_uuid} already exists. Use store_task_with_uuid to update.")
|
||||
except FileNotFoundError:
|
||||
# Task doesn't exist, proceed with creation
|
||||
pass
|
||||
|
||||
# Validate service name
|
||||
self._validate_service_name(service_name)
|
||||
|
||||
# Create new RedisData with proper defaults for all services
|
||||
data_dict = {'MailReader': None, 'MailParser': [], 'FinderIban': {}, 'FinderComment': {}}
|
||||
# Set the actual service data
|
||||
data_dict['MailReader'] = mail_reader
|
||||
data_dict['MailParser'] = mail_parser
|
||||
|
||||
# Create new RedisData object
|
||||
redis_data = RedisData(**data_dict)
|
||||
|
||||
# Create new task object
|
||||
write_object = RedisTaskObject(
|
||||
task=task_uuid,
|
||||
data=redis_data,
|
||||
completed=False,
|
||||
service=service_name,
|
||||
status=Status.COMPLETED,
|
||||
created_at=datetime.now().isoformat(),
|
||||
is_completed=False
|
||||
)
|
||||
|
||||
# Convert to dict for serialization
|
||||
write_object = write_object.model_dump()
|
||||
|
||||
# Push new task to Redis list
|
||||
redis_write_ = self.redis_client.rpush(self.redis_prefix, dumps(write_object))
|
||||
if not redis_write_:
|
||||
raise ValueError(f"Failed to write task data to Redis for UUID {task_uuid}")
|
||||
|
||||
index_value = redis_write_ - 1
|
||||
self.set_index_mail_id(mail_reader['id'], index_value)
|
||||
self.set_index_uuid(task_uuid, index_value)
|
||||
return True
|
||||
|
||||
def store_task_with_uuid(self, task_uuid: str, service_name: str, task_data: dict) -> bool:
|
||||
"""
|
||||
Update an existing task with UUID indexing. Only the service-specific data will be updated
|
||||
while preserving other service data and task metadata.
|
||||
|
||||
Args:
|
||||
task_uuid: UUID for the task
|
||||
service_name: Service name for the task
|
||||
task_data: Dictionary containing service-specific data
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task does not exist
|
||||
ValueError: If the task data is invalid or storage fails
|
||||
"""
|
||||
# Validate service name
|
||||
self._validate_service_name(service_name)
|
||||
|
||||
# Get existing task
|
||||
try:
|
||||
existing_task = self.get_task_by_uuid(task_uuid)
|
||||
except FileNotFoundError:
|
||||
raise FileNotFoundError(f"Task with UUID {task_uuid} not found. Use create_task_with_uuid to create a new task.")
|
||||
|
||||
# Prepare new service data
|
||||
new_service_data = task_data
|
||||
|
||||
# Get the existing data model
|
||||
existing_data = existing_task.data
|
||||
|
||||
# Create a new RedisData with all existing service data
|
||||
data_dict = existing_data.model_dump()
|
||||
|
||||
# Update only the specific service data
|
||||
data_dict[service_name] = new_service_data
|
||||
|
||||
# Create updated RedisData object
|
||||
redis_data = RedisData(**data_dict)
|
||||
|
||||
# Create task object with existing metadata but updated data
|
||||
write_object = RedisTaskObject(
|
||||
task=task_uuid,
|
||||
data=redis_data,
|
||||
completed=existing_task.completed,
|
||||
service=existing_task.service,
|
||||
status=existing_task.status,
|
||||
created_at=existing_task.created_at,
|
||||
is_completed=existing_task.is_completed
|
||||
)
|
||||
|
||||
# Convert to dict for serialization
|
||||
write_object = write_object.model_dump()
|
||||
|
||||
# Get task index
|
||||
index_value, _ = self._get_task_index_by_uuid(task_uuid)
|
||||
|
||||
# Update the task at the existing index
|
||||
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(write_object)):
|
||||
raise ValueError(f"Failed to update task data in Redis for UUID {task_uuid}")
|
||||
|
||||
return True
|
||||
|
||||
def _get_task_index_by_uuid(self, task_uuid: str) -> tuple:
|
||||
"""
|
||||
Helper method to get task index by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
|
||||
Returns:
|
||||
tuple: (index_value, task_data_dict) where index_value is the index in Redis list
|
||||
and task_data_dict is the deserialized task data
|
||||
Raises:
|
||||
FileNotFoundError: If the UUID index or task is not found
|
||||
"""
|
||||
# Get UUID index
|
||||
uuid_index_data = self.redis_client.get(self.uuid_index_key)
|
||||
if not uuid_index_data:
|
||||
raise FileNotFoundError(f"UUID index not found for task: {task_uuid}")
|
||||
|
||||
# Parse index and get task index
|
||||
uuid_index_dict = loads(uuid_index_data)
|
||||
index_value = uuid_index_dict.get(task_uuid)
|
||||
if index_value is None:
|
||||
raise FileNotFoundError(f"Task UUID {task_uuid} not found in index")
|
||||
|
||||
# Get task data
|
||||
task_data = self.redis_client.lindex(self.redis_prefix, int(index_value))
|
||||
if not task_data:
|
||||
raise FileNotFoundError(f"No task data found at index {index_value}")
|
||||
|
||||
return index_value, loads(task_data)
|
||||
|
||||
def _validate_service_name(self, service_name: str) -> bool:
|
||||
"""
|
||||
Validate that a service name exists in ConfigServices
|
||||
|
||||
Args:
|
||||
service_name: Name of the service to validate
|
||||
|
||||
Returns:
|
||||
bool: True if valid
|
||||
|
||||
Raises:
|
||||
ValueError: If service name is invalid
|
||||
"""
|
||||
# Check if service_name is one of the values in ConfigServices
|
||||
valid_service_names = [
|
||||
ConfigServices.SERVICE_PREFIX_MAIL_READER,
|
||||
ConfigServices.SERVICE_PREFIX_MAIL_PARSER,
|
||||
ConfigServices.SERVICE_PREFIX_FINDER_IBAN,
|
||||
ConfigServices.SERVICE_PREFIX_FINDER_COMMENT
|
||||
]
|
||||
|
||||
if service_name in valid_service_names:
|
||||
return True
|
||||
|
||||
raise ValueError(f"Invalid service name: {service_name}")
|
||||
|
||||
def update_task_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False):
|
||||
"""
|
||||
Update the service of a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to update
|
||||
service_name: Name of the service to update
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task is not found
|
||||
ValueError: If the update fails or service name is invalid
|
||||
"""
|
||||
# Get task index and data
|
||||
index_value, task_object_dict = self._get_task_index_by_uuid(task_uuid)
|
||||
|
||||
# Update task status
|
||||
task_object_dict['service'] = service_name
|
||||
task_object_dict['status'] = status
|
||||
task_object_dict['completed'] = completed
|
||||
|
||||
# Write updated task back to Redis
|
||||
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(task_object_dict)):
|
||||
raise ValueError(f"Failed to write updated task data for UUID {task_uuid}")
|
||||
return True
|
||||
|
||||
|
||||
def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
|
||||
"""
|
||||
Update the status of a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to update
|
||||
is_completed: Whether the task is completed
|
||||
status: New status for the task
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task is not found
|
||||
ValueError: If the update fails
|
||||
"""
|
||||
# Get task index and data
|
||||
index_value, task_object_dict = self._get_task_index_by_uuid(task_uuid)
|
||||
|
||||
# Update task status
|
||||
task_object_dict['is_completed'] = is_completed
|
||||
task_object_dict['status'] = status
|
||||
task_object_dict['completed'] = is_completed
|
||||
|
||||
# Write updated task back to Redis
|
||||
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(task_object_dict)):
|
||||
raise ValueError(f"Failed to write updated task data for UUID {task_uuid}")
|
||||
|
||||
return True
|
||||
|
||||
def update_service_data(self, task_uuid: str, service_name: str, service_data: dict) -> bool:
|
||||
"""
|
||||
Update service-specific data in a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to update
|
||||
service_name: Name of the service data to update
|
||||
service_data: New service data
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task is not found
|
||||
ValueError: If the update fails or service name is invalid
|
||||
"""
|
||||
# Validate service name
|
||||
self._validate_service_name(service_name)
|
||||
|
||||
# Get task index and data
|
||||
index_value, task_object_dict = self._get_task_index_by_uuid(task_uuid)
|
||||
|
||||
# Update the service data
|
||||
if 'data' not in task_object_dict:
|
||||
task_object_dict['data'] = {}
|
||||
|
||||
task_object_dict['data'][service_name] = service_data
|
||||
|
||||
# Write updated task back to Redis
|
||||
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(task_object_dict)):
|
||||
raise ValueError(f"Failed to update service data for UUID {task_uuid}")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class MailReaderService:
|
||||
"""
|
||||
Main handler class that uses ServiceTaskRetriever with RedisHandler for Redis operations.
|
||||
This class serves as the main entry point for Redis operations in the application.
|
||||
Uses the RedisHandler singleton for all Redis operations.
|
||||
"""
|
||||
|
||||
# Singleton instance
|
||||
_instance = None
|
||||
REDIS_EXCEPTIONS = RedisHandler.REDIS_EXCEPTIONS
|
||||
|
||||
@classmethod
|
||||
def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5):
|
||||
"""
|
||||
Handle Redis reconnection with exponential backoff based on consecutive errors
|
||||
|
||||
Args:
|
||||
consecutive_errors: Number of consecutive errors encountered
|
||||
max_consecutive_errors: Threshold for extended sleep time
|
||||
|
||||
Returns:
|
||||
tuple: (MainRedisHandler instance, bool indicating if extended sleep is needed)
|
||||
"""
|
||||
# Delegate to RedisHandler's reconnection logic
|
||||
redis_handler, need_extended_sleep = RedisHandler.handle_reconnection(
|
||||
consecutive_errors=consecutive_errors,
|
||||
max_consecutive_errors=max_consecutive_errors
|
||||
)
|
||||
|
||||
# If Redis handler was successfully reconnected, create a new MainRedisHandler instance
|
||||
if redis_handler:
|
||||
# Reset the singleton instance to force re-initialization
|
||||
cls._instance = None
|
||||
main_handler = cls()
|
||||
return main_handler, need_extended_sleep
|
||||
|
||||
return None, need_extended_sleep
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(MailReaderService, cls).__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
# Initialize only once
|
||||
if hasattr(self, '_initialized') and self._initialized:
|
||||
return
|
||||
|
||||
# Use RedisHandler singleton
|
||||
self.redis_handler = RedisHandler()
|
||||
self.service_retriever = ServiceTaskRetriever(self.redis_handler)
|
||||
self._initialized = True
|
||||
|
||||
def ensure_connection(self):
|
||||
"""
|
||||
Ensure Redis connection is established
|
||||
|
||||
Returns:
|
||||
bool: True if connection is established, False otherwise
|
||||
"""
|
||||
return self.redis_handler.ensure_connection()
|
||||
|
||||
def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
|
||||
"""
|
||||
Get a task object by its UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to retrieve
|
||||
|
||||
Returns:
|
||||
RedisTaskObject: The task object if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the UUID index or task is not found
|
||||
"""
|
||||
return self.service_retriever.get_task_by_uuid(task_uuid)
|
||||
|
||||
def get_task_by_mail_id(self, mail_id: str) -> RedisTaskObject:
|
||||
"""
|
||||
Get a task object by its mail ID
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task to retrieve
|
||||
|
||||
Returns:
|
||||
RedisTaskObject: The task object if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the mail ID index or task is not found
|
||||
"""
|
||||
return self.service_retriever.get_task_by_mail_id(mail_id)
|
||||
|
||||
def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
|
||||
"""
|
||||
Get service-specific data from a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
service_name: Name of the service to extract data for
|
||||
|
||||
Returns:
|
||||
Any: Service-specific data if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task or service data is not found
|
||||
"""
|
||||
return self.service_retriever.get_service_data_by_uuid(task_uuid, service_name)
|
||||
|
||||
def get_service_data_by_mail_id(self, mail_id: str, service_name: str):
|
||||
"""
|
||||
Get service-specific data from a task by mail ID
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task
|
||||
service_name: Name of the service to extract data for
|
||||
|
||||
Returns:
|
||||
Any: Service-specific data if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task or service data is not found
|
||||
"""
|
||||
return self.service_retriever.get_service_data_by_mail_id(mail_id, service_name)
|
||||
|
||||
def store_task_with_uuid(self, task_uuid: str, service_name: str, task_data: dict) -> bool:
|
||||
"""
|
||||
Store a task with UUID indexing
|
||||
|
||||
Args:
|
||||
task_uuid: UUID for the task
|
||||
service_name: Service name for the task
|
||||
task_data: Dictionary containing task data
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
ValueError: If the task data is invalid or storage fails
|
||||
"""
|
||||
return self.service_retriever.store_task_with_uuid(task_uuid, service_name, task_data)
|
||||
|
||||
def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
|
||||
"""
|
||||
Update the status of a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to update
|
||||
is_completed: Whether the task is completed
|
||||
status: New status for the task
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task is not found
|
||||
ValueError: If the update fails
|
||||
"""
|
||||
return self.service_retriever.update_task_status(task_uuid, is_completed, status)
|
||||
|
||||
def process_mail(self, mail_id: str, mail_data: dict, service_prefix: str, counter: int) -> dict:
|
||||
"""
|
||||
Process mail data and store it in Redis
|
||||
|
||||
Args:
|
||||
mail_id: The ID of the mail
|
||||
mail_data: Dictionary containing mail data
|
||||
service_prefix: Service prefix for the mail
|
||||
|
||||
Returns:
|
||||
dict: Result of the operation with status and action
|
||||
"""
|
||||
try:
|
||||
if self.redis_handler.sadd(f'{ConfigServices.TASK_SEEN_PREFIX}', mail_id):
|
||||
counter += 1
|
||||
task_uuid = uuid4().hex
|
||||
mail_without_attachments = mail_data.copy()
|
||||
attachments = mail_without_attachments.pop('attachments', [])
|
||||
create_task = dict(task_uuid=task_uuid, service_name=service_prefix, mail_reader=mail_without_attachments, mail_parser=attachments)
|
||||
self.service_retriever.create_task_with_uuid(**create_task)
|
||||
return {'status': 'success', 'action': 'stored_new_mail', 'counter': counter}
|
||||
else:
|
||||
try:
|
||||
task = self.service_retriever.get_task_by_mail_id(mail_id)
|
||||
if task is None and task.data and task.data.MailReader:
|
||||
stored_id = task.data.MailReader.id
|
||||
if stored_id != mail_id:
|
||||
return {'status': 'error', 'action': 'id_mismatch', 'stored_id': stored_id}
|
||||
return {'status': 'success', 'action': 'checked_existing_mail', 'is_completed': task.is_completed if task else False}
|
||||
except FileNotFoundError:
|
||||
return {'status': 'error', 'action': 'not_found', 'error': f'Mail with ID {mail_id} not found in index'}
|
||||
except Exception as e:
|
||||
logger.error(f"Mail Reader Service Error processing mail {mail_id}: {str(e)}")
|
||||
return {'status': 'error', 'action': 'exception', 'error': str(e)}
|
||||
|
||||
|
||||
class MailParserService:
|
||||
"""
|
||||
Mail Parser Service
|
||||
"""
|
||||
|
||||
# Singleton instance
|
||||
_instance = None
|
||||
REDIS_EXCEPTIONS = RedisHandler.REDIS_EXCEPTIONS
|
||||
|
||||
@classmethod
|
||||
def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5):
|
||||
"""
|
||||
Handle Redis reconnection with exponential backoff based on consecutive errors
|
||||
|
||||
Args:
|
||||
consecutive_errors: Number of consecutive errors encountered
|
||||
max_consecutive_errors: Threshold for extended sleep time
|
||||
|
||||
Returns:
|
||||
tuple: (MainRedisHandler instance, bool indicating if extended sleep is needed)
|
||||
"""
|
||||
# Delegate to RedisHandler's reconnection logic
|
||||
redis_handler, need_extended_sleep = RedisHandler.handle_reconnection(
|
||||
consecutive_errors=consecutive_errors,
|
||||
max_consecutive_errors=max_consecutive_errors
|
||||
)
|
||||
|
||||
# If Redis handler was successfully reconnected, create a new MainRedisHandler instance
|
||||
if redis_handler:
|
||||
# Reset the singleton instance to force re-initialization
|
||||
cls._instance = None
|
||||
main_handler = cls()
|
||||
return main_handler, need_extended_sleep
|
||||
|
||||
return None, need_extended_sleep
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(MailParserService, cls).__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
# Initialize only once
|
||||
if hasattr(self, '_initialized') and self._initialized:
|
||||
return
|
||||
|
||||
# Use RedisHandler singleton
|
||||
self.service_retriever = ServiceTaskRetriever()
|
||||
self._initialized = True
|
||||
|
||||
def fetch_all_tasks(self) -> list[RedisTaskObject]:
|
||||
return self.service_retriever.fetch_all_tasks()
|
||||
|
||||
def ensure_connection(self):
|
||||
"""
|
||||
Ensure Redis connection is established
|
||||
|
||||
Returns:
|
||||
bool: True if connection is established, False otherwise
|
||||
"""
|
||||
return self.redis_handler.ensure_connection()
|
||||
|
||||
def _check_redis_connection(self) -> bool:
|
||||
"""
|
||||
Check if Redis connection is alive using RedisHandler
|
||||
|
||||
Returns:
|
||||
True if connection is alive, False otherwise
|
||||
"""
|
||||
try:
|
||||
# Use RedisHandler to check connection
|
||||
connection_status = self.redis_handler.ensure_connection()
|
||||
if connection_status:
|
||||
logger.info("Redis connection established via RedisHandler")
|
||||
else:
|
||||
logger.error("Redis connection check failed via RedisHandler")
|
||||
return connection_status
|
||||
except RedisHandler.REDIS_EXCEPTIONS as e:
|
||||
logger.error(f"Redis connection failed: {str(e)}")
|
||||
return False
|
||||
|
||||
def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
|
||||
"""
|
||||
Get a task object by its UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to retrieve
|
||||
|
||||
Returns:
|
||||
RedisTaskObject: The task object if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the UUID index or task is not found
|
||||
"""
|
||||
return self.service_retriever.get_task_by_uuid(task_uuid)
|
||||
|
||||
def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
|
||||
"""
|
||||
Get service-specific data from a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
service_name: Name of the service to extract data for
|
||||
|
||||
Returns:
|
||||
Any: Service-specific data if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task or service data is not found
|
||||
"""
|
||||
return self.service_retriever.get_service_data_by_uuid(task_uuid, service_name)
|
||||
|
||||
def update_service_data(self, task_uuid: str, service_name: str, service_data: dict) -> bool:
|
||||
"""
|
||||
Update service-specific data in a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to update
|
||||
service_name: Name of the service data to update
|
||||
service_data: New service data
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task is not found
|
||||
ValueError: If the update fails or service name is invalid
|
||||
"""
|
||||
return self.service_retriever.update_service_data(task_uuid, service_name, service_data)
|
||||
|
||||
def change_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False) -> bool:
|
||||
"""
|
||||
Update the service of a task by UUID
|
||||
"""
|
||||
return self.service_retriever.update_task_service(task_uuid, service_name, status, completed)
|
||||
|
||||
def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
|
||||
"""
|
||||
Update the status of a task by UUID
|
||||
"""
|
||||
return self.service_retriever.update_task_status(task_uuid, is_completed, status)
|
||||
Reference in New Issue
Block a user