feat: Improve MyceliumStreamer worker communication

- Add worker registration to MyceliumStreamer:  Allows for explicit
  addition of workers, improving management and control.
- Simplify worker message handling: Streamlines message processing
  for increased efficiency and readability.  Removes unnecessary
  logging and simplifies message routing.
- Remove redundant message handling: Eliminates duplicate code
  paths for cleaner and more maintainable code.
- Improve worker data retrieval: Facilitates direct data retrieval
  from workers, enhancing efficiency and reliability.
This commit is contained in:
Mahmoud Emad
2025-03-04 00:40:05 +02:00
parent 5b69f935a5
commit 485b47d145
3 changed files with 21 additions and 58 deletions

View File

@@ -11,5 +11,7 @@ mut streamer := ourdb.new_streamer(
is_worker: true
)!
streamer.add_worker(worker_public_key)!
// Initialize and run worker node
streamer.listen()!

View File

@@ -8,13 +8,13 @@ const mbyte_ = 1000000
@[heap]
pub struct OurDB {
mut:
lookup &LookupTable
lookup &LookupTable @[skip; str: skip]
pub:
path string // is the directory in which we will have the lookup db as well as all the backend
incremental_mode bool
file_size u32 = 500 * (1 << 20) // 500MB
pub mut:
file os.File
file os.File @[skip; str: skip]
file_nr u16 // the file which is open
last_used_file_nr u16
}

View File

@@ -108,14 +108,13 @@ pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 {
data := s.master.push_updates(id) or { return error('Failed to push updates due to: ${err}') }
// Broadcast to all workers
for worker_key, mut _ in s.workers {
println('Sending message to worker: ${worker_key}')
msg := s.mycelium_client.send_msg(
public_key: worker_key // destination public key
payload: base64.encode(data) // message payload
topic: 'db_sync' // optional topic
for worker_key, mut worker in s.workers {
s.mycelium_client.send_msg(
public_key: worker_key
payload: base64.encode(data)
topic: 'db_sync'
)!
println('Sent message ID: ${msg.id}')
worker.sync_updates(data) or { return error('Failed to sync worker: ${err}') }
}
return id
}
@@ -129,37 +128,20 @@ pub:
// listen continuously checks for messages from master and applies updates
pub fn (mut s MyceliumStreamer) listen() ! {
spawn fn [mut s] () ! {
msg := s.mycelium_client.receive_msg(wait: true, peek: true) or {
msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'db_sync') or {
return error('Failed to receive message: ${err}')
}
println('Received message topic: ${msg.topic}')
if msg.topic == 'db_sync' {
if msg.payload.len > 0 {
update_data := base64.decode(msg.payload)
if mut worker := s.workers[msg.dst_pk] {
worker.sync_updates(update_data) or {
return error('Failed to sync worker: ${err}')
}
if msg.payload.len > 0 {
update_data := base64.decode(msg.payload)
if mut worker := s.workers[msg.dst_pk] {
worker.sync_updates(update_data) or {
return error('Failed to sync worker: ${err}')
}
}
}
if msg.topic == 'get_db' {
// Send the entire database to the worker
if mut worker := s.workers[msg.dst_pk] {
// convert database to base64
to_json := json2.encode(worker).bytes()
to_base64 := base64.encode(to_json)
s.mycelium_client.reply_msg(
id: msg.id
public_key: msg.src_pk
payload: to_base64
topic: 'get_db'
)!
}
}
}()
time.sleep(time.second * 1)
return s.listen()
}
@@ -172,30 +154,9 @@ pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 {
}
fn (mut s MyceliumStreamer) read_from_worker(args MyceliumReadArgs) ![]u8 {
println('Reading from worker: ${args.worker_public_key}')
if mut _ := s.workers[args.worker_public_key] {
s.mycelium_client.send_msg(
public_key: args.worker_public_key
payload: ''
topic: 'get_db'
)!
if mut worker := s.workers[args.worker_public_key] {
// We need to think about reading from the workers through the mycelium client.
return worker.get(args.id)!
}
msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'get_db') or {
return error('Failed to receive message: ${err}')
}
println('msg: ${msg}')
if msg.payload.len > 0 {
to_json := base64.decode(msg.payload)
mut worker_db := json2.decode[OurDB](to_json.bytestr())!
println('worker_db: ${worker_db}')
value := worker_db.get(args.id) or {
return error('Failed to get id ${args.id} from worker db: ${err}')
}
return value
}
return error('read_from_worker failed')
return error('worker with public key ${args.worker_public_key} not found')
}