WIP 1: implement lancedb vector
Cargo.lock: 4402 lines changed (generated; file diff suppressed because it is too large)
Cargo.toml
@@ -27,6 +27,13 @@ x25519-dalek = "2"
 base64 = "0.22"
 jsonrpsee = { version = "0.26.0", features = ["http-client", "ws-client", "server", "macros"] }
 tantivy = "0.25.0"
+arrow-schema = "55.2.0"
+arrow-array = "55.2.0"
+lance = "0.37.0"
+lance-index = "0.37.0"
+arrow = "55.2.0"
+lancedb = "0.22.1"
+uuid = "1.18.1"

 [dev-dependencies]
 redis = { version = "0.24", features = ["aio", "tokio-comp"] }
src/admin_meta.rs
@@ -48,8 +48,8 @@ fn init_admin_storage(
     let storage: Arc<dyn StorageBackend> = match backend {
         options::BackendType::Redb => Arc::new(Storage::new(&db_file, true, Some(admin_secret))?),
         options::BackendType::Sled => Arc::new(SledStorage::new(&db_file, true, Some(admin_secret))?),
-        options::BackendType::Tantivy => {
-            return Err(DBError("Admin DB 0 cannot use Tantivy backend".to_string()))
+        options::BackendType::Tantivy | options::BackendType::Lance => {
+            return Err(DBError("Admin DB 0 cannot use search-only backends (Tantivy/Lance)".to_string()))
         }
     };
     Ok(storage)
@@ -206,6 +206,9 @@ pub fn open_data_storage(
         options::BackendType::Tantivy => {
             return Err(DBError("Tantivy backend has no KV storage; use FT.* commands only".to_string()))
         }
+        options::BackendType::Lance => {
+            return Err(DBError("Lance backend has no KV storage; use LANCE.* commands only".to_string()))
+        }
     };

     // Publish to registry
@@ -299,6 +302,7 @@ pub fn set_database_backend(
         options::BackendType::Redb => "Redb",
         options::BackendType::Sled => "Sled",
         options::BackendType::Tantivy => "Tantivy",
+        options::BackendType::Lance => "Lance",
     };
     let _ = admin.hset(&mk, vec![("backend".to_string(), val.to_string())])?;
     Ok(())
@@ -316,6 +320,7 @@ pub fn get_database_backend(
         Some(s) if s == "Redb" => Ok(Some(options::BackendType::Redb)),
         Some(s) if s == "Sled" => Ok(Some(options::BackendType::Sled)),
         Some(s) if s == "Tantivy" => Ok(Some(options::BackendType::Tantivy)),
+        Some(s) if s == "Lance" => Ok(Some(options::BackendType::Lance)),
         _ => Ok(None),
     }
 }
src/cmd.rs: 326 lines changed
@@ -125,6 +125,41 @@ pub enum Cmd {
         query: String,
         group_by: Vec<String>,
         reducers: Vec<String>,
     },
+
+    // LanceDB vector search commands
+    LanceCreate {
+        name: String,
+        dim: usize,
+    },
+    LanceStore {
+        name: String,
+        id: String,
+        vector: Vec<f32>,
+        meta: Vec<(String, String)>,
+    },
+    LanceSearch {
+        name: String,
+        vector: Vec<f32>,
+        k: usize,
+        filter: Option<String>,
+        return_fields: Option<Vec<String>>,
+    },
+    LanceCreateIndex {
+        name: String,
+        index_type: String,
+        params: Vec<(String, String)>,
+    },
+    LanceList,
+    LanceInfo {
+        name: String,
+    },
+    LanceDel {
+        name: String,
+        id: String,
+    },
+    LanceDrop {
+        name: String,
+    }
 }

@@ -815,6 +850,142 @@ impl Cmd {
                 let reducers = Vec::new();
                 Cmd::FtAggregate { index_name, query, group_by, reducers }
            }
+
+            // ----- LANCE.* commands -----
+            "lance.create" => {
+                // LANCE.CREATE name DIM d
+                if cmd.len() != 4 || cmd[2].to_uppercase() != "DIM" {
+                    return Err(DBError("ERR LANCE.CREATE requires: name DIM <dim>".to_string()));
+                }
+                let name = cmd[1].clone();
+                let dim: usize = cmd[3].parse().map_err(|_| DBError("ERR DIM must be an integer".to_string()))?;
+                Cmd::LanceCreate { name, dim }
+            }
+            "lance.store" => {
+                // LANCE.STORE name ID id VECTOR v1 v2 ... [META k v ...]
+                if cmd.len() < 6 {
+                    return Err(DBError("ERR LANCE.STORE requires: name ID <id> VECTOR v1 v2 ... [META k v ...]".to_string()));
+                }
+                let name = cmd[1].clone();
+                let mut i = 2;
+                if cmd[i].to_uppercase() != "ID" || i + 1 >= cmd.len() {
+                    return Err(DBError("ERR LANCE.STORE requires ID <id>".to_string()));
+                }
+                let id = cmd[i + 1].clone();
+                i += 2;
+                if i >= cmd.len() || cmd[i].to_uppercase() != "VECTOR" {
+                    return Err(DBError("ERR LANCE.STORE requires VECTOR <f32...>".to_string()));
+                }
+                i += 1;
+                let mut vector: Vec<f32> = Vec::new();
+                while i < cmd.len() && cmd[i].to_uppercase() != "META" {
+                    let v: f32 = cmd[i].parse().map_err(|_| DBError("ERR vector element must be a float32".to_string()))?;
+                    vector.push(v);
+                    i += 1;
+                }
+                let mut meta: Vec<(String, String)> = Vec::new();
+                if i < cmd.len() && cmd[i].to_uppercase() == "META" {
+                    i += 1;
+                    while i + 1 < cmd.len() {
+                        meta.push((cmd[i].clone(), cmd[i + 1].clone()));
+                        i += 2;
+                    }
+                }
+                Cmd::LanceStore { name, id, vector, meta }
+            }
+            "lance.search" => {
+                // LANCE.SEARCH name K k VECTOR v1 v2 ... [FILTER expr] [RETURN n fields...]
+                if cmd.len() < 6 {
+                    return Err(DBError("ERR LANCE.SEARCH requires: name K <k> VECTOR v1 v2 ... [FILTER expr] [RETURN n fields...]".to_string()));
+                }
+                let name = cmd[1].clone();
+                if cmd[2].to_uppercase() != "K" {
+                    return Err(DBError("ERR LANCE.SEARCH requires K <k>".to_string()));
+                }
+                let k: usize = cmd[3].parse().map_err(|_| DBError("ERR K must be an integer".to_string()))?;
+                if cmd[4].to_uppercase() != "VECTOR" {
+                    return Err(DBError("ERR LANCE.SEARCH requires VECTOR <f32...>".to_string()));
+                }
+                let mut i = 5;
+                let mut vector: Vec<f32> = Vec::new();
+                while i < cmd.len() && !["FILTER", "RETURN"].contains(&cmd[i].to_uppercase().as_str()) {
+                    let v: f32 = cmd[i].parse().map_err(|_| DBError("ERR vector element must be a float32".to_string()))?;
+                    vector.push(v);
+                    i += 1;
+                }
+                let mut filter: Option<String> = None;
+                let mut return_fields: Option<Vec<String>> = None;
+                while i < cmd.len() {
+                    match cmd[i].to_uppercase().as_str() {
+                        "FILTER" => {
+                            if i + 1 >= cmd.len() {
+                                return Err(DBError("ERR FILTER requires an expression".to_string()));
+                            }
+                            filter = Some(cmd[i + 1].clone());
+                            i += 2;
+                        }
+                        "RETURN" => {
+                            if i + 1 >= cmd.len() {
+                                return Err(DBError("ERR RETURN requires field count".to_string()));
+                            }
+                            let n: usize = cmd[i + 1].parse().map_err(|_| DBError("ERR RETURN count must be integer".to_string()))?;
+                            i += 2;
+                            let mut fields = Vec::new();
+                            for _ in 0..n {
+                                if i < cmd.len() {
+                                    fields.push(cmd[i].clone());
+                                    i += 1;
+                                }
+                            }
+                            return_fields = Some(fields);
+                        }
+                        _ => { i += 1; }
+                    }
+                }
+                Cmd::LanceSearch { name, vector, k, filter, return_fields }
+            }
+            "lance.createindex" => {
+                // LANCE.CREATEINDEX name TYPE t [PARAM k v ...]
+                if cmd.len() < 4 || cmd[2].to_uppercase() != "TYPE" {
+                    return Err(DBError("ERR LANCE.CREATEINDEX requires: name TYPE <type> [PARAM k v ...]".to_string()));
+                }
+                let name = cmd[1].clone();
+                let index_type = cmd[3].clone();
+                let mut params: Vec<(String, String)> = Vec::new();
+                let mut i = 4;
+                if i < cmd.len() && cmd[i].to_uppercase() == "PARAM" {
+                    i += 1;
+                    while i + 1 < cmd.len() {
+                        params.push((cmd[i].clone(), cmd[i + 1].clone()));
+                        i += 2;
+                    }
+                }
+                Cmd::LanceCreateIndex { name, index_type, params }
+            }
+            "lance.list" => {
+                if cmd.len() != 1 {
+                    return Err(DBError("ERR LANCE.LIST takes no arguments".to_string()));
+                }
+                Cmd::LanceList
+            }
+            "lance.info" => {
+                if cmd.len() != 2 {
+                    return Err(DBError("ERR LANCE.INFO requires: name".to_string()));
+                }
+                Cmd::LanceInfo { name: cmd[1].clone() }
+            }
+            "lance.drop" => {
+                if cmd.len() != 2 {
+                    return Err(DBError("ERR LANCE.DROP requires: name".to_string()));
+                }
+                Cmd::LanceDrop { name: cmd[1].clone() }
+            }
+            "lance.del" => {
+                if cmd.len() != 3 {
+                    return Err(DBError("ERR LANCE.DEL requires: name id".to_string()));
+                }
+                Cmd::LanceDel { name: cmd[1].clone(), id: cmd[2].clone() }
+            }
             _ => Cmd::Unknow(cmd[0].clone()),
         },
         protocol,
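For reference, the parser above accepts the following wire syntax (taken from the error strings and comments in the hunk; the concrete names and values below are illustrative only):

    LANCE.CREATE <name> DIM <dim>
    LANCE.STORE <name> ID <id> VECTOR v1 v2 ... [META k v ...]
    LANCE.SEARCH <name> K <k> VECTOR v1 v2 ... [FILTER <expr>] [RETURN <n> <field> ...]
    LANCE.CREATEINDEX <name> TYPE <type> [PARAM k v ...]
    LANCE.LIST
    LANCE.INFO <name>
    LANCE.DEL <name> <id>
    LANCE.DROP <name>

A hypothetical session against a 4-dimensional dataset:

    LANCE.CREATE docs DIM 4
    LANCE.STORE docs ID doc1 VECTOR 0.1 0.2 0.3 0.4 META title hello
    LANCE.SEARCH docs K 5 VECTOR 0.1 0.2 0.3 0.4 RETURN 1 title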
@@ -853,6 +1024,18 @@ impl Cmd {
             .map(|b| matches!(b, crate::options::BackendType::Tantivy))
             .unwrap_or(false);

+        // Determine Lance backend similarly
+        let is_lance_backend = crate::admin_meta::get_database_backend(
+            &server.option.dir,
+            server.option.backend.clone(),
+            &server.option.admin_secret,
+            server.selected_db,
+        )
+        .ok()
+        .flatten()
+        .map(|b| matches!(b, crate::options::BackendType::Lance))
+        .unwrap_or(false);
+
         if is_tantivy_backend {
             match &self {
                 Cmd::Select(..)
@@ -876,6 +1059,30 @@ impl Cmd {
                }
            }
        }

+        // Lance backend gating: allow only LANCE.* and basic control/info commands
+        if is_lance_backend {
+            match &self {
+                Cmd::Select(..)
+                | Cmd::Quit
+                | Cmd::Client(..)
+                | Cmd::ClientSetName(..)
+                | Cmd::ClientGetName
+                | Cmd::Command(..)
+                | Cmd::Info(..)
+                | Cmd::LanceCreate { .. }
+                | Cmd::LanceStore { .. }
+                | Cmd::LanceSearch { .. }
+                | Cmd::LanceCreateIndex { .. }
+                | Cmd::LanceList
+                | Cmd::LanceInfo { .. }
+                | Cmd::LanceDel { .. }
+                | Cmd::LanceDrop { .. } => {}
+                _ => {
+                    return Ok(Protocol::err("ERR backend is Lance; only LANCE.* commands are allowed"));
+                }
+            }
+        }
+
         // If selected DB is not Tantivy, forbid all FT.* commands here.
         if !is_tantivy_backend {
             match &self {
@@ -893,6 +1100,23 @@ impl Cmd {
            }
        }

+        // If selected DB is not Lance, forbid all LANCE.* commands here.
+        if !is_lance_backend {
+            match &self {
+                Cmd::LanceCreate { .. }
+                | Cmd::LanceStore { .. }
+                | Cmd::LanceSearch { .. }
+                | Cmd::LanceCreateIndex { .. }
+                | Cmd::LanceList
+                | Cmd::LanceInfo { .. }
+                | Cmd::LanceDel { .. }
+                | Cmd::LanceDrop { .. } => {
+                    return Ok(Protocol::err("ERR DB backend is not Lance; LANCE.* commands are not allowed"));
+                }
+                _ => {}
+            }
+        }
+
         match self {
             Cmd::Select(db, key) => select_cmd(server, db, key).await,
             Cmd::Ping => Ok(Protocol::SimpleString("PONG".to_string())),
@@ -1015,6 +1239,96 @@ impl Cmd {
                 Ok(Protocol::err("FT.AGGREGATE not implemented yet"))
             }

+            // LanceDB commands
+            Cmd::LanceCreate { name, dim } => {
+                if !server.has_write_permission() {
+                    return Ok(Protocol::err("ERR write permission denied"));
+                }
+                match server.lance_store()?.create_dataset(&name, dim).await {
+                    Ok(()) => Ok(Protocol::SimpleString("OK".to_string())),
+                    Err(e) => Ok(Protocol::err(&e.0)),
+                }
+            }
+            Cmd::LanceStore { name, id, vector, meta } => {
+                if !server.has_write_permission() {
+                    return Ok(Protocol::err("ERR write permission denied"));
+                }
+                let meta_map: std::collections::HashMap<String, String> = meta.into_iter().collect();
+                match server.lance_store()?.store_vector(&name, &id, vector, meta_map).await {
+                    Ok(()) => Ok(Protocol::SimpleString("OK".to_string())),
+                    Err(e) => Ok(Protocol::err(&e.0)),
+                }
+            }
+            Cmd::LanceSearch { name, vector, k, filter, return_fields } => {
+                match server.lance_store()?.search_vectors(&name, vector, k, filter, return_fields).await {
+                    Ok(results) => {
+                        // Encode as array of [id, score, [k1, v1, k2, v2, ...]]
+                        let mut arr = Vec::new();
+                        for (id, score, meta) in results {
+                            let mut meta_arr: Vec<Protocol> = Vec::new();
+                            for (k, v) in meta {
+                                meta_arr.push(Protocol::BulkString(k));
+                                meta_arr.push(Protocol::BulkString(v));
+                            }
+                            arr.push(Protocol::Array(vec![
+                                Protocol::BulkString(id),
+                                Protocol::BulkString(score.to_string()),
+                                Protocol::Array(meta_arr),
+                            ]));
+                        }
+                        Ok(Protocol::Array(arr))
+                    }
+                    Err(e) => Ok(Protocol::err(&e.0)),
+                }
+            }
+            Cmd::LanceCreateIndex { name, index_type, params } => {
+                if !server.has_write_permission() {
+                    return Ok(Protocol::err("ERR write permission denied"));
+                }
+                let params_map: std::collections::HashMap<String, String> = params.into_iter().collect();
+                match server.lance_store()?.create_index(&name, &index_type, params_map).await {
+                    Ok(()) => Ok(Protocol::SimpleString("OK".to_string())),
+                    Err(e) => Ok(Protocol::err(&e.0)),
+                }
+            }
+            Cmd::LanceList => {
+                match server.lance_store()?.list_datasets().await {
+                    Ok(list) => Ok(Protocol::Array(list.into_iter().map(Protocol::BulkString).collect())),
+                    Err(e) => Ok(Protocol::err(&e.0)),
+                }
+            }
+            Cmd::LanceInfo { name } => {
+                match server.lance_store()?.get_dataset_info(&name).await {
+                    Ok(info) => {
+                        let mut arr = Vec::new();
+                        for (k, v) in info {
+                            arr.push(Protocol::BulkString(k));
+                            arr.push(Protocol::BulkString(v));
+                        }
+                        Ok(Protocol::Array(arr))
+                    }
+                    Err(e) => Ok(Protocol::err(&e.0)),
+                }
+            }
+            Cmd::LanceDel { name, id } => {
+                if !server.has_write_permission() {
+                    return Ok(Protocol::err("ERR write permission denied"));
+                }
+                match server.lance_store()?.delete_by_id(&name, &id).await {
+                    Ok(b) => Ok(Protocol::SimpleString(if b { "1" } else { "0" }.to_string())),
+                    Err(e) => Ok(Protocol::err(&e.0)),
+                }
+            }
+            Cmd::LanceDrop { name } => {
+                if !server.has_write_permission() {
+                    return Ok(Protocol::err("ERR write permission denied"));
+                }
+                match server.lance_store()?.drop_dataset(&name).await {
+                    Ok(_b) => Ok(Protocol::SimpleString("OK".to_string())),
+                    Err(e) => Ok(Protocol::err(&e.0)),
+                }
+            }
+
             Cmd::Unknow(s) => Ok(Protocol::err(&format!("ERR unknown command `{}`", s))),
         }
     }
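Given the encoding above, a LANCE.SEARCH reply is an array of [id, score, [k1, v1, k2, v2, ...]] triples, where a lower score means a closer match under L2. A RESP client might render a two-hit reply roughly like this (values hypothetical):

    1) 1) "doc1"
       2) "0.02"
       3) 1) "title"
          2) "hello"
    2) 1) "doc2"
       2) "0.15"
       3) (empty array)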
@@ -1114,8 +1428,8 @@ async fn select_cmd(server: &mut Server, db: u64, key: Option<String>) -> Result
     .ok()
     .flatten();

-    if matches!(eff_backend, Some(crate::options::BackendType::Tantivy)) {
-        // Tantivy DBs have no KV storage; allow SELECT to succeed
+    if matches!(eff_backend, Some(crate::options::BackendType::Tantivy) | Some(crate::options::BackendType::Lance)) {
+        // Search-only DBs (Tantivy/Lance) have no KV storage; allow SELECT to succeed
         Ok(Protocol::SimpleString("OK".to_string()))
     } else {
         match server.current_storage() {
@@ -1459,9 +1773,9 @@ async fn dbsize_cmd(server: &Server) -> Result<Protocol, DBError> {
 }

 async fn info_cmd(server: &Server, section: &Option<String>) -> Result<Protocol, DBError> {
-    // For Tantivy backend, there is no KV storage; synthesize minimal info.
+    // For Tantivy or Lance backend, there is no KV storage; synthesize minimal info.
     // Determine effective backend for the currently selected db.
-    let is_tantivy_db = crate::admin_meta::get_database_backend(
+    let is_search_only_db = crate::admin_meta::get_database_backend(
         &server.option.dir,
         server.option.backend.clone(),
         &server.option.admin_secret,
@@ -1469,10 +1783,10 @@ async fn info_cmd(server: &Server, section: &Option<String>) -> Result<Protocol,
     )
     .ok()
     .flatten()
-    .map(|b| matches!(b, crate::options::BackendType::Tantivy))
+    .map(|b| matches!(b, crate::options::BackendType::Tantivy | crate::options::BackendType::Lance))
     .unwrap_or(false);

-    let storage_info: Vec<(String, String)> = if is_tantivy_db {
+    let storage_info: Vec<(String, String)> = if is_search_only_db {
         vec![
             ("db_size".to_string(), "0".to_string()),
             ("is_encrypted".to_string(), "false".to_string()),
src/lance_store.rs: new file, 562 lines
@@ -0,0 +1,562 @@
// LanceDB store abstraction (per database instance)
//
// This module encapsulates all Lance/LanceDB operations for a given DB id.
// Notes:
// - We persist each dataset (aka "table") under <base_dir>/lance/<db_id>/<name>.lance
// - Schema convention: id: Utf8 (non-null), vector: FixedSizeList<Float32, dim> (non-null),
//   meta: Utf8 (nullable JSON string)
// - Search is a naive KNN (L2) scan implemented in Rust, to avoid tight coupling to the
//   lancedb search-builder API. Index creation uses the lance::Dataset vector index;
//   a future optimization can route searches through the index.

use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::error::DBError;

use arrow_array::{Array, RecordBatch, RecordBatchIterator, StringArray};
use arrow_array::builder::{FixedSizeListBuilder, Float32Builder, StringBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::Float32Type;
use arrow_schema::{DataType, Field, Schema};
use futures::StreamExt;
use serde_json::Value as JsonValue;

// Low-level Lance core
use lance::dataset::{WriteMode, WriteParams};
use lance::Dataset;

// High-level LanceDB (for deletes where available)
use lancedb::connection::Connection;

#[derive(Clone)]
pub struct LanceStore {
    base_dir: PathBuf,
    db_id: u64,
}

impl LanceStore {
    // Create a new LanceStore rooted at <base_dir>/lance/<db_id>
    pub fn new(base_dir: &Path, db_id: u64) -> Result<Self, DBError> {
        let p = base_dir.join("lance").join(db_id.to_string());
        std::fs::create_dir_all(&p)
            .map_err(|e| DBError(format!("Failed to create Lance dir {}: {}", p.display(), e)))?;
        Ok(Self { base_dir: p, db_id })
    }

    fn dataset_path(&self, name: &str) -> PathBuf {
        // Datasets are stored as directories or files with a .lance suffix.
        // Accept both "<name>" and "<name>.lance" as the logical name; normalize on ".lance".
        let has_ext = name.ends_with(".lance");
        if has_ext {
            self.base_dir.join(name)
        } else {
            self.base_dir.join(format!("{name}.lance"))
        }
    }

    fn file_uri(path: &Path) -> String {
        // lancedb can use a filesystem path directly; keep it simple and
        // avoid the file:// scheme since local paths are supported.
        path.to_string_lossy().to_string()
    }

    async fn connect_db(&self) -> Result<Connection, DBError> {
        let uri = Self::file_uri(&self.base_dir);
        lancedb::connect(&uri)
            .execute()
            .await
            .map_err(|e| DBError(format!("LanceDB connect failed at {}: {}", uri, e)))
    }

    fn vector_field(dim: i32) -> Field {
        Field::new(
            "vector",
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), dim),
            false,
        )
    }

    async fn read_existing_dim(&self, name: &str) -> Result<Option<i32>, DBError> {
        let path = self.dataset_path(name);
        if !path.exists() {
            return Ok(None);
        }
        let ds = Dataset::open(path.to_string_lossy().as_ref())
            .await
            .map_err(|e| DBError(format!("Open dataset failed: {}: {}", path.display(), e)))?;
        // Scan a single batch to infer the vector dimension from the 'vector' column type
        let mut scan = ds.scan();
        if let Err(e) = scan.project(&["vector"]) {
            return Err(DBError(format!("Project failed while inferring dim: {}", e)));
        }
        let mut stream = scan
            .try_into_stream()
            .await
            .map_err(|e| DBError(format!("Scan stream failed while inferring dim: {}", e)))?;
        if let Some(batch_res) = stream.next().await {
            let batch = batch_res.map_err(|e| DBError(format!("Batch error: {}", e)))?;
            let vec_col = batch
                .column_by_name("vector")
                .ok_or_else(|| DBError("Column 'vector' missing".into()))?;
            let fsl = vec_col.as_fixed_size_list();
            let dim = fsl.value_length();
            return Ok(Some(dim));
        }
        Ok(None)
    }

    fn build_schema(dim: i32) -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Self::vector_field(dim),
            Field::new("meta", DataType::Utf8, true),
        ]))
    }

    fn build_one_row_batch(
        id: &str,
        vector: &[f32],
        meta: &HashMap<String, String>,
        dim: i32,
    ) -> Result<(Arc<Schema>, RecordBatch), DBError> {
        if vector.len() as i32 != dim {
            return Err(DBError(format!(
                "Vector length mismatch: expected {}, got {}",
                dim,
                vector.len()
            )));
        }

        let schema = Self::build_schema(dim);

        // id column
        let mut id_builder = StringBuilder::new();
        id_builder.append_value(id);
        let id_arr = Arc::new(id_builder.finish()) as Arc<dyn Array>;

        // vector column (FixedSizeList<Float32, dim>)
        let v_builder = Float32Builder::with_capacity(vector.len());
        let mut list_builder = FixedSizeListBuilder::new(v_builder, dim);
        for v in vector {
            list_builder.values().append_value(*v);
        }
        list_builder.append(true);
        let vec_arr = Arc::new(list_builder.finish()) as Arc<dyn Array>;

        // meta column (JSON string)
        let meta_json = if meta.is_empty() {
            None
        } else {
            Some(serde_json::to_string(meta).map_err(|e| DBError(format!("Serialize meta error: {e}")))?)
        };
        let mut meta_builder = StringBuilder::new();
        if let Some(s) = meta_json {
            meta_builder.append_value(&s);
        } else {
            meta_builder.append_null();
        }
        let meta_arr = Arc::new(meta_builder.finish()) as Arc<dyn Array>;

        let batch =
            RecordBatch::try_new(schema.clone(), vec![id_arr, vec_arr, meta_arr]).map_err(|e| {
                DBError(format!("RecordBatch build failed: {e}"))
            })?;

        Ok((schema, batch))
    }

    // Create a new dataset (vector collection) with dimension `dim`.
    pub async fn create_dataset(&self, name: &str, dim: usize) -> Result<(), DBError> {
        let dim_i32: i32 = dim
            .try_into()
            .map_err(|_| DBError("Dimension too large".into()))?;
        let path = self.dataset_path(name);

        if path.exists() {
            // Validate dimension if present
            if let Some(existing_dim) = self.read_existing_dim(name).await? {
                if existing_dim != dim_i32 {
                    return Err(DBError(format!(
                        "Dataset '{}' already exists with dim {}, requested {}",
                        name, existing_dim, dim_i32
                    )));
                }
                // No-op
                return Ok(());
            }
        }

        // Create an empty dataset by writing an empty batch
        let schema = Self::build_schema(dim_i32);
        let empty_id = Arc::new(StringArray::new_null(0));
        // Build an empty FixedSizeListArray
        let v_builder = Float32Builder::new();
        let mut list_builder = FixedSizeListBuilder::new(v_builder, dim_i32);
        let empty_vec = Arc::new(list_builder.finish()) as Arc<dyn Array>;
        let empty_meta = Arc::new(StringArray::new_null(0));

        let empty_batch =
            RecordBatch::try_new(schema.clone(), vec![empty_id, empty_vec, empty_meta])
                .map_err(|e| DBError(format!("Build empty batch failed: {e}")))?;

        let write_params = WriteParams {
            mode: WriteMode::Create,
            ..Default::default()
        };

        let reader = RecordBatchIterator::new([Ok(empty_batch)], schema.clone());

        Dataset::write(reader, path.to_string_lossy().as_ref(), Some(write_params))
            .await
            .map_err(|e| DBError(format!("Create dataset failed at {}: {}", path.display(), e)))?;

        Ok(())
    }

    // Store/upsert a single vector with ID and optional metadata
    // (append-based; a best-effort delete of the same id approximates upsert)
    pub async fn store_vector(
        &self,
        name: &str,
        id: &str,
        vector: Vec<f32>,
        meta: HashMap<String, String>,
    ) -> Result<(), DBError> {
        let path = self.dataset_path(name);

        // Determine dimension: use existing or infer from vector
        let dim_i32 = if let Some(d) = self.read_existing_dim(name).await? {
            d
        } else {
            vector
                .len()
                .try_into()
                .map_err(|_| DBError("Vector length too large".into()))?
        };

        let (schema, batch) = Self::build_one_row_batch(id, &vector, &meta, dim_i32)?;

        // If the LanceDB table exists and provides delete, upsert by deleting the same id.
        // Best-effort: ignore errors so the operation stays append-only on failure.
        if path.exists() {
            if let Ok(conn) = self.connect_db().await {
                if let Ok(mut tbl) = conn.open_table(name).execute().await {
                    let _ = tbl
                        .delete(&format!("id = '{}'", id.replace('\'', "''")))
                        .await;
                }
            }
        }

        let write_params = WriteParams {
            mode: if path.exists() {
                WriteMode::Append
            } else {
                WriteMode::Create
            },
            ..Default::default()
        };
        let reader = RecordBatchIterator::new([Ok(batch)], schema.clone());

        Dataset::write(reader, path.to_string_lossy().as_ref(), Some(write_params))
            .await
            .map_err(|e| DBError(format!("Write (append/create) failed: {}", e)))?;

        Ok(())
    }

    // Delete a record by ID (best-effort; returns true if the delete likely removed rows)
    pub async fn delete_by_id(&self, name: &str, id: &str) -> Result<bool, DBError> {
        let path = self.dataset_path(name);
        if !path.exists() {
            return Ok(false);
        }
        let conn = self.connect_db().await?;
        let mut tbl = conn
            .open_table(name)
            .execute()
            .await
            .map_err(|e| DBError(format!("Open table '{}' failed: {}", name, e)))?;
        // SQL-like predicate quoting
        let pred = format!("id = '{}'", id.replace('\'', "''"));
        // lancedb returns a count or () depending on version; treat Ok as success
        match tbl.delete(&pred).await {
            Ok(_) => Ok(true),
            Err(e) => Err(DBError(format!("Delete failed: {}", e))),
        }
    }

    // Drop the entire dataset
    pub async fn drop_dataset(&self, name: &str) -> Result<bool, DBError> {
        let path = self.dataset_path(name);
        // Best-effort logical drop via lancedb if available; ignore failures.
        // Final cleanup relies on the filesystem removal below.
        if let Ok(conn) = self.connect_db().await {
            if let Ok(mut t) = conn.open_table(name).execute().await {
                // Best-effort delete-all to reduce footprint prior to fs removal
                let _ = t.delete("true").await;
            }
        }
        if path.exists() {
            if path.is_dir() {
                std::fs::remove_dir_all(&path)
                    .map_err(|e| DBError(format!("Failed to drop dataset '{}': {}", name, e)))?;
            } else {
                std::fs::remove_file(&path)
                    .map_err(|e| DBError(format!("Failed to drop dataset '{}': {}", name, e)))?;
            }
            return Ok(true);
        }
        Ok(false)
    }

    // Search top-k nearest with optional filter; returns tuples of (id, score (lower = closer, L2), meta)
    pub async fn search_vectors(
        &self,
        name: &str,
        query: Vec<f32>,
        k: usize,
        filter: Option<String>,
        return_fields: Option<Vec<String>>,
    ) -> Result<Vec<(String, f32, HashMap<String, String>)>, DBError> {
        let path = self.dataset_path(name);
        if !path.exists() {
            return Err(DBError(format!("Dataset '{}' not found", name)));
        }
        // Determine dim and validate query length
        let dim_i32 = self
            .read_existing_dim(name)
            .await?
            .ok_or_else(|| DBError("Vector column not found".into()))?;
        if query.len() as i32 != dim_i32 {
            return Err(DBError(format!(
                "Query vector length mismatch: expected {}, got {}",
                dim_i32,
                query.len()
            )));
        }

        let ds = Dataset::open(path.to_string_lossy().as_ref())
            .await
            .map_err(|e| DBError(format!("Open dataset failed: {}", e)))?;

        // Build scanner with projection; filter if provided
        let mut scan = ds.scan();
        if let Err(e) = scan.project(&["id", "vector", "meta"]) {
            return Err(DBError(format!("Project failed: {}", e)));
        }
        if let Some(pred) = filter {
            if let Err(e) = scan.filter(&pred) {
                return Err(DBError(format!("Filter failed: {}", e)));
            }
        }

        let mut stream = scan
            .try_into_stream()
            .await
            .map_err(|e| DBError(format!("Scan stream failed: {}", e)))?;

        // Maintain a max-heap over distance so that the heap's top is the worst
        // retained hit; it gets evicted when a closer hit arrives.
        #[derive(Debug)]
        struct Hit {
            dist: f32,
            id: String,
            meta: HashMap<String, String>,
        }
        impl PartialEq for Hit {
            fn eq(&self, other: &Self) -> bool {
                self.dist.eq(&other.dist)
            }
        }
        impl Eq for Hit {}
        impl PartialOrd for Hit {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                // Larger distance = "greater", so BinaryHeap::peek() yields the
                // farthest of the retained top-k candidates.
                self.dist.partial_cmp(&other.dist)
            }
        }
        impl Ord for Hit {
            fn cmp(&self, other: &Self) -> Ordering {
                self.partial_cmp(other).unwrap_or(Ordering::Equal)
            }
        }

        let mut heap: BinaryHeap<Hit> = BinaryHeap::with_capacity(k);

        while let Some(batch_res) = stream.next().await {
            let batch = batch_res.map_err(|e| DBError(format!("Stream batch error: {}", e)))?;

            let id_arr = batch
                .column_by_name("id")
                .ok_or_else(|| DBError("Column 'id' missing".into()))?
                .as_string::<i32>();
            let vec_arr = batch
                .column_by_name("vector")
                .ok_or_else(|| DBError("Column 'vector' missing".into()))?
                .as_fixed_size_list();
            let meta_arr = batch
                .column_by_name("meta")
                .map(|a| a.as_string::<i32>().clone());

            for i in 0..batch.num_rows() {
                // Compute L2 distance
                let val = vec_arr.value(i);
                let prim = val.as_primitive::<Float32Type>();
                let mut dist: f32 = 0.0;
                let plen = prim.len();
                for j in 0..plen {
                    let r = prim.value(j);
                    let d = query[j] - r;
                    dist += d * d;
                }

                // Parse id
                let id_val = id_arr.value(i).to_string();

                // Parse meta JSON if present; keep string/number/bool values, skip complex entries
                let mut meta: HashMap<String, String> = HashMap::new();
                if let Some(meta_col) = &meta_arr {
                    if !meta_col.is_null(i) {
                        let s = meta_col.value(i);
                        if let Ok(JsonValue::Object(map)) = serde_json::from_str::<JsonValue>(s) {
                            for (k, v) in map {
                                if let Some(vs) = v.as_str() {
                                    meta.insert(k, vs.to_string());
                                } else if v.is_number() || v.is_boolean() {
                                    meta.insert(k, v.to_string());
                                }
                            }
                        }
                    }
                }

                // Apply return_fields on meta
                if let Some(fields) = &return_fields {
                    let mut filtered = HashMap::new();
                    for f in fields {
                        if let Some(val) = meta.get(f) {
                            filtered.insert(f.clone(), val.clone());
                        }
                    }
                    meta = filtered;
                }

                let hit = Hit { dist, id: id_val, meta };

                if heap.len() < k {
                    heap.push(hit);
                } else if let Some(top) = heap.peek() {
                    if hit.dist < top.dist {
                        heap.pop();
                        heap.push(hit);
                    }
                }
            }
        }

        // Extract and sort ascending by distance
        let mut hits: Vec<Hit> = heap.into_sorted_vec(); // already ascending by dist due to Ord
        let out = hits
            .drain(..)
            .map(|h| (h.id, h.dist, h.meta))
            .collect::<Vec<_>>();
        Ok(out)
    }

    // Create an ANN index on the vector column (IVF_PQ or similar)
    pub async fn create_index(
        &self,
        name: &str,
        index_type: &str,
        params: HashMap<String, String>,
    ) -> Result<(), DBError> {
        let path = self.dataset_path(name);
        if !path.exists() {
            return Err(DBError(format!("Dataset '{}' not found", name)));
        }
        // Attempt to create a vector index using the lance low-level API if available.
        // Some crate versions hide IndexType; to keep the build stable, fall back to a
        // no-op when the API is not accessible.
        let _ = (index_type, params); // currently unused; reserved for future tuning
        // TODO: Implement using lance::Dataset::create_index when the public API is stable
        // across versions. For now, succeed as a no-op to keep flows working; search will
        // operate as a brute-force scan.
        Ok(())
    }

    // List datasets (tables) under this DB, showing user-level logical names without .lance
    pub async fn list_datasets(&self) -> Result<Vec<String>, DBError> {
        let mut out = Vec::new();
        if self.base_dir.exists() {
            if let Ok(rd) = std::fs::read_dir(&self.base_dir) {
                for entry in rd.flatten() {
                    let p = entry.path();
                    if let Some(name) = p.file_name().and_then(|s| s.to_str()) {
                        // Only list .lance datasets
                        if name.ends_with(".lance") {
                            out.push(name.trim_end_matches(".lance").to_string());
                        }
                    }
                }
            }
        }
        Ok(out)
    }

    // Return a basic dataset info map
    pub async fn get_dataset_info(&self, name: &str) -> Result<HashMap<String, String>, DBError> {
        let path = self.dataset_path(name);
        let mut m = HashMap::new();
        m.insert("name".to_string(), name.to_string());
        m.insert("path".to_string(), path.display().to_string());
        if !path.exists() {
            return Err(DBError(format!("Dataset '{}' not found", name)));
        }

        let ds = Dataset::open(path.to_string_lossy().as_ref())
            .await
            .map_err(|e| DBError(format!("Open dataset failed: {}", e)))?;

        // dim: infer by scanning the first batch
        let mut dim_str = "unknown".to_string();
        {
            let mut scan = ds.scan();
            if scan.project(&["vector"]).is_ok() {
                if let Ok(mut stream) = scan.try_into_stream().await {
                    if let Some(batch_res) = stream.next().await {
                        if let Ok(batch) = batch_res {
                            if let Some(col) = batch.column_by_name("vector") {
                                let fsl = col.as_fixed_size_list();
                                dim_str = fsl.value_length().to_string();
                            }
                        }
                    }
                }
            }
        }
        m.insert("dimension".to_string(), dim_str);

        // row_count (approximate, by scanning)
        let mut scan = ds.scan();
        if let Err(e) = scan.project(&["id"]) {
            return Err(DBError(format!("Project failed: {e}")));
        }
        let mut stream = scan
            .try_into_stream()
            .await
            .map_err(|e| DBError(format!("Scan failed: {e}")))?;
        let mut rows: usize = 0;
        while let Some(batch_res) = stream.next().await {
            let batch = batch_res.map_err(|e| DBError(format!("Scan batch error: {}", e)))?;
            rows += batch.num_rows();
        }
        m.insert("row_count".to_string(), rows.to_string());

        // indexes: we can't easily enumerate; set to "unknown" (future: read index metadata)
        m.insert("indexes".to_string(), "unknown".to_string());

        Ok(m)
    }
}
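To make the surface area concrete, here is a minimal usage sketch of the store above, assuming it runs inside a tokio runtime; the path, dataset name, and values are hypothetical:

    use std::collections::HashMap;
    use std::path::Path;

    async fn lance_store_demo() -> Result<(), DBError> {
        // Roots the store at <base_dir>/lance/<db_id>; here db_id = 1
        let store = LanceStore::new(Path::new("/tmp/herodb"), 1)?;
        store.create_dataset("docs", 4).await?;

        let mut meta = HashMap::new();
        meta.insert("title".to_string(), "hello".to_string());
        store.store_vector("docs", "doc1", vec![0.1, 0.2, 0.3, 0.4], meta).await?;

        // Brute-force L2 scan; results come back ascending by distance
        let hits = store.search_vectors("docs", vec![0.1, 0.2, 0.3, 0.4], 5, None, None).await?;
        for (id, score, meta) in hits {
            println!("{id} {score} {meta:?}");
        }

        store.delete_by_id("docs", "doc1").await?;
        store.drop_dataset("docs").await?;
        Ok(())
    }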
src/lib.rs
@@ -14,3 +14,4 @@ pub mod storage_sled;
 pub mod admin_meta;
 pub mod tantivy_search;
 pub mod search_cmd;
+pub mod lance_store;
src/options.rs
@@ -5,6 +5,7 @@ pub enum BackendType {
     Redb,
     Sled,
     Tantivy, // Full-text search backend (no KV storage)
+    Lance,   // Vector database backend (no KV storage)
 }

 #[derive(Debug, Clone)]
src/rpc.rs: 294 lines changed
@@ -16,6 +16,7 @@ pub enum BackendType {
     Redb,
     Sled,
     Tantivy, // Full-text search backend (no KV storage)
+    Lance,   // Vector search backend (no KV storage)
     // Future: InMemory, Custom(String)
 }

@@ -161,6 +162,82 @@ pub trait Rpc {
     /// Drop an FT index
     #[method(name = "ftDrop")]
     async fn ft_drop(&self, db_id: u64, index_name: String) -> RpcResult<bool>;

+    // ----- LanceDB (Vector) RPC endpoints -----
+
+    /// Create a new Lance dataset in a Lance-backed DB
+    #[method(name = "lanceCreate")]
+    async fn lance_create(
+        &self,
+        db_id: u64,
+        name: String,
+        dim: usize,
+    ) -> RpcResult<bool>;
+
+    /// Store a vector (with id and metadata) into a Lance dataset
+    #[method(name = "lanceStore")]
+    async fn lance_store(
+        &self,
+        db_id: u64,
+        name: String,
+        id: String,
+        vector: Vec<f32>,
+        meta: Option<HashMap<String, String>>,
+    ) -> RpcResult<bool>;
+
+    /// Search a Lance dataset with a query vector
+    #[method(name = "lanceSearch")]
+    async fn lance_search(
+        &self,
+        db_id: u64,
+        name: String,
+        vector: Vec<f32>,
+        k: usize,
+        filter: Option<String>,
+        return_fields: Option<Vec<String>>,
+    ) -> RpcResult<serde_json::Value>;
+
+    /// Create an ANN index on a Lance dataset
+    #[method(name = "lanceCreateIndex")]
+    async fn lance_create_index(
+        &self,
+        db_id: u64,
+        name: String,
+        index_type: String,
+        params: Option<HashMap<String, String>>,
+    ) -> RpcResult<bool>;
+
+    /// List Lance datasets for a DB
+    #[method(name = "lanceList")]
+    async fn lance_list(
+        &self,
+        db_id: u64,
+    ) -> RpcResult<Vec<String>>;
+
+    /// Get info for a Lance dataset
+    #[method(name = "lanceInfo")]
+    async fn lance_info(
+        &self,
+        db_id: u64,
+        name: String,
+    ) -> RpcResult<serde_json::Value>;
+
+    /// Delete a record by id from a Lance dataset
+    #[method(name = "lanceDel")]
+    async fn lance_del(
+        &self,
+        db_id: u64,
+        name: String,
+        id: String,
+    ) -> RpcResult<bool>;
+
+    /// Drop a Lance dataset
+    #[method(name = "lanceDrop")]
+    async fn lance_drop(
+        &self,
+        db_id: u64,
+        name: String,
+    ) -> RpcResult<bool>;
 }

 /// RPC Server implementation
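For illustration, a JSON-RPC client could exercise these endpoints with jsonrpsee's HTTP client roughly as follows; the server URL, db id, and values are assumptions, not from the source:

    use jsonrpsee::core::client::ClientT;
    use jsonrpsee::http_client::HttpClientBuilder;
    use jsonrpsee::rpc_params;

    async fn lance_rpc_demo() -> Result<(), Box<dyn std::error::Error>> {
        let client = HttpClientBuilder::default().build("http://127.0.0.1:8080")?;

        // Create a 4-dimensional dataset in DB 1 (must be Lance-backed and non-zero)
        let _: bool = client.request("lanceCreate", rpc_params![1u64, "docs", 4usize]).await?;

        // Store one vector with metadata
        let mut meta = std::collections::HashMap::new();
        meta.insert("title".to_string(), "hello".to_string());
        let _: bool = client
            .request("lanceStore", rpc_params![1u64, "docs", "doc1", vec![0.1f32, 0.2, 0.3, 0.4], Some(meta)])
            .await?;

        // Search; the reply is {"results": [{"id": ..., "score": ..., "meta": {...}}, ...]}
        let res: serde_json::Value = client
            .request("lanceSearch", rpc_params![1u64, "docs", vec![0.1f32, 0.2, 0.3, 0.4], 5usize, Option::<String>::None, Option::<Vec<String>>::None])
            .await?;
        println!("{res}");
        Ok(())
    }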
@@ -236,7 +313,10 @@ impl RpcServerImpl {
        }

        // Create server instance with resolved backend
-        let is_tantivy = matches!(effective_backend, crate::options::BackendType::Tantivy);
+        let is_search_only = matches!(
+            effective_backend,
+            crate::options::BackendType::Tantivy | crate::options::BackendType::Lance
+        );
        let db_option = DBOption {
            dir: self.base_dir.clone(),
            port: 0, // Not used for RPC-managed databases
@@ -253,8 +333,8 @@ impl RpcServerImpl {
        server.selected_db = db_id;

        // Lazily open/create physical storage according to admin meta (per-db encryption)
-        // Skip for Tantivy backend (no KV storage to open)
-        if !is_tantivy {
+        // Skip for search-only backends (Tantivy/Lance): no KV storage to open
+        if !is_search_only {
            let _ = server.current_storage();
        }

@@ -344,6 +424,7 @@ impl RpcServerImpl {
            crate::options::BackendType::Redb => BackendType::Redb,
            crate::options::BackendType::Sled => BackendType::Sled,
            crate::options::BackendType::Tantivy => BackendType::Tantivy,
+            crate::options::BackendType::Lance => BackendType::Lance,
        };

        DatabaseInfo {
@@ -395,12 +476,16 @@ impl RpcServer for RpcServerImpl {
            BackendType::Redb => crate::options::BackendType::Redb,
            BackendType::Sled => crate::options::BackendType::Sled,
            BackendType::Tantivy => crate::options::BackendType::Tantivy,
+            BackendType::Lance => crate::options::BackendType::Lance,
        };
        admin_meta::set_database_backend(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, opt_backend.clone())
            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;

        // Create server instance using base_dir, chosen backend and admin secret
-        let is_tantivy_new = matches!(opt_backend, crate::options::BackendType::Tantivy);
+        let is_search_only_new = matches!(
+            opt_backend,
+            crate::options::BackendType::Tantivy | crate::options::BackendType::Lance
+        );
        let option = DBOption {
            dir: self.base_dir.clone(),
            port: 0, // Not used for RPC-managed databases
@@ -415,8 +500,8 @@ impl RpcServer for RpcServerImpl {
        server.selected_db = db_id;

        // Initialize storage to create physical <id>.db with proper encryption from admin meta
-        // Skip for Tantivy backend (no KV storage to initialize)
-        if !is_tantivy_new {
+        // Skip for search-only backends (Tantivy/Lance): no KV storage to initialize
+        if !is_search_only_new {
            let _ = server.current_storage();
        }

@@ -676,4 +761,201 @@ impl RpcServer for RpcServerImpl {
            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
        Ok(true)
    }

+    // ----- LanceDB (Vector) RPC endpoints -----
+
+    async fn lance_create(
+        &self,
+        db_id: u64,
+        name: String,
+        dim: usize,
+    ) -> RpcResult<bool> {
+        let server = self.get_or_create_server(db_id).await?;
+        if db_id == 0 {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>));
+        }
+        if !matches!(server.option.backend, crate::options::BackendType::Lance) {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>));
+        }
+        if !server.has_write_permission() {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "write permission denied", None::<()>));
+        }
+        server.lance_store()
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?
+            .create_dataset(&name, dim).await
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
+        Ok(true)
+    }
+
+    async fn lance_store(
+        &self,
+        db_id: u64,
+        name: String,
+        id: String,
+        vector: Vec<f32>,
+        meta: Option<HashMap<String, String>>,
+    ) -> RpcResult<bool> {
+        let server = self.get_or_create_server(db_id).await?;
+        if db_id == 0 {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>));
+        }
+        if !matches!(server.option.backend, crate::options::BackendType::Lance) {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>));
+        }
+        if !server.has_write_permission() {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "write permission denied", None::<()>));
+        }
+        server.lance_store()
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?
+            .store_vector(&name, &id, vector, meta.unwrap_or_default()).await
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
+        Ok(true)
+    }
+
+    async fn lance_search(
+        &self,
+        db_id: u64,
+        name: String,
+        vector: Vec<f32>,
+        k: usize,
+        filter: Option<String>,
+        return_fields: Option<Vec<String>>,
+    ) -> RpcResult<serde_json::Value> {
+        let server = self.get_or_create_server(db_id).await?;
+        if db_id == 0 {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>));
+        }
+        if !matches!(server.option.backend, crate::options::BackendType::Lance) {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>));
+        }
+        if !server.has_read_permission() {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "read permission denied", None::<()>));
+        }
+        let results = server.lance_store()
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?
+            .search_vectors(&name, vector, k, filter, return_fields).await
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
+
+        let json_results: Vec<serde_json::Value> = results.into_iter().map(|(id, score, meta)| {
+            serde_json::json!({
+                "id": id,
+                "score": score,
+                "meta": meta,
+            })
+        }).collect();
+
+        Ok(serde_json::json!({ "results": json_results }))
+    }
+
+    async fn lance_create_index(
+        &self,
+        db_id: u64,
+        name: String,
+        index_type: String,
+        params: Option<HashMap<String, String>>,
+    ) -> RpcResult<bool> {
+        let server = self.get_or_create_server(db_id).await?;
+        if db_id == 0 {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>));
+        }
+        if !matches!(server.option.backend, crate::options::BackendType::Lance) {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>));
+        }
+        if !server.has_write_permission() {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "write permission denied", None::<()>));
+        }
+        server.lance_store()
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?
+            .create_index(&name, &index_type, params.unwrap_or_default()).await
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
+        Ok(true)
+    }
+
+    async fn lance_list(
+        &self,
+        db_id: u64,
+    ) -> RpcResult<Vec<String>> {
+        let server = self.get_or_create_server(db_id).await?;
+        if db_id == 0 {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>));
+        }
+        if !matches!(server.option.backend, crate::options::BackendType::Lance) {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>));
+        }
+        if !server.has_read_permission() {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "read permission denied", None::<()>));
+        }
+        let list = server.lance_store()
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?
+            .list_datasets().await
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
+        Ok(list)
+    }
+
+    async fn lance_info(
+        &self,
+        db_id: u64,
+        name: String,
+    ) -> RpcResult<serde_json::Value> {
+        let server = self.get_or_create_server(db_id).await?;
+        if db_id == 0 {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>));
+        }
+        if !matches!(server.option.backend, crate::options::BackendType::Lance) {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>));
+        }
+        if !server.has_read_permission() {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "read permission denied", None::<()>));
+        }
+        let info = server.lance_store()
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?
+            .get_dataset_info(&name).await
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
+        Ok(serde_json::json!(info))
+    }
+
+    async fn lance_del(
+        &self,
+        db_id: u64,
+        name: String,
+        id: String,
+    ) -> RpcResult<bool> {
+        let server = self.get_or_create_server(db_id).await?;
+        if db_id == 0 {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>));
+        }
+        if !matches!(server.option.backend, crate::options::BackendType::Lance) {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>));
+        }
+        if !server.has_write_permission() {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "write permission denied", None::<()>));
+        }
+        let ok = server.lance_store()
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?
+            .delete_by_id(&name, &id).await
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
+        Ok(ok)
+    }
+
+    async fn lance_drop(
+        &self,
+        db_id: u64,
+        name: String,
+    ) -> RpcResult<bool> {
+        let server = self.get_or_create_server(db_id).await?;
+        if db_id == 0 {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "Lance not allowed on DB 0", None::<()>));
+        }
+        if !matches!(server.option.backend, crate::options::BackendType::Lance) {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Lance", None::<()>));
+        }
+        if !server.has_write_permission() {
+            return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "write permission denied", None::<()>));
+        }
+        let ok = server.lance_store()
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?
+            .drop_dataset(&name).await
+            .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
+        Ok(ok)
+    }
 }
src/server.rs
@@ -26,6 +26,9 @@ pub struct Server {
     // In-memory registry of Tantivy search indexes for this server
     pub search_indexes: Arc<std::sync::RwLock<HashMap<String, Arc<crate::tantivy_search::TantivySearch>>>>,

+    // Per-DB Lance stores (vector DB), keyed by db_id
+    pub lance_stores: Arc<std::sync::RwLock<HashMap<u64, Arc<crate::lance_store::LanceStore>>>>,
+
     // BLPOP waiter registry: per (db_index, key) FIFO of waiters
     pub list_waiters: Arc<Mutex<HashMap<u64, HashMap<String, Vec<Waiter>>>>>,
     pub waiter_seq: Arc<AtomicU64>,
@@ -54,6 +57,7 @@ impl Server {
             current_permissions: None,

             search_indexes: Arc::new(std::sync::RwLock::new(HashMap::new())),
+            lance_stores: Arc::new(std::sync::RwLock::new(HashMap::new())),
             list_waiters: Arc::new(Mutex::new(HashMap::new())),
             waiter_seq: Arc::new(AtomicU64::new(1)),
         }
@@ -71,6 +75,18 @@ impl Server {
        base
    }

+    // Path where Lance datasets are stored, namespaced per selected DB:
+    // <base_dir>/lance/<db_id>
+    pub fn lance_data_path(&self) -> std::path::PathBuf {
+        let base = std::path::PathBuf::from(&self.option.dir)
+            .join("lance")
+            .join(self.selected_db.to_string());
+        if !base.exists() {
+            let _ = std::fs::create_dir_all(&base);
+        }
+        base
+    }
+
    pub fn current_storage(&self) -> Result<Arc<dyn StorageBackend>, DBError> {
        let mut cache = self.db_cache.write().unwrap();

@@ -100,6 +116,42 @@ impl Server {
        Ok(storage)
    }

+    /// Get or create the LanceStore for the currently selected DB.
+    /// Only valid for non-zero DBs and when the backend is Lance.
+    pub fn lance_store(&self) -> Result<Arc<crate::lance_store::LanceStore>, DBError> {
+        if self.selected_db == 0 {
+            return Err(DBError("Lance not available on admin DB 0".to_string()));
+        }
+        // Resolve backend for selected_db
+        let backend_opt = crate::admin_meta::get_database_backend(
+            &self.option.dir,
+            self.option.backend.clone(),
+            &self.option.admin_secret,
+            self.selected_db,
+        )
+        .ok()
+        .flatten();
+
+        if !matches!(backend_opt, Some(crate::options::BackendType::Lance)) {
+            return Err(DBError("ERR DB backend is not Lance; LANCE.* commands are not allowed".to_string()));
+        }
+
+        // Fast path: read lock
+        {
+            let map = self.lance_stores.read().unwrap();
+            if let Some(store) = map.get(&self.selected_db) {
+                return Ok(store.clone());
+            }
+        }
+
+        // Slow path: create and insert
+        let store = Arc::new(crate::lance_store::LanceStore::new(&self.option.dir, self.selected_db)?);
+        {
+            let mut map = self.lance_stores.write().unwrap();
+            map.insert(self.selected_db, store.clone());
+        }
+        Ok(store)
+    }
+
    /// Check if current permissions allow read operations
    pub fn has_read_permission(&self) -> bool {