refactor: Improve MyceliumStreamer's message handling

- Removed unnecessary test data from `deduped_mycelium_master.vsh`.
- Simplified `MyceliumStreamer.listen()` to efficiently handle
  incoming messages, removing redundant code and improving readability.
- Enhanced error handling in `MyceliumStreamer.listen()` for more robust
  operation.
This commit is contained in:
Mahmoud Emad
2025-03-03 16:58:13 +02:00
parent 5343d9bff6
commit 42b0c4d48f
2 changed files with 12 additions and 23 deletions

View File

@@ -19,15 +19,6 @@ println('Starting master node...')
// Add worker to whitelist and initialize its database // Add worker to whitelist and initialize its database
streamer.add_worker(worker_public_key)! streamer.add_worker(worker_public_key)!
// Write some test data
// id := streamer.write(id: 1, value: 'Record 1')!
// println('Wrote record with ID: ${id}')
// // Verify data in master
// master_data := streamer.read(id: id)!
// master_data_str := master_data.bytestr()
// println('Master data: ${master_data_str}')
// Keep master running to handle worker connections // Keep master running to handle worker connections
mut id_ := u32(1) mut id_ := u32(1)

View File

@@ -126,23 +126,21 @@ pub:
// listen continuously checks for messages from master and applies updates // listen continuously checks for messages from master and applies updates
pub fn (mut s MyceliumStreamer) listen() ! { pub fn (mut s MyceliumStreamer) listen() ! {
println('Listening for updates from master...')
spawn fn [mut s] () ! { spawn fn [mut s] () ! {
s.listen_()! msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'db_sync') or {
return error('Failed to receive message: ${err}')
}
if msg.payload.len > 0 {
update_data := base64.decode(msg.payload)
if mut worker := s.workers[msg.dst_pk] {
worker.sync_updates(update_data) or {
return error('Failed to sync worker: ${err}')
}
}
}
}() }()
}
fn (mut s MyceliumStreamer) listen_() ! {
println('Listening...')
msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'db_sync')!
update_data := base64.decode(msg.payload)
if mut worker := s.workers[msg.src_pk] {
worker.sync_updates(update_data) or { return error('Failed to sync worker: ${err}') }
}
time.sleep(time.second * 1) time.sleep(time.second * 1)
return s.listen_() return s.listen()
} }
pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 { pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 {