// index.js (1.3 KB)
  1. var Readable = require('readable-stream').Readable
  2. var shift = require('stream-shift')
  /**
   * Ensure `stream` exposes the streams2+ readable interface.
   *
   * A stream that already carries a `_readableState` is returned untouched.
   * Anything else is wrapped in a new object-mode `Readable` (highWaterMark 16)
   * via `Readable#wrap`.
   *
   * @param {Object} stream - readable stream, possibly old-style
   * @returns {Object} the original stream, or a `Readable` wrapping it
   */
  var stream2 = function (stream) {
    if (stream._readableState) return stream
    return new Readable({objectMode: true, highWaterMark: 16}).wrap(stream)
  }
  7. module.exports = function (stream) {
  8. stream = stream2(stream)
  9. var ended = false
  10. var data = null
  11. var err = null
  12. var destroyed = false
  13. var fn = null
  14. var consume = function (e) {
  15. if (e) {
  16. destroyed = true
  17. if (stream.destroy) stream.destroy(e)
  18. return
  19. }
  20. data = null
  21. err = null
  22. }
  23. var onresult = function () {
  24. if (!fn) return
  25. var tmp = fn
  26. fn = undefined
  27. tmp(err, data, consume)
  28. }
  29. var update = function () {
  30. if (!fn) return
  31. data = shift(stream)
  32. if (data === null && !ended) return
  33. onresult()
  34. }
  35. var onend = function () {
  36. ended = true
  37. onresult()
  38. }
  39. stream.on('readable', update)
  40. stream.on('error', function (e) {
  41. err = e
  42. onresult()
  43. })
  44. stream.on('close', function () {
  45. if (stream._readableState.ended) return
  46. onend()
  47. })
  48. stream.on('end', onend)
  49. return function (callback) {
  50. if (destroyed) return
  51. if (err) return callback(err, null, consume)
  52. if (data) return callback(null, data, consume)
  53. if (ended) return callback(null, null, consume)
  54. fn = callback
  55. update()
  56. }
  57. }