From 5d241e9adef18230d0577ee538f8fd5e9bc00019 Mon Sep 17 00:00:00 2001 From: despiegk Date: Sat, 24 May 2025 07:09:15 +0400 Subject: [PATCH] ... --- cmd/jobtest/main.go | 83 ++++++ pkg/servers/heroagent/README.md | 151 ++++++++++ pkg/servers/heroagent/config.go | 22 ++ pkg/servers/heroagent/factory.go | 47 +++ pkg/servers/heroagent/jobs.go | 472 +++++++++++++++++++++++++++++++ pkg/servers/heroagent/redis.go | 199 +++++++++++++ scripts/test_jobs.sh | 21 ++ 7 files changed, 995 insertions(+) create mode 100644 cmd/jobtest/main.go create mode 100644 pkg/servers/heroagent/README.md create mode 100644 pkg/servers/heroagent/jobs.go create mode 100644 pkg/servers/heroagent/redis.go create mode 100755 scripts/test_jobs.sh diff --git a/cmd/jobtest/main.go b/cmd/jobtest/main.go new file mode 100644 index 0000000..70fdcce --- /dev/null +++ b/cmd/jobtest/main.go @@ -0,0 +1,83 @@ +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + "git.ourworld.tf/herocode/heroagent/pkg/servers/heroagent" +) + +func main() { + log.Println("Starting job management test...") + + // Create a configuration for the server factory + config := heroagent.DefaultConfig() + + // Customize configuration if needed + config.Redis.TCPPort = 6379 + config.Redis.UnixSocketPath = "/tmp/redis.sock" + config.Jobs.OurDBPath = "./data/jobsdb" + config.Jobs.WorkerCount = 3 + config.Jobs.QueuePollInterval = 200 * time.Millisecond + + // Only enable Redis and Jobs for this test + config.EnableRedis = true + config.EnableWebDAV = false + config.EnableUI = false + config.EnableJobs = true + + // Create server factory + factory := heroagent.New(config) + + // Start servers + if err := factory.Start(); err != nil { + log.Fatalf("Failed to start servers: %v", err) + } + + // Get job manager + jobManager := factory.GetJobManager() + if jobManager == nil { + log.Fatalf("Job manager not initialized") + } + + // Create some test jobs + createTestJobs(jobManager) + + // Wait for interrupt signal + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + // Stop servers + if err := factory.Stop(); err != nil { + log.Fatalf("Failed to stop servers: %v", err) + } + + log.Println("Job management test completed") +} + +func createTestJobs(jobManager *heroagent.JobManager) { + // Create a few test jobs with different topics + topics := []string{"email", "notification", "report"} + + for i := 0; i < 5; i++ { + for _, topic := range topics { + // Create job + params := fmt.Sprintf(`{"action": "process", "data": "test data %d for %s"}`, i, topic) + job, err := jobManager.CreateJob(topic, params) + if err != nil { + log.Printf("Failed to create job: %v", err) + continue + } + + log.Printf("Created job %d with topic %s", job.JobID, job.Topic) + } + + // Sleep briefly between batches + time.Sleep(500 * time.Millisecond) + } +} diff --git a/pkg/servers/heroagent/README.md b/pkg/servers/heroagent/README.md new file mode 100644 index 0000000..2a620d6 --- /dev/null +++ b/pkg/servers/heroagent/README.md @@ -0,0 +1,151 @@ +# HeroAgent Server Factory + +The HeroAgent Server Factory is a comprehensive server management system that integrates multiple services: + +- Redis Server +- WebDAV Server +- UI Server +- Job Management System + +## Overview + +The server factory provides a unified interface for starting, managing, and stopping these services. Each service can be enabled or disabled independently through configuration. + +## Job Management System + +The job management system provides a robust solution for handling asynchronous tasks with persistence and reliability. It combines the strengths of OurDB for persistent storage and Redis for active job queuing. + +### Architecture + +The job system follows a specific flow: + +1. **Job Creation**: + - When a job is created, it's stored in both OurDB and Redis + - OurDB provides persistent storage with history tracking + - Redis provides fast access and queuing capabilities + +2. **Job Processing**: + - Workers continuously poll Redis queues for new jobs + - When a job is found, it's updated to "active" status in both OurDB and Redis + - The job is processed based on its parameters + +3. **Job Completion**: + - When a job completes (success or error), it's updated in OurDB + - The job is removed from Redis to keep only active jobs in memory + - This approach ensures efficient memory usage while maintaining a complete history + +### Components + +- **JobManager**: Coordinates job operations between OurDB and Redis +- **RedisJobManager**: Handles Redis-specific operations for jobs +- **JobWorker**: Processes jobs from Redis queues +- **OurDB**: Provides persistent storage for all jobs + +### Job States + +Jobs can be in one of four states: + +- **New**: Job has been created but not yet processed +- **Active**: Job is currently being processed +- **Done**: Job has completed successfully +- **Error**: Job encountered an error during processing + +## Usage + +### Configuration + +```go +config := heroagent.DefaultConfig() + +// Configure Redis +config.Redis.TCPPort = 6379 +config.Redis.UnixSocketPath = "/tmp/redis.sock" + +// Configure job system +config.Jobs.OurDBPath = "./data/jobsdb" +config.Jobs.WorkerCount = 5 +config.Jobs.QueuePollInterval = 100 * time.Millisecond + +// Enable/disable services +config.EnableRedis = true +config.EnableWebDAV = true +config.EnableUI = true +config.EnableJobs = true +``` + +### Starting the Server Factory + +```go +// Create server factory +factory := heroagent.New(config) + +// Start servers +if err := factory.Start(); err != nil { + log.Fatalf("Failed to start servers: %v", err) +} +``` + +### Creating and Managing Jobs + +```go +// Get job manager +jobManager := factory.GetJobManager() + +// Create a job +job, err := jobManager.CreateJob("email", `{"to": "user@example.com", "subject": "Hello"}`) +if err != nil { + log.Fatalf("Failed to create job: %v", err) +} + +// Get job status +job, err = jobManager.GetJob(job.JobID) +if err != nil { + log.Fatalf("Failed to get job: %v", err) +} + +// Update job status +err = jobManager.UpdateJobStatus(job.JobID, heroagent.JobStatusActive) +if err != nil { + log.Fatalf("Failed to update job status: %v", err) +} + +// Complete a job +err = jobManager.CompleteJob(job.JobID, "Job completed successfully") +if err != nil { + log.Fatalf("Failed to complete job: %v", err) +} + +// Mark a job as failed +err = jobManager.FailJob(job.JobID, "Job failed due to network error") +if err != nil { + log.Fatalf("Failed to mark job as failed: %v", err) +} +``` + +### Stopping the Server Factory + +```go +// Stop servers +if err := factory.Stop(); err != nil { + log.Fatalf("Failed to stop servers: %v", err) +} +``` + +## Implementation Details + +### OurDB Integration + +OurDB provides persistent storage for all jobs, including their complete history. It uses an auto-incrementing ID system to assign unique IDs to jobs. + +### Redis Integration + +Redis is used for active job queuing and temporary storage. Jobs are stored in Redis using the following key patterns: + +- Queue keys: `heroqueue:` +- Job storage keys: `herojobs::` + +When a job reaches a terminal state (done or error), it's removed from Redis but remains in OurDB for historical reference. + +### Worker Pool + +The job system uses a configurable worker pool to process jobs concurrently. Each worker polls Redis queues for new jobs and processes them independently. \ No newline at end of file diff --git a/pkg/servers/heroagent/config.go b/pkg/servers/heroagent/config.go index 6f14221..099c2c0 100644 --- a/pkg/servers/heroagent/config.go +++ b/pkg/servers/heroagent/config.go @@ -1,6 +1,8 @@ package heroagent import ( + "time" + "git.ourworld.tf/herocode/heroagent/pkg/servers/ui" "git.ourworld.tf/herocode/heroagent/pkg/servers/webdavserver" ) @@ -16,10 +18,14 @@ type Config struct { // UI server configuration UI UIConfig + // Job management configuration + Jobs JobsConfig + // Enable/disable specific servers EnableRedis bool EnableWebDAV bool EnableUI bool + EnableJobs bool } // RedisConfig holds the configuration for the Redis server @@ -42,6 +48,16 @@ type UIConfig struct { AppConfig ui.AppConfig } +// JobsConfig holds the configuration for the job management system +type JobsConfig struct { + // OurDB configuration + OurDBPath string + + // Job processing configuration + WorkerCount int + QueuePollInterval time.Duration +} + // DefaultConfig returns the default configuration for the HeroAgent func DefaultConfig() Config { return Config{ @@ -56,8 +72,14 @@ func DefaultConfig() Config { Port: "9001", // Port is a string in UIConfig AppConfig: ui.AppConfig{}, }, + Jobs: JobsConfig{ + OurDBPath: "./data/ourdb", + WorkerCount: 5, + QueuePollInterval: 100 * time.Millisecond, + }, EnableRedis: true, EnableWebDAV: true, EnableUI: true, + EnableJobs: true, } } diff --git a/pkg/servers/heroagent/factory.go b/pkg/servers/heroagent/factory.go index e67bd21..70b9ce4 100644 --- a/pkg/servers/heroagent/factory.go +++ b/pkg/servers/heroagent/factory.go @@ -19,6 +19,7 @@ type ServerFactory struct { redisServer *redisserver.Server webdavServer *webdavserver.Server uiApp *AppInstance + jobManager *JobManager // Control channels stopCh chan struct{} @@ -64,6 +65,13 @@ func (f *ServerFactory) Start() error { } } + // Start job manager if enabled + if f.config.EnableJobs { + if err := f.startJobManager(); err != nil { + return fmt.Errorf("failed to start job manager: %w", err) + } + } + log.Println("All servers started successfully") return nil } @@ -82,6 +90,13 @@ func (f *ServerFactory) Stop() error { } } + // Stop job manager if it's running + if f.jobManager != nil { + if err := f.jobManager.Stop(); err != nil { + log.Printf("Error stopping job manager: %v", err) + } + } + // Wait for all goroutines to finish f.wg.Wait() @@ -177,3 +192,35 @@ func (f *ServerFactory) GetWebDAVServer() *webdavserver.Server { func (f *ServerFactory) GetUIApp() *AppInstance { return f.uiApp } + +// startJobManager initializes and starts the job manager +func (f *ServerFactory) startJobManager() error { + log.Println("Starting job manager...") + + // Create Redis connection for job manager + redisConn := &RedisConnection{ + TCPPort: f.config.Redis.TCPPort, + UnixSocketPath: f.config.Redis.UnixSocketPath, + } + + // Create job manager + jobManager, err := NewJobManager(f.config.Jobs, redisConn) + if err != nil { + return fmt.Errorf("failed to create job manager: %w", err) + } + + f.jobManager = jobManager + + // Start job manager + if err := jobManager.Start(); err != nil { + return fmt.Errorf("failed to start job manager: %w", err) + } + + log.Println("Job manager started") + return nil +} + +// GetJobManager returns the job manager instance +func (f *ServerFactory) GetJobManager() *JobManager { + return f.jobManager +} diff --git a/pkg/servers/heroagent/jobs.go b/pkg/servers/heroagent/jobs.go new file mode 100644 index 0000000..8882fd1 --- /dev/null +++ b/pkg/servers/heroagent/jobs.go @@ -0,0 +1,472 @@ +package heroagent + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "sync" + "time" + + "git.ourworld.tf/herocode/heroagent/pkg/data/ourdb" +) + +// JobStatus represents the status of a job +type JobStatus string + +const ( + // JobStatusNew indicates a newly created job + JobStatusNew JobStatus = "new" + // JobStatusActive indicates a job that is currently being processed + JobStatusActive JobStatus = "active" + // JobStatusError indicates a job that encountered an error + JobStatusError JobStatus = "error" + // JobStatusDone indicates a job that has been completed successfully + JobStatusDone JobStatus = "done" +) + +// Job represents a job to be processed +type Job struct { + JobID uint32 `json:"jobid"` + Topic string `json:"topic"` + Params string `json:"params"` + Status JobStatus `json:"status"` + TimeScheduled int64 `json:"time_scheduled"` + TimeStart int64 `json:"time_start"` + TimeEnd int64 `json:"time_end"` + Error string `json:"error"` + Result string `json:"result"` +} + +// JobManager handles job management between OurDB and Redis +type JobManager struct { + config JobsConfig + ourDB *ourdb.OurDB + redisConn *RedisConnection + redisMgr *RedisJobManager + workers []*JobWorker + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +// RedisConnection wraps Redis connection details +type RedisConnection struct { + TCPPort int + UnixSocketPath string +} + +// JobWorker represents a worker that processes jobs +type JobWorker struct { + id int + jobMgr *JobManager + ctx context.Context + wg *sync.WaitGroup +} + +// NewJobManager creates a new job manager +func NewJobManager(config JobsConfig, redisConn *RedisConnection) (*JobManager, error) { + // Create OurDB directory if it doesn't exist + if err := os.MkdirAll(config.OurDBPath, 0755); err != nil { + return nil, fmt.Errorf("failed to create OurDB directory: %w", err) + } + + // Initialize OurDB + ourDBConfig := ourdb.DefaultConfig() + ourDBConfig.Path = config.OurDBPath + ourDBConfig.IncrementalMode = true + + db, err := ourdb.New(ourDBConfig) + if err != nil { + return nil, fmt.Errorf("failed to create OurDB: %w", err) + } + + // Create context with cancel + ctx, cancel := context.WithCancel(context.Background()) + + // Initialize Redis job manager + redisMgr, err := NewRedisJobManager(redisConn.TCPPort, redisConn.UnixSocketPath) + if err != nil { + // Close OurDB before returning error + if closeErr := db.Close(); closeErr != nil { + log.Printf("Warning: failed to close OurDB: %v", closeErr) + } + return nil, fmt.Errorf("failed to create Redis job manager: %w", err) + } + + // Create job manager + jobMgr := &JobManager{ + config: config, + ourDB: db, + redisConn: redisConn, + redisMgr: redisMgr, + ctx: ctx, + cancel: cancel, + } + + return jobMgr, nil +} + +// Start starts the job manager +func (jm *JobManager) Start() error { + log.Println("Starting job manager...") + + // Start workers + jm.workers = make([]*JobWorker, jm.config.WorkerCount) + for i := 0; i < jm.config.WorkerCount; i++ { + worker := &JobWorker{ + id: i, + jobMgr: jm, + ctx: jm.ctx, + wg: &jm.wg, + } + jm.workers[i] = worker + jm.startWorker(worker) + } + + log.Printf("Job manager started with %d workers", jm.config.WorkerCount) + return nil +} + +// Stop stops the job manager +func (jm *JobManager) Stop() error { + log.Println("Stopping job manager...") + + // Signal all workers to stop + jm.cancel() + + // Wait for all workers to finish + jm.wg.Wait() + + // Close Redis job manager + if jm.redisMgr != nil { + if err := jm.redisMgr.Close(); err != nil { + log.Printf("Warning: failed to close Redis job manager: %v", err) + } + } + + // Close OurDB + if err := jm.ourDB.Close(); err != nil { + return fmt.Errorf("failed to close OurDB: %w", err) + } + + log.Println("Job manager stopped") + return nil +} + +// startWorker starts a worker +func (jm *JobManager) startWorker(worker *JobWorker) { + jm.wg.Add(1) + go func() { + defer jm.wg.Done() + worker.run() + }() +} + +// run is the main worker loop +func (w *JobWorker) run() { + log.Printf("Worker %d started", w.id) + + ticker := time.NewTicker(w.jobMgr.config.QueuePollInterval) + defer ticker.Stop() + + for { + select { + case <-w.ctx.Done(): + log.Printf("Worker %d stopping", w.id) + return + case <-ticker.C: + // Check for jobs in Redis + if err := w.checkForJobs(); err != nil { + log.Printf("Worker %d error checking for jobs: %v", w.id, err) + } + } + } +} + +// checkForJobs checks for jobs in Redis +func (w *JobWorker) checkForJobs() error { + // Get list of queues + queues, err := w.jobMgr.redisMgr.ListQueues() + if err != nil { + return fmt.Errorf("failed to list queues: %w", err) + } + + // Check each queue for jobs + for _, topic := range queues { + // Try to fetch a job from the queue + job, err := w.jobMgr.redisMgr.FetchNextJob(topic) + if err != nil { + // If queue is empty, continue to next queue + continue + } + + // Process the job + if err := w.processJob(job); err != nil { + log.Printf("Error processing job %d: %v", job.JobID, err) + } + + // Only process one job at a time + return nil + } + + return nil +} + +// processJob processes a job +func (w *JobWorker) processJob(job *Job) error { + log.Printf("Worker %d processing job %d", w.id, job.JobID) + + // Update job status to active + job.Status = JobStatusActive + job.TimeStart = time.Now().Unix() + + // Update job in both OurDB and Redis + if err := w.jobMgr.updateJobInBothStores(job); err != nil { + return fmt.Errorf("failed to update job status: %w", err) + } + + // Simulate job processing + // In a real implementation, this would execute the job based on its parameters + time.Sleep(1 * time.Second) + + // Complete the job + job.Status = JobStatusDone + job.TimeEnd = time.Now().Unix() + job.Result = fmt.Sprintf("Job %d processed successfully", job.JobID) + + // Update job in OurDB and remove from Redis + if err := w.jobMgr.completeJobProcessing(job); err != nil { + return fmt.Errorf("failed to complete job: %w", err) + } + + log.Printf("Worker %d completed job %d", w.id, job.JobID) + return nil +} + +// CreateJob creates a new job +func (jm *JobManager) CreateJob(topic, params string) (*Job, error) { + // Create new job + job := &Job{ + Topic: topic, + Params: params, + Status: JobStatusNew, + TimeScheduled: time.Now().Unix(), + } + + // Store job in OurDB + jobData, err := json.Marshal(job) + if err != nil { + return nil, fmt.Errorf("failed to marshal job: %w", err) + } + + // Add job to OurDB with auto-incremented ID + id, err := jm.ourDB.Set(ourdb.OurDBSetArgs{ + Data: jobData, + }) + if err != nil { + return nil, fmt.Errorf("failed to store job in OurDB: %w", err) + } + + // Update job with assigned ID + job.JobID = id + + // Store job in Redis + if err := jm.redisMgr.EnqueueJob(job); err != nil { + return nil, fmt.Errorf("failed to store job in Redis: %w", err) + } + + log.Printf("Job %d created and stored in both OurDB and Redis", job.JobID) + return job, nil +} + +// GetJob retrieves a job by ID +func (jm *JobManager) GetJob(jobID uint32) (*Job, error) { + // Get job from OurDB + jobData, err := jm.ourDB.Get(jobID) + if err != nil { + return nil, fmt.Errorf("failed to get job from OurDB: %w", err) + } + + // Parse job data + job := &Job{} + if err := json.Unmarshal(jobData, job); err != nil { + return nil, fmt.Errorf("failed to unmarshal job data: %w", err) + } + + return job, nil +} + +// UpdateJobStatus updates the status of a job +func (jm *JobManager) UpdateJobStatus(jobID uint32, status JobStatus) error { + // Get job from OurDB + job, err := jm.GetJob(jobID) + if err != nil { + return err + } + + // Update status + job.Status = status + + // Update timestamps based on status + now := time.Now().Unix() + if status == JobStatusActive && job.TimeStart == 0 { + job.TimeStart = now + } else if (status == JobStatusDone || status == JobStatusError) && job.TimeEnd == 0 { + job.TimeEnd = now + } + + // Store updated job in OurDB + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("failed to marshal job: %w", err) + } + + // Update job in OurDB + _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ + ID: &jobID, + Data: jobData, + }) + if err != nil { + return fmt.Errorf("failed to update job in OurDB: %w", err) + } + + // If job is done or has error, remove from Redis + if status == JobStatusDone || status == JobStatusError { + if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil { + log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err) + } + } else { + // Otherwise, update in Redis + if err := jm.redisMgr.UpdateJobStatus(job); err != nil { + log.Printf("Warning: failed to update job %d in Redis: %v", jobID, err) + } + } + + return nil +} + +// CompleteJob marks a job as completed +func (jm *JobManager) CompleteJob(jobID uint32, result string) error { + // Get job from OurDB + job, err := jm.GetJob(jobID) + if err != nil { + return err + } + + // Update job + job.Status = JobStatusDone + job.TimeEnd = time.Now().Unix() + job.Result = result + + // Store updated job in OurDB + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("failed to marshal job: %w", err) + } + + // Update job in OurDB + _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ + ID: &jobID, + Data: jobData, + }) + if err != nil { + return fmt.Errorf("failed to update job in OurDB: %w", err) + } + + // Remove from Redis + if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil { + log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err) + } + + log.Printf("Job %d completed and removed from Redis", jobID) + return nil +} + +// FailJob marks a job as failed +func (jm *JobManager) FailJob(jobID uint32, errorMsg string) error { + // Get job from OurDB + job, err := jm.GetJob(jobID) + if err != nil { + return err + } + + // Update job + job.Status = JobStatusError + job.TimeEnd = time.Now().Unix() + job.Error = errorMsg + + // Store updated job in OurDB + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("failed to marshal job: %w", err) + } + + // Update job in OurDB + _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ + ID: &jobID, + Data: jobData, + }) + if err != nil { + return fmt.Errorf("failed to update job in OurDB: %w", err) + } + + // Remove from Redis + if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil { + log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err) + } + + log.Printf("Job %d failed and removed from Redis", jobID) + return nil +} + +// updateJobInBothStores updates a job in both OurDB and Redis +func (jm *JobManager) updateJobInBothStores(job *Job) error { + // Store job in OurDB + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("failed to marshal job: %w", err) + } + + // Update job in OurDB + _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ + ID: &job.JobID, + Data: jobData, + }) + if err != nil { + return fmt.Errorf("failed to update job in OurDB: %w", err) + } + + // Update job in Redis + if err := jm.redisMgr.UpdateJobStatus(job); err != nil { + return fmt.Errorf("failed to update job in Redis: %w", err) + } + + return nil +} + +// completeJobProcessing updates a completed job in OurDB and removes it from Redis +func (jm *JobManager) completeJobProcessing(job *Job) error { + // Store job in OurDB + jobData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("failed to marshal job: %w", err) + } + + // Update job in OurDB + _, err = jm.ourDB.Set(ourdb.OurDBSetArgs{ + ID: &job.JobID, + Data: jobData, + }) + if err != nil { + return fmt.Errorf("failed to update job in OurDB: %w", err) + } + + // Remove from Redis + if err := jm.redisMgr.DeleteJob(job.JobID, job.Topic); err != nil { + return fmt.Errorf("failed to remove job from Redis: %w", err) + } + + return nil +} diff --git a/pkg/servers/heroagent/redis.go b/pkg/servers/heroagent/redis.go new file mode 100644 index 0000000..d7012ff --- /dev/null +++ b/pkg/servers/heroagent/redis.go @@ -0,0 +1,199 @@ +package heroagent + +import ( + "context" + "encoding/json" + "fmt" + "log" + "strconv" + "time" + + "github.com/redis/go-redis/v9" +) + +// RedisJobManager handles Redis operations for jobs +type RedisJobManager struct { + client *redis.Client + ctx context.Context +} + +// NewRedisJobManager creates a new Redis job manager +func NewRedisJobManager(tcpPort int, unixSocketPath string) (*RedisJobManager, error) { + // Determine network type and address + var networkType, addr string + if unixSocketPath != "" { + networkType = "unix" + addr = unixSocketPath + } else { + networkType = "tcp" + addr = fmt.Sprintf("localhost:%d", tcpPort) + } + + // Create Redis client + client := redis.NewClient(&redis.Options{ + Network: networkType, + Addr: addr, + DB: 0, + DialTimeout: 5 * time.Second, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + }) + + // Test connection + ctx := context.Background() + _, err := client.Ping(ctx).Result() + if err != nil { + return nil, fmt.Errorf("failed to connect to Redis: %w", err) + } + + return &RedisJobManager{ + client: client, + ctx: ctx, + }, nil +} + +// Close closes the Redis client +func (r *RedisJobManager) Close() error { + return r.client.Close() +} + +// QueueKey returns the Redis queue key for a topic +func QueueKey(topic string) string { + return fmt.Sprintf("heroqueue:%s", topic) +} + +// StorageKey returns the Redis storage key for a job +func StorageKey(jobID uint32, topic string) string { + return fmt.Sprintf("herojobs:%s:%d", topic, jobID) +} + +// StoreJob stores a job in Redis +func (r *RedisJobManager) StoreJob(job *Job) error { + // Convert job to JSON + jobJSON, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("failed to marshal job: %w", err) + } + + // Store job in Redis + storageKey := StorageKey(job.JobID, job.Topic) + err = r.client.Set(r.ctx, storageKey, jobJSON, 0).Err() + if err != nil { + return fmt.Errorf("failed to store job in Redis: %w", err) + } + + return nil +} + +// EnqueueJob adds a job to its queue +func (r *RedisJobManager) EnqueueJob(job *Job) error { + // Store the job first + if err := r.StoreJob(job); err != nil { + return err + } + + // Add job ID to queue + queueKey := QueueKey(job.Topic) + err := r.client.RPush(r.ctx, queueKey, job.JobID).Err() + if err != nil { + return fmt.Errorf("failed to enqueue job: %w", err) + } + + log.Printf("Job %d enqueued in Redis queue %s", job.JobID, queueKey) + return nil +} + +// GetJob retrieves a job from Redis +func (r *RedisJobManager) GetJob(jobID uint32, topic string) (*Job, error) { + // Get job from Redis + storageKey := StorageKey(jobID, topic) + jobJSON, err := r.client.Get(r.ctx, storageKey).Result() + if err != nil { + if err == redis.Nil { + return nil, fmt.Errorf("job not found: %d", jobID) + } + return nil, fmt.Errorf("failed to get job from Redis: %w", err) + } + + // Parse job JSON + job := &Job{} + if err := json.Unmarshal([]byte(jobJSON), job); err != nil { + return nil, fmt.Errorf("failed to unmarshal job: %w", err) + } + + return job, nil +} + +// DeleteJob deletes a job from Redis +func (r *RedisJobManager) DeleteJob(jobID uint32, topic string) error { + // Delete job from Redis + storageKey := StorageKey(jobID, topic) + err := r.client.Del(r.ctx, storageKey).Err() + if err != nil { + return fmt.Errorf("failed to delete job from Redis: %w", err) + } + + log.Printf("Job %d deleted from Redis", jobID) + return nil +} + +// FetchNextJob fetches the next job from a queue +func (r *RedisJobManager) FetchNextJob(topic string) (*Job, error) { + queueKey := QueueKey(topic) + + // Get and remove first job ID from queue + jobIDStr, err := r.client.LPop(r.ctx, queueKey).Result() + if err != nil { + if err == redis.Nil { + return nil, fmt.Errorf("queue is empty") + } + return nil, fmt.Errorf("failed to fetch job ID from queue: %w", err) + } + + // Convert job ID to uint32 + jobID, err := strconv.ParseUint(jobIDStr, 10, 32) + if err != nil { + return nil, fmt.Errorf("invalid job ID: %s", jobIDStr) + } + + // Get job from Redis + return r.GetJob(uint32(jobID), topic) +} + +// ListQueues lists all job queues +func (r *RedisJobManager) ListQueues() ([]string, error) { + // Get all queue keys + queueKeys, err := r.client.Keys(r.ctx, "heroqueue:*").Result() + if err != nil { + return nil, fmt.Errorf("failed to list queues: %w", err) + } + + // Extract topic names from queue keys + topics := make([]string, 0, len(queueKeys)) + for _, queueKey := range queueKeys { + // Extract topic from queue key (format: heroqueue:) + topic := queueKey[10:] // Skip "heroqueue:" + topics = append(topics, topic) + } + + return topics, nil +} + +// QueueSize returns the size of a queue +func (r *RedisJobManager) QueueSize(topic string) (int64, error) { + queueKey := QueueKey(topic) + + // Get queue size + size, err := r.client.LLen(r.ctx, queueKey).Result() + if err != nil { + return 0, fmt.Errorf("failed to get queue size: %w", err) + } + + return size, nil +} + +// UpdateJobStatus updates the status of a job in Redis +func (r *RedisJobManager) UpdateJobStatus(job *Job) error { + // Update job in Redis + return r.StoreJob(job) +} diff --git a/scripts/test_jobs.sh b/scripts/test_jobs.sh new file mode 100755 index 0000000..2c6c17e --- /dev/null +++ b/scripts/test_jobs.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +# Create necessary directories +mkdir -p data/jobsdb + +# Build the job test +echo "Building job test..." +go build -o bin/jobtest cmd/jobtest/main.go + +# Check if build was successful +if [ $? -ne 0 ]; then + echo "Build failed" + exit 1 +fi + +# Run the job test +echo "Running job test..." +./bin/jobtest + +# Exit with the same status as the job test +exit $? \ No newline at end of file