diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 83451c490a8..e946fc6149b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -128,10 +128,10 @@ public class ForkingTaskRunner private volatile int numProcessorsPerTask = -1; private volatile boolean stopping = false; - private static final AtomicLong LAST_REPORTED_FAILED_TASK_COUNT = new AtomicLong(); - private static final AtomicLong FAILED_TASK_COUNT = new AtomicLong(); - private static final AtomicLong SUCCESSFUL_TASK_COUNT = new AtomicLong(); - private static final AtomicLong LAST_REPORTED_SUCCESSFUL_TASK_COUNT = new AtomicLong(); + private final AtomicLong lastReportedFailedTaskCount = new AtomicLong(); + private final AtomicLong failedTaskCount = new AtomicLong(); + private final AtomicLong successfulTaskCount = new AtomicLong(); + private final AtomicLong lastReportedSuccessfulTaskCount = new AtomicLong(); @Inject public ForkingTaskRunner( @@ -384,7 +384,7 @@ public class ForkingTaskRunner } LOGGER.info( - "Running command: %s", + "Running command[%s]", getMaskedCommand(startupLoggingConfig.getMaskProperties(), command.getCommandList()) ); taskWorkItem.processHolder = runTaskProcess(command.getCommandList(), logFile, taskLocation); @@ -400,15 +400,15 @@ public class ForkingTaskRunner TaskStatus.running(task.getId()) ); - LOGGER.info("Logging task %s output to: %s", task.getId(), logFile); + LOGGER.info("Logging output of task[%s] to file[%s].", task.getId(), logFile); final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile); final TaskStatus status; if (exitCode == 0) { - LOGGER.info("Process exited successfully for task: %s", task.getId()); + LOGGER.info("Process exited successfully for task[%s]", task.getId()); // Process exited successfully status = jsonMapper.readValue(statusFile, TaskStatus.class); } else { - LOGGER.error("Process exited with code[%d] for task: %s", exitCode, task.getId()); + LOGGER.error("Process exited with code[%d] for task[%s]", exitCode, task.getId()); // Process exited unsuccessfully status = TaskStatus.failure( task.getId(), @@ -420,9 +420,9 @@ public class ForkingTaskRunner ); } if (status.isSuccess()) { - SUCCESSFUL_TASK_COUNT.incrementAndGet(); + successfulTaskCount.incrementAndGet(); } else { - FAILED_TASK_COUNT.incrementAndGet(); + failedTaskCount.incrementAndGet(); } TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); return status; @@ -746,9 +746,9 @@ public class ForkingTaskRunner @Override public Long getWorkerFailedTaskCount() { - long failedTaskCount = FAILED_TASK_COUNT.get(); - long lastReportedFailedTaskCount = LAST_REPORTED_FAILED_TASK_COUNT.get(); - LAST_REPORTED_FAILED_TASK_COUNT.set(failedTaskCount); + long failedTaskCount = this.failedTaskCount.get(); + long lastReportedFailedTaskCount = this.lastReportedFailedTaskCount.get(); + this.lastReportedFailedTaskCount.set(failedTaskCount); return failedTaskCount - lastReportedFailedTaskCount; } @@ -785,9 +785,9 @@ public class ForkingTaskRunner @Override public Long getWorkerSuccessfulTaskCount() { - long successfulTaskCount = SUCCESSFUL_TASK_COUNT.get(); - long lastReportedSuccessfulTaskCount = LAST_REPORTED_SUCCESSFUL_TASK_COUNT.get(); - LAST_REPORTED_SUCCESSFUL_TASK_COUNT.set(successfulTaskCount); + long successfulTaskCount = this.successfulTaskCount.get(); + long lastReportedSuccessfulTaskCount = this.lastReportedSuccessfulTaskCount.get(); + this.lastReportedSuccessfulTaskCount.set(successfulTaskCount); return successfulTaskCount - lastReportedSuccessfulTaskCount; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java index 18980771eff..2c17b3cb7c4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java @@ -438,7 +438,6 @@ public class ForkingTaskRunnerTest @Test public void testInvalidTaskContextJavaOptsArray() throws JsonProcessingException { - ObjectMapper mapper = new DefaultObjectMapper(); final String taskContent = "{\n" + " \"type\" : \"noop\",\n" + " \"id\" : \"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n" @@ -463,7 +462,7 @@ public class ForkingTaskRunnerTest workerConfig, new Properties(), new NoopTaskLogs(), - mapper, + OBJECT_MAPPER, new DruidNode("middleManager", "host", false, 8091, null, true, false), new StartupLoggingConfig(), TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig) @@ -494,7 +493,7 @@ public class ForkingTaskRunnerTest + task.getId() + " must be an array of strings.") ); - Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerFailedTaskCount()); + Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerFailedTaskCount()); Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount()); }