mirror of https://github.com/apache/druid.git
Use javaOptsArray provided in task context (#12326)
The `javaOpts` property is being read from task context but not `javaOptsArray`. Changes: - Read `javaOptsArray` from task context in `ForkingTaskRunner`. - Add test to verify that `javaOptsArray` in task context takes precedence over `javaOpts`
This commit is contained in:
parent
ee44fe45c6
commit
9c6b9abcde
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.indexing.overlord;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.CharMatcher;
|
||||
|
@ -219,6 +220,24 @@ public class ForkingTaskRunner
|
|||
);
|
||||
}
|
||||
|
||||
// Override task specific javaOptsArray
|
||||
try {
|
||||
List<String> taskJavaOptsArray = jsonMapper.convertValue(
|
||||
task.getContextValue(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY),
|
||||
new TypeReference<List<String>>() {}
|
||||
);
|
||||
if (taskJavaOptsArray != null) {
|
||||
Iterables.addAll(command, taskJavaOptsArray);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new IllegalArgumentException(
|
||||
ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY
|
||||
+ " in context of task: " + task.getId() + " must be an array of strings.",
|
||||
e
|
||||
);
|
||||
}
|
||||
|
||||
for (String propName : props.stringPropertyNames()) {
|
||||
for (String allowedPrefix : config.getAllowedPrefixes()) {
|
||||
// See https://github.com/apache/druid/issues/1841
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.indexing.overlord;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
@ -48,9 +49,13 @@ import java.io.IOException;
|
|||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class ForkingTaskRunnerTest
|
||||
{
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
// This tests the test to make sure the test fails when it should.
|
||||
@Test(expected = AssertionError.class)
|
||||
public void testPatternMatcherFailureForJavaOptions()
|
||||
|
@ -356,4 +361,135 @@ public class ForkingTaskRunnerTest
|
|||
Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
|
||||
Assert.assertEquals("task failure test", status.getErrorMsg());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJavaOptsAndJavaOptsArrayOverride() throws ExecutionException, InterruptedException,
|
||||
JsonProcessingException
|
||||
{
|
||||
ObjectMapper mapper = new DefaultObjectMapper();
|
||||
final String taskContent = "{\n"
|
||||
+ " \"type\" : \"noop\",\n"
|
||||
+ " \"id\" : \"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
|
||||
+ " \"groupId\" : \"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
|
||||
+ " \"dataSource\" : \"none\",\n"
|
||||
+ " \"runTime\" : 2500,\n"
|
||||
+ " \"isReadyTime\" : 0,\n"
|
||||
+ " \"isReadyResult\" : \"YES\",\n"
|
||||
+ " \"firehose\" : null,\n"
|
||||
+ " \"context\" : {\n"
|
||||
+ " \"druid.indexer.runner.javaOptsArray\" : [ \"-Xmx10g\", \"-Xms10g\" ],\n"
|
||||
+ " \"druid.indexer.runner.javaOpts\" : \"-Xmx1g -Xms1g\"\n"
|
||||
+ " }\n"
|
||||
+ "}";
|
||||
final Task task = OBJECT_MAPPER.readValue(taskContent, NoopTask.class);
|
||||
final AtomicInteger xmxJavaOptsIndex = new AtomicInteger(-1);
|
||||
final AtomicInteger xmxJavaOptsArrayIndex = new AtomicInteger(-1);
|
||||
ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
|
||||
new ForkingTaskRunnerConfig(),
|
||||
new TaskConfig(
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
ImmutableList.of(),
|
||||
false,
|
||||
new Period("PT0S"),
|
||||
new Period("PT10S"),
|
||||
ImmutableList.of(),
|
||||
false,
|
||||
false,
|
||||
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
|
||||
null
|
||||
),
|
||||
new WorkerConfig(),
|
||||
new Properties(),
|
||||
new NoopTaskLogs(),
|
||||
mapper,
|
||||
new DruidNode("middleManager", "host", false, 8091, null, true, false),
|
||||
new StartupLoggingConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation)
|
||||
{
|
||||
xmxJavaOptsIndex.set(command.indexOf("-Xmx1g"));
|
||||
xmxJavaOptsArrayIndex.set(command.indexOf("-Xmx10g"));
|
||||
|
||||
return Mockito.mock(ProcessHolder.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
};
|
||||
|
||||
forkingTaskRunner.run(task).get();
|
||||
Assert.assertTrue(xmxJavaOptsArrayIndex.get() > xmxJavaOptsIndex.get());
|
||||
Assert.assertTrue(xmxJavaOptsIndex.get() >= 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidTaskContextJavaOptsArray() throws JsonProcessingException
|
||||
{
|
||||
ObjectMapper mapper = new DefaultObjectMapper();
|
||||
final String taskContent = "{\n"
|
||||
+ " \"type\" : \"noop\",\n"
|
||||
+ " \"id\" : \"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
|
||||
+ " \"groupId\" : \"noop_2022-03-25T05:17:34.929Z_3a074de1-74b8-4f6e-84b5-67996144f9ac\",\n"
|
||||
+ " \"dataSource\" : \"none\",\n"
|
||||
+ " \"runTime\" : 2500,\n"
|
||||
+ " \"isReadyTime\" : 0,\n"
|
||||
+ " \"isReadyResult\" : \"YES\",\n"
|
||||
+ " \"firehose\" : null,\n"
|
||||
+ " \"context\" : {\n"
|
||||
+ " \"druid.indexer.runner.javaOptsArray\" : \"not a string array\",\n"
|
||||
+ " \"druid.indexer.runner.javaOpts\" : \"-Xmx1g -Xms1g\"\n"
|
||||
+ " }\n"
|
||||
+ "}";
|
||||
final Task task = OBJECT_MAPPER.readValue(taskContent, NoopTask.class);
|
||||
ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
|
||||
new ForkingTaskRunnerConfig(),
|
||||
new TaskConfig(
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
ImmutableList.of(),
|
||||
false,
|
||||
new Period("PT0S"),
|
||||
new Period("PT10S"),
|
||||
ImmutableList.of(),
|
||||
false,
|
||||
false,
|
||||
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
|
||||
null
|
||||
),
|
||||
new WorkerConfig(),
|
||||
new Properties(),
|
||||
new NoopTaskLogs(),
|
||||
mapper,
|
||||
new DruidNode("middleManager", "host", false, 8091, null, true, false),
|
||||
new StartupLoggingConfig()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation)
|
||||
{
|
||||
return Mockito.mock(ProcessHolder.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
};
|
||||
|
||||
ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> forkingTaskRunner.run(task).get());
|
||||
Assert.assertTrue(e.getMessage().endsWith(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY
|
||||
+ " in context of task: " + task.getId() + " must be an array of strings.")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue