---
id: reference
title: SQL-based ingestion reference
sidebar_label: Reference
---
This page describes SQL-based batch ingestion using the `druid-multi-stage-query` extension, new in Druid 24.0. Refer to the ingestion methods table to determine which ingestion method is right for you.

## SQL reference

This topic is a reference guide for the multi-stage query architecture in Apache Druid. For examples of real-world usage, refer to the Examples page.
## EXTERN

Use the `EXTERN` function to read external data.

Function format:

```sql
SELECT
 <column>
FROM TABLE(
  EXTERN(
    '<Druid input source>',
    '<Druid input format>',
    '<row signature>'
  )
)
```

`EXTERN` consists of the following parts:

- Any Druid input source as a JSON-encoded string.
- Any Druid input format as a JSON-encoded string.
- A row signature, as a JSON-encoded array of column descriptors. Each column descriptor must have a `name` and a `type`. The type can be `string`, `long`, `double`, or `float`. This row signature is used to map the external data into the SQL layer.

For more information, see Read external data with EXTERN.
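
For example, the following query (a sketch; the URL, column names, and data are hypothetical) reads newline-delimited JSON over HTTP and maps three columns into the SQL layer:

```sql
SELECT *
FROM TABLE(
  EXTERN(
    -- Druid input source, JSON-encoded (hypothetical URL)
    '{"type": "http", "uris": ["https://example.com/data/events.json.gz"]}',
    -- Druid input format, JSON-encoded
    '{"type": "json"}',
    -- Row signature: column names and types for the SQL layer
    '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}, {"name": "added", "type": "long"}]'
  )
)
LIMIT 10
```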
## `HTTP`, `INLINE` and `LOCALFILES`

While `EXTERN` allows you to specify an external table using JSON, other table functions allow you to describe the external table using SQL syntax. Each function works for one specific kind of input source. You provide properties using SQL named arguments. The row signature is given with the Druid SQL `EXTEND` keyword, using SQL syntax and types. Function format:
```sql
SELECT
 <column>
FROM TABLE(
  http(
    userName => 'bob',
    password => 'secret',
    uris => 'http://foo.com/bar.csv',
    format => 'csv'
  )
) EXTEND (x VARCHAR, y VARCHAR, z BIGINT)
```

Note that the `EXTEND` keyword is optional. The following is equally valid (and perhaps more convenient):

```sql
SELECT
 <column>
FROM TABLE(
  http(
    userName => 'bob',
    password => 'secret',
    uris => 'http://foo.com/bar.csv',
    format => 'csv'
  )
) (x VARCHAR, y VARCHAR, z BIGINT)
```
The set of table functions and formats is preliminary in this release.
### `HTTP`

The `HTTP` table function represents the `HttpInputSource` class in Druid, which allows you to read from an HTTP server. The function accepts the following arguments:

| Name | Description | JSON equivalent | Required |
|---|---|---|---|
| `userName` | Basic authentication user name | `httpAuthenticationUsername` | No |
| `password` | Basic authentication password | `httpAuthenticationPassword` | No |
| `passwordEnvVar` | Environment variable that contains the basic authentication password | `httpAuthenticationPassword` | No |
| `uris` | Comma-separated list of URIs to read. | `uris` | Yes |
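
For example, the following sketch keeps the password out of the query text by reading it from an environment variable (the URL and variable name are hypothetical):

```sql
SELECT *
FROM TABLE(
  http(
    userName => 'bob',
    -- Hypothetical environment variable holding the basic-auth password
    passwordEnvVar => 'HTTP_SOURCE_PASSWORD',
    uris => 'http://example.com/foo.csv',
    format => 'csv'
  )
) (x VARCHAR, y VARCHAR, z BIGINT)
```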
### `INLINE`

The `INLINE` table function represents the `InlineInputSource` class in Druid, which provides data directly in the table function. The function accepts the following arguments:

| Name | Description | JSON equivalent | Required |
|---|---|---|---|
| `data` | Text lines of inline data. Separate lines with a newline. | `data` | Yes |
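
A minimal sketch of `INLINE` usage, assuming CSV-formatted data with rows separated by a newline embedded directly in the string literal:

```sql
SELECT *
FROM TABLE(
  inline(
    -- Two hypothetical CSV rows, separated by a newline
    data => 'a,b,1
c,d,2',
    format => 'csv'
  )
) (x VARCHAR, y VARCHAR, z BIGINT)
```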
### `LOCALFILES`

The `LOCALFILES` table function represents the `LocalInputSource` class in Druid, which reads files from the file system of the node running Druid. This is most useful for single-node installations. The function accepts the following arguments:

| Name | Description | JSON equivalent | Required |
|---|---|---|---|
| `baseDir` | Directory to read from. | `baseDir` | No |
| `filter` | Filter pattern to read. Example: `*.csv`. | `filter` | No |
| `files` | Comma-separated list of files to read. | `files` | No |

You must provide either the `baseDir` or the list of `files`. You can provide both, in which case the files are assumed to be relative to the `baseDir`. If you provide a `filter`, you must also provide the `baseDir`.
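
For example, a sketch that reads every CSV file under a hypothetical directory on the Druid node:

```sql
SELECT *
FROM TABLE(
  localfiles(
    -- Hypothetical directory on the node running Druid
    baseDir => '/tmp/data',
    -- filter requires baseDir to be set
    filter => '*.csv',
    format => 'csv'
  )
) (x VARCHAR, y VARCHAR, z BIGINT)
```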
### Table Function Format

Each of the table functions above requires that you specify a format.

| Name | Description | JSON equivalent | Required |
|---|---|---|---|
| `format` | The input format, using the same names as for `EXTERN`. | `inputFormat.type` | Yes |
### CSV Format

Use the `csv` format to read from CSV. This choice selects the Druid `CsvInputFormat` class.

| Name | Description | JSON equivalent | Required |
|---|---|---|---|
| `listDelimiter` | The delimiter to use for fields that represent a list of strings. | `listDelimiter` | No |
| `skipRows` | The number of rows to skip at the start of the file. Default is 0. | `skipHeaderRows` | No |

MSQ does not have the ability to infer schema from a CSV file, so the `findColumnsFromHeader` property is unavailable. Instead, columns are given using the `EXTEND` syntax described above.
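
As a sketch, assuming format properties such as `skipRows` can be passed as additional named arguments next to `format` (and using a hypothetical URL), the following skips a header row and supplies the columns explicitly:

```sql
SELECT *
FROM TABLE(
  http(
    uris => 'http://example.com/foo.csv',
    format => 'csv',
    -- Skip the header row; the column signature below supplies the schema
    skipRows => 1
  )
) (x VARCHAR, y VARCHAR, z BIGINT)
```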
### Delimited Text Format

Use the `tsv` format to read from an arbitrary delimited (CSV-like) file such as tab-delimited, pipe-delimited, etc. This choice selects the Druid `DelimitedInputFormat` class.

| Name | Description | JSON equivalent | Required |
|---|---|---|---|
| `delimiter` | The delimiter which separates fields. | `delimiter` | Yes |
| `listDelimiter` | The delimiter to use for fields that represent a list of strings. | `listDelimiter` | No |
| `skipRows` | The number of rows to skip at the start of the file. Default is 0. | `skipHeaderRows` | No |

As noted above, MSQ cannot infer schema using headers. Use `EXTEND` instead.
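
A sketch of reading a pipe-delimited file, under the same assumption that format properties are passed as named arguments (hypothetical URL):

```sql
SELECT *
FROM TABLE(
  http(
    uris => 'http://example.com/foo.psv',
    -- 'tsv' selects DelimitedInputFormat; the delimiter itself is required
    format => 'tsv',
    delimiter => '|'
  )
) (x VARCHAR, y VARCHAR, z BIGINT)
```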
### JSON Format

Use the `json` format to read from a JSON input source. This choice selects the Druid `JsonInputFormat` class.

| Name | Description | JSON equivalent | Required |
|---|---|---|---|
| `keepNulls` | Whether to keep null values. Defaults to `false`. | `keepNullColumns` | No |
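
A sketch of reading newline-delimited JSON over HTTP (hypothetical URL); the column list still supplies the names and SQL types:

```sql
SELECT *
FROM TABLE(
  http(
    uris => 'http://example.com/events.json',
    format => 'json'
  )
) (ts VARCHAR, page VARCHAR, added BIGINT)
```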
## INSERT

Use the `INSERT` statement to insert data.

Unlike standard SQL, `INSERT` loads data into the target table according to column name, not positionally. If necessary, use `AS` in your `SELECT` column list to assign the correct names. Do not rely on their positions within the `SELECT` clause.

Statement format:

```sql
INSERT INTO <table name>
< SELECT query >
PARTITIONED BY <time frame>
[ CLUSTERED BY <column list> ]
```

`INSERT` consists of the following parts:

- Optional context parameters.
- An `INSERT INTO <dataSource>` clause at the start of your query, such as `INSERT INTO your-table`.
- A clause for the data you want to insert, such as `SELECT ... FROM ...`. You can use `EXTERN` to reference external tables using `FROM TABLE(EXTERN(...))`.
- A `PARTITIONED BY` clause, such as `PARTITIONED BY DAY`.
- An optional `CLUSTERED BY` clause.

For more information, see Load data with INSERT.
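
For example, a sketch of a complete INSERT that reads the hypothetical external JSON source from the `EXTERN` example above, names the columns with `AS`, and partitions by day:

```sql
INSERT INTO "your-table"
SELECT
  -- Map the external string column onto Druid's __time column
  TIME_PARSE("timestamp") AS __time,
  page,
  added
FROM TABLE(
  EXTERN(
    '{"type": "http", "uris": ["https://example.com/data/events.json.gz"]}',
    '{"type": "json"}',
    '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}, {"name": "added", "type": "long"}]'
  )
)
PARTITIONED BY DAY
```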
## REPLACE

You can use the `REPLACE` function to replace all or some of the data.

Unlike standard SQL, `REPLACE` loads data into the target table according to column name, not positionally. If necessary, use `AS` in your `SELECT` column list to assign the correct names. Do not rely on their positions within the `SELECT` clause.

### `REPLACE` all data

Function format to replace all data:

```sql
REPLACE INTO <target table>
OVERWRITE ALL
< SELECT query >
PARTITIONED BY <time granularity>
[ CLUSTERED BY <column list> ]
```

### `REPLACE` specific time ranges

Function format to replace specific time ranges:

```sql
REPLACE INTO <target table>
OVERWRITE WHERE __time >= TIMESTAMP '<lower bound>' AND __time < TIMESTAMP '<upper bound>'
< SELECT query >
PARTITIONED BY <time granularity>
[ CLUSTERED BY <column list> ]
```

`REPLACE` consists of the following parts:

- Optional context parameters.
- A `REPLACE INTO <dataSource>` clause at the start of your query, such as `REPLACE INTO "your-table"`.
- An `OVERWRITE` clause after the datasource, either `OVERWRITE ALL` or `OVERWRITE WHERE`:
  - `OVERWRITE ALL` replaces the entire existing datasource with the results of the query.
  - `OVERWRITE WHERE` drops the time segments that match the condition you set. Conditions are based on the `__time` column and use the format `__time [< > = <= >=] TIMESTAMP`. Use them with AND, OR, and NOT between them, inclusive of the timestamps specified. No other expressions or functions are valid in `OVERWRITE`.
- A clause for the actual data you want to use for the replacement.
- A `PARTITIONED BY` clause, such as `PARTITIONED BY DAY`.
- An optional `CLUSTERED BY` clause.

For more information, see Overwrite data with REPLACE.
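
For example, a sketch that rewrites a single day of a hypothetical datasource, reading the replacement rows from the datasource itself:

```sql
REPLACE INTO "your-table"
OVERWRITE WHERE __time >= TIMESTAMP '2023-01-01 00:00:00' AND __time < TIMESTAMP '2023-01-02 00:00:00'
SELECT
  __time,
  page,
  added
FROM "your-table"
-- Select only the day being overwritten so no row falls outside the OVERWRITE bounds
WHERE __time >= TIMESTAMP '2023-01-01 00:00:00' AND __time < TIMESTAMP '2023-01-02 00:00:00'
  -- Hypothetical additional filter applied while rewriting the day
  AND page <> 'Main_Page'
PARTITIONED BY DAY
```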
## PARTITIONED BY

The `PARTITIONED BY <time granularity>` clause is required for INSERT and REPLACE. See Partitioning for details.

The following granularity arguments are accepted:

- Time unit: `HOUR`, `DAY`, `MONTH`, or `YEAR`. Equivalent to `FLOOR(__time TO TimeUnit)`.
- `TIME_FLOOR(__time, 'granularity_string')`, where `granularity_string` is one of the ISO 8601 periods listed below. The first argument must be `__time`.
- `FLOOR(__time TO TimeUnit)`, where `TimeUnit` is any unit supported by the FLOOR function. The first argument must be `__time`.
- `ALL` or `ALL TIME`, which effectively disables time partitioning by placing all data in a single time chunk. To use LIMIT or OFFSET at the outer level of your INSERT or REPLACE query, you must set PARTITIONED BY to ALL or ALL TIME.

The following ISO 8601 periods are supported for `TIME_FLOOR`:

- `PT1S`
- `PT1M`
- `PT5M`
- `PT10M`
- `PT15M`
- `PT30M`
- `PT1H`
- `PT6H`
- `P1D`
- `P1W`
- `P1M`
- `P3M`
- `P1Y`

For more information about partitioning, see Partitioning.
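
For example, a sketch that partitions by 15-minute time chunks using `TIME_FLOOR` (hypothetical table and source from the earlier examples):

```sql
INSERT INTO "your-table"
SELECT
  TIME_PARSE("timestamp") AS __time,
  page
FROM TABLE(
  EXTERN(
    '{"type": "http", "uris": ["https://example.com/data/events.json.gz"]}',
    '{"type": "json"}',
    '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}]'
  )
)
-- PT15M is one of the supported ISO 8601 periods listed above
PARTITIONED BY TIME_FLOOR(__time, 'PT15M')
```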
## CLUSTERED BY

The `CLUSTERED BY <column list>` clause is optional for INSERT and REPLACE. It accepts a list of column names or expressions.

For more information about clustering, see Clustering.
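
For example, a sketch that clusters each day's segments by country and city (hypothetical table and inline data):

```sql
INSERT INTO "your-table"
SELECT
  TIME_PARSE(ts) AS __time,
  country,
  city
FROM TABLE(
  inline(
    -- A single hypothetical CSV row: timestamp, country, city
    data => '2023-01-01T00:00:00Z,US,Austin',
    format => 'csv'
  )
) (ts VARCHAR, country VARCHAR, city VARCHAR)
PARTITIONED BY DAY
CLUSTERED BY country, city
```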
## Context parameters

In addition to the Druid SQL context parameters, the multi-stage query task engine accepts certain context parameters that are specific to it.

Use context parameters alongside your queries to customize the behavior of the query. If you're using the API, include the context parameters in the query context when you submit a query:

```json
{
  "query": "SELECT 1 + 1",
  "context": {
    "<key>": "<value>",
    "maxNumTasks": 3
  }
}
```

If you're using the web console, you can specify the context parameters through various UI options.

The following table lists the context parameters for the MSQ task engine:

| Parameter | Description | Default value |
|---|---|---|
| `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.<br /><br />May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority. | 2 |
| `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many tasks to use. Possible values include `max` and `auto`. | `max` |
| `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than the finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see SQL aggregation functions. | true |
| `rowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the known issues around memory usage. | 100,000 |
| `segmentSortOrder` | INSERT or REPLACE<br /><br />Normally, Druid sorts rows in individual segments using `__time` first, followed by the `CLUSTERED BY` clause. When you set `segmentSortOrder`, Druid sorts rows in segments using this column list first, followed by the `CLUSTERED BY` order.<br /><br />You provide the column list as comma-separated values or as a JSON array in string form. If your query includes `__time`, then this list must begin with `__time`. For example, consider an INSERT query that uses `CLUSTERED BY country` and has `segmentSortOrder` set to `__time,city`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts those rows by `__time` first, then `city`, then `country`. | empty list |
| `maxParseExceptions` | SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1. | 0 |
| `rowsPerSegment` | INSERT or REPLACE<br /><br />The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see Segment Size Optimization. | 3,000,000 |
| `indexSpec` | INSERT or REPLACE<br /><br />An `indexSpec` to use when generating segments. May be a JSON string or object. See Front coding for details on configuring an `indexSpec` with front coding. | See `indexSpec`. |
| `clusterStatisticsMergeMode` | Whether to use parallel or sequential mode for merging of the worker sketches. Can be `PARALLEL`, `SEQUENTIAL` or `AUTO`. See Sketch Merging Mode for more information. | `PARALLEL` |
## Sketch Merging Mode

This section details the advantages and performance of the various cluster statistics merge modes.

If a query requires key statistics to generate partition boundaries, key statistics are gathered by the workers while reading rows from the datasource. These statistics must be transferred to the controller to be merged together. `clusterStatisticsMergeMode` configures the way in which this happens.

`PARALLEL` mode fetches the key statistics for all time chunks from all workers together, and the controller then downsamples the sketch if it does not fit in memory. This is faster than `SEQUENTIAL` mode because there is less overhead in fetching sketches for all time chunks together. This mode is a good choice when the sketches are small enough that they won't be downsampled even after merging, or when accuracy in segment sizing for the ingestion is not very important.

`SEQUENTIAL` mode fetches the sketches in ascending order of time and generates the partition boundaries for one time chunk at a time. This gives the controller more working memory for merging sketches, which results in less downsampling and thus more accuracy. There is, however, a time overhead from fetching sketches in sequential order. This mode is a good choice when accuracy is important.

`AUTO` mode tries to find the best approach based on the number of workers and the size of the input rows. If there are more than 100 workers, or if the combined sketch size among all workers is more than 1 GB, `SEQUENTIAL` is chosen; otherwise, `PARALLEL` is chosen.
## Durable Storage

This section enumerates the advantages and performance implications of enabling durable storage while executing MSQ tasks.

To prevent durable storage from filling up with temporary files when tasks fail to clean them up, you can schedule a periodic cleaner that cleans the directories for which no controller task is running. The cleaner uses the storage connector to operate on the durable storage. Use the durable storage location only to store output for the cluster's MSQ tasks; if the location contains other files or directories, they will be cleaned up as well. The following table lists the properties that control the behavior of durable storage for the cluster.

| Parameter | Default | Description |
|---|---|---|
| `druid.msq.intermediate.storage.enable` | true | Whether to enable durable storage for the cluster. |
| `druid.msq.intermediate.storage.cleaner.enabled` | false | Whether the durable storage cleaner should be enabled for the cluster. Set this on the Overlord. |
| `druid.msq.intermediate.storage.cleaner.delaySeconds` | 86400 | The delay (in seconds) after the last run after which the durable storage cleaner cleans up the outputs. Set this on the Overlord. |
## Limits

Knowing the limits for the MSQ task engine can help you troubleshoot any errors that you encounter. Many of the errors occur as a result of reaching a limit.

The following table lists query limits:

| Limit | Value | Error if exceeded |
|---|---|---|
| Size of an individual row written to a frame. Row size when written to a frame may differ from the original row size. | 1 MB | `RowTooLarge` |
| Number of segment-granular time chunks encountered during ingestion. | 5,000 | `TooManyBuckets` |
| Number of input files/segments per worker. | 10,000 | `TooManyInputFiles` |
| Number of output partitions for any one stage. Number of segments generated during ingestion. | 25,000 | `TooManyPartitions` |
| Number of output columns for any one stage. | 2,000 | `TooManyColumns` |
| Number of CLUSTERED BY columns that can appear in a stage. | 1,500 | `TooManyClusteredByColumns` |
| Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers` |
| Maximum memory occupied by broadcast tables. | 30% of each processor memory bundle. | `BroadcastTablesTooLarge` |
## Error codes

The following table describes error codes you may encounter in the `multiStageQuery.payload.status.errorReport.error.errorCode` field:

| Code | Meaning | Additional fields |
|---|---|---|
| `BroadcastTablesTooLarge` | The size of the broadcast tables used in the right-hand side of the join exceeded the memory reserved for them in a worker task. Try increasing the peon memory or reducing the size of the broadcast tables. | `maxBroadcastTablesSize`: Memory reserved for the broadcast tables, measured in bytes. |
| `Canceled` | The query was canceled. Common reasons for cancellation: | |
| `CannotParseExternalData` | A worker task could not parse data from an external datasource. | `errorMessage`: More details on why parsing failed. |
| `ColumnNameRestricted` | The query uses a restricted column name. | `columnName`: The restricted column name. |
| `ColumnTypeNotSupported` | The column type is not supported. This can be because: | `columnName`: The column name with an unsupported type.<br />`columnType`: The unknown column type. |
| `InsertCannotAllocateSegment` | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts: | `dataSource`<br />`interval`: The interval for the attempted new segment allocation. |
| `InsertCannotBeEmpty` | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` |
| `InsertCannotOrderByDescending` | An INSERT query contained a `CLUSTERED BY` expression in descending order. Druid's segment generation code only supports ascending order. | `columnName` |
| `InsertCannotReplaceExistingSegment` | A REPLACE query cannot proceed because an existing segment partially overlaps those bounds, and the portion within the bounds is not fully overshadowed by query results. There are two ways to address this without modifying your query: | `segmentId`: The existing segment. |
| `InsertLockPreempted` | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | |
| `InsertTimeNull` | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.<br /><br />This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. (`TIME_PARSE` returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern.<br /><br />If your timestamps may genuinely be null, consider using `COALESCE` to provide a default value. One option is `CURRENT_TIMESTAMP`, which represents the start time of the job. | |
| `InsertTimeOutOfBounds` | A REPLACE query generated a timestamp outside the bounds of the TIMESTAMP parameter for your OVERWRITE WHERE clause. To avoid this error, verify that the interval you specified in the OVERWRITE WHERE clause is valid. | `interval`: Time chunk interval corresponding to the out-of-bounds timestamp. |
| `InvalidNullByte` | A string column included a null byte. Null bytes in strings are not permitted. | `column`: The column that included the null byte. |
| `QueryNotSupported` | QueryKit could not translate the provided native query to a multi-stage query. This can happen if the query uses features that aren't supported, like GROUPING SETS. | |
| `RowTooLarge` | The query tried to process a row that was too large to write to a single frame. See the Limits table for specific limits on frame size. Note that the effective maximum row size is smaller than the maximum frame size due to alignment considerations during frame writing. | `maxFrameSize`: The limit on the frame size. |
| `TaskStartTimeout` | Unable to launch all the worker tasks in time. There might be insufficient available slots to start all the worker tasks simultaneously. Try splitting up the query into smaller chunks using a smaller `maxNumTasks` value. Another option is to increase capacity. | `numTasks`: The number of tasks attempted to launch. |
| `TooManyBuckets` | Exceeded the maximum number of partition buckets for a stage (5,000 partition buckets).<br />Partition buckets are created for each `PARTITIONED BY` time chunk for INSERT and REPLACE queries. The most common reason for this error is that your `PARTITIONED BY` is too narrow relative to your data. | `maxBuckets`: The limit on partition buckets. |
| `TooManyInputFiles` | Exceeded the maximum number of input files or segments per worker (10,000 files or segments). If you encounter this limit, consider adding more workers, or breaking up your query into smaller queries that process fewer files or segments per query. | `numInputFiles`: The total number of input files/segments for the stage.<br />`maxInputFiles`: The maximum number of input files/segments per worker per stage.<br />`minNumWorker`: The minimum number of workers required for a successful run. |
| `TooManyPartitions` | Exceeded the maximum number of partitions for a stage (25,000 partitions). This can occur with INSERT or REPLACE statements that generate large numbers of segments, since each segment is associated with a partition. If you encounter this limit, consider breaking up your INSERT or REPLACE statement into smaller statements that process less data per statement. | `maxPartitions`: The limit on partitions which was exceeded. |
| `TooManyClusteredByColumns` | Exceeded the maximum number of clustering columns for a stage (1,500 columns). This can occur with `CLUSTERED BY`, `ORDER BY`, or `GROUP BY` with a large number of columns. | `numColumns`: The number of columns requested.<br />`maxColumns`: The limit on columns which was exceeded.<br />`stage`: The stage number exceeding the limit. |
| `TooManyColumns` | Exceeded the maximum number of columns for a stage (2,000 columns). | `numColumns`: The number of columns requested.<br />`maxColumns`: The limit on columns which was exceeded. |
| `TooManyWarnings` | Exceeded the maximum allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit.<br />`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. |
| `TooManyWorkers` | Exceeded the maximum number of simultaneously-running workers. See the Limits table for more details. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously.<br />`maxWorkers`: The hard or soft limit on workers that was exceeded. If this is lower than the hard limit (1,000 workers), then you can increase the limit by adding more memory to each task. |
| `NotEnoughMemory` | Insufficient memory to launch a stage. | `serverMemory`: The amount of memory available to a single process.<br />`serverWorkers`: The number of workers running in a single process.<br />`serverThreads`: The number of threads in a single process. |
| `WorkerFailed` | A worker task failed unexpectedly. | `errorMsg`<br />`workerTaskId`: The ID of the worker task. |
| `WorkerRpcFailed` | A remote procedure call to a worker task failed and could not recover. | `workerTaskId`: The ID of the worker task. |
| `UnknownError` | All other errors. | `message` |