feat(common): allow any Subscribable in async pipe (#39627)
As only methods from the Subscribable interface are currently used in the implementation of the async pipe, it makes sense to make it explicit so that it works successfully with any other implementation instead of only Observable. PR Close #39627
This commit is contained in:
parent
0870af1740
commit
c7f4abf18a
|
@ -3,9 +3,9 @@ export declare const APP_BASE_HREF: InjectionToken<string>;
|
|||
export declare class AsyncPipe implements OnDestroy, PipeTransform {
|
||||
constructor(_ref: ChangeDetectorRef);
|
||||
ngOnDestroy(): void;
|
||||
transform<T>(obj: Observable<T> | Promise<T>): T | null;
|
||||
transform<T>(obj: Subscribable<T> | Promise<T>): T | null;
|
||||
transform<T>(obj: null | undefined): null;
|
||||
transform<T>(obj: Observable<T> | Promise<T> | null | undefined): T | null;
|
||||
transform<T>(obj: Subscribable<T> | Promise<T> | null | undefined): T | null;
|
||||
}
|
||||
|
||||
export declare class CommonModule {
|
||||
|
|
|
@ -6,19 +6,20 @@
|
|||
* found in the LICENSE file at https://angular.io/license
|
||||
*/
|
||||
|
||||
import {ChangeDetectorRef, EventEmitter, OnDestroy, Pipe, PipeTransform, ɵisObservable, ɵisPromise} from '@angular/core';
|
||||
import {Observable, SubscriptionLike} from 'rxjs';
|
||||
import {ChangeDetectorRef, EventEmitter, OnDestroy, Pipe, PipeTransform, ɵisPromise, ɵisSubscribable} from '@angular/core';
|
||||
import {Subscribable, Unsubscribable} from 'rxjs';
|
||||
|
||||
import {invalidPipeArgumentError} from './invalid_pipe_argument_error';
|
||||
|
||||
interface SubscriptionStrategy {
|
||||
createSubscription(async: Observable<any>|Promise<any>, updateLatestValue: any): SubscriptionLike
|
||||
createSubscription(async: Subscribable<any>|Promise<any>, updateLatestValue: any): Unsubscribable
|
||||
|Promise<any>;
|
||||
dispose(subscription: SubscriptionLike|Promise<any>): void;
|
||||
onDestroy(subscription: SubscriptionLike|Promise<any>): void;
|
||||
dispose(subscription: Unsubscribable|Promise<any>): void;
|
||||
onDestroy(subscription: Unsubscribable|Promise<any>): void;
|
||||
}
|
||||
|
||||
class ObservableStrategy implements SubscriptionStrategy {
|
||||
createSubscription(async: Observable<any>, updateLatestValue: any): SubscriptionLike {
|
||||
class SubscribableStrategy implements SubscriptionStrategy {
|
||||
createSubscription(async: Subscribable<any>, updateLatestValue: any): Unsubscribable {
|
||||
return async.subscribe({
|
||||
next: updateLatestValue,
|
||||
error: (e: any) => {
|
||||
|
@ -27,11 +28,11 @@ class ObservableStrategy implements SubscriptionStrategy {
|
|||
});
|
||||
}
|
||||
|
||||
dispose(subscription: SubscriptionLike): void {
|
||||
dispose(subscription: Unsubscribable): void {
|
||||
subscription.unsubscribe();
|
||||
}
|
||||
|
||||
onDestroy(subscription: SubscriptionLike): void {
|
||||
onDestroy(subscription: Unsubscribable): void {
|
||||
subscription.unsubscribe();
|
||||
}
|
||||
}
|
||||
|
@ -49,7 +50,7 @@ class PromiseStrategy implements SubscriptionStrategy {
|
|||
}
|
||||
|
||||
const _promiseStrategy = new PromiseStrategy();
|
||||
const _observableStrategy = new ObservableStrategy();
|
||||
const _subscribableStrategy = new SubscribableStrategy();
|
||||
|
||||
/**
|
||||
* @ngModule CommonModule
|
||||
|
@ -82,8 +83,8 @@ const _observableStrategy = new ObservableStrategy();
|
|||
export class AsyncPipe implements OnDestroy, PipeTransform {
|
||||
private _latestValue: any = null;
|
||||
|
||||
private _subscription: SubscriptionLike|Promise<any>|null = null;
|
||||
private _obj: Observable<any>|Promise<any>|EventEmitter<any>|null = null;
|
||||
private _subscription: Unsubscribable|Promise<any>|null = null;
|
||||
private _obj: Subscribable<any>|Promise<any>|EventEmitter<any>|null = null;
|
||||
private _strategy: SubscriptionStrategy = null!;
|
||||
|
||||
constructor(private _ref: ChangeDetectorRef) {}
|
||||
|
@ -94,10 +95,10 @@ export class AsyncPipe implements OnDestroy, PipeTransform {
|
|||
}
|
||||
}
|
||||
|
||||
transform<T>(obj: Observable<T>|Promise<T>): T|null;
|
||||
transform<T>(obj: Subscribable<T>|Promise<T>): T|null;
|
||||
transform<T>(obj: null|undefined): null;
|
||||
transform<T>(obj: Observable<T>|Promise<T>|null|undefined): T|null;
|
||||
transform<T>(obj: Observable<T>|Promise<T>|null|undefined): T|null {
|
||||
transform<T>(obj: Subscribable<T>|Promise<T>|null|undefined): T|null;
|
||||
transform<T>(obj: Subscribable<T>|Promise<T>|null|undefined): T|null {
|
||||
if (!this._obj) {
|
||||
if (obj) {
|
||||
this._subscribe(obj);
|
||||
|
@ -113,20 +114,20 @@ export class AsyncPipe implements OnDestroy, PipeTransform {
|
|||
return this._latestValue;
|
||||
}
|
||||
|
||||
private _subscribe(obj: Observable<any>|Promise<any>|EventEmitter<any>): void {
|
||||
private _subscribe(obj: Subscribable<any>|Promise<any>|EventEmitter<any>): void {
|
||||
this._obj = obj;
|
||||
this._strategy = this._selectStrategy(obj);
|
||||
this._subscription = this._strategy.createSubscription(
|
||||
obj, (value: Object) => this._updateLatestValue(obj, value));
|
||||
}
|
||||
|
||||
private _selectStrategy(obj: Observable<any>|Promise<any>|EventEmitter<any>): any {
|
||||
private _selectStrategy(obj: Subscribable<any>|Promise<any>|EventEmitter<any>): any {
|
||||
if (ɵisPromise(obj)) {
|
||||
return _promiseStrategy;
|
||||
}
|
||||
|
||||
if (ɵisObservable(obj)) {
|
||||
return _observableStrategy;
|
||||
if (ɵisSubscribable(obj)) {
|
||||
return _subscribableStrategy;
|
||||
}
|
||||
|
||||
throw invalidPipeArgumentError(AsyncPipe, obj);
|
||||
|
|
|
@ -32,6 +32,7 @@ ts_library(
|
|||
"//packages/platform-browser-dynamic",
|
||||
"//packages/platform-browser/testing",
|
||||
"//packages/private/testing",
|
||||
"@npm//rxjs",
|
||||
],
|
||||
)
|
||||
|
||||
|
|
|
@ -10,35 +10,51 @@ import {AsyncPipe, ɵgetDOM as getDOM} from '@angular/common';
|
|||
import {EventEmitter} from '@angular/core';
|
||||
import {AsyncTestCompleter, beforeEach, describe, expect, inject, it} from '@angular/core/testing/src/testing_internal';
|
||||
import {browserDetection} from '@angular/platform-browser/testing/src/browser_util';
|
||||
import {Subscribable, Unsubscribable} from 'rxjs';
|
||||
|
||||
import {SpyChangeDetectorRef} from '../spies';
|
||||
|
||||
{
|
||||
describe('AsyncPipe', () => {
|
||||
describe('Observable', () => {
|
||||
// only expose methods from the Subscribable interface, to ensure that
|
||||
// the implementation does not rely on other methods:
|
||||
const wrapSubscribable = <T>(input: Subscribable<T>): Subscribable<T> => ({
|
||||
subscribe(...args: any): Unsubscribable {
|
||||
const subscription = input.subscribe(...args);
|
||||
return {
|
||||
unsubscribe() {
|
||||
subscription.unsubscribe();
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
let emitter: EventEmitter<any>;
|
||||
let subscribable: Subscribable<any>;
|
||||
let pipe: AsyncPipe;
|
||||
let ref: any;
|
||||
const message = {};
|
||||
|
||||
beforeEach(() => {
|
||||
emitter = new EventEmitter();
|
||||
subscribable = wrapSubscribable(emitter);
|
||||
ref = new SpyChangeDetectorRef();
|
||||
pipe = new AsyncPipe(ref);
|
||||
});
|
||||
|
||||
describe('transform', () => {
|
||||
it('should return null when subscribing to an observable', () => {
|
||||
expect(pipe.transform(emitter)).toBe(null);
|
||||
expect(pipe.transform(subscribable)).toBe(null);
|
||||
});
|
||||
|
||||
it('should return the latest available value',
|
||||
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
|
||||
pipe.transform(emitter);
|
||||
pipe.transform(subscribable);
|
||||
emitter.emit(message);
|
||||
|
||||
setTimeout(() => {
|
||||
expect(pipe.transform(emitter)).toEqual(message);
|
||||
expect(pipe.transform(subscribable)).toEqual(message);
|
||||
async.done();
|
||||
}, 0);
|
||||
}));
|
||||
|
@ -46,34 +62,35 @@ import {SpyChangeDetectorRef} from '../spies';
|
|||
|
||||
it('should return same value when nothing has changed since the last call',
|
||||
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
|
||||
pipe.transform(emitter);
|
||||
pipe.transform(subscribable);
|
||||
emitter.emit(message);
|
||||
|
||||
setTimeout(() => {
|
||||
pipe.transform(emitter);
|
||||
expect(pipe.transform(emitter)).toBe(message);
|
||||
pipe.transform(subscribable);
|
||||
expect(pipe.transform(subscribable)).toBe(message);
|
||||
async.done();
|
||||
}, 0);
|
||||
}));
|
||||
|
||||
it('should dispose of the existing subscription when subscribing to a new observable',
|
||||
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
|
||||
pipe.transform(emitter);
|
||||
pipe.transform(subscribable);
|
||||
|
||||
const newEmitter = new EventEmitter();
|
||||
expect(pipe.transform(newEmitter)).toBe(null);
|
||||
const newSubscribable = wrapSubscribable(newEmitter);
|
||||
expect(pipe.transform(newSubscribable)).toBe(null);
|
||||
emitter.emit(message);
|
||||
|
||||
// this should not affect the pipe
|
||||
setTimeout(() => {
|
||||
expect(pipe.transform(newEmitter)).toBe(null);
|
||||
expect(pipe.transform(newSubscribable)).toBe(null);
|
||||
async.done();
|
||||
}, 0);
|
||||
}));
|
||||
|
||||
it('should request a change detection check upon receiving a new value',
|
||||
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
|
||||
pipe.transform(emitter);
|
||||
pipe.transform(subscribable);
|
||||
emitter.emit(message);
|
||||
|
||||
setTimeout(() => {
|
||||
|
@ -83,12 +100,11 @@ import {SpyChangeDetectorRef} from '../spies';
|
|||
}));
|
||||
|
||||
it('should return value for unchanged NaN', () => {
|
||||
const emitter = new EventEmitter<any>();
|
||||
emitter.emit(null);
|
||||
pipe.transform(emitter);
|
||||
pipe.transform(subscribable);
|
||||
emitter.next(NaN);
|
||||
const firstResult = pipe.transform(emitter);
|
||||
const secondResult = pipe.transform(emitter);
|
||||
const firstResult = pipe.transform(subscribable);
|
||||
const secondResult = pipe.transform(subscribable);
|
||||
expect(firstResult).toBeNaN();
|
||||
expect(secondResult).toBeNaN();
|
||||
});
|
||||
|
@ -101,12 +117,12 @@ import {SpyChangeDetectorRef} from '../spies';
|
|||
|
||||
it('should dispose of the existing subscription',
|
||||
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
|
||||
pipe.transform(emitter);
|
||||
pipe.transform(subscribable);
|
||||
pipe.ngOnDestroy();
|
||||
emitter.emit(message);
|
||||
|
||||
setTimeout(() => {
|
||||
expect(pipe.transform(emitter)).toBe(null);
|
||||
expect(pipe.transform(subscribable)).toBe(null);
|
||||
async.done();
|
||||
}, 0);
|
||||
}));
|
||||
|
|
|
@ -29,7 +29,7 @@ export {_sanitizeHtml as ɵ_sanitizeHtml} from './sanitization/html_sanitizer';
|
|||
export {_sanitizeUrl as ɵ_sanitizeUrl} from './sanitization/url_sanitizer';
|
||||
export {makeDecorator as ɵmakeDecorator} from './util/decorators';
|
||||
export {global as ɵglobal} from './util/global';
|
||||
export {isObservable as ɵisObservable, isPromise as ɵisPromise} from './util/lang';
|
||||
export {isObservable as ɵisObservable, isPromise as ɵisPromise, isSubscribable as ɵisSubscribable} from './util/lang';
|
||||
export {stringify as ɵstringify} from './util/stringify';
|
||||
export {clearOverrides as ɵclearOverrides, initServicesIfNeeded as ɵinitServicesIfNeeded, overrideComponentView as ɵoverrideComponentView, overrideProvider as ɵoverrideProvider} from './view/index';
|
||||
export {NOT_FOUND_CHECK_ONLY_ELEMENT_INJECTOR as ɵNOT_FOUND_CHECK_ONLY_ELEMENT_INJECTOR} from './view/provider';
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
* found in the LICENSE file at https://angular.io/license
|
||||
*/
|
||||
|
||||
import {Observable} from 'rxjs';
|
||||
import {Observable, Subscribable} from 'rxjs';
|
||||
|
||||
/**
|
||||
* Determine if the argument is shaped like a Promise
|
||||
|
@ -17,6 +17,13 @@ export function isPromise<T = any>(obj: any): obj is Promise<T> {
|
|||
return !!obj && typeof obj.then === 'function';
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if the argument is a Subscribable
|
||||
*/
|
||||
export function isSubscribable(obj: any|Subscribable<any>): obj is Subscribable<any> {
|
||||
return !!obj && typeof obj.subscribe === 'function';
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if the argument is an Observable
|
||||
*
|
||||
|
@ -26,6 +33,5 @@ export function isPromise<T = any>(obj: any): obj is Promise<T> {
|
|||
* `subscribe()` method, and RxJS has mechanisms to wrap `Subscribable` objects
|
||||
* into `Observable` as needed.
|
||||
*/
|
||||
export function isObservable(obj: any|Observable<any>): obj is Observable<any> {
|
||||
return !!obj && typeof obj.subscribe === 'function';
|
||||
}
|
||||
export const isObservable =
|
||||
isSubscribable as ((obj: any|Observable<any>) => obj is Observable<any>);
|
||||
|
|
Loading…
Reference in New Issue