diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index a8c48cfd9e6..0fce6272122 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -254,7 +254,7 @@ Middle managers pass their configurations down to their child peons. The middle |`druid.indexer.runner.compressZnodes`|Indicates whether or not the middle managers should compress Znodes.|true| |`druid.indexer.runner.classpath`|Java classpath for the peon.|System.getProperty("java.class.path")| |`druid.indexer.runner.javaCommand`|Command required to execute java.|java| -|`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM.|""| +|`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM. Can be either a string or a json string list. Quotable parameters or parameters with spaces are encouraged to use json string lists|""| |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288| |`druid.indexer.runner.startPort`|The port that peons begin running on.|8100| |`druid.indexer.runner.separateIngestionEndpoint`|Use separate server and consequently separate jetty thread pool for ingesting events|false| diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 63a1180b741..d64397447e4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -21,6 +21,9 @@ package io.druid.indexing.overlord; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.CharMatcher; import com.google.common.base.Joiner; @@ -45,6 +48,7 @@ import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; import com.metamx.emitter.EmittingLogger; import io.druid.concurrent.Execs; import io.druid.guice.annotations.Self; @@ -75,7 +79,6 @@ import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.Callable; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** @@ -244,7 +247,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer command.add("-cp"); command.add(taskClasspath); - Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts())); + Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts(), jsonMapper)); // Override task specific javaOpts Object taskJavaOpts = task.getContextValue( @@ -253,7 +256,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer if (taskJavaOpts != null) { Iterables.addAll( command, - new QuotableWhiteSpaceSplitter((String) taskJavaOpts) + new QuotableWhiteSpaceSplitter((String) taskJavaOpts, jsonMapper) ); } @@ -673,16 +676,31 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer */ class QuotableWhiteSpaceSplitter implements Iterable { + private static final Logger LOG = new Logger(QuotableWhiteSpaceSplitter.class); private final String string; + private final ObjectMapper mapper; - public QuotableWhiteSpaceSplitter(String string) + public QuotableWhiteSpaceSplitter(String string, ObjectMapper jsonMapper) { this.string = Preconditions.checkNotNull(string); + this.mapper = jsonMapper; } @Override public Iterator iterator() { + try (JsonParser parser = mapper.getFactory().createParser(string)) { + final JsonToken token = parser.nextToken(); + if (JsonToken.START_ARRAY.equals(token)) { + return mapper.>readValue(string, new TypeReference>() + { + }).iterator(); + } + } + catch (IOException e) { + LOG.debug(e, "Could not parse %s", string); + } + LOG.debug("Not json, hoping it is a good string : %s", string); return Splitter.on( new CharMatcher() { diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/ForkingTaskRunnerTest.java index 6a1d3c86502..a000f81df0b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/ForkingTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/ForkingTaskRunnerTest.java @@ -19,14 +19,22 @@ package io.druid.indexing.overlord; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; +import io.druid.jackson.DefaultObjectMapper; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; +import java.util.List; + public class ForkingTaskRunnerTest { + private static final ObjectMapper mapper = new DefaultObjectMapper(); // This tests the test to make sure the test fails when it should. @Test(expected = AssertionError.class) public void testPatternMatcherFailureForJavaOptions() @@ -43,7 +51,7 @@ public class ForkingTaskRunnerTest @Test public void testPatternMatcherLeavesUnbalancedQuoteJavaOptions() { - Assert.assertEquals("\"", Iterators.get(new QuotableWhiteSpaceSplitter("\"").iterator(), 0)); + Assert.assertEquals("\"", Iterators.get(new QuotableWhiteSpaceSplitter("\"", mapper).iterator(), 0)); } @Test @@ -88,7 +96,7 @@ public class ForkingTaskRunnerTest @Test public void testEmpty() { - Assert.assertTrue(ImmutableList.copyOf(new QuotableWhiteSpaceSplitter("")).isEmpty()); + Assert.assertTrue(ImmutableList.copyOf(new QuotableWhiteSpaceSplitter("", mapper)).isEmpty()); } @Test @@ -97,7 +105,8 @@ public class ForkingTaskRunnerTest Assert.assertEquals( ImmutableList.of("start", "stop"), ImmutableList.copyOf( new QuotableWhiteSpaceSplitter( - "start\t\t\t\t \n\f\r\n \f\f \n\r\f\n\r\t stop" + "start\t\t\t\t \n\f\r\n \f\f \n\r\f\n\r\t stop", + mapper ) ) ); @@ -108,16 +117,26 @@ public class ForkingTaskRunnerTest { Assert.assertTrue( ImmutableList.copyOf( - new QuotableWhiteSpaceSplitter(" \t \t\t\t\t \n\n \f\f \n\f\r\t") + new QuotableWhiteSpaceSplitter(" \t \t\t\t\t \n\n \f\f \n\f\r\t", mapper) ).isEmpty() ); } private static void checkValues(String[] strings) { + + try { + Assert.assertEquals( + ImmutableList.copyOf(strings), + ImmutableList.copyOf(new QuotableWhiteSpaceSplitter(mapper.writeValueAsString(Arrays.asList(strings)), mapper)) + ); + } + catch (JsonProcessingException e) { + throw Throwables.propagate(e); + } Assert.assertEquals( ImmutableList.copyOf(strings), - ImmutableList.copyOf(new QuotableWhiteSpaceSplitter(Joiner.on(" ").join(strings))) + ImmutableList.copyOf(new QuotableWhiteSpaceSplitter(Joiner.on(" ").join(strings), mapper)) ); } }