"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Socket = void 0;
const events_1 = require("events");
const debug_1 = require("debug");
const timers_1 = require("timers");
const debug = (0, debug_1.default)("engine:socket");
/**
 * Server-side representation of one client connection.
 *
 * A Socket owns exactly one transport at a time (`this.transport`), buffers
 * outgoing packets in `this.writeBuffer` until the transport is writable,
 * runs the heartbeat timers and can swap its transport for another one
 * through `maybeUpgrade()`.
 *
 * Events emitted by this class: "open", "packet", "heartbeat", "data",
 * "message", "packetCreate", "flush", "drain", "upgrading", "upgrade" and
 * "close".
 */
class Socket extends events_1.EventEmitter {
    /**
     * Client class (abstract).
     *
     * Stores the connection state, attaches the initial transport and
     * immediately opens the socket (see `onOpen()`).
     *
     * @param {String} id - session id, sent to the client in the `open` packet
     * @param {Object} server - owning server; `server.opts`, `server.upgrades()`
     *   and `server.emit()` are used by this class
     * @param {Transport} transport - initial transport
     * @param {Object} req - request that created the connection
     * @param {Number} protocol - protocol revision; `3` selects
     *   client-initiated pings, any other value server-initiated pings
     * @api private
     */
    constructor(id, server, transport, req, protocol) {
        super();
        this.id = id;
        this.server = server;
        this.upgrading = false;
        this.upgraded = false;
        this.readyState = "opening";
        // packets waiting for the transport to become writable
        this.writeBuffer = [];
        // send callbacks for the packets currently in writeBuffer
        this.packetsFn = [];
        // send callbacks for packets handed to the transport, waiting for its "drain"
        this.sentCallbackFn = [];
        // functions that detach the listeners installed on the current transport
        this.cleanupFn = [];
        this.request = req;
        this.protocol = protocol;
        // Cache IP since it might not be in the req later
        if (req.websocket && req.websocket._socket) {
            this.remoteAddress = req.websocket._socket.remoteAddress;
        }
        else {
            this.remoteAddress = req.connection.remoteAddress;
        }
        this.checkIntervalTimer = null;
        this.upgradeTimeoutTimer = null;
        this.pingTimeoutTimer = null;
        this.pingIntervalTimer = null;
        this.setTransport(transport);
        this.onOpen();
    }
    /**
     * Current state of the socket: "opening", "open", "closing" or "closed".
     */
    get readyState() {
        return this._readyState;
    }
    set readyState(state) {
        debug("readyState updated from %s to %s", this._readyState, state);
        this._readyState = state;
    }
    /**
     * Called upon transport considered open.
     *
     * Marks the socket as open, sends the `open` handshake packet (followed
     * by the optional `initialPacket` configured on the server), emits "open"
     * and starts the heartbeat mechanism matching `this.protocol`.
     *
     * @api private
     */
    onOpen() {
        this.readyState = "open";
        // sends an `open` packet
        this.transport.sid = this.id;
        this.sendPacket("open", JSON.stringify({
            sid: this.id,
            upgrades: this.getAvailableUpgrades(),
            pingInterval: this.server.opts.pingInterval,
            pingTimeout: this.server.opts.pingTimeout,
            maxPayload: this.server.opts.maxHttpBufferSize
        }));
        if (this.server.opts.initialPacket) {
            this.sendPacket("message", this.server.opts.initialPacket);
        }
        this.emit("open");
        if (this.protocol === 3) {
            // in protocol v3, the client sends a ping, and the server answers with a pong
            this.resetPingTimeout(this.server.opts.pingInterval + this.server.opts.pingTimeout);
        }
        else {
            // in protocol v4, the server sends a ping, and the client answers with a pong
            this.schedulePing();
        }
    }
    /**
     * Called upon transport packet.
     *
     * Ignored unless the socket is open. Emits "packet", re-arms the ping
     * timeout and then dispatches on the packet type.
     *
     * @param {Object} packet - decoded packet; `type` and `data` are read
     * @api private
     */
    onPacket(packet) {
        if ("open" !== this.readyState) {
            return debug("packet received with closed socket");
        }
        // export packet event
        debug(`received packet ${packet.type}`);
        this.emit("packet", packet);
        // Reset ping timeout on any packet, incoming data is a good sign of
        // other side's liveness
        this.resetPingTimeout(this.server.opts.pingInterval + this.server.opts.pingTimeout);
        switch (packet.type) {
            case "ping":
                // only a protocol v3 transport may send pings
                if (this.transport.protocol !== 3) {
                    this.onError("invalid heartbeat direction");
                    return;
                }
                debug("got ping");
                this.sendPacket("pong");
                this.emit("heartbeat");
                break;
            case "pong":
                // a protocol v3 transport must never send pongs
                if (this.transport.protocol === 3) {
                    this.onError("invalid heartbeat direction");
                    return;
                }
                debug("got pong");
                // The ping timer is only created by onOpen() when
                // `this.protocol !== 3`, whereas the check above reads the
                // protocol of the *current transport*. Guard against the two
                // disagreeing so a stray pong cannot throw on a null timer.
                if (this.pingIntervalTimer) {
                    this.pingIntervalTimer.refresh();
                }
                this.emit("heartbeat");
                break;
            case "error":
                this.onClose("parse error");
                break;
            case "message":
                this.emit("data", packet.data);
                this.emit("message", packet.data);
                break;
        }
    }
    /**
     * Called upon transport error.
     *
     * Closes the socket with the reason "transport error".
     *
     * @param {Error} err - error object, forwarded as the close description
     * @api private
     */
    onError(err) {
        debug("transport error");
        this.onClose("transport error", err);
    }
    /**
     * Pings client every `pingInterval` and expects response
     * within `pingTimeout` or closes connection.
     *
     * The timer created here is one-shot; it is re-armed with `refresh()`
     * when a pong is received (see `onPacket()`).
     *
     * @api private
     */
    schedulePing() {
        this.pingIntervalTimer = (0, timers_1.setTimeout)(() => {
            debug("writing ping packet - expecting pong within %sms", this.server.opts.pingTimeout);
            this.sendPacket("ping");
            this.resetPingTimeout(this.server.opts.pingTimeout);
        }, this.server.opts.pingInterval);
    }
    /**
     * Resets ping timeout.
     *
     * Replaces any pending ping timeout with a new one; when it fires and
     * the socket is not already closed, the socket is closed with the reason
     * "ping timeout".
     *
     * @param {Number} timeout - delay in milliseconds
     * @api private
     */
    resetPingTimeout(timeout) {
        (0, timers_1.clearTimeout)(this.pingTimeoutTimer);
        this.pingTimeoutTimer = (0, timers_1.setTimeout)(() => {
            if (this.readyState === "closed")
                return;
            this.onClose("ping timeout");
        }, timeout);
    }
    /**
     * Attaches handlers for the given transport.
     *
     * Makes `transport` the current transport, wires its "error", "packet",
     * "drain" and "close" events to this socket and registers a cleanup
     * function that removes those listeners again.
     *
     * @param {Transport} transport
     * @api private
     */
    setTransport(transport) {
        const onError = this.onError.bind(this);
        const onPacket = this.onPacket.bind(this);
        const flush = this.flush.bind(this);
        const onClose = this.onClose.bind(this, "transport close");
        this.transport = transport;
        this.transport.once("error", onError);
        this.transport.on("packet", onPacket);
        this.transport.on("drain", flush);
        this.transport.once("close", onClose);
        // this function will manage packet events (also message callbacks)
        this.setupSendCallback();
        this.cleanupFn.push(function () {
            transport.removeListener("error", onError);
            transport.removeListener("packet", onPacket);
            transport.removeListener("drain", flush);
            transport.removeListener("close", onClose);
        });
    }
    /**
     * Upgrades socket to the given transport
     *
     * Listens on the candidate transport for the probe handshake: a `ping`
     * packet carrying "probe" is answered with a `pong` "probe", and an
     * `upgrade` packet completes the switch. Any other packet, an error, a
     * close of either side or the expiry of `upgradeTimeout` aborts the
     * attempt and closes the candidate transport.
     *
     * @param {Transport} transport - candidate transport
     * @api private
     */
    maybeUpgrade(transport) {
        debug('might upgrade socket transport from "%s" to "%s"', this.transport.name, transport.name);
        this.upgrading = true;
        // set transport upgrade timer
        this.upgradeTimeoutTimer = (0, timers_1.setTimeout)(() => {
            debug("client did not complete upgrade - closing transport");
            cleanup();
            if ("open" === transport.readyState) {
                transport.close();
            }
        }, this.server.opts.upgradeTimeout);
        const onPacket = packet => {
            if ("ping" === packet.type && "probe" === packet.data) {
                debug("got probe ping packet, sending pong");
                transport.send([{ type: "pong", data: "probe" }]);
                this.emit("upgrading", transport);
                // restart the periodic check so only one interval is ever active
                clearInterval(this.checkIntervalTimer);
                this.checkIntervalTimer = setInterval(check, 100);
            }
            else if ("upgrade" === packet.type && this.readyState !== "closed") {
                debug("got upgrade packet - upgrading");
                cleanup();
                this.transport.discard();
                this.upgraded = true;
                // detach from the old transport before adopting the new one
                this.clearTransport();
                this.setTransport(transport);
                this.emit("upgrade", transport);
                this.flush();
                // close() was requested while the upgrade was in progress
                if (this.readyState === "closing") {
                    transport.close(() => {
                        this.onClose("forced close");
                    });
                }
            }
            else {
                cleanup();
                transport.close();
            }
        };
        // we force a polling cycle to ensure a fast upgrade
        const check = () => {
            if ("polling" === this.transport.name && this.transport.writable) {
                debug("writing a noop packet to polling for fast upgrade");
                this.transport.send([{ type: "noop" }]);
            }
        };
        // stops the timers and removes every listener installed by this upgrade attempt
        const cleanup = () => {
            this.upgrading = false;
            clearInterval(this.checkIntervalTimer);
            this.checkIntervalTimer = null;
            (0, timers_1.clearTimeout)(this.upgradeTimeoutTimer);
            this.upgradeTimeoutTimer = null;
            transport.removeListener("packet", onPacket);
            transport.removeListener("close", onTransportClose);
            transport.removeListener("error", onError);
            this.removeListener("close", onClose);
        };
        // aborts the upgrade and drops the reference to the candidate transport
        const onError = err => {
            debug("client did not complete upgrade - %s", err);
            cleanup();
            transport.close();
            transport = null;
        };
        const onTransportClose = () => {
            onError("transport closed");
        };
        const onClose = () => {
            onError("socket closed");
        };
        transport.on("packet", onPacket);
        transport.once("close", onTransportClose);
        transport.once("error", onError);
        this.once("close", onClose);
    }
    /**
     * Clears listeners and timers associated with current transport.
     *
     * Runs (and removes) every registered cleanup function, closes the
     * current transport and cancels the pending ping timeout.
     *
     * @api private
     */
    clearTransport() {
        let cleanup;
        const toCleanUp = this.cleanupFn.length;
        for (let i = 0; i < toCleanUp; i++) {
            cleanup = this.cleanupFn.shift();
            cleanup();
        }
        // silence further transport errors and prevent uncaught exceptions
        this.transport.on("error", function () {
            debug("error triggered by discarded transport");
        });
        // ensure transport won't stay open
        this.transport.close();
        (0, timers_1.clearTimeout)(this.pingTimeoutTimer);
    }
    /**
     * Called upon transport considered closed.
     * Possible reasons: `ping timeout`, `client error`, `parse error`,
     * `transport error`, `server close`, `transport close`
     *
     * Does nothing when the socket is already closed. Otherwise stops all
     * timers, drops pending send callbacks, detaches the transport and emits
     * "close" with the given reason and description.
     *
     * @param {String} reason
     * @param {*} [description] - extra detail forwarded to "close" listeners
     */
    onClose(reason, description) {
        if ("closed" !== this.readyState) {
            this.readyState = "closed";
            // clear timers
            (0, timers_1.clearTimeout)(this.pingIntervalTimer);
            (0, timers_1.clearTimeout)(this.pingTimeoutTimer);
            clearInterval(this.checkIntervalTimer);
            this.checkIntervalTimer = null;
            (0, timers_1.clearTimeout)(this.upgradeTimeoutTimer);
            // clean writeBuffer in next tick, so developers can still
            // grab the writeBuffer on 'close' event
            process.nextTick(() => {
                this.writeBuffer = [];
            });
            this.packetsFn = [];
            this.sentCallbackFn = [];
            this.clearTransport();
            this.emit("close", reason, description);
        }
    }
    /**
     * Setup and manage send callback
     *
     * On every "drain" of the current transport, takes the oldest entry of
     * `sentCallbackFn` and invokes it: either a single callback or an array
     * of callbacks (one batch, see `flush()`). Each callback receives the
     * current transport.
     *
     * @api private
     */
    setupSendCallback() {
        // the message was sent successfully, execute the callback
        const onDrain = () => {
            if (this.sentCallbackFn.length > 0) {
                const seqFn = this.sentCallbackFn.splice(0, 1)[0];
                if ("function" === typeof seqFn) {
                    debug("executing send callback");
                    seqFn(this.transport);
                }
                else if (Array.isArray(seqFn)) {
                    debug("executing batch send callback");
                    const l = seqFn.length;
                    let i = 0;
                    for (; i < l; i++) {
                        if ("function" === typeof seqFn[i]) {
                            seqFn[i](this.transport);
                        }
                    }
                }
            }
        };
        this.transport.on("drain", onDrain);
        // `this.transport` is read when the cleanup runs, i.e. from
        // clearTransport(), before any replacement transport is assigned
        this.cleanupFn.push(() => {
            this.transport.removeListener("drain", onDrain);
        });
    }
    /**
     * Sends a message packet.
     *
     * @param {Object} data
     * @param {Object} options
     * @param {Function} callback
     * @return {Socket} for chaining
     * @api public
     */
    send(data, options, callback) {
        this.sendPacket("message", data, options, callback);
        return this;
    }
    /**
     * Alias of `send()`.
     *
     * @param {Object} data
     * @param {Object} options
     * @param {Function} callback
     * @return {Socket} for chaining
     * @api public
     */
    write(data, options, callback) {
        this.sendPacket("message", data, options, callback);
        return this;
    }
    /**
     * Sends a packet.
     *
     * The packet is appended to the write buffer and a flush is attempted.
     * Nothing happens when the socket is closing or closed. `options` may be
     * omitted, in which case the third argument is taken as the callback.
     * Note that `options.compress` is normalised on the object that was
     * passed in (it defaults to `true`), and that falsy `data` is not
     * attached to the packet.
     *
     * @param {String} type - packet type
     * @param {String} data
     * @param {Object} options
     * @param {Function} callback
     *
     * @api private
     */
    sendPacket(type, data, options, callback) {
        if ("function" === typeof options) {
            callback = options;
            options = null;
        }
        options = options || {};
        options.compress = false !== options.compress;
        if ("closing" !== this.readyState && "closed" !== this.readyState) {
            debug('sending packet "%s" (%s)', type, data);
            const packet = {
                type,
                options
            };
            if (data)
                packet.data = data;
            // exports packetCreate event
            this.emit("packetCreate", packet);
            this.writeBuffer.push(packet);
            // add send callback to object, if defined
            if (callback)
                this.packetsFn.push(callback);
            this.flush();
        }
    }
    /**
     * Attempts to flush the packets buffer.
     *
     * When the socket is not closed, the transport is writable and there is
     * something buffered, the whole buffer is handed to the transport in one
     * `send()` call and "flush" / "drain" are emitted on both the socket and
     * the server.
     *
     * @api private
     */
    flush() {
        if ("closed" !== this.readyState &&
            this.transport.writable &&
            this.writeBuffer.length) {
            debug("flushing buffer to transport");
            this.emit("flush", this.writeBuffer);
            this.server.emit("flush", this, this.writeBuffer);
            const wbuf = this.writeBuffer;
            this.writeBuffer = [];
            if (!this.transport.supportsFraming) {
                // without framing, queue the callbacks as one batch (an array entry)
                this.sentCallbackFn.push(this.packetsFn);
            }
            else {
                // with framing, queue each callback as its own entry
                this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn);
            }
            this.packetsFn = [];
            this.transport.send(wbuf);
            this.emit("drain");
            this.server.emit("drain", this);
        }
    }
    /**
     * Get available upgrades for this socket.
     *
     * @return {Array} the upgrades the server lists for the current
     *   transport, restricted to those present in `server.opts.transports`
     * @api private
     */
    getAvailableUpgrades() {
        const availableUpgrades = [];
        const allUpgrades = this.server.upgrades(this.transport.name);
        let i = 0;
        const l = allUpgrades.length;
        for (; i < l; ++i) {
            const upg = allUpgrades[i];
            if (this.server.opts.transports.indexOf(upg) !== -1) {
                availableUpgrades.push(upg);
            }
        }
        return availableUpgrades;
    }
    /**
     * Closes the socket and underlying transport.
     *
     * Has no effect unless the socket is open. When packets are still
     * buffered, the transport is closed after the next "drain" event.
     *
     * @param {Boolean} discard - optional, discard the transport
     * @return {Socket} for chaining
     * @api public
     */
    close(discard) {
        if ("open" !== this.readyState)
            return this;
        this.readyState = "closing";
        if (this.writeBuffer.length) {
            this.once("drain", this.closeTransport.bind(this, discard));
            return this;
        }
        this.closeTransport(discard);
        return this;
    }
    /**
     * Closes the underlying transport.
     *
     * Once the transport reports it has closed, the socket is closed with
     * the reason "forced close".
     *
     * @param {Boolean} discard - when truthy, `transport.discard()` is called first
     * @api private
     */
    closeTransport(discard) {
        if (discard)
            this.transport.discard();
        this.transport.close(this.onClose.bind(this, "forced close"));
    }
}
exports.Socket = Socket;