...
This commit is contained in:
645
src/cmd.rs
645
src/cmd.rs
@@ -1,8 +1,4 @@
|
||||
use std::{collections::BTreeMap, ops::Bound, time::Duration, u64};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::{error::DBError, protocol::Protocol, server::Server, storage::now_in_millis};
|
||||
use crate::{error::DBError, protocol::Protocol, server::Server};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Cmd {
|
||||
@@ -16,17 +12,23 @@ pub enum Cmd {
|
||||
ConfigGet(String),
|
||||
Info(Option<String>),
|
||||
Del(String),
|
||||
Replconf(String),
|
||||
Psync,
|
||||
Type(String),
|
||||
Xadd(String, String, Vec<(String, String)>),
|
||||
Xrange(String, String, String),
|
||||
Xread(Vec<String>, Vec<String>, Option<u64>),
|
||||
Incr(String),
|
||||
Multi,
|
||||
Exec,
|
||||
Unknow,
|
||||
Discard,
|
||||
// Hash commands
|
||||
HSet(String, Vec<(String, String)>),
|
||||
HGet(String, String),
|
||||
HGetAll(String),
|
||||
HDel(String, Vec<String>),
|
||||
HExists(String, String),
|
||||
HKeys(String),
|
||||
HVals(String),
|
||||
HLen(String),
|
||||
HMGet(String, Vec<String>),
|
||||
HSetNx(String, String, String),
|
||||
Unknow,
|
||||
}
|
||||
|
||||
impl Cmd {
|
||||
@@ -39,14 +41,14 @@ impl Cmd {
|
||||
return Err(DBError("cmd length is 0".to_string()));
|
||||
}
|
||||
Ok((
|
||||
match cmd[0].as_str() {
|
||||
match cmd[0].to_lowercase().as_str() {
|
||||
"echo" => Cmd::Echo(cmd[1].clone()),
|
||||
"ping" => Cmd::Ping,
|
||||
"get" => Cmd::Get(cmd[1].clone()),
|
||||
"set" => {
|
||||
if cmd.len() == 5 && cmd[3] == "px" {
|
||||
if cmd.len() == 5 && cmd[3].to_lowercase() == "px" {
|
||||
Cmd::SetPx(cmd[1].clone(), cmd[2].clone(), cmd[4].parse().unwrap())
|
||||
} else if cmd.len() == 5 && cmd[3] == "ex" {
|
||||
} else if cmd.len() == 5 && cmd[3].to_lowercase() == "ex" {
|
||||
Cmd::SetEx(cmd[1].clone(), cmd[2].clone(), cmd[4].parse().unwrap())
|
||||
} else if cmd.len() == 3 {
|
||||
Cmd::Set(cmd[1].clone(), cmd[2].clone())
|
||||
@@ -55,7 +57,7 @@ impl Cmd {
|
||||
}
|
||||
}
|
||||
"config" => {
|
||||
if cmd.len() != 3 || cmd[1] != "get" {
|
||||
if cmd.len() != 3 || cmd[1].to_lowercase() != "get" {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
} else {
|
||||
Cmd::ConfigGet(cmd[2].clone())
|
||||
@@ -76,18 +78,6 @@ impl Cmd {
|
||||
};
|
||||
Cmd::Info(section)
|
||||
}
|
||||
"replconf" => {
|
||||
if cmd.len() < 3 {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
}
|
||||
Cmd::Replconf(cmd[1].clone())
|
||||
}
|
||||
"psync" => {
|
||||
if cmd.len() != 3 {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
}
|
||||
Cmd::Psync
|
||||
}
|
||||
"del" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
@@ -100,44 +90,6 @@ impl Cmd {
|
||||
}
|
||||
Cmd::Type(cmd[1].clone())
|
||||
}
|
||||
"xadd" => {
|
||||
if cmd.len() < 5 {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
}
|
||||
|
||||
let mut key_value = Vec::<(String, String)>::new();
|
||||
let mut i = 3;
|
||||
while i < cmd.len() - 1 {
|
||||
key_value.push((cmd[i].clone(), cmd[i + 1].clone()));
|
||||
i += 2;
|
||||
}
|
||||
Cmd::Xadd(cmd[1].clone(), cmd[2].clone(), key_value)
|
||||
}
|
||||
"xrange" => {
|
||||
if cmd.len() != 4 {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
}
|
||||
Cmd::Xrange(cmd[1].clone(), cmd[2].clone(), cmd[3].clone())
|
||||
}
|
||||
"xread" => {
|
||||
if cmd.len() < 4 || cmd.len() % 2 != 0 {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
}
|
||||
let mut offset = 2;
|
||||
// block cmd
|
||||
let mut block = None;
|
||||
if cmd[1] == "block" {
|
||||
offset += 2;
|
||||
if let Ok(block_time) = cmd[2].parse() {
|
||||
block = Some(block_time);
|
||||
} else {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
}
|
||||
}
|
||||
let cmd2 = &cmd[offset..];
|
||||
let len2 = cmd2.len() / 2;
|
||||
Cmd::Xread(cmd2[0..len2].to_vec(), cmd2[len2..].to_vec(), block)
|
||||
}
|
||||
"incr" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
@@ -157,6 +109,73 @@ impl Cmd {
|
||||
Cmd::Exec
|
||||
}
|
||||
"discard" => Cmd::Discard,
|
||||
// Hash commands
|
||||
"hset" => {
|
||||
if cmd.len() < 4 || (cmd.len() - 2) % 2 != 0 {
|
||||
return Err(DBError(format!("wrong number of arguments for HSET command")));
|
||||
}
|
||||
let mut pairs = Vec::new();
|
||||
let mut i = 2;
|
||||
while i < cmd.len() - 1 {
|
||||
pairs.push((cmd[i].clone(), cmd[i + 1].clone()));
|
||||
i += 2;
|
||||
}
|
||||
Cmd::HSet(cmd[1].clone(), pairs)
|
||||
}
|
||||
"hget" => {
|
||||
if cmd.len() != 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for HGET command")));
|
||||
}
|
||||
Cmd::HGet(cmd[1].clone(), cmd[2].clone())
|
||||
}
|
||||
"hgetall" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("wrong number of arguments for HGETALL command")));
|
||||
}
|
||||
Cmd::HGetAll(cmd[1].clone())
|
||||
}
|
||||
"hdel" => {
|
||||
if cmd.len() < 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for HDEL command")));
|
||||
}
|
||||
Cmd::HDel(cmd[1].clone(), cmd[2..].to_vec())
|
||||
}
|
||||
"hexists" => {
|
||||
if cmd.len() != 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for HEXISTS command")));
|
||||
}
|
||||
Cmd::HExists(cmd[1].clone(), cmd[2].clone())
|
||||
}
|
||||
"hkeys" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("wrong number of arguments for HKEYS command")));
|
||||
}
|
||||
Cmd::HKeys(cmd[1].clone())
|
||||
}
|
||||
"hvals" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("wrong number of arguments for HVALS command")));
|
||||
}
|
||||
Cmd::HVals(cmd[1].clone())
|
||||
}
|
||||
"hlen" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("wrong number of arguments for HLEN command")));
|
||||
}
|
||||
Cmd::HLen(cmd[1].clone())
|
||||
}
|
||||
"hmget" => {
|
||||
if cmd.len() < 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for HMGET command")));
|
||||
}
|
||||
Cmd::HMGet(cmd[1].clone(), cmd[2..].to_vec())
|
||||
}
|
||||
"hsetnx" => {
|
||||
if cmd.len() != 4 {
|
||||
return Err(DBError(format!("wrong number of arguments for HSETNX command")));
|
||||
}
|
||||
Cmd::HSetNx(cmd[1].clone(), cmd[2].clone(), cmd[3].clone())
|
||||
}
|
||||
_ => Cmd::Unknow,
|
||||
},
|
||||
protocol.0,
|
||||
@@ -171,13 +190,11 @@ impl Cmd {
|
||||
|
||||
pub async fn run(
|
||||
&self,
|
||||
server: &mut Server,
|
||||
server: &Server,
|
||||
protocol: Protocol,
|
||||
is_rep_con: bool,
|
||||
queued_cmd: &mut Option<Vec<(Cmd, Protocol)>>,
|
||||
) -> Result<Protocol, DBError> {
|
||||
// return if the command is a write command
|
||||
let p = protocol.clone();
|
||||
// Handle queued commands for transactions
|
||||
if queued_cmd.is_some()
|
||||
&& !matches!(self, Cmd::Exec)
|
||||
&& !matches!(self, Cmd::Multi)
|
||||
@@ -189,71 +206,57 @@ impl Cmd {
|
||||
.push((self.clone(), protocol.clone()));
|
||||
return Ok(Protocol::SimpleString("QUEUED".to_string()));
|
||||
}
|
||||
let ret = match self {
|
||||
|
||||
match self {
|
||||
Cmd::Ping => Ok(Protocol::SimpleString("PONG".to_string())),
|
||||
Cmd::Echo(s) => Ok(Protocol::SimpleString(s.clone())),
|
||||
Cmd::Get(k) => get_cmd(server, k).await,
|
||||
Cmd::Set(k, v) => set_cmd(server, k, v, protocol, is_rep_con).await,
|
||||
Cmd::SetPx(k, v, x) => set_px_cmd(server, k, v, x, protocol, is_rep_con).await,
|
||||
Cmd::SetEx(k, v, x) => set_ex_cmd(server, k, v, x, protocol, is_rep_con).await,
|
||||
Cmd::Del(k) => del_cmd(server, k, protocol, is_rep_con).await,
|
||||
Cmd::Set(k, v) => set_cmd(server, k, v).await,
|
||||
Cmd::SetPx(k, v, x) => set_px_cmd(server, k, v, x).await,
|
||||
Cmd::SetEx(k, v, x) => set_ex_cmd(server, k, v, x).await,
|
||||
Cmd::Del(k) => del_cmd(server, k).await,
|
||||
Cmd::ConfigGet(name) => config_get_cmd(name, server),
|
||||
Cmd::Keys => keys_cmd(server).await,
|
||||
Cmd::Info(section) => info_cmd(section, server),
|
||||
Cmd::Replconf(sub_cmd) => replconf_cmd(sub_cmd, server),
|
||||
Cmd::Psync => psync_cmd(server),
|
||||
Cmd::Info(section) => info_cmd(section),
|
||||
Cmd::Type(k) => type_cmd(server, k).await,
|
||||
Cmd::Xadd(stream_key, offset, kvps) => {
|
||||
xadd_cmd(
|
||||
offset.as_str(),
|
||||
server,
|
||||
stream_key.as_str(),
|
||||
kvps,
|
||||
protocol,
|
||||
is_rep_con,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
Cmd::Xrange(stream_key, start, end) => xrange_cmd(server, stream_key, start, end).await,
|
||||
Cmd::Xread(stream_keys, starts, block) => {
|
||||
xread_cmd(starts, server, stream_keys, block).await
|
||||
}
|
||||
Cmd::Incr(key) => incr_cmd(server, key).await,
|
||||
Cmd::Multi => {
|
||||
*queued_cmd = Some(Vec::<(Cmd, Protocol)>::new());
|
||||
Ok(Protocol::SimpleString("ok".to_string()))
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
}
|
||||
Cmd::Exec => exec_cmd(queued_cmd, server, is_rep_con).await,
|
||||
Cmd::Exec => exec_cmd(queued_cmd, server).await,
|
||||
Cmd::Discard => {
|
||||
if queued_cmd.is_some() {
|
||||
*queued_cmd = None;
|
||||
Ok(Protocol::SimpleString("ok".to_string()))
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
} else {
|
||||
Ok(Protocol::err("ERR Discard without MULTI"))
|
||||
Ok(Protocol::err("ERR DISCARD without MULTI"))
|
||||
}
|
||||
}
|
||||
Cmd::Unknow => Ok(Protocol::err("unknow cmd")),
|
||||
};
|
||||
if ret.is_ok() {
|
||||
server.offset.fetch_add(
|
||||
p.encode().len() as u64,
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
// Hash commands
|
||||
Cmd::HSet(key, pairs) => hset_cmd(server, key, pairs).await,
|
||||
Cmd::HGet(key, field) => hget_cmd(server, key, field).await,
|
||||
Cmd::HGetAll(key) => hgetall_cmd(server, key).await,
|
||||
Cmd::HDel(key, fields) => hdel_cmd(server, key, fields).await,
|
||||
Cmd::HExists(key, field) => hexists_cmd(server, key, field).await,
|
||||
Cmd::HKeys(key) => hkeys_cmd(server, key).await,
|
||||
Cmd::HVals(key) => hvals_cmd(server, key).await,
|
||||
Cmd::HLen(key) => hlen_cmd(server, key).await,
|
||||
Cmd::HMGet(key, fields) => hmget_cmd(server, key, fields).await,
|
||||
Cmd::HSetNx(key, field, value) => hsetnx_cmd(server, key, field, value).await,
|
||||
Cmd::Unknow => Ok(Protocol::err("unknown cmd")),
|
||||
}
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
async fn exec_cmd(
|
||||
queued_cmd: &mut Option<Vec<(Cmd, Protocol)>>,
|
||||
server: &mut Server,
|
||||
is_rep_con: bool,
|
||||
server: &Server,
|
||||
) -> Result<Protocol, DBError> {
|
||||
if queued_cmd.is_some() {
|
||||
let mut vec = Vec::new();
|
||||
for (cmd, protocol) in queued_cmd.as_ref().unwrap() {
|
||||
let res = Box::pin(cmd.run(server, protocol.clone(), is_rep_con, &mut None)).await?;
|
||||
let res = Box::pin(cmd.run(server, protocol.clone(), &mut None)).await?;
|
||||
vec.push(res);
|
||||
}
|
||||
*queued_cmd = None;
|
||||
@@ -263,22 +266,24 @@ async fn exec_cmd(
|
||||
}
|
||||
}
|
||||
|
||||
async fn incr_cmd(server: &mut Server, key: &String) -> Result<Protocol, DBError> {
|
||||
let mut storage = server.storage.lock().await;
|
||||
let v = storage.get(key);
|
||||
// return 1 if key is missing
|
||||
let v = v.map_or("1".to_string(), |v| v);
|
||||
|
||||
if let Ok(x) = v.parse::<u64>() {
|
||||
let v = (x + 1).to_string();
|
||||
storage.set(key.clone(), v.clone());
|
||||
Ok(Protocol::SimpleString(v))
|
||||
} else {
|
||||
Ok(Protocol::err("ERR value is not an integer or out of range"))
|
||||
}
|
||||
async fn incr_cmd(server: &Server, key: &String) -> Result<Protocol, DBError> {
|
||||
let current_value = server.storage.get(key)?;
|
||||
|
||||
let new_value = match current_value {
|
||||
Some(v) => {
|
||||
match v.parse::<i64>() {
|
||||
Ok(num) => num + 1,
|
||||
Err(_) => return Ok(Protocol::err("ERR value is not an integer or out of range")),
|
||||
}
|
||||
}
|
||||
None => 1,
|
||||
};
|
||||
|
||||
server.storage.set(key.clone(), new_value.to_string())?;
|
||||
Ok(Protocol::SimpleString(new_value.to_string()))
|
||||
}
|
||||
|
||||
fn config_get_cmd(name: &String, server: &mut Server) -> Result<Protocol, DBError> {
|
||||
fn config_get_cmd(name: &String, server: &Server) -> Result<Protocol, DBError> {
|
||||
match name.as_str() {
|
||||
"dir" => Ok(Protocol::Array(vec![
|
||||
Protocol::BulkString(name.clone()),
|
||||
@@ -286,336 +291,156 @@ fn config_get_cmd(name: &String, server: &mut Server) -> Result<Protocol, DBErro
|
||||
])),
|
||||
"dbfilename" => Ok(Protocol::Array(vec![
|
||||
Protocol::BulkString(name.clone()),
|
||||
Protocol::BulkString(server.option.db_file_name.clone()),
|
||||
Protocol::BulkString("herodb.redb".to_string()),
|
||||
])),
|
||||
_ => Err(DBError(format!("unsupported config {:?}", name))),
|
||||
}
|
||||
}
|
||||
|
||||
async fn keys_cmd(server: &mut Server) -> Result<Protocol, DBError> {
|
||||
let keys = { server.storage.lock().await.keys() };
|
||||
async fn keys_cmd(server: &Server) -> Result<Protocol, DBError> {
|
||||
let keys = server.storage.keys("*")?;
|
||||
Ok(Protocol::Array(
|
||||
keys.into_iter().map(Protocol::BulkString).collect(),
|
||||
))
|
||||
}
|
||||
|
||||
fn info_cmd(section: &Option<String>, server: &mut Server) -> Result<Protocol, DBError> {
|
||||
fn info_cmd(section: &Option<String>) -> Result<Protocol, DBError> {
|
||||
match section {
|
||||
Some(s) => match s.as_str() {
|
||||
"replication" => Ok(Protocol::BulkString(format!(
|
||||
"role:{}\nmaster_replid:{}\nmaster_repl_offset:{}\n",
|
||||
server.option.replication.role,
|
||||
server.option.replication.master_replid,
|
||||
server.option.replication.master_repl_offset
|
||||
))),
|
||||
"replication" => Ok(Protocol::BulkString(
|
||||
"role:master\nmaster_replid:8371b4fb1155b71f4a04d3e1bc3e18c4a990aeea\nmaster_repl_offset:0\n".to_string()
|
||||
)),
|
||||
_ => Err(DBError(format!("unsupported section {:?}", s))),
|
||||
},
|
||||
None => Ok(Protocol::BulkString("default".to_string())),
|
||||
None => Ok(Protocol::BulkString("# Server\nredis_version:7.0.0\n".to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn xread_cmd(
|
||||
starts: &[String],
|
||||
server: &mut Server,
|
||||
stream_keys: &[String],
|
||||
block_millis: &Option<u64>,
|
||||
) -> Result<Protocol, DBError> {
|
||||
if let Some(t) = block_millis {
|
||||
if t > &0 {
|
||||
tokio::time::sleep(Duration::from_millis(*t)).await;
|
||||
} else {
|
||||
let (sender, mut receiver) = mpsc::channel(4);
|
||||
{
|
||||
let mut blocker = server.stream_reader_blocker.lock().await;
|
||||
blocker.push(sender.clone());
|
||||
}
|
||||
while let Some(_) = receiver.recv().await {
|
||||
println!("get new xadd cmd, release block");
|
||||
// break;
|
||||
}
|
||||
}
|
||||
}
|
||||
let streams = server.streams.lock().await;
|
||||
let mut ret = Vec::new();
|
||||
for (i, stream_key) in stream_keys.iter().enumerate() {
|
||||
let stream = streams.get(stream_key);
|
||||
if let Some(s) = stream {
|
||||
let (offset_id, mut offset_seq, _) = split_offset(starts[i].as_str());
|
||||
offset_seq += 1;
|
||||
let start = format!("{}-{}", offset_id, offset_seq);
|
||||
let end = format!("{}-{}", u64::MAX - 1, 0);
|
||||
|
||||
// query stream range
|
||||
let range = s.range::<String, _>((Bound::Included(&start), Bound::Included(&end)));
|
||||
let mut array = Vec::new();
|
||||
for (k, v) in range {
|
||||
array.push(Protocol::BulkString(k.clone()));
|
||||
array.push(Protocol::from_vec(
|
||||
v.iter()
|
||||
.flat_map(|(a, b)| vec![a.as_str(), b.as_str()])
|
||||
.collect(),
|
||||
))
|
||||
}
|
||||
ret.push(Protocol::BulkString(stream_key.clone()));
|
||||
ret.push(Protocol::Array(array));
|
||||
}
|
||||
}
|
||||
Ok(Protocol::Array(ret))
|
||||
}
|
||||
|
||||
fn replconf_cmd(sub_cmd: &str, server: &mut Server) -> Result<Protocol, DBError> {
|
||||
match sub_cmd {
|
||||
"getack" => Ok(Protocol::from_vec(vec![
|
||||
"REPLCONF",
|
||||
"ACK",
|
||||
server
|
||||
.offset
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
.to_string()
|
||||
.as_str(),
|
||||
])),
|
||||
_ => Ok(Protocol::SimpleString("OK".to_string())),
|
||||
async fn type_cmd(server: &Server, k: &String) -> Result<Protocol, DBError> {
|
||||
match server.storage.get_key_type(k)? {
|
||||
Some(type_str) => Ok(Protocol::SimpleString(type_str)),
|
||||
None => Ok(Protocol::SimpleString("none".to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn xrange_cmd(
|
||||
server: &mut Server,
|
||||
stream_key: &String,
|
||||
start: &String,
|
||||
end: &String,
|
||||
) -> Result<Protocol, DBError> {
|
||||
let streams = server.streams.lock().await;
|
||||
let stream = streams.get(stream_key);
|
||||
Ok(stream.map_or(Protocol::none(), |s| {
|
||||
// support query with '-'
|
||||
let start = if start == "-" {
|
||||
"0".to_string()
|
||||
} else {
|
||||
start.clone()
|
||||
};
|
||||
|
||||
// support query with '+'
|
||||
let end = if end == "+" {
|
||||
u64::MAX.to_string()
|
||||
} else {
|
||||
end.clone()
|
||||
};
|
||||
|
||||
// query stream range
|
||||
let range = s.range::<String, _>((Bound::Included(&start), Bound::Included(&end)));
|
||||
let mut array = Vec::new();
|
||||
for (k, v) in range {
|
||||
array.push(Protocol::BulkString(k.clone()));
|
||||
array.push(Protocol::from_vec(
|
||||
v.iter()
|
||||
.flat_map(|(a, b)| vec![a.as_str(), b.as_str()])
|
||||
.collect(),
|
||||
))
|
||||
}
|
||||
println!("after xrange: {:?}", array);
|
||||
Protocol::Array(array)
|
||||
}))
|
||||
}
|
||||
|
||||
async fn xadd_cmd(
|
||||
offset: &str,
|
||||
server: &mut Server,
|
||||
stream_key: &str,
|
||||
kvps: &Vec<(String, String)>,
|
||||
protocol: Protocol,
|
||||
is_rep_con: bool,
|
||||
) -> Result<Protocol, DBError> {
|
||||
let mut offset = offset.to_string();
|
||||
if offset == "*" {
|
||||
offset = format!("{}-*", now_in_millis() as u64);
|
||||
}
|
||||
let (offset_id, mut offset_seq, has_wildcard) = split_offset(offset.as_str());
|
||||
if offset_id == 0 && offset_seq == 0 && !has_wildcard {
|
||||
return Ok(Protocol::err(
|
||||
"ERR The ID specified in XADD must be greater than 0-0",
|
||||
));
|
||||
}
|
||||
{
|
||||
let mut streams = server.streams.lock().await;
|
||||
let stream = streams
|
||||
.entry(stream_key.to_string())
|
||||
.or_insert_with(BTreeMap::new);
|
||||
|
||||
if let Some((last_offset, _)) = stream.last_key_value() {
|
||||
let (last_offset_id, last_offset_seq, _) = split_offset(last_offset.as_str());
|
||||
if last_offset_id > offset_id
|
||||
|| (last_offset_id == offset_id && last_offset_seq >= offset_seq && !has_wildcard)
|
||||
{
|
||||
return Ok(Protocol::err("ERR The ID specified in XADD is equal or smaller than the target stream top item"));
|
||||
}
|
||||
|
||||
if last_offset_id == offset_id && last_offset_seq >= offset_seq && has_wildcard {
|
||||
offset_seq = last_offset_seq + 1;
|
||||
}
|
||||
}
|
||||
|
||||
let offset = format!("{}-{}", offset_id, offset_seq);
|
||||
|
||||
let s = stream.entry(offset.clone()).or_insert_with(Vec::new);
|
||||
for (key, value) in kvps {
|
||||
s.push((key.clone(), value.clone()));
|
||||
}
|
||||
}
|
||||
{
|
||||
let mut blocker = server.stream_reader_blocker.lock().await;
|
||||
for sender in blocker.iter() {
|
||||
sender.send(()).await?;
|
||||
}
|
||||
blocker.clear();
|
||||
}
|
||||
resp_and_replicate(
|
||||
server,
|
||||
Protocol::BulkString(offset.to_string()),
|
||||
protocol,
|
||||
is_rep_con,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn type_cmd(server: &mut Server, k: &String) -> Result<Protocol, DBError> {
|
||||
let v = { server.storage.lock().await.get(k) };
|
||||
if v.is_some() {
|
||||
return Ok(Protocol::SimpleString("string".to_string()));
|
||||
}
|
||||
let streams = server.streams.lock().await;
|
||||
let v = streams.get(k);
|
||||
Ok(v.map_or(Protocol::none(), |_| {
|
||||
Protocol::SimpleString("stream".to_string())
|
||||
}))
|
||||
}
|
||||
|
||||
fn psync_cmd(server: &mut Server) -> Result<Protocol, DBError> {
|
||||
if server.is_master() {
|
||||
Ok(Protocol::SimpleString(format!(
|
||||
"FULLRESYNC {} 0",
|
||||
server.option.replication.master_replid
|
||||
)))
|
||||
} else {
|
||||
Ok(Protocol::psync_on_slave_err())
|
||||
}
|
||||
}
|
||||
|
||||
async fn del_cmd(
|
||||
server: &mut Server,
|
||||
k: &str,
|
||||
protocol: Protocol,
|
||||
is_rep_con: bool,
|
||||
) -> Result<Protocol, DBError> {
|
||||
// offset
|
||||
let _ = {
|
||||
let mut s = server.storage.lock().await;
|
||||
s.del(k.to_string());
|
||||
server
|
||||
.offset
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
|
||||
};
|
||||
resp_and_replicate(server, Protocol::ok(), protocol, is_rep_con).await
|
||||
async fn del_cmd(server: &Server, k: &str) -> Result<Protocol, DBError> {
|
||||
server.storage.del(k.to_string())?;
|
||||
Ok(Protocol::SimpleString("1".to_string()))
|
||||
}
|
||||
|
||||
async fn set_ex_cmd(
|
||||
server: &mut Server,
|
||||
server: &Server,
|
||||
k: &str,
|
||||
v: &str,
|
||||
x: &u128,
|
||||
protocol: Protocol,
|
||||
is_rep_con: bool,
|
||||
) -> Result<Protocol, DBError> {
|
||||
// offset
|
||||
let _ = {
|
||||
let mut s = server.storage.lock().await;
|
||||
s.setx(k.to_string(), v.to_string(), *x * 1000);
|
||||
server
|
||||
.offset
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
|
||||
};
|
||||
resp_and_replicate(server, Protocol::ok(), protocol, is_rep_con).await
|
||||
server.storage.setx(k.to_string(), v.to_string(), *x * 1000)?;
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
}
|
||||
|
||||
async fn set_px_cmd(
|
||||
server: &mut Server,
|
||||
server: &Server,
|
||||
k: &str,
|
||||
v: &str,
|
||||
x: &u128,
|
||||
protocol: Protocol,
|
||||
is_rep_con: bool,
|
||||
) -> Result<Protocol, DBError> {
|
||||
// offset
|
||||
let _ = {
|
||||
let mut s = server.storage.lock().await;
|
||||
s.setx(k.to_string(), v.to_string(), *x);
|
||||
server
|
||||
.offset
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
|
||||
};
|
||||
resp_and_replicate(server, Protocol::ok(), protocol, is_rep_con).await
|
||||
server.storage.setx(k.to_string(), v.to_string(), *x)?;
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
}
|
||||
|
||||
async fn set_cmd(
|
||||
server: &mut Server,
|
||||
k: &str,
|
||||
v: &str,
|
||||
protocol: Protocol,
|
||||
is_rep_con: bool,
|
||||
) -> Result<Protocol, DBError> {
|
||||
// offset
|
||||
let _ = {
|
||||
let mut s = server.storage.lock().await;
|
||||
s.set(k.to_string(), v.to_string());
|
||||
server
|
||||
.offset
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
|
||||
+ 1
|
||||
};
|
||||
resp_and_replicate(server, Protocol::ok(), protocol, is_rep_con).await
|
||||
async fn set_cmd(server: &Server, k: &str, v: &str) -> Result<Protocol, DBError> {
|
||||
server.storage.set(k.to_string(), v.to_string())?;
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
}
|
||||
|
||||
async fn get_cmd(server: &mut Server, k: &str) -> Result<Protocol, DBError> {
|
||||
let v = {
|
||||
let mut s = server.storage.lock().await;
|
||||
s.get(k)
|
||||
};
|
||||
Ok(v.map_or(Protocol::Null, Protocol::SimpleString))
|
||||
async fn get_cmd(server: &Server, k: &str) -> Result<Protocol, DBError> {
|
||||
let v = server.storage.get(k)?;
|
||||
Ok(v.map_or(Protocol::Null, Protocol::BulkString))
|
||||
}
|
||||
|
||||
async fn resp_and_replicate(
|
||||
server: &mut Server,
|
||||
resp: Protocol,
|
||||
replication: Protocol,
|
||||
is_rep_con: bool,
|
||||
) -> Result<Protocol, DBError> {
|
||||
if server.is_master() {
|
||||
server
|
||||
.master_repl_clients
|
||||
.lock()
|
||||
.await
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.send_command(replication)
|
||||
.await?;
|
||||
Ok(resp)
|
||||
} else if !is_rep_con {
|
||||
Ok(Protocol::write_on_slave_err())
|
||||
} else {
|
||||
Ok(resp)
|
||||
// Hash command implementations
|
||||
async fn hset_cmd(server: &Server, key: &str, pairs: &[(String, String)]) -> Result<Protocol, DBError> {
|
||||
let new_fields = server.storage.hset(key, pairs)?;
|
||||
Ok(Protocol::SimpleString(new_fields.to_string()))
|
||||
}
|
||||
|
||||
async fn hget_cmd(server: &Server, key: &str, field: &str) -> Result<Protocol, DBError> {
|
||||
match server.storage.hget(key, field) {
|
||||
Ok(Some(value)) => Ok(Protocol::BulkString(value)),
|
||||
Ok(None) => Ok(Protocol::Null),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
fn split_offset(offset: &str) -> (u64, u64, bool) {
|
||||
let offset_split = offset.split('-').collect::<Vec<_>>();
|
||||
let offset_id = offset_split[0].parse::<u64>().expect(&format!(
|
||||
"ERR The ID specified in XADD must be a number: {}",
|
||||
offset
|
||||
));
|
||||
|
||||
if offset_split.len() == 1 || offset_split[1] == "*" {
|
||||
return (offset_id, if offset_id == 0 { 1 } else { 0 }, true);
|
||||
async fn hgetall_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
||||
match server.storage.hgetall(key) {
|
||||
Ok(pairs) => {
|
||||
let mut result = Vec::new();
|
||||
for (field, value) in pairs {
|
||||
result.push(Protocol::BulkString(field));
|
||||
result.push(Protocol::BulkString(value));
|
||||
}
|
||||
Ok(Protocol::Array(result))
|
||||
}
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hdel_cmd(server: &Server, key: &str, fields: &[String]) -> Result<Protocol, DBError> {
|
||||
match server.storage.hdel(key, fields) {
|
||||
Ok(deleted) => Ok(Protocol::SimpleString(deleted.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hexists_cmd(server: &Server, key: &str, field: &str) -> Result<Protocol, DBError> {
|
||||
match server.storage.hexists(key, field) {
|
||||
Ok(exists) => Ok(Protocol::SimpleString(if exists { "1" } else { "0" }.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hkeys_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
||||
match server.storage.hkeys(key) {
|
||||
Ok(keys) => Ok(Protocol::Array(
|
||||
keys.into_iter().map(Protocol::BulkString).collect(),
|
||||
)),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hvals_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
||||
match server.storage.hvals(key) {
|
||||
Ok(values) => Ok(Protocol::Array(
|
||||
values.into_iter().map(Protocol::BulkString).collect(),
|
||||
)),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hlen_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
||||
match server.storage.hlen(key) {
|
||||
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hmget_cmd(server: &Server, key: &str, fields: &[String]) -> Result<Protocol, DBError> {
|
||||
match server.storage.hmget(key, fields) {
|
||||
Ok(values) => {
|
||||
let result: Vec<Protocol> = values
|
||||
.into_iter()
|
||||
.map(|v| v.map_or(Protocol::Null, Protocol::BulkString))
|
||||
.collect();
|
||||
Ok(Protocol::Array(result))
|
||||
}
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hsetnx_cmd(server: &Server, key: &str, field: &str, value: &str) -> Result<Protocol, DBError> {
|
||||
match server.storage.hsetnx(key, field, value) {
|
||||
Ok(was_set) => Ok(Protocol::SimpleString(if was_set { "1" } else { "0" }.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
|
||||
let offset_seq = offset_split[1].parse::<u64>().unwrap();
|
||||
(offset_id, offset_seq, false)
|
||||
}
|
||||
|
Reference in New Issue
Block a user