From c5f0f8049210e24705cfaf74a7bc8c81c1d921d9 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Thu, 21 Aug 2025 11:23:42 +0200 Subject: [PATCH] Add redis storage driver Signed-off-by: Lee Smet --- Cargo.lock | 568 ++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 1 + src/lib.rs | 1 + src/storage.rs | 4 + src/storage/redis.rs | 278 +++++++++++++++++++++ 5 files changed, 851 insertions(+), 1 deletion(-) create mode 100644 src/storage.rs create mode 100644 src/storage/redis.rs diff --git a/Cargo.lock b/Cargo.lock index 090c278..dd9ebdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,6 +67,23 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -152,6 +169,140 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "gimli" version = "0.31.1" @@ -169,12 +320,120 @@ name = "herocoordinator" version = "0.1.0" dependencies = [ "clap", + "redis", "serde", "serde_json", "tokio", "tracing", ] +[[package]] +name = "icu_collections" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "200072f5d0e3614556f94a9930d5dc3e0662a652823904c3a75dc3b0af7fee47" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cde2700ccaed3872079a65fb1a78f6c0a36c91570f28755dda67bc8f7d9f00a" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "436880e8e18df4d7bbc06d58432329d6458cc84531f7ac5f024e93deadb37979" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3" + +[[package]] +name = "icu_properties" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "016c619c1eeb94efb86809b015c58f479963de65bdb6253345c1a1276f22e32b" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "potential_utf", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632" + +[[package]] +name = "icu_provider" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c80da27b5f4187909049ee2d72f276f0d9f99a42c306bd0131ecfe04d8e5af" +dependencies = [ + "displaydoc", + "icu_locale_core", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "io-uring" version = "0.7.9" @@ -204,6 +463,12 @@ version = "0.2.175" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" +[[package]] +name = "litemap" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" + [[package]] name = "lock_api" version = "0.4.13" @@ -284,12 +549,62 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "percent-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" + +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "potential_utf" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" +dependencies = [ + "zerovec", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "proc-macro2" version = "1.0.101" @@ -308,6 +623,60 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "redis" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "futures", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.5.10", + "tokio", + "tokio-retry", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.5.17" @@ -367,6 +736,12 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "signal-hook-registry" version = "1.4.6" @@ -388,6 +763,16 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.0" @@ -398,6 +783,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "strsim" version = "0.11.1" @@ -415,6 +806,27 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tinystr" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4f6d1145dcb577acf783d4e601bc1d76a13337bb54e6233add580b07344c8b" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tokio" version = "1.47.1" @@ -430,7 +842,7 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "slab", - "socket2", + "socket2 0.6.0", "tokio-macros", "windows-sys 0.59.0", ] @@ -446,6 +858,30 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tracing" version = "0.1.41" @@ -483,6 +919,23 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "url" +version = "2.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec961601b32b6f5d14ae8dabd35ff2ff2e2c6cb4c0e6641845ff105abe96d958" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -501,6 +954,15 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.59.0" @@ -647,3 +1109,107 @@ name = "windows_x86_64_msvc" version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" + +[[package]] +name = "writeable" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" + +[[package]] +name = "yoke" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerocopy" +version = "0.8.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36f0bbd478583f79edad978b407914f61b2972f5af6fa089686016be8f9af595" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7aa2bd55086f1ab526693ecbe444205da57e25f4489879da80635a46d90e73b" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index d18da53..0781739 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,4 @@ serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.143" tokio = { version = "1.47.1", features = ["full"] } tracing = "0.1.41" +redis = { version = "0.25.4", features = ["tokio-comp", "connection-manager", "aio"] } diff --git a/src/lib.rs b/src/lib.rs index 057f82b..875e6a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,2 +1,3 @@ pub mod models; +pub mod storage; mod time; diff --git a/src/storage.rs b/src/storage.rs new file mode 100644 index 0000000..e1a6129 --- /dev/null +++ b/src/storage.rs @@ -0,0 +1,4 @@ + +pub mod redis; + +pub use redis::RedisDriver; diff --git a/src/storage/redis.rs b/src/storage/redis.rs new file mode 100644 index 0000000..708a90f --- /dev/null +++ b/src/storage/redis.rs @@ -0,0 +1,278 @@ +use std::collections::HashMap as StdHashMap; + +use redis::{AsyncCommands, aio::ConnectionManager}; +use serde::Serialize; +use serde::de::DeserializeOwned; +use serde_json::{Map as JsonMap, Value}; +use tokio::sync::Mutex; + +use crate::models::{Actor, Context, Flow, Job, Message, Runner}; + +type Result = std::result::Result>; + +/// Async Redis driver that saves/loads every model as a Redis hash (HSET), +/// using canonical keys as specified in the specs. +/// - Complex fields (arrays, maps, nested structs) are JSON-encoded per field +/// - Scalars are written as plain strings (numbers/bools as their string representation) +/// - On load, each field value is first attempted to parse as JSON; if that fails it is treated as a plain string +pub struct RedisDriver { + /// Base address, e.g. "127.0.0.1:6379" or "redis://127.0.0.1:6379" + base_addr: String, + /// Cache of connection managers per DB index + managers: Mutex>, +} + +impl RedisDriver { + /// Create a new driver for the given Redis address. + /// Accepts either "host:port" or "redis://host:port" + pub async fn new(addr: impl Into) -> Result { + let raw = addr.into(); + let base_addr = if raw.starts_with("redis://") { + raw + } else { + format!("redis://{}", raw) + }; + Ok(Self { + base_addr, + managers: Mutex::new(StdHashMap::new()), + }) + } + + /// Get or create a ConnectionManager for the given DB index. + async fn manager_for_db(&self, db: u32) -> Result { + { + // Fast path: check existing + let guard = self.managers.lock().await; + if let Some(cm) = guard.get(&db) { + return Ok(cm.clone()); + } + } + + // Slow path: create a new manager and cache it + let url = format!("{}/{}", self.base_addr.trim_end_matches('/'), db); + let client = redis::Client::open(url.as_str())?; + let cm = client.get_connection_manager().await?; + + let mut guard = self.managers.lock().await; + let entry = guard.entry(db).or_insert(cm); + Ok(entry.clone()) + } + + // ----------------------------- + // Generic helpers (serde <-> HSET) + // ----------------------------- + + fn struct_to_hset_pairs(value: &T) -> Result> { + let json = serde_json::to_value(value)?; + let obj = json + .as_object() + .ok_or("Model must serialize to a JSON object")?; + let mut pairs = Vec::with_capacity(obj.len()); + for (k, v) in obj { + let s = match v { + Value::Array(_) | Value::Object(_) => serde_json::to_string(v)?, // complex - store JSON + Value::String(s) => s.clone(), // string - plain + Value::Number(n) => n.to_string(), // number - plain + Value::Bool(b) => b.to_string(), // bool - plain + Value::Null => "null".to_string(), // null sentinel + }; + pairs.push((k.clone(), s)); + } + Ok(pairs) + } + + fn hmap_to_struct(map: StdHashMap) -> Result { + let mut obj = JsonMap::with_capacity(map.len()); + for (k, s) in map { + // Try parse as JSON first (works for arrays, objects, numbers, booleans, null) + // If that fails, fallback to string. + match serde_json::from_str::(&s) { + Ok(v) => { + obj.insert(k, v); + } + Err(_) => { + obj.insert(k, Value::String(s)); + } + } + } + let json = Value::Object(obj); + let model = serde_json::from_value(json)?; + Ok(model) + } + + async fn hset_model(&self, db: u32, key: &str, model: &T) -> Result<()> { + let mut cm = self.manager_for_db(db).await?; + let pairs = Self::struct_to_hset_pairs(model)?; + // Ensure no stale fields + let _: u64 = cm.del(key).await.unwrap_or(0); + // Write all fields + let _: usize = cm.hset_multiple(key, &pairs).await?; + Ok(()) + } + + async fn hget_model(&self, db: u32, key: &str) -> Result { + let mut cm = self.manager_for_db(db).await?; + let map: StdHashMap = cm.hgetall(key).await?; + if map.is_empty() { + return Err(format!("Key not found: {}", key).into()); + } + Self::hmap_to_struct(map) + } + + // ----------------------------- + // Key helpers (canonical keys) + // ----------------------------- + + fn actor_key(id: u32) -> String { + format!("actor:{}", id) + } + + fn context_key(id: u32) -> String { + format!("context:{}", id) + } + + fn flow_key(id: u32) -> String { + format!("flow:{}", id) + } + + fn runner_key(id: u32) -> String { + format!("runner:{}", id) + } + + fn job_key(caller_id: u32, id: u32) -> String { + format!("job:{}:{}", caller_id, id) + } + + fn message_key(caller_id: u32, id: u32) -> String { + format!("message:{}:{}", caller_id, id) + } + + // ----------------------------- + // Context (DB = context.id) + // ----------------------------- + + /// Save a Context in its own DB (db index = context.id) + pub async fn save_context(&self, ctx: &Context) -> Result<()> { + // We don't have field access; compute db and key via JSON to avoid changing model definitions. + // Extract "id" from serialized JSON object. + let json = serde_json::to_value(ctx)?; + let id = json + .get("id") + .and_then(|v| v.as_u64()) + .ok_or("Context.id missing or not a number")? as u32; + let key = Self::context_key(id); + self.hset_model(id, &key, ctx).await + } + + /// Load a Context from its own DB (db index = id) + pub async fn load_context(&self, id: u32) -> Result { + let key = Self::context_key(id); + self.hget_model(id, &key).await + } + + // ----------------------------- + // Actor + // ----------------------------- + + /// Save an Actor to the given DB (tenant/context DB) + pub async fn save_actor(&self, db: u32, actor: &Actor) -> Result<()> { + let json = serde_json::to_value(actor)?; + let id = json + .get("id") + .and_then(|v| v.as_u64()) + .ok_or("Actor.id missing or not a number")? as u32; + let key = Self::actor_key(id); + self.hset_model(db, &key, actor).await + } + + /// Load an Actor by id from the given DB + pub async fn load_actor(&self, db: u32, id: u32) -> Result { + let key = Self::actor_key(id); + self.hget_model(db, &key).await + } + + // ----------------------------- + // Runner + // ----------------------------- + + pub async fn save_runner(&self, db: u32, runner: &Runner) -> Result<()> { + let json = serde_json::to_value(runner)?; + let id = json + .get("id") + .and_then(|v| v.as_u64()) + .ok_or("Runner.id missing or not a number")? as u32; + let key = Self::runner_key(id); + self.hset_model(db, &key, runner).await + } + + pub async fn load_runner(&self, db: u32, id: u32) -> Result { + let key = Self::runner_key(id); + self.hget_model(db, &key).await + } + + // ----------------------------- + // Flow + // ----------------------------- + + pub async fn save_flow(&self, db: u32, flow: &Flow) -> Result<()> { + let json = serde_json::to_value(flow)?; + let id = json + .get("id") + .and_then(|v| v.as_u64()) + .ok_or("Flow.id missing or not a number")? as u32; + let key = Self::flow_key(id); + self.hset_model(db, &key, flow).await + } + + pub async fn load_flow(&self, db: u32, id: u32) -> Result { + let key = Self::flow_key(id); + self.hget_model(db, &key).await + } + + // ----------------------------- + // Job + // ----------------------------- + + pub async fn save_job(&self, db: u32, job: &Job) -> Result<()> { + let json = serde_json::to_value(job)?; + let id = json + .get("id") + .and_then(|v| v.as_u64()) + .ok_or("Job.id missing or not a number")? as u32; + let caller_id = json + .get("caller_id") + .and_then(|v| v.as_u64()) + .ok_or("Job.caller_id missing or not a number")? as u32; + let key = Self::job_key(caller_id, id); + self.hset_model(db, &key, job).await + } + + pub async fn load_job(&self, db: u32, caller_id: u32, id: u32) -> Result { + let key = Self::job_key(caller_id, id); + self.hget_model(db, &key).await + } + + // ----------------------------- + // Message + // ----------------------------- + + pub async fn save_message(&self, db: u32, message: &Message) -> Result<()> { + let json = serde_json::to_value(message)?; + let id = json + .get("id") + .and_then(|v| v.as_u64()) + .ok_or("Message.id missing or not a number")? as u32; + let caller_id = json + .get("caller_id") + .and_then(|v| v.as_u64()) + .ok_or("Message.caller_id missing or not a number")? as u32; + let key = Self::message_key(caller_id, id); + self.hset_model(db, &key, message).await + } + + pub async fn load_message(&self, db: u32, caller_id: u32, id: u32) -> Result { + let key = Self::message_key(caller_id, id); + self.hget_model(db, &key).await + } +} +