123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616 |
- // Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
- ;(function (factory) {
// `typeof` results that may denote a usable host object.
var objectTypes = {
  'function': true,
  'object': true
};

// Returns `value` when its `Object` property is this realm's `Object`
// (i.e. it looks like a global object); otherwise returns null.
function checkGlobal(value) {
  if (value && value.Object === Object) {
    return value;
  }
  return null;
}

// CommonJS `exports` / `module`, ignored when they carry a `nodeType`.
var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType)
  ? exports
  : null;
var freeModule = (objectTypes[typeof module] && module && !module.nodeType)
  ? module
  : null;

// Candidate global objects: `global`, `self`, `window` and `this`.
var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
var freeSelf = checkGlobal(objectTypes[typeof self] && self);
var freeWindow = checkGlobal(objectTypes[typeof window] && window);

// Computed but not read again within this loader.
var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;

var thisGlobal = checkGlobal(objectTypes[typeof this] && this);

// First usable candidate wins; `Function('return this')()` is the last resort.
var root = freeGlobal ||
  ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) ||
  freeSelf ||
  thisGlobal ||
  Function('return this')();

// Because of build optimizers
if (typeof define === 'function' && define.amd) {
  // AMD: depend on './rx' and hand it to the factory.
  define(['./rx'], function (Rx, exports) {
    return factory(root, exports, Rx);
  });
} else if (typeof module === 'object' && module && module.exports === freeExports) {
  // CommonJS: extend the required './rx' module and export the result.
  module.exports = factory(root, module.exports, require('./rx'));
} else {
  // Plain script: extend the `Rx` already present on the root object.
  root.Rx = factory(root, {}, root.Rx);
}
- }.call(this, function (root, exp, Rx, undefined) {
- // References
- var Observable = Rx.Observable,
- observableProto = Observable.prototype,
- AnonymousObservable = Rx.AnonymousObservable,
- AbstractObserver = Rx.internals.AbstractObserver,
- CompositeDisposable = Rx.CompositeDisposable,
- BinaryDisposable = Rx.BinaryDisposable,
- NAryDisposable = Rx.NAryDisposable,
- Notification = Rx.Notification,
- Subject = Rx.Subject,
- Observer = Rx.Observer,
- disposableEmpty = Rx.Disposable.empty,
- disposableCreate = Rx.Disposable.create,
- inherits = Rx.internals.inherits,
- addProperties = Rx.internals.addProperties,
- defaultScheduler = Rx.Scheduler['default'],
- currentThreadScheduler = Rx.Scheduler.currentThread,
- identity = Rx.helpers.identity,
- isScheduler = Rx.Scheduler.isScheduler,
- isFunction = Rx.helpers.isFunction,
- checkDisposed = Rx.Disposable.checkDisposed;
// Shared sentinel returned by wrapped functions when their target throws;
// the caught exception is stored on its `e` property.
var errorObj = { e: {} };

/**
 * Builds a wrapper that captures exceptions thrown by the target instead of propagating them.
 * @param {Function} tryCatchTarget The function to guard.
 * @returns {Function} A function forwarding `this` and its arguments to the target; it returns
 * the target's result, or the shared `errorObj` sentinel (with the exception in `e`) on a throw.
 */
function tryCatcherGen(tryCatchTarget) {
  return function tryCatcher() {
    var outcome;
    try {
      outcome = tryCatchTarget.apply(this, arguments);
    } catch (caught) {
      errorObj.e = caught;
      outcome = errorObj;
    }
    return outcome;
  };
}
/**
 * Wraps `fn` so that exceptions it throws are returned via the `errorObj` sentinel.
 * Also published as `Rx.internals.tryCatch`.
 * @param {Function} fn The function to guard.
 * @returns {Function} The guarded wrapper produced by `tryCatcherGen`.
 * @throws {TypeError} When `fn` is not a function.
 */
var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
  if (isFunction(fn)) {
    return tryCatcherGen(fn);
  }
  throw new TypeError('fn must be a function');
};
/**
 * Rethrows the given value.
 * @param {*} error The value to throw.
 * @throws {*} Always throws `error`.
 */
function thrower(error) {
  throw error;
}
/**
 * A Subject whose `pause` / `resume` methods push `false` / `true` to its observers,
 * for use as the controlling sequence of a pausable stream.
 */
Rx.Pauser = (function (_super) {
  inherits(Pauser, _super);

  function Pauser() {
    _super.call(this);
  }

  var proto = Pauser.prototype;

  /**
   * Signals a pause by emitting `false`.
   */
  proto.pause = function () {
    this.onNext(false);
  };

  /**
   * Signals a resume by emitting `true`.
   */
  proto.resume = function () {
    this.onNext(true);
  };

  return Pauser;
}(Subject));
/**
 * Observable that connects to / disconnects from a published source according to
 * boolean signals: `true` connects, `false` disposes the current connection.
 * Signals come from the internal `controller` subject (driven by `pause` / `resume`)
 * merged with the optional external `pauser` sequence.
 */
var PausableObservable = (function (__super__) {
  inherits(PausableObservable, __super__);

  /**
   * @param {Observable} source The sequence to gate.
   * @param {Observable} [pauser] Optional sequence of booleans; used only when it has a `subscribe` member.
   */
  function PausableObservable(source, pauser) {
    this.source = source;
    this.controller = new Subject();
    this.paused = true;
    if (pauser && pauser.subscribe) {
      this.pauser = this.controller.merge(pauser);
    } else {
      this.pauser = this.controller;
    }
    __super__.call(this);
  }

  PausableObservable.prototype._subscribe = function (o) {
    var conn = this.source.publish(),
      subscription = conn.subscribe(o),
      connection = disposableEmpty;

    // The signal stream starts with the current state and ignores repeated values.
    var pausable = this.pauser.startWith(!this.paused).distinctUntilChanged().subscribe(function (b) {
      if (b) {
        connection = conn.connect();
      } else {
        connection.dispose();
        connection = disposableEmpty;
      }
    });

    // `connection` is reassigned on every resume, so it must be read lazily at
    // dispose time. Capturing its value here would leave a connection opened by
    // a later resume undisposed after the subscriber unsubscribes.
    var liveConnection = disposableCreate(function () {
      connection.dispose();
      connection = disposableEmpty;
    });

    return new NAryDisposable([subscription, liveConnection, pausable]);
  };

  /**
   * Records the paused state and emits `false` on the controller.
   */
  PausableObservable.prototype.pause = function () {
    this.paused = true;
    this.controller.onNext(false);
  };

  /**
   * Records the resumed state and emits `true` on the controller.
   */
  PausableObservable.prototype.resume = function () {
    this.paused = false;
    this.controller.onNext(true);
  };

  return PausableObservable;
}(Observable));
/**
 * Wraps this sequence in a PausableObservable gated by a sequence of true/false values.
 * @example
 * var pauser = new Rx.Subject();
 * var source = Rx.Observable.interval(100).pausable(pauser);
 * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
 * @returns {Observable} The observable sequence which is paused based upon the pauser.
 */
observableProto.pausable = function (pauser) {
  var source = this;
  return new PausableObservable(source, pauser);
};
/**
 * Combines the latest value of `source` (slot 0) with the latest value of `subject` (slot 1).
 * Once both slots have a value, every new value on either side emits
 * `resultSelector(latestSource, latestSubject)`.
 * A source error is forwarded immediately when slot 1 is truthy, otherwise it is held and
 * raised on the next emission attempt. Completion is forwarded when slot 1 is truthy.
 * When `subject` completes, `true` is written to slot 1.
 * @param {Observable} source The data sequence.
 * @param {Observable} subject The controlling sequence.
 * @param {Function} resultSelector Called with the two latest values; a throw becomes onError.
 * @returns {Observable} The combined sequence.
 */
function combineLatestSource(source, subject, resultSelector) {
  return new AnonymousObservable(function (o) {
    var latest = new Array(2),
      seen = [false, false],
      allSeen = false,
      done = false,
      pendingError;

    function emit(value, index) {
      latest[index] = value;
      seen[index] = true;

      if (!allSeen) {
        allSeen = seen.every(identity);
      }

      if (allSeen) {
        if (pendingError) {
          return o.onError(pendingError);
        }
        var result = tryCatch(resultSelector).apply(null, latest);
        if (result === errorObj) {
          return o.onError(result.e);
        }
        o.onNext(result);
      }

      if (done && latest[1]) {
        o.onCompleted();
      }
    }

    var sourceSubscription = source.subscribe(
      function (x) {
        emit(x, 0);
      },
      function (e) {
        if (latest[1]) {
          o.onError(e);
        } else {
          // Hold the error until the next emission attempt.
          pendingError = e;
        }
      },
      function () {
        done = true;
        if (latest[1]) {
          o.onCompleted();
        }
      });

    var subjectSubscription = subject.subscribe(
      function (x) {
        emit(x, 1);
      },
      function (e) {
        o.onError(e);
      },
      function () {
        done = true;
        emit(true, 1);
      });

    return new BinaryDisposable(sourceSubscription, subjectSubscription);
  }, source);
}
/**
 * Observable that forwards source values while the latest boolean signal is `true`
 * and queues them while it is `false`; the queue is flushed when the signal flips
 * to `true`, and also before an error or completion is forwarded.
 * Signals come from the internal `controller` subject (driven by `pause` / `resume`)
 * merged with the optional external `pauser` sequence.
 */
var PausableBufferedObservable = (function (_super) {
  inherits(PausableBufferedObservable, _super);

  /**
   * @param {Observable} source The sequence to gate.
   * @param {Observable} [pauser] Optional sequence of booleans; used only when it has a `subscribe` member.
   */
  function PausableBufferedObservable(source, pauser) {
    this.source = source;
    this.controller = new Subject();
    this.paused = true;

    var hasExternalPauser = pauser && pauser.subscribe;
    this.pauser = hasExternalPauser ? this.controller.merge(pauser) : this.controller;

    _super.call(this);
  }

  PausableBufferedObservable.prototype._subscribe = function (o) {
    var buffered = [],
      lastShouldFire;

    // Emits every queued value in arrival order.
    function flush() {
      while (buffered.length > 0) {
        o.onNext(buffered.shift());
      }
    }

    function pair(data, shouldFire) {
      return { data: data, shouldFire: shouldFire };
    }

    function onPair(results) {
      var toggled = lastShouldFire !== undefined && results.shouldFire !== lastShouldFire;
      lastShouldFire = results.shouldFire;

      if (toggled) {
        // The signal changed: only the queue is affected, `results.data` is not emitted.
        if (results.shouldFire) {
          flush();
        }
        return;
      }

      // Otherwise treat the pair as new data.
      if (results.shouldFire) {
        o.onNext(results.data);
      } else {
        buffered.push(results.data);
      }
    }

    function onFailure(err) {
      flush();
      o.onError(err);
    }

    function onDone() {
      flush();
      o.onCompleted();
    }

    var signals = this.pauser.startWith(!this.paused).distinctUntilChanged();
    return combineLatestSource(this.source, signals, pair).subscribe(onPair, onFailure, onDone);
  };

  /**
   * Records the paused state and emits `false` on the controller.
   */
  PausableBufferedObservable.prototype.pause = function () {
    this.paused = true;
    this.controller.onNext(false);
  };

  /**
   * Records the resumed state and emits `true` on the controller.
   */
  PausableBufferedObservable.prototype.resume = function () {
    this.paused = false;
    this.controller.onNext(true);
  };

  return PausableBufferedObservable;
}(Observable));
/**
 * Wraps this sequence in a PausableBufferedObservable gated by a sequence of true/false values;
 * values received while paused are buffered and yielded later.
 * @example
 * var pauser = new Rx.Subject();
 * var source = Rx.Observable.interval(100).pausableBuffered(pauser);
 * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
 * @returns {Observable} The observable sequence which is paused based upon the pauser.
 */
observableProto.pausableBuffered = function (pauser) {
  var source = this;
  return new PausableBufferedObservable(source, pauser);
};
/**
 * Observable whose values are released through a ControlledSubject. The source is
 * multicast through that subject and reference-counted.
 */
var ControlledObservable = (function (_super) {
  inherits(ControlledObservable, _super);

  /**
   * @param {Observable} source The sequence to control.
   * @param {boolean} enableQueue Passed to the ControlledSubject.
   * @param {Scheduler} scheduler Passed to the ControlledSubject.
   */
  function ControlledObservable (source, enableQueue, scheduler) {
    _super.call(this);
    var gate = new ControlledSubject(enableQueue, scheduler);
    this.subject = gate;
    this.source = source.multicast(gate).refCount();
  }

  ControlledObservable.prototype._subscribe = function (o) {
    return this.source.subscribe(o);
  };

  /**
   * Forwards a request to the underlying subject.
   * @param {number} [numberOfItems] Number of items requested; -1 is sent when null or undefined.
   * @returns {Disposable} Whatever the subject's `request` returns.
   */
  ControlledObservable.prototype.request = function (numberOfItems) {
    var count = numberOfItems;
    if (count == null) {
      count = -1;
    }
    return this.subject.request(count);
  };

  return ControlledObservable;
}(Observable));
/**
 * Subject that only forwards values to its inner subject while there is an
 * outstanding request count. Without one, notifications are queued (when
 * `enableQueue` is truthy) or values are dropped.
 */
var ControlledSubject = (function (__super__) {
  inherits(ControlledSubject, __super__);

  /**
   * @param {boolean} [enableQueue=true] Whether notifications are queued while nothing is requested.
   * @param {Scheduler} [scheduler] Scheduler used to run requests; defaults to the current-thread scheduler.
   */
  function ControlledSubject(enableQueue, scheduler) {
    enableQueue == null && (enableQueue = true);
    __super__.call(this);
    this.subject = new Subject();
    this.enableQueue = enableQueue;
    this.queue = enableQueue ? [] : null;
    this.requestedCount = 0;
    this.requestedDisposable = null;
    this.error = null;
    this.hasFailed = false;
    this.hasCompleted = false;
    this.scheduler = scheduler || currentThreadScheduler;
  }

  addProperties(ControlledSubject.prototype, Observer, {
    _subscribe: function (o) {
      return this.subject.subscribe(o);
    },

    // Completion is forwarded right away unless queued notifications must be delivered first.
    onCompleted: function () {
      this.hasCompleted = true;
      if (!this.enableQueue || this.queue.length === 0) {
        this.subject.onCompleted();
        this.disposeCurrentRequest();
      } else {
        this.queue.push(Notification.createOnCompleted());
      }
    },

    // Errors follow the same rule as completion.
    onError: function (error) {
      this.hasFailed = true;
      this.error = error;
      if (!this.enableQueue || this.queue.length === 0) {
        this.subject.onError(error);
        this.disposeCurrentRequest();
      } else {
        this.queue.push(Notification.createOnError(error));
      }
    },

    onNext: function (value) {
      if (this.requestedCount <= 0) {
        this.enableQueue && this.queue.push(Notification.createOnNext(value));
      } else {
        // Pre-decrement: this branch only runs with a positive count, so a
        // post-decrement comparison against 0 could never be true and the
        // fulfilled request would never be released.
        (--this.requestedCount === 0) && this.disposeCurrentRequest();
        this.subject.onNext(value);
      }
    },

    /**
     * Delivers queued notifications to the inner subject: values while `numberOfItems`
     * is positive, and a terminal notification as soon as it reaches the queue head.
     * @param {number} numberOfItems The number of values requested.
     * @returns {number} The number of requested values still outstanding.
     */
    _processRequest: function (numberOfItems) {
      if (this.enableQueue) {
        while (this.queue.length > 0 && (numberOfItems > 0 || this.queue[0].kind !== 'N')) {
          var first = this.queue.shift();
          first.accept(this.subject);
          if (first.kind === 'N') {
            numberOfItems--;
          } else {
            // Terminal notification delivered: drop the request and anything left queued.
            this.disposeCurrentRequest();
            this.queue = [];
          }
        }
      }
      return numberOfItems;
    },

    /**
     * Replaces the current request with a new one scheduled on `this.scheduler`.
     * @param {number} number The number of values requested.
     * @returns {Disposable} The scheduled request.
     */
    request: function (number) {
      this.disposeCurrentRequest();
      var self = this;
      this.requestedDisposable = this.scheduler.schedule(number,
        function(s, i) {
          var remaining = self._processRequest(i);
          var stopped = self.hasCompleted || self.hasFailed;
          if (!stopped && remaining > 0) {
            self.requestedCount = remaining;
            // Scheduled item is still in progress. Return a new
            // disposable to allow the request to be interrupted
            // via dispose.
            return disposableCreate(function () {
              self.requestedCount = 0;
            });
          }
        });
      return this.requestedDisposable;
    },

    // Disposes and forgets the outstanding request, if any.
    disposeCurrentRequest: function () {
      if (this.requestedDisposable) {
        this.requestedDisposable.dispose();
        this.requestedDisposable = null;
      }
    }
  });

  return ControlledSubject;
}(Observable));
/**
 * Attaches a controller to the observable sequence with the ability to queue.
 * A scheduler may be passed as the first argument, in which case queueing is enabled.
 * @example
 * var source = Rx.Observable.interval(100).controlled();
 * source.request(3); // Reads 3 values
 * @param {bool} enableQueue truthy value to determine if values should be queued pending the next request
 * @param {Scheduler} scheduler determines how the requests will be scheduled
 * @returns {Observable} The observable sequence which only propagates values on request.
 */
observableProto.controlled = function (enableQueue, scheduler) {
  var queueing = enableQueue,
    requestScheduler = scheduler;

  if (queueing && isScheduler(queueing)) {
    // Called as controlled(scheduler).
    requestScheduler = queueing;
    queueing = true;
  } else if (queueing == null) {
    queueing = true;
  }

  return new ControlledObservable(this, queueing, requestScheduler);
};
/**
 * Observable over a controlled source that requests one item when subscribed
 * and one more item after each value has been delivered to the observer.
 */
var StopAndWaitObservable = (function (__super__) {
  inherits(StopAndWaitObservable, __super__);

  /**
   * @param {Observable} source The controlled source; its `request` method is called with 1.
   */
  function StopAndWaitObservable (source) {
    __super__.call(this);
    this.source = source;
  }

  // Scheduled action: request the first item.
  function scheduleMethod(s, self) {
    return self.source.request(1);
  }

  StopAndWaitObservable.prototype._subscribe = function (o) {
    // NOTE(review): `this.subscription` is read before it is assigned, so the
    // observer's `cancel` is the previous subscription (undefined the first time).
    this.subscription = this.source.subscribe(new StopAndWaitObserver(o, this, this.subscription));
    return new BinaryDisposable(
      this.subscription,
      defaultScheduler.schedule(this, scheduleMethod)
    );
  };

  var StopAndWaitObserver = (function (__sub__) {
    inherits(StopAndWaitObserver, __sub__);

    function StopAndWaitObserver (observer, observable, cancel) {
      __sub__.call(this);
      this.observer = observer;
      this.observable = observable;
      this.cancel = cancel;
      this.scheduleDisposable = null;
    }

    StopAndWaitObserver.prototype.completed = function () {
      this.observer.onCompleted();
      this.dispose();
    };

    StopAndWaitObserver.prototype.error = function (error) {
      this.observer.onError(error);
      this.dispose();
    };

    // Scheduled action: request the next item.
    function innerScheduleMethod(s, self) {
      return self.observable.source.request(1);
    }

    StopAndWaitObserver.prototype.next = function (value) {
      this.observer.onNext(value);
      this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod);
    };

    // Defined on the prototype (as in WindowedObserver) so that the
    // `this.dispose()` calls above run this cleanup; as a static property
    // it was never reached from instances.
    StopAndWaitObserver.prototype.dispose = function () {
      this.observer = null;
      if (this.cancel) {
        this.cancel.dispose();
        this.cancel = null;
      }
      if (this.scheduleDisposable) {
        this.scheduleDisposable.dispose();
        this.scheduleDisposable = null;
      }
      __sub__.prototype.dispose.call(this);
    };

    return StopAndWaitObserver;
  }(AbstractObserver));

  return StopAndWaitObservable;
}(Observable));
/**
 * Wraps this controlled observable in a StopAndWaitObservable.
 * @returns {Observable} A stop and wait observable.
 */
ControlledObservable.prototype.stopAndWait = function () {
  var controlled = this;
  return new StopAndWaitObservable(controlled);
};
/**
 * Observable over a controlled source that requests `windowSize` items when
 * subscribed and another `windowSize` items each time that many values have
 * been delivered to the observer.
 */
var WindowedObservable = (function (_super) {
  inherits(WindowedObservable, _super);

  /**
   * @param {Observable} source The controlled source; its `request` method is called with `windowSize`.
   * @param {number} windowSize The number of items per request.
   */
  function WindowedObservable(source, windowSize) {
    _super.call(this);
    this.source = source;
    this.windowSize = windowSize;
  }

  // Scheduled action: request the first window.
  function requestFirstWindow(s, self) {
    return self.source.request(self.windowSize);
  }

  WindowedObservable.prototype._subscribe = function (o) {
    var observer = new WindowedObserver(o, this, this.subscription);
    this.subscription = this.source.subscribe(observer);
    return new BinaryDisposable(
      this.subscription,
      defaultScheduler.schedule(this, requestFirstWindow)
    );
  };

  var WindowedObserver = (function (_base) {
    inherits(WindowedObserver, _base);

    function WindowedObserver(observer, observable, cancel) {
      this.observer = observer;
      this.observable = observable;
      this.cancel = cancel;
      this.received = 0;
      this.scheduleDisposable = null;
      _base.call(this);
    }

    var proto = WindowedObserver.prototype;

    proto.completed = function () {
      this.observer.onCompleted();
      this.dispose();
    };

    proto.error = function (error) {
      this.observer.onError(error);
      this.dispose();
    };

    // Scheduled action: request the next window.
    function requestNextWindow(s, self) {
      return self.observable.source.request(self.observable.windowSize);
    }

    proto.next = function (value) {
      this.observer.onNext(value);
      // Count deliveries modulo the window size; a wrap to 0 means the window is full.
      this.received = (this.received + 1) % this.observable.windowSize;
      if (this.received === 0) {
        this.scheduleDisposable = defaultScheduler.schedule(this, requestNextWindow);
      }
    };

    proto.dispose = function () {
      this.observer = null;
      if (this.cancel) {
        this.cancel.dispose();
        this.cancel = null;
      }
      if (this.scheduleDisposable) {
        this.scheduleDisposable.dispose();
        this.scheduleDisposable = null;
      }
      _base.prototype.dispose.call(this);
    };

    return WindowedObserver;
  }(AbstractObserver));

  return WindowedObservable;
}(Observable));
/**
 * Wraps this controlled observable in a WindowedObservable of the given window size.
 * @param {Number} windowSize The number of items in the window
 * @returns {Observable} A windowed observable based upon the window size.
 */
ControlledObservable.prototype.windowed = function (windowSize) {
  var controlled = this;
  return new WindowedObservable(controlled, windowSize);
};
/**
 * Pipes the existing Observable sequence into a Node.js Stream.
 * Values are written with `dest.write`; when it returns a falsy value the
 * sequence is paused (buffering values) until the stream emits 'drain'.
 * Errors are re-emitted on the stream as 'error'; completion ends the stream.
 * @param {Stream} dest The destination Node.js stream.
 * @returns {Stream} The destination stream.
 */
observableProto.pipe = function (dest) {
  var source = this.pausableBuffered();

  function onDrain() {
    source.resume();
  }

  dest.addListener('drain', onDrain);

  source.subscribe(
    function (x) {
      !dest.write(x) && source.pause();
    },
    function (err) {
      // Detach first: the sequence is finished, and emitting 'error' may
      // throw when the stream has no error listener.
      dest.removeListener('drain', onDrain);
      dest.emit('error', err);
    },
    function () {
      // Hack check because STDIO is not closable
      !dest._isStdio && dest.end();
      dest.removeListener('drain', onDrain);
    });

  source.resume();

  return dest;
};
- return Rx;
- }));
|