wag-services-and-backend-la.../BankServices/SenderService/app.py

92 lines
3.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import arrow
import os
from Services.MongoService.provider import MongoProvider
from Configs.mongo import MongoConfig
from jinja2 import Environment, FileSystemLoader
from Services.EmailService.provider import send_email
# First element of the ``storage_reason`` list handed to MongoProvider in the
# main loop (the second element is the current date).
mongo_prefix = "CollectedData"
# Recipient address used for every report e-mail sent by this service.
send_to = "karatay@mehmetkaratay.com.tr"
# NOTE(review): not referenced anywhere in the visible part of this file;
# confirm whether it is still needed.
delimiter = "|"
def check_any_written_stage_in_mongo_database(mongo_provider) -> list:
    """Look up the records that are written but have not been mailed yet.

    Args:
        mongo_provider: Object exposing ``find_many(filter_query=...)``.

    Returns:
        Whatever ``find_many`` yields for the filter
        ``{"stage": "written", "send": None}``.
    """
    # "send" is switched to True once the report for a record has been mailed,
    # so a None value selects the records that are still pending.
    pending_filter = {"stage": "written", "send": None}
    return mongo_provider.find_many(filter_query=pending_filter)
def render_email_template(
    headers: list, rows: list, balance_error: bool, bank_balance: float
):
    """Render ``templates/template_accounts.html`` to an HTML string.

    Args:
        headers: Column titles passed to the template as ``headers``.
        rows: Table rows passed to the template as ``rows``.
        balance_error: Flag passed through to the template unchanged.
        bank_balance: Numeric balance; formatted with four decimal places
            before it reaches the template.

    Returns:
        The rendered template. The template additionally receives ``today``,
        the current date as a string.
    """
    # Templates live in a "templates" directory next to this file.
    template_dir = os.path.join(os.path.dirname(__file__), "templates")
    # Autoescaping is switched on because the row values are read from stored
    # records rather than authored here; without it any "<" or "&" in a value
    # would be interpreted as markup in the outgoing HTML e-mail.
    env = Environment(loader=FileSystemLoader(template_dir), autoescape=True)
    template = env.get_template("template_accounts.html")
    return template.render(
        headers=headers,
        rows=rows,
        bank_balance=f"{bank_balance:.4f}",
        balance_error=balance_error,
        today=str(arrow.now().date()),
    )
def send_email_to_given_address(to: str, html_template: str) -> bool:
    """Send the rendered report to a single recipient.

    Args:
        to: Recipient e-mail address.
        html_template: Rendered HTML used as the message body.

    Returns:
        True when ``send_email`` completes without raising, False otherwise.
        Failures are printed, not propagated.
    """
    subject = f"{str(arrow.now().date())} Gunes Apt. Cari DurumKaydedilen Raporu"
    try:
        send_email(subject=subject, receivers=[to], html=html_template)
    except Exception as e:
        # Best-effort delivery: report the problem and let the caller decide
        # what to do with the False result.
        print(f"Error: {e}")
        print("Email is not sent")
        return False
    # Log the address that was actually used, not the module-level default.
    print(f"Email is sent to : {to}. BB")
    return True
def send_email_and_update_mongo_database(mongo_provider, email_data) -> bool:
    """Mail the parsed entries of one record and flag the record as sent.

    Args:
        mongo_provider: Object exposing ``update_one(filter_query=...,
            update_data=...)``.
        email_data: Mapping read with ``.get``; its ``"parsed"`` entry is the
            list of rows to report and ``"filename"`` identifies the record
            to update.

    Returns:
        True when the e-mail went out and the update was issued; False when
        there is nothing to report or sending failed.
    """
    entries = email_data.get("parsed")
    if not entries:
        # Nothing parsed for this record, so there is nothing to mail.
        return False

    # One table row per parsed entry: date, comment, amount with 4 decimals.
    table_rows = [
        [
            arrow.get(str(entry.get("bank_date"))),
            entry.get("process_comment"),
            f"{entry.get('currency_value'):.4f}",
        ]
        for entry in entries
    ]

    body = render_email_template(
        headers=["Ulaştığı Tarih", "Banka Transaksiyonu Ek Bilgi", "Aktarım Değeri"],
        rows=table_rows,
        balance_error=False,
        bank_balance=0,
    )

    delivered = send_email_to_given_address(to=send_to, html_template=body)
    if not delivered:
        return False

    # Only mark the record once the e-mail has actually been sent, so a failed
    # delivery is picked up again on a later poll.
    mongo_provider.update_one(
        filter_query={"filename": email_data.get("filename")},
        update_data={"$set": {"send": True}},
    )
    return True
if __name__ == "__main__":
    # Poll forever: each cycle obtains a client from MongoProvider, mails every
    # pending record, then waits a minute before the next cycle.
    while True:
        print("Sender service is running...")
        with MongoProvider.mongo_client() as mongo_client:
            provider = MongoProvider(
                client=mongo_client,
                database=MongoConfig.DATABASE_NAME,
                # Rebuilt every cycle so the date component follows the
                # current day.
                storage_reason=[mongo_prefix, str(arrow.now().date())],
            )
            results = check_any_written_stage_in_mongo_database(mongo_provider=provider)
            for result in results:
                try:
                    send_email_and_update_mongo_database(
                        mongo_provider=provider, email_data=result,
                    )
                except Exception as e:
                    # One malformed record must not stop the remaining records
                    # or terminate the service; it stays unsent and is simply
                    # reported on each poll.
                    print(f"Error: {e}")
        time.sleep(60)