use crate::Server; use actix::prelude::*; use actix_web_actors::ws; use hero_supervisor::ScriptType; use serde_json::{json, Value}; use std::time::Duration; const TASK_TIMEOUT_DURATION: Duration = Duration::from_secs(30); #[derive(serde::Serialize)] struct SuccessResult { success: bool, } #[derive(serde::Serialize)] struct JobResult { job_id: String, } #[derive(serde::Serialize)] struct JsonRpcResponse { jsonrpc: String, result: Option, error: Option, id: Value, } #[derive(serde::Serialize)] struct JsonRpcError { code: i32, message: String, data: Option, } impl Server { pub fn handle_create_job( &mut self, params: Value, client_rpc_id: Value, ctx: &mut ws::WebsocketContext, ) { // For now, create_job is the same as run_job self.handle_run_job(params, client_rpc_id, ctx); } pub fn handle_start_job( &mut self, params: Value, client_rpc_id: Value, ctx: &mut ws::WebsocketContext, ) { if self.enable_auth && !self.is_connection_authenticated() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Authentication required".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } let job_id = match params.get("job_id").and_then(|v| v.as_str()) { Some(id) => id.to_string(), None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32602, message: "Missing required parameter: job_id".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let supervisor = match self.supervisor.clone() { Some(d) => d, None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32603, message: "Internal error: supervisor not available".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let client_rpc_id_clone = client_rpc_id.clone(); let fut = async move { supervisor.start_job(&job_id).await }; ctx.spawn( fut.into_actor(self) .map(move |res, _act, ctx_inner| match res { Ok(_) => { let result = SuccessResult { success: true }; let resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: Some(serde_json::to_value(result).unwrap()), error: None, id: client_rpc_id_clone.clone(), }; ctx_inner.text(serde_json::to_string(&resp).unwrap()); } Err(e) => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: format!("Failed to start job: {}", e), data: None, }), id: client_rpc_id_clone, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }) .timeout(TASK_TIMEOUT_DURATION) .map(move |res, _act, ctx_inner| { if res.is_err() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Request timed out".to_string(), data: None, }), id: client_rpc_id, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }), ); } pub fn handle_get_job_status( &mut self, params: Value, client_rpc_id: Value, ctx: &mut ws::WebsocketContext, ) { if self.enable_auth && !self.is_connection_authenticated() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Authentication required".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } let job_id = match params.get("job_id").and_then(|v| v.as_str()) { Some(id) => id.to_string(), None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32602, message: "Missing required parameter: job_id".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let supervisor = match self.supervisor.clone() { Some(d) => d, None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32603, message: "Internal error: supervisor not available".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let client_rpc_id_clone = client_rpc_id.clone(); let fut = async move { supervisor.get_job_status(&job_id).await }; ctx.spawn( fut.into_actor(self) .map(move |res, _act, ctx_inner| match res { Ok(status) => { let resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: Some(json!(status)), error: None, id: client_rpc_id_clone.clone(), }; ctx_inner.text(serde_json::to_string(&resp).unwrap()); } Err(e) => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: format!("Failed to get job status: {}", e), data: None, }), id: client_rpc_id_clone, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }) .timeout(TASK_TIMEOUT_DURATION) .map(move |res, _act, ctx_inner| { if res.is_err() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Request timed out".to_string(), data: None, }), id: client_rpc_id, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }), ); } pub fn handle_list_jobs( &mut self, _params: Value, client_rpc_id: Value, ctx: &mut ws::WebsocketContext, ) { if self.enable_auth && !self.is_connection_authenticated() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Authentication required".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } let supervisor = match self.supervisor.clone() { Some(d) => d, None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32603, message: "Internal error: supervisor not available".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let client_rpc_id_clone = client_rpc_id.clone(); let fut = async move { supervisor.list_jobs().await }; ctx.spawn( fut.into_actor(self) .map(move |res, _act, ctx_inner| match res { Ok(jobs) => { let resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: Some(json!(jobs)), error: None, id: client_rpc_id_clone.clone(), }; ctx_inner.text(serde_json::to_string(&resp).unwrap()); } Err(e) => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: format!("Failed to list jobs: {}", e), data: None, }), id: client_rpc_id_clone, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }) .timeout(TASK_TIMEOUT_DURATION) .map(move |res, _act, ctx_inner| { if res.is_err() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Request timed out".to_string(), data: None, }), id: client_rpc_id, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }), ); } pub fn handle_run_job( &mut self, params: Value, client_rpc_id: Value, ctx: &mut ws::WebsocketContext, ) { if self.enable_auth && !self.is_connection_authenticated() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Authentication required".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } let circle_pk = match params.get("circle_pk").and_then(|v| v.as_str()) { Some(pk) => pk.to_string(), None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32602, message: "Missing required parameter: circle_pk".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let script_content = match params.get("script_content").and_then(|v| v.as_str()) { Some(script) => script.to_string(), None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32602, message: "Missing required parameter: script_content".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let supervisor = match self.supervisor.clone() { Some(d) => d, None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32603, message: "Internal error: supervisor not available".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let client_rpc_id_clone = client_rpc_id.clone(); let fut = async move { supervisor .new_job() .context_id(&circle_pk) .script_type(ScriptType::SAL) .script(&script_content) .timeout(TASK_TIMEOUT_DURATION) .await_response() .await }; ctx.spawn( fut.into_actor(self) .map(move |res, _act, ctx_inner| match res { Ok(job_id) => { let result = JobResult { job_id }; let resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: Some(serde_json::to_value(result).unwrap()), error: None, id: client_rpc_id_clone.clone(), }; ctx_inner.text(serde_json::to_string(&resp).unwrap()); } Err(e) => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: format!("Failed to run job: {}", e), data: None, }), id: client_rpc_id_clone, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }) .timeout(TASK_TIMEOUT_DURATION) .map(move |res, _act, ctx_inner| { if res.is_err() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Request timed out".to_string(), data: None, }), id: client_rpc_id, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }), ); } pub fn handle_get_job_output( &mut self, params: Value, client_rpc_id: Value, ctx: &mut ws::WebsocketContext, ) { if self.enable_auth && !self.is_connection_authenticated() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Authentication required".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } let job_id = match params.get("job_id").and_then(|v| v.as_str()) { Some(id) => id.to_string(), None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32602, message: "Missing required parameter: job_id".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let supervisor = match self.supervisor.clone() { Some(d) => d, None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32603, message: "Internal error: supervisor not available".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let client_rpc_id_clone = client_rpc_id.clone(); let fut = async move { supervisor.get_job_output(&job_id).await }; ctx.spawn( fut.into_actor(self) .map(move |res, _act, ctx_inner| match res { Ok(output) => { let resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: Some(json!(output)), error: None, id: client_rpc_id_clone.clone(), }; ctx_inner.text(serde_json::to_string(&resp).unwrap()); } Err(e) => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: format!("Failed to get job output: {}", e), data: None, }), id: client_rpc_id_clone, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }) .timeout(TASK_TIMEOUT_DURATION) .map(move |res, _act, ctx_inner| { if res.is_err() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Request timed out".to_string(), data: None, }), id: client_rpc_id, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }), ); } pub fn handle_get_job_logs( &mut self, params: Value, client_rpc_id: Value, ctx: &mut ws::WebsocketContext, ) { if self.enable_auth && !self.is_connection_authenticated() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Authentication required".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } let job_id = match params.get("job_id").and_then(|v| v.as_str()) { Some(id) => id.to_string(), None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32602, message: "Missing required parameter: job_id".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let supervisor = match self.supervisor.clone() { Some(d) => d, None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32603, message: "Internal error: supervisor not available".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let client_rpc_id_clone = client_rpc_id.clone(); let fut = async move { supervisor.get_job_logs(&job_id).await }; ctx.spawn( fut.into_actor(self) .map(move |res, _act, ctx_inner| match res { Ok(logs) => { let result = json!({ "logs": logs }); let resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: Some(result), error: None, id: client_rpc_id_clone.clone(), }; ctx_inner.text(serde_json::to_string(&resp).unwrap()); } Err(e) => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: format!("Failed to get job logs: {}", e), data: None, }), id: client_rpc_id_clone, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }) .timeout(TASK_TIMEOUT_DURATION) .map(move |res, _act, ctx_inner| { if res.is_err() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Request timed out".to_string(), data: None, }), id: client_rpc_id, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }), ); } pub fn handle_stop_job( &mut self, params: Value, client_rpc_id: Value, ctx: &mut ws::WebsocketContext, ) { if self.enable_auth && !self.is_connection_authenticated() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Authentication required".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } let job_id = match params.get("job_id").and_then(|v| v.as_str()) { Some(id) => id.to_string(), None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32602, message: "Missing required parameter: job_id".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let supervisor = match self.supervisor.clone() { Some(d) => d, None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32603, message: "Internal error: supervisor not available".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let client_rpc_id_clone = client_rpc_id.clone(); let fut = async move { supervisor.stop_job(&job_id).await }; ctx.spawn( fut.into_actor(self) .map(move |res, _act, ctx_inner| match res { Ok(_) => { let resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: Some(json!(null)), error: None, id: client_rpc_id_clone.clone(), }; ctx_inner.text(serde_json::to_string(&resp).unwrap()); } Err(e) => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: format!("Failed to stop job: {}", e), data: None, }), id: client_rpc_id_clone, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }) .timeout(TASK_TIMEOUT_DURATION) .map(move |res, _act, ctx_inner| { if res.is_err() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Request timed out".to_string(), data: None, }), id: client_rpc_id, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }), ); } pub fn handle_delete_job( &mut self, params: Value, client_rpc_id: Value, ctx: &mut ws::WebsocketContext, ) { if self.enable_auth && !self.is_connection_authenticated() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Authentication required".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } let job_id = match params.get("job_id").and_then(|v| v.as_str()) { Some(id) => id.to_string(), None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32602, message: "Missing required parameter: job_id".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let supervisor = match self.supervisor.clone() { Some(d) => d, None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32603, message: "Internal error: supervisor not available".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let client_rpc_id_clone = client_rpc_id.clone(); let fut = async move { supervisor.delete_job(&job_id).await }; ctx.spawn( fut.into_actor(self) .map(move |res, _act, ctx_inner| match res { Ok(_) => { let resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: Some(json!(null)), error: None, id: client_rpc_id_clone.clone(), }; ctx_inner.text(serde_json::to_string(&resp).unwrap()); } Err(e) => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: format!("Failed to delete job: {}", e), data: None, }), id: client_rpc_id_clone, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }) .timeout(TASK_TIMEOUT_DURATION) .map(move |res, _act, ctx_inner| { if res.is_err() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Request timed out".to_string(), data: None, }), id: client_rpc_id, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }), ); } pub fn handle_clear_all_jobs( &mut self, _params: Value, client_rpc_id: Value, ctx: &mut ws::WebsocketContext, ) { if self.enable_auth && !self.is_connection_authenticated() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Authentication required".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } let supervisor = match self.supervisor.clone() { Some(d) => d, None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32603, message: "Internal error: supervisor not available".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); return; } }; let client_rpc_id_clone = client_rpc_id.clone(); let fut = async move { supervisor.clear_all_jobs().await }; ctx.spawn( fut.into_actor(self) .map(move |res, _act, ctx_inner| match res { Ok(_) => { let resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: Some(json!(null)), error: None, id: client_rpc_id_clone.clone(), }; ctx_inner.text(serde_json::to_string(&resp).unwrap()); } Err(e) => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: format!("Failed to clear jobs: {}", e), data: None, }), id: client_rpc_id_clone, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }) .timeout(TASK_TIMEOUT_DURATION) .map(move |res, _act, ctx_inner| { if res.is_err() { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32000, message: "Request timed out".to_string(), data: None, }), id: client_rpc_id, }; ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); } }), ); } }