From 2c5f00ab1ddcc359aca0b512170a9fba5e495d27 Mon Sep 17 00:00:00 2001 From: berkay Date: Sun, 20 Apr 2025 22:16:08 +0300 Subject: [PATCH] redis pub/sub added --- Controllers/Redis/Broadcast/README.md | 67 +++++ Controllers/Redis/Broadcast/actions.py | 252 ++++++++++++++++++ .../Redis/Broadcast/implementations.py | 194 ++++++++++++++ 3 files changed, 513 insertions(+) create mode 100644 Controllers/Redis/Broadcast/README.md create mode 100644 Controllers/Redis/Broadcast/actions.py create mode 100644 Controllers/Redis/Broadcast/implementations.py diff --git a/Controllers/Redis/Broadcast/README.md b/Controllers/Redis/Broadcast/README.md new file mode 100644 index 0000000..3fdf588 --- /dev/null +++ b/Controllers/Redis/Broadcast/README.md @@ -0,0 +1,67 @@ +# Redis Pub/Sub Chain Implementation + +This module implements a chain of services communicating through Redis Pub/Sub channels. Each service in the chain subscribes to the previous service's channel and publishes to its own channel, creating a processing pipeline. + +## Architecture + +The implementation follows a simple chain pattern: + +``` +READER → PROCESSOR → WRITER +``` + +- **READER**: Generates mock data with a "red" stage and publishes to `chain:reader` +- **PROCESSOR**: Subscribes to `chain:reader`, processes messages with "red" stage, updates stage to "processed", and publishes to `chain:processor` +- **WRITER**: Subscribes to `chain:processor`, processes messages with "processed" stage, updates stage to "completed", and publishes to `chain:writer` + +## Message Flow + +Each message flows through the chain with a stage attribute that determines how it's processed: + +1. READER generates a message with `stage="red"` +2. PROCESSOR receives the message, checks if `stage="red"`, processes it, and sets `stage="processed"` +3. WRITER receives the message, checks if `stage="processed"`, processes it, and sets `stage="completed"` + +## Performance + +The implementation includes timing information to track how long messages take to flow through the entire chain. Sample output: + +``` +[READER] 1745176466.132082 | Published UUID: 74cf2312-25ec-4da8-bc0a-521b6ccd5206 +[PROCESSOR] 1745176466.132918 | Received UUID: 74cf2312-25ec-4da8-bc0a-521b6ccd5206 | Published UUID: 74cf2312-25ec-4da8-bc0a-521b6ccd5206 +[WRITER] 1745176466.133097 | Received UUID: 74cf2312-25ec-4da8-bc0a-521b6ccd5206 | Published UUID: 74cf2312-25ec-4da8-bc0a-521b6ccd5206 | Elapsed: 1.83ms +[READER] 1745176468.133018 | Published UUID: 2ffd217f-650f-4e10-bc16-317adcf7a59a +[PROCESSOR] 1745176468.133792 | Received UUID: 2ffd217f-650f-4e10-bc16-317adcf7a59a | Published UUID: 2ffd217f-650f-4e10-bc16-317adcf7a59a +[WRITER] 1745176468.134001 | Received UUID: 2ffd217f-650f-4e10-bc16-317adcf7a59a | Published UUID: 2ffd217f-650f-4e10-bc16-317adcf7a59a | Elapsed: 1.76ms +[READER] 1745176470.133841 | Published UUID: 87e1f3af-c6c2-4fa5-9a65-57e7327d3989 +[PROCESSOR] 1745176470.134623 | Received UUID: 87e1f3af-c6c2-4fa5-9a65-57e7327d3989 | Published UUID: 87e1f3af-c6c2-4fa5-9a65-57e7327d3989 +[WRITER] 1745176470.134861 | Received UUID: 87e1f3af-c6c2-4fa5-9a65-57e7327d3989 | Published UUID: 87e1f3af-c6c2-4fa5-9a65-57e7327d3989 | Elapsed: 1.68ms +``` + +The elapsed time shows the total time from when the READER publishes a message until the WRITER completes processing it. In the samples above, the end-to-end processing time ranges from 1.68ms to 1.83ms. + +## Usage + +To run the demonstration: + +```bash +python -m Controllers.Redis.Broadcast.implementations +``` + +This will start all three services in the chain and begin processing messages. Press Ctrl+C to stop the demonstration. + +## Implementation Details + +The implementation uses: + +1. A singleton Redis Pub/Sub handler with publisher and subscriber capabilities +2. Thread-based message processing +3. JSON serialization for message passing +4. Stage-based message processing to track progress through the chain +5. Timing information to measure performance + +Each service in the chain follows these steps: +1. Subscribe to the appropriate channel +2. Define a message handler function +3. Process incoming messages based on their stage +4. Publish processed messages to the next channel in the chain diff --git a/Controllers/Redis/Broadcast/actions.py b/Controllers/Redis/Broadcast/actions.py new file mode 100644 index 0000000..50fc43f --- /dev/null +++ b/Controllers/Redis/Broadcast/actions.py @@ -0,0 +1,252 @@ +import json +from typing import Optional, Dict, Any, List, Callable, Union +from threading import Thread + +from Controllers.Redis.connection import redis_cli +from Controllers.Redis.response import RedisResponse + + +class RedisPublisher: + """Redis Publisher class for broadcasting messages to channels.""" + + def __init__(self, redis_client=redis_cli): + self.redis_client = redis_client + + def publish(self, channel: str, message: Union[Dict, List, str]) -> RedisResponse: + """Publish a message to a Redis channel. + + Args: + channel: The channel to publish to + message: The message to publish (will be JSON serialized if dict or list) + + Returns: + RedisResponse with status and message + """ + try: + # Convert dict/list to JSON string if needed + if isinstance(message, (dict, list)): + message = json.dumps(message) + + # Publish the message + recipient_count = self.redis_client.publish(channel, message) + + return RedisResponse( + status=True, + message=f"Message published successfully to {channel}.", + data={"recipients": recipient_count} + ) + except Exception as e: + return RedisResponse( + status=False, + message=f"Failed to publish message to {channel}.", + error=str(e) + ) + + +class RedisSubscriber: + """Redis Subscriber class for listening to channels.""" + + def __init__(self, redis_client=redis_cli): + self.redis_client = redis_client + self.pubsub = self.redis_client.pubsub() + self.active_threads = {} + + def subscribe(self, channel: str, callback: Callable[[Dict], None]) -> RedisResponse: + """Subscribe to a Redis channel with a callback function. + + Args: + channel: The channel to subscribe to + callback: Function to call when a message is received + + Returns: + RedisResponse with status and message + """ + try: + # Subscribe to the channel + self.pubsub.subscribe(**{channel: self._message_handler(callback)}) + + return RedisResponse( + status=True, + message=f"Successfully subscribed to {channel}." + ) + except Exception as e: + return RedisResponse( + status=False, + message=f"Failed to subscribe to {channel}.", + error=str(e) + ) + + def psubscribe(self, pattern: str, callback: Callable[[Dict], None]) -> RedisResponse: + """Subscribe to Redis channels matching a pattern. + + Args: + pattern: The pattern to subscribe to (e.g., 'user.*') + callback: Function to call when a message is received + + Returns: + RedisResponse with status and message + """ + try: + # Subscribe to the pattern + self.pubsub.psubscribe(**{pattern: self._message_handler(callback)}) + + return RedisResponse( + status=True, + message=f"Successfully pattern-subscribed to {pattern}." + ) + except Exception as e: + return RedisResponse( + status=False, + message=f"Failed to pattern-subscribe to {pattern}.", + error=str(e) + ) + + def _message_handler(self, callback: Callable[[Dict], None]): + """Create a message handler function for the subscription.""" + def handler(message): + # Skip subscription confirmation messages + if message['type'] in ('subscribe', 'psubscribe'): + return + + # Parse JSON if the message is a JSON string + data = message['data'] + if isinstance(data, bytes): + data = data.decode('utf-8') + try: + data = json.loads(data) + except json.JSONDecodeError: + # Not JSON, keep as is + pass + + # Call the callback with the message data + callback({ + 'channel': message.get('channel', b'').decode('utf-8') if isinstance(message.get('channel', b''), bytes) else message.get('channel', ''), + 'pattern': message.get('pattern', b'').decode('utf-8') if isinstance(message.get('pattern', b''), bytes) else message.get('pattern', ''), + 'data': data + }) + + return handler + + def start_listening(self, in_thread: bool = True) -> RedisResponse: + """Start listening for messages on subscribed channels. + + Args: + in_thread: If True, start listening in a separate thread + + Returns: + RedisResponse with status and message + """ + try: + if in_thread: + thread = Thread(target=self._listen_thread, daemon=True) + thread.start() + self.active_threads['listener'] = thread + return RedisResponse( + status=True, + message="Listening thread started successfully." + ) + else: + # This will block the current thread + self._listen_thread() + return RedisResponse( + status=True, + message="Listening started successfully (blocking)." + ) + except Exception as e: + return RedisResponse( + status=False, + message="Failed to start listening.", + error=str(e) + ) + + def _listen_thread(self): + """Thread function for listening to messages.""" + self.pubsub.run_in_thread(sleep_time=0.01) + + def stop_listening(self) -> RedisResponse: + """Stop listening for messages.""" + try: + self.pubsub.close() + return RedisResponse( + status=True, + message="Successfully stopped listening." + ) + except Exception as e: + return RedisResponse( + status=False, + message="Failed to stop listening.", + error=str(e) + ) + + def unsubscribe(self, channel: Optional[str] = None) -> RedisResponse: + """Unsubscribe from a channel or all channels. + + Args: + channel: The channel to unsubscribe from, or None for all channels + + Returns: + RedisResponse with status and message + """ + try: + if channel: + self.pubsub.unsubscribe(channel) + message = f"Successfully unsubscribed from {channel}." + else: + self.pubsub.unsubscribe() + message = "Successfully unsubscribed from all channels." + + return RedisResponse( + status=True, + message=message + ) + except Exception as e: + return RedisResponse( + status=False, + message=f"Failed to unsubscribe from {'channel' if channel else 'all channels'}.", + error=str(e) + ) + + def punsubscribe(self, pattern: Optional[str] = None) -> RedisResponse: + """Unsubscribe from a pattern or all patterns. + + Args: + pattern: The pattern to unsubscribe from, or None for all patterns + + Returns: + RedisResponse with status and message + """ + try: + if pattern: + self.pubsub.punsubscribe(pattern) + message = f"Successfully unsubscribed from pattern {pattern}." + else: + self.pubsub.punsubscribe() + message = "Successfully unsubscribed from all patterns." + + return RedisResponse( + status=True, + message=message + ) + except Exception as e: + return RedisResponse( + status=False, + message=f"Failed to unsubscribe from {'pattern' if pattern else 'all patterns'}.", + error=str(e) + ) + + +class RedisPubSub: + """Singleton class that provides both publisher and subscriber functionality.""" + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(RedisPubSub, cls).__new__(cls) + cls._instance.publisher = RedisPublisher() + cls._instance.subscriber = RedisSubscriber() + return cls._instance + + +# Create a singleton instance +redis_pubsub = RedisPubSub() diff --git a/Controllers/Redis/Broadcast/implementations.py b/Controllers/Redis/Broadcast/implementations.py new file mode 100644 index 0000000..f1fc236 --- /dev/null +++ b/Controllers/Redis/Broadcast/implementations.py @@ -0,0 +1,194 @@ +import json +import time +import uuid +from datetime import datetime +from threading import Thread + +from Controllers.Redis.Broadcast.actions import redis_pubsub + + +# Define the channels for our chain +CHANNEL_READER = "chain:reader" +CHANNEL_PROCESSOR = "chain:processor" +CHANNEL_WRITER = "chain:writer" + +# Flag to control the demo +running = True + +def generate_mock_data(): + """Generate a mock message with UUID, timestamp, and sample data.""" + return { + "uuid": str(uuid.uuid4()), + "timestamp": datetime.now().isoformat(), + "stage": "red", # Initial stage is 'red' + "data": { + "value": f"Sample data {int(time.time())}", + "status": "new", + "counter": 0 + } + } + +def reader_function(): + """ + First function in the chain. + Generates mock data and publishes to the reader channel. + """ + print("[READER] Function started") + + while running: + # Generate mock data + message = generate_mock_data() + start_time = time.time() + message["start_time"] = start_time + + # Publish to reader channel + result = redis_pubsub.publisher.publish(CHANNEL_READER, message) + + if result.status: + print(f"[READER] {time.time():.6f} | Published UUID: {message['uuid']}") + else: + print(f"[READER] Publish error: {result.error}") + + # Wait before generating next message + time.sleep(2) + +def processor_function(): + """ + Second function in the chain. + Subscribes to reader channel, processes messages, and publishes to processor channel. + """ + print("[PROCESSOR] Function started") + def on_reader_message(message): + # The message structure from the subscriber has 'data' containing our actual message + # If data is a string, parse it as JSON + data = message["data"] + if isinstance(data, str): + try: + data = json.loads(data) + except json.JSONDecodeError as e: + print(f"[PROCESSOR] Error parsing message data: {e}") + return + + # Check if stage is 'red' before processing + if data.get("stage") == "red": + # Process the message + data["processor_timestamp"] = datetime.now().isoformat() + data["data"]["status"] = "processed" + data["data"]["counter"] += 1 + + # Update stage to 'processed' + data["stage"] = "processed" + + # Add some processing metadata + data["processing"] = { + "duration_ms": 150, # Mock processing time + "processor_id": "main-processor" + } + + # Publish to processor channel + result = redis_pubsub.publisher.publish(CHANNEL_PROCESSOR, data) + + if result.status: + print(f"[PROCESSOR] {time.time():.6f} | Received UUID: {data['uuid']} | Published UUID: {data['uuid']}") + else: + print(f"[PROCESSOR] Publish error: {result.error}") + else: + print(f"[PROCESSOR] Skipped message: {data['uuid']} (stage is not 'red')") + + # Subscribe to reader channel + result = redis_pubsub.subscriber.subscribe(CHANNEL_READER, on_reader_message) + + if result.status: + print(f"[PROCESSOR] Subscribed to channel: {CHANNEL_READER}") + else: + print(f"[PROCESSOR] Subscribe error: {result.error}") + +def writer_function(): + """ + Third function in the chain. + Subscribes to processor channel and performs final processing. + """ + print("[WRITER] Function started") + def on_processor_message(message): + # The message structure from the subscriber has 'data' containing our actual message + # If data is a string, parse it as JSON + data = message["data"] + if isinstance(data, str): + try: + data = json.loads(data) + except json.JSONDecodeError as e: + print(f"[WRITER] Error parsing message data: {e}") + return + + # Check if stage is 'processed' before processing + if data.get("stage") == "processed": + # Process the message + data["writer_timestamp"] = datetime.now().isoformat() + data["data"]["status"] = "completed" + data["data"]["counter"] += 1 + + # Update stage to 'completed' + data["stage"] = "completed" + + # Add some writer metadata + data["storage"] = { + "location": "main-db", + "partition": "events-2025-04" + } + + # Calculate elapsed time if start_time is available + current_time = time.time() + elapsed_ms = "" + if "start_time" in data: + elapsed_ms = f" | Elapsed: {(current_time - data['start_time']) * 1000:.2f}ms" + + # Optionally publish to writer channel for any downstream listeners + result = redis_pubsub.publisher.publish(CHANNEL_WRITER, data) + + if result.status: + print(f"[WRITER] {current_time:.6f} | Received UUID: {data['uuid']} | Published UUID: {data['uuid']}{elapsed_ms}") + else: + print(f"[WRITER] Publish error: {result.error}") + else: + print(f"[WRITER] Skipped message: {data['uuid']} (stage is not 'processed')") + + # Subscribe to processor channel + result = redis_pubsub.subscriber.subscribe(CHANNEL_PROCESSOR, on_processor_message) + + if result.status: + print(f"[WRITER] Subscribed to channel: {CHANNEL_PROCESSOR}") + else: + print(f"[WRITER] Subscribe error: {result.error}") + + +def run_demo(): + """Run a demonstration of the simple chain of functions.""" + print("=== Starting Redis Pub/Sub Chain Demonstration ===") + print("Chain: READER → PROCESSOR → WRITER") + print(f"Channels: {CHANNEL_READER} → {CHANNEL_PROCESSOR} → {CHANNEL_WRITER}") + print("Format: [SERVICE] TIMESTAMP | Received/Published UUID | [Elapsed time]") + + # Start the Redis subscriber listening thread + redis_pubsub.subscriber.start_listening() + + # Start processor and writer functions (these subscribe to channels) + processor_function() + writer_function() + + # Create a thread for the reader function (this publishes messages) + reader_thread = Thread(target=reader_function, daemon=True) + reader_thread.start() + + # Keep the main thread alive + try: + while True: + time.sleep(0.1) + except KeyboardInterrupt: + print("\nStopping demonstration...") + global running + running = False + redis_pubsub.subscriber.stop_listening() + + +if __name__ == "__main__": + run_demo()