Merge pull request #209 from zanox/kafka-protobuf

Kafka protobuf
cheddar 2013-08-13 18:38:27 -07:00
commit c2305a82f9
13 changed files with 1491 additions and 152 deletions

View File

@@ -79,6 +79,11 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<!-- Tests -->
<dependency>

View File

@@ -0,0 +1,14 @@
package com.metamx.druid.indexer.data;
import java.nio.ByteBuffer;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class),
@JsonSubTypes.Type(name = "string", value = StringInputRowParser.class)
})
public interface ByteBufferInputRowParser extends InputRowParser<ByteBuffer> {
}
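
This new interface is what lets a spec choose between string and protobuf parsing: Jackson reads the "type" property declared above and instantiates the matching subtype, falling back to StringInputRowParser when "type" is absent. A minimal deserialization sketch (the helper class name is mine; the "descriptor", "timestampSpec" and "dimensions" property names come from ProtoBufInputRowParser further down):

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.indexer.data.ByteBufferInputRowParser;

public class ParserSpecSketch
{
  // Resolve a concrete parser from a JSON spec such as
  // {"type": "protobuf", "descriptor": "prototest.desc", "timestampSpec": {...}, "dimensions": [...]}
  public static ByteBufferInputRowParser fromSpec(ObjectMapper mapper, String json) throws IOException
  {
    // Jackson consults @JsonTypeInfo/@JsonSubTypes on the interface to pick the implementation class.
    return mapper.readValue(json, ByteBufferInputRowParser.class);
  }
}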

View File

@@ -1,8 +1,15 @@
package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.common.exception.FormattedException;
import com.metamx.druid.input.InputRow;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
@JsonSubTypes({
@JsonSubTypes.Type(name = "string", value = StringInputRowParser.class),
@JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class)
})
public interface InputRowParser<T>
{
public InputRow parse(T input) throws FormattedException;

View File

@@ -2,6 +2,7 @@ package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.exception.FormattedException;
@@ -16,18 +17,20 @@ import java.util.Set;
public class MapInputRowParser implements InputRowParser<Map<String, Object>>
{
private final TimestampSpec timestampSpec;
private final DataSpec dataSpec;
private List<String> dimensions;
private final Set<String> dimensionExclusions;
@JsonCreator
public MapInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions
)
{
this.timestampSpec = timestampSpec;
this.dataSpec = dataSpec;
if (dimensions != null) {
this.dimensions = ImmutableList.copyOf(dimensions);
}
this.dimensionExclusions = Sets.newHashSet();
if (dimensionExclusions != null) {
for (String dimensionExclusion : dimensionExclusions) {
@@ -40,8 +43,8 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
@Override
public InputRow parse(Map<String, Object> theMap) throws FormattedException
{
final List<String> dimensions = dataSpec.hasCustomDimensions()
? dataSpec.getDimensions()
final List<String> dimensions = hasCustomDimensions()
? this.dimensions
: Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions));
final DateTime timestamp;
@@ -67,6 +70,10 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap);
}
private boolean hasCustomDimensions() {
return dimensions != null;
}
@Override
public void addDimensionExclusion(String dimension)
{
@@ -79,10 +86,10 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
return timestampSpec;
}
@JsonProperty("data")
public DataSpec getDataSpec()
@JsonProperty
public List<String> getDimensions()
{
return dataSpec;
return dimensions;
}
@JsonProperty

View File

@@ -0,0 +1,108 @@
package com.metamx.druid.indexer.data;
import static com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import static com.google.protobuf.Descriptors.Descriptor;
import static com.google.protobuf.Descriptors.FileDescriptor;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.metamx.common.exception.FormattedException;
import com.metamx.druid.input.InputRow;
public class ProtoBufInputRowParser implements ByteBufferInputRowParser
{
private final MapInputRowParser inputRowCreator;
private final Descriptor descriptor;
@JsonCreator
public ProtoBufInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions,
@JsonProperty("descriptor") String descriptorFileInClasspath)
{
descriptor = getDescriptor(descriptorFileInClasspath);
inputRowCreator = new MapInputRowParser(timestampSpec, dimensions, dimensionExclusions);
}
@Override
public InputRow parse(ByteBuffer input) throws FormattedException
{
// TODO there should be a ProtoBufBasedInputRow that does not need an intermediate map but accesses
// the DynamicMessage directly
Map<String, Object> theMap = buildStringKeyMap(input);
return inputRowCreator.parse(theMap);
}
private Map<String, Object> buildStringKeyMap(ByteBuffer input)
{
Map<String, Object> theMap = Maps.newHashMap();
try
{
DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
Map<Descriptors.FieldDescriptor, Object> allFields = message.getAllFields();
for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : allFields.entrySet())
{
String name = entry.getKey().getName();
if (theMap.containsKey(name))
{
continue;
// TODO
// throw new RuntimeException("duplicate key " + name + " in " +
// message);
}
Object value = entry.getValue();
if(value instanceof Descriptors.EnumValueDescriptor) {
Descriptors.EnumValueDescriptor desc = (Descriptors.EnumValueDescriptor) value;
value = desc.getName();
}
theMap.put(name, value);
}
} catch (InvalidProtocolBufferException e)
{
// TODO
e.printStackTrace();
}
return theMap;
}
private Descriptor getDescriptor(String descriptorFileInClassPath)
{
try
{
InputStream fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFileInClassPath);
FileDescriptorSet set = FileDescriptorSet.parseFrom(fin);
FileDescriptor file = FileDescriptor.buildFrom(set.getFile(0), new FileDescriptor[]
{});
return file.getMessageTypes().get(0);
} catch (Exception e)
{
throw Throwables.propagate(e);
}
}
@Override
public void addDimensionExclusion(String dimension)
{
inputRowCreator.addDimensionExclusion(dimension);
}
}

View File

@@ -19,49 +19,106 @@
package com.metamx.druid.indexer.data;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Charsets;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ToLowerCaseParser;
import com.metamx.druid.input.InputRow;
import java.util.List;
import java.util.Map;
/**
*/
public class StringInputRowParser implements InputRowParser<String>
public class StringInputRowParser implements ByteBufferInputRowParser
{
private final InputRowParser<Map<String, Object>> inputRowCreator;
private final Parser<String, Object> parser;
private final InputRowParser<Map<String, Object>> inputRowCreator;
private final Parser<String, Object> parser;
@JsonCreator
public StringInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions
)
{
this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec, dimensionExclusions);
this.parser = new ToLowerCaseParser(dataSpec.getParser());
}
private CharBuffer chars = null;
public void addDimensionExclusion(String dimension)
{
inputRowCreator.addDimensionExclusion(dimension);
}
@JsonCreator
public StringInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions)
{
this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec.getDimensions(), dimensionExclusions);
this.parser = new ToLowerCaseParser(dataSpec.getParser());
}
@Override
public InputRow parse(String input) throws FormattedException
{
return inputRowCreator.parse(parser.parse(input));
}
public void addDimensionExclusion(String dimension)
{
inputRowCreator.addDimensionExclusion(dimension);
}
@JsonValue
public InputRowParser<Map<String, Object>> getInputRowCreator()
{
return inputRowCreator;
}
@Override
public InputRow parse(ByteBuffer input) throws FormattedException
{
return parseMap(buildStringKeyMap(input));
}
private Map<String, Object> buildStringKeyMap(ByteBuffer input)
{
int payloadSize = input.remaining();
if (chars == null || chars.remaining() < payloadSize)
{
chars = CharBuffer.allocate(payloadSize);
}
final CoderResult coderResult = Charsets.UTF_8.newDecoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE)
.decode(input, chars, true);
Map<String, Object> theMap;
if (coderResult.isUnderflow())
{
chars.flip();
try
{
theMap = parseString(chars.toString());
} finally
{
chars.clear();
}
}
else
{
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Failed with CoderResult[%s]", coderResult))
.build();
}
return theMap;
}
private Map<String, Object> parseString(String inputString)
{
return parser.parse(inputString);
}
public InputRow parse(String input) throws FormattedException
{
return parseMap(parseString(input));
}
private InputRow parseMap(Map<String, Object> theMap)
{
return inputRowCreator.parse(theMap);
}
@JsonValue
public InputRowParser<Map<String, Object>> getInputRowCreator()
{
return inputRowCreator;
}
}
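
With the ByteBuffer entry point, the UTF-8 decoding that used to live in the Kafka firehose now happens inside the parser itself (malformed input is replaced rather than rejected). A small sketch of feeding raw bytes through it; the helper class and the sample payload are mine, and the parser is assumed to have been built from a DataSpec as in the constructor above:

import java.nio.ByteBuffer;
import com.google.common.base.Charsets;
import com.metamx.common.exception.FormattedException;
import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow;

public class StringParserSketch
{
  // Hand a raw UTF-8 payload (e.g. a Kafka message body) to the new ByteBuffer overload.
  public static InputRow parseUtf8(StringInputRowParser parser, String line) throws FormattedException
  {
    ByteBuffer payload = ByteBuffer.wrap(line.getBytes(Charsets.UTF_8));
    // Internally this decodes into a reused CharBuffer, lowercases keys via ToLowerCaseParser,
    // and then delegates to the same map-based parsing as the String overload.
    return parser.parse(payload);
  }
}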

View File

@@ -0,0 +1,83 @@
package com.metamx.druid.indexer.data;
import static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import com.google.protobuf.ByteString;
import org.joda.time.DateTime;
import org.junit.Test;
import com.metamx.druid.input.InputRow;
public class ProtoBufInputRowParserTest {
public static final String[] DIMENSIONS = new String[]{"eventType", "id", "someOtherId", "isValid"};
/*
required EventCategory eventType = 1;
required uint64 id = 2;
required string timestamp = 3;
optional uint32 someOtherId = 4;
optional bool isValid = 5;
optional string description = 6;
optional float someFloatColumn = 7;
optional uint32 someIntColumn = 8;
optional uint64 someLongColumn = 9;
*/
@Test
public void testParse() throws Exception {
//configure parser with desc file
ProtoBufInputRowParser parser = new ProtoBufInputRowParser(new TimestampSpec("timestamp", "iso"),
Arrays.asList(DIMENSIONS), Arrays.<String>asList(), "prototest.desc");
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 07, 12, 9, 30);
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
InputRow row = parser.parse(ByteBuffer.wrap(out.toByteArray()));
System.out.println(row);
assertEquals(Arrays.asList(DIMENSIONS), row.getDimensions());
assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "id", "4711");
assertDimensionEquals(row, "isValid", "true");
assertDimensionEquals(row, "someOtherId", "4712");
assertDimensionEquals(row, "description", "description");
assertDimensionEquals(row, "eventType", CATEGORY_ONE.name());
assertEquals(47.11F, row.getFloatMetric("someFloatColumn"), 0.0);
assertEquals(815.0F, row.getFloatMetric("someIntColumn"), 0.0);
assertEquals(816.0F, row.getFloatMetric("someLongColumn"), 0.0);
}
private void assertDimensionEquals(InputRow row, String dimension, Object expected) {
List<String> values = row.getDimension(dimension);
assertEquals(1, values.size());
assertEquals(expected, values.get(0));
}
}

View File

@@ -0,0 +1,31 @@
package prototest;
option java_package = "com.metamx.druid.indexer.data";
option java_outer_classname = "ProtoTestEventWrapper";
message ProtoTestEvent {
enum EventCategory {
CATEGORY_ZERO = 0;
CATEGORY_ONE = 1;
CATEGORY_TWO = 2;
}
required EventCategory eventType = 1;
required uint64 id = 2;
required string timestamp = 3;
optional uint32 someOtherId = 4;
optional bool isValid = 5;
optional string description = 6;
optional float someFloatColumn = 7;
optional uint32 someIntColumn = 8;
optional uint64 someLongColumn = 9;
}
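
The parser does not read this .proto directly; it loads a serialized FileDescriptorSet (the binary prototest.desc shown as "Binary file not shown." below) from the classpath. Such a descriptor file would typically be generated with protoc; a sketch, assuming the .proto sits in the working directory:

protoc --include_imports --descriptor_set_out=prototest.desc prototest.proto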

Binary file not shown.

View File

@@ -325,6 +325,12 @@
<artifactId>lz4</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.4.0a</version>
</dependency>
<!-- Test Scope -->

View File

@@ -19,14 +19,14 @@
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.metamx.druid.indexer.data.ByteBufferInputRowParser;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
@@ -34,131 +34,123 @@ import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import java.io.IOException;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.InputRowParser;
import com.metamx.druid.input.InputRow;
/**
*/
public class KafkaFirehoseFactory implements FirehoseFactory
{
private static final Logger log = new Logger(KafkaFirehoseFactory.class);
private static final Logger log = new Logger(KafkaFirehoseFactory.class);
@JsonProperty
private final Properties consumerProps;
@JsonProperty
private final Properties consumerProps;
@JsonProperty
private final String feed;
@JsonProperty
private final String feed;
@JsonProperty
private final StringInputRowParser parser;
@JsonProperty
private final ByteBufferInputRowParser parser;
@JsonCreator
public KafkaFirehoseFactory(
@JsonProperty("consumerProps") Properties consumerProps,
@JsonProperty("feed") String feed,
@JsonProperty("parser") StringInputRowParser parser
)
{
this.consumerProps = consumerProps;
this.feed = feed;
this.parser = parser;
@JsonCreator
public KafkaFirehoseFactory(
@JsonProperty("consumerProps") Properties consumerProps,
@JsonProperty("feed") String feed,
@JsonProperty("parser") ByteBufferInputRowParser parser)
{
this.consumerProps = consumerProps;
this.feed = feed;
this.parser = parser;
parser.addDimensionExclusion("feed");
}
parser.addDimensionExclusion("feed");
}
@Override
public Firehose connect() throws IOException
{
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
@Override
public Firehose connect() throws IOException
{
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
final List<KafkaStream<Message>> streamList = streams.get(feed);
if (streamList == null || streamList.size() != 1) {
return null;
}
final List<KafkaStream<Message>> streamList = streams.get(feed);
if (streamList == null || streamList.size() != 1)
{
return null;
}
final KafkaStream<Message> stream = streamList.get(0);
final KafkaStream<Message> stream = streamList.get(0);
return new Firehose()
{
Iterator<MessageAndMetadata<Message>> iter = stream.iterator();
private CharBuffer chars = null;
return new DefaultFirehose(connector, stream, parser);
}
@Override
public boolean hasMore()
{
return iter.hasNext();
}
private static class DefaultFirehose implements Firehose
{
private final ConsumerConnector connector;
private final Iterator<MessageAndMetadata<Message>> iter;
private final InputRowParser<ByteBuffer> parser;
@Override
public InputRow nextRow() throws FormattedException
{
final Message message = iter.next().message();
public DefaultFirehose(ConsumerConnector connector, KafkaStream<Message> stream, InputRowParser<ByteBuffer> parser)
{
iter = stream.iterator();
this.connector = connector;
this.parser = parser;
}
if (message == null) {
return null;
}
@Override
public boolean hasMore()
{
return iter.hasNext();
}
int payloadSize = message.payloadSize();
if (chars == null || chars.remaining() < payloadSize) {
chars = CharBuffer.allocate(payloadSize);
}
@Override
public InputRow nextRow() throws FormattedException
{
final Message message = iter.next().message();
final CoderResult coderResult = Charsets.UTF_8.newDecoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE)
.decode(message.payload(), chars, true);
if (message == null)
{
return null;
}
if (coderResult.isUnderflow()) {
chars.flip();
try {
return parser.parse(chars.toString());
}
finally {
chars.clear();
}
}
else {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Failed with CoderResult[%s]", coderResult))
.build();
}
}
return parseMessage(message);
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
/*
This is actually not going to do exactly what we want, cause it will be called asynchronously
after the persist is complete. So, it's going to commit that it's processed more than was actually
persisted. This is unfortunate, but good enough for now. Should revisit along with an upgrade
of our Kafka version.
*/
public InputRow parseMessage(Message message) throws FormattedException
{
return parser.parse(message.payload());
}
log.info("committing offsets");
connector.commitOffsets();
}
};
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
/*
* This is actually not going to do exactly what we want, cause it
* will be called asynchronously after the persist is complete. So,
* it's going to commit that it's processed more than was actually
* persisted. This is unfortunate, but good enough for now. Should
* revisit along with an upgrade of our Kafka version.
*/
@Override
public void close() throws IOException
{
connector.shutdown();
}
};
}
log.info("committing offsets");
connector.commitOffsets();
}
};
}
@Override
public void close() throws IOException
{
connector.shutdown();
}
}
}
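
Putting the pieces together, the factory can now be wired with either parser type. A rough usage sketch, assuming Kafka 0.7-style consumer property names ("zk.connect", "groupid"), a feed name of "prototest", and the descriptor file from the test above; none of these values come from the patch itself:

import java.util.Arrays;
import java.util.Properties;

import com.metamx.druid.indexer.data.ProtoBufInputRowParser;
import com.metamx.druid.indexer.data.TimestampSpec;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.KafkaFirehoseFactory;

public class KafkaProtoBufSketch
{
  public static void main(String[] args) throws Exception
  {
    // Kafka 0.7-era consumer settings; the property names are assumptions, adjust for your broker setup.
    Properties consumerProps = new Properties();
    consumerProps.setProperty("zk.connect", "localhost:2181");
    consumerProps.setProperty("groupid", "druid-example");

    ProtoBufInputRowParser parser = new ProtoBufInputRowParser(
        new TimestampSpec("timestamp", "iso"),
        Arrays.asList("eventType", "id", "someOtherId", "isValid"),
        Arrays.<String>asList(),
        "prototest.desc"
    );

    Firehose firehose = new KafkaFirehoseFactory(consumerProps, "prototest", parser).connect();
    try {
      while (firehose.hasMore()) {
        // Each Kafka message payload is parsed straight from its ByteBuffer by the configured parser.
        InputRow row = firehose.nextRow();
        if (row == null) {
          continue;
        }
        System.out.println(row.getTimestampFromEpoch() + " " + row.getDimensions());
      }
    } finally {
      firehose.close();
    }
  }
}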

View File

@@ -10,9 +10,6 @@ import org.junit.Test;
import java.util.Arrays;
/**
* @author jan.rudert
*/
public class DataSegmentPusherUtilTest {
@Test
public void shouldNotHaveColonsInHdfsStorageDir() throws Exception {