From 9a23c4cc09983bfb1515dcfaf5d7beeae0133a46 Mon Sep 17 00:00:00 2001 From: Maxime Van Hees Date: Thu, 15 May 2025 17:30:20 +0200 Subject: [PATCH] sending and receiving message via Rhai script + added examples --- .../mycelium/mycelium_receive_message.rhai | 31 ++++++++++++++ examples/mycelium/mycelium_send_message.rhai | 25 +++++++++++ src/mycelium/mod.rs | 42 ++++++++++++------- src/rhai/mycelium.rs | 37 ++++++---------- 4 files changed, 97 insertions(+), 38 deletions(-) create mode 100644 examples/mycelium/mycelium_receive_message.rhai create mode 100644 examples/mycelium/mycelium_send_message.rhai diff --git a/examples/mycelium/mycelium_receive_message.rhai b/examples/mycelium/mycelium_receive_message.rhai new file mode 100644 index 0000000..80aef19 --- /dev/null +++ b/examples/mycelium/mycelium_receive_message.rhai @@ -0,0 +1,31 @@ +// Script to receive Mycelium messages + +// API URL for Mycelium +let api_url = "http://localhost:2222"; + +// Receive messages +// This script will listen for messages on a specific topic. +// Ensure the sender script is using the same topic. +// -----------------------------------------------------------------------------// +print("\nReceiving messages:"); +let receive_topic = "test_topic"; +let wait_deadline_secs = 100; + +print(`Listening for messages on topic '${receive_topic}'...`); +try { + let messages = mycelium_receive_messages(api_url, receive_topic, wait_deadline_secs); + + if messages.is_empty() { + // print("No new messages received in this poll."); + } else { + print("Received a message:"); + print(` Message id: ${messages.id}`); + print(` Message from: ${messages.srcIp}`); + print(` Topic: ${messages.topic}`); + print(` Payload: ${messages.payload}`); + } +} catch(err) { + print(`Error receiving messages: ${err}`); +} + +print("Finished attempting to receive messages."); \ No newline at end of file diff --git a/examples/mycelium/mycelium_send_message.rhai b/examples/mycelium/mycelium_send_message.rhai new file mode 100644 index 0000000..9be348c --- /dev/null +++ b/examples/mycelium/mycelium_send_message.rhai @@ -0,0 +1,25 @@ +// Script to send a Mycelium message + +// API URL for Mycelium +let api_url = "http://localhost:1111"; + +// Send a message +// TO SEND A MESSAGE FILL IN THE DESTINATION IP ADDRESS +// -----------------------------------------------------// +print("\nSending a message:"); +let destination = "5af:ae6b:dcd8:ffdb:b71:7dde:d3:1033"; // IMPORTANT: Replace with the actual destination IP address +let topic = "test_topic"; +let message = "Hello from Rhai sender!"; +let deadline_secs = -10; // Seconds we wait for a reply + +try { + print(`Attempting to send message to ${destination} on topic '${topic}'`); + let result = mycelium_send_message(api_url, destination, topic, message, deadline_secs); + print(`result: ${result}`); + print(`Message sent: ${result.success}`); + if result.id != "" { + print(`Message ID: ${result.id}`); + } +} catch(err) { + print(`Error sending message: ${err}`); +} \ No newline at end of file diff --git a/src/mycelium/mod.rs b/src/mycelium/mod.rs index 00847c2..76b76f7 100644 --- a/src/mycelium/mod.rs +++ b/src/mycelium/mod.rs @@ -222,7 +222,7 @@ pub async fn list_fallback_routes(api_url: &str) -> Result { /// * `destination` - The destination address /// * `topic` - The message topic /// * `message` - The message content -/// * `deadline_secs` - The deadline in seconds +/// * `reply_deadline` - The deadline in seconds; pass `-1` to indicate we do not want to wait on a reply /// /// # Returns /// @@ -232,20 +232,21 @@ pub async fn send_message( destination: &str, topic: &str, message: &str, - deadline_secs: u64, + reply_deadline: Option, // This is passed in URL query ) -> Result { let client = Client::new(); let url = format!("{}/api/v1/messages", api_url); - // Convert deadline to seconds - let deadline = Duration::from_secs(deadline_secs).as_secs(); + let mut request = client.post(&url); + if let Some(deadline) = reply_deadline { + request = request.query(&[("reply_timeout", deadline.as_secs())]); + } - let response = client - .post(&url) + let response = request .json(&serde_json::json!({ - "dst": { "ip": destination }, - "topic": general_purpose::STANDARD.encode(topic), - "payload": general_purpose::STANDARD.encode(message) + "dst": { "ip": destination }, + "topic": general_purpose::STANDARD.encode(topic), + "payload": general_purpose::STANDARD.encode(message) })) .send() .await @@ -270,18 +271,31 @@ pub async fn send_message( /// /// * `api_url` - The URL of the Mycelium API /// * `topic` - The message topic -/// * `count` - The maximum number of messages to receive +/// * `wait_deadline` - Time we wait for receiving a message /// /// # Returns /// /// * `Result` - The received messages as a JSON value, or an error message -pub async fn receive_messages(api_url: &str, topic: &str, count: u32) -> Result { +pub async fn receive_messages( + api_url: &str, + topic: &str, + wait_deadline: Option, +) -> Result { let client = Client::new(); let url = format!("{}/api/v1/messages", api_url); - let response = client - .get(&url) - .query(&[("topic", general_purpose::STANDARD.encode(topic))]) + let mut request = client.get(&url); + + if let Some(deadline) = wait_deadline { + request = request.query(&[ + ("topic", general_purpose::STANDARD.encode(topic)), + ("timeout", deadline.as_secs().to_string()), + ]) + } else { + request = request.query(&[("topic", general_purpose::STANDARD.encode(topic))]) + }; + + let response = request .send() .await .map_err(|e| format!("Failed to send request: {}", e))?; diff --git a/src/rhai/mycelium.rs b/src/rhai/mycelium.rs index 4a9b342..03cc68c 100644 --- a/src/rhai/mycelium.rs +++ b/src/rhai/mycelium.rs @@ -2,6 +2,8 @@ //! //! This module provides Rhai wrappers for the functions in the Mycelium client module. +use std::time::Duration; + use rhai::{Engine, EvalAltResult, Array, Dynamic, Map}; use crate::mycelium as client; use tokio::runtime::Runtime; @@ -182,17 +184,13 @@ pub fn mycelium_list_fallback_routes(api_url: &str) -> Result Result> { +pub fn mycelium_send_message(api_url: &str, destination: &str, topic: &str, message: &str, reply_deadline_secs: i64) -> Result> { let rt = get_runtime()?; - - // Convert deadline to u64 - let deadline = if deadline_secs < 0 { - return Err(Box::new(EvalAltResult::ErrorRuntime( - "Deadline cannot be negative".into(), - rhai::Position::NONE - ))); + + let deadline = if reply_deadline_secs < 0 { + None } else { - deadline_secs as u64 + Some(Duration::from_secs(reply_deadline_secs as u64)) }; let result = rt.block_on(async { @@ -207,26 +205,17 @@ pub fn mycelium_send_message(api_url: &str, destination: &str, topic: &str, mess /// Wrapper for mycelium::receive_messages /// /// Receives messages from a topic via the Mycelium node. -pub fn mycelium_receive_messages(api_url: &str, topic: &str, count: i64) -> Result> { +pub fn mycelium_receive_messages(api_url: &str, topic: &str, wait_deadline_secs: i64) -> Result> { let rt = get_runtime()?; - - // Convert count to u32 - let count = if count < 0 { - return Err(Box::new(EvalAltResult::ErrorRuntime( - "Count cannot be negative".into(), - rhai::Position::NONE - ))); - } else if count > u32::MAX as i64 { - return Err(Box::new(EvalAltResult::ErrorRuntime( - format!("Count too large, maximum is {}", u32::MAX).into(), - rhai::Position::NONE - ))); + + let deadline = if wait_deadline_secs < 0 { + None } else { - count as u32 + Some(Duration::from_secs(wait_deadline_secs as u64)) }; let result = rt.block_on(async { - client::receive_messages(api_url, topic, count).await + client::receive_messages(api_url, topic, deadline).await }); let messages = result.to_rhai_error()?;