wag-managment-api-service-v.../service_app_test/test_application/migrate_old_data/building.py

226 lines
7.9 KiB
Python

from service_app_test.api_configs import BothAPIS
from service_app_test.test_application.evyos.address_building import post_code_dict
from service_app_test.test_application.migrate_old_data.reader_and_alchemy_bulk_actions import (
read_json_file,
)
from api_validations.validations_request import (
InsertBuild,
InsertBuildArea,
InsertBuildParts,
InsertBuildLivingSpace,
)
requester_dict_build = lambda data: {"endpoint": "/building/build/create", "data": data}
requester_dict_build_area = lambda data: {
"endpoint": "/building/area/create",
"data": data,
}
requester_dict_build_part = lambda data: {
"endpoint": "/building/part/create",
"data": data,
}
requester_dict_build_iban = lambda data: {
"endpoint": "/building/iban/create",
"data": data,
}
requester_dict_build_living_space = lambda data: {
"endpoint": "/building/living_space/create",
"data": data,
}
def get_build_from_json():
read_files_json, with_pydantic = read_json_file(json_file="build"), []
read_files = read_files_json.get("build")
for row in read_files:
pydantic_row = InsertBuild(**row)
with_pydantic.append(pydantic_row.model_dump())
if not with_pydantic:
raise Exception("No data found")
return with_pydantic
def get_build_area_from_json():
read_files_json, with_pydantic = read_json_file(json_file="build_area"), []
read_files = read_files_json.get("build_area")
for row in read_files:
pydantic_row = InsertBuildArea(**row)
with_pydantic.append(pydantic_row.model_dump())
if not with_pydantic:
raise Exception("No data found")
return with_pydantic
def get_build_part_from_json():
read_files_json, with_pydantic = read_json_file(json_file="build_parts"), []
read_files = read_files_json.get("build_parts")
for row in read_files:
pydantic_row = InsertBuildParts(**row)
with_pydantic.append(pydantic_row.model_dump())
if not with_pydantic:
raise Exception("No data found")
return with_pydantic
def get_build_iban_from_json():
read_files_json, with_pydantic = read_json_file(json_file="build_iban"), []
read_files = read_files_json.get("build_iban")
for row in read_files:
pydantic_row = InsertBuildParts(**row)
with_pydantic.append(pydantic_row.model_dump())
if not with_pydantic:
raise Exception("No data found")
return with_pydantic
address_dict = lambda post_code_uu_id: {
"post_code_uu_id": post_code_uu_id,
"comment_address": "Reşat Nuri Cad No 11",
"letter_address": "Reşat Nuri Cad No 11 ÇANKAYA ANKARA TÜRKİYE",
"short_letter_address": "Reşat Nuri Cad No 11 ÇANKAYA ANKARA TÜRKİYE",
"build_number": "11",
}
def get_build_living_space_from_json():
read_files_json, with_pydantic = read_json_file(json_file="build_living_space"), []
read_files = read_files_json.get("build_living_space")
for row in read_files:
pydantic_row = InsertBuildLivingSpace(**row)
with_pydantic.append(pydantic_row.model_dump())
if not with_pydantic:
raise Exception("No data found")
return with_pydantic
def create_address(requester: BothAPIS, post_code):
post_code_response = requester.local_api.post(
endpoint="/postcode/create",
data=post_code,
)
print("post_code_response", post_code_response.text)
response_post_code = post_code_response.json()
return response_post_code
def search_street(search_text, requester):
response = requester.local_api.post(
endpoint="/address/search",
data={
"search": search_text,
"list_options": {
"page": 1,
"size": 30,
},
},
)
response_object = response.json()
print("text", response.text)
print("response_object", response_object)
for street_data in response_object.get("data"):
condition_of_street = (
street_data["AddressCity.city_name"] == "ANKARA"
and street_data["AddressDistrict.district_name"] == "ÇANKAYA"
)
if condition_of_street:
print("street_uu_id", street_data["AddressStreet.uu_id"])
return street_data["AddressStreet.uu_id"]
def create_addresses(address, requester):
address_response = requester.local_api.post(
endpoint="/address/create",
data=address,
)
print("text", address_response.text)
print("json", address_response.json())
response_address = address_response.json()
return response_address
def migrate_build(requester: BothAPIS):
street_uu_id = search_street("Reşat Nuri", requester=requester)
response_post_code = create_address(requester=requester, post_code=post_code_dict(uu_id_street=street_uu_id))
print("response_post_code", response_post_code)
created_address = create_addresses(
address=address_dict(post_code_uu_id=response_post_code["data"]["uu_id"]),
requester=requester
)
print('created_address', created_address)
created_address_uu_id = created_address["data"]["uu_id"]
for response_data in get_build_from_json():
print("response_data", response_data)
response_data["address_uu_id"] = created_address_uu_id
exit()
response = requester.local_api.post(**requester_dict_build(data=response_data))
print("response", response.text)
if response.ok:
build_uu_id = response.json()["data"]["uu_id"]
print("response build_uu_id", build_uu_id)
exit()
migrate_build_area(requester=requester, build_uu_id=build_uu_id)
migrate_build_part(requester=requester, build_uu_id=build_uu_id)
migrate_build_iban(requester=requester, build_uu_id=build_uu_id)
return
def grab_new_build_uu_id(requester: BothAPIS, build_uu_id: str):
response_wag = requester.wag_api.post(
endpoint="/building/build/list",
data={"page": 1, "size": 1, "query": {"uu_id": build_uu_id}},
)
build_uu_id = response_wag.json()["data"]["uu_id"]
if not build_uu_id:
raise Exception("Build UU ID not found")
return build_uu_id
def migrate_build_area(requester: BothAPIS, build_uu_id: str):
build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
response_datas = get_build_area_from_json()
for response_data in response_datas:
response_data["build_uu_id"] = build_uu_id
print("response_data", response_data)
response = requester.local_api.post(
**requester_dict_build_area(data=response_data)
)
print("response", response.text)
return
def migrate_build_part(requester: BothAPIS, build_uu_id: str):
build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
response_datas = get_build_part_from_json()
for response_data in response_datas:
response_data["build_uu_id"] = build_uu_id
response = requester.local_api.post(
**requester_dict_build_part(data=response_data)
)
print("response", response.text)
return
def migrate_build_iban(requester: BothAPIS, build_uu_id: str):
build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
response_datas = get_build_iban_from_json()
for response_data in response_datas:
response_data["build_uu_id"] = build_uu_id
response = requester.local_api.post(
**requester_dict_build_part(data=response_data)
)
print("response", response.text)
return
def migrate_build_living_space(requester: BothAPIS, build_uu_id: str):
build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
response_datas = get_build_iban_from_json()
for response_data in response_datas:
response_data["build_uu_id"] = build_uu_id
response = requester.local_api.post(
**requester_dict_build_living_space(data=response_data)
)
print("response", response.text)
return