diff --git a/src/admin_meta.rs b/src/admin_meta.rs index 5d9c545..d6454b3 100644 --- a/src/admin_meta.rs +++ b/src/admin_meta.rs @@ -432,20 +432,26 @@ pub fn verify_access( return Ok(None); } - // Public? - if load_public(&admin, id)? { - return Ok(Some(Permissions::ReadWrite)); - } + let is_public = load_public(&admin, id)?; - // Private: require key and verify + // If a key is explicitly provided, enforce its validity strictly. + // Do NOT fall back to public when an invalid key is supplied. if let Some(k) = key_opt { let hash = crate::rpc::hash_key(k); if let Some(v) = admin.hget(&k_meta_db_keys(id), &hash)? { let (perm, _ts) = parse_perm_value(&v); return Ok(Some(perm)); } + // Invalid key + return Ok(None); + } + + // No key provided: allow access if DB is public, otherwise deny + if is_public { + Ok(Some(Permissions::ReadWrite)) + } else { + Ok(None) } - Ok(None) } // Enumerate all db ids diff --git a/src/rpc.rs b/src/rpc.rs index c43e509..c601535 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -505,10 +505,15 @@ impl RpcServer for RpcServerImpl { if !matches!(server.option.backend, crate::options::BackendType::Tantivy) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>)); } - crate::search_cmd::ft_create_cmd(&*server, index_name, schema) + let proto = crate::search_cmd::ft_create_cmd(&*server, index_name, schema) .await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; - Ok(true) + match proto { + crate::protocol::Protocol::Error(msg) => { + Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>)) + } + _ => Ok(true), + } } async fn ft_add( @@ -526,10 +531,15 @@ impl RpcServer for RpcServerImpl { if !matches!(server.option.backend, crate::options::BackendType::Tantivy) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>)); } - crate::search_cmd::ft_add_cmd(&*server, index_name, doc_id, score, fields) + let proto = crate::search_cmd::ft_add_cmd(&*server, index_name, doc_id, score, fields) .await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; - Ok(true) + match proto { + crate::protocol::Protocol::Error(msg) => { + Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>)) + } + _ => Ok(true), + } } async fn ft_search( @@ -560,7 +570,12 @@ impl RpcServer for RpcServerImpl { ) .await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; - Ok(serde_json::json!({ "resp": proto.encode() })) + match proto { + crate::protocol::Protocol::Error(msg) => { + Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>)) + } + _ => Ok(serde_json::json!({ "resp": proto.encode() })), + } } async fn ft_del(&self, db_id: u64, index_name: String, doc_id: String) -> RpcResult { @@ -571,10 +586,16 @@ impl RpcServer for RpcServerImpl { if !matches!(server.option.backend, crate::options::BackendType::Tantivy) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>)); } - crate::search_cmd::ft_del_cmd(&*server, index_name, doc_id) + let proto = crate::search_cmd::ft_del_cmd(&*server, index_name, doc_id) .await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; - Ok(true) + match proto { + crate::protocol::Protocol::Error(msg) => { + Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>)) + } + crate::protocol::Protocol::SimpleString(s) => Ok(s == "1"), + _ => Ok(false), + } } async fn ft_info(&self, db_id: u64, index_name: String) -> RpcResult { @@ -588,7 +609,12 @@ impl RpcServer for RpcServerImpl { let proto = crate::search_cmd::ft_info_cmd(&*server, index_name) .await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; - Ok(serde_json::json!({ "resp": proto.encode() })) + match proto { + crate::protocol::Protocol::Error(msg) => { + Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>)) + } + _ => Ok(serde_json::json!({ "resp": proto.encode() })), + } } async fn ft_drop(&self, db_id: u64, index_name: String) -> RpcResult { @@ -599,10 +625,16 @@ impl RpcServer for RpcServerImpl { if !matches!(server.option.backend, crate::options::BackendType::Tantivy) { return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>)); } - crate::search_cmd::ft_drop_cmd(&*server, index_name) + let proto = crate::search_cmd::ft_drop_cmd(&*server, index_name) .await .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; - Ok(true) + match proto { + crate::protocol::Protocol::Error(msg) => { + Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>)) + } + crate::protocol::Protocol::SimpleString(s) => Ok(s.eq_ignore_ascii_case("OK")), + _ => Ok(false), + } } async fn add_access_key(&self, db_id: u64, key: String, permissions: String) -> RpcResult { diff --git a/src/search_cmd.rs b/src/search_cmd.rs index 741584b..ec43511 100644 --- a/src/search_cmd.rs +++ b/src/search_cmd.rs @@ -162,8 +162,8 @@ pub async fn ft_add_cmd( if !is_tantivy { return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed")); } - if !server.has_read_permission() { - return Ok(Protocol::err("ERR read permission denied")); + if !server.has_write_permission() { + return Ok(Protocol::err("ERR write permission denied")); } let indexes = server.search_indexes.read().unwrap(); let search_index = indexes @@ -199,8 +199,8 @@ pub async fn ft_search_cmd( if !is_tantivy { return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed")); } - if !server.has_write_permission() { - return Ok(Protocol::err("ERR write permission denied")); + if !server.has_read_permission() { + return Ok(Protocol::err("ERR read permission denied")); } let indexes = server.search_indexes.read().unwrap(); let search_index = indexes @@ -225,30 +225,29 @@ pub async fn ft_search_cmd( }; let results = search_index.search_with_options(&query, options)?; - - // Format results as Redis protocol + + // Format results as a flattened Redis protocol array to match client expectations: + // [ total, doc_id, score, field, value, field, value, ... , doc_id, score, ... ] let mut response = Vec::new(); // First element is the total count - response.push(Protocol::SimpleString(results.total.to_string())); - // Then each document - for doc in results.documents { - let mut doc_array = Vec::new(); + response.push(Protocol::BulkString(results.total.to_string())); + // Then each document flattened + for mut doc in results.documents { // Add document ID if it exists if let Some(id) = doc.fields.get("_id") { - doc_array.push(Protocol::BulkString(id.clone())); + response.push(Protocol::BulkString(id.clone())); } // Add score - doc_array.push(Protocol::BulkString(doc.score.to_string())); + response.push(Protocol::BulkString(doc.score.to_string())); // Add fields as key-value pairs - for (field_name, field_value) in doc.fields { + for (field_name, field_value) in std::mem::take(&mut doc.fields) { if field_name != "_id" { - doc_array.push(Protocol::BulkString(field_name)); - doc_array.push(Protocol::BulkString(field_value)); + response.push(Protocol::BulkString(field_name)); + response.push(Protocol::BulkString(field_value)); } } - response.push(Protocol::Array(doc_array)); } - + Ok(Protocol::Array(response)) } @@ -278,12 +277,11 @@ pub async fn ft_del_cmd( return Ok(Protocol::err("ERR write permission denied")); } let indexes = server.search_indexes.read().unwrap(); - let _search_index = indexes + let search_index = indexes .get(&index_name) .ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?; - // Not fully implemented yet: Tantivy delete by term would require a writer session and commit coordination. - println!("Deleting document '{}' from index '{}'", doc_id, index_name); - Ok(Protocol::SimpleString("1".to_string())) + let existed = search_index.delete_document_by_id(&doc_id)?; + Ok(Protocol::SimpleString(if existed { "1".to_string() } else { "0".to_string() })) } pub async fn ft_info_cmd(server: &Server, index_name: String) -> Result { @@ -355,10 +353,13 @@ pub async fn ft_drop_cmd(server: &Server, index_name: String) -> Result Result bool { - matches!(self.current_permissions, Some(crate::rpc::Permissions::Read) | Some(crate::rpc::Permissions::ReadWrite)) + // If an explicit permission is set for this connection, honor it. + if let Some(perms) = self.current_permissions.as_ref() { + return matches!(*perms, crate::rpc::Permissions::Read | crate::rpc::Permissions::ReadWrite); + } + // Fallback ONLY when no explicit permission context (e.g., JSON-RPC flows without SELECT). + match crate::admin_meta::verify_access( + &self.option.dir, + self.option.backend.clone(), + &self.option.admin_secret, + self.selected_db, + None, + ) { + Ok(Some(crate::rpc::Permissions::Read)) | Ok(Some(crate::rpc::Permissions::ReadWrite)) => true, + _ => false, + } } /// Check if current permissions allow write operations pub fn has_write_permission(&self) -> bool { - matches!(self.current_permissions, Some(crate::rpc::Permissions::ReadWrite)) + // If an explicit permission is set for this connection, honor it. + if let Some(perms) = self.current_permissions.as_ref() { + return matches!(*perms, crate::rpc::Permissions::ReadWrite); + } + // Fallback ONLY when no explicit permission context (e.g., JSON-RPC flows without SELECT). + match crate::admin_meta::verify_access( + &self.option.dir, + self.option.backend.clone(), + &self.option.admin_secret, + self.selected_db, + None, + ) { + Ok(Some(crate::rpc::Permissions::ReadWrite)) => true, + _ => false, + } } // ----- BLPOP waiter helpers ----- diff --git a/src/tantivy_search.rs b/src/tantivy_search.rs index 2c0a8ae..b472c04 100644 --- a/src/tantivy_search.rs +++ b/src/tantivy_search.rs @@ -394,6 +394,10 @@ impl TantivySearch { writer .commit() .map_err(|e| DBError(format!("Failed to commit: {}", e)))?; + // Make new documents visible to searches + self.reader + .reload() + .map_err(|e| DBError(format!("Failed to reload reader: {}", e)))?; Ok(()) } @@ -402,6 +406,10 @@ impl TantivySearch { query_str: &str, options: SearchOptions, ) -> Result { + // Ensure reader is up to date with latest commits + self.reader + .reload() + .map_err(|e| DBError(format!("Failed to reload reader: {}", e)))?; let searcher = self.reader.searcher(); // Ensure we have searchable fields @@ -602,6 +610,40 @@ impl TantivySearch { config: self.config.clone(), }) } + + /// Delete a document by its _id term. Returns true if the document existed before deletion. + pub fn delete_document_by_id(&self, doc_id: &str) -> Result { + // Determine existence by running a tiny term query + let existed = if let Some((id_field, _)) = self.index_schema.fields.get("_id") { + let term = Term::from_field_text(*id_field, doc_id); + let searcher = self.reader.searcher(); + let tq = TermQuery::new(term.clone(), IndexRecordOption::Basic); + let hits = searcher + .search(&tq, &TopDocs::with_limit(1)) + .map_err(|e| DBError(format!("Failed to search for existing doc: {}", e)))?; + !hits.is_empty() + } else { + false + }; + + // Perform deletion and commit + let mut writer = self + .writer + .write() + .map_err(|e| DBError(format!("Failed to acquire writer lock: {}", e)))?; + if let Some((id_field, _)) = self.index_schema.fields.get("_id") { + writer.delete_term(Term::from_field_text(*id_field, doc_id)); + } + writer + .commit() + .map_err(|e| DBError(format!("Failed to commit delete: {}", e)))?; + // Refresh reader to observe deletion + self.reader + .reload() + .map_err(|e| DBError(format!("Failed to reload reader: {}", e)))?; + + Ok(existed) + } } #[derive(Debug, Clone)] diff --git a/tests/tantivy_integration_tests.rs b/tests/tantivy_integration_tests.rs index c559306..edc7f53 100644 --- a/tests/tantivy_integration_tests.rs +++ b/tests/tantivy_integration_tests.rs @@ -87,6 +87,7 @@ async fn setup_server() -> (ServerProcessGuard, u16, Connection, HttpClient) { &port.to_string(), "--rpc-port", &(port + 1).to_string(), + "--enable-rpc", "--debug", "--admin-secret", "test-admin",