77 lines
2.8 KiB
Python
77 lines
2.8 KiB
Python
from typing import Optional, Union, Dict, Any
|
|
from fastapi.requests import Request
|
|
from fastapi import HTTPException, status
|
|
|
|
from api_services.redis.functions import RedisActions
|
|
from api_objects import OccupantTokenObject, EmployeeTokenObject
|
|
from api_configs import Auth
|
|
from databases import Users, UsersTokens
|
|
from api_library.date_time_actions.date_functions import system_arrow
|
|
|
|
|
|
class TokenService:
|
|
@staticmethod
|
|
def validate_token(
|
|
request: Request,
|
|
) -> Union[OccupantTokenObject, EmployeeTokenObject]:
|
|
"""Validate and return token object from request"""
|
|
try:
|
|
return RedisActions.get_object_via_access_key(request)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail={"message": str(e)}
|
|
)
|
|
|
|
@staticmethod
|
|
def get_user_tokens(user_id: str) -> Dict[str, Any]:
|
|
"""Get all valid tokens for a user"""
|
|
return RedisActions.get_object_via_user_uu_id(user_id)
|
|
|
|
@staticmethod
|
|
def validate_refresh_token(
|
|
domain: str, refresh_token: str
|
|
) -> Optional[UsersTokens]:
|
|
"""Validate refresh token and return token object"""
|
|
return UsersTokens.filter_by_one(
|
|
token=refresh_token, domain=domain, **UsersTokens.valid_record_dict
|
|
).data
|
|
|
|
@staticmethod
|
|
def update_user_metadata(user: Users, request: Request):
|
|
"""Update user metadata from request"""
|
|
user.last_agent = request.headers.get("User-Agent")
|
|
user.last_platform = request.headers.get("Origin")
|
|
user.last_remote_addr = getattr(
|
|
request, "remote_addr", None
|
|
) or request.headers.get("X-Forwarded-For")
|
|
user.last_seen = str(system_arrow.now())
|
|
user.save()
|
|
|
|
@staticmethod
|
|
def clear_user_tokens(user_id: str, domain: Optional[str] = None):
|
|
"""Clear user tokens from Redis
|
|
Args:
|
|
user_id: User UUID
|
|
domain: Optional domain to clear tokens for specific domain only
|
|
"""
|
|
tokens = RedisActions.get_object_via_user_uu_id(user_id)
|
|
for key, token_data in tokens.items():
|
|
if domain is None or token_data.get("domain") == domain:
|
|
RedisActions.redis_cli.delete(key)
|
|
|
|
@staticmethod
|
|
def validate_password_token(token: str) -> Optional[Users]:
|
|
"""Validate password reset token and return user"""
|
|
user = Users.filter_one(Users.password_token == token).data
|
|
if not user:
|
|
return None
|
|
|
|
# Check if token is expired
|
|
token_valid_until = system_arrow.get(str(user.password_token_is_valid))
|
|
if system_arrow.now() > token_valid_until:
|
|
user.password_token = ""
|
|
user.save()
|
|
return None
|
|
|
|
return user
|