// rx.joinpatterns.js (11 KB)
  1. // Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
  2. ;(function (factory) {
  3. var objectTypes = {
  4. 'function': true,
  5. 'object': true
  6. };
  7. function checkGlobal(value) {
  8. return (value && value.Object === Object) ? value : null;
  9. }
  10. var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
  11. var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
  12. var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
  13. var freeSelf = checkGlobal(objectTypes[typeof self] && self);
  14. var freeWindow = checkGlobal(objectTypes[typeof window] && window);
  15. var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
  16. var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
  17. var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();
  18. // Because of build optimizers
  19. if (typeof define === 'function' && define.amd) {
  20. define(['./rx'], function (Rx, exports) {
  21. return factory(root, exports, Rx);
  22. });
  23. } else if (typeof module === 'object' && module && module.exports === freeExports) {
  24. module.exports = factory(root, module.exports, require('./rx'));
  25. } else {
  26. root.Rx = factory(root, {}, root.Rx);
  27. }
  28. }.call(this, function (root, exp, Rx, undefined) {
  // Local shortcuts to the pieces of the Rx core this add-on relies on.
  var Observable = Rx.Observable;
  var observableProto = Observable.prototype;
  var AnonymousObservable = Rx.AnonymousObservable;
  var observableThrow = Observable.throwError;
  var observerCreate = Rx.Observer.create;
  var SingleAssignmentDisposable = Rx.SingleAssignmentDisposable;
  var CompositeDisposable = Rx.CompositeDisposable;
  var AbstractObserver = Rx.internals.AbstractObserver;
  var noop = Rx.helpers.noop;
  var inherits = Rx.internals.inherits;
  var isFunction = Rx.helpers.isFunction;
  // Shared sentinel returned by a wrapped call that threw; the caught value is
  // stored on its `e` property and overwritten by the next failure.
  var errorObj = {e: {}};

  /**
   * Builds a wrapper around `tryCatchTarget` that forwards `this` and all
   * arguments, and returns `errorObj` instead of throwing.
   */
  function tryCatcherGen(tryCatchTarget) {
    return function tryCatcher() {
      var outcome;
      try {
        outcome = tryCatchTarget.apply(this, arguments);
      } catch (caught) {
        errorObj.e = caught;
        outcome = errorObj;
      }
      return outcome;
    };
  }

  /**
   * Returns a non-throwing version of `fn`; also published as `Rx.internals.tryCatch`.
   * @param {Function} fn Function to guard.
   * @returns {Function} Wrapper that yields `errorObj` when `fn` throws.
   * @throws {TypeError} When `fn` is not a function.
   */
  var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
    if (isFunction(fn)) { return tryCatcherGen(fn); }
    throw new TypeError('fn must be a function');
  };
  /**
   * Throws the value it is given, unchanged.
   * @param {*} error Value to throw.
   */
  function thrower(error) { throw error; }
  /**
   * Map implementation used for keyed lookups in this file: the host's native
   * `Map` when `root.Map` exists, otherwise a minimal array-backed fallback.
   *
   * The fallback keeps keys and values in two parallel arrays, so every
   * operation is a linear scan. It provides `size`, `delete`, `has`, `get`,
   * `set` and `forEach`.
   */
  var Map = root.Map || (function () {
    /**
     * Finds the position of `key` in `keys`. `indexOf` compares with `===`
     * and can never find NaN, so NaN is looked up explicitly; this keeps
     * NaN keys from being duplicated on every `set`.
     */
    function indexOfKey(keys, key) {
      if (key === key) { return keys.indexOf(key); }
      for (var i = 0, len = keys.length; i < len; i++) {
        if (keys[i] !== keys[i]) { return i; }
      }
      return -1;
    }

    /** @constructor Creates an empty map. */
    function Map() {
      this.size = 0;
      this._values = [];
      this._keys = [];
    }

    /**
     * Removes the entry stored under `key`.
     * @returns {boolean} `true` when an entry was removed, `false` when the key was absent.
     */
    Map.prototype['delete'] = function (key) {
      var i = indexOfKey(this._keys, key);
      if (i === -1) { return false; }
      this._values.splice(i, 1);
      this._keys.splice(i, 1);
      this.size--;
      return true;
    };

    /**
     * @returns {boolean} Whether an entry exists for `key`.
     */
    Map.prototype.has = function (key) {
      return indexOfKey(this._keys, key) !== -1;
    };

    /**
     * @returns {*} The value stored under `key`, or `undefined` when the key is absent.
     */
    Map.prototype.get = function (key) {
      var i = indexOfKey(this._keys, key);
      return i === -1 ? undefined : this._values[i];
    };

    /**
     * Adds a new entry or overwrites the value of an existing one.
     * @returns {Map} This map, so calls can be chained.
     */
    Map.prototype.set = function (key, value) {
      var i = indexOfKey(this._keys, key);
      if (i === -1) {
        this._keys.push(key);
        this._values.push(value);
        this.size++;
      } else {
        this._values[i] = value;
      }
      return this;
    };

    /**
     * Calls `cb(value, key, map)` for every entry in insertion order.
     * @param {Function} cb Callback to invoke per entry.
     * @param {*} [thisArg] Value used as `this` inside `cb`.
     */
    Map.prototype.forEach = function (cb, thisArg) {
      for (var i = 0; i < this.size; i++) {
        cb.call(thisArg, this._values[i], this._keys[i], this);
      }
    };

    return Map;
  }());
  /**
   * Represents a join pattern over observable sequences.
   * @constructor
   * @param {Array} patterns Observable sequences that make up the pattern.
   */
  function Pattern(patterns) {
    this.patterns = patterns;
  }

  /**
   * Creates a pattern that matches when the current pattern matches and the specified observable sequence also has an available value.
   * @param other Observable sequence to match in addition to the current pattern.
   * @return {Pattern} Pattern object that matches when all observable sequences in the pattern have an available value.
   */
  Pattern.prototype.and = function (other) {
    var extended = this.patterns.concat(other);
    return new Pattern(extended);
  };

  /**
   * Matches when every observable sequence in the pattern (built with a chain of `and` calls) has an available value, and projects those values.
   * @param {Function} selector Selector invoked with the available values, in the order of the sequences in the pattern.
   * @return {Plan} Plan that produces the projected values, to be fed (with other plans) to the when operator.
   */
  Pattern.prototype.thenDo = function (selector) {
    var plan = new Plan(this, selector);
    return plan;
  };
  /**
   * Pairs a pattern with the selector that projects its matched values.
   * @constructor
   * @param {Pattern} expression Pattern whose `patterns` list is read by `activate`.
   * @param {Function} selector Projection applied to the matched values.
   */
  function Plan(expression, selector) {
    // Stored as given; neither argument is validated here.
    this.expression = expression;
    this.selector = selector;
  }
  /**
   * Builds a callback that forwards an error to `o.onError`.
   * @param {Object} o Observer-like object exposing `onError`.
   * @returns {Function} Callback taking the error; it returns nothing.
   */
  function handleOnError(o) {
    return function (error) {
      o.onError(error);
    };
  }
  /**
   * Builds the callback an active plan invokes with its matched values.
   * The callback runs `self.selector` (with `self` as `this`) through
   * `tryCatch`; a thrown error goes to `observer.onError`, any other result
   * goes to `observer.onNext`.
   * @param {Plan} self Plan providing the selector.
   * @param {Object} observer Observer receiving the projected value or the error.
   * @returns {Function} Callback accepting the matched values as arguments.
   */
  function handleOnNext(self, observer) {
    return function onNext() {
      var guardedSelector = tryCatch(self.selector);
      var projected = guardedSelector.apply(self, arguments);
      if (projected !== errorObj) {
        observer.onNext(projected);
        return;
      }
      // The selector threw: report the captured error instead of a value.
      return observer.onError(projected.e);
    };
  }
  /**
   * Turns this plan into an ActivePlan wired to one join observer per source
   * in the pattern.
   * @param {Map} externalSubscriptions Source-to-join-observer map shared between plans.
   * @param {Object} observer Observer that receives projected values and errors.
   * @param {Function} deactivate Called with the active plan once it has completed.
   * @returns {ActivePlan} The newly created active plan.
   */
  Plan.prototype.activate = function (externalSubscriptions, observer, deactivate) {
    var forwardError = handleOnError(observer);

    // One join observer per pattern entry; sources already seen are reused.
    var joinObservers = this.expression.patterns.map(function (source) {
      return planCreateObserver(externalSubscriptions, source, forwardError);
    });

    // The completion callback refers to `activePlan`, which is assigned just
    // below; it can only run after construction has finished.
    var activePlan = new ActivePlan(joinObservers, handleOnNext(this, observer), function () {
      joinObservers.forEach(function (joinObserver) {
        joinObserver.removeActivePlan(activePlan);
      });
      deactivate(activePlan);
    });

    joinObservers.forEach(function (joinObserver) {
      joinObserver.addActivePlan(activePlan);
    });
    return activePlan;
  };
  /**
   * Returns the join observer registered for `observable`, creating and
   * registering a new one when the map has no entry for it yet.
   * @param {Map} externalSubscriptions Source-to-join-observer map.
   * @param {Observable} observable Source sequence used as the lookup key.
   * @param {Function} onError Error callback handed to a newly created join observer.
   * @returns {JoinObserver} The existing or newly created join observer.
   */
  function planCreateObserver(externalSubscriptions, observable, onError) {
    var existing = externalSubscriptions.get(observable);
    if (existing) { return existing; }

    var created = new JoinObserver(observable, onError);
    externalSubscriptions.set(observable, created);
    return created;
  }
  /**
   * A plan that is currently listening to its join observers.
   * @constructor
   * @param {Array} joinObserverArray Join observers in pattern order (the same observer may appear more than once).
   * @param {Function} onNext Invoked with one value per join observer when all of them have a value.
   * @param {Function} onCompleted Invoked when a match finds a completion notification.
   */
  function ActivePlan(joinObserverArray, onNext, onCompleted) {
    this.joinObserverArray = joinObserverArray;
    this.onNext = onNext;
    this.onCompleted = onCompleted;

    // Keyed by the observer itself, so each distinct observer is held once.
    var distinctObservers = new Map();
    this.joinObservers = distinctObservers;
    joinObserverArray.forEach(function (joinObserver) {
      distinctObservers.set(joinObserver, joinObserver);
    });
  }

  /**
   * Drops the head notification of every distinct join observer's queue.
   */
  ActivePlan.prototype.dequeue = function () {
    this.joinObservers.forEach(function (joinObserver) {
      joinObserver.queue.shift();
    });
  };

  /**
   * Tries to fire the plan. Nothing happens unless every join observer has a
   * queued notification. When any head notification has kind 'C' the plan
   * completes; otherwise the heads are dequeued and their values are passed
   * to `onNext`.
   */
  ActivePlan.prototype.match = function () {
    var observers = this.joinObserverArray;

    var everyQueueHasHead = observers.every(function (joinObserver) {
      return joinObserver.queue.length !== 0;
    });
    if (!everyQueueHasHead) { return; }

    // Capture the heads before dequeuing; an observer listed twice
    // contributes the same notification twice.
    var heads = observers.map(function (joinObserver) {
      return joinObserver.queue[0];
    });

    var completionSeen = false;
    heads.forEach(function (notification) {
      if (notification.kind === 'C') { completionSeen = true; }
    });

    if (completionSeen) {
      this.onCompleted();
      return;
    }

    this.dequeue();
    var values = heads.map(function (notification) {
      return notification.value;
    });
    this.onNext.apply(this, values);
  };
  /**
   * Observer that buffers the materialized notifications of one source and
   * offers them to every active plan that includes that source.
   * Extends `AbstractObserver`.
   */
  var JoinObserver = (function (__super__) {
    inherits(JoinObserver, __super__);

    /**
     * @constructor
     * @param {Observable} source Sequence subscribed to by `subscribe()`.
     * @param {Function} onError Callback receiving the error carried by an 'E' notification.
     */
    function JoinObserver(source, onError) {
      __super__.call(this);
      this.source = source;
      // Stored as an own property, so it takes precedence over any `onError`
      // found on the prototype chain.
      this.onError = onError;
      this.queue = [];
      this.activePlans = [];
      this.subscription = new SingleAssignmentDisposable();
      this.isDisposed = false;
    }

    var JoinObserverPrototype = JoinObserver.prototype;

    /**
     * Handles one materialized notification: an 'E' notification is forwarded
     * to `onError`; anything else is queued and every active plan is asked to
     * match. Ignored once the observer is disposed.
     */
    JoinObserverPrototype.next = function (notification) {
      if (!this.isDisposed) {
        if (notification.kind === 'E') {
          return this.onError(notification.error);
        }
        this.queue.push(notification);
        // Iterate over a copy: matching may remove plans from `activePlans`.
        var activePlans = this.activePlans.slice(0);
        for (var i = 0, len = activePlans.length; i < len; i++) {
          activePlans[i].match();
        }
      }
    };

    // No-ops here: the subscription is to `source.materialize()` (see
    // `subscribe`), and `next` handles the 'E' notification kind itself.
    JoinObserverPrototype.error = noop;
    JoinObserverPrototype.completed = noop;

    /** Registers a plan to be offered every queued notification. */
    JoinObserverPrototype.addActivePlan = function (activePlan) {
      this.activePlans.push(activePlan);
    };

    /** Subscribes this observer to the materialized source sequence. */
    JoinObserverPrototype.subscribe = function () {
      this.subscription.setDisposable(this.source.materialize().subscribe(this));
    };

    /**
     * Unregisters a plan and disposes the observer once no plan is left.
     * A plan that is not registered is ignored: passing `indexOf`'s -1 to
     * `splice` would otherwise remove the last, unrelated plan.
     */
    JoinObserverPrototype.removeActivePlan = function (activePlan) {
      var idx = this.activePlans.indexOf(activePlan);
      if (idx !== -1) { this.activePlans.splice(idx, 1); }
      this.activePlans.length === 0 && this.dispose();
    };

    /** Disposes the base observer and, the first time only, the source subscription. */
    JoinObserverPrototype.dispose = function () {
      __super__.prototype.dispose.call(this);
      if (!this.isDisposed) {
        this.isDisposed = true;
        this.subscription.dispose();
      }
    };

    return JoinObserver;
  } (AbstractObserver));
  /**
   * Starts a join pattern from this sequence and one other sequence; the
   * pattern matches when both have an available value.
   *
   * @param right Observable sequence to match with the current sequence.
   * @return {Pattern} Pattern object that matches when both observable sequences have an available value.
   */
  observableProto.and = function (right) {
    var sources = [this, right];
    return new Pattern(sources);
  };
  /**
   * Builds a plan from this sequence alone: it matches whenever the sequence
   * has an available value and projects that value.
   *
   * @param {Function} selector Selector that will be invoked for values in the source sequence.
   * @returns {Plan} Plan that produces the projected values, to be fed (with other plans) to the when operator.
   */
  observableProto.thenDo = function (selector) {
    var singleSourcePattern = new Pattern([this]);
    return singleSourcePattern.thenDo(selector);
  };
  /**
   * Joins together the results from several patterns.
   *
   * @param plans A series of plans (specified as an Array or as a series of arguments) created by calling thenDo on patterns.
   * @returns {Observable} Observable sequence with the results from matching several patterns.
   */
  Observable.when = function () {
    var len = arguments.length, plans;
    if (Array.isArray(arguments[0])) {
      plans = arguments[0];
    } else {
      plans = new Array(len);
      for (var i = 0; i < len; i++) { plans[i] = arguments[i]; }
    }

    return new AnonymousObservable(function (o) {
      var activePlans = [],
        externalSubscriptions = new Map();

      // Observer handed to every plan: values and completion go straight to
      // `o`; an error is first passed to every join observer's `onError`.
      var outObserver = observerCreate(
        function (x) { o.onNext(x); },
        function (err) {
          externalSubscriptions.forEach(function (v) { v.onError(err); });
          o.onError(err);
        },
        function () { o.onCompleted(); }
      );

      // Removes a finished plan and completes the output once none is left.
      // A plan that is no longer tracked is ignored: passing `indexOf`'s -1
      // to `splice` would drop the last live plan and could complete early.
      function deactivate(activePlan) {
        var idx = activePlans.indexOf(activePlan);
        if (idx === -1) { return; }
        activePlans.splice(idx, 1);
        activePlans.length === 0 && o.onCompleted();
      }

      try {
        for (var i = 0, len = plans.length; i < len; i++) {
          activePlans.push(plans[i].activate(externalSubscriptions, outObserver, deactivate));
        }
      } catch (e) {
        // Activation failed: report the error to `o` through a throwing sequence.
        return observableThrow(e).subscribe(o);
      }

      // Subscribe only after every plan is activated, and return the join
      // observers as one disposable group.
      var group = new CompositeDisposable();
      externalSubscriptions.forEach(function (joinObserver) {
        joinObserver.subscribe();
        group.add(joinObserver);
      });
      return group;
    });
  };
  302. return Rx;
  303. }));