pbjsonstreamparser.js 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. // Copyright 2016 The Closure Library Authors. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS-IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /**
  15. * @fileoverview A stream parser of StreamBody message in Protobuf-JSON format.
  16. *
  17. * 1. StreamBody proto message is defined as following:
  18. *
  19. * message StreamBody {
  20. * repeated bytes messages = 1;
  21. * google.rpc.Status status = 2;
  22. * }
  23. *
  24. * 2. In Protobuf-JSON format, StreamBody is represented as a Protobuf-JSON
  25. * array (differs from a JSON array by omitting null elements):
  26. *
  27. * - [ [ message1, message2, ..., messageN ] ] (no status)
  28. * - [ , status ] (no message)
  29. * - [ [ message1, message2, ..., messageN ] , status ]
  30. *
  31. * 3. All parsed messages and status will be delivered in a batch (array),
  32. * with each constructed as {tag-id: content-string}.
  33. */
  34. goog.module('goog.net.streams.PbJsonStreamParser');
  35. var JsonStreamParser = goog.require('goog.net.streams.JsonStreamParser');
  36. var StreamParser = goog.require('goog.net.streams.StreamParser');
  37. var asserts = goog.require('goog.asserts');
  38. var utils = goog.require('goog.net.streams.utils');
  39. /**
  40. * A stream parser of StreamBody message in Protobuf-JSON format.
  41. *
  42. * @constructor
  43. * @struct
  44. * @implements {StreamParser}
  45. * @final
  46. */
  47. var PbJsonStreamParser = function() {
  48. /**
  49. * Protobuf raw bytes stream parser
  50. * @private {?JsonStreamParser}
  51. */
  52. this.jsonStreamParser_ = null;
  53. /**
  54. * The current error message, if any.
  55. * @private {?string}
  56. */
  57. this.errorMessage_ = null;
  58. /**
  59. * The current position in the streamed data.
  60. * @private {number}
  61. */
  62. this.streamPos_ = 0;
  63. /**
  64. * The current parser state.
  65. * @private {!State}
  66. */
  67. this.state_ = State.INIT;
  68. /**
  69. * The currently buffered result (parsed JSON objects).
  70. * @private {!Array<!Object>}
  71. */
  72. this.result_ = [];
  73. /**
  74. * Whether the status has been parsed.
  75. * @private {boolean}
  76. */
  77. this.statusParsed_ = false;
  78. };
  79. /**
  80. * The parser state.
  81. * @enum {number}
  82. */
  83. var State = {
  84. INIT: 0, // expecting the beginning "["
  85. ARRAY_OPEN: 1, // expecting the message array or the msg-status separator
  86. MESSAGES: 2, // expecting the message array
  87. MESSAGES_DONE: 3, // expecting the msg-status separator or the ending "]"
  88. STATUS: 4, // expecting the status
  89. ARRAY_END: 5, // expecting NO more non-whitespace input
  90. INVALID: 6 // the stream has become invalid
  91. };
  92. /** @override */
  93. PbJsonStreamParser.prototype.isInputValid = function() {
  94. return this.errorMessage_ === null;
  95. };
  96. /** @override */
  97. PbJsonStreamParser.prototype.getErrorMessage = function() {
  98. return this.errorMessage_;
  99. };
  100. /** @override */
  101. PbJsonStreamParser.prototype.parse = function(input) {
  102. asserts.assertString(input);
  103. var parser = this;
  104. var pos = 0;
  105. while (pos < input.length) {
  106. if (!readMore()) {
  107. return null;
  108. }
  109. switch (parser.state_) {
  110. case State.INVALID: {
  111. reportError('stream already broken');
  112. break;
  113. }
  114. case State.INIT: {
  115. if (input[pos] === '[') {
  116. parser.state_ = State.ARRAY_OPEN;
  117. pos++;
  118. parser.streamPos_++;
  119. } else {
  120. reportError('unexpected input token');
  121. }
  122. break;
  123. }
  124. case State.ARRAY_OPEN: {
  125. if (input[pos] === '[') {
  126. parser.state_ = State.MESSAGES;
  127. resetJsonStreamParser();
  128. // Feed the '[' again in the next loop.
  129. } else if (input[pos] === ',') {
  130. parser.state_ = State.MESSAGES_DONE;
  131. // Feed the ',' again in the next loop.
  132. } else if (input[pos] === ']') {
  133. parser.state_ = State.ARRAY_END;
  134. pos++;
  135. parser.streamPos_++;
  136. } else {
  137. reportError('unexpected input token');
  138. }
  139. break;
  140. }
  141. case State.MESSAGES: {
  142. var messages = parser.jsonStreamParser_.parse(input.substring(pos));
  143. addResultMessages(messages);
  144. if (!parser.jsonStreamParser_.done()) {
  145. parser.streamPos_ += input.length - pos;
  146. pos = input.length; // end the loop
  147. } else {
  148. parser.state_ = State.MESSAGES_DONE;
  149. var extra = parser.jsonStreamParser_.getExtraInput();
  150. parser.streamPos_ += input.length - pos - extra.length;
  151. input = extra;
  152. pos = 0;
  153. }
  154. break;
  155. }
  156. case State.MESSAGES_DONE: {
  157. if (input[pos] === ',') {
  158. parser.state_ = State.STATUS;
  159. resetJsonStreamParser();
  160. // Feed a dummy "[" to match the ending "]".
  161. parser.jsonStreamParser_.parse('[');
  162. pos++;
  163. parser.streamPos_++;
  164. } else if (input[pos] === ']') {
  165. parser.state_ = State.ARRAY_END;
  166. pos++;
  167. parser.streamPos_++;
  168. }
  169. break;
  170. }
  171. case State.STATUS: {
  172. var status = parser.jsonStreamParser_.parse(input.substring(pos));
  173. addResultStatus(status);
  174. if (!parser.jsonStreamParser_.done()) {
  175. parser.streamPos_ += input.length - pos;
  176. pos = input.length; // end the loop
  177. } else {
  178. parser.state_ = State.ARRAY_END;
  179. var extra = parser.jsonStreamParser_.getExtraInput();
  180. parser.streamPos_ += input.length - pos - extra.length;
  181. input = extra;
  182. pos = 0;
  183. }
  184. break;
  185. }
  186. case State.ARRAY_END: {
  187. reportError('extra input after stream end');
  188. break;
  189. }
  190. }
  191. }
  192. if (parser.result_.length > 0) {
  193. var results = parser.result_;
  194. parser.result_ = [];
  195. return results;
  196. }
  197. return null;
  198. /**
  199. * @param {string} errorMessage Additional error message
  200. * @throws {!Error} Throws an error indicating where the stream is broken
  201. */
  202. function reportError(errorMessage) {
  203. parser.state_ = State.INVALID;
  204. parser.errorMessage_ = 'The stream is broken @' + parser.streamPos_ + '/' +
  205. pos + '. Error: ' + errorMessage + '. With input:\n';
  206. throw new Error(parser.errorMessage_);
  207. }
  208. /**
  209. * Advances to the first non-whitespace input character.
  210. *
  211. * @return {boolean} return false if no more non-whitespace input character
  212. */
  213. function readMore() {
  214. while (pos < input.length) {
  215. if (!utils.isJsonWhitespace(input[pos])) {
  216. return true;
  217. }
  218. pos++;
  219. parser.streamPos_++;
  220. }
  221. return false;
  222. }
  223. function resetJsonStreamParser() {
  224. parser.jsonStreamParser_ = new JsonStreamParser({
  225. 'allowCompactJsonArrayFormat': true,
  226. 'deliverMessageAsRawString': true
  227. });
  228. }
  229. /** @param {?Array<string>} messages Parsed messages */
  230. function addResultMessages(messages) {
  231. if (messages) {
  232. for (var i = 0; i < messages.length; i++) {
  233. var tagged = {};
  234. tagged[1] = messages[i];
  235. parser.result_.push(tagged);
  236. }
  237. }
  238. }
  239. /** @param {?Array<string>} status Parsed status */
  240. function addResultStatus(status) {
  241. if (status) {
  242. if (parser.statusParsed_ || status.length > 1) {
  243. reportError('extra status: ' + status);
  244. }
  245. parser.statusParsed_ = true;
  246. var tagged = {};
  247. tagged[2] = status[0];
  248. parser.result_.push(tagged);
  249. }
  250. }
  251. };
  252. exports = PbJsonStreamParser;