Squashed 'components/zinit/' content from commit 1b76c06

git-subtree-dir: components/zinit
git-subtree-split: 1b76c062fe31d552d1b7b23484ce163995a81482
This commit is contained in:
2025-08-16 21:12:16 +02:00
commit 2fda71af11
48 changed files with 11203 additions and 0 deletions

26
zinit-client/Cargo.toml Normal file
View File

@@ -0,0 +1,26 @@
[package]
name = "zinit-client"
version = "0.1.0"
edition = "2021"
description = "A client library for interacting with Zinit process manager"
license = "Apache 2.0"
authors = ["ThreeFold Tech, https://github.com/threefoldtech"]
[dependencies]
anyhow = "1.0"
async-trait = "0.1.88"
jsonrpsee = { version = "0.25.1", features = ["macros", "http-client", "ws-client"] }
reth-ipc = { git = "https://github.com/paradigmxyz/reth", package = "reth-ipc" }
tokio = { version = "1.14.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
log = "0.4"
[[example]]
name = "basic_usage"
path = "examples/basic_usage.rs"
[[example]]
name = "http_client"
path = "examples/http_client.rs"

123
zinit-client/README.md Normal file
View File

@@ -0,0 +1,123 @@
# Zinit Client Library
A simple Rust client library for interacting with the Zinit process manager.
## Features
- Connect to Zinit via Unix socket or HTTP
- Manage services (start, stop, restart, monitor)
- Query service status and information
- Create and delete service configurations
- System operations (shutdown, reboot)
## Installation
Add this to your `Cargo.toml`:
```toml
[dependencies]
zinit-client = "0.1.0"
```
## Usage
### Creating a Client
You can create a client using either Unix socket or HTTP transport:
```rust
use zinit_client::Client;
// Using Unix socket (local only)
let client = Client::unix_socket("/var/run/zinit.sock");
// Using HTTP (works for remote Zinit instances)
let client = Client::http("http://localhost:8080");
```
### Service Management
```rust
// List all services
let services = client.list().await?;
for (name, state) in services {
println!("{}: {}", name, state);
}
// Get status of a specific service
let status = client.status("my-service").await?;
println!("PID: {}, State: {}", status.pid, status.state);
// Start a service
client.start("my-service").await?;
// Stop a service
client.stop("my-service").await?;
// Restart a service
client.restart("my-service").await?;
// Monitor a service
client.monitor("my-service").await?;
// Forget a service
client.forget("my-service").await?;
// Send a signal to a service
client.kill("my-service", "SIGTERM").await?;
```
### Service Configuration
```rust
use serde_json::json;
// Create a new service
let config = json!({
"exec": "nginx",
"oneshot": false,
"after": ["network"]
}).as_object().unwrap().clone();
client.create_service("nginx", config).await?;
// Get service configuration
let config = client.get_service("nginx").await?;
println!("Config: {:?}", config);
// Delete a service
client.delete_service("nginx").await?;
```
### System Operations
```rust
// Shutdown the system
client.shutdown().await?;
// Reboot the system
client.reboot().await?;
```
## Error Handling
The library provides a `ClientError` enum for handling errors:
```rust
match client.status("non-existent-service").await {
Ok(status) => println!("Service status: {}", status.state),
Err(e) => match e {
ClientError::ServiceNotFound(_) => println!("Service not found"),
ClientError::ConnectionError(_) => println!("Failed to connect to Zinit"),
_ => println!("Other error: {}", e),
},
}
```
## Examples
See the [examples](examples) directory for complete usage examples.
## License
This project is licensed under the MIT License.

View File

@@ -0,0 +1,50 @@
use anyhow::Result;
use zinit_client::Client;
#[tokio::main]
async fn main() -> Result<()> {
// Create a client using Unix socket transport
let client = Client::unix_socket("/var/run/zinit.sock").await?;
// List all services
let services = client.list().await?;
println!("Services:");
for (name, state) in services {
println!("{}: {}", name, state);
}
// Get a specific service status
let service_name = "example-service";
match client.status(service_name).await {
Ok(status) => {
println!("\nService: {}", status.name);
println!("PID: {}", status.pid);
println!("State: {}", status.state);
println!("Target: {}", status.target);
println!("After:");
for (dep, state) in status.after {
println!(" {}: {}", dep, state);
}
}
Err(e) => eprintln!("Failed to get status: {}", e),
}
// Try to start a service
match client.start(service_name).await {
Ok(_) => println!("\nService started successfully"),
Err(e) => eprintln!("Failed to start service: {}", e),
}
// Get logs for the service
match client.logs(Some(service_name.to_string())).await {
Ok(logs) => {
println!("\nLogs:");
for log in logs {
println!("{}", log);
}
}
Err(e) => eprintln!("Failed to get logs: {}", e),
}
Ok(())
}

View File

@@ -0,0 +1,78 @@
use anyhow::Result;
use serde_json::json;
use zinit_client::Client;
#[tokio::main]
async fn main() -> Result<()> {
// Create a client using HTTP transport
let client = Client::http("http://localhost:8080").await?;
// Create a new service
let service_name = "example-http-service";
let service_config = json!({
"exec": "echo 'Hello from HTTP service'",
"oneshot": true,
"after": ["network"]
})
.as_object()
.unwrap()
.clone();
match client.create_service(service_name, service_config).await {
Ok(msg) => println!("Service created: {}", msg),
Err(e) => eprintln!("Failed to create service: {}", e),
}
// Start the HTTP/RPC server on a specific address
match client.start_http_server("0.0.0.0:8081").await {
Ok(msg) => println!("HTTP server status: {}", msg),
Err(e) => eprintln!("Failed to start HTTP server: {}", e),
}
// List all services
let services = client.list().await?;
println!("\nServices:");
for (name, state) in services {
println!("{}: {}", name, state);
}
// Monitor the service
match client.monitor(service_name).await {
Ok(_) => println!("\nService is now monitored"),
Err(e) => eprintln!("Failed to monitor service: {}", e),
}
// Start the service
match client.start(service_name).await {
Ok(_) => println!("Service started successfully"),
Err(e) => eprintln!("Failed to start service: {}", e),
}
// Get logs
let logs = client.logs(Some(service_name.to_string())).await?;
println!("\nLogs:");
for log in logs {
println!("{}", log);
}
// Clean up - forget the service
println!("\nCleaning up...");
match client.forget(service_name).await {
Ok(_) => println!("Service has been forgotten"),
Err(e) => eprintln!("Failed to forget service: {}", e),
}
// Clean up - delete the service configuration
match client.delete_service(service_name).await {
Ok(msg) => println!("{}", msg),
Err(e) => eprintln!("Failed to delete service: {}", e),
}
// Stop the HTTP/RPC server
match client.stop_http_server().await {
Ok(_) => println!("HTTP server stopped"),
Err(e) => eprintln!("Failed to stop HTTP server: {}", e),
}
Ok(())
}

450
zinit-client/src/lib.rs Normal file
View File

@@ -0,0 +1,450 @@
//! A client library for interacting with the Zinit process manager.
//!
//! This library provides a simple API for communicating with a Zinit daemon
//! via either Unix socket (using reth-ipc) or HTTP (using jsonrpsee).
use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::client::Error as RpcError;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::rpc_params;
use reth_ipc::client::IpcClientBuilder;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::collections::HashMap;
use thiserror::Error;
/// Error type for client operations
#[derive(Error, Debug)]
pub enum ClientError {
#[error("connection error: {0}")]
ConnectionError(String),
#[error("service not found: {0}")]
ServiceNotFound(String),
#[error("service is already up: {0}")]
ServiceIsUp(String),
#[error("system is shutting down")]
ShuttingDown,
#[error("service already exists: {0}")]
ServiceAlreadyExists(String),
#[error("service file error: {0}")]
ServiceFileError(String),
#[error("rpc error: {0}")]
RpcError(String),
#[error("unknown error: {0}")]
UnknownError(String),
}
impl From<RpcError> for ClientError {
fn from(err: RpcError) -> Self {
// Parse the error code if available
if let RpcError::Call(err) = &err {
match err.code() {
-32000 => return ClientError::ServiceNotFound(err.message().to_string()),
-32002 => return ClientError::ServiceIsUp(err.message().to_string()),
-32006 => return ClientError::ShuttingDown,
-32007 => return ClientError::ServiceAlreadyExists(err.message().to_string()),
-32008 => return ClientError::ServiceFileError(err.message().to_string()),
_ => {}
}
}
match err {
RpcError::Transport(_) => ClientError::ConnectionError(err.to_string()),
_ => ClientError::RpcError(err.to_string()),
}
}
}
/// Service status information
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Status {
pub name: String,
pub pid: u32,
pub state: String,
pub target: String,
pub after: HashMap<String, String>,
}
/// Child process stats information
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ChildStats {
pub pid: u32,
pub memory_usage: u64,
pub cpu_usage: f32,
}
/// Service stats information
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Stats {
pub name: String,
pub pid: u32,
pub memory_usage: u64,
pub cpu_usage: f32,
pub children: Vec<ChildStats>,
}
/// Client implementation for communicating with Zinit
pub enum Client {
Ipc(String), // Socket path
Http(HttpClient),
}
impl Client {
/// Create a new client using Unix socket transport
pub async fn unix_socket<P: AsRef<std::path::Path>>(path: P) -> Result<Self, ClientError> {
Ok(Client::Ipc(path.as_ref().to_string_lossy().to_string()))
}
/// Create a new client using HTTP transport
pub async fn http<S: AsRef<str>>(url: S) -> Result<Self, ClientError> {
let client = HttpClientBuilder::default()
.build(url.as_ref())
.map_err(|e| ClientError::ConnectionError(e.to_string()))?;
Ok(Client::Http(client))
}
// Helper to get IPC client
async fn get_ipc_client(&self) -> Result<impl ClientT, ClientError> {
match self {
Client::Ipc(path) => IpcClientBuilder::default()
.build(path)
.await
.map_err(|e| ClientError::ConnectionError(e.to_string())),
_ => Err(ClientError::UnknownError("Not an IPC client".to_string())),
}
}
// Service API Methods
/// List all monitored services and their current state
pub async fn list(&self) -> Result<HashMap<String, String>, ClientError> {
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("service_list", rpc_params![])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("service_list", rpc_params![])
.await
.map_err(Into::into),
}
}
/// Get the detailed status of a specific service
pub async fn status(&self, name: impl AsRef<str>) -> Result<Status, ClientError> {
let name = name.as_ref().to_string();
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("service_status", rpc_params![name])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("service_status", rpc_params![name])
.await
.map_err(Into::into),
}
}
/// Start a specific service
pub async fn start(&self, name: impl AsRef<str>) -> Result<(), ClientError> {
let name = name.as_ref().to_string();
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("service_start", rpc_params![name])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("service_start", rpc_params![name])
.await
.map_err(Into::into),
}
}
/// Stop a specific service
pub async fn stop(&self, name: impl AsRef<str>) -> Result<(), ClientError> {
let name = name.as_ref().to_string();
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("service_stop", rpc_params![name])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("service_stop", rpc_params![name])
.await
.map_err(Into::into),
}
}
/// Restart a service
pub async fn restart(&self, name: impl AsRef<str>) -> Result<(), ClientError> {
let name = name.as_ref().to_string();
// First stop the service
self.stop(&name).await?;
// Poll the service status until it's stopped
for _ in 0..20 {
let status = self.status(&name).await?;
if status.pid == 0 && status.target == "Down" {
return self.start(&name).await;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
// Process not stopped, try to kill it
self.kill(&name, "SIGKILL").await?;
self.start(&name).await
}
/// Load and monitor a new service from its configuration file
pub async fn monitor(&self, name: impl AsRef<str>) -> Result<(), ClientError> {
let name = name.as_ref().to_string();
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("service_monitor", rpc_params![name])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("service_monitor", rpc_params![name])
.await
.map_err(Into::into),
}
}
/// Stop monitoring a service and remove it from management
pub async fn forget(&self, name: impl AsRef<str>) -> Result<(), ClientError> {
let name = name.as_ref().to_string();
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("service_forget", rpc_params![name])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("service_forget", rpc_params![name])
.await
.map_err(Into::into),
}
}
/// Send a signal to a specific service process
pub async fn kill(
&self,
name: impl AsRef<str>,
signal: impl AsRef<str>,
) -> Result<(), ClientError> {
let name = name.as_ref().to_string();
let signal = signal.as_ref().to_string();
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("service_kill", rpc_params![name, signal])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("service_kill", rpc_params![name, signal])
.await
.map_err(Into::into),
}
}
/// Create a new service configuration
pub async fn create_service(
&self,
name: impl AsRef<str>,
content: Map<String, Value>,
) -> Result<String, ClientError> {
let name = name.as_ref().to_string();
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("service_create", rpc_params![name, content])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("service_create", rpc_params![name, content])
.await
.map_err(Into::into),
}
}
/// Delete a service configuration
pub async fn delete_service(&self, name: impl AsRef<str>) -> Result<String, ClientError> {
let name = name.as_ref().to_string();
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("service_delete", rpc_params![name])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("service_delete", rpc_params![name])
.await
.map_err(Into::into),
}
}
/// Get a service configuration
pub async fn get_service(&self, name: impl AsRef<str>) -> Result<Value, ClientError> {
let name = name.as_ref().to_string();
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("service_get", rpc_params![name])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("service_get", rpc_params![name])
.await
.map_err(Into::into),
}
}
/// Get memory and CPU usage statistics for a service
pub async fn stats(&self, name: impl AsRef<str>) -> Result<Stats, ClientError> {
let name = name.as_ref().to_string();
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("service_stats", rpc_params![name])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("service_stats", rpc_params![name])
.await
.map_err(Into::into),
}
}
// System API Methods
/// Initiate system shutdown
pub async fn shutdown(&self) -> Result<(), ClientError> {
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("system_shutdown", rpc_params![])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("system_shutdown", rpc_params![])
.await
.map_err(Into::into),
}
}
/// Initiate system reboot
pub async fn reboot(&self) -> Result<(), ClientError> {
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("system_reboot", rpc_params![])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("system_reboot", rpc_params![])
.await
.map_err(Into::into),
}
}
/// Start HTTP/RPC server
pub async fn start_http_server(&self, address: impl AsRef<str>) -> Result<String, ClientError> {
let address = address.as_ref().to_string();
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("system_start_http_server", rpc_params![address])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("system_start_http_server", rpc_params![address])
.await
.map_err(Into::into),
}
}
/// Stop HTTP/RPC server
pub async fn stop_http_server(&self) -> Result<(), ClientError> {
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("system_stop_http_server", rpc_params![])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("system_stop_http_server", rpc_params![])
.await
.map_err(Into::into),
}
}
// Logging API Methods
/// Get current logs
pub async fn logs(&self, filter: Option<String>) -> Result<Vec<String>, ClientError> {
match self {
Client::Ipc(_) => {
let client = self.get_ipc_client().await?;
client
.request("stream_currentLogs", rpc_params![filter])
.await
.map_err(Into::into)
}
Client::Http(client) => client
.request("stream_currentLogs", rpc_params![filter])
.await
.map_err(Into::into),
}
}
/// Subscribe to logs
///
/// Note: This method is not fully implemented yet. For now, it will return an error.
pub async fn log_subscribe(&self, _filter: Option<String>) -> Result<(), ClientError> {
Err(ClientError::UnknownError(
"Log subscription not implemented yet".to_string(),
))
}
}

View File

@@ -0,0 +1,66 @@
use std::env;
use zinit_client::{Client, ClientError};
#[tokio::test]
async fn test_connection_error() {
// Try to connect to a non-existent socket
let result = Client::unix_socket("/non/existent/socket").await;
assert!(result.is_ok()); // Just creating the client succeeds
// Trying to make a request should fail
if let Ok(client) = result {
let list_result = client.list().await;
assert!(matches!(list_result, Err(ClientError::ConnectionError(_))));
}
}
#[tokio::test]
async fn test_http_connection_error() {
// Try to connect to a non-existent HTTP endpoint
let result = Client::http("http://localhost:12345").await;
// This should succeed as we're just creating the client, not making a request
assert!(result.is_ok());
// Try to make a request which should fail
if let Ok(client) = result {
let list_result = client.list().await;
assert!(matches!(list_result, Err(ClientError::ConnectionError(_))));
}
}
// This test only runs if ZINIT_SOCKET is set in the environment
// and points to a valid Zinit socket
#[tokio::test]
#[ignore]
async fn test_live_connection() {
let socket_path = match env::var("ZINIT_SOCKET") {
Ok(path) => path,
Err(_) => {
println!("ZINIT_SOCKET not set, skipping live test");
return;
}
};
let client = match Client::unix_socket(&socket_path).await {
Ok(client) => client,
Err(e) => {
panic!(
"Failed to connect to Zinit socket at {}: {}",
socket_path, e
);
}
};
// Test listing services
let services = client.list().await.expect("Failed to list services");
println!("Found {} services", services.len());
// If there are services, test getting status of the first one
if let Some((service_name, _)) = services.iter().next() {
let status = client
.status(service_name)
.await
.expect("Failed to get service status");
println!("Service {} has PID {}", service_name, status.pid);
}
}