// throttle.js
  1. var inherits = require('util').inherits;
  2. var Transform = require('stream').Transform;
  3. var TokenBucket = require('limiter').TokenBucket;
  /**
   * Throttle is a throttled stream implementing the stream.Transform interface.
   *
   * Options:
   *   rate: the throttling rate in bytes per second. Required when no group
   *     is supplied, because the options are then used to build a private
   *     ThrottleGroup, which validates them.
   *   chunksize (optional): the maximum chunk size into which larger writes
   *     are decomposed.
   * Any other options are passed to stream.Transform.
   *
   * @param {Object} [opts] - Options as described above.
   * @param {ThrottleGroup} [group] - Group whose token bucket and chunk size
   *   this stream shares. When omitted (or null) a new group is created from
   *   `opts`.
   */
  function Throttle(opts, group) {
    // Allow construction without `new`, consistent with ThrottleGroup.
    // Without this guard the assignments below would land on the wrong
    // `this` (or throw in strict mode).
    if (!(this instanceof Throttle))
      return new Throttle(opts, group);

    // Treat null like "not supplied" instead of failing on `group.bucket`.
    if (group === undefined || group === null)
      group = new ThrottleGroup(opts);

    this.bucket = group.bucket;
    this.chunksize = group.chunksize;
    Transform.call(this, opts);
  }
  inherits(Throttle, Transform);
  /*
   * stream.Transform hook: hands the incoming chunk to process(), starting at
   * offset 0. `done` is passed through so that process() can signal completion
   * or an error. The `encoding` argument is not used.
   */
  Throttle.prototype._transform = function(chunk, encoding, done) {
    var stream = this;
    var startOffset = 0;
    process(stream, chunk, startOffset, done);
  };
  /*
   * Pushes `chunk` downstream piece by piece, starting at offset `pos`.
   *
   * Each piece is at most `self.chunksize` bytes (rounded down to a whole
   * number, minimum one). Before a piece is pushed, as many tokens as the
   * piece is long are requested from `self.bucket`; the piece is pushed and
   * the next one processed only when the bucket's callback reports no error.
   *
   * self  - the stream; must provide `chunksize`, `bucket` and `push`.
   * chunk - the data being written (sliced with `chunk.slice`).
   * pos   - offset of the first byte not yet pushed.
   * done  - called with no arguments once the chunk is fully consumed, or
   *         with the error reported by the bucket.
   *
   * NOTE(review): this function shadows Node's global `process` inside this
   * module; the name is kept because _transform calls it.
   */
  function process(self, chunk, pos, done) {
    // Advance by a whole number of bytes and by at least one. A chunk size
    // below one byte (or NaN) would otherwise produce an empty first slice,
    // which is indistinguishable from "chunk fully consumed" and silently
    // drops the data.
    var step = Math.floor(self.chunksize);
    if (!(step >= 1))
      step = 1;

    var slice = chunk.slice(pos, pos + step);
    if (!slice.length) {
      // chunk fully consumed
      done();
      return;
    }

    self.bucket.removeTokens(slice.length, function(err) {
      if (err) {
        done(err);
        return;
      }
      self.push(slice);
      process(self, chunk, pos + step, done);
    });
  }
  /**
   * ThrottleGroup throttles an aggregate of streams: every stream created
   * from the group shares the group's token bucket and chunk size.
   * May be called with or without `new`.
   *
   * Options:
   *   rate (mandatory): the throttling rate in bytes per second; must be a
   *     positive number.
   *   chunksize (optional): the maximum chunk size into which larger writes
   *     are decomposed; must be a positive number. Defaults to a tenth of the
   *     rate, rounded down to a whole number of bytes and never less than one.
   *
   * @param {Object} opts - Options as described above.
   * @throws {Error} If `rate` is missing, or if `rate` or `chunksize` is not
   *   a positive number.
   */
  function ThrottleGroup(opts) {
    if (!(this instanceof ThrottleGroup))
      return new ThrottleGroup(opts);

    opts = opts || {};
    if (opts.rate === undefined)
      throw new Error('throttle rate is a required argument');
    // `!(x > 0)` rejects NaN as well, which a plain `x <= 0` test lets through.
    if (typeof opts.rate !== 'number' || !(opts.rate > 0))
      throw new Error('throttle rate must be a positive number');
    if (opts.chunksize !== undefined && (typeof opts.chunksize !== 'number' || !(opts.chunksize > 0))) {
      throw new Error('throttle chunk size must be a positive number');
    }

    this.rate = opts.rate;
    // A default of rate/10 can be fractional or below one byte; streams slice
    // data by this size, so keep the default a whole number of at least one.
    this.chunksize = opts.chunksize || Math.max(1, Math.floor(this.rate / 10));
    this.bucket = new TokenBucket(this.rate, this.rate, 'second', null);
  }
  /*
   * Creates a new stream that belongs to this throttled group and returns it.
   * Any supplied options are passed to the Throttle constructor, together
   * with this group.
   */
  ThrottleGroup.prototype.throttle = function(opts) {
    var group = this;
    var stream = new Throttle(opts, group);
    return stream;
  };
  // Public API of this module: the two constructors.
  var api = {};
  api.Throttle = Throttle;
  api.ThrottleGroup = ThrottleGroup;
  module.exports = api;