diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index e217b82ee3e..74e9b6295be 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -88,7 +88,6 @@ public class DetermineHashedPartitionsJob implements Jobby ); JobHelper.injectSystemProperties(groupByJob); - JobHelper.setInputFormat(groupByJob, config); groupByJob.setMapperClass(DetermineCardinalityMapper.class); groupByJob.setMapOutputKeyClass(LongWritable.class); groupByJob.setMapOutputValueClass(BytesWritable.class); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java index a21e5f7437b..0060303888f 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java @@ -126,7 +126,6 @@ public class DeterminePartitionsJob implements Jobby ); JobHelper.injectSystemProperties(groupByJob); - JobHelper.setInputFormat(groupByJob, config); groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class); groupByJob.setMapOutputKeyClass(BytesWritable.class); groupByJob.setMapOutputValueClass(NullWritable.class); @@ -173,7 +172,6 @@ public class DeterminePartitionsJob implements Jobby } else { // Directly read the source data, since we assume it's already grouped. dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionAssumeGroupedMapper.class); - JobHelper.setInputFormat(dimSelectionJob, config); config.addInputPaths(dimSelectionJob); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 59f187fb8a2..19d3a7a606d 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -55,7 +55,6 @@ import io.druid.timeline.partition.ShardSpecLookup; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -354,11 +353,6 @@ public class HadoopDruidIndexerConfig return pathSpec.addInputPaths(this, job); } - public Class getInputFormatClass() - { - return pathSpec.getInputFormat(); - } - /******************************************** Granularity/Bucket Helper Methods ********************************************/ diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 08d852046ff..f8a3b7be9b4 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -140,8 +140,6 @@ public class IndexGeneratorJob implements Jobby JobHelper.injectSystemProperties(job); - JobHelper.setInputFormat(job, config); - job.setMapperClass(IndexGeneratorMapper.class); job.setMapOutputValueClass(BytesWritable.class); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 541f561fc48..a9c282ddfad 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -44,8 +44,6 @@ import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.util.Progressable; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -207,17 +205,6 @@ public class JobHelper return true; } - public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfig) - { - if (indexerConfig.getInputFormatClass() != null) { - job.setInputFormatClass(indexerConfig.getInputFormatClass()); - } else if (indexerConfig.isCombineText()) { - job.setInputFormatClass(CombineTextInputFormat.class); - } else { - job.setInputFormatClass(TextInputFormat.class); - } - } - public static DataSegment serializeOutIndex( final DataSegment segmentTemplate, final Configuration configuration, diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java index 6d912eeb65c..536a2fc822c 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java @@ -35,7 +35,6 @@ import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.timeline.DataSegment; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; @@ -146,15 +145,11 @@ public class DatasourcePathSpec implements PathSpec job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(ingestionSpec)); job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments)); job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize)); + MultipleInputs.addInputPath(job, new Path("/dummy/tobe/ignored"), DatasourceInputFormat.class); return job; } - public Class getInputFormat() - { - return DatasourceInputFormat.class; - } - @Override public boolean equals(Object o) { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java index 14f8db615f9..e793827b65b 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java @@ -30,7 +30,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.format.DateTimeFormat; @@ -152,7 +155,7 @@ public class GranularityPathSpec implements PathSpec for (String path : paths) { log.info("Appending path[%s]", path); - FileInputFormat.addInputPath(job, new Path(path)); + StaticPathSpec.addToMultipleInputs(config, job, path, inputFormat); } return job; diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/MultiplePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/MultiplePathSpec.java new file mode 100644 index 00000000000..7c8808f00e7 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/MultiplePathSpec.java @@ -0,0 +1,80 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import io.druid.indexer.HadoopDruidIndexerConfig; +import org.apache.hadoop.mapreduce.Job; + +import java.io.IOException; +import java.util.List; + +public class MultiplePathSpec implements PathSpec +{ + private List children; + + public MultiplePathSpec( + @JsonProperty("children") List children + ) + { + Preconditions.checkArgument(children != null && children.size() > 0, "Null/Empty list of child PathSpecs"); + this.children = children; + } + + @JsonProperty + public List getChildren() + { + return children; + } + + @Override + public Job addInputPaths( + HadoopDruidIndexerConfig config, Job job + ) throws IOException + { + for(PathSpec spec : children) { + spec.addInputPaths(config, job); + } + return job; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MultiplePathSpec that = (MultiplePathSpec) o; + + return children.equals(that.children); + + } + + @Override + public int hashCode() + { + return children.hashCode(); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java index f1de5131689..f7433fc0cc7 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java @@ -20,8 +20,6 @@ package io.druid.indexer.path; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.indexer.HadoopDruidIndexerConfig; - -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import java.io.IOException; @@ -33,10 +31,10 @@ import java.io.IOException; @JsonSubTypes.Type(name="granular_unprocessed", value=GranularUnprocessedPathSpec.class), @JsonSubTypes.Type(name="granularity", value=GranularityPathSpec.class), @JsonSubTypes.Type(name="static", value=StaticPathSpec.class), - @JsonSubTypes.Type(name="dataSource", value=DatasourcePathSpec.class) + @JsonSubTypes.Type(name="dataSource", value=DatasourcePathSpec.class), + @JsonSubTypes.Type(name="multi", value=MultiplePathSpec.class) }) public interface PathSpec { public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException; - public Class getInputFormat(); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java index c6cc63d794a..b0f5e19e188 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java @@ -20,10 +20,12 @@ package io.druid.indexer.path; import com.fasterxml.jackson.annotation.JsonProperty; import com.metamx.common.logger.Logger; import io.druid.indexer.HadoopDruidIndexerConfig; - +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import java.io.IOException; @@ -60,7 +62,9 @@ public class StaticPathSpec implements PathSpec public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException { log.info("Adding paths[%s]", paths); - FileInputFormat.addInputPaths(job, paths); + + addToMultipleInputs(config, job, paths, inputFormat); + return job; } @@ -68,4 +72,54 @@ public class StaticPathSpec implements PathSpec { return inputFormat; } + + public String getPaths() + { + return paths; + } + + public final static void addToMultipleInputs( + HadoopDruidIndexerConfig config, + Job job, + String path, + Class inputFormatClass + ) + { + if (inputFormatClass == null) { + if (config.isCombineText()) { + MultipleInputs.addInputPath(job, new Path(path), CombineTextInputFormat.class); + } else { + MultipleInputs.addInputPath(job, new Path(path), TextInputFormat.class); + } + } else { + MultipleInputs.addInputPath(job, new Path(path), inputFormatClass); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + StaticPathSpec that = (StaticPathSpec) o; + + if (paths != null ? !paths.equals(that.paths) : that.paths != null) { + return false; + } + return !(inputFormat != null ? !inputFormat.equals(that.inputFormat) : that.inputFormat != null); + + } + + @Override + public int hashCode() + { + int result = paths != null ? paths.hashCode() : 0; + result = 31 * result + (inputFormat != null ? inputFormat.hashCode() : 0); + return result; + } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index a7c5fc2033a..fcad18866ff 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -76,18 +76,18 @@ public class GranularityPathSpecTest } @Test - public void testDeserialization() throws Exception + public void testSerdeCustomInputFormat() throws Exception { - testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, TextInputFormat.class); + testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, TextInputFormat.class); } @Test - public void testDeserializationNoInputFormat() throws Exception + public void testSerdeNoInputFormat() throws Exception { - testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, null); + testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, null); } - private void testDeserialization( + private void testSerde( String inputPath, String filePattern, String pathFormat, @@ -114,7 +114,7 @@ public class GranularityPathSpecTest } sb.append("\"type\" : \"granularity\"}"); - GranularityPathSpec pathSpec = (GranularityPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class); + GranularityPathSpec pathSpec = (GranularityPathSpec) StaticPathSpecTest.readWriteRead(sb.toString(), jsonMapper); Assert.assertEquals(inputFormat, pathSpec.getInputFormat()); Assert.assertEquals(inputPath, pathSpec.getInputPath()); Assert.assertEquals(filePattern, pathSpec.getFilePattern()); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/MultiplePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/MultiplePathSpecTest.java new file mode 100644 index 00000000000..168d41e1953 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/MultiplePathSpecTest.java @@ -0,0 +1,65 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import io.druid.jackson.DefaultObjectMapper; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class MultiplePathSpecTest +{ + @Test + public void testSerde() throws Exception + { + PathSpec expected = new MultiplePathSpec( + Lists.newArrayList( + new StaticPathSpec("/tmp/path1", null), + new StaticPathSpec("/tmp/path2", TextInputFormat.class) + ) + ); + + ObjectMapper jsonMapper = new DefaultObjectMapper(); + + PathSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class); + Assert.assertEquals(expected, actual); + } + + @Test + public void testAddInputPaths() throws Exception + { + PathSpec ps1 = EasyMock.createMock(PathSpec.class); + EasyMock.expect(ps1.addInputPaths(null, null)).andReturn(null); + + PathSpec ps2 = EasyMock.createMock(PathSpec.class); + EasyMock.expect(ps2.addInputPaths(null, null)).andReturn(null); + + EasyMock.replay(ps1, ps2); + + new MultiplePathSpec(Lists.newArrayList(ps1, ps2)).addInputPaths(null, null); + + EasyMock.verify(ps1, ps2); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java index c8bf9a8575a..b654d7849f0 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java @@ -21,8 +21,6 @@ package io.druid.indexer.path; import io.druid.jackson.DefaultObjectMapper; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.junit.Assert; import org.junit.Test; @@ -34,18 +32,18 @@ public class StaticPathSpecTest private final ObjectMapper jsonMapper = new DefaultObjectMapper(); @Test - public void testDeserialization() throws Exception + public void testSerdeCustomInputFormat() throws Exception { - testDeserialization("/sample/path", TextInputFormat.class); + testSerde("/sample/path", TextInputFormat.class); } @Test public void testDeserializationNoInputFormat() throws Exception { - testDeserialization("/sample/path", null); + testSerde("/sample/path", null); } - private void testDeserialization(String path, Class inputFormat) throws Exception + private void testSerde(String path, Class inputFormat) throws Exception { StringBuilder sb = new StringBuilder(); sb.append("{\"paths\" : \""); @@ -57,13 +55,22 @@ public class StaticPathSpecTest sb.append("\","); } sb.append("\"type\" : \"static\"}"); - StaticPathSpec pathSpec = (StaticPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class); + + StaticPathSpec pathSpec = (StaticPathSpec) readWriteRead(sb.toString(), jsonMapper); Assert.assertEquals(inputFormat, pathSpec.getInputFormat()); - - Job job = Job.getInstance(); - pathSpec.addInputPaths(null, job); - Assert.assertEquals( - "file:" + path, - job.getConfiguration().get(FileInputFormat.INPUT_DIR)); + Assert.assertEquals(path, pathSpec.getPaths()); + } + + public static final PathSpec readWriteRead(String jsonStr, ObjectMapper jsonMapper) throws Exception + { + return jsonMapper.readValue( + jsonMapper.writeValueAsString( + jsonMapper.readValue( + jsonStr, + PathSpec.class + ) + ), + PathSpec.class + ); } }