updated Redis

This commit is contained in:
2025-01-13 19:12:22 +03:00
parent 6d77c34252
commit 8b263a3a5c
10 changed files with 20 additions and 8 deletions

View File

@@ -0,0 +1,99 @@
import json
from typing import Union, Dict, List, Optional, Any
class RedisRow:
"""Class for handling Redis key-value operations with structured data."""
key: Union[str, bytes]
value: Any
delimiter: str = ":"
expires_at: Optional[str] = None
@classmethod
def merge(cls, set_values: List[Union[str, bytes]]) -> None:
"""Merge list of values into a single delimited key."""
cls.key = ""
for key, set_value in enumerate(set_values):
set_value = (
set_value.decode() if isinstance(set_value, bytes) else str(set_value)
)
cls.key += (
f"{set_value}"
if key == len(set_values) - 1
else f"{set_value}{cls.delimiter}"
)
cls.key = cls.key.encode()
@classmethod
def regex(cls, list_keys: List[Union[str, bytes]]) -> str:
"""Generate Redis search pattern from list of keys."""
search_regex = ""
for key, list_key in enumerate(list_keys):
if not list_key:
continue
list_key = (
list_key.decode() if isinstance(list_key, bytes) else str(list_key)
)
if key == 0:
search_regex += f"{list_key}{cls.delimiter}*"
elif key == len(list_keys) - 1:
search_regex += f"*{cls.delimiter}{list_key}"
else:
search_regex += f"*{cls.delimiter}{list_key}{cls.delimiter}*"
return search_regex
@classmethod
def parse(cls) -> List[str]:
"""Parse the key into its component parts."""
return cls.key.split(cls.delimiter) if cls.key else []
@classmethod
def feed(cls, value: Union[bytes, Dict, List]) -> None:
"""Convert and store value in JSON format."""
if isinstance(value, (dict, list)):
cls.value = json.dumps(value)
else:
cls.value = json.dumps(json.loads(value.decode()))
@classmethod
def modify(cls, add_dict: Dict) -> None:
"""Modify existing data by merging with new dictionary."""
value = cls.data or {}
cls.feed({**value, **add_dict})
@classmethod
def remove(cls, key: str) -> None:
"""Remove a key from the stored dictionary."""
value = cls.data or {}
value.pop(key)
cls.feed(value)
@property
def keys(self) -> str:
"""Get key as string."""
return self.key.decode() if isinstance(self.key, bytes) else self.key
@classmethod
def set_key(cls, key: Union[str, bytes]) -> None:
"""Set key ensuring bytes format."""
cls.key = key if isinstance(key, bytes) else key.encode()
@property
def redis_key(self) -> bytes:
"""Get key in bytes format for Redis operations."""
return self.key if isinstance(self.key, bytes) else self.key.encode()
@property
def data(self) -> Union[Dict, List]:
"""Get stored value as Python object."""
return json.loads(self.value)
@property
def as_dict(self) -> Dict:
"""Get row data as dictionary."""
return {
"keys": self.keys,
"value": self.data,
}

View File

@@ -0,0 +1,44 @@
from typing import Union, Dict, List, Optional, Any
from Services.Redis.Models.base import RedisRow
class RedisResponse:
"""Base class for Redis response handling."""
def __init__(
self,
status: bool,
message: str,
data: Any = None,
error: Optional[str] = None,
):
self.status = status
self.message = message
self.data = data
if isinstance(data, dict):
self.data_type = "dict"
elif isinstance(data, list):
self.data_type = "list"
elif data is None:
self.data_type = None
self.error = error
def as_dict(self) -> Dict:
return {
"status": self.status,
"message": self.message,
"data": self.data,
"dataType": self.data_type,
"error": self.error,
}
@property
def all(self) -> Union[Optional[List[RedisRow]]]:
return self.data
@property
def first(self) -> Union[RedisRow, None]:
if self.data:
return self.data[0]
return None

View File

@@ -0,0 +1,25 @@
from pydantic import BaseModel
class AccessToken(BaseModel):
accessToken: str
userUUID: str
def to_list(self):
return [self.accessToken, self.userUUID]
@property
def count(self):
return 2
@property
def delimiter(self):
return "*"
# access_token_obj = AccessToken.from_dict({
# "accessToken": "token",
# "userUUID": "uuid"
# })
# access_token_obj.to_list()