diff --git a/examples/data/deduped_mycelium_master.vsh b/examples/data/deduped_mycelium_master.vsh index 421bfe2a..3dc112e5 100755 --- a/examples/data/deduped_mycelium_master.vsh +++ b/examples/data/deduped_mycelium_master.vsh @@ -5,7 +5,7 @@ import time // Known worker public key worker1_public_key := '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c' -worker2_public_key := '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c' +// worker2_public_key := '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c' // Create master node println('Starting master node...') @@ -19,7 +19,7 @@ println('Initializing workers...') // Add workers and initialize its database // You should run the deduped_mycelium_worker.vsh script for each worker streamer.add_worker(worker1_public_key)! -streamer.add_worker(worker2_public_key)! +// streamer.add_worker(worker2_public_key)! // When we preforming a write, we get the ID of the record // We basically write to the master database, and read from the workers normally @@ -28,9 +28,15 @@ mut id2 := streamer.write(id: 2, value: 'Record 2')! println('Master record 1 data: ${id1}') println('Master record 2 data: ${id2}') -// Read data from workers -worker_id1 := streamer.read(id: 1)! -worker_id2 := streamer.read(id: 2)! +// Read data from master +master_id1 := streamer.read(id: 1)! +master_id2 := streamer.read(id: 2)! +println('Master 1 data: ${master_id1.bytestr()}') +println('Master 2 data: ${master_id2.bytestr()}') -println('Worker 1 data: ${worker_id1}') -println('Worker 2 data: ${worker_id2}') +// Read data from workers +worker_id1 := streamer.read(id: 1, worker_public_key: worker1_public_key)! +worker_id2 := streamer.read(id: 2, worker_public_key: worker1_public_key)! + +println('Worker 1 data: ${worker_id1.bytestr()}') +println('Worker 2 data: ${worker_id2.bytestr()}') diff --git a/lib/data/ourdb/mycelium_streamer.v b/lib/data/ourdb/mycelium_streamer.v index 1067c205..7afbd738 100644 --- a/lib/data/ourdb/mycelium_streamer.v +++ b/lib/data/ourdb/mycelium_streamer.v @@ -4,6 +4,8 @@ import freeflowuniverse.herolib.clients.mycelium import rand import time import encoding.base64 +import json +import x.json2 struct MyceliumStreamer { pub mut: @@ -127,17 +129,36 @@ pub: // listen continuously checks for messages from master and applies updates pub fn (mut s MyceliumStreamer) listen() ! { spawn fn [mut s] () ! { - msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'db_sync') or { + msg := s.mycelium_client.receive_msg(wait: true, peek: true) or { return error('Failed to receive message: ${err}') } - if msg.payload.len > 0 { - update_data := base64.decode(msg.payload) - if mut worker := s.workers[msg.dst_pk] { - worker.sync_updates(update_data) or { - return error('Failed to sync worker: ${err}') + println('Received message topic: ${msg.topic}') + + if msg.topic == 'db_sync' { + if msg.payload.len > 0 { + update_data := base64.decode(msg.payload) + if mut worker := s.workers[msg.dst_pk] { + worker.sync_updates(update_data) or { + return error('Failed to sync worker: ${err}') + } } } } + + if msg.topic == 'get_db' { + // Send the entire database to the worker + if mut worker := s.workers[msg.dst_pk] { + // convert database to base64 + to_json := json2.encode(worker).bytes() + to_base64 := base64.encode(to_json) + s.mycelium_client.reply_msg( + id: msg.id + public_key: msg.src_pk + payload: to_base64 + topic: 'get_db' + )! + } + } }() time.sleep(time.second * 1) return s.listen() @@ -145,11 +166,36 @@ pub fn (mut s MyceliumStreamer) listen() ! { pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 { if args.worker_public_key.len > 0 { - if mut worker := s.workers[args.worker_public_key] { - println('Reading from worker: ${args.worker_public_key}') - return worker.get(args.id)! - } - return error('Worker with public key ${args.worker_public_key} not found') + return s.read_from_worker(args) } return s.master.get(args.id)! } + +fn (mut s MyceliumStreamer) read_from_worker(args MyceliumReadArgs) ![]u8 { + println('Reading from worker: ${args.worker_public_key}') + if mut _ := s.workers[args.worker_public_key] { + s.mycelium_client.send_msg( + public_key: args.worker_public_key + payload: '' + topic: 'get_db' + )! + } + + msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'get_db') or { + return error('Failed to receive message: ${err}') + } + + println('msg: ${msg}') + + if msg.payload.len > 0 { + to_json := base64.decode(msg.payload) + mut worker_db := json2.decode[OurDB](to_json.bytestr())! + println('worker_db: ${worker_db}') + value := worker_db.get(args.id) or { + return error('Failed to get id ${args.id} from worker db: ${err}') + } + return value + } + + return error('read_from_worker failed') +}