wag-managment-api-service-v.../Services/Redis/Models/response.py

54 lines
1.4 KiB
Python

from typing import Union, Dict, List, Optional, Any
from Services.Redis.Models.base import RedisRow
class RedisResponse:
"""Base class for Redis response handling."""
def __init__(
self,
status: bool,
message: str,
data: Any = None,
error: Optional[str] = None,
):
self.status = status
self.message = message
self.data = data
if isinstance(data, dict):
self.data_type = "dict"
elif isinstance(data, list):
self.data_type = "list"
elif data is None:
self.data_type = None
self.error = error
def as_dict(self) -> Dict:
data = self.all
main_dict = {
"status": self.status,
"message": self.message,
"count": self.count,
"dataType": self.data_type,
}
if isinstance(data, RedisRow):
return {"data": {data.keys: data.data}, **main_dict}
elif isinstance(data, list):
return {"data": {row.keys: row.data for row in data}, **main_dict}
@property
def all(self) -> Union[Optional[List[RedisRow]]]:
return self.data or []
@property
def count(self) -> int:
return len(self.all)
@property
def first(self) -> Union[RedisRow, None]:
if self.data:
return self.data[0]
self.status = False
return