This commit is contained in:
2025-11-23 04:22:25 +01:00
parent 27d2723023
commit 61a3677883
8 changed files with 159 additions and 69 deletions

View File

@@ -10,14 +10,17 @@ import incubaid.herolib.core.logger
import incubaid.herolib.ai.client as aiclient
import incubaid.herolib.core.redisclient
import incubaid.herolib.data.paramsparser
import incubaid.herolib.core.texttools
@[heap]
pub struct Coordinator {
pub mut:
name string
steps map[string]Step
logger logger.Logger
ai aiclient.AIClient
redis ?&redisclient.Redis
name string
current_step string // links to steps dict
steps map[string]&Step
logger logger.Logger
ai aiclient.AIClient
redis ?&redisclient.Redis
}
pub fn new() !Coordinator {
@@ -41,8 +44,8 @@ pub mut:
}
// add step to it
pub fn (mut c Coordinator) step_new(args StepNewArgs) !Step {
return Step{
pub fn (mut c Coordinator) step_new(args StepNewArgs) !&Step {
mut s := Step{
coordinator: &c
name: args.name
description: args.description
@@ -52,4 +55,14 @@ pub fn (mut c Coordinator) step_new(args StepNewArgs) !Step {
error: args.error
params: args.params
}
s.name = texttools.name_fix(s.name)
c.steps[s.name] = &s
c.current_step = s.name
return &s
}
pub fn (mut c Coordinator) step_current() !&Step {
return c.steps[c.current_step] or {
return error('Current step "${c.current_step}" not found in coordinator "${c.name}"')
}
}

95
lib/core/flows/run.v Normal file
View File

@@ -0,0 +1,95 @@
module flows
import time as ostime
// Run the entire flow starting from current_step
pub fn (mut c Coordinator) run() ! {
mut s := c.step_current()!
c.run_step(mut s)!
}
// Run a single step, including error and next steps
pub fn (mut c Coordinator) run_step(mut step Step) ! {
// Initialize step
step.status = .running
step.started_at = ostime.now().unix_milli()
step.store_redis()!
// Log step start
step.log(
level: .info
message: 'Step "${step.name}" started'
)!
// Execute main step function
step.main_step(mut step) or {
// Handle error
step.status = .error
step.error_msg = err.msg()
step.finished_at = ostime.now().unix_milli()
step.store_redis()!
step.log(
level: .error
message: 'Step "${step.name}" failed: ${err.msg()}'
)!
// Run error steps if any
if step.error_steps.len > 0 {
for mut error_step in step.error_steps {
c.run_step(mut error_step)!
}
}
return err
}
// Mark as success
step.status = .success
step.finished_at = ostime.now().unix_milli()
step.store_redis()!
step.log(
level: .info
message: 'Step "${step.name}" completed successfully'
)!
// Run next steps if any
if step.next_steps.len > 0 {
for mut next_step in step.next_steps {
c.run_step(mut next_step)!
}
}
}
// Get step state from redis
pub fn (c Coordinator) get_step_state(step_name string) !map[string]string {
if redis := c.redis {
return redis.hgetall('flow:${c.name}:${step_name}')!
}
return error('Redis not configured')
}
// Get all steps state from redis (for UI dashboard)
pub fn (c Coordinator) get_all_steps_state() ![]map[string]string {
mut states := []map[string]string{}
if redis := c.redis {
pattern := 'flow:${c.name}:*'
keys := redis.keys(pattern)!
for key in keys {
state := redis.hgetall(key)!
states << state
}
}
return states
}
pub fn (c Coordinator) clear_redis() ! {
if redis := c.redis {
pattern := 'flow:${c.name}:*'
keys := redis.keys(pattern)!
for key in keys {
redis.del(key)!
}
}
}

View File

@@ -3,8 +3,20 @@ module flows
import incubaid.herolib.data.paramsparser
import incubaid.herolib.core.logger
pub enum StepStatus {
pending
running
success
error
skipped
}
pub struct Step {
pub mut:
status StepStatus = .pending
started_at i64 // Unix timestamp
finished_at i64
error_msg string
name string
description string
main_step fn (mut s Step) ! @[required]