from service_app_test.api_configs import BothAPIS
from service_app_test.test_application.evyos.address_building import post_code_dict
from service_app_test.test_application.migrate_old_data.get_occupants_codes import (
    get_occupants_types,
)
from service_app_test.test_application.migrate_old_data.reader_and_alchemy_bulk_actions import (
    read_json_file,
)
from api_validations.validations_request import (
    InsertBuild,
    InsertBuildArea,
    InsertBuildParts,
    InsertBuildLivingSpace,
)


# Maps the occupant-type uu_id found in the source JSON rows to the occupant
# code that is looked up on the target API. Any uu_id not listed here falls
# back to _DEFAULT_OCCUPANT_CODE.
_OCCUPANT_CODE_BY_SOURCE_UU_ID = {
    "b9392bef-32cb-4c7d-b99f-5f613c2e2120": "FL-OWN",  # flat owner
    "a40aea36-0dce-48cc-9334-2e776ba22a49": "FL-TEN",  # flat tenant
}
_DEFAULT_OCCUPANT_CODE = "FL-RES"


def requester_dict_build(data) -> dict:
    """Return the ``post`` keyword arguments for creating a build."""
    return {"endpoint": "/building/build/create", "data": data}


def requester_dict_build_area(data) -> dict:
    """Return the ``post`` keyword arguments for creating a build area."""
    return {"endpoint": "/building/area/create", "data": data}


def requester_dict_build_part(data) -> dict:
    """Return the ``post`` keyword arguments for creating a build part."""
    return {"endpoint": "/building/parts/create", "data": data}


def requester_dict_build_iban(data) -> dict:
    """Return the ``post`` keyword arguments for creating a build IBAN."""
    return {"endpoint": "/building/iban/create", "data": data}


def requester_dict_build_living_space(data) -> dict:
    """Return the ``post`` keyword arguments for creating a living space."""
    return {"endpoint": "/building/living_space/create", "data": data}


def _load_validated_rows(json_file: str, to_payload) -> list:
    """Read ``json_file`` and convert every row with ``to_payload``.

    The rows are taken from the key named like the file itself. A missing
    or empty key is treated the same as "no rows".

    Args:
        json_file: Name passed to ``read_json_file`` and key of the row list.
        to_payload: Callable turning one raw row into the payload to post.

    Returns:
        The list of converted rows (never empty).

    Raises:
        Exception: "No data found" when the file yields no rows.
    """
    # ``or []`` so a missing key raises "No data found" below rather than a
    # TypeError from iterating None.
    raw_rows = read_json_file(json_file=json_file).get(json_file) or []
    payloads = [to_payload(row) for row in raw_rows]
    if not payloads:
        raise Exception("No data found")
    return payloads


def get_build_from_json() -> list:
    """Load the "build" rows, validated through ``InsertBuild``.

    Raises:
        Exception: "No data found" when there are no rows.
    """
    return _load_validated_rows("build", lambda row: InsertBuild(**row).model_dump())


def get_build_area_from_json() -> list:
    """Load the "build_area" rows, validated through ``InsertBuildArea``.

    Raises:
        Exception: "No data found" when there are no rows.
    """
    return _load_validated_rows(
        "build_area", lambda row: InsertBuildArea(**row).model_dump()
    )


def get_build_part_from_json(build_part_type_uu_id: str, part_direction_uu_id: str) -> list:
    """Load the "build_parts" rows, validated through ``InsertBuildParts``.

    Every row gets the given part type and direction uu_ids, and its own
    ``uu_id`` is copied into ``ref_id`` before validation.

    Args:
        build_part_type_uu_id: Value stored in each row's ``build_part_type_uu_id``.
        part_direction_uu_id: Value stored in each row's ``part_direction_uu_id``.

    Raises:
        Exception: "No data found" when there are no rows.
    """

    def to_payload(row: dict):
        row["build_part_type_uu_id"] = build_part_type_uu_id
        row["part_direction_uu_id"] = part_direction_uu_id
        # Keep the source uu_id so the created part can be found again later
        # (migrate_build_living_space queries parts by ref_id).
        row["ref_id"] = row.get("uu_id")
        # Dump once and reuse the result for both the log line and the payload.
        dumped = InsertBuildParts(**row).dump()
        print(f"get buildpart row {dumped}")
        return dumped

    return _load_validated_rows("build_parts", to_payload)


def get_build_iban_from_json() -> list:
    """Load the "build_iban" rows.

    Raises:
        Exception: "No data found" when there are no rows.
    """
    # NOTE(review): IBAN rows are validated with InsertBuildParts, which looks
    # like a copy-paste slip; no IBAN-specific model is imported here - confirm.
    return _load_validated_rows(
        "build_iban", lambda row: InsertBuildParts(**row).model_dump()
    )


def address_dict(post_code_uu_id) -> dict:
    """Return the fixed address payload bound to ``post_code_uu_id``."""
    return {
        "post_code_uu_id": post_code_uu_id,
        "comment_address": "Reşat Nuri Cad No 11",
        "letter_address": "Reşat Nuri Cad No 11 ÇANKAYA ANKARA TÜRKİYE",
        "short_letter_address": "Reşat Nuri Cad No 11 ÇANKAYA ANKARA TÜRKİYE",
        "build_number": "11",
    }


def get_build_living_space_from_json() -> list:
    """Load the "build_living_space" rows via ``InsertBuildLivingSpace``.

    Raises:
        Exception: "No data found" when there are no rows.
    """
    return _load_validated_rows(
        "build_living_space",
        lambda row: InsertBuildLivingSpace(**row).excluded_dump(),
    )


def create_address(requester: BothAPIS, post_code):
    """Post ``post_code`` to ``/postcode/create`` and return the new uu_id."""
    post_code_response = requester.local_api.post(
        endpoint="/postcode/create",
        data=post_code,
    )
    print("post_code_response", post_code_response.text)
    return post_code_response.json()["data"]["uu_id"]


def search_street(search_text, requester):
    """Search addresses and return the uu_id of the first matching street.

    A result matches when its city is "ANKARA" and its district is "ÇANKAYA".

    Args:
        search_text: Text sent as the ``search`` term.
        requester: Object exposing ``local_api.post``.

    Returns:
        The ``AddressStreet.uu_id`` of the first match, or ``None`` when no
        returned row matches.
    """
    response = requester.local_api.post(
        endpoint="/address/search",
        data={
            "search": search_text,
            "list_options": {
                "page": 1,
                "size": 30,
            },
        },
    )
    response_object = response.json()
    print("text", response.text)
    print("response_object", response_object)
    for street_data in response_object.get("data"):
        condition_of_street = (
            street_data["AddressCity.city_name"] == "ANKARA"
            and street_data["AddressDistrict.district_name"] == "ÇANKAYA"
        )
        if condition_of_street:
            print("street_uu_id", street_data["AddressStreet.uu_id"])
            return street_data["AddressStreet.uu_id"]
    # No row matched the city/district filter.
    return None


def create_addresses(address, requester):
    """Post ``address`` to ``/address/create`` and return the decoded JSON."""
    address_response = requester.local_api.post(
        endpoint="/address/create",
        data=address,
    )
    print("text", address_response.text)
    response_address = address_response.json()
    print("json", response_address)
    return response_address


# def grab_new_build_uu_id(requester: BothAPIS, build_uu_id: str):
#     response_wag = requester.wag_api.post(
#         endpoint="/building/build/list",
#         data={"page": 1, "size": 1, "query": {"uu_id": build_uu_id}},
#     )
#     build_uu_id = response_wag.json()["data"]["uu_id"]
#     if not build_uu_id:
#         raise Exception("Build UU ID not found")
#     return build_uu_id


def migrate_build_area(requester: BothAPIS, build_uu_id: str) -> None:
    """Create every build area from JSON, attached to ``build_uu_id``."""
    # build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
    for response_data in get_build_area_from_json():
        response_data["build_uu_id"] = build_uu_id
        print("response_data", response_data)
        response = requester.local_api.post(
            **requester_dict_build_area(data=response_data)
        )
        print("response", response.text)


def migrate_build_part(
    requester: BothAPIS,
    build_uu_id: str,
    build_part_type_uu_id: str,
    part_direction_uu_id: str,
) -> None:
    """Create every build part from JSON, attached to ``build_uu_id``.

    Rows whose ``part_no`` equals 0 are skipped.
    """
    # build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
    response_datas = get_build_part_from_json(
        build_part_type_uu_id=build_part_type_uu_id,
        part_direction_uu_id=part_direction_uu_id,
    )
    for response_data in response_datas:
        if response_data.get("part_no") == 0:
            continue
        response_data["build_uu_id"] = build_uu_id
        response = requester.local_api.post(
            **requester_dict_build_part(data=response_data)
        )
        print("response", response.text)


def migrate_build_iban(requester: BothAPIS, build_uu_id: str) -> None:
    """Create every build IBAN from JSON, attached to ``build_uu_id``."""
    # build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
    for response_data in get_build_iban_from_json():
        response_data["build_uu_id"] = build_uu_id
        # Post to the IBAN endpoint; this previously reused the build-part
        # request builder and therefore hit /building/parts/create.
        response = requester.local_api.post(
            **requester_dict_build_iban(data=response_data)
        )
        print("response", response.text)


def migrate_build_living_space(requester: BothAPIS) -> None:
    """Create every living space from JSON after resolving its references.

    For each row the person and build part are looked up by ``ref_id`` and
    replaced with the uu_ids returned by the API, the occupant type is
    translated to the uu_id of the matching occupant code, and the row is
    posted to the living-space create endpoint.
    """
    for response_data in get_build_living_space_from_json():
        print("b response_data[person_uu_id]", response_data["person_uu_id"])
        response = requester.local_api.post(
            endpoint="/people/list",
            data={
                "page": 1,
                "size": 1,
                "query": {"ref_id": response_data["person_uu_id"]},
            },
        )
        print("/people/list response", response.text)
        response_data["person_uu_id"] = response.json()["data"][0]["uu_id"]
        print("a response_data[person_uu_id]", response_data["person_uu_id"])

        response = requester.local_api.post(
            endpoint="/building/parts/list",
            data={
                "page": 1,
                "size": 30,
                "query": {"ref_id": response_data.get("build_parts_uu_id")},
            },
        )
        print("/building/parts/list response", response.text)
        response_data["build_parts_uu_id"] = response.json()["data"][0]["uu_id"]

        # Owner and tenant have dedicated codes; everything else is "FL-RES".
        occupant_code = _OCCUPANT_CODE_BY_SOURCE_UU_ID.get(
            response_data.get("occupant_type_uu_id"), _DEFAULT_OCCUPANT_CODE
        )
        occupant_type = get_occupants_types(
            occupant_code=occupant_code, requester=requester.local_api
        )
        response_data["occupant_type_uu_id"] = occupant_type["data"]["uu_id"]

        response = requester.local_api.post(
            **requester_dict_build_living_space(data=response_data)
        )
        print("migrate_build_living_space", response.text)


def migrate_build(requester: BothAPIS) -> None:
    """Create the address and builds, then each build's areas and parts.

    Looks up the "daire" and "apt" building types and the "NN" direction,
    creates a post code and address for the street found by searching
    "Reşat Nuri", and posts every build from JSON. Each created build is
    followed by its areas and parts.
    """
    # Imported locally, as in the original; presumably to avoid import-time
    # coupling between the migration modules - confirm before hoisting.
    from service_app_test.test_application.migrate_old_data.get_building_types import (
        list_building_types,
    )
    from service_app_test.test_application.migrate_old_data.get_type_codes import (
        get_type_codes_key_and_class,
    )

    building_type_flat = list_building_types(
        requester=requester.local_api, type_code="daire"
    ).get("uu_id")
    building_type_build = list_building_types(
        requester=requester.local_api, type_code="apt"
    ).get("uu_id")
    api_enum_dropdown_nn = get_type_codes_key_and_class(
        class_name="Directions", key_name="NN", requester=requester.local_api
    )
    api_enum_dropdown_nn_uuid = api_enum_dropdown_nn.get("data").get("uu_id")

    street_uu_id = search_street("Reşat Nuri", requester=requester)
    post_code_uuid = create_address(
        requester=requester, post_code=post_code_dict(uu_id_street=street_uu_id)
    )
    created_address = create_addresses(
        address=address_dict(post_code_uu_id=post_code_uuid), requester=requester
    )
    created_address_uu_id = created_address["data"]["uu_id"]

    for response_data in get_build_from_json():
        response_data["address_uu_id"] = created_address_uu_id
        response_data["build_types_uu_id"] = building_type_build
        response = requester.local_api.post(**requester_dict_build(data=response_data))
        response_json = response.json()
        build_uu_id = str(response_json.get("data").get("uu_id"))
        migrate_build_area(requester=requester, build_uu_id=build_uu_id)
        migrate_build_part(
            requester=requester,
            build_uu_id=build_uu_id,
            build_part_type_uu_id=building_type_flat,
            part_direction_uu_id=api_enum_dropdown_nn_uuid,
        )
        # migrate_build_iban(requester=requester, build_uu_id=build_uu_id)