refactor(ngcc): keep track of transformed files per task (#36626)

With this commit, the master process will keep track of the transformed
files that each worker process is intending to write to disk.

In a subsequent commit, this info will be used to allow ngcc to recover
when a worker process crashes in the middle of processing a task.

PR Close #36626
This commit is contained in:
George Kalpakas 2020-04-29 21:28:16 +03:00 committed by Andrew Kushnir
parent dff5129661
commit ff6e93163f
1 changed files with 43 additions and 20 deletions

View File

@ -10,7 +10,7 @@
import * as cluster from 'cluster';
import {FileSystem} from '../../../../src/ngtsc/file_system';
import {AbsoluteFsPath, FileSystem} from '../../../../src/ngtsc/file_system';
import {Logger} from '../../logging/logger';
import {PackageJsonUpdater} from '../../writing/package_json_updater';
import {AnalyzeEntryPointsFn} from '../api';
@ -28,7 +28,7 @@ import {Deferred, sendMessageToWorker} from './utils';
export class ClusterMaster {
private finishedDeferred = new Deferred<void>();
private processingStartTime: number = -1;
private taskAssignments = new Map<number, Task|null>();
private taskAssignments = new Map<number, {task: Task, files?: AbsoluteFsPath[]}|null>();
private taskQueue: TaskQueue;
private onTaskCompleted: TaskCompletedCallback;
@ -101,7 +101,7 @@ export class ClusterMaster {
}
// Process the next task on the worker.
this.taskAssignments.set(workerId, task);
this.taskAssignments.set(workerId, {task});
sendMessageToWorker(workerId, {type: 'process-task', task});
isWorkerAvailable = false;
@ -145,13 +145,16 @@ export class ClusterMaster {
if (worker.exitedAfterDisconnect) return;
// The worker exited unexpectedly: Determine it's status and take an appropriate action.
const currentTask = this.taskAssignments.get(worker.id);
const assignment = this.taskAssignments.get(worker.id);
this.logger.warn(
`Worker #${worker.id} exited unexpectedly (code: ${code} | signal: ${signal}).\n` +
` Current assignment: ${(currentTask == null) ? '-' : stringifyTask(currentTask)}`);
` Current task: ${(assignment == null) ? '-' : stringifyTask(assignment.task)}\n` +
` Current phase: ${
(assignment == null) ? '-' :
(assignment.files == null) ? 'compiling' : 'writing files'}`);
if (currentTask == null) {
if (assignment == null) {
// The crashed worker process was not in the middle of a task:
// Just spawn another process.
this.logger.debug(`Spawning another worker process to replace #${worker.id}...`);
@ -160,6 +163,8 @@ export class ClusterMaster {
} else {
// The crashed worker process was in the middle of a task:
// Impossible to know whether we can recover (without ending up with a corrupted entry-point).
// TODO: Use `assignment.files` to revert any changes and rerun the task worker.
const currentTask = assignment.task;
throw new Error(
'Process unexpectedly crashed, while processing format property ' +
`${currentTask.formatProperty} for entry-point '${currentTask.entryPoint.path}'.`);
@ -207,38 +212,56 @@ export class ClusterMaster {
/** Handle a worker's having completed their assigned task. */
private onWorkerTaskCompleted(workerId: number, msg: TaskCompletedMessage): void {
const task = this.taskAssignments.get(workerId) || null;
const assignment = this.taskAssignments.get(workerId) || null;
if (task === null) {
if (assignment === null) {
throw new Error(
`Expected worker #${workerId} to have a task assigned, while handling message: ` +
JSON.stringify(msg));
}
this.onTaskCompleted(task, msg.outcome, msg.message);
this.onTaskCompleted(assignment.task, msg.outcome, msg.message);
this.taskQueue.markAsCompleted(task);
this.taskQueue.markAsCompleted(assignment.task);
this.taskAssignments.set(workerId, null);
this.maybeDistributeWork();
}
/** Handle a worker's message regarding the files transformed while processing its task. */
private onWorkerTransformedFiles(workerId: number, msg: TransformedFilesMessage): void {
// TODO: Do something useful with this info.
}
const assignment = this.taskAssignments.get(workerId) || null;
/** Handle a worker's request to update a `package.json` file. */
private onWorkerUpdatePackageJson(workerId: number, msg: UpdatePackageJsonMessage): void {
const task = this.taskAssignments.get(workerId) || null;
if (task === null) {
if (assignment === null) {
throw new Error(
`Expected worker #${workerId} to have a task assigned, while handling message: ` +
JSON.stringify(msg));
}
const expectedPackageJsonPath = this.fileSystem.resolve(task.entryPoint.path, 'package.json');
const parsedPackageJson = task.entryPoint.packageJson;
const oldFiles = assignment.files;
const newFiles = msg.files;
if (oldFiles !== undefined) {
throw new Error(
`Worker #${workerId} reported transformed files more than once.\n` +
` Old files (${oldFiles.length}): [${oldFiles.join(', ')}]\n` +
` New files (${newFiles.length}): [${newFiles.join(', ')}]\n`);
}
assignment.files = newFiles;
}
/** Handle a worker's request to update a `package.json` file. */
private onWorkerUpdatePackageJson(workerId: number, msg: UpdatePackageJsonMessage): void {
const assignment = this.taskAssignments.get(workerId) || null;
if (assignment === null) {
throw new Error(
`Expected worker #${workerId} to have a task assigned, while handling message: ` +
JSON.stringify(msg));
}
const entryPoint = assignment.task.entryPoint;
const expectedPackageJsonPath = this.fileSystem.resolve(entryPoint.path, 'package.json');
if (expectedPackageJsonPath !== msg.packageJsonPath) {
throw new Error(
@ -254,7 +277,7 @@ export class ClusterMaster {
// In other words, task processing should only rely on the info that was there when the
// file was initially parsed (during entry-point analysis) and not on the info that might
// be added later (during task processing).
this.pkgJsonUpdater.writeChanges(msg.changes, msg.packageJsonPath, parsedPackageJson);
this.pkgJsonUpdater.writeChanges(msg.changes, msg.packageJsonPath, entryPoint.packageJson);
}
/** Stop all workers and stop listening on cluster events. */