wag-managment-api-service-v.../ApiServiceRedis/Redis/Models/base.py

100 lines
3.2 KiB
Python

import json
from typing import Union, Dict, List, Optional, Any
class RedisRow:
"""Class for handling Redis key-value operations with structured data."""
key: Union[str, bytes]
value: Any
delimiter: str = ":"
expires_at: Optional[str] = None
@classmethod
def merge(cls, set_values: List[Union[str, bytes]]) -> None:
"""Merge list of values into a single delimited key."""
cls.key = ""
for key, set_value in enumerate(set_values):
set_value = (
set_value.decode() if isinstance(set_value, bytes) else str(set_value)
)
cls.key += (
f"{set_value}"
if key == len(set_values) - 1
else f"{set_value}{cls.delimiter}"
)
cls.key = cls.key.encode()
@classmethod
def regex(cls, list_keys: List[Union[str, bytes]]) -> str:
"""Generate Redis search pattern from list of keys."""
search_regex = ""
for key, list_key in enumerate(list_keys):
if not list_key:
continue
list_key = (
list_key.decode() if isinstance(list_key, bytes) else str(list_key)
)
if key == 0:
search_regex += f"{list_key}{cls.delimiter}*"
elif key == len(list_keys) - 1:
search_regex += f"*{cls.delimiter}{list_key}"
else:
search_regex += f"*{cls.delimiter}{list_key}{cls.delimiter}*"
return search_regex
@classmethod
def parse(cls) -> List[str]:
"""Parse the key into its component parts."""
return cls.key.split(cls.delimiter) if cls.key else []
@classmethod
def feed(cls, value: Union[bytes, Dict, List]) -> None:
"""Convert and store value in JSON format."""
if isinstance(value, (dict, list)):
cls.value = json.dumps(value)
else:
cls.value = json.dumps(json.loads(value.decode()))
@classmethod
def modify(cls, add_dict: Dict) -> None:
"""Modify existing data by merging with new dictionary."""
value = cls.data or {}
cls.feed({**value, **add_dict})
@classmethod
def remove(cls, key: str) -> None:
"""Remove a key from the stored dictionary."""
value = cls.data or {}
value.pop(key)
cls.feed(value)
@property
def keys(self) -> str:
"""Get key as string."""
return self.key.decode() if isinstance(self.key, bytes) else self.key
@classmethod
def set_key(cls, key: Union[str, bytes]) -> None:
"""Set key ensuring bytes format."""
cls.key = key if isinstance(key, bytes) else key.encode()
@property
def redis_key(self) -> bytes:
"""Get key in bytes format for Redis operations."""
return self.key if isinstance(self.key, bytes) else self.key.encode()
@property
def data(self) -> Union[Dict, List]:
"""Get stored value as Python object."""
return json.loads(self.value)
@property
def as_dict(self) -> Dict:
"""Get row data as dictionary."""
return {
"keys": self.keys,
"value": self.data,
}