feat: Implement MyceliumStreamer for distributed data synchronization
- Introduces `MyceliumStreamer` for synchronizing data across a Mycelium network, enabling distributed data access.
- Allows adding multiple worker nodes to the streamer for data replication and redundancy.
- Provides `write` and `read` methods for seamless data management across nodes.
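A minimal usage sketch, adapted from the updated example script in this diff (the worker public key is the placeholder value used there; a mycelium node installed and reachable is assumed):

```v
import freeflowuniverse.herolib.data.ourdb

// Placeholder key taken from the example script; replace with a real peer's Mycelium public key.
worker_public_key := '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c'

mut streamer := ourdb.new_streamer(incremental_mode: false)!
streamer.add_worker(worker_public_key)! // register a worker by its Mycelium public key

// Writes go through the master DB and are broadcast to every registered worker.
id := streamer.write(id: 1, value: 'Record 1')!

// Reads target the master by default, or a specific worker when its key is given.
master_data := streamer.read(id: id)!
worker_data := streamer.read(id: id, worker_public_key: worker_public_key)!
println('Master data: ${master_data.bytestr()}')
println('Worker data: ${worker_data.bytestr()}')
```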
@@ -1,86 +1,113 @@
#!/usr/bin/env -S v -n -w -gc none -cc tcc -d use_openssl -enable-globals run

import freeflowuniverse.herolib.clients.mycelium
import freeflowuniverse.herolib.installers.net.mycelium_installer
import freeflowuniverse.herolib.data.ourdb
import freeflowuniverse.herolib.osal
import time
import os
import encoding.base64
import json

// NOTE: Before running this script, ensure that the mycelium binary is installed and in the PATH
worker_public_key := '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c'

const master_port = 9000
const slave_public_key = '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c'
const slave_address = '59c:28ee:8597:6c20:3b2f:a9ee:2e18:9d4f'
mut streamer := ourdb.new_streamer(incremental_mode: false)!
streamer.add_worker(worker_public_key)! // Mycelium public key

// Struct to hold data for syncing
struct SyncData {
	id u32
	data string
	topic string = 'db_sync'
}
id := streamer.write(id: 1, value: 'Record 1')!

mycelium.delete()!
println('ID: ${id}')

// Initialize mycelium clients
mut master := mycelium.get()!
master.server_url = 'http://localhost:${master_port}'
master.name = 'master_node'
master_data := streamer.read(id: id)!
worker_data := streamer.read(id: id, worker_public_key: worker_public_key)!

// Get public keys for communication
// master_inspect := mycelium.inspect(key_file_path: '/tmp/mycelium_server1/priv_key.bin')!
// println('Server 1 (Master Node) public key: ${master_inspect.public_key}')
// assert master_data == worker_data

// Initialize ourdb instances
mut db := ourdb.new(
	record_nr_max: 16777216 - 1
	record_size_max: 1024
	path: '/tmp/ourdb1'
	reset: true
)!
master_data_str := master_data.bytestr()
worker_data_str := worker_data.bytestr()

defer {
	db.destroy() or { panic('failed to destroy db1: ${err}') }
}
println('Master data: ${master_data_str}')
println('Worker data: ${worker_data_str}')

// Store in master db
println('\nStoring data in master node DB...')
data := 'Test data for sync - ' + time.now().str()
id := db.set(data: data.bytes())!
println('Successfully stored data in master node DB with ID: ${id}')
// println('data: ${data.str()}')
// println('streamer: ${streamer.master}')
// mut db := ourdb.new('/tmp/ourdb')!
// println('Database path: ${db.path}')

// Create sync data
sync_data := SyncData{
	id: id
	data: data
}
// import freeflowuniverse.herolib.clients.mycelium
// import freeflowuniverse.herolib.installers.net.mycelium_installer
// import freeflowuniverse.herolib.data.ourdb
// import freeflowuniverse.herolib.osal
// import time
// import os
// import encoding.base64
// import json

// Convert to JSON
json_data := json.encode(sync_data)
// // NOTE: Before running this script, ensure that the mycelium binary is installed and in the PATH

// Send sync message to slave
println('\nSending sync message to slave...')
msg := master.send_msg(
	public_key: slave_public_key
	payload: json_data
	topic: 'db_sync'
)!
// const master_port = 9000
// const slave_public_key = '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c'
// const slave_address = '59c:28ee:8597:6c20:3b2f:a9ee:2e18:9d4f'

println('Sync message sent with ID: ${msg.id} to slave with public key: ${slave_public_key}')
// // Struct to hold data for syncing
// struct SyncData {
// id u32
// data string
// topic string = 'db_sync'
// }

// Receive messages
// Parameters: wait_for_message, peek_only, topic_filter
received := master.receive_msg(wait: true, peek: false, topic: 'db_sync')!
println('Received message from: ${received.src_pk}')
println('Message payload: ${base64.decode_str(received.payload)}')
// mycelium.delete()!

master.reply_msg(
	id: received.id
	public_key: received.src_pk
	payload: 'Got your message!'
	topic: 'db_sync'
)!
// // Initialize mycelium clients
// mut master := mycelium.get()!
// master.server_url = 'http://localhost:${master_port}'
// master.name = 'master_node'

println('Message sent to slave with ID: ${msg.id}')
// // Get public keys for communication
// // master_inspect := mycelium.inspect(key_file_path: '/tmp/mycelium_server1/priv_key.bin')!
// // println('Server 1 (Master Node) public key: ${master_inspect.public_key}')

// // Initialize ourdb instances
// mut db := ourdb.new(
// record_nr_max: 16777216 - 1
// record_size_max: 1024
// path: '/tmp/ourdb1'
// reset: true
// )!

// defer {
// db.destroy() or { panic('failed to destroy db1: ${err}') }
// }

// // Receive messages
// // Parameters: wait_for_message, peek_only, topic_filter
// received := master.receive_msg(wait: true, peek: false, topic: 'db_sync')!
// println('Received message from: ${received.src_pk}')
// println('Message payload: ${base64.decode_str(received.payload)}')

// // Store in master db
// println('\nStoring data in master node DB...')
// data := 'Test data for sync - ' + time.now().str()
// id := db.set(data: data.bytes())!
// println('Successfully stored data in master node DB with ID: ${id}')

// // Create sync data
// sync_data := SyncData{
// id: id
// data: data
// }

// // Convert to JSON
// json_data := json.encode(sync_data)

// // Send sync message to slave
// println('\nSending sync message to slave...')
// msg := master.send_msg(
// public_key: slave_public_key
// payload: json_data
// topic: 'db_sync'
// )!

// println('Sync message sent with ID: ${msg.id} to slave with public key: ${slave_public_key}')

// // master.reply_msg(
// // id: received.id
// // public_key: received.src_pk
// // payload: 'Got your message!'
// // topic: 'db_sync'
// // )!

// // println('Message sent to slave with ID: ${msg.id}')
@@ -46,6 +46,12 @@ defer {
	db.destroy() or { panic('failed to destroy db1: ${err}') }
}

// Receive messages
// Parameters: wait_for_message, peek_only, topic_filter
received := slave.receive_msg(wait: true, peek: false, topic: 'db_sync')!
println('Received message from: ${received.src_pk}')
println('Message payload: ${base64.decode_str(received.payload)}')

// Send the last inserted record id to the master
// data := 'Test data for sync - ' + time.now().str()
id := db.get_last_index()!
@@ -59,17 +65,11 @@ msg := slave.send_msg(
	topic: 'db_sync'
)!

// Receive messages
// Parameters: wait_for_message, peek_only, topic_filter
received := slave.receive_msg(wait: true, peek: false, topic: 'db_sync')!
println('Received message from: ${received.src_pk}')
println('Message payload: ${base64.decode_str(received.payload)}')
// slave.reply_msg(
// id: received.id
// public_key: received.src_pk
// payload: 'Got your message!'
// topic: 'db_sync'
// )!

slave.reply_msg(
	id: received.id
	public_key: received.src_pk
	payload: 'Got your message!'
	topic: 'db_sync'
)!

println('Message sent to master with ID: ${msg.id}')
// println('Message sent to master with ID: ${msg.id}')
lib/data/ourdb/mycelium_streamer.v (new file, 80 lines)
@@ -0,0 +1,80 @@
module ourdb

import freeflowuniverse.herolib.clients.mycelium

struct MyceliumStreamer {
pub mut:
	master &OurDB @[skip; str: skip]
	workers map[string]&OurDB @[skip; str: skip] // key is mycelium public key, value is ourdb
	incremental_mode bool = true // default is true
}

pub struct NewStreamerArgs {
pub mut:
	incremental_mode bool = true // default is true
}

fn new_db_streamer(args NewStreamerArgs) !OurDB {
	return new(
		record_nr_max: 16777216 - 1
		record_size_max: 1024
		path: '/tmp/ourdb1'
		reset: true
		incremental_mode: args.incremental_mode
	)!
}

pub fn (mut s MyceliumStreamer) add_worker(public_key string) ! {
	mut db := new_db_streamer(incremental_mode: s.incremental_mode)!
	s.workers[public_key] = &db
}

pub fn new_streamer(args NewStreamerArgs) !MyceliumStreamer {
	mut db := new_db_streamer(args)!
	mut s := MyceliumStreamer{
		master: &db
		workers: {}
		incremental_mode: args.incremental_mode
	}
	return s
}

@[params]
pub struct MyceliumRecordArgs {
pub:
	id u32 @[required]
	value string @[required]
}

pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 {
	mut id := s.master.set(id: record.id, data: record.value.bytes()) or {
		return error('Failed to set id ${record.id} to value ${record.value} due to: ${err}')
	}

	// Get updates from the beginning (id 0) to ensure complete sync
	data := s.master.push_updates(0) or { return error('Failed to push updates due to: ${err}') }

	// Broadcast to all workers
	for _, mut worker in s.workers {
		worker.sync_updates(data) or {
			return error('Failed to sync updates to worker due to: ${err}')
		}
	}
	return id
}

pub struct MyceliumReadArgs {
pub:
	id u32 @[required]
	worker_public_key string
}

pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 {
	if args.worker_public_key.len > 0 {
		if mut worker := s.workers[args.worker_public_key] {
			return worker.get(args.id)!
		}
		return error('Worker with public key ${args.worker_public_key} not found')
	}
	return s.master.get(args.id)!
}