Protobuf extension (#4039)

* move ProtoBufInputRowParser from processing module to protobuf extensions

* Ported PR #3509

* add DynamicMessage

* fix local test stuff that slipped in

* add license header

* removed redundant type name

* removed commented code

* fix code style

* rename ProtoBuf -> Protobuf

* pom.xml: shade protobuf classes, handle .desc resource file as binary file

* clean up error messages

* pick first message type from descriptor if not specified

* fix protoMessageType null check. add test case

* move protobuf-extension from contrib to core

* document: add new configuration keys, and descriptions

* update document. add examples

* move protobuf-extension from contrib to core (2nd try)

* touch

* include protobuf extensions in the distribution

* fix whitespace

* include protobuf example in the distribution

* example: create new pb obj everytime

* document: use properly quoted json

* fix whitespace

* bump parent version to 0.10.1-SNAPSHOT

* ignore Override check

* touch
This commit is contained in:
Kenji Noguchi 2017-05-30 13:11:58 -07:00 committed by Fangjin Yang
parent 7889891bd3
commit 3400f601db
26 changed files with 4230 additions and 1412 deletions

View File

@ -97,10 +97,12 @@
<argument>-c</argument>
<argument>io.druid.extensions:druid-lookups-cached-single</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-protobuf-extensions</argument>
<argument>-c</argument>
<argument>io.druid.extensions:mysql-metadata-storage</argument>
<argument>-c</argument>
<argument>io.druid.extensions:postgresql-metadata-storage</argument>
<argument>-c</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-kerberos</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-s3-extensions</argument>

View File

@ -114,6 +114,13 @@
</includes>
<outputDirectory>conf-quickstart/tranquility</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/quickstart/protobuf</directory>
<includes>
<include>*</include>
</includes>
<outputDirectory>quickstart/protobuf</outputDirectory>
</fileSet>
<fileSet>
<directory>../examples/conf</directory>

View File

@ -0,0 +1,203 @@
---
layout: doc_page
---
# Protobuf
This extension enables Druid to ingest and understand the Protobuf data format. Make sure to [include](../../operations/including-extensions.html) `druid-protobuf-extensions` as an extension.
## Protobuf Parser
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `protobuf`. | no |
| descriptor | String | Protobuf descriptor file name in the classpath or URL. | yes |
| protoMessageType | String | Protobuf message type in the descriptor. Both short name and fully qualified name are accepted. The parser uses the first message type found in the descriptor if not specified. | no |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. The format must be json. See [JSON ParseSpec](../../ingestion/index.html) for more configuration options. Please note timeAndDims parseSpec is no longer supported. | yes |
## Example: Load Protobuf messages from Kafka
This example demonstrates how to load Protobuf messages from Kafka. Please read the [Load from Kafka tutorial](../../tutorial/tutorial-kafka.html) first. This example will use the same "metrics" dataset.
Files used in this example are found at `./examples/quickstart/protobuf` in your Druid directory.
- We will use [Kafka Indexing Service](./kafka-ingestion.html) instead of Tranquility.
- Kafka broker host is `localhost:9092`.
- Kafka topic is `metrics_pb` instead of `metrics`.
- datasource name is `metrics-kafka-pb` instead of `metrics-kafka` to avoid the confusion.
Here is the metrics JSON example.
```json
{
"unit": "milliseconds",
"http_method": "GET",
"value": 44,
"timestamp": "2017-04-06T02:36:22Z",
"http_code": "200",
"page": "/",
"metricType": "request/latency",
"server": "www1.example.com"
}
```
### Proto file
The proto file should look like this. Save it as metrics.proto.
```
syntax = "proto3";
message Metrics {
string unit = 1;
string http_method = 2;
int32 value = 3;
string timestamp = 4;
string http_code = 5;
string page = 6;
string metricType = 7;
string server = 8;
}
```
### Descriptor file
Using the `protoc` Protobuf compiler to generate the descriptor file. Save the metrics.desc file either in the classpath or reachable by URL. In this example the descriptor file was saved at /tmp/metrics.desc.
```
protoc -o /tmp/metrics.desc metrics.proto
```
### Supervisor spec JSON
Below is the complete Supervisor spec JSON to be submitted to the Overlord.
Please make sure these keys are properly configured for successful ingestion.
- `descriptor` for the descriptor file URL.
- `protoMessageType` from the proto definition.
- parseSpec `format` must be `json`.
- `topic` to subscribe. The topic is "metrics_pb" instead of "metrics".
- `bootstrap.server` is the kafka broker host.
```json
{
"type": "kafka",
"dataSchema": {
"dataSource": "metrics-kafka2",
"parser": {
"type": "protobuf",
"descriptor": "file:///tmp/metrics.desc",
"protoMessageType": "Metrics",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"unit",
"http_method",
"http_code",
"page",
"metricType",
"server"
],
"dimensionExclusions": [
"timestamp",
"value"
]
}
}
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 5000000
},
"ioConfig": {
"topic": "metrics_pb",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
}
}
```
## Kafka Producer
Here is the sample script that publishes the metrics to Kafka in Protobuf format.
1. Run `protoc` again with the Python binding option. This command generates `metrics_pb2.py` file.
```
protoc -o metrics.desc metrics.proto --python_out=.
```
2. Create Kafka producer script.
This script requires `protobuf` and `kafka-python` modules.
```python
#!/usr/bin/env python
import sys
import json
from kafka import KafkaProducer
from metrics_pb2 import Metrics
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'metrics_pb'
metrics = Metrics()
for row in iter(sys.stdin):
d = json.loads(row)
for k, v in d.items():
setattr(metrics, k, v)
pb = metrics.SerializeToString()
producer.send(topic, pb)
```
3. run producer
```
./bin/generate-example-metrics | ./pb_publisher.py
```
4. test
```
kafka-console-consumer --zookeeper localhost --topic metrics_pb
```
It should print messages like this
> millisecondsGETR"2017-04-06T03:23:56Z*2002/list:request/latencyBwww1.example.com

View File

@ -32,6 +32,7 @@ Core extensions are maintained by Druid committers.
|druid-kerberos|Kerberos authentication for druid nodes.|[link](../development/extensions-core/druid-kerberos.html)|
|druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)|
|druid-lookups-cached-single| Per lookup caching module to support the use cases where a lookup need to be isolated from the global pool of lookups |[link](../development/extensions-core/druid-lookups.html)|
|druid-protobuf-extensions| Support for data in Protobuf data format.|[link](../development/extensions-core/protobuf.html)|
|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)|
|druid-stats|Statistics related module including variance and standard deviation.|[link](../development/extensions-core/stats.html)|
|mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)|

View File

@ -109,13 +109,6 @@ If `type` is not included, the parser defaults to `string`. For additional data
| type | String | This should say `string` in general, or `hadoopyString` when used in a Hadoop indexing job. | no |
| parseSpec | JSON Object | Specifies the format, timestamp, and dimensions of the data. | yes |
### Protobuf Parser
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `protobuf`. | no |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a timeAndDims parseSpec. | yes |
### ParseSpec
ParseSpecs serve two purposes:

View File

@ -0,0 +1,71 @@
{
"type": "kafka",
"dataSchema": {
"dataSource": "metrics-kafka2",
"parser": {
"type": "protobuf",
"descriptor": "file:///tmp/metrics.desc",
"protoMessageType": "Metrics",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"unit",
"http_method",
"http_code",
"page",
"metricType",
"server"
],
"dimensionExclusions": [
"timestamp",
"value"
]
}
}
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 5000000
},
"ioConfig": {
"topic": "metrics_pb",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
}
}

View File

@ -0,0 +1,15 @@
õ
metrics.proto"Û
Metrics
unit ( Runit
http_method ( R
httpMethod
value (Rvalue
timestamp ( R timestamp
http_code ( RhttpCode
page ( Rpage
metricType ( R
metricType
server ( Rserverbproto3

View File

@ -0,0 +1,11 @@
syntax = "proto3";
message Metrics {
string unit = 1;
string http_method = 2;
int32 value = 3;
string timestamp = 4;
string http_code = 5;
string page = 6;
string metricType = 7;
string server = 8;
}

View File

@ -0,0 +1,118 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: metrics.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='metrics.proto',
package='',
syntax='proto3',
serialized_pb=_b('\n\rmetrics.proto\"\x93\x01\n\x07Metrics\x12\x0c\n\x04unit\x18\x01 \x01(\t\x12\x13\n\x0bhttp_method\x18\x02 \x01(\t\x12\r\n\x05value\x18\x03 \x01(\x05\x12\x11\n\ttimestamp\x18\x04 \x01(\t\x12\x11\n\thttp_code\x18\x05 \x01(\t\x12\x0c\n\x04page\x18\x06 \x01(\t\x12\x12\n\nmetricType\x18\x07 \x01(\t\x12\x0e\n\x06server\x18\x08 \x01(\tb\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_METRICS = _descriptor.Descriptor(
name='Metrics',
full_name='Metrics',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='unit', full_name='Metrics.unit', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='http_method', full_name='Metrics.http_method', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='value', full_name='Metrics.value', index=2,
number=3, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='timestamp', full_name='Metrics.timestamp', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='http_code', full_name='Metrics.http_code', index=4,
number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='page', full_name='Metrics.page', index=5,
number=6, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='metricType', full_name='Metrics.metricType', index=6,
number=7, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='server', full_name='Metrics.server', index=7,
number=8, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=18,
serialized_end=165,
)
DESCRIPTOR.message_types_by_name['Metrics'] = _METRICS
Metrics = _reflection.GeneratedProtocolMessageType('Metrics', (_message.Message,), dict(
DESCRIPTOR = _METRICS,
__module__ = 'metrics_pb2'
# @@protoc_insertion_point(class_scope:Metrics)
))
_sym_db.RegisterMessage(Metrics)
# @@protoc_insertion_point(module_scope)

View File

@ -0,0 +1,19 @@
#!/usr/bin/env python
import sys
import json
from kafka import KafkaProducer
from metrics_pb2 import Metrics
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'metrics_pb'
for row in iter(sys.stdin):
d = json.loads(row)
metrics = Metrics()
for k, v in d.items():
setattr(metrics, k, v)
pb = metrics.SerializeToString()
producer.send(topic, pb)

View File

@ -0,0 +1,123 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright 2012 - 2017 Metamarkets Group Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-protobuf-extensions</artifactId>
<name>druid-protobuf-extensions</name>
<description>druid-protobuf-extensions</description>
<parent>
<artifactId>druid</artifactId>
<groupId>io.druid</groupId>
<version>0.10.1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<properties>
<protobuf.version>3.2.0</protobuf.version>
</properties>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protobuf-dynamic</artifactId>
<version>0.9.3</version>
</dependency>
<!-- test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<relocations>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>shaded.com.google.protobuf</shadedPattern>
</relocation>
</relocations>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<nonFilteredFileExtensions>
<nonFilteredFileExtension>desc</nonFilteredFileExtension>
</nonFilteredFileExtensions>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>strict</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgs>
<!-- protobuf compiler generated classes miss @Override, that is not easy to fix -->
<arg>-Xep:MissingOverride:WARN</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,48 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.protobuf;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.Arrays;
import java.util.List;
public class ProtobufExtensionsModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.asList(
new SimpleModule("ProtobufInputRowParserModule")
.registerSubtypes(
new NamedType(ProtobufInputRowParser.class, "protobuf")
)
);
}
@Override
public void configure(Binder binder)
{ }
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.protobuf;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.ParseSpec;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.common.parsers.Parser;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
public class ProtobufInputRowParser implements ByteBufferInputRowParser
{
private final ParseSpec parseSpec;
private Parser<String, Object> parser;
private final String descriptorFilePath;
private final String protoMessageType;
private Descriptor descriptor;
@JsonCreator
public ProtobufInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("descriptor") String descriptorFilePath,
@JsonProperty("protoMessageType") String protoMessageType
)
{
this.parseSpec = parseSpec;
this.descriptorFilePath = descriptorFilePath;
this.protoMessageType = protoMessageType;
this.parser = parseSpec.makeParser();
this.descriptor = getDescriptor(descriptorFilePath);
}
@Override
public ParseSpec getParseSpec()
{
return parseSpec;
}
@Override
public ProtobufInputRowParser withParseSpec(ParseSpec parseSpec)
{
return new ProtobufInputRowParser(parseSpec, descriptorFilePath, protoMessageType);
}
@Override
public InputRow parse(ByteBuffer input)
{
String json;
try {
DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
json = JsonFormat.printer().print(message);
}
catch (InvalidProtocolBufferException e) {
throw new ParseException(e, "Protobuf message could not be parsed");
}
Map<String, Object> record = parser.parse(json);
return new MapBasedInputRow(
parseSpec.getTimestampSpec().extractTimestamp(record),
parseSpec.getDimensionsSpec().getDimensionNames(),
record
);
}
private Descriptor getDescriptor(String descriptorFilePath)
{
InputStream fin;
fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFilePath);
if (fin == null) {
URL url = null;
try {
url = new URL(descriptorFilePath);
}
catch (MalformedURLException e) {
throw new ParseException(e, "Descriptor not found in class path or malformed URL:" + descriptorFilePath);
}
try {
fin = url.openConnection().getInputStream();
}
catch (IOException e) {
throw new ParseException(e, "Cannot read descriptor file: " + url.toString());
}
}
DynamicSchema dynamicSchema = null;
try {
dynamicSchema = DynamicSchema.parseFrom(fin);
}
catch (Descriptors.DescriptorValidationException e) {
throw new ParseException(e, "Invalid descriptor file: " + descriptorFilePath);
}
catch (IOException e) {
throw new ParseException(e, "Cannot read descriptor file: " + descriptorFilePath);
}
Set<String> messageTypes = dynamicSchema.getMessageTypes();
if (messageTypes.size() == 0) {
throw new ParseException("No message types found in the descriptor: " + descriptorFilePath);
}
String messageType = protoMessageType == null ? (String) messageTypes.toArray()[0] : protoMessageType;
Descriptor desc = dynamicSchema.getMessageDescriptor(messageType);
if (desc == null) {
throw new ParseException(
String.format(
"Protobuf message type %s not found in the specified descriptor. Available messages types are %s",
protoMessageType,
messageTypes
)
);
}
return desc;
}
}

View File

@ -0,0 +1 @@
io.druid.data.input.protobuf.ProtobufExtensionsModule

View File

@ -0,0 +1,170 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.protobuf;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.JSONPathFieldSpec;
import io.druid.data.input.impl.JSONPathFieldType;
import io.druid.data.input.impl.JSONPathSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.java.util.common.parsers.ParseException;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class ProtobufInputRowParserTest
{
private ParseSpec parseSpec;
@Before
public void setUp() throws Exception
{
parseSpec = new JSONParseSpec(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(Lists.<DimensionSchema>newArrayList(
new StringDimensionSchema("event"),
new StringDimensionSchema("id"),
new StringDimensionSchema("someOtherId"),
new StringDimensionSchema("isValid")
), null, null),
new JSONPathSpec(
true,
Lists.newArrayList(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar")
)
), null
);
}
@Test
public void testShortMessageType() throws Exception
{
//configure parser with desc file, and specify which file name to use
ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent");
}
@Test
public void testLongMessageType() throws Exception
{
//configure parser with desc file, and specify which file name to use
ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "prototest.ProtoTestEvent");
}
@Test(expected = ParseException.class)
public void testBadProto() throws Exception
{
//configure parser with desc file
ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "BadName");
}
@Test(expected = ParseException.class)
public void testMalformedDescriptorUrl() throws Exception
{
//configure parser with non existent desc file
ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "file:/nonexist.desc", "BadName");
}
@Test
public void testSingleDescriptorNoMessageType() throws Exception
{
// For the backward compatibility, protoMessageType allows null when the desc file has only one message type.
ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", null);
}
@Test
public void testParse() throws Exception
{
//configure parser with desc file
ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent");
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 07, 12, 9, 30);
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.setFoo(ProtoTestEventWrapper.ProtoTestEvent.Foo
.newBuilder()
.setBar("baz"))
.addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo
.newBuilder()
.setBar("bar0"))
.addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo
.newBuilder()
.setBar("bar1"))
.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
InputRow row = parser.parse(ByteBuffer.wrap(out.toByteArray()));
System.out.println(row);
assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "id", "4711");
assertDimensionEquals(row, "isValid", "true");
assertDimensionEquals(row, "someOtherId", "4712");
assertDimensionEquals(row, "description", "description");
assertDimensionEquals(row, "eventType", ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE.name());
assertDimensionEquals(row, "foobar", "baz");
assertDimensionEquals(row, "bar0", "bar0");
assertEquals(47.11F, row.getFloatMetric("someFloatColumn"), 0.0);
assertEquals(815.0F, row.getFloatMetric("someIntColumn"), 0.0);
assertEquals(816.0F, row.getFloatMetric("someLongColumn"), 0.0);
}
private void assertDimensionEquals(InputRow row, String dimension, Object expected)
{
List<String> values = row.getDimension(dimension);
assertEquals(1, values.size());
assertEquals(expected, values.get(0));
}
}

View File

@ -0,0 +1,30 @@
syntax = "proto2";
package prototest;
option java_package = "io.druid.data.input.protobuf";
option java_outer_classname = "ProtoTestEventWrapper";
message ProtoTestEvent {
enum EventCategory {
CATEGORY_ZERO = 0;
CATEGORY_ONE = 1;
CATEGORY_TWO = 2;
}
message Foo {
required string bar = 1;
}
required EventCategory eventType = 1;
required uint64 id = 2;
required string timestamp = 3;
optional uint32 someOtherId = 4;
optional bool isValid = 5;
optional string description = 6;
optional float someFloatColumn = 7;
optional uint32 someIntColumn = 8;
optional uint64 someLongColumn = 9;
optional Foo foo = 10;
repeated Foo bar = 11;
}

View File

@ -106,6 +106,7 @@
<module>extensions-core/kafka-indexing-service</module>
<module>extensions-core/mysql-metadata-storage</module>
<module>extensions-core/postgresql-metadata-storage</module>
<module>extensions-core/protobuf-extensions</module>
<module>extensions-core/lookups-cached-global</module>
<module>extensions-core/lookups-cached-single</module>
<module>extensions-core/s3-extensions</module>

View File

@ -65,10 +65,6 @@
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>

View File

@ -1,129 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.java.util.common.logger.Logger;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Map;
@JsonTypeName("protobuf")
public class ProtoBufInputRowParser implements ByteBufferInputRowParser
{
private static final Logger log = new Logger(ProtoBufInputRowParser.class);
private final ParseSpec parseSpec;
private final MapInputRowParser mapParser;
private final String descriptorFileInClasspath;
@JsonCreator
public ProtoBufInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("descriptor") String descriptorFileInClasspath
)
{
this.parseSpec = parseSpec;
this.descriptorFileInClasspath = descriptorFileInClasspath;
this.mapParser = new MapInputRowParser(this.parseSpec);
}
@Override
public ParseSpec getParseSpec()
{
return parseSpec;
}
@Override
public ProtoBufInputRowParser withParseSpec(ParseSpec parseSpec)
{
return new ProtoBufInputRowParser(parseSpec, descriptorFileInClasspath);
}
@Override
public InputRow parse(ByteBuffer input)
{
// We should really create a ProtoBufBasedInputRow that does not need an intermediate map but accesses
// the DynamicMessage directly...
Map<String, Object> theMap = buildStringKeyMap(input);
return mapParser.parse(theMap);
}
private Map<String, Object> buildStringKeyMap(ByteBuffer input)
{
final Descriptors.Descriptor descriptor = getDescriptor(descriptorFileInClasspath);
final Map<String, Object> theMap = Maps.newHashMap();
try {
DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
Map<Descriptors.FieldDescriptor, Object> allFields = message.getAllFields();
for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : allFields.entrySet()) {
String name = entry.getKey().getName();
if (theMap.containsKey(name)) {
continue;
// Perhaps throw an exception here?
// throw new RuntimeException("dupicate key " + name + " in " + message);
}
Object value = entry.getValue();
if (value instanceof Descriptors.EnumValueDescriptor) {
Descriptors.EnumValueDescriptor desc = (Descriptors.EnumValueDescriptor) value;
value = desc.getName();
}
theMap.put(name, value);
}
}
catch (InvalidProtocolBufferException e) {
log.warn(e, "Problem with protobuf something");
}
return theMap;
}
private Descriptors.Descriptor getDescriptor(String descriptorFileInClassPath)
{
try {
InputStream fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFileInClassPath);
DescriptorProtos.FileDescriptorSet set = DescriptorProtos.FileDescriptorSet.parseFrom(fin);
Descriptors.FileDescriptor file = Descriptors.FileDescriptor.buildFrom(
set.getFile(0), new Descriptors.FileDescriptor[]
{}
);
return file.getMessageTypes().get(0);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -1,110 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import org.joda.time.DateTime;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class ProtoBufInputRowParserTest
{
public static final String[] DIMENSIONS = new String[]{"eventType", "id", "someOtherId", "isValid"};
/*
* eventType = 1;
*
* required uint64 id = 2;
* required string timestamp = 3;
* optional uint32 someOtherId = 4;
* optional bool isValid = 5;
* optional string description = 6;
*
* optional float someFloatColumn = 7;
* optional uint32 someIntColumn = 8;
* optional uint64 someLongColumn = 9;
*/
@Test
public void testParse() throws Exception
{
//configure parser with desc file
ProtoBufInputRowParser parser = new ProtoBufInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(DIMENSIONS)), Arrays.<String>asList(), null)
),
"prototest.desc"
);
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 07, 12, 9, 30);
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);
InputRow row = parser.parse(ByteBuffer.wrap(out.toByteArray()));
System.out.println(row);
assertEquals(Arrays.asList(DIMENSIONS), row.getDimensions());
assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "id", "4711");
assertDimensionEquals(row, "isValid", "true");
assertDimensionEquals(row, "someOtherId", "4712");
assertDimensionEquals(row, "description", "description");
assertDimensionEquals(row, "eventType", ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE.name());
assertEquals(47.11F, row.getFloatMetric("someFloatColumn"), 0.0);
assertEquals(815.0F, row.getFloatMetric("someIntColumn"), 0.0);
assertEquals(816.0F, row.getFloatMetric("someLongColumn"), 0.0);
}
private void assertDimensionEquals(InputRow row, String dimension, Object expected)
{
List<String> values = row.getDimension(dimension);
assertEquals(1, values.size());
assertEquals(expected, values.get(0));
}
}

View File

@ -1,31 +0,0 @@
package prototest;
option java_package = "io.druid.indexer.data";
option java_outer_classname = "ProtoTestEventWrapper";
message ProtoTestEvent {
enum EventCategory {
CATEGORY_ZERO = 0;
CATEGORY_ONE = 1;
CATEGORY_TWO = 2;
}
required EventCategory eventType = 1;
required uint64 id = 2;
required string timestamp = 3;
optional uint32 someOtherId = 4;
optional bool isValid = 5;
optional string description = 6;
optional float someFloatColumn = 7;
optional uint32 someIntColumn = 8;
optional uint64 someLongColumn = 9;
}

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.data.input.ProtoBufInputRowParser;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.IrcInputRowParser;
@ -45,7 +44,6 @@ public class ParsersModule implements DruidModule
return Arrays.<Module>asList(
new SimpleModule("ParsersModule")
.registerSubtypes(
new NamedType(ProtoBufInputRowParser.class, "protobuf"),
new NamedType(IrcInputRowParser.class, "irc")
)
);