import textdistance
from unidecode import unidecode
from Schemas import Companies, BuildIbanDescription
from regex_func import category_finder
from validations import Similarity


def parse_comment_to_split_with_star(account_record):
    """Split a star-delimited process comment into its parts.

    Returns a tuple of (part_count, *parts); on any error it falls back
    to a single empty comment.
    """
    # Handle both ORM objects and plain dictionaries
    try:
        if isinstance(account_record, dict):
            process_comment = str(account_record.get('process_comment', ''))
        else:
            process_comment = str(account_record.process_comment)
        if "*" in process_comment:
            # Collapse doubled stars before splitting on the delimiter
            process_comments = process_comment.replace("**", "*").split("*")
            return len(process_comments), *process_comments
        return 1, process_comment
    except Exception:
        # Return a safe default if the record cannot be read
        return 1, ""


def remove_garbage_words(comment: str, garbage_word: str):
    """Strip every garbage word from the comment and return it uppercased."""
    cleaned_comment = remove_spaces_from_string(comment.replace("*", " "))
    if garbage_word:
        garbage_word = remove_spaces_from_string(garbage_word.replace("*", " "))
        for word in garbage_word.split(" "):
            cleaned_comment = unidecode(remove_spaces_from_string(cleaned_comment))
            cleaned_comment = cleaned_comment.replace(remove_spaces_from_string(word), "")
    return remove_spaces_from_string(cleaned_comment).upper()


def remove_spaces_from_string(remove_string: str):
    """Collapse repeated spaces and return the string uppercased."""
    return " ".join(part for part in remove_string.split(" ") if part).upper()


def get_garbage_words(comment: str, search_word: str):
    """Return the words left in the comment once the search word is removed, or None."""
    garbage_words = unidecode(remove_spaces_from_string(comment))
    search_word = unidecode(remove_spaces_from_string(search_word))
    for word in search_word.split(" "):
        garbage_words = garbage_words.replace(remove_spaces_from_string(unidecode(word)), "")
    if cleaned_from_spaces := remove_spaces_from_string(garbage_words):
        return unidecode(cleaned_from_spaces).upper()
    return None

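# Illustrative walk-through of the helpers above (the sample comment and
# search word are made up, not taken from real data):
#
#     comment = "AHMET YILMAZ DAIRE 5 AIDAT"
#     get_garbage_words(comment, "AHMET YILMAZ")        # -> "DAIRE 5 AIDAT"
#     remove_garbage_words(comment, "DAIRE 5 AIDAT")    # -> "AHMET YILMAZ"
#
# Every matcher below follows this pattern: separate the "garbage" words
# from the comment, strip them out, then score what remains against the
# search word with Jaro-Winkler similarity.
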
def parse_comment_with_name_iban_description(account_record):
    """Match a record's comment against the search words stored for its IBAN."""
    # Extract the needed fields up front to avoid ORM session detachment
    if isinstance(account_record, dict):
        iban = account_record.get('iban', '')
        process_comment = account_record.get('process_comment', '')
    else:
        try:
            iban = account_record.iban
            process_comment = account_record.process_comment
        except Exception:
            # The record is unreadable; return an empty match
            return Similarity(similarity=0.0, garbage="", cleaned="")
    # Split the comment locally without depending on the account_record object
    if "*" in process_comment:
        comments_list = process_comment.replace("**", "*").split("*")
    else:
        comments_list = [process_comment]
    best_similarity = Similarity(similarity=0.0, garbage="", cleaned="")
    with BuildIbanDescription.new_session() as session:
        BuildIbanDescription.set_session(session)
        Companies.set_session(session)
        iban_results = BuildIbanDescription.query.filter(BuildIbanDescription.iban == iban).all()
        for comment in comments_list:
            for iban_result in iban_results:
                search_word = unidecode(iban_result.search_word)
                garbage_words = get_garbage_words(comment, search_word)
                cleaned_comment = remove_garbage_words(comment, garbage_words)
                similarity_ratio = textdistance.jaro_winkler(cleaned_comment, search_word)
                if float(similarity_ratio) > float(best_similarity.similarity):
                    # Only fetch the company row for a new best match
                    company = Companies.query.filter_by(id=iban_result.company_id).first()
                    best_similarity = Similarity(similarity=similarity_ratio, garbage=garbage_words,
                                                 cleaned=cleaned_comment)
                    best_similarity.set_company(company)
                    best_similarity.set_found_from("Customer Public Name Description")
    return best_similarity


def parse_comment_for_build_parts(comment: str, max_build_part: int = 200, parse: str = "DAIRE"):
    """Extract plausible build-part numbers (e.g. flat numbers) from the comment."""
    results, results_list = category_finder(comment), []
    for result in results[parse] or []:
        if digits := "".join(letter for letter in str(result) if letter.isdigit()):
            if int(digits) <= int(max_build_part):
                results_list.append(int(digits))
    return results_list or None


def parse_comment_with_name(account_record, living_space_dict: dict = None):
    """Match the comment against people (incoming money) or companies (outgoing money)."""
    # Extract the needed fields up front to avoid ORM session detachment
    if isinstance(account_record, dict):
        iban = account_record.get('iban', '')
        process_comment = account_record.get('process_comment', '')
        try:
            currency_value = int(account_record.get('currency_value', 0))
        except (ValueError, TypeError):
            currency_value = 0
    else:
        try:
            iban = account_record.iban
            process_comment = account_record.process_comment
            currency_value = int(account_record.currency_value)
        except Exception:
            # The record is unreadable; return an empty match
            return Similarity(similarity=0.0, garbage="", cleaned="")
    # Split the comment locally without depending on the account_record object
    if "*" in process_comment:
        comments_list = process_comment.replace("**", "*").split("*")
    else:
        comments_list = [process_comment]
    comments_length = len(comments_list)
    best_similarity = Similarity(similarity=0.0, garbage="", cleaned="")
    if currency_value > 0:
        # The building receives money from its living-space people
        living_space_matches = dict(living_space_dict=living_space_dict, iban=iban,
                                    whole_comment=process_comment)
        if comments_length == 1:
            best_similarity = parse_comment_for_living_space(iban=iban, comment=comments_list[0],
                                                             living_space_dict=living_space_dict)
            best_similarity.set_send_person_id(best_similarity.customer_id)
            living_space_matches["best_similarity"] = best_similarity
            return check_build_living_space_matches_with_build_parts(**living_space_matches)
        for comment in comments_list:
            similarity_result = parse_comment_for_living_space(iban=iban, comment=comment,
                                                               living_space_dict=living_space_dict)
            if float(similarity_result.similarity) > float(best_similarity.similarity):
                best_similarity = similarity_result
        living_space_matches["best_similarity"] = best_similarity
        return check_build_living_space_matches_with_build_parts(**living_space_matches)
    # The building pays money for a service taken from a company or individual
    if comments_length == 1:
        best_similarity = parse_comment_for_company_or_individual(comment=comments_list[0])
        best_similarity.set_send_person_id(best_similarity.customer_id)
        return best_similarity
    for comment in comments_list:
        similarity_result = parse_comment_for_company_or_individual(comment=comment)
        if float(similarity_result.similarity) > float(best_similarity.similarity):
            best_similarity = similarity_result
    return best_similarity

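# Dispatch sketch for parse_comment_with_name (the record below is
# hypothetical, and category_finder's output shape is assumed from how
# parse_comment_for_build_parts indexes it):
#
#     record = {"iban": "TR0000000000000000000000",
#               "process_comment": "AYSE KAYA*DAIRE 3",
#               "currency_value": 1500}
#     best = parse_comment_with_name(record, living_space_dict)
#     # currency_value > 0  -> incoming payment: match residents via
#     #                        parse_comment_for_living_space, then maybe
#     #                        boost with the build-part check below.
#     # currency_value <= 0 -> outgoing payment: match non-commercial
#     #                        companies via parse_comment_for_company_or_individual.
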
def check_build_living_space_matches_with_build_parts(living_space_dict: dict, best_similarity: Similarity,
                                                      iban: str, whole_comment: str):
    """Boost a borderline name match when the comment also names the resident's build part."""
    if not 0.6 < float(best_similarity.similarity) < 0.8:
        return best_similarity
    build_parts_data = living_space_dict[iban]["build_parts"]
    living_space_id = getattr(best_similarity, 'living_space_id', None)
    if not living_space_id:
        return best_similarity
    # Find the living-space entry the name match points at
    living_space_data = next(
        (ls for ls in living_space_dict[iban]["living_space"] if ls.get('id') == living_space_id), None)
    if not living_space_data:
        return best_similarity
    build_parts_id = living_space_data.get('build_parts_id')
    results_list = parse_comment_for_build_parts(comment=str(whole_comment),
                                                 max_build_part=len(build_parts_data))
    if not results_list:
        return best_similarity
    for build_part_data in build_parts_data:
        part_no = build_part_data.get('part_no')
        if part_no is None:
            # Fall back to parsing the number out of a "part N"-style name
            name = build_part_data.get('name', '')
            if name and isinstance(name, str) and 'part' in name.lower():
                try:
                    part_no = int(name.lower().replace('part', '').strip())
                except (ValueError, TypeError):
                    pass
        if part_no is not None and build_part_data.get('id') == build_parts_id and part_no in results_list:
            # Halve the remaining distance to a perfect score
            similarity = float(best_similarity.similarity)
            best_similarity.set_similarity((1 - similarity) / 2 + similarity)
            break
    return best_similarity


def parse_comment_for_company_or_individual(comment: str):
    """Match the comment against the public names of non-commercial companies."""
    # Copy the needed company fields while the session is still open
    companies_data = []
    with Companies.new_session() as session:
        Companies.set_session(session)
        companies_list = Companies.query.filter(Companies.commercial_type != "Commercial").all()
        for company in companies_list:
            company_data = {
                'id': company.id,
                'public_name': unidecode(company.public_name),
            }
            if hasattr(company, 'commercial_type'):
                company_data['commercial_type'] = company.commercial_type
            companies_data.append(company_data)
    # Score the candidates outside the session
    comment = unidecode(comment)
    best_similarity = Similarity(similarity=0.0, garbage="", cleaned="")
    for company_data in companies_data:
        search_word = company_data['public_name']
        garbage_words = get_garbage_words(comment, search_word)
        cleaned_comment = remove_garbage_words(comment, garbage_words)
        similarity_ratio = textdistance.jaro_winkler(cleaned_comment, search_word)
        if similarity_ratio > float(best_similarity.similarity):
            best_similarity = Similarity(similarity=similarity_ratio, garbage=garbage_words,
                                         cleaned=cleaned_comment)
            # Store the company ID rather than the detached ORM object
            best_similarity.set_company_id(company_data['id'])
            best_similarity.set_found_from("Customer Public Name")
    return best_similarity

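# The 0.6-0.8 boost in check_build_living_space_matches_with_build_parts
# moves an uncertain score halfway toward 1.0 when the flat number in the
# comment matches the resident's registered build part:
#
#     new = (1 - s) / 2 + s    # e.g. s = 0.70 -> 0.85, s = 0.78 -> 0.89
#
# Scores at or below 0.6 are treated as noise and scores at or above 0.8
# are already trusted, so neither is adjusted.
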
Public Name") # print('cleaned_comment', cleaned_comment, '\n', 'search_word', search_word, '\n', 'best_similarity', best_similarity, '\n', # 'company name', company_data['public_name'], '\n', 'similarity_ratio', similarity_ratio, '\n', 'garbage_words', garbage_words) return best_similarity def parse_comment_for_living_space(iban: str, comment: str, living_space_dict: dict = None) -> Similarity: comment = unidecode(comment) best_similarity = Similarity(similarity=0.0, garbage="", cleaned="") if not iban in living_space_dict: return best_similarity for person_data in living_space_dict[iban]["people"]: # Extract name components from dictionary first_name = unidecode(person_data.get('name', '')).upper() last_name = unidecode(person_data.get('surname', '')).upper() search_word_list = [ remove_spaces_from_string("".join([f"{first_name} {last_name}"])), remove_spaces_from_string("".join([f"{last_name} {first_name}"])), ] # We don't have middle_name in our dictionary, so skip that part cleaned_comment = unidecode(comment).upper() for search_word in search_word_list: if garbage_words := get_garbage_words(comment, unidecode(search_word)): garbage_words = unidecode(garbage_words).upper() cleaned_comment = unidecode(remove_garbage_words(comment, garbage_words)).upper() similarity_ratio = textdistance.jaro_winkler(cleaned_comment, str(search_word).upper()) if len(cleaned_comment) < len(f"{first_name}{last_name}"): continue if cleaned_comment and 0.9 < similarity_ratio <= 1: pass # print("cleaned comment dict", dict( # garbage=garbage_words, cleaned=cleaned_comment, similarity=similarity_ratio, # search_word=search_word, comment=comment, last_similarity=float(best_similarity.similarity)) # ) if similarity_ratio > float(best_similarity.similarity): # Use person_id from the dictionary data person_id = person_data['id'] for living_space_data in living_space_dict[iban]["living_space"]: if living_space_data.get('person_id') == person_id: # Create a dictionary with living space data living_space_info = { 'id': living_space_data.get('id'), 'build_parts_id': living_space_data.get('build_parts_id'), 'name': living_space_data.get('name') } best_similarity.set_living_space_id(living_space_data.get('id')) best_similarity.set_found_from("Person Name") best_similarity.set_similarity(similarity_ratio) best_similarity.set_garbage(garbage_words) best_similarity.set_cleaned(cleaned_comment) best_similarity.set_customer_id(person_data['id']) # Find matching build part build_parts_id = living_space_data.get('build_parts_id') for build_part_data in living_space_dict[iban]["build_parts"]: if build_part_data.get('id') == build_parts_id: best_similarity.set_build_part_id(build_part_data.get('id')) break return best_similarity