From 9b3b3d325ff74681a5e7170b36b365c79e73dba3 Mon Sep 17 00:00:00 2001
From: vsavkin <vic.savkin@gmail.com>
Date: Tue, 24 Mar 2015 13:45:39 -0700
Subject: [PATCH] feat(facade): added support for observables

---
 gulpfile.js                                |  2 +
 karma-js.conf.js                           |  2 +
 modules/angular2/package.json              |  1 +
 modules/angular2/src/facade/async.dart     | 28 +++++-
 modules/angular2/src/facade/async.es6      | 47 +++++++++-
 modules/angular2/test/facade/async_spec.js | 99 ++++++++++++++++++++++
 package.json                               |  1 +
 test-main.js                               |  4 +-
 tools/build/snippets/runtime_paths.js      |  5 +-
 tools/transpiler/src/type_mapping.js       |  2 +
 10 files changed, 187 insertions(+), 4 deletions(-)
 create mode 100644 modules/angular2/test/facade/async_spec.js

diff --git a/gulpfile.js b/gulpfile.js
index 34474de909..5c123774ba 100644
--- a/gulpfile.js
+++ b/gulpfile.js
@@ -50,6 +50,8 @@ var _HTML_DEFAULT_SCRIPTS_JS = [
   {src: 'node_modules/zone.js/long-stack-trace-zone.js', mimeType: 'text/javascript', copy: true},
   {src: 'node_modules/systemjs/dist/system.src.js', mimeType: 'text/javascript', copy: true},
   {src: 'node_modules/systemjs/lib/extension-register.js', mimeType: 'text/javascript', copy: true},
+  {src: 'node_modules/systemjs/lib/extension-cjs.js', mimeType: 'text/javascript', copy: true},
+  {src: 'node_modules/rx/dist/rx.all.js', mimeType: 'text/javascript', copy: true},
   {src: 'tools/build/snippets/runtime_paths.js', mimeType: 'text/javascript', copy: true},
   {
     inline: 'System.import(\'$MODULENAME$\').then(function(m) { m.main(); }, console.error.bind(console))',
diff --git a/karma-js.conf.js b/karma-js.conf.js
index 8ba931d27a..dedbfda7c4 100644
--- a/karma-js.conf.js
+++ b/karma-js.conf.js
@@ -18,6 +18,8 @@ module.exports = function(config) {
       // Including systemjs because it defines `__eval`, which produces correct stack traces.
       'node_modules/systemjs/dist/system.src.js',
       'node_modules/systemjs/lib/extension-register.js',
+      'node_modules/systemjs/lib/extension-cjs.js',
+      'node_modules/rx/dist/rx.all.js',
       'node_modules/zone.js/zone.js',
       'node_modules/zone.js/long-stack-trace-zone.js',
 
diff --git a/modules/angular2/package.json b/modules/angular2/package.json
index 28689dbc92..13f36abf67 100644
--- a/modules/angular2/package.json
+++ b/modules/angular2/package.json
@@ -9,6 +9,7 @@
   "dependencies": {
     "traceur": "<%= packageJson.dependencies.traceur %>",
     "rtts_assert": "<%= packageJson.version %>",
+    "rx": "<%= packageJson.dependencies['rx'] %>",
     "zone.js": "<%= packageJson.dependencies['zone.js'] %>"
   },
   "devDependencies": <%= JSON.stringify(packageJson.devDependencies) %>
diff --git a/modules/angular2/src/facade/async.dart b/modules/angular2/src/facade/async.dart
index 08252ebc2e..e3b500123a 100644
--- a/modules/angular2/src/facade/async.dart
+++ b/modules/angular2/src/facade/async.dart
@@ -1,7 +1,7 @@
 library angular.core.facade.async;
 
 import 'dart:async';
-export 'dart:async' show Future;
+export 'dart:async' show Future, Stream, StreamController, StreamSubscription;
 
 class PromiseWrapper {
   static Future resolve(obj) => new Future.value(obj);
@@ -32,6 +32,32 @@ class PromiseWrapper {
   }
 }
 
+class ObservableWrapper {
+  static StreamSubscription subscribe(Stream s, Function onNext, [onError, onComplete]) {
+    return s.listen(onNext, onError: onError, onDone: onComplete, cancelOnError: true);
+  }
+
+  static StreamController createController() {
+    return new StreamController.broadcast();
+  }
+
+  static Stream createObservable(StreamController controller) {
+    return controller.stream;
+  }
+
+  static void callNext(StreamController controller, value) {
+    controller.add(value);
+  }
+
+  static void callThrow(StreamController controller, error) {
+    controller.addError(error);
+  }
+
+  static void callReturn(StreamController controller) {
+    controller.close();
+  }
+}
+
 class _Completer {
   final Completer c;
 
diff --git a/modules/angular2/src/facade/async.es6 b/modules/angular2/src/facade/async.es6
index 35fd57ede7..5e2ca3639c 100644
--- a/modules/angular2/src/facade/async.es6
+++ b/modules/angular2/src/facade/async.es6
@@ -1,5 +1,6 @@
-import {int, global} from 'angular2/src/facade/lang';
+import {int, global, isPresent} from 'angular2/src/facade/lang';
 import {List} from 'angular2/src/facade/collection';
+import Rx from 'rx/dist/rx.all';
 
 export var Promise = global.Promise;
 
@@ -51,3 +52,47 @@ export class PromiseWrapper {
     return maybePromise instanceof Promise;
   }
 }
+
+
+/**
+ * Use Rx.Observable but provides an adapter to make it work as specified here:
+ * https://github.com/jhusain/observable-spec
+ *
+ * Once a reference implementation of the spec is available, switch to it.
+ */
+export var Observable = Rx.Observable;
+export var ObservableController = Rx.Subject;
+
+export class ObservableWrapper {
+  static createController():Rx.Subject {
+    return new Rx.Subject();
+  }
+
+  static createObservable(subject:Rx.Subject):Observable {
+    return subject;
+  }
+
+  static subscribe(observable:Observable, generatorOrOnNext, onThrow = null, onReturn = null) {
+    if (isPresent(generatorOrOnNext.next)) {
+      return observable.observeOn(Rx.Scheduler.timeout).subscribe(
+        (value) => generatorOrOnNext.next(value),
+        (error) => generatorOrOnNext.throw(error),
+        () => generatorOrOnNext.return()
+      );
+    } else {
+      return observable.observeOn(Rx.Scheduler.timeout).subscribe(generatorOrOnNext, onThrow, onReturn);
+    }
+  }
+
+  static callNext(subject:Rx.Subject, value:any) {
+    subject.onNext(value);
+  }
+
+  static callThrow(subject:Rx.Subject, error:any) {
+    subject.onError(error);
+  }
+
+  static callReturn(subject:Rx.Subject) {
+    subject.onCompleted();
+  }
+}
\ No newline at end of file
diff --git a/modules/angular2/test/facade/async_spec.js b/modules/angular2/test/facade/async_spec.js
new file mode 100644
index 0000000000..825fb39ab6
--- /dev/null
+++ b/modules/angular2/test/facade/async_spec.js
@@ -0,0 +1,99 @@
+import {describe, it, expect, beforeEach, ddescribe, iit, xit, el,
+  SpyObject, AsyncTestCompleter, inject, IS_DARTIUM} from 'angular2/test_lib';
+
+import {ObservableWrapper, Observable, ObservableController, PromiseWrapper} from 'angular2/src/facade/async';
+
+export function main() {
+  describe('Observable', () => {
+    var obs:Observable;
+    var controller:ObservableController;
+
+    beforeEach(() => {
+      controller = ObservableWrapper.createController();
+      obs = ObservableWrapper.createObservable(controller);
+    });
+
+    it("should call the next callback",  inject([AsyncTestCompleter], (async) => {
+      ObservableWrapper.subscribe(obs, (value) => {
+        expect(value).toEqual(99);
+        async.done();
+      });
+
+      ObservableWrapper.callNext(controller, 99);
+    }));
+
+    it("should call the throw callback", inject([AsyncTestCompleter], (async) => {
+      ObservableWrapper.subscribe(obs, (_) => {}, (error) => {
+        expect(error).toEqual("Boom");
+        async.done();
+      });
+      ObservableWrapper.callThrow(controller, "Boom");
+    }));
+
+    it("should call the return callback", inject([AsyncTestCompleter], (async) => {
+      ObservableWrapper.subscribe(obs, (_) => {}, (_) => {}, () => {
+        async.done();
+      });
+
+      ObservableWrapper.callReturn(controller);
+    }));
+
+    it("should subscribe to the wrapper asynchronously", () => {
+      var called = false;
+      ObservableWrapper.subscribe(obs, (value) => {
+        called = true;
+      });
+
+      ObservableWrapper.callNext(controller, 99);
+      expect(called).toBe(false);
+    });
+
+    if (!IS_DARTIUM) {
+      // See here: https://github.com/jhusain/observable-spec
+      describe("Generator", () => {
+        var generator;
+
+        beforeEach(() => {
+          generator = new SpyObject();
+          generator.spy("next");
+          generator.spy("throw");
+          generator.spy("return");
+        });
+
+        it("should call next on the given generator",  inject([AsyncTestCompleter], (async) => {
+          generator.spy("next").andCallFake((value) => {
+            expect(value).toEqual(99);
+            async.done();
+          });
+
+          ObservableWrapper.subscribe(obs, generator);
+          ObservableWrapper.callNext(controller, 99);
+        }));
+
+        it("should call throw on the given generator", inject([AsyncTestCompleter], (async) => {
+          generator.spy("throw").andCallFake((error) => {
+            expect(error).toEqual("Boom");
+            async.done();
+          });
+          ObservableWrapper.subscribe(obs, generator);
+          ObservableWrapper.callThrow(controller, "Boom");
+        }));
+
+        it("should call return on the given generator", inject([AsyncTestCompleter], (async) => {
+          generator.spy("return").andCallFake(() => {
+            async.done();
+          });
+          ObservableWrapper.subscribe(obs, generator);
+          ObservableWrapper.callReturn(controller);
+        }));
+      });
+    }
+
+    //TODO: vsavkin: add tests cases
+    //should call dispose on the subscription if generator returns {done:true}
+    //should call dispose on the subscription on throw
+    //should call dispose on the subscription on return
+ });
+}
+
+//make sure rx observables are async
\ No newline at end of file
diff --git a/package.json b/package.json
index d4f469ff6b..434ea0d18a 100644
--- a/package.json
+++ b/package.json
@@ -27,6 +27,7 @@
     "es6-module-loader": "^0.9.2",
     "systemjs": "^0.9.1",
     "traceur": "0.0.82",
+    "rx": "2.4.6",
     "which": "~1",
     "zone.js": "0.4.0",
     "googleapis": "1.0.x",
diff --git a/test-main.js b/test-main.js
index caeeebc0f7..9c0a24f588 100644
--- a/test-main.js
+++ b/test-main.js
@@ -1,6 +1,7 @@
 // Use "register" extension from systemjs.
 // That's what Traceur outputs: `System.register()`.
 register(System);
+cjs(System);
 
 jasmine.DEFAULT_TIMEOUT_INTERVAL = 50;
 
@@ -14,7 +15,8 @@ System.baseURL = '/base/modules/';
 // So that we can import packages like `core/foo`, instead of `core/src/foo`.
 System.paths = {
   '*': './*.js',
-  'transpiler/*': '../tools/transpiler/*.js'
+  'transpiler/*': '../tools/transpiler/*.js',
+  'rx/*': '../node_modules/rx/*.js'
 }
 
 // Import all the specs, execute their `main()` method and kick off Karma (Jasmine).
diff --git a/tools/build/snippets/runtime_paths.js b/tools/build/snippets/runtime_paths.js
index 52f2464225..96ea89fc5b 100644
--- a/tools/build/snippets/runtime_paths.js
+++ b/tools/build/snippets/runtime_paths.js
@@ -1,4 +1,7 @@
 System.paths = {
-  '*': '/*.js'
+  '*': '/*.js',
+  'rx/dist/*': '*.js'
 };
 register(System);
+cjs(System);
+
diff --git a/tools/transpiler/src/type_mapping.js b/tools/transpiler/src/type_mapping.js
index cef5709784..953c8aa099 100644
--- a/tools/transpiler/src/type_mapping.js
+++ b/tools/transpiler/src/type_mapping.js
@@ -5,6 +5,8 @@ export var typeMapping = {
   'string': 'String',
   'any': 'dynamic',
   'Promise': 'Future',
+  'Observable': 'Stream',
+  'ObservableController': 'StreamController',
   'Date': 'DateTime',
   'StringMap': 'Map'
 };