"""Demonstration of a three-stage Redis Pub/Sub chain.

A reader publishes mock messages on ``CHANNEL_READER``; a processor callback
re-publishes them on ``CHANNEL_PROCESSOR``; a writer callback re-publishes
them on ``CHANNEL_WRITER``.  Each stage only handles messages whose ``stage``
field matches the value it expects and prints a trace line for every message.
"""

import json
import time
import uuid
from datetime import datetime
from threading import Thread

from Controllers.Redis.Broadcast.actions import redis_pubsub

# Define the channels for our chain
CHANNEL_READER = "chain:reader"
CHANNEL_PROCESSOR = "chain:processor"
CHANNEL_WRITER = "chain:writer"

# Flag to control the demo; the reader loop exits once this becomes False.
running = True


def generate_mock_data():
    """Generate a mock message with UUID, timestamp, and sample data.

    Returns:
        dict: a new message with keys ``uuid`` (random UUID4 string),
        ``timestamp`` (ISO-8601 string of the local time), ``stage``
        (always ``"red"``) and ``data`` (a nested dict holding ``value``,
        ``status`` and ``counter``).
    """
    return {
        "uuid": str(uuid.uuid4()),
        "timestamp": datetime.now().isoformat(),
        "stage": "red",  # Initial stage is 'red'
        "data": {
            "value": f"Sample data {int(time.time())}",
            "status": "new",
            "counter": 0,
        },
    }


def _decode_payload(message, tag):
    """Extract the chain message carried by a subscriber *message*.

    Args:
        message: the object handed to a subscriber callback; its ``"data"``
            item holds the published payload.
        tag: service label (``"PROCESSOR"`` / ``"WRITER"``) used as the
            prefix of any diagnostic that is printed.

    Returns:
        dict | None: the payload as a dict, or ``None`` when it cannot be
        decoded or is not a JSON object.  A diagnostic is printed in the
        ``None`` case so the callback can simply return.
    """
    data = message["data"]
    # Text payloads are JSON-encoded.  bytes/bytearray are accepted too in
    # case the subscriber delivers undecoded payloads; json.loads handles both.
    if isinstance(data, (str, bytes, bytearray)):
        try:
            data = json.loads(data)
        except ValueError as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"[{tag}] Error parsing message data: {e}")
            return None
    # Anything that is not a mapping (e.g. a bare number or list) would make
    # the callbacks raise on .get(); reject it here instead.
    if not isinstance(data, dict):
        print(f"[{tag}] Ignoring non-object message data: {data!r}")
        return None
    return data


def reader_function():
    """
    First function in the chain.
    Generates mock data and publishes to the reader channel.

    Loops while the module-level ``running`` flag is true, publishing one
    message roughly every two seconds.  Each message is stamped with a
    ``start_time`` (``time.time()``) so the writer can report elapsed time.
    """
    print("[READER] Function started")

    while running:
        # Generate mock data
        message = generate_mock_data()
        message["start_time"] = time.time()

        # Publish to reader channel
        result = redis_pubsub.publisher.publish(CHANNEL_READER, message)

        if result.status:
            print(f"[READER] {time.time():.6f} | Published UUID: {message['uuid']}")
        else:
            print(f"[READER] Publish error: {result.error}")

        # Wait before generating next message
        time.sleep(2)


def processor_function():
    """
    Second function in the chain.
    Subscribes to reader channel, processes messages, and publishes to processor channel.

    Only registers the callback and reports whether the subscription
    succeeded; the callback itself is invoked by the subscriber.
    """
    print("[PROCESSOR] Function started")

    def on_reader_message(message):
        """Advance a 'red' message to 'processed' and re-publish it."""
        data = _decode_payload(message, "PROCESSOR")
        if data is None:
            return

        # .get() keeps a message without a uuid from raising inside the callback.
        msg_id = data.get("uuid")

        # Check if stage is 'red' before processing
        if data.get("stage") != "red":
            print(f"[PROCESSOR] Skipped message: {msg_id} (stage is not 'red')")
            return

        payload = data.get("data")
        if not isinstance(payload, dict):
            print(f"[PROCESSOR] Malformed message {msg_id}: 'data' is not an object")
            return

        # Process the message
        data["processor_timestamp"] = datetime.now().isoformat()
        payload["status"] = "processed"
        payload["counter"] = payload.get("counter", 0) + 1

        # Update stage to 'processed'
        data["stage"] = "processed"

        # Add some processing metadata
        data["processing"] = {
            "duration_ms": 150,  # Mock processing time
            "processor_id": "main-processor",
        }

        # Publish to processor channel
        result = redis_pubsub.publisher.publish(CHANNEL_PROCESSOR, data)

        if result.status:
            print(
                f"[PROCESSOR] {time.time():.6f} | Received UUID: {msg_id} | Published UUID: {msg_id}"
            )
        else:
            print(f"[PROCESSOR] Publish error: {result.error}")

    # Subscribe to reader channel
    result = redis_pubsub.subscriber.subscribe(CHANNEL_READER, on_reader_message)

    if result.status:
        print(f"[PROCESSOR] Subscribed to channel: {CHANNEL_READER}")
    else:
        print(f"[PROCESSOR] Subscribe error: {result.error}")


def writer_function():
    """
    Third function in the chain.
    Subscribes to processor channel and performs final processing.

    Only registers the callback and reports whether the subscription
    succeeded; the callback itself is invoked by the subscriber.
    """
    print("[WRITER] Function started")

    def on_processor_message(message):
        """Advance a 'processed' message to 'completed' and re-publish it."""
        data = _decode_payload(message, "WRITER")
        if data is None:
            return

        # .get() keeps a message without a uuid from raising inside the callback.
        msg_id = data.get("uuid")

        # Check if stage is 'processed' before processing
        if data.get("stage") != "processed":
            print(
                f"[WRITER] Skipped message: {msg_id} (stage is not 'processed')"
            )
            return

        payload = data.get("data")
        if not isinstance(payload, dict):
            print(f"[WRITER] Malformed message {msg_id}: 'data' is not an object")
            return

        # Process the message
        data["writer_timestamp"] = datetime.now().isoformat()
        payload["status"] = "completed"
        payload["counter"] = payload.get("counter", 0) + 1

        # Update stage to 'completed'
        data["stage"] = "completed"

        # Add some writer metadata
        data["storage"] = {"location": "main-db", "partition": "events-2025-04"}

        # Calculate elapsed time if a usable start_time is available
        current_time = time.time()
        elapsed_ms = ""
        started = data.get("start_time")
        if isinstance(started, (int, float)):
            elapsed_ms = f" | Elapsed: {(current_time - started) * 1000:.2f}ms"

        # Optionally publish to writer channel for any downstream listeners
        result = redis_pubsub.publisher.publish(CHANNEL_WRITER, data)

        if result.status:
            print(
                f"[WRITER] {current_time:.6f} | Received UUID: {msg_id} | Published UUID: {msg_id}{elapsed_ms}"
            )
        else:
            print(f"[WRITER] Publish error: {result.error}")

    # Subscribe to processor channel
    result = redis_pubsub.subscriber.subscribe(CHANNEL_PROCESSOR, on_processor_message)

    if result.status:
        print(f"[WRITER] Subscribed to channel: {CHANNEL_PROCESSOR}")
    else:
        print(f"[WRITER] Subscribe error: {result.error}")


def run_demo():
    """Run a demonstration of the simple chain of functions.

    Starts the subscriber's listener, registers the processor and writer
    callbacks, launches the reader in a daemon thread, then blocks until
    interrupted (Ctrl+C).  On any exit path the ``running`` flag is cleared
    and the listener is stopped.
    """
    global running

    print("=== Starting Redis Pub/Sub Chain Demonstration ===")
    print("Chain: READER → PROCESSOR → WRITER")
    print(f"Channels: {CHANNEL_READER} → {CHANNEL_PROCESSOR} → {CHANNEL_WRITER}")
    print("Format: [SERVICE] TIMESTAMP | Received/Published UUID | [Elapsed time]")

    # Start the Redis subscriber listening thread
    redis_pubsub.subscriber.start_listening()

    try:
        # Start processor and writer functions (these subscribe to channels)
        processor_function()
        writer_function()

        # Create a thread for the reader function (this publishes messages)
        reader_thread = Thread(target=reader_function, daemon=True)
        reader_thread.start()

        # Keep the main thread alive
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nStopping demonstration...")
    finally:
        # Runs on Ctrl+C and on any setup failure, so the listener started
        # above is not left running.
        running = False
        redis_pubsub.subscriber.stop_listening()


if __name__ == "__main__":
    run_demo()