test.js 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. var test = require('tape');
  2. var exhaust = require('./index.js');
  3. var Stream = require('stream');
  4. var Readable = Stream.Readable;
  5. var Writable = Stream.Writable;
  6. var Duplex = Stream.Duplex;
  7. var through = require('through2');
  8. var S2Readable = require('readable-stream').Readable;
  // A Readable with no pipe destination is handed to exhaust(); the test
  // passes once the stream emits 'end' after its generator has run dry.
  test('it should cause a Readable stream to complete if it\'s not piped anywhere', function(t) {
    var source = new Readable({highWaterMark: 2});
    var reads = 0;
    var sawEof = false;

    // Every call counts as one read. The first 100 calls push the updated
    // counter as a string; the following call pushes null (end-of-stream).
    source._read = function() {
      var exhausted = reads >= 100;
      reads += 1;
      if (exhausted) {
        sawEof = true;
        source.push(null);
        return;
      }
      source.push(String(reads));
    };

    function onEnd() {
      t.ok(reads > 99, 'a should be > 99');
      t.ok(sawEof, 'it should end');
      t.end();
    }

    // Listener is attached before exhaust() is invoked, as in the original.
    source.on('end', onEnd);
    exhaust(source);
  });
  // Same scenario as the byte-mode case, but the Readable is created with
  // objectMode: true and pushes plain numbers instead of strings.
  test('should work with Readable streams in objectMode', function(t) {
    var options = {highWaterMark: 2, objectMode: true};
    var source = new Readable(options);
    var count = 0;
    var finished = false;

    // Pushes 1..100 as numbers, then null to signal end-of-stream.
    source._read = function() {
      var previous = count;
      count = previous + 1;
      if (previous < 100) {
        source.push(count);
        return;
      }
      finished = true;
      source.push(null);
    };

    function onEnd() {
      t.ok(count > 99, 'a > 99');
      t.ok(finished, 'ended is true');
      t.end();
    }

    source.on('end', onEnd);
    exhaust(source);
  });
  // The Readable is piped into a Writable first and only then passed to
  // exhaust(); the test expects the Writable to still see all 100 bytes.
  test('should not interfere with a Readable stream that is piped somewhere', function(t) {
    var source = new Readable({highWaterMark: 2});
    var sink = new Writable({highWaterMark: 2});
    var produced = 0;
    var sawEof = false;
    var bytesSeen = 0;

    // Emits one "." per call for the first 100 calls, then end-of-stream.
    source._read = function() {
      var hasMore = produced < 100;
      produced += 1;
      if (hasMore) {
        source.push('.');
        return;
      }
      sawEof = true;
      source.push(null);
    };

    // Tally the length of every chunk that reaches the sink.
    sink._write = function(chunk, enc, done) {
      bytesSeen = bytesSeen + chunk.length;
      done();
    };

    function onFinish() {
      t.ok(produced > 99, 'a > 99');
      t.ok(sawEof, 'ended is true');
      t.equal(bytesSeen, 100, 'sizeRead === 100');
      t.end();
    }

    sink.on('finish', onFinish);
    source.pipe(sink);
    exhaust(source);
  });
  // Here exhaust() receives the Writable end of a pipe rather than the
  // Readable; the test expects the pipe to deliver all 100 bytes and finish.
  test('should not interfere with a Writable stream', function(t) {
    var producer = new Readable({highWaterMark: 2});
    var consumer = new Writable({highWaterMark: 2});
    var calls = 0;
    var reachedEnd = false;
    var total = 0;

    // One "." per call for the first 100 calls, then null for end-of-stream.
    producer._read = function() {
      var index = calls;
      calls = index + 1;
      if (index >= 100) {
        reachedEnd = true;
        producer.push(null);
        return;
      }
      producer.push('.');
    };

    // Accumulate the size of each chunk written to the consumer.
    consumer._write = function(chunk, enc, next) {
      total = total + chunk.length;
      next();
    };

    consumer.on('finish', function() {
      t.ok(calls > 99, 'a > 99');
      t.ok(reachedEnd, 'ended is true');
      t.equal(total, 100, 'sizeRead === 100');
      t.end();
    });

    producer.pipe(consumer);
    exhaust(consumer);
  });
  // A Readable is piped into a through2 transform whose output is not piped
  // anywhere; exhaust() is applied to the transform and the test waits for
  // the transform's 'end' event.
  test('should handle a Transform stream', function(t) {
    var source = new Readable({highWaterMark: 2});
    var emitted = 0;
    var sawEof = false;
    var bytesSeen = 0;
    var didFlush = false;

    // One "." per call for the first 100 calls, then end-of-stream.
    source._read = function() {
      var hasMore = emitted < 100;
      emitted += 1;
      if (!hasMore) {
        sawEof = true;
        source.push(null);
        return;
      }
      source.push('.');
    };

    // Transform step: count the chunk and forward it unchanged. This must be
    // an ordinary function (not an arrow) because it relies on `this.push`.
    function onChunk(chunk, enc, cb) {
      bytesSeen = bytesSeen + chunk.length;
      this.push(chunk);
      cb();
    }

    // Flush step: record that it ran.
    function onFlush(cb) {
      didFlush = true;
      cb();
    }

    var transform = through({highWaterMark: 2}, onChunk, onFlush);

    transform.on('end', function() {
      t.ok(emitted > 99, 'a > 99');
      t.ok(sawEof, 'ended is true');
      t.equal(bytesSeen, 100, 'sizeRead === 100');
      t.ok(didFlush, 'flushed is true');
      t.end();
    });

    source.pipe(transform);
    exhaust(transform);
  });
  // Uses a bare `Stream` instance and drives it by emitting 'data' and 'end'
  // manually after exhaust() has been called on it.
  test('should handle a classic stream', function(t) {
    var legacy = new Stream();
    var ended = false;
    var n = 0;

    function onEnd() {
      t.ok(ended, 'ended is true');
      t.end();
    }

    legacy.on('end', onEnd);
    exhaust(legacy);

    // Emit the integers 0..99 as data events.
    while (n < 100) {
      legacy.emit('data', n);
      n += 1;
    }

    // Flip the flag before emitting 'end' so the handler's assertion sees it.
    ended = true;
    legacy.emit('end');
  });
  // Captures the stream's `pipe` property before calling exhaust() and
  // asserts that the property still holds the same value afterwards.
  test('should not modify .pipe', function(t) {
    var stream = new S2Readable();
    var originalPipe = stream.pipe;

    // On read, push a single 'ending' chunk followed by end-of-stream.
    stream._read = function() {
      ['ending', null].forEach(function(item) {
        stream.push(item);
      });
    };

    exhaust(stream);

    t.equal(stream.pipe, originalPipe);
    t.end();
  });
  // A bare `Stream` instance is given `readable = true` by plain assignment
  // and nothing else; exhaust() must accept it, and the manually emitted
  // 'end' event must still reach the listener.
  test('does not error on no resume but readable set to true', function(t) {
    var legacy = new Stream();
    var ended = false;
    var n = 0;

    legacy.readable = true;

    function onEnd() {
      t.ok(ended, 'ended is true');
      t.end();
    }

    legacy.on('end', onEnd);
    exhaust(legacy);

    // Emit the integers 0..99 as data events.
    while (n < 100) {
      legacy.emit('data', n);
      n += 1;
    }

    // Flip the flag before emitting 'end' so the handler's assertion sees it.
    ended = true;
    legacy.emit('end');
  });