diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index fd727328c49..d09ea33bd64 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -79,6 +79,12 @@
commons-io
commons-io
+
+ com.google.protobuf
+ protobuf-java
+ 2.5.0
+
+
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/DataSpec.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/DataSpec.java
index 099d0c8d535..36d942d8d62 100644
--- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/DataSpec.java
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/DataSpec.java
@@ -32,7 +32,8 @@ import java.util.List;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "json", value = JSONDataSpec.class),
@JsonSubTypes.Type(name = "csv", value = CSVDataSpec.class),
- @JsonSubTypes.Type(name = "tsv", value = DelimitedDataSpec.class)
+ @JsonSubTypes.Type(name = "tsv", value = DelimitedDataSpec.class),
+ @JsonSubTypes.Type(name = "protobuf", value = ProtoBufDataSpec.class)
})
public interface DataSpec
{
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java
index 82a3c947bc3..25957379d9a 100644
--- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java
@@ -1,7 +1,14 @@
package com.metamx.druid.indexer.data;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.druid.input.InputRow;
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
+@JsonSubTypes({
+ @JsonSubTypes.Type(name = "string", value = StringInputRowParser.class),
+ @JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class),
+})
public interface InputRowParser
{
public InputRow parse(T input);
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufDataSpec.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufDataSpec.java
new file mode 100644
index 00000000000..042b0603061
--- /dev/null
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufDataSpec.java
@@ -0,0 +1,69 @@
+package com.metamx.druid.indexer.data;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import com.metamx.common.parsers.Parser;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;
+
+/**
+ * @author jan.rudert
+ */
+public class ProtoBufDataSpec implements DataSpec{
+ private final List dimensions;
+ private final List spatialDimensions;
+ private final String descriptorFileInClasspath;
+
+ @JsonCreator
+ public ProtoBufDataSpec(
+ @JsonProperty("descriptor") String descriptorFileInClasspath,
+ @JsonProperty("dimensions") List dimensions,
+ @JsonProperty("spatialDimensions") List spatialDimensions
+ )
+ {
+ this.descriptorFileInClasspath = descriptorFileInClasspath;
+ this.dimensions = dimensions;
+ this.spatialDimensions = (spatialDimensions == null)
+ ? Lists.newArrayList()
+ : spatialDimensions;
+
+ }
+
+ @JsonProperty("descriptor")
+ public String getDescriptorFileInClassPath() {
+ return descriptorFileInClasspath;
+ }
+
+ @JsonProperty("dimensions")
+ @Override
+ public List getDimensions()
+ {
+ return dimensions;
+ }
+
+ @JsonProperty("spatialDimensions")
+ @Override
+ public List getSpatialDimensions()
+ {
+ return spatialDimensions;
+ }
+
+ @Override
+ public void verify(List usedCols)
+ {
+ }
+
+ @Override
+ public boolean hasCustomDimensions()
+ {
+ return !(dimensions == null || dimensions.isEmpty());
+ }
+
+ @Override
+ public Parser getParser()
+ {
+ throw new UnsupportedOperationException("No String parser for protobuf data");
+ }
+}
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufInputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufInputRowParser.java
new file mode 100644
index 00000000000..38ee99663e4
--- /dev/null
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufInputRowParser.java
@@ -0,0 +1,108 @@
+package com.metamx.druid.indexer.data;
+
+import static com.google.protobuf.DescriptorProtos.FileDescriptorSet;
+import static com.google.protobuf.Descriptors.Descriptor;
+import static com.google.protobuf.Descriptors.FileDescriptor;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.metamx.druid.input.InputRow;
+
+/**
+ * @author jan.rudert
+ */
+public class ProtoBufInputRowParser implements InputRowParser
+{
+
+ private final MapInputRowParser inputRowCreator;
+ private final Descriptor descriptor;
+
+ @JsonCreator
+ public ProtoBufInputRowParser(
+ @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
+ @JsonProperty("data") ProtoBufDataSpec dataSpec,
+ @JsonProperty("dimensionExclusions") List dimensionExclusions)
+ {
+
+ descriptor = getDescriptor(dataSpec.getDescriptorFileInClassPath());
+
+
+ this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec, dimensionExclusions);
+
+ }
+
+ @Override
+ public InputRow parse(byte[] input)
+ {
+
+ Map theMap = buildStringKeyMap(input);
+
+ return inputRowCreator.parse(theMap);
+ }
+
+ private Map buildStringKeyMap(byte[] input)
+ {
+ Map theMap = Maps.newHashMap();
+
+ try
+ {
+ DynamicMessage message = DynamicMessage.parseFrom(descriptor, input);
+ Map allFields = message.getAllFields();
+
+ for (Map.Entry entry : allFields.entrySet())
+ {
+ String name = entry.getKey().getName();
+ if (theMap.containsKey(name))
+ {
+ continue;
+ // TODO
+ // throw new RuntimeException("dupicate key " + name + " in " +
+ // message);
+ }
+ Object value = entry.getValue();
+ if(value instanceof Descriptors.EnumValueDescriptor) {
+ Descriptors.EnumValueDescriptor desc = (Descriptors.EnumValueDescriptor) value;
+ value = desc.getName();
+ }
+
+ theMap.put(name, value);
+ }
+
+ } catch (InvalidProtocolBufferException e)
+ {
+ // TODO
+ e.printStackTrace();
+ }
+ return theMap;
+ }
+
+ private Descriptor getDescriptor(String descriptorFileInClassPath)
+ {
+ try
+ {
+ InputStream fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFileInClassPath);
+ FileDescriptorSet set = FileDescriptorSet.parseFrom(fin);
+ FileDescriptor file = FileDescriptor.buildFrom(set.getFile(0), new FileDescriptor[]
+ {});
+ return file.getMessageTypes().get(0);
+ } catch (Exception e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void addDimensionExclusion(String dimension)
+ {
+ inputRowCreator.addDimensionExclusion(dimension);
+ }
+}
diff --git a/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoBufInputRowParserTest.java b/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoBufInputRowParserTest.java
new file mode 100644
index 00000000000..c6116af3946
--- /dev/null
+++ b/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoBufInputRowParserTest.java
@@ -0,0 +1,85 @@
+package com.metamx.druid.indexer.data;
+
+import static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import com.metamx.druid.input.InputRow;
+
+/**
+ * @author jan.rudert
+ */
+public class ProtoBufInputRowParserTest {
+
+ public static final String[] DIMENSIONS = new String[]{"eventType", "id", "someOtherId", "isValid"};
+
+ /*
+ eventType = 1;
+
+ required uint64 id = 2;
+ required string timestamp = 3;
+ optional uint32 someOtherId = 4;
+ optional bool isValid = 5;
+ optional string description = 6;
+
+ optional float someFloatColumn = 7;
+ optional uint32 someIntColumn = 8;
+ optional uint64 someLongColumn = 9;
+ */
+
+ @Test
+ public void testParse() throws Exception {
+
+ //configure pares with desc file
+ ProtoBufInputRowParser parser = new ProtoBufInputRowParser(new TimestampSpec("timestamp", "iso"),
+ new ProtoBufDataSpec("prototest.desc", Arrays.asList(DIMENSIONS), null),
+ Arrays.asList());
+
+
+ //create binary of proto test event
+ DateTime dateTime = new DateTime(2012, 07, 12, 9, 30);
+ ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
+ .setDescription("description")
+ .setEventType(CATEGORY_ONE)
+ .setId(4711L)
+ .setIsValid(true)
+ .setSomeOtherId(4712)
+ .setTimestamp(dateTime.toString())
+ .setSomeFloatColumn(47.11F)
+ .setSomeIntColumn(815)
+ .setSomeLongColumn(816L)
+ .build();
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ event.writeTo(out);
+
+ InputRow row = parser.parse(out.toByteArray());
+ System.out.println(row);
+ assertEquals(Arrays.asList(DIMENSIONS), row.getDimensions());
+ assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
+
+ assertDimensionEquals(row, "id", "4711");
+ assertDimensionEquals(row, "isValid", "true");
+ assertDimensionEquals(row, "someOtherId", "4712");
+ assertDimensionEquals(row, "description", "description");
+ assertDimensionEquals(row, "eventType", CATEGORY_ONE.name());
+
+
+ assertEquals(47.11F, row.getFloatMetric("someFloatColumn"), 0.0);
+ assertEquals(815.0F, row.getFloatMetric("someIntColumn"), 0.0);
+ assertEquals(816.0F, row.getFloatMetric("someLongColumn"), 0.0);
+
+ }
+
+ private void assertDimensionEquals(InputRow row, String dimension, Object expected) {
+ List values = row.getDimension(dimension);
+ assertEquals(1, values.size());
+ assertEquals(expected, values.get(0));
+ }
+}
diff --git a/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoTestEventWrapper.java b/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoTestEventWrapper.java
new file mode 100644
index 00000000000..307d8a93a9c
--- /dev/null
+++ b/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoTestEventWrapper.java
@@ -0,0 +1,1375 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: ProtoTest.proto
+
+package com.metamx.druid.indexer.data;
+
+public final class ProtoTestEventWrapper {
+ private ProtoTestEventWrapper() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ }
+ public interface ProtoTestEventOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // required .prototest.ProtoTestEvent.EventCategory eventType = 1;
+ /**
+ * required .prototest.ProtoTestEvent.EventCategory eventType = 1;
+ */
+ boolean hasEventType();
+ /**
+ * required .prototest.ProtoTestEvent.EventCategory eventType = 1;
+ */
+ com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory getEventType();
+
+ // required uint64 id = 2;
+ /**
+ * required uint64 id = 2;
+ */
+ boolean hasId();
+ /**
+ * required uint64 id = 2;
+ */
+ long getId();
+
+ // required string timestamp = 3;
+ /**
+ * required string timestamp = 3;
+ */
+ boolean hasTimestamp();
+ /**
+ * required string timestamp = 3;
+ */
+ java.lang.String getTimestamp();
+ /**
+ * required string timestamp = 3;
+ */
+ com.google.protobuf.ByteString
+ getTimestampBytes();
+
+ // optional uint32 someOtherId = 4;
+ /**
+ * optional uint32 someOtherId = 4;
+ */
+ boolean hasSomeOtherId();
+ /**
+ * optional uint32 someOtherId = 4;
+ */
+ int getSomeOtherId();
+
+ // optional bool isValid = 5;
+ /**
+ * optional bool isValid = 5;
+ */
+ boolean hasIsValid();
+ /**
+ * optional bool isValid = 5;
+ */
+ boolean getIsValid();
+
+ // optional string description = 6;
+ /**
+ * optional string description = 6;
+ */
+ boolean hasDescription();
+ /**
+ * optional string description = 6;
+ */
+ java.lang.String getDescription();
+ /**
+ * optional string description = 6;
+ */
+ com.google.protobuf.ByteString
+ getDescriptionBytes();
+
+ // optional float someFloatColumn = 7;
+ /**
+ * optional float someFloatColumn = 7;
+ */
+ boolean hasSomeFloatColumn();
+ /**
+ * optional float someFloatColumn = 7;
+ */
+ float getSomeFloatColumn();
+
+ // optional uint32 someIntColumn = 8;
+ /**
+ * optional uint32 someIntColumn = 8;
+ */
+ boolean hasSomeIntColumn();
+ /**
+ * optional uint32 someIntColumn = 8;
+ */
+ int getSomeIntColumn();
+
+ // optional uint64 someLongColumn = 9;
+ /**
+ * optional uint64 someLongColumn = 9;
+ */
+ boolean hasSomeLongColumn();
+ /**
+ * optional uint64 someLongColumn = 9;
+ */
+ long getSomeLongColumn();
+ }
+ /**
+ * Protobuf type {@code prototest.ProtoTestEvent}
+ */
+ public static final class ProtoTestEvent extends
+ com.google.protobuf.GeneratedMessage
+ implements ProtoTestEventOrBuilder {
+ // Use ProtoTestEvent.newBuilder() to construct.
+ private ProtoTestEvent(com.google.protobuf.GeneratedMessage.Builder> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private ProtoTestEvent(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final ProtoTestEvent defaultInstance;
+ public static ProtoTestEvent getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public ProtoTestEvent getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private ProtoTestEvent(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 8: {
+ int rawValue = input.readEnum();
+ com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory value = com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.valueOf(rawValue);
+ if (value == null) {
+ unknownFields.mergeVarintField(1, rawValue);
+ } else {
+ bitField0_ |= 0x00000001;
+ eventType_ = value;
+ }
+ break;
+ }
+ case 16: {
+ bitField0_ |= 0x00000002;
+ id_ = input.readUInt64();
+ break;
+ }
+ case 26: {
+ bitField0_ |= 0x00000004;
+ timestamp_ = input.readBytes();
+ break;
+ }
+ case 32: {
+ bitField0_ |= 0x00000008;
+ someOtherId_ = input.readUInt32();
+ break;
+ }
+ case 40: {
+ bitField0_ |= 0x00000010;
+ isValid_ = input.readBool();
+ break;
+ }
+ case 50: {
+ bitField0_ |= 0x00000020;
+ description_ = input.readBytes();
+ break;
+ }
+ case 61: {
+ bitField0_ |= 0x00000040;
+ someFloatColumn_ = input.readFloat();
+ break;
+ }
+ case 64: {
+ bitField0_ |= 0x00000080;
+ someIntColumn_ = input.readUInt32();
+ break;
+ }
+ case 72: {
+ bitField0_ |= 0x00000100;
+ someLongColumn_ = input.readUInt64();
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return com.metamx.druid.indexer.data.ProtoTestEventWrapper.internal_static_prototest_ProtoTestEvent_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return com.metamx.druid.indexer.data.ProtoTestEventWrapper.internal_static_prototest_ProtoTestEvent_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.class, com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.Builder.class);
+ }
+
+ public static com.google.protobuf.Parser PARSER =
+ new com.google.protobuf.AbstractParser() {
+ public ProtoTestEvent parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new ProtoTestEvent(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public com.google.protobuf.Parser getParserForType() {
+ return PARSER;
+ }
+
+ /**
+ * Protobuf enum {@code prototest.ProtoTestEvent.EventCategory}
+ */
+ public enum EventCategory
+ implements com.google.protobuf.ProtocolMessageEnum {
+ /**
+ * CATEGORY_ZERO = 0;
+ */
+ CATEGORY_ZERO(0, 0),
+ /**
+ * CATEGORY_ONE = 1;
+ */
+ CATEGORY_ONE(1, 1),
+ /**
+ * CATEGORY_TWO = 2;
+ */
+ CATEGORY_TWO(2, 2),
+ ;
+
+ /**
+ * CATEGORY_ZERO = 0;
+ */
+ public static final int CATEGORY_ZERO_VALUE = 0;
+ /**
+ * CATEGORY_ONE = 1;
+ */
+ public static final int CATEGORY_ONE_VALUE = 1;
+ /**
+ * CATEGORY_TWO = 2;
+ */
+ public static final int CATEGORY_TWO_VALUE = 2;
+
+
+ public final int getNumber() { return value; }
+
+ public static EventCategory valueOf(int value) {
+ switch (value) {
+ case 0: return CATEGORY_ZERO;
+ case 1: return CATEGORY_ONE;
+ case 2: return CATEGORY_TWO;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap() {
+ public EventCategory findValueByNumber(int number) {
+ return EventCategory.valueOf(number);
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(index);
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.getDescriptor().getEnumTypes().get(0);
+ }
+
+ private static final EventCategory[] VALUES = values();
+
+ public static EventCategory valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int index;
+ private final int value;
+
+ private EventCategory(int index, int value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:prototest.ProtoTestEvent.EventCategory)
+ }
+
+ private int bitField0_;
+ // required .prototest.ProtoTestEvent.EventCategory eventType = 1;
+ public static final int EVENTTYPE_FIELD_NUMBER = 1;
+ private com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory eventType_;
+ /**
+ * required .prototest.ProtoTestEvent.EventCategory eventType = 1;
+ */
+ public boolean hasEventType() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required .prototest.ProtoTestEvent.EventCategory eventType = 1;
+ */
+ public com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory getEventType() {
+ return eventType_;
+ }
+
+ // required uint64 id = 2;
+ public static final int ID_FIELD_NUMBER = 2;
+ private long id_;
+ /**
+ * required uint64 id = 2;
+ */
+ public boolean hasId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * required uint64 id = 2;
+ */
+ public long getId() {
+ return id_;
+ }
+
+ // required string timestamp = 3;
+ public static final int TIMESTAMP_FIELD_NUMBER = 3;
+ private java.lang.Object timestamp_;
+ /**
+ * required string timestamp = 3;
+ */
+ public boolean hasTimestamp() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * required string timestamp = 3;
+ */
+ public java.lang.String getTimestamp() {
+ java.lang.Object ref = timestamp_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ timestamp_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * required string timestamp = 3;
+ */
+ public com.google.protobuf.ByteString
+ getTimestampBytes() {
+ java.lang.Object ref = timestamp_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ timestamp_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // optional uint32 someOtherId = 4;
+ public static final int SOMEOTHERID_FIELD_NUMBER = 4;
+ private int someOtherId_;
+ /**
+ * optional uint32 someOtherId = 4;
+ */
+ public boolean hasSomeOtherId() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * optional uint32 someOtherId = 4;
+ */
+ public int getSomeOtherId() {
+ return someOtherId_;
+ }
+
+ // optional bool isValid = 5;
+ public static final int ISVALID_FIELD_NUMBER = 5;
+ private boolean isValid_;
+ /**
+ * optional bool isValid = 5;
+ */
+ public boolean hasIsValid() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * optional bool isValid = 5;
+ */
+ public boolean getIsValid() {
+ return isValid_;
+ }
+
+ // optional string description = 6;
+ public static final int DESCRIPTION_FIELD_NUMBER = 6;
+ private java.lang.Object description_;
+ /**
+ * optional string description = 6;
+ */
+ public boolean hasDescription() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * optional string description = 6;
+ */
+ public java.lang.String getDescription() {
+ java.lang.Object ref = description_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ description_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * optional string description = 6;
+ */
+ public com.google.protobuf.ByteString
+ getDescriptionBytes() {
+ java.lang.Object ref = description_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ description_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // optional float someFloatColumn = 7;
+ public static final int SOMEFLOATCOLUMN_FIELD_NUMBER = 7;
+ private float someFloatColumn_;
+ /**
+ * optional float someFloatColumn = 7;
+ */
+ public boolean hasSomeFloatColumn() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * optional float someFloatColumn = 7;
+ */
+ public float getSomeFloatColumn() {
+ return someFloatColumn_;
+ }
+
+ // optional uint32 someIntColumn = 8;
+ public static final int SOMEINTCOLUMN_FIELD_NUMBER = 8;
+ private int someIntColumn_;
+ /**
+ * optional uint32 someIntColumn = 8;
+ */
+ public boolean hasSomeIntColumn() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * optional uint32 someIntColumn = 8;
+ */
+ public int getSomeIntColumn() {
+ return someIntColumn_;
+ }
+
+ // optional uint64 someLongColumn = 9;
+ public static final int SOMELONGCOLUMN_FIELD_NUMBER = 9;
+ private long someLongColumn_;
+ /**
+ * optional uint64 someLongColumn = 9;
+ */
+ public boolean hasSomeLongColumn() {
+ return ((bitField0_ & 0x00000100) == 0x00000100);
+ }
+ /**
+ * optional uint64 someLongColumn = 9;
+ */
+ public long getSomeLongColumn() {
+ return someLongColumn_;
+ }
+
+ private void initFields() {
+ eventType_ = com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ZERO;
+ id_ = 0L;
+ timestamp_ = "";
+ someOtherId_ = 0;
+ isValid_ = false;
+ description_ = "";
+ someFloatColumn_ = 0F;
+ someIntColumn_ = 0;
+ someLongColumn_ = 0L;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasEventType()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasId()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasTimestamp()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeEnum(1, eventType_.getNumber());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeUInt64(2, id_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBytes(3, getTimestampBytes());
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeUInt32(4, someOtherId_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeBool(5, isValid_);
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeBytes(6, getDescriptionBytes());
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ output.writeFloat(7, someFloatColumn_);
+ }
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ output.writeUInt32(8, someIntColumn_);
+ }
+ if (((bitField0_ & 0x00000100) == 0x00000100)) {
+ output.writeUInt64(9, someLongColumn_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeEnumSize(1, eventType_.getNumber());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt64Size(2, id_);
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(3, getTimestampBytes());
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(4, someOtherId_);
+ }
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(5, isValid_);
+ }
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(6, getDescriptionBytes());
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeFloatSize(7, someFloatColumn_);
+ }
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(8, someIntColumn_);
+ }
+ if (((bitField0_ & 0x00000100) == 0x00000100)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt64Size(9, someLongColumn_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code prototest.ProtoTestEvent}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder
+ implements com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEventOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return com.metamx.druid.indexer.data.ProtoTestEventWrapper.internal_static_prototest_ProtoTestEvent_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return com.metamx.druid.indexer.data.ProtoTestEventWrapper.internal_static_prototest_ProtoTestEvent_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.class, com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.Builder.class);
+ }
+
+ // Construct using com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ eventType_ = com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ZERO;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ id_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ timestamp_ = "";
+ bitField0_ = (bitField0_ & ~0x00000004);
+ someOtherId_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ isValid_ = false;
+ bitField0_ = (bitField0_ & ~0x00000010);
+ description_ = "";
+ bitField0_ = (bitField0_ & ~0x00000020);
+ someFloatColumn_ = 0F;
+ bitField0_ = (bitField0_ & ~0x00000040);
+ someIntColumn_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000080);
+ someLongColumn_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000100);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return com.metamx.druid.indexer.data.ProtoTestEventWrapper.internal_static_prototest_ProtoTestEvent_descriptor;
+ }
+
+ public com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent getDefaultInstanceForType() {
+ return com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.getDefaultInstance();
+ }
+
+ public com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent build() {
+ com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent buildPartial() {
+ com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent result = new com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.eventType_ = eventType_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.id_ = id_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.timestamp_ = timestamp_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.someOtherId_ = someOtherId_;
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.isValid_ = isValid_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.description_ = description_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000040;
+ }
+ result.someFloatColumn_ = someFloatColumn_;
+ if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+ to_bitField0_ |= 0x00000080;
+ }
+ result.someIntColumn_ = someIntColumn_;
+ if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+ to_bitField0_ |= 0x00000100;
+ }
+ result.someLongColumn_ = someLongColumn_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent) {
+ return mergeFrom((com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent other) {
+ if (other == com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.getDefaultInstance()) return this;
+ if (other.hasEventType()) {
+ setEventType(other.getEventType());
+ }
+ if (other.hasId()) {
+ setId(other.getId());
+ }
+ if (other.hasTimestamp()) {
+ bitField0_ |= 0x00000004;
+ timestamp_ = other.timestamp_;
+ onChanged();
+ }
+ if (other.hasSomeOtherId()) {
+ setSomeOtherId(other.getSomeOtherId());
+ }
+ if (other.hasIsValid()) {
+ setIsValid(other.getIsValid());
+ }
+ if (other.hasDescription()) {
+ bitField0_ |= 0x00000020;
+ description_ = other.description_;
+ onChanged();
+ }
+ if (other.hasSomeFloatColumn()) {
+ setSomeFloatColumn(other.getSomeFloatColumn());
+ }
+ if (other.hasSomeIntColumn()) {
+ setSomeIntColumn(other.getSomeIntColumn());
+ }
+ if (other.hasSomeLongColumn()) {
+ setSomeLongColumn(other.getSomeLongColumn());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasEventType()) {
+
+ return false;
+ }
+ if (!hasId()) {
+
+ return false;
+ }
+ if (!hasTimestamp()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // required .prototest.ProtoTestEvent.EventCategory eventType = 1;
+ private com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory eventType_ = com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ZERO;
+ /**
+ * required .prototest.ProtoTestEvent.EventCategory eventType = 1;
+ */
+ public boolean hasEventType() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required .prototest.ProtoTestEvent.EventCategory eventType = 1;
+ */
+ public com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory getEventType() {
+ return eventType_;
+ }
+ /**
+ * required .prototest.ProtoTestEvent.EventCategory eventType = 1;
+ */
+ public Builder setEventType(com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ eventType_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * required .prototest.ProtoTestEvent.EventCategory eventType = 1;
+ */
+ public Builder clearEventType() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ eventType_ = com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ZERO;
+ onChanged();
+ return this;
+ }
+
+ // required uint64 id = 2;
+ private long id_ ;
+ /**
+ * required uint64 id = 2;
+ */
+ public boolean hasId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * required uint64 id = 2;
+ */
+ public long getId() {
+ return id_;
+ }
+ /**
+ * required uint64 id = 2;
+ */
+ public Builder setId(long value) {
+ bitField0_ |= 0x00000002;
+ id_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * required uint64 id = 2;
+ */
+ public Builder clearId() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ id_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // required string timestamp = 3;
+ private java.lang.Object timestamp_ = "";
+ /**
+ * required string timestamp = 3;
+ */
+ public boolean hasTimestamp() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * required string timestamp = 3;
+ */
+ public java.lang.String getTimestamp() {
+ java.lang.Object ref = timestamp_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ timestamp_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * required string timestamp = 3;
+ */
+ public com.google.protobuf.ByteString
+ getTimestampBytes() {
+ java.lang.Object ref = timestamp_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ timestamp_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * required string timestamp = 3;
+ */
+ public Builder setTimestamp(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ timestamp_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * required string timestamp = 3;
+ */
+ public Builder clearTimestamp() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ timestamp_ = getDefaultInstance().getTimestamp();
+ onChanged();
+ return this;
+ }
+ /**
+ * required string timestamp = 3;
+ */
+ public Builder setTimestampBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ timestamp_ = value;
+ onChanged();
+ return this;
+ }
+
+ // optional uint32 someOtherId = 4;
+ private int someOtherId_ ;
+ /**
+ * optional uint32 someOtherId = 4;
+ */
+ public boolean hasSomeOtherId() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * optional uint32 someOtherId = 4;
+ */
+ public int getSomeOtherId() {
+ return someOtherId_;
+ }
+ /**
+ * optional uint32 someOtherId = 4;
+ */
+ public Builder setSomeOtherId(int value) {
+ bitField0_ |= 0x00000008;
+ someOtherId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional uint32 someOtherId = 4;
+ */
+ public Builder clearSomeOtherId() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ someOtherId_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // optional bool isValid = 5;
+ private boolean isValid_ ;
+ /**
+ * optional bool isValid = 5;
+ */
+ public boolean hasIsValid() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * optional bool isValid = 5;
+ */
+ public boolean getIsValid() {
+ return isValid_;
+ }
+ /**
+ * optional bool isValid = 5;
+ */
+ public Builder setIsValid(boolean value) {
+ bitField0_ |= 0x00000010;
+ isValid_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional bool isValid = 5;
+ */
+ public Builder clearIsValid() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ isValid_ = false;
+ onChanged();
+ return this;
+ }
+
+ // optional string description = 6;
+ private java.lang.Object description_ = "";
+ /**
+ * optional string description = 6;
+ */
+ public boolean hasDescription() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * optional string description = 6;
+ */
+ public java.lang.String getDescription() {
+ java.lang.Object ref = description_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ description_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * optional string description = 6;
+ */
+ public com.google.protobuf.ByteString
+ getDescriptionBytes() {
+ java.lang.Object ref = description_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ description_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * optional string description = 6;
+ */
+ public Builder setDescription(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000020;
+ description_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string description = 6;
+ */
+ public Builder clearDescription() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ description_ = getDefaultInstance().getDescription();
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string description = 6;
+ */
+ public Builder setDescriptionBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000020;
+ description_ = value;
+ onChanged();
+ return this;
+ }
+
+ // optional float someFloatColumn = 7;
+ private float someFloatColumn_ ;
+ /**
+ * optional float someFloatColumn = 7;
+ */
+ public boolean hasSomeFloatColumn() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * optional float someFloatColumn = 7;
+ */
+ public float getSomeFloatColumn() {
+ return someFloatColumn_;
+ }
+ /**
+ * optional float someFloatColumn = 7;
+ */
+ public Builder setSomeFloatColumn(float value) {
+ bitField0_ |= 0x00000040;
+ someFloatColumn_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional float someFloatColumn = 7;
+ */
+ public Builder clearSomeFloatColumn() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ someFloatColumn_ = 0F;
+ onChanged();
+ return this;
+ }
+
+ // optional uint32 someIntColumn = 8;
+ private int someIntColumn_ ;
+ /**
+ * optional uint32 someIntColumn = 8;
+ */
+ public boolean hasSomeIntColumn() {
+ return ((bitField0_ & 0x00000080) == 0x00000080);
+ }
+ /**
+ * optional uint32 someIntColumn = 8;
+ */
+ public int getSomeIntColumn() {
+ return someIntColumn_;
+ }
+ /**
+ * optional uint32 someIntColumn = 8;
+ */
+ public Builder setSomeIntColumn(int value) {
+ bitField0_ |= 0x00000080;
+ someIntColumn_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional uint32 someIntColumn = 8;
+ */
+ public Builder clearSomeIntColumn() {
+ bitField0_ = (bitField0_ & ~0x00000080);
+ someIntColumn_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // optional uint64 someLongColumn = 9;
+ private long someLongColumn_ ;
+ /**
+ * optional uint64 someLongColumn = 9;
+ */
+ public boolean hasSomeLongColumn() {
+ return ((bitField0_ & 0x00000100) == 0x00000100);
+ }
+ /**
+ * optional uint64 someLongColumn = 9;
+ */
+ public long getSomeLongColumn() {
+ return someLongColumn_;
+ }
+ /**
+ * optional uint64 someLongColumn = 9;
+ */
+ public Builder setSomeLongColumn(long value) {
+ bitField0_ |= 0x00000100;
+ someLongColumn_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional uint64 someLongColumn = 9;
+ */
+ public Builder clearSomeLongColumn() {
+ bitField0_ = (bitField0_ & ~0x00000100);
+ someLongColumn_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:prototest.ProtoTestEvent)
+ }
+
+ static {
+ defaultInstance = new ProtoTestEvent(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:prototest.ProtoTestEvent)
+ }
+
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_prototest_ProtoTestEvent_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_prototest_ProtoTestEvent_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static {
+ java.lang.String[] descriptorData = {
+ "\n\017ProtoTest.proto\022\tprototest\"\266\002\n\016ProtoTe" +
+ "stEvent\022:\n\teventType\030\001 \002(\0162\'.prototest.P" +
+ "rotoTestEvent.EventCategory\022\n\n\002id\030\002 \002(\004\022" +
+ "\021\n\ttimestamp\030\003 \002(\t\022\023\n\013someOtherId\030\004 \001(\r\022" +
+ "\017\n\007isValid\030\005 \001(\010\022\023\n\013description\030\006 \001(\t\022\027\n" +
+ "\017someFloatColumn\030\007 \001(\002\022\025\n\rsomeIntColumn\030" +
+ "\010 \001(\r\022\026\n\016someLongColumn\030\t \001(\004\"F\n\rEventCa" +
+ "tegory\022\021\n\rCATEGORY_ZERO\020\000\022\020\n\014CATEGORY_ON" +
+ "E\020\001\022\020\n\014CATEGORY_TWO\020\002B6\n\035com.metamx.drui" +
+ "d.indexer.dataB\025ProtoTestEventWrapper"
+ };
+ com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+ new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+ public com.google.protobuf.ExtensionRegistry assignDescriptors(
+ com.google.protobuf.Descriptors.FileDescriptor root) {
+ descriptor = root;
+ internal_static_prototest_ProtoTestEvent_descriptor =
+ getDescriptor().getMessageTypes().get(0);
+ internal_static_prototest_ProtoTestEvent_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_prototest_ProtoTestEvent_descriptor,
+ new java.lang.String[] { "EventType", "Id", "Timestamp", "SomeOtherId", "IsValid", "Description", "SomeFloatColumn", "SomeIntColumn", "SomeLongColumn", });
+ return null;
+ }
+ };
+ com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ }, assigner);
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/indexing-common/src/test/resources/ProtoTest.proto b/indexing-common/src/test/resources/ProtoTest.proto
new file mode 100644
index 00000000000..c7822f9ff1b
--- /dev/null
+++ b/indexing-common/src/test/resources/ProtoTest.proto
@@ -0,0 +1,31 @@
+
+package prototest;
+option java_package = "com.metamx.druid.indexer.data";
+option java_outer_classname = "ProtoTestEventWrapper";
+
+
+
+message ProtoTestEvent {
+
+
+enum EventCategory {
+ CATEGORY_ZERO = 0;
+ CATEGORY_ONE = 1;
+ CATEGORY_TWO = 2;
+}
+
+ required EventCategory eventType = 1;
+
+ required uint64 id = 2;
+ required string timestamp = 3;
+ optional uint32 someOtherId = 4;
+ optional bool isValid = 5;
+ optional string description = 6;
+
+ optional float someFloatColumn = 7;
+ optional uint32 someIntColumn = 8;
+ optional uint64 someLongColumn = 9;
+
+
+
+}
diff --git a/indexing-common/src/test/resources/prototest.desc b/indexing-common/src/test/resources/prototest.desc
new file mode 100644
index 00000000000..649ce5bcb8f
Binary files /dev/null and b/indexing-common/src/test/resources/prototest.desc differ
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java
index 1c189ed95e9..3c6d528d6a6 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java
@@ -19,22 +19,8 @@
package com.metamx.druid.realtime.firehose;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.metamx.common.exception.FormattedException;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.indexer.data.StringInputRowParser;
-import com.metamx.druid.input.InputRow;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.Message;
-import kafka.message.MessageAndMetadata;
-
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
@@ -43,122 +29,217 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.Message;
+import kafka.message.MessageAndMetadata;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.metamx.common.exception.FormattedException;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.indexer.data.InputRowParser;
+import com.metamx.druid.indexer.data.ProtoBufInputRowParser;
+import com.metamx.druid.indexer.data.StringInputRowParser;
+import com.metamx.druid.input.InputRow;
+
/**
*/
public class KafkaFirehoseFactory implements FirehoseFactory
{
- private static final Logger log = new Logger(KafkaFirehoseFactory.class);
+ private static final Logger log = new Logger(KafkaFirehoseFactory.class);
- @JsonProperty
- private final Properties consumerProps;
+ @JsonProperty
+ private final Properties consumerProps;
- @JsonProperty
- private final String feed;
+ @JsonProperty
+ private final String feed;
- @JsonProperty
- private final StringInputRowParser parser;
+ @JsonProperty
+ private final InputRowParser parser;
- @JsonCreator
- public KafkaFirehoseFactory(
- @JsonProperty("consumerProps") Properties consumerProps,
- @JsonProperty("feed") String feed,
- @JsonProperty("parser") StringInputRowParser parser
- )
- {
- this.consumerProps = consumerProps;
- this.feed = feed;
- this.parser = parser;
+ @JsonCreator
+ public KafkaFirehoseFactory(
+ @JsonProperty("consumerProps") Properties consumerProps,
+ @JsonProperty("feed") String feed,
+ @JsonProperty("parser") InputRowParser parser)
+ {
+ this.consumerProps = consumerProps;
+ this.feed = feed;
+ this.parser = parser;
- parser.addDimensionExclusion("feed");
- }
+ parser.addDimensionExclusion("feed");
+ }
- @Override
- public Firehose connect() throws IOException
- {
- final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
+ @Override
+ public Firehose connect() throws IOException
+ {
+ final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
- final Map>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
+ final Map>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
- final List> streamList = streams.get(feed);
- if (streamList == null || streamList.size() != 1) {
- return null;
- }
+ final List> streamList = streams.get(feed);
+ if (streamList == null || streamList.size() != 1)
+ {
+ return null;
+ }
- final KafkaStream stream = streamList.get(0);
+ final KafkaStream stream = streamList.get(0);
- return new Firehose()
- {
- Iterator> iter = stream.iterator();
- private CharBuffer chars = null;
+ if (parser instanceof StringInputRowParser)
+ {
- @Override
- public boolean hasMore()
- {
- return iter.hasNext();
- }
+ return new StringMessageFirehose(connector, stream);
+ } else if (parser instanceof ProtoBufInputRowParser)
+ {
+ return new ProtoBufMessageFirehose(stream, connector);
+ }
+ throw new RuntimeException("No Firehose for parser: " + parser.getClass().getName());
+ }
- @Override
- public InputRow nextRow() throws FormattedException
- {
- final Message message = iter.next().message();
+ private abstract static class AbstractKafkaFirehose implements Firehose
+ {
+ protected final ConsumerConnector connector;
+ protected final Iterator> iter;
- if (message == null) {
- return null;
- }
+ public AbstractKafkaFirehose(ConsumerConnector connector, KafkaStream stream)
+ {
+ iter = stream.iterator();
+ this.connector = connector;
+ }
- int payloadSize = message.payloadSize();
- if (chars == null || chars.remaining() < payloadSize) {
- chars = CharBuffer.allocate(payloadSize);
- }
+ @Override
+ public boolean hasMore()
+ {
+ return iter.hasNext();
+ }
- final CoderResult coderResult = Charsets.UTF_8.newDecoder()
- .onMalformedInput(CodingErrorAction.REPLACE)
- .onUnmappableCharacter(CodingErrorAction.REPLACE)
- .decode(message.payload(), chars, true);
+ @Override
+ public InputRow nextRow() throws FormattedException
+ {
+ final Message message = iter.next().message();
- if (coderResult.isUnderflow()) {
- chars.flip();
- try {
- return parser.parse(chars.toString());
- }
- finally {
- chars.clear();
- }
- }
- else {
- throw new FormattedException.Builder()
- .withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
- .withMessage(String.format("Failed with CoderResult[%s]", coderResult))
- .build();
- }
- }
+ if (message == null)
+ {
+ return null;
+ }
- @Override
- public Runnable commit()
- {
- return new Runnable()
- {
- @Override
- public void run()
- {
- /*
- This is actually not going to do exactly what we want, cause it will be called asynchronously
- after the persist is complete. So, it's going to commit that it's processed more than was actually
- persisted. This is unfortunate, but good enough for now. Should revisit along with an upgrade
- of our Kafka version.
- */
+ return parseMessage(message);
+ }
- log.info("committing offsets");
- connector.commitOffsets();
- }
- };
- }
+ protected abstract InputRow parseMessage(Message message);
- @Override
- public void close() throws IOException
- {
- connector.shutdown();
- }
- };
- }
+ @Override
+ public Runnable commit()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ /*
+ * This is actually not going to do exactly what we want, cause it
+ * will be called asynchronously after the persist is complete. So,
+ * it's going to commit that it's processed more than was actually
+ * persisted. This is unfortunate, but good enough for now. Should
+ * revisit along with an upgrade of our Kafka version.
+ */
+
+ log.info("committing offsets");
+ connector.commitOffsets();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ connector.shutdown();
+ }
+ }
+
+ private class StringMessageFirehose extends AbstractKafkaFirehose
+ {
+
+ private CharBuffer chars = null;
+
+ public StringMessageFirehose(ConsumerConnector connector, KafkaStream stream)
+ {
+ super(connector, stream);
+ }
+
+ @Override
+ public InputRow parseMessage(Message message) throws FormattedException
+ {
+
+ int payloadSize = message.payloadSize();
+ if (chars == null || chars.remaining() < payloadSize)
+ {
+ chars = CharBuffer.allocate(payloadSize);
+ }
+
+ final CoderResult coderResult = Charsets.UTF_8.newDecoder()
+ .onMalformedInput(CodingErrorAction.REPLACE)
+ .onUnmappableCharacter(CodingErrorAction.REPLACE)
+ .decode(message.payload(), chars, true);
+
+ if (coderResult.isUnderflow())
+ {
+ chars.flip();
+ try
+ {
+ return parser.parse(chars.toString());
+ } finally
+ {
+ chars.clear();
+ }
+ }
+ else
+ {
+ throw new FormattedException.Builder()
+ .withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
+ .withMessage(String.format("Failed with CoderResult[%s]", coderResult))
+ .build();
+ }
+ }
+
+ }
+
+ private class ProtoBufMessageFirehose extends AbstractKafkaFirehose
+ {
+
+ private ByteBuffer bytes = null;
+
+ public ProtoBufMessageFirehose(KafkaStream stream, ConsumerConnector connector)
+ {
+ super(connector, stream);
+ }
+
+ @Override
+ public InputRow parseMessage(Message message) throws FormattedException
+ {
+
+ int payloadSize = message.payloadSize();
+ if (bytes == null || bytes.remaining() < payloadSize)
+ {
+ bytes = ByteBuffer.allocate(payloadSize);
+ }
+
+ bytes.put(message.payload());
+
+ bytes.flip();
+ try
+ {
+ return parser.parse(bytes);
+ } finally
+ {
+ bytes.clear();
+ }
+ }
+
+ }
}