// rx.backpressure.js
  1. // Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
  2. ;(function (factory) {
  // `typeof` results that may denote a usable host object.
  var objectTypes = { 'function': true, 'object': true };

  /**
   * Returns `value` when it looks like a global object, i.e. its `Object`
   * property is this realm's `Object`; otherwise returns `null`.
   */
  function checkGlobal(value) {
    if (value && value.Object === Object) { return value; }
    return null;
  }

  // CommonJS `exports` / `module`, rejecting objects that carry a `nodeType`.
  var freeExports = null;
  if (objectTypes[typeof exports] && exports && !exports.nodeType) { freeExports = exports; }
  var freeModule = null;
  if (objectTypes[typeof module] && module && !module.nodeType) { freeModule = module; }

  // Candidate global objects, each validated through checkGlobal.
  var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
  var freeSelf = checkGlobal(objectTypes[typeof self] && self);
  var freeWindow = checkGlobal(objectTypes[typeof window] && window);
  var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
  var thisGlobal = checkGlobal(objectTypes[typeof this] && this);

  // Pick the root object: `global`, then a `window` that differs from
  // `this.window`, then `self`, then `this`, and finally whatever a sloppy-mode
  // function sees as `this`.
  var root = freeGlobal;
  if (!root && freeWindow !== (thisGlobal && thisGlobal.window)) { root = freeWindow; }
  if (!root) { root = freeSelf || thisGlobal || Function('return this')(); }
  18. // Because of build optimizers
  19. if (typeof define === 'function' && define.amd) {
  20. define(['./rx'], function (Rx, exports) {
  21. return factory(root, exports, Rx);
  22. });
  23. } else if (typeof module === 'object' && module && module.exports === freeExports) {
  24. module.exports = factory(root, module.exports, require('./rx'));
  25. } else {
  26. root.Rx = factory(root, {}, root.Rx);
  27. }
  28. }.call(this, function (root, exp, Rx, undefined) {
  29. // References
  30. var Observable = Rx.Observable,
  31. observableProto = Observable.prototype,
  32. AnonymousObservable = Rx.AnonymousObservable,
  33. AbstractObserver = Rx.internals.AbstractObserver,
  34. CompositeDisposable = Rx.CompositeDisposable,
  35. BinaryDisposable = Rx.BinaryDisposable,
  36. NAryDisposable = Rx.NAryDisposable,
  37. Notification = Rx.Notification,
  38. Subject = Rx.Subject,
  39. Observer = Rx.Observer,
  40. disposableEmpty = Rx.Disposable.empty,
  41. disposableCreate = Rx.Disposable.create,
  42. inherits = Rx.internals.inherits,
  43. addProperties = Rx.internals.addProperties,
  44. defaultScheduler = Rx.Scheduler['default'],
  45. currentThreadScheduler = Rx.Scheduler.currentThread,
  46. identity = Rx.helpers.identity,
  47. isScheduler = Rx.Scheduler.isScheduler,
  48. isFunction = Rx.helpers.isFunction,
  49. checkDisposed = Rx.Disposable.checkDisposed;
  // Shared sentinel handed back by wrapped functions when the target throws;
  // the caught value is stored on its `e` property.
  var errorObj = {e: {}};

  /**
   * Builds a wrapper around `tryCatchTarget` that never throws.
   * The wrapper forwards its own `this` and arguments to the target. If the
   * target throws, the exception is stored in `errorObj.e` and `errorObj`
   * itself is returned.
   * @param {Function} tryCatchTarget The function to guard.
   * @returns {Function} The guarded wrapper.
   */
  function tryCatcherGen(tryCatchTarget) {
    return function tryCatcher() {
      var outcome;
      try {
        outcome = tryCatchTarget.apply(this, arguments);
      } catch (caught) {
        errorObj.e = caught;
        outcome = errorObj;
      }
      return outcome;
    };
  }
  /**
   * Validates `fn` and returns a non-throwing wrapper built by tryCatcherGen.
   * Also published as `Rx.internals.tryCatch`.
   * @param {Function} fn The function to guard.
   * @returns {Function} Wrapper that returns `errorObj` when `fn` throws.
   * @throws {TypeError} When `fn` is not a function.
   */
  var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
    if (isFunction(fn)) { return tryCatcherGen(fn); }
    throw new TypeError('fn must be a function');
  };
  /**
   * Throws the value it is given, unchanged.
   * @param {*} e The value to throw.
   */
  function thrower(e) { throw e; }
  /**
   * A Subject used to pause and resume streams: it emits `false` to pause
   * and `true` to resume.
   */
  Rx.Pauser = (function (__super__) {
    inherits(Pauser, __super__);

    function Pauser() {
      __super__.call(this);
    }

    // Read the prototype only after `inherits` has run.
    var proto = Pauser.prototype;

    /**
     * Pauses the underlying sequence by emitting `false`.
     */
    proto.pause = function () {
      this.onNext(false);
    };

    /**
     * Resumes the underlying sequence by emitting `true`.
     */
    proto.resume = function () {
      this.onNext(true);
    };

    return Pauser;
  }(Subject));
  /**
   * Observable that publishes `source` and keeps the published connection
   * open only while resumed. Pause/resume signals come from an internal
   * controller Subject (driven by `pause()` / `resume()`), merged with the
   * optional external `pauser` sequence.
   */
  var PausableObservable = (function (__super__) {
    inherits(PausableObservable, __super__);

    /**
     * @param {Observable} source The sequence to gate.
     * @param {Observable} [pauser] Optional sequence whose truthy values
     *   connect the source and whose falsy values disconnect it.
     */
    function PausableObservable(source, pauser) {
      this.source = source;
      this.controller = new Subject();
      this.paused = true;

      if (pauser && pauser.subscribe) {
        this.pauser = this.controller.merge(pauser);
      } else {
        this.pauser = this.controller;
      }

      __super__.call(this);
    }

    PausableObservable.prototype._subscribe = function (o) {
      var conn = this.source.publish(),
        subscription = conn.subscribe(o),
        connection = disposableEmpty;

      // Start from the current paused state; react only to real toggles.
      var pausable = this.pauser.startWith(!this.paused).distinctUntilChanged().subscribe(function (b) {
        if (b) {
          connection = conn.connect();
        } else {
          connection.dispose();
          connection = disposableEmpty;
        }
      });

      // `connection` is reassigned every time the stream is resumed, so the
      // returned disposable must look it up lazily. Capturing its value here
      // would leave any connection made after this point undisposed.
      var liveConnection = disposableCreate(function () {
        connection.dispose();
        connection = disposableEmpty;
      });

      // Stop listening to the pauser before tearing the connection down so a
      // late toggle cannot reconnect.
      return new NAryDisposable([subscription, pausable, liveConnection]);
    };

    /**
     * Pauses the sequence: records the state and signals `false`.
     */
    PausableObservable.prototype.pause = function () {
      this.paused = true;
      this.controller.onNext(false);
    };

    /**
     * Resumes the sequence: records the state and signals `true`.
     */
    PausableObservable.prototype.resume = function () {
      this.paused = false;
      this.controller.onNext(true);
    };

    return PausableObservable;
  }(Observable));
  /**
   * Wraps this sequence in a PausableObservable driven by `pauser`, a
   * sequence yielding true (resume) / false (pause).
   * @example
   * var pauser = new Rx.Subject();
   * var source = Rx.Observable.interval(100).pausable(pauser);
   * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
   * @returns {Observable} The observable sequence which is paused based upon the pauser.
   */
  observableProto.pausable = function (pauser) {
    var source = this;
    return new PausableObservable(source, pauser);
  };
  /**
   * Combines the latest value of `source` (slot 0) with the latest value of
   * `subject` (slot 1) through `resultSelector`, once both have produced a value.
   *
   * - A `source` error is forwarded immediately when slot 1 is truthy; otherwise
   *   it is held and forwarded on the next combined emission.
   * - A `subject` error is always forwarded immediately.
   * - Completion is forwarded once a completion has been seen and slot 1 is truthy.
   *   Completion of `subject` also pushes `true` into slot 1.
   *
   * @param {Observable} source The data sequence.
   * @param {Observable} subject The gating sequence.
   * @param {Function} resultSelector Called with (sourceValue, subjectValue).
   * @returns {Observable} The combined sequence.
   */
  function combineLatestSource(source, subject, resultSelector) {
    return new AnonymousObservable(function (observer) {
      var seen = [false, false];
      var allSeen = false;
      var finished = false;
      var latest = new Array(2);
      var heldError;

      function push(value, slot) {
        latest[slot] = value;
        seen[slot] = true;

        if (!allSeen) { allSeen = seen.every(identity); }
        if (allSeen) {
          if (heldError) { return observer.onError(heldError); }
          var combined = tryCatch(resultSelector).apply(null, latest);
          if (combined === errorObj) { return observer.onError(combined.e); }
          observer.onNext(combined);
        }

        if (finished && latest[1]) { observer.onCompleted(); }
      }

      var sourceSubscription = source.subscribe(
        function (x) { push(x, 0); },
        function (e) {
          if (latest[1]) {
            observer.onError(e);
          } else {
            heldError = e;
          }
        },
        function () {
          finished = true;
          if (latest[1]) { observer.onCompleted(); }
        });

      var subjectSubscription = subject.subscribe(
        function (x) { push(x, 1); },
        function (e) { observer.onError(e); },
        function () {
          finished = true;
          push(true, 1);
        });

      return new BinaryDisposable(sourceSubscription, subjectSubscription);
    }, source);
  }
  /**
   * Observable that forwards `source` values while resumed and queues them
   * while paused, flushing the queue on resume. Pause/resume signals come
   * from an internal controller Subject merged with the optional `pauser`.
   */
  var PausableBufferedObservable = (function (__super__) {
    inherits(PausableBufferedObservable, __super__);

    /**
     * @param {Observable} source The sequence to gate and buffer.
     * @param {Observable} [pauser] Optional sequence of true/false toggles.
     */
    function PausableBufferedObservable(source, pauser) {
      this.source = source;
      this.controller = new Subject();
      this.paused = true;
      this.pauser = (pauser && pauser.subscribe) ? this.controller.merge(pauser) : this.controller;
      __super__.call(this);
    }

    PausableBufferedObservable.prototype._subscribe = function (o) {
      var buffered = [];
      var lastShouldFire;

      // Emits every queued value in arrival order.
      function flush() {
        while (buffered.length > 0) {
          o.onNext(buffered.shift());
        }
      }

      // Pairs each source value with the current gate state.
      function pair(data, shouldFire) {
        return { data: data, shouldFire: shouldFire };
      }

      function onPair(results) {
        var fire = results.shouldFire;
        var toggled = lastShouldFire !== undefined && fire !== lastShouldFire;
        lastShouldFire = fire;

        if (toggled) {
          // The gate changed: this pair repeats the latest data, so only
          // the queue is flushed when the gate opens.
          if (fire) { flush(); }
          return;
        }

        // New data: forward it when the gate is open, otherwise queue it.
        if (fire) {
          o.onNext(results.data);
        } else {
          buffered.push(results.data);
        }
      }

      function onFailure(err) {
        flush();
        o.onError(err);
      }

      function onDone() {
        flush();
        o.onCompleted();
      }

      var gate = this.pauser.startWith(!this.paused).distinctUntilChanged();
      return combineLatestSource(this.source, gate, pair).subscribe(onPair, onFailure, onDone);
    };

    /**
     * Pauses the sequence: records the state and signals `false`.
     */
    PausableBufferedObservable.prototype.pause = function () {
      this.paused = true;
      this.controller.onNext(false);
    };

    /**
     * Resumes the sequence: records the state and signals `true`.
     */
    PausableBufferedObservable.prototype.resume = function () {
      this.paused = false;
      this.controller.onNext(true);
    };

    return PausableBufferedObservable;
  }(Observable));
  /**
   * Wraps this sequence in a PausableBufferedObservable driven by `pauser`,
   * a sequence yielding true (resume) / false (pause). Values that arrive
   * while paused are buffered and emitted on resume.
   * @example
   * var pauser = new Rx.Subject();
   * var source = Rx.Observable.interval(100).pausableBuffered(pauser);
   * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
   * @returns {Observable} The observable sequence which is paused based upon the pauser.
   */
  observableProto.pausableBuffered = function (pauser) {
    var source = this;
    return new PausableBufferedObservable(source, pauser);
  };
  /**
   * Observable that multicasts `source` through a ControlledSubject with
   * ref-counting, and exposes `request` to ask that subject for items.
   */
  var ControlledObservable = (function (__super__) {
    inherits(ControlledObservable, __super__);

    /**
     * @param {Observable} source The sequence to control.
     * @param {boolean} enableQueue Passed through to the ControlledSubject.
     * @param {Scheduler} scheduler Passed through to the ControlledSubject.
     */
    function ControlledObservable (source, enableQueue, scheduler) {
      __super__.call(this);
      var gate = new ControlledSubject(enableQueue, scheduler);
      this.subject = gate;
      this.source = source.multicast(gate).refCount();
    }

    ControlledObservable.prototype._subscribe = function (o) {
      return this.source.subscribe(o);
    };

    /**
     * Forwards a request to the controlling subject.
     * @param {number} [numberOfItems] Item count; `-1` is sent when null or undefined.
     * @returns {*} Whatever the subject's `request` returns.
     */
    ControlledObservable.prototype.request = function (numberOfItems) {
      var count = numberOfItems;
      if (count == null) { count = -1; }
      return this.subject.request(count);
    };

    return ControlledObservable;
  }(Observable));
  /**
   * Subject-like gate that forwards values to its inner Subject only while
   * there is an outstanding request. With queueing enabled, values and
   * terminal notifications that arrive without demand are stored as
   * Notifications and replayed by later requests.
   */
  var ControlledSubject = (function (__super__) {
    inherits(ControlledSubject, __super__);

    /**
     * @param {boolean} [enableQueue=true] Whether to queue items that arrive without demand.
     * @param {Scheduler} [scheduler] Scheduler used to run requests; defaults to currentThreadScheduler.
     */
    function ControlledSubject(enableQueue, scheduler) {
      enableQueue == null && (enableQueue = true);

      __super__.call(this);
      this.subject = new Subject();
      this.enableQueue = enableQueue;
      this.queue = enableQueue ? [] : null;
      this.requestedCount = 0;
      this.requestedDisposable = null;
      this.error = null;
      this.hasFailed = false;
      this.hasCompleted = false;
      this.scheduler = scheduler || currentThreadScheduler;
    }

    addProperties(ControlledSubject.prototype, Observer, {
      _subscribe: function (o) {
        return this.subject.subscribe(o);
      },
      // Completes right away unless queued items still have to be delivered first.
      onCompleted: function () {
        this.hasCompleted = true;
        if (!this.enableQueue || this.queue.length === 0) {
          this.subject.onCompleted();
          this.disposeCurrentRequest();
        } else {
          this.queue.push(Notification.createOnCompleted());
        }
      },
      // Fails right away unless queued items still have to be delivered first.
      onError: function (error) {
        this.hasFailed = true;
        this.error = error;
        if (!this.enableQueue || this.queue.length === 0) {
          this.subject.onError(error);
          this.disposeCurrentRequest();
        } else {
          this.queue.push(Notification.createOnError(error));
        }
      },
      onNext: function (value) {
        if (this.requestedCount <= 0) {
          // No outstanding demand: queue the value, or drop it when queueing is off.
          this.enableQueue && this.queue.push(Notification.createOnNext(value));
        } else {
          // Pre-decrement so the request is released when its last item is
          // consumed. The previous post-decrement compared the old, positive
          // count to 0 and could never be true.
          (--this.requestedCount === 0) && this.disposeCurrentRequest();
          this.subject.onNext(value);
        }
      },
      /**
       * Replays queued notifications to the inner subject.
       * Delivers up to `numberOfItems` values; a terminal notification at the
       * head of the queue is delivered regardless of remaining demand, after
       * which the current request is disposed and the queue is cleared.
       * @param {number} numberOfItems Requested item count.
       * @returns {number} The part of the request that is still unfulfilled.
       */
      _processRequest: function (numberOfItems) {
        if (this.enableQueue) {
          while (this.queue.length > 0 && (numberOfItems > 0 || this.queue[0].kind !== 'N')) {
            var first = this.queue.shift();
            first.accept(this.subject);
            if (first.kind === 'N') {
              numberOfItems--;
            } else {
              this.disposeCurrentRequest();
              this.queue = [];
            }
          }
        }

        return numberOfItems;
      },
      /**
       * Replaces any outstanding request with a new one for `number` items,
       * run on the configured scheduler.
       * @param {number} number Number of items requested.
       * @returns {Disposable} Disposable returned by the scheduler for this request.
       */
      request: function (number) {
        this.disposeCurrentRequest();
        var self = this;

        this.requestedDisposable = this.scheduler.schedule(number,
        function(s, i) {
          var remaining = self._processRequest(i);
          var stopped = self.hasCompleted || self.hasFailed;
          if (!stopped && remaining > 0) {
            self.requestedCount = remaining;

            // The request is still outstanding: hand back a disposable so it
            // can be cancelled, which resets the outstanding demand to zero.
            return disposableCreate(function () {
              self.requestedCount = 0;
            });
          }
        });

        return this.requestedDisposable;
      },
      // Disposes and forgets the outstanding request, if there is one.
      disposeCurrentRequest: function () {
        if (this.requestedDisposable) {
          this.requestedDisposable.dispose();
          this.requestedDisposable = null;
        }
      }
    });

    return ControlledSubject;
  }(Observable));
  /**
   * Attaches a controller to the observable sequence with the ability to queue.
   * The scheduler may be passed as the first argument, in which case queueing
   * is enabled.
   * @example
   * var source = Rx.Observable.interval(100).controlled();
   * source.request(3); // Reads 3 values
   * @param {bool} enableQueue truthy value to determine if values should be queued pending the next request
   * @param {Scheduler} scheduler determines how the requests will be scheduled
   * @returns {Observable} The observable sequence which only propagates values on request.
   */
  observableProto.controlled = function (enableQueue, scheduler) {
    var queueing = enableQueue;
    var requestScheduler = scheduler;

    if (queueing && isScheduler(queueing)) {
      // Called as controlled(scheduler).
      requestScheduler = queueing;
      queueing = true;
    } else if (queueing == null) {
      queueing = true;
    }

    return new ControlledObservable(this, queueing, requestScheduler);
  };
  /**
   * Observable over a controlled source that requests one item on
   * subscription and one more item each time a value has been delivered.
   */
  var StopAndWaitObservable = (function (__super__) {
    inherits(StopAndWaitObservable, __super__);

    /**
     * @param {Observable} source The controlled source; must expose `request`.
     */
    function StopAndWaitObservable (source) {
      __super__.call(this);
      this.source = source;
    }

    // Scheduled action: requests the first item from the controlled source.
    function scheduleMethod(s, self) {
      return self.source.request(1);
    }

    StopAndWaitObservable.prototype._subscribe = function (o) {
      // NOTE(review): `this.subscription` is read before it is reassigned, so
      // the observer's `cancel` is the previous subscription (undefined on the
      // first subscribe) - confirm this is intended.
      this.subscription = this.source.subscribe(new StopAndWaitObserver(o, this, this.subscription));
      return new BinaryDisposable(
        this.subscription,
        defaultScheduler.schedule(this, scheduleMethod)
      );
    };

    var StopAndWaitObserver = (function (__sub__) {
      inherits(StopAndWaitObserver, __sub__);

      /**
       * @param {Observer} observer Downstream observer.
       * @param {StopAndWaitObservable} observable Owner, used to reach the controlled source.
       * @param {Disposable} [cancel] Disposable released when this observer is disposed.
       */
      function StopAndWaitObserver (observer, observable, cancel) {
        __sub__.call(this);
        this.observer = observer;
        this.observable = observable;
        this.cancel = cancel;
        this.scheduleDisposable = null;
      }

      StopAndWaitObserver.prototype.completed = function () {
        this.observer.onCompleted();
        this.dispose();
      };

      StopAndWaitObserver.prototype.error = function (error) {
        this.observer.onError(error);
        this.dispose();
      };

      // Scheduled action: requests the next single item.
      function innerScheduleMethod(s, self) {
        return self.observable.source.request(1);
      }

      StopAndWaitObserver.prototype.next = function (value) {
        this.observer.onNext(value);
        this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod);
      };

      // Defined on the prototype, as WindowedObserver does. It used to be a
      // static property on the constructor, so `this.dispose()` resolved to the
      // inherited method and this cleanup never ran.
      StopAndWaitObserver.prototype.dispose = function () {
        this.observer = null;
        if (this.cancel) {
          this.cancel.dispose();
          this.cancel = null;
        }
        if (this.scheduleDisposable) {
          this.scheduleDisposable.dispose();
          this.scheduleDisposable = null;
        }
        __sub__.prototype.dispose.call(this);
      };

      return StopAndWaitObserver;
    }(AbstractObserver));

    return StopAndWaitObservable;
  }(Observable));
  /**
   * Wraps this controlled observable in a StopAndWaitObservable.
   * @returns {Observable} A stop and wait observable.
   */
  ControlledObservable.prototype.stopAndWait = function () {
    var controlled = this;
    return new StopAndWaitObservable(controlled);
  };
  /**
   * Observable over a controlled source that requests `windowSize` items on
   * subscription and another `windowSize` items each time that many values
   * have been delivered.
   */
  var WindowedObservable = (function (__super__) {
    inherits(WindowedObservable, __super__);

    /**
     * @param {Observable} source The controlled source; must expose `request`.
     * @param {number} windowSize Number of items asked for per request.
     */
    function WindowedObservable(source, windowSize) {
      __super__.call(this);
      this.source = source;
      this.windowSize = windowSize;
    }

    // Scheduled action: requests the first window of items.
    function scheduleMethod(s, self) {
      var size = self.windowSize;
      return self.source.request(size);
    }

    WindowedObservable.prototype._subscribe = function (o) {
      var inner = new WindowedObserver(o, this, this.subscription);
      this.subscription = this.source.subscribe(inner);
      var firstRequest = defaultScheduler.schedule(this, scheduleMethod);
      return new BinaryDisposable(this.subscription, firstRequest);
    };

    var WindowedObserver = (function (__sub__) {
      inherits(WindowedObserver, __sub__);

      /**
       * @param {Observer} observer Downstream observer.
       * @param {WindowedObservable} observable Owner, used to reach the source and window size.
       * @param {Disposable} [cancel] Disposable released when this observer is disposed.
       */
      function WindowedObserver(observer, observable, cancel) {
        this.observer = observer;
        this.observable = observable;
        this.cancel = cancel;
        this.received = 0;
        this.scheduleDisposable = null;
        __sub__.call(this);
      }

      var proto = WindowedObserver.prototype;

      proto.completed = function () {
        this.observer.onCompleted();
        this.dispose();
      };

      proto.error = function (error) {
        this.observer.onError(error);
        this.dispose();
      };

      // Scheduled action: requests the next window of items.
      function innerScheduleMethod(s, self) {
        var owner = self.observable;
        return owner.source.request(owner.windowSize);
      }

      proto.next = function (value) {
        this.observer.onNext(value);

        // Count items modulo the window size; a wrap to zero means the
        // current window is used up and the next one must be requested.
        var position = (this.received + 1) % this.observable.windowSize;
        this.received = position;
        if (position === 0) {
          this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod);
        }
      };

      proto.dispose = function () {
        this.observer = null;

        var cancel = this.cancel;
        if (cancel) {
          cancel.dispose();
          this.cancel = null;
        }

        var pending = this.scheduleDisposable;
        if (pending) {
          pending.dispose();
          this.scheduleDisposable = null;
        }

        __sub__.prototype.dispose.call(this);
      };

      return WindowedObserver;
    }(AbstractObserver));

    return WindowedObservable;
  }(Observable));
  /**
   * Wraps this controlled observable in a WindowedObservable that requests
   * items in batches of `windowSize`.
   * @param {Number} windowSize The number of items in the window
   * @returns {Observable} A windowed observable based upon the window size.
   */
  ControlledObservable.prototype.windowed = function (windowSize) {
    var controlled = this;
    return new WindowedObservable(controlled, windowSize);
  };
  /**
   * Pipes the existing Observable sequence into a Node.js Stream.
   * The sequence is buffered and paused whenever `dest.write` returns a falsy
   * value, and resumed on the stream's 'drain' event. Errors are re-emitted
   * on `dest` as 'error'; on completion `dest.end()` is called unless
   * `dest._isStdio` is set.
   * @param {Stream} dest The destination Node.js stream.
   * @returns {Stream} The destination stream.
   */
  observableProto.pipe = function (dest) {
    var source = this.pausableBuffered();

    function onDrain() {
      source.resume();
    }

    function removeDrainListener() {
      dest.removeListener('drain', onDrain);
    }

    dest.addListener('drain', onDrain);

    source.subscribe(
      function (x) {
        !dest.write(x) && source.pause();
      },
      function (err) {
        // Detach first: emitting 'error' may throw, and the listener must not
        // stay on the stream once the sequence has terminated.
        removeDrainListener();
        dest.emit('error', err);
      },
      function () {
        // Hack check because STDIO is not closable
        !dest._isStdio && dest.end();
        removeDrainListener();
      });

    source.resume();

    return dest;
  };
  529. return Rx;
  530. }));