var Parser       = require('./Parser');
var Sequences    = require('./sequences');
var Packets      = require('./packets');
var Stream       = require('stream').Stream;
var Util         = require('util');
var PacketWriter = require('./PacketWriter');

module.exports = Protocol;
Util.inherits(Protocol, Stream);

/**
 * Duplex-style stream that sits between a socket and the user-facing
 * connection: incoming bytes are fed to a Parser via `write()`, outgoing
 * packets are emitted as 'data' events, and commands are modelled as
 * "sequences" that are executed one at a time from an internal queue.
 *
 * Events emitted by this object (all visible in this file): 'enqueue',
 * 'data', 'initialize', 'handshake', 'drain', 'unhandledError', 'end'.
 *
 * @param {Object} [options]
 * @param {Object} [options.config]     Configuration object; this file reads
 *                                      `trace`, `debug` and `protocol41` from it.
 * @param {Object} [options.connection] Owning connection; this file reads
 *                                      `state` and `threadId` and calls `_startTLS`.
 */
function Protocol(options) {
  Stream.call(this);

  options = options || {};

  this.readable = true;
  this.writable = true;

  this._config                        = options.config || {};
  this._connection                    = options.connection;
  this._callback                      = null; // not referenced anywhere else in this file
  this._fatalError                    = null; // first fatal error seen, if any
  this._quitSequence                  = null; // set once quit() has been called
  this._handshake                     = false; // a Handshake sequence has been requested
  this._handshaked                    = false; // a Handshake sequence has ended
  this._ended                         = false;
  this._destroyed                     = false;
  this._queue                         = []; // pending sequences; index 0 is the active one
  this._handshakeInitializationPacket = null;

  this._parser = new Parser({
    onError  : this.handleParserError.bind(this),
    onPacket : this._parsePacket.bind(this),
    config   : this._config
  });
}

/**
 * Feeds raw incoming bytes to the parser.
 *
 * @param {Buffer} buffer Data received from the server.
 * @returns {boolean} Always `true`.
 */
Protocol.prototype.write = function(buffer) {
  this._parser.write(buffer);
  return true;
};

/**
 * Enqueues a Handshake sequence.
 *
 * Note that `options.config` is overwritten with this protocol's config on
 * the object that was passed in.
 *
 * @param {Object|Function} [options] Sequence options, or the callback.
 * @param {Function} [callback]
 * @returns {Object} The Handshake sequence.
 */
Protocol.prototype.handshake = function handshake(options, callback) {
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  options = options || {};
  options.config = this._config;

  var sequence = this._enqueue(new Sequences.Handshake(options, callback));

  // Must be set only after enqueuing: _validateEnqueue() rejects a Handshake
  // sequence whenever this flag is already true.
  this._handshake = true;

  return sequence;
};

/**
 * Enqueues a Query sequence.
 *
 * @param {Object} options
 * @param {Function} [callback]
 * @returns {Object} The Query sequence.
 */
Protocol.prototype.query = function query(options, callback) {
  return this._enqueue(new Sequences.Query(options, callback));
};

/**
 * Enqueues a ChangeUser sequence.
 *
 * @param {Object} options
 * @param {Function} [callback]
 * @returns {Object} The ChangeUser sequence.
 */
Protocol.prototype.changeUser = function changeUser(options, callback) {
  return this._enqueue(new Sequences.ChangeUser(options, callback));
};

/**
 * Enqueues a Ping sequence.
 *
 * @param {Object|Function} [options] Sequence options, or the callback.
 * @param {Function} [callback]
 * @returns {Object} The Ping sequence.
 */
Protocol.prototype.ping = function ping(options, callback) {
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  return this._enqueue(new Sequences.Ping(options, callback));
};

/**
 * Enqueues a Statistics sequence.
 *
 * @param {Object|Function} [options] Sequence options, or the callback.
 * @param {Function} [callback]
 * @returns {Object} The Statistics sequence.
 */
Protocol.prototype.stats = function stats(options, callback) {
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  return this._enqueue(new Sequences.Statistics(options, callback));
};

/**
 * Enqueues a Quit sequence and ends the protocol once that sequence ends.
 * After this call, any further enqueue attempt is rejected with
 * PROTOCOL_ENQUEUE_AFTER_QUIT (see _validateEnqueue).
 *
 * @param {Object|Function} [options] Sequence options, or the callback.
 * @param {Function} [callback]
 * @returns {Object} The Quit sequence.
 */
Protocol.prototype.quit = function quit(options, callback) {
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  var self     = this;
  var sequence = this._enqueue(new Sequences.Quit(options, callback));

  sequence.on('end', function () {
    self.end();
  });

  return this._quitSequence = sequence;
};

/**
 * Marks the protocol as ended. Idempotent: later calls are no-ops.
 *
 * If a quit sequence exists and has either already ended or is the active
 * sequence, it is ended and a plain 'end' event is emitted. Otherwise the end
 * is treated as a lost connection and a fatal PROTOCOL_CONNECTION_LOST error
 * is delegated.
 */
Protocol.prototype.end = function() {
  if (this._ended) {
    return;
  }
  this._ended = true;

  if (this._quitSequence && (this._quitSequence._ended || this._queue[0] === this._quitSequence)) {
    this._quitSequence.end();
    this.emit('end');
    return;
  }

  var err = new Error('Connection lost: The server closed the connection.');
  err.fatal = true;
  err.code = 'PROTOCOL_CONNECTION_LOST';

  this._delegateError(err);
};

/**
 * Pauses the parser and notifies the active sequence with a 'pause' event.
 */
Protocol.prototype.pause = function() {
  this._parser.pause();
  // Since there is a file stream in query, we must transmit pause/resume event to current sequence.
  var seq = this._queue[0];
  if (seq && seq.emit) {
    seq.emit('pause');
  }
};

/**
 * Resumes the parser and notifies the active sequence with a 'resume' event.
 */
Protocol.prototype.resume = function() {
  this._parser.resume();
  // Since there is a file stream in query, we must transmit pause/resume event to current sequence.
  var seq = this._queue[0];
  if (seq && seq.emit) {
    seq.emit('resume');
  }
};

/**
 * Validates a sequence, appends it to the queue, wires its events to this
 * protocol and starts it immediately when it is the only queued sequence.
 *
 * When validation fails the sequence is returned without being queued; it is
 * ended with the validation error on the next tick (see _validateEnqueue).
 *
 * @param {Object} sequence
 * @returns {Object} The same sequence that was passed in.
 */
Protocol.prototype._enqueue = function(sequence) {
  if (!this._validateEnqueue(sequence)) {
    return sequence;
  }

  if (this._config.trace) {
    // Long stack trace support
    sequence._callSite = sequence._callSite || new Error();
  }

  this._queue.push(sequence);
  this.emit('enqueue', sequence);

  var self = this;
  sequence
    .on('error', function(err) {
      self._delegateError(err, sequence);
    })
    .on('packet', function(packet) {
      sequence._timer.active();
      self._emitPacket(packet);
    })
    .on('timeout', function() {
      var err = new Error(sequence.constructor.name + ' inactivity timeout');

      err.code    = 'PROTOCOL_SEQUENCE_TIMEOUT';
      err.fatal   = true;
      err.timeout = sequence._timeout;

      self._delegateError(err, sequence);
    });

  if (sequence.constructor === Sequences.Handshake) {
    sequence.on('start-tls', function () {
      sequence._timer.active();
      self._connection._startTLS(function(err) {
        if (err) {
          // SSL negotiation error are fatal
          err.code  = 'HANDSHAKE_SSL_ERROR';
          err.fatal = true;
          sequence.end(err);
          return;
        }

        sequence._timer.active();
        sequence._tlsUpgradeCompleteHandler();
      });
    });

    // Registered before the generic 'end' listener below, so the handshake
    // flag is set before the sequence is dequeued.
    sequence.on('end', function () {
      self._handshaked = true;

      if (!self._fatalError) {
        self.emit('handshake', self._handshakeInitializationPacket);
      }
    });
  }

  sequence.on('end', function () {
    self._dequeue(sequence);
  });

  if (this._queue.length === 1) {
    this._parser.resetPacketNumber();
    this._startSequence(sequence);
  }

  return sequence;
};

/**
 * Checks whether a sequence may be enqueued in the protocol's current state.
 *
 * A sequence is rejected after a fatal error, after quit(), after destroy(),
 * or when it is a second Handshake. A rejected sequence gets an 'error'
 * listener that delegates to this protocol, and is ended with a non-fatal
 * error on the next tick.
 *
 * @param {Object} sequence
 * @returns {boolean} `true` when the sequence may be enqueued.
 */
Protocol.prototype._validateEnqueue = function _validateEnqueue(sequence) {
  var err;
  var prefix = 'Cannot enqueue ' + sequence.constructor.name;

  if (this._fatalError) {
    err      = new Error(prefix + ' after fatal error.');
    err.code = 'PROTOCOL_ENQUEUE_AFTER_FATAL_ERROR';
  } else if (this._quitSequence) {
    err      = new Error(prefix + ' after invoking quit.');
    err.code = 'PROTOCOL_ENQUEUE_AFTER_QUIT';
  } else if (this._destroyed) {
    err      = new Error(prefix + ' after being destroyed.');
    err.code = 'PROTOCOL_ENQUEUE_AFTER_DESTROY';
  } else if ((this._handshake || this._handshaked) && sequence.constructor === Sequences.Handshake) {
    err      = new Error(prefix + ' after already enqueuing a Handshake.');
    err.code = 'PROTOCOL_ENQUEUE_HANDSHAKE_TWICE';
  } else {
    return true;
  }

  var self  = this;
  err.fatal = false;

  // add error handler
  sequence.on('error', function (sequenceErr) {
    self._delegateError(sequenceErr, sequence);
  });

  process.nextTick(function () {
    sequence.end(err);
  });

  return false;
};

/**
 * Parser callback, invoked once per incoming packet. Determines the packet
 * type, parses it and dispatches it to the handler method on the active
 * sequence whose name equals the packet constructor's name.
 *
 * Delegates a fatal PROTOCOL_STRAY_PACKET error when no sequence is active
 * and a fatal PROTOCOL_INCORRECT_PACKET_SEQUENCE error when the active
 * sequence has no handler for the packet.
 */
Protocol.prototype._parsePacket = function() {
  var sequence = this._queue[0];
  var err;

  if (!sequence) {
    err       = new Error('Received packet with no active sequence.');
    err.code  = 'PROTOCOL_STRAY_PACKET';
    err.fatal = true;

    this._delegateError(err);
    return;
  }

  var Packet     = this._determinePacket(sequence);
  var packet     = new Packet({protocol41: this._config.protocol41});
  var packetName = Packet.name;

  // Special case: Faster dispatch, and parsing done inside sequence
  if (Packet === Packets.RowDataPacket) {
    sequence.RowDataPacket(packet, this._parser, this._connection);

    if (this._config.debug) {
      this._debugPacket(true, packet);
    }

    return;
  }

  if (this._config.debug) {
    this._parsePacketDebug(packet);
  } else {
    packet.parse(this._parser);
  }

  if (Packet === Packets.HandshakeInitializationPacket) {
    this._handshakeInitializationPacket = packet;
    this.emit('initialize', packet);
  }

  sequence._timer.active();

  if (!sequence[packetName]) {
    err       = new Error('Received packet in the wrong sequence.');
    err.code  = 'PROTOCOL_INCORRECT_PACKET_SEQUENCE';
    err.fatal = true;

    this._delegateError(err);
    return;
  }

  sequence[packetName](packet);
};

/**
 * Parses a packet and always logs it afterwards, even when parsing throws.
 *
 * @param {Object} packet Packet instance to populate from the parser.
 */
Protocol.prototype._parsePacketDebug = function _parsePacketDebug(packet) {
  try {
    packet.parse(this._parser);
  } finally {
    this._debugPacket(true, packet);
  }
};

/**
 * Serializes an outgoing packet and emits the resulting buffer as 'data'.
 *
 * @param {Object} packet Packet exposing `write(packetWriter)`.
 */
Protocol.prototype._emitPacket = function(packet) {
  var packetWriter = new PacketWriter();
  packet.write(packetWriter);
  this.emit('data', packetWriter.toBuffer(this._parser));

  if (this._config.debug) {
    this._debugPacket(false, packet);
  }
};

/**
 * Chooses the packet constructor for the packet currently in the parser.
 *
 * The active sequence gets the first say through its optional
 * `determinePacket(firstByte, parser)`; when it returns nothing the generic
 * response types are used: 0x00 -> OkPacket, 0xfe -> EofPacket,
 * 0xff -> ErrorPacket.
 *
 * @param {Object} sequence The active sequence.
 * @returns {Function} Packet constructor.
 * @throws {Error} When neither the sequence nor the generic mapping matches.
 */
Protocol.prototype._determinePacket = function(sequence) {
  // NOTE(review): `peak` is the parser's own method name; presumably it reads
  // the leading byte without consuming it - confirm against Parser.
  var firstByte = this._parser.peak();

  if (sequence.determinePacket) {
    var Packet = sequence.determinePacket(firstByte, this._parser);
    if (Packet) {
      return Packet;
    }
  }

  switch (firstByte) {
    case 0x00: return Packets.OkPacket;
    case 0xfe: return Packets.EofPacket;
    case 0xff: return Packets.ErrorPacket;
  }

  throw new Error('Could not determine packet, firstByte = ' + firstByte);
};

/**
 * Called when a queued sequence ends: stops its timer, removes the head of
 * the queue and starts the next sequence, or emits 'drain' when none is left.
 *
 * @param {Object} sequence The sequence that just ended.
 */
Protocol.prototype._dequeue = function(sequence) {
  sequence._timer.stop();

  // No point in advancing the queue, we are dead
  if (this._fatalError) {
    return;
  }

  this._queue.shift();

  var next = this._queue[0];
  if (!next) {
    this.emit('drain');
    return;
  }

  this._parser.resetPacketNumber();

  this._startSequence(next);
};

/**
 * Starts a sequence, arming its inactivity timer first when it has a
 * positive, finite `_timeout`. ChangeUser sequences are started with the
 * stored handshake initialization packet.
 *
 * @param {Object} sequence
 */
Protocol.prototype._startSequence = function(sequence) {
  if (sequence._timeout > 0 && isFinite(sequence._timeout)) {
    sequence._timer.start(sequence._timeout);
  }

  if (sequence.constructor === Sequences.ChangeUser) {
    sequence.start(this._handshakeInitializationPacket);
  } else {
    sequence.start();
  }
};

/**
 * Handles an error reported by the network layer. The error is always marked
 * fatal, then handed to the active sequence, or delegated directly when the
 * queue is empty.
 *
 * @param {Error} err
 */
Protocol.prototype.handleNetworkError = function(err) {
  err.fatal = true;

  var sequence = this._queue[0];
  if (sequence) {
    sequence.end(err);
  } else {
    this._delegateError(err);
  }
};

/**
 * Handles an error reported by the parser: hands it to the active sequence,
 * or delegates it directly when the queue is empty. Unlike network errors,
 * the `fatal` flag is left as the parser set it.
 *
 * @param {Error} err
 */
Protocol.prototype.handleParserError = function handleParserError(err) {
  var sequence = this._queue[0];
  if (sequence) {
    sequence.end(err);
  } else {
    this._delegateError(err);
  }
};

/**
 * Central error routing.
 *
 * - Nothing is delegated once a fatal error has been recorded.
 * - A fatal error is remembered in `_fatalError`.
 * - Errors nobody is listening for are emitted as 'unhandledError'.
 * - Otherwise a fatal error is passed to every queued sequence on the next
 *   tick and the queue is emptied.
 * - Any fatal error finally emits 'end' with the error.
 *
 * @param {Error} err
 * @param {Object} [sequence] Sequence the error originated from, if any.
 */
Protocol.prototype._delegateError = function(err, sequence) {
  // Stop delegating errors after the first fatal error
  if (this._fatalError) {
    return;
  }

  if (err.fatal) {
    this._fatalError = err;
  }

  if (this._shouldErrorBubbleUp(err, sequence)) {
    // Can't use regular 'error' event here as that always destroys the pipe
    // between socket and protocol which is not what we want (unless the
    // exception was fatal).
    this.emit('unhandledError', err);
  } else if (err.fatal) {
    // Send fatal error to all sequences in the queue
    var queue = this._queue;
    process.nextTick(function () {
      queue.forEach(function (queued) {
        queued.end(err);
      });
      queue.length = 0;
    });
  }

  // Make sure the stream we are piping to is getting closed
  if (err.fatal) {
    this.emit('end', err);
  }
};

/**
 * Decides whether an error must be surfaced as 'unhandledError'.
 *
 * With an originating sequence: never when that sequence has an error
 * handler, always when the error is non-fatal. In every remaining case
 * (fatal error, or no originating sequence) the error bubbles up only when it
 * is fatal and no queued sequence has an error handler.
 *
 * @param {Error} err
 * @param {Object} [sequence]
 * @returns {boolean}
 */
Protocol.prototype._shouldErrorBubbleUp = function(err, sequence) {
  if (sequence) {
    if (sequence.hasErrorHandler()) {
      return false;
    } else if (!err.fatal) {
      return true;
    }
  }

  return (err.fatal && !this._hasPendingErrorHandlers());
};

/**
 * @returns {boolean} `true` when at least one queued sequence has an error
 *                    handler.
 */
Protocol.prototype._hasPendingErrorHandlers = function() {
  return this._queue.some(function(sequence) {
    return sequence.hasErrorHandler();
  });
};

/**
 * Marks the protocol as destroyed, pauses the parser and ends the protocol
 * unless it has already ended or the connection reports the state
 * 'disconnected'.
 *
 * A protocol created without a connection is treated as not disconnected, in
 * line with the optional-connection handling in _debugPacket, instead of
 * throwing on the property access.
 */
Protocol.prototype.destroy = function() {
  this._destroyed = true;
  this._parser.pause();

  var connection = this._connection;

  if (!connection || connection.state !== 'disconnected') {
    if (!this._ended) {
      this.end();
    }
  }
};

/**
 * Logs a packet to the console. When `config.debug` is an array, only packets
 * whose constructor name is listed in it are logged.
 *
 * @param {boolean} incoming `true` for received packets ('<--'), `false` for
 *                           sent packets ('-->').
 * @param {Object} packet
 */
Protocol.prototype._debugPacket = function(incoming, packet) {
  var connection = this._connection;
  var direction  = incoming
    ? '<--'
    : '-->';
  var packetName = packet.constructor.name;

  // Only print a thread id that is actually known; a connection object without
  // the property would otherwise be rendered as " (undefined)".
  var hasThreadId = Boolean(connection) &&
    connection.threadId !== null &&
    connection.threadId !== undefined;
  var threadId    = hasThreadId
    ? ' (' + connection.threadId + ')'
    : '';

  // check for debug packet restriction
  if (Array.isArray(this._config.debug) && this._config.debug.indexOf(packetName) === -1) {
    return;
  }

  // Strip the leading constructor name that Util.inspect prints before "{".
  var packetPayload = Util.inspect(packet).replace(/^[^{]+/, '');

  console.log('%s%s %s %s\n', direction, threadId, packetName, packetPayload);
};