add ability to specify Multiple PathSpecs in batch ingestion, so that we can grab data from multiple places in same ingestion

Conflicts:
	indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
	indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java

Conflicts:
	indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java
This commit is contained in:
Himanshu Gupta 2015-05-18 13:41:06 -05:00
parent 1ae56f139b
commit 45947a1021
13 changed files with 235 additions and 57 deletions

View File

@ -88,7 +88,6 @@ public class DetermineHashedPartitionsJob implements Jobby
);
JobHelper.injectSystemProperties(groupByJob);
JobHelper.setInputFormat(groupByJob, config);
groupByJob.setMapperClass(DetermineCardinalityMapper.class);
groupByJob.setMapOutputKeyClass(LongWritable.class);
groupByJob.setMapOutputValueClass(BytesWritable.class);

View File

@ -126,7 +126,6 @@ public class DeterminePartitionsJob implements Jobby
);
JobHelper.injectSystemProperties(groupByJob);
JobHelper.setInputFormat(groupByJob, config);
groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
groupByJob.setMapOutputKeyClass(BytesWritable.class);
groupByJob.setMapOutputValueClass(NullWritable.class);
@ -173,7 +172,6 @@ public class DeterminePartitionsJob implements Jobby
} else {
// Directly read the source data, since we assume it's already grouped.
dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionAssumeGroupedMapper.class);
JobHelper.setInputFormat(dimSelectionJob, config);
config.addInputPaths(dimSelectionJob);
}

View File

@ -55,7 +55,6 @@ import io.druid.timeline.partition.ShardSpecLookup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -354,11 +353,6 @@ public class HadoopDruidIndexerConfig
return pathSpec.addInputPaths(this, job);
}
public Class<? extends InputFormat> getInputFormatClass()
{
return pathSpec.getInputFormat();
}
/********************************************
Granularity/Bucket Helper Methods
********************************************/

View File

@ -140,8 +140,6 @@ public class IndexGeneratorJob implements Jobby
JobHelper.injectSystemProperties(job);
JobHelper.setInputFormat(job, config);
job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(BytesWritable.class);

View File

@ -44,8 +44,6 @@ import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Progressable;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -207,17 +205,6 @@ public class JobHelper
return true;
}
public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfig)
{
if (indexerConfig.getInputFormatClass() != null) {
job.setInputFormatClass(indexerConfig.getInputFormatClass());
} else if (indexerConfig.isCombineText()) {
job.setInputFormatClass(CombineTextInputFormat.class);
} else {
job.setInputFormatClass(TextInputFormat.class);
}
}
public static DataSegment serializeOutIndex(
final DataSegment segmentTemplate,
final Configuration configuration,

View File

@ -35,7 +35,6 @@ import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
@ -146,15 +145,11 @@ public class DatasourcePathSpec implements PathSpec
job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(ingestionSpec));
job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments));
job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize));
MultipleInputs.addInputPath(job, new Path("/dummy/tobe/ignored"), DatasourceInputFormat.class);
return job;
}
public Class<? extends InputFormat> getInputFormat()
{
return DatasourceInputFormat.class;
}
@Override
public boolean equals(Object o)
{

View File

@ -30,7 +30,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.DateTimeFormat;
@ -152,7 +155,7 @@ public class GranularityPathSpec implements PathSpec
for (String path : paths) {
log.info("Appending path[%s]", path);
FileInputFormat.addInputPath(job, new Path(path));
StaticPathSpec.addToMultipleInputs(config, job, path, inputFormat);
}
return job;

View File

@ -0,0 +1,80 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
import java.util.List;
public class MultiplePathSpec implements PathSpec
{
private List<PathSpec> children;
public MultiplePathSpec(
@JsonProperty("children") List<PathSpec> children
)
{
Preconditions.checkArgument(children != null && children.size() > 0, "Null/Empty list of child PathSpecs");
this.children = children;
}
@JsonProperty
public List<PathSpec> getChildren()
{
return children;
}
@Override
public Job addInputPaths(
HadoopDruidIndexerConfig config, Job job
) throws IOException
{
for(PathSpec spec : children) {
spec.addInputPaths(config, job);
}
return job;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MultiplePathSpec that = (MultiplePathSpec) o;
return children.equals(that.children);
}
@Override
public int hashCode()
{
return children.hashCode();
}
}

View File

@ -20,8 +20,6 @@ package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
@ -33,10 +31,10 @@ import java.io.IOException;
@JsonSubTypes.Type(name="granular_unprocessed", value=GranularUnprocessedPathSpec.class),
@JsonSubTypes.Type(name="granularity", value=GranularityPathSpec.class),
@JsonSubTypes.Type(name="static", value=StaticPathSpec.class),
@JsonSubTypes.Type(name="dataSource", value=DatasourcePathSpec.class)
@JsonSubTypes.Type(name="dataSource", value=DatasourcePathSpec.class),
@JsonSubTypes.Type(name="multi", value=MultiplePathSpec.class)
})
public interface PathSpec
{
public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException;
public Class<? extends InputFormat> getInputFormat();
}

View File

@ -20,10 +20,12 @@ package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.logger.Logger;
import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.IOException;
@ -60,7 +62,9 @@ public class StaticPathSpec implements PathSpec
public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException
{
log.info("Adding paths[%s]", paths);
FileInputFormat.addInputPaths(job, paths);
addToMultipleInputs(config, job, paths, inputFormat);
return job;
}
@ -68,4 +72,54 @@ public class StaticPathSpec implements PathSpec
{
return inputFormat;
}
public String getPaths()
{
return paths;
}
public final static void addToMultipleInputs(
HadoopDruidIndexerConfig config,
Job job,
String path,
Class<? extends InputFormat> inputFormatClass
)
{
if (inputFormatClass == null) {
if (config.isCombineText()) {
MultipleInputs.addInputPath(job, new Path(path), CombineTextInputFormat.class);
} else {
MultipleInputs.addInputPath(job, new Path(path), TextInputFormat.class);
}
} else {
MultipleInputs.addInputPath(job, new Path(path), inputFormatClass);
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StaticPathSpec that = (StaticPathSpec) o;
if (paths != null ? !paths.equals(that.paths) : that.paths != null) {
return false;
}
return !(inputFormat != null ? !inputFormat.equals(that.inputFormat) : that.inputFormat != null);
}
@Override
public int hashCode()
{
int result = paths != null ? paths.hashCode() : 0;
result = 31 * result + (inputFormat != null ? inputFormat.hashCode() : 0);
return result;
}
}

View File

@ -76,18 +76,18 @@ public class GranularityPathSpecTest
}
@Test
public void testDeserialization() throws Exception
public void testSerdeCustomInputFormat() throws Exception
{
testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, TextInputFormat.class);
testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, TextInputFormat.class);
}
@Test
public void testDeserializationNoInputFormat() throws Exception
public void testSerdeNoInputFormat() throws Exception
{
testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, null);
testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, null);
}
private void testDeserialization(
private void testSerde(
String inputPath,
String filePattern,
String pathFormat,
@ -114,7 +114,7 @@ public class GranularityPathSpecTest
}
sb.append("\"type\" : \"granularity\"}");
GranularityPathSpec pathSpec = (GranularityPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class);
GranularityPathSpec pathSpec = (GranularityPathSpec) StaticPathSpecTest.readWriteRead(sb.toString(), jsonMapper);
Assert.assertEquals(inputFormat, pathSpec.getInputFormat());
Assert.assertEquals(inputPath, pathSpec.getInputPath());
Assert.assertEquals(filePattern, pathSpec.getFilePattern());

View File

@ -0,0 +1,65 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class MultiplePathSpecTest
{
@Test
public void testSerde() throws Exception
{
PathSpec expected = new MultiplePathSpec(
Lists.<PathSpec>newArrayList(
new StaticPathSpec("/tmp/path1", null),
new StaticPathSpec("/tmp/path2", TextInputFormat.class)
)
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
PathSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class);
Assert.assertEquals(expected, actual);
}
@Test
public void testAddInputPaths() throws Exception
{
PathSpec ps1 = EasyMock.createMock(PathSpec.class);
EasyMock.expect(ps1.addInputPaths(null, null)).andReturn(null);
PathSpec ps2 = EasyMock.createMock(PathSpec.class);
EasyMock.expect(ps2.addInputPaths(null, null)).andReturn(null);
EasyMock.replay(ps1, ps2);
new MultiplePathSpec(Lists.newArrayList(ps1, ps2)).addInputPaths(null, null);
EasyMock.verify(ps1, ps2);
}
}

View File

@ -21,8 +21,6 @@ package io.druid.indexer.path;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.Assert;
import org.junit.Test;
@ -34,18 +32,18 @@ public class StaticPathSpecTest
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testDeserialization() throws Exception
public void testSerdeCustomInputFormat() throws Exception
{
testDeserialization("/sample/path", TextInputFormat.class);
testSerde("/sample/path", TextInputFormat.class);
}
@Test
public void testDeserializationNoInputFormat() throws Exception
{
testDeserialization("/sample/path", null);
testSerde("/sample/path", null);
}
private void testDeserialization(String path, Class inputFormat) throws Exception
private void testSerde(String path, Class inputFormat) throws Exception
{
StringBuilder sb = new StringBuilder();
sb.append("{\"paths\" : \"");
@ -57,13 +55,22 @@ public class StaticPathSpecTest
sb.append("\",");
}
sb.append("\"type\" : \"static\"}");
StaticPathSpec pathSpec = (StaticPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class);
Assert.assertEquals(inputFormat, pathSpec.getInputFormat());
Job job = Job.getInstance();
pathSpec.addInputPaths(null, job);
Assert.assertEquals(
"file:" + path,
job.getConfiguration().get(FileInputFormat.INPUT_DIR));
StaticPathSpec pathSpec = (StaticPathSpec) readWriteRead(sb.toString(), jsonMapper);
Assert.assertEquals(inputFormat, pathSpec.getInputFormat());
Assert.assertEquals(path, pathSpec.getPaths());
}
public static final PathSpec readWriteRead(String jsonStr, ObjectMapper jsonMapper) throws Exception
{
return jsonMapper.readValue(
jsonMapper.writeValueAsString(
jsonMapper.readValue(
jsonStr,
PathSpec.class
)
),
PathSpec.class
);
}
}