feat: Improve Mycelium-based data synchronization
- Refactor data synchronization logic to use Mycelium messages for efficient updates between master and worker nodes. This removes the previous inefficient polling method and simplifies the code. - Update the slave node to receive and apply updates from the master, improving synchronization efficiency and robustness. - Change the default slave port to 9000. - Rename `db` variable to `worker` for clarity.
This commit is contained in:
@@ -9,19 +9,10 @@ import os
|
|||||||
import encoding.base64
|
import encoding.base64
|
||||||
import json
|
import json
|
||||||
|
|
||||||
// NOTE: Before running this script, ensure that the mycelium binary is installed and in the PATH
|
const slave_port = 9000
|
||||||
|
|
||||||
const slave_port = 9001
|
|
||||||
const master_public_key = '89c2eeb24bcdfaaac78c0023a166d88f760c097c1a57748770e432ba10757179'
|
const master_public_key = '89c2eeb24bcdfaaac78c0023a166d88f760c097c1a57748770e432ba10757179'
|
||||||
const master_address = '458:90d4:a3ef:b285:6d32:a22d:9e73:697f'
|
const master_address = '458:90d4:a3ef:b285:6d32:a22d:9e73:697f'
|
||||||
|
|
||||||
// Struct to hold data for syncing
|
|
||||||
struct SyncData {
|
|
||||||
id u32
|
|
||||||
data string
|
|
||||||
topic string = 'db_sync'
|
|
||||||
}
|
|
||||||
|
|
||||||
mycelium.delete()!
|
mycelium.delete()!
|
||||||
|
|
||||||
// Initialize mycelium clients
|
// Initialize mycelium clients
|
||||||
@@ -35,7 +26,7 @@ slave_inspect := mycelium.inspect(key_file_path: '/tmp/mycelium_server1/priv_key
|
|||||||
println('Server 2 (slave Node) public key: ${slave_inspect.public_key}')
|
println('Server 2 (slave Node) public key: ${slave_inspect.public_key}')
|
||||||
|
|
||||||
// Initialize ourdb instances
|
// Initialize ourdb instances
|
||||||
mut db := ourdb.new(
|
mut worker := ourdb.new(
|
||||||
record_nr_max: 16777216 - 1
|
record_nr_max: 16777216 - 1
|
||||||
record_size_max: 1024
|
record_size_max: 1024
|
||||||
path: '/tmp/ourdb1'
|
path: '/tmp/ourdb1'
|
||||||
@@ -43,7 +34,7 @@ mut db := ourdb.new(
|
|||||||
)!
|
)!
|
||||||
|
|
||||||
defer {
|
defer {
|
||||||
db.destroy() or { panic('failed to destroy db1: ${err}') }
|
worker.destroy() or { panic('failed to destroy db1: ${err}') }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Receive messages
|
// Receive messages
|
||||||
@@ -52,24 +43,5 @@ received := slave.receive_msg(wait: true, peek: false, topic: 'db_sync')!
|
|||||||
println('Received message from: ${received.src_pk}')
|
println('Received message from: ${received.src_pk}')
|
||||||
println('Message payload: ${base64.decode_str(received.payload)}')
|
println('Message payload: ${base64.decode_str(received.payload)}')
|
||||||
|
|
||||||
// Send the last inserted record id to the master
|
payload := received.payload
|
||||||
// data := 'Test data for sync - ' + time.now().str()
|
worker.sync_updates(payload.bytes()) or { error('Failed to sync updates to worker due to: ${err}') }
|
||||||
id := db.get_last_index()!
|
|
||||||
println('Last inserted record id: ${id}')
|
|
||||||
|
|
||||||
// Send sync message to slave
|
|
||||||
println('\nSending sync message to slave...')
|
|
||||||
msg := slave.send_msg(
|
|
||||||
public_key: master_public_key
|
|
||||||
payload: 'last_inserted_record_id,${id}'
|
|
||||||
topic: 'db_sync'
|
|
||||||
)!
|
|
||||||
|
|
||||||
// slave.reply_msg(
|
|
||||||
// id: received.id
|
|
||||||
// public_key: received.src_pk
|
|
||||||
// payload: 'Got your message!'
|
|
||||||
// topic: 'db_sync'
|
|
||||||
// )!
|
|
||||||
|
|
||||||
// println('Message sent to master with ID: ${msg.id}')
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ pub mut:
|
|||||||
master &OurDB @[skip; str: skip]
|
master &OurDB @[skip; str: skip]
|
||||||
workers map[string]&OurDB @[skip; str: skip] // key is mycelium public key, value is ourdb
|
workers map[string]&OurDB @[skip; str: skip] // key is mycelium public key, value is ourdb
|
||||||
incremental_mode bool = true // default is true
|
incremental_mode bool = true // default is true
|
||||||
|
mycelium_client &mycelium.Mycelium
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NewStreamerArgs {
|
pub struct NewStreamerArgs {
|
||||||
@@ -35,7 +36,9 @@ pub fn new_streamer(args NewStreamerArgs) !MyceliumStreamer {
|
|||||||
master: &db
|
master: &db
|
||||||
workers: {}
|
workers: {}
|
||||||
incremental_mode: args.incremental_mode
|
incremental_mode: args.incremental_mode
|
||||||
|
mycelium_client: &mycelium.Mycelium{}
|
||||||
}
|
}
|
||||||
|
s.mycelium_client = mycelium.get()!
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -52,13 +55,15 @@ pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get updates from the beginning (id 0) to ensure complete sync
|
// Get updates from the beginning (id 0) to ensure complete sync
|
||||||
data := s.master.push_updates(0) or { return error('Failed to push updates due to: ${err}') }
|
data := s.master.push_updates(id) or { return error('Failed to push updates due to: ${err}') }
|
||||||
|
|
||||||
// Broadcast to all workers
|
// Broadcast to all workers
|
||||||
for _, mut worker in s.workers {
|
for worker_key, mut _ in s.workers {
|
||||||
worker.sync_updates(data) or {
|
s.mycelium_client.send_msg(
|
||||||
return error('Failed to sync updates to worker due to: ${err}')
|
public_key: worker_key // destination public key
|
||||||
}
|
payload: data.str() // message payload
|
||||||
|
topic: 'sync_db' // optional topic
|
||||||
|
)!
|
||||||
}
|
}
|
||||||
return id
|
return id
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user