Merge pull request #2802 from navis/optimize_multiplepath_concat

Optimize adding lots of paths to pathspec
This commit is contained in:
Navis Ryu 2016-04-12 13:35:28 +09:00 committed by Himanshu
parent 21e406613c
commit 49ef4d96cb
5 changed files with 78 additions and 17 deletions

View File

@ -151,10 +151,8 @@ public class GranularityPathSpec implements PathSpec
}
}
for (String path : paths) {
log.info("Appending path[%s]", path);
StaticPathSpec.addToMultipleInputs(config, job, path, inputFormat);
}
log.info("Appending path %s", paths);
StaticPathSpec.addToMultipleInputs(config, job, paths, inputFormat);
return job;
}

View File

@ -19,6 +19,9 @@
package io.druid.indexer.path;
import com.google.common.base.Functions;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.List;
@ -31,9 +34,9 @@ public class HadoopGlobPathSplitter
* e.g. splitGlob("/a,/b") -> ["/a","/b"]
* splitGlob("/a/{c,d}") -> ["/a/c", "/a/d"]
*/
public static List<StringBuilder> splitGlob(String path)
public static Iterable<String> splitGlob(String path)
{
return splitGlob(new CharStream(path));
return Iterables.transform(splitGlob(new CharStream(path)), Functions.toStringFunction());
}
private static List<StringBuilder> splitGlob(CharStream path)

View File

@ -21,8 +21,13 @@ package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
@ -31,6 +36,7 @@ import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.IOException;
import java.util.Set;
public class StaticPathSpec implements PathSpec
@ -72,14 +78,26 @@ public class StaticPathSpec implements PathSpec
return paths;
}
public final static void addToMultipleInputs(
public static void addToMultipleInputs(
HadoopDruidIndexerConfig config,
Job job,
String path,
Class<? extends InputFormat> inputFormatClass
)
{
if (path == null) {
if (path != null) {
addToMultipleInputs(config, job, ImmutableSet.of(path), inputFormatClass);
}
}
public static void addToMultipleInputs(
HadoopDruidIndexerConfig config,
Job job,
Set<String> paths,
Class<? extends InputFormat> inputFormatClass
)
{
if (paths == null || paths.isEmpty()) {
return;
}
@ -96,9 +114,33 @@ public class StaticPathSpec implements PathSpec
// MultipleInputs.addInputPath(job, path, inputFormatClassToUse)
// but have to handle hadoop glob path ourselves correctly
// This change and HadoopGlobPathSplitter.java can be removed once the hadoop issue is fixed
for (StringBuilder sb : HadoopGlobPathSplitter.splitGlob(path)) {
MultipleInputs.addInputPath(job, new Path(sb.toString()), inputFormatClassToUse);
Set<String> pathStrings = Sets.newLinkedHashSet();
for (String path : paths) {
Iterables.addAll(pathStrings, HadoopGlobPathSplitter.splitGlob(path));
}
if (!pathStrings.isEmpty()) {
addInputPath(job, pathStrings, inputFormatClassToUse);
}
}
// copied from MultipleInputs.addInputPath with slight modifications
private static void addInputPath(Job job, Iterable<String> pathStrings, Class<? extends InputFormat> inputFormatClass)
{
Configuration conf = job.getConfiguration();
StringBuilder inputFormats = new StringBuilder(Strings.nullToEmpty(conf.get(MultipleInputs.DIR_FORMATS)));
String[] paths = Iterables.toArray(pathStrings, String.class);
for (int i = 0; i < paths.length - 1; i++) {
if (inputFormats.length() > 0) {
inputFormats.append(',');
}
inputFormats.append(paths[i]).append(';').append(inputFormatClass.getName());
}
if (inputFormats.length() > 0) {
conf.set(MultipleInputs.DIR_FORMATS, inputFormats.toString());
}
// add last one separately for possible initialization in MultipleInputs
MultipleInputs.addInputPath(job, new Path(paths[paths.length - 1]), inputFormatClass);
}
@Override

View File

@ -20,10 +20,10 @@
package io.druid.indexer.path;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
@ -127,11 +127,6 @@ public class HadoopGlobPathSplitterTest
}
private static List<String> splitGlob(String path) {
List<StringBuilder> tmp = HadoopGlobPathSplitter.splitGlob(path);
List<String> result = new ArrayList<>(tmp.size());
for(StringBuilder sb : tmp) {
result.add(sb.toString());
}
return result;
return Lists.newArrayList(HadoopGlobPathSplitter.splitGlob(path));
}
}

View File

@ -20,7 +20,14 @@
package io.druid.indexer.path;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopIOConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.Assert;
import org.junit.Test;
@ -41,6 +48,22 @@ public class StaticPathSpecTest
testSerde("/sample/path", null);
}
@Test
public void testAddingPaths() throws Exception
{
Job job = new Job();
StaticPathSpec pathSpec = new StaticPathSpec("/a/c,/a/b/{c,d}", null);
DataSchema schema = new DataSchema("ds", null, new AggregatorFactory[0], null, jsonMapper);
HadoopIOConfig io = new HadoopIOConfig(null, null, null);
pathSpec.addInputPaths(new HadoopDruidIndexerConfig(new HadoopIngestionSpec(schema, io, null)), job);
String paths = job.getConfiguration().get(MultipleInputs.DIR_FORMATS);
String formatter = TextInputFormat.class.getName();
String[] expected = {"/a/c;" + formatter, "/a/b/c;" + formatter, "/a/b/d;" + formatter};
Assert.assertArrayEquals(expected, paths.split(","));
}
private void testSerde(String path, Class inputFormat) throws Exception
{
StringBuilder sb = new StringBuilder();