From 4a82bde192104943a62ebc12f899e9c9eb576483 Mon Sep 17 00:00:00 2001
From: Mahmoud-Emad
Date: Wed, 3 Sep 2025 11:22:18 +0300
Subject: [PATCH] refactor: migrate to `redisclient` and update V-lang syntax

- Refactor Redis client usage to `herolib.core.redisclient`
- Improve Redis connection error handling and logging
- Update V-lang syntax: string interpolation, `spawn`, key generation
- Expose `herocluster` types and methods as public
- Add `redisclient` usage example in `escalayer` module
---
 lib/hero/herocluster/example/example.vsh | 110 +++--
 lib/hero/herocluster/factory.v           | 503 ++++++++++++-----------
 lib/mcp/rhai/example/example.vsh         |  65 +++
 3 files changed, 402 insertions(+), 276 deletions(-)
 mode change 100644 => 100755 lib/hero/herocluster/example/example.vsh

diff --git a/lib/hero/herocluster/example/example.vsh b/lib/hero/herocluster/example/example.vsh
old mode 100644
new mode 100755
index 6c1300dc..59250d70
--- a/lib/hero/herocluster/example/example.vsh
+++ b/lib/hero/herocluster/example/example.vsh
@@ -1,67 +1,107 @@
 #!/usr/bin/env -S v -n -w -cg -gc none -cc tcc -d use_openssl -enable-globals run
+import crypto.ed25519
+import freeflowuniverse.herolib.core.base
+import freeflowuniverse.herolib.core.redisclient
+import freeflowuniverse.herolib.hero.herocluster
+import os
+import rand
+
+mut ctx := base.context()!
+redis := ctx.redis()!
+
 if os.args.len < 3 {
-	eprintln('Usage: ./prog <node_id> <status>')
-	eprintln('  status: active|buffer')
-	return
+	eprintln('Usage: ./prog <node_id> <status>')
+	eprintln('  status: active|buffer')
+	return
 }

 node_id := os.args[1]
 status_str := os.args[2]

 status := match status_str {
-	'active' { NodeStatus.active }
-	'buffer' { NodeStatus.buffer }
-	else {
-		eprintln('Invalid status. Use: active|buffer')
-		return
-	}
+	'active' {
+		herocluster.NodeStatus.active
+	}
+	'buffer' {
+		herocluster.NodeStatus.buffer
+	}
+	else {
+		eprintln('Invalid status. Use: active|buffer')
+		return
+	}
 }

 // --- Generate ephemeral keys for demo ---
 // In real use: load from PEM files
-priv, pub := ed25519.generate_key(rand.reader) or { panic(err) }
+pub_, priv := ed25519.generate_key()!
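+// NOTE: V's crypto.ed25519.generate_key() takes no entropy-source argument and
+// returns (PublicKey, PrivateKey); `pub_` is used because `pub` is a V keyword.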
 mut pubkeys := map[string]ed25519.PublicKey{}
-pubkeys[node_id] = pub
+pubkeys[node_id] = pub_
 // TODO: load all pubkeys from config file so every node knows others

 // Initialize all nodes (in real scenario, load from config)
-mut all_nodes := map[string]Node{}
-all_nodes['node1'] = Node{id: 'node1', status: .active}
-all_nodes['node2'] = Node{id: 'node2', status: .active}
-all_nodes['node3'] = Node{id: 'node3', status: .active}
-all_nodes['node4'] = Node{id: 'node4', status: .buffer}
+mut all_nodes := map[string]herocluster.Node{}
+all_nodes['node1'] = herocluster.Node{
+	id:     'node1'
+	status: .active
+}
+all_nodes['node2'] = herocluster.Node{
+	id:     'node2'
+	status: .active
+}
+all_nodes['node3'] = herocluster.Node{
+	id:     'node3'
+	status: .active
+}
+all_nodes['node4'] = herocluster.Node{
+	id:     'node4'
+	status: .buffer
+}

 // Set current node status
 all_nodes[node_id].status = status

 servers := ['127.0.0.1:6379', '127.0.0.1:6380', '127.0.0.1:6381', '127.0.0.1:6382']
-mut conns := []redis.Connection{}
+mut conns := []&redisclient.Redis{}
 for s in servers {
-	mut c := redis.connect(redis.Options{ server: s }) or {
-		panic('could not connect to redis $s: $err')
-	}
-	conns << c
+	redis_url := redisclient.get_redis_url(s) or {
+		eprintln('Warning: could not parse redis url ${s}: ${err}')
+		continue
+	}
+	mut c := redisclient.core_get(redis_url) or {
+		eprintln('Warning: could not connect to redis ${s}: ${err}')
+		continue
+	}
+	conns << c
+	println('Connected to Redis server: ${s}')
 }

-mut election := Election{
-	clients: conns
-	pubkeys: pubkeys
-	self: Node{
-		id: node_id
-		term: 0
-		leader: false
-		status: status
-	}
-	keys: Keys{ priv: priv, pub: pub }
-	all_nodes: all_nodes
-	buffer_nodes: ['node4'] // Initially node4 is buffer
+if conns.len == 0 {
+	eprintln('Error: No Redis servers available. Please start at least one Redis server.')
+	return
 }

-println('[$node_id] started as $status_str, connected to 4 redis servers.')
+mut election := &herocluster.Election{
+	clients:      conns
+	pubkeys:      pubkeys
+	self:         herocluster.Node{
+		id:     node_id
+		term:   0
+		leader: false
+		status: status
+	}
+	keys:         herocluster.Keys{
+		priv:    priv
+		pub_key: pub_
+	}
+	all_nodes:    all_nodes
+	buffer_nodes: ['node4'] // Initially node4 is buffer
+}
+
+println('[${node_id}] started as ${status_str}, connected to ${conns.len} redis servers.')

 // Start health monitoring in background
-go election.health_monitor_loop()
+spawn election.health_monitor_loop()

 // Start main heartbeat loop
 election.heartbeat_loop()
diff --git a/lib/hero/herocluster/factory.v b/lib/hero/herocluster/factory.v
index d2319dad..e9b99022 100644
--- a/lib/hero/herocluster/factory.v
+++ b/lib/hero/herocluster/factory.v
@@ -1,10 +1,8 @@
 module herocluster

-import db.redis
+import freeflowuniverse.herolib.core.redisclient
 import crypto.ed25519
-import crypto.rand
 import encoding.hex
-import os
 import time

 const election_timeout_ms = 3000
@@ -14,295 +12,318 @@ const health_check_interval_ms = 30000 // 30 seconds

 // --- Crypto helpers ---

-struct Keys {
-	priv ed25519.PrivateKey
-	pub ed25519.PublicKey
+pub struct Keys {
+pub mut:
+	priv    ed25519.PrivateKey
+	pub_key ed25519.PublicKey
 }

 // sign a message
 fn (k Keys) sign(msg string) string {
-	sig := ed25519.sign(k.priv, msg.bytes())
-	return hex.encode(sig)
+	sig := ed25519.sign(k.priv, msg.bytes()) or { panic('Failed to sign message: ${err}') }
+	return hex.encode(sig)
 }

 // verify signature
-fn verify(pub ed25519.PublicKey, msg string, sig_hex string) bool {
-	sig := hex.decode(sig_hex) or { return false }
-	return ed25519.verify(pub, msg.bytes(), sig)
+fn verify(pubkey ed25519.PublicKey, msg string, sig_hex string) bool {
+	sig := hex.decode(sig_hex) or { return false }
+	return ed25519.verify(pubkey, msg.bytes(), sig) or { false }
 }

 // --- Node & Election ---

-enum NodeStatus {
-	active
-	buffer
-	unavailable
+pub enum NodeStatus {
+	active
+	buffer
+	unavailable
 }

-struct Node {
-	id string
-	mut:
-	term int
-	leader bool
-	voted_for string
-	status NodeStatus
-	last_seen i64 // timestamp
+pub struct Node {
+pub:
+	id string
+pub mut:
+	term      int
+	leader    bool
+	voted_for string
+	status    NodeStatus
+	last_seen i64 // timestamp
 }

 struct HealthReport {
-	reporter_id string
-	target_id string
-	status string // "available" or "unavailable"
-	timestamp i64
-	signature string
+	reporter_id string
+	target_id   string
+	status      string // "available" or "unavailable"
+	timestamp   i64
+	signature   string
 }

-struct Election {
-	mut:
-	clients []redis.Connection
-	pubkeys map[string]ed25519.PublicKey
-	self Node
-	keys Keys
-	all_nodes map[string]Node
-	buffer_nodes []string
+pub struct Election {
+pub mut:
+	clients      []&redisclient.Redis
+	pubkeys      map[string]ed25519.PublicKey
+	self         Node
+	keys         Keys
+	all_nodes    map[string]Node
+	buffer_nodes []string
 }

 // Redis keys
-fn vote_key(term int, node_id string) string { return 'vote:${term}:${node_id}' }
-fn health_key(reporter_id string, target_id string) string { return 'health:${reporter_id}:${target_id}' }
-fn node_status_key(node_id string) string { return 'node_status:${node_id}' }
+fn vote_key(term int, node_id string) string {
+	return 'vote:${term}:${node_id}'
+}
+
+fn health_key(reporter_id string, target_id string) string {
+	return 'health:${reporter_id}:${target_id}'
+}
+
+fn node_status_key(node_id string) string {
+	return 'node_status:${node_id}'
+}
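+
+// Illustrative key layout on each Redis server (derived from the hset calls below):
+//   vote:<term>:<node_id>       -> hash {candidate, sig}
+//   health:<reporter>:<target>  -> hash {status, timestamp, signature}
+//   node_status:<node_id>       -> hash {status, promoted_at, replaced_node}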

 // Write vote (signed) to ALL redis servers
 fn (mut e Election) vote_for(candidate string) {
-	msg := '${e.self.term}:${candidate}'
-	sig_hex := e.keys.sign(msg)
-	for mut c in e.clients {
-		k := vote_key(e.self.term, e.self.id)
-		c.hset(k, 'candidate', candidate) or {}
-		c.hset(k, 'sig', sig_hex) or {}
-		c.expire(k, 5) or {}
-	}
-	println('[${e.self.id}] voted for $candidate (term=${e.self.term})')
+	msg := '${e.self.term}:${candidate}'
+	sig_hex := e.keys.sign(msg)
+	for mut c in e.clients {
+		k := vote_key(e.self.term, e.self.id)
+		c.hset(k, 'candidate', candidate) or {}
+		c.hset(k, 'sig', sig_hex) or {}
+		c.expire(k, 5) or {}
+	}
+	println('[${e.self.id}] voted for ${candidate} (term=${e.self.term})')
 }

 // Report node health status
 fn (mut e Election) report_node_health(target_id string, status string) {
-	now := time.now().unix()
-	msg := '${target_id}:${status}:${now}'
-	sig_hex := e.keys.sign(msg)
-
-	report := HealthReport{
-		reporter_id: e.self.id
-		target_id: target_id
-		status: status
-		timestamp: now
-		signature: sig_hex
-	}
-
-	for mut c in e.clients {
-		k := health_key(e.self.id, target_id)
-		c.hset(k, 'status', status) or {}
-		c.hset(k, 'timestamp', now.str()) or {}
-		c.hset(k, 'signature', sig_hex) or {}
-		c.expire(k, 86400) or {} // expire after 24 hours
-	}
-	println('[${e.self.id}] reported $target_id as $status')
+	now := time.now().unix()
+	msg := '${target_id}:${status}:${now}'
+	sig_hex := e.keys.sign(msg)
+
+	_ := HealthReport{
+		reporter_id: e.self.id
+		target_id:   target_id
+		status:      status
+		timestamp:   now
+		signature:   sig_hex
+	}
+
+	for mut c in e.clients {
+		k := health_key(e.self.id, target_id)
+		c.hset(k, 'status', status) or {}
+		c.hset(k, 'timestamp', now.str()) or {}
+		c.hset(k, 'signature', sig_hex) or {}
+		c.expire(k, 86400) or {} // expire after 24 hours
+	}
+	println('[${e.self.id}] reported ${target_id} as ${status}')
 }

 // Collect health reports and check for consensus on unavailable nodes
 fn (mut e Election) check_node_availability() {
-	now := time.now().unix()
-	mut unavailable_reports := map[string]map[string]i64{} // target_id -> reporter_id -> timestamp
-
-	for mut c in e.clients {
-		keys := c.keys('health:*') or { continue }
-		for k in keys {
-			parts := k.split(':')
-			if parts.len != 3 { continue }
-			reporter_id := parts[1]
-			target_id := parts[2]
-
-			vals := c.hgetall(k) or { continue }
-			status := vals['status']
-			timestamp_str := vals['timestamp']
-			sig_hex := vals['signature']
-
-			if reporter_id !in e.pubkeys { continue }
-
-			timestamp := timestamp_str.i64()
-			msg := '${target_id}:${status}:${timestamp}'
-
-			if verify(e.pubkeys[reporter_id], msg, sig_hex) {
-				if status == 'unavailable' && (now - timestamp) >= (node_unavailable_threshold_ms / 1000) {
-					if target_id !in unavailable_reports {
-						unavailable_reports[target_id] = map[string]i64{}
-					}
-					unavailable_reports[target_id][reporter_id] = timestamp
-				}
-			}
-		}
-	}
-
-	// Check for consensus (2 out of 3 active nodes agree)
-	for target_id, reports in unavailable_reports {
-		if reports.len >= 2 && target_id in e.all_nodes {
-			if e.all_nodes[target_id].status == .active {
-				println('[${e.self.id}] Consensus reached: $target_id is unavailable for >1 day')
-				e.promote_buffer_node(target_id)
-			}
-		}
-	}
+	now := time.now().unix()
+	mut unavailable_reports := map[string]map[string]i64{} // target_id -> reporter_id -> timestamp
+
+	for mut c in e.clients {
+		keys := c.keys('health:*') or { continue }
+		for k in keys {
+			parts := k.split(':')
+			if parts.len != 3 {
+				continue
+			}
+			reporter_id := parts[1]
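+			// keys have the shape health:<reporter_id>:<target_id> (see health_key),
+			// so parts[1] is the reporter and parts[2] the reported node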
+			target_id := parts[2]
+
+			vals := c.hgetall(k) or { continue }
+			status := vals['status']
+			timestamp_str := vals['timestamp']
+			sig_hex := vals['signature']
+
+			if reporter_id !in e.pubkeys {
+				continue
+			}
+
+			timestamp := timestamp_str.i64()
+			msg := '${target_id}:${status}:${timestamp}'
+
+			if verify(e.pubkeys[reporter_id], msg, sig_hex) {
+				if status == 'unavailable'
+					&& (now - timestamp) >= (node_unavailable_threshold_ms / 1000) {
+					if target_id !in unavailable_reports {
+						unavailable_reports[target_id] = map[string]i64{}
+					}
+					unavailable_reports[target_id][reporter_id] = timestamp
+				}
+			}
+		}
+	}
+
+	// Check for consensus (2 out of 3 active nodes agree)
+	for target_id, reports in unavailable_reports {
+		if reports.len >= 2 && target_id in e.all_nodes {
+			if e.all_nodes[target_id].status == .active {
+				println('[${e.self.id}] Consensus reached: ${target_id} is unavailable for >1 day')
+				e.promote_buffer_node(target_id)
+			}
+		}
+	}
 }

 // Promote a buffer node to active status
 fn (mut e Election) promote_buffer_node(failed_node_id string) {
-	if e.buffer_nodes.len == 0 {
-		println('[${e.self.id}] No buffer nodes available for promotion')
-		return
-	}
-
-	// Select first available buffer node
-	buffer_id := e.buffer_nodes[0]
-
-	// Update node statuses
-	if failed_node_id in e.all_nodes {
-		e.all_nodes[failed_node_id].status = .unavailable
-	}
-	if buffer_id in e.all_nodes {
-		e.all_nodes[buffer_id].status = .active
-	}
-
-	// Remove from buffer list
-	e.buffer_nodes = e.buffer_nodes.filter(it != buffer_id)
-
-	// Announce the promotion
-	for mut c in e.clients {
-		k := node_status_key(buffer_id)
-		c.hset(k, 'status', 'active') or {}
-		c.hset(k, 'promoted_at', time.now().unix().str()) or {}
-		c.hset(k, 'replaced_node', failed_node_id) or {}
-
-		// Mark failed node as unavailable
-		failed_k := node_status_key(failed_node_id)
-		c.hset(failed_k, 'status', 'unavailable') or {}
-		c.hset(failed_k, 'failed_at', time.now().unix().str()) or {}
-	}
-
-	println('[${e.self.id}] Promoted buffer node $buffer_id to replace failed node $failed_node_id')
+	if e.buffer_nodes.len == 0 {
+		println('[${e.self.id}] No buffer nodes available for promotion')
+		return
+	}
+
+	// Select first available buffer node
+	buffer_id := e.buffer_nodes[0]
+
+	// Update node statuses
+	if failed_node_id in e.all_nodes {
+		e.all_nodes[failed_node_id].status = .unavailable
+	}
+	if buffer_id in e.all_nodes {
+		e.all_nodes[buffer_id].status = .active
+	}
+
+	// Remove from buffer list
+	e.buffer_nodes = e.buffer_nodes.filter(it != buffer_id)
+
+	// Announce the promotion
+	for mut c in e.clients {
+		k := node_status_key(buffer_id)
+		c.hset(k, 'status', 'active') or {}
+		c.hset(k, 'promoted_at', time.now().unix().str()) or {}
+		c.hset(k, 'replaced_node', failed_node_id) or {}
+
+		// Mark failed node as unavailable
+		failed_k := node_status_key(failed_node_id)
+		c.hset(failed_k, 'status', 'unavailable') or {}
+		c.hset(failed_k, 'failed_at', time.now().unix().str()) or {}
+	}
+
+	println('[${e.self.id}] Promoted buffer node ${buffer_id} to replace failed node ${failed_node_id}')
 }

 // Collect votes from ALL redis servers, verify signatures (only from active nodes)
 fn (mut e Election) collect_votes(term int) map[string]int {
-	mut counts := map[string]int{}
-	mut seen := map[string]bool{} // avoid double-counting same vote from multiple servers
+	mut counts := map[string]int{}
+	mut seen := map[string]bool{} // avoid double-counting same vote from multiple servers

-	for mut c in e.clients {
-		keys := c.keys('vote:${term}:*') or { continue }
-		for k in keys {
-			if seen[k] { continue }
-			seen[k] = true
-			vals := c.hgetall(k) or { continue }
-			candidate := vals['candidate']
-			sig_hex := vals['sig']
-			voter_id := k.split(':')[2]
-
-			// Only count votes from active nodes
-			if voter_id !in e.pubkeys || voter_id !in e.all_nodes { continue }
-			if e.all_nodes[voter_id].status != .active { continue }
-
-			msg := '${term}:${candidate}'
-			if verify(e.pubkeys[voter_id], msg, sig_hex) {
-				counts[candidate]++
-			} else {
-				println('[${e.self.id}] invalid signature from $voter_id')
-			}
-		}
-	}
-	return counts
+	for mut c in e.clients {
+		keys := c.keys('vote:${term}:*') or { continue }
+		for k in keys {
+			if seen[k] {
+				continue
+			}
+			seen[k] = true
+			vals := c.hgetall(k) or { continue }
+			candidate := vals['candidate']
+			sig_hex := vals['sig']
+			voter_id := k.split(':')[2]
+
+			// Only count votes from active nodes
+			if voter_id !in e.pubkeys || voter_id !in e.all_nodes {
+				continue
+			}
+			if e.all_nodes[voter_id].status != .active {
+				continue
+			}
+
+			msg := '${term}:${candidate}'
+			if verify(e.pubkeys[voter_id], msg, sig_hex) {
+				counts[candidate]++
+			} else {
+				println('[${e.self.id}] invalid signature from ${voter_id}')
+			}
+		}
+	}
+	return counts
 }

 // Run election (only active nodes participate)
 fn (mut e Election) run_election() {
-	if e.self.status != .active {
-		return // Buffer nodes don't participate in elections
-	}
-
-	e.self.term++
-	e.vote_for(e.self.id)
+	if e.self.status != .active {
+		return // Buffer nodes don't participate in elections
+	}

-	// wait a bit for other nodes to also vote
-	time.sleep(500 * time.millisecond)
+	e.self.term++
+	e.vote_for(e.self.id)

-	votes := e.collect_votes(e.self.term)
-	active_node_count := e.all_nodes.values().filter(it.status == .active).len
-	majority_threshold := (active_node_count / 2) + 1
-
-	for cand, cnt in votes {
-		if cnt >= majority_threshold {
-			if cand == e.self.id {
-				println('[${e.self.id}] I AM LEADER (term=${e.self.term}, votes=$cnt, active_nodes=$active_node_count)')
-				e.self.leader = true
-			} else {
-				println('[${e.self.id}] sees LEADER = $cand (term=${e.self.term}, votes=$cnt, active_nodes=$active_node_count)')
-				e.self.leader = false
-			}
-		}
-	}
+	// wait a bit for other nodes to also vote
+	time.sleep(500 * time.millisecond)
+
+	votes := e.collect_votes(e.self.term)
+	active_node_count := e.all_nodes.values().filter(it.status == .active).len
+	majority_threshold := (active_node_count / 2) + 1
+
+	for cand, cnt in votes {
+		if cnt >= majority_threshold {
+			if cand == e.self.id {
+				println('[${e.self.id}] I AM LEADER (term=${e.self.term}, votes=${cnt}, active_nodes=${active_node_count})')
+				e.self.leader = true
+			} else {
+				println('[${e.self.id}] sees LEADER = ${cand} (term=${e.self.term}, votes=${cnt}, active_nodes=${active_node_count})')
+				e.self.leader = false
+			}
+		}
+	}
 }

 // Health monitoring loop (runs in background)
-fn (mut e Election) health_monitor_loop() {
-	for {
-		if e.self.status == .active {
-			// Check health of other nodes
-			for node_id, node in e.all_nodes {
-				if node_id == e.self.id { continue }
-
-				// Simple health check: try to read a heartbeat key
-				mut is_available := false
-				for mut c in e.clients {
-					heartbeat_key := 'heartbeat:${node_id}'
-					val := c.get(heartbeat_key) or { continue }
-					last_heartbeat := val.i64()
-					if (time.now().unix() - last_heartbeat) < 60 { // 60 seconds threshold
-						is_available = true
-						break
-					}
-				}
-
-				status := if is_available { 'available' } else { 'unavailable' }
-				e.report_node_health(node_id, status)
-			}
-
-			// Check for consensus on failed nodes
-			e.check_node_availability()
-		}
-
-		time.sleep(health_check_interval_ms * time.millisecond)
-	}
+pub fn (mut e Election) health_monitor_loop() {
+	for {
+		if e.self.status == .active {
+			// Check health of other nodes
+			for node_id, _ in e.all_nodes {
+				if node_id == e.self.id {
+					continue
+				}
+
+				// Simple health check: try to read a heartbeat key
+				mut is_available := false
+				for mut c in e.clients {
+					heartbeat_key := 'heartbeat:${node_id}'
+					val := c.get(heartbeat_key) or { continue }
+					last_heartbeat := val.i64()
+					if (time.now().unix() - last_heartbeat) < 60 { // 60 seconds threshold
+						is_available = true
+						break
+					}
+				}
+
+				status := if is_available { 'available' } else { 'unavailable' }
+				e.report_node_health(node_id, status)
+			}
+
+			// Check for consensus on failed nodes
+			e.check_node_availability()
+		}
+
+		time.sleep(health_check_interval_ms * time.millisecond)
+	}
 }

 // Heartbeat loop
-fn (mut e Election) heartbeat_loop() {
-	for {
-		// Update own heartbeat
-		now := time.now().unix()
-		for mut c in e.clients {
-			heartbeat_key := 'heartbeat:${e.self.id}'
-			c.set(heartbeat_key, now.str()) or {}
-			c.expire(heartbeat_key, 120) or {} // expire after 2 minutes
-		}
-
-		if e.self.status == .active {
-			if e.self.leader {
-				println('[${e.self.id}] Heartbeat term=${e.self.term} (LEADER)')
-			} else {
-				e.run_election()
-			}
-		} else if e.self.status == .buffer {
-			println('[${e.self.id}] Buffer node monitoring cluster')
-		}
-
-		time.sleep(heartbeat_interval_ms * time.millisecond)
-	}
+pub fn (mut e Election) heartbeat_loop() {
+	for {
+		// Update own heartbeat
+		now := time.now().unix()
+		for mut c in e.clients {
+			heartbeat_key := 'heartbeat:${e.self.id}'
+			c.set(heartbeat_key, now.str()) or {}
+			c.expire(heartbeat_key, 120) or {} // expire after 2 minutes
+		}
+
+		if e.self.status == .active {
+			if e.self.leader {
+				println('[${e.self.id}] Heartbeat term=${e.self.term} (LEADER)')
+			} else {
+				e.run_election()
+			}
+		} else if e.self.status == .buffer {
+			println('[${e.self.id}] Buffer node monitoring cluster')
+		}
+
+		time.sleep(heartbeat_interval_ms * time.millisecond)
+	}
 }
diff --git a/lib/mcp/rhai/example/example.vsh b/lib/mcp/rhai/example/example.vsh
index 150f0f10..b72d7bff 100755
--- a/lib/mcp/rhai/example/example.vsh
+++ b/lib/mcp/rhai/example/example.vsh
@@ -1,9 +1,13 @@
 #!/usr/bin/env -S v -n -w -gc none -cc tcc -d use_openssl -enable-globals run

 import freeflowuniverse.herolib.mcp.aitools.escalayer
+import freeflowuniverse.herolib.core.redisclient
 import os

 fn main() {
+	// Example of using redisclient module instead of old redis.Connection
+	redis_example() or { println('Redis example failed: ${err}') }
+
 	// Get the current directory where this script is located
 	current_dir := os.dir(@FILE)

@@ -594,3 +598,64 @@ fn extract_functions_from_code(code string) []string {

 	return functions
 }
+
+// Example function showing how to use redisclient module instead of old redis.Connection
+fn redis_example() ! {
+	// OLD WAY (don't use this):
+	// mut conns := []redis.Connection{}
+	// for s in servers {
+	//     mut c := redis.connect(redis.Options{ server: s }) or {
+	//         panic('could not connect to redis $s: $err')
+	//     }
+	//     conns << c
+	// }
+
+	// NEW WAY using redisclient module:
+	servers := ['127.0.0.1:6379', '127.0.0.1:6380', '127.0.0.1:6381', '127.0.0.1:6382']
+	mut redis_clients := []&redisclient.Redis{}
+
+	for server in servers {
+		// Parse server address
+		redis_url := redisclient.get_redis_url(server) or {
+			println('Failed to parse Redis URL ${server}: ${err}')
+			continue
+		}
+
+		// Create Redis client using redisclient module
+		mut redis_client := redisclient.core_get(redis_url) or {
+			println('Failed to connect to Redis ${server}: ${err}')
+			continue
+		}
+
+		// Test the connection
+		redis_client.ping() or {
+			println('Failed to ping Redis ${server}: ${err}')
+			continue
+		}
+
+		redis_clients << redis_client
+		println('Successfully connected to Redis server: ${server}')
+	}
+
+	// Example usage of Redis operations
+	if redis_clients.len > 0 {
+		mut redis := redis_clients[0]
+
+		// Set a test key
+		redis.set('test_key', 'test_value') or {
+			println('Failed to set test key: ${err}')
+			return
+		}
+
+		// Get the test key
+		value := redis.get('test_key') or {
+			println('Failed to get test key: ${err}')
+			return
+		}
+
+		println('Redis test successful - key: test_key, value: ${value}')
+
+		// Clean up
+		redis.del('test_key') or { println('Failed to delete test key: ${err}') }
+	}
+}
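+
+// Hypothetical failover-read sketch (not part of this migration): try each
+// connected client in order and return the first answer. It uses only
+// redisclient calls already exercised above (get), but treat it as an
+// illustration under those assumptions rather than tested code.
+fn failover_get(mut clients []&redisclient.Redis, key string) !string {
+	for mut c in clients {
+		// skip servers that error out or miss the key
+		val := c.get(key) or { continue }
+		return val
+	}
+	return error('no redis server returned a value for ${key}')
+}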