redis pub/sub added
This commit is contained in:
252
Controllers/Redis/Broadcast/actions.py
Normal file
252
Controllers/Redis/Broadcast/actions.py
Normal file
@@ -0,0 +1,252 @@
|
||||
import json
|
||||
from typing import Optional, Dict, Any, List, Callable, Union
|
||||
from threading import Thread
|
||||
|
||||
from Controllers.Redis.connection import redis_cli
|
||||
from Controllers.Redis.response import RedisResponse
|
||||
|
||||
|
||||
class RedisPublisher:
|
||||
"""Redis Publisher class for broadcasting messages to channels."""
|
||||
|
||||
def __init__(self, redis_client=redis_cli):
|
||||
self.redis_client = redis_client
|
||||
|
||||
def publish(self, channel: str, message: Union[Dict, List, str]) -> RedisResponse:
|
||||
"""Publish a message to a Redis channel.
|
||||
|
||||
Args:
|
||||
channel: The channel to publish to
|
||||
message: The message to publish (will be JSON serialized if dict or list)
|
||||
|
||||
Returns:
|
||||
RedisResponse with status and message
|
||||
"""
|
||||
try:
|
||||
# Convert dict/list to JSON string if needed
|
||||
if isinstance(message, (dict, list)):
|
||||
message = json.dumps(message)
|
||||
|
||||
# Publish the message
|
||||
recipient_count = self.redis_client.publish(channel, message)
|
||||
|
||||
return RedisResponse(
|
||||
status=True,
|
||||
message=f"Message published successfully to {channel}.",
|
||||
data={"recipients": recipient_count}
|
||||
)
|
||||
except Exception as e:
|
||||
return RedisResponse(
|
||||
status=False,
|
||||
message=f"Failed to publish message to {channel}.",
|
||||
error=str(e)
|
||||
)
|
||||
|
||||
|
||||
class RedisSubscriber:
|
||||
"""Redis Subscriber class for listening to channels."""
|
||||
|
||||
def __init__(self, redis_client=redis_cli):
|
||||
self.redis_client = redis_client
|
||||
self.pubsub = self.redis_client.pubsub()
|
||||
self.active_threads = {}
|
||||
|
||||
def subscribe(self, channel: str, callback: Callable[[Dict], None]) -> RedisResponse:
|
||||
"""Subscribe to a Redis channel with a callback function.
|
||||
|
||||
Args:
|
||||
channel: The channel to subscribe to
|
||||
callback: Function to call when a message is received
|
||||
|
||||
Returns:
|
||||
RedisResponse with status and message
|
||||
"""
|
||||
try:
|
||||
# Subscribe to the channel
|
||||
self.pubsub.subscribe(**{channel: self._message_handler(callback)})
|
||||
|
||||
return RedisResponse(
|
||||
status=True,
|
||||
message=f"Successfully subscribed to {channel}."
|
||||
)
|
||||
except Exception as e:
|
||||
return RedisResponse(
|
||||
status=False,
|
||||
message=f"Failed to subscribe to {channel}.",
|
||||
error=str(e)
|
||||
)
|
||||
|
||||
def psubscribe(self, pattern: str, callback: Callable[[Dict], None]) -> RedisResponse:
|
||||
"""Subscribe to Redis channels matching a pattern.
|
||||
|
||||
Args:
|
||||
pattern: The pattern to subscribe to (e.g., 'user.*')
|
||||
callback: Function to call when a message is received
|
||||
|
||||
Returns:
|
||||
RedisResponse with status and message
|
||||
"""
|
||||
try:
|
||||
# Subscribe to the pattern
|
||||
self.pubsub.psubscribe(**{pattern: self._message_handler(callback)})
|
||||
|
||||
return RedisResponse(
|
||||
status=True,
|
||||
message=f"Successfully pattern-subscribed to {pattern}."
|
||||
)
|
||||
except Exception as e:
|
||||
return RedisResponse(
|
||||
status=False,
|
||||
message=f"Failed to pattern-subscribe to {pattern}.",
|
||||
error=str(e)
|
||||
)
|
||||
|
||||
def _message_handler(self, callback: Callable[[Dict], None]):
|
||||
"""Create a message handler function for the subscription."""
|
||||
def handler(message):
|
||||
# Skip subscription confirmation messages
|
||||
if message['type'] in ('subscribe', 'psubscribe'):
|
||||
return
|
||||
|
||||
# Parse JSON if the message is a JSON string
|
||||
data = message['data']
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode('utf-8')
|
||||
try:
|
||||
data = json.loads(data)
|
||||
except json.JSONDecodeError:
|
||||
# Not JSON, keep as is
|
||||
pass
|
||||
|
||||
# Call the callback with the message data
|
||||
callback({
|
||||
'channel': message.get('channel', b'').decode('utf-8') if isinstance(message.get('channel', b''), bytes) else message.get('channel', ''),
|
||||
'pattern': message.get('pattern', b'').decode('utf-8') if isinstance(message.get('pattern', b''), bytes) else message.get('pattern', ''),
|
||||
'data': data
|
||||
})
|
||||
|
||||
return handler
|
||||
|
||||
def start_listening(self, in_thread: bool = True) -> RedisResponse:
|
||||
"""Start listening for messages on subscribed channels.
|
||||
|
||||
Args:
|
||||
in_thread: If True, start listening in a separate thread
|
||||
|
||||
Returns:
|
||||
RedisResponse with status and message
|
||||
"""
|
||||
try:
|
||||
if in_thread:
|
||||
thread = Thread(target=self._listen_thread, daemon=True)
|
||||
thread.start()
|
||||
self.active_threads['listener'] = thread
|
||||
return RedisResponse(
|
||||
status=True,
|
||||
message="Listening thread started successfully."
|
||||
)
|
||||
else:
|
||||
# This will block the current thread
|
||||
self._listen_thread()
|
||||
return RedisResponse(
|
||||
status=True,
|
||||
message="Listening started successfully (blocking)."
|
||||
)
|
||||
except Exception as e:
|
||||
return RedisResponse(
|
||||
status=False,
|
||||
message="Failed to start listening.",
|
||||
error=str(e)
|
||||
)
|
||||
|
||||
def _listen_thread(self):
|
||||
"""Thread function for listening to messages."""
|
||||
self.pubsub.run_in_thread(sleep_time=0.01)
|
||||
|
||||
def stop_listening(self) -> RedisResponse:
|
||||
"""Stop listening for messages."""
|
||||
try:
|
||||
self.pubsub.close()
|
||||
return RedisResponse(
|
||||
status=True,
|
||||
message="Successfully stopped listening."
|
||||
)
|
||||
except Exception as e:
|
||||
return RedisResponse(
|
||||
status=False,
|
||||
message="Failed to stop listening.",
|
||||
error=str(e)
|
||||
)
|
||||
|
||||
def unsubscribe(self, channel: Optional[str] = None) -> RedisResponse:
|
||||
"""Unsubscribe from a channel or all channels.
|
||||
|
||||
Args:
|
||||
channel: The channel to unsubscribe from, or None for all channels
|
||||
|
||||
Returns:
|
||||
RedisResponse with status and message
|
||||
"""
|
||||
try:
|
||||
if channel:
|
||||
self.pubsub.unsubscribe(channel)
|
||||
message = f"Successfully unsubscribed from {channel}."
|
||||
else:
|
||||
self.pubsub.unsubscribe()
|
||||
message = "Successfully unsubscribed from all channels."
|
||||
|
||||
return RedisResponse(
|
||||
status=True,
|
||||
message=message
|
||||
)
|
||||
except Exception as e:
|
||||
return RedisResponse(
|
||||
status=False,
|
||||
message=f"Failed to unsubscribe from {'channel' if channel else 'all channels'}.",
|
||||
error=str(e)
|
||||
)
|
||||
|
||||
def punsubscribe(self, pattern: Optional[str] = None) -> RedisResponse:
|
||||
"""Unsubscribe from a pattern or all patterns.
|
||||
|
||||
Args:
|
||||
pattern: The pattern to unsubscribe from, or None for all patterns
|
||||
|
||||
Returns:
|
||||
RedisResponse with status and message
|
||||
"""
|
||||
try:
|
||||
if pattern:
|
||||
self.pubsub.punsubscribe(pattern)
|
||||
message = f"Successfully unsubscribed from pattern {pattern}."
|
||||
else:
|
||||
self.pubsub.punsubscribe()
|
||||
message = "Successfully unsubscribed from all patterns."
|
||||
|
||||
return RedisResponse(
|
||||
status=True,
|
||||
message=message
|
||||
)
|
||||
except Exception as e:
|
||||
return RedisResponse(
|
||||
status=False,
|
||||
message=f"Failed to unsubscribe from {'pattern' if pattern else 'all patterns'}.",
|
||||
error=str(e)
|
||||
)
|
||||
|
||||
|
||||
class RedisPubSub:
|
||||
"""Singleton class that provides both publisher and subscriber functionality."""
|
||||
|
||||
_instance = None
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(RedisPubSub, cls).__new__(cls)
|
||||
cls._instance.publisher = RedisPublisher()
|
||||
cls._instance.subscriber = RedisSubscriber()
|
||||
return cls._instance
|
||||
|
||||
|
||||
# Create a singleton instance
|
||||
redis_pubsub = RedisPubSub()
|
||||
Reference in New Issue
Block a user