From 368edcd93abc310ea5171c1bf8fcde831a4379a9 Mon Sep 17 00:00:00 2001 From: Mahmoud Emad Date: Mon, 3 Mar 2025 12:19:03 +0200 Subject: [PATCH] feat: Implement distributed database using Mycelium - Refactor database streamer to support multiple workers. - Add master node to manage and distribute data updates. - Implement worker nodes to receive and apply updates. - Remove unnecessary slave node. - Improve error handling and logging. - Use base64 encoding for JSON compatibility in data transfer. --- examples/data/deduped_mycelium_master.vsh | 25 ++++- examples/data/deduped_mycelium_slave.vsh | 56 ----------- examples/data/deduped_mycelium_worker.vsh | 16 +++ lib/data/ourdb/mycelium_streamer.v | 117 ++++++++++++++++++++-- 4 files changed, 144 insertions(+), 70 deletions(-) delete mode 100755 examples/data/deduped_mycelium_slave.vsh create mode 100755 examples/data/deduped_mycelium_worker.vsh diff --git a/examples/data/deduped_mycelium_master.vsh b/examples/data/deduped_mycelium_master.vsh index 87375c14..9b40d990 100755 --- a/examples/data/deduped_mycelium_master.vsh +++ b/examples/data/deduped_mycelium_master.vsh @@ -1,16 +1,33 @@ #!/usr/bin/env -S v -n -w -gc none -cc tcc -d use_openssl -enable-globals run import freeflowuniverse.herolib.data.ourdb +import time +// Known worker public key worker_public_key := '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c' -mut streamer := ourdb.new_streamer(incremental_mode: false)! -streamer.add_worker(worker_public_key)! // Mycelium public key +// Create master node +mut streamer := ourdb.new_streamer( + incremental_mode: false + server_port: 9000 // Master uses default port + is_worker: false +)! +println('Starting master node...') + +// Add worker to whitelist and initialize its database +streamer.add_worker(worker_public_key)! + +// Write some test data id := streamer.write(id: 1, value: 'Record 1')! +println('Wrote record with ID: ${id}') -println('ID: ${id}') - +// Verify data in master master_data := streamer.read(id: id)! master_data_str := master_data.bytestr() println('Master data: ${master_data_str}') + +// Keep master running to handle worker connections +for { + time.sleep(1 * time.second) +} diff --git a/examples/data/deduped_mycelium_slave.vsh b/examples/data/deduped_mycelium_slave.vsh deleted file mode 100755 index 887b4b2f..00000000 --- a/examples/data/deduped_mycelium_slave.vsh +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env -S v -n -w -gc none -cc tcc -d use_openssl -enable-globals run - -import freeflowuniverse.herolib.clients.mycelium -import freeflowuniverse.herolib.installers.net.mycelium_installer -import freeflowuniverse.herolib.data.ourdb -import freeflowuniverse.herolib.osal -import time -import os -import encoding.base64 -import json - -// TODO: Make the worker read the data from the streamer instead. - -const slave_port = 9000 -const master_public_key = '89c2eeb24bcdfaaac78c0023a166d88f760c097c1a57748770e432ba10757179' -const master_address = '458:90d4:a3ef:b285:6d32:a22d:9e73:697f' - -mycelium.delete()! - -// Initialize mycelium clients -mut slave := mycelium.get()! -slave.server_url = 'http://localhost:${slave_port}' -slave.name = 'slave_node' - -// Get public keys for communication -slave_inspect := mycelium.inspect(key_file_path: '/tmp/mycelium_server1/priv_key.bin')! - -println('Server 2 (slave Node) public key: ${slave_inspect.public_key}') - -// Initialize ourdb instances -mut worker := ourdb.new( - record_nr_max: 16777216 - 1 - record_size_max: 1024 - path: '/tmp/ourdb1' - reset: true -)! - -defer { - worker.destroy() or { panic('failed to destroy db1: ${err}') } -} - -// Receive messages -// Parameters: wait_for_message, peek_only, topic_filter -received := slave.receive_msg(wait: true, peek: false, topic: 'sync_db')! -println('Received message from: ${received.src_pk}') -println('Message payload: ${base64.decode_str(received.payload)}') - -payload := base64.decode(received.payload) -println('Payload: ${payload.str()}') -worker.sync_updates(received.payload.bytes()) or { - error('Failed to sync updates to worker due to: ${err}') -} - -// Get last index -last_index := worker.get_last_index()! -println('Last index: ${last_index}') diff --git a/examples/data/deduped_mycelium_worker.vsh b/examples/data/deduped_mycelium_worker.vsh new file mode 100755 index 00000000..1f49f580 --- /dev/null +++ b/examples/data/deduped_mycelium_worker.vsh @@ -0,0 +1,16 @@ +#!/usr/bin/env -S v -n -w -gc none -cc tcc -d use_openssl -enable-globals run + +import freeflowuniverse.herolib.data.ourdb + +// Create a worker node with a unique database path +mut streamer := ourdb.get_streamer(id: 'frBvtZQeqf') or { + ourdb.new_streamer( + incremental_mode: false + server_port: 9001 // Use different port than master + is_worker: true + )! +} + +println('Starting worker node...') +println('Listening for updates from master...') +streamer.listen()! // This will keep running and listening for updates diff --git a/lib/data/ourdb/mycelium_streamer.v b/lib/data/ourdb/mycelium_streamer.v index 548a34dc..ebeb8b3b 100644 --- a/lib/data/ourdb/mycelium_streamer.v +++ b/lib/data/ourdb/mycelium_streamer.v @@ -1,55 +1,102 @@ module ourdb import freeflowuniverse.herolib.clients.mycelium +import rand +import time +import encoding.base64 +import json + +// SyncData encodes binary data as base64 string for JSON compatibility +struct SyncData { +pub: + id u32 + data string // base64 encoded []u8 + topic string = 'db_sync' +} struct MyceliumStreamer { pub mut: master &OurDB @[skip; str: skip] workers map[string]&OurDB @[skip; str: skip] // key is mycelium public key, value is ourdb incremental_mode bool = true // default is true - mycelium_client &mycelium.Mycelium + mycelium_client mycelium.Mycelium @[skip; str: skip] // not a reference since we own it + id string = rand.string(10) +} + +struct MyceliumStreamerInstances { +pub mut: + instances map[string]&MyceliumStreamer } pub struct NewStreamerArgs { pub mut: incremental_mode bool = true // default is true server_port int = 9000 // default is 9000 + is_worker bool // true if this is a worker node } fn new_db_streamer(args NewStreamerArgs) !OurDB { + path := if args.is_worker { + '/tmp/ourdb_worker_${rand.string(8)}' + } else { + '/tmp/ourdb_master' + } return new( record_nr_max: 16777216 - 1 record_size_max: 1024 - path: '/tmp/ourdb1' + path: path reset: true incremental_mode: args.incremental_mode )! } pub fn (mut s MyceliumStreamer) add_worker(public_key string) ! { - mut db := new_db_streamer(incremental_mode: s.incremental_mode)! + mut db := new_db_streamer( + incremental_mode: s.incremental_mode + is_worker: true + )! s.workers[public_key] = &db } pub fn new_streamer(args NewStreamerArgs) !MyceliumStreamer { mut db := new_db_streamer(args)! + + // Initialize mycelium client + mut client := mycelium.get()! + client.server_url = 'http://localhost:${args.server_port}' + client.name = if args.is_worker { 'worker_node' } else { 'master_node' } + mut s := MyceliumStreamer{ master: &db workers: {} incremental_mode: args.incremental_mode - mycelium_client: &mycelium.Mycelium{} + mycelium_client: client } - s.mycelium_client = mycelium.get()! - s.mycelium_client.server_url = 'http://localhost:${args.server_port}' - s.mycelium_client.name = 'master_node' + mut instances_factory := MyceliumStreamerInstances{} + instances_factory.instances[s.id] = &s - // Get public keys for communication - // inspect := mycelium.inspect(key_file_path: '/tmp/mycelium_server1/priv_key.bin')! - // println('Server 2 (slave Node) public key: ${slave_inspect.public_key}') + println('Created ${if args.is_worker { 'worker' } else { 'master' }} node with ID: ${s.id}') return s } +pub struct GetStreamerArgs { +pub mut: + id string @[required] +} + +pub fn get_streamer(args GetStreamerArgs) !MyceliumStreamer { + mut instances_factory := MyceliumStreamerInstances{} + + for id, instamce in instances_factory.instances { + if id == args.id { + return *instamce + } + } + + return error('streamer with id ${args.id} not found') +} + @[params] pub struct MyceliumRecordArgs { pub: @@ -65,6 +112,16 @@ pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 { // Get updates from the beginning (id 0) to ensure complete sync data := s.master.push_updates(id) or { return error('Failed to push updates due to: ${err}') } + // Create sync data + sync_data := SyncData{ + id: id + data: base64.encode(data) // encode binary data directly + topic: 'db_sync' + } + + // Convert to JSON + json_data := json.encode(sync_data) + // Broadcast to all workers for worker_key, mut _ in s.workers { println('Sending message to worker: ${worker_key}') @@ -84,6 +141,46 @@ pub: worker_public_key string } +// listen continuously checks for messages from master and applies updates +pub fn (mut s MyceliumStreamer) listen() ! { + println('Starting to listen for messages...') + spawn fn [mut s] () { + for { + // Check for updates from master + if msg := s.mycelium_client.receive_msg(wait: true, peek: false, topic: 'db_sync') { + // Decode message payload as JSON + sync_data := json.decode(SyncData, msg.payload) or { + eprintln('Failed to decode sync data JSON: ${err}') + continue + } + + // Decode the base64 data + update_data := base64.decode(sync_data.data) + if update_data.len == 0 { + eprintln('Failed to decode base64 data') + continue + } + + // Find the target worker and apply updates + if mut worker := s.workers[msg.src_pk] { + worker.sync_updates(update_data) or { + eprintln('Failed to sync worker: ${err}') + continue + } + println('Successfully applied updates from master') + } else { + eprintln('Received update from unknown source: ${msg.src_pk}') + } + } + } + }() + + // Keep the main thread alive + for { + time.sleep(1 * time.second) + } +} + pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 { if args.worker_public_key.len > 0 { if mut worker := s.workers[args.worker_public_key] {