druid/docs/querying/sql-translation.md

22 KiB

id title sidebar_label
sql-translation SQL query translation SQL query translation

Apache Druid supports two query languages: Druid SQL and native queries. This document describes the Druid SQL language.

Druid uses Apache Calcite to parse and plan SQL queries. Druid translates SQL statements into its native JSON-based query language. In general, the slight overhead of translating SQL on the Broker is the only minor performance penalty to using Druid SQL compared to native queries.

This topic includes best practices and tools to help you achieve good performance and minimize the impact of translation.

Best practices

Consider the following non-exhaustive list of best practices when looking into performance implications of translating Druid SQL queries to native queries.

  1. If you wrote a filter on the primary time column __time, make sure it is being correctly translated to an "intervals" filter, as described in the Time filters section below. If not, you may need to change the way you write the filter.

  2. Try to avoid subqueries underneath joins: they affect both performance and scalability. This includes implicit subqueries generated by conditions on mismatched types, and implicit subqueries generated by conditions that use expressions to refer to the right-hand side.

  3. Currently, Druid does not support pushing down predicates (condition and filter) past a Join (i.e. into Join's children). Druid only supports pushing predicates into the join if they originated from above the join. Hence, the location of predicates and filters in your Druid SQL is very important. Also, as a result of this, comma joins should be avoided.

  4. Read through the Query execution page to understand how various types of native queries will be executed.

  5. Be careful when interpreting EXPLAIN PLAN output, and use request logging if in doubt. Request logs will show the exact native query that was run. See the next section for more details.

  6. If you encounter a query that could be planned better, feel free to raise an issue on GitHub. A reproducible test case is always appreciated.

Interpreting EXPLAIN PLAN output

The EXPLAIN PLAN functionality can help you understand how a given SQL query will be translated to native. EXPLAIN PLAN statements return:

  • a PLAN column that contains a JSON array of native queries that Druid will run
  • a RESOURCES column that describes the resources used in the query
  • an ATTRIBUTES column that describes the attributes of the query, including:
    • statementType: the SQL statement type
    • targetDataSource: the target datasource in an INSERT or REPLACE statement
    • partitionedBy: the time-based partitioning granularity in an INSERT or REPLACE statement
    • clusteredBy: the clustering columns in an INSERT or REPLACE statement
    • replaceTimeChunks: the time chunks in a REPLACE statement

Example 1: EXPLAIN PLAN for a SELECT query on the wikipedia datasource:

EXPLAIN PLAN FOR
SELECT
  channel,
  COUNT(*)
FROM wikipedia
WHERE channel IN (SELECT page FROM wikipedia GROUP BY page ORDER BY COUNT(*) DESC LIMIT 10)
GROUP BY channel

The above EXPLAIN PLAN query returns the following result:

[
  [
    {
      "query": {
        "queryType": "topN",
        "dataSource": {
          "type": "join",
          "left": {
            "type": "table",
            "name": "wikipedia"
          },
          "right": {
            "type": "query",
            "query": {
              "queryType": "groupBy",
              "dataSource": {
                "type": "table",
                "name": "wikipedia"
              },
              "intervals": {
                "type": "intervals",
                "intervals": [
                  "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
                ]
              },
              "granularity": {
                "type": "all"
              },
              "dimensions": [
                {
                  "type": "default",
                  "dimension": "page",
                  "outputName": "d0",
                  "outputType": "STRING"
                }
              ],
              "aggregations": [
                {
                  "type": "count",
                  "name": "a0"
                }
              ],
              "limitSpec": {
                "type": "default",
                "columns": [
                  {
                    "dimension": "a0",
                    "direction": "descending",
                    "dimensionOrder": {
                      "type": "numeric"
                    }
                  }
                ],
                "limit": 10
              },
              "context": {
                "sqlOuterLimit": 101,
                "sqlQueryId": "ee616a36-c30c-4eae-af00-245127956e42",
                "useApproximateCountDistinct": false,
                "useApproximateTopN": false
              }
            }
          },
          "rightPrefix": "j0.",
          "condition": "(\"channel\" == \"j0.d0\")",
          "joinType": "INNER"
        },
        "dimension": {
          "type": "default",
          "dimension": "channel",
          "outputName": "d0",
          "outputType": "STRING"
        },
        "metric": {
          "type": "dimension",
          "ordering": {
            "type": "lexicographic"
          }
        },
        "threshold": 101,
        "intervals": {
          "type": "intervals",
          "intervals": [
            "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
          ]
        },
        "granularity": {
          "type": "all"
        },
        "aggregations": [
          {
            "type": "count",
            "name": "a0"
          }
        ],
        "context": {
          "sqlOuterLimit": 101,
          "sqlQueryId": "ee616a36-c30c-4eae-af00-245127956e42",
          "useApproximateCountDistinct": false,
          "useApproximateTopN": false
        }
      },
      "signature": [
        {
          "name": "d0",
          "type": "STRING"
        },
        {
          "name": "a0",
          "type": "LONG"
        }
      ],
      "columnMappings": [
        {
          "queryColumn": "d0",
          "outputColumn": "channel"
        },
        {
          "queryColumn": "a0",
          "outputColumn": "EXPR$1"
        }
      ]
    }
  ],
  [
    {
      "name": "wikipedia",
      "type": "DATASOURCE"
    }
  ],
  {
    "statementType": "SELECT"
  }
]

Example 2: EXPLAIN PLAN for a REPLACE query that replaces all the data in the wikipedia datasource:

EXPLAIN PLAN FOR
REPLACE INTO wikipedia
OVERWRITE ALL
SELECT
  TIME_PARSE("timestamp") AS __time,
  namespace,
  cityName,
  countryName,
  regionIsoCode,
  metroCode,
  countryIsoCode,
  regionName
FROM TABLE(
    EXTERN(
      '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}',
      '{"type":"json"}',
      '[{"name":"timestamp","type":"string"},{"name":"namespace","type":"string"},{"name":"cityName","type":"string"},{"name":"countryName","type":"string"},{"name":"regionIsoCode","type":"string"},{"name":"metroCode","type":"long"},{"name":"countryIsoCode","type":"string"},{"name":"regionName","type":"string"}]'
    )
  )
PARTITIONED BY HOUR
CLUSTERED BY cityName

The above EXPLAIN PLAN query returns the following result:

[
  [
    {
      "query": {
        "queryType": "scan",
        "dataSource": {
          "type": "external",
          "inputSource": {
            "type": "http",
            "uris": [
              "https://druid.apache.org/data/wikipedia.json.gz"
            ]
          },
          "inputFormat": {
            "type": "json",
            "keepNullColumns": false,
            "assumeNewlineDelimited": false,
            "useJsonNodeReader": false
          },
          "signature": [
            {
              "name": "timestamp",
              "type": "STRING"
            },
            {
              "name": "namespace",
              "type": "STRING"
            },
            {
              "name": "cityName",
              "type": "STRING"
            },
            {
              "name": "countryName",
              "type": "STRING"
            },
            {
              "name": "regionIsoCode",
              "type": "STRING"
            },
            {
              "name": "metroCode",
              "type": "LONG"
            },
            {
              "name": "countryIsoCode",
              "type": "STRING"
            },
            {
              "name": "regionName",
              "type": "STRING"
            }
          ]
        },
        "intervals": {
          "type": "intervals",
          "intervals": [
            "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
          ]
        },
        "virtualColumns": [
          {
            "type": "expression",
            "name": "v0",
            "expression": "timestamp_parse(\"timestamp\",null,'UTC')",
            "outputType": "LONG"
          }
        ],
        "resultFormat": "compactedList",
        "orderBy": [
          {
            "columnName": "cityName",
            "order": "ascending"
          }
        ],
        "columns": [
          "cityName",
          "countryIsoCode",
          "countryName",
          "metroCode",
          "namespace",
          "regionIsoCode",
          "regionName",
          "v0"
        ],
        "legacy": false,
        "context": {
          "finalizeAggregations": false,
          "groupByEnableMultiValueUnnesting": false,
          "maxNumTasks": 5,
          "queryId": "b474c0d5-a5ce-432d-be94-535ccdb7addc",
          "scanSignature": "[{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"countryName\",\"type\":\"STRING\"},{\"name\":\"metroCode\",\"type\":\"LONG\"},{\"name\":\"namespace\",\"type\":\"STRING\"},{\"name\":\"regionIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"},{\"name\":\"v0\",\"type\":\"LONG\"}]",
          "sqlInsertSegmentGranularity": "\"HOUR\"",
          "sqlQueryId": "b474c0d5-a5ce-432d-be94-535ccdb7addc",
          "sqlReplaceTimeChunks": "all"
        },
        "granularity": {
          "type": "all"
        }
      },
      "signature": [
        {
          "name": "v0",
          "type": "LONG"
        },
        {
          "name": "namespace",
          "type": "STRING"
        },
        {
          "name": "cityName",
          "type": "STRING"
        },
        {
          "name": "countryName",
          "type": "STRING"
        },
        {
          "name": "regionIsoCode",
          "type": "STRING"
        },
        {
          "name": "metroCode",
          "type": "LONG"
        },
        {
          "name": "countryIsoCode",
          "type": "STRING"
        },
        {
          "name": "regionName",
          "type": "STRING"
        }
      ],
      "columnMappings": [
        {
          "queryColumn": "v0",
          "outputColumn": "__time"
        },
        {
          "queryColumn": "namespace",
          "outputColumn": "namespace"
        },
        {
          "queryColumn": "cityName",
          "outputColumn": "cityName"
        },
        {
          "queryColumn": "countryName",
          "outputColumn": "countryName"
        },
        {
          "queryColumn": "regionIsoCode",
          "outputColumn": "regionIsoCode"
        },
        {
          "queryColumn": "metroCode",
          "outputColumn": "metroCode"
        },
        {
          "queryColumn": "countryIsoCode",
          "outputColumn": "countryIsoCode"
        },
        {
          "queryColumn": "regionName",
          "outputColumn": "regionName"
        }
      ]
    }
  ],
  [
    {
      "name": "EXTERNAL",
      "type": "EXTERNAL"
    },
    {
      "name": "wikipedia",
      "type": "DATASOURCE"
    }
  ],
  {
    "statementType": "REPLACE",
    "targetDataSource": "wikipedia",
    "partitionedBy": "HOUR",
    "clusteredBy": "`cityName`",
    "replaceTimeChunks": "'ALL'"
  }
]

In this case the JOIN operator gets translated to a join datasource. See the Join translation section for more details about how this works.

We can see this for ourselves using Druid's request logging feature. After enabling logging and running this query, we can see that it actually runs as the following native query.

{
  "queryType": "groupBy",
  "dataSource": {
    "type": "join",
    "left": "wikipedia",
    "right": {
      "type": "query",
      "query": {
        "queryType": "topN",
        "dataSource": "wikipedia",
        "dimension": {"type": "default", "dimension": "page", "outputName": "d0"},
        "metric": {"type": "numeric", "metric": "a0"},
        "threshold": 10,
        "intervals": "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z",
        "granularity": "all",
        "aggregations": [
          { "type": "count", "name": "a0"}
        ]
      }
    },
    "rightPrefix": "j0.",
    "condition": "(\"page\" == \"j0.d0\")",
    "joinType": "INNER"
  },
  "intervals": "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z",
  "granularity": "all",
  "dimensions": [
    {"type": "default", "dimension": "channel", "outputName": "d0"}
  ],
  "aggregations": [
    { "type": "count", "name": "a0"}
  ]
}

Query types

Druid SQL uses four different native query types.

  • Scan is used for queries that do not aggregate—no GROUP BY, no DISTINCT.

  • Timeseries is used for queries that GROUP BY FLOOR(__time TO unit) or TIME_FLOOR(__time, period), have no other grouping expressions, no HAVING clause, no nesting, and either no ORDER BY, or an ORDER BY that orders by same expression as present in GROUP BY. It also uses Timeseries for "grand total" queries that have aggregation functions but no GROUP BY. This query type takes advantage of the fact that Druid segments are sorted by time.

  • TopN is used by default for queries that group by a single expression, do have ORDER BY and LIMIT clauses, do not have HAVING clauses, and are not nested. However, the TopN query type will deliver approximate ranking and results in some cases; if you want to avoid this, set "useApproximateTopN" to "false". TopN results are always computed in memory. See the TopN documentation for more details.

  • GroupBy is used for all other aggregations, including any nested aggregation queries. Druid's GroupBy is a traditional aggregation engine: it delivers exact results and rankings and supports a wide variety of features. GroupBy aggregates in memory if it can, but it may spill to disk if it doesn't have enough memory to complete your query. Results are streamed back from data processes through the Broker if you ORDER BY the same expressions in your GROUP BY clause, or if you don't have an ORDER BY at all. If your query has an ORDER BY referencing expressions that don't appear in the GROUP BY clause (like aggregation functions) then the Broker will materialize a list of results in memory, up to a max of your LIMIT, if any. See the GroupBy documentation for details about tuning performance and memory use.

Time filters

For all native query types, filters on the __time column will be translated into top-level query "intervals" whenever possible, which allows Druid to use its global time index to quickly prune the set of data that must be scanned. Consider this (non-exhaustive) list of time filters that will be recognized and translated to "intervals":

  • __time >= TIMESTAMP '2000-01-01 00:00:00' (comparison to absolute time)
  • __time >= CURRENT_TIMESTAMP - INTERVAL '8' HOUR (comparison to relative time)
  • FLOOR(__time TO DAY) = TIMESTAMP '2000-01-01 00:00:00' (specific day)

Refer to the Interpreting EXPLAIN PLAN output section for details on confirming that time filters are being translated as you expect.

Joins

SQL join operators are translated to native join datasources as follows:

  1. Joins that the native layer can handle directly are translated literally, to a join datasource whose left, right, and condition are faithful translations of the original SQL. This includes any SQL join where the right-hand side is a lookup or subquery, and where the condition is an equality where one side is an expression based on the left-hand table, the other side is a simple column reference to the right-hand table, and both sides of the equality are the same data type.

  2. If a join cannot be handled directly by a native join datasource as written, Druid SQL will insert subqueries to make it runnable. For example, foo INNER JOIN bar ON foo.abc = LOWER(bar.def) cannot be directly translated, because there is an expression on the right-hand side instead of a simple column access. A subquery will be inserted that effectively transforms this clause to foo INNER JOIN (SELECT LOWER(def) AS def FROM bar) t ON foo.abc = t.def.

  3. Druid SQL does not currently reorder joins to optimize queries.

Refer to the Interpreting EXPLAIN PLAN output section for details on confirming that joins are being translated as you expect.

Refer to the Query execution page for information about how joins are executed.

Subqueries

Subqueries in SQL are generally translated to native query datasources. Refer to the Query execution page for information about how subqueries are executed.

Note: Subqueries in the WHERE clause, like WHERE col1 IN (SELECT foo FROM ...) are translated to inner joins.

Approximations

Druid SQL will use approximate algorithms in some situations:

  • The COUNT(DISTINCT col) aggregation functions by default uses a variant of HyperLogLog, a fast approximate distinct counting algorithm. Druid SQL will switch to exact distinct counts if you set "useApproximateCountDistinct" to "false", either through query context or through Broker configuration.

  • GROUP BY queries over a single column with ORDER BY and LIMIT may be executed using the TopN engine, which uses an approximate algorithm. Druid SQL will switch to an exact grouping algorithm if you set "useApproximateTopN" to "false", either through query context or through Broker configuration.

  • Aggregation functions that are labeled as using sketches or approximations, such as APPROX_COUNT_DISTINCT, are always approximate, regardless of configuration.

A known issue with approximate functions based on data sketches

The APPROX_QUANTILE_DS and DS_QUANTILES_SKETCH functions can fail with an IllegalStateException if one of the sketches for the query hits maxStreamLength: the maximum number of items to store in each sketch. See GitHub issue 11544 for more details. To workaround the issue, increase value of the maximum string length with the approxQuantileDsMaxStreamLength parameter in the query context. Since it is set to 1,000,000,000 by default, you don't need to override it in most cases. See accuracy information in the DataSketches documentation for how many bytes are required per stream length. This query context parameter is a temporary solution to avoid the known issue. It may be removed in a future release after the bug is fixed.

Unsupported features

Druid does not support all SQL features. In particular, the following features are not supported.

  • JOIN between native datasources (table, lookup, subquery) and system tables.
  • JOIN conditions that are not an equality between expressions from the left- and right-hand sides.
  • JOIN conditions containing a constant value inside the condition.
  • JOIN conditions on a column which contains a multi-value dimension.
  • OVER clauses, and analytic functions such as LAG and LEAD.
  • ORDER BY for a non-aggregating query, except for ORDER BY __time or ORDER BY __time DESC, which are supported. This restriction only applies to non-aggregating queries; you can ORDER BY any column in an aggregating query.
  • DDL and DML.
  • Using Druid-specific functions like TIME_PARSE and APPROX_QUANTILE_DS on system tables.

Additionally, some Druid native query features are not supported by the SQL language. Some unsupported Druid features include: