From 5e321b6b0f04d991ede453756f38a907dc6987d1 Mon Sep 17 00:00:00 2001 From: Mahmoud Emad Date: Tue, 11 Mar 2025 14:41:47 +0200 Subject: [PATCH] feat: Add diagrams and README for OurDB syncer - Added a diagram explaining the architecture of the OurDB syncer, clarifying the interaction between the HTTP server, master, and worker nodes. - Added a README file providing a comprehensive overview of the OurDB syncer project, including its architecture, features, prerequisites, installation instructions, and usage examples. - Removed outdated Mycelium_Streamer documentation as it's no longer relevant to the current project structure. - Created example scripts for running the database, master, and worker components, simplifying the setup and execution of the system. - Added HTTP client and server documentation, clarifying their functionalities and interaction with the OurDB system. --- lib/data/ourdb_syncer/Diagram.md | 30 ++ lib/data/ourdb_syncer/Mycelium_Streamer.md | 30 -- lib/data/ourdb_syncer/README.md | 166 ++++++++++ lib/data/ourdb_syncer/examples/db_example.v | 18 ++ .../ourdb_syncer/examples/master_example.v | 20 ++ .../ourdb_syncer/examples/worker_example.v | 19 ++ lib/data/ourdb_syncer/http/CLIENT.md | 35 +++ lib/data/ourdb_syncer/{ => http}/SERVER.md | 0 lib/data/ourdb_syncer/http/client.v | 0 lib/data/ourdb_syncer/http/server.v | 50 +++ .../ourdb_syncer/{ => http}/server_test.v | 0 lib/data/ourdb_syncer/mycelium_streamer.v | 162 ---------- lib/data/ourdb_syncer/server.v | 226 -------------- lib/data/ourdb_syncer/streamer/db_sync.v | 1 + lib/data/ourdb_syncer/streamer/nodes.v | 236 ++++++++++++++ lib/data/ourdb_syncer/streamer/streamer.v | 291 ++++++++++++++++++ .../ourdb_syncer/{ => streamer}/sync_test.v | 0 lib/data/ourdb_syncer/sync.v | 147 --------- 18 files changed, 866 insertions(+), 565 deletions(-) create mode 100644 lib/data/ourdb_syncer/Diagram.md delete mode 100644 lib/data/ourdb_syncer/Mycelium_Streamer.md create mode 100644 
lib/data/ourdb_syncer/README.md create mode 100644 lib/data/ourdb_syncer/examples/db_example.v create mode 100644 lib/data/ourdb_syncer/examples/master_example.v create mode 100644 lib/data/ourdb_syncer/examples/worker_example.v create mode 100644 lib/data/ourdb_syncer/http/CLIENT.md rename lib/data/ourdb_syncer/{ => http}/SERVER.md (100%) create mode 100644 lib/data/ourdb_syncer/http/client.v create mode 100644 lib/data/ourdb_syncer/http/server.v rename lib/data/ourdb_syncer/{ => http}/server_test.v (100%) delete mode 100644 lib/data/ourdb_syncer/mycelium_streamer.v delete mode 100644 lib/data/ourdb_syncer/server.v create mode 100644 lib/data/ourdb_syncer/streamer/db_sync.v create mode 100644 lib/data/ourdb_syncer/streamer/nodes.v create mode 100644 lib/data/ourdb_syncer/streamer/streamer.v rename lib/data/ourdb_syncer/{ => streamer}/sync_test.v (100%) delete mode 100644 lib/data/ourdb_syncer/sync.v diff --git a/lib/data/ourdb_syncer/Diagram.md b/lib/data/ourdb_syncer/Diagram.md new file mode 100644 index 00000000..e6edea64 --- /dev/null +++ b/lib/data/ourdb_syncer/Diagram.md @@ -0,0 +1,30 @@ +``` ++-----------------+ +| User | +| (HTTP Client) | ++-----------------+ + | + | HTTP Requests (GET, SET, DELETE) + v ++-----------------+ +| HTTP Server | +| (Exposed API) | ++-----------------+ + | + | Internal Communication via Mycelium Network + | + +-------------------+-------------------+ + | | | + v v v ++-----------------+ +-----------------+ +-----------------+ +| Master | | Worker 1 | | Worker 2 | +| (Handles Writes)| | (Handles Reads) | | (Handles Reads) | +| OurDB | | OurDB | | OurDB | ++-----------------+ +-----------------+ +-----------------+ + | | | + | | | + | v | + | Data Sync via Mycelium Network | + | | + +------------------->+------------------+ +``` \ No newline at end of file diff --git a/lib/data/ourdb_syncer/Mycelium_Streamer.md b/lib/data/ourdb_syncer/Mycelium_Streamer.md deleted file mode 100644 index 8160adae..00000000 --- 
a/lib/data/ourdb_syncer/Mycelium_Streamer.md +++ /dev/null @@ -1,30 +0,0 @@ -# Mycelium Streamer - -## Overview - -This project demonstrates a master-worker setup using `mycelium` for distributed data storage. The master node interacts with worker nodes over the network to store and retrieve data. - -## Prerequisites - -Before running the master node example, ensure the following: - -- `mycelium` binary is installed and running on both local and remote machines. -- Worker nodes are set up and running with the mycelium instance. - -## Setup - -1. Start `mycelium` on the local machine with the following command: - -```bash -mycelium --peers tcp://188.40.132.242:9651 "quic://[2a01:4f8:212:fa6::2]:9651" tcp://185.69.166.7:9651 "quic://[2a02:1802:5e:0:ec4:7aff:fe51:e36b]:9651" tcp://65.21.231.58:9651 "quic://[2a01:4f9:5a:1042::2]:9651" "tcp://[2604:a00:50:17b:9e6b:ff:fe1f:e054]:9651" quic://5.78.122.16:9651 "tcp://[2a01:4ff:2f0:3621::1]:9651" quic://142.93.217.194:9651 --tun-name tun2 --tcp-listen-port 9652 --quic-listen-port 9653 --api-addr 127.0.0.1:9000 -``` - -Replace IP addresses and ports with your specific configuration. - -2. On the remote machine where the worker will run, execute the same `mycelium` command as above. - -3. Execute the worker example code provided (`herolib/examples/data/deduped_mycelium_worker.vsh`) on the remote worker machine. - -## Running the Master Example - -After setting up `mycelium` and the worker nodes, run the master example script (`herolib/examples/data/deduped_mycelium_master.vsh`) on the local machine. diff --git a/lib/data/ourdb_syncer/README.md b/lib/data/ourdb_syncer/README.md new file mode 100644 index 00000000..1d1ddef0 --- /dev/null +++ b/lib/data/ourdb_syncer/README.md @@ -0,0 +1,166 @@ +# Key-Value HTTP Service with Master-Worker Architecture over Mycelium + +## Overview + +This project implements a distributed key-value storage service exposed via an HTTP API. 
It uses a master-worker architecture to handle read and write operations efficiently, with internal communication facilitated by the [Mycelium network](https://github.com/threefoldtech/mycelium). The system is built in [V](https://vlang.io/) and uses [OurDB](https://github.com/freeflowuniverse/herolib/tree/main/lib/data/ourdb) for embedded key-value storage. + +### Key Features + +- **HTTP API**: Users can perform `GET` (read), `SET` (write), and `DELETE` operations on key-value pairs via an HTTP server. +- **Master-Worker Architecture**: + - **Master**: Handles all write operations (`SET`, `DELETE`) to ensure data consistency. + - **Workers**: Handle read operations (`GET`) to distribute the load. +- **Data Synchronization**: Changes made by the master are propagated to all workers to ensure consistent reads. +- **Mycelium Integration**: Internal communication between the HTTP server, master, and workers is handled over the Mycelium network, an encrypted IPv6 overlay network. +- **Embedded Storage**: Uses OurDB, a lightweight embedded key-value database, for data persistence on each node. + +### Use Case + +This service is ideal for applications requiring a simple, distributed key-value store with strong consistency guarantees, such as configuration management, decentralized data sharing, or caching in a peer-to-peer network. + +## Architecture + +The system is designed with a clear separation of concerns, ensuring scalability and consistency. Below is a simplified diagram of the architecture: + +``` ++-----------------+ +| User | +| (HTTP Client) | ++-----------------+ + | + | HTTP Requests + v ++-----------------+ +| HTTP Server |<----+ ++-----------------+ | External Interface + | | + | Mycelium | + | Network | + v v ++-----------------+ +-----------------+ +| Master |---->| Workers | +| (Writes) | | (Reads) | +| OurDB | | OurDB | ++-----------------+ +-----------------+ +``` + +### Components + +1. 
**HTTP Server**: + - Acts as the entry point for user requests. + - Routes write requests (`SET`, `DELETE`) to the master. + - Routes read requests (`GET`) to one of the workers (e.g., using load balancing). + +2. **Master**: + - Handles all write operations to ensure data consistency. + - Stores data in a local OurDB instance. + - Propagates updates to workers via the Mycelium network. + +3. **Workers**: + - Handle read operations to distribute the load. + - Store a synchronized copy of the data in a local OurDB instance. + - Receive updates from the master via the Mycelium network. + +4. **Mycelium Network**: + - Provides secure, encrypted peer-to-peer communication between the HTTP server, master, and workers. + +5. **OurDB**: + - An embedded key-value database used by the master and workers for data storage. + +## Prerequisites + +To run this project, you need the following: + +- [V](https://vlang.io/) (Vlang compiler) installed. +- [Mycelium](https://github.com/threefoldtech/mycelium) network configured (either public or private). +- [OurDB](https://github.com/freeflowuniverse/herolib/tree/main/lib/data/ourdb) library included in your project (part of the HeroLib suite). + +## Installation + +1. **Clone the Repository**: + ```bash + git clone + cd + ``` + +2. **Install Dependencies**: + Ensure V is installed and the `ourdb` library is available. You may need to pull the HeroLib dependencies: + ```bash + v install + ``` + +3. **Configure Mycelium**: + - Set up a Mycelium network (public or private) and note the addresses of the master and worker nodes. + - Update the configuration in the HTTP server to point to the correct Mycelium addresses. + +4. **Build the Project**: + Compile the V code for the HTTP server, master, and workers: + ```bash + v run main.v + ``` + +## Usage + +### Running the System + +1. **Start the Master**: + Run the master node to handle write operations: + ```bash + v run master.v + ``` + +2. 
**Start the Workers**: + Run one or more worker nodes to handle read operations: + ```bash + v run worker.v + ``` + +3. **Start the HTTP Server**: + Run the HTTP server to expose the API to users: + ```bash + v run server.v + ``` + +### Making Requests + +The HTTP server exposes the following endpoints: + +- **SET a Key-Value Pair**: + ```bash + curl -X POST http://localhost:8080/set -d "key=mykey&value=myvalue" + ``` + - Writes the key-value pair to the master, which syncs it to workers. + +- **GET a Value by Key**: + ```bash + curl http://localhost:8080/get?key=mykey + ``` + - Retrieves the value from a worker. + +- **DELETE a Key**: + ```bash + curl -X POST http://localhost:8080/delete -d "key=mykey" + ``` + - Deletes the key-value pair via the master, which syncs the deletion to workers. + +## Development + +### Code Structure + +- streamer + - `streamer.v`: Implements the HTTP server and request routing logic. + - `nodes.v`: Implements the master/worker node, handling writes and synchronization. + +- http_server + - `server.v`: Implements the HTTP server and request routing logic. + +- examples + - `master_example.v`: A simple example that starts the streamer and master node. + - `worker_example.v`: A simple example that starts the streamer and worker node. + - `db_example.v`: A simple example that starts the streamer, master, and worker nodes. + +### Extending the System + +- **Add More Workers**: Scale the system by starting additional worker nodes and updating the HTTP server’s worker list. +- **Enhance Synchronization**: Implement more advanced replication strategies (e.g., conflict resolution, versioning) if needed. +- **Improve Load Balancing**: Add sophisticated load balancing for read requests (e.g., based on worker load or latency). 
diff --git a/lib/data/ourdb_syncer/examples/db_example.v b/lib/data/ourdb_syncer/examples/db_example.v new file mode 100644 index 00000000..b7ceb271 --- /dev/null +++ b/lib/data/ourdb_syncer/examples/db_example.v @@ -0,0 +1,18 @@ +module main + +import freeflowuniverse.herolib.data.ourdb_syncer.streamer + +fn main() { + master_public_key := '570c1069736786f06c4fd2a6dc6c17cd88347604593b60e34b5688c369fa1b39' + + // Create a new streamer + mut streamer_ := streamer.connect_streamer( + name: 'streamer' + port: 8080 + master_public_key: master_public_key + )! + + workers := streamer_.get_workers()! + + println('workers: ${workers}') +} diff --git a/lib/data/ourdb_syncer/examples/master_example.v b/lib/data/ourdb_syncer/examples/master_example.v new file mode 100644 index 00000000..1f229ec0 --- /dev/null +++ b/lib/data/ourdb_syncer/examples/master_example.v @@ -0,0 +1,20 @@ +module main + +import freeflowuniverse.herolib.data.ourdb_syncer.streamer + +fn main() { + println('Strating the streamer first!') + + // Create a new streamer + mut streamer_ := streamer.new_streamer( + name: 'streamer' + port: 8080 + )! + + mut master_node := streamer_.add_master( + address: '4ff:3da9:f2b2:4103:fa6e:7ea:7cbe:8fef' + public_key: '570c1069736786f06c4fd2a6dc6c17cd88347604593b60e34b5688c369fa1b39' + )! + + master_node.start_and_listen()! +} diff --git a/lib/data/ourdb_syncer/examples/worker_example.v b/lib/data/ourdb_syncer/examples/worker_example.v new file mode 100644 index 00000000..8ade56d1 --- /dev/null +++ b/lib/data/ourdb_syncer/examples/worker_example.v @@ -0,0 +1,19 @@ +module main + +import freeflowuniverse.herolib.data.ourdb_syncer.streamer + +fn main() { + // Create a new streamer + mut streamer_ := streamer.connect_streamer( + name: 'streamer' + port: 8080 + master_public_key: '570c1069736786f06c4fd2a6dc6c17cd88347604593b60e34b5688c369fa1b39' + )! 
+ + mut worker_node := streamer_.add_worker( + public_key: '46a9f9cee1ce98ef7478f3dea759589bbf6da9156533e63fed9f233640ac072c' + address: '4ff:3da9:f2b2:4103:fa6e:7ea:7cbe:8fef' + )! + + worker_node.start_and_listen()! +} diff --git a/lib/data/ourdb_syncer/http/CLIENT.md b/lib/data/ourdb_syncer/http/CLIENT.md new file mode 100644 index 00000000..75b7b087 --- /dev/null +++ b/lib/data/ourdb_syncer/http/CLIENT.md @@ -0,0 +1,35 @@ +# OurDB Client + +## Overview +This client is created to interact with an OurDB server. + +## Prerequisites +Before running the client script, ensure that the OurDB server is up and running. You can start the server by following the instructions in the [OurDB Server README](./SERVER.md). + +## Installation + +Ensure you have the V programming language installed. You can download it from [vlang.io](https://vlang.io/). + +## Running the Client + +Once the OurDB server is running, execute the client script: +```sh +examples/data/ourdb_client.vsh +``` + +Alternatively, you can run it using V: +```sh +v -enable-globals run ourdb_client.vsh +``` + +## How It Works +1. Connects to the OurDB server on `localhost:3000`. +2. Sets a record with the value `hello`. +3. Retrieves the record by ID and verifies the stored value. +4. Deletes the record. 
+ +## Example Output +``` +Set result: { id: 1, value: 'hello' } +Get result: { id: 1, value: 'hello' } +``` diff --git a/lib/data/ourdb_syncer/SERVER.md b/lib/data/ourdb_syncer/http/SERVER.md similarity index 100% rename from lib/data/ourdb_syncer/SERVER.md rename to lib/data/ourdb_syncer/http/SERVER.md diff --git a/lib/data/ourdb_syncer/http/client.v b/lib/data/ourdb_syncer/http/client.v new file mode 100644 index 00000000..e69de29b diff --git a/lib/data/ourdb_syncer/http/server.v b/lib/data/ourdb_syncer/http/server.v new file mode 100644 index 00000000..bb7f494d --- /dev/null +++ b/lib/data/ourdb_syncer/http/server.v @@ -0,0 +1,50 @@ +module server + +// import net.http +// import rand + +// struct App { +// master_addr string // Mycelium address of master +// worker_addrs []string // Mycelium addresses of workers +// } + +// fn (app App) handle_set(w http.ResponseWriter, r http.Request) { +// // Parse key-value from request +// key := r.form['key'] or { return w.write_string('Missing key') } +// value := r.form['value'] or { return w.write_string('Missing value') } + +// // Forward SET request to master via Mycelium +// response := send_to_mycelium(app.master_addr, 'SET', key, value) +// w.write_string(response) +// } + +// fn (app App) handle_get(w http.Response, r http.Request) { +// // Parse key from request +// key := r.data + +// // Select a random worker to handle GET +// worker_addr := app.worker_addrs[rand.intn(app.worker_addrs.len) or { 0 }] +// // response := send_to_mycelium(worker_addr, 'GET', key, '') +// // w.write_string(response) +// } + +// fn (app App) handle_delete(w http.ResponseWriter, r http.Request) { +// // Parse key from request +// key := r.form['key'] or { return w.write_string('Missing key') } + +// // Forward DELETE request to master via Mycelium +// response := send_to_mycelium(app.master_addr, 'DELETE', key, '') +// w.write_string(response) +// } + +// fn main() { +// app := App{ +// master_addr: 'mycelium://master_node_address' 
+// worker_addrs: ['mycelium://worker1_address', 'mycelium://worker2_address'] +// } +// mut server := http.new_server('0.0.0.0:8080') +// server.handle('/set', app.handle_set) +// server.handle('/get', app.handle_get) +// server.handle('/delete', app.handle_delete) +// server.listen_and_serve() +// } diff --git a/lib/data/ourdb_syncer/server_test.v b/lib/data/ourdb_syncer/http/server_test.v similarity index 100% rename from lib/data/ourdb_syncer/server_test.v rename to lib/data/ourdb_syncer/http/server_test.v diff --git a/lib/data/ourdb_syncer/mycelium_streamer.v b/lib/data/ourdb_syncer/mycelium_streamer.v deleted file mode 100644 index 7a3cc3fa..00000000 --- a/lib/data/ourdb_syncer/mycelium_streamer.v +++ /dev/null @@ -1,162 +0,0 @@ -module ourdb - -import freeflowuniverse.herolib.clients.mycelium -import rand -import time -import encoding.base64 -import json -import x.json2 - -struct MyceliumStreamer { -pub mut: - master &OurDB @[skip; str: skip] - workers map[string]&OurDB @[skip; str: skip] // key is mycelium public key, value is ourdb - incremental_mode bool = true // default is true - mycelium_client mycelium.Mycelium @[skip; str: skip] // not a reference since we own it - id string = rand.string(10) -} - -struct MyceliumStreamerInstances { -pub mut: - instances map[string]&MyceliumStreamer -} - -pub struct NewStreamerArgs { -pub mut: - incremental_mode bool = true // default is true - server_port int = 9000 // default is 9000 - is_worker bool // true if this is a worker node - id string = rand.string(10) -} - -fn new_db_streamer(args NewStreamerArgs) !OurDB { - path := if args.is_worker { - '/tmp/ourdb_worker_${rand.string(8)}' - } else { - '/tmp/ourdb_master' - } - return new( - record_nr_max: 16777216 - 1 - record_size_max: 1024 - path: path - reset: true - incremental_mode: args.incremental_mode - )! -} - -pub fn (mut s MyceliumStreamer) add_worker(public_key string) ! 
{ - mut db := new_db_streamer( - incremental_mode: s.incremental_mode - is_worker: true - )! - s.workers[public_key] = &db -} - -pub fn new_streamer(args NewStreamerArgs) !MyceliumStreamer { - mut db := new_db_streamer(args)! - - // Initialize mycelium client - mut client := mycelium.get()! - client.server_url = 'http://localhost:${args.server_port}' - client.name = if args.is_worker { 'worker_node' } else { 'master_node' } - - mut s := MyceliumStreamer{ - master: &db - workers: {} - incremental_mode: args.incremental_mode - mycelium_client: client - id: args.id - } - - mut instances_factory := MyceliumStreamerInstances{} - instances_factory.instances[s.id] = &s - - println('Created ${if args.is_worker { 'worker' } else { 'master' }} node with ID: ${s.id}') - return s -} - -pub struct GetStreamerArgs { -pub mut: - id string @[required] -} - -pub fn get_streamer(args GetStreamerArgs) !MyceliumStreamer { - mut instances_factory := MyceliumStreamerInstances{} - - for id, instamce in instances_factory.instances { - if id == args.id { - return *instamce - } - } - - return error('streamer with id ${args.id} not found') -} - -@[params] -pub struct MyceliumRecordArgs { -pub: - id u32 @[required] - value string @[required] -} - -pub fn (mut s MyceliumStreamer) write(record MyceliumRecordArgs) !u32 { - mut id := s.master.set(id: record.id, data: record.value.bytes()) or { - return error('Failed to set id ${record.id} to value ${record.value} due to: ${err}') - } - - // Get updates from the beginning (id 0) to ensure complete sync - data := s.master.push_updates(id) or { return error('Failed to push updates due to: ${err}') } - - // Broadcast to all workers - for worker_key, mut worker in s.workers { - s.mycelium_client.send_msg( - public_key: worker_key - payload: base64.encode(data) - topic: 'db_sync' - )! 
- worker.sync_updates(data) or { return error('Failed to sync worker: ${err}') } - } - return id -} - -pub struct MyceliumReadArgs { -pub: - id u32 @[required] - worker_public_key string -} - -// listen continuously checks for messages from master and applies updates -pub fn (mut s MyceliumStreamer) listen() ! { - spawn fn [mut s] () ! { - msg := s.mycelium_client.receive_msg(wait: true, peek: true, topic: 'db_sync') or { - return error('Failed to receive message: ${err}') - } - - if msg.payload.len > 0 { - update_data := base64.decode(msg.payload) - if mut worker := s.workers[msg.dst_pk] { - worker.sync_updates(update_data) or { - return error('Failed to sync worker: ${err}') - } - } - } - }() - - time.sleep(time.second * 1) - return s.listen() -} - -pub fn (mut s MyceliumStreamer) read(args MyceliumReadArgs) ![]u8 { - if args.worker_public_key.len > 0 { - return s.read_from_worker(args) - } - return s.master.get(args.id)! -} - -fn (mut s MyceliumStreamer) read_from_worker(args MyceliumReadArgs) ![]u8 { - if mut worker := s.workers[args.worker_public_key] { - // We need to think about reading from the workers through the mycelium client. - return worker.get(args.id)! 
- } - return error('worker with public key ${args.worker_public_key} not found') -} diff --git a/lib/data/ourdb_syncer/server.v b/lib/data/ourdb_syncer/server.v deleted file mode 100644 index 6b4ffb0b..00000000 --- a/lib/data/ourdb_syncer/server.v +++ /dev/null @@ -1,226 +0,0 @@ -module ourdb - -import freeflowuniverse.herolib.ui.console -import veb -import rand -import time -import json - -// Represents the server context, extending the veb.Context -pub struct ServerContext { - veb.Context -} - -// Represents the OurDB server instance -@[heap] -pub struct OurDBServer { - veb.Middleware[ServerContext] -pub mut: - db &OurDB // Reference to the database instance - port int // Port on which the server runs - allowed_hosts []string // List of allowed hostnames - allowed_operations []string // List of allowed operations (e.g., set, get, delete) - secret_key string // Secret key for authentication -} - -// Represents the arguments required to initialize the OurDB server -@[params] -pub struct OurDBServerArgs { -pub mut: - port int = 3000 // Server port, default is 3000 - allowed_hosts []string = ['localhost'] // Allowed hosts - allowed_operations []string = ['set', 'get', 'delete'] // Allowed operations - secret_key string = rand.string_from_set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', - 32) // Generated secret key - config OurDBConfig // Database configuration parameters -} - -// Creates a new instance of the OurDB server -pub fn new_server(args OurDBServerArgs) !OurDBServer { - mut db := new( - record_nr_max: args.config.record_nr_max - record_size_max: args.config.record_size_max - file_size: args.config.file_size - path: args.config.path - incremental_mode: args.config.incremental_mode - reset: args.config.reset - ) or { return error('Failed to create ourdb: ${err}') } - - mut server := OurDBServer{ - port: args.port - allowed_hosts: args.allowed_hosts - allowed_operations: args.allowed_operations - secret_key: args.secret_key - db: &db - } - 
- server.use(handler: server.logger_handler) - server.use(handler: server.allowed_hosts_handler) - server.use(handler: server.allowed_operations_handler) - return server -} - -// Middleware for logging incoming requests and responses -fn (self &OurDBServer) logger_handler(mut ctx ServerContext) bool { - start_time := time.now() - request := ctx.req - method := request.method.str().to_upper() - client_ip := ctx.req.header.get(.x_forwarded_for) or { ctx.req.host.str().split(':')[0] } - user_agent := ctx.req.header.get(.user_agent) or { 'Unknown' } - - console.print_header('${start_time.format()} | [Request] IP: ${client_ip} | Method: ${method} | Path: ${request.url} | User-Agent: ${user_agent}') - return true -} - -// Middleware to check if the client host is allowed -fn (self &OurDBServer) allowed_hosts_handler(mut ctx ServerContext) bool { - client_host := ctx.req.host.str().split(':')[0].to_lower() - if !self.allowed_hosts.contains(client_host) { - ctx.request_error('403 Forbidden: Host not allowed') - console.print_stderr('Unauthorized host: ${client_host}') - return false - } - return true -} - -// Middleware to check if the requested operation is allowed -fn (self &OurDBServer) allowed_operations_handler(mut ctx ServerContext) bool { - url_parts := ctx.req.url.split('/') - operation := url_parts[1] - if operation !in self.allowed_operations { - ctx.request_error('403 Forbidden: Operation not allowed') - console.print_stderr('Unauthorized operation: ${operation}') - return false - } - return true -} - -// Parameters for running the server -@[params] -pub struct RunParams { -pub mut: - background bool // If true, the server runs in the background -} - -// Starts the OurDB server -pub fn (mut self OurDBServer) run(params RunParams) { - if params.background { - spawn veb.run[OurDBServer, ServerContext](mut self, self.port) - } else { - veb.run[OurDBServer, ServerContext](mut self, self.port) - } -} - -// Represents a generic success response -@[params] -struct 
SuccessResponse[T] { - message string // Success message - data T // Response data -} - -// Represents an error response -@[params] -struct ErrorResponse { - error string @[required] // Error type - message string @[required] // Error message -} - -// Returns an error response -fn (server OurDBServer) error(args ErrorResponse) ErrorResponse { - return args -} - -// Returns a success response -fn (server OurDBServer) success[T](args SuccessResponse[T]) SuccessResponse[T] { - return args -} - -// Request body structure for the `/set` endpoint -pub struct KeyValueData { -pub mut: - id u32 // Record ID - value string // Value to store -} - -// API endpoint to set a key-value pair in the database -@['/set'; post] -pub fn (mut server OurDBServer) set(mut ctx ServerContext) veb.Result { - request_body := ctx.req.data.str() - mut decoded_body := json.decode(KeyValueData, request_body) or { - ctx.res.set_status(.bad_request) - return ctx.json[ErrorResponse](server.error( - error: 'bad_request' - message: 'Invalid request body' - )) - } - - if server.db.incremental_mode && decoded_body.id > 0 { - ctx.res.set_status(.bad_request) - return ctx.json[ErrorResponse](server.error( - error: 'bad_request' - message: 'Cannot set id when incremental mode is enabled' - )) - } - - mut record := if server.db.incremental_mode { - server.db.set(data: decoded_body.value.bytes()) or { - ctx.res.set_status(.bad_request) - return ctx.json[ErrorResponse](server.error( - error: 'bad_request' - message: 'Failed to set key: ${err}' - )) - } - } else { - server.db.set(id: decoded_body.id, data: decoded_body.value.bytes()) or { - ctx.res.set_status(.bad_request) - return ctx.json[ErrorResponse](server.error( - error: 'bad_request' - message: 'Failed to set key: ${err}' - )) - } - } - - decoded_body.id = record - ctx.res.set_status(.created) - return ctx.json(server.success(message: 'Successfully set the key', data: decoded_body)) -} - -// API endpoint to retrieve a record by ID -@['/get/:id'; get] 
-pub fn (mut server OurDBServer) get(mut ctx ServerContext, id string) veb.Result { - id_ := id.u32() - record := server.db.get(id_) or { - ctx.res.set_status(.not_found) - return ctx.json[ErrorResponse](server.error( - error: 'not_found' - message: 'Record does not exist: ${err}' - )) - } - - data := KeyValueData{ - id: id_ - value: record.bytestr() - } - - ctx.res.set_status(.ok) - return ctx.json(server.success(message: 'Successfully get record', data: data)) -} - -// API endpoint to delete a record by ID -@['/delete/:id'; delete] -pub fn (mut server OurDBServer) delete(mut ctx ServerContext, id string) veb.Result { - id_ := id.u32() - - server.db.delete(id_) or { - ctx.res.set_status(.not_found) - return ctx.json[ErrorResponse](server.error( - error: 'not_found' - message: 'Failed to delete key: ${err}' - )) - } - - ctx.res.set_status(.no_content) - return ctx.json({ - 'message': 'Successfully deleted record' - }) -} diff --git a/lib/data/ourdb_syncer/streamer/db_sync.v b/lib/data/ourdb_syncer/streamer/db_sync.v new file mode 100644 index 00000000..04c1d324 --- /dev/null +++ b/lib/data/ourdb_syncer/streamer/db_sync.v @@ -0,0 +1 @@ +module streamer diff --git a/lib/data/ourdb_syncer/streamer/nodes.v b/lib/data/ourdb_syncer/streamer/nodes.v new file mode 100644 index 00000000..e82e779e --- /dev/null +++ b/lib/data/ourdb_syncer/streamer/nodes.v @@ -0,0 +1,236 @@ +module streamer + +import time +import freeflowuniverse.herolib.clients.mycelium +import freeflowuniverse.herolib.data.ourdb +import freeflowuniverse.herolib.osal +import encoding.base64 +import json + +// StreamerNode represents either a master or worker node in the streamer network +pub struct StreamerNode { +pub mut: + name string = 'streamer_node' // Name of the node + public_key string // Mycelium public key of the node + address string // Network address (e.g., "127.0.0.1:8080") + mycelium_client &mycelium.Mycelium = unsafe { nil } // Mycelium client instance + workers []StreamerNode // List of 
connected workers (for master nodes) + port int = 8080 // HTTP server port + is_master bool // Flag indicating if this is a master node + db &ourdb.OurDB // Embedded key-value database + master_public_key string // Public key of the master node (for workers) + last_synced_index u32 // Last synchronized index for workers +} + +// is_running checks if the node is operational by pinging its address +fn (node &StreamerNode) is_running() bool { + ping_result := osal.ping(address: node.address, retry: 2) or { return false } + return ping_result == .ok +} + +// connect_to_master connects the worker node to its master +fn (mut worker StreamerNode) connect_to_master() ! { + if worker.is_master { + return error('Master nodes cannot connect to other master nodes') + } + + worker_json := json.encode(worker) + + log_event( + event_type: 'connection' + message: 'Connecting worker ${worker.public_key} to master ${worker.master_public_key}' + ) + + worker.mycelium_client.send_msg( + topic: 'connect' + payload: worker_json + public_key: worker.master_public_key + ) or { return error('Failed to send connect message: ${err}') } +} + +// start_and_listen runs the node's main event loop +pub fn (mut node StreamerNode) start_and_listen() ! 
{ + log_event( + event_type: 'logs' + message: 'Starting node at ${node.address} with public key ${node.public_key}' + ) + for { + time.sleep(2 * time.second) + node.handle_log_messages() or {} + node.handle_connect_messages() or {} + node.handle_ping_nodes() or {} + node.handle_master_sync() or {} + } +} + +// WriteParams defines parameters for writing to the database +@[params] +pub struct WriteParams { +pub mut: + key u32 // Key to write (optional in non-incremental mode) + value string @[required] // Value to write +} + +// write adds data to the database and propagates it to all nodes +pub fn (mut node StreamerNode) write(params WriteParams) !u32 { + if node.db.incremental_mode && params.key != 0 { + return error('Incremental mode enabled; key must be omitted') + } + if !node.is_master { + return error('Only master nodes can initiate database writes') + } + + // data := params.value.bytes() + // encoded_data := base64.encode(data) + // mut targets := node.workers.map(it.public_key) + // targets << node.public_key + + // for target_key in targets { + // node.mycelium_client.send_msg( + // topic: 'db_write' + // payload: encoded_data + // public_key: target_key + // )! 
+ // } + + return 0 +} + +// ReadParams defines parameters for reading from the database +@[params] +pub struct ReadParams { +pub mut: + key u32 @[required] // Key to read +} + +// read retrieves data from the database (worker only) +pub fn (mut node StreamerNode) read(params ReadParams) !string { + if node.is_master { + return error('Only worker nodes can read from the database') + } + value := node.db.get(params.key) or { return error('Failed to read from database: ${err}') } + return value.bytestr() +} + +// LogEventParams defines parameters for logging events +@[params] +struct LogEventParams { + message string @[required] // Event message + event_type string @[required] // Event type (e.g., "info", "warning", "error") +} + +// log_event logs an event with a timestamp +pub fn log_event(params LogEventParams) { + now := time.now().format() + println('${now}| ${params.event_type}: ${params.message}') +} + +// handle_log_messages processes incoming log messages +fn (mut node StreamerNode) handle_log_messages() ! { + message := node.mycelium_client.receive_msg(wait: false, peek: true, topic: 'logs')! + if message.payload.len > 0 { + msg := base64.decode(message.payload).bytestr() + log_event(event_type: 'logs', message: msg) + } +} + +// to_json_str converts the node to json string +fn (mut node StreamerNode) to_json_str() !string { + mut to_json := json.encode(node) + return to_json +} + +// master_sync processes incoming master sync messages +fn (mut node StreamerNode) handle_master_sync() ! { + message := node.mycelium_client.receive_msg(wait: false, peek: true, topic: 'master_sync')! + if message.payload.len > 0 { + master_id := base64.decode(message.payload).bytestr() + log_event(event_type: 'logs', message: 'Calling master ${master_id} for sync') + + master_json := node.to_json_str()! 
+ println('Master db: ${node.db}') + println('master_json: ${master_json}') + node.mycelium_client.send_msg( + topic: 'master_sync_replay' + payload: master_json + public_key: message.src_pk + )! + + // // // last_synced_index := node.db.get_last_index()! + // database_data_bytes := node.db.push_updates(0) or { + // return error('Failed to push updates: ${err}') + // } + + // println('database_data_bytes: ${database_data_bytes}') + node.mycelium_client.send_msg( + topic: 'master_sync_db' + payload: master_json + public_key: message.src_pk + )! + + log_event( + event_type: 'logs' + message: 'Responded to master ${master_id} for sync' + ) + } +} + +// handle_connect_messages processes connect messages to add workers +fn (mut node StreamerNode) handle_connect_messages() ! { + message := node.mycelium_client.receive_msg(wait: false, peek: true, topic: 'connect')! + if message.payload.len > 0 { + worker_json := base64.decode(message.payload).bytestr() + worker := json.decode(StreamerNode, worker_json) or { + return error('Failed to decode worker node: ${err}') + } + if !node.workers.any(it.public_key == worker.public_key) { + node.workers << worker + log_event( + event_type: 'connection' + message: 'Master ${node.public_key} connected worker ${worker.public_key}' + ) + } + } +} + +// handle_ping_nodes pings all workers or the master, removing unresponsive workers +pub fn (mut node StreamerNode) handle_ping_nodes() ! { + if node.is_master { + mut i := 0 + for i < node.workers.len { + worker := &node.workers[i] + if !worker.is_running() { + log_event(event_type: 'logs', message: 'Worker ${worker.address} is not running') + log_event(event_type: 'logs', message: 'Removing worker ${worker.public_key}') + node.workers.delete(i) + } else { + node.mycelium_client.send_msg( + topic: 'logs' + payload: 'Master ${node.public_key} is pinging worker ${worker.public_key}' + public_key: worker.public_key + )! 
+ i++ + } + } + } else { + if !node.is_running() { + return error('Worker node is not running') + } + if node.master_public_key.len == 0 { + return error('Master public key is not set') + } + node.mycelium_client.send_msg( + topic: 'logs' + payload: 'Worker ${node.public_key} is pinging master ${node.master_public_key}' + public_key: node.master_public_key + )! + } +} + +fn handle_master_sync_replay(mut mycelium_client mycelium.Mycelium) !string { + message := mycelium_client.receive_msg(wait: false, peek: true, topic: 'master_sync_replay')! + if message.payload.len > 0 { + return message.payload + } + return '' +} diff --git a/lib/data/ourdb_syncer/streamer/streamer.v b/lib/data/ourdb_syncer/streamer/streamer.v new file mode 100644 index 00000000..88c6625d --- /dev/null +++ b/lib/data/ourdb_syncer/streamer/streamer.v @@ -0,0 +1,291 @@ +module streamer + +import freeflowuniverse.herolib.clients.mycelium +import freeflowuniverse.herolib.data.ourdb +import encoding.base64 +import json +import time + +// Maximum number of workers allowed +const max_workers = 10 + +// Streamer represents the entire network, including master and workers +pub struct Streamer { +pub mut: + name string = 'streamer' + port int = 8080 + master StreamerNode + incremental_mode bool = true // Incremental mode for database + reset bool = true // Reset database +} + +// NewStreamerParams defines parameters for creating a new streamer +@[params] +pub struct NewStreamerParams { +pub mut: + name string = 'streamer' + port int = 8080 + incremental_mode bool = true // Incremental mode for database + reset bool = true // Reset database +} + +// Creates a new streamer instance +pub fn new_streamer(params NewStreamerParams) !Streamer { + log_event( + event_type: 'logs' + message: 'Creating new streamer' + ) + + mut db := new_db( + incremental_mode: params.incremental_mode + reset: params.reset + )! 
+
+	master := StreamerNode{
+		db: db
+	}
+
+	return Streamer{
+		name:             params.name
+		port:             params.port
+		master:           master
+		incremental_mode: params.incremental_mode
+		reset:            params.reset
+	}
+}
+
+@[params]
+struct NewMyCeliumClientParams {
+	port int = 8080 // HTTP server port
+	name string = 'streamer_client' // Mycelium client name
+}
+
+fn new_mycelium_client(params NewMyCeliumClientParams) !&mycelium.Mycelium {
+	mut mycelium_client := mycelium.get()!
+	mycelium_client.server_url = 'http://localhost:${params.port}'
+	mycelium_client.name = params.name
+	return mycelium_client
+}
+
+@[params]
+struct DBClientParams {
+	db_dir string = '/tmp/ourdb' // Database directory
+	reset bool = true // Reset database
+	incremental_mode bool = true // Incremental mode for database
+	record_size_max u32 = 1024 // Maximum record size
+	record_nr_max u32 = 16777216 - 1 // Maximum number of records
+}
+
+fn new_db(params DBClientParams) !&ourdb.OurDB {
+	mut db := ourdb.new(
+		record_nr_max: params.record_nr_max
+		record_size_max: params.record_size_max
+		path: params.db_dir
+		reset: params.reset
+		incremental_mode: params.incremental_mode
+	)!
+	return &db
+}
+
+// ConnectStreamerParams defines parameters for connecting to an existing streamer
+@[params]
+pub struct ConnectStreamerParams {
+pub mut:
+	master_public_key string @[required] // Public key of the master node
+	port int = 8080 // HTTP server port
+	name string = 'streamer' // Mycelium client name
+}
+
+// Connects to an existing streamer master node; intended for worker nodes
+pub fn connect_streamer(params ConnectStreamerParams) !Streamer {
+	log_event(
+		event_type: 'info'
+		message: 'Connecting to streamer'
+	)
+
+	mut streamer_ := new_streamer(
+		port: params.port
+		name: params.name
+	)!
+
+	// To do this, we need to let the user send the node IP to ping it.
+ // // Setting the master address to just ping the node + // streamer_.master = StreamerNode{ + // address: params.master_public_key + // } + + // if !streamer_.master.is_running() { + // return error('Master node is not running') + // } + + // 1. Get the master node | Done + // 2. Keep listening until we receive replay from the master node | Done + // 3. Sync the master workers | Done + // 4. Push to the network that a new visitor has joined | Done + // 5. Sync the master DB InProgress... + + mut mycelium_client := new_mycelium_client( + port: params.port + name: params.name + )! + + // 1. Push an event to the running network to get the master + mycelium_client.send_msg( + topic: 'master_sync' + payload: params.master_public_key + public_key: params.master_public_key + )! + + mut encoded_master := '' + + // 2. Keep listening until we receive replay from the master node + mut retries := 0 + for { + time.sleep(2 * time.second) + log_event( + event_type: 'info' + message: 'Waiting for master sync replay' + ) + + encoded_master = handle_master_sync_replay(mut mycelium_client) or { '' } + if encoded_master.len > 0 { + log_event( + event_type: 'info' + message: 'Got master sync replay' + ) + + encoded_master = encoded_master + break + } + + if retries > 10 { + log_event( + event_type: 'error' + message: 'Failed to connect to master node' + ) + return error('Failed to connect to master node') + } + retries++ + } + + // 3. 
Sync the master DB + master_to_json := base64.decode(encoded_master).bytestr() + master := json.decode(StreamerNode, master_to_json) or { + return error('Failed to decode master node: ${err}') + } + + println('MasterDB is: ${master.db}') + + streamer_.master = master + + return streamer_ +} + +// StreamerNodeParams defines parameters for creating a new master or worker node +@[params] +pub struct StreamerNodeParams { +pub mut: + public_key string @[required] // Node public key + address string @[required] // Node address + db_dir string = '/tmp/ourdb' // Database directory + incremental_mode bool = true // Incremental mode for database + reset bool = true // Reset database + name string = 'streamer_node' // Node/Mycelium name + port int = 8080 // HTTP server port + master bool // Flag indicating if this is a master node +} + +// Creates a new master node +fn (self Streamer) new_node(params StreamerNodeParams) !StreamerNode { + mut client := new_mycelium_client(name: params.name, port: params.port)! + mut db := new_db( + db_dir: params.db_dir + incremental_mode: params.incremental_mode + reset: params.reset + )! + + return StreamerNode{ + address: params.address + public_key: params.public_key + mycelium_client: client + db: db + is_master: params.master + master_public_key: params.public_key + } +} + +// Adds a master node to the streamer +pub fn (mut self Streamer) add_master(params StreamerNodeParams) !StreamerNode { + if self.master.public_key.len != 0 { + return error('Streamer already has a master node!') + } + + mut params_ := params + params_.master = true + + new_master := self.new_node(params_)! 
+	self.master = new_master
+	return self.master
+}
+
+// Adds a worker node to the streamer and registers it with the master node
+pub fn (mut self Streamer) add_worker(params StreamerNodeParams) !StreamerNode {
+	if params.master {
+		return error('Worker nodes cannot be master nodes')
+	}
+
+	if self.master.public_key.len == 0 {
+		return error('Streamer has no master node')
+	}
+
+	if self.master.workers.len >= max_workers {
+		return error('Maximum worker limit reached')
+	}
+
+	mut worker_node := self.new_node(params)!
+
+	if !worker_node.is_running() {
+		return error('Worker node is not running')
+	}
+
+	self.master.workers << worker_node
+	worker_node.master_public_key = self.master.public_key
+	worker_node.connect_to_master()!
+	return worker_node
+}
+
+// Gets the master node
+pub fn (self Streamer) get_master() StreamerNode {
+	return self.master
+}
+
+// Get master worker nodes
+pub fn (self Streamer) get_workers() ![]StreamerNode {
+	if self.master.public_key.len == 0 {
+		return error('Streamer has no master node')
+	}
+
+	return self.master.workers
+}
+
+@[params]
+pub struct GetWorkerParams {
+pub mut:
+	public_key string @[required] // Public key of the worker node
+}
+
+// Get worker node
+pub fn (self Streamer) get_worker(params GetWorkerParams) !StreamerNode {
+	if !self.master.is_master {
+		return self.master
+	}
+
+	// Find the worker node
+	for worker in self.master.workers {
+		if params.public_key == worker.public_key {
+			return worker
+		}
+	}
+
+	return error('Worker with public key ${params.public_key} not found')
+}
diff --git a/lib/data/ourdb_syncer/sync_test.v b/lib/data/ourdb_syncer/streamer/sync_test.v
similarity index 100%
rename from lib/data/ourdb_syncer/sync_test.v
rename to lib/data/ourdb_syncer/streamer/sync_test.v
diff --git a/lib/data/ourdb_syncer/sync.v b/lib/data/ourdb_syncer/sync.v
deleted file mode 100644
index fe5bf2be..00000000
--- a/lib/data/ourdb_syncer/sync.v
+++ /dev/null
@@ -1,147 +0,0 @@
-module ourdb
-
-import 
encoding.binary - -// Special marker for deleted records (empty data array) -const deleted_marker = []u8{} - -// SyncRecord represents a single database update for synchronization -struct SyncRecord { - id u32 - data []u8 -} - -// get_last_index returns the highest ID currently in use in the database -pub fn (mut db OurDB) get_last_index() !u32 { - if incremental := db.lookup.incremental { - // If in incremental mode, use next_id - 1 - if incremental == 0 { - return 0 // No entries yet - } - return incremental - 1 - } - // If not in incremental mode, scan for highest used ID - return db.lookup.find_last_entry()! -} - -// push_updates serializes all updates from the given index onwards -pub fn (mut db OurDB) push_updates(index u32) ![]u8 { - mut updates := []u8{} - last_index := db.get_last_index()! - - // Calculate number of updates - mut update_count := u32(0) - mut ids_to_sync := []u32{} - - // For initial sync (index == 0), only include existing records - if index == 0 { - for i := u32(1); i <= last_index; i++ { - if _ := db.get(i) { - update_count++ - ids_to_sync << i - } - } - } else { - // For normal sync: - // Check for changes since last sync - for i := u32(1); i <= last_index; i++ { - if location := db.lookup.get(i) { - if i <= index { - // For records up to last sync point, only include if deleted - if location.position == 0 && i == 5 { - // Only include record 5 which was deleted - update_count++ - ids_to_sync << i - } - } else { - // For records after last sync point, include if they exist - if location.position != 0 { - update_count++ - ids_to_sync << i - } - } - } - } - } - - // Write the number of updates as u32 - mut count_bytes := []u8{len: 4} - binary.little_endian_put_u32(mut count_bytes, update_count) - updates << count_bytes - - // Serialize updates - for id in ids_to_sync { - // Write ID (u32) - mut id_bytes := []u8{len: 4} - binary.little_endian_put_u32(mut id_bytes, id) - updates << id_bytes - - // Get data for this ID - if data := 
db.get(id) { - // Record exists, write data - mut len_bytes := []u8{len: 4} - binary.little_endian_put_u32(mut len_bytes, u32(data.len)) - updates << len_bytes - updates << data - } else { - // Record doesn't exist or was deleted - mut len_bytes := []u8{len: 4} - binary.little_endian_put_u32(mut len_bytes, 0) - updates << len_bytes - } - } - - return updates -} - -// sync_updates applies received updates to the database -pub fn (mut db OurDB) sync_updates(bytes []u8) ! { - // Empty updates from push_updates() will have length 4 (just the count) - // Completely empty updates are invalid - if bytes.len == 0 { - return error('invalid update data: empty') - } - - if bytes.len < 4 { - return error('invalid update data: too short') - } - - mut pos := 0 - - // Read number of updates - update_count := binary.little_endian_u32(bytes[pos..pos + 4]) - pos += 4 - - // Process each update - for _ in 0 .. update_count { - if pos + 8 > bytes.len { - return error('invalid update data: truncated header') - } - - // Read ID - id := binary.little_endian_u32(bytes[pos..pos + 4]) - pos += 4 - - // Read data length - data_len := binary.little_endian_u32(bytes[pos..pos + 4]) - pos += 4 - - if pos + int(data_len) > bytes.len { - return error('invalid update data: truncated content') - } - - // Read data - data := bytes[pos..pos + int(data_len)] - pos += int(data_len) - - // Apply update - empty data means deletion - if data.len == 0 { - db.delete(id)! - } else { - db.set(OurDBSetArgs{ - id: id - data: data.clone() - })! - } - } -}