diff --git a/packages/core/src/event_emitter.ts b/packages/core/src/event_emitter.ts index 86d170d070..ae9bb4bc8b 100644 --- a/packages/core/src/event_emitter.ts +++ b/packages/core/src/event_emitter.ts @@ -7,6 +7,7 @@ */ import {Subject} from 'rxjs/Subject'; +import {Subscription} from 'rxjs/Subscription'; /** * Use by directives and components to emit custom Events. @@ -111,6 +112,12 @@ export class EventEmitter extends Subject { } } - return super.subscribe(schedulerFn, errorFn, completeFn); + const sink = super.subscribe(schedulerFn, errorFn, completeFn); + + if (generatorOrNext instanceof Subscription) { + generatorOrNext.add(sink); + } + + return sink; } } diff --git a/packages/core/test/event_emitter_spec.ts b/packages/core/test/event_emitter_spec.ts index 1c110bf4d3..0b46ecc4ec 100644 --- a/packages/core/test/event_emitter_spec.ts +++ b/packages/core/test/event_emitter_spec.ts @@ -7,6 +7,8 @@ */ import {AsyncTestCompleter, beforeEach, describe, expect, inject, it} from '@angular/core/testing/src/testing_internal'; +import {filter} from 'rxjs/operators'; + import {EventEmitter} from '../src/event_emitter'; { @@ -125,6 +127,59 @@ import {EventEmitter} from '../src/event_emitter'; expect(e.observers.length > 0).toBe(true); }); + it('remove a subscriber subscribed directly to EventEmitter', () => { + const sub = emitter.subscribe(); + expect(emitter.observers.length).toBe(1); + sub.unsubscribe(); + expect(emitter.observers.length).toBe(0); + }); + + it('remove a subscriber subscribed after applying operators with pipe()', () => { + const sub = emitter.pipe(filter(() => true)).subscribe(); + expect(emitter.observers.length).toBe(1); + sub.unsubscribe(); + expect(emitter.observers.length).toBe(0); + }); + + it('unsubscribing a subscriber invokes the dispose method', () => { + inject([AsyncTestCompleter], (async: AsyncTestCompleter) => { + const sub = emitter.subscribe(); + sub.add(() => async.done()); + sub.unsubscribe(); + }); + }); + + it('unsubscribing a subscriber after applying operators with pipe() invokes the dispose method', + () => { + inject([AsyncTestCompleter], (async: AsyncTestCompleter) => { + const sub = emitter.pipe(filter(() => true)).subscribe(); + sub.add(() => async.done()); + sub.unsubscribe(); + }); + }); + + it('error thrown inside an Rx chain propagates to the error handler and disposes the chain', + () => { + let errorPropagated = false; + emitter.pipe(filter(() => { throw new Error(); }), ) + .subscribe(() => {}, err => errorPropagated = true, ); + + emitter.next(1); + + expect(errorPropagated).toBe(true); + expect(emitter.observers.length).toBe(0); + }); + + it('error sent by EventEmitter should dispose the Rx chain and remove subscribers', () => { + let errorPropagated = false; + emitter.pipe(filter(() => true)).subscribe(() => {}, err => errorPropagated = true, ); + + emitter.error(1); + + expect(errorPropagated).toBe(true); + expect(emitter.observers.length).toBe(0); + }); + // TODO: vsavkin: add tests cases // should call dispose on the subscription if generator returns {done:true} // should call dispose on the subscription on throw