Compare commits

...

3 Commits

Author SHA1 Message Date
Timur Gordon
7b9420f3e6 Refactor job client into separate crate
- Moved client code from rust/src/client.rs to rust/client/ subdirectory
- Created separate hero-job-client crate for better modularity
- Updated Cargo.toml to include client as workspace member
- Client can now be used independently of the main hero-job crate
2025-11-04 17:09:37 +01:00
Timur Gordon
e3d8147eaa Restructure job crate: separate rust and vlang folders, extract Job types from runner_rust 2025-11-04 13:37:36 +01:00
Timur Gordon
3ba9e84aa0 job crate fixes 2025-09-02 09:14:26 +02:00
8 changed files with 1196 additions and 362 deletions

View File

@@ -1,3 +1,8 @@
# Job
# Hero Job
Job model and client for supervisor
Shared job types and utilities for the Hero ecosystem.
## Structure
- `rust/` - Rust implementation of job types
- `vlang/` - V language implementation (future)

1
priv_key.bin Normal file
View File

@@ -0,0 +1 @@
ߧ<EFBFBD><EFBFBD>i<EFBFBD><<3C><><EFBFBD><EFBFBD>P<EFBFBD>42v<32>JZ<4A>eW#d<><64>E|<7C><1B>

526
rust/Cargo.lock generated Normal file
View File

@@ -0,0 +1,526 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "autocfg"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "block-buffer"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
[[package]]
name = "cc"
version = "1.2.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37521ac7aabe3d13122dc382493e20c9416f299d2ccd5b3a5340a2570cdeb0f3"
dependencies = [
"find-msvc-tools",
"shlex",
]
[[package]]
name = "cfg-if"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
[[package]]
name = "chrono"
version = "0.4.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2"
dependencies = [
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"wasm-bindgen",
"windows-link",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cpufeatures"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
dependencies = [
"libc",
]
[[package]]
name = "crypto-common"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "digest"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"crypto-common",
]
[[package]]
name = "find-msvc-tools"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127"
[[package]]
name = "generic-array"
version = "0.14.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getrandom"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"wasip2",
]
[[package]]
name = "hero-job"
version = "0.1.0"
dependencies = [
"chrono",
"hex",
"log",
"secp256k1",
"serde",
"serde-wasm-bindgen",
"serde_json",
"sha2",
"thiserror",
"uuid",
"wasm-bindgen",
]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "iana-time-zone"
version = "0.1.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"log",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "itoa"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "js-sys"
version = "0.3.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b011eec8cc36da2aab2d5cff675ec18454fad408585853910a202391cf9f8e65"
dependencies = [
"once_cell",
"wasm-bindgen",
]
[[package]]
name = "libc"
version = "0.2.177"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976"
[[package]]
name = "log"
version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
[[package]]
name = "memchr"
version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "once_cell"
version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "proc-macro2"
version = "1.0.103"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1"
dependencies = [
"proc-macro2",
]
[[package]]
name = "r-efi"
version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "rustversion"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "ryu"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
[[package]]
name = "secp256k1"
version = "0.28.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d24b59d129cdadea20aea4fb2352fa053712e5d713eee47d700cd4b2bc002f10"
dependencies = [
"secp256k1-sys",
]
[[package]]
name = "secp256k1-sys"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5d1746aae42c19d583c3c1a8c646bfad910498e2051c551a7f2e3c0c9fbb7eb"
dependencies = [
"cc",
]
[[package]]
name = "serde"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
"serde_core",
"serde_derive",
]
[[package]]
name = "serde-wasm-bindgen"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8302e169f0eddcc139c70f139d19d6467353af16f9fce27e8c30158036a1e16b"
dependencies = [
"js-sys",
"serde",
"wasm-bindgen",
]
[[package]]
name = "serde_core"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c"
dependencies = [
"itoa",
"memchr",
"ryu",
"serde",
"serde_core",
]
[[package]]
name = "sha2"
version = "0.10.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "syn"
version = "2.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "thiserror"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "typenum"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
[[package]]
name = "unicode-ident"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"
[[package]]
name = "uuid"
version = "1.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2"
dependencies = [
"getrandom",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "version_check"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "wasip2"
version = "1.0.1+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7"
dependencies = [
"wit-bindgen",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da95793dfc411fbbd93f5be7715b0578ec61fe87cb1a42b12eb625caa5c5ea60"
dependencies = [
"cfg-if",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04264334509e04a7bf8690f2384ef5265f05143a4bff3889ab7a3269adab59c2"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "420bc339d9f322e562942d52e115d57e950d12d88983a14c79b86859ee6c7ebc"
dependencies = [
"bumpalo",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76f218a38c84bcb33c25ec7059b07847d465ce0e0a76b995e134a45adcb6af76"
dependencies = [
"unicode-ident",
]
[[package]]
name = "windows-core"
version = "0.62.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-result",
"windows-strings",
]
[[package]]
name = "windows-implement"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-interface"
version = "0.59.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-result"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091"
dependencies = [
"windows-link",
]
[[package]]
name = "wit-bindgen"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"

View File

@@ -10,8 +10,14 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.0", features = ["v4"] }
thiserror = "1.0"
redis = { version = "0.25", features = ["aio", "tokio-comp"] }
log = "0.4"
secp256k1 = { version = "0.28", features = ["recovery"] }
sha2 = "0.10"
hex = "0.4"
[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen = "0.2"
serde-wasm-bindgen = "0.6"
[lib]
name = "hero_job"

File diff suppressed because it is too large Load Diff

18
rust/client/Cargo.toml Normal file
View File

@@ -0,0 +1,18 @@
[package]
name = "hero-job-client"
version = "0.1.0"
edition = "2021"
description = "Redis client for Hero job management"
[dependencies]
hero-job = { path = ".." }
redis = { version = "0.25", features = ["aio", "tokio-comp"] }
tokio = { version = "1.0", features = ["rt", "time"] }
chrono = { version = "0.4", features = ["serde"] }
serde_json = "1.0"
thiserror = "1.0"
log = "0.4"
[lib]
name = "hero_job_client"
path = "src/lib.rs"

View File

@@ -2,7 +2,21 @@
use chrono::Utc;
use redis::AsyncCommands;
use crate::{Job, JobStatus, JobError};
use hero_job::{Job, JobStatus, JobError};
use thiserror::Error;
/// Client-specific error types
#[derive(Error, Debug)]
pub enum ClientError {
#[error("Redis error: {0}")]
Redis(#[from] redis::RedisError),
#[error("Job error: {0}")]
Job(#[from] JobError),
#[error("Invalid status: {0}")]
InvalidStatus(String),
#[error("Timeout waiting for job completion")]
Timeout,
}
/// Client for managing jobs in Redis
#[derive(Debug, Clone)]
@@ -40,10 +54,10 @@ impl ClientBuilder {
}
/// Build the client
pub async fn build(self) -> Result<Client, JobError> {
pub async fn build(self) -> Result<Client, ClientError> {
// Create Redis client
let redis_client = redis::Client::open(self.redis_url.as_str())
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
Ok(Client {
redis_client,
@@ -70,14 +84,14 @@ impl Client {
}
/// List all job IDs from Redis
pub async fn list_jobs(&self) -> Result<Vec<String>, JobError> {
pub async fn list_jobs(&self) -> Result<Vec<String>, ClientError> {
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
let keys: Vec<String> = conn.keys(format!("{}:*", &self.jobs_key())).await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
let job_ids: Vec<String> = keys
.into_iter()
.filter_map(|key| {
@@ -129,21 +143,21 @@ impl Client {
pub async fn set_error(&self,
job_id: &str,
error: &str,
) -> Result<(), JobError> {
) -> Result<(), ClientError> {
let job_key = self.job_key(job_id);
let now = Utc::now();
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
conn.hset_multiple(&job_key, &[
let _: () = conn.hset_multiple(&job_key, &[
("error", error),
("status", JobStatus::Error.as_str()),
("updated_at", &now.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
Ok(())
}
@@ -152,20 +166,20 @@ impl Client {
pub async fn set_job_status(&self,
job_id: &str,
status: JobStatus,
) -> Result<(), JobError> {
) -> Result<(), ClientError> {
let job_key = self.job_key(job_id);
let now = Utc::now();
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
conn.hset_multiple(&job_key, &[
let _: () = conn.hset_multiple(&job_key, &[
("status", status.as_str()),
("updated_at", &now.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
Ok(())
}
@@ -173,18 +187,18 @@ impl Client {
pub async fn get_status(
&self,
job_id: &str,
) -> Result<JobStatus, JobError> {
) -> Result<JobStatus, ClientError> {
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
let status_str: Option<String> = conn.hget(&self.job_key(job_id), "status").await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
match status_str {
Some(s) => JobStatus::from_str(&s).ok_or_else(|| JobError::InvalidStatus(s)),
None => Err(JobError::NotFound(job_id.to_string())),
Some(s) => JobStatus::from_str(&s).ok_or_else(|| ClientError::InvalidStatus(s)),
None => Err(ClientError::Job(JobError::NotFound(job_id.to_string()))),
}
}
@@ -192,84 +206,89 @@ impl Client {
pub async fn delete_from_redis(
&self,
job_id: &str,
) -> Result<(), JobError> {
) -> Result<(), ClientError> {
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
let job_key = self.job_key(job_id);
let _: () = conn.del(&job_key).await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
Ok(())
}
/// Store this job in Redis
pub async fn store_job_in_redis(&self, job: &Job) -> Result<(), JobError> {
/// Store this job in Redis with the specified status
pub async fn store_job_in_redis_with_status(&self, job: &Job, status: JobStatus) -> Result<(), ClientError> {
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
let job_key = self.job_key(&job.id);
// Serialize the job data
let job_data = serde_json::to_string(job)
.map_err(|e| JobError::Serialization(e.to_string()))?;
.map_err(|e| JobError::Serialization(e))?;
// Store job data in Redis hash
let _: () = conn.hset_multiple(&job_key, &[
("data", job_data),
("status", JobStatus::Dispatched.as_str().to_string()),
("status", status.as_str().to_string()),
("created_at", job.created_at.to_rfc3339()),
("updated_at", job.updated_at.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
// Set TTL for the job (24 hours)
let _: () = conn.expire(&job_key, 86400).await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
Ok(())
}
/// Store this job in Redis (defaults to Dispatched status for backwards compatibility)
pub async fn store_job_in_redis(&self, job: &Job) -> Result<(), ClientError> {
self.store_job_in_redis_with_status(job, JobStatus::Dispatched).await
}
/// Load a job from Redis by ID
pub async fn load_job_from_redis(
&self,
job_id: &str,
) -> Result<Job, JobError> {
) -> Result<Job, ClientError> {
let job_key = self.job_key(job_id);
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
// Get job data from Redis
let job_data: Option<String> = conn.hget(&job_key, "data").await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
match job_data {
Some(data) => {
let job: Job = serde_json::from_str(&data)
.map_err(|e| JobError::Serialization(e.to_string()))?;
.map_err(|e| JobError::Serialization(e))?;
Ok(job)
}
None => Err(JobError::NotFound(job_id.to_string())),
None => Err(ClientError::Job(JobError::NotFound(job_id.to_string()))),
}
}
/// Delete a job by ID
pub async fn delete_job(&mut self, job_id: &str) -> Result<(), JobError> {
pub async fn delete_job(&mut self, job_id: &str) -> Result<(), ClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
let job_key = self.job_key(job_id);
let deleted_count: i32 = conn.del(&job_key).await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
if deleted_count == 0 {
return Err(JobError::NotFound(job_id.to_string()));
return Err(ClientError::Job(JobError::NotFound(job_id.to_string())));
}
Ok(())
@@ -280,19 +299,19 @@ impl Client {
&self,
job_id: &str,
result: &str,
) -> Result<(), JobError> {
) -> Result<(), ClientError> {
let job_key = self.job_key(&job_id);
let now = Utc::now();
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
let _: () = conn.hset_multiple(&job_key, &[
("result", result),
("status", JobStatus::Finished.as_str()),
("updated_at", &now.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
Ok(())
}
@@ -301,33 +320,154 @@ impl Client {
pub async fn get_result(
&self,
job_id: &str,
) -> Result<Option<String>, JobError> {
) -> Result<Option<String>, ClientError> {
let job_key = self.job_key(job_id);
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
let result: Option<String> = conn.hget(&job_key, "result").await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
Ok(result)
}
/// Get job result from Redis
pub async fn get_error(
&self,
job_id: &str,
) -> Result<Option<String>, ClientError> {
let job_key = self.job_key(job_id);
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| ClientError::Redis(e))?;
let result: Option<String> = conn.hget(&job_key, "error").await
.map_err(|e| ClientError::Redis(e))?;
Ok(result)
}
/// Get a job ID from the work queue (blocking pop)
pub async fn get_job_id(&self, queue_key: &str) -> Result<Option<String>, JobError> {
pub async fn get_job_id(&self, queue_key: &str) -> Result<Option<String>, ClientError> {
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
// Use BRPOP with a short timeout to avoid blocking indefinitely
let result: Option<(String, String)> = conn.brpop(queue_key, 1.0).await
.map_err(|e| JobError::Redis(e))?;
.map_err(|e| ClientError::Redis(e))?;
Ok(result.map(|(_, job_id)| job_id))
}
/// Get a job by ID (alias for load_job_from_redis)
pub async fn get_job(&self, job_id: &str) -> Result<Job, JobError> {
pub async fn get_job(&self, job_id: &str) -> Result<Job, ClientError> {
self.load_job_from_redis(job_id).await
}
/// Dispatch a job to a runner's queue
pub async fn dispatch_job(&self, job_id: &str, runner_name: &str) -> Result<(), ClientError> {
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| ClientError::Redis(e))?;
let queue_key = self.runner_key(runner_name);
// Push job ID to the runner's queue (LPUSH for FIFO with BRPOP)
let _: () = conn.lpush(&queue_key, job_id).await
.map_err(|e| ClientError::Redis(e))?;
Ok(())
}
/// Run a job: dispatch it, wait for completion, and return the result
///
/// This is a convenience method that:
/// 1. Stores the job in Redis
/// 2. Dispatches it to the runner's queue
/// 3. Waits for the job to complete (polls status)
/// 4. Returns the result or error
///
/// # Arguments
/// * `job` - The job to run
/// * `runner_name` - The name of the runner to dispatch to
/// * `timeout_secs` - Maximum time to wait for job completion (in seconds)
///
/// # Returns
/// * `Ok(String)` - The job result if successful
/// * `Err(JobError)` - If the job fails, times out, or encounters an error
pub async fn run_job(
&self,
job: &Job,
runner_name: &str,
timeout_secs: u64,
) -> Result<String, ClientError> {
use tokio::time::{Duration, timeout};
// Store the job in Redis
self.store_job_in_redis(job).await?;
// Dispatch to runner queue
self.dispatch_job(&job.id, runner_name).await?;
// Wait for job to complete with timeout
let result = timeout(
Duration::from_secs(timeout_secs),
self.wait_for_job_completion(&job.id)
).await;
match result {
Ok(Ok(job_result)) => Ok(job_result),
Ok(Err(e)) => Err(e),
Err(_) => Err(ClientError::Timeout),
}
}
/// Wait for a job to complete by polling its status
///
/// This polls the job status every 500ms until it reaches a terminal state
/// (Finished or Error), then returns the result or error.
async fn wait_for_job_completion(&self, job_id: &str) -> Result<String, ClientError> {
use tokio::time::{sleep, Duration};
loop {
// Check job status
let status = self.get_status(job_id).await?;
match status {
JobStatus::Finished => {
// Job completed successfully, get the result
let result = self.get_result(job_id).await?;
return result.ok_or_else(|| {
ClientError::Job(JobError::InvalidData(format!("Job {} finished but has no result", job_id)))
});
}
JobStatus::Error => {
// Job failed, get the error message
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| ClientError::Redis(e))?;
let error_msg: Option<String> = conn
.hget(&self.job_key(job_id), "error")
.await
.map_err(|e| ClientError::Redis(e))?;
return Err(ClientError::Job(JobError::InvalidData(
error_msg.unwrap_or_else(|| format!("Job {} failed with unknown error", job_id))
)));
}
JobStatus::Stopping => {
return Err(ClientError::Job(JobError::InvalidData(format!("Job {} was stopped", job_id))));
}
// Job is still running (Dispatched, WaitingForPrerequisites, Started)
_ => {
// Wait before polling again
sleep(Duration::from_millis(500)).await;
}
}
}
}
}

View File

@@ -1,17 +1,26 @@
use chrono::{DateTime, Utc};
use redis::AsyncCommands;
use chrono::{Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use thiserror::Error;
use uuid::Uuid;
use log::{debug, error};
use log::{error};
pub mod client;
pub use client::Client;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen::prelude::*;
/// Signature for a job - contains the signatory's public key and their signature
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobSignature {
/// Public key of the signatory (hex-encoded secp256k1 public key)
pub public_key: String,
/// Signature (hex-encoded secp256k1 signature)
pub signature: String,
}
/// Job status enumeration
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum JobStatus {
Created,
Dispatched,
WaitingForPrerequisites,
Started,
@@ -23,6 +32,7 @@ pub enum JobStatus {
impl JobStatus {
pub fn as_str(&self) -> &'static str {
match self {
JobStatus::Created => "created",
JobStatus::Dispatched => "dispatched",
JobStatus::WaitingForPrerequisites => "waiting_for_prerequisites",
JobStatus::Started => "started",
@@ -34,6 +44,7 @@ impl JobStatus {
pub fn from_str(s: &str) -> Option<Self> {
match s {
"created" => Some(JobStatus::Created),
"dispatched" => Some(JobStatus::Dispatched),
"waiting_for_prerequisites" => Some(JobStatus::WaitingForPrerequisites),
"started" => Some(JobStatus::Started),
@@ -45,10 +56,8 @@ impl JobStatus {
}
}
/// Representation of a script execution request.
///
/// This structure contains all the information needed to execute a script
/// on a actor service, including the script content, dependencies, and metadata.
/// Job structure representing a unit of work to be executed
#[cfg_attr(target_arch = "wasm32", wasm_bindgen(getter_with_clone))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job {
pub id: String,
@@ -58,26 +67,31 @@ pub struct Job {
pub runner: String, // name of the runner to execute this job
pub executor: String, // name of the executor the runner will use to execute this job
pub timeout: u64, // timeout in seconds
#[cfg_attr(target_arch = "wasm32", wasm_bindgen(skip))]
pub env_vars: HashMap<String, String>, // environment variables for script execution
#[cfg_attr(target_arch = "wasm32", wasm_bindgen(skip))]
pub created_at: chrono::DateTime<chrono::Utc>,
#[cfg_attr(target_arch = "wasm32", wasm_bindgen(skip))]
pub updated_at: chrono::DateTime<chrono::Utc>,
/// Signatures from authorized signatories (public keys are included in each signature)
#[cfg_attr(target_arch = "wasm32", wasm_bindgen(skip))]
pub signatures: Vec<JobSignature>,
}
/// Error types for job operations
#[derive(Error, Debug)]
pub enum JobError {
#[error("Redis error: {0}")]
Redis(#[from] redis::RedisError),
#[error("Serialization error: {0}")]
Serialization(String),
Serialization(#[from] serde_json::Error),
#[error("Job not found: {0}")]
NotFound(String),
#[error("Invalid job status: {0}")]
InvalidStatus(String),
#[error("Timeout error: {0}")]
Timeout(String),
#[error("Invalid job data: {0}")]
#[error("Invalid data: {0}")]
InvalidData(String),
#[error("Validation error: {0}")]
Validation(String),
#[error("Signature verification failed: {0}")]
SignatureVerification(String),
}
impl Job {
@@ -101,73 +115,77 @@ impl Job {
env_vars: HashMap::new(),
created_at: now,
updated_at: now,
signatures: Vec::new(),
}
}
/// Update job status in Redis using default client
pub async fn update_status(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
status: JobStatus,
) -> Result<(), JobError> {
let now = Utc::now();
let job_key = format!("hero:job:{}", job_id);
/// Get the canonical representation of the job for signing
/// This creates a deterministic string representation that can be hashed and signed
/// Note: Signatures are excluded from the canonical representation
pub fn canonical_representation(&self) -> String {
// Create a deterministic representation excluding signatures
// Sort env_vars keys for deterministic ordering
let mut env_vars_sorted: Vec<_> = self.env_vars.iter().collect();
env_vars_sorted.sort_by_key(|&(k, _)| k);
let _: () = redis_conn.hset_multiple(&job_key, &[
("status", status.as_str()),
("updated_at", &now.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
format!(
"{}:{}:{}:{}:{}:{}:{}:{:?}",
self.id,
self.caller_id,
self.context_id,
self.payload,
self.runner,
self.executor,
self.timeout,
env_vars_sorted
)
}
/// Set job result in Redis using default client
pub async fn set_result(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
result: &str,
) -> Result<(), JobError> {
let job_key = format!("hero:job:{}", job_id);
let now = Utc::now();
/// Get list of signatory public keys from signatures
pub fn signatories(&self) -> Vec<String> {
self.signatures.iter()
.map(|sig| sig.public_key.clone())
.collect()
}
/// Verify that all signatures are valid
/// Returns Ok(()) if verification passes, Err otherwise
/// Empty signatures list is allowed - loop simply won't execute
pub fn verify_signatures(&self) -> Result<(), JobError> {
use secp256k1::{Message, PublicKey, Secp256k1, ecdsa::Signature};
use sha2::{Sha256, Digest};
let _: () = redis_conn.hset_multiple(&job_key, &[
("result", result),
("status", JobStatus::Finished.as_str()),
("updated_at", &now.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
}
/// Set job error in Redis using default client
pub async fn set_error(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
error: &str,
) -> Result<(), JobError> {
let job_key = format!("hero:job:{}", job_id);
let now = Utc::now();
// Get the canonical representation and hash it
let canonical = self.canonical_representation();
let mut hasher = Sha256::new();
hasher.update(canonical.as_bytes());
let hash = hasher.finalize();
let secp = Secp256k1::verification_only();
let message = Message::from_digest_slice(&hash)
.map_err(|e| JobError::SignatureVerification(format!("Invalid message: {}", e)))?;
// Verify each signature (if any)
for sig_data in &self.signatures {
// Decode public key
let pubkey_bytes = hex::decode(&sig_data.public_key)
.map_err(|e| JobError::SignatureVerification(format!("Invalid public key hex: {}", e)))?;
let pubkey = PublicKey::from_slice(&pubkey_bytes)
.map_err(|e| JobError::SignatureVerification(format!("Invalid public key: {}", e)))?;
// Decode signature
let sig_bytes = hex::decode(&sig_data.signature)
.map_err(|e| JobError::SignatureVerification(format!("Invalid signature hex: {}", e)))?;
let signature = Signature::from_compact(&sig_bytes)
.map_err(|e| JobError::SignatureVerification(format!("Invalid signature: {}", e)))?;
// Verify signature
secp.verify_ecdsa(&message, &signature, &pubkey)
.map_err(|e| JobError::SignatureVerification(format!("Signature verification failed: {}", e)))?;
}
let _: () = redis_conn.hset_multiple(&job_key, &[
("error", error),
("status", JobStatus::Error.as_str()),
("updated_at", &now.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
}
/// Delete job from Redis using default client
pub async fn delete(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
) -> Result<(), JobError> {
let job_key = format!("hero:job:{}", job_id);
let _: () = redis_conn.del(&job_key).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
}
}
/// Builder for constructing job execution requests.
@@ -179,6 +197,7 @@ pub struct JobBuilder {
executor: String,
timeout: u64, // timeout in seconds
env_vars: HashMap<String, String>,
signatures: Vec<JobSignature>,
}
impl JobBuilder {
@@ -191,6 +210,7 @@ impl JobBuilder {
executor: "".to_string(),
timeout: 300, // 5 minutes default
env_vars: HashMap::new(),
signatures: Vec::new(),
}
}
@@ -248,6 +268,27 @@ impl JobBuilder {
self
}
/// Add a signature (public key and signature)
pub fn signature(mut self, public_key: &str, signature: &str) -> Self {
self.signatures.push(JobSignature {
public_key: public_key.to_string(),
signature: signature.to_string(),
});
self
}
/// Set multiple signatures
pub fn signatures(mut self, signatures: Vec<JobSignature>) -> Self {
self.signatures = signatures;
self
}
/// Clear all signatures
pub fn clear_signatures(mut self) -> Self {
self.signatures.clear();
self
}
/// Build the job
pub fn build(self) -> Result<Job, JobError> {
if self.caller_id.is_empty() {
@@ -276,6 +317,7 @@ impl JobBuilder {
job.timeout = self.timeout;
job.env_vars = self.env_vars;
job.signatures = self.signatures;
Ok(job)
}