152 lines
4.9 KiB
Python
152 lines
4.9 KiB
Python
import requests
|
|
|
|
from .api_configs import BaseAPI
|
|
|
|
|
|
active_and_confirmed = dict(
|
|
created_by="System",
|
|
confirmed_by="System",
|
|
is_confirmed=True,
|
|
active=True,
|
|
deleted=False,
|
|
is_notification_send=True,
|
|
)
|
|
|
|
|
|
class FilterObject:
|
|
|
|
def __init__(
|
|
self,
|
|
page=None,
|
|
size=None,
|
|
order_field=None,
|
|
order_type=None,
|
|
include_joins=None,
|
|
query=None,
|
|
):
|
|
self.page = page or 1
|
|
self.size = size or 10
|
|
self.order_field = order_field or "id"
|
|
self.order_type = order_type or "asc"
|
|
self.include_joins = include_joins or []
|
|
self.query = query or {}
|
|
|
|
def dump(self):
|
|
return dict(
|
|
page=self.page,
|
|
size=self.size,
|
|
order_field=self.order_field,
|
|
order_type=self.order_type,
|
|
include_joins=list(self.include_joins),
|
|
query=dict(self.query),
|
|
)
|
|
|
|
|
|
class RequestToApi:
|
|
selected_object = {}
|
|
|
|
def __init__(self, base_url=None):
|
|
self.base_url = base_url or "http://0.0.0.0:41575/"
|
|
self.headers = {
|
|
"domain": "evyos.com.tr",
|
|
"Content-Type": "application/json",
|
|
"Accept": "application/json",
|
|
}
|
|
|
|
def overwrite_base_url(self, base_url):
|
|
self.base_url = base_url
|
|
|
|
def set_access_token(self, access_token):
|
|
self.headers[BaseAPI.ACCESS_TOKEN_TAG] = access_token
|
|
|
|
def url(self, endpoint):
|
|
return self.base_url + endpoint
|
|
|
|
def post(self, data, endpoint):
|
|
response = requests.post(self.url(endpoint), headers=self.headers, json=data)
|
|
return response
|
|
|
|
def get(self, endpoint):
|
|
print("url", self.url(endpoint))
|
|
response = requests.get(self.url(endpoint), headers=self.headers)
|
|
return response
|
|
|
|
def patch(self, data, endpoint):
|
|
response = requests.patch(self.url(endpoint), headers=self.headers, json=data)
|
|
return response
|
|
|
|
def login_via_email_and_password(
|
|
self, login_data, is_password_valid=True, selection_list: list = None
|
|
):
|
|
|
|
if not is_password_valid:
|
|
if not login_data.get("password_token"):
|
|
raise Exception("Password token is not found")
|
|
self.change_password_with_token(
|
|
password_token=login_data.get("password_token")
|
|
)
|
|
|
|
login_dict = self.post(endpoint="authentication/login", data=login_data)
|
|
print("login_dict", login_dict.text)
|
|
login_dict = login_dict.json()
|
|
|
|
content = login_dict
|
|
access_token = content.get("access_token")
|
|
refresh_token = content.get("refresh_token")
|
|
access_object = content.get("access_object")
|
|
print("access_object", access_object)
|
|
if not access_object:
|
|
raise Exception("Access object is not found")
|
|
|
|
companies_uu_id_list = None
|
|
if access_object.get("user_type") == "employee":
|
|
companies_uu_id_list = access_object.get("companies_list")
|
|
elif access_object.get("user_type") == "occupant":
|
|
companies_uu_id_list = access_object.get("available_occupants")
|
|
|
|
# Todo Create Company and assign bulk to Duty2Company
|
|
if not selection_list and not access_object.get("user_type") == "employee":
|
|
print("Please select a company and an occupant")
|
|
print(companies_uu_id_list)
|
|
exit()
|
|
|
|
self.set_access_token(access_token)
|
|
if access_object.get("user_type") == "employee":
|
|
uu_id = companies_uu_id_list[0].get("uu_id")
|
|
print("uu_id", uu_id)
|
|
refresh_company = self.post(
|
|
endpoint="authentication/select", data={"company_uu_id": uu_id}
|
|
)
|
|
return uu_id
|
|
elif access_object.get("user_type") == "occupant":
|
|
# print('selection_list', companies_uu_id_list)
|
|
# print('selection_list', selection_list)
|
|
# occupants = companies_uu_id_list[str(selection_list[0])]
|
|
# occupant_uu_id = None
|
|
# for occupant in occupants:
|
|
# if occupant == str(selection_list[1]):
|
|
# occupant_uu_id = occupant
|
|
# break
|
|
|
|
return_dict = {
|
|
"build_part_uu_id": str(selection_list[0]),
|
|
"occupant_uu_id": str(selection_list[1]),
|
|
}
|
|
refresh_company = self.post(
|
|
endpoint="authentication/select", data=return_dict
|
|
)
|
|
return return_dict
|
|
|
|
def change_password_with_token(self, password_token):
|
|
data_dict = {
|
|
"password_token": password_token,
|
|
"password": "string",
|
|
"re_password": "string",
|
|
}
|
|
if password_token:
|
|
response = self.post(
|
|
data=data_dict, endpoint="authentication/create_password"
|
|
)
|
|
print("change_password_with_token", response.text)
|
|
print("change_password_with_token", response.json())
|