From 0e0c1a1aaf468d2e082fffa9cab8a98013f2b536 Mon Sep 17 00:00:00 2001 From: Yi Yuan <269081523@qq.com> Date: Tue, 13 Apr 2021 13:03:13 +0800 Subject: [PATCH] add protobuf inputformat (#11018) * add protobuf inputformat * repair pom * alter intermediateRow to type of Dynamicmessage * add document * refine test * fix document * add protoBytesDecoder * refine document and add ser test * add hash * add schema registry ser test Co-authored-by: yuanyi --- docs/ingestion/data-formats.md | 122 +++++++++- extensions-core/protobuf-extensions/pom.xml | 10 + .../FileBasedProtobufBytesDecoder.java | 36 +++ .../protobuf/ProtobufExtensionsModule.java | 3 +- .../input/protobuf/ProtobufInputFormat.java | 92 ++++++++ .../data/input/protobuf/ProtobufReader.java | 111 +++++++++ ...hemaRegistryBasedProtobufBytesDecoder.java | 84 ++++++- .../protobuf/ProtobufInputFormatTest.java | 215 ++++++++++++++++++ website/.spelling | 1 + 9 files changed, 664 insertions(+), 10 deletions(-) create mode 100644 extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java create mode 100644 extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java create mode 100644 extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index b65a1db1deb..d431fead16e 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -469,6 +469,41 @@ The `inputFormat` to load data of Avro OCF format. An example is: |schema| JSON Object |Define a reader schema to be used when parsing Avro records, this is useful when parsing multiple versions of Avro OCF file data | no (default will decode using the writer schema contained in the OCF file) | | binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) | +### Protobuf + +> You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf input format. + +The `inputFormat` to load data of Protobuf format. An example is: +```json +"ioConfig": { + "inputFormat": { + "type": "protobuf", + "protoBytesDecoder": { + "type": "file", + "descriptor": "file:///tmp/metrics.desc", + "protoMessageType": "Metrics" + } + "flattenSpec": { + "useFieldDiscovery": true, + "fields": [ + { + "type": "path", + "name": "someRecord_subInt", + "expr": "$.someRecord.subInt" + } + ] + } + }, + ... +} +``` + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +|type| String| This should be set to `protobuf` to read Protobuf serialized data| yes | +|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Protobuf record. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) | +|`protoBytesDecoder`| JSON Object |Specifies how to decode bytes to Protobuf record. | yes | + ### FlattenSpec The `flattenSpec` is located in `inputFormat` → `flattenSpec` and is responsible for @@ -1113,8 +1148,7 @@ This parser is for [stream ingestion](./index.md#streaming) and reads Protocol b | Field | Type | Description | Required | |-------|------|-------------|----------| | type | String | This should say `protobuf`. | yes | -| descriptor | String | Protobuf descriptor file name in the classpath or URL. | yes | -| protoMessageType | String | Protobuf message type in the descriptor. Both short name and fully qualified name are accepted. The parser uses the first message type found in the descriptor if not specified. | no | +| `protoBytesDecoder` | JSON Object | Specifies how to decode bytes to Protobuf record. | yes | | parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. The format must be JSON. See [JSON ParseSpec](./index.md) for more configuration options. Note that timeAndDims parseSpec is no longer supported. | yes | Sample spec: @@ -1122,8 +1156,11 @@ Sample spec: ```json "parser": { "type": "protobuf", - "descriptor": "file:///tmp/metrics.desc", - "protoMessageType": "Metrics", + "protoBytesDecoder": { + "type": "file", + "descriptor": "file:///tmp/metrics.desc", + "protoMessageType": "Metrics" + }, "parseSpec": { "format": "json", "timestampSpec": { @@ -1151,6 +1188,83 @@ Sample spec: See the [extension description](../development/extensions-core/protobuf.md) for more details and examples. +#### Protobuf Bytes Decoder + +If `type` is not included, the `protoBytesDecoder` defaults to `schema_registry`. + +##### File-based Protobuf Bytes Decoder + +This Protobuf bytes decoder first read a descriptor file, and then parse it to get schema used to decode the Protobuf record from bytes. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `file`. | yes | +| descriptor | String | Protobuf descriptor file name in the classpath or URL. | yes | +| protoMessageType | String | Protobuf message type in the descriptor. Both short name and fully qualified name are accepted. The parser uses the first message type found in the descriptor if not specified. | no | + +Sample spec: + +```json +"protoBytesDecoder": { + "type": "file", + "descriptor": "file:///tmp/metrics.desc", + "protoMessageType": "Metrics" +} +``` + +##### Confluent Schema Registry-based Protobuf Bytes Decoder + +This Protobuf bytes decoder first extracts a unique `id` from input message bytes, and then uses it to look up the schema in the Schema Registry used to decode the Avro record from bytes. +For details, see the Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry). + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | This should say `schema_registry`. | yes | +| url | String | Specifies the url endpoint of the Schema Registry. | yes | +| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no | +| urls | Array | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) | +| config | Json | To send additional configurations, configured for Schema Registry | no | +| headers | Json | To send headers to the Schema Registry | no | + +For a single schema registry instance, use Field `url` or `urls` for multi instances. + +Single Instance: + +```json +... +"protoBytesDecoder": { + "url": , + "type": "schema_registry" +} +... +``` + +Multiple Instances: +```json +... +"protoBytesDecoder": { + "urls": [, , ...], + "type": "schema_registry", + "capacity": 100, + "config" : { + "basic.auth.credentials.source": "USER_INFO", + "basic.auth.user.info": "fred:letmein", + "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks", + "schema.registry.ssl.truststore.password": "", + "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks", + "schema.registry.ssl.keystore.password": "", + "schema.registry.ssl.key.password": "", + ... + }, + "headers": { + "traceID" : "b29c5de2-0db4-490b-b421", + "timeStamp" : "1577191871865", + ... + } +} +... +``` + ## ParseSpec > The Parser is deprecated for [native batch tasks](./native-batch.md), [Kafka indexing service](../development/extensions-core/kafka-ingestion.md), diff --git a/extensions-core/protobuf-extensions/pom.xml b/extensions-core/protobuf-extensions/pom.xml index aebbec3224f..c523ec20926 100644 --- a/extensions-core/protobuf-extensions/pom.xml +++ b/extensions-core/protobuf-extensions/pom.xml @@ -137,6 +137,10 @@ 2.0.1 provided + + com.fasterxml.jackson.core + jackson-core + @@ -154,6 +158,12 @@ mockito-core test + + org.apache.druid + druid-processing + ${project.parent.version} + test + diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java index 35d5c4fb5d0..cda5eb3f049 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java @@ -34,6 +34,7 @@ import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; import java.nio.ByteBuffer; +import java.util.Objects; import java.util.Set; public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder @@ -54,6 +55,18 @@ public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder initDescriptor(); } + @JsonProperty + public String getDescriptor() + { + return descriptorFilePath; + } + + @JsonProperty + public String getProtoMessageType() + { + return protoMessageType; + } + @VisibleForTesting void initDescriptor() { @@ -123,4 +136,27 @@ public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder } return desc; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FileBasedProtobufBytesDecoder that = (FileBasedProtobufBytesDecoder) o; + + return Objects.equals(descriptorFilePath, that.descriptorFilePath) && + Objects.equals(protoMessageType, that.protoMessageType); + } + + @Override + public int hashCode() + { + return Objects.hash(descriptorFilePath, protoMessageType); + } + } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java index 9b05932ceb9..a2293ec794c 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java @@ -37,7 +37,8 @@ public class ProtobufExtensionsModule implements DruidModule return Collections.singletonList( new SimpleModule("ProtobufInputRowParserModule") .registerSubtypes( - new NamedType(ProtobufInputRowParser.class, "protobuf") + new NamedType(ProtobufInputRowParser.class, "protobuf"), + new NamedType(ProtobufInputFormat.class, "protobuf") ) ); } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java new file mode 100644 index 00000000000..36ae06875e3 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; + +import javax.annotation.Nullable; + +import java.io.File; +import java.util.Objects; + +public class ProtobufInputFormat extends NestedInputFormat +{ + private final ProtobufBytesDecoder protobufBytesDecoder; + + @JsonCreator + public ProtobufInputFormat( + @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, + @JsonProperty("protoBytesDecoder") ProtobufBytesDecoder protobufBytesDecoder + ) + { + super(flattenSpec); + this.protobufBytesDecoder = protobufBytesDecoder; + } + + @JsonProperty + public ProtobufBytesDecoder getProtoBytesDecoder() + { + return protobufBytesDecoder; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return new ProtobufReader( + inputRowSchema, + source, + protobufBytesDecoder, + getFlattenSpec() + ); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ProtobufInputFormat that = (ProtobufInputFormat) o; + return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) && + Objects.equals(protobufBytesDecoder, that.protobufBytesDecoder); + } + + @Override + public int hashCode() + { + return Objects.hash(protobufBytesDecoder, getFlattenSpec()); + } + +} diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java new file mode 100644 index 00000000000..5f7aed65f26 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterators; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; +import org.apache.commons.io.IOUtils; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ObjectFlattener; +import org.apache.druid.java.util.common.parsers.ObjectFlatteners; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.utils.CollectionUtils; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class ProtobufReader extends IntermediateRowParsingReader +{ + private final InputRowSchema inputRowSchema; + private final InputEntity source; + private final JSONPathSpec flattenSpec; + private final ObjectFlattener recordFlattener; + private final ProtobufBytesDecoder protobufBytesDecoder; + + ProtobufReader( + InputRowSchema inputRowSchema, + InputEntity source, + ProtobufBytesDecoder protobufBytesDecoder, + JSONPathSpec flattenSpec + ) + { + this.inputRowSchema = inputRowSchema; + this.source = source; + this.protobufBytesDecoder = protobufBytesDecoder; + this.flattenSpec = flattenSpec; + this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true)); + } + + @Override + protected CloseableIterator intermediateRowIterator() throws IOException + { + return CloseableIterators.withEmptyBaggage( + Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open())))) + ); + } + + @Override + protected List parseInputRows(DynamicMessage intermediateRow) throws ParseException, JsonProcessingException + { + Map record; + + if (flattenSpec == null) { + try { + record = CollectionUtils.mapKeys(intermediateRow.getAllFields(), k -> k.getJsonName()); + } + catch (Exception ex) { + throw new ParseException(ex, "Protobuf message could not be parsed"); + } + } else { + try { + String json = JsonFormat.printer().print(intermediateRow); + JsonNode document = new ObjectMapper().readValue(json, JsonNode.class); + record = recordFlattener.flatten(document); + } + catch (InvalidProtocolBufferException e) { + throw new ParseException(e, "Protobuf message could not be parsed"); + } + } + + return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, record)); + } + + @Override + protected List> toMap(DynamicMessage intermediateRow) throws JsonProcessingException, InvalidProtocolBufferException + { + return Collections.singletonList(new ObjectMapper().readValue(JsonFormat.printer().print(intermediateRow), Map.class)); + } +} diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java index b1435dfc332..2d4cc8dd555 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java @@ -39,6 +39,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDecoder { @@ -46,7 +47,11 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec private static final Logger LOGGER = new Logger(SchemaRegistryBasedProtobufBytesDecoder.class); private final SchemaRegistryClient registry; - private int identityMapCapacity; + private final String url; + private final int capacity; + private final List urls; + private final Map config; + private final Map headers; @JsonCreator public SchemaRegistryBasedProtobufBytesDecoder( @@ -57,23 +62,62 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec @JsonProperty("headers") @Nullable Map headers ) { - this.identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity; + this.url = url; + this.capacity = capacity == null ? Integer.MAX_VALUE : capacity; + this.urls = urls; + this.config = config; + this.headers = headers; if (url != null && !url.isEmpty()) { - this.registry = new CachedSchemaRegistryClient(Collections.singletonList(url), identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), config, headers); + this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers); } else { - this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), config, headers); + this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers); } } + @JsonProperty + public String getUrl() + { + return url; + } + + @JsonProperty + public int getCapacity() + { + return capacity; + } + + @JsonProperty + public List getUrls() + { + return urls; + } + + @JsonProperty + public Map getConfig() + { + return config; + } + + @JsonProperty + public Map getHeaders() + { + return headers; + } + @VisibleForTesting int getIdentityMapCapacity() { - return this.identityMapCapacity; + return this.capacity; } @VisibleForTesting SchemaRegistryBasedProtobufBytesDecoder(SchemaRegistryClient registry) { + this.url = null; + this.capacity = Integer.MAX_VALUE; + this.urls = null; + this.config = null; + this.headers = null; this.registry = registry; } @@ -108,4 +152,34 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec throw new ParseException(e, "Fail to decode protobuf message!"); } } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SchemaRegistryBasedProtobufBytesDecoder that = (SchemaRegistryBasedProtobufBytesDecoder) o; + + return Objects.equals(url, that.url) && + Objects.equals(capacity, that.capacity) && + Objects.equals(urls, that.urls) && + Objects.equals(config, that.config) && + Objects.equals(headers, that.headers); + } + + @Override + public int hashCode() + { + int result = url != null ? url.hashCode() : 0; + result = 31 * result + capacity; + result = 31 * result + (urls != null ? urls.hashCode() : 0); + result = 31 * result + (config != null ? config.hashCode() : 0); + result = 31 * result + (headers != null ? headers.hashCode() : 0); + return result; + } } diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java new file mode 100644 index 00000000000..c73f513dd90 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.joda.time.DateTime; +import org.joda.time.chrono.ISOChronology; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +public class ProtobufInputFormatTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private TimestampSpec timestampSpec; + private DimensionsSpec dimensionsSpec; + private JSONPathSpec flattenSpec; + private FileBasedProtobufBytesDecoder decoder; + + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Before + public void setUp() + { + timestampSpec = new TimestampSpec("timestamp", "iso", null); + dimensionsSpec = new DimensionsSpec(Lists.newArrayList( + new StringDimensionSchema("event"), + new StringDimensionSchema("id"), + new StringDimensionSchema("someOtherId"), + new StringDimensionSchema("isValid") + ), null, null); + flattenSpec = new JSONPathSpec( + true, + Lists.newArrayList( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar") + ) + ); + decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent"); + for (Module jacksonModule : new ProtobufExtensionsModule().getJacksonModules()) { + jsonMapper.registerModule(jacksonModule); + } + } + + @Test + public void testSerde() throws IOException + { + ProtobufInputFormat inputFormat = new ProtobufInputFormat( + flattenSpec, + decoder + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + + Assert.assertEquals(inputFormat, inputFormat2); + } + + @Test + public void testSerdeForSchemaRegistry() throws IOException + { + ProtobufInputFormat inputFormat = new ProtobufInputFormat( + flattenSpec, + new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null) + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + Assert.assertEquals(inputFormat, inputFormat2); + } + + @Test + public void testParseNestedData() throws Exception + { + //configure parser with desc file + ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(flattenSpec, decoder); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder() + .setDescription("description") + .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE) + .setId(4711L) + .setIsValid(true) + .setSomeOtherId(4712) + .setTimestamp(dateTime.toString()) + .setSomeFloatColumn(47.11F) + .setSomeIntColumn(815) + .setSomeLongColumn(816L) + .setFoo(ProtoTestEventWrapper.ProtoTestEvent.Foo + .newBuilder() + .setBar("baz")) + .addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo + .newBuilder() + .setBar("bar0")) + .addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo + .newBuilder() + .setBar("bar1")) + .build(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + event.writeTo(out); + + final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray())); + + InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next(); + + Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "id", "4711"); + assertDimensionEquals(row, "isValid", "true"); + assertDimensionEquals(row, "someOtherId", "4712"); + assertDimensionEquals(row, "description", "description"); + + assertDimensionEquals(row, "eventType", ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE.name()); + assertDimensionEquals(row, "foobar", "baz"); + assertDimensionEquals(row, "bar0", "bar0"); + + + Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0); + Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0); + Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0); + } + + @Test + public void testParseFlatData() throws Exception + { + //configure parser with desc file + ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(null, decoder); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder() + .setDescription("description") + .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE) + .setId(4711L) + .setIsValid(true) + .setSomeOtherId(4712) + .setTimestamp(dateTime.toString()) + .setSomeFloatColumn(47.11F) + .setSomeIntColumn(815) + .setSomeLongColumn(816L) + .build(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + event.writeTo(out); + + final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray())); + + InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next(); + + System.out.println(row); + + Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + + assertDimensionEquals(row, "id", "4711"); + assertDimensionEquals(row, "isValid", "true"); + assertDimensionEquals(row, "someOtherId", "4712"); + assertDimensionEquals(row, "description", "description"); + + + Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0); + Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0); + Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0); + } + + private void assertDimensionEquals(InputRow row, String dimension, Object expected) + { + List values = row.getDimension(dimension); + Assert.assertEquals(1, values.size()); + Assert.assertEquals(expected, values.get(0)); + } +} diff --git a/website/.spelling b/website/.spelling index 5c7237bd965..c0d5898b593 100644 --- a/website/.spelling +++ b/website/.spelling @@ -640,6 +640,7 @@ Avro-1124 SchemaRepo avro avroBytesDecoder +protoBytesDecoder flattenSpec jq org.apache.druid.extensions