id | title | sidebar_label |
---|---|---|
tutorial-kafka | Tutorial: Load streaming data from Apache Kafka | Load from Apache Kafka |
Getting started
This tutorial demonstrates how to load data into Apache Druid (incubating) from a Kafka stream, using Druid's Kafka indexing service.
For this tutorial, we'll assume you've already downloaded Druid as described in the quickstart, using the micro-quickstart single-machine configuration, and have it running on your local machine. You don't need to have loaded any data yet.
Download and start Kafka
Apache Kafka is a high-throughput message bus that works well with Druid. For this tutorial, we will use Kafka 2.1.0. To download Kafka, run the following commands in your terminal:
curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
cd kafka_2.12-2.1.0
Start a Kafka broker by running the following command in a new terminal:
./bin/kafka-server-start.sh config/server.properties
Run this command to create a Kafka topic called wikipedia, to which we'll send data:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
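If either command above fails to connect, it can help to check whether the services are listening at all. The following is a minimal Python sketch, not part of the tutorial itself. It assumes ZooKeeper is on localhost:2181 (the address passed to kafka-topics.sh) and the Kafka broker is on localhost:9092 (the address used later in this tutorial). The helper name is_listening is hypothetical.

```python
import socket


def is_listening(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError (refused, timed out, unreachable)
        # when nothing accepts connections on the given address.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling is_listening("localhost", 2181) and is_listening("localhost", 9092) only tells you that something is accepting TCP connections on those ports; it does not confirm that the process is ZooKeeper or Kafka.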
Start Druid Kafka ingestion
We will use Druid's Kafka indexing service to ingest messages from our newly created wikipedia topic.
Submit a supervisor via the console
In the console, click Submit supervisor to open the submit supervisor dialog.
Paste in this spec and click Submit.
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "wikipedia",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "channel",
            "cityName",
            "comment",
            "countryIsoCode",
            "countryName",
            "isAnonymous",
            "isMinor",
            "isNew",
            "isRobot",
            "isUnpatrolled",
            "metroCode",
            "namespace",
            "page",
            "regionIsoCode",
            "regionName",
            "user",
            { "name": "added", "type": "long" },
            { "name": "deleted", "type": "long" },
            { "name": "delta", "type": "long" }
          ]
        }
      }
    },
    "metricsSpec" : [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "rollup": false
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": false
  },
  "ioConfig": {
    "topic": "wikipedia",
    "replicas": 2,
    "taskDuration": "PT10M",
    "completionTimeout": "PT20M",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    }
  }
}
This starts the supervisor, which in turn spawns tasks that listen for incoming data.
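To see how the pieces of the spec relate, the sketch below pulls out the fields that tie it to this tutorial: the datasource, the Kafka topic, the broker address, the timestamp column, and the dimension names. It is an illustration in Python, not something Druid provides. The function names summarize_spec and duration_seconds are hypothetical, and the duration parser handles only the simple PT..H..M..S form used in this spec.

```python
import re

# Matches simple ISO 8601 time durations such as PT10M or PT1H30M.
_DURATION = re.compile(r"^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$")


def duration_seconds(text):
    """Convert a duration like 'PT10M' to seconds (simplified parser)."""
    match = _DURATION.match(text)
    if not match or not any(match.groups()):
        raise ValueError(f"unsupported duration: {text!r}")
    hours, minutes, seconds = (int(g) if g else 0 for g in match.groups())
    return hours * 3600 + minutes * 60 + seconds


def summarize_spec(spec):
    """Extract the fields of a Kafka supervisor spec used in this tutorial."""
    parse_spec = spec["dataSchema"]["parser"]["parseSpec"]
    io_config = spec["ioConfig"]
    # Dimensions are either plain strings or objects with a "name" key.
    dimensions = [
        d["name"] if isinstance(d, dict) else d
        for d in parse_spec["dimensionsSpec"]["dimensions"]
    ]
    return {
        "dataSource": spec["dataSchema"]["dataSource"],
        "topic": io_config["topic"],
        "bootstrapServers": io_config["consumerProperties"]["bootstrap.servers"],
        "timestampColumn": parse_spec["timestampSpec"]["column"],
        "dimensions": dimensions,
        "taskDurationSeconds": duration_seconds(io_config["taskDuration"]),
    }
```

Loading the spec with json.load and passing it to summarize_spec gives a quick way to confirm that the topic and broker address match the Kafka setup from the previous section before submitting it.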
Submit a supervisor directly
To start the service directly, submit a supervisor spec to the Druid Overlord by running the following command from the Druid package root:
curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor
If the supervisor was successfully created, you will get a response containing the ID of the supervisor; in our case we should see {"id":"wikipedia"}.
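The same submission can be scripted. The following is a minimal Python sketch of the curl call above, using only the standard library. The URL and Content-Type header are taken from that command; the function names are hypothetical, and there is no error handling.

```python
import json
import urllib.request

# Overlord endpoint used by the curl command in this tutorial.
SUPERVISOR_URL = "http://localhost:8081/druid/indexer/v1/supervisor"


def build_submit_request(spec, url=SUPERVISOR_URL):
    """Build (but do not send) the POST request that carries the spec."""
    body = json.dumps(spec).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_supervisor(spec, url=SUPERVISOR_URL):
    """Send the spec to the Overlord and return the parsed JSON response."""
    with urllib.request.urlopen(build_submit_request(spec, url)) as response:
        return json.load(response)
```

With Druid running, loading quickstart/tutorial/wikipedia-kafka-supervisor.json with json.load and passing the result to submit_supervisor should give the same response as the curl command.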
For more details about what's going on here, check out the Druid Kafka indexing service documentation.
You can view the current supervisors and tasks in the Druid Console: http://localhost:8888/unified-console.html#tasks.
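The supervisor can also be inspected over HTTP instead of through the console. The sketch below assumes the Overlord exposes a status endpoint at /druid/indexer/v1/supervisor/{id}/status on the same host and port as the submission URL. Verify the path and the response format against the Kafka indexing service documentation for your Druid version. The function names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

# Assumed base URL: same host and port as the supervisor submission endpoint.
OVERLORD_URL = "http://localhost:8081"


def supervisor_status_url(supervisor_id, base_url=OVERLORD_URL):
    """Build the status URL for one supervisor (path is an assumption)."""
    quoted = urllib.parse.quote(supervisor_id, safe="")
    return f"{base_url}/druid/indexer/v1/supervisor/{quoted}/status"


def fetch_supervisor_status(supervisor_id, base_url=OVERLORD_URL):
    """GET the status document and return it as parsed JSON."""
    url = supervisor_status_url(supervisor_id, base_url)
    with urllib.request.urlopen(url) as response:
        return json.load(response)
```

For this tutorial the supervisor ID is wikipedia, so fetch_supervisor_status("wikipedia") is the call to try.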
Load data
Let's launch a producer for our topic and send some data!
In your Druid directory, run the following commands:
cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json
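Before sending the file to Kafka, you may want to confirm that each line is a JSON object containing the time column that the spec's timestampSpec expects. The following is a small Python sketch of such a check. The function name check_events is hypothetical, and the check looks only at the timestamp column, not at the dimensions.

```python
import json


def check_events(lines, timestamp_column="time"):
    """Count usable events and collect line numbers of unusable ones.

    A line is usable if it parses as a JSON object that contains the
    timestamp column. Blank lines are skipped.
    """
    valid = 0
    bad_lines = []
    for number, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            bad_lines.append(number)
            continue
        if isinstance(event, dict) and timestamp_column in event:
            valid += 1
        else:
            bad_lines.append(number)
    return valid, bad_lines
```

To use it, open wikiticker-2015-09-12-sampled.json with encoding="utf-8" and pass the file object as lines; a non-empty second element in the result points at lines worth inspecting before you load the data.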
In your Kafka directory, run the following commands, replacing {PATH_TO_DRUID} with the path to the Druid directory:
export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
The previous command posted sample events to the wikipedia Kafka topic, and the Kafka indexing service then ingested them into Druid. You're now ready to run some queries!
Querying your data
After data is sent to the Kafka stream, it is immediately available for querying.
Please follow the query tutorial to run some example queries on the newly loaded data.
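As a quick check that events have arrived, a row count can be requested over HTTP. The sketch below assumes Druid SQL is enabled and reachable through the router at http://localhost:8888/druid/v2/sql (the same host and port as the console URL above). Both the endpoint and the example query are assumptions for illustration, and the function names are hypothetical.

```python
import json
import urllib.request

# Assumed SQL endpoint on the router; adjust if your setup differs.
SQL_URL = "http://localhost:8888/druid/v2/sql"


def build_sql_request(sql, url=SQL_URL):
    """Build (but do not send) a POST request carrying a SQL query."""
    body = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_sql(sql, url=SQL_URL):
    """Send the query and return the parsed JSON result."""
    with urllib.request.urlopen(build_sql_request(sql, url)) as response:
        return json.load(response)
```

For example, run_sql("SELECT COUNT(*) AS events FROM wikipedia") asks for the number of rows ingested so far into the wikipedia datasource.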
Cleanup
If you wish to go through any of the other ingestion tutorials, you will need to shut down the cluster and reset the cluster state by removing the contents of the var directory under the Druid package, as the other tutorials will write to the same "wikipedia" datasource.
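The reset step can be sketched in Python as follows. This is an illustration only: the helper name clear_directory is hypothetical, it deletes data permanently, and it should be run only after the cluster has been shut down. It removes everything inside the given directory but keeps the directory itself.

```python
import shutil
from pathlib import Path


def clear_directory(directory):
    """Delete every entry inside directory and return the removed names."""
    root = Path(directory)
    removed = []
    for entry in sorted(root.iterdir()):
        if entry.is_dir() and not entry.is_symlink():
            # Real subdirectories are removed recursively.
            shutil.rmtree(entry)
        else:
            # Files and symlinks are unlinked without following the link.
            entry.unlink()
        removed.append(entry.name)
    return removed
```

From the Druid package root, clear_directory("var") corresponds to removing the contents of the var directory as described above.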
Further reading
For more information on loading data from Kafka streams, please see the Druid Kafka indexing service documentation.