diff --git a/examples/data/deduped_mycelium_worker.vsh b/examples/data/deduped_mycelium_worker.vsh index 1f49f580..9bd04f9a 100755 --- a/examples/data/deduped_mycelium_worker.vsh +++ b/examples/data/deduped_mycelium_worker.vsh @@ -6,7 +6,7 @@ import freeflowuniverse.herolib.data.ourdb mut streamer := ourdb.get_streamer(id: 'frBvtZQeqf') or { ourdb.new_streamer( incremental_mode: false - server_port: 9001 // Use different port than master + server_port: 9000 // Use different port than master is_worker: true )! } diff --git a/lib/data/ourdb/mycelium_streamer.v b/lib/data/ourdb/mycelium_streamer.v index ebeb8b3b..3e9b9e50 100644 --- a/lib/data/ourdb/mycelium_streamer.v +++ b/lib/data/ourdb/mycelium_streamer.v @@ -6,14 +6,6 @@ import time import encoding.base64 import json -// SyncData encodes binary data as base64 string for JSON compatibility -struct SyncData { -pub: - id u32 - data string // base64 encoded []u8 - topic string = 'db_sync' -} - struct MyceliumStreamer { pub mut: master &OurDB @[skip; str: skip] @@ -112,23 +104,13 @@ pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 { // Get updates from the beginning (id 0) to ensure complete sync data := s.master.push_updates(id) or { return error('Failed to push updates due to: ${err}') } - // Create sync data - sync_data := SyncData{ - id: id - data: base64.encode(data) // encode binary data directly - topic: 'db_sync' - } - - // Convert to JSON - json_data := json.encode(sync_data) - // Broadcast to all workers for worker_key, mut _ in s.workers { println('Sending message to worker: ${worker_key}') msg := s.mycelium_client.send_msg( public_key: worker_key // destination public key - payload: data.str() // message payload - topic: 'sync_db' // optional topic + payload: base64.encode(data) // message payload + topic: 'db_sync' // optional topic )! println('Sent message ID: ${msg.id}') } @@ -146,16 +128,16 @@ pub fn (mut s MyceliumStreamer) listen() ! { println('Starting to listen for messages...') spawn fn [mut s] () { for { + println('Listening for messages...') // Check for updates from master - if msg := s.mycelium_client.receive_msg(wait: true, peek: false, topic: 'db_sync') { - // Decode message payload as JSON - sync_data := json.decode(SyncData, msg.payload) or { - eprintln('Failed to decode sync data JSON: ${err}') - continue - } - + if msg := s.mycelium_client.receive_msg( + wait: true + peek: false + topic: 'db_sync' + ) + { // Decode the base64 data - update_data := base64.decode(sync_data.data) + update_data := base64.decode(msg.payload) if update_data.len == 0 { eprintln('Failed to decode base64 data') continue @@ -163,6 +145,7 @@ pub fn (mut s MyceliumStreamer) listen() ! { // Find the target worker and apply updates if mut worker := s.workers[msg.src_pk] { + println('Received update from worker: ${msg.src_pk}') worker.sync_updates(update_data) or { eprintln('Failed to sync worker: ${err}') continue