'use strict'; const WebSocket = require('isomorphic-ws'); const utils = require('../utils'); const delay = require('delay'); const Client = require('../client'); /** * Constructor for a Jayson Websocket Client * @class ClientWebsocket * @constructor * @extends Client * @param {Object} [options] * @param {String} [options.url] When options.ws not provided this will be the URL to open the websocket to * @param {ws.WebSocket} [options.ws] When not provided will create a WebSocket instance with options.url * @param {Number} [options.timeout] Will wait this long in ms until callbacking with an error * @return {ClientWebsocket} */ const ClientWebsocket = function(options) { if(!(this instanceof ClientWebsocket)) { return new ClientWebsocket(options); } Client.call(this, options); const defaults = utils.merge(this.options, {}); this.options = utils.merge(defaults, options || {}); const self = this; this.ws = this.options.ws || new WebSocket(this.options.url); this.outstandingRequests = []; this.handlers = {}; this.handlers.message = function (str) { utils.JSON.parse(str, self.options, function(err, response) { if (err) { // invalid JSON is ignored return; } if (Array.isArray(response)) { // we have a batch reply const matchingRequest = self.outstandingRequests.find(function ([request]) { if (Array.isArray(request)) { // a batch is considered matching if at least one response id matches one request id return response.some(function (resp) { if (utils.Response.isValidResponse(resp)) { return request.some(function (req) { return req.id === resp.id; }); } return false; }); } }); if (matchingRequest) { const [ request, resolve ] = matchingRequest; return resolve(response); } } else if (utils.Response.isValidResponse(response)) { const matchingRequest = self.outstandingRequests.find(function ([request]) { return !Array.isArray(request) && request.id === response.id; }); if (matchingRequest) { const [ request, resolve ] = matchingRequest; return resolve(response); } } }); }; this.ws.on('message', this.handlers.message); }; require('util').inherits(ClientWebsocket, Client); module.exports = ClientWebsocket; /** * @desc Removes all event listeners from Websocket instance which cancels all outstanding requests too */ ClientWebsocket.prototype.unlisten = function () { for (const eventName in this.handlers) { this.ws.off(eventName, this.handlers[eventName]); } }; ClientWebsocket.prototype._request = function(request, callback) { const self = this; const { ws, options } = this; // we have to remove the object representing this request when the promise resolves/rejects let outstandingItem; Promise.race([ options.timeout > 0 ? delay(options.timeout).then(function () { throw new Error('timeout reached after ' + options.timeout + ' ms'); }) : null, new Promise(function (resolve, reject) { utils.JSON.stringify(request, options, function(err, body) { if (err) { return resolve(err); } ws.send(body); if (utils.Request.isNotification(request)) { // notifications callback immediately since they don't have a reply return resolve(); } outstandingItem = [request, resolve, reject]; self.outstandingRequests.push(outstandingItem); }); }), ].filter(v => v !== null)).then(function (result) { removeOutstandingRequest(); callback(null, result); }).catch(function (err) { removeOutstandingRequest(); callback(err); }); function removeOutstandingRequest () { if (!outstandingItem) return; self.outstandingRequests = self.outstandingRequests.filter(v => v !== outstandingItem); } };