This commit is contained in:
2025-08-16 09:29:18 +02:00
parent 5502ff4bc5
commit 0000d82799
13 changed files with 719 additions and 96 deletions

View File

@@ -4,7 +4,7 @@ use crate::{error::DBError, protocol::Protocol, server::Server};
pub enum Cmd {
Ping,
Echo(String),
Select(u16),
Select(u64), // Changed from u16 to u64
Get(String),
Set(String, String),
SetPx(String, String, u128),
@@ -47,6 +47,7 @@ pub enum Cmd {
LTrim(String, i64, i64),
LIndex(String, i64),
LRange(String, i64, i64),
FlushDb,
Unknow(String),
}
@@ -65,7 +66,7 @@ impl Cmd {
if cmd.len() != 2 {
return Err(DBError("wrong number of arguments for SELECT".to_string()));
}
let idx = cmd[1].parse::<u16>().map_err(|_| DBError("ERR DB index is not an integer".to_string()))?;
let idx = cmd[1].parse::<u64>().map_err(|_| DBError("ERR DB index is not an integer".to_string()))?;
Cmd::Select(idx)
}
"echo" => Cmd::Echo(cmd[1].clone()),
@@ -394,6 +395,12 @@ impl Cmd {
let stop = cmd[3].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::LRange(cmd[1].clone(), start, stop)
}
"flushdb" => {
if cmd.len() != 1 {
return Err(DBError("wrong number of arguments for FLUSHDB command".to_string()));
}
Cmd::FlushDb
}
_ => Cmd::Unknow(cmd[0].clone()),
},
protocol,
@@ -482,6 +489,7 @@ impl Cmd {
Cmd::LTrim(key, start, stop) => ltrim_cmd(server, key, *start, *stop).await,
Cmd::LIndex(key, index) => lindex_cmd(server, key, *index).await,
Cmd::LRange(key, start, stop) => lrange_cmd(server, key, *start, *stop).await,
Cmd::FlushDb => flushdb_cmd(server).await,
Cmd::Unknow(s) => {
println!("\x1b[31;1munknown command: {}\x1b[0m", s);
Ok(Protocol::err(&format!("ERR unknown command '{}'", s)))
@@ -489,17 +497,25 @@ impl Cmd {
}
}
}
async fn select_cmd(server: &mut Server, db: u16) -> Result<Protocol, DBError> {
let idx = db as usize;
if idx >= server.storages.len() {
return Ok(Protocol::err("ERR DB index is out of range"));
async fn flushdb_cmd(server: &mut Server) -> Result<Protocol, DBError> {
match server.current_storage()?.flushdb() {
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn select_cmd(server: &mut Server, db: u64) -> Result<Protocol, DBError> {
// Test if we can access the database (this will create it if needed)
server.selected_db = db;
match server.current_storage() {
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
server.selected_db = idx;
Ok(Protocol::SimpleString("OK".to_string()))
}
async fn lindex_cmd(server: &Server, key: &str, index: i64) -> Result<Protocol, DBError> {
match server.current_storage().lindex(key, index) {
match server.current_storage()?.lindex(key, index) {
Ok(Some(element)) => Ok(Protocol::BulkString(element)),
Ok(None) => Ok(Protocol::Null),
Err(e) => Ok(Protocol::err(&e.0)),
@@ -507,35 +523,35 @@ async fn lindex_cmd(server: &Server, key: &str, index: i64) -> Result<Protocol,
}
async fn lrange_cmd(server: &Server, key: &str, start: i64, stop: i64) -> Result<Protocol, DBError> {
match server.current_storage().lrange(key, start, stop) {
match server.current_storage()?.lrange(key, start, stop) {
Ok(elements) => Ok(Protocol::Array(elements.into_iter().map(Protocol::BulkString).collect())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn ltrim_cmd(server: &Server, key: &str, start: i64, stop: i64) -> Result<Protocol, DBError> {
match server.current_storage().ltrim(key, start, stop) {
match server.current_storage()?.ltrim(key, start, stop) {
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn lrem_cmd(server: &Server, key: &str, count: i64, element: &str) -> Result<Protocol, DBError> {
match server.current_storage().lrem(key, count, element) {
match server.current_storage()?.lrem(key, count, element) {
Ok(removed_count) => Ok(Protocol::SimpleString(removed_count.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn llen_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage().llen(key) {
match server.current_storage()?.llen(key) {
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn lpop_cmd(server: &Server, key: &str, count: &Option<u64>) -> Result<Protocol, DBError> {
match server.current_storage().lpop(key, *count) {
match server.current_storage()?.lpop(key, *count) {
Ok(Some(elements)) => {
if count.is_some() {
Ok(Protocol::Array(elements.into_iter().map(Protocol::BulkString).collect()))
@@ -555,7 +571,7 @@ async fn lpop_cmd(server: &Server, key: &str, count: &Option<u64>) -> Result<Pro
}
async fn rpop_cmd(server: &Server, key: &str, count: &Option<u64>) -> Result<Protocol, DBError> {
match server.current_storage().rpop(key, *count) {
match server.current_storage()?.rpop(key, *count) {
Ok(Some(elements)) => {
if count.is_some() {
Ok(Protocol::Array(elements.into_iter().map(Protocol::BulkString).collect()))
@@ -575,14 +591,14 @@ async fn rpop_cmd(server: &Server, key: &str, count: &Option<u64>) -> Result<Pro
}
async fn lpush_cmd(server: &Server, key: &str, elements: &[String]) -> Result<Protocol, DBError> {
match server.current_storage().lpush(key, elements.to_vec()) {
match server.current_storage()?.lpush(key, elements.to_vec()) {
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn rpush_cmd(server: &Server, key: &str, elements: &[String]) -> Result<Protocol, DBError> {
match server.current_storage().rpush(key, elements.to_vec()) {
match server.current_storage()?.rpush(key, elements.to_vec()) {
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
@@ -606,7 +622,8 @@ async fn exec_cmd(
}
async fn incr_cmd(server: &Server, key: &String) -> Result<Protocol, DBError> {
let current_value = server.current_storage().get(key)?;
let storage = server.current_storage()?;
let current_value = storage.get(key)?;
let new_value = match current_value {
Some(v) => {
@@ -618,7 +635,7 @@ async fn incr_cmd(server: &Server, key: &String) -> Result<Protocol, DBError> {
None => 1,
};
server.current_storage().set(key.clone(), new_value.to_string())?;
storage.set(key.clone(), new_value.to_string())?;
Ok(Protocol::SimpleString(new_value.to_string()))
}
@@ -634,14 +651,14 @@ fn config_get_cmd(name: &String, server: &Server) -> Result<Protocol, DBError> {
])),
"databases" => Ok(Protocol::Array(vec![
Protocol::BulkString(name.clone()),
Protocol::BulkString(server.option.databases.to_string()),
Protocol::BulkString(server.option.max_databases.unwrap_or(0).to_string()),
])),
_ => Ok(Protocol::Array(vec![])),
}
}
async fn keys_cmd(server: &Server) -> Result<Protocol, DBError> {
let keys = server.current_storage().keys("*")?;
let keys = server.current_storage()?.keys("*")?;
Ok(Protocol::Array(
keys.into_iter().map(Protocol::BulkString).collect(),
))
@@ -660,14 +677,14 @@ fn info_cmd(section: &Option<String>) -> Result<Protocol, DBError> {
}
async fn type_cmd(server: &Server, k: &String) -> Result<Protocol, DBError> {
match server.current_storage().get_key_type(k)? {
match server.current_storage()?.get_key_type(k)? {
Some(type_str) => Ok(Protocol::SimpleString(type_str)),
None => Ok(Protocol::SimpleString("none".to_string())),
}
}
async fn del_cmd(server: &Server, k: &str) -> Result<Protocol, DBError> {
server.current_storage().del(k.to_string())?;
server.current_storage()?.del(k.to_string())?;
Ok(Protocol::SimpleString("1".to_string()))
}
@@ -677,7 +694,7 @@ async fn set_ex_cmd(
v: &str,
x: &u128,
) -> Result<Protocol, DBError> {
server.current_storage().setx(k.to_string(), v.to_string(), *x * 1000)?;
server.current_storage()?.setx(k.to_string(), v.to_string(), *x * 1000)?;
Ok(Protocol::SimpleString("OK".to_string()))
}
@@ -687,28 +704,28 @@ async fn set_px_cmd(
v: &str,
x: &u128,
) -> Result<Protocol, DBError> {
server.current_storage().setx(k.to_string(), v.to_string(), *x)?;
server.current_storage()?.setx(k.to_string(), v.to_string(), *x)?;
Ok(Protocol::SimpleString("OK".to_string()))
}
async fn set_cmd(server: &Server, k: &str, v: &str) -> Result<Protocol, DBError> {
server.current_storage().set(k.to_string(), v.to_string())?;
server.current_storage()?.set(k.to_string(), v.to_string())?;
Ok(Protocol::SimpleString("OK".to_string()))
}
async fn get_cmd(server: &Server, k: &str) -> Result<Protocol, DBError> {
let v = server.current_storage().get(k)?;
let v = server.current_storage()?.get(k)?;
Ok(v.map_or(Protocol::Null, Protocol::BulkString))
}
// Hash command implementations
async fn hset_cmd(server: &Server, key: &str, pairs: &[(String, String)]) -> Result<Protocol, DBError> {
let new_fields = server.current_storage().hset(key, pairs)?;
let new_fields = server.current_storage()?.hset(key, pairs)?;
Ok(Protocol::SimpleString(new_fields.to_string()))
}
async fn hget_cmd(server: &Server, key: &str, field: &str) -> Result<Protocol, DBError> {
match server.current_storage().hget(key, field) {
match server.current_storage()?.hget(key, field) {
Ok(Some(value)) => Ok(Protocol::BulkString(value)),
Ok(None) => Ok(Protocol::Null),
Err(e) => Ok(Protocol::err(&e.0)),
@@ -716,7 +733,7 @@ async fn hget_cmd(server: &Server, key: &str, field: &str) -> Result<Protocol, D
}
async fn hgetall_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage().hgetall(key) {
match server.current_storage()?.hgetall(key) {
Ok(pairs) => {
let mut result = Vec::new();
for (field, value) in pairs {
@@ -730,21 +747,21 @@ async fn hgetall_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
}
async fn hdel_cmd(server: &Server, key: &str, fields: &[String]) -> Result<Protocol, DBError> {
match server.current_storage().hdel(key, fields) {
match server.current_storage()?.hdel(key, fields) {
Ok(deleted) => Ok(Protocol::SimpleString(deleted.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hexists_cmd(server: &Server, key: &str, field: &str) -> Result<Protocol, DBError> {
match server.current_storage().hexists(key, field) {
match server.current_storage()?.hexists(key, field) {
Ok(exists) => Ok(Protocol::SimpleString(if exists { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hkeys_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage().hkeys(key) {
match server.current_storage()?.hkeys(key) {
Ok(keys) => Ok(Protocol::Array(
keys.into_iter().map(Protocol::BulkString).collect(),
)),
@@ -753,7 +770,7 @@ async fn hkeys_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
}
async fn hvals_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage().hvals(key) {
match server.current_storage()?.hvals(key) {
Ok(values) => Ok(Protocol::Array(
values.into_iter().map(Protocol::BulkString).collect(),
)),
@@ -762,14 +779,14 @@ async fn hvals_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
}
async fn hlen_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage().hlen(key) {
match server.current_storage()?.hlen(key) {
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hmget_cmd(server: &Server, key: &str, fields: &[String]) -> Result<Protocol, DBError> {
match server.current_storage().hmget(key, fields) {
match server.current_storage()?.hmget(key, fields) {
Ok(values) => {
let result: Vec<Protocol> = values
.into_iter()
@@ -782,14 +799,14 @@ async fn hmget_cmd(server: &Server, key: &str, fields: &[String]) -> Result<Prot
}
async fn hsetnx_cmd(server: &Server, key: &str, field: &str, value: &str) -> Result<Protocol, DBError> {
match server.current_storage().hsetnx(key, field, value) {
match server.current_storage()?.hsetnx(key, field, value) {
Ok(was_set) => Ok(Protocol::SimpleString(if was_set { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn scan_cmd(server: &Server, cursor: &u64, pattern: Option<&str>, count: &Option<u64>) -> Result<Protocol, DBError> {
match server.current_storage().scan(*cursor, pattern, *count) {
match server.current_storage()?.scan(*cursor, pattern, *count) {
Ok((next_cursor, keys)) => {
let mut result = Vec::new();
result.push(Protocol::BulkString(next_cursor.to_string()));
@@ -803,7 +820,7 @@ async fn scan_cmd(server: &Server, cursor: &u64, pattern: Option<&str>, count: &
}
async fn hscan_cmd(server: &Server, key: &str, cursor: &u64, pattern: Option<&str>, count: &Option<u64>) -> Result<Protocol, DBError> {
match server.current_storage().hscan(key, *cursor, pattern, *count) {
match server.current_storage()?.hscan(key, *cursor, pattern, *count) {
Ok((next_cursor, fields)) => {
let mut result = Vec::new();
result.push(Protocol::BulkString(next_cursor.to_string()));
@@ -817,14 +834,14 @@ async fn hscan_cmd(server: &Server, key: &str, cursor: &u64, pattern: Option<&st
}
async fn ttl_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage().ttl(key) {
match server.current_storage()?.ttl(key) {
Ok(ttl) => Ok(Protocol::SimpleString(ttl.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn exists_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage().exists(key) {
match server.current_storage()?.exists(key) {
Ok(exists) => Ok(Protocol::SimpleString(if exists { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}