feat: Improve Mycelium Streamer functionality

- Added continuous data writing and verification to the master node
  to ensure data persistence and integrity.
- Simplified worker update handling in the `listen` function for
  better efficiency and error handling.  The previous implementation
  had unnecessary complexity and potential for hangs.
This commit is contained in:
Mahmoud Emad
2025-03-03 13:48:55 +02:00
parent d852ecc5b1
commit 1de9be2a8a
3 changed files with 27 additions and 41 deletions

Binary file not shown.

View File

@@ -19,15 +19,25 @@ println('Starting master node...')
streamer.add_worker(worker_public_key)!
// Write some test data
id := streamer.write(id: 1, value: 'Record 1')!
println('Wrote record with ID: ${id}')
// id := streamer.write(id: 1, value: 'Record 1')!
// println('Wrote record with ID: ${id}')
// Verify data in master
master_data := streamer.read(id: id)!
master_data_str := master_data.bytestr()
println('Master data: ${master_data_str}')
// // Verify data in master
// master_data := streamer.read(id: id)!
// master_data_str := master_data.bytestr()
// println('Master data: ${master_data_str}')
// Keep master running to handle worker connections
mut id_ := u32(1)
for {
time.sleep(1 * time.second)
// Write some test data
mut id := streamer.write(id: id_, value: 'Record ${id_}')!
println('Wrote record with ID: ${id}')
// Verify data in master
master_data := streamer.read(id: id)!
master_data_str := master_data.bytestr()
println('Master data: ${master_data_str}')
id_++
}

View File

@@ -4,7 +4,6 @@ import freeflowuniverse.herolib.clients.mycelium
import rand
import time
import encoding.base64
import json
struct MyceliumStreamer {
pub mut:
@@ -125,43 +124,20 @@ pub:
// listen continuously checks for messages from master and applies updates
pub fn (mut s MyceliumStreamer) listen() ! {
spawn fn [mut s] () ! {
println('Starting to listen for messages...')
spawn fn [mut s] () {
for {
println('Listening for messages...')
// Check for updates from master
if msg := s.mycelium_client.receive_msg(
wait: true
peek: false
topic: 'db_sync'
)
{
// Decode the base64 data
update_data := base64.decode(msg.payload)
if update_data.len == 0 {
eprintln('Failed to decode base64 data')
continue
}
msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'db_sync')!
// Find the target worker and apply updates
if mut worker := s.workers[msg.src_pk] {
update_data := base64.decode(msg.payload)
println('Received update from worker: ${msg.src_pk}')
worker.sync_updates(update_data) or {
eprintln('Failed to sync worker: ${err}')
continue
}
println('Successfully applied updates from master')
} else {
eprintln('Received update from unknown source: ${msg.src_pk}')
}
}
println('Received update with payload: ${update_data.str()}')
if mut worker := s.workers[msg.src_pk] {
println('Worker with public key ${msg.src_pk} found')
worker.sync_updates(update_data) or { return error('Failed to sync worker: ${err}') }
}
}()
// Keep the main thread alive
for {
time.sleep(1 * time.second)
}
time.sleep(time.second * 1)
return s.listen()
}
pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 {