From e16778d3715436b0ecaa60cc7ad32da45b4bd22b Mon Sep 17 00:00:00 2001 From: Pete Bacon Darwin Date: Sun, 23 May 2021 22:50:50 +0100 Subject: [PATCH] docs: fix multicasting observable example (#42258) The example did not allow the async sequence to be cancelled after the first timeout had completed. Fixes #25804 PR Close #42258 --- .../observables/src/multicasting.spec.ts | 56 +++++++++++++++++-- .../examples/observables/src/multicasting.ts | 40 ++++++++----- 2 files changed, 77 insertions(+), 19 deletions(-) diff --git a/aio/content/examples/observables/src/multicasting.spec.ts b/aio/content/examples/observables/src/multicasting.spec.ts index 56e85cad4f..58f9d280e6 100644 --- a/aio/content/examples/observables/src/multicasting.spec.ts +++ b/aio/content/examples/observables/src/multicasting.spec.ts @@ -1,3 +1,4 @@ +import { Observable, Subscription } from 'rxjs'; import { docRegionDelaySequence, docRegionMulticastSequence } from './multicasting'; describe('multicasting', () => { @@ -26,23 +27,70 @@ describe('multicasting', () => { ['1st subscribe: 3'], ['1st sequence finished.'], ['2nd subscribe: 3'], - ['2nd sequence finished.'] + ['2nd sequence finished.'], ]); }); it('should create an observable and multicast the emissions', () => { const consoleSpy = jasmine.createSpyObj('console', ['log']); - docRegionMulticastSequence(consoleSpy); + docRegionMulticastSequence(consoleSpy, /* runSequence */ true); jasmine.clock().tick(10000); - expect(consoleSpy.log).toHaveBeenCalledTimes(7); + expect(consoleSpy.log).toHaveBeenCalledTimes(10); expect(consoleSpy.log.calls.allArgs()).toEqual([ + ['Emitting 1'], ['1st subscribe: 1'], + ['Emitting 2'], ['1st subscribe: 2'], ['2nd subscribe: 2'], + ['Emitting 3'], ['1st subscribe: 3'], ['2nd subscribe: 3'], ['1st sequence finished.'], - ['2nd sequence finished.'] + ['2nd sequence finished.'], ]); }); + + it('should stop the sequence emission when the last observer unsubscribes from a multicast observable', + () => { + const consoleSpy = jasmine.createSpyObj('console', ['log']); + const multicastSequenceSubscriber = + docRegionMulticastSequence(consoleSpy, /* runSequence */ false); + + const multicastSequence = new Observable(multicastSequenceSubscriber()); + + const subscription1 = multicastSequence.subscribe({ + next(num) { + consoleSpy.log('1st subscribe: ' + num); + }, + complete() { + consoleSpy.log('1st sequence finished.'); + } + }); + + let subscription2: Subscription; + setTimeout(() => { + subscription2 = multicastSequence.subscribe({ + next(num) { + consoleSpy.log('2nd subscribe: ' + num); + }, + complete() { + consoleSpy.log('2nd sequence finished.'); + } + }); + }, 1500); + + setTimeout(() => subscription1.unsubscribe(), 2500); + setTimeout(() => subscription2.unsubscribe(), 2800); + + jasmine.clock().tick(5000); + + expect(consoleSpy.log).toHaveBeenCalledTimes(5); + expect(consoleSpy.log.calls.allArgs()).toEqual([ + ['Emitting 1'], + ['1st subscribe: 1'], + ['Emitting 2'], + ['1st subscribe: 2'], + ['2nd subscribe: 2'], + ]); + }); }); diff --git a/aio/content/examples/observables/src/multicasting.ts b/aio/content/examples/observables/src/multicasting.ts index 45a8922e2b..2401802b08 100644 --- a/aio/content/examples/observables/src/multicasting.ts +++ b/aio/content/examples/observables/src/multicasting.ts @@ -76,7 +76,10 @@ export function docRegionDelaySequence(console: Console) { // #enddocregion subscribe_twice } -export function docRegionMulticastSequence(console: Console) { +export function docRegionMulticastSequence(console: Console, runSequence: boolean) { + if (!runSequence) { + return multicastSequenceSubscriber; + } // #docregion multicast_sequence function multicastSequenceSubscriber() { const seq = [1, 2, 3]; @@ -92,7 +95,7 @@ export function docRegionMulticastSequence(console: Console) { observers.push(observer); // When this is the first subscription, start the sequence if (observers.length === 1) { - timeoutId = doSequence({ + const multicastObserver: Observer = { next(val) { // Iterate through observers and notify all subscriptions observers.forEach(obs => obs.next(val)); @@ -102,7 +105,8 @@ export function docRegionMulticastSequence(console: Console) { // Notify all complete callbacks observers.slice(0).forEach(obs => obs.complete()); } - }, seq, 0); + }; + doSequence(multicastObserver, seq, 0); } return { @@ -115,20 +119,21 @@ export function docRegionMulticastSequence(console: Console) { } } }; - }; - } - // Run through an array of numbers, emitting one value - // per second until it gets to the end of the array. - function doSequence(observer: Observer, arr: number[], idx: number) { - return setTimeout(() => { - observer.next(arr[idx]); - if (idx === arr.length - 1) { - observer.complete(); - } else { - doSequence(observer, arr, ++idx); + // Run through an array of numbers, emitting one value + // per second until it gets to the end of the array. + function doSequence(sequenceObserver: Observer, arr: number[], idx: number) { + timeoutId = setTimeout(() => { + console.log('Emitting ' + arr[idx]); + sequenceObserver.next(arr[idx]); + if (idx === arr.length - 1) { + sequenceObserver.complete(); + } else { + doSequence(sequenceObserver, arr, ++idx); + } + }, 1000); } - }, 1000); + }; } // Create a new Observable that will deliver the above sequence @@ -149,13 +154,18 @@ export function docRegionMulticastSequence(console: Console) { }, 1500); // Logs: + // (at 1 second): Emitting 1 // (at 1 second): 1st subscribe: 1 + // (at 2 seconds): Emitting 2 // (at 2 seconds): 1st subscribe: 2 // (at 2 seconds): 2nd subscribe: 2 + // (at 3 seconds): Emitting 3 // (at 3 seconds): 1st subscribe: 3 // (at 3 seconds): 1st sequence finished // (at 3 seconds): 2nd subscribe: 3 // (at 3 seconds): 2nd sequence finished // #enddocregion multicast_sequence + + return multicastSequenceSubscriber; }