// Scraped file-viewer header (not part of the source): 277 lines | 11 KiB | TypeScript | Raw / Normal View / History

/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
/// <reference types="node" />
import * as cluster from 'cluster';
import {FileSystem} from '../../../../src/ngtsc/file_system';
import {Logger} from '../../logging/logger';
import {PackageJsonUpdater} from '../../writing/package_json_updater';
import {AnalyzeEntryPointsFn} from '../api';
import {CreateTaskCompletedCallback, Task, TaskCompletedCallback, TaskQueue} from '../tasks/api';
import {stringifyTask} from '../tasks/utils';
import {MessageFromWorker, TaskCompletedMessage, UpdatePackageJsonMessage} from './api';
import {Deferred, sendMessageToWorker} from './utils';
/**
 * The cluster master is responsible for analyzing all entry-points, planning the work that needs to
 * be done, distributing it to worker-processes and collecting/post-processing the results.
 *
 * Worker processes are spawned lazily: `run()` forks a single worker and further workers are only
 * forked (up to `maxWorkerCount`) when there is work left but no idle worker to hand it to.
 */
export class ClusterMaster {
  /** Resolved once all tasks are completed; rejected when an event handler throws. */
  private finishedDeferred = new Deferred<void>();

  /** Timestamp (from `Date.now()`) of the first worker coming online; `-1` until then. */
  private processingStartTime: number = -1;

  /** Maps the ID of each worker that has come online to its current task (`null` when idle). */
  private taskAssignments = new Map<number, Task|null>();

  /** The queue of tasks, as produced by `analyzeEntryPoints()`. */
  private taskQueue: TaskQueue;

  /** Callback invoked with the task and its outcome whenever a worker reports a completed task. */
  private onTaskCompleted: TaskCompletedCallback;

  /**
   * @param maxWorkerCount Upper limit for the number of simultaneously spawned worker processes.
   * @param fileSystem Used for resolving the worker script and `package.json` paths.
   * @param logger Receives debug/warning output about the cluster's progress.
   * @param pkgJsonUpdater Applies `package.json` changes requested by workers.
   * @param analyzeEntryPoints Invoked once (synchronously) to create the task queue.
   * @param createTaskCompletedCallback Invoked once with the task queue to create the callback
   *     that is called for each completed task.
   * @throws If instantiated on a process that is not the cluster master.
   */
  constructor(
      private maxWorkerCount: number, private fileSystem: FileSystem, private logger: Logger,
      private pkgJsonUpdater: PackageJsonUpdater, analyzeEntryPoints: AnalyzeEntryPointsFn,
      createTaskCompletedCallback: CreateTaskCompletedCallback) {
    if (!cluster.isMaster) {
      throw new Error('Tried to instantiate `ClusterMaster` on a worker process.');
    }

    // Set the worker entry-point.
    cluster.setupMaster({exec: this.fileSystem.resolve(__dirname, 'worker.js')});

    this.taskQueue = analyzeEntryPoints();
    this.onTaskCompleted = createTaskCompletedCallback(this.taskQueue);
  }

  /**
   * Start processing the queued tasks on worker processes.
   *
   * @returns A promise that resolves once all tasks are completed (immediately, if there is nothing
   *     to do) and rejects if any cluster event handler throws. In both cases the workers are
   *     stopped before the promise settles.
   */
  run(): Promise<void> {
    if (this.taskQueue.allTasksCompleted) {
      return Promise.resolve();
    }

    // Set up listeners for worker events (emitted on `cluster`).
    cluster.on('online', this.wrapEventHandler(worker => this.onWorkerOnline(worker.id)));

    cluster.on(
        'message', this.wrapEventHandler((worker, msg) => this.onWorkerMessage(worker.id, msg)));

    cluster.on(
        'exit',
        this.wrapEventHandler((worker, code, signal) => this.onWorkerExit(worker, code, signal)));

    // Since we have pending tasks at the very minimum we need a single worker.
    cluster.fork();

    return this.finishedDeferred.promise.then(() => this.stopWorkers(), err => {
      this.stopWorkers();
      return Promise.reject(err);
    });
  }

  /**
   * Try to find available (idle) workers and assign them available (non-blocked) tasks.
   *
   * If every known worker is busy, another worker is spawned as long as fewer than `maxWorkerCount`
   * processes exist. Resolves `finishedDeferred` when the queue reports all tasks completed.
   *
   * @throws If all workers are idle, the queue is not completed and yet no task is available.
   */
  private maybeDistributeWork(): void {
    let isWorkerAvailable = false;

    // First, check whether all tasks have been completed.
    if (this.taskQueue.allTasksCompleted) {
      // Duration in seconds, rounded to one decimal place.
      const duration = Math.round((Date.now() - this.processingStartTime) / 100) / 10;
      this.logger.debug(`Processed tasks in ${duration}s.`);

      return this.finishedDeferred.resolve();
    }

    // Look for available workers and available tasks to assign to them.
    // (Iterate over a snapshot, since assignments are updated inside the loop.)
    for (const [workerId, assignedTask] of Array.from(this.taskAssignments)) {
      if (assignedTask !== null) {
        // This worker already has a job; check other workers.
        continue;
      }

      // This worker is available and needs a job. See if any are available.
      isWorkerAvailable = true;
      const task = this.taskQueue.getNextTask();
      if (task === null) {
        // No suitable work available right now.
        break;
      }

      // Process the next task on the worker.
      this.taskAssignments.set(workerId, task);
      sendMessageToWorker(workerId, {type: 'process-task', task});

      isWorkerAvailable = false;
    }

    if (!isWorkerAvailable) {
      // Count the spawned processes via `cluster.workers` rather than `taskAssignments`:
      // A forked worker is only added to `taskAssignments` once it comes online, so the latter
      // can under-report the number of processes that have already been spawned.
      const spawnedWorkerCount = Object.keys(cluster.workers).length;
      if (spawnedWorkerCount < this.maxWorkerCount) {
        this.logger.debug('Spawning another worker process as there is more work to be done.');
        cluster.fork();
      } else {
        // If there are no available workers or no available tasks, log (for debugging purposes).
        this.logger.debug(
            `All ${spawnedWorkerCount} workers are currently busy and cannot take on more work.`);
      }
    } else {
      const busyWorkers = Array.from(this.taskAssignments)
                              .filter(([_workerId, task]) => task !== null)
                              .map(([workerId]) => workerId);
      const totalWorkerCount = this.taskAssignments.size;
      const idleWorkerCount = totalWorkerCount - busyWorkers.length;

      this.logger.debug(
          `No assignments for ${idleWorkerCount} idle (out of ${totalWorkerCount} total) ` +
          `workers. Busy workers: ${busyWorkers.join(', ')}`);

      if (busyWorkers.length === 0) {
        // This is a bug:
        // All workers are idle (meaning no tasks are in progress) and `taskQueue.allTasksCompleted`
        // is `false`, but there is still no assignable work.
        throw new Error(
            'There are still unprocessed tasks in the queue and no tasks are currently in ' +
            `progress, yet the queue did not return any available tasks: ${this.taskQueue}`);
      }
    }
  }

  /**
   * Handle a worker's exiting. (Might be intentional or not.)
   *
   * @throws If the worker exited unexpectedly while it had a task assigned.
   */
  private onWorkerExit(worker: cluster.Worker, code: number|null, signal: string|null): void {
    // If the worker's exiting was intentional, nothing to do.
    if (worker.exitedAfterDisconnect) return;

    // The worker exited unexpectedly: Determine its status and take an appropriate action.
    // (`currentTask` is `undefined` for a worker that never came online and `null` for an idle one.)
    const currentTask = this.taskAssignments.get(worker.id);

    this.logger.warn(
        `Worker #${worker.id} exited unexpectedly (code: ${code} | signal: ${signal}).\n` +
        ` Current assignment: ${(currentTask == null) ? '-' : stringifyTask(currentTask)}`);

    if (currentTask == null) {
      // The crashed worker process was not in the middle of a task:
      // Just spawn another process.
      this.logger.debug(`Spawning another worker process to replace #${worker.id}...`);
      this.taskAssignments.delete(worker.id);
      cluster.fork();
    } else {
      // The crashed worker process was in the middle of a task:
      // Impossible to know whether we can recover (without ending up with a corrupted entry-point).
      throw new Error(
          'Process unexpectedly crashed, while processing format property ' +
          `${currentTask.formatProperty} for entry-point '${currentTask.entryPoint.path}'.`);
    }
  }

  /**
   * Handle a message from a worker.
   *
   * @throws If the worker is unknown, the worker reported an error or the message type is invalid.
   */
  private onWorkerMessage(workerId: number, msg: MessageFromWorker): void {
    if (!this.taskAssignments.has(workerId)) {
      const knownWorkers = Array.from(this.taskAssignments.keys());
      throw new Error(
          `Received message from unknown worker #${workerId} (known workers: ` +
          `${knownWorkers.join(', ')}): ${JSON.stringify(msg)}`);
    }

    switch (msg.type) {
      case 'error':
        throw new Error(`Error on worker #${workerId}: ${msg.error}`);
      case 'task-completed':
        return this.onWorkerTaskCompleted(workerId, msg);
      case 'update-package-json':
        return this.onWorkerUpdatePackageJson(workerId, msg);
      default:
        throw new Error(
            `Invalid message received from worker #${workerId}: ${JSON.stringify(msg)}`);
    }
  }

  /**
   * Handle a worker's coming online: register it as idle and try to give it work.
   *
   * @throws If the worker has already been registered.
   */
  private onWorkerOnline(workerId: number): void {
    if (this.taskAssignments.has(workerId)) {
      throw new Error(`Invariant violated: Worker #${workerId} came online more than once.`);
    }

    // Start the clock when the first worker comes online.
    if (this.processingStartTime === -1) {
      this.logger.debug('Processing tasks...');
      this.processingStartTime = Date.now();
    }

    this.taskAssignments.set(workerId, null);
    this.maybeDistributeWork();
  }

  /**
   * Handle a worker's having completed their assigned task: report the outcome, mark the task as
   * completed, mark the worker as idle and try to distribute more work.
   *
   * @throws If the worker has no task assigned.
   */
  private onWorkerTaskCompleted(workerId: number, msg: TaskCompletedMessage): void {
    const task = this.getAssignedTask(workerId, msg);

    this.onTaskCompleted(task, msg.outcome, msg.message);

    this.taskQueue.markAsCompleted(task);
    this.taskAssignments.set(workerId, null);
    this.maybeDistributeWork();
  }

  /**
   * Handle a worker's request to update a `package.json` file.
   *
   * @throws If the worker has no task assigned or if the requested path is not the `package.json`
   *     of the entry-point of the worker's assigned task.
   */
  private onWorkerUpdatePackageJson(workerId: number, msg: UpdatePackageJsonMessage): void {
    const task = this.getAssignedTask(workerId, msg);

    const expectedPackageJsonPath = this.fileSystem.resolve(task.entryPoint.path, 'package.json');
    const parsedPackageJson = task.entryPoint.packageJson;

    if (expectedPackageJsonPath !== msg.packageJsonPath) {
      throw new Error(
          `Received '${msg.type}' message from worker #${workerId} for '${msg.packageJsonPath}', ` +
          `but was expecting '${expectedPackageJsonPath}' (based on task assignment).`);
    }

    // NOTE: Although the change in the parsed `package.json` will be reflected in tasks objects
    //       locally and thus also in future `process-task` messages sent to worker processes, any
    //       processes already running and processing a task for the same entry-point will not get
    //       the change.
    //       Do not rely on having an up-to-date `package.json` representation in worker processes.
    //       In other words, task processing should only rely on the info that was there when the
    //       file was initially parsed (during entry-point analysis) and not on the info that might
    //       be added later (during task processing).
    this.pkgJsonUpdater.writeChanges(msg.changes, msg.packageJsonPath, parsedPackageJson);
  }

  /** Return the task currently assigned to the worker; throw if it is unknown or idle. */
  private getAssignedTask(workerId: number, msg: MessageFromWorker): Task {
    const task = this.taskAssignments.get(workerId) || null;

    if (task === null) {
      throw new Error(
          `Expected worker #${workerId} to have a task assigned, while handling message: ` +
          JSON.stringify(msg));
    }

    return task;
  }

  /**
   * Stop all workers and stop listening on cluster events.
   *
   * NOTE: `removeAllListeners()` is called without an event name, i.e. it is not limited to the
   *       listeners registered in `run()`.
   */
  private stopWorkers(): void {
    const workers = Object.values(cluster.workers) as cluster.Worker[];
    this.logger.debug(`Stopping ${workers.length} workers...`);

    cluster.removeAllListeners();
    workers.forEach(worker => worker.kill());
  }

  /**
   * Wrap an event handler to ensure that `finishedDeferred` will be rejected on error (regardless
   * if the handler completes synchronously or asynchronously).
   */
  private wrapEventHandler<Args extends unknown[]>(fn: (...args: Args) => void|Promise<void>):
      (...args: Args) => Promise<void> {
    return async (...args: Args) => {
      try {
        await fn(...args);
      } catch (err) {
        this.finishedDeferred.reject(err);
      }
    };
  }
}