fix(ngcc): support recovering when a worker process crashes (#36626)

Previously, when running in parallel mode and a worker process crashed
while processing a task, it was not possible for ngcc to continue
without risking ending up with a corrupted entry-point and therefore it
exited with an error. This, for example, could happen when a worker
process received a `SIGKILL` signal, which was frequently observed in CI
environments. This was probably the result of Docker killing processes
due to increased memory pressure.

One factor that amplifies the problem under Docker (which is often used
in CI) is that it is not possible to distinguish between the available
CPU cores on the host machine and the ones made available to Docker
containers, thus resulting in ngcc spawning too many worker processes.

This commit addresses these issues in the following ways:

1. We take advantage of the fact that files are written to disk only
   after an entry-point has been fully analyzed/compiled. The master
   process can now determine whether a worker process has not yet
   started writing files to disk (even if it was in the middle of
   processing a task) and just put the task back into the tasks queue if
   the worker process crashes.

2. The master process keeps track of the transformed files that a worker
   process will attempt to write to disk. If the worker process crashes
   while writing files, the master process can revert any changes and
   put the task back into the tasks queue (without risking corruption).

3. When a worker process crashes while processing a task (which can be a
   result of increased memory pressure or too many worker processes),
   the master process will not try to re-spawn it. This way the number
   or worker processes is gradually adjusted to a level that can be
   accomodated by the system's resources.

Examples of ngcc being able to recover after a worker process crashed:
- While idling: https://circleci.com/gh/angular/angular/682197
- While compiling: https://circleci.com/gh/angular/angular/682209
- While writing files: https://circleci.com/gh/angular/angular/682267

Jira issue: [FW-2008](https://angular-team.atlassian.net/browse/FW-2008)

Fixes #36278

PR Close #36626
This commit is contained in:
George Kalpakas 2020-04-29 21:28:22 +03:00 committed by Andrew Kushnir
parent 772ccf0d9f
commit 966598cda7
7 changed files with 69 additions and 48 deletions

View File

@ -8,6 +8,7 @@
import {FileSystem} from '../../../../src/ngtsc/file_system'; import {FileSystem} from '../../../../src/ngtsc/file_system';
import {AsyncLocker} from '../../locking/async_locker'; import {AsyncLocker} from '../../locking/async_locker';
import {Logger} from '../../logging/logger'; import {Logger} from '../../logging/logger';
import {FileWriter} from '../../writing/file_writer';
import {PackageJsonUpdater} from '../../writing/package_json_updater'; import {PackageJsonUpdater} from '../../writing/package_json_updater';
import {AnalyzeEntryPointsFn, CreateCompileFn, Executor} from '../api'; import {AnalyzeEntryPointsFn, CreateCompileFn, Executor} from '../api';
import {CreateTaskCompletedCallback} from '../tasks/api'; import {CreateTaskCompletedCallback} from '../tasks/api';
@ -21,7 +22,8 @@ import {ClusterMaster} from './master';
export class ClusterExecutor implements Executor { export class ClusterExecutor implements Executor {
constructor( constructor(
private workerCount: number, private fileSystem: FileSystem, private logger: Logger, private workerCount: number, private fileSystem: FileSystem, private logger: Logger,
private pkgJsonUpdater: PackageJsonUpdater, private lockFile: AsyncLocker, private fileWriter: FileWriter, private pkgJsonUpdater: PackageJsonUpdater,
private lockFile: AsyncLocker,
private createTaskCompletedCallback: CreateTaskCompletedCallback) {} private createTaskCompletedCallback: CreateTaskCompletedCallback) {}
async execute(analyzeEntryPoints: AnalyzeEntryPointsFn, _createCompileFn: CreateCompileFn): async execute(analyzeEntryPoints: AnalyzeEntryPointsFn, _createCompileFn: CreateCompileFn):
@ -30,8 +32,8 @@ export class ClusterExecutor implements Executor {
this.logger.debug( this.logger.debug(
`Running ngcc on ${this.constructor.name} (using ${this.workerCount} worker processes).`); `Running ngcc on ${this.constructor.name} (using ${this.workerCount} worker processes).`);
const master = new ClusterMaster( const master = new ClusterMaster(
this.workerCount, this.fileSystem, this.logger, this.pkgJsonUpdater, analyzeEntryPoints, this.workerCount, this.fileSystem, this.logger, this.fileWriter, this.pkgJsonUpdater,
this.createTaskCompletedCallback); analyzeEntryPoints, this.createTaskCompletedCallback);
return master.run(); return master.run();
}); });
} }

View File

@ -12,6 +12,7 @@ import * as cluster from 'cluster';
import {AbsoluteFsPath, FileSystem} from '../../../../src/ngtsc/file_system'; import {AbsoluteFsPath, FileSystem} from '../../../../src/ngtsc/file_system';
import {Logger} from '../../logging/logger'; import {Logger} from '../../logging/logger';
import {FileWriter} from '../../writing/file_writer';
import {PackageJsonUpdater} from '../../writing/package_json_updater'; import {PackageJsonUpdater} from '../../writing/package_json_updater';
import {AnalyzeEntryPointsFn} from '../api'; import {AnalyzeEntryPointsFn} from '../api';
import {CreateTaskCompletedCallback, Task, TaskCompletedCallback, TaskQueue} from '../tasks/api'; import {CreateTaskCompletedCallback, Task, TaskCompletedCallback, TaskQueue} from '../tasks/api';
@ -34,7 +35,8 @@ export class ClusterMaster {
constructor( constructor(
private maxWorkerCount: number, private fileSystem: FileSystem, private logger: Logger, private maxWorkerCount: number, private fileSystem: FileSystem, private logger: Logger,
private pkgJsonUpdater: PackageJsonUpdater, analyzeEntryPoints: AnalyzeEntryPointsFn, private fileWriter: FileWriter, private pkgJsonUpdater: PackageJsonUpdater,
analyzeEntryPoints: AnalyzeEntryPointsFn,
createTaskCompletedCallback: CreateTaskCompletedCallback) { createTaskCompletedCallback: CreateTaskCompletedCallback) {
if (!cluster.isMaster) { if (!cluster.isMaster) {
throw new Error('Tried to instantiate `ClusterMaster` on a worker process.'); throw new Error('Tried to instantiate `ClusterMaster` on a worker process.');
@ -146,6 +148,7 @@ export class ClusterMaster {
// The worker exited unexpectedly: Determine it's status and take an appropriate action. // The worker exited unexpectedly: Determine it's status and take an appropriate action.
const assignment = this.taskAssignments.get(worker.id); const assignment = this.taskAssignments.get(worker.id);
this.taskAssignments.delete(worker.id);
this.logger.warn( this.logger.warn(
`Worker #${worker.id} exited unexpectedly (code: ${code} | signal: ${signal}).\n` + `Worker #${worker.id} exited unexpectedly (code: ${code} | signal: ${signal}).\n` +
@ -158,16 +161,34 @@ export class ClusterMaster {
// The crashed worker process was not in the middle of a task: // The crashed worker process was not in the middle of a task:
// Just spawn another process. // Just spawn another process.
this.logger.debug(`Spawning another worker process to replace #${worker.id}...`); this.logger.debug(`Spawning another worker process to replace #${worker.id}...`);
this.taskAssignments.delete(worker.id);
cluster.fork(); cluster.fork();
} else { } else {
const {task, files} = assignment;
if (files != null) {
// The crashed worker process was in the middle of writing transformed files:
// Revert any changes before re-processing the task.
this.logger.debug(`Reverting ${files.length} transformed files...`);
this.fileWriter.revertBundle(
task.entryPoint, files, task.formatPropertiesToMarkAsProcessed);
}
// The crashed worker process was in the middle of a task: // The crashed worker process was in the middle of a task:
// Impossible to know whether we can recover (without ending up with a corrupted entry-point). // Re-add the task back to the queue.
// TODO: Use `assignment.files` to revert any changes and rerun the task worker. this.taskQueue.markAsUnprocessed(task);
const currentTask = assignment.task;
throw new Error( // The crashing might be a result of increased memory consumption by ngcc.
'Process unexpectedly crashed, while processing format property ' + // Do not spawn another process, unless this was the last worker process.
`${currentTask.formatProperty} for entry-point '${currentTask.entryPoint.path}'.`); const spawnedWorkerCount = Object.keys(cluster.workers).length;
if (spawnedWorkerCount > 0) {
this.logger.debug(`Not spawning another worker process to replace #${
worker.id}. Continuing with ${spawnedWorkerCount} workers...`);
this.maybeDistributeWork();
} else {
this.logger.debug(`Spawning another worker process to replace #${worker.id}...`);
this.remainingRespawnAttempts--;
cluster.fork();
}
} }
} }

View File

@ -28,24 +28,23 @@ if (require.main === module) {
try { try {
const { const {
createNewEntryPointFormats = false, logger,
logger = new ConsoleLogger(LogLevel.info),
pathMappings, pathMappings,
errorOnFailedEntryPoint = false, enableI18nLegacyMessageIdFormat,
enableI18nLegacyMessageIdFormat = true,
fileSystem, fileSystem,
tsConfig tsConfig,
getFileWriter,
} = getSharedSetup(parseCommandLineOptions(process.argv.slice(2))); } = getSharedSetup(parseCommandLineOptions(process.argv.slice(2)));
// NOTE: To avoid file corruption, `ngcc` invocation only creates _one_ instance of // NOTE: To avoid file corruption, `ngcc` invocation only creates _one_ instance of
// `PackageJsonUpdater` that actually writes to disk (across all processes). // `PackageJsonUpdater` that actually writes to disk (across all processes).
// In cluster workers we use a `PackageJsonUpdater` that delegates to the cluster master. // In cluster workers we use a `PackageJsonUpdater` that delegates to the cluster master.
const pkgJsonUpdater = new ClusterWorkerPackageJsonUpdater(); const pkgJsonUpdater = new ClusterWorkerPackageJsonUpdater();
const fileWriter = getFileWriter(pkgJsonUpdater);
// The function for creating the `compile()` function. // The function for creating the `compile()` function.
const createCompileFn = getCreateCompileFn( const createCompileFn = getCreateCompileFn(
fileSystem, logger, pkgJsonUpdater, createNewEntryPointFormats, errorOnFailedEntryPoint, fileSystem, logger, fileWriter, enableI18nLegacyMessageIdFormat, tsConfig, pathMappings);
enableI18nLegacyMessageIdFormat, tsConfig, pathMappings);
await startWorker(logger, createCompileFn); await startWorker(logger, createCompileFn);
process.exitCode = 0; process.exitCode = 0;

View File

@ -16,9 +16,6 @@ import {PathMappings} from '../ngcc_options';
import {getEntryPointFormat} from '../packages/entry_point'; import {getEntryPointFormat} from '../packages/entry_point';
import {makeEntryPointBundle} from '../packages/entry_point_bundle'; import {makeEntryPointBundle} from '../packages/entry_point_bundle';
import {FileWriter} from '../writing/file_writer'; import {FileWriter} from '../writing/file_writer';
import {InPlaceFileWriter} from '../writing/in_place_file_writer';
import {NewEntryPointFileWriter} from '../writing/new_entry_point_file_writer';
import {PackageJsonUpdater} from '../writing/package_json_updater';
import {CreateCompileFn} from './api'; import {CreateCompileFn} from './api';
import {Task, TaskProcessingOutcome} from './tasks/api'; import {Task, TaskProcessingOutcome} from './tasks/api';
@ -27,13 +24,10 @@ import {Task, TaskProcessingOutcome} from './tasks/api';
* The function for creating the `compile()` function. * The function for creating the `compile()` function.
*/ */
export function getCreateCompileFn( export function getCreateCompileFn(
fileSystem: FileSystem, logger: Logger, pkgJsonUpdater: PackageJsonUpdater, fileSystem: FileSystem, logger: Logger, fileWriter: FileWriter,
createNewEntryPointFormats: boolean, errorOnFailedEntryPoint: boolean,
enableI18nLegacyMessageIdFormat: boolean, tsConfig: ParsedConfiguration|null, enableI18nLegacyMessageIdFormat: boolean, tsConfig: ParsedConfiguration|null,
pathMappings: PathMappings|undefined): CreateCompileFn { pathMappings: PathMappings|undefined): CreateCompileFn {
return (beforeWritingFiles, onTaskCompleted) => { return (beforeWritingFiles, onTaskCompleted) => {
const fileWriter = getFileWriter(
fileSystem, logger, pkgJsonUpdater, createNewEntryPointFormats, errorOnFailedEntryPoint);
const {Transformer} = require('../packages/transformer'); const {Transformer} = require('../packages/transformer');
const transformer = new Transformer(fileSystem, logger, tsConfig); const transformer = new Transformer(fileSystem, logger, tsConfig);
@ -91,11 +85,3 @@ export function getCreateCompileFn(
}; };
}; };
} }
function getFileWriter(
fs: FileSystem, logger: Logger, pkgJsonUpdater: PackageJsonUpdater,
createNewEntryPointFormats: boolean, errorOnFailedEntryPoint: boolean): FileWriter {
return createNewEntryPointFormats ?
new NewEntryPointFileWriter(fs, logger, errorOnFailedEntryPoint, pkgJsonUpdater) :
new InPlaceFileWriter(fs, logger, errorOnFailedEntryPoint);
}

View File

@ -36,6 +36,7 @@ import {AsyncNgccOptions, getSharedSetup, NgccOptions, PathMappings, SyncNgccOpt
import {NgccConfiguration} from './packages/configuration'; import {NgccConfiguration} from './packages/configuration';
import {EntryPointJsonProperty, SUPPORTED_FORMAT_PROPERTIES} from './packages/entry_point'; import {EntryPointJsonProperty, SUPPORTED_FORMAT_PROPERTIES} from './packages/entry_point';
import {EntryPointManifest, InvalidatingEntryPointManifest} from './packages/entry_point_manifest'; import {EntryPointManifest, InvalidatingEntryPointManifest} from './packages/entry_point_manifest';
import {FileWriter} from './writing/file_writer';
import {DirectPackageJsonUpdater, PackageJsonUpdater} from './writing/package_json_updater'; import {DirectPackageJsonUpdater, PackageJsonUpdater} from './writing/package_json_updater';
/** /**
@ -54,7 +55,6 @@ export function mainNgcc(options: NgccOptions): void|Promise<void> {
targetEntryPointPath, targetEntryPointPath,
propertiesToConsider, propertiesToConsider,
compileAllFormats, compileAllFormats,
createNewEntryPointFormats,
logger, logger,
pathMappings, pathMappings,
async, async,
@ -64,7 +64,8 @@ export function mainNgcc(options: NgccOptions): void|Promise<void> {
fileSystem, fileSystem,
absBasePath, absBasePath,
projectPath, projectPath,
tsConfig tsConfig,
getFileWriter,
} = getSharedSetup(options); } = getSharedSetup(options);
const config = new NgccConfiguration(fileSystem, projectPath); const config = new NgccConfiguration(fileSystem, projectPath);
@ -95,19 +96,20 @@ export function mainNgcc(options: NgccOptions): void|Promise<void> {
logger, finder, fileSystem, supportedPropertiesToConsider, compileAllFormats, logger, finder, fileSystem, supportedPropertiesToConsider, compileAllFormats,
propertiesToConsider, inParallel); propertiesToConsider, inParallel);
// Create an updater that will actually write to disk. In // Create an updater that will actually write to disk.
const pkgJsonUpdater = new DirectPackageJsonUpdater(fileSystem); const pkgJsonUpdater = new DirectPackageJsonUpdater(fileSystem);
const fileWriter = getFileWriter(pkgJsonUpdater);
// The function for creating the `compile()` function. // The function for creating the `compile()` function.
const createCompileFn = getCreateCompileFn( const createCompileFn = getCreateCompileFn(
fileSystem, logger, pkgJsonUpdater, createNewEntryPointFormats, errorOnFailedEntryPoint, fileSystem, logger, fileWriter, enableI18nLegacyMessageIdFormat, tsConfig, pathMappings);
enableI18nLegacyMessageIdFormat, tsConfig, pathMappings);
// The executor for actually planning and getting the work done. // The executor for actually planning and getting the work done.
const createTaskCompletedCallback = const createTaskCompletedCallback =
getCreateTaskCompletedCallback(pkgJsonUpdater, errorOnFailedEntryPoint, logger, fileSystem); getCreateTaskCompletedCallback(pkgJsonUpdater, errorOnFailedEntryPoint, logger, fileSystem);
const executor = getExecutor( const executor = getExecutor(
async, inParallel, logger, pkgJsonUpdater, fileSystem, createTaskCompletedCallback); async, inParallel, logger, fileWriter, pkgJsonUpdater, fileSystem,
createTaskCompletedCallback);
return executor.execute(analyzeEntryPoints, createCompileFn); return executor.execute(analyzeEntryPoints, createCompileFn);
} }
@ -146,8 +148,9 @@ function getCreateTaskCompletedCallback(
} }
function getExecutor( function getExecutor(
async: boolean, inParallel: boolean, logger: Logger, pkgJsonUpdater: PackageJsonUpdater, async: boolean, inParallel: boolean, logger: Logger, fileWriter: FileWriter,
fileSystem: FileSystem, createTaskCompletedCallback: CreateTaskCompletedCallback): Executor { pkgJsonUpdater: PackageJsonUpdater, fileSystem: FileSystem,
createTaskCompletedCallback: CreateTaskCompletedCallback): Executor {
const lockFile = new LockFileWithChildProcess(fileSystem, logger); const lockFile = new LockFileWithChildProcess(fileSystem, logger);
if (async) { if (async) {
// Execute asynchronously (either serially or in parallel) // Execute asynchronously (either serially or in parallel)
@ -156,7 +159,8 @@ function getExecutor(
// Execute in parallel. Use up to 8 CPU cores for workers, always reserving one for master. // Execute in parallel. Use up to 8 CPU cores for workers, always reserving one for master.
const workerCount = Math.min(8, os.cpus().length - 1); const workerCount = Math.min(8, os.cpus().length - 1);
return new ClusterExecutor( return new ClusterExecutor(
workerCount, fileSystem, logger, pkgJsonUpdater, locker, createTaskCompletedCallback); workerCount, fileSystem, logger, fileWriter, pkgJsonUpdater, locker,
createTaskCompletedCallback);
} else { } else {
// Execute serially, on a single thread (async). // Execute serially, on a single thread (async).
return new SingleProcessExecutorAsync(logger, locker, createTaskCompletedCallback); return new SingleProcessExecutorAsync(logger, locker, createTaskCompletedCallback);

View File

@ -11,6 +11,10 @@ import {ParsedConfiguration, readConfiguration} from '../../src/perform_compile'
import {ConsoleLogger} from './logging/console_logger'; import {ConsoleLogger} from './logging/console_logger';
import {Logger, LogLevel} from './logging/logger'; import {Logger, LogLevel} from './logging/logger';
import {SUPPORTED_FORMAT_PROPERTIES} from './packages/entry_point'; import {SUPPORTED_FORMAT_PROPERTIES} from './packages/entry_point';
import {FileWriter} from './writing/file_writer';
import {InPlaceFileWriter} from './writing/in_place_file_writer';
import {NewEntryPointFileWriter} from './writing/new_entry_point_file_writer';
import {PackageJsonUpdater} from './writing/package_json_updater';
/** /**
* The options to configure the ngcc compiler for synchronous execution. * The options to configure the ngcc compiler for synchronous execution.
@ -156,6 +160,7 @@ export type OptionalNgccOptions = Pick<NgccOptions, OptionalNgccOptionKeys>;
export type SharedSetup = { export type SharedSetup = {
fileSystem: FileSystem; absBasePath: AbsoluteFsPath; projectPath: AbsoluteFsPath; fileSystem: FileSystem; absBasePath: AbsoluteFsPath; projectPath: AbsoluteFsPath;
tsConfig: ParsedConfiguration | null; tsConfig: ParsedConfiguration | null;
getFileWriter(pkgJsonUpdater: PackageJsonUpdater): FileWriter;
}; };
/** /**
@ -210,5 +215,8 @@ export function getSharedSetup(options: NgccOptions): SharedSetup&RequiredNgccOp
absBasePath, absBasePath,
projectPath, projectPath,
tsConfig, tsConfig,
getFileWriter: (pkgJsonUpdater: PackageJsonUpdater) => createNewEntryPointFormats ?
new NewEntryPointFileWriter(fileSystem, logger, errorOnFailedEntryPoint, pkgJsonUpdater) :
new InPlaceFileWriter(fileSystem, logger, errorOnFailedEntryPoint),
}; };
} }

View File

@ -15,6 +15,7 @@ import {MockFileSystemNative, runInEachFileSystem} from '../../../../src/ngtsc/f
import {ClusterExecutor} from '../../../src/execution/cluster/executor'; import {ClusterExecutor} from '../../../src/execution/cluster/executor';
import {ClusterMaster} from '../../../src/execution/cluster/master'; import {ClusterMaster} from '../../../src/execution/cluster/master';
import {AsyncLocker} from '../../../src/locking/async_locker'; import {AsyncLocker} from '../../../src/locking/async_locker';
import {FileWriter} from '../../../src/writing/file_writer';
import {PackageJsonUpdater} from '../../../src/writing/package_json_updater'; import {PackageJsonUpdater} from '../../../src/writing/package_json_updater';
import {MockLockFile} from '../../helpers/mock_lock_file'; import {MockLockFile} from '../../helpers/mock_lock_file';
import {MockLogger} from '../../helpers/mock_logger'; import {MockLogger} from '../../helpers/mock_logger';
@ -41,8 +42,8 @@ runInEachFileSystem(() => {
mockLockFile = new MockLockFile(new MockFileSystemNative(), lockFileLog); mockLockFile = new MockLockFile(new MockFileSystemNative(), lockFileLog);
locker = new AsyncLocker(mockLockFile, mockLogger, 200, 2); locker = new AsyncLocker(mockLockFile, mockLogger, 200, 2);
executor = new ClusterExecutor( executor = new ClusterExecutor(
42, getFileSystem(), mockLogger, null as unknown as PackageJsonUpdater, locker, 42, getFileSystem(), mockLogger, null as unknown as FileWriter,
createTaskCompletedCallback); null as unknown as PackageJsonUpdater, locker, createTaskCompletedCallback);
}); });
describe('execute()', () => { describe('execute()', () => {
@ -98,8 +99,8 @@ runInEachFileSystem(() => {
}); });
executor = new ClusterExecutor( executor = new ClusterExecutor(
42, getFileSystem(), mockLogger, null as unknown as PackageJsonUpdater, locker, 42, getFileSystem(), mockLogger, null as unknown as FileWriter,
createTaskCompletedCallback); null as unknown as PackageJsonUpdater, locker, createTaskCompletedCallback);
let error = ''; let error = '';
try { try {
await executor.execute(anyFn, anyFn); await executor.execute(anyFn, anyFn);
@ -118,8 +119,8 @@ runInEachFileSystem(() => {
}); });
executor = new ClusterExecutor( executor = new ClusterExecutor(
42, getFileSystem(), mockLogger, null as unknown as PackageJsonUpdater, locker, 42, getFileSystem(), mockLogger, null as unknown as FileWriter,
createTaskCompletedCallback); null as unknown as PackageJsonUpdater, locker, createTaskCompletedCallback);
let error = ''; let error = '';
try { try {
await executor.execute(anyFn, anyFn); await executor.execute(anyFn, anyFn);