index.js 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. 'use strict';
  2. var Writable = require('flush-write-stream');
  3. function listenerCount(stream, evt) {
  4. return stream.listeners(evt).length;
  5. }
  6. function hasListeners(stream) {
  7. return !!(listenerCount(stream, 'readable') || listenerCount(stream, 'data'));
  8. }
  9. function sinker(file, enc, callback) {
  10. callback();
  11. }
  12. function sink(stream) {
  13. var sinkAdded = false;
  14. var sinkOptions = {
  15. objectMode: stream._readableState.objectMode,
  16. };
  17. var sinkStream = new Writable(sinkOptions, sinker);
  18. function addSink() {
  19. if (sinkAdded) {
  20. return;
  21. }
  22. if (hasListeners(stream)) {
  23. return;
  24. }
  25. sinkAdded = true;
  26. stream.pipe(sinkStream);
  27. }
  28. function removeSink(evt) {
  29. if (evt !== 'readable' && evt !== 'data') {
  30. return;
  31. }
  32. if (hasListeners(stream)) {
  33. sinkAdded = false;
  34. stream.unpipe(sinkStream);
  35. }
  36. }
  37. stream.on('newListener', removeSink);
  38. stream.on('removeListener', removeSink);
  39. stream.on('removeListener', addSink);
  40. // Sink the stream to start flowing
  41. // Do this on nextTick, it will flow at slowest speed of piped streams
  42. process.nextTick(addSink);
  43. return stream;
  44. }
  45. module.exports = sink;