Compare commits
2 Commits
a16ac8f627
...
b2eb9d3116
Author | SHA1 | Date | |
---|---|---|---|
b2eb9d3116 | |||
79d66e4b6b |
@ -25,9 +25,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/freeflowuniverse/herolauncher/pkg/herolauncher"
|
|
||||||
_ "github.com/freeflowuniverse/herolauncher/pkg/herolauncher/docs" // Import generated swagger docs
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
2
go.mod
2
go.mod
@ -71,7 +71,7 @@ require (
|
|||||||
github.com/metoro-io/mcp-golang v0.8.0 // indirect
|
github.com/metoro-io/mcp-golang v0.8.0 // indirect
|
||||||
github.com/mholt/archiver/v3 v3.5.1 // indirect
|
github.com/mholt/archiver/v3 v3.5.1 // indirect
|
||||||
github.com/nwaples/rardecode v1.1.0 // indirect
|
github.com/nwaples/rardecode v1.1.0 // indirect
|
||||||
github.com/openai/openai-go v0.1.0-beta.9 // indirect
|
github.com/openaiproxy/openaiproxy-go v0.1.0-beta.9 // indirect
|
||||||
github.com/pb33f/libopenapi v0.21.8 // indirect
|
github.com/pb33f/libopenapi v0.21.8 // indirect
|
||||||
github.com/pierrec/lz4/v4 v4.1.2 // indirect
|
github.com/pierrec/lz4/v4 v4.1.2 // indirect
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
|
4
go.sum
4
go.sum
@ -160,8 +160,8 @@ github.com/mholt/archiver/v3 v3.5.1/go.mod h1:e3dqJ7H78uzsRSEACH1joayhuSyhnonssn
|
|||||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
||||||
github.com/nwaples/rardecode v1.1.0 h1:vSxaY8vQhOcVr4mm5e8XllHWTiM4JF507A0Katqw7MQ=
|
github.com/nwaples/rardecode v1.1.0 h1:vSxaY8vQhOcVr4mm5e8XllHWTiM4JF507A0Katqw7MQ=
|
||||||
github.com/nwaples/rardecode v1.1.0/go.mod h1:5DzqNKiOdpKKBH87u8VlvAnPZMXcGRhxWkRpHbbfGS0=
|
github.com/nwaples/rardecode v1.1.0/go.mod h1:5DzqNKiOdpKKBH87u8VlvAnPZMXcGRhxWkRpHbbfGS0=
|
||||||
github.com/openai/openai-go v0.1.0-beta.9 h1:ABpubc5yU/3ejee2GgRrbFta81SG/d7bQbB8mIdP0Xo=
|
github.com/openaiproxy/openaiproxy-go v0.1.0-beta.9 h1:ABpubc5yU/3ejee2GgRrbFta81SG/d7bQbB8mIdP0Xo=
|
||||||
github.com/openai/openai-go v0.1.0-beta.9/go.mod h1:g461MYGXEXBVdV5SaR/5tNzNbSfwTBBefwc+LlDCK0Y=
|
github.com/openaiproxy/openaiproxy-go v0.1.0-beta.9/go.mod h1:g461MYGXEXBVdV5SaR/5tNzNbSfwTBBefwc+LlDCK0Y=
|
||||||
github.com/pb33f/libopenapi v0.21.8 h1:Fi2dAogMwC6av/5n3YIo7aMOGBZH/fBMO4OnzFB3dQA=
|
github.com/pb33f/libopenapi v0.21.8 h1:Fi2dAogMwC6av/5n3YIo7aMOGBZH/fBMO4OnzFB3dQA=
|
||||||
github.com/pb33f/libopenapi v0.21.8/go.mod h1:Gc8oQkjr2InxwumK0zOBtKN9gIlv9L2VmSVIUk2YxcU=
|
github.com/pb33f/libopenapi v0.21.8/go.mod h1:Gc8oQkjr2InxwumK0zOBtKN9gIlv9L2VmSVIUk2YxcU=
|
||||||
github.com/pierrec/lz4/v4 v4.1.2 h1:qvY3YFXRQE/XB8MlLzJH7mSzBs74eA2gg52YTk6jUPM=
|
github.com/pierrec/lz4/v4 v4.1.2 h1:qvY3YFXRQE/XB8MlLzJH7mSzBs74eA2gg52YTk6jUPM=
|
||||||
|
@ -3,40 +3,40 @@ package handlers
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"strconv" // Added strconv for JobID parsing
|
||||||
|
|
||||||
"github.com/freeflowuniverse/heroagent/pkg/herojobs"
|
"github.com/freeflowuniverse/heroagent/pkg/herojobs"
|
||||||
"github.com/gofiber/fiber/v2"
|
"github.com/gofiber/fiber/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HeroJobsClientInterface defines the interface for the HeroJobs client
|
// RedisClientInterface defines the methods JobHandler needs from a HeroJobs Redis client.
|
||||||
type HeroJobsClientInterface interface {
|
type RedisClientInterface interface {
|
||||||
Connect() error
|
StoreJob(job *herojobs.Job) error
|
||||||
Close() error
|
EnqueueJob(job *herojobs.Job) error
|
||||||
SubmitJob(job *herojobs.Job) (*herojobs.Job, error)
|
GetJob(jobID interface{}) (*herojobs.Job, error) // Changed jobID type to interface{}
|
||||||
GetJob(jobID string) (*herojobs.Job, error)
|
ListJobs(circleID, topic string) ([]uint32, error)
|
||||||
DeleteJob(jobID string) error
|
|
||||||
ListJobs(circleID, topic string) ([]string, error)
|
|
||||||
QueueSize(circleID, topic string) (int64, error)
|
QueueSize(circleID, topic string) (int64, error)
|
||||||
QueueEmpty(circleID, topic string) error
|
QueueEmpty(circleID, topic string) error
|
||||||
QueueGet(circleID, topic string) (*herojobs.Job, error)
|
// herojobs.Job also has Load() and Save() methods, but those are on the Job object itself,
|
||||||
CreateJob(circleID, topic, sessionKey, heroScript, rhaiScript string) (*herojobs.Job, error)
|
// not typically part of the client interface unless the client is a facade for all job operations.
|
||||||
}
|
}
|
||||||
|
|
||||||
// JobHandler handles job-related routes
|
// JobHandler handles job-related routes
|
||||||
type JobHandler struct {
|
type JobHandler struct {
|
||||||
client HeroJobsClientInterface
|
client RedisClientInterface // Changed to use the interface
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewJobHandler creates a new JobHandler
|
// NewJobHandler creates a new JobHandler
|
||||||
func NewJobHandler(socketPath string, logger *log.Logger) (*JobHandler, error) {
|
func NewJobHandler(redisAddr string, logger *log.Logger) (*JobHandler, error) {
|
||||||
client, err := herojobs.NewClient(socketPath)
|
redisClient, err := herojobs.NewRedisClient(redisAddr, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create HeroJobs client: %w", err)
|
return nil, fmt.Errorf("failed to create HeroJobs Redis client: %w", err)
|
||||||
}
|
}
|
||||||
|
// *herojobs.RedisClient must implement RedisClientInterface.
|
||||||
|
// This assignment is valid if *herojobs.RedisClient has all methods of RedisClientInterface.
|
||||||
return &JobHandler{
|
return &JobHandler{
|
||||||
client: client,
|
client: redisClient,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@ -76,14 +76,6 @@ func (h *JobHandler) RegisterRoutes(app *fiber.App) {
|
|||||||
// @Router /api/jobs/submit [post]
|
// @Router /api/jobs/submit [post]
|
||||||
// @Router /admin/jobs/submit [post]
|
// @Router /admin/jobs/submit [post]
|
||||||
func (h *JobHandler) submitJob(c *fiber.Ctx) error {
|
func (h *JobHandler) submitJob(c *fiber.Ctx) error {
|
||||||
// Connect to the HeroJobs server
|
|
||||||
if err := h.client.Connect(); err != nil {
|
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
||||||
"error": fmt.Sprintf("Failed to connect to HeroJobs server: %v", err),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
defer h.client.Close()
|
|
||||||
|
|
||||||
// Parse job from request body
|
// Parse job from request body
|
||||||
var job herojobs.Job
|
var job herojobs.Job
|
||||||
if err := c.BodyParser(&job); err != nil {
|
if err := c.BodyParser(&job); err != nil {
|
||||||
@ -92,15 +84,32 @@ func (h *JobHandler) submitJob(c *fiber.Ctx) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Submit job
|
// Save job to OurDB (this assigns/confirms JobID)
|
||||||
submittedJob, err := h.client.SubmitJob(&job)
|
if err := job.Save(); err != nil {
|
||||||
if err != nil {
|
h.logger.Printf("Failed to save job to OurDB: %v", err)
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
||||||
"error": fmt.Sprintf("Failed to submit job: %v", err),
|
"error": fmt.Sprintf("Failed to save job: %v", err),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.JSON(submittedJob)
|
// Store job in Redis
|
||||||
|
if err := h.client.StoreJob(&job); err != nil {
|
||||||
|
h.logger.Printf("Failed to store job in Redis: %v", err)
|
||||||
|
// Attempt to roll back or log, but proceed to enqueue if critical
|
||||||
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
||||||
|
"error": fmt.Sprintf("Failed to store job in Redis: %v", err),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enqueue job in Redis
|
||||||
|
if err := h.client.EnqueueJob(&job); err != nil {
|
||||||
|
h.logger.Printf("Failed to enqueue job in Redis: %v", err)
|
||||||
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
||||||
|
"error": fmt.Sprintf("Failed to enqueue job: %v", err),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.JSON(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Summary Get a job
|
// @Summary Get a job
|
||||||
@ -114,28 +123,36 @@ func (h *JobHandler) submitJob(c *fiber.Ctx) error {
|
|||||||
// @Router /api/jobs/get/{id} [get]
|
// @Router /api/jobs/get/{id} [get]
|
||||||
// @Router /admin/jobs/get/{id} [get]
|
// @Router /admin/jobs/get/{id} [get]
|
||||||
func (h *JobHandler) getJob(c *fiber.Ctx) error {
|
func (h *JobHandler) getJob(c *fiber.Ctx) error {
|
||||||
// Connect to the HeroJobs server
|
|
||||||
if err := h.client.Connect(); err != nil {
|
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
||||||
"error": fmt.Sprintf("Failed to connect to HeroJobs server: %v", err),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
defer h.client.Close()
|
|
||||||
|
|
||||||
// Get job ID from path parameter
|
// Get job ID from path parameter
|
||||||
jobID := c.Params("id")
|
jobIDStr := c.Params("id")
|
||||||
if jobID == "" {
|
if jobIDStr == "" {
|
||||||
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
||||||
"error": "Job ID is required",
|
"error": "Job ID is required",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get job
|
// Convert jobID string to uint32
|
||||||
|
jobID64, err := strconv.ParseUint(jobIDStr, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
||||||
|
"error": fmt.Sprintf("Invalid Job ID format: %s. %v", jobIDStr, err),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
jobID := uint32(jobID64)
|
||||||
|
|
||||||
|
// Get job from Redis first
|
||||||
job, err := h.client.GetJob(jobID)
|
job, err := h.client.GetJob(jobID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
// If not found in Redis (e.g. redis.Nil or other error), try OurDB
|
||||||
"error": fmt.Sprintf("Failed to get job: %v", err),
|
h.logger.Printf("Job %d not found in Redis or error: %v. Trying OurDB.", jobID, err)
|
||||||
})
|
retrievedJob := &herojobs.Job{JobID: jobID}
|
||||||
|
if loadErr := retrievedJob.Load(); loadErr != nil {
|
||||||
|
h.logger.Printf("Failed to load job %d from OurDB: %v", jobID, loadErr)
|
||||||
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
||||||
|
"error": fmt.Sprintf("Failed to get job %d: %v / %v", jobID, err, loadErr),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
job = retrievedJob // Use the job loaded from OurDB
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.JSON(job)
|
return c.JSON(job)
|
||||||
@ -152,32 +169,22 @@ func (h *JobHandler) getJob(c *fiber.Ctx) error {
|
|||||||
// @Router /api/jobs/delete/{id} [delete]
|
// @Router /api/jobs/delete/{id} [delete]
|
||||||
// @Router /admin/jobs/delete/{id} [delete]
|
// @Router /admin/jobs/delete/{id} [delete]
|
||||||
func (h *JobHandler) deleteJob(c *fiber.Ctx) error {
|
func (h *JobHandler) deleteJob(c *fiber.Ctx) error {
|
||||||
// Connect to the HeroJobs server
|
|
||||||
if err := h.client.Connect(); err != nil {
|
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
||||||
"error": fmt.Sprintf("Failed to connect to HeroJobs server: %v", err),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
defer h.client.Close()
|
|
||||||
|
|
||||||
// Get job ID from path parameter
|
// Get job ID from path parameter
|
||||||
jobID := c.Params("id")
|
jobIDStr := c.Params("id")
|
||||||
if jobID == "" {
|
if jobIDStr == "" {
|
||||||
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
||||||
"error": "Job ID is required",
|
"error": "Job ID is required",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete job
|
// Deleting jobs requires removing from OurDB and Redis.
|
||||||
if err := h.client.DeleteJob(jobID); err != nil {
|
// This functionality is not directly provided by RedisClient.DeleteJob
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
// and OurDB job deletion is not specified in README.
|
||||||
"error": fmt.Sprintf("Failed to delete job: %v", err),
|
// For now, returning not implemented.
|
||||||
})
|
h.logger.Printf("Attempt to delete job %s - not implemented", jobIDStr)
|
||||||
}
|
return c.Status(fiber.StatusNotImplemented).JSON(fiber.Map{
|
||||||
|
"error": "Job deletion is not implemented",
|
||||||
return c.JSON(fiber.Map{
|
"message": fmt.Sprintf("Job %s deletion requested but not implemented.", jobIDStr),
|
||||||
"status": "success",
|
|
||||||
"message": fmt.Sprintf("Job %s deleted successfully", jobID),
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -193,14 +200,6 @@ func (h *JobHandler) deleteJob(c *fiber.Ctx) error {
|
|||||||
// @Router /api/jobs/list [get]
|
// @Router /api/jobs/list [get]
|
||||||
// @Router /admin/jobs/list [get]
|
// @Router /admin/jobs/list [get]
|
||||||
func (h *JobHandler) listJobs(c *fiber.Ctx) error {
|
func (h *JobHandler) listJobs(c *fiber.Ctx) error {
|
||||||
// Connect to the HeroJobs server
|
|
||||||
if err := h.client.Connect(); err != nil {
|
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
||||||
"error": fmt.Sprintf("Failed to connect to HeroJobs server: %v", err),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
defer h.client.Close()
|
|
||||||
|
|
||||||
// Get parameters from query
|
// Get parameters from query
|
||||||
circleID := c.Query("circleid")
|
circleID := c.Query("circleid")
|
||||||
if circleID == "" {
|
if circleID == "" {
|
||||||
@ -242,14 +241,6 @@ func (h *JobHandler) listJobs(c *fiber.Ctx) error {
|
|||||||
// @Router /api/jobs/queue/size [get]
|
// @Router /api/jobs/queue/size [get]
|
||||||
// @Router /admin/jobs/queue/size [get]
|
// @Router /admin/jobs/queue/size [get]
|
||||||
func (h *JobHandler) queueSize(c *fiber.Ctx) error {
|
func (h *JobHandler) queueSize(c *fiber.Ctx) error {
|
||||||
// Connect to the HeroJobs server
|
|
||||||
if err := h.client.Connect(); err != nil {
|
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
||||||
"error": fmt.Sprintf("Failed to connect to HeroJobs server: %v", err),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
defer h.client.Close()
|
|
||||||
|
|
||||||
// Get parameters from query
|
// Get parameters from query
|
||||||
circleID := c.Query("circleid")
|
circleID := c.Query("circleid")
|
||||||
if circleID == "" {
|
if circleID == "" {
|
||||||
@ -291,14 +282,6 @@ func (h *JobHandler) queueSize(c *fiber.Ctx) error {
|
|||||||
// @Router /api/jobs/queue/empty [post]
|
// @Router /api/jobs/queue/empty [post]
|
||||||
// @Router /admin/jobs/queue/empty [post]
|
// @Router /admin/jobs/queue/empty [post]
|
||||||
func (h *JobHandler) queueEmpty(c *fiber.Ctx) error {
|
func (h *JobHandler) queueEmpty(c *fiber.Ctx) error {
|
||||||
// Connect to the HeroJobs server
|
|
||||||
if err := h.client.Connect(); err != nil {
|
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
||||||
"error": fmt.Sprintf("Failed to connect to HeroJobs server: %v", err),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
defer h.client.Close()
|
|
||||||
|
|
||||||
// Parse parameters from request body
|
// Parse parameters from request body
|
||||||
var params struct {
|
var params struct {
|
||||||
CircleID string `json:"circleid"`
|
CircleID string `json:"circleid"`
|
||||||
@ -347,14 +330,6 @@ func (h *JobHandler) queueEmpty(c *fiber.Ctx) error {
|
|||||||
// @Router /api/jobs/queue/get [get]
|
// @Router /api/jobs/queue/get [get]
|
||||||
// @Router /admin/jobs/queue/get [get]
|
// @Router /admin/jobs/queue/get [get]
|
||||||
func (h *JobHandler) queueGet(c *fiber.Ctx) error {
|
func (h *JobHandler) queueGet(c *fiber.Ctx) error {
|
||||||
// Connect to the HeroJobs server
|
|
||||||
if err := h.client.Connect(); err != nil {
|
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
||||||
"error": fmt.Sprintf("Failed to connect to HeroJobs server: %v", err),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
defer h.client.Close()
|
|
||||||
|
|
||||||
// Get parameters from query
|
// Get parameters from query
|
||||||
circleID := c.Query("circleid")
|
circleID := c.Query("circleid")
|
||||||
if circleID == "" {
|
if circleID == "" {
|
||||||
@ -370,14 +345,40 @@ func (h *JobHandler) queueGet(c *fiber.Ctx) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get job from queue
|
// Get list of job IDs (uint32) from the queue (non-destructive)
|
||||||
job, err := h.client.QueueGet(circleID, topic)
|
jobIDs, err := h.client.ListJobs(circleID, topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
||||||
"error": fmt.Sprintf("Failed to get job from queue: %v", err),
|
"error": fmt.Sprintf("Failed to list jobs in queue: %v", err),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(jobIDs) == 0 {
|
||||||
|
return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
|
||||||
|
"error": "Queue is empty or no jobs found",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Take the first job ID from the list (it's already uint32)
|
||||||
|
jobIDToFetch := jobIDs[0]
|
||||||
|
|
||||||
|
// Get the actual job details using the ID
|
||||||
|
job, err := h.client.GetJob(jobIDToFetch)
|
||||||
|
if err != nil {
|
||||||
|
// If not found in Redis (e.g. redis.Nil or other error), try OurDB
|
||||||
|
h.logger.Printf("Job %d (from queue list) not found in Redis or error: %v. Trying OurDB.", jobIDToFetch, err)
|
||||||
|
retrievedJob := &herojobs.Job{JobID: jobIDToFetch} // Ensure CircleID and Topic are set if Load needs them
|
||||||
|
retrievedJob.CircleID = circleID // Needed for Load if path depends on it
|
||||||
|
retrievedJob.Topic = topic // Needed for Load if path depends on it
|
||||||
|
if loadErr := retrievedJob.Load(); loadErr != nil {
|
||||||
|
h.logger.Printf("Failed to load job %d from OurDB: %v", jobIDToFetch, loadErr)
|
||||||
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
||||||
|
"error": fmt.Sprintf("Failed to get job %d from queue (Redis err: %v / OurDB err: %v)", jobIDToFetch, err, loadErr),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
job = retrievedJob // Use the job loaded from OurDB
|
||||||
|
}
|
||||||
|
|
||||||
return c.JSON(job)
|
return c.JSON(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -393,51 +394,92 @@ func (h *JobHandler) queueGet(c *fiber.Ctx) error {
|
|||||||
// @Router /api/jobs/create [post]
|
// @Router /api/jobs/create [post]
|
||||||
// @Router /admin/jobs/create [post]
|
// @Router /admin/jobs/create [post]
|
||||||
func (h *JobHandler) createJob(c *fiber.Ctx) error {
|
func (h *JobHandler) createJob(c *fiber.Ctx) error {
|
||||||
// Connect to the HeroJobs server
|
|
||||||
if err := h.client.Connect(); err != nil {
|
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
|
||||||
"error": fmt.Sprintf("Failed to connect to HeroJobs server: %v", err),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
defer h.client.Close()
|
|
||||||
|
|
||||||
// Parse parameters from request body
|
// Parse parameters from request body
|
||||||
var params struct {
|
var reqBody struct {
|
||||||
CircleID string `json:"circleid"`
|
CircleID string `json:"circleid"`
|
||||||
Topic string `json:"topic"`
|
Topic string `json:"topic"`
|
||||||
SessionKey string `json:"sessionkey"`
|
SessionKey string `json:"sessionkey"`
|
||||||
HeroScript string `json:"heroscript"`
|
Params string `json:"params"`
|
||||||
RhaiScript string `json:"rhaiscript"`
|
ParamsType string `json:"paramstype"`
|
||||||
|
Timeout int64 `json:"timeout"` // Optional: allow timeout override
|
||||||
|
Log bool `json:"log"` // Optional: allow log enabling
|
||||||
}
|
}
|
||||||
if err := c.BodyParser(¶ms); err != nil {
|
if err := c.BodyParser(&reqBody); err != nil {
|
||||||
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
||||||
"error": fmt.Sprintf("Failed to parse parameters: %v", err),
|
"error": fmt.Sprintf("Failed to parse parameters: %v", err),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if params.CircleID == "" {
|
if reqBody.CircleID == "" {
|
||||||
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
||||||
"error": "Circle ID is required",
|
"error": "Circle ID is required",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
if reqBody.Topic == "" {
|
||||||
if params.Topic == "" {
|
|
||||||
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
||||||
"error": "Topic is required",
|
"error": "Topic is required",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
if reqBody.Params == "" {
|
||||||
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
||||||
|
"error": "Params are required",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if reqBody.ParamsType == "" {
|
||||||
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
||||||
|
"error": "ParamsType is required",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Create job
|
// Create a new job instance
|
||||||
job, err := h.client.CreateJob(
|
job := herojobs.NewJob() // Initializes with defaults
|
||||||
params.CircleID,
|
job.CircleID = reqBody.CircleID
|
||||||
params.Topic,
|
job.Topic = reqBody.Topic
|
||||||
params.SessionKey,
|
job.SessionKey = reqBody.SessionKey
|
||||||
params.HeroScript,
|
job.Params = reqBody.Params
|
||||||
params.RhaiScript,
|
|
||||||
)
|
// Convert ParamsType string to herojobs.ParamsType
|
||||||
if err != nil {
|
switch herojobs.ParamsType(reqBody.ParamsType) {
|
||||||
|
case herojobs.ParamsTypeHeroScript:
|
||||||
|
job.ParamsType = herojobs.ParamsTypeHeroScript
|
||||||
|
case herojobs.ParamsTypeRhaiScript:
|
||||||
|
job.ParamsType = herojobs.ParamsTypeRhaiScript
|
||||||
|
case herojobs.ParamsTypeOpenRPC:
|
||||||
|
job.ParamsType = herojobs.ParamsTypeOpenRPC
|
||||||
|
case herojobs.ParamsTypeAI:
|
||||||
|
job.ParamsType = herojobs.ParamsTypeAI
|
||||||
|
default:
|
||||||
|
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
|
||||||
|
"error": fmt.Sprintf("Invalid ParamsType: %s", reqBody.ParamsType),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if reqBody.Timeout > 0 {
|
||||||
|
job.Timeout = reqBody.Timeout
|
||||||
|
}
|
||||||
|
job.Log = reqBody.Log
|
||||||
|
|
||||||
|
// Save job to OurDB (this assigns JobID)
|
||||||
|
if err := job.Save(); err != nil {
|
||||||
|
h.logger.Printf("Failed to save new job to OurDB: %v", err)
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
||||||
"error": fmt.Sprintf("Failed to create job: %v", err),
|
"error": fmt.Sprintf("Failed to save new job: %v", err),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store job in Redis
|
||||||
|
if err := h.client.StoreJob(job); err != nil {
|
||||||
|
h.logger.Printf("Failed to store new job in Redis: %v", err)
|
||||||
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
||||||
|
"error": fmt.Sprintf("Failed to store new job in Redis: %v", err),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enqueue job in Redis
|
||||||
|
if err := h.client.EnqueueJob(job); err != nil {
|
||||||
|
h.logger.Printf("Failed to enqueue new job in Redis: %v", err)
|
||||||
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
|
||||||
|
"error": fmt.Sprintf("Failed to enqueue new job: %v", err),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,34 +16,25 @@ import (
|
|||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MockHeroJobsClient is a mock implementation of the HeroJobs client
|
// MockRedisClient is a mock implementation of the RedisClientInterface
|
||||||
type MockHeroJobsClient struct {
|
type MockRedisClient struct {
|
||||||
mock.Mock
|
mock.Mock
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect mocks the Connect method
|
// StoreJob mocks the StoreJob method
|
||||||
func (m *MockHeroJobsClient) Connect() error {
|
func (m *MockRedisClient) StoreJob(job *herojobs.Job) error {
|
||||||
args := m.Called()
|
|
||||||
return args.Error(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close mocks the Close method
|
|
||||||
func (m *MockHeroJobsClient) Close() error {
|
|
||||||
args := m.Called()
|
|
||||||
return args.Error(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SubmitJob mocks the SubmitJob method
|
|
||||||
func (m *MockHeroJobsClient) SubmitJob(job *herojobs.Job) (*herojobs.Job, error) {
|
|
||||||
args := m.Called(job)
|
args := m.Called(job)
|
||||||
if args.Get(0) == nil {
|
return args.Error(0)
|
||||||
return nil, args.Error(1)
|
}
|
||||||
}
|
|
||||||
return args.Get(0).(*herojobs.Job), args.Error(1)
|
// EnqueueJob mocks the EnqueueJob method
|
||||||
|
func (m *MockRedisClient) EnqueueJob(job *herojobs.Job) error {
|
||||||
|
args := m.Called(job)
|
||||||
|
return args.Error(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetJob mocks the GetJob method
|
// GetJob mocks the GetJob method
|
||||||
func (m *MockHeroJobsClient) GetJob(jobID string) (*herojobs.Job, error) {
|
func (m *MockRedisClient) GetJob(jobID interface{}) (*herojobs.Job, error) { // jobID is interface{}
|
||||||
args := m.Called(jobID)
|
args := m.Called(jobID)
|
||||||
if args.Get(0) == nil {
|
if args.Get(0) == nil {
|
||||||
return nil, args.Error(1)
|
return nil, args.Error(1)
|
||||||
@ -51,71 +42,54 @@ func (m *MockHeroJobsClient) GetJob(jobID string) (*herojobs.Job, error) {
|
|||||||
return args.Get(0).(*herojobs.Job), args.Error(1)
|
return args.Get(0).(*herojobs.Job), args.Error(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteJob mocks the DeleteJob method
|
|
||||||
func (m *MockHeroJobsClient) DeleteJob(jobID string) error {
|
|
||||||
args := m.Called(jobID)
|
|
||||||
return args.Error(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListJobs mocks the ListJobs method
|
// ListJobs mocks the ListJobs method
|
||||||
func (m *MockHeroJobsClient) ListJobs(circleID, topic string) ([]string, error) {
|
func (m *MockRedisClient) ListJobs(circleID, topic string) ([]uint32, error) { // Returns []uint32
|
||||||
args := m.Called(circleID, topic)
|
args := m.Called(circleID, topic)
|
||||||
if args.Get(0) == nil {
|
if args.Get(0) == nil {
|
||||||
return nil, args.Error(1)
|
return nil, args.Error(1)
|
||||||
}
|
}
|
||||||
return args.Get(0).([]string), args.Error(1)
|
return args.Get(0).([]uint32), args.Error(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueueSize mocks the QueueSize method
|
// QueueSize mocks the QueueSize method
|
||||||
func (m *MockHeroJobsClient) QueueSize(circleID, topic string) (int64, error) {
|
func (m *MockRedisClient) QueueSize(circleID, topic string) (int64, error) {
|
||||||
args := m.Called(circleID, topic)
|
args := m.Called(circleID, topic)
|
||||||
|
// Ensure Get(0) is not nil before type assertion if it can be nil in some error cases
|
||||||
|
if args.Get(0) == nil && args.Error(1) != nil { // If error is set, result might be nil
|
||||||
|
return 0, args.Error(1)
|
||||||
|
}
|
||||||
return args.Get(0).(int64), args.Error(1)
|
return args.Get(0).(int64), args.Error(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueueEmpty mocks the QueueEmpty method
|
// QueueEmpty mocks the QueueEmpty method
|
||||||
func (m *MockHeroJobsClient) QueueEmpty(circleID, topic string) error {
|
func (m *MockRedisClient) QueueEmpty(circleID, topic string) error {
|
||||||
args := m.Called(circleID, topic)
|
args := m.Called(circleID, topic)
|
||||||
return args.Error(0)
|
return args.Error(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueueGet mocks the QueueGet method
|
|
||||||
func (m *MockHeroJobsClient) QueueGet(circleID, topic string) (*herojobs.Job, error) {
|
|
||||||
args := m.Called(circleID, topic)
|
|
||||||
if args.Get(0) == nil {
|
|
||||||
return nil, args.Error(1)
|
|
||||||
}
|
|
||||||
return args.Get(0).(*herojobs.Job), args.Error(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateJob mocks the CreateJob method
|
|
||||||
func (m *MockHeroJobsClient) CreateJob(circleID, topic, sessionKey, heroScript, rhaiScript string) (*herojobs.Job, error) {
|
|
||||||
args := m.Called(circleID, topic, sessionKey, heroScript, rhaiScript)
|
|
||||||
if args.Get(0) == nil {
|
|
||||||
return nil, args.Error(1)
|
|
||||||
}
|
|
||||||
return args.Get(0).(*herojobs.Job), args.Error(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// setupTest initializes a test environment with a mock client
|
// setupTest initializes a test environment with a mock client
|
||||||
func setupTest() (*JobHandler, *MockHeroJobsClient, *fiber.App) {
|
func setupTest() (*JobHandler, *MockRedisClient, *fiber.App) {
|
||||||
mockClient := new(MockHeroJobsClient)
|
mockClient := new(MockRedisClient)
|
||||||
handler := &JobHandler{
|
handler := &JobHandler{
|
||||||
client: mockClient,
|
client: mockClient, // Assign the mock that implements RedisClientInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
app := fiber.New()
|
app := fiber.New()
|
||||||
|
|
||||||
// Register routes
|
// Register routes (ensure these match the actual routes in job_handlers.go)
|
||||||
api := app.Group("/api")
|
apiJobs := app.Group("/api/jobs") // Assuming routes are under /api/jobs
|
||||||
jobs := api.Group("/jobs")
|
apiJobs.Post("/submit", handler.submitJob)
|
||||||
jobs.Post("/create", handler.createJob)
|
apiJobs.Get("/get/:id", handler.getJob) // :id as per job_handlers.go
|
||||||
jobs.Get("/queue/get", handler.queueGet)
|
apiJobs.Delete("/delete/:id", handler.deleteJob) // :id as per job_handlers.go
|
||||||
jobs.Post("/queue/empty", handler.queueEmpty)
|
apiJobs.Get("/list", handler.listJobs)
|
||||||
jobs.Post("/submit", handler.submitJob)
|
apiJobs.Get("/queue/size", handler.queueSize)
|
||||||
jobs.Get("/get/:jobid", handler.getJob)
|
apiJobs.Post("/queue/empty", handler.queueEmpty)
|
||||||
jobs.Delete("/delete/:jobid", handler.deleteJob)
|
apiJobs.Get("/queue/get", handler.queueGet)
|
||||||
jobs.Get("/list", handler.listJobs)
|
apiJobs.Post("/create", handler.createJob)
|
||||||
jobs.Get("/queue/size", handler.queueSize)
|
|
||||||
|
// If admin routes are also tested, they need to be registered here too
|
||||||
|
// adminJobs := app.Group("/admin/jobs")
|
||||||
|
// jobRoutes(adminJobs) // if using the same handler instance
|
||||||
|
|
||||||
return handler, mockClient, app
|
return handler, mockClient, app
|
||||||
}
|
}
|
||||||
@ -134,7 +108,6 @@ func TestQueueEmpty(t *testing.T) {
|
|||||||
name string
|
name string
|
||||||
circleID string
|
circleID string
|
||||||
topic string
|
topic string
|
||||||
connectError error
|
|
||||||
emptyError error
|
emptyError error
|
||||||
expectedStatus int
|
expectedStatus int
|
||||||
expectedBody string
|
expectedBody string
|
||||||
@ -143,25 +116,15 @@ func TestQueueEmpty(t *testing.T) {
|
|||||||
name: "Success",
|
name: "Success",
|
||||||
circleID: "test-circle",
|
circleID: "test-circle",
|
||||||
topic: "test-topic",
|
topic: "test-topic",
|
||||||
connectError: nil,
|
|
||||||
emptyError: nil,
|
emptyError: nil,
|
||||||
expectedStatus: fiber.StatusOK,
|
expectedStatus: fiber.StatusOK,
|
||||||
expectedBody: `{"status":"success","message":"Queue for circle test-circle and topic test-topic emptied successfully"}`,
|
expectedBody: `{"status":"success","message":"Queue for circle test-circle and topic test-topic emptied successfully"}`,
|
||||||
},
|
},
|
||||||
{
|
// Removed "Connection Error" test case as Connect is no longer directly called per op
|
||||||
name: "Connection Error",
|
|
||||||
circleID: "test-circle",
|
|
||||||
topic: "test-topic",
|
|
||||||
connectError: errors.New("connection error"),
|
|
||||||
emptyError: nil,
|
|
||||||
expectedStatus: fiber.StatusInternalServerError,
|
|
||||||
expectedBody: `{"error":"Failed to connect to HeroJobs server: connection error"}`,
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: "Empty Error",
|
name: "Empty Error",
|
||||||
circleID: "test-circle",
|
circleID: "test-circle",
|
||||||
topic: "test-topic",
|
topic: "test-topic",
|
||||||
connectError: nil,
|
|
||||||
emptyError: errors.New("empty error"),
|
emptyError: errors.New("empty error"),
|
||||||
expectedStatus: fiber.StatusInternalServerError,
|
expectedStatus: fiber.StatusInternalServerError,
|
||||||
expectedBody: `{"error":"Failed to empty queue: empty error"}`,
|
expectedBody: `{"error":"Failed to empty queue: empty error"}`,
|
||||||
@ -170,7 +133,6 @@ func TestQueueEmpty(t *testing.T) {
|
|||||||
name: "Empty Circle ID",
|
name: "Empty Circle ID",
|
||||||
circleID: "",
|
circleID: "",
|
||||||
topic: "test-topic",
|
topic: "test-topic",
|
||||||
connectError: nil,
|
|
||||||
emptyError: nil,
|
emptyError: nil,
|
||||||
expectedStatus: fiber.StatusBadRequest,
|
expectedStatus: fiber.StatusBadRequest,
|
||||||
expectedBody: `{"error":"Circle ID is required"}`,
|
expectedBody: `{"error":"Circle ID is required"}`,
|
||||||
@ -179,41 +141,22 @@ func TestQueueEmpty(t *testing.T) {
|
|||||||
name: "Empty Topic",
|
name: "Empty Topic",
|
||||||
circleID: "test-circle",
|
circleID: "test-circle",
|
||||||
topic: "",
|
topic: "",
|
||||||
connectError: nil,
|
|
||||||
emptyError: nil,
|
emptyError: nil,
|
||||||
expectedStatus: fiber.StatusBadRequest,
|
expectedStatus: fiber.StatusBadRequest,
|
||||||
expectedBody: `{"error":"Topic is required"}`,
|
expectedBody: `{"error":"Topic is required"}`,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
// Create a new mock client for each test
|
// Create a new mock client for each test and setup app
|
||||||
mockClient := new(MockHeroJobsClient)
|
_, mockClient, app := setupTest() // Use setupTest to get handler with mock
|
||||||
|
|
||||||
// Setup mock expectations - Connect is always called in the handler
|
// Setup mock expectations
|
||||||
mockClient.On("Connect").Return(tc.connectError)
|
if tc.circleID != "" && tc.topic != "" { // Only expect call if params are valid
|
||||||
|
|
||||||
// QueueEmpty and Close are only called if Connect succeeds and parameters are valid
|
|
||||||
if tc.connectError == nil && tc.circleID != "" && tc.topic != "" {
|
|
||||||
mockClient.On("QueueEmpty", tc.circleID, tc.topic).Return(tc.emptyError)
|
mockClient.On("QueueEmpty", tc.circleID, tc.topic).Return(tc.emptyError)
|
||||||
mockClient.On("Close").Return(nil)
|
|
||||||
} else {
|
|
||||||
// Close is still called via defer even if we return early
|
|
||||||
mockClient.On("Close").Return(nil).Maybe()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new handler with the mock client
|
|
||||||
handler := &JobHandler{
|
|
||||||
client: mockClient,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new app for each test
|
|
||||||
app := fiber.New()
|
|
||||||
api := app.Group("/api")
|
|
||||||
jobs := api.Group("/jobs")
|
|
||||||
jobs.Post("/queue/empty", handler.queueEmpty)
|
|
||||||
|
|
||||||
// Create request body
|
// Create request body
|
||||||
reqBody := map[string]string{
|
reqBody := map[string]string{
|
||||||
"circleid": tc.circleID,
|
"circleid": tc.circleID,
|
||||||
@ -221,24 +164,24 @@ func TestQueueEmpty(t *testing.T) {
|
|||||||
}
|
}
|
||||||
reqBodyBytes, err := json.Marshal(reqBody)
|
reqBodyBytes, err := json.Marshal(reqBody)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Create test request
|
// Create test request
|
||||||
req, err := createTestRequest(http.MethodPost, "/api/jobs/queue/empty", bytes.NewReader(reqBodyBytes))
|
req, err := createTestRequest(http.MethodPost, "/api/jobs/queue/empty", bytes.NewReader(reqBodyBytes))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
// Perform the request
|
// Perform the request
|
||||||
resp, err := app.Test(req)
|
resp, err := app.Test(req)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Check status code
|
// Check status code
|
||||||
assert.Equal(t, tc.expectedStatus, resp.StatusCode)
|
assert.Equal(t, tc.expectedStatus, resp.StatusCode)
|
||||||
|
|
||||||
// Check response body
|
// Check response body
|
||||||
body, err := io.ReadAll(resp.Body)
|
body, err := io.ReadAll(resp.Body)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.JSONEq(t, tc.expectedBody, string(body))
|
assert.JSONEq(t, tc.expectedBody, string(body))
|
||||||
|
|
||||||
// Verify that all expectations were met
|
// Verify that all expectations were met
|
||||||
mockClient.AssertExpectations(t)
|
mockClient.AssertExpectations(t)
|
||||||
})
|
})
|
||||||
@ -248,61 +191,80 @@ func TestQueueEmpty(t *testing.T) {
|
|||||||
// TestQueueGet tests the queueGet handler
|
// TestQueueGet tests the queueGet handler
|
||||||
func TestQueueGet(t *testing.T) {
|
func TestQueueGet(t *testing.T) {
|
||||||
// Create a test job
|
// Create a test job
|
||||||
testJob := &herojobs.Job{
|
testJob := herojobs.NewJob()
|
||||||
JobID: "test-job-id",
|
testJob.JobID = 10 // This will be a number in JSON
|
||||||
CircleID: "test-circle",
|
testJob.CircleID = "test-circle"
|
||||||
Topic: "test-topic",
|
testJob.Topic = "test-topic"
|
||||||
}
|
testJob.Params = "some script"
|
||||||
|
testJob.ParamsType = herojobs.ParamsTypeHeroScript
|
||||||
|
testJob.Status = herojobs.JobStatusNew
|
||||||
|
|
||||||
// Test cases
|
// Test cases
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
circleID string
|
circleID string
|
||||||
topic string
|
topic string
|
||||||
connectError error
|
listJobsError error
|
||||||
getError error
|
listJobsResp []uint32
|
||||||
getResponse *herojobs.Job
|
getJobError error
|
||||||
|
getJobResp *herojobs.Job
|
||||||
expectedStatus int
|
expectedStatus int
|
||||||
expectedBody string
|
expectedBody string // This will need to be updated to match the actual job structure
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "Success",
|
name: "Success",
|
||||||
circleID: "test-circle",
|
circleID: "test-circle",
|
||||||
topic: "test-topic",
|
topic: "test-topic",
|
||||||
connectError: nil,
|
listJobsError: nil,
|
||||||
getError: nil,
|
listJobsResp: []uint32{10},
|
||||||
getResponse: testJob,
|
getJobError: nil,
|
||||||
|
getJobResp: testJob,
|
||||||
expectedStatus: fiber.StatusOK,
|
expectedStatus: fiber.StatusOK,
|
||||||
// Include all fields in the response, even empty ones
|
expectedBody: `{"jobid":10,"circleid":"test-circle","topic":"test-topic","params":"some script","paramstype":"HeroScript","status":"new","sessionkey":"","result":"","error":"","timeout":60,"log":false,"timescheduled":0,"timestart":0,"timeend":0}`,
|
||||||
expectedBody: `{"jobid":"test-job-id","circleid":"test-circle","topic":"test-topic","error":"","heroscript":"","result":"","rhaiscript":"","sessionkey":"","status":"","time_end":0,"time_scheduled":0,"time_start":0,"timeout":0}`,
|
|
||||||
},
|
},
|
||||||
|
// Removed "Connection Error"
|
||||||
{
|
{
|
||||||
name: "Connection Error",
|
name: "ListJobs Error",
|
||||||
circleID: "test-circle",
|
circleID: "test-circle",
|
||||||
topic: "test-topic",
|
topic: "test-topic",
|
||||||
connectError: errors.New("connection error"),
|
listJobsError: errors.New("list error"),
|
||||||
getError: nil,
|
listJobsResp: nil,
|
||||||
getResponse: nil,
|
getJobError: nil, // Not reached
|
||||||
|
getJobResp: nil, // Not reached
|
||||||
expectedStatus: fiber.StatusInternalServerError,
|
expectedStatus: fiber.StatusInternalServerError,
|
||||||
expectedBody: `{"error":"Failed to connect to HeroJobs server: connection error"}`,
|
expectedBody: `{"error":"Failed to list jobs in queue: list error"}`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Get Error",
|
name: "GetJob Error after ListJobs success",
|
||||||
circleID: "test-circle",
|
circleID: "test-circle",
|
||||||
topic: "test-topic",
|
topic: "test-topic",
|
||||||
connectError: nil,
|
listJobsError: nil,
|
||||||
getError: errors.New("get error"),
|
listJobsResp: []uint32{10},
|
||||||
getResponse: nil,
|
getJobError: errors.New("get error"),
|
||||||
expectedStatus: fiber.StatusInternalServerError,
|
getJobResp: nil,
|
||||||
expectedBody: `{"error":"Failed to get job from queue: get error"}`,
|
expectedStatus: fiber.StatusInternalServerError, // Or based on how GetJob error is handled (e.g. fallback to OurDB)
|
||||||
|
// The error message might be more complex if OurDB load is also attempted and fails
|
||||||
|
expectedBody: `{"error":"Failed to get job 10 from queue (Redis err: get error / OurDB err: record not found)"}`, // Adjusted expected error
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Queue Empty (ListJobs returns empty)",
|
||||||
|
circleID: "test-circle",
|
||||||
|
topic: "test-topic",
|
||||||
|
listJobsError: nil,
|
||||||
|
listJobsResp: []uint32{}, // Empty list
|
||||||
|
getJobError: nil,
|
||||||
|
getJobResp: nil,
|
||||||
|
expectedStatus: fiber.StatusNotFound,
|
||||||
|
expectedBody: `{"error":"Queue is empty or no jobs found"}`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Empty Circle ID",
|
name: "Empty Circle ID",
|
||||||
circleID: "",
|
circleID: "",
|
||||||
topic: "test-topic",
|
topic: "test-topic",
|
||||||
connectError: nil,
|
listJobsError: nil,
|
||||||
getError: nil,
|
listJobsResp: nil,
|
||||||
getResponse: nil,
|
getJobError: nil,
|
||||||
|
getJobResp: nil,
|
||||||
expectedStatus: fiber.StatusBadRequest,
|
expectedStatus: fiber.StatusBadRequest,
|
||||||
expectedBody: `{"error":"Circle ID is required"}`,
|
expectedBody: `{"error":"Circle ID is required"}`,
|
||||||
},
|
},
|
||||||
@ -310,59 +272,50 @@ func TestQueueGet(t *testing.T) {
|
|||||||
name: "Empty Topic",
|
name: "Empty Topic",
|
||||||
circleID: "test-circle",
|
circleID: "test-circle",
|
||||||
topic: "",
|
topic: "",
|
||||||
connectError: nil,
|
listJobsError: nil,
|
||||||
getError: nil,
|
listJobsResp: nil,
|
||||||
getResponse: nil,
|
getJobError: nil,
|
||||||
|
getJobResp: nil,
|
||||||
expectedStatus: fiber.StatusBadRequest,
|
expectedStatus: fiber.StatusBadRequest,
|
||||||
expectedBody: `{"error":"Topic is required"}`,
|
expectedBody: `{"error":"Topic is required"}`,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
// Create a new mock client for each test
|
// Create a new mock client for each test and setup app
|
||||||
mockClient := new(MockHeroJobsClient)
|
_, mockClient, app := setupTest()
|
||||||
|
|
||||||
// Setup mock expectations - Connect is always called in the handler
|
// Setup mock expectations
|
||||||
mockClient.On("Connect").Return(tc.connectError)
|
if tc.circleID != "" && tc.topic != "" {
|
||||||
|
mockClient.On("ListJobs", tc.circleID, tc.topic).Return(tc.listJobsResp, tc.listJobsError)
|
||||||
// QueueGet and Close are only called if Connect succeeds and parameters are valid
|
if tc.listJobsError == nil && len(tc.listJobsResp) > 0 {
|
||||||
if tc.connectError == nil && tc.circleID != "" && tc.topic != "" {
|
// Expect GetJob to be called with the first ID from listJobsResp
|
||||||
mockClient.On("QueueGet", tc.circleID, tc.topic).Return(tc.getResponse, tc.getError)
|
// The handler passes uint32 to client.GetJob, which matches interface{}
|
||||||
mockClient.On("Close").Return(nil)
|
mockClient.On("GetJob", tc.listJobsResp[0]).Return(tc.getJobResp, tc.getJobError).Maybe()
|
||||||
} else {
|
// If GetJob from Redis fails, a Load from OurDB is attempted.
|
||||||
// Close is still called via defer even if we return early
|
// We are not mocking job.Load() here as it's on the job object.
|
||||||
mockClient.On("Close").Return(nil).Maybe()
|
// The error message in the test case reflects this potential dual failure.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new handler with the mock client
|
|
||||||
handler := &JobHandler{
|
|
||||||
client: mockClient,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new app for each test
|
|
||||||
app := fiber.New()
|
|
||||||
api := app.Group("/api")
|
|
||||||
jobs := api.Group("/jobs")
|
|
||||||
jobs.Get("/queue/get", handler.queueGet)
|
|
||||||
|
|
||||||
// Create test request
|
// Create test request
|
||||||
path := fmt.Sprintf("/api/jobs/queue/get?circleid=%s&topic=%s", tc.circleID, tc.topic)
|
path := fmt.Sprintf("/api/jobs/queue/get?circleid=%s&topic=%s", tc.circleID, tc.topic)
|
||||||
req, err := createTestRequest(http.MethodGet, path, nil)
|
req, err := createTestRequest(http.MethodGet, path, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Perform the request
|
// Perform the request
|
||||||
resp, err := app.Test(req)
|
resp, err := app.Test(req)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Check status code
|
// Check status code
|
||||||
assert.Equal(t, tc.expectedStatus, resp.StatusCode)
|
assert.Equal(t, tc.expectedStatus, resp.StatusCode)
|
||||||
|
|
||||||
// Check response body
|
// Check response body
|
||||||
body, err := io.ReadAll(resp.Body)
|
body, err := io.ReadAll(resp.Body)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.JSONEq(t, tc.expectedBody, string(body))
|
assert.JSONEq(t, tc.expectedBody, string(body))
|
||||||
|
|
||||||
// Verify that all expectations were met
|
// Verify that all expectations were met
|
||||||
mockClient.AssertExpectations(t)
|
mockClient.AssertExpectations(t)
|
||||||
})
|
})
|
||||||
@ -371,150 +324,149 @@ func TestQueueGet(t *testing.T) {
|
|||||||
|
|
||||||
// TestCreateJob tests the createJob handler
|
// TestCreateJob tests the createJob handler
|
||||||
func TestCreateJob(t *testing.T) {
|
func TestCreateJob(t *testing.T) {
|
||||||
// Create a test job
|
|
||||||
testJob := &herojobs.Job{
|
|
||||||
JobID: "test-job-id",
|
|
||||||
CircleID: "test-circle",
|
|
||||||
Topic: "test-topic",
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test cases
|
// Test cases
|
||||||
|
createdJob := herojobs.NewJob()
|
||||||
|
createdJob.JobID = 10 // Assuming Save will populate this; for mock, we set it
|
||||||
|
createdJob.CircleID = "test-circle"
|
||||||
|
createdJob.Topic = "test-topic"
|
||||||
|
createdJob.SessionKey = "test-key"
|
||||||
|
createdJob.Params = "test-params"
|
||||||
|
createdJob.ParamsType = herojobs.ParamsTypeHeroScript // Match "HeroScript" string
|
||||||
|
createdJob.Status = herojobs.JobStatusNew // Default status after NewJob and Save
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
circleID string
|
reqBody map[string]interface{} // Use map for flexibility
|
||||||
topic string
|
storeError error
|
||||||
sessionKey string
|
enqueueError error
|
||||||
heroScript string
|
|
||||||
rhaiScript string
|
|
||||||
connectError error
|
|
||||||
createError error
|
|
||||||
createResponse *herojobs.Job
|
|
||||||
expectedStatus int
|
expectedStatus int
|
||||||
expectedBody string
|
expectedBody string // Will be the createdJob marshaled
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "Success",
|
name: "Success",
|
||||||
circleID: "test-circle",
|
reqBody: map[string]interface{}{
|
||||||
topic: "test-topic",
|
"circleid": "test-circle",
|
||||||
sessionKey: "test-key",
|
"topic": "test-topic",
|
||||||
heroScript: "test-hero-script",
|
"sessionkey": "test-key",
|
||||||
rhaiScript: "test-rhai-script",
|
"params": "test-params",
|
||||||
connectError: nil,
|
"paramstype": "HeroScript",
|
||||||
createError: nil,
|
"timeout": 30,
|
||||||
createResponse: testJob,
|
"log": true,
|
||||||
|
},
|
||||||
|
storeError: nil,
|
||||||
|
enqueueError: nil,
|
||||||
expectedStatus: fiber.StatusOK,
|
expectedStatus: fiber.StatusOK,
|
||||||
expectedBody: `{"jobid":"test-job-id","circleid":"test-circle","topic":"test-topic","error":"","heroscript":"","result":"","rhaiscript":"","sessionkey":"","status":"","time_end":0,"time_scheduled":0,"time_start":0,"timeout":0}`,
|
// Expected body should match the 'createdJob' structure after Save, Store, Enqueue
|
||||||
|
// JobID is assigned by Save(), which we are not mocking here.
|
||||||
|
// The handler returns the job object.
|
||||||
|
// For the test, we assume Save() works and populates JobID if it were a real DB.
|
||||||
|
// The mock will return the job passed to it.
|
||||||
|
expectedBody: `{"jobid":0,"circleid":"test-circle","topic":"test-topic","params":"test-params","paramstype":"HeroScript","status":"new","sessionkey":"test-key","result":"","error":"","timeout":30,"log":true,"timescheduled":0,"timestart":0,"timeend":0}`,
|
||||||
},
|
},
|
||||||
|
// Removed "Connection Error"
|
||||||
{
|
{
|
||||||
name: "Connection Error",
|
name: "StoreJob Error",
|
||||||
circleID: "test-circle",
|
reqBody: map[string]interface{}{
|
||||||
topic: "test-topic",
|
"circleid": "test-circle", "topic": "test-topic", "params": "p", "paramstype": "HeroScript",
|
||||||
sessionKey: "test-key",
|
},
|
||||||
heroScript: "test-hero-script",
|
storeError: errors.New("store error"),
|
||||||
rhaiScript: "test-rhai-script",
|
enqueueError: nil,
|
||||||
connectError: errors.New("connection error"),
|
|
||||||
createError: nil,
|
|
||||||
createResponse: nil,
|
|
||||||
expectedStatus: fiber.StatusInternalServerError,
|
expectedStatus: fiber.StatusInternalServerError,
|
||||||
expectedBody: `{"error":"Failed to connect to HeroJobs server: connection error"}`,
|
expectedBody: `{"error":"Failed to store new job in Redis: store error"}`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Create Error",
|
name: "EnqueueJob Error",
|
||||||
circleID: "test-circle",
|
reqBody: map[string]interface{}{
|
||||||
topic: "test-topic",
|
"circleid": "test-circle", "topic": "test-topic", "params": "p", "paramstype": "HeroScript",
|
||||||
sessionKey: "test-key",
|
},
|
||||||
heroScript: "test-hero-script",
|
storeError: nil,
|
||||||
rhaiScript: "test-rhai-script",
|
enqueueError: errors.New("enqueue error"),
|
||||||
connectError: nil,
|
|
||||||
createError: errors.New("create error"),
|
|
||||||
createResponse: nil,
|
|
||||||
expectedStatus: fiber.StatusInternalServerError,
|
expectedStatus: fiber.StatusInternalServerError,
|
||||||
expectedBody: `{"error":"Failed to create job: create error"}`,
|
expectedBody: `{"error":"Failed to enqueue new job in Redis: enqueue error"}`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Empty Circle ID",
|
name: "Empty Circle ID",
|
||||||
circleID: "",
|
reqBody: map[string]interface{}{
|
||||||
topic: "test-topic",
|
"circleid": "", "topic": "test-topic", "params": "p", "paramstype": "HeroScript",
|
||||||
sessionKey: "test-key",
|
},
|
||||||
heroScript: "test-hero-script",
|
|
||||||
rhaiScript: "test-rhai-script",
|
|
||||||
connectError: nil,
|
|
||||||
createError: nil,
|
|
||||||
createResponse: nil,
|
|
||||||
expectedStatus: fiber.StatusBadRequest,
|
expectedStatus: fiber.StatusBadRequest,
|
||||||
expectedBody: `{"error":"Circle ID is required"}`,
|
expectedBody: `{"error":"Circle ID is required"}`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Empty Topic",
|
name: "Empty Topic",
|
||||||
circleID: "test-circle",
|
reqBody: map[string]interface{}{
|
||||||
topic: "",
|
"circleid": "c", "topic": "", "params": "p", "paramstype": "HeroScript",
|
||||||
sessionKey: "test-key",
|
},
|
||||||
heroScript: "test-hero-script",
|
|
||||||
rhaiScript: "test-rhai-script",
|
|
||||||
connectError: nil,
|
|
||||||
createError: nil,
|
|
||||||
createResponse: nil,
|
|
||||||
expectedStatus: fiber.StatusBadRequest,
|
expectedStatus: fiber.StatusBadRequest,
|
||||||
expectedBody: `{"error":"Topic is required"}`,
|
expectedBody: `{"error":"Topic is required"}`,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "Empty Params",
|
||||||
|
reqBody: map[string]interface{}{
|
||||||
|
"circleid": "c", "topic": "t", "params": "", "paramstype": "HeroScript",
|
||||||
|
},
|
||||||
|
expectedStatus: fiber.StatusBadRequest,
|
||||||
|
expectedBody: `{"error":"Params are required"}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Empty ParamsType",
|
||||||
|
reqBody: map[string]interface{}{
|
||||||
|
"circleid": "c", "topic": "t", "params": "p", "paramstype": "",
|
||||||
|
},
|
||||||
|
expectedStatus: fiber.StatusBadRequest,
|
||||||
|
expectedBody: `{"error":"ParamsType is required"}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Invalid ParamsType",
|
||||||
|
reqBody: map[string]interface{}{
|
||||||
|
"circleid": "c", "topic": "t", "params": "p", "paramstype": "InvalidType",
|
||||||
|
},
|
||||||
|
expectedStatus: fiber.StatusBadRequest,
|
||||||
|
expectedBody: `{"error":"Invalid ParamsType: InvalidType"}`,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
// Create a new mock client for each test
|
_, mockClient, app := setupTest()
|
||||||
mockClient := new(MockHeroJobsClient)
|
|
||||||
|
// Setup mock expectations
|
||||||
// Setup mock expectations - Connect is always called in the handler
|
// job.Save() is called before client interactions. We assume it succeeds for these tests.
|
||||||
mockClient.On("Connect").Return(tc.connectError)
|
// The mock will be called with a job object. We use mock.AnythingOfType for the job
|
||||||
|
// because the JobID might be populated by Save() in a real scenario, making exact match hard.
|
||||||
// CreateJob and Close are only called if Connect succeeds and parameters are valid
|
if tc.reqBody["circleid"] != "" && tc.reqBody["topic"] != "" &&
|
||||||
if tc.connectError == nil && tc.circleID != "" && tc.topic != "" {
|
tc.reqBody["params"] != "" && tc.reqBody["paramstype"] != "" &&
|
||||||
mockClient.On("CreateJob", tc.circleID, tc.topic, tc.sessionKey, tc.heroScript, tc.rhaiScript).Return(tc.createResponse, tc.createError)
|
herojobs.ParamsType(tc.reqBody["paramstype"].(string)) != "" { // Basic validation check
|
||||||
mockClient.On("Close").Return(nil)
|
|
||||||
} else {
|
// We expect StoreJob to be called with a *herojobs.Job.
|
||||||
// Close is still called via defer even if we return early
|
// The actual JobID is set by job.Save() which is not mocked here.
|
||||||
mockClient.On("Close").Return(nil).Maybe()
|
// So we use mock.AnythingOfType to match the argument.
|
||||||
|
mockClient.On("StoreJob", mock.AnythingOfType("*herojobs.Job")).Return(tc.storeError).Once().Maybe()
|
||||||
|
|
||||||
|
if tc.storeError == nil {
|
||||||
|
mockClient.On("EnqueueJob", mock.AnythingOfType("*herojobs.Job")).Return(tc.enqueueError).Once().Maybe()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new handler with the mock client
|
reqBodyBytes, err := json.Marshal(tc.reqBody)
|
||||||
handler := &JobHandler{
|
|
||||||
client: mockClient,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new app for each test
|
|
||||||
app := fiber.New()
|
|
||||||
api := app.Group("/api")
|
|
||||||
jobs := api.Group("/jobs")
|
|
||||||
jobs.Post("/create", handler.createJob)
|
|
||||||
|
|
||||||
// Create request body
|
|
||||||
reqBody := map[string]string{
|
|
||||||
"circleid": tc.circleID,
|
|
||||||
"topic": tc.topic,
|
|
||||||
"sessionkey": tc.sessionKey,
|
|
||||||
"heroscript": tc.heroScript,
|
|
||||||
"rhaiscript": tc.rhaiScript,
|
|
||||||
}
|
|
||||||
reqBodyBytes, err := json.Marshal(reqBody)
|
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Create test request
|
req, err := createTestRequest(http.MethodPost, "/api/jobs/create", bytes.NewReader(reqBodyBytes)) // Use /api/jobs/create
|
||||||
req, err := createTestRequest(http.MethodPost, "/api/jobs/create", bytes.NewReader(reqBodyBytes))
|
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
req.Header.Set("Content-Type", "application/json")
|
// Content-Type is set by createTestRequest
|
||||||
|
|
||||||
// Perform the request
|
// Perform the request
|
||||||
resp, err := app.Test(req)
|
resp, err := app.Test(req)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Check status code
|
// Check status code
|
||||||
assert.Equal(t, tc.expectedStatus, resp.StatusCode)
|
assert.Equal(t, tc.expectedStatus, resp.StatusCode)
|
||||||
|
|
||||||
// Check response body
|
// Check response body
|
||||||
body, err := io.ReadAll(resp.Body)
|
body, err := io.ReadAll(resp.Body)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.JSONEq(t, tc.expectedBody, string(body))
|
assert.JSONEq(t, tc.expectedBody, string(body))
|
||||||
|
|
||||||
// Verify that all expectations were met
|
// Verify that all expectations were met
|
||||||
mockClient.AssertExpectations(t)
|
mockClient.AssertExpectations(t)
|
||||||
})
|
})
|
||||||
@ -523,114 +475,96 @@ func TestCreateJob(t *testing.T) {
|
|||||||
|
|
||||||
// TestSubmitJob tests the submitJob handler
|
// TestSubmitJob tests the submitJob handler
|
||||||
func TestSubmitJob(t *testing.T) {
|
func TestSubmitJob(t *testing.T) {
|
||||||
// Create a test job
|
|
||||||
testJob := &herojobs.Job{
|
|
||||||
JobID: "test-job-id",
|
|
||||||
CircleID: "test-circle",
|
|
||||||
Topic: "test-topic",
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test cases
|
// Test cases
|
||||||
|
submittedJob := herojobs.NewJob()
|
||||||
|
submittedJob.JobID = 10 // Assume Save populates this
|
||||||
|
submittedJob.CircleID = "test-circle"
|
||||||
|
submittedJob.Topic = "test-topic"
|
||||||
|
submittedJob.Params = "submitted params"
|
||||||
|
submittedJob.ParamsType = herojobs.ParamsTypeHeroScript
|
||||||
|
submittedJob.Status = herojobs.JobStatusNew
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
job *herojobs.Job
|
jobToSubmit *herojobs.Job // This is the job in the request body
|
||||||
connectError error
|
storeError error
|
||||||
submitError error
|
enqueueError error
|
||||||
submitResponse *herojobs.Job
|
|
||||||
expectedStatus int
|
expectedStatus int
|
||||||
expectedBody string
|
expectedBody string // Will be the jobToSubmit marshaled (after potential Save)
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "Success",
|
name: "Success",
|
||||||
job: testJob,
|
jobToSubmit: submittedJob,
|
||||||
connectError: nil,
|
storeError: nil,
|
||||||
submitError: nil,
|
enqueueError: nil,
|
||||||
submitResponse: testJob,
|
|
||||||
expectedStatus: fiber.StatusOK,
|
expectedStatus: fiber.StatusOK,
|
||||||
expectedBody: `{"jobid":"test-job-id","circleid":"test-circle","topic":"test-topic","error":"","heroscript":"","result":"","rhaiscript":"","sessionkey":"","status":"","time_end":0,"time_scheduled":0,"time_start":0,"timeout":0}`,
|
// The handler returns the job object from the request after Save(), Store(), Enqueue()
|
||||||
|
// For the mock, the JobID from jobToSubmit will be used.
|
||||||
|
expectedBody: `{"jobid":10,"circleid":"test-circle","topic":"test-topic","params":"submitted params","paramstype":"HeroScript","status":"new","sessionkey":"","result":"","error":"","timeout":60,"log":false,"timescheduled":0,"timestart":0,"timeend":0}`,
|
||||||
},
|
},
|
||||||
|
// Removed "Connection Error"
|
||||||
{
|
{
|
||||||
name: "Connection Error",
|
name: "StoreJob Error",
|
||||||
job: testJob,
|
jobToSubmit: submittedJob,
|
||||||
connectError: errors.New("connection error"),
|
storeError: errors.New("store error"),
|
||||||
submitError: nil,
|
enqueueError: nil,
|
||||||
submitResponse: nil,
|
|
||||||
expectedStatus: fiber.StatusInternalServerError,
|
expectedStatus: fiber.StatusInternalServerError,
|
||||||
expectedBody: `{"error":"Failed to connect to HeroJobs server: connection error"}`,
|
expectedBody: `{"error":"Failed to store job in Redis: store error"}`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Submit Error",
|
name: "EnqueueJob Error",
|
||||||
job: testJob,
|
jobToSubmit: submittedJob,
|
||||||
connectError: nil,
|
storeError: nil,
|
||||||
submitError: errors.New("submit error"),
|
enqueueError: errors.New("enqueue error"),
|
||||||
submitResponse: nil,
|
|
||||||
expectedStatus: fiber.StatusInternalServerError,
|
expectedStatus: fiber.StatusInternalServerError,
|
||||||
expectedBody: `{"error":"Failed to submit job: submit error"}`,
|
expectedBody: `{"error":"Failed to enqueue job: enqueue error"}`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Empty Job",
|
name: "Empty Job in request (parsing error)",
|
||||||
job: nil,
|
jobToSubmit: nil, // Simulates empty or malformed request body
|
||||||
connectError: nil,
|
|
||||||
submitError: nil,
|
|
||||||
submitResponse: nil,
|
|
||||||
expectedStatus: fiber.StatusBadRequest,
|
expectedStatus: fiber.StatusBadRequest,
|
||||||
expectedBody: `{"error":"Failed to parse job data: unexpected end of JSON input"}`,
|
expectedBody: `{"error":"Failed to parse job data: unexpected end of JSON input"}`, // Or similar based on actual parsing
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
// Create a new mock client for each test
|
_, mockClient, app := setupTest()
|
||||||
mockClient := new(MockHeroJobsClient)
|
|
||||||
|
// Setup mock expectations
|
||||||
// Setup mock expectations - Connect is always called in the handler
|
// job.Save() is called before client interactions.
|
||||||
mockClient.On("Connect").Return(tc.connectError)
|
if tc.jobToSubmit != nil { // If job is parsable from request
|
||||||
|
// We expect StoreJob to be called with the job from the request.
|
||||||
// SubmitJob and Close are only called if Connect succeeds and job is not nil
|
// The JobID might be modified by Save() in a real scenario.
|
||||||
if tc.connectError == nil && tc.job != nil {
|
mockClient.On("StoreJob", tc.jobToSubmit).Return(tc.storeError).Once().Maybe()
|
||||||
mockClient.On("SubmitJob", tc.job).Return(tc.submitResponse, tc.submitError)
|
if tc.storeError == nil {
|
||||||
mockClient.On("Close").Return(nil)
|
mockClient.On("EnqueueJob", tc.jobToSubmit).Return(tc.enqueueError).Once().Maybe()
|
||||||
} else {
|
}
|
||||||
// Close is still called via defer even if we return early
|
|
||||||
mockClient.On("Close").Return(nil).Maybe()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new handler with the mock client
|
|
||||||
handler := &JobHandler{
|
|
||||||
client: mockClient,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new app for each test
|
|
||||||
app := fiber.New()
|
|
||||||
api := app.Group("/api")
|
|
||||||
jobs := api.Group("/jobs")
|
|
||||||
jobs.Post("/submit", handler.submitJob)
|
|
||||||
|
|
||||||
// Create request body
|
|
||||||
var reqBodyBytes []byte
|
var reqBodyBytes []byte
|
||||||
var err error
|
var err error
|
||||||
if tc.job != nil {
|
if tc.jobToSubmit != nil {
|
||||||
reqBodyBytes, err = json.Marshal(tc.job)
|
reqBodyBytes, err = json.Marshal(tc.jobToSubmit)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create test request
|
req, err := createTestRequest(http.MethodPost, "/api/jobs/submit", bytes.NewReader(reqBodyBytes)) // Use /api/jobs/submit
|
||||||
req, err := createTestRequest(http.MethodPost, "/api/jobs/submit", bytes.NewReader(reqBodyBytes))
|
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
req.Header.Set("Content-Type", "application/json")
|
// Content-Type is set by createTestRequest
|
||||||
|
|
||||||
// Perform the request
|
// Perform the request
|
||||||
resp, err := app.Test(req)
|
resp, err := app.Test(req)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Check status code
|
// Check status code
|
||||||
assert.Equal(t, tc.expectedStatus, resp.StatusCode)
|
assert.Equal(t, tc.expectedStatus, resp.StatusCode)
|
||||||
|
|
||||||
// Check response body
|
// Check response body
|
||||||
body, err := io.ReadAll(resp.Body)
|
body, err := io.ReadAll(resp.Body)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.JSONEq(t, tc.expectedBody, string(body))
|
assert.JSONEq(t, tc.expectedBody, string(body))
|
||||||
|
|
||||||
// Verify that all expectations were met
|
// Verify that all expectations were met
|
||||||
mockClient.AssertExpectations(t)
|
mockClient.AssertExpectations(t)
|
||||||
})
|
})
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -15,8 +15,8 @@ type JobDisplayInfo struct {
|
|||||||
Topic string `json:"topic"`
|
Topic string `json:"topic"`
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
SessionKey string `json:"sessionkey"`
|
SessionKey string `json:"sessionkey"`
|
||||||
HeroScript string `json:"heroscript"`
|
Params string `json:"params"`
|
||||||
RhaiScript string `json:"rhaiscript"`
|
ParamsType string `json:"paramstype"`
|
||||||
Result string `json:"result"`
|
Result string `json:"result"`
|
||||||
Error string `json:"error"`
|
Error string `json:"error"`
|
||||||
TimeScheduled int64 `json:"time_scheduled"`
|
TimeScheduled int64 `json:"time_scheduled"`
|
||||||
@ -27,15 +27,17 @@ type JobDisplayInfo struct {
|
|||||||
|
|
||||||
// JobHandler handles job-related page routes
|
// JobHandler handles job-related page routes
|
||||||
type JobHandler struct {
|
type JobHandler struct {
|
||||||
client *herojobs.Client
|
client *herojobs.RedisClient
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewJobHandler creates a new job handler with the provided socket path
|
// NewJobHandler creates a new job handler with the provided socket path
|
||||||
func NewJobHandler(socketPath string, logger *log.Logger) (*JobHandler, error) {
|
func NewJobHandler(redisAddr string, logger *log.Logger) (*JobHandler, error) {
|
||||||
client, err := herojobs.NewClient(socketPath)
|
// Assuming SSL is false as per README example herojobs.NewRedisClient("localhost:6379", false)
|
||||||
|
// This might need to be configurable later.
|
||||||
|
client, err := herojobs.NewRedisClient(redisAddr, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create HeroJobs client: %w", err)
|
return nil, fmt.Errorf("failed to create HeroJobs Redis client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &JobHandler{
|
return &JobHandler{
|
||||||
@ -50,7 +52,7 @@ func (h *JobHandler) RegisterRoutes(app *fiber.App) {
|
|||||||
jobs := app.Group("/jobs")
|
jobs := app.Group("/jobs")
|
||||||
jobs.Get("/", h.getJobsPage)
|
jobs.Get("/", h.getJobsPage)
|
||||||
jobs.Get("/list", h.getJobsList)
|
jobs.Get("/list", h.getJobsList)
|
||||||
|
|
||||||
// Register the same routes under /admin/jobs for consistency
|
// Register the same routes under /admin/jobs for consistency
|
||||||
adminJobs := app.Group("/admin/jobs")
|
adminJobs := app.Group("/admin/jobs")
|
||||||
adminJobs.Get("/", h.getJobsPage)
|
adminJobs.Get("/", h.getJobsPage)
|
||||||
@ -59,18 +61,13 @@ func (h *JobHandler) RegisterRoutes(app *fiber.App) {
|
|||||||
|
|
||||||
// getJobsPage renders the jobs page
|
// getJobsPage renders the jobs page
|
||||||
func (h *JobHandler) getJobsPage(c *fiber.Ctx) error {
|
func (h *JobHandler) getJobsPage(c *fiber.Ctx) error {
|
||||||
// Check if we can connect to the HeroJobs server
|
// Assuming h.client (RedisClient) is valid if NewJobHandler succeeded.
|
||||||
var warning string
|
// The client is connected on creation. A Ping method could be used here for a health check if available.
|
||||||
if err := h.client.Connect(); err != nil {
|
// The previous connect/close logic per-request is removed.
|
||||||
warning = "Could not connect to HeroJobs server: " + err.Error()
|
var warning string // This will be empty unless a new check (e.g., Ping) sets it.
|
||||||
h.logger.Printf("Warning: %s", warning)
|
|
||||||
} else {
|
|
||||||
h.client.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.Render("admin/jobs", fiber.Map{
|
return c.Render("admin/jobs", fiber.Map{
|
||||||
"title": "Jobs",
|
"title": "Jobs",
|
||||||
"warning": warning,
|
"warning": warning, // warning will be empty for now
|
||||||
"error": "",
|
"error": "",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -100,20 +97,18 @@ func (h *JobHandler) getJobsList(c *fiber.Ctx) error {
|
|||||||
|
|
||||||
// getJobsData gets job data from the HeroJobs server
|
// getJobsData gets job data from the HeroJobs server
|
||||||
func (h *JobHandler) getJobsData(circleID, topic string) ([]JobDisplayInfo, error) {
|
func (h *JobHandler) getJobsData(circleID, topic string) ([]JobDisplayInfo, error) {
|
||||||
// Connect to the HeroJobs server
|
// Assuming h.client (RedisClient) is already connected (established by NewJobHandler).
|
||||||
if err := h.client.Connect(); err != nil {
|
// It should not be closed here as it's a long-lived client.
|
||||||
return nil, fmt.Errorf("failed to connect to HeroJobs server: %w", err)
|
// Connect() and Close() calls per-request are removed.
|
||||||
}
|
|
||||||
defer h.client.Close()
|
|
||||||
|
|
||||||
// If circleID and topic are not provided, try to list all jobs
|
// If circleID and topic are not provided, try to list all jobs
|
||||||
if circleID == "" && topic == "" {
|
if circleID == "" && topic == "" {
|
||||||
// Try to get some default jobs
|
// Try to get some default jobs
|
||||||
defaultCircles := []string{"default", "system"}
|
defaultCircles := []string{"default", "system"}
|
||||||
defaultTopics := []string{"default", "system"}
|
defaultTopics := []string{"default", "system"}
|
||||||
|
|
||||||
var allJobs []JobDisplayInfo
|
var allJobs []JobDisplayInfo
|
||||||
|
|
||||||
// Try each combination
|
// Try each combination
|
||||||
for _, circle := range defaultCircles {
|
for _, circle := range defaultCircles {
|
||||||
for _, t := range defaultTopics {
|
for _, t := range defaultTopics {
|
||||||
@ -122,22 +117,22 @@ func (h *JobHandler) getJobsData(circleID, topic string) ([]JobDisplayInfo, erro
|
|||||||
h.logger.Printf("Could not list jobs for circle=%s, topic=%s: %v", circle, t, err)
|
h.logger.Printf("Could not list jobs for circle=%s, topic=%s: %v", circle, t, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, jobID := range jobIDs {
|
for _, jobID := range jobIDs {
|
||||||
job, err := h.client.GetJob(jobID)
|
job, err := h.client.GetJob(jobID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logger.Printf("Error getting job %s: %v", jobID, err)
|
h.logger.Printf("Error getting job %s: %v", jobID, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
allJobs = append(allJobs, JobDisplayInfo{
|
allJobs = append(allJobs, JobDisplayInfo{
|
||||||
JobID: job.JobID,
|
JobID: fmt.Sprintf("%d", job.JobID),
|
||||||
CircleID: job.CircleID,
|
CircleID: job.CircleID,
|
||||||
Topic: job.Topic,
|
Topic: job.Topic,
|
||||||
Status: string(job.Status),
|
Status: string(job.Status),
|
||||||
SessionKey: job.SessionKey,
|
SessionKey: job.SessionKey,
|
||||||
HeroScript: job.HeroScript,
|
Params: job.Params,
|
||||||
RhaiScript: job.RhaiScript,
|
ParamsType: string(job.ParamsType),
|
||||||
Result: job.Result,
|
Result: job.Result,
|
||||||
Error: job.Error,
|
Error: job.Error,
|
||||||
TimeScheduled: job.TimeScheduled,
|
TimeScheduled: job.TimeScheduled,
|
||||||
@ -148,7 +143,7 @@ func (h *JobHandler) getJobsData(circleID, topic string) ([]JobDisplayInfo, erro
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return allJobs, nil
|
return allJobs, nil
|
||||||
} else if circleID == "" || topic == "" {
|
} else if circleID == "" || topic == "" {
|
||||||
// If only one of the parameters is provided, we can't list jobs
|
// If only one of the parameters is provided, we can't list jobs
|
||||||
@ -171,13 +166,13 @@ func (h *JobHandler) getJobsData(circleID, topic string) ([]JobDisplayInfo, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
jobInfo := JobDisplayInfo{
|
jobInfo := JobDisplayInfo{
|
||||||
JobID: job.JobID,
|
JobID: fmt.Sprintf("%d", job.JobID),
|
||||||
CircleID: job.CircleID,
|
CircleID: job.CircleID,
|
||||||
Topic: job.Topic,
|
Topic: job.Topic,
|
||||||
Status: string(job.Status),
|
Status: string(job.Status),
|
||||||
SessionKey: job.SessionKey,
|
SessionKey: job.SessionKey,
|
||||||
HeroScript: job.HeroScript,
|
Params: job.Params,
|
||||||
RhaiScript: job.RhaiScript,
|
ParamsType: string(job.ParamsType),
|
||||||
Result: job.Result,
|
Result: job.Result,
|
||||||
Error: job.Error,
|
Error: job.Error,
|
||||||
TimeScheduled: job.TimeScheduled,
|
TimeScheduled: job.TimeScheduled,
|
||||||
|
@ -34,7 +34,7 @@ Jobs are stored in both Redis and OurDB:
|
|||||||
- Handles all queue operations (adding/removing jobs)
|
- Handles all queue operations (adding/removing jobs)
|
||||||
- Stores all running jobs for fast access
|
- Stores all running jobs for fast access
|
||||||
- Used for real-time operations and status updates
|
- Used for real-time operations and status updates
|
||||||
- **Job Storage**: `herojobs:<circleID>:<topic>:<jobID>` or legacy `jobsmanager:<jobID>`
|
- **Job Storage**: `herojobs:<circleID>:<topic>:<jobID>` or legacy `herojobs:<jobID>`
|
||||||
- **Queue**: `heroqueue:<circleID>:<topic>`
|
- **Queue**: `heroqueue:<circleID>:<topic>`
|
||||||
|
|
||||||
#### OurDB
|
#### OurDB
|
||||||
@ -85,14 +85,14 @@ The watchdog uses Go's concurrency primitives to safely manage multiple jobs:
|
|||||||
|
|
||||||
```go
|
```go
|
||||||
// Initialize Redis client
|
// Initialize Redis client
|
||||||
redisClient, err := jobsmanager.NewRedisClient("localhost:6379", false)
|
redisClient, err := herojobs.NewRedisClient("localhost:6379", false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to connect to Redis: %v", err)
|
log.Fatalf("Failed to connect to Redis: %v", err)
|
||||||
}
|
}
|
||||||
defer redisClient.Close()
|
defer redisClient.Close()
|
||||||
|
|
||||||
// Create and start watchdog
|
// Create and start watchdog
|
||||||
watchdog := jobsmanager.NewWatchDog(redisClient)
|
watchdog := herojobs.NewWatchDog(redisClient)
|
||||||
watchdog.Start()
|
watchdog.Start()
|
||||||
|
|
||||||
// Handle shutdown
|
// Handle shutdown
|
||||||
@ -103,14 +103,14 @@ defer watchdog.Stop()
|
|||||||
|
|
||||||
```go
|
```go
|
||||||
// Create a new job
|
// Create a new job
|
||||||
job := jobsmanager.NewJob()
|
job := herojobs.NewJob()
|
||||||
job.CircleID = "myCircle"
|
job.CircleID = "myCircle"
|
||||||
job.Topic = "myTopic"
|
job.Topic = "myTopic"
|
||||||
job.Params = `
|
job.Params = `
|
||||||
!!fake.return_success
|
!!fake.return_success
|
||||||
message: "This is a test job"
|
message: "This is a test job"
|
||||||
`
|
`
|
||||||
job.ParamsType = jobsmanager.ParamsTypeHeroScript
|
job.ParamsType = herojobs.ParamsTypeHeroScript
|
||||||
job.Timeout = 30 // 30 seconds timeout
|
job.Timeout = 30 // 30 seconds timeout
|
||||||
|
|
||||||
// Save the job to OurDB to get an ID
|
// Save the job to OurDB to get an ID
|
||||||
@ -140,7 +140,7 @@ jobID := job.JobID
|
|||||||
job, err := redisClient.GetJob(jobID)
|
job, err := redisClient.GetJob(jobID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If not found in Redis, try OurDB for historical jobs
|
// If not found in Redis, try OurDB for historical jobs
|
||||||
job = &jobsmanager.Job{JobID: jobID}
|
job = &herojobs.Job{JobID: jobID}
|
||||||
if err := job.Load(); err != nil {
|
if err := job.Load(); err != nil {
|
||||||
log.Printf("Failed to load job: %v", err)
|
log.Printf("Failed to load job: %v", err)
|
||||||
return
|
return
|
||||||
@ -149,13 +149,13 @@ if err != nil {
|
|||||||
|
|
||||||
// Check job status
|
// Check job status
|
||||||
switch job.Status {
|
switch job.Status {
|
||||||
case jobsmanager.JobStatusNew:
|
case herojobs.JobStatusNew:
|
||||||
fmt.Println("Job is waiting to be processed")
|
fmt.Println("Job is waiting to be processed")
|
||||||
case jobsmanager.JobStatusActive:
|
case herojobs.JobStatusActive:
|
||||||
fmt.Println("Job is currently being processed")
|
fmt.Println("Job is currently being processed")
|
||||||
case jobsmanager.JobStatusDone:
|
case herojobs.JobStatusDone:
|
||||||
fmt.Printf("Job completed successfully: %s\n", job.Result)
|
fmt.Printf("Job completed successfully: %s\n", job.Result)
|
||||||
case jobsmanager.JobStatusError:
|
case herojobs.JobStatusError:
|
||||||
fmt.Printf("Job failed: %s\n", job.Error)
|
fmt.Printf("Job failed: %s\n", job.Error)
|
||||||
}
|
}
|
||||||
```
|
```
|
@ -1,4 +1,4 @@
|
|||||||
package jobsmanager
|
package herojobs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
@ -1,4 +1,4 @@
|
|||||||
package jobsmanager
|
package herojobs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -72,7 +72,7 @@ func (d *WatchDog) processHeroScript(ctx context.Context, job *Job) {
|
|||||||
|
|
||||||
// TODO: Implement HeroScript processing
|
// TODO: Implement HeroScript processing
|
||||||
errMsg := "HeroScript processing not implemented yet"
|
errMsg := "HeroScript processing not implemented yet"
|
||||||
log.Printf("Error processing job %s: %s", job.JobID, errMsg)
|
log.Printf("Error processing job %d: %s", job.JobID, errMsg)
|
||||||
|
|
||||||
// Update job status in OurDB
|
// Update job status in OurDB
|
||||||
job.Finish(JobStatusError, "", fmt.Errorf(errMsg))
|
job.Finish(JobStatusError, "", fmt.Errorf(errMsg))
|
@ -1,4 +1,4 @@
|
|||||||
package jobsmanager
|
package herojobs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -82,7 +82,7 @@ func (r *RedisClient) GetJob(jobID interface{}) (*Job, error) {
|
|||||||
switch id := jobID.(type) {
|
switch id := jobID.(type) {
|
||||||
case uint32:
|
case uint32:
|
||||||
// Legacy format for backward compatibility
|
// Legacy format for backward compatibility
|
||||||
storageKey = fmt.Sprintf("jobsmanager:%d", id)
|
storageKey = fmt.Sprintf("herojobs:%d", id)
|
||||||
case string:
|
case string:
|
||||||
// Check if this is a composite key (circleID:topic:jobID)
|
// Check if this is a composite key (circleID:topic:jobID)
|
||||||
parts := strings.Split(id, ":")
|
parts := strings.Split(id, ":")
|
||||||
@ -103,10 +103,10 @@ func (r *RedisClient) GetJob(jobID interface{}) (*Job, error) {
|
|||||||
// Try to convert string to uint32 (legacy format)
|
// Try to convert string to uint32 (legacy format)
|
||||||
var numericID uint32
|
var numericID uint32
|
||||||
if _, err := fmt.Sscanf(id, "%d", &numericID); err == nil {
|
if _, err := fmt.Sscanf(id, "%d", &numericID); err == nil {
|
||||||
storageKey = fmt.Sprintf("jobsmanager:%d", numericID)
|
storageKey = fmt.Sprintf("herojobs:%d", numericID)
|
||||||
} else {
|
} else {
|
||||||
// Legacy string ID format
|
// Legacy string ID format
|
||||||
storageKey = fmt.Sprintf("jobsmanager:%s", id)
|
storageKey = fmt.Sprintf("herojobs:%s", id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -139,7 +139,7 @@ func (r *RedisClient) DeleteJob(jobID interface{}) error {
|
|||||||
switch id := jobID.(type) {
|
switch id := jobID.(type) {
|
||||||
case uint32:
|
case uint32:
|
||||||
// Legacy format for backward compatibility
|
// Legacy format for backward compatibility
|
||||||
storageKey = fmt.Sprintf("jobsmanager:%d", id)
|
storageKey = fmt.Sprintf("herojobs:%d", id)
|
||||||
case string:
|
case string:
|
||||||
// Check if this is a composite key (circleID:topic:jobID)
|
// Check if this is a composite key (circleID:topic:jobID)
|
||||||
parts := strings.Split(id, ":")
|
parts := strings.Split(id, ":")
|
||||||
@ -160,10 +160,10 @@ func (r *RedisClient) DeleteJob(jobID interface{}) error {
|
|||||||
// Try to convert string to uint32 (legacy format)
|
// Try to convert string to uint32 (legacy format)
|
||||||
var numericID uint32
|
var numericID uint32
|
||||||
if _, err := fmt.Sscanf(id, "%d", &numericID); err == nil {
|
if _, err := fmt.Sscanf(id, "%d", &numericID); err == nil {
|
||||||
storageKey = fmt.Sprintf("jobsmanager:%d", numericID)
|
storageKey = fmt.Sprintf("herojobs:%d", numericID)
|
||||||
} else {
|
} else {
|
||||||
// Legacy string ID format
|
// Legacy string ID format
|
||||||
storageKey = fmt.Sprintf("jobsmanager:%s", id)
|
storageKey = fmt.Sprintf("herojobs:%s", id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -238,7 +238,7 @@ func (r *RedisClient) QueueEmpty(circleID, topic string) error {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Handle legacy string IDs
|
// Handle legacy string IDs
|
||||||
storageKey := fmt.Sprintf("jobsmanager:%s", jobIDStr)
|
storageKey := fmt.Sprintf("herojobs:%s", jobIDStr)
|
||||||
err := r.client.Del(r.ctx, storageKey).Err()
|
err := r.client.Del(r.ctx, storageKey).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to delete job %s: %w", jobIDStr, err)
|
return fmt.Errorf("failed to delete job %s: %w", jobIDStr, err)
|
@ -1,4 +1,4 @@
|
|||||||
package jobsmanager
|
package herojobs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -4,7 +4,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/freeflowuniverse/heroagent/pkg/handlerfactory/herohandler"
|
"github.com/freeflowuniverse/heroagent/pkg/heroscript/handlerfactory/herohandler"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -2,8 +2,6 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/freeflowuniverse/heroagent/pkg/heroscript/playbook"
|
"github.com/freeflowuniverse/heroagent/pkg/heroscript/playbook"
|
||||||
)
|
)
|
||||||
@ -15,26 +13,26 @@ func main() {
|
|||||||
pb := playbook.New()
|
pb := playbook.New()
|
||||||
|
|
||||||
// Start a simple process
|
// Start a simple process
|
||||||
startAction := pb.NewAction(1, "start", "process", 0, playbook.ActionTypeUnknown)
|
startAction := pb.NewAction("1", "start", "process", 0, playbook.ActionTypeUnknown)
|
||||||
startAction.Params.Set("name", "example_process")
|
startAction.Params.Set("name", "example_process")
|
||||||
startAction.Params.Set("command", "ping -c 60 localhost")
|
startAction.Params.Set("command", "ping -c 60 localhost")
|
||||||
startAction.Params.Set("log", "true")
|
startAction.Params.Set("log", "true")
|
||||||
|
|
||||||
// List all processes
|
// List all processes
|
||||||
listAction := pb.NewAction(2, "list", "process", 0, playbook.ActionTypeUnknown)
|
listAction := pb.NewAction("2", "list", "process", 0, playbook.ActionTypeUnknown)
|
||||||
listAction.Params.Set("format", "table")
|
listAction.Params.Set("format", "table")
|
||||||
|
|
||||||
// Get status of a specific process
|
// Get status of a specific process
|
||||||
statusAction := pb.NewAction(3, "status", "process", 0, playbook.ActionTypeUnknown)
|
statusAction := pb.NewAction("3", "status", "process", 0, playbook.ActionTypeUnknown)
|
||||||
statusAction.Params.Set("name", "example_process")
|
statusAction.Params.Set("name", "example_process")
|
||||||
|
|
||||||
// Get logs of a specific process
|
// Get logs of a specific process
|
||||||
logsAction := pb.NewAction(4, "logs", "process", 0, playbook.ActionTypeUnknown)
|
logsAction := pb.NewAction("4", "logs", "process", 0, playbook.ActionTypeUnknown)
|
||||||
logsAction.Params.Set("name", "example_process")
|
logsAction.Params.Set("name", "example_process")
|
||||||
logsAction.Params.Set("lines", "10")
|
logsAction.Params.Set("lines", "10")
|
||||||
|
|
||||||
// Stop a process
|
// Stop a process
|
||||||
stopAction := pb.NewAction(5, "stop", "process", 0, playbook.ActionTypeUnknown)
|
stopAction := pb.NewAction("5", "stop", "process", 0, playbook.ActionTypeUnknown)
|
||||||
stopAction.Params.Set("name", "example_process")
|
stopAction.Params.Set("name", "example_process")
|
||||||
|
|
||||||
// Generate the heroscript
|
// Generate the heroscript
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
proxy "github.com/freeflowuniverse/heroagent/pkg/proxies/openai"
|
openaiproxy "github.com/freeflowuniverse/heroagent/pkg/heroservices/openaiproxy"
|
||||||
"github.com/openai/openai-go"
|
"github.com/openai/openai-go"
|
||||||
"github.com/openai/openai-go/option"
|
"github.com/openai/openai-go/option"
|
||||||
)
|
)
|
||||||
@ -37,15 +37,15 @@ func testProxyWithClient() {
|
|||||||
|
|
||||||
// Create a client that points to our proxy
|
// Create a client that points to our proxy
|
||||||
// Note: The server is using "/ai" as the prefix for all routes
|
// Note: The server is using "/ai" as the prefix for all routes
|
||||||
client := openai.NewClient(
|
client := openaiproxy.NewClient(
|
||||||
option.WithAPIKey("test-key"), // This is our test key, not a real OpenAI key
|
option.WithAPIKey("test-key"), // This is our test key, not a real OpenAI key
|
||||||
option.WithBaseURL("http://localhost:8080/ai"), // Use the /ai prefix to match the server routes
|
option.WithBaseURL("http://localhost:8080/ai"), // Use the /ai prefix to match the server routes
|
||||||
)
|
)
|
||||||
|
|
||||||
// Create a completion request
|
// Create a completion request
|
||||||
chatCompletion, err := client.Chat.Completions.New(context.Background(), openai.ChatCompletionNewParams{
|
chatCompletion, err := client.Chat.Completions.New(context.Background(), openaiproxy.ChatCompletionNewParams{
|
||||||
Messages: []openai.ChatCompletionMessageParamUnion{
|
Messages: []openaiproxy.ChatCompletionMessageParamUnion{
|
||||||
openai.UserMessage("Say this is a test"),
|
openaiproxy.UserMessage("Say this is a test"),
|
||||||
},
|
},
|
||||||
Model: "gpt-3.5-turbo", // Use a model that our proxy supports
|
Model: "gpt-3.5-turbo", // Use a model that our proxy supports
|
||||||
})
|
})
|
||||||
@ -70,9 +70,9 @@ func runServerMode() {
|
|||||||
|
|
||||||
// Create a proxy configuration
|
// Create a proxy configuration
|
||||||
config := proxy.ProxyConfig{
|
config := proxy.ProxyConfig{
|
||||||
Port: 8080, // Use a non-privileged port for testing
|
Port: 8080, // Use a non-privileged port for testing
|
||||||
OpenAIBaseURL: "https://api.openai.com", // Default OpenAI API URL
|
OpenAIBaseURL: "https://api.openaiproxy.com", // Default OpenAI API URL
|
||||||
DefaultOpenAIKey: openaiKey, // Fallback API key if user doesn't have one
|
DefaultOpenAIKey: openaiKey, // Fallback API key if user doesn't have one
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new factory with the configuration
|
// Create a new factory with the configuration
|
File diff suppressed because it is too large
Load Diff
@ -119,7 +119,7 @@ func createJob(c *fiber.Ctx, apiKey string, endpoint string, requestBody interfa
|
|||||||
// Create a new job
|
// Create a new job
|
||||||
job := jobsmanager.NewJob()
|
job := jobsmanager.NewJob()
|
||||||
job.ParamsType = jobsmanager.ParamsTypeAI
|
job.ParamsType = jobsmanager.ParamsTypeAI
|
||||||
job.Topic = "openai-proxy"
|
job.Topic = "openaiproxy-proxy"
|
||||||
job.CircleID = "ai"
|
job.CircleID = "ai"
|
||||||
|
|
||||||
// Serialize request body to JSON
|
// Serialize request body to JSON
|
||||||
@ -183,7 +183,7 @@ func (s *Server) handleModels(c *fiber.Ctx) error {
|
|||||||
// Forward request to OpenAI
|
// Forward request to OpenAI
|
||||||
url := s.Factory.Config.OpenAIBaseURL + "/v1/models"
|
url := s.Factory.Config.OpenAIBaseURL + "/v1/models"
|
||||||
if url == "/v1/models" {
|
if url == "/v1/models" {
|
||||||
url = "https://api.openai.com/v1/models"
|
url = "https://api.openaiproxy.com/v1/models"
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
@ -250,7 +250,7 @@ func (s *Server) handleGetModel(c *fiber.Ctx) error {
|
|||||||
// Forward request to OpenAI
|
// Forward request to OpenAI
|
||||||
url := s.Factory.Config.OpenAIBaseURL + "/v1/models/" + modelID
|
url := s.Factory.Config.OpenAIBaseURL + "/v1/models/" + modelID
|
||||||
if strings.HasPrefix(url, "/v1/models/") {
|
if strings.HasPrefix(url, "/v1/models/") {
|
||||||
url = "https://api.openai.com/v1/models/" + modelID
|
url = "https://api.openaiproxy.com/v1/models/" + modelID
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
@ -337,7 +337,7 @@ func (s *Server) handleChatCompletions(c *fiber.Ctx) error {
|
|||||||
// Forward request to OpenAI
|
// Forward request to OpenAI
|
||||||
url := s.Factory.Config.OpenAIBaseURL + "/v1/chat/completions"
|
url := s.Factory.Config.OpenAIBaseURL + "/v1/chat/completions"
|
||||||
if url == "/v1/chat/completions" {
|
if url == "/v1/chat/completions" {
|
||||||
url = "https://api.openai.com/v1/chat/completions"
|
url = "https://api.openaiproxy.com/v1/chat/completions"
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert the request body back to JSON
|
// Convert the request body back to JSON
|
||||||
@ -461,7 +461,7 @@ func (s *Server) handleCompletions(c *fiber.Ctx) error {
|
|||||||
// Forward request to OpenAI
|
// Forward request to OpenAI
|
||||||
url := s.Factory.Config.OpenAIBaseURL + "/v1/completions"
|
url := s.Factory.Config.OpenAIBaseURL + "/v1/completions"
|
||||||
if url == "/v1/completions" {
|
if url == "/v1/completions" {
|
||||||
url = "https://api.openai.com/v1/completions"
|
url = "https://api.openaiproxy.com/v1/completions"
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert the request body back to JSON
|
// Convert the request body back to JSON
|
||||||
@ -585,7 +585,7 @@ func (s *Server) handleEmbeddings(c *fiber.Ctx) error {
|
|||||||
// Forward request to OpenAI
|
// Forward request to OpenAI
|
||||||
url := s.Factory.Config.OpenAIBaseURL + "/v1/embeddings"
|
url := s.Factory.Config.OpenAIBaseURL + "/v1/embeddings"
|
||||||
if url == "/v1/embeddings" {
|
if url == "/v1/embeddings" {
|
||||||
url = "https://api.openai.com/v1/embeddings"
|
url = "https://api.openaiproxy.com/v1/embeddings"
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert the request body back to JSON
|
// Convert the request body back to JSON
|
||||||
@ -724,7 +724,7 @@ func (s *Server) handleImagesGenerations(c *fiber.Ctx) error {
|
|||||||
// Forward request to OpenAI
|
// Forward request to OpenAI
|
||||||
url := s.Factory.Config.OpenAIBaseURL + "/v1/images/generations"
|
url := s.Factory.Config.OpenAIBaseURL + "/v1/images/generations"
|
||||||
if url == "/v1/images/generations" {
|
if url == "/v1/images/generations" {
|
||||||
url = "https://api.openai.com/v1/images/generations"
|
url = "https://api.openaiproxy.com/v1/images/generations"
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert the request body back to JSON
|
// Convert the request body back to JSON
|
||||||
@ -919,7 +919,7 @@ func (s *Server) handleListFiles(c *fiber.Ctx) error {
|
|||||||
// Forward request to OpenAI
|
// Forward request to OpenAI
|
||||||
url := s.Factory.Config.OpenAIBaseURL + "/v1/files"
|
url := s.Factory.Config.OpenAIBaseURL + "/v1/files"
|
||||||
if url == "/v1/files" {
|
if url == "/v1/files" {
|
||||||
url = "https://api.openai.com/v1/files"
|
url = "https://api.openaiproxy.com/v1/files"
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
@ -1017,7 +1017,7 @@ func (s *Server) handleGetFile(c *fiber.Ctx) error {
|
|||||||
// Forward request to OpenAI
|
// Forward request to OpenAI
|
||||||
url := s.Factory.Config.OpenAIBaseURL + "/v1/files/" + fileID
|
url := s.Factory.Config.OpenAIBaseURL + "/v1/files/" + fileID
|
||||||
if strings.HasPrefix(url, "/v1/files/") {
|
if strings.HasPrefix(url, "/v1/files/") {
|
||||||
url = "https://api.openai.com/v1/files/" + fileID
|
url = "https://api.openaiproxy.com/v1/files/" + fileID
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
@ -1084,7 +1084,7 @@ func (s *Server) handleDeleteFile(c *fiber.Ctx) error {
|
|||||||
// Forward request to OpenAI
|
// Forward request to OpenAI
|
||||||
url := s.Factory.Config.OpenAIBaseURL + "/v1/files/" + fileID
|
url := s.Factory.Config.OpenAIBaseURL + "/v1/files/" + fileID
|
||||||
if strings.HasPrefix(url, "/v1/files/") {
|
if strings.HasPrefix(url, "/v1/files/") {
|
||||||
url = "https://api.openai.com/v1/files/" + fileID
|
url = "https://api.openaiproxy.com/v1/files/" + fileID
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest("DELETE", url, nil)
|
req, err := http.NewRequest("DELETE", url, nil)
|
@ -6,7 +6,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
"github.com/freeflowuniverse/herolauncher/pkg/openrpcmanager"
|
"github.com/freeflowuniverse/heroagent/pkg/openrpcmanager"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Common errors
|
// Common errors
|
||||||
|
@ -7,7 +7,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/freeflowuniverse/herolauncher/pkg/openrpcmanager"
|
"github.com/freeflowuniverse/heroagent/pkg/openrpcmanager"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MockClient implements the Client interface for testing
|
// MockClient implements the Client interface for testing
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
"os/signal"
|
"os/signal"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/freeflowuniverse/herolauncher/pkg/openrpcmanager"
|
"github.com/freeflowuniverse/heroagent/pkg/openrpcmanager"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -91,20 +91,20 @@ func (b *Builder) Build() error {
|
|||||||
return fmt.Errorf("failed to ensure Go is installed: %w", err)
|
return fmt.Errorf("failed to ensure Go is installed: %w", err)
|
||||||
}
|
}
|
||||||
fmt.Printf("Using Go executable from: %s\n", goPath)
|
fmt.Printf("Using Go executable from: %s\n", goPath)
|
||||||
|
|
||||||
// Pass the Go path explicitly to the GoSPBuilder
|
// Pass the Go path explicitly to the GoSPBuilder
|
||||||
b.GoSPBuilder.WithGoPath(goPath)
|
b.GoSPBuilder.WithGoPath(goPath)
|
||||||
|
|
||||||
// For the Go stored procedure, we'll create and execute a shell script directly
|
// For the Go stored procedure, we'll create and execute a shell script directly
|
||||||
// to ensure all environment variables are properly set
|
// to ensure all environment variables are properly set
|
||||||
fmt.Println("Building Go stored procedure via shell script...")
|
fmt.Println("Building Go stored procedure via shell script...")
|
||||||
|
|
||||||
tempDir, err := os.MkdirTemp("", "gosp-build-")
|
tempDir, err := os.MkdirTemp("", "gosp-build-")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create temp directory: %w", err)
|
return fmt.Errorf("failed to create temp directory: %w", err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(tempDir)
|
defer os.RemoveAll(tempDir)
|
||||||
|
|
||||||
// Create the Go source file in the temp directory
|
// Create the Go source file in the temp directory
|
||||||
libPath := filepath.Join(tempDir, "gosp.go")
|
libPath := filepath.Join(tempDir, "gosp.go")
|
||||||
libSrc := `
|
libSrc := `
|
||||||
@ -122,7 +122,7 @@ func main() {}
|
|||||||
if err := os.WriteFile(libPath, []byte(libSrc), 0644); err != nil {
|
if err := os.WriteFile(libPath, []byte(libSrc), 0644); err != nil {
|
||||||
return fmt.Errorf("failed to write Go source file: %w", err)
|
return fmt.Errorf("failed to write Go source file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a shell script to build the Go stored procedure
|
// Create a shell script to build the Go stored procedure
|
||||||
buildScript := filepath.Join(tempDir, "build.sh")
|
buildScript := filepath.Join(tempDir, "build.sh")
|
||||||
buildScriptContent := fmt.Sprintf(`#!/bin/sh
|
buildScriptContent := fmt.Sprintf(`#!/bin/sh
|
||||||
@ -147,11 +147,11 @@ go build -buildmode=c-shared -o %s/lib/libgosp.so %s
|
|||||||
echo "Go stored procedure built successfully!"
|
echo "Go stored procedure built successfully!"
|
||||||
`,
|
`,
|
||||||
libPath, b.InstallPrefix, b.InstallPrefix, b.InstallPrefix, libPath, b.InstallPrefix, libPath)
|
libPath, b.InstallPrefix, b.InstallPrefix, b.InstallPrefix, libPath, b.InstallPrefix, libPath)
|
||||||
|
|
||||||
if err := os.WriteFile(buildScript, []byte(buildScriptContent), 0755); err != nil {
|
if err := os.WriteFile(buildScript, []byte(buildScriptContent), 0755); err != nil {
|
||||||
return fmt.Errorf("failed to write build script: %w", err)
|
return fmt.Errorf("failed to write build script: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute the build script
|
// Execute the build script
|
||||||
cmd := exec.Command("/bin/sh", buildScript)
|
cmd := exec.Command("/bin/sh", buildScript)
|
||||||
cmd.Stdout = os.Stdout
|
cmd.Stdout = os.Stdout
|
Loading…
Reference in New Issue
Block a user