WIP1: implementing JSON-RPC calls over Unix Sockets
This commit is contained in:
34
src/main.rs
34
src/main.rs
@@ -40,6 +40,14 @@ struct Args {
|
||||
#[arg(long, default_value = "8080")]
|
||||
rpc_port: u16,
|
||||
|
||||
/// Enable RPC over Unix Domain Socket (IPC)
|
||||
#[arg(long)]
|
||||
enable_rpc_ipc: bool,
|
||||
|
||||
/// RPC IPC socket path (Unix Domain Socket)
|
||||
#[arg(long, default_value = "/tmp/herodb.ipc")]
|
||||
rpc_ipc_path: String,
|
||||
|
||||
/// Use the sled backend
|
||||
#[arg(long)]
|
||||
sled: bool,
|
||||
@@ -105,7 +113,7 @@ async fn main() {
|
||||
let rpc_addr = format!("127.0.0.1:{}", args.rpc_port).parse().unwrap();
|
||||
let base_dir = args.dir.clone();
|
||||
|
||||
match rpc_server::start_rpc_server(rpc_addr, base_dir, backend, args.admin_secret.clone()).await {
|
||||
match rpc_server::start_rpc_server(rpc_addr, base_dir, backend.clone(), args.admin_secret.clone()).await {
|
||||
Ok(handle) => {
|
||||
println!("RPC management server started on port {}", args.rpc_port);
|
||||
Some(handle)
|
||||
@@ -119,6 +127,30 @@ async fn main() {
|
||||
None
|
||||
};
|
||||
|
||||
// Start IPC (Unix socket) RPC server if enabled
|
||||
let _rpc_ipc_handle = if args.enable_rpc_ipc {
|
||||
let base_dir = args.dir.clone();
|
||||
let ipc_path = args.rpc_ipc_path.clone();
|
||||
|
||||
// Remove stale socket if present
|
||||
if std::path::Path::new(&ipc_path).exists() {
|
||||
let _ = std::fs::remove_file(&ipc_path);
|
||||
}
|
||||
|
||||
match rpc_server::start_rpc_ipc_server(ipc_path.clone(), base_dir, backend.clone(), args.admin_secret.clone()).await {
|
||||
Ok(handle) => {
|
||||
println!("RPC IPC server started at {}", ipc_path);
|
||||
Some(handle)
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Failed to start RPC IPC server: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// accept new connections
|
||||
loop {
|
||||
let stream = listener.accept().await;
|
||||
|
||||
@@ -71,7 +71,7 @@ pub fn hash_key(key: &str) -> String {
|
||||
}
|
||||
|
||||
/// RPC trait for HeroDB management
|
||||
#[rpc(server, client, namespace = "hero")]
|
||||
#[rpc(server, client, namespace = "herodb")]
|
||||
pub trait Rpc {
|
||||
/// Create a new database with specified configuration
|
||||
#[method(name = "createDatabase")]
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use jsonrpsee::server::{ServerBuilder, ServerHandle};
|
||||
use jsonrpsee::RpcModule;
|
||||
use reth_ipc::server::Builder as IpcServerBuilder;
|
||||
|
||||
use crate::rpc::{RpcServer, RpcServerImpl};
|
||||
|
||||
@@ -27,24 +28,25 @@ pub async fn start_rpc_server(addr: SocketAddr, base_dir: PathBuf, backend: crat
|
||||
Ok(handle)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::time::Duration;
|
||||
/// Start the JSON-RPC IPC server on the specified Unix socket endpoint
|
||||
pub async fn start_rpc_ipc_server(
|
||||
endpoint: String,
|
||||
base_dir: PathBuf,
|
||||
backend: crate::options::BackendType,
|
||||
admin_secret: String,
|
||||
) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Create the RPC server implementation
|
||||
let rpc_impl = RpcServerImpl::new(base_dir, backend, admin_secret);
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rpc_server_startup() {
|
||||
let addr = "127.0.0.1:0".parse().unwrap(); // Use port 0 for auto-assignment
|
||||
let base_dir = PathBuf::from("/tmp/test_rpc");
|
||||
let backend = crate::options::BackendType::Redb; // Default for test
|
||||
// Create the RPC module
|
||||
let mut module = RpcModule::new(());
|
||||
module.merge(RpcServer::into_rpc(rpc_impl))?;
|
||||
|
||||
let handle = start_rpc_server(addr, base_dir, backend, "test-admin".to_string()).await.unwrap();
|
||||
// Build the IPC server and start it
|
||||
let server = IpcServerBuilder::default().build(endpoint.clone());
|
||||
let handle = server.start(module).await?;
|
||||
|
||||
// Give the server a moment to start
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
println!("RPC IPC server started on {}", endpoint);
|
||||
|
||||
// Stop the server
|
||||
handle.stop().unwrap();
|
||||
handle.stopped().await;
|
||||
}
|
||||
Ok(handle)
|
||||
}
|
||||
Reference in New Issue
Block a user