login route checked

This commit is contained in:
berkay 2025-01-10 14:54:55 +03:00
parent 79aa3a1bc5
commit fa4260dbea
7 changed files with 114 additions and 18 deletions

View File

@ -2,7 +2,8 @@ import uvicorn
import routers
from fastapi.middleware.cors import CORSMiddleware
from fastapi.exceptions import HTTPException
from fastapi import Request, HTTPException, status
from fastapi.responses import JSONResponse
from middlewares.token_middleware import AuthHeaderMiddleware
from application.create_file import create_app
@ -23,8 +24,17 @@ app.add_middleware(
)
app.add_middleware(AuthHeaderMiddleware)
app.add_exception_handler(HTTPException, ErrorHandlers.exception_handler_http)
app.add_exception_handler(Exception, ErrorHandlers.exception_handler_exception)
# Initialize error handlers
error_handlers = ErrorHandlers.create(
requests=Request,
exceptions=HTTPException,
response_model=JSONResponse,
status=status
)
# Register error handlers with bound methods
app.add_exception_handler(HTTPException, error_handlers.exception_handler_http)
app.add_exception_handler(Exception, error_handlers.exception_handler_exception)
if __name__ == "__main__":
uvicorn_config = {

View File

@ -18,6 +18,10 @@ class ErrorHandlers:
)
self.status = status # from fastapi import status
@classmethod
def create(cls, requests, exceptions, response_model, status):
return cls(requests, exceptions, response_model, status)
def exception_handler_http(self, request, exc):
exc_detail = getattr(exc, "detail", None)
try:

View File

@ -211,20 +211,20 @@ class UserLoginModule(AuthModule):
@classmethod
def login_user_with_credentials(cls, data, request):
from api_services.redis.auth_actions.auth import save_access_token_to_redis
from api_services.redis.functions import RedisActions
from databases import (
Users,
People,
MongoQueryIdentity,
)
found_user = Users.check_user_exits(
found_user: Users = Users.check_user_exits(
access_key=data.access_key, domain=data.domain
)
access_token = found_user.generate_access_token()
query_engine = MongoQueryIdentity(company_uuid=found_user.related_company)
if found_user.check_password(password=data.password):
access_object_to_redis = save_access_token_to_redis(
access_object_to_redis = RedisActions.save_access_token_to_redis(
request=request,
found_user=found_user,
domain=data.domain,
@ -238,10 +238,10 @@ class UserLoginModule(AuthModule):
headers_request["evyos-user-agent"] = headers_request.get("user-agent")
headers_request["evyos-platform"] = headers_request.get("user-agent")
headers_request["evyos-ip-ext"] = "94.54.68.158"
found_user.last_agent = headers_request.get("evyos-user-agent", None)
found_user.last_platform = headers_request.get("evyos-platform", None)
found_user.last_remote_addr = headers_request.get("evyos-ip-ext", None)
found_user.last_seen = str(system_arrow.now())
# found_user.last_agent = headers_request.get("evyos-user-agent", None)
# found_user.last_platform = headers_request.get("evyos-platform", None)
# found_user.last_remote_addr = headers_request.get("evyos-ip-ext", None)
# found_user.last_seen = str(system_arrow.now())
if ext_ip := headers_request.get("evyos-ip-ext"):
agent = headers_request.get("evyos-user-agent", "")
platform = headers_request.get("evyos-platform", "")

View File

@ -73,8 +73,8 @@ class MongoQueryIdentity:
def get_domain_via_user(self, user_uu_id):
self.use_collection("Domain")
result = self.mongo_engine.find(match=user_uu_id, field="user_uu_id")
return [validate_timestamp(doc) for doc in result] if result else None
result = self.mongo_engine.filter_by(payload={"user_uu_id": user_uu_id})
return [validate_timestamp(doc) for doc in result.data] if result else None
def refresh_password_history_via_user(self, payload: PasswordHistoryViaUser):
self.use_collection("PasswordHistory")
@ -118,8 +118,8 @@ class MongoQueryIdentity:
def get_password_history_via_user(self, user_uu_id):
self.use_collection("UserPasswordHistory")
result = self.mongo_engine.find(match=user_uu_id, field="user_uu_id")
return [validate_timestamp(doc) for doc in result] if result else None
result = self.mongo_engine.filter_by(payload={"user_uu_id": user_uu_id})
return [validate_timestamp(doc) for doc in result.data] if result else None
def update_access_history_via_user(self, payload: AccessHistoryViaUser):
self.use_collection("AccessHistory")
@ -149,5 +149,5 @@ class MongoQueryIdentity:
def get_access_history_via_user(self, user_uu_id):
self.use_collection("AccessHistory")
result = self.mongo_engine.find(match=user_uu_id, field="user_uu_id")
return [validate_timestamp(doc) for doc in result] if result else None
result = self.mongo_engine.filter_by(payload={"user_uu_id": user_uu_id})
return [validate_timestamp(doc) for doc in result.data] if result else None

View File

@ -196,7 +196,7 @@ class MongoQuery:
status=False,
message="The document has not been found",
message_code="NOT_FOUND",
data=None,
data={},
mongo=None,
)
@ -213,7 +213,7 @@ class MongoQuery:
status=False,
message="The documents have not been found",
message_code="NOT_FOUND",
data=None,
data=[],
mongo=return_,
count=0,
)

View File

@ -0,0 +1,8 @@
{
"test_credentials": {
"domain": "evyos.com.tr",
"access_key": "karatay.berkay.sup@evyos.com.tr",
"password": "string",
"remember_me": false
}
}

View File

@ -0,0 +1,74 @@
import pytest
import requests
import json
import os
BASE_URL = "http://localhost:1111"
LOGIN_ENDPOINT = f"{BASE_URL}/authentication/login"
# Load test data
current_dir = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(current_dir, 'test_data.json'), 'r') as f:
TEST_DATA = json.load(f)
@pytest.fixture
def test_credentials():
return TEST_DATA['test_credentials']
@pytest.fixture
def headers():
return {
"Content-Type": "application/json",
"Accept": "application/json"
}
class TestLogin:
def test_successful_login(self, test_credentials, headers):
"""Test successful login with provided credentials"""
response = requests.post(LOGIN_ENDPOINT, json=test_credentials, headers=headers)
assert response.status_code == 200, f"Login failed with status {response.status_code}. Response: {response.text}"
data = response.json()
assert "token" in data, f"Token not found in response. Response: {data}"
assert data.get("status") == "success", f"Status is not success. Response: {data}"
def test_invalid_credentials(self, headers):
"""Test login with invalid credentials"""
invalid_credentials = {
"domain": "evyos.com.tr",
"access_key": "invalid@evyos.com.tr",
"password": "wrongpassword",
"remember_me": False
}
response = requests.post(LOGIN_ENDPOINT, json=invalid_credentials, headers=headers)
assert response.status_code in [401, 403], f"Expected 401 or 403, got {response.status_code}. Response: {response.text}"
def test_missing_fields(self, headers):
"""Test login with missing required fields"""
incomplete_credentials = {
"domain": "evyos.com.tr",
"access_key": "test@evyos.com.tr"
# missing password
}
response = requests.post(LOGIN_ENDPOINT, json=incomplete_credentials, headers=headers)
assert response.status_code == 422, f"Expected 422, got {response.status_code}. Response: {response.text}"
def test_invalid_domain(self, headers):
"""Test login with invalid domain"""
invalid_domain_credentials = {
"domain": "invalid-domain.com",
"access_key": "test@evyos.com.tr",
"password": "string",
"remember_me": False
}
response = requests.post(LOGIN_ENDPOINT, json=invalid_domain_credentials, headers=headers)
assert response.status_code in [400, 401], f"Expected 400 or 401, got {response.status_code}. Response: {response.text}"
def test_malformed_json(self, headers):
"""Test login with malformed JSON"""
response = requests.post(LOGIN_ENDPOINT, data="invalid json", headers=headers)
assert response.status_code == 422, f"Expected 422, got {response.status_code}. Response: {response.text}"
def test_empty_request(self, headers):
"""Test login with empty request body"""
response = requests.post(LOGIN_ENDPOINT, json={}, headers=headers)
assert response.status_code == 422, f"Expected 422, got {response.status_code}. Response: {response.text}"