initial implementation of a protocol buffers firehose

This commit is contained in:
Jan Rudert 2013-07-12 13:34:57 +02:00
parent 0961d8f149
commit 89b0c84f3b
10 changed files with 1874 additions and 111 deletions
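
For orientation, a minimal usage sketch of the parser introduced by this commit. The constructor signatures and the descriptor/dimension values are taken from the files below; the sketch class itself and its reuse of the test descriptor prototest.desc are assumptions for illustration only.

package com.metamx.druid.indexer.data;

import java.util.Arrays;

import com.metamx.druid.input.InputRow;

// Hypothetical helper, not part of this commit: wires up the new protobuf
// parser by hand, the same way the Kafka firehose does per message.
// "prototest.desc" and the dimension names come from the test resources below.
public class ProtoBufParserSketch
{
  public static InputRow parseEvent(byte[] serializedEvent)
  {
    ProtoBufInputRowParser parser = new ProtoBufInputRowParser(
        new TimestampSpec("timestamp", "iso"),
        new ProtoBufDataSpec("prototest.desc", Arrays.asList("eventType", "id", "someOtherId", "isValid"), null),
        Arrays.<String>asList()
    );
    return parser.parse(serializedEvent);
  }
}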

View File

@@ -79,6 +79,12 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<!-- Tests -->
<dependency>

View File

@@ -32,7 +32,8 @@ import java.util.List;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "json", value = JSONDataSpec.class),
@JsonSubTypes.Type(name = "csv", value = CSVDataSpec.class),
@JsonSubTypes.Type(name = "tsv", value = DelimitedDataSpec.class)
@JsonSubTypes.Type(name = "tsv", value = DelimitedDataSpec.class),
@JsonSubTypes.Type(name = "protobuf", value = ProtoBufDataSpec.class)
})
public interface DataSpec
{

View File

@@ -1,7 +1,14 @@
package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.druid.input.InputRow;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
@JsonSubTypes({
@JsonSubTypes.Type(name = "string", value = StringInputRowParser.class),
@JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class),
})
public interface InputRowParser<T>
{
public InputRow parse(T input);

View File

@@ -0,0 +1,69 @@
package com.metamx.druid.indexer.data;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.metamx.common.parsers.Parser;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
/**
* @author jan.rudert
*/
public class ProtoBufDataSpec implements DataSpec{
private final List<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
private final String descriptorFileInClasspath;
@JsonCreator
public ProtoBufDataSpec(
@JsonProperty("descriptor") String descriptorFileInClasspath,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
)
{
this.descriptorFileInClasspath = descriptorFileInClasspath;
this.dimensions = dimensions;
this.spatialDimensions = (spatialDimensions == null)
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
}
@JsonProperty("descriptor")
public String getDescriptorFileInClassPath() {
return descriptorFileInClasspath;
}
@JsonProperty("dimensions")
@Override
public List<String> getDimensions()
{
return dimensions;
}
@JsonProperty("spatialDimensions")
@Override
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
@Override
public void verify(List<String> usedCols)
{
}
@Override
public boolean hasCustomDimensions()
{
return !(dimensions == null || dimensions.isEmpty());
}
@Override
public Parser<String, Object> getParser()
{
throw new UnsupportedOperationException("No String parser for protobuf data");
}
}

View File

@@ -0,0 +1,108 @@
package com.metamx.druid.indexer.data;
import static com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import static com.google.protobuf.Descriptors.Descriptor;
import static com.google.protobuf.Descriptors.FileDescriptor;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.metamx.druid.input.InputRow;
/**
* @author jan.rudert
*/
public class ProtoBufInputRowParser implements InputRowParser<byte[]>
{
private final MapInputRowParser inputRowCreator;
private final Descriptor descriptor;
@JsonCreator
public ProtoBufInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") ProtoBufDataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions)
{
descriptor = getDescriptor(dataSpec.getDescriptorFileInClassPath());
this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec, dimensionExclusions);
}
@Override
public InputRow parse(byte[] input)
{
Map<String, Object> theMap = buildStringKeyMap(input);
return inputRowCreator.parse(theMap);
}
private Map<String, Object> buildStringKeyMap(byte[] input)
{
Map<String, Object> theMap = Maps.newHashMap();
try
{
DynamicMessage message = DynamicMessage.parseFrom(descriptor, input);
Map<Descriptors.FieldDescriptor, Object> allFields = message.getAllFields();
for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : allFields.entrySet())
{
String name = entry.getKey().getName();
if (theMap.containsKey(name))
{
continue;
// TODO
// throw new RuntimeException("dupicate key " + name + " in " +
// message);
}
Object value = entry.getValue();
if(value instanceof Descriptors.EnumValueDescriptor) {
Descriptors.EnumValueDescriptor desc = (Descriptors.EnumValueDescriptor) value;
value = desc.getName();
}
theMap.put(name, value);
}
} catch (InvalidProtocolBufferException e)
{
// TODO
e.printStackTrace();
}
return theMap;
}
private Descriptor getDescriptor(String descriptorFileInClassPath)
{
try
{
InputStream fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFileInClassPath);
FileDescriptorSet set = FileDescriptorSet.parseFrom(fin);
FileDescriptor file = FileDescriptor.buildFrom(set.getFile(0), new FileDescriptor[]
{});
return file.getMessageTypes().get(0);
} catch (Exception e)
{
throw Throwables.propagate(e);
}
}
@Override
public void addDimensionExclusion(String dimension)
{
inputRowCreator.addDimensionExclusion(dimension);
}
}

View File

@@ -0,0 +1,85 @@
package com.metamx.druid.indexer.data;
import static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.List;
import org.joda.time.DateTime;
import org.junit.Test;
import com.metamx.druid.input.InputRow;
/**
* @author jan.rudert
*/
public class ProtoBufInputRowParserTest {
public static final String[] DIMENSIONS = new String[]{"eventType", "id", "someOtherId", "isValid"};
/*
eventType = 1;
required uint64 id = 2;
required string timestamp = 3;
optional uint32 someOtherId = 4;
optional bool isValid = 5;
optional string description = 6;
optional float someFloatColumn = 7;
optional uint32 someIntColumn = 8;
optional uint64 someLongColumn = 9;
*/
@Test
public void testParse() throws Exception {
// configure parser with descriptor file
ProtoBufInputRowParser parser = new ProtoBufInputRowParser(new TimestampSpec("timestamp", "iso"),
new ProtoBufDataSpec("prototest.desc", Arrays.asList(DIMENSIONS), null),
Arrays.<String>asList());
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 07, 12, 9, 30);
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
InputRow row = parser.parse(out.toByteArray());
System.out.println(row);
assertEquals(Arrays.asList(DIMENSIONS), row.getDimensions());
assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "id", "4711");
assertDimensionEquals(row, "isValid", "true");
assertDimensionEquals(row, "someOtherId", "4712");
assertDimensionEquals(row, "description", "description");
assertDimensionEquals(row, "eventType", CATEGORY_ONE.name());
assertEquals(47.11F, row.getFloatMetric("someFloatColumn"), 0.0);
assertEquals(815.0F, row.getFloatMetric("someIntColumn"), 0.0);
assertEquals(816.0F, row.getFloatMetric("someLongColumn"), 0.0);
}
private void assertDimensionEquals(InputRow row, String dimension, Object expected) {
List<String> values = row.getDimension(dimension);
assertEquals(1, values.size());
assertEquals(expected, values.get(0));
}
}

View File

@@ -0,0 +1,31 @@
package prototest;
option java_package = "com.metamx.druid.indexer.data";
option java_outer_classname = "ProtoTestEventWrapper";
message ProtoTestEvent {
enum EventCategory {
CATEGORY_ZERO = 0;
CATEGORY_ONE = 1;
CATEGORY_TWO = 2;
}
required EventCategory eventType = 1;
required uint64 id = 2;
required string timestamp = 3;
optional uint32 someOtherId = 4;
optional bool isValid = 5;
optional string description = 6;
optional float someFloatColumn = 7;
optional uint32 someIntColumn = 8;
optional uint64 someLongColumn = 9;
}

Binary file not shown.
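
The binary file above is presumably the compiled descriptor set (prototest.desc) that ProtoBufInputRowParser loads from the classpath; such a file would normally be generated from the .proto above via protoc's descriptor-set output, although that build step is not shown in this commit. A small sanity-check sketch using the same protobuf descriptor API as the parser; the class name and the assumption about the file's identity are illustrative only.

import java.io.InputStream;

import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors.FileDescriptor;

public class DescriptorSanityCheck
{
  public static void main(String[] args) throws Exception
  {
    // Load the descriptor set from the classpath, exactly as getDescriptor() does.
    InputStream in = DescriptorSanityCheck.class.getClassLoader().getResourceAsStream("prototest.desc");
    FileDescriptorSet set = FileDescriptorSet.parseFrom(in);
    FileDescriptor file = FileDescriptor.buildFrom(set.getFile(0), new FileDescriptor[]{});

    // The parser picks the first message type; expect "ProtoTestEvent" here.
    System.out.println(file.getMessageTypes().get(0).getName());
  }
}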

View File

package com.metamx.druid.realtime.firehose;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.InputRowParser;
import com.metamx.druid.indexer.data.ProtoBufInputRowParser;
import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow;

/**
 */
public class KafkaFirehoseFactory implements FirehoseFactory
{
  private static final Logger log = new Logger(KafkaFirehoseFactory.class);

  @JsonProperty
  private final Properties consumerProps;

  @JsonProperty
  private final String feed;

  @JsonProperty
  private final InputRowParser parser;

  @JsonCreator
  public KafkaFirehoseFactory(
      @JsonProperty("consumerProps") Properties consumerProps,
      @JsonProperty("feed") String feed,
      @JsonProperty("parser") InputRowParser parser)
  {
    this.consumerProps = consumerProps;
    this.feed = feed;
    this.parser = parser;

    parser.addDimensionExclusion("feed");
  }

  @Override
  public Firehose connect() throws IOException
  {
    final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));

    final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));

    final List<KafkaStream<Message>> streamList = streams.get(feed);
    if (streamList == null || streamList.size() != 1)
    {
      return null;
    }

    final KafkaStream<Message> stream = streamList.get(0);

    // Pick the firehose implementation that matches the configured parser.
    if (parser instanceof StringInputRowParser)
    {
      return new StringMessageFirehose(connector, stream);
    } else if (parser instanceof ProtoBufInputRowParser)
    {
      return new ProtoBufMessageFirehose(stream, connector);
    }
    throw new RuntimeException("No Firehose for parser: " + parser.getClass().getName());
  }

  private abstract static class AbstractKafkaFirehose implements Firehose
  {
    protected final ConsumerConnector connector;
    protected final Iterator<MessageAndMetadata<Message>> iter;

    public AbstractKafkaFirehose(ConsumerConnector connector, KafkaStream<Message> stream)
    {
      iter = stream.iterator();
      this.connector = connector;
    }

    @Override
    public boolean hasMore()
    {
      return iter.hasNext();
    }

    @Override
    public InputRow nextRow() throws FormattedException
    {
      final Message message = iter.next().message();

      if (message == null)
      {
        return null;
      }

      return parseMessage(message);
    }

    protected abstract InputRow parseMessage(Message message);

    @Override
    public Runnable commit()
    {
      return new Runnable()
      {
        @Override
        public void run()
        {
          /*
           * This is actually not going to do exactly what we want, because it
           * will be called asynchronously after the persist is complete. So,
           * it's going to commit that it has processed more than was actually
           * persisted. This is unfortunate, but good enough for now. Should
           * revisit along with an upgrade of our Kafka version.
           */
          log.info("committing offsets");
          connector.commitOffsets();
        }
      };
    }

    @Override
    public void close() throws IOException
    {
      connector.shutdown();
    }
  }

  private class StringMessageFirehose extends AbstractKafkaFirehose
  {
    private CharBuffer chars = null;

    public StringMessageFirehose(ConsumerConnector connector, KafkaStream<Message> stream)
    {
      super(connector, stream);
    }

    @Override
    public InputRow parseMessage(Message message) throws FormattedException
    {
      int payloadSize = message.payloadSize();
      if (chars == null || chars.remaining() < payloadSize)
      {
        chars = CharBuffer.allocate(payloadSize);
      }

      final CoderResult coderResult = Charsets.UTF_8.newDecoder()
                                              .onMalformedInput(CodingErrorAction.REPLACE)
                                              .onUnmappableCharacter(CodingErrorAction.REPLACE)
                                              .decode(message.payload(), chars, true);

      if (coderResult.isUnderflow())
      {
        chars.flip();
        try
        {
          return parser.parse(chars.toString());
        } finally
        {
          chars.clear();
        }
      }
      else
      {
        throw new FormattedException.Builder()
            .withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
            .withMessage(String.format("Failed with CoderResult[%s]", coderResult))
            .build();
      }
    }
  }

  private class ProtoBufMessageFirehose extends AbstractKafkaFirehose
  {
    private ByteBuffer bytes = null;

    public ProtoBufMessageFirehose(KafkaStream<Message> stream, ConsumerConnector connector)
    {
      super(connector, stream);
    }

    @Override
    public InputRow parseMessage(Message message) throws FormattedException
    {
      int payloadSize = message.payloadSize();
      if (bytes == null || bytes.remaining() < payloadSize)
      {
        bytes = ByteBuffer.allocate(payloadSize);
      }

      bytes.put(message.payload());
      bytes.flip();
      try
      {
        // ProtoBufInputRowParser expects a byte[], so copy the buffered
        // payload out of the ByteBuffer before parsing.
        byte[] payload = new byte[bytes.remaining()];
        bytes.get(payload);
        return parser.parse(payload);
      } finally
      {
        bytes.clear();
      }
    }
  }
}