162 lines
5.5 KiB
Python
162 lines
5.5 KiB
Python
from service_app_test.api_configs import BothAPIS
|
|
from service_app_test.test_application.migrate_old_data.reader_and_alchemy_bulk_actions import (
|
|
read_json_file,
|
|
)
|
|
from api_validations.validations_request import (
|
|
InsertBuild,
|
|
InsertBuildArea,
|
|
InsertBuildParts,
|
|
InsertBuildLivingSpace,
|
|
)
|
|
|
|
|
|
requester_dict_build = lambda data: {"endpoint": "/building/build/create", "data": data}
|
|
requester_dict_build_area = lambda data: {
|
|
"endpoint": "/building/area/create",
|
|
"data": data,
|
|
}
|
|
requester_dict_build_part = lambda data: {
|
|
"endpoint": "/building/part/create",
|
|
"data": data,
|
|
}
|
|
requester_dict_build_iban = lambda data: {
|
|
"endpoint": "/building/iban/create",
|
|
"data": data,
|
|
}
|
|
requester_dict_build_living_space = lambda data: {
|
|
"endpoint": "/building/living_space/create",
|
|
"data": data,
|
|
}
|
|
|
|
|
|
def get_build_from_json():
|
|
read_files_json, with_pydantic = read_json_file(json_file="build"), []
|
|
read_files = read_files_json.get("build")
|
|
for row in read_files:
|
|
pydantic_row = InsertBuild(**row)
|
|
with_pydantic.append(pydantic_row.model_dump())
|
|
if not with_pydantic:
|
|
raise Exception("No data found")
|
|
return with_pydantic
|
|
|
|
|
|
def get_build_area_from_json():
|
|
read_files_json, with_pydantic = read_json_file(json_file="build_area"), []
|
|
read_files = read_files_json.get("build_area")
|
|
for row in read_files:
|
|
pydantic_row = InsertBuildArea(**row)
|
|
with_pydantic.append(pydantic_row.model_dump())
|
|
if not with_pydantic:
|
|
raise Exception("No data found")
|
|
return with_pydantic
|
|
|
|
|
|
def get_build_part_from_json():
|
|
read_files_json, with_pydantic = read_json_file(json_file="build_parts"), []
|
|
read_files = read_files_json.get("build_parts")
|
|
for row in read_files:
|
|
pydantic_row = InsertBuildParts(**row)
|
|
with_pydantic.append(pydantic_row.model_dump())
|
|
if not with_pydantic:
|
|
raise Exception("No data found")
|
|
return with_pydantic
|
|
|
|
|
|
def get_build_iban_from_json():
|
|
read_files_json, with_pydantic = read_json_file(json_file="build_iban"), []
|
|
read_files = read_files_json.get("build_iban")
|
|
for row in read_files:
|
|
pydantic_row = InsertBuildParts(**row)
|
|
with_pydantic.append(pydantic_row.model_dump())
|
|
if not with_pydantic:
|
|
raise Exception("No data found")
|
|
return with_pydantic
|
|
|
|
|
|
def get_build_living_space_from_json():
|
|
read_files_json, with_pydantic = read_json_file(json_file="build_living_space"), []
|
|
read_files = read_files_json.get("build_living_space")
|
|
for row in read_files:
|
|
pydantic_row = InsertBuildLivingSpace(**row)
|
|
with_pydantic.append(pydantic_row.model_dump())
|
|
if not with_pydantic:
|
|
raise Exception("No data found")
|
|
return with_pydantic
|
|
|
|
|
|
def migrate_build(requester: BothAPIS):
|
|
for response_data in get_build_from_json():
|
|
print("response_data", response_data)
|
|
response = requester.local_api.post(**requester_dict_build(data=response_data))
|
|
if response.ok:
|
|
migrate_build_area(
|
|
requester=requester, build_uu_id=response_data["build_uu_id"]
|
|
)
|
|
migrate_build_part(
|
|
requester=requester, build_uu_id=response_data["build_uu_id"]
|
|
)
|
|
migrate_build_iban(
|
|
requester=requester, build_uu_id=response_data["build_uu_id"]
|
|
)
|
|
return
|
|
|
|
|
|
def grab_new_build_uu_id(requester: BothAPIS, build_uu_id: str):
|
|
response_wag = requester.wag_api.post(
|
|
endpoint="/building/build/list",
|
|
data={"page": 1, "size": 1, "query": {"uu_id": build_uu_id}},
|
|
)
|
|
build_uu_id = response_wag.json()["data"]["uu_id"]
|
|
if not build_uu_id:
|
|
raise Exception("Build UU ID not found")
|
|
return build_uu_id
|
|
|
|
|
|
def migrate_build_area(requester: BothAPIS, build_uu_id: str):
|
|
build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
|
|
response_datas = get_build_area_from_json()
|
|
for response_data in response_datas:
|
|
response_data["build_uu_id"] = build_uu_id
|
|
print("response_data", response_data)
|
|
response = requester.local_api.post(
|
|
**requester_dict_build_area(data=response_data)
|
|
)
|
|
print("response", response.text)
|
|
return
|
|
|
|
|
|
def migrate_build_part(requester: BothAPIS, build_uu_id: str):
|
|
build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
|
|
response_datas = get_build_part_from_json()
|
|
for response_data in response_datas:
|
|
response_data["build_uu_id"] = build_uu_id
|
|
response = requester.local_api.post(
|
|
**requester_dict_build_part(data=response_data)
|
|
)
|
|
print("response", response.text)
|
|
return
|
|
|
|
|
|
def migrate_build_iban(requester: BothAPIS, build_uu_id: str):
|
|
build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
|
|
response_datas = get_build_iban_from_json()
|
|
for response_data in response_datas:
|
|
response_data["build_uu_id"] = build_uu_id
|
|
response = requester.local_api.post(
|
|
**requester_dict_build_part(data=response_data)
|
|
)
|
|
print("response", response.text)
|
|
return
|
|
|
|
|
|
def migrate_build_living_space(requester: BothAPIS, build_uu_id: str):
|
|
build_uu_id = grab_new_build_uu_id(requester=requester, build_uu_id=build_uu_id)
|
|
response_datas = get_build_iban_from_json()
|
|
for response_data in response_datas:
|
|
response_data["build_uu_id"] = build_uu_id
|
|
response = requester.local_api.post(
|
|
**requester_dict_build_living_space(data=response_data)
|
|
)
|
|
print("response", response.text)
|
|
return
|