mirror of https://github.com/apache/druid.git
Merge pull request #2439 from metamx/fix2435
Make QuotableWhiteSpaceSplitter able to take JSON
This commit is contained in:
commit
5779b32742
|
@ -254,7 +254,7 @@ Middle managers pass their configurations down to their child peons. The middle
|
|||
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the middle managers should compress Znodes.|true|
|
||||
|`druid.indexer.runner.classpath`|Java classpath for the peon.|System.getProperty("java.class.path")|
|
||||
|`druid.indexer.runner.javaCommand`|Command required to execute java.|java|
|
||||
|`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM.|""|
|
||||
|`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM. Can be either a string or a json string list. Quotable parameters or parameters with spaces are encouraged to use json string lists|""|
|
||||
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|
||||
|`druid.indexer.runner.startPort`|The port that peons begin running on.|8100|
|
||||
|`druid.indexer.runner.separateIngestionEndpoint`|Use separate server and consequently separate jetty thread pool for ingesting events|false|
|
||||
|
|
|
@ -21,6 +21,9 @@ package io.druid.indexing.overlord;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.CharMatcher;
|
||||
import com.google.common.base.Joiner;
|
||||
|
@ -45,6 +48,7 @@ import com.google.inject.Inject;
|
|||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.concurrent.Execs;
|
||||
import io.druid.guice.annotations.Self;
|
||||
|
@ -75,7 +79,6 @@ import java.util.Map;
|
|||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
|
@ -244,7 +247,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
command.add("-cp");
|
||||
command.add(taskClasspath);
|
||||
|
||||
Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
|
||||
Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts(), jsonMapper));
|
||||
|
||||
// Override task specific javaOpts
|
||||
Object taskJavaOpts = task.getContextValue(
|
||||
|
@ -253,7 +256,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
if (taskJavaOpts != null) {
|
||||
Iterables.addAll(
|
||||
command,
|
||||
new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
|
||||
new QuotableWhiteSpaceSplitter((String) taskJavaOpts, jsonMapper)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -673,16 +676,31 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
*/
|
||||
class QuotableWhiteSpaceSplitter implements Iterable<String>
|
||||
{
|
||||
private static final Logger LOG = new Logger(QuotableWhiteSpaceSplitter.class);
|
||||
private final String string;
|
||||
private final ObjectMapper mapper;
|
||||
|
||||
public QuotableWhiteSpaceSplitter(String string)
|
||||
public QuotableWhiteSpaceSplitter(String string, ObjectMapper jsonMapper)
|
||||
{
|
||||
this.string = Preconditions.checkNotNull(string);
|
||||
this.mapper = jsonMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<String> iterator()
|
||||
{
|
||||
try (JsonParser parser = mapper.getFactory().createParser(string)) {
|
||||
final JsonToken token = parser.nextToken();
|
||||
if (JsonToken.START_ARRAY.equals(token)) {
|
||||
return mapper.<List<String>>readValue(string, new TypeReference<List<String>>()
|
||||
{
|
||||
}).iterator();
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
LOG.debug(e, "Could not parse %s", string);
|
||||
}
|
||||
LOG.debug("Not json, hoping it is a good string : %s", string);
|
||||
return Splitter.on(
|
||||
new CharMatcher()
|
||||
{
|
||||
|
|
|
@ -19,14 +19,22 @@
|
|||
|
||||
package io.druid.indexing.overlord;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterators;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class ForkingTaskRunnerTest
|
||||
{
|
||||
private static final ObjectMapper mapper = new DefaultObjectMapper();
|
||||
// This tests the test to make sure the test fails when it should.
|
||||
@Test(expected = AssertionError.class)
|
||||
public void testPatternMatcherFailureForJavaOptions()
|
||||
|
@ -43,7 +51,7 @@ public class ForkingTaskRunnerTest
|
|||
@Test
|
||||
public void testPatternMatcherLeavesUnbalancedQuoteJavaOptions()
|
||||
{
|
||||
Assert.assertEquals("\"", Iterators.get(new QuotableWhiteSpaceSplitter("\"").iterator(), 0));
|
||||
Assert.assertEquals("\"", Iterators.get(new QuotableWhiteSpaceSplitter("\"", mapper).iterator(), 0));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -88,7 +96,7 @@ public class ForkingTaskRunnerTest
|
|||
@Test
|
||||
public void testEmpty()
|
||||
{
|
||||
Assert.assertTrue(ImmutableList.copyOf(new QuotableWhiteSpaceSplitter("")).isEmpty());
|
||||
Assert.assertTrue(ImmutableList.copyOf(new QuotableWhiteSpaceSplitter("", mapper)).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -97,7 +105,8 @@ public class ForkingTaskRunnerTest
|
|||
Assert.assertEquals(
|
||||
ImmutableList.of("start", "stop"), ImmutableList.copyOf(
|
||||
new QuotableWhiteSpaceSplitter(
|
||||
"start\t\t\t\t \n\f\r\n \f\f \n\r\f\n\r\t stop"
|
||||
"start\t\t\t\t \n\f\r\n \f\f \n\r\f\n\r\t stop",
|
||||
mapper
|
||||
)
|
||||
)
|
||||
);
|
||||
|
@ -108,16 +117,26 @@ public class ForkingTaskRunnerTest
|
|||
{
|
||||
Assert.assertTrue(
|
||||
ImmutableList.copyOf(
|
||||
new QuotableWhiteSpaceSplitter(" \t \t\t\t\t \n\n \f\f \n\f\r\t")
|
||||
new QuotableWhiteSpaceSplitter(" \t \t\t\t\t \n\n \f\f \n\f\r\t", mapper)
|
||||
).isEmpty()
|
||||
);
|
||||
}
|
||||
|
||||
private static void checkValues(String[] strings)
|
||||
{
|
||||
|
||||
try {
|
||||
Assert.assertEquals(
|
||||
ImmutableList.copyOf(strings),
|
||||
ImmutableList.copyOf(new QuotableWhiteSpaceSplitter(mapper.writeValueAsString(Arrays.asList(strings)), mapper))
|
||||
);
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
Assert.assertEquals(
|
||||
ImmutableList.copyOf(strings),
|
||||
ImmutableList.copyOf(new QuotableWhiteSpaceSplitter(Joiner.on(" ").join(strings)))
|
||||
ImmutableList.copyOf(new QuotableWhiteSpaceSplitter(Joiner.on(" ").join(strings), mapper))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue