"""Writer Service.

Subscribes to the ``parser`` Redis channel, writes every parsed bank record of
an incoming message to the account-records table and, when all records of the
message were handled, republishes the message on the ``writer`` channel with
its stage set to ``written``.
"""
import time
import arrow
import json

from typing import Dict, Any
from BankServices.WriterService.model import BankReceive
from Schemas import AccountRecords, BuildIbans
from BankServices.ServiceDepends.config import Config

# Import Redis pub/sub handler
from Controllers.Redis.Broadcast.actions import redis_pubsub


# Define Redis channels
REDIS_CHANNEL_IN = "parser"  # Subscribe to Parser Service channel
REDIS_CHANNEL_OUT = "writer"  # Publish to Writer Service channel
delimiter = "|"


def publish_written_data_to_redis(data: Dict[str, Any], file_name: str) -> bool:
    """Publish written data status to Redis.

    Args:
        data: Original message data from Redis. A non-dict value is replaced
            by an empty message so only the stage/timestamp are published.
        file_name: Name of the processed file (used for logging only).

    Returns:
        bool: True if the publisher reported success, False otherwise.
    """
    # Create a (shallow) copy of the original message to preserve metadata
    message = data.copy() if isinstance(data, dict) else {}

    # Update stage to 'written'
    message["stage"] = "written"

    # Add processing timestamp
    message["written_at"] = str(arrow.now())

    # Publish to Redis channel
    result = redis_pubsub.publisher.publish(REDIS_CHANNEL_OUT, message)

    if result.status:
        print(f"[WRITER_SERVICE] Published written status for {file_name} with stage: written")
        return True

    print(f"[WRITER_SERVICE] Publish error: {result.error}")
    return False


def write_parsed_data_to_account_records(data_dict: dict, file_name: str) -> bool:
    """Write parsed data to account records database.

    The input mapping is not modified; all transformations are applied to a
    copy.

    Args:
        data_dict: Parsed data dictionary. Must contain a ``balance`` key plus
            whatever fields ``BankReceive`` requires.
        file_name: Name of the imported file, stored as ``import_file_name``.

    Returns:
        bool: True if record was created or already exists, False on error.
    """
    try:
        # Work on a shallow copy: the caller republishes the original message
        # afterwards, so its items must not be altered by the key renaming
        # below.
        record = dict(data_dict)

        with AccountRecords.new_session() as db_session:
            # Transform data for database
            record["bank_balance"] = record.pop("balance")
            record["import_file_name"] = file_name
            record = BankReceive(**record).model_dump()
            print('data_dict', record)

            # Process date fields
            bank_date = arrow.get(str(record["bank_date"]))
            record["bank_date_w"] = bank_date.weekday()
            record["bank_date_m"] = bank_date.month
            record["bank_date_d"] = bank_date.day
            record["bank_date_y"] = bank_date.year
            record["bank_date"] = str(bank_date)

            # Add build information if available
            if build_iban := BuildIbans.filter_by_one(
                iban=record["iban"], db=db_session
            ).data:
                record.update(
                    {
                        "build_id": build_iban.build_id,
                        "build_uu_id": build_iban.build_uu_id,
                    }
                )

            # Create new record or find existing one using specific fields for matching
            new_account_record = AccountRecords.find_or_create(
                db=db_session,
                **record,
                include_args=[
                    AccountRecords.bank_date,
                    AccountRecords.iban,
                    AccountRecords.bank_reference_code,
                    AccountRecords.bank_balance,
                ],
            )

            if new_account_record.meta_data.created:
                new_account_record.is_confirmed = True
                new_account_record.save(db=db_session)
                print(f"[WRITER_SERVICE] Created new record in database: {new_account_record.id}")
            else:
                print(f"[WRITER_SERVICE] Record already exists in database: {new_account_record.id}")

            # An already-stored record is a successful outcome, not an error:
            # reporting False here would block the "written" notification for
            # any message that contains a previously imported row.
            return True
    except Exception as e:
        # Best-effort boundary: one bad record must not take down the listener.
        print(f"[WRITER_SERVICE] Error writing to database: {str(e)}")
        return False


def process_message(message):
    """Process a message from Redis.

    Only messages whose ``stage`` is ``parsed`` are handled; every item of
    their ``parsed`` list is written to the database and, if all writes
    succeeded, a ``written`` status message is published.

    Args:
        message: Message data from Redis subscriber; its ``data`` entry is
            either a dict or a JSON document (str/bytes) encoding one.
    """
    # Extract the message data
    data = message["data"]

    # If data is still serialized, parse it as JSON (json.loads accepts
    # str, bytes and bytearray).
    if isinstance(data, (str, bytes, bytearray)):
        try:
            data = json.loads(data)
        except ValueError as e:
            # ValueError covers json.JSONDecodeError and, for bytes input,
            # UnicodeDecodeError.
            print(f"[WRITER_SERVICE] Error parsing message data: {e}")
            return

    # Anything that is not a JSON object cannot carry a stage; skip it
    # instead of failing on .get() below.
    if not isinstance(data, dict):
        print(
            "[WRITER_SERVICE] Skipped message with unexpected payload type: "
            f"{type(data).__name__}"
        )
        return

    # Check if stage is 'parsed' before processing
    if data.get("stage") != "parsed":
        print(f"[WRITER_SERVICE] Skipped message with UUID: {data.get('uuid')} (stage is not 'parsed')")
        return

    try:
        file_name = data.get("filename")
        parsed_data = data.get("parsed")
        print(f"[WRITER_SERVICE] Processing file: {file_name}")

        if not parsed_data:
            print(f"[WRITER_SERVICE] No parsed data found for {file_name}")
            return

        # Process each parsed data item. The list is built eagerly so that
        # every item is attempted even after an earlier one failed.
        results = [
            write_parsed_data_to_account_records(data_dict=item, file_name=file_name)
            for item in parsed_data
        ]

        # Publish status update to Redis if all records were processed
        if all(results):
            publish_written_data_to_redis(data=data, file_name=file_name)
    except Exception as e:
        print(f"[WRITER_SERVICE] Error processing message: {str(e)}")


def app():
    """Main application function.

    Subscribes to the input channel and starts the listener in a background
    thread.

    Returns:
        bool: True if the subscription and the listener were both started,
        False if either step reported an error.
    """
    print("[WRITER_SERVICE] Starting Writer Service")

    # Subscribe to the input channel
    result = redis_pubsub.subscriber.subscribe(REDIS_CHANNEL_IN, process_message)
    if not result.status:
        print(f"[WRITER_SERVICE] Subscribe error: {result.error}")
        return False
    print(f"[WRITER_SERVICE] Subscribed to channel: {REDIS_CHANNEL_IN}")

    # Start listening for messages
    listen_result = redis_pubsub.subscriber.start_listening(in_thread=True)
    if not listen_result.status:
        print(f"[WRITER_SERVICE] Error starting listener: {listen_result.error}")
        return False
    print("[WRITER_SERVICE] Listening for messages")
    return True


if __name__ == "__main__":
    # Initialize the app once; without a running listener there is nothing to
    # keep alive, so exit with an error status instead of idling forever.
    if not app():
        raise SystemExit(1)

    # Keep the main thread alive
    # NOTE(review): the idle interval reuses Config.EMAIL_SLEEP - confirm this
    # is the intended setting for the writer service.
    try:
        while True:
            time.sleep(Config.EMAIL_SLEEP)
    except KeyboardInterrupt:
        print("\n[WRITER_SERVICE] Stopping service...")
        redis_pubsub.subscriber.stop_listening()