This commit is contained in:
despiegk 2025-05-23 14:05:09 +04:00
parent b2eb9d3116
commit c86165f88c
9 changed files with 0 additions and 2299 deletions

View File

@ -1,209 +0,0 @@
package client
import (
"encoding/json"
"errors"
"fmt"
"net"
"github.com/freeflowuniverse/heroagent/pkg/openrpcmanager"
)
// Common errors
var (
ErrConnectionFailed = errors.New("failed to connect to OpenRPC server")
ErrRequestFailed = errors.New("failed to send request to OpenRPC server")
ErrResponseFailed = errors.New("failed to read response from OpenRPC server")
ErrUnmarshalFailed = errors.New("failed to unmarshal response")
ErrUnexpectedResponse = errors.New("unexpected response format")
ErrRPCError = errors.New("RPC error")
ErrAuthenticationFailed = errors.New("authentication failed")
)
// RPCRequest represents an outgoing RPC request
type RPCRequest struct {
Method string `json:"method"`
Params json.RawMessage `json:"params"`
ID int `json:"id"`
Secret string `json:"secret,omitempty"`
JSONRPC string `json:"jsonrpc"`
}
// RPCResponse represents an incoming RPC response
type RPCResponse struct {
Result interface{} `json:"result,omitempty"`
Error *RPCError `json:"error,omitempty"`
ID interface{} `json:"id,omitempty"`
JSONRPC string `json:"jsonrpc"`
}
// RPCError represents an RPC error
type RPCError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
// Error returns a string representation of the RPC error
func (e *RPCError) Error() string {
if e.Data != nil {
return fmt.Sprintf("RPC error %d: %s - %v", e.Code, e.Message, e.Data)
}
return fmt.Sprintf("RPC error %d: %s", e.Code, e.Message)
}
// IntrospectionResponse represents the response from the rpc.introspect method
type IntrospectionResponse struct {
Logs []openrpcmanager.CallLog `json:"logs"`
Total int `json:"total"`
Filtered int `json:"filtered"`
}
// Client is the interface that all OpenRPC clients must implement
type Client interface {
// Discover returns the OpenRPC schema
Discover() (openrpcmanager.OpenRPCSchema, error)
// Introspect returns information about recent RPC calls
Introspect(limit int, method string, status string) (IntrospectionResponse, error)
// Request sends a request to the OpenRPC server and returns the result
Request(method string, params json.RawMessage, secret string) (interface{}, error)
// Close closes the client connection
Close() error
}
// BaseClient provides a base implementation of the Client interface
type BaseClient struct {
socketPath string
secret string
nextID int
}
// NewClient creates a new OpenRPC client
func NewClient(socketPath, secret string) *BaseClient {
return &BaseClient{
socketPath: socketPath,
secret: secret,
nextID: 1,
}
}
// Discover returns the OpenRPC schema
func (c *BaseClient) Discover() (openrpcmanager.OpenRPCSchema, error) {
result, err := c.Request("rpc.discover", json.RawMessage("{}"), "")
if err != nil {
return openrpcmanager.OpenRPCSchema{}, err
}
// Convert result to schema
resultJSON, err := json.Marshal(result)
if err != nil {
return openrpcmanager.OpenRPCSchema{}, fmt.Errorf("%w: %v", ErrUnmarshalFailed, err)
}
var schema openrpcmanager.OpenRPCSchema
if err := json.Unmarshal(resultJSON, &schema); err != nil {
return openrpcmanager.OpenRPCSchema{}, fmt.Errorf("%w: %v", ErrUnmarshalFailed, err)
}
return schema, nil
}
// Introspect returns information about recent RPC calls
func (c *BaseClient) Introspect(limit int, method string, status string) (IntrospectionResponse, error) {
// Create the params object
params := struct {
Limit int `json:"limit,omitempty"`
Method string `json:"method,omitempty"`
Status string `json:"status,omitempty"`
}{
Limit: limit,
Method: method,
Status: status,
}
// Marshal the params
paramsJSON, err := json.Marshal(params)
if err != nil {
return IntrospectionResponse{}, fmt.Errorf("failed to marshal introspection params: %v", err)
}
// Make the request
result, err := c.Request("rpc.introspect", paramsJSON, c.secret)
if err != nil {
return IntrospectionResponse{}, err
}
// Convert result to introspection response
resultJSON, err := json.Marshal(result)
if err != nil {
return IntrospectionResponse{}, fmt.Errorf("%w: %v", ErrUnmarshalFailed, err)
}
var response IntrospectionResponse
if err := json.Unmarshal(resultJSON, &response); err != nil {
return IntrospectionResponse{}, fmt.Errorf("%w: %v", ErrUnmarshalFailed, err)
}
return response, nil
}
// Request sends a request to the OpenRPC server and returns the result
func (c *BaseClient) Request(method string, params json.RawMessage, secret string) (interface{}, error) {
// Connect to the Unix socket
conn, err := net.Dial("unix", c.socketPath)
if err != nil {
return nil, fmt.Errorf("%w: %v", ErrConnectionFailed, err)
}
defer conn.Close()
// Create the request
request := RPCRequest{
Method: method,
Params: params,
ID: c.nextID,
Secret: secret,
JSONRPC: "2.0",
}
c.nextID++
// Marshal the request
requestData, err := json.Marshal(request)
if err != nil {
return nil, fmt.Errorf("failed to marshal request: %v", err)
}
// Send the request
_, err = conn.Write(requestData)
if err != nil {
return nil, fmt.Errorf("%w: %v", ErrRequestFailed, err)
}
// Read the response
buf := make([]byte, 4096)
n, err := conn.Read(buf)
if err != nil {
return nil, fmt.Errorf("%w: %v", ErrResponseFailed, err)
}
// Parse the response
var response RPCResponse
if err := json.Unmarshal(buf[:n], &response); err != nil {
return nil, fmt.Errorf("%w: %v", ErrUnmarshalFailed, err)
}
// Check for errors
if response.Error != nil {
return nil, fmt.Errorf("%w: %v", ErrRPCError, response.Error)
}
return response.Result, nil
}
// Close closes the client connection
func (c *BaseClient) Close() error {
// Nothing to do for the base client since we create a new connection for each request
return nil
}

View File

@ -1,283 +0,0 @@
package client
import (
"encoding/json"
"os"
"path/filepath"
"testing"
"time"
"github.com/freeflowuniverse/heroagent/pkg/openrpcmanager"
)
// MockClient implements the Client interface for testing
type MockClient struct {
BaseClient
}
// TestMethod is a test method that returns a greeting
func (c *MockClient) TestMethod(name string) (string, error) {
params := map[string]string{"name": name}
paramsJSON, err := json.Marshal(params)
if err != nil {
return "", err
}
result, err := c.Request("test.method", paramsJSON, "")
if err != nil {
return "", err
}
// Convert result to string
greeting, ok := result.(string)
if !ok {
return "", ErrUnexpectedResponse
}
return greeting, nil
}
// SecureMethod is a test method that requires authentication
func (c *MockClient) SecureMethod() (map[string]interface{}, error) {
result, err := c.Request("secure.method", json.RawMessage("{}"), c.secret)
if err != nil {
return nil, err
}
// Convert result to map
data, ok := result.(map[string]interface{})
if !ok {
return nil, ErrUnexpectedResponse
}
return data, nil
}
func TestClient(t *testing.T) {
// Create a temporary socket path
tempDir, err := os.MkdirTemp("", "openrpc-client-test")
if err != nil {
t.Fatalf("Failed to create temp directory: %v", err)
}
defer os.RemoveAll(tempDir)
socketPath := filepath.Join(tempDir, "openrpc.sock")
secret := "test-secret"
// Create test schema and handlers
schema := openrpcmanager.OpenRPCSchema{
OpenRPC: "1.2.6",
Info: openrpcmanager.InfoObject{
Title: "Test API",
Version: "1.0.0",
},
Methods: []openrpcmanager.MethodObject{
{
Name: "test.method",
Params: []openrpcmanager.ContentDescriptorObject{
{
Name: "name",
Schema: openrpcmanager.SchemaObject{"type": "string"},
},
},
Result: &openrpcmanager.ContentDescriptorObject{
Name: "result",
Schema: openrpcmanager.SchemaObject{"type": "string"},
},
},
{
Name: "secure.method",
Params: []openrpcmanager.ContentDescriptorObject{},
Result: &openrpcmanager.ContentDescriptorObject{
Name: "result",
Schema: openrpcmanager.SchemaObject{"type": "object"},
},
},
},
}
handlers := map[string]openrpcmanager.RPCHandler{
"test.method": func(params json.RawMessage) (interface{}, error) {
var request struct {
Name string `json:"name"`
}
if err := json.Unmarshal(params, &request); err != nil {
return nil, err
}
return "Hello, " + request.Name + "!", nil
},
"secure.method": func(params json.RawMessage) (interface{}, error) {
return map[string]interface{}{
"secure": true,
"data": "sensitive information",
}, nil
},
}
// Create and start OpenRPC manager and Unix server
manager, err := openrpcmanager.NewOpenRPCManager(schema, handlers, secret)
if err != nil {
t.Fatalf("Failed to create OpenRPCManager: %v", err)
}
server, err := openrpcmanager.NewUnixServer(manager, socketPath)
if err != nil {
t.Fatalf("Failed to create UnixServer: %v", err)
}
if err := server.Start(); err != nil {
t.Fatalf("Failed to start UnixServer: %v", err)
}
defer server.Stop()
// Wait for server to start
time.Sleep(100 * time.Millisecond)
// Create client
client := &MockClient{
BaseClient: BaseClient{
socketPath: socketPath,
secret: secret,
},
}
// Test Discover method
t.Run("Discover", func(t *testing.T) {
schema, err := client.Discover()
if err != nil {
t.Fatalf("Discover failed: %v", err)
}
if schema.OpenRPC != "1.2.6" {
t.Errorf("Expected OpenRPC version 1.2.6, got: %s", schema.OpenRPC)
}
if len(schema.Methods) < 2 {
t.Errorf("Expected at least 2 methods, got: %d", len(schema.Methods))
}
// Check if our test methods are in the schema
foundTestMethod := false
foundSecureMethod := false
for _, method := range schema.Methods {
if method.Name == "test.method" {
foundTestMethod = true
}
if method.Name == "secure.method" {
foundSecureMethod = true
}
}
if !foundTestMethod {
t.Error("test.method not found in schema")
}
if !foundSecureMethod {
t.Error("secure.method not found in schema")
}
})
// Test TestMethod
t.Run("TestMethod", func(t *testing.T) {
greeting, err := client.TestMethod("World")
if err != nil {
t.Fatalf("TestMethod failed: %v", err)
}
expected := "Hello, World!"
if greeting != expected {
t.Errorf("Expected greeting %q, got: %q", expected, greeting)
}
})
// Test Introspect method
t.Run("Introspect", func(t *testing.T) {
// Make several requests to generate logs
_, err := client.TestMethod("World")
if err != nil {
t.Fatalf("TestMethod failed: %v", err)
}
_, err = client.SecureMethod()
if err != nil {
t.Fatalf("SecureMethod failed: %v", err)
}
// Test introspection
response, err := client.Introspect(10, "", "")
if err != nil {
t.Fatalf("Introspect failed: %v", err)
}
// Verify we have logs
if response.Total < 2 {
t.Errorf("Expected at least 2 logs, got: %d", response.Total)
}
// Test filtering by method
response, err = client.Introspect(10, "test.method", "")
if err != nil {
t.Fatalf("Introspect with method filter failed: %v", err)
}
// Verify filtering works
for _, log := range response.Logs {
if log.Method != "test.method" {
t.Errorf("Expected only test.method logs, got: %s", log.Method)
}
}
// Test filtering by status
response, err = client.Introspect(10, "", "success")
if err != nil {
t.Fatalf("Introspect with status filter failed: %v", err)
}
// Verify status filtering works
for _, log := range response.Logs {
if log.Status != "success" {
t.Errorf("Expected only success logs, got: %s", log.Status)
}
}
})
// Test SecureMethod with valid secret
t.Run("SecureMethod", func(t *testing.T) {
data, err := client.SecureMethod()
if err != nil {
t.Fatalf("SecureMethod failed: %v", err)
}
secure, ok := data["secure"].(bool)
if !ok || !secure {
t.Errorf("Expected secure to be true, got: %v", data["secure"])
}
sensitiveData, ok := data["data"].(string)
if !ok || sensitiveData != "sensitive information" {
t.Errorf("Expected data to be 'sensitive information', got: %v", data["data"])
}
})
// Test SecureMethod with invalid secret
t.Run("SecureMethod with invalid secret", func(t *testing.T) {
invalidClient := &MockClient{
BaseClient: BaseClient{
socketPath: socketPath,
secret: "wrong-secret",
},
}
_, err := invalidClient.SecureMethod()
if err == nil {
t.Error("Expected error for invalid secret, but got nil")
}
})
// Test non-existent method
t.Run("Non-existent method", func(t *testing.T) {
_, err := client.Request("non.existent", json.RawMessage("{}"), "")
if err == nil {
t.Error("Expected error for non-existent method, but got nil")
}
})
}

View File

@ -1,113 +0,0 @@
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/freeflowuniverse/heroagent/pkg/openrpcmanager"
)
func main() {
// Parse command line arguments
socketPath := flag.String("socket", "/tmp/openrpc.sock", "Path to the Unix socket")
secret := flag.String("secret", "test-secret", "Secret for authenticated methods")
flag.Parse()
// Create a simple OpenRPC schema
schema := openrpcmanager.OpenRPCSchema{
OpenRPC: "1.2.6",
Info: openrpcmanager.InfoObject{
Title: "Hero Launcher API",
Version: "1.0.0",
},
Methods: []openrpcmanager.MethodObject{
{
Name: "echo",
Params: []openrpcmanager.ContentDescriptorObject{
{
Name: "message",
Schema: openrpcmanager.SchemaObject{"type": "object"},
},
},
Result: &openrpcmanager.ContentDescriptorObject{
Name: "result",
Schema: openrpcmanager.SchemaObject{"type": "object"},
},
},
{
Name: "ping",
Params: []openrpcmanager.ContentDescriptorObject{},
Result: &openrpcmanager.ContentDescriptorObject{
Name: "result",
Schema: openrpcmanager.SchemaObject{"type": "string"},
},
},
{
Name: "secure.info",
Params: []openrpcmanager.ContentDescriptorObject{},
Result: &openrpcmanager.ContentDescriptorObject{
Name: "result",
Schema: openrpcmanager.SchemaObject{"type": "object"},
},
},
},
}
// Create handlers
handlers := map[string]openrpcmanager.RPCHandler{
"echo": func(params json.RawMessage) (interface{}, error) {
var data interface{}
if err := json.Unmarshal(params, &data); err != nil {
return nil, err
}
return data, nil
},
"ping": func(params json.RawMessage) (interface{}, error) {
return "pong", nil
},
"secure.info": func(params json.RawMessage) (interface{}, error) {
return map[string]interface{}{
"server": "Hero Launcher",
"version": "1.0.0",
"status": "running",
}, nil
},
}
// Create OpenRPC manager
manager, err := openrpcmanager.NewOpenRPCManager(schema, handlers, *secret)
if err != nil {
log.Fatalf("Failed to create OpenRPC manager: %v", err)
}
// Create Unix server
server, err := openrpcmanager.NewUnixServer(manager, *socketPath)
if err != nil {
log.Fatalf("Failed to create Unix server: %v", err)
}
// Start the server
if err := server.Start(); err != nil {
log.Fatalf("Failed to start Unix server: %v", err)
}
defer server.Stop()
fmt.Printf("OpenRPC server started on Unix socket: %s\n", *socketPath)
fmt.Println("Available methods:")
for _, method := range manager.ListMethods() {
fmt.Printf(" - %s\n", method)
}
fmt.Println("\nPress Ctrl+C to stop the server")
// Wait for interrupt signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
fmt.Println("\nShutting down...")
}

View File

@ -1,112 +0,0 @@
package main
import (
"encoding/json"
"flag"
"fmt"
"net"
"os"
)
// RPCRequest represents an outgoing RPC request
type RPCRequest struct {
Method string `json:"method"`
Params json.RawMessage `json:"params"`
ID interface{} `json:"id,omitempty"`
Secret string `json:"secret,omitempty"`
JSONRPC string `json:"jsonrpc"`
}
// RPCResponse represents an incoming RPC response
type RPCResponse struct {
Result interface{} `json:"result,omitempty"`
Error *RPCError `json:"error,omitempty"`
ID interface{} `json:"id,omitempty"`
JSONRPC string `json:"jsonrpc"`
}
// RPCError represents an RPC error
type RPCError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
func main() {
// Parse command line arguments
socketPath := flag.String("socket", "/tmp/openrpc.sock", "Path to the Unix socket")
method := flag.String("method", "rpc.discover", "RPC method to call")
params := flag.String("params", "{}", "JSON parameters for the method")
secret := flag.String("secret", "", "Secret for authenticated methods")
flag.Parse()
// Connect to the Unix socket
conn, err := net.Dial("unix", *socketPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to connect to Unix socket: %v\n", err)
os.Exit(1)
}
defer conn.Close()
// Create the request
var paramsJSON json.RawMessage
if err := json.Unmarshal([]byte(*params), &paramsJSON); err != nil {
fmt.Fprintf(os.Stderr, "Invalid JSON parameters: %v\n", err)
os.Exit(1)
}
request := RPCRequest{
Method: *method,
Params: paramsJSON,
ID: 1,
Secret: *secret,
JSONRPC: "2.0",
}
// Marshal the request
requestData, err := json.Marshal(request)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to marshal request: %v\n", err)
os.Exit(1)
}
// Send the request
_, err = conn.Write(requestData)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to send request: %v\n", err)
os.Exit(1)
}
// Read the response
buf := make([]byte, 4096)
n, err := conn.Read(buf)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to read response: %v\n", err)
os.Exit(1)
}
// Parse the response
var response RPCResponse
if err := json.Unmarshal(buf[:n], &response); err != nil {
fmt.Fprintf(os.Stderr, "Failed to unmarshal response: %v\n", err)
os.Exit(1)
}
// Check for errors
if response.Error != nil {
fmt.Fprintf(os.Stderr, "Error: %s (code: %d)\n", response.Error.Message, response.Error.Code)
if response.Error.Data != nil {
fmt.Fprintf(os.Stderr, "Error data: %v\n", response.Error.Data)
}
os.Exit(1)
}
// Print the result
resultJSON, err := json.MarshalIndent(response.Result, "", " ")
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to marshal result: %v\n", err)
os.Exit(1)
}
fmt.Println(string(resultJSON))
}

View File

@ -1,416 +0,0 @@
// Package openrpcmanager provides functionality for managing and handling OpenRPC method calls.
package openrpcmanager
import (
"encoding/json"
"fmt"
"sync"
"time"
)
// RPCHandler is a function that handles an OpenRPC method call
type RPCHandler func(params json.RawMessage) (interface{}, error)
// CallLog represents a log entry for an RPC method call
type CallLog struct {
Timestamp time.Time `json:"timestamp"` // When the call was made
Method string `json:"method"` // Method that was called
Params interface{} `json:"params"` // Parameters passed to the method (may be redacted for security)
Duration time.Duration `json:"duration"` // How long the call took to execute
Status string `json:"status"` // Success or error
ErrorMsg string `json:"error,omitempty"` // Error message if status is error
Authenticated bool `json:"authenticated"` // Whether the call was authenticated
}
// OpenRPCManager manages OpenRPC method handlers and processes requests
type OpenRPCManager struct {
handlers map[string]RPCHandler
schema OpenRPCSchema
mutex sync.RWMutex
secret string
// Call logging
callLogs []CallLog
callLogsMutex sync.RWMutex
maxCallLogs int // Maximum number of call logs to keep
}
// NewOpenRPCManager creates a new OpenRPC manager with the given schema and handlers
func NewOpenRPCManager(schema OpenRPCSchema, handlers map[string]RPCHandler, secret string) (*OpenRPCManager, error) {
manager := &OpenRPCManager{
handlers: make(map[string]RPCHandler),
schema: schema,
secret: secret,
callLogs: make([]CallLog, 0, 100),
maxCallLogs: 1000, // Default to keeping the last 1000 calls
}
// Validate that all methods in the schema have corresponding handlers
for _, method := range schema.Methods {
handler, exists := handlers[method.Name]
if !exists {
return nil, fmt.Errorf("missing handler for method '%s' defined in schema", method.Name)
}
manager.handlers[method.Name] = handler
}
// Check for handlers that don't have a corresponding method in the schema
for name := range handlers {
found := false
for _, method := range schema.Methods {
if method.Name == name {
found = true
break
}
}
if !found {
return nil, fmt.Errorf("handler '%s' has no corresponding method in schema", name)
}
}
// Add the discovery method
manager.handlers["rpc.discover"] = manager.handleDiscovery
// Add the introspection method
manager.handlers["rpc.introspect"] = manager.handleIntrospection
return manager, nil
}
// handleDiscovery implements the OpenRPC service discovery method
func (m *OpenRPCManager) handleDiscovery(params json.RawMessage) (interface{}, error) {
return m.schema, nil
}
// handleIntrospection implements the OpenRPC service introspection method
// It returns information about recent RPC calls for monitoring and debugging
func (m *OpenRPCManager) handleIntrospection(params json.RawMessage) (interface{}, error) {
m.callLogsMutex.RLock()
defer m.callLogsMutex.RUnlock()
// Parse parameters to see if we need to filter or limit results
var requestParams struct {
Limit int `json:"limit,omitempty"`
Method string `json:"method,omitempty"`
Status string `json:"status,omitempty"`
}
// Default limit to 100 if not specified
requestParams.Limit = 100
// Try to parse parameters, but don't fail if they're invalid
if len(params) > 0 {
_ = json.Unmarshal(params, &requestParams)
}
// Apply limit
if requestParams.Limit <= 0 || requestParams.Limit > m.maxCallLogs {
requestParams.Limit = 100
}
// Create a copy of the logs to avoid holding the lock while filtering
allLogs := make([]CallLog, len(m.callLogs))
copy(allLogs, m.callLogs)
// Filter logs based on parameters
filteredLogs := []CallLog{}
for i := len(allLogs) - 1; i >= 0 && len(filteredLogs) < requestParams.Limit; i-- {
log := allLogs[i]
// Apply method filter if specified
if requestParams.Method != "" && log.Method != requestParams.Method {
continue
}
// Apply status filter if specified
if requestParams.Status != "" && log.Status != requestParams.Status {
continue
}
filteredLogs = append(filteredLogs, log)
}
// Create response
response := struct {
Logs []CallLog `json:"logs"`
Total int `json:"total"`
Filtered int `json:"filtered"`
}{
Logs: filteredLogs,
Total: len(allLogs),
Filtered: len(filteredLogs),
}
return response, nil
}
// RegisterHandler is deprecated. Use NewOpenRPCManager with a complete set of handlers instead.
// This method is kept for backward compatibility but will return an error for methods not in the schema.
func (m *OpenRPCManager) RegisterHandler(method string, handler RPCHandler) error {
m.mutex.Lock()
defer m.mutex.Unlock()
// Check if handler already exists
if _, exists := m.handlers[method]; exists {
return fmt.Errorf("handler for method '%s' already registered", method)
}
// Check if method exists in schema
found := false
for _, schemaMethod := range m.schema.Methods {
if schemaMethod.Name == method {
found = true
break
}
}
if !found && method != "rpc.discover" {
return fmt.Errorf("method '%s' not defined in schema", method)
}
m.handlers[method] = handler
return nil
}
// UnregisterHandler removes a handler for the specified method
// Note: This will make the service non-compliant with its schema
func (m *OpenRPCManager) UnregisterHandler(method string) error {
m.mutex.Lock()
defer m.mutex.Unlock()
// Check if handler exists
if _, exists := m.handlers[method]; !exists {
return fmt.Errorf("handler for method '%s' not found", method)
}
// Don't allow unregistering the discovery method
if method == "rpc.discover" {
return fmt.Errorf("cannot unregister the discovery method 'rpc.discover'")
}
delete(m.handlers, method)
return nil
}
// HandleRequest processes an RPC request for the specified method
func (m *OpenRPCManager) HandleRequest(method string, params json.RawMessage) (interface{}, error) {
// Start timing the request
startTime := time.Now()
// Create a call log entry
callLog := CallLog{
Timestamp: startTime,
Method: method,
Authenticated: false,
}
// Parse params for logging, but don't fail if we can't
var parsedParams interface{}
if len(params) > 0 {
if err := json.Unmarshal(params, &parsedParams); err == nil {
callLog.Params = parsedParams
} else {
// If we can't parse the params, just store them as a string
callLog.Params = string(params)
}
}
// Find the handler
m.mutex.RLock()
handler, exists := m.handlers[method]
m.mutex.RUnlock()
if !exists {
// Log the error
callLog.Status = "error"
callLog.ErrorMsg = fmt.Sprintf("method '%s' not found", method)
callLog.Duration = time.Since(startTime)
// Add to call logs
m.logCall(callLog)
return nil, fmt.Errorf("method '%s' not found", method)
}
// Execute the handler
result, err := handler(params)
// Complete the call log
callLog.Duration = time.Since(startTime)
if err != nil {
callLog.Status = "error"
callLog.ErrorMsg = err.Error()
} else {
callLog.Status = "success"
}
// Add to call logs
m.logCall(callLog)
return result, err
}
// logCall adds a call log entry to the call logs, maintaining the maximum size
func (m *OpenRPCManager) logCall(log CallLog) {
m.callLogsMutex.Lock()
defer m.callLogsMutex.Unlock()
// Add the log to the call logs
m.callLogs = append(m.callLogs, log)
// Trim the call logs if they exceed the maximum size
if len(m.callLogs) > m.maxCallLogs {
m.callLogs = m.callLogs[len(m.callLogs)-m.maxCallLogs:]
}
}
// HandleRequestWithAuthentication processes an authenticated RPC request
func (m *OpenRPCManager) HandleRequestWithAuthentication(method string, params json.RawMessage, secret string) (interface{}, error) {
// Start timing the request
startTime := time.Now()
// Create a call log entry
callLog := CallLog{
Timestamp: startTime,
Method: method,
Authenticated: true,
}
// Parse params for logging, but don't fail if we can't
var parsedParams interface{}
if len(params) > 0 {
if err := json.Unmarshal(params, &parsedParams); err == nil {
callLog.Params = parsedParams
} else {
// If we can't parse the params, just store them as a string
callLog.Params = string(params)
}
}
// Verify the secret
if secret != m.secret {
// Log the authentication failure
callLog.Status = "error"
callLog.ErrorMsg = "authentication failed"
callLog.Duration = time.Since(startTime)
m.logCall(callLog)
return nil, fmt.Errorf("authentication failed")
}
// Execute the handler
m.mutex.RLock()
handler, exists := m.handlers[method]
m.mutex.RUnlock()
if !exists {
// Log the error
callLog.Status = "error"
callLog.ErrorMsg = fmt.Sprintf("method '%s' not found", method)
callLog.Duration = time.Since(startTime)
m.logCall(callLog)
return nil, fmt.Errorf("method '%s' not found", method)
}
// Execute the handler
result, err := handler(params)
// Complete the call log
callLog.Duration = time.Since(startTime)
if err != nil {
callLog.Status = "error"
callLog.ErrorMsg = err.Error()
} else {
callLog.Status = "success"
}
// Add to call logs
m.logCall(callLog)
return result, err
}
// ListMethods returns a list of all registered method names
func (m *OpenRPCManager) ListMethods() []string {
m.mutex.RLock()
defer m.mutex.RUnlock()
methods := make([]string, 0, len(m.handlers))
for method := range m.handlers {
methods = append(methods, method)
}
return methods
}
// GetSecret returns the authentication secret
func (m *OpenRPCManager) GetSecret() string {
return m.secret
}
// GetSchema returns the OpenRPC schema
func (m *OpenRPCManager) GetSchema() OpenRPCSchema {
return m.schema
}
// NewDefaultOpenRPCManager creates a new OpenRPC manager with a default schema
// This is provided for backward compatibility and testing
func NewDefaultOpenRPCManager(secret string) *OpenRPCManager {
// Create a minimal default schema
defaultSchema := OpenRPCSchema{
OpenRPC: "1.2.6",
Info: InfoObject{
Title: "Default OpenRPC Service",
Version: "1.0.0",
},
Methods: []MethodObject{
{
Name: "rpc.discover",
Description: "Returns the OpenRPC schema for this service",
Params: []ContentDescriptorObject{},
Result: &ContentDescriptorObject{
Name: "schema",
Description: "The OpenRPC schema",
Schema: SchemaObject{"type": "object"},
},
},
{
Name: "rpc.introspect",
Description: "Returns information about recent RPC calls for monitoring and debugging",
Params: []ContentDescriptorObject{
{
Name: "limit",
Description: "Maximum number of call logs to return",
Required: false,
Schema: SchemaObject{"type": "integer", "default": 100},
},
{
Name: "method",
Description: "Filter logs by method name",
Required: false,
Schema: SchemaObject{"type": "string"},
},
{
Name: "status",
Description: "Filter logs by status (success or error)",
Required: false,
Schema: SchemaObject{"type": "string", "enum": []interface{}{"success", "error"}},
},
},
Result: &ContentDescriptorObject{
Name: "introspection",
Description: "Introspection data including call logs",
Schema: SchemaObject{"type": "object"},
},
},
},
}
// Create the manager directly without validation since we're starting with empty methods
return &OpenRPCManager{
handlers: make(map[string]RPCHandler),
schema: defaultSchema,
secret: secret,
}
}

View File

@ -1,446 +0,0 @@
package openrpcmanager
import (
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// createTestSchema creates a test OpenRPC schema
func createTestSchema() OpenRPCSchema {
return OpenRPCSchema{
OpenRPC: "1.2.6",
Info: InfoObject{
Title: "Test API",
Version: "1.0.0",
},
Methods: []MethodObject{
{
Name: "echo",
Params: []ContentDescriptorObject{
{
Name: "message",
Schema: SchemaObject{"type": "object"},
},
},
Result: &ContentDescriptorObject{
Name: "result",
Schema: SchemaObject{"type": "object"},
},
},
{
Name: "add",
Params: []ContentDescriptorObject{
{
Name: "numbers",
Schema: SchemaObject{"type": "object"},
},
},
Result: &ContentDescriptorObject{
Name: "result",
Schema: SchemaObject{"type": "number"},
},
},
{
Name: "secure.method",
Params: []ContentDescriptorObject{},
Result: &ContentDescriptorObject{
Name: "result",
Schema: SchemaObject{"type": "string"},
},
},
},
}
}
// createTestHandlers creates test handlers for the schema
func createTestHandlers() map[string]RPCHandler {
return map[string]RPCHandler{
"echo": func(params json.RawMessage) (interface{}, error) {
var data interface{}
if err := json.Unmarshal(params, &data); err != nil {
return nil, err
}
return data, nil
},
"add": func(params json.RawMessage) (interface{}, error) {
var numbers struct {
A float64 `json:"a"`
B float64 `json:"b"`
}
if err := json.Unmarshal(params, &numbers); err != nil {
return nil, err
}
return numbers.A + numbers.B, nil
},
"secure.method": func(params json.RawMessage) (interface{}, error) {
return "secure data", nil
},
}
}
// TestNewOpenRPCManager tests the creation of a new OpenRPC manager
func TestNewOpenRPCManager(t *testing.T) {
secret := "test-secret"
schema := createTestSchema()
handlers := createTestHandlers()
manager, err := NewOpenRPCManager(schema, handlers, secret)
if err != nil {
t.Fatalf("Failed to create OpenRPCManager: %v", err)
}
if manager == nil {
t.Fatal("Manager is nil")
}
if manager.GetSecret() != secret {
t.Errorf("Secret mismatch. Expected: %s, Got: %s", secret, manager.GetSecret())
}
if manager.handlers == nil {
t.Error("handlers map not initialized")
}
// Test the default manager for backward compatibility
defaultManager := NewDefaultOpenRPCManager(secret)
if defaultManager == nil {
t.Fatal("Default manager is nil")
}
if defaultManager.GetSecret() != secret {
t.Errorf("Secret mismatch in default manager. Expected: %s, Got: %s", secret, defaultManager.GetSecret())
}
}
// TestRegisterHandler tests registering a handler to the OpenRPC manager
func TestRegisterHandler(t *testing.T) {
manager := NewDefaultOpenRPCManager("test-secret")
// Define a mock handler
mockHandler := func(params json.RawMessage) (interface{}, error) {
return "mock response", nil
}
// Add a method to the schema
manager.schema.Methods = append(manager.schema.Methods, MethodObject{
Name: "test.method",
Params: []ContentDescriptorObject{},
Result: &ContentDescriptorObject{
Name: "result",
Schema: SchemaObject{"type": "string"},
},
})
// Register the handler
err := manager.RegisterHandler("test.method", mockHandler)
if err != nil {
t.Fatalf("Failed to register handler: %v", err)
}
// Check if handler was registered
if _, exists := manager.handlers["test.method"]; !exists {
t.Error("Handler was not registered")
}
// Try to register the same handler again, should fail
err = manager.RegisterHandler("test.method", mockHandler)
if err == nil {
t.Error("Expected error when registering duplicate handler, but got nil")
}
// Try to register a handler for a method not in the schema
err = manager.RegisterHandler("not.in.schema", mockHandler)
if err == nil {
t.Error("Expected error when registering handler for method not in schema, but got nil")
}
}
// TestHandleRequest tests handling an RPC request
func TestHandleRequest(t *testing.T) {
schema := createTestSchema()
handlers := createTestHandlers()
manager, err := NewOpenRPCManager(schema, handlers, "test-secret")
if err != nil {
t.Fatalf("Failed to create OpenRPCManager: %v", err)
}
// Test the echo handler
testParams := json.RawMessage(`{"message":"hello world"}`)
result, err := manager.HandleRequest("echo", testParams)
if err != nil {
t.Fatalf("Failed to handle request: %v", err)
}
// Convert result to map for comparison
resultMap, ok := result.(map[string]interface{})
if !ok {
t.Fatalf("Expected map result, got: %T", result)
}
if resultMap["message"] != "hello world" {
t.Errorf("Expected 'hello world', got: %v", resultMap["message"])
}
// Test the add handler
addParams := json.RawMessage(`{"a":5,"b":7}`)
addResult, err := manager.HandleRequest("add", addParams)
if err != nil {
t.Fatalf("Failed to handle add request: %v", err)
}
// Check the result type and value
resultValue, ok := addResult.(float64)
if !ok {
t.Fatalf("Expected result type float64, got: %T", addResult)
}
if resultValue != float64(12) {
t.Errorf("Expected 12, got: %v", resultValue)
}
// Test with non-existent method
_, err = manager.HandleRequest("nonexistent", testParams)
if err == nil {
t.Error("Expected error for non-existent method, but got nil")
}
// Test the discovery method
discoveryResult, err := manager.HandleRequest("rpc.discover", json.RawMessage(`{}`))
if err != nil {
t.Fatalf("Failed to handle discovery request: %v", err)
}
// Verify the discovery result is the schema
discoverySchema, ok := discoveryResult.(OpenRPCSchema)
if !ok {
t.Fatalf("Expected OpenRPCSchema result, got: %T", discoveryResult)
}
if discoverySchema.OpenRPC != schema.OpenRPC {
t.Errorf("Expected OpenRPC version %s, got: %s", schema.OpenRPC, discoverySchema.OpenRPC)
}
if len(discoverySchema.Methods) != len(schema.Methods) {
t.Errorf("Expected %d methods, got: %d", len(schema.Methods), len(discoverySchema.Methods))
}
}
// TestHandleRequestWithAuthentication tests handling a request with authentication
func TestHandleRequestWithAuthentication(t *testing.T) {
secret := "test-secret"
schema := createTestSchema()
handlers := createTestHandlers()
manager, err := NewOpenRPCManager(schema, handlers, secret)
if err != nil {
t.Fatalf("Failed to create OpenRPCManager: %v", err)
}
// Test with correct secret
result, err := manager.HandleRequestWithAuthentication("secure.method", json.RawMessage(`{}`), secret)
if err != nil {
t.Fatalf("Failed to handle authenticated request: %v", err)
}
if result != "secure data" {
t.Errorf("Expected 'secure data', got: %v", result)
}
// Test with incorrect secret
_, err = manager.HandleRequestWithAuthentication("secure.method", json.RawMessage(`{}`), "wrong-secret")
if err == nil {
t.Error("Expected authentication error, but got nil")
}
}
// TestUnregisterHandler tests removing a handler from the OpenRPC manager
func TestUnregisterHandler(t *testing.T) {
manager := NewDefaultOpenRPCManager("test-secret")
// Define a mock handler
mockHandler := func(params json.RawMessage) (interface{}, error) {
return "mock response", nil
}
// Add a method to the schema
manager.schema.Methods = append(manager.schema.Methods, MethodObject{
Name: "test.method",
Params: []ContentDescriptorObject{},
Result: &ContentDescriptorObject{
Name: "result",
Schema: SchemaObject{"type": "string"},
},
})
// Register the handler
manager.RegisterHandler("test.method", mockHandler)
// Unregister the handler
err := manager.UnregisterHandler("test.method")
if err != nil {
t.Fatalf("Failed to unregister handler: %v", err)
}
// Check if handler was unregistered
if _, exists := manager.handlers["test.method"]; exists {
t.Error("Handler was not unregistered")
}
// Try to unregister non-existent handler
err = manager.UnregisterHandler("nonexistent")
if err == nil {
t.Error("Expected error when unregistering non-existent handler, but got nil")
}
// Try to unregister the discovery method
err = manager.UnregisterHandler("rpc.discover")
if err == nil {
t.Error("Expected error when unregistering discovery method, but got nil")
}
}
// TestIntrospection tests the introspection functionality
func TestIntrospection(t *testing.T) {
// Create a test manager
schema := createTestSchema()
handlers := createTestHandlers()
manager, err := NewOpenRPCManager(schema, handlers, "test-secret")
assert.NoError(t, err)
// The introspection handler is already registered in NewOpenRPCManager
// Make some test calls to generate logs
_, err = manager.HandleRequest("echo", json.RawMessage(`{"message":"hello"}`))
assert.NoError(t, err)
_, err = manager.HandleRequestWithAuthentication("echo", json.RawMessage(`{"message":"authenticated"}`), "test-secret")
assert.NoError(t, err)
_, err = manager.HandleRequestWithAuthentication("echo", json.RawMessage(`{"message":"auth-fail"}`), "wrong-secret")
assert.Error(t, err)
// Wait a moment to ensure timestamps are different
time.Sleep(10 * time.Millisecond)
// Call the introspection handler
result, err := manager.handleIntrospection(json.RawMessage(`{"limit":10}`))
assert.NoError(t, err)
// Verify the result
response, ok := result.(struct {
Logs []CallLog `json:"logs"`
Total int `json:"total"`
Filtered int `json:"filtered"`
})
assert.True(t, ok)
// Should have 3 logs (2 successful calls, 1 auth failure)
assert.Equal(t, 3, response.Total)
assert.Equal(t, 3, response.Filtered)
assert.Len(t, response.Logs, 3)
// Test filtering by method
result, err = manager.handleIntrospection(json.RawMessage(`{"method":"echo"}`))
assert.NoError(t, err)
response, ok = result.(struct {
Logs []CallLog `json:"logs"`
Total int `json:"total"`
Filtered int `json:"filtered"`
})
assert.True(t, ok)
assert.Equal(t, 3, response.Total) // Total is still 3
assert.Equal(t, 3, response.Filtered) // All 3 match the method filter
// Test filtering by status
result, err = manager.handleIntrospection(json.RawMessage(`{"status":"error"}`))
assert.NoError(t, err)
response, ok = result.(struct {
Logs []CallLog `json:"logs"`
Total int `json:"total"`
Filtered int `json:"filtered"`
})
assert.True(t, ok)
assert.Equal(t, 3, response.Total) // Total is still 3
assert.Equal(t, 1, response.Filtered) // Only 1 error
assert.Len(t, response.Logs, 1)
assert.Equal(t, "error", response.Logs[0].Status)
}
// TestListMethods tests listing all registered methods
func TestListMethods(t *testing.T) {
schema := createTestSchema()
handlers := createTestHandlers()
manager, err := NewOpenRPCManager(schema, handlers, "test-secret")
if err != nil {
t.Fatalf("Failed to create OpenRPCManager: %v", err)
}
// List all methods
registeredMethods := manager.ListMethods()
// Check if all methods plus discovery and introspection methods are listed
expectedCount := len(schema.Methods) + 2 // +2 for rpc.discover and rpc.introspect
if len(registeredMethods) != expectedCount {
t.Errorf("Expected %d methods, got %d", expectedCount, len(registeredMethods))
}
// Check if all schema methods are in the list
for _, methodObj := range schema.Methods {
found := false
for _, registeredMethod := range registeredMethods {
if registeredMethod == methodObj.Name {
found = true
break
}
}
if !found {
t.Errorf("Method %s not found in list", methodObj.Name)
}
}
// Check if discovery method is in the list
found := false
for _, registeredMethod := range registeredMethods {
if registeredMethod == "rpc.discover" {
found = true
break
}
}
if !found {
t.Error("Discovery method 'rpc.discover' not found in list")
}
}
// TestSchemaValidation tests that the schema validation works correctly
func TestSchemaValidation(t *testing.T) {
secret := "test-secret"
schema := createTestSchema()
// Test with missing handler
incompleteHandlers := map[string]RPCHandler{
"echo": func(params json.RawMessage) (interface{}, error) {
return nil, nil
},
// Missing "add" handler
"secure.method": func(params json.RawMessage) (interface{}, error) {
return nil, nil
},
}
_, err := NewOpenRPCManager(schema, incompleteHandlers, secret)
if err == nil {
t.Error("Expected error when missing handler for schema method, but got nil")
}
// Test with extra handler not in schema
extraHandlers := createTestHandlers()
extraHandlers["not.in.schema"] = func(params json.RawMessage) (interface{}, error) {
return nil, nil
}
_, err = NewOpenRPCManager(schema, extraHandlers, secret)
if err == nil {
t.Error("Expected error when handler has no corresponding method in schema, but got nil")
}
}

View File

@ -1,117 +0,0 @@
package openrpcmanager
// OpenRPCSchema represents the OpenRPC specification document
// Based on OpenRPC Specification 1.2.6: https://spec.open-rpc.org/
type OpenRPCSchema struct {
OpenRPC string `json:"openrpc"` // Required: Version of the OpenRPC specification
Info InfoObject `json:"info"` // Required: Information about the API
Methods []MethodObject `json:"methods"` // Required: List of method objects
ExternalDocs *ExternalDocsObject `json:"externalDocs,omitempty"` // Optional: External documentation
Servers []ServerObject `json:"servers,omitempty"` // Optional: List of servers
Components *ComponentsObject `json:"components,omitempty"` // Optional: Reusable components
}
// InfoObject provides metadata about the API
type InfoObject struct {
Title string `json:"title"` // Required: Title of the API
Description string `json:"description,omitempty"` // Optional: Description of the API
Version string `json:"version"` // Required: Version of the API
TermsOfService string `json:"termsOfService,omitempty"` // Optional: Terms of service URL
Contact *ContactObject `json:"contact,omitempty"` // Optional: Contact information
License *LicenseObject `json:"license,omitempty"` // Optional: License information
}
// ContactObject provides contact information for the API
type ContactObject struct {
Name string `json:"name,omitempty"` // Optional: Name of the contact
URL string `json:"url,omitempty"` // Optional: URL of the contact
Email string `json:"email,omitempty"` // Optional: Email of the contact
}
// LicenseObject provides license information for the API
type LicenseObject struct {
Name string `json:"name"` // Required: Name of the license
URL string `json:"url,omitempty"` // Optional: URL of the license
}
// ExternalDocsObject provides a URL to external documentation
type ExternalDocsObject struct {
Description string `json:"description,omitempty"` // Optional: Description of the external docs
URL string `json:"url"` // Required: URL of the external docs
}
// ServerObject provides connection information to a server
type ServerObject struct {
Name string `json:"name,omitempty"` // Optional: Name of the server
Description string `json:"description,omitempty"` // Optional: Description of the server
URL string `json:"url"` // Required: URL of the server
Variables map[string]ServerVariable `json:"variables,omitempty"` // Optional: Server variables
}
// ServerVariable is a variable for server URL template substitution
type ServerVariable struct {
Default string `json:"default"` // Required: Default value of the variable
Description string `json:"description,omitempty"` // Optional: Description of the variable
Enum []string `json:"enum,omitempty"` // Optional: Enumeration of possible values
}
// MethodObject describes an RPC method
type MethodObject struct {
Name string `json:"name"` // Required: Name of the method
Description string `json:"description,omitempty"` // Optional: Description of the method
Summary string `json:"summary,omitempty"` // Optional: Summary of the method
Params []ContentDescriptorObject `json:"params"` // Required: List of parameters
Result *ContentDescriptorObject `json:"result"` // Required: Description of the result
Deprecated bool `json:"deprecated,omitempty"` // Optional: Whether the method is deprecated
Errors []ErrorObject `json:"errors,omitempty"` // Optional: List of possible errors
Tags []TagObject `json:"tags,omitempty"` // Optional: List of tags
ExternalDocs *ExternalDocsObject `json:"externalDocs,omitempty"` // Optional: External documentation
ParamStructure string `json:"paramStructure,omitempty"` // Optional: Structure of the parameters
}
// ContentDescriptorObject describes the content of a parameter or result
type ContentDescriptorObject struct {
Name string `json:"name"` // Required: Name of the parameter
Description string `json:"description,omitempty"` // Optional: Description of the parameter
Summary string `json:"summary,omitempty"` // Optional: Summary of the parameter
Required bool `json:"required,omitempty"` // Optional: Whether the parameter is required
Deprecated bool `json:"deprecated,omitempty"` // Optional: Whether the parameter is deprecated
Schema SchemaObject `json:"schema"` // Required: JSON Schema of the parameter
}
// SchemaObject is a JSON Schema definition
// This is a simplified version, in a real implementation you would use a full JSON Schema library
type SchemaObject map[string]interface{}
// ErrorObject describes an error that may be returned
type ErrorObject struct {
Code int `json:"code"` // Required: Error code
Message string `json:"message"` // Required: Error message
Data interface{} `json:"data,omitempty"` // Optional: Additional error data
}
// TagObject describes a tag for documentation purposes
type TagObject struct {
Name string `json:"name"` // Required: Name of the tag
Description string `json:"description,omitempty"` // Optional: Description of the tag
ExternalDocs *ExternalDocsObject `json:"externalDocs,omitempty"` // Optional: External documentation
}
// ComponentsObject holds reusable objects for different aspects of the OpenRPC spec
type ComponentsObject struct {
Schemas map[string]SchemaObject `json:"schemas,omitempty"` // Optional: Reusable schemas
ContentDescriptors map[string]ContentDescriptorObject `json:"contentDescriptors,omitempty"` // Optional: Reusable content descriptors
Examples map[string]interface{} `json:"examples,omitempty"` // Optional: Reusable examples
Links map[string]LinkObject `json:"links,omitempty"` // Optional: Reusable links
Errors map[string]ErrorObject `json:"errors,omitempty"` // Optional: Reusable errors
}
// LinkObject describes a link between operations
type LinkObject struct {
Name string `json:"name,omitempty"` // Optional: Name of the link
Description string `json:"description,omitempty"` // Optional: Description of the link
Summary string `json:"summary,omitempty"` // Optional: Summary of the link
Method string `json:"method"` // Required: Method name
Params map[string]interface{} `json:"params,omitempty"` // Optional: Parameters for the method
Server *ServerObject `json:"server,omitempty"` // Optional: Server for the method
}

View File

@ -1,241 +0,0 @@
package openrpcmanager
import (
"encoding/json"
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
)
// RPCRequest represents an incoming RPC request
type RPCRequest struct {
Method string `json:"method"`
Params json.RawMessage `json:"params"`
ID interface{} `json:"id,omitempty"`
Secret string `json:"secret,omitempty"`
JSONRPC string `json:"jsonrpc"`
}
// RPCResponse represents an outgoing RPC response
type RPCResponse struct {
Result interface{} `json:"result,omitempty"`
Error *RPCError `json:"error,omitempty"`
ID interface{} `json:"id,omitempty"`
JSONRPC string `json:"jsonrpc"`
}
// RPCError represents an RPC error
type RPCError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
// UnixServer represents a Unix socket server for the OpenRPC manager
type UnixServer struct {
manager *OpenRPCManager
socketPath string
listener net.Listener
connections map[net.Conn]bool
mutex sync.Mutex
wg sync.WaitGroup
done chan struct{}
}
// NewUnixServer creates a new Unix socket server for the OpenRPC manager
func NewUnixServer(manager *OpenRPCManager, socketPath string) (*UnixServer, error) {
// Create directory if it doesn't exist
dir := filepath.Dir(socketPath)
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("failed to create socket directory: %w", err)
}
// Remove socket if it already exists
if _, err := os.Stat(socketPath); err == nil {
if err := os.Remove(socketPath); err != nil {
return nil, fmt.Errorf("failed to remove existing socket: %w", err)
}
}
return &UnixServer{
manager: manager,
socketPath: socketPath,
connections: make(map[net.Conn]bool),
done: make(chan struct{}),
}, nil
}
// Start starts the Unix socket server
func (s *UnixServer) Start() error {
listener, err := net.Listen("unix", s.socketPath)
if err != nil {
return fmt.Errorf("failed to listen on unix socket: %w", err)
}
s.listener = listener
// Set socket permissions
if err := os.Chmod(s.socketPath, 0660); err != nil {
s.listener.Close()
return fmt.Errorf("failed to set socket permissions: %w", err)
}
s.wg.Add(1)
go s.acceptConnections()
return nil
}
// Stop stops the Unix socket server
func (s *UnixServer) Stop() error {
close(s.done)
// Close the listener
if s.listener != nil {
s.listener.Close()
}
// Close all connections
s.mutex.Lock()
for conn := range s.connections {
conn.Close()
}
s.mutex.Unlock()
// Wait for all goroutines to finish
s.wg.Wait()
// Remove the socket file
os.Remove(s.socketPath)
return nil
}
// acceptConnections accepts incoming connections
func (s *UnixServer) acceptConnections() {
defer s.wg.Done()
for {
select {
case <-s.done:
return
default:
conn, err := s.listener.Accept()
if err != nil {
select {
case <-s.done:
return
default:
fmt.Printf("Error accepting connection: %v\n", err)
continue
}
}
s.mutex.Lock()
s.connections[conn] = true
s.mutex.Unlock()
s.wg.Add(1)
go s.handleConnection(conn)
}
}
}
// handleConnection handles a client connection
func (s *UnixServer) handleConnection(conn net.Conn) {
defer func() {
s.mutex.Lock()
delete(s.connections, conn)
s.mutex.Unlock()
conn.Close()
s.wg.Done()
}()
buf := make([]byte, 4096)
for {
select {
case <-s.done:
return
default:
n, err := conn.Read(buf)
if err != nil {
if err != io.EOF {
fmt.Printf("Error reading from connection: %v\n", err)
}
return
}
if n > 0 {
go s.handleRequest(conn, buf[:n])
}
}
}
}
// handleRequest processes an RPC request
func (s *UnixServer) handleRequest(conn net.Conn, data []byte) {
var req RPCRequest
if err := json.Unmarshal(data, &req); err != nil {
s.sendErrorResponse(conn, nil, -32700, "Parse error", err)
return
}
// Validate JSON-RPC version
if req.JSONRPC != "2.0" {
s.sendErrorResponse(conn, req.ID, -32600, "Invalid Request", "Invalid JSON-RPC version")
return
}
var result interface{}
var err error
// Check if authentication is required
if req.Secret != "" {
result, err = s.manager.HandleRequestWithAuthentication(req.Method, req.Params, req.Secret)
} else {
result, err = s.manager.HandleRequest(req.Method, req.Params)
}
if err != nil {
s.sendErrorResponse(conn, req.ID, -32603, "Internal error", err.Error())
return
}
// Send success response
response := RPCResponse{
Result: result,
ID: req.ID,
JSONRPC: "2.0",
}
responseData, err := json.Marshal(response)
if err != nil {
s.sendErrorResponse(conn, req.ID, -32603, "Internal error", err.Error())
return
}
conn.Write(responseData)
}
// sendErrorResponse sends an error response
func (s *UnixServer) sendErrorResponse(conn net.Conn, id interface{}, code int, message string, data interface{}) {
response := RPCResponse{
Error: &RPCError{
Code: code,
Message: message,
Data: data,
},
ID: id,
JSONRPC: "2.0",
}
responseData, err := json.Marshal(response)
if err != nil {
fmt.Printf("Error marshaling error response: %v\n", err)
return
}
conn.Write(responseData)
}

View File

@ -1,362 +0,0 @@
package openrpcmanager
import (
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"testing"
"time"
)
func TestUnixServer(t *testing.T) {
// Create a temporary socket path
tempDir, err := os.MkdirTemp("", "openrpc-test")
if err != nil {
t.Fatalf("Failed to create temp directory: %v", err)
}
defer os.RemoveAll(tempDir)
socketPath := filepath.Join(tempDir, "openrpc.sock")
// Create OpenRPC manager
schema := createTestSchema()
handlers := createTestHandlers()
manager, err := NewOpenRPCManager(schema, handlers, "test-secret")
if err != nil {
t.Fatalf("Failed to create OpenRPCManager: %v", err)
}
// Create and start Unix server
server, err := NewUnixServer(manager, socketPath)
if err != nil {
t.Fatalf("Failed to create UnixServer: %v", err)
}
if err := server.Start(); err != nil {
t.Fatalf("Failed to start UnixServer: %v", err)
}
defer server.Stop()
// Wait for server to start
time.Sleep(100 * time.Millisecond)
// Test connection
conn, err := net.Dial("unix", socketPath)
if err != nil {
t.Fatalf("Failed to connect to Unix socket: %v", err)
}
defer conn.Close()
// Test echo method
t.Run("Echo method", func(t *testing.T) {
request := RPCRequest{
Method: "echo",
Params: json.RawMessage(`{"message":"hello world"}`),
ID: 1,
JSONRPC: "2.0",
}
requestData, err := json.Marshal(request)
if err != nil {
t.Fatalf("Failed to marshal request: %v", err)
}
_, err = conn.Write(requestData)
if err != nil {
t.Fatalf("Failed to send request: %v", err)
}
// Read response
buf := make([]byte, 4096)
n, err := conn.Read(buf)
if err != nil {
t.Fatalf("Failed to read response: %v", err)
}
var response RPCResponse
if err := json.Unmarshal(buf[:n], &response); err != nil {
t.Fatalf("Failed to unmarshal response: %v", err)
}
// Check response
if response.Error != nil {
t.Fatalf("Received error response: %v", response.Error)
}
// Note: JSON unmarshaling may convert numbers to float64, so we need to check the value not exact type
if fmt.Sprintf("%v", response.ID) != fmt.Sprintf("%v", request.ID) {
t.Errorf("Response ID mismatch. Expected: %v, Got: %v", request.ID, response.ID)
}
// Check result
resultMap, ok := response.Result.(map[string]interface{})
if !ok {
t.Fatalf("Expected map result, got: %T", response.Result)
}
if resultMap["message"] != "hello world" {
t.Errorf("Expected 'hello world', got: %v", resultMap["message"])
}
})
// Test add method
t.Run("Add method", func(t *testing.T) {
request := RPCRequest{
Method: "add",
Params: json.RawMessage(`{"a":5,"b":7}`),
ID: 2,
JSONRPC: "2.0",
}
requestData, err := json.Marshal(request)
if err != nil {
t.Fatalf("Failed to marshal request: %v", err)
}
_, err = conn.Write(requestData)
if err != nil {
t.Fatalf("Failed to send request: %v", err)
}
// Read response
buf := make([]byte, 4096)
n, err := conn.Read(buf)
if err != nil {
t.Fatalf("Failed to read response: %v", err)
}
var response RPCResponse
if err := json.Unmarshal(buf[:n], &response); err != nil {
t.Fatalf("Failed to unmarshal response: %v", err)
}
// Check response
if response.Error != nil {
t.Fatalf("Received error response: %v", response.Error)
}
// Note: JSON unmarshaling may convert numbers to float64, so we need to check the value not exact type
if fmt.Sprintf("%v", response.ID) != fmt.Sprintf("%v", request.ID) {
t.Errorf("Response ID mismatch. Expected: %v, Got: %v", request.ID, response.ID)
}
// Check result
resultValue, ok := response.Result.(float64)
if !ok {
t.Fatalf("Expected float64 result, got: %T", response.Result)
}
if resultValue != float64(12) {
t.Errorf("Expected 12, got: %v", resultValue)
}
})
// Test authenticated method
t.Run("Authenticated method", func(t *testing.T) {
request := RPCRequest{
Method: "secure.method",
Params: json.RawMessage(`{}`),
ID: 3,
Secret: "test-secret",
JSONRPC: "2.0",
}
requestData, err := json.Marshal(request)
if err != nil {
t.Fatalf("Failed to marshal request: %v", err)
}
_, err = conn.Write(requestData)
if err != nil {
t.Fatalf("Failed to send request: %v", err)
}
// Read response
buf := make([]byte, 4096)
n, err := conn.Read(buf)
if err != nil {
t.Fatalf("Failed to read response: %v", err)
}
var response RPCResponse
if err := json.Unmarshal(buf[:n], &response); err != nil {
t.Fatalf("Failed to unmarshal response: %v", err)
}
// Check response
if response.Error != nil {
t.Fatalf("Received error response: %v", response.Error)
}
// Note: JSON unmarshaling may convert numbers to float64, so we need to check the value not exact type
if fmt.Sprintf("%v", response.ID) != fmt.Sprintf("%v", request.ID) {
t.Errorf("Response ID mismatch. Expected: %v, Got: %v", request.ID, response.ID)
}
// Check result
resultValue, ok := response.Result.(string)
if !ok {
t.Fatalf("Expected string result, got: %T", response.Result)
}
if resultValue != "secure data" {
t.Errorf("Expected 'secure data', got: %v", resultValue)
}
})
// Test authentication failure
t.Run("Authentication failure", func(t *testing.T) {
request := RPCRequest{
Method: "secure.method",
Params: json.RawMessage(`{}`),
ID: 4,
Secret: "wrong-secret",
JSONRPC: "2.0",
}
requestData, err := json.Marshal(request)
if err != nil {
t.Fatalf("Failed to marshal request: %v", err)
}
_, err = conn.Write(requestData)
if err != nil {
t.Fatalf("Failed to send request: %v", err)
}
// Read response
buf := make([]byte, 4096)
n, err := conn.Read(buf)
if err != nil {
t.Fatalf("Failed to read response: %v", err)
}
var response RPCResponse
if err := json.Unmarshal(buf[:n], &response); err != nil {
t.Fatalf("Failed to unmarshal response: %v", err)
}
// Check response
if response.Error == nil {
t.Fatal("Expected error response, but got nil")
}
// Note: JSON unmarshaling may convert numbers to float64, so we need to check the value not exact type
if fmt.Sprintf("%v", response.ID) != fmt.Sprintf("%v", request.ID) {
t.Errorf("Response ID mismatch. Expected: %v, Got: %v", request.ID, response.ID)
}
if response.Error.Code != -32603 {
t.Errorf("Expected error code -32603, got: %v", response.Error.Code)
}
})
// Test non-existent method
t.Run("Non-existent method", func(t *testing.T) {
request := RPCRequest{
Method: "nonexistent",
Params: json.RawMessage(`{}`),
ID: 5,
JSONRPC: "2.0",
}
requestData, err := json.Marshal(request)
if err != nil {
t.Fatalf("Failed to marshal request: %v", err)
}
_, err = conn.Write(requestData)
if err != nil {
t.Fatalf("Failed to send request: %v", err)
}
// Read response
buf := make([]byte, 4096)
n, err := conn.Read(buf)
if err != nil {
t.Fatalf("Failed to read response: %v", err)
}
var response RPCResponse
if err := json.Unmarshal(buf[:n], &response); err != nil {
t.Fatalf("Failed to unmarshal response: %v", err)
}
// Check response
if response.Error == nil {
t.Fatal("Expected error response, but got nil")
}
// Note: JSON unmarshaling may convert numbers to float64, so we need to check the value not exact type
if fmt.Sprintf("%v", response.ID) != fmt.Sprintf("%v", request.ID) {
t.Errorf("Response ID mismatch. Expected: %v, Got: %v", request.ID, response.ID)
}
if response.Error.Code != -32603 {
t.Errorf("Expected error code -32603, got: %v", response.Error.Code)
}
})
// Test discovery method
t.Run("Discovery method", func(t *testing.T) {
request := RPCRequest{
Method: "rpc.discover",
Params: json.RawMessage(`{}`),
ID: 6,
JSONRPC: "2.0",
}
requestData, err := json.Marshal(request)
if err != nil {
t.Fatalf("Failed to marshal request: %v", err)
}
_, err = conn.Write(requestData)
if err != nil {
t.Fatalf("Failed to send request: %v", err)
}
// Read response
buf := make([]byte, 4096)
n, err := conn.Read(buf)
if err != nil {
t.Fatalf("Failed to read response: %v", err)
}
var response RPCResponse
if err := json.Unmarshal(buf[:n], &response); err != nil {
t.Fatalf("Failed to unmarshal response: %v", err)
}
// Check response
if response.Error != nil {
t.Fatalf("Received error response: %v", response.Error)
}
// Note: JSON unmarshaling may convert numbers to float64, so we need to check the value not exact type
if fmt.Sprintf("%v", response.ID) != fmt.Sprintf("%v", request.ID) {
t.Errorf("Response ID mismatch. Expected: %v, Got: %v", request.ID, response.ID)
}
// Check that we got a valid schema
resultMap, ok := response.Result.(map[string]interface{})
if !ok {
t.Fatalf("Expected map result, got: %T", response.Result)
}
if resultMap["openrpc"] != "1.2.6" {
t.Errorf("Expected OpenRPC version 1.2.6, got: %v", resultMap["openrpc"])
}
methods, ok := resultMap["methods"].([]interface{})
if !ok {
t.Fatalf("Expected methods array, got: %T", resultMap["methods"])
}
if len(methods) < 3 {
t.Errorf("Expected at least 3 methods, got: %d", len(methods))
}
})
}