diff --git a/modules/angular2/src/change_detection/change_detection.js b/modules/angular2/src/change_detection/change_detection.js index 4f76accead..b46743b4ca 100644 --- a/modules/angular2/src/change_detection/change_detection.js +++ b/modules/angular2/src/change_detection/change_detection.js @@ -2,6 +2,7 @@ import {DynamicProtoChangeDetector, JitProtoChangeDetector} from './proto_change import {PipeRegistry} from './pipes/pipe_registry'; import {IterableChangesFactory} from './pipes/iterable_changes'; import {KeyValueChangesFactory} from './pipes/keyvalue_changes'; +import {AsyncPipeFactory} from './pipes/async_pipe'; import {NullPipeFactory} from './pipes/null_pipe'; import {DEFAULT} from './constants'; import {ChangeDetection, ProtoChangeDetector} from './interfaces'; @@ -27,9 +28,20 @@ export var iterableDiff = [ new NullPipeFactory() ]; +/** + * Async binding to such types as Observable. + * + * @exportedAs angular2/pipes + */ +export var async = [ + new AsyncPipeFactory(), + new NullPipeFactory() +]; + export var defaultPipes = { "iterableDiff" : iterableDiff, - "keyValDiff" : keyValDiff + "keyValDiff" : keyValDiff, + "async" : async }; diff --git a/modules/angular2/src/change_detection/pipes/async_pipe.js b/modules/angular2/src/change_detection/pipes/async_pipe.js new file mode 100644 index 0000000000..9f844a6790 --- /dev/null +++ b/modules/angular2/src/change_detection/pipes/async_pipe.js @@ -0,0 +1,112 @@ +import {Observable, ObservableWrapper} from 'angular2/src/facade/async'; +import {isBlank, isPresent} from 'angular2/src/facade/lang'; +import {Pipe, NO_CHANGE} from './pipe'; +import {ChangeDetectorRef} from '../change_detector_ref'; + +/** + * Implements async bindings to Observable. + * + * # Example + * + * In this example we bind the description observable to the DOM. The async pipe will convert an observable to the + * latest value it emitted. It will also request a change detection check when a new value is emitted. + * + * ``` + * @Component({ + * selector: "task-cmp", + * changeDetection: ON_PUSH + * }) + * @View({ + * inline: "Task Description {{description|async}}" + * }) + * class Task { + * description:Observable; + * } + * + * ``` + * + * @exportedAs angular2/pipes + */ +export class AsyncPipe extends Pipe { + _ref:ChangeDetectorRef; + + _latestValue:Object; + _latestReturnedValue:Object; + + _subscription:Object; + _observable:Observable; + + constructor(ref:ChangeDetectorRef) { + super(); + this._ref = ref; + this._latestValue = null; + this._latestReturnedValue = null; + this._subscription = null; + this._observable = null; + } + + supports(obs):boolean { + return ObservableWrapper.isObservable(obs); + } + + onDestroy():void { + if (isPresent(this._subscription)) { + this._dispose(); + }; + } + + transform(obs:Observable):any { + if (isBlank(this._subscription)) { + this._subscribe(obs); + return null; + } + + if (obs !== this._observable) { + this._dispose(); + return this.transform(obs); + } + + if (this._latestValue === this._latestReturnedValue) { + return NO_CHANGE; + } else { + this._latestReturnedValue = this._latestValue; + return this._latestValue; + } + } + + _subscribe(obs:Observable):void { + this._observable = obs; + this._subscription = ObservableWrapper.subscribe(obs, + value => this._updateLatestValue(value), + e => {throw e;} + ); + } + + _dispose():void { + ObservableWrapper.dispose(this._subscription); + this._latestValue = null; + this._latestReturnedValue = null; + this._subscription = null; + this._observable = null; + } + + _updateLatestValue(value:Object) { + this._latestValue = value; + this._ref.requestCheck(); + } +} + +/** + * Provides a factory for [AsyncPipe]. + * + * @exportedAs angular2/pipes + */ +export class AsyncPipeFactory { + supports(obs):boolean { + return ObservableWrapper.isObservable(obs); + } + + create(cdRef):Pipe { + return new AsyncPipe(cdRef); + } +} \ No newline at end of file diff --git a/modules/angular2/src/facade/async.dart b/modules/angular2/src/facade/async.dart index a4aa5180d8..5b35448966 100644 --- a/modules/angular2/src/facade/async.dart +++ b/modules/angular2/src/facade/async.dart @@ -37,6 +37,14 @@ class ObservableWrapper { return s.listen(onNext, onError: onError, onDone: onComplete, cancelOnError: true); } + static bool isObservable(obs) { + return obs is Stream; + } + + static void dispose(StreamSubscription s) { + s.cancel(); + } + static void callNext(EventEmitter emitter, value) { emitter.add(value); } diff --git a/modules/angular2/src/facade/async.es6 b/modules/angular2/src/facade/async.es6 index e9eff1b461..8e9fcff7b1 100644 --- a/modules/angular2/src/facade/async.es6 +++ b/modules/angular2/src/facade/async.es6 @@ -58,6 +58,14 @@ export class ObservableWrapper { return emitter.observer({next: onNext, throw: onThrow, return: onReturn}); } + static dispose(subscription:any) { + subscription.dispose(); + } + + static isObservable(obs):boolean { + return obs instanceof Observable; + } + static callNext(emitter:EventEmitter, value:any) { emitter.next(value); } diff --git a/modules/angular2/src/facade/async.ts b/modules/angular2/src/facade/async.ts index 17b5f60eaa..4b3c543c2e 100644 --- a/modules/angular2/src/facade/async.ts +++ b/modules/angular2/src/facade/async.ts @@ -54,6 +54,10 @@ export class ObservableWrapper { return emitter.observer({next: onNext, throw: onThrow, return: onReturn}); } + static isObservable(obs: any): boolean { return obs instanceof Observable; } + + static dispose(subscription: any) { subscription.dispose(); } + static callNext(emitter: EventEmitter, value: any) { emitter.next(value); } static callThrow(emitter: EventEmitter, error: any) { emitter.throw(error); } diff --git a/modules/angular2/test/change_detection/pipes/async_pipe_spec.js b/modules/angular2/test/change_detection/pipes/async_pipe_spec.js new file mode 100644 index 0000000000..218ce31e27 --- /dev/null +++ b/modules/angular2/test/change_detection/pipes/async_pipe_spec.js @@ -0,0 +1,115 @@ +import {ddescribe, describe, it, iit, xit, expect, beforeEach, afterEach, + AsyncTestCompleter, inject, proxy, SpyObject} from 'angular2/test_lib'; +import {IMPLEMENTS} from 'angular2/src/facade/lang'; + +import {AsyncPipe} from 'angular2/src/change_detection/pipes/async_pipe'; +import {NO_CHANGE} from 'angular2/src/change_detection/pipes/pipe'; +import {ChangeDetectorRef} from 'angular2/src/change_detection/change_detector_ref'; +import {EventEmitter, Observable, ObservableWrapper, PromiseWrapper} from 'angular2/src/facade/async'; + +export function main() { + describe("AsyncPipe", () => { + var emitter; + var pipe; + var ref; + var message = new Object(); + + beforeEach(() => { + emitter = new EventEmitter(); + ref = new SpyChangeDetectorRef(); + pipe = new AsyncPipe(ref); + }); + + describe("supports", () => { + it("should support observables", () => { + expect(pipe.supports(emitter)).toBe(true); + }); + + it("should not support other objects", () => { + expect(pipe.supports("string")).toBe(false); + expect(pipe.supports(null)).toBe(false); + }); + }); + + describe("transform", () => { + it("should return null when subscribing to an observable", () => { + expect(pipe.transform(emitter)).toBe(null); + }); + + it("should return the latest available value", inject([AsyncTestCompleter], (async) => { + pipe.transform(emitter); + + ObservableWrapper.callNext(emitter, message); + + PromiseWrapper.setTimeout(() => { + expect(pipe.transform(emitter)).toEqual(message); + async.done(); + }, 0) + })); + + it("should return NO_CHANGE when nothing has changed since the last call", + inject([AsyncTestCompleter], (async) => { + pipe.transform(emitter); + ObservableWrapper.callNext(emitter, message); + + PromiseWrapper.setTimeout(() => { + pipe.transform(emitter); + expect(pipe.transform(emitter)).toBe(NO_CHANGE); + async.done(); + }, 0) + })); + + it("should dispose of the existing subscription when subscribing to a new observable", + inject([AsyncTestCompleter], (async) => { + pipe.transform(emitter); + + var newEmitter = new EventEmitter(); + expect(pipe.transform(newEmitter)).toBe(null); + + // this should not affect the pipe, so it should return NO_CHANGE + ObservableWrapper.callNext(emitter, message); + + PromiseWrapper.setTimeout(() => { + expect(pipe.transform(newEmitter)).toBe(NO_CHANGE); + async.done(); + }, 0) + })); + + it("should request a change detection check upon receiving a new value", + inject([AsyncTestCompleter], (async) => { + pipe.transform(emitter); + ObservableWrapper.callNext(emitter, message); + + PromiseWrapper.setTimeout(() => { + expect(ref.spy('requestCheck')).toHaveBeenCalled(); + async.done(); + }, 0) + })); + }); + + describe("onDestroy", () => { + it("should do nothing when no subscription", () => { + pipe.onDestroy(); + }); + + it("should dispose of the existing subscription", inject([AsyncTestCompleter], (async) => { + pipe.transform(emitter); + pipe.onDestroy(); + + ObservableWrapper.callNext(emitter, message); + + PromiseWrapper.setTimeout(() => { + expect(pipe.transform(emitter)).toBe(null); + async.done(); + }, 0) + })); + }); + }); +} + +@proxy +@IMPLEMENTS(ChangeDetectorRef) +class SpyChangeDetectorRef extends SpyObject { + constructor(){super(ChangeDetectorRef);} + noSuchMethod(m){return super.noSuchMethod(m)} +}