diff --git a/packages/compiler-cli/ngcc/src/execution/cluster/master.ts b/packages/compiler-cli/ngcc/src/execution/cluster/master.ts index f32da5a723..eea1fc90e7 100644 --- a/packages/compiler-cli/ngcc/src/execution/cluster/master.ts +++ b/packages/compiler-cli/ngcc/src/execution/cluster/master.ts @@ -10,7 +10,7 @@ import * as cluster from 'cluster'; -import {FileSystem} from '../../../../src/ngtsc/file_system'; +import {AbsoluteFsPath, FileSystem} from '../../../../src/ngtsc/file_system'; import {Logger} from '../../logging/logger'; import {PackageJsonUpdater} from '../../writing/package_json_updater'; import {AnalyzeEntryPointsFn} from '../api'; @@ -28,7 +28,7 @@ import {Deferred, sendMessageToWorker} from './utils'; export class ClusterMaster { private finishedDeferred = new Deferred(); private processingStartTime: number = -1; - private taskAssignments = new Map(); + private taskAssignments = new Map(); private taskQueue: TaskQueue; private onTaskCompleted: TaskCompletedCallback; @@ -101,7 +101,7 @@ export class ClusterMaster { } // Process the next task on the worker. - this.taskAssignments.set(workerId, task); + this.taskAssignments.set(workerId, {task}); sendMessageToWorker(workerId, {type: 'process-task', task}); isWorkerAvailable = false; @@ -145,13 +145,16 @@ export class ClusterMaster { if (worker.exitedAfterDisconnect) return; // The worker exited unexpectedly: Determine it's status and take an appropriate action. - const currentTask = this.taskAssignments.get(worker.id); + const assignment = this.taskAssignments.get(worker.id); this.logger.warn( `Worker #${worker.id} exited unexpectedly (code: ${code} | signal: ${signal}).\n` + - ` Current assignment: ${(currentTask == null) ? '-' : stringifyTask(currentTask)}`); + ` Current task: ${(assignment == null) ? '-' : stringifyTask(assignment.task)}\n` + + ` Current phase: ${ + (assignment == null) ? '-' : + (assignment.files == null) ? 'compiling' : 'writing files'}`); - if (currentTask == null) { + if (assignment == null) { // The crashed worker process was not in the middle of a task: // Just spawn another process. this.logger.debug(`Spawning another worker process to replace #${worker.id}...`); @@ -160,6 +163,8 @@ export class ClusterMaster { } else { // The crashed worker process was in the middle of a task: // Impossible to know whether we can recover (without ending up with a corrupted entry-point). + // TODO: Use `assignment.files` to revert any changes and rerun the task worker. + const currentTask = assignment.task; throw new Error( 'Process unexpectedly crashed, while processing format property ' + `${currentTask.formatProperty} for entry-point '${currentTask.entryPoint.path}'.`); @@ -207,38 +212,56 @@ export class ClusterMaster { /** Handle a worker's having completed their assigned task. */ private onWorkerTaskCompleted(workerId: number, msg: TaskCompletedMessage): void { - const task = this.taskAssignments.get(workerId) || null; + const assignment = this.taskAssignments.get(workerId) || null; - if (task === null) { + if (assignment === null) { throw new Error( `Expected worker #${workerId} to have a task assigned, while handling message: ` + JSON.stringify(msg)); } - this.onTaskCompleted(task, msg.outcome, msg.message); + this.onTaskCompleted(assignment.task, msg.outcome, msg.message); - this.taskQueue.markAsCompleted(task); + this.taskQueue.markAsCompleted(assignment.task); this.taskAssignments.set(workerId, null); this.maybeDistributeWork(); } /** Handle a worker's message regarding the files transformed while processing its task. */ private onWorkerTransformedFiles(workerId: number, msg: TransformedFilesMessage): void { - // TODO: Do something useful with this info. - } + const assignment = this.taskAssignments.get(workerId) || null; - /** Handle a worker's request to update a `package.json` file. */ - private onWorkerUpdatePackageJson(workerId: number, msg: UpdatePackageJsonMessage): void { - const task = this.taskAssignments.get(workerId) || null; - - if (task === null) { + if (assignment === null) { throw new Error( `Expected worker #${workerId} to have a task assigned, while handling message: ` + JSON.stringify(msg)); } - const expectedPackageJsonPath = this.fileSystem.resolve(task.entryPoint.path, 'package.json'); - const parsedPackageJson = task.entryPoint.packageJson; + const oldFiles = assignment.files; + const newFiles = msg.files; + + if (oldFiles !== undefined) { + throw new Error( + `Worker #${workerId} reported transformed files more than once.\n` + + ` Old files (${oldFiles.length}): [${oldFiles.join(', ')}]\n` + + ` New files (${newFiles.length}): [${newFiles.join(', ')}]\n`); + } + + assignment.files = newFiles; + } + + /** Handle a worker's request to update a `package.json` file. */ + private onWorkerUpdatePackageJson(workerId: number, msg: UpdatePackageJsonMessage): void { + const assignment = this.taskAssignments.get(workerId) || null; + + if (assignment === null) { + throw new Error( + `Expected worker #${workerId} to have a task assigned, while handling message: ` + + JSON.stringify(msg)); + } + + const entryPoint = assignment.task.entryPoint; + const expectedPackageJsonPath = this.fileSystem.resolve(entryPoint.path, 'package.json'); if (expectedPackageJsonPath !== msg.packageJsonPath) { throw new Error( @@ -254,7 +277,7 @@ export class ClusterMaster { // In other words, task processing should only rely on the info that was there when the // file was initially parsed (during entry-point analysis) and not on the info that might // be added later (during task processing). - this.pkgJsonUpdater.writeChanges(msg.changes, msg.packageJsonPath, parsedPackageJson); + this.pkgJsonUpdater.writeChanges(msg.changes, msg.packageJsonPath, entryPoint.packageJson); } /** Stop all workers and stop listening on cluster events. */