Compare commits

...

4 Commits

Author SHA1 Message Date
e60b9f62f1 ... 2025-05-24 07:24:17 +04:00
5d241e9ade ... 2025-05-24 07:09:15 +04:00
b8c8da9e31 ... 2025-05-24 06:56:02 +04:00
55a05a5571 ... 2025-05-23 22:12:17 +04:00
18 changed files with 1606 additions and 125 deletions

View File

@ -5,49 +5,72 @@ import (
"fmt"
"log"
"os"
"strconv"
"git.ourworld.tf/herocode/heroagent/pkg/heroagent"
"git.ourworld.tf/herocode/heroagent/pkg/servers/ui" // Import the new UI package
"git.ourworld.tf/herocode/heroagent/pkg/servers/heroagent"
)
func main() {
// Parse command-line flags
portFlag := flag.String("port", "", "Port to run the HeroLauncher on")
uiPortFlag := flag.String("uiport", "3000", "Port to run the UI server on") // New flag for UI port
redisPortFlag := flag.Int("redisport", 6378, "Port to run the Redis server on")
webdavPortFlag := flag.Int("webdavport", 9001, "Port to run the WebDAV server on")
uiPortFlag := flag.Int("uiport", 9002, "Port to run the UI server on")
// Flags to enable/disable specific servers
enableRedisFlag := flag.Bool("redis", true, "Enable Redis server")
enableWebDAVFlag := flag.Bool("webdav", true, "Enable WebDAV server")
enableUIFlag := flag.Bool("ui", true, "Enable UI server")
flag.Parse()
// Initialize HeroLauncher with default configuration
// Initialize ServerFactory with default configuration
config := heroagent.DefaultConfig()
// Override with command-line flags if provided
if *portFlag != "" {
config.Port = *portFlag
}
config.Redis.TCPPort = *redisPortFlag
config.WebDAV.Config.TCPPort = *webdavPortFlag
config.UI.Port = strconv.Itoa(*uiPortFlag)
// Set server enable flags
config.EnableRedis = *enableRedisFlag
config.EnableWebDAV = *enableWebDAVFlag
config.EnableUI = *enableUIFlag
// Override with environment variables if provided
if port := os.Getenv("PORT"); port != "" {
config.Port = port
if redisPortStr := os.Getenv("REDIS_PORT"); redisPortStr != "" {
if port, err := strconv.Atoi(redisPortStr); err == nil {
config.Redis.TCPPort = port
}
}
if webdavPortStr := os.Getenv("WEBDAV_PORT"); webdavPortStr != "" {
if port, err := strconv.Atoi(webdavPortStr); err == nil {
config.WebDAV.Config.TCPPort = port
}
}
if uiPort := os.Getenv("UI_PORT"); uiPort != "" {
config.UI.Port = uiPort
}
// Create HeroLauncher instance
launcher := heroagent.New(config)
// Create ServerFactory instance
factory := heroagent.New(config)
// Initialize and start the UI server in a new goroutine
go func() {
uiApp := ui.NewApp(ui.AppConfig{}) // Assuming default AppConfig is fine
uiPort := *uiPortFlag
if envUiPort := os.Getenv("UIPORT"); envUiPort != "" {
uiPort = envUiPort
}
fmt.Printf("Starting UI server on port %s...\n", uiPort)
if err := uiApp.Listen(":" + uiPort); err != nil {
log.Printf("Failed to start UI server: %v", err) // Use Printf to not exit main app
}
}()
// Start the main HeroLauncher server
fmt.Printf("Starting HeroLauncher on port %s...\n", config.Port)
if err := launcher.Start(); err != nil {
log.Fatalf("Failed to start HeroLauncher: %v", err)
// Start all servers
fmt.Println("Starting HeroAgent servers...")
if err := factory.Start(); err != nil {
log.Fatalf("Failed to start servers: %v", err)
}
fmt.Printf("All servers started successfully:\n")
if config.EnableRedis {
fmt.Printf("- Redis server running on port %d\n", config.Redis.TCPPort)
}
if config.EnableWebDAV {
fmt.Printf("- WebDAV server running on port %d\n", config.WebDAV.Config.TCPPort)
}
if config.EnableUI {
fmt.Printf("- UI server running on port %s\n", config.UI.Port)
}
// Keep the main goroutine running
select {}
}

83
cmd/jobtest/main.go Normal file
View File

@ -0,0 +1,83 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"git.ourworld.tf/herocode/heroagent/pkg/servers/heroagent"
)
func main() {
log.Println("Starting job management test...")
// Create a configuration for the server factory
config := heroagent.DefaultConfig()
// Customize configuration if needed
config.Redis.TCPPort = 6379
config.Redis.UnixSocketPath = "" // Use TCP connection only
config.Jobs.OurDBPath = "/tmp/jobsdb"
config.Jobs.WorkerCount = 3
config.Jobs.QueuePollInterval = 200 * time.Millisecond
// Only enable Redis and Jobs for this test
config.EnableRedis = true
config.EnableWebDAV = false
config.EnableUI = false
config.EnableJobs = true
// Create server factory
factory := heroagent.New(config)
// Start servers
if err := factory.Start(); err != nil {
log.Fatalf("Failed to start servers: %v", err)
}
// Get job manager
jobManager := factory.GetJobManager()
if jobManager == nil {
log.Fatalf("Job manager not initialized")
}
// Create some test jobs
createTestJobs(jobManager)
// Wait for interrupt signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
// Stop servers
if err := factory.Stop(); err != nil {
log.Fatalf("Failed to stop servers: %v", err)
}
log.Println("Job management test completed")
}
func createTestJobs(jobManager *heroagent.JobManager) {
// Create a few test jobs with different topics
topics := []string{"email", "notification", "report"}
for i := 0; i < 5; i++ {
for _, topic := range topics {
// Create job
params := fmt.Sprintf(`{"action": "process", "data": "test data %d for %s"}`, i, topic)
job, err := jobManager.CreateJob(topic, params)
if err != nil {
log.Printf("Failed to create job: %v", err)
continue
}
log.Printf("Created job %d with topic %s", job.JobID, job.Topic)
}
// Sleep briefly between batches
time.Sleep(500 * time.Millisecond)
}
}

1
go.mod
View File

@ -5,6 +5,7 @@ go 1.23.0
toolchain go1.23.6
require (
github.com/go-redis/redis/v8 v8.11.5
github.com/gofiber/fiber/v2 v2.52.8
github.com/gofiber/template/jet/v2 v2.1.12
github.com/mholt/archiver/v3 v3.5.1

14
go.sum
View File

@ -54,11 +54,15 @@ github.com/dsnet/compress v0.0.2-0.20230904184137-39efe44ab707/go.mod h1:qssHWj6
github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdfkVLjJ8T6VcRQv3SXugXy999NBtR9aFY=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/gofiber/fiber/v2 v2.52.8 h1:xl4jJQ0BV5EJTA2aWiKw/VddRpHrKeZLF0QPUxqn0x4=
github.com/gofiber/fiber/v2 v2.52.8/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
github.com/gofiber/template v1.8.3 h1:hzHdvMwMo/T2kouz2pPCA0zGiLCeMnoGsQZBTSYgZxc=
@ -141,6 +145,12 @@ github.com/nwaples/rardecode v1.1.0 h1:vSxaY8vQhOcVr4mm5e8XllHWTiM4JF507A0Katqw7
github.com/nwaples/rardecode v1.1.0/go.mod h1:5DzqNKiOdpKKBH87u8VlvAnPZMXcGRhxWkRpHbbfGS0=
github.com/nwaples/rardecode/v2 v2.0.0-beta.4 h1:sdiJxQdPjECn2lh9nLFFhgLCf+0ulDU5rODbtERTlUY=
github.com/nwaples/rardecode/v2 v2.0.0-beta.4/go.mod h1:yntwv/HfMc/Hbvtq9I19D1n58te3h6KsqCf3GxyfBGY=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/pierrec/lz4/v4 v4.1.2/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
@ -376,7 +386,11 @@ google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

BIN
heroagent Executable file

Binary file not shown.

106
pkg/herojobs/factory.go Normal file
View File

@ -0,0 +1,106 @@
package herojobs
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
const (
defaultRedisURL = "redis://localhost:6379/0"
)
// Factory manages job-related operations, including Redis connectivity and watchdog.
type Factory struct {
redisClient *redis.Client
// Add other fields as needed, e.g., for watchdog
}
// NewFactory creates a new Factory instance.
// It takes a redisURL string; if empty, it defaults to defaultRedisURL.
func NewFactory(redisURL string) (*Factory, error) {
if redisURL == "" {
redisURL = defaultRedisURL
}
opt, err := redis.ParseURL(redisURL)
if err != nil {
return nil, fmt.Errorf("invalid redis URL: %w", err)
}
client := redis.NewClient(opt)
// Check connection to Redis
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = client.Ping(ctx).Result()
if err != nil {
return nil, fmt.Errorf("failed to connect to redis at %s: %w", redisURL, err)
}
fmt.Printf("Successfully connected to Redis at %s\n", redisURL)
factory := &Factory{
redisClient: client,
}
// TODO: Properly start the watchdog here
fmt.Println("Watchdog placeholder: Watchdog would be started here.")
return factory, nil
}
// Close closes the Redis client connection.
func (f *Factory) Close() error {
if f.redisClient != nil {
return f.redisClient.Close()
}
return nil
}
// GetJob retrieves a job by its ID from Redis.
func (f *Factory) GetJob(ctx context.Context, jobID string) (string, error) {
// Example: Assuming jobs are stored as string values
val, err := f.redisClient.Get(ctx, jobID).Result()
if err == redis.Nil {
return "", fmt.Errorf("job with ID %s not found", jobID)
} else if err != nil {
return "", fmt.Errorf("failed to get job %s from redis: %w", jobID, err)
}
return val, nil
}
// ListJobs lists all job IDs (or a subset) from Redis.
// This is a simplified example; real-world job listing might involve more complex data structures.
func (f *Factory) ListJobs(ctx context.Context) ([]string, error) {
// Example: List all keys that might represent jobs.
// In a real application, you'd likely use specific Redis data structures (e.g., sorted sets, hashes)
// to manage jobs more efficiently and avoid scanning all keys.
keys, err := f.redisClient.Keys(ctx, "job:*").Result() // Assuming job keys are prefixed with "job:"
if err != nil {
return nil, fmt.Errorf("failed to list jobs from redis: %w", err)
}
return keys, nil
}
// AddJob adds a new job to Redis.
func (f *Factory) AddJob(ctx context.Context, jobID string, jobData string) error {
// Example: Store job data as a string
err := f.redisClient.Set(ctx, jobID, jobData, 0).Err() // 0 for no expiration
if err != nil {
return fmt.Errorf("failed to add job %s to redis: %w", jobID, err)
}
return nil
}
// DeleteJob deletes a job from Redis.
func (f *Factory) DeleteJob(ctx context.Context, jobID string) error {
_, err := f.redisClient.Del(ctx, jobID).Result()
if err != nil {
return fmt.Errorf("failed to delete job %s from redis: %w", jobID, err)
}
return nil
}

View File

@ -0,0 +1,208 @@
# HeroAgent Server Factory
The HeroAgent Server Factory is a comprehensive server management system that integrates multiple services:
- Redis Server
- WebDAV Server
- UI Server
- Job Management System
## Overview
The server factory provides a unified interface for starting, managing, and stopping these services. Each service can be enabled or disabled independently through configuration.
## Job Management System
The job management system provides a robust solution for handling asynchronous tasks with persistence and reliability. It combines the strengths of OurDB for persistent storage and Redis for active job queuing.
### Architecture
The job system follows a specific flow:
1. **Job Creation**:
- When a job is created, it's stored in both OurDB and Redis
- OurDB provides persistent storage with history tracking
- Redis stores the job data and adds the job ID to a queue for processing
- Each job is stored in Redis using a key pattern: `herojobs:<topic>:<jobID>`
- Each job ID is added to a queue using a key pattern: `heroqueue:<topic>`
2. **Job Processing**:
- Workers continuously poll Redis queues for new jobs
- When a job is found, it's fetched from Redis and updated to "active" status
- The updated job is stored in both OurDB and Redis
- The job is processed based on its parameters
3. **Job Completion**:
- When a job completes (success or error), it's updated in OurDB
- The job is removed from Redis to keep only active jobs in memory
- This approach ensures efficient memory usage while maintaining a complete history
### Data Flow Diagram
```
Job Creation:
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Client │────▶│ OurDB │ │ Redis │
└─────────┘ └────┬────┘ └────┬────┘
│ │
│ Store Job │ Store Job
│ │
▼ ▼
┌─────────┐ ┌─────────┐
│ Job Data│ │ Job Data│
└─────────┘ └─────────┘
│ Add to Queue
┌─────────┐
│ Queue │
└─────────┘
Job Processing:
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Worker │────▶│ Redis │────▶│ OurDB │
└─────────┘ └────┬────┘ └────┬────┘
│ │
│ Fetch Job │ Update Job
│ from Queue │
▼ ▼
┌─────────┐ ┌─────────┐
│ Job Data│ │ Job Data│
└─────────┘ └─────────┘
│ Process Job
┌─────────┐
│ Result │
└─────────┘
Job Completion:
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Worker │────▶│ OurDB │ │ Redis │
└─────────┘ └────┬────┘ └────┬────┘
│ │
│ Update Job │ Remove Job
│ │
▼ ▼
┌─────────┐ ┌─────────┐
│ Job Data│ │ ✓ │
└─────────┘ └─────────┘
```
### Components
- **JobManager**: Coordinates job operations between OurDB and Redis
- **RedisJobManager**: Handles Redis-specific operations for jobs
- **JobWorker**: Processes jobs from Redis queues
- **OurDB**: Provides persistent storage for all jobs
### Job States
Jobs can be in one of four states:
- **New**: Job has been created but not yet processed
- **Active**: Job is currently being processed
- **Done**: Job has completed successfully
- **Error**: Job encountered an error during processing
## Usage
### Configuration
```go
config := heroagent.DefaultConfig()
// Configure Redis
config.Redis.TCPPort = 6379
config.Redis.UnixSocketPath = "/tmp/redis.sock"
// Configure job system
config.Jobs.OurDBPath = "./data/jobsdb"
config.Jobs.WorkerCount = 5
config.Jobs.QueuePollInterval = 100 * time.Millisecond
// Enable/disable services
config.EnableRedis = true
config.EnableWebDAV = true
config.EnableUI = true
config.EnableJobs = true
```
### Starting the Server Factory
```go
// Create server factory
factory := heroagent.New(config)
// Start servers
if err := factory.Start(); err != nil {
log.Fatalf("Failed to start servers: %v", err)
}
```
### Creating and Managing Jobs
```go
// Get job manager
jobManager := factory.GetJobManager()
// Create a job
job, err := jobManager.CreateJob("email", `{"to": "user@example.com", "subject": "Hello"}`)
if err != nil {
log.Fatalf("Failed to create job: %v", err)
}
// Get job status
job, err = jobManager.GetJob(job.JobID)
if err != nil {
log.Fatalf("Failed to get job: %v", err)
}
// Update job status
err = jobManager.UpdateJobStatus(job.JobID, heroagent.JobStatusActive)
if err != nil {
log.Fatalf("Failed to update job status: %v", err)
}
// Complete a job
err = jobManager.CompleteJob(job.JobID, "Job completed successfully")
if err != nil {
log.Fatalf("Failed to complete job: %v", err)
}
// Mark a job as failed
err = jobManager.FailJob(job.JobID, "Job failed due to network error")
if err != nil {
log.Fatalf("Failed to mark job as failed: %v", err)
}
```
### Stopping the Server Factory
```go
// Stop servers
if err := factory.Stop(); err != nil {
log.Fatalf("Failed to stop servers: %v", err)
}
```
## Implementation Details
### OurDB Integration
OurDB provides persistent storage for all jobs, including their complete history. It uses an auto-incrementing ID system to assign unique IDs to jobs.
### Redis Integration
Redis is used for active job queuing and temporary storage. Jobs are stored in Redis using the following key patterns:
- Queue keys: `heroqueue:<topic>`
- Job storage keys: `herojobs:<topic>:<jobID>`
When a job reaches a terminal state (done or error), it's removed from Redis but remains in OurDB for historical reference.
### Worker Pool
The job system uses a configurable worker pool to process jobs concurrently. Each worker polls Redis queues for new jobs and processes them independently.

View File

@ -0,0 +1,85 @@
package heroagent
import (
"time"
"git.ourworld.tf/herocode/heroagent/pkg/servers/ui"
"git.ourworld.tf/herocode/heroagent/pkg/servers/webdavserver"
)
// Config holds the configuration for the HeroAgent server factory
type Config struct {
// Redis server configuration
Redis RedisConfig
// WebDAV server configuration
WebDAV WebDAVConfig
// UI server configuration
UI UIConfig
// Job management configuration
Jobs JobsConfig
// Enable/disable specific servers
EnableRedis bool
EnableWebDAV bool
EnableUI bool
EnableJobs bool
}
// RedisConfig holds the configuration for the Redis server
type RedisConfig struct {
TCPPort int
UnixSocketPath string
}
// WebDAVConfig holds the configuration for the WebDAV server
type WebDAVConfig struct {
// Use webdavserver.Config directly
Config webdavserver.Config
}
// UIConfig holds the configuration for the UI server
type UIConfig struct {
// UI server configuration
Port string
// Any additional UI-specific configuration
AppConfig ui.AppConfig
}
// JobsConfig holds the configuration for the job management system
type JobsConfig struct {
// OurDB configuration
OurDBPath string
// Job processing configuration
WorkerCount int
QueuePollInterval time.Duration
}
// DefaultConfig returns the default configuration for the HeroAgent
func DefaultConfig() Config {
return Config{
Redis: RedisConfig{
TCPPort: 6379,
UnixSocketPath: "", // Empty string means use TCP only
},
WebDAV: WebDAVConfig{
Config: webdavserver.DefaultConfig(),
},
UI: UIConfig{
Port: "9001", // Port is a string in UIConfig
AppConfig: ui.AppConfig{},
},
Jobs: JobsConfig{
OurDBPath: "./data/ourdb",
WorkerCount: 5,
QueuePollInterval: 100 * time.Millisecond,
},
EnableRedis: true,
EnableWebDAV: true,
EnableUI: true,
EnableJobs: true,
}
}

View File

@ -0,0 +1,226 @@
package heroagent
import (
"fmt"
"log"
"sync"
"git.ourworld.tf/herocode/heroagent/pkg/servers/redisserver"
"git.ourworld.tf/herocode/heroagent/pkg/servers/ui"
"git.ourworld.tf/herocode/heroagent/pkg/servers/webdavserver"
"github.com/gofiber/fiber/v2"
)
// ServerFactory manages the lifecycle of all servers
type ServerFactory struct {
config Config
// Server instances
redisServer *redisserver.Server
webdavServer *webdavserver.Server
uiApp *AppInstance
jobManager *JobManager
// Control channels
stopCh chan struct{}
wg sync.WaitGroup
}
// AppInstance wraps the UI app and its listening status
type AppInstance struct {
App *fiber.App
Port string
}
// New creates a new ServerFactory with the given configuration
func New(config Config) *ServerFactory {
return &ServerFactory{
config: config,
stopCh: make(chan struct{}),
}
}
// Start initializes and starts all enabled servers
func (f *ServerFactory) Start() error {
log.Println("Starting HeroAgent ServerFactory...")
// Start Redis server if enabled
if f.config.EnableRedis {
if err := f.startRedisServer(); err != nil {
return fmt.Errorf("failed to start Redis server: %w", err)
}
}
// Start WebDAV server if enabled
if f.config.EnableWebDAV {
if err := f.startWebDAVServer(); err != nil {
return fmt.Errorf("failed to start WebDAV server: %w", err)
}
}
// Start UI server if enabled
if f.config.EnableUI {
if err := f.startUIServer(); err != nil {
return fmt.Errorf("failed to start UI server: %w", err)
}
}
// Start job manager if enabled
if f.config.EnableJobs {
if err := f.startJobManager(); err != nil {
return fmt.Errorf("failed to start job manager: %w", err)
}
}
log.Println("All servers started successfully")
return nil
}
// Stop gracefully stops all running servers
func (f *ServerFactory) Stop() error {
log.Println("Stopping all servers...")
// Signal all goroutines to stop
close(f.stopCh)
// Stop WebDAV server if it's running
if f.webdavServer != nil {
if err := f.webdavServer.Stop(); err != nil {
log.Printf("Error stopping WebDAV server: %v", err)
}
}
// Stop job manager if it's running
if f.jobManager != nil {
if err := f.jobManager.Stop(); err != nil {
log.Printf("Error stopping job manager: %v", err)
}
}
// Wait for all goroutines to finish
f.wg.Wait()
log.Println("All servers stopped")
return nil
}
// startRedisServer initializes and starts the Redis server
func (f *ServerFactory) startRedisServer() error {
log.Println("Starting Redis server...")
// Create Redis server configuration
redisConfig := redisserver.ServerConfig{
TCPPort: f.config.Redis.TCPPort,
UnixSocketPath: f.config.Redis.UnixSocketPath,
}
// Create and start Redis server
f.redisServer = redisserver.NewServer(redisConfig)
log.Printf("Redis server started on port %d and socket %s",
redisConfig.TCPPort, redisConfig.UnixSocketPath)
return nil
}
// startWebDAVServer initializes and starts the WebDAV server
func (f *ServerFactory) startWebDAVServer() error {
log.Println("Starting WebDAV server...")
// Create WebDAV server
webdavServer, err := webdavserver.NewServer(f.config.WebDAV.Config)
if err != nil {
return fmt.Errorf("failed to create WebDAV server: %w", err)
}
f.webdavServer = webdavServer
// Start WebDAV server in a goroutine
f.wg.Add(1)
go func() {
defer f.wg.Done()
// Start the server
if err := webdavServer.Start(); err != nil {
log.Printf("WebDAV server error: %v", err)
}
}()
log.Printf("WebDAV server started on port %d", f.config.WebDAV.Config.TCPPort)
return nil
}
// startUIServer initializes and starts the UI server
func (f *ServerFactory) startUIServer() error {
log.Println("Starting UI server...")
// Create UI app
uiApp := ui.NewApp(f.config.UI.AppConfig)
// Store UI app instance
f.uiApp = &AppInstance{
App: uiApp,
Port: f.config.UI.Port,
}
// Start UI server in a goroutine
f.wg.Add(1)
go func() {
defer f.wg.Done()
// Start the server
addr := ":" + f.config.UI.Port
log.Printf("UI server listening on %s", addr)
if err := uiApp.Listen(addr); err != nil {
log.Printf("UI server error: %v", err)
}
}()
return nil
}
// GetRedisServer returns the Redis server instance
func (f *ServerFactory) GetRedisServer() *redisserver.Server {
return f.redisServer
}
// GetWebDAVServer returns the WebDAV server instance
func (f *ServerFactory) GetWebDAVServer() *webdavserver.Server {
return f.webdavServer
}
// GetUIApp returns the UI app instance
func (f *ServerFactory) GetUIApp() *AppInstance {
return f.uiApp
}
// startJobManager initializes and starts the job manager
func (f *ServerFactory) startJobManager() error {
log.Println("Starting job manager...")
// Create Redis connection for job manager
redisConn := &RedisConnection{
TCPPort: f.config.Redis.TCPPort,
UnixSocketPath: f.config.Redis.UnixSocketPath,
}
// Create job manager
jobManager, err := NewJobManager(f.config.Jobs, redisConn)
if err != nil {
return fmt.Errorf("failed to create job manager: %w", err)
}
f.jobManager = jobManager
// Start job manager
if err := jobManager.Start(); err != nil {
return fmt.Errorf("failed to start job manager: %w", err)
}
log.Println("Job manager started")
return nil
}
// GetJobManager returns the job manager instance
func (f *ServerFactory) GetJobManager() *JobManager {
return f.jobManager
}

View File

@ -0,0 +1,351 @@
package heroagent
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"
"git.ourworld.tf/herocode/heroagent/pkg/data/ourdb"
)
// JobManager handles job management between OurDB and Redis
type JobManager struct {
config JobsConfig
ourDB *ourdb.OurDB
redisConn *RedisConnection
redisMgr *RedisJobManager
workers []*JobWorker
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// NewJobManager creates a new job manager
func NewJobManager(config JobsConfig, redisConn *RedisConnection) (*JobManager, error) {
// Create OurDB directory if it doesn't exist
if err := os.MkdirAll(config.OurDBPath, 0755); err != nil {
return nil, fmt.Errorf("failed to create OurDB directory: %w", err)
}
// Initialize OurDB
ourDBConfig := ourdb.DefaultConfig()
ourDBConfig.Path = config.OurDBPath
ourDBConfig.IncrementalMode = true
db, err := ourdb.New(ourDBConfig)
if err != nil {
return nil, fmt.Errorf("failed to create OurDB: %w", err)
}
// Initialize Redis job manager
redisMgr, err := NewRedisJobManager(redisConn.TCPPort, redisConn.UnixSocketPath)
if err != nil {
// Close OurDB before returning error
if closeErr := db.Close(); closeErr != nil {
log.Printf("Warning: failed to close OurDB: %v", closeErr)
}
return nil, fmt.Errorf("failed to create Redis job manager: %w", err)
}
// Create context with cancel
ctx, cancel := context.WithCancel(context.Background())
// Create job manager
jobMgr := &JobManager{
config: config,
ourDB: db,
redisConn: redisConn,
redisMgr: redisMgr,
ctx: ctx,
cancel: cancel,
}
return jobMgr, nil
}
// Start starts the job manager
func (jm *JobManager) Start() error {
log.Println("Starting job manager...")
// Start workers
jm.workers = make([]*JobWorker, jm.config.WorkerCount)
for i := 0; i < jm.config.WorkerCount; i++ {
worker := &JobWorker{
id: i,
jobMgr: jm,
ctx: jm.ctx,
wg: &jm.wg,
}
jm.workers[i] = worker
jm.startWorker(worker)
}
log.Printf("Job manager started with %d workers", jm.config.WorkerCount)
return nil
}
// Stop stops the job manager
func (jm *JobManager) Stop() error {
log.Println("Stopping job manager...")
// Signal all workers to stop
jm.cancel()
// Wait for all workers to finish
jm.wg.Wait()
// Close Redis job manager
if jm.redisMgr != nil {
if err := jm.redisMgr.Close(); err != nil {
log.Printf("Warning: failed to close Redis job manager: %v", err)
}
}
// Close OurDB
if err := jm.ourDB.Close(); err != nil {
// Log the error but don't fail the shutdown
log.Printf("Warning: failed to close OurDB: %v", err)
}
log.Println("Job manager stopped")
return nil
}
// startWorker starts a worker
func (jm *JobManager) startWorker(worker *JobWorker) {
jm.wg.Add(1)
go func() {
defer jm.wg.Done()
worker.run()
}()
}
// CreateJob creates a new job
func (jm *JobManager) CreateJob(topic, params string) (*Job, error) {
// Create new job
job := &Job{
Topic: topic,
Params: params,
Status: JobStatusNew,
TimeScheduled: time.Now().Unix(),
}
// Store job in OurDB
jobData, err := json.Marshal(job)
if err != nil {
return nil, fmt.Errorf("failed to marshal job: %w", err)
}
// Add job to OurDB with auto-incremented ID
id, err := jm.ourDB.Set(ourdb.OurDBSetArgs{
Data: jobData,
})
if err != nil {
return nil, fmt.Errorf("failed to store job in OurDB: %w", err)
}
// Update job with assigned ID
job.JobID = id
// Store job in Redis and add to queue
if err := jm.redisMgr.EnqueueJob(job); err != nil {
return nil, fmt.Errorf("failed to store job in Redis: %w", err)
}
log.Printf("Job %d created and stored in both OurDB and Redis", job.JobID)
return job, nil
}
// GetJob retrieves a job by ID
func (jm *JobManager) GetJob(jobID uint32) (*Job, error) {
// Get job from OurDB
jobData, err := jm.ourDB.Get(jobID)
if err != nil {
return nil, fmt.Errorf("failed to get job from OurDB: %w", err)
}
// Parse job data
job := &Job{}
if err := json.Unmarshal(jobData, job); err != nil {
return nil, fmt.Errorf("failed to unmarshal job data: %w", err)
}
return job, nil
}
// UpdateJobStatus updates the status of a job
func (jm *JobManager) UpdateJobStatus(jobID uint32, status JobStatus) error {
// Get job from OurDB
job, err := jm.GetJob(jobID)
if err != nil {
return err
}
// Update status
job.Status = status
// Update timestamps based on status
now := time.Now().Unix()
if status == JobStatusActive && job.TimeStart == 0 {
job.TimeStart = now
} else if (status == JobStatusDone || status == JobStatusError) && job.TimeEnd == 0 {
job.TimeEnd = now
}
// Store updated job in OurDB
jobData, err := json.Marshal(job)
if err != nil {
return fmt.Errorf("failed to marshal job: %w", err)
}
// Update job in OurDB
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
ID: &jobID,
Data: jobData,
})
if err != nil {
return fmt.Errorf("failed to update job in OurDB: %w", err)
}
// If job is done or has error, remove from Redis
if status == JobStatusDone || status == JobStatusError {
if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil {
log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err)
}
} else {
// Otherwise, update in Redis
if err := jm.redisMgr.UpdateJobStatus(job); err != nil {
log.Printf("Warning: failed to update job %d in Redis: %v", jobID, err)
}
}
return nil
}
// CompleteJob marks a job as completed
func (jm *JobManager) CompleteJob(jobID uint32, result string) error {
// Get job from OurDB
job, err := jm.GetJob(jobID)
if err != nil {
return err
}
// Update job
job.Status = JobStatusDone
job.TimeEnd = time.Now().Unix()
job.Result = result
// Store updated job in OurDB
jobData, err := json.Marshal(job)
if err != nil {
return fmt.Errorf("failed to marshal job: %w", err)
}
// Update job in OurDB
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
ID: &jobID,
Data: jobData,
})
if err != nil {
return fmt.Errorf("failed to update job in OurDB: %w", err)
}
// Remove from Redis
if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil {
log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err)
}
log.Printf("Job %d completed and removed from Redis", jobID)
return nil
}
// FailJob marks a job as failed
func (jm *JobManager) FailJob(jobID uint32, errorMsg string) error {
// Get job from OurDB
job, err := jm.GetJob(jobID)
if err != nil {
return err
}
// Update job
job.Status = JobStatusError
job.TimeEnd = time.Now().Unix()
job.Error = errorMsg
// Store updated job in OurDB
jobData, err := json.Marshal(job)
if err != nil {
return fmt.Errorf("failed to marshal job: %w", err)
}
// Update job in OurDB
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
ID: &jobID,
Data: jobData,
})
if err != nil {
return fmt.Errorf("failed to update job in OurDB: %w", err)
}
// Remove from Redis
if err := jm.redisMgr.DeleteJob(jobID, job.Topic); err != nil {
log.Printf("Warning: failed to remove job %d from Redis: %v", jobID, err)
}
log.Printf("Job %d failed and removed from Redis", jobID)
return nil
}
// updateJobInBothStores updates a job in both OurDB and Redis
func (jm *JobManager) updateJobInBothStores(job *Job) error {
// Store job in OurDB
jobData, err := json.Marshal(job)
if err != nil {
return fmt.Errorf("failed to marshal job: %w", err)
}
// Update job in OurDB
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
ID: &job.JobID,
Data: jobData,
})
if err != nil {
return fmt.Errorf("failed to update job in OurDB: %w", err)
}
// Update job in Redis
if err := jm.redisMgr.UpdateJobStatus(job); err != nil {
return fmt.Errorf("failed to update job in Redis: %w", err)
}
return nil
}
// completeJobProcessing updates a completed job in OurDB and removes it from Redis
func (jm *JobManager) completeJobProcessing(job *Job) error {
// Store job in OurDB
jobData, err := json.Marshal(job)
if err != nil {
return fmt.Errorf("failed to marshal job: %w", err)
}
// Update job in OurDB
_, err = jm.ourDB.Set(ourdb.OurDBSetArgs{
ID: &job.JobID,
Data: jobData,
})
if err != nil {
return fmt.Errorf("failed to update job in OurDB: %w", err)
}
// Remove from Redis
if err := jm.redisMgr.DeleteJob(job.JobID, job.Topic); err != nil {
return fmt.Errorf("failed to remove job from Redis: %w", err)
}
return nil
}

View File

@ -0,0 +1,131 @@
package heroagent
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// JobStatus represents the status of a job
type JobStatus string
const (
// JobStatusNew indicates a newly created job
JobStatusNew JobStatus = "new"
// JobStatusActive indicates a job that is currently being processed
JobStatusActive JobStatus = "active"
// JobStatusError indicates a job that encountered an error
JobStatusError JobStatus = "error"
// JobStatusDone indicates a job that has been completed successfully
JobStatusDone JobStatus = "done"
)
// Job represents a job to be processed
type Job struct {
JobID uint32 `json:"jobid"`
Topic string `json:"topic"`
Params string `json:"params"`
Status JobStatus `json:"status"`
TimeScheduled int64 `json:"time_scheduled"`
TimeStart int64 `json:"time_start"`
TimeEnd int64 `json:"time_end"`
Error string `json:"error"`
Result string `json:"result"`
}
// RedisConnection wraps Redis connection details
type RedisConnection struct {
TCPPort int
UnixSocketPath string
}
// JobWorker represents a worker that processes jobs
type JobWorker struct {
id int
jobMgr *JobManager
ctx context.Context
wg *sync.WaitGroup
}
// run is the main worker loop
func (w *JobWorker) run() {
log.Printf("Worker %d started", w.id)
ticker := time.NewTicker(w.jobMgr.config.QueuePollInterval)
defer ticker.Stop()
for {
select {
case <-w.ctx.Done():
log.Printf("Worker %d stopping", w.id)
return
case <-ticker.C:
// Check for jobs in Redis
if err := w.checkForJobs(); err != nil {
log.Printf("Worker %d error checking for jobs: %v", w.id, err)
}
}
}
}
// checkForJobs checks for jobs in Redis
func (w *JobWorker) checkForJobs() error {
// Get list of queues
queues, err := w.jobMgr.redisMgr.ListQueues()
if err != nil {
return fmt.Errorf("failed to list queues: %w", err)
}
// Check each queue for jobs
for _, topic := range queues {
// Try to fetch a job from the queue
job, err := w.jobMgr.redisMgr.FetchNextJob(topic)
if err != nil {
// If queue is empty, continue to next queue
continue
}
// Process the job
if err := w.processJob(job); err != nil {
log.Printf("Error processing job %d: %v", job.JobID, err)
}
// Only process one job at a time
return nil
}
return nil
}
// processJob processes a job
func (w *JobWorker) processJob(job *Job) error {
log.Printf("Worker %d processing job %d", w.id, job.JobID)
// Update job status to active
job.Status = JobStatusActive
job.TimeStart = time.Now().Unix()
// Update job in both OurDB and Redis
if err := w.jobMgr.updateJobInBothStores(job); err != nil {
return fmt.Errorf("failed to update job status: %w", err)
}
// Simulate job processing
// In a real implementation, this would execute the job based on its parameters
time.Sleep(1 * time.Second)
// Complete the job
job.Status = JobStatusDone
job.TimeEnd = time.Now().Unix()
job.Result = fmt.Sprintf("Job %d processed successfully", job.JobID)
// Update job in OurDB and remove from Redis
if err := w.jobMgr.completeJobProcessing(job); err != nil {
return fmt.Errorf("failed to complete job: %w", err)
}
log.Printf("Worker %d completed job %d", w.id, job.JobID)
return nil
}

View File

@ -0,0 +1,219 @@
package heroagent
import (
"context"
"encoding/json"
"fmt"
"log"
"strconv"
"time"
"github.com/redis/go-redis/v9"
)
// RedisJobManager handles Redis operations for jobs
type RedisJobManager struct {
client *redis.Client
ctx context.Context
}
// NewRedisJobManager creates a new Redis job manager
func NewRedisJobManager(tcpPort int, unixSocketPath string) (*RedisJobManager, error) {
var client *redis.Client
var err error
// Try Unix socket first if provided
if unixSocketPath != "" {
log.Printf("Attempting to connect to Redis via Unix socket: %s", unixSocketPath)
client = redis.NewClient(&redis.Options{
Network: "unix",
Addr: unixSocketPath,
DB: 0,
DialTimeout: 2 * time.Second,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
})
// Test connection
ctx := context.Background()
_, pingErr := client.Ping(ctx).Result()
if pingErr != nil {
log.Printf("Failed to connect to Redis via Unix socket: %v, falling back to TCP", pingErr)
// Close the failed client
client.Close()
err = pingErr // Update the outer err variable
}
}
// If Unix socket connection failed or wasn't provided, use TCP
if unixSocketPath == "" || err != nil {
tcpAddr := fmt.Sprintf("localhost:%d", tcpPort)
log.Printf("Connecting to Redis via TCP: %s", tcpAddr)
client = redis.NewClient(&redis.Options{
Network: "tcp",
Addr: tcpAddr,
DB: 0,
DialTimeout: 5 * time.Second,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
})
}
// Test connection
ctx := context.Background()
_, pingErr := client.Ping(ctx).Result()
if pingErr != nil {
return nil, fmt.Errorf("failed to connect to Redis: %w", pingErr)
}
return &RedisJobManager{
client: client,
ctx: ctx,
}, nil
}
// Close closes the Redis client
func (r *RedisJobManager) Close() error {
return r.client.Close()
}
// QueueKey returns the Redis queue key for a topic
func QueueKey(topic string) string {
return fmt.Sprintf("heroqueue:%s", topic)
}
// StorageKey returns the Redis storage key for a job
func StorageKey(jobID uint32, topic string) string {
return fmt.Sprintf("herojobs:%s:%d", topic, jobID)
}
// StoreJob stores a job in Redis
func (r *RedisJobManager) StoreJob(job *Job) error {
// Convert job to JSON
jobJSON, err := json.Marshal(job)
if err != nil {
return fmt.Errorf("failed to marshal job: %w", err)
}
// Store job in Redis
storageKey := StorageKey(job.JobID, job.Topic)
err = r.client.Set(r.ctx, storageKey, jobJSON, 0).Err()
if err != nil {
return fmt.Errorf("failed to store job in Redis: %w", err)
}
return nil
}
// EnqueueJob adds a job to its queue
func (r *RedisJobManager) EnqueueJob(job *Job) error {
// Store the job first
if err := r.StoreJob(job); err != nil {
return err
}
// Add job ID to queue
queueKey := QueueKey(job.Topic)
err := r.client.RPush(r.ctx, queueKey, job.JobID).Err()
if err != nil {
return fmt.Errorf("failed to enqueue job: %w", err)
}
log.Printf("Job %d enqueued in Redis queue %s", job.JobID, queueKey)
return nil
}
// GetJob retrieves a job from Redis
func (r *RedisJobManager) GetJob(jobID uint32, topic string) (*Job, error) {
// Get job from Redis
storageKey := StorageKey(jobID, topic)
jobJSON, err := r.client.Get(r.ctx, storageKey).Result()
if err != nil {
if err == redis.Nil {
return nil, fmt.Errorf("job not found: %d", jobID)
}
return nil, fmt.Errorf("failed to get job from Redis: %w", err)
}
// Parse job JSON
job := &Job{}
if err := json.Unmarshal([]byte(jobJSON), job); err != nil {
return nil, fmt.Errorf("failed to unmarshal job: %w", err)
}
return job, nil
}
// DeleteJob deletes a job from Redis
func (r *RedisJobManager) DeleteJob(jobID uint32, topic string) error {
// Delete job from Redis
storageKey := StorageKey(jobID, topic)
err := r.client.Del(r.ctx, storageKey).Err()
if err != nil {
return fmt.Errorf("failed to delete job from Redis: %w", err)
}
log.Printf("Job %d deleted from Redis", jobID)
return nil
}
// FetchNextJob fetches the next job from a queue
func (r *RedisJobManager) FetchNextJob(topic string) (*Job, error) {
queueKey := QueueKey(topic)
// Get and remove first job ID from queue
jobIDStr, err := r.client.LPop(r.ctx, queueKey).Result()
if err != nil {
if err == redis.Nil {
return nil, fmt.Errorf("queue is empty")
}
return nil, fmt.Errorf("failed to fetch job ID from queue: %w", err)
}
// Convert job ID to uint32
jobID, err := strconv.ParseUint(jobIDStr, 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid job ID: %s", jobIDStr)
}
// Get job from Redis
return r.GetJob(uint32(jobID), topic)
}
// ListQueues lists all job queues
func (r *RedisJobManager) ListQueues() ([]string, error) {
// Get all queue keys
queueKeys, err := r.client.Keys(r.ctx, "heroqueue:*").Result()
if err != nil {
return nil, fmt.Errorf("failed to list queues: %w", err)
}
// Extract topic names from queue keys
topics := make([]string, 0, len(queueKeys))
for _, queueKey := range queueKeys {
// Extract topic from queue key (format: heroqueue:<topic>)
topic := queueKey[10:] // Skip "heroqueue:"
topics = append(topics, topic)
}
return topics, nil
}
// QueueSize returns the size of a queue
func (r *RedisJobManager) QueueSize(topic string) (int64, error) {
queueKey := QueueKey(topic)
// Get queue size
size, err := r.client.LLen(r.ctx, queueKey).Result()
if err != nil {
return 0, fmt.Errorf("failed to get queue size: %w", err)
}
return size, nil
}
// UpdateJobStatus updates the status of a job in Redis
func (r *RedisJobManager) UpdateJobStatus(job *Job) error {
// Update job in Redis
return r.StoreJob(job)
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
"log"
"os"
"strconv"
"strings"
"sync"
"time"
@ -16,7 +17,7 @@ import (
func main() {
// Parse command line flags
tcpPort := flag.String("tcp-port", "7777", "Redis server TCP port")
tcpPortStr := flag.String("tcp-port", "7777", "Redis server TCP port")
unixSocket := flag.String("unix-socket", "/tmp/redis-test.sock", "Redis server Unix domain socket path")
username := flag.String("user", "jan", "Username to check")
mailbox := flag.String("mailbox", "inbox", "Mailbox to check")
@ -24,8 +25,13 @@ func main() {
dbNum := flag.Int("db", 0, "Redis database number")
flag.Parse()
tcpPort, err := strconv.Atoi(*tcpPortStr)
if err != nil {
log.Fatalf("Invalid TCP port: %v", err)
}
// Start Redis server in a goroutine
log.Printf("Starting Redis server on TCP port %s and Unix socket %s", *tcpPort, *unixSocket)
log.Printf("Starting Redis server on TCP port %d and Unix socket %s", tcpPort, *unixSocket)
// Create a wait group to ensure the server is started before testing
var wg sync.WaitGroup
@ -44,7 +50,7 @@ func main() {
// Start the Redis server in a goroutine
go func() {
// Create a new server instance
_ = redisserver.NewServer(redisserver.ServerConfig{TCPPort: *tcpPort, UnixSocketPath: *unixSocket})
_ = redisserver.NewServer(redisserver.ServerConfig{TCPPort: tcpPort, UnixSocketPath: *unixSocket})
// Signal that the server is ready
wg.Done()
@ -61,7 +67,7 @@ func main() {
// Test TCP connection
log.Println("Testing TCP connection")
tcpAddr := fmt.Sprintf("localhost:%s", *tcpPort)
tcpAddr := fmt.Sprintf("localhost:%d", tcpPort)
testRedisConnection(tcpAddr, username, mailbox, debug, dbNum)
// Test Unix socket connection if supported

View File

@ -62,14 +62,19 @@ func (ts *TestSuite) PrintResults() {
func main() {
// Parse command line flags
tcpPort := flag.String("tcp-port", "7777", "Redis server TCP port")
tcpPortStr := flag.String("tcp-port", "7777", "Redis server TCP port")
unixSocket := flag.String("unix-socket", "/tmp/redis-test.sock", "Redis server Unix domain socket path")
debug := flag.Bool("debug", false, "Enable debug output")
dbNum := flag.Int("db", 0, "Redis database number")
flag.Parse()
tcpPortInt, err := strconv.Atoi(*tcpPortStr)
if err != nil {
log.Fatalf("Invalid TCP port: %v", err)
}
// Start Redis server in a goroutine
log.Printf("Starting Redis server on TCP port %s and Unix socket %s", *tcpPort, *unixSocket)
log.Printf("Starting Redis server on TCP port %d and Unix socket %s", tcpPortInt, *unixSocket)
// Create a wait group to ensure the server is started before testing
var wg sync.WaitGroup
@ -88,7 +93,7 @@ func main() {
// Start the Redis server in a goroutine
go func() {
// Create a new server instance
_ = redisserver.NewServer(redisserver.ServerConfig{TCPPort: *tcpPort, UnixSocketPath: *unixSocket})
_ = redisserver.NewServer(redisserver.ServerConfig{TCPPort: tcpPortInt, UnixSocketPath: *unixSocket})
// Signal that the server is ready
wg.Done()
@ -105,7 +110,7 @@ func main() {
// Test TCP connection
log.Println("Testing TCP connection")
tcpAddr := fmt.Sprintf("localhost:%s", *tcpPort)
tcpAddr := fmt.Sprintf("localhost:%d", tcpPortInt)
runTests(tcpAddr, *debug, *dbNum)
// Test Unix socket connection if supported

View File

@ -1,6 +1,7 @@
package redisserver
import (
"strconv"
"sync"
"time"
)
@ -20,7 +21,7 @@ type Server struct {
}
type ServerConfig struct {
TCPPort string
TCPPort int
UnixSocketPath string
}
@ -38,8 +39,8 @@ func NewServer(config ServerConfig) *Server {
go s.cleanupExpiredKeys()
// Start TCP server if port is provided
if config.TCPPort != "" {
tcpAddr := ":" + config.TCPPort
if config.TCPPort != 0 {
tcpAddr := ":" + strconv.Itoa(config.TCPPort)
go s.startRedisServer(tcpAddr, "")
}

View File

@ -26,15 +26,15 @@
</thead>
<tbody>
{{ if len(Processes) > 0 }}
{{ range process := Processes }}
{{ range pid, process := Processes }}
<tr>
<td>{{ process.PID }}</td>
<td>{{ pid }}</td>
<td>{{ process.Name }}</td>
<td>{{ printf "%.2f" process.CPU }}</td>
<td>{{ printf "%.2f" process.Memory }}</td>
<td>{{ process.CPU }}</td>
<td>{{ process.Memory }}</td>
<td>
<form action="/processes/kill/{{ process.PID }}" method="POST" style="display:inline;">
<button type="submit" class="btn btn-danger btn-sm" onclick="return confirm('Are you sure you want to kill process {{ process.PID }}?');">Kill</button>
<form action="/processes/kill/{{ pid }}" method="POST" style="display:inline;">
<button type="submit" class="btn btn-danger btn-sm" onclick="return confirm('Are you sure you want to kill process {{ pid }}?');">Kill</button>
</form>
</td>
</tr>

View File

@ -26,22 +26,22 @@ import (
// Config holds the configuration for the WebDAV server
type Config struct {
Host string
Port int
BasePath string
FileSystem string
ReadTimeout time.Duration
WriteTimeout time.Duration
DebugMode bool
UseAuth bool
Username string
Password string
UseHTTPS bool
CertFile string
KeyFile string
AutoGenerateCerts bool
CertValidityDays int
CertOrganization string
Host string
TCPPort int
BasePath string
FileSystem string
ReadTimeout time.Duration
WriteTimeout time.Duration
DebugMode bool
UseAuth bool
Username string
Password string
UseHTTPS bool
CertFile string
KeyFile string
AutoGenerateCerts bool
CertValidityDays int
CertOrganization string
}
// Server represents the WebDAV server
@ -74,18 +74,18 @@ func (rw *responseWrapper) Write(b []byte) (int, error) {
// NewServer creates a new WebDAV server
func NewServer(config Config) (*Server, error) {
log.Printf("Creating new WebDAV server with config: host=%s, port=%d, basePath=%s, fileSystem=%s, debug=%v, auth=%v, https=%v",
config.Host, config.Port, config.BasePath, config.FileSystem, config.DebugMode, config.UseAuth, config.UseHTTPS)
log.Printf("Creating new WebDAV server with config: host=%s, TCPPort=%d, basePath=%s, fileSystem=%s, debug=%v, auth=%v, https=%v",
config.Host, config.TCPPort, config.BasePath, config.FileSystem, config.DebugMode, config.UseAuth, config.UseHTTPS)
// Ensure the file system directory exists
if err := os.MkdirAll(config.FileSystem, 0755); err != nil {
log.Printf("ERROR: Failed to create file system directory %s: %v", config.FileSystem, err)
return nil, fmt.Errorf("failed to create file system directory: %w", err)
}
// Log the file system path
log.Printf("Using file system path: %s", config.FileSystem)
// Create debug logger function
debugLog := func(format string, v ...interface{}) {
if config.DebugMode {
@ -103,7 +103,7 @@ func NewServer(config Config) (*Server, error) {
} else {
log.Printf("WebDAV: %s %s", r.Method, r.URL.Path)
}
// Additional debug logging
if config.DebugMode {
log.Printf("[WebDAV DEBUG] Request Headers: %v", r.Header)
@ -115,7 +115,7 @@ func NewServer(config Config) (*Server, error) {
// Create HTTP server
httpServer := &http.Server{
Addr: fmt.Sprintf("%s:%d", config.Host, config.Port),
Addr: fmt.Sprintf("%s:%d", config.Host, config.TCPPort),
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
}
@ -141,15 +141,15 @@ func (s *Server) Start() error {
s.debugLog("Received request: %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr)
s.debugLog("Request Protocol: %s", r.Proto)
s.debugLog("User-Agent: %s", r.UserAgent())
// Log all request headers
for name, values := range r.Header {
s.debugLog("Header: %s = %s", name, values)
}
// Log request depth (important for WebDAV)
s.debugLog("Depth header: %s", r.Header.Get("Depth"))
// Add CORS headers
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, DELETE, OPTIONS, PROPFIND, PROPPATCH, MKCOL, COPY, MOVE")
@ -162,32 +162,32 @@ func (s *Server) Start() error {
w.Header().Set("DAV", "1, 2")
w.Header().Set("MS-Author-Via", "DAV")
w.Header().Set("Allow", "OPTIONS, GET, HEAD, POST, PUT, DELETE, PROPFIND, PROPPATCH, MKCOL, COPY, MOVE")
// Check if this is a macOS WebDAV client
isMacOSClient := strings.Contains(r.UserAgent(), "WebDAVFS") ||
strings.Contains(r.UserAgent(), "WebDAVLib") ||
isMacOSClient := strings.Contains(r.UserAgent(), "WebDAVFS") ||
strings.Contains(r.UserAgent(), "WebDAVLib") ||
strings.Contains(r.UserAgent(), "Darwin")
if isMacOSClient {
s.debugLog("Detected macOS WebDAV client OPTIONS request, adding macOS-specific headers")
// These headers help macOS Finder with WebDAV compatibility
w.Header().Set("X-Dav-Server", "HeroLauncher WebDAV Server")
}
w.WriteHeader(http.StatusOK)
return
}
// Handle authentication if enabled
if s.config.UseAuth {
s.debugLog("Authentication required for request")
auth := r.Header.Get("Authorization")
// Check if this is a macOS WebDAV client
isMacOSClient := strings.Contains(r.UserAgent(), "WebDAVFS") ||
strings.Contains(r.UserAgent(), "WebDAVLib") ||
isMacOSClient := strings.Contains(r.UserAgent(), "WebDAVFS") ||
strings.Contains(r.UserAgent(), "WebDAVLib") ||
strings.Contains(r.UserAgent(), "Darwin")
// Special handling for OPTIONS requests from macOS clients
if r.Method == "OPTIONS" && isMacOSClient {
s.debugLog("Detected macOS WebDAV client OPTIONS request, allowing without auth")
@ -196,28 +196,28 @@ func (s *Server) Start() error {
w.Header().Set("WWW-Authenticate", "Basic realm=\"WebDAV Server\"")
return
}
if auth == "" {
s.debugLog("No Authorization header provided for non-OPTIONS request")
w.Header().Set("WWW-Authenticate", "Basic realm=\"WebDAV Server\"")
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
// Parse the authentication header
if !strings.HasPrefix(auth, "Basic ") {
s.debugLog("Invalid Authorization header format: %s", auth)
http.Error(w, "Invalid authorization header", http.StatusBadRequest)
return
}
payload, err := base64.StdEncoding.DecodeString(auth[6:])
if err != nil {
s.debugLog("Failed to decode Authorization header: %v, raw header: %s", err, auth)
http.Error(w, "Invalid authorization header", http.StatusBadRequest)
return
}
pair := strings.SplitN(string(payload), ":", 2)
if len(pair) != 2 {
s.debugLog("Invalid credential format: could not split into username:password")
@ -225,17 +225,17 @@ func (s *Server) Start() error {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
// Log username for debugging (don't log password)
s.debugLog("Received credentials for user: %s", pair[0])
if pair[0] != s.config.Username || pair[1] != s.config.Password {
s.debugLog("Invalid credentials provided, expected user: %s", s.config.Username)
w.Header().Set("WWW-Authenticate", "Basic realm=\"WebDAV Server\"")
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
s.debugLog("Authentication successful for user: %s", pair[0])
}
@ -252,17 +252,17 @@ func (s *Server) Start() error {
}
// Add macOS-specific headers for better compatibility
isMacOSClient := strings.Contains(r.UserAgent(), "WebDAVFS") ||
strings.Contains(r.UserAgent(), "WebDAVLib") ||
isMacOSClient := strings.Contains(r.UserAgent(), "WebDAVFS") ||
strings.Contains(r.UserAgent(), "WebDAVLib") ||
strings.Contains(r.UserAgent(), "Darwin")
if isMacOSClient {
s.debugLog("Adding macOS-specific headers for better compatibility")
// These headers help macOS Finder with WebDAV compatibility
w.Header().Set("MS-Author-Via", "DAV")
w.Header().Set("X-Dav-Server", "HeroLauncher WebDAV Server")
w.Header().Set("DAV", "1, 2")
// Special handling for PROPFIND requests from macOS
if r.Method == "PROPFIND" {
s.debugLog("Handling macOS PROPFIND request with special compatibility")
@ -281,7 +281,7 @@ func (s *Server) Start() error {
// Log response details
s.debugLog("Response status: %d", responseWrapper.statusCode)
s.debugLog("Response content type: %s", w.Header().Get("Content-Type"))
// Log detailed information for debugging connection issues
if responseWrapper.statusCode >= 400 {
s.debugLog("ERROR: WebDAV request failed with status %d", responseWrapper.statusCode)
@ -303,24 +303,24 @@ func (s *Server) Start() error {
log.Printf("ERROR: HTTPS enabled but certificate or key file not provided and auto-generation is disabled")
return fmt.Errorf("HTTPS enabled but certificate or key file not provided and auto-generation is disabled")
}
// Auto-generate certificates if needed
if (s.config.CertFile == "" || s.config.KeyFile == "" ||
!fileExists(s.config.CertFile) || !fileExists(s.config.KeyFile)) &&
if (s.config.CertFile == "" || s.config.KeyFile == "" ||
!fileExists(s.config.CertFile) || !fileExists(s.config.KeyFile)) &&
s.config.AutoGenerateCerts {
s.debugLog("Certificate files not found, auto-generating...")
// Get base directory from the file system path
baseDir := filepath.Dir(s.config.FileSystem)
// Create certificates directory if it doesn't exist
certsDir := filepath.Join(baseDir, "certificates")
if err := os.MkdirAll(certsDir, 0755); err != nil {
log.Printf("ERROR: Failed to create certificates directory: %v", err)
return fmt.Errorf("failed to create certificates directory: %w", err)
}
// Set default certificate paths if not provided
if s.config.CertFile == "" {
s.config.CertFile = filepath.Join(certsDir, "webdav.crt")
@ -328,44 +328,44 @@ func (s *Server) Start() error {
if s.config.KeyFile == "" {
s.config.KeyFile = filepath.Join(certsDir, "webdav.key")
}
// Generate certificates
if err := generateCertificate(
s.config.CertFile,
s.config.KeyFile,
s.config.CertOrganization,
s.config.CertFile,
s.config.KeyFile,
s.config.CertOrganization,
s.config.CertValidityDays,
s.debugLog,
); err != nil {
log.Printf("ERROR: Failed to generate certificates: %v", err)
return fmt.Errorf("failed to generate certificates: %w", err)
}
log.Printf("Successfully generated self-signed certificates at %s and %s",
log.Printf("Successfully generated self-signed certificates at %s and %s",
s.config.CertFile, s.config.KeyFile)
}
// Verify certificate files exist
if !fileExists(s.config.CertFile) || !fileExists(s.config.KeyFile) {
log.Printf("ERROR: Certificate files not found at %s and/or %s",
log.Printf("ERROR: Certificate files not found at %s and/or %s",
s.config.CertFile, s.config.KeyFile)
return fmt.Errorf("certificate files not found")
}
// Configure TLS
tlsConfig := &tls.Config{
MinVersion: tls.VersionTLS12,
}
s.httpServer.TLSConfig = tlsConfig
log.Printf("Starting WebDAV server with HTTPS on %s using certificates: %s, %s",
log.Printf("Starting WebDAV server with HTTPS on %s using certificates: %s, %s",
s.httpServer.Addr, s.config.CertFile, s.config.KeyFile)
err = s.httpServer.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile)
} else {
log.Printf("Starting WebDAV server with HTTP on %s", s.httpServer.Addr)
err = s.httpServer.ListenAndServe()
}
if err != nil && err != http.ErrServerClosed {
log.Printf("ERROR: WebDAV server failed to start: %v", err)
return err
@ -389,10 +389,10 @@ func (s *Server) Stop() error {
func DefaultConfig() Config {
// Use system temp directory as default base path
defaultBasePath := filepath.Join(os.TempDir(), "heroagent")
return Config{
Host: "0.0.0.0",
Port: 9999,
TCPPort: 9999,
BasePath: "/",
FileSystem: defaultBasePath,
ReadTimeout: 30 * time.Second,
@ -421,24 +421,24 @@ func fileExists(filename string) bool {
// generateCertificate creates a self-signed TLS certificate and key
func generateCertificate(certFile, keyFile, organization string, validityDays int, debugLog func(format string, args ...interface{})) error {
debugLog("Generating self-signed certificate: certFile=%s, keyFile=%s, organization=%s, validityDays=%d",
debugLog("Generating self-signed certificate: certFile=%s, keyFile=%s, organization=%s, validityDays=%d",
certFile, keyFile, organization, validityDays)
// Generate private key
privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return fmt.Errorf("failed to generate private key: %w", err)
}
// Prepare certificate template
notBefore := time.Now()
notAfter := notBefore.Add(time.Duration(validityDays) * 24 * time.Hour)
serialNumber, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128))
if err != nil {
return fmt.Errorf("failed to generate serial number: %w", err)
}
template := x509.Certificate{
SerialNumber: serialNumber,
Subject: pkix.Name{
@ -453,36 +453,36 @@ func generateCertificate(certFile, keyFile, organization string, validityDays in
IPAddresses: []net.IP{net.ParseIP("127.0.0.1"), net.ParseIP("::1")},
DNSNames: []string{"localhost"},
}
// Create certificate
derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
if err != nil {
return fmt.Errorf("failed to create certificate: %w", err)
}
// Write certificate to file
certOut, err := os.Create(certFile)
if err != nil {
return fmt.Errorf("failed to open %s for writing: %w", certFile, err)
}
defer certOut.Close()
if err := pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil {
return fmt.Errorf("failed to write certificate to file: %w", err)
}
// Write private key to file
keyOut, err := os.OpenFile(keyFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return fmt.Errorf("failed to open %s for writing: %w", keyFile, err)
}
defer keyOut.Close()
privateKeyPEM := &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey)}
if err := pem.Encode(keyOut, privateKeyPEM); err != nil {
return fmt.Errorf("failed to write private key to file: %w", err)
}
debugLog("Successfully generated self-signed certificate valid for %d days", validityDays)
return nil
}

22
scripts/test_jobs.sh Executable file
View File

@ -0,0 +1,22 @@
#!/bin/bash
# Create necessary directories
mkdir -p data/jobsdb
mkdir -p bin
# Build the job test
echo "Building job test..."
go build -o bin/jobtest cmd/jobtest/main.go
# Check if build was successful
if [ $? -ne 0 ]; then
echo "Build failed"
exit 1
fi
# Run the job test
echo "Running job test..."
./bin/jobtest
# Exit with the same status as the job test
exit $?