diff --git a/examples/data/deduped_mycelium_master.vsh b/examples/data/deduped_mycelium_master.vsh index 2280fd4d..82fd412e 100755 --- a/examples/data/deduped_mycelium_master.vsh +++ b/examples/data/deduped_mycelium_master.vsh @@ -1,86 +1,113 @@ #!/usr/bin/env -S v -n -w -gc none -cc tcc -d use_openssl -enable-globals run -import freeflowuniverse.herolib.clients.mycelium -import freeflowuniverse.herolib.installers.net.mycelium_installer import freeflowuniverse.herolib.data.ourdb -import freeflowuniverse.herolib.osal -import time -import os -import encoding.base64 -import json -// NOTE: Before running this script, ensure that the mycelium binary is installed and in the PATH +worker_public_key := '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c' -const master_port = 9000 -const slave_public_key = '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c' -const slave_address = '59c:28ee:8597:6c20:3b2f:a9ee:2e18:9d4f' +mut streamer := ourdb.new_streamer(incremental_mode: false)! +streamer.add_worker(worker_public_key)! // Mycelium public key -// Struct to hold data for syncing -struct SyncData { - id u32 - data string - topic string = 'db_sync' -} +id := streamer.write(id: 1, value: 'Record 1')! -mycelium.delete()! +println('ID: ${id}') -// Initialize mycelium clients -mut master := mycelium.get()! -master.server_url = 'http://localhost:${master_port}' -master.name = 'master_node' +master_data := streamer.read(id: id)! +worker_data := streamer.read(id: id, worker_public_key: worker_public_key)! -// Get public keys for communication -// master_inspect := mycelium.inspect(key_file_path: '/tmp/mycelium_server1/priv_key.bin')! -// println('Server 1 (Master Node) public key: ${master_inspect.public_key}') +// assert master_data == worker_data -// Initialize ourdb instances -mut db := ourdb.new( - record_nr_max: 16777216 - 1 - record_size_max: 1024 - path: '/tmp/ourdb1' - reset: true -)! +master_data_str := master_data.bytestr() +worker_data_str := worker_data.bytestr() -defer { - db.destroy() or { panic('failed to destroy db1: ${err}') } -} +println('Master data: ${master_data_str}') +println('Worker data: ${worker_data_str}') -// Store in master db -println('\nStoring data in master node DB...') -data := 'Test data for sync - ' + time.now().str() -id := db.set(data: data.bytes())! -println('Successfully stored data in master node DB with ID: ${id}') +// println('data: ${data.str()}') +// println('streamer: ${streamer.master}') +// mut db := ourdb.new('/tmp/ourdb')! +// println('Database path: ${db.path}') -// Create sync data -sync_data := SyncData{ - id: id - data: data -} +// import freeflowuniverse.herolib.clients.mycelium +// import freeflowuniverse.herolib.installers.net.mycelium_installer +// import freeflowuniverse.herolib.data.ourdb +// import freeflowuniverse.herolib.osal +// import time +// import os +// import encoding.base64 +// import json -// Convert to JSON -json_data := json.encode(sync_data) +// // NOTE: Before running this script, ensure that the mycelium binary is installed and in the PATH -// Send sync message to slave -println('\nSending sync message to slave...') -msg := master.send_msg( - public_key: slave_public_key - payload: json_data - topic: 'db_sync' -)! +// const master_port = 9000 +// const slave_public_key = '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c' +// const slave_address = '59c:28ee:8597:6c20:3b2f:a9ee:2e18:9d4f' -println('Sync message sent with ID: ${msg.id} to slave with public key: ${slave_public_key}') +// // Struct to hold data for syncing +// struct SyncData { +// id u32 +// data string +// topic string = 'db_sync' +// } -// Receive messages -// Parameters: wait_for_message, peek_only, topic_filter -received := master.receive_msg(wait: true, peek: false, topic: 'db_sync')! -println('Received message from: ${received.src_pk}') -println('Message payload: ${base64.decode_str(received.payload)}') +// mycelium.delete()! -master.reply_msg( - id: received.id - public_key: received.src_pk - payload: 'Got your message!' - topic: 'db_sync' -)! +// // Initialize mycelium clients +// mut master := mycelium.get()! +// master.server_url = 'http://localhost:${master_port}' +// master.name = 'master_node' -println('Message sent to slave with ID: ${msg.id}') +// // Get public keys for communication +// // master_inspect := mycelium.inspect(key_file_path: '/tmp/mycelium_server1/priv_key.bin')! +// // println('Server 1 (Master Node) public key: ${master_inspect.public_key}') + +// // Initialize ourdb instances +// mut db := ourdb.new( +// record_nr_max: 16777216 - 1 +// record_size_max: 1024 +// path: '/tmp/ourdb1' +// reset: true +// )! + +// defer { +// db.destroy() or { panic('failed to destroy db1: ${err}') } +// } + +// // Receive messages +// // Parameters: wait_for_message, peek_only, topic_filter +// received := master.receive_msg(wait: true, peek: false, topic: 'db_sync')! +// println('Received message from: ${received.src_pk}') +// println('Message payload: ${base64.decode_str(received.payload)}') + +// // Store in master db +// println('\nStoring data in master node DB...') +// data := 'Test data for sync - ' + time.now().str() +// id := db.set(data: data.bytes())! +// println('Successfully stored data in master node DB with ID: ${id}') + +// // Create sync data +// sync_data := SyncData{ +// id: id +// data: data +// } + +// // Convert to JSON +// json_data := json.encode(sync_data) + +// // Send sync message to slave +// println('\nSending sync message to slave...') +// msg := master.send_msg( +// public_key: slave_public_key +// payload: json_data +// topic: 'db_sync' +// )! + +// println('Sync message sent with ID: ${msg.id} to slave with public key: ${slave_public_key}') + +// // master.reply_msg( +// // id: received.id +// // public_key: received.src_pk +// // payload: 'Got your message!' +// // topic: 'db_sync' +// // )! + +// // println('Message sent to slave with ID: ${msg.id}') diff --git a/examples/data/deduped_mycelium_slave.vsh b/examples/data/deduped_mycelium_slave.vsh index 3eb09f0a..816624fb 100755 --- a/examples/data/deduped_mycelium_slave.vsh +++ b/examples/data/deduped_mycelium_slave.vsh @@ -46,6 +46,12 @@ defer { db.destroy() or { panic('failed to destroy db1: ${err}') } } +// Receive messages +// Parameters: wait_for_message, peek_only, topic_filter +received := slave.receive_msg(wait: true, peek: false, topic: 'db_sync')! +println('Received message from: ${received.src_pk}') +println('Message payload: ${base64.decode_str(received.payload)}') + // Send the last inserted record id to the master // data := 'Test data for sync - ' + time.now().str() id := db.get_last_index()! @@ -59,17 +65,11 @@ msg := slave.send_msg( topic: 'db_sync' )! -// Receive messages -// Parameters: wait_for_message, peek_only, topic_filter -received := slave.receive_msg(wait: true, peek: false, topic: 'db_sync')! -println('Received message from: ${received.src_pk}') -println('Message payload: ${base64.decode_str(received.payload)}') +// slave.reply_msg( +// id: received.id +// public_key: received.src_pk +// payload: 'Got your message!' +// topic: 'db_sync' +// )! -slave.reply_msg( - id: received.id - public_key: received.src_pk - payload: 'Got your message!' - topic: 'db_sync' -)! - -println('Message sent to master with ID: ${msg.id}') +// println('Message sent to master with ID: ${msg.id}') diff --git a/lib/data/ourdb/mycelium_streamer.v b/lib/data/ourdb/mycelium_streamer.v new file mode 100644 index 00000000..3a991c39 --- /dev/null +++ b/lib/data/ourdb/mycelium_streamer.v @@ -0,0 +1,80 @@ +module ourdb + +import freeflowuniverse.herolib.clients.mycelium + +struct MyceliumStreamer { +pub mut: + master &OurDB @[skip; str: skip] + workers map[string]&OurDB @[skip; str: skip] // key is mycelium public key, value is ourdb + incremental_mode bool = true // default is true +} + +pub struct NewStreamerArgs { +pub mut: + incremental_mode bool = true // default is true +} + +fn new_db_streamer(args NewStreamerArgs) !OurDB { + return new( + record_nr_max: 16777216 - 1 + record_size_max: 1024 + path: '/tmp/ourdb1' + reset: true + incremental_mode: args.incremental_mode + )! +} + +pub fn (mut s MyceliumStreamer) add_worker(public_key string) ! { + mut db := new_db_streamer(incremental_mode: s.incremental_mode)! + s.workers[public_key] = &db +} + +pub fn new_streamer(args NewStreamerArgs) !MyceliumStreamer { + mut db := new_db_streamer(args)! + mut s := MyceliumStreamer{ + master: &db + workers: {} + incremental_mode: args.incremental_mode + } + return s +} + +@[params] +pub struct MyceliumRecordArgs { +pub: + id u32 @[required] + value string @[required] +} + +pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 { + mut id := s.master.set(id: record.id, data: record.value.bytes()) or { + return error('Failed to set id ${record.id} to value ${record.value} due to: ${err}') + } + + // Get updates from the beginning (id 0) to ensure complete sync + data := s.master.push_updates(0) or { return error('Failed to push updates due to: ${err}') } + + // Broadcast to all workers + for _, mut worker in s.workers { + worker.sync_updates(data) or { + return error('Failed to sync updates to worker due to: ${err}') + } + } + return id +} + +pub struct MyceliumReadArgs { +pub: + id u32 @[required] + worker_public_key string +} + +pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 { + if args.worker_public_key.len > 0 { + if mut worker := s.workers[args.worker_public_key] { + return worker.get(args.id)! + } + return error('Worker with public key ${args.worker_public_key} not found') + } + return s.master.get(args.id)! +}