diff --git a/lib/core/jobs/openrpc/handler.v b/lib/core/jobs/openrpc/handler.v index b3dd1233..7503d420 100644 --- a/lib/core/jobs/openrpc/handler.v +++ b/lib/core/jobs/openrpc/handler.v @@ -32,7 +32,7 @@ pub fn (mut s OpenRPCServer) start() ! { // Send response back to Redis using response queue response_json := json.encode(response) key:='${rpc_queue}:${request.id}' - println("response: ${} put on return queue ${key} ") + println("response: \n${response}\n put on return queue ${key} ") mut response_queue := &redisclient.RedisQueue{ key: key redis: s.redis diff --git a/lib/core/jobs/openrpc/ws_server.v b/lib/core/jobs/openrpc/ws_server.v index 7c652b29..a57520a4 100644 --- a/lib/core/jobs/openrpc/ws_server.v +++ b/lib/core/jobs/openrpc/ws_server.v @@ -9,13 +9,19 @@ import rand pub struct WSServer { mut: redis &redisclient.Redis + queue &redisclient.RedisQueue port int = 8080 // Default port, can be configured } // Create new WebSocket server pub fn new_ws_server( port int) !&WSServer { + mut redis:= redisclient.core_get()! return &WSServer{ - redis: redisclient.core_get()! + redis: redis + queue: &redisclient.RedisQueue{ + key: rpc_queue + redis: redis + } port: port } } @@ -58,10 +64,16 @@ pub fn (mut s WSServer) start() ! { println('WebSocket put on queue: \'${rpc_queue}\' (msg: ${msg.payload.bytestr()})') // Send request to Redis queue - s.redis.lpush(rpc_queue, msg.payload.bytestr())! + s.queue.add(msg.payload.bytestr())! + + returnkey:='${rpc_queue}:${req_id}' + mut queue_return := &redisclient.RedisQueue{ + key: returnkey + redis: s.redis + } // Wait for response - response := s.redis.brpop(['${rpc_queue}:${req_id}'], 30)! // 30 second timeout + response := queue_return.get(30)! if response.len < 2 { error_msg := '{"jsonrpc":"2.0","error":"Timeout waiting for response","id":${req_id}}' println('WebSocket error response (err: ${response})') @@ -69,9 +81,10 @@ pub fn (mut s WSServer) start() ! { return } - println('WebSocket ok response (msg: ${response[1].bytes()})') + println('WebSocket ok response (msg: ${response[1]})') // Send response back to WebSocket client - ws.write(response[1].bytes(), websocket.OPCode.text_frame) or { panic(err) } + response_str := response[1].str() + ws.write(response_str.bytes(), websocket.OPCode.text_frame) or { panic(err) } }) // Start server