docs: fix multicasting observable example (#42258)

The example did not allow the async sequence to be cancelled
after the first timeout had completed.

Fixes #25804

PR Close #42258
This commit is contained in:
Pete Bacon Darwin 2021-05-23 22:50:50 +01:00 committed by Zach Arend
parent b59d2b05bc
commit e16778d371
2 changed files with 77 additions and 19 deletions

View File

@ -1,3 +1,4 @@
import { Observable, Subscription } from 'rxjs';
import { docRegionDelaySequence, docRegionMulticastSequence } from './multicasting';
describe('multicasting', () => {
@ -26,23 +27,70 @@ describe('multicasting', () => {
['1st subscribe: 3'],
['1st sequence finished.'],
['2nd subscribe: 3'],
['2nd sequence finished.']
['2nd sequence finished.'],
]);
});
it('should create an observable and multicast the emissions', () => {
const consoleSpy = jasmine.createSpyObj<Console>('console', ['log']);
docRegionMulticastSequence(consoleSpy);
docRegionMulticastSequence(consoleSpy, /* runSequence */ true);
jasmine.clock().tick(10000);
expect(consoleSpy.log).toHaveBeenCalledTimes(7);
expect(consoleSpy.log).toHaveBeenCalledTimes(10);
expect(consoleSpy.log.calls.allArgs()).toEqual([
['Emitting 1'],
['1st subscribe: 1'],
['Emitting 2'],
['1st subscribe: 2'],
['2nd subscribe: 2'],
['Emitting 3'],
['1st subscribe: 3'],
['2nd subscribe: 3'],
['1st sequence finished.'],
['2nd sequence finished.']
['2nd sequence finished.'],
]);
});
it('should stop the sequence emission when the last observer unsubscribes from a multicast observable',
() => {
const consoleSpy = jasmine.createSpyObj<Console>('console', ['log']);
const multicastSequenceSubscriber =
docRegionMulticastSequence(consoleSpy, /* runSequence */ false);
const multicastSequence = new Observable(multicastSequenceSubscriber());
const subscription1 = multicastSequence.subscribe({
next(num) {
consoleSpy.log('1st subscribe: ' + num);
},
complete() {
consoleSpy.log('1st sequence finished.');
}
});
let subscription2: Subscription;
setTimeout(() => {
subscription2 = multicastSequence.subscribe({
next(num) {
consoleSpy.log('2nd subscribe: ' + num);
},
complete() {
consoleSpy.log('2nd sequence finished.');
}
});
}, 1500);
setTimeout(() => subscription1.unsubscribe(), 2500);
setTimeout(() => subscription2.unsubscribe(), 2800);
jasmine.clock().tick(5000);
expect(consoleSpy.log).toHaveBeenCalledTimes(5);
expect(consoleSpy.log.calls.allArgs()).toEqual([
['Emitting 1'],
['1st subscribe: 1'],
['Emitting 2'],
['1st subscribe: 2'],
['2nd subscribe: 2'],
]);
});
});

View File

@ -76,7 +76,10 @@ export function docRegionDelaySequence(console: Console) {
// #enddocregion subscribe_twice
}
export function docRegionMulticastSequence(console: Console) {
export function docRegionMulticastSequence(console: Console, runSequence: boolean) {
if (!runSequence) {
return multicastSequenceSubscriber;
}
// #docregion multicast_sequence
function multicastSequenceSubscriber() {
const seq = [1, 2, 3];
@ -92,7 +95,7 @@ export function docRegionMulticastSequence(console: Console) {
observers.push(observer);
// When this is the first subscription, start the sequence
if (observers.length === 1) {
timeoutId = doSequence({
const multicastObserver: Observer<number> = {
next(val) {
// Iterate through observers and notify all subscriptions
observers.forEach(obs => obs.next(val));
@ -102,7 +105,8 @@ export function docRegionMulticastSequence(console: Console) {
// Notify all complete callbacks
observers.slice(0).forEach(obs => obs.complete());
}
}, seq, 0);
};
doSequence(multicastObserver, seq, 0);
}
return {
@ -115,20 +119,21 @@ export function docRegionMulticastSequence(console: Console) {
}
}
};
};
}
// Run through an array of numbers, emitting one value
// per second until it gets to the end of the array.
function doSequence(observer: Observer<number>, arr: number[], idx: number) {
return setTimeout(() => {
observer.next(arr[idx]);
if (idx === arr.length - 1) {
observer.complete();
} else {
doSequence(observer, arr, ++idx);
// Run through an array of numbers, emitting one value
// per second until it gets to the end of the array.
function doSequence(sequenceObserver: Observer<number>, arr: number[], idx: number) {
timeoutId = setTimeout(() => {
console.log('Emitting ' + arr[idx]);
sequenceObserver.next(arr[idx]);
if (idx === arr.length - 1) {
sequenceObserver.complete();
} else {
doSequence(sequenceObserver, arr, ++idx);
}
}, 1000);
}
}, 1000);
};
}
// Create a new Observable that will deliver the above sequence
@ -149,13 +154,18 @@ export function docRegionMulticastSequence(console: Console) {
}, 1500);
// Logs:
// (at 1 second): Emitting 1
// (at 1 second): 1st subscribe: 1
// (at 2 seconds): Emitting 2
// (at 2 seconds): 1st subscribe: 2
// (at 2 seconds): 2nd subscribe: 2
// (at 3 seconds): Emitting 3
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3 seconds): 2nd subscribe: 3
// (at 3 seconds): 2nd sequence finished
// #enddocregion multicast_sequence
return multicastSequenceSubscriber;
}