combined curret main (with sled) and RPC server
This commit is contained in:
335
src/rpc.rs
Normal file
335
src/rpc.rs
Normal file
@@ -0,0 +1,335 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::server::Server;
|
||||
use crate::options::DBOption;
|
||||
|
||||
/// Database backend types
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum BackendType {
|
||||
Redb,
|
||||
Sled,
|
||||
// Future: InMemory, Custom(String)
|
||||
}
|
||||
|
||||
/// Database configuration
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DatabaseConfig {
|
||||
pub name: Option<String>,
|
||||
pub storage_path: Option<String>,
|
||||
pub max_size: Option<u64>,
|
||||
pub redis_version: Option<String>,
|
||||
}
|
||||
|
||||
/// Database information returned by metadata queries
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DatabaseInfo {
|
||||
pub id: u64,
|
||||
pub name: Option<String>,
|
||||
pub backend: BackendType,
|
||||
pub encrypted: bool,
|
||||
pub redis_version: Option<String>,
|
||||
pub storage_path: Option<String>,
|
||||
pub size_on_disk: Option<u64>,
|
||||
pub key_count: Option<u64>,
|
||||
pub created_at: u64,
|
||||
pub last_access: Option<u64>,
|
||||
}
|
||||
|
||||
/// RPC trait for HeroDB management
|
||||
#[rpc(server, client, namespace = "herodb")]
|
||||
pub trait Rpc {
|
||||
/// Create a new database with specified configuration
|
||||
#[method(name = "createDatabase")]
|
||||
async fn create_database(
|
||||
&self,
|
||||
backend: BackendType,
|
||||
config: DatabaseConfig,
|
||||
encryption_key: Option<String>,
|
||||
) -> RpcResult<u64>;
|
||||
|
||||
/// Set encryption for an existing database (write-only key)
|
||||
#[method(name = "setEncryption")]
|
||||
async fn set_encryption(&self, db_id: u64, encryption_key: String) -> RpcResult<bool>;
|
||||
|
||||
/// List all managed databases
|
||||
#[method(name = "listDatabases")]
|
||||
async fn list_databases(&self) -> RpcResult<Vec<DatabaseInfo>>;
|
||||
|
||||
/// Get detailed information about a specific database
|
||||
#[method(name = "getDatabaseInfo")]
|
||||
async fn get_database_info(&self, db_id: u64) -> RpcResult<DatabaseInfo>;
|
||||
|
||||
/// Delete a database
|
||||
#[method(name = "deleteDatabase")]
|
||||
async fn delete_database(&self, db_id: u64) -> RpcResult<bool>;
|
||||
|
||||
/// Get server statistics
|
||||
#[method(name = "getServerStats")]
|
||||
async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>>;
|
||||
}
|
||||
|
||||
/// RPC Server implementation
|
||||
pub struct RpcServerImpl {
|
||||
/// Base directory for database files
|
||||
base_dir: String,
|
||||
/// Managed database servers
|
||||
servers: Arc<RwLock<HashMap<u64, Arc<Server>>>>,
|
||||
/// Next database ID to assign
|
||||
next_db_id: Arc<RwLock<u64>>,
|
||||
/// Database options (backend, encryption, etc.)
|
||||
db_option: DBOption,
|
||||
}
|
||||
|
||||
impl RpcServerImpl {
|
||||
/// Create a new RPC server instance
|
||||
pub fn new(base_dir: String, db_option: DBOption) -> Self {
|
||||
Self {
|
||||
base_dir,
|
||||
servers: Arc::new(RwLock::new(HashMap::new())),
|
||||
next_db_id: Arc::new(RwLock::new(0)),
|
||||
db_option,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or create a server instance for the given database ID
|
||||
async fn get_or_create_server(&self, db_id: u64) -> Result<Arc<Server>, jsonrpsee::types::ErrorObjectOwned> {
|
||||
// Check if server already exists
|
||||
{
|
||||
let servers = self.servers.read().await;
|
||||
if let Some(server) = servers.get(&db_id) {
|
||||
return Ok(server.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Check if database file exists (either direct or in RPC subdirectory)
|
||||
let direct_db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_id));
|
||||
let rpc_db_path = std::path::PathBuf::from(&self.base_dir)
|
||||
.join(format!("rpc_db_{}", db_id))
|
||||
.join("0.db");
|
||||
|
||||
let (db_path, db_dir) = if direct_db_path.exists() {
|
||||
// Main server database
|
||||
(direct_db_path, self.base_dir.clone())
|
||||
} else if rpc_db_path.exists() {
|
||||
// RPC database
|
||||
(rpc_db_path, std::path::PathBuf::from(&self.base_dir).join(format!("rpc_db_{}", db_id)).to_string_lossy().to_string())
|
||||
} else {
|
||||
return Err(jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Database {} not found", db_id),
|
||||
None::<()>
|
||||
));
|
||||
};
|
||||
|
||||
// Create server instance
|
||||
let mut db_option = self.db_option.clone();
|
||||
db_option.dir = db_dir;
|
||||
|
||||
let server = Server::new(db_option).await;
|
||||
|
||||
// Store the server
|
||||
let mut servers = self.servers.write().await;
|
||||
servers.insert(db_id, Arc::new(server.clone()));
|
||||
|
||||
Ok(Arc::new(server))
|
||||
}
|
||||
|
||||
/// Discover existing database files in the base directory and RPC subdirectories
|
||||
async fn discover_databases(&self) -> Vec<u64> {
|
||||
let mut db_ids = Vec::new();
|
||||
|
||||
if let Ok(entries) = std::fs::read_dir(&self.base_dir) {
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
|
||||
// Check if it's a directory starting with "rpc_db_"
|
||||
if path.is_dir() {
|
||||
if let Some(dir_name) = path.file_name().and_then(|n| n.to_str()) {
|
||||
if dir_name.starts_with("rpc_db_") {
|
||||
// Extract database ID from directory name (e.g., "rpc_db_1" -> 1)
|
||||
if let Some(id_str) = dir_name.strip_prefix("rpc_db_") {
|
||||
if let Ok(db_id) = id_str.parse::<u64>() {
|
||||
db_ids.push(db_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Also check for direct .db files (for main server databases)
|
||||
else if let Some(file_name) = entry.file_name().to_str() {
|
||||
if file_name.ends_with(".db") {
|
||||
// Extract database ID from filename (e.g., "11.db" -> 11)
|
||||
if let Some(id_str) = file_name.strip_suffix(".db") {
|
||||
if let Ok(db_id) = id_str.parse::<u64>() {
|
||||
db_ids.push(db_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
db_ids
|
||||
}
|
||||
|
||||
/// Get the next available database ID
|
||||
async fn get_next_db_id(&self) -> u64 {
|
||||
let mut id = self.next_db_id.write().await;
|
||||
let current_id = *id;
|
||||
*id += 1;
|
||||
current_id
|
||||
}
|
||||
}
|
||||
|
||||
#[jsonrpsee::core::async_trait]
|
||||
impl RpcServer for RpcServerImpl {
|
||||
async fn create_database(
|
||||
&self,
|
||||
backend: BackendType,
|
||||
config: DatabaseConfig,
|
||||
encryption_key: Option<String>,
|
||||
) -> RpcResult<u64> {
|
||||
let db_id = self.get_next_db_id().await;
|
||||
|
||||
// Handle both Redb and Sled backends
|
||||
match backend {
|
||||
BackendType::Redb | BackendType::Sled => {
|
||||
// Always create RPC databases in subdirectories to avoid conflicts with main server
|
||||
let db_dir = if let Some(path) = &config.storage_path {
|
||||
std::path::PathBuf::from(path).join(format!("rpc_db_{}", db_id))
|
||||
} else {
|
||||
std::path::PathBuf::from(&self.base_dir).join(format!("rpc_db_{}", db_id))
|
||||
};
|
||||
|
||||
// Ensure directory exists
|
||||
std::fs::create_dir_all(&db_dir)
|
||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to create directory: {}", e),
|
||||
None::<()>
|
||||
))?;
|
||||
|
||||
// Create DB options
|
||||
let encrypt = encryption_key.is_some();
|
||||
let option = DBOption {
|
||||
dir: db_dir.to_string_lossy().to_string(),
|
||||
port: 0, // Not used for RPC-managed databases
|
||||
debug: false,
|
||||
encryption_key,
|
||||
encrypt,
|
||||
backend: match backend {
|
||||
BackendType::Redb => crate::options::BackendType::Redb,
|
||||
BackendType::Sled => crate::options::BackendType::Sled,
|
||||
},
|
||||
};
|
||||
|
||||
// Create server instance
|
||||
let server = Server::new(option).await;
|
||||
|
||||
// Store the server
|
||||
let mut servers = self.servers.write().await;
|
||||
servers.insert(db_id, Arc::new(server));
|
||||
|
||||
Ok(db_id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn set_encryption(&self, db_id: u64, _encryption_key: String) -> RpcResult<bool> {
|
||||
// Note: In a real implementation, we'd need to modify the existing database
|
||||
// For now, return false as encryption can only be set during creation
|
||||
let _servers = self.servers.read().await;
|
||||
// TODO: Implement encryption setting for existing databases
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn list_databases(&self) -> RpcResult<Vec<DatabaseInfo>> {
|
||||
let db_ids = self.discover_databases().await;
|
||||
let mut result = Vec::new();
|
||||
|
||||
for db_id in db_ids {
|
||||
// Try to get or create server for this database
|
||||
if let Ok(server) = self.get_or_create_server(db_id).await {
|
||||
let backend = match server.option.backend {
|
||||
crate::options::BackendType::Redb => BackendType::Redb,
|
||||
crate::options::BackendType::Sled => BackendType::Sled,
|
||||
};
|
||||
|
||||
let info = DatabaseInfo {
|
||||
id: db_id,
|
||||
name: None, // TODO: Store name in server metadata
|
||||
backend,
|
||||
encrypted: server.option.encrypt,
|
||||
redis_version: Some("7.0".to_string()), // Default Redis compatibility
|
||||
storage_path: Some(server.option.dir.clone()),
|
||||
size_on_disk: None, // TODO: Calculate actual size
|
||||
key_count: None, // TODO: Get key count from storage
|
||||
created_at: std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs(),
|
||||
last_access: None,
|
||||
};
|
||||
result.push(info);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn get_database_info(&self, db_id: u64) -> RpcResult<DatabaseInfo> {
|
||||
let server = self.get_or_create_server(db_id).await?;
|
||||
|
||||
let backend = match server.option.backend {
|
||||
crate::options::BackendType::Redb => BackendType::Redb,
|
||||
crate::options::BackendType::Sled => BackendType::Sled,
|
||||
};
|
||||
|
||||
Ok(DatabaseInfo {
|
||||
id: db_id,
|
||||
name: None,
|
||||
backend,
|
||||
encrypted: server.option.encrypt,
|
||||
redis_version: Some("7.0".to_string()),
|
||||
storage_path: Some(server.option.dir.clone()),
|
||||
size_on_disk: None,
|
||||
key_count: None,
|
||||
created_at: std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs(),
|
||||
last_access: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn delete_database(&self, db_id: u64) -> RpcResult<bool> {
|
||||
let mut servers = self.servers.write().await;
|
||||
|
||||
if let Some(server) = servers.remove(&db_id) {
|
||||
// TODO: Clean up database files
|
||||
let _ = server;
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>> {
|
||||
let db_ids = self.discover_databases().await;
|
||||
let mut stats = HashMap::new();
|
||||
|
||||
stats.insert("total_databases".to_string(), serde_json::json!(db_ids.len()));
|
||||
stats.insert("uptime".to_string(), serde_json::json!(
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
));
|
||||
|
||||
Ok(stats)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user