// index.mocha.js
  /* eslint max-nested-callbacks: 0 */
'use strict';

const assert = require('assert');
const StreamTest = require('streamtest');
const StreamQueue = require('../src');

// Tests
//
// The whole suite is replayed once per stream implementation exposed by
// `StreamTest.versions`. Most tests queue the same three chunked sources and
// check that the queue outputs their content concatenated in queueing order.
describe('StreamQueue', () => {
  // Iterating through versions
  StreamTest.versions.forEach(version => {
    // Stream factories/sinks for the implementation currently under test.
    const streams = StreamTest[version];

    // Concatenation of ['wa', 'dup'] + ['pl', 'op'] + ['ki', 'koo', 'lol'].
    const EXPECTED_TEXT = 'wadupplopkikoolol';

    // Object-mode counterpart of EXPECTED_TEXT.
    const EXPECTED_OBJECTS = [
      { s: 'wa' },
      { s: 'dup' },
      { s: 'pl' },
      { s: 'op' },
      { s: 'ki' },
      { s: 'koo' },
      { s: 'lol' },
    ];

    /**
     * Builds a text sink that completes the test once the piped content has
     * been fully collected.
     * @param {Function} done - Mocha completion callback.
     * @param {string} expected - Text the sink must have received.
     * @param {Function} [extraChecks] - Assertions to run before the text
     * comparison (used by the error re-emission tests).
     * @returns {Object} The writable stream to pipe the queue into.
     */
    const textSink = (done, expected, extraChecks) =>
      streams.toText((err, text) => {
        if (err) {
          done(err);
          return;
        }
        if (extraChecks) {
          extraChecks();
        }
        assert.strictEqual(text, expected);
        done();
      });

    /**
     * Builds an object sink that completes the test once the piped objects
     * have been collected and compared with EXPECTED_OBJECTS.
     * @param {Function} done - Mocha completion callback.
     * @returns {Object} The writable stream to pipe the queue into.
     */
    const objectSink = done =>
      streams.toObjects((err, objs) => {
        if (err) {
          done(err);
          return;
        }
        assert.deepStrictEqual(objs, EXPECTED_OBJECTS);
        done();
      });

    /**
     * Registers an 'error' listener on the queue and returns a function
     * asserting that the 'Aouch!' error has been re-emitted by the queue.
     * @param {Object} queue - The StreamQueue instance to observe.
     * @returns {Function} Assertion callback suitable for `textSink`.
     */
    const watchForAouch = queue => {
      let reemitted;

      queue.on('error', err => {
        reemitted = err;
      });
      return () => {
        assert(reemitted);
        assert.strictEqual(reemitted.message, 'Aouch!');
      };
    };

    describe(`for ${version} streams`, () => {
      describe('in binary mode', () => {
        describe('and with async streams', () => {
          it('should work with functionnal API', done => {
            // Called without `new` on purpose.
            const createStreamQueue = StreamQueue;

            createStreamQueue(
              streams.fromChunks(['wa', 'dup']),
              streams.fromChunks(['pl', 'op']),
              streams.fromChunks(['ki', 'koo', 'lol'])
            ).pipe(textSink(done, EXPECTED_TEXT));
          });

          it('should work with functionnal API and options', done => {
            const createStreamQueue = StreamQueue;

            createStreamQueue(
              {},
              streams.fromChunks(['wa', 'dup']),
              streams.fromChunks(['pl', 'op']),
              streams.fromChunks(['ki', 'koo', 'lol'])
            ).pipe(textSink(done, EXPECTED_TEXT));
          });

          it('should work with POO API', done => {
            const queue = new StreamQueue();

            queue.queue(streams.fromChunks(['wa', 'dup']));
            queue.queue(streams.fromChunks(['pl', 'op']));
            queue.queue(streams.fromChunks(['ki', 'koo', 'lol']));
            assert.strictEqual(queue.length, 3);
            queue.pipe(textSink(done, EXPECTED_TEXT));
            queue.done();
          });

          it('should pause streams in flowing mode', done => {
            const queue = new StreamQueue({
              pauseFlowingStream: true,
              resumeFlowingStream: true,
            });
            const flowingStream = streams.fromChunks(['pl', 'op']);

            // Attaching a 'data' listener switches the stream to flowing mode
            // before it gets queued.
            flowingStream.on('data', () => {});
            queue.queue(streams.fromChunks(['wa', 'dup']));
            queue.queue(flowingStream);
            queue.queue(streams.fromChunks(['ki', 'koo', 'lol']));
            assert.strictEqual(queue.length, 3);
            queue.pipe(textSink(done, EXPECTED_TEXT));
            queue.done();
          });

          it('should work with POO API and options', done => {
            const queue = new StreamQueue({
              pauseFlowingStream: true,
              resumeFlowingStream: true,
            });

            queue.queue(streams.fromChunks(['wa', 'dup']));
            queue.queue(streams.fromChunks(['pl', 'op']));
            queue.queue(streams.fromChunks(['ki', 'koo', 'lol']));
            assert.strictEqual(queue.length, 3);
            queue.pipe(textSink(done, EXPECTED_TEXT));
            queue.done();
          });

          it('should work with POO API and a late done call', done => {
            const queue = new StreamQueue();

            queue.queue(streams.fromChunks(['wa', 'dup']));
            queue.queue(streams.fromChunks(['pl', 'op']));
            queue.queue(streams.fromChunks(['ki', 'koo', 'lol']));
            assert.strictEqual(queue.length, 3);
            queue.pipe(textSink(done, EXPECTED_TEXT));
            setTimeout(() => {
              queue.done();
            }, 100);
          });

          it('should work with POO API and no stream plus sync done', done => {
            const queue = new StreamQueue();

            assert.strictEqual(queue.length, 0);
            queue.queue();
            queue.pipe(textSink(done, ''));
            queue.done();
          });

          it('should work with POO API and no stream plus async done', done => {
            const queue = new StreamQueue();

            assert.strictEqual(queue.length, 0);
            queue.queue();
            queue.pipe(textSink(done, ''));
            setTimeout(() => {
              queue.done();
            }, 100);
          });

          it('should work with POO API and a streamqueue stream plus async done', done => {
            const queue = new StreamQueue();
            const child = new StreamQueue();

            queue.queue(child);
            assert.strictEqual(queue.length, 1);
            queue.pipe(textSink(done, ''));
            child.done();
            setTimeout(() => {
              queue.done();
            }, 100);
          });

          it('should work with POO API and a streamqueue stream plus sync done', done => {
            const queue = new StreamQueue();
            const child = new StreamQueue();

            queue.queue(child);
            assert.strictEqual(queue.length, 1);
            queue.pipe(textSink(done, ''));
            child.done();
            queue.done();
          });

          it('should work with POO API and a streamqueue ended stream plus async done', done => {
            const queue = new StreamQueue();
            const child = new StreamQueue();

            // The child is ended right after having been queued.
            queue.queue(child);
            child.done();
            assert.strictEqual(queue.length, 1);
            queue.pipe(textSink(done, ''));
            setTimeout(() => {
              queue.done();
            }, 100);
          });

          it('should fire end asynchronously with streams', done => {
            const queue = new StreamQueue();
            let ended = false;

            // No source may end after the queue itself reported its end.
            queue.queue(
              streams.fromChunks(['wa', 'dup']).on('end', () => {
                assert.strictEqual(ended, false);
              })
            );
            queue.queue(
              streams.fromChunks(['pl', 'op']).on('end', () => {
                assert.strictEqual(ended, false);
              })
            );
            queue.queue(
              streams.fromChunks(['ki', 'koo', 'lol']).on('end', () => {
                assert.strictEqual(ended, false);
              })
            );
            assert.strictEqual(queue.length, 3);
            queue.pipe(textSink(done, EXPECTED_TEXT));
            queue.on('end', () => {
              ended = true;
            });
            queue.done();
            // 'end' must not have been emitted synchronously by done().
            assert.strictEqual(ended, false);
          });

          it('should fire end asynchronously when empty', done => {
            const queue = new StreamQueue();
            let ended = false;

            assert.strictEqual(queue.length, 0);
            queue.pipe(textSink(done, ''));
            queue.on('end', () => {
              ended = true;
            });
            queue.done();
            // 'end' must not have been emitted synchronously by done().
            assert.strictEqual(ended, false);
          });

          it('should work with POO API and a streamqueue ended stream plus sync done', done => {
            const queue = new StreamQueue();
            const child = new StreamQueue();

            // The child is ended right after having been queued.
            queue.queue(child);
            child.done();
            assert.strictEqual(queue.length, 1);
            queue.pipe(textSink(done, ''));
            queue.done();
          });

          it('should work with POO API and an already ended streamqueue stream plus async done', done => {
            const queue = new StreamQueue();
            const child = new StreamQueue();

            // The child is ended before being queued.
            child.done();
            queue.queue(child);
            assert.strictEqual(queue.length, 1);
            queue.pipe(textSink(done, ''));
            setTimeout(() => {
              queue.done();
            }, 100);
          });

          it('should work with POO API and an already ended streamqueue stream plus sync done', done => {
            const queue = new StreamQueue();
            const child = new StreamQueue();

            // The child is ended before being queued.
            child.done();
            queue.queue(child);
            assert.strictEqual(queue.length, 1);
            queue.pipe(textSink(done, ''));
            queue.done();
          });

          if (version === 'v2') {
            it('should reemit errors', done => {
              const queue = new StreamQueue();

              queue.queue(streams.fromErroredChunks(new Error('Aouch!'), []));
              queue.queue(streams.fromChunks(['wa', 'dup']));
              queue.queue(streams.fromChunks(['pl', 'op']));
              queue.queue(streams.fromChunks(['ki', 'koo', 'lol']));
              assert.strictEqual(queue.length, 4);

              const assertReemitted = watchForAouch(queue);

              queue.pipe(textSink(done, EXPECTED_TEXT, assertReemitted));
              queue.done();
            });

            it('should reemit errors elsewhere', done => {
              const queue = new StreamQueue();

              queue.queue(streams.fromChunks(['wa', 'dup']));
              queue.queue(streams.fromChunks(['pl', 'op']));
              queue.queue(streams.fromErroredChunks(new Error('Aouch!'), []));
              queue.queue(streams.fromChunks(['ki', 'koo', 'lol']));
              assert.strictEqual(queue.length, 4);

              const assertReemitted = watchForAouch(queue);

              queue.pipe(textSink(done, EXPECTED_TEXT, assertReemitted));
              queue.done();
            });
          }
        });

        describe('and with sync streams', () => {
          it('should work with functionnal API', done => {
            const stream1 = streams.syncReadableChunks();
            const stream2 = streams.syncReadableChunks();
            const stream3 = streams.syncReadableChunks();
            const createStreamQueue = StreamQueue;

            createStreamQueue(stream1, stream2, stream3).pipe(
              textSink(done, EXPECTED_TEXT)
            );
            streams.syncWrite(stream1, ['wa', 'dup']);
            streams.syncWrite(stream2, ['pl', 'op']);
            streams.syncWrite(stream3, ['ki', 'koo', 'lol']);
          });

          it('should work with POO API', done => {
            const queue = new StreamQueue();
            const stream1 = streams.syncReadableChunks();
            const stream2 = streams.syncReadableChunks();
            const stream3 = streams.syncReadableChunks();

            queue.queue(stream1);
            queue.queue(stream2);
            queue.queue(stream3);
            streams.syncWrite(stream1, ['wa', 'dup']);
            streams.syncWrite(stream2, ['pl', 'op']);
            streams.syncWrite(stream3, ['ki', 'koo', 'lol']);
            assert.strictEqual(queue.length, 3);
            queue.pipe(textSink(done, EXPECTED_TEXT));
            queue.done();
          });

          it('should emit an error when calling done twice', done => {
            const queue = new StreamQueue();
            const stream1 = streams.syncReadableChunks();
            const stream2 = streams.syncReadableChunks();
            const stream3 = streams.syncReadableChunks();

            queue.queue(stream1);
            queue.queue(stream2);
            queue.queue(stream3);
            streams.syncWrite(stream1, ['wa', 'dup']);
            streams.syncWrite(stream2, ['pl', 'op']);
            streams.syncWrite(stream3, ['ki', 'koo', 'lol']);
            assert.strictEqual(queue.length, 3);
            queue.pipe(textSink(done, EXPECTED_TEXT));
            queue.done();
            assert.throws(() => {
              queue.done();
            });
          });

          it('should emit an error when queueing after done was called', done => {
            const queue = new StreamQueue();
            const stream1 = streams.syncReadableChunks();
            const stream2 = streams.syncReadableChunks();
            const stream3 = streams.syncReadableChunks();

            queue.queue(stream1);
            queue.queue(stream2);
            queue.queue(stream3);
            streams.syncWrite(stream1, ['wa', 'dup']);
            streams.syncWrite(stream2, ['pl', 'op']);
            streams.syncWrite(stream3, ['ki', 'koo', 'lol']);
            assert.strictEqual(queue.length, 3);
            queue.pipe(textSink(done, EXPECTED_TEXT));
            queue.done();
            assert.throws(() => {
              queue.queue(streams.syncReadableChunks());
            });
          });

          if (version === 'v2') {
            it('should reemit errors', done => {
              const queue = new StreamQueue();
              const stream1 = streams.syncReadableChunks();
              const stream2 = streams.syncReadableChunks();
              const stream3 = streams.syncReadableChunks();
              const stream4 = streams.syncReadableChunks();

              queue.queue(stream1);
              queue.queue(stream2);
              queue.queue(stream3);
              queue.queue(stream4);

              // The listener must be in place before the error is triggered.
              const assertReemitted = watchForAouch(queue);

              streams.syncError(stream1, new Error('Aouch!'));
              streams.syncWrite(stream2, ['wa', 'dup']);
              streams.syncWrite(stream3, ['pl', 'op']);
              streams.syncWrite(stream4, ['ki', 'koo', 'lol']);
              assert.strictEqual(queue.length, 4);
              queue.pipe(textSink(done, EXPECTED_TEXT, assertReemitted));
              queue.done();
            });
          }
        });

        describe('and with functions returning streams', () => {
          it('should work with functionnal API', done => {
            const createStreamQueue = StreamQueue;

            createStreamQueue(
              streams.fromChunks.bind(null, ['wa', 'dup']),
              streams.fromChunks.bind(null, ['pl', 'op']),
              streams.fromChunks.bind(null, ['ki', 'koo', 'lol'])
            ).pipe(textSink(done, EXPECTED_TEXT));
          });

          it('should work with functionnal API and options', done => {
            const createStreamQueue = StreamQueue;

            // The leading options object is what distinguishes this test from
            // the previous one.
            createStreamQueue(
              {},
              streams.fromChunks.bind(null, ['wa', 'dup']),
              streams.fromChunks.bind(null, ['pl', 'op']),
              streams.fromChunks.bind(null, ['ki', 'koo', 'lol'])
            ).pipe(textSink(done, EXPECTED_TEXT));
          });

          it('should work with POO API', done => {
            const queue = new StreamQueue();

            queue.queue(streams.fromChunks.bind(null, ['wa', 'dup']));
            queue.queue(streams.fromChunks.bind(null, ['pl', 'op']));
            queue.queue(streams.fromChunks.bind(null, ['ki', 'koo', 'lol']));
            assert.strictEqual(queue.length, 3);
            queue.pipe(textSink(done, EXPECTED_TEXT));
            queue.done();
          });

          it('should pause streams in flowing mode', done => {
            const queue = new StreamQueue({
              pauseFlowingStream: true,
              resumeFlowingStream: true,
            });

            queue.queue(streams.fromChunks.bind(null, ['wa', 'dup']));
            queue.queue(() => {
              const stream = streams.fromChunks(['pl', 'op']);

              // Attaching a 'data' listener switches the stream to flowing
              // mode before it is handed over to the queue.
              stream.on('data', () => {});
              return stream;
            });
            queue.queue(streams.fromChunks.bind(null, ['ki', 'koo', 'lol']));
            assert.strictEqual(queue.length, 3);
            queue.pipe(textSink(done, EXPECTED_TEXT));
            queue.done();
          });

          it('should work with POO API and options', done => {
            const queue = new StreamQueue({
              pauseFlowingStream: true,
              resumeFlowingStream: true,
            });

            queue.queue(streams.fromChunks.bind(null, ['wa', 'dup']));
            queue.queue(streams.fromChunks.bind(null, ['pl', 'op']));
            queue.queue(streams.fromChunks.bind(null, ['ki', 'koo', 'lol']));
            assert.strictEqual(queue.length, 3);
            queue.pipe(textSink(done, EXPECTED_TEXT));
            queue.done();
          });

          it('should work with POO API and a late done call', done => {
            const queue = new StreamQueue();

            queue.queue(streams.fromChunks.bind(null, ['wa', 'dup']));
            queue.queue(streams.fromChunks.bind(null, ['pl', 'op']));
            queue.queue(streams.fromChunks.bind(null, ['ki', 'koo', 'lol']));
            assert.strictEqual(queue.length, 3);
            queue.pipe(textSink(done, EXPECTED_TEXT));
            setTimeout(() => {
              queue.done();
            }, 100);
          });

          if (version === 'v2') {
            it('should reemit errors', done => {
              const queue = new StreamQueue();

              queue.queue(
                streams.fromErroredChunks.bind(null, new Error('Aouch!'), [])
              );
              queue.queue(streams.fromChunks.bind(null, ['wa', 'dup']));
              queue.queue(streams.fromChunks.bind(null, ['pl', 'op']));
              queue.queue(streams.fromChunks.bind(null, ['ki', 'koo', 'lol']));
              assert.strictEqual(queue.length, 4);

              const assertReemitted = watchForAouch(queue);

              queue.pipe(textSink(done, EXPECTED_TEXT, assertReemitted));
              queue.done();
            });

            it('should reemit errors elsewhere', done => {
              const queue = new StreamQueue();

              queue.queue(streams.fromChunks.bind(null, ['wa', 'dup']));
              queue.queue(streams.fromChunks.bind(null, ['pl', 'op']));
              queue.queue(
                streams.fromErroredChunks.bind(null, new Error('Aouch!'), [])
              );
              queue.queue(streams.fromChunks.bind(null, ['ki', 'koo', 'lol']));
              assert.strictEqual(queue.length, 4);

              const assertReemitted = watchForAouch(queue);

              queue.pipe(textSink(done, EXPECTED_TEXT, assertReemitted));
              queue.done();
            });
          }
        });
      });

      describe('in object mode', () => {
        it('should work', done => {
          const queue = new StreamQueue({ objectMode: true });

          queue.queue(streams.fromObjects([{ s: 'wa' }, { s: 'dup' }]));
          queue.queue(streams.fromObjects([{ s: 'pl' }, { s: 'op' }]));
          queue.queue(
            streams.fromObjects([{ s: 'ki' }, { s: 'koo' }, { s: 'lol' }])
          );
          queue.pipe(objectSink(done));
          queue.done();
        });
      });

      describe('in object mode with the .obj() shortcut', () => {
        it('should work without options', done => {
          StreamQueue.obj(
            streams.fromObjects([{ s: 'wa' }, { s: 'dup' }]),
            streams.fromObjects([{ s: 'pl' }, { s: 'op' }]),
            streams.fromObjects([{ s: 'ki' }, { s: 'koo' }, { s: 'lol' }])
          ).pipe(objectSink(done));
        });

        it('should work with options', done => {
          StreamQueue.obj(
            {},
            streams.fromObjects([{ s: 'wa' }, { s: 'dup' }]),
            streams.fromObjects([{ s: 'pl' }, { s: 'op' }]),
            streams.fromObjects([{ s: 'ki' }, { s: 'koo' }, { s: 'lol' }])
          ).pipe(objectSink(done));
        });

        it('should work without options nor streams', done => {
          const queue = StreamQueue.obj();

          queue.queue(streams.fromObjects([{ s: 'wa' }, { s: 'dup' }]));
          queue.queue(streams.fromObjects([{ s: 'pl' }, { s: 'op' }]));
          queue.queue(
            streams.fromObjects([{ s: 'ki' }, { s: 'koo' }, { s: 'lol' }])
          );
          // done() is deliberately called before piping here.
          queue.done();
          queue.pipe(objectSink(done));
        });

        it('should work with options and no streams', done => {
          const queue = StreamQueue.obj({});

          queue.queue(streams.fromObjects([{ s: 'wa' }, { s: 'dup' }]));
          queue.queue(streams.fromObjects([{ s: 'pl' }, { s: 'op' }]));
          queue.queue(
            streams.fromObjects([{ s: 'ki' }, { s: 'koo' }, { s: 'lol' }])
          );
          // done() is deliberately called before piping here.
          queue.done();
          queue.pipe(objectSink(done));
        });
      });
    });
  });
});