Service Runner Finder and complete task chain completed

This commit is contained in:
Berkay 2025-08-11 19:26:49 +03:00
parent 405ba2e95d
commit ca98adc338
18 changed files with 1205 additions and 257 deletions

1
.gitignore vendored
View File

@ -57,6 +57,7 @@ report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
env
.env
ServicesRunner/AccountRecordServices/Finder/Iban/.prisma-cache
ServicesRunner/AccountRecordServices/Finder/Comment/.prisma-cache
venv/
.vscode/
__pycache__/

0
=1.3.8 Normal file
View File

View File

@ -0,0 +1,14 @@
__pycache__/
*.pyc
*.pyo
*.pyd
*.db
*.sqlite3
*.log
*.env
venv/
.env.*
node_modules/
.prisma/
.prisma-cache/
ServicesRunnner/AccountRecordServices/Test/venv/

View File

@ -0,0 +1,22 @@
FROM python:3.12-slim
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV VIRTUAL_ENV=/opt/venv
ENV PRISMA_SCHEMA_PATH=/app/Depends/schema.prisma
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
ENV PYTHONPATH=/app
RUN apt-get update && apt-get install -y --no-install-recommends gcc curl && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY ServicesRunner/Depends/ /app/Depends/
COPY ServicesRunner/AccountRecordServices/Finder/Comment /app/
COPY ServicesRunner/requirements.txt /app/requirements.txt
COPY ServicesRunner/AccountRecordServices/Finder/Comment/entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
CMD ["/entrypoint.sh"]

View File

@ -0,0 +1,174 @@
import time
import arrow
import pprint
from json import dumps
from decimal import Decimal
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
from Depends.prisma_client import PrismaService
from Depends.service_handler import ProcessCommentFinderService
from Depends.config import ConfigServices, MailSendModel, RedisMailSender, Status, RedisTaskObject, FinderComment
class BankReceive(BaseModel):
import_file_name: str
iban: str
bank_date: datetime
channel_branch: str
currency: Optional[str] = "TL"
currency_value: Decimal
bank_balance: Decimal
additional_balance: Decimal
process_name: str
process_type: str
process_comment: str
bank_reference_code: str
bank_date_w: int
bank_date_m: int
bank_date_d: int
bank_date_y: int
def check_task_belong_to_this_service(task: RedisTaskObject):
if not task.service == ConfigServices.SERVICE_PREFIX_FINDER_IBAN:
return False
if not task.completed:
return False
if task.is_completed:
return False
if not task.data:
return False
return True
def write_account_records_row_from_finder_comment(finder_comments: list[FinderComment], prisma_service: PrismaService, saved_list_of_account_records: dict):
finder_comments = list(finder_comments)
for finder_comment in finder_comments:
bank_date = arrow.get(finder_comment.bank_date).replace(tzinfo='GMT+3').datetime
bank_receive_record = BankReceive(
import_file_name=finder_comment.filename, iban=finder_comment.iban, bank_date=bank_date, channel_branch=finder_comment.channel_branch, currency="TL", currency_value=finder_comment.currency_value,
bank_balance=finder_comment.balance, additional_balance=finder_comment.additional_balance, process_name=finder_comment.process_name, process_type=finder_comment.process_type,
process_comment=finder_comment.process_comment, bank_reference_code=finder_comment.bank_reference_code, build_id=finder_comment.build_id, build_uu_id=finder_comment.build_uu_id,
decision_book_id=finder_comment.decision_book_id, decision_book_uu_id=finder_comment.decision_book_uu_id, bank_date_w=bank_date.weekday(), bank_date_m=bank_date.month,
bank_date_d=bank_date.day, bank_date_y=bank_date.year
)
account_record_found = prisma_service.find_first(table="account_records", query={"iban": bank_receive_record.iban, "bank_reference_code": bank_receive_record.bank_reference_code,
"bank_date": bank_receive_record.bank_date, "bank_balance": bank_receive_record.bank_balance, "currency_value": bank_receive_record.currency_value},
select={"id": True, "iban": True, "bank_reference_code": True, "bank_date": True, "bank_balance": True}
)
if not account_record_found:
created_account_record = prisma_service.create(table="account_records", data=bank_receive_record.dict(), select={"id": True, "iban": True, "bank_reference_code": True, "bank_date": True, "bank_balance": True} )
if created_account_record['build_id'] in saved_list_of_account_records.keys():
saved_list_of_account_records[created_account_record['build_id']] = [*saved_list_of_account_records[created_account_record['build_id']], created_account_record]
else:
saved_list_of_account_records[created_account_record['build_id']] = [created_account_record]
return saved_list_of_account_records
def enclose_task_and_send_mail_to_build_manager(prisma_service: PrismaService, saved_list_of_account_records: dict, process_comment_finder_service: ProcessCommentFinderService, task: RedisTaskObject):
"""
Enclose task and send mail to build manager
"""
if not saved_list_of_account_records:
return
today = arrow.now().to('GMT+3').datetime
for build_id, saved_list_of_account_record in saved_list_of_account_records.items():
build_manager_occupant_type = prisma_service.find_first(table="occupant_types", query={"occupant_code":"BU-MNG", "is_confirmed": True, "active": True})
living_space = prisma_service.find_first(
table="build_living_space", query={"build_id": build_id, "occupant_type_id": build_manager_occupant_type['id'], "expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}})
build = prisma_service.find_first(table="builds", query={"id": build_id})
person = prisma_service.find_first(table="people", query={"id": living_space['person_id']})
user = prisma_service.find_first(table="users", query={"person_id": person['id']})
send_object = MailSendModel(
receivers=[user.email], data=saved_list_of_account_record, template_name=ConfigServices.TEMPLATE_ACCOUNT_RECORDS,
subject=f"{build['name']} Cari Durum Bilgilendirme Raporu - {today.strftime('%d/%m/%Y %H:%M')}",
)
set_mail_object = RedisMailSender(task=task, data=send_object, service=ConfigServices.SERVICE_PREFIX_MAIL_SENDER, status=Status.PENDING, completed=False, created_at=today.strftime('%Y-%m-%d %H:%M:%S'))
process_comment_finder_service.service_retriever.redis_client.set(ConfigServices.SERVICE_PREFIX_MAIL_SENDER, dumps(set_mail_object.dict()))
return
if __name__ == "__main__":
prisma_service = PrismaService()
process_comment_finder_service = ProcessCommentFinderService()
print("Process Comment service started")
try:
print("Process Comment service started sleeping for 5 seconds")
while True:
time.sleep(5)
saved_list_of_account_records = dict()
tasks = process_comment_finder_service.fetch_all_tasks()
for task in tasks:
if not check_task_belong_to_this_service(task):
continue
write_account_records_row_from_finder_comment(finder_comments=task.data.FinderComment, prisma_service=prisma_service, saved_list_of_account_records=saved_list_of_account_records)
process_comment_finder_service.update_task_status(task_uuid=task.task, is_completed=True, status=Status.COMPLETED)
process_comment_finder_service.delete_task(task_uuid=task.task)
except Exception as e:
raise
finally:
prisma_service.disconnect()
def fix_account_records_bank_date(prisma_service: PrismaService, bank_receive_record: BankReceive):
account_record_from_other_fields = prisma_service.find_first(
table="account_records",
query={
"iban": bank_receive_record.iban,
"bank_reference_code": bank_receive_record.bank_reference_code,
"bank_balance": bank_receive_record.bank_balance,
"currency_value": bank_receive_record.currency_value,
# "process_comment": {"contains": str(bank_receive_record.process_comment), "mode": "insensitive"},
},
select={
"id": True, "iban": True, "bank_reference_code": True, "bank_date": True,
"bank_balance": True, "currency_value": True, "process_comment": True
}
)
if account_record_from_other_fields:
prisma_service.update(
table="account_records", where={"id": account_record_from_other_fields['id']}, data={"bank_date": bank_receive_record.bank_date},
)
if not account_record_from_other_fields:
pprint.pprint({"not_found_bank_receive_record": bank_receive_record})
# prisma_service.update(
# table="account_records", where={"id": account_record_from_other_fields['id']}, data={"bank_date": bank_receive_record.bank_date},
# )
# from_database = arrow.get(account_record_from_other_fields['bank_date']).to('GMT+3').datetime
# print('old date', from_database, " - new date ", bank_receive_record.bank_date)
def commented_out_code():
account_record_found = None
old_bank_date=arrow.get(finder_comment.bank_date).datetime
if not account_record_found:
account_record_found_with_old_date = prisma_service.find_first(
table="account_records",
query={
"iban": bank_receive_record.iban, "bank_reference_code": bank_receive_record.bank_reference_code,
"bank_date": old_bank_date, "bank_balance": bank_receive_record.bank_balance,
},
)
if account_record_found_with_old_date:
prisma_service.update(
table="account_records", where={"id": account_record_found_with_old_date.id}, data={"bank_date": bank_receive_record.bank_date},
)
if account_record_found:
print('-' * 150)
pprint.pprint(
{
"account_record_found": dict(account_record_found),
"bank_receive_record": bank_receive_record.dict(),
"bank_receive_record.bank_date": bank_receive_record.bank_date,
"account_record_found.bank_date": account_record_found["bank_date"],
}
)
print('-' * 150)
return

View File

@ -0,0 +1,19 @@
#!/bin/sh
VENV_PATH="/opt/venv"
REQUIREMENTS_PATH="/app/requirements.txt"
SCHEMA_PATH="/app/Depends/schema.prisma"
PRISMA_BINARY_PATH="/root/.cache/prisma-python/binaries"
if [ ! -x "$VENV_PATH/bin/python" ]; then
python -m venv "$VENV_PATH"
"$VENV_PATH/bin/pip" install pip --upgrade
"$VENV_PATH/bin/pip" install --no-cache-dir -r "$REQUIREMENTS_PATH"
"$VENV_PATH/bin/prisma" generate --schema "$SCHEMA_PATH"
fi
if ! find "$PRISMA_BINARY_PATH" -type f -name "prisma-query-engine-debian-openssl-3.0.x" | grep -q .; then
"$VENV_PATH/bin/prisma" py fetch
fi
exec "$VENV_PATH/bin/python" -u app.py

View File

@ -1,74 +1,108 @@
import uvloop
import asyncio
import sys
import signal
import time
import arrow
from pydantic import BaseModel
from datetime import datetime
from Depends.prisma_client import prisma_client, disconnect_prisma
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
from Depends.prisma_client import PrismaService
from Depends.service_handler import IbanFinderService
from Depends.config import ConfigServices, Status, FinderIban, RedisTaskObject
# sys.stdout.reconfigure(line_buffering=True) # alternatif: python -u veya PYTHONUNBUFFERED=1
class IbanRecord(BaseModel):
id: int
uu_id: str
iban: str
build_id: int
build_uu_id: str
expiry_starts: datetime
expiry_ends: datetime
async def tick():
start = time.time()
print(f"[{datetime.now()}] Attempting database query...")
async with prisma_client() as db:
rows = await db.account_records.find_many(
take=5, skip=0, order=[{"bank_date": "desc"}]
)
print(f"[{datetime.now()}] Query completed in {time.time()-start:.2f}s")
for i, r in enumerate(rows):
# Dilersen burada formatı değiştir
print(f" Row: {i} | id={r.id} bank_date={r.bank_date} currency_value={r.currency_value}")
print("-" * 80)
class DecisionBookRecord(BaseModel):
id: int
uu_id: str
build_id: int
build_uu_id: str
expiry_starts: datetime
expiry_ends: datetime
async def service():
print(f"[{datetime.now()}] IBAN Finder service starting")
try:
iteration = 0
while True:
iteration += 1
print(f"\n[{datetime.now()}] Loop iteration {iteration}")
try:
await tick()
except Exception as e:
print(f"[{datetime.now()}] Error in service tick: {e}")
await asyncio.sleep(1) # bloklamayan bekleme
finally:
# Her durumda DB'yi temiz kapat
await disconnect_prisma()
print(f"[{datetime.now()}] Cleaning up database connection...")
def check_task_belong_to_this_service(task: RedisTaskObject):
if not task.service == ConfigServices.SERVICE_PREFIX_MAIL_PARSER:
return False
if not task.completed:
return False
if not task.data:
return False
return True
async def _graceful_shutdown(sig: signal.Signals):
print(f"\n[{datetime.now()}] Shutting down due to signal: {sig.name}")
# Burada istersen tüm pending task'leri iptal edebilirsin:
# for t in asyncio.all_tasks():
# if t is not asyncio.current_task():
# t.cancel()
await disconnect_prisma()
def extract_build_iban_from_task(task: RedisTaskObject, finder_iban: FinderIban, write_object: dict) -> tuple[bool, dict]:
bank_date = arrow.get(finder_iban.bank_date).datetime
iban_record_db = prisma_service.find_first(
table="build_ibans",
query={
"active": True, "deleted": False, "is_confirmed": True, "iban": finder_iban.iban,
"expiry_starts": {"lte": bank_date}, "expiry_ends": {"gte": bank_date},
},
select={"id": None, "uu_id": None, "iban": None, "build_id": None, "build_uu_id": None, "expiry_starts": None, "expiry_ends": None}
)
if iban_record_db:
iban_record = IbanRecord(**iban_record_db)
write_object["build_id"] = iban_record.build_id
write_object["build_uu_id"] = iban_record.build_uu_id
return True, write_object
return False, write_object
def _install_signal_handlers(loop: asyncio.AbstractEventLoop):
# Linux/Unix: SIGINT (Ctrl+C) ve SIGTERM (docker stop) için kibar kapanış
for s in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(s, lambda s=s: asyncio.create_task(_graceful_shutdown(s)))
def extract_decision_book_from_task(write_object: dict) -> tuple[bool, dict]:
bank_date = arrow.get(write_object["bank_date"]).datetime
decision_book_record_db = prisma_service.find_first(
table="build_decision_book",
query={
"active": True, "deleted": False, "is_confirmed": True, "build_id": write_object["build_id"],
"expiry_starts": {"lte": bank_date}, "expiry_ends": {"gte": bank_date},
},
select={"id": None, "uu_id": None, "build_id": None, "build_uu_id": None, "expiry_starts": None, "expiry_ends": None}
)
if decision_book_record_db:
decision_book_record = DecisionBookRecord(**decision_book_record_db)
write_object["build_decision_book_id"] = decision_book_record.id
write_object["build_decision_book_uu_id"] = decision_book_record.uu_id
return True, write_object
return False, write_object
async def main():
loop = asyncio.get_running_loop()
try:
_install_signal_handlers(loop)
except NotImplementedError:
# (Gerekirse Windows vs., ama sen Linux/Docker kullanıyorsun)
pass
await service()
if __name__ == "__main__":
# uvloop policy zaten yukarıda set edildi; burada normal asyncio.run kullanıyoruz
asyncio.run(main())
prisma_service = PrismaService()
iban_finder_service = IbanFinderService()
print("Find Build Iban service started")
try:
print("Find Build Iban service started sleeping for 5 seconds")
while True:
time.sleep(5)
tasks = iban_finder_service.fetch_all_tasks()
for task in tasks:
if not check_task_belong_to_this_service(task):
continue
if list(task.data.FinderIban):
finder_iban_list = []
for finder_iban in list(task.data.FinderIban):
write_object = finder_iban.dict()
is_build_found, is_decision_book_found = False, False
is_build_found, write_object = extract_build_iban_from_task(task, finder_iban, write_object)
if is_build_found:
is_decision_book_found, write_object = extract_decision_book_from_task(write_object)
if is_build_found or is_decision_book_found:
finder_iban_list.append(write_object)
if finder_iban_list:
iban_finder_service.update_service_data(task.task, ConfigServices.SERVICE_PREFIX_FINDER_COMMENT, finder_iban_list)
iban_finder_service.change_service(task.task, ConfigServices.SERVICE_PREFIX_FINDER_IBAN, Status.COMPLETED, True)
continue
iban_finder_service.change_service(task.task, ConfigServices.SERVICE_PREFIX_FINDER_IBAN, Status.FAILED, True)
except Exception as e:
raise
finally:
prisma_service.disconnect()

View File

@ -7,6 +7,7 @@ PRISMA_BINARY_PATH="/root/.cache/prisma-python/binaries"
if [ ! -x "$VENV_PATH/bin/python" ]; then
python -m venv "$VENV_PATH"
"$VENV_PATH/bin/pip" install pip --upgrade
"$VENV_PATH/bin/pip" install --no-cache-dir -r "$REQUIREMENTS_PATH"
"$VENV_PATH/bin/prisma" generate --schema "$SCHEMA_PATH"
fi

View File

@ -152,12 +152,11 @@ if __name__ == "__main__":
# Process each task
for active_task in all_tasks:
if active_task.service == ConfigServices.SERVICE_PREFIX_MAIL_PARSER and active_task.completed:
# logger.info(f"Task {active_task.task} is already processed.")
if active_task.service == ConfigServices.SERVICE_PREFIX_MAIL_READER and active_task.completed:
logger.info(f"Processing task {active_task.task}")
parser.process_task(active_task)
else:
continue
logger.info(f"Processing task {active_task.task}")
parser.process_task(active_task)
else:
logger.info("No tasks found to process")

View File

@ -68,102 +68,69 @@ def initialize_service():
if __name__ == "__main__":
logger.info("Starting IsBank Email Service")
print(f"Starting Service Mail Reader.")
# Initialize service
runner = initialize_service()
# Configurable parameters
normal_sleep_time = 10 # seconds between normal operations
error_sleep_time = 30 # seconds to wait after an error before retrying
max_consecutive_errors = 5 # maximum number of consecutive errors before longer pause
extended_error_sleep = 120 # seconds to wait after hitting max consecutive errors
normal_sleep_time = 10
error_sleep_time = 30
max_consecutive_errors = 5
extended_error_sleep = 120
consecutive_errors = 0
# Main service loop
while True:
try:
# Main processing
print("Fetching and setting mails...")
runner.fetch_and_set_mails()
# Reset error counter on success
if consecutive_errors > 0:
logger.info(f"Service recovered after {consecutive_errors} consecutive errors")
consecutive_errors = 0
# Normal operation sleep
sleep(normal_sleep_time)
except MailReaderService.REDIS_EXCEPTIONS as e:
# Redis-specific errors
consecutive_errors += 1
logger.error(f"Redis error (attempt {consecutive_errors}): {str(e)}")
# Use centralized reconnection handler from RedisHandler
redis_handler, need_extended_sleep = MailReaderService.handle_reconnection(
consecutive_errors=consecutive_errors, max_consecutive_errors=max_consecutive_errors
)
if redis_handler:
# Update runner's redis handler with the new instance
runner.redis_handler = redis_handler
runner.redis_connected = False # Will trigger reconnection on next cycle
# Sleep based on error count
runner.redis_connected = False
if need_extended_sleep:
sleep(extended_error_sleep)
else:
sleep(error_sleep_time)
except socket.error as e:
# Email connection errors
consecutive_errors += 1
logger.error(f"Email connection error (attempt {consecutive_errors}): {str(e)}")
# Try to re-establish email connection
try:
logger.info("Attempting to re-establish email connection...")
# Create new email service directly
email_service = EmailReaderService(IsBankConfig())
email_service.login_and_connect()
# Create new runner with existing Redis handler and new email service
redis_handler = runner.redis_handler # Preserve existing Redis handler
redis_handler = runner.redis_handler
runner = EmailServiceRunner(redis_handler=redis_handler, email_service=email_service)
logger.info("Successfully re-established email connection")
except Exception as email_retry_error:
logger.error(f"Failed to re-establish email connection: {str(email_retry_error)}")
# Determine sleep time based on consecutive errors
if consecutive_errors >= max_consecutive_errors:
logger.warning(f"Hit {max_consecutive_errors} consecutive email errors, taking longer pause")
sleep(extended_error_sleep)
else:
sleep(error_sleep_time)
except Exception as e:
# Any other unexpected errors
consecutive_errors += 1
logger.error(f"Unexpected error (attempt {consecutive_errors}): {str(e)}")
# For any other error, try to reinitialize everything after some delay
if consecutive_errors >= max_consecutive_errors:
logger.warning(f"Hit {max_consecutive_errors} consecutive errors, reinitializing service")
try:
# Try to clean up existing connections
try:
runner.drop()
except Exception as cleanup_error:
logger.warning(f"Error during cleanup: {str(cleanup_error)}")
# Reinitialize the service directly
redis_handler = MailReaderService()
email_service = EmailReaderService(IsBankConfig())
email_service.login_and_connect()
runner = EmailServiceRunner(redis_handler=redis_handler, email_service=email_service)
if runner:
logger.info("Successfully reinitialized email service runner")
consecutive_errors = 0 # Reset counter after reinitialization
consecutive_errors = 0
else:
logger.error("Failed to reinitialize email service runner")
except Exception as reinit_error:
@ -171,6 +138,5 @@ if __name__ == "__main__":
sleep(extended_error_sleep)
else:
# For fewer consecutive errors, just retry the current runner
print(f"Error: {str(e)}")
sleep(error_sleep_time)

View File

@ -1,5 +1,5 @@
import os
from re import TEMPLATE
from pydantic import BaseModel
from typing import Any, List, Optional, Union
@ -29,20 +29,34 @@ class MailParser(BaseModel):
charset: str
data: str
class FinderIban(BaseModel):
...
filename: str
iban: str
bank_date: str
channel_branch: str
currency_value: float
balance: float
additional_balance: float
process_name: str
process_type: str
process_comment: str
bank_reference_code: str
class FinderComment(BaseModel):
...
class FinderComment(FinderIban):
build_id: Optional[int] = None
build_uu_id: Optional[str] = None
decision_book_id: Optional[int] = None
decision_book_uu_id: Optional[str] = None
class RedisData(BaseModel):
MailReader: MailReader
MailParser: List[MailParser]
FinderIban: FinderIban | Any
FinderComment: FinderComment | Any
FinderIban: List[FinderIban]
FinderComment: List[FinderComment]
class Status:
@ -61,6 +75,24 @@ class RedisTaskObject(BaseModel):
created_at: str
is_completed: bool
class MailSendModel(BaseModel):
receivers: List[str]
subject: str
template_name: str
data: dict
class RedisMailSender(BaseModel):
task: RedisTaskObject
data: MailSendModel
completed: bool
service: str
status: str
created_at: str
completed: bool
class EmailConfig:
HOST: str = os.getenv("EMAIL_HOST", "10.10.2.34")
@ -105,12 +137,16 @@ class ConfigServices:
TASK_MAILID_INDEX_PREFIX: str = "BANK:SERVICES:TASK:MAILID"
TASK_UUID_INDEX_PREFIX: str = "BANK:SERVICES:TASK:UUID"
TASK_SEEN_PREFIX: str = "BANK:SERVICES:TASK:SEEN"
TASK_DELETED_PREFIX: str = "BANK:SERVICES:TASK:DELETED"
SERVICE_PREFIX_MAIL_READER: str = "MailReader"
SERVICE_PREFIX_MAIL_PARSER: str = "MailParser"
SERVICE_PREFIX_FINDER_IBAN: str = "FinderIban"
SERVICE_PREFIX_FINDER_COMMENT: str = "FinderComment"
SERVICE_PREFIX_MAIL_SENDER: str = "MailSender"
TEMPLATE_ACCOUNT_RECORDS: str = "template_accounts.html"
paramsRedisData = Union[MailReader, MailParser, FinderIban, FinderComment]

View File

@ -15,7 +15,7 @@ from email.parser import BytesParser
from imaplib import IMAP4_SSL, IMAP4
from Depends.redis_handlers import RedisHandler
from Depends.config import EmailConfig, MailReaderMainConfig, MailReader, MailParser, RedisData
from Depends.config import ConfigServices, EmailConfig, MailReaderMainConfig, MailReader, MailParser, RedisData, Status
from Depends.service_handler import MailReaderService
# Configure logging
@ -328,41 +328,70 @@ class EmailReaderService:
raise
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
def mark_no_attachment(self, uid):
def move_to_folder(self, uid: Union[str, bytes], folder: str):
"""
Move message to folder with retry mechanism
Args:
uid: Email UID
folder: Destination folder
"""
try:
log_uid = uid
if isinstance(uid, bytes):
log_uid = uid.decode('utf-8', errors='replace')
elif isinstance(uid, str):
uid = uid.encode('utf-8')
logger.info(f"Moving email {log_uid} to {folder} folder")
self.mail.uid('MOVE', uid, folder)
self.commit()
return True
except Exception as e:
logger.error(f"Failed to move email to folder: {str(e)}")
return False
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
def copy_to_folder(self, uid: Union[str, bytes], folder: str):
"""
Copy message to folder with retry mechanism
Args:
uid: Email UID
folder: Destination folder
"""
try:
log_uid = uid
if isinstance(uid, bytes):
log_uid = uid.decode('utf-8', errors='replace')
elif isinstance(uid, str):
uid = uid.encode('utf-8')
logger.info(f"Copying email {log_uid} to {folder} folder")
self.mail.uid('COPY', uid, folder)
self.commit()
return True
except Exception as e:
logger.error(f"Failed to copy email to folder: {str(e)}")
return False
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
def mark_no_attachment(self, uid: Union[str, bytes]):
"""
Move message to no attachment folder with retry mechanism
Args:
uid: Email UID
"""
try:
# Handle both string and bytes types for logging
log_uid = uid
if isinstance(uid, bytes):
log_uid = uid.decode('utf-8', errors='replace')
logger.info(f"Moving email {log_uid} to {self.config.NO_ATTACHMENT_FOLDER} folder")
self.mail.uid('COPY', uid, self.config.NO_ATTACHMENT_FOLDER)
self.delete(uid)
except Exception as e:
logger.error(f"Failed to mark email as no attachment: {str(e)}")
raise
self.move_to_folder(uid, self.config.NO_ATTACHMENT_FOLDER)
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
def mark_completed(self, uid: bytes):
def mark_completed(self, uid: Union[str, bytes]):
"""
Move message to completed folder with retry mechanism
Args:
uid: Email UID
"""
try:
logger.info(f"Moving email {uid.decode('utf-8', errors='replace')} to {self.config.COMPLETED_FOLDER} folder")
self.mail.uid('COPY', uid, self.config.COMPLETED_FOLDER)
# self.delete(uid)
except Exception as e:
logger.error(f"Failed to mark email as completed: {str(e)}")
raise
self.move_to_folder(uid, self.config.COMPLETED_FOLDER)
@retry_on_connection_error(max_retries=2, delay=1, exceptions=(socket.error, IMAP4.error))
def delete(self, uid):
@ -499,7 +528,6 @@ class EmailServiceRunner:
if not getattr(mail, 'id', None):
logger.warning("Skipping email with no ID")
continue
mail_id = mail.id.decode('utf-8')
# check mail has .pdf extension
@ -545,26 +573,18 @@ class EmailServiceRunner:
"""
try:
mail_to_dict = mail.to_dict()
result = self.redis_handler.process_mail(
task_uuid, self.counter = self.redis_handler.process_mail(
mail_id=mail_id, mail_data=mail_to_dict, service_prefix=self.email_service.config.SERVICE_PREFIX, counter=self.counter
)
if result['status'] == 'success':
if result['action'] == 'stored_new_mail':
self.counter = result.get('counter', self.counter)
logger.info(f"Successfully processed new email {mail_id}")
elif result['action'] == 'checked_existing_mail':
if result.get('is_completed', False):
logger.info(f"Marking completed email {mail_id}")
self.email_service.mark_completed(mail_id)
elif result['status'] == 'error':
if result['action'] == 'id_mismatch':
logger.error(f"Mail ID mismatch: {mail_id} != {result.get('stored_id')}")
raise ValueError("Mail id does not match with id from Redis")
else:
logger.error(f"Email Service Runner Error processing mail {mail_id}: {result.get('error', 'Unknown error')}")
raise Exception(result.get('error', 'Unknown error during mail processing'))
if task_uuid:
self.redis_handler.change_service(
task_uuid=task_uuid, service_name=ConfigServices.SERVICE_PREFIX_MAIL_READER, status=Status.COMPLETED, completed=True
)
else:
logger.warning(f"Unexpected result status: {result['status']} for mail {mail_id}")
if self.redis_handler.check_mail_is_ready_to_delete(mail_id):
self.email_service.mark_completed(mail_id)
self.redis_handler.pop_mail(mail_id)
except MailReaderService.REDIS_EXCEPTIONS as e:
logger.error(f"Redis error while processing mail {mail_id}: {str(e)}")
self.redis_connected = False

View File

@ -1,49 +1,289 @@
from prisma import Prisma
import asyncio
import time
import logging
import uvloop
import threading
from datetime import datetime
from typing import Optional, AsyncGenerator, Protocol, Any
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional
from prisma import Prisma
# Singleton pattern for Prisma client
_prisma_client: Optional[Prisma] = None
async def get_prisma_client() -> Prisma:
"""
Get or initialize the Prisma client singleton.
Returns:
Prisma: The initialized Prisma client instance
"""
global _prisma_client
if _prisma_client is None:
_prisma_client = Prisma()
await _prisma_client.connect()
return _prisma_client
logger = logging.getLogger("prisma-service")
@asynccontextmanager
async def prisma_client() -> AsyncGenerator[Prisma, None]:
"""
Context manager for Prisma client operations.
Yields:
Prisma: The initialized Prisma client instance
Example:
```python
async with prisma_client() as db:
users = await db.user.find_many()
```
"""
client = await get_prisma_client()
try:
yield client
except Exception as e:
print(f"Database operation error: {e}")
raise
async def disconnect_prisma():
"""Disconnect the Prisma client when shutting down the application."""
global _prisma_client
if _prisma_client is not None:
await _prisma_client.disconnect()
_prisma_client = None
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
class BaseModelClient(Protocol):
async def find_many(self, **kwargs) -> list[Any]: ...
async def find_first(self, **kwargs) -> Any: ...
async def find_first_or_raise(self, **kwargs) -> Any: ...
async def find_unique(self, **kwargs) -> Any: ...
async def find_unique_or_raise(self, **kwargs) -> Any: ...
async def create(self, **kwargs) -> Any: ...
async def update(self, **kwargs) -> Any: ...
async def delete(self, **kwargs) -> Any: ...
async def delete_many(self, **kwargs) -> Any: ...
class PrismaService:
def __init__(self) -> None:
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._thread: Optional[threading.Thread] = None
self._client: Optional[Prisma] = None
self._start_loop_thread()
def _loop_runner(self) -> None:
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
try:
self._loop.run_forever()
finally:
self._loop.close()
def _submit(self, coro):
if self._loop is None or not self._loop.is_running():
raise RuntimeError("PrismaService event loop is not running.")
fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
return fut.result()
async def _aconnect(self) -> Prisma:
if self._client is not None:
return self._client
logger.info("Connecting Prisma client...")
client = Prisma()
await client.connect()
self._client = client
logger.info("Prisma client connected.")
return self._client
async def _adisconnect(self) -> None:
if self._client is not None:
logger.info("Disconnecting Prisma client...")
try:
await self._client.disconnect()
finally:
self._client = None
logger.info("Prisma client disconnected.")
@asynccontextmanager
async def _asession(self) -> AsyncGenerator[Prisma, None]:
yield await self._aconnect()
def _start_loop_thread(self) -> None:
t = threading.Thread(target=self._loop_runner, name="PrismaLoop", daemon=True)
t.start()
self._thread = t
while self._loop is None:
time.sleep(0.005)
async def _connect(self) -> Prisma:
if self._client is not None:
return self._client
async with self._lock:
if self._client is None:
logger.info("Connecting Prisma client...")
client = Prisma()
await client.connect()
self._client = client
logger.info("Prisma client connected.")
return self._client
async def _disconnect(self) -> None:
async with self._lock:
if self._client is not None:
try:
logger.info("Disconnecting Prisma client...")
await self._client.disconnect()
logger.info("Prisma client disconnected.")
finally:
self._client = None
@asynccontextmanager
async def _session(self) -> AsyncGenerator[Prisma, None]:
client = await self._connect()
try:
yield client
except Exception:
logger.exception("Database operation error")
raise
def _run(self, coro):
try:
asyncio.get_running_loop()
raise RuntimeError("Async run is not allowed. Use sync methods instead.")
except RuntimeError as e:
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
with asyncio.Runner() as runner:
return runner.run(coro)
async def _a_find_many(self, table: str, query: Optional[dict] = None, take: int = None, skip: int = None,
order: Optional[list[dict]] = None, select: Optional[dict] = None, include: Optional[dict] = None
) -> list[dict]:
start = time.time()
async with self._asession() as db:
table_selected: BaseModelClient = getattr(db, table, None)
if not table_selected:
raise ValueError(f"Table {table} not found")
rows = await table_selected.find_many(where=query, take=take, skip=skip, order=order or [], select=select, include=include)
# print(f"[{datetime.now()}] Find many query completed in {time.time() - start:.2f}s")
return rows
async def _a_find_first(self, table: str, query: Optional[dict] = None, order : Optional[list[dict]] = None, include: Optional[dict] = None) -> Any:
start = time.time()
async with self._asession() as db:
table_selected: BaseModelClient = getattr(db, table, None)
if not table_selected:
raise ValueError(f"Table {table} not found")
result = await table_selected.find_first(where=query, order=order or [], include=include)
# print(f"[{datetime.now()}] Find first query completed in {time.time() - start:.2f}s")
return result
async def _a_find_first_or_throw(self, table: str, query: Optional[dict] = None, order : Optional[list[dict]] = None,
include: Optional[dict] = None
) -> Any:
start = time.time()
async with self._asession() as db:
table_selected: BaseModelClient = getattr(db, table, None)
if not table_selected:
raise ValueError(f"Table {table} not found")
result = await table_selected.find_first_or_raise(where=query, order=order or [], include=include)
# print(f"[{datetime.now()}] Find first or throw query completed in {time.time() - start:.2f}s")
return result
async def _a_create(self, table: str, data: dict, include: Optional[dict] = None) -> Any:
start = time.time()
async with self._asession() as db:
table_selected: BaseModelClient = getattr(db, table, None)
if not table_selected:
raise ValueError(f"Table {table} not found")
result = await table_selected.create(data=data, include=include)
# print(f"[{datetime.now()}] Create operation completed in {time.time() - start:.2f}s")
return result
async def _a_update(self, table: str, where: dict, data: dict, include: Optional[dict] = None) -> Any:
start = time.time()
async with self._asession() as db:
table_selected: BaseModelClient = getattr(db, table, None)
if not table_selected:
raise ValueError(f"Table {table} not found")
result = await table_selected.update(where=where, data=data, include=include)
# print(f"[{datetime.now()}] Update operation completed in {time.time() - start:.2f}s")
return result
async def _a_delete(self, table: str, where: dict, include: Optional[dict] = None) -> Any:
start = time.time()
async with self._asession() as db:
table_selected: BaseModelClient = getattr(db, table, None)
if not table_selected:
raise ValueError(f"Table {table} not found")
result = await table_selected.delete(where=where, include=include)
# print(f"[{datetime.now()}] Delete operation completed in {time.time() - start:.2f}s")
return result
async def _a_delete_many(self, table: str, where: dict, include: Optional[dict] = None):
start = time.time()
async with self._asession() as db:
table_selected: BaseModelClient = getattr(db, table, None)
if not table_selected:
raise ValueError(f"Table {table} not found")
result = await table_selected.delete_many(where=where, include=include)
# print(f"[{datetime.now()}] Delete many operation completed in {time.time() - start:.2f}s")
return result
async def _a_find_unique(self, table: str, query: dict, include: Optional[dict] = None) -> Any:
start = time.time()
async with self._asession() as db:
table_selected: BaseModelClient = getattr(db, table, None)
if not table_selected:
raise ValueError(f"Table {table} not found")
result = await table_selected.find_unique(where=query, include=include)
# print(f"[{datetime.now()}] Find unique query completed in {time.time() - start:.2f}s")
return result
async def _a_find_unique_or_throw(self, table: str, query: dict, include: Optional[dict] = None) -> Any:
start = time.time()
async with self._asession() as db:
table_selected: BaseModelClient = getattr(db, table, None)
if not table_selected:
raise ValueError(f"Table {table} not found")
result = await table_selected.find_unique_or_raise(where=query, include=include)
# print(f"[{datetime.now()}] Find unique or throw query completed in {time.time() - start:.2f}s")
return result
def find_unique_or_throw(self, table: str, query: dict, select: Optional[dict] = None, include: Optional[dict] = None):
result = self._submit(self._a_find_unique_or_throw(table=table, query=query, include=include))
if select:
result = {k: v for k, v in result if k in select}
return result
def find_unique(self, table: str, query: dict, select: Optional[dict] = None, include: Optional[dict] = None):
result = self._submit(self._a_find_unique(table=table, query=query, include=include))
if select and result:
result = {k: v for k, v in result if k in select}
return result
def find_many(
self, table: str, query: Optional[dict] = None, take: int = None, skip: int = None,
order: Optional[list[dict]] = None, select: Optional[dict] = None, include: Optional[dict] = None
):
result = self._submit(self._a_find_many(table=table, query=query, take=take, skip=skip, order=order, select=select, include=include))
if select and result:
result = [{k: v for k, v in item.items() if k in select} for item in result]
return result
def create(self, table: str, data: dict, select: Optional[dict] = None, include: Optional[dict] = None):
result = self._submit(self._a_create(table=table, data=data, include=include))
if select and result:
result = {k: v for k, v in result if k in select}
return result
def find_first_or_throw(self, table: str, query: Optional[dict] = None,
order: Optional[list[dict]] = None, select: Optional[dict] = None, include: Optional[dict] = None
):
result = self._submit(self._a_find_first_or_throw(table=table, query=query, order=order, include=include))
if select and result:
result = {k: v for k, v in result if k in select}
return result
def find_first(self, table: str, query: Optional[dict] = None, select: Optional[dict] = None, order: Optional[list[dict]] = None, include: Optional[dict] = None):
result = self._submit(self._a_find_first(table=table, query=query, order=order, include=include))
if select and result:
result = {k: v for k, v in result if k in select}
return result
def update(self, table: str, where: dict, data: dict, select: Optional[dict] = None, include: Optional[dict] = None):
result = self._submit(self._a_update(table=table, where=where, data=data, include=include))
if select and result:
result = {k: v for k, v in result if k in select}
return result
def delete(self, table: str, where: dict, select: Optional[dict] = None, include: Optional[dict] = None):
result = self._submit(self._a_delete(table=table, where=where, select=select, include=include))
if select and result:
result = {k: v for k, v in result if k in select}
return result
def delete_many(self, table: str, where: dict, select: Optional[dict] = None, include: Optional[dict] = None):
result = self._submit(self._a_delete_many(table=table, where=where, select=select, include=include))
if select and result:
result = [{k: v for k, v in item if k in select} for item in result]
return result
def disconnect(self) -> None:
try:
self._submit(self._adisconnect())
finally:
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
if self._thread and self._thread.is_alive():
self._thread.join(timeout=2.0)
self._loop = None
self._thread = None

View File

@ -1,5 +1,6 @@
import logging
from json import loads, dumps
from contextlib import contextmanager
from time import sleep
from redis import Redis, RedisError, ConnectionError as RedisConnectionError
@ -90,6 +91,10 @@ class RedisHandler:
def sadd(self, key: str, value):
"""Add a value to a Redis set"""
return self.redis_client.sadd(key, value)
def ismember(self, key: str, value):
"""Check if a value is a member of a Redis set"""
return self.redis_client.sismember(key, value)
def get(self, key: str):
"""Get a value from Redis by key"""
@ -98,7 +103,15 @@ class RedisHandler:
def set(self, key: str, value):
"""Set a key-value pair in Redis"""
return self.redis_client.set(key, value)
def delete_value(self, key: str, value):
"""Delete a value from a Redis value by finding key"""
get_redis = self.get(key)
if get_redis:
get_redis: dict = loads(get_redis)
get_redis.pop(value)
self.set(key, dumps(get_redis))
def rpush(self, key: str, value):
"""Append a value to a Redis list"""
return self.redis_client.rpush(key, value)
@ -107,9 +120,13 @@ class RedisHandler:
"""Get an element from a Redis list by its index"""
return self.redis_client.lindex(key, index)
def spop(self, key: str, value):
"""Remove and return a random member from a Redis set"""
return self.redis_client.spop(key, value)
def spop(self, key: str, count=1):
"""Remove and return random members from a Redis set"""
return self.redis_client.spop(key, count)
def srem(self, key: str, value):
"""Remove a specific member from a Redis set"""
return self.redis_client.srem(key, value)
def get_all_tasks(self):
"""Get all keys matching the task prefix pattern"""

View File

@ -1,12 +1,15 @@
import logging
from time import sleep
from json import loads, dumps
from uuid import uuid4
from datetime import datetime
from Depends.config import Status, ConfigServices, RedisTaskObject, RedisData
from Depends.redis_handlers import RedisHandler
from redis import Redis
from redis.exceptions import WatchError, ResponseError
# Configure logging
logger = logging.getLogger('Service Task Retriever')
@ -15,6 +18,7 @@ class ServiceTaskRetriever:
Class for retrieving and updating Redis task objects by UUID or mail ID.
Provides direct access to task objects and service-specific data without iteration.
"""
SENTINEL = "__DEL__SENTINEL__"
def __init__(self, redis_handler=None):
"""
@ -28,7 +32,7 @@ class ServiceTaskRetriever:
else:
self.redis_handler = RedisHandler()
self.redis_client = self.redis_handler.redis_client
self.redis_client: Redis = self.redis_handler.redis_client
self.redis_prefix = ConfigServices.MAIN_TASK_PREFIX
self.mailid_index_key = ConfigServices.TASK_MAILID_INDEX_PREFIX
self.uuid_index_key = ConfigServices.TASK_UUID_INDEX_PREFIX
@ -89,13 +93,13 @@ class ServiceTaskRetriever:
task_uuid: UUID of the task
index: Index of the task
"""
already_dict = self.redis_handler.get(self.mailid_index_key)
already_dict = self.redis_handler.get(self.uuid_index_key)
if already_dict:
already_dict = loads(already_dict)
already_dict[str(task_uuid)] = index
self.redis_handler.set(self.mailid_index_key, dumps(already_dict))
self.redis_handler.set(self.uuid_index_key, dumps(already_dict))
else:
self.redis_handler.set(self.mailid_index_key, dumps({str(task_uuid): index}))
self.redis_handler.set(self.uuid_index_key, dumps({str(task_uuid): index}))
def set_index_mail_id(self, mail_id: str, index: int):
"""
@ -135,6 +139,65 @@ class ServiceTaskRetriever:
if get_index_by_mail_id := self.get_index_by_mail_id(mail_id):
self.set_index_uuid(task_uuid, get_index_by_mail_id)
def delete_task(self, task_uuid: str, max_retries: int = 20, base_sleep: float = 0.01):
"""
Delete a task object by its UUID
Args:
task_uuid: UUID of the task
max_retries: Maximum number of retries
"""
for attempt in range(max_retries):
try:
with self.redis_client.pipeline() as pipe:
pipe.watch(ConfigServices.MAIN_TASK_PREFIX, ConfigServices.TASK_UUID_INDEX_PREFIX, ConfigServices.TASK_MAILID_INDEX_PREFIX)
raw_uuid = pipe.get(ConfigServices.TASK_UUID_INDEX_PREFIX)
raw_mail = pipe.get(ConfigServices.TASK_MAILID_INDEX_PREFIX)
llen = pipe.llen(ConfigServices.MAIN_TASK_PREFIX)
if not llen:
pipe.unwatch()
return False
uuid_map = loads(raw_uuid.decode()) if raw_uuid else {}
mail_map = loads(raw_mail.decode()) if raw_mail else {}
if task_uuid not in uuid_map:
pipe.unwatch()
return False
index = int(uuid_map[task_uuid])
if index < 0:
index = int(llen) + index
if index < 0 or index >= int(llen):
pipe.unwatch()
return False
uuid_key_to_del = next((k for k, v in uuid_map.items() if int(v) == index), None)
mail_key_to_del = next((k for k, v in mail_map.items() if int(v) == index), None)
dup_uuid_count = sum(1 for v in uuid_map.values() if int(v) == index)
dup_mail_count = sum(1 for v in mail_map.values() if int(v) == index)
if dup_uuid_count > 1:
pass
if dup_mail_count > 1:
pass
if uuid_key_to_del is not None:
uuid_map.pop(uuid_key_to_del, None)
if mail_key_to_del is not None:
mail_map.pop(mail_key_to_del, None)
for k, v in list(uuid_map.items()):
if int(v) > index: uuid_map[k] = int(v) - 1
for k, v in list(mail_map.items()):
if int(v) > index: mail_map[k] = int(v) - 1
sentinel = f"__DEL__{uuid4()}__"
pipe.multi()
pipe.lset(ConfigServices.MAIN_TASK_PREFIX, index, sentinel)
pipe.lrem(ConfigServices.MAIN_TASK_PREFIX, 1, sentinel)
pipe.set(ConfigServices.TASK_UUID_INDEX_PREFIX, dumps(uuid_map))
pipe.set(ConfigServices.TASK_MAILID_INDEX_PREFIX, dumps(mail_map))
pipe.execute()
mail_key_to_del = int(mail_key_to_del)
self.redis_client.sadd(ConfigServices.TASK_DELETED_PREFIX, mail_key_to_del)
return True
except (WatchError, ResponseError):
sleep(base_sleep * (1.5 ** attempt))
continue
return False
def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
"""
Get a task object directly by its UUID without iteration
@ -248,7 +311,7 @@ class ServiceTaskRetriever:
self._validate_service_name(service_name)
# Create new RedisData with proper defaults for all services
data_dict = {'MailReader': None, 'MailParser': [], 'FinderIban': {}, 'FinderComment': {}}
data_dict = {'MailReader': None, 'MailParser': [], 'FinderIban': [], 'FinderComment': []}
# Set the actual service data
data_dict['MailReader'] = mail_reader
data_dict['MailParser'] = mail_parser
@ -268,7 +331,7 @@ class ServiceTaskRetriever:
)
# Convert to dict for serialization
write_object = write_object.model_dump()
write_object = write_object.dict()
# Push new task to Redis list
redis_write_ = self.redis_client.rpush(self.redis_prefix, dumps(write_object))
@ -428,7 +491,6 @@ class ServiceTaskRetriever:
if not self.redis_client.lset(self.redis_prefix, int(index_value), dumps(task_object_dict)):
raise ValueError(f"Failed to write updated task data for UUID {task_uuid}")
return True
def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
"""
@ -655,6 +717,23 @@ class MailReaderService:
"""
return self.service_retriever.update_task_status(task_uuid, is_completed, status)
def change_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False):
"""
Change the service of a task by UUID
Args:
task_uuid: UUID of the task to update
service_name: Name of the service to update
Returns:
bool: True if successful
Raises:
FileNotFoundError: If the task is not found
ValueError: If the update fails
"""
return self.service_retriever.update_task_service(task_uuid, service_name, status, completed)
def process_mail(self, mail_id: str, mail_data: dict, service_prefix: str, counter: int) -> dict:
"""
Process mail data and store it in Redis
@ -675,20 +754,42 @@ class MailReaderService:
attachments = mail_without_attachments.pop('attachments', [])
create_task = dict(task_uuid=task_uuid, service_name=service_prefix, mail_reader=mail_without_attachments, mail_parser=attachments)
self.service_retriever.create_task_with_uuid(**create_task)
return {'status': 'success', 'action': 'stored_new_mail', 'counter': counter}
else:
try:
task = self.service_retriever.get_task_by_mail_id(mail_id)
if task is None and task.data and task.data.MailReader:
stored_id = task.data.MailReader.id
if stored_id != mail_id:
return {'status': 'error', 'action': 'id_mismatch', 'stored_id': stored_id}
return {'status': 'success', 'action': 'checked_existing_mail', 'is_completed': task.is_completed if task else False}
except FileNotFoundError:
return {'status': 'error', 'action': 'not_found', 'error': f'Mail with ID {mail_id} not found in index'}
return task_uuid, counter
except Exception as e:
logger.error(f"Mail Reader Service Error processing mail {mail_id}: {str(e)}")
return {'status': 'error', 'action': 'exception', 'error': str(e)}
return None, counter
def pop_mail(self, mail_id: str):
"""
Pop a mail from Redis
Args:
mail_id: ID of the mail to pop
Returns:
bool: True if successful
Raises:
FileNotFoundError: If the mail is not found
ValueError: If the pop fails
"""
try:
if self.redis_handler.ismember(f'{ConfigServices.TASK_SEEN_PREFIX}', int(mail_id)):
self.redis_handler.srem(f'{ConfigServices.TASK_SEEN_PREFIX}', int(mail_id))
return True
return False
except Exception as e:
logger.error(f"Mail Reader Service Error popping mail {int(mail_id)}: {str(e)}")
return False
def check_mail_is_ready_to_delete(self, mail_id: str):
try:
if self.redis_handler.ismember(f'{ConfigServices.TASK_DELETED_PREFIX}', int(mail_id)):
return True
return False
except Exception as e:
logger.error(f"Mail Reader Service Error checking mail {int(mail_id)}: {str(e)}")
return False
class MailParserService:
@ -832,4 +933,228 @@ class MailParserService:
"""
Update the status of a task by UUID
"""
return self.service_retriever.update_task_status(task_uuid, is_completed, status)
return self.service_retriever.update_task_status(task_uuid, is_completed, status)
class IbanFinderService:
"""
Iban Finder Service
"""
# Singleton instance
_instance = None
REDIS_EXCEPTIONS = RedisHandler.REDIS_EXCEPTIONS
def __init__(self):
if hasattr(self, '_initialized') and self._initialized:
return
self.service_retriever = ServiceTaskRetriever()
self._initialized = True
def fetch_all_tasks(self) -> list[RedisTaskObject]:
return self.service_retriever.fetch_all_tasks()
def ensure_connection(self):
"""
Ensure Redis connection is established
Returns:
bool: True if connection is established, False otherwise
"""
return self.redis_handler.ensure_connection()
def _check_redis_connection(self) -> bool:
"""
Check if Redis connection is alive using RedisHandler
Returns:
True if connection is alive, False otherwise
"""
try:
# Use RedisHandler to check connection
connection_status = self.redis_handler.ensure_connection()
if connection_status:
logger.info("Redis connection established via RedisHandler")
else:
logger.error("Redis connection check failed via RedisHandler")
return connection_status
except RedisHandler.REDIS_EXCEPTIONS as e:
logger.error(f"Redis connection failed: {str(e)}")
return False
def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
"""
Get a task object by its UUID
Args:
task_uuid: UUID of the task to retrieve
Returns:
RedisTaskObject: The task object if found
Raises:
FileNotFoundError: If the UUID index or task is not found
"""
return self.service_retriever.get_task_by_uuid(task_uuid)
def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
"""
Get service-specific data from a task by UUID
Args:
task_uuid: UUID of the task
service_name: Name of the service to extract data for
Returns:
Any: Service-specific data if found
Raises:
FileNotFoundError: If the task or service data is not found
"""
return self.service_retriever.get_service_data_by_uuid(task_uuid, service_name)
def update_service_data(self, task_uuid: str, service_name: str, service_data: dict) -> bool:
"""
Update service-specific data in a task by UUID
Args:
task_uuid: UUID of the task to update
service_name: Name of the service data to update
service_data: New service data
Returns:
bool: True if successful
Raises:
FileNotFoundError: If the task is not found
ValueError: If the update fails or service name is invalid
"""
return self.service_retriever.update_service_data(task_uuid, service_name, service_data)
def change_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False) -> bool:
"""
Update the service of a task by UUID
"""
return self.service_retriever.update_task_service(task_uuid, service_name, status, completed)
def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
"""
Update the status of a task by UUID
"""
return self.service_retriever.update_task_status(task_uuid, is_completed, status)
class ProcessCommentFinderService:
"""
Process Comment Finder Service
"""
# Singleton instance
_instance = None
REDIS_EXCEPTIONS = RedisHandler.REDIS_EXCEPTIONS
def __init__(self):
if hasattr(self, '_initialized') and self._initialized:
return
self.service_retriever = ServiceTaskRetriever()
self._initialized = True
def fetch_all_tasks(self) -> list[RedisTaskObject]:
return self.service_retriever.fetch_all_tasks()
def ensure_connection(self):
"""
Ensure Redis connection is established
Returns:
bool: True if connection is established, False otherwise
"""
return self.redis_handler.ensure_connection()
def _check_redis_connection(self) -> bool:
"""
Check if Redis connection is alive using RedisHandler
Returns:
True if connection is alive, False otherwise
"""
try:
# Use RedisHandler to check connection
connection_status = self.redis_handler.ensure_connection()
if connection_status:
logger.info("Redis connection established via RedisHandler")
else:
logger.error("Redis connection check failed via RedisHandler")
return connection_status
except RedisHandler.REDIS_EXCEPTIONS as e:
logger.error(f"Redis connection failed: {str(e)}")
return False
def get_task_by_uuid(self, task_uuid: str) -> RedisTaskObject:
"""
Get a task object by its UUID
Args:
task_uuid: UUID of the task to retrieve
Returns:
RedisTaskObject: The task object if found
Raises:
FileNotFoundError: If the UUID index or task is not found
"""
return self.service_retriever.get_task_by_uuid(task_uuid)
def get_service_data_by_uuid(self, task_uuid: str, service_name: str):
"""
Get service-specific data from a task by UUID
Args:
task_uuid: UUID of the task
service_name: Name of the service to extract data for
Returns:
Any: Service-specific data if found
Raises:
FileNotFoundError: If the task or service data is not found
"""
return self.service_retriever.get_service_data_by_uuid(task_uuid, service_name)
def update_service_data(self, task_uuid: str, service_name: str, service_data: dict) -> bool:
"""
Update service-specific data in a task by UUID
Args:
task_uuid: UUID of the task to update
service_name: Name of the service data to update
service_data: New service data
Returns:
bool: True if successful
Raises:
FileNotFoundError: If the task is not found
ValueError: If the update fails or service name is invalid
"""
return self.service_retriever.update_service_data(task_uuid, service_name, service_data)
def change_service(self, task_uuid: str, service_name: str, status: str = Status.COMPLETED, completed: bool = False) -> bool:
"""
Update the service of a task by UUID
"""
return self.service_retriever.update_task_service(task_uuid, service_name, status, completed)
def update_task_status(self, task_uuid: str, is_completed: bool = True, status: str = Status.COMPLETED) -> bool:
"""
Update the status of a task by UUID
"""
return self.service_retriever.update_task_status(task_uuid, is_completed, status)
def delete_task(self, task_uuid: str, max_retries: int = 5):
"""
Delete a task object by its UUID
"""
return self.service_retriever.delete_task(task_uuid, max_retries)

View File

@ -0,0 +1,54 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Gelen Banka Kayıtları</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
table {
width: 100%;
border-collapse: collapse;
}
table, th, td {
border: 1px solid black;
}
th, td {
padding: 10px;
text-align: left;
}
th {
background-color: #f2f2f2;
}
</style>
</head>
<body>
<h1>Günaydın, Admin</h1>
<br>
<p>Banka Kayıtları : {{today}} </p>
<p><b>Son Bakiye : {{bank_balance}} </b></p>
<p><b>{{"Status : İkinci Bakiye Hatalı" if balance_error else "Status :OK"}}</b></p>
<table border="1">
<thead>
<tr>
{% for header in headers %}
<th>{{ header }}</th>
{% endfor %}
</tr>
</thead>
<tbody>
{% for row in rows %}
<tr>
{% for cell in row %}
<td>{{ cell }}</td>
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
<p>Teşekkür ederiz,<br>Evyos Yönetim<br>Saygılarımızla</p>
</body>
</html>

View File

@ -1,4 +1,6 @@
prisma==0.9.1
python-dotenv==1.0.0
asyncio==3.4.3
uvloop>=0.19
uvloop>=0.19
redis>=6.4.0
unidecode>=1.3.8
arrow>=1.3.0

View File

@ -21,6 +21,26 @@ services:
# timeout: 10s
# retries: 3
# prisma_studio:
# image: node:18
# working_dir: /app
# # volumes:
# # - ./ServicesRunner/Depends:/app
# ports:
# - "5555:5555"
# entrypoint: [ "/bin/sh", "-c" ]
# command: |
# "npx prisma studio --schema=/app/schema.prisma"
# depends_on:
# - prisma_service_test
# networks:
# - bank-services-network
# logging:
# driver: "json-file"
# options:
# max-size: "10m"
# max-file: "3"
prisma_service_iban:
container_name: prisma_service_iban
build:
@ -42,39 +62,27 @@ services:
timeout: 10s
retries: 3
# prisma_studio:
# image: node:18
# working_dir: /app
# # volumes:
# # - ./ServicesRunner/Depends:/app
# ports:
# - "5555:5555"
# entrypoint: [ "/bin/sh", "-c" ]
# command: |
# "npx prisma studio --schema=/app/schema.prisma"
# depends_on:
# - prisma_service_test
# networks:
# - bank-services-network
# logging:
# driver: "json-file"
# options:
# max-size: "10m"
# max-file: "3"
# finder_payments:
# container_name: finder_payments
# env_file:
# - api_env.env
# build:
# context: .
# dockerfile: ServicesBank/Finder/Payment/Dockerfile
# networks:
# - bank-services-network
# logging:
# driver: "json-file"
# options:
# max-size: "10m"
# max-file: "3"
prisma_service_process_comment:
container_name: prisma_service_process_comment
build:
context: .
dockerfile: ServicesRunner/AccountRecordServices/Finder/Comment/Dockerfile
networks:
- bank-services-network
volumes:
- ./ServicesRunner/AccountRecordServices/Finder/Comment/venv:/opt/venv
- ./ServicesRunner/AccountRecordServices/Finder/Comment/.prisma-cache:/root/.cache/prisma-python
restart: on-failure
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
healthcheck:
test: [ "CMD", "/opt/venv/bin/python", "-c", "import asyncio; from ServicesRunner.Depends.prisma_client import get_prisma_client; asyncio.run(get_prisma_client())" ]
interval: 15s
timeout: 10s
retries: 3
isbank_email_reader:
container_name: isbank_email_reader
@ -112,6 +120,22 @@ services:
options:
max-size: "10m"
max-file: "3"
# finder_payments:
# container_name: finder_payments
# env_file:
# - api_env.env
# build:
# context: .
# dockerfile: ServicesBank/Finder/Payment/Dockerfile
# networks:
# - bank-services-network
# logging:
# driver: "json-file"
# options:
# max-size: "10m"
# max-file: "3"
networks:
bank-services-network: