feat: Improve Mycelium slave and streamer communication
- Renamed the topic for database synchronization messages from 'db_sync' to 'sync_db' for clarity. - Updated the Mycelium slave to decode base64 payload before processing and to log received messages and their source. - Added logging to the Mycelium streamer to track sent messages. - Added a new feature to retrieve and log the last index from the worker after syncing updates. This improves monitoring and debugging capabilities.
This commit is contained in:
@@ -9,6 +9,8 @@ import os
|
|||||||
import encoding.base64
|
import encoding.base64
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
// TODO: Make the worker read the data from the streamer instead.
|
||||||
|
|
||||||
const slave_port = 9000
|
const slave_port = 9000
|
||||||
const master_public_key = '89c2eeb24bcdfaaac78c0023a166d88f760c097c1a57748770e432ba10757179'
|
const master_public_key = '89c2eeb24bcdfaaac78c0023a166d88f760c097c1a57748770e432ba10757179'
|
||||||
const master_address = '458:90d4:a3ef:b285:6d32:a22d:9e73:697f'
|
const master_address = '458:90d4:a3ef:b285:6d32:a22d:9e73:697f'
|
||||||
@@ -39,9 +41,16 @@ defer {
|
|||||||
|
|
||||||
// Receive messages
|
// Receive messages
|
||||||
// Parameters: wait_for_message, peek_only, topic_filter
|
// Parameters: wait_for_message, peek_only, topic_filter
|
||||||
received := slave.receive_msg(wait: true, peek: false, topic: 'db_sync')!
|
received := slave.receive_msg(wait: true, peek: false, topic: 'sync_db')!
|
||||||
println('Received message from: ${received.src_pk}')
|
println('Received message from: ${received.src_pk}')
|
||||||
println('Message payload: ${base64.decode_str(received.payload)}')
|
println('Message payload: ${base64.decode_str(received.payload)}')
|
||||||
|
|
||||||
payload := received.payload
|
payload := base64.decode(received.payload)
|
||||||
worker.sync_updates(payload.bytes()) or { error('Failed to sync updates to worker due to: ${err}') }
|
println('Payload: ${payload.str()}')
|
||||||
|
worker.sync_updates(received.payload.bytes()) or {
|
||||||
|
error('Failed to sync updates to worker due to: ${err}')
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get last index
|
||||||
|
last_index := worker.get_last_index()!
|
||||||
|
println('Last index: ${last_index}')
|
||||||
|
|||||||
@@ -67,11 +67,13 @@ pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 {
|
|||||||
|
|
||||||
// Broadcast to all workers
|
// Broadcast to all workers
|
||||||
for worker_key, mut _ in s.workers {
|
for worker_key, mut _ in s.workers {
|
||||||
s.mycelium_client.send_msg(
|
println('Sending message to worker: ${worker_key}')
|
||||||
|
msg := s.mycelium_client.send_msg(
|
||||||
public_key: worker_key // destination public key
|
public_key: worker_key // destination public key
|
||||||
payload: data.str() // message payload
|
payload: data.str() // message payload
|
||||||
topic: 'sync_db' // optional topic
|
topic: 'sync_db' // optional topic
|
||||||
)!
|
)!
|
||||||
|
println('Sent message ID: ${msg.id}')
|
||||||
}
|
}
|
||||||
return id
|
return id
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user