// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.

/**
 * Backpressure operators for Rx: pausable, pausableBuffered, controlled
 * (with stopAndWait / windowed) and pipe.
 *
 * The module is wrapped in a UMD-style loader: it resolves the global object,
 * then hands `Rx` to the factory through AMD, CommonJS or the global `Rx`.
 */
;(function (factory) {
  var objectTypes = {
    'function': true,
    'object': true
  };

  /**
   * Returns `value` when it looks like a global object (its `Object` is the
   * same `Object` as this realm's), otherwise null.
   */
  function checkGlobal(value) {
    return (value && value.Object === Object) ? value : null;
  }

  var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
  var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
  var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
  var freeSelf = checkGlobal(objectTypes[typeof self] && self);
  var freeWindow = checkGlobal(objectTypes[typeof window] && window);
  var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
  var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
  var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();

  // Because of build optimizers
  if (typeof define === 'function' && define.amd) {
    define(['./rx'], function (Rx, exports) {
      return factory(root, exports, Rx);
    });
  } else if (typeof module === 'object' && module && module.exports === freeExports) {
    module.exports = factory(root, module.exports, require('./rx'));
  } else {
    root.Rx = factory(root, {}, root.Rx);
  }
}.call(this, function (root, exp, Rx, undefined) {

  // References
  var Observable = Rx.Observable,
    observableProto = Observable.prototype,
    AnonymousObservable = Rx.AnonymousObservable,
    AbstractObserver = Rx.internals.AbstractObserver,
    CompositeDisposable = Rx.CompositeDisposable,
    BinaryDisposable = Rx.BinaryDisposable,
    NAryDisposable = Rx.NAryDisposable,
    Notification = Rx.Notification,
    Subject = Rx.Subject,
    Observer = Rx.Observer,
    disposableEmpty = Rx.Disposable.empty,
    disposableCreate = Rx.Disposable.create,
    inherits = Rx.internals.inherits,
    addProperties = Rx.internals.addProperties,
    defaultScheduler = Rx.Scheduler['default'],
    currentThreadScheduler = Rx.Scheduler.currentThread,
    identity = Rx.helpers.identity,
    isScheduler = Rx.Scheduler.isScheduler,
    isFunction = Rx.helpers.isFunction,
    checkDisposed = Rx.Disposable.checkDisposed;

  // Shared sentinel returned by tryCatch-wrapped functions when the target throws;
  // the thrown value is stored on `e`.
  var errorObj = {e: {}};

  /**
   * Wraps `tryCatchTarget` so that a thrown exception is captured in
   * `errorObj.e` and `errorObj` is returned instead of propagating the throw.
   */
  function tryCatcherGen(tryCatchTarget) {
    return function tryCatcher() {
      try {
        return tryCatchTarget.apply(this, arguments);
      } catch (e) {
        errorObj.e = e;
        return errorObj;
      }
    };
  }

  /**
   * Returns a non-throwing wrapper around `fn` (see tryCatcherGen).
   * Also published as `Rx.internals.tryCatch`.
   * @param {Function} fn The function to wrap.
   * @throws {TypeError} When `fn` is not a function.
   */
  var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
    if (!isFunction(fn)) { throw new TypeError('fn must be a function'); }
    return tryCatcherGen(fn);
  };

  /** Rethrows `e`. */
  function thrower(e) {
    throw e;
  }

  /**
   * Used to pause and resume streams.
   * A Subject whose `pause` / `resume` push `false` / `true` respectively.
   */
  Rx.Pauser = (function (__super__) {
    inherits(Pauser, __super__);
    function Pauser() {
      __super__.call(this);
    }

    /**
     * Pauses the underlying sequence.
     */
    Pauser.prototype.pause = function () { this.onNext(false); };

    /**
     * Resumes the underlying sequence.
     */
    Pauser.prototype.resume = function () { this.onNext(true); };

    return Pauser;
  }(Subject));

  /**
   * Observable that forwards `source` only while its pauser signal is true.
   * Starts paused. The pauser signal is the internal `controller` subject,
   * merged with the external `pauser` when one with a `subscribe` method is given.
   */
  var PausableObservable = (function (__super__) {
    inherits(PausableObservable, __super__);
    function PausableObservable(source, pauser) {
      this.source = source;
      this.controller = new Subject();
      this.paused = true;

      if (pauser && pauser.subscribe) {
        this.pauser = this.controller.merge(pauser);
      } else {
        this.pauser = this.controller;
      }

      __super__.call(this);
    }

    /**
     * Subscribes `o` to a published view of the source and connects or
     * disconnects that view each time the pauser signal changes.
     * @param {Observer} o The observer to subscribe.
     * @returns {Disposable} Disposable that releases the observer subscription,
     *   the current connection and the pauser subscription.
     */
    PausableObservable.prototype._subscribe = function (o) {
      var conn = this.source.publish(),
        subscription = conn.subscribe(o),
        connection = disposableEmpty;

      var pausable = this.pauser.startWith(!this.paused).distinctUntilChanged().subscribe(function (b) {
        if (b) {
          connection = conn.connect();
        } else {
          connection.dispose();
          connection = disposableEmpty;
        }
      });

      // `connection` is reassigned on every resume, so it must be looked up at
      // dispose time. Capturing its value here would leave a connection that was
      // opened by a later resume undisposed after unsubscription.
      var currentConnection = disposableCreate(function () {
        connection.dispose();
        connection = disposableEmpty;
      });

      return new NAryDisposable([subscription, currentConnection, pausable]);
    };

    /** Marks the sequence paused and pushes `false` to the controller. */
    PausableObservable.prototype.pause = function () {
      this.paused = true;
      this.controller.onNext(false);
    };

    /** Marks the sequence resumed and pushes `true` to the controller. */
    PausableObservable.prototype.resume = function () {
      this.paused = false;
      this.controller.onNext(true);
    };

    return PausableObservable;

  }(Observable));

  /**
   * Pauses the underlying observable sequence based upon the observable sequence which yields true/false.
   * @example
   * var pauser = new Rx.Subject();
   * var source = Rx.Observable.interval(100).pausable(pauser);
   * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
   * @returns {Observable} The observable sequence which is paused based upon the pauser.
   */
  observableProto.pausable = function (pauser) {
    return new PausableObservable(this, pauser);
  };

  /**
   * Combines the latest value of `source` (slot 0) with the latest value of
   * `subject` (slot 1) through `resultSelector`, once both have produced a value.
   *
   * - A source error is forwarded immediately when slot 1 is truthy; otherwise it
   *   is stored and forwarded on the next combined emission.
   * - Completion is forwarded only while slot 1 is truthy.
   * - When `subject` completes, slot 1 is set to `true`.
   * - A throw from `resultSelector` is forwarded as an error.
   */
  function combineLatestSource(source, subject, resultSelector) {
    return new AnonymousObservable(function (o) {
      var hasValue = [false, false],
        hasValueAll = false,
        isDone = false,
        values = new Array(2),
        err;

      function next(x, i) {
        values[i] = x;
        hasValue[i] = true;
        if (hasValueAll || (hasValueAll = hasValue.every(identity))) {
          // Deliver a source error that was deferred while slot 1 was falsy.
          if (err) { return o.onError(err); }
          var res = tryCatch(resultSelector).apply(null, values);
          if (res === errorObj) { return o.onError(res.e); }
          o.onNext(res);
        }
        isDone && values[1] && o.onCompleted();
      }

      return new BinaryDisposable(
        source.subscribe(
          function (x) {
            next(x, 0);
          },
          function (e) {
            if (values[1]) {
              o.onError(e);
            } else {
              err = e;
            }
          },
          function () {
            isDone = true;
            values[1] && o.onCompleted();
          }),
        subject.subscribe(
          function (x) {
            next(x, 1);
          },
          function (e) { o.onError(e); },
          function () {
            isDone = true;
            next(true, 1);
          })
        );
    }, source);
  }

  /**
   * Observable that forwards `source` while its pauser signal is true and
   * queues source values while it is false, flushing the queue on resume.
   * Starts paused. The pauser signal is built the same way as in PausableObservable.
   */
  var PausableBufferedObservable = (function (__super__) {
    inherits(PausableBufferedObservable, __super__);
    function PausableBufferedObservable(source, pauser) {
      this.source = source;
      this.controller = new Subject();
      this.paused = true;

      if (pauser && pauser.subscribe) {
        this.pauser = this.controller.merge(pauser);
      } else {
        this.pauser = this.controller;
      }

      __super__.call(this);
    }

    /**
     * Subscribes `o` to the source combined with the pauser signal.
     * Queued values are flushed to `o` before an error or completion is forwarded.
     * @param {Observer} o The observer to subscribe.
     * @returns {Disposable} The subscription to the combined sequence.
     */
    PausableBufferedObservable.prototype._subscribe = function (o) {
      var q = [], previousShouldFire;

      function drainQueue() { while (q.length > 0) { o.onNext(q.shift()); } }

      var subscription =
        combineLatestSource(
          this.source,
          this.pauser.startWith(!this.paused).distinctUntilChanged(),
          function (data, shouldFire) {
            return { data: data, shouldFire: shouldFire };
          })
          .subscribe(
            function (results) {
              if (previousShouldFire !== undefined && results.shouldFire !== previousShouldFire) {
                previousShouldFire = results.shouldFire;
                // change in shouldFire
                if (results.shouldFire) { drainQueue(); }
              } else {
                previousShouldFire = results.shouldFire;
                // new data
                if (results.shouldFire) {
                  o.onNext(results.data);
                } else {
                  q.push(results.data);
                }
              }
            },
            function (err) {
              drainQueue();
              o.onError(err);
            },
            function () {
              drainQueue();
              o.onCompleted();
            }
          );
      return subscription;
    };

    /** Marks the sequence paused and pushes `false` to the controller. */
    PausableBufferedObservable.prototype.pause = function () {
      this.paused = true;
      this.controller.onNext(false);
    };

    /** Marks the sequence resumed and pushes `true` to the controller. */
    PausableBufferedObservable.prototype.resume = function () {
      this.paused = false;
      this.controller.onNext(true);
    };

    return PausableBufferedObservable;

  }(Observable));

  /**
   * Pauses the underlying observable sequence based upon the observable sequence which yields true/false,
   * and yields the values that were buffered while paused.
   * @example
   * var pauser = new Rx.Subject();
   * var source = Rx.Observable.interval(100).pausableBuffered(pauser);
   * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
   * @returns {Observable} The observable sequence which is paused based upon the pauser.
   */
  observableProto.pausableBuffered = function (pauser) {
    return new PausableBufferedObservable(this, pauser);
  };

  /**
   * Observable that routes `source` through a ControlledSubject (via
   * `multicast(...).refCount()`), so values are released on `request`.
   */
  var ControlledObservable = (function (__super__) {
    inherits(ControlledObservable, __super__);
    function ControlledObservable (source, enableQueue, scheduler) {
      __super__.call(this);
      this.subject = new ControlledSubject(enableQueue, scheduler);
      this.source = source.multicast(this.subject).refCount();
    }

    ControlledObservable.prototype._subscribe = function (o) {
      return this.source.subscribe(o);
    };

    /**
     * Requests items from the controlled subject.
     * @param {Number} [numberOfItems] Number of items to request; -1 is passed
     *   to the subject when omitted or null.
     * @returns {Disposable} The disposable returned by the subject's `request`.
     */
    ControlledObservable.prototype.request = function (numberOfItems) {
      return this.subject.request(numberOfItems == null ? -1 : numberOfItems);
    };

    return ControlledObservable;

  }(Observable));

  /**
   * Subject that only forwards values to its inner subject while there is an
   * outstanding requested count. When `enableQueue` is true (the default),
   * notifications that arrive without an outstanding request are queued;
   * otherwise un-requested values are dropped.
   */
  var ControlledSubject = (function (__super__) {
    inherits(ControlledSubject, __super__);
    function ControlledSubject(enableQueue, scheduler) {
      enableQueue == null && (enableQueue = true);

      __super__.call(this);
      this.subject = new Subject();
      this.enableQueue = enableQueue;
      this.queue = enableQueue ? [] : null;
      this.requestedCount = 0;
      this.requestedDisposable = null;
      this.error = null;
      this.hasFailed = false;
      this.hasCompleted = false;
      this.scheduler = scheduler || currentThreadScheduler;
    }

    addProperties(ControlledSubject.prototype, Observer, {
      _subscribe: function (o) {
        return this.subject.subscribe(o);
      },
      /**
       * Forwards completion immediately when nothing is queued (or queueing is
       * off); otherwise queues it behind the pending notifications.
       */
      onCompleted: function () {
        this.hasCompleted = true;
        if (!this.enableQueue || this.queue.length === 0) {
          this.subject.onCompleted();
          this.disposeCurrentRequest();
        } else {
          this.queue.push(Notification.createOnCompleted());
        }
      },
      /**
       * Forwards the error immediately when nothing is queued (or queueing is
       * off); otherwise queues it behind the pending notifications.
       */
      onError: function (error) {
        this.hasFailed = true;
        this.error = error;
        if (!this.enableQueue || this.queue.length === 0) {
          this.subject.onError(error);
          this.disposeCurrentRequest();
        } else {
          this.queue.push(Notification.createOnError(error));
        }
      },
      /**
       * Forwards `value` when there is an outstanding requested count,
       * otherwise queues it (only when queueing is enabled).
       */
      onNext: function (value) {
        if (this.requestedCount <= 0) {
          this.enableQueue && this.queue.push(Notification.createOnNext(value));
        } else {
          // Pre-decrement: this branch is only entered with requestedCount > 0, so
          // comparing the value before the decrement against 0 could never match.
          // The request is released once its last requested item arrives, before
          // the value is forwarded.
          (--this.requestedCount === 0) && this.disposeCurrentRequest();
          this.subject.onNext(value);
        }
      },
      /**
       * Replays queued notifications to the inner subject: up to `numberOfItems`
       * values, plus any terminal notification found at the head of the queue.
       * A terminal notification releases the current request and clears the queue.
       * @param {Number} numberOfItems Number of values requested.
       * @returns {Number} The part of the request not satisfied from the queue.
       */
      _processRequest: function (numberOfItems) {
        if (this.enableQueue) {
          while (this.queue.length > 0 && (numberOfItems > 0 || this.queue[0].kind !== 'N')) {
            var first = this.queue.shift();
            first.accept(this.subject);
            if (first.kind === 'N') {
              numberOfItems--;
            } else {
              this.disposeCurrentRequest();
              this.queue = [];
            }
          }
        }

        return numberOfItems;
      },
      /**
       * Replaces any current request with a new one for `number` items,
       * scheduled on this subject's scheduler.
       * @param {Number} number Number of items to request.
       * @returns {Disposable} The disposable returned by the scheduler for this request.
       */
      request: function (number) {
        this.disposeCurrentRequest();
        var self = this;

        this.requestedDisposable = this.scheduler.schedule(number,
        function(s, i) {
          var remaining = self._processRequest(i);
          var stopped = self.hasCompleted || self.hasFailed;
          if (!stopped && remaining > 0) {
            self.requestedCount = remaining;

            // Scheduled item is still in progress. Return a new
            // disposable to allow the request to be interrupted
            // via dispose.
            return disposableCreate(function () {
              self.requestedCount = 0;
            });
          }
        });

        return this.requestedDisposable;
      },
      /** Disposes and clears the disposable of the current request, if any. */
      disposeCurrentRequest: function () {
        if (this.requestedDisposable) {
          this.requestedDisposable.dispose();
          this.requestedDisposable = null;
        }
      }
    });

    return ControlledSubject;
  }(Observable));

  /**
   * Attaches a controller to the observable sequence with the ability to queue.
   * @example
   * var source = Rx.Observable.interval(100).controlled();
   * source.request(3); // Reads 3 values
   * @param {bool} enableQueue truthy value to determine if values should be queued pending the next request
   * @param {Scheduler} scheduler determines how the requests will be scheduled
   * @returns {Observable} The observable sequence which only propagates values on request.
   */
  observableProto.controlled = function (enableQueue, scheduler) {

    // Allow the scheduler to be passed as the only argument.
    if (enableQueue && isScheduler(enableQueue)) {
      scheduler = enableQueue;
      enableQueue = true;
    }

    if (enableQueue == null) {  enableQueue = true; }
    return new ControlledObservable(this, enableQueue, scheduler);
  };

  /**
   * Observable over a controlled source that requests one item at
   * subscription and one more after each value has been delivered.
   */
  var StopAndWaitObservable = (function (__super__) {
    inherits(StopAndWaitObservable, __super__);
    function StopAndWaitObservable (source) {
      __super__.call(this);
      this.source = source;
    }

    function scheduleMethod(s, self) {
      return self.source.request(1);
    }

    /**
     * Subscribes `o` through a StopAndWaitObserver and schedules the first request.
     * NOTE(review): the `this.subscription` passed as `cancel` is read before the
     * assignment on the same line takes effect, so it is the subscription of a
     * previous `_subscribe` call (undefined the first time) — confirm intent.
     */
    StopAndWaitObservable.prototype._subscribe = function (o) {
      this.subscription = this.source.subscribe(new StopAndWaitObserver(o, this, this.subscription));
      return new BinaryDisposable(
        this.subscription,
        defaultScheduler.schedule(this, scheduleMethod)
      );
    };

    var StopAndWaitObserver = (function (__sub__) {
      inherits(StopAndWaitObserver, __sub__);
      function StopAndWaitObserver (observer, observable, cancel) {
        __sub__.call(this);
        this.observer = observer;
        this.observable = observable;
        this.cancel = cancel;
        this.scheduleDisposable = null;
      }

      StopAndWaitObserver.prototype.completed = function () {
        this.observer.onCompleted();
        this.dispose();
      };

      StopAndWaitObserver.prototype.error = function (error) {
        this.observer.onError(error);
        this.dispose();
      };

      function innerScheduleMethod(s, self) {
        return self.observable.source.request(1);
      }

      /** Delivers `value`, then schedules a request for the next single item. */
      StopAndWaitObserver.prototype.next = function (value) {
        this.observer.onNext(value);
        this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod);
      };

      /**
       * Releases the wrapped observer, the `cancel` disposable and any pending
       * scheduled request, then runs the base dispose.
       * Defined on the prototype (as WindowedObserver does) so that instances
       * actually use it; as a constructor property it was never called by
       * `this.dispose()`.
       */
      StopAndWaitObserver.prototype.dispose = function () {
        this.observer = null;
        if (this.cancel) {
          this.cancel.dispose();
          this.cancel = null;
        }
        if (this.scheduleDisposable) {
          this.scheduleDisposable.dispose();
          this.scheduleDisposable = null;
        }
        __sub__.prototype.dispose.call(this);
      };

      return StopAndWaitObserver;
    }(AbstractObserver));

    return StopAndWaitObservable;
  }(Observable));


  /**
   * Attaches a stop and wait observable to the current observable.
   * @returns {Observable} A stop and wait observable.
   */
  ControlledObservable.prototype.stopAndWait = function () {
    return new StopAndWaitObservable(this);
  };

  /**
   * Observable over a controlled source that requests `windowSize` items at
   * subscription and again each time `windowSize` values have been delivered.
   */
  var WindowedObservable = (function (__super__) {
    inherits(WindowedObservable, __super__);
    function WindowedObservable(source, windowSize) {
      __super__.call(this);
      this.source = source;
      this.windowSize = windowSize;
    }

    function scheduleMethod(s, self) {
      return self.source.request(self.windowSize);
    }

    /**
     * Subscribes `o` through a WindowedObserver and schedules the first request.
     * NOTE(review): the `this.subscription` passed as `cancel` is read before the
     * assignment on the same line takes effect, so it is the subscription of a
     * previous `_subscribe` call (undefined the first time) — confirm intent.
     */
    WindowedObservable.prototype._subscribe = function (o) {
      this.subscription = this.source.subscribe(new WindowedObserver(o, this, this.subscription));
      return new BinaryDisposable(
        this.subscription,
        defaultScheduler.schedule(this, scheduleMethod)
      );
    };

    var WindowedObserver = (function (__sub__) {
      inherits(WindowedObserver, __sub__);
      function WindowedObserver(observer, observable, cancel) {
        this.observer = observer;
        this.observable = observable;
        this.cancel = cancel;
        this.received = 0;
        this.scheduleDisposable = null;
        __sub__.call(this);
      }

      WindowedObserver.prototype.completed = function () {
        this.observer.onCompleted();
        this.dispose();
      };

      WindowedObserver.prototype.error = function (error) {
        this.observer.onError(error);
        this.dispose();
      };

      function innerScheduleMethod(s, self) {
        return self.observable.source.request(self.observable.windowSize);
      }

      /**
       * Delivers `value` and counts it; when the count wraps to 0 (a full
       * window was received) schedules a request for the next window.
       */
      WindowedObserver.prototype.next = function (value) {
        this.observer.onNext(value);
        this.received = ++this.received % this.observable.windowSize;
        this.received === 0 && (this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod));
      };

      /**
       * Releases the wrapped observer, the `cancel` disposable and any pending
       * scheduled request, then runs the base dispose.
       */
      WindowedObserver.prototype.dispose = function () {
        this.observer = null;
        if (this.cancel) {
          this.cancel.dispose();
          this.cancel = null;
        }
        if (this.scheduleDisposable) {
          this.scheduleDisposable.dispose();
          this.scheduleDisposable = null;
        }
        __sub__.prototype.dispose.call(this);
      };

      return WindowedObserver;
    }(AbstractObserver));

    return WindowedObservable;
  }(Observable));

  /**
   * Creates a sliding windowed observable based upon the window size.
   * @param {Number} windowSize The number of items in the window
   * @returns {Observable} A windowed observable based upon the window size.
   */
  ControlledObservable.prototype.windowed = function (windowSize) {
    return new WindowedObservable(this, windowSize);
  };

  /**
   * Pipes the existing Observable sequence into a Node.js Stream.
   * The sequence is paused when `dest.write` returns a falsy value and
   * resumed on the destination's 'drain' event.
   * @param {Stream} dest The destination Node.js stream.
   * @returns {Stream} The destination stream.
   */
  observableProto.pipe = function (dest) {
    var source = this.pausableBuffered();

    function onDrain() {
      source.resume();
    }

    dest.addListener('drain', onDrain);

    source.subscribe(
      function (x) {
        !dest.write(x) && source.pause();
      },
      function (err) {
        dest.emit('error', err);
      },
      function () {
        // Hack check because STDIO is not closable
        !dest._isStdio && dest.end();
        dest.removeListener('drain', onDrain);
      });

    // pausableBuffered starts paused; start the flow.
    source.resume();

    return dest;
  };

  return Rx;
}));