refactor(ngcc): simplify cluster PackageJsonUpdater (#36637)

PR Close #36637
This commit is contained in:
Pete Bacon Darwin 2020-04-16 10:37:24 +01:00 committed by Matias Niemelä
parent 443f5eee85
commit bb944eecd6
4 changed files with 152 additions and 188 deletions

View File

@ -18,26 +18,26 @@ import {sendMessageToMaster} from './utils';
/** /**
* A `PackageJsonUpdater` that can safely handle update operations on multiple processes. * A `PackageJsonUpdater` for cluster workers that will send update changes to the master process so
* that it can safely handle update operations on multiple processes.
*/ */
export class ClusterPackageJsonUpdater implements PackageJsonUpdater { export class ClusterWorkerPackageJsonUpdater implements PackageJsonUpdater {
constructor(private delegate: PackageJsonUpdater) {} constructor() {
if (cluster.isMaster) {
throw new Error('Tried to create cluster worker PackageJsonUpdater on the master process.');
}
}
createUpdate(): PackageJsonUpdate { createUpdate(): PackageJsonUpdate {
return new PackageJsonUpdate((...args) => this.writeChanges(...args)); return new PackageJsonUpdate((...args) => this.writeChanges(...args));
} }
/**
* Apply the changes in-memory (if necessary) and send a message to the master process.
*/
writeChanges( writeChanges(
changes: PackageJsonChange[], packageJsonPath: AbsoluteFsPath, changes: PackageJsonChange[], packageJsonPath: AbsoluteFsPath,
preExistingParsedJson?: JsonObject): void { preExistingParsedJson?: JsonObject): void {
if (cluster.isMaster) {
// This is the master process:
// Actually apply the changes to the file on disk.
return this.delegate.writeChanges(changes, packageJsonPath, preExistingParsedJson);
}
// This is a worker process:
// Apply the changes in-memory (if necessary) and send a message to the master process.
if (preExistingParsedJson) { if (preExistingParsedJson) {
for (const [propPath, value] of changes) { for (const [propPath, value] of changes) {
if (propPath.length === 0) { if (propPath.length === 0) {

View File

@ -15,13 +15,12 @@ import {parseCommandLineOptions} from '../../command_line_options';
import {ConsoleLogger} from '../../logging/console_logger'; import {ConsoleLogger} from '../../logging/console_logger';
import {Logger, LogLevel} from '../../logging/logger'; import {Logger, LogLevel} from '../../logging/logger';
import {getPathMappingsFromTsConfig} from '../../utils'; import {getPathMappingsFromTsConfig} from '../../utils';
import {DirectPackageJsonUpdater} from '../../writing/package_json_updater';
import {CreateCompileFn} from '../api'; import {CreateCompileFn} from '../api';
import {getCreateCompileFn} from '../create_compile_function'; import {getCreateCompileFn} from '../create_compile_function';
import {stringifyTask} from '../tasks/utils'; import {stringifyTask} from '../tasks/utils';
import {MessageToWorker} from './api'; import {MessageToWorker} from './api';
import {ClusterPackageJsonUpdater} from './package_json_updater'; import {ClusterWorkerPackageJsonUpdater} from './package_json_updater';
import {sendMessageToMaster} from './utils'; import {sendMessageToMaster} from './utils';
// Cluster worker entry point // Cluster worker entry point
@ -55,8 +54,10 @@ if (require.main === module) {
pathMappings = getPathMappingsFromTsConfig(tsConfig, projectPath); pathMappings = getPathMappingsFromTsConfig(tsConfig, projectPath);
} }
const pkgJsonUpdater = // NOTE: To avoid file corruption, `ngcc` invocation only creates _one_ instance of
new ClusterPackageJsonUpdater(new DirectPackageJsonUpdater(fileSystem)); // `PackageJsonUpdater` that actually writes to disk (across all processes).
// In cluster workers we use a `PackageJsonUpdater` that delegates to the cluster master.
const pkgJsonUpdater = new ClusterWorkerPackageJsonUpdater();
// The function for creating the `compile()` function. // The function for creating the `compile()` function.
const createCompileFn = getCreateCompileFn( const createCompileFn = getCreateCompileFn(

View File

@ -26,7 +26,6 @@ import {TargetedEntryPointFinder} from './entry_point_finder/targeted_entry_poin
import {getAnalyzeEntryPointsFn} from './execution/analyze_entry_points'; import {getAnalyzeEntryPointsFn} from './execution/analyze_entry_points';
import {Executor} from './execution/api'; import {Executor} from './execution/api';
import {ClusterExecutor} from './execution/cluster/executor'; import {ClusterExecutor} from './execution/cluster/executor';
import {ClusterPackageJsonUpdater} from './execution/cluster/package_json_updater';
import {getCreateCompileFn} from './execution/create_compile_function'; import {getCreateCompileFn} from './execution/create_compile_function';
import {SingleProcessExecutorAsync, SingleProcessExecutorSync} from './execution/single_process_executor'; import {SingleProcessExecutorAsync, SingleProcessExecutorSync} from './execution/single_process_executor';
import {CreateTaskCompletedCallback, TaskProcessingOutcome} from './execution/tasks/api'; import {CreateTaskCompletedCallback, TaskProcessingOutcome} from './execution/tasks/api';
@ -105,11 +104,7 @@ export function mainNgcc({
return; return;
} }
// NOTE: To avoid file corruption, ensure that each `ngcc` invocation only creates _one_ instance const pkgJsonUpdater = new DirectPackageJsonUpdater(fileSystem);
// of `PackageJsonUpdater` that actually writes to disk (across all processes).
// This is hard to enforce automatically, when running on multiple processes, so needs to be
// enforced manually.
const pkgJsonUpdater = getPackageJsonUpdater(inParallel, fileSystem);
const analyzeEntryPoints = getAnalyzeEntryPointsFn( const analyzeEntryPoints = getAnalyzeEntryPointsFn(
logger, finder, fileSystem, supportedPropertiesToConsider, compileAllFormats, logger, finder, fileSystem, supportedPropertiesToConsider, compileAllFormats,
@ -151,11 +146,6 @@ function ensureSupportedProperties(properties: string[]): EntryPointJsonProperty
return supportedProperties; return supportedProperties;
} }
function getPackageJsonUpdater(inParallel: boolean, fs: FileSystem): PackageJsonUpdater {
const directPkgJsonUpdater = new DirectPackageJsonUpdater(fs);
return inParallel ? new ClusterPackageJsonUpdater(directPkgJsonUpdater) : directPkgJsonUpdater;
}
function getCreateTaskCompletedCallback( function getCreateTaskCompletedCallback(
pkgJsonUpdater: PackageJsonUpdater, errorOnFailedEntryPoint: boolean, logger: Logger, pkgJsonUpdater: PackageJsonUpdater, errorOnFailedEntryPoint: boolean, logger: Logger,
fileSystem: FileSystem): CreateTaskCompletedCallback { fileSystem: FileSystem): CreateTaskCompletedCallback {

View File

@ -12,7 +12,7 @@ import * as cluster from 'cluster';
import {absoluteFrom as _} from '../../../../src/ngtsc/file_system'; import {absoluteFrom as _} from '../../../../src/ngtsc/file_system';
import {runInEachFileSystem} from '../../../../src/ngtsc/file_system/testing'; import {runInEachFileSystem} from '../../../../src/ngtsc/file_system/testing';
import {ClusterPackageJsonUpdater} from '../../../src/execution/cluster/package_json_updater'; import {ClusterWorkerPackageJsonUpdater} from '../../../src/execution/cluster/package_json_updater';
import {JsonObject} from '../../../src/packages/entry_point'; import {JsonObject} from '../../../src/packages/entry_point';
import {PackageJsonPropertyPositioning, PackageJsonUpdate, PackageJsonUpdater} from '../../../src/writing/package_json_updater'; import {PackageJsonPropertyPositioning, PackageJsonUpdate, PackageJsonUpdater} from '../../../src/writing/package_json_updater';
import {mockProperty} from '../../helpers/spy_utils'; import {mockProperty} from '../../helpers/spy_utils';
@ -23,204 +23,177 @@ runInEachFileSystem(() => {
const runAsClusterMaster = mockProperty(cluster, 'isMaster'); const runAsClusterMaster = mockProperty(cluster, 'isMaster');
const mockProcessSend = mockProperty(process, 'send'); const mockProcessSend = mockProperty(process, 'send');
let processSendSpy: jasmine.Spy; let processSendSpy: jasmine.Spy;
let delegate: PackageJsonUpdater;
let updater: ClusterPackageJsonUpdater;
beforeEach(() => { beforeEach(() => {
processSendSpy = jasmine.createSpy('process.send'); processSendSpy = jasmine.createSpy('process.send');
mockProcessSend(processSendSpy); mockProcessSend(processSendSpy);
});
delegate = new MockPackageJsonUpdater(); describe('constructor()', () => {
updater = new ClusterPackageJsonUpdater(delegate); it('should throw an error if used on a cluster master', () => {
runAsClusterMaster(true);
expect(() => new ClusterWorkerPackageJsonUpdater())
.toThrowError(
'Tried to create cluster worker PackageJsonUpdater on the master process.');
});
}); });
describe('createUpdate()', () => { describe('createUpdate()', () => {
[true, false].forEach( let updater: ClusterWorkerPackageJsonUpdater;
isMaster => describe(`(on cluster ${isMaster ? 'master' : 'worker'})`, () => { beforeEach(() => {
beforeEach(() => runAsClusterMaster(isMaster)); runAsClusterMaster(false);
updater = new ClusterWorkerPackageJsonUpdater();
});
it('should return a `PackageJsonUpdate` instance', () => { it('should return a `PackageJsonUpdate` instance', () => {
expect(updater.createUpdate()).toEqual(jasmine.any(PackageJsonUpdate)); expect(updater.createUpdate()).toEqual(jasmine.any(PackageJsonUpdate));
}); });
it('should wire up the `PackageJsonUpdate` with its `writeChanges()` method', () => { it('should wire up the `PackageJsonUpdate` with its `writeChanges()` method', () => {
const writeChangesSpy = spyOn(updater, 'writeChanges'); const writeChangesSpy = spyOn(updater, 'writeChanges');
const jsonPath = _('/foo/package.json'); const jsonPath = _('/foo/package.json');
const update = updater.createUpdate(); const update = updater.createUpdate();
update.addChange(['foo'], 'updated'); update.addChange(['foo'], 'updated');
update.addChange(['baz'], 'updated 2', 'alphabetic'); update.addChange(['baz'], 'updated 2', 'alphabetic');
update.addChange(['bar'], 'updated 3', {before: 'bar'}); update.addChange(['bar'], 'updated 3', {before: 'bar'});
update.writeChanges(jsonPath); update.writeChanges(jsonPath);
expect(writeChangesSpy) expect(writeChangesSpy)
.toHaveBeenCalledWith( .toHaveBeenCalledWith(
[ [
[['foo'], 'updated', 'unimportant'], [['foo'], 'updated', 'unimportant'],
[['baz'], 'updated 2', 'alphabetic'], [['baz'], 'updated 2', 'alphabetic'],
[['bar'], 'updated 3', {before: 'bar'}], [['bar'], 'updated 3', {before: 'bar'}],
], ],
jsonPath, undefined); jsonPath, undefined);
}); });
}));
}); });
describe('writeChanges()', () => { describe('writeChanges()', () => {
describe('(on cluster master)', () => { let updater: ClusterWorkerPackageJsonUpdater;
beforeEach(() => runAsClusterMaster(true)); beforeEach(() => {
afterEach(() => expect(processSendSpy).not.toHaveBeenCalled()); runAsClusterMaster(false);
updater = new ClusterWorkerPackageJsonUpdater();
});
it('should forward the call to the delegate `PackageJsonUpdater`', () => { it('should send an `update-package-json` message to the master process', () => {
const jsonPath = _('/foo/package.json'); const jsonPath = _('/foo/package.json');
const parsedJson = {foo: 'bar'};
updater.createUpdate() const writeToProp =
.addChange(['foo'], 'updated') (propPath: string[], positioning?: PackageJsonPropertyPositioning,
.addChange(['bar'], 'updated too', 'alphabetic') parsed?: JsonObject) => updater.createUpdate()
.writeChanges(jsonPath, parsedJson); .addChange(propPath, 'updated', positioning)
.writeChanges(jsonPath, parsed);
expect(delegate.writeChanges) writeToProp(['foo']);
.toHaveBeenCalledWith( expect(processSendSpy).toHaveBeenCalledWith({
[ type: 'update-package-json',
[['foo'], 'updated', 'unimportant'], packageJsonPath: jsonPath,
[['bar'], 'updated too', 'alphabetic'], changes: [[['foo'], 'updated', 'unimportant']],
],
jsonPath, parsedJson);
}); });
it('should throw, if trying to re-apply an already applied update', () => { writeToProp(['bar'], {before: 'foo'});
const update = updater.createUpdate().addChange(['foo'], 'updated'); expect(processSendSpy).toHaveBeenCalledWith({
type: 'update-package-json',
packageJsonPath: jsonPath,
changes: [[['bar'], 'updated', {before: 'foo'}]],
});
expect(() => update.writeChanges(_('/foo/package.json'))).not.toThrow(); writeToProp(['bar', 'baz', 'qux'], 'alphabetic', {});
expect(() => update.writeChanges(_('/foo/package.json'))) expect(processSendSpy).toHaveBeenCalledWith({
.toThrowError('Trying to apply a `PackageJsonUpdate` that has already been applied.'); type: 'update-package-json',
expect(() => update.writeChanges(_('/bar/package.json'))) packageJsonPath: jsonPath,
.toThrowError('Trying to apply a `PackageJsonUpdate` that has already been applied.'); changes: [[['bar', 'baz', 'qux'], 'updated', 'alphabetic']],
}); });
}); });
describe('(on cluster worker)', () => { it('should update an in-memory representation (if provided)', () => {
beforeEach(() => runAsClusterMaster(false)); const jsonPath = _('/foo/package.json');
afterEach(() => expect(delegate.writeChanges).not.toHaveBeenCalled()); const parsedJson: JsonObject = {
foo: true,
bar: {baz: 'OK'},
};
it('should send an `update-package-json` message to the master process', () => { const update =
const jsonPath = _('/foo/package.json'); updater.createUpdate().addChange(['foo'], false).addChange(['bar', 'baz'], 42);
const writeToProp = // Not updated yet.
(propPath: string[], positioning?: PackageJsonPropertyPositioning, expect(parsedJson).toEqual({
parsed?: JsonObject) => updater.createUpdate() foo: true,
.addChange(propPath, 'updated', positioning) bar: {baz: 'OK'},
.writeChanges(jsonPath, parsed);
writeToProp(['foo']);
expect(processSendSpy).toHaveBeenCalledWith({
type: 'update-package-json',
packageJsonPath: jsonPath,
changes: [[['foo'], 'updated', 'unimportant']],
});
writeToProp(['bar'], {before: 'foo'});
expect(processSendSpy).toHaveBeenCalledWith({
type: 'update-package-json',
packageJsonPath: jsonPath,
changes: [[['bar'], 'updated', {before: 'foo'}]],
});
writeToProp(['bar', 'baz', 'qux'], 'alphabetic', {});
expect(processSendSpy).toHaveBeenCalledWith({
type: 'update-package-json',
packageJsonPath: jsonPath,
changes: [[['bar', 'baz', 'qux'], 'updated', 'alphabetic']],
});
}); });
it('should update an in-memory representation (if provided)', () => { update.writeChanges(jsonPath, parsedJson);
const jsonPath = _('/foo/package.json');
const parsedJson: JsonObject = {
foo: true,
bar: {baz: 'OK'},
};
const update = // Updated now.
updater.createUpdate().addChange(['foo'], false).addChange(['bar', 'baz'], 42); expect(parsedJson).toEqual({
foo: false,
// Not updated yet. bar: {baz: 42},
expect(parsedJson).toEqual({
foo: true,
bar: {baz: 'OK'},
});
update.writeChanges(jsonPath, parsedJson);
// Updated now.
expect(parsedJson).toEqual({
foo: false,
bar: {baz: 42},
});
}); });
});
it('should create any missing ancestor objects', () => { it('should create any missing ancestor objects', () => {
const jsonPath = _('/foo/package.json'); const jsonPath = _('/foo/package.json');
const parsedJson: JsonObject = {foo: {}}; const parsedJson: JsonObject = {foo: {}};
updater.createUpdate() updater.createUpdate()
.addChange(['foo', 'bar', 'baz', 'qux'], 'updated') .addChange(['foo', 'bar', 'baz', 'qux'], 'updated')
.writeChanges(jsonPath, parsedJson); .writeChanges(jsonPath, parsedJson);
expect(parsedJson).toEqual({ expect(parsedJson).toEqual({
foo: { foo: {
bar: { bar: {
baz: { baz: {
qux: 'updated', qux: 'updated',
},
}, },
}, },
}); },
});
it('should throw, if a property-path is empty', () => {
const jsonPath = _('/foo/package.json');
expect(() => updater.createUpdate().addChange([], 'missing').writeChanges(jsonPath, {}))
.toThrowError(`Missing property path for writing value to '${jsonPath}'.`);
});
it('should throw, if a property-path points to a non-object intermediate value', () => {
const jsonPath = _('/foo/package.json');
const parsedJson = {foo: null, bar: 42, baz: {qux: []}};
const writeToProp = (propPath: string[], parsed?: JsonObject) =>
updater.createUpdate().addChange(propPath, 'updated').writeChanges(jsonPath, parsed);
expect(() => writeToProp(['foo', 'child'], parsedJson))
.toThrowError('Property path \'foo.child\' does not point to an object.');
expect(() => writeToProp(['bar', 'child'], parsedJson))
.toThrowError('Property path \'bar.child\' does not point to an object.');
expect(() => writeToProp(['baz', 'qux', 'child'], parsedJson))
.toThrowError('Property path \'baz.qux.child\' does not point to an object.');
// It should not throw, if no parsed representation is provided.
// (The error will still be thrown on the master process, but that is out of scope for
// this test.)
expect(() => writeToProp(['foo', 'child'])).not.toThrow();
});
it('should throw, if trying to re-apply an already applied update', () => {
const update = updater.createUpdate().addChange(['foo'], 'updated');
expect(() => update.writeChanges(_('/foo/package.json'))).not.toThrow();
expect(() => update.writeChanges(_('/foo/package.json')))
.toThrowError('Trying to apply a `PackageJsonUpdate` that has already been applied.');
expect(() => update.writeChanges(_('/bar/package.json')))
.toThrowError('Trying to apply a `PackageJsonUpdate` that has already been applied.');
}); });
}); });
});
// Helpers it('should throw, if a property-path is empty', () => {
class MockPackageJsonUpdater implements PackageJsonUpdater { const jsonPath = _('/foo/package.json');
createUpdate = jasmine.createSpy('MockPackageJsonUpdater#createUpdate');
writeChanges = jasmine.createSpy('MockPackageJsonUpdater#writeChanges'); expect(() => updater.createUpdate().addChange([], 'missing').writeChanges(jsonPath, {}))
} .toThrowError(`Missing property path for writing value to '${jsonPath}'.`);
});
it('should throw, if a property-path points to a non-object intermediate value', () => {
const jsonPath = _('/foo/package.json');
const parsedJson = {foo: null, bar: 42, baz: {qux: []}};
const writeToProp = (propPath: string[], parsed?: JsonObject) =>
updater.createUpdate().addChange(propPath, 'updated').writeChanges(jsonPath, parsed);
expect(() => writeToProp(['foo', 'child'], parsedJson))
.toThrowError('Property path \'foo.child\' does not point to an object.');
expect(() => writeToProp(['bar', 'child'], parsedJson))
.toThrowError('Property path \'bar.child\' does not point to an object.');
expect(() => writeToProp(['baz', 'qux', 'child'], parsedJson))
.toThrowError('Property path \'baz.qux.child\' does not point to an object.');
// It should not throw, if no parsed representation is provided.
// (The error will still be thrown on the master process, but that is out of scope for
// this test.)
expect(() => writeToProp(['foo', 'child'])).not.toThrow();
});
it('should throw, if trying to re-apply an already applied update', () => {
const update = updater.createUpdate().addChange(['foo'], 'updated');
expect(() => update.writeChanges(_('/foo/package.json'))).not.toThrow();
expect(() => update.writeChanges(_('/foo/package.json')))
.toThrowError('Trying to apply a `PackageJsonUpdate` that has already been applied.');
expect(() => update.writeChanges(_('/bar/package.json')))
.toThrowError('Trying to apply a `PackageJsonUpdate` that has already been applied.');
});
});
}); });
// Helpers
class MockPackageJsonUpdater implements PackageJsonUpdater {
createUpdate = jasmine.createSpy('MockPackageJsonUpdater#createUpdate');
writeChanges = jasmine.createSpy('MockPackageJsonUpdater#writeChanges');
}
}); });