diff --git a/src/admin_meta.rs b/src/admin_meta.rs new file mode 100644 index 0000000..78ba22a --- /dev/null +++ b/src/admin_meta.rs @@ -0,0 +1,361 @@ +use std::path::PathBuf; +use std::sync::{Arc, OnceLock, Mutex, RwLock}; +use std::collections::HashMap; + +use crate::error::DBError; +use crate::options; +use crate::rpc::Permissions; +use crate::storage::Storage; +use crate::storage_sled::SledStorage; +use crate::storage_trait::StorageBackend; + +// Key builders +fn k_admin_next_id() -> &'static str { + "admin:next_id" +} +fn k_admin_dbs() -> &'static str { + "admin:dbs" +} +fn k_meta_db(id: u64) -> String { + format!("meta:db:{}", id) +} +fn k_meta_db_keys(id: u64) -> String { + format!("meta:db:{}:keys", id) +} +fn k_meta_db_enc(id: u64) -> String { + format!("meta:db:{}:enc", id) +} + +// Global cache of admin DB 0 handles per base_dir to avoid sled/reDB file-lock contention +// and to correctly isolate different test instances with distinct directories. +static ADMIN_STORAGES: OnceLock>>> = OnceLock::new(); + +// Global registry for data DB storages to avoid double-open across process. +static DATA_STORAGES: OnceLock>>> = OnceLock::new(); +static DATA_INIT_LOCK: Mutex<()> = Mutex::new(()); + +fn init_admin_storage( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, +) -> Result, DBError> { + let db_file = PathBuf::from(base_dir).join("0.db"); + if let Some(parent_dir) = db_file.parent() { + std::fs::create_dir_all(parent_dir).map_err(|e| { + DBError(format!("Failed to create directory {}: {}", parent_dir.display(), e)) + })?; + } + let storage: Arc = match backend { + options::BackendType::Redb => Arc::new(Storage::new(&db_file, true, Some(admin_secret))?), + options::BackendType::Sled => Arc::new(SledStorage::new(&db_file, true, Some(admin_secret))?), + }; + Ok(storage) +} + +// Get or initialize a cached handle to admin DB 0 per base_dir (thread-safe, no double-open race) +pub fn open_admin_storage( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, +) -> Result, DBError> { + let map = ADMIN_STORAGES.get_or_init(|| RwLock::new(HashMap::new())); + // Fast path + if let Some(st) = map.read().unwrap().get(base_dir) { + return Ok(st.clone()); + } + // Slow path with write lock + { + let mut w = map.write().unwrap(); + if let Some(st) = w.get(base_dir) { + return Ok(st.clone()); + } + let st = init_admin_storage(base_dir, backend, admin_secret)?; + w.insert(base_dir.to_string(), st.clone()); + return Ok(st); + } +} + +// Ensure admin structures exist in encrypted DB 0 +pub fn ensure_bootstrap( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, +) -> Result<(), DBError> { + let admin = open_admin_storage(base_dir, backend, admin_secret)?; + + // Initialize next id if missing + if !admin.exists(k_admin_next_id())? { + admin.set(k_admin_next_id().to_string(), "1".to_string())?; + } + // admin:dbs is a hash; it's fine if it doesn't exist (hlen -> 0) + Ok(()) +} + +// Get or initialize a shared handle to a data DB (> 0), avoiding double-open across subsystems +pub fn open_data_storage( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, + id: u64, +) -> Result, DBError> { + if id == 0 { + return open_admin_storage(base_dir, backend, admin_secret); + } + + // Validate existence in admin metadata + if !db_exists(base_dir, backend.clone(), admin_secret, id)? { + return Err(DBError(format!( + "Cannot open database instance {}, as that database instance does not exist.", + id + ))); + } + + let map = DATA_STORAGES.get_or_init(|| RwLock::new(HashMap::new())); + // Fast path + if let Some(st) = map.read().unwrap().get(&id) { + return Ok(st.clone()); + } + + // Slow path with init lock + let _guard = DATA_INIT_LOCK.lock().unwrap(); + if let Some(st) = map.read().unwrap().get(&id) { + return Ok(st.clone()); + } + + // Determine per-db encryption + let enc = get_enc_key(base_dir, backend.clone(), admin_secret, id)?; + let should_encrypt = enc.is_some(); + + // Build database file path and ensure parent dir exists + let db_file = PathBuf::from(base_dir).join(format!("{}.db", id)); + if let Some(parent_dir) = db_file.parent() { + std::fs::create_dir_all(parent_dir).map_err(|e| { + DBError(format!("Failed to create directory {}: {}", parent_dir.display(), e)) + })?; + } + + // Open storage + let storage: Arc = match backend { + options::BackendType::Redb => Arc::new(Storage::new(&db_file, should_encrypt, enc.as_deref())?), + options::BackendType::Sled => Arc::new(SledStorage::new(&db_file, should_encrypt, enc.as_deref())?), + }; + + // Publish to registry + map.write().unwrap().insert(id, storage.clone()); + Ok(storage) +} + +// Allocate the next DB id and persist new pointer +pub fn allocate_next_id( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, +) -> Result { + let admin = open_admin_storage(base_dir, backend, admin_secret)?; + let cur = admin + .get(k_admin_next_id())? + .unwrap_or_else(|| "1".to_string()); + let id: u64 = cur.parse().unwrap_or(1); + let next = id.checked_add(1).ok_or_else(|| DBError("next_id overflow".into()))?; + admin.set(k_admin_next_id().to_string(), next.to_string())?; + + // Register into admin:dbs set/hash + let _ = admin.hset(k_admin_dbs(), vec![(id.to_string(), "1".to_string())])?; + + // Default meta for the new db: public true + let meta_key = k_meta_db(id); + let _ = admin.hset(&meta_key, vec![("public".to_string(), "true".to_string())])?; + + Ok(id) +} + +// Check existence of a db id in admin:dbs +pub fn db_exists( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, + id: u64, +) -> Result { + let admin = open_admin_storage(base_dir, backend, admin_secret)?; + Ok(admin.hexists(k_admin_dbs(), &id.to_string())?) +} + +// Get per-db encryption key, if any +pub fn get_enc_key( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, + id: u64, +) -> Result, DBError> { + let admin = open_admin_storage(base_dir, backend, admin_secret)?; + admin.get(&k_meta_db_enc(id)) +} + +// Set per-db encryption key (called during create) +pub fn set_enc_key( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, + id: u64, + key: &str, +) -> Result<(), DBError> { + let admin = open_admin_storage(base_dir, backend, admin_secret)?; + admin.set(k_meta_db_enc(id), key.to_string()) +} + +// Set database public flag +pub fn set_database_public( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, + id: u64, + public: bool, +) -> Result<(), DBError> { + let admin = open_admin_storage(base_dir, backend, admin_secret)?; + let mk = k_meta_db(id); + let _ = admin.hset(&mk, vec![("public".to_string(), public.to_string())])?; + Ok(()) +} + +// Internal: load public flag; default to true when meta missing +fn load_public( + admin: &Arc, + id: u64, +) -> Result { + let mk = k_meta_db(id); + match admin.hget(&mk, "public")? { + Some(v) => Ok(v == "true"), + None => Ok(true), + } +} + +// Add access key for db (value format: "Read:ts" or "ReadWrite:ts") +pub fn add_access_key( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, + id: u64, + key_plain: &str, + perms: Permissions, +) -> Result<(), DBError> { + let admin = open_admin_storage(base_dir, backend, admin_secret)?; + let hash = crate::rpc::hash_key(key_plain); + let v = match perms { + Permissions::Read => format!("Read:{}", now_secs()), + Permissions::ReadWrite => format!("ReadWrite:{}", now_secs()), + }; + let _ = admin.hset(&k_meta_db_keys(id), vec![(hash, v)])?; + Ok(()) +} + +// Delete access key by hash +pub fn delete_access_key( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, + id: u64, + key_hash: &str, +) -> Result { + let admin = open_admin_storage(base_dir, backend, admin_secret)?; + let n = admin.hdel(&k_meta_db_keys(id), vec![key_hash.to_string()])?; + Ok(n > 0) +} + +// List access keys, returning (hash, perms, created_at_secs) +pub fn list_access_keys( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, + id: u64, +) -> Result, DBError> { + let admin = open_admin_storage(base_dir, backend, admin_secret)?; + let pairs = admin.hgetall(&k_meta_db_keys(id))?; + let mut out = Vec::new(); + for (hash, val) in pairs { + let (perm, ts) = parse_perm_value(&val); + out.push((hash, perm, ts)); + } + Ok(out) +} + +// Verify access permission for db id with optional key +// Returns: +// - Ok(Some(Permissions)) when access is allowed +// - Ok(None) when not allowed or db missing (caller can distinguish by calling db_exists) +pub fn verify_access( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, + id: u64, + key_opt: Option<&str>, +) -> Result, DBError> { + // Admin DB 0: require exact admin_secret + if id == 0 { + if let Some(k) = key_opt { + if k == admin_secret { + return Ok(Some(Permissions::ReadWrite)); + } + } + return Ok(None); + } + + let admin = open_admin_storage(base_dir, backend, admin_secret)?; + if !admin.hexists(k_admin_dbs(), &id.to_string())? { + return Ok(None); + } + + // Public? + if load_public(&admin, id)? { + return Ok(Some(Permissions::ReadWrite)); + } + + // Private: require key and verify + if let Some(k) = key_opt { + let hash = crate::rpc::hash_key(k); + if let Some(v) = admin.hget(&k_meta_db_keys(id), &hash)? { + let (perm, _ts) = parse_perm_value(&v); + return Ok(Some(perm)); + } + } + Ok(None) +} + +// Enumerate all db ids +pub fn list_dbs( + base_dir: &str, + backend: options::BackendType, + admin_secret: &str, +) -> Result, DBError> { + let admin = open_admin_storage(base_dir, backend, admin_secret)?; + let ids = admin.hkeys(k_admin_dbs())?; + let mut out = Vec::new(); + for s in ids { + if let Ok(v) = s.parse() { + out.push(v); + } + } + Ok(out) +} + +// Helper: parse permission value "Read:ts" or "ReadWrite:ts" +fn parse_perm_value(v: &str) -> (Permissions, u64) { + let mut parts = v.split(':'); + let p = parts.next().unwrap_or("Read"); + let ts = parts + .next() + .and_then(|s| s.parse().ok()) + .unwrap_or(0u64); + let perm = match p { + "ReadWrite" => Permissions::ReadWrite, + _ => Permissions::Read, + }; + (perm, ts) +} + +fn now_secs() -> u64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} \ No newline at end of file diff --git a/src/cmd.rs b/src/cmd.rs index e3bcdda..98cbacd 100644 --- a/src/cmd.rs +++ b/src/cmd.rs @@ -768,43 +768,64 @@ async fn flushdb_cmd(server: &mut Server) -> Result { } async fn select_cmd(server: &mut Server, db: u64, key: Option) -> Result { - // Load database metadata - let meta = match crate::rpc::RpcServerImpl::load_meta_static(&server.option.dir, db).await { - Ok(m) => m, - Err(_) => { - // If meta doesn't exist, create default - let default_meta = crate::rpc::DatabaseMeta { - public: true, - keys: std::collections::HashMap::new(), - }; - if let Err(_) = crate::rpc::RpcServerImpl::save_meta_static(&server.option.dir, db, &default_meta).await { - return Ok(Protocol::err("ERR failed to initialize database metadata")); + // Authorization and existence checks via admin DB 0 + // DB 0: require KEY admin-secret + if db == 0 { + match key { + Some(k) if k == server.option.admin_secret => { + server.selected_db = 0; + server.current_permissions = Some(crate::rpc::Permissions::ReadWrite); + // Will create encrypted 0.db if missing + match server.current_storage() { + Ok(_) => return Ok(Protocol::SimpleString("OK".to_string())), + Err(e) => return Ok(Protocol::err(&e.0)), + } + } + _ => { + return Ok(Protocol::err("ERR invalid access key")); } - default_meta } + } + + // DB > 0: must exist in admin:dbs + let exists = match crate::admin_meta::db_exists( + &server.option.dir, + server.option.backend.clone(), + &server.option.admin_secret, + db, + ) { + Ok(b) => b, + Err(e) => return Ok(Protocol::err(&e.0)), }; - // Check access permissions - let permissions = if meta.public { - // Public database - full access - Some(crate::rpc::Permissions::ReadWrite) - } else if let Some(key_str) = key { - // Private database - check key - let hash = crate::rpc::hash_key(&key_str); - if let Some(access_key) = meta.keys.get(&hash) { - Some(access_key.permissions.clone()) - } else { - return Ok(Protocol::err("ERR invalid access key")); - } - } else { - return Ok(Protocol::err("ERR access key required for private database")); + if !exists { + return Ok(Protocol::err(&format!( + "Cannot open database instance {}, as that database instance does not exist.", + db + ))); + } + + // Verify permissions (public => RW; private => use key) + let perms_opt = match crate::admin_meta::verify_access( + &server.option.dir, + server.option.backend.clone(), + &server.option.admin_secret, + db, + key.as_deref(), + ) { + Ok(p) => p, + Err(e) => return Ok(Protocol::err(&e.0)), }; - // Set selected database and permissions + let perms = match perms_opt { + Some(p) => p, + None => return Ok(Protocol::err("ERR invalid access key")), + }; + + // Set selected database and permissions, then open storage server.selected_db = db; - server.current_permissions = permissions; + server.current_permissions = Some(perms); - // Test if we can access the database (this will create it if needed) match server.current_storage() { Ok(_) => Ok(Protocol::SimpleString("OK".to_string())), Err(e) => Ok(Protocol::err(&e.0)), diff --git a/src/lib.rs b/src/lib.rs index 85dd4e4..66a2990 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,3 +10,4 @@ pub mod server; pub mod storage; pub mod storage_trait; // Add this pub mod storage_sled; // Add this +pub mod admin_meta; diff --git a/src/main.rs b/src/main.rs index f98876f..233e675 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,12 +23,11 @@ struct Args { #[arg(long)] debug: bool, - - /// Master encryption key for encrypted databases + /// Master encryption key for encrypted databases (deprecated; ignored for data DBs) #[arg(long)] encryption_key: Option, - /// Encrypt the database + /// Encrypt the database (deprecated; ignored for data DBs) #[arg(long)] encrypt: bool, @@ -43,6 +42,10 @@ struct Args { /// Use the sled backend #[arg(long)] sled: bool, + + /// Admin secret used to encrypt DB 0 and authorize admin access (required) + #[arg(long)] + admin_secret: String, } #[tokio::main] @@ -57,6 +60,16 @@ async fn main() { .await .unwrap(); + // deprecation warnings for legacy flags + if args.encrypt || args.encryption_key.is_some() { + eprintln!("warning: --encrypt and --encryption-key are deprecated and ignored for data DBs. Admin DB 0 is always encrypted with --admin-secret."); + } + // basic validation for admin secret + if args.admin_secret.trim().is_empty() { + eprintln!("error: --admin-secret must not be empty"); + std::process::exit(2); + } + // new DB option let option = herodb::options::DBOption { dir: args.dir.clone(), @@ -69,18 +82,19 @@ async fn main() { } else { herodb::options::BackendType::Redb }, + admin_secret: args.admin_secret.clone(), }; let backend = option.backend.clone(); + // Bootstrap admin DB 0 before opening any server storage + if let Err(e) = herodb::admin_meta::ensure_bootstrap(&args.dir, backend.clone(), &args.admin_secret) { + eprintln!("Failed to bootstrap admin DB 0: {}", e.0); + std::process::exit(2); + } + // new server - let mut server = server::Server::new(option).await; - - // Initialize the default database storage (creates 0.db) - let _ = server.current_storage(); - - // Ensure default meta for DB 0 exists (public by default if missing) - let _ = herodb::rpc::RpcServerImpl::load_meta_static(&server.option.dir, 0).await; + let server = server::Server::new(option).await; // Add a small delay to ensure the port is ready tokio::time::sleep(std::time::Duration::from_millis(100)).await; @@ -90,7 +104,7 @@ async fn main() { let rpc_addr = format!("127.0.0.1:{}", args.rpc_port).parse().unwrap(); let base_dir = args.dir.clone(); - match rpc_server::start_rpc_server(rpc_addr, base_dir, backend).await { + match rpc_server::start_rpc_server(rpc_addr, base_dir, backend, args.admin_secret.clone()).await { Ok(handle) => { println!("RPC management server started on port {}", args.rpc_port); Some(handle) diff --git a/src/options.rs b/src/options.rs index 067183d..2ee44c2 100644 --- a/src/options.rs +++ b/src/options.rs @@ -9,7 +9,11 @@ pub struct DBOption { pub dir: String, pub port: u16, pub debug: bool, + // Deprecated for data DBs; retained for backward-compat on CLI parsing pub encrypt: bool, + // Deprecated for data DBs; retained for backward-compat on CLI parsing pub encryption_key: Option, pub backend: BackendType, + // New: required admin secret, used to encrypt DB 0 and authorize admin operations + pub admin_secret: String, } diff --git a/src/rpc.rs b/src/rpc.rs index c64d2b4..a6d4400 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -7,6 +7,7 @@ use sha2::{Digest, Sha256}; use crate::server::Server; use crate::options::DBOption; +use crate::admin_meta; /// Database backend types #[derive(Debug, Clone, Serialize, Deserialize)] @@ -140,11 +141,13 @@ pub struct RpcServerImpl { backend: crate::options::BackendType, /// Encryption keys for databases encryption_keys: Arc>>>, + /// Admin secret used to encrypt DB 0 and authorize admin access + admin_secret: String, } impl RpcServerImpl { /// Create a new RPC server instance - pub fn new(base_dir: String, backend: crate::options::BackendType) -> Self { + pub fn new(base_dir: String, backend: crate::options::BackendType, admin_secret: String) -> Self { Self { base_dir, servers: Arc::new(RwLock::new(HashMap::new())), @@ -152,6 +155,7 @@ impl RpcServerImpl { next_encrypted_id: Arc::new(RwLock::new(10)), backend, encryption_keys: Arc::new(RwLock::new(HashMap::new())), + admin_secret, } } @@ -165,9 +169,10 @@ impl RpcServerImpl { } } - // Check if database file exists - let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_id)); - if !db_path.exists() { + // Validate existence via admin DB 0 (metadata), not filesystem presence + let exists = admin_meta::db_exists(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; + if !exists { return Err(jsonrpsee::types::ErrorObjectOwned::owned( -32000, format!("Database {} not found", db_id), @@ -183,13 +188,17 @@ impl RpcServerImpl { encryption_key: None, encrypt: false, backend: self.backend.clone(), + admin_secret: self.admin_secret.clone(), }; let mut server = Server::new(db_option).await; - // Set the selected database to the db_id for proper file naming + // Set the selected database to the db_id server.selected_db = db_id; + // Lazily open/create physical storage according to admin meta (per-db encryption) + let _ = server.current_storage(); + // Store the server let mut servers = self.servers.write().await; servers.insert(db_id, Arc::new(server.clone())); @@ -197,27 +206,10 @@ impl RpcServerImpl { Ok(Arc::new(server)) } - /// Discover existing database files in the base directory + /// Discover existing database IDs from admin DB 0 async fn discover_databases(&self) -> Vec { - let mut db_ids = Vec::new(); - - if let Ok(entries) = std::fs::read_dir(&self.base_dir) { - for entry in entries.flatten() { - if let Ok(file_name) = entry.file_name().into_string() { - // Check if it's a database file (ends with .db) - if file_name.ends_with(".db") { - // Extract database ID from filename (e.g., "11.db" -> 11) - if let Some(id_str) = file_name.strip_suffix(".db") { - if let Ok(db_id) = id_str.parse::() { - db_ids.push(db_id); - } - } - } - } - } - } - - db_ids + admin_meta::list_dbs(&self.base_dir, self.backend.clone(), &self.admin_secret) + .unwrap_or_default() } /// Get the next available database ID @@ -431,76 +423,52 @@ impl RpcServer for RpcServerImpl { async fn create_database( &self, backend: BackendType, - config: DatabaseConfig, + _config: DatabaseConfig, encryption_key: Option, ) -> RpcResult { - let db_id = self.get_next_db_id(encryption_key.is_some()).await; + // Allocate new ID via admin DB 0 + let db_id = admin_meta::allocate_next_id(&self.base_dir, self.backend.clone(), &self.admin_secret) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; - // Handle both Redb and Sled backends - match backend { - BackendType::Redb | BackendType::Sled => { - // Create database directory - let db_dir = if let Some(path) = &config.storage_path { - std::path::PathBuf::from(path) - } else { - std::path::PathBuf::from(&self.base_dir).join(format!("rpc_db_{}", db_id)) - }; - - // Ensure directory exists - std::fs::create_dir_all(&db_dir) - .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned( - -32000, - format!("Failed to create directory: {}", e), - None::<()> - ))?; - - // Create DB options - let encrypt = encryption_key.is_some(); - let option = DBOption { - dir: db_dir.to_string_lossy().to_string(), - port: 0, // Not used for RPC-managed databases - debug: false, - encryption_key: encryption_key.clone(), - encrypt, - backend: match backend { - BackendType::Redb => crate::options::BackendType::Redb, - BackendType::Sled => crate::options::BackendType::Sled, - }, - }; - - // Create server instance - let mut server = Server::new(option).await; - - // Set the selected database to the db_id for proper file naming - server.selected_db = db_id; - - // Initialize the storage to create the database file - let _ = server.current_storage(); - - // Store the encryption key - { - let mut keys = self.encryption_keys.write().await; - keys.insert(db_id, encryption_key.clone()); - } - - // Initialize meta file - let meta = DatabaseMeta { - public: true, - keys: HashMap::new(), - }; - self.save_meta(db_id, &meta).await?; - - // Store the server - let mut servers = self.servers.write().await; - servers.insert(db_id, Arc::new(server)); - - Ok(db_id) - } + // Persist per-db encryption key in admin DB 0 if provided + if let Some(ref key) = encryption_key { + admin_meta::set_enc_key(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, key) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; } + + // Ensure base dir exists + if let Err(e) = std::fs::create_dir_all(&self.base_dir) { + return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, format!("Failed to ensure base dir: {}", e), None::<()>)); + } + + // Create server instance using base_dir and admin secret + let option = DBOption { + dir: self.base_dir.clone(), + port: 0, // Not used for RPC-managed databases + debug: false, + encryption_key: None, // per-db key is stored in admin DB 0 + encrypt: false, // encryption decided per-db at open time + backend: match backend { + BackendType::Redb => crate::options::BackendType::Redb, + BackendType::Sled => crate::options::BackendType::Sled, + }, + admin_secret: self.admin_secret.clone(), + }; + + let mut server = Server::new(option).await; + server.selected_db = db_id; + + // Initialize storage to create physical .db with proper encryption from admin meta + let _ = server.current_storage(); + + // Store the server in cache + let mut servers = self.servers.write().await; + servers.insert(db_id, Arc::new(server)); + + Ok(db_id) } async fn set_encryption(&self, db_id: u64, _encryption_key: String) -> RpcResult { - // Note: In a real implementation, we'd need to modify the existing database // For now, return false as encryption can only be set during creation let _servers = self.servers.read().await; // TODO: Implement encryption setting for existing databases @@ -564,8 +532,6 @@ impl RpcServer for RpcServerImpl { } async fn add_access_key(&self, db_id: u64, key: String, permissions: String) -> RpcResult { - let mut meta = self.load_meta(db_id).await?; - let perms = match permissions.to_lowercase().as_str() { "read" => Permissions::Read, "readwrite" => Permissions::ReadWrite, @@ -576,52 +542,31 @@ impl RpcServer for RpcServerImpl { )), }; - let hash = hash_key(&key); - let access_key = AccessKey { - hash: hash.clone(), - permissions: perms, - created_at: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(), - }; - - meta.keys.insert(hash, access_key); - self.save_meta(db_id, &meta).await?; + admin_meta::add_access_key(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, &key, perms) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; Ok(true) } async fn delete_access_key(&self, db_id: u64, key_hash: String) -> RpcResult { - let mut meta = self.load_meta(db_id).await?; - - if meta.keys.remove(&key_hash).is_some() { - // If no keys left, make database public - if meta.keys.is_empty() { - meta.public = true; - } - self.save_meta(db_id, &meta).await?; - Ok(true) - } else { - Ok(false) - } + let ok = admin_meta::delete_access_key(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, &key_hash) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; + Ok(ok) } async fn list_access_keys(&self, db_id: u64) -> RpcResult> { - let meta = self.load_meta(db_id).await?; - let keys: Vec = meta.keys.values() - .map(|k| AccessKeyInfo { - hash: k.hash.clone(), - permissions: k.permissions.clone(), - created_at: k.created_at, - }) - .collect(); + let pairs = admin_meta::list_access_keys(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; + let keys: Vec = pairs.into_iter().map(|(hash, perm, ts)| AccessKeyInfo { + hash, + permissions: perm, + created_at: ts, + }).collect(); Ok(keys) } async fn set_database_public(&self, db_id: u64, public: bool) -> RpcResult { - let mut meta = self.load_meta(db_id).await?; - meta.public = public; - self.save_meta(db_id, &meta).await?; + admin_meta::set_database_public(&self.base_dir, self.backend.clone(), &self.admin_secret, db_id, public) + .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?; Ok(true) } } \ No newline at end of file diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 88ab432..eaabc5b 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -5,9 +5,9 @@ use jsonrpsee::RpcModule; use crate::rpc::{RpcServer, RpcServerImpl}; /// Start the RPC server on the specified address -pub async fn start_rpc_server(addr: SocketAddr, base_dir: String, backend: crate::options::BackendType) -> Result> { +pub async fn start_rpc_server(addr: SocketAddr, base_dir: String, backend: crate::options::BackendType, admin_secret: String) -> Result> { // Create the RPC server implementation - let rpc_impl = RpcServerImpl::new(base_dir, backend); + let rpc_impl = RpcServerImpl::new(base_dir, backend, admin_secret); // Create the RPC module let mut module = RpcModule::new(()); @@ -37,7 +37,7 @@ mod tests { let base_dir = "/tmp/test_rpc".to_string(); let backend = crate::options::BackendType::Redb; // Default for test - let handle = start_rpc_server(addr, base_dir, backend).await.unwrap(); + let handle = start_rpc_server(addr, base_dir, backend, "test-admin".to_string()).await.unwrap(); // Give the server a moment to start tokio::time::sleep(Duration::from_millis(100)).await; diff --git a/src/server.rs b/src/server.rs index 63864c6..f02f065 100644 --- a/src/server.rs +++ b/src/server.rs @@ -11,9 +11,8 @@ use crate::cmd::Cmd; use crate::error::DBError; use crate::options; use crate::protocol::Protocol; -use crate::storage::Storage; -use crate::storage_sled::SledStorage; use crate::storage_trait::StorageBackend; +use crate::admin_meta; #[derive(Clone)] pub struct Server { @@ -58,50 +57,33 @@ impl Server { pub fn current_storage(&self) -> Result, DBError> { let mut cache = self.db_cache.write().unwrap(); - + if let Some(storage) = cache.get(&self.selected_db) { return Ok(storage.clone()); } - - - // Create new database file - let db_file_path = std::path::PathBuf::from(self.option.dir.clone()) - .join(format!("{}.db", self.selected_db)); - - // Ensure the directory exists before creating the database file - if let Some(parent_dir) = db_file_path.parent() { - std::fs::create_dir_all(parent_dir).map_err(|e| { - DBError(format!("Failed to create directory {}: {}", parent_dir.display(), e)) - })?; - } - - println!("Creating new db file: {}", db_file_path.display()); - - let storage: Arc = match self.option.backend { - options::BackendType::Redb => { - Arc::new(Storage::new( - db_file_path, - self.should_encrypt_db(self.selected_db), - self.option.encryption_key.as_deref() - )?) - } - options::BackendType::Sled => { - Arc::new(SledStorage::new( - db_file_path, - self.should_encrypt_db(self.selected_db), - self.option.encryption_key.as_deref() - )?) - } + + // Use process-wide shared handles to avoid sled/reDB double-open lock contention. + let storage = if self.selected_db == 0 { + // Admin DB 0: always via singleton + admin_meta::open_admin_storage( + &self.option.dir, + self.option.backend.clone(), + &self.option.admin_secret, + )? + } else { + // Data DBs: via global registry keyed by id + admin_meta::open_data_storage( + &self.option.dir, + self.option.backend.clone(), + &self.option.admin_secret, + self.selected_db, + )? }; - + cache.insert(self.selected_db, storage.clone()); Ok(storage) } - fn should_encrypt_db(&self, db_index: u64) -> bool { - // DB 0-9 are non-encrypted, DB 10+ are encrypted - self.option.encrypt && db_index >= 10 - } /// Check if current permissions allow read operations pub fn has_read_permission(&self) -> bool { diff --git a/tests/debug_hset.rs b/tests/debug_hset.rs index 7930be8..44cd39b 100644 --- a/tests/debug_hset.rs +++ b/tests/debug_hset.rs @@ -28,6 +28,7 @@ async fn debug_hset_simple() { encrypt: false, encryption_key: None, backend: herodb::options::BackendType::Redb, + admin_secret: "test-admin".to_string(), }; let mut server = Server::new(option).await; @@ -48,6 +49,12 @@ async fn debug_hset_simple() { sleep(Duration::from_millis(200)).await; let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap(); + // Acquire ReadWrite permissions on this connection + let resp = send_command( + &mut stream, + "*4\r\n$6\r\nSELECT\r\n$1\r\n0\r\n$3\r\nKEY\r\n$10\r\ntest-admin\r\n", + ).await; + assert!(resp.contains("OK"), "Failed SELECT handshake: {}", resp); // Test simple HSET println!("Testing HSET..."); diff --git a/tests/debug_hset_simple.rs b/tests/debug_hset_simple.rs index 356e704..fe99f0f 100644 --- a/tests/debug_hset_simple.rs +++ b/tests/debug_hset_simple.rs @@ -19,6 +19,7 @@ async fn debug_hset_return_value() { encrypt: false, encryption_key: None, backend: herodb::options::BackendType::Redb, + admin_secret: "test-admin".to_string(), }; let mut server = Server::new(option).await; @@ -40,12 +41,19 @@ async fn debug_hset_return_value() { // Connect and test HSET let mut stream = TcpStream::connect("127.0.0.1:16390").await.unwrap(); + + // Acquire ReadWrite permissions for this new connection + let handshake = "*4\r\n$6\r\nSELECT\r\n$1\r\n0\r\n$3\r\nKEY\r\n$10\r\ntest-admin\r\n"; + stream.write_all(handshake.as_bytes()).await.unwrap(); + let mut buffer = [0; 1024]; + let n = stream.read(&mut buffer).await.unwrap(); + let resp = String::from_utf8_lossy(&buffer[..n]); + assert!(resp.contains("OK"), "Failed SELECT handshake: {}", resp); // Send HSET command let cmd = "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n"; stream.write_all(cmd.as_bytes()).await.unwrap(); - let mut buffer = [0; 1024]; let n = stream.read(&mut buffer).await.unwrap(); let response = String::from_utf8_lossy(&buffer[..n]); diff --git a/tests/redis_integration_tests.rs b/tests/redis_integration_tests.rs index 47033e1..a017780 100644 --- a/tests/redis_integration_tests.rs +++ b/tests/redis_integration_tests.rs @@ -12,7 +12,15 @@ fn get_redis_connection(port: u16) -> Connection { match client.get_connection() { Ok(mut conn) => { if redis::cmd("PING").query::(&mut conn).is_ok() { - return conn; + // Acquire ReadWrite permissions on this connection + let sel: RedisResult = redis::cmd("SELECT") + .arg(0) + .arg("KEY") + .arg("test-admin") + .query(&mut conn); + if sel.is_ok() { + return conn; + } } } Err(e) => { @@ -78,6 +86,8 @@ fn setup_server() -> (ServerProcessGuard, u16) { "--port", &port.to_string(), "--debug", + "--admin-secret", + "test-admin", ]) .spawn() .expect("Failed to start server process"); diff --git a/tests/redis_tests.rs b/tests/redis_tests.rs index f6e8a13..724704c 100644 --- a/tests/redis_tests.rs +++ b/tests/redis_tests.rs @@ -23,18 +23,29 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { encrypt: false, encryption_key: None, backend: herodb::options::BackendType::Redb, + admin_secret: "test-admin".to_string(), }; let server = Server::new(option).await; (server, port) } -// Helper function to connect to the test server + // Helper function to connect to the test server async fn connect_to_server(port: u16) -> TcpStream { let mut attempts = 0; loop { match TcpStream::connect(format!("127.0.0.1:{}", port)).await { - Ok(stream) => return stream, + Ok(mut stream) => { + // Obtain ReadWrite permissions for this connection by selecting DB 0 with admin key + let resp = send_command( + &mut stream, + "*4\r\n$6\r\nSELECT\r\n$1\r\n0\r\n$3\r\nKEY\r\n$10\r\ntest-admin\r\n", + ).await; + if !resp.contains("OK") { + panic!("Failed to acquire write permissions via SELECT 0 KEY test-admin: {}", resp); + } + return stream; + } Err(_) if attempts < 10 => { attempts += 1; sleep(Duration::from_millis(100)).await; diff --git a/tests/simple_integration_test.rs b/tests/simple_integration_test.rs index 42269df..706c9cb 100644 --- a/tests/simple_integration_test.rs +++ b/tests/simple_integration_test.rs @@ -25,6 +25,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { encrypt: false, encryption_key: None, backend: herodb::options::BackendType::Redb, + admin_secret: "test-admin".to_string(), }; let server = Server::new(option).await; @@ -34,9 +35,16 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { // Helper function to send Redis command and get response async fn send_redis_command(port: u16, command: &str) -> String { let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap(); + + // Acquire ReadWrite permissions on this new connection + let handshake = "*4\r\n$6\r\nSELECT\r\n$1\r\n0\r\n$3\r\nKEY\r\n$10\r\ntest-admin\r\n"; + stream.write_all(handshake.as_bytes()).await.unwrap(); + let mut buffer = [0; 1024]; + let _ = stream.read(&mut buffer).await.unwrap(); // Read and ignore the OK for handshake + + // Now send the intended command stream.write_all(command.as_bytes()).await.unwrap(); - let mut buffer = [0; 1024]; let n = stream.read(&mut buffer).await.unwrap(); String::from_utf8_lossy(&buffer[..n]).to_string() } @@ -184,12 +192,19 @@ async fn test_transaction_operations() { sleep(Duration::from_millis(100)).await; - // Use a single connection for the transaction + // Use a single connection for the transaction let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap(); + // Acquire write permissions for this connection + let handshake = "*4\r\n$6\r\nSELECT\r\n$1\r\n0\r\n$3\r\nKEY\r\n$10\r\ntest-admin\r\n"; + stream.write_all(handshake.as_bytes()).await.unwrap(); + let mut buffer = [0; 1024]; + let n = stream.read(&mut buffer).await.unwrap(); + let resp = String::from_utf8_lossy(&buffer[..n]); + assert!(resp.contains("OK")); + // Test MULTI stream.write_all("*1\r\n$5\r\nMULTI\r\n".as_bytes()).await.unwrap(); - let mut buffer = [0; 1024]; let n = stream.read(&mut buffer).await.unwrap(); let response = String::from_utf8_lossy(&buffer[..n]); assert!(response.contains("OK")); diff --git a/tests/simple_redis_test.rs b/tests/simple_redis_test.rs index 8afb304..cd8c0a7 100644 --- a/tests/simple_redis_test.rs +++ b/tests/simple_redis_test.rs @@ -23,6 +23,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { encrypt: false, encryption_key: None, backend: herodb::options::BackendType::Redb, + admin_secret: "test-admin".to_string(), }; let server = Server::new(option).await; @@ -38,12 +39,22 @@ async fn send_command(stream: &mut TcpStream, command: &str) -> String { String::from_utf8_lossy(&buffer[..n]).to_string() } -// Helper function to connect to the test server + // Helper function to connect to the test server async fn connect_to_server(port: u16) -> TcpStream { let mut attempts = 0; loop { match TcpStream::connect(format!("127.0.0.1:{}", port)).await { - Ok(stream) => return stream, + Ok(mut stream) => { + // Acquire ReadWrite permissions for this connection + let resp = send_command( + &mut stream, + "*4\r\n$6\r\nSELECT\r\n$1\r\n0\r\n$3\r\nKEY\r\n$10\r\ntest-admin\r\n", + ).await; + if !resp.contains("OK") { + panic!("Failed to acquire write permissions via SELECT 0 KEY test-admin: {}", resp); + } + return stream; + } Err(_) if attempts < 10 => { attempts += 1; sleep(Duration::from_millis(100)).await; @@ -97,14 +108,21 @@ async fn test_hset_clean_db() { sleep(Duration::from_millis(200)).await; let mut stream = connect_to_server(port).await; - - // Test HSET - should return 1 for new field - let response = send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n").await; + + // Ensure clean DB state (admin DB 0 may be shared due to global singleton) + let flush = send_command(&mut stream, "*1\r\n$7\r\nFLUSHDB\r\n").await; + assert!(flush.contains("OK"), "Failed to FLUSHDB: {}", flush); + + // Test HSET - should return 1 for new field (use a unique key name to avoid collisions) + let key = "hash_clean"; + let hset_cmd = format!("*4\r\n$4\r\nHSET\r\n${}\r\n{}\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n", key.len(), key); + let response = send_command(&mut stream, &hset_cmd).await; println!("HSET response: {}", response); assert!(response.contains("1"), "Expected HSET to return 1, got: {}", response); // Test HGET - let response = send_command(&mut stream, "*3\r\n$4\r\nHGET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await; + let hget_cmd = format!("*3\r\n$4\r\nHGET\r\n${}\r\n{}\r\n$6\r\nfield1\r\n", key.len(), key); + let response = send_command(&mut stream, &hget_cmd).await; println!("HGET response: {}", response); assert!(response.contains("value1")); } diff --git a/tests/usage_suite.rs b/tests/usage_suite.rs index 9a1af17..0203c28 100644 --- a/tests/usage_suite.rs +++ b/tests/usage_suite.rs @@ -23,6 +23,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { encrypt: false, encryption_key: None, backend: herodb::options::BackendType::Redb, + admin_secret: "test-admin".to_string(), }; let server = Server::new(option).await; @@ -61,7 +62,17 @@ async fn connect(port: u16) -> TcpStream { let mut attempts = 0; loop { match TcpStream::connect(format!("127.0.0.1:{}", port)).await { - Ok(s) => return s, + Ok(mut s) => { + // Acquire ReadWrite permissions for this connection using admin DB 0 + let resp = send_cmd(&mut s, &["SELECT", "0", "KEY", "test-admin"]).await; + assert_contains(&resp, "OK", "SELECT 0 KEY test-admin handshake"); + + // Ensure clean slate per test on DB 0 + let fl = send_cmd(&mut s, &["FLUSHDB"]).await; + assert_contains(&fl, "OK", "FLUSHDB after handshake"); + + return s; + } Err(_) if attempts < 30 => { attempts += 1; sleep(Duration::from_millis(100)).await; @@ -246,9 +257,9 @@ async fn test_01_connection_and_info() { let getname = send_cmd(&mut s, &["CLIENT", "GETNAME"]).await; assert_contains(&getname, "myapp", "CLIENT GETNAME"); - // SELECT db - let sel = send_cmd(&mut s, &["SELECT", "0"]).await; - assert_contains(&sel, "OK", "SELECT 0"); + // SELECT db (requires key on DB 0) + let sel = send_cmd(&mut s, &["SELECT", "0", "KEY", "test-admin"]).await; + assert_contains(&sel, "OK", "SELECT 0 with key"); // QUIT should close connection after sending OK let quit = send_cmd(&mut s, &["QUIT"]).await;