Optimize protobuf parsing for flatten data (#9999)

* optimize for protobuf parsing

* fix import error and maven dependency

* add unit test in protobufInputrowParserTest for flatten data

* solve code duplication (remove the log and main())

* rename 'flatten' to 'flat' to make it clearer

Co-authored-by: xionghuilin <xionghuilin@bytedance.com>
This commit is contained in:
xhl0726 2020-06-25 09:01:31 +08:00 committed by GitHub
parent 9be5039f68
commit 1596b3eacd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 244 additions and 10 deletions

View File

@ -169,6 +169,12 @@
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-protobuf-extensions</artifactId>
<version>0.19.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<properties> <properties>

View File

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.protobuf.ProtobufInputRowParser;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
public class ProtobufParserBenchmark
{
@Param({"75000"})
private int rowsPerSegment;
private static final Logger log = new Logger(ProtobufParserBenchmark.class);
static {
NullHandling.initializeForTests();
}
private ParseSpec nestedParseSpec;
private ProtobufInputRowParser nestedParser;
private ParseSpec flatParseSpec;
private ProtobufInputRowParser flatParser;
private byte[] protoInputs;
private String protoFilePath;
@Setup
public void setup()
{
nestedParseSpec = new JSONParseSpec(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(Lists.newArrayList(
new StringDimensionSchema("event"),
new StringDimensionSchema("id"),
new StringDimensionSchema("someOtherId"),
new StringDimensionSchema("isValid")
), null, null),
new JSONPathSpec(
true,
Lists.newArrayList(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar")
)
),
null,
null
);
flatParseSpec = new JSONParseSpec(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(Lists.newArrayList(
new StringDimensionSchema("event"),
new StringDimensionSchema("id"),
new StringDimensionSchema("someOtherId"),
new StringDimensionSchema("isValid")
), null, null),
null,
null,
null
);
protoFilePath = "ProtoFile";
protoInputs = getProtoInputs(protoFilePath);
nestedParser = new ProtobufInputRowParser(nestedParseSpec, "prototest.desc", "ProtoTestEvent");
flatParser = new ProtobufInputRowParser(flatParseSpec, "prototest.desc", "ProtoTestEvent");
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void consumeFlatData(Blackhole blackhole)
{
for (int i = 0; i < rowsPerSegment; i++) {
InputRow row = flatParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0);
blackhole.consume(row);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void consumeNestedData(Blackhole blackhole)
{
for (int i = 0; i < rowsPerSegment; i++) {
InputRow row = nestedParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0);
blackhole.consume(row);
}
}
private byte[] getProtoInputs(String fileName)
{
String filePath = this.getClass().getClassLoader().getResource(fileName).getPath();
byte[] bytes = null;
try {
File file = new File(filePath);
bytes = new byte[(int) file.length()];
bytes = Files.toByteArray(file);
}
catch (FileNotFoundException e) {
log.error("Cannot find the file: " + filePath);
e.printStackTrace();
}
catch (IOException e) {
e.printStackTrace();
}
return bytes;
}
}

View File

@ -0,0 +1,4 @@
ç$2012-07-12T09:30:00.000Z è$(2 description=¤p<B@¯R
bazZ
bar0Z
bar1

Binary file not shown.

View File

@ -35,10 +35,12 @@ import com.google.protobuf.util.JsonFormat;
import org.apache.druid.data.input.ByteBufferInputRowParser; import org.apache.druid.data.input.ByteBufferInputRowParser;
import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.common.parsers.Parser; import org.apache.druid.java.util.common.parsers.Parser;
import org.apache.druid.utils.CollectionUtils;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -100,16 +102,27 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
parser = parseSpec.makeParser(); parser = parseSpec.makeParser();
initDescriptor(); initDescriptor();
} }
String json; Map<String, Object> record;
try {
DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input)); if (parseSpec instanceof JSONParseSpec && ((JSONParseSpec) parseSpec).getFlattenSpec().getFields().isEmpty()) {
json = JsonFormat.printer().print(message); try {
} DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
catch (InvalidProtocolBufferException e) { record = CollectionUtils.mapKeys(message.getAllFields(), k -> k.getJsonName());
throw new ParseException(e, "Protobuf message could not be parsed"); }
catch (InvalidProtocolBufferException ex) {
throw new ParseException(ex, "Protobuf message could not be parsed");
}
} else {
try {
DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
String json = JsonFormat.printer().print(message);
record = parser.parseToMap(json);
}
catch (InvalidProtocolBufferException e) {
throw new ParseException(e, "Protobuf message could not be parsed");
}
} }
Map<String, Object> record = parser.parseToMap(json);
final List<String> dimensions; final List<String> dimensions;
if (!this.dimensions.isEmpty()) { if (!this.dimensions.isEmpty()) {
dimensions = this.dimensions; dimensions = this.dimensions;

View File

@ -52,6 +52,7 @@ public class ProtobufInputRowParserTest
public ExpectedException expectedException = ExpectedException.none(); public ExpectedException expectedException = ExpectedException.none();
private ParseSpec parseSpec; private ParseSpec parseSpec;
private ParseSpec flatParseSpec;
@Before @Before
public void setUp() public void setUp()
@ -76,6 +77,19 @@ public class ProtobufInputRowParserTest
null null
); );
flatParseSpec = new JSONParseSpec(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(Lists.newArrayList(
new StringDimensionSchema("event"),
new StringDimensionSchema("id"),
new StringDimensionSchema("someOtherId"),
new StringDimensionSchema("isValid")
), null, null),
null,
null,
null
);
} }
@Test @Test
@ -126,7 +140,7 @@ public class ProtobufInputRowParserTest
} }
@Test @Test
public void testParse() throws Exception public void testParseNestedData() throws Exception
{ {
//configure parser with desc file //configure parser with desc file
ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent"); ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent");
@ -158,7 +172,6 @@ public class ProtobufInputRowParserTest
event.writeTo(out); event.writeTo(out);
InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
System.out.println(row);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
@ -177,6 +190,45 @@ public class ProtobufInputRowParserTest
Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0); Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
} }
@Test
public void testParseFlatData() throws Exception
{
//configure parser with desc file
ProtobufInputRowParser parser = new ProtobufInputRowParser(flatParseSpec, "prototest.desc", "ProtoTestEvent");
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
System.out.println(row);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "id", "4711");
assertDimensionEquals(row, "isValid", "true");
assertDimensionEquals(row, "someOtherId", "4712");
assertDimensionEquals(row, "description", "description");
Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0);
Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0);
Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
}
@Test @Test
public void testDisableJavaScript() public void testDisableJavaScript()
{ {