index.js 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. var Stream = require('stream')
  // through
  //
  // A stream that, by default, does nothing but re-emit its input.
  // Useful for aggregating a series of changing (but not ending) streams into one stream.
  // Publish the factory as the module itself, rebind the local `exports`
  // alias to it, and also expose it as a property of itself so that both
  // `require(...)` and `require(...).through` resolve to the same function.
  module.exports = through
  exports = module.exports
  through.through = through
  8. //create a readable writable stream.
  /**
   * Create a stream that is both readable and writable, built on the base
   * `Stream` class. Data passed to `write()` is handed to the `write`
   * callback, which decides what (if anything) to emit by calling
   * `this.queue(data)`. Queued items are buffered while the stream is paused
   * and emitted as 'data' events otherwise; queueing `null` emits 'end'.
   *
   * @param {Function} [write] - invoked as `write.call(stream, data)` for each
   *   `stream.write(data)`. Defaults to re-queueing the data unchanged.
   * @param {Function} [end] - invoked as `end.call(stream)` the first time
   *   `stream.end()` is called. Defaults to queueing `null`.
   * @param {Object} [opts]
   * @param {boolean} [opts.autoDestroy] - unless exactly `false`, the stream
   *   destroys itself once it is neither readable nor writable.
   * @returns {Stream} the new stream; `write` returns a boolean, while
   *   `queue`/`push`, `end`, `destroy`, `pause` and `resume` all return the
   *   stream itself on every path.
   */
  function through (write, end, opts) {
    // `||` (not default parameters) so that any falsy value, e.g. null,
    // selects the default handler.
    write = write || function (data) { this.queue(data) }
    end = end || function () { this.queue(null) }

    // Set once end() has been called or the stream has been destroyed.
    var ended = false
    // Set once null has been queued; later queue() calls are ignored.
    var _ended = false
    // Set once destroy() has run; makes destroy() idempotent.
    var destroyed = false
    // Items queued but not yet emitted (null marks the end of the stream).
    var buffer = []

    var stream = new Stream()
    stream.readable = stream.writable = true
    stream.paused = false
    stream.autoDestroy = !(opts && opts.autoDestroy === false)

    // Returns false while paused so that upstream writers can back off.
    stream.write = function (data) {
      write.call(this, data)
      return !stream.paused
    }

    // Emit buffered items until the buffer is empty or a listener pauses us.
    function drain () {
      while (buffer.length && !stream.paused) {
        var data = buffer.shift()
        if (data === null) return stream.emit('end')
        stream.emit('data', data)
      }
    }

    stream.queue = stream.push = function (data) {
      if (_ended) return stream
      if (data === null) _ended = true
      buffer.push(data)
      drain()
      return stream
    }

    // This is registered as the first 'end' listener, so destroy() must be
    // deferred to the next tick to run after any stream piped from here has
    // handled 'end'. (Only matters when 'end' is not emitted synchronously.)
    stream.on('end', function () {
      stream.readable = false
      if (!stream.writable && stream.autoDestroy) {
        process.nextTick(function () {
          stream.destroy()
        })
      }
    })

    function _end () {
      stream.writable = false
      end.call(stream)
      // If 'end' was already emitted synchronously, tear down right away.
      if (!stream.readable && stream.autoDestroy) stream.destroy()
    }

    stream.end = function (data) {
      // Repeated calls are no-ops but still return the stream for chaining.
      if (ended) return stream
      ended = true
      if (arguments.length) stream.write(data)
      _end() // will emit or queue
      return stream
    }

    // NOTE: destroy() drops buffered items but does not set `_ended`, so
    // later queue() calls are still accepted.
    stream.destroy = function () {
      if (destroyed) return stream
      destroyed = true
      ended = true
      buffer.length = 0
      stream.writable = stream.readable = false
      stream.emit('close')
      return stream
    }

    stream.pause = function () {
      // Already paused: nothing to do, but keep the return value consistent.
      if (stream.paused) return stream
      stream.paused = true
      return stream
    }

    stream.resume = function () {
      if (stream.paused) {
        stream.paused = false
        stream.emit('resume')
      }
      drain()
      // May have become paused again, as drain emits 'data'.
      if (!stream.paused) stream.emit('drain')
      return stream
    }

    return stream
  }