use redis::{Client, Connection, RedisResult, Value}; use std::process::{Child, Command}; use std::time::Duration; use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; use herodb::rpc::{BackendType, DatabaseConfig, RpcClient}; use base64::Engine; use tokio::time::sleep; // ------------------------ // Helpers // ------------------------ fn get_redis_connection(port: u16) -> Connection { let connection_info = format!("redis://127.0.0.1:{}", port); let client = Client::open(connection_info).unwrap(); let mut attempts = 0; loop { match client.get_connection() { Ok(mut conn) => { if redis::cmd("PING").query::(&mut conn).is_ok() { return conn; } } Err(e) => { if attempts >= 3600 { panic!("Failed to connect to Redis server after 3600 attempts: {}", e); } } } attempts += 1; std::thread::sleep(Duration::from_millis(500)); } } async fn get_rpc_client(port: u16) -> HttpClient { let url = format!("http://127.0.0.1:{}", port + 1); // RPC port = Redis port + 1 HttpClientBuilder::default().build(url).unwrap() } /// Wait until RPC server is responsive (getServerStats succeeds) or panic after retries. async fn wait_for_rpc_ready(client: &HttpClient, max_attempts: u32, delay: Duration) { for _ in 0..max_attempts { match client.get_server_stats().await { Ok(_) => return, Err(_) => { sleep(delay).await; } } } panic!("RPC server did not become ready in time"); } // A guard to ensure the server process is killed when it goes out of scope and test dir cleaned. struct ServerProcessGuard { process: Child, test_dir: String, } impl Drop for ServerProcessGuard { fn drop(&mut self) { eprintln!("Killing server process (pid: {})...", self.process.id()); if let Err(e) = self.process.kill() { eprintln!("Failed to kill server process: {}", e); } match self.process.wait() { Ok(status) => eprintln!("Server process exited with: {}", status), Err(e) => eprintln!("Failed to wait on server process: {}", e), } // Clean up the specific test directory eprintln!("Cleaning up test directory: {}", self.test_dir); if let Err(e) = std::fs::remove_dir_all(&self.test_dir) { eprintln!("Failed to clean up test directory: {}", e); } } } // Helper to set up the server and return guard + ports async fn setup_server() -> (ServerProcessGuard, u16) { use std::sync::atomic::{AtomicU16, Ordering}; static PORT_COUNTER: AtomicU16 = AtomicU16::new(17500); let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); let test_dir = format!("/tmp/herodb_lance_test_{}", port); // Clean up previous test data if std::path::Path::new(&test_dir).exists() { let _ = std::fs::remove_dir_all(&test_dir); } std::fs::create_dir_all(&test_dir).unwrap(); // Start the server in a subprocess with RPC enabled (follows tantivy test pattern) let child = Command::new("cargo") .args(&[ "run", "--", "--dir", &test_dir, "--port", &port.to_string(), "--rpc-port", &(port + 1).to_string(), "--enable-rpc", "--debug", "--admin-secret", "test-admin", ]) .spawn() .expect("Failed to start server process"); let guard = ServerProcessGuard { process: child, test_dir, }; // Give the server time to build and start (cargo run may compile first) // Increase significantly to accommodate first-time dependency compilation in CI. std::thread::sleep(Duration::from_millis(60000)); (guard, port) } // Convenient helpers for assertions on redis::Value fn value_is_ok(v: &Value) -> bool { match v { Value::Okay => true, Value::Status(s) if s == "OK" => true, Value::Data(d) if d == b"OK" => true, _ => false, } } fn value_is_int_eq(v: &Value, expected: i64) -> bool { matches!(v, Value::Int(n) if *n == expected) } fn value_is_str_eq(v: &Value, expected: &str) -> bool { match v { Value::Status(s) => s == expected, Value::Data(d) => String::from_utf8_lossy(d) == expected, _ => false, } } fn to_string_lossy(v: &Value) -> String { match v { Value::Nil => "Nil".to_string(), Value::Int(n) => n.to_string(), Value::Status(s) => s.clone(), Value::Okay => "OK".to_string(), Value::Data(d) => String::from_utf8_lossy(d).to_string(), Value::Bulk(items) => { let inner: Vec = items.iter().map(to_string_lossy).collect(); format!("[{}]", inner.join(", ")) } } } // Extract ids from LANCE.SEARCH / LANCE.SEARCHIMAGE reply which is: // Array of elements: [ [id, score, [k,v,...]], [id, score, ...], ... ] fn extract_hit_ids(v: &Value) -> Vec { let mut ids = Vec::new(); if let Value::Bulk(items) = v { for item in items { if let Value::Bulk(row) = item { if !row.is_empty() { // first element is id (Data or Status) let id = match &row[0] { Value::Data(d) => String::from_utf8_lossy(d).to_string(), Value::Status(s) => s.clone(), Value::Int(n) => n.to_string(), _ => continue, }; ids.push(id); } } } } ids } // Check whether a Bulk array (RESP array) contains a given string element. fn bulk_contains_string(v: &Value, needle: &str) -> bool { match v { Value::Bulk(items) => items.iter().any(|it| match it { Value::Data(d) => String::from_utf8_lossy(d).contains(needle), Value::Status(s) => s.contains(needle), Value::Bulk(_) => bulk_contains_string(it, needle), _ => false, }), _ => false, } } // ------------------------ // Test: Lance end-to-end (RESP) using only local embedders // ------------------------ #[tokio::test] async fn test_lance_end_to_end() { let (_guard, port) = setup_server().await; // First, wait for RESP to be available; this also gives cargo-run child ample time to finish building. // Reuse the helper that retries PING until success. { let _conn_ready = get_redis_connection(port); // Drop immediately; we only needed readiness. } // Build RPC client and create a Lance DB let rpc_client = get_rpc_client(port).await; // Ensure RPC server is listening before we issue createDatabase (allow longer warm-up to accommodate first-build costs) wait_for_rpc_ready(&rpc_client, 3600, Duration::from_millis(250)).await; let db_config = DatabaseConfig { name: Some("media-db".to_string()), storage_path: None, max_size: None, redis_version: None, }; let db_id = rpc_client .create_database(BackendType::Lance, db_config, None) .await .expect("create_database Lance failed"); assert_eq!(db_id, 1, "Expected first Lance DB id to be 1"); // Add access keys let _ = rpc_client .add_access_key(db_id, "readwrite_key".to_string(), "readwrite".to_string()) .await .expect("add_access_key readwrite failed"); let _ = rpc_client .add_access_key(db_id, "read_key".to_string(), "read".to_string()) .await .expect("add_access_key read failed"); // Connect to Redis and SELECT DB with readwrite key let mut conn = get_redis_connection(port); let sel_ok: RedisResult = redis::cmd("SELECT") .arg(db_id) .arg("KEY") .arg("readwrite_key") .query(&mut conn); assert!(sel_ok.is_ok(), "SELECT db with key failed: {:?}", sel_ok); assert_eq!(sel_ok.unwrap(), "OK"); // 1) Configure embedding providers: textset -> testhash dim 64, imageset -> testimagehash dim 512 let v = redis::cmd("LANCE.EMBEDDING") .arg("CONFIG") .arg("SET") .arg("textset") .arg("PROVIDER") .arg("testhash") .arg("MODEL") .arg("any") .arg("PARAM") .arg("dim") .arg("64") .query::(&mut conn) .unwrap(); assert!(value_is_ok(&v), "Embedding config set (text) not OK: {}", to_string_lossy(&v)); let v = redis::cmd("LANCE.EMBEDDING") .arg("CONFIG") .arg("SET") .arg("imageset") .arg("PROVIDER") .arg("testimagehash") .arg("MODEL") .arg("any") .arg("PARAM") .arg("dim") .arg("512") .query::(&mut conn) .unwrap(); assert!(value_is_ok(&v), "Embedding config set (image) not OK: {}", to_string_lossy(&v)); // 2) Create datasets let v = redis::cmd("LANCE.CREATE") .arg("textset") .arg("DIM") .arg(64) .query::(&mut conn) .unwrap(); assert!(value_is_ok(&v), "LANCE.CREATE textset failed: {}", to_string_lossy(&v)); let v = redis::cmd("LANCE.CREATE") .arg("imageset") .arg("DIM") .arg(512) .query::(&mut conn) .unwrap(); assert!(value_is_ok(&v), "LANCE.CREATE imageset failed: {}", to_string_lossy(&v)); // 3) Store two text documents let v = redis::cmd("LANCE.STORE") .arg("textset") .arg("ID") .arg("doc-1") .arg("TEXT") .arg("The quick brown fox jumps over the lazy dog") .arg("META") .arg("title") .arg("Fox") .arg("category") .arg("animal") .query::(&mut conn) .unwrap(); assert!(value_is_ok(&v), "LANCE.STORE doc-1 failed: {}", to_string_lossy(&v)); let v = redis::cmd("LANCE.STORE") .arg("textset") .arg("ID") .arg("doc-2") .arg("TEXT") .arg("A fast auburn fox vaulted a sleepy canine") .arg("META") .arg("title") .arg("Paraphrase") .arg("category") .arg("animal") .query::(&mut conn) .unwrap(); assert!(value_is_ok(&v), "LANCE.STORE doc-2 failed: {}", to_string_lossy(&v)); // 4) Store two images via BYTES (local fake bytes; embedder only hashes bytes, not decoding) let img1: Vec = b"local-image-bytes-1-abcdefghijklmnopqrstuvwxyz".to_vec(); let img2: Vec = b"local-image-bytes-2-ABCDEFGHIJKLMNOPQRSTUVWXYZ".to_vec(); let img1_b64 = base64::engine::general_purpose::STANDARD.encode(&img1); let img2_b64 = base64::engine::general_purpose::STANDARD.encode(&img2); let v = redis::cmd("LANCE.STOREIMAGE") .arg("imageset") .arg("ID") .arg("img-1") .arg("BYTES") .arg(&img1_b64) .arg("META") .arg("title") .arg("Local1") .arg("group") .arg("demo") .query::(&mut conn) .unwrap(); assert!(value_is_ok(&v), "LANCE.STOREIMAGE img-1 failed: {}", to_string_lossy(&v)); let v = redis::cmd("LANCE.STOREIMAGE") .arg("imageset") .arg("ID") .arg("img-2") .arg("BYTES") .arg(&img2_b64) .arg("META") .arg("title") .arg("Local2") .arg("group") .arg("demo") .query::(&mut conn) .unwrap(); assert!(value_is_ok(&v), "LANCE.STOREIMAGE img-2 failed: {}", to_string_lossy(&v)); // 5) Search text: K 2 QUERY "quick brown fox" RETURN 1 title let v = redis::cmd("LANCE.SEARCH") .arg("textset") .arg("K") .arg(2) .arg("QUERY") .arg("quick brown fox") .arg("RETURN") .arg(1) .arg("title") .query::(&mut conn) .unwrap(); // Should be an array of hits let ids = extract_hit_ids(&v); assert!( ids.contains(&"doc-1".to_string()) || ids.contains(&"doc-2".to_string()), "LANCE.SEARCH should return doc-1/doc-2; got: {}", to_string_lossy(&v) ); // With FILTER on category let v = redis::cmd("LANCE.SEARCH") .arg("textset") .arg("K") .arg(2) .arg("QUERY") .arg("fox jumps") .arg("FILTER") .arg("category = 'animal'") .arg("RETURN") .arg(1) .arg("title") .query::(&mut conn) .unwrap(); let ids_f = extract_hit_ids(&v); assert!( !ids_f.is_empty(), "Filtered LANCE.SEARCH should return at least one document; got: {}", to_string_lossy(&v) ); // 6) Search images with QUERYBYTES let query_img: Vec = b"local-image-query-3-1234567890".to_vec(); let query_img_b64 = base64::engine::general_purpose::STANDARD.encode(&query_img); let v = redis::cmd("LANCE.SEARCHIMAGE") .arg("imageset") .arg("K") .arg(2) .arg("QUERYBYTES") .arg(&query_img_b64) .arg("RETURN") .arg(1) .arg("title") .query::(&mut conn) .unwrap(); // Should get 2 hits (img-1 and img-2) in some order; assert array non-empty let img_ids = extract_hit_ids(&v); assert!( !img_ids.is_empty(), "LANCE.SEARCHIMAGE should return non-empty results; got: {}", to_string_lossy(&v) ); // 7) Inspect datasets let v = redis::cmd("LANCE.LIST").query::(&mut conn).unwrap(); assert!( bulk_contains_string(&v, "textset"), "LANCE.LIST missing textset: {}", to_string_lossy(&v) ); assert!( bulk_contains_string(&v, "imageset"), "LANCE.LIST missing imageset: {}", to_string_lossy(&v) ); // INFO textset let info_text = redis::cmd("LANCE.INFO") .arg("textset") .query::(&mut conn) .unwrap(); // INFO returns Array [k,v,k,v,...] including "dimension" "64" and "row_count" "...". let info_str = to_string_lossy(&info_text); assert!( info_str.contains("dimension") && info_str.contains("64"), "LANCE.INFO textset should include dimension 64; got: {}", info_str ); // 8) Delete by id and drop datasets let v = redis::cmd("LANCE.DEL") .arg("textset") .arg("doc-2") .query::(&mut conn) .unwrap(); // Returns SimpleString "1" or Int 1 depending on encoding path; accept either assert!( value_is_int_eq(&v, 1) || value_is_str_eq(&v, "1"), "LANCE.DEL doc-2 expected 1; got {}", to_string_lossy(&v) ); let v = redis::cmd("LANCE.DROP") .arg("textset") .query::(&mut conn) .unwrap(); assert!(value_is_ok(&v), "LANCE.DROP textset failed: {}", to_string_lossy(&v)); let v = redis::cmd("LANCE.DROP") .arg("imageset") .query::(&mut conn) .unwrap(); assert!(value_is_ok(&v), "LANCE.DROP imageset failed: {}", to_string_lossy(&v)); }