diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufInputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufInputRowParser.java index 38ee99663e4..7e7b7494083 100644 --- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufInputRowParser.java +++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufInputRowParser.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Throwables; import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; @@ -20,7 +21,7 @@ import com.metamx.druid.input.InputRow; /** * @author jan.rudert */ -public class ProtoBufInputRowParser implements InputRowParser +public class ProtoBufInputRowParser implements InputRowParser { private final MapInputRowParser inputRowCreator; @@ -41,7 +42,7 @@ public class ProtoBufInputRowParser implements InputRowParser } @Override - public InputRow parse(byte[] input) + public InputRow parse(ByteString input) { Map theMap = buildStringKeyMap(input); @@ -49,7 +50,7 @@ public class ProtoBufInputRowParser implements InputRowParser return inputRowCreator.parse(theMap); } - private Map buildStringKeyMap(byte[] input) + private Map buildStringKeyMap(ByteString input) { Map theMap = Maps.newHashMap(); @@ -60,7 +61,7 @@ public class ProtoBufInputRowParser implements InputRowParser for (Map.Entry entry : allFields.entrySet()) { - String name = entry.getKey().getName(); + String name = entry.getKey().getName().toLowerCase(); if (theMap.containsKey(name)) { continue; diff --git a/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoBufInputRowParserTest.java b/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoBufInputRowParserTest.java index c6116af3946..f17d9cf9749 100644 --- a/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoBufInputRowParserTest.java +++ b/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoBufInputRowParserTest.java @@ -7,6 +7,7 @@ import java.io.ByteArrayOutputStream; import java.util.Arrays; import java.util.List; +import com.google.protobuf.ByteString; import org.joda.time.DateTime; import org.junit.Test; @@ -59,7 +60,7 @@ public class ProtoBufInputRowParserTest { ByteArrayOutputStream out = new ByteArrayOutputStream(); event.writeTo(out); - InputRow row = parser.parse(out.toByteArray()); + InputRow row = parser.parse(ByteString.copyFrom(out.toByteArray())); System.out.println(row); assertEquals(Arrays.asList(DIMENSIONS), row.getDimensions()); assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java index 3c6d528d6a6..514cb1025b2 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import com.google.protobuf.ByteString; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; @@ -212,8 +213,6 @@ public class KafkaFirehoseFactory implements FirehoseFactory private class ProtoBufMessageFirehose extends AbstractKafkaFirehose { - private ByteBuffer bytes = null; - public ProtoBufMessageFirehose(KafkaStream stream, ConsumerConnector connector) { super(connector, stream); @@ -222,23 +221,7 @@ public class KafkaFirehoseFactory implements FirehoseFactory @Override public InputRow parseMessage(Message message) throws FormattedException { - - int payloadSize = message.payloadSize(); - if (bytes == null || bytes.remaining() < payloadSize) - { - bytes = ByteBuffer.allocate(payloadSize); - } - - bytes.put(message.payload()); - - bytes.flip(); - try - { - return parser.parse(bytes); - } finally - { - bytes.clear(); - } + return parser.parse(ByteString.copyFrom(message.payload())); } }