key-based access control for tantivy backend
This commit is contained in:
		| @@ -32,6 +32,10 @@ pub async fn ft_create_cmd( | ||||
|         return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed")); | ||||
|     } | ||||
|  | ||||
|     if !server.has_write_permission() { | ||||
|         return Ok(Protocol::err("ERR write permission denied")); | ||||
|     } | ||||
|  | ||||
|     // Parse schema into field definitions | ||||
|     let mut field_definitions = Vec::new(); | ||||
|     for (field_name, field_type, options) in schema { | ||||
| @@ -158,6 +162,9 @@ pub async fn ft_add_cmd( | ||||
|     if !is_tantivy { | ||||
|         return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed")); | ||||
|     } | ||||
|     if !server.has_read_permission() { | ||||
|         return Ok(Protocol::err("ERR read permission denied")); | ||||
|     } | ||||
|     let indexes = server.search_indexes.read().unwrap(); | ||||
|     let search_index = indexes | ||||
|         .get(&index_name) | ||||
| @@ -192,6 +199,9 @@ pub async fn ft_search_cmd( | ||||
|     if !is_tantivy { | ||||
|         return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed")); | ||||
|     } | ||||
|     if !server.has_write_permission() { | ||||
|         return Ok(Protocol::err("ERR write permission denied")); | ||||
|     } | ||||
|     let indexes = server.search_indexes.read().unwrap(); | ||||
|     let search_index = indexes | ||||
|         .get(&index_name) | ||||
| @@ -264,6 +274,9 @@ pub async fn ft_del_cmd( | ||||
|     if !is_tantivy { | ||||
|         return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed")); | ||||
|     } | ||||
|     if !server.has_write_permission() { | ||||
|         return Ok(Protocol::err("ERR write permission denied")); | ||||
|     } | ||||
|     let indexes = server.search_indexes.read().unwrap(); | ||||
|     let _search_index = indexes | ||||
|         .get(&index_name) | ||||
| @@ -291,6 +304,9 @@ pub async fn ft_info_cmd(server: &Server, index_name: String) -> Result<Protocol | ||||
|     if !is_tantivy { | ||||
|         return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed")); | ||||
|     } | ||||
|     if !server.has_read_permission() { | ||||
|         return Ok(Protocol::err("ERR read permission denied")); | ||||
|     } | ||||
|     let indexes = server.search_indexes.read().unwrap(); | ||||
|     let search_index = indexes | ||||
|         .get(&index_name) | ||||
| @@ -335,6 +351,10 @@ pub async fn ft_drop_cmd(server: &Server, index_name: String) -> Result<Protocol | ||||
|         return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed")); | ||||
|     } | ||||
|  | ||||
|     if !server.has_write_permission() { | ||||
|         return Ok(Protocol::err("ERR write permission denied")); | ||||
|     } | ||||
|  | ||||
|     // Remove from registry | ||||
|     { | ||||
|         let mut indexes = server.search_indexes.write().unwrap(); | ||||
|   | ||||
							
								
								
									
										293
									
								
								tests/tantivy_integration_tests.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										293
									
								
								tests/tantivy_integration_tests.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,293 @@ | ||||
| use redis::{Client, Connection, RedisResult}; | ||||
| use std::process::{Child, Command}; | ||||
| use std::time::Duration; | ||||
| use jsonrpsee::http_client::{HttpClientBuilder, HttpClient}; | ||||
| use herodb::rpc::{RpcClient, BackendType, DatabaseConfig}; | ||||
|  | ||||
| // Helper function to get Redis connection, retrying until successful | ||||
| fn get_redis_connection(port: u16) -> Connection { | ||||
|     let connection_info = format!("redis://127.0.0.1:{}", port); | ||||
|     let client = Client::open(connection_info).unwrap(); | ||||
|     let mut attempts = 0; | ||||
|     loop { | ||||
|         match client.get_connection() { | ||||
|             Ok(mut conn) => { | ||||
|                 if redis::cmd("PING").query::<String>(&mut conn).is_ok() { | ||||
|                     return conn; | ||||
|                 } | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 if attempts >= 120 { | ||||
|                     panic!( | ||||
|                         "Failed to connect to Redis server after 120 attempts: {}", | ||||
|                         e | ||||
|                     ); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         attempts += 1; | ||||
|         std::thread::sleep(Duration::from_millis(100)); | ||||
|     } | ||||
| } | ||||
|  | ||||
| // Helper function to get RPC client | ||||
| async fn get_rpc_client(port: u16) -> HttpClient { | ||||
|     let url = format!("http://127.0.0.1:{}", port + 1); // RPC port is Redis port + 1 | ||||
|     let client = HttpClientBuilder::default().build(url).unwrap(); | ||||
|     client | ||||
| } | ||||
|  | ||||
| // A guard to ensure the server process is killed when it goes out of scope | ||||
| struct ServerProcessGuard { | ||||
|     process: Child, | ||||
|     test_dir: String, | ||||
| } | ||||
|  | ||||
| impl Drop for ServerProcessGuard { | ||||
|     fn drop(&mut self) { | ||||
|         println!("Killing server process (pid: {})...", self.process.id()); | ||||
|         if let Err(e) = self.process.kill() { | ||||
|             eprintln!("Failed to kill server process: {}", e); | ||||
|         } | ||||
|         match self.process.wait() { | ||||
|             Ok(status) => println!("Server process exited with: {}", status), | ||||
|             Err(e) => eprintln!("Failed to wait on server process: {}", e), | ||||
|         } | ||||
|  | ||||
|         // Clean up the specific test directory | ||||
|         println!("Cleaning up test directory: {}", self.test_dir); | ||||
|         if let Err(e) = std::fs::remove_dir_all(&self.test_dir) { | ||||
|             eprintln!("Failed to clean up test directory: {}", e); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| // Helper to set up the server and return connections | ||||
| async fn setup_server() -> (ServerProcessGuard, u16, Connection, HttpClient) { | ||||
|     use std::sync::atomic::{AtomicU16, Ordering}; | ||||
|     static PORT_COUNTER: AtomicU16 = AtomicU16::new(16500); | ||||
|     let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); | ||||
|  | ||||
|     let test_dir = format!("/tmp/herodb_tantivy_test_{}", port); | ||||
|  | ||||
|     // Clean up previous test data | ||||
|     if std::path::Path::new(&test_dir).exists() { | ||||
|         let _ = std::fs::remove_dir_all(&test_dir); | ||||
|     } | ||||
|     std::fs::create_dir_all(&test_dir).unwrap(); | ||||
|  | ||||
|     // Start the server in a subprocess | ||||
|     let child = Command::new("cargo") | ||||
|         .args(&[ | ||||
|             "run", | ||||
|             "--", | ||||
|             "--dir", | ||||
|             &test_dir, | ||||
|             "--port", | ||||
|             &port.to_string(), | ||||
|             "--rpc-port", | ||||
|             &(port + 1).to_string(), | ||||
|             "--debug", | ||||
|             "--admin-secret", | ||||
|             "test-admin", | ||||
|         ]) | ||||
|         .spawn() | ||||
|         .expect("Failed to start server process"); | ||||
|  | ||||
|     // Create a new guard that also owns the test directory path | ||||
|     let guard = ServerProcessGuard { | ||||
|         process: child, | ||||
|         test_dir, | ||||
|     }; | ||||
|  | ||||
|     // Give the server time to build and start (cargo run may compile first) | ||||
|     std::thread::sleep(Duration::from_millis(3000)); | ||||
|  | ||||
|     let conn = get_redis_connection(port); | ||||
|     let rpc_client = get_rpc_client(port).await; | ||||
|  | ||||
|     (guard, port, conn, rpc_client) | ||||
| } | ||||
|  | ||||
|  | ||||
| #[tokio::test] | ||||
| async fn test_tantivy_full_text_search() { | ||||
|     let (_server_guard, _port, mut conn, rpc_client) = setup_server().await; | ||||
|  | ||||
|     // Create a Tantivy database via RPC | ||||
|     let db_config = DatabaseConfig { | ||||
|         name: Some("test_tantivy_db".to_string()), | ||||
|         storage_path: None, | ||||
|         max_size: None, | ||||
|         redis_version: None, | ||||
|     }; | ||||
|  | ||||
|     let db_id = rpc_client.create_database(BackendType::Tantivy, db_config, None).await.unwrap(); | ||||
|     assert_eq!(db_id, 1); | ||||
|  | ||||
|     // Add readwrite access key | ||||
|     let _ = rpc_client.add_access_key(db_id, "readwrite_key".to_string(), "readwrite".to_string()).await.unwrap(); | ||||
|  | ||||
|     // Add read-only access key | ||||
|     let _ = rpc_client.add_access_key(db_id, "read_key".to_string(), "read".to_string()).await.unwrap(); | ||||
|  | ||||
|     // Test with readwrite permissions | ||||
|     test_tantivy_with_readwrite_permissions(&mut conn, db_id).await; | ||||
|  | ||||
|     // Test with read-only permissions | ||||
|     test_tantivy_with_read_permissions(&mut conn, db_id).await; | ||||
|  | ||||
|     // Test access denied for invalid key | ||||
|     test_tantivy_access_denied(&mut conn, db_id).await; | ||||
| } | ||||
|  | ||||
| async fn test_tantivy_with_readwrite_permissions(conn: &mut Connection, db_id: u64) { | ||||
|     // Select database with readwrite key | ||||
|     let result: RedisResult<String> = redis::cmd("SELECT") | ||||
|         .arg(db_id) | ||||
|         .arg("KEY") | ||||
|         .arg("readwrite_key") | ||||
|         .query(conn); | ||||
|     assert!(result.is_ok()); | ||||
|     assert_eq!(result.unwrap(), "OK"); | ||||
|  | ||||
|     // Test FT.CREATE | ||||
|     let result: RedisResult<String> = redis::cmd("FT.CREATE") | ||||
|         .arg("test_index") | ||||
|         .arg("SCHEMA") | ||||
|         .arg("title") | ||||
|         .arg("TEXT") | ||||
|         .arg("content") | ||||
|         .arg("TEXT") | ||||
|         .arg("tags") | ||||
|         .arg("TAG") | ||||
|         .query(conn); | ||||
|     assert!(result.is_ok()); | ||||
|     assert_eq!(result.unwrap(), "OK"); | ||||
|  | ||||
|     // Test FT.ADD | ||||
|     let result: RedisResult<String> = redis::cmd("FT.ADD") | ||||
|         .arg("test_index") | ||||
|         .arg("doc1") | ||||
|         .arg("1.0") | ||||
|         .arg("title") | ||||
|         .arg("Hello World") | ||||
|         .arg("content") | ||||
|         .arg("This is a test document") | ||||
|         .arg("tags") | ||||
|         .arg("test,example") | ||||
|         .query(conn); | ||||
|     assert!(result.is_ok()); | ||||
|     assert_eq!(result.unwrap(), "OK"); | ||||
|  | ||||
|     // Add another document | ||||
|     let result: RedisResult<String> = redis::cmd("FT.ADD") | ||||
|         .arg("test_index") | ||||
|         .arg("doc2") | ||||
|         .arg("1.0") | ||||
|         .arg("title") | ||||
|         .arg("Goodbye World") | ||||
|         .arg("content") | ||||
|         .arg("Another test document") | ||||
|         .arg("tags") | ||||
|         .arg("test,another") | ||||
|         .query(conn); | ||||
|     assert!(result.is_ok()); | ||||
|     assert_eq!(result.unwrap(), "OK"); | ||||
|  | ||||
|     // Test FT.SEARCH | ||||
|     let result: RedisResult<Vec<String>> = redis::cmd("FT.SEARCH") | ||||
|         .arg("test_index") | ||||
|         .arg("test") | ||||
|         .query(conn); | ||||
|     assert!(result.is_ok()); | ||||
|     let results = result.unwrap(); | ||||
|     assert!(results.len() >= 3); // At least total count + 2 documents | ||||
|     assert_eq!(results[0], "2"); // Total matches | ||||
|  | ||||
|     // Test FT.INFO | ||||
|     let result: RedisResult<Vec<String>> = redis::cmd("FT.INFO") | ||||
|         .arg("test_index") | ||||
|         .query(conn); | ||||
|     assert!(result.is_ok()); | ||||
|     let info = result.unwrap(); | ||||
|     assert!(info.contains(&"index_name".to_string())); | ||||
|     assert!(info.contains(&"test_index".to_string())); | ||||
|  | ||||
|     // Test FT.DEL | ||||
|     let result: RedisResult<String> = redis::cmd("FT.DEL") | ||||
|         .arg("test_index") | ||||
|         .arg("doc1") | ||||
|         .query(conn); | ||||
|     assert!(result.is_ok()); | ||||
|     assert_eq!(result.unwrap(), "1"); | ||||
|  | ||||
|     // Verify document was deleted | ||||
|     let result: RedisResult<Vec<String>> = redis::cmd("FT.SEARCH") | ||||
|         .arg("test_index") | ||||
|         .arg("Hello") | ||||
|         .query(conn); | ||||
|     assert!(result.is_ok()); | ||||
|     let results = result.unwrap(); | ||||
|     assert_eq!(results[0], "0"); // No matches | ||||
|  | ||||
|     // Test FT.DROP | ||||
|     let result: RedisResult<String> = redis::cmd("FT.DROP") | ||||
|         .arg("test_index") | ||||
|         .query(conn); | ||||
|     assert!(result.is_ok()); | ||||
|     assert_eq!(result.unwrap(), "OK"); | ||||
|  | ||||
|     // Verify index was dropped | ||||
|     let result: RedisResult<String> = redis::cmd("FT.INFO") | ||||
|         .arg("test_index") | ||||
|         .query(conn); | ||||
|     assert!(result.is_err()); // Should fail | ||||
| } | ||||
|  | ||||
| async fn test_tantivy_with_read_permissions(conn: &mut Connection, db_id: u64) { | ||||
|     // Select database with read-only key | ||||
|     let result: RedisResult<String> = redis::cmd("SELECT") | ||||
|         .arg(db_id) | ||||
|         .arg("KEY") | ||||
|         .arg("read_key") | ||||
|         .query(conn); | ||||
|     assert!(result.is_ok()); | ||||
|     assert_eq!(result.unwrap(), "OK"); | ||||
|  | ||||
|     // Recreate index for testing | ||||
|     let result: RedisResult<String> = redis::cmd("FT.CREATE") | ||||
|         .arg("test_index_read") | ||||
|         .arg("SCHEMA") | ||||
|         .arg("title") | ||||
|         .arg("TEXT") | ||||
|         .query(conn); | ||||
|     assert!(result.is_err()); // Should fail due to read-only permissions | ||||
|     assert!(result.unwrap_err().to_string().contains("write permission denied")); | ||||
|  | ||||
|     // Add document should fail | ||||
|     let result: RedisResult<String> = redis::cmd("FT.ADD") | ||||
|         .arg("test_index_read") | ||||
|         .arg("doc1") | ||||
|         .arg("1.0") | ||||
|         .arg("title") | ||||
|         .arg("Test") | ||||
|         .query(conn); | ||||
|     assert!(result.is_err()); | ||||
|     assert!(result.unwrap_err().to_string().contains("write permission denied")); | ||||
|  | ||||
|     // But search should work (if index exists) | ||||
|     // First create index with write permissions, then switch to read | ||||
|     // For this test, we'll assume the index doesn't exist, so search fails differently | ||||
| } | ||||
|  | ||||
| async fn test_tantivy_access_denied(conn: &mut Connection, db_id: u64) { | ||||
|     // Try to select with invalid key | ||||
|     let result: RedisResult<String> = redis::cmd("SELECT") | ||||
|         .arg(db_id) | ||||
|         .arg("KEY") | ||||
|         .arg("invalid_key") | ||||
|         .query(conn); | ||||
|     assert!(result.is_err()); | ||||
|     assert!(result.unwrap_err().to_string().contains("invalid access key")); | ||||
| } | ||||
		Reference in New Issue
	
	Block a user