wag-managment-api-service-v.../service_app_test/bases.py

152 lines
4.9 KiB
Python

import requests
from .api_configs import BaseAPI
active_and_confirmed = dict(
created_by="System",
confirmed_by="System",
is_confirmed=True,
active=True,
deleted=False,
is_notification_send=True,
)
class FilterObject:
def __init__(
self,
page=None,
size=None,
order_field=None,
order_type=None,
include_joins=None,
query=None,
):
self.page = page or 1
self.size = size or 10
self.order_field = order_field or "id"
self.order_type = order_type or "asc"
self.include_joins = include_joins or []
self.query = query or {}
def dump(self):
return dict(
page=self.page,
size=self.size,
order_field=self.order_field,
order_type=self.order_type,
include_joins=list(self.include_joins),
query=dict(self.query),
)
class RequestToApi:
selected_object = {}
def __init__(self, base_url=None):
self.base_url = base_url or "http://0.0.0.0:41575/"
self.headers = {
"domain": "evyos.com.tr",
"Content-Type": "application/json",
"Accept": "application/json",
}
def overwrite_base_url(self, base_url):
self.base_url = base_url
def set_access_token(self, access_token):
self.headers[BaseAPI.ACCESS_TOKEN_TAG] = access_token
def url(self, endpoint):
return self.base_url + endpoint
def post(self, data, endpoint):
response = requests.post(self.url(endpoint), headers=self.headers, json=data)
return response
def get(self, endpoint):
print("url", self.url(endpoint))
response = requests.get(self.url(endpoint), headers=self.headers)
return response
def patch(self, data, endpoint):
response = requests.patch(self.url(endpoint), headers=self.headers, json=data)
return response
def login_via_email_and_password(
self, login_data, is_password_valid=True, selection_list: list = None
):
if not is_password_valid:
if not login_data.get("password_token"):
raise Exception("Password token is not found")
self.change_password_with_token(
password_token=login_data.get("password_token")
)
login_dict = self.post(endpoint="authentication/login", data=login_data)
print("login_dict", login_dict.text)
login_dict = login_dict.json()
content = login_dict
access_token = content.get("access_token")
refresh_token = content.get("refresh_token")
access_object = content.get("access_object")
print("access_object", access_object)
if not access_object:
raise Exception("Access object is not found")
companies_uu_id_list = None
if access_object.get("user_type") == "employee":
companies_uu_id_list = access_object.get("companies_list")
elif access_object.get("user_type") == "occupant":
companies_uu_id_list = access_object.get("available_occupants")
# Todo Create Company and assign bulk to Duty2Company
if not selection_list and not access_object.get("user_type") == "employee":
print("Please select a company and an occupant")
print(companies_uu_id_list)
exit()
self.set_access_token(access_token)
if access_object.get("user_type") == "employee":
uu_id = companies_uu_id_list[0].get("uu_id")
print("uu_id", uu_id)
refresh_company = self.post(
endpoint="authentication/select", data={"company_uu_id": uu_id}
)
return uu_id
elif access_object.get("user_type") == "occupant":
# print('selection_list', companies_uu_id_list)
# print('selection_list', selection_list)
# occupants = companies_uu_id_list[str(selection_list[0])]
# occupant_uu_id = None
# for occupant in occupants:
# if occupant == str(selection_list[1]):
# occupant_uu_id = occupant
# break
return_dict = {
"build_part_uu_id": str(selection_list[0]),
"occupant_uu_id": str(selection_list[1]),
}
refresh_company = self.post(
endpoint="authentication/select", data=return_dict
)
return return_dict
def change_password_with_token(self, password_token):
data_dict = {
"password_token": password_token,
"password": "string",
"re_password": "string",
}
if password_token:
response = self.post(
data=data_dict, endpoint="authentication/create_password"
)
print("change_password_with_token", response.text)
print("change_password_with_token", response.json())