Merge pull request #1682 from jon-wei/master

Support parsing of BytesWritable strings in HadoopDruidIndexerMapper
Gian Merlino 2015-08-28 17:46:26 -07:00
commit e7b4cf942b
2 changed files with 134 additions and 33 deletions

HadoopDruidIndexerMapper.java

@@ -25,6 +25,7 @@ import io.druid.data.input.impl.StringInputRowParser;
 import io.druid.segment.indexing.granularity.GranularitySpec;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
@@ -101,11 +102,15 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
   public final static InputRow parseInputRow(Object value, InputRowParser parser)
   {
-    if(parser instanceof StringInputRowParser && value instanceof Text) {
+    if (parser instanceof StringInputRowParser && value instanceof Text) {
       //Note: This is to ensure backward compatibility with 0.7.0 and before
       return ((StringInputRowParser) parser).parse(value.toString());
-    } else if(value instanceof InputRow) {
-      return (InputRow)value;
+    } else if (parser instanceof StringInputRowParser && value instanceof BytesWritable) {
+      BytesWritable valueBytes = (BytesWritable) value;
+      ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes.getBytes(), 0, valueBytes.getLength());
+      return ((StringInputRowParser) parser).parse(valueBuffer);
+    } else if (value instanceof InputRow) {
+      return (InputRow) value;
     } else {
       return parser.parse(value);
     }
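For context on the new BytesWritable branch above: Hadoop reuses BytesWritable value objects and over-allocates their backing array, so getBytes() can return more bytes than are currently valid; bounding the wrap with getLength() keeps StringInputRowParser from reading stale trailing bytes. A minimal standalone sketch of that wrapping (the class name and sample row are illustrative, not part of the patch):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.BytesWritable;

public class BytesWritableWrapSketch
{
  public static void main(String[] args)
  {
    // Populate a BytesWritable the way a record reader would; the backing array
    // is typically grown beyond the valid region.
    BytesWritable writable = new BytesWritable();
    byte[] row = "2014102200,a.example.com,100".getBytes(StandardCharsets.UTF_8);
    writable.set(row, 0, row.length);

    // getBytes().length may exceed getLength(); wrap only the valid bytes,
    // mirroring what parseInputRow now does before handing off to the parser.
    ByteBuffer valid = ByteBuffer.wrap(writable.getBytes(), 0, writable.getLength());
    System.out.println(StandardCharsets.UTF_8.decode(valid));
  }
}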

IndexGeneratorJobTest.java

@@ -22,6 +22,7 @@ package io.druid.indexer;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.api.client.util.Charsets;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -43,6 +44,15 @@ import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.ShardSpec;
 import io.druid.timeline.partition.SingleDimensionShardSpec;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeComparator;
 import org.joda.time.Interval;
@@ -56,8 +66,10 @@ import org.junit.runners.Parameterized;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -65,7 +77,8 @@ import java.util.Map;
 public class IndexGeneratorJobTest
 {
-  @Parameterized.Parameters(name = "partitionType={0}, interval={1}, shardInfoForEachSegment={2}, data={3}")
+  @Parameterized.Parameters(name = "partitionType={0}, interval={1}, shardInfoForEachSegment={2}, data={3}, " +
+                                   "inputFormatName={4}")
   public static Collection<Object[]> constructFeed()
   {
     return Arrays.asList(
@@ -76,18 +89,18 @@ public class IndexGeneratorJobTest
         "2014-10-22T00:00:00Z/P2D",
         new String[][][]{
             {
-                { null, "c.example.com" },
-                { "c.example.com", "e.example.com" },
-                { "e.example.com", "g.example.com" },
-                { "g.example.com", "i.example.com" },
-                { "i.example.com", null }
+                {null, "c.example.com"},
+                {"c.example.com", "e.example.com"},
+                {"e.example.com", "g.example.com"},
+                {"g.example.com", "i.example.com"},
+                {"i.example.com", null}
             },
             {
-                { null, "c.example.com" },
-                { "c.example.com", "e.example.com" },
-                { "e.example.com", "g.example.com" },
-                { "g.example.com", "i.example.com" },
-                { "i.example.com", null }
+                {null, "c.example.com"},
+                {"c.example.com", "e.example.com"},
+                {"e.example.com", "g.example.com"},
+                {"g.example.com", "i.example.com"},
+                {"i.example.com", null}
             }
         },
         ImmutableList.of(
@@ -111,7 +124,8 @@ public class IndexGeneratorJobTest
             "2014102300,h.example.com,251",
             "2014102300,i.example.com,963",
             "2014102300,j.example.com,333"
-        )
+        ),
+        null
     },
     {
         false,
@@ -119,10 +133,10 @@ public class IndexGeneratorJobTest
         "2014-10-22T00:00:00Z/P1D",
         new Integer[][][]{
             {
-                { 0, 4 },
-                { 1, 4 },
-                { 2, 4 },
-                { 3, 4 }
+                {0, 4},
+                {1, 4},
+                {2, 4},
+                {3, 4}
             }
         },
         ImmutableList.of(
@@ -144,7 +158,8 @@ public class IndexGeneratorJobTest
            "2014102215,p.example.com,3533",
            "2014102216,q.example.com,500",
            "2014102216,q.example.com,87"
-        )
+        ),
+        null
     },
     {
         true,
@@ -152,10 +167,10 @@ public class IndexGeneratorJobTest
         "2014-10-22T00:00:00Z/P1D",
         new Integer[][][]{
             {
-                { 0, 4 },
-                { 1, 4 },
-                { 2, 4 },
-                { 3, 4 }
+                {0, 4},
+                {1, 4},
+                {2, 4},
+                {3, 4}
             }
         },
         ImmutableList.of(
@@ -177,7 +192,52 @@ public class IndexGeneratorJobTest
            "2014102215,p.example.com,3533",
            "2014102216,q.example.com,500",
            "2014102216,q.example.com,87"
-        )
+        ),
+        null
+    },
+    {
+        false,
+        "single",
+        "2014-10-22T00:00:00Z/P2D",
+        new String[][][]{
+            {
+                {null, "c.example.com"},
+                {"c.example.com", "e.example.com"},
+                {"e.example.com", "g.example.com"},
+                {"g.example.com", "i.example.com"},
+                {"i.example.com", null}
+            },
+            {
+                {null, "c.example.com"},
+                {"c.example.com", "e.example.com"},
+                {"e.example.com", "g.example.com"},
+                {"g.example.com", "i.example.com"},
+                {"i.example.com", null}
+            }
+        },
+        ImmutableList.of(
+            "2014102200,a.example.com,100",
+            "2014102200,b.exmaple.com,50",
+            "2014102200,c.example.com,200",
+            "2014102200,d.example.com,250",
+            "2014102200,e.example.com,123",
+            "2014102200,f.example.com,567",
+            "2014102200,g.example.com,11",
+            "2014102200,h.example.com,251",
+            "2014102200,i.example.com,963",
+            "2014102200,j.example.com,333",
+            "2014102300,a.example.com,100",
+            "2014102300,b.exmaple.com,50",
+            "2014102300,c.example.com,200",
+            "2014102300,d.example.com,250",
+            "2014102300,e.example.com,123",
+            "2014102300,f.example.com,567",
+            "2014102300,g.example.com,11",
+            "2014102300,h.example.com,251",
+            "2014102300,i.example.com,963",
+            "2014102300,j.example.com,333"
+        ),
+        SequenceFileInputFormat.class.getName()
     }
 }
 );
@@ -194,13 +254,15 @@ public class IndexGeneratorJobTest
   private Object[][][] shardInfoForEachSegment;
   private List<String> data;
   private boolean useCombiner;
+  private String inputFormatName;
 
   public IndexGeneratorJobTest(
       boolean useCombiner,
       String partitionType,
       String interval,
      Object[][][] shardInfoForEachSegment,
-      List<String> data
+      List<String> data,
+      String inputFormatName
   ) throws IOException
   {
     this.useCombiner = useCombiner;
@@ -208,6 +270,34 @@ public class IndexGeneratorJobTest
     this.shardInfoForEachSegment = shardInfoForEachSegment;
     this.interval = new Interval(interval);
     this.data = data;
+    this.inputFormatName = inputFormatName;
+  }
+
+  private void writeDataToLocalSequenceFile(File outputFile, List<String> data) throws IOException
+  {
+    Configuration conf = new Configuration();
+    LocalFileSystem fs = FileSystem.getLocal(conf);
+    Writer fileWriter = SequenceFile.createWriter(
+        fs,
+        conf,
+        new Path(outputFile.getAbsolutePath()),
+        BytesWritable.class,
+        BytesWritable.class,
+        SequenceFile.CompressionType.NONE,
+        (CompressionCodec) null
+    );
+
+    int keyCount = 10;
+    for (String line : data) {
+      ByteBuffer buf = ByteBuffer.allocate(4);
+      buf.putInt(keyCount);
+      BytesWritable key = new BytesWritable(buf.array());
+      BytesWritable value = new BytesWritable(line.getBytes(Charsets.UTF_8));
+      fileWriter.append(key, value);
+      keyCount += 1;
+    }
+
+    fileWriter.close();
   }
 
   @Before
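The helper added above writes one record per CSV line, with a 4-byte integer key and the UTF-8 encoded line as the value, both as BytesWritable, so the job exercises the mapper's new BytesWritable path through a real SequenceFile. A hedged read-back sketch for inspecting such a file (this verifier is not part of the test; the method and variable names are illustrative, and it assumes the same Hadoop imports the test adds):

  // Illustrative only: dump the (int key, UTF-8 line) pairs written by the helper.
  private static void dumpLocalSequenceFile(File file) throws IOException
  {
    Configuration conf = new Configuration();
    LocalFileSystem fs = FileSystem.getLocal(conf);
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(file.getAbsolutePath()), conf);
    try {
      BytesWritable key = new BytesWritable();
      BytesWritable value = new BytesWritable();
      while (reader.next(key, value)) {
        // Bound both buffers by getLength(), just as the mapper does.
        int keyInt = ByteBuffer.wrap(key.getBytes(), 0, key.getLength()).getInt();
        String line = new String(value.getBytes(), 0, value.getLength(), Charsets.UTF_8);
        System.out.println(keyInt + " -> " + line);
      }
    }
    finally {
      reader.close();
    }
  }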
@@ -222,7 +312,18 @@ public class IndexGeneratorJobTest
     dataFile = temporaryFolder.newFile();
     tmpDir = temporaryFolder.newFolder();
-    FileUtils.writeLines(dataFile, data);
+    HashMap<String, Object> inputSpec = new HashMap<String, Object>();
+    inputSpec.put("paths", dataFile.getCanonicalPath());
+    inputSpec.put("type", "static");
+    if (inputFormatName != null) {
+      inputSpec.put("inputFormat", inputFormatName);
+    }
+
+    if (SequenceFileInputFormat.class.getName().equals(inputFormatName)) {
+      writeDataToLocalSequenceFile(dataFile, data);
+    } else {
+      FileUtils.writeLines(dataFile, data);
+    }
 
     config = new HadoopDruidIndexerConfig(
         new HadoopIngestionSpec(
@@ -245,12 +346,7 @@ public class IndexGeneratorJobTest
             )
         ),
         new HadoopIOConfig(
-            ImmutableMap.<String, Object>of(
-                "paths",
-                dataFile.getCanonicalPath(),
-                "type",
-                "static"
-            ),
+            ImmutableMap.copyOf(inputSpec),
             null,
             tmpDir.getCanonicalPath()
         ),
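Net effect of the setUp changes above: the static pathSpec is now assembled in a plain HashMap so an optional "inputFormat" entry can be added, and HadoopIOConfig receives ImmutableMap.copyOf(inputSpec) instead of the previous hard-coded two-key map. A hedged sketch of the equivalent construction for the SequenceFile case (the paths below are illustrative placeholders, not values from the test):

  Map<String, Object> inputSpec = new HashMap<String, Object>();
  inputSpec.put("type", "static");
  inputSpec.put("paths", "/tmp/index-test/data.seq");              // illustrative path
  inputSpec.put("inputFormat", SequenceFileInputFormat.class.getName());

  // Same constructor shape the test uses; the second argument stays null as before,
  // and the last argument mirrors tmpDir.getCanonicalPath() in the test.
  HadoopIOConfig ioConfig = new HadoopIOConfig(
      ImmutableMap.copyOf(inputSpec),
      null,
      "/tmp/index-test/working"                                    // illustrative path
  );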