From 89b0c84f3b1124f62949e4987cd38b6d034dbc91 Mon Sep 17 00:00:00 2001
From: Jan Rudert
Date: Fri, 12 Jul 2013 13:34:57 +0200
Subject: [PATCH] initial implementation of a protocol buffers firehose

---
 indexing-common/pom.xml                            |    6 +
 .../metamx/druid/indexer/data/DataSpec.java        |    3 +-
 .../druid/indexer/data/InputRowParser.java         |    7 +
 .../druid/indexer/data/ProtoBufDataSpec.java       |   69 +
 .../indexer/data/ProtoBufInputRowParser.java       |  108 ++
 .../data/ProtoBufInputRowParserTest.java           |   85 +
 .../indexer/data/ProtoTestEventWrapper.java        | 1375 +++++++++++++++++
 .../src/test/resources/ProtoTest.proto             |   31 +
 .../src/test/resources/prototest.desc              |  Bin 0 -> 400 bytes
 .../firehose/KafkaFirehoseFactory.java             |  301 ++--
 10 files changed, 1874 insertions(+), 111 deletions(-)
 create mode 100644 indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufDataSpec.java
 create mode 100644 indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufInputRowParser.java
 create mode 100644 indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoBufInputRowParserTest.java
 create mode 100644 indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoTestEventWrapper.java
 create mode 100644 indexing-common/src/test/resources/ProtoTest.proto
 create mode 100644 indexing-common/src/test/resources/prototest.desc

diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index fd727328c49..d09ea33bd64 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -79,6 +79,12 @@
         <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>2.5.0</version>
+        </dependency>
+

    </dependencies>
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/DataSpec.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/DataSpec.java
index 099d0c8d535..36d942d8d62 100644
--- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/DataSpec.java
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/DataSpec.java
@@ -32,7 +32,8 @@ import java.util.List;
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "json", value = JSONDataSpec.class),
     @JsonSubTypes.Type(name = "csv", value = CSVDataSpec.class),
-    @JsonSubTypes.Type(name = "tsv", value = DelimitedDataSpec.class)
+    @JsonSubTypes.Type(name = "tsv", value = DelimitedDataSpec.class),
+    @JsonSubTypes.Type(name = "protobuf", value = ProtoBufDataSpec.class)
 })
 public interface DataSpec
 {
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java
index 82a3c947bc3..25957379d9a 100644
--- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java
@@ -1,7 +1,14 @@
 package com.metamx.druid.indexer.data;
 
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.metamx.druid.input.InputRow;
 
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = StringInputRowParser.class)
+@JsonSubTypes({
+    @JsonSubTypes.Type(name = "string", value = StringInputRowParser.class),
+    @JsonSubTypes.Type(name = "protobuf", value = ProtoBufInputRowParser.class),
+})
 public interface InputRowParser<T>
 {
   public InputRow parse(T input);
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufDataSpec.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufDataSpec.java
new file mode 100644
index 00000000000..042b0603061
--- /dev/null
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufDataSpec.java
@@ -0,0 +1,69 @@
+package com.metamx.druid.indexer.data;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import com.metamx.common.parsers.Parser;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;
+
+/**
+ * @author jan.rudert
+ */
+public class ProtoBufDataSpec implements DataSpec
+{
+  private final List<String> dimensions;
+  private final List<SpatialDimensionSchema> spatialDimensions;
+  private final String descriptorFileInClasspath;
+
+  @JsonCreator
+  public ProtoBufDataSpec(
+      @JsonProperty("descriptor") String descriptorFileInClasspath,
+      @JsonProperty("dimensions") List<String> dimensions,
+      @JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
+  )
+  {
+    this.descriptorFileInClasspath = descriptorFileInClasspath;
+    this.dimensions = dimensions;
+    this.spatialDimensions = (spatialDimensions == null)
+                             ? Lists.<SpatialDimensionSchema>newArrayList()
+                             : spatialDimensions;
+
+  }
+
+  @JsonProperty("descriptor")
+  public String getDescriptorFileInClassPath() {
+    return descriptorFileInClasspath;
+  }
+
+  @JsonProperty("dimensions")
+  @Override
+  public List<String> getDimensions()
+  {
+    return dimensions;
+  }
+
+  @JsonProperty("spatialDimensions")
+  @Override
+  public List<SpatialDimensionSchema> getSpatialDimensions()
+  {
+    return spatialDimensions;
+  }
+
+  @Override
+  public void verify(List<String> usedCols)
+  {
+  }
+
+  @Override
+  public boolean hasCustomDimensions()
+  {
+    return !(dimensions == null || dimensions.isEmpty());
+  }
+
+  @Override
+  public Parser<String, Object> getParser()
+  {
+    throw new UnsupportedOperationException("No String parser for protobuf data");
+  }
+}
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufInputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufInputRowParser.java
new file mode 100644
index 00000000000..38ee99663e4
--- /dev/null
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/ProtoBufInputRowParser.java
@@ -0,0 +1,108 @@
+package com.metamx.druid.indexer.data;
+
+import static com.google.protobuf.DescriptorProtos.FileDescriptorSet;
+import static com.google.protobuf.Descriptors.Descriptor;
+import static com.google.protobuf.Descriptors.FileDescriptor;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.metamx.druid.input.InputRow;
+
+/**
+ * @author jan.rudert
+ */
+public class ProtoBufInputRowParser implements InputRowParser<byte[]>
+{
+
+  private final MapInputRowParser inputRowCreator;
+  private final Descriptor descriptor;
+
+  @JsonCreator
+  public ProtoBufInputRowParser(
+      @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
+      @JsonProperty("data") ProtoBufDataSpec dataSpec,
+      @JsonProperty("dimensionExclusions") List<String> dimensionExclusions)
+  {
+    descriptor = getDescriptor(dataSpec.getDescriptorFileInClassPath());
+
+    this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec, dimensionExclusions);
+  }
+
+  @Override
+  public InputRow parse(byte[] input)
+  {
+    Map<String, Object> theMap = buildStringKeyMap(input);
+
+    return inputRowCreator.parse(theMap);
+  }
+
+  private Map<String, Object> buildStringKeyMap(byte[] input)
+  {
+    Map<String, Object> theMap = Maps.newHashMap();
+
+    try
+    {
+      DynamicMessage message = DynamicMessage.parseFrom(descriptor, input);
+      Map<Descriptors.FieldDescriptor, Object> allFields = message.getAllFields();
+
+      for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : allFields.entrySet())
+      {
+        String name = entry.getKey().getName();
+        if (theMap.containsKey(name))
+        {
+          continue;
+          // TODO
+          // throw new RuntimeException("duplicate key " + name + " in " + message);
+        }
+        Object value = entry.getValue();
+        if (value instanceof Descriptors.EnumValueDescriptor) {
+          Descriptors.EnumValueDescriptor desc = (Descriptors.EnumValueDescriptor) value;
+          value = desc.getName();
+        }
+
+        theMap.put(name, value);
+      }
+
+    } catch (InvalidProtocolBufferException e)
+    {
+      // TODO
+      e.printStackTrace();
+    }
+    return theMap;
+  }
+
+  private Descriptor getDescriptor(String descriptorFileInClassPath)
+  {
+    try
+    {
+      InputStream fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFileInClassPath);
+      FileDescriptorSet set = FileDescriptorSet.parseFrom(fin);
+      FileDescriptor file = FileDescriptor.buildFrom(set.getFile(0), new FileDescriptor[] {});
+      return file.getMessageTypes().get(0);
+    } catch (Exception e)
+    {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void addDimensionExclusion(String dimension)
+  {
+    inputRowCreator.addDimensionExclusion(dimension);
+  }
+}
diff --git a/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoBufInputRowParserTest.java b/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoBufInputRowParserTest.java
new file mode 100644
index 00000000000..c6116af3946
--- /dev/null
+++ b/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoBufInputRowParserTest.java
@@ -0,0 +1,85 @@
+package com.metamx.druid.indexer.data;
+
+import static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import com.metamx.druid.input.InputRow;
+
+/**
+ * @author jan.rudert
+ */
+public class ProtoBufInputRowParserTest {
+
+  public static final String[] DIMENSIONS = new String[]{"eventType", "id", "someOtherId", "isValid"};
+
+  /*
+    eventType = 1;
+
+    required uint64 id = 2;
+    required string timestamp = 3;
+    optional uint32 someOtherId = 4;
+    optional bool isValid = 5;
+    optional string description = 6;
+
+    optional float someFloatColumn = 7;
+    optional uint32 someIntColumn = 8;
+    optional uint64 someLongColumn = 9;
+  */
+
+  @Test
+  public void testParse() throws Exception {
+
+    // configure the parser with the descriptor file
+    ProtoBufInputRowParser parser = new ProtoBufInputRowParser(
+        new TimestampSpec("timestamp", "iso"),
+        new ProtoBufDataSpec("prototest.desc", Arrays.asList(DIMENSIONS), null),
+        Arrays.<String>asList());
+
+    // create the binary representation of a proto test event
+    DateTime dateTime = new DateTime(2012, 07, 12, 9, 30);
+    ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
+        .setDescription("description")
+        .setEventType(CATEGORY_ONE)
+        .setId(4711L)
+        .setIsValid(true)
+        .setSomeOtherId(4712)
+        .setTimestamp(dateTime.toString())
+        .setSomeFloatColumn(47.11F)
+        .setSomeIntColumn(815)
+        .setSomeLongColumn(816L)
+        .build();
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    event.writeTo(out);
+
+    InputRow row = parser.parse(out.toByteArray());
+    System.out.println(row);
+
assertEquals(Arrays.asList(DIMENSIONS), row.getDimensions()); + assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "id", "4711"); + assertDimensionEquals(row, "isValid", "true"); + assertDimensionEquals(row, "someOtherId", "4712"); + assertDimensionEquals(row, "description", "description"); + assertDimensionEquals(row, "eventType", CATEGORY_ONE.name()); + + + assertEquals(47.11F, row.getFloatMetric("someFloatColumn"), 0.0); + assertEquals(815.0F, row.getFloatMetric("someIntColumn"), 0.0); + assertEquals(816.0F, row.getFloatMetric("someLongColumn"), 0.0); + + } + + private void assertDimensionEquals(InputRow row, String dimension, Object expected) { + List values = row.getDimension(dimension); + assertEquals(1, values.size()); + assertEquals(expected, values.get(0)); + } +} diff --git a/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoTestEventWrapper.java b/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoTestEventWrapper.java new file mode 100644 index 00000000000..307d8a93a9c --- /dev/null +++ b/indexing-common/src/test/java/com/metamx/druid/indexer/data/ProtoTestEventWrapper.java @@ -0,0 +1,1375 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: ProtoTest.proto + +package com.metamx.druid.indexer.data; + +public final class ProtoTestEventWrapper { + private ProtoTestEventWrapper() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface ProtoTestEventOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required .prototest.ProtoTestEvent.EventCategory eventType = 1; + /** + * required .prototest.ProtoTestEvent.EventCategory eventType = 1; + */ + boolean hasEventType(); + /** + * required .prototest.ProtoTestEvent.EventCategory eventType = 1; + */ + com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory getEventType(); + + // required uint64 id = 2; + /** + * required uint64 id = 2; + */ + boolean hasId(); + /** + * required uint64 id = 2; + */ + long getId(); + + // required string timestamp = 3; + /** + * required string timestamp = 3; + */ + boolean hasTimestamp(); + /** + * required string timestamp = 3; + */ + java.lang.String getTimestamp(); + /** + * required string timestamp = 3; + */ + com.google.protobuf.ByteString + getTimestampBytes(); + + // optional uint32 someOtherId = 4; + /** + * optional uint32 someOtherId = 4; + */ + boolean hasSomeOtherId(); + /** + * optional uint32 someOtherId = 4; + */ + int getSomeOtherId(); + + // optional bool isValid = 5; + /** + * optional bool isValid = 5; + */ + boolean hasIsValid(); + /** + * optional bool isValid = 5; + */ + boolean getIsValid(); + + // optional string description = 6; + /** + * optional string description = 6; + */ + boolean hasDescription(); + /** + * optional string description = 6; + */ + java.lang.String getDescription(); + /** + * optional string description = 6; + */ + com.google.protobuf.ByteString + getDescriptionBytes(); + + // optional float someFloatColumn = 7; + /** + * optional float someFloatColumn = 7; + */ + boolean hasSomeFloatColumn(); + /** + * optional float someFloatColumn = 7; + */ + float getSomeFloatColumn(); + + // optional uint32 someIntColumn = 8; + /** + * optional uint32 someIntColumn = 8; + */ + boolean hasSomeIntColumn(); + /** + * optional uint32 someIntColumn = 8; + */ + int getSomeIntColumn(); + + // optional uint64 someLongColumn = 9; + /** + * optional uint64 someLongColumn = 9; + */ + 
boolean hasSomeLongColumn(); + /** + * optional uint64 someLongColumn = 9; + */ + long getSomeLongColumn(); + } + /** + * Protobuf type {@code prototest.ProtoTestEvent} + */ + public static final class ProtoTestEvent extends + com.google.protobuf.GeneratedMessage + implements ProtoTestEventOrBuilder { + // Use ProtoTestEvent.newBuilder() to construct. + private ProtoTestEvent(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private ProtoTestEvent(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final ProtoTestEvent defaultInstance; + public static ProtoTestEvent getDefaultInstance() { + return defaultInstance; + } + + public ProtoTestEvent getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private ProtoTestEvent( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + int rawValue = input.readEnum(); + com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory value = com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(1, rawValue); + } else { + bitField0_ |= 0x00000001; + eventType_ = value; + } + break; + } + case 16: { + bitField0_ |= 0x00000002; + id_ = input.readUInt64(); + break; + } + case 26: { + bitField0_ |= 0x00000004; + timestamp_ = input.readBytes(); + break; + } + case 32: { + bitField0_ |= 0x00000008; + someOtherId_ = input.readUInt32(); + break; + } + case 40: { + bitField0_ |= 0x00000010; + isValid_ = input.readBool(); + break; + } + case 50: { + bitField0_ |= 0x00000020; + description_ = input.readBytes(); + break; + } + case 61: { + bitField0_ |= 0x00000040; + someFloatColumn_ = input.readFloat(); + break; + } + case 64: { + bitField0_ |= 0x00000080; + someIntColumn_ = input.readUInt32(); + break; + } + case 72: { + bitField0_ |= 0x00000100; + someLongColumn_ = input.readUInt64(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return com.metamx.druid.indexer.data.ProtoTestEventWrapper.internal_static_prototest_ProtoTestEvent_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return com.metamx.druid.indexer.data.ProtoTestEventWrapper.internal_static_prototest_ProtoTestEvent_fieldAccessorTable 
+ .ensureFieldAccessorsInitialized( + com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.class, com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public ProtoTestEvent parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new ProtoTestEvent(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + /** + * Protobuf enum {@code prototest.ProtoTestEvent.EventCategory} + */ + public enum EventCategory + implements com.google.protobuf.ProtocolMessageEnum { + /** + * CATEGORY_ZERO = 0; + */ + CATEGORY_ZERO(0, 0), + /** + * CATEGORY_ONE = 1; + */ + CATEGORY_ONE(1, 1), + /** + * CATEGORY_TWO = 2; + */ + CATEGORY_TWO(2, 2), + ; + + /** + * CATEGORY_ZERO = 0; + */ + public static final int CATEGORY_ZERO_VALUE = 0; + /** + * CATEGORY_ONE = 1; + */ + public static final int CATEGORY_ONE_VALUE = 1; + /** + * CATEGORY_TWO = 2; + */ + public static final int CATEGORY_TWO_VALUE = 2; + + + public final int getNumber() { return value; } + + public static EventCategory valueOf(int value) { + switch (value) { + case 0: return CATEGORY_ZERO; + case 1: return CATEGORY_ONE; + case 2: return CATEGORY_TWO; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public EventCategory findValueByNumber(int number) { + return EventCategory.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.getDescriptor().getEnumTypes().get(0); + } + + private static final EventCategory[] VALUES = values(); + + public static EventCategory valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private EventCategory(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:prototest.ProtoTestEvent.EventCategory) + } + + private int bitField0_; + // required .prototest.ProtoTestEvent.EventCategory eventType = 1; + public static final int EVENTTYPE_FIELD_NUMBER = 1; + private com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory eventType_; + /** + * required .prototest.ProtoTestEvent.EventCategory eventType = 1; + */ + public boolean hasEventType() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required .prototest.ProtoTestEvent.EventCategory eventType = 1; + */ + public com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory getEventType() { + return eventType_; + 
} + + // required uint64 id = 2; + public static final int ID_FIELD_NUMBER = 2; + private long id_; + /** + * required uint64 id = 2; + */ + public boolean hasId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required uint64 id = 2; + */ + public long getId() { + return id_; + } + + // required string timestamp = 3; + public static final int TIMESTAMP_FIELD_NUMBER = 3; + private java.lang.Object timestamp_; + /** + * required string timestamp = 3; + */ + public boolean hasTimestamp() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * required string timestamp = 3; + */ + public java.lang.String getTimestamp() { + java.lang.Object ref = timestamp_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + timestamp_ = s; + } + return s; + } + } + /** + * required string timestamp = 3; + */ + public com.google.protobuf.ByteString + getTimestampBytes() { + java.lang.Object ref = timestamp_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + timestamp_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional uint32 someOtherId = 4; + public static final int SOMEOTHERID_FIELD_NUMBER = 4; + private int someOtherId_; + /** + * optional uint32 someOtherId = 4; + */ + public boolean hasSomeOtherId() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional uint32 someOtherId = 4; + */ + public int getSomeOtherId() { + return someOtherId_; + } + + // optional bool isValid = 5; + public static final int ISVALID_FIELD_NUMBER = 5; + private boolean isValid_; + /** + * optional bool isValid = 5; + */ + public boolean hasIsValid() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bool isValid = 5; + */ + public boolean getIsValid() { + return isValid_; + } + + // optional string description = 6; + public static final int DESCRIPTION_FIELD_NUMBER = 6; + private java.lang.Object description_; + /** + * optional string description = 6; + */ + public boolean hasDescription() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional string description = 6; + */ + public java.lang.String getDescription() { + java.lang.Object ref = description_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + description_ = s; + } + return s; + } + } + /** + * optional string description = 6; + */ + public com.google.protobuf.ByteString + getDescriptionBytes() { + java.lang.Object ref = description_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + description_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional float someFloatColumn = 7; + public static final int SOMEFLOATCOLUMN_FIELD_NUMBER = 7; + private float someFloatColumn_; + /** + * optional float someFloatColumn = 7; + */ + public boolean hasSomeFloatColumn() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional float someFloatColumn = 7; + */ + public float getSomeFloatColumn() { + return 
someFloatColumn_; + } + + // optional uint32 someIntColumn = 8; + public static final int SOMEINTCOLUMN_FIELD_NUMBER = 8; + private int someIntColumn_; + /** + * optional uint32 someIntColumn = 8; + */ + public boolean hasSomeIntColumn() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional uint32 someIntColumn = 8; + */ + public int getSomeIntColumn() { + return someIntColumn_; + } + + // optional uint64 someLongColumn = 9; + public static final int SOMELONGCOLUMN_FIELD_NUMBER = 9; + private long someLongColumn_; + /** + * optional uint64 someLongColumn = 9; + */ + public boolean hasSomeLongColumn() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional uint64 someLongColumn = 9; + */ + public long getSomeLongColumn() { + return someLongColumn_; + } + + private void initFields() { + eventType_ = com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ZERO; + id_ = 0L; + timestamp_ = ""; + someOtherId_ = 0; + isValid_ = false; + description_ = ""; + someFloatColumn_ = 0F; + someIntColumn_ = 0; + someLongColumn_ = 0L; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasEventType()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasTimestamp()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeEnum(1, eventType_.getNumber()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeUInt64(2, id_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBytes(3, getTimestampBytes()); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeUInt32(4, someOtherId_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBool(5, isValid_); + } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeBytes(6, getDescriptionBytes()); + } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeFloat(7, someFloatColumn_); + } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeUInt32(8, someIntColumn_); + } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + output.writeUInt64(9, someLongColumn_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(1, eventType_.getNumber()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(2, id_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(3, getTimestampBytes()); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(4, someOtherId_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(5, isValid_); + } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += 
com.google.protobuf.CodedOutputStream + .computeBytesSize(6, getDescriptionBytes()); + } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeFloatSize(7, someFloatColumn_); + } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(8, someIntColumn_); + } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(9, someLongColumn_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder 
newBuilder(com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code prototest.ProtoTestEvent} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEventOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return com.metamx.druid.indexer.data.ProtoTestEventWrapper.internal_static_prototest_ProtoTestEvent_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return com.metamx.druid.indexer.data.ProtoTestEventWrapper.internal_static_prototest_ProtoTestEvent_fieldAccessorTable + .ensureFieldAccessorsInitialized( + com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.class, com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.Builder.class); + } + + // Construct using com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + eventType_ = com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ZERO; + bitField0_ = (bitField0_ & ~0x00000001); + id_ = 0L; + bitField0_ = (bitField0_ & ~0x00000002); + timestamp_ = ""; + bitField0_ = (bitField0_ & ~0x00000004); + someOtherId_ = 0; + bitField0_ = (bitField0_ & ~0x00000008); + isValid_ = false; + bitField0_ = (bitField0_ & ~0x00000010); + description_ = ""; + bitField0_ = (bitField0_ & ~0x00000020); + someFloatColumn_ = 0F; + bitField0_ = (bitField0_ & ~0x00000040); + someIntColumn_ = 0; + bitField0_ = (bitField0_ & ~0x00000080); + someLongColumn_ = 0L; + bitField0_ = (bitField0_ & ~0x00000100); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return com.metamx.druid.indexer.data.ProtoTestEventWrapper.internal_static_prototest_ProtoTestEvent_descriptor; + } + + public com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent getDefaultInstanceForType() { + return com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.getDefaultInstance(); + } + + public com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent build() { + com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent buildPartial() { + com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent result = new com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent(this); + int from_bitField0_ = bitField0_; + int 
to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.eventType_ = eventType_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.id_ = id_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.timestamp_ = timestamp_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.someOtherId_ = someOtherId_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.isValid_ = isValid_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.description_ = description_; + if (((from_bitField0_ & 0x00000040) == 0x00000040)) { + to_bitField0_ |= 0x00000040; + } + result.someFloatColumn_ = someFloatColumn_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000080; + } + result.someIntColumn_ = someIntColumn_; + if (((from_bitField0_ & 0x00000100) == 0x00000100)) { + to_bitField0_ |= 0x00000100; + } + result.someLongColumn_ = someLongColumn_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent) { + return mergeFrom((com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent other) { + if (other == com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.getDefaultInstance()) return this; + if (other.hasEventType()) { + setEventType(other.getEventType()); + } + if (other.hasId()) { + setId(other.getId()); + } + if (other.hasTimestamp()) { + bitField0_ |= 0x00000004; + timestamp_ = other.timestamp_; + onChanged(); + } + if (other.hasSomeOtherId()) { + setSomeOtherId(other.getSomeOtherId()); + } + if (other.hasIsValid()) { + setIsValid(other.getIsValid()); + } + if (other.hasDescription()) { + bitField0_ |= 0x00000020; + description_ = other.description_; + onChanged(); + } + if (other.hasSomeFloatColumn()) { + setSomeFloatColumn(other.getSomeFloatColumn()); + } + if (other.hasSomeIntColumn()) { + setSomeIntColumn(other.getSomeIntColumn()); + } + if (other.hasSomeLongColumn()) { + setSomeLongColumn(other.getSomeLongColumn()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasEventType()) { + + return false; + } + if (!hasId()) { + + return false; + } + if (!hasTimestamp()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required .prototest.ProtoTestEvent.EventCategory eventType = 1; + private 
com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory eventType_ = com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ZERO; + /** + * required .prototest.ProtoTestEvent.EventCategory eventType = 1; + */ + public boolean hasEventType() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required .prototest.ProtoTestEvent.EventCategory eventType = 1; + */ + public com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory getEventType() { + return eventType_; + } + /** + * required .prototest.ProtoTestEvent.EventCategory eventType = 1; + */ + public Builder setEventType(com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + eventType_ = value; + onChanged(); + return this; + } + /** + * required .prototest.ProtoTestEvent.EventCategory eventType = 1; + */ + public Builder clearEventType() { + bitField0_ = (bitField0_ & ~0x00000001); + eventType_ = com.metamx.druid.indexer.data.ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ZERO; + onChanged(); + return this; + } + + // required uint64 id = 2; + private long id_ ; + /** + * required uint64 id = 2; + */ + public boolean hasId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required uint64 id = 2; + */ + public long getId() { + return id_; + } + /** + * required uint64 id = 2; + */ + public Builder setId(long value) { + bitField0_ |= 0x00000002; + id_ = value; + onChanged(); + return this; + } + /** + * required uint64 id = 2; + */ + public Builder clearId() { + bitField0_ = (bitField0_ & ~0x00000002); + id_ = 0L; + onChanged(); + return this; + } + + // required string timestamp = 3; + private java.lang.Object timestamp_ = ""; + /** + * required string timestamp = 3; + */ + public boolean hasTimestamp() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * required string timestamp = 3; + */ + public java.lang.String getTimestamp() { + java.lang.Object ref = timestamp_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + timestamp_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string timestamp = 3; + */ + public com.google.protobuf.ByteString + getTimestampBytes() { + java.lang.Object ref = timestamp_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + timestamp_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * required string timestamp = 3; + */ + public Builder setTimestamp( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + timestamp_ = value; + onChanged(); + return this; + } + /** + * required string timestamp = 3; + */ + public Builder clearTimestamp() { + bitField0_ = (bitField0_ & ~0x00000004); + timestamp_ = getDefaultInstance().getTimestamp(); + onChanged(); + return this; + } + /** + * required string timestamp = 3; + */ + public Builder setTimestampBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + timestamp_ = value; + onChanged(); + return this; + } + + // optional uint32 someOtherId = 4; + private int someOtherId_ ; + /** + * optional uint32 
someOtherId = 4; + */ + public boolean hasSomeOtherId() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional uint32 someOtherId = 4; + */ + public int getSomeOtherId() { + return someOtherId_; + } + /** + * optional uint32 someOtherId = 4; + */ + public Builder setSomeOtherId(int value) { + bitField0_ |= 0x00000008; + someOtherId_ = value; + onChanged(); + return this; + } + /** + * optional uint32 someOtherId = 4; + */ + public Builder clearSomeOtherId() { + bitField0_ = (bitField0_ & ~0x00000008); + someOtherId_ = 0; + onChanged(); + return this; + } + + // optional bool isValid = 5; + private boolean isValid_ ; + /** + * optional bool isValid = 5; + */ + public boolean hasIsValid() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bool isValid = 5; + */ + public boolean getIsValid() { + return isValid_; + } + /** + * optional bool isValid = 5; + */ + public Builder setIsValid(boolean value) { + bitField0_ |= 0x00000010; + isValid_ = value; + onChanged(); + return this; + } + /** + * optional bool isValid = 5; + */ + public Builder clearIsValid() { + bitField0_ = (bitField0_ & ~0x00000010); + isValid_ = false; + onChanged(); + return this; + } + + // optional string description = 6; + private java.lang.Object description_ = ""; + /** + * optional string description = 6; + */ + public boolean hasDescription() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional string description = 6; + */ + public java.lang.String getDescription() { + java.lang.Object ref = description_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + description_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string description = 6; + */ + public com.google.protobuf.ByteString + getDescriptionBytes() { + java.lang.Object ref = description_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + description_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string description = 6; + */ + public Builder setDescription( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000020; + description_ = value; + onChanged(); + return this; + } + /** + * optional string description = 6; + */ + public Builder clearDescription() { + bitField0_ = (bitField0_ & ~0x00000020); + description_ = getDefaultInstance().getDescription(); + onChanged(); + return this; + } + /** + * optional string description = 6; + */ + public Builder setDescriptionBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000020; + description_ = value; + onChanged(); + return this; + } + + // optional float someFloatColumn = 7; + private float someFloatColumn_ ; + /** + * optional float someFloatColumn = 7; + */ + public boolean hasSomeFloatColumn() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional float someFloatColumn = 7; + */ + public float getSomeFloatColumn() { + return someFloatColumn_; + } + /** + * optional float someFloatColumn = 7; + */ + public Builder setSomeFloatColumn(float value) { + bitField0_ |= 0x00000040; + someFloatColumn_ = value; + onChanged(); + return this; + } + /** + * optional float someFloatColumn = 7; + */ + public Builder 
clearSomeFloatColumn() { + bitField0_ = (bitField0_ & ~0x00000040); + someFloatColumn_ = 0F; + onChanged(); + return this; + } + + // optional uint32 someIntColumn = 8; + private int someIntColumn_ ; + /** + * optional uint32 someIntColumn = 8; + */ + public boolean hasSomeIntColumn() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * optional uint32 someIntColumn = 8; + */ + public int getSomeIntColumn() { + return someIntColumn_; + } + /** + * optional uint32 someIntColumn = 8; + */ + public Builder setSomeIntColumn(int value) { + bitField0_ |= 0x00000080; + someIntColumn_ = value; + onChanged(); + return this; + } + /** + * optional uint32 someIntColumn = 8; + */ + public Builder clearSomeIntColumn() { + bitField0_ = (bitField0_ & ~0x00000080); + someIntColumn_ = 0; + onChanged(); + return this; + } + + // optional uint64 someLongColumn = 9; + private long someLongColumn_ ; + /** + * optional uint64 someLongColumn = 9; + */ + public boolean hasSomeLongColumn() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional uint64 someLongColumn = 9; + */ + public long getSomeLongColumn() { + return someLongColumn_; + } + /** + * optional uint64 someLongColumn = 9; + */ + public Builder setSomeLongColumn(long value) { + bitField0_ |= 0x00000100; + someLongColumn_ = value; + onChanged(); + return this; + } + /** + * optional uint64 someLongColumn = 9; + */ + public Builder clearSomeLongColumn() { + bitField0_ = (bitField0_ & ~0x00000100); + someLongColumn_ = 0L; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:prototest.ProtoTestEvent) + } + + static { + defaultInstance = new ProtoTestEvent(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:prototest.ProtoTestEvent) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_prototest_ProtoTestEvent_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_prototest_ProtoTestEvent_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\017ProtoTest.proto\022\tprototest\"\266\002\n\016ProtoTe" + + "stEvent\022:\n\teventType\030\001 \002(\0162\'.prototest.P" + + "rotoTestEvent.EventCategory\022\n\n\002id\030\002 \002(\004\022" + + "\021\n\ttimestamp\030\003 \002(\t\022\023\n\013someOtherId\030\004 \001(\r\022" + + "\017\n\007isValid\030\005 \001(\010\022\023\n\013description\030\006 \001(\t\022\027\n" + + "\017someFloatColumn\030\007 \001(\002\022\025\n\rsomeIntColumn\030" + + "\010 \001(\r\022\026\n\016someLongColumn\030\t \001(\004\"F\n\rEventCa" + + "tegory\022\021\n\rCATEGORY_ZERO\020\000\022\020\n\014CATEGORY_ON" + + "E\020\001\022\020\n\014CATEGORY_TWO\020\002B6\n\035com.metamx.drui" + + "d.indexer.dataB\025ProtoTestEventWrapper" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_prototest_ProtoTestEvent_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_prototest_ProtoTestEvent_fieldAccessorTable = new + 
com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_prototest_ProtoTestEvent_descriptor, + new java.lang.String[] { "EventType", "Id", "Timestamp", "SomeOtherId", "IsValid", "Description", "SomeFloatColumn", "SomeIntColumn", "SomeLongColumn", }); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/indexing-common/src/test/resources/ProtoTest.proto b/indexing-common/src/test/resources/ProtoTest.proto new file mode 100644 index 00000000000..c7822f9ff1b --- /dev/null +++ b/indexing-common/src/test/resources/ProtoTest.proto @@ -0,0 +1,31 @@ + +package prototest; +option java_package = "com.metamx.druid.indexer.data"; +option java_outer_classname = "ProtoTestEventWrapper"; + + + +message ProtoTestEvent { + + +enum EventCategory { + CATEGORY_ZERO = 0; + CATEGORY_ONE = 1; + CATEGORY_TWO = 2; +} + + required EventCategory eventType = 1; + + required uint64 id = 2; + required string timestamp = 3; + optional uint32 someOtherId = 4; + optional bool isValid = 5; + optional string description = 6; + + optional float someFloatColumn = 7; + optional uint32 someIntColumn = 8; + optional uint64 someLongColumn = 9; + + + +} diff --git a/indexing-common/src/test/resources/prototest.desc b/indexing-common/src/test/resources/prototest.desc new file mode 100644 index 0000000000000000000000000000000000000000..649ce5bcb8fe5df743b9fff73e1835e4467efd4e GIT binary patch literal 400 zcmY+APfNov7>8|F-TG?(EHdpy7`wPr4jw#sbET7kSQ*8k7YR$4fu<>GCfkSb>-hOh zst$U}%k%p^;Z5MH2hp>VS{AI)xV97W+d~^L`F0^Z#MvjSG`a(y*`e6iEOEx}G`zX~ z6|I50ZQMF3(QG5t9s#(VCa#e@Gz4FBVH6_jq-T(i&f&C?g5~;ysRd2Eu`>-&1byDT zkkW|91`KSAvSzJ#t+}j{69av836WLjr6f9)1S^I~`K2^4Oi-!qr*|ESP!T4pX(| X6!VI*9aESRO=hF+N39g8Yo>kxsG4&{ literal 0 HcmV?d00001 diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java index 1c189ed95e9..3c6d528d6a6 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java @@ -19,22 +19,8 @@ package com.metamx.druid.realtime.firehose; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableMap; -import com.metamx.common.exception.FormattedException; -import com.metamx.common.logger.Logger; -import com.metamx.druid.indexer.data.StringInputRowParser; -import com.metamx.druid.input.InputRow; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.Message; -import kafka.message.MessageAndMetadata; - import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; @@ -43,122 +29,217 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.Message; +import kafka.message.MessageAndMetadata; + +import 
com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableMap; +import com.metamx.common.exception.FormattedException; +import com.metamx.common.logger.Logger; +import com.metamx.druid.indexer.data.InputRowParser; +import com.metamx.druid.indexer.data.ProtoBufInputRowParser; +import com.metamx.druid.indexer.data.StringInputRowParser; +import com.metamx.druid.input.InputRow; + /** */ public class KafkaFirehoseFactory implements FirehoseFactory { - private static final Logger log = new Logger(KafkaFirehoseFactory.class); + private static final Logger log = new Logger(KafkaFirehoseFactory.class); - @JsonProperty - private final Properties consumerProps; + @JsonProperty + private final Properties consumerProps; - @JsonProperty - private final String feed; + @JsonProperty + private final String feed; - @JsonProperty - private final StringInputRowParser parser; + @JsonProperty + private final InputRowParser parser; - @JsonCreator - public KafkaFirehoseFactory( - @JsonProperty("consumerProps") Properties consumerProps, - @JsonProperty("feed") String feed, - @JsonProperty("parser") StringInputRowParser parser - ) - { - this.consumerProps = consumerProps; - this.feed = feed; - this.parser = parser; + @JsonCreator + public KafkaFirehoseFactory( + @JsonProperty("consumerProps") Properties consumerProps, + @JsonProperty("feed") String feed, + @JsonProperty("parser") InputRowParser parser) + { + this.consumerProps = consumerProps; + this.feed = feed; + this.parser = parser; - parser.addDimensionExclusion("feed"); - } + parser.addDimensionExclusion("feed"); + } - @Override - public Firehose connect() throws IOException - { - final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps)); + @Override + public Firehose connect() throws IOException + { + final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps)); - final Map>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1)); + final Map>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1)); - final List> streamList = streams.get(feed); - if (streamList == null || streamList.size() != 1) { - return null; - } + final List> streamList = streams.get(feed); + if (streamList == null || streamList.size() != 1) + { + return null; + } - final KafkaStream stream = streamList.get(0); + final KafkaStream stream = streamList.get(0); - return new Firehose() - { - Iterator> iter = stream.iterator(); - private CharBuffer chars = null; + if (parser instanceof StringInputRowParser) + { - @Override - public boolean hasMore() - { - return iter.hasNext(); - } + return new StringMessageFirehose(connector, stream); + } else if (parser instanceof ProtoBufInputRowParser) + { + return new ProtoBufMessageFirehose(stream, connector); + } + throw new RuntimeException("No Firehose for parser: " + parser.getClass().getName()); + } - @Override - public InputRow nextRow() throws FormattedException - { - final Message message = iter.next().message(); + private abstract static class AbstractKafkaFirehose implements Firehose + { + protected final ConsumerConnector connector; + protected final Iterator> iter; - if (message == null) { - return null; - } + public AbstractKafkaFirehose(ConsumerConnector connector, KafkaStream stream) + { + iter = stream.iterator(); + this.connector = connector; + } - int payloadSize = message.payloadSize(); - if 
(chars == null || chars.remaining() < payloadSize) { - chars = CharBuffer.allocate(payloadSize); - } + @Override + public boolean hasMore() + { + return iter.hasNext(); + } - final CoderResult coderResult = Charsets.UTF_8.newDecoder() - .onMalformedInput(CodingErrorAction.REPLACE) - .onUnmappableCharacter(CodingErrorAction.REPLACE) - .decode(message.payload(), chars, true); + @Override + public InputRow nextRow() throws FormattedException + { + final Message message = iter.next().message(); - if (coderResult.isUnderflow()) { - chars.flip(); - try { - return parser.parse(chars.toString()); - } - finally { - chars.clear(); - } - } - else { - throw new FormattedException.Builder() - .withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW) - .withMessage(String.format("Failed with CoderResult[%s]", coderResult)) - .build(); - } - } + if (message == null) + { + return null; + } - @Override - public Runnable commit() - { - return new Runnable() - { - @Override - public void run() - { - /* - This is actually not going to do exactly what we want, cause it will be called asynchronously - after the persist is complete. So, it's going to commit that it's processed more than was actually - persisted. This is unfortunate, but good enough for now. Should revisit along with an upgrade - of our Kafka version. - */ + return parseMessage(message); + } - log.info("committing offsets"); - connector.commitOffsets(); - } - }; - } + protected abstract InputRow parseMessage(Message message); - @Override - public void close() throws IOException - { - connector.shutdown(); - } - }; - } + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + /* + * This is actually not going to do exactly what we want, cause it + * will be called asynchronously after the persist is complete. So, + * it's going to commit that it's processed more than was actually + * persisted. This is unfortunate, but good enough for now. Should + * revisit along with an upgrade of our Kafka version. 
+ */ + + log.info("committing offsets"); + connector.commitOffsets(); + } + }; + } + + @Override + public void close() throws IOException + { + connector.shutdown(); + } + } + + private class StringMessageFirehose extends AbstractKafkaFirehose + { + + private CharBuffer chars = null; + + public StringMessageFirehose(ConsumerConnector connector, KafkaStream stream) + { + super(connector, stream); + } + + @Override + public InputRow parseMessage(Message message) throws FormattedException + { + + int payloadSize = message.payloadSize(); + if (chars == null || chars.remaining() < payloadSize) + { + chars = CharBuffer.allocate(payloadSize); + } + + final CoderResult coderResult = Charsets.UTF_8.newDecoder() + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE) + .decode(message.payload(), chars, true); + + if (coderResult.isUnderflow()) + { + chars.flip(); + try + { + return parser.parse(chars.toString()); + } finally + { + chars.clear(); + } + } + else + { + throw new FormattedException.Builder() + .withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW) + .withMessage(String.format("Failed with CoderResult[%s]", coderResult)) + .build(); + } + } + + } + + private class ProtoBufMessageFirehose extends AbstractKafkaFirehose + { + + private ByteBuffer bytes = null; + + public ProtoBufMessageFirehose(KafkaStream stream, ConsumerConnector connector) + { + super(connector, stream); + } + + @Override + public InputRow parseMessage(Message message) throws FormattedException + { + + int payloadSize = message.payloadSize(); + if (bytes == null || bytes.remaining() < payloadSize) + { + bytes = ByteBuffer.allocate(payloadSize); + } + + bytes.put(message.payload()); + + bytes.flip(); + try + { + return parser.parse(bytes); + } finally + { + bytes.clear(); + } + } + + } }
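
Usage note (not part of the diff above): with the "protobuf" name registered on InputRowParser, a realtime spec selects the new parser through its "parser" block, and the nested "data" block binds to ProtoBufDataSpec ("descriptor", "dimensions", "spatialDimensions"). A minimal sketch follows; the "column"/"format" keys of TimestampSpec and the "format" discriminator on DataSpec are assumptions, since neither is defined in this patch:

    "parser" : {
      "type" : "protobuf",
      "timestampSpec" : { "column" : "timestamp", "format" : "iso" },
      "data" : {
        "format" : "protobuf",
        "descriptor" : "prototest.desc",
        "dimensions" : ["eventType", "id", "someOtherId", "isValid"]
      },
      "dimensionExclusions" : []
    }

The "descriptor" value is resolved from the classpath; ProtoBufInputRowParser reads it as a compiled FileDescriptorSet and uses the first message type of the first file in the set. The test resource prototest.desc can be regenerated from ProtoTest.proto with something like "protoc --descriptor_set_out=prototest.desc ProtoTest.proto" run in indexing-common/src/test/resources.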