feat(change_detection): added async pipe

This commit is contained in:
vsavkin 2015-04-19 12:45:08 -07:00
parent 8b3c808cb0
commit a97a2266d3
6 changed files with 260 additions and 1 deletions

View File

@ -2,6 +2,7 @@ import {DynamicProtoChangeDetector, JitProtoChangeDetector} from './proto_change
import {PipeRegistry} from './pipes/pipe_registry';
import {IterableChangesFactory} from './pipes/iterable_changes';
import {KeyValueChangesFactory} from './pipes/keyvalue_changes';
import {AsyncPipeFactory} from './pipes/async_pipe';
import {NullPipeFactory} from './pipes/null_pipe';
import {DEFAULT} from './constants';
import {ChangeDetection, ProtoChangeDetector} from './interfaces';
@ -27,9 +28,20 @@ export var iterableDiff = [
new NullPipeFactory()
];
/**
* Async binding to such types as Observable.
*
* @exportedAs angular2/pipes
*/
export var async = [
new AsyncPipeFactory(),
new NullPipeFactory()
];
export var defaultPipes = {
"iterableDiff" : iterableDiff,
"keyValDiff" : keyValDiff
"keyValDiff" : keyValDiff,
"async" : async
};

View File

@ -0,0 +1,112 @@
import {Observable, ObservableWrapper} from 'angular2/src/facade/async';
import {isBlank, isPresent} from 'angular2/src/facade/lang';
import {Pipe, NO_CHANGE} from './pipe';
import {ChangeDetectorRef} from '../change_detector_ref';
/**
* Implements async bindings to Observable.
*
* # Example
*
* In this example we bind the description observable to the DOM. The async pipe will convert an observable to the
* latest value it emitted. It will also request a change detection check when a new value is emitted.
*
* ```
* @Component({
* selector: "task-cmp",
* changeDetection: ON_PUSH
* })
* @View({
* inline: "Task Description {{description|async}}"
* })
* class Task {
* description:Observable<string>;
* }
*
* ```
*
* @exportedAs angular2/pipes
*/
export class AsyncPipe extends Pipe {
_ref:ChangeDetectorRef;
_latestValue:Object;
_latestReturnedValue:Object;
_subscription:Object;
_observable:Observable;
constructor(ref:ChangeDetectorRef) {
super();
this._ref = ref;
this._latestValue = null;
this._latestReturnedValue = null;
this._subscription = null;
this._observable = null;
}
supports(obs):boolean {
return ObservableWrapper.isObservable(obs);
}
onDestroy():void {
if (isPresent(this._subscription)) {
this._dispose();
};
}
transform(obs:Observable):any {
if (isBlank(this._subscription)) {
this._subscribe(obs);
return null;
}
if (obs !== this._observable) {
this._dispose();
return this.transform(obs);
}
if (this._latestValue === this._latestReturnedValue) {
return NO_CHANGE;
} else {
this._latestReturnedValue = this._latestValue;
return this._latestValue;
}
}
_subscribe(obs:Observable):void {
this._observable = obs;
this._subscription = ObservableWrapper.subscribe(obs,
value => this._updateLatestValue(value),
e => {throw e;}
);
}
_dispose():void {
ObservableWrapper.dispose(this._subscription);
this._latestValue = null;
this._latestReturnedValue = null;
this._subscription = null;
this._observable = null;
}
_updateLatestValue(value:Object) {
this._latestValue = value;
this._ref.requestCheck();
}
}
/**
* Provides a factory for [AsyncPipe].
*
* @exportedAs angular2/pipes
*/
export class AsyncPipeFactory {
supports(obs):boolean {
return ObservableWrapper.isObservable(obs);
}
create(cdRef):Pipe {
return new AsyncPipe(cdRef);
}
}

View File

@ -37,6 +37,14 @@ class ObservableWrapper {
return s.listen(onNext, onError: onError, onDone: onComplete, cancelOnError: true);
}
static bool isObservable(obs) {
return obs is Stream;
}
static void dispose(StreamSubscription s) {
s.cancel();
}
static void callNext(EventEmitter emitter, value) {
emitter.add(value);
}

View File

@ -58,6 +58,14 @@ export class ObservableWrapper {
return emitter.observer({next: onNext, throw: onThrow, return: onReturn});
}
static dispose(subscription:any) {
subscription.dispose();
}
static isObservable(obs):boolean {
return obs instanceof Observable;
}
static callNext(emitter:EventEmitter, value:any) {
emitter.next(value);
}

View File

@ -54,6 +54,10 @@ export class ObservableWrapper {
return emitter.observer({next: onNext, throw: onThrow, return: onReturn});
}
static isObservable(obs: any): boolean { return obs instanceof Observable; }
static dispose(subscription: any) { subscription.dispose(); }
static callNext(emitter: EventEmitter, value: any) { emitter.next(value); }
static callThrow(emitter: EventEmitter, error: any) { emitter.throw(error); }

View File

@ -0,0 +1,115 @@
import {ddescribe, describe, it, iit, xit, expect, beforeEach, afterEach,
AsyncTestCompleter, inject, proxy, SpyObject} from 'angular2/test_lib';
import {IMPLEMENTS} from 'angular2/src/facade/lang';
import {AsyncPipe} from 'angular2/src/change_detection/pipes/async_pipe';
import {NO_CHANGE} from 'angular2/src/change_detection/pipes/pipe';
import {ChangeDetectorRef} from 'angular2/src/change_detection/change_detector_ref';
import {EventEmitter, Observable, ObservableWrapper, PromiseWrapper} from 'angular2/src/facade/async';
export function main() {
describe("AsyncPipe", () => {
var emitter;
var pipe;
var ref;
var message = new Object();
beforeEach(() => {
emitter = new EventEmitter();
ref = new SpyChangeDetectorRef();
pipe = new AsyncPipe(ref);
});
describe("supports", () => {
it("should support observables", () => {
expect(pipe.supports(emitter)).toBe(true);
});
it("should not support other objects", () => {
expect(pipe.supports("string")).toBe(false);
expect(pipe.supports(null)).toBe(false);
});
});
describe("transform", () => {
it("should return null when subscribing to an observable", () => {
expect(pipe.transform(emitter)).toBe(null);
});
it("should return the latest available value", inject([AsyncTestCompleter], (async) => {
pipe.transform(emitter);
ObservableWrapper.callNext(emitter, message);
PromiseWrapper.setTimeout(() => {
expect(pipe.transform(emitter)).toEqual(message);
async.done();
}, 0)
}));
it("should return NO_CHANGE when nothing has changed since the last call",
inject([AsyncTestCompleter], (async) => {
pipe.transform(emitter);
ObservableWrapper.callNext(emitter, message);
PromiseWrapper.setTimeout(() => {
pipe.transform(emitter);
expect(pipe.transform(emitter)).toBe(NO_CHANGE);
async.done();
}, 0)
}));
it("should dispose of the existing subscription when subscribing to a new observable",
inject([AsyncTestCompleter], (async) => {
pipe.transform(emitter);
var newEmitter = new EventEmitter();
expect(pipe.transform(newEmitter)).toBe(null);
// this should not affect the pipe, so it should return NO_CHANGE
ObservableWrapper.callNext(emitter, message);
PromiseWrapper.setTimeout(() => {
expect(pipe.transform(newEmitter)).toBe(NO_CHANGE);
async.done();
}, 0)
}));
it("should request a change detection check upon receiving a new value",
inject([AsyncTestCompleter], (async) => {
pipe.transform(emitter);
ObservableWrapper.callNext(emitter, message);
PromiseWrapper.setTimeout(() => {
expect(ref.spy('requestCheck')).toHaveBeenCalled();
async.done();
}, 0)
}));
});
describe("onDestroy", () => {
it("should do nothing when no subscription", () => {
pipe.onDestroy();
});
it("should dispose of the existing subscription", inject([AsyncTestCompleter], (async) => {
pipe.transform(emitter);
pipe.onDestroy();
ObservableWrapper.callNext(emitter, message);
PromiseWrapper.setTimeout(() => {
expect(pipe.transform(emitter)).toBe(null);
async.done();
}, 0)
}));
});
});
}
@proxy
@IMPLEMENTS(ChangeDetectorRef)
class SpyChangeDetectorRef extends SpyObject {
constructor(){super(ChangeDetectorRef);}
noSuchMethod(m){return super.noSuchMethod(m)}
}