implemented 0.db as admin database architecture + updated test file
This commit is contained in:
		
							
								
								
									
										361
									
								
								src/admin_meta.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										361
									
								
								src/admin_meta.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,361 @@ | ||||
| use std::path::PathBuf; | ||||
| use std::sync::{Arc, OnceLock, Mutex, RwLock}; | ||||
| use std::collections::HashMap; | ||||
|  | ||||
| use crate::error::DBError; | ||||
| use crate::options; | ||||
| use crate::rpc::Permissions; | ||||
| use crate::storage::Storage; | ||||
| use crate::storage_sled::SledStorage; | ||||
| use crate::storage_trait::StorageBackend; | ||||
|  | ||||
| // Key builders | ||||
| fn k_admin_next_id() -> &'static str { | ||||
|     "admin:next_id" | ||||
| } | ||||
| fn k_admin_dbs() -> &'static str { | ||||
|     "admin:dbs" | ||||
| } | ||||
| fn k_meta_db(id: u64) -> String { | ||||
|     format!("meta:db:{}", id) | ||||
| } | ||||
| fn k_meta_db_keys(id: u64) -> String { | ||||
|     format!("meta:db:{}:keys", id) | ||||
| } | ||||
| fn k_meta_db_enc(id: u64) -> String { | ||||
|     format!("meta:db:{}:enc", id) | ||||
| } | ||||
|  | ||||
| // Global cache of admin DB 0 handles per base_dir to avoid sled/reDB file-lock contention | ||||
| // and to correctly isolate different test instances with distinct directories. | ||||
| static ADMIN_STORAGES: OnceLock<RwLock<HashMap<String, Arc<dyn StorageBackend>>>> = OnceLock::new(); | ||||
|  | ||||
| // Global registry for data DB storages to avoid double-open across process. | ||||
| static DATA_STORAGES: OnceLock<RwLock<HashMap<u64, Arc<dyn StorageBackend>>>> = OnceLock::new(); | ||||
| static DATA_INIT_LOCK: Mutex<()> = Mutex::new(()); | ||||
|  | ||||
| fn init_admin_storage( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
| ) -> Result<Arc<dyn StorageBackend>, DBError> { | ||||
|     let db_file = PathBuf::from(base_dir).join("0.db"); | ||||
|     if let Some(parent_dir) = db_file.parent() { | ||||
|         std::fs::create_dir_all(parent_dir).map_err(|e| { | ||||
|             DBError(format!("Failed to create directory {}: {}", parent_dir.display(), e)) | ||||
|         })?; | ||||
|     } | ||||
|     let storage: Arc<dyn StorageBackend> = match backend { | ||||
|         options::BackendType::Redb => Arc::new(Storage::new(&db_file, true, Some(admin_secret))?), | ||||
|         options::BackendType::Sled => Arc::new(SledStorage::new(&db_file, true, Some(admin_secret))?), | ||||
|     }; | ||||
|     Ok(storage) | ||||
| } | ||||
|  | ||||
| // Get or initialize a cached handle to admin DB 0 per base_dir (thread-safe, no double-open race) | ||||
| pub fn open_admin_storage( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
| ) -> Result<Arc<dyn StorageBackend>, DBError> { | ||||
|     let map = ADMIN_STORAGES.get_or_init(|| RwLock::new(HashMap::new())); | ||||
|     // Fast path | ||||
|     if let Some(st) = map.read().unwrap().get(base_dir) { | ||||
|         return Ok(st.clone()); | ||||
|     } | ||||
|     // Slow path with write lock | ||||
|     { | ||||
|         let mut w = map.write().unwrap(); | ||||
|         if let Some(st) = w.get(base_dir) { | ||||
|             return Ok(st.clone()); | ||||
|         } | ||||
|         let st = init_admin_storage(base_dir, backend, admin_secret)?; | ||||
|         w.insert(base_dir.to_string(), st.clone()); | ||||
|         return Ok(st); | ||||
|     } | ||||
| } | ||||
|  | ||||
| // Ensure admin structures exist in encrypted DB 0 | ||||
| pub fn ensure_bootstrap( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
| ) -> Result<(), DBError> { | ||||
|     let admin = open_admin_storage(base_dir, backend, admin_secret)?; | ||||
|  | ||||
|     // Initialize next id if missing | ||||
|     if !admin.exists(k_admin_next_id())? { | ||||
|         admin.set(k_admin_next_id().to_string(), "1".to_string())?; | ||||
|     } | ||||
|     // admin:dbs is a hash; it's fine if it doesn't exist (hlen -> 0) | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| // Get or initialize a shared handle to a data DB (> 0), avoiding double-open across subsystems | ||||
| pub fn open_data_storage( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
|     id: u64, | ||||
| ) -> Result<Arc<dyn StorageBackend>, DBError> { | ||||
|     if id == 0 { | ||||
|         return open_admin_storage(base_dir, backend, admin_secret); | ||||
|     } | ||||
|  | ||||
|     // Validate existence in admin metadata | ||||
|     if !db_exists(base_dir, backend.clone(), admin_secret, id)? { | ||||
|         return Err(DBError(format!( | ||||
|             "Cannot open database instance {}, as that database instance does not exist.", | ||||
|             id | ||||
|         ))); | ||||
|     } | ||||
|  | ||||
|     let map = DATA_STORAGES.get_or_init(|| RwLock::new(HashMap::new())); | ||||
|     // Fast path | ||||
|     if let Some(st) = map.read().unwrap().get(&id) { | ||||
|         return Ok(st.clone()); | ||||
|     } | ||||
|  | ||||
|     // Slow path with init lock | ||||
|     let _guard = DATA_INIT_LOCK.lock().unwrap(); | ||||
|     if let Some(st) = map.read().unwrap().get(&id) { | ||||
|         return Ok(st.clone()); | ||||
|     } | ||||
|  | ||||
|     // Determine per-db encryption | ||||
|     let enc = get_enc_key(base_dir, backend.clone(), admin_secret, id)?; | ||||
|     let should_encrypt = enc.is_some(); | ||||
|  | ||||
|     // Build database file path and ensure parent dir exists | ||||
|     let db_file = PathBuf::from(base_dir).join(format!("{}.db", id)); | ||||
|     if let Some(parent_dir) = db_file.parent() { | ||||
|         std::fs::create_dir_all(parent_dir).map_err(|e| { | ||||
|             DBError(format!("Failed to create directory {}: {}", parent_dir.display(), e)) | ||||
|         })?; | ||||
|     } | ||||
|  | ||||
|     // Open storage | ||||
|     let storage: Arc<dyn StorageBackend> = match backend { | ||||
|         options::BackendType::Redb => Arc::new(Storage::new(&db_file, should_encrypt, enc.as_deref())?), | ||||
|         options::BackendType::Sled => Arc::new(SledStorage::new(&db_file, should_encrypt, enc.as_deref())?), | ||||
|     }; | ||||
|  | ||||
|     // Publish to registry | ||||
|     map.write().unwrap().insert(id, storage.clone()); | ||||
|     Ok(storage) | ||||
| } | ||||
|  | ||||
| // Allocate the next DB id and persist new pointer | ||||
| pub fn allocate_next_id( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
| ) -> Result<u64, DBError> { | ||||
|     let admin = open_admin_storage(base_dir, backend, admin_secret)?; | ||||
|     let cur = admin | ||||
|         .get(k_admin_next_id())? | ||||
|         .unwrap_or_else(|| "1".to_string()); | ||||
|     let id: u64 = cur.parse().unwrap_or(1); | ||||
|     let next = id.checked_add(1).ok_or_else(|| DBError("next_id overflow".into()))?; | ||||
|     admin.set(k_admin_next_id().to_string(), next.to_string())?; | ||||
|  | ||||
|     // Register into admin:dbs set/hash | ||||
|     let _ = admin.hset(k_admin_dbs(), vec![(id.to_string(), "1".to_string())])?; | ||||
|  | ||||
|     // Default meta for the new db: public true | ||||
|     let meta_key = k_meta_db(id); | ||||
|     let _ = admin.hset(&meta_key, vec![("public".to_string(), "true".to_string())])?; | ||||
|  | ||||
|     Ok(id) | ||||
| } | ||||
|  | ||||
| // Check existence of a db id in admin:dbs | ||||
| pub fn db_exists( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
|     id: u64, | ||||
| ) -> Result<bool, DBError> { | ||||
|     let admin = open_admin_storage(base_dir, backend, admin_secret)?; | ||||
|     Ok(admin.hexists(k_admin_dbs(), &id.to_string())?) | ||||
| } | ||||
|  | ||||
| // Get per-db encryption key, if any | ||||
| pub fn get_enc_key( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
|     id: u64, | ||||
| ) -> Result<Option<String>, DBError> { | ||||
|     let admin = open_admin_storage(base_dir, backend, admin_secret)?; | ||||
|     admin.get(&k_meta_db_enc(id)) | ||||
| } | ||||
|  | ||||
| // Set per-db encryption key (called during create) | ||||
| pub fn set_enc_key( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
|     id: u64, | ||||
|     key: &str, | ||||
| ) -> Result<(), DBError> { | ||||
|     let admin = open_admin_storage(base_dir, backend, admin_secret)?; | ||||
|     admin.set(k_meta_db_enc(id), key.to_string()) | ||||
| } | ||||
|  | ||||
| // Set database public flag | ||||
| pub fn set_database_public( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
|     id: u64, | ||||
|     public: bool, | ||||
| ) -> Result<(), DBError> { | ||||
|     let admin = open_admin_storage(base_dir, backend, admin_secret)?; | ||||
|     let mk = k_meta_db(id); | ||||
|     let _ = admin.hset(&mk, vec![("public".to_string(), public.to_string())])?; | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| // Internal: load public flag; default to true when meta missing | ||||
| fn load_public( | ||||
|     admin: &Arc<dyn StorageBackend>, | ||||
|     id: u64, | ||||
| ) -> Result<bool, DBError> { | ||||
|     let mk = k_meta_db(id); | ||||
|     match admin.hget(&mk, "public")? { | ||||
|         Some(v) => Ok(v == "true"), | ||||
|         None => Ok(true), | ||||
|     } | ||||
| } | ||||
|  | ||||
| // Add access key for db (value format: "Read:ts" or "ReadWrite:ts") | ||||
| pub fn add_access_key( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
|     id: u64, | ||||
|     key_plain: &str, | ||||
|     perms: Permissions, | ||||
| ) -> Result<(), DBError> { | ||||
|     let admin = open_admin_storage(base_dir, backend, admin_secret)?; | ||||
|     let hash = crate::rpc::hash_key(key_plain); | ||||
|     let v = match perms { | ||||
|         Permissions::Read => format!("Read:{}", now_secs()), | ||||
|         Permissions::ReadWrite => format!("ReadWrite:{}", now_secs()), | ||||
|     }; | ||||
|     let _ = admin.hset(&k_meta_db_keys(id), vec![(hash, v)])?; | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| // Delete access key by hash | ||||
| pub fn delete_access_key( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
|     id: u64, | ||||
|     key_hash: &str, | ||||
| ) -> Result<bool, DBError> { | ||||
|     let admin = open_admin_storage(base_dir, backend, admin_secret)?; | ||||
|     let n = admin.hdel(&k_meta_db_keys(id), vec![key_hash.to_string()])?; | ||||
|     Ok(n > 0) | ||||
| } | ||||
|  | ||||
| // List access keys, returning (hash, perms, created_at_secs) | ||||
| pub fn list_access_keys( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
|     id: u64, | ||||
| ) -> Result<Vec<(String, Permissions, u64)>, DBError> { | ||||
|     let admin = open_admin_storage(base_dir, backend, admin_secret)?; | ||||
|     let pairs = admin.hgetall(&k_meta_db_keys(id))?; | ||||
|     let mut out = Vec::new(); | ||||
|     for (hash, val) in pairs { | ||||
|         let (perm, ts) = parse_perm_value(&val); | ||||
|         out.push((hash, perm, ts)); | ||||
|     } | ||||
|     Ok(out) | ||||
| } | ||||
|  | ||||
| // Verify access permission for db id with optional key | ||||
| // Returns: | ||||
| // - Ok(Some(Permissions)) when access is allowed | ||||
| // - Ok(None) when not allowed or db missing (caller can distinguish by calling db_exists) | ||||
| pub fn verify_access( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
|     id: u64, | ||||
|     key_opt: Option<&str>, | ||||
| ) -> Result<Option<Permissions>, DBError> { | ||||
|     // Admin DB 0: require exact admin_secret | ||||
|     if id == 0 { | ||||
|         if let Some(k) = key_opt { | ||||
|             if k == admin_secret { | ||||
|                 return Ok(Some(Permissions::ReadWrite)); | ||||
|             } | ||||
|         } | ||||
|         return Ok(None); | ||||
|     } | ||||
|  | ||||
|     let admin = open_admin_storage(base_dir, backend, admin_secret)?; | ||||
|     if !admin.hexists(k_admin_dbs(), &id.to_string())? { | ||||
|         return Ok(None); | ||||
|     } | ||||
|  | ||||
|     // Public? | ||||
|     if load_public(&admin, id)? { | ||||
|         return Ok(Some(Permissions::ReadWrite)); | ||||
|     } | ||||
|  | ||||
|     // Private: require key and verify | ||||
|     if let Some(k) = key_opt { | ||||
|         let hash = crate::rpc::hash_key(k); | ||||
|         if let Some(v) = admin.hget(&k_meta_db_keys(id), &hash)? { | ||||
|             let (perm, _ts) = parse_perm_value(&v); | ||||
|             return Ok(Some(perm)); | ||||
|         } | ||||
|     } | ||||
|     Ok(None) | ||||
| } | ||||
|  | ||||
| // Enumerate all db ids | ||||
| pub fn list_dbs( | ||||
|     base_dir: &str, | ||||
|     backend: options::BackendType, | ||||
|     admin_secret: &str, | ||||
| ) -> Result<Vec<u64>, DBError> { | ||||
|     let admin = open_admin_storage(base_dir, backend, admin_secret)?; | ||||
|     let ids = admin.hkeys(k_admin_dbs())?; | ||||
|     let mut out = Vec::new(); | ||||
|     for s in ids { | ||||
|         if let Ok(v) = s.parse() { | ||||
|             out.push(v); | ||||
|         } | ||||
|     } | ||||
|     Ok(out) | ||||
| } | ||||
|  | ||||
| // Helper: parse permission value "Read:ts" or "ReadWrite:ts" | ||||
| fn parse_perm_value(v: &str) -> (Permissions, u64) { | ||||
|     let mut parts = v.split(':'); | ||||
|     let p = parts.next().unwrap_or("Read"); | ||||
|     let ts = parts | ||||
|         .next() | ||||
|         .and_then(|s| s.parse().ok()) | ||||
|         .unwrap_or(0u64); | ||||
|     let perm = match p { | ||||
|         "ReadWrite" => Permissions::ReadWrite, | ||||
|         _ => Permissions::Read, | ||||
|     }; | ||||
|     (perm, ts) | ||||
| } | ||||
|  | ||||
| fn now_secs() -> u64 { | ||||
|     use std::time::{SystemTime, UNIX_EPOCH}; | ||||
|     SystemTime::now() | ||||
|         .duration_since(UNIX_EPOCH) | ||||
|         .unwrap_or_default() | ||||
|         .as_secs() | ||||
| } | ||||
							
								
								
									
										79
									
								
								src/cmd.rs
									
									
									
									
									
								
							
							
						
						
									
										79
									
								
								src/cmd.rs
									
									
									
									
									
								
							| @@ -768,43 +768,64 @@ async fn flushdb_cmd(server: &mut Server) -> Result<Protocol, DBError> { | ||||
| } | ||||
|  | ||||
| async fn select_cmd(server: &mut Server, db: u64, key: Option<String>) -> Result<Protocol, DBError> { | ||||
|     // Load database metadata | ||||
|     let meta = match crate::rpc::RpcServerImpl::load_meta_static(&server.option.dir, db).await { | ||||
|         Ok(m) => m, | ||||
|         Err(_) => { | ||||
|             // If meta doesn't exist, create default | ||||
|             let default_meta = crate::rpc::DatabaseMeta { | ||||
|                 public: true, | ||||
|                 keys: std::collections::HashMap::new(), | ||||
|             }; | ||||
|             if let Err(_) = crate::rpc::RpcServerImpl::save_meta_static(&server.option.dir, db, &default_meta).await { | ||||
|                 return Ok(Protocol::err("ERR failed to initialize database metadata")); | ||||
|     // Authorization and existence checks via admin DB 0 | ||||
|     // DB 0: require KEY admin-secret | ||||
|     if db == 0 { | ||||
|         match key { | ||||
|             Some(k) if k == server.option.admin_secret => { | ||||
|                 server.selected_db = 0; | ||||
|                 server.current_permissions = Some(crate::rpc::Permissions::ReadWrite); | ||||
|                 // Will create encrypted 0.db if missing | ||||
|                 match server.current_storage() { | ||||
|                     Ok(_) => return Ok(Protocol::SimpleString("OK".to_string())), | ||||
|                     Err(e) => return Ok(Protocol::err(&e.0)), | ||||
|                 } | ||||
|             } | ||||
|             _ => { | ||||
|                 return Ok(Protocol::err("ERR invalid access key")); | ||||
|             } | ||||
|             default_meta | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     // DB > 0: must exist in admin:dbs | ||||
|     let exists = match crate::admin_meta::db_exists( | ||||
|         &server.option.dir, | ||||
|         server.option.backend.clone(), | ||||
|         &server.option.admin_secret, | ||||
|         db, | ||||
|     ) { | ||||
|         Ok(b) => b, | ||||
|         Err(e) => return Ok(Protocol::err(&e.0)), | ||||
|     }; | ||||
|  | ||||
|     // Check access permissions | ||||
|     let permissions = if meta.public { | ||||
|         // Public database - full access | ||||
|         Some(crate::rpc::Permissions::ReadWrite) | ||||
|     } else if let Some(key_str) = key { | ||||
|         // Private database - check key | ||||
|         let hash = crate::rpc::hash_key(&key_str); | ||||
|         if let Some(access_key) = meta.keys.get(&hash) { | ||||
|             Some(access_key.permissions.clone()) | ||||
|         } else { | ||||
|             return Ok(Protocol::err("ERR invalid access key")); | ||||
|         } | ||||
|     } else { | ||||
|         return Ok(Protocol::err("ERR access key required for private database")); | ||||
|     if !exists { | ||||
|         return Ok(Protocol::err(&format!( | ||||
|             "Cannot open database instance {}, as that database instance does not exist.", | ||||
|             db | ||||
|         ))); | ||||
|     } | ||||
|  | ||||
|     // Verify permissions (public => RW; private => use key) | ||||
|     let perms_opt = match crate::admin_meta::verify_access( | ||||
|         &server.option.dir, | ||||
|         server.option.backend.clone(), | ||||
|         &server.option.admin_secret, | ||||
|         db, | ||||
|         key.as_deref(), | ||||
|     ) { | ||||
|         Ok(p) => p, | ||||
|         Err(e) => return Ok(Protocol::err(&e.0)), | ||||
|     }; | ||||
|  | ||||
|     // Set selected database and permissions | ||||
|     let perms = match perms_opt { | ||||
|         Some(p) => p, | ||||
|         None => return Ok(Protocol::err("ERR invalid access key")), | ||||
|     }; | ||||
|  | ||||
|     // Set selected database and permissions, then open storage | ||||
|     server.selected_db = db; | ||||
|     server.current_permissions = permissions; | ||||
|     server.current_permissions = Some(perms); | ||||
|  | ||||
|     // Test if we can access the database (this will create it if needed) | ||||
|     match server.current_storage() { | ||||
|         Ok(_) => Ok(Protocol::SimpleString("OK".to_string())), | ||||
|         Err(e) => Ok(Protocol::err(&e.0)), | ||||
|   | ||||
| @@ -10,3 +10,4 @@ pub mod server; | ||||
| pub mod storage; | ||||
| pub mod storage_trait;  // Add this | ||||
| pub mod storage_sled;   // Add this | ||||
| pub mod admin_meta; | ||||
|   | ||||
							
								
								
									
										36
									
								
								src/main.rs
									
									
									
									
									
								
							
							
						
						
									
										36
									
								
								src/main.rs
									
									
									
									
									
								
							| @@ -23,12 +23,11 @@ struct Args { | ||||
|     #[arg(long)] | ||||
|     debug: bool, | ||||
|  | ||||
|  | ||||
|     /// Master encryption key for encrypted databases | ||||
|     /// Master encryption key for encrypted databases (deprecated; ignored for data DBs) | ||||
|     #[arg(long)] | ||||
|     encryption_key: Option<String>, | ||||
|  | ||||
|     /// Encrypt the database | ||||
|     /// Encrypt the database (deprecated; ignored for data DBs) | ||||
|     #[arg(long)] | ||||
|     encrypt: bool, | ||||
|  | ||||
| @@ -43,6 +42,10 @@ struct Args { | ||||
|     /// Use the sled backend | ||||
|     #[arg(long)] | ||||
|     sled: bool, | ||||
|  | ||||
|     /// Admin secret used to encrypt DB 0 and authorize admin access (required) | ||||
|     #[arg(long)] | ||||
|     admin_secret: String, | ||||
| } | ||||
|  | ||||
| #[tokio::main] | ||||
| @@ -57,6 +60,16 @@ async fn main() { | ||||
|         .await | ||||
|         .unwrap(); | ||||
|  | ||||
|     // deprecation warnings for legacy flags | ||||
|     if args.encrypt || args.encryption_key.is_some() { | ||||
|         eprintln!("warning: --encrypt and --encryption-key are deprecated and ignored for data DBs. Admin DB 0 is always encrypted with --admin-secret."); | ||||
|     } | ||||
|     // basic validation for admin secret | ||||
|     if args.admin_secret.trim().is_empty() { | ||||
|         eprintln!("error: --admin-secret must not be empty"); | ||||
|         std::process::exit(2); | ||||
|     } | ||||
|  | ||||
|     // new DB option | ||||
|     let option = herodb::options::DBOption { | ||||
|         dir: args.dir.clone(), | ||||
| @@ -69,18 +82,19 @@ async fn main() { | ||||
|         } else { | ||||
|             herodb::options::BackendType::Redb | ||||
|         }, | ||||
|         admin_secret: args.admin_secret.clone(), | ||||
|     }; | ||||
|  | ||||
|     let backend = option.backend.clone(); | ||||
|  | ||||
|     // Bootstrap admin DB 0 before opening any server storage | ||||
|     if let Err(e) = herodb::admin_meta::ensure_bootstrap(&args.dir, backend.clone(), &args.admin_secret) { | ||||
|         eprintln!("Failed to bootstrap admin DB 0: {}", e.0); | ||||
|         std::process::exit(2); | ||||
|     } | ||||
|  | ||||
|     // new server | ||||
|     let mut server = server::Server::new(option).await; | ||||
|  | ||||
|     // Initialize the default database storage (creates 0.db) | ||||
|     let _ = server.current_storage(); | ||||
|  | ||||
|     // Ensure default meta for DB 0 exists (public by default if missing) | ||||
|     let _ = herodb::rpc::RpcServerImpl::load_meta_static(&server.option.dir, 0).await; | ||||
|     let server = server::Server::new(option).await; | ||||
|  | ||||
|     // Add a small delay to ensure the port is ready | ||||
|     tokio::time::sleep(std::time::Duration::from_millis(100)).await; | ||||
| @@ -90,7 +104,7 @@ async fn main() { | ||||
|         let rpc_addr = format!("127.0.0.1:{}", args.rpc_port).parse().unwrap(); | ||||
|         let base_dir = args.dir.clone(); | ||||
|  | ||||
|         match rpc_server::start_rpc_server(rpc_addr, base_dir, backend).await { | ||||
|         match rpc_server::start_rpc_server(rpc_addr, base_dir, backend, args.admin_secret.clone()).await { | ||||
|             Ok(handle) => { | ||||
|                 println!("RPC management server started on port {}", args.rpc_port); | ||||
|                 Some(handle) | ||||
|   | ||||
| @@ -9,7 +9,11 @@ pub struct DBOption { | ||||
|     pub dir: String, | ||||
|     pub port: u16, | ||||
|     pub debug: bool, | ||||
|     // Deprecated for data DBs; retained for backward-compat on CLI parsing | ||||
|     pub encrypt: bool, | ||||
|     // Deprecated for data DBs; retained for backward-compat on CLI parsing | ||||
|     pub encryption_key: Option<String>, | ||||
|     pub backend: BackendType, | ||||
|     // New: required admin secret, used to encrypt DB 0 and authorize admin operations | ||||
|     pub admin_secret: String, | ||||
| } | ||||
|   | ||||
							
								
								
									
										195
									
								
								src/rpc.rs
									
									
									
									
									
								
							
							
						
						
									
										195
									
								
								src/rpc.rs
									
									
									
									
									
								
							| @@ -7,6 +7,7 @@ use sha2::{Digest, Sha256}; | ||||
|  | ||||
| use crate::server::Server; | ||||
| use crate::options::DBOption; | ||||
| use crate::admin_meta; | ||||
|  | ||||
| /// Database backend types | ||||
| #[derive(Debug, Clone, Serialize, Deserialize)] | ||||
| @@ -140,11 +141,13 @@ pub struct RpcServerImpl { | ||||
|     backend: crate::options::BackendType, | ||||
|     /// Encryption keys for databases | ||||
|     encryption_keys: Arc<RwLock<HashMap<u64, Option<String>>>>, | ||||
|     /// Admin secret used to encrypt DB 0 and authorize admin access | ||||
|     admin_secret: String, | ||||
| } | ||||
|  | ||||
| impl RpcServerImpl { | ||||
|     /// Create a new RPC server instance | ||||
|     pub fn new(base_dir: String, backend: crate::options::BackendType) -> Self { | ||||
|     pub fn new(base_dir: String, backend: crate::options::BackendType, admin_secret: String) -> Self { | ||||
|         Self { | ||||
|             base_dir, | ||||
|             servers: Arc::new(RwLock::new(HashMap::new())), | ||||
| @@ -152,6 +155,7 @@ impl RpcServerImpl { | ||||
|             next_encrypted_id: Arc::new(RwLock::new(10)), | ||||
|             backend, | ||||
|             encryption_keys: Arc::new(RwLock::new(HashMap::new())), | ||||
|             admin_secret, | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -165,9 +169,10 @@ impl RpcServerImpl { | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Check if database file exists | ||||
|         let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_id)); | ||||
|         if !db_path.exists() { | ||||
|         // Validate existence via admin DB 0 (metadata), not filesystem presence | ||||
|         let exists = admin_meta::db_exists(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id) | ||||
|             .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; | ||||
|         if !exists { | ||||
|             return Err(jsonrpsee::types::ErrorObjectOwned::owned( | ||||
|                 -32000, | ||||
|                 format!("Database {} not found", db_id), | ||||
| @@ -183,13 +188,17 @@ impl RpcServerImpl { | ||||
|             encryption_key: None, | ||||
|             encrypt: false, | ||||
|             backend: self.backend.clone(), | ||||
|             admin_secret: self.admin_secret.clone(), | ||||
|         }; | ||||
|  | ||||
|         let mut server = Server::new(db_option).await; | ||||
|  | ||||
|         // Set the selected database to the db_id for proper file naming | ||||
|         // Set the selected database to the db_id | ||||
|         server.selected_db = db_id; | ||||
|  | ||||
|         // Lazily open/create physical storage according to admin meta (per-db encryption) | ||||
|         let _ = server.current_storage(); | ||||
|  | ||||
|         // Store the server | ||||
|         let mut servers = self.servers.write().await; | ||||
|         servers.insert(db_id, Arc::new(server.clone())); | ||||
| @@ -197,27 +206,10 @@ impl RpcServerImpl { | ||||
|         Ok(Arc::new(server)) | ||||
|     } | ||||
|  | ||||
|     /// Discover existing database files in the base directory | ||||
|     /// Discover existing database IDs from admin DB 0 | ||||
|     async fn discover_databases(&self) -> Vec<u64> { | ||||
|         let mut db_ids = Vec::new(); | ||||
|  | ||||
|         if let Ok(entries) = std::fs::read_dir(&self.base_dir) { | ||||
|             for entry in entries.flatten() { | ||||
|                 if let Ok(file_name) = entry.file_name().into_string() { | ||||
|                     // Check if it's a database file (ends with .db) | ||||
|                     if file_name.ends_with(".db") { | ||||
|                         // Extract database ID from filename (e.g., "11.db" -> 11) | ||||
|                         if let Some(id_str) = file_name.strip_suffix(".db") { | ||||
|                             if let Ok(db_id) = id_str.parse::<u64>() { | ||||
|                                 db_ids.push(db_id); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         db_ids | ||||
|         admin_meta::list_dbs(&self.base_dir, self.backend.clone(), &self.admin_secret) | ||||
|             .unwrap_or_default() | ||||
|     } | ||||
|  | ||||
|     /// Get the next available database ID | ||||
| @@ -431,76 +423,52 @@ impl RpcServer for RpcServerImpl { | ||||
|     async fn create_database( | ||||
|         &self, | ||||
|         backend: BackendType, | ||||
|         config: DatabaseConfig, | ||||
|         _config: DatabaseConfig, | ||||
|         encryption_key: Option<String>, | ||||
|     ) -> RpcResult<u64> { | ||||
|         let db_id = self.get_next_db_id(encryption_key.is_some()).await; | ||||
|         // Allocate new ID via admin DB 0 | ||||
|         let db_id = admin_meta::allocate_next_id(&self.base_dir, self.backend.clone(), &self.admin_secret) | ||||
|             .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; | ||||
|  | ||||
|         // Handle both Redb and Sled backends | ||||
|         match backend { | ||||
|             BackendType::Redb | BackendType::Sled => { | ||||
|                 // Create database directory | ||||
|                 let db_dir = if let Some(path) = &config.storage_path { | ||||
|                     std::path::PathBuf::from(path) | ||||
|                 } else { | ||||
|                     std::path::PathBuf::from(&self.base_dir).join(format!("rpc_db_{}", db_id)) | ||||
|                 }; | ||||
|  | ||||
|                 // Ensure directory exists | ||||
|                 std::fs::create_dir_all(&db_dir) | ||||
|                     .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned( | ||||
|                         -32000, | ||||
|                         format!("Failed to create directory: {}", e), | ||||
|                         None::<()> | ||||
|                     ))?; | ||||
|  | ||||
|                 // Create DB options | ||||
|                 let encrypt = encryption_key.is_some(); | ||||
|                 let option = DBOption { | ||||
|                     dir: db_dir.to_string_lossy().to_string(), | ||||
|                     port: 0, // Not used for RPC-managed databases | ||||
|                     debug: false, | ||||
|                     encryption_key: encryption_key.clone(), | ||||
|                     encrypt, | ||||
|                     backend: match backend { | ||||
|                         BackendType::Redb => crate::options::BackendType::Redb, | ||||
|                         BackendType::Sled => crate::options::BackendType::Sled, | ||||
|                     }, | ||||
|                 }; | ||||
|  | ||||
|                 // Create server instance | ||||
|                 let mut server = Server::new(option).await; | ||||
|  | ||||
|                 // Set the selected database to the db_id for proper file naming | ||||
|                 server.selected_db = db_id; | ||||
|  | ||||
|                 // Initialize the storage to create the database file | ||||
|                 let _ = server.current_storage(); | ||||
|  | ||||
|                 // Store the encryption key | ||||
|                 { | ||||
|                     let mut keys = self.encryption_keys.write().await; | ||||
|                     keys.insert(db_id, encryption_key.clone()); | ||||
|                 } | ||||
|  | ||||
|                 // Initialize meta file | ||||
|                 let meta = DatabaseMeta { | ||||
|                     public: true, | ||||
|                     keys: HashMap::new(), | ||||
|                 }; | ||||
|                 self.save_meta(db_id, &meta).await?; | ||||
|  | ||||
|                 // Store the server | ||||
|                 let mut servers = self.servers.write().await; | ||||
|                 servers.insert(db_id, Arc::new(server)); | ||||
|  | ||||
|                 Ok(db_id) | ||||
|             } | ||||
|         // Persist per-db encryption key in admin DB 0 if provided | ||||
|         if let Some(ref key) = encryption_key { | ||||
|             admin_meta::set_enc_key(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, key) | ||||
|                 .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; | ||||
|         } | ||||
|  | ||||
|         // Ensure base dir exists | ||||
|         if let Err(e) = std::fs::create_dir_all(&self.base_dir) { | ||||
|             return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, format!("Failed to ensure base dir: {}", e), None::<()>)); | ||||
|         } | ||||
|  | ||||
|         // Create server instance using base_dir and admin secret | ||||
|         let option = DBOption { | ||||
|             dir: self.base_dir.clone(), | ||||
|             port: 0, // Not used for RPC-managed databases | ||||
|             debug: false, | ||||
|             encryption_key: None,  // per-db key is stored in admin DB 0 | ||||
|             encrypt: false,        // encryption decided per-db at open time | ||||
|             backend: match backend { | ||||
|                 BackendType::Redb => crate::options::BackendType::Redb, | ||||
|                 BackendType::Sled => crate::options::BackendType::Sled, | ||||
|             }, | ||||
|             admin_secret: self.admin_secret.clone(), | ||||
|         }; | ||||
|  | ||||
|         let mut server = Server::new(option).await; | ||||
|         server.selected_db = db_id; | ||||
|  | ||||
|         // Initialize storage to create physical <id>.db with proper encryption from admin meta | ||||
|         let _ = server.current_storage(); | ||||
|  | ||||
|         // Store the server in cache | ||||
|         let mut servers = self.servers.write().await; | ||||
|         servers.insert(db_id, Arc::new(server)); | ||||
|  | ||||
|         Ok(db_id) | ||||
|     } | ||||
|  | ||||
|     async fn set_encryption(&self, db_id: u64, _encryption_key: String) -> RpcResult<bool> { | ||||
|         // Note: In a real implementation, we'd need to modify the existing database | ||||
|         // For now, return false as encryption can only be set during creation | ||||
|         let _servers = self.servers.read().await; | ||||
|         // TODO: Implement encryption setting for existing databases | ||||
| @@ -564,8 +532,6 @@ impl RpcServer for RpcServerImpl { | ||||
|     } | ||||
|  | ||||
|     async fn add_access_key(&self, db_id: u64, key: String, permissions: String) -> RpcResult<bool> { | ||||
|         let mut meta = self.load_meta(db_id).await?; | ||||
|  | ||||
|         let perms = match permissions.to_lowercase().as_str() { | ||||
|             "read" => Permissions::Read, | ||||
|             "readwrite" => Permissions::ReadWrite, | ||||
| @@ -576,52 +542,31 @@ impl RpcServer for RpcServerImpl { | ||||
|             )), | ||||
|         }; | ||||
|  | ||||
|         let hash = hash_key(&key); | ||||
|         let access_key = AccessKey { | ||||
|             hash: hash.clone(), | ||||
|             permissions: perms, | ||||
|             created_at: std::time::SystemTime::now() | ||||
|                 .duration_since(std::time::UNIX_EPOCH) | ||||
|                 .unwrap() | ||||
|                 .as_secs(), | ||||
|         }; | ||||
|  | ||||
|         meta.keys.insert(hash, access_key); | ||||
|         self.save_meta(db_id, &meta).await?; | ||||
|         admin_meta::add_access_key(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, &key, perms) | ||||
|             .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; | ||||
|         Ok(true) | ||||
|     } | ||||
|  | ||||
|     async fn delete_access_key(&self, db_id: u64, key_hash: String) -> RpcResult<bool> { | ||||
|         let mut meta = self.load_meta(db_id).await?; | ||||
|  | ||||
|         if meta.keys.remove(&key_hash).is_some() { | ||||
|             // If no keys left, make database public | ||||
|             if meta.keys.is_empty() { | ||||
|                 meta.public = true; | ||||
|             } | ||||
|             self.save_meta(db_id, &meta).await?; | ||||
|             Ok(true) | ||||
|         } else { | ||||
|             Ok(false) | ||||
|         } | ||||
|         let ok = admin_meta::delete_access_key(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, &key_hash) | ||||
|             .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; | ||||
|         Ok(ok) | ||||
|     } | ||||
|  | ||||
|     async fn list_access_keys(&self, db_id: u64) -> RpcResult<Vec<AccessKeyInfo>> { | ||||
|         let meta = self.load_meta(db_id).await?; | ||||
|         let keys: Vec<AccessKeyInfo> = meta.keys.values() | ||||
|             .map(|k| AccessKeyInfo { | ||||
|                 hash: k.hash.clone(), | ||||
|                 permissions: k.permissions.clone(), | ||||
|                 created_at: k.created_at, | ||||
|             }) | ||||
|             .collect(); | ||||
|         let pairs = admin_meta::list_access_keys(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id) | ||||
|             .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; | ||||
|         let keys: Vec<AccessKeyInfo> = pairs.into_iter().map(|(hash, perm, ts)| AccessKeyInfo { | ||||
|             hash, | ||||
|             permissions: perm, | ||||
|             created_at: ts, | ||||
|         }).collect(); | ||||
|         Ok(keys) | ||||
|     } | ||||
|  | ||||
|     async fn set_database_public(&self, db_id: u64, public: bool) -> RpcResult<bool> { | ||||
|         let mut meta = self.load_meta(db_id).await?; | ||||
|         meta.public = public; | ||||
|         self.save_meta(db_id, &meta).await?; | ||||
|         admin_meta::set_database_public(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, public) | ||||
|             .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; | ||||
|         Ok(true) | ||||
|     } | ||||
| } | ||||
| @@ -5,9 +5,9 @@ use jsonrpsee::RpcModule; | ||||
| use crate::rpc::{RpcServer, RpcServerImpl}; | ||||
|  | ||||
| /// Start the RPC server on the specified address | ||||
| pub async fn start_rpc_server(addr: SocketAddr, base_dir: String, backend: crate::options::BackendType) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> { | ||||
| pub async fn start_rpc_server(addr: SocketAddr, base_dir: String, backend: crate::options::BackendType, admin_secret: String) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> { | ||||
|     // Create the RPC server implementation | ||||
|     let rpc_impl = RpcServerImpl::new(base_dir, backend); | ||||
|     let rpc_impl = RpcServerImpl::new(base_dir, backend, admin_secret); | ||||
|  | ||||
|     // Create the RPC module | ||||
|     let mut module = RpcModule::new(()); | ||||
| @@ -37,7 +37,7 @@ mod tests { | ||||
|         let base_dir = "/tmp/test_rpc".to_string(); | ||||
|         let backend = crate::options::BackendType::Redb; // Default for test | ||||
|  | ||||
|         let handle = start_rpc_server(addr, base_dir, backend).await.unwrap(); | ||||
|         let handle = start_rpc_server(addr, base_dir, backend, "test-admin".to_string()).await.unwrap(); | ||||
|  | ||||
|         // Give the server a moment to start | ||||
|         tokio::time::sleep(Duration::from_millis(100)).await; | ||||
|   | ||||
| @@ -11,9 +11,8 @@ use crate::cmd::Cmd; | ||||
| use crate::error::DBError; | ||||
| use crate::options; | ||||
| use crate::protocol::Protocol; | ||||
| use crate::storage::Storage; | ||||
| use crate::storage_sled::SledStorage; | ||||
| use crate::storage_trait::StorageBackend; | ||||
| use crate::admin_meta; | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub struct Server { | ||||
| @@ -58,50 +57,33 @@ impl Server { | ||||
|  | ||||
|     pub fn current_storage(&self) -> Result<Arc<dyn StorageBackend>, DBError> { | ||||
|         let mut cache = self.db_cache.write().unwrap(); | ||||
|          | ||||
|  | ||||
|         if let Some(storage) = cache.get(&self.selected_db) { | ||||
|             return Ok(storage.clone()); | ||||
|         } | ||||
|          | ||||
|          | ||||
|         // Create new database file | ||||
|         let db_file_path = std::path::PathBuf::from(self.option.dir.clone()) | ||||
|             .join(format!("{}.db", self.selected_db)); | ||||
|          | ||||
|         // Ensure the directory exists before creating the database file | ||||
|         if let Some(parent_dir) = db_file_path.parent() { | ||||
|             std::fs::create_dir_all(parent_dir).map_err(|e| { | ||||
|                 DBError(format!("Failed to create directory {}: {}", parent_dir.display(), e)) | ||||
|             })?; | ||||
|         } | ||||
|          | ||||
|         println!("Creating new db file: {}", db_file_path.display()); | ||||
|          | ||||
|         let storage: Arc<dyn StorageBackend> = match self.option.backend { | ||||
|             options::BackendType::Redb => { | ||||
|                 Arc::new(Storage::new( | ||||
|                     db_file_path, | ||||
|                     self.should_encrypt_db(self.selected_db), | ||||
|                     self.option.encryption_key.as_deref() | ||||
|                 )?) | ||||
|             } | ||||
|             options::BackendType::Sled => { | ||||
|                 Arc::new(SledStorage::new( | ||||
|                     db_file_path, | ||||
|                     self.should_encrypt_db(self.selected_db), | ||||
|                     self.option.encryption_key.as_deref() | ||||
|                 )?) | ||||
|             } | ||||
|  | ||||
|         // Use process-wide shared handles to avoid sled/reDB double-open lock contention. | ||||
|         let storage = if self.selected_db == 0 { | ||||
|             // Admin DB 0: always via singleton | ||||
|             admin_meta::open_admin_storage( | ||||
|                 &self.option.dir, | ||||
|                 self.option.backend.clone(), | ||||
|                 &self.option.admin_secret, | ||||
|             )? | ||||
|         } else { | ||||
|             // Data DBs: via global registry keyed by id | ||||
|             admin_meta::open_data_storage( | ||||
|                 &self.option.dir, | ||||
|                 self.option.backend.clone(), | ||||
|                 &self.option.admin_secret, | ||||
|                 self.selected_db, | ||||
|             )? | ||||
|         }; | ||||
|          | ||||
|  | ||||
|         cache.insert(self.selected_db, storage.clone()); | ||||
|         Ok(storage) | ||||
|     } | ||||
|      | ||||
|     fn should_encrypt_db(&self, db_index: u64) -> bool { | ||||
|         // DB 0-9 are non-encrypted, DB 10+ are encrypted | ||||
|         self.option.encrypt && db_index >= 10 | ||||
|     } | ||||
|  | ||||
|     /// Check if current permissions allow read operations | ||||
|     pub fn has_read_permission(&self) -> bool { | ||||
|   | ||||
| @@ -28,6 +28,7 @@ async fn debug_hset_simple() { | ||||
|         encrypt: false, | ||||
|         encryption_key: None, | ||||
|         backend: herodb::options::BackendType::Redb, | ||||
|         admin_secret: "test-admin".to_string(), | ||||
|     }; | ||||
|      | ||||
|     let mut server = Server::new(option).await; | ||||
| @@ -48,6 +49,12 @@ async fn debug_hset_simple() { | ||||
|     sleep(Duration::from_millis(200)).await; | ||||
|      | ||||
|     let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap(); | ||||
|     // Acquire ReadWrite permissions on this connection | ||||
|     let resp = send_command( | ||||
|         &mut stream, | ||||
|         "*4\r\n$6\r\nSELECT\r\n$1\r\n0\r\n$3\r\nKEY\r\n$10\r\ntest-admin\r\n", | ||||
|     ).await; | ||||
|     assert!(resp.contains("OK"), "Failed SELECT handshake: {}", resp); | ||||
|      | ||||
|     // Test simple HSET | ||||
|     println!("Testing HSET..."); | ||||
|   | ||||
| @@ -19,6 +19,7 @@ async fn debug_hset_return_value() { | ||||
|         encrypt: false, | ||||
|         encryption_key: None, | ||||
|         backend: herodb::options::BackendType::Redb, | ||||
|         admin_secret: "test-admin".to_string(), | ||||
|     }; | ||||
|      | ||||
|     let mut server = Server::new(option).await; | ||||
| @@ -40,12 +41,19 @@ async fn debug_hset_return_value() { | ||||
|      | ||||
|     // Connect and test HSET | ||||
|     let mut stream = TcpStream::connect("127.0.0.1:16390").await.unwrap(); | ||||
|  | ||||
|     // Acquire ReadWrite permissions for this new connection | ||||
|     let handshake = "*4\r\n$6\r\nSELECT\r\n$1\r\n0\r\n$3\r\nKEY\r\n$10\r\ntest-admin\r\n"; | ||||
|     stream.write_all(handshake.as_bytes()).await.unwrap(); | ||||
|     let mut buffer = [0; 1024]; | ||||
|     let n = stream.read(&mut buffer).await.unwrap(); | ||||
|     let resp = String::from_utf8_lossy(&buffer[..n]); | ||||
|     assert!(resp.contains("OK"), "Failed SELECT handshake: {}", resp); | ||||
|      | ||||
|     // Send HSET command | ||||
|     let cmd = "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n"; | ||||
|     stream.write_all(cmd.as_bytes()).await.unwrap(); | ||||
|      | ||||
|     let mut buffer = [0; 1024]; | ||||
|     let n = stream.read(&mut buffer).await.unwrap(); | ||||
|     let response = String::from_utf8_lossy(&buffer[..n]); | ||||
|      | ||||
|   | ||||
| @@ -12,7 +12,15 @@ fn get_redis_connection(port: u16) -> Connection { | ||||
|         match client.get_connection() { | ||||
|             Ok(mut conn) => { | ||||
|                 if redis::cmd("PING").query::<String>(&mut conn).is_ok() { | ||||
|                     return conn; | ||||
|                     // Acquire ReadWrite permissions on this connection | ||||
|                     let sel: RedisResult<String> = redis::cmd("SELECT") | ||||
|                         .arg(0) | ||||
|                         .arg("KEY") | ||||
|                         .arg("test-admin") | ||||
|                         .query(&mut conn); | ||||
|                     if sel.is_ok() { | ||||
|                         return conn; | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             Err(e) => { | ||||
| @@ -78,6 +86,8 @@ fn setup_server() -> (ServerProcessGuard, u16) { | ||||
|             "--port", | ||||
|             &port.to_string(), | ||||
|             "--debug", | ||||
|             "--admin-secret", | ||||
|             "test-admin", | ||||
|         ]) | ||||
|         .spawn() | ||||
|         .expect("Failed to start server process"); | ||||
|   | ||||
| @@ -23,18 +23,29 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { | ||||
|         encrypt: false, | ||||
|         encryption_key: None, | ||||
|         backend: herodb::options::BackendType::Redb, | ||||
|         admin_secret: "test-admin".to_string(), | ||||
|     }; | ||||
|      | ||||
|     let server = Server::new(option).await; | ||||
|     (server, port) | ||||
| } | ||||
|  | ||||
| // Helper function to connect to the test server | ||||
|  // Helper function to connect to the test server | ||||
| async fn connect_to_server(port: u16) -> TcpStream { | ||||
|     let mut attempts = 0; | ||||
|     loop { | ||||
|         match TcpStream::connect(format!("127.0.0.1:{}", port)).await { | ||||
|             Ok(stream) => return stream, | ||||
|             Ok(mut stream) => { | ||||
|                 // Obtain ReadWrite permissions for this connection by selecting DB 0 with admin key | ||||
|                 let resp = send_command( | ||||
|                     &mut stream, | ||||
|                     "*4\r\n$6\r\nSELECT\r\n$1\r\n0\r\n$3\r\nKEY\r\n$10\r\ntest-admin\r\n", | ||||
|                 ).await; | ||||
|                 if !resp.contains("OK") { | ||||
|                     panic!("Failed to acquire write permissions via SELECT 0 KEY test-admin: {}", resp); | ||||
|                 } | ||||
|                 return stream; | ||||
|             } | ||||
|             Err(_) if attempts < 10 => { | ||||
|                 attempts += 1; | ||||
|                 sleep(Duration::from_millis(100)).await; | ||||
|   | ||||
| @@ -25,6 +25,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { | ||||
|         encrypt: false, | ||||
|         encryption_key: None, | ||||
|         backend: herodb::options::BackendType::Redb, | ||||
|         admin_secret: "test-admin".to_string(), | ||||
|     }; | ||||
|      | ||||
|     let server = Server::new(option).await; | ||||
| @@ -34,9 +35,16 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { | ||||
| // Helper function to send Redis command and get response | ||||
| async fn send_redis_command(port: u16, command: &str) -> String { | ||||
|     let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap(); | ||||
|      | ||||
|     // Acquire ReadWrite permissions on this new connection | ||||
|     let handshake = "*4\r\n$6\r\nSELECT\r\n$1\r\n0\r\n$3\r\nKEY\r\n$10\r\ntest-admin\r\n"; | ||||
|     stream.write_all(handshake.as_bytes()).await.unwrap(); | ||||
|     let mut buffer = [0; 1024]; | ||||
|     let _ = stream.read(&mut buffer).await.unwrap(); // Read and ignore the OK for handshake | ||||
|      | ||||
|     // Now send the intended command | ||||
|     stream.write_all(command.as_bytes()).await.unwrap(); | ||||
|      | ||||
|     let mut buffer = [0; 1024]; | ||||
|     let n = stream.read(&mut buffer).await.unwrap(); | ||||
|     String::from_utf8_lossy(&buffer[..n]).to_string() | ||||
| } | ||||
| @@ -184,12 +192,19 @@ async fn test_transaction_operations() { | ||||
|      | ||||
|     sleep(Duration::from_millis(100)).await; | ||||
|      | ||||
|     // Use a single connection for the transaction | ||||
|      // Use a single connection for the transaction | ||||
|     let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap(); | ||||
|      | ||||
|     // Acquire write permissions for this connection | ||||
|     let handshake = "*4\r\n$6\r\nSELECT\r\n$1\r\n0\r\n$3\r\nKEY\r\n$10\r\ntest-admin\r\n"; | ||||
|     stream.write_all(handshake.as_bytes()).await.unwrap(); | ||||
|     let mut buffer = [0; 1024]; | ||||
|     let n = stream.read(&mut buffer).await.unwrap(); | ||||
|     let resp = String::from_utf8_lossy(&buffer[..n]); | ||||
|     assert!(resp.contains("OK")); | ||||
|      | ||||
|     // Test MULTI | ||||
|     stream.write_all("*1\r\n$5\r\nMULTI\r\n".as_bytes()).await.unwrap(); | ||||
|     let mut buffer = [0; 1024]; | ||||
|     let n = stream.read(&mut buffer).await.unwrap(); | ||||
|     let response = String::from_utf8_lossy(&buffer[..n]); | ||||
|     assert!(response.contains("OK")); | ||||
|   | ||||
| @@ -23,6 +23,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { | ||||
|         encrypt: false, | ||||
|         encryption_key: None, | ||||
|         backend: herodb::options::BackendType::Redb, | ||||
|         admin_secret: "test-admin".to_string(), | ||||
|     }; | ||||
|      | ||||
|     let server = Server::new(option).await; | ||||
| @@ -38,12 +39,22 @@ async fn send_command(stream: &mut TcpStream, command: &str) -> String { | ||||
|     String::from_utf8_lossy(&buffer[..n]).to_string() | ||||
| } | ||||
|  | ||||
| // Helper function to connect to the test server | ||||
|  // Helper function to connect to the test server | ||||
| async fn connect_to_server(port: u16) -> TcpStream { | ||||
|     let mut attempts = 0; | ||||
|     loop { | ||||
|         match TcpStream::connect(format!("127.0.0.1:{}", port)).await { | ||||
|             Ok(stream) => return stream, | ||||
|             Ok(mut stream) => { | ||||
|                 // Acquire ReadWrite permissions for this connection | ||||
|                 let resp = send_command( | ||||
|                     &mut stream, | ||||
|                     "*4\r\n$6\r\nSELECT\r\n$1\r\n0\r\n$3\r\nKEY\r\n$10\r\ntest-admin\r\n", | ||||
|                 ).await; | ||||
|                 if !resp.contains("OK") { | ||||
|                     panic!("Failed to acquire write permissions via SELECT 0 KEY test-admin: {}", resp); | ||||
|                 } | ||||
|                 return stream; | ||||
|             } | ||||
|             Err(_) if attempts < 10 => { | ||||
|                 attempts += 1; | ||||
|                 sleep(Duration::from_millis(100)).await; | ||||
| @@ -97,14 +108,21 @@ async fn test_hset_clean_db() { | ||||
|     sleep(Duration::from_millis(200)).await; | ||||
|      | ||||
|     let mut stream = connect_to_server(port).await; | ||||
|      | ||||
|     // Test HSET - should return 1 for new field | ||||
|     let response = send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n").await; | ||||
|  | ||||
|     // Ensure clean DB state (admin DB 0 may be shared due to global singleton) | ||||
|     let flush = send_command(&mut stream, "*1\r\n$7\r\nFLUSHDB\r\n").await; | ||||
|     assert!(flush.contains("OK"), "Failed to FLUSHDB: {}", flush); | ||||
|  | ||||
|     // Test HSET - should return 1 for new field (use a unique key name to avoid collisions) | ||||
|     let key = "hash_clean"; | ||||
|     let hset_cmd = format!("*4\r\n$4\r\nHSET\r\n${}\r\n{}\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n", key.len(), key); | ||||
|     let response = send_command(&mut stream, &hset_cmd).await; | ||||
|     println!("HSET response: {}", response); | ||||
|     assert!(response.contains("1"), "Expected HSET to return 1, got: {}", response); | ||||
|      | ||||
|     // Test HGET | ||||
|     let response = send_command(&mut stream, "*3\r\n$4\r\nHGET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await; | ||||
|     let hget_cmd = format!("*3\r\n$4\r\nHGET\r\n${}\r\n{}\r\n$6\r\nfield1\r\n", key.len(), key); | ||||
|     let response = send_command(&mut stream, &hget_cmd).await; | ||||
|     println!("HGET response: {}", response); | ||||
|     assert!(response.contains("value1")); | ||||
| } | ||||
|   | ||||
| @@ -23,6 +23,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { | ||||
|         encrypt: false, | ||||
|         encryption_key: None, | ||||
|         backend: herodb::options::BackendType::Redb, | ||||
|         admin_secret: "test-admin".to_string(), | ||||
|     }; | ||||
|  | ||||
|     let server = Server::new(option).await; | ||||
| @@ -61,7 +62,17 @@ async fn connect(port: u16) -> TcpStream { | ||||
|     let mut attempts = 0; | ||||
|     loop { | ||||
|         match TcpStream::connect(format!("127.0.0.1:{}", port)).await { | ||||
|             Ok(s) => return s, | ||||
|             Ok(mut s) => { | ||||
|                 // Acquire ReadWrite permissions for this connection using admin DB 0 | ||||
|                 let resp = send_cmd(&mut s, &["SELECT", "0", "KEY", "test-admin"]).await; | ||||
|                 assert_contains(&resp, "OK", "SELECT 0 KEY test-admin handshake"); | ||||
|  | ||||
|                 // Ensure clean slate per test on DB 0 | ||||
|                 let fl = send_cmd(&mut s, &["FLUSHDB"]).await; | ||||
|                 assert_contains(&fl, "OK", "FLUSHDB after handshake"); | ||||
|  | ||||
|                 return s; | ||||
|             } | ||||
|             Err(_) if attempts < 30 => { | ||||
|                 attempts += 1; | ||||
|                 sleep(Duration::from_millis(100)).await; | ||||
| @@ -246,9 +257,9 @@ async fn test_01_connection_and_info() { | ||||
|     let getname = send_cmd(&mut s, &["CLIENT", "GETNAME"]).await; | ||||
|     assert_contains(&getname, "myapp", "CLIENT GETNAME"); | ||||
|  | ||||
|     // SELECT db | ||||
|     let sel = send_cmd(&mut s, &["SELECT", "0"]).await; | ||||
|     assert_contains(&sel, "OK", "SELECT 0"); | ||||
|     // SELECT db (requires key on DB 0) | ||||
|     let sel = send_cmd(&mut s, &["SELECT", "0", "KEY", "test-admin"]).await; | ||||
|     assert_contains(&sel, "OK", "SELECT 0 with key"); | ||||
|  | ||||
|     // QUIT should close connection after sending OK | ||||
|     let quit = send_cmd(&mut s, &["QUIT"]).await; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user