Protobuf
This extension enables Druid to ingest and understand the Protobuf data format. Make sure to include druid-protobuf-extensions as an extension.
Protobuf Parser
Field | Type | Description | Required |
---|---|---|---|
type | String | This should say protobuf. | no |
descriptor | String | Protobuf descriptor file name in the classpath or URL. | yes |
protoMessageType | String | Protobuf message type in the descriptor. Both short name and fully qualified name are accepted. The parser uses the first message type found in the descriptor if not specified. | no |
parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. The format must be json. See JSON ParseSpec for more configuration options. Please note timeAndDims parseSpec is no longer supported. | yes |
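The protoMessageType lookup rules in the table can be pictured with a small sketch. This is not Druid's implementation: resolve_message_type and the sample type names are hypothetical, and the sketch only assumes that a descriptor exposes its message types as an ordered list of fully qualified names.

```python
from typing import List, Optional


def resolve_message_type(available: List[str], requested: Optional[str] = None) -> str:
    """Pick a message type following the rules described in the table (illustrative only)."""
    if not available:
        raise ValueError("descriptor contains no message types")
    if requested is None:
        # Nothing requested: fall back to the first message type in the descriptor.
        return available[0]
    for name in available:
        short_name = name.rsplit(".", 1)[-1]
        # Accept either the fully qualified name or the short name.
        if requested in (name, short_name):
            return name
    raise ValueError(f"message type {requested!r} not found in descriptor")


# Hypothetical descriptor contents, used only for this example.
types = ["example.Metrics", "example.Heartbeat"]
print(resolve_message_type(types))
print(resolve_message_type(types, "Heartbeat"))
print(resolve_message_type(types, "example.Metrics"))
```

Setting protoMessageType explicitly avoids depending on the order of messages in the descriptor.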
Example: Load Protobuf messages from Kafka
This example demonstrates how to load Protobuf messages from Kafka. Please read the Load from Kafka tutorial first. This example will use the same "metrics" dataset.
Files used in this example are found at ./examples/quickstart/protobuf in your Druid directory.
- We will use Kafka Indexing Service instead of Tranquility.
- Kafka broker host is localhost:9092.
- Kafka topic is metrics_pb instead of metrics.
- Datasource name is metrics-kafka-pb instead of metrics-kafka to avoid confusion.
Here is the metrics JSON example.
{
  "unit": "milliseconds",
  "http_method": "GET",
  "value": 44,
  "timestamp": "2017-04-06T02:36:22Z",
  "http_code": "200",
  "page": "/",
  "metricType": "request/latency",
  "server": "www1.example.com"
}
Proto file
The proto file should look like this. Save it as metrics.proto.
syntax = "proto3";
message Metrics {
  string unit = 1;
  string http_method = 2;
  int32 value = 3;
  string timestamp = 4;
  string http_code = 5;
  string page = 6;
  string metricType = 7;
  string server = 8;
}
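Each key of the JSON record maps to the proto field of the same name, and each value has to fit the declared type (string, or int32 for value). A minimal sketch of that check in plain Python follows. FIELD_TYPES and check_row are hypothetical helpers written for this example; they mirror the Metrics message by hand rather than reading metrics.proto.

```python
import json

# Hand-written mirror of the Metrics message: field name -> expected Python type.
FIELD_TYPES = {
    "unit": str,
    "http_method": str,
    "value": int,
    "timestamp": str,
    "http_code": str,
    "page": str,
    "metricType": str,
    "server": str,
}


def check_row(row: dict) -> list:
    """Return a list of mismatches between a JSON record and the Metrics fields."""
    problems = []
    for key, value in row.items():
        expected = FIELD_TYPES.get(key)
        if expected is None:
            problems.append(f"{key}: not a field of Metrics")
        elif isinstance(value, bool) or not isinstance(value, expected):
            # bool is a subclass of int in Python, so reject it explicitly.
            problems.append(f"{key}: expected {expected.__name__}, got {type(value).__name__}")
    return problems


sample = json.loads(
    '{"unit": "milliseconds", "http_method": "GET", "value": 44,'
    ' "timestamp": "2017-04-06T02:36:22Z", "http_code": "200", "page": "/",'
    ' "metricType": "request/latency", "server": "www1.example.com"}'
)
print(check_row(sample))                 # no mismatches for the sample record
print(check_row({"http_code": 200}))     # http_code is declared as a string
```

Note that http_code is a string in both the JSON record and the proto file, so a numeric 200 would not fit the field.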
Descriptor file
Use the protoc Protobuf compiler to generate the descriptor file. Save the metrics.desc file either on the classpath or at a location reachable by URL. In this example the descriptor file was saved at /tmp/metrics.desc.
protoc -o /tmp/metrics.desc metrics.proto
Supervisor spec JSON
Below is the complete Supervisor spec JSON to be submitted to the Overlord. Please make sure these keys are properly configured for successful ingestion.
- descriptor for the descriptor file URL.
- protoMessageType from the proto definition.
- parseSpec format must be json.
- topic to subscribe. The topic is "metrics_pb" instead of "metrics".
- bootstrap.servers is the Kafka broker host.
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics-kafka-pb",
    "parser": {
      "type": "protobuf",
      "descriptor": "file:///tmp/metrics.desc",
      "protoMessageType": "Metrics",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "unit",
            "http_method",
            "http_code",
            "page",
            "metricType",
            "server"
          ],
          "dimensionExclusions": [
            "timestamp",
            "value"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "metrics_pb",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}
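Before submitting, it can help to check the keys called out above programmatically. The sketch below is only an illustration: check_spec and build_submit_request are hypothetical helpers, the trimmed spec is a stand-in for the full JSON, and the Overlord address http://localhost:8090 is an assumption about a local quickstart setup. The spec is posted to the Overlord's /druid/indexer/v1/supervisor endpoint.

```python
import json
import urllib.request


def check_spec(spec: dict) -> list:
    """Return a list of problems with the keys this example depends on."""
    problems = []
    parser = spec.get("dataSchema", {}).get("parser", {})
    io_config = spec.get("ioConfig", {})
    if not parser.get("descriptor"):
        problems.append("parser.descriptor is missing")
    if parser.get("parseSpec", {}).get("format") != "json":
        problems.append("parser.parseSpec.format must be json")
    if not io_config.get("topic"):
        problems.append("ioConfig.topic is missing")
    if not io_config.get("consumerProperties", {}).get("bootstrap.servers"):
        problems.append("ioConfig.consumerProperties.bootstrap.servers is missing")
    return problems


def build_submit_request(spec: dict, overlord: str = "http://localhost:8090"):
    """Build (but do not send) the POST request that submits the spec."""
    return urllib.request.Request(
        overlord + "/druid/indexer/v1/supervisor",
        data=json.dumps(spec).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Trimmed stand-in for the full spec; only the keys that are checked are included.
spec = {
    "type": "kafka",
    "dataSchema": {
        "parser": {
            "type": "protobuf",
            "descriptor": "file:///tmp/metrics.desc",
            "protoMessageType": "Metrics",
            "parseSpec": {"format": "json"},
        }
    },
    "ioConfig": {
        "topic": "metrics_pb",
        "consumerProperties": {"bootstrap.servers": "localhost:9092"},
    },
}

print(check_spec(spec))
request = build_submit_request(spec)
print(request.get_method(), request.full_url)
# To actually submit the spec: urllib.request.urlopen(request)
```

The request is built without being sent so that the checks can run without a live cluster. Load the full spec from a file with json.load in place of the trimmed dictionary.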
Kafka Producer
Here is the sample script that publishes the metrics to Kafka in Protobuf format.
- Run protoc again with the Python binding option. This command generates the metrics_pb2.py file.
protoc -o metrics.desc metrics.proto --python_out=.
- Create the Kafka producer script. This script requires the protobuf and kafka-python modules. Save it as pb_publisher.py and make it executable.
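#!/usr/bin/env python
import sys
import json

from kafka import KafkaProducer
from metrics_pb2 import Metrics

producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'metrics_pb'

# Read one JSON object per line from stdin and publish it as a Protobuf message.
for row in sys.stdin:
    d = json.loads(row)
    # Use a fresh message per row so fields from a previous row never carry over.
    metrics = Metrics()
    for k, v in d.items():
        setattr(metrics, k, v)
    pb = metrics.SerializeToString()
    producer.send(topic, pb)

# send() is asynchronous; flush so buffered messages are delivered before exit.
producer.flush()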
- Run the producer.
./bin/generate-example-metrics | ./pb_publisher.py
- Test.
kafka-console-consumer --zookeeper localhost --topic metrics_pb
It should print messages like this:
millisecondsGETR"2017-04-06T03:23:56Z*2002/list:request/latencyBwww1.example.com
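The output looks garbled because the console consumer prints raw Protobuf bytes. Each field is written as a tag byte, then a length or varint, then the payload, so the strings appear back to back with stray characters such as R, *, and : in between. The following sketch re-creates that byte layout by hand using only the Python standard library. It illustrates the wire format for this particular message and does not replace the generated metrics_pb2 module. The helper names are hypothetical, the value 82 is inferred from the R byte in the sample output, and the sketch only handles strings and non-negative integers. It also writes every field, whereas a real proto3 serializer omits fields left at their default value.

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a Protobuf varint (7 bits per byte)."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string(field_number: int, text: str) -> bytes:
    """Wire type 2 (length-delimited): tag, byte length, UTF-8 payload."""
    data = text.encode("utf-8")
    return encode_varint((field_number << 3) | 2) + encode_varint(len(data)) + data


def encode_int(field_number: int, value: int) -> bytes:
    """Wire type 0 (varint): tag, then the value itself."""
    return encode_varint((field_number << 3) | 0) + encode_varint(value)


# Field numbers follow metrics.proto.
payload = b"".join([
    encode_string(1, "milliseconds"),          # unit
    encode_string(2, "GET"),                   # http_method
    encode_int(3, 82),                         # value: 82 is the byte for "R"
    encode_string(4, "2017-04-06T03:23:56Z"),  # timestamp
    encode_string(5, "200"),                   # http_code
    encode_string(6, "/list"),                 # page
    encode_string(7, "request/latency"),       # metricType
    encode_string(8, "www1.example.com"),      # server
])

# Keep only printable ASCII, roughly what a terminal shows for these bytes.
printable = "".join(chr(b) for b in payload if 0x20 <= b <= 0x7E)
print(printable)
```

The printed string matches the sample consumer output above: the tag bytes for fields 4 to 8 happen to be the printable characters ", *, 2, :, and B, while the remaining tag and length bytes are control characters that the terminal does not display.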