pbstreamparser.js 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. // Copyright 2015 The Closure Library Authors. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS-IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /**
  15. * @fileoverview The default Protobuf stream parser.
  16. *
  17. * The default Protobuf parser decodes the input stream (binary) under the
  18. * following rules:
  19. * 1. The data stream as a whole represents a valid proto message,
  20. * defined as following:
  21. *
  22. * message StreamBody {
  23. * repeated bytes messages = 1;
  24. * google.rpc.Status status = 2;
  25. * repeated bytes padding = 15;
  26. * }
  27. *
  28. * Padding are noop messages may be generated as base64 padding (for
  29. * browsers) or as a way to keep the connection alive. Its tag-id is
  30. * reserved as the maximum value allowed for a single-byte tag-id.
  31. *
  32. * 2. The only things that are significant to this parser in the above
  33. * definition are the specification of the tag ids and wire types (all fields
  34. * having length-delimited wire type). The parser doesn't fail if status
  35. * appears more than once, i.e. the validity of StreamBody (other than tag
  36. * ids and wire types) is not checked.
  37. *
  38. * 3. The wire format looks like:
  39. *
  40. * (<tag-id> <wire-type> <length> <message-bytes>)... EOF
  41. *
  42. * For details of Protobuf wire format see
  43. * https://developers.google.com/protocol-buffers/docs/encoding
  44. *
  45. * A message with unknown tag or with length larger than 2^32 - 1 will
  46. * invalidate the whole stream.
  47. *
  48. * 4. All decoded messages and status in the buffer will be delivered in
  49. * a batch (array), with each constructed as {tag-id: opaque-byte-array}.
  50. * No-op data, e.g. padding, will be immediately discarded.
  51. *
  52. * 5. If a high-level API does not support batch delivery (e.g. grpc), then
  53. * a wrapper is expected to deliver individual message separately in order.
  54. */
  55. goog.provide('goog.net.streams.PbStreamParser');
  56. goog.require('goog.asserts');
  57. goog.require('goog.net.streams.StreamParser');
  58. goog.scope(function() {
  59. /**
  60. * The default Protobuf stream parser.
  61. *
  62. * @constructor
  63. * @struct
  64. * @implements {goog.net.streams.StreamParser}
  65. * @final
  66. */
  67. goog.net.streams.PbStreamParser = function() {
  68. /**
  69. * The current error message, if any.
  70. * @private {?string}
  71. */
  72. this.errorMessage_ = null;
  73. /**
  74. * The currently buffered result (parsed messages).
  75. * @private {!Array<!Object>}
  76. */
  77. this.result_ = [];
  78. /**
  79. * The current position in the streamed data.
  80. * @private {number}
  81. */
  82. this.streamPos_ = 0;
  83. /**
  84. * The current parser state.
  85. * @private {goog.net.streams.PbStreamParser.State_}
  86. */
  87. this.state_ = Parser.State_.INIT;
  88. /**
  89. * The tag of the proto message being parsed.
  90. * @private {number}
  91. */
  92. this.tag_ = 0;
  93. /**
  94. * The length of the proto message being parsed.
  95. * @private {number}
  96. */
  97. this.length_ = 0;
  98. /**
  99. * Count of processed length bytes.
  100. * @private {number}
  101. */
  102. this.countLengthBytes_ = 0;
  103. /**
  104. * Raw bytes of the current message. Uses Uint8Array by default. Falls back to
  105. * native array when Uint8Array is unsupported.
  106. * @private {?Uint8Array|?Array<number>}
  107. */
  108. this.messageBuffer_ = null;
  109. /**
  110. * Count of processed message bytes.
  111. * @private {number}
  112. */
  113. this.countMessageBytes_ = 0;
  114. };
  115. var Parser = goog.net.streams.PbStreamParser;
  116. /**
  117. * The parser state.
  118. * @private @enum {number}
  119. */
  120. Parser.State_ = {
  121. INIT: 0, // expecting the tag:wire-type byte
  122. LENGTH: 1, // expecting more varint bytes of length
  123. MESSAGE: 2, // expecting more message bytes
  124. INVALID: 3
  125. };
  126. /**
  127. * Tag of padding messages.
  128. * @private @const {number}
  129. */
  130. Parser.PADDING_TAG_ = 15;
  131. /**
  132. * @override
  133. */
  134. goog.net.streams.PbStreamParser.prototype.isInputValid = function() {
  135. return this.state_ != Parser.State_.INVALID;
  136. };
  137. /**
  138. * @override
  139. */
  140. goog.net.streams.PbStreamParser.prototype.getErrorMessage = function() {
  141. return this.errorMessage_;
  142. };
  143. /**
  144. * @param {!Uint8Array|!Array<number>} inputBytes The current input buffer
  145. * @param {number} pos The position in the current input that triggers the error
  146. * @param {string} errorMsg Additional error message
  147. * @throws {!Error} Throws an error indicating where the stream is broken
  148. * @private
  149. */
  150. Parser.prototype.error_ = function(inputBytes, pos, errorMsg) {
  151. this.state_ = Parser.State_.INVALID;
  152. this.errorMessage_ = 'The stream is broken @' + this.streamPos_ + '/' + pos +
  153. '. ' +
  154. 'Error: ' + errorMsg + '. ' +
  155. 'With input:\n' + inputBytes;
  156. throw Error(this.errorMessage_);
  157. };
  158. /**
  159. * @throws {!Error} Throws an error message if the input is invalid.
  160. * @override
  161. */
  162. goog.net.streams.PbStreamParser.prototype.parse = function(input) {
  163. goog.asserts.assert(input instanceof Array || input instanceof ArrayBuffer);
  164. var parser = this;
  165. var inputBytes = (input instanceof Array) ? input : new Uint8Array(input);
  166. var pos = 0;
  167. while (pos < inputBytes.length) {
  168. switch (parser.state_) {
  169. case Parser.State_.INVALID: {
  170. parser.error_(inputBytes, pos, 'stream already broken');
  171. break;
  172. }
  173. case Parser.State_.INIT: {
  174. processTagByte(inputBytes[pos]);
  175. break;
  176. }
  177. case Parser.State_.LENGTH: {
  178. processLengthByte(inputBytes[pos]);
  179. break;
  180. }
  181. case Parser.State_.MESSAGE: {
  182. processMessageByte(inputBytes[pos]);
  183. break;
  184. }
  185. default: { throw Error('unexpected parser state: ' + parser.state_); }
  186. }
  187. parser.streamPos_++;
  188. pos++;
  189. }
  190. var msgs = parser.result_;
  191. parser.result_ = [];
  192. return msgs.length > 0 ? msgs : null;
  193. /**
  194. * @param {number} b A tag byte to process
  195. */
  196. function processTagByte(b) {
  197. if (b & 0x80) {
  198. parser.error_(inputBytes, pos, 'invalid tag');
  199. }
  200. var wireType = b & 0x07;
  201. if (wireType != 2) {
  202. parser.error_(inputBytes, pos, 'invalid wire type');
  203. }
  204. parser.tag_ = b >>> 3;
  205. if (parser.tag_ != 1 && parser.tag_ != 2 && parser.tag_ != 15) {
  206. parser.error_(inputBytes, pos, 'unexpected tag');
  207. }
  208. parser.state_ = Parser.State_.LENGTH;
  209. parser.length_ = 0;
  210. parser.countLengthBytes_ = 0;
  211. }
  212. /**
  213. * @param {number} b A length byte to process
  214. */
  215. function processLengthByte(b) {
  216. parser.countLengthBytes_++;
  217. if (parser.countLengthBytes_ == 5) {
  218. if (b & 0xF0) { // length will not fit in a 32-bit uint
  219. parser.error_(inputBytes, pos, 'message length too long');
  220. }
  221. }
  222. parser.length_ |= (b & 0x7F) << ((parser.countLengthBytes_ - 1) * 7);
  223. if (!(b & 0x80)) { // no more length byte
  224. parser.state_ = Parser.State_.MESSAGE;
  225. parser.countMessageBytes_ = 0;
  226. if (typeof Uint8Array !== 'undefined') {
  227. parser.messageBuffer_ = new Uint8Array(parser.length_);
  228. } else {
  229. parser.messageBuffer_ = new Array(parser.length_);
  230. }
  231. if (parser.length_ == 0) { // empty message
  232. finishMessage();
  233. }
  234. }
  235. }
  236. /**
  237. * @param {number} b A message byte to process
  238. */
  239. function processMessageByte(b) {
  240. parser.messageBuffer_[parser.countMessageBytes_++] = b;
  241. if (parser.countMessageBytes_ == parser.length_) {
  242. finishMessage();
  243. }
  244. }
  245. /**
  246. * Finishes up building the current message and resets parser state
  247. */
  248. function finishMessage() {
  249. if (parser.tag_ < Parser.PADDING_TAG_) {
  250. var message = {};
  251. message[parser.tag_] = parser.messageBuffer_;
  252. parser.result_.push(message);
  253. }
  254. parser.state_ = Parser.State_.INIT;
  255. }
  256. };
  257. }); // goog.scope