use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::RwLock; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use crate::server::Server; use crate::options::DBOption; use crate::admin_meta; use crate::embedding::EmbeddingConfig; use base64::{engine::general_purpose, Engine as _}; /// Database backend types #[derive(Debug, Clone, Serialize, Deserialize)] pub enum BackendType { Redb, Sled, Tantivy, // Full-text search backend (no KV storage) Lance, // Vector search backend (no KV storage) // Future: InMemory, Custom(String) } /// Database configuration #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DatabaseConfig { pub name: Option, pub storage_path: Option, pub max_size: Option, pub redis_version: Option, } /// Database information returned by metadata queries #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DatabaseInfo { pub id: u64, pub name: Option, pub backend: BackendType, pub encrypted: bool, pub redis_version: Option, pub storage_path: Option, pub size_on_disk: Option, pub key_count: Option, pub created_at: u64, pub last_access: Option, } /// Access permissions for database keys #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum Permissions { Read, ReadWrite, } /// Access key information returned by RPC #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AccessKeyInfo { pub hash: String, pub permissions: Permissions, pub created_at: u64, } /// Hash a plaintext key using SHA-256 pub fn hash_key(key: &str) -> String { let mut hasher = Sha256::new(); hasher.update(key.as_bytes()); format!("{:x}", hasher.finalize()) } /// RPC trait for HeroDB management #[rpc(server, client, namespace = "herodb")] pub trait Rpc { /// Create a new database with specified configuration #[method(name = "createDatabase")] async fn create_database( &self, backend: BackendType, config: DatabaseConfig, encryption_key: Option, ) -> 
RpcResult; /// Set encryption for an existing database (write-only key) #[method(name = "setEncryption")] async fn set_encryption(&self, db_id: u64, encryption_key: String) -> RpcResult; /// List all managed databases #[method(name = "listDatabases")] async fn list_databases(&self) -> RpcResult>; /// Get detailed information about a specific database #[method(name = "getDatabaseInfo")] async fn get_database_info(&self, db_id: u64) -> RpcResult; /// Delete a database #[method(name = "deleteDatabase")] async fn delete_database(&self, db_id: u64) -> RpcResult; /// Get server statistics #[method(name = "getServerStats")] async fn get_server_stats(&self) -> RpcResult>; /// Add an access key to a database #[method(name = "addAccessKey")] async fn add_access_key(&self, db_id: u64, key: String, permissions: String) -> RpcResult; /// Delete an access key from a database #[method(name = "deleteAccessKey")] async fn delete_access_key(&self, db_id: u64, key_hash: String) -> RpcResult; /// List all access keys for a database #[method(name = "listAccessKeys")] async fn list_access_keys(&self, db_id: u64) -> RpcResult>; /// Set database public/private status #[method(name = "setDatabasePublic")] async fn set_database_public(&self, db_id: u64, public: bool) -> RpcResult; // ----- Full-text (Tantivy) minimal RPC endpoints ----- /// Create a new FT index in a Tantivy-backed DB #[method(name = "ftCreate")] async fn ft_create( &self, db_id: u64, index_name: String, schema: Vec<(String, String, Vec)>, ) -> RpcResult; /// Add or replace a document in an FT index #[method(name = "ftAdd")] async fn ft_add( &self, db_id: u64, index_name: String, doc_id: String, score: f64, fields: HashMap, ) -> RpcResult; /// Search an FT index #[method(name = "ftSearch")] async fn ft_search( &self, db_id: u64, index_name: String, query: String, filters: Option>, limit: Option, offset: Option, return_fields: Option>, ) -> RpcResult; /// Delete a document by id from an FT index #[method(name = "ftDel")] 
async fn ft_del(&self, db_id: u64, index_name: String, doc_id: String) -> RpcResult; /// Get FT index info #[method(name = "ftInfo")] async fn ft_info(&self, db_id: u64, index_name: String) -> RpcResult; /// Drop an FT index #[method(name = "ftDrop")] async fn ft_drop(&self, db_id: u64, index_name: String) -> RpcResult; // ----- LanceDB (Vector + Text) RPC endpoints ----- /// Create a new Lance dataset in a Lance-backed DB #[method(name = "lanceCreate")] async fn lance_create( &self, db_id: u64, name: String, dim: usize, ) -> RpcResult; /// Store a vector (with id and metadata) into a Lance dataset (deprecated; returns error) #[method(name = "lanceStore")] async fn lance_store( &self, db_id: u64, name: String, id: String, vector: Vec, meta: Option>, ) -> RpcResult; /// Search a Lance dataset with a query vector (deprecated; returns error) #[method(name = "lanceSearch")] async fn lance_search( &self, db_id: u64, name: String, vector: Vec, k: usize, filter: Option, return_fields: Option>, ) -> RpcResult; /// Create an ANN index on a Lance dataset #[method(name = "lanceCreateIndex")] async fn lance_create_index( &self, db_id: u64, name: String, index_type: String, params: Option>, ) -> RpcResult; /// List Lance datasets for a DB #[method(name = "lanceList")] async fn lance_list( &self, db_id: u64, ) -> RpcResult>; /// Get info for a Lance dataset #[method(name = "lanceInfo")] async fn lance_info( &self, db_id: u64, name: String, ) -> RpcResult; /// Delete a record by id from a Lance dataset #[method(name = "lanceDel")] async fn lance_del( &self, db_id: u64, name: String, id: String, ) -> RpcResult; /// Drop a Lance dataset #[method(name = "lanceDrop")] async fn lance_drop( &self, db_id: u64, name: String, ) -> RpcResult; // New: Text-first endpoints (no user-provided vectors) /// Set per-dataset embedding configuration #[method(name = "lanceSetEmbeddingConfig")] async fn lance_set_embedding_config( &self, db_id: u64, name: String, config: EmbeddingConfig, ) -> 
RpcResult; /// Get per-dataset embedding configuration #[method(name = "lanceGetEmbeddingConfig")] async fn lance_get_embedding_config( &self, db_id: u64, name: String, ) -> RpcResult; /// Store text; server will embed and store vector+text+meta #[method(name = "lanceStoreText")] async fn lance_store_text( &self, db_id: u64, name: String, id: String, text: String, meta: Option>, ) -> RpcResult; /// Search using a text query; server will embed then search #[method(name = "lanceSearchText")] async fn lance_search_text( &self, db_id: u64, name: String, text: String, k: usize, filter: Option, return_fields: Option>, ) -> RpcResult; // ----- Image-first endpoints (no user-provided vectors) ----- /// Store an image; exactly one of uri or bytes_b64 must be provided. #[method(name = "lanceStoreImage")] async fn lance_store_image( &self, db_id: u64, name: String, id: String, uri: Option, bytes_b64: Option, meta: Option>, ) -> RpcResult; /// Search using an image query; exactly one of uri or bytes_b64 must be provided. 
#[method(name = "lanceSearchImage")] async fn lance_search_image( &self, db_id: u64, name: String, k: usize, uri: Option, bytes_b64: Option, filter: Option, return_fields: Option>, ) -> RpcResult; } /// RPC Server implementation pub struct RpcServerImpl { /// Base directory for database files base_dir: PathBuf, /// Managed database servers servers: Arc>>>, /// Default backend type backend: crate::options::BackendType, /// Admin secret used to encrypt DB 0 and authorize admin access admin_secret: String, } impl RpcServerImpl { /// Create a new RPC server instance pub fn new(base_dir: PathBuf, backend: crate::options::BackendType, admin_secret: String) -> Self { Self { base_dir, servers: Arc::new(RwLock::new(HashMap::new())), backend, admin_secret, } } /// Get or create a server instance for the given database ID async fn get_or_create_server(&self, db_id: u64) -> Result, jsonrpsee::types::ErrorObjectOwned> { // Check if server already exists { let servers = self.servers.read().await; if let Some(server) = servers.get(&db_id) { return Ok(server.clone()); } } // Validate existence via admin DB 0 (metadata), not filesystem presence let exists = admin_meta::db_exists(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id) .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; if !exists { return Err(jsonrpsee::types::ErrorObjectOwned::owned( -32000, format!("Database {} not found", db_id), None::<()> )); } // Resolve effective backend for this db from admin meta or filesystem; fallback to default let meta_backend = admin_meta::get_database_backend(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id) .ok() .flatten(); let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_id)); let sniffed_backend = if db_path.exists() { if db_path.is_file() { Some(crate::options::BackendType::Redb) } else if db_path.is_dir() { Some(crate::options::BackendType::Sled) } else { None } } else { None }; let 
effective_backend = meta_backend.clone().or(sniffed_backend).unwrap_or(self.backend.clone()); if effective_backend != self.backend { eprintln!( "notice: get_or_create_server: db {} backend resolved to {:?} (server default {:?})", db_id, effective_backend, self.backend ); } // If we had to sniff (no meta), persist the resolved backend if meta_backend.is_none() { let _ = admin_meta::set_database_backend(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, effective_backend.clone()); } // Create server instance with resolved backend let is_search_only = matches!( effective_backend, crate::options::BackendType::Tantivy | crate::options::BackendType::Lance ); let db_option = DBOption { dir: self.base_dir.clone(), port: 0, // Not used for RPC-managed databases debug: false, encryption_key: None, encrypt: false, backend: effective_backend.clone(), admin_secret: self.admin_secret.clone(), }; let mut server = Server::new(db_option).await; // Set the selected database to the db_id server.selected_db = db_id; // Lazily open/create physical storage according to admin meta (per-db encryption) // Skip for search-only backends (Tantivy/Lance): no KV storage to open if !is_search_only { let _ = server.current_storage(); } // Store the server let mut servers = self.servers.write().await; servers.insert(db_id, Arc::new(server.clone())); Ok(Arc::new(server)) } /// Discover existing database IDs from admin DB 0 async fn discover_databases(&self) -> Vec { admin_meta::list_dbs(&self.base_dir, self.backend.clone(), &self.admin_secret) .unwrap_or_default() } /// Build database file path for given server/db_id fn db_file_path(&self, server: &Server, db_id: u64) -> std::path::PathBuf { std::path::PathBuf::from(&server.option.dir).join(format!("{}.db", db_id)) } /// Recursively compute size on disk for the database path fn compute_size_on_disk(&self, path: &std::path::Path) -> Option { fn dir_size(p: &std::path::Path) -> u64 { if p.is_file() { std::fs::metadata(p).map(|m| 
m.len()).unwrap_or(0) } else if p.is_dir() { let mut total = 0u64; if let Ok(read) = std::fs::read_dir(p) { for entry in read.flatten() { total += dir_size(&entry.path()); } } total } else { 0 } } Some(dir_size(path)) } /// Extract created and last access times (secs) from a path, with fallbacks fn get_file_times_secs(path: &std::path::Path) -> (u64, Option) { let now = std::time::SystemTime::now(); let created = std::fs::metadata(path) .and_then(|m| m.created().or_else(|_| m.modified())) .unwrap_or(now) .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(); let last_access = std::fs::metadata(path) .and_then(|m| m.accessed()) .ok() .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok().map(|d| d.as_secs())); (created, last_access) } /// Compose a DatabaseInfo by probing storage and filesystem, with admin meta for access key count async fn build_database_info(&self, db_id: u64, server: &Server) -> DatabaseInfo { // Probe storage to determine encryption state let storage = server.current_storage().ok(); let encrypted = storage.as_ref().map(|s| s.is_encrypted()).unwrap_or(server.option.encrypt); // Get actual key count from storage let key_count = storage.as_ref() .and_then(|s| s.dbsize().ok()) .map(|count| count as u64); // Get database name from admin meta let name = admin_meta::get_database_name(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id) .ok() .flatten(); // Compute size on disk and timestamps from the DB file path let db_path = self.db_file_path(server, db_id); let size_on_disk = self.compute_size_on_disk(&db_path); let (created_at, last_access) = Self::get_file_times_secs(&db_path); let backend = match server.option.backend { crate::options::BackendType::Redb => BackendType::Redb, crate::options::BackendType::Sled => BackendType::Sled, crate::options::BackendType::Tantivy => BackendType::Tantivy, crate::options::BackendType::Lance => BackendType::Lance, }; DatabaseInfo { id: db_id, name, backend, encrypted, 
redis_version: Some("7.0".to_string()), storage_path: Some(server.option.dir.display().to_string()), size_on_disk, key_count, created_at, last_access, } } } #[jsonrpsee::core::async_trait] impl RpcServer for RpcServerImpl { async fn create_database( &self, backend: BackendType, config: DatabaseConfig, encryption_key: Option, ) -> RpcResult { // Allocate new ID via admin DB 0 let db_id = admin_meta::allocate_next_id(&self.base_dir, self.backend.clone(), &self.admin_secret) .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; // Persist per-db encryption key in admin DB 0 if provided if let Some(ref key) = encryption_key { admin_meta::set_enc_key(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, key) .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; } // Persist database name if provided if let Some(ref name) = config.name { admin_meta::set_database_name(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, name) .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; } // Ensure base dir exists if let Err(e) = std::fs::create_dir_all(&self.base_dir) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, format!("Failed to ensure base dir: {}", e), None::<()>)); } // Map RPC backend to options backend and persist it in admin meta for this db id let opt_backend = match backend { BackendType::Redb => crate::options::BackendType::Redb, BackendType::Sled => crate::options::BackendType::Sled, BackendType::Tantivy => crate::options::BackendType::Tantivy, BackendType::Lance => crate::options::BackendType::Lance, }; admin_meta::set_database_backend(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, opt_backend.clone()) .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; // Create server instance using base_dir, chosen backend and admin secret let is_search_only_new = matches!( opt_backend, 
crate::options::BackendType::Tantivy | crate::options::BackendType::Lance ); let option = DBOption { dir: self.base_dir.clone(), port: 0, // Not used for RPC-managed databases debug: false, encryption_key: None, // per-db key is stored in admin DB 0 encrypt: false, // encryption decided per-db at open time backend: opt_backend.clone(), admin_secret: self.admin_secret.clone(), }; let mut server = Server::new(option).await; server.selected_db = db_id; // Initialize storage to create physical .db with proper encryption from admin meta // Skip for search-only backends (Tantivy/Lance): no KV storage to initialize if !is_search_only_new { let _ = server.current_storage(); } // Store the server in cache let mut servers = self.servers.write().await; servers.insert(db_id, Arc::new(server)); Ok(db_id) } async fn set_encryption(&self, _db_id: u64, _encryption_key: String) -> RpcResult { // For now, return false as encryption can only be set during creation let _servers = self.servers.read().await; // TODO: Implement encryption setting for existing databases Ok(false) } async fn list_databases(&self) -> RpcResult> { let db_ids = self.discover_databases().await; let mut result = Vec::new(); for db_id in db_ids { if let Ok(server) = self.get_or_create_server(db_id).await { // Build accurate info from storage/meta/fs let info = self.build_database_info(db_id, &server).await; result.push(info); } } Ok(result) } async fn get_database_info(&self, db_id: u64) -> RpcResult { let server = self.get_or_create_server(db_id).await?; // Build accurate info from storage/meta/fs let info = self.build_database_info(db_id, &server).await; Ok(info) } async fn delete_database(&self, db_id: u64) -> RpcResult { let mut servers = self.servers.write().await; if let Some(_server) = servers.remove(&db_id) { // Clean up database files let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_id)); if db_path.exists() { if db_path.is_dir() { std::fs::remove_dir_all(&db_path).ok(); } 
else { std::fs::remove_file(&db_path).ok(); } } Ok(true) } else { Ok(false) } } async fn get_server_stats(&self) -> RpcResult> { let db_ids = self.discover_databases().await; let mut stats = HashMap::new(); stats.insert("total_databases".to_string(), serde_json::json!(db_ids.len())); stats.insert("uptime".to_string(), serde_json::json!( std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs() )); Ok(stats) } // ----- Full-text (Tantivy) minimal RPC endpoints ----- async fn ft_create( &self, db_id: u64, index_name: String, schema: Vec<(String, String, Vec)>, ) -> RpcResult { let server = self.get_or_create_server(db_id).await?; if db_id == 0 { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "FT not allowed on DB 0", None::<()>)); } if !matches!(server.option.backend, crate::options::BackendType::Tantivy) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>)); } let proto = crate::search_cmd::ft_create_cmd(&*server, index_name, schema) .await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; match proto { crate::protocol::Protocol::Error(msg) => { Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>)) } _ => Ok(true), } } async fn ft_add( &self, db_id: u64, index_name: String, doc_id: String, score: f64, fields: HashMap, ) -> RpcResult { let server = self.get_or_create_server(db_id).await?; if db_id == 0 { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "FT not allowed on DB 0", None::<()>)); } if !matches!(server.option.backend, crate::options::BackendType::Tantivy) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>)); } let proto = crate::search_cmd::ft_add_cmd(&*server, index_name, doc_id, score, fields) .await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; match proto { crate::protocol::Protocol::Error(msg) => { 
Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>)) } _ => Ok(true), } } async fn ft_search( &self, db_id: u64, index_name: String, query: String, filters: Option>, limit: Option, offset: Option, return_fields: Option>, ) -> RpcResult { let server = self.get_or_create_server(db_id).await?; if db_id == 0 { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "FT not allowed on DB 0", None::<()>)); } if !matches!(server.option.backend, crate::options::BackendType::Tantivy) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>)); } let proto = crate::search_cmd::ft_search_cmd( &*server, index_name, query, filters.unwrap_or_default(), limit, offset, return_fields, ) .await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; match proto { crate::protocol::Protocol::Error(msg) => { Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>)) } _ => Ok(serde_json::json!({ "resp": proto.encode() })), } } async fn ft_del(&self, db_id: u64, index_name: String, doc_id: String) -> RpcResult { let server = self.get_or_create_server(db_id).await?; if db_id == 0 { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "FT not allowed on DB 0", None::<()>)); } if !matches!(server.option.backend, crate::options::BackendType::Tantivy) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>)); } let proto = crate::search_cmd::ft_del_cmd(&*server, index_name, doc_id) .await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; match proto { crate::protocol::Protocol::Error(msg) => { Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>)) } crate::protocol::Protocol::SimpleString(s) => Ok(s == "1"), _ => Ok(false), } } async fn ft_info(&self, db_id: u64, index_name: String) -> RpcResult { let server = self.get_or_create_server(db_id).await?; if db_id == 0 { return 
Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "FT not allowed on DB 0", None::<()>)); } if !matches!(server.option.backend, crate::options::BackendType::Tantivy) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>)); } let proto = crate::search_cmd::ft_info_cmd(&*server, index_name) .await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; match proto { crate::protocol::Protocol::Error(msg) => { Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>)) } _ => Ok(serde_json::json!({ "resp": proto.encode() })), } } async fn ft_drop(&self, db_id: u64, index_name: String) -> RpcResult { let server = self.get_or_create_server(db_id).await?; if db_id == 0 { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "FT not allowed on DB 0", None::<()>)); } if !matches!(server.option.backend, crate::options::BackendType::Tantivy) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>)); } let proto = crate::search_cmd::ft_drop_cmd(&*server, index_name) .await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; match proto { crate::protocol::Protocol::Error(msg) => { Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>)) } crate::protocol::Protocol::SimpleString(s) => Ok(s.eq_ignore_ascii_case("OK")), _ => Ok(false), } } async fn add_access_key(&self, db_id: u64, key: String, permissions: String) -> RpcResult { let perms = match permissions.to_lowercase().as_str() { "read" => Permissions::Read, "readwrite" => Permissions::ReadWrite, _ => return Err(jsonrpsee::types::ErrorObjectOwned::owned( -32000, "Invalid permissions: use 'read' or 'readwrite'", None::<()> )), }; admin_meta::add_access_key(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, &key, perms) .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; Ok(true) } async fn 
delete_access_key(&self, db_id: u64, key_hash: String) -> RpcResult { let ok = admin_meta::delete_access_key(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, &key_hash) .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; Ok(ok) } async fn list_access_keys(&self, db_id: u64) -> RpcResult> { let pairs = admin_meta::list_access_keys(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id) .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; let keys: Vec = pairs.into_iter().map(|(hash, perm, ts)| AccessKeyInfo { hash, permissions: perm, created_at: ts, }).collect(); Ok(keys) } async fn set_database_public(&self, db_id: u64, public: bool) -> RpcResult { admin_meta::set_database_public(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, public) .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; Ok(true) } // ----- LanceDB (Vector) RPC endpoints ----- async fn lance_create( &self, db_id: u64, name: String, dim: usize, ) -> RpcResult { let server = self.get_or_create_server(db_id).await?; if db_id == 0 { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>)); } if !matches!(server.option.backend, crate::options::BackendType::Lance) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>)); } if !server.has_write_permission() { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "write permission denied", None::<()>)); } server.lance_store() .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))? .create_dataset(&name, dim).await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; Ok(true) } async fn lance_store( &self, _db_id: u64, _name: String, _id: String, _vector: Vec, _meta: Option>, ) -> RpcResult { Err(jsonrpsee::types::ErrorObjectOwned::owned( -32000, "Vector endpoint removed. 
Use lanceStoreText instead.", None::<()> )) } async fn lance_search( &self, _db_id: u64, _name: String, _vector: Vec, _k: usize, _filter: Option, _return_fields: Option>, ) -> RpcResult { Err(jsonrpsee::types::ErrorObjectOwned::owned( -32000, "Vector endpoint removed. Use lanceSearchText instead.", None::<()> )) } async fn lance_create_index( &self, db_id: u64, name: String, index_type: String, params: Option>, ) -> RpcResult { let server = self.get_or_create_server(db_id).await?; if db_id == 0 { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>)); } if !matches!(server.option.backend, crate::options::BackendType::Lance) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>)); } if !server.has_write_permission() { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "write permission denied", None::<()>)); } server.lance_store() .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))? .create_index(&name, &index_type, params.unwrap_or_default()).await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; Ok(true) } async fn lance_list( &self, db_id: u64, ) -> RpcResult> { let server = self.get_or_create_server(db_id).await?; if db_id == 0 { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>)); } if !matches!(server.option.backend, crate::options::BackendType::Lance) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>)); } if !server.has_read_permission() { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "read permission denied", None::<()>)); } let list = server.lance_store() .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))? 
.list_datasets().await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; Ok(list) } async fn lance_info( &self, db_id: u64, name: String, ) -> RpcResult { let server = self.get_or_create_server(db_id).await?; if db_id == 0 { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>)); } if !matches!(server.option.backend, crate::options::BackendType::Lance) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>)); } if !server.has_read_permission() { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "read permission denied", None::<()>)); } let info = server.lance_store() .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))? .get_dataset_info(&name).await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; Ok(serde_json::json!(info)) } async fn lance_del( &self, db_id: u64, name: String, id: String, ) -> RpcResult { let server = self.get_or_create_server(db_id).await?; if db_id == 0 { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>)); } if !matches!(server.option.backend, crate::options::BackendType::Lance) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>)); } if !server.has_write_permission() { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "write permission denied", None::<()>)); } let ok = server.lance_store() .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))? 
.delete_by_id(&name, &id).await
            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
        Ok(ok)
    }

    /// Drop the Lance dataset `name` in database `db_id`.
    ///
    /// Requires a non-zero, Lance-backed database and write permission.
    /// Returns the value reported by `drop_dataset`.
    ///
    /// # Errors
    /// Every failure is reported as a JSON-RPC error with code `-32000`.
    async fn lance_drop(
        &self,
        db_id: u64,
        name: String,
    ) -> RpcResult<bool> {
        let server = self.get_or_create_server(db_id).await?;
        ensure_lance_access(&server, db_id, Permissions::ReadWrite)?;

        let dropped = server
            .lance_store()
            .map_err(|e| lance_rpc_error(e.0))?
            .drop_dataset(&name)
            .await
            .map_err(|e| lance_rpc_error(e.0))?;
        Ok(dropped)
    }

    // ----- New text-first Lance RPC implementations -----

    /// Store the embedding configuration for dataset `name`.
    ///
    /// Requires a non-zero, Lance-backed database and write permission.
    /// Rejects configurations whose `dim` is zero. Returns `true` on success.
    ///
    /// # Errors
    /// Every failure is reported as a JSON-RPC error with code `-32000`.
    async fn lance_set_embedding_config(
        &self,
        db_id: u64,
        name: String,
        config: EmbeddingConfig,
    ) -> RpcResult<bool> {
        let server = self.get_or_create_server(db_id).await?;
        ensure_lance_access(&server, db_id, Permissions::ReadWrite)?;

        // Validate provider and dimension (only a minimal set is allowed for now).
        // There is no wildcard arm, so a newly added provider variant will not
        // compile until it is listed here.
        match config.provider {
            crate::embedding::EmbeddingProvider::openai
            | crate::embedding::EmbeddingProvider::test
            | crate::embedding::EmbeddingProvider::image_test => {}
        }
        if config.dim == 0 {
            return Err(lance_rpc_error("Invalid embedding config: dim must be > 0"));
        }

        server
            .set_dataset_embedding_config(&name, &config)
            .map_err(|e| lance_rpc_error(e.0))?;
        Ok(true)
    }

    /// Return the embedding configuration of dataset `name` as JSON.
    ///
    /// Requires a non-zero, Lance-backed database and read permission.
    /// If the configuration cannot be serialized, an empty JSON object is
    /// returned instead of an error.
    ///
    /// # Errors
    /// Every failure is reported as a JSON-RPC error with code `-32000`.
    async fn lance_get_embedding_config(
        &self,
        db_id: u64,
        name: String,
    ) -> RpcResult<serde_json::Value> {
        let server = self.get_or_create_server(db_id).await?;
        ensure_lance_access(&server, db_id, Permissions::Read)?;

        let cfg = server
            .get_dataset_embedding_config(&name)
            .map_err(|e| lance_rpc_error(e.0))?;
        Ok(serde_json::to_value(&cfg).unwrap_or_else(|_| serde_json::json!({})))
    }

    /// Embed `text` and store the resulting vector under `id` in dataset `name`.
    ///
    /// The original text is stored alongside the vector; `meta` defaults to an
    /// empty map. Requires a non-zero, Lance-backed database and write
    /// permission. Returns `true` on success.
    ///
    /// # Errors
    /// Every failure is reported as a JSON-RPC error with code `-32000`.
    async fn lance_store_text(
        &self,
        db_id: u64,
        name: String,
        id: String,
        text: String,
        meta: Option<HashMap<String, String>>,
    ) -> RpcResult<bool> {
        let server = self.get_or_create_server(db_id).await?;
        ensure_lance_access(&server, db_id, Permissions::ReadWrite)?;

        // Resolve embedder and run embedding on a plain OS thread
        // (avoid dropping any runtime in async context).
        let embedder = server
            .get_embedder_for(&name)
            .map_err(|e| lance_rpc_error(e.0))?;
        let (tx, rx) = tokio::sync::oneshot::channel();
        // `text` is still needed below for storage, so the thread gets a copy.
        let text_for_embedding = text.clone();
        std::thread::spawn(move || {
            // The receiver may already be gone (request cancelled); ignore that.
            let _ = tx.send(embedder.embed(&text_for_embedding));
        });
        let vector = rx
            .await
            .map_err(|recv_err| lance_rpc_error(format!("embedding thread error: {}", recv_err)))?
            .map_err(|e| lance_rpc_error(e.0))?;

        server
            .lance_store()
            .map_err(|e| lance_rpc_error(e.0))?
            .store_vector(&name, &id, vector, meta.unwrap_or_default(), Some(text))
            .await
            .map_err(|e| lance_rpc_error(e.0))?;
        Ok(true)
    }

    /// Embed `text` and run a `k`-nearest-neighbour search in dataset `name`.
    ///
    /// Requires a non-zero, Lance-backed database and read permission.
    /// Returns `{"results": [{"id", "score", "meta"}, ...]}`.
    ///
    /// # Errors
    /// Every failure is reported as a JSON-RPC error with code `-32000`.
    async fn lance_search_text(
        &self,
        db_id: u64,
        name: String,
        text: String,
        k: usize,
        filter: Option<String>,
        return_fields: Option<Vec<String>>,
    ) -> RpcResult<serde_json::Value> {
        let server = self.get_or_create_server(db_id).await?;
        ensure_lance_access(&server, db_id, Permissions::Read)?;

        // Resolve embedder and run embedding on a plain OS thread
        // (avoid dropping any runtime in async context).
        let embedder = server
            .get_embedder_for(&name)
            .map_err(|e| lance_rpc_error(e.0))?;
        let (tx, rx) = tokio::sync::oneshot::channel();
        // The query text is not needed afterwards, so it is moved, not cloned.
        std::thread::spawn(move || {
            let _ = tx.send(embedder.embed(&text));
        });
        let query_vector = rx
            .await
            .map_err(|recv_err| lance_rpc_error(format!("embedding thread error: {}", recv_err)))?
            .map_err(|e| lance_rpc_error(e.0))?;

        let results = server
            .lance_store()
            .map_err(|e| lance_rpc_error(e.0))?
            .search_vectors(&name, query_vector, k, filter, return_fields)
            .await
            .map_err(|e| lance_rpc_error(e.0))?;

        let json_results: Vec<serde_json::Value> = results
            .into_iter()
            .map(|(id, score, meta)| {
                serde_json::json!({
                    "id": id,
                    "score": score,
                    "meta": meta,
                })
            })
            .collect();

        Ok(serde_json::json!({ "results": json_results }))
    }

    // ----- New image-first Lance RPC implementations -----

    /// Embed an image and store the resulting vector under `id` in dataset `name`.
    ///
    /// Exactly one of `uri` or `bytes_b64` (standard base64) must be given.
    /// The vector is stored with media type `"image"` and, when the image came
    /// from a URI, that URI. `meta` defaults to an empty map. Requires a
    /// non-zero, Lance-backed database and write permission. Returns `true`
    /// on success.
    ///
    /// # Errors
    /// Every failure is reported as a JSON-RPC error with code `-32000`.
    async fn lance_store_image(
        &self,
        db_id: u64,
        name: String,
        id: String,
        uri: Option<String>,
        bytes_b64: Option<String>,
        meta: Option<HashMap<String, String>>,
    ) -> RpcResult<bool> {
        let server = self.get_or_create_server(db_id).await?;
        ensure_lance_access(&server, db_id, Permissions::ReadWrite)?;

        // Acquire image bytes (with caps).
        let (bytes, media_uri_opt) = acquire_lance_image_bytes(&server, uri, bytes_b64)?;

        // Resolve image embedder and embed on a plain OS thread.
        let img_embedder = server
            .get_image_embedder_for(&name)
            .map_err(|e| lance_rpc_error(e.0))?;
        let (tx, rx) = tokio::sync::oneshot::channel();
        // The raw bytes are not stored, so they are moved into the thread
        // instead of being copied.
        std::thread::spawn(move || {
            let _ = tx.send(img_embedder.embed_image(&bytes));
        });
        let vector = rx
            .await
            .map_err(|recv_err| lance_rpc_error(format!("embedding thread error: {}", recv_err)))?
            .map_err(|e| lance_rpc_error(e.0))?;

        // Store vector with media fields.
        server
            .lance_store()
            .map_err(|e| lance_rpc_error(e.0))?
            .store_vector_with_media(
                &name,
                &id,
                vector,
                meta.unwrap_or_default(),
                None,
                Some("image".to_string()),
                media_uri_opt,
            )
            .await
            .map_err(|e| lance_rpc_error(e.0))?;

        Ok(true)
    }

    /// Embed an image and run a `k`-nearest-neighbour search in dataset `name`.
    ///
    /// Exactly one of `uri` or `bytes_b64` (standard base64) must be given.
    /// Requires a non-zero, Lance-backed database and read permission.
    /// Returns `{"results": [{"id", "score", "meta"}, ...]}`.
    ///
    /// # Errors
    /// Every failure is reported as a JSON-RPC error with code `-32000`.
    async fn lance_search_image(
        &self,
        db_id: u64,
        name: String,
        k: usize,
        uri: Option<String>,
        bytes_b64: Option<String>,
        filter: Option<String>,
        return_fields: Option<Vec<String>>,
    ) -> RpcResult<serde_json::Value> {
        let server = self.get_or_create_server(db_id).await?;
        ensure_lance_access(&server, db_id, Permissions::Read)?;

        // Acquire image bytes for query (with caps); the source URI is not
        // needed for a search.
        let (bytes, _source_uri) = acquire_lance_image_bytes(&server, uri, bytes_b64)?;

        // Resolve image embedder and embed on OS thread.
        let img_embedder = server
            .get_image_embedder_for(&name)
            .map_err(|e| lance_rpc_error(e.0))?;
        let (tx, rx) = tokio::sync::oneshot::channel();
        std::thread::spawn(move || {
            let _ = tx.send(img_embedder.embed_image(&bytes));
        });
        let query_vector = rx
            .await
            .map_err(|recv_err| lance_rpc_error(format!("embedding thread error: {}", recv_err)))?
            .map_err(|e| lance_rpc_error(e.0))?;

        // KNN search and return results.
        let results = server
            .lance_store()
            .map_err(|e| lance_rpc_error(e.0))?
            .search_vectors(&name, query_vector, k, filter, return_fields)
            .await
            .map_err(|e| lance_rpc_error(e.0))?;

        let json_results: Vec<serde_json::Value> = results
            .into_iter()
            .map(|(id, score, meta)| {
                serde_json::json!({
                    "id": id,
                    "score": score,
                    "meta": meta,
                })
            })
            .collect();

        Ok(serde_json::json!({ "results": json_results }))
    }
}

/// Build the JSON-RPC error (code `-32000`, no data) used by the Lance handlers.
fn lance_rpc_error(message: impl Into<String>) -> jsonrpsee::types::ErrorObjectOwned {
    jsonrpsee::types::ErrorObjectOwned::owned(-32000, message, None::<()>)
}

/// Check the preconditions shared by every Lance handler, in this order:
/// the database is not DB 0, its backend is Lance, and the caller holds the
/// `required` permission (`Read` checks read access, `ReadWrite` write access).
fn ensure_lance_access(
    server: &Server,
    db_id: u64,
    required: Permissions,
) -> Result<(), jsonrpsee::types::ErrorObjectOwned> {
    if db_id == 0 {
        return Err(lance_rpc_error("Lance not allowed on DB 0"));
    }
    if !matches!(server.option.backend, crate::options::BackendType::Lance) {
        return Err(lance_rpc_error("DB backend is not Lance"));
    }
    let (allowed, denied_message) = match required {
        Permissions::Read => (server.has_read_permission(), "read permission denied"),
        Permissions::ReadWrite => (server.has_write_permission(), "write permission denied"),
    };
    if !allowed {
        return Err(lance_rpc_error(denied_message));
    }
    Ok(())
}

/// Obtain image bytes from exactly one of `uri` or `bytes_b64`.
///
/// Returns the bytes together with the source URI (`None` for base64 input).
/// The size limit is read from `HERODB_IMAGE_MAX_BYTES` (default 10 MiB) and
/// applied to both sources.
fn acquire_lance_image_bytes(
    server: &Server,
    uri: Option<String>,
    bytes_b64: Option<String>,
) -> Result<(Vec<u8>, Option<String>), jsonrpsee::types::ErrorObjectOwned> {
    const DEFAULT_IMAGE_MAX_BYTES: u64 = 10 * 1024 * 1024;

    let (data, source_uri) = match (uri, bytes_b64) {
        (Some(u), None) => {
            let data = server
                .fetch_image_bytes_from_uri(&u)
                .map_err(|e| lance_rpc_error(e.0))?;
            (data, Some(u))
        }
        (None, Some(b64)) => {
            let data = general_purpose::STANDARD
                .decode(b64.as_bytes())
                .map_err(|e| lance_rpc_error(format!("base64 decode error: {}", e)))?;
            (data, None)
        }
        // Both or neither supplied.
        _ => return Err(lance_rpc_error("Provide exactly one of 'uri' or 'bytes_b64'")),
    };

    let max_bytes = std::env::var("HERODB_IMAGE_MAX_BYTES")
        .ok()
        .and_then(|s| s.parse::<u64>().ok())
        .unwrap_or(DEFAULT_IMAGE_MAX_BYTES) as usize;
    // NOTE(review): the URI fetch helper may enforce its own limit; checking
    // here as well keeps the cap uniform for both input kinds.
    if data.len() > max_bytes {
        return Err(lance_rpc_error(format!(
            "Image exceeds max allowed bytes {}",
            max_bytes
        )));
    }

    Ok((data, source_uri))
}