diff --git a/Cargo.lock b/Cargo.lock index 4c63baf..db8d2f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -178,6 +178,36 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "futures" version = "0.3.31" @@ -285,18 +315,137 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "icu_collections" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "200072f5d0e3614556f94a9930d5dc3e0662a652823904c3a75dc3b0af7fee47" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cde2700ccaed3872079a65fb1a78f6c0a36c91570f28755dda67bc8f7d9f00a" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "436880e8e18df4d7bbc06d58432329d6458cc84531f7ac5f024e93deadb37979" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3" + +[[package]] +name = "icu_properties" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "016c619c1eeb94efb86809b015c58f479963de65bdb6253345c1a1276f22e32b" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "potential_utf", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632" + +[[package]] +name = "icu_provider" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c80da27b5f4187909049ee2d72f276f0d9f99a42c306bd0131ecfe04d8e5af" +dependencies = [ + "displaydoc", + "icu_locale_core", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + [[package]] name = "libc" version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "litemap" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" + [[package]] name = "lock_api" version = "0.4.12" @@ -375,6 +524,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -387,6 +542,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "potential_utf" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" +dependencies = [ + "zerovec", +] + [[package]] name = "proc-macro2" version = "1.0.86" @@ -414,6 +578,21 @@ dependencies = [ "libc", ] +[[package]] +name = "redis" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd" +dependencies = [ + "combine", + "itoa", + "percent-encoding", + "ryu", + "sha1_smol", + "socket2 0.4.10", + "url", +] + [[package]] name = "redis-rs" version = "0.0.1" @@ -425,6 +604,7 @@ dependencies = [ "clap", "futures", "redb", + "redis", "serde", "thiserror", "tokio", @@ -445,6 +625,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "ryu" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + [[package]] name = "scopeguard" version = "1.2.0" @@ -471,6 +657,12 @@ dependencies = [ "syn", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -495,6 +687,16 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.7" @@ -505,6 +707,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "strsim" version = "0.11.1" @@ -522,6 +730,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thiserror" version = "1.0.61" @@ -542,6 +761,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tinystr" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4f6d1145dcb577acf783d4e601bc1d76a13337bb54e6233add580b07344c8b" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tokio" version = "1.38.0" @@ -556,7 +785,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.7", "tokio-macros", "windows-sys 0.48.0", ] @@ -578,6 +807,23 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "url" +version = "2.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -590,6 +836,28 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-sys" version = "0.48.0" @@ -728,3 +996,87 @@ name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "writeable" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" + +[[package]] +name = "yoke" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36f0bbd478583f79edad978b407914f61b2972f5af6fa089686016be8f9af595" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7aa2bd55086f1ab526693ecbe444205da57e25f4489879da80635a46d90e73b" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index 6af3454..2ae086f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,5 @@ redb = "2.1.3" serde = { version = "1.0", features = ["derive"] } bincode = "1.3.3" +[dev-dependencies] +redis = "0.24" diff --git a/_config.yml b/_config.yml deleted file mode 100644 index 5b45957..0000000 --- a/_config.yml +++ /dev/null @@ -1,64 +0,0 @@ -title: 从 0 到 1 由 Rust 构建 Redis -description: 从 0 到 1 由 Rust 构建 Redis -theme: just-the-docs - -url: https://fangpin.github.io/redis-rs - -aux_links: - GitHub: https://fangpin.github.io/redis-rs - -# logo: "/assets/images/just-the-docs.png" - -search_enabled: true -search: - # Split pages into sections that can be searched individually - # Supports 1 - 6, default: 2 - heading_level: 2 - # Maximum amount of previews per search result - # Default: 3 - previews: 3 - # Maximum amount of words to display before a matched word in the preview - # Default: 5 - preview_words_before: 5 - # Maximum amount of words to display after a matched word in the preview - # Default: 10 - preview_words_after: 10 - # Set the search token separator - # Default: /[\s\-/]+/ - # Example: enable support for hyphenated search words - tokenizer_separator: /[\s/]+/ - # Display the relative url in search results - # Supports true (default) or false - rel_url: true - # Enable or disable the search button that appears in the bottom right corner of every page - # Supports true or false (default) - button: false - - -# Heading anchor links appear on hover over h1-h6 tags in page content -# allowing users to deep link to a particular heading on a page. -# -# Supports true (default) or false -heading_anchors: true - - -# Footer content -# appears at the bottom of every page's main content -# Note: The footer_content option is deprecated and will be removed in a future major release. Please use `_includes/footer_custom.html` for more robust markup / liquid-based content. -footer_content: "Copyright © 2017-2024 Pin Fang" - -# Footer last edited timestamp -last_edit_timestamp: true # show or hide edit time - page must have `last_modified_date` defined in the frontmatter -last_edit_time_format: "%b %e %Y at %I:%M %p" # uses ruby's time format: https://ruby-doc.org/stdlib-2.7.0/libdoc/time/rdoc/Time.html - - - -# code -compress_html: - ignore: - envs: all - -kramdown: - syntax_highlighter_opts: - block: - line_numbers: true diff --git a/instructions/encrypt.md b/instructions/encrypt.md new file mode 100644 index 0000000..a3c2b5e --- /dev/null +++ b/instructions/encrypt.md @@ -0,0 +1,100 @@ +Perfect — here’s a tiny “factory” you can drop in. + +### Cargo.toml + +```toml +[dependencies] +chacha20poly1305 = { version = "0.10", features = ["xchacha20"] } +rand = "0.8" +sha2 = "0.10" +``` + +### `crypto_factory.rs` + +```rust +use chacha20poly1305::{ + aead::{Aead, KeyInit, OsRng}, + XChaCha20Poly1305, Key, XNonce, +}; +use rand::RngCore; +use sha2::{Digest, Sha256}; + +const VERSION: u8 = 1; +const NONCE_LEN: usize = 24; +const TAG_LEN: usize = 16; + +#[derive(Debug)] +pub enum CryptoError { + Format, // wrong length / header + Version(u8), // unknown version + Decrypt, // wrong key or corrupted data +} + +/// Super-simple factory: new(secret) + encrypt(bytes) + decrypt(bytes) +pub struct CryptoFactory { + key: Key, +} + +impl CryptoFactory { + /// Accepts any secret bytes; turns them into a 32-byte key (SHA-256). + /// (If your secret is already 32 bytes, this is still fine.) + pub fn new>(secret: S) -> Self { + let mut h = Sha256::new(); + h.update(b"xchacha20poly1305-factory:v1"); // domain separation + h.update(secret.as_ref()); + let digest = h.finalize(); // 32 bytes + let key = Key::::from_slice(&digest).to_owned(); + Self { key } + } + + /// Output layout: [version:1][nonce:24][ciphertext||tag] + pub fn encrypt(&self, plaintext: &[u8]) -> Vec { + let cipher = XChaCha20Poly1305::new(&self.key); + + let mut nonce_bytes = [0u8; NONCE_LEN]; + OsRng.fill_bytes(&mut nonce_bytes); + let nonce = XNonce::from_slice(&nonce_bytes); + + let mut out = Vec::with_capacity(1 + NONCE_LEN + plaintext.len() + TAG_LEN); + out.push(VERSION); + out.extend_from_slice(&nonce_bytes); + + let ct = cipher.encrypt(nonce, plaintext).expect("encrypt"); + out.extend_from_slice(&ct); + out + } + + pub fn decrypt(&self, blob: &[u8]) -> Result, CryptoError> { + if blob.len() < 1 + NONCE_LEN + TAG_LEN { + return Err(CryptoError::Format); + } + let ver = blob[0]; + if ver != VERSION { + return Err(CryptoError::Version(ver)); + } + + let nonce = XNonce::from_slice(&blob[1..1 + NONCE_LEN]); + let ct = &blob[1 + NONCE_LEN..]; + + let cipher = XChaCha20Poly1305::new(&self.key); + cipher.decrypt(nonce, ct).map_err(|_| CryptoError::Decrypt) + } +} +``` + +### Tiny usage example + +```rust +fn main() { + let f = CryptoFactory::new(b"super-secret-key-material"); + let val = b"\x00\xFFbinary\x01\x02\x03"; + + let blob = f.encrypt(val); + let roundtrip = f.decrypt(&blob).unwrap(); + + assert_eq!(roundtrip, val); +} +``` + +That’s it: `new(secret)`, `encrypt(bytes)`, `decrypt(bytes)`. +You can stash the returned `blob` directly in your storage layer behind Redis. diff --git a/run_tests.sh b/run_tests.sh new file mode 100755 index 0000000..160e162 --- /dev/null +++ b/run_tests.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +echo "🧪 Running HeroDB Redis Compatibility Tests" +echo "==========================================" + +echo "" +echo "1️⃣ Running Simple Redis Tests (4 tests)..." +echo "----------------------------------------------" +cargo test --test simple_redis_test -- --nocapture + +echo "" +echo "2️⃣ Running Comprehensive Redis Integration Tests (13 tests)..." +echo "----------------------------------------------------------------" +cargo test --test redis_integration_tests -- --nocapture + +echo "" +echo "3️⃣ Running All Tests..." +echo "------------------------" +cargo test -- --nocapture + +echo "" +echo "✅ Test execution completed!" \ No newline at end of file diff --git a/src/cmd.rs b/src/cmd.rs index fcaa71e..3dd26e9 100644 --- a/src/cmd.rs +++ b/src/cmd.rs @@ -28,7 +28,11 @@ pub enum Cmd { HLen(String), HMGet(String, Vec), HSetNx(String, String, String), + HScan(String, u64, Option, Option), // key, cursor, pattern, count Scan(u64, Option, Option), // cursor, pattern, count + Ttl(String), + Exists(String), + Quit, Unknow, } @@ -117,7 +121,7 @@ impl Cmd { } let mut pairs = Vec::new(); let mut i = 2; - while i < cmd.len() - 1 { + while i + 1 < cmd.len() { pairs.push((cmd[i].clone(), cmd[i + 1].clone())); i += 2; } @@ -177,6 +181,44 @@ impl Cmd { } Cmd::HSetNx(cmd[1].clone(), cmd[2].clone(), cmd[3].clone()) } + "hscan" => { + if cmd.len() < 3 { + return Err(DBError(format!("wrong number of arguments for HSCAN command"))); + } + + let key = cmd[1].clone(); + let cursor = cmd[2].parse::().map_err(|_| + DBError("ERR invalid cursor".to_string()))?; + + let mut pattern = None; + let mut count = None; + let mut i = 3; + + while i < cmd.len() { + match cmd[i].to_lowercase().as_str() { + "match" => { + if i + 1 >= cmd.len() { + return Err(DBError("ERR syntax error".to_string())); + } + pattern = Some(cmd[i + 1].clone()); + i += 2; + } + "count" => { + if i + 1 >= cmd.len() { + return Err(DBError("ERR syntax error".to_string())); + } + count = Some(cmd[i + 1].parse::().map_err(|_| + DBError("ERR value is not an integer or out of range".to_string()))?); + i += 2; + } + _ => { + return Err(DBError(format!("ERR syntax error"))); + } + } + } + + Cmd::HScan(key, cursor, pattern, count) + } "scan" => { if cmd.len() < 2 { return Err(DBError(format!("wrong number of arguments for SCAN command"))); @@ -214,6 +256,24 @@ impl Cmd { Cmd::Scan(cursor, pattern, count) } + "ttl" => { + if cmd.len() != 2 { + return Err(DBError(format!("wrong number of arguments for TTL command"))); + } + Cmd::Ttl(cmd[1].clone()) + } + "exists" => { + if cmd.len() != 2 { + return Err(DBError(format!("wrong number of arguments for EXISTS command"))); + } + Cmd::Exists(cmd[1].clone()) + } + "quit" => { + if cmd.len() != 1 { + return Err(DBError(format!("wrong number of arguments for QUIT command"))); + } + Cmd::Quit + } _ => Cmd::Unknow, }, protocol.0, @@ -282,7 +342,11 @@ impl Cmd { Cmd::HLen(key) => hlen_cmd(server, key).await, Cmd::HMGet(key, fields) => hmget_cmd(server, key, fields).await, Cmd::HSetNx(key, field, value) => hsetnx_cmd(server, key, field, value).await, + Cmd::HScan(key, cursor, pattern, count) => hscan_cmd(server, key, cursor, pattern.as_deref(), count).await, Cmd::Scan(cursor, pattern, count) => scan_cmd(server, cursor, pattern.as_deref(), count).await, + Cmd::Ttl(key) => ttl_cmd(server, key).await, + Cmd::Exists(key) => exists_cmd(server, key).await, + Cmd::Quit => Ok(Protocol::SimpleString("OK".to_string())), Cmd::Unknow => Ok(Protocol::err("unknown cmd")), } } @@ -332,7 +396,11 @@ fn config_get_cmd(name: &String, server: &Server) -> Result { Protocol::BulkString(name.clone()), Protocol::BulkString("herodb.redb".to_string()), ])), - _ => Err(DBError(format!("unsupported config {:?}", name))), + "databases" => Ok(Protocol::Array(vec![ + Protocol::BulkString(name.clone()), + Protocol::BulkString("16".to_string()), + ])), + _ => Ok(Protocol::Array(vec![])), // Return empty array for unknown configs instead of error } } @@ -497,3 +565,31 @@ async fn scan_cmd(server: &Server, cursor: &u64, pattern: Option<&str>, count: & Err(e) => Ok(Protocol::err(&e.0)), } } + +async fn hscan_cmd(server: &Server, key: &str, cursor: &u64, pattern: Option<&str>, count: &Option) -> Result { + match server.storage.hscan(key, *cursor, pattern, *count) { + Ok((next_cursor, fields)) => { + let mut result = Vec::new(); + result.push(Protocol::BulkString(next_cursor.to_string())); + result.push(Protocol::Array( + fields.into_iter().map(Protocol::BulkString).collect(), + )); + Ok(Protocol::Array(result)) + } + Err(e) => Ok(Protocol::err(&e.0)), + } +} + +async fn ttl_cmd(server: &Server, key: &str) -> Result { + match server.storage.ttl(key) { + Ok(ttl) => Ok(Protocol::SimpleString(ttl.to_string())), + Err(e) => Ok(Protocol::err(&e.0)), + } +} + +async fn exists_cmd(server: &Server, key: &str) -> Result { + match server.storage.exists(key) { + Ok(exists) => Ok(Protocol::SimpleString(if exists { "1" } else { "0" }.to_string())), + Err(e) => Ok(Protocol::err(&e.0)), + } +} diff --git a/src/lib.rs b/src/lib.rs index cc46e75..ad0f3c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ -mod cmd; +pub mod cmd; pub mod error; pub mod options; -mod protocol; +pub mod protocol; pub mod server; -mod storage; +pub mod storage; diff --git a/src/server.rs b/src/server.rs index e464149..3c16c72 100644 --- a/src/server.rs +++ b/src/server.rs @@ -50,6 +50,9 @@ impl Server { Cmd::from(s).unwrap_or((Cmd::Unknow, Protocol::err("unknow cmd"))); println!("got command: {:?}, protocol: {:?}", cmd, protocol); + // Check if this is a QUIT command before processing + let is_quit = matches!(cmd, Cmd::Quit); + let res = cmd .run(self, protocol, &mut queued_cmd) .await @@ -58,6 +61,12 @@ impl Server { println!("going to send response {}", res.encode()); _ = stream.write(res.encode().as_bytes()).await?; + + // If this was a QUIT command, close the connection + if is_quit { + println!("[handle] QUIT command received, closing connection"); + return Ok(()); + } } else { println!("[handle] going to break"); break; diff --git a/src/storage.rs b/src/storage.rs index b738ce4..f4020c2 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -506,4 +506,133 @@ impl Storage { Ok((next_cursor, keys)) } + + pub fn hscan(&self, key: &str, cursor: u64, pattern: Option<&str>, count: Option) -> Result<(u64, Vec), DBError> { + let read_txn = self.db.begin_read()?; + + // Check if key exists and is a hash + let types_table = read_txn.open_table(TYPES_TABLE)?; + match types_table.get(key)? { + Some(type_val) if type_val.value() == "hash" => { + let hashes_table = read_txn.open_table(HASHES_TABLE)?; + let count = count.unwrap_or(10); + let mut fields = Vec::new(); + let mut current_cursor = 0u64; + let mut returned_fields = 0u64; + + let mut iter = hashes_table.iter()?; + while let Some(entry) = iter.next() { + let entry = entry?; + let (hash_key, field) = entry.0.value(); + let value = entry.1.value(); + + if hash_key != key { + continue; + } + + // Skip fields until we reach the cursor position + if current_cursor < cursor { + current_cursor += 1; + continue; + } + + // Check if field matches pattern + let matches = match pattern { + Some(pat) => { + if pat == "*" { + true + } else if pat.contains('*') { + let pattern_parts: Vec<&str> = pat.split('*').collect(); + if pattern_parts.len() == 2 { + let prefix = pattern_parts[0]; + let suffix = pattern_parts[1]; + field.starts_with(prefix) && field.ends_with(suffix) + } else { + field.contains(&pat.replace('*', "")) + } + } else { + field.contains(pat) + } + } + None => true, + }; + + if matches { + fields.push(field.to_string()); + fields.push(value.to_string()); + returned_fields += 1; + + if returned_fields >= count { + current_cursor += 1; + break; + } + } + + current_cursor += 1; + } + + let next_cursor = if returned_fields < count { 0 } else { current_cursor }; + Ok((next_cursor, fields)) + } + Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())), + None => Ok((0, Vec::new())), + } + } + + pub fn ttl(&self, key: &str) -> Result { + let read_txn = self.db.begin_read()?; + + // Check if key exists + let types_table = read_txn.open_table(TYPES_TABLE)?; + match types_table.get(key)? { + Some(type_val) if type_val.value() == "string" => { + let strings_table = read_txn.open_table(STRINGS_TABLE)?; + match strings_table.get(key)? { + Some(data) => { + let string_value: StringValue = bincode::deserialize(data.value())?; + match string_value.expires_at_ms { + Some(expires_at) => { + let now = now_in_millis(); + if now > expires_at { + Ok(-2) // Key expired + } else { + Ok(((expires_at - now) / 1000) as i64) // TTL in seconds + } + } + None => Ok(-1), // No expiration + } + } + None => Ok(-2), // Key doesn't exist + } + } + Some(_) => Ok(-1), // Other types don't have TTL implemented yet + None => Ok(-2), // Key doesn't exist + } + } + + pub fn exists(&self, key: &str) -> Result { + let read_txn = self.db.begin_read()?; + let types_table = read_txn.open_table(TYPES_TABLE)?; + + match types_table.get(key)? { + Some(_) => { + // For string types, check if not expired + if let Some(type_val) = types_table.get(key)? { + if type_val.value() == "string" { + let strings_table = read_txn.open_table(STRINGS_TABLE)?; + if let Some(data) = strings_table.get(key)? { + let string_value: StringValue = bincode::deserialize(data.value())?; + if let Some(expires_at) = string_value.expires_at_ms { + if now_in_millis() > expires_at { + return Ok(false); // Expired + } + } + } + } + } + Ok(true) + } + None => Ok(false), + } + } } diff --git a/test_herodb.sh b/test_herodb.sh index b3552d4..3dcb285 100755 --- a/test_herodb.sh +++ b/test_herodb.sh @@ -14,7 +14,7 @@ NC='\033[0m' # No Color # Configuration DB_DIR="./test_db" -PORT=6379 +PORT=6381 SERVER_PID="" # Function to print colored output diff --git a/tests/debug_hset.rs b/tests/debug_hset.rs new file mode 100644 index 0000000..5793a61 --- /dev/null +++ b/tests/debug_hset.rs @@ -0,0 +1,59 @@ +use redis_rs::{server::Server, options::DBOption}; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use tokio::time::sleep; + +// Helper function to send command and get response +async fn send_command(stream: &mut TcpStream, command: &str) -> String { + stream.write_all(command.as_bytes()).await.unwrap(); + + let mut buffer = [0; 1024]; + let n = stream.read(&mut buffer).await.unwrap(); + String::from_utf8_lossy(&buffer[..n]).to_string() +} + +#[tokio::test] +async fn debug_hset_simple() { + // Clean up any existing test database + let test_dir = "/tmp/herodb_debug_hset"; + let _ = std::fs::remove_dir_all(test_dir); + std::fs::create_dir_all(test_dir).unwrap(); + + let port = 16500; + let option = DBOption { + dir: test_dir.to_string(), + port, + }; + + let mut server = Server::new(option).await; + + // Start server in background + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap(); + + // Test simple HSET + println!("Testing HSET..."); + let response = send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n").await; + println!("HSET response: {}", response); + assert!(response.contains("1"), "Expected '1' but got: {}", response); + + // Test HGET + println!("Testing HGET..."); + let response = send_command(&mut stream, "*3\r\n$4\r\nHGET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await; + println!("HGET response: {}", response); + assert!(response.contains("value1"), "Expected 'value1' but got: {}", response); +} \ No newline at end of file diff --git a/tests/debug_hset_simple.rs b/tests/debug_hset_simple.rs new file mode 100644 index 0000000..969df73 --- /dev/null +++ b/tests/debug_hset_simple.rs @@ -0,0 +1,53 @@ +use redis_rs::{server::Server, options::DBOption}; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use tokio::time::sleep; + +#[tokio::test] +async fn debug_hset_return_value() { + let test_dir = "/tmp/herodb_debug_hset_return"; + + // Clean up any existing test data + let _ = std::fs::remove_dir_all(&test_dir); + std::fs::create_dir_all(&test_dir).unwrap(); + + let option = DBOption { + dir: test_dir.to_string(), + port: 16390, + }; + + let mut server = Server::new(option).await; + + // Start server in background + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind("127.0.0.1:16390") + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + // Connect and test HSET + let mut stream = TcpStream::connect("127.0.0.1:16390").await.unwrap(); + + // Send HSET command + let cmd = "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n"; + stream.write_all(cmd.as_bytes()).await.unwrap(); + + let mut buffer = [0; 1024]; + let n = stream.read(&mut buffer).await.unwrap(); + let response = String::from_utf8_lossy(&buffer[..n]); + + println!("HSET response: {}", response); + println!("Response bytes: {:?}", &buffer[..n]); + + // Check if response contains "1" + assert!(response.contains("1"), "Expected response to contain '1', got: {}", response); +} \ No newline at end of file diff --git a/tests/debug_protocol.rs b/tests/debug_protocol.rs new file mode 100644 index 0000000..037ef4a --- /dev/null +++ b/tests/debug_protocol.rs @@ -0,0 +1,35 @@ +use redis_rs::protocol::Protocol; +use redis_rs::cmd::Cmd; + +#[test] +fn test_protocol_parsing() { + // Test TYPE command parsing + let type_cmd = "*2\r\n$4\r\nTYPE\r\n$7\r\nnoexist\r\n"; + println!("Parsing TYPE command: {}", type_cmd.replace("\r\n", "\\r\\n")); + + match Protocol::from(type_cmd) { + Ok((protocol, _)) => { + println!("Protocol parsed successfully: {:?}", protocol); + match Cmd::from(type_cmd) { + Ok((cmd, _)) => println!("Command parsed successfully: {:?}", cmd), + Err(e) => println!("Command parsing failed: {:?}", e), + } + } + Err(e) => println!("Protocol parsing failed: {:?}", e), + } + + // Test HEXISTS command parsing + let hexists_cmd = "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$7\r\nnoexist\r\n"; + println!("\nParsing HEXISTS command: {}", hexists_cmd.replace("\r\n", "\\r\\n")); + + match Protocol::from(hexists_cmd) { + Ok((protocol, _)) => { + println!("Protocol parsed successfully: {:?}", protocol); + match Cmd::from(hexists_cmd) { + Ok((cmd, _)) => println!("Command parsed successfully: {:?}", cmd), + Err(e) => println!("Command parsing failed: {:?}", e), + } + } + Err(e) => println!("Protocol parsing failed: {:?}", e), + } +} \ No newline at end of file diff --git a/tests/redis_integration_tests.rs b/tests/redis_integration_tests.rs new file mode 100644 index 0000000..39b0460 --- /dev/null +++ b/tests/redis_integration_tests.rs @@ -0,0 +1,557 @@ +use redis_rs::{server::Server, options::DBOption}; +use redis::{Client, Commands, Connection}; +use std::time::Duration; +use tokio::time::{sleep, timeout}; +use tokio::sync::oneshot; + +// Helper function to start a test server with clean data directory +async fn start_test_server(test_name: &str) -> (Server, u16) { + use std::sync::atomic::{AtomicU16, Ordering}; + static PORT_COUNTER: AtomicU16 = AtomicU16::new(16400); + + // Get a unique port for this test + let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); + + // Ensure port is available by trying to bind to it first + let mut attempts = 0; + let final_port = loop { + let test_port = port + attempts; + match tokio::net::TcpListener::bind(format!("127.0.0.1:{}", test_port)).await { + Ok(_) => break test_port, + Err(_) => { + attempts += 1; + if attempts > 100 { + panic!("Could not find available port after 100 attempts"); + } + } + } + }; + let test_dir = format!("/tmp/herodb_test_{}", test_name); + + // Clean up any existing test data + let _ = std::fs::remove_dir_all(&test_dir); + std::fs::create_dir_all(&test_dir).unwrap(); + + let option = DBOption { + dir: test_dir, + port: final_port, + }; + + let server = Server::new(option).await; + (server, final_port) +} + +// Helper function to get Redis connection +fn get_redis_connection(port: u16) -> Connection { + let client = Client::open(format!("redis://127.0.0.1:{}/", port)).unwrap(); + let mut attempts = 0; + loop { + match client.get_connection() { + Ok(conn) => return conn, + Err(_) if attempts < 20 => { + attempts += 1; + std::thread::sleep(Duration::from_millis(100)); + } + Err(e) => panic!("Failed to connect to Redis server: {}", e), + } + } +} + +#[tokio::test] +async fn test_basic_ping() { + let (mut server, port) = start_test_server("ping").await; + + // Start server in background + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + let result: String = redis::cmd("PING").query(&mut conn).unwrap(); + assert_eq!(result, "PONG"); +} + +#[tokio::test] +async fn test_string_operations() { + let (mut server, port) = start_test_server("string").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + + // Test SET + let _: () = conn.set("key", "value").unwrap(); + + // Test GET + let result: String = conn.get("key").unwrap(); + assert_eq!(result, "value"); + + // Test GET non-existent key + let result: Option = conn.get("noexist").unwrap(); + assert_eq!(result, None); + + // Test DEL + let deleted: i32 = conn.del("key").unwrap(); + assert_eq!(deleted, 1); + + // Test GET after DEL + let result: Option = conn.get("key").unwrap(); + assert_eq!(result, None); +} + +#[tokio::test] +async fn test_incr_operations() { + let (mut server, port) = start_test_server("incr").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + + // Test INCR on non-existent key + let result: i32 = conn.incr("counter", 1).unwrap(); + assert_eq!(result, 1); + + // Test INCR on existing key + let result: i32 = conn.incr("counter", 1).unwrap(); + assert_eq!(result, 2); + + // Test INCR on string value (should fail) + let _: () = conn.set("string", "hello").unwrap(); + let result: Result = conn.incr("string", 1); + assert!(result.is_err()); +} + +#[tokio::test] +async fn test_hash_operations() { + let (mut server, port) = start_test_server("hash").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + + // Test HSET + let result: i32 = conn.hset("hash", "field1", "value1").unwrap(); + assert_eq!(result, 1); // 1 new field + + // Test HGET + let result: String = conn.hget("hash", "field1").unwrap(); + assert_eq!(result, "value1"); + + // Test HSET multiple fields + let _: () = conn.hset_multiple("hash", &[("field2", "value2"), ("field3", "value3")]).unwrap(); + + // Test HGETALL + let result: std::collections::HashMap = conn.hgetall("hash").unwrap(); + assert_eq!(result.len(), 3); + assert_eq!(result.get("field1").unwrap(), "value1"); + assert_eq!(result.get("field2").unwrap(), "value2"); + assert_eq!(result.get("field3").unwrap(), "value3"); + + // Test HLEN + let result: i32 = conn.hlen("hash").unwrap(); + assert_eq!(result, 3); + + // Test HEXISTS + let result: bool = conn.hexists("hash", "field1").unwrap(); + assert_eq!(result, true); + + let result: bool = conn.hexists("hash", "noexist").unwrap(); + assert_eq!(result, false); + + // Test HDEL + let result: i32 = conn.hdel("hash", "field1").unwrap(); + assert_eq!(result, 1); + + // Test HKEYS + let mut result: Vec = conn.hkeys("hash").unwrap(); + result.sort(); + assert_eq!(result, vec!["field2", "field3"]); + + // Test HVALS + let mut result: Vec = conn.hvals("hash").unwrap(); + result.sort(); + assert_eq!(result, vec!["value2", "value3"]); +} + +#[tokio::test] +async fn test_expiration() { + let (mut server, port) = start_test_server("expiration").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + + // Test SETEX (expire in 1 second) + let _: () = conn.set_ex("expkey", "value", 1).unwrap(); + + // Test TTL + let result: i32 = conn.ttl("expkey").unwrap(); + assert!(result == 1 || result == 0); // Should be 1 or 0 seconds + + // Test EXISTS + let result: bool = conn.exists("expkey").unwrap(); + assert_eq!(result, true); + + // Wait for expiration + sleep(Duration::from_millis(1100)).await; + + // Test GET after expiration + let result: Option = conn.get("expkey").unwrap(); + assert_eq!(result, None); + + // Test TTL after expiration + let result: i32 = conn.ttl("expkey").unwrap(); + assert_eq!(result, -2); // Key doesn't exist + + // Test EXISTS after expiration + let result: bool = conn.exists("expkey").unwrap(); + assert_eq!(result, false); +} + +#[tokio::test] +async fn test_scan_operations() { + let (mut server, port) = start_test_server("scan").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + + // Set up test data + for i in 0..5 { + let _: () = conn.set(format!("key{}", i), format!("value{}", i)).unwrap(); + } + + // Test SCAN + let result: (u64, Vec) = redis::cmd("SCAN") + .arg(0) + .arg("MATCH") + .arg("*") + .arg("COUNT") + .arg(10) + .query(&mut conn) + .unwrap(); + + let (cursor, keys) = result; + assert_eq!(cursor, 0); // Should complete in one scan + assert_eq!(keys.len(), 5); + + // Test KEYS + let mut result: Vec = redis::cmd("KEYS").arg("*").query(&mut conn).unwrap(); + result.sort(); + assert_eq!(result, vec!["key0", "key1", "key2", "key3", "key4"]); +} + +#[tokio::test] +async fn test_hscan_operations() { + let (mut server, port) = start_test_server("hscan").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + + // Set up hash data + for i in 0..3 { + let _: () = conn.hset("testhash", format!("field{}", i), format!("value{}", i)).unwrap(); + } + + // Test HSCAN + let result: (u64, Vec) = redis::cmd("HSCAN") + .arg("testhash") + .arg(0) + .arg("MATCH") + .arg("*") + .arg("COUNT") + .arg(10) + .query(&mut conn) + .unwrap(); + + let (cursor, fields) = result; + assert_eq!(cursor, 0); // Should complete in one scan + assert_eq!(fields.len(), 6); // 3 field-value pairs = 6 elements +} + +#[tokio::test] +async fn test_transaction_operations() { + let (mut server, port) = start_test_server("transaction").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + + // Test MULTI/EXEC + let _: () = redis::cmd("MULTI").query(&mut conn).unwrap(); + let _: () = redis::cmd("SET").arg("key1").arg("value1").query(&mut conn).unwrap(); + let _: () = redis::cmd("SET").arg("key2").arg("value2").query(&mut conn).unwrap(); + let _: Vec = redis::cmd("EXEC").query(&mut conn).unwrap(); + + // Verify commands were executed + let result: String = conn.get("key1").unwrap(); + assert_eq!(result, "value1"); + + let result: String = conn.get("key2").unwrap(); + assert_eq!(result, "value2"); +} + +#[tokio::test] +async fn test_discard_transaction() { + let (mut server, port) = start_test_server("discard").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + + // Test MULTI/DISCARD + let _: () = redis::cmd("MULTI").query(&mut conn).unwrap(); + let _: () = redis::cmd("SET").arg("discard").arg("value").query(&mut conn).unwrap(); + let _: () = redis::cmd("DISCARD").query(&mut conn).unwrap(); + + // Verify command was not executed + let result: Option = conn.get("discard").unwrap(); + assert_eq!(result, None); +} + +#[tokio::test] +async fn test_type_command() { + let (mut server, port) = start_test_server("type").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + + // Test string type + let _: () = conn.set("string", "value").unwrap(); + let result: String = redis::cmd("TYPE").arg("string").query(&mut conn).unwrap(); + assert_eq!(result, "string"); + + // Test hash type + let _: () = conn.hset("hash", "field", "value").unwrap(); + let result: String = redis::cmd("TYPE").arg("hash").query(&mut conn).unwrap(); + assert_eq!(result, "hash"); + + // Test non-existent key + let result: String = redis::cmd("TYPE").arg("noexist").query(&mut conn).unwrap(); + assert_eq!(result, "none"); +} + +#[tokio::test] +async fn test_config_commands() { + let (mut server, port) = start_test_server("config").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + + // Test CONFIG GET databases + let result: Vec = redis::cmd("CONFIG") + .arg("GET") + .arg("databases") + .query(&mut conn) + .unwrap(); + assert_eq!(result, vec!["databases", "16"]); + + // Test CONFIG GET dir + let result: Vec = redis::cmd("CONFIG") + .arg("GET") + .arg("dir") + .query(&mut conn) + .unwrap(); + assert_eq!(result[0], "dir"); + assert!(result[1].contains("/tmp/herodb_test_config")); +} + +#[tokio::test] +async fn test_info_command() { + let (mut server, port) = start_test_server("info").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + + // Test INFO + let result: String = redis::cmd("INFO").query(&mut conn).unwrap(); + assert!(result.contains("redis_version")); + + // Test INFO replication + let result: String = redis::cmd("INFO").arg("replication").query(&mut conn).unwrap(); + assert!(result.contains("role:master")); +} + +#[tokio::test] +async fn test_error_handling() { + let (mut server, port) = start_test_server("error").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut conn = get_redis_connection(port); + + // Test WRONGTYPE error - try to use hash command on string + let _: () = conn.set("string", "value").unwrap(); + let result: Result = conn.hget("string", "field"); + assert!(result.is_err()); + + // Test unknown command + let result: Result = redis::cmd("UNKNOWN").query(&mut conn); + assert!(result.is_err()); + + // Test EXEC without MULTI + let result: Result, _> = redis::cmd("EXEC").query(&mut conn); + assert!(result.is_err()); + + // Test DISCARD without MULTI + let result: Result<(), _> = redis::cmd("DISCARD").query(&mut conn); + assert!(result.is_err()); +} \ No newline at end of file diff --git a/tests/redis_tests.rs b/tests/redis_tests.rs new file mode 100644 index 0000000..ed08dcb --- /dev/null +++ b/tests/redis_tests.rs @@ -0,0 +1,544 @@ +use redis_rs::{server::Server, options::DBOption}; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use tokio::time::sleep; + +// Helper function to start a test server +async fn start_test_server(test_name: &str) -> (Server, u16) { + use std::sync::atomic::{AtomicU16, Ordering}; + static PORT_COUNTER: AtomicU16 = AtomicU16::new(16379); + + let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); + let test_dir = format!("/tmp/herodb_test_{}", test_name); + + // Create test directory + std::fs::create_dir_all(&test_dir).unwrap(); + + let option = DBOption { + dir: test_dir, + port, + }; + + let server = Server::new(option).await; + (server, port) +} + +// Helper function to connect to the test server +async fn connect_to_server(port: u16) -> TcpStream { + let mut attempts = 0; + loop { + match TcpStream::connect(format!("127.0.0.1:{}", port)).await { + Ok(stream) => return stream, + Err(_) if attempts < 10 => { + attempts += 1; + sleep(Duration::from_millis(100)).await; + } + Err(e) => panic!("Failed to connect to test server: {}", e), + } + } +} + +// Helper function to send command and get response +async fn send_command(stream: &mut TcpStream, command: &str) -> String { + stream.write_all(command.as_bytes()).await.unwrap(); + + let mut buffer = [0; 1024]; + let n = stream.read(&mut buffer).await.unwrap(); + String::from_utf8_lossy(&buffer[..n]).to_string() +} + +#[tokio::test] +async fn test_basic_ping() { + let (mut server, port) = start_test_server("ping").await; + + // Start server in background + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + let response = send_command(&mut stream, "*1\r\n$4\r\nPING\r\n").await; + assert!(response.contains("PONG")); +} + +#[tokio::test] +async fn test_string_operations() { + let (mut server, port) = start_test_server("string").await; + + // Start server in background + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + + // Test SET + let response = send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n").await; + assert!(response.contains("OK")); + + // Test GET + let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n").await; + assert!(response.contains("value")); + + // Test GET non-existent key + let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$7\r\nnoexist\r\n").await; + assert!(response.contains("$-1")); // NULL response + + // Test DEL + let response = send_command(&mut stream, "*2\r\n$3\r\nDEL\r\n$3\r\nkey\r\n").await; + assert!(response.contains("1")); + + // Test GET after DEL + let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n").await; + assert!(response.contains("$-1")); // NULL response +} + +#[tokio::test] +async fn test_incr_operations() { + let (mut server, port) = start_test_server("incr").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + + // Test INCR on non-existent key + let response = send_command(&mut stream, "*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n").await; + assert!(response.contains("1")); + + // Test INCR on existing key + let response = send_command(&mut stream, "*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n").await; + assert!(response.contains("2")); + + // Test INCR on string value (should fail) + send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$6\r\nstring\r\n$5\r\nhello\r\n").await; + let response = send_command(&mut stream, "*2\r\n$4\r\nINCR\r\n$6\r\nstring\r\n").await; + assert!(response.contains("ERR")); +} + +#[tokio::test] +async fn test_hash_operations() { + let (mut server, port) = start_test_server("hash").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + + // Test HSET + let response = send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n").await; + assert!(response.contains("1")); // 1 new field + + // Test HGET + let response = send_command(&mut stream, "*3\r\n$4\r\nHGET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await; + assert!(response.contains("value1")); + + // Test HSET multiple fields + let response = send_command(&mut stream, "*6\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield2\r\n$6\r\nvalue2\r\n$6\r\nfield3\r\n$6\r\nvalue3\r\n").await; + assert!(response.contains("2")); // 2 new fields + + // Test HGETALL + let response = send_command(&mut stream, "*2\r\n$7\r\nHGETALL\r\n$4\r\nhash\r\n").await; + assert!(response.contains("field1")); + assert!(response.contains("value1")); + assert!(response.contains("field2")); + assert!(response.contains("value2")); + + // Test HLEN + let response = send_command(&mut stream, "*2\r\n$4\r\nHLEN\r\n$4\r\nhash\r\n").await; + assert!(response.contains("3")); + + // Test HEXISTS + let response = send_command(&mut stream, "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await; + assert!(response.contains("1")); + + let response = send_command(&mut stream, "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$8\r\nnoexist\r\n").await; + assert!(response.contains("0")); + + // Test HDEL + let response = send_command(&mut stream, "*3\r\n$4\r\nHDEL\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await; + assert!(response.contains("1")); + + // Test HKEYS + let response = send_command(&mut stream, "*2\r\n$5\r\nHKEYS\r\n$4\r\nhash\r\n").await; + assert!(response.contains("field2")); + assert!(response.contains("field3")); + assert!(!response.contains("field1")); // Should be deleted + + // Test HVALS + let response = send_command(&mut stream, "*2\r\n$5\r\nHVALS\r\n$4\r\nhash\r\n").await; + assert!(response.contains("value2")); + assert!(response.contains("value3")); +} + +#[tokio::test] +async fn test_expiration() { + let (mut server, port) = start_test_server("expiration").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + + // Test SETEX (expire in 1 second) + let response = send_command(&mut stream, "*5\r\n$3\r\nSET\r\n$6\r\nexpkey\r\n$5\r\nvalue\r\n$2\r\nEX\r\n$1\r\n1\r\n").await; + assert!(response.contains("OK")); + + // Test TTL + let response = send_command(&mut stream, "*2\r\n$3\r\nTTL\r\n$6\r\nexpkey\r\n").await; + assert!(response.contains("1") || response.contains("0")); // Should be 1 or 0 seconds + + // Test EXISTS + let response = send_command(&mut stream, "*2\r\n$6\r\nEXISTS\r\n$6\r\nexpkey\r\n").await; + assert!(response.contains("1")); + + // Wait for expiration + sleep(Duration::from_millis(1100)).await; + + // Test GET after expiration + let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$6\r\nexpkey\r\n").await; + assert!(response.contains("$-1")); // Should be NULL + + // Test TTL after expiration + let response = send_command(&mut stream, "*2\r\n$3\r\nTTL\r\n$6\r\nexpkey\r\n").await; + assert!(response.contains("-2")); // Key doesn't exist + + // Test EXISTS after expiration + let response = send_command(&mut stream, "*2\r\n$6\r\nEXISTS\r\n$6\r\nexpkey\r\n").await; + assert!(response.contains("0")); +} + +#[tokio::test] +async fn test_scan_operations() { + let (mut server, port) = start_test_server("scan").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + + // Set up test data + for i in 0..5 { + let cmd = format!("*3\r\n$3\r\nSET\r\n$4\r\nkey{}\r\n$6\r\nvalue{}\r\n", i, i); + send_command(&mut stream, &cmd).await; + } + + // Test SCAN + let response = send_command(&mut stream, "*6\r\n$4\r\nSCAN\r\n$1\r\n0\r\n$5\r\nMATCH\r\n$1\r\n*\r\n$5\r\nCOUNT\r\n$2\r\n10\r\n").await; + assert!(response.contains("key")); + + // Test KEYS + let response = send_command(&mut stream, "*2\r\n$4\r\nKEYS\r\n$1\r\n*\r\n").await; + assert!(response.contains("key0")); + assert!(response.contains("key1")); +} + +#[tokio::test] +async fn test_hscan_operations() { + let (mut server, port) = start_test_server("hscan").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + + // Set up hash data + for i in 0..3 { + let cmd = format!("*4\r\n$4\r\nHSET\r\n$8\r\ntesthash\r\n$6\r\nfield{}\r\n$6\r\nvalue{}\r\n", i, i); + send_command(&mut stream, &cmd).await; + } + + // Test HSCAN + let response = send_command(&mut stream, "*7\r\n$5\r\nHSCAN\r\n$8\r\ntesthash\r\n$1\r\n0\r\n$5\r\nMATCH\r\n$1\r\n*\r\n$5\r\nCOUNT\r\n$2\r\n10\r\n").await; + assert!(response.contains("field")); + assert!(response.contains("value")); +} + +#[tokio::test] +async fn test_transaction_operations() { + let (mut server, port) = start_test_server("transaction").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + + // Test MULTI + let response = send_command(&mut stream, "*1\r\n$5\r\nMULTI\r\n").await; + assert!(response.contains("OK")); + + // Test queued commands + let response = send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n").await; + assert!(response.contains("QUEUED")); + + let response = send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n").await; + assert!(response.contains("QUEUED")); + + // Test EXEC + let response = send_command(&mut stream, "*1\r\n$4\r\nEXEC\r\n").await; + assert!(response.contains("OK")); // Should contain results of executed commands + + // Verify commands were executed + let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$4\r\nkey1\r\n").await; + assert!(response.contains("value1")); + + let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$4\r\nkey2\r\n").await; + assert!(response.contains("value2")); +} + +#[tokio::test] +async fn test_discard_transaction() { + let (mut server, port) = start_test_server("discard").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + + // Test MULTI + let response = send_command(&mut stream, "*1\r\n$5\r\nMULTI\r\n").await; + assert!(response.contains("OK")); + + // Test queued command + let response = send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$7\r\ndiscard\r\n$5\r\nvalue\r\n").await; + assert!(response.contains("QUEUED")); + + // Test DISCARD + let response = send_command(&mut stream, "*1\r\n$7\r\nDISCARD\r\n").await; + assert!(response.contains("OK")); + + // Verify command was not executed + let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$7\r\ndiscard\r\n").await; + assert!(response.contains("$-1")); // Should be NULL +} + +#[tokio::test] +async fn test_type_command() { + let (mut server, port) = start_test_server("type").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + + // Test string type + send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$6\r\nstring\r\n$5\r\nvalue\r\n").await; + let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$6\r\nstring\r\n").await; + assert!(response.contains("string")); + + // Test hash type + send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$5\r\nfield\r\n$5\r\nvalue\r\n").await; + let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$4\r\nhash\r\n").await; + assert!(response.contains("hash")); + + // Test non-existent key + let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$8\r\nnoexist\r\n").await; + assert!(response.contains("none")); +} + +#[tokio::test] +async fn test_config_commands() { + let (mut server, port) = start_test_server("config").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + + // Test CONFIG GET databases + let response = send_command(&mut stream, "*3\r\n$6\r\nCONFIG\r\n$3\r\nGET\r\n$9\r\ndatabases\r\n").await; + assert!(response.contains("databases")); + assert!(response.contains("16")); + + // Test CONFIG GET dir + let response = send_command(&mut stream, "*3\r\n$6\r\nCONFIG\r\n$3\r\nGET\r\n$3\r\ndir\r\n").await; + assert!(response.contains("dir")); + assert!(response.contains("/tmp/herodb_test_config")); +} + +#[tokio::test] +async fn test_info_command() { + let (mut server, port) = start_test_server("info").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + + // Test INFO + let response = send_command(&mut stream, "*1\r\n$4\r\nINFO\r\n").await; + assert!(response.contains("redis_version")); + + // Test INFO replication + let response = send_command(&mut stream, "*2\r\n$4\r\nINFO\r\n$11\r\nreplication\r\n").await; + assert!(response.contains("role:master")); +} + +#[tokio::test] +async fn test_error_handling() { + let (mut server, port) = start_test_server("error").await; + + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + let mut stream = connect_to_server(port).await; + + // Test WRONGTYPE error - try to use hash command on string + send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$6\r\nstring\r\n$5\r\nvalue\r\n").await; + let response = send_command(&mut stream, "*3\r\n$4\r\nHGET\r\n$6\r\nstring\r\n$5\r\nfield\r\n").await; + assert!(response.contains("WRONGTYPE")); + + // Test unknown command + let response = send_command(&mut stream, "*1\r\n$7\r\nUNKNOWN\r\n").await; + assert!(response.contains("unknown cmd") || response.contains("ERR")); + + // Test EXEC without MULTI + let response = send_command(&mut stream, "*1\r\n$4\r\nEXEC\r\n").await; + assert!(response.contains("ERR")); + + // Test DISCARD without MULTI + let response = send_command(&mut stream, "*1\r\n$7\r\nDISCARD\r\n").await; + assert!(response.contains("ERR")); +} \ No newline at end of file diff --git a/tests/simple_integration_test.rs b/tests/simple_integration_test.rs new file mode 100644 index 0000000..b4acf28 --- /dev/null +++ b/tests/simple_integration_test.rs @@ -0,0 +1,206 @@ +use redis_rs::{server::Server, options::DBOption}; +use std::time::Duration; +use tokio::time::sleep; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; + +// Helper function to start a test server with clean data directory +async fn start_test_server(test_name: &str) -> (Server, u16) { + use std::sync::atomic::{AtomicU16, Ordering}; + static PORT_COUNTER: AtomicU16 = AtomicU16::new(17000); + + // Get a unique port for this test + let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); + + let test_dir = format!("/tmp/herodb_test_{}", test_name); + + // Clean up any existing test data + let _ = std::fs::remove_dir_all(&test_dir); + std::fs::create_dir_all(&test_dir).unwrap(); + + let option = DBOption { + dir: test_dir, + port, + }; + + let server = Server::new(option).await; + (server, port) +} + +// Helper function to send Redis command and get response +async fn send_redis_command(port: u16, command: &str) -> String { + let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap(); + stream.write_all(command.as_bytes()).await.unwrap(); + + let mut buffer = [0; 1024]; + let n = stream.read(&mut buffer).await.unwrap(); + String::from_utf8_lossy(&buffer[..n]).to_string() +} + +#[tokio::test] +async fn test_basic_redis_functionality() { + let (mut server, port) = start_test_server("basic").await; + + // Start server in background with timeout + let server_handle = tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + // Accept only a few connections for testing + for _ in 0..10 { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + // Test PING + let response = send_redis_command(port, "*1\r\n$4\r\nPING\r\n").await; + assert!(response.contains("PONG")); + + // Test SET + let response = send_redis_command(port, "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n").await; + assert!(response.contains("OK")); + + // Test GET + let response = send_redis_command(port, "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n").await; + assert!(response.contains("value")); + + // Test HSET + let response = send_redis_command(port, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$5\r\nfield\r\n$5\r\nvalue\r\n").await; + assert!(response.contains("1")); + + // Test HGET + let response = send_redis_command(port, "*3\r\n$4\r\nHGET\r\n$4\r\nhash\r\n$5\r\nfield\r\n").await; + assert!(response.contains("value")); + + // Test EXISTS + let response = send_redis_command(port, "*2\r\n$6\r\nEXISTS\r\n$3\r\nkey\r\n").await; + assert!(response.contains("1")); + + // Test TTL + let response = send_redis_command(port, "*2\r\n$3\r\nTTL\r\n$3\r\nkey\r\n").await; + assert!(response.contains("-1")); // No expiration + + // Test TYPE + let response = send_redis_command(port, "*2\r\n$4\r\nTYPE\r\n$3\r\nkey\r\n").await; + assert!(response.contains("string")); + + // Test QUIT to close connection gracefully + let response = send_redis_command(port, "*1\r\n$4\r\nQUIT\r\n").await; + assert!(response.contains("OK")); + + // Stop the server + server_handle.abort(); + + println!("✅ All basic Redis functionality tests passed!"); +} + +#[tokio::test] +async fn test_hash_operations() { + let (mut server, port) = start_test_server("hash_ops").await; + + // Start server in background with timeout + let server_handle = tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + // Accept only a few connections for testing + for _ in 0..5 { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + // Test HSET multiple fields + let response = send_redis_command(port, "*6\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n$6\r\nfield2\r\n$6\r\nvalue2\r\n").await; + assert!(response.contains("2")); // 2 new fields + + // Test HGETALL + let response = send_redis_command(port, "*2\r\n$7\r\nHGETALL\r\n$4\r\nhash\r\n").await; + assert!(response.contains("field1")); + assert!(response.contains("value1")); + assert!(response.contains("field2")); + assert!(response.contains("value2")); + + // Test HEXISTS + let response = send_redis_command(port, "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await; + assert!(response.contains("1")); + + // Test HLEN + let response = send_redis_command(port, "*2\r\n$4\r\nHLEN\r\n$4\r\nhash\r\n").await; + assert!(response.contains("2")); + + // Test HSCAN + let response = send_redis_command(port, "*6\r\n$5\r\nHSCAN\r\n$4\r\nhash\r\n$1\r\n0\r\n$5\r\nMATCH\r\n$1\r\n*\r\n$5\r\nCOUNT\r\n$2\r\n10\r\n").await; + assert!(response.contains("*2\r\n$1\r\n0\r\n*4\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n$6\r\nfield2\r\n$6\r\nvalue2\r\n")); + + // Stop the server + server_handle.abort(); + + println!("✅ All hash operations tests passed!"); +} + +#[tokio::test] +async fn test_transaction_operations() { + let (mut server, port) = start_test_server("transactions").await; + + // Start server in background with timeout + let server_handle = tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + // Accept only a few connections for testing + for _ in 0..5 { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(100)).await; + + // Use a single connection for the transaction + let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap(); + + // Test MULTI + stream.write_all("*1\r\n$5\r\nMULTI\r\n".as_bytes()).await.unwrap(); + let mut buffer = [0; 1024]; + let n = stream.read(&mut buffer).await.unwrap(); + let response = String::from_utf8_lossy(&buffer[..n]); + assert!(response.contains("OK")); + + // Test queued commands + stream.write_all("*3\r\n$3\r\nSET\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n".as_bytes()).await.unwrap(); + let n = stream.read(&mut buffer).await.unwrap(); + let response = String::from_utf8_lossy(&buffer[..n]); + assert!(response.contains("QUEUED")); + + stream.write_all("*3\r\n$3\r\nSET\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n".as_bytes()).await.unwrap(); + let n = stream.read(&mut buffer).await.unwrap(); + let response = String::from_utf8_lossy(&buffer[..n]); + assert!(response.contains("QUEUED")); + + // Test EXEC + stream.write_all("*1\r\n$4\r\nEXEC\r\n".as_bytes()).await.unwrap(); + let n = stream.read(&mut buffer).await.unwrap(); + let response = String::from_utf8_lossy(&buffer[..n]); + assert!(response.contains("OK")); // Should contain array of OK responses + + // Verify commands were executed + let response = send_redis_command(port, "*2\r\n$3\r\nGET\r\n$4\r\nkey1\r\n").await; + assert!(response.contains("value1")); + + // Stop the server + server_handle.abort(); + + println!("✅ All transaction operations tests passed!"); +} \ No newline at end of file diff --git a/tests/simple_redis_test.rs b/tests/simple_redis_test.rs new file mode 100644 index 0000000..270781d --- /dev/null +++ b/tests/simple_redis_test.rs @@ -0,0 +1,180 @@ +use redis_rs::{server::Server, options::DBOption}; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use tokio::time::sleep; + +// Helper function to start a test server with clean data directory +async fn start_test_server(test_name: &str) -> (Server, u16) { + use std::sync::atomic::{AtomicU16, Ordering}; + static PORT_COUNTER: AtomicU16 = AtomicU16::new(16500); + + let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); + let test_dir = format!("/tmp/herodb_simple_test_{}", test_name); + + // Clean up any existing test data + let _ = std::fs::remove_dir_all(&test_dir); + std::fs::create_dir_all(&test_dir).unwrap(); + + let option = DBOption { + dir: test_dir, + port, + }; + + let server = Server::new(option).await; + (server, port) +} + +// Helper function to send command and get response +async fn send_command(stream: &mut TcpStream, command: &str) -> String { + stream.write_all(command.as_bytes()).await.unwrap(); + + let mut buffer = [0; 1024]; + let n = stream.read(&mut buffer).await.unwrap(); + String::from_utf8_lossy(&buffer[..n]).to_string() +} + +// Helper function to connect to the test server +async fn connect_to_server(port: u16) -> TcpStream { + let mut attempts = 0; + loop { + match TcpStream::connect(format!("127.0.0.1:{}", port)).await { + Ok(stream) => return stream, + Err(_) if attempts < 10 => { + attempts += 1; + sleep(Duration::from_millis(100)).await; + } + Err(e) => panic!("Failed to connect to test server: {}", e), + } + } +} + +#[tokio::test] +async fn test_basic_ping_simple() { + let (mut server, port) = start_test_server("ping").await; + + // Start server in background + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut stream = connect_to_server(port).await; + let response = send_command(&mut stream, "*1\r\n$4\r\nPING\r\n").await; + assert!(response.contains("PONG")); +} + +#[tokio::test] +async fn test_hset_clean_db() { + let (mut server, port) = start_test_server("hset_clean").await; + + // Start server in background + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut stream = connect_to_server(port).await; + + // Test HSET - should return 1 for new field + let response = send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n").await; + println!("HSET response: {}", response); + assert!(response.contains("1"), "Expected HSET to return 1, got: {}", response); + + // Test HGET + let response = send_command(&mut stream, "*3\r\n$4\r\nHGET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await; + println!("HGET response: {}", response); + assert!(response.contains("value1")); +} + +#[tokio::test] +async fn test_type_command_simple() { + let (mut server, port) = start_test_server("type").await; + + // Start server in background + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut stream = connect_to_server(port).await; + + // Test string type + send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$6\r\nstring\r\n$5\r\nvalue\r\n").await; + let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$6\r\nstring\r\n").await; + println!("TYPE string response: {}", response); + assert!(response.contains("string")); + + // Test hash type + send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$5\r\nfield\r\n$5\r\nvalue\r\n").await; + let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$4\r\nhash\r\n").await; + println!("TYPE hash response: {}", response); + assert!(response.contains("hash")); + + // Test non-existent key + let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$7\r\nnoexist\r\n").await; + println!("TYPE noexist response: {}", response); + assert!(response.contains("none"), "Expected 'none' for non-existent key, got: {}", response); +} + +#[tokio::test] +async fn test_hexists_simple() { + let (mut server, port) = start_test_server("hexists").await; + + // Start server in background + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = server.handle(stream).await; + } + } + }); + + sleep(Duration::from_millis(200)).await; + + let mut stream = connect_to_server(port).await; + + // Set up hash + send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n").await; + + // Test HEXISTS for existing field + let response = send_command(&mut stream, "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await; + println!("HEXISTS existing field response: {}", response); + assert!(response.contains("1")); + + // Test HEXISTS for non-existent field + let response = send_command(&mut stream, "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$7\r\nnoexist\r\n").await; + println!("HEXISTS non-existent field response: {}", response); + assert!(response.contains("0"), "Expected HEXISTS to return 0 for non-existent field, got: {}", response); +} \ No newline at end of file