diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index f9c36ef5810..c35e10cb257 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.overlord; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CharMatcher; @@ -219,6 +220,24 @@ public class ForkingTaskRunner ); } + // Override task specific javaOptsArray + try { + List taskJavaOptsArray = jsonMapper.convertValue( + task.getContextValue(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY), + new TypeReference>() {} + ); + if (taskJavaOptsArray != null) { + Iterables.addAll(command, taskJavaOptsArray); + } + } + catch (Exception e) { + throw new IllegalArgumentException( + ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY + + " in context of task: " + task.getId() + " must be an array of strings.", + e + ); + } + for (String propName : props.stringPropertyNames()) { for (String allowedPrefix : config.getAllowedPrefixes()) { // See https://github.com/apache/druid/issues/1841 diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java index 5a1165138fb..ae5c49d4afa 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.overlord; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; @@ -48,9 +49,13 @@ import java.io.IOException; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; public class ForkingTaskRunnerTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + // This tests the test to make sure the test fails when it should. @Test(expected = AssertionError.class) public void testPatternMatcherFailureForJavaOptions() @@ -356,4 +361,135 @@ public class ForkingTaskRunnerTest Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); Assert.assertEquals("task failure test", status.getErrorMsg()); } + + @Test + public void testJavaOptsAndJavaOptsArrayOverride() throws ExecutionException, InterruptedException, + JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + final String taskContent = "{\n" + + " \"type\" : \"noop\",\n" + + " \"id\" : \"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n" + + " \"groupId\" : \"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n" + + " \"dataSource\" : \"none\",\n" + + " \"runTime\" : 2500,\n" + + " \"isReadyTime\" : 0,\n" + + " \"isReadyResult\" : \"YES\",\n" + + " \"firehose\" : null,\n" + + " \"context\" : {\n" + + " \"druid.indexer.runner.javaOptsArray\" : [ \"-Xmx10g\", \"-Xms10g\" ],\n" + + " \"druid.indexer.runner.javaOpts\" : \"-Xmx1g -Xms1g\"\n" + + " }\n" + + "}"; + final Task task = OBJECT_MAPPER.readValue(taskContent, NoopTask.class); + final AtomicInteger xmxJavaOptsIndex = new AtomicInteger(-1); + final AtomicInteger xmxJavaOptsArrayIndex = new AtomicInteger(-1); + ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner( + new ForkingTaskRunnerConfig(), + new TaskConfig( + null, + null, + null, + null, + ImmutableList.of(), + false, + new Period("PT0S"), + new Period("PT10S"), + ImmutableList.of(), + false, + false, + TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), + null + ), + new WorkerConfig(), + new Properties(), + new NoopTaskLogs(), + mapper, + new DruidNode("middleManager", "host", false, 8091, null, true, false), + new StartupLoggingConfig() + ) + { + @Override + ProcessHolder runTaskProcess(List command, File logFile, TaskLocation taskLocation) + { + xmxJavaOptsIndex.set(command.indexOf("-Xmx1g")); + xmxJavaOptsArrayIndex.set(command.indexOf("-Xmx10g")); + + return Mockito.mock(ProcessHolder.class); + } + + @Override + int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile) + { + return 1; + } + }; + + forkingTaskRunner.run(task).get(); + Assert.assertTrue(xmxJavaOptsArrayIndex.get() > xmxJavaOptsIndex.get()); + Assert.assertTrue(xmxJavaOptsIndex.get() >= 0); + } + + @Test + public void testInvalidTaskContextJavaOptsArray() throws JsonProcessingException + { + ObjectMapper mapper = new DefaultObjectMapper(); + final String taskContent = "{\n" + + " \"type\" : \"noop\",\n" + + " \"id\" : \"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n" + + " \"groupId\" : \"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n" + + " \"dataSource\" : \"none\",\n" + + " \"runTime\" : 2500,\n" + + " \"isReadyTime\" : 0,\n" + + " \"isReadyResult\" : \"YES\",\n" + + " \"firehose\" : null,\n" + + " \"context\" : {\n" + + " \"druid.indexer.runner.javaOptsArray\" : \"not a string array\",\n" + + " \"druid.indexer.runner.javaOpts\" : \"-Xmx1g -Xms1g\"\n" + + " }\n" + + "}"; + final Task task = OBJECT_MAPPER.readValue(taskContent, NoopTask.class); + ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner( + new ForkingTaskRunnerConfig(), + new TaskConfig( + null, + null, + null, + null, + ImmutableList.of(), + false, + new Period("PT0S"), + new Period("PT10S"), + ImmutableList.of(), + false, + false, + TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), + null + ), + new WorkerConfig(), + new Properties(), + new NoopTaskLogs(), + mapper, + new DruidNode("middleManager", "host", false, 8091, null, true, false), + new StartupLoggingConfig() + ) + { + @Override + ProcessHolder runTaskProcess(List command, File logFile, TaskLocation taskLocation) + { + return Mockito.mock(ProcessHolder.class); + } + + @Override + int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile) + { + return 1; + } + }; + + ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> forkingTaskRunner.run(task).get()); + Assert.assertTrue(e.getMessage().endsWith(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY + + " in context of task: " + task.getId() + " must be an array of strings.") + ); + } }