implementation of tantivy datastore + updated RPC calls to deal with tantivy + docs

This commit is contained in:
Maxime Van Hees
2025-09-23 17:15:40 +02:00
parent c470772a13
commit 22ac4c9ed6
11 changed files with 2508 additions and 10 deletions

View File

@@ -91,6 +91,41 @@ pub enum Cmd {
SymKeygen,
SymEncrypt(String, String), // key_b64, message
SymDecrypt(String, String), // key_b64, ciphertext_b64
// Full-text search commands with schema support
// Built by the "ft.create" parser from: index_name SCHEMA field type [options] ...
// The parser upper-cases each field type and every recognised option keyword
// (WEIGHT, SORTABLE, NOINDEX, SEPARATOR, CASESENSITIVE).
FtCreate {
index_name: String,
schema: Vec<(String, String, Vec<String>)>, // (field_name, field_type, options)
},
// Built by the "ft.add" parser from: index_name doc_id score field value ...
FtAdd {
index_name: String,
doc_id: String,
// Parsed from the third argument; the parser rejects non-numeric input
score: f64,
// field name -> field value, taken pairwise from the remaining arguments
fields: std::collections::HashMap<String, String>,
},
// Built by the "ft.search" parser from: index_name query [FILTER ..] [LIMIT ..] [RETURN ..]
FtSearch {
index_name: String,
query: String,
filters: Vec<(String, String)>, // field, value pairs
// `limit` and `offset` are only set when a LIMIT clause is present (LIMIT offset num)
limit: Option<usize>,
offset: Option<usize>,
// Only set when a RETURN clause is present (RETURN count field ...)
return_fields: Option<Vec<String>>,
},
FtDel(String, String), // index_name, doc_id
FtInfo(String), // index_name
FtDrop(String), // index_name
// Parsed from: index_name field_name field_type [options]; the dispatcher
// currently answers this command with a "not implemented yet" error.
FtAlter {
index_name: String,
field_name: String,
field_type: String,
options: Vec<String>,
},
// Parsed from: index_name query [options]; the parser currently leaves
// `group_by` and `reducers` empty and the dispatcher answers with a
// "not implemented yet" error.
FtAggregate {
index_name: String,
query: String,
group_by: Vec<String>,
reducers: Vec<String>,
}
}
impl Cmd {
@@ -646,6 +681,140 @@ impl Cmd {
_ => return Err(DBError(format!("unsupported SYM subcommand {:?}", cmd))),
}
}
"ft.create" => {
    // Usage: FT.CREATE <index_name> SCHEMA <field> <type> [option [value]] ...
    if cmd.len() < 4 || cmd[2].to_uppercase() != "SCHEMA" {
        return Err(DBError("ERR FT.CREATE requires: indexname SCHEMA field1 type1 [options] ...".to_string()));
    }
    // Keywords that may follow a field's type, and the subset that carries a value.
    const FIELD_OPTIONS: [&str; 5] = ["WEIGHT", "SORTABLE", "NOINDEX", "SEPARATOR", "CASESENSITIVE"];
    const VALUED_OPTIONS: [&str; 2] = ["SEPARATOR", "WEIGHT"];

    let index_name = cmd[1].clone();
    let mut schema = Vec::new();
    let mut i = 3;
    while i < cmd.len() {
        // Every field definition needs at least a name and a type.
        if i + 1 >= cmd.len() {
            return Err(DBError("ERR incomplete field definition".to_string()));
        }
        let field_name = cmd[i].clone();
        let field_type = cmd[i + 1].to_uppercase();
        let mut options = Vec::new();
        i += 2;
        // Consume option keywords until the next token is not a known option
        // (i.e. it is the next field name) or the command ends.
        while i < cmd.len() {
            let keyword = cmd[i].to_uppercase();
            if !FIELD_OPTIONS.contains(&keyword.as_str()) {
                break;
            }
            i += 1;
            if VALUED_OPTIONS.contains(&keyword.as_str()) {
                // A valued option at the very end of the command used to be
                // accepted without its value; reject it explicitly instead.
                if i >= cmd.len() {
                    return Err(DBError(format!("ERR option {} requires a value", keyword)));
                }
                options.push(keyword);
                // The value is kept verbatim (not upper-cased).
                options.push(cmd[i].clone());
                i += 1;
            } else {
                options.push(keyword);
            }
        }
        schema.push((field_name, field_type, options));
    }
    Cmd::FtCreate { index_name, schema }
}
"ft.add" => {
    // Usage: FT.ADD <index_name> <doc_id> <score> <field> <value> [<field> <value> ...]
    if cmd.len() < 5 {
        return Err(DBError("ERR FT.ADD requires: index_name doc_id score field value ...".to_string()));
    }
    let index_name = cmd[1].clone();
    let doc_id = cmd[2].clone();
    let score = cmd[3].parse::<f64>().map_err(|_| DBError("ERR score must be a number".to_string()))?;
    // Everything after the score must come in field/value pairs. A trailing
    // field name without a value used to be dropped silently, which could
    // index an empty document; report it instead.
    let pairs = &cmd[4..];
    if pairs.len() % 2 != 0 {
        return Err(DBError(format!(
            "ERR FT.ADD requires field value pairs; field '{}' has no value",
            pairs[pairs.len() - 1]
        )));
    }
    // When a field name repeats, the last value wins.
    let fields: std::collections::HashMap<String, String> = pairs
        .chunks_exact(2)
        .map(|pair| (pair[0].clone(), pair[1].clone()))
        .collect();
    Cmd::FtAdd { index_name, doc_id, score, fields }
}
"ft.search" => {
    // Usage: FT.SEARCH <index_name> <query>
    //        [FILTER <field> <value>]... [LIMIT <offset> <num>] [RETURN <count> <field>...]
    if cmd.len() < 3 {
        return Err(DBError("ERR FT.SEARCH requires: index_name query [options]".to_string()));
    }
    let index_name = cmd[1].clone();
    let query = cmd[2].clone();
    let mut filters = Vec::new();
    let mut limit = None;
    let mut offset = None;
    let mut return_fields = None;
    let mut i = 3;
    while i < cmd.len() {
        match cmd[i].to_uppercase().as_str() {
            "FILTER" => {
                if i + 2 >= cmd.len() {
                    return Err(DBError("ERR FILTER requires field and value".to_string()));
                }
                filters.push((cmd[i + 1].clone(), cmd[i + 2].clone()));
                i += 3;
            }
            "LIMIT" => {
                if i + 2 >= cmd.len() {
                    return Err(DBError("ERR LIMIT requires offset and num".to_string()));
                }
                // Reject malformed numbers instead of silently substituting defaults.
                let first = cmd[i + 1]
                    .parse::<usize>()
                    .map_err(|_| DBError("ERR LIMIT offset must be a non-negative integer".to_string()))?;
                let num = cmd[i + 2]
                    .parse::<usize>()
                    .map_err(|_| DBError("ERR LIMIT num must be a non-negative integer".to_string()))?;
                offset = Some(first);
                limit = Some(num);
                i += 3;
            }
            "RETURN" => {
                if i + 1 >= cmd.len() {
                    return Err(DBError("ERR RETURN requires field count".to_string()));
                }
                let count = cmd[i + 1]
                    .parse::<usize>()
                    .map_err(|_| DBError("ERR RETURN count must be a non-negative integer".to_string()))?;
                i += 2;
                // Clamp to the tokens actually present. Looping `count` times
                // with a client-supplied count (e.g. usize::MAX) would spin
                // for an unbounded number of iterations.
                let take = count.min(cmd.len() - i);
                return_fields = Some(cmd[i..i + take].to_vec());
                i += take;
            }
            // Unrecognised tokens are skipped, as before.
            _ => i += 1,
        }
    }
    Cmd::FtSearch { index_name, query, filters, limit, offset, return_fields }
}
"ft.del" => match &cmd[..] {
    // Exactly three tokens: FT.DEL <index_name> <doc_id>
    [_, index_name, doc_id] => Cmd::FtDel(index_name.clone(), doc_id.clone()),
    _ => return Err(DBError("ERR FT.DEL requires: index_name doc_id".to_string())),
},
"ft.info" => match &cmd[..] {
    // Exactly two tokens: FT.INFO <index_name>
    [_, index_name] => Cmd::FtInfo(index_name.clone()),
    _ => return Err(DBError("ERR FT.INFO requires: index_name".to_string())),
},
"ft.drop" => match &cmd[..] {
    // Exactly two tokens: FT.DROP <index_name>
    [_, index_name] => Cmd::FtDrop(index_name.clone()),
    _ => return Err(DBError("ERR FT.DROP requires: index_name".to_string())),
},
"ft.alter" => {
    // Usage: FT.ALTER <index_name> <field_name> <field_type> [options ...]
    // The options are optional, so four tokens (command name included) are
    // enough. The previous `< 5` guard rejected the option-less form that the
    // usage string advertises and made the empty-options branch unreachable.
    if cmd.len() < 4 {
        return Err(DBError("ERR FT.ALTER requires: index_name field_name field_type [options]".to_string()));
    }
    Cmd::FtAlter {
        index_name: cmd[1].clone(),
        field_name: cmd[2].clone(),
        field_type: cmd[3].clone(),
        // Everything after the type is passed through verbatim; empty when
        // no options were given.
        options: cmd[4..].to_vec(),
    }
}
"ft.aggregate" => {
    // Needs at least: FT.AGGREGATE <index_name> <query>
    if cmd.len() < 3 {
        return Err(DBError("ERR FT.AGGREGATE requires: index_name query [options]".to_string()));
    }
    // Only the index name and query are read for now; any further tokens are
    // ignored and the grouping/reducer lists stay empty.
    Cmd::FtAggregate {
        index_name: cmd[1].clone(),
        query: cmd[2].clone(),
        group_by: Vec::new(),
        reducers: Vec::new(),
    }
}
_ => Cmd::Unknow(cmd[0].clone()),
},
protocol,
@@ -671,6 +840,59 @@ impl Cmd {
return Ok(Protocol::SimpleString("QUEUED".to_string()));
}
// Backend gating: a Tantivy database accepts only FT.* plus a few basic
// control/info commands, while any other backend rejects FT.* commands.
// The backend is resolved for the currently selected database through the
// admin metadata (not the process default); a failed or empty lookup is
// treated as "not Tantivy".
let backend_lookup = crate::admin_meta::get_database_backend(
    &server.option.dir,
    server.option.backend.clone(),
    &server.option.admin_secret,
    server.selected_db,
);
let is_tantivy_backend = matches!(
    backend_lookup,
    Ok(Some(crate::options::BackendType::Tantivy))
);

// True for every full-text search command variant.
let is_ft_command = matches!(
    &self,
    Cmd::FtCreate { .. }
        | Cmd::FtAdd { .. }
        | Cmd::FtSearch { .. }
        | Cmd::FtDel(..)
        | Cmd::FtInfo(..)
        | Cmd::FtDrop(..)
        | Cmd::FtAlter { .. }
        | Cmd::FtAggregate { .. }
);

if is_tantivy_backend {
    // Control/info commands that stay available on a Tantivy database.
    let is_control_command = matches!(
        &self,
        Cmd::Select(..)
            | Cmd::Quit
            | Cmd::Client(..)
            | Cmd::ClientSetName(..)
            | Cmd::ClientGetName
            | Cmd::Command(..)
            | Cmd::Info(..)
    );
    if !(is_ft_command || is_control_command) {
        return Ok(Protocol::err("ERR backend is Tantivy; only FT.* commands are allowed"));
    }
} else if is_ft_command {
    // The selected DB is not Tantivy, so FT.* commands are refused here.
    return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
}
match self {
Cmd::Select(db, key) => select_cmd(server, db, key).await,
Cmd::Ping => Ok(Protocol::SimpleString("PONG".to_string())),
@@ -767,6 +989,32 @@ impl Cmd {
Cmd::SymEncrypt(key_b64, message) => Ok(crate::sym::cmd_sym_encrypt(&key_b64, &message).await),
Cmd::SymDecrypt(key_b64, ct_b64) => Ok(crate::sym::cmd_sym_decrypt(&key_b64, &ct_b64).await),
// Full-text search commands
// Each arm below forwards the already-parsed arguments, unchanged, to the
// matching handler in `crate::search_cmd` and returns that handler's result.
Cmd::FtCreate { index_name, schema } => {
crate::search_cmd::ft_create_cmd(server, index_name, schema).await
}
Cmd::FtAdd { index_name, doc_id, score, fields } => {
crate::search_cmd::ft_add_cmd(server, index_name, doc_id, score, fields).await
}
Cmd::FtSearch { index_name, query, filters, limit, offset, return_fields } => {
crate::search_cmd::ft_search_cmd(server, index_name, query, filters, limit, offset, return_fields).await
}
// Tuple variant: (index_name, doc_id)
Cmd::FtDel(index_name, doc_id) => {
crate::search_cmd::ft_del_cmd(server, index_name, doc_id).await
}
Cmd::FtInfo(index_name) => {
crate::search_cmd::ft_info_cmd(server, index_name).await
}
Cmd::FtDrop(index_name) => {
crate::search_cmd::ft_drop_cmd(server, index_name).await
}
// FT.ALTER and FT.AGGREGATE are parsed but have no handler yet. The replies
// carry the same "ERR " prefix as the other error replies in this file so
// clients see a consistent error kind.
Cmd::FtAlter { .. } => {
    Ok(Protocol::err("ERR FT.ALTER not implemented yet"))
}
Cmd::FtAggregate { .. } => {
    Ok(Protocol::err("ERR FT.AGGREGATE not implemented yet"))
}
Cmd::Unknow(s) => Ok(Protocol::err(&format!("ERR unknown command `{}`", s))),
}
}
@@ -852,13 +1100,28 @@ async fn select_cmd(server: &mut Server, db: u64, key: Option<String>) -> Result
None => return Ok(Protocol::err("ERR invalid access key")),
};
// Set selected database and permissions, then open storage
// Set selected database and permissions, then open storage (skip for Tantivy backend)
server.selected_db = db;
server.current_permissions = Some(perms);
match server.current_storage() {
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
// Resolve effective backend for this db_id from admin meta
let eff_backend = crate::admin_meta::get_database_backend(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
db,
)
.ok()
.flatten();
if matches!(eff_backend, Some(crate::options::BackendType::Tantivy)) {
// Tantivy DBs have no KV storage; allow SELECT to succeed
Ok(Protocol::SimpleString("OK".to_string()))
} else {
match server.current_storage() {
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
}
@@ -1196,7 +1459,27 @@ async fn dbsize_cmd(server: &Server) -> Result<Protocol, DBError> {
}
async fn info_cmd(server: &Server, section: &Option<String>) -> Result<Protocol, DBError> {
let storage_info = server.current_storage()?.info()?;
// For Tantivy backend, there is no KV storage; synthesize minimal info.
// Determine effective backend for the currently selected db.
let is_tantivy_db = crate::admin_meta::get_database_backend(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
server.selected_db,
)
.ok()
.flatten()
.map(|b| matches!(b, crate::options::BackendType::Tantivy))
.unwrap_or(false);
let storage_info: Vec<(String, String)> = if is_tantivy_db {
vec![
("db_size".to_string(), "0".to_string()),
("is_encrypted".to_string(), "false".to_string()),
]
} else {
server.current_storage()?.info()?
};
let mut info_map: std::collections::HashMap<String, String> = storage_info.into_iter().collect();
info_map.insert("redis_version".to_string(), "7.0.0".to_string());