refactor(ngcc): notify master process about transformed files before writing (#36626)
With this commit, worker processes will notify the master process about the transformed files they are about to write to disk before starting writing them. In a subsequent commit, this will be used to allow ngcc to recover when a worker process crashes in the middle of processing a task. PR Close #36626
This commit is contained in:
parent
e367593a26
commit
dff5129661
|
@ -36,6 +36,12 @@ export interface TaskCompletedMessage extends JsonObject {
|
||||||
message: string|null;
|
message: string|null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** A message listing the paths to transformed files about to be written to disk. */
|
||||||
|
export interface TransformedFilesMessage extends JsonObject {
|
||||||
|
type: 'transformed-files';
|
||||||
|
files: AbsoluteFsPath[];
|
||||||
|
}
|
||||||
|
|
||||||
/** A message requesting the update of a `package.json` file. */
|
/** A message requesting the update of a `package.json` file. */
|
||||||
export interface UpdatePackageJsonMessage extends JsonObject {
|
export interface UpdatePackageJsonMessage extends JsonObject {
|
||||||
type: 'update-package-json';
|
type: 'update-package-json';
|
||||||
|
@ -44,7 +50,8 @@ export interface UpdatePackageJsonMessage extends JsonObject {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** The type of messages sent from cluster workers to the cluster master. */
|
/** The type of messages sent from cluster workers to the cluster master. */
|
||||||
export type MessageFromWorker = ErrorMessage|TaskCompletedMessage|UpdatePackageJsonMessage;
|
export type MessageFromWorker =
|
||||||
|
ErrorMessage|TaskCompletedMessage|TransformedFilesMessage|UpdatePackageJsonMessage;
|
||||||
|
|
||||||
/** The type of messages sent from the cluster master to cluster workers. */
|
/** The type of messages sent from the cluster master to cluster workers. */
|
||||||
export type MessageToWorker = ProcessTaskMessage;
|
export type MessageToWorker = ProcessTaskMessage;
|
||||||
|
|
|
@ -17,7 +17,7 @@ import {AnalyzeEntryPointsFn} from '../api';
|
||||||
import {CreateTaskCompletedCallback, Task, TaskCompletedCallback, TaskQueue} from '../tasks/api';
|
import {CreateTaskCompletedCallback, Task, TaskCompletedCallback, TaskQueue} from '../tasks/api';
|
||||||
import {stringifyTask} from '../tasks/utils';
|
import {stringifyTask} from '../tasks/utils';
|
||||||
|
|
||||||
import {MessageFromWorker, TaskCompletedMessage, UpdatePackageJsonMessage} from './api';
|
import {MessageFromWorker, TaskCompletedMessage, TransformedFilesMessage, UpdatePackageJsonMessage} from './api';
|
||||||
import {Deferred, sendMessageToWorker} from './utils';
|
import {Deferred, sendMessageToWorker} from './utils';
|
||||||
|
|
||||||
|
|
||||||
|
@ -180,6 +180,8 @@ export class ClusterMaster {
|
||||||
throw new Error(`Error on worker #${workerId}: ${msg.error}`);
|
throw new Error(`Error on worker #${workerId}: ${msg.error}`);
|
||||||
case 'task-completed':
|
case 'task-completed':
|
||||||
return this.onWorkerTaskCompleted(workerId, msg);
|
return this.onWorkerTaskCompleted(workerId, msg);
|
||||||
|
case 'transformed-files':
|
||||||
|
return this.onWorkerTransformedFiles(workerId, msg);
|
||||||
case 'update-package-json':
|
case 'update-package-json':
|
||||||
return this.onWorkerUpdatePackageJson(workerId, msg);
|
return this.onWorkerUpdatePackageJson(workerId, msg);
|
||||||
default:
|
default:
|
||||||
|
@ -220,6 +222,11 @@ export class ClusterMaster {
|
||||||
this.maybeDistributeWork();
|
this.maybeDistributeWork();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Handle a worker's message regarding the files transformed while processing its task. */
|
||||||
|
private onWorkerTransformedFiles(workerId: number, msg: TransformedFilesMessage): void {
|
||||||
|
// TODO: Do something useful with this info.
|
||||||
|
}
|
||||||
|
|
||||||
/** Handle a worker's request to update a `package.json` file. */
|
/** Handle a worker's request to update a `package.json` file. */
|
||||||
private onWorkerUpdatePackageJson(workerId: number, msg: UpdatePackageJsonMessage): void {
|
private onWorkerUpdatePackageJson(workerId: number, msg: UpdatePackageJsonMessage): void {
|
||||||
const task = this.taskAssignments.get(workerId) || null;
|
const task = this.taskAssignments.get(workerId) || null;
|
||||||
|
|
|
@ -44,18 +44,21 @@ export class Deferred<T> {
|
||||||
* (This function should be invoked from cluster workers only.)
|
* (This function should be invoked from cluster workers only.)
|
||||||
*
|
*
|
||||||
* @param msg The message to send to the cluster master.
|
* @param msg The message to send to the cluster master.
|
||||||
|
* @return A promise that is resolved once the message has been sent.
|
||||||
*/
|
*/
|
||||||
export const sendMessageToMaster = (msg: MessageFromWorker): void => {
|
export const sendMessageToMaster = (msg: MessageFromWorker): Promise<void> => {
|
||||||
if (cluster.isMaster) {
|
if (cluster.isMaster) {
|
||||||
throw new Error('Unable to send message to the master process: Already on the master process.');
|
throw new Error('Unable to send message to the master process: Already on the master process.');
|
||||||
}
|
}
|
||||||
|
|
||||||
if (process.send === undefined) {
|
return new Promise((resolve, reject) => {
|
||||||
// Theoretically, this should never happen on a worker process.
|
if (process.send === undefined) {
|
||||||
throw new Error('Unable to send message to the master process: Missing `process.send()`.');
|
// Theoretically, this should never happen on a worker process.
|
||||||
}
|
throw new Error('Unable to send message to the master process: Missing `process.send()`.');
|
||||||
|
}
|
||||||
|
|
||||||
process.send(msg);
|
process.send(msg, (err: Error|null) => (err === null) ? resolve() : reject(err));
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -64,8 +67,9 @@ export const sendMessageToMaster = (msg: MessageFromWorker): void => {
|
||||||
*
|
*
|
||||||
* @param workerId The ID of the recipient worker.
|
* @param workerId The ID of the recipient worker.
|
||||||
* @param msg The message to send to the worker.
|
* @param msg The message to send to the worker.
|
||||||
|
* @return A promise that is resolved once the message has been sent.
|
||||||
*/
|
*/
|
||||||
export const sendMessageToWorker = (workerId: number, msg: MessageToWorker): void => {
|
export const sendMessageToWorker = (workerId: number, msg: MessageToWorker): Promise<void> => {
|
||||||
if (!cluster.isMaster) {
|
if (!cluster.isMaster) {
|
||||||
throw new Error('Unable to send message to worker process: Sender is not the master process.');
|
throw new Error('Unable to send message to worker process: Sender is not the master process.');
|
||||||
}
|
}
|
||||||
|
@ -77,5 +81,7 @@ export const sendMessageToWorker = (workerId: number, msg: MessageToWorker): voi
|
||||||
'Unable to send message to worker process: Recipient does not exist or has disconnected.');
|
'Unable to send message to worker process: Recipient does not exist or has disconnected.');
|
||||||
}
|
}
|
||||||
|
|
||||||
worker.send(msg);
|
return new Promise((resolve, reject) => {
|
||||||
|
worker.send(msg, (err: Error|null) => (err === null) ? resolve() : reject(err));
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
|
@ -62,7 +62,10 @@ export async function startWorker(logger: Logger, createCompileFn: CreateCompile
|
||||||
}
|
}
|
||||||
|
|
||||||
const compile = createCompileFn(
|
const compile = createCompileFn(
|
||||||
() => {},
|
transformedFiles => sendMessageToMaster({
|
||||||
|
type: 'transformed-files',
|
||||||
|
files: transformedFiles.map(f => f.path),
|
||||||
|
}),
|
||||||
(_task, outcome, message) => sendMessageToMaster({type: 'task-completed', outcome, message}));
|
(_task, outcome, message) => sendMessageToMaster({type: 'task-completed', outcome, message}));
|
||||||
|
|
||||||
|
|
||||||
|
@ -79,7 +82,7 @@ export async function startWorker(logger: Logger, createCompileFn: CreateCompile
|
||||||
`[Worker #${cluster.worker.id}] Invalid message received: ${JSON.stringify(msg)}`);
|
`[Worker #${cluster.worker.id}] Invalid message received: ${JSON.stringify(msg)}`);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
sendMessageToMaster({
|
await sendMessageToMaster({
|
||||||
type: 'error',
|
type: 'error',
|
||||||
error: (err instanceof Error) ? (err.stack || err.message) : err,
|
error: (err instanceof Error) ? (err.stack || err.message) : err,
|
||||||
});
|
});
|
||||||
|
@ -88,4 +91,4 @@ export async function startWorker(logger: Logger, createCompileFn: CreateCompile
|
||||||
|
|
||||||
// Return a promise that is never resolved.
|
// Return a promise that is never resolved.
|
||||||
return new Promise(() => undefined);
|
return new Promise(() => undefined);
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,25 +87,34 @@ runInEachFileSystem(() => {
|
||||||
.writeChanges(jsonPath, parsed);
|
.writeChanges(jsonPath, parsed);
|
||||||
|
|
||||||
writeToProp(['foo']);
|
writeToProp(['foo']);
|
||||||
expect(processSendSpy).toHaveBeenCalledWith({
|
expect(processSendSpy)
|
||||||
type: 'update-package-json',
|
.toHaveBeenCalledWith(
|
||||||
packageJsonPath: jsonPath,
|
{
|
||||||
changes: [[['foo'], 'updated', 'unimportant']],
|
type: 'update-package-json',
|
||||||
});
|
packageJsonPath: jsonPath,
|
||||||
|
changes: [[['foo'], 'updated', 'unimportant']],
|
||||||
|
},
|
||||||
|
jasmine.any(Function));
|
||||||
|
|
||||||
writeToProp(['bar'], {before: 'foo'});
|
writeToProp(['bar'], {before: 'foo'});
|
||||||
expect(processSendSpy).toHaveBeenCalledWith({
|
expect(processSendSpy)
|
||||||
type: 'update-package-json',
|
.toHaveBeenCalledWith(
|
||||||
packageJsonPath: jsonPath,
|
{
|
||||||
changes: [[['bar'], 'updated', {before: 'foo'}]],
|
type: 'update-package-json',
|
||||||
});
|
packageJsonPath: jsonPath,
|
||||||
|
changes: [[['bar'], 'updated', {before: 'foo'}]],
|
||||||
|
},
|
||||||
|
jasmine.any(Function));
|
||||||
|
|
||||||
writeToProp(['bar', 'baz', 'qux'], 'alphabetic', {});
|
writeToProp(['bar', 'baz', 'qux'], 'alphabetic', {});
|
||||||
expect(processSendSpy).toHaveBeenCalledWith({
|
expect(processSendSpy)
|
||||||
type: 'update-package-json',
|
.toHaveBeenCalledWith(
|
||||||
packageJsonPath: jsonPath,
|
{
|
||||||
changes: [[['bar', 'baz', 'qux'], 'updated', 'alphabetic']],
|
type: 'update-package-json',
|
||||||
});
|
packageJsonPath: jsonPath,
|
||||||
|
changes: [[['bar', 'baz', 'qux'], 'updated', 'alphabetic']],
|
||||||
|
},
|
||||||
|
jasmine.any(Function));
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should update an in-memory representation (if provided)', () => {
|
it('should update an in-memory representation (if provided)', () => {
|
||||||
|
|
|
@ -11,8 +11,11 @@
|
||||||
import * as cluster from 'cluster';
|
import * as cluster from 'cluster';
|
||||||
import {EventEmitter} from 'events';
|
import {EventEmitter} from 'events';
|
||||||
|
|
||||||
|
import {AbsoluteFsPath} from '../../../../src/ngtsc/file_system';
|
||||||
|
import {CreateCompileFn} from '../../../src/execution/api';
|
||||||
import {startWorker} from '../../../src/execution/cluster/worker';
|
import {startWorker} from '../../../src/execution/cluster/worker';
|
||||||
import {Task, TaskCompletedCallback, TaskProcessingOutcome} from '../../../src/execution/tasks/api';
|
import {Task, TaskCompletedCallback, TaskProcessingOutcome} from '../../../src/execution/tasks/api';
|
||||||
|
import {FileToWrite} from '../../../src/rendering/utils';
|
||||||
import {MockLogger} from '../../helpers/mock_logger';
|
import {MockLogger} from '../../helpers/mock_logger';
|
||||||
import {mockProperty} from '../../helpers/spy_utils';
|
import {mockProperty} from '../../helpers/spy_utils';
|
||||||
|
|
||||||
|
@ -60,6 +63,24 @@ describe('startWorker()', () => {
|
||||||
expect(createCompileFnSpy).toHaveBeenCalledWith(jasmine.any(Function), jasmine.any(Function));
|
expect(createCompileFnSpy).toHaveBeenCalledWith(jasmine.any(Function), jasmine.any(Function));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should set up `compileFn()` to send `transformed-files` messages to master', () => {
|
||||||
|
startWorker(mockLogger, createCompileFnSpy);
|
||||||
|
|
||||||
|
const mockTransformedFiles: FileToWrite[] = [
|
||||||
|
{path: '/foo' as AbsoluteFsPath, contents: 'FOO'},
|
||||||
|
{path: '/bar' as AbsoluteFsPath, contents: 'BAR'},
|
||||||
|
];
|
||||||
|
const beforeWritingFiles: Parameters<CreateCompileFn>[0] =
|
||||||
|
createCompileFnSpy.calls.argsFor(0)[0];
|
||||||
|
|
||||||
|
beforeWritingFiles(mockTransformedFiles);
|
||||||
|
|
||||||
|
expect(processSendSpy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(processSendSpy)
|
||||||
|
.toHaveBeenCalledWith(
|
||||||
|
{type: 'transformed-files', files: ['/foo', '/bar']}, jasmine.any(Function));
|
||||||
|
});
|
||||||
|
|
||||||
it('should set up `compileFn()` to send `task-completed` messages to master', () => {
|
it('should set up `compileFn()` to send `task-completed` messages to master', () => {
|
||||||
startWorker(mockLogger, createCompileFnSpy);
|
startWorker(mockLogger, createCompileFnSpy);
|
||||||
const onTaskCompleted: TaskCompletedCallback = createCompileFnSpy.calls.argsFor(0)[1];
|
const onTaskCompleted: TaskCompletedCallback = createCompileFnSpy.calls.argsFor(0)[1];
|
||||||
|
@ -68,17 +89,21 @@ describe('startWorker()', () => {
|
||||||
expect(processSendSpy).toHaveBeenCalledTimes(1);
|
expect(processSendSpy).toHaveBeenCalledTimes(1);
|
||||||
expect(processSendSpy)
|
expect(processSendSpy)
|
||||||
.toHaveBeenCalledWith(
|
.toHaveBeenCalledWith(
|
||||||
{type: 'task-completed', outcome: TaskProcessingOutcome.Processed, message: null});
|
{type: 'task-completed', outcome: TaskProcessingOutcome.Processed, message: null},
|
||||||
|
jasmine.any(Function));
|
||||||
|
|
||||||
processSendSpy.calls.reset();
|
processSendSpy.calls.reset();
|
||||||
|
|
||||||
onTaskCompleted(null as any, TaskProcessingOutcome.Failed, 'error message');
|
onTaskCompleted(null as any, TaskProcessingOutcome.Failed, 'error message');
|
||||||
expect(processSendSpy).toHaveBeenCalledTimes(1);
|
expect(processSendSpy).toHaveBeenCalledTimes(1);
|
||||||
expect(processSendSpy).toHaveBeenCalledWith({
|
expect(processSendSpy)
|
||||||
type: 'task-completed',
|
.toHaveBeenCalledWith(
|
||||||
outcome: TaskProcessingOutcome.Failed,
|
{
|
||||||
message: 'error message',
|
type: 'task-completed',
|
||||||
});
|
outcome: TaskProcessingOutcome.Failed,
|
||||||
|
message: 'error message',
|
||||||
|
},
|
||||||
|
jasmine.any(Function));
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should return a promise (that is never resolved)', done => {
|
it('should return a promise (that is never resolved)', done => {
|
||||||
|
@ -129,11 +154,13 @@ describe('startWorker()', () => {
|
||||||
|
|
||||||
err = 'Error string.';
|
err = 'Error string.';
|
||||||
cluster.worker.emit('message', {type: 'process-task', task: mockTask});
|
cluster.worker.emit('message', {type: 'process-task', task: mockTask});
|
||||||
expect(processSendSpy).toHaveBeenCalledWith({type: 'error', error: err});
|
expect(processSendSpy)
|
||||||
|
.toHaveBeenCalledWith({type: 'error', error: err}, jasmine.any(Function));
|
||||||
|
|
||||||
err = new Error('Error object.');
|
err = new Error('Error object.');
|
||||||
cluster.worker.emit('message', {type: 'process-task', task: mockTask});
|
cluster.worker.emit('message', {type: 'process-task', task: mockTask});
|
||||||
expect(processSendSpy).toHaveBeenCalledWith({type: 'error', error: err.stack});
|
expect(processSendSpy)
|
||||||
|
.toHaveBeenCalledWith({type: 'error', error: err.stack}, jasmine.any(Function));
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should throw, when an unknown message type is received', () => {
|
it('should throw, when an unknown message type is received', () => {
|
||||||
|
@ -141,11 +168,14 @@ describe('startWorker()', () => {
|
||||||
cluster.worker.emit('message', {type: 'unknown', foo: 'bar'});
|
cluster.worker.emit('message', {type: 'unknown', foo: 'bar'});
|
||||||
|
|
||||||
expect(compileFnSpy).not.toHaveBeenCalled();
|
expect(compileFnSpy).not.toHaveBeenCalled();
|
||||||
expect(processSendSpy).toHaveBeenCalledWith({
|
expect(processSendSpy)
|
||||||
type: 'error',
|
.toHaveBeenCalledWith(
|
||||||
error: jasmine.stringMatching(
|
{
|
||||||
'Error: \\[Worker #42\\] Invalid message received: {"type":"unknown","foo":"bar"}'),
|
type: 'error',
|
||||||
});
|
error: jasmine.stringMatching(
|
||||||
|
'Error: \\[Worker #42\\] Invalid message received: {"type":"unknown","foo":"bar"}'),
|
||||||
|
},
|
||||||
|
jasmine.any(Function));
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue