add ws client and server packages
This commit is contained in:
parent
22fae9de66
commit
f22d40c980
@ -1,3 +1,3 @@
|
||||
# Circles
|
||||
|
||||
Architecture for a digital life.
|
||||
Architecture around our digital selves.
|
1
client_ws/.gitignore
vendored
Normal file
1
client_ws/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
/target
|
31
client_ws/Cargo.toml
Normal file
31
client_ws/Cargo.toml
Normal file
@ -0,0 +1,31 @@
|
||||
[package]
|
||||
name = "circle_client_ws"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
uuid = { version = "1.6", features = ["v4", "serde", "js"] }
|
||||
log = "0.4"
|
||||
futures-channel = { version = "0.3", features = ["sink"] } # For mpsc
|
||||
futures-util = { version = "0.3", features = ["sink"] } # For StreamExt, SinkExt
|
||||
thiserror = "1.0"
|
||||
async-trait = "0.1" # May be needed for abstracting WS connection
|
||||
|
||||
# WASM-specific dependencies
|
||||
[target.'cfg(target_arch = "wasm32")'.dependencies]
|
||||
gloo-net = { version = "0.4.0", features = ["websocket"] }
|
||||
wasm-bindgen-futures = "0.4"
|
||||
gloo-console = "0.3.0" # For wasm logging if needed, or use `log` with wasm_logger
|
||||
|
||||
# Native-specific dependencies
|
||||
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
|
||||
tokio-tungstenite = { version = "0.23.0", features = ["native-tls"] }
|
||||
tokio = { version = "1", features = ["rt", "macros"] } # For tokio::spawn on native
|
||||
url = "2.5.0" # For native WebSocket connection
|
||||
|
||||
[dev-dependencies]
|
||||
# For examples within this crate, if any, or for testing
|
||||
env_logger = "0.10"
|
||||
# tokio = { version = "1", features = ["full"] } # If examples need full tokio runtime
|
86
client_ws/README.md
Normal file
86
client_ws/README.md
Normal file
@ -0,0 +1,86 @@
|
||||
# Circle WebSocket Client (`circle_client_ws`)
|
||||
|
||||
This crate provides a WebSocket client (`CircleWsClient`) designed to interact with a server that expects JSON-RPC messages, specifically for executing Rhai scripts.
|
||||
|
||||
It is designed to be compatible with both WebAssembly (WASM) environments (e.g., web browsers) and native Rust applications.
|
||||
|
||||
## Features
|
||||
|
||||
- **Cross-Platform:** Works in WASM and native environments.
|
||||
- Uses `gloo-net` for WebSockets in WASM.
|
||||
- Uses `tokio-tungstenite` for WebSockets in native applications.
|
||||
- **JSON-RPC Communication:** Implements client-side JSON-RPC 2.0 request and response handling.
|
||||
- **Rhai Script Execution:** Provides a `play(script: String)` method to send Rhai scripts to the server for execution and receive their output.
|
||||
- **Asynchronous Operations:** Leverages `async/await` and `futures` for non-blocking communication.
|
||||
- **Connection Management:** Supports connecting to and disconnecting from a WebSocket server.
|
||||
- **Error Handling:** Defines a comprehensive `CircleWsClientError` enum for various client-side errors.
|
||||
|
||||
## Core Component
|
||||
|
||||
- **`CircleWsClient`**: The main client struct.
|
||||
- `new(ws_url: String)`: Creates a new client instance targeting the given WebSocket URL.
|
||||
- `connect()`: Establishes the WebSocket connection.
|
||||
- `play(script: String)`: Sends a Rhai script to the server for execution and returns the result.
|
||||
- `disconnect()`: Closes the WebSocket connection.
|
||||
|
||||
## Usage Example (Conceptual)
|
||||
|
||||
```rust
|
||||
use circle_client_ws::CircleWsClient;
|
||||
|
||||
async fn run_client() {
|
||||
let mut client = CircleWsClient::new("ws://localhost:8080/ws".to_string());
|
||||
|
||||
if let Err(e) = client.connect().await {
|
||||
eprintln!("Failed to connect: {}", e);
|
||||
return;
|
||||
}
|
||||
|
||||
let script = "print(\"Hello from Rhai via WebSocket!\"); 40 + 2".to_string();
|
||||
|
||||
match client.play(script).await {
|
||||
Ok(result) => {
|
||||
println!("Script output: {}", result.output);
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Error during play: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
client.disconnect().await;
|
||||
}
|
||||
|
||||
// To run this example, you'd need an async runtime like tokio for native
|
||||
// or wasm-bindgen-test for WASM.
|
||||
```
|
||||
|
||||
## Building
|
||||
|
||||
### Native
|
||||
```bash
|
||||
cargo build
|
||||
```
|
||||
|
||||
### WASM
|
||||
```bash
|
||||
cargo build --target wasm32-unknown-unknown
|
||||
```
|
||||
|
||||
## Dependencies
|
||||
|
||||
Key dependencies include:
|
||||
|
||||
- `serde`, `serde_json`: For JSON serialization/deserialization.
|
||||
- `futures-channel`, `futures-util`: For asynchronous stream and sink handling.
|
||||
- `uuid`: For generating unique request IDs.
|
||||
- `log`: For logging.
|
||||
- `thiserror`: For error type definitions.
|
||||
|
||||
**WASM-specific:**
|
||||
- `gloo-net`: For WebSocket communication in WASM.
|
||||
- `wasm-bindgen-futures`: To bridge Rust futures with JavaScript promises.
|
||||
|
||||
**Native-specific:**
|
||||
- `tokio-tungstenite`: For WebSocket communication in native environments.
|
||||
- `tokio`: Asynchronous runtime for native applications.
|
||||
- `url`: For URL parsing.
|
419
client_ws/src/lib.rs
Normal file
419
client_ws/src/lib.rs
Normal file
@ -0,0 +1,419 @@
|
||||
use futures_channel::{mpsc, oneshot};
|
||||
use futures_util::{StreamExt, SinkExt, FutureExt};
|
||||
use log::{debug, error, info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use thiserror::Error;
|
||||
use uuid::Uuid;
|
||||
|
||||
// Platform-specific WebSocket imports and spawn function
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use {
|
||||
gloo_net::websocket::{futures::WebSocket, Message as GlooWsMessage, WebSocketError as GlooWebSocketError},
|
||||
wasm_bindgen_futures::spawn_local,
|
||||
};
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use {
|
||||
tokio_tungstenite::{
|
||||
connect_async,
|
||||
tungstenite::protocol::Message as TungsteniteWsMessage,
|
||||
// tungstenite::error::Error as TungsteniteError, // Unused
|
||||
},
|
||||
tokio::spawn as spawn_local, // Use tokio::spawn for native
|
||||
// url::Url, // Url::parse is not used in the final connect_async call path
|
||||
};
|
||||
|
||||
|
||||
// JSON-RPC Structures (client-side perspective)
|
||||
#[derive(Serialize, Debug, Clone)]
|
||||
pub struct JsonRpcRequestClient {
|
||||
jsonrpc: String,
|
||||
method: String,
|
||||
params: Value,
|
||||
id: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct JsonRpcResponseClient {
|
||||
jsonrpc: String,
|
||||
pub result: Option<Value>,
|
||||
pub error: Option<JsonRpcErrorClient>,
|
||||
pub id: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct JsonRpcErrorClient {
|
||||
pub code: i32,
|
||||
pub message: String,
|
||||
pub data: Option<Value>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug, Clone)]
|
||||
pub struct PlayParamsClient {
|
||||
pub script: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct PlayResultClient {
|
||||
pub output: String,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum CircleWsClientError {
|
||||
#[error("WebSocket connection error: {0}")]
|
||||
ConnectionError(String),
|
||||
#[error("WebSocket send error: {0}")]
|
||||
SendError(String),
|
||||
#[error("WebSocket receive error: {0}")]
|
||||
ReceiveError(String),
|
||||
#[error("JSON serialization/deserialization error: {0}")]
|
||||
JsonError(#[from] serde_json::Error),
|
||||
#[error("Request timed out for request ID: {0}")]
|
||||
Timeout(String),
|
||||
#[error("JSON-RPC error response: {code} - {message}")]
|
||||
JsonRpcError { code: i32, message: String, data: Option<Value> },
|
||||
#[error("No response received for request ID: {0}")]
|
||||
NoResponse(String),
|
||||
#[error("Client is not connected")]
|
||||
NotConnected,
|
||||
#[error("Internal channel error: {0}")]
|
||||
ChannelError(String),
|
||||
}
|
||||
|
||||
// Wrapper for messages sent to the WebSocket task
|
||||
enum InternalWsMessage {
|
||||
SendJsonRpc(JsonRpcRequestClient, oneshot::Sender<Result<JsonRpcResponseClient, CircleWsClientError>>),
|
||||
Close,
|
||||
}
|
||||
|
||||
pub struct CircleWsClient {
|
||||
ws_url: String,
|
||||
// Sender to the internal WebSocket task
|
||||
internal_tx: Option<mpsc::Sender<InternalWsMessage>>,
|
||||
// Handle to the spawned WebSocket task (for native, to join on drop if desired)
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
task_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
// Callback for unsolicited messages (e.g. notifications from server)
|
||||
// For Yew, this would typically be a yew::Callback<JsonRpcResponseClient> or similar
|
||||
// For simplicity in this generic client, we'll use a Box<dyn Fn(JsonRpcResponseClient) + Send + Sync>
|
||||
// This part is more complex to make fully generic and easy for Yew, so keeping it simple for now.
|
||||
// unsolicited_message_callback: Option<Box<dyn Fn(JsonRpcResponseClient) + Send + Sync + 'static>>,
|
||||
}
|
||||
|
||||
impl CircleWsClient {
|
||||
pub fn new(ws_url: String) -> Self {
|
||||
Self {
|
||||
ws_url,
|
||||
internal_tx: None,
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
task_handle: None,
|
||||
// unsolicited_message_callback: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn connect(&mut self) -> Result<(), CircleWsClientError> {
|
||||
if self.internal_tx.is_some() {
|
||||
info!("Client already connected or connecting.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let (internal_tx, internal_rx) = mpsc::channel::<InternalWsMessage>(32);
|
||||
self.internal_tx = Some(internal_tx);
|
||||
|
||||
let url = self.ws_url.clone();
|
||||
|
||||
// Pending requests: map request_id to a oneshot sender for the response
|
||||
let pending_requests: Arc<Mutex<HashMap<String, oneshot::Sender<Result<JsonRpcResponseClient, CircleWsClientError>>>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let task_pending_requests = pending_requests.clone();
|
||||
|
||||
let task = async move {
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let ws_result = WebSocket::open(&url);
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let connect_attempt = async {
|
||||
// Validate URL parsing separately if needed, but connect_async takes &str
|
||||
// let parsed_url = Url::parse(&url).map_err(|e| CircleWsClientError::ConnectionError(format!("Invalid URL: {}", e)))?;
|
||||
connect_async(&url).await.map_err(|e| CircleWsClientError::ConnectionError(e.to_string()))
|
||||
};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let ws_result = connect_attempt.await;
|
||||
|
||||
match ws_result {
|
||||
Ok(ws_conn_maybe_response) => {
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let ws_conn = ws_conn_maybe_response;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let (ws_conn, _) = ws_conn_maybe_response;
|
||||
|
||||
info!("Successfully connected to WebSocket: {}", url);
|
||||
let (mut ws_tx, mut ws_rx) = ws_conn.split();
|
||||
let mut internal_rx_fused = internal_rx.fuse();
|
||||
|
||||
loop {
|
||||
futures_util::select! {
|
||||
// Handle messages from the client's public methods (e.g., play)
|
||||
internal_msg = internal_rx_fused.next().fuse() => {
|
||||
match internal_msg {
|
||||
Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => {
|
||||
let req_id = req.id.clone();
|
||||
match serde_json::to_string(&req) {
|
||||
Ok(req_str) => {
|
||||
debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str);
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let send_res = ws_tx.send(GlooWsMessage::Text(req_str)).await;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let send_res = ws_tx.send(TungsteniteWsMessage::Text(req_str)).await;
|
||||
|
||||
if let Err(e) = send_res {
|
||||
error!("WebSocket send error for request ID {}: {:?}", req_id, e);
|
||||
let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string())));
|
||||
} else {
|
||||
// Store the sender to await the response
|
||||
task_pending_requests.lock().unwrap().insert(req_id, response_sender);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to serialize request ID {}: {}", req_id, e);
|
||||
let _ = response_sender.send(Err(CircleWsClientError::JsonError(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(InternalWsMessage::Close) => {
|
||||
info!("Close message received internally, closing WebSocket.");
|
||||
let _ = ws_tx.close().await;
|
||||
break;
|
||||
}
|
||||
None => { // internal_rx closed, meaning client was dropped
|
||||
info!("Internal MPSC channel closed, WebSocket task shutting down.");
|
||||
let _ = ws_tx.close().await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
// Handle messages received from the WebSocket server
|
||||
ws_msg_res = ws_rx.next().fuse() => {
|
||||
match ws_msg_res {
|
||||
Some(Ok(msg)) => {
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
match msg {
|
||||
GlooWsMessage::Text(text) => {
|
||||
debug!("Received WebSocket message: {}", text);
|
||||
// ... (parse logic as before)
|
||||
match serde_json::from_str::<JsonRpcResponseClient>(&text) {
|
||||
Ok(response) => {
|
||||
if let Some(sender) = task_pending_requests.lock().unwrap().remove(&response.id) {
|
||||
if let Err(failed_send_val) = sender.send(Ok(response)) {
|
||||
if let Ok(resp_for_log) = failed_send_val { warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id); }
|
||||
else { warn!("Failed to send response to waiting task, and also failed to get original response for logging.");}
|
||||
}
|
||||
} else { warn!("Received response for unknown request ID or unsolicited message: {:?}", response); }
|
||||
}
|
||||
Err(e) => { error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text); }
|
||||
}
|
||||
}
|
||||
GlooWsMessage::Bytes(_) => {
|
||||
debug!("Received binary WebSocket message (WASM).");
|
||||
}
|
||||
}
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
match msg {
|
||||
TungsteniteWsMessage::Text(text) => {
|
||||
debug!("Received WebSocket message: {}", text);
|
||||
// ... (parse logic as before)
|
||||
match serde_json::from_str::<JsonRpcResponseClient>(&text) {
|
||||
Ok(response) => {
|
||||
if let Some(sender) = task_pending_requests.lock().unwrap().remove(&response.id) {
|
||||
if let Err(failed_send_val) = sender.send(Ok(response)) {
|
||||
if let Ok(resp_for_log) = failed_send_val { warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id); }
|
||||
else { warn!("Failed to send response to waiting task, and also failed to get original response for logging.");}
|
||||
}
|
||||
} else { warn!("Received response for unknown request ID or unsolicited message: {:?}", response); }
|
||||
}
|
||||
Err(e) => { error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text); }
|
||||
}
|
||||
}
|
||||
TungsteniteWsMessage::Binary(_) => {
|
||||
debug!("Received binary WebSocket message (Native).");
|
||||
}
|
||||
TungsteniteWsMessage::Ping(_) | TungsteniteWsMessage::Pong(_) => {
|
||||
debug!("Received Ping/Pong (Native).");
|
||||
}
|
||||
TungsteniteWsMessage::Close(_) => {
|
||||
info!("WebSocket connection closed by server (Native).");
|
||||
break;
|
||||
}
|
||||
TungsteniteWsMessage::Frame(_) => {
|
||||
debug!("Received Frame (Native) - not typically handled directly.");
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!("WebSocket receive error: {:?}", e);
|
||||
break; // Exit loop on receive error
|
||||
}
|
||||
None => { // WebSocket stream closed
|
||||
info!("WebSocket connection closed by server (stream ended).");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Cleanup pending requests on exit
|
||||
task_pending_requests.lock().unwrap().drain().for_each(|(_, sender)| {
|
||||
let _ = sender.send(Err(CircleWsClientError::ConnectionError("WebSocket task terminated".to_string())));
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to connect to WebSocket: {:?}", e);
|
||||
// Notify any waiting senders about the connection failure
|
||||
internal_rx.for_each(|msg| async {
|
||||
if let InternalWsMessage::SendJsonRpc(_, response_sender) = msg {
|
||||
let _ = response_sender.send(Err(CircleWsClientError::ConnectionError(e.to_string())));
|
||||
}
|
||||
}).await;
|
||||
}
|
||||
}
|
||||
info!("WebSocket task finished.");
|
||||
};
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
spawn_local(task);
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{ self.task_handle = Some(spawn_local(task)); }
|
||||
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn play(&self, script: String) -> impl std::future::Future<Output = Result<PlayResultClient, CircleWsClientError>> + Send + 'static {
|
||||
let req_id_outer = Uuid::new_v4().to_string();
|
||||
|
||||
// Clone the sender option. The sender itself (mpsc::Sender) is also Clone.
|
||||
let internal_tx_clone_opt = self.internal_tx.clone();
|
||||
|
||||
async move {
|
||||
let req_id = req_id_outer; // Move req_id into the async block
|
||||
let params = PlayParamsClient { script }; // script is moved in
|
||||
|
||||
let request = match serde_json::to_value(params) {
|
||||
Ok(p_val) => JsonRpcRequestClient {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
method: "play".to_string(),
|
||||
params: p_val,
|
||||
id: req_id.clone(),
|
||||
},
|
||||
Err(e) => return Err(CircleWsClientError::JsonError(e)),
|
||||
};
|
||||
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
|
||||
if let Some(mut internal_tx) = internal_tx_clone_opt {
|
||||
internal_tx.send(InternalWsMessage::SendJsonRpc(request, response_tx)).await
|
||||
.map_err(|e| CircleWsClientError::ChannelError(format!("Failed to send request to internal task: {}", e)))?;
|
||||
} else {
|
||||
return Err(CircleWsClientError::NotConnected);
|
||||
}
|
||||
|
||||
// Add a timeout for waiting for the response
|
||||
// For simplicity, using a fixed timeout here. Could be configurable.
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
{
|
||||
match response_rx.await {
|
||||
Ok(Ok(rpc_response)) => {
|
||||
if let Some(json_rpc_error) = rpc_response.error {
|
||||
Err(CircleWsClientError::JsonRpcError {
|
||||
code: json_rpc_error.code,
|
||||
message: json_rpc_error.message,
|
||||
data: json_rpc_error.data,
|
||||
})
|
||||
} else if let Some(result_value) = rpc_response.result {
|
||||
serde_json::from_value(result_value).map_err(CircleWsClientError::JsonError)
|
||||
} else {
|
||||
Err(CircleWsClientError::NoResponse(req_id.clone()))
|
||||
}
|
||||
}
|
||||
Ok(Err(e)) => Err(e), // Error propagated from the ws task
|
||||
Err(_) => Err(CircleWsClientError::Timeout(req_id.clone())), // oneshot channel cancelled
|
||||
}
|
||||
}
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
use tokio::time::timeout as tokio_timeout;
|
||||
match tokio_timeout(std::time::Duration::from_secs(10), response_rx).await {
|
||||
Ok(Ok(Ok(rpc_response))) => { // Timeout -> Result<ChannelRecvResult, Error>
|
||||
if let Some(json_rpc_error) = rpc_response.error {
|
||||
Err(CircleWsClientError::JsonRpcError {
|
||||
code: json_rpc_error.code,
|
||||
message: json_rpc_error.message,
|
||||
data: json_rpc_error.data,
|
||||
})
|
||||
} else if let Some(result_value) = rpc_response.result {
|
||||
serde_json::from_value(result_value).map_err(CircleWsClientError::JsonError)
|
||||
} else {
|
||||
Err(CircleWsClientError::NoResponse(req_id.clone()))
|
||||
}
|
||||
}
|
||||
Ok(Ok(Err(e))) => Err(e), // Error propagated from the ws task
|
||||
Ok(Err(_)) => Err(CircleWsClientError::ChannelError("Response channel cancelled".to_string())), // oneshot cancelled
|
||||
Err(_) => Err(CircleWsClientError::Timeout(req_id.clone())), // tokio_timeout expired
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn disconnect(&mut self) {
|
||||
if let Some(mut tx) = self.internal_tx.take() {
|
||||
info!("Sending close signal to internal WebSocket task.");
|
||||
let _ = tx.send(InternalWsMessage::Close).await;
|
||||
}
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
if let Some(handle) = self.task_handle.take() {
|
||||
let _ = handle.await; // Wait for the task to finish
|
||||
}
|
||||
info!("Client disconnected.");
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure client cleans up on drop for native targets
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
impl Drop for CircleWsClient {
|
||||
fn drop(&mut self) {
|
||||
if self.internal_tx.is_some() || self.task_handle.is_some() {
|
||||
warn!("CircleWsClient dropped without explicit disconnect. Spawning task to send close signal.");
|
||||
// We can't call async disconnect directly in drop.
|
||||
// Spawn a new task to send the close message if on native.
|
||||
if let Some(mut tx) = self.internal_tx.take() {
|
||||
spawn_local(async move {
|
||||
info!("Drop: Sending close signal to internal WebSocket task.");
|
||||
let _ = tx.send(InternalWsMessage::Close).await;
|
||||
});
|
||||
}
|
||||
if let Some(handle) = self.task_handle.take() {
|
||||
spawn_local(async move {
|
||||
info!("Drop: Waiting for WebSocket task to finish.");
|
||||
let _ = handle.await;
|
||||
info!("Drop: WebSocket task finished.");
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
// use super::*;
|
||||
#[test]
|
||||
fn it_compiles() {
|
||||
assert_eq!(2 + 2, 4);
|
||||
}
|
||||
}
|
1
server_ws/.gitignore
vendored
Normal file
1
server_ws/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
/target
|
2505
server_ws/Cargo.lock
generated
Normal file
2505
server_ws/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
28
server_ws/Cargo.toml
Normal file
28
server_ws/Cargo.toml
Normal file
@ -0,0 +1,28 @@
|
||||
[package]
|
||||
name = "circle_server_ws"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
actix-web = "4"
|
||||
actix-web-actors = "4"
|
||||
actix = "0.13"
|
||||
env_logger = "0.10"
|
||||
log = "0.4"
|
||||
clap = { version = "4.4", features = ["derive"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
redis = { version = "0.25.0", features = ["tokio-comp"] } # For async Redis with Actix
|
||||
uuid = { version = "1.6", features = ["v4", "serde"] }
|
||||
tokio = { version = "1", features = ["macros", "rt-multi-thread"] } # For polling interval
|
||||
chrono = { version = "0.4", features = ["serde"] } # For timestamps
|
||||
rhai_client = { path = "/Users/timurgordon/code/git.ourworld.tf/herocode/rhaj/src/client" }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-tungstenite = { version = "0.23.0", features = ["native-tls"] }
|
||||
futures-util = "0.3" # For StreamExt and SinkExt on WebSocket stream
|
||||
url = "2.5.0" # For parsing WebSocket URL
|
||||
circle_client_ws = { path = "../client_ws" }
|
||||
uuid = { version = "1.6", features = ["v4", "serde"] } # For e2e example, if it still uses Uuid directly for req id
|
52
server_ws/README.md
Normal file
52
server_ws/README.md
Normal file
@ -0,0 +1,52 @@
|
||||
# Circle Server WebSocket (`server_ws`)
|
||||
|
||||
## Overview
|
||||
|
||||
The `server_ws` component is an Actix-based WebSocket server designed to handle client connections and execute Rhai scripts. It acts as a bridge between WebSocket clients and a Rhai scripting engine, facilitating remote script execution and result retrieval.
|
||||
|
||||
## Key Features
|
||||
|
||||
* **WebSocket Communication:** Establishes and manages WebSocket connections with clients.
|
||||
* **Rhai Script Execution:** Receives Rhai scripts from clients, submits them for execution via `rhai_client`, and returns the results.
|
||||
* **Timeout Management:** Implements timeouts for Rhai script execution to prevent indefinite blocking, returning specific error codes on timeout.
|
||||
* **Asynchronous Processing:** Leverages Actix actors for concurrent handling of multiple client connections and script executions.
|
||||
|
||||
## Core Components
|
||||
|
||||
* **`CircleWs` Actor:** The primary Actix actor responsible for handling individual WebSocket sessions. It manages the lifecycle of a client connection, processes incoming messages (Rhai scripts), and sends back results or errors.
|
||||
* **`rhai_client` Integration:** Utilizes the `rhai_client` crate to submit scripts to a shared Rhai processing service (likely Redis-backed for task queuing and result storage) and await their completion.
|
||||
|
||||
## Dependencies
|
||||
|
||||
* `actix`: Actor framework for building concurrent applications.
|
||||
* `actix-web-actors`: WebSocket support for Actix.
|
||||
* `rhai_client`: Client library for interacting with the Rhai scripting service.
|
||||
* `serde_json`: For serializing and deserializing JSON messages exchanged over WebSockets.
|
||||
* `uuid`: For generating unique task identifiers.
|
||||
|
||||
## Workflow
|
||||
|
||||
1. A client establishes a WebSocket connection to the `/ws/` endpoint.
|
||||
2. The server upgrades the connection and spawns a `CircleWs` actor instance for that session.
|
||||
3. The client sends a JSON-RPC formatted message containing the Rhai script to be executed.
|
||||
4. The `CircleWs` actor parses the message and uses `rhai_client::RhaiClient::submit_script_and_await_result` to send the script for execution. This method handles the interaction with the underlying task queue (e.g., Redis) and waits for the script's outcome.
|
||||
5. The `rhai_client` will return the script's result or an error (e.g., timeout, script error).
|
||||
6. `CircleWs` formats the result/error into a JSON-RPC response and sends it back to the client over the WebSocket.
|
||||
|
||||
## Configuration
|
||||
|
||||
* **`REDIS_URL`**: The `rhai_client` component (and thus `server_ws` indirectly) relies on a Redis instance. The connection URL for this Redis instance is typically configured via an environment variable or a constant that `rhai_client` uses.
|
||||
* **Timeout Durations**:
|
||||
* `TASK_TIMEOUT_DURATION` (e.g., 30 seconds): The maximum time the server will wait for a Rhai script to complete execution.
|
||||
* `TASK_POLL_INTERVAL_DURATION` (e.g., 200 milliseconds): The interval at which the `rhai_client` polls for task completion (this is an internal detail of `rhai_client` but relevant to understanding its behavior).
|
||||
|
||||
## Error Handling
|
||||
|
||||
The server implements specific JSON-RPC error responses for various scenarios:
|
||||
* **Script Execution Timeout:** If a script exceeds `TASK_TIMEOUT_DURATION`, a specific error (e.g., code -32002) is returned.
|
||||
* **Other `RhaiClientError`s:** Other errors originating from `rhai_client` (e.g., issues with the Redis connection, script compilation errors detected by the remote Rhai engine) are also translated into appropriate JSON-RPC error responses.
|
||||
* **Message Parsing Errors:** Invalid incoming messages will result in error responses.
|
||||
|
||||
## How to Run
|
||||
|
||||
(Instructions on how to build and run the server would typically go here, e.g., `cargo run --bin circle_server_ws`)
|
143
server_ws/examples/e2e_rhai_flow.rs
Normal file
143
server_ws/examples/e2e_rhai_flow.rs
Normal file
@ -0,0 +1,143 @@
|
||||
use std::process::{Command, Child, Stdio};
|
||||
use std::time::Duration;
|
||||
use std::path::PathBuf;
|
||||
use tokio::time::sleep;
|
||||
// tokio_tungstenite and direct futures_util for ws stream are no longer needed here
|
||||
// use tokio_tungstenite::{connect_async, tungstenite::protocol::Message as WsMessage};
|
||||
// use futures_util::{StreamExt, SinkExt};
|
||||
// use serde_json::Value; // No longer needed as CircleWsClient::play takes String
|
||||
// Uuid is handled by CircleWsClient internally for requests.
|
||||
// use uuid::Uuid;
|
||||
use circle_client_ws::CircleWsClient;
|
||||
// PlayResultClient and CircleWsClientError will be resolved via the client methods if needed,
|
||||
// or this indicates they were not actually needed in the scope of this file directly.
|
||||
// The compiler warning suggests they are unused from this specific import.
|
||||
|
||||
const TEST_CIRCLE_NAME: &str = "e2e_test_circle";
|
||||
const TEST_SERVER_PORT: u16 = 9876; // Choose a unique port for the test
|
||||
const RHAI_WORKER_BIN_NAME: &str = "rhai_worker";
|
||||
const CIRCLE_SERVER_WS_BIN_NAME: &str = "circle_server_ws";
|
||||
|
||||
// RAII guard for cleaning up child processes
|
||||
struct ChildProcessGuard {
|
||||
child: Child,
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl ChildProcessGuard {
|
||||
fn new(child: Child, name: String) -> Self {
|
||||
Self { child, name }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ChildProcessGuard {
|
||||
fn drop(&mut self) {
|
||||
log::info!("Cleaning up {} process (PID: {})...", self.name, self.child.id());
|
||||
match self.child.kill() {
|
||||
Ok(_) => {
|
||||
log::info!("Successfully sent kill signal to {} (PID: {}).", self.name, self.child.id());
|
||||
// Optionally wait for a short period or check status
|
||||
match self.child.wait() {
|
||||
Ok(status) => log::info!("{} (PID: {}) exited with status: {}", self.name, self.child.id(), status),
|
||||
Err(e) => log::warn!("Error waiting for {} (PID: {}): {}", self.name, self.child.id(), e),
|
||||
}
|
||||
}
|
||||
Err(e) => log::error!("Failed to kill {} (PID: {}): {}", self.name, self.child.id(), e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn find_target_dir() -> Result<PathBuf, String> {
|
||||
// Try to find the cargo target directory relative to current exe or manifest
|
||||
let mut current_exe = std::env::current_exe().map_err(|e| format!("Failed to get current exe path: {}", e))?;
|
||||
// current_exe is target/debug/examples/e2e_rhai_flow
|
||||
// want target/debug/
|
||||
if current_exe.ends_with("examples/e2e_rhai_flow") { // Adjust if example name changes
|
||||
current_exe.pop(); // remove e2e_rhai_flow
|
||||
current_exe.pop(); // remove examples
|
||||
Ok(current_exe)
|
||||
} else {
|
||||
// Fallback: Assume 'target/debug' relative to workspace root if CARGO_MANIFEST_DIR is set
|
||||
let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").map_err(|_| "CARGO_MANIFEST_DIR not set".to_string())?;
|
||||
let workspace_root = PathBuf::from(manifest_dir).parent().ok_or("Failed to get workspace root")?.to_path_buf();
|
||||
Ok(workspace_root.join("target").join("debug"))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
|
||||
|
||||
let target_dir = find_target_dir().map_err(|e| {
|
||||
log::error!("Could not determine target directory: {}", e);
|
||||
e
|
||||
})?;
|
||||
|
||||
let rhai_worker_path = target_dir.join(RHAI_WORKER_BIN_NAME);
|
||||
let circle_server_ws_path = target_dir.join(CIRCLE_SERVER_WS_BIN_NAME);
|
||||
|
||||
if !rhai_worker_path.exists() {
|
||||
return Err(format!("Rhai worker binary not found at {:?}. Ensure it's built (e.g., cargo build --package rhai_worker)", rhai_worker_path).into());
|
||||
}
|
||||
if !circle_server_ws_path.exists() {
|
||||
return Err(format!("Circle server WS binary not found at {:?}. Ensure it's built (e.g., cargo build --package circle_server_ws)", circle_server_ws_path).into());
|
||||
}
|
||||
|
||||
log::info!("Starting {}...", RHAI_WORKER_BIN_NAME);
|
||||
let rhai_worker_process = Command::new(&rhai_worker_path)
|
||||
.args(["--circles", TEST_CIRCLE_NAME])
|
||||
.stdout(Stdio::piped()) // Capture stdout
|
||||
.stderr(Stdio::piped()) // Capture stderr
|
||||
.spawn()?;
|
||||
let _rhai_worker_guard = ChildProcessGuard::new(rhai_worker_process, RHAI_WORKER_BIN_NAME.to_string());
|
||||
log::info!("{} started with PID {}", RHAI_WORKER_BIN_NAME, _rhai_worker_guard.child.id());
|
||||
|
||||
log::info!("Starting {} for circle '{}' on port {}...", CIRCLE_SERVER_WS_BIN_NAME, TEST_CIRCLE_NAME, TEST_SERVER_PORT);
|
||||
let circle_server_process = Command::new(&circle_server_ws_path)
|
||||
.args(["--port", &TEST_SERVER_PORT.to_string(), "--circle-name", TEST_CIRCLE_NAME])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()?;
|
||||
let _circle_server_guard = ChildProcessGuard::new(circle_server_process, CIRCLE_SERVER_WS_BIN_NAME.to_string());
|
||||
log::info!("{} started with PID {}", CIRCLE_SERVER_WS_BIN_NAME, _circle_server_guard.child.id());
|
||||
|
||||
// Give servers a moment to start
|
||||
sleep(Duration::from_secs(3)).await; // Increased sleep
|
||||
|
||||
let ws_url_str = format!("ws://127.0.0.1:{}/ws", TEST_SERVER_PORT);
|
||||
|
||||
log::info!("Creating CircleWsClient for {}...", ws_url_str);
|
||||
let mut client = CircleWsClient::new(ws_url_str.clone());
|
||||
|
||||
log::info!("Connecting CircleWsClient...");
|
||||
client.connect().await.map_err(|e| {
|
||||
log::error!("CircleWsClient connection failed: {}", e);
|
||||
format!("CircleWsClient connection failed: {}", e)
|
||||
})?;
|
||||
log::info!("CircleWsClient connected successfully.");
|
||||
|
||||
let script_to_run = "let a = 5; let b = 10; print(\"E2E Rhai: \" + (a+b)); a + b";
|
||||
|
||||
log::info!("Sending 'play' request via CircleWsClient for script: '{}'", script_to_run);
|
||||
|
||||
match client.play(script_to_run.to_string()).await {
|
||||
Ok(play_result) => {
|
||||
log::info!("Received play result: {:?}", play_result);
|
||||
assert_eq!(play_result.output, "15");
|
||||
log::info!("E2E Test Passed! Correct output '15' received via CircleWsClient.");
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("CircleWsClient play request failed: {}", e);
|
||||
return Err(format!("CircleWsClient play request failed: {}", e).into());
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("Disconnecting CircleWsClient...");
|
||||
client.disconnect().await;
|
||||
log::info!("CircleWsClient disconnected.");
|
||||
|
||||
log::info!("E2E Rhai flow example completed successfully.");
|
||||
// Guards will automatically clean up child processes when they go out of scope here
|
||||
Ok(())
|
||||
}
|
153
server_ws/examples/timeout_demonstration.rs
Normal file
153
server_ws/examples/timeout_demonstration.rs
Normal file
@ -0,0 +1,153 @@
|
||||
// Example: Timeout Demonstration for circle_server_ws
|
||||
//
|
||||
// This example demonstrates how circle_server_ws handles Rhai scripts that exceed
|
||||
// the configured execution timeout (default 30 seconds).
|
||||
//
|
||||
// This example will attempt to start its own instance of circle_server_ws.
|
||||
// Ensure circle_server_ws is compiled (cargo build --bin circle_server_ws).
|
||||
|
||||
use circle_client_ws::CircleWsClient;
|
||||
use tokio::time::{sleep, Duration};
|
||||
use std::process::{Command, Child, Stdio};
|
||||
use std::path::PathBuf;
|
||||
|
||||
const EXAMPLE_SERVER_PORT: u16 = 8089; // Using a specific port for this example
|
||||
const WS_URL: &str = "ws://127.0.0.1:8089/ws";
|
||||
const CIRCLE_NAME_FOR_EXAMPLE: &str = "timeout_example_circle";
|
||||
const CIRCLE_SERVER_WS_BIN_NAME: &str = "circle_server_ws";
|
||||
const SCRIPT_TIMEOUT_SECONDS: u64 = 30; // This is the server-side timeout we expect to hit
|
||||
|
||||
// RAII guard for cleaning up child processes
|
||||
struct ChildProcessGuard {
|
||||
child: Child,
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl ChildProcessGuard {
|
||||
fn new(child: Child, name: String) -> Self {
|
||||
log::info!("{} process started with PID: {}", name, child.id());
|
||||
Self { child, name }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ChildProcessGuard {
|
||||
fn drop(&mut self) {
|
||||
log::info!("Cleaning up {} process (PID: {})...", self.name, self.child.id());
|
||||
match self.child.kill() {
|
||||
Ok(_) => {
|
||||
log::info!("Successfully sent kill signal to {} (PID: {}).", self.name, self.child.id());
|
||||
match self.child.wait() {
|
||||
Ok(status) => log::info!("{} (PID: {}) exited with status: {}", self.name, self.child.id(), status),
|
||||
Err(e) => log::warn!("Error waiting for {} (PID: {}): {}", self.name, self.child.id(), e),
|
||||
}
|
||||
}
|
||||
Err(e) => log::error!("Failed to kill {} (PID: {}): {}", self.name, self.child.id(), e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn find_target_bin_path(bin_name: &str) -> Result<PathBuf, String> {
|
||||
let mut current_exe = std::env::current_exe().map_err(|e| format!("Failed to get current exe path: {}", e))?;
|
||||
// current_exe is typically target/debug/examples/timeout_demonstration
|
||||
// We want to find target/debug/[bin_name]
|
||||
current_exe.pop(); // remove executable name
|
||||
current_exe.pop(); // remove examples directory
|
||||
let target_debug_dir = current_exe;
|
||||
let bin_path = target_debug_dir.join(bin_name);
|
||||
if !bin_path.exists() {
|
||||
// Fallback: try CARGO_BIN_EXE_[bin_name] if running via `cargo run --example` which sets these
|
||||
if let Ok(cargo_bin_path_str) = std::env::var(format!("CARGO_BIN_EXE_{}", bin_name.to_uppercase())) {
|
||||
let cargo_bin_path = PathBuf::from(cargo_bin_path_str);
|
||||
if cargo_bin_path.exists() {
|
||||
return Ok(cargo_bin_path);
|
||||
}
|
||||
}
|
||||
// Fallback: try target/debug/[bin_name] relative to CARGO_MANIFEST_DIR (crate root)
|
||||
if let Ok(manifest_dir_str) = std::env::var("CARGO_MANIFEST_DIR") {
|
||||
let bin_path_rel_manifest = PathBuf::from(manifest_dir_str).join("target").join("debug").join(bin_name);
|
||||
if bin_path_rel_manifest.exists() {
|
||||
return Ok(bin_path_rel_manifest);
|
||||
}
|
||||
}
|
||||
return Err(format!("Binary '{}' not found at {:?}. Ensure it's built.", bin_name, bin_path));
|
||||
}
|
||||
Ok(bin_path)
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
|
||||
|
||||
let server_bin_path = find_target_bin_path(CIRCLE_SERVER_WS_BIN_NAME)?;
|
||||
log::info!("Found server binary at: {:?}", server_bin_path);
|
||||
|
||||
log::info!("Starting {} for circle '{}' on port {}...", CIRCLE_SERVER_WS_BIN_NAME, CIRCLE_NAME_FOR_EXAMPLE, EXAMPLE_SERVER_PORT);
|
||||
let server_process = Command::new(&server_bin_path)
|
||||
.args([
|
||||
"--port", &EXAMPLE_SERVER_PORT.to_string(),
|
||||
"--circle-name", CIRCLE_NAME_FOR_EXAMPLE
|
||||
])
|
||||
.stdout(Stdio::piped()) // Pipe stdout to keep terminal clean, or Stdio::inherit() to see server logs
|
||||
.stderr(Stdio::piped()) // Pipe stderr as well
|
||||
.spawn()
|
||||
.map_err(|e| format!("Failed to start {}: {}. Ensure it is built.", CIRCLE_SERVER_WS_BIN_NAME, e))?;
|
||||
|
||||
let _server_guard = ChildProcessGuard::new(server_process, CIRCLE_SERVER_WS_BIN_NAME.to_string());
|
||||
|
||||
log::info!("Giving the server a moment to start up...");
|
||||
sleep(Duration::from_secs(3)).await; // Wait for server to initialize
|
||||
|
||||
log::info!("Attempting to connect to WebSocket server at: {}", WS_URL);
|
||||
let mut client = CircleWsClient::new(WS_URL.to_string());
|
||||
|
||||
log::info!("Connecting client...");
|
||||
if let Err(e) = client.connect().await {
|
||||
log::error!("Failed to connect to WebSocket server: {}", e);
|
||||
log::error!("Please check server logs if it failed to start correctly.");
|
||||
return Err(e.into());
|
||||
}
|
||||
log::info!("Client connected successfully.");
|
||||
|
||||
// This Rhai script is designed to run for much longer than the typical server timeout.
|
||||
let long_running_script = "
|
||||
log(\"Rhai: Starting long-running script...\");
|
||||
let mut x = 0;
|
||||
for i in 0..9999999999 { // Extremely large loop
|
||||
x = x + i;
|
||||
if i % 100000000 == 0 {
|
||||
// log(\"Rhai: Loop iteration \" + i);
|
||||
}
|
||||
}
|
||||
// This part should not be reached if timeout works correctly.
|
||||
log(\"Rhai: Long-running script finished calculation (x = \" + x + \").\");
|
||||
print(x);
|
||||
x
|
||||
".to_string();
|
||||
|
||||
log::info!("Sending long-running script (expected to time out on server after ~{}s)...", SCRIPT_TIMEOUT_SECONDS);
|
||||
|
||||
match client.play(long_running_script).await {
|
||||
Ok(play_result) => {
|
||||
log::warn!("Received unexpected success from play request: {:?}", play_result);
|
||||
log::warn!("This might indicate the script finished faster than expected, or the timeout didn't trigger.");
|
||||
}
|
||||
Err(e) => {
|
||||
log::info!("Received expected error from play request: {}", e);
|
||||
log::info!("This demonstrates the server timing out the script execution.");
|
||||
// You can further inspect the error details if CircleWsClientError provides them.
|
||||
// For example, if e.to_string() contains 'code: -32002' or 'timed out'.
|
||||
if e.to_string().contains("timed out") || e.to_string().contains("-32002") {
|
||||
log::info!("Successfully received timeout error from the server!");
|
||||
} else {
|
||||
log::warn!("Received an error, but it might not be the expected timeout error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("Disconnecting client...");
|
||||
client.disconnect().await;
|
||||
log::info!("Client disconnected.");
|
||||
log::info!("Timeout demonstration example finished.");
|
||||
|
||||
Ok(())
|
||||
}
|
62
server_ws/openrpc.json
Normal file
62
server_ws/openrpc.json
Normal file
@ -0,0 +1,62 @@
|
||||
{
|
||||
"openrpc": "1.2.6",
|
||||
"info": {
|
||||
"title": "Circle WebSocket Server API",
|
||||
"version": "0.1.0",
|
||||
"description": "API for interacting with a Circle's WebSocket server, primarily for Rhai script execution."
|
||||
},
|
||||
"methods": [
|
||||
{
|
||||
"name": "play",
|
||||
"summary": "Executes a Rhai script on the server.",
|
||||
"params": [
|
||||
{
|
||||
"name": "script",
|
||||
"description": "The Rhai script to execute.",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
],
|
||||
"result": {
|
||||
"name": "playResult",
|
||||
"description": "The output from the executed Rhai script.",
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/PlayResult"
|
||||
}
|
||||
},
|
||||
"examples": [
|
||||
{
|
||||
"name": "Simple Script Execution",
|
||||
"params": [
|
||||
{
|
||||
"name": "script",
|
||||
"value": "let x = 10; x * 2"
|
||||
}
|
||||
],
|
||||
"result": {
|
||||
"name": "playResult",
|
||||
"value": {
|
||||
"output": "20"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"components": {
|
||||
"schemas": {
|
||||
"PlayResult": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"output": {
|
||||
"type": "string",
|
||||
"description": "The string representation of the Rhai script's evaluation result."
|
||||
}
|
||||
},
|
||||
"required": ["output"]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
260
server_ws/src/main.rs
Normal file
260
server_ws/src/main.rs
Normal file
@ -0,0 +1,260 @@
|
||||
use actix_web::{web, App, HttpRequest, HttpServer, HttpResponse, Error};
|
||||
use actix_web_actors::ws;
|
||||
use actix::{Actor, ActorContext, StreamHandler, AsyncContext, WrapFuture, ActorFutureExt};
|
||||
// HashMap no longer needed
|
||||
use clap::Parser;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::time::Duration;
|
||||
// AsyncCommands no longer directly used here
|
||||
use rhai_client::RhaiClientError; // Import RhaiClientError for matching
|
||||
// Uuid is not directly used here anymore for task_id generation, RhaiClient handles it.
|
||||
// Utc no longer directly used here
|
||||
// RhaiClientError is not directly handled here, errors from RhaiClient are strings or RhaiClient's own error type.
|
||||
use rhai_client::RhaiClient; // ClientRhaiTaskDetails is used via rhai_client::RhaiTaskDetails
|
||||
|
||||
const REDIS_URL: &str = "redis://127.0.0.1/"; // Make this configurable if needed
|
||||
|
||||
// JSON-RPC 2.0 Structures
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)] // Added Clone
|
||||
struct JsonRpcRequest {
|
||||
jsonrpc: String,
|
||||
method: String,
|
||||
params: Value,
|
||||
id: Option<Value>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)] // Added Clone
|
||||
struct JsonRpcResponse {
|
||||
jsonrpc: String,
|
||||
result: Option<Value>,
|
||||
error: Option<JsonRpcError>,
|
||||
id: Value,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)] // Added Clone
|
||||
struct JsonRpcError {
|
||||
code: i32,
|
||||
message: String,
|
||||
data: Option<Value>,
|
||||
}
|
||||
|
||||
// Specific params and result for "play" method
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)] // Added Clone
|
||||
struct PlayParams {
|
||||
script: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)] // Added Clone
|
||||
struct PlayResult {
|
||||
output: String,
|
||||
}
|
||||
|
||||
// Local RhaiTaskDetails struct is removed, will use ClientRhaiTaskDetails from rhai_client crate.
|
||||
// Ensure field names used in polling logic (e.g. error_message) are updated if they differ.
|
||||
// rhai_client::RhaiTaskDetails uses 'error' and 'client_rpc_id'.
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author, version, about, long_about = None)]
|
||||
struct Args {
|
||||
#[clap(short, long, value_parser, default_value_t = 8080)]
|
||||
port: u16,
|
||||
|
||||
#[clap(short, long, value_parser)]
|
||||
circle_name: String,
|
||||
}
|
||||
|
||||
// WebSocket Actor
|
||||
struct CircleWs {
|
||||
server_circle_name: String,
|
||||
// redis_client field removed as RhaiClient handles its own connection
|
||||
}
|
||||
|
||||
const TASK_TIMEOUT_DURATION: Duration = Duration::from_secs(30); // 30 seconds timeout
|
||||
const TASK_POLL_INTERVAL_DURATION: Duration = Duration::from_millis(200); // 200 ms poll interval
|
||||
|
||||
impl CircleWs {
|
||||
fn new(name: String) -> Self {
|
||||
Self {
|
||||
server_circle_name: name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for CircleWs {
|
||||
type Context = ws::WebsocketContext<Self>;
|
||||
|
||||
fn started(&mut self, _ctx: &mut Self::Context) {
|
||||
log::info!("WebSocket session started for server dedicated to: {}", self.server_circle_name);
|
||||
}
|
||||
|
||||
fn stopping(&mut self, _ctx: &mut Self::Context) -> actix::Running {
|
||||
log::info!("WebSocket session stopping for server dedicated to: {}", self.server_circle_name);
|
||||
actix::Running::Stop
|
||||
}
|
||||
}
|
||||
|
||||
// WebSocket message handler
|
||||
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for CircleWs {
|
||||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
||||
match msg {
|
||||
Ok(ws::Message::Text(text)) => {
|
||||
log::info!("WS Text for {}: {}", self.server_circle_name, text);
|
||||
match serde_json::from_str::<JsonRpcRequest>(&text) {
|
||||
Ok(req) => {
|
||||
let client_rpc_id = req.id.clone().unwrap_or(Value::Null);
|
||||
if req.method == "play" {
|
||||
match serde_json::from_value::<PlayParams>(req.params.clone()) {
|
||||
Ok(play_params) => {
|
||||
// Use RhaiClient to submit the script
|
||||
let script_content = play_params.script;
|
||||
let current_circle_name = self.server_circle_name.clone();
|
||||
let rpc_id_for_client = client_rpc_id.clone(); // client_rpc_id is already Value
|
||||
|
||||
let fut = async move {
|
||||
match RhaiClient::new(REDIS_URL) {
|
||||
Ok(rhai_task_client) => {
|
||||
rhai_task_client.submit_script_and_await_result(
|
||||
¤t_circle_name,
|
||||
script_content,
|
||||
Some(rpc_id_for_client.clone()),
|
||||
TASK_TIMEOUT_DURATION,
|
||||
TASK_POLL_INTERVAL_DURATION,
|
||||
).await // This returns Result<rhai_client::RhaiTaskDetails, RhaiClientError>
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Failed to create RhaiClient: {}", e);
|
||||
Err(e) // Convert the error from RhaiClient::new into the type expected by the map function's error path.
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
ctx.spawn(fut.into_actor(self).map(move |result, _act, ws_ctx| {
|
||||
let response = match result {
|
||||
Ok(task_details) => { // ClientRhaiTaskDetails
|
||||
if task_details.status == "completed" {
|
||||
log::info!("Task completed successfully. Client RPC ID: {:?}, Output: {:?}", task_details.client_rpc_id, task_details.output);
|
||||
JsonRpcResponse {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
result: Some(serde_json::to_value(PlayResult {
|
||||
output: task_details.output.unwrap_or_default()
|
||||
}).unwrap()),
|
||||
error: None,
|
||||
id: client_rpc_id, // Use the original client_rpc_id from the request
|
||||
}
|
||||
} else { // status == "error"
|
||||
log::warn!("Task execution failed. Client RPC ID: {:?}, Error: {:?}", task_details.client_rpc_id, task_details.error);
|
||||
JsonRpcResponse {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
result: None,
|
||||
error: Some(JsonRpcError {
|
||||
code: -32004, // Script execution error
|
||||
message: task_details.error.unwrap_or_else(|| "Script execution failed with no specific error message".to_string()),
|
||||
data: None,
|
||||
}),
|
||||
id: client_rpc_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(rhai_err) => { // RhaiClientError
|
||||
log::error!("RhaiClient operation failed: {}", rhai_err);
|
||||
let (code, message) = match rhai_err {
|
||||
RhaiClientError::Timeout(task_id) => (-32002, format!("Timeout waiting for task {} to complete", task_id)),
|
||||
RhaiClientError::RedisError(e) => (-32003, format!("Redis communication error: {}", e)),
|
||||
RhaiClientError::SerializationError(e) => (-32003, format!("Serialization error: {}", e)),
|
||||
RhaiClientError::TaskNotFound(task_id) => (-32005, format!("Task {} not found after submission", task_id)),
|
||||
};
|
||||
JsonRpcResponse {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
result: None,
|
||||
id: client_rpc_id,
|
||||
error: Some(JsonRpcError { code, message, data: None }),
|
||||
}
|
||||
}
|
||||
};
|
||||
ws_ctx.text(serde_json::to_string(&response).unwrap());
|
||||
}));
|
||||
}
|
||||
Err(e) => { // Invalid params for 'play'
|
||||
log::error!("Invalid params for 'play' method: {}", e);
|
||||
let err_resp = JsonRpcResponse {
|
||||
jsonrpc: "2.0".to_string(), result: None, id: client_rpc_id,
|
||||
error: Some(JsonRpcError { code: -32602, message: "Invalid params".to_string(), data: Some(Value::String(e.to_string())) }),
|
||||
};
|
||||
ctx.text(serde_json::to_string(&err_resp).unwrap());
|
||||
}
|
||||
}
|
||||
} else { // Method not found
|
||||
log::warn!("Method not found: {}", req.method);
|
||||
let err_resp = JsonRpcResponse {
|
||||
jsonrpc: "2.0".to_string(), result: None, id: client_rpc_id,
|
||||
error: Some(JsonRpcError { code: -32601, message: "Method not found".to_string(), data: None }),
|
||||
};
|
||||
ctx.text(serde_json::to_string(&err_resp).unwrap());
|
||||
}
|
||||
}
|
||||
Err(e) => { // Parse error
|
||||
log::error!("Failed to parse JSON-RPC request: {}", e);
|
||||
let err_resp = JsonRpcResponse {
|
||||
jsonrpc: "2.0".to_string(), result: None, id: Value::Null, // No ID if request couldn't be parsed
|
||||
error: Some(JsonRpcError { code: -32700, message: "Parse error".to_string(), data: Some(Value::String(e.to_string())) }),
|
||||
};
|
||||
ctx.text(serde_json::to_string(&err_resp).unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
|
||||
Ok(ws::Message::Pong(_)) => {},
|
||||
Ok(ws::Message::Binary(_bin)) => log::warn!("Binary messages not supported."),
|
||||
Ok(ws::Message::Close(reason)) => {
|
||||
ctx.close(reason);
|
||||
ctx.stop();
|
||||
}
|
||||
Ok(ws::Message::Continuation(_)) => ctx.stop(),
|
||||
Ok(ws::Message::Nop) => (),
|
||||
Err(e) => {
|
||||
log::error!("WS Error: {:?}", e);
|
||||
ctx.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WebSocket handshake and actor start
|
||||
async fn ws_handler(
|
||||
req: HttpRequest,
|
||||
stream: web::Payload,
|
||||
server_name: web::Data<String>,
|
||||
// redis_client: web::Data<redis::Client>, // No longer passed to CircleWs actor directly
|
||||
) -> Result<HttpResponse, Error> {
|
||||
log::info!("WebSocket handshake attempt for server: {}", server_name.get_ref());
|
||||
let resp = ws::start(
|
||||
CircleWs::new(server_name.get_ref().clone()), // Pass only the server name
|
||||
&req,
|
||||
stream
|
||||
)?;
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
#[actix_web::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
let args = Args::parse();
|
||||
|
||||
std::env::set_var("RUST_LOG", "info,circle_server_ws=debug");
|
||||
env_logger::init();
|
||||
|
||||
log::info!(
|
||||
"Starting WebSocket server for Circle: '{}' on port {}...",
|
||||
args.circle_name, args.port
|
||||
);
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.app_data(web::Data::new(args.circle_name.clone()))
|
||||
.route("/ws", web::get().to(ws_handler))
|
||||
.default_service(web::route().to(|| async { HttpResponse::NotFound().body("404 Not Found - This is a WebSocket-only server for a specific circle.") }))
|
||||
})
|
||||
.bind(("127.0.0.1", args.port))?
|
||||
.run()
|
||||
.await
|
||||
}
|
135
server_ws/tests/timeout_integration_test.rs
Normal file
135
server_ws/tests/timeout_integration_test.rs
Normal file
@ -0,0 +1,135 @@
|
||||
use tokio::time::Duration; // Removed unused sleep
|
||||
use futures_util::{sink::SinkExt, stream::StreamExt};
|
||||
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
|
||||
use serde_json::Value; // Removed unused json macro import
|
||||
use std::process::Command;
|
||||
use std::thread;
|
||||
use std::sync::Once;
|
||||
|
||||
// Define a simple JSON-RPC request structure for sending scripts
|
||||
#[derive(serde::Serialize, Debug)]
|
||||
struct JsonRpcRequest {
|
||||
jsonrpc: String,
|
||||
method: String,
|
||||
params: ScriptParams,
|
||||
id: u64,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, Debug)]
|
||||
struct ScriptParams {
|
||||
script: String,
|
||||
}
|
||||
|
||||
// Define a simple JSON-RPC error response structure for assertion
|
||||
#[derive(serde::Deserialize, Debug)]
|
||||
struct JsonRpcErrorResponse {
|
||||
_jsonrpc: String, // Field is present in response, but not used in assert
|
||||
error: JsonRpcErrorDetails,
|
||||
_id: Option<Value>, // Field is present in response, but not used in assert
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize, Debug)]
|
||||
struct JsonRpcErrorDetails {
|
||||
code: i32,
|
||||
message: String,
|
||||
}
|
||||
|
||||
const SERVER_ADDRESS: &str = "ws://127.0.0.1:8088/ws"; // Match port in main.rs or make configurable
|
||||
const TEST_CIRCLE_NAME: &str = "test_timeout_circle";
|
||||
const SERVER_STARTUP_TIME: Duration = Duration::from_secs(5); // Time to wait for server to start
|
||||
const RHAI_TIMEOUT_SECONDS: u64 = 30; // Should match TASK_TIMEOUT_DURATION in circle_server_ws
|
||||
|
||||
static START_SERVER: Once = Once::new();
|
||||
|
||||
fn ensure_server_is_running() {
|
||||
START_SERVER.call_once(|| {
|
||||
println!("Attempting to start circle_server_ws for integration tests...");
|
||||
// The server executable will be in target/debug relative to the crate root
|
||||
let server_executable = "target/debug/circle_server_ws";
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut child = Command::new(server_executable)
|
||||
.arg("--port=8088") // Use a specific port for testing
|
||||
.arg(format!("--circle-name={}", TEST_CIRCLE_NAME))
|
||||
.spawn()
|
||||
.expect("Failed to start circle_server_ws. Make sure it's compiled (cargo build).");
|
||||
|
||||
let status = child.wait().expect("Failed to wait on server process.");
|
||||
println!("Server process exited with status: {}", status);
|
||||
});
|
||||
println!("Server start command issued. Waiting for {}s...", SERVER_STARTUP_TIME.as_secs());
|
||||
thread::sleep(SERVER_STARTUP_TIME);
|
||||
println!("Presumed server started.");
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rhai_script_timeout() {
|
||||
ensure_server_is_running();
|
||||
|
||||
println!("Connecting to WebSocket server: {}", SERVER_ADDRESS);
|
||||
let (mut ws_stream, _response) = connect_async(SERVER_ADDRESS)
|
||||
.await
|
||||
.expect("Failed to connect to WebSocket server");
|
||||
println!("Connected to WebSocket server.");
|
||||
|
||||
// Rhai script designed to run longer than RHAI_TIMEOUT_SECONDS
|
||||
// A large loop should cause a timeout.
|
||||
let long_running_script = format!("
|
||||
let mut x = 0;
|
||||
for i in 0..999999999 {{
|
||||
x = x + i;
|
||||
if i % 10000000 == 0 {{
|
||||
// debug(\"Looping: \" + i); // Optional: for server-side logging if enabled
|
||||
}}
|
||||
}}
|
||||
print(x); // This line will likely not be reached due to timeout
|
||||
");
|
||||
|
||||
let request = JsonRpcRequest {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
method: "execute_script".to_string(),
|
||||
params: ScriptParams { script: long_running_script },
|
||||
id: 1,
|
||||
};
|
||||
|
||||
let request_json = serde_json::to_string(&request).expect("Failed to serialize request");
|
||||
println!("Sending long-running script request: {}", request_json);
|
||||
ws_stream.send(Message::Text(request_json)).await.expect("Failed to send message");
|
||||
|
||||
println!("Waiting for response (expecting timeout after ~{}s)..", RHAI_TIMEOUT_SECONDS);
|
||||
|
||||
// Wait for a response, expecting a timeout error
|
||||
// The server's timeout is RHAI_TIMEOUT_SECONDS, client should wait a bit longer.
|
||||
match tokio::time::timeout(Duration::from_secs(RHAI_TIMEOUT_SECONDS + 15), ws_stream.next()).await {
|
||||
Ok(Some(Ok(Message::Text(text)))) => {
|
||||
println!("Received response: {}", text);
|
||||
let response: Result<JsonRpcErrorResponse, _> = serde_json::from_str(&text);
|
||||
match response {
|
||||
Ok(err_resp) => {
|
||||
assert_eq!(err_resp.error.code, -32002, "Error code should indicate timeout.");
|
||||
assert!(err_resp.error.message.contains("timed out"), "Error message should indicate timeout.");
|
||||
println!("Timeout test passed! Received correct timeout error.");
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("Failed to deserialize error response: {}. Raw: {}", e, text);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Some(Ok(other_msg))) => {
|
||||
panic!("Received unexpected message type: {:?}", other_msg);
|
||||
}
|
||||
Ok(Some(Err(e))) => {
|
||||
panic!("WebSocket error: {}", e);
|
||||
}
|
||||
Ok(None) => {
|
||||
panic!("WebSocket stream closed unexpectedly.");
|
||||
}
|
||||
Err(_) => {
|
||||
panic!("Test timed out waiting for server response. Server might not have sent timeout error or took too long.");
|
||||
}
|
||||
}
|
||||
|
||||
ws_stream.close(None).await.ok();
|
||||
println!("Test finished, WebSocket closed.");
|
||||
}
|
Loading…
Reference in New Issue
Block a user