| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702 | 'use strict'var fs = require('fs')var path = require('path')var test = require('tape').testvar from = require('from2')var crypto = require('crypto')var sink = require('flush-write-stream')var pump = require('pump')var cloneable = require('./')test('basic passthrough', function (t) {  t.plan(2)  var read = false  var source = from(function (size, next) {    if (read) {      this.push(null)    } else {      read = true      this.push('hello world')    }    next()  })  var instance = cloneable(source)  t.notOk(read, 'stream not started')  instance.pipe(sink(function (chunk, enc, cb) {    t.equal(chunk.toString(), 'hello world', 'chunk matches')    cb()  }))})test('clone sync', function (t) {  t.plan(4)  var read = false  var source = from(function (size, next) {    if (read) {      this.push(null)    } else {      read = true      this.push('hello world')    }    next()  })  var instance = cloneable(source)  t.notOk(read, 'stream not started')  var cloned = instance.clone()  t.notOk(read, 'stream not started')  instance.pipe(sink(function (chunk, enc, cb) {    t.equal(chunk.toString(), 'hello world', 'chunk matches')    cb()  }))  cloned.pipe(sink(function (chunk, enc, cb) {    t.equal(chunk.toString(), 'hello world', 'chunk matches')    cb()  }))})test('clone async', function (t) {  t.plan(4)  var read = false  var source = from(function (size, next) {    if (read) {      this.push(null)    } else {      read = true      this.push('hello world')    }    next()  })  var instance = cloneable(source)  t.notOk(read, 'stream not started')  var cloned = instance.clone()  t.notOk(read, 'stream not started')  instance.pipe(sink(function (chunk, enc, cb) {    t.equal(chunk.toString(), 'hello world', 'chunk matches')    cb()  }))  setImmediate(function () {    cloned.pipe(sink(function (chunk, enc, cb) {      t.equal(chunk.toString(), 'hello world', 'chunk matches')      cb()    }))  })})test('basic passthrough in obj mode', function (t) {  t.plan(2)  var read = false  var source = from.obj(function (size, next) {    if (read) {      return this.push(null)    } else {      read = true      this.push({ hello: 'world' })    }    next()  })  var instance = cloneable(source)  t.notOk(read, 'stream not started')  instance.pipe(sink.obj(function (chunk, enc, cb) {    t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')    cb()  }))})test('multiple clone in object mode', function (t) {  t.plan(4)  var read = false  var source = from.obj(function (size, next) {    if (read) {      return this.push(null)    } else {      read = true      this.push({ hello: 'world' })    }    next()  })  var instance = cloneable(source)  t.notOk(read, 'stream not started')  var cloned = instance.clone()  t.notOk(read, 'stream not started')  instance.pipe(sink.obj(function (chunk, enc, cb) {    t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')    cb()  }))  setImmediate(function () {    cloned.pipe(sink.obj(function (chunk, enc, cb) {      t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')      cb()    }))  })})test('basic passthrough with data event', function (t) {  t.plan(2)  var read = false  var source = from(function (size, next) {    if (read) {      this.push(null)    } else {      read = true      this.push('hello world')    }    next()  })  var instance = cloneable(source)  t.notOk(read, 'stream not started')  var data = ''  instance.on('data', function (chunk) {    data += chunk.toString()  })  instance.on('end', function () {    t.equal(data, 'hello world', 'chunk matches')  })})test('basic passthrough with data event on clone', function (t) {  t.plan(3)  var read = false  var source = from(function (size, next) {    if (read) {      this.push(null)    } else {      read = true      this.push('hello world')    }    next()  })  var instance = cloneable(source)  var cloned = instance.clone()  t.notOk(read, 'stream not started')  var data = ''  cloned.on('data', function (chunk) {    data += chunk.toString()  })  cloned.on('end', function () {    t.equal(data, 'hello world', 'chunk matches in clone')  })  instance.pipe(sink(function (chunk, enc, cb) {    t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')    cb()  }))})test('errors if cloned after start', function (t) {  t.plan(2)  var source = from(function (size, next) {    this.push('hello world')    this.push(null)    next()  })  var instance = cloneable(source)  instance.pipe(sink(function (chunk, enc, cb) {    t.equal(chunk.toString(), 'hello world', 'chunk matches')    t.throws(function () {      instance.clone()    }, 'throws if cloned after start')    cb()  }))})test('basic passthrough with readable event', function (t) {  t.plan(2)  var read = false  var source = from(function (size, next) {    if (read) {      this.push(null)    } else {      read = true      this.push('hello world')    }    next()  })  var instance = cloneable(source)  t.notOk(read, 'stream not started')  var data = ''  instance.on('readable', function () {    var chunk    while ((chunk = this.read()) !== null) {      data += chunk.toString()    }  })  instance.on('end', function () {    t.equal(data, 'hello world', 'chunk matches')  })})test('basic passthrough with readable event on clone', function (t) {  t.plan(3)  var read = false  var source = from(function (size, next) {    if (read) {      this.push(null)    } else {      read = true      this.push('hello world')    }    next()  })  var instance = cloneable(source)  var cloned = instance.clone()  t.notOk(read, 'stream not started')  var data = ''  cloned.on('readable', function () {    var chunk    while ((chunk = this.read()) !== null) {      data += chunk.toString()    }  })  cloned.on('end', function () {    t.equal(data, 'hello world', 'chunk matches in clone')  })  instance.pipe(sink(function (chunk, enc, cb) {    t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')    cb()  }))})test('source error destroys all', function (t) {  t.plan(3)  var source = from()  var instance = cloneable(source)  var clone = instance.clone()  source.on('error', function (err) {    t.ok(err, 'source errors')    instance.on('error', function (err2) {      t.ok(err === err2, 'instance receives same error')    })    clone.on('error', function (err3) {      t.ok(err === err3, 'clone receives same error')    })  })  source.emit('error', new Error())})test('source destroy destroys all', function (t) {  t.plan(2)  var source = from()  var instance = cloneable(source)  var clone = instance.clone()  instance.on('end', function () {    t.pass('instance has ended')  })  clone.on('end', function () {    t.pass('clone has ended')  })  clone.resume()  instance.resume()  source.destroy()})test('instance error destroys all but the source', function (t) {  t.plan(2)  var source = from()  var instance = cloneable(source)  var clone = instance.clone()  source.on('close', function () {    t.fail('source should not be closed')  })  instance.on('error', function (err) {    t.is(err.message, 'beep', 'instance errors')  })  instance.on('close', function () {    t.fail('close should not be emitted')  })  clone.on('error', function (err) {    t.is(err.message, 'beep', 'instance errors')  })  clone.on('close', function () {    t.fail('close should not be emitted')  })  instance.destroy(new Error('beep'))})test('instance destroy destroys all but the source', function (t) {  t.plan(2)  var source = from()  var instance = cloneable(source)  var clone = instance.clone()  source.on('close', function () {    t.fail('source should not be closed')  })  instance.on('end', function () {    t.pass('instance has ended')  })  clone.on('end', function () {    t.pass('clone has ended')  })  instance.resume()  clone.resume()  instance.destroy()})test('clone destroy does not affect other clones, cloneable or source', function (t) {  t.plan(1)  var source = from()  var instance = cloneable(source)  var clone = instance.clone()  var other = instance.clone()  source.on('close', function () {    t.fail('source should not be closed')  })  instance.on('close', function () {    t.fail('instance should not be closed')  })  other.on('close', function () {    t.fail('other clone should not be closed')  })  clone.on('close', function () {    t.pass('clone is closed')  })  clone.destroy()})test('clone remains readable if other is destroyed', function (t) {  t.plan(3)  var read = false  var source = from(function (size, next) {    if (read) {      this.push(null)    } else {      read = true      this.push('hello')    }    next()  })  var instance = cloneable(source)  var clone = instance.clone()  var other = instance.clone()  instance.pipe(sink.obj(function (chunk, enc, cb) {    t.deepEqual(chunk.toString(), 'hello', 'instance chunk matches')    cb()  }))  clone.pipe(sink.obj(function (chunk, enc, cb) {    t.deepEqual(chunk.toString(), 'hello', 'clone chunk matches')    cb()  }))  clone.on('close', function () {    t.fail('clone should not be closed')  })  instance.on('close', function () {    t.fail('instance should not be closed')  })  other.on('close', function () {    t.pass('other is closed')  })  other.destroy()})test('clone of clone', function (t) {  t.plan(6)  var read = false  var source = from(function (size, next) {    if (read) {      this.push(null)    } else {      read = true      this.push('hello world')    }    next()  })  var instance = cloneable(source)  t.notOk(read, 'stream not started')  var cloned = instance.clone()  t.notOk(read, 'stream not started')  var replica = cloned.clone()  t.notOk(read, 'stream not started')  instance.pipe(sink(function (chunk, enc, cb) {    t.equal(chunk.toString(), 'hello world', 'chunk matches')    cb()  }))  cloned.pipe(sink(function (chunk, enc, cb) {    t.equal(chunk.toString(), 'hello world', 'chunk matches')    cb()  }))  replica.pipe(sink(function (chunk, enc, cb) {    t.equal(chunk.toString(), 'hello world', 'chunk matches')    cb()  }))})test('from vinyl', function (t) {  t.plan(3)  var source = from(['wa', 'dup'])  var instance = cloneable(source)  var clone = instance.clone()  var data = ''  var data2 = ''  var ends = 2  function latch () {    if (--ends === 0) {      t.equal(data, data2)    }  }  instance.on('data', function (chunk) {    data += chunk.toString()  })  process.nextTick(function () {    t.equal('', data, 'nothing was written yet')    t.equal('', data2, 'nothing was written yet')    clone.on('data', function (chunk) {      data2 += chunk.toString()    })  })  instance.on('end', latch)  clone.on('end', latch)})test('waits till all are flowing', function (t) {  t.plan(1)  var source = from(['wa', 'dup'])  var instance = cloneable(source)  // we create a clone  instance.clone()  instance.on('data', function (chunk) {    t.fail('this should never happen')  })  process.nextTick(function () {    t.pass('wait till nextTick')  })})test('isCloneable', function (t) {  t.plan(4)  var source = from(['hello', ' ', 'world'])  t.notOk(cloneable.isCloneable(source), 'a generic readable is not cloneable')  var instance = cloneable(source)  t.ok(cloneable.isCloneable(instance), 'a cloneable is cloneable')  var clone = instance.clone()  t.ok(cloneable.isCloneable(clone), 'a clone is cloneable')  var cloneClone = clone.clone()  t.ok(cloneable.isCloneable(cloneClone), 'a clone of a clone is cloneable')})test('emits finish', function (t) {  var chunks = ['a', 'b', 'c', 'd', null]  var e1 = ['a', 'b', 'c', 'd']  var e2 = ['a', 'b', 'c', 'd']  t.plan(2 + e1.length + e2.length)  var source = from(function (size, next) {    setImmediate(next, null, chunks.shift())  })  var instance = cloneable(source)  var clone = instance.clone()  clone.on('finish', t.pass.bind(null, 'clone emits finish'))  instance.on('finish', t.pass.bind(null, 'main emits finish'))  instance.pipe(sink(function (chunk, enc, cb) {    t.equal(chunk.toString(), e1.shift(), 'chunk matches')    cb()  }))  clone.on('data', function (chunk) {    t.equal(chunk.toString(), e2.shift(), 'chunk matches')  })})test('clone async w resume', function (t) {  t.plan(4)  var read = false  var source = from(function (size, next) {    if (read) {      this.push(null)    } else {      read = true      this.push('hello world')    }    next()  })  var instance = cloneable(source)  t.notOk(read, 'stream not started')  var cloned = instance.clone()  t.notOk(read, 'stream not started')  instance.on('end', t.pass.bind(null, 'end emitted'))  instance.resume()  setImmediate(function () {    cloned.on('end', t.pass.bind(null, 'end emitted'))    cloned.resume()  })})test('big file', function (t) {  t.plan(13)  var stream = cloneable(fs.createReadStream(path.join(__dirname, 'big')))  var hash = crypto.createHash('sha1')  hash.setEncoding('hex')  var toCheck  fs.createReadStream(path.join(__dirname, 'big'))    .pipe(hash)    .once('readable', function () {      toCheck = hash.read()      t.ok(toCheck)    })  function pipe (s, num) {    s.on('end', function () {      t.pass('end for ' + num)    })    var dest = path.join(__dirname, 'out')    s.pipe(fs.createWriteStream(dest))      .on('finish', function () {        t.pass('finish for ' + num)        var destHash = crypto.createHash('sha1')        destHash.setEncoding('hex')        fs.createReadStream(dest)          .pipe(destHash)          .once('readable', function () {            var hash = destHash.read()            t.ok(hash)            t.equal(hash, toCheck)          })      })  }  // Pipe in another event loop tick <-- this one finished only, it's the original cloneable.  setImmediate(pipe.bind(null, stream, 1))  // Pipe in the same event loop tick  pipe(stream.clone(), 0)  // Pipe a long time after  setTimeout(pipe.bind(null, stream.clone(), 2), 1000)})test('pump error', function (t) {  t.plan(1)  var err = new Error('kaboom')  pump([    cloneable(from(function () {      this.destroy(err)    })),    sink(function (chunk, enc, cb) {      t.fail('this should not be called')    })  ], function (_err) {    t.equal(_err, err)  })})
 |