Update Kafka ingestion tutorial (#13261)

* Update Kafka ingestion tutorial

* Update tutorial-kafka.md

Updated location of sample data file

* Added sample data file

* Update tutorial-kafka.md

* Add sample data file

* Update tutorial-kafka.md

Updated sample file location in curl commands

* Updated and re-uploaded sample data files

* Updated spelling file

* Delete .spelling

* Added spelling file

* Update docs/tutorials/tutorial-kafka.md

Co-authored-by: Victoria Lim <vtlim@users.noreply.github.com>

* Update docs/tutorials/tutorial-kafka.md

Co-authored-by: Victoria Lim <vtlim@users.noreply.github.com>

* Updated after review

* Update tutorial-kafka.md

* Updated

* Update tutorial-kafka.md

* Update tutorial-kafka.md

* Update tutorial-kafka.md

* Updated sample data file and command

* Add files via upload

* Delete kttm-nested-data.json.tgz

* Delete kttm-nested-data.json.tgz

* Add files via upload

* Update tutorial-kafka.md

Co-authored-by: Victoria Lim <vtlim@users.noreply.github.com>
Jill Osborne · 2022-11-11 22:47:54 +00:00 · committed by GitHub
parent 3e172d44ab
commit b0db2a87d8
17 changed files with 274 additions and 194 deletions


@@ -0,0 +1,66 @@
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "topic": "kttm",
      "inputFormat": {
        "type": "json"
      },
      "useEarliestOffset": true
    },
    "tuningConfig": {
      "type": "kafka"
    },
    "dataSchema": {
      "dataSource": "kttm-kafka-supervisor-api",
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      },
      "dimensionsSpec": {
        "dimensions": [
          "session",
          "number",
          "client_ip",
          "language",
          "adblock_list",
          "app_version",
          "path",
          "loaded_image",
          "referrer",
          "referrer_host",
          "server_ip",
          "screen",
          "window",
          {
            "type": "long",
            "name": "session_length"
          },
          "timezone",
          "timezone_offset",
          {
            "type": "json",
            "name": "event"
          },
          {
            "type": "json",
            "name": "agent"
          },
          {
            "type": "json",
            "name": "geo_ip"
          }
        ]
      },
      "granularitySpec": {
        "queryGranularity": "none",
        "rollup": false,
        "segmentGranularity": "day"
      }
    }
  }
}

15 binary files changed (the sample data archive and updated tutorial screenshots); contents not shown.


@@ -24,260 +24,274 @@ sidebar_label: "Load from Apache Kafka"
-->
## Getting started

This tutorial demonstrates how to load data into Apache Druid from a Kafka stream, using Druid's Kafka indexing service.

The tutorial guides you through the steps to load sample nested clickstream data from the [Koalas to the Max](https://www.koalastothemax.com/) game into a Kafka topic, then ingest the data into Druid.

## Prerequisites

Before you follow the steps in this tutorial, download Druid as described in the [quickstart](index.md) using the [micro-quickstart](../operations/single-server.md#micro-quickstart-4-cpu-16gib-ram) single-machine configuration and have it running on your local machine. You don't need to have loaded any data.
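If Druid isn't running yet, you can typically start the micro-quickstart configuration from the root of your Druid installation. This is a sketch of the step described in the quickstart; the script name assumes a Druid release that ships `start-micro-quickstart`, so check it against your version:

```bash
# From the Druid installation root: starts ZooKeeper and all Druid services
# sized for the micro-quickstart configuration.
./bin/start-micro-quickstart
```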
## Download and start Kafka
[Apache Kafka](http://kafka.apache.org/) is a high-throughput message bus that works well with Druid. For this tutorial, use Kafka 2.7.0.

1. To download Kafka, run the following commands in your terminal:

   ```bash
   curl -O https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
   tar -xzf kafka_2.13-2.7.0.tgz
   cd kafka_2.13-2.7.0
   ```

2. If you're already running Kafka on the machine you're using for this tutorial, delete or rename the `kafka-logs` directory in `/tmp`.

   > Druid and Kafka both rely on [Apache ZooKeeper](https://zookeeper.apache.org/) to coordinate and manage services. Because Druid is already running, Kafka attaches to the Druid ZooKeeper instance when it starts up.<br>
   > In a production environment where you're running Druid and Kafka on different machines, [start the Kafka ZooKeeper](https://kafka.apache.org/quickstart) before you start the Kafka broker.

3. In the Kafka root directory, run this command to start a Kafka broker:

   ```bash
   ./bin/kafka-server-start.sh config/server.properties
   ```

4. In a new terminal window, navigate to the Kafka root directory and run the following command to create a Kafka topic called `kttm`:

   ```bash
   ./bin/kafka-topics.sh --create --topic kttm --bootstrap-server localhost:9092
   ```

   Kafka returns a message when it successfully adds the topic: `Created topic kttm`.
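Optionally, you can confirm that the topic exists by listing the topics on the broker. This uses the standard Kafka CLI and isn't required for the rest of the tutorial:

```bash
# List the topics on the local broker; the output should include "kttm"
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```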
## Load data into Kafka
In this section, you download sample data to the tutorial's directory and send the data to your Kafka topic.

1. In your Kafka root directory, create a directory for the sample data:

   ```bash
   mkdir sample-data
   ```

2. Download the sample data to your new directory and extract it:

   ```bash
   cd sample-data
   curl -O https://druid.apache.org/docs/latest/assets/files/kttm-nested-data.json.tgz
   tar -xzf kttm-nested-data.json.tgz
   ```

3. In your Kafka root directory, run the following commands to post sample events to the `kttm` Kafka topic:

   ```bash
   export KAFKA_OPTS="-Dfile.encoding=UTF-8"
   ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kttm < ./sample-data/kttm-nested-data.json
   ```
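To make sure the events actually reached the topic, you can optionally read a few of them back with the Kafka console consumer before moving on. This is a standard Kafka CLI check, not a tutorial requirement:

```bash
# Print the first five events from the beginning of the kttm topic, then exit
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kttm \
  --from-beginning --max-messages 5
```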
## Load data into Druid

Now that you have data in your Kafka topic, you can use Druid's Kafka indexing service to ingest the data into Druid.

To do this, you can use the Druid console data loader or you can submit a supervisor spec. Follow the steps below to try each method.

### Load data with the console data loader

The Druid console data loader presents you with several screens to configure each section of the supervisor spec, then creates an ingestion task to ingest the Kafka data.

To use the console data loader:
1. Navigate to [localhost:8888](http://localhost:8888) and click **Load data > Streaming**.

   ![Data loader init](../assets/tutorial-kafka-data-loader-01.png "Data loader init")

2. Click **Apache Kafka** and then **Connect data**.

3. Enter `localhost:9092` as the bootstrap server and `kttm` as the topic, then click **Apply** and make sure you see data similar to the following:

   ![Data loader sample](../assets/tutorial-kafka-data-loader-02.png "Data loader sample")

4. Click **Next: Parse data**.

   ![Data loader parse data](../assets/tutorial-kafka-data-loader-03.png "Data loader parse data")

   The data loader automatically tries to determine the correct parser for the data. For the sample data, it selects input format `json`. You can play around with the different options to get a preview of how Druid parses your data.

5. With the `json` input format selected, click **Next: Parse time**. You may need to click **Apply** first.

   ![Data loader parse time](../assets/tutorial-kafka-data-loader-04.png "Data loader parse time")

   Druid's architecture requires that you specify a primary timestamp column. Druid stores the timestamp in the `__time` column in your Druid datasource.
   In a production environment, if you don't have a timestamp in your data, you can select **Parse timestamp from:** `None` to use a placeholder value.

   For the sample data, the data loader selects the `timestamp` column in the raw data as the primary time column.

6. Click **Next: ...** three times to go past the **Transform** and **Filter** steps to **Configure schema**. You don't need to enter anything in these two steps because applying transforms and filters is out of scope for this tutorial.

   ![Data loader schema](../assets/tutorial-kafka-data-loader-05.png "Data loader schema")

7. In the **Configure schema** step, you can select data types for the columns and configure [dimensions](../ingestion/data-model.md#dimensions) and [metrics](../ingestion/data-model.md#metrics) to ingest into Druid. The console does most of this for you, but you need to create JSON-type dimensions for the three nested columns in the data.

   Click **Add dimension** and enter the following information. You can only add one dimension at a time.
   - Name: `event`, Type: `json`
   - Name: `agent`, Type: `json`
   - Name: `geo_ip`, Type: `json`

   After you create the dimensions, you can scroll to the right in the preview window to see the nested columns:

   ![Nested columns schema](../assets/tutorial-kafka-data-loader-05b.png "Nested columns schema")

8. Click **Next: Partition** to configure how Druid partitions the data into segments.

   ![Data loader partition](../assets/tutorial-kafka-data-loader-06.png "Data loader partition")

9. Select `day` as the **Segment granularity**. Since this is a small dataset, you don't need to make any further adjustments. Click **Next: Tune** to fine tune how Druid ingests data.

   ![Data loader tune](../assets/tutorial-kafka-data-loader-07.png "Data loader tune")

10. In **Input tuning**, set **Use earliest offset** to `True`&mdash;this is very important because you want to consume the data from the start of the stream. There are no other changes to make here, so click **Next: Publish**.

    ![Data loader publish](../assets/tutorial-kafka-data-loader-08.png "Data loader publish")

11. Name the datasource `kttm-kafka` and click **Next: Edit spec** to review your spec.

    ![Data loader spec](../assets/tutorial-kafka-data-loader-09.png "Data loader spec")

    The console presents the spec you've constructed. You can click the buttons above the spec to make changes in previous steps and see how the changes update the spec. You can also edit the spec directly and see it reflected in the previous steps.

12. Click **Submit** to create an ingestion task.

    Druid displays the task view with the focus on the newly created supervisor.

    The task view auto-refreshes, so wait until the supervisor launches a task. The status changes from **Pending** to **Running** as Druid starts to ingest data.

    ![Tasks view](../assets/tutorial-kafka-data-loader-10.png "Tasks view")

13. Navigate to the **Datasources** view from the header.

    ![Datasource view](../assets/tutorial-kafka-data-loader-11.png "Datasource view")

    When the `kttm-kafka` datasource appears here, you can query it. See [Query your data](#query-your-data) for details.

    > If the datasource doesn't appear after a minute you might not have set the supervisor to read data from the start of the stream&mdash;the `Use earliest offset` setting in the **Tune** step. Go to the **Ingestion** page and terminate the supervisor using the **Actions(...)** menu. [Load the sample data](#load-data-with-the-console-data-loader) again and apply the correct setting when you get to the **Tune** step.
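If you'd like to confirm from the terminal that the data loader created a supervisor, you can list the supervisors through the Overlord API. The port assumes the micro-quickstart configuration; this is an optional check rather than a tutorial step:

```bash
# Returns a JSON array of supervisor IDs; it should include the supervisor
# created for the kttm-kafka datasource
curl http://localhost:8081/druid/indexer/v1/supervisor
```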
### Submit a supervisor spec

As an alternative to using the data loader, you can submit a supervisor spec to Druid. You can do this in the console or using the Druid API.

#### Use the console

To submit a supervisor spec using the Druid console:

1. Click **Ingestion** in the console, then click the ellipses next to the refresh button and select **Submit JSON supervisor**.

2. Paste this spec into the JSON window and click **Submit**.

   ```json
   {
     "type": "kafka",
     "spec": {
       "ioConfig": {
         "type": "kafka",
         "consumerProperties": {
           "bootstrap.servers": "localhost:9092"
         },
         "topic": "kttm",
         "inputFormat": {
           "type": "json"
         },
         "useEarliestOffset": true
       },
       "tuningConfig": {
         "type": "kafka"
       },
       "dataSchema": {
         "dataSource": "kttm-kafka-supervisor-console",
         "timestampSpec": {
           "column": "timestamp",
           "format": "iso"
         },
         "dimensionsSpec": {
           "dimensions": [
             "session",
             "number",
             "client_ip",
             "language",
             "adblock_list",
             "app_version",
             "path",
             "loaded_image",
             "referrer",
             "referrer_host",
             "server_ip",
             "screen",
             "window",
             {
               "type": "long",
               "name": "session_length"
             },
             "timezone",
             "timezone_offset",
             {
               "type": "json",
               "name": "event"
             },
             {
               "type": "json",
               "name": "agent"
             },
             {
               "type": "json",
               "name": "geo_ip"
             }
           ]
         },
         "granularitySpec": {
           "queryGranularity": "none",
           "rollup": false,
           "segmentGranularity": "day"
         }
       }
     }
   }
   ```
   This starts the supervisor&mdash;the supervisor spawns tasks that start listening for incoming data.

3. Click **Tasks** on the console home page to monitor the status of the job. This spec writes the data in the `kttm` topic to a datasource named `kttm-kafka-supervisor-console`.
#### Use the API

You can also use the Druid API to submit a supervisor spec.

1. Run the following command to download the sample spec:

   ```bash
   curl -O https://druid.apache.org/docs/latest/assets/files/kttm-kafka-supervisor.json
   ```

2. Run the following command to submit the spec in the `kttm-kafka-supervisor.json` file:

   ```bash
   curl -XPOST -H 'Content-Type: application/json' -d @kttm-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor
   ```

   After Druid successfully creates the supervisor, you get a response containing the supervisor ID: `{"id":"kttm-kafka-supervisor-api"}`.

3. Click **Tasks** on the console home page to monitor the status of the job. This spec writes the data in the `kttm` topic to a datasource named `kttm-kafka-supervisor-api`.
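To monitor the supervisor from the terminal instead of the console, you can call the supervisor status endpoint. This is a standard Druid Overlord API call, offered here as an optional check:

```bash
# Reports the supervisor's state (for example RUNNING) and recent task activity
curl http://localhost:8081/druid/indexer/v1/supervisor/kttm-kafka-supervisor-api/status
```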
## Query your data
After data is sent to the Kafka stream, it is immediately available for querying. Click **Query** in the Druid console to run SQL queries against the datasource.
Since this tutorial ingests a small dataset, you can run the query `SELECT * FROM "kttm-kafka"` to return all of the data in the dataset you created.
![Query view](../assets/tutorial-kafka-data-loader-12.png "Query view")
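If you prefer the command line, you can run the same query through the Druid SQL API on the Router. This is a standard endpoint; the sketch below assumes the default Router port from the micro-quickstart configuration:

```bash
# Run a SQL query over HTTP; returns the first five rows of the datasource as JSON
curl -XPOST -H 'Content-Type: application/json' \
  -d '{"query": "SELECT * FROM \"kttm-kafka\" LIMIT 5"}' \
  http://localhost:8888/druid/v2/sql
```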
Check out the [Querying data tutorial](../tutorials/tutorial-query.md) to run some example queries on the newly loaded data.
## Further reading
For more information, see the following topics:
- [Apache Kafka ingestion](../development/extensions-core/kafka-ingestion.md) for more information on loading data from Kafka streams.
- [Apache Kafka supervisor reference](../development/extensions-core/kafka-supervisor-reference.md) for Kafka supervisor configuration information.
- [Apache Kafka supervisor operations reference](../development/extensions-core/kafka-supervisor-operations.md) for information on running and maintaining Kafka supervisors for Druid.