---
id: tutorial-kafka
title: "Tutorial: Load streaming data from Apache Kafka"
sidebar_label: "Load from Apache Kafka"
---

<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

## Getting started

This tutorial demonstrates how to load data into Apache Druid (incubating) from a Kafka stream, using Druid's Kafka indexing service.

For this tutorial, we'll assume you've already downloaded Druid as described in
the [quickstart](index.html) using the `micro-quickstart` single-machine configuration and have it
running on your local machine. You don't need to have loaded any data yet.

## Download and start Kafka

[Apache Kafka](http://kafka.apache.org/) is a high-throughput message bus that works well with
Druid. For this tutorial, we will use Kafka 2.1.0. To download Kafka, issue the following
commands in your terminal:

```bash
curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
cd kafka_2.12-2.1.0
```
Start a Kafka broker by running the following command in a new terminal:
```bash
./bin/kafka-server-start.sh config/server.properties
```

Run this command to create a Kafka topic called *wikipedia*, to which we'll send data:

```bash
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
```
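
Before moving on, you may want to confirm that the topic was actually created. The sketch below wraps that check in a small helper. `topic_exists` is a hypothetical function introduced here for illustration and is not part of Kafka; the usage line assumes the same ZooKeeper address as the create command.

```bash
# Hypothetical helper (not part of Kafka): reads a topic list on stdin,
# one name per line, and succeeds only if the given topic is listed exactly.
topic_exists() {
  grep -Fxq -- "$1"
}

# Usage from the Kafka directory, assuming ZooKeeper on localhost:2181:
#   ./bin/kafka-topics.sh --list --zookeeper localhost:2181 | topic_exists wikipedia \
#     && echo "topic found"
```

Matching the whole line (`-x`) keeps a similarly named topic, such as a hypothetical `wikipedia-old`, from producing a false positive.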
## Start Druid Kafka ingestion
We will use Druid's Kafka indexing service to ingest messages from our newly created *wikipedia* topic.
### Submit a supervisor via the console
In the console, click `Submit supervisor` to open the submit supervisor dialog.

![Submit supervisor](../assets/tutorial-kafka-01.png "Submit supervisor")

Paste in this spec and click `Submit`.

```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "wikipedia",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "channel",
            "cityName",
            "comment",
            "countryIsoCode",
            "countryName",
            "isAnonymous",
            "isMinor",
            "isNew",
            "isRobot",
            "isUnpatrolled",
            "metroCode",
            "namespace",
            "page",
            "regionIsoCode",
            "regionName",
            "user",
            { "name": "added", "type": "long" },
            { "name": "deleted", "type": "long" },
            { "name": "delta", "type": "long" }
          ]
        }
      }
    },
    "metricsSpec" : [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "rollup": false
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": false
  },
  "ioConfig": {
    "topic": "wikipedia",
    "replicas": 2,
    "taskDuration": "PT10M",
    "completionTimeout": "PT20M",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    }
  }
}
```

This starts the supervisor, which in turn spawns tasks that listen for incoming data.

![Running supervisor](../assets/tutorial-kafka-02.png "Running supervisor")

### Submit a supervisor directly

To start the service directly, submit a supervisor spec to the Druid Overlord by running the following command from the Druid package root:

```bash
curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor
```

If the supervisor was successfully created, you will get a response containing the ID of the supervisor; in our case we should see `{"id":"wikipedia"}`.
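
If you script this step, it can help to check the response rather than read it by eye. The following is a minimal sketch, assuming the response has the `{"id":"..."}` shape described above. `supervisor_id` is a hypothetical helper name, and the `sed` expression is a deliberately simple extraction, not a full JSON parser.

```bash
# Hypothetical helper: extract the "id" value from a response such as {"id":"wikipedia"}.
# Prints nothing if the input has no "id" field.
supervisor_id() {
  sed -n 's/.*"id"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Usage from the Druid package root, posting to the same endpoint as above:
#   curl -s -XPOST -H'Content-Type: application/json' \
#     -d @quickstart/tutorial/wikipedia-kafka-supervisor.json \
#     http://localhost:8081/druid/indexer/v1/supervisor | supervisor_id
```

An empty result suggests the Overlord returned an error body instead of an ID, in which case it is worth rerunning the `curl` command without the pipe to read the full response.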

For more details about what's going on here, check out the
[Druid Kafka indexing service documentation](../development/extensions-core/kafka-ingestion.md).

You can view the current supervisors and tasks in the Druid Console: [http://localhost:8888/unified-console.html#tasks](http://localhost:8888/unified-console.html#tasks).

## Load data

Let's launch a producer for our topic and send some data!

In your Druid directory, run the following commands:

```bash
cd quickstart/tutorial
gunzip -k wikiticker-2015-09-12-sampled.json.gz
```

In your Kafka directory, run the following commands, where `{PATH_TO_DRUID}` is replaced by the path to the Druid directory:

```bash
export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
```

The previous command posted sample events to the *wikipedia* Kafka topic, and the Kafka indexing service then ingested them into Druid. You're now ready to run some queries!
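
To sanity-check the load, you can compare the number of events you sent with the row count Druid reports later. This is a minimal sketch, assuming the sample file holds one JSON event per line; `count_events` is a hypothetical helper name, not a Druid or Kafka tool.

```bash
# Hypothetical helper: count the non-empty lines (events) in a
# newline-delimited JSON file.
count_events() {
  grep -c . "$1"
}

# Usage, with {PATH_TO_DRUID} replaced by the path to the Druid directory:
#   count_events {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
```

Because the supervisor spec sets `rollup` to `false`, each parsed event is stored as its own row, so this count should match the datasource's row count once ingestion has caught up, barring events that fail to parse.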
## Querying your data

After data is sent to the Kafka stream, it is immediately available for querying.

Please follow the [query tutorial](../tutorials/tutorial-query.md) to run some example queries on the newly loaded data.

## Cleanup

If you wish to go through any of the other ingestion tutorials, you will need to shut down the cluster and reset the cluster state by removing the contents of the `var` directory under the Druid package, as the other tutorials will write to the same "wikipedia" datasource.
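
The reset step can be sketched as a small shell function. This is an illustration under stated assumptions, not an official Druid script: `reset_druid_state` is a hypothetical name, it takes the path to the Druid package as its only argument, and it should be run only after the cluster has been shut down.

```bash
# Hypothetical helper: remove everything inside <druid-dir>/var while keeping
# the var directory itself. Run only after the Druid cluster has been shut down.
reset_druid_state() {
  druid_dir="${1:?usage: reset_druid_state <druid-dir>}"
  if [ -d "$druid_dir/var" ]; then
    # -mindepth 1 skips var itself; -delete removes its contents depth-first,
    # including hidden files that a plain "rm -rf var/*" would miss.
    find "$druid_dir/var" -mindepth 1 -delete
  fi
}

# Usage, with {PATH_TO_DRUID} replaced by the path to the Druid directory:
#   reset_druid_state {PATH_TO_DRUID}
```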

## Further reading

For more information on loading data from Kafka streams, please see the [Druid Kafka indexing service documentation](../development/extensions-core/kafka-ingestion.md).