wag-managment-api-service-v.../service_app_test/test_application/migrate_old_data/building.py

162 lines
5.5 KiB
Python

from service_app_test.api_configs import BothAPIS
from service_app_test.test_application.migrate_old_data.reader_and_alchemy_bulk_actions import (
read_json_file,
)
from api_validations.validations_request import (
InsertBuild,
InsertBuildArea,
InsertBuildParts,
InsertBuildLivingSpace,
)
def requester_dict_build(data):
    """Return the ``post`` keyword arguments for the build-create endpoint.

    Args:
        data: Payload stored unchanged under the ``"data"`` key.

    Returns:
        dict: ``{"endpoint": "/building/build/create", "data": data}``.
    """
    return dict(endpoint="/building/build/create", data=data)
def requester_dict_build_area(data):
    """Return the ``post`` keyword arguments for the area-create endpoint.

    Args:
        data: Payload stored unchanged under the ``"data"`` key.

    Returns:
        dict: ``{"endpoint": "/building/area/create", "data": data}``.
    """
    return dict(endpoint="/building/area/create", data=data)
def requester_dict_build_part(data):
    """Return the ``post`` keyword arguments for the part-create endpoint.

    Args:
        data: Payload stored unchanged under the ``"data"`` key.

    Returns:
        dict: ``{"endpoint": "/building/part/create", "data": data}``.
    """
    return dict(endpoint="/building/part/create", data=data)
def requester_dict_build_iban(data):
    """Return the ``post`` keyword arguments for the IBAN-create endpoint.

    Args:
        data: Payload stored unchanged under the ``"data"`` key.

    Returns:
        dict: ``{"endpoint": "/building/iban/create", "data": data}``.
    """
    return dict(endpoint="/building/iban/create", data=data)
def requester_dict_build_living_space(data):
    """Return the ``post`` keyword arguments for the living-space-create endpoint.

    Args:
        data: Payload stored unchanged under the ``"data"`` key.

    Returns:
        dict: ``{"endpoint": "/building/living_space/create", "data": data}``.
    """
    return dict(endpoint="/building/living_space/create", data=data)
def get_build_from_json():
    """Load the ``build`` rows from the JSON export as plain dicts.

    Reads the file through ``read_json_file(json_file="build")``, builds an
    ``InsertBuild`` from every entry under the ``"build"`` key and collects
    the ``model_dump()`` of each one, in the order the rows were read.

    Returns:
        list[dict]: One dumped ``InsertBuild`` per row.

    Raises:
        Exception: ``"No data found"`` when the ``"build"`` key is missing,
            is ``None`` or holds no rows.
    """
    # ``.get`` yields None for a missing key; fall back to an empty list so the
    # caller sees "No data found" rather than a TypeError from iterating None.
    rows = read_json_file(json_file="build").get("build") or []
    with_pydantic = [InsertBuild(**row).model_dump() for row in rows]
    if not with_pydantic:
        raise Exception("No data found")
    return with_pydantic
def get_build_area_from_json():
    """Load the ``build_area`` rows from the JSON export as plain dicts.

    Reads the file through ``read_json_file(json_file="build_area")``, builds
    an ``InsertBuildArea`` from every entry under the ``"build_area"`` key and
    collects the ``model_dump()`` of each one, in the order the rows were read.

    Returns:
        list[dict]: One dumped ``InsertBuildArea`` per row.

    Raises:
        Exception: ``"No data found"`` when the ``"build_area"`` key is
            missing, is ``None`` or holds no rows.
    """
    # ``.get`` yields None for a missing key; fall back to an empty list so the
    # caller sees "No data found" rather than a TypeError from iterating None.
    rows = read_json_file(json_file="build_area").get("build_area") or []
    with_pydantic = [InsertBuildArea(**row).model_dump() for row in rows]
    if not with_pydantic:
        raise Exception("No data found")
    return with_pydantic
def get_build_part_from_json():
    """Load the ``build_parts`` rows from the JSON export as plain dicts.

    Reads the file through ``read_json_file(json_file="build_parts")``, builds
    an ``InsertBuildParts`` from every entry under the ``"build_parts"`` key
    and collects the ``model_dump()`` of each one, in the order the rows were
    read.

    Returns:
        list[dict]: One dumped ``InsertBuildParts`` per row.

    Raises:
        Exception: ``"No data found"`` when the ``"build_parts"`` key is
            missing, is ``None`` or holds no rows.
    """
    # ``.get`` yields None for a missing key; fall back to an empty list so the
    # caller sees "No data found" rather than a TypeError from iterating None.
    rows = read_json_file(json_file="build_parts").get("build_parts") or []
    with_pydantic = [InsertBuildParts(**row).model_dump() for row in rows]
    if not with_pydantic:
        raise Exception("No data found")
    return with_pydantic
def get_build_iban_from_json():
    """Load the ``build_iban`` rows from the JSON export as plain dicts.

    Reads the file through ``read_json_file(json_file="build_iban")``, builds
    an ``InsertBuildParts`` from every entry under the ``"build_iban"`` key
    and collects the ``model_dump()`` of each one, in the order the rows were
    read.

    Returns:
        list[dict]: One dumped model per row.

    Raises:
        Exception: ``"No data found"`` when the ``"build_iban"`` key is
            missing, is ``None`` or holds no rows.
    """
    # ``.get`` yields None for a missing key; fall back to an empty list so the
    # caller sees "No data found" rather than a TypeError from iterating None.
    rows = read_json_file(json_file="build_iban").get("build_iban") or []
    # NOTE(review): IBAN rows are validated with ``InsertBuildParts``, the same
    # model used for build parts. This looks like a copy-paste leftover; confirm
    # whether a dedicated IBAN request model should be used here instead.
    with_pydantic = [InsertBuildParts(**row).model_dump() for row in rows]
    if not with_pydantic:
        raise Exception("No data found")
    return with_pydantic
def get_build_living_space_from_json():
    """Load the ``build_living_space`` rows from the JSON export as plain dicts.

    Reads the file through ``read_json_file(json_file="build_living_space")``,
    builds an ``InsertBuildLivingSpace`` from every entry under the
    ``"build_living_space"`` key and collects the ``model_dump()`` of each
    one, in the order the rows were read.

    Returns:
        list[dict]: One dumped ``InsertBuildLivingSpace`` per row.

    Raises:
        Exception: ``"No data found"`` when the ``"build_living_space"`` key
            is missing, is ``None`` or holds no rows.
    """
    # ``.get`` yields None for a missing key; fall back to an empty list so the
    # caller sees "No data found" rather than a TypeError from iterating None.
    source = read_json_file(json_file="build_living_space")
    rows = source.get("build_living_space") or []
    with_pydantic = [InsertBuildLivingSpace(**row).model_dump() for row in rows]
    if not with_pydantic:
        raise Exception("No data found")
    return with_pydantic
def migrate_build(requester: BothAPIS):
    """Post every build from the JSON export, then migrate its child records.

    Each row returned by ``get_build_from_json()`` is posted to the local API
    with the build-create request. When that response is ``ok``, the areas,
    parts and IBANs are migrated for the row's ``"build_uu_id"`` value.
    ``migrate_build_living_space`` is not called from this function.

    Args:
        requester: Client bundle; only ``requester.local_api.post`` is used
            here directly, the rest is handed on to the child migrations.

    Raises:
        Exception: Propagated from ``get_build_from_json()`` when no build
            rows are available.
    """
    for response_data in get_build_from_json():
        # Despite the name, ``response_data`` is the request payload row.
        print("response_data", response_data)
        response = requester.local_api.post(**requester_dict_build(data=response_data))
        # Children are only migrated for builds whose create call succeeded;
        # a failed create is skipped without any message.
        if response.ok:
            migrate_build_area(
                requester=requester, build_uu_id=response_data["build_uu_id"]
            )
            migrate_build_part(
                requester=requester, build_uu_id=response_data["build_uu_id"]
            )
            migrate_build_iban(
                requester=requester, build_uu_id=response_data["build_uu_id"]
            )
    return
def grab_new_build_uu_id(requester: BothAPIS, build_uu_id: str):
    """Query the WAG API for a build and return the ``uu_id`` it reports.

    Posts a one-item list query filtered by ``build_uu_id`` to
    ``/building/build/list`` and reads ``["data"]["uu_id"]`` from the JSON
    body of the reply.

    Args:
        requester: Client bundle; ``requester.wag_api.post`` performs the call.
        build_uu_id: Value sent as the ``uu_id`` filter of the query.

    Returns:
        The ``uu_id`` value found in the reply.

    Raises:
        Exception: ``"Build UU ID not found"`` when the returned value is falsy.
    """
    list_query = {"page": 1, "size": 1, "query": {"uu_id": build_uu_id}}
    listing = requester.wag_api.post(endpoint="/building/build/list", data=list_query)
    found_uu_id = listing.json()["data"]["uu_id"]
    if found_uu_id:
        return found_uu_id
    raise Exception("Build UU ID not found")
def migrate_build_area(requester: BothAPIS, build_uu_id: str):
    """Post every build-area row from the JSON export for one build.

    The given ``build_uu_id`` is first resolved through
    ``grab_new_build_uu_id``; the resolved value overwrites ``"build_uu_id"``
    on each row before the row is posted to the local API. The payload and
    the response text are printed for every row.

    Args:
        requester: Client bundle used for the lookup and the create calls.
        build_uu_id: Identifier passed to ``grab_new_build_uu_id``.
    """
    resolved_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
    for area_row in get_build_area_from_json():
        area_row["build_uu_id"] = resolved_uu_id
        print("response_data", area_row)
        request_kwargs = requester_dict_build_area(data=area_row)
        reply = requester.local_api.post(**request_kwargs)
        print("response", reply.text)
def migrate_build_part(requester: BothAPIS, build_uu_id: str):
    """Post every build-part row from the JSON export for one build.

    The given ``build_uu_id`` is first resolved through
    ``grab_new_build_uu_id``; the resolved value overwrites ``"build_uu_id"``
    on each row before the row is posted to the local API. The response text
    is printed for every row.

    Args:
        requester: Client bundle used for the lookup and the create calls.
        build_uu_id: Identifier passed to ``grab_new_build_uu_id``.
    """
    resolved_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
    for part_row in get_build_part_from_json():
        part_row["build_uu_id"] = resolved_uu_id
        request_kwargs = requester_dict_build_part(data=part_row)
        reply = requester.local_api.post(**request_kwargs)
        print("response", reply.text)
def migrate_build_iban(requester: BothAPIS, build_uu_id: str):
    """Post every build-IBAN row from the JSON export for one build.

    The given ``build_uu_id`` is first resolved through
    ``grab_new_build_uu_id``; the resolved value overwrites ``"build_uu_id"``
    on each row before the row is posted to the local API. The response text
    is printed for every row.

    Args:
        requester: Client bundle used for the lookup and the create calls.
        build_uu_id: Identifier passed to ``grab_new_build_uu_id``.
    """
    build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
    for response_data in get_build_iban_from_json():
        response_data["build_uu_id"] = build_uu_id
        # IBAN rows must go to the IBAN-create endpoint; the part-create
        # request builder was used here before, which sent them to the wrong URL.
        response = requester.local_api.post(
            **requester_dict_build_iban(data=response_data)
        )
        print("response", response.text)
    return
def migrate_build_living_space(requester: BothAPIS, build_uu_id: str):
    """Post every living-space row from the JSON export for one build.

    The given ``build_uu_id`` is first resolved through
    ``grab_new_build_uu_id``; the resolved value overwrites ``"build_uu_id"``
    on each row before the row is posted to the local API. The response text
    is printed for every row.

    Args:
        requester: Client bundle used for the lookup and the create calls.
        build_uu_id: Identifier passed to ``grab_new_build_uu_id``.
    """
    build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
    # Load the living-space rows; the IBAN loader was called here before, so
    # IBAN payloads were being posted to the living-space endpoint.
    for response_data in get_build_living_space_from_json():
        response_data["build_uu_id"] = build_uu_id
        response = requester.local_api.post(
            **requester_dict_build_living_space(data=response_data)
        )
        print("response", response.text)
    return