diff --git a/examples/data/deduped_mycelium_slave.vsh b/examples/data/deduped_mycelium_slave.vsh index 0d261fe7..887b4b2f 100755 --- a/examples/data/deduped_mycelium_slave.vsh +++ b/examples/data/deduped_mycelium_slave.vsh @@ -9,6 +9,8 @@ import os import encoding.base64 import json +// TODO: Make the worker read the data from the streamer instead. + const slave_port = 9000 const master_public_key = '89c2eeb24bcdfaaac78c0023a166d88f760c097c1a57748770e432ba10757179' const master_address = '458:90d4:a3ef:b285:6d32:a22d:9e73:697f' @@ -39,9 +41,16 @@ defer { // Receive messages // Parameters: wait_for_message, peek_only, topic_filter -received := slave.receive_msg(wait: true, peek: false, topic: 'db_sync')! +received := slave.receive_msg(wait: true, peek: false, topic: 'sync_db')! println('Received message from: ${received.src_pk}') println('Message payload: ${base64.decode_str(received.payload)}') -payload := received.payload -worker.sync_updates(payload.bytes()) or { error('Failed to sync updates to worker due to: ${err}') } +payload := base64.decode(received.payload) +println('Payload: ${payload.str()}') +worker.sync_updates(received.payload.bytes()) or { + error('Failed to sync updates to worker due to: ${err}') +} + +// Get last index +last_index := worker.get_last_index()! +println('Last index: ${last_index}') diff --git a/lib/data/ourdb/mycelium_streamer.v b/lib/data/ourdb/mycelium_streamer.v index b2254eff..548a34dc 100644 --- a/lib/data/ourdb/mycelium_streamer.v +++ b/lib/data/ourdb/mycelium_streamer.v @@ -67,11 +67,13 @@ pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 { // Broadcast to all workers for worker_key, mut _ in s.workers { - s.mycelium_client.send_msg( + println('Sending message to worker: ${worker_key}') + msg := s.mycelium_client.send_msg( public_key: worker_key // destination public key payload: data.str() // message payload topic: 'sync_db' // optional topic )! + println('Sent message ID: ${msg.id}') } return id }