test.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
  1. 'use strict'
  2. var fs = require('fs')
  3. var path = require('path')
  4. var test = require('tape').test
  5. var from = require('from2')
  6. var crypto = require('crypto')
  7. var sink = require('flush-write-stream')
  8. var pump = require('pump')
  9. var cloneable = require('./')
  // Wrapping a source must not start reading it; once piped, the single chunk
  // produced by the source has to arrive unchanged.
  test('basic passthrough', function (t) {
    t.plan(2)

    var started = false
    var source = from(function (size, next) {
      // First call yields the payload, every later call signals end-of-stream.
      var payload = started ? null : 'hello world'
      started = true
      this.push(payload)
      next()
    })

    var instance = cloneable(source)
    t.notOk(started, 'stream not started')

    function verify (chunk, enc, cb) {
      t.equal(chunk.toString(), 'hello world', 'chunk matches')
      cb()
    }

    instance.pipe(sink(verify))
  })
  // A clone that is piped in the same tick as the original must see the same
  // chunk, and neither wrapping nor cloning may start the source.
  test('clone sync', function (t) {
    t.plan(4)

    var started = false
    var source = from(function (size, next) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      next()
    })

    // Shared write handler: both consumers expect the identical payload.
    function verify (chunk, enc, cb) {
      t.equal(chunk.toString(), 'hello world', 'chunk matches')
      cb()
    }

    var instance = cloneable(source)
    t.notOk(started, 'stream not started')

    var cloned = instance.clone()
    t.notOk(started, 'stream not started')

    instance.pipe(sink(verify))
    cloned.pipe(sink(verify))
  })
  // Same as 'clone sync', but the clone is only piped on a later setImmediate
  // turn; it must still receive the chunk.
  test('clone async', function (t) {
    t.plan(4)

    var started = false
    var source = from(function (size, next) {
      var payload = started ? null : 'hello world'
      started = true
      this.push(payload)
      next()
    })

    function verify (chunk, enc, cb) {
      t.equal(chunk.toString(), 'hello world', 'chunk matches')
      cb()
    }

    var instance = cloneable(source)
    t.notOk(started, 'stream not started')

    var cloned = instance.clone()
    t.notOk(started, 'stream not started')

    instance.pipe(sink(verify))

    // Attach the clone's consumer after the current tick has finished.
    setImmediate(function pipeCloneLater () {
      cloned.pipe(sink(verify))
    })
  })
  // Object-mode variant of the basic passthrough: the single object pushed by
  // the source must reach the sink deep-equal to the original.
  test('basic passthrough in obj mode', function (t) {
    t.plan(2)

    var started = false
    var source = from.obj(function (size, next) {
      if (started) {
        // End of stream; next() is not invoked on this path.
        return this.push(null)
      }

      started = true
      this.push({ hello: 'world' })
      next()
    })

    var instance = cloneable(source)
    t.notOk(started, 'stream not started')

    function verify (chunk, enc, cb) {
      t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
      cb()
    }

    instance.pipe(sink.obj(verify))
  })
  // Object-mode source consumed by the cloneable immediately and by a clone
  // that is piped on a later setImmediate turn; both must get the object.
  test('multiple clone in object mode', function (t) {
    t.plan(4)

    var started = false
    var source = from.obj(function (size, next) {
      if (started) {
        // End of stream; next() is not invoked on this path.
        return this.push(null)
      }

      started = true
      this.push({ hello: 'world' })
      next()
    })

    function verify (chunk, enc, cb) {
      t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
      cb()
    }

    var instance = cloneable(source)
    t.notOk(started, 'stream not started')

    var cloned = instance.clone()
    t.notOk(started, 'stream not started')

    instance.pipe(sink.obj(verify))

    setImmediate(function pipeCloneLater () {
      cloned.pipe(sink.obj(verify))
    })
  })
  // Consumes the cloneable through 'data'/'end' listeners instead of pipe()
  // and checks the concatenated output.
  test('basic passthrough with data event', function (t) {
    t.plan(2)

    var started = false
    var source = from(function (size, next) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      next()
    })

    var instance = cloneable(source)
    t.notOk(started, 'stream not started')

    // Collect every chunk as a string and join them once the stream ends.
    var pieces = []

    instance.on('data', function collect (chunk) {
      pieces.push(chunk.toString())
    })

    instance.on('end', function compare () {
      t.equal(pieces.join(''), 'hello world', 'chunk matches')
    })
  })
  // The clone is read through 'data'/'end' listeners while the original is
  // piped into a sink; both must observe the same payload.
  test('basic passthrough with data event on clone', function (t) {
    t.plan(3)

    var started = false
    var source = from(function (size, next) {
      var payload = started ? null : 'hello world'
      started = true
      this.push(payload)
      next()
    })

    var instance = cloneable(source)
    var cloned = instance.clone()
    t.notOk(started, 'stream not started')

    // Chunks seen by the clone, joined on 'end'.
    var pieces = []

    cloned.on('data', function collect (chunk) {
      pieces.push(chunk.toString())
    })

    cloned.on('end', function compare () {
      t.equal(pieces.join(''), 'hello world', 'chunk matches in clone')
    })

    function verifyInstance (chunk, enc, cb) {
      t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')
      cb()
    }

    instance.pipe(sink(verifyInstance))
  })
  // Calling clone() from inside the sink's write handler, i.e. once data is
  // already being delivered, is expected to throw.
  test('errors if cloned after start', function (t) {
    t.plan(2)

    var source = from(function (size, next) {
      this.push('hello world')
      this.push(null)
      next()
    })

    var instance = cloneable(source)

    function cloneTooLate () {
      instance.clone()
    }

    function onChunk (chunk, enc, cb) {
      t.equal(chunk.toString(), 'hello world', 'chunk matches')
      t.throws(cloneTooLate, 'throws if cloned after start')
      cb()
    }

    instance.pipe(sink(onChunk))
  })
  // Consumes the cloneable in paused mode: drain it with read() on every
  // 'readable' event and compare the accumulated text on 'end'.
  test('basic passthrough with readable event', function (t) {
    t.plan(2)

    var started = false
    var source = from(function (size, next) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      next()
    })

    var instance = cloneable(source)
    t.notOk(started, 'stream not started')

    var received = ''

    instance.on('readable', function drain () {
      // Keep reading until the internal buffer reports it is empty (null).
      for (var chunk = this.read(); chunk !== null; chunk = this.read()) {
        received += chunk.toString()
      }
    })

    instance.on('end', function compare () {
      t.equal(received, 'hello world', 'chunk matches')
    })
  })
  // The clone is drained through 'readable' + read() while the original is
  // piped into a sink; both must observe the same payload.
  test('basic passthrough with readable event on clone', function (t) {
    t.plan(3)

    var started = false
    var source = from(function (size, next) {
      var payload = started ? null : 'hello world'
      started = true
      this.push(payload)
      next()
    })

    var instance = cloneable(source)
    var cloned = instance.clone()
    t.notOk(started, 'stream not started')

    var received = ''

    cloned.on('readable', function drain () {
      // Keep reading until the internal buffer reports it is empty (null).
      for (var chunk = this.read(); chunk !== null; chunk = this.read()) {
        received += chunk.toString()
      }
    })

    cloned.on('end', function compare () {
      t.equal(received, 'hello world', 'chunk matches in clone')
    })

    function verifyInstance (chunk, enc, cb) {
      t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')
      cb()
    }

    instance.pipe(sink(verifyInstance))
  })
  // An 'error' emitted on the source must be re-emitted, with the very same
  // Error object, on the cloneable and on its clone.
  test('source error destroys all', function (t) {
    t.plan(3)

    var source = from()
    var instance = cloneable(source)
    var clone = instance.clone()

    // Registers an 'error' listener asserting identity with the source error.
    function expectSameError (stream, expected, message) {
      stream.on('error', function (actual) {
        t.ok(expected === actual, message)
      })
    }

    source.on('error', function (err) {
      t.ok(err, 'source errors')
      // The downstream listeners are attached from within the source handler.
      expectSameError(instance, err, 'instance receives same error')
      expectSameError(clone, err, 'clone receives same error')
    })

    source.emit('error', new Error())
  })
  // Destroying the source must make both the cloneable and its clone emit
  // 'end' once they are flowing.
  test('source destroy destroys all', function (t) {
    t.plan(2)

    var source = from()
    var instance = cloneable(source)
    var clone = instance.clone()

    // Records a passing assertion when the given stream ends.
    function passOnEnd (stream, message) {
      stream.on('end', function () {
        t.pass(message)
      })
    }

    passOnEnd(instance, 'instance has ended')
    passOnEnd(clone, 'clone has ended')

    clone.resume()
    instance.resume()

    source.destroy()
  })
  // Destroying the cloneable with an error must surface that error on the
  // cloneable and on its clone. No 'close' event is expected on the source,
  // the cloneable or the clone; any such event fails the test.
  test('instance error destroys all but the source', function (t) {
    t.plan(2)

    var source = from()
    var instance = cloneable(source)
    var clone = instance.clone()

    source.on('close', function () {
      t.fail('source should not be closed')
    })

    instance.on('error', function (err) {
      t.is(err.message, 'beep', 'instance errors')
    })

    instance.on('close', function () {
      t.fail('close should not be emitted')
    })

    // Labelled separately from the instance assertion so that a failure in
    // the TAP output points at the clone rather than at the instance.
    clone.on('error', function (err) {
      t.is(err.message, 'beep', 'clone errors')
    })

    clone.on('close', function () {
      t.fail('close should not be emitted')
    })

    instance.destroy(new Error('beep'))
  })
  // Destroying the cloneable (without an error) must end both the cloneable
  // and its clone, while the source must not emit 'close'.
  test('instance destroy destroys all but the source', function (t) {
    t.plan(2)

    var source = from()
    var instance = cloneable(source)
    var clone = instance.clone()

    // Records a passing assertion when the given stream ends.
    function passOnEnd (stream, message) {
      stream.on('end', function () {
        t.pass(message)
      })
    }

    source.on('close', function () {
      t.fail('source should not be closed')
    })

    passOnEnd(instance, 'instance has ended')
    passOnEnd(clone, 'clone has ended')

    instance.resume()
    clone.resume()

    instance.destroy()
  })
  // Destroying one clone must close only that clone: the source, the
  // cloneable and the sibling clone must not emit 'close'.
  test('clone destroy does not affect other clones, cloneable or source', function (t) {
    t.plan(1)

    var source = from()
    var instance = cloneable(source)
    var clone = instance.clone()
    var other = instance.clone()

    // Fails the test if the given stream ever emits 'close'.
    function failOnClose (stream, message) {
      stream.on('close', function () {
        t.fail(message)
      })
    }

    failOnClose(source, 'source should not be closed')
    failOnClose(instance, 'instance should not be closed')
    failOnClose(other, 'other clone should not be closed')

    clone.on('close', function () {
      t.pass('clone is closed')
    })

    clone.destroy()
  })
  // With two clones, destroying one of them must leave the cloneable and the
  // remaining clone able to deliver the chunk, and must not close them.
  test('clone remains readable if other is destroyed', function (t) {
    t.plan(3)

    var started = false
    var source = from(function (size, next) {
      if (!started) {
        started = true
        this.push('hello')
      } else {
        this.push(null)
      }
      next()
    })

    var instance = cloneable(source)
    var clone = instance.clone()
    var other = instance.clone()

    // Builds an object-mode sink asserting that each chunk reads 'hello'.
    function helloSink (message) {
      return sink.obj(function (chunk, enc, cb) {
        t.deepEqual(chunk.toString(), 'hello', message)
        cb()
      })
    }

    // Fails the test if the given stream ever emits 'close'.
    function failOnClose (stream, message) {
      stream.on('close', function () {
        t.fail(message)
      })
    }

    instance.pipe(helloSink('instance chunk matches'))
    clone.pipe(helloSink('clone chunk matches'))

    failOnClose(clone, 'clone should not be closed')
    failOnClose(instance, 'instance should not be closed')

    other.on('close', function () {
      t.pass('other is closed')
    })

    other.destroy()
  })
  // A clone can itself be cloned; the original, the clone and the clone's
  // clone must all receive the chunk, and none of the calls may start the
  // source.
  test('clone of clone', function (t) {
    t.plan(6)

    var started = false
    var source = from(function (size, next) {
      var payload = started ? null : 'hello world'
      started = true
      this.push(payload)
      next()
    })

    // Shared write handler: every consumer expects the identical payload.
    function verify (chunk, enc, cb) {
      t.equal(chunk.toString(), 'hello world', 'chunk matches')
      cb()
    }

    var instance = cloneable(source)
    t.notOk(started, 'stream not started')

    var cloned = instance.clone()
    t.notOk(started, 'stream not started')

    var replica = cloned.clone()
    t.notOk(started, 'stream not started')

    instance.pipe(sink(verify))
    cloned.pipe(sink(verify))
    replica.pipe(sink(verify))
  })
  // The cloneable gets its 'data' listener immediately, the clone only on the
  // next tick. Nothing may have been delivered by then, and once both streams
  // have ended they must have produced identical output.
  test('from vinyl', function (t) {
    t.plan(3)

    var source = from(['wa', 'dup'])
    var instance = cloneable(source)
    var clone = instance.clone()

    var data = ''
    var data2 = ''
    var ends = 2

    // Runs the final comparison once both streams have emitted 'end'.
    function latch () {
      if (--ends === 0) {
        t.equal(data, data2)
      }
    }

    instance.on('data', function (chunk) {
      data += chunk.toString()
    })

    process.nextTick(function () {
      // tape's signature is (actual, expected, msg): pass the observed value
      // first so a failure report labels the two sides correctly.
      t.equal(data, '', 'nothing was written yet')
      t.equal(data2, '', 'nothing was written yet')

      clone.on('data', function (chunk) {
        data2 += chunk.toString()
      })
    })

    instance.on('end', latch)
    clone.on('end', latch)
  })
  // While a clone exists that nobody consumes, the cloneable must not deliver
  // any data to its own 'data' listener.
  test('waits till all are flowing', function (t) {
    t.plan(1)

    var source = from(['wa', 'dup'])
    var instance = cloneable(source)

    // Create a clone and deliberately leave it without any consumer.
    instance.clone()

    instance.on('data', function onUnexpectedData () {
      t.fail('this should never happen')
    })

    process.nextTick(function afterTick () {
      t.pass('wait till nextTick')
    })
  })
  // cloneable.isCloneable() is expected to be falsy for the plain source and
  // truthy for the cloneable wrapper, for a clone, and for a clone of a clone.
  test('isCloneable', function (t) {
    t.plan(4)

    var plain = from(['hello', ' ', 'world'])
    t.notOk(cloneable.isCloneable(plain), 'a generic readable is not cloneable')

    var wrapped = cloneable(plain)
    t.ok(cloneable.isCloneable(wrapped), 'a cloneable is cloneable')

    var firstClone = wrapped.clone()
    t.ok(cloneable.isCloneable(firstClone), 'a clone is cloneable')

    var secondClone = firstClone.clone()
    t.ok(cloneable.isCloneable(secondClone), 'a clone of a clone is cloneable')
  })
  // Both the cloneable and its clone must emit 'finish', and each must see
  // the four chunks in order.
  test('emits finish', function (t) {
    var chunks = ['a', 'b', 'c', 'd', null]
    var expectedMain = ['a', 'b', 'c', 'd']
    var expectedClone = ['a', 'b', 'c', 'd']

    // Two 'finish' assertions plus one assertion per chunk per consumer.
    t.plan(2 + expectedMain.length + expectedClone.length)

    var source = from(function (size, next) {
      // Take the next chunk now, hand it over on a later turn.
      var upcoming = chunks.shift()
      setImmediate(function deliver () {
        next(null, upcoming)
      })
    })

    var instance = cloneable(source)
    var clone = instance.clone()

    clone.on('finish', function () {
      t.pass('clone emits finish')
    })

    instance.on('finish', function () {
      t.pass('main emits finish')
    })

    function verifyMain (chunk, enc, cb) {
      t.equal(chunk.toString(), expectedMain.shift(), 'chunk matches')
      cb()
    }

    instance.pipe(sink(verifyMain))

    clone.on('data', function verifyClone (chunk) {
      t.equal(chunk.toString(), expectedClone.shift(), 'chunk matches')
    })
  })
  // The cloneable is resumed immediately, the clone only on a later
  // setImmediate turn; both are expected to emit 'end'.
  test('clone async w resume', function (t) {
    t.plan(4)

    var started = false
    var source = from(function (size, next) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      next()
    })

    function onEnd () {
      t.pass('end emitted')
    }

    var instance = cloneable(source)
    t.notOk(started, 'stream not started')

    var cloned = instance.clone()
    t.notOk(started, 'stream not started')

    instance.on('end', onEnd)
    instance.resume()

    setImmediate(function resumeCloneLater () {
      cloned.on('end', onEnd)
      cloned.resume()
    })
  })
  // Streams the 'big' fixture through a cloneable and two clones that start
  // consuming at different times. Each consumer writes to the 'out' file and
  // the SHA-1 of what was written is compared with the SHA-1 of the fixture.
  test('big file', function (t) {
    t.plan(13)

    var fixture = path.join(__dirname, 'big')
    var stream = cloneable(fs.createReadStream(fixture))

    // Reference digest, computed from an independent read of the fixture.
    var hash = crypto.createHash('sha1')
    hash.setEncoding('hex')

    var toCheck

    fs.createReadStream(path.join(__dirname, 'big'))
      .pipe(hash)
      .once('readable', function () {
        toCheck = hash.read()
        t.ok(toCheck)
      })

    // Pipes stream `s` into the output file and, once the write has finished,
    // hashes the file and compares it with the reference digest. `num` only
    // labels the assertions.
    function pipe (s, num) {
      s.on('end', function () {
        t.pass('end for ' + num)
      })

      var dest = path.join(__dirname, 'out')
      var writer = fs.createWriteStream(dest)

      s.pipe(writer).on('finish', function () {
        t.pass('finish for ' + num)

        var destHash = crypto.createHash('sha1')
        destHash.setEncoding('hex')

        fs.createReadStream(dest)
          .pipe(destHash)
          .once('readable', function () {
            var digest = destHash.read()
            t.ok(digest)
            t.equal(digest, toCheck)
          })
      })
    }

    // Pipe in another event loop tick <-- this one finished only, it's the original cloneable.
    setImmediate(function () {
      pipe(stream, 1)
    })

    // Pipe in the same event loop tick
    pipe(stream.clone(), 0)

    // Pipe a long time after; the clone itself is still created right now.
    var lateClone = stream.clone()
    setTimeout(function () {
      pipe(lateClone, 2)
    }, 1000)
  })
  // When the wrapped source destroys itself with an error, pump's completion
  // callback must receive that same error and the sink must never be written.
  test('pump error', function (t) {
    t.plan(1)

    var err = new Error('kaboom')

    var failingSource = from(function () {
      this.destroy(err)
    })

    var head = cloneable(failingSource)

    var tail = sink(function unexpectedWrite () {
      t.fail('this should not be called')
    })

    pump([head, tail], function onDone (_err) {
      t.equal(_err, err)
    })
  })