wag-managment-api-service-v.../api_services/redis/conn.py

32 lines
770 B
Python

from redis import Redis
# from configs import WagRedis
from api_configs import WagRedis
class RedisConn:
    """Hold a single ``Redis`` client configured from ``WagRedis`` settings.

    The client is stored on ``self.redis`` and can be swapped for one that
    targets a different server with ``set_connection()``.
    """

    def __init__(self) -> None:
        """Build the client from ``WagRedis`` and verify it with a ping.

        Raises:
            ConnectionError: If the ping result is falsy.
        """
        self.redis = Redis(
            host=WagRedis.REDIS_HOST,
            password=WagRedis.REDIS_PASSWORD,
            port=WagRedis.REDIS_PORT,
            db=WagRedis.REDIS_DB,
        )
        if not self.check_connection():
            # The builtin ConnectionError subclasses Exception, so handlers
            # written as ``except Exception`` continue to catch this failure.
            raise ConnectionError("Connection error")

    def check_connection(self) -> bool:
        """Ping the server through the current client.

        Returns:
            ``True`` when the ping result is truthy, ``False`` otherwise.

        Any exception raised by ``ping()`` itself propagates unchanged.
        NOTE(review): redis-py is expected to raise (rather than return a
        falsy value) when the server is unreachable -- confirm.
        """
        return bool(self.redis.ping())

    def set_connection(self, host, password, port, db) -> "Redis":
        """Replace the client with one built from the given parameters.

        The new client is not pinged here; call ``check_connection()``
        afterwards to verify it.

        Returns:
            The newly created client, which is also stored on ``self.redis``.
        """
        self.redis = Redis(host=host, password=password, port=port, db=db)
        return self.redis
# Create the shared connection once at import time. When construction fails the
# error is only printed, so ``redis_conn`` and ``redis_cli`` are left undefined.
try:
    redis_conn = RedisConn()
except Exception as exc:
    print("Redis Connection Error", exc)
else:
    # Expose the underlying client alongside the wrapper.
    redis_cli = redis_conn.redis