fix(core): fix proper propagation of subscriptions in EventEmitter (#22016)

Closes #21999

PR Close #22016
This commit is contained in:
Martin Sikora 2018-02-04 17:58:02 +01:00 committed by Alex Rickabaugh
parent f791e9f081
commit e81606c97a
2 changed files with 63 additions and 1 deletions

View File

@ -7,6 +7,7 @@
*/ */
import {Subject} from 'rxjs/Subject'; import {Subject} from 'rxjs/Subject';
import {Subscription} from 'rxjs/Subscription';
/** /**
* Use by directives and components to emit custom Events. * Use by directives and components to emit custom Events.
@ -111,6 +112,12 @@ export class EventEmitter<T> extends Subject<T> {
} }
} }
return super.subscribe(schedulerFn, errorFn, completeFn); const sink = super.subscribe(schedulerFn, errorFn, completeFn);
if (generatorOrNext instanceof Subscription) {
generatorOrNext.add(sink);
}
return sink;
} }
} }

View File

@ -7,6 +7,8 @@
*/ */
import {AsyncTestCompleter, beforeEach, describe, expect, inject, it} from '@angular/core/testing/src/testing_internal'; import {AsyncTestCompleter, beforeEach, describe, expect, inject, it} from '@angular/core/testing/src/testing_internal';
import {filter} from 'rxjs/operators';
import {EventEmitter} from '../src/event_emitter'; import {EventEmitter} from '../src/event_emitter';
{ {
@ -125,6 +127,59 @@ import {EventEmitter} from '../src/event_emitter';
expect(e.observers.length > 0).toBe(true); expect(e.observers.length > 0).toBe(true);
}); });
it('remove a subscriber subscribed directly to EventEmitter', () => {
const sub = emitter.subscribe();
expect(emitter.observers.length).toBe(1);
sub.unsubscribe();
expect(emitter.observers.length).toBe(0);
});
it('remove a subscriber subscribed after applying operators with pipe()', () => {
const sub = emitter.pipe(filter(() => true)).subscribe();
expect(emitter.observers.length).toBe(1);
sub.unsubscribe();
expect(emitter.observers.length).toBe(0);
});
it('unsubscribing a subscriber invokes the dispose method', () => {
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
const sub = emitter.subscribe();
sub.add(() => async.done());
sub.unsubscribe();
});
});
it('unsubscribing a subscriber after applying operators with pipe() invokes the dispose method',
() => {
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
const sub = emitter.pipe(filter(() => true)).subscribe();
sub.add(() => async.done());
sub.unsubscribe();
});
});
it('error thrown inside an Rx chain propagates to the error handler and disposes the chain',
() => {
let errorPropagated = false;
emitter.pipe(filter(() => { throw new Error(); }), )
.subscribe(() => {}, err => errorPropagated = true, );
emitter.next(1);
expect(errorPropagated).toBe(true);
expect(emitter.observers.length).toBe(0);
});
it('error sent by EventEmitter should dispose the Rx chain and remove subscribers', () => {
let errorPropagated = false;
emitter.pipe(filter(() => true)).subscribe(() => {}, err => errorPropagated = true, );
emitter.error(1);
expect(errorPropagated).toBe(true);
expect(emitter.observers.length).toBe(0);
});
// TODO: vsavkin: add tests cases // TODO: vsavkin: add tests cases
// should call dispose on the subscription if generator returns {done:true} // should call dispose on the subscription if generator returns {done:true}
// should call dispose on the subscription on throw // should call dispose on the subscription on throw