Refactored StringInputRowParser to handle a ByteBuffer; better wiring and configuration of the KafkaFirehose

Jan Rudert 2013-07-19 14:12:53 +02:00
parent d3412c851b
commit 527c6c81c5
13 changed files with 151 additions and 215 deletions
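At a high level, every firehose now hands raw bytes to one InputRowParser<ByteBuffer> and lets the parser decide how to decode or deserialize them, instead of each firehose choosing a String- or protobuf-specific parser. A minimal sketch of that contract from a caller's point of view (the helper class and method are hypothetical, not part of the commit):

import java.nio.ByteBuffer;

import com.metamx.common.exception.FormattedException;
import com.metamx.druid.indexer.data.InputRowParser;
import com.metamx.druid.input.InputRow;

// Hypothetical helper, not part of the commit: a Kafka payload, a file line, or an S3
// object can all be fed to the same parser once they are available as a ByteBuffer.
final class ByteBufferParsing
{
  static InputRow parsePayload(InputRowParser<ByteBuffer> parser, byte[] payload) throws FormattedException
  {
    return parser.parse(ByteBuffer.wrap(payload));
  }
}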

FlightsFirehoseFactory.java (View File)

@ -36,6 +36,7 @@ import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.zip.GZIPInputStream;
@ -118,7 +119,7 @@ public class FlightsFirehoseFactory implements FirehoseFactory
@Override
public InputRow nextRow()
{
final InputRow retVal = parser.parse(line);
final InputRow retVal = parser.parse(ByteBuffer.wrap(line.getBytes()));
line = null;
return retVal;
}
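One caveat worth noting about call sites like the one above: String.getBytes() with no argument encodes with the JVM's platform default charset, while the parser decodes UTF-8. A defensive variant, not what the commit does, that pins the charset:

import java.nio.ByteBuffer;

import com.google.common.base.Charsets;

// Defensive variant: encode with an explicit charset so the bytes match the UTF-8
// decoder inside StringInputRowParser regardless of the JVM's default file.encoding.
final class Utf8Wrap
{
  static ByteBuffer of(String line)
  {
    return ByteBuffer.wrap(line.getBytes(Charsets.UTF_8));
  }
}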

DataSpec.java (View File)

@ -32,8 +32,7 @@ import java.util.List;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "json", value = JSONDataSpec.class),
@JsonSubTypes.Type(name = "csv", value = CSVDataSpec.class),
@JsonSubTypes.Type(name = "tsv", value = DelimitedDataSpec.class),
@JsonSubTypes.Type(name = "protobuf", value = ProtoBufDataSpec.class)
@JsonSubTypes.Type(name = "tsv", value = DelimitedDataSpec.class)
})
public interface DataSpec
{

MapInputRowParser.java (View File)

@ -8,6 +8,7 @@ import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import org.joda.time.DateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -15,18 +16,20 @@ import java.util.Set;
public class MapInputRowParser implements InputRowParser<Map<String, Object>>
{
private final TimestampSpec timestampSpec;
private final DataSpec dataSpec;
private List<String> dimensions;
private final Set<String> dimensionExclusions;
@JsonCreator
public MapInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions
)
{
this.timestampSpec = timestampSpec;
this.dataSpec = dataSpec;
if (dimensions != null) {
this.dimensions = Collections.unmodifiableList(Lists.newArrayList(dimensions));
}
this.dimensionExclusions = Sets.newHashSet();
if (dimensionExclusions != null) {
for (String dimensionExclusion : dimensionExclusions) {
@ -39,8 +42,8 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
@Override
public InputRow parse(Map<String, Object> theMap)
{
final List<String> dimensions = dataSpec.hasCustomDimensions()
? dataSpec.getDimensions()
final List<String> dimensions = hasCustomDimensions()
? this.dimensions
: Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions));
final DateTime timestamp = timestampSpec.extractTimestamp(theMap);
@ -57,6 +60,10 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap);
}
private boolean hasCustomDimensions() {
return dimensions != null;
}
@Override
public void addDimensionExclusion(String dimension)
{
@ -69,10 +76,10 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
return timestampSpec;
}
@JsonProperty("data")
public DataSpec getDataSpec()
@JsonProperty
public List<String> getDimensions()
{
return dataSpec;
return dimensions;
}
@JsonProperty
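The dimension handling above is easiest to see with a concrete map: when no explicit dimension list is configured, every key of the incoming event except the exclusions becomes a dimension. An illustration with made-up field names:

import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

// Illustration only: "timestamp" and "added" are excluded (the time column and a
// metric), so the remaining keys "page" and "language" become the row's dimensions.
final class DimensionFallback
{
  public static void main(String[] args)
  {
    Map<String, Object> event = ImmutableMap.<String, Object>of(
        "timestamp", "2013-07-19T14:12:53+02:00",
        "page", "Main_Page",
        "language", "en",
        "added", 17
    );
    Set<String> exclusions = Sets.newHashSet("timestamp", "added");
    List<String> dimensions = Lists.newArrayList(Sets.difference(event.keySet(), exclusions));
    System.out.println(dimensions); // [page, language] (order follows the map's key order)
  }
}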

ProtoBufDataSpec.java (View File)

@ -1,69 +0,0 @@
package com.metamx.druid.indexer.data;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.metamx.common.parsers.Parser;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
/**
* @author jan.rudert
*/
public class ProtoBufDataSpec implements DataSpec{
private final List<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
private final String descriptorFileInClasspath;
@JsonCreator
public ProtoBufDataSpec(
@JsonProperty("descriptor") String descriptorFileInClasspath,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
)
{
this.descriptorFileInClasspath = descriptorFileInClasspath;
this.dimensions = dimensions;
this.spatialDimensions = (spatialDimensions == null)
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
}
@JsonProperty("descriptor")
public String getDescriptorFileInClassPath() {
return descriptorFileInClasspath;
}
@JsonProperty("dimensions")
@Override
public List<String> getDimensions()
{
return dimensions;
}
@JsonProperty("spatialDimensions")
@Override
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
@Override
public void verify(List<String> usedCols)
{
}
@Override
public boolean hasCustomDimensions()
{
return !(dimensions == null || dimensions.isEmpty());
}
@Override
public Parser<String, Object> getParser()
{
throw new UnsupportedOperationException("No String parser for protobuf data");
}
}

ProtoBufInputRowParser.java (View File)

@ -5,6 +5,7 @@ import static com.google.protobuf.Descriptors.Descriptor;
import static com.google.protobuf.Descriptors.FileDescriptor;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
@ -21,7 +22,7 @@ import com.metamx.druid.input.InputRow;
/**
* @author jan.rudert
*/
public class ProtoBufInputRowParser implements InputRowParser<ByteString>
public class ProtoBufInputRowParser implements InputRowParser<ByteBuffer>
{
private final MapInputRowParser inputRowCreator;
@ -30,19 +31,18 @@ public class ProtoBufInputRowParser implements InputRowParser<ByteString>
@JsonCreator
public ProtoBufInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") ProtoBufDataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions)
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions,
@JsonProperty("descriptor") String descriptorFileInClasspath)
{
descriptor = getDescriptor(dataSpec.getDescriptorFileInClassPath());
this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec, dimensionExclusions);
descriptor = getDescriptor(descriptorFileInClasspath);
inputRowCreator = new MapInputRowParser(timestampSpec, dimensions, dimensionExclusions);
}
@Override
public InputRow parse(ByteString input)
public InputRow parse(ByteBuffer input)
{
Map<String, Object> theMap = buildStringKeyMap(input);
@ -50,18 +50,18 @@ public class ProtoBufInputRowParser implements InputRowParser<ByteString>
return inputRowCreator.parse(theMap);
}
private Map<String, Object> buildStringKeyMap(ByteString input)
private Map<String, Object> buildStringKeyMap(ByteBuffer input)
{
Map<String, Object> theMap = Maps.newHashMap();
try
{
DynamicMessage message = DynamicMessage.parseFrom(descriptor, input);
DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
Map<Descriptors.FieldDescriptor, Object> allFields = message.getAllFields();
for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : allFields.entrySet())
{
String name = entry.getKey().getName().toLowerCase();
String name = entry.getKey().getName();
if (theMap.containsKey(name))
{
continue;
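Taken together, the protobuf parser is now configured directly with its descriptor file and dimension list rather than through a ProtoBufDataSpec. A construction sketch modelled on the updated unit test (the dimension names, descriptor file, and helper class are placeholders):

import java.nio.ByteBuffer;
import java.util.Arrays;

import com.metamx.druid.indexer.data.ProtoBufInputRowParser;
import com.metamx.druid.indexer.data.TimestampSpec;
import com.metamx.druid.input.InputRow;

final class ProtoBufParserWiring
{
  static InputRow parse(byte[] serializedEvent)
  {
    ProtoBufInputRowParser parser = new ProtoBufInputRowParser(
        new TimestampSpec("timestamp", "iso"),   // time column and format
        Arrays.asList("id", "name"),             // dimensions, formerly on the data spec
        Arrays.<String>asList(),                 // dimensionExclusions
        "prototest.desc"                         // descriptor file on the classpath
    );
    return parser.parse(ByteBuffer.wrap(serializedEvent));
  }
}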

StringInputRowParser.java (View File)

@ -19,49 +19,94 @@
package com.metamx.druid.indexer.data;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Charsets;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ToLowerCaseParser;
import com.metamx.druid.input.InputRow;
import java.util.List;
import java.util.Map;
/**
*/
public class StringInputRowParser implements InputRowParser<String>
public class StringInputRowParser implements InputRowParser<ByteBuffer>
{
private final InputRowParser<Map<String, Object>> inputRowCreator;
private final Parser<String, Object> parser;
private final InputRowParser<Map<String, Object>> inputRowCreator;
private final Parser<String, Object> parser;
@JsonCreator
public StringInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions
)
{
this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec, dimensionExclusions);
this.parser = new ToLowerCaseParser(dataSpec.getParser());
}
private CharBuffer chars = null;
public void addDimensionExclusion(String dimension)
{
inputRowCreator.addDimensionExclusion(dimension);
}
@JsonCreator
public StringInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions)
{
this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec.getDimensions(), dimensionExclusions);
this.parser = new ToLowerCaseParser(dataSpec.getParser());
}
@Override
public InputRow parse(String input) throws FormattedException
{
return inputRowCreator.parse(parser.parse(input));
}
public void addDimensionExclusion(String dimension)
{
inputRowCreator.addDimensionExclusion(dimension);
}
@JsonValue
public InputRowParser<Map<String, Object>> getInputRowCreator()
{
return inputRowCreator;
}
@Override
public InputRow parse(ByteBuffer input) throws FormattedException
{
Map<String, Object> theMap = buildStringKeyMap(input);
return inputRowCreator.parse(theMap);
}
private Map<String, Object> buildStringKeyMap(ByteBuffer input)
{
int payloadSize = input.limit();
if (chars == null || chars.remaining() < payloadSize)
{
chars = CharBuffer.allocate(payloadSize);
}
final CoderResult coderResult = Charsets.UTF_8.newDecoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE)
.decode(input, chars, true);
Map<String, Object> theMap;
if (coderResult.isUnderflow())
{
chars.flip();
try
{
theMap = parser.parse(chars.toString());
} finally
{
chars.clear();
}
}
else
{
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Failed with CoderResult[%s]", coderResult))
.build();
}
return theMap;
}
@JsonValue
public InputRowParser<Map<String, Object>> getInputRowCreator()
{
return inputRowCreator;
}
}
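The decode step can be read in isolation: an underflow result means the whole payload was consumed, malformed bytes are replaced rather than rejected, and a UTF-8 payload never decodes to more chars than it has bytes, so sizing the CharBuffer from the payload rules out overflow. A standalone sketch of the same pattern, without the CharBuffer the parser reuses across calls:

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;

import com.google.common.base.Charsets;

final class Utf8Decode
{
  static String decode(ByteBuffer payload)
  {
    // A UTF-8 payload never needs more chars than it has bytes, so sizing the
    // CharBuffer from the payload guarantees the decoder cannot overflow.
    CharBuffer chars = CharBuffer.allocate(payload.remaining());
    CoderResult result = Charsets.UTF_8.newDecoder()
        .onMalformedInput(CodingErrorAction.REPLACE)
        .onUnmappableCharacter(CodingErrorAction.REPLACE)
        .decode(payload, chars, true);
    if (!result.isUnderflow()) {
      throw new IllegalStateException("Failed with CoderResult[" + result + "]");
    }
    chars.flip();
    return chars.toString();
  }
}

Note that the parser sizes its buffer from input.limit() rather than input.remaining(), which assumes the payload arrives with its position at zero; the new StringInputRowParserTest below exercises exactly that position/limit behaviour.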

ProtoBufInputRowParserTest.java (View File)

@ -4,6 +4,7 @@ import static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
@ -37,10 +38,9 @@ public class ProtoBufInputRowParserTest {
@Test
public void testParse() throws Exception {
//configure pares with desc file
//configure parser with desc file
ProtoBufInputRowParser parser = new ProtoBufInputRowParser(new TimestampSpec("timestamp", "iso"),
new ProtoBufDataSpec("prototest.desc", Arrays.asList(DIMENSIONS), null),
Arrays.<String>asList());
Arrays.asList(DIMENSIONS), Arrays.<String>asList(), "prototest.desc");
//create binary of proto test event
@ -60,7 +60,7 @@ public class ProtoBufInputRowParserTest {
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
InputRow row = parser.parse(ByteString.copyFrom(out.toByteArray()));
InputRow row = parser.parse(ByteBuffer.wrap(out.toByteArray()));
System.out.println(row);
assertEquals(Arrays.asList(DIMENSIONS), row.getDimensions());
assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());

StringInputRowParserTest.java (View File)

@ -0,0 +1,23 @@
package com.metamx.druid.indexer.data;
import junit.framework.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
/**
* @author jan.rudert
*/
public class StringInputRowParserTest {
@Test
public void testPayloadSize() {
ByteBuffer payload = ByteBuffer.allocate(10);
payload.position(2);
payload.limit(5);
payload.rewind();
Assert.assertEquals(5, payload.limit());
}
}
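The test pins down plain ByteBuffer bookkeeping: rewind() resets the position to zero but leaves the limit alone, so limit() still reports the payload size that StringInputRowParser reads. Spelled out:

import java.nio.ByteBuffer;

final class ByteBufferSemantics
{
  public static void main(String[] args)
  {
    ByteBuffer payload = ByteBuffer.allocate(10);
    payload.position(2);
    payload.limit(5);
    payload.rewind();                        // position back to 0, limit untouched
    System.out.println(payload.limit());     // 5
    System.out.println(payload.remaining()); // 5 = limit - position
  }
}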

HadoopDruidIndexerMapper.java (View File)

@ -9,6 +9,7 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.joda.time.DateTime;
import java.io.IOException;
import java.nio.ByteBuffer;
public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<LongWritable, Text, KEYOUT, VALUEOUT>
{
@ -41,7 +42,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
try {
final InputRow inputRow;
try {
inputRow = parser.parse(value.toString());
inputRow = parser.parse(ByteBuffer.wrap(value.getBytes()));
}
catch (IllegalArgumentException e) {
if (config.isIgnoreInvalidRows()) {

IndexGeneratorJob.java (View File)

@ -280,7 +280,7 @@ public class IndexGeneratorJob implements Jobby
for (final Text value : values) {
context.progress();
final InputRow inputRow = index.getSpatialDimensionRowFormatter().formatRow(parser.parse(value.toString()));
final InputRow inputRow = index.getSpatialDimensionRowFormatter().formatRow(parser.parse(ByteBuffer.wrap(value.getBytes())));
allDimensionNames.addAll(inputRow.getDimensions());
int numRows = index.add(inputRow);
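Both Hadoop call sites in this commit wrap value.getBytes() directly. Hadoop's Text reuses its backing byte array, so getBytes() can return more bytes than getLength(); a defensive variant, not part of this commit, bounds the wrap so any stale trailing bytes never reach the parser:

import java.nio.ByteBuffer;

import org.apache.hadoop.io.Text;

final class TextPayload
{
  static ByteBuffer wrap(Text value)
  {
    // Only the first getLength() bytes of the backing array are valid row data.
    return ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
  }
}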

StaticS3FirehoseFactory.java (View File)

@ -44,6 +44,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
@ -186,7 +187,7 @@ public class StaticS3FirehoseFactory implements FirehoseFactory
throw new NoSuchElementException();
}
return parser.parse(lineIterator.next());
return parser.parse(ByteBuffer.wrap(lineIterator.next().getBytes()));
}
@Override

KafkaFirehoseFactory.java (View File)

@ -21,15 +21,11 @@ package com.metamx.druid.realtime.firehose;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.google.protobuf.ByteString;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
@ -39,13 +35,10 @@ import kafka.message.MessageAndMetadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.InputRowParser;
import com.metamx.druid.indexer.data.ProtoBufInputRowParser;
import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow;
/**
@ -61,13 +54,13 @@ public class KafkaFirehoseFactory implements FirehoseFactory
private final String feed;
@JsonProperty
private final InputRowParser parser;
private final InputRowParser<ByteBuffer> parser;
@JsonCreator
public KafkaFirehoseFactory(
@JsonProperty("consumerProps") Properties consumerProps,
@JsonProperty("feed") String feed,
@JsonProperty("parser") InputRowParser parser)
@JsonProperty("parser") InputRowParser<ByteBuffer> parser)
{
this.consumerProps = consumerProps;
this.feed = feed;
@ -91,26 +84,20 @@ public class KafkaFirehoseFactory implements FirehoseFactory
final KafkaStream<Message> stream = streamList.get(0);
if (parser instanceof StringInputRowParser)
{
return new StringMessageFirehose(connector, stream);
} else if (parser instanceof ProtoBufInputRowParser)
{
return new ProtoBufMessageFirehose(stream, connector);
}
throw new RuntimeException("No Firehose for parser: " + parser.getClass().getName());
return new DefaultFirehose(connector, stream, parser);
}
private abstract static class AbstractKafkaFirehose implements Firehose
private static class DefaultFirehose implements Firehose
{
protected final ConsumerConnector connector;
protected final Iterator<MessageAndMetadata<Message>> iter;
private final ConsumerConnector connector;
private final Iterator<MessageAndMetadata<Message>> iter;
private final InputRowParser<ByteBuffer> parser;
public AbstractKafkaFirehose(ConsumerConnector connector, KafkaStream<Message> stream)
public DefaultFirehose(ConsumerConnector connector, KafkaStream<Message> stream, InputRowParser<ByteBuffer> parser)
{
iter = stream.iterator();
this.connector = connector;
this.parser = parser;
}
@Override
@ -132,7 +119,10 @@ public class KafkaFirehoseFactory implements FirehoseFactory
return parseMessage(message);
}
protected abstract InputRow parseMessage(Message message);
public InputRow parseMessage(Message message) throws FormattedException
{
return parser.parse(message.payload());
}
@Override
public Runnable commit()
@ -162,67 +152,4 @@ public class KafkaFirehoseFactory implements FirehoseFactory
connector.shutdown();
}
}
private class StringMessageFirehose extends AbstractKafkaFirehose
{
private CharBuffer chars = null;
public StringMessageFirehose(ConsumerConnector connector, KafkaStream<Message> stream)
{
super(connector, stream);
}
@Override
public InputRow parseMessage(Message message) throws FormattedException
{
int payloadSize = message.payloadSize();
if (chars == null || chars.remaining() < payloadSize)
{
chars = CharBuffer.allocate(payloadSize);
}
final CoderResult coderResult = Charsets.UTF_8.newDecoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE)
.decode(message.payload(), chars, true);
if (coderResult.isUnderflow())
{
chars.flip();
try
{
return parser.parse(chars.toString());
} finally
{
chars.clear();
}
}
else
{
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Failed with CoderResult[%s]", coderResult))
.build();
}
}
}
private class ProtoBufMessageFirehose extends AbstractKafkaFirehose
{
public ProtoBufMessageFirehose(KafkaStream<Message> stream, ConsumerConnector connector)
{
super(connector, stream);
}
@Override
public InputRow parseMessage(Message message) throws FormattedException
{
return parser.parse(ByteString.copyFrom(message.payload()));
}
}
}
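With the instanceof dispatch gone, the factory only needs something that implements InputRowParser<ByteBuffer>, which both StringInputRowParser and ProtoBufInputRowParser now do. A wiring sketch (the helper class, feed name, and consumer properties are placeholders; it assumes it sits in the same package as KafkaFirehoseFactory):

package com.metamx.druid.realtime.firehose;

import java.nio.ByteBuffer;
import java.util.Properties;

import com.metamx.druid.indexer.data.InputRowParser;

// Hypothetical wiring helper: the concrete parser (String- or ProtoBuf-flavoured) is
// chosen by configuration; the firehose itself no longer cares which one it gets.
final class KafkaWiring
{
  static KafkaFirehoseFactory factory(Properties consumerProps, InputRowParser<ByteBuffer> parser)
  {
    return new KafkaFirehoseFactory(consumerProps, "example-feed", parser);
  }
}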

TestIndex.java (View File)

@ -43,6 +43,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@ -176,7 +177,7 @@ public class TestIndex
runOnce = true;
}
retVal.add(parser.parse(line));
retVal.add(parser.parse(ByteBuffer.wrap(line.getBytes())));
++lineCount;
return true;