production-evyos-systems-an.../ServicesBank/Finder/BuildExtractor/runner.py

67 lines
2.8 KiB
Python

import arrow
from Schemas import AccountRecords, BuildIbans, BuildDecisionBook
from Controllers.Postgres.engine import get_session_factory
from sqlalchemy import cast, Date
def account_records_find_decision_book(session):
AccountRecords.set_session(session)
BuildIbans.set_session(session)
BuildDecisionBook.set_session(session)
created_ibans, iban_build_dict = [], {}
filter_account_records = AccountRecords.build_id != None, AccountRecords.build_decision_book_id == None
account_records_list: list[AccountRecords] = AccountRecords.query.filter(*filter_account_records).order_by(AccountRecords.bank_date.desc()).all()
for account_record in account_records_list:
if found_iban := BuildIbans.query.filter(BuildIbans.iban == account_record.iban).first():
if found_decision_book := BuildDecisionBook.query.filter(
BuildDecisionBook.build_id == found_iban.build_id,
cast(BuildDecisionBook.expiry_starts, Date) <= cast(account_record.bank_date, Date),
cast(BuildDecisionBook.expiry_ends, Date) >= cast(account_record.bank_date, Date),
).first():
account_record.build_decision_book_id = found_decision_book.id
account_record.build_decision_book_uu_id = str(found_decision_book.uu_id)
account_record.save()
def account_find_build_from_iban(session):
AccountRecords.set_session(session)
BuildIbans.set_session(session)
account_records_ibans = AccountRecords.query.filter(AccountRecords.build_id == None, AccountRecords.approved_record == False).distinct(AccountRecords.iban).all()
for account_records_iban in account_records_ibans:
found_iban: BuildIbans = BuildIbans.query.filter(BuildIbans.iban == account_records_iban.iban).first()
if not found_iban:
create_build_ibans = BuildIbans.create(iban=account_records_iban.iban, start_date=str(arrow.now().shift(days=-1)))
create_build_ibans.save()
else:
update_dict = {"build_id": found_iban.build_id, "build_uu_id": str(found_iban.build_uu_id)}
session.query(AccountRecords).filter(AccountRecords.iban == account_records_iban.iban).update(update_dict, synchronize_session=False)
session.commit()
if __name__ == "__main__":
print("Build Extractor Service is running...")
session_factory = get_session_factory()
session = session_factory()
try:
account_find_build_from_iban(session=session)
except Exception as e:
print(f"Error occured on find build : {e}")
session.rollback()
try:
account_records_find_decision_book(session=session)
except Exception as e:
print(f"Error occured on find decision book : {e}")
session.rollback()
session.close()
session_factory.remove()
print("Build Extractor Service is finished...")