diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java
index b516c2037b1..daf346b6b8f 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java
@@ -151,10 +151,8 @@ public class GranularityPathSpec implements PathSpec
       }
     }
 
-    for (String path : paths) {
-      log.info("Appending path[%s]", path);
-      StaticPathSpec.addToMultipleInputs(config, job, path, inputFormat);
-    }
+    log.info("Appending path %s", paths);
+    StaticPathSpec.addToMultipleInputs(config, job, paths, inputFormat);
 
     return job;
   }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/HadoopGlobPathSplitter.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/HadoopGlobPathSplitter.java
index 1684b5b8434..da26b856309 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/path/HadoopGlobPathSplitter.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/HadoopGlobPathSplitter.java
@@ -19,6 +19,9 @@
 
 package io.druid.indexer.path;
 
+import com.google.common.base.Functions;
+import com.google.common.collect.Iterables;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -31,9 +34,9 @@ public class HadoopGlobPathSplitter
    * e.g. splitGlob("/a,/b") -> ["/a","/b"]
    * splitGlob("/a/{c,d}") -> ["/a/c", "/a/d"]
    */
-  public static List<StringBuilder> splitGlob(String path)
+  public static Iterable<String> splitGlob(String path)
   {
-    return splitGlob(new CharStream(path));
+    return Iterables.transform(splitGlob(new CharStream(path)), Functions.toStringFunction());
   }
 
   private static List<StringBuilder> splitGlob(CharStream path)
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java
index 85210fa2dc3..fd532980ba1 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java
@@ -21,8 +21,13 @@ package io.druid.indexer.path;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import com.metamx.common.logger.Logger;
 import io.druid.indexer.HadoopDruidIndexerConfig;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -31,6 +36,7 @@
 import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
 import java.io.IOException;
+import java.util.Set;
 
 public class StaticPathSpec implements PathSpec
@@ -72,14 +78,26 @@ public class StaticPathSpec implements PathSpec
     return paths;
   }
 
-  public final static void addToMultipleInputs(
+  public static void addToMultipleInputs(
       HadoopDruidIndexerConfig config,
       Job job,
       String path,
       Class<? extends InputFormat> inputFormatClass
   )
   {
-    if (path == null) {
+    if (path != null) {
+      addToMultipleInputs(config, job, ImmutableSet.of(path), inputFormatClass);
+    }
+  }
+
+  public static void addToMultipleInputs(
+      HadoopDruidIndexerConfig config,
+      Job job,
+      Set<String> paths,
+      Class<? extends InputFormat> inputFormatClass
+  )
+  {
+    if (paths == null || paths.isEmpty()) {
       return;
     }
 
@@ -96,9 +114,33 @@
     // MultipleInputs.addInputPath(job, path, inputFormatClassToUse)
     // but have to handle hadoop glob path ourselves correctly
     // This change and HadoopGlobPathSplitter.java can be removed once the hadoop issue is fixed
-    for (StringBuilder sb : HadoopGlobPathSplitter.splitGlob(path)) {
-      MultipleInputs.addInputPath(job, new Path(sb.toString()), inputFormatClassToUse);
+    Set<String> pathStrings = Sets.newLinkedHashSet();
+    for (String path : paths) {
+      Iterables.addAll(pathStrings, HadoopGlobPathSplitter.splitGlob(path));
     }
+    if (!pathStrings.isEmpty()) {
+      addInputPath(job, pathStrings, inputFormatClassToUse);
+    }
+  }
+
+  // copied from MultipleInputs.addInputPath with slight modifications
+  private static void addInputPath(Job job, Iterable<String> pathStrings, Class<? extends InputFormat> inputFormatClass)
+  {
+    Configuration conf = job.getConfiguration();
+    StringBuilder inputFormats = new StringBuilder(Strings.nullToEmpty(conf.get(MultipleInputs.DIR_FORMATS)));
+
+    String[] paths = Iterables.toArray(pathStrings, String.class);
+    for (int i = 0; i < paths.length - 1; i++) {
+      if (inputFormats.length() > 0) {
+        inputFormats.append(',');
+      }
+      inputFormats.append(paths[i]).append(';').append(inputFormatClass.getName());
+    }
+    if (inputFormats.length() > 0) {
+      conf.set(MultipleInputs.DIR_FORMATS, inputFormats.toString());
+    }
+    // add last one separately for possible initialization in MultipleInputs
+    MultipleInputs.addInputPath(job, new Path(paths[paths.length - 1]), inputFormatClass);
   }
 
   @Override
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/HadoopGlobPathSplitterTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/HadoopGlobPathSplitterTest.java
index bbbfee01b18..38057266de4 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/path/HadoopGlobPathSplitterTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/HadoopGlobPathSplitterTest.java
@@ -20,10 +20,10 @@
 package io.druid.indexer.path;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -127,11 +127,6 @@ public class HadoopGlobPathSplitterTest
   }
 
   private static List<String> splitGlob(String path) {
-    List<StringBuilder> tmp = HadoopGlobPathSplitter.splitGlob(path);
-    List<String> result = new ArrayList<>(tmp.size());
-    for(StringBuilder sb : tmp) {
-      result.add(sb.toString());
-    }
-    return result;
+    return Lists.newArrayList(HadoopGlobPathSplitter.splitGlob(path));
   }
 }
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java
index 53846946dab..7cc9a3fdf20 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java
@@ -20,7 +20,14 @@
 package io.druid.indexer.path;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.indexer.HadoopDruidIndexerConfig;
+import io.druid.indexer.HadoopIOConfig;
+import io.druid.indexer.HadoopIngestionSpec;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.segment.indexing.DataSchema;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.junit.Assert;
 import org.junit.Test;
@@ -41,6 +48,22 @@ public class StaticPathSpecTest
     testSerde("/sample/path", null);
   }
 
+  @Test
+  public void testAddingPaths() throws Exception
+  {
+    Job job = new Job();
+    StaticPathSpec pathSpec = new StaticPathSpec("/a/c,/a/b/{c,d}", null);
+
+    DataSchema schema = new DataSchema("ds", null, new AggregatorFactory[0], null, jsonMapper);
+    HadoopIOConfig io = new HadoopIOConfig(null, null, null);
+    pathSpec.addInputPaths(new HadoopDruidIndexerConfig(new HadoopIngestionSpec(schema, io, null)), job);
+
+    String paths = job.getConfiguration().get(MultipleInputs.DIR_FORMATS);
+    String formatter = TextInputFormat.class.getName();
+    String[] expected = {"/a/c;" + formatter, "/a/b/c;" + formatter, "/a/b/d;" + formatter};
+    Assert.assertArrayEquals(expected, paths.split(","));
+  }
+
   private void testSerde(String path, Class inputFormat) throws Exception
   {
     StringBuilder sb = new StringBuilder();
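
For illustration, a minimal sketch of how the pieces above fit together (not part of the patch; the class name DirFormatsDemo is hypothetical). HadoopGlobPathSplitter.splitGlob() expands the comma/brace glob into concrete paths, and the new addInputPath() helper records each path in MultipleInputs.DIR_FORMATS as comma-separated "path;inputFormatClassName" pairs, which is what testAddingPaths() asserts:

// Hypothetical demo class; placed in io.druid.indexer.path so it can see
// HadoopGlobPathSplitter. Mirrors the DIR_FORMATS encoding built by the
// addInputPath() helper in StaticPathSpec above.
package io.druid.indexer.path;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class DirFormatsDemo
{
  public static void main(String[] args)
  {
    // "/a/c,/a/b/{c,d}" expands to three concrete paths: /a/c, /a/b/c, /a/b/d
    StringBuilder dirFormats = new StringBuilder();
    for (String path : HadoopGlobPathSplitter.splitGlob("/a/c,/a/b/{c,d}")) {
      if (dirFormats.length() > 0) {
        dirFormats.append(',');
      }
      // one "path;format" pair per expanded path
      dirFormats.append(path).append(';').append(TextInputFormat.class.getName());
    }
    // prints (as one comma-separated line):
    //   /a/c;org.apache.hadoop.mapreduce.lib.input.TextInputFormat,
    //   /a/b/c;org.apache.hadoop.mapreduce.lib.input.TextInputFormat,
    //   /a/b/d;org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    System.out.println(dirFormats);
  }
}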