Merge commit '2fda71af117a90da5f496d8bb8105f0ee9e07420' as 'components/zinit'

2025-08-16 21:12:16 +02:00
48 changed files with 11203 additions and 0 deletions

View File

@@ -0,0 +1,139 @@
use super::rpc::{
ZinitLoggingApiServer, ZinitRpcApiServer, ZinitServiceApiServer, ZinitSystemApiServer,
};
use crate::zinit::ZInit;
use anyhow::{bail, Context, Result};
use jsonrpsee::server::ServerHandle;
use reth_ipc::server::Builder;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tower_http::cors::{AllowHeaders, AllowMethods};
use tower_http::cors::{Any, CorsLayer};
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
struct ZinitResponse {
pub state: ZinitState,
pub body: Value,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
enum ZinitState {
Ok,
Error,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub struct Status {
pub name: String,
pub pid: u32,
pub state: String,
pub target: String,
pub after: HashMap<String, String>,
}
/// Service stats information
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub struct Stats {
pub name: String,
pub pid: u32,
pub memory_usage: u64,
pub cpu_usage: f32,
pub children: Vec<ChildStats>,
}
/// Child process stats information
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub struct ChildStats {
pub pid: u32,
pub memory_usage: u64,
pub cpu_usage: f32,
}
pub struct ApiServer {
_handle: ServerHandle,
}
#[derive(Clone)]
pub struct Api {
pub zinit: ZInit,
pub http_server_handle: Arc<Mutex<Option<jsonrpsee::server::ServerHandle>>>,
}
impl Api {
pub fn new(zinit: ZInit) -> Api {
Api {
zinit,
http_server_handle: Arc::new(Mutex::new(None)),
}
}
pub async fn serve(&self, endpoint: String) -> Result<ApiServer> {
let server = Builder::default().build(endpoint);
let mut module = ZinitRpcApiServer::into_rpc(self.clone());
module.merge(ZinitSystemApiServer::into_rpc(self.clone()))?;
module.merge(ZinitServiceApiServer::into_rpc(self.clone()))?;
module.merge(ZinitLoggingApiServer::into_rpc(self.clone()))?;
let _handle = server.start(module).await?;
Ok(ApiServer { _handle })
}
/// Start an HTTP/RPC server at a specified address
pub async fn start_http_server(&self, address: String) -> Result<String> {
// Parse the address string
let socket_addr = address
.parse::<std::net::SocketAddr>()
.context("Failed to parse socket address")?;
let cors = CorsLayer::new()
// Allow `POST` when accessing the resource
.allow_methods(AllowMethods::any())
// Allow requests from any origin
.allow_origin(Any)
.allow_headers(AllowHeaders::any());
let middleware = tower::ServiceBuilder::new().layer(cors);
// Create the JSON-RPC server with CORS support
let server_rpc = jsonrpsee::server::ServerBuilder::default()
.set_http_middleware(middleware)
.build(socket_addr)
.await?;
// Create and merge all API modules
let mut rpc_module = ZinitRpcApiServer::into_rpc(self.clone());
rpc_module.merge(ZinitSystemApiServer::into_rpc(self.clone()))?;
rpc_module.merge(ZinitServiceApiServer::into_rpc(self.clone()))?;
rpc_module.merge(ZinitLoggingApiServer::into_rpc(self.clone()))?;
// Start the server
let handle = server_rpc.start(rpc_module);
// Store the handle
let mut http_handle = self.http_server_handle.lock().await;
*http_handle = Some(handle);
Ok(format!("HTTP/RPC server started at {}", address))
}
/// Stop the HTTP/RPC server if running
pub async fn stop_http_server(&self) -> Result<()> {
let mut http_handle = self.http_server_handle.lock().await;
if http_handle.is_some() {
// The handle is automatically dropped, which should stop the server
*http_handle = None;
Ok(())
} else {
bail!("No HTTP/RPC server is currently running")
}
}
}
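
Once start_http_server is up, the endpoint speaks plain JSON-RPC 2.0 over HTTP. jsonrpsee joins each namespace and method name with an underscore, so listing services is the method "service_list". An illustrative request/response pair (payloads are sketched, not captured output):

{"jsonrpc":"2.0","id":1,"method":"service_list","params":[]}
{"jsonrpc":"2.0","id":1,"result":{"redis":"Running"}}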

View File

@@ -0,0 +1,265 @@
pub mod api;
pub mod rpc;
use crate::zinit;
use anyhow::{Context, Result};
use api::ApiServer;
use reth_ipc::client::IpcClientBuilder;
use rpc::ZinitLoggingApiClient;
use rpc::ZinitServiceApiClient;
use rpc::ZinitSystemApiClient;
use serde_yaml as encoder;
use std::net::ToSocketAddrs;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::signal;
use tokio::time;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream;
fn logger(level: log::LevelFilter) -> Result<()> {
let logger = fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"zinit: {} ({}) {}",
record.level(),
record.target(),
message
))
})
.level(level)
.chain(std::io::stdout());
let logger = match std::fs::OpenOptions::new().write(true).open("/dev/kmsg") {
Ok(file) => logger.chain(file),
Err(_err) => logger,
};
logger.apply()?;
Ok(())
}
fn absolute<P: AsRef<Path>>(p: P) -> Result<PathBuf> {
let p = p.as_ref();
let result = if p.is_absolute() {
p.to_path_buf()
} else {
let mut current = std::env::current_dir()?;
current.push(p);
current
};
Ok(result)
}
pub async fn init(
cap: usize,
config: &str,
socket: &str,
container: bool,
debug: bool,
) -> Result<ApiServer> {
fs::create_dir_all(config)
.await
.with_context(|| format!("failed to create config directory '{}'", config))?;
if let Err(err) = logger(if debug {
log::LevelFilter::Debug
} else {
log::LevelFilter::Info
}) {
eprintln!("failed to setup logging: {}", err);
}
let config = absolute(Path::new(config)).context("failed to get config dir absolute path")?;
let socket_path =
absolute(Path::new(socket)).context("failed to get socket file absolute path")?;
if let Some(dir) = socket_path.parent() {
fs::create_dir_all(dir)
.await
.with_context(|| format!("failed to create directory {:?}", dir))?;
}
let _ = fs::remove_file(&socket).await;
debug!("switching to home dir: {}", config.display());
std::env::set_current_dir(&config).with_context(|| {
format!(
"failed to switch working directory to '{}'",
config.display()
)
})?;
let init = zinit::ZInit::new(cap, container);
init.serve();
let services = zinit::config::load_dir(&config)?;
for (k, v) in services {
if let Err(err) = init.monitor(&k, v).await {
error!("failed to monitor service {}: {}", k, err);
};
}
let a = api::Api::new(init);
a.serve(socket.into()).await
}
pub async fn list(socket: &str) -> Result<()> {
let client = IpcClientBuilder::default().build(socket.into()).await?;
let results = client.list().await?;
encoder::to_writer(std::io::stdout(), &results)?;
Ok(())
}
pub async fn shutdown(socket: &str) -> Result<()> {
let client = IpcClientBuilder::default().build(socket.into()).await?;
client.shutdown().await?;
Ok(())
}
pub async fn reboot(socket: &str) -> Result<()> {
let client = IpcClientBuilder::default().build(socket.into()).await?;
client.reboot().await?;
Ok(())
}
pub async fn status(socket: &str, name: String) -> Result<()> {
let client = IpcClientBuilder::default().build(socket.into()).await?;
let results = client.status(name).await?;
encoder::to_writer(std::io::stdout(), &results)?;
Ok(())
}
pub async fn start(socket: &str, name: String) -> Result<()> {
let client = IpcClientBuilder::default().build(socket.into()).await?;
client.start(name).await?;
Ok(())
}
pub async fn stop(socket: &str, name: String) -> Result<()> {
let client = IpcClientBuilder::default().build(socket.into()).await?;
client.stop(name).await?;
Ok(())
}
pub async fn restart(socket: &str, name: String) -> Result<()> {
let client = IpcClientBuilder::default().build(socket.into()).await?;
client.stop(name.clone()).await?;
// poll status until the service reports being down
for _ in 0..20 {
let result = client.status(name.clone()).await?;
if result.pid == 0 && result.target == "Down" {
client.start(name.clone()).await?;
return Ok(());
}
time::sleep(std::time::Duration::from_secs(1)).await;
}
// the process did not stop in time; force kill it and start again
client.kill(name.clone(), "SIGKILL".into()).await?;
client.start(name).await?;
Ok(())
}
pub async fn forget(socket: &str, name: String) -> Result<()> {
let client = IpcClientBuilder::default().build(socket.into()).await?;
client.forget(name).await?;
Ok(())
}
pub async fn monitor(socket: &str, name: String) -> Result<()> {
let client = IpcClientBuilder::default().build(socket.into()).await?;
client.monitor(name).await?;
Ok(())
}
pub async fn kill(socket: &str, name: String, signal: String) -> Result<()> {
let client = IpcClientBuilder::default().build(socket.into()).await?;
client.kill(name, signal).await?;
Ok(())
}
pub async fn stats(socket: &str, name: String) -> Result<()> {
let client = IpcClientBuilder::default().build(socket.into()).await?;
let results = client.stats(name).await?;
encoder::to_writer(std::io::stdout(), &results)?;
Ok(())
}
pub async fn logs(
socket: &str,
filter: Option<String>,
follow: bool,
) -> Result<impl Stream<Item = String> + Unpin> {
let client = IpcClientBuilder::default().build(socket.into()).await?;
if let Some(ref filter) = filter {
client.status(filter.clone()).await?;
}
let logs = client.logs(filter.clone()).await?;
let (tx, rx) = tokio::sync::mpsc::channel(2000);
let logs_sub = if follow {
Some(client.log_subscribe(filter).await?)
} else {
None
};
tokio::task::spawn(async move {
for log in logs {
if tx.send(log).await.is_err() {
if let Some(logs_sub) = logs_sub {
let _ = logs_sub.unsubscribe().await;
}
// error means receiver is dead, so just quit
return;
}
}
let Some(mut logs_sub) = logs_sub else { return };
loop {
match logs_sub.next().await {
Some(Ok(log)) => {
if tx.send(log).await.is_err() {
let _ = logs_sub.unsubscribe().await;
return;
}
}
Some(Err(e)) => {
log::error!("Failed to get new log from subscription: {e}");
return;
}
_ => return,
}
}
});
Ok(ReceiverStream::new(rx))
}
/// Start an HTTP/RPC proxy server for the Zinit API at the specified address
pub async fn proxy(sock: &str, address: String) -> Result<()> {
// Parse the socket address
let _socket_addr = address
.to_socket_addrs()
.context("Failed to parse socket address")?
.next()
.context("No valid socket address found")?;
println!("Starting HTTP/RPC server on {}", address);
println!("Connecting to Zinit daemon at {}", sock);
// Connect to the existing Zinit daemon through the Unix socket
let client = IpcClientBuilder::default().build(sock.into()).await?;
// Issue an RPC call to start the HTTP server on the specified address
let result = client.start_http_server(address.clone()).await?;
println!("{}", result);
println!("Press Ctrl+C to stop");
// Wait for Ctrl+C to shutdown
signal::ctrl_c().await?;
// Shutdown the HTTP server
client.stop_http_server().await?;
println!("HTTP/RPC server stopped");
Ok(())
}

View File

@@ -0,0 +1,426 @@
use crate::app::api::{ChildStats, Stats, Status};
use crate::zinit::config;
use async_trait::async_trait;
use jsonrpsee::core::{RpcResult, SubscriptionResult};
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::types::{ErrorCode, ErrorObjectOwned};
use jsonrpsee::PendingSubscriptionSink;
use serde_json::{Map, Value};
use std::collections::HashMap;
use std::str::FromStr;
use tokio_stream::StreamExt;
use super::api::Api;
// Custom error codes for Zinit
const SERVICE_NOT_FOUND: i32 = -32000;
const SERVICE_IS_UP: i32 = -32002;
const SHUTTING_DOWN: i32 = -32006;
const SERVICE_ALREADY_EXISTS: i32 = -32007;
const SERVICE_FILE_ERROR: i32 = -32008;
// Include the OpenRPC specification
const OPENRPC_SPEC: &str = include_str!("../../openrpc.json");
/// RPC methods for discovery.
#[rpc(server, client)]
pub trait ZinitRpcApi {
/// Returns the OpenRPC specification as a string.
#[method(name = "rpc.discover")]
async fn discover(&self) -> RpcResult<String>;
}
#[async_trait]
impl ZinitRpcApiServer for Api {
async fn discover(&self) -> RpcResult<String> {
Ok(OPENRPC_SPEC.to_string())
}
}
/// RPC methods for service management.
#[rpc(server, client, namespace = "service")]
pub trait ZinitServiceApi {
/// List all monitored services and their current state.
/// Returns a map where keys are service names and values are state strings.
#[method(name = "list")]
async fn list(&self) -> RpcResult<HashMap<String, String>>;
/// Get the detailed status of a specific service.
#[method(name = "status")]
async fn status(&self, name: String) -> RpcResult<Status>;
/// Start a specific service.
#[method(name = "start")]
async fn start(&self, name: String) -> RpcResult<()>;
/// Stop a specific service.
#[method(name = "stop")]
async fn stop(&self, name: String) -> RpcResult<()>;
/// Load and monitor a new service from its configuration file (e.g., "service_name.yaml").
#[method(name = "monitor")]
async fn monitor(&self, name: String) -> RpcResult<()>;
/// Stop monitoring a service and remove it from management.
#[method(name = "forget")]
async fn forget(&self, name: String) -> RpcResult<()>;
/// Send a signal (e.g., "SIGTERM", "SIGKILL") to a specific service process.
#[method(name = "kill")]
async fn kill(&self, name: String, signal: String) -> RpcResult<()>;
/// Create a new service configuration file (e.g., "service_name.yaml")
/// with the provided content (JSON map representing YAML structure).
/// Returns a success message string.
#[method(name = "create")]
async fn create(&self, name: String, content: Map<String, Value>) -> RpcResult<String>;
/// Delete a service configuration file.
/// Returns a success message string.
#[method(name = "delete")]
async fn delete(&self, name: String) -> RpcResult<String>;
/// Get the content of a service configuration file as a JSON Value.
#[method(name = "get")]
async fn get(&self, name: String) -> RpcResult<Value>;
/// Get memory and CPU usage statistics for a service.
#[method(name = "stats")]
async fn stats(&self, name: String) -> RpcResult<Stats>;
}
#[async_trait]
impl ZinitServiceApiServer for Api {
async fn list(&self) -> RpcResult<HashMap<String, String>> {
let services = self
.zinit
.list()
.await
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))?;
let mut map: HashMap<String, String> = HashMap::new();
for service in services {
let state = self
.zinit
.status(&service)
.await
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))?;
map.insert(service, format!("{:?}", state.state));
}
Ok(map)
}
async fn status(&self, name: String) -> RpcResult<Status> {
let status = self
.zinit
.status(&name)
.await
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))?;
let result = Status {
name: name.clone(),
pid: status.pid.as_raw() as u32,
state: format!("{:?}", status.state),
target: format!("{:?}", status.target),
after: {
let mut after = HashMap::new();
for service in status.service.after {
let status = match self.zinit.status(&service).await {
Ok(dep) => dep.state,
Err(_) => crate::zinit::State::Unknown,
};
after.insert(service, format!("{:?}", status));
}
after
},
};
Ok(result)
}
async fn start(&self, name: String) -> RpcResult<()> {
self.zinit
.start(name)
.await
.map_err(|_| ErrorObjectOwned::from(ErrorCode::ServerError(SERVICE_IS_UP)))
}
async fn stop(&self, name: String) -> RpcResult<()> {
self.zinit
.stop(name)
.await
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))
}
async fn monitor(&self, name: String) -> RpcResult<()> {
if let Ok((name_str, service)) = config::load(format!("{}.yaml", name))
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))
{
self.zinit
.monitor(name_str, service)
.await
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))
} else {
Err(ErrorObjectOwned::from(ErrorCode::InternalError))
}
}
async fn forget(&self, name: String) -> RpcResult<()> {
self.zinit
.forget(name)
.await
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))
}
async fn kill(&self, name: String, signal: String) -> RpcResult<()> {
if let Ok(sig) = nix::sys::signal::Signal::from_str(&signal.to_uppercase()) {
self.zinit
.kill(name, sig)
.await
.map_err(|_e| ErrorObjectOwned::from(ErrorCode::InternalError))
} else {
Err(ErrorObjectOwned::from(ErrorCode::InternalError))
}
}
async fn create(&self, name: String, content: Map<String, Value>) -> RpcResult<String> {
use std::fs;
use std::io::Write;
use std::path::PathBuf;
// Validate service name (no path traversal, valid characters)
if name.contains('/') || name.contains('\\') || name.contains('.') {
return Err(ErrorObjectOwned::from(ErrorCode::InternalError));
}
// Construct the file path
let file_path = PathBuf::from(format!("{}.yaml", name));
// Check if the service file already exists
if file_path.exists() {
return Err(ErrorObjectOwned::from(ErrorCode::ServerError(
SERVICE_ALREADY_EXISTS,
)));
}
// Convert the JSON content to YAML
let yaml_content = serde_yaml::to_string(&content)
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))?;
// Write the YAML content to the file
let mut file = fs::File::create(&file_path)
.map_err(|_| ErrorObjectOwned::from(ErrorCode::ServerError(SERVICE_FILE_ERROR)))?;
file.write_all(yaml_content.as_bytes())
.map_err(|_| ErrorObjectOwned::from(ErrorCode::ServerError(SERVICE_FILE_ERROR)))?;
Ok(format!("Service '{}' created successfully", name))
}
async fn delete(&self, name: String) -> RpcResult<String> {
use std::fs;
use std::path::PathBuf;
// Validate service name (no path traversal, valid characters)
if name.contains('/') || name.contains('\\') || name.contains('.') {
return Err(ErrorObjectOwned::from(ErrorCode::InternalError));
}
// Construct the file path
let file_path = PathBuf::from(format!("{}.yaml", name));
// Check if the service file exists
if !file_path.exists() {
return Err(ErrorObjectOwned::from(ErrorCode::ServerError(
SERVICE_NOT_FOUND,
)));
}
// Delete the file
fs::remove_file(&file_path)
.map_err(|_| ErrorObjectOwned::from(ErrorCode::ServerError(SERVICE_FILE_ERROR)))?;
Ok(format!("Service '{}' deleted successfully", name))
}
async fn get(&self, name: String) -> RpcResult<Value> {
use std::fs;
use std::path::PathBuf;
// Validate service name (no path traversal, valid characters)
if name.contains('/') || name.contains('\\') || name.contains('.') {
return Err(ErrorObjectOwned::from(ErrorCode::InternalError));
}
// Construct the file path
let file_path = PathBuf::from(format!("{}.yaml", name));
// Check if the service file exists
if !file_path.exists() {
return Err(ErrorObjectOwned::from(ErrorCode::ServerError(
SERVICE_NOT_FOUND,
)));
}
// Read the file content
let yaml_content = fs::read_to_string(&file_path)
.map_err(|_| ErrorObjectOwned::from(ErrorCode::ServerError(SERVICE_FILE_ERROR)))?;
// Parse YAML to JSON
let yaml_value: serde_yaml::Value = serde_yaml::from_str(&yaml_content)
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))?;
// Convert YAML value to JSON value
let json_value = serde_json::to_value(yaml_value)
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))?;
Ok(json_value)
}
async fn stats(&self, name: String) -> RpcResult<Stats> {
let stats = self
.zinit
.stats(&name)
.await
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))?;
let result = Stats {
name: name.clone(),
pid: stats.pid as u32,
memory_usage: stats.memory_usage,
cpu_usage: stats.cpu_usage,
children: stats
.children
.into_iter()
.map(|child| ChildStats {
pid: child.pid as u32,
memory_usage: child.memory_usage,
cpu_usage: child.cpu_usage,
})
.collect(),
};
Ok(result)
}
}
/// RPC methods for system-level operations.
#[rpc(server, client, namespace = "system")]
pub trait ZinitSystemApi {
/// Initiate system shutdown process.
#[method(name = "shutdown")]
async fn shutdown(&self) -> RpcResult<()>;
/// Initiate system reboot process.
#[method(name = "reboot")]
async fn reboot(&self) -> RpcResult<()>;
/// Start an HTTP/RPC server at the specified address
#[method(name = "start_http_server")]
async fn start_http_server(&self, address: String) -> RpcResult<String>;
/// Stop the HTTP/RPC server if running
#[method(name = "stop_http_server")]
async fn stop_http_server(&self) -> RpcResult<()>;
}
#[async_trait]
impl ZinitSystemApiServer for Api {
async fn shutdown(&self) -> RpcResult<()> {
self.zinit
.shutdown()
.await
.map_err(|_e| ErrorObjectOwned::from(ErrorCode::ServerError(SHUTTING_DOWN)))
}
async fn reboot(&self) -> RpcResult<()> {
self.zinit
.reboot()
.await
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))
}
async fn start_http_server(&self, address: String) -> RpcResult<String> {
// Call the method from the API implementation
match crate::app::api::Api::start_http_server(self, address).await {
Ok(result) => Ok(result),
Err(_) => Err(ErrorObjectOwned::from(ErrorCode::InternalError)),
}
}
async fn stop_http_server(&self) -> RpcResult<()> {
// Call the method from the API implementation
match crate::app::api::Api::stop_http_server(self).await {
Ok(_) => Ok(()),
Err(_) => Err(ErrorObjectOwned::from(ErrorCode::InternalError)),
}
}
}
/// RPC subscription methods for streaming data.
#[rpc(server, client, namespace = "stream")]
pub trait ZinitLoggingApi {
#[method(name = "currentLogs")]
async fn logs(&self, name: Option<String>) -> RpcResult<Vec<String>>;
/// Subscribe to log messages generated by zinit and monitored services.
/// An optional filter can be provided to only receive logs containing the filter string.
/// The subscription returns a stream of log lines (String).
#[subscription(name = "subscribeLogs", item = String)]
async fn log_subscribe(&self, filter: Option<String>) -> SubscriptionResult;
}
#[async_trait]
impl ZinitLoggingApiServer for Api {
async fn logs(&self, name: Option<String>) -> RpcResult<Vec<String>> {
let filter = name.map(|n| format!("{n}:"));
Ok(
tokio_stream::wrappers::ReceiverStream::new(self.zinit.logs(true, false).await)
.filter_map(|l| {
if let Some(ref filter) = filter {
// each log line carries a 4-byte marker ("[+] " or "[-] ")
// followed by "<name>: ", hence the slice at index 4
if l[4..].starts_with(filter) {
Some(l.to_string())
} else {
None
}
} else {
Some(l.to_string())
}
})
.collect()
.await,
)
}
async fn log_subscribe(
&self,
sink: PendingSubscriptionSink,
name: Option<String>,
) -> SubscriptionResult {
let sink = sink.accept().await?;
let filter = name.map(|n| format!("{n}:"));
let mut stream =
tokio_stream::wrappers::ReceiverStream::new(self.zinit.logs(false, true).await)
.filter_map(|l| {
if let Some(ref filter) = filter {
if l[4..].starts_with(filter) {
Some(l.to_string())
} else {
None
}
} else {
Some(l.to_string())
}
});
while let Some(log) = stream.next().await {
if sink
.send(serde_json::value::to_raw_value(&log)?)
.await
.is_err()
{
break;
}
}
Ok(())
}
}
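
The stream namespace follows the same naming rule, so a raw JSON-RPC client subscribes with the method "stream_subscribeLogs" (sketch; the filter parameter is optional):

{"jsonrpc":"2.0","id":2,"method":"stream_subscribeLogs","params":["redis"]}

The server answers with a subscription id and then, under jsonrpsee's default naming, pushes "stream_subscribeLogs" notifications carrying one log line each until the client invokes the generated "stream_unsubscribeLogs" method.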

View File

@@ -0,0 +1,172 @@
#[tokio::main]
async fn main() {
println!("hello from testapp");
}
// extern crate zinit;
// use anyhow::Result;
// use serde_json::json;
// use std::env;
// use tokio::time::{sleep, Duration};
// use zinit::testapp;
// #[tokio::main]
// async fn main() -> Result<()> {
// // Define paths for socket and config
// let temp_dir = env::temp_dir();
// let socket_path = temp_dir
// .join("zinit-test.sock")
// .to_str()
// .unwrap()
// .to_string();
// let config_dir = temp_dir
// .join("zinit-test-config")
// .to_str()
// .unwrap()
// .to_string();
// println!("Starting zinit with socket at: {}", socket_path);
// println!("Using config directory: {}", config_dir);
// // Start zinit in the background
// testapp::start_zinit(&socket_path, &config_dir).await?;
// // Wait for zinit to initialize
// sleep(Duration::from_secs(2)).await;
// // Create a client to communicate with zinit
// let client = Client::new(&socket_path);
// // Create service configurations
// println!("Creating service configurations...");
// // Create a find service
// testapp::create_service_config(
// &config_dir,
// "find-service",
// "find / -name \"*.txt\" -type f",
// )
// .await?;
// // Create a sleep service with echo
// testapp::create_service_config(
// &config_dir,
// "sleep-service",
// "sh -c 'echo Starting sleep; sleep 30; echo Finished sleep'",
// )
// .await?;
// // Wait for zinit to load the configurations
// sleep(Duration::from_secs(1)).await;
// // Tell zinit to monitor our services
// println!("Monitoring services...");
// client.monitor("find-service").await?;
// client.monitor("sleep-service").await?;
// // List all services
// println!("\nListing all services:");
// let services = client.list().await?;
// for (name, status) in services {
// println!("Service: {} - Status: {}", name, status);
// }
// // Start the find service
// println!("\nStarting find-service...");
// client.start("find-service").await?;
// // Wait a bit and check status
// sleep(Duration::from_secs(2)).await;
// let status = client.status("find-service").await?;
// println!("find-service status: {:?}", status);
// // Start the sleep service
// println!("\nStarting sleep-service...");
// client.start("sleep-service").await?;
// // Wait a bit and check status
// sleep(Duration::from_secs(2)).await;
// let status = client.status("sleep-service").await?;
// println!("sleep-service status: {:?}", status);
// // Stop the find service
// println!("\nStopping find-service...");
// client.stop("find-service").await?;
// // Wait a bit and check status
// sleep(Duration::from_secs(2)).await;
// let status = client.status("find-service").await?;
// println!("find-service status after stopping: {:?}", status);
// // Kill the sleep service with SIGTERM
// println!("\nKilling sleep-service with SIGTERM...");
// client.kill("sleep-service", "SIGTERM").await?;
// // Wait a bit and check status
// sleep(Duration::from_secs(2)).await;
// let status = client.status("sleep-service").await?;
// println!("sleep-service status after killing: {:?}", status);
// // Cleanup - forget services
// println!("\nForgetting services...");
// if status.pid == 0 {
// // Only forget if it's not running
// client.forget("sleep-service").await?;
// }
// client.forget("find-service").await?;
// // Demonstrate service file operations
// println!("\nDemonstrating service file operations...");
// // Create a new service using the API
// println!("Creating a new service via API...");
// let service_content = json!({
// "exec": "echo 'Hello from API-created service'",
// "oneshot": true,
// "log": "stdout"
// })
// .as_object()
// .unwrap()
// .clone();
// let result = client
// .create_service("api-service", service_content)
// .await?;
// println!("Create service result: {}", result);
// // Get the service configuration
// println!("\nGetting service configuration...");
// let config = client.get_service("api-service").await?;
// println!(
// "Service configuration: {}",
// serde_json::to_string_pretty(&config)?
// );
// // Monitor and start the new service
// println!("\nMonitoring and starting the new service...");
// client.monitor("api-service").await?;
// client.start("api-service").await?;
// // Wait a bit and check status
// sleep(Duration::from_secs(2)).await;
// let status = client.status("api-service").await?;
// println!("api-service status: {:?}", status);
// // Delete the service
// println!("\nDeleting the service...");
// if status.pid == 0 {
// // Only forget if it's not running
// client.forget("api-service").await?;
// let result = client.delete_service("api-service").await?;
// println!("Delete service result: {}", result);
// }
// // Shutdown zinit
// println!("\nShutting down zinit...");
// client.shutdown().await?;
// println!("\nTest completed successfully!");
// Ok(())
// }

View File

@@ -0,0 +1,11 @@
extern crate serde;
#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate log;
extern crate tokio;
pub mod app;
pub mod manager;
pub mod testapp;
pub mod zinit;

View File

@@ -0,0 +1,281 @@
extern crate zinit;
use anyhow::Result;
use clap::{App, Arg, SubCommand};
use git_version::git_version;
use tokio_stream::StreamExt;
use zinit::app;
const GIT_VERSION: &str = git_version!(args = ["--tags", "--always", "--dirty=-modified"]);
#[tokio::main]
async fn main() -> Result<()> {
let matches = App::new("zinit")
.author("ThreeFold Tech, https://github.com/threefoldtech")
.version(GIT_VERSION)
.about("A runit replacement")
.arg(Arg::with_name("socket").value_name("SOCKET").short("s").long("socket").default_value("/tmp/zinit.sock").help("path to unix socket"))
.arg(Arg::with_name("debug").short("d").long("debug").help("run in debug mode"))
.subcommand(
SubCommand::with_name("init")
.arg(
Arg::with_name("config")
.value_name("DIR")
.short("c")
.long("config")
.help("service configurations directory"),
)
.arg(
Arg::with_name("buffer")
.value_name("BUFFER")
.short("b")
.long("buffer")
.help("buffer size (in lines) to keep services logs")
.default_value("2000")
)
.arg(Arg::with_name("container").long("container").help("run in container mode, shutdown on signal"))
.about("run in init mode, start and maintain configured services"),
)
.subcommand(
SubCommand::with_name("list")
.about("quick view of current known services and their status"),
)
.subcommand(
SubCommand::with_name("shutdown")
.about("stop all services and power off"),
)
.subcommand(
SubCommand::with_name("reboot")
.about("stop all services and reboot"),
)
.subcommand(
SubCommand::with_name("status")
.arg(
Arg::with_name("service")
.value_name("SERVICE")
.required(true)
.help("service name"),
)
.about("show detailed service status"),
)
.subcommand(
SubCommand::with_name("stop")
.arg(
Arg::with_name("service")
.value_name("SERVICE")
.required(true)
.help("service name"),
)
.about("stop service"),
)
.subcommand(
SubCommand::with_name("start")
.arg(
Arg::with_name("service")
.value_name("SERVICE")
.required(true)
.help("service name"),
)
.about("start service. has no effect if the service is already running"),
)
.subcommand(
SubCommand::with_name("forget")
.arg(
Arg::with_name("service")
.value_name("SERVICE")
.required(true)
.help("service name"),
)
.about("forget a service. you can only forget a stopped service"),
)
.subcommand(
SubCommand::with_name("monitor")
.arg(
Arg::with_name("service")
.value_name("SERVICE")
.required(true)
.help("service name"),
)
.about("start monitoring a service. configuration is loaded from server config directory"),
)
.subcommand(
SubCommand::with_name("log")
.arg(
Arg::with_name("snapshot")
.short("s")
.long("snapshot")
.required(false)
.help("if set log prints current buffer without following")
)
.arg(
Arg::with_name("filter")
.value_name("FILTER")
.required(false)
.help("an optional 'exact' service name")
)
.about("view services logs from zinit ring buffer"),
)
.subcommand(
SubCommand::with_name("kill")
.arg(
Arg::with_name("service")
.value_name("SERVICE")
.required(true)
.help("service name"),
)
.arg(
Arg::with_name("signal")
.value_name("SIGNAL")
.required(true)
.default_value("SIGTERM")
.help("signal name (example: SIGTERM)"),
)
.about("send a signal to a running service."),
)
.subcommand(
SubCommand::with_name("restart")
.arg(
Arg::with_name("service")
.value_name("SERVICE")
.required(true)
.help("service name"),
)
.about("restart a service."),
)
.subcommand(
SubCommand::with_name("stats")
.arg(
Arg::with_name("service")
.value_name("SERVICE")
.required(true)
.help("service name"),
)
.about("show memory and CPU usage statistics for a service"),
)
.subcommand(
SubCommand::with_name("proxy")
.arg(
Arg::with_name("address")
.value_name("ADDRESS")
.short("a")
.long("address")
.default_value("127.0.0.1:8080")
.help("address to bind the HTTP/RPC server to"),
)
.about("start an HTTP/RPC proxy for Zinit API"),
)
.get_matches();
use dirs;
let socket = matches.value_of("socket").unwrap();
let debug = matches.is_present("debug");
// Determine the default config directory; the `-c/--config` flag on the
// `init` subcommand (read in the match below) takes precedence.
let config_path = {
#[cfg(target_os = "macos")]
{
let home_dir = dirs::home_dir()
.ok_or_else(|| anyhow::anyhow!("Could not determine home directory"))?;
home_dir
.join("hero")
.join("cfg")
.join("zinit")
.to_str()
.ok_or_else(|| anyhow::anyhow!("Invalid path for config directory"))?
.to_string()
}
#[cfg(not(target_os = "macos"))]
{
"/etc/zinit/".to_string()
}
};
let result = match matches.subcommand() {
("init", Some(matches)) => {
let _server = app::init(
matches.value_of("buffer").unwrap().parse().unwrap(),
&config_path, // Use the determined config_path
socket,
matches.is_present("container"),
debug,
)
.await?;
tokio::signal::ctrl_c().await?;
Ok(())
}
("list", _) => app::list(socket).await,
("shutdown", _) => app::shutdown(socket).await,
("reboot", _) => app::reboot(socket).await,
// ("log", Some(matches)) => app::log(matches.value_of("filter")),
("status", Some(matches)) => {
app::status(socket, matches.value_of("service").unwrap().to_string()).await
}
("stop", Some(matches)) => {
app::stop(socket, matches.value_of("service").unwrap().to_string()).await
}
("start", Some(matches)) => {
app::start(socket, matches.value_of("service").unwrap().to_string()).await
}
("forget", Some(matches)) => {
app::forget(socket, matches.value_of("service").unwrap().to_string()).await
}
("monitor", Some(matches)) => {
app::monitor(socket, matches.value_of("service").unwrap().to_string()).await
}
("kill", Some(matches)) => {
app::kill(
socket,
matches.value_of("service").unwrap().to_string(),
matches.value_of("signal").unwrap().to_string(),
)
.await
}
("log", Some(matches)) => {
let mut stream = app::logs(
socket,
matches.value_of("filter").map(|s| s.to_string()),
!matches.is_present("snapshot"),
)
.await?;
loop {
tokio::select! {
item = stream.next() => {
match item {
Some(log_entry) => {
println!("{}", log_entry);
},
None => break
}
}
_ = tokio::signal::ctrl_c() => {
break
}
}
}
Ok(())
}
("restart", Some(matches)) => {
app::restart(socket, matches.value_of("service").unwrap().to_string()).await
}
("stats", Some(matches)) => {
app::stats(socket, matches.value_of("service").unwrap().to_string()).await
}
("proxy", Some(matches)) => {
app::proxy(socket, matches.value_of("address").unwrap().to_string()).await
}
_ => app::list(socket).await, // default command
};
match result {
Ok(_) => Ok(()),
Err(e) => {
eprintln!("{:#}", e);
std::process::exit(1);
}
}
}
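
Taken together, the CLI defined above supports invocations like the following (service names and paths are illustrative):

zinit -s /tmp/zinit.sock init -c /etc/zinit -b 2000
zinit list
zinit log -s redis
zinit kill redis SIGTERM
zinit proxy -a 127.0.0.1:8080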

View File

@@ -0,0 +1,149 @@
use anyhow::Result;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{mpsc, Mutex};
struct Buffer<T> {
inner: Vec<T>,
at: usize,
}
impl<T: Clone> Buffer<T> {
pub fn new(cap: usize) -> Buffer<T> {
Buffer {
inner: Vec::with_capacity(cap),
at: 0,
}
}
fn len(&self) -> usize {
self.inner.len()
}
pub fn cap(&self) -> usize {
self.inner.capacity()
}
pub fn push(&mut self, o: T) {
if self.len() < self.cap() {
self.inner.push(o);
} else {
self.inner[self.at] = o;
}
self.at = (self.at + 1) % self.cap();
}
}
impl<'a, T: 'a> IntoIterator for &'a Buffer<T> {
type IntoIter = BufferIter<'a, T>;
type Item = &'a T;
fn into_iter(self) -> Self::IntoIter {
let (second, first) = self.inner.split_at(self.at);
BufferIter {
first,
second,
index: 0,
}
}
}
pub struct BufferIter<'a, T> {
first: &'a [T],
second: &'a [T],
index: usize,
}
impl<'a, T> Iterator for BufferIter<'a, T> {
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
let index = self.index;
self.index += 1;
if index < self.first.len() {
Some(&self.first[index])
} else if index - self.first.len() < self.second.len() {
Some(&self.second[index - self.first.len()])
} else {
None
}
}
}
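// A sketch of the wrap-around behavior (illustrative test, not part of the
// original file): pushing four items into a three-slot buffer overwrites the
// oldest entry, and iteration yields the survivors oldest-first.
#[cfg(test)]
mod tests {
use super::Buffer;
#[test]
fn wraps_and_iterates_oldest_first() {
let mut buf = Buffer::new(3);
for i in 1..=4 {
buf.push(i);
}
// 1 was overwritten in place; `at` now marks the oldest element
let items: Vec<i32> = (&buf).into_iter().cloned().collect();
assert_eq!(items, vec![2, 3, 4]);
}
}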
pub type Logs = mpsc::Receiver<Arc<String>>;
#[derive(Clone)]
pub struct Ring {
buffer: Arc<Mutex<Buffer<Arc<String>>>>,
sender: broadcast::Sender<Arc<String>>,
}
impl Ring {
pub fn new(cap: usize) -> Ring {
let (tx, _) = broadcast::channel(100);
Ring {
buffer: Arc::new(Mutex::new(Buffer::new(cap))),
sender: tx,
}
}
pub async fn push(&self, line: String) -> Result<()> {
let line = Arc::new(line.clone());
self.buffer.lock().await.push(Arc::clone(&line));
self.sender.send(line)?;
Ok(())
}
/// stream returns a continuous stream that first receives
/// a snapshot of the current buffer.
/// then, if follow is true, the stream remains open and is
/// fed each new line until the receiver closes the channel
/// from its end.
pub async fn stream(&self, existing_logs: bool, follow: bool) -> Logs {
let (tx, stream) = mpsc::channel::<Arc<String>>(100);
let mut rx = self.sender.subscribe();
let buffer = if existing_logs {
// Get the current existing logs
self.buffer
.lock()
.await
.into_iter()
.cloned()
.collect::<Vec<_>>()
} else {
// Don't care about existing logs
vec![]
};
tokio::spawn(async move {
for item in buffer {
let _ = tx.send(Arc::clone(&item)).await;
}
if !follow {
return;
}
loop {
let line = match rx.recv().await {
Ok(line) => line,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(n)) => {
Arc::new(format!("[-] zinit: {} lines dropped", n))
}
};
if tx.send(line).await.is_err() {
// client disconnected.
break;
}
}
});
stream
}
}

View File

@@ -0,0 +1,253 @@
use std::collections::HashMap;
use anyhow::{Context, Result};
use command_group::CommandGroup;
use nix::sys::signal;
use nix::sys::wait::{self, WaitStatus};
use nix::unistd::Pid;
use std::fs::File as StdFile;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::IntoRawFd;
use std::process::Command;
use std::process::Stdio;
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::signal::unix;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
mod buffer;
pub use buffer::Logs;
pub struct Process {
cmd: String,
env: HashMap<String, String>,
cwd: String,
}
type WaitChannel = oneshot::Receiver<WaitStatus>;
pub struct Child {
pub pid: Pid,
ch: WaitChannel,
}
impl Child {
pub fn new(pid: Pid, ch: WaitChannel) -> Child {
Child { pid, ch }
}
pub async fn wait(self) -> Result<WaitStatus> {
Ok(self.ch.await?)
}
}
type Handler = oneshot::Sender<WaitStatus>;
impl Process {
pub fn new<S: Into<String>>(cmd: S, cwd: S, env: Option<HashMap<String, String>>) -> Process {
let env = env.unwrap_or_default();
Process {
env,
cmd: cmd.into(),
cwd: cwd.into(),
}
}
}
#[derive(Clone)]
pub enum Log {
None,
Stdout,
Ring(String),
}
#[derive(Clone)]
pub struct ProcessManager {
table: Arc<Mutex<HashMap<Pid, Handler>>>,
ring: buffer::Ring,
env: Environ,
}
impl ProcessManager {
pub fn new(cap: usize) -> ProcessManager {
ProcessManager {
table: Arc::new(Mutex::new(HashMap::new())),
ring: buffer::Ring::new(cap),
env: Environ::new(),
}
}
fn wait_process() -> Vec<WaitStatus> {
let mut statuses: Vec<WaitStatus> = Vec::new();
loop {
let status = match wait::waitpid(Option::None, Some(wait::WaitPidFlag::WNOHANG)) {
Ok(status) => status,
Err(_) => {
return statuses;
}
};
match status {
WaitStatus::StillAlive => break,
_ => statuses.push(status),
}
}
statuses
}
pub fn start(&self) {
let table = Arc::clone(&self.table);
let mut signals = match unix::signal(unix::SignalKind::child()) {
Ok(s) => s,
Err(err) => {
panic!("failed to bind to signals: {}", err);
}
};
tokio::spawn(async move {
loop {
signals.recv().await;
let mut table = table.lock().await;
for exited in Self::wait_process() {
if let Some(pid) = exited.pid() {
if let Some(sender) = table.remove(&pid) {
if sender.send(exited).is_err() {
debug!("failed to send exit state to process: {}", pid);
}
}
}
}
}
});
}
fn sink(&self, file: File, prefix: String) {
let ring = self.ring.clone();
let reader = BufReader::new(file);
tokio::spawn(async move {
let mut lines = reader.lines();
while let Ok(line) = lines.next_line().await {
let _ = match line {
Some(line) => ring.push(format!("{}: {}", prefix, line)).await,
None => break,
};
}
});
}
pub async fn stream(&self, existing_logs: bool, follow: bool) -> Logs {
self.ring.stream(existing_logs, follow).await
}
pub fn signal(&self, pid: Pid, sig: signal::Signal) -> Result<()> {
Ok(signal::killpg(pid, sig)?)
}
pub async fn run(&self, cmd: Process, log: Log) -> Result<Child> {
let args = shlex::split(&cmd.cmd).context("failed to parse command")?;
if args.is_empty() {
bail!("invalid command");
}
let mut child = Command::new(&args[0]);
let child = if !cmd.cwd.is_empty() {
child.current_dir(&cmd.cwd)
} else {
child.current_dir("/")
};
let child = child.args(&args[1..]).envs(&self.env.0).envs(cmd.env);
let child = match log {
Log::None => child.stdout(Stdio::null()).stderr(Stdio::null()),
Log::Ring(_) => child.stdout(Stdio::piped()).stderr(Stdio::piped()),
_ => child, // default to inherit
};
let mut table = self.table.lock().await;
let mut child = child
.group_spawn()
.context("failed to spawn command")?
.into_inner();
if let Log::Ring(prefix) = log {
let _ = self
.ring
.push(format!("[-] {}: ------------ [start] ------------", prefix))
.await;
if let Some(out) = child.stdout.take() {
let out = File::from_std(unsafe { StdFile::from_raw_fd(out.into_raw_fd()) });
self.sink(out, format!("[+] {}", prefix))
}
if let Some(out) = child.stderr.take() {
let out = File::from_std(unsafe { StdFile::from_raw_fd(out.into_raw_fd()) });
self.sink(out, format!("[-] {}", prefix))
}
}
let (tx, rx) = oneshot::channel();
let id = child.id();
let pid = Pid::from_raw(id as i32);
table.insert(pid, tx);
Ok(Child::new(pid, rx))
}
}
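// A minimal usage sketch (illustrative, assumes a tokio runtime):
//
//   let pm = ProcessManager::new(2000);
//   pm.start(); // begin reaping children on SIGCHLD
//   let child = pm
//       .run(Process::new("echo hello", "/", None), Log::Ring("echo".into()))
//       .await?;
//   let status = child.wait().await?;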
#[derive(Clone)]
struct Environ(HashMap<String, String>);
impl Environ {
fn new() -> Environ {
let env = match Environ::parse("/etc/environment") {
Ok(r) => r,
Err(err) => {
error!("failed to load /etc/environment file: {}", err);
HashMap::new()
}
};
Environ(env)
}
fn parse<P>(p: P) -> Result<HashMap<String, String>, std::io::Error>
where
P: AsRef<std::path::Path>,
{
let mut m = HashMap::new();
let txt = match std::fs::read_to_string(p) {
Ok(txt) => txt,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
info!("skipping /etc/environment file because it does not exist");
"".into()
}
Err(err) => return Err(err),
};
for line in txt.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
let parts: Vec<&str> = line.splitn(2, '=').collect();
let key = String::from(parts[0]);
let value = match parts.len() {
2 => String::from(parts[1]),
_ => String::default(),
};
m.insert(key, value);
}
Ok(m)
}
}
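// The accepted /etc/environment format, inferred from the parser above:
// `KEY=VALUE` pairs, `#` comment lines skipped, and a line without `=`
// mapped to an empty value, e.g.:
//
//   PATH=/usr/local/bin:/usr/bin
//   # comment lines are ignored
//   STANDALONE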

View File

@@ -0,0 +1,264 @@
use anyhow::{Context, Result};
use std::path::Path;
use tokio::time::{sleep, Duration};
use std::env;
use tokio::process::Command;
use tokio::fs;
use std::process::Stdio;
use serde::{Deserialize, Serialize};
use serde_json;
use tokio::net::UnixStream;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream};
use std::collections::HashMap;
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
struct Response {
pub state: State,
pub body: serde_json::Value,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
enum State {
Ok,
Error,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
struct Status {
pub name: String,
pub pid: u32,
pub state: String,
pub target: String,
pub after: HashMap<String, String>,
}
struct Client {
socket: String,
}
impl Client {
pub fn new(socket: &str) -> Client {
Client {
socket: socket.to_string(),
}
}
async fn connect(&self) -> Result<UnixStream> {
UnixStream::connect(&self.socket).await.with_context(|| {
format!(
"failed to connect to '{}'. is zinit listening on that socket?",
self.socket
)
})
}
async fn command(&self, c: &str) -> Result<serde_json::Value> {
let mut con = BufStream::new(self.connect().await?);
con.write_all(c.as_bytes()).await?;
con.write_all(b"\n").await?;
con.flush().await?;
let mut data = String::new();
con.read_to_string(&mut data).await?;
let response: Response = serde_json::from_str(&data)?;
match response.state {
State::Ok => Ok(response.body),
State::Error => {
let err: String = serde_json::from_value(response.body)?;
anyhow::bail!(err)
}
}
}
pub async fn list(&self) -> Result<HashMap<String, String>> {
let response = self.command("list").await?;
Ok(serde_json::from_value(response)?)
}
pub async fn status<S: AsRef<str>>(&self, name: S) -> Result<Status> {
let response = self.command(&format!("status {}", name.as_ref())).await?;
Ok(serde_json::from_value(response)?)
}
pub async fn start<S: AsRef<str>>(&self, name: S) -> Result<()> {
self.command(&format!("start {}", name.as_ref())).await?;
Ok(())
}
pub async fn stop<S: AsRef<str>>(&self, name: S) -> Result<()> {
self.command(&format!("stop {}", name.as_ref())).await?;
Ok(())
}
pub async fn forget<S: AsRef<str>>(&self, name: S) -> Result<()> {
self.command(&format!("forget {}", name.as_ref())).await?;
Ok(())
}
pub async fn monitor<S: AsRef<str>>(&self, name: S) -> Result<()> {
self.command(&format!("monitor {}", name.as_ref())).await?;
Ok(())
}
pub async fn kill<S: AsRef<str>>(&self, name: S, sig: S) -> Result<()> {
self.command(&format!("kill {} {}", name.as_ref(), sig.as_ref()))
.await?;
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
self.command("shutdown").await?;
Ok(())
}
}
async fn start_zinit(socket_path: &str, config_dir: &str) -> Result<()> {
// Create a temporary config directory if it doesn't exist
let config_path = Path::new(config_dir);
if !config_path.exists() {
fs::create_dir_all(config_path).await?;
}
// Start zinit in the background
let mut cmd = Command::new("zinit");
cmd.arg("--socket")
.arg(socket_path)
.arg("init")
.arg("--config")
.arg(config_dir)
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let child = cmd.spawn()?;
// Give zinit some time to start up
sleep(Duration::from_secs(1)).await;
println!("Zinit started with PID: {:?}", child.id());
Ok(())
}
async fn create_service_config(config_dir: &str, name: &str, command: &str) -> Result<()> {
let config_path = format!("{}/{}.yaml", config_dir, name);
let config_content = format!(
r#"exec: {}
oneshot: false
shutdown_timeout: 10
after: []
signal:
stop: sigterm
log: ring
env: {{}}
dir: /
"#,
command
);
fs::write(config_path, config_content).await?;
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
// Define paths for socket and config
let temp_dir = env::temp_dir();
let socket_path = temp_dir.join("zinit-test.sock").to_str().unwrap().to_string();
let config_dir = temp_dir.join("zinit-test-config").to_str().unwrap().to_string();
println!("Starting zinit with socket at: {}", socket_path);
println!("Using config directory: {}", config_dir);
// Start zinit in the background
start_zinit(&socket_path, &config_dir).await?;
// Wait for zinit to initialize
sleep(Duration::from_secs(2)).await;
// Create a client to communicate with zinit
let client = Client::new(&socket_path);
// Create service configurations
println!("Creating service configurations...");
// Create a find service
create_service_config(&config_dir, "find-service", "find / -name \"*.txt\" -type f").await?;
// Create a sleep service with echo
create_service_config(
&config_dir,
"sleep-service",
"sh -c 'echo Starting sleep; sleep 30; echo Finished sleep'"
).await?;
// Wait for zinit to load the configurations
sleep(Duration::from_secs(1)).await;
// Tell zinit to monitor our services
println!("Monitoring services...");
client.monitor("find-service").await?;
client.monitor("sleep-service").await?;
// List all services
println!("\nListing all services:");
let services = client.list().await?;
for (name, status) in services {
println!("Service: {} - Status: {}", name, status);
}
// Start the find service
println!("\nStarting find-service...");
client.start("find-service").await?;
// Wait a bit and check status
sleep(Duration::from_secs(2)).await;
let status = client.status("find-service").await?;
println!("find-service status: {:?}", status);
// Start the sleep service
println!("\nStarting sleep-service...");
client.start("sleep-service").await?;
// Wait a bit and check status
sleep(Duration::from_secs(2)).await;
let status = client.status("sleep-service").await?;
println!("sleep-service status: {:?}", status);
// Stop the find service
println!("\nStopping find-service...");
client.stop("find-service").await?;
// Wait a bit and check status
sleep(Duration::from_secs(2)).await;
let status = client.status("find-service").await?;
println!("find-service status after stopping: {:?}", status);
// Kill the sleep service with SIGTERM
println!("\nKilling sleep-service with SIGTERM...");
client.kill("sleep-service", "SIGTERM").await?;
// Wait a bit and check status
sleep(Duration::from_secs(2)).await;
let status = client.status("sleep-service").await?;
println!("sleep-service status after killing: {:?}", status);
// Cleanup - forget services
println!("\nForgetting services...");
if status.pid == 0 { // Only forget if it's not running
client.forget("sleep-service").await?;
}
client.forget("find-service").await?;
// Shutdown zinit
println!("\nShutting down zinit...");
client.shutdown().await?;
println!("\nTest completed successfully!");
Ok(())
}

View File

@@ -0,0 +1,57 @@
use anyhow::Result;
use std::env;
use std::path::Path;
use std::process::Stdio;
use tokio::process::Command;
use tokio::time::{sleep, Duration};
pub async fn start_zinit(socket_path: &str, config_dir: &str) -> Result<()> {
// Create a temporary config directory if it doesn't exist
let config_path = Path::new(config_dir);
if !config_path.exists() {
tokio::fs::create_dir_all(config_path).await?;
}
// Get the path to the zinit binary (use the one we just built)
let zinit_path = env::current_dir()?.join("target/debug/zinit");
println!("Using zinit binary at: {}", zinit_path.display());
// Start zinit in the background
let mut cmd = Command::new(zinit_path);
cmd.arg("--socket")
.arg(socket_path)
.arg("init")
.arg("--config")
.arg(config_dir)
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let child = cmd.spawn()?;
// Give zinit some time to start up
sleep(Duration::from_secs(1)).await;
println!("Zinit started with PID: {:?}", child.id());
Ok(())
}
pub async fn create_service_config(config_dir: &str, name: &str, command: &str) -> Result<()> {
let config_path = format!("{}/{}.yaml", config_dir, name);
let config_content = format!(
r#"exec: {}
oneshot: false
shutdown_timeout: 10
after: []
signal:
stop: sigterm
log: ring
env: {{}}
dir: /
"#,
command
);
tokio::fs::write(config_path, config_content).await?;
Ok(())
}

View File

@@ -0,0 +1,119 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
use serde_yaml as yaml;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fs::{self, File};
use std::path::Path;
pub type Services = HashMap<String, Service>;
pub const DEFAULT_SHUTDOWN_TIMEOUT: u64 = 10; // in seconds
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Signal {
pub stop: String,
}
impl Default for Signal {
fn default() -> Self {
Signal {
stop: String::from("sigterm"),
}
}
}
#[derive(Default, Clone, Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Log {
None,
#[default]
Ring,
Stdout,
}
fn default_shutdown_timeout_fn() -> u64 {
DEFAULT_SHUTDOWN_TIMEOUT
}
#[derive(Clone, Debug, Default, Deserialize)]
#[serde(default)]
pub struct Service {
/// command to run
pub exec: String,
/// test command (optional)
#[serde(default)]
pub test: String,
#[serde(rename = "oneshot")]
pub one_shot: bool,
#[serde(default = "default_shutdown_timeout_fn")]
pub shutdown_timeout: u64,
pub after: Vec<String>,
pub signal: Signal,
pub log: Log,
pub env: HashMap<String, String>,
pub dir: String,
}
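// An illustrative service file matching the struct above (values are
// examples, not defaults):
//
//   exec: redis-server --port 6379
//   test: redis-cli ping
//   oneshot: false
//   shutdown_timeout: 10
//   after:
//     - networkd
//   signal:
//     stop: sigterm
//   log: ring
//   env:
//     HOME: /root
//   dir: /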
impl Service {
pub fn validate(&self) -> Result<()> {
use nix::sys::signal::Signal;
use std::str::FromStr;
if self.exec.is_empty() {
bail!("missing exec directive");
}
Signal::from_str(&self.signal.stop.to_uppercase())?;
Ok(())
}
}
/// load loads a single file
pub fn load<T: AsRef<Path>>(t: T) -> Result<(String, Service)> {
let p = t.as_ref();
let name = match p.file_stem().and_then(OsStr::to_str) {
Some(name) => name,
None => bail!("invalid file name: {:?}", p),
};
let file = File::open(p)?;
let service: Service = yaml::from_reader(&file)?;
service.validate()?;
Ok((String::from(name), service))
}
/// walks over a directory and loads all configuration files.
/// a file that fails to load is logged and skipped, so a single bad
/// config does not abort the directory walk.
pub fn load_dir<T: AsRef<Path>>(p: T) -> Result<Services> {
let mut services: Services = HashMap::new();
for entry in fs::read_dir(p)? {
let entry = entry?;
if !entry.file_type()?.is_file() {
continue;
}
let fp = entry.path();
if !matches!(fp.extension(), Some(ext) if ext == OsStr::new("yaml")) {
continue;
}
let (name, service) = match load(&fp) {
Ok(content) => content,
Err(err) => {
error!("failed to load config file {:?}: {}", fp, err);
continue;
}
};
services.insert(name, service);
}
Ok(services)
}

View File

@@ -0,0 +1,80 @@
use thiserror::Error;
/// Errors that can occur in the zinit module
#[derive(Error, Debug)]
pub enum ZInitError {
/// Service name is unknown
#[error("service name {name:?} unknown")]
UnknownService { name: String },
/// Service is already being monitored
#[error("service {name:?} already monitored")]
ServiceAlreadyMonitored { name: String },
/// Service is up and running
#[error("service {name:?} is up")]
ServiceIsUp { name: String },
/// Service is down and not running
#[error("service {name:?} is down")]
ServiceIsDown { name: String },
/// Zinit is shutting down
#[error("zinit is shutting down")]
ShuttingDown,
/// Invalid state transition
#[error("service state transition error: {message}")]
InvalidStateTransition { message: String },
/// Dependency error
#[error("dependency error: {message}")]
DependencyError { message: String },
/// Process error
#[error("process error: {message}")]
ProcessError { message: String },
}
impl ZInitError {
/// Create a new UnknownService error
pub fn unknown_service<S: Into<String>>(name: S) -> Self {
ZInitError::UnknownService { name: name.into() }
}
/// Create a new ServiceAlreadyMonitored error
pub fn service_already_monitored<S: Into<String>>(name: S) -> Self {
ZInitError::ServiceAlreadyMonitored { name: name.into() }
}
/// Create a new ServiceIsUp error
pub fn service_is_up<S: Into<String>>(name: S) -> Self {
ZInitError::ServiceIsUp { name: name.into() }
}
/// Create a new ServiceIsDown error
pub fn service_is_down<S: Into<String>>(name: S) -> Self {
ZInitError::ServiceIsDown { name: name.into() }
}
/// Create a new InvalidStateTransition error
pub fn invalid_state_transition<S: Into<String>>(message: S) -> Self {
ZInitError::InvalidStateTransition {
message: message.into(),
}
}
/// Create a new DependencyError error
pub fn dependency_error<S: Into<String>>(message: S) -> Self {
ZInitError::DependencyError {
message: message.into(),
}
}
/// Create a new ProcessError error
pub fn process_error<S: Into<String>>(message: S) -> Self {
ZInitError::ProcessError {
message: message.into(),
}
}
}
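// Sketch of how these constructors pair with anyhow elsewhere in the
// crate (hypothetical helper shown for illustration):
//
//   fn must_be_known(known: bool, name: &str) -> anyhow::Result<()> {
//       if !known {
//           return Err(ZInitError::unknown_service(name).into());
//       }
//       Ok(())
//   }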

View File

@@ -0,0 +1,970 @@
use crate::manager::{Log, Logs, Process, ProcessManager};
use crate::zinit::config;
use crate::zinit::errors::ZInitError;
#[cfg(target_os = "linux")]
use crate::zinit::ord::{service_dependency_order, ProcessDAG, DUMMY_ROOT};
use crate::zinit::service::ZInitService;
use crate::zinit::state::{State, Target};
#[cfg(target_os = "linux")]
use crate::zinit::types::Watcher;
use crate::zinit::types::{ProcessStats, ServiceStats, ServiceTable};
use std::collections::HashMap;
use sysinfo::{self, PidExt, ProcessExt, System, SystemExt};
// Define a local extension trait for WaitStatus
trait WaitStatusExt {
fn success(&self) -> bool;
}
impl WaitStatusExt for WaitStatus {
fn success(&self) -> bool {
matches!(self, WaitStatus::Exited(_, 0))
}
}
use anyhow::Result;
#[cfg(target_os = "linux")]
use nix::sys::reboot::RebootMode;
use nix::sys::signal;
use nix::sys::wait::WaitStatus;
use nix::unistd::Pid;
use std::str::FromStr;
use std::sync::Arc;
#[cfg(target_os = "linux")]
use tokio::sync::mpsc;
use tokio::sync::{Notify, RwLock};
use tokio::time::sleep;
#[cfg(target_os = "linux")]
use tokio::time::timeout;
#[cfg(target_os = "linux")]
use tokio_stream::StreamExt;
/// Manages the lifecycle of services
#[derive(Clone)]
pub struct LifecycleManager {
/// Process manager for spawning and managing processes
pm: ProcessManager,
/// Table of services
services: Arc<RwLock<ServiceTable>>,
/// Notification for service state changes
notify: Arc<Notify>,
/// Whether the system is shutting down
shutdown: Arc<RwLock<bool>>,
/// Whether running in container mode
container: bool,
}
impl LifecycleManager {
/// Create a new lifecycle manager
pub fn new(
pm: ProcessManager,
services: Arc<RwLock<ServiceTable>>,
notify: Arc<Notify>,
shutdown: Arc<RwLock<bool>>,
container: bool,
) -> Self {
Self {
pm,
services,
notify,
shutdown,
container,
}
}
/// Get a reference to the process manager
pub fn process_manager(&self) -> &ProcessManager {
&self.pm
}
/// Check if running in container mode
pub fn is_container_mode(&self) -> bool {
self.container
}
/// Get logs from the process manager
pub async fn logs(&self, existing_logs: bool, follow: bool) -> Logs {
self.pm.stream(existing_logs, follow).await
}
/// Monitor a service
pub async fn monitor<S: Into<String>>(&self, name: S, service: config::Service) -> Result<()> {
if *self.shutdown.read().await {
return Err(ZInitError::ShuttingDown.into());
}
let name = name.into();
let mut services = self.services.write().await;
if services.contains_key(&name) {
return Err(ZInitError::service_already_monitored(name).into());
}
let service = Arc::new(RwLock::new(ZInitService::new(service, State::Unknown)));
services.insert(name.clone(), Arc::clone(&service));
let lifecycle = self.clone_lifecycle();
debug!("service '{}' monitored", name);
tokio::spawn(lifecycle.watch_service(name, service));
Ok(())
}
/// Get the status of a service
pub async fn status<S: AsRef<str>>(
&self,
name: S,
) -> Result<crate::zinit::service::ZInitStatus> {
let table = self.services.read().await;
let service = table
.get(name.as_ref())
.ok_or_else(|| ZInitError::unknown_service(name.as_ref()))?;
let service = service.read().await.status();
Ok(service)
}
/// Start a service
pub async fn start<S: AsRef<str>>(&self, name: S) -> Result<()> {
if *self.shutdown.read().await {
return Err(ZInitError::ShuttingDown.into());
}
self.set_service_state(name.as_ref(), None, Some(Target::Up))
.await;
let table = self.services.read().await;
let service = table
.get(name.as_ref())
.ok_or_else(|| ZInitError::unknown_service(name.as_ref()))?;
let lifecycle = self.clone_lifecycle();
tokio::spawn(lifecycle.watch_service(name.as_ref().into(), Arc::clone(service)));
Ok(())
}
/// Stop a service
pub async fn stop<S: AsRef<str>>(&self, name: S) -> Result<()> {
// Get service information
let table = self.services.read().await;
let service = table
.get(name.as_ref())
.ok_or_else(|| ZInitError::unknown_service(name.as_ref()))?;
let mut service = service.write().await;
service.set_target(Target::Down);
// Get the main process PID
let pid = service.pid;
if pid.as_raw() == 0 {
return Ok(());
}
// Get the signal to use
let signal = signal::Signal::from_str(&service.service.signal.stop.to_uppercase())
.map_err(|err| anyhow::anyhow!("unknown stop signal: {}", err))?;
// Release the lock before potentially long-running operations
drop(service);
drop(table);
// Get all child processes using our stats functionality
let children = self.get_child_process_stats(pid.as_raw()).await?;
// First try to stop the process group
let _ = self.pm.signal(pid, signal);
// Wait a short time for processes to terminate gracefully
sleep(std::time::Duration::from_millis(500)).await;
// Check if processes are still running and use SIGKILL if needed
self.ensure_processes_terminated(pid.as_raw(), &children)
.await?;
Ok(())
}
/// Ensure that a process and its children are terminated
async fn ensure_processes_terminated(
&self,
parent_pid: i32,
children: &[ProcessStats],
) -> Result<()> {
// Check if parent is still running
let parent_running = self.is_process_running(parent_pid).await?;
// If parent is still running, send SIGKILL
if parent_running {
debug!(
"Process {} still running after SIGTERM, sending SIGKILL",
parent_pid
);
let _ = self
.pm
.signal(Pid::from_raw(parent_pid), signal::Signal::SIGKILL);
}
// Check and kill any remaining child processes
for child in children {
if self.is_process_running(child.pid).await? {
debug!("Child process {} still running, sending SIGKILL", child.pid);
let _ = signal::kill(Pid::from_raw(child.pid), signal::Signal::SIGKILL);
}
}
// Verify all processes are gone
let mut retries = 5;
while retries > 0 {
let mut all_terminated = true;
// Check parent
if self.is_process_running(parent_pid).await? {
all_terminated = false;
}
// Check children
for child in children {
if self.is_process_running(child.pid).await? {
all_terminated = false;
break;
}
}
if all_terminated {
return Ok(());
}
// Wait before retrying
sleep(std::time::Duration::from_millis(100)).await;
retries -= 1;
}
// If we get here, some processes might still be running
warn!("Some processes may still be running after shutdown attempts");
Ok(())
}
/// Check if a process is running
async fn is_process_running(&self, pid: i32) -> Result<bool> {
// Use sysinfo to check if process exists
let mut system = System::new();
let sys_pid = sysinfo::Pid::from(pid as usize);
system.refresh_process(sys_pid);
Ok(system.process(sys_pid).is_some())
}
/// Forget a service
pub async fn forget<S: AsRef<str>>(&self, name: S) -> Result<()> {
let mut table = self.services.write().await;
let service = table
.get(name.as_ref())
.ok_or_else(|| ZInitError::unknown_service(name.as_ref()))?;
let service = service.read().await;
if service.target == Target::Up || service.pid != Pid::from_raw(0) {
return Err(ZInitError::service_is_up(name.as_ref()).into());
}
drop(service);
table.remove(name.as_ref());
Ok(())
}
/// Send a signal to a service
pub async fn kill<S: AsRef<str>>(&self, name: S, signal: signal::Signal) -> Result<()> {
let table = self.services.read().await;
let service = table
.get(name.as_ref())
.ok_or_else(|| ZInitError::unknown_service(name.as_ref()))?;
let service = service.read().await;
if service.pid == Pid::from_raw(0) {
return Err(ZInitError::service_is_down(name.as_ref()).into());
}
self.pm.signal(service.pid, signal)
}
/// List all services
pub async fn list(&self) -> Result<Vec<String>> {
let table = self.services.read().await;
Ok(table.keys().map(|k| k.into()).collect())
}
/// Get stats for a service (memory and CPU usage)
pub async fn stats<S: AsRef<str>>(&self, name: S) -> Result<ServiceStats> {
let table = self.services.read().await;
let service = table
.get(name.as_ref())
.ok_or_else(|| ZInitError::unknown_service(name.as_ref()))?;
let service = service.read().await;
if service.pid.as_raw() == 0 {
return Err(ZInitError::service_is_down(name.as_ref()).into());
}
// Get stats for the main process
let pid = service.pid.as_raw();
let (memory_usage, cpu_usage) = self.get_process_stats(pid).await?;
// Get stats for child processes
let children = self.get_child_process_stats(pid).await?;
Ok(ServiceStats {
memory_usage,
cpu_usage,
pid,
children,
})
}
/// Get memory and CPU usage for a process
async fn get_process_stats(&self, pid: i32) -> Result<(u64, f32)> {
// Create a new System instance with all information
let mut system = System::new_all();
// Convert the raw i32 pid to a sysinfo::Pid
let sys_pid = sysinfo::Pid::from(pid as usize);
// sysinfo derives CPU usage from the delta between two refreshes, so take
// one sample, wait, then take a second one. refresh_all() already covers
// both CPU and process information.
system.refresh_all();
// 500ms between samples gives a reasonable measurement window
sleep(std::time::Duration::from_millis(500)).await;
system.refresh_all();
// Get the process
if let Some(process) = system.process(sys_pid) {
// Get memory in bytes
let memory_usage = process.memory();
// Get CPU usage as percentage
let cpu_usage = process.cpu_usage();
Ok((memory_usage, cpu_usage))
} else {
// Process not found
Ok((0, 0.0))
}
}
/// Get stats for child processes
async fn get_child_process_stats(&self, parent_pid: i32) -> Result<Vec<ProcessStats>> {
// Create a new System instance with all processes information
let mut system = System::new_all();
// Convert the raw i32 pid to a sysinfo::Pid
let sys_pid = sysinfo::Pid::from(parent_pid as usize);
// As in get_process_stats, CPU usage needs two samples separated by a delay
system.refresh_all();
sleep(std::time::Duration::from_millis(500)).await;
system.refresh_all();
let mut children = Vec::new();
// Recursively collect all descendant PIDs
let mut descendant_pids = Vec::new();
self.collect_descendants(sys_pid, system.processes(), &mut descendant_pids);
// Get stats for each child process
for &child_pid in &descendant_pids {
if let Some(process) = system.process(child_pid) {
children.push(ProcessStats {
pid: child_pid.as_u32() as i32,
memory_usage: process.memory(),
cpu_usage: process.cpu_usage(),
});
}
}
Ok(children)
}
/// Recursively collect all descendant PIDs of a process
fn collect_descendants(
&self,
pid: sysinfo::Pid,
procs: &HashMap<sysinfo::Pid, sysinfo::Process>,
out: &mut Vec<sysinfo::Pid>,
) {
for (&child_pid, proc) in procs.iter() {
if proc.parent() == Some(pid) {
out.push(child_pid);
self.collect_descendants(child_pid, procs, out);
}
}
}
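// For illustration: with a (hypothetical) process tree 100 -> 101 -> 102
// (parent -> child), calling collect_descendants with pid 100 pushes 101,
// recurses, and then pushes 102, so `out` ends up holding every transitive
// descendant of the parent.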
/// Verify that all processes are terminated
async fn verify_all_processes_terminated(&self) -> Result<()> {
// Get all services
let table = self.services.read().await;
// Check each service
for (name, service) in table.iter() {
let service = service.read().await;
let pid = service.pid.as_raw();
// Skip services with no PID
if pid == 0 {
continue;
}
// Check if the main process is still running
if self.is_process_running(pid).await? {
warn!(
"Service {} (PID {}) is still running after shutdown",
name, pid
);
// Try to kill it with SIGKILL
let _ = signal::kill(Pid::from_raw(pid), signal::Signal::SIGKILL);
}
// Check for child processes
if let Ok(children) = self.get_child_process_stats(pid).await {
for child in children {
if self.is_process_running(child.pid).await? {
warn!(
"Child process {} of service {} is still running after shutdown",
child.pid, name
);
// Try to kill it with SIGKILL
let _ = signal::kill(Pid::from_raw(child.pid), signal::Signal::SIGKILL);
}
}
}
}
Ok(())
}
/// Shutdown the system
pub async fn shutdown(&self) -> Result<()> {
info!("shutting down");
// Set the shutdown flag
*self.shutdown.write().await = true;
#[cfg(target_os = "linux")]
{
// Power off using our enhanced method
let result = self.power(RebootMode::RB_POWER_OFF).await;
// Final verification before exit
self.verify_all_processes_terminated().await?;
return result;
}
#[cfg(not(target_os = "linux"))]
{
// Stop all services
let services = self.list().await?;
for service in services {
let _ = self.stop(&service).await;
}
// Verify all processes are terminated
self.verify_all_processes_terminated().await?;
if self.container {
std::process::exit(0);
} else {
info!("System shutdown not supported on this platform");
std::process::exit(0);
}
}
}
/// Reboot the system
pub async fn reboot(&self) -> Result<()> {
info!("rebooting");
// Set the shutdown flag
*self.shutdown.write().await = true;
#[cfg(target_os = "linux")]
{
// Reboot using our enhanced method
let result = self.power(RebootMode::RB_AUTOBOOT).await;
// Final verification before exit
self.verify_all_processes_terminated().await?;
return result;
}
#[cfg(not(target_os = "linux"))]
{
// Stop all services
let services = self.list().await?;
for service in services {
let _ = self.stop(&service).await;
}
// Verify all processes are terminated
self.verify_all_processes_terminated().await?;
if self.container {
std::process::exit(0);
} else {
info!("System reboot not supported on this platform");
std::process::exit(0);
}
}
}
/// Power off or reboot the system
#[cfg(target_os = "linux")]
async fn power(&self, mode: RebootMode) -> Result<()> {
*self.shutdown.write().await = true;
let mut state_channels: HashMap<String, Watcher<State>> = HashMap::new();
let mut shutdown_timeouts: HashMap<String, u64> = HashMap::new();
let table = self.services.read().await;
for (name, service) in table.iter() {
let service = service.read().await;
if service.is_active() {
info!("service '{}' is scheduled for a shutdown", name);
state_channels.insert(name.into(), service.state_watcher());
shutdown_timeouts.insert(name.into(), service.service.shutdown_timeout);
}
}
drop(table);
let dag = service_dependency_order(self.services.clone()).await;
self.kill_process_tree(dag, state_channels, shutdown_timeouts)
.await?;
// On Linux, we can use sync and reboot
nix::unistd::sync();
if self.container {
std::process::exit(0);
} else {
nix::sys::reboot::reboot(mode)?;
}
Ok(())
}
/// Kill processes in dependency order
#[cfg(target_os = "linux")]
async fn kill_process_tree(
&self,
mut dag: ProcessDAG,
mut state_channels: HashMap<String, Watcher<State>>,
mut shutdown_timeouts: HashMap<String, u64>,
) -> Result<()> {
let (tx, mut rx) = mpsc::unbounded_channel();
tx.send(DUMMY_ROOT.into())?;
let mut count = dag.count;
while let Some(name) = rx.recv().await {
debug!(
"{} has been killed (or was inactive); adding its children",
name
);
for child in dag.adj.get(&name).unwrap_or(&Vec::new()) {
let child_indegree: &mut u32 = dag.indegree.entry(child.clone()).or_default();
*child_indegree -= 1;
debug!(
"decrementing child {} indegree to {}",
child, child_indegree
);
if *child_indegree == 0 {
let watcher = state_channels.remove(child);
if watcher.is_none() {
// not an active service
tx.send(child.to_string())?;
continue;
}
let shutdown_timeout = shutdown_timeouts.remove(child);
let lifecycle = self.clone_lifecycle();
// Spawn a task to kill the service and wait for it to terminate
let kill_task = tokio::spawn(Self::kill_wait_enhanced(
lifecycle,
child.to_string(),
tx.clone(),
watcher.unwrap(),
shutdown_timeout.unwrap_or(config::DEFAULT_SHUTDOWN_TIMEOUT),
));
// Add a timeout to ensure we don't wait forever
let _ = tokio::time::timeout(
std::time::Duration::from_secs(
shutdown_timeout.unwrap_or(config::DEFAULT_SHUTDOWN_TIMEOUT) + 2,
),
kill_task,
)
.await;
}
}
count -= 1;
if count == 0 {
break;
}
}
// Final verification that all processes are gone
self.verify_all_processes_terminated().await?;
Ok(())
}
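// A worked shutdown-order example (hypothetical services): with
// `b.after = ["a"]` and `c.after = ["a"]`, the walk first receives
// DUMMY_ROOT and schedules its children b and c (the heads). Only once
// both have reported back on the channel does a's indegree reach 0, so
// a is stopped last: dependents always stop before their dependencies.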
/// Enhanced version of kill_wait that ensures processes are terminated
#[cfg(target_os = "linux")]
async fn kill_wait_enhanced(
self,
name: String,
ch: mpsc::UnboundedSender<String>,
mut rx: Watcher<State>,
shutdown_timeout: u64,
) {
debug!("kill_wait {}", name);
// Try to stop the service gracefully
let stop_result = self.stop(name.clone()).await;
// Wait for the service to become inactive or timeout
let fut = timeout(
std::time::Duration::from_secs(shutdown_timeout),
async move {
while let Some(state) = rx.next().await {
if !state.is_active() {
return;
}
}
},
);
match stop_result {
Ok(_) => {
let _ = fut.await;
}
Err(e) => error!("couldn't stop service {}: {}", name, e),
}
// Verify the service is actually stopped
if let Ok(status) = self.status(&name).await {
if status.pid != Pid::from_raw(0) {
// Service is still running, try to kill it
let _ = self.kill(&name, signal::Signal::SIGKILL).await;
}
}
debug!("sending to the death channel {}", name.clone());
if let Err(e) = ch.send(name.clone()) {
error!(
"error: couldn't send the service {} to the shutdown loop: {}",
name, e
);
}
}
/// Original kill_wait, kept for backward compatibility
#[allow(dead_code)]
#[cfg(target_os = "linux")]
async fn kill_wait(
self,
name: String,
ch: mpsc::UnboundedSender<String>,
rx: Watcher<State>,
shutdown_timeout: u64,
) {
Self::kill_wait_enhanced(self, name, ch, rx, shutdown_timeout).await
}
/// Check if a service can be scheduled
async fn can_schedule(&self, service: &config::Service) -> bool {
let mut can = true;
let table = self.services.read().await;
for dep in service.after.iter() {
can = match table.get(dep) {
Some(ps) => {
let ps = ps.read().await;
debug!(
"- service {} is {:?} oneshot: {}",
dep,
ps.get_state(),
ps.service.one_shot
);
match ps.get_state() {
State::Running if !ps.service.one_shot => true,
State::Success => true,
_ => false,
}
}
// The dependency isn't defined (yet). This can still be resolved
// later by monitoring the dependency in the future.
None => false,
};
// one unmet dependency is enough to block the service; stop checking
if !can {
break;
}
}
can
}
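// For example (hypothetical config): a service with `after = ["redis"]`
// becomes schedulable once "redis" is Running (and not one-shot) or has
// reached Success; a one-shot dependency must fully succeed first.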
/// Set the state and/or target of a service
async fn set_service_state(&self, name: &str, state: Option<State>, target: Option<Target>) {
let table = self.services.read().await;
let service = match table.get(name) {
Some(service) => service,
None => return,
};
let mut service = service.write().await;
if let Some(state) = state {
service.force_set_state(state);
}
if let Some(target) = target {
service.set_target(target);
}
}
/// Test if a service is running correctly
async fn test_service<S: AsRef<str>>(&self, name: S, cfg: &config::Service) -> Result<bool> {
if cfg.test.is_empty() {
return Ok(true);
}
let log = match cfg.log {
config::Log::None => Log::None,
config::Log::Stdout => Log::Stdout,
config::Log::Ring => Log::Ring(format!("{}/test", name.as_ref())),
};
let test = self
.pm
.run(
Process::new(&cfg.test, &cfg.dir, Some(cfg.env.clone())),
log.clone(),
)
.await?;
let status = test.wait().await?;
Ok(status.success())
}
/// Run the test loop for a service
async fn test_loop(self, name: String, cfg: config::Service) {
loop {
let result = self.test_service(&name, &cfg).await;
match result {
Ok(result) => {
if result {
self.set_service_state(&name, Some(State::Running), None)
.await;
// release any services blocked waiting on this one
self.notify.notify_waiters();
return;
}
// wait before we try again
sleep(std::time::Duration::from_secs(2)).await;
}
Err(_) => {
self.set_service_state(&name, Some(State::TestFailure), None)
.await;
// back off before retrying so a failing test spawn doesn't busy-loop
sleep(std::time::Duration::from_secs(2)).await;
}
}
}
}
/// Watch a service and manage its lifecycle
async fn watch_service(self, name: String, input: Arc<RwLock<ZInitService>>) {
let name = name.clone();
let mut service = input.write().await;
if service.target == Target::Down {
debug!("service '{}' target is down", name);
return;
}
if service.scheduled {
debug!("service '{}' already scheduled", name);
return;
}
service.scheduled = true;
drop(service);
loop {
let name = name.clone();
let service = input.read().await;
// Early check if the service target is down, so we can skip extra work.
if service.target == Target::Down {
// We re-check the target on every iteration in case the service
// has been set down in the meantime.
break;
}
let config = service.service.clone();
// We need to spawn this service now, but is it ready? Are all of its
// dependencies running? Drop the table lock here to give other services
// a chance to acquire it and schedule themselves.
drop(service);
'checks: loop {
let sig = self.notify.notified();
debug!("checking {} if it can schedule", name);
if self.can_schedule(&config).await {
debug!("service {} can schedule", name);
break 'checks;
}
self.set_service_state(&name, Some(State::Blocked), None)
.await;
// We don't care if this notification lags; as long as we are woken up
// whenever some service's status changes, we re-check the dependencies.
debug!("service {} is blocked, waiting release", name);
sig.await;
}
let log = match config.log {
config::Log::None => Log::None,
config::Log::Stdout => Log::Stdout,
config::Log::Ring => Log::Ring(name.clone()),
};
let mut service = input.write().await;
// Check the target again in case it changed while the lock was released
// (for example, if stop was called while this service was waiting for its
// dependencies). The lock is then held until the process is spawned and
// the pid is recorded.
if service.target == Target::Down {
break;
}
let child = self
.pm
.run(
Process::new(&config.exec, &config.dir, Some(config.env.clone())),
log.clone(),
)
.await;
let child = match child {
Ok(child) => {
service.force_set_state(State::Spawned);
service.set_pid(child.pid);
child
}
Err(err) => {
// Spawning failed, and retrying won't help: this is typically due to a
// bad command line or a missing executable. Mark the service as failed.
error!("service {} failed to start: {}", name, err);
service.force_set_state(State::Failure);
break;
}
};
if config.one_shot {
service.force_set_state(State::Running);
}
// Don't hold the lock while waiting: the child can run indefinitely, and
// other operations on the service (status, stop) must remain possible.
drop(service);
let mut handler = None;
if !config.one_shot {
let lifecycle = self.clone_lifecycle();
handler = Some(tokio::spawn(
lifecycle.test_loop(name.clone(), config.clone()),
));
}
let result = child.wait().await;
if let Some(handler) = handler {
handler.abort();
}
let mut service = input.write().await;
service.clear_pid();
match result {
Err(err) => {
error!("failed to read service '{}' status: {}", name, err);
service.force_set_state(State::Unknown);
}
Ok(status) => {
service.force_set_state(if status.success() {
State::Success
} else {
State::Error(status)
});
}
};
drop(service);
if config.one_shot {
// we don't need to restart the service anymore
self.notify.notify_waiters();
break;
}
// wait 2 seconds before trying again
sleep(std::time::Duration::from_secs(2)).await;
}
let mut service = input.write().await;
service.scheduled = false;
}
/// Clone the lifecycle manager
pub fn clone_lifecycle(&self) -> Self {
Self {
pm: self.pm.clone(),
services: Arc::clone(&self.services),
notify: Arc::clone(&self.notify),
shutdown: Arc::clone(&self.shutdown),
container: self.container,
}
}
}

View File

@@ -0,0 +1,119 @@
pub mod config;
pub mod errors;
pub mod lifecycle;
pub mod ord;
pub mod service;
pub mod state;
pub mod types;
// Re-export commonly used items
pub use service::ZInitStatus;
pub use state::State;
pub use types::{ProcessStats, ServiceStats};
use crate::manager::{Logs, ProcessManager};
use anyhow::Result;
use nix::sys::signal;
use std::sync::Arc;
use tokio::sync::{Notify, RwLock};
/// Main ZInit service manager
#[derive(Clone)]
pub struct ZInit {
/// Lifecycle manager for service management
lifecycle: lifecycle::LifecycleManager,
}
impl ZInit {
/// Create a new ZInit instance
pub fn new(cap: usize, container: bool) -> ZInit {
let pm = ProcessManager::new(cap);
let services = Arc::new(RwLock::new(types::ServiceTable::new()));
let notify = Arc::new(Notify::new());
let shutdown = Arc::new(RwLock::new(false));
let lifecycle = lifecycle::LifecycleManager::new(pm, services, notify, shutdown, container);
ZInit { lifecycle }
}
/// Start the service manager
pub fn serve(&self) {
self.lifecycle.process_manager().start();
if self.lifecycle.is_container_mode() {
let lifecycle = self.lifecycle.clone_lifecycle();
tokio::spawn(async move {
use tokio::signal::unix;
let mut term = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut int = unix::signal(unix::SignalKind::interrupt()).unwrap();
let mut hup = unix::signal(unix::SignalKind::hangup()).unwrap();
tokio::select! {
_ = term.recv() => {},
_ = int.recv() => {},
_ = hup.recv() => {},
};
debug!("shutdown signal received");
let _ = lifecycle.shutdown().await;
});
}
}
/// Get logs from the process manager
/// TODO: document the `existing_logs` flag
pub async fn logs(&self, existing_logs: bool, follow: bool) -> Logs {
self.lifecycle.logs(existing_logs, follow).await
}
/// Monitor a service
pub async fn monitor<S: Into<String>>(&self, name: S, service: config::Service) -> Result<()> {
self.lifecycle.monitor(name, service).await
}
/// Get the status of a service
pub async fn status<S: AsRef<str>>(&self, name: S) -> Result<ZInitStatus> {
self.lifecycle.status(name).await
}
/// Start a service
pub async fn start<S: AsRef<str>>(&self, name: S) -> Result<()> {
self.lifecycle.start(name).await
}
/// Stop a service
pub async fn stop<S: AsRef<str>>(&self, name: S) -> Result<()> {
self.lifecycle.stop(name).await
}
/// Forget a service
pub async fn forget<S: AsRef<str>>(&self, name: S) -> Result<()> {
self.lifecycle.forget(name).await
}
/// Send a signal to a service
pub async fn kill<S: AsRef<str>>(&self, name: S, signal: signal::Signal) -> Result<()> {
self.lifecycle.kill(name, signal).await
}
/// List all services
pub async fn list(&self) -> Result<Vec<String>> {
self.lifecycle.list().await
}
/// Shutdown the system
pub async fn shutdown(&self) -> Result<()> {
self.lifecycle.shutdown().await
}
/// Reboot the system
pub async fn reboot(&self) -> Result<()> {
self.lifecycle.reboot().await
}
/// Get stats for a service (memory and CPU usage)
pub async fn stats<S: AsRef<str>>(&self, name: S) -> Result<ServiceStats> {
self.lifecycle.stats(name).await
}
}
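// A minimal usage sketch (illustrative only; `service_cfg` stands in for a
// `config::Service` loaded elsewhere, and the buffer capacity is arbitrary):
//
//     let init = ZInit::new(2000, false);
//     init.serve();
//     init.monitor("redis", service_cfg).await?;
//     init.start("redis").await?;
//     println!("{:?}", init.status("redis").await?.state);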

View File

@@ -0,0 +1,37 @@
use crate::zinit::types::ServiceTable;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub const DUMMY_ROOT: &str = "";
pub struct ProcessDAG {
pub adj: HashMap<String, Vec<String>>,
pub indegree: HashMap<String, u32>,
/// number of services including the dummy root
pub count: u32,
}
pub async fn service_dependency_order(services: Arc<RwLock<ServiceTable>>) -> ProcessDAG {
let mut children: HashMap<String, Vec<String>> = HashMap::new();
let mut indegree: HashMap<String, u32> = HashMap::new();
let table = services.read().await;
for (name, service) in table.iter() {
let service = service.read().await;
for child in service.service.after.iter() {
children.entry(name.into()).or_default().push(child.into());
*indegree.entry(child.into()).or_insert(0) += 1;
}
}
let mut heads: Vec<String> = Vec::new();
for (name, _) in table.iter() {
if *indegree.get::<str>(name).unwrap_or(&0) == 0 {
heads.push(name.into());
// add edges from the dummy root to the heads
*indegree.entry(name.into()).or_insert(0) += 1;
}
}
children.insert(DUMMY_ROOT.to_string(), heads);
ProcessDAG {
adj: children,
indegree,
count: table.len() as u32 + 1,
}
}
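// A worked example (hypothetical services): with services a, b and c where
// `b.after = ["a"]` and `c.after = ["a"]`, the resulting ProcessDAG is:
//   adj:      { "b" -> ["a"], "c" -> ["a"], "" -> ["b", "c"] }
//   indegree: { "a": 2, "b": 1, "c": 1 }  (b and c each get +1 from the root)
//   count:    4                           (three services plus the dummy root)
// Edges point from a service to its dependencies, which are only stopped
// after the service itself.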

View File

@@ -0,0 +1,126 @@
use crate::zinit::config;
use crate::zinit::state::{State, Target};
use crate::zinit::types::Watched;
use anyhow::{Context, Result};
use nix::unistd::Pid;
/// Represents a service managed by ZInit
pub struct ZInitService {
/// Process ID of the running service
pub pid: Pid,
/// Service configuration
pub service: config::Service,
/// Target state of the service (up, down)
pub target: Target,
/// Whether the service is scheduled for execution
pub scheduled: bool,
/// Current state of the service
state: Watched<State>,
}
/// Status information for a service
pub struct ZInitStatus {
/// Process ID of the running service
pub pid: Pid,
/// Service configuration
pub service: config::Service,
/// Target state of the service (up, down)
pub target: Target,
/// Whether the service is scheduled for execution
pub scheduled: bool,
/// Current state of the service
pub state: State,
}
impl ZInitService {
/// Create a new service with the given configuration and initial state
pub fn new(service: config::Service, state: State) -> ZInitService {
ZInitService {
pid: Pid::from_raw(0),
state: Watched::new(state),
service,
target: Target::Up,
scheduled: false,
}
}
/// Get the current status of the service
pub fn status(&self) -> ZInitStatus {
ZInitStatus {
pid: self.pid,
state: self.state.get().clone(),
service: self.service.clone(),
target: self.target.clone(),
scheduled: self.scheduled,
}
}
/// Set the state of the service, validating the state transition
pub fn set_state(&mut self, state: State) -> Result<()> {
let current_state = self.state.get().clone();
let new_state = current_state
.transition_to(state)
.context("Failed to transition service state")?;
self.state.set(new_state);
Ok(())
}
/// Set the state of the service without validation
pub fn force_set_state(&mut self, state: State) {
self.state.set(state);
}
/// Set the target state of the service
pub fn set_target(&mut self, target: Target) {
self.target = target;
}
/// Get the current state of the service
pub fn get_state(&self) -> &State {
self.state.get()
}
/// Get a watcher for the service state
pub fn state_watcher(&self) -> crate::zinit::types::Watcher<State> {
self.state.watcher()
}
/// Check if the service is active (running or in progress)
pub fn is_active(&self) -> bool {
self.state.get().is_active()
}
/// Check if the service is in a terminal state (success or failure)
pub fn is_terminal(&self) -> bool {
self.state.get().is_terminal()
}
/// Set the process ID of the service
pub fn set_pid(&mut self, pid: Pid) {
self.pid = pid;
}
/// Clear the process ID of the service
pub fn clear_pid(&mut self) {
self.pid = Pid::from_raw(0);
}
/// Check if the service is running
pub fn is_running(&self) -> bool {
self.pid.as_raw() != 0 && self.state.get().is_active()
}
/// Check if the service is a one-shot service
pub fn is_one_shot(&self) -> bool {
self.service.one_shot
}
}

View File

@@ -0,0 +1,106 @@
use crate::zinit::errors::ZInitError;
use anyhow::Result;
use nix::sys::wait::WaitStatus;
/// Target state for a service
#[derive(Clone, Debug, PartialEq)]
pub enum Target {
/// Service should be running
Up,
/// Service should be stopped
Down,
}
/// Service state
#[derive(Debug, PartialEq, Clone)]
pub enum State {
/// Service is in an unknown state
Unknown,
/// Blocked means one or more dependencies haven't been met yet. A service can
/// stay in this state as long as at least one dependency is in neither the
/// Running nor the Success state
Blocked,
/// Service has been started, but it hasn't exited yet and the test command hasn't passed yet.
Spawned,
/// Service has been started, and test command passed.
Running,
/// Service has exited successfully; only one-shot services stay in this state
Success,
/// Service exited with this error; only one-shot services stay in this state
Error(WaitStatus),
/// The service test command failed, this might (or might not) be replaced
/// with an Error state later on once the service process itself exits
TestFailure,
/// Failure means the service has failed to spawn in a way that retrying
/// won't help, like command line parsing error or failed to fork
Failure,
}
impl State {
/// Validate if a transition from the current state to the new state is valid
pub fn can_transition_to(&self, new_state: &State) -> bool {
match (self, new_state) {
// From Unknown state, any transition is valid
(State::Unknown, _) => true,
// From Blocked state
(State::Blocked, State::Spawned) => true,
(State::Blocked, State::Failure) => true,
// From Spawned state
(State::Spawned, State::Running) => true,
(State::Spawned, State::TestFailure) => true,
(State::Spawned, State::Error(_)) => true,
(State::Spawned, State::Success) => true,
// From Running state
(State::Running, State::Success) => true,
(State::Running, State::Error(_)) => true,
// To Unknown or Blocked state is always valid
(_, State::Unknown) => true,
(_, State::Blocked) => true,
// Any other transition is invalid
_ => false,
}
}
/// Attempt to transition to a new state, validating the transition
pub fn transition_to(&self, new_state: State) -> Result<State, ZInitError> {
if self.can_transition_to(&new_state) {
Ok(new_state)
} else {
Err(ZInitError::invalid_state_transition(format!(
"Invalid transition from {:?} to {:?}",
self, new_state
)))
}
}
/// Check if the state is considered "active" (running or in progress)
pub fn is_active(&self) -> bool {
matches!(self, State::Running | State::Spawned)
}
/// Check if the state is considered "terminal" (success or failure)
pub fn is_terminal(&self) -> bool {
matches!(self, State::Success | State::Error(_) | State::Failure)
}
/// Check if the state is considered "successful"
pub fn is_successful(&self) -> bool {
matches!(self, State::Success | State::Running)
}
/// Check if the state is considered "failed"
pub fn is_failed(&self) -> bool {
matches!(self, State::Error(_) | State::Failure | State::TestFailure)
}
}
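// A minimal sketch (assumed tests, not part of the original suite) of how
// the transition rules above behave.
#[cfg(test)]
mod state_transition_tests {
    use super::*;

    #[test]
    fn transition_rules() {
        // Spawned -> Running is a legal transition...
        assert!(State::Spawned.can_transition_to(&State::Running));
        // ...but going back from Running to Spawned is rejected.
        assert!(State::Running.transition_to(State::Spawned).is_err());
        // Unknown may move anywhere, and any state may become Blocked.
        assert!(State::Unknown.can_transition_to(&State::Failure));
        assert!(State::Success.can_transition_to(&State::Blocked));
    }
}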

View File

@@ -0,0 +1,89 @@
use nix::sys::wait::WaitStatus;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::watch;
use tokio::sync::RwLock;
use tokio_stream::wrappers::WatchStream;
/// Stats information for a service
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceStats {
/// Memory usage in bytes
pub memory_usage: u64,
/// CPU usage as a percentage (0-100)
pub cpu_usage: f32,
/// Process ID of the service
pub pid: i32,
/// Child process stats if any
pub children: Vec<ProcessStats>,
}
/// Stats for an individual process
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessStats {
/// Process ID
pub pid: i32,
/// Memory usage in bytes
pub memory_usage: u64,
/// CPU usage as a percentage (0-100)
pub cpu_usage: f32,
}
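// Serialized with serde_json, a ServiceStats value looks roughly like this
// (illustrative numbers):
// {"memory_usage":10485760,"cpu_usage":1.5,"pid":1234,
//  "children":[{"pid":1235,"memory_usage":2097152,"cpu_usage":0.3}]}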
/// Extension trait for WaitStatus to check if a process exited successfully
pub trait WaitStatusExt {
fn success(&self) -> bool;
}
impl WaitStatusExt for WaitStatus {
fn success(&self) -> bool {
matches!(self, WaitStatus::Exited(_, code) if *code == 0)
}
}
/// Type alias for a service table mapping service names to service instances
pub type ServiceTable = HashMap<String, Arc<RwLock<crate::zinit::service::ZInitService>>>;
/// Type alias for a watch stream
pub type Watcher<T> = WatchStream<Arc<T>>;
/// A wrapper around a value that can be watched for changes
pub struct Watched<T> {
v: Arc<T>,
tx: watch::Sender<Arc<T>>,
}
impl<T> Watched<T>
where
T: Send + Sync + 'static,
{
/// Create a new watched value
pub fn new(v: T) -> Self {
let v = Arc::new(v);
let (tx, _) = watch::channel(Arc::clone(&v));
Self { v, tx }
}
/// Set the value and notify watchers
pub fn set(&mut self, v: T) {
let v = Arc::new(v);
self.v = Arc::clone(&v);
// update the value even when there are no receivers
self.tx.send_replace(v);
}
/// Get a reference to the current value
pub fn get(&self) -> &T {
&self.v
}
/// Create a watcher for this value
pub fn watcher(&self) -> Watcher<T> {
WatchStream::new(self.tx.subscribe())
}
}
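// A minimal sketch (assumed test, not part of the original suite) of the
// Watched/Watcher contract: a watcher is a stream of Arc<T> snapshots whose
// first poll yields the value current at that moment, so rapid updates may
// be coalesced into the latest one.
#[cfg(test)]
mod watched_tests {
    use super::*;
    use crate::zinit::state::State;
    use tokio_stream::StreamExt;

    #[tokio::test]
    async fn watcher_sees_latest_value() {
        let mut state = Watched::new(State::Unknown);
        let mut watcher = state.watcher();
        state.set(State::Blocked);
        let seen = watcher.next().await.expect("sender still alive");
        assert_eq!(*seen, State::Blocked);
    }
}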