Merge pull request #1177 from himanshug/custom_input_format1

Feature: Make hadoop input format configurable for batch ingestion
Xavier Léauté 2015-03-19 15:49:36 -07:00
commit a98187f798
15 changed files with 585 additions and 122 deletions
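In practice, the pathSpec portion of the batch ingestion spec can now name any Hadoop InputFormat class instead of always falling back to TextInputFormat or CombineTextInputFormat. A minimal sketch of such a pathSpec, assuming the input is a SequenceFile whose values are MapWritable (the "static" type and the "inputFormat" property mirror StaticPathSpecTest in this diff; org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat is only an illustrative choice, and where the pathSpec nests inside the full ingestion spec is not shown here):

    {
      "type" : "static",
      "paths" : "/sample/path",
      "inputFormat" : "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
    }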

View File

@@ -42,7 +42,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
@@ -90,11 +90,7 @@ public class DetermineHashedPartitionsJob implements Jobby
);
JobHelper.injectSystemProperties(groupByJob);
if (config.isCombineText()) {
groupByJob.setInputFormatClass(CombineTextInputFormat.class);
} else {
groupByJob.setInputFormatClass(TextInputFormat.class);
}
JobHelper.setInputFormat(groupByJob, config);
groupByJob.setMapperClass(DetermineCardinalityMapper.class);
groupByJob.setMapOutputKeyClass(LongWritable.class);
groupByJob.setMapOutputValueClass(BytesWritable.class);
@@ -241,7 +237,7 @@ public class DetermineHashedPartitionsJob implements Jobby
@Override
protected void innerMap(
InputRow inputRow,
Text text,
Writable value,
Context context
) throws IOException, InterruptedException
{

View File

@@ -127,7 +127,7 @@ public class DeterminePartitionsJob implements Jobby
);
JobHelper.injectSystemProperties(groupByJob);
groupByJob.setInputFormatClass(TextInputFormat.class);
JobHelper.setInputFormat(groupByJob, config);
groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
groupByJob.setMapOutputKeyClass(BytesWritable.class);
groupByJob.setMapOutputValueClass(NullWritable.class);
@@ -174,7 +174,7 @@ public class DeterminePartitionsJob implements Jobby
} else {
// Directly read the source data, since we assume it's already grouped.
dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionAssumeGroupedMapper.class);
dimSelectionJob.setInputFormatClass(TextInputFormat.class);
JobHelper.setInputFormat(dimSelectionJob, config);
config.addInputPaths(dimSelectionJob);
}
@@ -260,7 +260,7 @@ public class DeterminePartitionsJob implements Jobby
@Override
protected void innerMap(
InputRow inputRow,
Text text,
Writable value,
Context context
) throws IOException, InterruptedException
{
@@ -341,7 +341,7 @@ public class DeterminePartitionsJob implements Jobby
@Override
protected void innerMap(
InputRow inputRow,
Text text,
Writable value,
Context context
) throws IOException, InterruptedException
{

View File

@@ -17,6 +17,41 @@
package io.druid.indexer;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.initialization.Initialization;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -36,38 +71,6 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.initialization.Initialization;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
/**
*/
@@ -301,9 +304,9 @@ public class HadoopDruidIndexerConfig
return schema.getTuningConfig().isCombineText();
}
public StringInputRowParser getParser()
public InputRowParser getParser()
{
return (StringInputRowParser) schema.getDataSchema().getParser();
return schema.getDataSchema().getParser();
}
public HadoopyShardSpec getShardSpec(Bucket bucket)
@@ -316,6 +319,11 @@ public class HadoopDruidIndexerConfig
return pathSpec.addInputPaths(this, job);
}
public Class<? extends InputFormat> getInputFormatClass()
{
return pathSpec.getInputFormat();
}
/********************************************
Granularity/Bucket Helper Methods
********************************************/

View File

@@ -17,21 +17,33 @@
package io.druid.indexer;
import com.metamx.common.RE;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.segment.indexing.granularity.GranularitySpec;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.joda.time.DateTime;
import java.io.IOException;
import com.metamx.common.RE;
public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<LongWritable, Text, KEYOUT, VALUEOUT>
public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Writable, Writable, KEYOUT, VALUEOUT>
{
private HadoopDruidIndexerConfig config;
private StringInputRowParser parser;
private InputRowParser parser;
protected GranularitySpec granularitySpec;
@Override
@@ -48,20 +60,20 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
return config;
}
public StringInputRowParser getParser()
public InputRowParser getParser()
{
return parser;
}
@Override
protected void map(
LongWritable key, Text value, Context context
Writable key, Writable value, Context context
) throws IOException, InterruptedException
{
try {
final InputRow inputRow;
try {
inputRow = parser.parse(value.toString());
inputRow = parseInputRow(value, parser);
}
catch (Exception e) {
if (config.isIgnoreInvalidRows()) {
@@ -83,6 +95,17 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
}
}
abstract protected void innerMap(InputRow inputRow, Text text, Context context)
throws IOException, InterruptedException;
public final static InputRow parseInputRow(Writable value, InputRowParser parser)
{
if(parser instanceof StringInputRowParser && value instanceof Text) {
//Note: This is to ensure backward compatibility with 0.7.0 and before
return ((StringInputRowParser)parser).parse(value.toString());
} else {
return parser.parse(value);
}
}
abstract protected void innerMap(InputRow inputRow, Writable value, Context context)
throws IOException, InterruptedException;
}

View File

@@ -17,6 +17,60 @@
package io.druid.indexer;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.data.input.impl.InputRowParser;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.LoggingProgressIndicator;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
@@ -34,58 +88,6 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.LoggingProgressIndicator;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
/**
*/
@@ -158,11 +160,7 @@ public class IndexGeneratorJob implements Jobby
JobHelper.injectSystemProperties(job);
if (config.isCombineText()) {
job.setInputFormatClass(CombineTextInputFormat.class);
} else {
job.setInputFormatClass(TextInputFormat.class);
}
JobHelper.setInputFormat(job, config);
job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(Text.class);
@@ -217,14 +215,14 @@ public class IndexGeneratorJob implements Jobby
}
}
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Writable>
{
private static final HashFunction hashFunction = Hashing.murmur3_128();
@Override
protected void innerMap(
InputRow inputRow,
Text text,
Writable value,
Context context
) throws IOException, InterruptedException
{
@@ -254,17 +252,17 @@ public class IndexGeneratorJob implements Jobby
.put(hashedDimensions)
.array()
).toBytesWritable(),
text
value
);
}
}
public static class IndexGeneratorPartitioner extends Partitioner<BytesWritable, Text> implements Configurable
public static class IndexGeneratorPartitioner extends Partitioner<BytesWritable, Writable> implements Configurable
{
private Configuration config;
@Override
public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
public int getPartition(BytesWritable bytesWritable, Writable value, int numPartitions)
{
final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
bytes.position(4); // Skip length added by SortableBytes
@@ -293,11 +291,11 @@ public class IndexGeneratorJob implements Jobby
}
}
public static class IndexGeneratorReducer extends Reducer<BytesWritable, Text, BytesWritable, Text>
public static class IndexGeneratorReducer extends Reducer<BytesWritable, Writable, BytesWritable, Text>
{
private HadoopDruidIndexerConfig config;
private List<String> metricNames = Lists.newArrayList();
private StringInputRowParser parser;
private InputRowParser parser;
protected ProgressIndicator makeProgressIndicator(final Context context)
{
@@ -350,7 +348,7 @@ public class IndexGeneratorJob implements Jobby
@Override
protected void reduce(
BytesWritable key, Iterable<Text> values, final Context context
BytesWritable key, Iterable<Writable> values, final Context context
) throws IOException, InterruptedException
{
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
@@ -378,9 +376,9 @@ public class IndexGeneratorJob implements Jobby
Set<String> allDimensionNames = Sets.newHashSet();
final ProgressIndicator progressIndicator = makeProgressIndicator(context);
for (final Text value : values) {
for (final Writable value : values) {
context.progress();
final InputRow inputRow = index.formatRow(parser.parse(value.toString()));
final InputRow inputRow = index.formatRow(HadoopDruidIndexerMapper.parseInputRow(value, parser));
allDimensionNames.addAll(inputRow.getDimensions());
int numRows = index.add(inputRow);

View File

@@ -30,6 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.File;
import java.io.IOException;
@@ -155,4 +157,15 @@ public class JobHelper
return true;
}
public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfig)
{
if(indexerConfig.getInputFormatClass() != null) {
job.setInputFormatClass(indexerConfig.getInputFormatClass());
} else if (indexerConfig.isCombineText()) {
job.setInputFormatClass(CombineTextInputFormat.class);
} else {
job.setInputFormatClass(TextInputFormat.class);
}
}
}

View File

@@ -0,0 +1,51 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.initialization;
import java.util.List;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.indexer.parser.MapWritableInputRowParser;
import io.druid.initialization.DruidModule;
public class IndexingHadoopDruidModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
}
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.<Module>of(
new SimpleModule("IndexingHadoopDruidModule")
.registerSubtypes(
new NamedType(MapWritableInputRowParser.class, "mapWritableParser")
)
);
}
}

View File

@@ -0,0 +1,116 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.parser;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.ParseSpec;
public class MapWritableInputRowParser implements InputRowParser<MapWritable>
{
private final ParseSpec parseSpec;
private final MapInputRowParser mapParser;
@JsonCreator
public MapWritableInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec
)
{
this.parseSpec = parseSpec;
this.mapParser = new MapInputRowParser(parseSpec);
}
@Override
public InputRow parse(MapWritable map)
{
return mapParser.parse(convertMapStringAndObject(map));
}
@Override
public ParseSpec getParseSpec()
{
return parseSpec;
}
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new MapWritableInputRowParser(parseSpec);
}
private Map<String, Object> convertMapStringAndObject(MapWritable value)
{
Map<String, Object> map = new HashMap<String, Object>(value.size());
for(Map.Entry<Writable, Writable> e : value.entrySet()) {
if(! (e.getKey() instanceof Text)) {
throw new RuntimeException("Found non-Text key in input record. type: " + e.getKey().getClass().getName());
}
if(e.getValue() instanceof IntWritable) {
map.put(e.getKey().toString(), ((IntWritable)e.getValue()).get());
} else if(e.getValue() instanceof LongWritable) {
map.put(e.getKey().toString(), ((LongWritable)e.getValue()).get());
} else if(e.getValue() instanceof FloatWritable) {
map.put(e.getKey().toString(), ((FloatWritable)e.getValue()).get());
} else if(e.getValue() instanceof DoubleWritable) {
map.put(e.getKey().toString(), ((DoubleWritable)e.getValue()).get());
} else if(e.getValue() instanceof Text) {
map.put(e.getKey().toString(), e.getValue().toString());
} else if(e.getValue() instanceof BytesWritable) {
map.put(e.getKey().toString(), ((BytesWritable)e.getValue()).getBytes());
} else if(e.getValue() instanceof ArrayWritable) {
//this is for multivalued dimensions
map.put(
e.getKey().toString(),
Lists.transform(Arrays.asList(((ArrayWritable) e.getValue()).get()), new Function<Writable, String>()
{
@Override
public String apply(Writable input)
{
return ((Text) input).toString();
}
}));
} else {
throw new RuntimeException("Unrecognized value type in input record. type: " + e.getValue().getClass().getName());
}
}
return map;
}
}
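For completeness, the parser registered above under the name "mapWritableParser" (see IndexingHadoopDruidModule) would be selected from JSON roughly as in the sketch below. The parseSpec mirrors the one used in MapWritableInputRowParserTest further down; how this parser block nests inside a full ingestion spec is assumed rather than shown in this diff:

    {
      "type" : "mapWritableParser",
      "parseSpec" : {
        "format" : "json",
        "timestampSpec" : { "column" : "time", "format" : "YYYY" },
        "dimensionsSpec" : {}
      }
    }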

View File

@@ -28,6 +28,7 @@ import io.druid.indexer.hadoop.FSSpideringIterator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.joda.time.DateTime;
@@ -49,6 +50,7 @@ public class GranularityPathSpec implements PathSpec
private String filePattern;
private Granularity dataGranularity;
private String pathFormat;
private Class<? extends InputFormat> inputFormat;
@JsonProperty
public String getInputPath()
@@ -61,6 +63,17 @@ public class GranularityPathSpec implements PathSpec
this.inputPath = inputPath;
}
@JsonProperty
public Class<? extends InputFormat> getInputFormat()
{
return inputFormat;
}
public void setInputFormat(Class<? extends InputFormat> inputFormat)
{
this.inputFormat = inputFormat;
}
@JsonProperty
public String getFilePattern()
{

View File

@@ -20,6 +20,8 @@ package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
@@ -35,4 +37,5 @@ import java.io.IOException;
public interface PathSpec
{
public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException;
public Class<? extends InputFormat> getInputFormat();
}

View File

@@ -20,6 +20,8 @@ package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.logger.Logger;
import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -37,16 +39,21 @@ public class StaticPathSpec implements PathSpec
@JsonProperty("paths")
public String paths;
@JsonProperty("inputFormat")
private final Class<? extends InputFormat> inputFormat;
public StaticPathSpec()
{
this(null);
this(null, null);
}
public StaticPathSpec(
String paths
String paths,
Class<? extends InputFormat> inputFormat
)
{
this.paths = paths;
this.inputFormat = inputFormat;
}
@Override
@@ -56,4 +63,9 @@ public class StaticPathSpec implements PathSpec
FileInputFormat.addInputPaths(job, paths);
return job;
}
public Class<? extends InputFormat> getInputFormat()
{
return inputFormat;
}
}

View File

@@ -0,0 +1 @@
io.druid.indexer.initialization.IndexingHadoopDruidModule

View File

@@ -0,0 +1,106 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.parser;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.initialization.IndexingHadoopDruidModule;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
public class MapWritableInputRowParserTest
{
private final ObjectMapper jsonMapper;
public MapWritableInputRowParserTest()
{
jsonMapper = new DefaultObjectMapper();
for (Module jacksonModule : new IndexingHadoopDruidModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
}
@Test
public void testDeserialization() throws Exception
{
String str = "{" +
"\"type\": \"mapWritableParser\",\n" +
"\"parseSpec\": {\n" +
" \"format\": \"json\",\n" + //NOTE: may be druid-api should allow another name for json parseSpec
" \"timestampSpec\": { \"column\": \"time\", \"format\": \"YYYY\" },\n" +
" \"dimensionsSpec\": {}\n" +
" }\n" +
"}";
MapWritableInputRowParser parser = (MapWritableInputRowParser)jsonMapper.readValue(str, InputRowParser.class);
}
@Test
public void testParse()
{
MapWritableInputRowParser parser = new MapWritableInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "YYYY"),
new DimensionsSpec(null, null, null)));
MapWritable mapWritable = new MapWritable();
mapWritable.put(new Text("time"), new Text("2015"));
mapWritable.put(new Text("int"), new IntWritable(1));
mapWritable.put(new Text("long"), new LongWritable(1));
mapWritable.put(new Text("float"), new FloatWritable(1.0f));
mapWritable.put(new Text("double"), new DoubleWritable(1.0));
mapWritable.put(new Text("text"), new Text("a"));
mapWritable.put(
new Text("list"),
new ArrayWritable(Text.class, new Text[]{ new Text("v1"), new Text("v2") }));
byte[] bytes = "a".getBytes();
mapWritable.put(new Text("bytes"), new BytesWritable(bytes));
InputRow inputRow = parser.parse(mapWritable);
Assert.assertEquals(DateTime.parse("2015"), inputRow.getTimestamp());
Assert.assertEquals(1, inputRow.getLongMetric("int"));
Assert.assertEquals(1, inputRow.getLongMetric("long"));
Assert.assertEquals(1.0, inputRow.getFloatMetric("float"), 0.0001);
Assert.assertEquals(1.0, inputRow.getFloatMetric("double"), 0.0001);
Assert.assertEquals(ImmutableList.of("a"), inputRow.getDimension("text"));
Assert.assertEquals(ImmutableList.of("v1", "v2"), inputRow.getDimension("list"));
Assert.assertEquals(bytes, inputRow.getRaw("bytes"));
}
}

View File

@@ -19,7 +19,12 @@
package io.druid.indexer.path;
import io.druid.jackson.DefaultObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.Granularity;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -33,6 +38,8 @@ public class GranularityPathSpecTest
private final String TEST_STRING_PATTERN = "*.TEST";
private final String TEST_STRING_FORMAT = "F_TEST";
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Before public void setUp()
{
granularityPathSpec = new GranularityPathSpec();
@@ -67,4 +74,51 @@ public class GranularityPathSpecTest
granularityPathSpec.setDataGranularity(granularity);
Assert.assertEquals(granularity,granularityPathSpec.getDataGranularity());
}
@Test
public void testDeserialization() throws Exception
{
testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, TextInputFormat.class);
}
@Test
public void testDeserializationNoInputFormat() throws Exception
{
testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, null);
}
private void testDeserialization(
String inputPath,
String filePattern,
String pathFormat,
Granularity granularity,
Class inputFormat) throws Exception
{
StringBuilder sb = new StringBuilder();
sb.append("{\"inputPath\" : \"");
sb.append(inputPath);
sb.append("\",");
sb.append("\"filePattern\" : \"");
sb.append(filePattern);
sb.append("\",");
sb.append("\"pathFormat\" : \"");
sb.append(pathFormat);
sb.append("\",");
sb.append("\"dataGranularity\" : \"");
sb.append(granularity.toString());
sb.append("\",");
if(inputFormat != null) {
sb.append("\"inputFormat\" : \"");
sb.append(inputFormat.getName());
sb.append("\",");
}
sb.append("\"type\" : \"granularity\"}");
GranularityPathSpec pathSpec = (GranularityPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class);
Assert.assertEquals(inputFormat, pathSpec.getInputFormat());
Assert.assertEquals(inputPath, pathSpec.getInputPath());
Assert.assertEquals(filePattern, pathSpec.getFilePattern());
Assert.assertEquals(pathFormat, pathSpec.getPathFormat());
Assert.assertEquals(granularity, pathSpec.getDataGranularity());
}
}

View File

@@ -0,0 +1,69 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.Assert;
import org.junit.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
public class StaticPathSpecTest
{
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testDeserialization() throws Exception
{
testDeserialization("/sample/path", TextInputFormat.class);
}
@Test
public void testDeserializationNoInputFormat() throws Exception
{
testDeserialization("/sample/path", null);
}
private void testDeserialization(String path, Class inputFormat) throws Exception
{
StringBuilder sb = new StringBuilder();
sb.append("{\"paths\" : \"");
sb.append(path);
sb.append("\",");
if(inputFormat != null) {
sb.append("\"inputFormat\" : \"");
sb.append(inputFormat.getName());
sb.append("\",");
}
sb.append("\"type\" : \"static\"}");
StaticPathSpec pathSpec = (StaticPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class);
Assert.assertEquals(inputFormat, pathSpec.getInputFormat());
Job job = new Job();
pathSpec.addInputPaths(null, job);
Assert.assertEquals(
"file:" + path,
job.getConfiguration().get(FileInputFormat.INPUT_DIR));
}
}