ReplaySubject.ts 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. import { Subject } from './Subject';
  2. import { IScheduler } from './Scheduler';
  3. import { queue } from './scheduler/queue';
  4. import { Subscriber } from './Subscriber';
  5. import { Subscription } from './Subscription';
  6. import { ObserveOnSubscriber } from './operators/observeOn';
  7. import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
  8. import { SubjectSubscription } from './SubjectSubscription';
  /**
   * A {@link Subject} that records the values passed to `next` and replays the
   * retained ones to each new subscriber before that subscriber sees anything
   * else.
   *
   * Retention is bounded in two ways: at most `bufferSize` events are kept, and
   * events whose age (measured with `_getNow()`) has reached `windowTime` are
   * dropped. Both limits are applied whenever a value is recorded and whenever
   * a subscriber attaches.
   *
   * @class ReplaySubject<T>
   */
  export class ReplaySubject<T> extends Subject<T> {
    /** Retained events, oldest first. */
    private _events: ReplayEvent<T>[] = [];
    /** Maximum number of retained events (never below 1). */
    private _bufferSize: number;
    /** Maximum age of a retained event, in the units of `_getNow()` (never below 1). */
    private _windowTime: number;

    /**
     * @param bufferSize Maximum number of events to retain. Values below 1 are
     * raised to 1. Defaults to no limit.
     * @param windowTime Maximum age of a retained event, expressed in the same
     * units as the clock returned by `_getNow()`. Values below 1 are raised to 1.
     * Defaults to no limit.
     * @param scheduler Optional scheduler. When given, its `now()` is used as the
     * clock and replayed/terminal notifications for each subscriber are routed
     * through an `ObserveOnSubscriber` bound to it.
     */
    constructor(bufferSize: number = Number.POSITIVE_INFINITY,
                windowTime: number = Number.POSITIVE_INFINITY,
                private scheduler?: IScheduler) {
      super();
      this._bufferSize = bufferSize < 1 ? 1 : bufferSize;
      this._windowTime = windowTime < 1 ? 1 : windowTime;
    }

    /**
     * Records `value` for future replay and forwards it to the base class.
     *
     * @param value The value to record and emit.
     */
    next(value: T): void {
      // Only record while the subject is still live. Without this guard a value
      // sent after the subject stopped would sit in the buffer and be replayed
      // to late subscribers ahead of the terminal notification.
      if (!this.isStopped) {
        this._events.push(new ReplayEvent(this._getNow(), value));
        this._trimBufferThenGetEvents();
      }
      // Always delegate so the base class applies its own handling of
      // stopped/closed subjects (NOTE(review): defined in ./Subject, not visible here).
      super.next(value);
    }

    /**
     * Attaches `subscriber`, replays the retained events to it, and then relays
     * any stored error or completion.
     *
     * @param subscriber The subscriber to attach.
     * @returns `Subscription.EMPTY` when the subject has errored or is stopped;
     * otherwise a `SubjectSubscription` tying `subscriber` to this subject.
     * @throws ObjectUnsubscribedError when the subject is closed.
     * @deprecated internal use only
     */
    _subscribe(subscriber: Subscriber<T>): Subscription {
      if (this.closed) {
        throw new ObjectUnsubscribedError();
      }

      const events = this._trimBufferThenGetEvents();
      const scheduler = this.scheduler;

      let subscription: Subscription;
      if (this.hasError || this.isStopped) {
        // Nothing live to listen to; the subscriber only gets the replay below.
        subscription = Subscription.EMPTY;
      } else {
        this.observers.push(subscriber);
        subscription = new SubjectSubscription(this, subscriber);
      }

      // The replay is delivered to `destination`. With a scheduler this is an
      // ObserveOnSubscriber wrapping the original subscriber, registered on the
      // original via `add`; the observers list above keeps the unwrapped one.
      let destination: Subscriber<T> = subscriber;
      if (scheduler) {
        destination = new ObserveOnSubscriber<T>(subscriber, scheduler);
        subscriber.add(destination);
      }

      // The length is captured once, and delivery stops as soon as the
      // destination reports itself closed.
      const len = events.length;
      for (let i = 0; i < len && !destination.closed; i++) {
        destination.next(events[i].value);
      }

      if (this.hasError) {
        destination.error(this.thrownError);
      } else if (this.isStopped) {
        destination.complete();
      }

      return subscription;
    }

    /**
     * @returns The current time according to the configured scheduler, or the
     * `queue` scheduler when none was supplied.
     */
    _getNow(): number {
      return (this.scheduler || queue).now();
    }

    /**
     * Drops events that are too old or exceed the buffer size, then returns the
     * internal event array itself (not a copy).
     */
    private _trimBufferThenGetEvents(): ReplayEvent<T>[] {
      const now = this._getNow();
      const _bufferSize = this._bufferSize;
      const _windowTime = this._windowTime;
      const _events = this._events;
      const eventsCount = _events.length;

      // Events are stored oldest first, so count from the front until the first
      // event that is still inside the time window.
      let spliceCount = 0;
      while (spliceCount < eventsCount && (now - _events[spliceCount].time) >= _windowTime) {
        spliceCount++;
      }

      // Enforce the size limit as well; whichever rule removes more wins.
      if (eventsCount > _bufferSize) {
        spliceCount = Math.max(spliceCount, eventsCount - _bufferSize);
      }

      if (spliceCount > 0) {
        _events.splice(0, spliceCount);
      }

      return _events;
    }
  }
  /**
   * One buffered emission: a value together with the clock reading taken when
   * it was recorded.
   */
  class ReplayEvent<T> {
    /**
     * @param time Clock reading at the moment the value was recorded.
     * @param value The recorded value.
     */
    constructor(public readonly time: number, public readonly value: T) {
    }
  }