14 Commits

Author SHA1 Message Date
Maxime Van Hees
a8720c06db prevent unauthorized access to administrative db0 when connection to redis-cli 2025-10-07 10:52:30 +02:00
Maxime Van Hees
2139deb85d WIP6: implementing image embedding as first step towards multi-model support 2025-09-30 14:53:01 +02:00
Maxime Van Hees
7d07b57d32 WIP 5 add image embedding provider (local only for now) 2025-09-29 16:14:34 +02:00
Maxime Van Hees
4aa49e0d5c WIP4 implementation lanceDB: removed blocking Tokio runtime usage during embeddings and isolated all embedding work off the async runtime 2025-09-29 15:54:12 +02:00
Maxime Van Hees
644946f1ca WIP3 implemeting lancedb 2025-09-29 14:55:41 +02:00
Maxime Van Hees
cf66f4c304 WIP2: implementing lancedb: created embedding abstraction, server-side per-dataset embedding config + updates RPC endpoints 2025-09-29 13:17:34 +02:00
Maxime Van Hees
6a4e2819bf WIP 1: implement lancedb vector 2025-09-29 11:24:31 +02:00
Maxime Van Hees
77a53bae86 don't use strings for paths 2025-09-25 16:25:08 +02:00
7f689ae29b Merge pull request 'tantivy_impl' (#14) from tantivy_impl into main
Reviewed-on: #14
2025-09-25 14:08:50 +00:00
Maxime Van Hees
7f92001b89 fixed key-based access control for Tantivy backends 2025-09-25 16:06:08 +02:00
Maxime Van Hees
e7248b84e8 key-based access control for tantivy backend 2025-09-25 13:36:23 +02:00
Maxime Van Hees
22ac4c9ed6 implementation of tantivy datastore + updated RPC calls to deal with tantivy + docs 2025-09-23 17:15:40 +02:00
Maxime Van Hees
c470772a13 Merge branch 'management_rpc_server' 2025-09-22 16:26:53 +02:00
8331ed032b ... 2025-09-17 07:02:44 +02:00
33 changed files with 11316 additions and 127 deletions

5137
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -26,6 +26,15 @@ ed25519-dalek = "2"
x25519-dalek = "2"
base64 = "0.22"
jsonrpsee = { version = "0.26.0", features = ["http-client", "ws-client", "server", "macros"] }
tantivy = "0.25.0"
arrow-schema = "55.2.0"
arrow-array = "55.2.0"
lance = "0.37.0"
lance-index = "0.37.0"
arrow = "55.2.0"
lancedb = "0.22.1"
uuid = "1.18.1"
ureq = { version = "2.10.0", features = ["json", "tls"] }
[dev-dependencies]
redis = { version = "0.24", features = ["aio", "tokio-comp"] }

View File

@@ -47,18 +47,24 @@ HeroDB can be interacted with using any standard Redis client, such as `redis-cl
### Example with `redis-cli`
Connections start with no database selected. You must SELECT a database first.
- To work in the admin database (DB 0), authenticate with the admin secret:
```bash
redis-cli -p 6379 SELECT 0 KEY myadminsecret
redis-cli -p 6379 SET mykey "Hello from HeroDB!"
redis-cli -p 6379 GET mykey
# → "Hello from HeroDB!"
```
- To use a user database, first create one via the JSON-RPC API (see docs/rpc_examples.md), then select it:
```bash
# Suppose RPC created database id 1
redis-cli -p 6379 SELECT 1
redis-cli -p 6379 HSET user:1 name "Alice" age "30"
redis-cli -p 6379 HGET user:1 name
# → "Alice"
redis-cli -p 6379 SCAN 0 MATCH user:* COUNT 10
# → 1) "0"
# 2) 1) "user:1"
```
## Cryptography

View File

@@ -80,6 +80,7 @@ Keys in `DB 0` (internal layout, but useful to understand how things work):
- Requires the exact admin secret as the `KEY` argument to `SELECT 0`
- Permission is `ReadWrite` when the secret matches
Connections start with no database selected. Any command that requires storage (GET, SET, H*, L*, SCAN, etc.) will return an error until you issue a SELECT to choose a database. Admin DB 0 is never accessible without authenticating via SELECT 0 KEY <admin_secret>.
### How to select databases with optional `KEY`
- Public DB (no key required)

View File

@@ -126,7 +126,9 @@ redis-cli -p 6381 --pipe < dump.rdb
## Authentication and Database Selection
HeroDB uses an `Admin DB 0` to govern database existence, access and per-db encryption. Access control is enforced via `Admin DB 0` metadata. See the full model in `docs/admin.md`.
Connections start with no database selected. Any storage-backed command (GET, SET, H*, L*, SCAN, etc.) will return an error until you issue a SELECT to choose a database.
HeroDB uses an `Admin DB 0` to govern database existence, access and per-db encryption. Access control is enforced via `Admin DB 0` metadata. See the full model in (docs/admin.md:1).
Examples:
```bash
@@ -145,4 +147,10 @@ redis-cli -p $PORT SELECT 2 KEY my-db2-access-key
# Admin DB 0 (requires admin secret)
redis-cli -p $PORT SELECT 0 KEY my-admin-secret
# → OK
```
```bash
# Before selecting a DB, storage commands will fail
redis-cli -p $PORT GET key
# → -ERR No database selected. Use SELECT <id> [KEY <key>] first
```

444
docs/lance.md Normal file
View File

@@ -0,0 +1,444 @@
# Lance Vector Backend (RESP + JSON-RPC)
This document explains how to use HeroDBs Lance-backed vector store. It is text-first: users provide text, and HeroDB computes embeddings server-side (no manual vectors). It includes copy-pasteable RESP (redis-cli) and JSON-RPC examples for:
- Creating a Lance database
- Embedding provider configuration (OpenAI, Azure OpenAI, or deterministic test provider)
- Dataset lifecycle: CREATE, LIST, INFO, DROP
- Ingestion: STORE text (+ optional metadata)
- Search: QUERY with K, optional FILTER and RETURN
- Delete by id
- Index creation (currently a placeholder/no-op)
References:
- Implementation: [src/lance_store.rs](src/lance_store.rs), [src/cmd.rs](src/cmd.rs), [src/rpc.rs](src/rpc.rs), [src/server.rs](src/server.rs), [src/embedding.rs](src/embedding.rs)
Notes:
- Admin DB 0 cannot be Lance (or Tantivy). Only databases with id >= 1 can use Lance.
- Permissions:
- Read operations (SEARCH, LIST, INFO) require read permission.
- Mutating operations (CREATE, STORE, CREATEINDEX, DEL, DROP, EMBEDDING CONFIG SET) require readwrite permission.
- Backend gating:
- If a DB is Lance, only LANCE.* and basic control commands (PING, ECHO, SELECT, INFO, CLIENT, etc.) are permitted.
- If a DB is not Lance, LANCE.* commands return an error.
Storage layout and schema:
- Files live at: <base_dir>/lance/<db_id>/<dataset>.lance
- Records schema:
- id: Utf8 (non-null)
- vector: FixedSizeList<Float32, dim> (non-null)
- text: Utf8 (nullable)
- meta: Utf8 JSON (nullable)
- Search is an L2 KNN brute-force scan for now (lower score = better). Index creation is a no-op placeholder to be implemented later.
Prerequisites:
- Start HeroDB with RPC enabled (for management calls):
- See [docs/basics.md](./basics.md) for flags. Example:
```bash
./target/release/herodb --dir /tmp/herodb --admin-secret mysecret --port 6379 --enable-rpc
```
## 0) Create a Lance-backed database (JSON-RPC)
Use the management API to create a database with backend "Lance". DB 0 is reserved for admin and cannot be Lance.
Request:
```json
{
"jsonrpc": "2.0",
"id": 1,
"method": "herodb_createDatabase",
"params": [
"Lance",
{ "name": "vectors-db", "storage_path": null, "max_size": null, "redis_version": null },
null
]
}
```
- Response contains the allocated db_id (>= 1). Use that id below (replace 1 with your actual id).
Select the database over RESP:
```bash
redis-cli -p 6379 SELECT 1
# → OK
```
## 1) Configure embedding provider (server-side embeddings)
HeroDB embeds text internally at STORE/SEARCH time using a per-dataset EmbeddingConfig sidecar. Configure provider before creating a dataset to choose dimensions and provider.
Supported providers:
- openai (standard OpenAI or Azure OpenAI)
- testhash (deterministic, CI-friendly; no network)
Environment variables for OpenAI:
- Standard OpenAI: export OPENAI_API_KEY=sk-...
- Azure OpenAI: export AZURE_OPENAI_API_KEY=...
RESP examples:
```bash
# Standard OpenAI with default dims (model-dependent, e.g. 1536)
redis-cli -p 6379 LANCE.EMBEDDING CONFIG SET myset PROVIDER openai MODEL text-embedding-3-small
# OpenAI with reduced output dimension (e.g., 512) when supported
redis-cli -p 6379 LANCE.EMBEDDING CONFIG SET myset PROVIDER openai MODEL text-embedding-3-small PARAM dim 512
# Azure OpenAI (set env: AZURE_OPENAI_API_KEY)
redis-cli -p 6379 LANCE.EMBEDDING CONFIG SET myset PROVIDER openai MODEL text-embedding-3-small \
PARAM use_azure true \
PARAM azure_endpoint https://myresource.openai.azure.com \
PARAM azure_deployment my-embed-deploy \
PARAM azure_api_version 2024-02-15 \
PARAM dim 512
# Deterministic test provider (no network, stable vectors)
redis-cli -p 6379 LANCE.EMBEDDING CONFIG SET myset PROVIDER testhash MODEL any
```
Read config:
```bash
redis-cli -p 6379 LANCE.EMBEDDING CONFIG GET myset
# → JSON blob describing provider/model/params
```
JSON-RPC examples:
```json
{
"jsonrpc": "2.0",
"id": 2,
"method": "herodb_lanceSetEmbeddingConfig",
"params": [
1,
"myset",
"openai",
"text-embedding-3-small",
{ "dim": "512" }
]
}
```
```json
{
"jsonrpc": "2.0",
"id": 3,
"method": "herodb_lanceGetEmbeddingConfig",
"params": [1, "myset"]
}
```
## 2) Create a dataset
Choose a dimension that matches your embedding configuration. For OpenAI text-embedding-3-small without dimension override, typical dimension is 1536; when `dim` is set (e.g., 512), use that. The current API requires an explicit DIM.
RESP:
```bash
redis-cli -p 6379 LANCE.CREATE myset DIM 512
# → OK
```
JSON-RPC:
```json
{
"jsonrpc": "2.0",
"id": 4,
"method": "herodb_lanceCreate",
"params": [1, "myset", 512]
}
```
## 3) Store text documents (server-side embedding)
Provide your id, the text to embed, and optional META fields. The server computes the embedding using the configured provider and stores id/vector/text/meta in the Lance dataset. Upserts by id are supported via delete-then-append semantics.
RESP:
```bash
redis-cli -p 6379 LANCE.STORE myset ID doc-1 TEXT "Hello vector world" META title "Hello" category "demo"
# → OK
```
JSON-RPC:
```json
{
"jsonrpc": "2.0",
"id": 5,
"method": "herodb_lanceStoreText",
"params": [
1,
"myset",
"doc-1",
"Hello vector world",
{ "title": "Hello", "category": "demo" }
]
}
```
## 4) Search with a text query
Provide a query string; the server embeds it and performs KNN search. Optional: FILTER expression and RETURN subset of fields.
RESP:
```bash
# K nearest neighbors for the query text
redis-cli -p 6379 LANCE.SEARCH myset K 5 QUERY "greetings to vectors"
# → Array of hits: [id, score, [k,v, ...]] pairs, lower score = closer
# With a filter on meta fields and return only title
redis-cli -p 6379 LANCE.SEARCH myset K 3 QUERY "greetings to vectors" FILTER "category = 'demo'" RETURN 1 title
```
JSON-RPC:
```json
{
"jsonrpc": "2.0",
"id": 6,
"method": "herodb_lanceSearchText",
"params": [1, "myset", "greetings to vectors", 5, null, null]
}
```
With filter and selected fields:
```json
{
"jsonrpc": "2.0",
"id": 7,
"method": "herodb_lanceSearchText",
"params": [1, "myset", "greetings to vectors", 3, "category = 'demo'", ["title"]]
}
```
Response shape:
- RESP over redis-cli: an array of hits [id, score, [k, v, ...]].
- JSON-RPC returns an object containing the RESP-encoded wire format string or a structured result depending on implementation. See [src/rpc.rs](src/rpc.rs) for details.
## 5) Create an index (placeholder)
Index creation currently returns OK but is a no-op. It will integrate Lance vector indices in a future update.
RESP:
```bash
redis-cli -p 6379 LANCE.CREATEINDEX myset TYPE "ivf_pq" PARAM nlist 100 PARAM pq_m 16
# → OK (no-op for now)
```
JSON-RPC:
```json
{
"jsonrpc": "2.0",
"id": 8,
"method": "herodb_lanceCreateIndex",
"params": [1, "myset", "ivf_pq", { "nlist": "100", "pq_m": "16" }]
}
```
## 6) Inspect datasets
RESP:
```bash
# List datasets in current Lance DB
redis-cli -p 6379 LANCE.LIST
# Get dataset info
redis-cli -p 6379 LANCE.INFO myset
```
JSON-RPC:
```json
{
"jsonrpc": "2.0",
"id": 9,
"method": "herodb_lanceList",
"params": [1]
}
```
```json
{
"jsonrpc": "2.0",
"id": 10,
"method": "herodb_lanceInfo",
"params": [1, "myset"]
}
```
## 7) Delete and drop
RESP:
```bash
# Delete by id
redis-cli -p 6379 LANCE.DEL myset doc-1
# → OK
# Drop the entire dataset
redis-cli -p 6379 LANCE.DROP myset
# → OK
```
JSON-RPC:
```json
{
"jsonrpc": "2.0",
"id": 11,
"method": "herodb_lanceDel",
"params": [1, "myset", "doc-1"]
}
```
```json
{
"jsonrpc": "2.0",
"id": 12,
"method": "herodb_lanceDrop",
"params": [1, "myset"]
}
```
## 8) End-to-end example (RESP)
```bash
# 1. Select Lance DB (assume db_id=1 created via RPC)
redis-cli -p 6379 SELECT 1
# 2. Configure embedding provider (OpenAI small model at 512 dims)
redis-cli -p 6379 LANCE.EMBEDDING CONFIG SET myset PROVIDER openai MODEL text-embedding-3-small PARAM dim 512
# 3. Create dataset
redis-cli -p 6379 LANCE.CREATE myset DIM 512
# 4. Store documents
redis-cli -p 6379 LANCE.STORE myset ID doc-1 TEXT "The quick brown fox jumps over the lazy dog" META title "Fox" category "animal"
redis-cli -p 6379 LANCE.STORE myset ID doc-2 TEXT "A fast auburn fox vaulted a sleepy canine" META title "Fox paraphrase" category "animal"
# 5. Search
redis-cli -p 6379 LANCE.SEARCH myset K 2 QUERY "quick brown fox" RETURN 1 title
# 6. Dataset info and listing
redis-cli -p 6379 LANCE.INFO myset
redis-cli -p 6379 LANCE.LIST
# 7. Delete and drop
redis-cli -p 6379 LANCE.DEL myset doc-2
redis-cli -p 6379 LANCE.DROP myset
```
## 9) End-to-end example (JSON-RPC)
Assume RPC server on port 8080. Replace ids and ports as needed.
1) Create Lance DB:
```json
{
"jsonrpc": "2.0",
"id": 100,
"method": "herodb_createDatabase",
"params": ["Lance", { "name": "vectors-db", "storage_path": null, "max_size": null, "redis_version": null }, null]
}
```
2) Set embedding config:
```json
{
"jsonrpc": "2.0",
"id": 101,
"method": "herodb_lanceSetEmbeddingConfig",
"params": [1, "myset", "openai", "text-embedding-3-small", { "dim": "512" }]
}
```
3) Create dataset:
```json
{
"jsonrpc": "2.0",
"id": 102,
"method": "herodb_lanceCreate",
"params": [1, "myset", 512]
}
```
4) Store text:
```json
{
"jsonrpc": "2.0",
"id": 103,
"method": "herodb_lanceStoreText",
"params": [1, "myset", "doc-1", "The quick brown fox jumps over the lazy dog", { "title": "Fox", "category": "animal" }]
}
```
5) Search text:
```json
{
"jsonrpc": "2.0",
"id": 104,
"method": "herodb_lanceSearchText",
"params": [1, "myset", "quick brown fox", 2, null, ["title"]]
}
```
6) Info/list:
```json
{
"jsonrpc": "2.0",
"id": 105,
"method": "herodb_lanceInfo",
"params": [1, "myset"]
}
```
```json
{
"jsonrpc": "2.0",
"id": 106,
"method": "herodb_lanceList",
"params": [1]
}
```
7) Delete/drop:
```json
{
"jsonrpc": "2.0",
"id": 107,
"method": "herodb_lanceDel",
"params": [1, "myset", "doc-1"]
}
```
```json
{
"jsonrpc": "2.0",
"id": 108,
"method": "herodb_lanceDrop",
"params": [1, "myset"]
}
```
## 10) Operational notes and troubleshooting
- If using OpenAI and you see “missing API key env”, set:
- Standard: `export OPENAI_API_KEY=sk-...`
- Azure: `export AZURE_OPENAI_API_KEY=...` and pass `use_azure true`, `azure_endpoint`, `azure_deployment`, `azure_api_version`.
- Dimensions mismatch:
- Ensure the dataset DIM equals the providers embedding dim. For OpenAI text-embedding-3 models, set `PARAM dim 512` (or another supported size) and use that same DIM for `LANCE.CREATE`.
- DB 0 restriction:
- Lance is not allowed on DB 0. Use db_id >= 1.
- Permissions:
- Read operations (SEARCH, LIST, INFO) require read permission.
- Mutations (CREATE, STORE, CREATEINDEX, DEL, DROP, EMBEDDING CONFIG SET) require readwrite permission.
- Backend gating:
- On Lance DBs, only LANCE.* commands are accepted (plus basic control).
- Current index behavior:
- `LANCE.CREATEINDEX` returns OK but is a no-op. Future versions will integrate Lance vector indices.
- Implementation files for reference:
- [src/lance_store.rs](src/lance_store.rs), [src/cmd.rs](src/cmd.rs), [src/rpc.rs](src/rpc.rs), [src/server.rs](src/server.rs), [src/embedding.rs](src/embedding.rs)

View File

@@ -0,0 +1,138 @@
# LanceDB Text and Images: End-to-End Example
This guide demonstrates creating a Lance backend database, ingesting two text documents and two images, performing searches over both, and cleaning up the datasets.
Prerequisites
- Build HeroDB and start the server with JSON-RPC enabled.
Commands:
```bash
cargo build --release
./target/release/herodb --dir /tmp/herodb --admin-secret mysecret --port 6379 --enable-rpc
```
We'll use:
- redis-cli for RESP commands against port 6379
- curl for JSON-RPC against 8080 if desired
- Deterministic local embedders to avoid external dependencies: testhash (text, dim 64) and testimagehash (image, dim 512)
0) Create a Lance-backed database (JSON-RPC)
Request:
```json
{ "jsonrpc": "2.0", "id": 1, "method": "herodb_createDatabase", "params": ["Lance", { "name": "media-db", "storage_path": null, "max_size": null, "redis_version": null }, null] }
```
Response returns db_id (assume 1). Select DB over RESP:
```bash
redis-cli -p 6379 SELECT 1
# → OK
```
1) Configure embedding providers
We'll create two datasets with independent embedding configs:
- textset → provider testhash, dim 64
- imageset → provider testimagehash, dim 512
Text config:
```bash
redis-cli -p 6379 LANCE.EMBEDDING CONFIG SET textset PROVIDER testhash MODEL any PARAM dim 64
# → OK
```
Image config:
```bash
redis-cli -p 6379 LANCE.EMBEDDING CONFIG SET imageset PROVIDER testimagehash MODEL any PARAM dim 512
# → OK
```
2) Create datasets
```bash
redis-cli -p 6379 LANCE.CREATE textset DIM 64
# → OK
redis-cli -p 6379 LANCE.CREATE imageset DIM 512
# → OK
```
3) Ingest two text documents (server-side embedding)
```bash
redis-cli -p 6379 LANCE.STORE textset ID doc-1 TEXT "The quick brown fox jumps over the lazy dog" META title "Fox" category "animal"
# → OK
redis-cli -p 6379 LANCE.STORE textset ID doc-2 TEXT "A fast auburn fox vaulted a sleepy canine" META title "Paraphrase" category "animal"
# → OK
```
4) Ingest two images
You can provide a URI or base64 bytes. Use URI for URIs, BYTES for base64 data.
Example using free placeholder images:
```bash
# Store via URI
redis-cli -p 6379 LANCE.STOREIMAGE imageset ID img-1 URI "https://picsum.photos/seed/1/256/256" META title "Seed1" group "demo"
# → OK
redis-cli -p 6379 LANCE.STOREIMAGE imageset ID img-2 URI "https://picsum.photos/seed/2/256/256" META title "Seed2" group "demo"
# → OK
```
If your environment blocks outbound HTTP, you can embed image bytes:
```bash
# Example: read a local file and base64 it (replace path)
b64=$(base64 -w0 ./image1.png)
redis-cli -p 6379 LANCE.STOREIMAGE imageset ID img-b64-1 BYTES "$b64" META title "Local1" group "demo"
```
5) Search text
```bash
# Top-2 nearest neighbors for a query
redis-cli -p 6379 LANCE.SEARCH textset K 2 QUERY "quick brown fox" RETURN 1 title
# → 1) [id, score, [k1,v1,...]]
```
With a filter (supports equality on schema or meta keys):
```bash
redis-cli -p 6379 LANCE.SEARCH textset K 2 QUERY "fox jumps" FILTER "category = 'animal'" RETURN 1 title
```
6) Search images
```bash
# Provide a URI as the query
redis-cli -p 6379 LANCE.SEARCHIMAGE imageset K 2 QUERYURI "https://picsum.photos/seed/1/256/256" RETURN 1 title
# Or provide base64 bytes as the query
qb64=$(curl -s https://picsum.photos/seed/3/256/256 | base64 -w0)
redis-cli -p 6379 LANCE.SEARCHIMAGE imageset K 2 QUERYBYTES "$qb64" RETURN 1 title
```
7) Inspect datasets
```bash
redis-cli -p 6379 LANCE.LIST
redis-cli -p 6379 LANCE.INFO textset
redis-cli -p 6379 LANCE.INFO imageset
```
8) Delete by id and drop datasets
```bash
# Delete one record
redis-cli -p 6379 LANCE.DEL textset doc-2
# → OK
# Drop entire datasets
redis-cli -p 6379 LANCE.DROP textset
redis-cli -p 6379 LANCE.DROP imageset
# → OK
```
Appendix: Using OpenAI embeddings instead of test providers
Text:
```bash
export OPENAI_API_KEY=sk-...
redis-cli -p 6379 LANCE.EMBEDDING CONFIG SET textset PROVIDER openai MODEL text-embedding-3-small PARAM dim 512
redis-cli -p 6379 LANCE.CREATE textset DIM 512
```
Azure OpenAI:
```bash
export AZURE_OPENAI_API_KEY=...
redis-cli -p 6379 LANCE.EMBEDDING CONFIG SET textset PROVIDER openai MODEL text-embedding-3-small \
PARAM use_azure true \
PARAM azure_endpoint https://myresource.openai.azure.com \
PARAM azure_deployment my-embed-deploy \
PARAM azure_api_version 2024-02-15 \
PARAM dim 512
```
Notes:
- Ensure dataset DIM matches the configured embedding dimension.
- Lance is only available for non-admin databases (db_id >= 1).
- On Lance DBs, only LANCE.* and basic control commands are allowed.

253
docs/tantivy.md Normal file
View File

@@ -0,0 +1,253 @@
# Tantivy FullText Backend (JSONRPC)
This document explains how to use HeroDBs Tantivy-backed fulltext search as a dedicated database backend and provides copypasteable JSONRPC requests. Tantivy is available only for nonadmin databases (db_id >= 1). Admin DB 0 always uses Redb/Sled and rejects FT operations.
Important characteristics:
- Tantivy is a third backend alongside Redb and Sled. It provides search indexes only; there is no KV store backing it.
- On Tantivy databases, Redis KV/list/hash commands are rejected; only FT commands and basic control (SELECT, CLIENT, INFO, etc.) are allowed.
- FT JSONRPC is namespaced as "herodb" and methods are named with underscore: herodb_ftCreate, herodb_ftAdd, herodb_ftSearch, herodb_ftDel, herodb_ftInfo, herodb_ftDrop.
Reference to server implementation:
- RPC methods are defined in [rust.trait Rpc()](src/rpc.rs:70):
- [rust.fn ft_create()](src/rpc.rs:121)
- [rust.fn ft_add()](src/rpc.rs:130)
- [rust.fn ft_search()](src/rpc.rs:141)
- [rust.fn ft_del()](src/rpc.rs:154)
- [rust.fn ft_info()](src/rpc.rs:158)
- [rust.fn ft_drop()](src/rpc.rs:162)
Notes on responses:
- ftCreate/ftAdd/ftDel/ftDrop return a JSON boolean: true on success.
- ftSearch/ftInfo return a JSON object with a single key "resp" containing a RESPencoded string (wire format used by Redis). You can display or parse it on the client side as needed.
RESP usage (redis-cli):
- For RESP clients, you must SELECT the Tantivy database first. SELECT now succeeds for Tantivy DBs without opening KV storage.
- After SELECT, you can run FT.* commands within that DB context.
Example with redis-cli:
```bash
# Connect to server
redis-cli -p 6379
# Select Tantivy DB 1 (public by default)
SELECT 1
# → OK
# Create index
FT.CREATE product_catalog SCHEMA title TEXT description TEXT category TAG price NUMERIC rating NUMERIC location GEO
# → OK
# Add a document
FT.ADD product_catalog product:1 1.0 title "Wireless Bluetooth Headphones" description "Premium noise-canceling headphones with 30-hour battery life" category "electronics,audio" price 299.99 rating 4.5 location "-122.4194,37.7749"
# → OK
# Search
FT.SEARCH product_catalog wireless LIMIT 0 3
# → RESP array with hits
```
Storage layout (on disk):
- Indices are stored per database under:
- <base_dir>/search_indexes/<db_id>/<index_name>
- Example: /tmp/test/search_indexes/1/product_catalog
0) Create a new Tantivy database
Use herodb_createDatabase with backend "Tantivy". DB 0 cannot be Tantivy.
```json
{
"jsonrpc": "2.0",
"id": 1,
"method": "herodb_createDatabase",
"params": [
"Tantivy",
{ "name": "search-db", "storage_path": null, "max_size": null, "redis_version": null },
null
]
}
```
The response contains the allocated db_id (>= 1). Use that id in the calls below.
1) FT.CREATE — create an index with schema
Method: herodb_ftCreate → [rust.fn ft_create()](src/rpc.rs:121)
Schema format is an array of tuples: [ [field_name, field_type, [options...] ], ... ]
Supported field types: "TEXT", "NUMERIC" (defaults to F64), "TAG", "GEO"
Supported options (subset): "WEIGHT", "SORTABLE", "NOINDEX", "SEPARATOR", "CASESENSITIVE"
```json
{
"jsonrpc": "2.0",
"id": 2,
"method": "herodb_ftCreate",
"params": [
1,
"product_catalog",
[
["title", "TEXT", ["SORTABLE"]],
["description", "TEXT", []],
["category", "TAG", ["SEPARATOR", ","]],
["price", "NUMERIC", ["SORTABLE"]],
["rating", "NUMERIC", []],
["location", "GEO", []]
]
]
}
```
Returns: true on success.
2) FT.ADD — add or replace a document
Method: herodb_ftAdd → [rust.fn ft_add()](src/rpc.rs:130)
Fields is an object (map) of field_name → value (all values are sent as strings). GEO expects "lat,lon".
```json
{
"jsonrpc": "2.0",
"id": 3,
"method": "herodb_ftAdd",
"params": [
1,
"product_catalog",
"product:1",
1.0,
{
"title": "Wireless Bluetooth Headphones",
"description": "Premium noise-canceling headphones with 30-hour battery life",
"category": "electronics,audio",
"price": "299.99",
"rating": "4.5",
"location": "-122.4194,37.7749"
}
]
}
```
Returns: true on success.
3) FT.SEARCH — query an index
Method: herodb_ftSearch → [rust.fn ft_search()](src/rpc.rs:141)
Parameters: (db_id, index_name, query, filters?, limit?, offset?, return_fields?)
- filters: array of [field, value] pairs (Equals filter)
- limit/offset: numbers (defaults: limit=10, offset=0)
- return_fields: array of field names to include (optional)
Simple query:
```json
{
"jsonrpc": "2.0",
"id": 4,
"method": "herodb_ftSearch",
"params": [1, "product_catalog", "wireless", null, 10, 0, null]
}
```
Pagination + filters + selected fields:
```json
{
"jsonrpc": "2.0",
"id": 5,
"method": "herodb_ftSearch",
"params": [
1,
"product_catalog",
"mouse",
[["category", "electronics"]],
5,
0,
["title", "price", "rating"]
]
}
```
Response shape:
```json
{
"jsonrpc": "2.0",
"id": 5,
"result": { "resp": "*...RESP encoded array..." }
}
```
4) FT.INFO — index metadata
Method: herodb_ftInfo → [rust.fn ft_info()](src/rpc.rs:158)
```json
{
"jsonrpc": "2.0",
"id": 6,
"method": "herodb_ftInfo",
"params": [1, "product_catalog"]
}
```
Response shape:
```json
{
"jsonrpc": "2.0",
"id": 6,
"result": { "resp": "*...RESP encoded array with fields and counts..." }
}
```
5) FT.DEL — delete by doc id
Method: herodb_ftDel → [rust.fn ft_del()](src/rpc.rs:154)
```json
{
"jsonrpc": "2.0",
"id": 7,
"method": "herodb_ftDel",
"params": [1, "product_catalog", "product:1"]
}
```
Returns: true on success. Note: current implementation logs and returns success; physical delete may be a noop until delete is finalized in the engine.
6) FT.DROP — drop an index
Method: herodb_ftDrop → [rust.fn ft_drop()](src/rpc.rs:162)
```json
{
"jsonrpc": "2.0",
"id": 8,
"method": "herodb_ftDrop",
"params": [1, "product_catalog"]
}
```
Returns: true on success.
Field types and options
- TEXT: stored/indexed/tokenized text. "SORTABLE" marks it fast (stored + fast path in our wrapper).
- NUMERIC: stored/indexed numeric; default precision F64. "SORTABLE" enables fast column.
- TAG: exact matching terms. Options: "SEPARATOR" (default ","), "CASESENSITIVE" (default false).
- GEO: "lat,lon" string; stored as two numeric fields internally.
Backend and permission gating
- FT methods are rejected on DB 0.
- FT methods require the database backend to be Tantivy; otherwise RPC returns an error.
- Writelike FT methods (create/add/del/drop) follow the same permission model as Redis writes on selected databases.
Troubleshooting
- "DB backend is not Tantivy": ensure the database was created with backend "Tantivy".
- "FT not allowed on DB 0": use a nonadmin database id (>= 1).
- Empty search results: confirm that the queried fields are tokenized/indexed (TEXT) and that documents were added successfully.
Related docs
- Commandlevel search overview: [docs/search.md](docs/search.md:1)
- RPC definitions: [src/rpc.rs](src/rpc.rs:1)

143
run.sh Executable file
View File

@@ -0,0 +1,143 @@
#!/bin/bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
# Test script for HeroDB - Redis-compatible database with redb backend
# This script starts the server and runs comprehensive tests
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Configuration
DB_DIR="/tmp/test_db"
PORT=6381
SERVER_PID=""
# Function to print colored output
print_status() {
echo -e "${BLUE}[INFO]${NC} $1"
}
print_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
print_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
# Function to cleanup on exit
cleanup() {
if [ ! -z "$SERVER_PID" ]; then
print_status "Stopping HeroDB server (PID: $SERVER_PID)..."
kill $SERVER_PID 2>/dev/null || true
wait $SERVER_PID 2>/dev/null || true
fi
# Clean up test database
if [ -d "$DB_DIR" ]; then
print_status "Cleaning up test database directory..."
rm -rf "$DB_DIR"
fi
}
# Set trap to cleanup on script exit
trap cleanup EXIT
# Function to wait for server to start
wait_for_server() {
local max_attempts=30
local attempt=1
print_status "Waiting for server to start on port $PORT..."
while [ $attempt -le $max_attempts ]; do
if nc -z localhost $PORT 2>/dev/null; then
print_success "Server is ready!"
return 0
fi
echo -n "."
sleep 1
attempt=$((attempt + 1))
done
print_error "Server failed to start within $max_attempts seconds"
return 1
}
# Function to send Redis command and get response
redis_cmd() {
local cmd="$1"
local expected="$2"
print_status "Testing: $cmd"
local result=$(echo "$cmd" | redis-cli -p $PORT --raw 2>/dev/null || echo "ERROR")
if [ "$expected" != "" ] && [ "$result" != "$expected" ]; then
print_error "Expected: '$expected', Got: '$result'"
return 1
else
print_success "$cmd -> $result"
return 0
fi
}
# Main execution
main() {
print_status "Starting HeroDB"
# Build the project
print_status "Building HeroDB..."
if ! cargo build -p herodb --release; then
print_error "Failed to build HeroDB"
exit 1
fi
# Create test database directory
mkdir -p "$DB_DIR"
# Start the server
print_status "Starting HeroDB server..."
${SCRIPT_DIR}/target/release/herodb --dir "$DB_DIR" --port $PORT &
SERVER_PID=$!
# Wait for server to start
if ! wait_for_server; then
print_error "Failed to start server"
exit 1
fi
}
# Check dependencies
check_dependencies() {
if ! command -v cargo &> /dev/null; then
print_error "cargo is required but not installed"
exit 1
fi
if ! command -v nc &> /dev/null; then
print_warning "netcat (nc) not found - some tests may not work properly"
fi
if ! command -v redis-cli &> /dev/null; then
print_warning "redis-cli not found - using netcat fallback"
fi
}
# Run dependency check and main function
check_dependencies
main "$@"
tail -f /dev/null

View File

@@ -1,4 +1,7 @@
#!/bin/bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
echo "🧪 Running HeroDB Redis Compatibility Tests"
echo "=========================================="

View File

@@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::{Arc, OnceLock, Mutex, RwLock};
use std::collections::HashMap;
@@ -35,11 +35,11 @@ static DATA_STORAGES: OnceLock<RwLock<HashMap<u64, Arc<dyn StorageBackend>>>> =
static DATA_INIT_LOCK: Mutex<()> = Mutex::new(());
fn init_admin_storage(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
) -> Result<Arc<dyn StorageBackend>, DBError> {
let db_file = PathBuf::from(base_dir).join("0.db");
let db_file = base_dir.join("0.db");
if let Some(parent_dir) = db_file.parent() {
std::fs::create_dir_all(parent_dir).map_err(|e| {
DBError(format!("Failed to create directory {}: {}", parent_dir.display(), e))
@@ -48,30 +48,34 @@ fn init_admin_storage(
let storage: Arc<dyn StorageBackend> = match backend {
options::BackendType::Redb => Arc::new(Storage::new(&db_file, true, Some(admin_secret))?),
options::BackendType::Sled => Arc::new(SledStorage::new(&db_file, true, Some(admin_secret))?),
options::BackendType::Tantivy | options::BackendType::Lance => {
return Err(DBError("Admin DB 0 cannot use search-only backends (Tantivy/Lance)".to_string()))
}
};
Ok(storage)
}
// Get or initialize a cached handle to admin DB 0 per base_dir (thread-safe, no double-open race)
pub fn open_admin_storage(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
) -> Result<Arc<dyn StorageBackend>, DBError> {
let map = ADMIN_STORAGES.get_or_init(|| RwLock::new(HashMap::new()));
let key = base_dir.display().to_string();
// Fast path
if let Some(st) = map.read().unwrap().get(base_dir) {
if let Some(st) = map.read().unwrap().get(&key) {
return Ok(st.clone());
}
// Slow path with write lock
{
let mut w = map.write().unwrap();
if let Some(st) = w.get(base_dir) {
if let Some(st) = w.get(&key) {
return Ok(st.clone());
}
// Detect existing 0.db backend by filesystem, if present.
let admin_path = PathBuf::from(base_dir).join("0.db");
let admin_path = base_dir.join("0.db");
let detected = if admin_path.exists() {
if admin_path.is_file() {
Some(options::BackendType::Redb)
@@ -99,14 +103,14 @@ pub fn open_admin_storage(
};
let st = init_admin_storage(base_dir, effective_backend, admin_secret)?;
w.insert(base_dir.to_string(), st.clone());
w.insert(key, st.clone());
Ok(st)
}
}
// Ensure admin structures exist in encrypted DB 0
pub fn ensure_bootstrap(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
) -> Result<(), DBError> {
@@ -122,7 +126,7 @@ pub fn ensure_bootstrap(
// Get or initialize a shared handle to a data DB (> 0), avoiding double-open across subsystems
pub fn open_data_storage(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -156,7 +160,7 @@ pub fn open_data_storage(
// 2) If missing, sniff filesystem (file => Redb, dir => Sled), then persist into admin meta
// 3) Fallback to requested 'backend' (startup default) if nothing else is known
let meta_backend = get_database_backend(base_dir, backend.clone(), admin_secret, id).ok().flatten();
let db_path = PathBuf::from(base_dir).join(format!("{}.db", id));
let db_path = base_dir.join(format!("{}.db", id));
let sniffed_backend = if db_path.exists() {
if db_path.is_file() {
Some(options::BackendType::Redb)
@@ -199,6 +203,12 @@ pub fn open_data_storage(
let storage: Arc<dyn StorageBackend> = match effective_backend {
options::BackendType::Redb => Arc::new(Storage::new(&db_file, should_encrypt, enc.as_deref())?),
options::BackendType::Sled => Arc::new(SledStorage::new(&db_file, should_encrypt, enc.as_deref())?),
options::BackendType::Tantivy => {
return Err(DBError("Tantivy backend has no KV storage; use FT.* commands only".to_string()))
}
options::BackendType::Lance => {
return Err(DBError("Lance backend has no KV storage; use LANCE.* commands only".to_string()))
}
};
// Publish to registry
@@ -208,7 +218,7 @@ pub fn open_data_storage(
// Allocate the next DB id and persist new pointer
pub fn allocate_next_id(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
) -> Result<u64, DBError> {
@@ -232,7 +242,7 @@ pub fn allocate_next_id(
// Check existence of a db id in admin:dbs
pub fn db_exists(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -243,7 +253,7 @@ pub fn db_exists(
// Get per-db encryption key, if any
pub fn get_enc_key(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -254,7 +264,7 @@ pub fn get_enc_key(
// Set per-db encryption key (called during create)
pub fn set_enc_key(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -266,7 +276,7 @@ pub fn set_enc_key(
// Set database public flag
pub fn set_database_public(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -280,7 +290,7 @@ pub fn set_database_public(
// Persist per-db backend type in admin metadata (module-scope)
pub fn set_database_backend(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -291,13 +301,15 @@ pub fn set_database_backend(
let val = match db_backend {
options::BackendType::Redb => "Redb",
options::BackendType::Sled => "Sled",
options::BackendType::Tantivy => "Tantivy",
options::BackendType::Lance => "Lance",
};
let _ = admin.hset(&mk, vec![("backend".to_string(), val.to_string())])?;
Ok(())
}
pub fn get_database_backend(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -307,13 +319,15 @@ pub fn get_database_backend(
match admin.hget(&mk, "backend")? {
Some(s) if s == "Redb" => Ok(Some(options::BackendType::Redb)),
Some(s) if s == "Sled" => Ok(Some(options::BackendType::Sled)),
Some(s) if s == "Tantivy" => Ok(Some(options::BackendType::Tantivy)),
Some(s) if s == "Lance" => Ok(Some(options::BackendType::Lance)),
_ => Ok(None),
}
}
// Set database name
pub fn set_database_name(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -327,7 +341,7 @@ pub fn set_database_name(
// Get database name
pub fn get_database_name(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -351,7 +365,7 @@ fn load_public(
// Add access key for db (value format: "Read:ts" or "ReadWrite:ts")
pub fn add_access_key(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -370,7 +384,7 @@ pub fn add_access_key(
// Delete access key by hash
pub fn delete_access_key(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -383,7 +397,7 @@ pub fn delete_access_key(
// List access keys, returning (hash, perms, created_at_secs)
pub fn list_access_keys(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -403,7 +417,7 @@ pub fn list_access_keys(
// - Ok(Some(Permissions)) when access is allowed
// - Ok(None) when not allowed or db missing (caller can distinguish by calling db_exists)
pub fn verify_access(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
id: u64,
@@ -424,25 +438,31 @@ pub fn verify_access(
return Ok(None);
}
// Public?
if load_public(&admin, id)? {
return Ok(Some(Permissions::ReadWrite));
}
let is_public = load_public(&admin, id)?;
// Private: require key and verify
// If a key is explicitly provided, enforce its validity strictly.
// Do NOT fall back to public when an invalid key is supplied.
if let Some(k) = key_opt {
let hash = crate::rpc::hash_key(k);
if let Some(v) = admin.hget(&k_meta_db_keys(id), &hash)? {
let (perm, _ts) = parse_perm_value(&v);
return Ok(Some(perm));
}
// Invalid key
return Ok(None);
}
// No key provided: allow access if DB is public, otherwise deny
if is_public {
Ok(Some(Permissions::ReadWrite))
} else {
Ok(None)
}
Ok(None)
}
// Enumerate all db ids
pub fn list_dbs(
base_dir: &str,
base_dir: &Path,
backend: options::BackendType,
admin_secret: &str,
) -> Result<Vec<u64>, DBError> {

1010
src/cmd.rs

File diff suppressed because it is too large Load Diff

View File

@@ -1,8 +1,8 @@
use chacha20poly1305::{
aead::{Aead, KeyInit, OsRng},
aead::{Aead, KeyInit},
XChaCha20Poly1305, XNonce,
};
use rand::RngCore;
use rand::{rngs::OsRng, RngCore};
use sha2::{Digest, Sha256};
const VERSION: u8 = 1;
@@ -31,7 +31,7 @@ pub struct CryptoFactory {
impl CryptoFactory {
/// Accepts any secret bytes; turns them into a 32-byte key (SHA-256).
pub fn new<S: AsRef<[u8]>>(secret: S) -> Self {
let mut h = Sha256::new();
let mut h = Sha256::default();
h.update(b"xchacha20poly1305-factory:v1"); // domain separation
h.update(secret.as_ref());
let digest = h.finalize(); // 32 bytes

405
src/embedding.rs Normal file
View File

@@ -0,0 +1,405 @@
// Embedding abstraction and minimal providers.
use std::collections::HashMap;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::error::DBError;
// Networking for OpenAI/Azure
use std::time::Duration;
use ureq::{Agent, AgentBuilder};
use serde_json::json;
/// Provider identifiers. Extend as needed to mirror LanceDB-supported providers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum EmbeddingProvider {
// Deterministic, local-only embedder for CI and offline development (text).
TestHash,
// Deterministic, local-only embedder for CI and offline development (image).
ImageTestHash,
// Placeholders for LanceDB-supported providers; implementers can add concrete backends later.
LanceFastEmbed,
LanceOpenAI,
LanceOther(String),
}
/// Serializable embedding configuration.
/// params: arbitrary key-value map for provider-specific knobs (e.g., "dim", "api_key_env", etc.)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddingConfig {
pub provider: EmbeddingProvider,
pub model: String,
#[serde(default)]
pub params: HashMap<String, String>,
}
impl EmbeddingConfig {
pub fn get_param_usize(&self, key: &str) -> Option<usize> {
self.params.get(key).and_then(|v| v.parse::<usize>().ok())
}
pub fn get_param_string(&self, key: &str) -> Option<String> {
self.params.get(key).cloned()
}
}
/// A provider-agnostic text embedding interface.
pub trait Embedder: Send + Sync {
/// Human-readable provider/model name
fn name(&self) -> String;
/// Embedding dimension
fn dim(&self) -> usize;
/// Embed a single text string into a fixed-length vector
fn embed(&self, text: &str) -> Result<Vec<f32>, DBError>;
/// Embed many texts; default maps embed() over inputs
fn embed_many(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, DBError> {
texts.iter().map(|t| self.embed(t)).collect()
}
}
//// ----------------------------- TEXT: deterministic test embedder -----------------------------
/// Deterministic, no-deps, no-network embedder for CI and offline dev.
/// Algorithm:
/// - Fold bytes of UTF-8 into 'dim' buckets with a simple rolling hash
/// - Apply tanh-like scaling and L2-normalize to unit length
pub struct TestHashEmbedder {
dim: usize,
model_name: String,
}
impl TestHashEmbedder {
pub fn new(dim: usize, model_name: impl Into<String>) -> Self {
Self { dim, model_name: model_name.into() }
}
fn l2_normalize(mut v: Vec<f32>) -> Vec<f32> {
let norm: f32 = v.iter().map(|x| x * x).sum::<f32>().sqrt();
if norm > 0.0 {
for x in &mut v {
*x /= norm;
}
}
v
}
}
impl Embedder for TestHashEmbedder {
fn name(&self) -> String {
format!("test-hash:{}", self.model_name)
}
fn dim(&self) -> usize {
self.dim
}
fn embed(&self, text: &str) -> Result<Vec<f32>, DBError> {
let mut acc = vec![0f32; self.dim];
// A simple, deterministic folding hash over bytes
let mut h1: u32 = 2166136261u32; // FNV-like seed
let mut h2: u32 = 0x9e3779b9u32; // golden ratio
for (i, b) in text.as_bytes().iter().enumerate() {
h1 ^= *b as u32;
h1 = h1.wrapping_mul(16777619u32);
h2 = h2.wrapping_add(((*b as u32) << (i % 13)) ^ (h1.rotate_left((i % 7) as u32)));
let idx = (h1 ^ h2) as usize % self.dim;
// Map byte to [-1, 1] and accumulate with mild decay by position
let val = ((*b as f32) / 127.5 - 1.0) * (1.0 / (1.0 + (i as f32 / 32.0)));
acc[idx] += val;
}
// Non-linear squashing to stabilize + normalize
for x in &mut acc {
*x = x.tanh();
}
Ok(Self::l2_normalize(acc))
}
}
//// ----------------------------- IMAGE: trait + deterministic test embedder -----------------------------
/// Image embedding interface (separate from text to keep modality-specific inputs).
pub trait ImageEmbedder: Send + Sync {
/// Human-readable provider/model name
fn name(&self) -> String;
/// Embedding dimension
fn dim(&self) -> usize;
/// Embed a single image (raw bytes)
fn embed_image(&self, bytes: &[u8]) -> Result<Vec<f32>, DBError>;
/// Embed many images; default maps embed_image() over inputs
fn embed_many_images(&self, images: &[Vec<u8>]) -> Result<Vec<Vec<f32>>, DBError> {
images.iter().map(|b| self.embed_image(b)).collect()
}
}
/// Deterministic image embedder that folds bytes into buckets, applies tanh-like nonlinearity,
/// and L2-normalizes. Suitable for CI and offline development.
/// NOTE: This is NOT semantic; it is a stable hash-like representation.
pub struct TestImageHashEmbedder {
dim: usize,
model_name: String,
}
impl TestImageHashEmbedder {
pub fn new(dim: usize, model_name: impl Into<String>) -> Self {
Self { dim, model_name: model_name.into() }
}
fn l2_normalize(mut v: Vec<f32>) -> Vec<f32> {
let norm: f32 = v.iter().map(|x| x * x).sum::<f32>().sqrt();
if norm > 0.0 {
for x in &mut v {
*x /= norm;
}
}
v
}
}
impl ImageEmbedder for TestImageHashEmbedder {
fn name(&self) -> String {
format!("test-image-hash:{}", self.model_name)
}
fn dim(&self) -> usize {
self.dim
}
fn embed_image(&self, bytes: &[u8]) -> Result<Vec<f32>, DBError> {
// Deterministic fold across bytes with two rolling accumulators.
let mut acc = vec![0f32; self.dim];
let mut h1: u32 = 0x811C9DC5; // FNV-like
let mut h2: u32 = 0x9E3779B9; // golden ratio
for (i, b) in bytes.iter().enumerate() {
h1 ^= *b as u32;
h1 = h1.wrapping_mul(16777619u32);
// combine with position and h2
h2 = h2.wrapping_add(((i as u32).rotate_left((i % 13) as u32)) ^ h1.rotate_left((i % 7) as u32));
let idx = (h1 ^ h2) as usize % self.dim;
// Map to [-1,1] and decay with position
let val = ((*b as f32) / 127.5 - 1.0) * (1.0 / (1.0 + (i as f32 / 128.0)));
acc[idx] += val;
}
for x in &mut acc {
*x = x.tanh();
}
Ok(Self::l2_normalize(acc))
}
}
//// OpenAI embedder (supports OpenAI and Azure OpenAI via REST)
struct OpenAIEmbedder {
model: String,
dim: usize,
agent: Agent,
endpoint: String,
headers: Vec<(String, String)>,
use_azure: bool,
}
impl OpenAIEmbedder {
fn new_from_config(cfg: &EmbeddingConfig) -> Result<Self, DBError> {
// Whether to use Azure OpenAI
let use_azure = cfg
.get_param_string("use_azure")
.map(|s| s.eq_ignore_ascii_case("true"))
.unwrap_or(false);
// Resolve API key (OPENAI_API_KEY or AZURE_OPENAI_API_KEY by default)
let api_key_env = cfg
.get_param_string("api_key_env")
.unwrap_or_else(|| {
if use_azure {
"AZURE_OPENAI_API_KEY".to_string()
} else {
"OPENAI_API_KEY".to_string()
}
});
let api_key = std::env::var(&api_key_env)
.map_err(|_| DBError(format!("Missing API key in env '{}'", api_key_env)))?;
// Resolve endpoint
// - Standard OpenAI: https://api.openai.com/v1/embeddings (default) or params["base_url"]
// - Azure OpenAI: {azure_endpoint}/openai/deployments/{deployment}/embeddings?api-version=...
let endpoint = if use_azure {
let base = cfg
.get_param_string("azure_endpoint")
.ok_or_else(|| DBError("Missing 'azure_endpoint' for Azure OpenAI".into()))?;
let deployment = cfg
.get_param_string("azure_deployment")
.unwrap_or_else(|| cfg.model.clone());
let api_version = cfg
.get_param_string("azure_api_version")
.unwrap_or_else(|| "2023-05-15".to_string());
format!(
"{}/openai/deployments/{}/embeddings?api-version={}",
base.trim_end_matches('/'),
deployment,
api_version
)
} else {
cfg.get_param_string("base_url")
.unwrap_or_else(|| "https://api.openai.com/v1/embeddings".to_string())
};
// Determine expected dimension (default 1536 for text-embedding-3-small; callers should override if needed)
let dim = cfg
.get_param_usize("dim")
.or_else(|| cfg.get_param_usize("dimensions"))
.unwrap_or(1536);
// Build an HTTP agent with timeouts (blocking; no tokio runtime involved)
let agent = AgentBuilder::new()
.timeout_read(Duration::from_secs(30))
.timeout_write(Duration::from_secs(30))
.build();
// Headers
let mut headers: Vec<(String, String)> = Vec::new();
headers.push(("Content-Type".to_string(), "application/json".to_string()));
if use_azure {
headers.push(("api-key".to_string(), api_key));
} else {
headers.push(("Authorization".to_string(), format!("Bearer {}", api_key)));
}
Ok(Self {
model: cfg.model.clone(),
dim,
agent,
endpoint,
headers,
use_azure,
})
}
fn request_many(&self, inputs: &[String]) -> Result<Vec<Vec<f32>>, DBError> {
// Compose request body:
// - Standard OpenAI: { "model": ..., "input": [...], "dimensions": dim? }
// - Azure: { "input": [...], "dimensions": dim? } (model from deployment)
let mut body = if self.use_azure {
json!({ "input": inputs })
} else {
json!({ "model": self.model, "input": inputs })
};
if self.dim > 0 {
body.as_object_mut()
.unwrap()
.insert("dimensions".to_string(), json!(self.dim));
}
// Build request
let mut req = self.agent.post(&self.endpoint);
for (k, v) in &self.headers {
req = req.set(k, v);
}
// Send and handle errors
let resp = req.send_json(body);
let text = match resp {
Ok(r) => r
.into_string()
.map_err(|e| DBError(format!("Failed to read embeddings response: {}", e)))?,
Err(ureq::Error::Status(code, r)) => {
let body = r.into_string().unwrap_or_default();
return Err(DBError(format!("Embeddings API error {}: {}", code, body)));
}
Err(e) => return Err(DBError(format!("HTTP request failed: {}", e))),
};
let val: serde_json::Value = serde_json::from_str(&text)
.map_err(|e| DBError(format!("Invalid JSON from embeddings API: {}", e)))?;
let data = val
.get("data")
.and_then(|d| d.as_array())
.ok_or_else(|| DBError("Embeddings API response missing 'data' array".into()))?;
let mut out: Vec<Vec<f32>> = Vec::with_capacity(data.len());
for item in data {
let emb = item
.get("embedding")
.and_then(|e| e.as_array())
.ok_or_else(|| DBError("Embeddings API item missing 'embedding'".into()))?;
let mut v: Vec<f32> = Vec::with_capacity(emb.len());
for n in emb {
let f = n
.as_f64()
.ok_or_else(|| DBError("Embedding element is not a number".into()))?;
v.push(f as f32);
}
if self.dim > 0 && v.len() != self.dim {
return Err(DBError(format!(
"Embedding dimension mismatch: expected {}, got {}. Configure 'dim' or 'dimensions' to match output.",
self.dim, v.len()
)));
}
out.push(v);
}
Ok(out)
}
}
impl Embedder for OpenAIEmbedder {
fn name(&self) -> String {
if self.use_azure {
format!("azure-openai:{}", self.model)
} else {
format!("openai:{}", self.model)
}
}
fn dim(&self) -> usize {
self.dim
}
fn embed(&self, text: &str) -> Result<Vec<f32>, DBError> {
let v = self.request_many(&[text.to_string()])?;
Ok(v.into_iter().next().unwrap_or_else(|| vec![0.0; self.dim]))
}
fn embed_many(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, DBError> {
if texts.is_empty() {
return Ok(vec![]);
}
self.request_many(texts)
}
}
/// Create an embedder instance from a config.
/// - TestHash: uses params["dim"] or defaults to 64
/// - LanceOpenAI: uses OpenAI (or Azure OpenAI) embeddings REST API
/// - Other Lance providers can be added similarly
pub fn create_embedder(config: &EmbeddingConfig) -> Result<Arc<dyn Embedder>, DBError> {
match &config.provider {
EmbeddingProvider::TestHash => {
let dim = config.get_param_usize("dim").unwrap_or(64);
Ok(Arc::new(TestHashEmbedder::new(dim, config.model.clone())))
}
EmbeddingProvider::LanceOpenAI => {
let inner = OpenAIEmbedder::new_from_config(config)?;
Ok(Arc::new(inner))
}
EmbeddingProvider::ImageTestHash => {
Err(DBError("Use create_image_embedder() for image providers".into()))
}
EmbeddingProvider::LanceFastEmbed => Err(DBError("LanceFastEmbed provider not yet implemented in Rust embedding layer; configure 'test-hash' or use 'openai'".into())),
EmbeddingProvider::LanceOther(p) => Err(DBError(format!("Lance provider '{}' not implemented; configure 'openai' or 'test-hash'", p))),
}
}
/// Create an image embedder instance from a config.
pub fn create_image_embedder(config: &EmbeddingConfig) -> Result<Arc<dyn ImageEmbedder>, DBError> {
match &config.provider {
EmbeddingProvider::ImageTestHash => {
let dim = config.get_param_usize("dim").unwrap_or(512);
Ok(Arc::new(TestImageHashEmbedder::new(dim, config.model.clone())))
}
EmbeddingProvider::TestHash | EmbeddingProvider::LanceOpenAI => {
Err(DBError("Configured text provider; dataset expects image provider (e.g., 'testimagehash')".into()))
}
EmbeddingProvider::LanceFastEmbed => Err(DBError("Image provider 'lancefastembed' not yet implemented".into())),
EmbeddingProvider::LanceOther(p) => Err(DBError(format!("Image provider '{}' not implemented; use 'testimagehash' for now", p))),
}
}

663
src/lance_store.rs Normal file
View File

@@ -0,0 +1,663 @@
// LanceDB store abstraction (per database instance)
// This module encapsulates all Lance/LanceDB operations for a given DB id.
// Notes:
// - We persist each dataset (aka "table") under <base_dir>/lance/<db_id>/<name>.lance
// - Schema convention: id: Utf8 (non-null), vector: FixedSizeList<Float32, dim> (non-null), meta: Utf8 (nullable JSON string)
// - We implement naive KNN (L2) scan in Rust for search to avoid tight coupling to lancedb search builder API.
// Index creation uses lance::Dataset vector index; future optimization can route to index-aware search.
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::error::DBError;
use arrow_array::{Array, RecordBatch, RecordBatchIterator, StringArray};
use arrow_array::builder::{FixedSizeListBuilder, Float32Builder, StringBuilder};
use arrow_array::cast::AsArray;
use arrow_schema::{DataType, Field, Schema};
use futures::StreamExt;
use serde_json::Value as JsonValue;
// Low-level Lance core
use lance::dataset::{WriteMode, WriteParams};
use lance::Dataset;
// Vector index (IVF_PQ etc.)
// High-level LanceDB (for deletes where available)
use lancedb::connection::Connection;
use arrow_array::types::Float32Type;
#[derive(Clone)]
pub struct LanceStore {
base_dir: PathBuf,
db_id: u64,
}
impl LanceStore {
// Create a new LanceStore rooted at <base_dir>/lance/<db_id>
pub fn new(base_dir: &Path, db_id: u64) -> Result<Self, DBError> {
let p = base_dir.join("lance").join(db_id.to_string());
std::fs::create_dir_all(&p)
.map_err(|e| DBError(format!("Failed to create Lance dir {}: {}", p.display(), e)))?;
Ok(Self { base_dir: p, db_id })
}
fn dataset_path(&self, name: &str) -> PathBuf {
// Store datasets as directories or files with .lance suffix
// We accept both "<name>" and "<name>.lance" as logical name; normalize on ".lance"
let has_ext = name.ends_with(".lance");
if has_ext {
self.base_dir.join(name)
} else {
self.base_dir.join(format!("{name}.lance"))
}
}
fn file_uri(path: &Path) -> String {
// lancedb can use filesystem path directly; keep it simple
// Avoid file:// scheme since local paths are supported.
path.to_string_lossy().to_string()
}
async fn connect_db(&self) -> Result<Connection, DBError> {
let uri = Self::file_uri(&self.base_dir);
lancedb::connect(&uri)
.execute()
.await
.map_err(|e| DBError(format!("LanceDB connect failed at {}: {}", uri, e)))
}
fn vector_field(dim: i32) -> Field {
Field::new(
"vector",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), dim),
false,
)
}
async fn read_existing_dim(&self, name: &str) -> Result<Option<i32>, DBError> {
let path = self.dataset_path(name);
if !path.exists() {
return Ok(None);
}
let ds = Dataset::open(path.to_string_lossy().as_ref())
.await
.map_err(|e| DBError(format!("Open dataset failed: {}: {}", path.display(), e)))?;
// Scan a single batch to infer vector dimension from the 'vector' column type
let mut scan = ds.scan();
if let Err(e) = scan.project(&["vector"]) {
return Err(DBError(format!("Project failed while inferring dim: {}", e)));
}
let mut stream = scan
.try_into_stream()
.await
.map_err(|e| DBError(format!("Scan stream failed while inferring dim: {}", e)))?;
if let Some(batch_res) = stream.next().await {
let batch = batch_res.map_err(|e| DBError(format!("Batch error: {}", e)))?;
let vec_col = batch
.column_by_name("vector")
.ok_or_else(|| DBError("Column 'vector' missing".into()))?;
let fsl = vec_col.as_fixed_size_list();
let dim = fsl.value_length();
return Ok(Some(dim));
}
Ok(None)
}
fn build_schema(dim: i32) -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Self::vector_field(dim),
Field::new("text", DataType::Utf8, true),
Field::new("media_type", DataType::Utf8, true),
Field::new("media_uri", DataType::Utf8, true),
Field::new("meta", DataType::Utf8, true),
]))
}
fn build_one_row_batch(
id: &str,
vector: &[f32],
meta: &HashMap<String, String>,
text: Option<&str>,
media_type: Option<&str>,
media_uri: Option<&str>,
dim: i32,
) -> Result<(Arc<Schema>, RecordBatch), DBError> {
if vector.len() as i32 != dim {
return Err(DBError(format!(
"Vector length mismatch: expected {}, got {}",
dim,
vector.len()
)));
}
let schema = Self::build_schema(dim);
// id column
let mut id_builder = StringBuilder::new();
id_builder.append_value(id);
let id_arr = Arc::new(id_builder.finish()) as Arc<dyn Array>;
// vector column (FixedSizeList<Float32, dim>)
let v_builder = Float32Builder::with_capacity(vector.len());
let mut list_builder = FixedSizeListBuilder::new(v_builder, dim);
for v in vector {
list_builder.values().append_value(*v);
}
list_builder.append(true);
let vec_arr = Arc::new(list_builder.finish()) as Arc<dyn Array>;
// text column (optional)
let mut text_builder = StringBuilder::new();
if let Some(t) = text {
text_builder.append_value(t);
} else {
text_builder.append_null();
}
let text_arr = Arc::new(text_builder.finish()) as Arc<dyn Array>;
// media_type column (optional)
let mut mt_builder = StringBuilder::new();
if let Some(mt) = media_type {
mt_builder.append_value(mt);
} else {
mt_builder.append_null();
}
let mt_arr = Arc::new(mt_builder.finish()) as Arc<dyn Array>;
// media_uri column (optional)
let mut mu_builder = StringBuilder::new();
if let Some(mu) = media_uri {
mu_builder.append_value(mu);
} else {
mu_builder.append_null();
}
let mu_arr = Arc::new(mu_builder.finish()) as Arc<dyn Array>;
// meta column (JSON string)
let meta_json = if meta.is_empty() {
None
} else {
Some(serde_json::to_string(meta).map_err(|e| DBError(format!("Serialize meta error: {e}")))?)
};
let mut meta_builder = StringBuilder::new();
if let Some(s) = meta_json {
meta_builder.append_value(&s);
} else {
meta_builder.append_null();
}
let meta_arr = Arc::new(meta_builder.finish()) as Arc<dyn Array>;
let batch =
RecordBatch::try_new(schema.clone(), vec![id_arr, vec_arr, text_arr, mt_arr, mu_arr, meta_arr]).map_err(|e| {
DBError(format!("RecordBatch build failed: {e}"))
})?;
Ok((schema, batch))
}
// Create a new dataset (vector collection) with dimension `dim`.
pub async fn create_dataset(&self, name: &str, dim: usize) -> Result<(), DBError> {
let dim_i32: i32 = dim
.try_into()
.map_err(|_| DBError("Dimension too large".into()))?;
let path = self.dataset_path(name);
if path.exists() {
// Validate dimension if present
if let Some(existing_dim) = self.read_existing_dim(name).await? {
if existing_dim != dim_i32 {
return Err(DBError(format!(
"Dataset '{}' already exists with dim {}, requested {}",
name, existing_dim, dim_i32
)));
}
// No-op
return Ok(());
}
}
// Create an empty dataset by writing an empty batch
let schema = Self::build_schema(dim_i32);
let empty_id = Arc::new(StringArray::new_null(0));
// Build an empty FixedSizeListArray
let v_builder = Float32Builder::new();
let mut list_builder = FixedSizeListBuilder::new(v_builder, dim_i32);
let empty_vec = Arc::new(list_builder.finish()) as Arc<dyn Array>;
let empty_text = Arc::new(StringArray::new_null(0));
let empty_media_type = Arc::new(StringArray::new_null(0));
let empty_media_uri = Arc::new(StringArray::new_null(0));
let empty_meta = Arc::new(StringArray::new_null(0));
let empty_batch =
RecordBatch::try_new(schema.clone(), vec![empty_id, empty_vec, empty_text, empty_media_type, empty_media_uri, empty_meta])
.map_err(|e| DBError(format!("Build empty batch failed: {e}")))?;
let write_params = WriteParams {
mode: WriteMode::Create,
..Default::default()
};
let reader = RecordBatchIterator::new([Ok(empty_batch)], schema.clone());
Dataset::write(reader, path.to_string_lossy().as_ref(), Some(write_params))
.await
.map_err(|e| DBError(format!("Create dataset failed at {}: {}", path.display(), e)))?;
Ok(())
}
// Store/Upsert a single vector with ID and optional metadata (append; duplicate IDs are possible for now)
pub async fn store_vector(
&self,
name: &str,
id: &str,
vector: Vec<f32>,
meta: HashMap<String, String>,
text: Option<String>,
) -> Result<(), DBError> {
// Delegate to media-aware path with no media fields
self.store_vector_with_media(name, id, vector, meta, text, None, None).await
}
/// Store/Upsert a single vector with optional text and media fields (media_type/media_uri).
pub async fn store_vector_with_media(
&self,
name: &str,
id: &str,
vector: Vec<f32>,
meta: HashMap<String, String>,
text: Option<String>,
media_type: Option<String>,
media_uri: Option<String>,
) -> Result<(), DBError> {
let path = self.dataset_path(name);
// Determine dimension: use existing or infer from vector
let dim_i32 = if let Some(d) = self.read_existing_dim(name).await? {
d
} else {
vector
.len()
.try_into()
.map_err(|_| DBError("Vector length too large".into()))?
};
let (schema, batch) = Self::build_one_row_batch(
id,
&vector,
&meta,
text.as_deref(),
media_type.as_deref(),
media_uri.as_deref(),
dim_i32,
)?;
// If LanceDB table exists and provides delete, we can upsert by deleting same id
// Try best-effort delete; ignore errors to keep operation append-only on failure
if path.exists() {
if let Ok(conn) = self.connect_db().await {
if let Ok(mut tbl) = conn.open_table(name).execute().await {
let _ = tbl
.delete(&format!("id = '{}'", id.replace('\'', "''")))
.await;
}
}
}
let write_params = WriteParams {
mode: if path.exists() {
WriteMode::Append
} else {
WriteMode::Create
},
..Default::default()
};
let reader = RecordBatchIterator::new([Ok(batch)], schema.clone());
Dataset::write(reader, path.to_string_lossy().as_ref(), Some(write_params))
.await
.map_err(|e| DBError(format!("Write (append/create) failed: {}", e)))?;
Ok(())
}
// Delete a record by ID (best-effort; returns true if delete likely removed rows)
pub async fn delete_by_id(&self, name: &str, id: &str) -> Result<bool, DBError> {
let path = self.dataset_path(name);
if !path.exists() {
return Ok(false);
}
let conn = self.connect_db().await?;
let mut tbl = conn
.open_table(name)
.execute()
.await
.map_err(|e| DBError(format!("Open table '{}' failed: {}", name, e)))?;
// SQL-like predicate quoting
let pred = format!("id = '{}'", id.replace('\'', "''"));
// lancedb returns count or () depending on version; treat Ok as success
match tbl.delete(&pred).await {
Ok(_) => Ok(true),
Err(e) => Err(DBError(format!("Delete failed: {}", e))),
}
}
// Drop the entire dataset
pub async fn drop_dataset(&self, name: &str) -> Result<bool, DBError> {
let path = self.dataset_path(name);
// Try LanceDB drop first
// Best-effort logical drop via lancedb if available; ignore failures.
// Note: we rely on filesystem removal below for final cleanup.
if let Ok(conn) = self.connect_db().await {
if let Ok(mut t) = conn.open_table(name).execute().await {
// Best-effort delete-all to reduce footprint prior to fs removal
let _ = t.delete("true").await;
}
}
if path.exists() {
if path.is_dir() {
std::fs::remove_dir_all(&path)
.map_err(|e| DBError(format!("Failed to drop dataset '{}': {}", name, e)))?;
} else {
std::fs::remove_file(&path)
.map_err(|e| DBError(format!("Failed to drop dataset '{}': {}", name, e)))?;
}
return Ok(true);
}
Ok(false)
}
// Search top-k nearest with optional filter; returns tuple of (id, score (lower=L2), meta)
pub async fn search_vectors(
&self,
name: &str,
query: Vec<f32>,
k: usize,
filter: Option<String>,
return_fields: Option<Vec<String>>,
) -> Result<Vec<(String, f32, HashMap<String, String>)>, DBError> {
let path = self.dataset_path(name);
if !path.exists() {
return Err(DBError(format!("Dataset '{}' not found", name)));
}
// Determine dim and validate query length
let dim_i32 = self
.read_existing_dim(name)
.await?
.ok_or_else(|| DBError("Vector column not found".into()))?;
if query.len() as i32 != dim_i32 {
return Err(DBError(format!(
"Query vector length mismatch: expected {}, got {}",
dim_i32,
query.len()
)));
}
let ds = Dataset::open(path.to_string_lossy().as_ref())
.await
.map_err(|e| DBError(format!("Open dataset failed: {}", e)))?;
// Build scanner with projection; we project needed fields and filter client-side to support meta keys
let mut scan = ds.scan();
if let Err(e) = scan.project(&["id", "vector", "meta", "text", "media_type", "media_uri"]) {
return Err(DBError(format!("Project failed: {}", e)));
}
// Note: we no longer push down filter to Lance to allow filtering on meta fields client-side.
let mut stream = scan
.try_into_stream()
.await
.map_err(|e| DBError(format!("Scan stream failed: {}", e)))?;
// Parse simple equality clause from filter for client-side filtering (supports one `key = 'value'`)
let clause = filter.as_ref().and_then(|s| {
fn parse_eq(s: &str) -> Option<(String, String)> {
let s = s.trim();
let pos = s.find('=').or_else(|| s.find(" = "))?;
let (k, vraw) = s.split_at(pos);
let mut v = vraw.trim_start_matches('=').trim();
if (v.starts_with('\'') && v.ends_with('\'')) || (v.starts_with('"') && v.ends_with('"')) {
if v.len() >= 2 {
v = &v[1..v.len()-1];
}
}
let key = k.trim().trim_matches('"').trim_matches('\'').to_string();
if key.is_empty() { return None; }
Some((key, v.to_string()))
}
parse_eq(s)
});
// Maintain a max-heap with reverse ordering to keep top-k smallest distances
#[derive(Debug)]
struct Hit {
dist: f32,
id: String,
meta: HashMap<String, String>,
}
impl PartialEq for Hit {
fn eq(&self, other: &Self) -> bool {
self.dist.eq(&other.dist)
}
}
impl Eq for Hit {}
impl PartialOrd for Hit {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
// Reverse for max-heap: larger distance = "greater"
other.dist.partial_cmp(&self.dist)
}
}
impl Ord for Hit {
fn cmp(&self, other: &Self) -> Ordering {
self.partial_cmp(other).unwrap_or(Ordering::Equal)
}
}
let mut heap: BinaryHeap<Hit> = BinaryHeap::with_capacity(k);
while let Some(batch_res) = stream.next().await {
let batch = batch_res.map_err(|e| DBError(format!("Stream batch error: {}", e)))?;
let id_arr = batch
.column_by_name("id")
.ok_or_else(|| DBError("Column 'id' missing".into()))?
.as_string::<i32>();
let vec_arr = batch
.column_by_name("vector")
.ok_or_else(|| DBError("Column 'vector' missing".into()))?
.as_fixed_size_list();
let meta_arr = batch
.column_by_name("meta")
.map(|a| a.as_string::<i32>().clone());
let text_arr = batch
.column_by_name("text")
.map(|a| a.as_string::<i32>().clone());
let mt_arr = batch
.column_by_name("media_type")
.map(|a| a.as_string::<i32>().clone());
let mu_arr = batch
.column_by_name("media_uri")
.map(|a| a.as_string::<i32>().clone());
for i in 0..batch.num_rows() {
// Extract id
let id_val = id_arr.value(i).to_string();
// Parse meta JSON if present
let mut meta: HashMap<String, String> = HashMap::new();
if let Some(meta_col) = &meta_arr {
if !meta_col.is_null(i) {
let s = meta_col.value(i);
if let Ok(JsonValue::Object(map)) = serde_json::from_str::<JsonValue>(s) {
for (k, v) in map {
if let Some(vs) = v.as_str() {
meta.insert(k, vs.to_string());
} else if v.is_number() || v.is_boolean() {
meta.insert(k, v.to_string());
}
}
}
}
}
// Evaluate simple equality filter if provided (supports one clause)
let passes = if let Some((ref key, ref val)) = clause {
let candidate = match key.as_str() {
"id" => Some(id_val.clone()),
"text" => text_arr.as_ref().and_then(|col| if col.is_null(i) { None } else { Some(col.value(i).to_string()) }),
"media_type" => mt_arr.as_ref().and_then(|col| if col.is_null(i) { None } else { Some(col.value(i).to_string()) }),
"media_uri" => mu_arr.as_ref().and_then(|col| if col.is_null(i) { None } else { Some(col.value(i).to_string()) }),
_ => meta.get(key).cloned(),
};
match candidate {
Some(cv) => cv == *val,
None => false,
}
} else { true };
if !passes {
continue;
}
// Compute L2 distance
let val = vec_arr.value(i);
let prim = val.as_primitive::<Float32Type>();
let mut dist: f32 = 0.0;
let plen = prim.len();
for j in 0..plen {
let r = prim.value(j);
let d = query[j] - r;
dist += d * d;
}
// Apply return_fields on meta
let mut meta_out = meta;
if let Some(fields) = &return_fields {
let mut filtered = HashMap::new();
for f in fields {
if let Some(val) = meta_out.get(f) {
filtered.insert(f.clone(), val.clone());
}
}
meta_out = filtered;
}
let hit = Hit { dist, id: id_val, meta: meta_out };
if heap.len() < k {
heap.push(hit);
} else if let Some(top) = heap.peek() {
if hit.dist < top.dist {
heap.pop();
heap.push(hit);
}
}
}
}
// Extract and sort ascending by distance
let mut hits: Vec<Hit> = heap.into_sorted_vec(); // already ascending by dist due to Ord
let out = hits
.drain(..)
.map(|h| (h.id, h.dist, h.meta))
.collect::<Vec<_>>();
Ok(out)
}
// Create an ANN index on the vector column (IVF_PQ or similar)
pub async fn create_index(
&self,
name: &str,
index_type: &str,
params: HashMap<String, String>,
) -> Result<(), DBError> {
let path = self.dataset_path(name);
if !path.exists() {
return Err(DBError(format!("Dataset '{}' not found", name)));
}
// Attempt to create a vector index using lance low-level API if available.
// Some crate versions hide IndexType; to ensure build stability, we fall back to a no-op if the API is not accessible.
let _ = (index_type, params); // currently unused; reserved for future tuning
// TODO: Implement using lance::Dataset::create_index when public API is stable across versions.
// For now, succeed as a no-op to keep flows working; search will operate as brute-force scan.
Ok(())
}
// List datasets (tables) under this DB (show user-level logical names without .lance)
pub async fn list_datasets(&self) -> Result<Vec<String>, DBError> {
let mut out = Vec::new();
if self.base_dir.exists() {
if let Ok(rd) = std::fs::read_dir(&self.base_dir) {
for entry in rd.flatten() {
let p = entry.path();
if let Some(name) = p.file_name().and_then(|s| s.to_str()) {
// Only list .lance datasets
if name.ends_with(".lance") {
out.push(name.trim_end_matches(".lance").to_string());
}
}
}
}
}
Ok(out)
}
// Return basic dataset info map
pub async fn get_dataset_info(&self, name: &str) -> Result<HashMap<String, String>, DBError> {
let path = self.dataset_path(name);
let mut m = HashMap::new();
m.insert("name".to_string(), name.to_string());
m.insert("path".to_string(), path.display().to_string());
if !path.exists() {
return Err(DBError(format!("Dataset '{}' not found", name)));
}
let ds = Dataset::open(path.to_string_lossy().as_ref())
.await
.map_err(|e| DBError(format!("Open dataset failed: {}", e)))?;
// dim: infer by scanning first batch
let mut dim_str = "unknown".to_string();
{
let mut scan = ds.scan();
if scan.project(&["vector"]).is_ok() {
if let Ok(mut stream) = scan.try_into_stream().await {
if let Some(batch_res) = stream.next().await {
if let Ok(batch) = batch_res {
if let Some(col) = batch.column_by_name("vector") {
let fsl = col.as_fixed_size_list();
dim_str = fsl.value_length().to_string();
}
}
}
}
}
}
m.insert("dimension".to_string(), dim_str);
// row_count (approximate by scanning)
let mut scan = ds.scan();
if let Err(e) = scan.project(&["id"]) {
return Err(DBError(format!("Project failed: {e}")));
}
let mut stream = scan
.try_into_stream()
.await
.map_err(|e| DBError(format!("Scan failed: {e}")))?;
let mut rows: usize = 0;
while let Some(batch_res) = stream.next().await {
let batch = batch_res.map_err(|e| DBError(format!("Scan batch error: {}", e)))?;
rows += batch.num_rows();
}
m.insert("row_count".to_string(), rows.to_string());
// indexes: we cant easily enumerate; set to "unknown" (future: read index metadata)
m.insert("indexes".to_string(), "unknown".to_string());
Ok(m)
}
}

View File

@@ -12,3 +12,7 @@ pub mod storage;
pub mod storage_trait;
pub mod storage_sled;
pub mod admin_meta;
pub mod tantivy_search;
pub mod search_cmd;
pub mod lance_store;
pub mod embedding;

View File

@@ -1,5 +1,6 @@
// #![allow(unused_imports)]
use std::path::PathBuf;
use tokio::net::TcpListener;
use herodb::server;
@@ -13,7 +14,7 @@ use clap::Parser;
struct Args {
/// The directory of Redis DB file
#[arg(long)]
dir: String,
dir: PathBuf,
/// The port of the Redis server, default is 6379 if not specified
#[arg(long)]

View File

@@ -1,12 +1,16 @@
use std::path::PathBuf;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BackendType {
Redb,
Sled,
Tantivy, // Full-text search backend (no KV storage)
Lance, // Vector database backend (no KV storage)
}
#[derive(Debug, Clone)]
pub struct DBOption {
pub dir: String,
pub dir: PathBuf,
pub port: u16,
pub debug: bool,
// Deprecated for data DBs; retained for backward-compat on CLI parsing

File diff suppressed because it is too large Load Diff

View File

@@ -1,11 +1,12 @@
use std::net::SocketAddr;
use std::path::PathBuf;
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use jsonrpsee::RpcModule;
use crate::rpc::{RpcServer, RpcServerImpl};
/// Start the RPC server on the specified address
pub async fn start_rpc_server(addr: SocketAddr, base_dir: String, backend: crate::options::BackendType, admin_secret: String) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> {
pub async fn start_rpc_server(addr: SocketAddr, base_dir: PathBuf, backend: crate::options::BackendType, admin_secret: String) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> {
// Create the RPC server implementation
let rpc_impl = RpcServerImpl::new(base_dir, backend, admin_secret);
@@ -34,7 +35,7 @@ mod tests {
#[tokio::test]
async fn test_rpc_server_startup() {
let addr = "127.0.0.1:0".parse().unwrap(); // Use port 0 for auto-assignment
let base_dir = "/tmp/test_rpc".to_string();
let base_dir = PathBuf::from("/tmp/test_rpc");
let backend = crate::options::BackendType::Redb; // Default for test
let handle = start_rpc_server(addr, base_dir, backend, "test-admin".to_string()).await.unwrap();

378
src/search_cmd.rs Normal file
View File

@@ -0,0 +1,378 @@
use crate::{
error::DBError,
protocol::Protocol,
server::Server,
tantivy_search::{
FieldDef, Filter, FilterType, IndexConfig, NumericType, SearchOptions, TantivySearch,
},
};
use std::collections::HashMap;
use std::sync::Arc;
pub async fn ft_create_cmd(
server: &Server,
index_name: String,
schema: Vec<(String, String, Vec<String>)>,
) -> Result<Protocol, DBError> {
if server.selected_db == 0 {
return Ok(Protocol::err("FT commands are not allowed on DB 0"));
}
// Enforce Tantivy backend for selected DB
let is_tantivy = crate::admin_meta::get_database_backend(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
server.selected_db,
)
.ok()
.flatten()
.map(|b| matches!(b, crate::options::BackendType::Tantivy))
.unwrap_or(false);
if !is_tantivy {
return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
}
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
// Parse schema into field definitions
let mut field_definitions = Vec::new();
for (field_name, field_type, options) in schema {
let field_def = match field_type.to_uppercase().as_str() {
"TEXT" => {
let mut sortable = false;
let mut no_index = false;
// Weight is not used in current implementation
let mut _weight = 1.0f32;
let mut i = 0;
while i < options.len() {
match options[i].to_uppercase().as_str() {
"WEIGHT" => {
if i + 1 < options.len() {
_weight = options[i + 1].parse::<f32>().unwrap_or(1.0);
i += 2;
continue;
}
}
"SORTABLE" => {
sortable = true;
}
"NOINDEX" => {
no_index = true;
}
_ => {}
}
i += 1;
}
FieldDef::Text {
stored: true,
indexed: !no_index,
tokenized: true,
fast: sortable,
}
}
"NUMERIC" => {
// default to F64
let mut sortable = false;
for opt in &options {
if opt.to_uppercase() == "SORTABLE" {
sortable = true;
}
}
FieldDef::Numeric {
stored: true,
indexed: true,
fast: sortable,
precision: NumericType::F64,
}
}
"TAG" => {
let mut separator = ",".to_string();
let mut case_sensitive = false;
let mut i = 0;
while i < options.len() {
match options[i].to_uppercase().as_str() {
"SEPARATOR" => {
if i + 1 < options.len() {
separator = options[i + 1].clone();
i += 2;
continue;
}
}
"CASESENSITIVE" => {
case_sensitive = true;
}
_ => {}
}
i += 1;
}
FieldDef::Tag {
stored: true,
separator,
case_sensitive,
}
}
"GEO" => FieldDef::Geo { stored: true },
_ => {
return Err(DBError(format!("Unknown field type: {}", field_type)));
}
};
field_definitions.push((field_name, field_def));
}
// Create the search index
let search_path = server.search_index_path();
let config = IndexConfig::default();
let search_index = TantivySearch::new_with_schema(
search_path,
index_name.clone(),
field_definitions,
Some(config),
)?;
// Store in registry
let mut indexes = server.search_indexes.write().unwrap();
indexes.insert(index_name, Arc::new(search_index));
Ok(Protocol::SimpleString("OK".to_string()))
}
pub async fn ft_add_cmd(
server: &Server,
index_name: String,
doc_id: String,
_score: f64,
fields: HashMap<String, String>,
) -> Result<Protocol, DBError> {
if server.selected_db == 0 {
return Ok(Protocol::err("FT commands are not allowed on DB 0"));
}
// Enforce Tantivy backend for selected DB
let is_tantivy = crate::admin_meta::get_database_backend(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
server.selected_db,
)
.ok()
.flatten()
.map(|b| matches!(b, crate::options::BackendType::Tantivy))
.unwrap_or(false);
if !is_tantivy {
return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
}
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
let indexes = server.search_indexes.read().unwrap();
let search_index = indexes
.get(&index_name)
.ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
search_index.add_document_with_fields(&doc_id, fields)?;
Ok(Protocol::SimpleString("OK".to_string()))
}
pub async fn ft_search_cmd(
server: &Server,
index_name: String,
query: String,
filters: Vec<(String, String)>,
limit: Option<usize>,
offset: Option<usize>,
return_fields: Option<Vec<String>>,
) -> Result<Protocol, DBError> {
if server.selected_db == 0 {
return Ok(Protocol::err("FT commands are not allowed on DB 0"));
}
// Enforce Tantivy backend for selected DB
let is_tantivy = crate::admin_meta::get_database_backend(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
server.selected_db,
)
.ok()
.flatten()
.map(|b| matches!(b, crate::options::BackendType::Tantivy))
.unwrap_or(false);
if !is_tantivy {
return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
}
if !server.has_read_permission() {
return Ok(Protocol::err("ERR read permission denied"));
}
let indexes = server.search_indexes.read().unwrap();
let search_index = indexes
.get(&index_name)
.ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
let search_filters = filters
.into_iter()
.map(|(field, value)| Filter {
field,
filter_type: FilterType::Equals(value),
})
.collect();
let options = SearchOptions {
limit: limit.unwrap_or(10),
offset: offset.unwrap_or(0),
filters: search_filters,
sort_by: None,
return_fields,
highlight: false,
};
let results = search_index.search_with_options(&query, options)?;
// Format results as a flattened Redis protocol array to match client expectations:
// [ total, doc_id, score, field, value, field, value, ... , doc_id, score, ... ]
let mut response = Vec::new();
// First element is the total count
response.push(Protocol::BulkString(results.total.to_string()));
// Then each document flattened
for mut doc in results.documents {
// Add document ID if it exists
if let Some(id) = doc.fields.get("_id") {
response.push(Protocol::BulkString(id.clone()));
}
// Add score
response.push(Protocol::BulkString(doc.score.to_string()));
// Add fields as key-value pairs
for (field_name, field_value) in std::mem::take(&mut doc.fields) {
if field_name != "_id" {
response.push(Protocol::BulkString(field_name));
response.push(Protocol::BulkString(field_value));
}
}
}
Ok(Protocol::Array(response))
}
pub async fn ft_del_cmd(
server: &Server,
index_name: String,
doc_id: String,
) -> Result<Protocol, DBError> {
if server.selected_db == 0 {
return Ok(Protocol::err("FT commands are not allowed on DB 0"));
}
// Enforce Tantivy backend for selected DB
let is_tantivy = crate::admin_meta::get_database_backend(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
server.selected_db,
)
.ok()
.flatten()
.map(|b| matches!(b, crate::options::BackendType::Tantivy))
.unwrap_or(false);
if !is_tantivy {
return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
}
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
let indexes = server.search_indexes.read().unwrap();
let search_index = indexes
.get(&index_name)
.ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
let existed = search_index.delete_document_by_id(&doc_id)?;
Ok(Protocol::SimpleString(if existed { "1".to_string() } else { "0".to_string() }))
}
pub async fn ft_info_cmd(server: &Server, index_name: String) -> Result<Protocol, DBError> {
if server.selected_db == 0 {
return Ok(Protocol::err("FT commands are not allowed on DB 0"));
}
// Enforce Tantivy backend for selected DB
let is_tantivy = crate::admin_meta::get_database_backend(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
server.selected_db,
)
.ok()
.flatten()
.map(|b| matches!(b, crate::options::BackendType::Tantivy))
.unwrap_or(false);
if !is_tantivy {
return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
}
if !server.has_read_permission() {
return Ok(Protocol::err("ERR read permission denied"));
}
let indexes = server.search_indexes.read().unwrap();
let search_index = indexes
.get(&index_name)
.ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
let info = search_index.get_info()?;
// Format info as Redis protocol
let mut response = Vec::new();
response.push(Protocol::BulkString("index_name".to_string()));
response.push(Protocol::BulkString(info.name));
response.push(Protocol::BulkString("num_docs".to_string()));
response.push(Protocol::BulkString(info.num_docs.to_string()));
response.push(Protocol::BulkString("num_fields".to_string()));
response.push(Protocol::BulkString(info.fields.len().to_string()));
response.push(Protocol::BulkString("fields".to_string()));
let fields_str = info
.fields
.iter()
.map(|f| format!("{}:{}", f.name, f.field_type))
.collect::<Vec<_>>()
.join(", ");
response.push(Protocol::BulkString(fields_str));
Ok(Protocol::Array(response))
}
pub async fn ft_drop_cmd(server: &Server, index_name: String) -> Result<Protocol, DBError> {
if server.selected_db == 0 {
return Ok(Protocol::err("FT commands are not allowed on DB 0"));
}
// Enforce Tantivy backend for selected DB
let is_tantivy = crate::admin_meta::get_database_backend(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
server.selected_db,
)
.ok()
.flatten()
.map(|b| matches!(b, crate::options::BackendType::Tantivy))
.unwrap_or(false);
if !is_tantivy {
return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
}
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
// Remove from registry and files; report error if nothing to drop
let mut existed = false;
{
let mut indexes = server.search_indexes.write().unwrap();
if indexes.remove(&index_name).is_some() {
existed = true;
}
}
// Remove the index files from disk
let index_path = server.search_index_path().join(&index_name);
if index_path.exists() {
std::fs::remove_dir_all(&index_path)
.map_err(|e| DBError(format!("Failed to remove index files: {}", e)))?;
existed = true;
}
if !existed {
return Ok(Protocol::err(&format!("ERR Index '{}' not found", index_name)));
}
Ok(Protocol::SimpleString("OK".to_string()))
}

View File

@@ -14,6 +14,15 @@ use crate::protocol::Protocol;
use crate::storage_trait::StorageBackend;
use crate::admin_meta;
// Embeddings: config and cache
use crate::embedding::{EmbeddingConfig, create_embedder, Embedder, create_image_embedder, ImageEmbedder};
use serde_json;
use ureq::{Agent, AgentBuilder};
use std::time::Duration;
use std::io::Read;
const NO_DB_SELECTED: u64 = u64::MAX;
#[derive(Clone)]
pub struct Server {
pub db_cache: std::sync::Arc<std::sync::RwLock<HashMap<u64, Arc<dyn StorageBackend>>>>,
@@ -23,6 +32,18 @@ pub struct Server {
pub queued_cmd: Option<Vec<(Cmd, Protocol)>>,
pub current_permissions: Option<crate::rpc::Permissions>,
// In-memory registry of Tantivy search indexes for this server
pub search_indexes: Arc<std::sync::RwLock<HashMap<String, Arc<crate::tantivy_search::TantivySearch>>>>,
// Per-DB Lance stores (vector DB), keyed by db_id
pub lance_stores: Arc<std::sync::RwLock<HashMap<u64, Arc<crate::lance_store::LanceStore>>>>,
// Per-(db_id, dataset) embedder cache (text)
pub embedders: Arc<std::sync::RwLock<HashMap<(u64, String), Arc<dyn Embedder>>>>,
// Per-(db_id, dataset) image embedder cache (image)
pub image_embedders: Arc<std::sync::RwLock<HashMap<(u64, String), Arc<dyn ImageEmbedder>>>>,
// BLPOP waiter registry: per (db_index, key) FIFO of waiters
pub list_waiters: Arc<Mutex<HashMap<u64, HashMap<String, Vec<Waiter>>>>>,
pub waiter_seq: Arc<AtomicU64>,
@@ -46,16 +67,55 @@ impl Server {
db_cache: Arc::new(std::sync::RwLock::new(HashMap::new())),
option,
client_name: None,
selected_db: 0,
selected_db: NO_DB_SELECTED,
queued_cmd: None,
current_permissions: None,
search_indexes: Arc::new(std::sync::RwLock::new(HashMap::new())),
lance_stores: Arc::new(std::sync::RwLock::new(HashMap::new())),
embedders: Arc::new(std::sync::RwLock::new(HashMap::new())),
image_embedders: Arc::new(std::sync::RwLock::new(HashMap::new())),
list_waiters: Arc::new(Mutex::new(HashMap::new())),
waiter_seq: Arc::new(AtomicU64::new(1)),
}
}
// Path where search indexes are stored, namespaced per selected DB:
// <base_dir>/search_indexes/<db_id>
pub fn search_index_path(&self) -> std::path::PathBuf {
let base = std::path::PathBuf::from(&self.option.dir)
.join("search_indexes")
.join(self.selected_db.to_string());
if !base.exists() {
let _ = std::fs::create_dir_all(&base);
}
base
}
// Path where Lance datasets are stored, namespaced per selected DB:
// <base_dir>/lance/<db_id>
pub fn lance_data_path(&self) -> std::path::PathBuf {
let base = std::path::PathBuf::from(&self.option.dir)
.join("lance")
.join(self.selected_db.to_string());
if !base.exists() {
let _ = std::fs::create_dir_all(&base);
}
base
}
pub fn current_storage(&self) -> Result<Arc<dyn StorageBackend>, DBError> {
// Require explicit SELECT before any storage access
if self.selected_db == NO_DB_SELECTED {
return Err(DBError("No database selected. Use SELECT <id> [KEY <key>] first".to_string()));
}
// Admin DB 0 access must be authenticated with SELECT 0 KEY <admin_secret>
if self.selected_db == 0 {
if !matches!(self.current_permissions, Some(crate::rpc::Permissions::ReadWrite)) {
return Err(DBError("Admin DB 0 requires SELECT 0 KEY <admin_secret>".to_string()));
}
}
let mut cache = self.db_cache.write().unwrap();
if let Some(storage) = cache.get(&self.selected_db) {
@@ -83,16 +143,246 @@ impl Server {
cache.insert(self.selected_db, storage.clone());
Ok(storage)
}
/// Get or create the LanceStore for the currently selected DB.
/// Only valid for non-zero DBs and when the backend is Lance.
pub fn lance_store(&self) -> Result<Arc<crate::lance_store::LanceStore>, DBError> {
if self.selected_db == 0 {
return Err(DBError("Lance not available on admin DB 0".to_string()));
}
// Resolve backend for selected_db
let backend_opt = crate::admin_meta::get_database_backend(
&self.option.dir,
self.option.backend.clone(),
&self.option.admin_secret,
self.selected_db,
)
.ok()
.flatten();
if !matches!(backend_opt, Some(crate::options::BackendType::Lance)) {
return Err(DBError("ERR DB backend is not Lance; LANCE.* commands are not allowed".to_string()));
}
// Fast path: read lock
{
let map = self.lance_stores.read().unwrap();
if let Some(store) = map.get(&self.selected_db) {
return Ok(store.clone());
}
}
// Slow path: create and insert
let store = Arc::new(crate::lance_store::LanceStore::new(&self.option.dir, self.selected_db)?);
{
let mut map = self.lance_stores.write().unwrap();
map.insert(self.selected_db, store.clone());
}
Ok(store)
}
// ----- Embedding configuration and resolution -----
// Sidecar embedding config path: <base_dir>/lance/<db_id>/<dataset>.lance.embedding.json
fn dataset_embedding_config_path(&self, dataset: &str) -> std::path::PathBuf {
let mut base = self.lance_data_path();
// Ensure parent dir exists
if !base.exists() {
let _ = std::fs::create_dir_all(&base);
}
base.push(format!("{}.lance.embedding.json", dataset));
base
}
/// Persist per-dataset embedding config as JSON sidecar.
pub fn set_dataset_embedding_config(&self, dataset: &str, cfg: &EmbeddingConfig) -> Result<(), DBError> {
if self.selected_db == 0 {
return Err(DBError("Lance not available on admin DB 0".to_string()));
}
let p = self.dataset_embedding_config_path(dataset);
let data = serde_json::to_vec_pretty(cfg)
.map_err(|e| DBError(format!("Failed to serialize embedding config: {}", e)))?;
std::fs::write(&p, data)
.map_err(|e| DBError(format!("Failed to write embedding config {}: {}", p.display(), e)))?;
// Invalidate embedder cache entry for this dataset
{
let mut map = self.embedders.write().unwrap();
map.remove(&(self.selected_db, dataset.to_string()));
}
{
let mut map_img = self.image_embedders.write().unwrap();
map_img.remove(&(self.selected_db, dataset.to_string()));
}
Ok(())
}
/// Load per-dataset embedding config.
pub fn get_dataset_embedding_config(&self, dataset: &str) -> Result<EmbeddingConfig, DBError> {
if self.selected_db == 0 {
return Err(DBError("Lance not available on admin DB 0".to_string()));
}
let p = self.dataset_embedding_config_path(dataset);
if !p.exists() {
return Err(DBError(format!(
"Embedding config not set for dataset '{}'. Use LANCE.EMBEDDING CONFIG SET ... or RPC to configure.",
dataset
)));
}
let data = std::fs::read(&p)
.map_err(|e| DBError(format!("Failed to read embedding config {}: {}", p.display(), e)))?;
let cfg: EmbeddingConfig = serde_json::from_slice(&data)
.map_err(|e| DBError(format!("Failed to parse embedding config {}: {}", p.display(), e)))?;
Ok(cfg)
}
/// Resolve or build an embedder for (db_id, dataset). Caches instance.
pub fn get_embedder_for(&self, dataset: &str) -> Result<Arc<dyn Embedder>, DBError> {
if self.selected_db == 0 {
return Err(DBError("Lance not available on admin DB 0".to_string()));
}
// Fast path
{
let map = self.embedders.read().unwrap();
if let Some(e) = map.get(&(self.selected_db, dataset.to_string())) {
return Ok(e.clone());
}
}
// Load config and instantiate
let cfg = self.get_dataset_embedding_config(dataset)?;
let emb = create_embedder(&cfg)?;
{
let mut map = self.embedders.write().unwrap();
map.insert((self.selected_db, dataset.to_string()), emb.clone());
}
Ok(emb)
}
/// Resolve or build an IMAGE embedder for (db_id, dataset). Caches instance.
pub fn get_image_embedder_for(&self, dataset: &str) -> Result<Arc<dyn ImageEmbedder>, DBError> {
if self.selected_db == 0 {
return Err(DBError("Lance not available on admin DB 0".to_string()));
}
// Fast path
{
let map = self.image_embedders.read().unwrap();
if let Some(e) = map.get(&(self.selected_db, dataset.to_string())) {
return Ok(e.clone());
}
}
// Load config and instantiate
let cfg = self.get_dataset_embedding_config(dataset)?;
let emb = create_image_embedder(&cfg)?;
{
let mut map = self.image_embedders.write().unwrap();
map.insert((self.selected_db, dataset.to_string()), emb.clone());
}
Ok(emb)
}
/// Download image bytes from a URI with safety checks (size, timeout, content-type, optional host allowlist).
/// Env overrides:
/// - HERODB_IMAGE_MAX_BYTES (u64, default 10485760)
/// - HERODB_IMAGE_FETCH_TIMEOUT_SECS (u64, default 30)
/// - HERODB_IMAGE_ALLOWED_HOSTS (comma-separated, optional)
pub fn fetch_image_bytes_from_uri(&self, uri: &str) -> Result<Vec<u8>, DBError> {
// Basic scheme validation
if !(uri.starts_with("http://") || uri.starts_with("https://")) {
return Err(DBError("Only http(s) URIs are supported for image fetch".into()));
}
// Parse host (naive) for allowlist check
let host = {
let after_scheme = match uri.find("://") {
Some(i) => &uri[i + 3..],
None => uri,
};
let end = after_scheme.find('/').unwrap_or(after_scheme.len());
let host_port = &after_scheme[..end];
host_port.split('@').last().unwrap_or(host_port).split(':').next().unwrap_or(host_port).to_string()
};
let max_bytes: u64 = std::env::var("HERODB_IMAGE_MAX_BYTES").ok().and_then(|s| s.parse::<u64>().ok()).unwrap_or(10 * 1024 * 1024);
let timeout_secs: u64 = std::env::var("HERODB_IMAGE_FETCH_TIMEOUT_SECS").ok().and_then(|s| s.parse::<u64>().ok()).unwrap_or(30);
let allowed_hosts_env = std::env::var("HERODB_IMAGE_ALLOWED_HOSTS").ok();
if let Some(allow) = allowed_hosts_env {
if !allow.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).any(|h| h.eq_ignore_ascii_case(&host)) {
return Err(DBError(format!("Host '{}' not allowed for image fetch (HERODB_IMAGE_ALLOWED_HOSTS)", host)));
}
}
let agent: Agent = AgentBuilder::new()
.timeout_read(Duration::from_secs(timeout_secs))
.timeout_write(Duration::from_secs(timeout_secs))
.build();
let resp = agent.get(uri).call().map_err(|e| DBError(format!("HTTP GET failed: {}", e)))?;
// Validate content-type
let ctype = resp.header("Content-Type").unwrap_or("");
let ctype_main = ctype.split(';').next().unwrap_or("").trim().to_ascii_lowercase();
if !ctype_main.starts_with("image/") {
return Err(DBError(format!("Remote content-type '{}' is not image/*", ctype)));
}
// Read with cap
let mut reader = resp.into_reader();
let mut buf: Vec<u8> = Vec::with_capacity(8192);
let mut tmp = [0u8; 8192];
let mut total: u64 = 0;
loop {
let n = reader.read(&mut tmp).map_err(|e| DBError(format!("Read error: {}", e)))?;
if n == 0 { break; }
total += n as u64;
if total > max_bytes {
return Err(DBError(format!("Image exceeds max allowed bytes {}", max_bytes)));
}
buf.extend_from_slice(&tmp[..n]);
}
Ok(buf)
}
/// Check if current permissions allow read operations
pub fn has_read_permission(&self) -> bool {
matches!(self.current_permissions, Some(crate::rpc::Permissions::Read) | Some(crate::rpc::Permissions::ReadWrite))
// No DB selected -> no permissions
if self.selected_db == NO_DB_SELECTED {
return false;
}
// If an explicit permission is set for this connection, honor it.
if let Some(perms) = self.current_permissions.as_ref() {
return matches!(*perms, crate::rpc::Permissions::Read | crate::rpc::Permissions::ReadWrite);
}
// Fallback ONLY when no explicit permission context (e.g., JSON-RPC flows without SELECT).
match crate::admin_meta::verify_access(
&self.option.dir,
self.option.backend.clone(),
&self.option.admin_secret,
self.selected_db,
None,
) {
Ok(Some(crate::rpc::Permissions::Read)) | Ok(Some(crate::rpc::Permissions::ReadWrite)) => true,
_ => false,
}
}
/// Check if current permissions allow write operations
pub fn has_write_permission(&self) -> bool {
matches!(self.current_permissions, Some(crate::rpc::Permissions::ReadWrite))
// No DB selected -> no permissions
if self.selected_db == NO_DB_SELECTED {
return false;
}
// If an explicit permission is set for this connection, honor it.
if let Some(perms) = self.current_permissions.as_ref() {
return matches!(*perms, crate::rpc::Permissions::ReadWrite);
}
// Fallback ONLY when no explicit permission context (e.g., JSON-RPC flows without SELECT).
match crate::admin_meta::verify_access(
&self.option.dir,
self.option.backend.clone(),
&self.option.admin_secret,
self.selected_db,
None,
) {
Ok(Some(crate::rpc::Permissions::ReadWrite)) => true,
_ => false,
}
}
// ----- BLPOP waiter helpers -----

709
src/tantivy_search.rs Normal file
View File

@@ -0,0 +1,709 @@
use crate::error::DBError;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use tantivy::{
collector::TopDocs,
directory::MmapDirectory,
query::{BooleanQuery, Occur, Query, QueryParser, TermQuery},
schema::{
DateOptions, Field, IndexRecordOption, NumericOptions, Schema, TextFieldIndexing, TextOptions, STORED, STRING,
},
tokenizer::TokenizerManager,
DateTime, Index, IndexReader, IndexWriter, TantivyDocument, Term,
};
use tantivy::schema::Value;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FieldDef {
Text {
stored: bool,
indexed: bool,
tokenized: bool,
fast: bool,
},
Numeric {
stored: bool,
indexed: bool,
fast: bool,
precision: NumericType,
},
Tag {
stored: bool,
separator: String,
case_sensitive: bool,
},
Geo {
stored: bool,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NumericType {
I64,
U64,
F64,
Date,
}
pub struct IndexSchema {
schema: Schema,
fields: HashMap<String, (Field, FieldDef)>,
default_search_fields: Vec<Field>,
}
pub struct TantivySearch {
index: Index,
writer: Arc<RwLock<IndexWriter>>,
reader: IndexReader,
index_schema: IndexSchema,
name: String,
config: IndexConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexConfig {
pub language: String,
pub stopwords: Vec<String>,
pub stemming: bool,
pub max_doc_count: Option<usize>,
pub default_score: f64,
}
impl Default for IndexConfig {
fn default() -> Self {
IndexConfig {
language: "english".to_string(),
stopwords: vec![],
stemming: true,
max_doc_count: None,
default_score: 1.0,
}
}
}
impl TantivySearch {
pub fn new_with_schema(
base_path: PathBuf,
name: String,
field_definitions: Vec<(String, FieldDef)>,
config: Option<IndexConfig>,
) -> Result<Self, DBError> {
let index_path = base_path.join(&name);
std::fs::create_dir_all(&index_path)
.map_err(|e| DBError(format!("Failed to create index dir: {}", e)))?;
// Build schema from field definitions
let mut schema_builder = Schema::builder();
let mut fields = HashMap::new();
let mut default_search_fields = Vec::new();
// Always add a document ID field
let id_field = schema_builder.add_text_field("_id", STRING | STORED);
fields.insert(
"_id".to_string(),
(
id_field,
FieldDef::Text {
stored: true,
indexed: true,
tokenized: false,
fast: false,
},
),
);
// Add user-defined fields
for (field_name, field_def) in field_definitions {
let field = match &field_def {
FieldDef::Text {
stored,
indexed,
tokenized,
fast: _fast,
} => {
let mut text_options = TextOptions::default();
if *stored {
text_options = text_options.set_stored();
}
if *indexed {
let indexing_options = if *tokenized {
TextFieldIndexing::default()
.set_tokenizer("default")
.set_index_option(IndexRecordOption::WithFreqsAndPositions)
} else {
TextFieldIndexing::default()
.set_tokenizer("raw")
.set_index_option(IndexRecordOption::Basic)
};
text_options = text_options.set_indexing_options(indexing_options);
let f = schema_builder.add_text_field(&field_name, text_options);
if *tokenized {
default_search_fields.push(f);
}
f
} else {
schema_builder.add_text_field(&field_name, text_options)
}
}
FieldDef::Numeric {
stored,
indexed,
fast,
precision,
} => match precision {
NumericType::I64 => {
let mut opts = NumericOptions::default();
if *stored {
opts = opts.set_stored();
}
if *indexed {
opts = opts.set_indexed();
}
if *fast {
opts = opts.set_fast();
}
schema_builder.add_i64_field(&field_name, opts)
}
NumericType::U64 => {
let mut opts = NumericOptions::default();
if *stored {
opts = opts.set_stored();
}
if *indexed {
opts = opts.set_indexed();
}
if *fast {
opts = opts.set_fast();
}
schema_builder.add_u64_field(&field_name, opts)
}
NumericType::F64 => {
let mut opts = NumericOptions::default();
if *stored {
opts = opts.set_stored();
}
if *indexed {
opts = opts.set_indexed();
}
if *fast {
opts = opts.set_fast();
}
schema_builder.add_f64_field(&field_name, opts)
}
NumericType::Date => {
let mut opts = DateOptions::default();
if *stored {
opts = opts.set_stored();
}
if *indexed {
opts = opts.set_indexed();
}
if *fast {
opts = opts.set_fast();
}
schema_builder.add_date_field(&field_name, opts)
}
},
FieldDef::Tag {
stored,
separator: _,
case_sensitive: _,
} => {
let mut text_options = TextOptions::default();
if *stored {
text_options = text_options.set_stored();
}
text_options = text_options.set_indexing_options(
TextFieldIndexing::default()
.set_tokenizer("raw")
.set_index_option(IndexRecordOption::Basic),
);
schema_builder.add_text_field(&field_name, text_options)
}
FieldDef::Geo { stored } => {
// For now, store as two f64 fields for lat/lon
let mut opts = NumericOptions::default();
if *stored {
opts = opts.set_stored();
}
opts = opts.set_indexed().set_fast();
let lat_field =
schema_builder.add_f64_field(&format!("{}_lat", field_name), opts.clone());
let lon_field =
schema_builder.add_f64_field(&format!("{}_lon", field_name), opts);
fields.insert(
format!("{}_lat", field_name),
(
lat_field,
FieldDef::Numeric {
stored: *stored,
indexed: true,
fast: true,
precision: NumericType::F64,
},
),
);
fields.insert(
format!("{}_lon", field_name),
(
lon_field,
FieldDef::Numeric {
stored: *stored,
indexed: true,
fast: true,
precision: NumericType::F64,
},
),
);
continue; // Skip adding the geo field itself
}
};
fields.insert(field_name.clone(), (field, field_def));
}
let schema = schema_builder.build();
let index_schema = IndexSchema {
schema: schema.clone(),
fields,
default_search_fields,
};
// Create or open index
let dir = MmapDirectory::open(&index_path)
.map_err(|e| DBError(format!("Failed to open index directory: {}", e)))?;
let mut index =
Index::open_or_create(dir, schema).map_err(|e| DBError(format!("Failed to create index: {}", e)))?;
// Configure tokenizers
let tokenizer_manager = TokenizerManager::default();
index.set_tokenizers(tokenizer_manager);
let writer = index
.writer(15_000_000)
.map_err(|e| DBError(format!("Failed to create index writer: {}", e)))?;
let reader = index
.reader()
.map_err(|e| DBError(format!("Failed to create reader: {}", e)))?;
let config = config.unwrap_or_default();
Ok(TantivySearch {
index,
writer: Arc::new(RwLock::new(writer)),
reader,
index_schema,
name,
config,
})
}
pub fn add_document_with_fields(
&self,
doc_id: &str,
fields: HashMap<String, String>,
) -> Result<(), DBError> {
let mut writer = self
.writer
.write()
.map_err(|e| DBError(format!("Failed to acquire writer lock: {}", e)))?;
// Delete existing document with same ID
if let Some((id_field, _)) = self.index_schema.fields.get("_id") {
writer.delete_term(Term::from_field_text(*id_field, doc_id));
}
// Create new document
let mut doc = tantivy::doc!();
// Add document ID
if let Some((id_field, _)) = self.index_schema.fields.get("_id") {
doc.add_text(*id_field, doc_id);
}
// Add other fields based on schema
for (field_name, field_value) in fields {
if let Some((field, field_def)) = self.index_schema.fields.get(&field_name) {
match field_def {
FieldDef::Text { .. } => {
doc.add_text(*field, &field_value);
}
FieldDef::Numeric { precision, .. } => match precision {
NumericType::I64 => {
if let Ok(v) = field_value.parse::<i64>() {
doc.add_i64(*field, v);
}
}
NumericType::U64 => {
if let Ok(v) = field_value.parse::<u64>() {
doc.add_u64(*field, v);
}
}
NumericType::F64 => {
if let Ok(v) = field_value.parse::<f64>() {
doc.add_f64(*field, v);
}
}
NumericType::Date => {
if let Ok(v) = field_value.parse::<i64>() {
doc.add_date(*field, DateTime::from_timestamp_millis(v));
}
}
},
FieldDef::Tag {
separator,
case_sensitive,
..
} => {
let tags = if !case_sensitive {
field_value.to_lowercase()
} else {
field_value.clone()
};
for tag in tags.split(separator.as_str()) {
doc.add_text(*field, tag.trim());
}
}
FieldDef::Geo { .. } => {
let parts: Vec<&str> = field_value.split(',').collect();
if parts.len() == 2 {
if let (Ok(lat), Ok(lon)) =
(parts[0].parse::<f64>(), parts[1].parse::<f64>())
{
if let Some((lat_field, _)) =
self.index_schema.fields.get(&format!("{}_lat", field_name))
{
doc.add_f64(*lat_field, lat);
}
if let Some((lon_field, _)) =
self.index_schema.fields.get(&format!("{}_lon", field_name))
{
doc.add_f64(*lon_field, lon);
}
}
}
}
}
}
}
writer
.add_document(doc)
.map_err(|e| DBError(format!("Failed to add document: {}", e)))?;
writer
.commit()
.map_err(|e| DBError(format!("Failed to commit: {}", e)))?;
// Make new documents visible to searches
self.reader
.reload()
.map_err(|e| DBError(format!("Failed to reload reader: {}", e)))?;
Ok(())
}
pub fn search_with_options(
&self,
query_str: &str,
options: SearchOptions,
) -> Result<SearchResults, DBError> {
// Ensure reader is up to date with latest commits
self.reader
.reload()
.map_err(|e| DBError(format!("Failed to reload reader: {}", e)))?;
let searcher = self.reader.searcher();
// Ensure we have searchable fields
if self.index_schema.default_search_fields.is_empty() {
return Err(DBError("No searchable fields defined in schema".to_string()));
}
// Parse query based on search fields
let query_parser = QueryParser::for_index(
&self.index,
self.index_schema.default_search_fields.clone(),
);
let parsed_query = query_parser
.parse_query(query_str)
.map_err(|e| DBError(format!("Failed to parse query: {}", e)))?;
let mut clauses: Vec<(Occur, Box<dyn Query>)> = vec![(Occur::Must, parsed_query)];
// Apply filters if any
for filter in options.filters {
if let Some((field, field_def)) = self.index_schema.fields.get(&filter.field) {
match filter.filter_type {
FilterType::Equals(value) => {
match field_def {
FieldDef::Text { .. } | FieldDef::Tag { .. } => {
let term_query =
TermQuery::new(Term::from_field_text(*field, &value), IndexRecordOption::Basic);
clauses.push((Occur::Must, Box::new(term_query)));
}
FieldDef::Numeric { precision, .. } => {
// Equals on numeric fields: parse to the right numeric type and use term query
match precision {
NumericType::I64 => {
if let Ok(v) = value.parse::<i64>() {
let term = Term::from_field_i64(*field, v);
let tq = TermQuery::new(term, IndexRecordOption::Basic);
clauses.push((Occur::Must, Box::new(tq)));
}
}
NumericType::U64 => {
if let Ok(v) = value.parse::<u64>() {
let term = Term::from_field_u64(*field, v);
let tq = TermQuery::new(term, IndexRecordOption::Basic);
clauses.push((Occur::Must, Box::new(tq)));
}
}
NumericType::F64 => {
if let Ok(v) = value.parse::<f64>() {
let term = Term::from_field_f64(*field, v);
let tq = TermQuery::new(term, IndexRecordOption::Basic);
clauses.push((Occur::Must, Box::new(tq)));
}
}
NumericType::Date => {
if let Ok(v) = value.parse::<i64>() {
let dt = DateTime::from_timestamp_millis(v);
let term = Term::from_field_date(*field, dt);
let tq = TermQuery::new(term, IndexRecordOption::Basic);
clauses.push((Occur::Must, Box::new(tq)));
}
}
}
}
FieldDef::Geo { .. } => {
// Geo equals isn't supported in this simplified version
}
}
}
FilterType::Range { .. } => {
// TODO: Implement numeric range queries by building a RangeQuery per type
}
FilterType::InSet(values) => {
// OR across values
let mut sub_clauses: Vec<(Occur, Box<dyn Query>)> = vec![];
for value in values {
let term_query = TermQuery::new(
Term::from_field_text(*field, &value),
IndexRecordOption::Basic,
);
sub_clauses.push((Occur::Should, Box::new(term_query)));
}
clauses.push((Occur::Must, Box::new(BooleanQuery::new(sub_clauses))));
}
}
}
}
let final_query: Box<dyn Query> = if clauses.len() == 1 {
clauses.pop().unwrap().1
} else {
Box::new(BooleanQuery::new(clauses))
};
// Execute search
let top_docs = searcher
.search(&*final_query, &TopDocs::with_limit(options.limit + options.offset))
.map_err(|e| DBError(format!("Search failed: {}", e)))?;
let total_hits = top_docs.len();
let mut documents = Vec::new();
for (score, doc_address) in top_docs.into_iter().skip(options.offset).take(options.limit) {
let retrieved_doc: TantivyDocument = searcher
.doc(doc_address)
.map_err(|e| DBError(format!("Failed to retrieve doc: {}", e)))?;
let mut doc_fields = HashMap::new();
// Extract stored fields (or synthesize)
for (field_name, (field, field_def)) in &self.index_schema.fields {
match field_def {
FieldDef::Text { stored, .. } | FieldDef::Tag { stored, .. } => {
if *stored {
if let Some(value) = retrieved_doc.get_first(*field) {
if let Some(text) = value.as_str() {
doc_fields.insert(field_name.clone(), text.to_string());
}
}
}
}
FieldDef::Numeric {
stored, precision, ..
} => {
if *stored {
let value_str = match precision {
NumericType::I64 => retrieved_doc
.get_first(*field)
.and_then(|v| v.as_i64())
.map(|v| v.to_string()),
NumericType::U64 => retrieved_doc
.get_first(*field)
.and_then(|v| v.as_u64())
.map(|v| v.to_string()),
NumericType::F64 => retrieved_doc
.get_first(*field)
.and_then(|v| v.as_f64())
.map(|v| v.to_string()),
NumericType::Date => retrieved_doc
.get_first(*field)
.and_then(|v| v.as_datetime())
.map(|v| v.into_timestamp_millis().to_string()),
};
if let Some(v) = value_str {
doc_fields.insert(field_name.clone(), v);
}
}
}
FieldDef::Geo { stored } => {
if *stored {
let lat_field = self
.index_schema
.fields
.get(&format!("{}_lat", field_name))
.unwrap()
.0;
let lon_field = self
.index_schema
.fields
.get(&format!("{}_lon", field_name))
.unwrap()
.0;
let lat = retrieved_doc.get_first(lat_field).and_then(|v| v.as_f64());
let lon = retrieved_doc.get_first(lon_field).and_then(|v| v.as_f64());
if let (Some(lat), Some(lon)) = (lat, lon) {
doc_fields.insert(field_name.clone(), format!("{},{}", lat, lon));
}
}
}
}
}
documents.push(SearchDocument {
fields: doc_fields,
score,
});
}
Ok(SearchResults {
total: total_hits,
documents,
})
}
pub fn get_info(&self) -> Result<IndexInfo, DBError> {
let searcher = self.reader.searcher();
let num_docs = searcher.num_docs();
let fields_info: Vec<FieldInfo> = self
.index_schema
.fields
.iter()
.map(|(name, (_, def))| FieldInfo {
name: name.clone(),
field_type: format!("{:?}", def),
})
.collect();
Ok(IndexInfo {
name: self.name.clone(),
num_docs,
fields: fields_info,
config: self.config.clone(),
})
}
/// Delete a document by its _id term. Returns true if the document existed before deletion.
pub fn delete_document_by_id(&self, doc_id: &str) -> Result<bool, DBError> {
// Determine existence by running a tiny term query
let existed = if let Some((id_field, _)) = self.index_schema.fields.get("_id") {
let term = Term::from_field_text(*id_field, doc_id);
let searcher = self.reader.searcher();
let tq = TermQuery::new(term.clone(), IndexRecordOption::Basic);
let hits = searcher
.search(&tq, &TopDocs::with_limit(1))
.map_err(|e| DBError(format!("Failed to search for existing doc: {}", e)))?;
!hits.is_empty()
} else {
false
};
// Perform deletion and commit
let mut writer = self
.writer
.write()
.map_err(|e| DBError(format!("Failed to acquire writer lock: {}", e)))?;
if let Some((id_field, _)) = self.index_schema.fields.get("_id") {
writer.delete_term(Term::from_field_text(*id_field, doc_id));
}
writer
.commit()
.map_err(|e| DBError(format!("Failed to commit delete: {}", e)))?;
// Refresh reader to observe deletion
self.reader
.reload()
.map_err(|e| DBError(format!("Failed to reload reader: {}", e)))?;
Ok(existed)
}
}
#[derive(Debug, Clone)]
pub struct SearchOptions {
pub limit: usize,
pub offset: usize,
pub filters: Vec<Filter>,
pub sort_by: Option<String>,
pub return_fields: Option<Vec<String>>,
pub highlight: bool,
}
impl Default for SearchOptions {
fn default() -> Self {
SearchOptions {
limit: 10,
offset: 0,
filters: vec![],
sort_by: None,
return_fields: None,
highlight: false,
}
}
}
#[derive(Debug, Clone)]
pub struct Filter {
pub field: String,
pub filter_type: FilterType,
}
#[derive(Debug, Clone)]
pub enum FilterType {
Equals(String),
Range { min: String, max: String },
InSet(Vec<String>),
}
#[derive(Debug)]
pub struct SearchResults {
pub total: usize,
pub documents: Vec<SearchDocument>,
}
#[derive(Debug)]
pub struct SearchDocument {
pub fields: HashMap<String, String>,
pub score: f32,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct IndexInfo {
pub name: String,
pub num_docs: u64,
pub fields: Vec<FieldInfo>,
pub config: IndexConfig,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct FieldInfo {
pub name: String,
pub field_type: String,
}

View File

@@ -1,10 +1,11 @@
#!/bin/bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
# Test script for HeroDB - Redis-compatible database with redb backend
# This script starts the server and runs comprehensive tests
set -e
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'

View File

@@ -1,4 +1,5 @@
use herodb::{server::Server, options::DBOption};
use std::path::PathBuf;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
@@ -22,7 +23,7 @@ async fn debug_hset_simple() {
let port = 16500;
let option = DBOption {
dir: test_dir.to_string(),
dir: PathBuf::from(test_dir),
port,
debug: false,
encrypt: false,

View File

@@ -1,4 +1,5 @@
use herodb::{server::Server, options::DBOption};
use std::path::PathBuf;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
@@ -13,7 +14,7 @@ async fn debug_hset_return_value() {
std::fs::create_dir_all(&test_dir).unwrap();
let option = DBOption {
dir: test_dir.to_string(),
dir: PathBuf::from(test_dir),
port: 16390,
debug: false,
encrypt: false,

View File

@@ -0,0 +1,484 @@
use redis::{Client, Connection, RedisResult, Value};
use std::process::{Child, Command};
use std::time::Duration;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use herodb::rpc::{BackendType, DatabaseConfig, RpcClient};
use base64::Engine;
use tokio::time::sleep;
// ------------------------
// Helpers
// ------------------------
fn get_redis_connection(port: u16) -> Connection {
let connection_info = format!("redis://127.0.0.1:{}", port);
let client = Client::open(connection_info).unwrap();
let mut attempts = 0;
loop {
match client.get_connection() {
Ok(mut conn) => {
if redis::cmd("PING").query::<String>(&mut conn).is_ok() {
return conn;
}
}
Err(e) => {
if attempts >= 3600 {
panic!("Failed to connect to Redis server after 3600 attempts: {}", e);
}
}
}
attempts += 1;
std::thread::sleep(Duration::from_millis(500));
}
}
async fn get_rpc_client(port: u16) -> HttpClient {
let url = format!("http://127.0.0.1:{}", port + 1); // RPC port = Redis port + 1
HttpClientBuilder::default().build(url).unwrap()
}
/// Wait until RPC server is responsive (getServerStats succeeds) or panic after retries.
async fn wait_for_rpc_ready(client: &HttpClient, max_attempts: u32, delay: Duration) {
for _ in 0..max_attempts {
match client.get_server_stats().await {
Ok(_) => return,
Err(_) => {
sleep(delay).await;
}
}
}
panic!("RPC server did not become ready in time");
}
// A guard to ensure the server process is killed when it goes out of scope and test dir cleaned.
struct ServerProcessGuard {
process: Child,
test_dir: String,
}
impl Drop for ServerProcessGuard {
fn drop(&mut self) {
eprintln!("Killing server process (pid: {})...", self.process.id());
if let Err(e) = self.process.kill() {
eprintln!("Failed to kill server process: {}", e);
}
match self.process.wait() {
Ok(status) => eprintln!("Server process exited with: {}", status),
Err(e) => eprintln!("Failed to wait on server process: {}", e),
}
// Clean up the specific test directory
eprintln!("Cleaning up test directory: {}", self.test_dir);
if let Err(e) = std::fs::remove_dir_all(&self.test_dir) {
eprintln!("Failed to clean up test directory: {}", e);
}
}
}
// Helper to set up the server and return guard + ports
async fn setup_server() -> (ServerProcessGuard, u16) {
use std::sync::atomic::{AtomicU16, Ordering};
static PORT_COUNTER: AtomicU16 = AtomicU16::new(17500);
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
let test_dir = format!("/tmp/herodb_lance_test_{}", port);
// Clean up previous test data
if std::path::Path::new(&test_dir).exists() {
let _ = std::fs::remove_dir_all(&test_dir);
}
std::fs::create_dir_all(&test_dir).unwrap();
// Start the server in a subprocess with RPC enabled (follows tantivy test pattern)
let child = Command::new("cargo")
.args(&[
"run",
"--",
"--dir",
&test_dir,
"--port",
&port.to_string(),
"--rpc-port",
&(port + 1).to_string(),
"--enable-rpc",
"--debug",
"--admin-secret",
"test-admin",
])
.spawn()
.expect("Failed to start server process");
let guard = ServerProcessGuard {
process: child,
test_dir,
};
// Give the server time to build and start (cargo run may compile first)
// Increase significantly to accommodate first-time dependency compilation in CI.
std::thread::sleep(Duration::from_millis(60000));
(guard, port)
}
// Convenient helpers for assertions on redis::Value
fn value_is_ok(v: &Value) -> bool {
match v {
Value::Okay => true,
Value::Status(s) if s == "OK" => true,
Value::Data(d) if d == b"OK" => true,
_ => false,
}
}
fn value_is_int_eq(v: &Value, expected: i64) -> bool {
matches!(v, Value::Int(n) if *n == expected)
}
fn value_is_str_eq(v: &Value, expected: &str) -> bool {
match v {
Value::Status(s) => s == expected,
Value::Data(d) => String::from_utf8_lossy(d) == expected,
_ => false,
}
}
fn to_string_lossy(v: &Value) -> String {
match v {
Value::Nil => "Nil".to_string(),
Value::Int(n) => n.to_string(),
Value::Status(s) => s.clone(),
Value::Okay => "OK".to_string(),
Value::Data(d) => String::from_utf8_lossy(d).to_string(),
Value::Bulk(items) => {
let inner: Vec<String> = items.iter().map(to_string_lossy).collect();
format!("[{}]", inner.join(", "))
}
}
}
// Extract ids from LANCE.SEARCH / LANCE.SEARCHIMAGE reply which is:
// Array of elements: [ [id, score, [k,v,...]], [id, score, ...], ... ]
fn extract_hit_ids(v: &Value) -> Vec<String> {
let mut ids = Vec::new();
if let Value::Bulk(items) = v {
for item in items {
if let Value::Bulk(row) = item {
if !row.is_empty() {
// first element is id (Data or Status)
let id = match &row[0] {
Value::Data(d) => String::from_utf8_lossy(d).to_string(),
Value::Status(s) => s.clone(),
Value::Int(n) => n.to_string(),
_ => continue,
};
ids.push(id);
}
}
}
}
ids
}
// Check whether a Bulk array (RESP array) contains a given string element.
fn bulk_contains_string(v: &Value, needle: &str) -> bool {
match v {
Value::Bulk(items) => items.iter().any(|it| match it {
Value::Data(d) => String::from_utf8_lossy(d).contains(needle),
Value::Status(s) => s.contains(needle),
Value::Bulk(_) => bulk_contains_string(it, needle),
_ => false,
}),
_ => false,
}
}
// ------------------------
// Test: Lance end-to-end (RESP) using only local embedders
// ------------------------
#[tokio::test]
async fn test_lance_end_to_end() {
let (_guard, port) = setup_server().await;
// First, wait for RESP to be available; this also gives cargo-run child ample time to finish building.
// Reuse the helper that retries PING until success.
{
let _conn_ready = get_redis_connection(port);
// Drop immediately; we only needed readiness.
}
// Build RPC client and create a Lance DB
let rpc_client = get_rpc_client(port).await;
// Ensure RPC server is listening before we issue createDatabase (allow longer warm-up to accommodate first-build costs)
wait_for_rpc_ready(&rpc_client, 3600, Duration::from_millis(250)).await;
let db_config = DatabaseConfig {
name: Some("media-db".to_string()),
storage_path: None,
max_size: None,
redis_version: None,
};
let db_id = rpc_client
.create_database(BackendType::Lance, db_config, None)
.await
.expect("create_database Lance failed");
assert_eq!(db_id, 1, "Expected first Lance DB id to be 1");
// Add access keys
let _ = rpc_client
.add_access_key(db_id, "readwrite_key".to_string(), "readwrite".to_string())
.await
.expect("add_access_key readwrite failed");
let _ = rpc_client
.add_access_key(db_id, "read_key".to_string(), "read".to_string())
.await
.expect("add_access_key read failed");
// Connect to Redis and SELECT DB with readwrite key
let mut conn = get_redis_connection(port);
let sel_ok: RedisResult<String> = redis::cmd("SELECT")
.arg(db_id)
.arg("KEY")
.arg("readwrite_key")
.query(&mut conn);
assert!(sel_ok.is_ok(), "SELECT db with key failed: {:?}", sel_ok);
assert_eq!(sel_ok.unwrap(), "OK");
// 1) Configure embedding providers: textset -> testhash dim 64, imageset -> testimagehash dim 512
let v = redis::cmd("LANCE.EMBEDDING")
.arg("CONFIG")
.arg("SET")
.arg("textset")
.arg("PROVIDER")
.arg("testhash")
.arg("MODEL")
.arg("any")
.arg("PARAM")
.arg("dim")
.arg("64")
.query::<Value>(&mut conn)
.unwrap();
assert!(value_is_ok(&v), "Embedding config set (text) not OK: {}", to_string_lossy(&v));
let v = redis::cmd("LANCE.EMBEDDING")
.arg("CONFIG")
.arg("SET")
.arg("imageset")
.arg("PROVIDER")
.arg("testimagehash")
.arg("MODEL")
.arg("any")
.arg("PARAM")
.arg("dim")
.arg("512")
.query::<Value>(&mut conn)
.unwrap();
assert!(value_is_ok(&v), "Embedding config set (image) not OK: {}", to_string_lossy(&v));
// 2) Create datasets
let v = redis::cmd("LANCE.CREATE")
.arg("textset")
.arg("DIM")
.arg(64)
.query::<Value>(&mut conn)
.unwrap();
assert!(value_is_ok(&v), "LANCE.CREATE textset failed: {}", to_string_lossy(&v));
let v = redis::cmd("LANCE.CREATE")
.arg("imageset")
.arg("DIM")
.arg(512)
.query::<Value>(&mut conn)
.unwrap();
assert!(value_is_ok(&v), "LANCE.CREATE imageset failed: {}", to_string_lossy(&v));
// 3) Store two text documents
let v = redis::cmd("LANCE.STORE")
.arg("textset")
.arg("ID")
.arg("doc-1")
.arg("TEXT")
.arg("The quick brown fox jumps over the lazy dog")
.arg("META")
.arg("title")
.arg("Fox")
.arg("category")
.arg("animal")
.query::<Value>(&mut conn)
.unwrap();
assert!(value_is_ok(&v), "LANCE.STORE doc-1 failed: {}", to_string_lossy(&v));
let v = redis::cmd("LANCE.STORE")
.arg("textset")
.arg("ID")
.arg("doc-2")
.arg("TEXT")
.arg("A fast auburn fox vaulted a sleepy canine")
.arg("META")
.arg("title")
.arg("Paraphrase")
.arg("category")
.arg("animal")
.query::<Value>(&mut conn)
.unwrap();
assert!(value_is_ok(&v), "LANCE.STORE doc-2 failed: {}", to_string_lossy(&v));
// 4) Store two images via BYTES (local fake bytes; embedder only hashes bytes, not decoding)
let img1: Vec<u8> = b"local-image-bytes-1-abcdefghijklmnopqrstuvwxyz".to_vec();
let img2: Vec<u8> = b"local-image-bytes-2-ABCDEFGHIJKLMNOPQRSTUVWXYZ".to_vec();
let img1_b64 = base64::engine::general_purpose::STANDARD.encode(&img1);
let img2_b64 = base64::engine::general_purpose::STANDARD.encode(&img2);
let v = redis::cmd("LANCE.STOREIMAGE")
.arg("imageset")
.arg("ID")
.arg("img-1")
.arg("BYTES")
.arg(&img1_b64)
.arg("META")
.arg("title")
.arg("Local1")
.arg("group")
.arg("demo")
.query::<Value>(&mut conn)
.unwrap();
assert!(value_is_ok(&v), "LANCE.STOREIMAGE img-1 failed: {}", to_string_lossy(&v));
let v = redis::cmd("LANCE.STOREIMAGE")
.arg("imageset")
.arg("ID")
.arg("img-2")
.arg("BYTES")
.arg(&img2_b64)
.arg("META")
.arg("title")
.arg("Local2")
.arg("group")
.arg("demo")
.query::<Value>(&mut conn)
.unwrap();
assert!(value_is_ok(&v), "LANCE.STOREIMAGE img-2 failed: {}", to_string_lossy(&v));
// 5) Search text: K 2 QUERY "quick brown fox" RETURN 1 title
let v = redis::cmd("LANCE.SEARCH")
.arg("textset")
.arg("K")
.arg(2)
.arg("QUERY")
.arg("quick brown fox")
.arg("RETURN")
.arg(1)
.arg("title")
.query::<Value>(&mut conn)
.unwrap();
// Should be an array of hits
let ids = extract_hit_ids(&v);
assert!(
ids.contains(&"doc-1".to_string()) || ids.contains(&"doc-2".to_string()),
"LANCE.SEARCH should return doc-1/doc-2; got: {}",
to_string_lossy(&v)
);
// With FILTER on category
let v = redis::cmd("LANCE.SEARCH")
.arg("textset")
.arg("K")
.arg(2)
.arg("QUERY")
.arg("fox jumps")
.arg("FILTER")
.arg("category = 'animal'")
.arg("RETURN")
.arg(1)
.arg("title")
.query::<Value>(&mut conn)
.unwrap();
let ids_f = extract_hit_ids(&v);
assert!(
!ids_f.is_empty(),
"Filtered LANCE.SEARCH should return at least one document; got: {}",
to_string_lossy(&v)
);
// 6) Search images with QUERYBYTES
let query_img: Vec<u8> = b"local-image-query-3-1234567890".to_vec();
let query_img_b64 = base64::engine::general_purpose::STANDARD.encode(&query_img);
let v = redis::cmd("LANCE.SEARCHIMAGE")
.arg("imageset")
.arg("K")
.arg(2)
.arg("QUERYBYTES")
.arg(&query_img_b64)
.arg("RETURN")
.arg(1)
.arg("title")
.query::<Value>(&mut conn)
.unwrap();
// Should get 2 hits (img-1 and img-2) in some order; assert array non-empty
let img_ids = extract_hit_ids(&v);
assert!(
!img_ids.is_empty(),
"LANCE.SEARCHIMAGE should return non-empty results; got: {}",
to_string_lossy(&v)
);
// 7) Inspect datasets
let v = redis::cmd("LANCE.LIST").query::<Value>(&mut conn).unwrap();
assert!(
bulk_contains_string(&v, "textset"),
"LANCE.LIST missing textset: {}",
to_string_lossy(&v)
);
assert!(
bulk_contains_string(&v, "imageset"),
"LANCE.LIST missing imageset: {}",
to_string_lossy(&v)
);
// INFO textset
let info_text = redis::cmd("LANCE.INFO")
.arg("textset")
.query::<Value>(&mut conn)
.unwrap();
// INFO returns Array [k,v,k,v,...] including "dimension" "64" and "row_count" "...".
let info_str = to_string_lossy(&info_text);
assert!(
info_str.contains("dimension") && info_str.contains("64"),
"LANCE.INFO textset should include dimension 64; got: {}",
info_str
);
// 8) Delete by id and drop datasets
let v = redis::cmd("LANCE.DEL")
.arg("textset")
.arg("doc-2")
.query::<Value>(&mut conn)
.unwrap();
// Returns SimpleString "1" or Int 1 depending on encoding path; accept either
assert!(
value_is_int_eq(&v, 1) || value_is_str_eq(&v, "1"),
"LANCE.DEL doc-2 expected 1; got {}",
to_string_lossy(&v)
);
let v = redis::cmd("LANCE.DROP")
.arg("textset")
.query::<Value>(&mut conn)
.unwrap();
assert!(value_is_ok(&v), "LANCE.DROP textset failed: {}", to_string_lossy(&v));
let v = redis::cmd("LANCE.DROP")
.arg("imageset")
.query::<Value>(&mut conn)
.unwrap();
assert!(value_is_ok(&v), "LANCE.DROP imageset failed: {}", to_string_lossy(&v));
}

View File

@@ -1,4 +1,5 @@
use herodb::{server::Server, options::DBOption};
use std::path::PathBuf;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
@@ -17,7 +18,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
std::fs::create_dir_all(&test_dir).unwrap();
let option = DBOption {
dir: test_dir,
dir: PathBuf::from(test_dir),
port,
debug: true,
encrypt: false,

View File

@@ -1,6 +1,7 @@
use herodb::rpc::{BackendType, DatabaseConfig};
use herodb::admin_meta;
use herodb::options::BackendType as OptionsBackendType;
use std::path::Path;
#[tokio::test]
async fn test_rpc_server_basic() {
@@ -70,11 +71,11 @@ async fn test_database_name_persistence() {
let _ = std::fs::remove_dir_all(base_dir);
// Set the database name
admin_meta::set_database_name(base_dir, backend.clone(), admin_secret, db_id, test_name)
admin_meta::set_database_name(Path::new(base_dir), backend.clone(), admin_secret, db_id, test_name)
.expect("Failed to set database name");
// Retrieve the database name
let retrieved_name = admin_meta::get_database_name(base_dir, backend, admin_secret, db_id)
let retrieved_name = admin_meta::get_database_name(Path::new(base_dir), backend, admin_secret, db_id)
.expect("Failed to get database name");
// Verify the name matches

View File

@@ -1,4 +1,5 @@
use herodb::{server::Server, options::DBOption};
use std::path::PathBuf;
use std::time::Duration;
use tokio::time::sleep;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -19,7 +20,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
std::fs::create_dir_all(&test_dir).unwrap();
let option = DBOption {
dir: test_dir,
dir: PathBuf::from(test_dir),
port,
debug: true,
encrypt: false,

View File

@@ -1,4 +1,5 @@
use herodb::{server::Server, options::DBOption};
use std::path::PathBuf;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
@@ -17,7 +18,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
std::fs::create_dir_all(&test_dir).unwrap();
let option = DBOption {
dir: test_dir,
dir: PathBuf::from(test_dir),
port,
debug: false,
encrypt: false,

View File

@@ -0,0 +1,294 @@
use redis::{Client, Connection, RedisResult};
use std::process::{Child, Command};
use std::time::Duration;
use jsonrpsee::http_client::{HttpClientBuilder, HttpClient};
use herodb::rpc::{RpcClient, BackendType, DatabaseConfig};
// Helper function to get Redis connection, retrying until successful
fn get_redis_connection(port: u16) -> Connection {
let connection_info = format!("redis://127.0.0.1:{}", port);
let client = Client::open(connection_info).unwrap();
let mut attempts = 0;
loop {
match client.get_connection() {
Ok(mut conn) => {
if redis::cmd("PING").query::<String>(&mut conn).is_ok() {
return conn;
}
}
Err(e) => {
if attempts >= 120 {
panic!(
"Failed to connect to Redis server after 120 attempts: {}",
e
);
}
}
}
attempts += 1;
std::thread::sleep(Duration::from_millis(100));
}
}
// Helper function to get RPC client
async fn get_rpc_client(port: u16) -> HttpClient {
let url = format!("http://127.0.0.1:{}", port + 1); // RPC port is Redis port + 1
let client = HttpClientBuilder::default().build(url).unwrap();
client
}
// A guard to ensure the server process is killed when it goes out of scope
struct ServerProcessGuard {
process: Child,
test_dir: String,
}
impl Drop for ServerProcessGuard {
fn drop(&mut self) {
println!("Killing server process (pid: {})...", self.process.id());
if let Err(e) = self.process.kill() {
eprintln!("Failed to kill server process: {}", e);
}
match self.process.wait() {
Ok(status) => println!("Server process exited with: {}", status),
Err(e) => eprintln!("Failed to wait on server process: {}", e),
}
// Clean up the specific test directory
println!("Cleaning up test directory: {}", self.test_dir);
if let Err(e) = std::fs::remove_dir_all(&self.test_dir) {
eprintln!("Failed to clean up test directory: {}", e);
}
}
}
// Helper to set up the server and return connections
async fn setup_server() -> (ServerProcessGuard, u16, Connection, HttpClient) {
use std::sync::atomic::{AtomicU16, Ordering};
static PORT_COUNTER: AtomicU16 = AtomicU16::new(16500);
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
let test_dir = format!("/tmp/herodb_tantivy_test_{}", port);
// Clean up previous test data
if std::path::Path::new(&test_dir).exists() {
let _ = std::fs::remove_dir_all(&test_dir);
}
std::fs::create_dir_all(&test_dir).unwrap();
// Start the server in a subprocess
let child = Command::new("cargo")
.args(&[
"run",
"--",
"--dir",
&test_dir,
"--port",
&port.to_string(),
"--rpc-port",
&(port + 1).to_string(),
"--enable-rpc",
"--debug",
"--admin-secret",
"test-admin",
])
.spawn()
.expect("Failed to start server process");
// Create a new guard that also owns the test directory path
let guard = ServerProcessGuard {
process: child,
test_dir,
};
// Give the server time to build and start (cargo run may compile first)
std::thread::sleep(Duration::from_millis(3000));
let conn = get_redis_connection(port);
let rpc_client = get_rpc_client(port).await;
(guard, port, conn, rpc_client)
}
#[tokio::test]
async fn test_tantivy_full_text_search() {
let (_server_guard, _port, mut conn, rpc_client) = setup_server().await;
// Create a Tantivy database via RPC
let db_config = DatabaseConfig {
name: Some("test_tantivy_db".to_string()),
storage_path: None,
max_size: None,
redis_version: None,
};
let db_id = rpc_client.create_database(BackendType::Tantivy, db_config, None).await.unwrap();
assert_eq!(db_id, 1);
// Add readwrite access key
let _ = rpc_client.add_access_key(db_id, "readwrite_key".to_string(), "readwrite".to_string()).await.unwrap();
// Add read-only access key
let _ = rpc_client.add_access_key(db_id, "read_key".to_string(), "read".to_string()).await.unwrap();
// Test with readwrite permissions
test_tantivy_with_readwrite_permissions(&mut conn, db_id).await;
// Test with read-only permissions
test_tantivy_with_read_permissions(&mut conn, db_id).await;
// Test access denied for invalid key
test_tantivy_access_denied(&mut conn, db_id).await;
}
async fn test_tantivy_with_readwrite_permissions(conn: &mut Connection, db_id: u64) {
// Select database with readwrite key
let result: RedisResult<String> = redis::cmd("SELECT")
.arg(db_id)
.arg("KEY")
.arg("readwrite_key")
.query(conn);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "OK");
// Test FT.CREATE
let result: RedisResult<String> = redis::cmd("FT.CREATE")
.arg("test_index")
.arg("SCHEMA")
.arg("title")
.arg("TEXT")
.arg("content")
.arg("TEXT")
.arg("tags")
.arg("TAG")
.query(conn);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "OK");
// Test FT.ADD
let result: RedisResult<String> = redis::cmd("FT.ADD")
.arg("test_index")
.arg("doc1")
.arg("1.0")
.arg("title")
.arg("Hello World")
.arg("content")
.arg("This is a test document")
.arg("tags")
.arg("test,example")
.query(conn);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "OK");
// Add another document
let result: RedisResult<String> = redis::cmd("FT.ADD")
.arg("test_index")
.arg("doc2")
.arg("1.0")
.arg("title")
.arg("Goodbye World")
.arg("content")
.arg("Another test document")
.arg("tags")
.arg("test,another")
.query(conn);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "OK");
// Test FT.SEARCH
let result: RedisResult<Vec<String>> = redis::cmd("FT.SEARCH")
.arg("test_index")
.arg("test")
.query(conn);
assert!(result.is_ok());
let results = result.unwrap();
assert!(results.len() >= 3); // At least total count + 2 documents
assert_eq!(results[0], "2"); // Total matches
// Test FT.INFO
let result: RedisResult<Vec<String>> = redis::cmd("FT.INFO")
.arg("test_index")
.query(conn);
assert!(result.is_ok());
let info = result.unwrap();
assert!(info.contains(&"index_name".to_string()));
assert!(info.contains(&"test_index".to_string()));
// Test FT.DEL
let result: RedisResult<String> = redis::cmd("FT.DEL")
.arg("test_index")
.arg("doc1")
.query(conn);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "1");
// Verify document was deleted
let result: RedisResult<Vec<String>> = redis::cmd("FT.SEARCH")
.arg("test_index")
.arg("Hello")
.query(conn);
assert!(result.is_ok());
let results = result.unwrap();
assert_eq!(results[0], "0"); // No matches
// Test FT.DROP
let result: RedisResult<String> = redis::cmd("FT.DROP")
.arg("test_index")
.query(conn);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "OK");
// Verify index was dropped
let result: RedisResult<String> = redis::cmd("FT.INFO")
.arg("test_index")
.query(conn);
assert!(result.is_err()); // Should fail
}
async fn test_tantivy_with_read_permissions(conn: &mut Connection, db_id: u64) {
// Select database with read-only key
let result: RedisResult<String> = redis::cmd("SELECT")
.arg(db_id)
.arg("KEY")
.arg("read_key")
.query(conn);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "OK");
// Recreate index for testing
let result: RedisResult<String> = redis::cmd("FT.CREATE")
.arg("test_index_read")
.arg("SCHEMA")
.arg("title")
.arg("TEXT")
.query(conn);
assert!(result.is_err()); // Should fail due to read-only permissions
assert!(result.unwrap_err().to_string().contains("write permission denied"));
// Add document should fail
let result: RedisResult<String> = redis::cmd("FT.ADD")
.arg("test_index_read")
.arg("doc1")
.arg("1.0")
.arg("title")
.arg("Test")
.query(conn);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("write permission denied"));
// But search should work (if index exists)
// First create index with write permissions, then switch to read
// For this test, we'll assume the index doesn't exist, so search fails differently
}
async fn test_tantivy_access_denied(conn: &mut Connection, db_id: u64) {
// Try to select with invalid key
let result: RedisResult<String> = redis::cmd("SELECT")
.arg(db_id)
.arg("KEY")
.arg("invalid_key")
.query(conn);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("invalid access key"));
}

View File

@@ -1,4 +1,5 @@
use herodb::{options::DBOption, server::Server};
use std::path::PathBuf;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::{sleep, Duration};
@@ -17,7 +18,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
std::fs::create_dir_all(&test_dir).unwrap();
let option = DBOption {
dir: test_dir,
dir: PathBuf::from(test_dir),
port,
debug: false,
encrypt: false,