"""Replay exported building data against the local API.

Rows are read from JSON exports with ``read_json_file``, validated with the
pydantic ``Insert*`` request models and posted to the ``/building/...``
create endpoints. ``migrate_build`` is the entry point.
"""

from service_app_test.api_configs import BothAPIS
from service_app_test.test_application.evyos.address_building import post_code_dict
from service_app_test.test_application.migrate_old_data.reader_and_alchemy_bulk_actions import (
    read_json_file,
)
from api_validations.validations_request import (
    InsertBuild,
    InsertBuildArea,
    InsertBuildParts,
    InsertBuildLivingSpace,
)


def requester_dict_build(data):
    """Return the ``post`` keyword arguments that create a build."""
    return {"endpoint": "/building/build/create", "data": data}


def requester_dict_build_area(data):
    """Return the ``post`` keyword arguments that create a build area."""
    return {"endpoint": "/building/area/create", "data": data}


def requester_dict_build_part(data):
    """Return the ``post`` keyword arguments that create a build part."""
    return {"endpoint": "/building/parts/create", "data": data}


def requester_dict_build_iban(data):
    """Return the ``post`` keyword arguments that create a build IBAN."""
    return {"endpoint": "/building/iban/create", "data": data}


def requester_dict_build_living_space(data):
    """Return the ``post`` keyword arguments that create a living space."""
    return {"endpoint": "/building/living_space/create", "data": data}


def _load_validated_rows(json_file, model, overrides=None):
    """Load the rows of one JSON export and validate each with ``model``.

    Args:
        json_file: Name handed to ``read_json_file``; the same name is the
            key under which the rows are stored in the loaded document.
        model: Pydantic model class used to validate every row.
        overrides: Optional mapping written into every row before validation.

    Returns:
        A list with the ``model_dump()`` of every validated row.

    Raises:
        Exception: "No data found" when the key is missing or has no rows.
    """
    # ``or []`` makes a missing key end in "No data found" instead of a
    # TypeError raised while iterating ``None``.
    rows = read_json_file(json_file=json_file).get(json_file) or []
    validated = []
    for row in rows:
        if overrides:
            row.update(overrides)
        validated.append(model(**row).model_dump())
    if not validated:
        raise Exception("No data found")
    return validated


def get_build_from_json():
    """Return the validated rows of the "build" export."""
    return _load_validated_rows(json_file="build", model=InsertBuild)


def get_build_area_from_json():
    """Return the validated rows of the "build_area" export."""
    return _load_validated_rows(json_file="build_area", model=InsertBuildArea)


def get_build_part_from_json(build_part_type_uu_id: str, part_direction_uu_id: str):
    """Return the validated rows of the "build_parts" export.

    Both uu_ids are written into every row before it is validated.
    """
    return _load_validated_rows(
        json_file="build_parts",
        model=InsertBuildParts,
        overrides={
            "build_part_type_uu_id": build_part_type_uu_id,
            "part_direction_uu_id": part_direction_uu_id,
        },
    )


def get_build_iban_from_json():
    """Return the validated rows of the "build_iban" export."""
    # NOTE(review): rows are validated with InsertBuildParts, which looks like
    # a copy/paste from the parts loader. No IBAN request model is imported in
    # this module, so the model is left unchanged -- confirm and swap it.
    return _load_validated_rows(json_file="build_iban", model=InsertBuildParts)


def address_dict(post_code_uu_id):
    """Return the fixed address payload bound to ``post_code_uu_id``."""
    return {
        "post_code_uu_id": post_code_uu_id,
        "comment_address": "Reşat Nuri Cad No 11",
        "letter_address": "Reşat Nuri Cad No 11 ÇANKAYA ANKARA TÜRKİYE",
        "short_letter_address": "Reşat Nuri Cad No 11 ÇANKAYA ANKARA TÜRKİYE",
        "build_number": "11",
    }


def get_build_living_space_from_json():
    """Return the validated rows of the "build_living_space" export."""
    return _load_validated_rows(
        json_file="build_living_space", model=InsertBuildLivingSpace
    )


def create_address(requester: BothAPIS, post_code):
    """Create a post code, mark it confirmed and return its uu_id.

    Args:
        requester: API pair; only ``requester.local_api`` is used.
        post_code: Payload posted to ``/postcode/create``.

    Returns:
        The ``uu_id`` found under ``data`` in the create response.
    """
    post_code_response = requester.local_api.post(
        endpoint="/postcode/create",
        data=post_code,
    )
    print("post_code_response", post_code_response.text)
    response_post_code_uu_id = post_code_response.json()["data"]["uu_id"]
    requester.local_api.post(
        endpoint=f"/postcode/update/{response_post_code_uu_id}",
        data={
            "is_confirmed": True,
        },
    )
    return response_post_code_uu_id


def search_street(search_text, requester):
    """Search addresses and return the uu_id of the first ANKARA/ÇANKAYA street.

    Args:
        search_text: Text sent as ``search`` to ``/address/search``.
        requester: API pair; only ``requester.local_api`` is used.

    Returns:
        The ``AddressStreet.uu_id`` of the first result whose city is
        "ANKARA" and whose district is "ÇANKAYA", or ``None`` when no result
        on the first page (30 rows) matches.
    """
    response = requester.local_api.post(
        endpoint="/address/search",
        data={
            "search": search_text,
            "list_options": {
                "page": 1,
                "size": 30,
            },
        },
    )
    response_object = response.json()
    print("text", response.text)
    print("response_object", response_object)
    # A response without "data" is treated as "no results" rather than
    # failing while iterating ``None``.
    for street_data in response_object.get("data") or []:
        condition_of_street = (
            street_data["AddressCity.city_name"] == "ANKARA"
            and street_data["AddressDistrict.district_name"] == "ÇANKAYA"
        )
        if condition_of_street:
            print("street_uu_id", street_data["AddressStreet.uu_id"])
            return street_data["AddressStreet.uu_id"]
    return None


def create_addresses(address, requester):
    """Post ``address`` to ``/address/create`` and return the decoded response."""
    address_response = requester.local_api.post(
        endpoint="/address/create",
        data=address,
    )
    print("text", address_response.text)
    response_address = address_response.json()
    print("json", response_address)
    return response_address


# def grab_new_build_uu_id(requester: BothAPIS, build_uu_id: str):
#     response_wag = requester.wag_api.post(
#         endpoint="/building/build/list",
#         data={"page": 1, "size": 1, "query": {"uu_id": build_uu_id}},
#     )
#     build_uu_id = response_wag.json()["data"]["uu_id"]
#     if not build_uu_id:
#         raise Exception("Build UU ID not found")
#     return build_uu_id


def migrate_build_area(requester: BothAPIS, build_uu_id: str):
    """Post every build-area row to the local API, attached to ``build_uu_id``."""
    # build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
    for response_data in get_build_area_from_json():
        response_data["build_uu_id"] = build_uu_id
        print("response_data", response_data)
        response = requester.local_api.post(
            **requester_dict_build_area(data=response_data)
        )
        print("response", response.text)
    return


def migrate_build_part(
    requester: BothAPIS,
    build_uu_id: str,
    build_part_type_uu_id: str,
    part_direction_uu_id: str,
):
    """Post every build-part row to the local API, attached to ``build_uu_id``.

    Rows whose ``part_no`` equals 0 are skipped.
    """
    # build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
    response_datas = get_build_part_from_json(
        build_part_type_uu_id=build_part_type_uu_id,
        part_direction_uu_id=part_direction_uu_id,
    )
    for response_data in response_datas:
        if response_data.get('part_no') == 0:
            continue
        response_data["build_uu_id"] = build_uu_id
        response = requester.local_api.post(
            **requester_dict_build_part(data=response_data)
        )
        print("response", response.text)
    return


def migrate_build_iban(requester: BothAPIS, build_uu_id: str):
    """Post every build-IBAN row to the local API, attached to ``build_uu_id``."""
    # build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
    for response_data in get_build_iban_from_json():
        response_data["build_uu_id"] = build_uu_id
        # IBAN rows go to the IBAN endpoint; they were previously sent to
        # "/building/parts/create" through the parts request builder.
        response = requester.local_api.post(
            **requester_dict_build_iban(data=response_data)
        )
        print("response", response.text)
    return


def migrate_build_living_space(requester: BothAPIS, build_uu_id: str):
    """Post every living-space row to the local API, attached to ``build_uu_id``."""
    # build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
    # Rows come from the living-space export; the IBAN loader was called here
    # before, so IBAN rows were posted to the living-space endpoint.
    for response_data in get_build_living_space_from_json():
        response_data["build_uu_id"] = build_uu_id
        response = requester.local_api.post(
            **requester_dict_build_living_space(data=response_data)
        )
        print("response", response.text)
    return


def migrate_build(requester: BothAPIS):
    """Create the address and every build, then migrate each build's children.

    Steps, all against ``requester.local_api``:

    1. Look up the building-type uu_ids for type codes "daire" and "apt" and
       the "NN" entry of the "Directions" enum.
    2. Find the "Reşat Nuri" street, create and confirm a post code for it,
       then create the address.
    3. For every build row: create the build, then migrate its areas, parts
       and IBANs under the uu_id returned by the create call.

    ``migrate_build_living_space`` is not called from here.
    """
    from service_app_test.test_application.migrate_old_data.get_building_types import (
        list_building_types,
    )
    from service_app_test.test_application.migrate_old_data.get_type_codes import (
        get_type_codes_key_and_class
    )

    building_type_flat = list_building_types(
        requester=requester.local_api, type_code="daire"
    ).get("uu_id")
    building_type_build = list_building_types(
        requester=requester.local_api, type_code="apt"
    ).get("uu_id")
    api_enum_dropdown_nn = get_type_codes_key_and_class(
        class_name="Directions", key_name="NN", requester=requester.local_api
    )
    api_enum_dropdown_nn_uuid = api_enum_dropdown_nn.get("data").get("uu_id")

    street_uu_id = search_street("Reşat Nuri", requester=requester)
    post_code_uuid = create_address(
        requester=requester, post_code=post_code_dict(uu_id_street=street_uu_id)
    )
    created_address = create_addresses(
        address=address_dict(post_code_uu_id=post_code_uuid), requester=requester
    )
    created_address_uu_id = created_address["data"]["uu_id"]

    for response_data in get_build_from_json():
        response_data["address_uu_id"] = created_address_uu_id
        response_data["build_types_uu_id"] = building_type_build
        response = requester.local_api.post(**requester_dict_build(data=response_data))
        response_json = response.json()
        build_uu_id = str(response_json.get("data").get("uu_id"))
        migrate_build_area(requester=requester, build_uu_id=build_uu_id)
        migrate_build_part(
            requester=requester,
            build_uu_id=build_uu_id,
            build_part_type_uu_id=building_type_flat,
            part_direction_uu_id=api_enum_dropdown_nn_uuid,
        )
        migrate_build_iban(requester=requester, build_uu_id=build_uu_id)
    return