From 2743cd9c81257632553ace94e13981450e09c22f Mon Sep 17 00:00:00 2001 From: despiegk Date: Fri, 22 Aug 2025 18:28:51 +0200 Subject: [PATCH] ... --- herodb/specs/lance_implementation.md | 1289 +++++++------------------- 1 file changed, 327 insertions(+), 962 deletions(-) diff --git a/herodb/specs/lance_implementation.md b/herodb/specs/lance_implementation.md index a4390b3..ba1e881 100644 --- a/herodb/specs/lance_implementation.md +++ b/herodb/specs/lance_implementation.md @@ -1,31 +1,8 @@ -Looking at the provided files, I can see HeroDB is a Redis-compatible database with support for various backends (redb, sled) and already has some interesting features like AGE encryption commands. Let me implement LanceDB integration for vector search and multimodal data storage. +Looking at your request to ensure embedding happens with an external embedding provider configured in Redis at `config:core:aiembed:url`, I'll modify the implementation to use an external embedding service instead of local models. -## Implementation Plan +## Updated Implementation with External Embedding Provider -I'll add LanceDB support to HeroDB with the following Redis-compatible commands: - -### 1. Vector Operations -- `LANCE.CREATE` - Create a Lance dataset -- `LANCE.WRITE` - Write data to dataset -- `LANCE.SEARCH` - Vector similarity search -- `LANCE.INDEX` - Create vector index -- `LANCE.DELETE` - Delete vectors - -### 2. Multimodal Data -- `LANCE.STORE` - Store multimodal data (text, images, embeddings) -- `LANCE.EMBED` - Auto-embed text/images -- `LANCE.GET` - Retrieve multimodal data - -### 3. Dataset Management -- `LANCE.INFO` - Get dataset info -- `LANCE.LIST` - List datasets -- `LANCE.DROP` - Drop dataset - -## Detailed Implementation - -### 1. Add Lance Dependencies - -First, update `Cargo.toml`: +### 1. Update Dependencies in `Cargo.toml` ```toml [dependencies] @@ -38,12 +15,14 @@ arrow-array = "52" arrow-schema = "52" parquet = "52" uuid = { version = "1.10", features = ["v4"] } -fastembed = "4" # For embeddings -image = "0.25" # For image handling +reqwest = { version = "0.11", features = ["json"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" base64 = "0.22" +image = "0.25" ``` -### 2. Create Lance Module +### 2. Create Enhanced Lance Module with External Embedding Create `src/lance_store.rs`: @@ -61,14 +40,28 @@ use lance::index::vector::VectorIndexParams; use lance_index::vector::pq::PQBuildParams; use lance_index::vector::ivf::IvfBuildParams; -use fastembed::{TextEmbedding, ImageEmbedding, EmbeddingModel}; +use serde::{Deserialize, Serialize}; use crate::error::DBError; +use crate::cmd::Protocol; + +#[derive(Debug, Serialize, Deserialize)] +struct EmbeddingRequest { + texts: Option>, + images: Option>, // base64 encoded + model: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct EmbeddingResponse { + embeddings: Vec>, + model: String, + usage: Option>, +} pub struct LanceStore { datasets: Arc>>>, data_dir: PathBuf, - text_model: Arc, - image_model: Arc, + http_client: reqwest::Client, } impl LanceStore { @@ -77,21 +70,112 @@ impl LanceStore { std::fs::create_dir_all(&data_dir) .map_err(|e| DBError(format!("Failed to create Lance data directory: {}", e)))?; - // Initialize embedding models - let text_model = TextEmbedding::try_new(Default::default()) - .map_err(|e| DBError(format!("Failed to init text embedding model: {}", e)))?; - - let image_model = ImageEmbedding::try_new(Default::default()) - .map_err(|e| DBError(format!("Failed to init image embedding model: {}", e)))?; + let http_client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build() + .map_err(|e| DBError(format!("Failed to create HTTP client: {}", e)))?; Ok(Self { datasets: Arc::new(RwLock::new(HashMap::new())), data_dir, - text_model: Arc::new(text_model), - image_model: Arc::new(image_model), + http_client, }) } + /// Get embedding service URL from Redis config + async fn get_embedding_url(&self, server: &crate::server::Server) -> Result { + // Get the embedding URL from Redis config + let key = "config:core:aiembed:url"; + + // Use HGET to retrieve the URL from Redis hash + let cmd = crate::cmd::Cmd::HGet { + key: key.to_string(), + field: "url".to_string(), + }; + + // Execute command to get the config + let result = cmd.run(server).await?; + + match result { + Protocol::BulkString(url) => Ok(url), + Protocol::SimpleString(url) => Ok(url), + Protocol::Nil => Err(DBError( + "Embedding service URL not configured. Set it with: HSET config:core:aiembed:url url ".to_string() + )), + _ => Err(DBError("Invalid embedding URL configuration".to_string())), + } + } + + /// Call external embedding service + async fn call_embedding_service( + &self, + server: &crate::server::Server, + texts: Option>, + images: Option>, + ) -> Result>, DBError> { + let url = self.get_embedding_url(server).await?; + + let request = EmbeddingRequest { + texts, + images, + model: None, // Let the service use its default + }; + + let response = self.http_client + .post(&url) + .json(&request) + .send() + .await + .map_err(|e| DBError(format!("Failed to call embedding service: {}", e)))?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response.text().await.unwrap_or_default(); + return Err(DBError(format!( + "Embedding service returned error {}: {}", + status, error_text + ))); + } + + let embedding_response: EmbeddingResponse = response + .json() + .await + .map_err(|e| DBError(format!("Failed to parse embedding response: {}", e)))?; + + Ok(embedding_response.embeddings) + } + + pub async fn embed_text( + &self, + server: &crate::server::Server, + texts: Vec + ) -> Result>, DBError> { + if texts.is_empty() { + return Ok(Vec::new()); + } + + self.call_embedding_service(server, Some(texts), None).await + } + + pub async fn embed_image( + &self, + server: &crate::server::Server, + image_bytes: Vec + ) -> Result, DBError> { + // Convert image bytes to base64 + let base64_image = base64::encode(&image_bytes); + + let embeddings = self.call_embedding_service( + server, + None, + Some(vec![base64_image]) + ).await?; + + embeddings.into_iter() + .next() + .ok_or_else(|| DBError("No embedding returned for image".to_string())) + } + pub async fn create_dataset( &self, name: &str, @@ -135,6 +219,10 @@ impl LanceStore { // Build RecordBatch let num_vectors = vectors.len(); + if num_vectors == 0 { + return Ok(0); + } + let dim = vectors.first() .ok_or_else(|| DBError("Empty vectors".to_string()))? .len(); @@ -160,6 +248,12 @@ impl LanceStore { // Add metadata columns if provided if let Some(metadata) = metadata { for (key, values) in metadata { + if values.len() != num_vectors { + return Err(DBError(format!( + "Metadata field '{}' has {} values but expected {}", + key, values.len(), num_vectors + ))); + } let array = StringArray::from(values); arrays.push(Arc::new(array)); fields.push(Field::new(&key, DataType::Utf8, true)); @@ -183,6 +277,10 @@ impl LanceStore { ).await .map_err(|e| DBError(format!("Failed to write to dataset: {}", e)))?; + // Refresh cached dataset + let mut datasets = self.datasets.write().await; + datasets.remove(dataset_name); + Ok(num_vectors) } @@ -259,6 +357,70 @@ impl LanceStore { Ok(output) } + pub async fn store_multimodal( + &self, + server: &crate::server::Server, + dataset_name: &str, + text: Option, + image_bytes: Option>, + metadata: HashMap, + ) -> Result { + // Generate ID + let id = uuid::Uuid::new_v4().to_string(); + + // Generate embeddings using external service + let embedding = if let Some(text) = text.as_ref() { + self.embed_text(server, vec![text.clone()]).await? + .into_iter() + .next() + .ok_or_else(|| DBError("No embedding returned".to_string()))? + } else if let Some(img) = image_bytes.as_ref() { + self.embed_image(server, img.clone()).await? + } else { + return Err(DBError("No text or image provided".to_string())); + }; + + // Prepare metadata + let mut full_metadata = metadata; + full_metadata.insert("id".to_string(), id.clone()); + if let Some(text) = text { + full_metadata.insert("text".to_string(), text); + } + if let Some(img) = image_bytes { + full_metadata.insert("image_base64".to_string(), base64::encode(img)); + } + + // Convert metadata to column vectors + let mut metadata_cols = HashMap::new(); + for (key, value) in full_metadata { + metadata_cols.insert(key, vec![value]); + } + + // Write to dataset + self.write_vectors(dataset_name, vec![embedding], Some(metadata_cols)).await?; + + Ok(id) + } + + pub async fn search_with_text( + &self, + server: &crate::server::Server, + dataset_name: &str, + query_text: String, + k: usize, + nprobes: Option, + refine_factor: Option, + ) -> Result)>, DBError> { + // Embed the query text using external service + let embeddings = self.embed_text(server, vec![query_text]).await?; + let query_vector = embeddings.into_iter() + .next() + .ok_or_else(|| DBError("No embedding returned for query".to_string()))?; + + // Search with the embedding + self.search_vectors(dataset_name, query_vector, k, nprobes, refine_factor).await + } + pub async fn create_index( &self, dataset_name: &str, @@ -296,78 +458,6 @@ impl LanceStore { Ok(()) } - pub async fn embed_text(&self, texts: Vec) -> Result>, DBError> { - let embeddings = self.text_model - .embed(texts, None) - .map_err(|e| DBError(format!("Failed to embed text: {}", e)))?; - - Ok(embeddings) - } - - pub async fn embed_image(&self, image_bytes: Vec) -> Result, DBError> { - // Decode image - let img = image::load_from_memory(&image_bytes) - .map_err(|e| DBError(format!("Failed to decode image: {}", e)))?; - - // Convert to RGB8 - let rgb_img = img.to_rgb8(); - let raw_pixels = rgb_img.as_raw(); - - // Embed - let embedding = self.image_model - .embed(vec![raw_pixels.clone()], None) - .map_err(|e| DBError(format!("Failed to embed image: {}", e)))? - .into_iter() - .next() - .ok_or_else(|| DBError("No embedding returned".to_string()))?; - - Ok(embedding) - } - - pub async fn store_multimodal( - &self, - dataset_name: &str, - text: Option, - image_bytes: Option>, - metadata: HashMap, - ) -> Result { - // Generate ID - let id = uuid::Uuid::new_v4().to_string(); - - // Generate embeddings - let embedding = if let Some(text) = text.as_ref() { - self.embed_text(vec![text.clone()]).await? - .into_iter() - .next() - .unwrap() - } else if let Some(img) = image_bytes.as_ref() { - self.embed_image(img.clone()).await? - } else { - return Err(DBError("No text or image provided".to_string())); - }; - - // Prepare metadata - let mut full_metadata = metadata; - full_metadata.insert("id".to_string(), id.clone()); - if let Some(text) = text { - full_metadata.insert("text".to_string(), text); - } - if let Some(img) = image_bytes { - full_metadata.insert("image_base64".to_string(), base64::encode(img)); - } - - // Convert metadata to column vectors - let mut metadata_cols = HashMap::new(); - for (key, value) in full_metadata { - metadata_cols.insert(key, vec![value]); - } - - // Write to dataset - self.write_vectors(dataset_name, vec![embedding], Some(metadata_cols)).await?; - - Ok(id) - } - async fn get_or_open_dataset(&self, name: &str) -> Result, DBError> { let mut datasets = self.datasets.write().await; @@ -376,6 +466,10 @@ impl LanceStore { } let dataset_path = self.data_dir.join(format!("{}.lance", name)); + if !dataset_path.exists() { + return Err(DBError(format!("Dataset '{}' does not exist", name))); + } + let dataset = Dataset::open(dataset_path.to_str().unwrap()) .await .map_err(|e| DBError(format!("Failed to open dataset: {}", e)))?; @@ -447,415 +541,12 @@ impl LanceStore { } ``` -### 3. Add Lance Commands to cmd.rs +### 3. Update Command Implementations -Update `src/cmd.rs`: +Update the command implementations to pass the server reference for embedding service access: ```rust -// Add to the Cmd enum -pub enum Cmd { - // ... existing commands ... - - // Lance vector database commands - LanceCreate { - dataset: String, - vector_dim: Option, - schema: Option>, // field_name, field_type - }, - LanceWrite { - dataset: String, - vectors: Vec>, - metadata: Option>>, - }, - LanceSearch { - dataset: String, - vector: Vec, - k: usize, - nprobes: Option, - refine_factor: Option, - }, - LanceIndex { - dataset: String, - index_type: String, - num_partitions: Option, - num_sub_vectors: Option, - }, - LanceStore { - dataset: String, - text: Option, - image_base64: Option, - metadata: HashMap, - }, - LanceEmbed { - text: Option>, - image_base64: Option, - }, - LanceGet { - dataset: String, - id: String, - }, - LanceInfo { - dataset: String, - }, - LanceList, - LanceDrop { - dataset: String, - }, - LanceDelete { - dataset: String, - filter: String, - }, -} - -// Add parsing logic in Cmd::from -impl Cmd { - pub fn from(s: &str) -> Result<(Self, Protocol, &str), DBError> { - // ... existing parsing ... - - "lance.create" => { - if cmd.len() < 2 { - return Err(DBError("LANCE.CREATE requires dataset name".to_string())); - } - - let dataset = cmd[1].clone(); - let mut vector_dim = None; - let mut schema = Vec::new(); - - let mut i = 2; - while i < cmd.len() { - match cmd[i].to_lowercase().as_str() { - "dim" => { - if i + 1 >= cmd.len() { - return Err(DBError("DIM requires value".to_string())); - } - vector_dim = Some(cmd[i + 1].parse().map_err(|_| - DBError("Invalid dimension".to_string()))?); - i += 2; - } - "schema" => { - // Parse schema: field:type pairs - i += 1; - while i < cmd.len() && !cmd[i].contains(':') { - i += 1; - } - while i < cmd.len() && cmd[i].contains(':') { - let parts: Vec<&str> = cmd[i].split(':').collect(); - if parts.len() == 2 { - schema.push((parts[0].to_string(), parts[1].to_string())); - } - i += 1; - } - } - _ => i += 1, - } - } - - Cmd::LanceCreate { - dataset, - vector_dim, - schema: if schema.is_empty() { None } else { Some(schema) }, - } - } - - "lance.search" => { - if cmd.len() < 3 { - return Err(DBError("LANCE.SEARCH requires dataset and vector".to_string())); - } - - let dataset = cmd[1].clone(); - let vector_str = cmd[2].clone(); - - // Parse vector from JSON-like format [1.0,2.0,3.0] - let vector: Vec = vector_str - .trim_start_matches('[') - .trim_end_matches(']') - .split(',') - .map(|s| s.trim().parse::()) - .collect::, _>>() - .map_err(|_| DBError("Invalid vector format".to_string()))?; - - let mut k = 10; - let mut nprobes = None; - let mut refine_factor = None; - - let mut i = 3; - while i < cmd.len() { - match cmd[i].to_lowercase().as_str() { - "k" => { - if i + 1 >= cmd.len() { - return Err(DBError("K requires value".to_string())); - } - k = cmd[i + 1].parse().map_err(|_| - DBError("Invalid k value".to_string()))?; - i += 2; - } - "nprobes" => { - if i + 1 >= cmd.len() { - return Err(DBError("NPROBES requires value".to_string())); - } - nprobes = Some(cmd[i + 1].parse().map_err(|_| - DBError("Invalid nprobes value".to_string()))?); - i += 2; - } - "refine" => { - if i + 1 >= cmd.len() { - return Err(DBError("REFINE requires value".to_string())); - } - refine_factor = Some(cmd[i + 1].parse().map_err(|_| - DBError("Invalid refine_factor value".to_string()))?); - i += 2; - } - _ => i += 1, - } - } - - Cmd::LanceSearch { - dataset, - vector, - k, - nprobes, - refine_factor, - } - } - - "lance.store" => { - if cmd.len() < 2 { - return Err(DBError("LANCE.STORE requires dataset name".to_string())); - } - - let dataset = cmd[1].clone(); - let mut text = None; - let mut image_base64 = None; - let mut metadata = HashMap::new(); - - let mut i = 2; - while i < cmd.len() { - match cmd[i].to_lowercase().as_str() { - "text" => { - if i + 1 >= cmd.len() { - return Err(DBError("TEXT requires value".to_string())); - } - text = Some(cmd[i + 1].clone()); - i += 2; - } - "image" => { - if i + 1 >= cmd.len() { - return Err(DBError("IMAGE requires base64 value".to_string())); - } - image_base64 = Some(cmd[i + 1].clone()); - i += 2; - } - _ => { - // Treat as metadata key:value - if cmd[i].contains(':') { - let parts: Vec<&str> = cmd[i].split(':').collect(); - if parts.len() == 2 { - metadata.insert(parts[0].to_string(), parts[1].to_string()); - } - } - i += 1; - } - } - } - - Cmd::LanceStore { - dataset, - text, - image_base64, - metadata, - } - } - - "lance.index" => { - if cmd.len() < 3 { - return Err(DBError("LANCE.INDEX requires dataset and type".to_string())); - } - - let dataset = cmd[1].clone(); - let index_type = cmd[2].clone(); - let mut num_partitions = None; - let mut num_sub_vectors = None; - - let mut i = 3; - while i < cmd.len() { - match cmd[i].to_lowercase().as_str() { - "partitions" => { - if i + 1 >= cmd.len() { - return Err(DBError("PARTITIONS requires value".to_string())); - } - num_partitions = Some(cmd[i + 1].parse().map_err(|_| - DBError("Invalid partitions value".to_string()))?); - i += 2; - } - "subvectors" => { - if i + 1 >= cmd.len() { - return Err(DBError("SUBVECTORS requires value".to_string())); - } - num_sub_vectors = Some(cmd[i + 1].parse().map_err(|_| - DBError("Invalid subvectors value".to_string()))?); - i += 2; - } - _ => i += 1, - } - } - - Cmd::LanceIndex { - dataset, - index_type, - num_partitions, - num_sub_vectors, - } - } - - "lance.list" => { - if cmd.len() != 1 { - return Err(DBError("LANCE.LIST takes no arguments".to_string())); - } - Cmd::LanceList - } - - "lance.info" => { - if cmd.len() != 2 { - return Err(DBError("LANCE.INFO requires dataset name".to_string())); - } - Cmd::LanceInfo { - dataset: cmd[1].clone(), - } - } - - "lance.drop" => { - if cmd.len() != 2 { - return Err(DBError("LANCE.DROP requires dataset name".to_string())); - } - Cmd::LanceDrop { - dataset: cmd[1].clone(), - } - } - - // ... other lance commands ... - } -} - -// Add execution logic in Cmd::run -impl Cmd { - pub async fn run(self, server: &mut Server) -> Result { - // ... existing match arms ... - - Cmd::LanceCreate { dataset, vector_dim, schema } => { - lance_create_cmd(server, &dataset, vector_dim, schema).await - } - Cmd::LanceSearch { dataset, vector, k, nprobes, refine_factor } => { - lance_search_cmd(server, &dataset, &vector, k, nprobes, refine_factor).await - } - Cmd::LanceStore { dataset, text, image_base64, metadata } => { - lance_store_cmd(server, &dataset, text, image_base64, metadata).await - } - Cmd::LanceIndex { dataset, index_type, num_partitions, num_sub_vectors } => { - lance_index_cmd(server, &dataset, &index_type, num_partitions, num_sub_vectors).await - } - Cmd::LanceList => { - lance_list_cmd(server).await - } - Cmd::LanceInfo { dataset } => { - lance_info_cmd(server, &dataset).await - } - Cmd::LanceDrop { dataset } => { - lance_drop_cmd(server, &dataset).await - } - // ... other lance commands ... - } -} - -// Command implementations -async fn lance_create_cmd( - server: &Server, - dataset: &str, - vector_dim: Option, - schema: Option>, -) -> Result { - let lance_store = server.lance_store()?; - - // Build Arrow schema - let mut fields = vec![]; - - // Add vector field if dimension specified - if let Some(dim) = vector_dim { - fields.push(Field::new( - "vector", - DataType::FixedSizeList( - Arc::new(Field::new("item", DataType::Float32, true)), - dim as i32 - ), - false - )); - } - - // Add custom schema fields - if let Some(schema) = schema { - for (name, dtype) in schema { - let arrow_type = match dtype.to_lowercase().as_str() { - "string" | "text" => DataType::Utf8, - "int" | "integer" => DataType::Int64, - "float" | "double" => DataType::Float64, - "binary" | "blob" => DataType::Binary, - _ => DataType::Utf8, - }; - fields.push(Field::new(&name, arrow_type, true)); - } - } - - if fields.is_empty() { - // Default schema with vector and metadata - fields.push(Field::new( - "vector", - DataType::FixedSizeList( - Arc::new(Field::new("item", DataType::Float32, true)), - 128 - ), - false - )); - fields.push(Field::new("id", DataType::Utf8, true)); - fields.push(Field::new("text", DataType::Utf8, true)); - } - - let schema = Schema::new(fields); - lance_store.create_dataset(dataset, schema).await?; - - Ok(Protocol::SimpleString("OK".to_string())) -} - -async fn lance_search_cmd( - server: &Server, - dataset: &str, - vector: &[f32], - k: usize, - nprobes: Option, - refine_factor: Option, -) -> Result { - let lance_store = server.lance_store()?; - - let results = lance_store.search_vectors( - dataset, - vector.to_vec(), - k, - nprobes, - refine_factor, - ).await?; - - // Format results as array of [distance, metadata_json] - let mut output = Vec::new(); - for (distance, metadata) in results { - let metadata_json = serde_json::to_string(&metadata) - .unwrap_or_else(|_| "{}".to_string()); - - output.push(Protocol::Array(vec![ - Protocol::BulkString(distance.to_string()), - Protocol::BulkString(metadata_json), - ])); - } - - Ok(Protocol::Array(output)) -} +// In cmd.rs, update the lance command implementations async fn lance_store_cmd( server: &Server, @@ -874,7 +565,9 @@ async fn lance_store_cmd( None }; + // Pass server reference for embedding service access let id = lance_store.store_multimodal( + server, // Pass server to access Redis config dataset, text, image_bytes, @@ -884,378 +577,14 @@ async fn lance_store_cmd( Ok(Protocol::BulkString(id)) } -async fn lance_index_cmd( - server: &Server, - dataset: &str, - index_type: &str, - num_partitions: Option, - num_sub_vectors: Option, -) -> Result { - let lance_store = server.lance_store()?; - - lance_store.create_index( - dataset, - index_type, - num_partitions, - num_sub_vectors, - ).await?; - - Ok(Protocol::SimpleString("OK".to_string())) -} - -async fn lance_list_cmd(server: &Server) -> Result { - let lance_store = server.lance_store()?; - let datasets = lance_store.list_datasets().await?; - - Ok(Protocol::Array( - datasets.into_iter() - .map(Protocol::BulkString) - .collect() - )) -} - -async fn lance_info_cmd(server: &Server, dataset: &str) -> Result { - let lance_store = server.lance_store()?; - let info = lance_store.get_dataset_info(dataset).await?; - - // Format as array of key-value pairs - let mut output = Vec::new(); - for (key, value) in info { - output.push(Protocol::BulkString(key)); - output.push(Protocol::BulkString(value)); - } - - Ok(Protocol::Array(output)) -} - -async fn lance_drop_cmd(server: &Server, dataset: &str) -> Result { - let lance_store = server.lance_store()?; - lance_store.drop_dataset(dataset).await?; - - Ok(Protocol::SimpleString("OK".to_string())) -} -``` - -### 4. Update Server Structure - -Update `src/server.rs`: - -```rust -use crate::lance_store::LanceStore; - -pub struct Server { - // ... existing fields ... - pub lance_store: Arc>>, -} - -impl Server { - pub async fn new(option: options::DBOption) -> Self { - // Initialize Lance store - let lance_dir = std::path::PathBuf::from(&option.dir).join("lance"); - let lance_store = LanceStore::new(lance_dir).await.ok(); - - Server { - // ... existing fields ... - lance_store: Arc::new(RwLock::new(lance_store)), - } - } - - pub fn lance_store(&self) -> Result, DBError> { - let guard = self.lance_store.read().unwrap(); - guard.as_ref() - .map(|store| Arc::new(store.clone())) - .ok_or_else(|| DBError("Lance store not initialized".to_string())) - } -} -``` - -### 5. Update lib.rs - -```rust -pub mod lance_store; -// ... other modules ... -``` - -## Usage Examples - -Now users can use Lance vector database features through Redis protocol: - -```bash -# Create a dataset for storing embeddings -redis-cli> LANCE.CREATE products DIM 384 SCHEMA name:string price:float category:string - -# Store multimodal data with automatic embedding -redis-cli> LANCE.STORE products TEXT "High-quality wireless headphones with noise cancellation" title:HeadphonesX price:299.99 category:Electronics -"uuid-123-456" - -# Store image with metadata -redis-cli> LANCE.STORE products IMAGE "" title:ProductPhoto category:Electronics - -# Search for similar products -redis-cli> LANCE.SEARCH products [0.1,0.2,0.3,...] K 5 NPROBES 10 REFINE 2 -1) "0.92" -2) "{\"id\":\"uuid-123\",\"title\":\"HeadphonesX\",\"price\":\"299.99\"}" -3) "0.88" -4) "{\"id\":\"uuid-456\",\"title\":\"SpeakerY\",\"price\":\"199.99\"}" - -# Create an index for faster search -redis-cli> LANCE.INDEX products IVF_PQ PARTITIONS 256 SUBVECTORS 16 - -# Get dataset info -redis-cli> LANCE.INFO products -1) "name" -2) "products" -3) "version" -4) "1" -5) "num_rows" -6) "1523" -7) "schema" -8) "vector:FixedSizeList, id:Utf8, text:Utf8, title:Utf8, price:Utf8" - -# List all datasets -redis-cli> LANCE.LIST -1) "products" -2) "users" -3) "documents" - -# Drop a dataset -redis-cli> LANCE.DROP old_products -OK -``` - -## Advanced Features - -### 1. Batch Operations - -```rust -// Add to cmd.rs -pub enum Cmd { - LanceWriteBatch { - dataset: String, - data: Vec>, // JSON-like data - embed_field: Option, // Field to embed - }, -} - -// Implementation -async fn lance_write_batch_cmd( - server: &Server, - dataset: &str, - data: Vec>, - embed_field: Option, -) -> Result { - let lance_store = server.lance_store()?; - - let mut vectors = Vec::new(); - let mut metadata_cols: HashMap> = HashMap::new(); - - for record in data { - // Generate embedding if needed - if let Some(field) = &embed_field { - if let Some(text) = record.get(field) { - let embedding = lance_store.embed_text(vec![text.clone()]).await? - .into_iter() - .next() - .unwrap(); - vectors.push(embedding); - } - } - - // Collect metadata - for (key, value) in record { - metadata_cols.entry(key).or_insert_with(Vec::new).push(value); - } - } - - let count = lance_store.write_vectors(dataset, vectors, Some(metadata_cols)).await?; - - Ok(Protocol::SimpleString(count.to_string())) -} -``` - -### 2. Hybrid Search (Vector + Metadata Filters) - -```rust -pub enum Cmd { - LanceHybridSearch { - dataset: String, - vector: Vec, - filter: String, // SQL-like filter - k: usize, - }, -} - -// This would require extending the lance_store to support filtered search -``` - -### 3. Dataset Versioning - -```rust -pub enum Cmd { - LanceVersion { - dataset: String, - version: Option, // Get specific version or latest - }, - LanceRestore { - dataset: String, - version: u64, - }, -} -``` - -## Performance Optimizations - -1. **Connection Pooling**: Lance datasets are cached in memory -2. **Batch Processing**: Support for batch writes and reads -3. **Index Caching**: Indexes are loaded once and reused -4. **Async Operations**: All Lance operations are async -5. **Embedding Caching**: Cache frequently embedded texts - -## Testing - -Create integration tests in `tests/lance_integration.rs`: - -```rust -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_lance_create_and_search() { - let server = create_test_server().await; - - // Create dataset - let cmd = Cmd::LanceCreate { - dataset: "test".to_string(), - vector_dim: Some(128), - schema: None, - }; - let result = cmd.run(&mut server).await.unwrap(); - assert!(matches!(result, Protocol::SimpleString(_))); - - // Write vectors - let vectors = vec![vec![0.1; 128], vec![0.2; 128]]; - let cmd = Cmd::LanceWrite { - dataset: "test".to_string(), - vectors, - metadata: None, - }; - let result = cmd.run(&mut server).await.unwrap(); - - // Search - let query = vec![0.15; 128]; - let cmd = Cmd::LanceSearch { - dataset: "test".to_string(), - vector: query, - k: 2, - nprobes: None, - refine_factor: None, - }; - let result = cmd.run(&mut server).await.unwrap(); - assert!(matches!(result, Protocol::Array(_))); - } -} -``` - -This implementation provides: - -1. **Full Lance integration** with vector search, multimodal storage, and automatic embeddings -2. **Redis-compatible protocol** for easy integration with existing tools -3. **Production-ready features** like indexing, batch operations, and dataset management -4. **Scalable architecture** with async operations and caching -5. **Extensible design** for adding more Lance features as needed - -The implementation follows HeroDB's existing patterns while adding powerful vector database capabilities through LanceDB. - - -Looking at the implementation I provided, it **DOES include automatic embedding capabilities**! Let me highlight the embedding features: - -## Embedded Embedding Models - -The implementation includes: - -1. **Text Embedding** - Using `fastembed` library with default text models -2. **Image Embedding** - Using `fastembed` for image embeddings -3. **Automatic embedding on store** - When using `LANCE.STORE`, it automatically generates embeddings - -Here's what's already implemented: - -### Automatic Embedding Features: - -```rust -// In lance_store.rs -pub struct LanceStore { - // ... - text_model: Arc, // Text embedding model - image_model: Arc, // Image embedding model -} - -// Automatic text embedding -pub async fn embed_text(&self, texts: Vec) -> Result>, DBError> { - let embeddings = self.text_model - .embed(texts, None) - .map_err(|e| DBError(format!("Failed to embed text: {}", e)))?; - Ok(embeddings) -} - -// Automatic image embedding -pub async fn embed_image(&self, image_bytes: Vec) -> Result, DBError> { - // Decodes and embeds images automatically -} - -// The LANCE.STORE command automatically embeds text or images -pub async fn store_multimodal(&self, ...) -> Result { - // Automatically generates embeddings for text or images - let embedding = if let Some(text) = text.as_ref() { - self.embed_text(vec![text.clone()]).await? // Auto-embed text - } else if let Some(img) = image_bytes.as_ref() { - self.embed_image(img.clone()).await? // Auto-embed image - } - // ... -} -``` - -## How It Works: - -### 1. Automatic Embedding on Store: -```bash -# This automatically embeds the text and stores it with the embedding -redis-cli> LANCE.STORE products TEXT "Wireless noise-canceling headphones" price:299 category:audio -"uuid-generated-id" - -# This automatically embeds the image -redis-cli> LANCE.STORE products IMAGE "" title:ProductPhoto -"uuid-generated-id" -``` - -### 2. Manual Vector Provision (also supported): -```bash -# You can also provide your own vectors if needed -redis-cli> LANCE.WRITE products VECTORS [[0.1,0.2,0.3,...], [0.4,0.5,0.6,...]] -``` - -### 3. Standalone Embedding Commands: -We should add explicit embedding commands for more flexibility: - -```rust -// Add to cmd.rs -pub enum Cmd { - // Explicit embedding commands - LanceEmbedText { - texts: Vec, - }, - LanceEmbedImage { - image_base64: String, - }, -} - -// Implementation async fn lance_embed_text_cmd( server: &Server, texts: &[String], ) -> Result { let lance_store = server.lance_store()?; - let embeddings = lance_store.embed_text(texts.to_vec()).await?; + + // Pass server reference for embedding service access + let embeddings = lance_store.embed_text(server, texts.to_vec()).await?; // Return as array of vectors let mut output = Vec::new(); @@ -1271,100 +600,136 @@ async fn lance_embed_text_cmd( Ok(Protocol::Array(output)) } + +async fn lance_search_text_cmd( + server: &Server, + dataset: &str, + query_text: &str, + k: usize, + nprobes: Option, + refine_factor: Option, +) -> Result { + let lance_store = server.lance_store()?; + + // Search using text query (will be embedded automatically) + let results = lance_store.search_with_text( + server, + dataset, + query_text.to_string(), + k, + nprobes, + refine_factor, + ).await?; + + // Format results + let mut output = Vec::new(); + for (distance, metadata) in results { + let metadata_json = serde_json::to_string(&metadata) + .unwrap_or_else(|_| "{}".to_string()); + + output.push(Protocol::Array(vec![ + Protocol::BulkString(distance.to_string()), + Protocol::BulkString(metadata_json), + ])); + } + + Ok(Protocol::Array(output)) +} + +// Add new command for text-based search +pub enum Cmd { + // ... existing commands ... + LanceSearchText { + dataset: String, + query_text: String, + k: usize, + nprobes: Option, + refine_factor: Option, + }, +} ``` -This would allow: +## Usage Examples + +### 1. Configure the Embedding Service + +First, users need to configure the embedding service URL: + ```bash +# Configure the embedding service endpoint +redis-cli> HSET config:core:aiembed:url url "http://localhost:8000/embeddings" +OK + +# Or use a cloud service +redis-cli> HSET config:core:aiembed:url url "https://api.openai.com/v1/embeddings" +OK +``` + +### 2. Use Lance Commands with Automatic External Embedding + +```bash +# Create a dataset +redis-cli> LANCE.CREATE products DIM 1536 SCHEMA name:string price:float category:string +OK + +# Store text with automatic embedding (calls external service) +redis-cli> LANCE.STORE products TEXT "Wireless noise-canceling headphones with 30-hour battery" name:AirPods price:299.99 category:Electronics +"uuid-123-456" + +# Search using text query (automatically embeds the query) +redis-cli> LANCE.SEARCH.TEXT products "best headphones for travel" K 5 +1) "0.92" +2) "{\"id\":\"uuid-123\",\"name\":\"AirPods\",\"price\":\"299.99\"}" + # Get embeddings directly -redis-cli> LANCE.EMBED.TEXT "This is a test sentence" "Another sentence" +redis-cli> LANCE.EMBED.TEXT "This text will be embedded" 1) "[0.123, 0.456, 0.789, ...]" -2) "[0.234, 0.567, 0.890, ...]" - -# Get image embedding -redis-cli> LANCE.EMBED.IMAGE "" -"[0.111, 0.222, 0.333, ...]" ``` -## Embedding Model Options +## External Embedding Service API Specification -To make it more flexible, we could enhance the implementation to: +The external embedding service should accept POST requests with this format: -### 1. Support Multiple Embedding Models: -```rust -// Enhanced initialization with model selection -impl LanceStore { - pub async fn new(data_dir: PathBuf, config: EmbeddingConfig) -> Result { - // Choose embedding model based on config - let text_model = match config.text_model.as_str() { - "all-MiniLM-L6-v2" => TextEmbedding::try_new( - InitOptions::new(EmbeddingModel::AllMiniLML6V2) - )?, - "all-MiniLM-L12-v2" => TextEmbedding::try_new( - InitOptions::new(EmbeddingModel::AllMiniLML12V2) - )?, - "bge-small-en" => TextEmbedding::try_new( - InitOptions::new(EmbeddingModel::BGESmallEN) - )?, - _ => TextEmbedding::try_new(Default::default())?, - }; - // ... - } +```json +// Request +{ + "texts": ["text1", "text2"], // Optional + "images": ["base64_img1"], // Optional + "model": "text-embedding-ada-002" // Optional +} + +// Response +{ + "embeddings": [[0.1, 0.2, ...], [0.3, 0.4, ...]], + "model": "text-embedding-ada-002", + "usage": { + "prompt_tokens": 100, + "total_tokens": 100 + } } ``` -### 2. Support for External Embedding Services: -```rust -// Add support for OpenAI, Cohere, etc. -pub enum EmbeddingProvider { - Local(TextEmbedding), - OpenAI { api_key: String, model: String }, - Cohere { api_key: String }, - Custom { endpoint: String }, -} +## Error Handling -impl EmbeddingProvider { - pub async fn embed(&self, texts: Vec) -> Result>, DBError> { - match self { - Self::Local(model) => model.embed(texts, None), - Self::OpenAI { api_key, model } => { - // Call OpenAI API - }, - Self::Cohere { api_key } => { - // Call Cohere API - }, - Self::Custom { endpoint } => { - // Call custom embedding service - } - } - } -} -``` +The implementation includes comprehensive error handling: -### 3. Configuration via Redis Commands: -```bash -# Configure embedding model -redis-cli> LANCE.CONFIG.EMBEDDING MODEL bge-small-en -OK +1. **Missing Configuration**: Clear error message if embedding URL not configured +2. **Service Failures**: Graceful handling of embedding service errors +3. **Timeout Protection**: 30-second timeout for embedding requests +4. **Retry Logic**: Could be added for resilience -# Configure external embedding service -redis-cli> LANCE.CONFIG.EMBEDDING PROVIDER openai API_KEY sk-... MODEL text-embedding-3-small -OK -``` +## Benefits of This Approach -## Summary +1. **Flexibility**: Supports any embedding service with compatible API +2. **Cost Control**: Use your preferred embedding provider +3. **Scalability**: Embedding service can be scaled independently +4. **Consistency**: All embeddings use the same configured service +5. **Security**: API keys and endpoints stored securely in Redis -**Yes, the implementation DOES include embedding!** It has: +This implementation ensures that all embedding operations go through the external service configured in Redis, providing a clean separation between the vector database functionality and the embedding generation. -1. ✅ **Automatic text embedding** using fastembed -2. ✅ **Automatic image embedding** for multimodal data -3. ✅ **Built-in embedding models** (no external API needed) -4. ✅ **Zero-config embedding** (works out of the box) -5. ✅ **Option to provide your own vectors** if preferred -The embedding happens: -- Automatically when using `LANCE.STORE` with text or images -- On-demand with `LANCE.EMBED.TEXT` or `LANCE.EMBED.IMAGE` commands -- Using efficient local models (no API costs) -- With support for batch embedding for performance +TODO EXTRA: + +- secret for the embedding service API key -You can use it without any external embedding service - everything runs locally within HeroDB! \ No newline at end of file