# Ingest and query data from Apache Kafka

<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one
  ~ or more contributor license agreements.  See the NOTICE file
  ~ distributed with this work for additional information
  ~ regarding copyright ownership.  The ASF licenses this file
  ~ to you under the Apache License, Version 2.0 (the
  ~ "License"); you may not use this file except in compliance
  ~ with the License.  You may obtain a copy of the License at
  ~
  ~   http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing,
  ~ software distributed under the License is distributed on an
  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  ~ KIND, either express or implied.  See the License for the
  ~ specific language governing permissions and limitations
  ~ under the License.
  -->

This tutorial introduces you to streaming ingestion in Apache Druid using the Apache Kafka event streaming platform.
Follow along to learn how to create and load data into a Kafka topic, start ingesting data from the topic into Druid, and query results over time. This tutorial assumes you have a basic understanding of Druid ingestion, querying, and API requests.

## Table of contents

* [Prerequisites](#Prerequisites)
* [Load Druid API client](#Load-Druid-API-client)
* [Create Kafka topic](#Create-Kafka-topic)
* [Load data into Kafka topic](#Load-data-into-Kafka-topic)
* [Start Druid ingestion](#Start-Druid-ingestion)
* [Query Druid datasource and visualize query results](#Query-Druid-datasource-and-visualize-query-results)
* [Learn more](#Learn-more)

## Prerequisites

This tutorial works with Druid 25.0.0 or later.

Launch this tutorial and all prerequisites using the `all-services` profile of the Docker Compose file for Jupyter-based Druid tutorials. For more information, see [Docker for Jupyter Notebook tutorials](https://druid.apache.org/docs/latest/tutorials/tutorial-jupyter-docker.html).

If you do not use the Docker Compose environment, you need the following:
* A running Druid instance.
   * Update the `druid_host` variable to point to your Router endpoint. For example, `druid_host = "http://localhost:8888"`.
   * Update the `rest_client` variable to point to your Coordinator endpoint. For example, `"http://localhost:8081"`.
* A running Kafka cluster.
   * Update the Kafka bootstrap servers to point to your servers. For example, `bootstrap_servers=["localhost:9092"]`.
* A running [Data Generator server](https://github.com/implydata/druid-datagenerator) accessible to the cluster.
   * Update the data generator client. For example `datagen = druidapi.rest.DruidRestClient("http://localhost:9999")`.
* The following Python packages:
   * `druidapi`, a Python client for Apache Druid
   * `kafka`, a Python client for Apache Kafka
   * `pandas`, `matplotlib`, and `seaborn` for data visualization


## Load Druid API client

To start the tutorial, run the following cell. It imports the required Python packages and defines a variable for the Druid client, and another for the SQL client used to run SQL commands.

In [None]:
import druidapi
import os
import time

if 'DRUID_HOST' not in os.environ.keys():
    druid_host=f"http://localhost:8888"
else:
    druid_host=f"http://{os.environ['DRUID_HOST']}:8888"
    
print(f"Opening a connection to {druid_host}.")
druid = druidapi.jupyter_client(druid_host)

In [None]:
# Use kafka_host variable when connecting to kafka 
if 'KAFKA_HOST' not in os.environ.keys():
   kafka_host=f"http://localhost:9092"
else:
    kafka_host=f"{os.environ['KAFKA_HOST']}:9092"

# this is the kafka topic we will be working with:
topic_name = "social_media"

In [None]:
import json

# shortcuts for display and sql api's
display = druid.display
sql_client = druid.sql

# client for Data Generator API
datagen = druidapi.rest.DruidRestClient("http://datagen:9999")

# client for Druid API
rest_client = druid.rest

## Publish generated data directly to Kafka topic

In this section, you use the data generator included as part of the Docker application to generate a stream of messages. The data generator creates and send messages to a Kafka topic named `social_media`. To learn more about the Druid Data Generator, see the [project](https://github.com/implydata/druid-datagenerator) and the [data generation notebook](../01-introduction/02-datagen-intro.ipynb).

### Generate data
Run the following cells to load sample data into the `social_media` Kafka topic. The data generator sends events until it reaches 50,000 messages.

In [None]:
headers = {
  'Content-Type': 'application/json'
}

datagen_request = {
    "name": "social_stream",
    "target": { "type": "kafka", "endpoint": kafka_host, "topic": topic_name  },
    "config_file": "social/social_posts.json", 
    "total_events":50000,
    "concurrency":100
}
datagen.post("/start", json.dumps(datagen_request), headers=headers)


Check the status of the job with the following cell:

In [None]:
time.sleep(1) # avoid race between start of the job and its status being available
response = datagen.get('/status/social_stream')
response.json()

## Start Druid ingestion

Now that you have a new Kafka topic and data being streamed into the topic, you ingest the data into Druid by submitting a Kafka ingestion spec.
The ingestion spec describes the following:
* where to source the data to ingest (in `spec > ioConfig`),
* the datasource to ingest data into (in `spec > dataSchema > dataSource`), and
* what the data looks like (in `spec > dataSchema > dimensionsSpec`).

Other properties control how Druid aggregates and stores data. For more information, see the Druid documenation:
* [Apache Kafka ingestion](https://druid.apache.org/docs/latest/development/extensions-core/kafka-ingestion.html)
* [Ingestion spec reference](https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html)

Run the following cells to define and view the Kafka ingestion spec.

In [None]:
kafka_ingestion_spec = {
    "type": "kafka",
    "spec": {
        "ioConfig": {
            "type": "kafka",
            "consumerProperties": {
                "bootstrap.servers": "kafka:9092"
            },
            "topic": "social_media",
            "inputFormat": {
                "type": "json"
            },
            "useEarliestOffset": True
        },
        "tuningConfig": {
            "type": "kafka"
        },
        "dataSchema": {
            "dataSource": "social_media",
            "timestampSpec": {
                "column": "time",
                "format": "iso"
            },
            "dimensionsSpec": {
                "dimensions": [
                    "username",
                    "post_title",
                    {
                        "type": "long",
                        "name": "views"
                    },
                    {
                        "type": "long",
                        "name": "upvotes"
                    },
                    {
                        "type": "long",
                        "name": "comments"
                    },
                    "edited"
                ]
            },
            "granularitySpec": {
                "queryGranularity": "none",
                "rollup": False,
                "segmentGranularity": "hour"
            }
        }
    }
}

Send the spec to Druid to start the streaming ingestion from Kafka:

In [None]:
headers = {
  'Content-Type': 'application/json'
}

supervisor = rest_client.post("/druid/indexer/v1/supervisor", json.dumps(kafka_ingestion_spec), headers=headers)
print(supervisor.status_code)

A `200` response indicates that the request was successful. You can view the running ingestion task and the new datasource in the web console's [ingestion view](http://localhost:8888/unified-console.html#ingestion).

The following cell pauses further execution until the ingestion has started and the datasource is available for querying:

In [None]:
druid.sql.wait_until_ready('social_media', verify_load_status=False)

## Query Druid datasource and visualize query results

You can now query the new datasource called `social_media`. In this section, you also visualize query results using the Matplotlib and Seaborn visualization libraries. Run the following cell import these packages.

In [None]:
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns

Run a simple query to view a subset of rows from the new datasource:

In [None]:
sql = '''
SELECT * FROM social_media LIMIT 5
'''
display.sql(sql)

In this social media scenario, each incoming event represents a post on social media, for which you collect the timestamp, username, and post metadata. You are interested in analyzing the total number of upvotes for all posts, compared between users. Preview this data with the following query:

In [None]:
sql = '''
SELECT
  COUNT(post_title) as num_posts,
  SUM(upvotes) as total_upvotes,
  username
FROM social_media
GROUP BY username
ORDER BY num_posts
'''

response = sql_client.sql_query(sql)
response.show()

Visualize the total number of upvotes per user using a line plot. You sort the results by username before plotting because the order of users may vary as new results arrive.

In [None]:
df = pd.DataFrame(response.json)
df = df.sort_values('username')

df.plot(x='username', y='total_upvotes', marker='o')
plt.xticks(rotation=45, ha='right')
plt.ylabel("Total number of upvotes")
plt.gca().get_legend().remove()
plt.show()

The total number of upvotes likely depends on the total number of posts created per user. To better assess the relative impact per user, you compare the total number of upvotes (line plot) with the total number of posts.

In [None]:
matplotlib.rc_file_defaults()
ax1 = sns.set_style(style=None, rc=None )

fig, ax1 = plt.subplots()
plt.xticks(rotation=45, ha='right')


sns.lineplot(
    data=df, x='username', y='total_upvotes',
    marker='o', ax=ax1, label="Sum of upvotes")
ax1.get_legend().remove()

ax2 = ax1.twinx()
sns.barplot(data=df, x='username', y='num_posts',
            order=df['username'], alpha=0.5, ax=ax2, log=True,
            color="orange", label="Number of posts")


# ask matplotlib for the plotted objects and their labels
lines, labels = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax2.legend(lines + lines2, labels + labels2, bbox_to_anchor=(1.55, 1))

You should see a correlation between total number of upvotes and total number of posts. In order to track user impact on a more equal footing, normalize the total number of upvotes relative to the total number of posts, and plot the result:

In [None]:
df['upvotes_normalized'] = df['total_upvotes']/df['num_posts']

df.plot(x='username', y='upvotes_normalized', marker='o', color='green')
plt.xticks(rotation=45, ha='right')
plt.ylabel("Number of upvotes (normalized)")
plt.gca().get_legend().remove()
plt.show()

You've been working with data taken at a single snapshot in time from when you ran the last query. Run the same query again, and store the output in `response2`, which you will compare with the previous results:

In [None]:
response2 = sql_client.sql_query(sql)
response2.show()

Normalizing the data also helps you evaluate trends over time more consistently on the same plot axes. Plot the normalized data again, this time alongside the results from the previous snapshot:

In [None]:
df2 = pd.DataFrame(response2.json)
df2 = df2.sort_values('username')
df2['upvotes_normalized'] = df2['total_upvotes']/df2['num_posts']

ax = df.plot(x='username', y='upvotes_normalized', marker='o', color='green', label="Time 1")
df2.plot(x='username', y='upvotes_normalized', marker='o', color='purple', ax=ax, label="Time 2")
plt.xticks(rotation=45, ha='right')
plt.ylabel("Number of upvotes (normalized)")
plt.show()

This plot shows how some users maintain relatively consistent social media impact between the two query snapshots, whereas other users grow or decline in their influence.

## Cleanup 
The following cells stop the data generation and ingestion jobs and removes the datasource from Druid.

In [None]:
print(f"Stop streaming generator: [{datagen.post('/stop/social_stream','',require_ok=False)}]")
print(f'Reset offsets for ingestion: [{druid.rest.post("/druid/indexer/v1/supervisor/social_media/reset","", require_ok=False)}]')
print(f'Stop streaming ingestion: [{druid.rest.post("/druid/indexer/v1/supervisor/social_media/terminate","", require_ok=False)}]')

Once the ingestion process ends and completes any final ingestion steps, remove the datasource with the following cell:

In [None]:
time.sleep(5) # wait for streaming ingestion tasks to end
print(f"Drop datasource: [{druid.datasources.drop('social_media')}]")

## Learn more

This tutorial showed you how to create a Kafka topic using a Python client for Kafka, send a simulated stream of data to Kafka using a data generator, and query and visualize results over time. For more information, see the following resources:

* [Apache Kafka ingestion](https://druid.apache.org/docs/latest/development/extensions-core/kafka-ingestion.html)
* [Querying data](https://druid.apache.org/docs/latest/tutorials/tutorial-query.html)
* [Tutorial: Run with Docker](https://druid.apache.org/docs/latest/tutorials/docker.html)