fix(EventEmitter): resolve onError and onComplete asynchronously
closes #4443
This commit is contained in:
		
							parent
							
								
									b4de41b74e
								
							
						
					
					
						commit
						019cb41dd8
					
				| @ -1,4 +1,4 @@ | |||||||
| import {global, isPresent} from 'angular2/src/facade/lang'; | import {global, isPresent, noop} from 'angular2/src/facade/lang'; | ||||||
| // We make sure promises are in a separate file so that we can use promises
 | // We make sure promises are in a separate file so that we can use promises
 | ||||||
| // without depending on rxjs.
 | // without depending on rxjs.
 | ||||||
| import {PromiseWrapper, Promise, PromiseCompleter} from 'angular2/src/facade/promise'; | import {PromiseWrapper, Promise, PromiseCompleter} from 'angular2/src/facade/promise'; | ||||||
| @ -27,6 +27,8 @@ export class ObservableWrapper { | |||||||
|   // TODO(vsavkin): when we use rxnext, try inferring the generic type from the first arg
 |   // TODO(vsavkin): when we use rxnext, try inferring the generic type from the first arg
 | ||||||
|   static subscribe<T>(emitter: any, onNext: (value: T) => void, onError?: (exception: any) => void, |   static subscribe<T>(emitter: any, onNext: (value: T) => void, onError?: (exception: any) => void, | ||||||
|                       onComplete: () => void = () => {}): Object { |                       onComplete: () => void = () => {}): Object { | ||||||
|  |     onError = (typeof onError === "function") && onError || noop; | ||||||
|  |     onComplete = (typeof onComplete === "function") && onComplete || noop; | ||||||
|     return emitter.subscribe({next: onNext, error: onError, complete: onComplete}); |     return emitter.subscribe({next: onNext, error: onError, complete: onComplete}); | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
| @ -117,20 +119,39 @@ export class EventEmitter<T> extends Subject<T> { | |||||||
|   next(value: any) { super.next(value); } |   next(value: any) { super.next(value); } | ||||||
| 
 | 
 | ||||||
|   subscribe(generatorOrNext?: any, error?: any, complete?: any): any { |   subscribe(generatorOrNext?: any, error?: any, complete?: any): any { | ||||||
|  |     let schedulerFn; | ||||||
|  |     let errorFn = (err: any) => null; | ||||||
|  |     let completeFn = () => null; | ||||||
|  | 
 | ||||||
|     if (generatorOrNext && typeof generatorOrNext === 'object') { |     if (generatorOrNext && typeof generatorOrNext === 'object') { | ||||||
|       let schedulerFn = this._isAsync ? |       schedulerFn = this._isAsync ? (value) => { setTimeout(() => generatorOrNext.next(value)); } : | ||||||
|                             (value) => { setTimeout(() => generatorOrNext.next(value)); } : |  | ||||||
|                                     (value) => { generatorOrNext.next(value); }; |                                     (value) => { generatorOrNext.next(value); }; | ||||||
|       return super.subscribe(schedulerFn, | 
 | ||||||
|                              (err) => generatorOrNext.error ? generatorOrNext.error(err) : null, |       if (generatorOrNext.error) { | ||||||
|                              () => generatorOrNext.complete ? generatorOrNext.complete() : null); |         errorFn = this._isAsync ? (err) => { setTimeout(() => generatorOrNext.error(err)); } : | ||||||
|  |                                   (err) => { generatorOrNext.error(err); }; | ||||||
|  |       } | ||||||
|  | 
 | ||||||
|  |       if (generatorOrNext.complete) { | ||||||
|  |         completeFn = this._isAsync ? () => { setTimeout(() => generatorOrNext.complete()); } : | ||||||
|  |                                      () => { generatorOrNext.complete(); }; | ||||||
|  |       } | ||||||
|     } else { |     } else { | ||||||
|       let schedulerFn = this._isAsync ? (value) => { setTimeout(() => generatorOrNext(value)); } : |       schedulerFn = this._isAsync ? (value) => { setTimeout(() => generatorOrNext(value)); } : | ||||||
|                                     (value) => { generatorOrNext(value); }; |                                     (value) => { generatorOrNext(value); }; | ||||||
| 
 | 
 | ||||||
|       return super.subscribe(schedulerFn, (err) => error ? error(err) : null, |       if (error) { | ||||||
|                              () => complete ? complete() : null); |         errorFn = | ||||||
|  |             this._isAsync ? (err) => { setTimeout(() => error(err)); } : (err) => { error(err); }; | ||||||
|       } |       } | ||||||
|  | 
 | ||||||
|  |       if (complete) { | ||||||
|  |         completeFn = | ||||||
|  |             this._isAsync ? () => { setTimeout(() => complete()); } : () => { complete(); }; | ||||||
|  |       } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     return super.subscribe(schedulerFn, errorFn, completeFn); | ||||||
|   } |   } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -126,6 +126,8 @@ export function isDate(obj): boolean { | |||||||
|   return obj instanceof Date && !isNaN(obj.valueOf()); |   return obj instanceof Date && !isNaN(obj.valueOf()); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | export function noop() {} | ||||||
|  | 
 | ||||||
| export function stringify(token): string { | export function stringify(token): string { | ||||||
|   if (typeof token === 'string') { |   if (typeof token === 'string') { | ||||||
|     return token; |     return token; | ||||||
|  | |||||||
| @ -62,17 +62,42 @@ export function main() { | |||||||
|       expect(called).toBe(false); |       expect(called).toBe(false); | ||||||
|     }); |     }); | ||||||
| 
 | 
 | ||||||
|     it('delivers events asynchronously', inject([AsyncTestCompleter], (async) => { |     it("delivers next and error events asynchronously", inject([AsyncTestCompleter], (async) => { | ||||||
|          var e = new EventEmitter(); |          let log = []; | ||||||
|          var log = []; |          ObservableWrapper.subscribe(emitter, | ||||||
|          ObservableWrapper.subscribe(e, (x) => { |                                      (x) => { | ||||||
|                                        log.push(x); |                                        log.push(x); | ||||||
|            expect(log).toEqual([1, 3, 2]); |                                        expect(log).toEqual([1, 3, 5, 2]); | ||||||
|  |                                      }, | ||||||
|  |                                      (err) => { | ||||||
|  |                                        log.push(err); | ||||||
|  |                                        expect(log).toEqual([1, 3, 5, 2, 4]); | ||||||
|                                        async.done(); |                                        async.done(); | ||||||
|                                      }); |                                      }); | ||||||
|          log.push(1); |          log.push(1); | ||||||
|          ObservableWrapper.callEmit(e, 2); |          ObservableWrapper.callEmit(emitter, 2); | ||||||
|          log.push(3); |          log.push(3); | ||||||
|  |          ObservableWrapper.callError(emitter, 4); | ||||||
|  |          log.push(5); | ||||||
|  |        })); | ||||||
|  | 
 | ||||||
|  |     it("delivers next and complete events asynchronously", inject([AsyncTestCompleter], (async) => { | ||||||
|  |          let log = []; | ||||||
|  |          ObservableWrapper.subscribe(emitter, | ||||||
|  |                                      (x) => { | ||||||
|  |                                        log.push(x); | ||||||
|  |                                        expect(log).toEqual([1, 3, 5, 2]); | ||||||
|  |                                      }, | ||||||
|  |                                      null, () => { | ||||||
|  |                                        log.push(4); | ||||||
|  |                                        expect(log).toEqual([1, 3, 5, 2, 4]); | ||||||
|  |                                        async.done(); | ||||||
|  |                                      }); | ||||||
|  |          log.push(1); | ||||||
|  |          ObservableWrapper.callEmit(emitter, 2); | ||||||
|  |          log.push(3); | ||||||
|  |          ObservableWrapper.callComplete(emitter); | ||||||
|  |          log.push(5); | ||||||
|        })); |        })); | ||||||
| 
 | 
 | ||||||
|     it('delivers events synchronously', () => { |     it('delivers events synchronously', () => { | ||||||
| @ -110,6 +135,15 @@ export function main() { | |||||||
|       expect(ObservableWrapper.isObservable(e)).toBe(true); |       expect(ObservableWrapper.isObservable(e)).toBe(true); | ||||||
|     }); |     }); | ||||||
| 
 | 
 | ||||||
|  |     it('should subscribe to EventEmitters', () => { | ||||||
|  |       let e = new EventEmitter(false); | ||||||
|  | 
 | ||||||
|  |       ObservableWrapper.subscribe(e, (val) => {}); | ||||||
|  | 
 | ||||||
|  |       ObservableWrapper.callEmit(e, 1); | ||||||
|  |       ObservableWrapper.callComplete(e); | ||||||
|  |     }); | ||||||
|  | 
 | ||||||
|   }); |   }); | ||||||
| 
 | 
 | ||||||
|   // See ECMAScript 6 Spec 25.4.4.1
 |   // See ECMAScript 6 Spec 25.4.4.1
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user