feat(ngcc): support marking an in-progress task as unprocessed (#36626)

This commit adds support for stopping processing an in-progress task
and moving it back to the list of pending tasks.

In a subsequent commit, this will be used to allow ngcc to recover when
a worker process crashes in the middle of processing a task.

PR Close #36626
This commit is contained in:
George Kalpakas 2020-04-29 21:28:06 +03:00 committed by Andrew Kushnir
parent 4c63241b34
commit 4665c35453
4 changed files with 109 additions and 0 deletions

View File

@ -117,6 +117,16 @@ export interface TaskQueue {
*/ */
markAsFailed(task: Task): void; markAsFailed(task: Task): void;
/**
* Mark a task as not processed (i.e. add an in-progress task back to the queue).
*
* This removes the task from the internal list of in-progress tasks and adds it back to the list
* of pending tasks.
*
* @param task The task to mark as not processed.
*/
markAsUnprocessed(task: Task): void;
/** /**
* Return a string representation of the task queue (for debugging purposes). * Return a string representation of the task queue (for debugging purposes).
* *

View File

@ -63,6 +63,16 @@ export abstract class BaseTaskQueue implements TaskQueue {
this.inProgressTasks.delete(task); this.inProgressTasks.delete(task);
} }
markAsUnprocessed(task: Task): void {
if (!this.inProgressTasks.has(task)) {
throw new Error(
`Trying to mark task that was not in progress as unprocessed: ${stringifyTask(task)}`);
}
this.inProgressTasks.delete(task);
this.tasks.unshift(task);
}
toString(): string { toString(): string {
const inProgTasks = Array.from(this.inProgressTasks); const inProgTasks = Array.from(this.inProgressTasks);

View File

@ -312,6 +312,65 @@ describe('ParallelTaskQueue', () => {
}); });
}); });
describe('markAsUnprocessed()', () => {
it('should mark an in-progress task as unprocessed, so that it can be picked again', () => {
const {queue} = createQueue(2);
const task1 = queue.getNextTask()!;
const task2 = queue.getNextTask()!;
expect(queue.allTasksCompleted).toBe(false);
queue.markAsUnprocessed(task1);
queue.markTaskCompleted(task2);
expect(queue.allTasksCompleted).toBe(false);
expect(queue.getNextTask()).toBe(task1);
expect(queue.allTasksCompleted).toBe(false);
queue.markTaskCompleted(task1);
expect(queue.allTasksCompleted).toBe(true);
});
it('should throw, if the specified task is not in progress', () => {
const {tasks, queue} = createQueue(3);
queue.getNextTask();
queue.markAsCompleted(tasks[0]);
// Try with a task that is already completed.
expect(() => queue.markAsUnprocessed(tasks[0]))
.toThrowError(
`Trying to mark task that was not in progress as unprocessed: ` +
`{entryPoint: entry-point-0, formatProperty: prop-0, processDts: true}`);
// Try with a task that is not yet started.
expect(() => queue.markAsUnprocessed(tasks[2]))
.toThrowError(
`Trying to mark task that was not in progress as unprocessed: ` +
`{entryPoint: entry-point-2, formatProperty: prop-0, processDts: true}`);
});
it('should not remove the unprocessed task from the lists of blocking tasks', () => {
const {tasks, queue} = createQueue(3, 1, {
0: [], // Entry-point #0 does not depend on anything.
1: [0], // Entry-point #1 depends on #0.
2: [0, 1], // Entry-point #2 depends on #0 and #1.
});
// Pick task #0 first, since it is the only one that is not blocked by other tasks.
expect(queue.getNextTask()).toBe(tasks[0]);
// No task available, until task #0 is completed.
expect(queue.getNextTask()).toBe(null);
// Put task #0 back to the unprocessed tasks.
queue.markAsUnprocessed(tasks[0]);
expect(queue.getNextTask()).toBe(tasks[0]);
// Other tasks are still blocked on task #0.
expect(queue.getNextTask()).toBe(null);
});
});
describe('toString()', () => { describe('toString()', () => {
it('should include the `TaskQueue` constructor\'s name', () => { it('should include the `TaskQueue` constructor\'s name', () => {
const {queue} = createQueue(0); const {queue} = createQueue(0);

View File

@ -157,6 +157,36 @@ describe('SerialTaskQueue', () => {
}); });
}); });
describe('markAsUnprocessed()', () => {
it('should mark an in-progress task as unprocessed, so that it can be picked again', () => {
const {queue} = createQueue(3);
const task = queue.getNextTask()!;
expect(() => queue.getNextTask()).toThrow();
queue.markAsUnprocessed(task);
expect(queue.getNextTask()).toBe(task);
});
it('should throw, if the specified task is not in progress', () => {
const {tasks, queue} = createQueue(3);
queue.getNextTask();
queue.markAsCompleted(tasks[0]);
// Try with a task that is already completed.
expect(() => queue.markAsUnprocessed(tasks[0]))
.toThrowError(
`Trying to mark task that was not in progress as unprocessed: ` +
`{entryPoint: entry-point-0, formatProperty: prop-0, processDts: true}`);
// Try with a task that is not yet started.
expect(() => queue.markAsUnprocessed(tasks[2]))
.toThrowError(
`Trying to mark task that was not in progress as unprocessed: ` +
`{entryPoint: entry-point-2, formatProperty: prop-2, processDts: true}`);
});
});
describe('toString()', () => { describe('toString()', () => {
it('should include the `TaskQueue` constructor\'s name', () => { it('should include the `TaskQueue` constructor\'s name', () => {
const {queue} = createQueue(0); const {queue} = createQueue(0);