# wag-managment-api-service-v.../api_services/token_service.py
#
# 77 lines
# 2.8 KiB
# Python

from typing import Optional, Union, Dict, Any
from fastapi.requests import Request
from fastapi import HTTPException, status
from api_services.redis.functions import RedisActions
from api_objects import OccupantTokenObject, EmployeeTokenObject
from api_configs import Auth
from databases import Users, UsersTokens
from api_library.date_time_actions.date_functions import system_arrow
class TokenService:
    """Static helpers for access tokens, refresh tokens and password-reset tokens.

    Every method is a ``@staticmethod``; the class is used purely as a namespace.
    """

    @staticmethod
    def validate_token(
        request: Request,
    ) -> Union[OccupantTokenObject, EmployeeTokenObject]:
        """Return the token object associated with ``request``.

        The lookup is delegated to ``RedisActions.get_object_via_access_key``.

        Args:
            request: Incoming request to resolve a token object for.

        Returns:
            Whatever ``RedisActions.get_object_via_access_key`` returns.

        Raises:
            HTTPException: 401 with ``{"message": str(error)}`` as detail when
                the lookup raises any exception.
        """
        try:
            return RedisActions.get_object_via_access_key(request)
        except Exception as e:
            # Chain the original error so the root cause stays visible in
            # tracebacks and logs instead of being replaced by the 401.
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail={"message": str(e)}
            ) from e

    @staticmethod
    def get_user_tokens(user_id: str) -> Dict[str, Any]:
        """Return the result of ``RedisActions.get_object_via_user_uu_id(user_id)``.

        Args:
            user_id: Identifier passed through to the Redis lookup.
        """
        return RedisActions.get_object_via_user_uu_id(user_id)

    @staticmethod
    def validate_refresh_token(
        domain: str, refresh_token: str
    ) -> Optional[UsersTokens]:
        """Look up the ``UsersTokens`` record for a refresh token.

        The query filters on ``token`` and ``domain`` together with the
        filters in ``UsersTokens.valid_record_dict``.

        Args:
            domain: Domain the refresh token must belong to.
            refresh_token: Refresh token value to look up.

        Returns:
            The ``.data`` attribute of the query result.
            NOTE(review): presumably ``None`` when nothing matches - confirm
            against ``filter_by_one``.
        """
        return UsersTokens.filter_by_one(
            token=refresh_token, domain=domain, **UsersTokens.valid_record_dict
        ).data

    @staticmethod
    def update_user_metadata(user: Users, request: Request):
        """Record request metadata on ``user`` and save it.

        Sets ``last_agent`` (``User-Agent`` header), ``last_platform``
        (``Origin`` header), ``last_remote_addr`` and ``last_seen`` (current
        ``system_arrow`` time as a string), then calls ``user.save()``.

        Args:
            user: User record to update.
            request: Request the metadata is read from.
        """
        headers = request.headers
        user.last_agent = headers.get("User-Agent")
        user.last_platform = headers.get("Origin")

        # Address resolution order: a ``remote_addr`` attribute when the
        # request object exposes one, then the ``X-Forwarded-For`` header,
        # then the peer address from ``request.client`` so the value is not
        # left empty when neither of the first two is available.
        client = getattr(request, "client", None)
        user.last_remote_addr = (
            getattr(request, "remote_addr", None)
            or headers.get("X-Forwarded-For")
            or getattr(client, "host", None)
        )
        user.last_seen = str(system_arrow.now())
        user.save()

    @staticmethod
    def clear_user_tokens(user_id: str, domain: Optional[str] = None):
        """Delete a user's token entries from Redis.

        Args:
            user_id: Identifier passed to ``RedisActions.get_object_via_user_uu_id``.
            domain: When given, only entries whose ``"domain"`` value equals it
                are deleted; when ``None``, every returned entry is deleted.
        """
        # Treat a missing lookup result as "nothing to clear" rather than
        # failing on ``None.items()``.
        tokens = RedisActions.get_object_via_user_uu_id(user_id) or {}
        for key, token_data in tokens.items():
            if domain is None or token_data.get("domain") == domain:
                RedisActions.redis_cli.delete(key)

    @staticmethod
    def validate_password_token(token: str) -> Optional[Users]:
        """Return the user owning a still-valid password token.

        Args:
            token: Password token to validate.

        Returns:
            The matching user, or ``None`` when the token is empty, no user
            matches, the user has no expiry value, or the token has expired.
            An expired token is cleared on the user record before returning.
        """
        # Expired tokens are reset to "" below, so an empty token must never
        # be looked up: it would match users whose token was already cleared.
        if not token:
            return None

        user = Users.filter_one(Users.password_token == token).data
        if not user:
            return None

        # Without an expiry value there is nothing to parse; str(None) would
        # otherwise be handed to the date parser.
        valid_until_raw = user.password_token_is_valid
        if not valid_until_raw:
            return None

        token_valid_until = system_arrow.get(str(valid_until_raw))
        if system_arrow.now() > token_valid_until:
            # Expired: invalidate the stored token so it cannot be reused.
            user.password_token = ""
            user.save()
            return None
        return user