Compare commits
	
		
			2 Commits
		
	
	
		
			a1127b72da
			...
			9410176684
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 9410176684 | |||
| ab56fad635 | 
							
								
								
									
										5669
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										5669
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							
							
								
								
									
										12
									
								
								Cargo.toml
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								Cargo.toml
									
									
									
									
									
								
							| @@ -24,6 +24,18 @@ age = "0.10" | ||||
| secrecy = "0.8" | ||||
| ed25519-dalek = "2" | ||||
| base64 = "0.22" | ||||
| # Lance vector database dependencies | ||||
| lance = "0.33" | ||||
| lance-index = "0.33" | ||||
| lance-linalg = "0.33" | ||||
| # Use Arrow version compatible with Lance 0.33 | ||||
| arrow = "55.2" | ||||
| arrow-array = "55.2" | ||||
| arrow-schema = "55.2" | ||||
| parquet = "55.2" | ||||
| uuid = { version = "1.10", features = ["v4"] } | ||||
| reqwest = { version = "0.11", features = ["json"] } | ||||
| image = "0.25" | ||||
|  | ||||
| [dev-dependencies] | ||||
| redis = { version = "0.24", features = ["aio", "tokio-comp"] } | ||||
|   | ||||
							
								
								
									
										454
									
								
								docs/lance_vector_db.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										454
									
								
								docs/lance_vector_db.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,454 @@ | ||||
| # Lance Vector Database Operations | ||||
|  | ||||
| HeroDB includes a powerful vector database integration using Lance, enabling high-performance vector storage, search, and multimodal data management. By default, it uses Ollama for local text embeddings, with support for custom external embedding services. | ||||
|  | ||||
| ## Overview | ||||
|  | ||||
| The Lance vector database integration provides: | ||||
|  | ||||
| - **High-performance vector storage** using Lance's columnar format | ||||
| - **Local Ollama integration** for text embeddings (default, no external dependencies) | ||||
| - **Custom embedding service support** for advanced use cases | ||||
| - **Text embedding support** (images via custom services) | ||||
| - **Vector similarity search** with configurable parameters | ||||
| - **Scalable indexing** with IVF_PQ (Inverted File with Product Quantization) | ||||
| - **Redis-compatible command interface** | ||||
|  | ||||
| ## Architecture | ||||
|  | ||||
| ``` | ||||
| ┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐ | ||||
| │   HeroDB        │    │  External        │    │   Lance         │ | ||||
| │   Redis Server  │◄──►│  Embedding       │    │   Vector Store  │ | ||||
| │                 │    │  Service         │    │                 │ | ||||
| └─────────────────┘    └──────────────────┘    └─────────────────┘ | ||||
|          │                       │                       │ | ||||
|          │                       │                       │ | ||||
|     Redis Protocol          HTTP API              Arrow/Parquet | ||||
|     Commands                JSON Requests         Columnar Storage | ||||
| ``` | ||||
|  | ||||
| ### Key Components | ||||
|  | ||||
| 1. **Lance Store**: High-performance columnar vector storage | ||||
| 2. **Ollama Integration**: Local embedding service (default) | ||||
| 3. **Custom Embedding Service**: Optional HTTP API for advanced use cases | ||||
| 4. **Redis Command Interface**: Familiar Redis-style commands | ||||
| 5. **Arrow Schema**: Flexible schema definition for metadata | ||||
|  | ||||
| ## Configuration | ||||
|  | ||||
| ### Default Setup (Ollama) | ||||
|  | ||||
| HeroDB uses Ollama by default for text embeddings. No configuration is required if Ollama is running locally: | ||||
|  | ||||
| ```bash | ||||
| # Install Ollama (if not already installed) | ||||
| # Visit: https://ollama.ai | ||||
|  | ||||
| # Pull the embedding model | ||||
| ollama pull nomic-embed-text | ||||
|  | ||||
| # Ollama automatically runs on localhost:11434 | ||||
| # HeroDB will use this by default | ||||
| ``` | ||||
|  | ||||
| **Default Configuration:** | ||||
| - **URL**: `http://localhost:11434` | ||||
| - **Model**: `nomic-embed-text` | ||||
| - **Dimensions**: 768 (for nomic-embed-text) | ||||
|  | ||||
| ### Custom Embedding Service (Optional) | ||||
|  | ||||
| To use a custom embedding service instead of Ollama: | ||||
|  | ||||
| ```bash | ||||
| # Set custom embedding service URL | ||||
| redis-cli HSET config:core:aiembed url "http://your-embedding-service:8080/embed" | ||||
|  | ||||
| # Optional: Set authentication if required | ||||
| redis-cli HSET config:core:aiembed token "your-api-token" | ||||
| ``` | ||||
|  | ||||
| ### Embedding Service API Contracts | ||||
|  | ||||
| #### Ollama API (Default) | ||||
| HeroDB calls Ollama using this format: | ||||
|  | ||||
| ```http | ||||
| POST http://localhost:11434/api/embeddings | ||||
| Content-Type: application/json | ||||
|  | ||||
| { | ||||
|   "model": "nomic-embed-text", | ||||
|   "prompt": "Your text to embed" | ||||
| } | ||||
| ``` | ||||
|  | ||||
| Response: | ||||
| ```json | ||||
| { | ||||
|   "embedding": [0.1, 0.2, 0.3, ...] | ||||
| } | ||||
| ``` | ||||
|  | ||||
| #### Custom Service API | ||||
| Your custom embedding service should accept POST requests with this JSON format: | ||||
|  | ||||
| ```jsonc | ||||
| { | ||||
|   "texts": ["text1", "text2"],           // Optional: array of texts | ||||
|   "images": ["base64_image1", "base64_image2"], // Optional: base64 encoded images | ||||
|   "model": "your-model-name"             // Optional: model specification | ||||
| } | ||||
| ``` | ||||
|  | ||||
| And return responses in this format: | ||||
|  | ||||
| ```jsonc | ||||
| { | ||||
|   "embeddings": [[0.1, 0.2, ...], [0.3, 0.4, ...]],  // Array of embedding vectors | ||||
|   "model": "model-name",                               // Model used | ||||
|   "usage": {                                          // Optional usage stats | ||||
|     "tokens": 100, | ||||
|     "requests": 2 | ||||
|   } | ||||
| } | ||||
| ``` | ||||
|  | ||||
| ## Commands Reference | ||||
|  | ||||
| ### Dataset Management | ||||
|  | ||||
| #### LANCE CREATE | ||||
| Create a new vector dataset with specified dimensions and optional schema. | ||||
|  | ||||
| ```bash | ||||
| LANCE CREATE <dataset> DIM <dimension> [SCHEMA field:type ...] | ||||
| ``` | ||||
|  | ||||
| **Parameters:** | ||||
| - `dataset`: Name of the dataset | ||||
| - `dimension`: Vector dimension (e.g., 384, 768, 1536) | ||||
| - `field:type`: Optional metadata fields (string, int, float, bool) | ||||
|  | ||||
| **Examples:** | ||||
| ```bash | ||||
| # Create a simple dataset for 384-dimensional vectors | ||||
| LANCE CREATE documents DIM 384 | ||||
|  | ||||
| # Create dataset with metadata schema | ||||
| LANCE CREATE products DIM 768 SCHEMA category:string price:float available:bool | ||||
| ``` | ||||
|  | ||||
| #### LANCE LIST | ||||
| List all available datasets. | ||||
|  | ||||
| ```bash | ||||
| LANCE LIST | ||||
| ``` | ||||
|  | ||||
| **Returns:** Array of dataset names | ||||
|  | ||||
| #### LANCE INFO | ||||
| Get information about a specific dataset. | ||||
|  | ||||
| ```bash | ||||
| LANCE INFO <dataset> | ||||
| ``` | ||||
|  | ||||
| **Returns:** Dataset metadata including name, version, row count, and schema | ||||
|  | ||||
| #### LANCE DROP | ||||
| Delete a dataset and all its data. | ||||
|  | ||||
| ```bash | ||||
| LANCE DROP <dataset> | ||||
| ``` | ||||
|  | ||||
| ### Data Operations | ||||
|  | ||||
| #### LANCE STORE | ||||
| Store multimodal data (text/images) with automatic embedding generation. | ||||
|  | ||||
| ```bash | ||||
| LANCE STORE <dataset> [TEXT <text>] [IMAGE <base64>] [key value ...] | ||||
| ``` | ||||
|  | ||||
| **Parameters:** | ||||
| - `dataset`: Target dataset name | ||||
| - `TEXT`: Text content to embed | ||||
| - `IMAGE`: Base64-encoded image to embed | ||||
| - `key value`: Metadata key-value pairs | ||||
|  | ||||
| **Examples:** | ||||
| ```bash | ||||
| # Store text with metadata | ||||
| LANCE STORE documents TEXT "Machine learning is transforming industries" category "AI" author "John Doe" | ||||
|  | ||||
| # Store image with metadata | ||||
| LANCE STORE images IMAGE "iVBORw0KGgoAAAANSUhEUgAA..." category "nature" tags "landscape,mountains" | ||||
|  | ||||
| # Store both text and image | ||||
| LANCE STORE multimodal TEXT "Beautiful sunset" IMAGE "base64data..." location "California" | ||||
| ``` | ||||
|  | ||||
| **Returns:** Unique ID of the stored item | ||||
|  | ||||
| ### Search Operations | ||||
|  | ||||
| #### LANCE SEARCH | ||||
| Search using a raw vector. | ||||
|  | ||||
| ```bash | ||||
| LANCE SEARCH <dataset> VECTOR <vector> K <k> [NPROBES <n>] [REFINE <r>] | ||||
| ``` | ||||
|  | ||||
| **Parameters:** | ||||
| - `dataset`: Dataset to search | ||||
| - `vector`: Comma-separated vector values (e.g., "0.1,0.2,0.3") | ||||
| - `k`: Number of results to return | ||||
| - `NPROBES`: Number of partitions to search (optional) | ||||
| - `REFINE`: Refine factor for better accuracy (optional) | ||||
|  | ||||
| **Example:** | ||||
| ```bash | ||||
| LANCE SEARCH documents VECTOR "0.1,0.2,0.3,0.4" K 5 NPROBES 10 | ||||
| ``` | ||||
|  | ||||
| #### LANCE SEARCH.TEXT | ||||
| Search using text query (automatically embedded). | ||||
|  | ||||
| ```bash | ||||
| LANCE SEARCH.TEXT <dataset> <query_text> K <k> [NPROBES <n>] [REFINE <r>] | ||||
| ``` | ||||
|  | ||||
| **Parameters:** | ||||
| - `dataset`: Dataset to search | ||||
| - `query_text`: Text query to search for | ||||
| - `k`: Number of results to return | ||||
| - `NPROBES`: Number of partitions to search (optional) | ||||
| - `REFINE`: Refine factor for better accuracy (optional) | ||||
|  | ||||
| **Example:** | ||||
| ```bash | ||||
| LANCE SEARCH.TEXT documents "artificial intelligence applications" K 10 NPROBES 20 | ||||
| ``` | ||||
|  | ||||
| **Returns:** Array of results with distance scores and metadata | ||||
|  | ||||
| ### Embedding Operations | ||||
|  | ||||
| #### LANCE EMBED.TEXT | ||||
| Generate embeddings for text without storing. | ||||
|  | ||||
| ```bash | ||||
| LANCE EMBED.TEXT <text1> [text2] [text3] ... | ||||
| ``` | ||||
|  | ||||
| **Example:** | ||||
| ```bash | ||||
| LANCE EMBED.TEXT "Hello world" "Machine learning" "Vector database" | ||||
| ``` | ||||
|  | ||||
| **Returns:** Array of embedding vectors | ||||
|  | ||||
| ### Index Management | ||||
|  | ||||
| #### LANCE CREATE.INDEX | ||||
| Create a vector index for faster search performance. | ||||
|  | ||||
| ```bash | ||||
| LANCE CREATE.INDEX <dataset> <index_type> [PARTITIONS <n>] [SUBVECTORS <n>] | ||||
| ``` | ||||
|  | ||||
| **Parameters:** | ||||
| - `dataset`: Dataset to index | ||||
| - `index_type`: Index type (currently supports "IVF_PQ") | ||||
| - `PARTITIONS`: Number of partitions (default: 256) | ||||
| - `SUBVECTORS`: Number of sub-vectors for PQ (default: 16) | ||||
|  | ||||
| **Example:** | ||||
| ```bash | ||||
| LANCE CREATE.INDEX documents IVF_PQ PARTITIONS 512 SUBVECTORS 32 | ||||
| ``` | ||||
|  | ||||
| ## Usage Patterns | ||||
|  | ||||
| ### 1. Document Search System | ||||
|  | ||||
| ```bash | ||||
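| # Setup (DIM must equal your embedding model's output size; 384 assumes a 384-dimensional model, while the default nomic-embed-text needs DIM 768) | ||||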
| LANCE CREATE documents DIM 384 SCHEMA title:string content:string category:string | ||||
|  | ||||
| # Store documents | ||||
| LANCE STORE documents TEXT "Introduction to machine learning algorithms" title "ML Basics" category "education" | ||||
| LANCE STORE documents TEXT "Deep learning neural networks explained" title "Deep Learning" category "education" | ||||
| LANCE STORE documents TEXT "Building scalable web applications" title "Web Dev" category "programming" | ||||
|  | ||||
| # Create index for better performance | ||||
| LANCE CREATE.INDEX documents IVF_PQ PARTITIONS 256 | ||||
|  | ||||
| # Search | ||||
| LANCE SEARCH.TEXT documents "neural networks" K 5 | ||||
| ``` | ||||
|  | ||||
| ### 2. Image Similarity Search | ||||
|  | ||||
| ```bash | ||||
| # Setup | ||||
| LANCE CREATE images DIM 512 SCHEMA filename:string tags:string | ||||
|  | ||||
| # Store images (base64 encoded) | ||||
| LANCE STORE images IMAGE "iVBORw0KGgoAAAANSUhEUgAA..." filename "sunset.jpg" tags "nature,landscape" | ||||
| LANCE STORE images IMAGE "iVBORw0KGgoAAAANSUhEUgBB..." filename "city.jpg" tags "urban,architecture" | ||||
|  | ||||
| # Search by image | ||||
| LANCE STORE temp_search IMAGE "query_image_base64..." | ||||
| # Then use the returned ID to get embedding and search | ||||
| ``` | ||||
|  | ||||
| ### 3. Multimodal Content Management | ||||
|  | ||||
| ```bash | ||||
| # Setup | ||||
| LANCE CREATE content DIM 768 SCHEMA type:string source:string | ||||
|  | ||||
| # Store mixed content | ||||
| LANCE STORE content TEXT "Product description for smartphone" type "product" source "catalog" | ||||
| LANCE STORE content IMAGE "product_image_base64..." type "product_image" source "catalog" | ||||
|  | ||||
| # Search across all content types | ||||
| LANCE SEARCH.TEXT content "smartphone features" K 10 | ||||
| ``` | ||||
|  | ||||
| ## Performance Considerations | ||||
|  | ||||
| ### Vector Dimensions | ||||
| - **384**: Good for general text (e.g., sentence-transformers) | ||||
| - **768**: Standard for BERT-like models | ||||
| - **1536**: OpenAI text-embedding-ada-002 | ||||
| - **Higher dimensions**: Better accuracy but slower search | ||||
|  | ||||
| ### Index Configuration | ||||
| - **More partitions**: Better for larger datasets (>100K vectors) | ||||
| - **More sub-vectors**: Better compression but slower search | ||||
| - **NPROBES**: Higher values = better accuracy, slower search | ||||
|  | ||||
| ### Best Practices | ||||
|  | ||||
| 1. **Create indexes** for datasets with >1000 vectors | ||||
| 2. **Use appropriate dimensions** based on your embedding model | ||||
| 3. **Configure NPROBES** based on accuracy vs speed requirements | ||||
| 4. **Batch operations** when possible for better performance | ||||
| 5. **Monitor embedding service** response times and rate limits | ||||
|  | ||||
| ## Error Handling | ||||
|  | ||||
| Common error scenarios and solutions: | ||||
|  | ||||
| ### Embedding Service Errors | ||||
| ```bash | ||||
| # Error: Embedding service not configured | ||||
| ERR Embedding service URL not configured. Set it with: HSET config:core:aiembed url <YOUR_EMBEDDING_SERVICE_URL> | ||||
|  | ||||
| # Error: Service unavailable | ||||
| ERR Embedding service returned error 404 Not Found | ||||
| ``` | ||||
|  | ||||
| **Solution:** Ensure the embedding service is running and the configured URL is correct. | ||||
|  | ||||
| ### Dataset Errors | ||||
| ```bash | ||||
| # Error: Dataset doesn't exist | ||||
| ERR Dataset 'mydata' does not exist | ||||
|  | ||||
| # Error: Dimension mismatch | ||||
| ERR Vector dimension mismatch: expected 384, got 768 | ||||
| ``` | ||||
|  | ||||
| **Solution:** Create dataset first or check vector dimensions. | ||||
|  | ||||
| ### Search Errors | ||||
| ```bash | ||||
| # Error: Invalid vector format | ||||
| ERR Invalid vector format | ||||
|  | ||||
| # Error: No index available | ||||
| ERR No index available for fast search | ||||
| ``` | ||||
|  | ||||
| **Solution:** Check vector format or create an index. | ||||
|  | ||||
| ## Integration Examples | ||||
|  | ||||
| ### With Python | ||||
| ```python | ||||
| import redis | ||||
| import json | ||||
|  | ||||
| r = redis.Redis(host='localhost', port=6379) | ||||
|  | ||||
| # Create dataset | ||||
| r.execute_command('LANCE', 'CREATE', 'docs', 'DIM', '384') | ||||
|  | ||||
| # Store document | ||||
| result = r.execute_command('LANCE', 'STORE', 'docs',  | ||||
|                           'TEXT', 'Machine learning tutorial', | ||||
|                           'category', 'education') | ||||
| print(f"Stored with ID: {result}") | ||||
|  | ||||
| # Search | ||||
| results = r.execute_command('LANCE', 'SEARCH.TEXT', 'docs',  | ||||
|                            'machine learning', 'K', '5') | ||||
| print(f"Search results: {results}") | ||||
| ``` | ||||
|  | ||||
| ### With Node.js | ||||
| ```javascript | ||||
| const redis = require('redis'); | ||||
| const client = redis.createClient(); | ||||
|  | ||||
| // Create dataset | ||||
| await client.sendCommand(['LANCE', 'CREATE', 'docs', 'DIM', '384']); | ||||
|  | ||||
| // Store document | ||||
| const id = await client.sendCommand(['LANCE', 'STORE', 'docs',  | ||||
|                                    'TEXT', 'Deep learning guide', | ||||
|                                    'category', 'AI']); | ||||
|  | ||||
| // Search | ||||
| const results = await client.sendCommand(['LANCE', 'SEARCH.TEXT', 'docs', | ||||
|                                         'deep learning', 'K', '10']); | ||||
| ``` | ||||
|  | ||||
| ## Monitoring and Maintenance | ||||
|  | ||||
| ### Health Checks | ||||
| ```bash | ||||
| # Check if Lance store is available | ||||
| LANCE LIST | ||||
|  | ||||
| # Check dataset health | ||||
| LANCE INFO mydataset | ||||
|  | ||||
| # Test embedding service | ||||
| LANCE EMBED.TEXT "test" | ||||
| ``` | ||||
|  | ||||
| ### Maintenance Operations | ||||
| ```bash | ||||
| # Backup: Use standard Redis backup procedures | ||||
| # The Lance data is stored separately in the data directory | ||||
|  | ||||
| # Cleanup: Remove unused datasets | ||||
| LANCE DROP old_dataset | ||||
|  | ||||
| # Reindex: drop and recreate the dataset (LANCE DROP deletes all of its data), re-import the data, then rebuild the index | ||||
| LANCE DROP dataset_name | ||||
| LANCE CREATE dataset_name DIM 384 | ||||
| # Re-import data | ||||
| LANCE CREATE.INDEX dataset_name IVF_PQ | ||||
| ``` | ||||
|  | ||||
| This integration provides a powerful foundation for building AI-powered applications with vector search capabilities while maintaining the familiar Redis interface. | ||||
| @@ -1,6 +1,191 @@ | ||||
| # HeroDB Tantivy Search Examples | ||||
| # HeroDB Examples | ||||
|  | ||||
| This directory contains examples demonstrating HeroDB's full-text search capabilities powered by Tantivy. | ||||
| This directory contains examples demonstrating HeroDB's capabilities including full-text search powered by Tantivy and vector database operations using Lance. | ||||
|  | ||||
| ## Available Examples | ||||
|  | ||||
| 1. **[Tantivy Search Demo](#tantivy-search-demo-bash-script)** - Full-text search capabilities | ||||
| 2. **[Lance Vector Database Demo](#lance-vector-database-demo-bash-script)** - Vector database and AI operations | ||||
| 3. **[AGE Encryption Demo](age_bash_demo.sh)** - Cryptographic operations | ||||
| 4. **[Simple Demo](simple_demo.sh)** - Basic Redis operations | ||||
|  | ||||
| --- | ||||
|  | ||||
| ## Lance Vector Database Demo (Bash Script) | ||||
|  | ||||
| ### Overview | ||||
| The `lance_vector_demo.sh` script provides a comprehensive demonstration of HeroDB's vector database capabilities using Lance. It showcases vector storage, similarity search, multimodal data handling, and AI-powered operations with external embedding services. | ||||
|  | ||||
| ### Prerequisites | ||||
| 1. **HeroDB Server**: The server must be running (default port 6379) | ||||
| 2. **Redis CLI**: The `redis-cli` tool must be installed and available in your PATH | ||||
| 3. **Embedding Service** (optional): For full functionality, set up an external embedding service | ||||
|  | ||||
| ### Running the Demo | ||||
|  | ||||
| #### Step 1: Start HeroDB Server | ||||
| ```bash | ||||
| # From the project root directory | ||||
| cargo run -- --dir ./test_data --port 6379 | ||||
| ``` | ||||
|  | ||||
| #### Step 2: Run the Demo (in a new terminal) | ||||
| ```bash | ||||
| # From the project root directory | ||||
| ./examples/lance_vector_demo.sh | ||||
| ``` | ||||
|  | ||||
| ### What the Demo Covers | ||||
|  | ||||
| The script demonstrates comprehensive vector database operations: | ||||
|  | ||||
| 1. **Dataset Management** | ||||
|    - Creating vector datasets with custom dimensions | ||||
|    - Defining schemas with metadata fields | ||||
|    - Listing and inspecting datasets | ||||
|    - Dataset information and statistics | ||||
|  | ||||
| 2. **Embedding Operations** | ||||
|    - Text embedding generation via external services | ||||
|    - Multimodal embedding support (text + images) | ||||
|    - Batch embedding operations | ||||
|  | ||||
| 3. **Data Storage** | ||||
|    - Storing text documents with automatic embedding | ||||
|    - Storing images with metadata | ||||
|    - Multimodal content storage | ||||
|    - Rich metadata support | ||||
|  | ||||
| 4. **Vector Search** | ||||
|    - Similarity search with raw vectors | ||||
|    - Text-based semantic search | ||||
|    - Configurable search parameters (K, NPROBES, REFINE) | ||||
|    - Cross-modal search capabilities | ||||
|  | ||||
| 5. **Index Management** | ||||
|    - Creating IVF_PQ indexes for performance | ||||
|    - Custom index parameters | ||||
|    - Performance optimization | ||||
|  | ||||
| 6. **Advanced Features** | ||||
|    - Error handling and recovery | ||||
|    - Performance testing concepts | ||||
|    - Monitoring and maintenance | ||||
|    - Cleanup operations | ||||
|  | ||||
| ### Key Lance Commands Demonstrated | ||||
|  | ||||
| #### Dataset Management | ||||
| ```bash | ||||
| # Create vector dataset | ||||
| LANCE CREATE documents DIM 384 | ||||
|  | ||||
| # Create dataset with schema | ||||
| LANCE CREATE products DIM 768 SCHEMA category:string price:float available:bool | ||||
|  | ||||
| # List datasets | ||||
| LANCE LIST | ||||
|  | ||||
| # Get dataset information | ||||
| LANCE INFO documents | ||||
| ``` | ||||
|  | ||||
| #### Data Operations | ||||
| ```bash | ||||
| # Store text with metadata | ||||
| LANCE STORE documents TEXT "Machine learning tutorial" category "education" author "John Doe" | ||||
|  | ||||
| # Store image with metadata | ||||
| LANCE STORE images IMAGE "base64_encoded_image..." filename "photo.jpg" tags "nature,landscape" | ||||
|  | ||||
| # Store multimodal content | ||||
| LANCE STORE content TEXT "Product description" IMAGE "base64_image..." type "product" | ||||
| ``` | ||||
|  | ||||
| #### Search Operations | ||||
| ```bash | ||||
| # Search with raw vector | ||||
| LANCE SEARCH documents VECTOR "0.1,0.2,0.3,0.4" K 5 | ||||
|  | ||||
| # Semantic text search | ||||
| LANCE SEARCH.TEXT documents "artificial intelligence" K 10 NPROBES 20 | ||||
|  | ||||
| # Generate embeddings | ||||
| LANCE EMBED.TEXT "Hello world" "Machine learning" | ||||
| ``` | ||||
|  | ||||
| #### Index Management | ||||
| ```bash | ||||
| # Create performance index | ||||
| LANCE CREATE.INDEX documents IVF_PQ PARTITIONS 256 SUBVECTORS 16 | ||||
|  | ||||
| # Drop dataset | ||||
| LANCE DROP old_dataset | ||||
| ``` | ||||
|  | ||||
| ### Configuration | ||||
|  | ||||
| #### Setting Up Embedding Service | ||||
| ```bash | ||||
| # Configure embedding service URL | ||||
| redis-cli HSET config:core:aiembed url "http://your-embedding-service:8080/embed" | ||||
|  | ||||
| # Optional: Set authentication token | ||||
| redis-cli HSET config:core:aiembed token "your-api-token" | ||||
| ``` | ||||
|  | ||||
| #### Embedding Service API | ||||
| Your embedding service should accept POST requests: | ||||
| ```json | ||||
| { | ||||
|   "texts": ["text1", "text2"], | ||||
|   "images": ["base64_image1", "base64_image2"], | ||||
|   "model": "your-model-name" | ||||
| } | ||||
| ``` | ||||
|  | ||||
| And return responses: | ||||
| ```json | ||||
| { | ||||
|   "embeddings": [[0.1, 0.2, ...], [0.3, 0.4, ...]], | ||||
|   "model": "model-name", | ||||
|   "usage": {"tokens": 100, "requests": 2} | ||||
| } | ||||
| ``` | ||||
|  | ||||
| ### Interactive Features | ||||
|  | ||||
| The demo script includes: | ||||
| - **Colored output** for better readability | ||||
| - **Step-by-step execution** with explanations | ||||
| - **Error handling** demonstrations | ||||
| - **Automatic cleanup** options | ||||
| - **Performance testing** concepts | ||||
| - **Real-world usage** examples | ||||
|  | ||||
| ### Use Cases Demonstrated | ||||
|  | ||||
| 1. **Document Search System** | ||||
|    - Semantic document retrieval | ||||
|    - Metadata filtering | ||||
|    - Relevance ranking | ||||
|  | ||||
| 2. **Image Similarity Search** | ||||
|    - Visual content matching | ||||
|    - Tag-based filtering | ||||
|    - Multimodal queries | ||||
|  | ||||
| 3. **Product Recommendations** | ||||
|    - Feature-based similarity | ||||
|    - Category filtering | ||||
|    - Price range queries | ||||
|  | ||||
| 4. **Content Management** | ||||
|    - Mixed media storage | ||||
|    - Cross-modal search | ||||
|    - Rich metadata support | ||||
|  | ||||
| --- | ||||
|  | ||||
| ## Tantivy Search Demo (Bash Script) | ||||
|  | ||||
|   | ||||
							
								
								
									
										426
									
								
								examples/lance_vector_demo.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										426
									
								
								examples/lance_vector_demo.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,426 @@ | ||||
| #!/bin/bash | ||||
|  | ||||
| # Lance Vector Database Demo Script | ||||
| # This script demonstrates all Lance vector database operations in HeroDB | ||||
|  | ||||
| set -e  # Exit on any error | ||||
|  | ||||
| # Configuration | ||||
| # Host and port default to a local HeroDB instance; export REDIS_HOST and/or | ||||
| # REDIS_PORT before running the script to target a different server. | ||||
| REDIS_HOST="${REDIS_HOST:-localhost}" | ||||
| REDIS_PORT="${REDIS_PORT:-6379}" | ||||
| # Kept as a plain string on purpose: it is expanded unquoted (and therefore | ||||
| # word-split) wherever it is used as a command prefix. | ||||
| REDIS_CLI="redis-cli -h $REDIS_HOST -p $REDIS_PORT" | ||||
|  | ||||
| # Colors for output (ANSI escape sequences, expanded by `echo -e` in the log helpers) | ||||
| RED='\033[0;31m' | ||||
| GREEN='\033[0;32m' | ||||
| YELLOW='\033[1;33m' | ||||
| BLUE='\033[0;34m' | ||||
| NC='\033[0m' # No Color | ||||
|  | ||||
| # Helper functions | ||||
| # Print an informational message prefixed with a colored [INFO] tag. | ||||
| #   $1 - message text (backslash escapes in it are expanded) | ||||
| log_info() { | ||||
|     local message="$1" | ||||
|     printf '%b\n' "${BLUE}[INFO]${NC} ${message}" | ||||
| } | ||||
|  | ||||
| # Print a success message prefixed with a colored [SUCCESS] tag. | ||||
| #   $1 - message text (backslash escapes in it are expanded) | ||||
| log_success() { | ||||
|     local message="$1" | ||||
|     printf '%b\n' "${GREEN}[SUCCESS]${NC} ${message}" | ||||
| } | ||||
|  | ||||
| # Print a warning message prefixed with a colored [WARNING] tag. | ||||
| #   $1 - message text (backslash escapes in it are expanded) | ||||
| log_warning() { | ||||
|     local message="$1" | ||||
|     printf '%b\n' "${YELLOW}[WARNING]${NC} ${message}" | ||||
| } | ||||
|  | ||||
| log_error() { | ||||
|     echo -e "${RED}[ERROR]${NC} $1" | ||||
| } | ||||
|  | ||||
| # Run a shell command string and report its outcome. | ||||
| # | ||||
| # Arguments: | ||||
| #   $1 - command line to run, given as a single string | ||||
| #   $2 - human-readable description printed before the command | ||||
| # | ||||
| # Returns 0 when the command succeeds and 1 otherwise; the command's combined | ||||
| # stdout/stderr is logged in both cases. | ||||
| execute_command() { | ||||
|     local cmd="$1" | ||||
|     local description="$2" | ||||
|  | ||||
|     echo | ||||
|     log_info "Executing: $description" | ||||
|     echo "Command: $cmd" | ||||
|  | ||||
|     # The command strings passed in contain single-quoted arguments such as | ||||
|     # 'Hello world'. A bare $cmd expansion only word-splits: it breaks those | ||||
|     # arguments at every space and passes the quote characters through | ||||
|     # literally. eval re-parses the string so the quoting is honoured. | ||||
|     # Only pass trusted, script-authored strings to this function. | ||||
|     # NOTE(review): `result` is left global as in the original in case later | ||||
|     # code reads it after the call - confirm, otherwise declare it local. | ||||
|     if result=$(eval "$cmd" 2>&1); then | ||||
|         log_success "Result: $result" | ||||
|     else | ||||
|         log_error "Failed: $result" | ||||
|         return 1 | ||||
|     fi | ||||
| } | ||||
|  | ||||
| # Verify that a HeroDB server answers PING before the demo starts. | ||||
| # Exits the whole script with status 1, after printing a start-up hint, | ||||
| # when the ping command fails. | ||||
| check_herodb() { | ||||
|     log_info "Checking if HeroDB is running..." | ||||
|  | ||||
|     # Happy path first: a successful ping means there is nothing more to do. | ||||
|     if $REDIS_CLI ping > /dev/null 2>&1; then | ||||
|         log_success "HeroDB is running" | ||||
|         return | ||||
|     fi | ||||
|  | ||||
|     log_error "HeroDB is not running. Please start it first:" | ||||
|     echo "  cargo run -- --dir ./test_data --port $REDIS_PORT" | ||||
|     exit 1 | ||||
| } | ||||
|  | ||||
| # Point HeroDB at an embedding service by writing its URL into the | ||||
| # config:core:aiembed hash, then warn that the demo URL is only a placeholder. | ||||
| setup_embedding_service() { | ||||
|     # Mock endpoint used for demonstration only; in production, replace it | ||||
|     # with your actual embedding service. | ||||
|     local embed_url='http://localhost:8080/embed' | ||||
|  | ||||
|     log_info "Setting up embedding service configuration..." | ||||
|  | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI HSET config:core:aiembed url '$embed_url'" \ | ||||
|         "Configure embedding service URL" | ||||
|  | ||||
|     # Optional: Set authentication token | ||||
|     # execute_command \ | ||||
|     #     "$REDIS_CLI HSET config:core:aiembed token 'your-api-token'" \ | ||||
|     #     "Configure embedding service token" | ||||
|  | ||||
|     log_warning "Note: Embedding service at $embed_url is not running." | ||||
|     log_warning "Some operations will fail, but this demonstrates the command structure." | ||||
| } | ||||
|  | ||||
| # Dataset Management Operations | ||||
| # Lists datasets, creates three of them (documents, products, images), | ||||
| # lists again and prints the info of two of the new datasets. | ||||
| demo_dataset_management() { | ||||
|     local dataset | ||||
|  | ||||
|     # Section banner (leading empty string yields the blank separator line). | ||||
|     printf '%s\n' \ | ||||
|         "" \ | ||||
|         "==========================================" \ | ||||
|         "         DATASET MANAGEMENT DEMO" \ | ||||
|         "==========================================" | ||||
|  | ||||
|     # First listing: expected to be empty on a fresh server. | ||||
|     execute_command "$REDIS_CLI LANCE LIST" "List all datasets (initially empty)" | ||||
|  | ||||
|     # Plain dataset without a metadata schema. | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE CREATE documents DIM 384" \ | ||||
|         "Create a simple document dataset with 384 dimensions" | ||||
|  | ||||
|     # Dataset with typed metadata fields. | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE CREATE products DIM 768 SCHEMA category:string price:float available:bool description:string" \ | ||||
|         "Create products dataset with custom schema" | ||||
|  | ||||
|     # Dataset intended for image content. | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE CREATE images DIM 512 SCHEMA filename:string tags:string width:int height:int" \ | ||||
|         "Create images dataset for multimodal content" | ||||
|  | ||||
|     # Second listing: the three datasets created above should now appear. | ||||
|     execute_command "$REDIS_CLI LANCE LIST" "List all datasets (should show 3 datasets)" | ||||
|  | ||||
|     # Inspect two of the freshly created datasets. | ||||
|     for dataset in documents products; do | ||||
|         execute_command \ | ||||
|             "$REDIS_CLI LANCE INFO $dataset" \ | ||||
|             "Get information about $dataset dataset" | ||||
|     done | ||||
| } | ||||
|  | ||||
| # Embedding Operations | ||||
| # Issues LANCE EMBED.TEXT for one and for several texts. Failures are | ||||
| # tolerated (|| true) so the script keeps going under `set -e`. | ||||
| demo_embedding_operations() { | ||||
|     local notice | ||||
|  | ||||
|     printf '%s\n' \ | ||||
|         "" \ | ||||
|         "==========================================" \ | ||||
|         "         EMBEDDING OPERATIONS DEMO" \ | ||||
|         "==========================================" | ||||
|  | ||||
|     for notice in \ | ||||
|         "The following operations will fail because no embedding service is running." \ | ||||
|         "This demonstrates the command structure and error handling."; do | ||||
|         log_warning "$notice" | ||||
|     done | ||||
|  | ||||
|     # Single text (expected to fail without an embedding service). | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE EMBED.TEXT 'Hello world'" \ | ||||
|         "Generate embedding for single text" || true | ||||
|  | ||||
|     # Several texts in one call. | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE EMBED.TEXT 'Machine learning' 'Artificial intelligence' 'Deep learning'" \ | ||||
|         "Generate embeddings for multiple texts" || true | ||||
| } | ||||
|  | ||||
| # Data Storage Operations | ||||
| # Issues LANCE STORE for text documents, a product, an image and a combined | ||||
| # text + image item. Every call tolerates failure (|| true). | ||||
| demo_data_storage() { | ||||
|     # Base64 payload shared by both image examples (described below as a 1x1 pixel PNG). | ||||
|     local pixel_png='iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg==' | ||||
|  | ||||
|     printf '%s\n' \ | ||||
|         "" \ | ||||
|         "==========================================" \ | ||||
|         "         DATA STORAGE DEMO" \ | ||||
|         "==========================================" | ||||
|  | ||||
|     log_warning "Storage operations will fail without embedding service, but show command structure." | ||||
|  | ||||
|     # Text documents | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE STORE documents TEXT 'Introduction to machine learning algorithms and their applications in modern AI systems' category 'education' author 'John Doe' difficulty 'beginner'" \ | ||||
|         "Store a document with text and metadata" || true | ||||
|  | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE STORE documents TEXT 'Deep learning neural networks for computer vision tasks' category 'research' author 'Jane Smith' difficulty 'advanced'" \ | ||||
|         "Store another document" || true | ||||
|  | ||||
|     # Product information | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE STORE products TEXT 'High-performance laptop with 16GB RAM and SSD storage' category 'electronics' price '1299.99' available 'true'" \ | ||||
|         "Store product with text description" || true | ||||
|  | ||||
|     # Image with metadata | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE STORE images IMAGE '$pixel_png' filename 'sample.png' tags 'test,demo' width '1' height '1'" \ | ||||
|         "Store image with metadata (1x1 pixel PNG)" || true | ||||
|  | ||||
|     # Text and image stored together as one item | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE STORE images TEXT 'Beautiful sunset over mountains' IMAGE '$pixel_png' filename 'sunset.png' tags 'nature,landscape' location 'California'" \ | ||||
|         "Store multimodal content (text + image)" || true | ||||
| } | ||||
|  | ||||
| # Search demo: runs LANCE SEARCH with a raw vector (with and without the | ||||
| # NPROBES / REFINE options) and LANCE SEARCH.TEXT against the documents, | ||||
| # products and images datasets. Every call is allowed to fail (`|| true`). | ||||
| demo_search_operations() { | ||||
|     local banner_rule="==========================================" | ||||
|     # Query vector shared by the two raw-vector searches | ||||
|     local query_vec='0.1,0.2,0.3,0.4,0.5' | ||||
|     local text_search="$REDIS_CLI LANCE SEARCH.TEXT" | ||||
|  | ||||
|     printf '\n' | ||||
|     printf '%s\n' "$banner_rule" "         SEARCH OPERATIONS DEMO" "$banner_rule" | ||||
|  | ||||
|     log_warning "Search operations will fail without data, but show command structure." | ||||
|  | ||||
|     # Raw vector, default options | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE SEARCH documents VECTOR '$query_vec' K 5" \ | ||||
|         "Search with raw vector (5 results)" || true | ||||
|  | ||||
|     # Raw vector with NPROBES and REFINE | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE SEARCH documents VECTOR '$query_vec' K 10 NPROBES 20 REFINE 2" \ | ||||
|         "Search with vector and advanced parameters" || true | ||||
|  | ||||
|     # Text queries | ||||
|     execute_command \ | ||||
|         "$text_search documents 'machine learning algorithms' K 5" \ | ||||
|         "Search using text query" || true | ||||
|     execute_command \ | ||||
|         "$text_search products 'laptop computer' K 3 NPROBES 10" \ | ||||
|         "Search products using text with parameters" || true | ||||
|     execute_command \ | ||||
|         "$text_search images 'sunset landscape' K 5" \ | ||||
|         "Search images using text description" || true | ||||
| } | ||||
|  | ||||
| # Index management demo: requests an IVF_PQ index on the documents, | ||||
| # products and images datasets (the last two with explicit PARTITIONS and | ||||
| # SUBVECTORS values). None of the calls is guarded with `|| true`. | ||||
| demo_index_management() { | ||||
|     local banner_rule="==========================================" | ||||
|     local create_index="$REDIS_CLI LANCE CREATE.INDEX" | ||||
|  | ||||
|     printf '\n' | ||||
|     printf '%s\n' "$banner_rule" "         INDEX MANAGEMENT DEMO" "$banner_rule" | ||||
|  | ||||
|     # Default parameters | ||||
|     execute_command "$create_index documents IVF_PQ" \ | ||||
|         "Create default IVF_PQ index for documents" | ||||
|  | ||||
|     # Explicit partition / sub-vector counts | ||||
|     execute_command "$create_index products IVF_PQ PARTITIONS 512 SUBVECTORS 32" \ | ||||
|         "Create IVF_PQ index with custom parameters for products" | ||||
|     execute_command "$create_index images IVF_PQ PARTITIONS 256 SUBVECTORS 16" \ | ||||
|         "Create IVF_PQ index for images dataset" | ||||
|  | ||||
|     log_success "Indexes created successfully" | ||||
| } | ||||
|  | ||||
| # Advanced usage demo: creates a 1536-dimension semantic_search dataset | ||||
| # with a metadata schema, prints a pseudo-code batch example, lists all | ||||
| # datasets and then requests LANCE INFO for each known dataset. | ||||
| demo_advanced_usage() { | ||||
|     local banner_rule="==========================================" | ||||
|  | ||||
|     printf '\n' | ||||
|     printf '%s\n' "$banner_rule" "         ADVANCED USAGE EXAMPLES" "$banner_rule" | ||||
|  | ||||
|     # Dataset with rich metadata columns | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE CREATE semantic_search DIM 1536 SCHEMA title:string content:string url:string timestamp:string source:string" \ | ||||
|         "Create dataset for semantic search with rich metadata" | ||||
|  | ||||
|     # Printed only; the placeholders are shown literally, not expanded | ||||
|     log_info "Batch operations example (would store multiple items):" | ||||
|     printf '%s\n' \ | ||||
|         '  for doc in documents:' \ | ||||
|         '    LANCE STORE semantic_search TEXT "$doc_content" title "$title" url "$url"' | ||||
|  | ||||
|     log_info "Monitoring and maintenance commands:" | ||||
|     execute_command "$REDIS_CLI LANCE LIST" "List all datasets for monitoring" | ||||
|  | ||||
|     # Per-dataset statistics; a failing INFO does not stop the loop | ||||
|     for dataset in documents products images semantic_search; do | ||||
|         execute_command "$REDIS_CLI LANCE INFO $dataset" "Get statistics for $dataset" || true | ||||
|     done | ||||
| } | ||||
|  | ||||
| # Cleanup demo: drops the semantic_search dataset, lists what is left and | ||||
| # then asks interactively whether the documents, products and images | ||||
| # datasets should be dropped as well (a single "y" or "Y" confirms). | ||||
| demo_cleanup() { | ||||
|     local banner_rule="==========================================" | ||||
|     local target | ||||
|  | ||||
|     printf '\n' | ||||
|     printf '%s\n' "$banner_rule" "         CLEANUP OPERATIONS DEMO" "$banner_rule" | ||||
|  | ||||
|     log_info "Demonstrating cleanup operations..." | ||||
|  | ||||
|     execute_command "$REDIS_CLI LANCE DROP semantic_search" "Drop semantic_search dataset" | ||||
|     execute_command "$REDIS_CLI LANCE LIST" "List remaining datasets" | ||||
|  | ||||
|     # Single-keystroke confirmation; the answer lands in $REPLY | ||||
|     printf '\n' | ||||
|     read -p "Do you want to clean up all test datasets? (y/N): " -n 1 -r | ||||
|     printf '\n' | ||||
|  | ||||
|     case "$REPLY" in | ||||
|         [Yy]) | ||||
|             for target in documents products images; do | ||||
|                 execute_command "$REDIS_CLI LANCE DROP $target" "Drop $target dataset" | ||||
|             done | ||||
|             execute_command "$REDIS_CLI LANCE LIST" "Verify all datasets are cleaned up" | ||||
|             log_success "All test datasets cleaned up" | ||||
|             ;; | ||||
|         *) | ||||
|             log_info "Keeping test datasets for further experimentation" | ||||
|             ;; | ||||
|     esac | ||||
| } | ||||
|  | ||||
| # Error Handling Demo | ||||
| demo_error_handling() { | ||||
|     echo | ||||
|     echo "==========================================" | ||||
|     echo "         ERROR HANDLING DEMO" | ||||
|     echo "==========================================" | ||||
|      | ||||
|     log_info "Demonstrating various error conditions..." | ||||
|      | ||||
|     # Try to access non-existent dataset | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE INFO nonexistent_dataset" \ | ||||
|         "Try to get info for non-existent dataset" || true | ||||
|      | ||||
|     # Try to search non-existent dataset | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE SEARCH nonexistent_dataset VECTOR '0.1,0.2' K 5" \ | ||||
|         "Try to search non-existent dataset" || true | ||||
|      | ||||
|     # Try to drop non-existent dataset | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE DROP nonexistent_dataset" \ | ||||
|         "Try to drop non-existent dataset" || true | ||||
|      | ||||
|     # Try invalid vector format | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE SEARCH documents VECTOR 'invalid,vector,format' K 5" \ | ||||
|         "Try search with invalid vector format" || true | ||||
|      | ||||
|     log_info "Error handling demonstration complete" | ||||
| } | ||||
|  | ||||
| # Performance testing demo: creates a 128-dimension perf_test dataset, | ||||
| # prints an outline of what a benchmark would do together with example | ||||
| # commands (printed only, not executed), then drops the dataset again. | ||||
| demo_performance_testing() { | ||||
|     local banner_rule="==========================================" | ||||
|  | ||||
|     printf '\n' | ||||
|     printf '%s\n' "$banner_rule" "         PERFORMANCE TESTING DEMO" "$banner_rule" | ||||
|  | ||||
|     log_info "Creating performance test dataset..." | ||||
|     execute_command \ | ||||
|         "$REDIS_CLI LANCE CREATE perf_test DIM 128 SCHEMA batch_id:string item_id:string" \ | ||||
|         "Create performance test dataset" | ||||
|  | ||||
|     log_info "Performance testing would involve:" | ||||
|     printf '%s\n' \ | ||||
|         "  1. Bulk loading thousands of vectors" \ | ||||
|         "  2. Creating indexes with different parameters" \ | ||||
|         "  3. Measuring search latency with various K values" \ | ||||
|         "  4. Testing different NPROBES settings" \ | ||||
|         "  5. Monitoring memory usage" | ||||
|  | ||||
|     log_info "Example performance test commands:" | ||||
|     printf '%s\n' \ | ||||
|         "  # Test search speed with different parameters" \ | ||||
|         "  time redis-cli LANCE SEARCH.TEXT perf_test 'query' K 10" \ | ||||
|         "  time redis-cli LANCE SEARCH.TEXT perf_test 'query' K 10 NPROBES 50" \ | ||||
|         "  time redis-cli LANCE SEARCH.TEXT perf_test 'query' K 100 NPROBES 100" | ||||
|  | ||||
|     # Remove the temporary dataset | ||||
|     execute_command "$REDIS_CLI LANCE DROP perf_test" "Clean up performance test dataset" | ||||
| } | ||||
|  | ||||
| # Script entry point: prints the intro, runs the prerequisite check and | ||||
| # the embedding-service setup, executes every demo section in order, | ||||
| # finishes with the cleanup section and prints follow-up suggestions. | ||||
| main() { | ||||
|     local banner_rule="==========================================" | ||||
|     local step | ||||
|  | ||||
|     printf '%s\n' \ | ||||
|         "$banner_rule" \ | ||||
|         "    LANCE VECTOR DATABASE DEMO SCRIPT" \ | ||||
|         "$banner_rule" \ | ||||
|         "" \ | ||||
|         "This script demonstrates all Lance vector database operations." \ | ||||
|         "Note: Some operations will fail without a running embedding service." \ | ||||
|         "This is expected and demonstrates error handling." \ | ||||
|         "" | ||||
|  | ||||
|     # Prerequisites and setup first, then the demos, cleanup last | ||||
|     for step in \ | ||||
|         check_herodb \ | ||||
|         setup_embedding_service \ | ||||
|         demo_dataset_management \ | ||||
|         demo_embedding_operations \ | ||||
|         demo_data_storage \ | ||||
|         demo_search_operations \ | ||||
|         demo_index_management \ | ||||
|         demo_advanced_usage \ | ||||
|         demo_error_handling \ | ||||
|         demo_performance_testing \ | ||||
|         demo_cleanup | ||||
|     do | ||||
|         "$step" | ||||
|     done | ||||
|  | ||||
|     printf '%s\n' "" "$banner_rule" "         DEMO COMPLETE" "$banner_rule" "" | ||||
|     log_success "Lance vector database demo completed successfully!" | ||||
|     printf '%s\n' \ | ||||
|         "" \ | ||||
|         "Next steps:" \ | ||||
|         "1. Set up a real embedding service (OpenAI, Hugging Face, etc.)" \ | ||||
|         "2. Update the embedding service URL configuration" \ | ||||
|         "3. Try storing and searching real data" \ | ||||
|         "4. Experiment with different vector dimensions and index parameters" \ | ||||
|         "5. Build your AI-powered application!" \ | ||||
|         "" \ | ||||
|         "For more information, see docs/lance_vector_db.md" | ||||
| } | ||||
|  | ||||
| # Run the demo | ||||
| main "$@" | ||||
							
								
								
									
										528
									
								
								src/cmd.rs
									
									
									
									
									
								
							
							
						
						
									
										528
									
								
								src/cmd.rs
									
									
									
									
									
								
							| @@ -1,6 +1,8 @@ | ||||
| use crate::{error::DBError, protocol::Protocol, server::Server}; | ||||
| use tokio::time::{timeout, Duration}; | ||||
| use futures::future::select_all; | ||||
| use std::sync::Arc; | ||||
| use base64::Engine; | ||||
|  | ||||
| #[derive(Debug, Clone)] | ||||
| pub enum Cmd { | ||||
| @@ -84,6 +86,49 @@ pub enum Cmd { | ||||
|     AgeSignName(String, String),           // name, message | ||||
|     AgeVerifyName(String, String, String), // name, message, signature_b64 | ||||
|     AgeList, | ||||
|  | ||||
|     // Lance vector database commands | ||||
|     LanceCreate { | ||||
|         dataset: String, | ||||
|         dim: usize, | ||||
|         schema: Vec<(String, String)>, // field_name, field_type pairs | ||||
|     }, | ||||
|     LanceStore { | ||||
|         dataset: String, | ||||
|         text: Option<String>, | ||||
|         image_base64: Option<String>, | ||||
|         metadata: std::collections::HashMap<String, String>, | ||||
|     }, | ||||
|     LanceSearch { | ||||
|         dataset: String, | ||||
|         vector: Vec<f32>, | ||||
|         k: usize, | ||||
|         nprobes: Option<usize>, | ||||
|         refine_factor: Option<usize>, | ||||
|     }, | ||||
|     LanceSearchText { | ||||
|         dataset: String, | ||||
|         query_text: String, | ||||
|         k: usize, | ||||
|         nprobes: Option<usize>, | ||||
|         refine_factor: Option<usize>, | ||||
|     }, | ||||
|     LanceEmbedText { | ||||
|         texts: Vec<String>, | ||||
|     }, | ||||
|     LanceCreateIndex { | ||||
|         dataset: String, | ||||
|         index_type: String, | ||||
|         num_partitions: Option<usize>, | ||||
|         num_sub_vectors: Option<usize>, | ||||
|     }, | ||||
|     LanceList, | ||||
|     LanceDrop { | ||||
|         dataset: String, | ||||
|     }, | ||||
|     LanceInfo { | ||||
|         dataset: String, | ||||
|     }, | ||||
| } | ||||
|  | ||||
| impl Cmd { | ||||
| @@ -616,6 +661,237 @@ impl Cmd { | ||||
|                                 _ => return Err(DBError(format!("unsupported AGE subcommand {:?}", cmd))), | ||||
|                             } | ||||
|                         } | ||||
|                         "lance" => { | ||||
|                             if cmd.len() < 2 { | ||||
|                                 return Err(DBError("wrong number of arguments for LANCE".to_string())); | ||||
|                             } | ||||
|                             match cmd[1].to_lowercase().as_str() { | ||||
|                                 "create" => { | ||||
|                                     // LANCE CREATE <dataset> DIM <dimension> [SCHEMA field:type ...] | ||||
|                                     if cmd.len() < 4 { | ||||
|                                         return Err(DBError("LANCE CREATE <dataset> DIM <dimension> [SCHEMA field:type ...]".to_string())); | ||||
|                                     } | ||||
|                                     let dataset = cmd[2].clone(); | ||||
|  | ||||
|                                     // The DIM keyword is mandatory and must be followed by an unsigned integer. | ||||
|                                     if cmd[3].to_lowercase() != "dim" { | ||||
|                                         return Err(DBError("Expected DIM after dataset name".to_string())); | ||||
|                                     } | ||||
|                                     let dim = cmd | ||||
|                                         .get(4) | ||||
|                                         .ok_or_else(|| DBError("Missing dimension value".to_string()))? | ||||
|                                         .parse::<usize>() | ||||
|                                         .map_err(|_| DBError("Invalid dimension value".to_string()))?; | ||||
|  | ||||
|                                     // Anything after the dimension has to be the SCHEMA clause. Previously a | ||||
|                                     // token that was not SCHEMA (e.g. a misspelt keyword) was silently dropped | ||||
|                                     // together with everything after it; it is now rejected, matching how the | ||||
|                                     // other LANCE subcommands treat unknown parameters. | ||||
|                                     let mut schema = Vec::new(); | ||||
|                                     if let Some(keyword) = cmd.get(5) { | ||||
|                                         if keyword.to_lowercase() != "schema" { | ||||
|                                             return Err(DBError(format!("Unknown parameter: {}", keyword))); | ||||
|                                         } | ||||
|                                         for field_spec in &cmd[6..] { | ||||
|                                             // Exactly one ':' separates the field name from its type. | ||||
|                                             match field_spec.split_once(':') { | ||||
|                                                 Some((name, ty)) if !ty.contains(':') => { | ||||
|                                                     schema.push((name.to_string(), ty.to_string())); | ||||
|                                                 } | ||||
|                                                 _ => return Err(DBError("Schema fields must be in format field:type".to_string())), | ||||
|                                             } | ||||
|                                         } | ||||
|                                     } | ||||
|  | ||||
|                                     Cmd::LanceCreate { dataset, dim, schema } | ||||
|                                 } | ||||
|                                 "store" => { | ||||
|                                     // LANCE STORE <dataset> [TEXT <text>] [IMAGE <base64>] [key value ...] | ||||
|                                     if cmd.len() < 3 { | ||||
|                                         return Err(DBError("LANCE STORE <dataset> [TEXT <text>] [IMAGE <base64>] [metadata...]".to_string())); | ||||
|                                     } | ||||
|                                     let dataset = cmd[2].clone(); | ||||
|                                     let mut text = None; | ||||
|                                     let mut image_base64 = None; | ||||
|                                     let mut metadata = std::collections::HashMap::new(); | ||||
|  | ||||
|                                     // The remaining arguments are consumed two at a time: a keyword (TEXT / | ||||
|                                     // IMAGE, case-insensitive) or a metadata key, followed by its value. | ||||
|                                     for pair in cmd[3..].chunks(2) { | ||||
|                                         let keyword = pair[0].to_lowercase(); | ||||
|                                         match (keyword.as_str(), pair.get(1)) { | ||||
|                                             ("text", Some(value)) => text = Some(value.clone()), | ||||
|                                             ("text", None) => return Err(DBError("TEXT requires a value".to_string())), | ||||
|                                             ("image", Some(value)) => image_base64 = Some(value.clone()), | ||||
|                                             ("image", None) => return Err(DBError("IMAGE requires a base64 value".to_string())), | ||||
|                                             (_, Some(value)) => { | ||||
|                                                 // Metadata keys keep their original casing. | ||||
|                                                 metadata.insert(pair[0].clone(), value.clone()); | ||||
|                                             } | ||||
|                                             (_, None) => return Err(DBError("Metadata requires key value pairs".to_string())), | ||||
|                                         } | ||||
|                                     } | ||||
|  | ||||
|                                     Cmd::LanceStore { dataset, text, image_base64, metadata } | ||||
|                                 } | ||||
|                                 "search" => { | ||||
|                                     // LANCE SEARCH <dataset> VECTOR <vector> K <k> [NPROBES <n>] [REFINE <r>] | ||||
|                                     if cmd.len() < 5 { | ||||
|                                         return Err(DBError("LANCE SEARCH <dataset> VECTOR <vector> K <k> [NPROBES <n>] [REFINE <r>]".to_string())); | ||||
|                                     } | ||||
|                                     let dataset = cmd[2].clone(); | ||||
|  | ||||
|                                     if cmd[3].to_lowercase() != "vector" { | ||||
|                                         return Err(DBError("Expected VECTOR after dataset name".to_string())); | ||||
|                                     } | ||||
|  | ||||
|                                     // The vector is a comma-separated list of floats, optionally wrapped in [ ]. | ||||
|                                     let raw_vector = cmd[4].trim_start_matches('[').trim_end_matches(']'); | ||||
|                                     let mut vector = Vec::new(); | ||||
|                                     for component in raw_vector.split(',') { | ||||
|                                         let value = component | ||||
|                                             .trim() | ||||
|                                             .parse::<f32>() | ||||
|                                             .map_err(|_| DBError("Invalid vector format".to_string()))?; | ||||
|                                         vector.push(value); | ||||
|                                     } | ||||
|  | ||||
|                                     // K and its value are mandatory and directly follow the vector. | ||||
|                                     let has_k = cmd.len() >= 7 && cmd[5].to_lowercase() == "k"; | ||||
|                                     if !has_k { | ||||
|                                         return Err(DBError("Expected K after vector".to_string())); | ||||
|                                     } | ||||
|                                     let k = cmd[6].parse::<usize>().map_err(|_| DBError("Invalid K value".to_string()))?; | ||||
|  | ||||
|                                     // Optional KEYWORD <value> pairs; anything else is rejected. | ||||
|                                     let mut nprobes = None; | ||||
|                                     let mut refine_factor = None; | ||||
|                                     for pair in cmd[7..].chunks(2) { | ||||
|                                         let keyword = pair[0].to_lowercase(); | ||||
|                                         match (keyword.as_str(), pair.get(1)) { | ||||
|                                             ("nprobes", Some(raw)) => { | ||||
|                                                 nprobes = Some(raw.parse::<usize>().map_err(|_| DBError("Invalid NPROBES value".to_string()))?); | ||||
|                                             } | ||||
|                                             ("nprobes", None) => return Err(DBError("NPROBES requires a value".to_string())), | ||||
|                                             ("refine", Some(raw)) => { | ||||
|                                                 refine_factor = Some(raw.parse::<usize>().map_err(|_| DBError("Invalid REFINE value".to_string()))?); | ||||
|                                             } | ||||
|                                             ("refine", None) => return Err(DBError("REFINE requires a value".to_string())), | ||||
|                                             _ => return Err(DBError(format!("Unknown parameter: {}", pair[0]))), | ||||
|                                         } | ||||
|                                     } | ||||
|  | ||||
|                                     Cmd::LanceSearch { dataset, vector, k, nprobes, refine_factor } | ||||
|                                 } | ||||
|                                 "search.text" => { | ||||
|                                     // LANCE SEARCH.TEXT <dataset> <query_text> K <k> [NPROBES <n>] [REFINE <r>] | ||||
|                                     if cmd.len() < 6 { | ||||
|                                         return Err(DBError("LANCE SEARCH.TEXT <dataset> <query_text> K <k> [NPROBES <n>] [REFINE <r>]".to_string())); | ||||
|                                     } | ||||
|                                     let dataset = cmd[2].clone(); | ||||
|                                     let query_text = cmd[3].clone(); | ||||
|  | ||||
|                                     // K and its value are mandatory and directly follow the query text. | ||||
|                                     if cmd[4].to_lowercase() != "k" { | ||||
|                                         return Err(DBError("Expected K after query text".to_string())); | ||||
|                                     } | ||||
|                                     let k = cmd[5].parse::<usize>().map_err(|_| DBError("Invalid K value".to_string()))?; | ||||
|  | ||||
|                                     // Optional KEYWORD <value> pairs; anything else is rejected. | ||||
|                                     let mut nprobes = None; | ||||
|                                     let mut refine_factor = None; | ||||
|                                     for pair in cmd[6..].chunks(2) { | ||||
|                                         let keyword = pair[0].to_lowercase(); | ||||
|                                         match (keyword.as_str(), pair.get(1)) { | ||||
|                                             ("nprobes", Some(raw)) => { | ||||
|                                                 nprobes = Some(raw.parse::<usize>().map_err(|_| DBError("Invalid NPROBES value".to_string()))?); | ||||
|                                             } | ||||
|                                             ("nprobes", None) => return Err(DBError("NPROBES requires a value".to_string())), | ||||
|                                             ("refine", Some(raw)) => { | ||||
|                                                 refine_factor = Some(raw.parse::<usize>().map_err(|_| DBError("Invalid REFINE value".to_string()))?); | ||||
|                                             } | ||||
|                                             ("refine", None) => return Err(DBError("REFINE requires a value".to_string())), | ||||
|                                             _ => return Err(DBError(format!("Unknown parameter: {}", pair[0]))), | ||||
|                                         } | ||||
|                                     } | ||||
|  | ||||
|                                     Cmd::LanceSearchText { dataset, query_text, k, nprobes, refine_factor } | ||||
|                                 } | ||||
|                                 "embed.text" => { | ||||
|                                     // LANCE EMBED.TEXT <text1> [text2] ... (at least one text is required) | ||||
|                                     if cmd.len() <= 2 { | ||||
|                                         return Err(DBError("LANCE EMBED.TEXT <text1> [text2] ...".to_string())); | ||||
|                                     } | ||||
|                                     // Every argument after the subcommand is one text to embed. | ||||
|                                     Cmd::LanceEmbedText { texts: cmd.iter().skip(2).cloned().collect() } | ||||
|                                 } | ||||
|                                 "create.index" => { | ||||
|                                     // LANCE CREATE.INDEX <dataset> <index_type> [PARTITIONS <n>] [SUBVECTORS <n>] | ||||
|                                     // | ||||
|                                     // Only the dataset and the index type are mandatory, i.e. four tokens | ||||
|                                     // including "LANCE CREATE.INDEX". The check used to require five, which | ||||
|                                     // rejected the documented minimal form (no PARTITIONS / SUBVECTORS). | ||||
|                                     if cmd.len() < 4 { | ||||
|                                         return Err(DBError("LANCE CREATE.INDEX <dataset> <index_type> [PARTITIONS <n>] [SUBVECTORS <n>]".to_string())); | ||||
|                                     } | ||||
|                                     let dataset = cmd[2].clone(); | ||||
|                                     let index_type = cmd[3].clone(); | ||||
|  | ||||
|                                     // Optional tuning parameters are KEYWORD <value> pairs in any order; | ||||
|                                     // both stay None when they are not supplied. | ||||
|                                     let mut num_partitions = None; | ||||
|                                     let mut num_sub_vectors = None; | ||||
|                                     let mut i = 4; | ||||
|                                     while i < cmd.len() { | ||||
|                                         match cmd[i].to_lowercase().as_str() { | ||||
|                                             "partitions" => { | ||||
|                                                 if i + 1 >= cmd.len() { | ||||
|                                                     return Err(DBError("PARTITIONS requires a value".to_string())); | ||||
|                                                 } | ||||
|                                                 num_partitions = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid PARTITIONS value".to_string()))?); | ||||
|                                                 i += 2; | ||||
|                                             } | ||||
|                                             "subvectors" => { | ||||
|                                                 if i + 1 >= cmd.len() { | ||||
|                                                     return Err(DBError("SUBVECTORS requires a value".to_string())); | ||||
|                                                 } | ||||
|                                                 num_sub_vectors = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid SUBVECTORS value".to_string()))?); | ||||
|                                                 i += 2; | ||||
|                                             } | ||||
|                                             _ => { | ||||
|                                                 return Err(DBError(format!("Unknown parameter: {}", cmd[i]))); | ||||
|                                             } | ||||
|                                         } | ||||
|                                     } | ||||
|  | ||||
|                                     Cmd::LanceCreateIndex { dataset, index_type, num_partitions, num_sub_vectors } | ||||
|                                 } | ||||
|                                 "list" => { | ||||
|                                     // LANCE LIST accepts nothing after the subcommand. | ||||
|                                     match cmd.len() { | ||||
|                                         2 => Cmd::LanceList, | ||||
|                                         _ => return Err(DBError("LANCE LIST takes no arguments".to_string())), | ||||
|                                     } | ||||
|                                 } | ||||
|                                 "drop" => { | ||||
|                                     // LANCE DROP <dataset>: exactly one operand after the subcommand. | ||||
|                                     match &cmd[2..] { | ||||
|                                         [dataset] => Cmd::LanceDrop { dataset: dataset.clone() }, | ||||
|                                         _ => return Err(DBError("LANCE DROP <dataset>".to_string())), | ||||
|                                     } | ||||
|                                 } | ||||
|                                 "info" => { | ||||
|                                     // LANCE INFO <dataset>: exactly one operand after the subcommand. | ||||
|                                     match &cmd[2..] { | ||||
|                                         [dataset] => Cmd::LanceInfo { dataset: dataset.clone() }, | ||||
|                                         _ => return Err(DBError("LANCE INFO <dataset>".to_string())), | ||||
|                                     } | ||||
|                                 } | ||||
|                                 _ => return Err(DBError(format!("unsupported LANCE subcommand {:?}", cmd))), | ||||
|                             } | ||||
|                         } | ||||
|                         _ => Cmd::Unknow(cmd[0].clone()), | ||||
|                     }, | ||||
|                     protocol, | ||||
| @@ -730,6 +1006,18 @@ impl Cmd { | ||||
|             Cmd::AgeSignName(name, message) => Ok(crate::age::cmd_age_sign_name(server, &name, &message).await), | ||||
|             Cmd::AgeVerifyName(name, message, sig_b64) => Ok(crate::age::cmd_age_verify_name(server, &name, &message, &sig_b64).await), | ||||
|             Cmd::AgeList => Ok(crate::age::cmd_age_list(server).await), | ||||
|              | ||||
|             // Lance vector database commands | ||||
|             Cmd::LanceCreate { dataset, dim, schema } => lance_create_cmd(server, &dataset, dim, &schema).await, | ||||
|             Cmd::LanceStore { dataset, text, image_base64, metadata } => lance_store_cmd(server, &dataset, text.as_deref(), image_base64.as_deref(), &metadata).await, | ||||
|             Cmd::LanceSearch { dataset, vector, k, nprobes, refine_factor } => lance_search_cmd(server, &dataset, &vector, k, nprobes, refine_factor).await, | ||||
|             Cmd::LanceSearchText { dataset, query_text, k, nprobes, refine_factor } => lance_search_text_cmd(server, &dataset, &query_text, k, nprobes, refine_factor).await, | ||||
|             Cmd::LanceEmbedText { texts } => lance_embed_text_cmd(server, &texts).await, | ||||
|             Cmd::LanceCreateIndex { dataset, index_type, num_partitions, num_sub_vectors } => lance_create_index_cmd(server, &dataset, &index_type, num_partitions, num_sub_vectors).await, | ||||
|             Cmd::LanceList => lance_list_cmd(server).await, | ||||
|             Cmd::LanceDrop { dataset } => lance_drop_cmd(server, &dataset).await, | ||||
|             Cmd::LanceInfo { dataset } => lance_info_cmd(server, &dataset).await, | ||||
|              | ||||
|             Cmd::Unknow(s) => Ok(Protocol::err(&format!("ERR unknown command `{}`", s))), | ||||
|         } | ||||
|     } | ||||
| @@ -1513,3 +1801,243 @@ fn command_cmd(args: &[String]) -> Result<Protocol, DBError> { | ||||
|         _ => Ok(Protocol::Array(vec![])), | ||||
|     } | ||||
| } | ||||
|  | ||||
| // Helper function to create Arrow schema from field specifications | ||||
| fn create_schema_from_fields(dim: usize, fields: &[(String, String)]) -> arrow::datatypes::Schema { | ||||
|     let mut schema_fields = Vec::new(); | ||||
|      | ||||
|     // Always add the vector field first | ||||
|     let vector_field = arrow::datatypes::Field::new( | ||||
|         "vector", | ||||
|         arrow::datatypes::DataType::FixedSizeList( | ||||
|             Arc::new(arrow::datatypes::Field::new("item", arrow::datatypes::DataType::Float32, true)), | ||||
|             dim as i32 | ||||
|         ), | ||||
|         false | ||||
|     ); | ||||
|     schema_fields.push(vector_field); | ||||
|      | ||||
|     // Add custom fields | ||||
|     for (name, field_type) in fields { | ||||
|         let data_type = match field_type.to_lowercase().as_str() { | ||||
|             "string" | "text" => arrow::datatypes::DataType::Utf8, | ||||
|             "int" | "integer" => arrow::datatypes::DataType::Int64, | ||||
|             "float" => arrow::datatypes::DataType::Float64, | ||||
|             "bool" | "boolean" => arrow::datatypes::DataType::Boolean, | ||||
|             _ => arrow::datatypes::DataType::Utf8, // Default to string | ||||
|         }; | ||||
|         schema_fields.push(arrow::datatypes::Field::new(name, data_type, true)); | ||||
|     } | ||||
|      | ||||
|     arrow::datatypes::Schema::new(schema_fields) | ||||
| } | ||||
|  | ||||
| // Lance vector database command implementations | ||||
| async fn lance_create_cmd( | ||||
|     server: &Server, | ||||
|     dataset: &str, | ||||
|     dim: usize, | ||||
|     schema: &[(String, String)], | ||||
| ) -> Result<Protocol, DBError> { | ||||
|     match server.lance_store() { | ||||
|         Ok(lance_store) => { | ||||
|             match lance_store.create_dataset(dataset, create_schema_from_fields(dim, schema)).await { | ||||
|                 Ok(_) => Ok(Protocol::SimpleString("OK".to_string())), | ||||
|                 Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))), | ||||
|             } | ||||
|         } | ||||
|         Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))), | ||||
|     } | ||||
| } | ||||
|  | ||||
| async fn lance_store_cmd( | ||||
|     server: &Server, | ||||
|     dataset: &str, | ||||
|     text: Option<&str>, | ||||
|     image_base64: Option<&str>, | ||||
|     metadata: &std::collections::HashMap<String, String>, | ||||
| ) -> Result<Protocol, DBError> { | ||||
|     match server.lance_store() { | ||||
|         Ok(lance_store) => { | ||||
|             match lance_store.store_multimodal(server, dataset, text.map(|s| s.to_string()), | ||||
|                 image_base64.and_then(|s| base64::engine::general_purpose::STANDARD.decode(s).ok()), | ||||
|                 metadata.clone()).await { | ||||
|                 Ok(id) => Ok(Protocol::BulkString(id)), | ||||
|                 Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))), | ||||
|             } | ||||
|         } | ||||
|         Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))), | ||||
|     } | ||||
| } | ||||
|  | ||||
| async fn lance_search_cmd( | ||||
|     server: &Server, | ||||
|     dataset: &str, | ||||
|     vector: &[f32], | ||||
|     k: usize, | ||||
|     nprobes: Option<usize>, | ||||
|     refine_factor: Option<usize>, | ||||
| ) -> Result<Protocol, DBError> { | ||||
|     match server.lance_store() { | ||||
|         Ok(lance_store) => { | ||||
|             match lance_store.search_vectors(dataset, vector.to_vec(), k, nprobes, refine_factor).await { | ||||
|                 Ok(results) => { | ||||
|                     let mut response = Vec::new(); | ||||
|                     for (distance, metadata) in results { | ||||
|                         let mut item = Vec::new(); | ||||
|                         item.push(Protocol::BulkString("distance".to_string())); | ||||
|                         item.push(Protocol::BulkString(distance.to_string())); | ||||
|                         for (key, value) in metadata { | ||||
|                             item.push(Protocol::BulkString(key)); | ||||
|                             item.push(Protocol::BulkString(value)); | ||||
|                         } | ||||
|                         response.push(Protocol::Array(item)); | ||||
|                     } | ||||
|                     Ok(Protocol::Array(response)) | ||||
|                 } | ||||
|                 Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))), | ||||
|             } | ||||
|         } | ||||
|         Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))), | ||||
|     } | ||||
| } | ||||
|  | ||||
| async fn lance_search_text_cmd( | ||||
|     server: &Server, | ||||
|     dataset: &str, | ||||
|     query_text: &str, | ||||
|     k: usize, | ||||
|     nprobes: Option<usize>, | ||||
|     refine_factor: Option<usize>, | ||||
| ) -> Result<Protocol, DBError> { | ||||
|     match server.lance_store() { | ||||
|         Ok(lance_store) => { | ||||
|             match lance_store.search_with_text(server, dataset, query_text.to_string(), k, nprobes, refine_factor).await { | ||||
|                 Ok(results) => { | ||||
|                     let mut response = Vec::new(); | ||||
|                     for (distance, metadata) in results { | ||||
|                         let mut item = Vec::new(); | ||||
|                         item.push(Protocol::BulkString("distance".to_string())); | ||||
|                         item.push(Protocol::BulkString(distance.to_string())); | ||||
|                         for (key, value) in metadata { | ||||
|                             item.push(Protocol::BulkString(key)); | ||||
|                             item.push(Protocol::BulkString(value)); | ||||
|                         } | ||||
|                         response.push(Protocol::Array(item)); | ||||
|                     } | ||||
|                     Ok(Protocol::Array(response)) | ||||
|                 } | ||||
|                 Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))), | ||||
|             } | ||||
|         } | ||||
|         Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))), | ||||
|     } | ||||
| } | ||||
|  | ||||
/// Makes an error message safe to send as a single-line protocol error.
///
/// Newlines, carriage returns and tabs are each replaced by a space. If the
/// result is longer than 200 bytes it is cut to at most 197 bytes and `...`
/// is appended. The cut is moved back to the nearest UTF-8 character boundary
/// so that multi-byte text can never cause a slicing panic.
fn sanitize_error_message(msg: &str) -> String {
    const MAX_LEN: usize = 200;
    const ELLIPSIS: &str = "...";

    let sanitized: String = msg
        .chars()
        .map(|c| if matches!(c, '\n' | '\r' | '\t') { ' ' } else { c })
        .collect();

    if sanitized.len() <= MAX_LEN {
        return sanitized;
    }

    // Byte index 0 is always a boundary, so this loop terminates.
    let mut cut = MAX_LEN - ELLIPSIS.len();
    while !sanitized.is_char_boundary(cut) {
        cut -= 1;
    }

    format!("{}{}", &sanitized[..cut], ELLIPSIS)
}
|  | ||||
| async fn lance_embed_text_cmd( | ||||
|     server: &Server, | ||||
|     texts: &[String], | ||||
| ) -> Result<Protocol, DBError> { | ||||
|     match server.lance_store() { | ||||
|         Ok(lance_store) => { | ||||
|             match lance_store.embed_text(server, texts.to_vec()).await { | ||||
|                 Ok(embeddings) => { | ||||
|                     let mut response = Vec::new(); | ||||
|                     for embedding in embeddings { | ||||
|                         let vector_strings: Vec<Protocol> = embedding | ||||
|                             .iter() | ||||
|                             .map(|f| Protocol::BulkString(f.to_string())) | ||||
|                             .collect(); | ||||
|                         response.push(Protocol::Array(vector_strings)); | ||||
|                     } | ||||
|                     Ok(Protocol::Array(response)) | ||||
|                 } | ||||
|                 Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))), | ||||
|             } | ||||
|         } | ||||
|         Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))), | ||||
|     } | ||||
| } | ||||
|  | ||||
| async fn lance_create_index_cmd( | ||||
|     server: &Server, | ||||
|     dataset: &str, | ||||
|     index_type: &str, | ||||
|     num_partitions: Option<usize>, | ||||
|     num_sub_vectors: Option<usize>, | ||||
| ) -> Result<Protocol, DBError> { | ||||
|     match server.lance_store() { | ||||
|         Ok(lance_store) => { | ||||
|             match lance_store.create_index(dataset, index_type, num_partitions, num_sub_vectors).await { | ||||
|                 Ok(_) => Ok(Protocol::SimpleString("OK".to_string())), | ||||
|                 Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))), | ||||
|             } | ||||
|         } | ||||
|         Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))), | ||||
|     } | ||||
| } | ||||
|  | ||||
| async fn lance_list_cmd(server: &Server) -> Result<Protocol, DBError> { | ||||
|     match server.lance_store() { | ||||
|         Ok(lance_store) => { | ||||
|             match lance_store.list_datasets().await { | ||||
|                 Ok(datasets) => { | ||||
|                     let response: Vec<Protocol> = datasets | ||||
|                         .into_iter() | ||||
|                         .map(Protocol::BulkString) | ||||
|                         .collect(); | ||||
|                     Ok(Protocol::Array(response)) | ||||
|                 } | ||||
|                 Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))), | ||||
|             } | ||||
|         } | ||||
|         Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))), | ||||
|     } | ||||
| } | ||||
|  | ||||
| async fn lance_drop_cmd(server: &Server, dataset: &str) -> Result<Protocol, DBError> { | ||||
|     match server.lance_store() { | ||||
|         Ok(lance_store) => { | ||||
|             match lance_store.drop_dataset(dataset).await { | ||||
|                 Ok(_) => Ok(Protocol::SimpleString("OK".to_string())), | ||||
|                 Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))), | ||||
|             } | ||||
|         } | ||||
|         Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))), | ||||
|     } | ||||
| } | ||||
|  | ||||
| async fn lance_info_cmd(server: &Server, dataset: &str) -> Result<Protocol, DBError> { | ||||
|     match server.lance_store() { | ||||
|         Ok(lance_store) => { | ||||
|             match lance_store.get_dataset_info(dataset).await { | ||||
|                 Ok(info) => { | ||||
|                     let mut response = Vec::new(); | ||||
|                     for (key, value) in info { | ||||
|                         response.push(Protocol::BulkString(key)); | ||||
|                         response.push(Protocol::BulkString(value)); | ||||
|                     } | ||||
|                     Ok(Protocol::Array(response)) | ||||
|                 } | ||||
|                 Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))), | ||||
|             } | ||||
|         } | ||||
|         Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))), | ||||
|     } | ||||
| } | ||||
|   | ||||
							
								
								
									
										43
									
								
								src/error.rs
									
									
									
									
									
								
							
							
						
						
									
										43
									
								
								src/error.rs
									
									
									
									
									
								
							| @@ -9,6 +9,12 @@ use bincode; | ||||
/// Error type wrapping a single human-readable message.
#[derive(Debug)]
pub struct DBError(pub String);

/// Displays the wrapped message verbatim.
impl std::fmt::Display for DBError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Lets `DBError` be used wherever a standard error is expected, for example
/// as `Box<dyn std::error::Error>` or as the source of another error.
impl std::error::Error for DBError {}
|  | ||||
| impl From<std::io::Error> for DBError { | ||||
|     fn from(item: std::io::Error) -> Self { | ||||
|         DBError(item.to_string().clone()) | ||||
| @@ -92,3 +98,40 @@ impl From<chacha20poly1305::Error> for DBError { | ||||
|         DBError(item.to_string()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| // Lance and related dependencies error handling | ||||
| impl From<lance::Error> for DBError { | ||||
|     fn from(item: lance::Error) -> Self { | ||||
|         DBError(item.to_string()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl From<arrow::error::ArrowError> for DBError { | ||||
|     fn from(item: arrow::error::ArrowError) -> Self { | ||||
|         DBError(item.to_string()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl From<reqwest::Error> for DBError { | ||||
|     fn from(item: reqwest::Error) -> Self { | ||||
|         DBError(item.to_string()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl From<image::ImageError> for DBError { | ||||
|     fn from(item: image::ImageError) -> Self { | ||||
|         DBError(item.to_string()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl From<uuid::Error> for DBError { | ||||
|     fn from(item: uuid::Error) -> Self { | ||||
|         DBError(item.to_string()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl From<base64::DecodeError> for DBError { | ||||
|     fn from(item: base64::DecodeError) -> Self { | ||||
|         DBError(item.to_string()) | ||||
|     } | ||||
| } | ||||
|   | ||||
							
								
								
									
										609
									
								
								src/lance_store.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										609
									
								
								src/lance_store.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,609 @@ | ||||
| use std::collections::HashMap; | ||||
| use std::path::PathBuf; | ||||
| use std::sync::Arc; | ||||
| use tokio::sync::RwLock; | ||||
|  | ||||
| use arrow::array::{Float32Array, StringArray, ArrayRef, FixedSizeListArray, Array}; | ||||
| use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; | ||||
| use arrow::record_batch::{RecordBatch, RecordBatchReader}; | ||||
| use arrow::error::ArrowError; | ||||
| use lance::dataset::{Dataset, WriteParams, WriteMode}; | ||||
| use lance::index::vector::VectorIndexParams; | ||||
| use lance_index::vector::pq::PQBuildParams; | ||||
| use lance_index::vector::ivf::IvfBuildParams; | ||||
| use lance_index::DatasetIndexExt; | ||||
| use lance_linalg::distance::MetricType; | ||||
| use futures::TryStreamExt; | ||||
| use base64::Engine; | ||||
|  | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use crate::error::DBError; | ||||
|  | ||||
| // Simple RecordBatchReader implementation for Vec<RecordBatch> | ||||
| struct VecRecordBatchReader { | ||||
|     batches: std::vec::IntoIter<Result<RecordBatch, ArrowError>>, | ||||
| } | ||||
|  | ||||
| impl VecRecordBatchReader { | ||||
|     fn new(batches: Vec<RecordBatch>) -> Self { | ||||
|         let result_batches = batches.into_iter().map(Ok).collect::<Vec<_>>(); | ||||
|         Self { | ||||
|             batches: result_batches.into_iter(), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Iterator for VecRecordBatchReader { | ||||
|     type Item = Result<RecordBatch, ArrowError>; | ||||
|  | ||||
|     fn next(&mut self) -> Option<Self::Item> { | ||||
|         self.batches.next() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl RecordBatchReader for VecRecordBatchReader { | ||||
|     fn schema(&self) -> SchemaRef { | ||||
|         // This is a simplified implementation - in practice you'd want to store the schema | ||||
|         Arc::new(Schema::empty()) | ||||
|     } | ||||
| } | ||||
|  | ||||
/// JSON body posted to a custom (non-Ollama) embedding service.
#[derive(Debug, Serialize, Deserialize)]
struct EmbeddingRequest {
    // Texts to embed, if any.
    texts: Option<Vec<String>>,
    // Images to embed, if any.
    images: Option<Vec<String>>, // base64 encoded
    // Model name; `None` leaves the choice to the service.
    model: Option<String>,
}
|  | ||||
/// JSON reply expected from a custom (non-Ollama) embedding service.
#[derive(Debug, Serialize, Deserialize)]
struct EmbeddingResponse {
    // One embedding vector per input.
    // NOTE(review): presumably in the same order as the request — confirm with the service contract.
    embeddings: Vec<Vec<f32>>,
    // Model name reported by the service.
    model: String,
    // Optional usage counters reported by the service.
    usage: Option<HashMap<String, u32>>,
}
|  | ||||
// Ollama-specific request/response structures

/// JSON body posted to Ollama's `/api/embeddings` endpoint; one request is
/// sent per text.
#[derive(Debug, Serialize, Deserialize)]
struct OllamaEmbeddingRequest {
    // Name of the Ollama model to use.
    model: String,
    // The text to embed.
    prompt: String,
}
|  | ||||
/// JSON reply parsed from Ollama's `/api/embeddings` endpoint.
#[derive(Debug, Serialize, Deserialize)]
struct OllamaEmbeddingResponse {
    // Embedding vector for the single prompt that was sent.
    embedding: Vec<f32>,
}
|  | ||||
/// Lance-backed vector store: manages datasets under one directory and talks
/// to an external embedding service over HTTP.
pub struct LanceStore {
    // Cache of opened datasets keyed by dataset name.
    datasets: Arc<RwLock<HashMap<String, Arc<Dataset>>>>,
    // Directory that holds one `<name>.lance` dataset per entry.
    data_dir: PathBuf,
    // Shared HTTP client used for embedding requests.
    http_client: reqwest::Client,
}
|  | ||||
| impl LanceStore { | ||||
|     pub async fn new(data_dir: PathBuf) -> Result<Self, DBError> { | ||||
|         // Create data directory if it doesn't exist | ||||
|         std::fs::create_dir_all(&data_dir) | ||||
|             .map_err(|e| DBError(format!("Failed to create Lance data directory: {}", e)))?; | ||||
|          | ||||
|         let http_client = reqwest::Client::builder() | ||||
|             .timeout(std::time::Duration::from_secs(30)) | ||||
|             .build() | ||||
|             .map_err(|e| DBError(format!("Failed to create HTTP client: {}", e)))?; | ||||
|          | ||||
|         Ok(Self { | ||||
|             datasets: Arc::new(RwLock::new(HashMap::new())), | ||||
|             data_dir, | ||||
|             http_client, | ||||
|         }) | ||||
|     } | ||||
|      | ||||
|     /// Get embedding service URL from Redis config, default to local Ollama | ||||
|     async fn get_embedding_url(&self, server: &crate::server::Server) -> Result<String, DBError> { | ||||
|         // Get the embedding URL from Redis config directly from storage | ||||
|         let storage = server.current_storage()?; | ||||
|         match storage.hget("config:core:aiembed", "url")? { | ||||
|             Some(url) => Ok(url), | ||||
|             None => Ok("http://localhost:11434".to_string()), // Default to local Ollama | ||||
|         } | ||||
|     } | ||||
|      | ||||
|     /// Check if we're using Ollama (default) or custom embedding service | ||||
|     async fn is_ollama_service(&self, server: &crate::server::Server) -> Result<bool, DBError> { | ||||
|         let url = self.get_embedding_url(server).await?; | ||||
|         Ok(url.contains("localhost:11434") || url.contains("127.0.0.1:11434")) | ||||
|     } | ||||
|      | ||||
|     /// Call external embedding service (Ollama or custom) | ||||
|     async fn call_embedding_service( | ||||
|         &self, | ||||
|         server: &crate::server::Server, | ||||
|         texts: Option<Vec<String>>, | ||||
|         images: Option<Vec<String>>, | ||||
|     ) -> Result<Vec<Vec<f32>>, DBError> { | ||||
|         let base_url = self.get_embedding_url(server).await?; | ||||
|         let is_ollama = self.is_ollama_service(server).await?; | ||||
|          | ||||
|         if is_ollama { | ||||
|             // Use Ollama API format | ||||
|             if let Some(texts) = texts { | ||||
|                 let mut embeddings = Vec::new(); | ||||
|                 for text in texts { | ||||
|                     let url = format!("{}/api/embeddings", base_url); | ||||
|                     let request = OllamaEmbeddingRequest { | ||||
|                         model: "nomic-embed-text".to_string(), | ||||
|                         prompt: text, | ||||
|                     }; | ||||
|                      | ||||
|                     let response = self.http_client | ||||
|                         .post(&url) | ||||
|                         .json(&request) | ||||
|                         .send() | ||||
|                         .await | ||||
|                         .map_err(|e| DBError(format!("Failed to call Ollama embedding service: {}", e)))?; | ||||
|                      | ||||
|                     if !response.status().is_success() { | ||||
|                         let status = response.status(); | ||||
|                         let error_text = response.text().await.unwrap_or_default(); | ||||
|                         return Err(DBError(format!( | ||||
|                             "Ollama embedding service returned error {}: {}", | ||||
|                             status, error_text | ||||
|                         ))); | ||||
|                     } | ||||
|                      | ||||
|                     let ollama_response: OllamaEmbeddingResponse = response | ||||
|                         .json() | ||||
|                         .await | ||||
|                         .map_err(|e| DBError(format!("Failed to parse Ollama embedding response: {}", e)))?; | ||||
|                      | ||||
|                     embeddings.push(ollama_response.embedding); | ||||
|                 } | ||||
|                 Ok(embeddings) | ||||
|             } else if let Some(_images) = images { | ||||
|                 // Ollama doesn't support image embeddings with this API yet | ||||
|                 Err(DBError("Image embeddings not supported with Ollama. Please configure a custom embedding service.".to_string())) | ||||
|             } else { | ||||
|                 Err(DBError("No text or images provided for embedding".to_string())) | ||||
|             } | ||||
|         } else { | ||||
|             // Use custom embedding service API format | ||||
|             let request = EmbeddingRequest { | ||||
|                 texts, | ||||
|                 images, | ||||
|                 model: None, // Let the service use its default | ||||
|             }; | ||||
|              | ||||
|             let response = self.http_client | ||||
|                 .post(&base_url) | ||||
|                 .json(&request) | ||||
|                 .send() | ||||
|                 .await | ||||
|                 .map_err(|e| DBError(format!("Failed to call embedding service: {}", e)))?; | ||||
|              | ||||
|             if !response.status().is_success() { | ||||
|                 let status = response.status(); | ||||
|                 let error_text = response.text().await.unwrap_or_default(); | ||||
|                 return Err(DBError(format!( | ||||
|                     "Embedding service returned error {}: {}", | ||||
|                     status, error_text | ||||
|                 ))); | ||||
|             } | ||||
|              | ||||
|             let embedding_response: EmbeddingResponse = response | ||||
|                 .json() | ||||
|                 .await | ||||
|                 .map_err(|e| DBError(format!("Failed to parse embedding response: {}", e)))?; | ||||
|              | ||||
|             Ok(embedding_response.embeddings) | ||||
|         } | ||||
|     } | ||||
|      | ||||
|     pub async fn embed_text( | ||||
|         &self,  | ||||
|         server: &crate::server::Server, | ||||
|         texts: Vec<String> | ||||
|     ) -> Result<Vec<Vec<f32>>, DBError> { | ||||
|         if texts.is_empty() { | ||||
|             return Ok(Vec::new()); | ||||
|         } | ||||
|          | ||||
|         self.call_embedding_service(server, Some(texts), None).await | ||||
|     } | ||||
|      | ||||
|     pub async fn embed_image( | ||||
|         &self, | ||||
|         server: &crate::server::Server, | ||||
|         image_bytes: Vec<u8> | ||||
|     ) -> Result<Vec<f32>, DBError> { | ||||
|         // Convert image bytes to base64 | ||||
|         let base64_image = base64::engine::general_purpose::STANDARD.encode(&image_bytes); | ||||
|          | ||||
|         let embeddings = self.call_embedding_service( | ||||
|             server,  | ||||
|             None,  | ||||
|             Some(vec![base64_image]) | ||||
|         ).await?; | ||||
|          | ||||
|         embeddings.into_iter() | ||||
|             .next() | ||||
|             .ok_or_else(|| DBError("No embedding returned for image".to_string())) | ||||
|     } | ||||
|      | ||||
|     pub async fn create_dataset( | ||||
|         &self, | ||||
|         name: &str, | ||||
|         schema: Schema, | ||||
|     ) -> Result<(), DBError> { | ||||
|         let dataset_path = self.data_dir.join(format!("{}.lance", name)); | ||||
|          | ||||
|         // Create empty dataset with schema | ||||
|         let write_params = WriteParams { | ||||
|             mode: WriteMode::Create, | ||||
|             ..Default::default() | ||||
|         }; | ||||
|          | ||||
|         // Create an empty RecordBatch with the schema | ||||
|         let empty_batch = RecordBatch::new_empty(Arc::new(schema)); | ||||
|          | ||||
|         // Use RecordBatchReader for Lance 0.33 | ||||
|         let reader = VecRecordBatchReader::new(vec![empty_batch]); | ||||
|         let dataset = Dataset::write( | ||||
|             reader, | ||||
|             dataset_path.to_str().unwrap(), | ||||
|             Some(write_params) | ||||
|         ).await | ||||
|         .map_err(|e| DBError(format!("Failed to create dataset: {}", e)))?; | ||||
|          | ||||
|         let mut datasets = self.datasets.write().await; | ||||
|         datasets.insert(name.to_string(), Arc::new(dataset)); | ||||
|          | ||||
|         Ok(()) | ||||
|     } | ||||
|      | ||||
|     pub async fn write_vectors( | ||||
|         &self, | ||||
|         dataset_name: &str, | ||||
|         vectors: Vec<Vec<f32>>, | ||||
|         metadata: Option<HashMap<String, Vec<String>>>, | ||||
|     ) -> Result<usize, DBError> { | ||||
|         let dataset_path = self.data_dir.join(format!("{}.lance", dataset_name)); | ||||
|          | ||||
|         // Open or get cached dataset | ||||
|         let _dataset = self.get_or_open_dataset(dataset_name).await?; | ||||
|          | ||||
|         // Build RecordBatch | ||||
|         let num_vectors = vectors.len(); | ||||
|         if num_vectors == 0 { | ||||
|             return Ok(0); | ||||
|         } | ||||
|          | ||||
|         let dim = vectors.first() | ||||
|             .ok_or_else(|| DBError("Empty vectors".to_string()))? | ||||
|             .len(); | ||||
|          | ||||
|         // Flatten vectors | ||||
|         let flat_vectors: Vec<f32> = vectors.into_iter().flatten().collect(); | ||||
|         let values_array = Float32Array::from(flat_vectors); | ||||
|         let field = Arc::new(Field::new("item", DataType::Float32, true)); | ||||
|         let vector_array = FixedSizeListArray::try_new( | ||||
|             field, | ||||
|             dim as i32, | ||||
|             Arc::new(values_array), | ||||
|             None | ||||
|         ).map_err(|e| DBError(format!("Failed to create vector array: {}", e)))?; | ||||
|          | ||||
|         let mut arrays: Vec<ArrayRef> = vec![Arc::new(vector_array)]; | ||||
|         let mut fields = vec![Field::new( | ||||
|             "vector", | ||||
|             DataType::FixedSizeList( | ||||
|                 Arc::new(Field::new("item", DataType::Float32, true)), | ||||
|                 dim as i32 | ||||
|             ), | ||||
|             false | ||||
|         )]; | ||||
|          | ||||
|         // Add metadata columns if provided | ||||
|         if let Some(metadata) = metadata { | ||||
|             for (key, values) in metadata { | ||||
|                 if values.len() != num_vectors { | ||||
|                     return Err(DBError(format!( | ||||
|                         "Metadata field '{}' has {} values but expected {}",  | ||||
|                         key, values.len(), num_vectors | ||||
|                     ))); | ||||
|                 } | ||||
|                 let array = StringArray::from(values); | ||||
|                 arrays.push(Arc::new(array)); | ||||
|                 fields.push(Field::new(&key, DataType::Utf8, true)); | ||||
|             } | ||||
|         } | ||||
|          | ||||
|         let schema = Arc::new(Schema::new(fields)); | ||||
|         let batch = RecordBatch::try_new(schema, arrays) | ||||
|             .map_err(|e| DBError(format!("Failed to create RecordBatch: {}", e)))?; | ||||
|          | ||||
|         // Append to dataset | ||||
|         let write_params = WriteParams { | ||||
|             mode: WriteMode::Append, | ||||
|             ..Default::default() | ||||
|         }; | ||||
|          | ||||
|         let reader = VecRecordBatchReader::new(vec![batch]); | ||||
|         Dataset::write( | ||||
|             reader, | ||||
|             dataset_path.to_str().unwrap(), | ||||
|             Some(write_params) | ||||
|         ).await | ||||
|         .map_err(|e| DBError(format!("Failed to write to dataset: {}", e)))?; | ||||
|          | ||||
|         // Refresh cached dataset | ||||
|         let mut datasets = self.datasets.write().await; | ||||
|         datasets.remove(dataset_name); | ||||
|          | ||||
|         Ok(num_vectors) | ||||
|     } | ||||
|      | ||||
|     /// Runs a k-nearest-neighbour search against the `vector` column of a dataset. | ||||
|     /// | ||||
|     /// Returns one `(distance, metadata)` pair per result row, in the order the | ||||
|     /// scanner yields them. `metadata` contains every non-null string (Utf8) | ||||
|     /// column of the row except `vector` and `_distance`; columns of any other | ||||
|     /// Arrow type are skipped. | ||||
|     /// | ||||
|     /// `nprobes` is forwarded to the scanner when provided. `_refine_factor` is | ||||
|     /// accepted so callers keep a stable signature, but it is currently ignored. | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// Returns `DBError` when the dataset cannot be opened, the query cannot be | ||||
|     /// built or executed, or the result lacks a `Float32` `_distance` column. | ||||
|     pub async fn search_vectors( | ||||
|         &self, | ||||
|         dataset_name: &str, | ||||
|         query_vector: Vec<f32>, | ||||
|         k: usize, | ||||
|         nprobes: Option<usize>, | ||||
|         _refine_factor: Option<usize>, | ||||
|     ) -> Result<Vec<(f32, HashMap<String, String>)>, DBError> { | ||||
|         let dataset = self.get_or_open_dataset(dataset_name).await?; | ||||
|  | ||||
|         // The query vector is owned, so it is moved into the Arrow array | ||||
|         // instead of being cloned first. | ||||
|         let query_array = Float32Array::from(query_vector); | ||||
|         let mut query = dataset.scan(); | ||||
|         query | ||||
|             .nearest("vector", &query_array, k) | ||||
|             .map_err(|e| DBError(format!("Failed to build search query: {}", e)))?; | ||||
|  | ||||
|         if let Some(nprobes) = nprobes { | ||||
|             query.nprobs(nprobes); | ||||
|         } | ||||
|  | ||||
|         // NOTE(review): `_refine_factor` is not wired into the scanner yet; | ||||
|         // confirm the refine API of the pinned Lance version before enabling it. | ||||
|  | ||||
|         // Execute the search and gather every result batch. | ||||
|         let results = query | ||||
|             .try_into_stream() | ||||
|             .await | ||||
|             .map_err(|e| DBError(format!("Failed to execute search: {}", e)))? | ||||
|             .try_collect::<Vec<_>>() | ||||
|             .await | ||||
|             .map_err(|e| DBError(format!("Failed to collect results: {}", e)))?; | ||||
|  | ||||
|         let mut output = Vec::new(); | ||||
|         for batch in results { | ||||
|             let distances = batch | ||||
|                 .column_by_name("_distance") | ||||
|                 .ok_or_else(|| DBError("No distance column".to_string()))? | ||||
|                 .as_any() | ||||
|                 .downcast_ref::<Float32Array>() | ||||
|                 .ok_or_else(|| DBError("Invalid distance type".to_string()))?; | ||||
|  | ||||
|             // Resolve the string-typed metadata columns once per batch rather | ||||
|             // than repeating the schema walk and downcast for every row. | ||||
|             let schema = batch.schema(); | ||||
|             let string_columns: Vec<(&str, &StringArray)> = schema | ||||
|                 .fields() | ||||
|                 .iter() | ||||
|                 .filter_map(|field| { | ||||
|                     let name = field.name(); | ||||
|                     if name == "vector" || name == "_distance" { | ||||
|                         return None; | ||||
|                     } | ||||
|                     batch | ||||
|                         .column_by_name(name)? | ||||
|                         .as_any() | ||||
|                         .downcast_ref::<StringArray>() | ||||
|                         .map(|values| (name.as_str(), values)) | ||||
|                 }) | ||||
|                 .collect(); | ||||
|  | ||||
|             output.reserve(batch.num_rows()); | ||||
|             for row in 0..batch.num_rows() { | ||||
|                 // Null cells are left out of the row's metadata map. | ||||
|                 let metadata: HashMap<String, String> = string_columns | ||||
|                     .iter() | ||||
|                     .filter(|(_, values)| !values.is_null(row)) | ||||
|                     .map(|(name, values)| ((*name).to_string(), values.value(row).to_string())) | ||||
|                     .collect(); | ||||
|  | ||||
|                 output.push((distances.value(row), metadata)); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(output) | ||||
|     } | ||||
|      | ||||
|     /// Embeds a text and/or image item and appends it to `dataset_name` as one row. | ||||
|     /// | ||||
|     /// When `text` is present it is the embedding source; otherwise `image_bytes` | ||||
|     /// is embedded. The row's metadata is the caller's `metadata` plus three | ||||
|     /// entries written by this method (overwriting caller keys of the same name): | ||||
|     /// `id` (a fresh UUID v4), `text` (if given) and `image_base64` (if an image | ||||
|     /// was given). | ||||
|     /// | ||||
|     /// Returns the generated id. | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// Fails when neither text nor image is supplied, when the embedding call | ||||
|     /// fails or yields no vector, or when the write to the dataset fails. | ||||
|     pub async fn store_multimodal( | ||||
|         &self, | ||||
|         server: &crate::server::Server, | ||||
|         dataset_name: &str, | ||||
|         text: Option<String>, | ||||
|         image_bytes: Option<Vec<u8>>, | ||||
|         metadata: HashMap<String, String>, | ||||
|     ) -> Result<String, DBError> { | ||||
|         let id = uuid::Uuid::new_v4().to_string(); | ||||
|  | ||||
|         // Text takes priority over the image as the embedding source. | ||||
|         let embedding = match (text.as_ref(), image_bytes.as_ref()) { | ||||
|             (Some(content), _) => { | ||||
|                 let mut vectors = self | ||||
|                     .embed_text(server, vec![content.clone()]) | ||||
|                     .await? | ||||
|                     .into_iter(); | ||||
|                 match vectors.next() { | ||||
|                     Some(vector) => vector, | ||||
|                     None => return Err(DBError("No embedding returned".to_string())), | ||||
|                 } | ||||
|             } | ||||
|             (None, Some(bytes)) => self.embed_image(server, bytes.clone()).await?, | ||||
|             (None, None) => return Err(DBError("No text or image provided".to_string())), | ||||
|         }; | ||||
|  | ||||
|         // Merge the generated fields into the caller-supplied metadata. | ||||
|         let mut full_metadata = metadata; | ||||
|         full_metadata.insert("id".to_string(), id.clone()); | ||||
|         if let Some(content) = text { | ||||
|             full_metadata.insert("text".to_string(), content); | ||||
|         } | ||||
|         if let Some(bytes) = image_bytes { | ||||
|             let encoded = base64::engine::general_purpose::STANDARD.encode(bytes); | ||||
|             full_metadata.insert("image_base64".to_string(), encoded); | ||||
|         } | ||||
|  | ||||
|         // The writer takes one value list per column; this row has one value each. | ||||
|         let metadata_cols: HashMap<_, _> = full_metadata | ||||
|             .into_iter() | ||||
|             .map(|(key, value)| (key, vec![value])) | ||||
|             .collect(); | ||||
|  | ||||
|         self.write_vectors(dataset_name, vec![embedding], Some(metadata_cols)).await?; | ||||
|  | ||||
|         Ok(id) | ||||
|     } | ||||
|      | ||||
|     /// Embeds `query_text` and runs a vector search with the resulting embedding. | ||||
|     /// | ||||
|     /// The first embedding returned by `embed_text` is used as the query vector; | ||||
|     /// `k`, `nprobes` and `refine_factor` are passed through to `search_vectors` | ||||
|     /// unchanged. | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// Fails when embedding fails or returns no vector, or when the search fails. | ||||
|     pub async fn search_with_text( | ||||
|         &self, | ||||
|         server: &crate::server::Server, | ||||
|         dataset_name: &str, | ||||
|         query_text: String, | ||||
|         k: usize, | ||||
|         nprobes: Option<usize>, | ||||
|         refine_factor: Option<usize>, | ||||
|     ) -> Result<Vec<(f32, HashMap<String, String>)>, DBError> { | ||||
|         let first_embedding = self | ||||
|             .embed_text(server, vec![query_text]) | ||||
|             .await? | ||||
|             .into_iter() | ||||
|             .next(); | ||||
|  | ||||
|         match first_embedding { | ||||
|             Some(query_vector) => { | ||||
|                 self.search_vectors(dataset_name, query_vector, k, nprobes, refine_factor) | ||||
|                     .await | ||||
|             } | ||||
|             None => Err(DBError("No embedding returned for query".to_string())), | ||||
|         } | ||||
|     } | ||||
|      | ||||
|     pub async fn create_index( | ||||
|         &self, | ||||
|         dataset_name: &str, | ||||
|         index_type: &str, | ||||
|         num_partitions: Option<usize>, | ||||
|         num_sub_vectors: Option<usize>, | ||||
|     ) -> Result<(), DBError> { | ||||
|         let _dataset = self.get_or_open_dataset(dataset_name).await?; | ||||
|          | ||||
|         match index_type.to_uppercase().as_str() { | ||||
|             "IVF_PQ" => { | ||||
|                 let ivf_params = IvfBuildParams { | ||||
|                     num_partitions: num_partitions.unwrap_or(256), | ||||
|                     ..Default::default() | ||||
|                 }; | ||||
|                 let pq_params = PQBuildParams { | ||||
|                     num_sub_vectors: num_sub_vectors.unwrap_or(16), | ||||
|                     ..Default::default() | ||||
|                 }; | ||||
|                 let params = VectorIndexParams::with_ivf_pq_params( | ||||
|                     MetricType::L2, | ||||
|                     ivf_params, | ||||
|                     pq_params, | ||||
|                 ); | ||||
|                  | ||||
|                 // Get a mutable reference to the dataset | ||||
|                 let mut dataset_mut = Dataset::open(self.data_dir.join(format!("{}.lance", dataset_name)).to_str().unwrap()) | ||||
|                     .await | ||||
|                     .map_err(|e| DBError(format!("Failed to open dataset for indexing: {}", e)))?; | ||||
|                  | ||||
|                 dataset_mut.create_index( | ||||
|                     &["vector"], | ||||
|                     lance_index::IndexType::Vector, | ||||
|                     None, | ||||
|                     ¶ms, | ||||
|                     true | ||||
|                 ).await | ||||
|                 .map_err(|e| DBError(format!("Failed to create index: {}", e)))?; | ||||
|             } | ||||
|             _ => return Err(DBError(format!("Unsupported index type: {}", index_type))), | ||||
|         } | ||||
|          | ||||
|         Ok(()) | ||||
|     } | ||||
|      | ||||
|     /// Returns the cached handle for dataset `name`, opening and caching it on a miss. | ||||
|     /// | ||||
|     /// Cache hits only take the shared read lock. On a miss the write lock is | ||||
|     /// taken and the cache is checked again before opening, because another task | ||||
|     /// may have inserted the dataset in the meantime. | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// Fails when `<data_dir>/<name>.lance` does not exist, when that path is not | ||||
|     /// valid UTF-8, or when Lance cannot open the dataset. | ||||
|     async fn get_or_open_dataset(&self, name: &str) -> Result<Arc<Dataset>, DBError> { | ||||
|         // Fast path: the read guard is dropped at the end of this statement. | ||||
|         let cached = self.datasets.read().await.get(name).cloned(); | ||||
|         if let Some(dataset) = cached { | ||||
|             return Ok(dataset); | ||||
|         } | ||||
|  | ||||
|         let mut datasets = self.datasets.write().await; | ||||
|  | ||||
|         // Re-check under the write lock so concurrent misses open the dataset once. | ||||
|         if let Some(dataset) = datasets.get(name) { | ||||
|             return Ok(dataset.clone()); | ||||
|         } | ||||
|  | ||||
|         let dataset_path = self.data_dir.join(format!("{}.lance", name)); | ||||
|         if !dataset_path.exists() { | ||||
|             return Err(DBError(format!("Dataset '{}' does not exist", name))); | ||||
|         } | ||||
|  | ||||
|         // Report a non-UTF-8 path as an error instead of panicking. | ||||
|         let dataset_uri = dataset_path.to_str().ok_or_else(|| { | ||||
|             DBError(format!("Dataset path for '{}' is not valid UTF-8", name)) | ||||
|         })?; | ||||
|  | ||||
|         let dataset = Dataset::open(dataset_uri) | ||||
|             .await | ||||
|             .map_err(|e| DBError(format!("Failed to open dataset: {}", e)))?; | ||||
|  | ||||
|         let dataset = Arc::new(dataset); | ||||
|         datasets.insert(name.to_string(), dataset.clone()); | ||||
|  | ||||
|         Ok(dataset) | ||||
|     } | ||||
|      | ||||
|     /// Lists the names of all datasets found in the data directory. | ||||
|     /// | ||||
|     /// A dataset is any sub-directory whose name ends in `.lance`; the returned | ||||
|     /// name is the directory name with that one suffix removed. Entries whose | ||||
|     /// names are not valid UTF-8 are skipped. The order follows the directory | ||||
|     /// iteration order. | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// Fails when the data directory or one of its entries cannot be read. | ||||
|     pub async fn list_datasets(&self) -> Result<Vec<String>, DBError> { | ||||
|         let mut datasets = Vec::new(); | ||||
|  | ||||
|         let entries = std::fs::read_dir(&self.data_dir) | ||||
|             .map_err(|e| DBError(format!("Failed to read data directory: {}", e)))?; | ||||
|  | ||||
|         for entry in entries { | ||||
|             let entry = entry.map_err(|e| DBError(format!("Failed to read entry: {}", e)))?; | ||||
|             let path = entry.path(); | ||||
|  | ||||
|             if !path.is_dir() { | ||||
|                 continue; | ||||
|             } | ||||
|  | ||||
|             // `strip_suffix` removes exactly one ".lance", so a directory named | ||||
|             // "a.lance.lance" is reported as "a.lance". `trim_end_matches` would | ||||
|             // strip every repetition and report "a". | ||||
|             let dataset_name = path | ||||
|                 .file_name() | ||||
|                 .and_then(|name| name.to_str()) | ||||
|                 .and_then(|name| name.strip_suffix(".lance")); | ||||
|  | ||||
|             if let Some(dataset_name) = dataset_name { | ||||
|                 datasets.push(dataset_name.to_string()); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(datasets) | ||||
|     } | ||||
|      | ||||
|     /// Evicts dataset `name` from the cache and deletes its directory from disk. | ||||
|     /// | ||||
|     /// Dropping a dataset that is not on disk succeeds. | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// Fails when the directory cannot be removed for any reason other than it | ||||
|     /// not existing. | ||||
|     pub async fn drop_dataset(&self, name: &str) -> Result<(), DBError> { | ||||
|         // The write guard stays alive until the function returns, so the cache | ||||
|         // cannot be repopulated through this lock while the removal runs. | ||||
|         let mut datasets = self.datasets.write().await; | ||||
|         datasets.remove(name); | ||||
|  | ||||
|         // Attempt the removal directly instead of checking `exists()` first. | ||||
|         // That avoids a check-then-act race, and it surfaces errors such as a | ||||
|         // permission failure that `exists()` would report as "not there". | ||||
|         let dataset_path = self.data_dir.join(format!("{}.lance", name)); | ||||
|         match std::fs::remove_dir_all(&dataset_path) { | ||||
|             Ok(()) => Ok(()), | ||||
|             Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), | ||||
|             Err(e) => Err(DBError(format!("Failed to delete dataset: {}", e))), | ||||
|         } | ||||
|     } | ||||
|      | ||||
|     /// Collects descriptive information about dataset `name` as string pairs. | ||||
|     /// | ||||
|     /// The returned map has four keys: `name`, `version`, `num_rows`, and | ||||
|     /// `schema` (a comma-separated list of `field:type` entries). | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// Fails when the dataset cannot be opened or its rows cannot be counted. | ||||
|     pub async fn get_dataset_info(&self, name: &str) -> Result<HashMap<String, String>, DBError> { | ||||
|         let dataset = self.get_or_open_dataset(name).await?; | ||||
|  | ||||
|         let version = dataset.version().version.to_string(); | ||||
|         let num_rows = dataset.count_rows(None).await?.to_string(); | ||||
|  | ||||
|         // Render every schema field as "name:type". | ||||
|         let schema_summary = dataset | ||||
|             .schema() | ||||
|             .fields | ||||
|             .iter() | ||||
|             .map(|field| format!("{}:{}", field.name, field.data_type())) | ||||
|             .collect::<Vec<String>>() | ||||
|             .join(", "); | ||||
|  | ||||
|         Ok(HashMap::from([ | ||||
|             ("name".to_string(), name.to_string()), | ||||
|             ("version".to_string(), version), | ||||
|             ("num_rows".to_string(), num_rows), | ||||
|             ("schema".to_string(), schema_summary), | ||||
|         ])) | ||||
|     } | ||||
| } | ||||
| @@ -2,6 +2,7 @@ pub mod age;   // NEW | ||||
| pub mod cmd; | ||||
| pub mod crypto; | ||||
| pub mod error; | ||||
| pub mod lance_store;    // Add Lance store module | ||||
| pub mod options; | ||||
| pub mod protocol; | ||||
| pub mod server; | ||||
|   | ||||
| @@ -9,6 +9,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; | ||||
|  | ||||
| use crate::cmd::Cmd; | ||||
| use crate::error::DBError; | ||||
| use crate::lance_store::LanceStore; | ||||
| use crate::options; | ||||
| use crate::protocol::Protocol; | ||||
| use crate::storage::Storage; | ||||
| @@ -26,6 +27,9 @@ pub struct Server { | ||||
|     // BLPOP waiter registry: per (db_index, key) FIFO of waiters | ||||
|     pub list_waiters: Arc<Mutex<HashMap<u64, HashMap<String, Vec<Waiter>>>>>, | ||||
|     pub waiter_seq: Arc<AtomicU64>, | ||||
|      | ||||
|     // Lance vector store | ||||
|     pub lance_store: Option<Arc<LanceStore>>, | ||||
| } | ||||
|  | ||||
| pub struct Waiter { | ||||
| @@ -42,6 +46,16 @@ pub enum PopSide { | ||||
|  | ||||
| impl Server { | ||||
|     pub async fn new(option: options::DBOption) -> Self { | ||||
|         // Initialize Lance store | ||||
|         let lance_data_dir = std::path::PathBuf::from(&option.dir).join("lance"); | ||||
|         let lance_store = match LanceStore::new(lance_data_dir).await { | ||||
|             Ok(store) => Some(Arc::new(store)), | ||||
|             Err(e) => { | ||||
|                 eprintln!("Warning: Failed to initialize Lance store: {}", e.0); | ||||
|                 None | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         Server { | ||||
|             db_cache: Arc::new(std::sync::RwLock::new(HashMap::new())), | ||||
|             option, | ||||
| @@ -51,9 +65,17 @@ impl Server { | ||||
|  | ||||
|             list_waiters: Arc::new(Mutex::new(HashMap::new())), | ||||
|             waiter_seq: Arc::new(AtomicU64::new(1)), | ||||
|             lance_store, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Returns a shared handle to the Lance vector store. | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// Returns `DBError` when the `lance_store` field is `None`. | ||||
|     pub fn lance_store(&self) -> Result<Arc<LanceStore>, DBError> { | ||||
|         match &self.lance_store { | ||||
|             Some(store) => Ok(Arc::clone(store)), | ||||
|             None => Err(DBError("Lance store not initialized".to_string())), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn current_storage(&self) -> Result<Arc<dyn StorageBackend>, DBError> { | ||||
|         let mut cache = self.db_cache.write().unwrap(); | ||||
|          | ||||
|   | ||||
		Reference in New Issue
	
	Block a user