heroagent/pkg/servers/ui/models/job_manager.go
2025-05-24 09:24:19 +04:00

349 lines
9.4 KiB
Go

package models
import (
"context"
"fmt"
"log"
"time"
"git.ourworld.tf/herocode/heroagent/pkg/herojobs"
)
// JobManager provides an interface for job management operations
type JobManager interface {
// GetAllJobs returns all jobs
GetAllJobs() ([]*JobInfo, error)
// GetJobsByCircle returns jobs for a specific circle
GetJobsByCircle(circleID string) ([]*JobInfo, error)
// GetJobsByTopic returns jobs for a specific topic
GetJobsByTopic(topic string) ([]*JobInfo, error)
// GetJobsByStatus returns jobs with a specific status
GetJobsByStatus(status herojobs.JobStatus) ([]*JobInfo, error)
// GetJob returns a specific job by ID
GetJob(jobID uint32) (*JobInfo, error)
}
// JobInfo represents job information for the UI
type JobInfo struct {
JobID uint32 `json:"jobid"`
SessionKey string `json:"sessionkey"`
CircleID string `json:"circleid"`
Topic string `json:"topic"`
ParamsType herojobs.ParamsType `json:"params_type"`
Status herojobs.JobStatus `json:"status"`
TimeScheduled time.Time `json:"time_scheduled"`
TimeStart time.Time `json:"time_start"`
TimeEnd time.Time `json:"time_end"`
Duration string `json:"duration"`
Error string `json:"error"`
HasError bool `json:"has_error"`
}
// HeroJobManager implements JobManager interface using herojobs package
type HeroJobManager struct {
factory *herojobs.Factory
ctx context.Context
}
// NewHeroJobManager creates a new HeroJobManager
func NewHeroJobManager(redisURL string) (*HeroJobManager, error) {
factory, err := herojobs.NewFactory(redisURL)
if err != nil {
return nil, fmt.Errorf("failed to create job factory: %w", err)
}
return &HeroJobManager{
factory: factory,
ctx: context.Background(),
}, nil
}
// GetAllJobs returns all jobs from Redis
func (jm *HeroJobManager) GetAllJobs() ([]*JobInfo, error) {
// This is a simplified implementation
// In a real-world scenario, you would need to:
// 1. Get all circles and topics
// 2. For each circle/topic combination, get all jobs
// 3. Combine the results
// For now, we'll just list all job IDs from Redis
jobIDs, err := jm.factory.ListJobs(jm.ctx)
if err != nil {
return nil, fmt.Errorf("failed to list jobs: %w", err)
}
jobs := make([]*JobInfo, 0, len(jobIDs))
for _, jobID := range jobIDs {
// Extract job ID from the key
// Assuming the key format is "job:<id>"
jobData, err := jm.factory.GetJob(jm.ctx, jobID)
if err != nil {
log.Printf("Warning: failed to get job %s: %v", jobID, err)
continue
}
// Parse job data
job, err := herojobs.NewJobFromJSON(jobData)
if err != nil {
log.Printf("Warning: failed to parse job data for %s: %v", jobID, err)
continue
}
// Convert to JobInfo
jobInfo := convertToJobInfo(job)
jobs = append(jobs, jobInfo)
}
return jobs, nil
}
// GetJobsByCircle returns jobs for a specific circle
func (jm *HeroJobManager) GetJobsByCircle(circleID string) ([]*JobInfo, error) {
// Implementation would filter jobs by circle ID
// For now, return all jobs and filter in memory
allJobs, err := jm.GetAllJobs()
if err != nil {
return nil, err
}
filteredJobs := make([]*JobInfo, 0)
for _, job := range allJobs {
if job.CircleID == circleID {
filteredJobs = append(filteredJobs, job)
}
}
return filteredJobs, nil
}
// GetJobsByTopic returns jobs for a specific topic
func (jm *HeroJobManager) GetJobsByTopic(topic string) ([]*JobInfo, error) {
// Implementation would filter jobs by topic
// For now, return all jobs and filter in memory
allJobs, err := jm.GetAllJobs()
if err != nil {
return nil, err
}
filteredJobs := make([]*JobInfo, 0)
for _, job := range allJobs {
if job.Topic == topic {
filteredJobs = append(filteredJobs, job)
}
}
return filteredJobs, nil
}
// GetJobsByStatus returns jobs with a specific status
func (jm *HeroJobManager) GetJobsByStatus(status herojobs.JobStatus) ([]*JobInfo, error) {
// Implementation would filter jobs by status
// For now, return all jobs and filter in memory
allJobs, err := jm.GetAllJobs()
if err != nil {
return nil, err
}
filteredJobs := make([]*JobInfo, 0)
for _, job := range allJobs {
if job.Status == status {
filteredJobs = append(filteredJobs, job)
}
}
return filteredJobs, nil
}
// GetJob returns a specific job by ID
func (jm *HeroJobManager) GetJob(jobID uint32) (*JobInfo, error) {
// Implementation would get a specific job by ID
// This is a placeholder implementation
allJobs, err := jm.GetAllJobs()
if err != nil {
return nil, err
}
for _, job := range allJobs {
if job.JobID == jobID {
return job, nil
}
}
return nil, fmt.Errorf("job not found: %d", jobID)
}
// convertToJobInfo converts a herojobs.Job to a JobInfo
func convertToJobInfo(job *herojobs.Job) *JobInfo {
// Convert Unix timestamps to time.Time
timeScheduled := time.Unix(job.TimeScheduled, 0)
timeStart := time.Time{}
if job.TimeStart > 0 {
timeStart = time.Unix(job.TimeStart, 0)
}
timeEnd := time.Time{}
if job.TimeEnd > 0 {
timeEnd = time.Unix(job.TimeEnd, 0)
}
// Calculate duration
var duration string
if job.TimeStart > 0 {
if job.TimeEnd > 0 {
// Job has completed
duration = time.Unix(job.TimeEnd, 0).Sub(time.Unix(job.TimeStart, 0)).String()
} else {
// Job is still running
duration = time.Since(time.Unix(job.TimeStart, 0)).String()
}
} else {
duration = "Not started"
}
return &JobInfo{
JobID: job.JobID,
SessionKey: job.SessionKey,
CircleID: job.CircleID,
Topic: job.Topic,
ParamsType: job.ParamsType,
Status: job.Status,
TimeScheduled: timeScheduled,
TimeStart: timeStart,
TimeEnd: timeEnd,
Duration: duration,
Error: job.Error,
HasError: job.Error != "",
}
}
// MockJobManager is a mock implementation of JobManager for testing
type MockJobManager struct{}
// NewMockJobManager creates a new MockJobManager
func NewMockJobManager() *MockJobManager {
return &MockJobManager{}
}
// GetAllJobs returns mock jobs
func (m *MockJobManager) GetAllJobs() ([]*JobInfo, error) {
return generateMockJobs(), nil
}
// GetJobsByCircle returns mock jobs for a circle
func (m *MockJobManager) GetJobsByCircle(circleID string) ([]*JobInfo, error) {
allJobs := generateMockJobs()
filteredJobs := make([]*JobInfo, 0)
for _, job := range allJobs {
if job.CircleID == circleID {
filteredJobs = append(filteredJobs, job)
}
}
return filteredJobs, nil
}
// GetJobsByTopic returns mock jobs for a topic
func (m *MockJobManager) GetJobsByTopic(topic string) ([]*JobInfo, error) {
allJobs := generateMockJobs()
filteredJobs := make([]*JobInfo, 0)
for _, job := range allJobs {
if job.Topic == topic {
filteredJobs = append(filteredJobs, job)
}
}
return filteredJobs, nil
}
// GetJobsByStatus returns mock jobs with a status
func (m *MockJobManager) GetJobsByStatus(status herojobs.JobStatus) ([]*JobInfo, error) {
allJobs := generateMockJobs()
filteredJobs := make([]*JobInfo, 0)
for _, job := range allJobs {
if job.Status == status {
filteredJobs = append(filteredJobs, job)
}
}
return filteredJobs, nil
}
// GetJob returns a mock job by ID
func (m *MockJobManager) GetJob(jobID uint32) (*JobInfo, error) {
allJobs := generateMockJobs()
for _, job := range allJobs {
if job.JobID == jobID {
return job, nil
}
}
return nil, fmt.Errorf("job not found: %d", jobID)
}
// generateMockJobs generates mock jobs for testing
func generateMockJobs() []*JobInfo {
now := time.Now()
return []*JobInfo{
{
JobID: 1,
CircleID: "circle1",
Topic: "email",
ParamsType: herojobs.ParamsTypeHeroScript,
Status: herojobs.JobStatusDone,
TimeScheduled: now.Add(-30 * time.Minute),
TimeStart: now.Add(-29 * time.Minute),
TimeEnd: now.Add(-28 * time.Minute),
Duration: "1m0s",
Error: "",
HasError: false,
},
{
JobID: 2,
CircleID: "circle1",
Topic: "backup",
ParamsType: herojobs.ParamsTypeOpenRPC,
Status: herojobs.JobStatusActive,
TimeScheduled: now.Add(-15 * time.Minute),
TimeStart: now.Add(-14 * time.Minute),
TimeEnd: time.Time{},
Duration: "14m0s",
Error: "",
HasError: false,
},
{
JobID: 3,
CircleID: "circle2",
Topic: "sync",
ParamsType: herojobs.ParamsTypeRhaiScript,
Status: herojobs.JobStatusError,
TimeScheduled: now.Add(-45 * time.Minute),
TimeStart: now.Add(-44 * time.Minute),
TimeEnd: now.Add(-43 * time.Minute),
Duration: "1m0s",
Error: "Failed to connect to remote server",
HasError: true,
},
{
JobID: 4,
CircleID: "circle2",
Topic: "email",
ParamsType: herojobs.ParamsTypeHeroScript,
Status: herojobs.JobStatusNew,
TimeScheduled: now.Add(-5 * time.Minute),
TimeStart: time.Time{},
TimeEnd: time.Time{},
Duration: "Not started",
Error: "",
HasError: false,
},
{
JobID: 5,
CircleID: "circle3",
Topic: "ai",
ParamsType: herojobs.ParamsTypeAI,
Status: herojobs.JobStatusDone,
TimeScheduled: now.Add(-60 * time.Minute),
TimeStart: now.Add(-59 * time.Minute),
TimeEnd: now.Add(-40 * time.Minute),
Duration: "19m0s",
Error: "",
HasError: false,
},
}
}