Compare commits


7 Commits

Author SHA1 Message Date
Berkay 7452e05a92 venv and prisma cahce deleted 2025-08-10 11:30:56 +03:00
Berkay bd12fe02ae updated async prisma runner 2025-08-10 11:29:09 +03:00
Berkay a00c2942f5 async prisma runner completed 2025-08-10 11:23:18 +03:00
Berkay 768f0a5daf updated prisma service async runner 2025-08-10 11:18:00 +03:00
Berkay c2fd263f27 Revert "updated Service Runner email Reader" (reverts commit 81184a8acc) 2025-08-10 11:14:54 +03:00
Berkay ac1980566a Revert "updated prisma service async runner" (reverts commit db0ae34948) 2025-08-10 11:05:45 +03:00
Berkay db0ae34948 updated prisma service async runner 2025-08-10 11:01:26 +03:00
40 changed files with 6494 additions and 277 deletions

.gitignore vendored (7 changes)

@@ -54,3 +54,10 @@ pids
# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
env
.env
ServicesRunner/AccountRecordServices/Finder/Iban/.prisma-cache
venv/
__pycache__/
*.py[cod]

.vscode/settings.json vendored, new file (3 changes)

@@ -0,0 +1,3 @@
{
"git.ignoreLimitWarning": true
}


@@ -0,0 +1,14 @@
__pycache__/
*.pyc
*.pyo
*.pyd
*.db
*.sqlite3
*.log
*.env
venv/
.env.*
node_modules/
.prisma/
.prisma-cache/
ServicesRunner/AccountRecordServices/Test/venv/


@@ -0,0 +1,23 @@
FROM python:3.12-slim
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV VIRTUAL_ENV=/opt/venv
ENV PRISMA_SCHEMA_PATH=/app/Depends/schema.prisma
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
ENV PYTHONPATH=/app
RUN apt-get update && apt-get install -y --no-install-recommends gcc curl && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY ServicesRunner/Depends/ /app/Depends
COPY ServicesRunner/requirements.txt /app/requirements.txt
COPY ServicesRunner/AccountRecordServices/Finder/Iban/entrypoint.sh /entrypoint.sh
COPY ServicesRunner/AccountRecordServices/Finder/Iban /app/
RUN chmod +x /entrypoint.sh
CMD ["/entrypoint.sh"]


@@ -0,0 +1,74 @@
import uvloop
import asyncio
import sys
import signal
import time
from datetime import datetime
from Depends.prisma_client import prisma_client, disconnect_prisma
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# sys.stdout.reconfigure(line_buffering=True)  # alternative: run with python -u or PYTHONUNBUFFERED=1
async def tick():
start = time.time()
print(f"[{datetime.now()}] Attempting database query...")
async with prisma_client() as db:
rows = await db.account_records.find_many(
take=5, skip=0, order=[{"bank_date": "desc"}]
)
print(f"[{datetime.now()}] Query completed in {time.time()-start:.2f}s")
for i, r in enumerate(rows):
        # Adjust the row format here if desired
print(f" Row: {i} | id={r.id} bank_date={r.bank_date} currency_value={r.currency_value}")
print("-" * 80)
async def service():
print(f"[{datetime.now()}] IBAN Finder service starting")
try:
iteration = 0
while True:
iteration += 1
print(f"\n[{datetime.now()}] Loop iteration {iteration}")
try:
await tick()
except Exception as e:
print(f"[{datetime.now()}] Error in service tick: {e}")
            await asyncio.sleep(1)  # non-blocking wait
    finally:
        # Always close the DB connection cleanly
        print(f"[{datetime.now()}] Cleaning up database connection...")
        await disconnect_prisma()
async def _graceful_shutdown(sig: signal.Signals):
print(f"\n[{datetime.now()}] Shutting down due to signal: {sig.name}")
    # Optionally cancel all pending tasks here:
# for t in asyncio.all_tasks():
# if t is not asyncio.current_task():
# t.cancel()
await disconnect_prisma()
def _install_signal_handlers(loop: asyncio.AbstractEventLoop):
    # Linux/Unix: graceful shutdown on SIGINT (Ctrl+C) and SIGTERM (docker stop)
for s in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(s, lambda s=s: asyncio.create_task(_graceful_shutdown(s)))
async def main():
loop = asyncio.get_running_loop()
try:
_install_signal_handlers(loop)
except NotImplementedError:
        # (Would matter on Windows etc.; this service runs on Linux/Docker)
pass
await service()
if __name__ == "__main__":
    # The uvloop policy was already set above; plain asyncio.run is used here
asyncio.run(main())
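The commented-out block in _graceful_shutdown hints at cancelling pending tasks before disconnecting; a fuller sketch of that idea (illustrative, not part of the service) would cancel and then await the outstanding tasks:

import asyncio

async def _cancel_pending_tasks():
    # Cancel everything except the current task, then wait for them to settle
    pending = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)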


@@ -0,0 +1,18 @@
#!/bin/sh
VENV_PATH="/opt/venv"
REQUIREMENTS_PATH="/app/requirements.txt"
SCHEMA_PATH="/app/Depends/schema.prisma"
PRISMA_BINARY_PATH="/root/.cache/prisma-python/binaries"
if [ ! -x "$VENV_PATH/bin/python" ]; then
python -m venv "$VENV_PATH"
"$VENV_PATH/bin/pip" install --no-cache-dir -r "$REQUIREMENTS_PATH"
"$VENV_PATH/bin/prisma" generate --schema "$SCHEMA_PATH"
fi
if ! find "$PRISMA_BINARY_PATH" -type f -name "prisma-query-engine-debian-openssl-3.0.x" | grep -q .; then
"$VENV_PATH/bin/prisma" py fetch
fi
exec "$VENV_PATH/bin/python" -u app.py


@@ -0,0 +1 @@
3.12


@@ -0,0 +1,22 @@
FROM python:3.12-slim
WORKDIR /app
# Copy only the dependency files first to leverage Docker cache
COPY ServicesRunner/AccountRecordServices/Finder/Parser/pyproject.toml .
# Install dependencies
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir .
# Copy only the necessary directories
COPY ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank /app/ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank
COPY ServicesRunner/Depends /app/ServicesRunner/Depends
# Set the Python path to include the root directory and ServicesRunner
ENV PYTHONPATH=/app:/app/ServicesRunner
# Set working directory to the IsBank service directory
WORKDIR /app/ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank
# Run the application
CMD ["python", "app.py"]


@@ -0,0 +1,211 @@
import sys
import logging
import pandas as pd
from time import sleep
from datetime import datetime
from io import BytesIO
from ServicesRunner.Depends.config import ConfigServices, MailParser, RedisTaskObject, Status
from base64 import b64decode
from unidecode import unidecode
from Depends.service_handler import MailParserService
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout), logging.FileHandler('isbank_parser_service.log')]
)
logger = logging.getLogger('IsBank_Parser_Service')
class IsbankMailParserService:
def __init__(self):
self.mail_parser_service = MailParserService()
def try_dataframe_extract_with_xlsx(self, binary_data: BytesIO):
try:
df = pd.read_excel(binary_data, engine='openpyxl')
return df
except Exception as e:
return None
def try_dataframe_extract_with_xls(self, binary_data: BytesIO):
try:
df = pd.read_excel(binary_data, engine='xlrd')
return df
except Exception as e:
return None
def try_dataframe_extract_else(self, binary_data: BytesIO):
try:
df = pd.read_excel(binary_data, engine='openpyxl')
except Exception as e1:
try:
binary_data.seek(0)
df = pd.read_excel(binary_data, engine='xlrd')
except Exception as e2:
return None
return df
    def find_ibans(self, excel_frame: pd.DataFrame, file_name: str) -> list[dict]:
        """Parse Excel file data.
        Args:
            excel_frame: DataFrame containing Excel data
            file_name: Name of the source Excel file, stored with each record
        Returns:
            list[dict]: List of parsed data dictionaries
        """
iban, data_list = "", []
try:
for row in excel_frame.itertuples():
if "IBAN" in str(row[3]).upper():
iban = str(row[5]).replace(" ", "")
if not str(row[1]) == "nan" and not str(row[2]) == "nan":
if len(str(row[1]).split("/")) > 2:
data_list.append(
dict(
filename=file_name,
iban=str(iban),
bank_date=datetime.strptime(str(row[1]), "%d/%m/%Y-%H:%M:%S").strftime("%Y-%m-%d %H:%M:%S"),
channel_branch=unidecode(str(row[3])),
currency_value=(float(str(row[4]).replace(",", "")) if row[4] else 0),
balance=(float(str(row[5]).replace(",", "")) if row[5] else 0),
additional_balance=(float(str(row[6]).replace(",", "")) if row[6] else 0),
process_name=str(row[7]),
process_type=unidecode(str(row[8])),
process_comment=unidecode(str(row[9])),
bank_reference_code=str(row[15]),
)
)
except Exception as e:
print(f"[PARSER_SERVICE] Error parsing Excel file: {str(e)}")
return data_list
def parse_dataframes(self, dataframe: pd.DataFrame, task: RedisTaskObject, attachment_data: MailParser):
file_name = attachment_data.filename
data_list = self.find_ibans(dataframe, file_name)
print(f"[PARSER_SERVICE] Successfully parsed {len(data_list)} records from Excel file")
if data_list:
self.mail_parser_service.update_service_data(task.task, ConfigServices.SERVICE_PREFIX_FINDER_IBAN, data_list)
print(f"Updated service data for task {task.task} with {len(data_list)} records")
self.mail_parser_service.change_service(task.task, ConfigServices.SERVICE_PREFIX_MAIL_PARSER, Status.COMPLETED, True)
return True
self.mail_parser_service.change_service(task.task, ConfigServices.SERVICE_PREFIX_MAIL_PARSER, Status.FAILED, True)
return False
    def process_task(self, active_task: RedisTaskObject):
        """Process a task object using the MailParserService
        Args:
            active_task: RedisTaskObject to process
        """
try:
for data in active_task.data.MailParser:
attachment_data: MailParser = data
if not attachment_data or not attachment_data.data:
print(f"[PARSER_SERVICE] No data found for task {active_task.task}")
continue
binary_data: bytes = b64decode(attachment_data.data)
excel_data = BytesIO(binary_data)
df = self.try_dataframe_extract_with_xlsx(excel_data)
if df is None:
excel_data.seek(0)
df = self.try_dataframe_extract_with_xls(excel_data)
if df is None:
excel_data.seek(0)
df = self.try_dataframe_extract_else(excel_data)
if df is not None:
self.parse_dataframes(df, active_task, attachment_data)
print(f"[PARSER_SERVICE] Completed processing task {active_task.task}")
except Exception as e:
print(f"[PARSER_SERVICE] Error processing task: {str(e)}")
if __name__ == "__main__":
logger.info("Starting IsBank Parser Service")
print(f"Starting IsBank Parser Service.")
# Initialize service
parser = IsbankMailParserService()
# Configurable parameters
normal_sleep_time = 10 # seconds between normal operations
error_sleep_time = 30 # seconds to wait after an error before retrying
max_consecutive_errors = 5 # maximum number of consecutive errors before longer pause
extended_error_sleep = 120 # seconds to wait after hitting max consecutive errors
consecutive_errors = 0
# Main service loop
while True:
try:
# Fetch all tasks
all_tasks = parser.mail_parser_service.fetch_all_tasks()
if all_tasks and len(all_tasks) > 0:
logger.info(f"Found {len(all_tasks)} tasks to process")
# Process each task
for active_task in all_tasks:
if active_task.service == ConfigServices.SERVICE_PREFIX_MAIL_PARSER and active_task.completed:
# logger.info(f"Task {active_task.task} is already processed.")
continue
logger.info(f"Processing task {active_task.task}")
parser.process_task(active_task)
else:
logger.info("No tasks found to process")
# Reset error counter on success
if consecutive_errors > 0:
logger.info(f"Service recovered after {consecutive_errors} consecutive errors")
consecutive_errors = 0
# Normal operation sleep
sleep(normal_sleep_time)
except MailParserService.REDIS_EXCEPTIONS as e:
# Redis-specific errors
consecutive_errors += 1
logger.error(f"Redis error (attempt {consecutive_errors}): {str(e)}")
# Use centralized reconnection handler from RedisHandler
mail_parser_service, need_extended_sleep = MailParserService.handle_reconnection(
consecutive_errors=consecutive_errors, max_consecutive_errors=max_consecutive_errors
)
if mail_parser_service:
# Update parser's mail parser service with the new instance
parser.mail_parser_service = mail_parser_service
# Sleep based on error count
if need_extended_sleep:
sleep(extended_error_sleep)
else:
sleep(error_sleep_time)
except Exception as e:
# Any other unexpected errors
consecutive_errors += 1
logger.error(f"Unexpected error (attempt {consecutive_errors}): {str(e)}")
# For any other error, try to reinitialize everything after some delay
if consecutive_errors >= max_consecutive_errors:
logger.warning(f"Hit {max_consecutive_errors} consecutive errors, reinitializing service")
try:
# Reinitialize the service directly
parser = IsbankMailParserService()
logger.info("Successfully reinitialized parser service")
consecutive_errors = 0 # Reset counter after reinitialization
except Exception as reinit_error:
logger.error(f"Service reinitialization failed: {str(reinit_error)}")
sleep(extended_error_sleep)
else:
# For fewer consecutive errors, just retry
sleep(error_sleep_time)


@@ -0,0 +1,6 @@
def main():
print("Hello from parser!")
if __name__ == "__main__":
main()


@@ -0,0 +1,15 @@
[project]
name = "parser"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"arrow>=1.3.0",
"pandas>=2.2.3",
"redis>=6.4.0",
"unidecode>=1.3.8",
"xlrd>=2.0.1",
"openpyxl>=3.1.2",
"pydantic-settings>=2.8.1",
]


@@ -0,0 +1 @@
3.12


@@ -0,0 +1,22 @@
FROM python:3.12-slim
WORKDIR /app
# Copy only the dependency files first to leverage Docker cache
COPY ServicesRunner/AccountRecordServices/Reader/Banks/IsBank/pyproject.toml .
# Install dependencies
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir .
# Copy only the necessary directories
COPY ServicesRunner/AccountRecordServices/Reader/Banks /app/ServicesRunner/AccountRecordServices/Reader/Banks
COPY ServicesRunner/Depends /app/ServicesRunner/Depends
# Set the Python path to include the root directory and ServicesRunner
ENV PYTHONPATH=/app:/app/ServicesRunner
# Set working directory to the IsBank service directory
WORKDIR /app/ServicesRunner/AccountRecordServices/Reader/Banks/IsBank
# Run the application
CMD ["python", "app.py"]


@@ -0,0 +1,176 @@
import sys
import socket
import logging
from time import sleep
from config import IsBankConfig
from Depends.mail_handler import EmailReaderService, EmailServiceRunner
from Depends.service_handler import MailReaderService
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout), logging.FileHandler('isbank_email_service.log')]
)
logger = logging.getLogger('IsBank_Email_Service')
# Main application code
def main():
"""Main entry point for the IsBank email service"""
try:
redis_handler = MailReaderService()
email_service = EmailReaderService(IsBankConfig())
email_service.login_and_connect()
runner = EmailServiceRunner(redis_handler=redis_handler, email_service=email_service)
runner.fetch_and_set_mails()
runner.drop()
return True
except Exception as e:
logger.error(f"Error in main function: {str(e)}")
return False
def initialize_service():
"""Initialize the service with proper error handling"""
try:
# Create singleton instances directly
logger.info("Creating Redis handler singleton")
redis_handler = MailReaderService()
logger.info("Creating EmailReaderService")
email_service = EmailReaderService(IsBankConfig())
# Initialize email service and connect
logger.info("Connecting to email service")
email_service.login_and_connect()
# Create email service runner with the singletons
logger.info("Creating EmailServiceRunner")
runner = EmailServiceRunner(redis_handler=redis_handler, email_service=email_service)
if runner:
logger.info("Email service runner initialized successfully")
return runner
else:
logger.error("Failed to initialize email service runner")
# Sleep before retry to avoid rapid failure loops
sleep(5)
return initialize_service()
except Exception as e:
logger.error(f"Service initialization failed: {str(e)}")
# Sleep before retry to avoid rapid failure loops
sleep(5)
return initialize_service()
if __name__ == "__main__":
logger.info("Starting IsBank Email Service")
print(f"Starting Service Mail Reader.")
# Initialize service
runner = initialize_service()
# Configurable parameters
normal_sleep_time = 10 # seconds between normal operations
error_sleep_time = 30 # seconds to wait after an error before retrying
max_consecutive_errors = 5 # maximum number of consecutive errors before longer pause
extended_error_sleep = 120 # seconds to wait after hitting max consecutive errors
consecutive_errors = 0
# Main service loop
while True:
try:
# Main processing
runner.fetch_and_set_mails()
# Reset error counter on success
if consecutive_errors > 0:
logger.info(f"Service recovered after {consecutive_errors} consecutive errors")
consecutive_errors = 0
# Normal operation sleep
sleep(normal_sleep_time)
except MailReaderService.REDIS_EXCEPTIONS as e:
# Redis-specific errors
consecutive_errors += 1
logger.error(f"Redis error (attempt {consecutive_errors}): {str(e)}")
# Use centralized reconnection handler from RedisHandler
redis_handler, need_extended_sleep = MailReaderService.handle_reconnection(
consecutive_errors=consecutive_errors, max_consecutive_errors=max_consecutive_errors
)
if redis_handler:
# Update runner's redis handler with the new instance
runner.redis_handler = redis_handler
runner.redis_connected = False # Will trigger reconnection on next cycle
# Sleep based on error count
if need_extended_sleep:
sleep(extended_error_sleep)
else:
sleep(error_sleep_time)
except socket.error as e:
# Email connection errors
consecutive_errors += 1
logger.error(f"Email connection error (attempt {consecutive_errors}): {str(e)}")
# Try to re-establish email connection
try:
logger.info("Attempting to re-establish email connection...")
# Create new email service directly
email_service = EmailReaderService(IsBankConfig())
email_service.login_and_connect()
# Create new runner with existing Redis handler and new email service
redis_handler = runner.redis_handler # Preserve existing Redis handler
runner = EmailServiceRunner(redis_handler=redis_handler, email_service=email_service)
logger.info("Successfully re-established email connection")
except Exception as email_retry_error:
logger.error(f"Failed to re-establish email connection: {str(email_retry_error)}")
# Determine sleep time based on consecutive errors
if consecutive_errors >= max_consecutive_errors:
logger.warning(f"Hit {max_consecutive_errors} consecutive email errors, taking longer pause")
sleep(extended_error_sleep)
else:
sleep(error_sleep_time)
except Exception as e:
# Any other unexpected errors
consecutive_errors += 1
logger.error(f"Unexpected error (attempt {consecutive_errors}): {str(e)}")
# For any other error, try to reinitialize everything after some delay
if consecutive_errors >= max_consecutive_errors:
logger.warning(f"Hit {max_consecutive_errors} consecutive errors, reinitializing service")
try:
# Try to clean up existing connections
try:
runner.drop()
except Exception as cleanup_error:
logger.warning(f"Error during cleanup: {str(cleanup_error)}")
# Reinitialize the service directly
redis_handler = MailReaderService()
email_service = EmailReaderService(IsBankConfig())
email_service.login_and_connect()
runner = EmailServiceRunner(redis_handler=redis_handler, email_service=email_service)
if runner:
logger.info("Successfully reinitialized email service runner")
consecutive_errors = 0 # Reset counter after reinitialization
else:
logger.error("Failed to reinitialize email service runner")
except Exception as reinit_error:
logger.error(f"Service reinitialization failed: {str(reinit_error)}")
sleep(extended_error_sleep)
else:
# For fewer consecutive errors, just retry the current runner
print(f"Error: {str(e)}")
sleep(error_sleep_time)


@@ -0,0 +1,16 @@
import os
from Depends.config import ConfigServices
class IsBankConfig:
MAILBOX: str = os.getenv("MAILBOX", "bilgilendirme@ileti.isbank.com.tr")
AUTHORIZE_IBAN: str = os.getenv("AUTHORIZE_IBAN", "4245-0093333")
NO_ATTACHMENT_FOLDER: str = "NoAttachment"
COMPLETED_FOLDER: str = "Completed"
TASK_DATA_PREFIX: str = ConfigServices.MAIN_TASK_PREFIX
TASK_MAILID_INDEX_PREFIX: str = ConfigServices.TASK_MAILID_INDEX_PREFIX
TASK_UUID_INDEX_PREFIX: str = ConfigServices.TASK_UUID_INDEX_PREFIX
TASK_SEEN_PREFIX: str = ConfigServices.TASK_SEEN_PREFIX
SERVICE_PREFIX: str = ConfigServices.SERVICE_PREFIX_MAIL_READER
NEXT_SERVICE_PREFIX: str = ConfigServices.SERVICE_PREFIX_MAIL_PARSER


@@ -0,0 +1,13 @@
[project]
name = "isbank-email-reader"
version = "0.1.0"
description = "IsBank Email Reader Service"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"arrow>=1.3.0",
"redis>=6.4.0",
"pydantic>=2.0.0",
"pydantic-settings>=2.0.0",
"email-validator>=2.0.0",
]


@@ -0,0 +1,4 @@
Uses
- MainRedisHandler
- MailHandler


@@ -0,0 +1,9 @@
[project]
name = "reader"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"redis>=6.4.0",
]


@@ -0,0 +1,23 @@
version = 1
revision = 3
requires-python = ">=3.12"
[[package]]
name = "reader"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "redis" },
]
[package.metadata]
requires-dist = [{ name = "redis", specifier = ">=6.4.0" }]
[[package]]
name = "redis"
version = "6.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/0d/d6/e8b92798a5bd67d659d51a18170e91c16ac3b59738d91894651ee255ed49/redis-6.4.0.tar.gz", hash = "sha256:b01bc7282b8444e28ec36b261df5375183bb47a07eb9c603f284e89cbc5ef010", size = 4647399, upload-time = "2025-08-07T08:10:11.441Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e8/02/89e2ed7e85db6c93dfa9e8f691c5087df4e3551ab39081a4d7c6d1f90e05/redis-6.4.0-py3-none-any.whl", hash = "sha256:f0544fa9604264e9464cdf4814e7d4830f74b165d52f2a330a760a88dd248b7f", size = 279847, upload-time = "2025-08-07T08:10:09.84Z" },
]


@@ -0,0 +1,14 @@
__pycache__/
*.pyc
*.pyo
*.pyd
*.db
*.sqlite3
*.log
*.env
venv/
.env.*
node_modules/
.prisma/
.prisma-cache/
ServicesRunner/AccountRecordServices/Test/venv/


@@ -0,0 +1,20 @@
FROM python:3.12-slim
ENV VIRTUAL_ENV=/app/ServicesRunner/Depends/opt/venv
ENV PRISMA_SCHEMA_PATH=/app/ServicesRunner/Depends/schema.prisma
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
ENV PYTHONPATH=/app
RUN apt-get update && apt-get install -y --no-install-recommends gcc curl && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY ServicesRunner/requirements.txt /app/ServicesRunner/requirements.txt
COPY ServicesRunner/Depends/schema.prisma /app/ServicesRunner/Depends/schema.prisma
COPY ServicesRunner/Depends/ /app/ServicesRunner/Depends/
COPY ServicesRunner/AccountRecordServices/Test/ /app/ServicesRunner/AccountRecordServices/Test/
COPY ServicesRunner/AccountRecordServices/Test/entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
CMD ["/entrypoint.sh"]


@@ -0,0 +1,29 @@
import asyncio
import sys
from time import sleep
from Depends.prisma_client import prisma_client
async def my_function():
try:
async with prisma_client() as db:
result = await db.account_records.find_many(
take=5,
skip=0,
order=[{"bank_date": "desc"}]
)
selected_result = [{"id": record.id, "bank_date": record.bank_date, "currency_value": record.currency_value} for record in result]
for record in selected_result:
print(record)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
raise
if __name__ == "__main__":
while True:
print("I am online")
        sleep(30)  # time.sleep: a bare asyncio.sleep() only creates a coroutine that is never awaited
# asyncio.run(my_function())
# asyncio.run(my_function())
# while True:
# asyncio.sleep(5)


@@ -0,0 +1,18 @@
#!/bin/sh
VENV_PATH="/opt/venv"
REQUIREMENTS_PATH="/app/ServicesRunner/requirements.txt"
SCHEMA_PATH="/app/ServicesRunner/Depends/schema.prisma"
PRISMA_BINARY_PATH="/app/ServicesRunner/Depends/root/.cache/prisma-python/binaries"
if [ ! -x "$VENV_PATH/bin/python" ]; then
python -m venv "$VENV_PATH"
"$VENV_PATH/bin/pip" install --no-cache-dir -r "$REQUIREMENTS_PATH"
"$VENV_PATH/bin/prisma" generate --schema "$SCHEMA_PATH"
fi
if ! find "$PRISMA_BINARY_PATH" -type f -name "prisma-query-engine-debian-openssl-3.0.x" | grep -q .; then
"$VENV_PATH/bin/prisma" py fetch
fi
exec "$VENV_PATH/bin/python" -m ServicesRunner.AccountRecordServices.Test.app


@@ -0,0 +1,116 @@
import os
from pydantic import BaseModel
from typing import Any, List, Optional, Union
class FromToHeader(BaseModel):
display_name: Optional[str]
username: Optional[str]
domain: Optional[str]
mail: Optional[str]
class MailReader(BaseModel):
id: str
subject: str
from_: FromToHeader
to: List[FromToHeader]
date: str
body_text: str
class MailParser(BaseModel):
filename: str
content_type: str
charset: str
data: str
class FinderIban(BaseModel):
...
class FinderComment(BaseModel):
...
class RedisData(BaseModel):
MailReader: MailReader
MailParser: List[MailParser]
FinderIban: FinderIban | Any
FinderComment: FinderComment | Any
class Status:
PENDING: str = "PENDING"
IN_PROGRESS: str = "IN_PROGRESS"
COMPLETED: str = "COMPLETED"
FAILED: str = "FAILED"
class RedisTaskObject(BaseModel):
task: str
data: RedisData
completed: bool
service: str
status: str
created_at: str
is_completed: bool
class EmailConfig:
HOST: str = os.getenv("EMAIL_HOST", "10.10.2.34")
USERNAME: str = os.getenv("EMAIL_USERNAME", "isbank@mehmetkaratay.com.tr")
PASSWORD: str = os.getenv("EMAIL_PASSWORD", "system")
PORT: int = int(os.getenv("EMAIL_PORT", 993))
@classmethod
def as_dict(cls):
return dict(host=EmailConfig.HOST, port=EmailConfig.PORT, username=EmailConfig.USERNAME, password=EmailConfig.PASSWORD)
class RedisConfig:
HOST: str = os.getenv("REDIS_HOST", "10.10.2.15")
PASSWORD: str = os.getenv("REDIS_PASSWORD", "your_strong_password_here")
PORT: int = int(os.getenv("REDIS_PORT", 6379))
DB: int = int(os.getenv("REDIS_DB", 0))
@classmethod
def as_dict(cls):
return dict(host=RedisConfig.HOST, port=int(RedisConfig.PORT), password=RedisConfig.PASSWORD, db=int(RedisConfig.DB))
class MailReaderMainConfig:
MAILBOX: str
AUTHORIZE_IBAN: str
NO_ATTACHMENT_FOLDER: str
COMPLETED_FOLDER: str
TASK_DATA_PREFIX: str
TASK_MAILID_INDEX_PREFIX: str
TASK_UUID_INDEX_PREFIX: str
TASK_SEEN_PREFIX: str
SERVICE_PREFIX: str
NEXT_SERVICE_PREFIX: str
class ConfigServices:
MAIN_TASK_PREFIX: str = "BANK:SERVICES:TASK:DATA"
TASK_MAILID_INDEX_PREFIX: str = "BANK:SERVICES:TASK:MAILID"
TASK_UUID_INDEX_PREFIX: str = "BANK:SERVICES:TASK:UUID"
TASK_SEEN_PREFIX: str = "BANK:SERVICES:TASK:SEEN"
SERVICE_PREFIX_MAIL_READER: str = "MailReader"
SERVICE_PREFIX_MAIL_PARSER: str = "MailParser"
SERVICE_PREFIX_FINDER_IBAN: str = "FinderIban"
SERVICE_PREFIX_FINDER_COMMENT: str = "FinderComment"
paramsRedisData = Union[MailReader, MailParser, FinderIban, FinderComment]
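As a hedged construction example (all values invented for illustration), the task envelope defined above fits together like this:

from datetime import datetime
from Depends.config import (ConfigServices, FromToHeader, MailReader,
                            RedisData, RedisTaskObject, Status)

task = RedisTaskObject(
    task="0f9b3c2e-0000-0000-0000-000000000000",  # illustrative UUID
    data=RedisData(
        MailReader=MailReader(
            id="101",
            subject="Account statement",
            from_=FromToHeader(display_name="IsBank", username="bilgilendirme",
                               domain="ileti.isbank.com.tr",
                               mail="bilgilendirme@ileti.isbank.com.tr"),
            to=[],
            date="2025-08-10 11:00:00",
            body_text="",
        ),
        MailParser=[],
        FinderIban={},
        FinderComment={},
    ),
    completed=False,
    service=ConfigServices.SERVICE_PREFIX_MAIL_READER,
    status=Status.PENDING,
    created_at=datetime.now().isoformat(),
    is_completed=False,
)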


@@ -0,0 +1,586 @@
import os
import socket
import logging
from functools import wraps
from base64 import b64encode
from time import sleep
from datetime import datetime
from typing import List, Dict, Any, Union, TypeVar, Tuple
from email.message import EmailMessage
from email.policy import default as policy
from email.headerregistry import UniqueDateHeader, UniqueAddressHeader, UniqueUnstructuredHeader
from email.parser import BytesParser
from imaplib import IMAP4_SSL, IMAP4
from Depends.redis_handlers import RedisHandler
from Depends.config import EmailConfig, MailReaderMainConfig, MailReader, MailParser, RedisData
from Depends.service_handler import MailReaderService
# Configure logging
logger = logging.getLogger('Email Reader Service')
# Type variable for generic function return types
T = TypeVar('T')
def retry_on_connection_error(max_retries: int = 3, delay: int = 5, backoff: int = 2, exceptions=(Exception,)):
"""
Retry decorator with exponential backoff for handling connection errors
Args:
max_retries: Maximum number of retries
delay: Initial delay between retries in seconds
backoff: Backoff multiplier
exceptions: Tuple of exceptions to catch
Returns:
Decorated function
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
mtries, mdelay = max_retries, delay
while mtries > 0:
try:
return func(*args, **kwargs)
except exceptions as e:
logger.warning(f"Connection error in {func.__name__}: {str(e)}, retrying in {mdelay}s...")
sleep(mdelay)
mtries -= 1
mdelay *= backoff
# Final attempt
return func(*args, **kwargs)
return wrapper
return decorator
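# Usage sketch (illustrative, not part of this module): any callable that may
# raise transient connection errors can be wrapped, e.g.
#
#   @retry_on_connection_error(max_retries=3, delay=2,
#                              exceptions=(socket.error, IMAP4.error))
#   def select_inbox(mail):
#       status, _ = mail.select("INBOX")
#       return status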
class Mails:
"""Class representing an email with attachments and metadata"""
def __init__(self, mail_id: bytes, mail_data: bytes):
"""
Initialize a mail object
Args:
mail_id: Unique identifier for the email
mail_data: Raw email data
"""
self.id: bytes = mail_id
self.raw_data: bytes = mail_data
self.attachments: List[Dict[str, Union[str, bytes]]] = []
self.message: EmailMessage = BytesParser(policy=policy).parsebytes(mail_data)
self.subject: UniqueUnstructuredHeader = self.message.get('Subject', '') or ''
self.from_: UniqueAddressHeader = self.message.get('From', '') or ''
self.to: UniqueAddressHeader = self.message.get('To', '') or ''
self.date: UniqueDateHeader = self.message.get('Date', '') or ''
self.body_text: str = self._get_body_text()
self._extract_attachments()
def to_dict(self) -> Dict[str, Any]:
"""
Convert mail object to dictionary representation
Returns:
Dictionary representation of mail
"""
return {
'id': self.id.decode('utf-8'),
'attachments': [{
'filename': attachment['filename'],
'content_type': attachment['content_type'],
'charset': attachment['charset'],
'data': b64encode(attachment['data']).decode(attachment['charset'], errors='replace')
} for attachment in self.attachments],
'subject': str(self.subject),
'from_': {
"display_name": self.from_.addresses[0].display_name,
"username": self.from_.addresses[0].username,
"domain": self.from_.addresses[0].domain,
"mail": f"{self.from_.addresses[0].username}@{self.from_.addresses[0].domain}"
},
'to': [
{
"display_name": address.display_name,
"username": address.username,
"domain": address.domain,
"mail": f"{address.username}@{address.domain}"
} for address in self.to.addresses
],
'date': str(self.date.datetime),
'body_text': str(self.body_text)
}
def _get_body_text(self) -> str:
"""
Extract plain text body from email
Returns:
Plain text body of email
"""
body = self.message.get_body(preferencelist=('plain',))
if body is not None:
return body.get_content() or ''
if self.message.is_multipart():
for part in self.message.walk():
if part.get_content_type() == 'text/plain' and (part.get_content_disposition() or '') != 'attachment':
try:
return part.get_content() or ''
except Exception:
payload = part.get_payload(decode=True) or b''
return payload.decode(part.get_content_charset() or 'utf-8', errors='replace')
else:
if self.message.get_content_type() == 'text/plain':
try:
return self.message.get_content() or ''
except Exception:
payload = self.message.get_payload(decode=True) or b''
return payload.decode(self.message.get_content_charset() or 'utf-8', errors='replace')
return ''
def _extract_attachments(self) -> None:
"""Extract attachments from email"""
for part in self.message.walk():
if part.get_content_disposition() == 'attachment':
filename = part.get_filename()
if not filename:
continue
data = part.get_payload(decode=True) or b''
charset = part.get_charset() or 'utf-8'
self.attachments.append({'filename': filename, 'content_type': part.get_content_type(), 'data': data, 'charset': charset})
def save_attachments(self, folder: str) -> None:
"""
Save attachments to folder
Args:
folder: Folder to save attachments to
"""
os.makedirs(folder, exist_ok=True)
for att in self.attachments:
with open(os.path.join(folder, att['filename']), 'wb') as f:
f.write(att['data'])
class EmailReaderService:
"""Service for reading emails from mailbox with improved connection resilience"""
def __init__(self, config: MailReaderMainConfig):
"""
Initialize email reader service
Args:
config: Application configuration
"""
self.email_config = EmailConfig()
self.config = config
self.mail = None
self.data: List[Mails] = []
self.mail_count = 0
self.is_connected = False
self.connect_imap()
def connect_imap(self) -> bool:
"""
Establish IMAP connection with retry mechanism
Returns:
True if connection successful, False otherwise
"""
try:
if self.mail:
# Try to close existing connection if any
try:
self.mail.close()
self.mail.logout()
except Exception:
pass
logger.info(f"Connecting to IMAP server {self.email_config.HOST}:{self.email_config.PORT}")
self.mail = IMAP4_SSL(self.email_config.HOST, self.email_config.PORT)
self.is_connected = True
return True
except (socket.error, IMAP4.error) as e:
logger.error(f"Failed to connect to IMAP server: {str(e)}")
self.is_connected = False
return False
@retry_on_connection_error(max_retries=3, delay=5, exceptions=(socket.error, IMAP4.error, OSError))
def login_and_connect(self) -> bool:
"""
Login to IMAP server and connect to inbox with retry mechanism
Returns:
True if login successful, False otherwise
Raises:
ConnectionError: If connection cannot be established
"""
if not self.is_connected:
if not self.connect_imap():
raise ConnectionError("Cannot establish connection to IMAP server")
try:
logger.info(f"Logging in as {self.email_config.USERNAME}")
self.mail.login(self.email_config.USERNAME, self.email_config.PASSWORD)
self._connect_inbox()
logger.info("Successfully logged in and connected to inbox")
return True
except (socket.error, IMAP4.error) as e:
logger.error(f"Login failed: {str(e)}")
self.is_connected = False
raise
@retry_on_connection_error(max_retries=2, delay=3, exceptions=(socket.error, IMAP4.error, OSError))
def refresh(self) -> Tuple[List[Mails], int, int]:
"""
Refresh mail data with connection retry
Returns:
Tuple of (mail data, mail count, data length)
"""
try:
self.mail_count = self._fetch_count()
self.data = self._fetch_all()
return self.data, self.mail_count, len(self.data)
except (socket.error, IMAP4.error) as e:
logger.error(f"Refresh failed, attempting to reconnect: {str(e)}")
self.connect_imap()
self.login_and_connect()
self.mail_count = self._fetch_count()
self.data = self._fetch_all()
return self.data, self.mail_count, len(self.data)
@retry_on_connection_error(max_retries=2, delay=2, exceptions=(socket.error, IMAP4.error))
def _connect_inbox(self) -> None:
"""
Connect to INBOX with retry mechanism
Raises:
IMAP4.error: If connection to INBOX fails
"""
logger.info("Selecting INBOX folder")
status, _ = self.mail.select("INBOX")
if status != 'OK':
error_msg = "Failed to connect to INBOX"
logger.error(error_msg)
raise IMAP4.error(error_msg)
@retry_on_connection_error(max_retries=2, delay=2, exceptions=(socket.error, IMAP4.error))
def _fetch_count(self) -> int:
"""
Fetch mail count with retry mechanism
Returns:
Number of emails
Raises:
IMAP4.error: If fetching mail count fails
"""
try:
status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"')
if status != 'OK':
raise IMAP4.error("Failed to get mail count")
count = len(uids[0].split()) if uids[0] else 0
logger.info(f"Found {count} emails from {self.config.MAILBOX}")
return count
except (socket.error, IMAP4.error) as e:
logger.error(f"Error fetching mail count: {str(e)}")
raise
@retry_on_connection_error(max_retries=2, delay=2, exceptions=(socket.error, IMAP4.error))
def _fetch_all(self) -> List[Mails]:
"""
Fetch all mails with retry mechanism
Returns:
List of mail objects
Raises:
IMAP4.error: If fetching mails fails
"""
self.data = []
try:
status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"')
if status != 'OK':
raise IMAP4.error("Mail search failed")
if not uids[0]:
logger.info("No emails found matching criteria")
return self.data
uid_list = uids[0].split()
logger.info(f"Processing {len(uid_list)} emails")
for uid in uid_list:
try:
status, msg_data = self.mail.uid('fetch', uid, '(RFC822)')
if status == 'OK' and msg_data[0] is not None:
self.data.append(Mails(uid, msg_data[0][1]))
except Exception as e:
logger.warning(f"Failed to fetch email with UID {uid}: {str(e)}")
continue
logger.info(f"Successfully fetched {len(self.data)} emails")
return self.data
except (socket.error, IMAP4.error) as e:
logger.error(f"Error fetching emails: {str(e)}")
raise
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
def mark_no_attachment(self, uid):
"""
Move message to no attachment folder with retry mechanism
Args:
uid: Email UID
"""
try:
# Handle both string and bytes types for logging
log_uid = uid
if isinstance(uid, bytes):
log_uid = uid.decode('utf-8', errors='replace')
logger.info(f"Moving email {log_uid} to {self.config.NO_ATTACHMENT_FOLDER} folder")
self.mail.uid('COPY', uid, self.config.NO_ATTACHMENT_FOLDER)
self.delete(uid)
except Exception as e:
logger.error(f"Failed to mark email as no attachment: {str(e)}")
raise
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
    def mark_completed(self, uid):
        """
        Move message to completed folder with retry mechanism
        Args:
            uid: Email UID (str or bytes)
        """
        try:
            # Handle both string and bytes types for logging; callers pass str
            log_uid = uid.decode('utf-8', errors='replace') if isinstance(uid, bytes) else uid
            logger.info(f"Moving email {log_uid} to {self.config.COMPLETED_FOLDER} folder")
self.mail.uid('COPY', uid, self.config.COMPLETED_FOLDER)
# self.delete(uid)
except Exception as e:
logger.error(f"Failed to mark email as completed: {str(e)}")
raise
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
def delete(self, uid):
"""
Delete message with retry mechanism
Args:
uid: Email UID
"""
try:
# Handle both string and bytes types
log_uid = uid
if isinstance(uid, bytes):
log_uid = uid.decode('utf-8', errors='replace')
logger.info(f"Marking email {log_uid} for deletion")
self.mail.uid('STORE', uid, '+FLAGS', r'(\Deleted)')
except Exception as e:
logger.error(f"Failed to delete email: {str(e)}")
raise
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
def commit(self):
"""
Commit pending operations with retry mechanism
Raises:
Exception: If commit fails
"""
try:
logger.info("Committing changes (expunge)")
self.mail.expunge()
except Exception as e:
logger.error(f"Failed to commit changes: {str(e)}")
raise
def logout(self):
"""Logout from IMAP server"""
if self.mail and self.is_connected:
try:
logger.info("Logging out from IMAP server")
self.mail.close()
self.mail.logout()
self.is_connected = False
except Exception as e:
logger.warning(f"Logout failed: {str(e)}")
@property
def count(self):
"""Get count of emails"""
return len(self.data)
class EmailServiceRunner:
"""Runner for email service with improved resilience to connection issues"""
def __init__(self, redis_handler: MailReaderService, email_service: EmailReaderService) -> None:
"""
Initialize email service runner
Args:
redis_handler: Redis handler for Redis operations
email_service: Email service for email operations
"""
# Use MailReaderService singleton for Redis operations
self.redis_handler = redis_handler
self.email_service = email_service
self.mails = None
self.count = 0
self.counter = 0
self.mail_count = 0
self.redis_connected = self._check_redis_connection()
def _check_redis_connection(self) -> bool:
"""
Check if Redis connection is alive using MailReaderService
Returns:
True if connection is alive, False otherwise
"""
try:
# Use MailReaderService to check connection
connection_status = self.redis_handler.ensure_connection()
if connection_status:
logger.info("Redis connection established via MailReaderService")
else:
logger.error("Redis connection check failed via MailReaderService")
return connection_status
except MailReaderService.REDIS_EXCEPTIONS as e:
logger.error(f"Redis connection failed: {str(e)}")
return False
@retry_on_connection_error(max_retries=3, delay=5, exceptions=MailReaderService.REDIS_EXCEPTIONS)
def _ensure_redis_connection(self) -> bool:
"""
Ensure Redis connection is established using MailReaderService
Returns:
True if connection is established, False otherwise
"""
if not self.redis_connected:
try:
self.redis_connected = self.redis_handler.ensure_connection()
if not self.redis_connected:
raise RedisHandler.REDIS_EXCEPTIONS[0]("Failed to establish Redis connection")
except Exception as e:
self.redis_connected = False
raise
return self.redis_connected
def fetch_and_set_mails(self):
"""
Fetch and process emails with improved error handling
Raises:
Exception: If fetching or processing emails fails
"""
try:
# Reset counters
self.count = 0
self.counter = 0
# Ensure Redis connection before proceeding
self._ensure_redis_connection()
# Refresh email data
try:
self.mails, self.mail_count, self.count = self.email_service.refresh()
except Exception as e:
logger.error(f"Failed to refresh emails: {str(e)}")
raise
# Process each email
for mail in self.mails:
if not getattr(mail, 'id', None):
logger.warning("Skipping email with no ID")
continue
mail_id = mail.id.decode('utf-8')
# check mail has .pdf extension
mail_dict = mail.to_dict()
try:
if mail.attachments:
if any([attachment['filename'].lower().endswith('.pdf') for attachment in mail_dict['attachments']]):
self.email_service.mark_no_attachment(mail_id)
else:
self._process_mail_with_attachments(mail, mail_id)
else:
self.email_service.mark_no_attachment(mail_id)
except Exception as e:
logger.error(f"Error processing email {mail_id}: {str(e)}")
continue
try:
self.email_service.commit()
except Exception as e:
logger.error(f"Failed to commit email changes: {str(e)}")
timestamp = f"TIMESTAMP: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
logger.info(f"Fetched and processed emails. Read/Total: {self.counter}/{self.count}. {self.count - self.counter} already saved. Mail count: {self.mail_count}")
print(f"{timestamp} Mail Reader Service | Fetching and setting mails. Read / Total Mails: {self.counter}/{self.count}. {self.count - self.counter} already saved in redis. Mail Server mail count: {self.mail_count}")
except MailReaderService.REDIS_EXCEPTIONS as e:
logger.error(f"Redis error in fetch_and_set_mails: {str(e)}")
self.redis_connected = False
raise
except Exception as e:
logger.error(f"Unexpected error in fetch_and_set_mails: {str(e)}")
raise
def _process_mail_with_attachments(self, mail: Mails, mail_id: str):
"""
Process an email with attachments using MailReaderService
Args:
mail: Mail object
mail_id: Mail ID
Raises:
Exception: If processing mail fails
"""
try:
mail_to_dict = mail.to_dict()
result = self.redis_handler.process_mail(
mail_id=mail_id, mail_data=mail_to_dict, service_prefix=self.email_service.config.SERVICE_PREFIX, counter=self.counter
)
if result['status'] == 'success':
if result['action'] == 'stored_new_mail':
self.counter = result.get('counter', self.counter)
logger.info(f"Successfully processed new email {mail_id}")
elif result['action'] == 'checked_existing_mail':
if result.get('is_completed', False):
logger.info(f"Marking completed email {mail_id}")
self.email_service.mark_completed(mail_id)
elif result['status'] == 'error':
if result['action'] == 'id_mismatch':
logger.error(f"Mail ID mismatch: {mail_id} != {result.get('stored_id')}")
raise ValueError("Mail id does not match with id from Redis")
else:
logger.error(f"Email Service Runner Error processing mail {mail_id}: {result.get('error', 'Unknown error')}")
raise Exception(result.get('error', 'Unknown error during mail processing'))
else:
logger.warning(f"Unexpected result status: {result['status']} for mail {mail_id}")
except MailReaderService.REDIS_EXCEPTIONS as e:
logger.error(f"Redis error while processing mail {mail_id}: {str(e)}")
self.redis_connected = False
raise
except Exception as e:
logger.error(f"Email Service Runner Error processing mail {mail_id}: {str(e)}")
raise
def drop(self):
"""Clean up resources"""
try:
self.email_service.commit()
except Exception as e:
logger.warning(f"Error during commit on drop: {str(e)}")
try:
self.email_service.logout()
except Exception as e:
logger.warning(f"Error during logout on drop: {str(e)}")


@@ -0,0 +1,49 @@
from prisma import Prisma
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional
# Singleton pattern for Prisma client
_prisma_client: Optional[Prisma] = None
async def get_prisma_client() -> Prisma:
"""
Get or initialize the Prisma client singleton.
Returns:
Prisma: The initialized Prisma client instance
"""
global _prisma_client
if _prisma_client is None:
_prisma_client = Prisma()
await _prisma_client.connect()
return _prisma_client
@asynccontextmanager
async def prisma_client() -> AsyncGenerator[Prisma, None]:
"""
Context manager for Prisma client operations.
Yields:
Prisma: The initialized Prisma client instance
Example:
```python
async with prisma_client() as db:
users = await db.user.find_many()
```
"""
client = await get_prisma_client()
try:
yield client
except Exception as e:
print(f"Database operation error: {e}")
raise
async def disconnect_prisma():
"""Disconnect the Prisma client when shutting down the application."""
global _prisma_client
if _prisma_client is not None:
await _prisma_client.disconnect()
_prisma_client = None
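A minimal end-to-end sketch (assuming the account_records model from schema.prisma, as used elsewhere in this change set) that uses the context manager and disconnects on exit:

import asyncio
from Depends.prisma_client import prisma_client, disconnect_prisma

async def main():
    try:
        async with prisma_client() as db:
            # count() is a standard prisma-client-py model action
            total = await db.account_records.count()
            print(f"account_records rows: {total}")
    finally:
        await disconnect_prisma()

if __name__ == "__main__":
    asyncio.run(main())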


@@ -0,0 +1,160 @@
import logging
from contextlib import contextmanager
from time import sleep
from redis import Redis, RedisError, ConnectionError as RedisConnectionError
from Depends.config import ConfigServices, RedisConfig
# Configure logging
logger = logging.getLogger('Redis_Handler')
@contextmanager
def safe_redis_operation(redis_client: Redis, operation_name: str = "Redis operation"):
"""
Context manager for safely executing Redis operations with error handling
"""
try:
yield redis_client
except RedisConnectionError as e:
logger.error(f"{operation_name} failed due to Redis connection error: {str(e)}")
raise
except RedisError as e:
logger.error(f"{operation_name} failed due to Redis error: {str(e)}")
raise
except Exception as e:
logger.error(f"{operation_name} failed with unexpected error: {str(e)}")
raise
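# Usage sketch (illustrative): wrap a call so failures are logged with an
# operation name before propagating, e.g.
#
#   handler = RedisHandler()
#   with safe_redis_operation(handler.redis_client, "fetch task list") as r:
#       tasks = r.lrange(ConfigServices.MAIN_TASK_PREFIX, 0, -1)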
class RedisHandler:
"""Singleton Redis handler class for centralized Redis operations"""
# Singleton instance
_instance = None
# Redis exceptions constant for unified error handling
REDIS_EXCEPTIONS = (RedisConnectionError, RedisError)
def __new__(cls):
if cls._instance is None:
cls._instance = super(RedisHandler, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
# Initialize only once
if self._initialized:
return
# Initialize Redis client with retry logic
self.redis_client = self._create_redis_client()
self.redis_connected = self._check_redis_connection()
self._initialized = True
def _create_redis_client(self):
"""Create a Redis client with connection retry"""
max_retries = 5
retry_delay = 5
for attempt in range(max_retries):
try:
client = Redis(**RedisConfig.as_dict())
client.ping() # Test the connection
logger.info("Redis connection established successfully")
return client
except (RedisConnectionError, RedisError) as e:
if attempt < max_retries - 1:
logger.warning(f"Redis connection attempt {attempt + 1} failed: {str(e)}. Retrying in {retry_delay} seconds...")
sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
logger.error(f"Failed to connect to Redis after {max_retries} attempts: {str(e)}")
# Continue with a new Redis client instance even if ping fails
# This allows the service to start and retry connections later
return Redis(**RedisConfig.as_dict())
def _check_redis_connection(self) -> bool:
"""Check if Redis connection is alive"""
try:
self.ping()
return True
except Exception as e:
return False
def ping(self):
"""Ping Redis server to check connection"""
return self.redis_client.ping()
def sadd(self, key: str, value):
"""Add a value to a Redis set"""
return self.redis_client.sadd(key, value)
def get(self, key: str):
"""Get a value from Redis by key"""
return self.redis_client.get(key)
def set(self, key: str, value):
"""Set a key-value pair in Redis"""
return self.redis_client.set(key, value)
def rpush(self, key: str, value):
"""Append a value to a Redis list"""
return self.redis_client.rpush(key, value)
def lindex(self, key: str, index: int):
"""Get an element from a Redis list by its index"""
return self.redis_client.lindex(key, index)
def spop(self, key: str, value):
"""Remove and return a random member from a Redis set"""
return self.redis_client.spop(key, value)
def get_all_tasks(self):
"""Get all keys matching the task prefix pattern"""
return self.redis_client.lrange(ConfigServices.MAIN_TASK_PREFIX, 0, -1)
def ensure_connection(self) -> bool:
"""Check if Redis connection is alive and reconnect if needed"""
if not self.redis_connected:
try:
self.redis_client = self._create_redis_client()
self.redis_connected = self._check_redis_connection()
if self.redis_connected:
logger.info("Redis connection re-established successfully")
return self.redis_connected
except Exception as e:
logger.error(f"Failed to re-establish Redis connection: {str(e)}")
return False
return True
@classmethod
def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5):
"""
Handle Redis reconnection with exponential backoff based on consecutive errors
Args:
consecutive_errors: Number of consecutive errors encountered
max_consecutive_errors: Threshold for extended sleep time
Returns:
tuple: (RedisHandler instance, bool indicating if extended sleep is needed)
"""
try:
# Get a fresh instance (will reconnect internally)
instance = cls()
instance.redis_connected = instance._check_redis_connection()
logger.info("Recreated Redis handler using singleton pattern")
# Determine if extended sleep is needed
need_extended_sleep = consecutive_errors >= max_consecutive_errors
if need_extended_sleep:
logger.warning(f"Hit {max_consecutive_errors} consecutive Redis errors, taking longer pause")
return instance, need_extended_sleep
except Exception as redis_retry_error:
logger.error(f"Failed to recreate Redis handler: {str(redis_retry_error)}")
return None, consecutive_errors >= max_consecutive_errors
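Because __new__ caches the instance, repeated construction returns the same connected handler; a quick check (illustrative):

from Depends.redis_handlers import RedisHandler

h1 = RedisHandler()
h2 = RedisHandler()
assert h1 is h2     # singleton: the same client instance is reused
assert h1.ping()    # raises if the Redis server is unreachable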

File diff suppressed because it is too large.


@@ -0,0 +1,835 @@
import logging
from json import loads, dumps
from uuid import uuid4
from datetime import datetime
from Depends.config import Status, ConfigServices, RedisTaskObject, RedisData
from Depends.redis_handlers import RedisHandler
# Configure logging
logger = logging.getLogger('Service Task Retriever')
class ServiceTaskRetriever:
"""
Class for retrieving and updating Redis task objects by UUID or mail ID.
Provides direct access to task objects and service-specific data without iteration.
"""
def __init__(self, redis_handler=None):
"""
Initialize the ServiceTaskRetriever
Args:
redis_handler: Optional RedisHandler instance. If not provided, a new one will be created.
"""
if redis_handler:
self.redis_handler = redis_handler
else:
self.redis_handler = RedisHandler()
self.redis_client = self.redis_handler.redis_client
self.redis_prefix = ConfigServices.MAIN_TASK_PREFIX
self.mailid_index_key = ConfigServices.TASK_MAILID_INDEX_PREFIX
self.uuid_index_key = ConfigServices.TASK_UUID_INDEX_PREFIX
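        # Index layout used below (as implied by the getters/setters): two JSON
        # dicts, stored whole at TASK_UUID_INDEX_PREFIX and
        # TASK_MAILID_INDEX_PREFIX, map an identifier to its position in the
        # MAIN_TASK_PREFIX Redis list, so tasks are read back with a single
        # LINDEX instead of scanning the list.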
def fetch_all_tasks(self):
"""
Get all tasks from Redis
Returns:
list: List of task objects
"""
all_task = self.redis_handler.get_all_tasks()
return [RedisTaskObject(**loads(task)) for task in all_task]
def get_index_by_uuid(self, task_uuid: str):
"""
Get the index of a task by its UUID
Args:
task_uuid: UUID of the task
Returns:
int: Index of the task if found
Raises:
FileNotFoundError: If the UUID index or task is not found
"""
uuid_index_data = self.redis_handler.get(self.uuid_index_key)
if uuid_index_data:
uuid_index_dict = loads(uuid_index_data)
return uuid_index_dict.get(task_uuid, None)
raise FileNotFoundError(f"UUID index not found for {task_uuid}")
def get_index_by_mail_id(self, mail_id: str):
"""
Get the index of a task by its mail ID
Args:
mail_id: Mail ID of the task
Returns:
int: Index of the task if found
Raises:
FileNotFoundError: If the mail ID index or task is not found
"""
mail_id_index_data = self.redis_handler.get(self.mailid_index_key)
if mail_id_index_data:
mail_id_index_dict = loads(mail_id_index_data)
return mail_id_index_dict.get(str(mail_id), None)
raise FileNotFoundError(f"Mail ID index not found for {mail_id}")
    def set_index_uuid(self, task_uuid: str, index: int):
        """
        Set the index of a task by its UUID
        Args:
            task_uuid: UUID of the task
            index: Index of the task
        """
        # Read and write the UUID index key (the original code mistakenly
        # used the mail-ID index key here, so get_index_by_uuid never saw it)
        already_dict = self.redis_handler.get(self.uuid_index_key)
        if already_dict:
            already_dict = loads(already_dict)
            already_dict[str(task_uuid)] = index
            self.redis_handler.set(self.uuid_index_key, dumps(already_dict))
        else:
            self.redis_handler.set(self.uuid_index_key, dumps({str(task_uuid): index}))
def set_index_mail_id(self, mail_id: str, index: int):
"""
Set the index of a task by its mail ID
Args:
mail_id: Mail ID of the task
index: Index of the task
"""
already_dict = self.redis_handler.get(self.mailid_index_key)
if already_dict:
already_dict = loads(already_dict)
already_dict[str(mail_id)] = index
self.redis_handler.set(self.mailid_index_key, dumps(already_dict))
else:
self.redis_handler.set(self.mailid_index_key, dumps({str(mail_id): index}))
    def update_mail_id_index(self, mail_id: str, task_uuid: str):
        """
        Update the mail ID index with the same index as the UUID index
        Args:
            mail_id: Mail ID of the task
            task_uuid: UUID of the task
        """
        if index_by_uuid := self.get_index_by_uuid(task_uuid):
            self.set_index_mail_id(mail_id, index_by_uuid)
def update_uuid_index(self, task_uuid: str, mail_id: str):
"""
Update the UUID index with the same index as mail ID index
Args:
task_uuid: UUID of the task
mail_id: Mail ID of the task
"""
if get_index_by_mail_id := self.get_index_by_mail_id(mail_id):
self.set_index_uuid(task_uuid, get_index_by_mail_id)
def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
"""
Get a task object directly by its UUID without iteration
Args:
task_uuid: UUID of the task to retrieve
Returns:
RedisTaskObject: The task object if found
Raises:
FileNotFoundError: If the UUID index or task is not found
"""
index_by_uuid = self.get_index_by_uuid(task_uuid)
if not index_by_uuid:
raise FileNotFoundError(f"UUID index not found for {task_uuid}")
if task_data := self.redis_client.lindex(self.redis_prefix, int(index_by_uuid)):
return RedisTaskObject(**loads(task_data))
raise FileNotFoundError(f"Task not found for UUID: {task_uuid}")
def get_task_by_mail_id(self, mail_id: str) -> RedisTaskObject:
"""
Get a task object directly by its mail ID without iteration
Args:
mail_id: Mail ID of the task to retrieve
Returns:
RedisTaskObject: The task object if found
Raises:
FileNotFoundError: If the mail ID index or task is not found
"""
mail_id_index = self.get_index_by_mail_id(mail_id)
if mail_id_index is not None:
if task_data := self.redis_client.lindex(self.redis_prefix, int(mail_id_index)):
return RedisTaskObject(**loads(task_data))
raise FileNotFoundError(f"Task not found for mail ID: {mail_id}")
def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
"""
Get service-specific data from a task by UUID
Args:
task_uuid: UUID of the task
service_name: Name of the service to extract data for
Returns:
Any: Service-specific data if found
Raises:
FileNotFoundError: If the task or service data is not found
"""
task_object = self.get_task_by_uuid(task_uuid)
        # Extract service data: RedisData field names match the service
        # prefix values in ConfigServices (e.g. "FinderIban"), so look the
        # attribute up directly (scanning ConfigServices.__dict__ compared
        # attribute names against the value and could never match)
        service_data = getattr(task_object.data, service_name, None)
        if service_data:
            return service_data
raise FileNotFoundError(f"Service data '{service_name}' not found in task {task_uuid}")
def get_service_data_by_mail_id(self, mail_id: str, service_name: str):
"""
Get service-specific data from a task by mail ID
Args:
mail_id: Mail ID of the task
service_name: Name of the service to extract data for
Returns:
Any: Service-specific data if found
Raises:
FileNotFoundError: If the task or service data is not found
"""
task_object = self.get_task_by_mail_id(mail_id)
        # Same direct lookup as get_service_data_by_uuid
        service_data = getattr(task_object.data, service_name, None)
        if service_data:
            return service_data
raise FileNotFoundError(f"Service data '{service_name}' not found in task for mail ID {mail_id}")
def create_task_with_uuid(self, task_uuid: str, service_name: str, mail_reader: dict, mail_parser: dict) -> bool:
"""
Create a new task with UUID indexing. This method will fail if a task with the UUID already exists.
Args:
task_uuid: UUID for the task
service_name: Service name for the task
            mail_reader: Dictionary containing the MailReader data
            mail_parser: List of MailParser attachment dictionaries
Returns:
bool: True if successful
Raises:
ValueError: If the task data is invalid, storage fails, or task already exists
"""
# Check if task with this UUID already exists
try:
existing_task = self.get_task_by_uuid(task_uuid)
# If we get here, task exists
raise ValueError(f"Task with UUID {task_uuid} already exists. Use store_task_with_uuid to update.")
except FileNotFoundError:
# Task doesn't exist, proceed with creation
pass
# Validate service name
self._validate_service_name(service_name)
# Create new RedisData with proper defaults for all services
data_dict = {'MailReader': None, 'MailParser': [], 'FinderIban': {}, 'FinderComment': {}}
# Set the actual service data
data_dict['MailReader'] = mail_reader
data_dict['MailParser'] = mail_parser
# Create new RedisData object
redis_data = RedisData(**data_dict)
# Create new task object
write_object = RedisTaskObject(
task=task_uuid,
data=redis_data,
completed=False,
service=service_name,
status=Status.COMPLETED,
created_at=datetime.now().isoformat(),
is_completed=False
)
# Convert to dict for serialization
write_object = write_object.model_dump()
# Push new task to Redis list
redis_write_ = self.redis_client.rpush(self.redis_prefix, dumps(write_object))
if not redis_write_:
raise ValueError(f"Failed to write task data to Redis for UUID {task_uuid}")
index_value = redis_write_ - 1
self.set_index_mail_id(mail_reader['id'], index_value)
self.set_index_uuid(task_uuid, index_value)
return True
def store_task_with_uuid(self, task_uuid: str, service_name: str, task_data: dict) -> bool:
"""
Update an existing task with UUID indexing. Only the service-specific data will be updated
while preserving other service data and task metadata.
Args:
task_uuid: UUID for the task
service_name: Service name for the task
task_data: Dictionary containing service-specific data
Returns:
bool: True if successful
Raises:
FileNotFoundError: If the task does not exist
ValueError: If the task data is invalid or storage fails
"""
# Validate service name
self._validate_service_name(service_name)
# Get existing task
try:
existing_task = self.get_task_by_uuid(task_uuid)
except FileNotFoundError:
raise FileNotFoundError(f"Task with UUID {task_uuid} not found. Use create_task_with_uuid to create a new task.")
# Prepare new service data
new_service_data = task_data
# Get the existing data model
existing_data = existing_task.data
# Create a new RedisData with all existing service data
data_dict = existing_data.model_dump()
# Update only the specific service data
data_dict[service_name] = new_service_data
# Create updated RedisData object
redis_data = RedisData(**data_dict)
# Create task object with existing metadata but updated data
write_object = RedisTaskObject(
task=task_uuid,
data=redis_data,
completed=existing_task.completed,
service=existing_task.service,
status=existing_task.status,
created_at=existing_task.created_at,
is_completed=existing_task.is_completed
)
# Convert to dict for serialization
write_object = write_object.model_dump()
# Get task index
index_value, _ = self._get_task_index_by_uuid(task_uuid)
# Update the task at the existing index
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(write_object)):
raise ValueError(f"Failed to update task data in Redis for UUID {task_uuid}")
return True
def _get_task_index_by_uuid(self, task_uuid: str) -> tuple:
"""
Helper method to get task index by UUID
Args:
task_uuid: UUID of the task
Returns:
tuple: (index_value, task_data_dict) where index_value is the index in Redis list
and task_data_dict is the deserialized task data
Raises:
FileNotFoundError: If the UUID index or task is not found
"""
# Get UUID index
uuid_index_data = self.redis_client.get(self.uuid_index_key)
if not uuid_index_data:
raise FileNotFoundError(f"UUID index not found for task: {task_uuid}")
# Parse index and get task index
uuid_index_dict = loads(uuid_index_data)
index_value = uuid_index_dict.get(task_uuid)
if index_value is None:
raise FileNotFoundError(f"Task UUID {task_uuid} not found in index")
# Get task data
task_data = self.redis_client.lindex(self.redis_prefix, int(index_value))
if not task_data:
raise FileNotFoundError(f"No task data found at index {index_value}")
return index_value, loads(task_data)
def _validate_service_name(self, service_name: str) -> bool:
"""
Validate that a service name exists in ConfigServices
Args:
service_name: Name of the service to validate
Returns:
bool: True if valid
Raises:
ValueError: If service name is invalid
"""
# Check if service_name is one of the values in ConfigServices
valid_service_names = [
ConfigServices.SERVICE_PREFIX_MAIL_READER,
ConfigServices.SERVICE_PREFIX_MAIL_PARSER,
ConfigServices.SERVICE_PREFIX_FINDER_IBAN,
ConfigServices.SERVICE_PREFIX_FINDER_COMMENT
]
if service_name in valid_service_names:
return True
raise ValueError(f"Invalid service name: {service_name}")
def update_task_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False):
"""
Update the service of a task by UUID
Args:
task_uuid: UUID of the task to update
            service_name: Name of the service to set on the task
            status: New status value for the task (defaults to Status.COMPLETED)
            completed: New value for the task's completed flag
Returns:
bool: True if successful
Raises:
FileNotFoundError: If the task is not found
ValueError: If the update fails or service name is invalid
"""
# Get task index and data
index_value, task_object_dict = self._get_task_index_by_uuid(task_uuid)
# Update task status
task_object_dict['service'] = service_name
task_object_dict['status'] = status
task_object_dict['completed'] = completed
# Write updated task back to Redis
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(task_object_dict)):
raise ValueError(f"Failed to write updated task data for UUID {task_uuid}")
return True
def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
"""
Update the status of a task by UUID
Args:
task_uuid: UUID of the task to update
is_completed: Whether the task is completed
status: New status for the task
Returns:
bool: True if successful
Raises:
FileNotFoundError: If the task is not found
ValueError: If the update fails
"""
# Get task index and data
index_value, task_object_dict = self._get_task_index_by_uuid(task_uuid)
# Update task status
task_object_dict['is_completed'] = is_completed
task_object_dict['status'] = status
task_object_dict['completed'] = is_completed
# Write updated task back to Redis
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(task_object_dict)):
raise ValueError(f"Failed to write updated task data for UUID {task_uuid}")
return True
def update_service_data(self, task_uuid: str, service_name: str, service_data: dict) -> bool:
"""
Update service-specific data in a task by UUID
Args:
task_uuid: UUID of the task to update
service_name: Name of the service data to update
service_data: New service data
Returns:
bool: True if successful
Raises:
FileNotFoundError: If the task is not found
ValueError: If the update fails or service name is invalid
"""
# Validate service name
self._validate_service_name(service_name)
# Get task index and data
index_value, task_object_dict = self._get_task_index_by_uuid(task_uuid)
# Update the service data
if 'data' not in task_object_dict:
task_object_dict['data'] = {}
task_object_dict['data'][service_name] = service_data
# Write updated task back to Redis
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(task_object_dict)):
raise ValueError(f"Failed to update service data for UUID {task_uuid}")
return True
class MailReaderService:
"""
Main handler class that uses ServiceTaskRetriever with RedisHandler for Redis operations.
This class serves as the main entry point for Redis operations in the application.
Uses the RedisHandler singleton for all Redis operations.
"""
# Singleton instance
_instance = None
REDIS_EXCEPTIONS = RedisHandler.REDIS_EXCEPTIONS
@classmethod
def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5):
"""
Handle Redis reconnection with exponential backoff based on consecutive errors
Args:
consecutive_errors: Number of consecutive errors encountered
max_consecutive_errors: Threshold for extended sleep time
Returns:
tuple: (MainRedisHandler instance, bool indicating if extended sleep is needed)
"""
# Delegate to RedisHandler's reconnection logic
redis_handler, need_extended_sleep = RedisHandler.handle_reconnection(
consecutive_errors=consecutive_errors,
max_consecutive_errors=max_consecutive_errors
)
# If Redis handler was successfully reconnected, create a new MainRedisHandler instance
if redis_handler:
# Reset the singleton instance to force re-initialization
cls._instance = None
main_handler = cls()
return main_handler, need_extended_sleep
return None, need_extended_sleep
def __new__(cls):
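        # Classic singleton: __new__ always hands back the one shared
        # instance, and __init__ guards on _initialized so setup runs once.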
if cls._instance is None:
cls._instance = super(MailReaderService, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
# Initialize only once
if hasattr(self, '_initialized') and self._initialized:
return
# Use RedisHandler singleton
self.redis_handler = RedisHandler()
self.service_retriever = ServiceTaskRetriever(self.redis_handler)
self._initialized = True
def ensure_connection(self):
"""
Ensure Redis connection is established
Returns:
bool: True if connection is established, False otherwise
"""
return self.redis_handler.ensure_connection()
def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
"""
Get a task object by its UUID
Args:
task_uuid: UUID of the task to retrieve
Returns:
RedisTaskObject: The task object if found
Raises:
FileNotFoundError: If the UUID index or task is not found
"""
return self.service_retriever.get_task_by_uuid(task_uuid)
def get_task_by_mail_id(self, mail_id: str) -> RedisTaskObject:
"""
Get a task object by its mail ID
Args:
mail_id: Mail ID of the task to retrieve
Returns:
RedisTaskObject: The task object if found
Raises:
FileNotFoundError: If the mail ID index or task is not found
"""
return self.service_retriever.get_task_by_mail_id(mail_id)
def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
"""
Get service-specific data from a task by UUID
Args:
task_uuid: UUID of the task
service_name: Name of the service to extract data for
Returns:
Any: Service-specific data if found
Raises:
FileNotFoundError: If the task or service data is not found
"""
return self.service_retriever.get_service_data_by_uuid(task_uuid, service_name)
def get_service_data_by_mail_id(self, mail_id: str, service_name: str):
"""
Get service-specific data from a task by mail ID
Args:
mail_id: Mail ID of the task
service_name: Name of the service to extract data for
Returns:
Any: Service-specific data if found
Raises:
FileNotFoundError: If the task or service data is not found
"""
return self.service_retriever.get_service_data_by_mail_id(mail_id, service_name)
def store_task_with_uuid(self, task_uuid: str, service_name: str, task_data: dict) -> bool:
"""
Store a task with UUID indexing
Args:
task_uuid: UUID for the task
service_name: Service name for the task
task_data: Dictionary containing task data
Returns:
bool: True if successful
Raises:
ValueError: If the task data is invalid or storage fails
"""
return self.service_retriever.store_task_with_uuid(task_uuid, service_name, task_data)
def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
"""
Update the status of a task by UUID
Args:
task_uuid: UUID of the task to update
is_completed: Whether the task is completed
status: New status for the task
Returns:
bool: True if successful
Raises:
FileNotFoundError: If the task is not found
ValueError: If the update fails
"""
return self.service_retriever.update_task_status(task_uuid, is_completed, status)
def process_mail(self, mail_id: str, mail_data: dict, service_prefix: str, counter: int) -> dict:
"""
Process mail data and store it in Redis
Args:
mail_id: The ID of the mail
mail_data: Dictionary containing mail data
            service_prefix: Service prefix for the mail
            counter: Running count of newly stored mails; incremented when a new mail is stored
Returns:
dict: Result of the operation with status and action
"""
try:
            if self.redis_handler.sadd(ConfigServices.TASK_SEEN_PREFIX, mail_id):
counter += 1
task_uuid = uuid4().hex
mail_without_attachments = mail_data.copy()
attachments = mail_without_attachments.pop('attachments', [])
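                # The stripped mail becomes the MailReader payload; the popped
                # attachments seed the MailParser queue for the next service.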
create_task = dict(task_uuid=task_uuid, service_name=service_prefix, mail_reader=mail_without_attachments, mail_parser=attachments)
self.service_retriever.create_task_with_uuid(**create_task)
return {'status': 'success', 'action': 'stored_new_mail', 'counter': counter}
else:
try:
task = self.service_retriever.get_task_by_mail_id(mail_id)
                    if task is not None and task.data and task.data.MailReader:
stored_id = task.data.MailReader.id
if stored_id != mail_id:
return {'status': 'error', 'action': 'id_mismatch', 'stored_id': stored_id}
return {'status': 'success', 'action': 'checked_existing_mail', 'is_completed': task.is_completed if task else False}
except FileNotFoundError:
return {'status': 'error', 'action': 'not_found', 'error': f'Mail with ID {mail_id} not found in index'}
except Exception as e:
logger.error(f"Mail Reader Service Error processing mail {mail_id}: {str(e)}")
return {'status': 'error', 'action': 'exception', 'error': str(e)}
class MailParserService:
"""
    Mail Parser Service: singleton wrapper around ServiceTaskRetriever that
    exposes parser-side task retrieval and update operations.
"""
# Singleton instance
_instance = None
REDIS_EXCEPTIONS = RedisHandler.REDIS_EXCEPTIONS
@classmethod
def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5):
"""
Handle Redis reconnection with exponential backoff based on consecutive errors
Args:
consecutive_errors: Number of consecutive errors encountered
max_consecutive_errors: Threshold for extended sleep time
Returns:
tuple: (MainRedisHandler instance, bool indicating if extended sleep is needed)
"""
# Delegate to RedisHandler's reconnection logic
redis_handler, need_extended_sleep = RedisHandler.handle_reconnection(
consecutive_errors=consecutive_errors,
max_consecutive_errors=max_consecutive_errors
)
# If Redis handler was successfully reconnected, create a new MainRedisHandler instance
if redis_handler:
# Reset the singleton instance to force re-initialization
cls._instance = None
main_handler = cls()
return main_handler, need_extended_sleep
return None, need_extended_sleep
def __new__(cls):
if cls._instance is None:
cls._instance = super(MailParserService, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
# Initialize only once
if hasattr(self, '_initialized') and self._initialized:
return
        # Use RedisHandler singleton (ensure_connection and
        # _check_redis_connection below rely on self.redis_handler)
        self.redis_handler = RedisHandler()
        self.service_retriever = ServiceTaskRetriever(self.redis_handler)
        self._initialized = True
def fetch_all_tasks(self) -> list[RedisTaskObject]:
return self.service_retriever.fetch_all_tasks()
def ensure_connection(self):
"""
Ensure Redis connection is established
Returns:
bool: True if connection is established, False otherwise
"""
return self.redis_handler.ensure_connection()
def _check_redis_connection(self) -> bool:
"""
Check if Redis connection is alive using RedisHandler
Returns:
True if connection is alive, False otherwise
"""
try:
# Use RedisHandler to check connection
connection_status = self.redis_handler.ensure_connection()
if connection_status:
logger.info("Redis connection established via RedisHandler")
else:
logger.error("Redis connection check failed via RedisHandler")
return connection_status
except RedisHandler.REDIS_EXCEPTIONS as e:
logger.error(f"Redis connection failed: {str(e)}")
return False
def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
"""
Get a task object by its UUID
Args:
task_uuid: UUID of the task to retrieve
Returns:
RedisTaskObject: The task object if found
Raises:
FileNotFoundError: If the UUID index or task is not found
"""
return self.service_retriever.get_task_by_uuid(task_uuid)
def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
"""
Get service-specific data from a task by UUID
Args:
task_uuid: UUID of the task
service_name: Name of the service to extract data for
Returns:
Any: Service-specific data if found
Raises:
FileNotFoundError: If the task or service data is not found
"""
return self.service_retriever.get_service_data_by_uuid(task_uuid, service_name)
def update_service_data(self, task_uuid: str, service_name: str, service_data: dict) -> bool:
"""
Update service-specific data in a task by UUID
Args:
task_uuid: UUID of the task to update
service_name: Name of the service data to update
service_data: New service data
Returns:
bool: True if successful
Raises:
FileNotFoundError: If the task is not found
ValueError: If the update fails or service name is invalid
"""
return self.service_retriever.update_service_data(task_uuid, service_name, service_data)
def change_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False) -> bool:
"""
Update the service of a task by UUID
"""
return self.service_retriever.update_task_service(task_uuid, service_name, status, completed)
def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
"""
Update the status of a task by UUID
"""
return self.service_retriever.update_task_status(task_uuid, is_completed, status)

View File

@ -0,0 +1,195 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ServiceTaskRetriever Usage Example
This script demonstrates how to use the ServiceTaskRetriever class
and the updated MainRedisHandler for direct Redis task retrieval and updates.
"""
import uuid
import logging
from json import dumps
from config import Status, RedisData
from redis_handlers import ServiceTaskRetriever, MainRedisHandler
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def direct_retriever_example():
"""Example using ServiceTaskRetriever directly"""
logger.info("=== ServiceTaskRetriever Direct Usage Example ===")
# Create a retriever instance
retriever = ServiceTaskRetriever()
# Generate a test UUID
test_uuid = str(uuid.uuid4())
logger.info(f"Generated test UUID: {test_uuid}")
# Example mail data
mail_data = {
"id": "test-mail-123",
"subject": "Test Mail Subject",
"body": "This is a test mail body",
"from": "test@example.com",
"to": "recipient@example.com",
"date": "2025-08-09T14:08:05+03:00"
}
try:
        # Create a new task with UUID (store_task_with_uuid only updates existing tasks)
        logger.info("Creating new task with UUID...")
        success = retriever.create_task_with_uuid(
            test_uuid,
            "MailReader",
            mail_reader=mail_data,
            mail_parser=[]
        )
        logger.info(f"Task created successfully: {success}")
# Retrieve the task by UUID
logger.info("Retrieving task by UUID...")
task = retriever.get_task_by_uuid(test_uuid)
logger.info(f"Retrieved task: service={task.service}, status={task.status}")
# Get service-specific data
logger.info("Retrieving service data by UUID...")
mail_reader_data = retriever.get_service_data_by_uuid(test_uuid, "MailReader")
logger.info(f"Mail subject: {mail_reader_data.get('subject')}")
# Update task status
logger.info("Updating task status...")
retriever.update_task_status(
test_uuid,
is_completed=True,
status=Status.COMPLETED
)
# Verify status update
updated_task = retriever.get_task_by_uuid(test_uuid)
logger.info(f"Updated task status: {updated_task.status}, completed: {updated_task.is_completed}")
# Update service data
logger.info("Updating service data...")
new_mail_data = mail_data.copy()
new_mail_data["subject"] = "Updated Subject"
retriever.update_service_data(test_uuid, "MailReader", new_mail_data)
# Verify service data update
updated_mail_data = retriever.get_service_data_by_uuid(test_uuid, "MailReader")
logger.info(f"Updated mail subject: {updated_mail_data.get('subject')}")
except FileNotFoundError as e:
logger.error(f"Not found error: {e}")
except ValueError as e:
logger.error(f"Value error: {e}")
def main_handler_example():
"""Example using MainRedisHandler"""
logger.info("\n=== MainRedisHandler Usage Example ===")
# Create a handler instance
handler = MainRedisHandler()
# Generate a test UUID
test_uuid = str(uuid.uuid4())
logger.info(f"Generated test UUID: {test_uuid}")
# Example mail data
mail_data = {
"id": "test-mail-456",
"subject": "Test Mail via MainRedisHandler",
"body": "This is a test mail body via MainRedisHandler",
"from": "test@example.com",
"to": "recipient@example.com",
"date": "2025-08-09T14:08:05+03:00"
}
try:
# Store a new task with UUID
logger.info("Storing new task with UUID via MainRedisHandler...")
success = handler.store_task_with_uuid(
test_uuid,
"MailReader",
mail_data
)
logger.info(f"Task stored successfully: {success}")
# Retrieve the task by UUID
logger.info("Retrieving task by UUID...")
task = handler.get_task_by_uuid(test_uuid)
logger.info(f"Retrieved task: service={task.service}, status={task.status}")
# Get service-specific data
logger.info("Retrieving service data by UUID...")
mail_reader_data = handler.get_service_data_by_uuid(test_uuid, "MailReader")
logger.info(f"Mail subject: {mail_reader_data.get('subject')}")
# Update task status
logger.info("Updating task status...")
handler.update_task_status(
test_uuid,
is_completed=True,
status=Status.COMPLETED
)
# Verify status update
updated_task = handler.get_task_by_uuid(test_uuid)
logger.info(f"Updated task status: {updated_task.status}, completed: {updated_task.is_completed}")
# Update service data
logger.info("Updating service data...")
new_mail_data = mail_data.copy()
new_mail_data["subject"] = "Updated Subject via MainRedisHandler"
handler.update_service_data(test_uuid, "MailReader", new_mail_data)
# Verify service data update
updated_mail_data = handler.get_service_data_by_uuid(test_uuid, "MailReader")
logger.info(f"Updated mail subject: {updated_mail_data.get('subject')}")
except FileNotFoundError as e:
logger.error(f"Not found error: {e}")
except ValueError as e:
logger.error(f"Value error: {e}")
def error_handling_example():
"""Example demonstrating error handling"""
logger.info("\n=== Error Handling Example ===")
retriever = ServiceTaskRetriever()
# Try to retrieve non-existent task
try:
logger.info("Attempting to retrieve non-existent task...")
task = retriever.get_task_by_uuid("non-existent-uuid")
except FileNotFoundError as e:
logger.info(f"Expected error caught: {e}")
# Try to update non-existent task
try:
logger.info("Attempting to update non-existent task...")
retriever.update_task_status("non-existent-uuid", True, Status.COMPLETED)
except FileNotFoundError as e:
logger.info(f"Expected error caught: {e}")
# Try to update with invalid service name
test_uuid = str(uuid.uuid4())
mail_data = {"subject": "Test"}
retriever.store_task_with_uuid(test_uuid, "MailReader", mail_data)
try:
logger.info("Attempting to update with invalid service name...")
retriever.update_service_data(test_uuid, "NonExistentService", {"data": "test"})
except ValueError as e:
logger.info(f"Expected error caught: {e}")
if __name__ == "__main__":
direct_retriever_example()
main_handler_example()
error_handling_example()

27
ServicesRunner/Dockerfile Normal file
View File

@ -0,0 +1,27 @@
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y build-essential libpq-dev && rm -rf /var/lib/apt/lists/*
# Copy requirements first for better caching
COPY ServicesRunner/requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy Prisma schema and generate client
COPY ServicesRunner/schema.prisma .
COPY ServicesRunner/Depends .
RUN prisma generate
# Copy the rest of the application
COPY ServicesRunner/ .
# Set environment variables
ENV PYTHONPATH=/app
# Command to run when container starts
CMD ["python", "-m", "template.py"]

View File

@ -0,0 +1,4 @@
prisma==0.9.1
python-dotenv==1.0.0
# asyncio ships with the Python 3 standard library; the PyPI "asyncio" backport must not be installed
uvloop>=0.19

View File

@ -1,240 +0,0 @@
import os
from redis import Redis
from json import dumps, loads
from datetime import datetime
from typing import List, Dict, Any, Union
from email import policy
from email.message import EmailMessage
from email.headerregistry import UniqueDateHeader, UniqueAddressHeader, UniqueUnstructuredHeader
from email.parser import BytesParser
from imaplib import IMAP4_SSL
from config import EmailConfig, Config
email_config = EmailConfig()
config = Config()
redis_client = Redis(
host='10.10.2.15',
password='your_strong_password_here',
port=6379,
db=0
)
class Mails:
def __init__(self, mail_id: bytes, mail_data: bytes):
self.id: bytes = mail_id
self.raw_data: bytes = mail_data
self.attachments: List[Dict[str, Union[str, bytes]]] = []
self.message: EmailMessage = BytesParser(policy=policy.default).parsebytes(mail_data)
self.subject: UniqueUnstructuredHeader = self.message.get('Subject', '') or ''
self.from_: UniqueAddressHeader = self.message.get('From', '') or ''
self.to: UniqueAddressHeader = self.message.get('To', '') or ''
self.date: UniqueDateHeader = self.message.get('Date', '') or ''
self.body_text: str = self._get_body_text()
self._extract_attachments()
def to_dict(self) -> Dict[str, Any]:
return {
'id': self.id.decode('utf-8'),
# 'raw_data': self.raw_data.decode('utf-8'),
'attachments': [{
'filename': attachment['filename'],
'content_type': attachment['content_type'],
'charset': attachment['charset'],
'data': attachment['data'].decode(attachment['charset'], errors='replace')
} for attachment in self.attachments],
# 'message': self.message.as_string(),
'subject': str(self.subject),
'from_': {
"display_name": self.from_.addresses[0].display_name,
"username": self.from_.addresses[0].username,
"domain": self.from_.addresses[0].domain,
"mail": f"{self.from_.addresses[0].username}@{self.from_.addresses[0].domain}"
},
'to': [
{
"display_name": address.display_name,
"username": address.username,
"domain": address.domain,
"mail": f"{address.username}@{address.domain}"
} for address in self.to.addresses
],
'date': str(self.date.datetime),
'body_text': str(self.body_text)
}
def _get_body_text(self) -> str:
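        # Preference order: the policy-aware get_body() lookup for a plain-text
        # part, then a manual walk over multipart parts, then the top-level
        # payload itself.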
body = self.message.get_body(preferencelist=('plain',))
if body is not None:
return body.get_content() or ''
if self.message.is_multipart():
for part in self.message.walk():
if part.get_content_type() == 'text/plain' and (part.get_content_disposition() or '') != 'attachment':
try:
return part.get_content() or ''
except Exception:
payload = part.get_payload(decode=True) or b''
return payload.decode(part.get_content_charset() or 'utf-8', errors='replace')
else:
if self.message.get_content_type() == 'text/plain':
try:
return self.message.get_content() or ''
except Exception:
payload = self.message.get_payload(decode=True) or b''
return payload.decode(self.message.get_content_charset() or 'utf-8', errors='replace')
return ''
def _extract_attachments(self) -> None:
for part in self.message.walk():
if part.get_content_disposition() == 'attachment':
filename = part.get_filename()
if not filename:
continue
data = part.get_payload(decode=True) or b''
                charset = part.get_content_charset() or 'utf-8'
self.attachments.append(
{'filename': filename, 'content_type': part.get_content_type(), 'data': data, 'charset': charset}
)
def save_attachments(self, folder: str) -> None:
os.makedirs(folder, exist_ok=True)
for att in self.attachments:
with open(os.path.join(folder, att['filename']), 'wb') as f:
f.write(att['data'])
class EmailReaderIsbankService:
NO_ATTACHMENT_FOLDER = "NoAttachment"
COMPLETED_FOLDER = "Completed"
def __init__(self, email_config: EmailConfig, config: Config):
self.email_config = email_config
self.config = config
self.mail = IMAP4_SSL(email_config.EMAIL_HOST, email_config.EMAIL_PORT)
self.mail.login(email_config.EMAIL_USERNAME, email_config.EMAIL_PASSWORD)
self.data: List[Mails] = []
self.inc: int = 100
self.start: int = 0
self.end: int = self.inc
self._connect_inbox()
self.mail_count = self._fetch_count()
self._fetch_all()
def _connect_inbox(self):
"""INBOX'a bağlanır"""
status, _ = self.mail.select("INBOX")
if status != 'OK':
raise Exception("INBOX'a bağlanılamadı")
def _fetch_count(self):
status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"')
if status != 'OK':
raise Exception("Mail sayısı alınamadı")
return len(uids[0].split())
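    # Note: UID SORT is an IMAP extension (RFC 5256); the server must
    # advertise the SORT capability for these queries to succeed.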
def _fetch_all(self):
"""Tüm mailleri çeker ve self.data'ya Mails objesi olarak ekler"""
status, uids = self.mail.uid('SORT', '(REVERSE DATE)', 'UTF-8', 'ALL', 'FROM', f'"{self.config.MAILBOX}"')
if status != 'OK':
raise Exception("Mail arama başarısız")
for uid in uids[0].split():
status, msg_data = self.mail.uid('fetch', uid, '(RFC822)')
if status == 'OK' and msg_data[0] is not None:
self.data.append(Mails(uid, msg_data[0][1]))
def mark_no_attachment(self, uid: bytes):
"""Mesajı arşive taşır"""
self.mail.uid('COPY', uid, self.NO_ATTACHMENT_FOLDER)
self.delete(uid)
def mark_completed(self, uid: bytes):
"""Mesajı arşive taşır"""
self.mail.uid('COPY', uid, self.COMPLETED_FOLDER)
# self.delete(uid)
def delete(self, uid: bytes):
"""Mesajı siler"""
self.mail.uid('STORE', uid, '+FLAGS', r'(\Deleted)')
def commit(self):
"""Bekleyen silme/taşıma işlemlerini uygular"""
self.mail.expunge()
def logout(self):
self.mail.logout()
@property
def count(self):
return len(self.data)
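# Module-level driver: fetch all matching mails, dedupe by mail id via a Redis
# SET, push new mails that carry attachments onto a Redis LIST (plus a JSON
# id -> index map), and route already-known mails to their IMAP folders.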
service = EmailReaderIsbankService(email_config, config)
mails = service.data
count = 0
redis_prefix = "Bank:Services:Task"
my_service = "mail"
for mail in mails:
if not getattr(mail, 'id', None):
continue
mail_id = mail.id.decode('utf-8')
if mail.attachments:
not_seen = redis_client.sadd(f'{redis_prefix}:Seen', mail_id)
index_of_set_data_get = redis_client.get(f'{redis_prefix}:Index')
if not_seen:
mail_to_dict = mail.to_dict()
mail_without_attachments = mail_to_dict.copy()
mail_without_attachments.pop('attachments', None)
write_object = {
'id': mail_id,
'data': {
"mail": mail_without_attachments,
"parser": mail_to_dict.get('attachments', []),
"ibanFinder": None,
"commentFinder": None
},
'created_at': datetime.now().isoformat(),
'completed': True,
'status': 'red',
'service': my_service,
# 'bank': 'isbank',
'is_completed': False
}
redis_write_ = redis_client.rpush(f'{redis_prefix}:Data', dumps(write_object))
if redis_write_:
if index_of_set_data_get:
index_of_set_data_get = loads(index_of_set_data_get)
index_of_set_data_get[str(mail_id)] = redis_write_ - 1
else:
index_of_set_data_get = {str(mail_id): redis_write_ - 1}
index_of_set_data_set = redis_client.set(f'{redis_prefix}:Index', dumps(index_of_set_data_get))
count += 1
else:
            redis_client.srem(f'{redis_prefix}:Seen', mail_id)  # srem removes this member; spop(name, count) pops random members
else:
get_index = redis_client.get(f'{redis_prefix}:Index')
if not get_index:
continue
get_index = loads(get_index)
        if get_index.get(str(mail_id)) is not None:  # index 0 is a valid (but falsy) position
object_from_redis = redis_client.lindex(f'{redis_prefix}:Data', int(get_index[str(mail_id)]))
if object_from_redis:
object_from_redis = loads(object_from_redis)
is_completed = object_from_redis.get('is_completed', False)
                id_ = object_from_redis.get('id')  # the mail id is stored at the top level of the task object
                if mail_id != id_:
raise Exception("Mail id not match with id from redis")
if is_completed:
service.mark_completed(mail_id)
else:
service.mark_no_attachment(mail_id)
service.commit()
service.logout()
print("Total Mails: ", f"{count}/{service.count}")

View File

@ -1,37 +0,0 @@
import os
class Config:
MAILBOX: str = os.getenv("MAILBOX", "bilgilendirme@ileti.isbank.com.tr")
EMAIL_HOST: str = os.getenv("EMAIL_HOST", "10.10.2.34")
EMAIL_LOGIN_USER: str = os.getenv("EMAIL_READER_ADDRESS", "isbank@mehmetkaratay.com.tr")
EMAIL_LOGIN_PASSWORD: str = os.getenv("EMAIL_LOGIN_PASSWORD", "system")
AUTHORIZE_IBAN: str = os.getenv("AUTHORIZE_IBAN", "4245-0093333")
EMAIL_PORT: int = int(os.getenv("EMAIL_PORT", 993))
class EmailConfig:
EMAIL_HOST: str = Config.EMAIL_HOST
EMAIL_USERNAME: str = Config.EMAIL_LOGIN_USER
EMAIL_PASSWORD: str = Config.EMAIL_LOGIN_PASSWORD
EMAIL_PORT: int = Config.EMAIL_PORT
@classmethod
def as_dict(cls):
return dict(
host=EmailConfig.EMAIL_HOST,
port=EmailConfig.EMAIL_PORT,
username=EmailConfig.EMAIL_USERNAME,
password=EmailConfig.EMAIL_PASSWORD
)
# INFO_MAIL: str = os.getenv("INFO_MAIL", "mehmet.karatay@hotmail.com")
# EMAIL_SEND: bool = bool(os.getenv("EMAIL_SEND", False))
# EMAIL_SEND_PORT: int = int(os.getenv("EMAIL_SEND_PORT", 587))
# EMAIL_SLEEP: int = int(os.getenv("EMAIL_SLEEP", 60))
# SERVICE_TIMING: int = int(os.getenv("SERVICE_TIMING", 900))
# EMAIL_LOGIN_USER: str = os.getenv("EMAIL_LOGIN_USER", "karatay@mehmetkaratay.com.tr")
# MAIN_MAIL: str = os.getenv("MAIN_MAIL", "karatay.berkay@gmail.com")
# EMAIL_SEND: bool = Config.EMAIL_SEND

122
docker-compose.bank.yml Normal file
View File

@ -0,0 +1,122 @@
services:
# prisma_service_test:
# container_name: prisma_service_test
# build:
# context: .
# dockerfile: ServicesRunner/AccountRecordServices/Test/Dockerfile
# networks:
# - bank-services-network
# volumes:
# - ./ServicesRunner/AccountRecordServices/Finder/Iban/venv:/opt/venv
# - ./ServicesRunner/AccountRecordServices/Finder/Iban/.prisma-cache:/root/.cache/prisma-python
# logging:
# driver: "json-file"
# options:
# max-size: "10m"
# max-file: "3"
# healthcheck:
# test: [ "CMD", "/opt/venv/bin/python", "-c", "import asyncio; from ServicesRunner.Depends.prisma_client import get_prisma_client; asyncio.run(get_prisma_client())" ]
# interval: 15s
# timeout: 10s
# retries: 3
prisma_service_iban:
container_name: prisma_service_iban
build:
context: .
dockerfile: ServicesRunner/AccountRecordServices/Finder/Iban/Dockerfile
networks:
- bank-services-network
volumes:
- ./ServicesRunner/AccountRecordServices/Finder/Iban/venv:/opt/venv
- ./ServicesRunner/AccountRecordServices/Finder/Iban/.prisma-cache:/root/.cache/prisma-python
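      # host-mounted venv and prisma cache, presumably so the generated
      # Prisma client survives container rebuilds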
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
healthcheck:
test: [ "CMD", "/opt/venv/bin/python", "-c", "import asyncio; from ServicesRunner.Depends.prisma_client import get_prisma_client; asyncio.run(get_prisma_client())" ]
interval: 15s
timeout: 10s
retries: 3
# prisma_studio:
# image: node:18
# working_dir: /app
# # volumes:
# # - ./ServicesRunner/Depends:/app
# ports:
# - "5555:5555"
# entrypoint: [ "/bin/sh", "-c" ]
# command: |
# "npx prisma studio --schema=/app/schema.prisma"
# depends_on:
# - prisma_service_test
# networks:
# - bank-services-network
# logging:
# driver: "json-file"
# options:
# max-size: "10m"
# max-file: "3"
# finder_payments:
# container_name: finder_payments
# env_file:
# - api_env.env
# build:
# context: .
# dockerfile: ServicesBank/Finder/Payment/Dockerfile
# networks:
# - bank-services-network
# logging:
# driver: "json-file"
# options:
# max-size: "10m"
# max-file: "3"
isbank_email_reader:
container_name: isbank_email_reader
environment:
- MAILBOX=bilgilendirme@ileti.isbank.com.tr
- MAIN_MAIL=karatay.berkay@gmail.com
- INFO_MAIL=mehmet.karatay@hotmail.com
- EMAIL_HOST=10.10.2.34
- EMAIL_READER_ADDRESS=isbank@mehmetkaratay.com.tr
- EMAIL_LOGIN_PASSWORD=system
- AUTHORIZE_IBAN=4245-0093333
- SERVICE_TIMING=900
- EMAIL_PORT=993
- EMAIL_SLEEP=60
build:
context: .
dockerfile: ServicesRunner/AccountRecordServices/Reader/Banks/IsBank/Dockerfile
networks:
- bank-services-network
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
isbank_mail_parser:
container_name: isbank_mail_parser
build:
context: .
dockerfile: ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank/Dockerfile
networks:
- bank-services-network
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
networks:
bank-services-network:
driver: bridge
# volumes:
# prisma_generated:
# name: prisma_generated_volume