diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java index 803e527b4dd..18d239b734e 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java @@ -25,6 +25,7 @@ import io.druid.data.input.impl.StringInputRowParser; import io.druid.segment.indexing.granularity.GranularitySpec; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -101,11 +102,15 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< public final static InputRow parseInputRow(Object value, InputRowParser parser) { - if(parser instanceof StringInputRowParser && value instanceof Text) { + if (parser instanceof StringInputRowParser && value instanceof Text) { //Note: This is to ensure backward compatibility with 0.7.0 and before return ((StringInputRowParser) parser).parse(value.toString()); - } else if(value instanceof InputRow) { - return (InputRow)value; + } else if (parser instanceof StringInputRowParser && value instanceof BytesWritable) { + BytesWritable valueBytes = (BytesWritable) value; + ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes.getBytes(), 0, valueBytes.getLength()); + return ((StringInputRowParser) parser).parse(valueBuffer); + } else if (value instanceof InputRow) { + return (InputRow) value; } else { return parser.parse(value); } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 958b64041f1..0eba50ce46c 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -22,6 +22,7 @@ package io.druid.indexer; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.api.client.util.Charsets; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -43,6 +44,15 @@ import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.SingleDimensionShardSpec; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.joda.time.DateTime; import org.joda.time.DateTimeComparator; import org.joda.time.Interval; @@ -56,8 +66,10 @@ import org.junit.runners.Parameterized; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -65,7 +77,8 @@ import java.util.Map; public class IndexGeneratorJobTest { - @Parameterized.Parameters(name = "partitionType={0}, interval={1}, shardInfoForEachSegment={2}, data={3}") + @Parameterized.Parameters(name = "partitionType={0}, interval={1}, shardInfoForEachSegment={2}, data={3}, " + + "inputFormatName={4}") public static Collection constructFeed() { return Arrays.asList( @@ -76,18 +89,18 @@ public class IndexGeneratorJobTest "2014-10-22T00:00:00Z/P2D", new String[][][]{ { - { null, "c.example.com" }, - { "c.example.com", "e.example.com" }, - { "e.example.com", "g.example.com" }, - { "g.example.com", "i.example.com" }, - { "i.example.com", null } + {null, "c.example.com"}, + {"c.example.com", "e.example.com"}, + {"e.example.com", "g.example.com"}, + {"g.example.com", "i.example.com"}, + {"i.example.com", null} }, { - { null, "c.example.com" }, - { "c.example.com", "e.example.com" }, - { "e.example.com", "g.example.com" }, - { "g.example.com", "i.example.com" }, - { "i.example.com", null } + {null, "c.example.com"}, + {"c.example.com", "e.example.com"}, + {"e.example.com", "g.example.com"}, + {"g.example.com", "i.example.com"}, + {"i.example.com", null} } }, ImmutableList.of( @@ -111,7 +124,8 @@ public class IndexGeneratorJobTest "2014102300,h.example.com,251", "2014102300,i.example.com,963", "2014102300,j.example.com,333" - ) + ), + null }, { false, @@ -119,10 +133,10 @@ public class IndexGeneratorJobTest "2014-10-22T00:00:00Z/P1D", new Integer[][][]{ { - { 0, 4 }, - { 1, 4 }, - { 2, 4 }, - { 3, 4 } + {0, 4}, + {1, 4}, + {2, 4}, + {3, 4} } }, ImmutableList.of( @@ -144,7 +158,8 @@ public class IndexGeneratorJobTest "2014102215,p.example.com,3533", "2014102216,q.example.com,500", "2014102216,q.example.com,87" - ) + ), + null }, { true, @@ -152,10 +167,10 @@ public class IndexGeneratorJobTest "2014-10-22T00:00:00Z/P1D", new Integer[][][]{ { - { 0, 4 }, - { 1, 4 }, - { 2, 4 }, - { 3, 4 } + {0, 4}, + {1, 4}, + {2, 4}, + {3, 4} } }, ImmutableList.of( @@ -177,7 +192,52 @@ public class IndexGeneratorJobTest "2014102215,p.example.com,3533", "2014102216,q.example.com,500", "2014102216,q.example.com,87" - ) + ), + null + }, + { + false, + "single", + "2014-10-22T00:00:00Z/P2D", + new String[][][]{ + { + {null, "c.example.com"}, + {"c.example.com", "e.example.com"}, + {"e.example.com", "g.example.com"}, + {"g.example.com", "i.example.com"}, + {"i.example.com", null} + }, + { + {null, "c.example.com"}, + {"c.example.com", "e.example.com"}, + {"e.example.com", "g.example.com"}, + {"g.example.com", "i.example.com"}, + {"i.example.com", null} + } + }, + ImmutableList.of( + "2014102200,a.example.com,100", + "2014102200,b.exmaple.com,50", + "2014102200,c.example.com,200", + "2014102200,d.example.com,250", + "2014102200,e.example.com,123", + "2014102200,f.example.com,567", + "2014102200,g.example.com,11", + "2014102200,h.example.com,251", + "2014102200,i.example.com,963", + "2014102200,j.example.com,333", + "2014102300,a.example.com,100", + "2014102300,b.exmaple.com,50", + "2014102300,c.example.com,200", + "2014102300,d.example.com,250", + "2014102300,e.example.com,123", + "2014102300,f.example.com,567", + "2014102300,g.example.com,11", + "2014102300,h.example.com,251", + "2014102300,i.example.com,963", + "2014102300,j.example.com,333" + ), + SequenceFileInputFormat.class.getName() } } ); @@ -194,13 +254,15 @@ public class IndexGeneratorJobTest private Object[][][] shardInfoForEachSegment; private List data; private boolean useCombiner; + private String inputFormatName; public IndexGeneratorJobTest( boolean useCombiner, String partitionType, String interval, Object[][][] shardInfoForEachSegment, - List data + List data, + String inputFormatName ) throws IOException { this.useCombiner = useCombiner; @@ -208,6 +270,34 @@ public class IndexGeneratorJobTest this.shardInfoForEachSegment = shardInfoForEachSegment; this.interval = new Interval(interval); this.data = data; + this.inputFormatName = inputFormatName; + } + + private void writeDataToLocalSequenceFile(File outputFile, List data) throws IOException + { + Configuration conf = new Configuration(); + LocalFileSystem fs = FileSystem.getLocal(conf); + Writer fileWriter = SequenceFile.createWriter( + fs, + conf, + new Path(outputFile.getAbsolutePath()), + BytesWritable.class, + BytesWritable.class, + SequenceFile.CompressionType.NONE, + (CompressionCodec) null + ); + + int keyCount = 10; + for (String line : data) { + ByteBuffer buf = ByteBuffer.allocate(4); + buf.putInt(keyCount); + BytesWritable key = new BytesWritable(buf.array()); + BytesWritable value = new BytesWritable(line.getBytes(Charsets.UTF_8)); + fileWriter.append(key, value); + keyCount += 1; + } + + fileWriter.close(); } @Before @@ -222,7 +312,18 @@ public class IndexGeneratorJobTest dataFile = temporaryFolder.newFile(); tmpDir = temporaryFolder.newFolder(); - FileUtils.writeLines(dataFile, data); + HashMap inputSpec = new HashMap(); + inputSpec.put("paths", dataFile.getCanonicalPath()); + inputSpec.put("type", "static"); + if (inputFormatName != null) { + inputSpec.put("inputFormat", inputFormatName); + } + + if (SequenceFileInputFormat.class.getName().equals(inputFormatName)) { + writeDataToLocalSequenceFile(dataFile, data); + } else { + FileUtils.writeLines(dataFile, data); + } config = new HadoopDruidIndexerConfig( new HadoopIngestionSpec( @@ -245,12 +346,7 @@ public class IndexGeneratorJobTest ) ), new HadoopIOConfig( - ImmutableMap.of( - "paths", - dataFile.getCanonicalPath(), - "type", - "static" - ), + ImmutableMap.copyOf(inputSpec), null, tmpDir.getCanonicalPath() ),