rx.experimental.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. // Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
  2. ;(function (factory) {
  3. var objectTypes = {
  4. 'function': true,
  5. 'object': true
  6. };
  7. function checkGlobal(value) {
  8. return (value && value.Object === Object) ? value : null;
  9. }
  10. var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
  11. var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
  12. var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
  13. var freeSelf = checkGlobal(objectTypes[typeof self] && self);
  14. var freeWindow = checkGlobal(objectTypes[typeof window] && window);
  15. var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
  16. var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
  17. var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();
  18. // Because of build optimizers
  19. if (typeof define === 'function' && define.amd) {
  20. define(['./rx'], function (Rx, exports) {
  21. return factory(root, exports, Rx);
  22. });
  23. } else if (typeof module === 'object' && module && module.exports === freeExports) {
  24. module.exports = factory(root, module.exports, require('./rx'));
  25. } else {
  26. root.Rx = factory(root, {}, root.Rx);
  27. }
  28. }.call(this, function (root, exp, Rx, undefined) {
  29. // Aliases
  30. var Observable = Rx.Observable,
  31. observableProto = Observable.prototype,
  32. ObservableBase = Rx.ObservableBase,
  33. AbstractObserver = Rx.internals.AbstractObserver,
  34. FlatMapObservable = Rx.FlatMapObservable,
  35. observableConcat = Observable.concat,
  36. observableDefer = Observable.defer,
  37. observableEmpty = Observable.empty,
  38. disposableEmpty = Rx.Disposable.empty,
  39. CompositeDisposable = Rx.CompositeDisposable,
  40. SerialDisposable = Rx.SerialDisposable,
  41. SingleAssignmentDisposable = Rx.SingleAssignmentDisposable,
  42. Enumerable = Rx.internals.Enumerable,
  43. enumerableOf = Enumerable.of,
  44. currentThreadScheduler = Rx.Scheduler.currentThread,
  45. AsyncSubject = Rx.AsyncSubject,
  46. Observer = Rx.Observer,
  47. inherits = Rx.internals.inherits,
  48. addProperties = Rx.internals.addProperties,
  49. helpers = Rx.helpers,
  50. noop = helpers.noop,
  51. isPromise = helpers.isPromise,
  52. isFunction = helpers.isFunction,
  53. isIterable = Rx.helpers.isIterable,
  54. isArrayLike = Rx.helpers.isArrayLike,
  55. isScheduler = Rx.Scheduler.isScheduler,
  56. observableFromPromise = Observable.fromPromise;
  57. var errorObj = {e: {}};
  58. function tryCatcherGen(tryCatchTarget) {
  59. return function tryCatcher() {
  60. try {
  61. return tryCatchTarget.apply(this, arguments);
  62. } catch (e) {
  63. errorObj.e = e;
  64. return errorObj;
  65. }
  66. };
  67. }
  68. var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
  69. if (!isFunction(fn)) { throw new TypeError('fn must be a function'); }
  70. return tryCatcherGen(fn);
  71. };
  72. function thrower(e) {
  73. throw e;
  74. }
  75. // Shim in iterator support
  76. var $iterator$ = (typeof Symbol === 'function' && Symbol.iterator) ||
  77. '_es6shim_iterator_';
  78. // Bug for mozilla version
  79. if (root.Set && typeof new root.Set()['@@iterator'] === 'function') {
  80. $iterator$ = '@@iterator';
  81. }
  82. var doneEnumerator = Rx.doneEnumerator = { done: true, value: undefined };
  83. var isIterable = Rx.helpers.isIterable = function (o) {
  84. return o && o[$iterator$] !== undefined;
  85. };
  86. var isArrayLike = Rx.helpers.isArrayLike = function (o) {
  87. return o && o.length !== undefined;
  88. };
  89. Rx.helpers.iterator = $iterator$;
  90. var WhileEnumerable = (function(__super__) {
  91. inherits(WhileEnumerable, __super__);
  92. function WhileEnumerable(c, s) {
  93. this.c = c;
  94. this.s = s;
  95. }
  96. WhileEnumerable.prototype[$iterator$] = function () {
  97. var self = this;
  98. return {
  99. next: function () {
  100. return self.c() ?
  101. { done: false, value: self.s } :
  102. { done: true, value: void 0 };
  103. }
  104. };
  105. };
  106. return WhileEnumerable;
  107. }(Enumerable));
  108. function enumerableWhile(condition, source) {
  109. return new WhileEnumerable(condition, source);
  110. }
  111. /**
  112. * Returns an observable sequence that is the result of invoking the selector on the source sequence, without sharing subscriptions.
  113. * This operator allows for a fluent style of writing queries that use the same sequence multiple times.
  114. *
  115. * @param {Function} selector Selector function which can use the source sequence as many times as needed, without sharing subscriptions to the source sequence.
  116. * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
  117. */
  118. observableProto.letBind = observableProto['let'] = function (func) {
  119. return func(this);
  120. };
  121. /**
  122. * Determines whether an observable collection contains values.
  123. *
  124. * @example
  125. * 1 - res = Rx.Observable.if(condition, obs1);
  126. * 2 - res = Rx.Observable.if(condition, obs1, obs2);
  127. * 3 - res = Rx.Observable.if(condition, obs1, scheduler);
  128. * @param {Function} condition The condition which determines if the thenSource or elseSource will be run.
  129. * @param {Observable} thenSource The observable sequence or Promise that will be run if the condition function returns true.
  130. * @param {Observable} [elseSource] The observable sequence or Promise that will be run if the condition function returns false. If this is not provided, it defaults to Rx.Observabe.Empty with the specified scheduler.
  131. * @returns {Observable} An observable sequence which is either the thenSource or elseSource.
  132. */
  133. Observable['if'] = function (condition, thenSource, elseSourceOrScheduler) {
  134. return observableDefer(function () {
  135. elseSourceOrScheduler || (elseSourceOrScheduler = observableEmpty());
  136. isPromise(thenSource) && (thenSource = observableFromPromise(thenSource));
  137. isPromise(elseSourceOrScheduler) && (elseSourceOrScheduler = observableFromPromise(elseSourceOrScheduler));
  138. // Assume a scheduler for empty only
  139. typeof elseSourceOrScheduler.now === 'function' && (elseSourceOrScheduler = observableEmpty(elseSourceOrScheduler));
  140. return condition() ? thenSource : elseSourceOrScheduler;
  141. });
  142. };
  143. /**
  144. * Concatenates the observable sequences obtained by running the specified result selector for each element in source.
  145. * There is an alias for this method called 'forIn' for browsers <IE9
  146. * @param {Array} sources An array of values to turn into an observable sequence.
  147. * @param {Function} resultSelector A function to apply to each item in the sources array to turn it into an observable sequence.
  148. * @returns {Observable} An observable sequence from the concatenated observable sequences.
  149. */
  150. Observable['for'] = Observable.forIn = function (sources, resultSelector, thisArg) {
  151. return enumerableOf(sources, resultSelector, thisArg).concat();
  152. };
  153. /**
  154. * Repeats source as long as condition holds emulating a while loop.
  155. * There is an alias for this method called 'whileDo' for browsers <IE9
  156. *
  157. * @param {Function} condition The condition which determines if the source will be repeated.
  158. * @param {Observable} source The observable sequence that will be run if the condition function returns true.
  159. * @returns {Observable} An observable sequence which is repeated as long as the condition holds.
  160. */
  161. var observableWhileDo = Observable['while'] = Observable.whileDo = function (condition, source) {
  162. isPromise(source) && (source = observableFromPromise(source));
  163. return enumerableWhile(condition, source).concat();
  164. };
  165. /**
  166. * Repeats source as long as condition holds emulating a do while loop.
  167. *
  168. * @param {Function} condition The condition which determines if the source will be repeated.
  169. * @param {Observable} source The observable sequence that will be run if the condition function returns true.
  170. * @returns {Observable} An observable sequence which is repeated as long as the condition holds.
  171. */
  172. observableProto.doWhile = function (condition) {
  173. return observableConcat([this, observableWhileDo(condition, this)]);
  174. };
  175. /**
  176. * Uses selector to determine which source in sources to use.
  177. * @param {Function} selector The function which extracts the value for to test in a case statement.
  178. * @param {Array} sources A object which has keys which correspond to the case statement labels.
  179. * @param {Observable} [elseSource] The observable sequence or Promise that will be run if the sources are not matched. If this is not provided, it defaults to Rx.Observabe.empty with the specified scheduler.
  180. *
  181. * @returns {Observable} An observable sequence which is determined by a case statement.
  182. */
  183. Observable['case'] = function (selector, sources, defaultSourceOrScheduler) {
  184. return observableDefer(function () {
  185. isPromise(defaultSourceOrScheduler) && (defaultSourceOrScheduler = observableFromPromise(defaultSourceOrScheduler));
  186. defaultSourceOrScheduler || (defaultSourceOrScheduler = observableEmpty());
  187. isScheduler(defaultSourceOrScheduler) && (defaultSourceOrScheduler = observableEmpty(defaultSourceOrScheduler));
  188. var result = sources[selector()];
  189. isPromise(result) && (result = observableFromPromise(result));
  190. return result || defaultSourceOrScheduler;
  191. });
  192. };
  193. var ExpandObservable = (function(__super__) {
  194. inherits(ExpandObservable, __super__);
  195. function ExpandObservable(source, fn, scheduler) {
  196. this.source = source;
  197. this._fn = fn;
  198. this._scheduler = scheduler;
  199. __super__.call(this);
  200. }
  201. function scheduleRecursive(args, recurse) {
  202. var state = args[0], self = args[1];
  203. var work;
  204. if (state.q.length > 0) {
  205. work = state.q.shift();
  206. } else {
  207. state.isAcquired = false;
  208. return;
  209. }
  210. var m1 = new SingleAssignmentDisposable();
  211. state.d.add(m1);
  212. m1.setDisposable(work.subscribe(new ExpandObserver(state, self, m1)));
  213. recurse([state, self]);
  214. }
  215. ExpandObservable.prototype._ensureActive = function (state) {
  216. var isOwner = false;
  217. if (state.q.length > 0) {
  218. isOwner = !state.isAcquired;
  219. state.isAcquired = true;
  220. }
  221. isOwner && state.m.setDisposable(this._scheduler.scheduleRecursive([state, this], scheduleRecursive));
  222. };
  223. ExpandObservable.prototype.subscribeCore = function (o) {
  224. var m = new SerialDisposable(),
  225. d = new CompositeDisposable(m),
  226. state = {
  227. q: [],
  228. m: m,
  229. d: d,
  230. activeCount: 0,
  231. isAcquired: false,
  232. o: o
  233. };
  234. state.q.push(this.source);
  235. state.activeCount++;
  236. this._ensureActive(state);
  237. return d;
  238. };
  239. return ExpandObservable;
  240. }(ObservableBase));
  241. var ExpandObserver = (function(__super__) {
  242. inherits(ExpandObserver, __super__);
  243. function ExpandObserver(state, parent, m1) {
  244. this._s = state;
  245. this._p = parent;
  246. this._m1 = m1;
  247. __super__.call(this);
  248. }
  249. ExpandObserver.prototype.next = function (x) {
  250. this._s.o.onNext(x);
  251. var result = tryCatch(this._p._fn)(x);
  252. if (result === errorObj) { return this._s.o.onError(result.e); }
  253. this._s.q.push(result);
  254. this._s.activeCount++;
  255. this._p._ensureActive(this._s);
  256. };
  257. ExpandObserver.prototype.error = function (e) {
  258. this._s.o.onError(e);
  259. };
  260. ExpandObserver.prototype.completed = function () {
  261. this._s.d.remove(this._m1);
  262. this._s.activeCount--;
  263. this._s.activeCount === 0 && this._s.o.onCompleted();
  264. };
  265. return ExpandObserver;
  266. }(AbstractObserver));
  267. /**
  268. * Expands an observable sequence by recursively invoking selector.
  269. *
  270. * @param {Function} selector Selector function to invoke for each produced element, resulting in another sequence to which the selector will be invoked recursively again.
  271. * @param {Scheduler} [scheduler] Scheduler on which to perform the expansion. If not provided, this defaults to the current thread scheduler.
  272. * @returns {Observable} An observable sequence containing all the elements produced by the recursive expansion.
  273. */
  274. observableProto.expand = function (selector, scheduler) {
  275. isScheduler(scheduler) || (scheduler = currentThreadScheduler);
  276. return new ExpandObservable(this, selector, scheduler);
  277. };
  278. function argumentsToArray() {
  279. var len = arguments.length, args = new Array(len);
  280. for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
  281. return args;
  282. }
  283. var ForkJoinObservable = (function (__super__) {
  284. inherits(ForkJoinObservable, __super__);
  285. function ForkJoinObservable(sources, cb) {
  286. this._sources = sources;
  287. this._cb = cb;
  288. __super__.call(this);
  289. }
  290. ForkJoinObservable.prototype.subscribeCore = function (o) {
  291. if (this._sources.length === 0) {
  292. o.onCompleted();
  293. return disposableEmpty;
  294. }
  295. var count = this._sources.length;
  296. var state = {
  297. finished: false,
  298. hasResults: new Array(count),
  299. hasCompleted: new Array(count),
  300. results: new Array(count)
  301. };
  302. var subscriptions = new CompositeDisposable();
  303. for (var i = 0, len = this._sources.length; i < len; i++) {
  304. var source = this._sources[i];
  305. isPromise(source) && (source = observableFromPromise(source));
  306. subscriptions.add(source.subscribe(new ForkJoinObserver(o, state, i, this._cb, subscriptions)));
  307. }
  308. return subscriptions;
  309. };
  310. return ForkJoinObservable;
  311. }(ObservableBase));
  312. var ForkJoinObserver = (function(__super__) {
  313. inherits(ForkJoinObserver, __super__);
  314. function ForkJoinObserver(o, s, i, cb, subs) {
  315. this._o = o;
  316. this._s = s;
  317. this._i = i;
  318. this._cb = cb;
  319. this._subs = subs;
  320. __super__.call(this);
  321. }
  322. ForkJoinObserver.prototype.next = function (x) {
  323. if (!this._s.finished) {
  324. this._s.hasResults[this._i] = true;
  325. this._s.results[this._i] = x;
  326. }
  327. };
  328. ForkJoinObserver.prototype.error = function (e) {
  329. this._s.finished = true;
  330. this._o.onError(e);
  331. this._subs.dispose();
  332. };
  333. ForkJoinObserver.prototype.completed = function () {
  334. if (!this._s.finished) {
  335. if (!this._s.hasResults[this._i]) {
  336. return this._o.onCompleted();
  337. }
  338. this._s.hasCompleted[this._i] = true;
  339. for (var i = 0; i < this._s.results.length; i++) {
  340. if (!this._s.hasCompleted[i]) { return; }
  341. }
  342. this._s.finished = true;
  343. var res = tryCatch(this._cb).apply(null, this._s.results);
  344. if (res === errorObj) { return this._o.onError(res.e); }
  345. this._o.onNext(res);
  346. this._o.onCompleted();
  347. }
  348. };
  349. return ForkJoinObserver;
  350. }(AbstractObserver));
  351. /**
  352. * Runs all observable sequences in parallel and collect their last elements.
  353. *
  354. * @example
  355. * 1 - res = Rx.Observable.forkJoin([obs1, obs2]);
  356. * 1 - res = Rx.Observable.forkJoin(obs1, obs2, ...);
  357. * @returns {Observable} An observable sequence with an array collecting the last elements of all the input sequences.
  358. */
  359. Observable.forkJoin = function () {
  360. var len = arguments.length, args = new Array(len);
  361. for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
  362. var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray;
  363. Array.isArray(args[0]) && (args = args[0]);
  364. return new ForkJoinObservable(args, resultSelector);
  365. };
  366. /**
  367. * Runs two observable sequences in parallel and combines their last elemenets.
  368. * @param {Observable} second Second observable sequence.
  369. * @param {Function} resultSelector Result selector function to invoke with the last elements of both sequences.
  370. * @returns {Observable} An observable sequence with the result of calling the selector function with the last elements of both input sequences.
  371. */
  372. observableProto.forkJoin = function () {
  373. var len = arguments.length, args = new Array(len);
  374. for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
  375. if (Array.isArray(args[0])) {
  376. args[0].unshift(this);
  377. } else {
  378. args.unshift(this);
  379. }
  380. return Observable.forkJoin.apply(null, args);
  381. };
  382. /**
  383. * Comonadic bind operator.
  384. * @param {Function} selector A transform function to apply to each element.
  385. * @param {Object} scheduler Scheduler used to execute the operation. If not specified, defaults to the ImmediateScheduler.
  386. * @returns {Observable} An observable sequence which results from the comonadic bind operation.
  387. */
  388. observableProto.manySelect = observableProto.extend = function (selector, scheduler) {
  389. isScheduler(scheduler) || (scheduler = Rx.Scheduler.immediate);
  390. var source = this;
  391. return observableDefer(function () {
  392. var chain;
  393. return source
  394. .map(function (x) {
  395. var curr = new ChainObservable(x);
  396. chain && chain.onNext(x);
  397. chain = curr;
  398. return curr;
  399. })
  400. .tap(
  401. noop,
  402. function (e) { chain && chain.onError(e); },
  403. function () { chain && chain.onCompleted(); }
  404. )
  405. .observeOn(scheduler)
  406. .map(selector);
  407. }, source);
  408. };
  409. var ChainObservable = (function (__super__) {
  410. inherits(ChainObservable, __super__);
  411. function ChainObservable(head) {
  412. __super__.call(this);
  413. this.head = head;
  414. this.tail = new AsyncSubject();
  415. }
  416. addProperties(ChainObservable.prototype, Observer, {
  417. _subscribe: function (o) {
  418. var g = new CompositeDisposable();
  419. g.add(currentThreadScheduler.schedule(this, function (_, self) {
  420. o.onNext(self.head);
  421. g.add(self.tail.mergeAll().subscribe(o));
  422. }));
  423. return g;
  424. },
  425. onCompleted: function () {
  426. this.onNext(Observable.empty());
  427. },
  428. onError: function (e) {
  429. this.onNext(Observable['throw'](e));
  430. },
  431. onNext: function (v) {
  432. this.tail.onNext(v);
  433. this.tail.onCompleted();
  434. }
  435. });
  436. return ChainObservable;
  437. }(Observable));
  438. var SwitchFirstObservable = (function (__super__) {
  439. inherits(SwitchFirstObservable, __super__);
  440. function SwitchFirstObservable(source) {
  441. this.source = source;
  442. __super__.call(this);
  443. }
  444. SwitchFirstObservable.prototype.subscribeCore = function (o) {
  445. var m = new SingleAssignmentDisposable(),
  446. g = new CompositeDisposable(),
  447. state = {
  448. hasCurrent: false,
  449. isStopped: false,
  450. o: o,
  451. g: g
  452. };
  453. g.add(m);
  454. m.setDisposable(this.source.subscribe(new SwitchFirstObserver(state)));
  455. return g;
  456. };
  457. return SwitchFirstObservable;
  458. }(ObservableBase));
  459. var SwitchFirstObserver = (function(__super__) {
  460. inherits(SwitchFirstObserver, __super__);
  461. function SwitchFirstObserver(state) {
  462. this._s = state;
  463. __super__.call(this);
  464. }
  465. SwitchFirstObserver.prototype.next = function (x) {
  466. if (!this._s.hasCurrent) {
  467. this._s.hasCurrent = true;
  468. isPromise(x) && (x = observableFromPromise(x));
  469. var inner = new SingleAssignmentDisposable();
  470. this._s.g.add(inner);
  471. inner.setDisposable(x.subscribe(new InnerObserver(this._s, inner)));
  472. }
  473. };
  474. SwitchFirstObserver.prototype.error = function (e) {
  475. this._s.o.onError(e);
  476. };
  477. SwitchFirstObserver.prototype.completed = function () {
  478. this._s.isStopped = true;
  479. !this._s.hasCurrent && this._s.g.length === 1 && this._s.o.onCompleted();
  480. };
  481. inherits(InnerObserver, __super__);
  482. function InnerObserver(state, inner) {
  483. this._s = state;
  484. this._i = inner;
  485. __super__.call(this);
  486. }
  487. InnerObserver.prototype.next = function (x) { this._s.o.onNext(x); };
  488. InnerObserver.prototype.error = function (e) { this._s.o.onError(e); };
  489. InnerObserver.prototype.completed = function () {
  490. this._s.g.remove(this._i);
  491. this._s.hasCurrent = false;
  492. this._s.isStopped && this._s.g.length === 1 && this._s.o.onCompleted();
  493. };
  494. return SwitchFirstObserver;
  495. }(AbstractObserver));
  496. /**
  497. * Performs a exclusive waiting for the first to finish before subscribing to another observable.
  498. * Observables that come in between subscriptions will be dropped on the floor.
  499. * @returns {Observable} A exclusive observable with only the results that happen when subscribed.
  500. */
  501. observableProto.switchFirst = function () {
  502. return new SwitchFirstObservable(this);
  503. };
  504. observableProto.flatMapFirst = observableProto.exhaustMap = function(selector, resultSelector, thisArg) {
  505. return new FlatMapObservable(this, selector, resultSelector, thisArg).switchFirst();
  506. };
  507. observableProto.flatMapWithMaxConcurrent = observableProto.flatMapMaxConcurrent = function(limit, selector, resultSelector, thisArg) {
  508. return new FlatMapObservable(this, selector, resultSelector, thisArg).merge(limit);
  509. };
  510. return Rx;
  511. }));