diff --git a/src/cmd.rs b/src/cmd.rs index a1f6291..3ac28f0 100644 --- a/src/cmd.rs +++ b/src/cmd.rs @@ -1,4 +1,4 @@ -use crate::{error::DBError, protocol::Protocol, server::Server}; +use crate::{error::DBError, protocol::Protocol, server::Server, embedding::{EmbeddingConfig, EmbeddingProvider}}; use tokio::time::{timeout, Duration}; use futures::future::select_all; @@ -127,20 +127,20 @@ pub enum Cmd { reducers: Vec, }, - // LanceDB vector search commands + // LanceDB text-first commands (no user-provided vectors) LanceCreate { name: String, dim: usize, }, - LanceStore { + LanceStoreText { name: String, id: String, - vector: Vec, + text: String, meta: Vec<(String, String)>, }, - LanceSearch { + LanceSearchText { name: String, - vector: Vec, + text: String, k: usize, filter: Option, return_fields: Option>, @@ -150,6 +150,16 @@ pub enum Cmd { index_type: String, params: Vec<(String, String)>, }, + // Embedding configuration per dataset + LanceEmbeddingConfigSet { + name: String, + provider: String, + model: String, + params: Vec<(String, String)>, + }, + LanceEmbeddingConfigGet { + name: String, + }, LanceList, LanceInfo { name: String, @@ -862,9 +872,9 @@ impl Cmd { Cmd::LanceCreate { name, dim } } "lance.store" => { - // LANCE.STORE name ID id VECTOR v1 v2 ... [META k v ...] + // LANCE.STORE name ID TEXT [META k v ...] if cmd.len() < 6 { - return Err(DBError("ERR LANCE.STORE requires: name ID VECTOR v1 v2 ... [META k v ...]".to_string())); + return Err(DBError("ERR LANCE.STORE requires: name ID TEXT [META k v ...]".to_string())); } let name = cmd[1].clone(); let mut i = 2; @@ -873,16 +883,16 @@ impl Cmd { } let id = cmd[i + 1].clone(); i += 2; - if i >= cmd.len() || cmd[i].to_uppercase() != "VECTOR" { - return Err(DBError("ERR LANCE.STORE requires VECTOR ".to_string())); + if i >= cmd.len() || cmd[i].to_uppercase() != "TEXT" { + return Err(DBError("ERR LANCE.STORE requires TEXT ".to_string())); } i += 1; - let mut vector: Vec = Vec::new(); - while i < cmd.len() && cmd[i].to_uppercase() != "META" { - let v: f32 = cmd[i].parse().map_err(|_| DBError("ERR vector element must be a float32".to_string()))?; - vector.push(v); - i += 1; + if i >= cmd.len() { + return Err(DBError("ERR LANCE.STORE requires TEXT ".to_string())); } + let text = cmd[i].clone(); + i += 1; + let mut meta: Vec<(String, String)> = Vec::new(); if i < cmd.len() && cmd[i].to_uppercase() == "META" { i += 1; @@ -891,28 +901,28 @@ impl Cmd { i += 2; } } - Cmd::LanceStore { name, id, vector, meta } + Cmd::LanceStoreText { name, id, text, meta } } "lance.search" => { - // LANCE.SEARCH name K k VECTOR v1 v2 ... [FILTER expr] [RETURN n fields...] + // LANCE.SEARCH name K QUERY [FILTER expr] [RETURN n fields...] if cmd.len() < 6 { - return Err(DBError("ERR LANCE.SEARCH requires: name K VECTOR v1 v2 ... [FILTER expr] [RETURN n fields...]".to_string())); + return Err(DBError("ERR LANCE.SEARCH requires: name K QUERY [FILTER expr] [RETURN n fields...]".to_string())); } let name = cmd[1].clone(); if cmd[2].to_uppercase() != "K" { return Err(DBError("ERR LANCE.SEARCH requires K ".to_string())); } let k: usize = cmd[3].parse().map_err(|_| DBError("ERR K must be an integer".to_string()))?; - if cmd[4].to_uppercase() != "VECTOR" { - return Err(DBError("ERR LANCE.SEARCH requires VECTOR ".to_string())); + if cmd[4].to_uppercase() != "QUERY" { + return Err(DBError("ERR LANCE.SEARCH requires QUERY ".to_string())); } let mut i = 5; - let mut vector: Vec = Vec::new(); - while i < cmd.len() && !["FILTER","RETURN"].contains(&cmd[i].to_uppercase().as_str()) { - let v: f32 = cmd[i].parse().map_err(|_| DBError("ERR vector element must be a float32".to_string()))?; - vector.push(v); - i += 1; + if i >= cmd.len() { + return Err(DBError("ERR LANCE.SEARCH requires QUERY ".to_string())); } + let text = cmd[i].clone(); + i += 1; + let mut filter: Option = None; let mut return_fields: Option> = None; while i < cmd.len() { @@ -942,7 +952,7 @@ impl Cmd { _ => { i += 1; } } } - Cmd::LanceSearch { name, vector, k, filter, return_fields } + Cmd::LanceSearchText { name, text, k, filter, return_fields } } "lance.createindex" => { // LANCE.CREATEINDEX name TYPE t [PARAM k v ...] @@ -962,6 +972,60 @@ impl Cmd { } Cmd::LanceCreateIndex { name, index_type, params } } + "lance.embedding" => { + // LANCE.EMBEDDING CONFIG SET name PROVIDER p MODEL m [PARAM k v ...] + // LANCE.EMBEDDING CONFIG GET name + if cmd.len() < 3 || cmd[1].to_uppercase() != "CONFIG" { + return Err(DBError("ERR LANCE.EMBEDDING requires CONFIG subcommand".to_string())); + } + if cmd.len() >= 4 && cmd[2].to_uppercase() == "SET" { + if cmd.len() < 8 { + return Err(DBError("ERR LANCE.EMBEDDING CONFIG SET requires: SET name PROVIDER p MODEL m [PARAM k v ...]".to_string())); + } + let name = cmd[3].clone(); + let mut i = 4; + let mut provider: Option = None; + let mut model: Option = None; + let mut params: Vec<(String, String)> = Vec::new(); + while i < cmd.len() { + match cmd[i].to_uppercase().as_str() { + "PROVIDER" => { + if i + 1 >= cmd.len() { + return Err(DBError("ERR PROVIDER requires a value".to_string())); + } + provider = Some(cmd[i + 1].clone()); + i += 2; + } + "MODEL" => { + if i + 1 >= cmd.len() { + return Err(DBError("ERR MODEL requires a value".to_string())); + } + model = Some(cmd[i + 1].clone()); + i += 2; + } + "PARAM" => { + i += 1; + while i + 1 < cmd.len() { + params.push((cmd[i].clone(), cmd[i + 1].clone())); + i += 2; + } + } + _ => { + // Unknown token; break to avoid infinite loop + i += 1; + } + } + } + let provider = provider.ok_or_else(|| DBError("ERR missing PROVIDER".to_string()))?; + let model = model.ok_or_else(|| DBError("ERR missing MODEL".to_string()))?; + Cmd::LanceEmbeddingConfigSet { name, provider, model, params } + } else if cmd.len() == 4 && cmd[2].to_uppercase() == "GET" { + let name = cmd[3].clone(); + Cmd::LanceEmbeddingConfigGet { name } + } else { + return Err(DBError("ERR LANCE.EMBEDDING CONFIG supports: SET ... | GET name".to_string())); + } + } "lance.list" => { if cmd.len() != 1 { return Err(DBError("ERR LANCE.LIST takes no arguments".to_string())); @@ -1070,8 +1134,10 @@ impl Cmd { | Cmd::Command(..) | Cmd::Info(..) | Cmd::LanceCreate { .. } - | Cmd::LanceStore { .. } - | Cmd::LanceSearch { .. } + | Cmd::LanceStoreText { .. } + | Cmd::LanceSearchText { .. } + | Cmd::LanceEmbeddingConfigSet { .. } + | Cmd::LanceEmbeddingConfigGet { .. } | Cmd::LanceCreateIndex { .. } | Cmd::LanceList | Cmd::LanceInfo { .. } @@ -1104,8 +1170,10 @@ impl Cmd { if !is_lance_backend { match &self { Cmd::LanceCreate { .. } - | Cmd::LanceStore { .. } - | Cmd::LanceSearch { .. } + | Cmd::LanceStoreText { .. } + | Cmd::LanceSearchText { .. } + | Cmd::LanceEmbeddingConfigSet { .. } + | Cmd::LanceEmbeddingConfigGet { .. } | Cmd::LanceCreateIndex { .. } | Cmd::LanceList | Cmd::LanceInfo { .. } @@ -1249,18 +1317,66 @@ impl Cmd { Err(e) => Ok(Protocol::err(&e.0)), } } - Cmd::LanceStore { name, id, vector, meta } => { + Cmd::LanceEmbeddingConfigSet { name, provider, model, params } => { if !server.has_write_permission() { return Ok(Protocol::err("ERR write permission denied")); } - let meta_map: std::collections::HashMap = meta.into_iter().collect(); - match server.lance_store()?.store_vector(&name, &id, vector, meta_map).await { + // Map provider string to enum + let p_lc = provider.to_lowercase(); + let prov = match p_lc.as_str() { + "test-hash" | "testhash" => EmbeddingProvider::TestHash, + "fastembed" | "lancefastembed" => EmbeddingProvider::LanceFastEmbed, + "openai" | "lanceopenai" => EmbeddingProvider::LanceOpenAI, + other => EmbeddingProvider::LanceOther(other.to_string()), + }; + let cfg = EmbeddingConfig { + provider: prov, + model, + params: params.into_iter().collect(), + }; + match server.set_dataset_embedding_config(&name, &cfg) { Ok(()) => Ok(Protocol::SimpleString("OK".to_string())), Err(e) => Ok(Protocol::err(&e.0)), } } - Cmd::LanceSearch { name, vector, k, filter, return_fields } => { - match server.lance_store()?.search_vectors(&name, vector, k, filter, return_fields).await { + Cmd::LanceEmbeddingConfigGet { name } => { + match server.get_dataset_embedding_config(&name) { + Ok(cfg) => { + let mut arr = Vec::new(); + arr.push(Protocol::BulkString("provider".to_string())); + arr.push(Protocol::BulkString(match cfg.provider { + EmbeddingProvider::TestHash => "test-hash".to_string(), + EmbeddingProvider::LanceFastEmbed => "lancefastembed".to_string(), + EmbeddingProvider::LanceOpenAI => "lanceopenai".to_string(), + EmbeddingProvider::LanceOther(ref s) => s.clone(), + })); + arr.push(Protocol::BulkString("model".to_string())); + arr.push(Protocol::BulkString(cfg.model.clone())); + arr.push(Protocol::BulkString("params".to_string())); + arr.push(Protocol::BulkString(serde_json::to_string(&cfg.params).unwrap_or_else(|_| "{}".to_string()))); + Ok(Protocol::Array(arr)) + } + Err(e) => Ok(Protocol::err(&e.0)), + } + } + Cmd::LanceStoreText { name, id, text, meta } => { + if !server.has_write_permission() { + return Ok(Protocol::err("ERR write permission denied")); + } + // Resolve embedder and embed text + let embedder = server.get_embedder_for(&name)?; + let vector = embedder.embed(&text)?; + let meta_map: std::collections::HashMap = meta.into_iter().collect(); + match server.lance_store()?.store_vector(&name, &id, vector, meta_map, Some(text)).await { + Ok(()) => Ok(Protocol::SimpleString("OK".to_string())), + Err(e) => Ok(Protocol::err(&e.0)), + } + } + Cmd::LanceSearchText { name, text, k, filter, return_fields } => { + // Resolve embedder and embed query text + let embedder = server.get_embedder_for(&name)?; + let qv = embedder.embed(&text)?; + match server.lance_store()?.search_vectors(&name, qv, k, filter, return_fields).await { Ok(results) => { // Encode as array of [id, score, [k1, v1, k2, v2, ...]] let mut arr = Vec::new(); diff --git a/src/embedding.rs b/src/embedding.rs new file mode 100644 index 0000000..db982b9 --- /dev/null +++ b/src/embedding.rs @@ -0,0 +1,138 @@ +// Embedding abstraction and minimal providers. +// +// This module defines a provider-agnostic interface to produce vector embeddings +// from text, so callers never need to supply vectors manually. It includes: +// - Embedder trait +// - EmbeddingProvider and EmbeddingConfig (serde-serializable) +// - TestHashEmbedder: deterministic, CPU-only, no-network embedder suitable for CI +// - Factory create_embedder(..) to instantiate an embedder from config +// +// Integration plan: +// - Server will resolve per-dataset EmbeddingConfig from sidecar JSON and cache Arc +// - LanceStore will call embedder.embed(text) then persist id, vector, text, meta +// +// Note: Real LanceDB-backed embedding providers can be added by implementing Embedder +// and extending create_embedder(..). This file keeps no direct dependency on LanceDB. + +use std::collections::HashMap; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; + +use crate::error::DBError; + +/// Provider identifiers. Extend as needed to mirror LanceDB-supported providers. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum EmbeddingProvider { + // Deterministic, local-only embedder for CI and offline development. + TestHash, + // Placeholders for LanceDB-supported providers; implementers can add concrete backends later. + LanceFastEmbed, + LanceOpenAI, + LanceOther(String), +} + +/// Serializable embedding configuration. +/// params: arbitrary key-value map for provider-specific knobs (e.g., "dim", "api_key_env", etc.) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EmbeddingConfig { + pub provider: EmbeddingProvider, + pub model: String, + #[serde(default)] + pub params: HashMap, +} + +impl EmbeddingConfig { + pub fn get_param_usize(&self, key: &str) -> Option { + self.params.get(key).and_then(|v| v.parse::().ok()) + } + pub fn get_param_string(&self, key: &str) -> Option { + self.params.get(key).cloned() + } +} + +/// A provider-agnostic text embedding interface. +pub trait Embedder: Send + Sync { + /// Human-readable provider/model name + fn name(&self) -> String; + /// Embedding dimension + fn dim(&self) -> usize; + /// Embed a single text string into a fixed-length vector + fn embed(&self, text: &str) -> Result, DBError>; + /// Embed many texts; default maps embed() over inputs + fn embed_many(&self, texts: &[String]) -> Result>, DBError> { + texts.iter().map(|t| self.embed(t)).collect() + } +} + +/// Deterministic, no-deps, no-network embedder for CI and offline dev. +/// Algorithm: +/// - Fold bytes of UTF-8 into 'dim' buckets with a simple rolling hash +/// - Apply tanh-like scaling and L2-normalize to unit length +pub struct TestHashEmbedder { + dim: usize, + model_name: String, +} + +impl TestHashEmbedder { + pub fn new(dim: usize, model_name: impl Into) -> Self { + Self { dim, model_name: model_name.into() } + } + + fn l2_normalize(mut v: Vec) -> Vec { + let norm: f32 = v.iter().map(|x| x * x).sum::().sqrt(); + if norm > 0.0 { + for x in &mut v { + *x /= norm; + } + } + v + } +} + +impl Embedder for TestHashEmbedder { + fn name(&self) -> String { + format!("test-hash:{}", self.model_name) + } + + fn dim(&self) -> usize { + self.dim + } + + fn embed(&self, text: &str) -> Result, DBError> { + let mut acc = vec![0f32; self.dim]; + // A simple, deterministic folding hash over bytes + let mut h1: u32 = 2166136261u32; // FNV-like seed + let mut h2: u32 = 0x9e3779b9u32; // golden ratio + for (i, b) in text.as_bytes().iter().enumerate() { + h1 ^= *b as u32; + h1 = h1.wrapping_mul(16777619u32); + h2 = h2.wrapping_add(((*b as u32) << (i % 13)) ^ (h1.rotate_left((i % 7) as u32))); + let idx = (h1 ^ h2) as usize % self.dim; + // Map byte to [-1, 1] and accumulate with mild decay by position + let val = ((*b as f32) / 127.5 - 1.0) * (1.0 / (1.0 + (i as f32 / 32.0))); + acc[idx] += val; + } + // Non-linear squashing to stabilize + normalize + for x in &mut acc { + *x = x.tanh(); + } + Ok(Self::l2_normalize(acc)) + } +} + +/// Create an embedder instance from a config. +/// - TestHash: uses params["dim"] or defaults to 64 +/// - Lance* providers: return an explicit error for now; implementers can wire these up +pub fn create_embedder(config: &EmbeddingConfig) -> Result, DBError> { + match &config.provider { + EmbeddingProvider::TestHash => { + let dim = config.get_param_usize("dim").unwrap_or(64); + Ok(Arc::new(TestHashEmbedder::new(dim, config.model.clone()))) + } + EmbeddingProvider::LanceFastEmbed => Err(DBError("LanceFastEmbed provider not yet implemented in Rust embedding layer; configure 'test-hash' or implement a Lance-backed provider".into())), + EmbeddingProvider::LanceOpenAI => Err(DBError("LanceOpenAI provider not yet implemented in Rust embedding layer; configure 'test-hash' or implement a Lance-backed provider".into())), + EmbeddingProvider::LanceOther(p) => Err(DBError(format!("Lance provider '{}' not implemented; configure 'test-hash' or implement a Lance-backed provider", p))), + } +} \ No newline at end of file diff --git a/src/lance_store.rs b/src/lance_store.rs index a26d6a8..78f1923 100644 --- a/src/lance_store.rs +++ b/src/lance_store.rs @@ -111,6 +111,7 @@ impl LanceStore { Arc::new(Schema::new(vec![ Field::new("id", DataType::Utf8, false), Self::vector_field(dim), + Field::new("text", DataType::Utf8, true), Field::new("meta", DataType::Utf8, true), ])) } @@ -119,6 +120,7 @@ impl LanceStore { id: &str, vector: &[f32], meta: &HashMap, + text: Option<&str>, dim: i32, ) -> Result<(Arc, RecordBatch), DBError> { if vector.len() as i32 != dim { @@ -145,6 +147,15 @@ impl LanceStore { list_builder.append(true); let vec_arr = Arc::new(list_builder.finish()) as Arc; + // text column (optional) + let mut text_builder = StringBuilder::new(); + if let Some(t) = text { + text_builder.append_value(t); + } else { + text_builder.append_null(); + } + let text_arr = Arc::new(text_builder.finish()) as Arc; + // meta column (JSON string) let meta_json = if meta.is_empty() { None @@ -160,7 +171,7 @@ impl LanceStore { let meta_arr = Arc::new(meta_builder.finish()) as Arc; let batch = - RecordBatch::try_new(schema.clone(), vec![id_arr, vec_arr, meta_arr]).map_err(|e| { + RecordBatch::try_new(schema.clone(), vec![id_arr, vec_arr, text_arr, meta_arr]).map_err(|e| { DBError(format!("RecordBatch build failed: {e}")) })?; @@ -195,10 +206,11 @@ impl LanceStore { let v_builder = Float32Builder::new(); let mut list_builder = FixedSizeListBuilder::new(v_builder, dim_i32); let empty_vec = Arc::new(list_builder.finish()) as Arc; + let empty_text = Arc::new(StringArray::new_null(0)); let empty_meta = Arc::new(StringArray::new_null(0)); let empty_batch = - RecordBatch::try_new(schema.clone(), vec![empty_id, empty_vec, empty_meta]) + RecordBatch::try_new(schema.clone(), vec![empty_id, empty_vec, empty_text, empty_meta]) .map_err(|e| DBError(format!("Build empty batch failed: {e}")))?; let write_params = WriteParams { @@ -222,6 +234,7 @@ impl LanceStore { id: &str, vector: Vec, meta: HashMap, + text: Option, ) -> Result<(), DBError> { let path = self.dataset_path(name); @@ -235,7 +248,7 @@ impl LanceStore { .map_err(|_| DBError("Vector length too large".into()))? }; - let (schema, batch) = Self::build_one_row_batch(id, &vector, &meta, dim_i32)?; + let (schema, batch) = Self::build_one_row_batch(id, &vector, &meta, text.as_deref(), dim_i32)?; // If LanceDB table exists and provides delete, we can upsert by deleting same id // Try best-effort delete; ignore errors to keep operation append-only on failure diff --git a/src/lib.rs b/src/lib.rs index e862328..b91107f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,3 +15,4 @@ pub mod admin_meta; pub mod tantivy_search; pub mod search_cmd; pub mod lance_store; +pub mod embedding; diff --git a/src/rpc.rs b/src/rpc.rs index 62b0593..2e3b3c6 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -9,6 +9,7 @@ use sha2::{Digest, Sha256}; use crate::server::Server; use crate::options::DBOption; use crate::admin_meta; +use crate::embedding::{EmbeddingConfig, EmbeddingProvider}; /// Database backend types #[derive(Debug, Clone, Serialize, Deserialize)] @@ -163,8 +164,8 @@ pub trait Rpc { #[method(name = "ftDrop")] async fn ft_drop(&self, db_id: u64, index_name: String) -> RpcResult; - // ----- LanceDB (Vector) RPC endpoints ----- - + // ----- LanceDB (Vector + Text) RPC endpoints ----- + /// Create a new Lance dataset in a Lance-backed DB #[method(name = "lanceCreate")] async fn lance_create( @@ -173,8 +174,8 @@ pub trait Rpc { name: String, dim: usize, ) -> RpcResult; - - /// Store a vector (with id and metadata) into a Lance dataset + + /// Store a vector (with id and metadata) into a Lance dataset (deprecated; returns error) #[method(name = "lanceStore")] async fn lance_store( &self, @@ -184,8 +185,8 @@ pub trait Rpc { vector: Vec, meta: Option>, ) -> RpcResult; - - /// Search a Lance dataset with a query vector + + /// Search a Lance dataset with a query vector (deprecated; returns error) #[method(name = "lanceSearch")] async fn lance_search( &self, @@ -196,7 +197,7 @@ pub trait Rpc { filter: Option, return_fields: Option>, ) -> RpcResult; - + /// Create an ANN index on a Lance dataset #[method(name = "lanceCreateIndex")] async fn lance_create_index( @@ -206,14 +207,14 @@ pub trait Rpc { index_type: String, params: Option>, ) -> RpcResult; - + /// List Lance datasets for a DB #[method(name = "lanceList")] async fn lance_list( &self, db_id: u64, ) -> RpcResult>; - + /// Get info for a Lance dataset #[method(name = "lanceInfo")] async fn lance_info( @@ -221,7 +222,7 @@ pub trait Rpc { db_id: u64, name: String, ) -> RpcResult; - + /// Delete a record by id from a Lance dataset #[method(name = "lanceDel")] async fn lance_del( @@ -230,7 +231,7 @@ pub trait Rpc { name: String, id: String, ) -> RpcResult; - + /// Drop a Lance dataset #[method(name = "lanceDrop")] async fn lance_drop( @@ -238,6 +239,49 @@ pub trait Rpc { db_id: u64, name: String, ) -> RpcResult; + + // New: Text-first endpoints (no user-provided vectors) + /// Set per-dataset embedding configuration + #[method(name = "lanceSetEmbeddingConfig")] + async fn lance_set_embedding_config( + &self, + db_id: u64, + name: String, + provider: String, + model: String, + params: Option>, + ) -> RpcResult; + + /// Get per-dataset embedding configuration + #[method(name = "lanceGetEmbeddingConfig")] + async fn lance_get_embedding_config( + &self, + db_id: u64, + name: String, + ) -> RpcResult; + + /// Store text; server will embed and store vector+text+meta + #[method(name = "lanceStoreText")] + async fn lance_store_text( + &self, + db_id: u64, + name: String, + id: String, + text: String, + meta: Option>, + ) -> RpcResult; + + /// Search using a text query; server will embed then search + #[method(name = "lanceSearchText")] + async fn lance_search_text( + &self, + db_id: u64, + name: String, + text: String, + k: usize, + filter: Option, + return_fields: Option>, + ) -> RpcResult; } /// RPC Server implementation @@ -789,62 +833,33 @@ impl RpcServer for RpcServerImpl { async fn lance_store( &self, - db_id: u64, - name: String, - id: String, - vector: Vec, - meta: Option>, + _db_id: u64, + _name: String, + _id: String, + _vector: Vec, + _meta: Option>, ) -> RpcResult { - let server = self.get_or_create_server(db_id).await?; - if db_id == 0 { - return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>)); - } - if !matches!(server.option.backend, crate::options::BackendType::Lance) { - return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>)); - } - if !server.has_write_permission() { - return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "write permission denied", None::<()>)); - } - server.lance_store() - .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))? - .store_vector(&name, &id, vector, meta.unwrap_or_default()).await - .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; - Ok(true) + Err(jsonrpsee::types::ErrorObjectOwned::owned( + -32000, + "Vector endpoint removed. Use lanceStoreText instead.", + None::<()> + )) } async fn lance_search( &self, - db_id: u64, - name: String, - vector: Vec, - k: usize, - filter: Option, - return_fields: Option>, + _db_id: u64, + _name: String, + _vector: Vec, + _k: usize, + _filter: Option, + _return_fields: Option>, ) -> RpcResult { - let server = self.get_or_create_server(db_id).await?; - if db_id == 0 { - return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>)); - } - if !matches!(server.option.backend, crate::options::BackendType::Lance) { - return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>)); - } - if !server.has_read_permission() { - return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "read permission denied", None::<()>)); - } - let results = server.lance_store() - .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))? - .search_vectors(&name, vector, k, filter, return_fields).await - .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; - - let json_results: Vec = results.into_iter().map(|(id, score, meta)| { - serde_json::json!({ - "id": id, - "score": score, - "meta": meta, - }) - }).collect(); - - Ok(serde_json::json!({ "results": json_results })) + Err(jsonrpsee::types::ErrorObjectOwned::owned( + -32000, + "Vector endpoint removed. Use lanceSearchText instead.", + None::<()> + )) } async fn lance_create_index( @@ -958,4 +973,137 @@ impl RpcServer for RpcServerImpl { .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; Ok(ok) } + + // ----- New text-first Lance RPC implementations ----- + + async fn lance_set_embedding_config( + &self, + db_id: u64, + name: String, + provider: String, + model: String, + params: Option>, + ) -> RpcResult { + let server = self.get_or_create_server(db_id).await?; + if db_id == 0 { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>)); + } + if !matches!(server.option.backend, crate::options::BackendType::Lance) { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>)); + } + if !server.has_write_permission() { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "write permission denied", None::<()>)); + } + let prov = match provider.to_lowercase().as_str() { + "test-hash" | "testhash" => EmbeddingProvider::TestHash, + "fastembed" | "lancefastembed" => EmbeddingProvider::LanceFastEmbed, + "openai" | "lanceopenai" => EmbeddingProvider::LanceOpenAI, + other => EmbeddingProvider::LanceOther(other.to_string()), + }; + let cfg = EmbeddingConfig { + provider: prov, + model, + params: params.unwrap_or_default(), + }; + server.set_dataset_embedding_config(&name, &cfg) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; + Ok(true) + } + + async fn lance_get_embedding_config( + &self, + db_id: u64, + name: String, + ) -> RpcResult { + let server = self.get_or_create_server(db_id).await?; + if db_id == 0 { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>)); + } + if !matches!(server.option.backend, crate::options::BackendType::Lance) { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>)); + } + if !server.has_read_permission() { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "read permission denied", None::<()>)); + } + let cfg = server.get_dataset_embedding_config(&name) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; + Ok(serde_json::json!({ + "provider": match cfg.provider { + EmbeddingProvider::TestHash => "test-hash", + EmbeddingProvider::LanceFastEmbed => "lancefastembed", + EmbeddingProvider::LanceOpenAI => "lanceopenai", + EmbeddingProvider::LanceOther(ref s) => s, + }, + "model": cfg.model, + "params": cfg.params + })) + } + + async fn lance_store_text( + &self, + db_id: u64, + name: String, + id: String, + text: String, + meta: Option>, + ) -> RpcResult { + let server = self.get_or_create_server(db_id).await?; + if db_id == 0 { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>)); + } + if !matches!(server.option.backend, crate::options::BackendType::Lance) { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>)); + } + if !server.has_write_permission() { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "write permission denied", None::<()>)); + } + let embedder = server.get_embedder_for(&name) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; + let vector = embedder.embed(&text) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; + server.lance_store() + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))? + .store_vector(&name, &id, vector, meta.unwrap_or_default(), Some(text)).await + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; + Ok(true) + } + + async fn lance_search_text( + &self, + db_id: u64, + name: String, + text: String, + k: usize, + filter: Option, + return_fields: Option>, + ) -> RpcResult { + let server = self.get_or_create_server(db_id).await?; + if db_id == 0 { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>)); + } + if !matches!(server.option.backend, crate::options::BackendType::Lance) { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>)); + } + if !server.has_read_permission() { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "read permission denied", None::<()>)); + } + let embedder = server.get_embedder_for(&name) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; + let qv = embedder.embed(&text) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; + let results = server.lance_store() + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))? + .search_vectors(&name, qv, k, filter, return_fields).await + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; + + let json_results: Vec = results.into_iter().map(|(id, score, meta)| { + serde_json::json!({ + "id": id, + "score": score, + "meta": meta, + }) + }).collect(); + + Ok(serde_json::json!({ "results": json_results })) + } } \ No newline at end of file diff --git a/src/server.rs b/src/server.rs index 1918cc3..31fe5b7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -14,6 +14,10 @@ use crate::protocol::Protocol; use crate::storage_trait::StorageBackend; use crate::admin_meta; +// Embeddings: config and cache +use crate::embedding::{EmbeddingConfig, create_embedder, Embedder}; +use serde_json; + #[derive(Clone)] pub struct Server { pub db_cache: std::sync::Arc>>>, @@ -29,6 +33,9 @@ pub struct Server { // Per-DB Lance stores (vector DB), keyed by db_id pub lance_stores: Arc>>>, + // Per-(db_id, dataset) embedder cache + pub embedders: Arc>>>, + // BLPOP waiter registry: per (db_index, key) FIFO of waiters pub list_waiters: Arc>>>>, pub waiter_seq: Arc, @@ -58,6 +65,7 @@ impl Server { search_indexes: Arc::new(std::sync::RwLock::new(HashMap::new())), lance_stores: Arc::new(std::sync::RwLock::new(HashMap::new())), + embedders: Arc::new(std::sync::RwLock::new(HashMap::new())), list_waiters: Arc::new(Mutex::new(HashMap::new())), waiter_seq: Arc::new(AtomicU64::new(1)), } @@ -153,6 +161,78 @@ impl Server { Ok(store) } + // ----- Embedding configuration and resolution ----- + + // Sidecar embedding config path: /lance//.lance.embedding.json + fn dataset_embedding_config_path(&self, dataset: &str) -> std::path::PathBuf { + let mut base = self.lance_data_path(); + // Ensure parent dir exists + if !base.exists() { + let _ = std::fs::create_dir_all(&base); + } + base.push(format!("{}.lance.embedding.json", dataset)); + base + } + + /// Persist per-dataset embedding config as JSON sidecar. + pub fn set_dataset_embedding_config(&self, dataset: &str, cfg: &EmbeddingConfig) -> Result<(), DBError> { + if self.selected_db == 0 { + return Err(DBError("Lance not available on admin DB 0".to_string())); + } + let p = self.dataset_embedding_config_path(dataset); + let data = serde_json::to_vec_pretty(cfg) + .map_err(|e| DBError(format!("Failed to serialize embedding config: {}", e)))?; + std::fs::write(&p, data) + .map_err(|e| DBError(format!("Failed to write embedding config {}: {}", p.display(), e)))?; + // Invalidate embedder cache entry for this dataset + { + let mut map = self.embedders.write().unwrap(); + map.remove(&(self.selected_db, dataset.to_string())); + } + Ok(()) + } + + /// Load per-dataset embedding config. + pub fn get_dataset_embedding_config(&self, dataset: &str) -> Result { + if self.selected_db == 0 { + return Err(DBError("Lance not available on admin DB 0".to_string())); + } + let p = self.dataset_embedding_config_path(dataset); + if !p.exists() { + return Err(DBError(format!( + "Embedding config not set for dataset '{}'. Use LANCE.EMBEDDING CONFIG SET ... or RPC to configure.", + dataset + ))); + } + let data = std::fs::read(&p) + .map_err(|e| DBError(format!("Failed to read embedding config {}: {}", p.display(), e)))?; + let cfg: EmbeddingConfig = serde_json::from_slice(&data) + .map_err(|e| DBError(format!("Failed to parse embedding config {}: {}", p.display(), e)))?; + Ok(cfg) + } + + /// Resolve or build an embedder for (db_id, dataset). Caches instance. + pub fn get_embedder_for(&self, dataset: &str) -> Result, DBError> { + if self.selected_db == 0 { + return Err(DBError("Lance not available on admin DB 0".to_string())); + } + // Fast path + { + let map = self.embedders.read().unwrap(); + if let Some(e) = map.get(&(self.selected_db, dataset.to_string())) { + return Ok(e.clone()); + } + } + // Load config and instantiate + let cfg = self.get_dataset_embedding_config(dataset)?; + let emb = create_embedder(&cfg)?; + { + let mut map = self.embedders.write().unwrap(); + map.insert((self.selected_db, dataset.to_string()), emb.clone()); + } + Ok(emb) + } + /// Check if current permissions allow read operations pub fn has_read_permission(&self) -> bool { // If an explicit permission is set for this connection, honor it.