mirror of https://github.com/apache/druid.git
Fix `ForkingTaskRunnerTest` (#16323)
Changes: - Use non-static fields to track task counts in `ForkingTaskRunner` - Update assertions in `ForkingTaskRunnerTest` to ensure that the tests are idempotent
This commit is contained in:
parent
4bdc1890f7
commit
1dabb02843
|
@ -128,10 +128,10 @@ public class ForkingTaskRunner
|
||||||
private volatile int numProcessorsPerTask = -1;
|
private volatile int numProcessorsPerTask = -1;
|
||||||
private volatile boolean stopping = false;
|
private volatile boolean stopping = false;
|
||||||
|
|
||||||
private static final AtomicLong LAST_REPORTED_FAILED_TASK_COUNT = new AtomicLong();
|
private final AtomicLong lastReportedFailedTaskCount = new AtomicLong();
|
||||||
private static final AtomicLong FAILED_TASK_COUNT = new AtomicLong();
|
private final AtomicLong failedTaskCount = new AtomicLong();
|
||||||
private static final AtomicLong SUCCESSFUL_TASK_COUNT = new AtomicLong();
|
private final AtomicLong successfulTaskCount = new AtomicLong();
|
||||||
private static final AtomicLong LAST_REPORTED_SUCCESSFUL_TASK_COUNT = new AtomicLong();
|
private final AtomicLong lastReportedSuccessfulTaskCount = new AtomicLong();
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ForkingTaskRunner(
|
public ForkingTaskRunner(
|
||||||
|
@ -384,7 +384,7 @@ public class ForkingTaskRunner
|
||||||
}
|
}
|
||||||
|
|
||||||
LOGGER.info(
|
LOGGER.info(
|
||||||
"Running command: %s",
|
"Running command[%s]",
|
||||||
getMaskedCommand(startupLoggingConfig.getMaskProperties(), command.getCommandList())
|
getMaskedCommand(startupLoggingConfig.getMaskProperties(), command.getCommandList())
|
||||||
);
|
);
|
||||||
taskWorkItem.processHolder = runTaskProcess(command.getCommandList(), logFile, taskLocation);
|
taskWorkItem.processHolder = runTaskProcess(command.getCommandList(), logFile, taskLocation);
|
||||||
|
@ -400,15 +400,15 @@ public class ForkingTaskRunner
|
||||||
TaskStatus.running(task.getId())
|
TaskStatus.running(task.getId())
|
||||||
);
|
);
|
||||||
|
|
||||||
LOGGER.info("Logging task %s output to: %s", task.getId(), logFile);
|
LOGGER.info("Logging output of task[%s] to file[%s].", task.getId(), logFile);
|
||||||
final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile);
|
final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile);
|
||||||
final TaskStatus status;
|
final TaskStatus status;
|
||||||
if (exitCode == 0) {
|
if (exitCode == 0) {
|
||||||
LOGGER.info("Process exited successfully for task: %s", task.getId());
|
LOGGER.info("Process exited successfully for task[%s]", task.getId());
|
||||||
// Process exited successfully
|
// Process exited successfully
|
||||||
status = jsonMapper.readValue(statusFile, TaskStatus.class);
|
status = jsonMapper.readValue(statusFile, TaskStatus.class);
|
||||||
} else {
|
} else {
|
||||||
LOGGER.error("Process exited with code[%d] for task: %s", exitCode, task.getId());
|
LOGGER.error("Process exited with code[%d] for task[%s]", exitCode, task.getId());
|
||||||
// Process exited unsuccessfully
|
// Process exited unsuccessfully
|
||||||
status = TaskStatus.failure(
|
status = TaskStatus.failure(
|
||||||
task.getId(),
|
task.getId(),
|
||||||
|
@ -420,9 +420,9 @@ public class ForkingTaskRunner
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if (status.isSuccess()) {
|
if (status.isSuccess()) {
|
||||||
SUCCESSFUL_TASK_COUNT.incrementAndGet();
|
successfulTaskCount.incrementAndGet();
|
||||||
} else {
|
} else {
|
||||||
FAILED_TASK_COUNT.incrementAndGet();
|
failedTaskCount.incrementAndGet();
|
||||||
}
|
}
|
||||||
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
|
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
|
||||||
return status;
|
return status;
|
||||||
|
@ -746,9 +746,9 @@ public class ForkingTaskRunner
|
||||||
@Override
|
@Override
|
||||||
public Long getWorkerFailedTaskCount()
|
public Long getWorkerFailedTaskCount()
|
||||||
{
|
{
|
||||||
long failedTaskCount = FAILED_TASK_COUNT.get();
|
long failedTaskCount = this.failedTaskCount.get();
|
||||||
long lastReportedFailedTaskCount = LAST_REPORTED_FAILED_TASK_COUNT.get();
|
long lastReportedFailedTaskCount = this.lastReportedFailedTaskCount.get();
|
||||||
LAST_REPORTED_FAILED_TASK_COUNT.set(failedTaskCount);
|
this.lastReportedFailedTaskCount.set(failedTaskCount);
|
||||||
return failedTaskCount - lastReportedFailedTaskCount;
|
return failedTaskCount - lastReportedFailedTaskCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -785,9 +785,9 @@ public class ForkingTaskRunner
|
||||||
@Override
|
@Override
|
||||||
public Long getWorkerSuccessfulTaskCount()
|
public Long getWorkerSuccessfulTaskCount()
|
||||||
{
|
{
|
||||||
long successfulTaskCount = SUCCESSFUL_TASK_COUNT.get();
|
long successfulTaskCount = this.successfulTaskCount.get();
|
||||||
long lastReportedSuccessfulTaskCount = LAST_REPORTED_SUCCESSFUL_TASK_COUNT.get();
|
long lastReportedSuccessfulTaskCount = this.lastReportedSuccessfulTaskCount.get();
|
||||||
LAST_REPORTED_SUCCESSFUL_TASK_COUNT.set(successfulTaskCount);
|
this.lastReportedSuccessfulTaskCount.set(successfulTaskCount);
|
||||||
return successfulTaskCount - lastReportedSuccessfulTaskCount;
|
return successfulTaskCount - lastReportedSuccessfulTaskCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -438,7 +438,6 @@ public class ForkingTaskRunnerTest
|
||||||
@Test
|
@Test
|
||||||
public void testInvalidTaskContextJavaOptsArray() throws JsonProcessingException
|
public void testInvalidTaskContextJavaOptsArray() throws JsonProcessingException
|
||||||
{
|
{
|
||||||
ObjectMapper mapper = new DefaultObjectMapper();
|
|
||||||
final String taskContent = "{\n"
|
final String taskContent = "{\n"
|
||||||
+ " \"type\" : \"noop\",\n"
|
+ " \"type\" : \"noop\",\n"
|
||||||
+ " \"id\" : \"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
|
+ " \"id\" : \"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
|
||||||
|
@ -463,7 +462,7 @@ public class ForkingTaskRunnerTest
|
||||||
workerConfig,
|
workerConfig,
|
||||||
new Properties(),
|
new Properties(),
|
||||||
new NoopTaskLogs(),
|
new NoopTaskLogs(),
|
||||||
mapper,
|
OBJECT_MAPPER,
|
||||||
new DruidNode("middleManager", "host", false, 8091, null, true, false),
|
new DruidNode("middleManager", "host", false, 8091, null, true, false),
|
||||||
new StartupLoggingConfig(),
|
new StartupLoggingConfig(),
|
||||||
TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
|
TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
|
||||||
|
@ -494,7 +493,7 @@ public class ForkingTaskRunnerTest
|
||||||
+ task.getId()
|
+ task.getId()
|
||||||
+ " must be an array of strings.")
|
+ " must be an array of strings.")
|
||||||
);
|
);
|
||||||
Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerFailedTaskCount());
|
Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerFailedTaskCount());
|
||||||
Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount());
|
Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue