//! Kubernetes Manager - Core functionality for namespace-scoped Kubernetes operations use crate::config::KubernetesConfig; use crate::error::{KubernetesError, KubernetesResult}; use base64::Engine; use k8s_openapi::api::apps::v1::Deployment; use k8s_openapi::api::core::v1::{ConfigMap, Namespace, Pod, Secret, Service}; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use kube::{Api, Client, Config}; use regex::Regex; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::Semaphore; use tokio::time::timeout; use tokio_retry::strategy::ExponentialBackoff; use tokio_retry::Retry; /// KubernetesManager provides namespace-scoped operations for Kubernetes resources /// /// Each instance operates on a single namespace and provides methods for /// managing pods, services, deployments, and other Kubernetes resources. /// /// Includes production safety features: /// - Configurable timeouts for all operations /// - Exponential backoff retry logic for transient failures /// - Rate limiting to prevent API overload #[derive(Clone)] pub struct KubernetesManager { /// Kubernetes client client: Client, /// Target namespace for operations namespace: String, /// Configuration for production safety features config: KubernetesConfig, /// Semaphore for rate limiting API calls rate_limiter: Arc, /// Last request time for rate limiting last_request: Arc>, } impl KubernetesManager { /// Create a new KubernetesManager for the specified namespace with default configuration /// /// # Arguments /// /// * `namespace` - The Kubernetes namespace to operate on /// /// # Returns /// /// * `KubernetesResult` - The manager instance or an error /// /// # Example /// /// ```rust,no_run /// use sal_kubernetes::KubernetesManager; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// // This requires a running Kubernetes cluster /// let km = KubernetesManager::new("default").await?; /// Ok(()) /// } /// ``` pub async fn new(namespace: impl Into) -> KubernetesResult { Self::with_config(namespace, KubernetesConfig::default()).await } /// Create a new KubernetesManager with custom configuration /// /// # Arguments /// /// * `namespace` - The Kubernetes namespace to operate on /// * `config` - Configuration for production safety features /// /// # Returns /// /// * `KubernetesResult` - The manager instance or an error pub async fn with_config( namespace: impl Into, config: KubernetesConfig, ) -> KubernetesResult { let k8s_config = Config::infer() .await .map_err(|e| Self::create_user_friendly_config_error(kube::Error::InferConfig(e)))?; let client = Client::try_from(k8s_config).map_err(|e| { KubernetesError::config_error(format!("Failed to create Kubernetes client: {e}")) })?; // Validate cluster connectivity Self::validate_cluster_connectivity(&client).await?; // Create rate limiter semaphore with burst capacity let rate_limiter = Arc::new(Semaphore::new(config.rate_limit_burst as usize)); let last_request = Arc::new(tokio::sync::Mutex::new(Instant::now())); Ok(Self { client, namespace: namespace.into(), config, rate_limiter, last_request, }) } /// Create user-friendly error messages for configuration issues fn create_user_friendly_config_error(error: kube::Error) -> KubernetesError { let error_msg = error.to_string(); if error_msg.contains("No such file or directory") && error_msg.contains(".kube/config") { KubernetesError::config_error( "❌ No Kubernetes cluster found!\n\n\ Possible solutions:\n\ 1. Start a local cluster: `minikube start` or `kind create cluster`\n\ 2. Configure kubectl: `kubectl config set-cluster ...`\n\ 3. Set KUBECONFIG environment variable\n\ 4. Run from inside a Kubernetes pod\n\n\ Original error: No kubeconfig file found at ~/.kube/config", ) } else if error_msg.contains("environment variable not found") { KubernetesError::config_error( "❌ No Kubernetes cluster configuration found!\n\n\ You need either:\n\ 1. A local cluster: `minikube start` or `kind create cluster`\n\ 2. A valid kubeconfig file at ~/.kube/config\n\ 3. In-cluster configuration (when running in a pod)\n\n\ Original error: No in-cluster or kubeconfig configuration available", ) } else if error_msg.contains("connection refused") || error_msg.contains("dial tcp") { KubernetesError::config_error( "❌ Cannot connect to Kubernetes cluster!\n\n\ The cluster might be:\n\ 1. Not running: Try `minikube start` or `kind create cluster`\n\ 2. Unreachable: Check your network connection\n\ 3. Misconfigured: Verify `kubectl get nodes` works\n\n\ Original error: Connection refused", ) } else { KubernetesError::config_error(format!( "❌ Kubernetes configuration error!\n\n\ Please ensure you have:\n\ 1. A running Kubernetes cluster\n\ 2. Valid kubectl configuration\n\ 3. Proper access permissions\n\n\ Original error: {error}" )) } } /// Validate that we can connect to the Kubernetes cluster async fn validate_cluster_connectivity(client: &Client) -> KubernetesResult<()> { log::info!("🔍 Validating Kubernetes cluster connectivity..."); // Try to get server version as a connectivity test match client.apiserver_version().await { Ok(version) => { log::info!( "✅ Connected to Kubernetes cluster (version: {})", version.git_version ); Ok(()) } Err(e) => { let error_msg = e.to_string(); if error_msg.contains("connection refused") { Err(KubernetesError::config_error( "❌ Kubernetes cluster is not reachable!\n\n\ The cluster appears to be down or unreachable.\n\ Try: `kubectl get nodes` to verify connectivity.\n\n\ If using minikube: `minikube start`\n\ If using kind: `kind create cluster`", )) } else if error_msg.contains("Unauthorized") || error_msg.contains("Forbidden") { Err(KubernetesError::permission_denied( "❌ Access denied to Kubernetes cluster!\n\n\ You don't have permission to access this cluster.\n\ Check your kubeconfig and RBAC permissions.", )) } else { Err(KubernetesError::config_error(format!( "❌ Failed to connect to Kubernetes cluster!\n\n\ Error: {error_msg}\n\n\ Please verify:\n\ 1. Cluster is running: `kubectl get nodes`\n\ 2. Network connectivity\n\ 3. Authentication credentials" ))) } } } } /// Get the namespace this manager operates on pub fn namespace(&self) -> &str { &self.namespace } /// Get the Kubernetes client pub fn client(&self) -> &Client { &self.client } /// Get the configuration pub fn config(&self) -> &KubernetesConfig { &self.config } /// Execute an operation with production safety features (timeout, retry, rate limiting) async fn execute_with_safety(&self, operation: F) -> KubernetesResult where F: Fn() -> Fut + Send + Sync, Fut: std::future::Future> + Send, T: Send, { // Rate limiting self.rate_limit().await?; // Retry logic with exponential backoff let retry_strategy = ExponentialBackoff::from_millis(self.config.retry_base_delay.as_millis() as u64) .max_delay(self.config.retry_max_delay) .take(self.config.max_retries as usize); let result = Retry::spawn(retry_strategy, || async { // Apply timeout to the operation match timeout(self.config.operation_timeout, operation()).await { Ok(result) => result.map_err(|e| { // Only retry on certain types of errors match &e { KubernetesError::ApiError(kube_err) => { // Retry on transient errors if is_retryable_error(kube_err) { log::warn!("Retryable error encountered: {e}"); e } else { log::error!("Non-retryable error: {e}"); // Convert to a non-retryable error type KubernetesError::operation_error(format!("Non-retryable: {e}")) } } _ => { log::warn!("Retrying operation due to error: {e}"); e } } }), Err(_) => { let timeout_err = KubernetesError::timeout(format!( "Operation timed out after {:?}", self.config.operation_timeout )); log::error!("Operation timeout: {:?}", self.config.operation_timeout); Err(timeout_err) } } }) .await; result } /// Rate limiting implementation async fn rate_limit(&self) -> KubernetesResult<()> { // Acquire semaphore permit let _permit = self .rate_limiter .acquire() .await .map_err(|_| KubernetesError::operation_error("Rate limiter semaphore closed"))?; // Enforce minimum time between requests let mut last_request = self.last_request.lock().await; let now = Instant::now(); let min_interval = Duration::from_millis(1000 / self.config.rate_limit_rps as u64); if let Some(sleep_duration) = min_interval.checked_sub(now.duration_since(*last_request)) { tokio::time::sleep(sleep_duration).await; } *last_request = Instant::now(); Ok(()) } /// List all pods in the namespace /// /// # Returns /// /// * `KubernetesResult>` - List of pods or an error pub async fn pods_list(&self) -> KubernetesResult> { self.execute_with_safety(|| async { let pods: Api = Api::namespaced(self.client.clone(), &self.namespace); let pod_list = pods.list(&Default::default()).await?; Ok(pod_list.items) }) .await } /// List all services in the namespace /// /// # Returns /// /// * `KubernetesResult>` - List of services or an error pub async fn services_list(&self) -> KubernetesResult> { self.execute_with_safety(|| async { let services: Api = Api::namespaced(self.client.clone(), &self.namespace); let service_list = services.list(&Default::default()).await?; Ok(service_list.items) }) .await } /// List all deployments in the namespace /// /// # Returns /// /// * `KubernetesResult>` - List of deployments or an error pub async fn deployments_list(&self) -> KubernetesResult> { let deployments: Api = Api::namespaced(self.client.clone(), &self.namespace); let deployment_list = deployments.list(&Default::default()).await?; Ok(deployment_list.items) } /// List all configmaps in the namespace /// /// # Returns /// /// * `KubernetesResult>` - List of configmaps or an error pub async fn configmaps_list(&self) -> KubernetesResult> { let configmaps: Api = Api::namespaced(self.client.clone(), &self.namespace); let configmap_list = configmaps.list(&Default::default()).await?; Ok(configmap_list.items) } /// List all secrets in the namespace /// /// # Returns /// /// * `KubernetesResult>` - List of secrets or an error pub async fn secrets_list(&self) -> KubernetesResult> { let secrets: Api = Api::namespaced(self.client.clone(), &self.namespace); let secret_list = secrets.list(&Default::default()).await?; Ok(secret_list.items) } /// Create a ConfigMap /// /// # Arguments /// /// * `name` - The name of the ConfigMap /// * `data` - Key-value pairs for the ConfigMap data /// /// # Returns /// /// * `KubernetesResult` - The created ConfigMap or an error /// /// # Example /// /// ```rust,no_run /// use sal_kubernetes::KubernetesManager; /// use std::collections::HashMap; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let km = KubernetesManager::new("default").await?; /// /// let mut data = HashMap::new(); /// data.insert("config.yaml".to_string(), "key: value".to_string()); /// data.insert("app.properties".to_string(), "debug=true".to_string()); /// /// let configmap = km.configmap_create("my-config", data).await?; /// println!("Created ConfigMap: {}", configmap.metadata.name.unwrap_or_default()); /// Ok(()) /// } /// ``` pub async fn configmap_create( &self, name: &str, data: HashMap, ) -> KubernetesResult { let configmaps: Api = Api::namespaced(self.client.clone(), &self.namespace); let configmap = ConfigMap { metadata: ObjectMeta { name: Some(name.to_string()), namespace: Some(self.namespace.clone()), ..Default::default() }, data: Some(data.into_iter().collect()), ..Default::default() }; let created_configmap = configmaps.create(&Default::default(), &configmap).await?; log::info!("Created ConfigMap '{name}'"); Ok(created_configmap) } /// Create a Secret /// /// # Arguments /// /// * `name` - The name of the Secret /// * `data` - Key-value pairs for the Secret data (will be base64 encoded) /// * `secret_type` - The type of secret (defaults to "Opaque") /// /// # Returns /// /// * `KubernetesResult` - The created Secret or an error /// /// # Example /// /// ```rust,no_run /// use sal_kubernetes::KubernetesManager; /// use std::collections::HashMap; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let km = KubernetesManager::new("default").await?; /// /// let mut data = HashMap::new(); /// data.insert("username".to_string(), "admin".to_string()); /// data.insert("password".to_string(), "secret123".to_string()); /// /// let secret = km.secret_create("my-secret", data, None).await?; /// println!("Created Secret: {}", secret.metadata.name.unwrap_or_default()); /// Ok(()) /// } /// ``` pub async fn secret_create( &self, name: &str, data: HashMap, secret_type: Option<&str>, ) -> KubernetesResult { use k8s_openapi::ByteString; let secrets: Api = Api::namespaced(self.client.clone(), &self.namespace); // Convert string data to base64 encoded bytes let encoded_data: std::collections::BTreeMap = data .into_iter() .map(|(k, v)| { let encoded = base64::engine::general_purpose::STANDARD.encode(v.as_bytes()); (k, ByteString(encoded.into_bytes())) }) .collect(); let secret = Secret { metadata: ObjectMeta { name: Some(name.to_string()), namespace: Some(self.namespace.clone()), ..Default::default() }, data: Some(encoded_data), type_: Some(secret_type.unwrap_or("Opaque").to_string()), ..Default::default() }; let created_secret = secrets.create(&Default::default(), &secret).await?; log::info!("Created Secret '{name}'"); Ok(created_secret) } /// Create a namespace (idempotent operation) /// /// # Arguments /// /// * `name` - The name of the namespace to create /// /// # Returns /// /// * `KubernetesResult<()>` - Success or an error pub async fn namespace_create(&self, name: &str) -> KubernetesResult<()> { let name = name.to_string(); // Clone for move into closure self.execute_with_safety(move || { let name = name.clone(); let client = self.client.clone(); async move { let namespaces: Api = Api::all(client); // Check if namespace already exists match namespaces.get(&name).await { Ok(_) => { log::info!("Namespace '{name}' already exists"); return Ok(()); } Err(kube::Error::Api(api_err)) if api_err.code == 404 => { // Namespace doesn't exist, we'll create it } Err(e) => return Err(KubernetesError::ApiError(e)), } // Create the namespace let namespace = Namespace { metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta { name: Some(name.clone()), ..Default::default() }, ..Default::default() }; namespaces.create(&Default::default(), &namespace).await?; log::info!("Created namespace '{name}'"); Ok(()) } }) .await } /// Delete resources matching a PCRE pattern /// /// ⚠️ **WARNING**: This operation is destructive and irreversible! /// This method walks over all resources in the namespace and deletes /// those whose names match the provided regular expression pattern. /// /// # Safety /// - Always test patterns in a safe environment first /// - Use specific patterns to avoid accidental deletion of critical resources /// - Consider the impact on dependent resources before deletion /// /// # Arguments /// /// * `pattern` - PCRE pattern to match resource names against /// /// # Returns /// /// * `KubernetesResult` - Number of resources deleted or an error pub async fn delete(&self, pattern: &str) -> KubernetesResult { let regex = Regex::new(pattern)?; // Log warning about destructive operation log::warn!( "🚨 DESTRUCTIVE OPERATION: Starting bulk deletion with pattern '{}' in namespace '{}'", pattern, self.namespace ); let mut deleted_count = 0; let mut failed_deletions = Vec::new(); // Delete matching pods match self.delete_pods_matching(®ex).await { Ok(count) => deleted_count += count, Err(e) => { log::error!("Failed to delete pods matching pattern '{pattern}': {e}"); failed_deletions.push(format!("pods: {e}")); } } // Delete matching services match self.delete_services_matching(®ex).await { Ok(count) => deleted_count += count, Err(e) => { log::error!("Failed to delete services matching pattern '{pattern}': {e}"); failed_deletions.push(format!("services: {e}")); } } // Delete matching deployments match self.delete_deployments_matching(®ex).await { Ok(count) => deleted_count += count, Err(e) => { log::error!("Failed to delete deployments matching pattern '{pattern}': {e}"); failed_deletions.push(format!("deployments: {e}")); } } // Delete matching configmaps match self.delete_configmaps_matching(®ex).await { Ok(count) => deleted_count += count, Err(e) => { log::error!("Failed to delete configmaps matching pattern '{pattern}': {e}"); failed_deletions.push(format!("configmaps: {e}")); } } // Delete matching secrets match self.delete_secrets_matching(®ex).await { Ok(count) => deleted_count += count, Err(e) => { log::error!("Failed to delete secrets matching pattern '{pattern}': {e}"); failed_deletions.push(format!("secrets: {e}")); } } if !failed_deletions.is_empty() { log::error!( "Bulk deletion completed with {} successes and {} failures. Failed: [{}]", deleted_count, failed_deletions.len(), failed_deletions.join(", ") ); return Err(KubernetesError::operation_error(format!( "Partial deletion failure: {} resources deleted, {} resource types failed: {}", deleted_count, failed_deletions.len(), failed_deletions.join(", ") ))); } log::info!( "✅ Successfully deleted {} resources matching pattern '{}' in namespace '{}'", deleted_count, pattern, self.namespace ); Ok(deleted_count) } /// Delete pods matching the regex pattern async fn delete_pods_matching(&self, regex: &Regex) -> KubernetesResult { let pods: Api = Api::namespaced(self.client.clone(), &self.namespace); let pod_list = pods.list(&Default::default()).await?; let mut deleted = 0; for pod in pod_list.items { if let Some(name) = &pod.metadata.name { if regex.is_match(name) { match pods.delete(name, &Default::default()).await { Ok(_) => { log::info!("Deleted pod '{name}'"); deleted += 1; } Err(e) => { log::error!("Failed to delete pod '{name}': {e}"); } } } } } Ok(deleted) } /// Delete services matching the regex pattern async fn delete_services_matching(&self, regex: &Regex) -> KubernetesResult { let services: Api = Api::namespaced(self.client.clone(), &self.namespace); let service_list = services.list(&Default::default()).await?; let mut deleted = 0; for service in service_list.items { if let Some(name) = &service.metadata.name { if regex.is_match(name) { match services.delete(name, &Default::default()).await { Ok(_) => { log::info!("Deleted service '{name}'"); deleted += 1; } Err(e) => { log::error!("Failed to delete service '{name}': {e}"); } } } } } Ok(deleted) } /// Delete deployments matching the regex pattern async fn delete_deployments_matching(&self, regex: &Regex) -> KubernetesResult { let deployments: Api = Api::namespaced(self.client.clone(), &self.namespace); let deployment_list = deployments.list(&Default::default()).await?; let mut deleted = 0; for deployment in deployment_list.items { if let Some(name) = &deployment.metadata.name { if regex.is_match(name) { match deployments.delete(name, &Default::default()).await { Ok(_) => { log::info!("Deleted deployment '{name}'"); deleted += 1; } Err(e) => { log::error!("Failed to delete deployment '{name}': {e}"); } } } } } Ok(deleted) } /// Delete configmaps matching the regex pattern async fn delete_configmaps_matching(&self, regex: &Regex) -> KubernetesResult { let configmaps: Api = Api::namespaced(self.client.clone(), &self.namespace); let configmap_list = configmaps.list(&Default::default()).await?; let mut deleted = 0; for configmap in configmap_list.items { if let Some(name) = &configmap.metadata.name { if regex.is_match(name) { match configmaps.delete(name, &Default::default()).await { Ok(_) => { log::info!("Deleted configmap '{name}'"); deleted += 1; } Err(e) => { log::error!("Failed to delete configmap '{name}': {e}"); } } } } } Ok(deleted) } /// Delete secrets matching the regex pattern async fn delete_secrets_matching(&self, regex: &Regex) -> KubernetesResult { let secrets: Api = Api::namespaced(self.client.clone(), &self.namespace); let secret_list = secrets.list(&Default::default()).await?; let mut deleted = 0; for secret in secret_list.items { if let Some(name) = &secret.metadata.name { if regex.is_match(name) { match secrets.delete(name, &Default::default()).await { Ok(_) => { log::info!("Deleted secret '{name}'"); deleted += 1; } Err(e) => { log::error!("Failed to delete secret '{name}': {e}"); } } } } } Ok(deleted) } /// Create a simple pod with a single container /// /// # Arguments /// /// * `name` - The name of the pod /// * `image` - The container image to use /// * `labels` - Optional labels for the pod /// * `env_vars` - Optional environment variables for the container /// /// # Returns /// /// * `KubernetesResult` - The created pod or an error /// /// # Example /// /// ```rust,no_run /// use sal_kubernetes::KubernetesManager; /// use std::collections::HashMap; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let km = KubernetesManager::new("default").await?; /// /// let mut labels = HashMap::new(); /// labels.insert("app".to_string(), "my-app".to_string()); /// /// let pod = km.pod_create("my-pod", "nginx:latest", Some(labels), None).await?; /// println!("Created pod: {}", pod.metadata.name.unwrap_or_default()); /// Ok(()) /// } /// ``` pub async fn pod_create( &self, name: &str, image: &str, labels: Option>, env_vars: Option>, ) -> KubernetesResult { use k8s_openapi::api::core::v1::{Container, PodSpec}; let pods: Api = Api::namespaced(self.client.clone(), &self.namespace); let pod = Pod { metadata: ObjectMeta { name: Some(name.to_string()), namespace: Some(self.namespace.clone()), labels: labels.map(|l| l.into_iter().collect()), ..Default::default() }, spec: Some(PodSpec { containers: vec![{ let mut container = Container { name: name.to_string(), image: Some(image.to_string()), ..Default::default() }; // Add environment variables if provided if let Some(env_vars) = env_vars { use k8s_openapi::api::core::v1::EnvVar; container.env = Some( env_vars .into_iter() .map(|(key, value)| EnvVar { name: key, value: Some(value), ..Default::default() }) .collect(), ); } container }], ..Default::default() }), ..Default::default() }; let created_pod = pods.create(&Default::default(), &pod).await?; log::info!("Created pod '{name}' with image '{image}'"); Ok(created_pod) } /// Get a specific pod by name /// /// # Arguments /// /// * `name` - The name of the pod to retrieve /// /// # Returns /// /// * `KubernetesResult` - The pod or an error pub async fn pod_get(&self, name: &str) -> KubernetesResult { let pods: Api = Api::namespaced(self.client.clone(), &self.namespace); let pod = pods.get(name).await?; Ok(pod) } /// Create a simple service /// /// # Arguments /// /// * `name` - The name of the service /// * `selector` - Labels to select pods /// * `port` - The port to expose /// * `target_port` - The target port on pods (defaults to port if None) /// /// # Returns /// /// * `KubernetesResult` - The created service or an error /// /// # Example /// /// ```rust,no_run /// use sal_kubernetes::KubernetesManager; /// use std::collections::HashMap; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let km = KubernetesManager::new("default").await?; /// /// let mut selector = HashMap::new(); /// selector.insert("app".to_string(), "my-app".to_string()); /// /// let service = km.service_create("my-service", selector, 80, Some(8080)).await?; /// println!("Created service: {}", service.metadata.name.unwrap_or_default()); /// Ok(()) /// } /// ``` pub async fn service_create( &self, name: &str, selector: HashMap, port: i32, target_port: Option, ) -> KubernetesResult { use k8s_openapi::api::core::v1::{ServicePort, ServiceSpec}; use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString; let services: Api = Api::namespaced(self.client.clone(), &self.namespace); let service = Service { metadata: ObjectMeta { name: Some(name.to_string()), namespace: Some(self.namespace.clone()), ..Default::default() }, spec: Some(ServiceSpec { selector: Some(selector.into_iter().collect()), ports: Some(vec![ServicePort { port, target_port: Some(IntOrString::Int(target_port.unwrap_or(port))), ..Default::default() }]), ..Default::default() }), ..Default::default() }; let created_service = services.create(&Default::default(), &service).await?; log::info!("Created service '{name}' on port {port}"); Ok(created_service) } /// Get a specific service by name /// /// # Arguments /// /// * `name` - The name of the service to retrieve /// /// # Returns /// /// * `KubernetesResult` - The service or an error pub async fn service_get(&self, name: &str) -> KubernetesResult { let services: Api = Api::namespaced(self.client.clone(), &self.namespace); let service = services.get(name).await?; Ok(service) } /// Create a simple deployment /// /// # Arguments /// /// * `name` - The name of the deployment /// * `image` - The container image to use /// * `replicas` - Number of replicas to create /// * `labels` - Optional labels for the deployment and pods /// /// # Returns /// /// * `KubernetesResult` - The created deployment or an error /// /// # Example /// /// ```rust,no_run /// use sal_kubernetes::KubernetesManager; /// use std::collections::HashMap; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let km = KubernetesManager::new("default").await?; /// /// let mut labels = HashMap::new(); /// labels.insert("app".to_string(), "my-app".to_string()); /// /// let deployment = km.deployment_create("my-deployment", "nginx:latest", 3, Some(labels), None).await?; /// println!("Created deployment: {}", deployment.metadata.name.unwrap_or_default()); /// Ok(()) /// } /// ``` pub async fn deployment_create( &self, name: &str, image: &str, replicas: i32, labels: Option>, env_vars: Option>, ) -> KubernetesResult { use k8s_openapi::api::apps::v1::DeploymentSpec; use k8s_openapi::api::core::v1::{Container, PodSpec, PodTemplateSpec}; use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; let deployments: Api = Api::namespaced(self.client.clone(), &self.namespace); let labels_btree = labels .as_ref() .map(|l| l.iter().map(|(k, v)| (k.clone(), v.clone())).collect()); let selector_labels = labels.clone().unwrap_or_else(|| { let mut default_labels = HashMap::new(); default_labels.insert("app".to_string(), name.to_string()); default_labels }); let deployment = Deployment { metadata: ObjectMeta { name: Some(name.to_string()), namespace: Some(self.namespace.clone()), labels: labels_btree.clone(), ..Default::default() }, spec: Some(DeploymentSpec { replicas: Some(replicas), selector: LabelSelector { match_labels: Some(selector_labels.clone().into_iter().collect()), ..Default::default() }, template: PodTemplateSpec { metadata: Some(ObjectMeta { labels: Some(selector_labels.into_iter().collect()), ..Default::default() }), spec: Some(PodSpec { containers: vec![{ let mut container = Container { name: name.to_string(), image: Some(image.to_string()), ..Default::default() }; // Add environment variables if provided if let Some(env_vars) = env_vars { use k8s_openapi::api::core::v1::EnvVar; container.env = Some( env_vars .into_iter() .map(|(key, value)| EnvVar { name: key, value: Some(value), ..Default::default() }) .collect(), ); } container }], ..Default::default() }), }, ..Default::default() }), ..Default::default() }; let created_deployment = deployments.create(&Default::default(), &deployment).await?; log::info!("Created deployment '{name}' with {replicas} replicas using image '{image}'"); Ok(created_deployment) } /// Get a specific deployment by name /// /// # Arguments /// /// * `name` - The name of the deployment to retrieve /// /// # Returns /// /// * `KubernetesResult` - The deployment or an error pub async fn deployment_get(&self, name: &str) -> KubernetesResult { let deployments: Api = Api::namespaced(self.client.clone(), &self.namespace); let deployment = deployments.get(name).await?; Ok(deployment) } /// Delete a specific pod by name /// /// # Arguments /// /// * `name` - The name of the pod to delete /// /// # Returns /// /// * `KubernetesResult<()>` - Success or an error pub async fn pod_delete(&self, name: &str) -> KubernetesResult<()> { let pods: Api = Api::namespaced(self.client.clone(), &self.namespace); pods.delete(name, &Default::default()).await?; log::info!("Deleted pod '{name}'"); Ok(()) } /// Delete a specific service by name /// /// # Arguments /// /// * `name` - The name of the service to delete /// /// # Returns /// /// * `KubernetesResult<()>` - Success or an error pub async fn service_delete(&self, name: &str) -> KubernetesResult<()> { let services: Api = Api::namespaced(self.client.clone(), &self.namespace); services.delete(name, &Default::default()).await?; log::info!("Deleted service '{name}'"); Ok(()) } /// Delete a specific deployment by name /// /// # Arguments /// /// * `name` - The name of the deployment to delete /// /// # Returns /// /// * `KubernetesResult<()>` - Success or an error pub async fn deployment_delete(&self, name: &str) -> KubernetesResult<()> { let deployments: Api = Api::namespaced(self.client.clone(), &self.namespace); deployments.delete(name, &Default::default()).await?; log::info!("Deleted deployment '{name}'"); Ok(()) } /// Delete a specific ConfigMap by name /// /// # Arguments /// /// * `name` - The name of the ConfigMap to delete /// /// # Returns /// /// * `KubernetesResult<()>` - Success or an error pub async fn configmap_delete(&self, name: &str) -> KubernetesResult<()> { let configmaps: Api = Api::namespaced(self.client.clone(), &self.namespace); configmaps.delete(name, &Default::default()).await?; log::info!("Deleted ConfigMap '{name}'"); Ok(()) } /// Delete a specific Secret by name /// /// # Arguments /// /// * `name` - The name of the Secret to delete /// /// # Returns /// /// * `KubernetesResult<()>` - Success or an error pub async fn secret_delete(&self, name: &str) -> KubernetesResult<()> { let secrets: Api = Api::namespaced(self.client.clone(), &self.namespace); secrets.delete(name, &Default::default()).await?; log::info!("Deleted Secret '{name}'"); Ok(()) } /// Get resource counts for the namespace /// /// # Returns /// /// * `KubernetesResult>` - Resource counts by type pub async fn resource_counts(&self) -> KubernetesResult> { let mut counts = HashMap::new(); // Count pods let pods = self.pods_list().await?; counts.insert("pods".to_string(), pods.len()); // Count services let services = self.services_list().await?; counts.insert("services".to_string(), services.len()); // Count deployments let deployments = self.deployments_list().await?; counts.insert("deployments".to_string(), deployments.len()); // Count configmaps let configmaps = self.configmaps_list().await?; counts.insert("configmaps".to_string(), configmaps.len()); // Count secrets let secrets = self.secrets_list().await?; counts.insert("secrets".to_string(), secrets.len()); Ok(counts) } /// Check if a namespace exists /// /// # Arguments /// /// * `name` - The name of the namespace to check /// /// # Returns /// /// * `KubernetesResult` - True if namespace exists, false otherwise pub async fn namespace_exists(&self, name: &str) -> KubernetesResult { let namespaces: Api = Api::all(self.client.clone()); match namespaces.get(name).await { Ok(_) => Ok(true), Err(kube::Error::Api(api_err)) if api_err.code == 404 => Ok(false), Err(e) => Err(KubernetesError::ApiError(e)), } } /// List all namespaces (cluster-wide operation) /// /// # Returns /// /// * `KubernetesResult>` - List of all namespaces pub async fn namespaces_list(&self) -> KubernetesResult> { let namespaces: Api = Api::all(self.client.clone()); let namespace_list = namespaces.list(&Default::default()).await?; Ok(namespace_list.items) } /// Delete a namespace (cluster-wide operation) /// /// ⚠️ **WARNING**: This operation is destructive and will delete all resources in the namespace! /// /// # Arguments /// /// * `name` - The name of the namespace to delete /// /// # Returns /// /// * `KubernetesResult<()>` - Success or an error /// /// # Example /// /// ```rust,no_run /// use sal_kubernetes::KubernetesManager; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let km = KubernetesManager::new("default").await?; /// /// // ⚠️ This will delete the entire namespace and all its resources! /// km.namespace_delete("test-namespace").await?; /// Ok(()) /// } /// ``` pub async fn namespace_delete(&self, name: &str) -> KubernetesResult<()> { let namespaces: Api = Api::all(self.client.clone()); // Log warning about destructive operation log::warn!("🚨 DESTRUCTIVE OPERATION: Deleting namespace '{name}' and ALL its resources!"); namespaces.delete(name, &Default::default()).await?; log::info!("Deleted namespace '{name}'"); Ok(()) } /// Deploy a complete application with deployment and service /// /// This convenience method creates both a deployment and a service for an application, /// making it easy to deploy complete containerized applications with a single call. /// /// # Arguments /// /// * `name` - The name for both deployment and service /// * `image` - The container image to deploy /// * `replicas` - Number of replicas to create /// * `port` - The port the application listens on /// * `labels` - Optional labels for the resources /// /// # Returns /// /// * `KubernetesResult<()>` - Success or an error /// /// # Example /// /// ```rust,no_run /// use sal_kubernetes::KubernetesManager; /// use std::collections::HashMap; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let km = KubernetesManager::new("default").await?; /// /// let mut labels = HashMap::new(); /// labels.insert("app".to_string(), "my-app".to_string()); /// /// km.deploy_application("my-app", "node:18", 3, 3000, Some(labels), None).await?; /// Ok(()) /// } /// ``` pub async fn deploy_application( &self, name: &str, image: &str, replicas: i32, port: i32, labels: Option>, env_vars: Option>, ) -> KubernetesResult<()> { log::info!("Deploying application '{name}' with image '{image}'"); // Create deployment with environment variables self.deployment_create(name, image, replicas, labels.clone(), env_vars) .await?; // Create service selector - use app=name if no labels provided let selector = if let Some(ref labels) = labels { labels.clone() } else { let mut default_selector = HashMap::new(); default_selector.insert("app".to_string(), name.to_string()); default_selector }; // Create service self.service_create(name, selector, port, Some(port)) .await?; log::info!("Successfully deployed application '{name}'"); Ok(()) } } /// Determine if a Kubernetes API error is retryable pub fn is_retryable_error(error: &kube::Error) -> bool { match error { // Network-related errors are typically retryable kube::Error::HttpError(_) => true, // API errors - check status codes kube::Error::Api(api_error) => { match api_error.code { // Temporary server errors 500..=599 => true, // Rate limiting 429 => true, // Conflict (might resolve on retry) 409 => true, // Client errors are generally not retryable 400..=499 => false, // Other codes - be conservative and retry _ => true, } } // Auth errors are not retryable kube::Error::Auth(_) => false, // Discovery errors might be temporary kube::Error::Discovery(_) => true, // Other errors - be conservative and retry _ => true, } }