diff --git a/examples/data/deduped_mycelium_master b/examples/data/deduped_mycelium_master deleted file mode 100755 index 0aaa3185..00000000 Binary files a/examples/data/deduped_mycelium_master and /dev/null differ diff --git a/examples/data/deduped_mycelium_master.vsh b/examples/data/deduped_mycelium_master.vsh index ba1febfb..1279a4f0 100755 --- a/examples/data/deduped_mycelium_master.vsh +++ b/examples/data/deduped_mycelium_master.vsh @@ -11,6 +11,7 @@ mut streamer := ourdb.new_streamer( incremental_mode: false server_port: 9000 // Master uses default port is_worker: false + id: 'frBvtZQeqf' )! println('Starting master node...') diff --git a/examples/data/deduped_mycelium_worker.vsh b/examples/data/deduped_mycelium_worker.vsh index 9bd04f9a..4b79605d 100755 --- a/examples/data/deduped_mycelium_worker.vsh +++ b/examples/data/deduped_mycelium_worker.vsh @@ -2,6 +2,8 @@ import freeflowuniverse.herolib.data.ourdb +worker_public_key := '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c' + // Create a worker node with a unique database path mut streamer := ourdb.get_streamer(id: 'frBvtZQeqf') or { ourdb.new_streamer( @@ -11,6 +13,16 @@ mut streamer := ourdb.get_streamer(id: 'frBvtZQeqf') or { )! } -println('Starting worker node...') -println('Listening for updates from master...') -streamer.listen()! // This will keep running and listening for updates +// Add worker to the tree +streamer.add_worker(worker_public_key)! +// This will keep running and listening for updates +streamer.listen()! + +println('Listening for updates...') +// Now we can read from the database +data := streamer.read( + id: 1 + worker_public_key: worker_public_key +)! + +println('Worker data: ${data.bytestr()}') diff --git a/lib/data/ourdb/mycelium_streamer.v b/lib/data/ourdb/mycelium_streamer.v index 139ebb02..ff4626ff 100644 --- a/lib/data/ourdb/mycelium_streamer.v +++ b/lib/data/ourdb/mycelium_streamer.v @@ -124,25 +124,29 @@ pub: // listen continuously checks for messages from master and applies updates pub fn (mut s MyceliumStreamer) listen() ! { + println('Listening for updates from master...') spawn fn [mut s] () ! { - println('Starting to listen for messages...') - msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'db_sync')! - - update_data := base64.decode(msg.payload) - println('Received update from worker: ${msg.src_pk}') - println('Received update with payload: ${update_data.str()}') - if mut worker := s.workers[msg.src_pk] { - println('Worker with public key ${msg.src_pk} found') - worker.sync_updates(update_data) or { return error('Failed to sync worker: ${err}') } - } + s.listen_()! }() +} + +fn (mut s MyceliumStreamer) listen_() ! { + println('Listening...') + msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'db_sync')! + + update_data := base64.decode(msg.payload) + if mut worker := s.workers[msg.src_pk] { + worker.sync_updates(update_data) or { return error('Failed to sync worker: ${err}') } + } + time.sleep(time.second * 1) - return s.listen() + return s.listen_() } pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 { if args.worker_public_key.len > 0 { if mut worker := s.workers[args.worker_public_key] { + println('Reading from worker: ${args.worker_public_key}') return worker.get(args.id)! } return error('Worker with public key ${args.worker_public_key} not found')