diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index cbd7b144e01..50feb9274c9 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -79,6 +79,11 @@
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+        </dependency>
+
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/ByteBufferInputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/ByteBufferInputRowParser.java
new file mode 100644
index 00000000000..bc3721bf719
--- /dev/null
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/ByteBufferInputRowParser.java
@@ -0,0 +1,14 @@
+package com.metamx.druid.indexer.data;
+
+import java.nio.ByteBuffer;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class),
+ @JsonSubTypes.Type(name = "string", value = StringInputRowParser.class)
+})
+public interface ByteBufferInputRowParser extends InputRowParser<ByteBuffer> {
+}
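
For context, a minimal sketch of an implementation of this interface. This is illustrative only, not the PR's actual ProtoBufInputRowParser (whose source is not shown in this diff): a hypothetical parser that decodes the raw payload as UTF-8 and delegates to an existing StringInputRowParser. It assumes InputRowParser also declares addDimensionExclusion(String), as the firehose's call to parser.addDimensionExclusion("feed") further down suggests.

    package com.metamx.druid.indexer.data;

    import java.nio.ByteBuffer;

    import com.google.common.base.Charsets;
    import com.metamx.common.exception.FormattedException;
    import com.metamx.druid.input.InputRow;

    // Hypothetical example, not part of this diff.
    public class Utf8DelegatingRowParser implements ByteBufferInputRowParser
    {
      private final StringInputRowParser delegate;

      public Utf8DelegatingRowParser(StringInputRowParser delegate)
      {
        this.delegate = delegate;
      }

      @Override
      public InputRow parse(ByteBuffer input) throws FormattedException
      {
        // Charset.decode() substitutes malformed sequences instead of throwing,
        // mirroring the REPLACE-on-error decoding this diff removes from the firehose.
        return delegate.parse(Charsets.UTF_8.decode(input).toString());
      }

      @Override
      public void addDimensionExclusion(String dimension)
      {
        delegate.addDimensionExclusion(dimension);
      }
    }
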
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java
index 231cbd44102..960ac8719a7 100644
--- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java
@@ -1,8 +1,15 @@
package com.metamx.druid.indexer.data;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.common.exception.FormattedException;
import com.metamx.druid.input.InputRow;
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
+@JsonSubTypes({
+ @JsonSubTypes.Type(name = "string", value = StringInputRowParser.class),
+ @JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class)
+})
public interface InputRowParser<T>
{
public InputRow parse(T input) throws FormattedException;
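
Both interfaces now carry the same registration, so Jackson resolves the concrete parser from the "type" field of a JSON spec and falls back to StringInputRowParser when the field is absent. A self-contained sketch of that mechanism, using hypothetical stand-in classes rather than the Druid parsers:

    import com.fasterxml.jackson.annotation.JsonSubTypes;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class TypeResolutionDemo
    {
      @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = PlainParser.class)
      @JsonSubTypes({
          @JsonSubTypes.Type(name = "plain", value = PlainParser.class),
          @JsonSubTypes.Type(name = "reversed", value = ReversedParser.class)
      })
      interface DemoParser
      {
        String parse(String input);
      }

      static class PlainParser implements DemoParser
      {
        @Override
        public String parse(String input)
        {
          return input;
        }
      }

      static class ReversedParser implements DemoParser
      {
        @Override
        public String parse(String input)
        {
          return new StringBuilder(input).reverse().toString();
        }
      }

      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();

        // "type" selects the subtype registered under that name ...
        DemoParser reversed = mapper.readValue("{\"type\": \"reversed\"}", DemoParser.class);
        System.out.println(reversed.parse("druid")); // diurd

        // ... and a spec without "type" falls back to defaultImpl.
        DemoParser fallback = mapper.readValue("{}", DemoParser.class);
        System.out.println(fallback.parse("druid")); // druid
      }
    }
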
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java
index b2d9586f272..b39b70471fa 100644
--- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java
@@ -2,6 +2,7 @@ package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.exception.FormattedException;
@@ -16,18 +17,20 @@ import java.util.Set;
public class MapInputRowParser implements InputRowParser
+
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>2.4.0a</version>
+        </dependency>
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java
index 1c189ed95e9..01debf2a0a2 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java
@@ -19,14 +19,14 @@
package com.metamx.druid.realtime.firehose;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.metamx.common.exception.FormattedException;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.indexer.data.StringInputRowParser;
-import com.metamx.druid.input.InputRow;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.metamx.druid.indexer.data.ByteBufferInputRowParser;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
@@ -34,131 +34,123 @@ import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
-import java.io.IOException;
-import java.nio.CharBuffer;
-import java.nio.charset.CoderResult;
-import java.nio.charset.CodingErrorAction;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableMap;
+import com.metamx.common.exception.FormattedException;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.indexer.data.InputRowParser;
+import com.metamx.druid.input.InputRow;
/**
*/
public class KafkaFirehoseFactory implements FirehoseFactory
{
- private static final Logger log = new Logger(KafkaFirehoseFactory.class);
+ private static final Logger log = new Logger(KafkaFirehoseFactory.class);
- @JsonProperty
- private final Properties consumerProps;
+ @JsonProperty
+ private final Properties consumerProps;
- @JsonProperty
- private final String feed;
+ @JsonProperty
+ private final String feed;
- @JsonProperty
- private final StringInputRowParser parser;
+ @JsonProperty
+ private final ByteBufferInputRowParser parser;
- @JsonCreator
- public KafkaFirehoseFactory(
- @JsonProperty("consumerProps") Properties consumerProps,
- @JsonProperty("feed") String feed,
- @JsonProperty("parser") StringInputRowParser parser
- )
- {
- this.consumerProps = consumerProps;
- this.feed = feed;
- this.parser = parser;
+ @JsonCreator
+ public KafkaFirehoseFactory(
+ @JsonProperty("consumerProps") Properties consumerProps,
+ @JsonProperty("feed") String feed,
+ @JsonProperty("parser") ByteBufferInputRowParser parser)
+ {
+ this.consumerProps = consumerProps;
+ this.feed = feed;
+ this.parser = parser;
- parser.addDimensionExclusion("feed");
- }
+ parser.addDimensionExclusion("feed");
+ }
- @Override
- public Firehose connect() throws IOException
- {
- final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
+ @Override
+ public Firehose connect() throws IOException
+ {
+ final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
- final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
+ final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
- final List<KafkaStream<Message>> streamList = streams.get(feed);
- if (streamList == null || streamList.size() != 1) {
- return null;
- }
+ final List<KafkaStream<Message>> streamList = streams.get(feed);
+ if (streamList == null || streamList.size() != 1)
+ {
+ return null;
+ }
- final KafkaStream<Message> stream = streamList.get(0);
+ final KafkaStream<Message> stream = streamList.get(0);
- return new Firehose()
- {
- Iterator<MessageAndMetadata<Message>> iter = stream.iterator();
- private CharBuffer chars = null;
+ return new DefaultFirehose(connector, stream, parser);
+ }
- @Override
- public boolean hasMore()
- {
- return iter.hasNext();
- }
+ private static class DefaultFirehose implements Firehose
+ {
+ private final ConsumerConnector connector;
+ private final Iterator<MessageAndMetadata<Message>> iter;
+ private final InputRowParser<ByteBuffer> parser;
- @Override
- public InputRow nextRow() throws FormattedException
- {
- final Message message = iter.next().message();
+ public DefaultFirehose(ConsumerConnector connector, KafkaStream<Message> stream, InputRowParser<ByteBuffer> parser)
+ {
+ iter = stream.iterator();
+ this.connector = connector;
+ this.parser = parser;
+ }
- if (message == null) {
- return null;
- }
+ @Override
+ public boolean hasMore()
+ {
+ return iter.hasNext();
+ }
- int payloadSize = message.payloadSize();
- if (chars == null || chars.remaining() < payloadSize) {
- chars = CharBuffer.allocate(payloadSize);
- }
+ @Override
+ public InputRow nextRow() throws FormattedException
+ {
+ final Message message = iter.next().message();
- final CoderResult coderResult = Charsets.UTF_8.newDecoder()
- .onMalformedInput(CodingErrorAction.REPLACE)
- .onUnmappableCharacter(CodingErrorAction.REPLACE)
- .decode(message.payload(), chars, true);
+ if (message == null)
+ {
+ return null;
+ }
- if (coderResult.isUnderflow()) {
- chars.flip();
- try {
- return parser.parse(chars.toString());
- }
- finally {
- chars.clear();
- }
- }
- else {
- throw new FormattedException.Builder()
- .withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
- .withMessage(String.format("Failed with CoderResult[%s]", coderResult))
- .build();
- }
- }
+ return parseMessage(message);
+ }
- @Override
- public Runnable commit()
- {
- return new Runnable()
- {
- @Override
- public void run()
- {
- /*
- This is actually not going to do exactly what we want, cause it will be called asynchronously
- after the persist is complete. So, it's going to commit that it's processed more than was actually
- persisted. This is unfortunate, but good enough for now. Should revisit along with an upgrade
- of our Kafka version.
- */
+ public InputRow parseMessage(Message message) throws FormattedException
+ {
+ return parser.parse(message.payload());
+ }
- log.info("committing offsets");
- connector.commitOffsets();
- }
- };
- }
+ @Override
+ public Runnable commit()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ /*
+ * This is actually not going to do exactly what we want, because it
+ * will be called asynchronously after the persist is complete. So,
+ * it's going to commit that it's processed more than was actually
+ * persisted. This is unfortunate, but good enough for now. Should
+ * revisit along with an upgrade of our Kafka version.
+ */
- @Override
- public void close() throws IOException
- {
- connector.shutdown();
- }
- };
- }
+ log.info("committing offsets");
+ connector.commitOffsets();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ connector.shutdown();
+ }
+ }
}
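
A sketch of wiring the refactored factory by hand (in practice Jackson constructs it from a JSON spec). The consumer property names follow the Kafka 0.7-style API this class imports; the values, feed name, and parser are placeholders:

    import java.io.IOException;
    import java.util.Properties;

    import com.metamx.druid.indexer.data.ByteBufferInputRowParser;
    import com.metamx.druid.input.InputRow;
    import com.metamx.druid.realtime.firehose.Firehose;
    import com.metamx.druid.realtime.firehose.FirehoseFactory;
    import com.metamx.druid.realtime.firehose.KafkaFirehoseFactory;

    public class KafkaFirehoseExample
    {
      // Hypothetical wiring; any ByteBufferInputRowParser implementation will do.
      public static void consume(ByteBufferInputRowParser parser) throws IOException
      {
        Properties consumerProps = new Properties();
        consumerProps.setProperty("zk.connect", "localhost:2181"); // placeholder ZooKeeper address
        consumerProps.setProperty("groupid", "druid-example");     // placeholder consumer group

        FirehoseFactory factory = new KafkaFirehoseFactory(consumerProps, "example-feed", parser);
        Firehose firehose = factory.connect();
        try {
          while (firehose.hasMore()) {
            // nextRow() now hands the raw message payload to parser.parse(ByteBuffer)
            InputRow row = firehose.nextRow();
            // hand row off to ingestion ...
          }
        }
        finally {
          firehose.close();
        }
      }
    }
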
diff --git a/server/src/test/java/com/metamx/druid/loading/DataSegmentPusherUtilTest.java b/server/src/test/java/com/metamx/druid/loading/DataSegmentPusherUtilTest.java
index f367fceab47..2337f882098 100644
--- a/server/src/test/java/com/metamx/druid/loading/DataSegmentPusherUtilTest.java
+++ b/server/src/test/java/com/metamx/druid/loading/DataSegmentPusherUtilTest.java
@@ -10,9 +10,6 @@ import org.junit.Test;
import java.util.Arrays;
-/**
- * @author jan.rudert
- */
public class DataSegmentPusherUtilTest {
@Test
public void shouldNotHaveColonsInHdfsStorageDir() throws Exception {