handle timestamps of complex types when parsing protobuf messages (#11293)

* handle timestamps correctly when parsing protobuf

* Add timestamp handling to ProtobufReader

* disable checkstyle for generated sourcecode

* Fix test

* try this

* refactor tests
This commit is contained in:
Abhishek Agarwal 2021-06-07 15:19:39 +05:30 committed by GitHub
parent 0c5d1c9725
commit 44d629319d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1402 additions and 1345 deletions

View File

@ -48,7 +48,7 @@
<suppress checks="Indentation" files="[\\/]target[\\/]generated-test-sources[\\/]" />
<suppress checks="Indentation" files="ProtoTestEventWrapper.java" />
<suppress checks="Regexp" id="argumentLineBreaking" files="ProtoTestEventWrapper.java" />
<suppress checks="ConstantName" files="ProtoTestEventWrapper.java" />
<suppress checks="[a-zA-Z0-9]*" files="ProtoTestEventWrapper.java" />
<suppress checks="ConstantName" files="MySubRecord.java" />
<suppress checks="ConstantName" files="SomeAvroDatum.java" />
<suppress checks="ConstantName" files="MyFixed.java" />

View File

@ -32,10 +32,11 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.common.parsers.Parser;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import java.nio.ByteBuffer;
import java.util.List;
@ -43,9 +44,11 @@ import java.util.Map;
public class ProtobufInputRowParser implements ByteBufferInputRowParser
{
private static final Logger LOG = new Logger(ByteBufferInputRowParser.class);
private final ParseSpec parseSpec;
// timestamp spec to be used for parsing timestamp
private final TimestampSpec timestampSpec;
// whether the spec has any fields to flat
private final boolean isFlatSpec;
private final ProtobufBytesDecoder protobufBytesDecoder;
private Parser<String, Object> parser;
private final List<String> dimensions;
@ -62,12 +65,18 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
{
this.parseSpec = parseSpec;
this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
this.isFlatSpec = parseSpec instanceof JSONParseSpec && ((JSONParseSpec) parseSpec).getFlattenSpec().getFields().isEmpty();
if (descriptorFilePath != null || protoMessageType != null) {
this.protobufBytesDecoder = new FileBasedProtobufBytesDecoder(descriptorFilePath, protoMessageType);
} else {
this.protobufBytesDecoder = protobufBytesDecoder;
}
if (isFlatSpec) {
this.timestampSpec = new ProtobufInputRowSchema.ProtobufTimestampSpec(parseSpec.getTimestampSpec());
} else {
this.timestampSpec = parseSpec.getTimestampSpec();
}
}
@Override
@ -89,11 +98,13 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
parser = parseSpec.makeParser();
}
Map<String, Object> record;
DateTime timestamp;
if (parseSpec instanceof JSONParseSpec && ((JSONParseSpec) parseSpec).getFlattenSpec().getFields().isEmpty()) {
if (isFlatSpec) {
try {
DynamicMessage message = protobufBytesDecoder.parse(input);
record = CollectionUtils.mapKeys(message.getAllFields(), k -> k.getJsonName());
timestamp = this.timestampSpec.extractTimestamp(record);
}
catch (Exception ex) {
throw new ParseException(ex, "Protobuf message could not be parsed");
@ -103,6 +114,7 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
DynamicMessage message = protobufBytesDecoder.parse(input);
String json = JsonFormat.printer().print(message);
record = parser.parseToMap(json);
timestamp = this.timestampSpec.extractTimestamp(record);
}
catch (InvalidProtocolBufferException e) {
throw new ParseException(e, "Protobuf message could not be parsed");
@ -117,10 +129,6 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
Sets.difference(record.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
);
}
return ImmutableList.of(new MapBasedInputRow(
parseSpec.getTimestampSpec().extractTimestamp(record),
dimensions,
record
));
return ImmutableList.of(new MapBasedInputRow(timestamp, dimensions, record));
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.protobuf;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.JsonFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Map;
/**
* Extension of {@link InputRowSchema} with a custom {@link TimestampSpec} to support timestamp extraction for
* complex timestamp types
*/
public class ProtobufInputRowSchema extends InputRowSchema
{
public ProtobufInputRowSchema(InputRowSchema inputRowSchema)
{
super(new ProtobufTimestampSpec(inputRowSchema.getTimestampSpec()), inputRowSchema.getDimensionsSpec(), inputRowSchema.getColumnsFilter());
}
static class ProtobufTimestampSpec extends TimestampSpec
{
public ProtobufTimestampSpec(TimestampSpec timestampSpec)
{
super(timestampSpec.getTimestampColumn(), timestampSpec.getTimestampFormat(), timestampSpec.getMissingValue());
}
/**
* Extracts the timestamp from the record. If the timestamp column is of complex type such as {@link Timestamp}, then the timestamp
* is first serialized to string via {@link JsonFormat}. Directly calling {@code toString()} on {@code Timestamp}
* returns an unparseable string.
*/
@Override
@Nullable
public DateTime extractTimestamp(@Nullable Map<String, Object> input)
{
Object rawTimestamp = getRawTimestamp(input);
if (rawTimestamp instanceof Message) {
try {
String timestampStr = JsonFormat.printer().print((Message) rawTimestamp);
return parseDateTime(timestampStr);
}
catch (InvalidProtocolBufferException e) {
throw new ParseException(e, "Protobuf message could not be parsed");
}
} else {
return parseDateTime(rawTimestamp);
}
}
}
}

View File

@ -62,11 +62,17 @@ public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
JSONPathSpec flattenSpec
)
{
this.inputRowSchema = inputRowSchema;
if (flattenSpec == null) {
this.inputRowSchema = new ProtobufInputRowSchema(inputRowSchema);
this.recordFlattener = null;
} else {
this.inputRowSchema = inputRowSchema;
this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true));
}
this.source = source;
this.protobufBytesDecoder = protobufBytesDecoder;
this.flattenSpec = flattenSpec;
this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true));
}
@Override

View File

@ -41,10 +41,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
public class ProtobufInputFormatTest
{
@ -119,49 +116,13 @@ public class ProtobufInputFormatTest
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.setFoo(ProtoTestEventWrapper.ProtoTestEvent.Foo
.newBuilder()
.setBar("baz"))
.addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo
.newBuilder()
.setBar("bar0"))
.addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo
.newBuilder()
.setBar("bar1"))
.build();
ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray()));
final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next();
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "id", "4711");
assertDimensionEquals(row, "isValid", "true");
assertDimensionEquals(row, "someOtherId", "4712");
assertDimensionEquals(row, "description", "description");
assertDimensionEquals(row, "eventType", ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE.name());
assertDimensionEquals(row, "foobar", "baz");
assertDimensionEquals(row, "bar0", "bar0");
Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0);
Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0);
Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
}
@Test
@ -172,44 +133,12 @@ public class ProtobufInputFormatTest
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildFlatData(dateTime);
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray()));
final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next();
System.out.println(row);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "id", "4711");
assertDimensionEquals(row, "isValid", "true");
assertDimensionEquals(row, "someOtherId", "4712");
assertDimensionEquals(row, "description", "description");
Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0);
Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0);
Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
}
private void assertDimensionEquals(InputRow row, String dimension, Object expected)
{
List<String> values = row.getDimension(dimension);
Assert.assertEquals(1, values.size());
Assert.assertEquals(expected, values.get(0));
ProtobufInputRowParserTest.verifyFlatData(row, dateTime);
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.data.input.protobuf;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.Timestamp;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
@ -42,6 +43,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@ -52,6 +54,7 @@ public class ProtobufInputRowParserTest
private ParseSpec parseSpec;
private ParseSpec flatParseSpec;
private ParseSpec flatParseSpecWithComplexTimestamp;
private FileBasedProtobufBytesDecoder decoder;
@Before
@ -90,6 +93,20 @@ public class ProtobufInputRowParserTest
null,
null
);
flatParseSpecWithComplexTimestamp = new JSONParseSpec(
new TimestampSpec("otherTimestamp", "iso", null),
new DimensionsSpec(Lists.newArrayList(
new StringDimensionSchema("event"),
new StringDimensionSchema("id"),
new StringDimensionSchema("someOtherId"),
new StringDimensionSchema("isValid")
), null, null),
null,
null,
null
);
decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent");
}
@ -101,47 +118,11 @@ public class ProtobufInputRowParserTest
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.setFoo(ProtoTestEventWrapper.ProtoTestEvent.Foo
.newBuilder()
.setBar("baz"))
.addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo
.newBuilder()
.setBar("bar0"))
.addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo
.newBuilder()
.setBar("bar1"))
.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
ProtoTestEventWrapper.ProtoTestEvent event = buildNestedData(dateTime);
InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "id", "4711");
assertDimensionEquals(row, "isValid", "true");
assertDimensionEquals(row, "someOtherId", "4712");
assertDimensionEquals(row, "description", "description");
assertDimensionEquals(row, "eventType", ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE.name());
assertDimensionEquals(row, "foobar", "baz");
assertDimensionEquals(row, "bar0", "bar0");
Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0);
Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0);
Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
InputRow row = parser.parseBatch(toByteBuffer(event)).get(0);
verifyNestedData(row, dateTime);
}
@Test
@ -152,35 +133,24 @@ public class ProtobufInputRowParserTest
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
ProtoTestEventWrapper.ProtoTestEvent event = buildFlatData(dateTime);
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
InputRow row = parser.parseBatch(toByteBuffer(event)).get(0);
verifyFlatData(row, dateTime);
}
InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
System.out.println(row);
@Test
public void testParseFlatDataWithComplexTimestamp() throws Exception
{
ProtobufInputRowParser parser = new ProtobufInputRowParser(flatParseSpecWithComplexTimestamp, decoder, null, null);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = buildFlatDataWithComplexTimestamp(dateTime);
assertDimensionEquals(row, "id", "4711");
assertDimensionEquals(row, "isValid", "true");
assertDimensionEquals(row, "someOtherId", "4712");
assertDimensionEquals(row, "description", "description");
InputRow row = parser.parseBatch(toByteBuffer(event)).get(0);
Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0);
Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0);
Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
verifyFlatDataWithComplexTimestamp(row, dateTime);
}
@Test
@ -218,7 +188,53 @@ public class ProtobufInputRowParserTest
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
ProtoTestEventWrapper.ProtoTestEvent event = buildNestedData(dateTime);
InputRow row = parser.parseBatch(toByteBuffer(event)).get(0);
verifyNestedData(row, dateTime);
}
private static void assertDimensionEquals(InputRow row, String dimension, Object expected)
{
List<String> values = row.getDimension(dimension);
Assert.assertEquals(1, values.size());
Assert.assertEquals(expected, values.get(0));
}
static ProtoTestEventWrapper.ProtoTestEvent buildFlatData(DateTime dateTime)
{
return ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
}
static void verifyFlatData(InputRow row, DateTime dateTime)
{
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "id", "4711");
assertDimensionEquals(row, "isValid", "true");
assertDimensionEquals(row, "someOtherId", "4712");
assertDimensionEquals(row, "description", "description");
Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0);
Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0);
Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
}
static ProtoTestEventWrapper.ProtoTestEvent buildNestedData(DateTime dateTime)
{
return ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
@ -238,12 +254,10 @@ public class ProtobufInputRowParserTest
.newBuilder()
.setBar("bar1"))
.build();
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
static void verifyNestedData(InputRow row, DateTime dateTime)
{
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "id", "4711");
@ -261,10 +275,33 @@ public class ProtobufInputRowParserTest
Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
}
private void assertDimensionEquals(InputRow row, String dimension, Object expected)
static ProtoTestEventWrapper.ProtoTestEvent buildFlatDataWithComplexTimestamp(DateTime dateTime)
{
List<String> values = row.getDimension(dimension);
Assert.assertEquals(1, values.size());
Assert.assertEquals(expected, values.get(0));
Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000).setNanos((int) ((dateTime.getMillis() % 1000) * 1000 * 1000)).build();
return ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setOtherTimestamp(timestamp)
.setTimestamp("unused")
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
}
static void verifyFlatDataWithComplexTimestamp(InputRow row, DateTime dateTime)
{
verifyFlatData(row, dateTime);
}
static ByteBuffer toByteBuffer(ProtoTestEventWrapper.ProtoTestEvent event) throws IOException
{
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
event.writeTo(out);
return ByteBuffer.wrap(out.toByteArray());
}
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.protobuf;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.nio.ByteBuffer;
public class ProtobufReaderTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private InputRowSchema inputRowSchema;
private InputRowSchema inputRowSchemaWithComplexTimestamp;
private JSONPathSpec flattenSpec;
private FileBasedProtobufBytesDecoder decoder;
@Before
public void setUp()
{
TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);
DimensionsSpec dimensionsSpec = new DimensionsSpec(Lists.newArrayList(
new StringDimensionSchema("event"),
new StringDimensionSchema("id"),
new StringDimensionSchema("someOtherId"),
new StringDimensionSchema("isValid")
), null, null);
flattenSpec = new JSONPathSpec(
true,
Lists.newArrayList(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar")
)
);
inputRowSchema = new InputRowSchema(timestampSpec, dimensionsSpec, null);
inputRowSchemaWithComplexTimestamp = new InputRowSchema(
new TimestampSpec("otherTimestamp", "iso", null),
dimensionsSpec,
null
);
decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent");
}
@Test
public void testParseNestedData() throws Exception
{
ProtobufReader reader = new ProtobufReader(inputRowSchema, null, decoder, flattenSpec);
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);
ByteBuffer buffer = ProtobufInputRowParserTest.toByteBuffer(event);
InputRow row = reader.parseInputRows(decoder.parse(buffer)).get(0);
ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
}
@Test
public void testParseFlatData() throws Exception
{
ProtobufReader reader = new ProtobufReader(inputRowSchema, null, decoder, null);
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildFlatData(dateTime);
ByteBuffer buffer = ProtobufInputRowParserTest.toByteBuffer(event);
InputRow row = reader.parseInputRows(decoder.parse(buffer)).get(0);
ProtobufInputRowParserTest.verifyFlatData(row, dateTime);
}
@Test
public void testParseFlatDataWithComplexTimestamp() throws Exception
{
ProtobufReader reader = new ProtobufReader(inputRowSchemaWithComplexTimestamp, null, decoder, null);
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildFlatDataWithComplexTimestamp(dateTime);
ByteBuffer buffer = ProtobufInputRowParserTest.toByteBuffer(event);
InputRow row = reader.parseInputRows(decoder.parse(buffer)).get(0);
ProtobufInputRowParserTest.verifyFlatDataWithComplexTimestamp(row, dateTime);
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.protobuf;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.DynamicMessage;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
@ -38,6 +39,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
public class SchemaRegistryBasedProtobufBytesDecoderTest
{
@ -52,11 +54,8 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
@Test
public void testParse() throws Exception
{
// Given
InputStream fin;
fin = this.getClass().getClassLoader().getResourceAsStream("ProtoTest.proto");
String protobufString = IOUtils.toString(fin, StandardCharsets.UTF_8);
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new ProtobufSchema(protobufString));
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(parseProtobufSchema());
ProtoTestEventWrapper.ProtoTestEvent event = getTestEvent();
byte[] bytes = event.toByteArray();
ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(1234).put((byte) 0).put(bytes);
@ -70,11 +69,7 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
@Test(expected = ParseException.class)
public void testParseCorrupted() throws Exception
{
// Given
InputStream fin;
fin = this.getClass().getClassLoader().getResourceAsStream("ProtoTest.proto");
String protobufString = IOUtils.toString(fin, StandardCharsets.UTF_8);
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new ProtobufSchema(protobufString));
Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(parseProtobufSchema());
byte[] bytes = getTestEvent().toByteArray();
ByteBuffer bb = ByteBuffer.allocate(bytes.length + 6).put((byte) 0).putInt(1234).put((bytes), 5, 10);
bb.rewind();
@ -116,17 +111,7 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
private ProtoTestEventWrapper.ProtoTestEvent getTestEvent()
{
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildFlatData(dateTime);
return event;
}
@ -172,4 +157,17 @@ public class SchemaRegistryBasedProtobufBytesDecoderTest
// Then
Assert.assertNotEquals(decoder.hashCode(), 0);
}
private ProtobufSchema parseProtobufSchema() throws IOException
{
// Given
InputStream fin;
fin = this.getClass().getClassLoader().getResourceAsStream("ProtoTest.proto");
String protobufString = IOUtils.toString(fin, StandardCharsets.UTF_8);
fin = this.getClass().getClassLoader().getResourceAsStream("google/protobuf/timestamp.proto");
String timestampProtobufString = IOUtils.toString(fin, StandardCharsets.UTF_8);
return new ProtobufSchema(protobufString, Collections.emptyList(),
ImmutableMap.of("google/protobuf/timestamp.proto", timestampProtobufString), null, null);
}
}

View File

@ -13,11 +13,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
/*
If you are changing this class, make sure to add --include_imports flag while generating the descriptor file
*/
syntax = "proto2";
package prototest;
option java_package = "org.apache.druid.data.input.protobuf";
option java_outer_classname = "ProtoTestEventWrapper";
import "google/protobuf/timestamp.proto";
message ProtoTestEvent {
enum EventCategory {
@ -42,4 +46,5 @@ message ProtoTestEvent {
optional uint64 someLongColumn = 9;
optional Foo foo = 10;
repeated Foo bar = 11;
optional google.protobuf.Timestamp otherTimestamp = 12;
}