use core::str;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::sync::{Mutex, oneshot};
use std::sync::atomic::{AtomicU64, Ordering};

use crate::cmd::Cmd;
use crate::error::DBError;
use crate::options;
use crate::protocol::Protocol;
use crate::storage_trait::StorageBackend;
use crate::admin_meta;

// Embeddings: config and cache
use crate::embedding::{EmbeddingConfig, create_embedder, Embedder, create_image_embedder, ImageEmbedder};
use serde_json;
use ureq::{Agent, AgentBuilder};
use std::time::Duration;
use std::io::Read;

/// Sentinel value of `Server::selected_db` meaning "no SELECT has succeeded yet".
const NO_DB_SELECTED: u64 = u64::MAX;

/// Server state used while serving a client connection (see `Server::handle`).
///
/// `#[derive(Clone)]` clones the plain fields (`option`, `client_name`, `selected_db`,
/// `queued_cmd`, `current_permissions`), while the `Arc`-wrapped caches and registries
/// are shared between all clones.
#[derive(Clone)]
pub struct Server {
    // Opened storage handles keyed by DB id; filled lazily by `current_storage`.
    pub db_cache: std::sync::Arc>>>,
    // Startup options; this file reads `dir`, `backend`, `admin_secret` and `debug` from it.
    pub option: options::DBOption,
    // Client name; starts as `None` in `new`.
    pub client_name: Option,
    // Selected DB id, or `NO_DB_SELECTED` until a SELECT succeeds.
    pub selected_db: u64, // Changed from usize to u64
    // NOTE(review): presumably the MULTI/EXEC command queue; confirm against the command code.
    pub queued_cmd: Option>,
    // Permissions established on this connection. When `None`, `has_read_permission` and
    // `has_write_permission` fall back to `admin_meta::verify_access` without a key.
    pub current_permissions: Option,

    // In-memory registry of Tantivy search indexes for this server
    pub search_indexes: Arc>>>,

    // Per-DB Lance stores (vector DB), keyed by db_id
    pub lance_stores: Arc>>>,

    // Per-(db_id, dataset) embedder cache (text)
    pub embedders: Arc>>>,

    // Per-(db_id, dataset) image embedder cache (image)
    pub image_embedders: Arc>>>,

    // BLPOP waiter registry: per (db_index, key) FIFO of waiters
    pub list_waiters: Arc>>>>,
    // Source of unique waiter ids; starts at 1 (see `new`).
    pub waiter_seq: Arc,
}

/// A client blocked on a list pop for one `(db_index, key)` (see `Server::register_waiter`).
pub struct Waiter {
    // Unique id taken from `Server::waiter_seq`; used by `unregister_waiter`.
    pub id: u64,
    // Which end of the list to pop from when this waiter is served.
    pub side: PopSide,
    pub tx: oneshot::Sender<(String, String)>, // (key, element)
}

/// End of a list to pop from: `Left` is served with `lpop`, `Right` with `rpop`
/// (see `Server::drain_waiters_after_push`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PopSide {
    Left,
    Right,
}

impl Server {
    /// Create server state with no database selected, no permissions, empty caches and
    /// registries, and the waiter id counter starting at 1.
    ///
    /// Declared `async`, but the body does not await anything.
    pub async fn new(option: options::DBOption) -> Self {
        Server {
            db_cache: Arc::new(std::sync::RwLock::new(HashMap::new())),
            option,
            client_name: None,
            selected_db: NO_DB_SELECTED,
            queued_cmd: None,
            current_permissions: None,
            search_indexes: Arc::new(std::sync::RwLock::new(HashMap::new())),
            lance_stores: Arc::new(std::sync::RwLock::new(HashMap::new())),
            embedders: Arc::new(std::sync::RwLock::new(HashMap::new())),
            image_embedders: Arc::new(std::sync::RwLock::new(HashMap::new())),
            list_waiters: Arc::new(Mutex::new(HashMap::new())),
            waiter_seq: Arc::new(AtomicU64::new(1)),
        }
    }

    // Path where search indexes are stored, namespaced per selected DB:
    // {option.dir}/search_indexes/{selected_db}
    // The directory is created if missing; creation errors are ignored. With no DB selected
    // the last component is the NO_DB_SELECTED value (u64::MAX) rendered as a number.
    pub fn search_index_path(&self) -> std::path::PathBuf {
        let base = std::path::PathBuf::from(&self.option.dir)
            .join("search_indexes")
            .join(self.selected_db.to_string());
        if !base.exists() {
            let _ = std::fs::create_dir_all(&base);
        }
        base
    }

    // Path where Lance datasets are stored, namespaced per selected DB:
    // {option.dir}/lance/{selected_db}
    // The directory is created if missing; creation errors are ignored.
    pub fn lance_data_path(&self) -> std::path::PathBuf {
        let base = std::path::PathBuf::from(&self.option.dir)
            .join("lance")
            .join(self.selected_db.to_string());
        if !base.exists() {
            let _ = std::fs::create_dir_all(&base);
        }
        base
    }

    /// Return the storage handle for the selected DB, opening and caching it on first use.
    ///
    /// # Errors
    /// - no DB has been selected;
    /// - admin DB 0 is selected without `ReadWrite` permission;
    /// - opening the storage through `admin_meta` fails.
    ///
    /// # Panics
    /// If the `db_cache` lock is poisoned.
    pub fn current_storage(&self) -> Result, DBError> {
        // Require explicit SELECT before any storage access
        if self.selected_db == NO_DB_SELECTED {
            return Err(DBError("No database selected. Use SELECT [KEY ] first".to_string()));
        }
        // Admin DB 0 access must be authenticated: only `ReadWrite` permission is accepted.
        if self.selected_db == 0 {
            if !matches!(self.current_permissions, Some(crate::rpc::Permissions::ReadWrite)) {
                return Err(DBError("Admin DB 0 requires SELECT 0 KEY ".to_string()));
            }
        }
        // The write lock is held across the open below, so the cache check and the insert
        // happen atomically and each DB id is inserted at most once.
        let mut cache = self.db_cache.write().unwrap();
        if let Some(storage) = cache.get(&self.selected_db) {
            return Ok(storage.clone());
        }

        // Use process-wide shared handles to avoid sled/reDB double-open lock contention.
        let storage = if self.selected_db == 0 {
            // Admin DB 0: always via singleton
            admin_meta::open_admin_storage(
                &self.option.dir,
                self.option.backend.clone(),
                &self.option.admin_secret,
            )?
        } else {
            // Data DBs: via global registry keyed by id
            admin_meta::open_data_storage(
                &self.option.dir,
                self.option.backend.clone(),
                &self.option.admin_secret,
                self.selected_db,
            )?
        };

        cache.insert(self.selected_db, storage.clone());
        Ok(storage)
    }

    /// Get or create the LanceStore for the currently selected DB.
    /// Only valid for non-zero DBs and when the backend is Lance.
pub fn lance_store(&self) -> Result, DBError> { if self.selected_db == 0 { return Err(DBError("Lance not available on admin DB 0".to_string())); } // Resolve backend for selected_db let backend_opt = crate::admin_meta::get_database_backend( &self.option.dir, self.option.backend.clone(), &self.option.admin_secret, self.selected_db, ) .ok() .flatten(); if !matches!(backend_opt, Some(crate::options::BackendType::Lance)) { return Err(DBError("ERR DB backend is not Lance; LANCE.* commands are not allowed".to_string())); } // Fast path: read lock { let map = self.lance_stores.read().unwrap(); if let Some(store) = map.get(&self.selected_db) { return Ok(store.clone()); } } // Slow path: create and insert let store = Arc::new(crate::lance_store::LanceStore::new(&self.option.dir, self.selected_db)?); { let mut map = self.lance_stores.write().unwrap(); map.insert(self.selected_db, store.clone()); } Ok(store) } // ----- Embedding configuration and resolution ----- // Sidecar embedding config path: /lance//.lance.embedding.json fn dataset_embedding_config_path(&self, dataset: &str) -> std::path::PathBuf { let mut base = self.lance_data_path(); // Ensure parent dir exists if !base.exists() { let _ = std::fs::create_dir_all(&base); } base.push(format!("{}.lance.embedding.json", dataset)); base } /// Persist per-dataset embedding config as JSON sidecar. 
pub fn set_dataset_embedding_config(&self, dataset: &str, cfg: &EmbeddingConfig) -> Result<(), DBError> { if self.selected_db == 0 { return Err(DBError("Lance not available on admin DB 0".to_string())); } let p = self.dataset_embedding_config_path(dataset); let data = serde_json::to_vec_pretty(cfg) .map_err(|e| DBError(format!("Failed to serialize embedding config: {}", e)))?; std::fs::write(&p, data) .map_err(|e| DBError(format!("Failed to write embedding config {}: {}", p.display(), e)))?; // Invalidate embedder cache entry for this dataset { let mut map = self.embedders.write().unwrap(); map.remove(&(self.selected_db, dataset.to_string())); } { let mut map_img = self.image_embedders.write().unwrap(); map_img.remove(&(self.selected_db, dataset.to_string())); } Ok(()) } /// Load per-dataset embedding config. pub fn get_dataset_embedding_config(&self, dataset: &str) -> Result { if self.selected_db == 0 { return Err(DBError("Lance not available on admin DB 0".to_string())); } let p = self.dataset_embedding_config_path(dataset); if !p.exists() { return Err(DBError(format!( "Embedding config not set for dataset '{}'. Use LANCE.EMBEDDING CONFIG SET ... or RPC to configure.", dataset ))); } let data = std::fs::read(&p) .map_err(|e| DBError(format!("Failed to read embedding config {}: {}", p.display(), e)))?; let cfg: EmbeddingConfig = serde_json::from_slice(&data) .map_err(|e| DBError(format!("Failed to parse embedding config {}: {}", p.display(), e)))?; Ok(cfg) } /// Resolve or build an embedder for (db_id, dataset). Caches instance. 
pub fn get_embedder_for(&self, dataset: &str) -> Result, DBError> { if self.selected_db == 0 { return Err(DBError("Lance not available on admin DB 0".to_string())); } // Fast path { let map = self.embedders.read().unwrap(); if let Some(e) = map.get(&(self.selected_db, dataset.to_string())) { return Ok(e.clone()); } } // Load config and instantiate let cfg = self.get_dataset_embedding_config(dataset)?; let emb = create_embedder(&cfg)?; { let mut map = self.embedders.write().unwrap(); map.insert((self.selected_db, dataset.to_string()), emb.clone()); } Ok(emb) } /// Resolve or build an IMAGE embedder for (db_id, dataset). Caches instance. pub fn get_image_embedder_for(&self, dataset: &str) -> Result, DBError> { if self.selected_db == 0 { return Err(DBError("Lance not available on admin DB 0".to_string())); } // Fast path { let map = self.image_embedders.read().unwrap(); if let Some(e) = map.get(&(self.selected_db, dataset.to_string())) { return Ok(e.clone()); } } // Load config and instantiate let cfg = self.get_dataset_embedding_config(dataset)?; let emb = create_image_embedder(&cfg)?; { let mut map = self.image_embedders.write().unwrap(); map.insert((self.selected_db, dataset.to_string()), emb.clone()); } Ok(emb) } /// Download image bytes from a URI with safety checks (size, timeout, content-type, optional host allowlist). 
/// Env overrides: /// - HERODB_IMAGE_MAX_BYTES (u64, default 10485760) /// - HERODB_IMAGE_FETCH_TIMEOUT_SECS (u64, default 30) /// - HERODB_IMAGE_ALLOWED_HOSTS (comma-separated, optional) pub fn fetch_image_bytes_from_uri(&self, uri: &str) -> Result, DBError> { // Basic scheme validation if !(uri.starts_with("http://") || uri.starts_with("https://")) { return Err(DBError("Only http(s) URIs are supported for image fetch".into())); } // Parse host (naive) for allowlist check let host = { let after_scheme = match uri.find("://") { Some(i) => &uri[i + 3..], None => uri, }; let end = after_scheme.find('/').unwrap_or(after_scheme.len()); let host_port = &after_scheme[..end]; host_port.split('@').last().unwrap_or(host_port).split(':').next().unwrap_or(host_port).to_string() }; let max_bytes: u64 = std::env::var("HERODB_IMAGE_MAX_BYTES").ok().and_then(|s| s.parse::().ok()).unwrap_or(10 * 1024 * 1024); let timeout_secs: u64 = std::env::var("HERODB_IMAGE_FETCH_TIMEOUT_SECS").ok().and_then(|s| s.parse::().ok()).unwrap_or(30); let allowed_hosts_env = std::env::var("HERODB_IMAGE_ALLOWED_HOSTS").ok(); if let Some(allow) = allowed_hosts_env { if !allow.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).any(|h| h.eq_ignore_ascii_case(&host)) { return Err(DBError(format!("Host '{}' not allowed for image fetch (HERODB_IMAGE_ALLOWED_HOSTS)", host))); } } let agent: Agent = AgentBuilder::new() .timeout_read(Duration::from_secs(timeout_secs)) .timeout_write(Duration::from_secs(timeout_secs)) .build(); let resp = agent.get(uri).call().map_err(|e| DBError(format!("HTTP GET failed: {}", e)))?; // Validate content-type let ctype = resp.header("Content-Type").unwrap_or(""); let ctype_main = ctype.split(';').next().unwrap_or("").trim().to_ascii_lowercase(); if !ctype_main.starts_with("image/") { return Err(DBError(format!("Remote content-type '{}' is not image/*", ctype))); } // Read with cap let mut reader = resp.into_reader(); let mut buf: Vec = Vec::with_capacity(8192); let mut tmp = 
[0u8; 8192]; let mut total: u64 = 0; loop { let n = reader.read(&mut tmp).map_err(|e| DBError(format!("Read error: {}", e)))?; if n == 0 { break; } total += n as u64; if total > max_bytes { return Err(DBError(format!("Image exceeds max allowed bytes {}", max_bytes))); } buf.extend_from_slice(&tmp[..n]); } Ok(buf) } /// Check if current permissions allow read operations pub fn has_read_permission(&self) -> bool { // No DB selected -> no permissions if self.selected_db == NO_DB_SELECTED { return false; } // If an explicit permission is set for this connection, honor it. if let Some(perms) = self.current_permissions.as_ref() { return matches!(*perms, crate::rpc::Permissions::Read | crate::rpc::Permissions::ReadWrite); } // Fallback ONLY when no explicit permission context (e.g., JSON-RPC flows without SELECT). match crate::admin_meta::verify_access( &self.option.dir, self.option.backend.clone(), &self.option.admin_secret, self.selected_db, None, ) { Ok(Some(crate::rpc::Permissions::Read)) | Ok(Some(crate::rpc::Permissions::ReadWrite)) => true, _ => false, } } /// Check if current permissions allow write operations pub fn has_write_permission(&self) -> bool { // No DB selected -> no permissions if self.selected_db == NO_DB_SELECTED { return false; } // If an explicit permission is set for this connection, honor it. if let Some(perms) = self.current_permissions.as_ref() { return matches!(*perms, crate::rpc::Permissions::ReadWrite); } // Fallback ONLY when no explicit permission context (e.g., JSON-RPC flows without SELECT). 
match crate::admin_meta::verify_access( &self.option.dir, self.option.backend.clone(), &self.option.admin_secret, self.selected_db, None, ) { Ok(Some(crate::rpc::Permissions::ReadWrite)) => true, _ => false, } } // ----- BLPOP waiter helpers ----- pub async fn register_waiter(&self, db_index: u64, key: &str, side: PopSide) -> (u64, oneshot::Receiver<(String, String)>) { let id = self.waiter_seq.fetch_add(1, Ordering::Relaxed); let (tx, rx) = oneshot::channel::<(String, String)>(); let mut guard = self.list_waiters.lock().await; let per_db = guard.entry(db_index).or_insert_with(HashMap::new); let q = per_db.entry(key.to_string()).or_insert_with(Vec::new); q.push(Waiter { id, side, tx }); (id, rx) } pub async fn unregister_waiter(&self, db_index: u64, key: &str, id: u64) { let mut guard = self.list_waiters.lock().await; if let Some(per_db) = guard.get_mut(&db_index) { if let Some(q) = per_db.get_mut(key) { q.retain(|w| w.id != id); if q.is_empty() { per_db.remove(key); } } if per_db.is_empty() { guard.remove(&db_index); } } } // Called after LPUSH/RPUSH to deliver to blocked BLPOP waiters. 
pub async fn drain_waiters_after_push(&self, key: &str) -> Result<(), DBError> { let db_index = self.selected_db; loop { // Check if any waiter exists let maybe_waiter = { let mut guard = self.list_waiters.lock().await; if let Some(per_db) = guard.get_mut(&db_index) { if let Some(q) = per_db.get_mut(key) { if !q.is_empty() { // Pop FIFO Some(q.remove(0)) } else { None } } else { None } } else { None } }; let waiter = if let Some(w) = maybe_waiter { w } else { break }; // Pop one element depending on waiter side let elems = match waiter.side { PopSide::Left => self.current_storage()?.lpop(key, 1)?, PopSide::Right => self.current_storage()?.rpop(key, 1)?, }; if elems.is_empty() { // Nothing to deliver; re-register waiter at the front to preserve order let mut guard = self.list_waiters.lock().await; let per_db = guard.entry(db_index).or_insert_with(HashMap::new); let q = per_db.entry(key.to_string()).or_insert_with(Vec::new); q.insert(0, waiter); break; } else { let elem = elems[0].clone(); // Send to waiter; if receiver dropped, just continue let _ = waiter.tx.send((key.to_string(), elem)); // Loop to try to satisfy more waiters if more elements remain continue; } } Ok(()) } pub async fn handle( &mut self, mut stream: tokio::net::TcpStream, ) -> Result<(), DBError> { // Accumulate incoming bytes to handle partial RESP frames let mut acc = String::new(); let mut buf = vec![0u8; 8192]; loop { let n = match stream.read(&mut buf).await { Ok(0) => { println!("[handle] connection closed"); return Ok(()); } Ok(n) => n, Err(e) => { println!("[handle] read error: {:?}", e); return Err(e.into()); } }; // Append to accumulator. RESP for our usage is ASCII-safe. acc.push_str(str::from_utf8(&buf[..n])?); // Try to parse as many complete commands as are available in 'acc'. 
loop { let parsed = Cmd::from(&acc); let (cmd, protocol, remaining) = match parsed { Ok((cmd, protocol, remaining)) => (cmd, protocol, remaining), Err(_e) => { // Incomplete or invalid frame; assume incomplete and wait for more data. // This avoids emitting spurious protocol_error for split frames. break; } }; // Advance the accumulator to the unparsed remainder acc = remaining.to_string(); if self.option.debug { println!("\x1b[34;1mgot command: {:?}, protocol: {:?}\x1b[0m", cmd, protocol); } else { println!("got command: {:?}, protocol: {:?}", cmd, protocol); } // Check if this is a QUIT command before processing let is_quit = matches!(cmd, Cmd::Quit); let res = match cmd.run(self).await { Ok(p) => p, Err(e) => { if self.option.debug { eprintln!("[run error] {:?}", e); } Protocol::err(&format!("ERR {}", e.0)) } }; if self.option.debug { println!("\x1b[34;1mqueued cmd {:?}\x1b[0m", self.queued_cmd); println!("\x1b[32;1mgoing to send response {}\x1b[0m", res.encode()); } else { print!("queued cmd {:?}", self.queued_cmd); println!("going to send response {}", res.encode()); } _ = stream.write(res.encode().as_bytes()).await?; // If this was a QUIT command, close the connection if is_quit { println!("[handle] QUIT command received, closing connection"); return Ok(()); } // Continue parsing any further complete commands already in 'acc' if acc.is_empty() { break; } } } } }