From 71906fd891019cc4b024e082be997d7cba60a995 Mon Sep 17 00:00:00 2001 From: Mahmoud Emad Date: Sun, 2 Mar 2025 22:01:44 +0200 Subject: [PATCH] feat: Improve Mycelium slave and streamer communication - Renamed the topic for database synchronization messages from 'db_sync' to 'sync_db' for clarity. - Updated the Mycelium slave to decode base64 payload before processing and to log received messages and their source. - Added logging to the Mycelium streamer to track sent messages. - Added a new feature to retrieve and log the last index from the worker after syncing updates. This improves monitoring and debugging capabilities. --- examples/data/deduped_mycelium_slave.vsh | 15 ++++++++++++--- lib/data/ourdb/mycelium_streamer.v | 4 +++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/examples/data/deduped_mycelium_slave.vsh b/examples/data/deduped_mycelium_slave.vsh index 0d261fe7..887b4b2f 100755 --- a/examples/data/deduped_mycelium_slave.vsh +++ b/examples/data/deduped_mycelium_slave.vsh @@ -9,6 +9,8 @@ import os import encoding.base64 import json +// TODO: Make the worker read the data from the streamer instead. + const slave_port = 9000 const master_public_key = '89c2eeb24bcdfaaac78c0023a166d88f760c097c1a57748770e432ba10757179' const master_address = '458:90d4:a3ef:b285:6d32:a22d:9e73:697f' @@ -39,9 +41,16 @@ defer { // Receive messages // Parameters: wait_for_message, peek_only, topic_filter -received := slave.receive_msg(wait: true, peek: false, topic: 'db_sync')! +received := slave.receive_msg(wait: true, peek: false, topic: 'sync_db')! println('Received message from: ${received.src_pk}') println('Message payload: ${base64.decode_str(received.payload)}') -payload := received.payload -worker.sync_updates(payload.bytes()) or { error('Failed to sync updates to worker due to: ${err}') } +payload := base64.decode(received.payload) +println('Payload: ${payload.str()}') +worker.sync_updates(received.payload.bytes()) or { + error('Failed to sync updates to worker due to: ${err}') +} + +// Get last index +last_index := worker.get_last_index()! +println('Last index: ${last_index}') diff --git a/lib/data/ourdb/mycelium_streamer.v b/lib/data/ourdb/mycelium_streamer.v index b2254eff..548a34dc 100644 --- a/lib/data/ourdb/mycelium_streamer.v +++ b/lib/data/ourdb/mycelium_streamer.v @@ -67,11 +67,13 @@ pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 { // Broadcast to all workers for worker_key, mut _ in s.workers { - s.mycelium_client.send_msg( + println('Sending message to worker: ${worker_key}') + msg := s.mycelium_client.send_msg( public_key: worker_key // destination public key payload: data.str() // message payload topic: 'sync_db' // optional topic )! + println('Sent message ID: ${msg.id}') } return id }