wag-services-and-backend-la.../BankServices/SenderService/app.py

104 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import arrow
import os
from Services.MongoService.provider import MongoProvider
from Configs.mongo import MongoConfig
from jinja2 import Environment, FileSystemLoader
from Services.EmailService.provider import send_email
mongo_prefix = "CollectedData"
delimiter = "|"
# todo Check if postgres is_email_send === False then send email
# todo Trigger @mongo email_send = False then send email
send_to = "karatay@mehmetkaratay.com.tr"
def check_any_written_stage_in_mongo_database(mongo_provider) -> list:
return mongo_provider.find_many(filter_query={"stage": "written", "send": None})
def render_email_template(
headers: list, rows: list, balance_error: bool, bank_balance: float
):
template_dir = os.path.join(os.path.dirname(__file__), "templates")
env = Environment(
loader=FileSystemLoader(template_dir)
) # Load templates from the directory
template = env.get_template(
"template_accounts.html"
) # Load the specific template file
# Render template with variables
return template.render(
headers=headers,
rows=rows,
bank_balance=f"{bank_balance:.4f}",
balance_error=balance_error,
today=str(arrow.now().date()),
)
def send_email_to_given_address(to: str, html_template: str) -> bool:
today = arrow.now()
subject = f"{str(today.date())} Gunes Apt. Cari Durum Bilgilendirme Raporu"
try:
send_email(subject=subject, receivers=[to], html=html_template)
print(f"Email is sent to : {send_to}. BB")
return True
except Exception as e:
print(f"Error: {e}")
print("Email is not sent")
return False
def send_email_and_update_mongo_database(mongo_provider, email_data) -> bool:
parsed_list = email_data.get("parsed")
if not parsed_list:
return False
list_of_rows = list()
for parsed in parsed_list:
bank_date = arrow.get(str(parsed.get("bank_date")))
process_comment = parsed.get("process_comment")
currency_value = f"{parsed.get('currency_value'):.4f}"
list_of_rows.append(
[bank_date, process_comment, currency_value]
)
html_template = render_email_template(
headers=["Ulaştığı Tarih", "Banka Transaksiyonu Ek Bilgi", "Aktarım Değeri"],
rows=list_of_rows,
balance_error=False,
bank_balance=0,
)
if send_email_to_given_address(to=send_to, html_template=html_template):
mongo_provider.update_one(
filter_query={"filename": email_data.get("filename")},
update_data={"$set": {"send": True}},
)
return True
return False
if __name__ == "__main__":
while True:
print("Sender service is running...")
with MongoProvider.mongo_client() as mongo_client:
provider = MongoProvider(
client=mongo_client,
database=MongoConfig.DATABASE_NAME,
storage_reason=[mongo_prefix, str(arrow.now().date())],
)
results = check_any_written_stage_in_mongo_database(mongo_provider=provider)
for result in results:
send_email_and_update_mongo_database(
mongo_provider=provider,
email_data=result,
)
time.sleep(5)