From 42b0c4d48f535585d128cbea932f4203d2689056 Mon Sep 17 00:00:00 2001 From: Mahmoud Emad Date: Mon, 3 Mar 2025 16:58:13 +0200 Subject: [PATCH] refactor: Improve MyceliumStreamer's message handling - Removed unnecessary test data from `deduped_mycelium_master.vsh`. - Simplified `MyceliumStreamer.listen()` to efficiently handle incoming messages, removing redundant code and improving readability. - Enhanced error handling in `MyceliumStreamer.listen()` for more robust operation. --- examples/data/deduped_mycelium_master.vsh | 9 -------- lib/data/ourdb/mycelium_streamer.v | 26 +++++++++++------------ 2 files changed, 12 insertions(+), 23 deletions(-) diff --git a/examples/data/deduped_mycelium_master.vsh b/examples/data/deduped_mycelium_master.vsh index 1279a4f0..7f1297db 100755 --- a/examples/data/deduped_mycelium_master.vsh +++ b/examples/data/deduped_mycelium_master.vsh @@ -19,15 +19,6 @@ println('Starting master node...') // Add worker to whitelist and initialize its database streamer.add_worker(worker_public_key)! -// Write some test data -// id := streamer.write(id: 1, value: 'Record 1')! -// println('Wrote record with ID: ${id}') - -// // Verify data in master -// master_data := streamer.read(id: id)! -// master_data_str := master_data.bytestr() -// println('Master data: ${master_data_str}') - // Keep master running to handle worker connections mut id_ := u32(1) diff --git a/lib/data/ourdb/mycelium_streamer.v b/lib/data/ourdb/mycelium_streamer.v index cf6768a9..1067c205 100644 --- a/lib/data/ourdb/mycelium_streamer.v +++ b/lib/data/ourdb/mycelium_streamer.v @@ -126,23 +126,21 @@ pub: // listen continuously checks for messages from master and applies updates pub fn (mut s MyceliumStreamer) listen() ! { - println('Listening for updates from master...') spawn fn [mut s] () ! { - s.listen_()! + msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'db_sync') or { + return error('Failed to receive message: ${err}') + } + if msg.payload.len > 0 { + update_data := base64.decode(msg.payload) + if mut worker := s.workers[msg.dst_pk] { + worker.sync_updates(update_data) or { + return error('Failed to sync worker: ${err}') + } + } + } }() -} - -fn (mut s MyceliumStreamer) listen_() ! { - println('Listening...') - msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'db_sync')! - - update_data := base64.decode(msg.payload) - if mut worker := s.workers[msg.src_pk] { - worker.sync_updates(update_data) or { return error('Failed to sync worker: ${err}') } - } - time.sleep(time.second * 1) - return s.listen_() + return s.listen() } pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 {