
---
layout: doc_page
---
# Tutorial: Load from Kafka
## Getting started
This tutorial shows you how to load data from Kafka into Druid.
For this tutorial, we'll assume you've already downloaded Druid and Tranquility as described in
the [single-machine quickstart](quickstart.html) and have Druid running on your local machine. You
don't need to have loaded any data yet.
<div class="note info">
This tutorial will show you how to load data from Kafka into Druid, but Druid additionally supports
a wide variety of batch and streaming loading methods. See the <a href="../ingestion/batch-ingestion.html">Loading files</a>
and <a href="../ingestion/stream-ingestion.html">Loading streams</a> pages for more information about other options,
including from Hadoop, HTTP, Storm, Samza, Spark Streaming, and your own JVM apps.
</div>
## Start Kafka
[Apache Kafka](http://kafka.apache.org/) is a high-throughput message bus that works well with
Druid. For this tutorial, we will use Kafka. To download Kafka, issue the following
commands in your terminal:

    curl -O http://www.us.apache.org/dist/kafka/
    tar -xzf kafka_2.11-
    cd kafka_2.11-

Start a Kafka broker by running the following command in a new terminal:

    ./bin/kafka-server-start.sh config/server.properties

Run this command to create a Kafka topic called *metrics*, to which we'll send data:

    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic metrics

## Enable Druid Kafka ingestion
Druid includes configs for [Tranquility Kafka](../ingestion/stream-pull.html#kafka) to support loading data from Kafka.
To enable this in the quickstart-based configuration:
- Stop your Tranquility command (CTRL-C) and then start it up again.
## Send example data
Let's launch a console producer for our topic and send some data!
In your Druid directory, generate some metrics by running:
In your Kafka directory, run:

    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic metrics

The *kafka-console-producer* command is now awaiting input. Copy the generated example metrics,
paste them into the *kafka-console-producer* terminal, and press enter. If you like, you can also
paste more messages into the producer, or you can press CTRL-D to exit the console producer.

You can immediately query this data, or you can skip ahead to the
[Loading your own data](#loading-your-own-data) section if you'd like to load your own dataset.
## Querying your data
After sending data, you can immediately query it using any of the
[supported query methods](../querying/querying.html).
## Loading your own data
So far, you've loaded data into Druid from Kafka using an ingestion spec that we've included in the
distribution. Each ingestion spec is designed to work with a particular dataset. You can load your own
data types into Druid by writing a custom ingestion spec.

You can write a custom ingestion spec by starting from the bundled configuration in
`conf-quickstart/tranquility/kafka.json` and modifying it for your own needs.
The most important questions are:
* What should the dataset be called? This is the "dataSource" field of the "dataSchema".
* Which field should be treated as a timestamp? This belongs in the "column" of the "timestampSpec".
* Which fields should be treated as dimensions? This belongs in the "dimensions" of the "dimensionsSpec".
* Which fields should be treated as measures? This belongs in the "metricsSpec".
Let's use a small JSON pageviews dataset in the topic *pageviews* as an example, with records like:

    {"time": "2000-01-01T00:00:00Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}

First, create the topic:

    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pageviews

Next, edit `conf-quickstart/tranquility/kafka.json`:
* Let's call the dataset "pageviews-kafka".
* The timestamp is the "time" field.
* Good choices for dimensions are the string fields "url" and "user".
* Good choices for measures are a count of pageviews, and the sum of "latencyMs". Collecting that
sum when we load the data will allow us to compute an average at query time as well.
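
To see why storing a count and a sum is enough to recover an average later, here is a minimal sketch in plain Python. It is not Druid code: `summarize` and `average_latency` are hypothetical helpers that only mimic what a count measure and a sum measure would collect, and the records are illustrative values in the shape of the pageviews example.

```python
# Illustrative records in the shape of the pageviews example.
records = [
    {"time": "2000-01-01T00:00:00Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32},
    {"time": "2000-01-01T00:00:00Z", "url": "/", "user": "bob", "latencyMs": 11},
    {"time": "2000-01-01T00:00:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45},
]


def summarize(records):
    """Collect the two measures: a row count and a running sum of latencyMs."""
    views = 0
    latency_sum = 0.0
    for record in records:
        views += 1
        latency_sum += record["latencyMs"]
    return {"views": views, "latencyMs": latency_sum}


def average_latency(summary):
    """Derive the average at query time from the stored sum and count."""
    if summary["views"] == 0:
        return None  # no rows, so the average is undefined
    return summary["latencyMs"] / summary["views"]


summary = summarize(records)
print(summary)
print(average_latency(summary))
```

The individual latency values are no longer needed once the two totals are kept, which is why the sum is worth collecting at load time.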
You can edit the existing `conf-quickstart/tranquility/kafka.json` file by making these changes:

1. Change the key `"metrics-kafka"` under `"dataSources"` to `"pageviews-kafka"`.
2. Alter these sections under the new `"pageviews-kafka"` key:

        "dataSource": "pageviews-kafka"

        "timestampSpec": {
             "format": "auto",
             "column": "time"
        }

        "dimensionsSpec": {
             "dimensions": ["url", "user"]
        }

        "metricsSpec": [
             {"name": "views", "type": "count"},
             {"name": "latencyMs", "type": "doubleSum", "fieldName": "latencyMs"}
        ]

        "properties" : {
             "task.partitions" : "1",
             "task.replicants" : "1",
             "topicPattern" : "pageviews"
        }

Next, start Druid Kafka ingestion:

    bin/tranquility kafka -configFile ../druid-0.9.0-SNAPSHOT/conf-quickstart/tranquility/kafka.json

- If your Tranquility server or Kafka is already running, stop it (CTRL-C) and
start it up again.
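
The two edits to `kafka.json` described earlier (renaming the key under `"dataSources"` and altering the listed sections) can also be scripted, which may help if you repeat them for several topics. The sketch below is one possible approach, not part of the Druid or Tranquility tooling. The helper names `replace_sections` and `retarget` are hypothetical, the exact nesting of the real file is not assumed (schema sections are overwritten wherever they already appear), and `"properties"` is assumed to sit directly under the data source entry.

```python
import copy
import json

# Section values taken from the pageviews example in this tutorial.
SCHEMA_SECTIONS = {
    "dataSource": "pageviews-kafka",
    "timestampSpec": {"format": "auto", "column": "time"},
    "dimensionsSpec": {"dimensions": ["url", "user"]},
    "metricsSpec": [
        {"name": "views", "type": "count"},
        {"name": "latencyMs", "type": "doubleSum", "fieldName": "latencyMs"},
    ],
}
PROPERTIES = {
    "task.partitions": "1",
    "task.replicants": "1",
    "topicPattern": "pageviews",
}


def replace_sections(node, sections):
    """Recursively overwrite any existing key named in `sections`."""
    if isinstance(node, dict):
        for key in list(node):
            if key in sections:
                node[key] = copy.deepcopy(sections[key])
            else:
                replace_sections(node[key], sections)
    elif isinstance(node, list):
        for item in node:
            replace_sections(item, sections)


def retarget(config, old_name, new_name):
    """Return a copy of `config` with the data source renamed and re-described."""
    patched = copy.deepcopy(config)
    entry = patched["dataSources"].pop(old_name)  # step 1: rename the key
    replace_sections(entry, SCHEMA_SECTIONS)      # step 2: schema sections
    # Assumption: "properties" sits directly under the data source entry.
    entry.setdefault("properties", {}).update(PROPERTIES)
    patched["dataSources"][new_name] = entry
    return patched


if __name__ == "__main__":
    # Tiny stand-in config for illustration; the real file has more fields.
    sample = {"dataSources": {"metrics-kafka": {
        "spec": {"dataSchema": {"dataSource": "metrics-kafka", "metricsSpec": []}},
        "properties": {"topicPattern": "metrics"},
    }}}
    print(json.dumps(retarget(sample, "metrics-kafka", "pageviews-kafka"), indent=2))
```

To apply it to the real file, you would load `conf-quickstart/tranquility/kafka.json` with `json.load`, pass the result to `retarget`, and write it back with `json.dump`; check the result by hand before restarting Tranquility.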
Finally, send some data to the Kafka topic. Let's start with these messages:

    {"time": "2000-01-01T00:00:00Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}
    {"time": "2000-01-01T00:00:00Z", "url": "/", "user": "bob", "latencyMs": 11}
    {"time": "2000-01-01T00:00:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}

Druid streaming ingestion requires relatively current messages (relative to a slack time controlled by the
[windowPeriod](../ingestion/stream-ingestion.html#segmentgranularity-and-windowperiod) value), so you should
replace `2000-01-01T00:00:00Z` in these messages with the current time in ISO 8601 format. You can
get this by running:

    python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'

Update the timestamps in the JSON above, then copy and paste these messages into this console
producer and press enter:

    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pageviews

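
If you would rather not edit the timestamps by hand, the messages can be generated with the current time already filled in. This is a minimal sketch: `current_timestamp` and `build_messages` are hypothetical helper names, and the event values simply repeat the three example messages.

```python
import json
from datetime import datetime, timezone

# The three example pageviews, without their timestamps.
EVENTS = [
    {"url": "/foo/bar", "user": "alice", "latencyMs": 32},
    {"url": "/", "user": "bob", "latencyMs": 11},
    {"url": "/foo/bar", "user": "bob", "latencyMs": 45},
]


def current_timestamp(now=None):
    """Format a timezone-aware time as UTC, e.g. 2000-01-01T00:00:00Z.

    Defaults to the current time when `now` is not given.
    """
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%SZ")


def build_messages(now=None):
    """Return one JSON line per event, all stamped with the same time."""
    stamp = current_timestamp(now)
    return [json.dumps({"time": stamp, **event}) for event in EVENTS]


if __name__ == "__main__":
    for line in build_messages():
        print(line)
```

The printed lines can be pasted into the console producer as described above. Since the console producer reads its input from the terminal, piping the script's output into it should also work, though that is not covered by this tutorial.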
That's it, your data should now be in Druid. You can immediately query it using any of the
[supported query methods](../querying/querying.html).
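
As one illustration of such a query, the sketch below builds a native timeseries query that totals the `views` and `latencyMs` measures and divides them to get an average latency. Treat it as a starting point and not a definitive recipe: the helper names are hypothetical, the interval is an arbitrary wide range, and the broker address `http://localhost:8082/druid/v2/` is an assumption about a default single-machine setup that you should adjust to match yours.

```python
import json
import urllib.request


def build_average_latency_query(data_source="pageviews-kafka",
                                interval="2000-01-01/2100-01-01"):
    """Build a timeseries query that derives average latency from sum / count."""
    return {
        "queryType": "timeseries",
        "dataSource": data_source,
        "granularity": "all",
        "intervals": [interval],
        "aggregations": [
            # "views" was stored as a count at load time, so it is summed here.
            {"type": "longSum", "name": "views", "fieldName": "views"},
            {"type": "doubleSum", "name": "latencyMs", "fieldName": "latencyMs"},
        ],
        "postAggregations": [
            {"type": "arithmetic", "name": "avgLatencyMs", "fn": "/",
             "fields": [
                 {"type": "fieldAccess", "fieldName": "latencyMs"},
                 {"type": "fieldAccess", "fieldName": "views"},
             ]},
        ],
    }


def run_query(query, broker_url="http://localhost:8082/druid/v2/"):
    """POST the query as JSON. The broker URL is an assumed default."""
    request = urllib.request.Request(
        broker_url,
        data=json.dumps(query).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    # Only prints the query; call run_query(...) once Druid is running.
    print(json.dumps(build_average_latency_query(), indent=2))
```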
## Further reading
To read more about loading streams, see our [streaming ingestion documentation](../ingestion/stream-ingestion.html).