feat: Improve Mycelium client and streamer

- Changed Mycelium worker port to avoid conflict with master.
- Added debug print statements to Mycelium client for better troubleshooting.
- Removed unnecessary `SyncData` struct, simplifying data handling.
- Updated data encoding/decoding to directly use base64 for efficiency.
- Clarified message topic names for better understanding.
This commit is contained in:
Mahmoud Emad
2025-03-03 12:50:59 +02:00
parent 368edcd93a
commit d852ecc5b1
2 changed files with 12 additions and 29 deletions

View File

@@ -6,7 +6,7 @@ import freeflowuniverse.herolib.data.ourdb
mut streamer := ourdb.get_streamer(id: 'frBvtZQeqf') or { mut streamer := ourdb.get_streamer(id: 'frBvtZQeqf') or {
ourdb.new_streamer( ourdb.new_streamer(
incremental_mode: false incremental_mode: false
server_port: 9001 // Use different port than master server_port: 9000 // Use different port than master
is_worker: true is_worker: true
)! )!
} }

View File

@@ -6,14 +6,6 @@ import time
import encoding.base64 import encoding.base64
import json import json
// SyncData encodes binary data as base64 string for JSON compatibility
struct SyncData {
pub:
id u32
data string // base64 encoded []u8
topic string = 'db_sync'
}
struct MyceliumStreamer { struct MyceliumStreamer {
pub mut: pub mut:
master &OurDB @[skip; str: skip] master &OurDB @[skip; str: skip]
@@ -112,23 +104,13 @@ pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 {
// Get updates from the beginning (id 0) to ensure complete sync // Get updates from the beginning (id 0) to ensure complete sync
data := s.master.push_updates(id) or { return error('Failed to push updates due to: ${err}') } data := s.master.push_updates(id) or { return error('Failed to push updates due to: ${err}') }
// Create sync data
sync_data := SyncData{
id: id
data: base64.encode(data) // encode binary data directly
topic: 'db_sync'
}
// Convert to JSON
json_data := json.encode(sync_data)
// Broadcast to all workers // Broadcast to all workers
for worker_key, mut _ in s.workers { for worker_key, mut _ in s.workers {
println('Sending message to worker: ${worker_key}') println('Sending message to worker: ${worker_key}')
msg := s.mycelium_client.send_msg( msg := s.mycelium_client.send_msg(
public_key: worker_key // destination public key public_key: worker_key // destination public key
payload: data.str() // message payload payload: base64.encode(data) // message payload
topic: 'sync_db' // optional topic topic: 'db_sync' // optional topic
)! )!
println('Sent message ID: ${msg.id}') println('Sent message ID: ${msg.id}')
} }
@@ -146,16 +128,16 @@ pub fn (mut s MyceliumStreamer) listen() ! {
println('Starting to listen for messages...') println('Starting to listen for messages...')
spawn fn [mut s] () { spawn fn [mut s] () {
for { for {
println('Listening for messages...')
// Check for updates from master // Check for updates from master
if msg := s.mycelium_client.receive_msg(wait: true, peek: false, topic: 'db_sync') { if msg := s.mycelium_client.receive_msg(
// Decode message payload as JSON wait: true
sync_data := json.decode(SyncData, msg.payload) or { peek: false
eprintln('Failed to decode sync data JSON: ${err}') topic: 'db_sync'
continue )
} {
// Decode the base64 data // Decode the base64 data
update_data := base64.decode(sync_data.data) update_data := base64.decode(msg.payload)
if update_data.len == 0 { if update_data.len == 0 {
eprintln('Failed to decode base64 data') eprintln('Failed to decode base64 data')
continue continue
@@ -163,6 +145,7 @@ pub fn (mut s MyceliumStreamer) listen() ! {
// Find the target worker and apply updates // Find the target worker and apply updates
if mut worker := s.workers[msg.src_pk] { if mut worker := s.workers[msg.src_pk] {
println('Received update from worker: ${msg.src_pk}')
worker.sync_updates(update_data) or { worker.sync_updates(update_data) or {
eprintln('Failed to sync worker: ${err}') eprintln('Failed to sync worker: ${err}')
continue continue