Compare commits
No commits in common. "7452e05a92fa17a9fde0939c8852a496c620ed70" and "81184a8acc32f0fc3f686bf6638c0fb6c99ec973" have entirely different histories.
7452e05a92
...
81184a8acc
|
|
@ -54,10 +54,3 @@ pids
|
|||
|
||||
# Diagnostic reports (https://nodejs.org/api/report.html)
|
||||
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
|
||||
env
|
||||
.env
|
||||
ServicesRunner/AccountRecordServices/Finder/Iban/.prisma-cache
|
||||
venv/
|
||||
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
|
|
@ -1,3 +0,0 @@
|
|||
{
|
||||
"git.ignoreLimitWarning": true
|
||||
}
|
||||
|
|
@ -1,14 +0,0 @@
|
|||
__pycache__/
|
||||
*.pyc
|
||||
*.pyo
|
||||
*.pyd
|
||||
*.db
|
||||
*.sqlite3
|
||||
*.log
|
||||
*.env
|
||||
venv/
|
||||
.env.*
|
||||
node_modules/
|
||||
.prisma/
|
||||
.prisma-cache/
|
||||
ServicesRunnner/AccountRecordServices/Test/venv/
|
||||
|
|
@ -1,23 +0,0 @@
|
|||
FROM python:3.12-slim
|
||||
|
||||
ENV PYTHONUNBUFFERED=1
|
||||
ENV PYTHONDONTWRITEBYTECODE=1
|
||||
ENV VIRTUAL_ENV=/opt/venv
|
||||
ENV PRISMA_SCHEMA_PATH=/app/Depends/schema.prisma
|
||||
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
|
||||
ENV PYTHONPATH=/app
|
||||
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends gcc curl && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY ServicesRunner/Depends/ /app/Depends
|
||||
|
||||
COPY ServicesRunner/requirements.txt /app/requirements.txt
|
||||
|
||||
COPY ServicesRunner/AccountRecordServices/Finder/Iban/entrypoint.sh /entrypoint.sh
|
||||
COPY ServicesRunner/AccountRecordServices/Finder/Iban /app/
|
||||
|
||||
RUN chmod +x /entrypoint.sh
|
||||
|
||||
CMD ["/entrypoint.sh"]
|
||||
|
|
@ -1,74 +0,0 @@
|
|||
import uvloop
|
||||
import asyncio
|
||||
import sys
|
||||
import signal
|
||||
import time
|
||||
from datetime import datetime
|
||||
from Depends.prisma_client import prisma_client, disconnect_prisma
|
||||
|
||||
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
||||
|
||||
|
||||
# sys.stdout.reconfigure(line_buffering=True) # alternatif: python -u veya PYTHONUNBUFFERED=1
|
||||
|
||||
|
||||
async def tick():
|
||||
start = time.time()
|
||||
print(f"[{datetime.now()}] Attempting database query...")
|
||||
async with prisma_client() as db:
|
||||
rows = await db.account_records.find_many(
|
||||
take=5, skip=0, order=[{"bank_date": "desc"}]
|
||||
)
|
||||
print(f"[{datetime.now()}] Query completed in {time.time()-start:.2f}s")
|
||||
|
||||
for i, r in enumerate(rows):
|
||||
# Dilersen burada formatı değiştir
|
||||
print(f" Row: {i} | id={r.id} bank_date={r.bank_date} currency_value={r.currency_value}")
|
||||
print("-" * 80)
|
||||
|
||||
|
||||
async def service():
|
||||
print(f"[{datetime.now()}] IBAN Finder service starting")
|
||||
try:
|
||||
iteration = 0
|
||||
while True:
|
||||
iteration += 1
|
||||
print(f"\n[{datetime.now()}] Loop iteration {iteration}")
|
||||
try:
|
||||
await tick()
|
||||
except Exception as e:
|
||||
print(f"[{datetime.now()}] Error in service tick: {e}")
|
||||
await asyncio.sleep(1) # bloklamayan bekleme
|
||||
finally:
|
||||
# Her durumda DB'yi temiz kapat
|
||||
await disconnect_prisma()
|
||||
print(f"[{datetime.now()}] Cleaning up database connection...")
|
||||
|
||||
|
||||
async def _graceful_shutdown(sig: signal.Signals):
|
||||
print(f"\n[{datetime.now()}] Shutting down due to signal: {sig.name}")
|
||||
# Burada istersen tüm pending task'leri iptal edebilirsin:
|
||||
# for t in asyncio.all_tasks():
|
||||
# if t is not asyncio.current_task():
|
||||
# t.cancel()
|
||||
await disconnect_prisma()
|
||||
|
||||
|
||||
def _install_signal_handlers(loop: asyncio.AbstractEventLoop):
|
||||
# Linux/Unix: SIGINT (Ctrl+C) ve SIGTERM (docker stop) için kibar kapanış
|
||||
for s in (signal.SIGINT, signal.SIGTERM):
|
||||
loop.add_signal_handler(s, lambda s=s: asyncio.create_task(_graceful_shutdown(s)))
|
||||
|
||||
|
||||
async def main():
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
_install_signal_handlers(loop)
|
||||
except NotImplementedError:
|
||||
# (Gerekirse Windows vs., ama sen Linux/Docker kullanıyorsun)
|
||||
pass
|
||||
await service()
|
||||
|
||||
if __name__ == "__main__":
|
||||
# uvloop policy zaten yukarıda set edildi; burada normal asyncio.run kullanıyoruz
|
||||
asyncio.run(main())
|
||||
|
|
@ -1,18 +0,0 @@
|
|||
#!/bin/sh
|
||||
|
||||
VENV_PATH="/opt/venv"
|
||||
REQUIREMENTS_PATH="/app/requirements.txt"
|
||||
SCHEMA_PATH="/app/Depends/schema.prisma"
|
||||
PRISMA_BINARY_PATH="/root/.cache/prisma-python/binaries"
|
||||
|
||||
if [ ! -x "$VENV_PATH/bin/python" ]; then
|
||||
python -m venv "$VENV_PATH"
|
||||
"$VENV_PATH/bin/pip" install --no-cache-dir -r "$REQUIREMENTS_PATH"
|
||||
"$VENV_PATH/bin/prisma" generate --schema "$SCHEMA_PATH"
|
||||
fi
|
||||
|
||||
if ! find "$PRISMA_BINARY_PATH" -type f -name "prisma-query-engine-debian-openssl-3.0.x" | grep -q .; then
|
||||
"$VENV_PATH/bin/prisma" py fetch
|
||||
fi
|
||||
|
||||
exec "$VENV_PATH/bin/python" -u app.py
|
||||
|
|
@ -1 +0,0 @@
|
|||
3.12
|
||||
|
|
@ -1,22 +0,0 @@
|
|||
FROM python:3.12-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy only the dependency files first to leverage Docker cache
|
||||
COPY ServicesRunner/AccountRecordServices/Finder/Parser/pyproject.toml .
|
||||
|
||||
# Install dependencies
|
||||
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir .
|
||||
|
||||
# Copy only the necessary directories
|
||||
COPY ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank /app/ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank
|
||||
COPY ServicesRunner/Depends /app/ServicesRunner/Depends
|
||||
|
||||
# Set the Python path to include the root directory and ServicesRunner
|
||||
ENV PYTHONPATH=/app:/app/ServicesRunner
|
||||
|
||||
# Set working directory to the IsBank service directory
|
||||
WORKDIR /app/ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank
|
||||
|
||||
# Run the application
|
||||
CMD ["python", "app.py"]
|
||||
|
|
@ -1,211 +0,0 @@
|
|||
import sys
|
||||
import logging
|
||||
import pandas as pd
|
||||
|
||||
from time import sleep
|
||||
from datetime import datetime
|
||||
from io import BytesIO
|
||||
from ServicesRunner.Depends.config import ConfigServices, MailParser, RedisTaskObject, Status
|
||||
from base64 import b64decode
|
||||
from unidecode import unidecode
|
||||
|
||||
from Depends.service_handler import MailParserService
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[logging.StreamHandler(sys.stdout), logging.FileHandler('isbank_parser_service.log')]
|
||||
)
|
||||
logger = logging.getLogger('IsBank_Parser_Service')
|
||||
|
||||
|
||||
class IsbankMailParserService:
|
||||
|
||||
def __init__(self):
|
||||
self.mail_parser_service = MailParserService()
|
||||
|
||||
def try_dataframe_extract_with_xlsx(self, binary_data: BytesIO):
|
||||
try:
|
||||
df = pd.read_excel(binary_data, engine='openpyxl')
|
||||
return df
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
def try_dataframe_extract_with_xls(self, binary_data: BytesIO):
|
||||
try:
|
||||
df = pd.read_excel(binary_data, engine='xlrd')
|
||||
return df
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
def try_dataframe_extract_else(self, binary_data: BytesIO):
|
||||
try:
|
||||
df = pd.read_excel(binary_data, engine='openpyxl')
|
||||
except Exception as e1:
|
||||
try:
|
||||
binary_data.seek(0)
|
||||
df = pd.read_excel(binary_data, engine='xlrd')
|
||||
except Exception as e2:
|
||||
return None
|
||||
return df
|
||||
|
||||
def find_ibans(self, excel_frame: pd.DataFrame, file_name: str) -> list[dict]:
|
||||
"""Parse Excel file data.
|
||||
|
||||
Args:
|
||||
excel_frame: DataFrame containing Excel data
|
||||
|
||||
Returns:
|
||||
list[dict]: List of parsed data dictionaries
|
||||
"""
|
||||
iban, data_list = "", []
|
||||
try:
|
||||
for row in excel_frame.itertuples():
|
||||
if "IBAN" in str(row[3]).upper():
|
||||
iban = str(row[5]).replace(" ", "")
|
||||
if not str(row[1]) == "nan" and not str(row[2]) == "nan":
|
||||
if len(str(row[1]).split("/")) > 2:
|
||||
data_list.append(
|
||||
dict(
|
||||
filename=file_name,
|
||||
iban=str(iban),
|
||||
bank_date=datetime.strptime(str(row[1]), "%d/%m/%Y-%H:%M:%S").strftime("%Y-%m-%d %H:%M:%S"),
|
||||
channel_branch=unidecode(str(row[3])),
|
||||
currency_value=(float(str(row[4]).replace(",", "")) if row[4] else 0),
|
||||
balance=(float(str(row[5]).replace(",", "")) if row[5] else 0),
|
||||
additional_balance=(float(str(row[6]).replace(",", "")) if row[6] else 0),
|
||||
process_name=str(row[7]),
|
||||
process_type=unidecode(str(row[8])),
|
||||
process_comment=unidecode(str(row[9])),
|
||||
bank_reference_code=str(row[15]),
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"[PARSER_SERVICE] Error parsing Excel file: {str(e)}")
|
||||
return data_list
|
||||
|
||||
def parse_dataframes(self, dataframe: pd.DataFrame, task: RedisTaskObject, attachment_data: MailParser):
|
||||
file_name = attachment_data.filename
|
||||
data_list = self.find_ibans(dataframe, file_name)
|
||||
print(f"[PARSER_SERVICE] Successfully parsed {len(data_list)} records from Excel file")
|
||||
if data_list:
|
||||
self.mail_parser_service.update_service_data(task.task, ConfigServices.SERVICE_PREFIX_FINDER_IBAN, data_list)
|
||||
print(f"Updated service data for task {task.task} with {len(data_list)} records")
|
||||
self.mail_parser_service.change_service(task.task, ConfigServices.SERVICE_PREFIX_MAIL_PARSER, Status.COMPLETED, True)
|
||||
return True
|
||||
self.mail_parser_service.change_service(task.task, ConfigServices.SERVICE_PREFIX_MAIL_PARSER, Status.FAILED, True)
|
||||
return False
|
||||
|
||||
def process_task(self, active_task: RedisTaskObject):
|
||||
"""Process a task object using the MailParserService
|
||||
|
||||
Args:
|
||||
task: RedisTaskObject or task dictionary to process
|
||||
"""
|
||||
|
||||
try:
|
||||
for data in active_task.data.MailParser:
|
||||
attachment_data: MailParser = data
|
||||
if not attachment_data or not attachment_data.data:
|
||||
print(f"[PARSER_SERVICE] No data found for task {active_task.task}")
|
||||
continue
|
||||
binary_data: bytes = b64decode(attachment_data.data)
|
||||
excel_data = BytesIO(binary_data)
|
||||
df = self.try_dataframe_extract_with_xlsx(excel_data)
|
||||
if df is None:
|
||||
excel_data.seek(0)
|
||||
df = self.try_dataframe_extract_with_xls(excel_data)
|
||||
if df is None:
|
||||
excel_data.seek(0)
|
||||
df = self.try_dataframe_extract_else(excel_data)
|
||||
|
||||
if df is not None:
|
||||
self.parse_dataframes(df, active_task, attachment_data)
|
||||
print(f"[PARSER_SERVICE] Completed processing task {active_task.task}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[PARSER_SERVICE] Error processing task: {str(e)}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logger.info("Starting IsBank Parser Service")
|
||||
print(f"Starting IsBank Parser Service.")
|
||||
|
||||
# Initialize service
|
||||
parser = IsbankMailParserService()
|
||||
|
||||
# Configurable parameters
|
||||
normal_sleep_time = 10 # seconds between normal operations
|
||||
error_sleep_time = 30 # seconds to wait after an error before retrying
|
||||
max_consecutive_errors = 5 # maximum number of consecutive errors before longer pause
|
||||
extended_error_sleep = 120 # seconds to wait after hitting max consecutive errors
|
||||
consecutive_errors = 0
|
||||
|
||||
# Main service loop
|
||||
while True:
|
||||
try:
|
||||
# Fetch all tasks
|
||||
all_tasks = parser.mail_parser_service.fetch_all_tasks()
|
||||
|
||||
if all_tasks and len(all_tasks) > 0:
|
||||
logger.info(f"Found {len(all_tasks)} tasks to process")
|
||||
|
||||
# Process each task
|
||||
for active_task in all_tasks:
|
||||
if active_task.service == ConfigServices.SERVICE_PREFIX_MAIL_PARSER and active_task.completed:
|
||||
# logger.info(f"Task {active_task.task} is already processed.")
|
||||
continue
|
||||
|
||||
logger.info(f"Processing task {active_task.task}")
|
||||
parser.process_task(active_task)
|
||||
else:
|
||||
logger.info("No tasks found to process")
|
||||
|
||||
# Reset error counter on success
|
||||
if consecutive_errors > 0:
|
||||
logger.info(f"Service recovered after {consecutive_errors} consecutive errors")
|
||||
consecutive_errors = 0
|
||||
|
||||
# Normal operation sleep
|
||||
sleep(normal_sleep_time)
|
||||
|
||||
except MailParserService.REDIS_EXCEPTIONS as e:
|
||||
# Redis-specific errors
|
||||
consecutive_errors += 1
|
||||
logger.error(f"Redis error (attempt {consecutive_errors}): {str(e)}")
|
||||
|
||||
# Use centralized reconnection handler from RedisHandler
|
||||
mail_parser_service, need_extended_sleep = MailParserService.handle_reconnection(
|
||||
consecutive_errors=consecutive_errors, max_consecutive_errors=max_consecutive_errors
|
||||
)
|
||||
|
||||
if mail_parser_service:
|
||||
# Update parser's mail parser service with the new instance
|
||||
parser.mail_parser_service = mail_parser_service
|
||||
|
||||
# Sleep based on error count
|
||||
if need_extended_sleep:
|
||||
sleep(extended_error_sleep)
|
||||
else:
|
||||
sleep(error_sleep_time)
|
||||
|
||||
except Exception as e:
|
||||
# Any other unexpected errors
|
||||
consecutive_errors += 1
|
||||
logger.error(f"Unexpected error (attempt {consecutive_errors}): {str(e)}")
|
||||
|
||||
# For any other error, try to reinitialize everything after some delay
|
||||
if consecutive_errors >= max_consecutive_errors:
|
||||
logger.warning(f"Hit {max_consecutive_errors} consecutive errors, reinitializing service")
|
||||
try:
|
||||
# Reinitialize the service directly
|
||||
parser = IsbankMailParserService()
|
||||
logger.info("Successfully reinitialized parser service")
|
||||
consecutive_errors = 0 # Reset counter after reinitialization
|
||||
except Exception as reinit_error:
|
||||
logger.error(f"Service reinitialization failed: {str(reinit_error)}")
|
||||
|
||||
sleep(extended_error_sleep)
|
||||
else:
|
||||
# For fewer consecutive errors, just retry
|
||||
sleep(error_sleep_time)
|
||||
|
|
@ -1,6 +0,0 @@
|
|||
def main():
|
||||
print("Hello from parser!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,15 +0,0 @@
|
|||
[project]
|
||||
name = "parser"
|
||||
version = "0.1.0"
|
||||
description = "Add your description here"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
"arrow>=1.3.0",
|
||||
"pandas>=2.2.3",
|
||||
"redis>=6.4.0",
|
||||
"unidecode>=1.3.8",
|
||||
"xlrd>=2.0.1",
|
||||
"openpyxl>=3.1.2",
|
||||
"pydantic-settings>=2.8.1",
|
||||
]
|
||||
|
|
@ -1 +0,0 @@
|
|||
3.12
|
||||
|
|
@ -1,22 +0,0 @@
|
|||
FROM python:3.12-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy only the dependency files first to leverage Docker cache
|
||||
COPY ServicesRunner/AccountRecordServices/Reader/Banks/IsBank/pyproject.toml .
|
||||
|
||||
# Install dependencies
|
||||
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir .
|
||||
|
||||
# Copy only the necessary directories
|
||||
COPY ServicesRunner/AccountRecordServices/Reader/Banks /app/ServicesRunner/AccountRecordServices/Reader/Banks
|
||||
COPY ServicesRunner/Depends /app/ServicesRunner/Depends
|
||||
|
||||
# Set the Python path to include the root directory and ServicesRunner
|
||||
ENV PYTHONPATH=/app:/app/ServicesRunner
|
||||
|
||||
# Set working directory to the IsBank service directory
|
||||
WORKDIR /app/ServicesRunner/AccountRecordServices/Reader/Banks/IsBank
|
||||
|
||||
# Run the application
|
||||
CMD ["python", "app.py"]
|
||||
|
|
@ -1,176 +0,0 @@
|
|||
import sys
|
||||
import socket
|
||||
import logging
|
||||
from time import sleep
|
||||
from config import IsBankConfig
|
||||
from Depends.mail_handler import EmailReaderService, EmailServiceRunner
|
||||
from Depends.service_handler import MailReaderService
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[logging.StreamHandler(sys.stdout), logging.FileHandler('isbank_email_service.log')]
|
||||
)
|
||||
logger = logging.getLogger('IsBank_Email_Service')
|
||||
|
||||
|
||||
# Main application code
|
||||
def main():
|
||||
"""Main entry point for the IsBank email service"""
|
||||
try:
|
||||
redis_handler = MailReaderService()
|
||||
email_service = EmailReaderService(IsBankConfig())
|
||||
email_service.login_and_connect()
|
||||
runner = EmailServiceRunner(redis_handler=redis_handler, email_service=email_service)
|
||||
runner.fetch_and_set_mails()
|
||||
runner.drop()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error in main function: {str(e)}")
|
||||
return False
|
||||
|
||||
|
||||
def initialize_service():
|
||||
"""Initialize the service with proper error handling"""
|
||||
try:
|
||||
# Create singleton instances directly
|
||||
logger.info("Creating Redis handler singleton")
|
||||
redis_handler = MailReaderService()
|
||||
|
||||
logger.info("Creating EmailReaderService")
|
||||
email_service = EmailReaderService(IsBankConfig())
|
||||
|
||||
# Initialize email service and connect
|
||||
logger.info("Connecting to email service")
|
||||
email_service.login_and_connect()
|
||||
|
||||
# Create email service runner with the singletons
|
||||
logger.info("Creating EmailServiceRunner")
|
||||
runner = EmailServiceRunner(redis_handler=redis_handler, email_service=email_service)
|
||||
|
||||
if runner:
|
||||
logger.info("Email service runner initialized successfully")
|
||||
return runner
|
||||
else:
|
||||
logger.error("Failed to initialize email service runner")
|
||||
# Sleep before retry to avoid rapid failure loops
|
||||
sleep(5)
|
||||
return initialize_service()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Service initialization failed: {str(e)}")
|
||||
# Sleep before retry to avoid rapid failure loops
|
||||
sleep(5)
|
||||
return initialize_service()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logger.info("Starting IsBank Email Service")
|
||||
print(f"Starting Service Mail Reader.")
|
||||
|
||||
# Initialize service
|
||||
runner = initialize_service()
|
||||
|
||||
# Configurable parameters
|
||||
normal_sleep_time = 10 # seconds between normal operations
|
||||
error_sleep_time = 30 # seconds to wait after an error before retrying
|
||||
max_consecutive_errors = 5 # maximum number of consecutive errors before longer pause
|
||||
extended_error_sleep = 120 # seconds to wait after hitting max consecutive errors
|
||||
consecutive_errors = 0
|
||||
|
||||
# Main service loop
|
||||
while True:
|
||||
try:
|
||||
# Main processing
|
||||
runner.fetch_and_set_mails()
|
||||
|
||||
# Reset error counter on success
|
||||
if consecutive_errors > 0:
|
||||
logger.info(f"Service recovered after {consecutive_errors} consecutive errors")
|
||||
consecutive_errors = 0
|
||||
|
||||
# Normal operation sleep
|
||||
sleep(normal_sleep_time)
|
||||
|
||||
except MailReaderService.REDIS_EXCEPTIONS as e:
|
||||
# Redis-specific errors
|
||||
consecutive_errors += 1
|
||||
logger.error(f"Redis error (attempt {consecutive_errors}): {str(e)}")
|
||||
|
||||
# Use centralized reconnection handler from RedisHandler
|
||||
redis_handler, need_extended_sleep = MailReaderService.handle_reconnection(
|
||||
consecutive_errors=consecutive_errors, max_consecutive_errors=max_consecutive_errors
|
||||
)
|
||||
|
||||
if redis_handler:
|
||||
# Update runner's redis handler with the new instance
|
||||
runner.redis_handler = redis_handler
|
||||
runner.redis_connected = False # Will trigger reconnection on next cycle
|
||||
|
||||
# Sleep based on error count
|
||||
if need_extended_sleep:
|
||||
sleep(extended_error_sleep)
|
||||
else:
|
||||
sleep(error_sleep_time)
|
||||
|
||||
except socket.error as e:
|
||||
# Email connection errors
|
||||
consecutive_errors += 1
|
||||
logger.error(f"Email connection error (attempt {consecutive_errors}): {str(e)}")
|
||||
|
||||
# Try to re-establish email connection
|
||||
try:
|
||||
logger.info("Attempting to re-establish email connection...")
|
||||
# Create new email service directly
|
||||
email_service = EmailReaderService(IsBankConfig())
|
||||
email_service.login_and_connect()
|
||||
|
||||
# Create new runner with existing Redis handler and new email service
|
||||
redis_handler = runner.redis_handler # Preserve existing Redis handler
|
||||
runner = EmailServiceRunner(redis_handler=redis_handler, email_service=email_service)
|
||||
logger.info("Successfully re-established email connection")
|
||||
except Exception as email_retry_error:
|
||||
logger.error(f"Failed to re-establish email connection: {str(email_retry_error)}")
|
||||
|
||||
# Determine sleep time based on consecutive errors
|
||||
if consecutive_errors >= max_consecutive_errors:
|
||||
logger.warning(f"Hit {max_consecutive_errors} consecutive email errors, taking longer pause")
|
||||
sleep(extended_error_sleep)
|
||||
else:
|
||||
sleep(error_sleep_time)
|
||||
|
||||
except Exception as e:
|
||||
# Any other unexpected errors
|
||||
consecutive_errors += 1
|
||||
logger.error(f"Unexpected error (attempt {consecutive_errors}): {str(e)}")
|
||||
|
||||
# For any other error, try to reinitialize everything after some delay
|
||||
if consecutive_errors >= max_consecutive_errors:
|
||||
logger.warning(f"Hit {max_consecutive_errors} consecutive errors, reinitializing service")
|
||||
try:
|
||||
# Try to clean up existing connections
|
||||
try:
|
||||
runner.drop()
|
||||
except Exception as cleanup_error:
|
||||
logger.warning(f"Error during cleanup: {str(cleanup_error)}")
|
||||
|
||||
# Reinitialize the service directly
|
||||
redis_handler = MailReaderService()
|
||||
email_service = EmailReaderService(IsBankConfig())
|
||||
email_service.login_and_connect()
|
||||
runner = EmailServiceRunner(redis_handler=redis_handler, email_service=email_service)
|
||||
|
||||
if runner:
|
||||
logger.info("Successfully reinitialized email service runner")
|
||||
consecutive_errors = 0 # Reset counter after reinitialization
|
||||
else:
|
||||
logger.error("Failed to reinitialize email service runner")
|
||||
except Exception as reinit_error:
|
||||
logger.error(f"Service reinitialization failed: {str(reinit_error)}")
|
||||
|
||||
sleep(extended_error_sleep)
|
||||
else:
|
||||
# For fewer consecutive errors, just retry the current runner
|
||||
print(f"Error: {str(e)}")
|
||||
sleep(error_sleep_time)
|
||||
|
|
@ -1,16 +0,0 @@
|
|||
import os
|
||||
from Depends.config import ConfigServices
|
||||
|
||||
class IsBankConfig:
|
||||
|
||||
MAILBOX: str = os.getenv("MAILBOX", "bilgilendirme@ileti.isbank.com.tr")
|
||||
AUTHORIZE_IBAN: str = os.getenv("AUTHORIZE_IBAN", "4245-0093333")
|
||||
NO_ATTACHMENT_FOLDER: str = "NoAttachment"
|
||||
COMPLETED_FOLDER: str = "Completed"
|
||||
TASK_DATA_PREFIX: str = ConfigServices.MAIN_TASK_PREFIX
|
||||
TASK_MAILID_INDEX_PREFIX: str = ConfigServices.TASK_MAILID_INDEX_PREFIX
|
||||
TASK_UUID_INDEX_PREFIX: str = ConfigServices.TASK_UUID_INDEX_PREFIX
|
||||
TASK_SEEN_PREFIX: str = ConfigServices.TASK_SEEN_PREFIX
|
||||
SERVICE_PREFIX: str = ConfigServices.SERVICE_PREFIX_MAIL_READER
|
||||
NEXT_SERVICE_PREFIX: str = ConfigServices.SERVICE_PREFIX_MAIL_PARSER
|
||||
|
||||
|
|
@ -1,13 +0,0 @@
|
|||
[project]
|
||||
name = "isbank-email-reader"
|
||||
version = "0.1.0"
|
||||
description = "IsBank Email Reader Service"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
"arrow>=1.3.0",
|
||||
"redis>=6.4.0",
|
||||
"pydantic>=2.0.0",
|
||||
"pydantic-settings>=2.0.0",
|
||||
"email-validator>=2.0.0",
|
||||
]
|
||||
|
|
@ -1,4 +0,0 @@
|
|||
Uses
|
||||
|
||||
- MainRedisHandler
|
||||
- MailHandler
|
||||
|
|
@ -1,9 +0,0 @@
|
|||
[project]
|
||||
name = "reader"
|
||||
version = "0.1.0"
|
||||
description = "Add your description here"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
"redis>=6.4.0",
|
||||
]
|
||||
|
|
@ -1,23 +0,0 @@
|
|||
version = 1
|
||||
revision = 3
|
||||
requires-python = ">=3.12"
|
||||
|
||||
[[package]]
|
||||
name = "reader"
|
||||
version = "0.1.0"
|
||||
source = { virtual = "." }
|
||||
dependencies = [
|
||||
{ name = "redis" },
|
||||
]
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [{ name = "redis", specifier = ">=6.4.0" }]
|
||||
|
||||
[[package]]
|
||||
name = "redis"
|
||||
version = "6.4.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/0d/d6/e8b92798a5bd67d659d51a18170e91c16ac3b59738d91894651ee255ed49/redis-6.4.0.tar.gz", hash = "sha256:b01bc7282b8444e28ec36b261df5375183bb47a07eb9c603f284e89cbc5ef010", size = 4647399, upload-time = "2025-08-07T08:10:11.441Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/e8/02/89e2ed7e85db6c93dfa9e8f691c5087df4e3551ab39081a4d7c6d1f90e05/redis-6.4.0-py3-none-any.whl", hash = "sha256:f0544fa9604264e9464cdf4814e7d4830f74b165d52f2a330a760a88dd248b7f", size = 279847, upload-time = "2025-08-07T08:10:09.84Z" },
|
||||
]
|
||||
|
|
@ -1,14 +0,0 @@
|
|||
__pycache__/
|
||||
*.pyc
|
||||
*.pyo
|
||||
*.pyd
|
||||
*.db
|
||||
*.sqlite3
|
||||
*.log
|
||||
*.env
|
||||
venv/
|
||||
.env.*
|
||||
node_modules/
|
||||
.prisma/
|
||||
.prisma-cache/
|
||||
ServicesRunnner/AccountRecordServices/Test/venv/
|
||||
|
|
@ -1,20 +0,0 @@
|
|||
FROM python:3.12-slim
|
||||
|
||||
ENV VIRTUAL_ENV=/app/ServicesRunner/Depends/opt/venv
|
||||
ENV PRISMA_SCHEMA_PATH=/app/ServicesRunner/Depends/schema.prisma
|
||||
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
|
||||
ENV PYTHONPATH=/app
|
||||
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends gcc curl && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY ServicesRunner/requirements.txt /app/ServicesRunner/requirements.txt
|
||||
COPY ServicesRunner/Depends/schema.prisma /app/ServicesRunner/Depends/schema.prisma
|
||||
COPY ServicesRunner/Depends/ /app/ServicesRunner/Depends/
|
||||
COPY ServicesRunner/AccountRecordServices/Test/ /app/ServicesRunner/AccountRecordServices/Test/
|
||||
COPY ServicesRunner/AccountRecordServices/Test/entrypoint.sh /entrypoint.sh
|
||||
|
||||
RUN chmod +x /entrypoint.sh
|
||||
|
||||
CMD ["/entrypoint.sh"]
|
||||
|
|
@ -1,29 +0,0 @@
|
|||
import asyncio
|
||||
import sys
|
||||
from Depends.prisma_client import prisma_client
|
||||
|
||||
async def my_function():
|
||||
try:
|
||||
async with prisma_client() as db:
|
||||
result = await db.account_records.find_many(
|
||||
take=5,
|
||||
skip=0,
|
||||
order=[{"bank_date": "desc"}]
|
||||
)
|
||||
|
||||
selected_result = [{"id": record.id, "bank_date": record.bank_date, "currency_value": record.currency_value} for record in result]
|
||||
for record in selected_result:
|
||||
print(record)
|
||||
except Exception as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
raise
|
||||
|
||||
if __name__ == "__main__":
|
||||
while True:
|
||||
print("I am online")
|
||||
asyncio.sleep(30)
|
||||
# asyncio.run(my_function())
|
||||
# asyncio.run(my_function())
|
||||
# while True:
|
||||
# asyncio.sleep(5)
|
||||
|
||||
|
|
@ -1,18 +0,0 @@
|
|||
#!/bin/sh
|
||||
|
||||
VENV_PATH="/opt/venv"
|
||||
REQUIREMENTS_PATH="/app/ServicesRunner/requirements.txt"
|
||||
SCHEMA_PATH="/app/ServicesRunner/Depends/schema.prisma"
|
||||
PRISMA_BINARY_PATH="/app/ServicesRunner/Depends/root/.cache/prisma-python/binaries"
|
||||
|
||||
if [ ! -x "$VENV_PATH/bin/python" ]; then
|
||||
python -m venv "$VENV_PATH"
|
||||
"$VENV_PATH/bin/pip" install --no-cache-dir -r "$REQUIREMENTS_PATH"
|
||||
"$VENV_PATH/bin/prisma" generate --schema "$SCHEMA_PATH"
|
||||
fi
|
||||
|
||||
if ! find "$PRISMA_BINARY_PATH" -type f -name "prisma-query-engine-debian-openssl-3.0.x" | grep -q .; then
|
||||
"$VENV_PATH/bin/prisma" py fetch
|
||||
fi
|
||||
|
||||
exec "$VENV_PATH/bin/python" -m ServicesRunner.AccountRecordServices.Test.app
|
||||
|
|
@ -1,116 +0,0 @@
|
|||
import os
|
||||
|
||||
from pydantic import BaseModel
|
||||
from typing import Any, List, Optional, Union
|
||||
|
||||
|
||||
class FromToHeader(BaseModel):
|
||||
|
||||
display_name: Optional[str]
|
||||
username: Optional[str]
|
||||
domain: Optional[str]
|
||||
mail: Optional[str]
|
||||
|
||||
|
||||
class MailReader(BaseModel):
|
||||
|
||||
id: str
|
||||
subject: str
|
||||
from_: FromToHeader
|
||||
to: List[FromToHeader]
|
||||
date: str
|
||||
body_text: str
|
||||
|
||||
|
||||
class MailParser(BaseModel):
|
||||
|
||||
filename: str
|
||||
content_type: str
|
||||
charset: str
|
||||
data: str
|
||||
|
||||
|
||||
class FinderIban(BaseModel):
|
||||
...
|
||||
|
||||
|
||||
class FinderComment(BaseModel):
|
||||
...
|
||||
|
||||
|
||||
class RedisData(BaseModel):
|
||||
MailReader: MailReader
|
||||
MailParser: List[MailParser]
|
||||
FinderIban: FinderIban | Any
|
||||
FinderComment: FinderComment | Any
|
||||
|
||||
|
||||
class Status:
|
||||
PENDING: str = "PENDING"
|
||||
IN_PROGRESS: str = "IN_PROGRESS"
|
||||
COMPLETED: str = "COMPLETED"
|
||||
FAILED: str = "FAILED"
|
||||
|
||||
|
||||
class RedisTaskObject(BaseModel):
|
||||
task: str
|
||||
data: RedisData
|
||||
completed: bool
|
||||
service: str
|
||||
status: str
|
||||
created_at: str
|
||||
is_completed: bool
|
||||
|
||||
class EmailConfig:
|
||||
|
||||
HOST: str = os.getenv("EMAIL_HOST", "10.10.2.34")
|
||||
USERNAME: str = os.getenv("EMAIL_USERNAME", "isbank@mehmetkaratay.com.tr")
|
||||
PASSWORD: str = os.getenv("EMAIL_PASSWORD", "system")
|
||||
PORT: int = int(os.getenv("EMAIL_PORT", 993))
|
||||
|
||||
@classmethod
|
||||
def as_dict(cls):
|
||||
return dict(host=EmailConfig.HOST, port=EmailConfig.PORT, username=EmailConfig.USERNAME, password=EmailConfig.PASSWORD)
|
||||
|
||||
|
||||
class RedisConfig:
|
||||
|
||||
HOST: str = os.getenv("REDIS_HOST", "10.10.2.15")
|
||||
PASSWORD: str = os.getenv("REDIS_PASSWORD", "your_strong_password_here")
|
||||
PORT: int = int(os.getenv("REDIS_PORT", 6379))
|
||||
DB: int = int(os.getenv("REDIS_DB", 0))
|
||||
|
||||
@classmethod
|
||||
def as_dict(cls):
|
||||
return dict(host=RedisConfig.HOST, port=int(RedisConfig.PORT), password=RedisConfig.PASSWORD, db=int(RedisConfig.DB))
|
||||
|
||||
|
||||
class MailReaderMainConfig:
|
||||
|
||||
MAILBOX: str
|
||||
AUTHORIZE_IBAN: str
|
||||
NO_ATTACHMENT_FOLDER: str
|
||||
COMPLETED_FOLDER: str
|
||||
TASK_DATA_PREFIX: str
|
||||
TASK_MAILID_INDEX_PREFIX: str
|
||||
TASK_UUID_INDEX_PREFIX: str
|
||||
TASK_SEEN_PREFIX: str
|
||||
SERVICE_PREFIX: str
|
||||
NEXT_SERVICE_PREFIX: str
|
||||
|
||||
|
||||
class ConfigServices:
|
||||
|
||||
MAIN_TASK_PREFIX: str = "BANK:SERVICES:TASK:DATA"
|
||||
TASK_MAILID_INDEX_PREFIX: str = "BANK:SERVICES:TASK:MAILID"
|
||||
TASK_UUID_INDEX_PREFIX: str = "BANK:SERVICES:TASK:UUID"
|
||||
TASK_SEEN_PREFIX: str = "BANK:SERVICES:TASK:SEEN"
|
||||
|
||||
SERVICE_PREFIX_MAIL_READER: str = "MailReader"
|
||||
SERVICE_PREFIX_MAIL_PARSER: str = "MailParser"
|
||||
SERVICE_PREFIX_FINDER_IBAN: str = "FinderIban"
|
||||
SERVICE_PREFIX_FINDER_COMMENT: str = "FinderComment"
|
||||
|
||||
|
||||
paramsRedisData = Union[MailReader, MailParser, FinderIban, FinderComment]
|
||||
|
||||
|
|
@ -1,586 +0,0 @@
|
|||
import os
|
||||
import socket
|
||||
import logging
|
||||
|
||||
from functools import wraps
|
||||
from base64 import b64encode
|
||||
from time import sleep
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Any, Union, TypeVar, Tuple
|
||||
|
||||
from email.message import EmailMessage
|
||||
from email.policy import default as policy
|
||||
from email.headerregistry import UniqueDateHeader, UniqueAddressHeader, UniqueUnstructuredHeader
|
||||
from email.parser import BytesParser
|
||||
from imaplib import IMAP4_SSL, IMAP4
|
||||
|
||||
from Depends.redis_handlers import RedisHandler
|
||||
from Depends.config import EmailConfig, MailReaderMainConfig, MailReader, MailParser, RedisData
|
||||
from Depends.service_handler import MailReaderService
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger('Email Reader Service')
|
||||
|
||||
# Type variable for generic function return types
|
||||
T = TypeVar('T')
|
||||
|
||||
def retry_on_connection_error(max_retries: int = 3, delay: int = 5, backoff: int = 2, exceptions=(Exception,)):
|
||||
"""
|
||||
Retry decorator with exponential backoff for handling connection errors
|
||||
|
||||
Args:
|
||||
max_retries: Maximum number of retries
|
||||
delay: Initial delay between retries in seconds
|
||||
backoff: Backoff multiplier
|
||||
exceptions: Tuple of exceptions to catch
|
||||
|
||||
Returns:
|
||||
Decorated function
|
||||
"""
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
mtries, mdelay = max_retries, delay
|
||||
while mtries > 0:
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except exceptions as e:
|
||||
logger.warning(f"Connection error in {func.__name__}: {str(e)}, retrying in {mdelay}s...")
|
||||
sleep(mdelay)
|
||||
mtries -= 1
|
||||
mdelay *= backoff
|
||||
|
||||
# Final attempt
|
||||
return func(*args, **kwargs)
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
class Mails:
|
||||
"""Class representing an email with attachments and metadata"""
|
||||
|
||||
def __init__(self, mail_id: bytes, mail_data: bytes):
|
||||
"""
|
||||
Initialize a mail object
|
||||
|
||||
Args:
|
||||
mail_id: Unique identifier for the email
|
||||
mail_data: Raw email data
|
||||
"""
|
||||
self.id: bytes = mail_id
|
||||
self.raw_data: bytes = mail_data
|
||||
self.attachments: List[Dict[str, Union[str, bytes]]] = []
|
||||
self.message: EmailMessage = BytesParser(policy=policy).parsebytes(mail_data)
|
||||
self.subject: UniqueUnstructuredHeader = self.message.get('Subject', '') or ''
|
||||
self.from_: UniqueAddressHeader = self.message.get('From', '') or ''
|
||||
self.to: UniqueAddressHeader = self.message.get('To', '') or ''
|
||||
self.date: UniqueDateHeader = self.message.get('Date', '') or ''
|
||||
self.body_text: str = self._get_body_text()
|
||||
self._extract_attachments()
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Convert mail object to dictionary representation
|
||||
|
||||
Returns:
|
||||
Dictionary representation of mail
|
||||
"""
|
||||
return {
|
||||
'id': self.id.decode('utf-8'),
|
||||
'attachments': [{
|
||||
'filename': attachment['filename'],
|
||||
'content_type': attachment['content_type'],
|
||||
'charset': attachment['charset'],
|
||||
'data': b64encode(attachment['data']).decode(attachment['charset'], errors='replace')
|
||||
} for attachment in self.attachments],
|
||||
'subject': str(self.subject),
|
||||
'from_': {
|
||||
"display_name": self.from_.addresses[0].display_name,
|
||||
"username": self.from_.addresses[0].username,
|
||||
"domain": self.from_.addresses[0].domain,
|
||||
"mail": f"{self.from_.addresses[0].username}@{self.from_.addresses[0].domain}"
|
||||
},
|
||||
'to': [
|
||||
{
|
||||
"display_name": address.display_name,
|
||||
"username": address.username,
|
||||
"domain": address.domain,
|
||||
"mail": f"{address.username}@{address.domain}"
|
||||
} for address in self.to.addresses
|
||||
],
|
||||
'date': str(self.date.datetime),
|
||||
'body_text': str(self.body_text)
|
||||
}
|
||||
|
||||
def _get_body_text(self) -> str:
|
||||
"""
|
||||
Extract plain text body from email
|
||||
|
||||
Returns:
|
||||
Plain text body of email
|
||||
"""
|
||||
body = self.message.get_body(preferencelist=('plain',))
|
||||
if body is not None:
|
||||
return body.get_content() or ''
|
||||
if self.message.is_multipart():
|
||||
for part in self.message.walk():
|
||||
if part.get_content_type() == 'text/plain' and (part.get_content_disposition() or '') != 'attachment':
|
||||
try:
|
||||
return part.get_content() or ''
|
||||
except Exception:
|
||||
payload = part.get_payload(decode=True) or b''
|
||||
return payload.decode(part.get_content_charset() or 'utf-8', errors='replace')
|
||||
else:
|
||||
if self.message.get_content_type() == 'text/plain':
|
||||
try:
|
||||
return self.message.get_content() or ''
|
||||
except Exception:
|
||||
payload = self.message.get_payload(decode=True) or b''
|
||||
return payload.decode(self.message.get_content_charset() or 'utf-8', errors='replace')
|
||||
return ''
|
||||
|
||||
def _extract_attachments(self) -> None:
|
||||
"""Extract attachments from email"""
|
||||
for part in self.message.walk():
|
||||
if part.get_content_disposition() == 'attachment':
|
||||
filename = part.get_filename()
|
||||
if not filename:
|
||||
continue
|
||||
data = part.get_payload(decode=True) or b''
|
||||
charset = part.get_charset() or 'utf-8'
|
||||
self.attachments.append({'filename': filename, 'content_type': part.get_content_type(), 'data': data, 'charset': charset})
|
||||
|
||||
def save_attachments(self, folder: str) -> None:
|
||||
"""
|
||||
Save attachments to folder
|
||||
|
||||
Args:
|
||||
folder: Folder to save attachments to
|
||||
"""
|
||||
os.makedirs(folder, exist_ok=True)
|
||||
for att in self.attachments:
|
||||
with open(os.path.join(folder, att['filename']), 'wb') as f:
|
||||
f.write(att['data'])
|
||||
|
||||
|
||||
class EmailReaderService:
|
||||
"""Service for reading emails from mailbox with improved connection resilience"""
|
||||
|
||||
def __init__(self, config: MailReaderMainConfig):
|
||||
"""
|
||||
Initialize email reader service
|
||||
|
||||
Args:
|
||||
config: Application configuration
|
||||
"""
|
||||
self.email_config = EmailConfig()
|
||||
self.config = config
|
||||
self.mail = None
|
||||
self.data: List[Mails] = []
|
||||
self.mail_count = 0
|
||||
self.is_connected = False
|
||||
self.connect_imap()
|
||||
|
||||
def connect_imap(self) -> bool:
|
||||
"""
|
||||
Establish IMAP connection with retry mechanism
|
||||
|
||||
Returns:
|
||||
True if connection successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
if self.mail:
|
||||
# Try to close existing connection if any
|
||||
try:
|
||||
self.mail.close()
|
||||
self.mail.logout()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
logger.info(f"Connecting to IMAP server {self.email_config.HOST}:{self.email_config.PORT}")
|
||||
self.mail = IMAP4_SSL(self.email_config.HOST, self.email_config.PORT)
|
||||
self.is_connected = True
|
||||
return True
|
||||
except (socket.error, IMAP4.error) as e:
|
||||
logger.error(f"Failed to connect to IMAP server: {str(e)}")
|
||||
self.is_connected = False
|
||||
return False
|
||||
|
||||
@retry_on_connection_error(max_retries=3, delay=5, exceptions=(socket.error, IMAP4.error, OSError))
|
||||
def login_and_connect(self) -> bool:
|
||||
"""
|
||||
Login to IMAP server and connect to inbox with retry mechanism
|
||||
|
||||
Returns:
|
||||
True if login successful, False otherwise
|
||||
|
||||
Raises:
|
||||
ConnectionError: If connection cannot be established
|
||||
"""
|
||||
if not self.is_connected:
|
||||
if not self.connect_imap():
|
||||
raise ConnectionError("Cannot establish connection to IMAP server")
|
||||
|
||||
try:
|
||||
logger.info(f"Logging in as {self.email_config.USERNAME}")
|
||||
self.mail.login(self.email_config.USERNAME, self.email_config.PASSWORD)
|
||||
self._connect_inbox()
|
||||
logger.info("Successfully logged in and connected to inbox")
|
||||
return True
|
||||
except (socket.error, IMAP4.error) as e:
|
||||
logger.error(f"Login failed: {str(e)}")
|
||||
self.is_connected = False
|
||||
raise
|
||||
|
||||
@retry_on_connection_error(max_retries=2, delay=3, exceptions=(socket.error, IMAP4.error, OSError))
|
||||
def refresh(self) -> Tuple[List[Mails], int, int]:
|
||||
"""
|
||||
Refresh mail data with connection retry
|
||||
|
||||
Returns:
|
||||
Tuple of (mail data, mail count, data length)
|
||||
"""
|
||||
try:
|
||||
self.mail_count = self._fetch_count()
|
||||
self.data = self._fetch_all()
|
||||
return self.data, self.mail_count, len(self.data)
|
||||
except (socket.error, IMAP4.error) as e:
|
||||
logger.error(f"Refresh failed, attempting to reconnect: {str(e)}")
|
||||
self.connect_imap()
|
||||
self.login_and_connect()
|
||||
self.mail_count = self._fetch_count()
|
||||
self.data = self._fetch_all()
|
||||
return self.data, self.mail_count, len(self.data)
|
||||
|
||||
@retry_on_connection_error(max_retries=2, delay=2, exceptions=(socket.error, IMAP4.error))
|
||||
def _connect_inbox(self) -> None:
|
||||
"""
|
||||
Connect to INBOX with retry mechanism
|
||||
|
||||
Raises:
|
||||
IMAP4.error: If connection to INBOX fails
|
||||
"""
|
||||
logger.info("Selecting INBOX folder")
|
||||
status, _ = self.mail.select("INBOX")
|
||||
if status != 'OK':
|
||||
error_msg = "Failed to connect to INBOX"
|
||||
logger.error(error_msg)
|
||||
raise IMAP4.error(error_msg)
|
||||
|
||||
@retry_on_connection_error(max_retries=2, delay=2, exceptions=(socket.error, IMAP4.error))
|
||||
def _fetch_count(self) -> int:
|
||||
"""
|
||||
Fetch mail count with retry mechanism
|
||||
|
||||
Returns:
|
||||
Number of emails
|
||||
|
||||
Raises:
|
||||
IMAP4.error: If fetching mail count fails
|
||||
"""
|
||||
try:
|
||||
status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"')
|
||||
if status != 'OK':
|
||||
raise IMAP4.error("Failed to get mail count")
|
||||
count = len(uids[0].split()) if uids[0] else 0
|
||||
logger.info(f"Found {count} emails from {self.config.MAILBOX}")
|
||||
return count
|
||||
except (socket.error, IMAP4.error) as e:
|
||||
logger.error(f"Error fetching mail count: {str(e)}")
|
||||
raise
|
||||
|
||||
@retry_on_connection_error(max_retries=2, delay=2, exceptions=(socket.error, IMAP4.error))
|
||||
def _fetch_all(self) -> List[Mails]:
|
||||
"""
|
||||
Fetch all mails with retry mechanism
|
||||
|
||||
Returns:
|
||||
List of mail objects
|
||||
|
||||
Raises:
|
||||
IMAP4.error: If fetching mails fails
|
||||
"""
|
||||
self.data = []
|
||||
try:
|
||||
status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"')
|
||||
if status != 'OK':
|
||||
raise IMAP4.error("Mail search failed")
|
||||
|
||||
if not uids[0]:
|
||||
logger.info("No emails found matching criteria")
|
||||
return self.data
|
||||
|
||||
uid_list = uids[0].split()
|
||||
logger.info(f"Processing {len(uid_list)} emails")
|
||||
|
||||
for uid in uid_list:
|
||||
try:
|
||||
status, msg_data = self.mail.uid('fetch', uid, '(RFC822)')
|
||||
if status == 'OK' and msg_data[0] is not None:
|
||||
self.data.append(Mails(uid, msg_data[0][1]))
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch email with UID {uid}: {str(e)}")
|
||||
continue
|
||||
logger.info(f"Successfully fetched {len(self.data)} emails")
|
||||
return self.data
|
||||
except (socket.error, IMAP4.error) as e:
|
||||
logger.error(f"Error fetching emails: {str(e)}")
|
||||
raise
|
||||
|
||||
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
|
||||
def mark_no_attachment(self, uid):
|
||||
"""
|
||||
Move message to no attachment folder with retry mechanism
|
||||
|
||||
Args:
|
||||
uid: Email UID
|
||||
"""
|
||||
try:
|
||||
# Handle both string and bytes types for logging
|
||||
log_uid = uid
|
||||
if isinstance(uid, bytes):
|
||||
log_uid = uid.decode('utf-8', errors='replace')
|
||||
|
||||
logger.info(f"Moving email {log_uid} to {self.config.NO_ATTACHMENT_FOLDER} folder")
|
||||
self.mail.uid('COPY', uid, self.config.NO_ATTACHMENT_FOLDER)
|
||||
self.delete(uid)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to mark email as no attachment: {str(e)}")
|
||||
raise
|
||||
|
||||
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
|
||||
def mark_completed(self, uid: bytes):
|
||||
"""
|
||||
Move message to completed folder with retry mechanism
|
||||
|
||||
Args:
|
||||
uid: Email UID
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Moving email {uid.decode('utf-8', errors='replace')} to {self.config.COMPLETED_FOLDER} folder")
|
||||
self.mail.uid('COPY', uid, self.config.COMPLETED_FOLDER)
|
||||
# self.delete(uid)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to mark email as completed: {str(e)}")
|
||||
raise
|
||||
|
||||
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
|
||||
def delete(self, uid):
|
||||
"""
|
||||
Delete message with retry mechanism
|
||||
|
||||
Args:
|
||||
uid: Email UID
|
||||
"""
|
||||
try:
|
||||
# Handle both string and bytes types
|
||||
log_uid = uid
|
||||
if isinstance(uid, bytes):
|
||||
log_uid = uid.decode('utf-8', errors='replace')
|
||||
logger.info(f"Marking email {log_uid} for deletion")
|
||||
self.mail.uid('STORE', uid, '+FLAGS', r'(\Deleted)')
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete email: {str(e)}")
|
||||
raise
|
||||
|
||||
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
|
||||
def commit(self):
|
||||
"""
|
||||
Commit pending operations with retry mechanism
|
||||
|
||||
Raises:
|
||||
Exception: If commit fails
|
||||
"""
|
||||
try:
|
||||
logger.info("Committing changes (expunge)")
|
||||
self.mail.expunge()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to commit changes: {str(e)}")
|
||||
raise
|
||||
|
||||
def logout(self):
|
||||
"""Logout from IMAP server"""
|
||||
if self.mail and self.is_connected:
|
||||
try:
|
||||
logger.info("Logging out from IMAP server")
|
||||
self.mail.close()
|
||||
self.mail.logout()
|
||||
self.is_connected = False
|
||||
except Exception as e:
|
||||
logger.warning(f"Logout failed: {str(e)}")
|
||||
|
||||
@property
|
||||
def count(self):
|
||||
"""Get count of emails"""
|
||||
return len(self.data)
|
||||
|
||||
|
||||
class EmailServiceRunner:
|
||||
"""Runner for email service with improved resilience to connection issues"""
|
||||
|
||||
def __init__(self, redis_handler: MailReaderService, email_service: EmailReaderService) -> None:
|
||||
"""
|
||||
Initialize email service runner
|
||||
|
||||
Args:
|
||||
redis_handler: Redis handler for Redis operations
|
||||
email_service: Email service for email operations
|
||||
"""
|
||||
# Use MailReaderService singleton for Redis operations
|
||||
self.redis_handler = redis_handler
|
||||
self.email_service = email_service
|
||||
self.mails = None
|
||||
self.count = 0
|
||||
self.counter = 0
|
||||
self.mail_count = 0
|
||||
self.redis_connected = self._check_redis_connection()
|
||||
|
||||
def _check_redis_connection(self) -> bool:
|
||||
"""
|
||||
Check if Redis connection is alive using MailReaderService
|
||||
|
||||
Returns:
|
||||
True if connection is alive, False otherwise
|
||||
"""
|
||||
try:
|
||||
# Use MailReaderService to check connection
|
||||
connection_status = self.redis_handler.ensure_connection()
|
||||
if connection_status:
|
||||
logger.info("Redis connection established via MailReaderService")
|
||||
else:
|
||||
logger.error("Redis connection check failed via MailReaderService")
|
||||
return connection_status
|
||||
except MailReaderService.REDIS_EXCEPTIONS as e:
|
||||
logger.error(f"Redis connection failed: {str(e)}")
|
||||
return False
|
||||
|
||||
@retry_on_connection_error(max_retries=3, delay=5, exceptions=MailReaderService.REDIS_EXCEPTIONS)
|
||||
def _ensure_redis_connection(self) -> bool:
|
||||
"""
|
||||
Ensure Redis connection is established using MailReaderService
|
||||
|
||||
Returns:
|
||||
True if connection is established, False otherwise
|
||||
"""
|
||||
if not self.redis_connected:
|
||||
try:
|
||||
self.redis_connected = self.redis_handler.ensure_connection()
|
||||
if not self.redis_connected:
|
||||
raise RedisHandler.REDIS_EXCEPTIONS[0]("Failed to establish Redis connection")
|
||||
except Exception as e:
|
||||
self.redis_connected = False
|
||||
raise
|
||||
return self.redis_connected
|
||||
|
||||
def fetch_and_set_mails(self):
|
||||
"""
|
||||
Fetch and process emails with improved error handling
|
||||
|
||||
Raises:
|
||||
Exception: If fetching or processing emails fails
|
||||
"""
|
||||
try:
|
||||
# Reset counters
|
||||
self.count = 0
|
||||
self.counter = 0
|
||||
|
||||
# Ensure Redis connection before proceeding
|
||||
self._ensure_redis_connection()
|
||||
|
||||
# Refresh email data
|
||||
try:
|
||||
self.mails, self.mail_count, self.count = self.email_service.refresh()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to refresh emails: {str(e)}")
|
||||
raise
|
||||
|
||||
# Process each email
|
||||
for mail in self.mails:
|
||||
if not getattr(mail, 'id', None):
|
||||
logger.warning("Skipping email with no ID")
|
||||
continue
|
||||
|
||||
mail_id = mail.id.decode('utf-8')
|
||||
|
||||
# check mail has .pdf extension
|
||||
mail_dict = mail.to_dict()
|
||||
try:
|
||||
if mail.attachments:
|
||||
if any([attachment['filename'].lower().endswith('.pdf') for attachment in mail_dict['attachments']]):
|
||||
self.email_service.mark_no_attachment(mail_id)
|
||||
else:
|
||||
self._process_mail_with_attachments(mail, mail_id)
|
||||
else:
|
||||
self.email_service.mark_no_attachment(mail_id)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing email {mail_id}: {str(e)}")
|
||||
continue
|
||||
|
||||
try:
|
||||
self.email_service.commit()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to commit email changes: {str(e)}")
|
||||
timestamp = f"TIMESTAMP: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
|
||||
logger.info(f"Fetched and processed emails. Read/Total: {self.counter}/{self.count}. {self.count - self.counter} already saved. Mail count: {self.mail_count}")
|
||||
print(f"{timestamp} Mail Reader Service | Fetching and setting mails. Read / Total Mails: {self.counter}/{self.count}. {self.count - self.counter} already saved in redis. Mail Server mail count: {self.mail_count}")
|
||||
|
||||
except MailReaderService.REDIS_EXCEPTIONS as e:
|
||||
logger.error(f"Redis error in fetch_and_set_mails: {str(e)}")
|
||||
self.redis_connected = False
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error in fetch_and_set_mails: {str(e)}")
|
||||
raise
|
||||
|
||||
def _process_mail_with_attachments(self, mail: Mails, mail_id: str):
|
||||
"""
|
||||
Process an email with attachments using MailReaderService
|
||||
|
||||
Args:
|
||||
mail: Mail object
|
||||
mail_id: Mail ID
|
||||
|
||||
Raises:
|
||||
Exception: If processing mail fails
|
||||
"""
|
||||
try:
|
||||
mail_to_dict = mail.to_dict()
|
||||
result = self.redis_handler.process_mail(
|
||||
mail_id=mail_id, mail_data=mail_to_dict, service_prefix=self.email_service.config.SERVICE_PREFIX, counter=self.counter
|
||||
)
|
||||
if result['status'] == 'success':
|
||||
if result['action'] == 'stored_new_mail':
|
||||
self.counter = result.get('counter', self.counter)
|
||||
logger.info(f"Successfully processed new email {mail_id}")
|
||||
elif result['action'] == 'checked_existing_mail':
|
||||
if result.get('is_completed', False):
|
||||
logger.info(f"Marking completed email {mail_id}")
|
||||
self.email_service.mark_completed(mail_id)
|
||||
elif result['status'] == 'error':
|
||||
if result['action'] == 'id_mismatch':
|
||||
logger.error(f"Mail ID mismatch: {mail_id} != {result.get('stored_id')}")
|
||||
raise ValueError("Mail id does not match with id from Redis")
|
||||
else:
|
||||
logger.error(f"Email Service Runner Error processing mail {mail_id}: {result.get('error', 'Unknown error')}")
|
||||
raise Exception(result.get('error', 'Unknown error during mail processing'))
|
||||
else:
|
||||
logger.warning(f"Unexpected result status: {result['status']} for mail {mail_id}")
|
||||
except MailReaderService.REDIS_EXCEPTIONS as e:
|
||||
logger.error(f"Redis error while processing mail {mail_id}: {str(e)}")
|
||||
self.redis_connected = False
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Email Service Runner Error processing mail {mail_id}: {str(e)}")
|
||||
raise
|
||||
|
||||
def drop(self):
|
||||
"""Clean up resources"""
|
||||
try:
|
||||
self.email_service.commit()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error during commit on drop: {str(e)}")
|
||||
|
||||
try:
|
||||
self.email_service.logout()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error during logout on drop: {str(e)}")
|
||||
|
|
@ -1,49 +0,0 @@
|
|||
from prisma import Prisma
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
# Singleton pattern for Prisma client
|
||||
_prisma_client: Optional[Prisma] = None
|
||||
|
||||
async def get_prisma_client() -> Prisma:
|
||||
"""
|
||||
Get or initialize the Prisma client singleton.
|
||||
|
||||
Returns:
|
||||
Prisma: The initialized Prisma client instance
|
||||
"""
|
||||
global _prisma_client
|
||||
|
||||
if _prisma_client is None:
|
||||
_prisma_client = Prisma()
|
||||
await _prisma_client.connect()
|
||||
|
||||
return _prisma_client
|
||||
|
||||
@asynccontextmanager
|
||||
async def prisma_client() -> AsyncGenerator[Prisma, None]:
|
||||
"""
|
||||
Context manager for Prisma client operations.
|
||||
|
||||
Yields:
|
||||
Prisma: The initialized Prisma client instance
|
||||
|
||||
Example:
|
||||
```python
|
||||
async with prisma_client() as db:
|
||||
users = await db.user.find_many()
|
||||
```
|
||||
"""
|
||||
client = await get_prisma_client()
|
||||
try:
|
||||
yield client
|
||||
except Exception as e:
|
||||
print(f"Database operation error: {e}")
|
||||
raise
|
||||
|
||||
async def disconnect_prisma():
|
||||
"""Disconnect the Prisma client when shutting down the application."""
|
||||
global _prisma_client
|
||||
if _prisma_client is not None:
|
||||
await _prisma_client.disconnect()
|
||||
_prisma_client = None
|
||||
|
|
@ -1,160 +0,0 @@
|
|||
import logging
|
||||
|
||||
from contextlib import contextmanager
|
||||
from time import sleep
|
||||
from redis import Redis, RedisError, ConnectionError as RedisConnectionError
|
||||
|
||||
from Depends.config import ConfigServices, RedisConfig
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger('Redis_Handler')
|
||||
|
||||
|
||||
@contextmanager
|
||||
def safe_redis_operation(redis_client: Redis, operation_name: str = "Redis operation"):
|
||||
"""
|
||||
Context manager for safely executing Redis operations with error handling
|
||||
"""
|
||||
try:
|
||||
yield redis_client
|
||||
except RedisConnectionError as e:
|
||||
logger.error(f"{operation_name} failed due to Redis connection error: {str(e)}")
|
||||
raise
|
||||
except RedisError as e:
|
||||
logger.error(f"{operation_name} failed due to Redis error: {str(e)}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"{operation_name} failed with unexpected error: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
class RedisHandler:
|
||||
"""Singleton Redis handler class for centralized Redis operations"""
|
||||
|
||||
# Singleton instance
|
||||
_instance = None
|
||||
|
||||
# Redis exceptions constant for unified error handling
|
||||
REDIS_EXCEPTIONS = (RedisConnectionError, RedisError)
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(RedisHandler, cls).__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
# Initialize only once
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
# Initialize Redis client with retry logic
|
||||
self.redis_client = self._create_redis_client()
|
||||
self.redis_connected = self._check_redis_connection()
|
||||
self._initialized = True
|
||||
|
||||
def _create_redis_client(self):
|
||||
"""Create a Redis client with connection retry"""
|
||||
max_retries = 5
|
||||
retry_delay = 5
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
client = Redis(**RedisConfig.as_dict())
|
||||
client.ping() # Test the connection
|
||||
logger.info("Redis connection established successfully")
|
||||
return client
|
||||
except (RedisConnectionError, RedisError) as e:
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning(f"Redis connection attempt {attempt + 1} failed: {str(e)}. Retrying in {retry_delay} seconds...")
|
||||
sleep(retry_delay)
|
||||
retry_delay *= 2 # Exponential backoff
|
||||
else:
|
||||
logger.error(f"Failed to connect to Redis after {max_retries} attempts: {str(e)}")
|
||||
# Continue with a new Redis client instance even if ping fails
|
||||
# This allows the service to start and retry connections later
|
||||
return Redis(**RedisConfig.as_dict())
|
||||
|
||||
def _check_redis_connection(self) -> bool:
|
||||
"""Check if Redis connection is alive"""
|
||||
try:
|
||||
self.ping()
|
||||
return True
|
||||
except Exception as e:
|
||||
return False
|
||||
|
||||
def ping(self):
|
||||
"""Ping Redis server to check connection"""
|
||||
return self.redis_client.ping()
|
||||
|
||||
def sadd(self, key: str, value):
|
||||
"""Add a value to a Redis set"""
|
||||
return self.redis_client.sadd(key, value)
|
||||
|
||||
def get(self, key: str):
|
||||
"""Get a value from Redis by key"""
|
||||
return self.redis_client.get(key)
|
||||
|
||||
def set(self, key: str, value):
|
||||
"""Set a key-value pair in Redis"""
|
||||
return self.redis_client.set(key, value)
|
||||
|
||||
def rpush(self, key: str, value):
|
||||
"""Append a value to a Redis list"""
|
||||
return self.redis_client.rpush(key, value)
|
||||
|
||||
def lindex(self, key: str, index: int):
|
||||
"""Get an element from a Redis list by its index"""
|
||||
return self.redis_client.lindex(key, index)
|
||||
|
||||
def spop(self, key: str, value):
|
||||
"""Remove and return a random member from a Redis set"""
|
||||
return self.redis_client.spop(key, value)
|
||||
|
||||
def get_all_tasks(self):
|
||||
"""Get all keys matching the task prefix pattern"""
|
||||
return self.redis_client.lrange(ConfigServices.MAIN_TASK_PREFIX, 0, -1)
|
||||
|
||||
def ensure_connection(self) -> bool:
|
||||
"""Check if Redis connection is alive and reconnect if needed"""
|
||||
if not self.redis_connected:
|
||||
try:
|
||||
self.redis_client = self._create_redis_client()
|
||||
self.redis_connected = self._check_redis_connection()
|
||||
if self.redis_connected:
|
||||
logger.info("Redis connection re-established successfully")
|
||||
return self.redis_connected
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to re-establish Redis connection: {str(e)}")
|
||||
return False
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5):
|
||||
"""
|
||||
Handle Redis reconnection with exponential backoff based on consecutive errors
|
||||
|
||||
Args:
|
||||
consecutive_errors: Number of consecutive errors encountered
|
||||
max_consecutive_errors: Threshold for extended sleep time
|
||||
|
||||
Returns:
|
||||
tuple: (RedisHandler instance, bool indicating if extended sleep is needed)
|
||||
"""
|
||||
try:
|
||||
# Get a fresh instance (will reconnect internally)
|
||||
instance = cls()
|
||||
instance.redis_connected = instance._check_redis_connection()
|
||||
logger.info("Recreated Redis handler using singleton pattern")
|
||||
|
||||
# Determine if extended sleep is needed
|
||||
need_extended_sleep = consecutive_errors >= max_consecutive_errors
|
||||
if need_extended_sleep:
|
||||
logger.warning(f"Hit {max_consecutive_errors} consecutive Redis errors, taking longer pause")
|
||||
|
||||
return instance, need_extended_sleep
|
||||
|
||||
except Exception as redis_retry_error:
|
||||
logger.error(f"Failed to recreate Redis handler: {str(redis_retry_error)}")
|
||||
return None, consecutive_errors >= max_consecutive_errors
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
|
|
@ -1,835 +0,0 @@
|
|||
import logging
|
||||
|
||||
from json import loads, dumps
|
||||
from uuid import uuid4
|
||||
from datetime import datetime
|
||||
from Depends.config import Status, ConfigServices, RedisTaskObject, RedisData
|
||||
from Depends.redis_handlers import RedisHandler
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger('Service Task Retriever')
|
||||
|
||||
|
||||
class ServiceTaskRetriever:
|
||||
"""
|
||||
Class for retrieving and updating Redis task objects by UUID or mail ID.
|
||||
Provides direct access to task objects and service-specific data without iteration.
|
||||
"""
|
||||
|
||||
def __init__(self, redis_handler=None):
|
||||
"""
|
||||
Initialize the ServiceTaskRetriever
|
||||
|
||||
Args:
|
||||
redis_handler: Optional RedisHandler instance. If not provided, a new one will be created.
|
||||
"""
|
||||
if redis_handler:
|
||||
self.redis_handler = redis_handler
|
||||
else:
|
||||
self.redis_handler = RedisHandler()
|
||||
|
||||
self.redis_client = self.redis_handler.redis_client
|
||||
self.redis_prefix = ConfigServices.MAIN_TASK_PREFIX
|
||||
self.mailid_index_key = ConfigServices.TASK_MAILID_INDEX_PREFIX
|
||||
self.uuid_index_key = ConfigServices.TASK_UUID_INDEX_PREFIX
|
||||
|
||||
def fetch_all_tasks(self):
|
||||
"""
|
||||
Get all tasks from Redis
|
||||
|
||||
Returns:
|
||||
list: List of task objects
|
||||
"""
|
||||
all_task = self.redis_handler.get_all_tasks()
|
||||
return [RedisTaskObject(**loads(task)) for task in all_task]
|
||||
|
||||
def get_index_by_uuid(self, task_uuid: str):
|
||||
"""
|
||||
Get the index of a task by its UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
|
||||
Returns:
|
||||
int: Index of the task if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the UUID index or task is not found
|
||||
"""
|
||||
uuid_index_data = self.redis_handler.get(self.uuid_index_key)
|
||||
if uuid_index_data:
|
||||
uuid_index_dict = loads(uuid_index_data)
|
||||
return uuid_index_dict.get(task_uuid, None)
|
||||
raise FileNotFoundError(f"UUID index not found for {task_uuid}")
|
||||
|
||||
def get_index_by_mail_id(self, mail_id: str):
|
||||
"""
|
||||
Get the index of a task by its mail ID
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task
|
||||
|
||||
Returns:
|
||||
int: Index of the task if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the mail ID index or task is not found
|
||||
"""
|
||||
mail_id_index_data = self.redis_handler.get(self.mailid_index_key)
|
||||
if mail_id_index_data:
|
||||
mail_id_index_dict = loads(mail_id_index_data)
|
||||
return mail_id_index_dict.get(str(mail_id), None)
|
||||
raise FileNotFoundError(f"Mail ID index not found for {mail_id}")
|
||||
|
||||
def set_index_uuid(self, task_uuid: str, index: int):
|
||||
"""
|
||||
Set the index of a task by its mail ID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
index: Index of the task
|
||||
"""
|
||||
already_dict = self.redis_handler.get(self.mailid_index_key)
|
||||
if already_dict:
|
||||
already_dict = loads(already_dict)
|
||||
already_dict[str(task_uuid)] = index
|
||||
self.redis_handler.set(self.mailid_index_key, dumps(already_dict))
|
||||
else:
|
||||
self.redis_handler.set(self.mailid_index_key, dumps({str(task_uuid): index}))
|
||||
|
||||
def set_index_mail_id(self, mail_id: str, index: int):
|
||||
"""
|
||||
Set the index of a task by its mail ID
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task
|
||||
index: Index of the task
|
||||
"""
|
||||
already_dict = self.redis_handler.get(self.mailid_index_key)
|
||||
if already_dict:
|
||||
already_dict = loads(already_dict)
|
||||
already_dict[str(mail_id)] = index
|
||||
self.redis_handler.set(self.mailid_index_key, dumps(already_dict))
|
||||
else:
|
||||
self.redis_handler.set(self.mailid_index_key, dumps({str(mail_id): index}))
|
||||
|
||||
def update_mail_id_index(self, task_uuid: str, index: int):
|
||||
"""
|
||||
Update the mail ID index with the same index as UUID index
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task
|
||||
task_uuid: UUID of the task
|
||||
"""
|
||||
if get_index_by_uuid := self.get_index_by_uuid(task_uuid):
|
||||
self.set_index_uuid(task_uuid, get_index_by_uuid)
|
||||
|
||||
def update_uuid_index(self, task_uuid: str, mail_id: str):
|
||||
"""
|
||||
Update the UUID index with the same index as mail ID index
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
mail_id: Mail ID of the task
|
||||
"""
|
||||
if get_index_by_mail_id := self.get_index_by_mail_id(mail_id):
|
||||
self.set_index_uuid(task_uuid, get_index_by_mail_id)
|
||||
|
||||
def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
|
||||
"""
|
||||
Get a task object directly by its UUID without iteration
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to retrieve
|
||||
|
||||
Returns:
|
||||
RedisTaskObject: The task object if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the UUID index or task is not found
|
||||
"""
|
||||
index_by_uuid = self.get_index_by_uuid(task_uuid)
|
||||
if not index_by_uuid:
|
||||
raise FileNotFoundError(f"UUID index not found for {task_uuid}")
|
||||
if task_data := self.redis_client.lindex(self.redis_prefix, int(index_by_uuid)):
|
||||
return RedisTaskObject(**loads(task_data))
|
||||
raise FileNotFoundError(f"Task not found for UUID: {task_uuid}")
|
||||
|
||||
def get_task_by_mail_id(self, mail_id: str) -> RedisTaskObject:
|
||||
"""
|
||||
Get a task object directly by its mail ID without iteration
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task to retrieve
|
||||
|
||||
Returns:
|
||||
RedisTaskObject: The task object if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the mail ID index or task is not found
|
||||
"""
|
||||
mail_id_index = self.get_index_by_mail_id(mail_id)
|
||||
if mail_id_index is not None:
|
||||
if task_data := self.redis_client.lindex(self.redis_prefix, int(mail_id_index)):
|
||||
return RedisTaskObject(**loads(task_data))
|
||||
raise FileNotFoundError(f"Task not found for mail ID: {mail_id}")
|
||||
|
||||
def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
|
||||
"""
|
||||
Get service-specific data from a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
service_name: Name of the service to extract data for
|
||||
|
||||
Returns:
|
||||
Any: Service-specific data if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task or service data is not found
|
||||
"""
|
||||
task_object = self.get_task_by_uuid(task_uuid)
|
||||
|
||||
# Extract service data
|
||||
for attr in ConfigServices.__dict__:
|
||||
if attr == service_name:
|
||||
service_data = getattr(task_object.data, attr, None)
|
||||
if service_data:
|
||||
return service_data
|
||||
raise FileNotFoundError(f"Service data '{service_name}' not found in task {task_uuid}")
|
||||
|
||||
def get_service_data_by_mail_id(self, mail_id: str, service_name: str):
|
||||
"""
|
||||
Get service-specific data from a task by mail ID
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task
|
||||
service_name: Name of the service to extract data for
|
||||
|
||||
Returns:
|
||||
Any: Service-specific data if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task or service data is not found
|
||||
"""
|
||||
task_object = self.get_task_by_mail_id(mail_id)
|
||||
for attr in ConfigServices.__dict__:
|
||||
if attr == service_name:
|
||||
service_data = getattr(task_object.data, attr, None)
|
||||
if service_data:
|
||||
return service_data
|
||||
raise FileNotFoundError(f"Service data '{service_name}' not found in task for mail ID {mail_id}")
|
||||
|
||||
def create_task_with_uuid(self, task_uuid: str, service_name: str, mail_reader: dict, mail_parser: dict) -> bool:
|
||||
"""
|
||||
Create a new task with UUID indexing. This method will fail if a task with the UUID already exists.
|
||||
|
||||
Args:
|
||||
task_uuid: UUID for the task
|
||||
service_name: Service name for the task
|
||||
task_data: Dictionary containing task data
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
ValueError: If the task data is invalid, storage fails, or task already exists
|
||||
"""
|
||||
# Check if task with this UUID already exists
|
||||
try:
|
||||
existing_task = self.get_task_by_uuid(task_uuid)
|
||||
# If we get here, task exists
|
||||
raise ValueError(f"Task with UUID {task_uuid} already exists. Use store_task_with_uuid to update.")
|
||||
except FileNotFoundError:
|
||||
# Task doesn't exist, proceed with creation
|
||||
pass
|
||||
|
||||
# Validate service name
|
||||
self._validate_service_name(service_name)
|
||||
|
||||
# Create new RedisData with proper defaults for all services
|
||||
data_dict = {'MailReader': None, 'MailParser': [], 'FinderIban': {}, 'FinderComment': {}}
|
||||
# Set the actual service data
|
||||
data_dict['MailReader'] = mail_reader
|
||||
data_dict['MailParser'] = mail_parser
|
||||
|
||||
# Create new RedisData object
|
||||
redis_data = RedisData(**data_dict)
|
||||
|
||||
# Create new task object
|
||||
write_object = RedisTaskObject(
|
||||
task=task_uuid,
|
||||
data=redis_data,
|
||||
completed=False,
|
||||
service=service_name,
|
||||
status=Status.COMPLETED,
|
||||
created_at=datetime.now().isoformat(),
|
||||
is_completed=False
|
||||
)
|
||||
|
||||
# Convert to dict for serialization
|
||||
write_object = write_object.model_dump()
|
||||
|
||||
# Push new task to Redis list
|
||||
redis_write_ = self.redis_client.rpush(self.redis_prefix, dumps(write_object))
|
||||
if not redis_write_:
|
||||
raise ValueError(f"Failed to write task data to Redis for UUID {task_uuid}")
|
||||
|
||||
index_value = redis_write_ - 1
|
||||
self.set_index_mail_id(mail_reader['id'], index_value)
|
||||
self.set_index_uuid(task_uuid, index_value)
|
||||
return True
|
||||
|
||||
def store_task_with_uuid(self, task_uuid: str, service_name: str, task_data: dict) -> bool:
|
||||
"""
|
||||
Update an existing task with UUID indexing. Only the service-specific data will be updated
|
||||
while preserving other service data and task metadata.
|
||||
|
||||
Args:
|
||||
task_uuid: UUID for the task
|
||||
service_name: Service name for the task
|
||||
task_data: Dictionary containing service-specific data
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task does not exist
|
||||
ValueError: If the task data is invalid or storage fails
|
||||
"""
|
||||
# Validate service name
|
||||
self._validate_service_name(service_name)
|
||||
|
||||
# Get existing task
|
||||
try:
|
||||
existing_task = self.get_task_by_uuid(task_uuid)
|
||||
except FileNotFoundError:
|
||||
raise FileNotFoundError(f"Task with UUID {task_uuid} not found. Use create_task_with_uuid to create a new task.")
|
||||
|
||||
# Prepare new service data
|
||||
new_service_data = task_data
|
||||
|
||||
# Get the existing data model
|
||||
existing_data = existing_task.data
|
||||
|
||||
# Create a new RedisData with all existing service data
|
||||
data_dict = existing_data.model_dump()
|
||||
|
||||
# Update only the specific service data
|
||||
data_dict[service_name] = new_service_data
|
||||
|
||||
# Create updated RedisData object
|
||||
redis_data = RedisData(**data_dict)
|
||||
|
||||
# Create task object with existing metadata but updated data
|
||||
write_object = RedisTaskObject(
|
||||
task=task_uuid,
|
||||
data=redis_data,
|
||||
completed=existing_task.completed,
|
||||
service=existing_task.service,
|
||||
status=existing_task.status,
|
||||
created_at=existing_task.created_at,
|
||||
is_completed=existing_task.is_completed
|
||||
)
|
||||
|
||||
# Convert to dict for serialization
|
||||
write_object = write_object.model_dump()
|
||||
|
||||
# Get task index
|
||||
index_value, _ = self._get_task_index_by_uuid(task_uuid)
|
||||
|
||||
# Update the task at the existing index
|
||||
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(write_object)):
|
||||
raise ValueError(f"Failed to update task data in Redis for UUID {task_uuid}")
|
||||
|
||||
return True
|
||||
|
||||
def _get_task_index_by_uuid(self, task_uuid: str) -> tuple:
|
||||
"""
|
||||
Helper method to get task index by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
|
||||
Returns:
|
||||
tuple: (index_value, task_data_dict) where index_value is the index in Redis list
|
||||
and task_data_dict is the deserialized task data
|
||||
Raises:
|
||||
FileNotFoundError: If the UUID index or task is not found
|
||||
"""
|
||||
# Get UUID index
|
||||
uuid_index_data = self.redis_client.get(self.uuid_index_key)
|
||||
if not uuid_index_data:
|
||||
raise FileNotFoundError(f"UUID index not found for task: {task_uuid}")
|
||||
|
||||
# Parse index and get task index
|
||||
uuid_index_dict = loads(uuid_index_data)
|
||||
index_value = uuid_index_dict.get(task_uuid)
|
||||
if index_value is None:
|
||||
raise FileNotFoundError(f"Task UUID {task_uuid} not found in index")
|
||||
|
||||
# Get task data
|
||||
task_data = self.redis_client.lindex(self.redis_prefix, int(index_value))
|
||||
if not task_data:
|
||||
raise FileNotFoundError(f"No task data found at index {index_value}")
|
||||
|
||||
return index_value, loads(task_data)
|
||||
|
||||
def _validate_service_name(self, service_name: str) -> bool:
|
||||
"""
|
||||
Validate that a service name exists in ConfigServices
|
||||
|
||||
Args:
|
||||
service_name: Name of the service to validate
|
||||
|
||||
Returns:
|
||||
bool: True if valid
|
||||
|
||||
Raises:
|
||||
ValueError: If service name is invalid
|
||||
"""
|
||||
# Check if service_name is one of the values in ConfigServices
|
||||
valid_service_names = [
|
||||
ConfigServices.SERVICE_PREFIX_MAIL_READER,
|
||||
ConfigServices.SERVICE_PREFIX_MAIL_PARSER,
|
||||
ConfigServices.SERVICE_PREFIX_FINDER_IBAN,
|
||||
ConfigServices.SERVICE_PREFIX_FINDER_COMMENT
|
||||
]
|
||||
|
||||
if service_name in valid_service_names:
|
||||
return True
|
||||
|
||||
raise ValueError(f"Invalid service name: {service_name}")
|
||||
|
||||
def update_task_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False):
|
||||
"""
|
||||
Update the service of a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to update
|
||||
service_name: Name of the service to update
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task is not found
|
||||
ValueError: If the update fails or service name is invalid
|
||||
"""
|
||||
# Get task index and data
|
||||
index_value, task_object_dict = self._get_task_index_by_uuid(task_uuid)
|
||||
|
||||
# Update task status
|
||||
task_object_dict['service'] = service_name
|
||||
task_object_dict['status'] = status
|
||||
task_object_dict['completed'] = completed
|
||||
|
||||
# Write updated task back to Redis
|
||||
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(task_object_dict)):
|
||||
raise ValueError(f"Failed to write updated task data for UUID {task_uuid}")
|
||||
return True
|
||||
|
||||
|
||||
def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
|
||||
"""
|
||||
Update the status of a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to update
|
||||
is_completed: Whether the task is completed
|
||||
status: New status for the task
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task is not found
|
||||
ValueError: If the update fails
|
||||
"""
|
||||
# Get task index and data
|
||||
index_value, task_object_dict = self._get_task_index_by_uuid(task_uuid)
|
||||
|
||||
# Update task status
|
||||
task_object_dict['is_completed'] = is_completed
|
||||
task_object_dict['status'] = status
|
||||
task_object_dict['completed'] = is_completed
|
||||
|
||||
# Write updated task back to Redis
|
||||
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(task_object_dict)):
|
||||
raise ValueError(f"Failed to write updated task data for UUID {task_uuid}")
|
||||
|
||||
return True
|
||||
|
||||
def update_service_data(self, task_uuid: str, service_name: str, service_data: dict) -> bool:
|
||||
"""
|
||||
Update service-specific data in a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to update
|
||||
service_name: Name of the service data to update
|
||||
service_data: New service data
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task is not found
|
||||
ValueError: If the update fails or service name is invalid
|
||||
"""
|
||||
# Validate service name
|
||||
self._validate_service_name(service_name)
|
||||
|
||||
# Get task index and data
|
||||
index_value, task_object_dict = self._get_task_index_by_uuid(task_uuid)
|
||||
|
||||
# Update the service data
|
||||
if 'data' not in task_object_dict:
|
||||
task_object_dict['data'] = {}
|
||||
|
||||
task_object_dict['data'][service_name] = service_data
|
||||
|
||||
# Write updated task back to Redis
|
||||
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(task_object_dict)):
|
||||
raise ValueError(f"Failed to update service data for UUID {task_uuid}")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class MailReaderService:
|
||||
"""
|
||||
Main handler class that uses ServiceTaskRetriever with RedisHandler for Redis operations.
|
||||
This class serves as the main entry point for Redis operations in the application.
|
||||
Uses the RedisHandler singleton for all Redis operations.
|
||||
"""
|
||||
|
||||
# Singleton instance
|
||||
_instance = None
|
||||
REDIS_EXCEPTIONS = RedisHandler.REDIS_EXCEPTIONS
|
||||
|
||||
@classmethod
|
||||
def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5):
|
||||
"""
|
||||
Handle Redis reconnection with exponential backoff based on consecutive errors
|
||||
|
||||
Args:
|
||||
consecutive_errors: Number of consecutive errors encountered
|
||||
max_consecutive_errors: Threshold for extended sleep time
|
||||
|
||||
Returns:
|
||||
tuple: (MainRedisHandler instance, bool indicating if extended sleep is needed)
|
||||
"""
|
||||
# Delegate to RedisHandler's reconnection logic
|
||||
redis_handler, need_extended_sleep = RedisHandler.handle_reconnection(
|
||||
consecutive_errors=consecutive_errors,
|
||||
max_consecutive_errors=max_consecutive_errors
|
||||
)
|
||||
|
||||
# If Redis handler was successfully reconnected, create a new MainRedisHandler instance
|
||||
if redis_handler:
|
||||
# Reset the singleton instance to force re-initialization
|
||||
cls._instance = None
|
||||
main_handler = cls()
|
||||
return main_handler, need_extended_sleep
|
||||
|
||||
return None, need_extended_sleep
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(MailReaderService, cls).__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
# Initialize only once
|
||||
if hasattr(self, '_initialized') and self._initialized:
|
||||
return
|
||||
|
||||
# Use RedisHandler singleton
|
||||
self.redis_handler = RedisHandler()
|
||||
self.service_retriever = ServiceTaskRetriever(self.redis_handler)
|
||||
self._initialized = True
|
||||
|
||||
def ensure_connection(self):
|
||||
"""
|
||||
Ensure Redis connection is established
|
||||
|
||||
Returns:
|
||||
bool: True if connection is established, False otherwise
|
||||
"""
|
||||
return self.redis_handler.ensure_connection()
|
||||
|
||||
def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
|
||||
"""
|
||||
Get a task object by its UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to retrieve
|
||||
|
||||
Returns:
|
||||
RedisTaskObject: The task object if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the UUID index or task is not found
|
||||
"""
|
||||
return self.service_retriever.get_task_by_uuid(task_uuid)
|
||||
|
||||
def get_task_by_mail_id(self, mail_id: str) -> RedisTaskObject:
|
||||
"""
|
||||
Get a task object by its mail ID
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task to retrieve
|
||||
|
||||
Returns:
|
||||
RedisTaskObject: The task object if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the mail ID index or task is not found
|
||||
"""
|
||||
return self.service_retriever.get_task_by_mail_id(mail_id)
|
||||
|
||||
def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
|
||||
"""
|
||||
Get service-specific data from a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
service_name: Name of the service to extract data for
|
||||
|
||||
Returns:
|
||||
Any: Service-specific data if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task or service data is not found
|
||||
"""
|
||||
return self.service_retriever.get_service_data_by_uuid(task_uuid, service_name)
|
||||
|
||||
def get_service_data_by_mail_id(self, mail_id: str, service_name: str):
|
||||
"""
|
||||
Get service-specific data from a task by mail ID
|
||||
|
||||
Args:
|
||||
mail_id: Mail ID of the task
|
||||
service_name: Name of the service to extract data for
|
||||
|
||||
Returns:
|
||||
Any: Service-specific data if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task or service data is not found
|
||||
"""
|
||||
return self.service_retriever.get_service_data_by_mail_id(mail_id, service_name)
|
||||
|
||||
def store_task_with_uuid(self, task_uuid: str, service_name: str, task_data: dict) -> bool:
|
||||
"""
|
||||
Store a task with UUID indexing
|
||||
|
||||
Args:
|
||||
task_uuid: UUID for the task
|
||||
service_name: Service name for the task
|
||||
task_data: Dictionary containing task data
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
ValueError: If the task data is invalid or storage fails
|
||||
"""
|
||||
return self.service_retriever.store_task_with_uuid(task_uuid, service_name, task_data)
|
||||
|
||||
def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
|
||||
"""
|
||||
Update the status of a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to update
|
||||
is_completed: Whether the task is completed
|
||||
status: New status for the task
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task is not found
|
||||
ValueError: If the update fails
|
||||
"""
|
||||
return self.service_retriever.update_task_status(task_uuid, is_completed, status)
|
||||
|
||||
def process_mail(self, mail_id: str, mail_data: dict, service_prefix: str, counter: int) -> dict:
|
||||
"""
|
||||
Process mail data and store it in Redis
|
||||
|
||||
Args:
|
||||
mail_id: The ID of the mail
|
||||
mail_data: Dictionary containing mail data
|
||||
service_prefix: Service prefix for the mail
|
||||
|
||||
Returns:
|
||||
dict: Result of the operation with status and action
|
||||
"""
|
||||
try:
|
||||
if self.redis_handler.sadd(f'{ConfigServices.TASK_SEEN_PREFIX}', mail_id):
|
||||
counter += 1
|
||||
task_uuid = uuid4().hex
|
||||
mail_without_attachments = mail_data.copy()
|
||||
attachments = mail_without_attachments.pop('attachments', [])
|
||||
create_task = dict(task_uuid=task_uuid, service_name=service_prefix, mail_reader=mail_without_attachments, mail_parser=attachments)
|
||||
self.service_retriever.create_task_with_uuid(**create_task)
|
||||
return {'status': 'success', 'action': 'stored_new_mail', 'counter': counter}
|
||||
else:
|
||||
try:
|
||||
task = self.service_retriever.get_task_by_mail_id(mail_id)
|
||||
if task is None and task.data and task.data.MailReader:
|
||||
stored_id = task.data.MailReader.id
|
||||
if stored_id != mail_id:
|
||||
return {'status': 'error', 'action': 'id_mismatch', 'stored_id': stored_id}
|
||||
return {'status': 'success', 'action': 'checked_existing_mail', 'is_completed': task.is_completed if task else False}
|
||||
except FileNotFoundError:
|
||||
return {'status': 'error', 'action': 'not_found', 'error': f'Mail with ID {mail_id} not found in index'}
|
||||
except Exception as e:
|
||||
logger.error(f"Mail Reader Service Error processing mail {mail_id}: {str(e)}")
|
||||
return {'status': 'error', 'action': 'exception', 'error': str(e)}
|
||||
|
||||
|
||||
class MailParserService:
|
||||
"""
|
||||
Mail Parser Service
|
||||
"""
|
||||
|
||||
# Singleton instance
|
||||
_instance = None
|
||||
REDIS_EXCEPTIONS = RedisHandler.REDIS_EXCEPTIONS
|
||||
|
||||
@classmethod
|
||||
def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5):
|
||||
"""
|
||||
Handle Redis reconnection with exponential backoff based on consecutive errors
|
||||
|
||||
Args:
|
||||
consecutive_errors: Number of consecutive errors encountered
|
||||
max_consecutive_errors: Threshold for extended sleep time
|
||||
|
||||
Returns:
|
||||
tuple: (MainRedisHandler instance, bool indicating if extended sleep is needed)
|
||||
"""
|
||||
# Delegate to RedisHandler's reconnection logic
|
||||
redis_handler, need_extended_sleep = RedisHandler.handle_reconnection(
|
||||
consecutive_errors=consecutive_errors,
|
||||
max_consecutive_errors=max_consecutive_errors
|
||||
)
|
||||
|
||||
# If Redis handler was successfully reconnected, create a new MainRedisHandler instance
|
||||
if redis_handler:
|
||||
# Reset the singleton instance to force re-initialization
|
||||
cls._instance = None
|
||||
main_handler = cls()
|
||||
return main_handler, need_extended_sleep
|
||||
|
||||
return None, need_extended_sleep
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(MailParserService, cls).__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
# Initialize only once
|
||||
if hasattr(self, '_initialized') and self._initialized:
|
||||
return
|
||||
|
||||
# Use RedisHandler singleton
|
||||
self.service_retriever = ServiceTaskRetriever()
|
||||
self._initialized = True
|
||||
|
||||
def fetch_all_tasks(self) -> list[RedisTaskObject]:
|
||||
return self.service_retriever.fetch_all_tasks()
|
||||
|
||||
def ensure_connection(self):
|
||||
"""
|
||||
Ensure Redis connection is established
|
||||
|
||||
Returns:
|
||||
bool: True if connection is established, False otherwise
|
||||
"""
|
||||
return self.redis_handler.ensure_connection()
|
||||
|
||||
def _check_redis_connection(self) -> bool:
|
||||
"""
|
||||
Check if Redis connection is alive using RedisHandler
|
||||
|
||||
Returns:
|
||||
True if connection is alive, False otherwise
|
||||
"""
|
||||
try:
|
||||
# Use RedisHandler to check connection
|
||||
connection_status = self.redis_handler.ensure_connection()
|
||||
if connection_status:
|
||||
logger.info("Redis connection established via RedisHandler")
|
||||
else:
|
||||
logger.error("Redis connection check failed via RedisHandler")
|
||||
return connection_status
|
||||
except RedisHandler.REDIS_EXCEPTIONS as e:
|
||||
logger.error(f"Redis connection failed: {str(e)}")
|
||||
return False
|
||||
|
||||
def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
|
||||
"""
|
||||
Get a task object by its UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to retrieve
|
||||
|
||||
Returns:
|
||||
RedisTaskObject: The task object if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the UUID index or task is not found
|
||||
"""
|
||||
return self.service_retriever.get_task_by_uuid(task_uuid)
|
||||
|
||||
def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
|
||||
"""
|
||||
Get service-specific data from a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task
|
||||
service_name: Name of the service to extract data for
|
||||
|
||||
Returns:
|
||||
Any: Service-specific data if found
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task or service data is not found
|
||||
"""
|
||||
return self.service_retriever.get_service_data_by_uuid(task_uuid, service_name)
|
||||
|
||||
def update_service_data(self, task_uuid: str, service_name: str, service_data: dict) -> bool:
|
||||
"""
|
||||
Update service-specific data in a task by UUID
|
||||
|
||||
Args:
|
||||
task_uuid: UUID of the task to update
|
||||
service_name: Name of the service data to update
|
||||
service_data: New service data
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the task is not found
|
||||
ValueError: If the update fails or service name is invalid
|
||||
"""
|
||||
return self.service_retriever.update_service_data(task_uuid, service_name, service_data)
|
||||
|
||||
def change_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False) -> bool:
|
||||
"""
|
||||
Update the service of a task by UUID
|
||||
"""
|
||||
return self.service_retriever.update_task_service(task_uuid, service_name, status, completed)
|
||||
|
||||
def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
|
||||
"""
|
||||
Update the status of a task by UUID
|
||||
"""
|
||||
return self.service_retriever.update_task_status(task_uuid, is_completed, status)
|
||||
|
|
@ -1,195 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
ServiceTaskRetriever Usage Example
|
||||
|
||||
This script demonstrates how to use the ServiceTaskRetriever class
|
||||
and the updated MainRedisHandler for direct Redis task retrieval and updates.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
import logging
|
||||
from json import dumps
|
||||
|
||||
from config import Status, RedisData
|
||||
from redis_handlers import ServiceTaskRetriever, MainRedisHandler
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def direct_retriever_example():
|
||||
"""Example using ServiceTaskRetriever directly"""
|
||||
logger.info("=== ServiceTaskRetriever Direct Usage Example ===")
|
||||
|
||||
# Create a retriever instance
|
||||
retriever = ServiceTaskRetriever()
|
||||
|
||||
# Generate a test UUID
|
||||
test_uuid = str(uuid.uuid4())
|
||||
logger.info(f"Generated test UUID: {test_uuid}")
|
||||
|
||||
# Example mail data
|
||||
mail_data = {
|
||||
"id": "test-mail-123",
|
||||
"subject": "Test Mail Subject",
|
||||
"body": "This is a test mail body",
|
||||
"from": "test@example.com",
|
||||
"to": "recipient@example.com",
|
||||
"date": "2025-08-09T14:08:05+03:00"
|
||||
}
|
||||
|
||||
try:
|
||||
# Store a new task with UUID
|
||||
logger.info("Storing new task with UUID...")
|
||||
success = retriever.store_task_with_uuid(
|
||||
test_uuid,
|
||||
"MailReader",
|
||||
mail_data
|
||||
)
|
||||
logger.info(f"Task stored successfully: {success}")
|
||||
|
||||
# Retrieve the task by UUID
|
||||
logger.info("Retrieving task by UUID...")
|
||||
task = retriever.get_task_by_uuid(test_uuid)
|
||||
logger.info(f"Retrieved task: service={task.service}, status={task.status}")
|
||||
|
||||
# Get service-specific data
|
||||
logger.info("Retrieving service data by UUID...")
|
||||
mail_reader_data = retriever.get_service_data_by_uuid(test_uuid, "MailReader")
|
||||
logger.info(f"Mail subject: {mail_reader_data.get('subject')}")
|
||||
|
||||
# Update task status
|
||||
logger.info("Updating task status...")
|
||||
retriever.update_task_status(
|
||||
test_uuid,
|
||||
is_completed=True,
|
||||
status=Status.COMPLETED
|
||||
)
|
||||
|
||||
# Verify status update
|
||||
updated_task = retriever.get_task_by_uuid(test_uuid)
|
||||
logger.info(f"Updated task status: {updated_task.status}, completed: {updated_task.is_completed}")
|
||||
|
||||
# Update service data
|
||||
logger.info("Updating service data...")
|
||||
new_mail_data = mail_data.copy()
|
||||
new_mail_data["subject"] = "Updated Subject"
|
||||
retriever.update_service_data(test_uuid, "MailReader", new_mail_data)
|
||||
|
||||
# Verify service data update
|
||||
updated_mail_data = retriever.get_service_data_by_uuid(test_uuid, "MailReader")
|
||||
logger.info(f"Updated mail subject: {updated_mail_data.get('subject')}")
|
||||
|
||||
except FileNotFoundError as e:
|
||||
logger.error(f"Not found error: {e}")
|
||||
except ValueError as e:
|
||||
logger.error(f"Value error: {e}")
|
||||
|
||||
def main_handler_example():
|
||||
"""Example using MainRedisHandler"""
|
||||
logger.info("\n=== MainRedisHandler Usage Example ===")
|
||||
|
||||
# Create a handler instance
|
||||
handler = MainRedisHandler()
|
||||
|
||||
# Generate a test UUID
|
||||
test_uuid = str(uuid.uuid4())
|
||||
logger.info(f"Generated test UUID: {test_uuid}")
|
||||
|
||||
# Example mail data
|
||||
mail_data = {
|
||||
"id": "test-mail-456",
|
||||
"subject": "Test Mail via MainRedisHandler",
|
||||
"body": "This is a test mail body via MainRedisHandler",
|
||||
"from": "test@example.com",
|
||||
"to": "recipient@example.com",
|
||||
"date": "2025-08-09T14:08:05+03:00"
|
||||
}
|
||||
|
||||
try:
|
||||
# Store a new task with UUID
|
||||
logger.info("Storing new task with UUID via MainRedisHandler...")
|
||||
success = handler.store_task_with_uuid(
|
||||
test_uuid,
|
||||
"MailReader",
|
||||
mail_data
|
||||
)
|
||||
logger.info(f"Task stored successfully: {success}")
|
||||
|
||||
# Retrieve the task by UUID
|
||||
logger.info("Retrieving task by UUID...")
|
||||
task = handler.get_task_by_uuid(test_uuid)
|
||||
logger.info(f"Retrieved task: service={task.service}, status={task.status}")
|
||||
|
||||
# Get service-specific data
|
||||
logger.info("Retrieving service data by UUID...")
|
||||
mail_reader_data = handler.get_service_data_by_uuid(test_uuid, "MailReader")
|
||||
logger.info(f"Mail subject: {mail_reader_data.get('subject')}")
|
||||
|
||||
# Update task status
|
||||
logger.info("Updating task status...")
|
||||
handler.update_task_status(
|
||||
test_uuid,
|
||||
is_completed=True,
|
||||
status=Status.COMPLETED
|
||||
)
|
||||
|
||||
# Verify status update
|
||||
updated_task = handler.get_task_by_uuid(test_uuid)
|
||||
logger.info(f"Updated task status: {updated_task.status}, completed: {updated_task.is_completed}")
|
||||
|
||||
# Update service data
|
||||
logger.info("Updating service data...")
|
||||
new_mail_data = mail_data.copy()
|
||||
new_mail_data["subject"] = "Updated Subject via MainRedisHandler"
|
||||
handler.update_service_data(test_uuid, "MailReader", new_mail_data)
|
||||
|
||||
# Verify service data update
|
||||
updated_mail_data = handler.get_service_data_by_uuid(test_uuid, "MailReader")
|
||||
logger.info(f"Updated mail subject: {updated_mail_data.get('subject')}")
|
||||
|
||||
except FileNotFoundError as e:
|
||||
logger.error(f"Not found error: {e}")
|
||||
except ValueError as e:
|
||||
logger.error(f"Value error: {e}")
|
||||
|
||||
def error_handling_example():
|
||||
"""Example demonstrating error handling"""
|
||||
logger.info("\n=== Error Handling Example ===")
|
||||
|
||||
retriever = ServiceTaskRetriever()
|
||||
|
||||
# Try to retrieve non-existent task
|
||||
try:
|
||||
logger.info("Attempting to retrieve non-existent task...")
|
||||
task = retriever.get_task_by_uuid("non-existent-uuid")
|
||||
except FileNotFoundError as e:
|
||||
logger.info(f"Expected error caught: {e}")
|
||||
|
||||
# Try to update non-existent task
|
||||
try:
|
||||
logger.info("Attempting to update non-existent task...")
|
||||
retriever.update_task_status("non-existent-uuid", True, Status.COMPLETED)
|
||||
except FileNotFoundError as e:
|
||||
logger.info(f"Expected error caught: {e}")
|
||||
|
||||
# Try to update with invalid service name
|
||||
test_uuid = str(uuid.uuid4())
|
||||
mail_data = {"subject": "Test"}
|
||||
retriever.store_task_with_uuid(test_uuid, "MailReader", mail_data)
|
||||
|
||||
try:
|
||||
logger.info("Attempting to update with invalid service name...")
|
||||
retriever.update_service_data(test_uuid, "NonExistentService", {"data": "test"})
|
||||
except ValueError as e:
|
||||
logger.info(f"Expected error caught: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
direct_retriever_example()
|
||||
main_handler_example()
|
||||
error_handling_example()
|
||||
|
|
@ -1,27 +0,0 @@
|
|||
FROM python:3.9-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Install system dependencies
|
||||
RUN apt-get update && apt-get install -y build-essential libpq-dev && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Copy requirements first for better caching
|
||||
COPY ServicesRunnner/requirements.txt .
|
||||
|
||||
# Install Python dependencies
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
# Copy Prisma schema and generate client
|
||||
COPY ServicesRunnner/schema.prisma .
|
||||
COPY ServicesRunnner/Depends .
|
||||
|
||||
RUN prisma generate
|
||||
|
||||
# Copy the rest of the application
|
||||
COPY ServicesRunnner/ .
|
||||
|
||||
# Set environment variables
|
||||
ENV PYTHONPATH=/app
|
||||
|
||||
# Command to run when container starts
|
||||
CMD ["python", "-m", "template.py"]
|
||||
|
|
@ -1,4 +0,0 @@
|
|||
prisma==0.9.1
|
||||
python-dotenv==1.0.0
|
||||
asyncio==3.4.3
|
||||
uvloop>=0.19
|
||||
Binary file not shown.
|
|
@ -0,0 +1,240 @@
|
|||
import os
|
||||
|
||||
from redis import Redis
|
||||
from json import dumps, loads
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Any, Union
|
||||
from email import policy
|
||||
from email.message import EmailMessage
|
||||
from email.headerregistry import UniqueDateHeader, UniqueAddressHeader, UniqueUnstructuredHeader
|
||||
from email.parser import BytesParser
|
||||
from imaplib import IMAP4_SSL
|
||||
from config import EmailConfig, Config
|
||||
|
||||
|
||||
email_config = EmailConfig()
|
||||
config = Config()
|
||||
redis_client = Redis(
|
||||
host='10.10.2.15',
|
||||
password='your_strong_password_here',
|
||||
port=6379,
|
||||
db=0
|
||||
)
|
||||
|
||||
class Mails:
|
||||
|
||||
def __init__(self, mail_id: bytes, mail_data: bytes):
|
||||
self.id: bytes = mail_id
|
||||
self.raw_data: bytes = mail_data
|
||||
self.attachments: List[Dict[str, Union[str, bytes]]] = []
|
||||
self.message: EmailMessage = BytesParser(policy=policy.default).parsebytes(mail_data)
|
||||
self.subject: UniqueUnstructuredHeader = self.message.get('Subject', '') or ''
|
||||
self.from_: UniqueAddressHeader = self.message.get('From', '') or ''
|
||||
self.to: UniqueAddressHeader = self.message.get('To', '') or ''
|
||||
self.date: UniqueDateHeader = self.message.get('Date', '') or ''
|
||||
self.body_text: str = self._get_body_text()
|
||||
self._extract_attachments()
|
||||
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
'id': self.id.decode('utf-8'),
|
||||
# 'raw_data': self.raw_data.decode('utf-8'),
|
||||
'attachments': [{
|
||||
'filename': attachment['filename'],
|
||||
'content_type': attachment['content_type'],
|
||||
'charset': attachment['charset'],
|
||||
'data': attachment['data'].decode(attachment['charset'], errors='replace')
|
||||
} for attachment in self.attachments],
|
||||
# 'message': self.message.as_string(),
|
||||
'subject': str(self.subject),
|
||||
'from_': {
|
||||
"display_name": self.from_.addresses[0].display_name,
|
||||
"username": self.from_.addresses[0].username,
|
||||
"domain": self.from_.addresses[0].domain,
|
||||
"mail": f"{self.from_.addresses[0].username}@{self.from_.addresses[0].domain}"
|
||||
},
|
||||
'to': [
|
||||
{
|
||||
"display_name": address.display_name,
|
||||
"username": address.username,
|
||||
"domain": address.domain,
|
||||
"mail": f"{address.username}@{address.domain}"
|
||||
} for address in self.to.addresses
|
||||
],
|
||||
'date': str(self.date.datetime),
|
||||
'body_text': str(self.body_text)
|
||||
}
|
||||
|
||||
def _get_body_text(self) -> str:
|
||||
body = self.message.get_body(preferencelist=('plain',))
|
||||
if body is not None:
|
||||
return body.get_content() or ''
|
||||
if self.message.is_multipart():
|
||||
for part in self.message.walk():
|
||||
if part.get_content_type() == 'text/plain' and (part.get_content_disposition() or '') != 'attachment':
|
||||
try:
|
||||
return part.get_content() or ''
|
||||
except Exception:
|
||||
payload = part.get_payload(decode=True) or b''
|
||||
return payload.decode(part.get_content_charset() or 'utf-8', errors='replace')
|
||||
else:
|
||||
if self.message.get_content_type() == 'text/plain':
|
||||
try:
|
||||
return self.message.get_content() or ''
|
||||
except Exception:
|
||||
payload = self.message.get_payload(decode=True) or b''
|
||||
return payload.decode(self.message.get_content_charset() or 'utf-8', errors='replace')
|
||||
return ''
|
||||
|
||||
def _extract_attachments(self) -> None:
|
||||
for part in self.message.walk():
|
||||
if part.get_content_disposition() == 'attachment':
|
||||
filename = part.get_filename()
|
||||
if not filename:
|
||||
continue
|
||||
data = part.get_payload(decode=True) or b''
|
||||
charset = part.get_charset() or 'utf-8'
|
||||
self.attachments.append(
|
||||
{'filename': filename, 'content_type': part.get_content_type(), 'data': data, 'charset': charset}
|
||||
)
|
||||
|
||||
def save_attachments(self, folder: str) -> None:
|
||||
os.makedirs(folder, exist_ok=True)
|
||||
for att in self.attachments:
|
||||
with open(os.path.join(folder, att['filename']), 'wb') as f:
|
||||
f.write(att['data'])
|
||||
|
||||
|
||||
class EmailReaderIsbankService:
|
||||
|
||||
NO_ATTACHMENT_FOLDER = "NoAttachment"
|
||||
COMPLETED_FOLDER = "Completed"
|
||||
|
||||
def __init__(self, email_config: EmailConfig, config: Config):
|
||||
self.email_config = email_config
|
||||
self.config = config
|
||||
self.mail = IMAP4_SSL(email_config.EMAIL_HOST, email_config.EMAIL_PORT)
|
||||
self.mail.login(email_config.EMAIL_USERNAME, email_config.EMAIL_PASSWORD)
|
||||
self.data: List[Mails] = []
|
||||
self.inc: int = 100
|
||||
self.start: int = 0
|
||||
self.end: int = self.inc
|
||||
|
||||
self._connect_inbox()
|
||||
self.mail_count = self._fetch_count()
|
||||
self._fetch_all()
|
||||
|
||||
|
||||
def _connect_inbox(self):
|
||||
"""INBOX'a bağlanır"""
|
||||
status, _ = self.mail.select("INBOX")
|
||||
if status != 'OK':
|
||||
raise Exception("INBOX'a bağlanılamadı")
|
||||
|
||||
def _fetch_count(self):
|
||||
status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"')
|
||||
if status != 'OK':
|
||||
raise Exception("Mail sayısı alınamadı")
|
||||
return len(uids[0].split())
|
||||
|
||||
def _fetch_all(self):
|
||||
"""Tüm mailleri çeker ve self.data'ya Mails objesi olarak ekler"""
|
||||
status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"')
|
||||
if status != 'OK':
|
||||
raise Exception("Mail arama başarısız")
|
||||
for uid in uids[0].split():
|
||||
status, msg_data = self.mail.uid('fetch', uid, '(RFC822)')
|
||||
if status == 'OK' and msg_data[0] is not None:
|
||||
self.data.append(Mails(uid, msg_data[0][1]))
|
||||
|
||||
def mark_no_attachment(self, uid: bytes):
|
||||
"""Mesajı arşive taşır"""
|
||||
self.mail.uid('COPY', uid, self.NO_ATTACHMENT_FOLDER)
|
||||
self.delete(uid)
|
||||
|
||||
def mark_completed(self, uid: bytes):
|
||||
"""Mesajı arşive taşır"""
|
||||
self.mail.uid('COPY', uid, self.COMPLETED_FOLDER)
|
||||
# self.delete(uid)
|
||||
|
||||
def delete(self, uid: bytes):
|
||||
"""Mesajı siler"""
|
||||
self.mail.uid('STORE', uid, '+FLAGS', r'(\Deleted)')
|
||||
|
||||
def commit(self):
|
||||
"""Bekleyen silme/taşıma işlemlerini uygular"""
|
||||
self.mail.expunge()
|
||||
|
||||
def logout(self):
|
||||
self.mail.logout()
|
||||
|
||||
@property
|
||||
def count(self):
|
||||
return len(self.data)
|
||||
|
||||
service = EmailReaderIsbankService(email_config, config)
|
||||
mails = service.data
|
||||
count = 0
|
||||
redis_prefix = "Bank:Services:Task"
|
||||
|
||||
my_service = "mail"
|
||||
for mail in mails:
|
||||
if not getattr(mail, 'id', None):
|
||||
continue
|
||||
mail_id = mail.id.decode('utf-8')
|
||||
if mail.attachments:
|
||||
not_seen = redis_client.sadd(f'{redis_prefix}:Seen', mail_id)
|
||||
index_of_set_data_get = redis_client.get(f'{redis_prefix}:Index')
|
||||
if not_seen:
|
||||
mail_to_dict = mail.to_dict()
|
||||
mail_without_attachments = mail_to_dict.copy()
|
||||
mail_without_attachments.pop('attachments', None)
|
||||
write_object = {
|
||||
'id': mail_id,
|
||||
'data': {
|
||||
"mail": mail_without_attachments,
|
||||
"parser": mail_to_dict.get('attachments', []),
|
||||
"ibanFinder": None,
|
||||
"commentFinder": None
|
||||
},
|
||||
'created_at': datetime.now().isoformat(),
|
||||
'completed': True,
|
||||
'status': 'red',
|
||||
'service': my_service,
|
||||
# 'bank': 'isbank',
|
||||
'is_completed': False
|
||||
}
|
||||
redis_write_ = redis_client.rpush(f'{redis_prefix}:Data', dumps(write_object))
|
||||
if redis_write_:
|
||||
if index_of_set_data_get:
|
||||
index_of_set_data_get = loads(index_of_set_data_get)
|
||||
index_of_set_data_get[str(mail_id)] = redis_write_ - 1
|
||||
else:
|
||||
index_of_set_data_get = {str(mail_id): redis_write_ - 1}
|
||||
index_of_set_data_set = redis_client.set(f'{redis_prefix}:Index', dumps(index_of_set_data_get))
|
||||
count += 1
|
||||
else:
|
||||
redis_client.spop(f'{redis_prefix}:Seen', mail_id)
|
||||
else:
|
||||
get_index = redis_client.get(f'{redis_prefix}:Index')
|
||||
if not get_index:
|
||||
continue
|
||||
get_index = loads(get_index)
|
||||
if get_index.get(str(mail_id), None):
|
||||
object_from_redis = redis_client.lindex(f'{redis_prefix}:Data', int(get_index[str(mail_id)]))
|
||||
if object_from_redis:
|
||||
object_from_redis = loads(object_from_redis)
|
||||
is_completed = object_from_redis.get('is_completed', False)
|
||||
id_ = object_from_redis.get('data', {}).get('id', None)
|
||||
if not mail_id == id_:
|
||||
raise Exception("Mail id not match with id from redis")
|
||||
if is_completed:
|
||||
service.mark_completed(mail_id)
|
||||
else:
|
||||
service.mark_no_attachment(mail_id)
|
||||
|
||||
service.commit()
|
||||
service.logout()
|
||||
|
||||
print("Total Mails: ", f"{count}/{service.count}")
|
||||
|
|
@ -0,0 +1,37 @@
|
|||
import os
|
||||
|
||||
class Config:
|
||||
|
||||
MAILBOX: str = os.getenv("MAILBOX", "bilgilendirme@ileti.isbank.com.tr")
|
||||
EMAIL_HOST: str = os.getenv("EMAIL_HOST", "10.10.2.34")
|
||||
EMAIL_LOGIN_USER: str = os.getenv("EMAIL_READER_ADDRESS", "isbank@mehmetkaratay.com.tr")
|
||||
EMAIL_LOGIN_PASSWORD: str = os.getenv("EMAIL_LOGIN_PASSWORD", "system")
|
||||
AUTHORIZE_IBAN: str = os.getenv("AUTHORIZE_IBAN", "4245-0093333")
|
||||
EMAIL_PORT: int = int(os.getenv("EMAIL_PORT", 993))
|
||||
|
||||
|
||||
class EmailConfig:
|
||||
|
||||
EMAIL_HOST: str = Config.EMAIL_HOST
|
||||
EMAIL_USERNAME: str = Config.EMAIL_LOGIN_USER
|
||||
EMAIL_PASSWORD: str = Config.EMAIL_LOGIN_PASSWORD
|
||||
EMAIL_PORT: int = Config.EMAIL_PORT
|
||||
|
||||
@classmethod
|
||||
def as_dict(cls):
|
||||
return dict(
|
||||
host=EmailConfig.EMAIL_HOST,
|
||||
port=EmailConfig.EMAIL_PORT,
|
||||
username=EmailConfig.EMAIL_USERNAME,
|
||||
password=EmailConfig.EMAIL_PASSWORD
|
||||
)
|
||||
|
||||
|
||||
# INFO_MAIL: str = os.getenv("INFO_MAIL", "mehmet.karatay@hotmail.com")
|
||||
# EMAIL_SEND: bool = bool(os.getenv("EMAIL_SEND", False))
|
||||
# EMAIL_SEND_PORT: int = int(os.getenv("EMAIL_SEND_PORT", 587))
|
||||
# EMAIL_SLEEP: int = int(os.getenv("EMAIL_SLEEP", 60))
|
||||
# SERVICE_TIMING: int = int(os.getenv("SERVICE_TIMING", 900))
|
||||
# EMAIL_LOGIN_USER: str = os.getenv("EMAIL_LOGIN_USER", "karatay@mehmetkaratay.com.tr")
|
||||
# MAIN_MAIL: str = os.getenv("MAIN_MAIL", "karatay.berkay@gmail.com")
|
||||
# EMAIL_SEND: bool = Config.EMAIL_SEND
|
||||
|
|
@ -1,122 +0,0 @@
|
|||
services:
|
||||
|
||||
# prisma_service_test:
|
||||
# container_name: prisma_service_test
|
||||
# build:
|
||||
# context: .
|
||||
# dockerfile: ServicesRunner/AccountRecordServices/Test/Dockerfile
|
||||
# networks:
|
||||
# - bank-services-network
|
||||
# volumes:
|
||||
# - ./ServicesRunner/AccountRecordServices/Finder/Iban/venv:/opt/venv
|
||||
# - ./ServicesRunner/AccountRecordServices/Finder/Iban/.prisma-cache:/root/.cache/prisma-python
|
||||
# logging:
|
||||
# driver: "json-file"
|
||||
# options:
|
||||
# max-size: "10m"
|
||||
# max-file: "3"
|
||||
# healthcheck:
|
||||
# test: [ "CMD", "/opt/venv/bin/python", "-c", "import asyncio; from ServicesRunner.Depends.prisma_client import get_prisma_client; asyncio.run(get_prisma_client())" ]
|
||||
# interval: 15s
|
||||
# timeout: 10s
|
||||
# retries: 3
|
||||
|
||||
prisma_service_iban:
|
||||
container_name: prisma_service_iban
|
||||
build:
|
||||
context: .
|
||||
dockerfile: ServicesRunner/AccountRecordServices/Finder/Iban/Dockerfile
|
||||
networks:
|
||||
- bank-services-network
|
||||
volumes:
|
||||
- ./ServicesRunner/AccountRecordServices/Finder/Iban/venv:/opt/venv
|
||||
- ./ServicesRunner/AccountRecordServices/Finder/Iban/.prisma-cache:/root/.cache/prisma-python
|
||||
logging:
|
||||
driver: "json-file"
|
||||
options:
|
||||
max-size: "10m"
|
||||
max-file: "3"
|
||||
healthcheck:
|
||||
test: [ "CMD", "/opt/venv/bin/python", "-c", "import asyncio; from ServicesRunner.Depends.prisma_client import get_prisma_client; asyncio.run(get_prisma_client())" ]
|
||||
interval: 15s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
|
||||
# prisma_studio:
|
||||
# image: node:18
|
||||
# working_dir: /app
|
||||
# # volumes:
|
||||
# # - ./ServicesRunner/Depends:/app
|
||||
# ports:
|
||||
# - "5555:5555"
|
||||
# entrypoint: [ "/bin/sh", "-c" ]
|
||||
# command: |
|
||||
# "npx prisma studio --schema=/app/schema.prisma"
|
||||
# depends_on:
|
||||
# - prisma_service_test
|
||||
# networks:
|
||||
# - bank-services-network
|
||||
# logging:
|
||||
# driver: "json-file"
|
||||
# options:
|
||||
# max-size: "10m"
|
||||
# max-file: "3"
|
||||
# finder_payments:
|
||||
# container_name: finder_payments
|
||||
# env_file:
|
||||
# - api_env.env
|
||||
# build:
|
||||
# context: .
|
||||
# dockerfile: ServicesBank/Finder/Payment/Dockerfile
|
||||
# networks:
|
||||
# - bank-services-network
|
||||
# logging:
|
||||
# driver: "json-file"
|
||||
# options:
|
||||
# max-size: "10m"
|
||||
# max-file: "3"
|
||||
|
||||
isbank_email_reader:
|
||||
container_name: isbank_email_reader
|
||||
environment:
|
||||
- MAILBOX=bilgilendirme@ileti.isbank.com.tr
|
||||
- MAIN_MAIL=karatay.berkay@gmail.com
|
||||
- INFO_MAIL=mehmet.karatay@hotmail.com
|
||||
- EMAIL_HOST=10.10.2.34
|
||||
- EMAIL_READER_ADDRESS=isbank@mehmetkaratay.com.tr
|
||||
- EMAIL_LOGIN_PASSWORD=system
|
||||
- AUTHORIZE_IBAN=4245-0093333
|
||||
- SERVICE_TIMING=900
|
||||
- EMAIL_PORT=993
|
||||
- EMAIL_SLEEP=60
|
||||
build:
|
||||
context: .
|
||||
dockerfile: ServicesRunner/AccountRecordServices/Reader/Banks/IsBank/Dockerfile
|
||||
networks:
|
||||
- bank-services-network
|
||||
logging:
|
||||
driver: "json-file"
|
||||
options:
|
||||
max-size: "10m"
|
||||
max-file: "3"
|
||||
|
||||
isbank_mail_parser:
|
||||
container_name: isbank_mail_parser
|
||||
build:
|
||||
context: .
|
||||
dockerfile: ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank/Dockerfile
|
||||
networks:
|
||||
- bank-services-network
|
||||
logging:
|
||||
driver: "json-file"
|
||||
options:
|
||||
max-size: "10m"
|
||||
max-file: "3"
|
||||
|
||||
networks:
|
||||
bank-services-network:
|
||||
driver: bridge
|
||||
|
||||
# volumes:
|
||||
# prisma_generated:
|
||||
# name: prisma_generated_volume
|
||||
Loading…
Reference in New Issue