diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index 5d8640c3f7a..39683bbac5a 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -42,7 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; @@ -90,11 +90,7 @@ public class DetermineHashedPartitionsJob implements Jobby ); JobHelper.injectSystemProperties(groupByJob); - if (config.isCombineText()) { - groupByJob.setInputFormatClass(CombineTextInputFormat.class); - } else { - groupByJob.setInputFormatClass(TextInputFormat.class); - } + JobHelper.setInputFormat(groupByJob, config); groupByJob.setMapperClass(DetermineCardinalityMapper.class); groupByJob.setMapOutputKeyClass(LongWritable.class); groupByJob.setMapOutputValueClass(BytesWritable.class); @@ -241,7 +237,7 @@ public class DetermineHashedPartitionsJob implements Jobby @Override protected void innerMap( InputRow inputRow, - Text text, + Writable value, Context context ) throws IOException, InterruptedException { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java index e0014ca2dc3..ffbc5a446a8 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java @@ -127,7 +127,7 @@ public class DeterminePartitionsJob implements Jobby ); JobHelper.injectSystemProperties(groupByJob); - groupByJob.setInputFormatClass(TextInputFormat.class); + JobHelper.setInputFormat(groupByJob, config); groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class); groupByJob.setMapOutputKeyClass(BytesWritable.class); groupByJob.setMapOutputValueClass(NullWritable.class); @@ -174,7 +174,7 @@ public class DeterminePartitionsJob implements Jobby } else { // Directly read the source data, since we assume it's already grouped. 
dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionAssumeGroupedMapper.class); - dimSelectionJob.setInputFormatClass(TextInputFormat.class); + JobHelper.setInputFormat(dimSelectionJob, config); config.addInputPaths(dimSelectionJob); } @@ -260,7 +260,7 @@ public class DeterminePartitionsJob implements Jobby @Override protected void innerMap( InputRow inputRow, - Text text, + Writable value, Context context ) throws IOException, InterruptedException { @@ -341,7 +341,7 @@ public class DeterminePartitionsJob implements Jobby @Override protected void innerMap( InputRow inputRow, - Text text, + Writable value, Context context ) throws IOException, InterruptedException { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 305b0edd771..c0a692bb6cf 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -17,6 +17,41 @@ package io.druid.indexer; +import io.druid.common.utils.JodaUtils; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; +import io.druid.granularity.QueryGranularity; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.annotations.Self; +import io.druid.indexer.partitions.PartitionsSpec; +import io.druid.indexer.path.PathSpec; +import io.druid.initialization.Initialization; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.server.DruidNode; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.ShardSpec; +import io.druid.timeline.partition.ShardSpecLookup; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.SortedSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.format.ISODateTimeFormat; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; @@ -36,38 +71,6 @@ import com.google.inject.Key; import com.google.inject.Module; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; -import io.druid.common.utils.JodaUtils; -import io.druid.data.input.InputRow; -import io.druid.data.input.impl.StringInputRowParser; -import io.druid.granularity.QueryGranularity; -import io.druid.guice.GuiceInjectors; -import io.druid.guice.JsonConfigProvider; -import io.druid.guice.annotations.Self; -import io.druid.indexer.partitions.PartitionsSpec; -import io.druid.indexer.path.PathSpec; -import io.druid.initialization.Initialization; -import io.druid.segment.indexing.granularity.GranularitySpec; -import io.druid.server.DruidNode; -import io.druid.timeline.DataSegment; -import io.druid.timeline.partition.ShardSpec; -import io.druid.timeline.partition.ShardSpecLookup; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import 
org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.mapreduce.Job; -import org.joda.time.DateTime; -import org.joda.time.Interval; -import org.joda.time.format.ISODateTimeFormat; - -import java.io.File; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.SortedSet; /** */ @@ -301,9 +304,9 @@ public class HadoopDruidIndexerConfig return schema.getTuningConfig().isCombineText(); } - public StringInputRowParser getParser() + public InputRowParser getParser() { - return (StringInputRowParser) schema.getDataSchema().getParser(); + return schema.getDataSchema().getParser(); } public HadoopyShardSpec getShardSpec(Bucket bucket) @@ -316,6 +319,11 @@ public class HadoopDruidIndexerConfig return pathSpec.addInputPaths(this, job); } + public Class getInputFormatClass() + { + return pathSpec.getInputFormat(); + } + /******************************************** Granularity/Bucket Helper Methods ********************************************/ diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java index 556432a64ee..4fd3cc93081 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java @@ -17,21 +17,33 @@ package io.druid.indexer; -import com.metamx.common.RE; import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.MapInputRowParser; import io.druid.data.input.impl.StringInputRowParser; import io.druid.segment.indexing.granularity.GranularitySpec; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Mapper; import org.joda.time.DateTime; -import java.io.IOException; +import com.metamx.common.RE; -public abstract class HadoopDruidIndexerMapper extends Mapper +public abstract class HadoopDruidIndexerMapper extends Mapper { private HadoopDruidIndexerConfig config; - private StringInputRowParser parser; + private InputRowParser parser; protected GranularitySpec granularitySpec; @Override @@ -48,20 +60,20 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< return config; } - public StringInputRowParser getParser() + public InputRowParser getParser() { return parser; } @Override protected void map( - LongWritable key, Text value, Context context + Writable key, Writable value, Context context ) throws IOException, InterruptedException { try { final InputRow inputRow; try { - inputRow = parser.parse(value.toString()); + inputRow = parseInputRow(value, parser); } catch (Exception e) { if (config.isIgnoreInvalidRows()) { @@ -83,6 +95,17 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< } } - abstract protected void innerMap(InputRow inputRow, Text text, Context context) + public final static InputRow parseInputRow(Writable value, InputRowParser parser) + { + if(parser instanceof StringInputRowParser && value instanceof Text) { + //Note: This is to ensure backward 
compatibility with 0.7.0 and before + return ((StringInputRowParser)parser).parse(value.toString()); + } else { + return parser.parse(value); + } + } + + abstract protected void innerMap(InputRow inputRow, Writable value, Context context) throws IOException, InterruptedException; + } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 73634382059..5f97f431482 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -17,6 +17,60 @@ package io.druid.indexer; +import io.druid.collections.StupidPool; +import io.druid.data.input.InputRow; +import io.druid.data.input.Rows; +import io.druid.data.input.impl.InputRowParser; +import io.druid.offheap.OffheapBufferPool; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.IndexIO; +import io.druid.segment.IndexMaker; +import io.druid.segment.LoggingProgressIndicator; +import io.druid.segment.ProgressIndicator; +import io.druid.segment.QueryableIndex; +import io.druid.segment.SegmentUtils; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OffheapIncrementalIndex; +import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.timeline.DataSegment; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.joda.time.DateTime; +import org.joda.time.Interval; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Strings; @@ -34,58 +88,6 @@ import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.logger.Logger; -import io.druid.collections.StupidPool; -import io.druid.data.input.InputRow; -import io.druid.data.input.Rows; -import io.druid.data.input.impl.StringInputRowParser; -import io.druid.granularity.QueryGranularity; -import io.druid.offheap.OffheapBufferPool; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; -import 
io.druid.segment.LoggingProgressIndicator; -import io.druid.segment.ProgressIndicator; -import io.druid.segment.QueryableIndex; -import io.druid.segment.SegmentUtils; -import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OffheapIncrementalIndex; -import io.druid.segment.incremental.OnheapIncrementalIndex; -import io.druid.timeline.DataSegment; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.InvalidJobConfException; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.joda.time.DateTime; -import org.joda.time.Interval; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.zip.ZipEntry; -import java.util.zip.ZipOutputStream; /** */ @@ -158,11 +160,7 @@ public class IndexGeneratorJob implements Jobby JobHelper.injectSystemProperties(job); - if (config.isCombineText()) { - job.setInputFormatClass(CombineTextInputFormat.class); - } else { - job.setInputFormatClass(TextInputFormat.class); - } + JobHelper.setInputFormat(job, config); job.setMapperClass(IndexGeneratorMapper.class); job.setMapOutputValueClass(Text.class); @@ -217,14 +215,14 @@ public class IndexGeneratorJob implements Jobby } } - public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper + public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper { private static final HashFunction hashFunction = Hashing.murmur3_128(); @Override protected void innerMap( InputRow inputRow, - Text text, + Writable value, Context context ) throws IOException, InterruptedException { @@ -254,17 +252,17 @@ public class IndexGeneratorJob implements Jobby .put(hashedDimensions) .array() ).toBytesWritable(), - text + value ); } } - public static class IndexGeneratorPartitioner extends Partitioner implements Configurable + public static class IndexGeneratorPartitioner extends Partitioner implements Configurable { private Configuration config; @Override - public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions) + public int getPartition(BytesWritable bytesWritable, Writable value, int numPartitions) { final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes()); bytes.position(4); // Skip length added by SortableBytes @@ -293,11 +291,11 @@ public class IndexGeneratorJob implements Jobby } } - public static class IndexGeneratorReducer extends Reducer + public static class IndexGeneratorReducer extends Reducer { private HadoopDruidIndexerConfig config; private List 
metricNames = Lists.newArrayList(); - private StringInputRowParser parser; + private InputRowParser parser; protected ProgressIndicator makeProgressIndicator(final Context context) { @@ -350,7 +348,7 @@ public class IndexGeneratorJob implements Jobby @Override protected void reduce( - BytesWritable key, Iterable values, final Context context + BytesWritable key, Iterable values, final Context context ) throws IOException, InterruptedException { SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); @@ -378,9 +376,9 @@ public class IndexGeneratorJob implements Jobby Set allDimensionNames = Sets.newHashSet(); final ProgressIndicator progressIndicator = makeProgressIndicator(context); - for (final Text value : values) { + for (final Writable value : values) { context.progress(); - final InputRow inputRow = index.formatRow(parser.parse(value.toString())); + final InputRow inputRow = index.formatRow(HadoopDruidIndexerMapper.parseInputRow(value, parser)); allDimensionNames.addAll(inputRow.getDimensions()); int numRows = index.add(inputRow); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index e22631051de..7b4ed9fd1df 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -30,6 +30,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import java.io.File; import java.io.IOException; @@ -155,4 +157,15 @@ public class JobHelper return true; } + + public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfig) + { + if(indexerConfig.getInputFormatClass() != null) { + job.setInputFormatClass(indexerConfig.getInputFormatClass()); + } else if (indexerConfig.isCombineText()) { + job.setInputFormatClass(CombineTextInputFormat.class); + } else { + job.setInputFormatClass(TextInputFormat.class); + } + } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/initialization/IndexingHadoopDruidModule.java b/indexing-hadoop/src/main/java/io/druid/indexer/initialization/IndexingHadoopDruidModule.java new file mode 100644 index 00000000000..6e5b74d4045 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/initialization/IndexingHadoopDruidModule.java @@ -0,0 +1,51 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexer.initialization; + +import java.util.List; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; + +import io.druid.indexer.parser.MapWritableInputRowParser; +import io.druid.initialization.DruidModule; + +public class IndexingHadoopDruidModule implements DruidModule +{ + + @Override + public void configure(Binder binder) + { + } + + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("IndexingHadoopDruidModule") + .registerSubtypes( + new NamedType(MapWritableInputRowParser.class, "mapWritableParser") + ) + ); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/parser/MapWritableInputRowParser.java b/indexing-hadoop/src/main/java/io/druid/indexer/parser/MapWritableInputRowParser.java new file mode 100644 index 00000000000..37592bef810 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/parser/MapWritableInputRowParser.java @@ -0,0 +1,116 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexer.parser; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Function; +import com.google.common.collect.Lists; + +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.MapInputRowParser; +import io.druid.data.input.impl.ParseSpec; + +public class MapWritableInputRowParser implements InputRowParser +{ + private final ParseSpec parseSpec; + private final MapInputRowParser mapParser; + + @JsonCreator + public MapWritableInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec + ) + { + this.parseSpec = parseSpec; + this.mapParser = new MapInputRowParser(parseSpec); + } + + @Override + public InputRow parse(MapWritable map) + { + return mapParser.parse(convertMapStringAndObject(map)); + } + + @Override + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @Override + public InputRowParser withParseSpec(ParseSpec parseSpec) + { + return new MapWritableInputRowParser(parseSpec); + } + + private Map convertMapStringAndObject(MapWritable value) + { + Map map = new HashMap(value.size()); + for(Map.Entry e : value.entrySet()) { + if(! (e.getKey() instanceof Text)) { + throw new RuntimeException("Found non-Text key in input record. type: " + e.getKey().getClass().getName()); + } + + if(e.getValue() instanceof IntWritable) { + map.put(e.getKey().toString(), ((IntWritable)e.getValue()).get()); + } else if(e.getValue() instanceof LongWritable) { + map.put(e.getKey().toString(), ((LongWritable)e.getValue()).get()); + } else if(e.getValue() instanceof FloatWritable) { + map.put(e.getKey().toString(), ((FloatWritable)e.getValue()).get()); + } else if(e.getValue() instanceof DoubleWritable) { + map.put(e.getKey().toString(), ((DoubleWritable)e.getValue()).get()); + } else if(e.getValue() instanceof Text) { + map.put(e.getKey().toString(), e.getValue().toString()); + } else if(e.getValue() instanceof BytesWritable) { + map.put(e.getKey().toString(), ((BytesWritable)e.getValue()).getBytes()); + } else if(e.getValue() instanceof ArrayWritable) { + //this is for multivalued dimensions + map.put( + e.getKey().toString(), + Lists.transform(Arrays.asList(((ArrayWritable) e.getValue()).get()), new Function() + { + @Override + public String apply(Writable input) + { + return ((Text) input).toString(); + } + })); + } else { + throw new RuntimeException("Unrecognized value type in input record. 
type: " + e.getValue().getClass().getName()); + } + } + return map; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java index e2ff80aaeb2..14f8db615f9 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java @@ -28,6 +28,7 @@ import io.druid.indexer.hadoop.FSSpideringIterator; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.joda.time.DateTime; @@ -49,6 +50,7 @@ public class GranularityPathSpec implements PathSpec private String filePattern; private Granularity dataGranularity; private String pathFormat; + private Class inputFormat; @JsonProperty public String getInputPath() @@ -61,6 +63,17 @@ public class GranularityPathSpec implements PathSpec this.inputPath = inputPath; } + @JsonProperty + public Class getInputFormat() + { + return inputFormat; + } + + public void setInputFormat(Class inputFormat) + { + this.inputFormat = inputFormat; + } + @JsonProperty public String getFilePattern() { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java index 693f7cf4033..90feb83bc95 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java @@ -20,6 +20,8 @@ package io.druid.indexer.path; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.indexer.HadoopDruidIndexerConfig; + +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import java.io.IOException; @@ -35,4 +37,5 @@ import java.io.IOException; public interface PathSpec { public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException; + public Class getInputFormat(); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java index f11e9d8f0c3..c6cc63d794a 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java @@ -20,6 +20,8 @@ package io.druid.indexer.path; import com.fasterxml.jackson.annotation.JsonProperty; import com.metamx.common.logger.Logger; import io.druid.indexer.HadoopDruidIndexerConfig; + +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -37,16 +39,21 @@ public class StaticPathSpec implements PathSpec @JsonProperty("paths") public String paths; + @JsonProperty("inputFormat") + private final Class inputFormat; + public StaticPathSpec() { - this(null); + this(null, null); } public StaticPathSpec( - String paths + String paths, + Class inputFormat ) { this.paths = paths; + this.inputFormat = inputFormat; } @Override @@ -56,4 +63,9 @@ public class StaticPathSpec implements PathSpec FileInputFormat.addInputPaths(job, paths); return job; } + + public Class getInputFormat() + { + return inputFormat; + } } diff --git 
a/indexing-hadoop/src/main/resources/META-INF/io.druid.initialization.DruidModule b/indexing-hadoop/src/main/resources/META-INF/io.druid.initialization.DruidModule new file mode 100644 index 00000000000..c64e1060d81 --- /dev/null +++ b/indexing-hadoop/src/main/resources/META-INF/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.indexer.initialization.IndexingHadoopDruidModule \ No newline at end of file diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/parser/MapWritableInputRowParserTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/parser/MapWritableInputRowParserTest.java new file mode 100644 index 00000000000..f39116f3007 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/parser/MapWritableInputRowParserTest.java @@ -0,0 +1,106 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.parser; + +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.JSONParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.indexer.initialization.IndexingHadoopDruidModule; +import io.druid.jackson.DefaultObjectMapper; + +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; + +public class MapWritableInputRowParserTest +{ + private final ObjectMapper jsonMapper; + + public MapWritableInputRowParserTest() + { + jsonMapper = new DefaultObjectMapper(); + for (Module jacksonModule : new IndexingHadoopDruidModule().getJacksonModules()) { + jsonMapper.registerModule(jacksonModule); + } + } + + @Test + public void testDeserialization() throws Exception + { + String str = "{" + + "\"type\": \"mapWritableParser\",\n" + + "\"parseSpec\": {\n" + + " \"format\": \"json\",\n" + //NOTE: may be druid-api should allow another name for json parseSpec + " \"timestampSpec\": { \"column\": \"time\", \"format\": \"YYYY\" },\n" + + " \"dimensionsSpec\": {}\n" + + " }\n" + + "}"; + + MapWritableInputRowParser parser = (MapWritableInputRowParser)jsonMapper.readValue(str, InputRowParser.class); + } + + @Test + public void testParse() + { + MapWritableInputRowParser parser = new MapWritableInputRowParser( + new JSONParseSpec( + new TimestampSpec("time", "YYYY"), + new 
DimensionsSpec(null, null, null))); + + MapWritable mapWritable = new MapWritable(); + mapWritable.put(new Text("time"), new Text("2015")); + mapWritable.put(new Text("int"), new IntWritable(1)); + mapWritable.put(new Text("long"), new LongWritable(1)); + mapWritable.put(new Text("float"), new FloatWritable(1.0f)); + mapWritable.put(new Text("double"), new DoubleWritable(1.0)); + mapWritable.put(new Text("text"), new Text("a")); + mapWritable.put( + new Text("list"), + new ArrayWritable(Text.class, new Text[]{ new Text("v1"), new Text("v2") })); + + byte[] bytes = "a".getBytes(); + mapWritable.put(new Text("bytes"), new BytesWritable(bytes)); + + InputRow inputRow = parser.parse(mapWritable); + + Assert.assertEquals(DateTime.parse("2015"), inputRow.getTimestamp()); + Assert.assertEquals(1, inputRow.getLongMetric("int")); + Assert.assertEquals(1, inputRow.getLongMetric("long")); + Assert.assertEquals(1.0, inputRow.getFloatMetric("float"), 0.0001); + Assert.assertEquals(1.0, inputRow.getFloatMetric("double"), 0.0001); + Assert.assertEquals(ImmutableList.of("a"), inputRow.getDimension("text")); + Assert.assertEquals(ImmutableList.of("v1", "v2"), inputRow.getDimension("list")); + Assert.assertEquals(bytes, inputRow.getRaw("bytes")); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index c88c6778c55..a7c5fc2033a 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -19,7 +19,12 @@ package io.druid.indexer.path; +import io.druid.jackson.DefaultObjectMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; import com.metamx.common.Granularity; + +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -33,6 +38,8 @@ public class GranularityPathSpecTest private final String TEST_STRING_PATTERN = "*.TEST"; private final String TEST_STRING_FORMAT = "F_TEST"; + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + @Before public void setUp() { granularityPathSpec = new GranularityPathSpec(); @@ -67,4 +74,51 @@ public class GranularityPathSpecTest granularityPathSpec.setDataGranularity(granularity); Assert.assertEquals(granularity,granularityPathSpec.getDataGranularity()); } + + @Test + public void testDeserialization() throws Exception + { + testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, TextInputFormat.class); + } + + @Test + public void testDeserializationNoInputFormat() throws Exception + { + testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, null); + } + + private void testDeserialization( + String inputPath, + String filePattern, + String pathFormat, + Granularity granularity, + Class inputFormat) throws Exception + { + StringBuilder sb = new StringBuilder(); + sb.append("{\"inputPath\" : \""); + sb.append(inputPath); + sb.append("\","); + sb.append("\"filePattern\" : \""); + sb.append(filePattern); + sb.append("\","); + sb.append("\"pathFormat\" : \""); + sb.append(pathFormat); + sb.append("\","); + sb.append("\"dataGranularity\" : \""); + sb.append(granularity.toString()); + sb.append("\","); + if(inputFormat != null) { + sb.append("\"inputFormat\" : \""); + sb.append(inputFormat.getName()); + sb.append("\","); + } + sb.append("\"type\" : \"granularity\"}"); + + 
GranularityPathSpec pathSpec = (GranularityPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class); + Assert.assertEquals(inputFormat, pathSpec.getInputFormat()); + Assert.assertEquals(inputPath, pathSpec.getInputPath()); + Assert.assertEquals(filePattern, pathSpec.getFilePattern()); + Assert.assertEquals(pathFormat, pathSpec.getPathFormat()); + Assert.assertEquals(granularity, pathSpec.getDataGranularity()); + } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java new file mode 100644 index 00000000000..96cb98b6c5a --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java @@ -0,0 +1,69 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import io.druid.jackson.DefaultObjectMapper; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.junit.Assert; +import org.junit.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class StaticPathSpecTest +{ + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testDeserialization() throws Exception + { + testDeserialization("/sample/path", TextInputFormat.class); + } + + @Test + public void testDeserializationNoInputFormat() throws Exception + { + testDeserialization("/sample/path", null); + } + + private void testDeserialization(String path, Class inputFormat) throws Exception + { + StringBuilder sb = new StringBuilder(); + sb.append("{\"paths\" : \""); + sb.append(path); + sb.append("\","); + if(inputFormat != null) { + sb.append("\"inputFormat\" : \""); + sb.append(inputFormat.getName()); + sb.append("\","); + } + sb.append("\"type\" : \"static\"}"); + StaticPathSpec pathSpec = (StaticPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class); + Assert.assertEquals(inputFormat, pathSpec.getInputFormat()); + + Job job = new Job(); + pathSpec.addInputPaths(null, job); + Assert.assertEquals( + "file:" + path, + job.getConfiguration().get(FileInputFormat.INPUT_DIR)); + } +}
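
---

Usage sketch (not part of the patch): the new `inputFormat` property on a pathSpec, exercised in `StaticPathSpecTest` and `GranularityPathSpecTest` above, takes precedence in `JobHelper.setInputFormat`; when it is absent the job falls back to `CombineTextInputFormat`/`TextInputFormat` exactly as before. The fragment below pairs a static pathSpec with the new `mapWritableParser`. The choice of `SequenceFileInputFormat` and the surrounding ingestion-spec nesting are illustrative assumptions and are elided here; any InputFormat whose value type the configured parser understands should work. The parseSpec values mirror `MapWritableInputRowParserTest`.

```json
{
  "pathSpec": {
    "type": "static",
    "paths": "/data/events/2015",
    "inputFormat": "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
  },
  "parser": {
    "type": "mapWritableParser",
    "parseSpec": {
      "format": "json",
      "timestampSpec": { "column": "time", "format": "YYYY" },
      "dimensionsSpec": {}
    }
  }
}
```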
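For completeness, one way to produce input that exercises the new code path end to end is a SequenceFile of `MapWritable` rows: `HadoopDruidIndexerMapper.map` now ignores the key and hands the raw value `Writable` to the configured parser. The writer below is a hypothetical sketch (class name, path, and field values are made up); it assumes only the standard Hadoop `SequenceFile` writer API, not anything added by this patch.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class MapWritableSequenceFileWriter
{
  public static void main(String[] args) throws Exception
  {
    Configuration conf = new Configuration();
    // Write NullWritable -> MapWritable pairs; the indexer mapper only looks at the value.
    try (SequenceFile.Writer writer = SequenceFile.createWriter(
        conf,
        SequenceFile.Writer.file(new Path(args[0])),
        SequenceFile.Writer.keyClass(NullWritable.class),
        SequenceFile.Writer.valueClass(MapWritable.class)
    )) {
      MapWritable row = new MapWritable();
      row.put(new Text("time"), new Text("2015"));                 // matches timestampSpec format "YYYY"
      row.put(new Text("text"), new Text("a"));                    // Text -> String dimension
      row.put(new Text("int"), new IntWritable(1));                // IntWritable -> int
      row.put(
          new Text("list"),                                        // ArrayWritable -> multi-valued dimension
          new ArrayWritable(Text.class, new Text[]{new Text("v1"), new Text("v2")})
      );
      writer.append(NullWritable.get(), row);
    }
  }
}
```

Reading such a file back with the spec fragment above, `MapWritableInputRowParser.convertMapStringAndObject` maps the `Text`, `IntWritable`, and `ArrayWritable` values to a `String`, an `int`, and a multi-valued dimension respectively, as asserted in `MapWritableInputRowParserTest.testParse`.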