12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806 |
- // Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
- ;(function (factory) {
- var objectTypes = { // typeof results accepted below when probing for host objects (exports, module, self, window, this)
- 'function': true,
- 'object': true
- };
- function checkGlobal(value) { // yields value only when its Object property is this realm's Object constructor; otherwise null
- return (value && value.Object === Object) ? value : null;
- }
- var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null; // `exports`, unless it carries a nodeType property
- var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null; // `module`, unless it carries a nodeType property
- var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global); // `global` is considered only when both exports and module were found
- var freeSelf = checkGlobal(objectTypes[typeof self] && self);
- var freeWindow = checkGlobal(objectTypes[typeof window] && window);
- var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null; // NOTE(review): not referenced again in this wrapper
- var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
- var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')(); // first candidate that passed checkGlobal; last resort evaluates `this` via the Function constructor
- // Because of build optimizers
- if (typeof define === 'function' && define.amd) { // AMD loader: register a module depending on './rx.lite'
- define(['./rx.lite'], function (Rx, exports) {
- return factory(root, exports, Rx);
- });
- } else if (typeof module === 'object' && module && module.exports === freeExports) { // CommonJS: depend on the 'rx-lite' package and export the factory result
- module.exports = factory(root, module.exports, require('rx-lite'));
- } else { // no module system detected: extend the Rx object already present on the root object
- root.Rx = factory(root, {}, root.Rx);
- }
- }.call(this, function (root, exp, Rx, undefined) {
- // References
- var Observable = Rx.Observable,
- observableProto = Observable.prototype,
- observableNever = Observable.never,
- observableThrow = Observable['throw'],
- AnonymousObservable = Rx.AnonymousObservable,
- ObservableBase = Rx.ObservableBase,
- AnonymousObserver = Rx.AnonymousObserver,
- notificationCreateOnNext = Rx.Notification.createOnNext,
- notificationCreateOnError = Rx.Notification.createOnError,
- notificationCreateOnCompleted = Rx.Notification.createOnCompleted,
- Observer = Rx.Observer,
- observerCreate = Observer.create,
- AbstractObserver = Rx.internals.AbstractObserver,
- Subject = Rx.Subject,
- internals = Rx.internals,
- helpers = Rx.helpers,
- ScheduledObserver = internals.ScheduledObserver,
- SerialDisposable = Rx.SerialDisposable,
- SingleAssignmentDisposable = Rx.SingleAssignmentDisposable,
- CompositeDisposable = Rx.CompositeDisposable,
- BinaryDisposable = Rx.BinaryDisposable,
- RefCountDisposable = Rx.RefCountDisposable,
- disposableEmpty = Rx.Disposable.empty,
- immediateScheduler = Rx.Scheduler.immediate,
- defaultKeySerializer = helpers.defaultKeySerializer,
- addRef = Rx.internals.addRef,
- identity = helpers.identity,
- isPromise = helpers.isPromise,
- isFunction = helpers.isFunction,
- inherits = internals.inherits,
- bindCallback = internals.bindCallback,
- noop = helpers.noop,
- isScheduler = Rx.Scheduler.isScheduler,
- observableFromPromise = Observable.fromPromise,
- ArgumentOutOfRangeError = Rx.ArgumentOutOfRangeError;
// Shared sentinel handed back by guarded calls that threw; the caught value is stored on `e`.
var errorObj = { e: {} };

/**
 * Builds a wrapper that captures exceptions thrown by the target instead of propagating them.
 * @param {Function} tryCatchTarget Function to guard.
 * @returns {Function} Wrapper forwarding `this` and all arguments to the target. It returns the
 * target's result, or the shared `errorObj` (with `e` set to the caught value) when the target threw.
 */
function tryCatcherGen(tryCatchTarget) {
  function tryCatcher() {
    var outcome;
    try {
      outcome = tryCatchTarget.apply(this, arguments);
    } catch (caught) {
      errorObj.e = caught;
      outcome = errorObj;
    }
    return outcome;
  }
  return tryCatcher;
}
/**
 * Returns a guarded version of `fn` (see tryCatcherGen); also published as Rx.internals.tryCatch.
 * @param {Function} fn Function to guard.
 * @returns {Function} The guarding wrapper.
 * @throws {TypeError} When `fn` is not a function.
 */
var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
  if (isFunction(fn)) { return tryCatcherGen(fn); }
  throw new TypeError('fn must be a function');
};
/**
 * Throws the supplied value; allows a throw to be used in expression position.
 * @param {*} e Value to throw.
 */
function thrower(e) { throw e; }
/**
 * Disposable wrapper whose disposal of the inner disposable is handed to a scheduler.
 * @param {Object} scheduler Object exposing schedule(state, action).
 * @param {Object} disposable Inner object exposing dispose().
 */
function ScheduledDisposable(scheduler, disposable) {
  this.scheduler = scheduler;
  this.disposable = disposable;
  this.isDisposed = false;
}

// Scheduled action: disposes the wrapped disposable the first time it runs, and is a no-op afterwards.
function scheduleItem(s, self) {
  if (self.isDisposed) { return; }
  self.isDisposed = true;
  self.disposable.dispose();
}

/** Asks the scheduler to run the disposal action with this instance as its state. */
ScheduledDisposable.prototype.dispose = function () {
  var target = this;
  target.scheduler.schedule(target, scheduleItem);
};
/**
 * Observer wrapper that validates calls before forwarding them to the wrapped observer.
 * `_state` tracks progress: 0 - idle, 1 - busy (a callback is running), 2 - done (terminated).
 */
var CheckedObserver = (function (__super__) {
  inherits(CheckedObserver, __super__);

  function CheckedObserver(observer) {
    __super__.call(this);
    this._observer = observer;
    this._state = 0; // 0 - idle, 1 - busy, 2 - done
  }

  var proto = CheckedObserver.prototype;

  // Forwards a value; returns to idle before rethrowing anything the wrapped observer threw.
  proto.onNext = function (value) {
    this.checkAccess();
    var outcome = tryCatch(this._observer.onNext).call(this._observer, value);
    this._state = 0;
    if (outcome === errorObj) { thrower(outcome.e); }
  };

  // Forwards an error and marks the observer as done.
  proto.onError = function (err) {
    this.checkAccess();
    var outcome = tryCatch(this._observer.onError).call(this._observer, err);
    this._state = 2;
    if (outcome === errorObj) { thrower(outcome.e); }
  };

  // Forwards completion and marks the observer as done.
  proto.onCompleted = function () {
    this.checkAccess();
    var outcome = tryCatch(this._observer.onCompleted).call(this._observer);
    this._state = 2;
    if (outcome === errorObj) { thrower(outcome.e); }
  };

  // Throws on re-entrant or post-termination calls; otherwise marks the observer busy.
  proto.checkAccess = function () {
    switch (this._state) {
      case 1:
        throw new Error('Re-entrancy detected');
      case 2:
        throw new Error('Observer completed');
      case 0:
        this._state = 1;
        break;
    }
  };

  return CheckedObserver;
}(Observer));
/**
 * ScheduledObserver subclass that calls ensureActive() after every notification and
 * disposes an optional `cancel` disposable together with itself.
 * NOTE(review): ensureActive is not defined in this block; presumably inherited from ScheduledObserver.
 */
var ObserveOnObserver = (function (__super__) {
  inherits(ObserveOnObserver, __super__);

  function ObserveOnObserver(scheduler, observer, cancel) {
    __super__.call(this, scheduler, observer);
    this._cancel = cancel;
  }

  var proto = ObserveOnObserver.prototype;

  proto.next = function (value) {
    __super__.prototype.next.call(this, value);
    this.ensureActive();
  };

  proto.error = function (e) {
    __super__.prototype.error.call(this, e);
    this.ensureActive();
  };

  proto.completed = function () {
    __super__.prototype.completed.call(this);
    this.ensureActive();
  };

  // Disposes the base observer first, then the cancel disposable (if any), and drops the reference.
  proto.dispose = function () {
    __super__.prototype.dispose.call(this);
    if (this._cancel) { this._cancel.dispose(); }
    this._cancel = null;
  };

  return ObserveOnObserver;
})(ScheduledObserver);
/**
 * Wraps this observer in a CheckedObserver, which throws from the offending call when a
 * callback is invoked re-entrantly or after the observer has terminated.
 *
 * @returns An observer that validates each call and, when the check passes, forwards it to this observer.
 */
Observer.prototype.checked = function () {
  var guarded = new CheckedObserver(this);
  return guarded;
};
/**
 * Schedules the invocation of observer methods on the given scheduler.
 *
 * The method wraps `this`, so it has to live on the prototype to be callable on observer
 * instances; when attached only to the constructor, `this` was the constructor itself.
 * @param {Scheduler} scheduler Scheduler to schedule observer messages on.
 * @returns {Observer} Observer whose messages are scheduled on the given scheduler.
 */
Observer.prototype.notifyOn = function (scheduler) {
  return new ObserveOnObserver(scheduler, this);
};
// Kept for backward compatibility with code that referenced the constructor-level property.
Observer.notifyOn = Observer.prototype.notifyOn;
/**
 * Creates an observer from a notification callback.
 * @param {Function} handler Action that handles a notification.
 * @param {Any} [thisArg] Passed to bindCallback together with the handler.
 * @returns The observer that invokes the handler with an OnNext, OnError or OnCompleted notification for each message it receives.
 */
Observer.fromNotifier = function (handler, thisArg) {
  var notify = bindCallback(handler, thisArg, 1);

  function onNext(x) { return notify(notificationCreateOnNext(x)); }
  function onError(e) { return notify(notificationCreateOnError(e)); }
  function onCompleted() { return notify(notificationCreateOnCompleted()); }

  return new AnonymousObserver(onNext, onError, onCompleted);
};
/**
 * Creates a notification callback from this observer.
 * @returns A function that passes this observer to the `accept` method of the notification it is given.
 */
Observer.prototype.toNotifier = function () {
  var target = this;
  function forward(n) { return n.accept(target); }
  return forward;
};
/**
 * Hides the identity of an observer.
 * @returns A new AnonymousObserver that relays onNext, onError and onCompleted to this observer.
 */
Observer.prototype.asObserver = function () {
  var inner = this;

  function relayNext(x) { inner.onNext(x); }
  function relayError(e) { inner.onError(e); }
  function relayCompleted() { inner.onCompleted(); }

  return new AnonymousObserver(relayNext, relayError, relayCompleted);
};
/**
 * Observable that subscribes to `source` through an ObserveOnObserver bound to scheduler `s`.
 */
var ObserveOnObservable = (function (__super__) {
  inherits(ObserveOnObservable, __super__);

  function ObserveOnObservable(source, s) {
    this.source = source;
    this._s = s;
    __super__.call(this);
  }

  // Wraps the downstream observer before subscribing to the source.
  ObserveOnObservable.prototype.subscribeCore = function (o) {
    var scheduled = new ObserveOnObserver(this._s, o);
    return this.source.subscribe(scheduled);
  };

  return ObserveOnObservable;
}(ObservableBase));
/**
 * Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
 *
 * Only the observer callbacks are moved to the scheduler. When the subscription and/or
 * unsubscription actions have side-effects that must run on a scheduler, use subscribeOn.
 *
 * @param {Scheduler} scheduler Scheduler to notify observers on.
 * @returns {Observable} The source sequence whose observations happen on the specified scheduler.
 */
observableProto.observeOn = function (scheduler) {
  var source = this;
  return new ObserveOnObservable(source, scheduler);
};
/**
 * Observable that performs the subscription to `source` inside an action scheduled on `s`,
 * and wraps the resulting subscription in a ScheduledDisposable for the same scheduler.
 */
var SubscribeOnObservable = (function (__super__) {
  inherits(SubscribeOnObservable, __super__);

  function SubscribeOnObservable(source, s) {
    this.source = source;
    this._s = s;
    __super__.call(this);
  }

  // Scheduled action; state is [source, serialDisposable, observer].
  function scheduleMethod(scheduler, state) {
    var source = state[0];
    var serial = state[1];
    var observer = state[2];
    var subscription = source.subscribe(observer);
    serial.setDisposable(new ScheduledDisposable(scheduler, subscription));
  }

  SubscribeOnObservable.prototype.subscribeCore = function (o) {
    var pending = new SingleAssignmentDisposable();
    var serial = new SerialDisposable();
    // Until the action runs, the serial disposable holds the scheduled work itself.
    serial.setDisposable(pending);
    pending.setDisposable(this._s.schedule([this.source, serial, o], scheduleMethod));
    return serial;
  };

  return SubscribeOnObservable;
}(ObservableBase));
/**
 * Wraps the source sequence in order to run its subscription and unsubscription logic on the
 * specified scheduler. This operation is not commonly used.
 *
 * Only the side-effects of subscription and unsubscription are moved to the scheduler. To invoke
 * observer callbacks on a scheduler, use observeOn.
 * @param {Scheduler} scheduler Scheduler to perform subscription and unsubscription actions on.
 * @returns {Observable} The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
 */
observableProto.subscribeOn = function (scheduler) {
  var source = this;
  return new SubscribeOnObservable(source, scheduler);
};
/**
 * Observable driven by a state loop: starting from an initial state it tests a condition,
 * emits a projected result, and advances the state through the scheduler's recursive scheduling.
 */
var GenerateObservable = (function (__super__) {
  inherits(GenerateObservable, __super__);

  function GenerateObservable(state, cndFn, itrFn, resFn, s) {
    this._initialState = state;
    this._cndFn = cndFn;
    this._itrFn = itrFn;
    this._resFn = resFn;
    this._s = s;
    __super__.call(this);
  }

  // One loop iteration. An exception from any user function is routed to onError and stops the loop.
  function scheduleRecursive(state, recurse) {
    var generator = state.self;
    var observer = state.o;

    if (state.first) {
      // The first pass uses the initial state as-is; later passes advance it first.
      state.first = false;
    } else {
      state.newState = tryCatch(generator._itrFn)(state.newState);
      if (state.newState === errorObj) { return observer.onError(state.newState.e); }
    }

    var shouldContinue = tryCatch(generator._cndFn)(state.newState);
    if (shouldContinue === errorObj) { return observer.onError(shouldContinue.e); }

    if (!shouldContinue) {
      observer.onCompleted();
      return;
    }

    var value = tryCatch(generator._resFn)(state.newState);
    if (value === errorObj) { return observer.onError(value.e); }
    observer.onNext(value);
    recurse(state);
  }

  GenerateObservable.prototype.subscribeCore = function (o) {
    var loopState = {
      o: o,
      self: this,
      first: true,
      newState: this._initialState
    };
    return this._s.scheduleRecursive(loopState, scheduleRecursive);
  };

  return GenerateObservable;
}(ObservableBase));
/**
 * Generates an observable sequence by running a state-driven loop producing the sequence's elements, using the specified scheduler to send out observer messages.
 *
 * @example
 * var res = Rx.Observable.generate(0, function (x) { return x < 10; }, function (x) { return x + 1; }, function (x) { return x; });
 * var res = Rx.Observable.generate(0, function (x) { return x < 10; }, function (x) { return x + 1; }, function (x) { return x; }, Rx.Scheduler.timeout);
 * @param {Mixed} initialState Initial state.
 * @param {Function} condition Condition to terminate generation (upon returning false).
 * @param {Function} iterate Iteration step function.
 * @param {Function} resultSelector Selector function for results produced in the sequence.
 * @param {Scheduler} [scheduler] Scheduler on which to run the generator loop. If not provided, defaults to Scheduler.currentThread.
 * @returns {Observable} The generated sequence.
 */
Observable.generate = function (initialState, condition, iterate, resultSelector, scheduler) {
  // No `currentThreadScheduler` alias is declared in this module, so the default is read
  // from Rx directly; the bare identifier threw a ReferenceError whenever the scheduler was omitted.
  isScheduler(scheduler) || (scheduler = Rx.Scheduler.currentThread);
  return new GenerateObservable(initialState, condition, iterate, resultSelector, scheduler);
};
/**
 * Observable that obtains a resource on subscription, builds the actual source from it,
 * and returns a disposable covering both the subscription and the resource.
 */
var UsingObservable = (function (__super__) {
  inherits(UsingObservable, __super__);

  function UsingObservable(resFn, obsFn) {
    this._resFn = resFn;
    this._obsFn = obsFn;
    __super__.call(this);
  }

  UsingObservable.prototype.subscribeCore = function (o) {
    var resource = tryCatch(this._resFn)();
    if (resource === errorObj) {
      // The resource factory threw: surface the error; there is nothing to release.
      return new BinaryDisposable(observableThrow(resource.e).subscribe(o), disposableEmpty);
    }

    // A falsy resource is treated as "no resource" and paired with the empty disposable.
    var disposable = resource ? resource : disposableEmpty;

    var source = tryCatch(this._obsFn)(resource);
    var target = source === errorObj ? observableThrow(source.e) : source;
    return new BinaryDisposable(target.subscribe(o), disposable);
  };

  return UsingObservable;
}(ObservableBase));
/**
 * Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime.
 * @param {Function} resourceFactory Factory function to obtain a resource object.
 * @param {Function} observableFactory Factory function to obtain an observable sequence that depends on the obtained resource.
 * @returns {Observable} An observable sequence whose lifetime controls the lifetime of the dependent resource object.
 */
Observable.using = function (resourceFactory, observableFactory) {
  var result = new UsingObservable(resourceFactory, observableFactory);
  return result;
};
/**
 * Propagates the observable sequence or Promise that reacts first.
 * @param {Observable} rightSource Second observable sequence or Promise.
 * @returns {Observable} An observable sequence that surfaces either of the given sequences, whichever reacted first.
 */
observableProto.amb = function (rightSource) {
  var leftSource = this;

  return new AnonymousObservable(function (observer) {
    var winner;
    var leftSubscription = new SingleAssignmentDisposable();
    var rightSubscription = new SingleAssignmentDisposable();

    isPromise(rightSource) && (rightSource = observableFromPromise(rightSource));

    // Builds the observer for one side. The first notification from either side claims the
    // race and disposes the rival subscription; only the winning side is forwarded afterwards.
    function createSideObserver(side, rivalSubscription) {
      function claim() {
        if (!winner) {
          winner = side;
          rivalSubscription.dispose();
        }
        return winner === side;
      }

      return observerCreate(
        function (x) { claim() && observer.onNext(x); },
        function (e) { claim() && observer.onError(e); },
        function () { claim() && observer.onCompleted(); }
      );
    }

    var leftSubscribe = createSideObserver('L', rightSubscription);
    var rightSubscribe = createSideObserver('R', leftSubscription);

    leftSubscription.setDisposable(leftSource.subscribe(leftSubscribe));
    rightSubscription.setDisposable(rightSource.subscribe(rightSubscribe));

    return new BinaryDisposable(leftSubscription, rightSubscription);
  });
};
// Reducer step: races the accumulated sequence against the next candidate.
function amb(p, c) { return p.amb(c); }

/**
 * Propagates the observable sequence or Promise that reacts first.
 * Accepts either a single array of sources or the sources as individual arguments.
 * With no sources the result is the never-terminating sequence.
 * @returns {Observable} An observable sequence that surfaces any of the given sequences, whichever reacted first.
 */
Observable.amb = function () {
  var acc = observableNever(), items, len, i;
  if (Array.isArray(arguments[0])) {
    items = arguments[0];
  } else {
    len = arguments.length;
    // Size the copy from the argument count. It was previously sized from the still-undefined
    // `items`, which produced [undefined] and made a zero-argument call race against undefined.
    items = new Array(len);
    for (i = 0; i < len; i++) { items[i] = arguments[i]; }
  }
  for (i = 0, len = items.length; i < len; i++) {
    acc = amb(acc, items[i]);
  }
  return acc;
};
/**
 * Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
 * @param {Observable} second Second observable sequence used to produce results after the first sequence terminates.
 * @returns {Observable} An observable sequence that concatenates the first and second sequence, even if the first sequence terminates exceptionally.
 * @throws {Error} When `second` is falsy.
 */
observableProto.onErrorResumeNext = function (second) {
  if (second) { return onErrorResumeNext([this, second]); }
  throw new Error('Second observable is required');
};
/**
 * Observable that subscribes to each entry of `sources` in turn, moving to the next entry
 * whenever the current one terminates, and completing once the list is exhausted.
 */
var OnErrorResumeNextObservable = (function(__super__) {
  inherits(OnErrorResumeNextObservable, __super__);

  function OnErrorResumeNextObservable(sources) {
    this.sources = sources;
    __super__.call(this);
  }

  // Recursive step: subscribes to the source at state.pos, or completes when none are left.
  function scheduleMethod(state, recurse) {
    if (!(state.pos < state.sources.length)) {
      state.o.onCompleted();
      return;
    }

    var current = state.sources[state.pos++];
    if (isPromise(current)) { current = observableFromPromise(current); }

    var inner = new SingleAssignmentDisposable();
    // Register the slot before subscribing so the serial disposable already tracks it.
    state.subscription.setDisposable(inner);
    inner.setDisposable(current.subscribe(new OnErrorResumeNextObserver(state, recurse)));
  }

  OnErrorResumeNextObservable.prototype.subscribeCore = function (o) {
    var subscription = new SerialDisposable();
    var state = { pos: 0, subscription: subscription, o: o, sources: this.sources };
    var cancellable = immediateScheduler.scheduleRecursive(state, scheduleMethod);
    return new BinaryDisposable(subscription, cancellable);
  };

  return OnErrorResumeNextObservable;
}(ObservableBase));
/**
 * Observer used per source by onErrorResumeNext: values go to the downstream observer, while
 * both error and completion trigger the recursion callback (the error value itself is dropped).
 */
var OnErrorResumeNextObserver = (function(__super__) {
  inherits(OnErrorResumeNextObserver, __super__);

  function OnErrorResumeNextObserver(state, recurse) {
    this._state = state;
    this._recurse = recurse;
    __super__.call(this);
  }

  var proto = OnErrorResumeNextObserver.prototype;

  proto.next = function (x) {
    this._state.o.onNext(x);
  };

  proto.error = function () {
    this._recurse(this._state);
  };

  proto.completed = function () {
    this._recurse(this._state);
  };

  return OnErrorResumeNextObserver;
}(AbstractObserver));
/**
 * Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
 * Accepts either a single array of sources or the sources as individual arguments.
 * @returns {Observable} An observable sequence that concatenates the source sequences, even if a sequence terminates exceptionally.
 */
var onErrorResumeNext = Observable.onErrorResumeNext = function () {
  var first = arguments[0];
  var sources;
  if (Array.isArray(first)) {
    sources = first;
  } else {
    sources = Array.prototype.slice.call(arguments);
  }
  return new OnErrorResumeNextObservable(sources);
};
// Selector: delegates to the argument's own toArray() method.
function toArray(x) {
  return x.toArray();
}

// Predicate: true when the argument's length is greater than zero.
function notEmpty(x) {
  return 0 < x.length;
}
/**
 * Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.
 * Built on windowWithCount: each window is collected into an array and empty arrays are dropped.
 * @param {Number} count Length of each buffer.
 * @param {Number} [skip] Number of elements to skip between creation of consecutive buffers. If not a number, defaults to the count.
 * @returns {Observable} An observable sequence of buffers.
 */
observableProto.bufferWithCount = observableProto.bufferCount = function (count, skip) {
  if (typeof skip !== 'number') { skip = count; }
  var windows = this.windowWithCount(count, skip);
  return windows.flatMap(toArray).filter(notEmpty);
};
/**
 * Projects each element of an observable sequence into zero or more windows which are produced based on element count information.
 * @param {Number} count Length of each window.
 * @param {Number} [skip] Number of elements to skip between creation of consecutive windows. If not specified, defaults to the count.
 * @returns {Observable} An observable sequence of windows.
 * @throws {ArgumentOutOfRangeError} When count or skip is not a positive finite number.
 */
observableProto.windowWithCount = observableProto.windowCount = function (count, skip) {
  var source = this;

  // Values that coerce to 0/NaN, and infinities, are normalised to 0 and then rejected.
  if (!+count) { count = 0; }
  if (Math.abs(count) === Infinity) { count = 0; }
  if (count <= 0) { throw new ArgumentOutOfRangeError(); }

  if (skip == null) { skip = count; }
  if (!+skip) { skip = 0; }
  if (Math.abs(skip) === Infinity) { skip = 0; }
  if (skip <= 0) { throw new ArgumentOutOfRangeError(); }

  return new AnonymousObservable(function (observer) {
    var sourceSubscription = new SingleAssignmentDisposable();
    var refCountDisposable = new RefCountDisposable(sourceSubscription);
    var seen = 0;
    var openWindows = [];

    // Opens a new window and hands it downstream, ref-counted against the source subscription.
    function createWindow () {
      var windowSubject = new Subject();
      openWindows.push(windowSubject);
      observer.onNext(addRef(windowSubject, refCountDisposable));
    }

    function handleNext(x) {
      for (var idx = 0, total = openWindows.length; idx < total; idx++) {
        openWindows[idx].onNext(x);
      }
      // Close the oldest window once it has received `count` elements.
      var overflow = seen - count + 1;
      if (overflow >= 0 && overflow % skip === 0) { openWindows.shift().onCompleted(); }
      // Open a new window every `skip` elements.
      seen += 1;
      if (seen % skip === 0) { createWindow(); }
    }

    function handleError(e) {
      while (openWindows.length > 0) { openWindows.shift().onError(e); }
      observer.onError(e);
    }

    function handleCompleted() {
      while (openWindows.length > 0) { openWindows.shift().onCompleted(); }
      observer.onCompleted();
    }

    createWindow();
    sourceSubscription.setDisposable(source.subscribe(handleNext, handleError, handleCompleted));
    return refCountDisposable;
  }, source);
};
/**
 * Observer that keeps at most `c` of the most recent values in a queue and, on completion,
 * emits that queue as a single array before completing the downstream observer.
 */
var TakeLastBufferObserver = (function (__super__) {
  inherits(TakeLastBufferObserver, __super__);

  function TakeLastBufferObserver(o, c) {
    this._o = o;
    this._c = c;
    this._q = [];
    __super__.call(this);
  }

  var proto = TakeLastBufferObserver.prototype;

  // Appends the value, then drops the oldest entry if the queue grew past the limit.
  proto.next = function (x) {
    this._q.push(x);
    if (this._q.length > this._c) { this._q.shift(); }
  };

  proto.error = function (e) {
    this._o.onError(e);
  };

  proto.completed = function () {
    this._o.onNext(this._q);
    this._o.onCompleted();
  };

  return TakeLastBufferObserver;
}(AbstractObserver));
/**
 * Returns an array with the specified number of contiguous elements from the end of an observable sequence.
 *
 * @description
 * The operator buffers up to count elements while the source runs and produces that buffer
 * on the result sequence once the source completes.
 * @param {Number} count Number of elements to take from the end of the source sequence.
 * @returns {Observable} An observable sequence containing a single array with the specified number of elements from the end of the source sequence.
 * @throws {ArgumentOutOfRangeError} When count is negative.
 */
observableProto.takeLastBuffer = function (count) {
  if (count < 0) { throw new ArgumentOutOfRangeError(); }
  var source = this;

  function subscribe(o) {
    return source.subscribe(new TakeLastBufferObserver(o, count));
  }

  return new AnonymousObservable(subscribe, source);
};
/**
 * Observer that forwards every value and, if none was seen before completion, emits the
 * default value `d` just before completing the downstream observer.
 */
var DefaultIfEmptyObserver = (function (__super__) {
  inherits(DefaultIfEmptyObserver, __super__);

  function DefaultIfEmptyObserver(o, d) {
    this._o = o;
    this._d = d;
    this._f = false; // becomes true once any value has been forwarded
    __super__.call(this);
  }

  var proto = DefaultIfEmptyObserver.prototype;

  proto.next = function (x) {
    this._f = true;
    this._o.onNext(x);
  };

  proto.error = function (e) {
    this._o.onError(e);
  };

  proto.completed = function () {
    if (!this._f) { this._o.onNext(this._d); }
    this._o.onCompleted();
  };

  return DefaultIfEmptyObserver;
}(AbstractObserver));
/**
 * Returns the elements of the specified sequence or the specified value in a singleton sequence if the sequence is empty.
 *
 * @example
 * var res = xs.defaultIfEmpty();
 * var res = xs.defaultIfEmpty(false);
 *
 * @memberOf Observable#
 * @param defaultValue The value to return if the sequence is empty. If not provided, this defaults to null.
 * @returns {Observable} An observable sequence that contains the specified default value if the source is empty; otherwise, the elements of the source itself.
 */
observableProto.defaultIfEmpty = function (defaultValue) {
  var source = this;
  if (defaultValue === undefined) { defaultValue = null; }

  function subscribe(o) {
    return source.subscribe(new DefaultIfEmptyObserver(o, defaultValue));
  }

  return new AnonymousObservable(subscribe, source);
};
// Swap out for Array.findIndex
/**
 * Finds the first index whose element matches `item` according to `comparer`.
 * @param {Array} array Elements to search; its length is read once before scanning.
 * @param {*} item Value passed as the second argument to the comparer.
 * @param {Function} comparer Called as comparer(element, item); a truthy result is a match.
 * @returns {Number} Index of the first match, or -1 when there is none.
 */
function arrayIndexOfComparer(array, item, comparer) {
  var index = 0;
  var count = array.length;
  while (index < count) {
    if (comparer(array[index], item)) { return index; }
    index += 1;
  }
  return -1;
}
/**
 * Array-backed set whose membership test is a linear scan using the supplied comparer.
 * @param {Function} comparer Called as comparer(existing, candidate); a truthy result means "already present".
 */
function HashSet(comparer) {
  this.comparer = comparer;
  this.set = [];
}

/**
 * Adds the value unless an equivalent one is already stored.
 * @param {*} value Candidate value.
 * @returns {Boolean} true when the value was added, false when it was already present.
 */
HashSet.prototype.push = function(value) {
  var isNew = arrayIndexOfComparer(this.set, value, this.comparer) === -1;
  if (isNew) { this.set.push(value); }
  return isNew;
};
/**
 * Observable that subscribes to `source` through a DistinctObserver configured with the
 * given key selector and comparer.
 */
var DistinctObservable = (function (__super__) {
  inherits(DistinctObservable, __super__);

  function DistinctObservable(source, keyFn, cmpFn) {
    this.source = source;
    this._keyFn = keyFn;
    this._cmpFn = cmpFn;
    __super__.call(this);
  }

  DistinctObservable.prototype.subscribeCore = function (o) {
    var filtering = new DistinctObserver(o, this._keyFn, this._cmpFn);
    return this.source.subscribe(filtering);
  };

  return DistinctObservable;
}(ObservableBase));
/**
 * Observer that forwards a value only when its key has not been seen before. Keys are
 * remembered in a HashSet built on the supplied comparer.
 */
var DistinctObserver = (function (__super__) {
  inherits(DistinctObserver, __super__);

  function DistinctObserver(o, keyFn, cmpFn) {
    this._o = o;
    this._keyFn = keyFn;
    this._h = new HashSet(cmpFn);
    __super__.call(this);
  }

  var proto = DistinctObserver.prototype;

  proto.next = function (x) {
    var key = x;
    if (isFunction(this._keyFn)) {
      // A throwing key selector terminates the sequence with that error.
      key = tryCatch(this._keyFn)(x);
      if (key === errorObj) { return this._o.onError(key.e); }
    }
    if (this._h.push(key)) { this._o.onNext(x); }
  };

  proto.error = function (e) {
    this._o.onError(e);
  };

  proto.completed = function () {
    this._o.onCompleted();
  };

  return DistinctObserver;
}(AbstractObserver));
/**
 * Returns an observable sequence that contains only distinct elements according to the keySelector and the comparer.
 * Usage of this operator should be considered carefully due to the maintenance of an internal lookup structure which can grow large.
 *
 * @example
 * var res = xs.distinct();
 * var res = xs.distinct(function (x) { return x.id; });
 * var res = xs.distinct(function (x) { return x.id; }, function (a, b) { return a === b; });
 * @param {Function} [keySelector] A function to compute the comparison key for each element.
 * @param {Function} [comparer] Used to compare items in the collection. Defaults to helpers.defaultComparer.
 * @returns {Observable} An observable sequence only containing the distinct elements, based on a computed key value, from the source sequence.
 */
observableProto.distinct = function (keySelector, comparer) {
  // No `defaultComparer` alias is declared in this module, so the default is read from
  // `helpers`; the bare identifier threw a ReferenceError whenever the comparer was omitted.
  comparer || (comparer = helpers.defaultComparer);
  return new DistinctObservable(this, keySelector, comparer);
};
/**
 * Returns an observable sequence that shares a single subscription to the underlying sequence. This observable sequence
 * can be resubscribed to, even if all prior subscriptions have ended. (unlike `.publish().refCount()`)
 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source.
 */
observableProto.singleInstance = function() {
  var source = this;
  var hasObservable = false;
  var observable;

  // Lazily builds the shared sequence; the `finally` hook clears the flag so the next
  // subscriber after termination or disposal gets a freshly built one.
  function getObservable() {
    if (hasObservable) { return observable; }
    hasObservable = true;
    observable = source['finally'](function() { hasObservable = false; }).publish().refCount();
    return observable;
  }

  function subscribe(o) {
    return getObservable().subscribe(o);
  }

  return new AnonymousObservable(subscribe);
};
- return Rx;
- }));
|