message_handler_spec.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. /* Copyright 2017 Mozilla Foundation
  2. *
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. *
  7. * http://www.apache.org/licenses/LICENSE-2.0
  8. *
  9. * Unless required by applicable law or agreed to in writing, software
  10. * distributed under the License is distributed on an "AS IS" BASIS,
  11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. * See the License for the specific language governing permissions and
  13. * limitations under the License.
  14. */
  15. import {
  16. AbortException,
  17. createPromiseCapability,
  18. UnknownErrorException,
  19. } from "../../src/shared/util.js";
  20. import { LoopbackPort } from "../../src/display/api.js";
  21. import { MessageHandler } from "../../src/shared/message_handler.js";
  22. describe("message_handler", function () {
  23. // Sleep function to wait for sometime, similar to setTimeout but faster.
  24. function sleep(ticks) {
  25. return Promise.resolve().then(() => {
  26. return ticks && sleep(ticks - 1);
  27. });
  28. }
  29. describe("sendWithStream", function () {
  30. it("should return a ReadableStream", function () {
  31. const port = new LoopbackPort();
  32. const messageHandler1 = new MessageHandler("main", "worker", port);
  33. const readable = messageHandler1.sendWithStream("fakeHandler");
  34. // Check if readable is an instance of ReadableStream.
  35. expect(typeof readable).toEqual("object");
  36. expect(typeof readable.getReader).toEqual("function");
  37. });
  38. it("should read using a reader", async function () {
  39. let log = "";
  40. const port = new LoopbackPort();
  41. const messageHandler1 = new MessageHandler("main", "worker", port);
  42. const messageHandler2 = new MessageHandler("worker", "main", port);
  43. messageHandler2.on("fakeHandler", (data, sink) => {
  44. sink.onPull = function () {
  45. log += "p";
  46. };
  47. sink.onCancel = function (reason) {
  48. log += "c";
  49. };
  50. sink.ready
  51. .then(() => {
  52. sink.enqueue("hi");
  53. return sink.ready;
  54. })
  55. .then(() => {
  56. sink.close();
  57. });
  58. return sleep(5);
  59. });
  60. const readable = messageHandler1.sendWithStream(
  61. "fakeHandler",
  62. {},
  63. {
  64. highWaterMark: 1,
  65. size() {
  66. return 1;
  67. },
  68. }
  69. );
  70. const reader = readable.getReader();
  71. await sleep(10);
  72. expect(log).toEqual("");
  73. let result = await reader.read();
  74. expect(log).toEqual("p");
  75. expect(result.value).toEqual("hi");
  76. expect(result.done).toEqual(false);
  77. await sleep(10);
  78. result = await reader.read();
  79. expect(result.value).toEqual(undefined);
  80. expect(result.done).toEqual(true);
  81. });
  82. it("should not read any data when cancelled", async function () {
  83. let log = "";
  84. const port = new LoopbackPort();
  85. const messageHandler2 = new MessageHandler("worker", "main", port);
  86. messageHandler2.on("fakeHandler", (data, sink) => {
  87. sink.onPull = function () {
  88. log += "p";
  89. };
  90. sink.onCancel = function (reason) {
  91. log += "c";
  92. };
  93. log += "0";
  94. sink.ready
  95. .then(() => {
  96. log += "1";
  97. sink.enqueue([1, 2, 3, 4], 4);
  98. return sink.ready;
  99. })
  100. .then(() => {
  101. log += "2";
  102. sink.enqueue([5, 6, 7, 8], 4);
  103. return sink.ready;
  104. })
  105. .then(
  106. () => {
  107. log += "3";
  108. sink.close();
  109. },
  110. () => {
  111. log += "4";
  112. }
  113. );
  114. });
  115. const messageHandler1 = new MessageHandler("main", "worker", port);
  116. const readable = messageHandler1.sendWithStream(
  117. "fakeHandler",
  118. {},
  119. {
  120. highWaterMark: 4,
  121. size(arr) {
  122. return arr.length;
  123. },
  124. }
  125. );
  126. const reader = readable.getReader();
  127. await sleep(10);
  128. expect(log).toEqual("01");
  129. const result = await reader.read();
  130. expect(result.value).toEqual([1, 2, 3, 4]);
  131. expect(result.done).toEqual(false);
  132. await sleep(10);
  133. expect(log).toEqual("01p2");
  134. await reader.cancel(new AbortException("reader cancelled."));
  135. expect(log).toEqual("01p2c4");
  136. });
  137. it("should not read when errored", async function () {
  138. let log = "";
  139. const port = new LoopbackPort();
  140. const messageHandler2 = new MessageHandler("worker", "main", port);
  141. messageHandler2.on("fakeHandler", (data, sink) => {
  142. sink.onPull = function () {
  143. log += "p";
  144. };
  145. sink.onCancel = function (reason) {
  146. log += "c";
  147. };
  148. log += "0";
  149. sink.ready
  150. .then(() => {
  151. log += "1";
  152. sink.enqueue([1, 2, 3, 4], 4);
  153. return sink.ready;
  154. })
  155. .then(() => {
  156. log += "e";
  157. sink.error(new Error("should not read when errored"));
  158. });
  159. });
  160. const messageHandler1 = new MessageHandler("main", "worker", port);
  161. const readable = messageHandler1.sendWithStream(
  162. "fakeHandler",
  163. {},
  164. {
  165. highWaterMark: 4,
  166. size(arr) {
  167. return arr.length;
  168. },
  169. }
  170. );
  171. const reader = readable.getReader();
  172. await sleep(10);
  173. expect(log).toEqual("01");
  174. const result = await reader.read();
  175. expect(result.value).toEqual([1, 2, 3, 4]);
  176. expect(result.done).toEqual(false);
  177. try {
  178. await reader.read();
  179. // Shouldn't get here.
  180. expect(false).toEqual(true);
  181. } catch (reason) {
  182. expect(log).toEqual("01pe");
  183. expect(reason instanceof UnknownErrorException).toEqual(true);
  184. expect(reason.message).toEqual("should not read when errored");
  185. }
  186. });
  187. it("should read data with blocking promise", async function () {
  188. let log = "";
  189. const port = new LoopbackPort();
  190. const messageHandler2 = new MessageHandler("worker", "main", port);
  191. messageHandler2.on("fakeHandler", (data, sink) => {
  192. sink.onPull = function () {
  193. log += "p";
  194. };
  195. sink.onCancel = function (reason) {
  196. log += "c";
  197. };
  198. log += "0";
  199. sink.ready
  200. .then(() => {
  201. log += "1";
  202. sink.enqueue([1, 2, 3, 4], 4);
  203. return sink.ready;
  204. })
  205. .then(() => {
  206. log += "2";
  207. sink.enqueue([5, 6, 7, 8], 4);
  208. return sink.ready;
  209. })
  210. .then(() => {
  211. sink.close();
  212. });
  213. });
  214. const messageHandler1 = new MessageHandler("main", "worker", port);
  215. const readable = messageHandler1.sendWithStream(
  216. "fakeHandler",
  217. {},
  218. {
  219. highWaterMark: 4,
  220. size(arr) {
  221. return arr.length;
  222. },
  223. }
  224. );
  225. const reader = readable.getReader();
  226. // Sleep for 10ms, so that read() is not unblocking the ready promise.
  227. // Chain all read() to stream in sequence.
  228. await sleep(10);
  229. expect(log).toEqual("01");
  230. let result = await reader.read();
  231. expect(result.value).toEqual([1, 2, 3, 4]);
  232. expect(result.done).toEqual(false);
  233. await sleep(10);
  234. expect(log).toEqual("01p2");
  235. result = await reader.read();
  236. expect(result.value).toEqual([5, 6, 7, 8]);
  237. expect(result.done).toEqual(false);
  238. await sleep(10);
  239. expect(log).toEqual("01p2p");
  240. result = await reader.read();
  241. expect(result.value).toEqual(undefined);
  242. expect(result.done).toEqual(true);
  243. });
  244. it(
  245. "should read data with blocking promise and buffer whole data" +
  246. " into stream",
  247. async function () {
  248. let log = "";
  249. const port = new LoopbackPort();
  250. const messageHandler2 = new MessageHandler("worker", "main", port);
  251. messageHandler2.on("fakeHandler", (data, sink) => {
  252. sink.onPull = function () {
  253. log += "p";
  254. };
  255. sink.onCancel = function (reason) {
  256. log += "c";
  257. };
  258. log += "0";
  259. sink.ready
  260. .then(() => {
  261. log += "1";
  262. sink.enqueue([1, 2, 3, 4], 4);
  263. return sink.ready;
  264. })
  265. .then(() => {
  266. log += "2";
  267. sink.enqueue([5, 6, 7, 8], 4);
  268. return sink.ready;
  269. })
  270. .then(() => {
  271. sink.close();
  272. });
  273. return sleep(10);
  274. });
  275. const messageHandler1 = new MessageHandler("main", "worker", port);
  276. const readable = messageHandler1.sendWithStream(
  277. "fakeHandler",
  278. {},
  279. {
  280. highWaterMark: 8,
  281. size(arr) {
  282. return arr.length;
  283. },
  284. }
  285. );
  286. const reader = readable.getReader();
  287. await sleep(10);
  288. expect(log).toEqual("012");
  289. let result = await reader.read();
  290. expect(result.value).toEqual([1, 2, 3, 4]);
  291. expect(result.done).toEqual(false);
  292. await sleep(10);
  293. expect(log).toEqual("012p");
  294. result = await reader.read();
  295. expect(result.value).toEqual([5, 6, 7, 8]);
  296. expect(result.done).toEqual(false);
  297. await sleep(10);
  298. expect(log).toEqual("012p");
  299. result = await reader.read();
  300. expect(result.value).toEqual(undefined);
  301. expect(result.done).toEqual(true);
  302. }
  303. );
  304. it("should ignore any pull after close is called", async function () {
  305. let log = "";
  306. const port = new LoopbackPort();
  307. const capability = createPromiseCapability();
  308. const messageHandler2 = new MessageHandler("worker", "main", port);
  309. messageHandler2.on("fakeHandler", (data, sink) => {
  310. sink.onPull = function () {
  311. log += "p";
  312. };
  313. sink.onCancel = function (reason) {
  314. log += "c";
  315. };
  316. log += "0";
  317. sink.ready.then(() => {
  318. log += "1";
  319. sink.enqueue([1, 2, 3, 4], 4);
  320. });
  321. return capability.promise.then(() => {
  322. sink.close();
  323. });
  324. });
  325. const messageHandler1 = new MessageHandler("main", "worker", port);
  326. const readable = messageHandler1.sendWithStream(
  327. "fakeHandler",
  328. {},
  329. {
  330. highWaterMark: 10,
  331. size(arr) {
  332. return arr.length;
  333. },
  334. }
  335. );
  336. const reader = readable.getReader();
  337. await sleep(10);
  338. expect(log).toEqual("01");
  339. capability.resolve();
  340. await capability.promise;
  341. let result = await reader.read();
  342. expect(result.value).toEqual([1, 2, 3, 4]);
  343. expect(result.done).toEqual(false);
  344. await sleep(10);
  345. expect(log).toEqual("01");
  346. result = await reader.read();
  347. expect(result.value).toEqual(undefined);
  348. expect(result.done).toEqual(true);
  349. });
  350. });
  351. });