diff --git a/examples/data/deduped_mycelium_worker.vsh b/examples/data/deduped_mycelium_worker.vsh index 26e89371..014b0e92 100755 --- a/examples/data/deduped_mycelium_worker.vsh +++ b/examples/data/deduped_mycelium_worker.vsh @@ -11,5 +11,7 @@ mut streamer := ourdb.new_streamer( is_worker: true )! +streamer.add_worker(worker_public_key)! + // Initialize and run worker node streamer.listen()! diff --git a/lib/data/ourdb/factory.v b/lib/data/ourdb/factory.v index 26ef2fbf..9f5b13b6 100644 --- a/lib/data/ourdb/factory.v +++ b/lib/data/ourdb/factory.v @@ -8,13 +8,13 @@ const mbyte_ = 1000000 @[heap] pub struct OurDB { mut: - lookup &LookupTable + lookup &LookupTable @[skip; str: skip] pub: path string // is the directory in which we will have the lookup db as well as all the backend incremental_mode bool file_size u32 = 500 * (1 << 20) // 500MB pub mut: - file os.File + file os.File @[skip; str: skip] file_nr u16 // the file which is open last_used_file_nr u16 } diff --git a/lib/data/ourdb/mycelium_streamer.v b/lib/data/ourdb/mycelium_streamer.v index 7afbd738..7a3cc3fa 100644 --- a/lib/data/ourdb/mycelium_streamer.v +++ b/lib/data/ourdb/mycelium_streamer.v @@ -108,14 +108,13 @@ pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 { data := s.master.push_updates(id) or { return error('Failed to push updates due to: ${err}') } // Broadcast to all workers - for worker_key, mut _ in s.workers { - println('Sending message to worker: ${worker_key}') - msg := s.mycelium_client.send_msg( - public_key: worker_key // destination public key - payload: base64.encode(data) // message payload - topic: 'db_sync' // optional topic + for worker_key, mut worker in s.workers { + s.mycelium_client.send_msg( + public_key: worker_key + payload: base64.encode(data) + topic: 'db_sync' )! - println('Sent message ID: ${msg.id}') + worker.sync_updates(data) or { return error('Failed to sync worker: ${err}') } } return id } @@ -129,37 +128,20 @@ pub: // listen continuously checks for messages from master and applies updates pub fn (mut s MyceliumStreamer) listen() ! { spawn fn [mut s] () ! { - msg := s.mycelium_client.receive_msg(wait: true, peek: true) or { + msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'db_sync') or { return error('Failed to receive message: ${err}') } - println('Received message topic: ${msg.topic}') - if msg.topic == 'db_sync' { - if msg.payload.len > 0 { - update_data := base64.decode(msg.payload) - if mut worker := s.workers[msg.dst_pk] { - worker.sync_updates(update_data) or { - return error('Failed to sync worker: ${err}') - } + if msg.payload.len > 0 { + update_data := base64.decode(msg.payload) + if mut worker := s.workers[msg.dst_pk] { + worker.sync_updates(update_data) or { + return error('Failed to sync worker: ${err}') } } } - - if msg.topic == 'get_db' { - // Send the entire database to the worker - if mut worker := s.workers[msg.dst_pk] { - // convert database to base64 - to_json := json2.encode(worker).bytes() - to_base64 := base64.encode(to_json) - s.mycelium_client.reply_msg( - id: msg.id - public_key: msg.src_pk - payload: to_base64 - topic: 'get_db' - )! - } - } }() + time.sleep(time.second * 1) return s.listen() } @@ -172,30 +154,9 @@ pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 { } fn (mut s MyceliumStreamer) read_from_worker(args MyceliumReadArgs) ![]u8 { - println('Reading from worker: ${args.worker_public_key}') - if mut _ := s.workers[args.worker_public_key] { - s.mycelium_client.send_msg( - public_key: args.worker_public_key - payload: '' - topic: 'get_db' - )! + if mut worker := s.workers[args.worker_public_key] { + // We need to think about reading from the workers through the mycelium client. + return worker.get(args.id)! } - - msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'get_db') or { - return error('Failed to receive message: ${err}') - } - - println('msg: ${msg}') - - if msg.payload.len > 0 { - to_json := base64.decode(msg.payload) - mut worker_db := json2.decode[OurDB](to_json.bytestr())! - println('worker_db: ${worker_db}') - value := worker_db.get(args.id) or { - return error('Failed to get id ${args.id} from worker db: ${err}') - } - return value - } - - return error('read_from_worker failed') + return error('worker with public key ${args.worker_public_key} not found') }