"""Small HTTP client helpers built on ``requests`` for calling the API."""

import requests

from .api_configs import BaseAPI

# Preset of flag values describing a system-created, confirmed, active and
# not-deleted record. It is not referenced by the code shown in this file.
active_and_confirmed = dict(
    created_by="System",
    confirmed_by="System",
    is_confirmed=True,
    active=True,
    deleted=False,
    is_notification_send=True,
)


class FilterObject:
    """Pagination, ordering and query options with defaults applied.

    Every falsy argument (``None``, ``0``, ``""``, an empty container) is
    replaced by its default: page ``1``, size ``10``, order field ``"id"``,
    order type ``"asc"``, no joins and an empty query.
    """

    def __init__(
        self,
        page=None,
        size=None,
        order_field=None,
        order_type=None,
        include_joins=None,
        query=None,
    ):
        self.page = page or 1
        self.size = size or 10
        self.order_field = order_field or "id"
        self.order_type = order_type or "asc"
        self.include_joins = include_joins or []
        self.query = query or {}

    def dump(self):
        """Return the options as a plain dict.

        ``include_joins`` and ``query`` are shallow-copied so that mutating
        the returned dict's containers does not alter this object.
        """
        return dict(
            page=self.page,
            size=self.size,
            order_field=self.order_field,
            order_type=self.order_type,
            include_joins=list(self.include_joins),
            query=dict(self.query),
        )


class RequestToApi:
    """Thin wrapper around ``requests`` holding a base URL and shared headers."""

    # NOTE: class-level dict, therefore shared by every instance. It is not
    # used inside this class; it is kept untouched because code elsewhere may
    # read or mutate it.
    selected_object = {}

    def __init__(self, base_url=None):
        """Store the base URL (default ``http://0.0.0.0:41575/``) and headers."""
        self.base_url = base_url or "http://0.0.0.0:41575/"
        self.headers = {
            "domain": "evyos.com.tr",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

    def overwrite_base_url(self, base_url):
        """Replace the base URL used for all subsequent requests."""
        self.base_url = base_url

    def set_access_token(self, access_token):
        """Put ``access_token`` into the header named by ``BaseAPI.ACCESS_TOKEN_TAG``."""
        self.headers[BaseAPI.ACCESS_TOKEN_TAG] = access_token

    def url(self, endpoint):
        """Return the absolute URL for ``endpoint``.

        The base URL and the endpoint are joined with exactly one ``/``
        regardless of whether the base URL ends with a slash or the endpoint
        starts with one. An empty endpoint returns the base URL unchanged.
        """
        if not endpoint:
            return self.base_url
        return self.base_url.rstrip("/") + "/" + endpoint.lstrip("/")

    def post(self, data, endpoint):
        """POST ``data`` as JSON to ``endpoint`` and return the response."""
        return requests.post(self.url(endpoint), headers=self.headers, json=data)

    def get(self, endpoint):
        """GET ``endpoint`` and return the response (the URL is printed first)."""
        print("url", self.url(endpoint))
        return requests.get(self.url(endpoint), headers=self.headers)

    def patch(self, data, endpoint):
        """PATCH ``endpoint`` with ``data`` as JSON and return the response."""
        return requests.patch(self.url(endpoint), headers=self.headers, json=data)

    def login_via_email_and_password(
        self, login_data, is_password_valid=True, selection_list: list = None
    ):
        """Log in, store the access token and select a company or occupant.

        Args:
            login_data: Mapping posted to ``authentication/login``. When
                ``is_password_valid`` is false it must also contain a
                ``password_token``.
            is_password_valid: When false, a password is first created via
                ``change_password_with_token`` before logging in.
            selection_list: Required for non-employee users. For occupants
                the first item is sent as ``build_part_uu_id`` and the second
                as ``occupant_uu_id``.

        Returns:
            For an ``employee``: the ``uu_id`` of the first entry of
            ``companies_list``. For an ``occupant``: the dict posted to
            ``authentication/select``. For any other user type: ``None``.

        Raises:
            Exception: If the password token or the access object is missing,
                if an employee has no companies, or if an occupant selection
                has fewer than two items.
            SystemExit: If the user is not an employee and ``selection_list``
                is empty; the available choices are printed first.
        """
        if not is_password_valid:
            password_token = login_data.get("password_token")
            if not password_token:
                raise Exception("Password token is not found")
            self.change_password_with_token(password_token=password_token)

        login_response = self.post(endpoint="authentication/login", data=login_data)
        print("login_dict", login_response.text)
        content = login_response.json()

        access_token = content.get("access_token")
        access_object = content.get("access_object")
        print("access_object", access_object)
        if not access_object:
            raise Exception("Access object is not found")

        user_type = access_object.get("user_type")
        companies_uu_id_list = None
        if user_type == "employee":
            companies_uu_id_list = access_object.get("companies_list")
        elif user_type == "occupant":
            companies_uu_id_list = access_object.get("available_occupants")

        # Todo Create Company and assign bulk to Duty2Company
        if not selection_list and user_type != "employee":
            print("Please select a company and an occupant")
            print(companies_uu_id_list)
            # Same effect as the ``exit()`` helper, but does not depend on the
            # ``site`` module having injected that name into builtins.
            raise SystemExit

        self.set_access_token(access_token)

        if user_type == "employee":
            if not companies_uu_id_list:
                raise Exception("Companies list is not found")
            # Employees are always bound to the first company returned.
            uu_id = companies_uu_id_list[0].get("uu_id")
            print("uu_id", uu_id)
            self.post(endpoint="authentication/select", data={"company_uu_id": uu_id})
            return uu_id

        if user_type == "occupant":
            if len(selection_list) < 2:
                raise Exception(
                    "Selection list must contain a build part and an occupant"
                )
            # The selection is sent as given; it is not checked against the
            # ``available_occupants`` returned by the login call.
            return_dict = {
                "build_part_uu_id": str(selection_list[0]),
                "occupant_uu_id": str(selection_list[1]),
            }
            self.post(endpoint="authentication/select", data=return_dict)
            return return_dict

        # Unknown user type: the token is stored but nothing is selected.
        return None

    def change_password_with_token(self, password_token, password="string"):
        """Create a password for ``password_token`` and print the response.

        Posts to ``authentication/create_password``. Does nothing when
        ``password_token`` is falsy.

        Args:
            password_token: Token identifying the password-creation request.
            password: Value sent as both ``password`` and ``re_password``.
                Defaults to ``"string"``.
        """
        if not password_token:
            return
        data_dict = {
            "password_token": password_token,
            "password": password,
            "re_password": password,
        }
        response = self.post(data=data_dict, endpoint="authentication/create_password")
        print("change_password_with_token", response.text)
        print("change_password_with_token", response.json())