149 lines
4.5 KiB
Python
149 lines
4.5 KiB
Python
import json
|
|
import arrow
|
|
from typing import (
|
|
Any,
|
|
Optional,
|
|
List,
|
|
Dict,
|
|
)
|
|
|
|
from ApiServiceRedis.Redis.conn import redis_cli
|
|
from ApiServiceRedis.Redis.Models.response import RedisResponse
|
|
|
|
|
|
class RedisRow:
|
|
|
|
key: str | bytes
|
|
value: Any
|
|
delimiter: str = "*"
|
|
expires_at: Optional[str] = None
|
|
|
|
@classmethod
|
|
def merge(cls, set_values: list[str | bytes]):
|
|
for key, set_value in enumerate(set_values):
|
|
set_value = str(set_value) if isinstance(set_value, bytes) else set_value
|
|
cls.key += f"{set_value}" if key == len(set_values) - 1 else f"{set_value}{cls.delimiter}"
|
|
cls.key = cls.key.encode()
|
|
|
|
@classmethod
|
|
def regex(cls, list_keys: list[str | bytes]):
|
|
search_regex = ""
|
|
for key, list_key in enumerate(list_keys):
|
|
list_key = list_key.decode() if isinstance(list_key, bytes) else str(list_key)
|
|
if key == 0 and list_key:
|
|
search_regex += f"{list_key}{cls.delimiter}*"
|
|
elif key == len(list_keys) - 1 and list_key:
|
|
search_regex += f"*{cls.delimiter}{list_key}"
|
|
else:
|
|
if list_key:
|
|
search_regex += f"*{cls.delimiter}{list_key}{cls.delimiter}*"
|
|
return search_regex
|
|
|
|
@classmethod
|
|
def parse(cls):
|
|
return cls.key.split(cls.delimiter) if cls.key else []
|
|
|
|
@classmethod
|
|
def feed(cls, value: Any):
|
|
cls.value = json.dumps(value)
|
|
|
|
@classmethod
|
|
def modify(cls, add_dict):
|
|
value = cls.data or {}
|
|
cls.feed({**value, **add_dict})
|
|
|
|
@classmethod
|
|
def remove(cls, key):
|
|
value = cls.data or {}
|
|
value.pop(key)
|
|
cls.feed(value)
|
|
|
|
@property
|
|
def keys(self):
|
|
return self.key if isinstance(self.key, str) else self.key.decode()
|
|
|
|
@property
|
|
def redis_key(self):
|
|
return self.key if isinstance(self.key, bytes) else self.key.encode()
|
|
|
|
@property
|
|
def data(self):
|
|
return json.loads(self.value)
|
|
|
|
@property
|
|
def as_dict(self):
|
|
return {
|
|
"keys": self.keys,
|
|
"value": self.data,
|
|
}
|
|
|
|
|
|
class RedisActions:
|
|
|
|
@classmethod
|
|
def get_expiry_time(cls, expiry_kwargs: dict) -> int:
|
|
expiry_time = 0
|
|
if "days" in expiry_kwargs:
|
|
expiry_time += int(expiry_kwargs["days"]) * 24 * 60 * 60
|
|
if "hours" in expiry_kwargs:
|
|
expiry_time += int(expiry_kwargs["hours"]) * 60 * 60
|
|
if "minutes" in expiry_kwargs:
|
|
expiry_time += int(expiry_kwargs["minutes"]) * 60
|
|
if "seconds" in expiry_kwargs:
|
|
expiry_time += int(expiry_kwargs["seconds"])
|
|
return expiry_time
|
|
|
|
@classmethod
|
|
def set_json(
|
|
cls, list_keys: list[str | bytes], value: Optional[dict | list], expires: Optional[dict] = None
|
|
):
|
|
redis_row = RedisRow()
|
|
redis_row.merge(set_values=list_keys)
|
|
redis_row.feed(value)
|
|
try:
|
|
if expires:
|
|
expiry_time = cls.get_expiry_time(expiry_kwargs=expires)
|
|
redis_cli.setex(
|
|
name=redis_row.redis_key,
|
|
time=expiry_time,
|
|
value=redis_row.value,
|
|
)
|
|
redis_row.expires_at = arrow.now().shift(seconds=expiry_time).format("YYYY-MM-DD HH:mm:ss")
|
|
else:
|
|
redis_cli.set(name=redis_row.redis_key, value=redis_row.value)
|
|
return RedisResponse(
|
|
status=True,
|
|
message="Value is set successfully.",
|
|
data=redis_row,
|
|
)
|
|
except Exception as e:
|
|
return RedisResponse(
|
|
status=False,
|
|
message="Value is not set successfully.",
|
|
error=str(e),
|
|
)
|
|
|
|
@classmethod
|
|
def get_json(cls, list_keys: list[str | bytes]):
|
|
try:
|
|
redis_row = RedisRow()
|
|
json_get = redis_cli.get(redis_row.regex(list_keys=list_keys))
|
|
redis_row.key = json_get
|
|
if not json_get:
|
|
return RedisResponse(
|
|
status=False,
|
|
message="Value is not get successfully.",
|
|
error="Value is not found in the redis.",
|
|
)
|
|
return RedisResponse(
|
|
status=True,
|
|
message="Value is get successfully.",
|
|
data=json.loads(json_get),
|
|
)
|
|
except Exception as e:
|
|
return RedisResponse(
|
|
status=False,
|
|
message="Value is not get successfully.",
|
|
error=str(e),
|
|
)
|