diff --git a/examples/data/deduped_mycelium_slave.vsh b/examples/data/deduped_mycelium_slave.vsh index 816624fb..0d261fe7 100755 --- a/examples/data/deduped_mycelium_slave.vsh +++ b/examples/data/deduped_mycelium_slave.vsh @@ -9,19 +9,10 @@ import os import encoding.base64 import json -// NOTE: Before running this script, ensure that the mycelium binary is installed and in the PATH - -const slave_port = 9001 +const slave_port = 9000 const master_public_key = '89c2eeb24bcdfaaac78c0023a166d88f760c097c1a57748770e432ba10757179' const master_address = '458:90d4:a3ef:b285:6d32:a22d:9e73:697f' -// Struct to hold data for syncing -struct SyncData { - id u32 - data string - topic string = 'db_sync' -} - mycelium.delete()! // Initialize mycelium clients @@ -35,7 +26,7 @@ slave_inspect := mycelium.inspect(key_file_path: '/tmp/mycelium_server1/priv_key println('Server 2 (slave Node) public key: ${slave_inspect.public_key}') // Initialize ourdb instances -mut db := ourdb.new( +mut worker := ourdb.new( record_nr_max: 16777216 - 1 record_size_max: 1024 path: '/tmp/ourdb1' @@ -43,7 +34,7 @@ mut db := ourdb.new( )! defer { - db.destroy() or { panic('failed to destroy db1: ${err}') } + worker.destroy() or { panic('failed to destroy db1: ${err}') } } // Receive messages @@ -52,24 +43,5 @@ received := slave.receive_msg(wait: true, peek: false, topic: 'db_sync')! println('Received message from: ${received.src_pk}') println('Message payload: ${base64.decode_str(received.payload)}') -// Send the last inserted record id to the master -// data := 'Test data for sync - ' + time.now().str() -id := db.get_last_index()! -println('Last inserted record id: ${id}') - -// Send sync message to slave -println('\nSending sync message to slave...') -msg := slave.send_msg( - public_key: master_public_key - payload: 'last_inserted_record_id,${id}' - topic: 'db_sync' -)! - -// slave.reply_msg( -// id: received.id -// public_key: received.src_pk -// payload: 'Got your message!' -// topic: 'db_sync' -// )! - -// println('Message sent to master with ID: ${msg.id}') +payload := received.payload +worker.sync_updates(payload.bytes()) or { error('Failed to sync updates to worker due to: ${err}') } diff --git a/lib/data/ourdb/mycelium_streamer.v b/lib/data/ourdb/mycelium_streamer.v index 3a991c39..83521cf9 100644 --- a/lib/data/ourdb/mycelium_streamer.v +++ b/lib/data/ourdb/mycelium_streamer.v @@ -7,6 +7,7 @@ pub mut: master &OurDB @[skip; str: skip] workers map[string]&OurDB @[skip; str: skip] // key is mycelium public key, value is ourdb incremental_mode bool = true // default is true + mycelium_client &mycelium.Mycelium } pub struct NewStreamerArgs { @@ -35,7 +36,9 @@ pub fn new_streamer(args NewStreamerArgs) !MyceliumStreamer { master: &db workers: {} incremental_mode: args.incremental_mode + mycelium_client: &mycelium.Mycelium{} } + s.mycelium_client = mycelium.get()! return s } @@ -52,13 +55,15 @@ pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 { } // Get updates from the beginning (id 0) to ensure complete sync - data := s.master.push_updates(0) or { return error('Failed to push updates due to: ${err}') } + data := s.master.push_updates(id) or { return error('Failed to push updates due to: ${err}') } // Broadcast to all workers - for _, mut worker in s.workers { - worker.sync_updates(data) or { - return error('Failed to sync updates to worker due to: ${err}') - } + for worker_key, mut _ in s.workers { + s.mycelium_client.send_msg( + public_key: worker_key // destination public key + payload: data.str() // message payload + topic: 'sync_db' // optional topic + )! } return id }