ConnectableObservable.js 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. "use strict";
  2. var __extends = (this && this.__extends) || function (d, b) {
  3. for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p];
  4. function __() { this.constructor = d; }
  5. d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
  6. };
  7. var Subject_1 = require('../Subject');
  8. var Observable_1 = require('../Observable');
  9. var Subscriber_1 = require('../Subscriber');
  10. var Subscription_1 = require('../Subscription');
  11. var refCount_1 = require('../operators/refCount');
  12. /**
  13. * @class ConnectableObservable<T>
  14. */
  15. var ConnectableObservable = (function (_super) {
  16. __extends(ConnectableObservable, _super);
  17. function ConnectableObservable(/** @deprecated internal use only */ source,
  18. /** @deprecated internal use only */ subjectFactory) {
  19. _super.call(this);
  20. this.source = source;
  21. this.subjectFactory = subjectFactory;
  22. /** @deprecated internal use only */ this._refCount = 0;
  23. this._isComplete = false;
  24. }
  25. /** @deprecated internal use only */ ConnectableObservable.prototype._subscribe = function (subscriber) {
  26. return this.getSubject().subscribe(subscriber);
  27. };
  28. /** @deprecated internal use only */ ConnectableObservable.prototype.getSubject = function () {
  29. var subject = this._subject;
  30. if (!subject || subject.isStopped) {
  31. this._subject = this.subjectFactory();
  32. }
  33. return this._subject;
  34. };
  35. ConnectableObservable.prototype.connect = function () {
  36. var connection = this._connection;
  37. if (!connection) {
  38. this._isComplete = false;
  39. connection = this._connection = new Subscription_1.Subscription();
  40. connection.add(this.source
  41. .subscribe(new ConnectableSubscriber(this.getSubject(), this)));
  42. if (connection.closed) {
  43. this._connection = null;
  44. connection = Subscription_1.Subscription.EMPTY;
  45. }
  46. else {
  47. this._connection = connection;
  48. }
  49. }
  50. return connection;
  51. };
  52. ConnectableObservable.prototype.refCount = function () {
  53. return refCount_1.refCount()(this);
  54. };
  55. return ConnectableObservable;
  56. }(Observable_1.Observable));
  57. exports.ConnectableObservable = ConnectableObservable;
  58. var connectableProto = ConnectableObservable.prototype;
  59. exports.connectableObservableDescriptor = {
  60. operator: { value: null },
  61. _refCount: { value: 0, writable: true },
  62. _subject: { value: null, writable: true },
  63. _connection: { value: null, writable: true },
  64. _subscribe: { value: connectableProto._subscribe },
  65. _isComplete: { value: connectableProto._isComplete, writable: true },
  66. getSubject: { value: connectableProto.getSubject },
  67. connect: { value: connectableProto.connect },
  68. refCount: { value: connectableProto.refCount }
  69. };
  70. var ConnectableSubscriber = (function (_super) {
  71. __extends(ConnectableSubscriber, _super);
  72. function ConnectableSubscriber(destination, connectable) {
  73. _super.call(this, destination);
  74. this.connectable = connectable;
  75. }
  76. ConnectableSubscriber.prototype._error = function (err) {
  77. this._unsubscribe();
  78. _super.prototype._error.call(this, err);
  79. };
  80. ConnectableSubscriber.prototype._complete = function () {
  81. this.connectable._isComplete = true;
  82. this._unsubscribe();
  83. _super.prototype._complete.call(this);
  84. };
  85. /** @deprecated internal use only */ ConnectableSubscriber.prototype._unsubscribe = function () {
  86. var connectable = this.connectable;
  87. if (connectable) {
  88. this.connectable = null;
  89. var connection = connectable._connection;
  90. connectable._refCount = 0;
  91. connectable._subject = null;
  92. connectable._connection = null;
  93. if (connection) {
  94. connection.unsubscribe();
  95. }
  96. }
  97. };
  98. return ConnectableSubscriber;
  99. }(Subject_1.SubjectSubscriber));
  100. var RefCountOperator = (function () {
  101. function RefCountOperator(connectable) {
  102. this.connectable = connectable;
  103. }
  104. RefCountOperator.prototype.call = function (subscriber, source) {
  105. var connectable = this.connectable;
  106. connectable._refCount++;
  107. var refCounter = new RefCountSubscriber(subscriber, connectable);
  108. var subscription = source.subscribe(refCounter);
  109. if (!refCounter.closed) {
  110. refCounter.connection = connectable.connect();
  111. }
  112. return subscription;
  113. };
  114. return RefCountOperator;
  115. }());
  116. var RefCountSubscriber = (function (_super) {
  117. __extends(RefCountSubscriber, _super);
  118. function RefCountSubscriber(destination, connectable) {
  119. _super.call(this, destination);
  120. this.connectable = connectable;
  121. }
  122. /** @deprecated internal use only */ RefCountSubscriber.prototype._unsubscribe = function () {
  123. var connectable = this.connectable;
  124. if (!connectable) {
  125. this.connection = null;
  126. return;
  127. }
  128. this.connectable = null;
  129. var refCount = connectable._refCount;
  130. if (refCount <= 0) {
  131. this.connection = null;
  132. return;
  133. }
  134. connectable._refCount = refCount - 1;
  135. if (refCount > 1) {
  136. this.connection = null;
  137. return;
  138. }
  139. ///
  140. // Compare the local RefCountSubscriber's connection Subscription to the
  141. // connection Subscription on the shared ConnectableObservable. In cases
  142. // where the ConnectableObservable source synchronously emits values, and
  143. // the RefCountSubscriber's downstream Observers synchronously unsubscribe,
  144. // execution continues to here before the RefCountOperator has a chance to
  145. // supply the RefCountSubscriber with the shared connection Subscription.
  146. // For example:
  147. // ```
  148. // Observable.range(0, 10)
  149. // .publish()
  150. // .refCount()
  151. // .take(5)
  152. // .subscribe();
  153. // ```
  154. // In order to account for this case, RefCountSubscriber should only dispose
  155. // the ConnectableObservable's shared connection Subscription if the
  156. // connection Subscription exists, *and* either:
  157. // a. RefCountSubscriber doesn't have a reference to the shared connection
  158. // Subscription yet, or,
  159. // b. RefCountSubscriber's connection Subscription reference is identical
  160. // to the shared connection Subscription
  161. ///
  162. var connection = this.connection;
  163. var sharedConnection = connectable._connection;
  164. this.connection = null;
  165. if (sharedConnection && (!connection || sharedConnection === connection)) {
  166. sharedConnection.unsubscribe();
  167. }
  168. };
  169. return RefCountSubscriber;
  170. }(Subscriber_1.Subscriber));
  171. //# sourceMappingURL=ConnectableObservable.js.map