|
- // Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
- ;(function (factory) {
- var objectTypes = {
- 'function': true,
- 'object': true
- };
- function checkGlobal(value) {
- return (value && value.Object === Object) ? value : null;
- }
- var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
- var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
- var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
- var freeSelf = checkGlobal(objectTypes[typeof self] && self);
- var freeWindow = checkGlobal(objectTypes[typeof window] && window);
- var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
- var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
- var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();
- // Because of build optimizers
- if (typeof define === 'function' && define.amd) {
- define(['./rx'], function (Rx, exports) {
- return factory(root, exports, Rx);
- });
- } else if (typeof module === 'object' && module && module.exports === freeExports) {
- module.exports = factory(root, module.exports, require('./rx'));
- } else {
- root.Rx = factory(root, {}, root.Rx);
- }
- }.call(this, function (root, exp, Rx, undefined) {
- // Aliases
- var Observable = Rx.Observable,
- observableProto = Observable.prototype,
- ObservableBase = Rx.ObservableBase,
- AbstractObserver = Rx.internals.AbstractObserver,
- FlatMapObservable = Rx.FlatMapObservable,
- observableConcat = Observable.concat,
- observableDefer = Observable.defer,
- observableEmpty = Observable.empty,
- disposableEmpty = Rx.Disposable.empty,
- CompositeDisposable = Rx.CompositeDisposable,
- SerialDisposable = Rx.SerialDisposable,
- SingleAssignmentDisposable = Rx.SingleAssignmentDisposable,
- Enumerable = Rx.internals.Enumerable,
- enumerableOf = Enumerable.of,
- currentThreadScheduler = Rx.Scheduler.currentThread,
- AsyncSubject = Rx.AsyncSubject,
- Observer = Rx.Observer,
- inherits = Rx.internals.inherits,
- addProperties = Rx.internals.addProperties,
- helpers = Rx.helpers,
- noop = helpers.noop,
- isPromise = helpers.isPromise,
- isFunction = helpers.isFunction,
- isIterable = Rx.helpers.isIterable,
- isArrayLike = Rx.helpers.isArrayLike,
- isScheduler = Rx.Scheduler.isScheduler,
- observableFromPromise = Observable.fromPromise;
- var errorObj = {e: {}};
-
- function tryCatcherGen(tryCatchTarget) {
- return function tryCatcher() {
- try {
- return tryCatchTarget.apply(this, arguments);
- } catch (e) {
- errorObj.e = e;
- return errorObj;
- }
- };
- }
- var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
- if (!isFunction(fn)) { throw new TypeError('fn must be a function'); }
- return tryCatcherGen(fn);
- };
- function thrower(e) {
- throw e;
- }
- // Shim in iterator support
- var $iterator$ = (typeof Symbol === 'function' && Symbol.iterator) ||
- '_es6shim_iterator_';
- // Bug for mozilla version
- if (root.Set && typeof new root.Set()['@@iterator'] === 'function') {
- $iterator$ = '@@iterator';
- }
- var doneEnumerator = Rx.doneEnumerator = { done: true, value: undefined };
- var isIterable = Rx.helpers.isIterable = function (o) {
- return o && o[$iterator$] !== undefined;
- };
- var isArrayLike = Rx.helpers.isArrayLike = function (o) {
- return o && o.length !== undefined;
- };
- Rx.helpers.iterator = $iterator$;
- var WhileEnumerable = (function(__super__) {
- inherits(WhileEnumerable, __super__);
- function WhileEnumerable(c, s) {
- this.c = c;
- this.s = s;
- }
- WhileEnumerable.prototype[$iterator$] = function () {
- var self = this;
- return {
- next: function () {
- return self.c() ?
- { done: false, value: self.s } :
- { done: true, value: void 0 };
- }
- };
- };
- return WhileEnumerable;
- }(Enumerable));
-
- function enumerableWhile(condition, source) {
- return new WhileEnumerable(condition, source);
- }
- /**
- * Returns an observable sequence that is the result of invoking the selector on the source sequence, without sharing subscriptions.
- * This operator allows for a fluent style of writing queries that use the same sequence multiple times.
- *
- * @param {Function} selector Selector function which can use the source sequence as many times as needed, without sharing subscriptions to the source sequence.
- * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
- */
- observableProto.letBind = observableProto['let'] = function (func) {
- return func(this);
- };
- /**
- * Determines whether an observable collection contains values.
- *
- * @example
- * 1 - res = Rx.Observable.if(condition, obs1);
- * 2 - res = Rx.Observable.if(condition, obs1, obs2);
- * 3 - res = Rx.Observable.if(condition, obs1, scheduler);
- * @param {Function} condition The condition which determines if the thenSource or elseSource will be run.
- * @param {Observable} thenSource The observable sequence or Promise that will be run if the condition function returns true.
- * @param {Observable} [elseSource] The observable sequence or Promise that will be run if the condition function returns false. If this is not provided, it defaults to Rx.Observabe.Empty with the specified scheduler.
- * @returns {Observable} An observable sequence which is either the thenSource or elseSource.
- */
- Observable['if'] = function (condition, thenSource, elseSourceOrScheduler) {
- return observableDefer(function () {
- elseSourceOrScheduler || (elseSourceOrScheduler = observableEmpty());
- isPromise(thenSource) && (thenSource = observableFromPromise(thenSource));
- isPromise(elseSourceOrScheduler) && (elseSourceOrScheduler = observableFromPromise(elseSourceOrScheduler));
- // Assume a scheduler for empty only
- typeof elseSourceOrScheduler.now === 'function' && (elseSourceOrScheduler = observableEmpty(elseSourceOrScheduler));
- return condition() ? thenSource : elseSourceOrScheduler;
- });
- };
- /**
- * Concatenates the observable sequences obtained by running the specified result selector for each element in source.
- * There is an alias for this method called 'forIn' for browsers <IE9
- * @param {Array} sources An array of values to turn into an observable sequence.
- * @param {Function} resultSelector A function to apply to each item in the sources array to turn it into an observable sequence.
- * @returns {Observable} An observable sequence from the concatenated observable sequences.
- */
- Observable['for'] = Observable.forIn = function (sources, resultSelector, thisArg) {
- return enumerableOf(sources, resultSelector, thisArg).concat();
- };
- /**
- * Repeats source as long as condition holds emulating a while loop.
- * There is an alias for this method called 'whileDo' for browsers <IE9
- *
- * @param {Function} condition The condition which determines if the source will be repeated.
- * @param {Observable} source The observable sequence that will be run if the condition function returns true.
- * @returns {Observable} An observable sequence which is repeated as long as the condition holds.
- */
- var observableWhileDo = Observable['while'] = Observable.whileDo = function (condition, source) {
- isPromise(source) && (source = observableFromPromise(source));
- return enumerableWhile(condition, source).concat();
- };
- /**
- * Repeats source as long as condition holds emulating a do while loop.
- *
- * @param {Function} condition The condition which determines if the source will be repeated.
- * @param {Observable} source The observable sequence that will be run if the condition function returns true.
- * @returns {Observable} An observable sequence which is repeated as long as the condition holds.
- */
- observableProto.doWhile = function (condition) {
- return observableConcat([this, observableWhileDo(condition, this)]);
- };
- /**
- * Uses selector to determine which source in sources to use.
- * @param {Function} selector The function which extracts the value for to test in a case statement.
- * @param {Array} sources A object which has keys which correspond to the case statement labels.
- * @param {Observable} [elseSource] The observable sequence or Promise that will be run if the sources are not matched. If this is not provided, it defaults to Rx.Observabe.empty with the specified scheduler.
- *
- * @returns {Observable} An observable sequence which is determined by a case statement.
- */
- Observable['case'] = function (selector, sources, defaultSourceOrScheduler) {
- return observableDefer(function () {
- isPromise(defaultSourceOrScheduler) && (defaultSourceOrScheduler = observableFromPromise(defaultSourceOrScheduler));
- defaultSourceOrScheduler || (defaultSourceOrScheduler = observableEmpty());
- isScheduler(defaultSourceOrScheduler) && (defaultSourceOrScheduler = observableEmpty(defaultSourceOrScheduler));
- var result = sources[selector()];
- isPromise(result) && (result = observableFromPromise(result));
- return result || defaultSourceOrScheduler;
- });
- };
- var ExpandObservable = (function(__super__) {
- inherits(ExpandObservable, __super__);
- function ExpandObservable(source, fn, scheduler) {
- this.source = source;
- this._fn = fn;
- this._scheduler = scheduler;
- __super__.call(this);
- }
- function scheduleRecursive(args, recurse) {
- var state = args[0], self = args[1];
- var work;
- if (state.q.length > 0) {
- work = state.q.shift();
- } else {
- state.isAcquired = false;
- return;
- }
- var m1 = new SingleAssignmentDisposable();
- state.d.add(m1);
- m1.setDisposable(work.subscribe(new ExpandObserver(state, self, m1)));
- recurse([state, self]);
- }
- ExpandObservable.prototype._ensureActive = function (state) {
- var isOwner = false;
- if (state.q.length > 0) {
- isOwner = !state.isAcquired;
- state.isAcquired = true;
- }
- isOwner && state.m.setDisposable(this._scheduler.scheduleRecursive([state, this], scheduleRecursive));
- };
- ExpandObservable.prototype.subscribeCore = function (o) {
- var m = new SerialDisposable(),
- d = new CompositeDisposable(m),
- state = {
- q: [],
- m: m,
- d: d,
- activeCount: 0,
- isAcquired: false,
- o: o
- };
- state.q.push(this.source);
- state.activeCount++;
- this._ensureActive(state);
- return d;
- };
- return ExpandObservable;
- }(ObservableBase));
- var ExpandObserver = (function(__super__) {
- inherits(ExpandObserver, __super__);
- function ExpandObserver(state, parent, m1) {
- this._s = state;
- this._p = parent;
- this._m1 = m1;
- __super__.call(this);
- }
- ExpandObserver.prototype.next = function (x) {
- this._s.o.onNext(x);
- var result = tryCatch(this._p._fn)(x);
- if (result === errorObj) { return this._s.o.onError(result.e); }
- this._s.q.push(result);
- this._s.activeCount++;
- this._p._ensureActive(this._s);
- };
- ExpandObserver.prototype.error = function (e) {
- this._s.o.onError(e);
- };
- ExpandObserver.prototype.completed = function () {
- this._s.d.remove(this._m1);
- this._s.activeCount--;
- this._s.activeCount === 0 && this._s.o.onCompleted();
- };
- return ExpandObserver;
- }(AbstractObserver));
- /**
- * Expands an observable sequence by recursively invoking selector.
- *
- * @param {Function} selector Selector function to invoke for each produced element, resulting in another sequence to which the selector will be invoked recursively again.
- * @param {Scheduler} [scheduler] Scheduler on which to perform the expansion. If not provided, this defaults to the current thread scheduler.
- * @returns {Observable} An observable sequence containing all the elements produced by the recursive expansion.
- */
- observableProto.expand = function (selector, scheduler) {
- isScheduler(scheduler) || (scheduler = currentThreadScheduler);
- return new ExpandObservable(this, selector, scheduler);
- };
- function argumentsToArray() {
- var len = arguments.length, args = new Array(len);
- for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
- return args;
- }
- var ForkJoinObservable = (function (__super__) {
- inherits(ForkJoinObservable, __super__);
- function ForkJoinObservable(sources, cb) {
- this._sources = sources;
- this._cb = cb;
- __super__.call(this);
- }
- ForkJoinObservable.prototype.subscribeCore = function (o) {
- if (this._sources.length === 0) {
- o.onCompleted();
- return disposableEmpty;
- }
- var count = this._sources.length;
- var state = {
- finished: false,
- hasResults: new Array(count),
- hasCompleted: new Array(count),
- results: new Array(count)
- };
- var subscriptions = new CompositeDisposable();
- for (var i = 0, len = this._sources.length; i < len; i++) {
- var source = this._sources[i];
- isPromise(source) && (source = observableFromPromise(source));
- subscriptions.add(source.subscribe(new ForkJoinObserver(o, state, i, this._cb, subscriptions)));
- }
- return subscriptions;
- };
- return ForkJoinObservable;
- }(ObservableBase));
- var ForkJoinObserver = (function(__super__) {
- inherits(ForkJoinObserver, __super__);
- function ForkJoinObserver(o, s, i, cb, subs) {
- this._o = o;
- this._s = s;
- this._i = i;
- this._cb = cb;
- this._subs = subs;
- __super__.call(this);
- }
- ForkJoinObserver.prototype.next = function (x) {
- if (!this._s.finished) {
- this._s.hasResults[this._i] = true;
- this._s.results[this._i] = x;
- }
- };
- ForkJoinObserver.prototype.error = function (e) {
- this._s.finished = true;
- this._o.onError(e);
- this._subs.dispose();
- };
- ForkJoinObserver.prototype.completed = function () {
- if (!this._s.finished) {
- if (!this._s.hasResults[this._i]) {
- return this._o.onCompleted();
- }
- this._s.hasCompleted[this._i] = true;
- for (var i = 0; i < this._s.results.length; i++) {
- if (!this._s.hasCompleted[i]) { return; }
- }
- this._s.finished = true;
- var res = tryCatch(this._cb).apply(null, this._s.results);
- if (res === errorObj) { return this._o.onError(res.e); }
- this._o.onNext(res);
- this._o.onCompleted();
- }
- };
- return ForkJoinObserver;
- }(AbstractObserver));
- /**
- * Runs all observable sequences in parallel and collect their last elements.
- *
- * @example
- * 1 - res = Rx.Observable.forkJoin([obs1, obs2]);
- * 1 - res = Rx.Observable.forkJoin(obs1, obs2, ...);
- * @returns {Observable} An observable sequence with an array collecting the last elements of all the input sequences.
- */
- Observable.forkJoin = function () {
- var len = arguments.length, args = new Array(len);
- for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
- var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray;
- Array.isArray(args[0]) && (args = args[0]);
- return new ForkJoinObservable(args, resultSelector);
- };
- /**
- * Runs two observable sequences in parallel and combines their last elemenets.
- * @param {Observable} second Second observable sequence.
- * @param {Function} resultSelector Result selector function to invoke with the last elements of both sequences.
- * @returns {Observable} An observable sequence with the result of calling the selector function with the last elements of both input sequences.
- */
- observableProto.forkJoin = function () {
- var len = arguments.length, args = new Array(len);
- for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
- if (Array.isArray(args[0])) {
- args[0].unshift(this);
- } else {
- args.unshift(this);
- }
- return Observable.forkJoin.apply(null, args);
- };
- /**
- * Comonadic bind operator.
- * @param {Function} selector A transform function to apply to each element.
- * @param {Object} scheduler Scheduler used to execute the operation. If not specified, defaults to the ImmediateScheduler.
- * @returns {Observable} An observable sequence which results from the comonadic bind operation.
- */
- observableProto.manySelect = observableProto.extend = function (selector, scheduler) {
- isScheduler(scheduler) || (scheduler = Rx.Scheduler.immediate);
- var source = this;
- return observableDefer(function () {
- var chain;
- return source
- .map(function (x) {
- var curr = new ChainObservable(x);
- chain && chain.onNext(x);
- chain = curr;
- return curr;
- })
- .tap(
- noop,
- function (e) { chain && chain.onError(e); },
- function () { chain && chain.onCompleted(); }
- )
- .observeOn(scheduler)
- .map(selector);
- }, source);
- };
- var ChainObservable = (function (__super__) {
- inherits(ChainObservable, __super__);
- function ChainObservable(head) {
- __super__.call(this);
- this.head = head;
- this.tail = new AsyncSubject();
- }
- addProperties(ChainObservable.prototype, Observer, {
- _subscribe: function (o) {
- var g = new CompositeDisposable();
- g.add(currentThreadScheduler.schedule(this, function (_, self) {
- o.onNext(self.head);
- g.add(self.tail.mergeAll().subscribe(o));
- }));
- return g;
- },
- onCompleted: function () {
- this.onNext(Observable.empty());
- },
- onError: function (e) {
- this.onNext(Observable['throw'](e));
- },
- onNext: function (v) {
- this.tail.onNext(v);
- this.tail.onCompleted();
- }
- });
- return ChainObservable;
- }(Observable));
- var SwitchFirstObservable = (function (__super__) {
- inherits(SwitchFirstObservable, __super__);
- function SwitchFirstObservable(source) {
- this.source = source;
- __super__.call(this);
- }
- SwitchFirstObservable.prototype.subscribeCore = function (o) {
- var m = new SingleAssignmentDisposable(),
- g = new CompositeDisposable(),
- state = {
- hasCurrent: false,
- isStopped: false,
- o: o,
- g: g
- };
- g.add(m);
- m.setDisposable(this.source.subscribe(new SwitchFirstObserver(state)));
- return g;
- };
- return SwitchFirstObservable;
- }(ObservableBase));
- var SwitchFirstObserver = (function(__super__) {
- inherits(SwitchFirstObserver, __super__);
- function SwitchFirstObserver(state) {
- this._s = state;
- __super__.call(this);
- }
- SwitchFirstObserver.prototype.next = function (x) {
- if (!this._s.hasCurrent) {
- this._s.hasCurrent = true;
- isPromise(x) && (x = observableFromPromise(x));
- var inner = new SingleAssignmentDisposable();
- this._s.g.add(inner);
- inner.setDisposable(x.subscribe(new InnerObserver(this._s, inner)));
- }
- };
- SwitchFirstObserver.prototype.error = function (e) {
- this._s.o.onError(e);
- };
- SwitchFirstObserver.prototype.completed = function () {
- this._s.isStopped = true;
- !this._s.hasCurrent && this._s.g.length === 1 && this._s.o.onCompleted();
- };
- inherits(InnerObserver, __super__);
- function InnerObserver(state, inner) {
- this._s = state;
- this._i = inner;
- __super__.call(this);
- }
- InnerObserver.prototype.next = function (x) { this._s.o.onNext(x); };
- InnerObserver.prototype.error = function (e) { this._s.o.onError(e); };
- InnerObserver.prototype.completed = function () {
- this._s.g.remove(this._i);
- this._s.hasCurrent = false;
- this._s.isStopped && this._s.g.length === 1 && this._s.o.onCompleted();
- };
- return SwitchFirstObserver;
- }(AbstractObserver));
- /**
- * Performs a exclusive waiting for the first to finish before subscribing to another observable.
- * Observables that come in between subscriptions will be dropped on the floor.
- * @returns {Observable} A exclusive observable with only the results that happen when subscribed.
- */
- observableProto.switchFirst = function () {
- return new SwitchFirstObservable(this);
- };
- observableProto.flatMapFirst = observableProto.exhaustMap = function(selector, resultSelector, thisArg) {
- return new FlatMapObservable(this, selector, resultSelector, thisArg).switchFirst();
- };
- observableProto.flatMapWithMaxConcurrent = observableProto.flatMapMaxConcurrent = function(limit, selector, resultSelector, thisArg) {
- return new FlatMapObservable(this, selector, resultSelector, thisArg).merge(limit);
- };
- return Rx;
- }));
|