mirror of https://github.com/apache/druid.git
add protobuf inputformat (#11018)
* add protobuf inputformat
* repair pom
* alter intermediateRow to type of DynamicMessage
* add document
* refine test
* fix document
* add protoBytesDecoder
* refine document and add ser test
* add hash
* add schema registry ser test

Co-authored-by: yuanyi <yuanyi@freewheel.tv>
Parent: d0a94a8c14 · Commit: 0e0c1a1aaf
@@ -469,6 +469,41 @@ The `inputFormat` to load data of Avro OCF format. An example is:
|schema| JSON Object |Define a reader schema to be used when parsing Avro records. This is useful when parsing multiple versions of Avro OCF file data. | no (default will decode using the writer schema contained in the OCF file) |
| binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
### Protobuf
> You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf input format.
The `inputFormat` to load data of Protobuf format. An example is:
```json
"ioConfig": {
  "inputFormat": {
    "type": "protobuf",
    "protoBytesDecoder": {
      "type": "file",
      "descriptor": "file:///tmp/metrics.desc",
      "protoMessageType": "Metrics"
    },
    "flattenSpec": {
      "useFieldDiscovery": true,
      "fields": [
        {
          "type": "path",
          "name": "someRecord_subInt",
          "expr": "$.someRecord.subInt"
        }
      ]
    }
  },
  ...
}
```
| Field | Type | Description | Required |
|-------|------|-------------|----------|
|type| String| This should be set to `protobuf` to read Protobuf serialized data| yes |
|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Protobuf record. Note that only 'path' expressions are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
|`protoBytesDecoder`| JSON Object |Specifies how to decode bytes to Protobuf record. | yes |
### FlattenSpec

The `flattenSpec` is located in `inputFormat` → `flattenSpec` and is responsible for
@@ -1113,8 +1148,7 @@ This parser is for [stream ingestion](./index.md#streaming) and reads Protocol b
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `protobuf`. | yes |
| `protoBytesDecoder` | JSON Object | Specifies how to decode bytes to Protobuf record. | yes |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. The format must be JSON. See [JSON ParseSpec](./index.md) for more configuration options. Note that timeAndDims parseSpec is no longer supported. | yes |

Sample spec:
@@ -1122,8 +1156,11 @@ Sample spec:
```json
"parser": {
  "type": "protobuf",
  "protoBytesDecoder": {
    "type": "file",
    "descriptor": "file:///tmp/metrics.desc",
    "protoMessageType": "Metrics"
  },
  "parseSpec": {
    "format": "json",
    "timestampSpec": {
@@ -1151,6 +1188,83 @@ Sample spec:
See the [extension description](../development/extensions-core/protobuf.md) for
more details and examples.
#### Protobuf Bytes Decoder
If `type` is not included, the `protoBytesDecoder` defaults to `schema_registry`.
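For example, a decoder spec that omits `type` (a hypothetical minimal sketch; the URL is a placeholder) is read as a Schema Registry decoder:

```json
"protoBytesDecoder": {
  "url": "http://localhost:8081"
}
```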
##### File-based Protobuf Bytes Decoder
This Protobuf bytes decoder first reads a descriptor file, and then parses it to get the schema used to decode the Protobuf record from bytes.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `file`. | yes |
| descriptor | String | Protobuf descriptor file name in the classpath or URL. | yes |
| protoMessageType | String | Protobuf message type in the descriptor. Both short name and fully qualified name are accepted. The parser uses the first message type found in the descriptor if not specified. | no |
Sample spec:
```json
"protoBytesDecoder": {
  "type": "file",
  "descriptor": "file:///tmp/metrics.desc",
  "protoMessageType": "Metrics"
}
```
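For reference, a descriptor file like the one above is typically generated with `protoc`, e.g. `protoc --include_imports --descriptor_set_out=/tmp/metrics.desc metrics.proto` (file names illustrative).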
##### Confluent Schema Registry-based Protobuf Bytes Decoder
This Protobuf bytes decoder first extracts a unique `id` from input message bytes, and then uses it to look up the schema in the Schema Registry used to decode the Protobuf record from bytes.
For details, see the Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry).
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `schema_registry`. | yes |
| url | String | Specifies the URL endpoint of the Schema Registry. | yes (if `urls` is not provided) |
| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
| urls | Array<String> | Specifies the URL endpoints of multiple Schema Registry instances. | yes (if `url` is not provided) |
| config | Json | Additional configuration to send to the Schema Registry, e.g. authentication and TLS settings. | no |
| headers | Json | HTTP headers to send to the Schema Registry. | no |
For a single Schema Registry instance, use the `url` field; for multiple instances, use `urls`.
Single Instance:
```json
...
"protoBytesDecoder": {
  "url": <schema-registry-url>,
  "type": "schema_registry"
}
...
```
Multiple Instances:
```json
...
"protoBytesDecoder": {
  "urls": [<schema-registry-url-1>, <schema-registry-url-2>, ...],
  "type": "schema_registry",
  "capacity": 100,
  "config" : {
    "basic.auth.credentials.source": "USER_INFO",
    "basic.auth.user.info": "fred:letmein",
    "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks",
    "schema.registry.ssl.truststore.password": "<password>",
    "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks",
    "schema.registry.ssl.keystore.password": "<password>",
    "schema.registry.ssl.key.password": "<password>",
    ...
  },
  "headers": {
    "traceID" : "b29c5de2-0db4-490b-b421",
    "timeStamp" : "1577191871865",
    ...
  }
}
...
```
## ParseSpec

> The Parser is deprecated for [native batch tasks](./native-batch.md), [Kafka indexing service](../development/extensions-core/kafka-ingestion.md),
@@ -137,6 +137,10 @@
            <version>2.0.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </dependency>

        <!-- test -->
        <dependency>
@@ -154,6 +158,12 @@
            <artifactId>mockito-core</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.druid</groupId>
            <artifactId>druid-processing</artifactId>
            <version>${project.parent.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
@@ -34,6 +34,7 @@ import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Set;

public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder
@@ -54,6 +55,18 @@ public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder
    initDescriptor();
  }

  @JsonProperty
  public String getDescriptor()
  {
    return descriptorFilePath;
  }

  @JsonProperty
  public String getProtoMessageType()
  {
    return protoMessageType;
  }

  @VisibleForTesting
  void initDescriptor()
  {
@@ -123,4 +136,27 @@ public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder
    }
    return desc;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    FileBasedProtobufBytesDecoder that = (FileBasedProtobufBytesDecoder) o;

    return Objects.equals(descriptorFilePath, that.descriptorFilePath) &&
        Objects.equals(protoMessageType, that.protoMessageType);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(descriptorFilePath, protoMessageType);
  }
}
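The getters and `equals`/`hashCode` added above are what the commit's new serialization test relies on. A minimal round-trip sketch under assumed test conventions (the class name is hypothetical; `prototest.desc` is assumed to be on the test classpath, as in the commit's `ProtobufInputFormatTest`):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.protobuf.FileBasedProtobufBytesDecoder;
import org.apache.druid.data.input.protobuf.ProtobufBytesDecoder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;

public class FileBasedProtobufBytesDecoderSerdeSketch
{
  @Test
  public void testSerde() throws Exception
  {
    ObjectMapper mapper = new DefaultObjectMapper();
    FileBasedProtobufBytesDecoder decoder =
        new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent");
    // Round-trip through JSON; the "file" subtype is assumed to be resolved
    // through the type annotations on ProtobufBytesDecoder.
    ProtobufBytesDecoder decoder2 = mapper.readValue(
        mapper.writeValueAsString(decoder),
        ProtobufBytesDecoder.class
    );
    // Meaningful only because of the equals()/hashCode() defined above.
    Assert.assertEquals(decoder, decoder2);
  }
}
```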
@@ -37,7 +37,8 @@ public class ProtobufExtensionsModule implements DruidModule
    return Collections.singletonList(
        new SimpleModule("ProtobufInputRowParserModule")
            .registerSubtypes(
                new NamedType(ProtobufInputRowParser.class, "protobuf"),
                new NamedType(ProtobufInputFormat.class, "protobuf")
            )
    );
  }
@@ -0,0 +1,92 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.data.input.protobuf;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;

import javax.annotation.Nullable;

import java.io.File;
import java.util.Objects;

public class ProtobufInputFormat extends NestedInputFormat
{
  private final ProtobufBytesDecoder protobufBytesDecoder;

  @JsonCreator
  public ProtobufInputFormat(
      @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
      @JsonProperty("protoBytesDecoder") ProtobufBytesDecoder protobufBytesDecoder
  )
  {
    super(flattenSpec);
    this.protobufBytesDecoder = protobufBytesDecoder;
  }

  @JsonProperty
  public ProtobufBytesDecoder getProtoBytesDecoder()
  {
    return protobufBytesDecoder;
  }

  @Override
  public boolean isSplittable()
  {
    return false;
  }

  @Override
  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
  {
    return new ProtobufReader(
        inputRowSchema,
        source,
        protobufBytesDecoder,
        getFlattenSpec()
    );
  }

  @Override
  public boolean equals(final Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final ProtobufInputFormat that = (ProtobufInputFormat) o;
    return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) &&
        Objects.equals(protobufBytesDecoder, that.protobufBytesDecoder);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(protobufBytesDecoder, getFlattenSpec());
  }
}
@@ -0,0 +1,111 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.data.input.protobuf;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterators;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
{
  private final InputRowSchema inputRowSchema;
  private final InputEntity source;
  private final JSONPathSpec flattenSpec;
  private final ObjectFlattener<JsonNode> recordFlattener;
  private final ProtobufBytesDecoder protobufBytesDecoder;

  ProtobufReader(
      InputRowSchema inputRowSchema,
      InputEntity source,
      ProtobufBytesDecoder protobufBytesDecoder,
      JSONPathSpec flattenSpec
  )
  {
    this.inputRowSchema = inputRowSchema;
    this.source = source;
    this.protobufBytesDecoder = protobufBytesDecoder;
    this.flattenSpec = flattenSpec;
    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true));
  }

  @Override
  protected CloseableIterator<DynamicMessage> intermediateRowIterator() throws IOException
  {
    return CloseableIterators.withEmptyBaggage(
        Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()))))
    );
  }

  @Override
  protected List<InputRow> parseInputRows(DynamicMessage intermediateRow) throws ParseException, JsonProcessingException
  {
    Map<String, Object> record;

    if (flattenSpec == null) {
      try {
        record = CollectionUtils.mapKeys(intermediateRow.getAllFields(), k -> k.getJsonName());
      }
      catch (Exception ex) {
        throw new ParseException(ex, "Protobuf message could not be parsed");
      }
    } else {
      try {
        String json = JsonFormat.printer().print(intermediateRow);
        JsonNode document = new ObjectMapper().readValue(json, JsonNode.class);
        record = recordFlattener.flatten(document);
      }
      catch (InvalidProtocolBufferException e) {
        throw new ParseException(e, "Protobuf message could not be parsed");
      }
    }

    return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, record));
  }

  @Override
  protected List<Map<String, Object>> toMap(DynamicMessage intermediateRow) throws JsonProcessingException, InvalidProtocolBufferException
  {
    return Collections.singletonList(new ObjectMapper().readValue(JsonFormat.printer().print(intermediateRow), Map.class));
  }
}
@@ -39,6 +39,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDecoder
{
@@ -46,7 +47,11 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec
  private static final Logger LOGGER = new Logger(SchemaRegistryBasedProtobufBytesDecoder.class);

  private final SchemaRegistryClient registry;
  private final String url;
  private final int capacity;
  private final List<String> urls;
  private final Map<String, ?> config;
  private final Map<String, String> headers;

  @JsonCreator
  public SchemaRegistryBasedProtobufBytesDecoder(
@@ -57,23 +62,62 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec
      @JsonProperty("headers") @Nullable Map<String, String> headers
  )
  {
    this.url = url;
    this.capacity = capacity == null ? Integer.MAX_VALUE : capacity;
    this.urls = urls;
    this.config = config;
    this.headers = headers;
    if (url != null && !url.isEmpty()) {
      this.registry = new CachedSchemaRegistryClient(Collections.singletonList(this.url), this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers);
    } else {
      this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, Collections.singletonList(new ProtobufSchemaProvider()), this.config, this.headers);
    }
  }

  @JsonProperty
  public String getUrl()
  {
    return url;
  }

  @JsonProperty
  public int getCapacity()
  {
    return capacity;
  }

  @JsonProperty
  public List<String> getUrls()
  {
    return urls;
  }

  @JsonProperty
  public Map<String, ?> getConfig()
  {
    return config;
  }

  @JsonProperty
  public Map<String, String> getHeaders()
  {
    return headers;
  }

  @VisibleForTesting
  int getIdentityMapCapacity()
  {
    return this.capacity;
  }

  @VisibleForTesting
  SchemaRegistryBasedProtobufBytesDecoder(SchemaRegistryClient registry)
  {
    this.url = null;
    this.capacity = Integer.MAX_VALUE;
    this.urls = null;
    this.config = null;
    this.headers = null;
    this.registry = registry;
  }
@@ -108,4 +152,34 @@ public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDec
      throw new ParseException(e, "Fail to decode protobuf message!");
    }
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    SchemaRegistryBasedProtobufBytesDecoder that = (SchemaRegistryBasedProtobufBytesDecoder) o;

    return Objects.equals(url, that.url) &&
        Objects.equals(capacity, that.capacity) &&
        Objects.equals(urls, that.urls) &&
        Objects.equals(config, that.config) &&
        Objects.equals(headers, that.headers);
  }

  @Override
  public int hashCode()
  {
    int result = url != null ? url.hashCode() : 0;
    result = 31 * result + capacity;
    result = 31 * result + (urls != null ? urls.hashCode() : 0);
    result = 31 * result + (config != null ? config.hashCode() : 0);
    result = 31 * result + (headers != null ? headers.hashCode() : 0);
    return result;
  }
}
@@ -0,0 +1,215 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.data.input.protobuf;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

public class ProtobufInputFormatTest
{
  @Rule
  public ExpectedException expectedException = ExpectedException.none();

  private TimestampSpec timestampSpec;
  private DimensionsSpec dimensionsSpec;
  private JSONPathSpec flattenSpec;
  private FileBasedProtobufBytesDecoder decoder;

  private final ObjectMapper jsonMapper = new DefaultObjectMapper();

  @Before
  public void setUp()
  {
    timestampSpec = new TimestampSpec("timestamp", "iso", null);
    dimensionsSpec = new DimensionsSpec(Lists.newArrayList(
        new StringDimensionSchema("event"),
        new StringDimensionSchema("id"),
        new StringDimensionSchema("someOtherId"),
        new StringDimensionSchema("isValid")
    ), null, null);
    flattenSpec = new JSONPathSpec(
        true,
        Lists.newArrayList(
            new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"),
            new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"),
            new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar")
        )
    );
    decoder = new FileBasedProtobufBytesDecoder("prototest.desc", "ProtoTestEvent");
    for (Module jacksonModule : new ProtobufExtensionsModule().getJacksonModules()) {
      jsonMapper.registerModule(jacksonModule);
    }
  }

  @Test
  public void testSerde() throws IOException
  {
    ProtobufInputFormat inputFormat = new ProtobufInputFormat(
        flattenSpec,
        decoder
    );
    NestedInputFormat inputFormat2 = jsonMapper.readValue(
        jsonMapper.writeValueAsString(inputFormat),
        NestedInputFormat.class
    );

    Assert.assertEquals(inputFormat, inputFormat2);
  }

  @Test
  public void testSerdeForSchemaRegistry() throws IOException
  {
    ProtobufInputFormat inputFormat = new ProtobufInputFormat(
        flattenSpec,
        new SchemaRegistryBasedProtobufBytesDecoder("http://test:8081", 100, null, null, null)
    );
    NestedInputFormat inputFormat2 = jsonMapper.readValue(
        jsonMapper.writeValueAsString(inputFormat),
        NestedInputFormat.class
    );
    Assert.assertEquals(inputFormat, inputFormat2);
  }

  @Test
  public void testParseNestedData() throws Exception
  {
    //configure parser with desc file
    ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(flattenSpec, decoder);

    //create binary of proto test event
    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
    ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
        .setDescription("description")
        .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
        .setId(4711L)
        .setIsValid(true)
        .setSomeOtherId(4712)
        .setTimestamp(dateTime.toString())
        .setSomeFloatColumn(47.11F)
        .setSomeIntColumn(815)
        .setSomeLongColumn(816L)
        .setFoo(ProtoTestEventWrapper.ProtoTestEvent.Foo
            .newBuilder()
            .setBar("baz"))
        .addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo
            .newBuilder()
            .setBar("bar0"))
        .addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo
            .newBuilder()
            .setBar("bar1"))
        .build();

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    event.writeTo(out);

    final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray()));

    InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next();

    Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());

    assertDimensionEquals(row, "id", "4711");
    assertDimensionEquals(row, "isValid", "true");
    assertDimensionEquals(row, "someOtherId", "4712");
    assertDimensionEquals(row, "description", "description");

    assertDimensionEquals(row, "eventType", ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE.name());
    assertDimensionEquals(row, "foobar", "baz");
    assertDimensionEquals(row, "bar0", "bar0");

    Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0);
    Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0);
    Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
  }

  @Test
  public void testParseFlatData() throws Exception
  {
    //configure parser with desc file
    ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(null, decoder);

    //create binary of proto test event
    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
    ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
        .setDescription("description")
        .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
        .setId(4711L)
        .setIsValid(true)
        .setSomeOtherId(4712)
        .setTimestamp(dateTime.toString())
        .setSomeFloatColumn(47.11F)
        .setSomeIntColumn(815)
        .setSomeLongColumn(816L)
        .build();

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    event.writeTo(out);

    final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray()));

    InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next();

    System.out.println(row);

    Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());

    assertDimensionEquals(row, "id", "4711");
    assertDimensionEquals(row, "isValid", "true");
    assertDimensionEquals(row, "someOtherId", "4712");
    assertDimensionEquals(row, "description", "description");

    Assert.assertEquals(47.11F, row.getMetric("someFloatColumn").floatValue(), 0.0);
    Assert.assertEquals(815.0F, row.getMetric("someIntColumn").floatValue(), 0.0);
    Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
  }

  private void assertDimensionEquals(InputRow row, String dimension, Object expected)
  {
    List<String> values = row.getDimension(dimension);
    Assert.assertEquals(1, values.size());
    Assert.assertEquals(expected, values.get(0));
  }
}
@@ -640,6 +640,7 @@ Avro-1124
SchemaRepo
avro
avroBytesDecoder
protoBytesDecoder
flattenSpec
jq
org.apache.druid.extensions