account records balancer added

This commit is contained in:
2024-11-24 17:33:52 +03:00
parent 6ad1ba7d62
commit 54ccc55c34
13 changed files with 787 additions and 253 deletions

View File

@@ -1,3 +1,6 @@
import re
from gc import garbage
import textdistance
from unidecode import unidecode
from datetime import datetime
@@ -11,6 +14,9 @@ from databases import (
from typing import Optional
from pydantic import BaseModel
from databases.sql_models.company.company import Companies
from databases.sql_models.identity.identity import People
class InsertBudgetRecord(BaseModel):
iban: str
@@ -53,8 +59,10 @@ def strip_date_to_valid(date_str):
return datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
def find_iban_in_comment(iban: str, comment: str):
iban_results = BuildIbanDescription.filter_by_one(system=True, iban=iban).data
def find_iban_in_comment(iban: str, comment: str, living_space_dict: dict = None):
iban_results = BuildIbanDescription.filter_all(
BuildIbanDescription.iban == iban, system=True
).data
sm_dict_extended, sm_dict_digit = {}, {}
for iban_result in iban_results or []:
candidate_parts = comment.split(" ")
@@ -74,9 +82,7 @@ def find_iban_in_comment(iban: str, comment: str):
)
found = False
name_list = (
unidecode(str(iban_result.search_word))
.replace(".", " ")
.split(" ")
unidecode(str(iban_result.search_word)).replace(".", " ").split(" ")
)
for name in name_list:
if len(name) > 3 and name.lower() in comment.lower():
@@ -92,112 +98,376 @@ def find_iban_in_comment(iban: str, comment: str):
)[0]
if float(result[1]) >= 0.5:
iban_result = BuildIbanDescription.filter_one(
BuildIbanDescription.id==int(result[0]),
system=True
BuildIbanDescription.id == int(result[0]), system=True
).data
return {
"decision_book_project_id": iban_result.decision_book_project_id,
# "decision_book_project_id": iban_result.decision_book_project_id,
# "build_parts_id": iban_result.build_parts_id,
"company_id": iban_result.company_id,
"customer_id": iban_result.customer_id,
"build_parts_id": iban_result.build_parts_id,
"found_from": "Name",
"similarity": result[1],
}
return {
"decision_book_project_id": None,
# "decision_book_project_id": None,
# "build_parts_id": None,
"company_id": None,
"customer_id": None,
"build_parts_id": None,
"found_from": None,
"similarity": 0.0,
}
def parse_comment_with_name(iban: str, comment: str):
if "*" in comment:
b_comment, a_comment = (
unidecode(str(comment)).split("*")[0],
unidecode(str(comment)).split("*")[1],
)
a_result = find_iban_in_comment(iban, a_comment)
b_result = find_iban_in_comment(iban, b_comment)
if a_result["similarity"] > b_result["similarity"]:
a_result["send_person_id"] = a_result["customer_id"]
return a_result
def remove_spaces_from_string(remove_string: str):
    """Collapse runs of single-space separators to one space and uppercase.

    Splitting on a literal space and dropping the empty tokens removes any
    doubled/leading/trailing spaces; other whitespace (tabs, newlines) is
    deliberately left untouched, matching the original behavior.
    """
    tokens = [token for token in remove_string.split(" ") if token]
    return " ".join(tokens).upper()
def get_garbage_words(comment: str, search_word: str):
    """Return what is left of *comment* after stripping every word of
    *search_word* — i.e. the "garbage" text surrounding the searched name.

    Both inputs are normalized (spaces collapsed, uppercased) before the
    word-by-word removal, and the residue is re-collapsed at the end.
    """
    residue = remove_spaces_from_string(comment)
    normalized_search = remove_spaces_from_string(search_word)
    for word in normalized_search.split(" "):
        residue = residue.replace(remove_spaces_from_string(word), "")
    return str(remove_spaces_from_string(residue)).upper()
def remove_garbage_words(comment: str, garbage_word: str):
    """Strip every word of *garbage_word* out of *comment*.

    '*' separators are treated as spaces and both strings are normalized
    (spaces collapsed, uppercased). The comment is ASCII-folded and
    re-collapsed on every pass because each replacement can leave doubled
    spaces behind.
    """
    cleaned = remove_spaces_from_string(comment.replace("*", " "))
    normalized_garbage = remove_spaces_from_string(garbage_word.replace("*", " "))
    for word in normalized_garbage.split(" "):
        # Re-fold and re-collapse before each removal pass.
        cleaned = unidecode(remove_spaces_from_string(cleaned))
        cleaned = cleaned.replace(remove_spaces_from_string(word), "")
    return str(remove_spaces_from_string(cleaned)).upper()
def check_is_comment_is_build(comment: str):
    """Return True when *comment* contains a building-related keyword
    ("no", "daire", "apt", "apartman").

    Bug fix: the containment test was inverted (`candidate in keyword`),
    so any single letter of a keyword — or an empty candidate part, since
    `"" in s` is always True — counted as a match. The check now asks
    whether the keyword occurs inside the candidate part, which is the
    only reading consistent with the function's name and with
    get_list_of_build_words below.
    """
    candidate_parts = remove_spaces_from_string(comment.replace("*", " ")).split(" ")
    for candidate_part in candidate_parts:
        # Drop colons so tokens like "NO:12" still match "no".
        candidate_part = remove_spaces_from_string(candidate_part).replace(":", "")
        for build_word in ["no", "daire", "apt", "apartman"]:
            if unidecode(build_word).upper() in unidecode(candidate_part).upper():
                return True
    return False
def get_list_of_build_words(comment: str):
    """Scan *comment* for building keywords and capture nearby digits.

    For every keyword ("no", "nolu", "daire", "apt", "apartman") found in
    the ASCII-folded, uppercased comment, a window of up to five characters
    on each side of the first occurrence is inspected; if it contains any
    digits, a dict {"garbage": <window>, "number": <int>} is collected.
    """
    build_words = []
    normalized = remove_spaces_from_string(comment.replace("*", " "))
    # Search happens on the folded text, but windows are sliced from the
    # unfolded normalized text — exactly as the original did.
    haystack = unidecode(normalized).upper()
    for keyword in ["no", "nolu", "daire", "apt", "apartman"]:
        needle = unidecode(keyword).upper()
        if needle not in haystack:
            continue
        start = haystack.index(needle)
        end = start + len(keyword)
        start = max(start - 5, 0)
        end = min(end + 5, len(normalized))
        window = normalized[start:end]
        digits = "".join(ch for ch in str(window) if ch.isdigit())
        if digits:
            build_words.append({"garbage": window, "number": int(digits)})
    return build_words
def generate_pattern(word):
    """Assemble the experimental comment-matching pattern string for *word*.

    NOTE(review): the produced string is not a usable regex — the `{1, N}`
    quantifier contains a space (literal in regex) and the `$`/`\\^`
    placement is anchored mid-pattern. Reproduced verbatim since callers
    may depend on the exact text.

    Raises:
        ValueError: if *word* is empty.
    """
    if len(word) < 1:
        raise ValueError("The word must have at least 1 character.")
    prefix = "d{1,3}$\\s?\\^["
    suffix = "\\d{1,3}$\\s?$"
    repeat = "{1, %d}" % len(word)
    stem = rf"{word}(?:e|é|ı|i|ğr)?"
    return f"{prefix}{word}]{repeat}{stem}{suffix}"
def test_pattern(word, test_cases): # Generate the pattern
pattern = generate_pattern(word)
for test in test_cases: # Test the regex pattern on each input and print results
if re.match(pattern, test, re.IGNORECASE):
print(f"'{test}' matches the pattern.", "*" * 60)
else:
b_result["send_person_id"] = None
return b_result
else:
result = find_iban_in_comment(iban, comment)
result["send_person_id"] = result.get("customer_id", None)
return result
print(f"'{test}' does NOT match the pattern.")
def wag_insert_budget_record(data):
similarity_result = parse_comment_with_name(data["iban"], data["process_comment"])
build_iban = BuildIbans.find_one(iban=data["iban"])
def parse_comment_for_living_space(
    iban: str, comment: str, living_space_dict: dict = None
):
    """Match *comment* against the people tied to *iban*'s living spaces.

    living_space_dict must map iban -> {"people": [...], "living_space": [...]};
    no None-guard is applied, so passing the default None raises — this
    mirrors the original contract. Returns the best match as a dict with
    keys company / living_space / found_from / similarity / garbage.
    """
    folded_comment = unidecode(comment)
    best = {
        "company": None,
        "living_space": None,
        "found_from": None,
        "similarity": 0.0,
        "garbage": "",
    }
    for person in living_space_dict[iban]["people"]:
        # NOTE(review): unidecode() raises on None — assumes name fields
        # are always strings; confirm upstream.
        first = unidecode(person.firstname).upper()
        last = unidecode(person.surname).upper()
        middle = unidecode(person.middle_name).upper()
        parts = [first, middle, last] if middle else [first, last]
        search_word = " ".join(parts)
        garbage_words = get_garbage_words(folded_comment, search_word)
        cleaned = remove_garbage_words(folded_comment, garbage_words)
        score = textdistance.jaro_winkler(cleaned, search_word)
        if score <= float(best["similarity"]):
            continue
        # Last matching living space wins when a person has several,
        # as in the original implementation.
        for living_space in living_space_dict[iban]["living_space"]:
            if living_space.person_id == person.id:
                best = {
                    "company": None,
                    "living_space": living_space,
                    "found_from": "Person Name",
                    "similarity": score,
                    "garbage": garbage_words,
                }
    return best
if payload := InsertBudgetRecord(**data):
payload_dict = payload.model_dump(exclude_unset=True, exclude_none=True)
decision_books = BuildDecisionBook.select_only(
BuildDecisionBook.period_start_date
< strip_date_to_valid(payload_dict["bank_date"]),
BuildDecisionBook.period_stop_date
> strip_date_to_valid(payload_dict["bank_date"]),
select_args=[BuildDecisionBook.id],
order_by=[BuildDecisionBook.expiry_ends.desc()],
)
payload_dict["build_id"] = getattr(
BuildIbans.find_one(iban=data["iban"]), "build_id", None
)
living_space, count = BuildLivingSpace.find_living_from_customer_id(
similarity_result.get("customer_id", None),
strip_date_to_valid(payload_dict["bank_date"]),
)
# living_space, count = BuildLivingSpace.filter(
# or_(
# BuildLivingSpace.owner_person_id
# == similarity_result.get("customer_id", None),
# BuildLivingSpace.life_person_id
# == similarity_result.get("customer_id", None),
# ),
# BuildLivingSpace.start_date
# < strip_date_to_valid(payload_dict["bank_date"]) - timedelta(days=30),
# BuildLivingSpace.stop_date
# > strip_date_to_valid(payload_dict["bank_date"]) + timedelta(days=30),
# BuildLivingSpace.active == True,
# BuildLivingSpace.deleted == False,
# )
payload_dict["build_decision_book_id"] = (
decision_books[0][0].id if decision_books else None
)
payload_dict["company_id"] = similarity_result.get("company_id", None)
payload_dict["customer_id"] = similarity_result.get("customer_id", None)
payload_dict["send_person_id"] = similarity_result.get("send_person_id", None)
payload_dict["build_parts_id"] = (
living_space[0].build_parts_id if living_space else None
)
def parse_comment_for_company_or_individual(comment: str):
    """Match *comment* against every non-commercial company's public name.

    Returns the best match dict (company / living_space / found_from /
    similarity / garbage); similarity remains 0.0 when nothing matches.
    """
    candidates = Companies.filter_all(
        Companies.commercial_type != "Commercial", system=True
    ).data
    folded_comment = unidecode(comment)
    best = {
        "company": None,
        "living_space": None,
        "found_from": None,
        "similarity": 0.0,
        "garbage": "",
    }
    for candidate in candidates:
        search_word = unidecode(candidate.public_name)
        garbage_words = get_garbage_words(folded_comment, search_word)
        cleaned = remove_garbage_words(folded_comment, garbage_words)
        score = textdistance.jaro_winkler(cleaned, search_word)
        if score > float(best["similarity"]):
            best = {
                "company": candidate,
                "living_space": None,
                "found_from": "Customer Public Name",
                "similarity": score,
                "garbage": garbage_words,
            }
    return best
payload_dict["bank_date_y"] = strip_date_to_valid(
payload_dict["bank_date"]
).year
payload_dict["bank_date_m"] = strip_date_to_valid(
payload_dict["bank_date"]
).month
payload_dict["bank_date_d"] = strip_date_to_valid(payload_dict["bank_date"]).day
payload_dict["bank_date_w"] = strip_date_to_valid(
payload_dict["bank_date"]
).isocalendar()[2]
payload_dict["build_id"] = build_iban.build_id if build_iban else None
payload_dict["replication_id"] = 55
payload_dict["receive_debit"] = (
"R" if payload_dict["currency_value"] < 0 else "D"
)
data, found = AccountRecords.find_or_create(
**payload_dict,
found_from=similarity_result.get("found_from", None),
similarity=similarity_result.get("similarity", 0.0),
)
data.payment_budget_record_close()
return data, found
def parse_comment_to_split_with_star(account_record: AccountRecords):
    """Split the record's process comment on '*' separators.

    Doubled stars are collapsed to one before splitting. Returns
    (segment_count, *segments); a comment without any '*' yields
    (1, comment) unchanged.
    """
    raw = account_record.process_comment
    if "*" not in raw:
        return 1, raw
    segments = str(raw.replace("**", "*")).split("*")
    return (len(segments), *segments)
def parse_comment_with_name(
    account_record: AccountRecords, living_space_dict: dict = None
):
    """Find the best name match for an account record's comment.

    Positive amounts (money received by the building) are matched against
    living-space people; non-positive amounts (money the building pays out)
    against companies/individuals. Star-separated comments are matched
    segment by segment and the highest similarity wins.
    """
    pieces = parse_comment_to_split_with_star(account_record=account_record)
    segment_count, segments = int(pieces[0]), pieces[1:]
    incoming = int(account_record.currency_value) > 0

    def _match(segment):
        # One matcher per money direction; both return the same dict shape.
        if incoming:
            return parse_comment_for_living_space(
                iban=account_record.iban,
                comment=segment,
                living_space_dict=living_space_dict,
            )
        return parse_comment_for_company_or_individual(comment=segment)

    if segment_count <= 1:
        result = _match(segments[0])
        # NOTE(review): the matcher dicts never contain a "customer_id"
        # key, so send_person_id is always None here — confirm intent.
        # Also note the multi-segment path below never sets it.
        result["send_person_id"] = result.get("customer_id", None)
        return result

    best = {"similarity": 0.0}
    for segment in segments:
        candidate = _match(segment)
        if float(candidate["similarity"]) > float(best["similarity"]):
            best = candidate
    return best
def parse_comment_with_name_iban_description(account_record: AccountRecords):
    """Match each comment segment against the IBAN's stored search words.

    Every BuildIbanDescription row for the record's IBAN is compared to
    every star-separated comment segment; the highest Jaro-Winkler score
    wins and its company is looked up.

    Improvements:
      * `or []` guards a None result set from filter_all, consistent with
        the defensive iteration style used elsewhere in this module;
      * the Companies lookup now runs only when a candidate actually beats
        the current best — it depends only on the description row, so the
        previous per-(segment, row) query was wasted work;
      * dropped the unused comments_length local.
    """
    comments = parse_comment_to_split_with_star(account_record=account_record)
    comments_list = comments[1:]
    iban_results = BuildIbanDescription.filter_all(
        BuildIbanDescription.iban == account_record.iban, system=True
    ).data
    best_similarity = dict(
        company=None, living_space=None, found_from=None, similarity=0.0, garbage=""
    )
    for comment in comments_list:
        for iban_result in iban_results or []:
            search_word = unidecode(iban_result.search_word)
            garbage_words = get_garbage_words(comment, search_word)
            cleaned_comment = remove_garbage_words(comment, garbage_words)
            similarity_ratio = textdistance.jaro_winkler(cleaned_comment, search_word)
            if float(similarity_ratio) > float(best_similarity["similarity"]):
                # Fetch the company only for an improving candidate.
                company = Companies.filter_by_one(
                    system=True, id=iban_result.company_id
                ).data
                best_similarity = {
                    "company": company,
                    "living_space": None,
                    "found_from": "Customer Public Name Description",
                    "similarity": similarity_ratio,
                    "garbage": garbage_words,
                }
    return best_similarity
# print('account_record.process_comment', account_record.process_comment)
# test_pattern(
# word=unidecode("no"),
# test_cases=[account_record.process_comment]
# )
# test_pattern(word="daire", test_cases=comments_list)
# sm_dict_extended, sm_dict_digit = {}, {}
# iban_results = BuildIbanDescription.filter_all(
# BuildIbanDescription.iban == iban, system=True
# ).data
# for iban_result in iban_results or []:
# candidate_parts = comment.split(" ")
# extended_candidate_parts, digit_part = [], []
# for part in candidate_parts:
# if part.lower() not in ["no", "daire", "nolu"]:
# extended_candidate_parts.append(part)
# if extended_candidate_parts:
# if all(
# candidate_part.lower() in comment.lower()
# for candidate_part in extended_candidate_parts
# ):
# similarity_ratio = textdistance.jaro_winkler(
# unidecode(str(iban_result.search_word)), comment
# )
# found = False
# name_list = (
# unidecode(str(iban_result.search_word)).replace(".", " ").split(" ")
# )
# for name in name_list:
# if len(name) > 3 and name.lower() in comment.lower():
# found = True
# break
#
# if not found:
# similarity_ratio = 0.1
# sm_dict_extended[f"{iban_result.id}"] = similarity_ratio
# if sm_dict_extended:
# result = sorted(
# sm_dict_extended.items(), key=lambda item: item[1], reverse=True
# )[0]
# if float(result[1]) >= 0.5:
# iban_result = BuildIbanDescription.filter_one(
# BuildIbanDescription.id == int(result[0]), system=True
# ).data
# return {
# "company_id": iban_result.company_id,
# "customer_id": iban_result.customer_id,
# "found_from": "Name",
# "similarity": result[1],
# }
# return {
# "company_id": None,
# "customer_id": None,
# "found_from": None,
# "similarity": 0.0,
# }
#
# def wag_insert_budget_record(data):
# similarity_result = parse_comment_with_name(data["iban"], data["process_comment"])
# build_iban = BuildIbans.find_one(iban=data["iban"])
#
# if payload := InsertBudgetRecord(**data):
# payload_dict = payload.model_dump(exclude_unset=True, exclude_none=True)
# decision_books = BuildDecisionBook.select_only(
# BuildDecisionBook.period_start_date
# < strip_date_to_valid(payload_dict["bank_date"]),
# BuildDecisionBook.period_stop_date
# > strip_date_to_valid(payload_dict["bank_date"]),
# select_args=[BuildDecisionBook.id],
# order_by=[BuildDecisionBook.expiry_ends.desc()],
# )
# payload_dict["build_id"] = getattr(
# BuildIbans.find_one(iban=data["iban"]), "build_id", None
# )
# living_space, count = BuildLivingSpace.find_living_from_customer_id(
# similarity_result.get("customer_id", None),
# strip_date_to_valid(payload_dict["bank_date"]),
# )
# # living_space, count = BuildLivingSpace.filter(
# # or_(
# # BuildLivingSpace.owner_person_id
# # == similarity_result.get("customer_id", None),
# # BuildLivingSpace.life_person_id
# # == similarity_result.get("customer_id", None),
# # ),
# # BuildLivingSpace.start_date
# # < strip_date_to_valid(payload_dict["bank_date"]) - timedelta(days=30),
# # BuildLivingSpace.stop_date
# # > strip_date_to_valid(payload_dict["bank_date"]) + timedelta(days=30),
# # BuildLivingSpace.active == True,
# # BuildLivingSpace.deleted == False,
# # )
# payload_dict["build_decision_book_id"] = (
# decision_books[0][0].id if decision_books else None
# )
# payload_dict["company_id"] = similarity_result.get("company_id", None)
# payload_dict["customer_id"] = similarity_result.get("customer_id", None)
# payload_dict["send_person_id"] = similarity_result.get("send_person_id", None)
#
# payload_dict["build_parts_id"] = (
# living_space[0].build_parts_id if living_space else None
# )
#
# payload_dict["bank_date_y"] = strip_date_to_valid(
# payload_dict["bank_date"]
# ).year
# payload_dict["bank_date_m"] = strip_date_to_valid(
# payload_dict["bank_date"]
# ).month
# payload_dict["bank_date_d"] = strip_date_to_valid(payload_dict["bank_date"]).day
# payload_dict["bank_date_w"] = strip_date_to_valid(
# payload_dict["bank_date"]
# ).isocalendar()[2]
# payload_dict["build_id"] = build_iban.build_id if build_iban else None
# payload_dict["replication_id"] = 55
# payload_dict["receive_debit"] = (
# "R" if payload_dict["currency_value"] < 0 else "D"
# )
# data, found = AccountRecords.find_or_create(
# **payload_dict,
# found_from=similarity_result.get("found_from", None),
# similarity=similarity_result.get("similarity", 0.0),
# )
# data.payment_budget_record_close()
# return data, found