Revert "updated prisma service async runner"
This reverts commit db0ae34948.
This commit is contained in:
@@ -1 +0,0 @@
|
||||
3.12
|
||||
@@ -1,22 +0,0 @@
|
||||
FROM python:3.12-slim

WORKDIR /app

# Dependency manifest first so the pip layer is cached across code-only changes
COPY ServicesRunner/AccountRecordServices/Finder/Parser/pyproject.toml .

# Install the project and its dependencies
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir .

# Bring in just the directories this service actually needs
COPY ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank /app/ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank
COPY ServicesRunner/Depends /app/ServicesRunner/Depends

# Make both the repo root and ServicesRunner importable
ENV PYTHONPATH=/app:/app/ServicesRunner

# Run from the IsBank service directory
WORKDIR /app/ServicesRunner/AccountRecordServices/Finder/Parser/Excel/Isbank

# Launch the service
CMD ["python", "app.py"]
|
||||
@@ -1,211 +0,0 @@
|
||||
import sys
|
||||
import logging
|
||||
import pandas as pd
|
||||
|
||||
from time import sleep
|
||||
from datetime import datetime
|
||||
from io import BytesIO
|
||||
from ServicesRunner.Depends.config import ConfigServices, MailParser, RedisTaskObject, Status
|
||||
from base64 import b64decode
|
||||
from unidecode import unidecode
|
||||
|
||||
from Depends.service_handler import MailParserService
|
||||
|
||||
# Log to stdout (container logs) and to a local file (on-disk history)
# with one shared format.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('isbank_parser_service.log'),
    ],
)
logger = logging.getLogger('IsBank_Parser_Service')
|
||||
|
||||
|
||||
class IsbankMailParserService:
|
||||
|
||||
def __init__(self):
|
||||
self.mail_parser_service = MailParserService()
|
||||
|
||||
def try_dataframe_extract_with_xlsx(self, binary_data: BytesIO):
|
||||
try:
|
||||
df = pd.read_excel(binary_data, engine='openpyxl')
|
||||
return df
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
def try_dataframe_extract_with_xls(self, binary_data: BytesIO):
|
||||
try:
|
||||
df = pd.read_excel(binary_data, engine='xlrd')
|
||||
return df
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
def try_dataframe_extract_else(self, binary_data: BytesIO):
|
||||
try:
|
||||
df = pd.read_excel(binary_data, engine='openpyxl')
|
||||
except Exception as e1:
|
||||
try:
|
||||
binary_data.seek(0)
|
||||
df = pd.read_excel(binary_data, engine='xlrd')
|
||||
except Exception as e2:
|
||||
return None
|
||||
return df
|
||||
|
||||
def find_ibans(self, excel_frame: pd.DataFrame, file_name: str) -> list[dict]:
|
||||
"""Parse Excel file data.
|
||||
|
||||
Args:
|
||||
excel_frame: DataFrame containing Excel data
|
||||
|
||||
Returns:
|
||||
list[dict]: List of parsed data dictionaries
|
||||
"""
|
||||
iban, data_list = "", []
|
||||
try:
|
||||
for row in excel_frame.itertuples():
|
||||
if "IBAN" in str(row[3]).upper():
|
||||
iban = str(row[5]).replace(" ", "")
|
||||
if not str(row[1]) == "nan" and not str(row[2]) == "nan":
|
||||
if len(str(row[1]).split("/")) > 2:
|
||||
data_list.append(
|
||||
dict(
|
||||
filename=file_name,
|
||||
iban=str(iban),
|
||||
bank_date=datetime.strptime(str(row[1]), "%d/%m/%Y-%H:%M:%S").strftime("%Y-%m-%d %H:%M:%S"),
|
||||
channel_branch=unidecode(str(row[3])),
|
||||
currency_value=(float(str(row[4]).replace(",", "")) if row[4] else 0),
|
||||
balance=(float(str(row[5]).replace(",", "")) if row[5] else 0),
|
||||
additional_balance=(float(str(row[6]).replace(",", "")) if row[6] else 0),
|
||||
process_name=str(row[7]),
|
||||
process_type=unidecode(str(row[8])),
|
||||
process_comment=unidecode(str(row[9])),
|
||||
bank_reference_code=str(row[15]),
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"[PARSER_SERVICE] Error parsing Excel file: {str(e)}")
|
||||
return data_list
|
||||
|
||||
def parse_dataframes(self, dataframe: pd.DataFrame, task: RedisTaskObject, attachment_data: MailParser):
|
||||
file_name = attachment_data.filename
|
||||
data_list = self.find_ibans(dataframe, file_name)
|
||||
print(f"[PARSER_SERVICE] Successfully parsed {len(data_list)} records from Excel file")
|
||||
if data_list:
|
||||
self.mail_parser_service.update_service_data(task.task, ConfigServices.SERVICE_PREFIX_FINDER_IBAN, data_list)
|
||||
print(f"Updated service data for task {task.task} with {len(data_list)} records")
|
||||
self.mail_parser_service.change_service(task.task, ConfigServices.SERVICE_PREFIX_MAIL_PARSER, Status.COMPLETED, True)
|
||||
return True
|
||||
self.mail_parser_service.change_service(task.task, ConfigServices.SERVICE_PREFIX_MAIL_PARSER, Status.FAILED, True)
|
||||
return False
|
||||
|
||||
def process_task(self, active_task: RedisTaskObject):
|
||||
"""Process a task object using the MailParserService
|
||||
|
||||
Args:
|
||||
task: RedisTaskObject or task dictionary to process
|
||||
"""
|
||||
|
||||
try:
|
||||
for data in active_task.data.MailParser:
|
||||
attachment_data: MailParser = data
|
||||
if not attachment_data or not attachment_data.data:
|
||||
print(f"[PARSER_SERVICE] No data found for task {active_task.task}")
|
||||
continue
|
||||
binary_data: bytes = b64decode(attachment_data.data)
|
||||
excel_data = BytesIO(binary_data)
|
||||
df = self.try_dataframe_extract_with_xlsx(excel_data)
|
||||
if df is None:
|
||||
excel_data.seek(0)
|
||||
df = self.try_dataframe_extract_with_xls(excel_data)
|
||||
if df is None:
|
||||
excel_data.seek(0)
|
||||
df = self.try_dataframe_extract_else(excel_data)
|
||||
|
||||
if df is not None:
|
||||
self.parse_dataframes(df, active_task, attachment_data)
|
||||
print(f"[PARSER_SERVICE] Completed processing task {active_task.task}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[PARSER_SERVICE] Error processing task: {str(e)}")
|
||||
|
||||
|
||||
# Service entry point: poll Redis for mail-parser tasks forever, processing
# each IsBank Excel attachment, with escalating back-off on repeated errors.
if __name__ == "__main__":
    logger.info("Starting IsBank Parser Service")
    print(f"Starting IsBank Parser Service.")

    # Initialize service
    parser = IsbankMailParserService()

    # Configurable parameters
    normal_sleep_time = 10  # seconds between normal operations
    error_sleep_time = 30  # seconds to wait after an error before retrying
    max_consecutive_errors = 5  # maximum number of consecutive errors before longer pause
    extended_error_sleep = 120  # seconds to wait after hitting max consecutive errors
    consecutive_errors = 0

    # Main service loop
    while True:
        try:
            # Fetch all tasks
            all_tasks = parser.mail_parser_service.fetch_all_tasks()

            if all_tasks and len(all_tasks) > 0:
                logger.info(f"Found {len(all_tasks)} tasks to process")

                # Process each task
                for active_task in all_tasks:
                    # Skip tasks whose mail-parser stage is already completed.
                    if active_task.service == ConfigServices.SERVICE_PREFIX_MAIL_PARSER and active_task.completed:
                        # logger.info(f"Task {active_task.task} is already processed.")
                        continue

                    logger.info(f"Processing task {active_task.task}")
                    parser.process_task(active_task)
            else:
                logger.info("No tasks found to process")

            # Reset error counter on success
            if consecutive_errors > 0:
                logger.info(f"Service recovered after {consecutive_errors} consecutive errors")
                consecutive_errors = 0

            # Normal operation sleep
            sleep(normal_sleep_time)

        except MailParserService.REDIS_EXCEPTIONS as e:
            # Redis-specific errors
            consecutive_errors += 1
            logger.error(f"Redis error (attempt {consecutive_errors}): {str(e)}")

            # Use centralized reconnection handler from RedisHandler
            mail_parser_service, need_extended_sleep = MailParserService.handle_reconnection(
                consecutive_errors=consecutive_errors, max_consecutive_errors=max_consecutive_errors
            )

            if mail_parser_service:
                # Update parser's mail parser service with the new instance
                parser.mail_parser_service = mail_parser_service

            # Sleep based on error count
            if need_extended_sleep:
                sleep(extended_error_sleep)
            else:
                sleep(error_sleep_time)

        except Exception as e:
            # Any other unexpected errors
            consecutive_errors += 1
            logger.error(f"Unexpected error (attempt {consecutive_errors}): {str(e)}")

            # For any other error, try to reinitialize everything after some delay
            if consecutive_errors >= max_consecutive_errors:
                logger.warning(f"Hit {max_consecutive_errors} consecutive errors, reinitializing service")
                try:
                    # Reinitialize the service directly
                    parser = IsbankMailParserService()
                    logger.info("Successfully reinitialized parser service")
                    consecutive_errors = 0  # Reset counter after reinitialization
                except Exception as reinit_error:
                    logger.error(f"Service reinitialization failed: {str(reinit_error)}")

                # NOTE(review): extended sleep after a reinitialization attempt,
                # whether or not it succeeded.
                sleep(extended_error_sleep)
            else:
                # For fewer consecutive errors, just retry
                sleep(error_sleep_time)
||||
@@ -1,6 +0,0 @@
|
||||
def main():
    """Entry point: emit the parser project's hello banner."""
    greeting = "Hello from parser!"
    print(greeting)


if __name__ == "__main__":
    main()
|
||||
@@ -1,15 +0,0 @@
|
||||
[project]
|
||||
name = "parser"
|
||||
version = "0.1.0"
|
||||
description = "IsBank Excel mail-statement parser service"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
"arrow>=1.3.0",
|
||||
"pandas>=2.2.3",
|
||||
"redis>=6.4.0",
|
||||
"unidecode>=1.3.8",
|
||||
"xlrd>=2.0.1",
|
||||
"openpyxl>=3.1.2",
|
||||
"pydantic-settings>=2.8.1",
|
||||
]
|
||||
Reference in New Issue
Block a user