"""Parser service: reads stored Excel payloads from MongoDB and writes back parsed rows."""

import datetime
import io
import time

import arrow
from pandas import DataFrame, read_excel
from unidecode import unidecode

from Configs.mongo import MongoConfig
from Services.MongoService.provider import MongoProvider

mongo_prefix = "CollectedData"
delimiter = "|"


def collect_excel_files_from_mongo_database(mongo_provider) -> list:
    """Return the documents whose ``stage`` field equals ``"read"``.

    Args:
        mongo_provider: Provider object exposing ``find_many(filter_query=...)``.

    Returns:
        Whatever ``mongo_provider.find_many`` yields for the ``stage == "read"``
        filter.
    """
    return mongo_provider.find_many(filter_query={"stage": "read"})


def update_parsed_data_to_mongo_database(
    mongo_provider, collected_data_dict: dict, filename: str
) -> None:
    """Store the parse result for *filename* and advance its ``stage``.

    When ``collected_data_dict`` holds a truthy entry for *filename*, the
    document is updated with ``parsed=<entry>`` and ``stage="parsed"``.
    Otherwise it is updated with ``parsed=None`` and ``stage="not found"``.

    Args:
        mongo_provider: Provider object exposing
            ``update_one(filter_query=..., update_data=...)``.
        collected_data_dict: Mapping of filename to parsed payload; may be
            empty or ``None``.
        filename: Value of the ``filename`` field used to select the document.
    """
    # ``.get`` instead of indexing: a dict that lacks this filename is treated
    # as "nothing parsed" rather than raising KeyError.
    payload = collected_data_dict.get(filename) if collected_data_dict else None
    if payload:
        print('filename, payload', filename, payload)
        mongo_provider.update_one(
            filter_query={"filename": filename},
            update_data={"$set": {"parsed": payload, "stage": "parsed"}},
        )
        return
    mongo_provider.update_one(
        filter_query={"filename": filename},
        update_data={"$set": {"parsed": None, "stage": "not found"}},
    )


def _to_float(value) -> float:
    """Convert a cell to float after removing commas; falsy cells yield ``0``."""
    # Commas are stripped before conversion (presumably thousands separators;
    # confirm against the bank export format).
    return float(str(value).replace(",", "")) if value else 0


def parse_excel_file(excel_frame: DataFrame, excel_name: str) -> dict:
    """Extract the IBAN and a transaction record from a bank statement sheet.

    Rows are read positionally via ``DataFrame.itertuples()``, so ``row[0]`` is
    the index and ``row[1]`` is the first column. A row whose third column
    contains ``"IBAN"`` (case-insensitive) supplies the IBAN from its fifth
    column, with spaces removed. A row is treated as a transaction when its
    first two columns are not ``"nan"`` and the first column contains at least
    two ``/`` characters.

    Args:
        excel_frame: Sheet contents; transaction rows must have at least 15
            columns because ``row[15]`` is read.
        excel_name: Key under which the record is stored in the result.

    Returns:
        ``{excel_name: record}`` when at least one transaction row was found,
        otherwise an empty dict.

    Raises:
        ValueError: If a transaction row's first column does not match
            ``%d/%m/%Y-%H:%M:%S``, or a numeric cell cannot be converted.
        IndexError: If a transaction row has fewer than 15 columns.
    """
    iban = ""
    data_dict = {}
    for row in excel_frame.itertuples():
        if "IBAN" in str(row[3]).upper():
            iban = str(row[5]).replace(" ", "")

        date_cell = str(row[1])
        if date_cell == "nan" or str(row[2]) == "nan":
            continue
        if len(date_cell.split("/")) <= 2:
            continue

        bank_date = arrow.get(
            datetime.datetime.strptime(date_cell, "%d/%m/%Y-%H:%M:%S")
        )
        # NOTE(review): every matching row overwrites the same key, so only the
        # last transaction row in the sheet is returned. Confirm this is
        # intended before changing the return shape.
        data_dict[excel_name] = dict(
            iban=str(iban),
            bank_date=str(bank_date),
            channel_branch=unidecode(str(row[3])),
            currency_value=_to_float(row[4]),
            balance=_to_float(row[5]),
            additional_balance=_to_float(row[6]),
            process_name=str(row[7]),
            process_type=unidecode(str(row[8])),
            process_comment=unidecode(str(row[9])),
            bank_reference_code=str(row[15]),
        )
    return data_dict


def app():
    """Run one pass: parse every document in stage ``"read"`` and store the result."""
    print("Hello from parserservice!")
    with MongoProvider.mongo_client() as mongo_client:
        mongo_provider = MongoProvider(
            client=mongo_client,
            database=MongoConfig.DATABASE_NAME,
            storage_reason=[mongo_prefix, str(arrow.now().date())],
        )
        results = collect_excel_files_from_mongo_database(mongo_provider)
        if not results:
            print("No results found.")
            return
        for result in results:
            filename, payload = result.get("filename"), result.get("payload")
            # Wrap the stored bytes in an in-memory file-like object;
            # read_excel already returns a DataFrame for the default sheet.
            excel_frame = read_excel(io.BytesIO(payload))
            # Extract IBAN and root info from the xl file
            collected_data_dict = parse_excel_file(excel_frame, filename)
            update_parsed_data_to_mongo_database(
                mongo_provider=mongo_provider,
                collected_data_dict=collected_data_dict,
                filename=filename,
            )


if __name__ == "__main__":
    # Poll for newly stored files once per minute.
    while True:
        app()
        time.sleep(60)