...
This commit is contained in:
308
src/age.rs
308
src/age.rs
@@ -1,308 +0,0 @@
|
||||
//! age.rs — AGE (rage) helpers + persistent key management for your mini-Redis.
|
||||
//
|
||||
// Features:
|
||||
// - X25519 encryption/decryption (age style)
|
||||
// - Ed25519 detached signatures + verification
|
||||
// - Persistent named keys in DB (strings):
|
||||
// age:key:{name} -> X25519 recipient (public encryption key, "age1...")
|
||||
// age:privkey:{name} -> X25519 identity (secret encryption key, "AGE-SECRET-KEY-1...")
|
||||
// age:signpub:{name} -> Ed25519 verify pubkey (public, used to verify signatures)
|
||||
// age:signpriv:{name} -> Ed25519 signing secret key (private, used to sign)
|
||||
// - Base64 wrapping for ciphertext/signature binary blobs.
|
||||
|
||||
use std::str::FromStr;
|
||||
|
||||
use secrecy::ExposeSecret;
|
||||
use age::{Decryptor, Encryptor};
|
||||
use age::x25519;
|
||||
|
||||
use ed25519_dalek::{Signature, Signer, Verifier, SigningKey, VerifyingKey};
|
||||
|
||||
use base64::{engine::general_purpose::STANDARD as B64, Engine as _};
|
||||
|
||||
use crate::protocol::Protocol;
|
||||
use crate::server::Server;
|
||||
use crate::error::DBError;
|
||||
|
||||
// ---------- Internal helpers ----------
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum AgeWireError {
|
||||
ParseKey,
|
||||
Crypto(String),
|
||||
Utf8,
|
||||
SignatureLen,
|
||||
NotFound(&'static str), // which kind of key was missing
|
||||
Storage(String),
|
||||
}
|
||||
|
||||
impl AgeWireError {
|
||||
fn to_protocol(self) -> Protocol {
|
||||
match self {
|
||||
AgeWireError::ParseKey => Protocol::err("ERR age: invalid key"),
|
||||
AgeWireError::Crypto(e) => Protocol::err(&format!("ERR age: {e}")),
|
||||
AgeWireError::Utf8 => Protocol::err("ERR age: invalid UTF-8 plaintext"),
|
||||
AgeWireError::SignatureLen => Protocol::err("ERR age: bad signature length"),
|
||||
AgeWireError::NotFound(w) => Protocol::err(&format!("ERR age: missing {w}")),
|
||||
AgeWireError::Storage(e) => Protocol::err(&format!("ERR storage: {e}")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_recipient(s: &str) -> Result<x25519::Recipient, AgeWireError> {
|
||||
x25519::Recipient::from_str(s).map_err(|_| AgeWireError::ParseKey)
|
||||
}
|
||||
fn parse_identity(s: &str) -> Result<x25519::Identity, AgeWireError> {
|
||||
x25519::Identity::from_str(s).map_err(|_| AgeWireError::ParseKey)
|
||||
}
|
||||
fn parse_ed25519_signing_key(s: &str) -> Result<SigningKey, AgeWireError> {
|
||||
// Parse base64-encoded signing key
|
||||
let bytes = B64.decode(s).map_err(|_| AgeWireError::ParseKey)?;
|
||||
if bytes.len() != 32 {
|
||||
return Err(AgeWireError::ParseKey);
|
||||
}
|
||||
let key_bytes: [u8; 32] = bytes.try_into().map_err(|_| AgeWireError::ParseKey)?;
|
||||
Ok(SigningKey::from_bytes(&key_bytes))
|
||||
}
|
||||
fn parse_ed25519_verifying_key(s: &str) -> Result<VerifyingKey, AgeWireError> {
|
||||
// Parse base64-encoded verifying key
|
||||
let bytes = B64.decode(s).map_err(|_| AgeWireError::ParseKey)?;
|
||||
if bytes.len() != 32 {
|
||||
return Err(AgeWireError::ParseKey);
|
||||
}
|
||||
let key_bytes: [u8; 32] = bytes.try_into().map_err(|_| AgeWireError::ParseKey)?;
|
||||
VerifyingKey::from_bytes(&key_bytes).map_err(|_| AgeWireError::ParseKey)
|
||||
}
|
||||
|
||||
// ---------- Stateless crypto helpers (string in/out) ----------
|
||||
|
||||
pub fn gen_enc_keypair() -> (String, String) {
|
||||
let id = x25519::Identity::generate();
|
||||
let pk = id.to_public();
|
||||
(pk.to_string(), id.to_string().expose_secret().to_string()) // (recipient, identity)
|
||||
}
|
||||
|
||||
pub fn gen_sign_keypair() -> (String, String) {
|
||||
use rand::RngCore;
|
||||
use rand::rngs::OsRng;
|
||||
|
||||
// Generate random 32 bytes for the signing key
|
||||
let mut secret_bytes = [0u8; 32];
|
||||
OsRng.fill_bytes(&mut secret_bytes);
|
||||
|
||||
let signing_key = SigningKey::from_bytes(&secret_bytes);
|
||||
let verifying_key = signing_key.verifying_key();
|
||||
|
||||
// Encode as base64 for storage
|
||||
let signing_key_b64 = B64.encode(signing_key.to_bytes());
|
||||
let verifying_key_b64 = B64.encode(verifying_key.to_bytes());
|
||||
|
||||
(verifying_key_b64, signing_key_b64) // (verify_pub, signing_secret)
|
||||
}
|
||||
|
||||
/// Encrypt `msg` for `recipient_str` (X25519). Returns base64(ciphertext).
|
||||
pub fn encrypt_b64(recipient_str: &str, msg: &str) -> Result<String, AgeWireError> {
|
||||
let recipient = parse_recipient(recipient_str)?;
|
||||
let enc = Encryptor::with_recipients(vec![Box::new(recipient)])
|
||||
.expect("failed to create encryptor"); // Handle Option<Encryptor>
|
||||
let mut out = Vec::new();
|
||||
{
|
||||
use std::io::Write;
|
||||
let mut w = enc.wrap_output(&mut out).map_err(|e| AgeWireError::Crypto(e.to_string()))?;
|
||||
w.write_all(msg.as_bytes()).map_err(|e| AgeWireError::Crypto(e.to_string()))?;
|
||||
w.finish().map_err(|e| AgeWireError::Crypto(e.to_string()))?;
|
||||
}
|
||||
Ok(B64.encode(out))
|
||||
}
|
||||
|
||||
/// Decrypt base64(ciphertext) with `identity_str`. Returns plaintext String.
|
||||
pub fn decrypt_b64(identity_str: &str, ct_b64: &str) -> Result<String, AgeWireError> {
|
||||
let id = parse_identity(identity_str)?;
|
||||
let ct = B64.decode(ct_b64.as_bytes()).map_err(|e| AgeWireError::Crypto(e.to_string()))?;
|
||||
let dec = Decryptor::new(&ct[..]).map_err(|e| AgeWireError::Crypto(e.to_string()))?;
|
||||
|
||||
// The decrypt method returns a Result<StreamReader, DecryptError>
|
||||
let mut r = match dec {
|
||||
Decryptor::Recipients(d) => d.decrypt(std::iter::once(&id as &dyn age::Identity))
|
||||
.map_err(|e| AgeWireError::Crypto(e.to_string()))?,
|
||||
Decryptor::Passphrase(_) => return Err(AgeWireError::Crypto("Expected recipients, got passphrase".to_string())),
|
||||
};
|
||||
|
||||
let mut pt = Vec::new();
|
||||
use std::io::Read;
|
||||
r.read_to_end(&mut pt).map_err(|e| AgeWireError::Crypto(e.to_string()))?;
|
||||
String::from_utf8(pt).map_err(|_| AgeWireError::Utf8)
|
||||
}
|
||||
|
||||
/// Sign bytes of `msg` (detached). Returns base64(signature bytes, 64 bytes).
|
||||
pub fn sign_b64(signing_secret_str: &str, msg: &str) -> Result<String, AgeWireError> {
|
||||
let signing_key = parse_ed25519_signing_key(signing_secret_str)?;
|
||||
let sig = signing_key.sign(msg.as_bytes());
|
||||
Ok(B64.encode(sig.to_bytes()))
|
||||
}
|
||||
|
||||
/// Verify detached signature (base64) for `msg` with pubkey.
|
||||
pub fn verify_b64(verify_pub_str: &str, msg: &str, sig_b64: &str) -> Result<bool, AgeWireError> {
|
||||
let verifying_key = parse_ed25519_verifying_key(verify_pub_str)?;
|
||||
let sig_bytes = B64.decode(sig_b64.as_bytes()).map_err(|e| AgeWireError::Crypto(e.to_string()))?;
|
||||
if sig_bytes.len() != 64 {
|
||||
return Err(AgeWireError::SignatureLen);
|
||||
}
|
||||
let sig = Signature::from_bytes(sig_bytes[..].try_into().unwrap());
|
||||
Ok(verifying_key.verify(msg.as_bytes(), &sig).is_ok())
|
||||
}
|
||||
|
||||
// ---------- Storage helpers ----------
|
||||
|
||||
fn sget(server: &Server, key: &str) -> Result<Option<String>, AgeWireError> {
|
||||
let st = server.current_storage().map_err(|e| AgeWireError::Storage(e.0))?;
|
||||
st.get(key).map_err(|e| AgeWireError::Storage(e.0))
|
||||
}
|
||||
fn sset(server: &Server, key: &str, val: &str) -> Result<(), AgeWireError> {
|
||||
let st = server.current_storage().map_err(|e| AgeWireError::Storage(e.0))?;
|
||||
st.set(key.to_string(), val.to_string()).map_err(|e| AgeWireError::Storage(e.0))
|
||||
}
|
||||
|
||||
fn enc_pub_key_key(name: &str) -> String { format!("age:key:{name}") }
|
||||
fn enc_priv_key_key(name: &str) -> String { format!("age:privkey:{name}") }
|
||||
fn sign_pub_key_key(name: &str) -> String { format!("age:signpub:{name}") }
|
||||
fn sign_priv_key_key(name: &str) -> String { format!("age:signpriv:{name}") }
|
||||
|
||||
// ---------- Command handlers (RESP Protocol) ----------
|
||||
// Basic (stateless) ones kept for completeness
|
||||
|
||||
pub async fn cmd_age_genenc() -> Protocol {
|
||||
let (recip, ident) = gen_enc_keypair();
|
||||
Protocol::Array(vec![Protocol::BulkString(recip), Protocol::BulkString(ident)])
|
||||
}
|
||||
|
||||
pub async fn cmd_age_gensign() -> Protocol {
|
||||
let (verify, secret) = gen_sign_keypair();
|
||||
Protocol::Array(vec![Protocol::BulkString(verify), Protocol::BulkString(secret)])
|
||||
}
|
||||
|
||||
pub async fn cmd_age_encrypt(recipient: &str, message: &str) -> Protocol {
|
||||
match encrypt_b64(recipient, message) {
|
||||
Ok(b64) => Protocol::BulkString(b64),
|
||||
Err(e) => e.to_protocol(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cmd_age_decrypt(identity: &str, ct_b64: &str) -> Protocol {
|
||||
match decrypt_b64(identity, ct_b64) {
|
||||
Ok(pt) => Protocol::BulkString(pt),
|
||||
Err(e) => e.to_protocol(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cmd_age_sign(secret: &str, message: &str) -> Protocol {
|
||||
match sign_b64(secret, message) {
|
||||
Ok(b64sig) => Protocol::BulkString(b64sig),
|
||||
Err(e) => e.to_protocol(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cmd_age_verify(verify_pub: &str, message: &str, sig_b64: &str) -> Protocol {
|
||||
match verify_b64(verify_pub, message, sig_b64) {
|
||||
Ok(true) => Protocol::SimpleString("1".to_string()),
|
||||
Ok(false) => Protocol::SimpleString("0".to_string()),
|
||||
Err(e) => e.to_protocol(),
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- NEW: Persistent, named-key commands ----------
|
||||
|
||||
pub async fn cmd_age_keygen(server: &Server, name: &str) -> Protocol {
|
||||
let (recip, ident) = gen_enc_keypair();
|
||||
if let Err(e) = sset(server, &enc_pub_key_key(name), &recip) { return e.to_protocol(); }
|
||||
if let Err(e) = sset(server, &enc_priv_key_key(name), &ident) { return e.to_protocol(); }
|
||||
Protocol::Array(vec![Protocol::BulkString(recip), Protocol::BulkString(ident)])
|
||||
}
|
||||
|
||||
pub async fn cmd_age_signkeygen(server: &Server, name: &str) -> Protocol {
|
||||
let (verify, secret) = gen_sign_keypair();
|
||||
if let Err(e) = sset(server, &sign_pub_key_key(name), &verify) { return e.to_protocol(); }
|
||||
if let Err(e) = sset(server, &sign_priv_key_key(name), &secret) { return e.to_protocol(); }
|
||||
Protocol::Array(vec![Protocol::BulkString(verify), Protocol::BulkString(secret)])
|
||||
}
|
||||
|
||||
pub async fn cmd_age_encrypt_name(server: &Server, name: &str, message: &str) -> Protocol {
|
||||
let recip = match sget(server, &enc_pub_key_key(name)) {
|
||||
Ok(Some(v)) => v,
|
||||
Ok(None) => return AgeWireError::NotFound("recipient (age:key:{name})").to_protocol(),
|
||||
Err(e) => return e.to_protocol(),
|
||||
};
|
||||
match encrypt_b64(&recip, message) {
|
||||
Ok(ct) => Protocol::BulkString(ct),
|
||||
Err(e) => e.to_protocol(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cmd_age_decrypt_name(server: &Server, name: &str, ct_b64: &str) -> Protocol {
|
||||
let ident = match sget(server, &enc_priv_key_key(name)) {
|
||||
Ok(Some(v)) => v,
|
||||
Ok(None) => return AgeWireError::NotFound("identity (age:privkey:{name})").to_protocol(),
|
||||
Err(e) => return e.to_protocol(),
|
||||
};
|
||||
match decrypt_b64(&ident, ct_b64) {
|
||||
Ok(pt) => Protocol::BulkString(pt),
|
||||
Err(e) => e.to_protocol(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cmd_age_sign_name(server: &Server, name: &str, message: &str) -> Protocol {
|
||||
let sec = match sget(server, &sign_priv_key_key(name)) {
|
||||
Ok(Some(v)) => v,
|
||||
Ok(None) => return AgeWireError::NotFound("signing secret (age:signpriv:{name})").to_protocol(),
|
||||
Err(e) => return e.to_protocol(),
|
||||
};
|
||||
match sign_b64(&sec, message) {
|
||||
Ok(sig) => Protocol::BulkString(sig),
|
||||
Err(e) => e.to_protocol(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cmd_age_verify_name(server: &Server, name: &str, message: &str, sig_b64: &str) -> Protocol {
|
||||
let pubk = match sget(server, &sign_pub_key_key(name)) {
|
||||
Ok(Some(v)) => v,
|
||||
Ok(None) => return AgeWireError::NotFound("verify pubkey (age:signpub:{name})").to_protocol(),
|
||||
Err(e) => return e.to_protocol(),
|
||||
};
|
||||
match verify_b64(&pubk, message, sig_b64) {
|
||||
Ok(true) => Protocol::SimpleString("1".to_string()),
|
||||
Ok(false) => Protocol::SimpleString("0".to_string()),
|
||||
Err(e) => e.to_protocol(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cmd_age_list(server: &Server) -> Protocol {
|
||||
// Returns 4 arrays: ["encpub", <names...>], ["encpriv", ...], ["signpub", ...], ["signpriv", ...]
|
||||
let st = match server.current_storage() { Ok(s) => s, Err(e) => return Protocol::err(&e.0) };
|
||||
|
||||
let pull = |pat: &str, prefix: &str| -> Result<Vec<String>, DBError> {
|
||||
let keys = st.keys(pat)?;
|
||||
let mut names: Vec<String> = keys.into_iter()
|
||||
.filter_map(|k| k.strip_prefix(prefix).map(|x| x.to_string()))
|
||||
.collect();
|
||||
names.sort();
|
||||
Ok(names)
|
||||
};
|
||||
|
||||
let encpub = match pull("age:key:*", "age:key:") { Ok(v) => v, Err(e)=> return Protocol::err(&e.0) };
|
||||
let encpriv = match pull("age:privkey:*", "age:privkey:") { Ok(v) => v, Err(e)=> return Protocol::err(&e.0) };
|
||||
let signpub = match pull("age:signpub:*", "age:signpub:") { Ok(v) => v, Err(e)=> return Protocol::err(&e.0) };
|
||||
let signpriv= match pull("age:signpriv:*", "age:signpriv:") { Ok(v) => v, Err(e)=> return Protocol::err(&e.0) };
|
||||
|
||||
let to_arr = |label: &str, v: Vec<String>| {
|
||||
let mut out = vec![Protocol::BulkString(label.to_string())];
|
||||
out.push(Protocol::Array(v.into_iter().map(Protocol::BulkString).collect()));
|
||||
Protocol::Array(out)
|
||||
};
|
||||
|
||||
Protocol::Array(vec![
|
||||
to_arr("encpub", encpub),
|
||||
to_arr("encpriv", encpriv),
|
||||
to_arr("signpub", signpub),
|
||||
to_arr("signpriv", signpriv),
|
||||
])
|
||||
}
|
970
src/cmd.rs
970
src/cmd.rs
@@ -1,970 +0,0 @@
|
||||
use crate::{error::DBError, protocol::Protocol, server::Server};
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Cmd {
|
||||
Ping,
|
||||
Echo(String),
|
||||
Select(u64), // Changed from u16 to u64
|
||||
Get(String),
|
||||
Set(String, String),
|
||||
SetPx(String, String, u128),
|
||||
SetEx(String, String, u128),
|
||||
Keys,
|
||||
ConfigGet(String),
|
||||
Info(Option<String>),
|
||||
Del(String),
|
||||
Type(String),
|
||||
Incr(String),
|
||||
Multi,
|
||||
Exec,
|
||||
Discard,
|
||||
// Hash commands
|
||||
HSet(String, Vec<(String, String)>),
|
||||
HGet(String, String),
|
||||
HGetAll(String),
|
||||
HDel(String, Vec<String>),
|
||||
HExists(String, String),
|
||||
HKeys(String),
|
||||
HVals(String),
|
||||
HLen(String),
|
||||
HMGet(String, Vec<String>),
|
||||
HSetNx(String, String, String),
|
||||
HScan(String, u64, Option<String>, Option<u64>), // key, cursor, pattern, count
|
||||
Scan(u64, Option<String>, Option<u64>), // cursor, pattern, count
|
||||
Ttl(String),
|
||||
Exists(String),
|
||||
Quit,
|
||||
Client(Vec<String>),
|
||||
ClientSetName(String),
|
||||
ClientGetName,
|
||||
// List commands
|
||||
LPush(String, Vec<String>),
|
||||
RPush(String, Vec<String>),
|
||||
LPop(String, Option<u64>),
|
||||
RPop(String, Option<u64>),
|
||||
LLen(String),
|
||||
LRem(String, i64, String),
|
||||
LTrim(String, i64, i64),
|
||||
LIndex(String, i64),
|
||||
LRange(String, i64, i64),
|
||||
FlushDb,
|
||||
Unknow(String),
|
||||
// AGE (rage) commands — stateless
|
||||
AgeGenEnc,
|
||||
AgeGenSign,
|
||||
AgeEncrypt(String, String), // recipient, message
|
||||
AgeDecrypt(String, String), // identity, ciphertext_b64
|
||||
AgeSign(String, String), // signing_secret, message
|
||||
AgeVerify(String, String, String), // verify_pub, message, signature_b64
|
||||
|
||||
// NEW: persistent named-key commands
|
||||
AgeKeygen(String), // name
|
||||
AgeSignKeygen(String), // name
|
||||
AgeEncryptName(String, String), // name, message
|
||||
AgeDecryptName(String, String), // name, ciphertext_b64
|
||||
AgeSignName(String, String), // name, message
|
||||
AgeVerifyName(String, String, String), // name, message, signature_b64
|
||||
AgeList,
|
||||
}
|
||||
|
||||
impl Cmd {
|
||||
pub fn from(s: &str) -> Result<(Self, Protocol, &str), DBError> {
|
||||
let (protocol, remaining) = Protocol::from(s)?;
|
||||
match protocol.clone() {
|
||||
Protocol::Array(p) => {
|
||||
let cmd = p.into_iter().map(|x| x.decode()).collect::<Vec<_>>();
|
||||
if cmd.is_empty() {
|
||||
return Err(DBError("cmd length is 0".to_string()));
|
||||
}
|
||||
Ok((
|
||||
match cmd[0].to_lowercase().as_str() {
|
||||
"select" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError("wrong number of arguments for SELECT".to_string()));
|
||||
}
|
||||
let idx = cmd[1].parse::<u64>().map_err(|_| DBError("ERR DB index is not an integer".to_string()))?;
|
||||
Cmd::Select(idx)
|
||||
}
|
||||
"echo" => Cmd::Echo(cmd[1].clone()),
|
||||
"ping" => Cmd::Ping,
|
||||
"get" => Cmd::Get(cmd[1].clone()),
|
||||
"set" => {
|
||||
if cmd.len() == 5 && cmd[3].to_lowercase() == "px" {
|
||||
Cmd::SetPx(cmd[1].clone(), cmd[2].clone(), cmd[4].parse().unwrap())
|
||||
} else if cmd.len() == 5 && cmd[3].to_lowercase() == "ex" {
|
||||
Cmd::SetEx(cmd[1].clone(), cmd[2].clone(), cmd[4].parse().unwrap())
|
||||
} else if cmd.len() == 3 {
|
||||
Cmd::Set(cmd[1].clone(), cmd[2].clone())
|
||||
} else {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
}
|
||||
}
|
||||
"setex" => {
|
||||
if cmd.len() != 4 {
|
||||
return Err(DBError(format!("wrong number of arguments for SETEX command")));
|
||||
}
|
||||
Cmd::SetEx(cmd[1].clone(), cmd[3].clone(), cmd[2].parse().unwrap())
|
||||
}
|
||||
"config" => {
|
||||
if cmd.len() != 3 || cmd[1].to_lowercase() != "get" {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
} else {
|
||||
Cmd::ConfigGet(cmd[2].clone())
|
||||
}
|
||||
}
|
||||
"keys" => {
|
||||
if cmd.len() != 2 || cmd[1] != "*" {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
} else {
|
||||
Cmd::Keys
|
||||
}
|
||||
}
|
||||
"info" => {
|
||||
let section = if cmd.len() == 2 {
|
||||
Some(cmd[1].clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Cmd::Info(section)
|
||||
}
|
||||
"del" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
}
|
||||
Cmd::Del(cmd[1].clone())
|
||||
}
|
||||
"type" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
}
|
||||
Cmd::Type(cmd[1].clone())
|
||||
}
|
||||
"incr" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
}
|
||||
Cmd::Incr(cmd[1].clone())
|
||||
}
|
||||
"multi" => {
|
||||
if cmd.len() != 1 {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
}
|
||||
Cmd::Multi
|
||||
}
|
||||
"exec" => {
|
||||
if cmd.len() != 1 {
|
||||
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
|
||||
}
|
||||
Cmd::Exec
|
||||
}
|
||||
"discard" => Cmd::Discard,
|
||||
// Hash commands
|
||||
"hset" => {
|
||||
if cmd.len() < 4 || (cmd.len() - 2) % 2 != 0 {
|
||||
return Err(DBError(format!("wrong number of arguments for HSET command")));
|
||||
}
|
||||
let mut pairs = Vec::new();
|
||||
let mut i = 2;
|
||||
while i + 1 < cmd.len() {
|
||||
pairs.push((cmd[i].clone(), cmd[i + 1].clone()));
|
||||
i += 2;
|
||||
}
|
||||
Cmd::HSet(cmd[1].clone(), pairs)
|
||||
}
|
||||
"hget" => {
|
||||
if cmd.len() != 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for HGET command")));
|
||||
}
|
||||
Cmd::HGet(cmd[1].clone(), cmd[2].clone())
|
||||
}
|
||||
"hgetall" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("wrong number of arguments for HGETALL command")));
|
||||
}
|
||||
Cmd::HGetAll(cmd[1].clone())
|
||||
}
|
||||
"hdel" => {
|
||||
if cmd.len() < 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for HDEL command")));
|
||||
}
|
||||
Cmd::HDel(cmd[1].clone(), cmd[2..].to_vec())
|
||||
}
|
||||
"hexists" => {
|
||||
if cmd.len() != 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for HEXISTS command")));
|
||||
}
|
||||
Cmd::HExists(cmd[1].clone(), cmd[2].clone())
|
||||
}
|
||||
"hkeys" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("wrong number of arguments for HKEYS command")));
|
||||
}
|
||||
Cmd::HKeys(cmd[1].clone())
|
||||
}
|
||||
"hvals" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("wrong number of arguments for HVALS command")));
|
||||
}
|
||||
Cmd::HVals(cmd[1].clone())
|
||||
}
|
||||
"hlen" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("wrong number of arguments for HLEN command")));
|
||||
}
|
||||
Cmd::HLen(cmd[1].clone())
|
||||
}
|
||||
"hmget" => {
|
||||
if cmd.len() < 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for HMGET command")));
|
||||
}
|
||||
Cmd::HMGet(cmd[1].clone(), cmd[2..].to_vec())
|
||||
}
|
||||
"hsetnx" => {
|
||||
if cmd.len() != 4 {
|
||||
return Err(DBError(format!("wrong number of arguments for HSETNX command")));
|
||||
}
|
||||
Cmd::HSetNx(cmd[1].clone(), cmd[2].clone(), cmd[3].clone())
|
||||
}
|
||||
"hscan" => {
|
||||
if cmd.len() < 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for HSCAN command")));
|
||||
}
|
||||
|
||||
let key = cmd[1].clone();
|
||||
let cursor = cmd[2].parse::<u64>().map_err(|_|
|
||||
DBError("ERR invalid cursor".to_string()))?;
|
||||
|
||||
let mut pattern = None;
|
||||
let mut count = None;
|
||||
let mut i = 3;
|
||||
|
||||
while i < cmd.len() {
|
||||
match cmd[i].to_lowercase().as_str() {
|
||||
"match" => {
|
||||
if i + 1 >= cmd.len() {
|
||||
return Err(DBError("ERR syntax error".to_string()));
|
||||
}
|
||||
pattern = Some(cmd[i + 1].clone());
|
||||
i += 2;
|
||||
}
|
||||
"count" => {
|
||||
if i + 1 >= cmd.len() {
|
||||
return Err(DBError("ERR syntax error".to_string()));
|
||||
}
|
||||
count = Some(cmd[i + 1].parse::<u64>().map_err(|_|
|
||||
DBError("ERR value is not an integer or out of range".to_string()))?);
|
||||
i += 2;
|
||||
}
|
||||
_ => {
|
||||
return Err(DBError(format!("ERR syntax error")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Cmd::HScan(key, cursor, pattern, count)
|
||||
}
|
||||
"scan" => {
|
||||
if cmd.len() < 2 {
|
||||
return Err(DBError(format!("wrong number of arguments for SCAN command")));
|
||||
}
|
||||
|
||||
let cursor = cmd[1].parse::<u64>().map_err(|_|
|
||||
DBError("ERR invalid cursor".to_string()))?;
|
||||
|
||||
let mut pattern = None;
|
||||
let mut count = None;
|
||||
let mut i = 2;
|
||||
|
||||
while i < cmd.len() {
|
||||
match cmd[i].to_lowercase().as_str() {
|
||||
"match" => {
|
||||
if i + 1 >= cmd.len() {
|
||||
return Err(DBError("ERR syntax error".to_string()));
|
||||
}
|
||||
pattern = Some(cmd[i + 1].clone());
|
||||
i += 2;
|
||||
}
|
||||
"count" => {
|
||||
if i + 1 >= cmd.len() {
|
||||
return Err(DBError("ERR syntax error".to_string()));
|
||||
}
|
||||
count = Some(cmd[i + 1].parse::<u64>().map_err(|_|
|
||||
DBError("ERR value is not an integer or out of range".to_string()))?);
|
||||
i += 2;
|
||||
}
|
||||
_ => {
|
||||
return Err(DBError(format!("ERR syntax error")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Cmd::Scan(cursor, pattern, count)
|
||||
}
|
||||
"ttl" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("wrong number of arguments for TTL command")));
|
||||
}
|
||||
Cmd::Ttl(cmd[1].clone())
|
||||
}
|
||||
"exists" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("wrong number of arguments for EXISTS command")));
|
||||
}
|
||||
Cmd::Exists(cmd[1].clone())
|
||||
}
|
||||
"quit" => {
|
||||
if cmd.len() != 1 {
|
||||
return Err(DBError(format!("wrong number of arguments for QUIT command")));
|
||||
}
|
||||
Cmd::Quit
|
||||
}
|
||||
"client" => {
|
||||
if cmd.len() > 1 {
|
||||
match cmd[1].to_lowercase().as_str() {
|
||||
"setname" => {
|
||||
if cmd.len() == 3 {
|
||||
Cmd::ClientSetName(cmd[2].clone())
|
||||
} else {
|
||||
return Err(DBError("wrong number of arguments for 'client setname' command".to_string()));
|
||||
}
|
||||
}
|
||||
"getname" => {
|
||||
if cmd.len() == 2 {
|
||||
Cmd::ClientGetName
|
||||
} else {
|
||||
return Err(DBError("wrong number of arguments for 'client getname' command".to_string()));
|
||||
}
|
||||
}
|
||||
_ => Cmd::Client(cmd[1..].to_vec()),
|
||||
}
|
||||
} else {
|
||||
Cmd::Client(vec![])
|
||||
}
|
||||
}
|
||||
"lpush" => {
|
||||
if cmd.len() < 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for LPUSH command")));
|
||||
}
|
||||
Cmd::LPush(cmd[1].clone(), cmd[2..].to_vec())
|
||||
}
|
||||
"rpush" => {
|
||||
if cmd.len() < 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for RPUSH command")));
|
||||
}
|
||||
Cmd::RPush(cmd[1].clone(), cmd[2..].to_vec())
|
||||
}
|
||||
"lpop" => {
|
||||
if cmd.len() < 2 || cmd.len() > 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for LPOP command")));
|
||||
}
|
||||
let count = if cmd.len() == 3 {
|
||||
Some(cmd[2].parse::<u64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Cmd::LPop(cmd[1].clone(), count)
|
||||
}
|
||||
"rpop" => {
|
||||
if cmd.len() < 2 || cmd.len() > 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for RPOP command")));
|
||||
}
|
||||
let count = if cmd.len() == 3 {
|
||||
Some(cmd[2].parse::<u64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Cmd::RPop(cmd[1].clone(), count)
|
||||
}
|
||||
"llen" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError(format!("wrong number of arguments for LLEN command")));
|
||||
}
|
||||
Cmd::LLen(cmd[1].clone())
|
||||
}
|
||||
"lrem" => {
|
||||
if cmd.len() != 4 {
|
||||
return Err(DBError(format!("wrong number of arguments for LREM command")));
|
||||
}
|
||||
let count = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
|
||||
Cmd::LRem(cmd[1].clone(), count, cmd[3].clone())
|
||||
}
|
||||
"ltrim" => {
|
||||
if cmd.len() != 4 {
|
||||
return Err(DBError(format!("wrong number of arguments for LTRIM command")));
|
||||
}
|
||||
let start = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
|
||||
let stop = cmd[3].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
|
||||
Cmd::LTrim(cmd[1].clone(), start, stop)
|
||||
}
|
||||
"lindex" => {
|
||||
if cmd.len() != 3 {
|
||||
return Err(DBError(format!("wrong number of arguments for LINDEX command")));
|
||||
}
|
||||
let index = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
|
||||
Cmd::LIndex(cmd[1].clone(), index)
|
||||
}
|
||||
"lrange" => {
|
||||
if cmd.len() != 4 {
|
||||
return Err(DBError(format!("wrong number of arguments for LRANGE command")));
|
||||
}
|
||||
let start = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
|
||||
let stop = cmd[3].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
|
||||
Cmd::LRange(cmd[1].clone(), start, stop)
|
||||
}
|
||||
"flushdb" => {
|
||||
if cmd.len() != 1 {
|
||||
return Err(DBError("wrong number of arguments for FLUSHDB command".to_string()));
|
||||
}
|
||||
Cmd::FlushDb
|
||||
}
|
||||
"age" => {
|
||||
if cmd.len() < 2 {
|
||||
return Err(DBError("wrong number of arguments for AGE".to_string()));
|
||||
}
|
||||
match cmd[1].to_lowercase().as_str() {
|
||||
// stateless
|
||||
"genenc" => { if cmd.len() != 2 { return Err(DBError("AGE GENENC takes no args".to_string())); }
|
||||
Cmd::AgeGenEnc }
|
||||
"gensign" => { if cmd.len() != 2 { return Err(DBError("AGE GENSIGN takes no args".to_string())); }
|
||||
Cmd::AgeGenSign }
|
||||
"encrypt" => { if cmd.len() != 4 { return Err(DBError("AGE ENCRYPT <recipient> <message>".to_string())); }
|
||||
Cmd::AgeEncrypt(cmd[2].clone(), cmd[3].clone()) }
|
||||
"decrypt" => { if cmd.len() != 4 { return Err(DBError("AGE DECRYPT <identity> <ciphertext_b64>".to_string())); }
|
||||
Cmd::AgeDecrypt(cmd[2].clone(), cmd[3].clone()) }
|
||||
"sign" => { if cmd.len() != 4 { return Err(DBError("AGE SIGN <signing_secret> <message>".to_string())); }
|
||||
Cmd::AgeSign(cmd[2].clone(), cmd[3].clone()) }
|
||||
"verify" => { if cmd.len() != 5 { return Err(DBError("AGE VERIFY <verify_pub> <message> <signature_b64>".to_string())); }
|
||||
Cmd::AgeVerify(cmd[2].clone(), cmd[3].clone(), cmd[4].clone()) }
|
||||
|
||||
// persistent names
|
||||
"keygen" => { if cmd.len() != 3 { return Err(DBError("AGE KEYGEN <name>".to_string())); }
|
||||
Cmd::AgeKeygen(cmd[2].clone()) }
|
||||
"signkeygen" => { if cmd.len() != 3 { return Err(DBError("AGE SIGNKEYGEN <name>".to_string())); }
|
||||
Cmd::AgeSignKeygen(cmd[2].clone()) }
|
||||
"encryptname" => { if cmd.len() != 4 { return Err(DBError("AGE ENCRYPTNAME <name> <message>".to_string())); }
|
||||
Cmd::AgeEncryptName(cmd[2].clone(), cmd[3].clone()) }
|
||||
"decryptname" => { if cmd.len() != 4 { return Err(DBError("AGE DECRYPTNAME <name> <ciphertext_b64>".to_string())); }
|
||||
Cmd::AgeDecryptName(cmd[2].clone(), cmd[3].clone()) }
|
||||
"signname" => { if cmd.len() != 4 { return Err(DBError("AGE SIGNNAME <name> <message>".to_string())); }
|
||||
Cmd::AgeSignName(cmd[2].clone(), cmd[3].clone()) }
|
||||
"verifyname" => { if cmd.len() != 5 { return Err(DBError("AGE VERIFYNAME <name> <message> <signature_b64>".to_string())); }
|
||||
Cmd::AgeVerifyName(cmd[2].clone(), cmd[3].clone(), cmd[4].clone()) }
|
||||
"list" => { if cmd.len() != 2 { return Err(DBError("AGE LIST".to_string())); }
|
||||
Cmd::AgeList }
|
||||
_ => return Err(DBError(format!("unsupported AGE subcommand {:?}", cmd))),
|
||||
}
|
||||
}
|
||||
_ => Cmd::Unknow(cmd[0].clone()),
|
||||
},
|
||||
protocol,
|
||||
remaining
|
||||
))
|
||||
}
|
||||
_ => Err(DBError(format!(
|
||||
"fail to parse as cmd for {:?}",
|
||||
protocol
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(self, server: &mut Server) -> Result<Protocol, DBError> {
|
||||
// Handle queued commands for transactions
|
||||
if server.queued_cmd.is_some()
|
||||
&& !matches!(self, Cmd::Exec)
|
||||
&& !matches!(self, Cmd::Multi)
|
||||
&& !matches!(self, Cmd::Discard)
|
||||
{
|
||||
let protocol = self.clone().to_protocol();
|
||||
server.queued_cmd.as_mut().unwrap().push((self, protocol));
|
||||
return Ok(Protocol::SimpleString("QUEUED".to_string()));
|
||||
}
|
||||
|
||||
match self {
|
||||
Cmd::Select(db) => select_cmd(server, db).await,
|
||||
Cmd::Ping => Ok(Protocol::SimpleString("PONG".to_string())),
|
||||
Cmd::Echo(s) => Ok(Protocol::BulkString(s)),
|
||||
Cmd::Get(k) => get_cmd(server, &k).await,
|
||||
Cmd::Set(k, v) => set_cmd(server, &k, &v).await,
|
||||
Cmd::SetPx(k, v, x) => set_px_cmd(server, &k, &v, &x).await,
|
||||
Cmd::SetEx(k, v, x) => set_ex_cmd(server, &k, &v, &x).await,
|
||||
Cmd::Del(k) => del_cmd(server, &k).await,
|
||||
Cmd::ConfigGet(name) => config_get_cmd(&name, server),
|
||||
Cmd::Keys => keys_cmd(server).await,
|
||||
Cmd::Info(section) => info_cmd(server, §ion).await,
|
||||
Cmd::Type(k) => type_cmd(server, &k).await,
|
||||
Cmd::Incr(key) => incr_cmd(server, &key).await,
|
||||
Cmd::Multi => {
|
||||
server.queued_cmd = Some(Vec::<(Cmd, Protocol)>::new());
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
}
|
||||
Cmd::Exec => exec_cmd(server).await,
|
||||
Cmd::Discard => {
|
||||
if server.queued_cmd.is_some() {
|
||||
server.queued_cmd = None;
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
} else {
|
||||
Ok(Protocol::err("ERR DISCARD without MULTI"))
|
||||
}
|
||||
}
|
||||
// Hash commands
|
||||
Cmd::HSet(key, pairs) => hset_cmd(server, &key, &pairs).await,
|
||||
Cmd::HGet(key, field) => hget_cmd(server, &key, &field).await,
|
||||
Cmd::HGetAll(key) => hgetall_cmd(server, &key).await,
|
||||
Cmd::HDel(key, fields) => hdel_cmd(server, &key, &fields).await,
|
||||
Cmd::HExists(key, field) => hexists_cmd(server, &key, &field).await,
|
||||
Cmd::HKeys(key) => hkeys_cmd(server, &key).await,
|
||||
Cmd::HVals(key) => hvals_cmd(server, &key).await,
|
||||
Cmd::HLen(key) => hlen_cmd(server, &key).await,
|
||||
Cmd::HMGet(key, fields) => hmget_cmd(server, &key, &fields).await,
|
||||
Cmd::HSetNx(key, field, value) => hsetnx_cmd(server, &key, &field, &value).await,
|
||||
Cmd::HScan(key, cursor, pattern, count) => hscan_cmd(server, &key, &cursor, pattern.as_deref(), &count).await,
|
||||
Cmd::Scan(cursor, pattern, count) => scan_cmd(server, &cursor, pattern.as_deref(), &count).await,
|
||||
Cmd::Ttl(key) => ttl_cmd(server, &key).await,
|
||||
Cmd::Exists(key) => exists_cmd(server, &key).await,
|
||||
Cmd::Quit => Ok(Protocol::SimpleString("OK".to_string())),
|
||||
Cmd::Client(_) => Ok(Protocol::SimpleString("OK".to_string())),
|
||||
Cmd::ClientSetName(name) => client_setname_cmd(server, &name).await,
|
||||
Cmd::ClientGetName => client_getname_cmd(server).await,
|
||||
// List commands
|
||||
Cmd::LPush(key, elements) => lpush_cmd(server, &key, &elements).await,
|
||||
Cmd::RPush(key, elements) => rpush_cmd(server, &key, &elements).await,
|
||||
Cmd::LPop(key, count) => lpop_cmd(server, &key, &count).await,
|
||||
Cmd::RPop(key, count) => rpop_cmd(server, &key, &count).await,
|
||||
Cmd::LLen(key) => llen_cmd(server, &key).await,
|
||||
Cmd::LRem(key, count, element) => lrem_cmd(server, &key, count, &element).await,
|
||||
Cmd::LTrim(key, start, stop) => ltrim_cmd(server, &key, start, stop).await,
|
||||
Cmd::LIndex(key, index) => lindex_cmd(server, &key, index).await,
|
||||
Cmd::LRange(key, start, stop) => lrange_cmd(server, &key, start, stop).await,
|
||||
Cmd::FlushDb => flushdb_cmd(server).await,
|
||||
// AGE (rage): stateless
|
||||
Cmd::AgeGenEnc => Ok(crate::age::cmd_age_genenc().await),
|
||||
Cmd::AgeGenSign => Ok(crate::age::cmd_age_gensign().await),
|
||||
Cmd::AgeEncrypt(recipient, message) => Ok(crate::age::cmd_age_encrypt(&recipient, &message).await),
|
||||
Cmd::AgeDecrypt(identity, ct_b64) => Ok(crate::age::cmd_age_decrypt(&identity, &ct_b64).await),
|
||||
Cmd::AgeSign(secret, message) => Ok(crate::age::cmd_age_sign(&secret, &message).await),
|
||||
Cmd::AgeVerify(vpub, msg, sig_b64) => Ok(crate::age::cmd_age_verify(&vpub, &msg, &sig_b64).await),
|
||||
|
||||
// AGE (rage): persistent named keys
|
||||
Cmd::AgeKeygen(name) => Ok(crate::age::cmd_age_keygen(server, &name).await),
|
||||
Cmd::AgeSignKeygen(name) => Ok(crate::age::cmd_age_signkeygen(server, &name).await),
|
||||
Cmd::AgeEncryptName(name, message) => Ok(crate::age::cmd_age_encrypt_name(server, &name, &message).await),
|
||||
Cmd::AgeDecryptName(name, ct_b64) => Ok(crate::age::cmd_age_decrypt_name(server, &name, &ct_b64).await),
|
||||
Cmd::AgeSignName(name, message) => Ok(crate::age::cmd_age_sign_name(server, &name, &message).await),
|
||||
Cmd::AgeVerifyName(name, message, sig_b64) => Ok(crate::age::cmd_age_verify_name(server, &name, &message, &sig_b64).await),
|
||||
Cmd::AgeList => Ok(crate::age::cmd_age_list(server).await),
|
||||
Cmd::Unknow(s) => Ok(Protocol::err(&format!("ERR unknown command `{}`", s))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_protocol(self) -> Protocol {
|
||||
match self {
|
||||
Cmd::Select(db) => Protocol::Array(vec![Protocol::BulkString("select".to_string()), Protocol::BulkString(db.to_string())]),
|
||||
Cmd::Ping => Protocol::Array(vec![Protocol::BulkString("ping".to_string())]),
|
||||
Cmd::Echo(s) => Protocol::Array(vec![Protocol::BulkString("echo".to_string()), Protocol::BulkString(s)]),
|
||||
Cmd::Get(k) => Protocol::Array(vec![Protocol::BulkString("get".to_string()), Protocol::BulkString(k)]),
|
||||
Cmd::Set(k, v) => Protocol::Array(vec![Protocol::BulkString("set".to_string()), Protocol::BulkString(k), Protocol::BulkString(v)]),
|
||||
_ => Protocol::SimpleString("...".to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn flushdb_cmd(server: &mut Server) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.flushdb() {
|
||||
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn select_cmd(server: &mut Server, db: u64) -> Result<Protocol, DBError> {
|
||||
// Test if we can access the database (this will create it if needed)
|
||||
server.selected_db = db;
|
||||
match server.current_storage() {
|
||||
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn lindex_cmd(server: &Server, key: &str, index: i64) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.lindex(key, index) {
|
||||
Ok(Some(element)) => Ok(Protocol::BulkString(element)),
|
||||
Ok(None) => Ok(Protocol::Null),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn lrange_cmd(server: &Server, key: &str, start: i64, stop: i64) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.lrange(key, start, stop) {
|
||||
Ok(elements) => Ok(Protocol::Array(elements.into_iter().map(Protocol::BulkString).collect())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn ltrim_cmd(server: &Server, key: &str, start: i64, stop: i64) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.ltrim(key, start, stop) {
|
||||
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn lrem_cmd(server: &Server, key: &str, count: i64, element: &str) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.lrem(key, count, element) {
|
||||
Ok(removed_count) => Ok(Protocol::SimpleString(removed_count.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn llen_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.llen(key) {
|
||||
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn lpop_cmd(server: &Server, key: &str, count: &Option<u64>) -> Result<Protocol, DBError> {
|
||||
let count_val = count.unwrap_or(1);
|
||||
match server.current_storage()?.lpop(key, count_val) {
|
||||
Ok(elements) => {
|
||||
if elements.is_empty() {
|
||||
if count.is_some() {
|
||||
Ok(Protocol::Array(vec![]))
|
||||
} else {
|
||||
Ok(Protocol::Null)
|
||||
}
|
||||
} else if count.is_some() {
|
||||
Ok(Protocol::Array(elements.into_iter().map(Protocol::BulkString).collect()))
|
||||
} else {
|
||||
Ok(Protocol::BulkString(elements[0].clone()))
|
||||
}
|
||||
},
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn rpop_cmd(server: &Server, key: &str, count: &Option<u64>) -> Result<Protocol, DBError> {
|
||||
let count_val = count.unwrap_or(1);
|
||||
match server.current_storage()?.rpop(key, count_val) {
|
||||
Ok(elements) => {
|
||||
if elements.is_empty() {
|
||||
if count.is_some() {
|
||||
Ok(Protocol::Array(vec![]))
|
||||
} else {
|
||||
Ok(Protocol::Null)
|
||||
}
|
||||
} else if count.is_some() {
|
||||
Ok(Protocol::Array(elements.into_iter().map(Protocol::BulkString).collect()))
|
||||
} else {
|
||||
Ok(Protocol::BulkString(elements[0].clone()))
|
||||
}
|
||||
},
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn lpush_cmd(server: &Server, key: &str, elements: &[String]) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.lpush(key, elements.to_vec()) {
|
||||
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn rpush_cmd(server: &Server, key: &str, elements: &[String]) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.rpush(key, elements.to_vec()) {
|
||||
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn exec_cmd(server: &mut Server) -> Result<Protocol, DBError> {
|
||||
// Move the queued commands out of `server` so we drop the borrow immediately.
|
||||
let cmds = if let Some(cmds) = server.queued_cmd.take() {
|
||||
cmds
|
||||
} else {
|
||||
return Ok(Protocol::err("ERR EXEC without MULTI"));
|
||||
};
|
||||
|
||||
let mut out = Vec::new();
|
||||
for (cmd, _) in cmds {
|
||||
// Use Box::pin to handle recursion in async function
|
||||
let res = Box::pin(cmd.run(server)).await?;
|
||||
out.push(res);
|
||||
}
|
||||
Ok(Protocol::Array(out))
|
||||
}
|
||||
|
||||
async fn incr_cmd(server: &Server, key: &String) -> Result<Protocol, DBError> {
|
||||
let storage = server.current_storage()?;
|
||||
let current_value = storage.get(key)?;
|
||||
|
||||
let new_value = match current_value {
|
||||
Some(v) => {
|
||||
match v.parse::<i64>() {
|
||||
Ok(num) => num + 1,
|
||||
Err(_) => return Ok(Protocol::err("ERR value is not an integer or out of range")),
|
||||
}
|
||||
}
|
||||
None => 1,
|
||||
};
|
||||
|
||||
storage.set(key.clone(), new_value.to_string())?;
|
||||
Ok(Protocol::SimpleString(new_value.to_string()))
|
||||
}
|
||||
|
||||
fn config_get_cmd(name: &String, server: &Server) -> Result<Protocol, DBError> {
|
||||
let value = match name.as_str() {
|
||||
"dir" => Some(server.option.dir.clone()),
|
||||
"dbfilename" => Some(format!("{}.db", server.selected_db)),
|
||||
"databases" => Some("16".to_string()), // Hardcoded as per original logic
|
||||
_ => None,
|
||||
};
|
||||
|
||||
if let Some(val) = value {
|
||||
Ok(Protocol::Array(vec![
|
||||
Protocol::BulkString(name.clone()),
|
||||
Protocol::BulkString(val),
|
||||
]))
|
||||
} else {
|
||||
// Return an empty array for unknown config options, which is standard Redis behavior
|
||||
Ok(Protocol::Array(vec![]))
|
||||
}
|
||||
}
|
||||
|
||||
async fn keys_cmd(server: &Server) -> Result<Protocol, DBError> {
|
||||
let keys = server.current_storage()?.keys("*")?;
|
||||
Ok(Protocol::Array(
|
||||
keys.into_iter().map(Protocol::BulkString).collect(),
|
||||
))
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct ServerInfo {
|
||||
redis_version: String,
|
||||
encrypted: bool,
|
||||
selected_db: u64,
|
||||
}
|
||||
|
||||
async fn info_cmd(server: &Server, section: &Option<String>) -> Result<Protocol, DBError> {
|
||||
let info = ServerInfo {
|
||||
redis_version: "7.0.0".to_string(),
|
||||
encrypted: server.current_storage()?.is_encrypted(),
|
||||
selected_db: server.selected_db,
|
||||
};
|
||||
|
||||
let mut info_string = String::new();
|
||||
info_string.push_str(&format!("# Server\n"));
|
||||
info_string.push_str(&format!("redis_version:{}\n", info.redis_version));
|
||||
info_string.push_str(&format!("encrypted:{}\n", if info.encrypted { 1 } else { 0 }));
|
||||
info_string.push_str(&format!("# Keyspace\n"));
|
||||
info_string.push_str(&format!("db{}:keys=0,expires=0,avg_ttl=0\n", info.selected_db));
|
||||
|
||||
|
||||
match section {
|
||||
Some(s) => match s.as_str() {
|
||||
"replication" => Ok(Protocol::BulkString(
|
||||
"role:master\nmaster_replid:8371b4fb1155b71f4a04d3e1bc3e18c4a990aeea\nmaster_repl_offset:0\n".to_string()
|
||||
)),
|
||||
_ => Err(DBError(format!("unsupported section {:?}", s))),
|
||||
},
|
||||
None => {
|
||||
Ok(Protocol::BulkString(info_string))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn type_cmd(server: &Server, k: &String) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.get_key_type(k)? {
|
||||
Some(type_str) => Ok(Protocol::SimpleString(type_str)),
|
||||
None => Ok(Protocol::SimpleString("none".to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn del_cmd(server: &Server, k: &str) -> Result<Protocol, DBError> {
|
||||
server.current_storage()?.del(k.to_string())?;
|
||||
Ok(Protocol::SimpleString("1".to_string()))
|
||||
}
|
||||
|
||||
async fn set_ex_cmd(
|
||||
server: &Server,
|
||||
k: &str,
|
||||
v: &str,
|
||||
x: &u128,
|
||||
) -> Result<Protocol, DBError> {
|
||||
server.current_storage()?.setx(k.to_string(), v.to_string(), *x * 1000)?;
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
}
|
||||
|
||||
async fn set_px_cmd(
|
||||
server: &Server,
|
||||
k: &str,
|
||||
v: &str,
|
||||
x: &u128,
|
||||
) -> Result<Protocol, DBError> {
|
||||
server.current_storage()?.setx(k.to_string(), v.to_string(), *x)?;
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
}
|
||||
|
||||
async fn set_cmd(server: &Server, k: &str, v: &str) -> Result<Protocol, DBError> {
|
||||
server.current_storage()?.set(k.to_string(), v.to_string())?;
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
}
|
||||
|
||||
async fn get_cmd(server: &Server, k: &str) -> Result<Protocol, DBError> {
|
||||
let v = server.current_storage()?.get(k)?;
|
||||
Ok(v.map_or(Protocol::Null, Protocol::BulkString))
|
||||
}
|
||||
|
||||
// Hash command implementations
|
||||
async fn hset_cmd(server: &Server, key: &str, pairs: &[(String, String)]) -> Result<Protocol, DBError> {
|
||||
let new_fields = server.current_storage()?.hset(key, pairs.to_vec())?;
|
||||
Ok(Protocol::SimpleString(new_fields.to_string()))
|
||||
}
|
||||
|
||||
async fn hget_cmd(server: &Server, key: &str, field: &str) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.hget(key, field) {
|
||||
Ok(Some(value)) => Ok(Protocol::BulkString(value)),
|
||||
Ok(None) => Ok(Protocol::Null),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hgetall_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.hgetall(key) {
|
||||
Ok(pairs) => {
|
||||
let mut result = Vec::new();
|
||||
for (field, value) in pairs {
|
||||
result.push(Protocol::BulkString(field));
|
||||
result.push(Protocol::BulkString(value));
|
||||
}
|
||||
Ok(Protocol::Array(result))
|
||||
}
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hdel_cmd(server: &Server, key: &str, fields: &[String]) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.hdel(key, fields.to_vec()) {
|
||||
Ok(deleted) => Ok(Protocol::SimpleString(deleted.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hexists_cmd(server: &Server, key: &str, field: &str) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.hexists(key, field) {
|
||||
Ok(exists) => Ok(Protocol::SimpleString(if exists { "1" } else { "0" }.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hkeys_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.hkeys(key) {
|
||||
Ok(keys) => Ok(Protocol::Array(
|
||||
keys.into_iter().map(Protocol::BulkString).collect(),
|
||||
)),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hvals_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.hvals(key) {
|
||||
Ok(values) => Ok(Protocol::Array(
|
||||
values.into_iter().map(Protocol::BulkString).collect(),
|
||||
)),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hlen_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.hlen(key) {
|
||||
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hmget_cmd(server: &Server, key: &str, fields: &[String]) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.hmget(key, fields.to_vec()) {
|
||||
Ok(values) => {
|
||||
let result: Vec<Protocol> = values
|
||||
.into_iter()
|
||||
.map(|v| v.map_or(Protocol::Null, Protocol::BulkString))
|
||||
.collect();
|
||||
Ok(Protocol::Array(result))
|
||||
}
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hsetnx_cmd(server: &Server, key: &str, field: &str, value: &str) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.hsetnx(key, field, value) {
|
||||
Ok(was_set) => Ok(Protocol::SimpleString(if was_set { "1" } else { "0" }.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn scan_cmd(
|
||||
server: &Server,
|
||||
cursor: &u64,
|
||||
pattern: Option<&str>,
|
||||
count: &Option<u64>
|
||||
) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.scan(*cursor, pattern, *count) {
|
||||
Ok((next_cursor, key_value_pairs)) => {
|
||||
let mut result = Vec::new();
|
||||
result.push(Protocol::BulkString(next_cursor.to_string()));
|
||||
// For SCAN, we only return the keys, not the values
|
||||
let keys: Vec<Protocol> = key_value_pairs.into_iter().map(|(key, _)| Protocol::BulkString(key)).collect();
|
||||
result.push(Protocol::Array(keys));
|
||||
Ok(Protocol::Array(result))
|
||||
}
|
||||
Err(e) => Ok(Protocol::err(&format!("ERR {}", e.0))),
|
||||
}
|
||||
}
|
||||
|
||||
async fn hscan_cmd(
|
||||
server: &Server,
|
||||
key: &str,
|
||||
cursor: &u64,
|
||||
pattern: Option<&str>,
|
||||
count: &Option<u64>
|
||||
) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.hscan(key, *cursor, pattern, *count) {
|
||||
Ok((next_cursor, field_value_pairs)) => {
|
||||
let mut result = Vec::new();
|
||||
result.push(Protocol::BulkString(next_cursor.to_string()));
|
||||
// For HSCAN, we return field-value pairs flattened
|
||||
let mut fields_and_values = Vec::new();
|
||||
for (field, value) in field_value_pairs {
|
||||
fields_and_values.push(Protocol::BulkString(field));
|
||||
fields_and_values.push(Protocol::BulkString(value));
|
||||
}
|
||||
result.push(Protocol::Array(fields_and_values));
|
||||
Ok(Protocol::Array(result))
|
||||
}
|
||||
Err(e) => Ok(Protocol::err(&format!("ERR {}", e.0))),
|
||||
}
|
||||
}
|
||||
|
||||
async fn ttl_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.ttl(key) {
|
||||
Ok(ttl) => Ok(Protocol::SimpleString(ttl.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn exists_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
||||
match server.current_storage()?.exists(key) {
|
||||
Ok(exists) => Ok(Protocol::SimpleString(if exists { "1" } else { "0" }.to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn client_setname_cmd(server: &mut Server, name: &str) -> Result<Protocol, DBError> {
|
||||
server.client_name = Some(name.to_string());
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
}
|
||||
|
||||
async fn client_getname_cmd(server: &Server) -> Result<Protocol, DBError> {
|
||||
match &server.client_name {
|
||||
Some(name) => Ok(Protocol::BulkString(name.clone())),
|
||||
None => Ok(Protocol::Null),
|
||||
}
|
||||
}
|
@@ -1,73 +0,0 @@
|
||||
use chacha20poly1305::{
|
||||
aead::{Aead, KeyInit, OsRng},
|
||||
XChaCha20Poly1305, XNonce,
|
||||
};
|
||||
use rand::RngCore;
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
const VERSION: u8 = 1;
|
||||
const NONCE_LEN: usize = 24;
|
||||
const TAG_LEN: usize = 16;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum CryptoError {
|
||||
Format, // wrong length / header
|
||||
Version(u8), // unknown version
|
||||
Decrypt, // wrong key or corrupted data
|
||||
}
|
||||
|
||||
impl From<CryptoError> for crate::error::DBError {
|
||||
fn from(e: CryptoError) -> Self {
|
||||
crate::error::DBError(format!("Crypto error: {:?}", e))
|
||||
}
|
||||
}
|
||||
|
||||
/// Super-simple factory: new(secret) + encrypt(bytes) + decrypt(bytes)
|
||||
pub struct CryptoFactory {
|
||||
key: chacha20poly1305::Key,
|
||||
}
|
||||
|
||||
impl CryptoFactory {
|
||||
/// Accepts any secret bytes; turns them into a 32-byte key (SHA-256).
|
||||
pub fn new<S: AsRef<[u8]>>(secret: S) -> Self {
|
||||
let mut h = Sha256::new();
|
||||
h.update(b"xchacha20poly1305-factory:v1"); // domain separation
|
||||
h.update(secret.as_ref());
|
||||
let digest = h.finalize(); // 32 bytes
|
||||
let key = chacha20poly1305::Key::from_slice(&digest).to_owned();
|
||||
Self { key }
|
||||
}
|
||||
|
||||
/// Output layout: [version:1][nonce:24][ciphertext||tag]
|
||||
pub fn encrypt(&self, plaintext: &[u8]) -> Vec<u8> {
|
||||
let cipher = XChaCha20Poly1305::new(&self.key);
|
||||
|
||||
let mut nonce_bytes = [0u8; NONCE_LEN];
|
||||
OsRng.fill_bytes(&mut nonce_bytes);
|
||||
let nonce = XNonce::from_slice(&nonce_bytes);
|
||||
|
||||
let mut out = Vec::with_capacity(1 + NONCE_LEN + plaintext.len() + TAG_LEN);
|
||||
out.push(VERSION);
|
||||
out.extend_from_slice(&nonce_bytes);
|
||||
|
||||
let ct = cipher.encrypt(nonce, plaintext).expect("encrypt");
|
||||
out.extend_from_slice(&ct);
|
||||
out
|
||||
}
|
||||
|
||||
pub fn decrypt(&self, blob: &[u8]) -> Result<Vec<u8>, CryptoError> {
|
||||
if blob.len() < 1 + NONCE_LEN + TAG_LEN {
|
||||
return Err(CryptoError::Format);
|
||||
}
|
||||
let ver = blob[0];
|
||||
if ver != VERSION {
|
||||
return Err(CryptoError::Version(ver));
|
||||
}
|
||||
|
||||
let nonce = XNonce::from_slice(&blob[1..1 + NONCE_LEN]);
|
||||
let ct = &blob[1 + NONCE_LEN..];
|
||||
|
||||
let cipher = XChaCha20Poly1305::new(&self.key);
|
||||
cipher.decrypt(nonce, ct).map_err(|_| CryptoError::Decrypt)
|
||||
}
|
||||
}
|
94
src/error.rs
94
src/error.rs
@@ -1,94 +0,0 @@
|
||||
use std::num::ParseIntError;
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use redb;
|
||||
use bincode;
|
||||
|
||||
|
||||
// todo: more error types
|
||||
#[derive(Debug)]
|
||||
pub struct DBError(pub String);
|
||||
|
||||
impl From<std::io::Error> for DBError {
|
||||
fn from(item: std::io::Error) -> Self {
|
||||
DBError(item.to_string().clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ParseIntError> for DBError {
|
||||
fn from(item: ParseIntError) -> Self {
|
||||
DBError(item.to_string().clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::str::Utf8Error> for DBError {
|
||||
fn from(item: std::str::Utf8Error) -> Self {
|
||||
DBError(item.to_string().clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::string::FromUtf8Error> for DBError {
|
||||
fn from(item: std::string::FromUtf8Error) -> Self {
|
||||
DBError(item.to_string().clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<redb::Error> for DBError {
|
||||
fn from(item: redb::Error) -> Self {
|
||||
DBError(item.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<redb::DatabaseError> for DBError {
|
||||
fn from(item: redb::DatabaseError) -> Self {
|
||||
DBError(item.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<redb::TransactionError> for DBError {
|
||||
fn from(item: redb::TransactionError) -> Self {
|
||||
DBError(item.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<redb::TableError> for DBError {
|
||||
fn from(item: redb::TableError) -> Self {
|
||||
DBError(item.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<redb::StorageError> for DBError {
|
||||
fn from(item: redb::StorageError) -> Self {
|
||||
DBError(item.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<redb::CommitError> for DBError {
|
||||
fn from(item: redb::CommitError) -> Self {
|
||||
DBError(item.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Box<bincode::ErrorKind>> for DBError {
|
||||
fn from(item: Box<bincode::ErrorKind>) -> Self {
|
||||
DBError(item.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio::sync::mpsc::error::SendError<()>> for DBError {
|
||||
fn from(item: mpsc::error::SendError<()>) -> Self {
|
||||
DBError(item.to_string().clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for DBError {
|
||||
fn from(item: serde_json::Error) -> Self {
|
||||
DBError(item.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<chacha20poly1305::Error> for DBError {
|
||||
fn from(item: chacha20poly1305::Error) -> Self {
|
||||
DBError(item.to_string())
|
||||
}
|
||||
}
|
@@ -1,8 +0,0 @@
|
||||
pub mod age; // NEW
|
||||
pub mod cmd;
|
||||
pub mod crypto;
|
||||
pub mod error;
|
||||
pub mod options;
|
||||
pub mod protocol;
|
||||
pub mod server;
|
||||
pub mod storage;
|
81
src/main.rs
81
src/main.rs
@@ -1,81 +0,0 @@
|
||||
// #![allow(unused_imports)]
|
||||
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use redis_rs::server;
|
||||
|
||||
use clap::Parser;
|
||||
|
||||
/// Simple program to greet a person
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version, about, long_about = None)]
|
||||
struct Args {
|
||||
/// The directory of Redis DB file
|
||||
#[arg(long)]
|
||||
dir: String,
|
||||
|
||||
/// The port of the Redis server, default is 6379 if not specified
|
||||
#[arg(long)]
|
||||
port: Option<u16>,
|
||||
|
||||
/// Enable debug mode
|
||||
#[arg(long)]
|
||||
debug: bool,
|
||||
|
||||
|
||||
/// Master encryption key for encrypted databases
|
||||
#[arg(long)]
|
||||
encryption_key: Option<String>,
|
||||
|
||||
/// Encrypt the database
|
||||
#[arg(long)]
|
||||
encrypt: bool,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// parse args
|
||||
let args = Args::parse();
|
||||
|
||||
// bind port
|
||||
let port = args.port.unwrap_or(6379);
|
||||
println!("will listen on port: {}", port);
|
||||
let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// new DB option
|
||||
let option = redis_rs::options::DBOption {
|
||||
dir: args.dir,
|
||||
port,
|
||||
debug: args.debug,
|
||||
encryption_key: args.encryption_key,
|
||||
encrypt: args.encrypt,
|
||||
};
|
||||
|
||||
// new server
|
||||
let server = server::Server::new(option).await;
|
||||
|
||||
// Add a small delay to ensure the port is ready
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
|
||||
// accept new connections
|
||||
loop {
|
||||
let stream = listener.accept().await;
|
||||
match stream {
|
||||
Ok((stream, _)) => {
|
||||
println!("accepted new connection");
|
||||
|
||||
let mut sc = server.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = sc.handle(stream).await {
|
||||
println!("error: {:?}, will close the connection. Bye", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
println!("error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,8 +0,0 @@
|
||||
#[derive(Clone)]
|
||||
pub struct DBOption {
|
||||
pub dir: String,
|
||||
pub port: u16,
|
||||
pub debug: bool,
|
||||
pub encrypt: bool,
|
||||
pub encryption_key: Option<String>, // Master encryption key
|
||||
}
|
159
src/protocol.rs
159
src/protocol.rs
@@ -1,159 +0,0 @@
|
||||
use core::fmt;
|
||||
|
||||
use crate::error::DBError;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Protocol {
|
||||
SimpleString(String),
|
||||
BulkString(String),
|
||||
Null,
|
||||
Array(Vec<Protocol>),
|
||||
Error(String), // NEW
|
||||
}
|
||||
|
||||
impl fmt::Display for Protocol {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", self.decode().as_str())
|
||||
}
|
||||
}
|
||||
|
||||
impl Protocol {
|
||||
pub fn from(protocol: &str) -> Result<(Self, &str), DBError> {
|
||||
let ret = match protocol.chars().nth(0) {
|
||||
Some('+') => Self::parse_simple_string_sfx(&protocol[1..]),
|
||||
Some('$') => Self::parse_bulk_string_sfx(&protocol[1..]),
|
||||
Some('*') => Self::parse_array_sfx(&protocol[1..]),
|
||||
_ => Err(DBError(format!(
|
||||
"[from] unsupported protocol: {:?}",
|
||||
protocol
|
||||
))),
|
||||
};
|
||||
ret
|
||||
}
|
||||
|
||||
pub fn from_vec(array: Vec<&str>) -> Self {
|
||||
let array = array
|
||||
.into_iter()
|
||||
.map(|x| Protocol::BulkString(x.to_string()))
|
||||
.collect();
|
||||
Protocol::Array(array)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn ok() -> Self {
|
||||
Protocol::SimpleString("ok".to_string())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn err(msg: &str) -> Self {
|
||||
Protocol::Error(msg.to_string())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn write_on_slave_err() -> Self {
|
||||
Self::err("DISALLOW WRITE ON SLAVE")
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn psync_on_slave_err() -> Self {
|
||||
Self::err("PSYNC ON SLAVE IS NOT ALLOWED")
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn none() -> Self {
|
||||
Self::SimpleString("none".to_string())
|
||||
}
|
||||
|
||||
pub fn decode(&self) -> String {
|
||||
match self {
|
||||
Protocol::SimpleString(s) => s.to_string(),
|
||||
Protocol::BulkString(s) => s.to_string(),
|
||||
Protocol::Null => "".to_string(),
|
||||
Protocol::Array(s) => s.iter().map(|x| x.decode()).collect::<Vec<_>>().join(" "),
|
||||
Protocol::Error(s) => s.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn encode(&self) -> String {
|
||||
match self {
|
||||
Protocol::SimpleString(s) => format!("+{}\r\n", s),
|
||||
Protocol::BulkString(s) => format!("${}\r\n{}\r\n", s.len(), s),
|
||||
Protocol::Array(ss) => {
|
||||
format!("*{}\r\n", ss.len()) + &ss.iter().map(|x| x.encode()).collect::<String>()
|
||||
}
|
||||
Protocol::Null => "$-1\r\n".to_string(),
|
||||
Protocol::Error(s) => format!("-{}\r\n", s), // proper RESP error
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_simple_string_sfx(protocol: &str) -> Result<(Self, &str), DBError> {
|
||||
match protocol.find("\r\n") {
|
||||
Some(x) => Ok((Self::SimpleString(protocol[..x].to_string()), &protocol[x + 2..])),
|
||||
_ => Err(DBError(format!(
|
||||
"[new simple string] unsupported protocol: {:?}",
|
||||
protocol
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_bulk_string_sfx(protocol: &str) -> Result<(Self, &str), DBError> {
|
||||
if let Some(len_end) = protocol.find("\r\n") {
|
||||
let size = Self::parse_usize(&protocol[..len_end])?;
|
||||
let data_start = len_end + 2;
|
||||
let data_end = data_start + size;
|
||||
let s = Self::parse_string(&protocol[data_start..data_end])?;
|
||||
|
||||
if protocol.len() < data_end + 2 || &protocol[data_end..data_end+2] != "\r\n" {
|
||||
Err(DBError(format!(
|
||||
"[new bulk string] unmatched string length in prototocl {:?}",
|
||||
protocol,
|
||||
)))
|
||||
} else {
|
||||
Ok((Protocol::BulkString(s), &protocol[data_end + 2..]))
|
||||
}
|
||||
} else {
|
||||
Err(DBError(format!(
|
||||
"[new bulk string] unsupported protocol: {:?}",
|
||||
protocol
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_array_sfx(s: &str) -> Result<(Self, &str), DBError> {
|
||||
if let Some(len_end) = s.find("\r\n") {
|
||||
let array_len = s[..len_end].parse::<usize>()?;
|
||||
let mut remaining = &s[len_end + 2..];
|
||||
let mut vec = vec![];
|
||||
for _ in 0..array_len {
|
||||
let (p, rem) = Protocol::from(remaining)?;
|
||||
vec.push(p);
|
||||
remaining = rem;
|
||||
}
|
||||
Ok((Protocol::Array(vec), remaining))
|
||||
} else {
|
||||
Err(DBError(format!(
|
||||
"[new array] unsupported protocol: {:?}",
|
||||
s
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_usize(protocol: &str) -> Result<usize, DBError> {
|
||||
if protocol.is_empty() {
|
||||
Err(DBError("Cannot parse usize from empty string".to_string()))
|
||||
} else {
|
||||
protocol
|
||||
.parse::<usize>()
|
||||
.map_err(|_| DBError(format!("Failed to parse usize from: {}", protocol)))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_string(protocol: &str) -> Result<String, DBError> {
|
||||
if protocol.is_empty() {
|
||||
// Allow empty strings, but handle appropriately
|
||||
Ok("".to_string())
|
||||
} else {
|
||||
Ok(protocol.to_string())
|
||||
}
|
||||
}
|
||||
}
|
136
src/server.rs
136
src/server.rs
@@ -1,136 +0,0 @@
|
||||
use core::str;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
use crate::cmd::Cmd;
|
||||
use crate::error::DBError;
|
||||
use crate::options;
|
||||
use crate::protocol::Protocol;
|
||||
use crate::storage::Storage;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Server {
|
||||
pub db_cache: std::sync::Arc<std::sync::RwLock<HashMap<u64, Arc<Storage>>>>,
|
||||
pub option: options::DBOption,
|
||||
pub client_name: Option<String>,
|
||||
pub selected_db: u64, // Changed from usize to u64
|
||||
pub queued_cmd: Option<Vec<(Cmd, Protocol)>>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub async fn new(option: options::DBOption) -> Self {
|
||||
Server {
|
||||
db_cache: Arc::new(std::sync::RwLock::new(HashMap::new())),
|
||||
option,
|
||||
client_name: None,
|
||||
selected_db: 0,
|
||||
queued_cmd: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn current_storage(&self) -> Result<Arc<Storage>, DBError> {
|
||||
let mut cache = self.db_cache.write().unwrap();
|
||||
|
||||
if let Some(storage) = cache.get(&self.selected_db) {
|
||||
return Ok(storage.clone());
|
||||
}
|
||||
|
||||
|
||||
// Create new database file
|
||||
let db_file_path = std::path::PathBuf::from(self.option.dir.clone())
|
||||
.join(format!("{}.db", self.selected_db));
|
||||
|
||||
// Ensure the directory exists before creating the database file
|
||||
if let Some(parent_dir) = db_file_path.parent() {
|
||||
std::fs::create_dir_all(parent_dir).map_err(|e| {
|
||||
DBError(format!("Failed to create directory {}: {}", parent_dir.display(), e))
|
||||
})?;
|
||||
}
|
||||
|
||||
println!("Creating new db file: {}", db_file_path.display());
|
||||
|
||||
let storage = Arc::new(Storage::new(
|
||||
db_file_path,
|
||||
self.should_encrypt_db(self.selected_db),
|
||||
self.option.encryption_key.as_deref()
|
||||
)?);
|
||||
|
||||
cache.insert(self.selected_db, storage.clone());
|
||||
Ok(storage)
|
||||
}
|
||||
|
||||
fn should_encrypt_db(&self, db_index: u64) -> bool {
|
||||
// DB 0-9 are non-encrypted, DB 10+ are encrypted
|
||||
self.option.encrypt && db_index >= 10
|
||||
}
|
||||
|
||||
pub async fn handle(
|
||||
&mut self,
|
||||
mut stream: tokio::net::TcpStream,
|
||||
) -> Result<(), DBError> {
|
||||
let mut buf = [0; 512];
|
||||
|
||||
loop {
|
||||
let len = match stream.read(&mut buf).await {
|
||||
Ok(0) => {
|
||||
println!("[handle] connection closed");
|
||||
return Ok(());
|
||||
}
|
||||
Ok(len) => len,
|
||||
Err(e) => {
|
||||
println!("[handle] read error: {:?}", e);
|
||||
return Err(e.into());
|
||||
}
|
||||
};
|
||||
|
||||
let mut s = str::from_utf8(&buf[..len])?;
|
||||
while !s.is_empty() {
|
||||
let (cmd, protocol, remaining) = match Cmd::from(s) {
|
||||
Ok((cmd, protocol, remaining)) => (cmd, protocol, remaining),
|
||||
Err(e) => {
|
||||
println!("\x1b[31;1mprotocol error: {:?}\x1b[0m", e);
|
||||
(Cmd::Unknow("protocol_error".to_string()), Protocol::err(&format!("protocol error: {}", e.0)), "")
|
||||
}
|
||||
};
|
||||
s = remaining;
|
||||
|
||||
if self.option.debug {
|
||||
println!("\x1b[34;1mgot command: {:?}, protocol: {:?}\x1b[0m", cmd, protocol);
|
||||
} else {
|
||||
println!("got command: {:?}, protocol: {:?}", cmd, protocol);
|
||||
}
|
||||
|
||||
// Check if this is a QUIT command before processing
|
||||
let is_quit = matches!(cmd, Cmd::Quit);
|
||||
|
||||
let res = match cmd.run(self).await {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
if self.option.debug {
|
||||
eprintln!("[run error] {:?}", e);
|
||||
}
|
||||
Protocol::err(&format!("ERR {}", e.0))
|
||||
}
|
||||
};
|
||||
|
||||
if self.option.debug {
|
||||
println!("\x1b[34;1mqueued cmd {:?}\x1b[0m", self.queued_cmd);
|
||||
println!("\x1b[32;1mgoing to send response {}\x1b[0m", res.encode());
|
||||
} else {
|
||||
print!("queued cmd {:?}", self.queued_cmd);
|
||||
println!("going to send response {}", res.encode());
|
||||
}
|
||||
|
||||
_ = stream.write(res.encode().as_bytes()).await?;
|
||||
|
||||
// If this was a QUIT command, close the connection
|
||||
if is_quit {
|
||||
println!("[handle] QUIT command received, closing connection");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,126 +0,0 @@
|
||||
use std::{
|
||||
path::Path,
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use redb::{Database, TableDefinition};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::crypto::CryptoFactory;
|
||||
use crate::error::DBError;
|
||||
|
||||
// Re-export modules
|
||||
mod storage_basic;
|
||||
mod storage_hset;
|
||||
mod storage_lists;
|
||||
mod storage_extra;
|
||||
|
||||
// Re-export implementations
|
||||
// Note: These imports are used by the impl blocks in the submodules
|
||||
// The compiler shows them as unused because they're not directly used in this file
|
||||
// but they're needed for the Storage struct methods to be available
|
||||
pub use storage_extra::*;
|
||||
|
||||
// Table definitions for different Redis data types
|
||||
const TYPES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("types");
|
||||
const STRINGS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("strings");
|
||||
const HASHES_TABLE: TableDefinition<(&str, &str), &[u8]> = TableDefinition::new("hashes");
|
||||
const LISTS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("lists");
|
||||
const STREAMS_META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("streams_meta");
|
||||
const STREAMS_DATA_TABLE: TableDefinition<(&str, &str), &[u8]> = TableDefinition::new("streams_data");
|
||||
const ENCRYPTED_TABLE: TableDefinition<&str, u8> = TableDefinition::new("encrypted");
|
||||
const EXPIRATION_TABLE: TableDefinition<&str, u64> = TableDefinition::new("expiration");
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct StreamEntry {
|
||||
pub fields: Vec<(String, String)>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct ListValue {
|
||||
pub elements: Vec<String>,
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn now_in_millis() -> u128 {
|
||||
let start = SystemTime::now();
|
||||
let duration_since_epoch = start.duration_since(UNIX_EPOCH).unwrap();
|
||||
duration_since_epoch.as_millis()
|
||||
}
|
||||
|
||||
pub struct Storage {
|
||||
db: Database,
|
||||
crypto: Option<CryptoFactory>,
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
pub fn new(path: impl AsRef<Path>, should_encrypt: bool, master_key: Option<&str>) -> Result<Self, DBError> {
|
||||
let db = Database::create(path)?;
|
||||
|
||||
// Create tables if they don't exist
|
||||
let write_txn = db.begin_write()?;
|
||||
{
|
||||
let _ = write_txn.open_table(TYPES_TABLE)?;
|
||||
let _ = write_txn.open_table(STRINGS_TABLE)?;
|
||||
let _ = write_txn.open_table(HASHES_TABLE)?;
|
||||
let _ = write_txn.open_table(LISTS_TABLE)?;
|
||||
let _ = write_txn.open_table(STREAMS_META_TABLE)?;
|
||||
let _ = write_txn.open_table(STREAMS_DATA_TABLE)?;
|
||||
let _ = write_txn.open_table(ENCRYPTED_TABLE)?;
|
||||
let _ = write_txn.open_table(EXPIRATION_TABLE)?;
|
||||
}
|
||||
write_txn.commit()?;
|
||||
|
||||
// Check if database was previously encrypted
|
||||
let read_txn = db.begin_read()?;
|
||||
let encrypted_table = read_txn.open_table(ENCRYPTED_TABLE)?;
|
||||
let was_encrypted = encrypted_table.get("encrypted")?.map(|v| v.value() == 1).unwrap_or(false);
|
||||
drop(read_txn);
|
||||
|
||||
let crypto = if should_encrypt || was_encrypted {
|
||||
if let Some(key) = master_key {
|
||||
Some(CryptoFactory::new(key.as_bytes()))
|
||||
} else {
|
||||
return Err(DBError("Encryption requested but no master key provided".to_string()));
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// If we're enabling encryption for the first time, mark it
|
||||
if should_encrypt && !was_encrypted {
|
||||
let write_txn = db.begin_write()?;
|
||||
{
|
||||
let mut encrypted_table = write_txn.open_table(ENCRYPTED_TABLE)?;
|
||||
encrypted_table.insert("encrypted", &1u8)?;
|
||||
}
|
||||
write_txn.commit()?;
|
||||
}
|
||||
|
||||
Ok(Storage {
|
||||
db,
|
||||
crypto,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn is_encrypted(&self) -> bool {
|
||||
self.crypto.is_some()
|
||||
}
|
||||
|
||||
// Helper methods for encryption
|
||||
fn encrypt_if_needed(&self, data: &[u8]) -> Result<Vec<u8>, DBError> {
|
||||
if let Some(crypto) = &self.crypto {
|
||||
Ok(crypto.encrypt(data))
|
||||
} else {
|
||||
Ok(data.to_vec())
|
||||
}
|
||||
}
|
||||
|
||||
fn decrypt_if_needed(&self, data: &[u8]) -> Result<Vec<u8>, DBError> {
|
||||
if let Some(crypto) = &self.crypto {
|
||||
Ok(crypto.decrypt(data)?)
|
||||
} else {
|
||||
Ok(data.to_vec())
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,218 +0,0 @@
|
||||
use redb::{ReadableTable};
|
||||
use crate::error::DBError;
|
||||
use super::*;
|
||||
|
||||
impl Storage {
|
||||
pub fn flushdb(&self) -> Result<(), DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
{
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
let mut strings_table = write_txn.open_table(STRINGS_TABLE)?;
|
||||
let mut hashes_table = write_txn.open_table(HASHES_TABLE)?;
|
||||
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
|
||||
let mut streams_meta_table = write_txn.open_table(STREAMS_META_TABLE)?;
|
||||
let mut streams_data_table = write_txn.open_table(STREAMS_DATA_TABLE)?;
|
||||
let mut expiration_table = write_txn.open_table(EXPIRATION_TABLE)?;
|
||||
|
||||
// inefficient, but there is no other way
|
||||
let keys: Vec<String> = types_table.iter()?.map(|item| item.unwrap().0.value().to_string()).collect();
|
||||
for key in keys {
|
||||
types_table.remove(key.as_str())?;
|
||||
}
|
||||
let keys: Vec<String> = strings_table.iter()?.map(|item| item.unwrap().0.value().to_string()).collect();
|
||||
for key in keys {
|
||||
strings_table.remove(key.as_str())?;
|
||||
}
|
||||
let keys: Vec<(String, String)> = hashes_table
|
||||
.iter()?
|
||||
.map(|item| {
|
||||
let binding = item.unwrap();
|
||||
let (k, f) = binding.0.value();
|
||||
(k.to_string(), f.to_string())
|
||||
})
|
||||
.collect();
|
||||
for (key, field) in keys {
|
||||
hashes_table.remove((key.as_str(), field.as_str()))?;
|
||||
}
|
||||
let keys: Vec<String> = lists_table.iter()?.map(|item| item.unwrap().0.value().to_string()).collect();
|
||||
for key in keys {
|
||||
lists_table.remove(key.as_str())?;
|
||||
}
|
||||
let keys: Vec<String> = streams_meta_table.iter()?.map(|item| item.unwrap().0.value().to_string()).collect();
|
||||
for key in keys {
|
||||
streams_meta_table.remove(key.as_str())?;
|
||||
}
|
||||
let keys: Vec<(String,String)> = streams_data_table.iter()?.map(|item| {
|
||||
let binding = item.unwrap();
|
||||
let (key, field) = binding.0.value();
|
||||
(key.to_string(), field.to_string())
|
||||
}).collect();
|
||||
for (key, field) in keys {
|
||||
streams_data_table.remove((key.as_str(), field.as_str()))?;
|
||||
}
|
||||
let keys: Vec<String> = expiration_table.iter()?.map(|item| item.unwrap().0.value().to_string()).collect();
|
||||
for key in keys {
|
||||
expiration_table.remove(key.as_str())?;
|
||||
}
|
||||
}
|
||||
write_txn.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_key_type(&self, key: &str) -> Result<Option<String>, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
// Before returning type, check for expiration
|
||||
if let Some(type_val) = table.get(key)? {
|
||||
if type_val.value() == "string" {
|
||||
let expiration_table = read_txn.open_table(EXPIRATION_TABLE)?;
|
||||
if let Some(expires_at) = expiration_table.get(key)? {
|
||||
if now_in_millis() > expires_at.value() as u128 {
|
||||
// The key is expired, so it effectively has no type
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Some(type_val.value().to_string()))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Value is encrypted/decrypted
|
||||
pub fn get(&self, key: &str) -> Result<Option<String>, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "string" => {
|
||||
// Check expiration first (unencrypted)
|
||||
let expiration_table = read_txn.open_table(EXPIRATION_TABLE)?;
|
||||
if let Some(expires_at) = expiration_table.get(key)? {
|
||||
if now_in_millis() > expires_at.value() as u128 {
|
||||
drop(read_txn);
|
||||
self.del(key.to_string())?;
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
// Get and decrypt value
|
||||
let strings_table = read_txn.open_table(STRINGS_TABLE)?;
|
||||
match strings_table.get(key)? {
|
||||
Some(data) => {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
let value = String::from_utf8(decrypted)?;
|
||||
Ok(Some(value))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
_ => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Value is encrypted before storage
|
||||
pub fn set(&self, key: String, value: String) -> Result<(), DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
|
||||
{
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
types_table.insert(key.as_str(), "string")?;
|
||||
|
||||
let mut strings_table = write_txn.open_table(STRINGS_TABLE)?;
|
||||
// Only encrypt the value, not expiration
|
||||
let encrypted = self.encrypt_if_needed(value.as_bytes())?;
|
||||
strings_table.insert(key.as_str(), encrypted.as_slice())?;
|
||||
|
||||
// Remove any existing expiration since this is a regular SET
|
||||
let mut expiration_table = write_txn.open_table(EXPIRATION_TABLE)?;
|
||||
expiration_table.remove(key.as_str())?;
|
||||
}
|
||||
|
||||
write_txn.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Value is encrypted before storage
|
||||
pub fn setx(&self, key: String, value: String, expire_ms: u128) -> Result<(), DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
|
||||
{
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
types_table.insert(key.as_str(), "string")?;
|
||||
|
||||
let mut strings_table = write_txn.open_table(STRINGS_TABLE)?;
|
||||
// Only encrypt the value
|
||||
let encrypted = self.encrypt_if_needed(value.as_bytes())?;
|
||||
strings_table.insert(key.as_str(), encrypted.as_slice())?;
|
||||
|
||||
// Store expiration separately (unencrypted)
|
||||
let mut expiration_table = write_txn.open_table(EXPIRATION_TABLE)?;
|
||||
let expires_at = expire_ms + now_in_millis();
|
||||
expiration_table.insert(key.as_str(), &(expires_at as u64))?;
|
||||
}
|
||||
|
||||
write_txn.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn del(&self, key: String) -> Result<(), DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
|
||||
{
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
let mut strings_table = write_txn.open_table(STRINGS_TABLE)?;
|
||||
let mut hashes_table: redb::Table<(&str, &str), &[u8]> = write_txn.open_table(HASHES_TABLE)?;
|
||||
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
|
||||
|
||||
// Remove from type table
|
||||
types_table.remove(key.as_str())?;
|
||||
|
||||
// Remove from strings table
|
||||
strings_table.remove(key.as_str())?;
|
||||
|
||||
// Remove all hash fields for this key
|
||||
let mut to_remove = Vec::new();
|
||||
let mut iter = hashes_table.iter()?;
|
||||
while let Some(entry) = iter.next() {
|
||||
let entry = entry?;
|
||||
let (hash_key, field) = entry.0.value();
|
||||
if hash_key == key.as_str() {
|
||||
to_remove.push((hash_key.to_string(), field.to_string()));
|
||||
}
|
||||
}
|
||||
drop(iter);
|
||||
|
||||
for (hash_key, field) in to_remove {
|
||||
hashes_table.remove((hash_key.as_str(), field.as_str()))?;
|
||||
}
|
||||
|
||||
// Remove from lists table
|
||||
lists_table.remove(key.as_str())?;
|
||||
|
||||
// Also remove expiration
|
||||
let mut expiration_table = write_txn.open_table(EXPIRATION_TABLE)?;
|
||||
expiration_table.remove(key.as_str())?;
|
||||
}
|
||||
|
||||
write_txn.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn keys(&self, pattern: &str) -> Result<Vec<String>, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
let mut keys = Vec::new();
|
||||
let mut iter = table.iter()?;
|
||||
while let Some(entry) = iter.next() {
|
||||
let key = entry?.0.value().to_string();
|
||||
if pattern == "*" || super::storage_extra::glob_match(pattern, &key) {
|
||||
keys.push(key);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(keys)
|
||||
}
|
||||
}
|
@@ -1,168 +0,0 @@
|
||||
use redb::{ReadableTable};
|
||||
use crate::error::DBError;
|
||||
use super::*;
|
||||
|
||||
impl Storage {
|
||||
// ✅ ENCRYPTION APPLIED: Values are decrypted after retrieval
|
||||
pub fn scan(&self, cursor: u64, pattern: Option<&str>, count: Option<u64>) -> Result<(u64, Vec<(String, String)>), DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
let strings_table = read_txn.open_table(STRINGS_TABLE)?;
|
||||
|
||||
let mut result = Vec::new();
|
||||
let mut current_cursor = 0u64;
|
||||
let limit = count.unwrap_or(10) as usize;
|
||||
|
||||
let mut iter = types_table.iter()?;
|
||||
while let Some(entry) = iter.next() {
|
||||
let entry = entry?;
|
||||
let key = entry.0.value().to_string();
|
||||
let key_type = entry.1.value().to_string();
|
||||
|
||||
if current_cursor >= cursor {
|
||||
// Apply pattern matching if specified
|
||||
let matches = if let Some(pat) = pattern {
|
||||
glob_match(pat, &key)
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
if matches {
|
||||
// For scan, we return key-value pairs for string types
|
||||
if key_type == "string" {
|
||||
if let Some(data) = strings_table.get(key.as_str())? {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
let value = String::from_utf8(decrypted)?;
|
||||
result.push((key, value));
|
||||
} else {
|
||||
result.push((key, String::new()));
|
||||
}
|
||||
} else {
|
||||
// For non-string types, just return the key with type as value
|
||||
result.push((key, key_type));
|
||||
}
|
||||
|
||||
if result.len() >= limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
current_cursor += 1;
|
||||
}
|
||||
|
||||
let next_cursor = if result.len() < limit { 0 } else { current_cursor };
|
||||
Ok((next_cursor, result))
|
||||
}
|
||||
|
||||
pub fn ttl(&self, key: &str) -> Result<i64, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "string" => {
|
||||
let expiration_table = read_txn.open_table(EXPIRATION_TABLE)?;
|
||||
match expiration_table.get(key)? {
|
||||
Some(expires_at) => {
|
||||
let now = now_in_millis();
|
||||
let expires_at_ms = expires_at.value() as u128;
|
||||
if now >= expires_at_ms {
|
||||
Ok(-2) // Key has expired
|
||||
} else {
|
||||
Ok(((expires_at_ms - now) / 1000) as i64) // TTL in seconds
|
||||
}
|
||||
}
|
||||
None => Ok(-1), // Key exists but has no expiration
|
||||
}
|
||||
}
|
||||
Some(_) => Ok(-1), // Key exists but is not a string (no expiration support for other types)
|
||||
None => Ok(-2), // Key does not exist
|
||||
}
|
||||
}
|
||||
|
||||
pub fn exists(&self, key: &str) -> Result<bool, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "string" => {
|
||||
// Check if string key has expired
|
||||
let expiration_table = read_txn.open_table(EXPIRATION_TABLE)?;
|
||||
if let Some(expires_at) = expiration_table.get(key)? {
|
||||
if now_in_millis() > expires_at.value() as u128 {
|
||||
return Ok(false); // Key has expired
|
||||
}
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
Some(_) => Ok(true), // Key exists and is not a string
|
||||
None => Ok(false), // Key does not exist
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Utility function for glob pattern matching
|
||||
pub fn glob_match(pattern: &str, text: &str) -> bool {
|
||||
if pattern == "*" {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Simple glob matching - supports * and ? wildcards
|
||||
let pattern_chars: Vec<char> = pattern.chars().collect();
|
||||
let text_chars: Vec<char> = text.chars().collect();
|
||||
|
||||
fn match_recursive(pattern: &[char], text: &[char], pi: usize, ti: usize) -> bool {
|
||||
if pi >= pattern.len() {
|
||||
return ti >= text.len();
|
||||
}
|
||||
|
||||
if ti >= text.len() {
|
||||
// Check if remaining pattern is all '*'
|
||||
return pattern[pi..].iter().all(|&c| c == '*');
|
||||
}
|
||||
|
||||
match pattern[pi] {
|
||||
'*' => {
|
||||
// Try matching zero or more characters
|
||||
for i in ti..=text.len() {
|
||||
if match_recursive(pattern, text, pi + 1, i) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
'?' => {
|
||||
// Match exactly one character
|
||||
match_recursive(pattern, text, pi + 1, ti + 1)
|
||||
}
|
||||
c => {
|
||||
// Match exact character
|
||||
if text[ti] == c {
|
||||
match_recursive(pattern, text, pi + 1, ti + 1)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match_recursive(&pattern_chars, &text_chars, 0, 0)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_glob_match() {
|
||||
assert!(glob_match("*", "anything"));
|
||||
assert!(glob_match("hello", "hello"));
|
||||
assert!(!glob_match("hello", "world"));
|
||||
assert!(glob_match("h*o", "hello"));
|
||||
assert!(glob_match("h*o", "ho"));
|
||||
assert!(!glob_match("h*o", "hi"));
|
||||
assert!(glob_match("h?llo", "hello"));
|
||||
assert!(!glob_match("h?llo", "hllo"));
|
||||
assert!(glob_match("*test*", "this_is_a_test_string"));
|
||||
assert!(!glob_match("*test*", "this_is_a_string"));
|
||||
}
|
||||
}
|
@@ -1,318 +0,0 @@
|
||||
use redb::{ReadableTable};
|
||||
use crate::error::DBError;
|
||||
use super::*;
|
||||
|
||||
impl Storage {
|
||||
// ✅ ENCRYPTION APPLIED: Values are encrypted before storage
|
||||
pub fn hset(&self, key: &str, pairs: Vec<(String, String)>) -> Result<i64, DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
let mut new_fields = 0i64;
|
||||
|
||||
{
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
let mut hashes_table = write_txn.open_table(HASHES_TABLE)?;
|
||||
|
||||
// Set the type to hash
|
||||
types_table.insert(key, "hash")?;
|
||||
|
||||
for (field, value) in pairs {
|
||||
// Check if field already exists
|
||||
let exists = hashes_table.get((key, field.as_str()))?.is_some();
|
||||
|
||||
// Encrypt the value before storing
|
||||
let encrypted = self.encrypt_if_needed(value.as_bytes())?;
|
||||
hashes_table.insert((key, field.as_str()), encrypted.as_slice())?;
|
||||
|
||||
if !exists {
|
||||
new_fields += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
write_txn.commit()?;
|
||||
Ok(new_fields)
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Value is decrypted after retrieval
|
||||
pub fn hget(&self, key: &str, field: &str) -> Result<Option<String>, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "hash" => {
|
||||
let hashes_table = read_txn.open_table(HASHES_TABLE)?;
|
||||
match hashes_table.get((key, field))? {
|
||||
Some(data) => {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
let value = String::from_utf8(decrypted)?;
|
||||
Ok(Some(value))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
_ => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: All values are decrypted after retrieval
|
||||
pub fn hgetall(&self, key: &str) -> Result<Vec<(String, String)>, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "hash" => {
|
||||
let hashes_table = read_txn.open_table(HASHES_TABLE)?;
|
||||
let mut result = Vec::new();
|
||||
|
||||
let mut iter = hashes_table.iter()?;
|
||||
while let Some(entry) = iter.next() {
|
||||
let entry = entry?;
|
||||
let (hash_key, field) = entry.0.value();
|
||||
if hash_key == key {
|
||||
let decrypted = self.decrypt_if_needed(entry.1.value())?;
|
||||
let value = String::from_utf8(decrypted)?;
|
||||
result.push((field.to_string(), value));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
_ => Ok(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn hdel(&self, key: &str, fields: Vec<String>) -> Result<i64, DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
let mut deleted = 0i64;
|
||||
|
||||
// First check if key exists and is a hash
|
||||
let is_hash = {
|
||||
let types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
let result = match types_table.get(key)? {
|
||||
Some(type_val) => type_val.value() == "hash",
|
||||
None => false,
|
||||
};
|
||||
result
|
||||
};
|
||||
|
||||
if is_hash {
|
||||
let mut hashes_table = write_txn.open_table(HASHES_TABLE)?;
|
||||
|
||||
for field in fields {
|
||||
if hashes_table.remove((key, field.as_str()))?.is_some() {
|
||||
deleted += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if hash is now empty and remove type if so
|
||||
let mut has_fields = false;
|
||||
let mut iter = hashes_table.iter()?;
|
||||
while let Some(entry) = iter.next() {
|
||||
let entry = entry?;
|
||||
let (hash_key, _) = entry.0.value();
|
||||
if hash_key == key {
|
||||
has_fields = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
drop(iter);
|
||||
|
||||
if !has_fields {
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
types_table.remove(key)?;
|
||||
}
|
||||
}
|
||||
|
||||
write_txn.commit()?;
|
||||
Ok(deleted)
|
||||
}
|
||||
|
||||
pub fn hexists(&self, key: &str, field: &str) -> Result<bool, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "hash" => {
|
||||
let hashes_table = read_txn.open_table(HASHES_TABLE)?;
|
||||
Ok(hashes_table.get((key, field))?.is_some())
|
||||
}
|
||||
_ => Ok(false),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn hkeys(&self, key: &str) -> Result<Vec<String>, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "hash" => {
|
||||
let hashes_table = read_txn.open_table(HASHES_TABLE)?;
|
||||
let mut result = Vec::new();
|
||||
|
||||
let mut iter = hashes_table.iter()?;
|
||||
while let Some(entry) = iter.next() {
|
||||
let entry = entry?;
|
||||
let (hash_key, field) = entry.0.value();
|
||||
if hash_key == key {
|
||||
result.push(field.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
_ => Ok(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: All values are decrypted after retrieval
|
||||
pub fn hvals(&self, key: &str) -> Result<Vec<String>, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "hash" => {
|
||||
let hashes_table = read_txn.open_table(HASHES_TABLE)?;
|
||||
let mut result = Vec::new();
|
||||
|
||||
let mut iter = hashes_table.iter()?;
|
||||
while let Some(entry) = iter.next() {
|
||||
let entry = entry?;
|
||||
let (hash_key, _) = entry.0.value();
|
||||
if hash_key == key {
|
||||
let decrypted = self.decrypt_if_needed(entry.1.value())?;
|
||||
let value = String::from_utf8(decrypted)?;
|
||||
result.push(value);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
_ => Ok(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn hlen(&self, key: &str) -> Result<i64, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "hash" => {
|
||||
let hashes_table = read_txn.open_table(HASHES_TABLE)?;
|
||||
let mut count = 0i64;
|
||||
|
||||
let mut iter = hashes_table.iter()?;
|
||||
while let Some(entry) = iter.next() {
|
||||
let entry = entry?;
|
||||
let (hash_key, _) = entry.0.value();
|
||||
if hash_key == key {
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
_ => Ok(0),
|
||||
}
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Values are decrypted after retrieval
|
||||
pub fn hmget(&self, key: &str, fields: Vec<String>) -> Result<Vec<Option<String>>, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "hash" => {
|
||||
let hashes_table = read_txn.open_table(HASHES_TABLE)?;
|
||||
let mut result = Vec::new();
|
||||
|
||||
for field in fields {
|
||||
match hashes_table.get((key, field.as_str()))? {
|
||||
Some(data) => {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
let value = String::from_utf8(decrypted)?;
|
||||
result.push(Some(value));
|
||||
}
|
||||
None => result.push(None),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
_ => Ok(fields.into_iter().map(|_| None).collect()),
|
||||
}
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Value is encrypted before storage
|
||||
pub fn hsetnx(&self, key: &str, field: &str, value: &str) -> Result<bool, DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
let mut result = false;
|
||||
|
||||
{
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
let mut hashes_table = write_txn.open_table(HASHES_TABLE)?;
|
||||
|
||||
// Check if field already exists
|
||||
if hashes_table.get((key, field))?.is_none() {
|
||||
// Set the type to hash
|
||||
types_table.insert(key, "hash")?;
|
||||
|
||||
// Encrypt the value before storing
|
||||
let encrypted = self.encrypt_if_needed(value.as_bytes())?;
|
||||
hashes_table.insert((key, field), encrypted.as_slice())?;
|
||||
result = true;
|
||||
}
|
||||
}
|
||||
|
||||
write_txn.commit()?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Values are decrypted after retrieval
|
||||
pub fn hscan(&self, key: &str, cursor: u64, pattern: Option<&str>, count: Option<u64>) -> Result<(u64, Vec<(String, String)>), DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "hash" => {
|
||||
let hashes_table = read_txn.open_table(HASHES_TABLE)?;
|
||||
let mut result = Vec::new();
|
||||
let mut current_cursor = 0u64;
|
||||
let limit = count.unwrap_or(10) as usize;
|
||||
|
||||
let mut iter = hashes_table.iter()?;
|
||||
while let Some(entry) = iter.next() {
|
||||
let entry = entry?;
|
||||
let (hash_key, field) = entry.0.value();
|
||||
|
||||
if hash_key == key {
|
||||
if current_cursor >= cursor {
|
||||
let field_str = field.to_string();
|
||||
|
||||
// Apply pattern matching if specified
|
||||
let matches = if let Some(pat) = pattern {
|
||||
super::storage_extra::glob_match(pat, &field_str)
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
if matches {
|
||||
let decrypted = self.decrypt_if_needed(entry.1.value())?;
|
||||
let value = String::from_utf8(decrypted)?;
|
||||
result.push((field_str, value));
|
||||
|
||||
if result.len() >= limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
current_cursor += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let next_cursor = if result.len() < limit { 0 } else { current_cursor };
|
||||
Ok((next_cursor, result))
|
||||
}
|
||||
_ => Ok((0, Vec::new())),
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,403 +0,0 @@
|
||||
use redb::{ReadableTable};
|
||||
use crate::error::DBError;
|
||||
use super::*;
|
||||
|
||||
impl Storage {
|
||||
// ✅ ENCRYPTION APPLIED: Elements are encrypted before storage
|
||||
pub fn lpush(&self, key: &str, elements: Vec<String>) -> Result<i64, DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
let mut _length = 0i64;
|
||||
|
||||
{
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
|
||||
|
||||
// Set the type to list
|
||||
types_table.insert(key, "list")?;
|
||||
|
||||
// Get current list or create empty one
|
||||
let mut list: Vec<String> = match lists_table.get(key)? {
|
||||
Some(data) => {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
serde_json::from_slice(&decrypted)?
|
||||
}
|
||||
None => Vec::new(),
|
||||
};
|
||||
|
||||
// Add elements to the front (left)
|
||||
for element in elements.into_iter().rev() {
|
||||
list.insert(0, element);
|
||||
}
|
||||
|
||||
_length = list.len() as i64;
|
||||
|
||||
// Encrypt and store the updated list
|
||||
let serialized = serde_json::to_vec(&list)?;
|
||||
let encrypted = self.encrypt_if_needed(&serialized)?;
|
||||
lists_table.insert(key, encrypted.as_slice())?;
|
||||
}
|
||||
|
||||
write_txn.commit()?;
|
||||
Ok(_length)
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Elements are encrypted before storage
|
||||
pub fn rpush(&self, key: &str, elements: Vec<String>) -> Result<i64, DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
let mut _length = 0i64;
|
||||
|
||||
{
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
|
||||
|
||||
// Set the type to list
|
||||
types_table.insert(key, "list")?;
|
||||
|
||||
// Get current list or create empty one
|
||||
let mut list: Vec<String> = match lists_table.get(key)? {
|
||||
Some(data) => {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
serde_json::from_slice(&decrypted)?
|
||||
}
|
||||
None => Vec::new(),
|
||||
};
|
||||
|
||||
// Add elements to the end (right)
|
||||
list.extend(elements);
|
||||
_length = list.len() as i64;
|
||||
|
||||
// Encrypt and store the updated list
|
||||
let serialized = serde_json::to_vec(&list)?;
|
||||
let encrypted = self.encrypt_if_needed(&serialized)?;
|
||||
lists_table.insert(key, encrypted.as_slice())?;
|
||||
}
|
||||
|
||||
write_txn.commit()?;
|
||||
Ok(_length)
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Elements are decrypted after retrieval and encrypted before storage
|
||||
pub fn lpop(&self, key: &str, count: u64) -> Result<Vec<String>, DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
let mut result = Vec::new();
|
||||
|
||||
// First check if key exists and is a list, and get the data
|
||||
let list_data = {
|
||||
let types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
let lists_table = write_txn.open_table(LISTS_TABLE)?;
|
||||
|
||||
let result = match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "list" => {
|
||||
if let Some(data) = lists_table.get(key)? {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
let list: Vec<String> = serde_json::from_slice(&decrypted)?;
|
||||
Some(list)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
result
|
||||
};
|
||||
|
||||
if let Some(mut list) = list_data {
|
||||
let pop_count = std::cmp::min(count as usize, list.len());
|
||||
for _ in 0..pop_count {
|
||||
if !list.is_empty() {
|
||||
result.push(list.remove(0));
|
||||
}
|
||||
}
|
||||
|
||||
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
|
||||
if list.is_empty() {
|
||||
// Remove the key if list is empty
|
||||
lists_table.remove(key)?;
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
types_table.remove(key)?;
|
||||
} else {
|
||||
// Encrypt and store the updated list
|
||||
let serialized = serde_json::to_vec(&list)?;
|
||||
let encrypted = self.encrypt_if_needed(&serialized)?;
|
||||
lists_table.insert(key, encrypted.as_slice())?;
|
||||
}
|
||||
}
|
||||
|
||||
write_txn.commit()?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Elements are decrypted after retrieval and encrypted before storage
|
||||
pub fn rpop(&self, key: &str, count: u64) -> Result<Vec<String>, DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
let mut result = Vec::new();
|
||||
|
||||
// First check if key exists and is a list, and get the data
|
||||
let list_data = {
|
||||
let types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
let lists_table = write_txn.open_table(LISTS_TABLE)?;
|
||||
|
||||
let result = match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "list" => {
|
||||
if let Some(data) = lists_table.get(key)? {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
let list: Vec<String> = serde_json::from_slice(&decrypted)?;
|
||||
Some(list)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
result
|
||||
};
|
||||
|
||||
if let Some(mut list) = list_data {
|
||||
let pop_count = std::cmp::min(count as usize, list.len());
|
||||
for _ in 0..pop_count {
|
||||
if !list.is_empty() {
|
||||
result.push(list.pop().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
|
||||
if list.is_empty() {
|
||||
// Remove the key if list is empty
|
||||
lists_table.remove(key)?;
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
types_table.remove(key)?;
|
||||
} else {
|
||||
// Encrypt and store the updated list
|
||||
let serialized = serde_json::to_vec(&list)?;
|
||||
let encrypted = self.encrypt_if_needed(&serialized)?;
|
||||
lists_table.insert(key, encrypted.as_slice())?;
|
||||
}
|
||||
}
|
||||
|
||||
write_txn.commit()?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn llen(&self, key: &str) -> Result<i64, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "list" => {
|
||||
let lists_table = read_txn.open_table(LISTS_TABLE)?;
|
||||
match lists_table.get(key)? {
|
||||
Some(data) => {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
let list: Vec<String> = serde_json::from_slice(&decrypted)?;
|
||||
Ok(list.len() as i64)
|
||||
}
|
||||
None => Ok(0),
|
||||
}
|
||||
}
|
||||
_ => Ok(0),
|
||||
}
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Element is decrypted after retrieval
|
||||
pub fn lindex(&self, key: &str, index: i64) -> Result<Option<String>, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "list" => {
|
||||
let lists_table = read_txn.open_table(LISTS_TABLE)?;
|
||||
match lists_table.get(key)? {
|
||||
Some(data) => {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
let list: Vec<String> = serde_json::from_slice(&decrypted)?;
|
||||
|
||||
let actual_index = if index < 0 {
|
||||
list.len() as i64 + index
|
||||
} else {
|
||||
index
|
||||
};
|
||||
|
||||
if actual_index >= 0 && (actual_index as usize) < list.len() {
|
||||
Ok(Some(list[actual_index as usize].clone()))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
_ => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Elements are decrypted after retrieval
|
||||
pub fn lrange(&self, key: &str, start: i64, stop: i64) -> Result<Vec<String>, DBError> {
|
||||
let read_txn = self.db.begin_read()?;
|
||||
let types_table = read_txn.open_table(TYPES_TABLE)?;
|
||||
|
||||
match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "list" => {
|
||||
let lists_table = read_txn.open_table(LISTS_TABLE)?;
|
||||
match lists_table.get(key)? {
|
||||
Some(data) => {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
let list: Vec<String> = serde_json::from_slice(&decrypted)?;
|
||||
|
||||
if list.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let len = list.len() as i64;
|
||||
let start_idx = if start < 0 { std::cmp::max(0, len + start) } else { std::cmp::min(start, len) };
|
||||
let stop_idx = if stop < 0 { std::cmp::max(-1, len + stop) } else { std::cmp::min(stop, len - 1) };
|
||||
|
||||
if start_idx > stop_idx || start_idx >= len {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let start_usize = start_idx as usize;
|
||||
let stop_usize = (stop_idx + 1) as usize;
|
||||
|
||||
Ok(list[start_usize..std::cmp::min(stop_usize, list.len())].to_vec())
|
||||
}
|
||||
None => Ok(Vec::new()),
|
||||
}
|
||||
}
|
||||
_ => Ok(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Elements are decrypted after retrieval and encrypted before storage
|
||||
pub fn ltrim(&self, key: &str, start: i64, stop: i64) -> Result<(), DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
|
||||
// First check if key exists and is a list, and get the data
|
||||
let list_data = {
|
||||
let types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
let lists_table = write_txn.open_table(LISTS_TABLE)?;
|
||||
|
||||
let result = match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "list" => {
|
||||
if let Some(data) = lists_table.get(key)? {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
let list: Vec<String> = serde_json::from_slice(&decrypted)?;
|
||||
Some(list)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
result
|
||||
};
|
||||
|
||||
if let Some(list) = list_data {
|
||||
if list.is_empty() {
|
||||
write_txn.commit()?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let len = list.len() as i64;
|
||||
let start_idx = if start < 0 { std::cmp::max(0, len + start) } else { std::cmp::min(start, len) };
|
||||
let stop_idx = if stop < 0 { std::cmp::max(-1, len + stop) } else { std::cmp::min(stop, len - 1) };
|
||||
|
||||
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
|
||||
if start_idx > stop_idx || start_idx >= len {
|
||||
// Remove the entire list
|
||||
lists_table.remove(key)?;
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
types_table.remove(key)?;
|
||||
} else {
|
||||
let start_usize = start_idx as usize;
|
||||
let stop_usize = (stop_idx + 1) as usize;
|
||||
let trimmed = list[start_usize..std::cmp::min(stop_usize, list.len())].to_vec();
|
||||
|
||||
if trimmed.is_empty() {
|
||||
lists_table.remove(key)?;
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
types_table.remove(key)?;
|
||||
} else {
|
||||
// Encrypt and store the trimmed list
|
||||
let serialized = serde_json::to_vec(&trimmed)?;
|
||||
let encrypted = self.encrypt_if_needed(&serialized)?;
|
||||
lists_table.insert(key, encrypted.as_slice())?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
write_txn.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ✅ ENCRYPTION APPLIED: Elements are decrypted after retrieval and encrypted before storage
|
||||
pub fn lrem(&self, key: &str, count: i64, element: &str) -> Result<i64, DBError> {
|
||||
let write_txn = self.db.begin_write()?;
|
||||
let mut removed = 0i64;
|
||||
|
||||
// First check if key exists and is a list, and get the data
|
||||
let list_data = {
|
||||
let types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
let lists_table = write_txn.open_table(LISTS_TABLE)?;
|
||||
|
||||
let result = match types_table.get(key)? {
|
||||
Some(type_val) if type_val.value() == "list" => {
|
||||
if let Some(data) = lists_table.get(key)? {
|
||||
let decrypted = self.decrypt_if_needed(data.value())?;
|
||||
let list: Vec<String> = serde_json::from_slice(&decrypted)?;
|
||||
Some(list)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
result
|
||||
};
|
||||
|
||||
if let Some(mut list) = list_data {
|
||||
if count == 0 {
|
||||
// Remove all occurrences
|
||||
let original_len = list.len();
|
||||
list.retain(|x| x != element);
|
||||
removed = (original_len - list.len()) as i64;
|
||||
} else if count > 0 {
|
||||
// Remove first count occurrences
|
||||
let mut to_remove = count as usize;
|
||||
list.retain(|x| {
|
||||
if x == element && to_remove > 0 {
|
||||
to_remove -= 1;
|
||||
removed += 1;
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
} else {
|
||||
// Remove last |count| occurrences
|
||||
let mut to_remove = (-count) as usize;
|
||||
for i in (0..list.len()).rev() {
|
||||
if list[i] == element && to_remove > 0 {
|
||||
list.remove(i);
|
||||
to_remove -= 1;
|
||||
removed += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
|
||||
if list.is_empty() {
|
||||
lists_table.remove(key)?;
|
||||
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||
types_table.remove(key)?;
|
||||
} else {
|
||||
// Encrypt and store the updated list
|
||||
let serialized = serde_json::to_vec(&list)?;
|
||||
let encrypted = self.encrypt_if_needed(&serialized)?;
|
||||
lists_table.insert(key, encrypted.as_slice())?;
|
||||
}
|
||||
}
|
||||
|
||||
write_txn.commit()?;
|
||||
Ok(removed)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user