from typing import Optional, Union, Dict, Any

from fastapi.requests import Request
from fastapi import HTTPException, status

from api_services.redis.functions import RedisActions
from api_objects import OccupantTokenObject, EmployeeTokenObject
from api_configs import Auth
from databases import Users, UsersTokens
from api_library.date_time_actions.date_functions import system_arrow


class TokenService:
    """Static helpers for looking up, validating and clearing user tokens.

    Access-token data is read through ``RedisActions``; refresh tokens and
    password-reset tokens are read through the ``UsersTokens`` and ``Users``
    models.
    """

    @staticmethod
    def validate_token(
        request: Request,
    ) -> Union[OccupantTokenObject, EmployeeTokenObject]:
        """Return the token object that ``RedisActions`` resolves for *request*.

        Args:
            request: Incoming request, passed unchanged to
                ``RedisActions.get_object_via_access_key``.

        Returns:
            The object returned by ``RedisActions.get_object_via_access_key``.

        Raises:
            HTTPException: 401 with ``{"message": <error text>}`` when the
                lookup raises any exception.
        """
        try:
            return RedisActions.get_object_via_access_key(request)
        except Exception as e:
            # Chain the cause so the original traceback stays attached to the
            # 401 that is surfaced to the client.
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail={"message": str(e)}
            ) from e

    @staticmethod
    def get_user_tokens(user_id: str) -> Dict[str, Any]:
        """Return the token mapping stored for a user.

        Args:
            user_id: User UUID, forwarded to
                ``RedisActions.get_object_via_user_uu_id``.

        Returns:
            Whatever ``RedisActions.get_object_via_user_uu_id`` returns for
            this user.
        """
        return RedisActions.get_object_via_user_uu_id(user_id)

    @staticmethod
    def validate_refresh_token(
        domain: str, refresh_token: str
    ) -> Optional[UsersTokens]:
        """Look up a refresh token record for the given domain.

        Args:
            domain: Domain the token must belong to.
            refresh_token: Refresh token value to match.

        Returns:
            The ``.data`` of the ``UsersTokens.filter_by_one`` query, filtered
            by token, domain and ``UsersTokens.valid_record_dict``.
        """
        return UsersTokens.filter_by_one(
            token=refresh_token, domain=domain, **UsersTokens.valid_record_dict
        ).data

    @staticmethod
    def update_user_metadata(user: Users, request: Request) -> None:
        """Record the request's client details on *user* and save it.

        Sets ``last_agent``, ``last_platform``, ``last_remote_addr`` and
        ``last_seen`` from the request, then calls ``user.save()``.

        Args:
            user: User record to update.
            request: Request whose headers and client address are read.
        """
        headers = request.headers
        user.last_agent = headers.get("User-Agent")
        user.last_platform = headers.get("Origin")

        # Precedence is unchanged (``remote_addr`` attribute, then the
        # ``X-Forwarded-For`` header). The socket peer address is a last
        # resort so the field is not left empty when no proxy header is sent.
        client = getattr(request, "client", None)
        user.last_remote_addr = (
            getattr(request, "remote_addr", None)
            or headers.get("X-Forwarded-For")
            or getattr(client, "host", None)
        )
        user.last_seen = str(system_arrow.now())
        user.save()

    @staticmethod
    def clear_user_tokens(user_id: str, domain: Optional[str] = None) -> None:
        """Clear user tokens from Redis.

        Args:
            user_id: User UUID.
            domain: Optional domain; when given, only tokens whose ``"domain"``
                entry equals it are deleted. When ``None``, every token found
                for the user is deleted.
        """
        # Fall back to an empty mapping so a user without stored tokens is a
        # no-op instead of an AttributeError on ``None``.
        tokens = RedisActions.get_object_via_user_uu_id(user_id) or {}
        for key, token_data in tokens.items():
            if domain is None or token_data.get("domain") == domain:
                RedisActions.redis_cli.delete(key)

    @staticmethod
    def validate_password_token(token: str) -> Optional[Users]:
        """Validate a password reset token and return its user.

        Args:
            token: Password reset token to look up.

        Returns:
            The matching user when the token exists and has not expired,
            otherwise ``None``. An expired token is blanked on the user record
            (and saved) before ``None`` is returned.
        """
        # Expired tokens are reset to "" below, so an empty token must never
        # be used as a lookup key: it would match those users.
        if not token:
            return None

        user = Users.filter_one(Users.password_token == token).data
        if not user:
            return None

        expires_at = user.password_token_is_valid
        if not expires_at:
            # No expiry recorded: reject rather than handing ``str(None)`` to
            # the date parser.
            return None

        # Check if token is expired
        token_valid_until = system_arrow.get(str(expires_at))
        if system_arrow.now() > token_valid_until:
            user.password_token = ""
            user.save()
            return None
        return user