feat: Enhance Mycelium streamer with worker support

- Added worker ID to master and worker configurations for improved
  identification and management.
- Implemented worker registration and data synchronization mechanisms
  to enable distributed data access.
- Added a read function to retrieve data from specific workers,
  enhancing data access flexibility.
- Improved logging for better monitoring and debugging of the system.
This commit is contained in:
Mahmoud Emad
2025-03-03 14:18:55 +02:00
parent 1de9be2a8a
commit eb47c8490d
4 changed files with 31 additions and 14 deletions

View File

@@ -11,6 +11,7 @@ mut streamer := ourdb.new_streamer(
incremental_mode: false
server_port: 9000 // Master uses default port
is_worker: false
id: 'frBvtZQeqf'
)!
println('Starting master node...')

View File

@@ -2,6 +2,8 @@
import freeflowuniverse.herolib.data.ourdb
worker_public_key := '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c'
// Create a worker node with a unique database path
mut streamer := ourdb.get_streamer(id: 'frBvtZQeqf') or {
ourdb.new_streamer(
@@ -11,6 +13,16 @@ mut streamer := ourdb.get_streamer(id: 'frBvtZQeqf') or {
)!
}
println('Starting worker node...')
println('Listening for updates from master...')
streamer.listen()! // This will keep running and listening for updates
// Add worker to the tree
streamer.add_worker(worker_public_key)!
// This will keep running and listening for updates
streamer.listen()!
println('Listening for updates...')
// Now we can read from the database
data := streamer.read(
id: 1
worker_public_key: worker_public_key
)!
println('Worker data: ${data.bytestr()}')

View File

@@ -124,25 +124,29 @@ pub:
// listen continuously checks for messages from master and applies updates
pub fn (mut s MyceliumStreamer) listen() ! {
println('Listening for updates from master...')
spawn fn [mut s] () ! {
println('Starting to listen for messages...')
s.listen_()!
}()
}
fn (mut s MyceliumStreamer) listen_() ! {
println('Listening...')
msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'db_sync')!
update_data := base64.decode(msg.payload)
println('Received update from worker: ${msg.src_pk}')
println('Received update with payload: ${update_data.str()}')
if mut worker := s.workers[msg.src_pk] {
println('Worker with public key ${msg.src_pk} found')
worker.sync_updates(update_data) or { return error('Failed to sync worker: ${err}') }
}
}()
time.sleep(time.second * 1)
return s.listen()
return s.listen_()
}
pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 {
if args.worker_public_key.len > 0 {
if mut worker := s.workers[args.worker_public_key] {
println('Reading from worker: ${args.worker_public_key}')
return worker.get(args.id)!
}
return error('Worker with public key ${args.worker_public_key} not found')