Adding data generation pod to jupyter notebooks deployment (#14742)

Co-authored-by: Charles Smith <techdocsmith@gmail.com>
Co-authored-by: Victoria Lim <vtlim@users.noreply.github.com>
Sergio Ferragut 2023-08-10 15:43:05 -07:00 committed by GitHub
parent 82d82dfbd6
commit 353f7bed7f
13 changed files with 902 additions and 1357 deletions

View File

@ -37,7 +37,8 @@ RUN pip install requests \
pip install seaborn \
pip install bokeh \
pip install kafka-python \
pip install sortedcontainers
pip install sortedcontainers \
pip install tqdm
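# tqdm provides the progress bar used by the druidapi display client's run_task() helper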
# Install druidapi client from apache/druid
# Local install requires sudo privileges
@ -46,12 +47,6 @@ ADD druidapi /home/jovyan/druidapi
WORKDIR /home/jovyan/druidapi
RUN pip install .
# WIP -- install DruidDataDriver as a package
# Import data generator and configuration file
# Change permissions to allow import (requires sudo privileges)
# The Jupyter notebooks themselves are mounted into the image's /home/jovyan/notebooks
# path when running this image.
RUN mkdir -p /home/jovyan/notebooks
@ -59,8 +54,3 @@ RUN mkdir -p /home/jovyan/notebooks
WORKDIR /home/jovyan/notebooks
USER jovyan
# Add location of the data generator to PYTHONPATH
ENV PYTHONPATH "${PYTHONPATH}:/home/jovyan/notebooks/02-ingestion"

View File

@ -27,6 +27,7 @@ volumes:
coordinator_var: {}
router_var: {}
druid_shared: {}
datagen_data: {}
services:
@ -175,3 +176,12 @@ services:
- "${JUPYTER_PORT:-8889}:8888"
volumes:
- ../notebooks:/home/jovyan/notebooks
datagen:
image: imply/datagen:latest
container_name: datagen
profiles: ["jupyter", "kafka-jupyter", "druid-jupyter", "all-services"]
ports:
- "${DATAGEN_PORT:-9999}:9999"
volumes:
- datagen_data:/files
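If the `datagen` service is up, it answers REST calls on the mapped port. A minimal smoke-test sketch (an illustration, not part of this commit; it assumes the default `DATAGEN_PORT` of 9999 and uses the `/jobs` endpoint shown in the notebook below):

```python
# Quick check from the host that the data generator container is reachable.
import requests

resp = requests.get("http://localhost:9999/jobs")  # 9999 is the default DATAGEN_PORT mapping
print(resp.status_code, resp.json())               # expect 200 and a (possibly empty) job list
```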

View File

@ -27,6 +27,7 @@ volumes:
coordinator_var: {}
router_var: {}
druid_shared: {}
datagen_data: {}
services:
@ -173,3 +174,12 @@ services:
- "${JUPYTER_PORT:-8889}:8888"
volumes:
- ../notebooks:/home/jovyan/notebooks
datagen:
image: imply/datagen:latest
container_name: datagen
profiles: ["jupyter", "kafka-jupyter", "druid-jupyter", "all-services"]
ports:
- "${DATAGEN_PORT:-9999}:9999"
volumes:
- datagen_data:/files

View File

@ -39,8 +39,8 @@ druid_metadata_storage_connector_password=FoolishPassword
druid_coordinator_balancer_strategy=cachingCost
druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB
druid_indexer_runner_javaOptsArray=["-server", "-Xmx256m", "-Xms256m", "-XX:MaxDirectMemorySize=324m", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid_indexer_fork_property_druid_processing_buffer_sizeBytes=64MiB

View File

@ -14,6 +14,7 @@
# limitations under the License.
from druidapi import consts
import time
class DisplayClient:
'''
@ -144,3 +145,36 @@ class DisplayClient:
def tables(self, schema=consts.DRUID_SCHEMA):
self._druid.sql._tables_query(schema).show(display=self)
def run_task(self, query):
'''
Run an MSQ task while displaying progress in the cell output.
:param query: INSERT/REPLACE statement to run
:return: None
'''
from tqdm import tqdm
task = self._druid.sql.task(query)
with tqdm(total=100.0) as pbar:
    previous_progress = 0.0
    while True:
        reports = task.reports_no_wait()
        payload = reports.get('multiStageQuery', {}).get('payload', {})
        if payload:
            # Display the sort progress metric if it is available.
            sort_progress = payload.get('counters', {}).get('0', {}).get('0', {}).get('sortProgress', {})
            if 'progressDigest' in sort_progress:
                current_progress = sort_progress['progressDigest'] * 100.0
                pbar.update(current_progress - previous_progress)  # update() takes a relative increment
                previous_progress = current_progress
            # Display the task status if it is available.
            status = payload.get('status', {}).get('status')
            if status:
                pbar.set_description(f"Loading data, status:[{status}]")
                # Stop when the job is done.
                if status in ['SUCCESS', 'FAILED']:
                    break
        else:
            pbar.set_description('Initializing...')
        time.sleep(1)
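A usage sketch for the new helper (not part of this diff; it assumes the Docker Compose Router at `http://router:8888` and a placeholder MSQ statement, matching how the notebooks below call it):

```python
import druidapi

druid = druidapi.jupyter_client("http://router:8888")
sql = 'REPLACE INTO "clicks" OVERWRITE ALL SELECT ... PARTITIONED BY DAY'  # placeholder MSQ ingestion statement
druid.display.run_task(sql)            # shows a tqdm progress bar while the MSQ task runs
druid.sql.wait_until_ready('clicks')   # block until the new datasource is queryable
```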

View File

@ -585,6 +585,9 @@ class QueryTaskResult:
self._reports = self._tasks().task_reports(self._id)
return self._reports
def reports_no_wait(self) -> dict:
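'''
Returns the task reports without waiting for the task to complete.
The report may be empty or partial while the task is still running.
'''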
return self._tasks().task_reports(self._id, require_ok=False)
@property
def results(self):
if not self._results:
@ -844,7 +847,7 @@ class QueryClient:
'''
return self._function_args_query(table_name).rows
def wait_until_ready(self, table_name):
def wait_until_ready(self, table_name, verify_load_status=True):
'''
Waits for a datasource to be loaded in the cluster, and to become available to SQL.
@ -852,8 +855,12 @@ class QueryClient:
----------
table_name str
The name of a datasource in the 'druid' schema.
verify_load_status
If true, waits until all published segments are loaded before running the test query.
If false, skips the segment load check and runs only the test query.
'''
self.druid_client.datasources.wait_until_ready(table_name)
if verify_load_status:
    self.druid_client.datasources.wait_until_ready(table_name)
while True:
    try:
        self.sql('SELECT 1 FROM "{}" LIMIT 1'.format(table_name))
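The notebooks below use the new flag for streaming datasources, where segments are continually being published. A short sketch of that call (assuming the same Jupyter client as above):

```python
import druidapi

druid = druidapi.jupyter_client("http://router:8888")
# Skip the segment-load check and poll only the test query against the streaming datasource.
druid.sql.wait_until_ready('social_media', verify_load_status=False)
```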

View File

@ -14,6 +14,7 @@
# limitations under the License.
from druidapi.consts import OVERLORD_BASE
import requests
REQ_TASKS = OVERLORD_BASE + '/tasks'
REQ_POST_TASK = OVERLORD_BASE + '/task'
@ -112,7 +113,7 @@ class TaskClient:
'''
return self.client.get_json(REQ_TASK_STATUS, args=[task_id])
def task_reports(self, task_id) -> dict:
def task_reports(self, task_id, require_ok=True) -> dict:
'''
Retrieves the completion report for a completed task.
@ -129,7 +130,19 @@ class TaskClient:
---------
`GET /druid/indexer/v1/task/{taskId}/reports`
'''
return self.client.get_json(REQ_TASK_REPORTS, args=[task_id])
if require_ok:
    return self.client.get_json(REQ_TASK_REPORTS, args=[task_id])
else:
    resp = self.client.get(REQ_TASK_REPORTS, args=[task_id], require_ok=require_ok)
    if resp.status_code == requests.codes.ok:
        try:
            result = resp.json()
        except Exception as ex:
            result = {"message": "Payload could not be converted to JSON.", "payload": f"{resp.content}", "exception": f"{ex}"}
        return result
    else:
        return {"message": f"Request returned status code {resp.status_code}."}
def submit_task(self, payload):
'''

View File

@ -91,7 +91,8 @@
" basics related to the Druid REST API and several endpoints.\n",
"- [Introduction to the Druid Python API](01-druidapi-package-intro.ipynb) walks you through some of the\n",
" basics related to the Druid API using the Python wrapper API.\n",
"- [Learn the basics of Druid SQL](../03-query/00-using-sql-with-druidapi.ipynb) introduces you to the unique aspects of Druid SQL with the primary focus on the SELECT statement. \n",
"- [Learn the basics of Druid SQL](../03-query/00-using-sql-with-druidapi.ipynb) introduces you to the unique aspects of Druid SQL with the primary focus on the SELECT statement.\n",
"- [Learn to use the Data Generator](./02-datagen-intro.ipynb) gets you started with streaming and batch file data generation for testing of any data schema.\n",
"- [Ingest and query data from Apache Kafka](../02-ingestion/01-streaming-from-kafka.ipynb) walks you through ingesting an event stream from Kafka."
]
},

View File

@ -445,7 +445,7 @@
"metadata": {},
"outputs": [],
"source": [
"sql_client.run_task(sql)"
"display.run_task(sql)"
]
},
{
@ -473,7 +473,7 @@
"id": "11d9c95a",
"metadata": {},
"source": [
"`describe_table()` lists the columns in a table."
"`display.table(<table_name>)` lists the columns in a table."
]
},
{

View File

@ -0,0 +1,642 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "9e07b3f5-d919-4179-91a1-0f6b66c42757",
"metadata": {},
"source": [
"# Data Generator Server\n",
"<!--\n",
" ~ Licensed to the Apache Software Foundation (ASF) under one\n",
" ~ or more contributor license agreements. See the NOTICE file\n",
" ~ distributed with this work for additional information\n",
" ~ regarding copyright ownership. The ASF licenses this file\n",
" ~ to you under the Apache License, Version 2.0 (the\n",
" ~ \"License\"); you may not use this file except in compliance\n",
" ~ with the License. You may obtain a copy of the License at\n",
" ~\n",
" ~ http://www.apache.org/licenses/LICENSE-2.0\n",
" ~\n",
" ~ Unless required by applicable law or agreed to in writing,\n",
" ~ software distributed under the License is distributed on an\n",
" ~ \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
" ~ KIND, either express or implied. See the License for the\n",
" ~ specific language governing permissions and limitations\n",
" ~ under the License.\n",
" -->\n",
"The default Docker Compose deployment includes a data generation service created from the published Docker image at `imply/datagen:latest`. \n",
"This image is built by the project https://github.com/implydata/druid-datagenerator. \n",
"\n",
"This notebook shows you how to use the data generation service included in the Docker Compose deployment. It explains how to use predefined data generator configurations as well as how to build a custom data generator. You will also learn how to create sample data files for batch ingestion and how to generate live streaming data for streaming ingestion.\n",
"\n",
"## Table of contents\n",
"\n",
"* [Initialization](#Initialization)\n",
"* [List available configurations](#List-available-configurations)\n",
"* [Generate a data file for backfilling history](#Generate-a-data-file-for-backfilling-history)\n",
"* [Batch ingestion of generated files](#Batch-ingestion-of-generated-files)\n",
"* [Generate custom data](#Generate-custom-data)\n",
"* [Stream generated data](#Stream-generated-data)\n",
"* [Ingest data from a stream](#Ingest-data-from-a-stream)\n",
"* [Cleanup](#Cleanup)\n",
"\n",
"\n",
"## Initialization\n",
"\n",
"To interact with the data generation service, use the REST client provided in the [`druidapi` Python package](https://druid.apache.org/docs/latest/tutorials/tutorial-jupyter-index.html#python-api-for-druid)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f84766c7-c6a5-4496-91a3-abdb8ddd2375",
"metadata": {},
"outputs": [],
"source": [
"import druidapi\n",
"import os\n",
"import time\n",
"\n",
"# Datagen client \n",
"datagen = druidapi.rest.DruidRestClient(\"http://datagen:9999\")\n",
"\n",
"if (os.environ['DRUID_HOST'] == None):\n",
" druid_host=f\"http://router:8888\"\n",
"else:\n",
" druid_host=f\"http://{os.environ['DRUID_HOST']}:8888\"\n",
"\n",
"# Druid client\n",
"druid = druidapi.jupyter_client(druid_host)\n",
"\n",
"\n",
"\n",
"# these imports and constants are used by multiple cells\n",
"from datetime import datetime, timedelta\n",
"import json\n",
"\n",
"headers = {\n",
" 'Content-Type': 'application/json'\n",
"}"
]
},
{
"cell_type": "markdown",
"id": "c54af617-0998-4010-90c3-9b5a38a09a5f",
"metadata": {},
"source": [
"### List available configurations\n",
"Use the `/list` API endpoint to get the data generator's available configuration values with predefined data generator schemas."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1ba6a80a-c49b-4abf-943b-9dad82f2ae13",
"metadata": {},
"outputs": [],
"source": [
"display(datagen.get(f\"/list\", require_ok=False).json())"
]
},
{
"cell_type": "markdown",
"id": "ae88a3b7-60da-405d-bcf4-fb4affcfe973",
"metadata": {},
"source": [
"### Generate a data file for backfilling history\n",
"When generating a file for backfill purposes, you can select the start time and the duration of the simulation.\n",
"\n",
"Configure the data generator request as follows:\n",
"* `name`: an arbitrary name you assign to the job. Refer to the job name to get the job status or to stop the job.\n",
"* `target.type`: \"file\" to generate a data file\n",
"* `target.path`: identifies the name of the file to generate. The data generator ignores any path specified and creates the file in the current working directory.\n",
"* `time_type`,`time`: The data generator simulates the time range you specify with a start timestamp in the `time_type` property and a duration in the `time` property. To specify `time`, use the `h` suffix for hours, `m` for minutes, and `s` for seconds.\n",
"- `concurrency` indicates the maximum number of entities used concurrently to generate events. Each entity is a separate state machine that simulates things like user sessions, IoT devices, or other concurrent sources of event data.\n",
"\n",
"The following example uses the `clickstream.json` predefined configuration to generate data into a file called `clicks.json`. The data generator starts the sample data at one hour prior to the current time and simulates events for a duration of one hour. Since it is simulated, it does this in just a few seconds."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "811ff58f-75af-4092-a08d-5e07a51592ff",
"metadata": {},
"outputs": [],
"source": [
"# Configure the start time to one hour prior to the current time. \n",
"startDateTime = (datetime.now() - timedelta(hours = 1)).strftime('%Y-%m-%dT%H:%M:%S.001')\n",
"print(f\"Starting to generate history at {startDateTime}.\")\n",
"\n",
"# Give the datagen job a name for use in subsequent API calls\n",
"job_name=\"gen_clickstream1\"\n",
"\n",
"# Generate a data file on the datagen server\n",
"datagen_request = {\n",
" \"name\": job_name,\n",
" \"target\": { \"type\": \"file\", \"path\":\"clicks.json\"},\n",
" \"config_file\": \"clickstream/clickstream.json\", \n",
" \"time_type\": startDateTime,\n",
" \"time\": \"1h\",\n",
" \"concurrency\":100\n",
"}\n",
"response = datagen.post(\"/start\", json.dumps(datagen_request), headers=headers, require_ok=False)\n",
"response.json()"
]
},
{
"cell_type": "markdown",
"id": "d407d1d9-3f01-4128-a014-6a5f371c25a5",
"metadata": {},
"source": [
"#### Display jobs\n",
"Use the `/jobs` API endpoint to get the current jobs and job statuses."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3de698c5-bcf4-40c7-b295-728fb54d1f0a",
"metadata": {},
"outputs": [],
"source": [
"display(datagen.get(f\"/jobs\").json())"
]
},
{
"cell_type": "markdown",
"id": "972ebed0-34a1-4ad2-909d-69b8b27c3046",
"metadata": {},
"source": [
"#### Get status of a job\n",
"Use the `/status/JOB_NAME` API endpoint to get the current jobs and their status."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "debce4f8-9c16-476c-9593-21ec984985d2",
"metadata": {},
"outputs": [],
"source": [
"display(datagen.get(f\"/status/{job_name}\", require_ok=False).json())"
]
},
{
"cell_type": "markdown",
"id": "ef818d78-6aa6-4d38-8a43-83416aede96f",
"metadata": {},
"source": [
"#### Stop a job\n",
"Use the `/stop/JOB_NAME` API endpoint to stop a job."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7631b8b8-d3d6-4803-9162-587f440d2ef2",
"metadata": {},
"outputs": [],
"source": [
"display(datagen.post(f\"/stop/{job_name}\", '').json())"
]
},
{
"cell_type": "markdown",
"id": "0a8dc7d3-64e5-41e3-8c28-c5f19c0536f5",
"metadata": {},
"source": [
"#### List files created on datagen server\n",
"Use the `/files` API endpoint to list files available on the server."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "06ee36bd-2d2b-4904-9987-10636cf52aac",
"metadata": {},
"outputs": [],
"source": [
"display(datagen.get(f\"/files\", '').json())"
]
},
{
"cell_type": "markdown",
"id": "83ef9edb-98e2-45b4-88e8-578703faedc1",
"metadata": {},
"source": [
"### Batch ingestion of generated files\n",
"Use a [Druid HTTP input source](https://druid.apache.org/docs/latest/ingestion/native-batch-input-sources.html#http-input-source) in the [EXTERN function](https://druid.apache.org/docs/latest/multi-stage-query/reference.html#extern-function) of a [SQL-based ingestion](https://druid.apache.org/docs/latest/multi-stage-query/index.html) to load generated files.\n",
"You can access files by name from within Druid using the URI `http://datagen:9999/file/FILE_NAME`. Alternatively, if you run Druid outside of Docker but on the same machine, access the file with `http://localhost:9999/file/FILE_NAME`.\n",
"The following example assumes that both Druid and the data generator server are running in Docker Compose."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0d72b015-f8ec-4713-b6f2-fe7a15afff59",
"metadata": {},
"outputs": [],
"source": [
"sql = '''\n",
"REPLACE INTO \"clicks\" OVERWRITE ALL\n",
"WITH \"ext\" AS (SELECT *\n",
"FROM TABLE(\n",
" EXTERN(\n",
" '{\"type\":\"http\",\"uris\":[\"http://datagen:9999/file/clicks.json\"]}',\n",
" '{\"type\":\"json\"}'\n",
" )\n",
") EXTEND (\"time\" VARCHAR, \"user_id\" VARCHAR, \"event_type\" VARCHAR, \"client_ip\" VARCHAR, \"client_device\" VARCHAR, \"client_lang\" VARCHAR, \"client_country\" VARCHAR, \"referrer\" VARCHAR, \"keyword\" VARCHAR, \"product\" VARCHAR))\n",
"SELECT\n",
" TIME_PARSE(\"time\") AS \"__time\",\n",
" \"user_id\",\n",
" \"event_type\",\n",
" \"client_ip\",\n",
" \"client_device\",\n",
" \"client_lang\",\n",
" \"client_country\",\n",
" \"referrer\",\n",
" \"keyword\",\n",
" \"product\"\n",
"FROM \"ext\"\n",
"PARTITIONED BY DAY\n",
"''' \n",
"\n",
"druid.display.run_task(sql)\n",
"print(\"Waiting for segment avaialbility ...\")\n",
"druid.sql.wait_until_ready('clicks')\n",
"print(\"Data is available for query.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b0997b38-02c2-483e-bd15-439c4bf0097a",
"metadata": {},
"outputs": [],
"source": [
"sql = '''\n",
"SELECT \"event_type\", \"user_id\", count( DISTINCT \"client_ip\") ip_count\n",
"FROM \"clicks\"\n",
"GROUP BY 1,2\n",
"ORDER BY 3 DESC\n",
"LIMIT 10\n",
"'''\n",
"druid.display.sql(sql)"
]
},
{
"cell_type": "markdown",
"id": "66ec013f-28e4-4d5a-94a6-06e0ed537b4e",
"metadata": {},
"source": [
"## Generate custom data\n",
"\n",
"You can find the full set of configuration options for the data generator in the [README](https://github.com/implydata/druid-datagenerator#data-generator-configuration).\n",
"\n",
"This section demonstrates a simple custom configuration as an example. Notice that the emitter defined the schema as a list of dimensions, each dimension specifies how its values are generated: "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d6451310-b7dd-4b39-a23b-7b735b152d6c",
"metadata": {},
"outputs": [],
"source": [
"gen_config = {\n",
" \"emitters\": [\n",
" {\n",
" \"name\": \"simple_record\",\n",
" \"dimensions\": [\n",
" {\n",
" \"type\": \"string\",\n",
" \"name\": \"random_string_column\",\n",
" \"length_distribution\": {\n",
" \"type\": \"constant\",\n",
" \"value\": 13\n",
" },\n",
" \"cardinality\": 0,\n",
" \"chars\": \"#.abcdefghijklmnopqrstuvwxyz\"\n",
" },\n",
" {\n",
" \"type\": \"int\",\n",
" \"name\": \"distributed_number\",\n",
" \"distribution\": {\n",
" \"type\": \"uniform\",\n",
" \"min\": 0,\n",
" \"max\": 1000\n",
" },\n",
" \"cardinality\": 10,\n",
" \"cardinality_distribution\": {\n",
" \"type\": \"exponential\",\n",
" \"mean\": 5\n",
" }\n",
" }\n",
" ]\n",
" }\n",
" ],\n",
" \"interarrival\": {\n",
" \"type\": \"constant\",\n",
" \"value\": 1\n",
" },\n",
" \"states\": [\n",
" {\n",
" \"name\": \"state_1\",\n",
" \"emitter\": \"simple_record\",\n",
" \"delay\": {\n",
" \"type\": \"constant\",\n",
" \"value\": 1\n",
" },\n",
" \"transitions\": [\n",
" {\n",
" \"next\": \"state_1\",\n",
" \"probability\": 1.0\n",
" }\n",
" ]\n",
" }\n",
" ]\n",
"}\n",
"\n",
"target = { \"type\":\"file\", \"path\":\"sample_data.json\"}"
]
},
{
"cell_type": "markdown",
"id": "89a22645-aea5-4c15-b81a-959b27df731f",
"metadata": {},
"source": [
"This example uses the `config` attribute of the request to configure a new custom data generator instead of using a predefined `config_file`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e5e5c535-3474-42b4-9772-14279e712f3d",
"metadata": {},
"outputs": [],
"source": [
"# generate 1 hour of simulated time using custom configuration\n",
"datagen_request = {\n",
" \"name\": \"sample_custom\",\n",
" \"target\": target,\n",
" \"config\": gen_config, \n",
" \"time\": \"1h\",\n",
" \"concurrency\":10,\n",
" \"time_type\": \"SIM\"\n",
"}\n",
"response = datagen.post(\"/start\", json.dumps(datagen_request), headers=headers, require_ok=False)\n",
"response.json()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "952386f7-8181-4325-972b-5f30dc12cf21",
"metadata": {},
"outputs": [],
"source": [
"display(datagen.get(f\"/jobs\", require_ok=False).json())"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "470b3a2a-4fd9-45a2-9221-497d906f62a9",
"metadata": {},
"outputs": [],
"source": [
"# display the first 1k characters of the generated data file\n",
"display( datagen.get(f\"/file/sample_data.json\").content[:1024])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "350faea6-55b0-4386-830c-5160ae495012",
"metadata": {},
"outputs": [],
"source": [
"datagen.post(f\"/stop/sample_custom\",'')"
]
},
{
"cell_type": "markdown",
"id": "77bff054-0f16-4fd5-8ade-2d44b30d0cf2",
"metadata": {},
"source": [
"## Stream generated data\n",
"\n",
"The data generator works exactly the same whether it is writing data to a file or publishing messages into a stream. You only need to change the target configuration.\n",
"\n",
"To use the Kafka container running on Docker Compose, use the host name `kafka:9092`. This tutorial uses the KAFKA_HOST environment variable from Docker Compose to specify the Kafka host. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9959b7c3-6223-479d-b0c2-115a1c555090",
"metadata": {},
"outputs": [],
"source": [
"if (os.environ['KAFKA_HOST'] == None):\n",
" kafka_host=f\"kafka:9092\"\n",
"else:\n",
" kafka_host=f\"{os.environ['KAFKA_HOST']}:9092\""
]
},
{
"cell_type": "markdown",
"id": "497abc18-6538-4536-a17f-fe10c4367611",
"metadata": {},
"source": [
"The simplest `target` object for Kafka and, similarly, Confluent is:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "686a74ab-e2dd-458e-9e93-10291064e9db",
"metadata": {},
"outputs": [],
"source": [
"target = {\n",
" \"type\":\"kafka\",\n",
" \"endpoint\": kafka_host,\n",
" \"topic\": \"custom_data\"\n",
"}\n",
"\n",
"# Generate 1 hour of real time using custom configuration, this means that this stream will run for an hour if not stopped\n",
"datagen_request = {\n",
" \"name\": \"sample_custom\",\n",
" \"target\": target,\n",
" \"config\": gen_config, \n",
" \"time\": \"1h\",\n",
" \"concurrency\":10,\n",
" \"time_type\": \"REAL\"\n",
"}\n",
"response = datagen.post(\"/start\", json.dumps(datagen_request), headers=headers, require_ok=False)\n",
"response.json()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ec17d0c7-a3ab-4f37-bbf0-cc02bff44cf1",
"metadata": {},
"outputs": [],
"source": [
"time.sleep(1) # avoid race condition of async job start\n",
"display(datagen.get(f\"/jobs\", require_ok=False).json())"
]
},
{
"cell_type": "markdown",
"id": "84d7b706-9040-4a69-a956-1b1bbb037c32",
"metadata": {},
"source": [
"### Ingest data from a stream \n",
"This example shows how to start a streaming ingestion supervisor in Apache Druid to consume your custom data:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "51912409-e4e7-48d1-b3a5-b269622b4e56",
"metadata": {},
"outputs": [],
"source": [
"ingestion_spec ={\n",
" \"type\": \"kafka\",\n",
" \"spec\": {\n",
" \"ioConfig\": {\n",
" \"type\": \"kafka\",\n",
" \"consumerProperties\": {\n",
" \"bootstrap.servers\": \"kafka:9092\"\n",
" },\n",
" \"topic\": \"custom_data\",\n",
" \"inputFormat\": {\n",
" \"type\": \"json\"\n",
" },\n",
" \"useEarliestOffset\": True\n",
" },\n",
" \"tuningConfig\": {\n",
" \"type\": \"kafka\",\n",
" \"maxRowsInMemory\": 100000,\n",
" \"resetOffsetAutomatically\": False\n",
" },\n",
" \"dataSchema\": {\n",
" \"dataSource\": \"custom_data\",\n",
" \"timestampSpec\": {\n",
" \"column\": \"time\",\n",
" \"format\": \"iso\"\n",
" },\n",
" \"dimensionsSpec\": {\n",
" \"dimensions\": [\n",
" \"random_string_column\",\n",
" {\n",
" \"type\": \"long\",\n",
" \"name\": \"distributed_number\"\n",
" }\n",
" ]\n",
" },\n",
" \"granularitySpec\": {\n",
" \"queryGranularity\": \"none\",\n",
" \"rollup\": False,\n",
" \"segmentGranularity\": \"hour\"\n",
" }\n",
" }\n",
" }\n",
"}\n",
"\n",
"headers = {\n",
" 'Content-Type': 'application/json'\n",
"}\n",
"\n",
"druid.rest.post(\"/druid/indexer/v1/supervisor\", json.dumps(ingestion_spec), headers=headers)"
]
},
{
"cell_type": "markdown",
"id": "dddfb1cc-f863-4bf4-8c5a-b261b0b9c2f0",
"metadata": {},
"source": [
"Query the data on the stream, but first wait for its availability. It takes a bit of time for the streaming tasks to start, but once they are consuming you can see data very close to real time: Run the following cell multiple times to see how the data is changing:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7e1284ed-5c49-4f37-81f7-c3b720473158",
"metadata": {},
"outputs": [],
"source": [
"druid.sql.wait_until_ready('custom_data', verify_load_status=False)\n",
"druid.display.sql('''\n",
"SELECT SUM(distributed_number) sum_randoms, count(*) total_count\n",
"FROM custom_data\n",
"''')"
]
},
{
"cell_type": "markdown",
"id": "4486e430-0776-46ad-8a8b-4f0354f17bfb",
"metadata": {},
"source": [
"### Cleanup\n",
"\n",
"Stop the streaming ingestion and the streaming producer:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "38943a92-dc23-41cf-91a4-1b68d2178033",
"metadata": {},
"outputs": [],
"source": [
"print(f\"Stop streaming generator: [{datagen.post('/stop/sample_custom','',require_ok=False)}]\")\n",
"print(f'Reset offsets for streaming ingestion: [{druid.rest.post(\"/druid/indexer/v1/supervisor/custom_data/reset\",\"\", require_ok=False)}]')\n",
"print(f'Stop streaming ingestion: [{druid.rest.post(\"/druid/indexer/v1/supervisor/custom_data/terminate\",\"\", require_ok=False)}]')"
]
},
{
"cell_type": "markdown",
"id": "0cf53bdc-de7f-425d-84b1-68d0cef420d8",
"metadata": {},
"source": [
"Wait for streaming ingestion to complete and then remove the custom data table:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "87341e7c-f7ab-488c-9913-091f712534cb",
"metadata": {},
"outputs": [],
"source": [
"print(f\"Drop datasource: [{druid.datasources.drop('custom_data')}]\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@ -4,7 +4,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Ingest and query data from Apache Kafka\n",
"# Ingest and query data from Apache Kafka\n",
"\n",
"<!--\n",
" ~ Licensed to the Apache Software Foundation (ASF) under one\n",
@ -60,9 +60,10 @@
" * Update the `rest_client` variable to point to your Coordinator endpoint. For example, `\"http://localhost:8081\"`.\n",
"* A running Kafka cluster.\n",
" * Update the Kafka bootstrap servers to point to your servers. For example, `bootstrap_servers=[\"localhost:9092\"]`.\n",
"* A running [Data Generator server](https://github.com/implydata/druid-datagenerator) accessible to the cluster.\n",
" * Update the data generator client. For example `datagen = druidapi.rest.DruidRestClient(\"http://localhost:9999\")`.\n",
"* The following Python packages:\n",
" * `druidapi`, a Python client for Apache Druid\n",
" * `DruidDataDriver`, a data generator\n",
" * `kafka`, a Python client for Apache Kafka\n",
" * `pandas`, `matplotlib`, and `seaborn` for data visualization\n"
]
@ -88,36 +89,16 @@
"outputs": [],
"source": [
"import druidapi\n",
"import json\n",
"import os\n",
"import time\n",
"\n",
"# druid_host is the hostname and port for your Druid deployment. \n",
"# In the Docker Compose tutorial environment, this is the Router\n",
"# service running at \"http://router:8888\".\n",
"# If you are not using the Docker Compose environment, edit the `druid_host`.\n",
"\n",
"druid_host = \"http://router:8888\"\n",
"druid_host\n",
"\n",
"druid = druidapi.jupyter_client(druid_host)\n",
"display = druid.display\n",
"sql_client = druid.sql\n",
"\n",
"# Create a rest client for native JSON ingestion for streaming data\n",
"rest_client = druidapi.rest.DruidRestClient(\"http://coordinator:8081\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Kafka topic"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook relies on the Python client for the Apache Kafka. Import the Kafka producer and consumer modules, then create a Kafka client. You use the Kafka producer to create and publish records to a new topic named `social_media`."
"if 'DRUID_HOST' not in os.environ.keys():\n",
" druid_host=f\"http://localhost:8888\"\n",
"else:\n",
" druid_host=f\"http://{os.environ['DRUID_HOST']}:8888\"\n",
" \n",
"print(f\"Opening a connection to {druid_host}.\")\n",
"druid = druidapi.jupyter_client(druid_host)"
]
},
{
@ -126,19 +107,55 @@
"metadata": {},
"outputs": [],
"source": [
"from kafka import KafkaProducer\n",
"from kafka import KafkaConsumer\n",
"# Use kafka_host variable when connecting to kafka \n",
"if 'KAFKA_HOST' not in os.environ.keys():\n",
" kafka_host=f\"http://localhost:9092\"\n",
"else:\n",
" kafka_host=f\"{os.environ['KAFKA_HOST']}:9092\"\n",
"\n",
"# Kafka runs on kafka:9092 in multi-container tutorial application\n",
"producer = KafkaProducer(bootstrap_servers='kafka:9092')\n",
"# this is the kafka topic we will be working with:\n",
"topic_name = \"social_media\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"\n",
"# shortcuts for display and sql api's\n",
"display = druid.display\n",
"sql_client = druid.sql\n",
"\n",
"# client for Data Generator API\n",
"datagen = druidapi.rest.DruidRestClient(\"http://datagen:9999\")\n",
"\n",
"# client for Druid API\n",
"rest_client = druid.rest"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create the `social_media` topic and send a sample event. The `send()` command returns a metadata descriptor for the record."
"## Publish generated data directly to Kafka topic"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this section, you use the data generator included as part of the Docker application to generate a stream of messages. The data generator creates and send messages to a Kafka topic named `social_media`. To learn more about the Druid Data Generator, see the [project](https://github.com/implydata/druid-datagenerator) and the [data generation notebook](../01-introduction/02-datagen-intro.ipynb)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate data\n",
"Run the following cells to load sample data into the `social_media` Kafka topic. The data generator sends events until it reaches 50,000 messages."
]
},
{
@ -147,24 +164,25 @@
"metadata": {},
"outputs": [],
"source": [
"event = {\n",
" \"__time\": \"2023-01-03T16:40:21.501\",\n",
" \"username\": \"willow\",\n",
" \"post_title\": \"This title is required\",\n",
" \"views\": 15284,\n",
" \"upvotes\": 124,\n",
" \"comments\": 21,\n",
" \"edited\": \"True\"\n",
"headers = {\n",
" 'Content-Type': 'application/json'\n",
"}\n",
"\n",
"producer.send(topic_name, json.dumps(event).encode('utf-8'))"
"datagen_request = {\n",
" \"name\": \"social_stream\",\n",
" \"target\": { \"type\": \"kafka\", \"endpoint\": kafka_host, \"topic\": topic_name },\n",
" \"config_file\": \"social/social_posts.json\", \n",
" \"total_events\":50000,\n",
" \"concurrency\":100\n",
"}\n",
"datagen.post(\"/start\", json.dumps(datagen_request), headers=headers)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To verify that the Kafka topic stored the event, create a consumer client to read records from the Kafka cluster, and get the next (only) message:"
"Check the status of the job with the following cell:"
]
},
{
@ -173,59 +191,9 @@
"metadata": {},
"outputs": [],
"source": [
"consumer = KafkaConsumer(topic_name, bootstrap_servers=['kafka:9092'], auto_offset_reset='earliest',\n",
" enable_auto_commit=True)\n",
"\n",
"print(next(consumer).value.decode('utf-8'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load data into Kafka topic"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Instead of manually creating events to send to the Kafka topic, use a data generator to simulate a continuous data stream. This tutorial makes use of Druid Data Driver to simulate a continuous data stream into the `social_media` Kafka topic. To learn more about the Druid Data Driver, see the Druid Summit talk, [Generating Time centric Data for Apache Druid](https://www.youtube.com/watch?v=3zAOeLe3iAo).\n",
"\n",
"In this notebook, you use a background process to continuously load data into the Kafka topic.\n",
"This allows you to keep executing commands in this notebook while data is constantly being streamed into the topic."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the following cells to load sample data into the `social_media` Kafka topic:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import multiprocessing as mp\n",
"from datetime import datetime\n",
"import DruidDataDriver"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def run_driver():\n",
" DruidDataDriver.simulate(\"kafka_docker_config.json\", None, None, \"REAL\", datetime.now())\n",
" \n",
"mp.set_start_method('fork')\n",
"ps = mp.Process(target=run_driver)\n",
"ps.start()"
"time.sleep(1) # avoid race between start of the job and its status being available\n",
"response = datagen.get('/status/social_stream')\n",
"response.json()"
]
},
{
@ -258,16 +226,56 @@
"metadata": {},
"outputs": [],
"source": [
"kafka_ingestion_spec = \"{\\\"type\\\": \\\"kafka\\\",\\\"spec\\\": {\\\"ioConfig\\\": {\\\"type\\\": \\\"kafka\\\",\\\"consumerProperties\\\": {\\\"bootstrap.servers\\\": \\\"kafka:9092\\\"},\\\"topic\\\": \\\"social_media\\\",\\\"inputFormat\\\": {\\\"type\\\": \\\"json\\\"},\\\"useEarliestOffset\\\": true},\\\"tuningConfig\\\": {\\\"type\\\": \\\"kafka\\\"},\\\"dataSchema\\\": {\\\"dataSource\\\": \\\"social_media\\\",\\\"timestampSpec\\\": {\\\"column\\\": \\\"__time\\\",\\\"format\\\": \\\"iso\\\"},\\\"dimensionsSpec\\\": {\\\"dimensions\\\": [\\\"username\\\",\\\"post_title\\\",{\\\"type\\\": \\\"long\\\",\\\"name\\\": \\\"views\\\"},{\\\"type\\\": \\\"long\\\",\\\"name\\\": \\\"upvotes\\\"},{\\\"type\\\": \\\"long\\\",\\\"name\\\": \\\"comments\\\"},\\\"edited\\\"]},\\\"granularitySpec\\\": {\\\"queryGranularity\\\": \\\"none\\\",\\\"rollup\\\": false,\\\"segmentGranularity\\\": \\\"hour\\\"}}}}\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(json.dumps(json.loads(kafka_ingestion_spec), indent=4))"
"kafka_ingestion_spec = {\n",
" \"type\": \"kafka\",\n",
" \"spec\": {\n",
" \"ioConfig\": {\n",
" \"type\": \"kafka\",\n",
" \"consumerProperties\": {\n",
" \"bootstrap.servers\": \"kafka:9092\"\n",
" },\n",
" \"topic\": \"social_media\",\n",
" \"inputFormat\": {\n",
" \"type\": \"json\"\n",
" },\n",
" \"useEarliestOffset\": True\n",
" },\n",
" \"tuningConfig\": {\n",
" \"type\": \"kafka\"\n",
" },\n",
" \"dataSchema\": {\n",
" \"dataSource\": \"social_media\",\n",
" \"timestampSpec\": {\n",
" \"column\": \"time\",\n",
" \"format\": \"iso\"\n",
" },\n",
" \"dimensionsSpec\": {\n",
" \"dimensions\": [\n",
" \"username\",\n",
" \"post_title\",\n",
" {\n",
" \"type\": \"long\",\n",
" \"name\": \"views\"\n",
" },\n",
" {\n",
" \"type\": \"long\",\n",
" \"name\": \"upvotes\"\n",
" },\n",
" {\n",
" \"type\": \"long\",\n",
" \"name\": \"comments\"\n",
" },\n",
" \"edited\"\n",
" ]\n",
" },\n",
" \"granularitySpec\": {\n",
" \"queryGranularity\": \"none\",\n",
" \"rollup\": False,\n",
" \"segmentGranularity\": \"hour\"\n",
" }\n",
" }\n",
" }\n",
"}"
]
},
{
@ -287,14 +295,26 @@
" 'Content-Type': 'application/json'\n",
"}\n",
"\n",
"rest_client.post(\"/druid/indexer/v1/supervisor\", kafka_ingestion_spec, headers=headers)"
"supervisor = rest_client.post(\"/druid/indexer/v1/supervisor\", json.dumps(kafka_ingestion_spec), headers=headers)\n",
"print(supervisor.status_code)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A `200` response indicates that the request was successful. You can view the running ingestion task and the new datasource in the web console at http://localhost:8888/unified-console.html."
"A `200` response indicates that the request was successful. You can view the running ingestion task and the new datasource in the web console's [ingestion view](http://localhost:8888/unified-console.html#ingestion).\n",
"\n",
"The following cell pauses further execution until the ingestion has started and the datasource is available for querying:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"druid.sql.wait_until_ready('social_media', verify_load_status=False)"
]
},
{
@ -496,8 +516,49 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"This plot shows how some users maintain relatively consistent social media impact between the two query snapshots, whereas other users grow or decline in their influence.\n",
"\n",
"This plot shows how some users maintain relatively consistent social media impact between the two query snapshots, whereas other users grow or decline in their influence."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cleanup \n",
"The following cells stop the data generation and ingestion jobs and removes the datasource from Druid."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(f\"Stop streaming generator: [{datagen.post('/stop/social_stream','',require_ok=False)}]\")\n",
"print(f'Reset offsets for ingestion: [{druid.rest.post(\"/druid/indexer/v1/supervisor/social_media/reset\",\"\", require_ok=False)}]')\n",
"print(f'Stop streaming ingestion: [{druid.rest.post(\"/druid/indexer/v1/supervisor/social_media/terminate\",\"\", require_ok=False)}]')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once the ingestion process ends and completes any final ingestion steps, remove the datasource with the following cell:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"time.sleep(5) # wait for streaming ingestion tasks to end\n",
"print(f\"Drop datasource: [{druid.datasources.drop('social_media')}]\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Learn more\n",
"\n",
"This tutorial showed you how to create a Kafka topic using a Python client for Kafka, send a simulated stream of data to Kafka using a data generator, and query and visualize results over time. For more information, see the following resources:\n",

View File

@ -1,90 +0,0 @@
{
"target": {
"type": "kafka",
"endpoint": "kafka:9092",
"topic": "social_media"
},
"emitters": [
{
"name": "example_record_1",
"dimensions": [
{
"type": "enum",
"name": "username",
"values": ["willow", "mia", "leon", "milton", "miette", "gus", "jojo", "rocket"],
"cardinality_distribution": {
"type": "uniform",
"min": 0,
"max": 7
}
},
{
"type": "string",
"name": "post_title",
"length_distribution": {"type": "uniform", "min": 1, "max": 140},
"cardinality": 0,
"chars": "abcdefghijklmnopqrstuvwxyz0123456789_ABCDEFGHIJKLMNOPQRSTUVWXYZ!';:,."
},
{
"type": "int",
"name": "views",
"distribution": {
"type": "exponential",
"mean": 10000
},
"cardinality": 0
},
{
"type": "int",
"name": "upvotes",
"distribution": {
"type": "normal",
"mean": 70,
"stddev": 20
},
"cardinality": 0
},
{
"type": "int",
"name": "comments",
"distribution": {
"type": "normal",
"mean": 10,
"stddev": 5
},
"cardinality": 0
},
{
"type": "enum",
"name": "edited",
"values": ["True","False"],
"cardinality_distribution": {
"type": "uniform",
"min": 0,
"max": 1
}
}
]
}
],
"interarrival": {
"type": "constant",
"value": 1
},
"states": [
{
"name": "state_1",
"emitter": "example_record_1",
"delay": {
"type": "constant",
"value": 1
},
"transitions": [
{
"next": "state_1",
"probability": 1.0
}
]
}
]
}