From 9b8e69c99a410ba10496e375fc8cbb9c84f6d59b Mon Sep 17 00:00:00 2001 From: Jonathan Wei Date: Tue, 11 Oct 2022 13:37:28 -0500 Subject: [PATCH] Add inline descriptor Protobuf bytes decoder (#13192) * Add inline descriptor Protobuf bytes decoder * PR comments * Update tests, check for IllegalArgumentException * Fix license, add equals test * Update extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoder.java Co-authored-by: Frank Chen Co-authored-by: Frank Chen --- docs/ingestion/data-formats.md | 20 +++ extensions-core/protobuf-extensions/pom.xml | 5 + .../DescriptorBasedProtobufBytesDecoder.java | 121 +++++++++++++++++ .../FileBasedProtobufBytesDecoder.java | 80 ++---------- .../InlineDescriptorProtobufBytesDecoder.java | 95 ++++++++++++++ .../input/protobuf/ProtobufBytesDecoder.java | 3 +- .../FileBasedProtobufBytesDecoderTest.java | 20 +++ ...ineDescriptorProtobufBytesDecoderTest.java | 123 ++++++++++++++++++ .../protobuf/ProtobufInputFormatTest.java | 31 ++++- website/.spelling | 1 + 10 files changed, 431 insertions(+), 68 deletions(-) create mode 100644 extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/DescriptorBasedProtobufBytesDecoder.java create mode 100644 extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoder.java create mode 100644 extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoderTest.java diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 22c10276472..db4e7f062f6 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -1308,6 +1308,26 @@ Sample spec: } ``` +#### Inline Descriptor Protobuf Bytes Decoder + +This Protobuf bytes decoder allows the user to provide the contents of a Protobuf descriptor file inline, encoded as a Base64 string, and then 
parse it to get the schema used to decode the Protobuf record from bytes. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | Set value to `inline`. | yes | +| descriptorString | String | A compiled Protobuf descriptor, encoded as a Base64 string. | yes | +| protoMessageType | String | Protobuf message type in the descriptor. Both short name and fully qualified name are accepted. The parser uses the first message type found in the descriptor if not specified. | no | + +Sample spec: + +```json +"protoBytesDecoder": { + "type": "inline", + "descriptorString": <Contents of a compiled Protobuf descriptor file, encoded as a Base64 string>, + "protoMessageType": "Metrics" +} +``` + ##### Confluent Schema Registry-based Protobuf Bytes Decoder This Protobuf bytes decoder first extracts a unique `id` from input message bytes, and then uses it to look up the schema in the Schema Registry used to decode the Avro record from bytes. diff --git a/extensions-core/protobuf-extensions/pom.xml b/extensions-core/protobuf-extensions/pom.xml index e2e8c4a1168..c7b7fc6e8b8 100644 --- a/extensions-core/protobuf-extensions/pom.xml +++ b/extensions-core/protobuf-extensions/pom.xml @@ -163,6 +163,11 @@ ${project.parent.version} test + + nl.jqno.equalsverifier + equalsverifier + test + diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/DescriptorBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/DescriptorBasedProtobufBytesDecoder.java new file mode 100644 index 00000000000..d4c65c6f999 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/DescriptorBasedProtobufBytesDecoder.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.github.os72.protobuf.dynamic.DynamicSchema; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.ParseException; + +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.Set; + +public abstract class DescriptorBasedProtobufBytesDecoder implements ProtobufBytesDecoder +{ + private Descriptors.Descriptor descriptor; + private final String protoMessageType; + + public DescriptorBasedProtobufBytesDecoder( + final String protoMessageType + ) + { + this.protoMessageType = protoMessageType; + } + + @JsonProperty + public String getProtoMessageType() + { + return protoMessageType; + } + + public Descriptors.Descriptor getDescriptor() + { + return descriptor; + } + + @VisibleForTesting + void initDescriptor() + { + if (this.descriptor == null) { + final DynamicSchema dynamicSchema = generateDynamicSchema(); + this.descriptor = generateDescriptor(dynamicSchema); + } + } + + protected abstract DynamicSchema generateDynamicSchema(); + + @Override + public DynamicMessage parse(ByteBuffer bytes) + { + try { + DynamicMessage 
message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(bytes)); + return message; + } + catch (Exception e) { + throw new ParseException(null, e, "Fail to decode protobuf message!"); + } + } + + private Descriptors.Descriptor generateDescriptor(DynamicSchema dynamicSchema) + { + Set messageTypes = dynamicSchema.getMessageTypes(); + if (messageTypes.size() == 0) { + throw new ParseException(null, "No message types found in the descriptor."); + } + + String messageType = protoMessageType == null ? (String) messageTypes.toArray()[0] : protoMessageType; + Descriptors.Descriptor desc = dynamicSchema.getMessageDescriptor(messageType); + if (desc == null) { + throw new ParseException( + null, + StringUtils.format( + "Protobuf message type %s not found in the specified descriptor. Available messages types are %s", + protoMessageType, + messageTypes + ) + ); + } + return desc; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DescriptorBasedProtobufBytesDecoder that = (DescriptorBasedProtobufBytesDecoder) o; + return Objects.equals(getProtoMessageType(), that.getProtoMessageType()); + } + + @Override + public int hashCode() + { + return Objects.hash(getProtoMessageType()); + } +} diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java index ed52f7443b4..d49037ec7ff 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java @@ -22,27 +22,19 @@ package org.apache.druid.data.input.protobuf; import com.fasterxml.jackson.annotation.JsonCreator; import 
com.fasterxml.jackson.annotation.JsonProperty; import com.github.os72.protobuf.dynamic.DynamicSchema; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; +import com.google.common.base.Preconditions; import com.google.protobuf.Descriptors; -import com.google.protobuf.DynamicMessage; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.ParseException; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; -import java.nio.ByteBuffer; import java.util.Objects; -import java.util.Set; -public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder +public class FileBasedProtobufBytesDecoder extends DescriptorBasedProtobufBytesDecoder { private final String descriptorFilePath; - private final String protoMessageType; - private Descriptors.Descriptor descriptor; - @JsonCreator public FileBasedProtobufBytesDecoder( @@ -50,44 +42,20 @@ public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder @JsonProperty("protoMessageType") String protoMessageType ) { + super(protoMessageType); + Preconditions.checkNotNull(descriptorFilePath); this.descriptorFilePath = descriptorFilePath; - this.protoMessageType = protoMessageType; initDescriptor(); } - @JsonProperty - public String getDescriptor() + @JsonProperty("descriptor") + public String getDescriptorFilePath() { return descriptorFilePath; } - @JsonProperty - public String getProtoMessageType() - { - return protoMessageType; - } - - @VisibleForTesting - void initDescriptor() - { - if (this.descriptor == null) { - this.descriptor = getDescriptor(descriptorFilePath); - } - } - @Override - public DynamicMessage parse(ByteBuffer bytes) - { - try { - DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(bytes)); - return message; - } - catch (Exception e) { - throw new ParseException(null, e, "Fail to decode protobuf 
message!"); - } - } - - private Descriptors.Descriptor getDescriptor(String descriptorFilePath) + protected DynamicSchema generateDynamicSchema() { InputStream fin; @@ -111,9 +79,9 @@ public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder throw new ParseException(url.toString(), e, "Cannot read descriptor file: " + url); } } - DynamicSchema dynamicSchema; + try { - dynamicSchema = DynamicSchema.parseFrom(fin); + return DynamicSchema.parseFrom(fin); } catch (Descriptors.DescriptorValidationException e) { throw new ParseException(null, e, "Invalid descriptor file: " + descriptorFilePath); @@ -121,25 +89,6 @@ public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder catch (IOException e) { throw new ParseException(null, e, "Cannot read descriptor file: " + descriptorFilePath); } - - Set messageTypes = dynamicSchema.getMessageTypes(); - if (messageTypes.size() == 0) { - throw new ParseException(null, "No message types found in the descriptor: " + descriptorFilePath); - } - - String messageType = protoMessageType == null ? (String) messageTypes.toArray()[0] : protoMessageType; - Descriptors.Descriptor desc = dynamicSchema.getMessageDescriptor(messageType); - if (desc == null) { - throw new ParseException( - null, - StringUtils.format( - "Protobuf message type %s not found in the specified descriptor. 
Available messages types are %s", - protoMessageType, - messageTypes - ) - ); - } - return desc; } @Override @@ -151,17 +100,16 @@ public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder if (o == null || getClass() != o.getClass()) { return false; } - + if (!super.equals(o)) { + return false; + } FileBasedProtobufBytesDecoder that = (FileBasedProtobufBytesDecoder) o; - - return Objects.equals(descriptorFilePath, that.descriptorFilePath) && - Objects.equals(protoMessageType, that.protoMessageType); + return Objects.equals(descriptorFilePath, that.descriptorFilePath); } @Override public int hashCode() { - return Objects.hash(descriptorFilePath, protoMessageType); + return Objects.hash(super.hashCode(), descriptorFilePath); } - } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoder.java new file mode 100644 index 00000000000..9eb7051b68a --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoder.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.github.os72.protobuf.dynamic.DynamicSchema; +import com.google.common.base.Preconditions; +import com.google.protobuf.Descriptors; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.ParseException; + +import java.io.IOException; +import java.util.Objects; + +public class InlineDescriptorProtobufBytesDecoder extends DescriptorBasedProtobufBytesDecoder +{ + private final String descriptorString; + + @JsonCreator + public InlineDescriptorProtobufBytesDecoder( + @JsonProperty("descriptorString") String descriptorString, + @JsonProperty("protoMessageType") String protoMessageType + ) + { + super(protoMessageType); + Preconditions.checkNotNull(descriptorString); + this.descriptorString = descriptorString; + initDescriptor(); + } + + @JsonProperty + public String getDescriptorString() + { + return descriptorString; + } + + @Override + protected DynamicSchema generateDynamicSchema() + { + try { + byte[] decodedDesc = StringUtils.decodeBase64String(descriptorString); + return DynamicSchema.parseFrom(decodedDesc); + } + catch (IllegalArgumentException e) { + throw new IAE("Descriptor string does not have valid Base64 encoding."); + } + catch (Descriptors.DescriptorValidationException e) { + throw new ParseException(null, e, "Invalid descriptor string: " + descriptorString); + } + catch (IOException e) { + throw new ParseException(null, e, "Cannot read descriptor string: " + descriptorString); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; 
+ } + InlineDescriptorProtobufBytesDecoder that = (InlineDescriptorProtobufBytesDecoder) o; + return Objects.equals(getDescriptorString(), that.getDescriptorString()); + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), getDescriptorString()); + } +} diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java index 1defa705aec..42a11755f7e 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufBytesDecoder.java @@ -28,7 +28,8 @@ import java.nio.ByteBuffer; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SchemaRegistryBasedProtobufBytesDecoder.class) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "file", value = FileBasedProtobufBytesDecoder.class), - @JsonSubTypes.Type(name = "schema_registry", value = SchemaRegistryBasedProtobufBytesDecoder.class) + @JsonSubTypes.Type(name = "schema_registry", value = SchemaRegistryBasedProtobufBytesDecoder.class), + @JsonSubTypes.Type(name = "inline", value = InlineDescriptorProtobufBytesDecoder.class) }) public interface ProtobufBytesDecoder { diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java index 18db5f12a37..c01575300c7 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoderTest.java @@ -19,6 +19,8 @@ 
package org.apache.druid.data.input.protobuf; +import com.google.protobuf.Descriptors; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.java.util.common.parsers.ParseException; import org.junit.Rule; import org.junit.Test; @@ -73,4 +75,22 @@ public class FileBasedProtobufBytesDecoderTest FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", null); decoder.initDescriptor(); } + + @Test + public void testEquals() + { + FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent"); + decoder.initDescriptor(); + Descriptors.Descriptor descriptorA = decoder.getDescriptor(); + + decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent.Foo"); + decoder.initDescriptor(); + Descriptors.Descriptor descriptorB = decoder.getDescriptor(); + + EqualsVerifier.forClass(FileBasedProtobufBytesDecoder.class) + .usingGetClass() + .withIgnoredFields("descriptor") + .withPrefabValues(Descriptors.Descriptor.class, descriptorA, descriptorB) + .verify(); + } } diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoderTest.java new file mode 100644 index 00000000000..a6fd8fffa51 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoderTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.google.common.io.Files; +import com.google.protobuf.Descriptors; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; + +public class InlineDescriptorProtobufBytesDecoderTest +{ + private String descString; + + @Before + public void initDescriptorString() throws Exception + { + File descFile = new File(this.getClass() + .getClassLoader() + .getResource("prototest.desc") + .toURI()); + descString = StringUtils.encodeBase64String(Files.toByteArray(descFile)); + } + + @Test + public void testShortMessageType() + { + @SuppressWarnings("unused") // expected to create parser without exception + InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder( + descString, + "ProtoTestEvent" + ); + decoder.initDescriptor(); + } + + @Test + public void testLongMessageType() + { + @SuppressWarnings("unused") // expected to create parser without exception + InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder( + descString, + "prototest.ProtoTestEvent" + ); + decoder.initDescriptor(); + } + + @Test(expected = ParseException.class) + public void testBadProto() + { + @SuppressWarnings("unused") // expected exception + InlineDescriptorProtobufBytesDecoder decoder = new 
InlineDescriptorProtobufBytesDecoder(descString, "BadName"); + decoder.initDescriptor(); + } + + @Test(expected = IAE.class) + public void testMalformedDescriptorBase64() + { + @SuppressWarnings("unused") // expected exception + InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder("invalidString", "BadName"); + decoder.initDescriptor(); + } + + @Test(expected = ParseException.class) + public void testMalformedDescriptorValidBase64InvalidDescriptor() + { + @SuppressWarnings("unused") // expected exception + InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder( + "aGVsbG8gd29ybGQ=", + "BadName" + ); + decoder.initDescriptor(); + } + + @Test + public void testSingleDescriptorNoMessageType() + { + // For the backward compatibility, protoMessageType allows null when the desc file has only one message type. + @SuppressWarnings("unused") // expected to create parser without exception + InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder(descString, null); + decoder.initDescriptor(); + } + + @Test + public void testEquals() + { + InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder(descString, "ProtoTestEvent"); + decoder.initDescriptor(); + Descriptors.Descriptor descriptorA = decoder.getDescriptor(); + + decoder = new InlineDescriptorProtobufBytesDecoder(descString, "ProtoTestEvent.Foo"); + decoder.initDescriptor(); + Descriptors.Descriptor descriptorB = decoder.getDescriptor(); + + EqualsVerifier.forClass(InlineDescriptorProtobufBytesDecoder.class) + .usingGetClass() + .withIgnoredFields("descriptor") + .withPrefabValues(Descriptors.Descriptor.class, descriptorA, descriptorB) + .verify(); + } + +} diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java index 44e9af98457..219fb23aac7 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import com.google.common.io.Files; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.ByteEntity; @@ -31,6 +32,7 @@ import org.apache.druid.data.input.impl.NestedInputFormat; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -42,6 +44,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.io.File; import java.io.IOException; public class ProtobufInputFormatTest @@ -53,11 +56,12 @@ public class ProtobufInputFormatTest private DimensionsSpec dimensionsSpec; private JSONPathSpec flattenSpec; private FileBasedProtobufBytesDecoder decoder; + private InlineDescriptorProtobufBytesDecoder inlineSchemaDecoder; private final ObjectMapper jsonMapper = new DefaultObjectMapper(); @Before - public void setUp() + public void setUp() throws Exception { timestampSpec = new TimestampSpec("timestamp", "iso", null); dimensionsSpec = new DimensionsSpec(Lists.newArrayList( @@ -75,6 
+79,14 @@ public class ProtobufInputFormatTest ) ); decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent"); + + File descFile = new File(this.getClass() + .getClassLoader() + .getResource("prototest.desc") + .toURI()); + String descString = StringUtils.encodeBase64String(Files.toByteArray(descFile)); + inlineSchemaDecoder = new InlineDescriptorProtobufBytesDecoder(descString, "ProtoTestEvent"); + for (Module jacksonModule : new ProtobufExtensionsModule().getJacksonModules()) { jsonMapper.registerModule(jacksonModule); } @@ -145,4 +157,21 @@ public class ProtobufInputFormatTest ProtobufInputRowParserTest.verifyFlatData(row, dateTime); } + + @Test + public void testParseNestedDataWithInlineSchema() throws Exception + { + //configure parser with inline schema decoder + ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(flattenSpec, inlineSchemaDecoder); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime); + + final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event)); + + InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next(); + + ProtobufInputRowParserTest.verifyNestedData(row, dateTime); + } } diff --git a/website/.spelling b/website/.spelling index 0b13916f030..afa6c881b2d 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1233,6 +1233,7 @@ column_1 column_n com.opencsv ctrl +descriptorString headerFormat headerLabelPrefix jsonLowercase