Add inline descriptor Protobuf bytes decoder (#13192)

* Add inline descriptor Protobuf bytes decoder

* PR comments

* Update tests, check for IllegalArgumentException

* Fix license, add equals test

* Update extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/InlineDescriptorProtobufBytesDecoder.java

Co-authored-by: Frank Chen <frankchen@apache.org>

Co-authored-by: Frank Chen <frankchen@apache.org>
This commit is contained in:
Jonathan Wei 2022-10-11 13:37:28 -05:00 committed by GitHub
parent 2a24c20454
commit 9b8e69c99a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 431 additions and 68 deletions

View File

@ -1308,6 +1308,26 @@ Sample spec:
} }
``` ```
#### Inline Descriptor Protobuf Bytes Decoder
This Protobuf bytes decoder allows the user to provide the contents of a Protobuf descriptor file inline, encoded as a Base64 string, and then parse it to get schema used to decode the Protobuf record from bytes.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | Set value to `inline`. | yes |
| descriptorString | String | A compiled Protobuf descriptor, encoded as a Base64 string. | yes |
| protoMessageType | String | Protobuf message type in the descriptor. Both short name and fully qualified name are accepted. The parser uses the first message type found in the descriptor if not specified. | no |
Sample spec:
```json
"protoBytesDecoder": {
"type": "inline",
"descriptorString": <Contents of a Protobuf descriptor file encoded as Base64 string>,
"protoMessageType": "Metrics"
}
```
##### Confluent Schema Registry-based Protobuf Bytes Decoder ##### Confluent Schema Registry-based Protobuf Bytes Decoder
This Protobuf bytes decoder first extracts a unique `id` from input message bytes, and then uses it to look up the schema in the Schema Registry used to decode the Avro record from bytes. This Protobuf bytes decoder first extracts a unique `id` from input message bytes, and then uses it to look up the schema in the Schema Registry used to decode the Avro record from bytes.

View File

@ -163,6 +163,11 @@
<version>${project.parent.version}</version> <version>${project.parent.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.protobuf;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Set;
public abstract class DescriptorBasedProtobufBytesDecoder implements ProtobufBytesDecoder
{
private Descriptors.Descriptor descriptor;
private final String protoMessageType;
public DescriptorBasedProtobufBytesDecoder(
final String protoMessageType
)
{
this.protoMessageType = protoMessageType;
}
@JsonProperty
public String getProtoMessageType()
{
return protoMessageType;
}
public Descriptors.Descriptor getDescriptor()
{
return descriptor;
}
@VisibleForTesting
void initDescriptor()
{
if (this.descriptor == null) {
final DynamicSchema dynamicSchema = generateDynamicSchema();
this.descriptor = generateDescriptor(dynamicSchema);
}
}
protected abstract DynamicSchema generateDynamicSchema();
@Override
public DynamicMessage parse(ByteBuffer bytes)
{
try {
DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(bytes));
return message;
}
catch (Exception e) {
throw new ParseException(null, e, "Fail to decode protobuf message!");
}
}
private Descriptors.Descriptor generateDescriptor(DynamicSchema dynamicSchema)
{
Set<String> messageTypes = dynamicSchema.getMessageTypes();
if (messageTypes.size() == 0) {
throw new ParseException(null, "No message types found in the descriptor.");
}
String messageType = protoMessageType == null ? (String) messageTypes.toArray()[0] : protoMessageType;
Descriptors.Descriptor desc = dynamicSchema.getMessageDescriptor(messageType);
if (desc == null) {
throw new ParseException(
null,
StringUtils.format(
"Protobuf message type %s not found in the specified descriptor. Available messages types are %s",
protoMessageType,
messageTypes
)
);
}
return desc;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DescriptorBasedProtobufBytesDecoder that = (DescriptorBasedProtobufBytesDecoder) o;
return Objects.equals(getProtoMessageType(), that.getProtoMessageType());
}
@Override
public int hashCode()
{
return Objects.hash(getProtoMessageType());
}
}

View File

@ -22,27 +22,19 @@ package org.apache.druid.data.input.protobuf;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.os72.protobuf.dynamic.DynamicSchema; import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder public class FileBasedProtobufBytesDecoder extends DescriptorBasedProtobufBytesDecoder
{ {
private final String descriptorFilePath; private final String descriptorFilePath;
private final String protoMessageType;
private Descriptors.Descriptor descriptor;
@JsonCreator @JsonCreator
public FileBasedProtobufBytesDecoder( public FileBasedProtobufBytesDecoder(
@ -50,44 +42,20 @@ public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder
@JsonProperty("protoMessageType") String protoMessageType @JsonProperty("protoMessageType") String protoMessageType
) )
{ {
super(protoMessageType);
Preconditions.checkNotNull(descriptorFilePath);
this.descriptorFilePath = descriptorFilePath; this.descriptorFilePath = descriptorFilePath;
this.protoMessageType = protoMessageType;
initDescriptor(); initDescriptor();
} }
@JsonProperty @JsonProperty("descriptor")
public String getDescriptor() public String getDescriptorFilePath()
{ {
return descriptorFilePath; return descriptorFilePath;
} }
@JsonProperty
public String getProtoMessageType()
{
return protoMessageType;
}
@VisibleForTesting
void initDescriptor()
{
if (this.descriptor == null) {
this.descriptor = getDescriptor(descriptorFilePath);
}
}
@Override @Override
public DynamicMessage parse(ByteBuffer bytes) protected DynamicSchema generateDynamicSchema()
{
try {
DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(bytes));
return message;
}
catch (Exception e) {
throw new ParseException(null, e, "Fail to decode protobuf message!");
}
}
private Descriptors.Descriptor getDescriptor(String descriptorFilePath)
{ {
InputStream fin; InputStream fin;
@ -111,9 +79,9 @@ public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder
throw new ParseException(url.toString(), e, "Cannot read descriptor file: " + url); throw new ParseException(url.toString(), e, "Cannot read descriptor file: " + url);
} }
} }
DynamicSchema dynamicSchema;
try { try {
dynamicSchema = DynamicSchema.parseFrom(fin); return DynamicSchema.parseFrom(fin);
} }
catch (Descriptors.DescriptorValidationException e) { catch (Descriptors.DescriptorValidationException e) {
throw new ParseException(null, e, "Invalid descriptor file: " + descriptorFilePath); throw new ParseException(null, e, "Invalid descriptor file: " + descriptorFilePath);
@ -121,25 +89,6 @@ public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder
catch (IOException e) { catch (IOException e) {
throw new ParseException(null, e, "Cannot read descriptor file: " + descriptorFilePath); throw new ParseException(null, e, "Cannot read descriptor file: " + descriptorFilePath);
} }
Set<String> messageTypes = dynamicSchema.getMessageTypes();
if (messageTypes.size() == 0) {
throw new ParseException(null, "No message types found in the descriptor: " + descriptorFilePath);
}
String messageType = protoMessageType == null ? (String) messageTypes.toArray()[0] : protoMessageType;
Descriptors.Descriptor desc = dynamicSchema.getMessageDescriptor(messageType);
if (desc == null) {
throw new ParseException(
null,
StringUtils.format(
"Protobuf message type %s not found in the specified descriptor. Available messages types are %s",
protoMessageType,
messageTypes
)
);
}
return desc;
} }
@Override @Override
@ -151,17 +100,16 @@ public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder
if (o == null || getClass() != o.getClass()) { if (o == null || getClass() != o.getClass()) {
return false; return false;
} }
if (!super.equals(o)) {
return false;
}
FileBasedProtobufBytesDecoder that = (FileBasedProtobufBytesDecoder) o; FileBasedProtobufBytesDecoder that = (FileBasedProtobufBytesDecoder) o;
return Objects.equals(descriptorFilePath, that.descriptorFilePath);
return Objects.equals(descriptorFilePath, that.descriptorFilePath) &&
Objects.equals(protoMessageType, that.protoMessageType);
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
return Objects.hash(descriptorFilePath, protoMessageType); return Objects.hash(super.hashCode(), descriptorFilePath);
} }
} }

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.protobuf;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import java.io.IOException;
import java.util.Objects;
public class InlineDescriptorProtobufBytesDecoder extends DescriptorBasedProtobufBytesDecoder
{
private final String descriptorString;
@JsonCreator
public InlineDescriptorProtobufBytesDecoder(
@JsonProperty("descriptorString") String descriptorString,
@JsonProperty("protoMessageType") String protoMessageType
)
{
super(protoMessageType);
Preconditions.checkNotNull(descriptorString);
this.descriptorString = descriptorString;
initDescriptor();
}
@JsonProperty
public String getDescriptorString()
{
return descriptorString;
}
@Override
protected DynamicSchema generateDynamicSchema()
{
try {
byte[] decodedDesc = StringUtils.decodeBase64String(descriptorString);
return DynamicSchema.parseFrom(decodedDesc);
}
catch (IllegalArgumentException e) {
throw new IAE("Descriptor string does not have valid Base64 encoding.");
}
catch (Descriptors.DescriptorValidationException e) {
throw new ParseException(null, e, "Invalid descriptor string: " + descriptorString);
}
catch (IOException e) {
throw new ParseException(null, e, "Cannot read descriptor string: " + descriptorString);
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
InlineDescriptorProtobufBytesDecoder that = (InlineDescriptorProtobufBytesDecoder) o;
return Objects.equals(getDescriptorString(), that.getDescriptorString());
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), getDescriptorString());
}
}

View File

@ -28,7 +28,8 @@ import java.nio.ByteBuffer;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SchemaRegistryBasedProtobufBytesDecoder.class) @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SchemaRegistryBasedProtobufBytesDecoder.class)
@JsonSubTypes(value = { @JsonSubTypes(value = {
@JsonSubTypes.Type(name = "file", value = FileBasedProtobufBytesDecoder.class), @JsonSubTypes.Type(name = "file", value = FileBasedProtobufBytesDecoder.class),
@JsonSubTypes.Type(name = "schema_registry", value = SchemaRegistryBasedProtobufBytesDecoder.class) @JsonSubTypes.Type(name = "schema_registry", value = SchemaRegistryBasedProtobufBytesDecoder.class),
@JsonSubTypes.Type(name = "inline", value = InlineDescriptorProtobufBytesDecoder.class)
}) })
public interface ProtobufBytesDecoder public interface ProtobufBytesDecoder
{ {

View File

@ -19,6 +19,8 @@
package org.apache.druid.data.input.protobuf; package org.apache.druid.data.input.protobuf;
import com.google.protobuf.Descriptors;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -73,4 +75,22 @@ public class FileBasedProtobufBytesDecoderTest
FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", null); FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", null);
decoder.initDescriptor(); decoder.initDescriptor();
} }
@Test
public void testEquals()
{
FileBasedProtobufBytesDecoder decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent");
decoder.initDescriptor();
Descriptors.Descriptor descriptorA = decoder.getDescriptor();
decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent.Foo");
decoder.initDescriptor();
Descriptors.Descriptor descriptorB = decoder.getDescriptor();
EqualsVerifier.forClass(FileBasedProtobufBytesDecoder.class)
.usingGetClass()
.withIgnoredFields("descriptor")
.withPrefabValues(Descriptors.Descriptor.class, descriptorA, descriptorB)
.verify();
}
} }

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.protobuf;
import com.google.common.io.Files;
import com.google.protobuf.Descriptors;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
public class InlineDescriptorProtobufBytesDecoderTest
{
private String descString;
@Before
public void initDescriptorString() throws Exception
{
File descFile = new File(this.getClass()
.getClassLoader()
.getResource("prototest.desc")
.toURI());
descString = StringUtils.encodeBase64String(Files.toByteArray(descFile));
}
@Test
public void testShortMessageType()
{
@SuppressWarnings("unused") // expected to create parser without exception
InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder(
descString,
"ProtoTestEvent"
);
decoder.initDescriptor();
}
@Test
public void testLongMessageType()
{
@SuppressWarnings("unused") // expected to create parser without exception
InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder(
descString,
"prototest.ProtoTestEvent"
);
decoder.initDescriptor();
}
@Test(expected = ParseException.class)
public void testBadProto()
{
@SuppressWarnings("unused") // expected exception
InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder(descString, "BadName");
decoder.initDescriptor();
}
@Test(expected = IAE.class)
public void testMalformedDescriptorBase64()
{
@SuppressWarnings("unused") // expected exception
InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder("invalidString", "BadName");
decoder.initDescriptor();
}
@Test(expected = ParseException.class)
public void testMalformedDescriptorValidBase64InvalidDescriptor()
{
@SuppressWarnings("unused") // expected exception
InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder(
"aGVsbG8gd29ybGQ=",
"BadName"
);
decoder.initDescriptor();
}
@Test
public void testSingleDescriptorNoMessageType()
{
// For the backward compatibility, protoMessageType allows null when the desc file has only one message type.
@SuppressWarnings("unused") // expected to create parser without exception
InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder(descString, null);
decoder.initDescriptor();
}
@Test
public void testEquals()
{
InlineDescriptorProtobufBytesDecoder decoder = new InlineDescriptorProtobufBytesDecoder(descString, "ProtoTestEvent");
decoder.initDescriptor();
Descriptors.Descriptor descriptorA = decoder.getDescriptor();
decoder = new InlineDescriptorProtobufBytesDecoder(descString, "ProtoTestEvent.Foo");
decoder.initDescriptor();
Descriptors.Descriptor descriptorB = decoder.getDescriptor();
EqualsVerifier.forClass(InlineDescriptorProtobufBytesDecoder.class)
.usingGetClass()
.withIgnoredFields("descriptor")
.withPrefabValues(Descriptors.Descriptor.class, descriptorA, descriptorB)
.verify();
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.ByteEntity;
@ -31,6 +32,7 @@ import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@ -42,6 +44,7 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException; import java.io.IOException;
public class ProtobufInputFormatTest public class ProtobufInputFormatTest
@ -53,11 +56,12 @@ public class ProtobufInputFormatTest
private DimensionsSpec dimensionsSpec; private DimensionsSpec dimensionsSpec;
private JSONPathSpec flattenSpec; private JSONPathSpec flattenSpec;
private FileBasedProtobufBytesDecoder decoder; private FileBasedProtobufBytesDecoder decoder;
private InlineDescriptorProtobufBytesDecoder inlineSchemaDecoder;
private final ObjectMapper jsonMapper = new DefaultObjectMapper(); private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Before @Before
public void setUp() public void setUp() throws Exception
{ {
timestampSpec = new TimestampSpec("timestamp", "iso", null); timestampSpec = new TimestampSpec("timestamp", "iso", null);
dimensionsSpec = new DimensionsSpec(Lists.newArrayList( dimensionsSpec = new DimensionsSpec(Lists.newArrayList(
@ -75,6 +79,14 @@ public class ProtobufInputFormatTest
) )
); );
decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent"); decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent");
File descFile = new File(this.getClass()
.getClassLoader()
.getResource("prototest.desc")
.toURI());
String descString = StringUtils.encodeBase64String(Files.toByteArray(descFile));
inlineSchemaDecoder = new InlineDescriptorProtobufBytesDecoder(descString, "ProtoTestEvent");
for (Module jacksonModule : new ProtobufExtensionsModule().getJacksonModules()) { for (Module jacksonModule : new ProtobufExtensionsModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule); jsonMapper.registerModule(jacksonModule);
} }
@ -145,4 +157,21 @@ public class ProtobufInputFormatTest
ProtobufInputRowParserTest.verifyFlatData(row, dateTime); ProtobufInputRowParserTest.verifyFlatData(row, dateTime);
} }
@Test
public void testParseNestedDataWithInlineSchema() throws Exception
{
//configure parser with inline schema decoder
ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(flattenSpec, inlineSchemaDecoder);
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);
final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next();
ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
}
} }

View File

@ -1233,6 +1233,7 @@ column_1
column_n column_n
com.opencsv com.opencsv
ctrl ctrl
descriptorString
headerFormat headerFormat
headerLabelPrefix headerLabelPrefix
jsonLowercase jsonLowercase