# production-evyos-systems-an.../trash/XBuildLivingSpace/parser.py
#
# 320 lines
# 16 KiB
# Python

import re
import textdistance
from unidecode import unidecode
from gc import garbage
from Schemas import AccountRecords, People, Build, Companies, BuildIbanDescription
from regex_func import category_finder
from validations import Similarity
def parse_comment_to_split_with_star(account_record):
    """Split a record's ``process_comment`` on ``*`` separators.

    Args:
        account_record: Either a dict carrying a ``process_comment`` key or
            an object exposing a ``process_comment`` attribute.

    Returns:
        A flat tuple ``(count, part_1, ..., part_n)``. When the comment has
        no ``*`` the result is ``(1, comment)``. A missing or ``None``
        comment, or any error while reading it, yields ``(1, "")``.
    """
    try:
        if isinstance(account_record, dict):
            raw_comment = account_record.get('process_comment', '')
        else:
            raw_comment = account_record.process_comment
        # A missing comment must not become the literal text "None".
        process_comment = "" if raw_comment is None else str(raw_comment)
    except Exception:
        # Deliberate best effort: reading the attribute (or str()) may fail
        # for reasons outside this function's control, so fall back to the
        # safe default instead of propagating.
        return 1, ""

    if "*" not in process_comment:
        return 1, process_comment

    # A doubled separator is treated as a single one before splitting.
    process_comments = process_comment.replace("**", "*").split("*")
    return len(process_comments), *process_comments
def remove_garbage_words(comment: str, garbage_word: str):
    """Strip every token of ``garbage_word`` out of ``comment``.

    ``*`` characters are treated as spaces in both inputs. Each
    space-separated token of ``garbage_word`` is removed from the comment by
    plain substring replacement (not whole-word matching).

    Args:
        comment: Text to clean.
        garbage_word: Space/star separated tokens to remove; a falsy value
            means nothing is removed.

    Returns:
        The cleaned comment with space runs collapsed, upper-cased.
    """
    result = remove_spaces_from_string(comment.replace("*", " "))
    if not garbage_word:
        return str(remove_spaces_from_string(result)).upper()

    normalized_garbage = remove_spaces_from_string(garbage_word.replace("*", " "))
    for token in normalized_garbage.split(" "):
        # Re-normalise before each removal so earlier deletions cannot leave
        # double spaces or non-ASCII characters behind.
        result = unidecode(remove_spaces_from_string(result))
        result = result.replace(remove_spaces_from_string(token), "")
    return str(remove_spaces_from_string(result)).upper()
def remove_spaces_from_string(remove_string: str):
    """Collapse runs of spaces into one and upper-case the text.

    Only the space character (``" "``) is treated as a separator; tabs and
    newlines are left untouched. Leading and trailing spaces are dropped.

    Args:
        remove_string: Text to normalise.

    Returns:
        The upper-cased text with its non-empty chunks joined by single
        spaces.
    """
    non_empty_chunks = [chunk for chunk in remove_string.split(" ") if chunk]
    return " ".join(non_empty_chunks).upper()
def get_garbage_words(comment: str, search_word: str):
    """Return what is left of ``comment`` once ``search_word`` is removed.

    Both inputs are space-normalised, upper-cased and transliterated to
    ASCII. Each space-separated token of ``search_word`` is then deleted from
    the comment by plain substring replacement.

    Args:
        comment: Text to inspect.
        search_word: The words that are expected to be in the comment.

    Returns:
        The upper-cased leftover text, or ``None`` when nothing but spaces
        remains.
    """
    leftover = unidecode(remove_spaces_from_string(comment))
    normalized_search = unidecode(remove_spaces_from_string(search_word))
    for token in normalized_search.split(" "):
        needle = remove_spaces_from_string(unidecode(token))
        leftover = leftover.replace(needle, "")

    collapsed = remove_spaces_from_string(leftover)
    if not collapsed:
        return None
    return str(unidecode(collapsed)).upper()
def parse_comment_with_name_iban_description(account_record):
    """Match a record's comment against the IBAN's stored search words.

    The comment is split on ``*`` (a doubled ``**`` counts as one separator)
    and every part is compared with the ``search_word`` of each
    ``BuildIbanDescription`` row registered for the record's IBAN. The
    candidate with the highest Jaro-Winkler ratio wins.

    Args:
        account_record: Either a dict with ``iban`` / ``process_comment``
            keys or an object exposing those attributes.

    Returns:
        The best ``Similarity`` found, carrying the matched company and the
        source label ``"Customer Public Name Description"``. A zero
        ``Similarity`` is returned when nothing scores above 0 or when the
        record's attributes cannot be read.
    """
    # Copy the needed values out of the record first so the rest of the
    # function does not depend on the record object.
    if isinstance(account_record, dict):
        iban = account_record.get('iban', '')
        process_comment = account_record.get('process_comment', '')
    else:
        try:
            iban = account_record.iban
            process_comment = account_record.process_comment
        except Exception:
            # Deliberate best effort: an unreadable record is "no match".
            return Similarity(similarity=0.0, garbage="", cleaned="")

    # A missing comment is treated as empty text instead of crashing on the
    # ``in`` test below.
    process_comment = "" if process_comment is None else str(process_comment)
    if "*" in process_comment:
        comments_list = process_comment.replace("**", "*").split("*")
    else:
        comments_list = [process_comment]

    best_similarity = Similarity(similarity=0.0, garbage="", cleaned="")
    with BuildIbanDescription.new_session() as session:
        BuildIbanDescription.set_session(session)
        Companies.set_session(session)
        iban_results = BuildIbanDescription.query.filter(BuildIbanDescription.iban == iban).all()
        for comment in comments_list:
            for iban_result in iban_results:
                search_word = unidecode(iban_result.search_word)
                garbage_words = get_garbage_words(comment, search_word)
                cleaned_comment = remove_garbage_words(comment, garbage_words)
                # NOTE(review): cleaned_comment is upper-cased by the helper
                # while search_word keeps its stored case - confirm the stored
                # search words are upper-case, otherwise ratios are lowered.
                similarity_ratio = textdistance.jaro_winkler(cleaned_comment, search_word)
                if float(similarity_ratio) > float(best_similarity.similarity):
                    # Look the company up only for a candidate that is
                    # actually kept, not for every row.
                    company = Companies.query.filter_by(id=iban_result.company_id).first()
                    best_similarity = Similarity(similarity=similarity_ratio, garbage=garbage_words, cleaned=cleaned_comment)
                    best_similarity.set_company(company)
                    best_similarity.set_found_from("Customer Public Name Description")
    return best_similarity
def parse_comment_for_build_parts(comment: str, max_build_part: int = 200, parse: str = "DAIRE"):
    """Extract candidate build-part numbers from a comment.

    ``category_finder`` is run on the comment and the entries stored under
    the ``parse`` key are scanned; the decimal digits of each entry are
    joined into one number.

    Args:
        comment: Free text to scan.
        max_build_part: Largest number accepted as a build part.
        parse: Key of the ``category_finder`` result to read.

    Returns:
        A list of the extracted numbers that do not exceed
        ``max_build_part``, or ``None`` when there are none.
    """
    part_numbers = []
    for result in category_finder(comment)[parse] or []:
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as superscript digits, which int() rejects with ValueError.
        digits = "".join(ch for ch in str(result) if ch.isdecimal())
        if not digits:
            continue
        number = int(digits)
        if number <= int(max_build_part):
            part_numbers.append(number)
    return part_numbers or None
def parse_comment_with_name(account_record, living_space_dict: dict = None):
    """Find the best name match for a record's comment.

    A positive ``currency_value`` means the build received money, so the
    comment is matched against living-space people and then passed through
    the build-part check. Otherwise the build paid for a service and the
    comment is matched against companies / individuals.

    The comment is split on ``*`` (a doubled ``**`` counts as one
    separator); with several parts, the part with the highest similarity
    wins.

    Args:
        account_record: Either a dict with ``iban`` / ``process_comment`` /
            ``currency_value`` keys or an object exposing those attributes.
        living_space_dict: Per-IBAN lookup data handed on to the
            living-space helpers.

    Returns:
        The best ``Similarity`` found; a zero ``Similarity`` when the record's
        attributes cannot be read.
    """
    # Copy the needed values out of the record first so the rest of the
    # function does not depend on the record object.
    if isinstance(account_record, dict):
        iban = account_record.get('iban', '')
        process_comment = account_record.get('process_comment', '')
        try:
            currency_value = int(account_record.get('currency_value', 0))
        except (ValueError, TypeError):
            currency_value = 0
    else:
        try:
            iban = account_record.iban
            process_comment = account_record.process_comment
            currency_value = int(account_record.currency_value)
        except Exception:
            # Deliberate best effort: an unreadable record is "no match".
            return Similarity(similarity=0.0, garbage="", cleaned="")

    # A missing comment is treated as empty text instead of crashing on the
    # ``in`` test below.
    process_comment = "" if process_comment is None else str(process_comment)
    if "*" in process_comment:
        comments_list = process_comment.replace("**", "*").split("*")
    else:
        comments_list = [process_comment]
    single_comment = len(comments_list) == 1

    best_similarity = Similarity(similarity=0.0, garbage="", cleaned="")

    if currency_value > 0:  # Build receives money from living space people
        if single_comment:
            best_similarity = parse_comment_for_living_space(
                iban=iban, comment=comments_list[0], living_space_dict=living_space_dict
            )
            # NOTE(review): the sender is recorded only on the single-comment
            # path; the multi-part path below never sets it - confirm intent.
            best_similarity.set_send_person_id(best_similarity.customer_id)
        else:
            for comment in comments_list:
                candidate = parse_comment_for_living_space(
                    iban=iban, comment=comment, living_space_dict=living_space_dict
                )
                if float(candidate.similarity) > float(best_similarity.similarity):
                    best_similarity = candidate
        # The whole comment (not only the winning part) is given to the
        # build-part check.
        return check_build_living_space_matches_with_build_parts(
            living_space_dict=living_space_dict,
            best_similarity=best_similarity,
            iban=iban,
            whole_comment=process_comment,
        )

    # Build pays money for a service taken from a company or an individual
    if single_comment:
        best_similarity = parse_comment_for_company_or_individual(comment=comments_list[0])
        best_similarity.set_send_person_id(best_similarity.customer_id)
        return best_similarity
    for comment in comments_list:
        candidate = parse_comment_for_company_or_individual(comment=comment)
        if float(candidate.similarity) > float(best_similarity.similarity):
            best_similarity = candidate
    return best_similarity
def check_build_living_space_matches_with_build_parts(living_space_dict: dict, best_similarity: Similarity, iban: str, whole_comment: str):
if 0.6 < float(best_similarity.similarity) < 0.8:
build_parts_data = living_space_dict[iban]["build_parts"]
# Check if we have living space ID in the similarity object
living_space_id = getattr(best_similarity, 'living_space_id', None)
if living_space_id:
# Find the corresponding living space data
living_space_data = None
for ls in living_space_dict[iban]["living_space"]:
if ls.get('id') == living_space_id:
living_space_data = ls
break
if living_space_data:
build_parts_id = living_space_data.get('build_parts_id')
parser_dict = dict(comment=str(whole_comment), max_build_part=len(build_parts_data))
# print("build parts similarity", best_similarity, "parser_dict", parser_dict)
results_list = parse_comment_for_build_parts(**parser_dict)
# print("results_list", results_list)
if not results_list:
return best_similarity
for build_part_data in build_parts_data:
# Get part_no directly if it exists in the dictionary
part_no = build_part_data.get('part_no')
# If part_no doesn't exist, try to extract it from other attributes
if part_no is None:
# Try to get it from a name attribute if it exists
name = build_part_data.get('name', '')
if name and isinstance(name, str) and 'part' in name.lower():
try:
part_no = int(name.lower().replace('part', '').strip())
except (ValueError, TypeError):
pass
# If we have a part_no, proceed with the comparison
if part_no is not None:
# print("part_no", part_no, " | ", results_list)
# print("build_part", build_part_data.get('id'), build_parts_id)
# print("cond", build_part_data.get('id') == build_parts_id)
# print("cond2", part_no in results_list)
if build_part_data.get('id') == build_parts_id and part_no in results_list:
similarity = float(best_similarity.similarity)
best_similarity.set_similarity((1 - similarity) / 2 + similarity)
# print("similarity", best_similarity.similarity)
break
return best_similarity
def parse_comment_for_company_or_individual(comment: str):
    """Match a comment against the public names of non-"Commercial" companies.

    Company rows are copied into plain dicts while the session is open; the
    similarity scoring then runs on those copies after the session block.

    Args:
        comment: Text to match.

    Returns:
        The ``Similarity`` with the highest Jaro-Winkler ratio, carrying the
        company id and the source label ``"Customer Public Name"``; a zero
        ``Similarity`` when no company scores above 0.
    """

    def _to_plain_dict(company):
        # Snapshot only the fields used later so no ORM object leaves the session.
        plain = {
            'id': company.id,
            'public_name': unidecode(company.public_name),
        }
        if hasattr(company, 'commercial_type'):
            plain['commercial_type'] = company.commercial_type
        return plain

    companies_data = []
    with Companies.new_session() as session:
        Companies.set_session(session)
        non_commercial = Companies.query.filter(Companies.commercial_type != "Commercial").all()
        companies_data.extend(_to_plain_dict(company) for company in non_commercial)

    # Scoring happens on the copied data, outside the session.
    ascii_comment = unidecode(comment)
    best_similarity = Similarity(similarity=0.0, garbage="", cleaned="")
    for company_data in companies_data:
        search_word = company_data['public_name']
        garbage_words = get_garbage_words(ascii_comment, search_word)
        cleaned_comment = remove_garbage_words(ascii_comment, garbage_words)
        similarity_ratio = textdistance.jaro_winkler(cleaned_comment, search_word)
        if not similarity_ratio > float(best_similarity.similarity):
            continue
        best_similarity = Similarity(similarity=similarity_ratio, garbage=garbage_words, cleaned=cleaned_comment)
        # Store the company id instead of the ORM object.
        best_similarity.set_company_id(company_data['id'])
        best_similarity.set_found_from("Customer Public Name")
    return best_similarity
def parse_comment_for_living_space(iban: str, comment: str, living_space_dict: dict = None) -> Similarity:
    """Match a comment against the people registered under an IBAN.

    For every person both "name surname" and "surname name" are tried. The
    comment is stripped of everything that is not part of the searched name
    and the remainder is scored with Jaro-Winkler. A candidate is skipped
    when the cleaned comment is shorter than the concatenated name.

    Args:
        iban: Key into ``living_space_dict``.
        comment: Text to match.
        living_space_dict: Per-IBAN data; ``living_space_dict[iban]`` is read
            for its ``"people"``, ``"living_space"`` and ``"build_parts"``
            lists of dicts.

    Returns:
        A ``Similarity`` describing the best match. It stays at zero when
        ``living_space_dict`` is missing/empty, the IBAN is unknown, or no
        better-scoring person owns a living space.
    """
    comment = unidecode(comment)
    best_similarity = Similarity(similarity=0.0, garbage="", cleaned="")
    # The parameter defaults to None, so guard before the membership test.
    if not living_space_dict or iban not in living_space_dict:
        return best_similarity

    iban_data = living_space_dict[iban]
    for person_data in iban_data["people"]:
        first_name = unidecode(person_data.get('name', '')).upper()
        last_name = unidecode(person_data.get('surname', '')).upper()
        # No middle name is available in the dictionary, so only the two
        # orderings of name and surname are tried.
        search_word_list = [
            remove_spaces_from_string(f"{first_name} {last_name}"),
            remove_spaces_from_string(f"{last_name} {first_name}"),
        ]
        cleaned_comment = unidecode(comment).upper()
        for search_word in search_word_list:
            # When nothing is left over, the previous cleaned_comment is kept.
            if garbage_words := get_garbage_words(comment, unidecode(search_word)):
                garbage_words = unidecode(garbage_words).upper()
                cleaned_comment = unidecode(remove_garbage_words(comment, garbage_words)).upper()
            similarity_ratio = textdistance.jaro_winkler(cleaned_comment, str(search_word).upper())
            if len(cleaned_comment) < len(f"{first_name}{last_name}"):
                continue
            if similarity_ratio > float(best_similarity.similarity):
                person_id = person_data['id']
                # The match is recorded only if the person owns a living
                # space; with several, the last one in the list wins.
                for living_space_data in iban_data["living_space"]:
                    if living_space_data.get('person_id') == person_id:
                        best_similarity.set_living_space_id(living_space_data.get('id'))
                        best_similarity.set_found_from("Person Name")
                        best_similarity.set_similarity(similarity_ratio)
                        best_similarity.set_garbage(garbage_words)
                        best_similarity.set_cleaned(cleaned_comment)
                        best_similarity.set_customer_id(person_data['id'])
                        # Attach the build part this living space points to.
                        build_parts_id = living_space_data.get('build_parts_id')
                        for build_part_data in iban_data["build_parts"]:
                            if build_part_data.get('id') == build_parts_id:
                                best_similarity.set_build_part_id(build_part_data.get('id'))
                                break
    return best_similarity