202307-notebook-unionall (#14726)

Co-authored-by: Victoria Lim <vtlim@users.noreply.github.com>
Co-authored-by: Charles Smith <techdocsmith@gmail.com>
This commit is contained in:
Peter Marshall 2023-08-21 18:55:58 +01:00 committed by GitHub
parent 631dc3b589
commit 0dfd99e381
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 509 additions and 0 deletions

View File

@ -0,0 +1,509 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "557e06e8-9b35-4b34-8322-8a8ede6de709",
"metadata": {},
"source": [
"# Using `UNION ALL` to address multiple `TABLE`s in the same query\n",
"\n",
"<!--\n",
" ~ Licensed to the Apache Software Foundation (ASF) under one\n",
" ~ or more contributor license agreements. See the NOTICE file\n",
" ~ distributed with this work for additional information\n",
" ~ regarding copyright ownership. The ASF licenses this file\n",
" ~ to you under the Apache License, Version 2.0 (the\n",
" ~ \"License\"); you may not use this file except in compliance\n",
" ~ with the License. You may obtain a copy of the License at\n",
" ~\n",
" ~ http://www.apache.org/licenses/LICENSE-2.0\n",
" ~\n",
" ~ Unless required by applicable law or agreed to in writing,\n",
" ~ software distributed under the License is distributed on an\n",
" ~ \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
" ~ KIND, either express or implied. See the License for the\n",
" ~ specific language governing permissions and limitations\n",
" ~ under the License.\n",
" -->\n",
" \n",
"While working with Druid, you may need to bring together two different tables of results together into a single result list, or to treat multiple tables as a single input to a query. This notebook introduces the `UNION ALL` operator, walking through two ways in which this operator can be used to achieve this result: top-level and table-level `UNION ALL`."
]
},
{
"cell_type": "markdown",
"id": "cf4554ae-6516-4e76-b202-d6e2fdf31603",
"metadata": {},
"source": [
"## Prerequisites\n",
"\n",
"This tutorial works with Druid 26.0.0 or later.\n",
"\n",
"#### Run using Docker\n",
"\n",
"Launch this tutorial and all prerequisites using the `druid-jupyter` profile of the Docker Compose file for Jupyter-based Druid tutorials. For more information, see [Docker for Jupyter Notebook tutorials](https://druid.apache.org/docs/latest/tutorials/tutorial-jupyter-docker.html).\n",
" \n",
"#### Run Druid without Docker\n",
"\n",
"If you do not use the Docker Compose environment, you need the following:\n",
"\n",
"* A running Druid instance, with a `DRUID_HOST` local environment variable containing the servername of your Druid router\n",
"* [druidapi](https://github.com/apache/druid/blob/master/examples/quickstart/jupyter-notebooks/druidapi/README.md), a Python client for Apache Druid. Follow the instructions in the Install section of the README file."
]
},
{
"cell_type": "markdown",
"id": "ee0c3171-def8-4ad9-9c56-d3a67f309631",
"metadata": {},
"source": [
"### Initialization\n",
"\n",
"Run the next cell to attempt a connection to Druid services. If successful, the output shows the Druid version number."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9fa4abfe-f878-4031-88f2-94c13e922279",
"metadata": {},
"outputs": [],
"source": [
"import druidapi\n",
"import os\n",
"\n",
"if 'DRUID_HOST' not in os.environ.keys():\n",
" druid_host=f\"http://localhost:8888\"\n",
"else:\n",
" druid_host=f\"http://{os.environ['DRUID_HOST']}:8888\"\n",
" \n",
"print(f\"Opening a connection to {druid_host}.\")\n",
"druid = druidapi.jupyter_client(druid_host)\n",
"\n",
"display = druid.display\n",
"sql_client = druid.sql\n",
"status_client = druid.status\n",
"\n",
"status_client.version"
]
},
{
"cell_type": "markdown",
"id": "fc3001a0-27e5-4f41-876a-ce6eab2acd6a",
"metadata": {},
"source": [
"Finally, run the following cell to import the Python JSON module."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6b058d8b-2bae-4929-ab0c-5a6df1850387",
"metadata": {},
"outputs": [],
"source": [
"import json"
]
},
{
"cell_type": "markdown",
"id": "f388633f-195b-4381-98cc-7a2f80f48690",
"metadata": {},
"source": [
"## Using Top-level `UNION ALL` to concatenate result sets\n",
"\n",
"Run the following cell to ingest the wikipedia data example. Once completed, you will see a description of the new table.\n",
"\n",
"You can optionally monitor the ingestion in the Druid console while it runs."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a399196b-12db-42ff-ae24-c7232f150aba",
"metadata": {},
"outputs": [],
"source": [
"sql='''\n",
"REPLACE INTO \"example-wikipedia-unionall\" OVERWRITE ALL\n",
"WITH \"ext\" AS (SELECT *\n",
"FROM TABLE(\n",
" EXTERN(\n",
" '{\"type\":\"http\",\"uris\":[\"https://druid.apache.org/data/wikipedia.json.gz\"]}',\n",
" '{\"type\":\"json\"}'\n",
" )\n",
") EXTEND (\"isRobot\" VARCHAR, \"channel\" VARCHAR, \"timestamp\" VARCHAR, \"flags\" VARCHAR, \"isUnpatrolled\" VARCHAR, \"page\" VARCHAR, \"diffUrl\" VARCHAR, \"added\" BIGINT, \"comment\" VARCHAR, \"commentLength\" BIGINT, \"isNew\" VARCHAR, \"isMinor\" VARCHAR, \"delta\" BIGINT, \"isAnonymous\" VARCHAR, \"user\" VARCHAR, \"deltaBucket\" BIGINT, \"deleted\" BIGINT, \"namespace\" VARCHAR, \"cityName\" VARCHAR, \"countryName\" VARCHAR, \"regionIsoCode\" VARCHAR, \"metroCode\" BIGINT, \"countryIsoCode\" VARCHAR, \"regionName\" VARCHAR))\n",
"SELECT\n",
" TIME_PARSE(\"timestamp\") AS \"__time\",\n",
" \"isRobot\",\n",
" \"channel\",\n",
" \"page\",\n",
" \"commentLength\",\n",
" \"countryName\",\n",
" \"user\"\n",
"FROM \"ext\"\n",
"PARTITIONED BY DAY\n",
"'''\n",
"\n",
"sql_client.run_task(sql)\n",
"sql_client.wait_until_ready('example-wikipedia-unionall')\n",
"display.table('example-wikipedia-unionall')"
]
},
{
"cell_type": "markdown",
"id": "24b47cc3-68f5-4a73-b374-94bbfa32d91d",
"metadata": {},
"source": [
"You can use `UNION ALL` to append the results of one query with another.\n",
"\n",
"The first query in the cell below, `set1`, returns the ten first edits to any \"fr\"-like `channel` between midday and 1pm on the 27th June 2016. The second query repeats this but for any \"en\"-like `channel`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b76e5184-9fe4-4f21-a471-4e15d16515c8",
"metadata": {},
"outputs": [],
"source": [
"sql = '''\n",
"WITH\n",
"set1 AS (\n",
" SELECT\n",
" __time,\n",
" \"channel\",\n",
" \"page\",\n",
" \"isRobot\"\n",
" FROM \"example-wikipedia-unionall\"\n",
" WHERE DATE_TRUNC('HOUR', __time) = TIMESTAMP '2016-06-27 12:00:00'\n",
" AND channel LIKE '#fr%'\n",
" ORDER BY __time\n",
" LIMIT 10\n",
" ),\n",
"set2 AS (\n",
" SELECT\n",
" __time,\n",
" \"channel\",\n",
" \"page\",\n",
" \"isRobot\"\n",
" FROM \"example-wikipedia-unionall\"\n",
" WHERE DATE_TRUNC('HOUR', __time) = TIMESTAMP '2016-06-27 12:00:00'\n",
" AND channel LIKE '#en%'\n",
" ORDER BY __time\n",
" LIMIT 10\n",
" )\n",
" \n",
"SELECT * from set1\n",
"UNION ALL\n",
"SELECT * from set2\n",
"'''\n",
"\n",
"display.sql(sql)"
]
},
{
"cell_type": "markdown",
"id": "f5e77fa9-a60c-4955-b763-58d970d7326d",
"metadata": {},
"source": [
"This is a [top-level](https://druid.apache.org/docs/latest/querying/sql.html#top-level) `UNION` operation. First, Druid calculated `set1` and appended subsequent results sets.\n",
"\n",
"Notice that these results are not in order by time  even though the individual sets did `ORDER BY` time. Druid simply concatenated the two result sets together.\n",
"\n",
"Optionally, run the next cell to show the precise [`EXPLAIN PLAN`](https://druid.apache.org/docs/latest/querying/sql-translation#interpreting-explain-plan-output) for the query. You can see there are two `query` execution plans, one for each subquery. Also, Druid's planning process optimized execution of the outer query."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "97934da2-17d1-4c91-8ae3-926cc89185c1",
"metadata": {},
"outputs": [],
"source": [
"print(json.dumps(json.loads(sql_client.explain_sql(sql)['PLAN']), indent=2))"
]
},
{
"cell_type": "markdown",
"id": "800add1a-d459-4796-b974-b2f094db417f",
"metadata": {},
"source": [
"Run next cell to perform another top-level UNION ALL, this time where the sets use `GROUP BY`.\n",
"\n",
"Notice that the aggregates have `AS` to set specific field names."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8e687466-74bb-4cc0-ba17-913d1807fc60",
"metadata": {},
"outputs": [],
"source": [
"sql='''\n",
"WITH\n",
"set1 AS (\n",
" SELECT\n",
" TIME_FLOOR(__time, 'PT1H') AS \"Period\",\n",
" countryName,\n",
" AVG(commentLength) AS \"Average Comment Size\",\n",
" COUNT(DISTINCT \"page\") AS \"Pages\"\n",
" FROM \"example-wikipedia-unionall\"\n",
" WHERE countryName='China'\n",
" GROUP BY 1, 2\n",
" LIMIT 10\n",
" ),\n",
"set2 AS (\n",
" SELECT\n",
" TIME_FLOOR(__time, 'PT1H') AS \"Episode\",\n",
" countryName,\n",
" COUNT(DISTINCT \"page\") AS \"Pages\",\n",
" AVG(commentLength) AS \"Average Comment Length\"\n",
" FROM \"example-wikipedia-unionall\"\n",
" WHERE countryName='Austria'\n",
" GROUP BY 1, 2\n",
" LIMIT 10\n",
" )\n",
"\n",
"SELECT * from set1\n",
"UNION ALL\n",
"SELECT * from set2\n",
"'''\n",
"\n",
"display.sql(sql)"
]
},
{
"cell_type": "markdown",
"id": "f2c95ffc-b260-4671-bacc-c8cc3137e9c2",
"metadata": {},
"source": [
"Look carefully at these results - Druid has simply appended the results from `set2` to `set1` without introducing redundant columns.\n",
"\n",
"* Column name in `set2` (`Period` versus `Episode` and `Average Comment Size` versus `Average Comment Length`) did not result in new columns\n",
"* Columns with the same name (`Pages`) did not result in that aggregate being put into same column - Austria's values are simply appended `Average Comment Size`\n",
"\n",
"Run the next cell, which uses explicit column names at the top-level, rather than `*`, to ensure the calculations appear in the right columns in the final result. It also aliases the columns for the results by using `AS`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "39f9be92-7b2e-417c-b16a-5060b8cd2c30",
"metadata": {},
"outputs": [],
"source": [
"sql='''\n",
"WITH\n",
"set1 AS (\n",
" SELECT\n",
" TIME_FLOOR(__time, 'PT1H') AS \"Period\",\n",
" countryName,\n",
" AVG(commentLength) AS \"Average Comment Size\",\n",
" COUNT(DISTINCT \"page\") AS \"Pages\"\n",
" FROM \"example-wikipedia-unionall\"\n",
" WHERE countryName='China'\n",
" GROUP BY 1, 2\n",
" LIMIT 10\n",
" ),\n",
"set2 AS (\n",
" SELECT\n",
" TIME_FLOOR(__time, 'PT1H') AS \"Episode\",\n",
" countryName,\n",
" COUNT(DISTINCT \"page\") AS \"Pages\",\n",
" AVG(commentLength) AS \"Average Comment Length\"\n",
" FROM \"example-wikipedia-unionall\"\n",
" WHERE countryName='Austria'\n",
" GROUP BY 1, 2\n",
" LIMIT 10\n",
" )\n",
"\n",
"SELECT \"Period\", \"countryName\" AS \"Country\", \"Average Comment Size\" AS \"Edit Size\", \"Pages\" AS \"Unique Pages\" from set1\n",
"UNION ALL\n",
"SELECT \"Episode\", \"countryName\", \"Average Comment Length\", \"Pages\" from set2\n",
"'''\n",
"\n",
"display.sql(sql)"
]
},
{
"cell_type": "markdown",
"id": "25001794-e1a7-4325-adb3-2b8f26036261",
"metadata": {},
"source": [
"## Using Table-level `UNION ALL` to work with multiple tables\n",
"\n",
"From one source of data, data engineers may create multiple `TABLE` datasources in order to:\n",
"\n",
"* Separate data with different levels of `__time` granularity (ie. the level of summarisation),\n",
"* Apply different security to different parts, for example, per tenant,\n",
"* Break up the data using filtering at ingestion time, for example, different tables for different HTTP error codes,\n",
"* Separate upstream data by the source device or system, for example, different types of IOT device,\n",
"* Isolate different periods of time, perhaps with different retention periods.\n",
"\n",
"You can use `UNION ALL` to access _all_ the source data, referencing all the `TABLE` datasources through a sub-query or a `FROM` clause.\n",
"\n",
"The next two cells create two new tables, `example-wikipedia-unionall-en` and `example-wikipedia-unionall-fr`. `example-wikipedia-unionall-en` contains only data for English language channel edits, while `example-wikipedia-unionall-fr` contains only French channels.\n",
"\n",
"Run the next two cells, monitoring the ingestion in the Druid Console as they run."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "269c6aef-c3a5-46ad-8332-30b7bf30ddfb",
"metadata": {},
"outputs": [],
"source": [
"sql='''\n",
"REPLACE INTO \"example-wikipedia-unionall-en\" OVERWRITE ALL\n",
"WITH \"ext\" AS (SELECT *\n",
"FROM TABLE(\n",
" EXTERN(\n",
" '{\"type\":\"http\",\"uris\":[\"https://druid.apache.org/data/wikipedia.json.gz\"]}',\n",
" '{\"type\":\"json\"}'\n",
" )\n",
") EXTEND (\"isRobot\" VARCHAR, \"channel\" VARCHAR, \"timestamp\" VARCHAR, \"flags\" VARCHAR, \"isUnpatrolled\" VARCHAR, \"page\" VARCHAR, \"diffUrl\" VARCHAR, \"added\" BIGINT, \"comment\" VARCHAR, \"commentLength\" BIGINT, \"isNew\" VARCHAR, \"isMinor\" VARCHAR, \"delta\" BIGINT, \"isAnonymous\" VARCHAR, \"user\" VARCHAR, \"deltaBucket\" BIGINT, \"deleted\" BIGINT, \"namespace\" VARCHAR, \"cityName\" VARCHAR, \"countryName\" VARCHAR, \"regionIsoCode\" VARCHAR, \"metroCode\" BIGINT, \"countryIsoCode\" VARCHAR, \"regionName\" VARCHAR))\n",
"SELECT\n",
" TIME_PARSE(\"timestamp\") AS \"__time\",\n",
" \"isRobot\",\n",
" \"channel\",\n",
" \"page\",\n",
" \"commentLength\",\n",
" \"countryName\",\n",
" \"user\"\n",
"FROM \"ext\"\n",
"WHERE \"channel\" LIKE '#en%'\n",
"PARTITIONED BY DAY\n",
"'''\n",
"\n",
"sql_client.run_task(sql)\n",
"sql_client.wait_until_ready('example-wikipedia-unionall-en')\n",
"display.table('example-wikipedia-unionall-en')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "61740d61-28fc-48e9-b026-d472bd04f390",
"metadata": {},
"outputs": [],
"source": [
"sql='''\n",
"REPLACE INTO \"example-wikipedia-unionall-fr\" OVERWRITE ALL\n",
"WITH \"ext\" AS (SELECT *\n",
"FROM TABLE(\n",
" EXTERN(\n",
" '{\"type\":\"http\",\"uris\":[\"https://druid.apache.org/data/wikipedia.json.gz\"]}',\n",
" '{\"type\":\"json\"}'\n",
" )\n",
") EXTEND (\"isRobot\" VARCHAR, \"channel\" VARCHAR, \"timestamp\" VARCHAR, \"flags\" VARCHAR, \"isUnpatrolled\" VARCHAR, \"page\" VARCHAR, \"diffUrl\" VARCHAR, \"added\" BIGINT, \"comment\" VARCHAR, \"commentLength\" BIGINT, \"isNew\" VARCHAR, \"isMinor\" VARCHAR, \"delta\" BIGINT, \"isAnonymous\" VARCHAR, \"user\" VARCHAR, \"deltaBucket\" BIGINT, \"deleted\" BIGINT, \"namespace\" VARCHAR, \"cityName\" VARCHAR, \"countryName\" VARCHAR, \"regionIsoCode\" VARCHAR, \"metroCode\" BIGINT, \"countryIsoCode\" VARCHAR, \"regionName\" VARCHAR))\n",
"SELECT\n",
" TIME_PARSE(\"timestamp\") AS \"__time\",\n",
" \"isRobot\",\n",
" \"channel\",\n",
" \"page\",\n",
" \"commentLength\",\n",
" \"countryName\",\n",
" \"user\"\n",
"FROM \"ext\"\n",
"WHERE \"channel\" LIKE '#fr%'\n",
"PARTITIONED BY DAY\n",
"'''\n",
"\n",
"sql_client.run_task(sql)\n",
"sql_client.wait_until_ready('example-wikipedia-unionall-fr')\n",
"display.table('example-wikipedia-unionall-fr')"
]
},
{
"cell_type": "markdown",
"id": "f8bbf2c6-681a-46f5-82f2-201cbbe8058d",
"metadata": {},
"source": [
"The next cell uses `UNION ALL` in a `WITH` statement that creates `unifiedSource`. This will be a unified source of data for both tables that can then be used in a `SELECT` query.\n",
"\n",
"Druid executes these \"[top level](https://druid.apache.org/docs/26.0.0/querying/sql.html#top-level)\" `UNION ALL` queries differently to \"[table level](https://druid.apache.org/docs/26.0.0/querying/sql.html#table-level)\" queries you have used so far. Table level `UNION ALL` makes use of `union` datasources, and it's important that you read the [documentation](https://druid.apache.org/docs/26.0.0/querying/datasource.html#union) to understand the functionality available to you. Operations such as filtering, for example, can only be done in the outer `SELECT` statement on `unifiedSource` in the sample query below. \n",
"\n",
"Run the following cell to count the number of robot and non-robot edits by channel across both sets."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "783fe77d-2e7b-476a-9748-67ea90c8bb91",
"metadata": {},
"outputs": [],
"source": [
"sql = '''\n",
"WITH unifiedSource AS (\n",
" SELECT\n",
" \"__time\",\n",
" \"isRobot\",\n",
" \"channel\",\n",
" \"user\",\n",
" \"countryName\"\n",
" FROM \"example-wikipedia-unionall-en\"\n",
" UNION ALL\n",
" SELECT\n",
" \"__time\",\n",
" \"isRobot\",\n",
" \"channel\",\n",
" \"user\",\n",
" \"countryName\"\n",
" FROM \"example-wikipedia-unionall-fr\"\n",
" )\n",
"\n",
"SELECT\n",
" \"channel\",\n",
" COUNT(*) FILTER (WHERE isRobot=true) AS \"Robot Edits\",\n",
" COUNT (DISTINCT user) FILTER (WHERE isRobot=true) AS \"Robot Editors\",\n",
" COUNT(*) FILTER (WHERE isRobot=false) AS \"Human Edits\",\n",
" COUNT (DISTINCT user) FILTER (WHERE isRobot=false) AS \"Human Editors\"\n",
"FROM unifiedSource\n",
"GROUP BY 1\n",
"'''\n",
"\n",
"display.sql(sql)"
]
},
{
"cell_type": "markdown",
"id": "f58a1846-5072-4495-b840-a620de3c0442",
"metadata": {},
"source": [
"## Conclusion\n",
"\n",
"* There are two modes for `UNION ALL` in Druid - top level and table level\n",
"* Top level is a simple concatenation, and operations must be done on the source `TABLE`s\n",
"* Table level uses a `union` data source, and operations must be done on the outer `SELECT`\n",
"\n",
"## Learn more\n",
"\n",
"* Watch [Plan your Druid table datasources](https://youtu.be/OpYDX4RYLV0?list=PLDZysOZKycN7MZvNxQk_6RbwSJqjSrsNR) by Peter Marshall\n",
"* Read about [union](https://druid.apache.org/docs/26.0.0/querying/datasource.html#union) datasources in the documentation\n",
"* Read the latest [documentation](https://druid.apache.org/docs/26.0.0/querying/sql.html#union-all) on the `UNION ALL` operator"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}