refactor(ngcc): abstract `onTaskCompleted` out of executors (#36083)

Moving the definition of the `onTaskCompleted` callback into `mainNgcc()`
allows it to be configured based on options passed in there more easily.
This will be the case when we want to configure whether to log or throw
an error for tasks that failed to be processed successfully.

This commit also creates two new folders and moves the code around a bit
to make it easier to navigate the code§:

* `execution/tasks`: specific helpers such as task completion handlers
* `execution/tasks/queues`: the `TaskQueue` implementations and helpers

PR Close #36083
This commit is contained in:
Pete Bacon Darwin 2020-03-14 13:38:27 +00:00 committed by Andrew Kushnir
parent 712f2642d5
commit 39d4016fe9
19 changed files with 303 additions and 181 deletions

View File

@ -5,10 +5,7 @@
* Use of this source code is governed by an MIT-style license that can be * Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license * found in the LICENSE file at https://angular.io/license
*/ */
import {Task, TaskCompletedCallback, TaskQueue} from './tasks/api';
import {EntryPoint, EntryPointJsonProperty, JsonObject} from '../packages/entry_point';
import {PartiallyOrderedList} from '../utils';
/** /**
* The type of the function that analyzes entry-points and creates the list of tasks. * The type of the function that analyzes entry-points and creates the list of tasks.
@ -32,92 +29,3 @@ export interface Executor {
execute(analyzeEntryPoints: AnalyzeEntryPointsFn, createCompileFn: CreateCompileFn): execute(analyzeEntryPoints: AnalyzeEntryPointsFn, createCompileFn: CreateCompileFn):
void|Promise<void>; void|Promise<void>;
} }
/**
* Represents a partially ordered list of tasks.
*
* The ordering/precedence of tasks is determined by the inter-dependencies between their associated
* entry-points. Specifically, the tasks' order/precedence is such that tasks associated to
* dependent entry-points always come after tasks associated with their dependencies.
*
* As result of this ordering, it is guaranteed that - by processing tasks in the order in which
* they appear in the list - a task's dependencies will always have been processed before processing
* the task itself.
*
* See `DependencyResolver#sortEntryPointsByDependency()`.
*/
export type PartiallyOrderedTasks = PartiallyOrderedList<Task>;
/** Represents a unit of work: processing a specific format property of an entry-point. */
export interface Task extends JsonObject {
/** The `EntryPoint` which needs to be processed as part of the task. */
entryPoint: EntryPoint;
/**
* The `package.json` format property to process (i.e. the property which points to the file that
* is the program entry-point).
*/
formatProperty: EntryPointJsonProperty;
/**
* The list of all format properties (including `task.formatProperty`) that should be marked as
* processed once the task has been completed, because they point to the format-path that will be
* processed as part of the task.
*/
formatPropertiesToMarkAsProcessed: EntryPointJsonProperty[];
/** Whether to also process typings for this entry-point as part of the task. */
processDts: boolean;
}
/** A function to be called once a task has been processed. */
export type TaskCompletedCallback =
(task: Task, outcome: TaskProcessingOutcome, message: string | null) => void;
/** Represents the outcome of processing a `Task`. */
export const enum TaskProcessingOutcome {
/** Successfully processed the target format property. */
Processed,
/** Failed to process the target format. */
Failed,
}
/**
* A wrapper around a list of tasks and providing utility methods for getting the next task of
* interest and determining when all tasks have been completed.
*
* (This allows different implementations to impose different constraints on when a task's
* processing can start.)
*/
export interface TaskQueue {
/** Whether all tasks have been completed. */
allTasksCompleted: boolean;
/**
* Get the next task whose processing can start (if any).
*
* This implicitly marks the task as in-progress.
* (This information is used to determine whether all tasks have been completed.)
*
* @return The next task available for processing or `null`, if no task can be processed at the
* moment (including if there are no more unprocessed tasks).
*/
getNextTask(): Task|null;
/**
* Mark a task as completed.
*
* This removes the task from the internal list of in-progress tasks.
* (This information is used to determine whether all tasks have been completed.)
*
* @param task The task to mark as completed.
*/
markTaskCompleted(task: Task): void;
/**
* Return a string representation of the task queue (for debugging purposes).
*
* @return A string representation of the task queue.
*/
toString(): string;
}

View File

@ -9,7 +9,7 @@
import {AbsoluteFsPath} from '../../../../src/ngtsc/file_system'; import {AbsoluteFsPath} from '../../../../src/ngtsc/file_system';
import {JsonObject} from '../../packages/entry_point'; import {JsonObject} from '../../packages/entry_point';
import {PackageJsonChange} from '../../writing/package_json_updater'; import {PackageJsonChange} from '../../writing/package_json_updater';
import {Task, TaskProcessingOutcome} from '../api'; import {Task, TaskProcessingOutcome} from '../tasks/api';
/** A message reporting that an unrecoverable error occurred. */ /** A message reporting that an unrecoverable error occurred. */

View File

@ -14,6 +14,7 @@ import {AsyncLocker} from '../../locking/async_locker';
import {Logger} from '../../logging/logger'; import {Logger} from '../../logging/logger';
import {PackageJsonUpdater} from '../../writing/package_json_updater'; import {PackageJsonUpdater} from '../../writing/package_json_updater';
import {AnalyzeEntryPointsFn, CreateCompileFn, Executor} from '../api'; import {AnalyzeEntryPointsFn, CreateCompileFn, Executor} from '../api';
import {CreateTaskCompletedCallback} from '../tasks/api';
import {ClusterMaster} from './master'; import {ClusterMaster} from './master';
import {ClusterWorker} from './worker'; import {ClusterWorker} from './worker';
@ -26,7 +27,8 @@ import {ClusterWorker} from './worker';
export class ClusterExecutor implements Executor { export class ClusterExecutor implements Executor {
constructor( constructor(
private workerCount: number, private logger: Logger, private workerCount: number, private logger: Logger,
private pkgJsonUpdater: PackageJsonUpdater, private lockFile: AsyncLocker) {} private pkgJsonUpdater: PackageJsonUpdater, private lockFile: AsyncLocker,
private createTaskCompletedCallback: CreateTaskCompletedCallback) {}
async execute(analyzeEntryPoints: AnalyzeEntryPointsFn, createCompileFn: CreateCompileFn): async execute(analyzeEntryPoints: AnalyzeEntryPointsFn, createCompileFn: CreateCompileFn):
Promise<void> { Promise<void> {
@ -36,7 +38,8 @@ export class ClusterExecutor implements Executor {
this.logger.debug( this.logger.debug(
`Running ngcc on ${this.constructor.name} (using ${this.workerCount} worker processes).`); `Running ngcc on ${this.constructor.name} (using ${this.workerCount} worker processes).`);
const master = new ClusterMaster( const master = new ClusterMaster(
this.workerCount, this.logger, this.pkgJsonUpdater, analyzeEntryPoints); this.workerCount, this.logger, this.pkgJsonUpdater, analyzeEntryPoints,
this.createTaskCompletedCallback);
return master.run(); return master.run();
}); });
} else { } else {

View File

@ -13,8 +13,9 @@ import * as cluster from 'cluster';
import {resolve} from '../../../../src/ngtsc/file_system'; import {resolve} from '../../../../src/ngtsc/file_system';
import {Logger} from '../../logging/logger'; import {Logger} from '../../logging/logger';
import {PackageJsonUpdater} from '../../writing/package_json_updater'; import {PackageJsonUpdater} from '../../writing/package_json_updater';
import {AnalyzeEntryPointsFn, Task, TaskQueue} from '../api'; import {AnalyzeEntryPointsFn} from '../api';
import {onTaskCompleted, stringifyTask} from '../utils'; import {CreateTaskCompletedCallback, Task, TaskCompletedCallback, TaskQueue} from '../tasks/api';
import {stringifyTask} from '../tasks/utils';
import {MessageFromWorker, TaskCompletedMessage, UpdatePackageJsonMessage} from './api'; import {MessageFromWorker, TaskCompletedMessage, UpdatePackageJsonMessage} from './api';
import {Deferred, sendMessageToWorker} from './utils'; import {Deferred, sendMessageToWorker} from './utils';
@ -29,15 +30,18 @@ export class ClusterMaster {
private processingStartTime: number = -1; private processingStartTime: number = -1;
private taskAssignments = new Map<number, Task|null>(); private taskAssignments = new Map<number, Task|null>();
private taskQueue: TaskQueue; private taskQueue: TaskQueue;
private onTaskCompleted: TaskCompletedCallback;
constructor( constructor(
private workerCount: number, private logger: Logger, private workerCount: number, private logger: Logger,
private pkgJsonUpdater: PackageJsonUpdater, analyzeEntryPoints: AnalyzeEntryPointsFn) { private pkgJsonUpdater: PackageJsonUpdater, analyzeEntryPoints: AnalyzeEntryPointsFn,
createTaskCompletedCallback: CreateTaskCompletedCallback) {
if (!cluster.isMaster) { if (!cluster.isMaster) {
throw new Error('Tried to instantiate `ClusterMaster` on a worker process.'); throw new Error('Tried to instantiate `ClusterMaster` on a worker process.');
} }
this.taskQueue = analyzeEntryPoints(); this.taskQueue = analyzeEntryPoints();
this.onTaskCompleted = createTaskCompletedCallback(this.taskQueue);
} }
run(): Promise<void> { run(): Promise<void> {
@ -206,7 +210,7 @@ export class ClusterMaster {
JSON.stringify(msg)); JSON.stringify(msg));
} }
onTaskCompleted(this.pkgJsonUpdater, task, msg.outcome); this.onTaskCompleted(task, msg.outcome, msg.message);
this.taskQueue.markTaskCompleted(task); this.taskQueue.markTaskCompleted(task);
this.taskAssignments.set(workerId, null); this.taskAssignments.set(workerId, null);

View File

@ -12,7 +12,7 @@ import * as cluster from 'cluster';
import {Logger} from '../../logging/logger'; import {Logger} from '../../logging/logger';
import {CompileFn, CreateCompileFn} from '../api'; import {CompileFn, CreateCompileFn} from '../api';
import {stringifyTask} from '../utils'; import {stringifyTask} from '../tasks/utils';
import {MessageToWorker} from './api'; import {MessageToWorker} from './api';
import {sendMessageToMaster} from './utils'; import {sendMessageToMaster} from './utils';

View File

@ -9,21 +9,21 @@
import {AsyncLocker} from '../locking/async_locker'; import {AsyncLocker} from '../locking/async_locker';
import {SyncLocker} from '../locking/sync_locker'; import {SyncLocker} from '../locking/sync_locker';
import {Logger} from '../logging/logger'; import {Logger} from '../logging/logger';
import {PackageJsonUpdater} from '../writing/package_json_updater';
import {AnalyzeEntryPointsFn, CreateCompileFn, Executor} from './api'; import {AnalyzeEntryPointsFn, CreateCompileFn, Executor} from './api';
import {onTaskCompleted} from './utils'; import {CreateTaskCompletedCallback} from './tasks/api';
export abstract class SingleProcessorExecutorBase { export abstract class SingleProcessorExecutorBase {
constructor(private logger: Logger, private pkgJsonUpdater: PackageJsonUpdater) {} constructor(
private logger: Logger, private createTaskCompletedCallback: CreateTaskCompletedCallback) {}
doExecute(analyzeEntryPoints: AnalyzeEntryPointsFn, createCompileFn: CreateCompileFn): doExecute(analyzeEntryPoints: AnalyzeEntryPointsFn, createCompileFn: CreateCompileFn):
void|Promise<void> { void|Promise<void> {
this.logger.debug(`Running ngcc on ${this.constructor.name}.`); this.logger.debug(`Running ngcc on ${this.constructor.name}.`);
const taskQueue = analyzeEntryPoints(); const taskQueue = analyzeEntryPoints();
const compile = const onTaskCompleted = this.createTaskCompletedCallback(taskQueue);
createCompileFn((task, outcome) => onTaskCompleted(this.pkgJsonUpdater, task, outcome)); const compile = createCompileFn(onTaskCompleted);
// Process all tasks. // Process all tasks.
this.logger.debug('Processing tasks...'); this.logger.debug('Processing tasks...');
@ -44,8 +44,10 @@ export abstract class SingleProcessorExecutorBase {
* An `Executor` that processes all tasks serially and completes synchronously. * An `Executor` that processes all tasks serially and completes synchronously.
*/ */
export class SingleProcessExecutorSync extends SingleProcessorExecutorBase implements Executor { export class SingleProcessExecutorSync extends SingleProcessorExecutorBase implements Executor {
constructor(logger: Logger, pkgJsonUpdater: PackageJsonUpdater, private lockFile: SyncLocker) { constructor(
super(logger, pkgJsonUpdater); logger: Logger, private lockFile: SyncLocker,
createTaskCompletedCallback: CreateTaskCompletedCallback) {
super(logger, createTaskCompletedCallback);
} }
execute(analyzeEntryPoints: AnalyzeEntryPointsFn, createCompileFn: CreateCompileFn): void { execute(analyzeEntryPoints: AnalyzeEntryPointsFn, createCompileFn: CreateCompileFn): void {
this.lockFile.lock(() => this.doExecute(analyzeEntryPoints, createCompileFn)); this.lockFile.lock(() => this.doExecute(analyzeEntryPoints, createCompileFn));
@ -56,8 +58,10 @@ export class SingleProcessExecutorSync extends SingleProcessorExecutorBase imple
* An `Executor` that processes all tasks serially, but still completes asynchronously. * An `Executor` that processes all tasks serially, but still completes asynchronously.
*/ */
export class SingleProcessExecutorAsync extends SingleProcessorExecutorBase implements Executor { export class SingleProcessExecutorAsync extends SingleProcessorExecutorBase implements Executor {
constructor(logger: Logger, pkgJsonUpdater: PackageJsonUpdater, private lockFile: AsyncLocker) { constructor(
super(logger, pkgJsonUpdater); logger: Logger, private lockFile: AsyncLocker,
createTaskCompletedCallback: CreateTaskCompletedCallback) {
super(logger, createTaskCompletedCallback);
} }
async execute(analyzeEntryPoints: AnalyzeEntryPointsFn, createCompileFn: CreateCompileFn): async execute(analyzeEntryPoints: AnalyzeEntryPointsFn, createCompileFn: CreateCompileFn):
Promise<void> { Promise<void> {

View File

@ -0,0 +1,113 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
import {EntryPoint, EntryPointJsonProperty, JsonObject} from '../../packages/entry_point';
import {PartiallyOrderedList} from '../../utils';
/**
* Represents a unit of work to be undertaken by an `Executor`.
*
* A task consists of processing a specific format property of an entry-point.
* This may or may not also include processing the typings for that entry-point, which only needs to
* happen once across all the formats.
*/
export interface Task extends JsonObject {
/** The `EntryPoint` which needs to be processed as part of the task. */
entryPoint: EntryPoint;
/**
* The `package.json` format property to process (i.e. the property which points to the file that
* is the program entry-point).
*/
formatProperty: EntryPointJsonProperty;
/**
* The list of all format properties (including `task.formatProperty`) that should be marked as
* processed once the task has been completed, because they point to the format-path that will be
* processed as part of the task.
*/
formatPropertiesToMarkAsProcessed: EntryPointJsonProperty[];
/** Whether to also process typings for this entry-point as part of the task. */
processDts: boolean;
}
/**
* Represents a partially ordered list of tasks.
*
* The ordering/precedence of tasks is determined by the inter-dependencies between their associated
* entry-points. Specifically, the tasks' order/precedence is such that tasks associated to
* dependent entry-points always come after tasks associated with their dependencies.
*
* As result of this ordering, it is guaranteed that - by processing tasks in the order in which
* they appear in the list - a task's dependencies will always have been processed before processing
* the task itself.
*
* See `DependencyResolver#sortEntryPointsByDependency()`.
*/
export type PartiallyOrderedTasks = PartiallyOrderedList<Task>;
/**
* A function to create a TaskCompletedCallback function.
*/
export type CreateTaskCompletedCallback = (taskQueue: TaskQueue) => TaskCompletedCallback;
/**
* A function to be called once a task has been processed.
*/
export type TaskCompletedCallback =
(task: Task, outcome: TaskProcessingOutcome, message: string | null) => void;
/**
* Represents the outcome of processing a `Task`.
*/
export const enum TaskProcessingOutcome {
/** Successfully processed the target format property. */
Processed,
/** Failed to process the target format. */
Failed,
}
/**
* A wrapper around a list of tasks and providing utility methods for getting the next task of
* interest and determining when all tasks have been completed.
*
* (This allows different implementations to impose different constraints on when a task's
* processing can start.)
*/
export interface TaskQueue {
/** Whether all tasks have been completed. */
allTasksCompleted: boolean;
/**
* Get the next task whose processing can start (if any).
*
* This implicitly marks the task as in-progress.
* (This information is used to determine whether all tasks have been completed.)
*
* @return The next task available for processing or `null`, if no task can be processed at the
* moment (including if there are no more unprocessed tasks).
*/
getNextTask(): Task|null;
/**
* Mark a task as completed.
*
* This removes the task from the internal list of in-progress tasks.
* (This information is used to determine whether all tasks have been completed.)
*
* @param task The task to mark as completed.
*/
markTaskCompleted(task: Task): void;
/**
* Return a string representation of the task queue (for debugging purposes).
*
* @return A string representation of the task queue.
*/
toString(): string;
}

View File

@ -0,0 +1,72 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
import {FileSystem, resolve} from '../../../../src/ngtsc/file_system';
import {markAsProcessed} from '../../packages/build_marker';
import {PackageJsonFormatProperties, getEntryPointFormat} from '../../packages/entry_point';
import {PackageJsonUpdater} from '../../writing/package_json_updater';
import {Task, TaskCompletedCallback, TaskProcessingOutcome} from './api';
/**
* A function that can handle a specific outcome of a task completion.
*
* These functions can be composed using the `composeTaskCompletedCallbacks()`
* to create a `TaskCompletedCallback` function that can be passed to an `Executor`.
*/
export type TaskCompletedHandler = (task: Task, message: string | null) => void;
/**
* Compose a group of TaskCompletedHandlers into a single TaskCompletedCallback.
*
* The compose callback will receive an outcome and will delegate to the appropriate handler based
* on this outcome.
*
* @param callbacks a map of outcomes to handlers.
*/
export function composeTaskCompletedCallbacks(
callbacks: Record<TaskProcessingOutcome, TaskCompletedHandler>): TaskCompletedCallback {
return (task: Task, outcome: TaskProcessingOutcome, message: string | null): void => {
const callback = callbacks[outcome];
if (callback === undefined) {
throw new Error(
`Unknown task outcome: "${outcome}" - supported outcomes: ${JSON.stringify(Object.keys(callbacks))}`);
}
callback(task, message);
};
}
/**
* Create a handler that will mark the entry-points in a package as being processed.
*
* @param pkgJsonUpdater The service used to update the package.json
*/
export function createMarkAsProcessedHandler(pkgJsonUpdater: PackageJsonUpdater):
TaskCompletedHandler {
return (task: Task): void => {
const {entryPoint, formatPropertiesToMarkAsProcessed, processDts} = task;
const packageJsonPath = resolve(entryPoint.path, 'package.json');
const propsToMarkAsProcessed: PackageJsonFormatProperties[] =
[...formatPropertiesToMarkAsProcessed];
if (processDts) {
propsToMarkAsProcessed.push('typings');
}
markAsProcessed(
pkgJsonUpdater, entryPoint.packageJson, packageJsonPath, propsToMarkAsProcessed);
};
}
/**
* Create a handler that will throw an error.
*/
export function createThrowErrorHandler(fs: FileSystem): TaskCompletedHandler {
return (task: Task, message: string | null): void => {
const format = getEntryPointFormat(fs, task.entryPoint, task.formatProperty);
throw new Error(
`Failed to compile entry-point ${task.entryPoint.name} (${task.formatProperty} as ${format})` +
(message !== null ? ` due to ${message}` : ''));
};
}

View File

@ -8,7 +8,7 @@
import {DepGraph} from 'dependency-graph'; import {DepGraph} from 'dependency-graph';
import {EntryPoint} from '../../packages/entry_point'; import {EntryPoint} from '../../../packages/entry_point';
import {PartiallyOrderedTasks, Task} from '../api'; import {PartiallyOrderedTasks, Task} from '../api';
import {stringifyTask} from '../utils'; import {stringifyTask} from '../utils';

View File

@ -0,0 +1,12 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
import {Task} from './api';
/** Stringify a task for debugging purposes. */
export const stringifyTask = (task: Task): string =>
`{entryPoint: ${task.entryPoint.name}, formatProperty: ${task.formatProperty}, processDts: ${task.processDts}}`;

View File

@ -1,38 +0,0 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
import {resolve} from '../../../src/ngtsc/file_system';
import {markAsProcessed} from '../packages/build_marker';
import {PackageJsonFormatProperties} from '../packages/entry_point';
import {PackageJsonUpdater} from '../writing/package_json_updater';
import {Task, TaskProcessingOutcome} from './api';
/** A helper function for handling a task's being completed. */
export const onTaskCompleted =
(pkgJsonUpdater: PackageJsonUpdater, task: Task, outcome: TaskProcessingOutcome): void => {
const {entryPoint, formatPropertiesToMarkAsProcessed, processDts} = task;
if (outcome === TaskProcessingOutcome.Processed) {
const packageJsonPath = resolve(entryPoint.path, 'package.json');
const propsToMarkAsProcessed: PackageJsonFormatProperties[] =
[...formatPropertiesToMarkAsProcessed];
if (processDts) {
propsToMarkAsProcessed.push('typings');
}
markAsProcessed(
pkgJsonUpdater, entryPoint.packageJson, packageJsonPath, propsToMarkAsProcessed);
}
};
/** Stringify a task for debugging purposes. */
export const stringifyTask = (task: Task): string =>
`{entryPoint: ${task.entryPoint.name}, formatProperty: ${task.formatProperty}, processDts: ${task.processDts}}`;

View File

@ -24,12 +24,14 @@ import {UmdDependencyHost} from './dependencies/umd_dependency_host';
import {DirectoryWalkerEntryPointFinder} from './entry_point_finder/directory_walker_entry_point_finder'; import {DirectoryWalkerEntryPointFinder} from './entry_point_finder/directory_walker_entry_point_finder';
import {EntryPointFinder} from './entry_point_finder/interface'; import {EntryPointFinder} from './entry_point_finder/interface';
import {TargetedEntryPointFinder} from './entry_point_finder/targeted_entry_point_finder'; import {TargetedEntryPointFinder} from './entry_point_finder/targeted_entry_point_finder';
import {AnalyzeEntryPointsFn, CreateCompileFn, Executor, PartiallyOrderedTasks, Task, TaskProcessingOutcome, TaskQueue} from './execution/api'; import {AnalyzeEntryPointsFn, CreateCompileFn, Executor} from './execution/api';
import {ClusterExecutor} from './execution/cluster/executor'; import {ClusterExecutor} from './execution/cluster/executor';
import {ClusterPackageJsonUpdater} from './execution/cluster/package_json_updater'; import {ClusterPackageJsonUpdater} from './execution/cluster/package_json_updater';
import {SingleProcessExecutorAsync, SingleProcessExecutorSync} from './execution/single_process_executor'; import {SingleProcessExecutorAsync, SingleProcessExecutorSync} from './execution/single_process_executor';
import {ParallelTaskQueue} from './execution/task_selection/parallel_task_queue'; import {CreateTaskCompletedCallback, PartiallyOrderedTasks, Task, TaskProcessingOutcome, TaskQueue} from './execution/tasks/api';
import {SerialTaskQueue} from './execution/task_selection/serial_task_queue'; import {composeTaskCompletedCallbacks, createMarkAsProcessedHandler, createThrowErrorHandler} from './execution/tasks/completion';
import {ParallelTaskQueue} from './execution/tasks/queues/parallel_task_queue';
import {SerialTaskQueue} from './execution/tasks/queues/serial_task_queue';
import {AsyncLocker} from './locking/async_locker'; import {AsyncLocker} from './locking/async_locker';
import {LockFileWithChildProcess} from './locking/lock_file_with_child_process'; import {LockFileWithChildProcess} from './locking/lock_file_with_child_process';
import {SyncLocker} from './locking/sync_locker'; import {SyncLocker} from './locking/sync_locker';
@ -294,8 +296,7 @@ export function mainNgcc({basePath, targetEntryPointPath,
} else { } else {
const errors = replaceTsWithNgInErrors( const errors = replaceTsWithNgInErrors(
ts.formatDiagnosticsWithColorAndContext(result.diagnostics, bundle.src.host)); ts.formatDiagnosticsWithColorAndContext(result.diagnostics, bundle.src.host));
throw new Error( onTaskCompleted(task, TaskProcessingOutcome.Failed, `compilation errors:\n${errors}`);
`Failed to compile entry-point ${entryPoint.name} (${formatProperty} as ${format}) due to compilation errors:\n${errors}`);
} }
logger.debug(` Successfully compiled ${entryPoint.name} : ${formatProperty}`); logger.debug(` Successfully compiled ${entryPoint.name} : ${formatProperty}`);
@ -305,7 +306,9 @@ export function mainNgcc({basePath, targetEntryPointPath,
}; };
// The executor for actually planning and getting the work done. // The executor for actually planning and getting the work done.
const executor = getExecutor(async, inParallel, logger, pkgJsonUpdater, fileSystem); const createTaskCompletedCallback = getCreateTaskCompletedCallback(pkgJsonUpdater, fileSystem);
const executor = getExecutor(
async, inParallel, logger, pkgJsonUpdater, fileSystem, createTaskCompletedCallback);
return executor.execute(analyzeEntryPoints, createCompileFn); return executor.execute(analyzeEntryPoints, createCompileFn);
} }
@ -349,9 +352,17 @@ function getTaskQueue(
return inParallel ? new ParallelTaskQueue(tasks, graph) : new SerialTaskQueue(tasks); return inParallel ? new ParallelTaskQueue(tasks, graph) : new SerialTaskQueue(tasks);
} }
function getCreateTaskCompletedCallback(
pkgJsonUpdater: PackageJsonUpdater, fileSystem: FileSystem): CreateTaskCompletedCallback {
return _taskQueue => composeTaskCompletedCallbacks({
[TaskProcessingOutcome.Processed]: createMarkAsProcessedHandler(pkgJsonUpdater),
[TaskProcessingOutcome.Failed]: createThrowErrorHandler(fileSystem),
});
}
function getExecutor( function getExecutor(
async: boolean, inParallel: boolean, logger: Logger, pkgJsonUpdater: PackageJsonUpdater, async: boolean, inParallel: boolean, logger: Logger, pkgJsonUpdater: PackageJsonUpdater,
fileSystem: FileSystem): Executor { fileSystem: FileSystem, createTaskCompletedCallback: CreateTaskCompletedCallback): Executor {
const lockFile = new LockFileWithChildProcess(fileSystem, logger); const lockFile = new LockFileWithChildProcess(fileSystem, logger);
if (async) { if (async) {
// Execute asynchronously (either serially or in parallel) // Execute asynchronously (either serially or in parallel)
@ -359,14 +370,16 @@ function getExecutor(
if (inParallel) { if (inParallel) {
// Execute in parallel. Use up to 8 CPU cores for workers, always reserving one for master. // Execute in parallel. Use up to 8 CPU cores for workers, always reserving one for master.
const workerCount = Math.min(8, os.cpus().length - 1); const workerCount = Math.min(8, os.cpus().length - 1);
return new ClusterExecutor(workerCount, logger, pkgJsonUpdater, locker); return new ClusterExecutor(
workerCount, logger, pkgJsonUpdater, locker, createTaskCompletedCallback);
} else { } else {
// Execute serially, on a single thread (async). // Execute serially, on a single thread (async).
return new SingleProcessExecutorAsync(logger, pkgJsonUpdater, locker); return new SingleProcessExecutorAsync(logger, locker, createTaskCompletedCallback);
} }
} else { } else {
// Execute serially, on a single thread (sync). // Execute serially, on a single thread (sync).
return new SingleProcessExecutorSync(logger, pkgJsonUpdater, new SyncLocker(lockFile)); return new SingleProcessExecutorSync(
logger, new SyncLocker(lockFile), createTaskCompletedCallback);
} }
} }

View File

@ -30,18 +30,21 @@ describe('ClusterExecutor', () => {
let mockLockFile: MockLockFile; let mockLockFile: MockLockFile;
let locker: AsyncLocker; let locker: AsyncLocker;
let executor: ClusterExecutor; let executor: ClusterExecutor;
let createTaskCompletedCallback: jasmine.Spy;
beforeEach(() => { beforeEach(() => {
masterRunSpy = spyOn(ClusterMaster.prototype, 'run') masterRunSpy = spyOn(ClusterMaster.prototype, 'run')
.and.returnValue(Promise.resolve('CusterMaster#run()')); .and.returnValue(Promise.resolve('CusterMaster#run()'));
workerRunSpy = spyOn(ClusterWorker.prototype, 'run') workerRunSpy = spyOn(ClusterWorker.prototype, 'run')
.and.returnValue(Promise.resolve('CusterWorker#run()')); .and.returnValue(Promise.resolve('CusterWorker#run()'));
createTaskCompletedCallback = jasmine.createSpy('createTaskCompletedCallback');
mockLogger = new MockLogger(); mockLogger = new MockLogger();
lockFileLog = []; lockFileLog = [];
mockLockFile = new MockLockFile(new MockFileSystemNative(), lockFileLog); mockLockFile = new MockLockFile(new MockFileSystemNative(), lockFileLog);
locker = new AsyncLocker(mockLockFile, mockLogger, 200, 2); locker = new AsyncLocker(mockLockFile, mockLogger, 200, 2);
executor = new ClusterExecutor(42, mockLogger, null as unknown as PackageJsonUpdater, locker); executor = new ClusterExecutor(
42, mockLogger, null as unknown as PackageJsonUpdater, locker, createTaskCompletedCallback);
}); });
describe('execute()', () => { describe('execute()', () => {
@ -98,8 +101,9 @@ describe('ClusterExecutor', () => {
throw new Error('LockFile.write() error'); throw new Error('LockFile.write() error');
}); });
executor = executor = new ClusterExecutor(
new ClusterExecutor(42, mockLogger, null as unknown as PackageJsonUpdater, locker); 42, mockLogger, null as unknown as PackageJsonUpdater, locker,
createTaskCompletedCallback);
let error = ''; let error = '';
try { try {
await executor.execute(anyFn, anyFn); await executor.execute(anyFn, anyFn);
@ -117,8 +121,9 @@ describe('ClusterExecutor', () => {
throw new Error('LockFile.remove() error'); throw new Error('LockFile.remove() error');
}); });
executor = executor = new ClusterExecutor(
new ClusterExecutor(42, mockLogger, null as unknown as PackageJsonUpdater, locker); 42, mockLogger, null as unknown as PackageJsonUpdater, locker,
createTaskCompletedCallback);
let error = ''; let error = '';
try { try {
await executor.execute(anyFn, anyFn); await executor.execute(anyFn, anyFn);

View File

@ -11,8 +11,8 @@
import * as cluster from 'cluster'; import * as cluster from 'cluster';
import {EventEmitter} from 'events'; import {EventEmitter} from 'events';
import {Task, TaskCompletedCallback, TaskProcessingOutcome} from '../../../src/execution/api';
import {ClusterWorker} from '../../../src/execution/cluster/worker'; import {ClusterWorker} from '../../../src/execution/cluster/worker';
import {Task, TaskCompletedCallback, TaskProcessingOutcome} from '../../../src/execution/tasks/api';
import {MockLogger} from '../../helpers/mock_logger'; import {MockLogger} from '../../helpers/mock_logger';
import {mockProperty} from '../../helpers/spy_utils'; import {mockProperty} from '../../helpers/spy_utils';

View File

@ -10,9 +10,8 @@
import {MockFileSystemNative} from '../../../src/ngtsc/file_system/testing'; import {MockFileSystemNative} from '../../../src/ngtsc/file_system/testing';
import {SingleProcessExecutorSync} from '../../src/execution/single_process_executor'; import {SingleProcessExecutorSync} from '../../src/execution/single_process_executor';
import {SerialTaskQueue} from '../../src/execution/task_selection/serial_task_queue'; import {Task, TaskQueue} from '../../src/execution/tasks/api';
import {SyncLocker} from '../../src/locking/sync_locker'; import {SyncLocker} from '../../src/locking/sync_locker';
import {PackageJsonUpdater} from '../../src/writing/package_json_updater';
import {MockLockFile} from '../helpers/mock_lock_file'; import {MockLockFile} from '../helpers/mock_lock_file';
import {MockLogger} from '../helpers/mock_logger'; import {MockLogger} from '../helpers/mock_logger';
@ -23,20 +22,33 @@ describe('SingleProcessExecutor', () => {
let mockLockFile: MockLockFile; let mockLockFile: MockLockFile;
let locker: SyncLocker; let locker: SyncLocker;
let executor: SingleProcessExecutorSync; let executor: SingleProcessExecutorSync;
let createTaskCompletedCallback: jasmine.Spy;
beforeEach(() => { beforeEach(() => {
mockLogger = new MockLogger(); mockLogger = new MockLogger();
lockFileLog = []; lockFileLog = [];
mockLockFile = new MockLockFile(new MockFileSystemNative(), lockFileLog); mockLockFile = new MockLockFile(new MockFileSystemNative(), lockFileLog);
locker = new SyncLocker(mockLockFile); locker = new SyncLocker(mockLockFile);
executor = createTaskCompletedCallback = jasmine.createSpy('createTaskCompletedCallback');
new SingleProcessExecutorSync(mockLogger, null as unknown as PackageJsonUpdater, locker); executor = new SingleProcessExecutorSync(mockLogger, locker, createTaskCompletedCallback);
}); });
const noTasks = () => ({ allTasksCompleted: true, getNextTask: () => null } as TaskQueue);
const oneTask = () => {
let tasksCount = 1;
return <TaskQueue>{
get allTasksCompleted() { return tasksCount === 0; },
getNextTask() {
tasksCount--;
return {};
},
markTaskCompleted(_task: Task) {},
};
};
describe('execute()', () => { describe('execute()', () => {
it('should call LockFile.write() and LockFile.remove() if processing completes successfully', it('should call LockFile.write() and LockFile.remove() if processing completes successfully',
() => { () => {
const noTasks = () => new SerialTaskQueue([] as any);
const createCompileFn: () => any = () => undefined; const createCompileFn: () => any = () => undefined;
executor.execute(noTasks, createCompileFn); executor.execute(noTasks, createCompileFn);
expect(lockFileLog).toEqual(['write()', 'remove()']); expect(lockFileLog).toEqual(['write()', 'remove()']);
@ -56,7 +68,6 @@ describe('SingleProcessExecutor', () => {
}); });
it('should call LockFile.write() and LockFile.remove() if `createCompileFn` fails', () => { it('should call LockFile.write() and LockFile.remove() if `createCompileFn` fails', () => {
const oneTask = () => new SerialTaskQueue([{}] as any);
const createErrorCompileFn: () => any = () => { throw new Error('compile error'); }; const createErrorCompileFn: () => any = () => { throw new Error('compile error'); };
let error: string = ''; let error: string = '';
try { try {
@ -76,8 +87,7 @@ describe('SingleProcessExecutor', () => {
const analyzeFn: () => any = () => { lockFileLog.push('analyzeFn'); }; const analyzeFn: () => any = () => { lockFileLog.push('analyzeFn'); };
const anyFn: () => any = () => undefined; const anyFn: () => any = () => undefined;
executor = executor = new SingleProcessExecutorSync(mockLogger, locker, createTaskCompletedCallback);
new SingleProcessExecutorSync(mockLogger, null as unknown as PackageJsonUpdater, locker);
let error = ''; let error = '';
try { try {
executor.execute(analyzeFn, anyFn); executor.execute(analyzeFn, anyFn);
@ -89,15 +99,13 @@ describe('SingleProcessExecutor', () => {
}); });
it('should fail if LockFile.remove() fails', () => { it('should fail if LockFile.remove() fails', () => {
const noTasks = () => new SerialTaskQueue([] as any);
const anyFn: () => any = () => undefined; const anyFn: () => any = () => undefined;
spyOn(mockLockFile, 'remove').and.callFake(() => { spyOn(mockLockFile, 'remove').and.callFake(() => {
lockFileLog.push('remove()'); lockFileLog.push('remove()');
throw new Error('LockFile.remove() error'); throw new Error('LockFile.remove() error');
}); });
executor = executor = new SingleProcessExecutorSync(mockLogger, locker, createTaskCompletedCallback);
new SingleProcessExecutorSync(mockLogger, null as unknown as PackageJsonUpdater, locker);
let error = ''; let error = '';
try { try {
executor.execute(noTasks, anyFn); executor.execute(noTasks, anyFn);
@ -107,5 +115,23 @@ describe('SingleProcessExecutor', () => {
expect(error).toEqual('LockFile.remove() error'); expect(error).toEqual('LockFile.remove() error');
expect(lockFileLog).toEqual(['write()', 'remove()']); expect(lockFileLog).toEqual(['write()', 'remove()']);
}); });
it('should call createTaskCompletedCallback with the task queue', () => {
const createCompileFn = jasmine.createSpy('createCompileFn');
executor.execute(noTasks, createCompileFn);
expect(createTaskCompletedCallback).toHaveBeenCalledTimes(1);
expect(createTaskCompletedCallback.calls.mostRecent().args).toEqual([jasmine.objectContaining(
{allTasksCompleted: true, getNextTask: jasmine.any(Function)})]);
});
it('should pass the created TaskCompletedCallback to the createCompileFn', () => {
const createCompileFn =
jasmine.createSpy('createCompileFn').and.returnValue(function compileFn() {});
function onTaskCompleted() {}
createTaskCompletedCallback.and.returnValue(onTaskCompleted);
executor.execute(noTasks, createCompileFn);
expect(createCompileFn).toHaveBeenCalledTimes(1);
expect(createCompileFn).toHaveBeenCalledWith(onTaskCompleted);
});
}); });
}); });

View File

@ -8,9 +8,9 @@
import {DepGraph} from 'dependency-graph'; import {DepGraph} from 'dependency-graph';
import {PartiallyOrderedTasks, Task, TaskQueue} from '../../../src/execution/api'; import {PartiallyOrderedTasks, Task, TaskQueue} from '../../../../src/execution/tasks/api';
import {ParallelTaskQueue} from '../../../src/execution/task_selection/parallel_task_queue'; import {ParallelTaskQueue} from '../../../../src/execution/tasks/queues/parallel_task_queue';
import {EntryPoint} from '../../../src/packages/entry_point'; import {EntryPoint} from '../../../../src/packages/entry_point';
describe('ParallelTaskQueue', () => { describe('ParallelTaskQueue', () => {

View File

@ -6,8 +6,8 @@
* found in the LICENSE file at https://angular.io/license * found in the LICENSE file at https://angular.io/license
*/ */
import {PartiallyOrderedTasks, Task, TaskQueue} from '../../../src/execution/api'; import {PartiallyOrderedTasks, Task, TaskQueue} from '../../../../src/execution/tasks/api';
import {SerialTaskQueue} from '../../../src/execution/task_selection/serial_task_queue'; import {SerialTaskQueue} from '../../../../src/execution/tasks/queues/serial_task_queue';
describe('SerialTaskQueue', () => { describe('SerialTaskQueue', () => {