Docs: Add multi-dimension partitioning doc; refactor native batch and separate into smaller topics. (#11983)

Adds documentation for multi-dimension partitioning. cc: @kfaraz
Refactors the native batch partitioning topic as follows:

Native batch ingestion covers parallel-index
Native batch simple task indexing covers index
Native batch input sources covers ioSource
Native batch ingestion with firehose covers deprecated firehose
Charles Smith 2021-12-03 03:07:14 -08:00 committed by GitHub
parent 503384569a
commit 7ed46800c3
19 changed files with 1504 additions and 1386 deletions

@ -530,7 +530,7 @@ This deep storage is used to interface with Cassandra. Note that the `druid-cas
#### HDFS input source
You can set the following property to specify permissible protocols for
the [HDFS input source](../ingestion/native-batch.md#hdfs-input-source) and the [HDFS firehose](../ingestion/native-batch.md#hdfsfirehose).
the [HDFS input source](../ingestion/native-batch-input-source.md#hdfs-input-source) and the [HDFS firehose](../ingestion/native-batch-firehose.md#hdfsfirehose).
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
@ -540,7 +540,7 @@ the [HDFS input source](../ingestion/native-batch.md#hdfs-input-source) and the
#### HTTP input source
You can set the following property to specify permissible protocols for
the [HTTP input source](../ingestion/native-batch.md#http-input-source) and the [HTTP firehose](../ingestion/native-batch.md#httpfirehose).
the [HTTP input source](../ingestion/native-batch-input-source.md#http-input-source) and the [HTTP firehose](../ingestion/native-batch-firehose.md#httpfirehose).
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
@ -552,8 +552,8 @@ the [HTTP input source](../ingestion/native-batch.md#http-input-source) and the
#### JDBC Connections to External Databases
You can use the following properties to specify permissible JDBC options for:
- [SQL input source](../ingestion/native-batch.md#sql-input-source)
- [SQL firehose](../ingestion/native-batch.md#sqlfirehose),
- [SQL input source](../ingestion/native-batch-input-source.md#sql-input-source)
- [SQL firehose](../ingestion/native-batch-firehose.md#sqlfirehose),
- [globally cached JDBC lookups](../development/extensions-core/lookups-cached-global.md#jdbc-lookup)
- [JDBC Data Fetcher for per-lookup caching](../development/extensions-core/druid-lookups.md#data-fetcher-layer).
@ -981,7 +981,7 @@ The below is a list of the supported configurations for auto compaction.
|`indexSpecForIntermediatePersists`|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](../ingestion/ingestion-spec.md#indexspec) for possible values.|no|
|`maxPendingPersists`|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with `maxRowsInMemory` * (2 + `maxPendingPersists`).|no (default = 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`pushTimeout`|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|no (default = 0)|
|`segmentWriteOutMediumFactory`|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](../ingestion/native-batch.md#segmentwriteoutmediumfactory).|no (default is the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used)|
|`segmentWriteOutMediumFactory`|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](../ingestion/native-batch-simple-task.md#segmentwriteoutmediumfactory).|no (default is the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used)|
|`maxNumConcurrentSubTasks`|Maximum number of worker tasks which can be run in parallel at the same time. The supervisor task would spawn worker tasks up to `maxNumConcurrentSubTasks` regardless of the current available task slots. If this value is set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set to too large, too many worker tasks can be created which might block other ingestion. Check [Capacity Planning](../ingestion/native-batch.md#capacity-planning) for more details.|no (default = 1)|
|`maxRetry`|Maximum number of retries on task failures.|no (default = 3)|
|`maxNumSegmentsToMerge`|Max limit for the number of segments that a single task can merge at the same time in the second phase. Used only with `hashed` or `single_dim` partitionsSpec.|no (default = 100)|
@ -1409,7 +1409,7 @@ Additional peon configs include:
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
|`druid.indexer.task.restoreTasksOnRestart`|If true, MiddleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch.md#druid-input-source) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch-input-source.md) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
|`druid.indexer.server.maxChatRequests`|Maximum number of concurrent requests served by a task's chat handler. Set to 0 to disable limiting.|0|
If the peon is running in remote mode, there must be an Overlord up and running. Peons in remote mode can set the following configurations:
@ -1474,7 +1474,7 @@ then the value from the configuration below is used:
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on Indexer restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
|`druid.indexer.task.restoreTasksOnRestart`|If true, the Indexer will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch.md#druid-input-source) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch-input-source.md) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
|`druid.peon.taskActionClient.retry.minWait`|The minimum retry time to communicate with Overlord.|PT5S|
|`druid.peon.taskActionClient.retry.maxWait`|The maximum retry time to communicate with Overlord.|PT1M|
|`druid.peon.taskActionClient.retry.maxRetryCount`|The maximum number of retries to communicate with Overlord.|60|

@ -55,7 +55,7 @@ The storage account is shared with the one used for Rackspace's Cloud Files deep
As with the Azure blobstore, it is assumed to be gzipped if the extension ends in .gz
This firehose is _splittable_ and can be used by [native parallel index tasks](../../ingestion/native-batch.md#parallel-task).
This firehose is _splittable_ and can be used by [native parallel index tasks](../../ingestion/native-batch.md).
Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.
Sample spec:

@ -36,11 +36,11 @@ To configure connectivity to google cloud, run druid processes with `GOOGLE_APPL
### Reading data from Google Cloud Storage
The [Google Cloud Storage input source](../../ingestion/native-batch.md#google-cloud-storage-input-source) is supported by the [Parallel task](../../ingestion/native-batch.md#parallel-task)
The [Google Cloud Storage input source](../../ingestion/native-batch-input-source.md) is supported by the [Parallel task](../../ingestion/native-batch.md)
to read objects directly from Google Cloud Storage. If you use the [Hadoop task](../../ingestion/hadoop.md),
you can read data from Google Cloud Storage by specifying the paths in your [`inputSpec`](../../ingestion/hadoop.md#inputspec).
Objects can also be read directly from Google Cloud Storage via the [StaticGoogleBlobStoreFirehose](../../ingestion/native-batch.md#staticgoogleblobstorefirehose)
Objects can also be read directly from Google Cloud Storage via the [StaticGoogleBlobStoreFirehose](../../ingestion/native-batch-firehose.md#staticgoogleblobstorefirehose)
### Deep Storage

@ -153,12 +153,12 @@ Tested with Druid 0.17.0, Hadoop 2.8.5 and gcs-connector jar 2.0.0-hadoop2.
### Native batch ingestion
The [HDFS input source](../../ingestion/native-batch.md#hdfs-input-source) is supported by the [Parallel task](../../ingestion/native-batch.md#parallel-task)
The [HDFS input source](../../ingestion/native-batch-input-source.md#hdfs-input-source) is supported by the [Parallel task](../../ingestion/native-batch.md)
to read files directly from the HDFS Storage. You may be able to read objects from cloud storage
with the HDFS input source, but we highly recommend to use a proper
[Input Source](../../ingestion/native-batch.md#input-sources) instead if possible because
it is simple to set up. For now, only the [S3 input source](../../ingestion/native-batch.md#s3-input-source)
and the [Google Cloud Storage input source](../../ingestion/native-batch.md#google-cloud-storage-input-source)
[Input Source](../../ingestion/native-batch-input-source.md) instead if possible because
it is simple to set up. For now, only the [S3 input source](../../ingestion/native-batch-input-source.md#s3-input-source)
and the [Google Cloud Storage input source](../../ingestion/native-batch-input-source.md#google-cloud-storage-input-source)
are supported for cloud storage types, and so you may still want to use the HDFS input source
to read from cloud storage other than those two.

@ -127,8 +127,6 @@ If using the MariaDB connector library, set `druid.metadata.mysql.driver.driverC
### MySQL InputSource and Firehose
The MySQL extension provides a connector implementation of an [SqlInputSource](../../ingestion/native-batch.md#sql-input-source) and [SqlFirehose](../../ingestion/native-batch.md#firehoses-deprecated) which can be used to ingest data into Druid from a MySQL database. This works with either MySQL or MariaDB connector jars.
```json
{
"type": "index_parallel",

@ -87,7 +87,7 @@ In most cases, the configuration options map directly to the [postgres JDBC conn
### PostgreSQL Firehose
The PostgreSQL extension provides an implementation of an [SqlFirehose](../../ingestion/native-batch.md#firehoses-deprecated) which can be used to ingest data into Druid from a PostgreSQL database.
The PostgreSQL extension provides an implementation of an [SqlFirehose](../../ingestion/native-batch-firehose.md) which can be used to ingest data into Druid from a PostgreSQL database.
```json
{

@ -32,7 +32,7 @@ To use this Apache Druid extension, [include](../../development/extensions.md#lo
### Reading data from S3
The [S3 input source](../../ingestion/native-batch.md#s3-input-source) is supported by the [Parallel task](../../ingestion/native-batch.md#parallel-task)
The [S3 input source](../../ingestion/native-batch-input-source.md#s3-input-source) is supported by the [Parallel task](../../ingestion/native-batch.md)
to read objects directly from S3. If you use the [Hadoop task](../../ingestion/hadoop.md),
you can read data from S3 by specifying the S3 paths in your [`inputSpec`](../../ingestion/hadoop.md#inputspec).
@ -64,7 +64,7 @@ In addition to this you need to set additional configuration, specific for [deep
### S3 authentication methods
Druid uses the following credentials provider chain to connect to your S3 bucket (whether a deep storage bucket or source bucket).
**Note :** *You can override the default credentials provider chain for connecting to source bucket by specifying an access key and secret key using [Properties Object](../../ingestion/native-batch.md#s3-input-source) parameters in the ingestionSpec.*
**Note :** *You can override the default credentials provider chain for connecting to source bucket by specifying an access key and secret key using [Properties Object](../../ingestion/native-batch-input-source.md#s3-input-source) parameters in the ingestionSpec.*
|order|type|details|
|--------|-----------|-------|

@ -123,7 +123,7 @@ To control the number of result segments per time chunk, you can set [maxRowsPer
> You can run multiple compaction tasks in parallel. For example, if you want to compact the data for a year, you are not limited to running a single task for the entire year. You can run 12 compaction tasks with month-long intervals.
A compaction task internally generates an `index` task spec for performing compaction work with some fixed parameters. For example, its `inputSource` is always the [DruidInputSource](native-batch.md#druid-input-source), and `dimensionsSpec` and `metricsSpec` include all dimensions and metrics of the input segments by default.
A compaction task internally generates an `index` task spec for performing compaction work with some fixed parameters. For example, its `inputSource` is always the [DruidInputSource](./native-batch-input-source.md), and `dimensionsSpec` and `metricsSpec` include all dimensions and metrics of the input segments by default.
Compaction tasks exit without doing anything and issue a failure status code in either of the following cases:
- If the interval you specify has no data segments loaded<br>

@ -68,7 +68,7 @@ There are other types of `inputSpec` to enable reindexing and delta ingestion.
### Reindexing with Native Batch Ingestion
This section assumes you understand how to do batch ingestion without Hadoop using [native batch indexing](../ingestion/native-batch.md). Native batch indexing uses an `inputSource` to know where and how to read the input data. You can use the [`DruidInputSource`](native-batch.md#druid-input-source) to read data from segments inside Druid. You can use Parallel task (`index_parallel`) for all native batch reindexing tasks. Increase the `maxNumConcurrentSubTasks` to accommodate the amount of data your are reindexing. See [Capacity planning](native-batch.md#capacity-planning).
This section assumes you understand how to do batch ingestion without Hadoop using [native batch indexing](../ingestion/native-batch.md). Native batch indexing uses an `inputSource` to know where and how to read the input data. You can use the [`DruidInputSource`](./native-batch-input-source.md) to read data from segments inside Druid. You can use Parallel task (`index_parallel`) for all native batch reindexing tasks. Increase the `maxNumConcurrentSubTasks` to accommodate the amount of data your are reindexing. See [Capacity planning](native-batch.md#capacity-planning).
<a name="delete"></a>

@ -86,7 +86,7 @@ You can use a [segment metadata query](../querying/segmentmetadataquery.md) for
## How can I Reindex existing data in Druid with schema changes?
You can use DruidInputSource with the [Parallel task](../ingestion/native-batch.md) to ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment.
See [DruidInputSource](../ingestion/native-batch.md#druid-input-source) for more details.
See [DruidInputSource](./native-batch-input-source.md) for more details.
Or, if you use hadoop based ingestion, then you can use "dataSource" input spec to do reindexing.
See the [Update existing data](../ingestion/data-management.md#update) section of the data management page for more details.
@ -95,7 +95,7 @@ See the [Update existing data](../ingestion/data-management.md#update) section o
In a lot of situations you may want coarser granularity for older data. Example, any data older than 1 month has only hour level granularity but newer data has minute level granularity. This use case is same as re-indexing.
To do this use the [DruidInputSource](../ingestion/native-batch.md#druid-input-source) and run a [Parallel task](../ingestion/native-batch.md). The DruidInputSource will allow you to take in existing segments from Druid and aggregate them and feed them back into Druid. It will also allow you to filter the data in those segments while feeding it back in. This means if there are rows you want to delete, you can just filter them away during re-ingestion.
To do this use the [DruidInputSource](./native-batch-input-source.md) and run a [Parallel task](../ingestion/native-batch.md). The DruidInputSource will allow you to take in existing segments from Druid and aggregate them and feed them back into Druid. It will also allow you to filter the data in those segments while feeding it back in. This means if there are rows you want to delete, you can just filter them away during re-ingestion.
Typically the above will be run as a batch job to say everyday feed in a chunk of data and aggregate it.
Or, if you use hadoop based ingestion, then you can use "dataSource" input spec to do reindexing.

@ -73,14 +73,14 @@ use the cluster resource of the existing cluster for batch ingestion.
This table compares the three available options:
| **Method** | [Native batch (parallel)](native-batch.md#parallel-task) | [Hadoop-based](hadoop.md) | [Native batch (simple)](native-batch.md#simple-task) |
| **Method** | [Native batch (parallel)](./native-batch.md) | [Hadoop-based](hadoop.md) | [Native batch (simple)](./native-batch-simple-task.md) |
|---|-----|--------------|------------|
| **Task type** | `index_parallel` | `index_hadoop` | `index` |
| **Parallel?** | Yes, if `inputFormat` is splittable and `maxNumConcurrentSubTasks` > 1 in `tuningConfig`. See [data format documentation](./data-formats.md) for details. | Yes, always. | No. Each task is single-threaded. |
| **Can append or overwrite?** | Yes, both. | Overwrite only. | Yes, both. |
| **External dependencies** | None. | Hadoop cluster (Druid submits Map/Reduce jobs). | None. |
| **Input locations** | Any [`inputSource`](./native-batch.md#input-sources). | Any Hadoop FileSystem or Druid datasource. | Any [`inputSource`](./native-batch.md#input-sources). |
| **Input locations** | Any [`inputSource`](./native-batch-input-source.md). | Any Hadoop FileSystem or Druid datasource. | Any [`inputSource`](./native-batch-input-source.md). |
| **File formats** | Any [`inputFormat`](./data-formats.md#input-format). | Any Hadoop InputFormat. | Any [`inputFormat`](./data-formats.md#input-format). |
| **[Rollup modes](./rollup.md)** | Perfect if `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig). | Always perfect. | Perfect if `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig). |
| **Partitioning options** | Dynamic, hash-based, and range-based partitioning methods are available. See [partitionsSpec](./native-batch.md#partitionsspec) for details.| Hash-based or range-based partitioning via [`partitionsSpec`](hadoop.md#partitionsspec). | Dynamic and hash-based partitioning methods are available. See [partitionsSpec](./native-batch.md#partitionsspec-1) for details. |
| **Partitioning options** | Dynamic, hash-based, and range-based partitioning methods are available. See [partitionsSpec](./native-batch.md#partitionsspec) for details.| Hash-based or range-based partitioning via [`partitionsSpec`](hadoop.md#partitionsspec). | Dynamic and hash-based partitioning methods are available. See [partitionsSpec](./native-batch.md#partitionsspec) for details. |

@ -0,0 +1,341 @@
---
id: native-batch-firehose
title: "Native batch ingestion with firehose"
sidebar_label: "Firehose"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
Firehoses are deprecated in 0.17.0. It's highly recommended to use the [Native batch ingestion input sources](./native-batch-input-source.md) instead.
Several firehoses are readily available in Druid. Some are meant for examples; others can be used directly in a production environment.
## StaticS3Firehose
> You need to include the [`druid-s3-extensions`](../development/extensions-core/s3.md) as an extension to use the StaticS3Firehose.
This firehose ingests events from a predefined list of S3 objects.
This firehose is _splittable_ and can be used by the [Parallel task](./native-batch.md).
Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.
Sample spec:
```json
"firehose" : {
"type" : "static-s3",
"uris": ["s3://foo/bar/file.gz", "s3://bar/foo/file2.gz"]
}
```
This firehose provides caching and prefetching features. In the Simple task, a firehose can be read twice if intervals or
shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow.
Note that prefetching or caching isn't that useful in the Parallel task.
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `static-s3`.|None|yes|
|uris|JSON array of URIs where s3 files to be ingested are located.|None|`uris` or `prefixes` must be set|
|prefixes|JSON array of URI prefixes for the locations of s3 files to be ingested.|None|`uris` or `prefixes` must be set|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
|prefetchTriggerBytes|Threshold to trigger prefetching s3 objects.|maxFetchCapacityBytes / 2|no|
|fetchTimeout|Timeout for fetching an s3 object.|60000|no|
|maxFetchRetry|Maximum retry for fetching an s3 object.|3|no|
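
As a hedged illustration of the properties above, the following sketch selects objects by prefix instead of by URI and disables caching, which may be reasonable in a Parallel task given the note above. The bucket prefix and the 512 MiB fetch capacity are assumptions chosen only for this example, not recommended values.

```json
{
  "type": "static-s3",
  "prefixes": ["s3://foo/bar/"],
  "maxCacheCapacityBytes": 0,
  "maxFetchCapacityBytes": 536870912,
  "maxFetchRetry": 3
}
```

Because `prefetchTriggerBytes` is omitted, it would fall back to the documented default of `maxFetchCapacityBytes / 2`, which is 268435456 bytes here.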
## StaticGoogleBlobStoreFirehose
> You need to include the [`druid-google-extensions`](../development/extensions-core/google.md) as an extension to use the StaticGoogleBlobStoreFirehose.
This firehose ingests events, similar to the StaticS3Firehose, but from Google Cloud Storage.
As with the S3 blobstore, an object is assumed to be gzipped if its name ends in `.gz`.
This firehose is _splittable_ and can be used by the [Parallel task](./native-batch.md).
Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.
Sample spec:
```json
"firehose" : {
"type" : "static-google-blobstore",
"blobs": [
{
"bucket": "foo",
"path": "/path/to/your/file.json"
},
{
"bucket": "bar",
"path": "/another/path.json"
}
]
}
```
This firehose provides caching and prefetching features. In the Simple task, a firehose can be read twice if intervals or
shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow.
Note that prefetching or caching isn't that useful in the Parallel task.
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `static-google-blobstore`.|None|yes|
|blobs|JSON array of Google Blobs.|None|yes|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
|prefetchTriggerBytes|Threshold to trigger prefetching Google Blobs.|maxFetchCapacityBytes / 2|no|
|fetchTimeout|Timeout for fetching a Google Blob.|60000|no|
|maxFetchRetry|Maximum retry for fetching a Google Blob.|3|no|
Google Blobs:
|property|description|default|required?|
|--------|-----------|-------|---------|
|bucket|Name of the Google Cloud bucket|None|yes|
|path|The path where data is located.|None|yes|
## HDFSFirehose
> You need to include the [`druid-hdfs-storage`](../development/extensions-core/hdfs.md) as an extension to use the HDFSFirehose.
This firehose ingests events from a predefined list of files from the HDFS storage.
This firehose is _splittable_ and can be used by the [Parallel task](./native-batch.md).
Since each split represents an HDFS file, each worker task of `index_parallel` will read files.
Sample spec:
```json
"firehose" : {
"type" : "hdfs",
"paths": "/foo/bar,/foo/baz"
}
```
This firehose provides caching and prefetching features. During native batch indexing, a firehose can be read twice if
`intervals` are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scanning
of files is slow.
Note that prefetching or caching isn't that useful in the Parallel task.
|Property|Description|Default|
|--------|-----------|-------|
|type|This should be `hdfs`.|none (required)|
|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths.|none (required)|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|
|prefetchTriggerBytes|Threshold to trigger prefetching files.|maxFetchCapacityBytes / 2|
|fetchTimeout|Timeout for fetching each file.|60000|
|maxFetchRetry|Maximum number of retries for fetching each file.|3|
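
The table above notes that `paths` also accepts a JSON array and wildcards. A minimal sketch of that form follows; the directory names and the `*.json` pattern are hypothetical, and both caching and prefetching are set to 0 only to show how the table says they are disabled.

```json
{
  "type": "hdfs",
  "paths": ["/foo/bar/*.json", "/foo/baz"],
  "maxCacheCapacityBytes": 0,
  "maxFetchCapacityBytes": 0
}
```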
You can also ingest from other storage using the HDFS firehose if the HDFS client supports that storage.
However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage.
If you want to use a non-hdfs protocol with the HDFS firehose, you need to include the protocol you want
in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS firehose security configuration](../configuration/index.md#hdfs-input-source) for more details.
## LocalFirehose
This Firehose reads data from files on local disk and works with `string` typed parsers. It is mainly intended for proof-of-concept testing.
This Firehose is _splittable_ and can be used by [native parallel index tasks](native-batch.md).
Since each split represents a file in this Firehose, each worker task of `index_parallel` will read a file.
A sample local Firehose spec is shown below:
```json
{
"type": "local",
"filter" : "*.csv",
"baseDir": "/data/directory"
}
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "local".|yes|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information.|yes|
|baseDir|directory to search recursively for files to be ingested. |yes|
<a name="http-firehose"></a>
## HttpFirehose
This Firehose can be used to read the data from remote sites via HTTP, and works with `string` typed parsers.
This Firehose is _splittable_ and can be used by [native parallel index tasks](native-batch.md).
Since each split represents a file in this Firehose, each worker task of `index_parallel` will read a file.
A sample HTTP Firehose spec is shown below:
```json
{
"type": "http",
"uris": ["http://example.com/uri1", "http://example2.com/uri2"]
}
```
You can only use protocols listed in the `druid.ingestion.http.allowedProtocols` property as HTTP firehose input sources.
The `http` and `https` protocols are allowed by default. See [HTTP firehose security configuration](../configuration/index.md#http-input-source) for more details.
The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header.
Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header.
|property|description|default|
|--------|-----------|-------|
|httpAuthenticationUsername|Username to use for authentication with specified URIs|None|
|httpAuthenticationPassword|PasswordProvider to use with specified URIs|None|
Example with authentication fields using the DefaultPassword provider (this requires the password to be in the ingestion spec):
```json
{
"type": "http",
"uris": ["http://example.com/uri1", "http://example2.com/uri2"],
"httpAuthenticationUsername": "username",
"httpAuthenticationPassword": "password123"
}
```
You can also use the other existing Druid PasswordProviders. Here is an example using the EnvironmentVariablePasswordProvider:
```json
{
"type": "http",
"uris": ["http://example.com/uri1", "http://example2.com/uri2"],
"httpAuthenticationUsername": "username",
"httpAuthenticationPassword": {
"type": "environment",
"variable": "HTTP_FIREHOSE_PW"
}
}
```
The below configurations can optionally be used for tuning the Firehose performance.
Note that prefetching or caching isn't that useful in the Parallel task.
|property|description|default|
|--------|-----------|-------|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|
|prefetchTriggerBytes|Threshold to trigger prefetching HTTP objects.|maxFetchCapacityBytes / 2|
|fetchTimeout|Timeout for fetching an HTTP object.|60000|
|maxFetchRetry|Maximum retries for fetching an HTTP object.|3|
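
The tuning properties above sit at the same level as `type` and `uris`. As a sketch only, the following spec disables caching and raises the retry count; the values are illustrative assumptions, not recommendations.

```json
{
  "type": "http",
  "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
  "maxCacheCapacityBytes": 0,
  "maxFetchRetry": 5
}
```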
<a name="segment-firehose"></a>
## IngestSegmentFirehose
This Firehose can be used to read the data from existing Druid segments, potentially using a new schema and changing the name, dimensions, metrics, rollup, etc. of the segment.
This Firehose is _splittable_ and can be used by [native parallel index tasks](native-batch.md).
This firehose will accept any type of parser, but will only utilize the list of dimensions and the timestamp specification.
A sample ingest Firehose spec is shown below:
```json
{
"type": "ingestSegment",
"dataSource": "wikipedia",
"interval": "2013-01-01/2013-01-02"
}
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "ingestSegment".|yes|
|dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes|
|interval|A String representing the ISO-8601 interval. This defines the time range to fetch the data over.|yes|
|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
|filter| See [Filters](../querying/filters.md)|no|
|maxInputSegmentBytesPerTask|Deprecated. Use [Segments Split Hint Spec](./native-batch.md#segments-split-hint-spec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
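The optional properties in the table above can be combined with the sample spec. The sketch below selects two dimensions and one metric and keeps only rows that match a selector filter. The dimension names, metric name, and filter value are hypothetical and depend on the schema of your datasource.

```json
{
  "type": "ingestSegment",
  "dataSource": "wikipedia",
  "interval": "2013-01-01/2013-01-02",
  "dimensions": ["page", "language"],
  "metrics": ["added"],
  "filter": {
    "type": "selector",
    "dimension": "language",
    "value": "en"
  }
}
```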
<a name="sql-firehose"></a>
## SqlFirehose
This Firehose can be used to ingest events residing in an RDBMS. The database connection information is provided as part of the ingestion spec.
For each query, the results are fetched locally and indexed.
If there are multiple queries from which data needs to be indexed, queries are prefetched in the background, up to `maxFetchCapacityBytes` bytes.
This Firehose is _splittable_ and can be used by [native parallel index tasks](native-batch.md).
This firehose will accept any type of parser, but will only utilize the list of dimensions and the timestamp specification. See the extension documentation for more detailed ingestion examples.
Requires one of the following extensions:
* [MySQL Metadata Store](../development/extensions-core/mysql.md).
* [PostgreSQL Metadata Store](../development/extensions-core/postgresql.md).
```json
{
"type": "sql",
"database": {
"type": "mysql",
"connectorConfig": {
"connectURI": "jdbc:mysql://host:port/schema",
"user": "user",
"password": "password"
}
},
"sqls": ["SELECT * FROM table1", "SELECT * FROM table2"]
}
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be "sql".||Yes|
|database|Specifies the database connection details. The database type corresponds to the extension that supplies the `connectorConfig` support. The specified extension must be loaded into Druid:<br/><br/><ul><li>[mysql-metadata-storage](../development/extensions-core/mysql.md) for `mysql`</li><li> [postgresql-metadata-storage](../development/extensions-core/postgresql.md) extension for `postgresql`.</li></ul><br/><br/>You can selectively allow JDBC properties in `connectURI`. See [JDBC connections security config](../configuration/index.md#jdbc-connections-to-external-databases) for more details.||Yes|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|No|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|No|
|prefetchTriggerBytes|Threshold to trigger prefetching SQL result objects.|maxFetchCapacityBytes / 2|No|
|fetchTimeout|Timeout for fetching the result set.|60000|No|
|foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|false|No|
|sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.||Yes|
### Database
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|The type of database to query. Valid values are `mysql` and `postgresql`.||Yes|
|connectorConfig|Specify the database connection properties via `connectURI`, `user` and `password`||Yes|
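For comparison with the MySQL sample above, a PostgreSQL-backed spec might look like the following sketch. The host, port, schema, credentials, and table name are placeholders, and `foldCase` is enabled only to show where the property goes.

```json
{
  "type": "sql",
  "database": {
    "type": "postgresql",
    "connectorConfig": {
      "connectURI": "jdbc:postgresql://host:5432/schema",
      "user": "user",
      "password": "password"
    }
  },
  "foldCase": true,
  "sqls": ["SELECT * FROM table1"]
}
```

This assumes the `postgresql-metadata-storage` extension is loaded, as described in the `database` property above.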
## InlineFirehose
This Firehose can be used to read the data inlined in its own spec.
It can be used for demos or for quickly testing out parsing and schema, and works with `string` typed parsers.
A sample inline Firehose spec is shown below:
```json
{
"type": "inline",
"data": "0,values,formatted\n1,as,CSV"
}
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "inline".|yes|
|data|Inlined data to ingest.|yes|
## CombiningFirehose
This Firehose can be used to combine and merge data from a list of different Firehoses.
```json
{
"type": "combining",
"delegates": [ { firehose1 }, { firehose2 }, ... ]
}
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "combining"|yes|
|delegates|List of Firehoses to combine data from|yes|
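The `firehose1` and `firehose2` entries in the spec above are placeholders. A concrete sketch, assuming two existing datasources (the name `wikipedia_archive` is hypothetical), could combine two `ingestSegment` Firehoses:

```json
{
  "type": "combining",
  "delegates": [
    {
      "type": "ingestSegment",
      "dataSource": "wikipedia",
      "interval": "2013-01-01/2013-01-02"
    },
    {
      "type": "ingestSegment",
      "dataSource": "wikipedia_archive",
      "interval": "2013-01-01/2013-01-02"
    }
  ]
}
```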


@ -0,0 +1,736 @@
---
id: native-batch-input-sources
title: "Native batch input sources"
sidebar_label: "Input sources"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
The input source defines where your index task reads data for Apache Druid native batch ingestion. Only the native parallel task and simple task support the input source.
For general information on native batch indexing and parallel task indexing, see [Native batch ingestion](./native-batch.md).
## S3 input source
> You need to include the [`druid-s3-extensions`](../development/extensions-core/s3.md) as an extension to use the S3 input source.
The S3 input source reads objects directly from S3. You can specify one of the following:
- a list of S3 URI strings
- a list of S3 location prefixes; Druid lists the contents of each location and ingests all objects it contains
- a list of S3 objects, each given as a bucket and a path
The S3 input source is splittable. Therefore, you can use it with the [Parallel task](./native-batch.md). Each worker task of `index_parallel` reads one or multiple objects.
Sample specs:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"prefixes": ["s3://foo/bar/", "s3://bar/foo/"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"objects": [
{ "bucket": "foo", "path": "bar/file1.json"},
{ "bucket": "bar", "path": "foo/file2.json"}
]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
"properties": {
"accessKeyId": "KLJ78979SDFdS2",
"secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd"
}
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
"properties": {
"accessKeyId": "KLJ78979SDFdS2",
"secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd",
"assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3"
}
},
"inputFormat": {
"type": "json"
},
...
},
...
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `s3`.|None|yes|
|uris|JSON array of URIs where S3 objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
|properties|Properties Object for overriding the default S3 configuration. See below for more information.|None|No (defaults will be used if not given)|
Note that the S3 input source will skip all empty objects only when `prefixes` is specified.
S3 Object:
|property|description|default|required?|
|--------|-----------|-------|---------|
|bucket|Name of the S3 bucket|None|yes|
|path|The path where data is located.|None|yes|
Properties Object:
|property|description|default|required?|
|--------|-----------|-------|---------|
|accessKeyId|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 InputSource's access key|None|yes if secretAccessKey is given|
|secretAccessKey|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 InputSource's secret key|None|yes if accessKeyId is given|
|assumeRoleArn|AWS ARN of the role to assume. See the [AWS documentation](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html). You can use `assumeRoleArn` with either the AWS credentials in the ingestion spec or the default S3 credentials.|None|no|
|assumeRoleExternalId|A unique identifier that might be required when you assume a role in another account. See the [AWS documentation](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html).|None|no|
> **Note:** If `accessKeyId` and `secretAccessKey` are not given, the default [S3 credentials provider chain](../development/extensions-core/s3.md#s3-authentication-methods) is used.
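Because `accessKeyId` and `secretAccessKey` accept a Password Provider, the credentials do not have to appear in the spec as plain text. The following sketch of an `inputSource` uses the environment variable provider. The variable names and the external ID are hypothetical.

```json
{
  "type": "s3",
  "prefixes": ["s3://foo/bar/"],
  "properties": {
    "accessKeyId": {
      "type": "environment",
      "variable": "S3_ACCESS_KEY_ID"
    },
    "secretAccessKey": {
      "type": "environment",
      "variable": "S3_SECRET_ACCESS_KEY"
    },
    "assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3",
    "assumeRoleExternalId": "example-external-id"
  }
}
```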
## Google Cloud Storage Input Source
> You need to include the [`druid-google-extensions`](../development/extensions-core/google.md) as an extension to use the Google Cloud Storage input source.
The Google Cloud Storage input source reads objects directly
from Google Cloud Storage. You can specify objects as a list of Google
Cloud Storage URI strings. The Google Cloud Storage input source is splittable
and can be used by the [Parallel task](./native-batch.md), where each worker task of `index_parallel` reads
one or multiple objects.
Sample specs:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "google",
"uris": ["gs://foo/bar/file.json", "gs://bar/foo/file2.json"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "google",
"prefixes": ["gs://foo/bar/", "gs://bar/foo/"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "google",
"objects": [
{ "bucket": "foo", "path": "bar/file1.json"},
{ "bucket": "bar", "path": "foo/file2.json"}
]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `google`.|None|yes|
|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
Note that the Google Cloud Storage input source will skip all empty objects only when `prefixes` is specified.
Google Cloud Storage object:
|property|description|default|required?|
|--------|-----------|-------|---------|
|bucket|Name of the Google Cloud Storage bucket|None|yes|
|path|The path where data is located.|None|yes|
## Azure input source
> You need to include the [`druid-azure-extensions`](../development/extensions-core/azure.md) as an extension to use the Azure input source.
The Azure input source reads objects directly from Azure Blob store or Azure Data Lake sources. You can
specify objects as a list of file URI strings or prefixes. You can split the Azure input source for use with [Parallel task](./native-batch.md) indexing and each worker task reads one chunk of the split data.
Sample specs:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "azure",
"uris": ["azure://container/prefix1/file.json", "azure://container/prefix2/file2.json"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "azure",
"prefixes": ["azure://container/prefix1/", "azure://container/prefix2/"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "azure",
"objects": [
{ "bucket": "container", "path": "prefix1/file1.json"},
{ "bucket": "container", "path": "prefix2/file2.json"}
]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `azure`.|None|yes|
|uris|JSON array of URIs where the Azure objects to be ingested are located, in the form "azure://\<container>/\<path-to-file\>"|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Azure objects to ingest, in the form "azure://\<container>/\<prefix\>". Empty objects starting with one of the given prefixes are skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of Azure objects to ingest.|None|`uris` or `prefixes` or `objects` must be set|
Note that the Azure input source skips all empty objects only when `prefixes` is specified.
The `objects` property is:
|property|description|default|required?|
|--------|-----------|-------|---------|
|bucket|Name of the Azure Blob Storage or Azure Data Lake container|None|yes|
|path|The path where data is located.|None|yes|
## HDFS Input Source
> You need to include the [`druid-hdfs-storage`](../development/extensions-core/hdfs.md) as an extension to use the HDFS input source.
The HDFS input source reads files directly
from HDFS storage. You can specify file paths as an HDFS URI string or a list
of HDFS URI strings. The HDFS input source is splittable and can be used by the [Parallel task](./native-batch.md),
where each worker task of `index_parallel` reads one or multiple files.
Sample specs:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "hdfs",
"paths": "hdfs://namenode_host/foo/bar/,hdfs://namenode_host/bar/foo"
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "hdfs",
"paths": ["hdfs://namenode_host/foo/bar/", "hdfs://namenode_host/bar/foo"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "hdfs",
"paths": "hdfs://namenode_host/foo/bar/file.json,hdfs://namenode_host/bar/foo/file2.json"
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "hdfs",
"paths": ["hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `hdfs`.|None|yes|
|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes|
You can also ingest from other storage using the HDFS input source if the HDFS client supports that storage.
However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage.
If you want to use a non-hdfs protocol with the HDFS input source, include the protocol
in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS input source security configuration](../configuration/index.md#hdfs-input-source) for more details.
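The `paths` property described above also accepts wildcards. As a minimal sketch, assuming a hypothetical directory layout with one directory per day, an `inputSource` could match many files with a single entry:

```json
{
  "type": "hdfs",
  "paths": ["hdfs://namenode_host/foo/2021-*/part-*.json"]
}
```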
## HTTP Input Source
The HTTP input source reads files directly from remote sites via HTTP.
> **NOTE:** Ingestion tasks run under the operating system account that runs the Druid processes, for example the Indexer, Middle Manager, and Peon. This means any user who can submit an ingestion task can specify an `HTTPInputSource` at any location where the Druid process has permissions. For example, using `HTTPInputSource`, a console user has access to internal network locations where they would otherwise be denied access.
> **WARNING:** `HTTPInputSource` is not limited to the HTTP or HTTPS protocols. It uses the Java `URI` class that supports HTTP, HTTPS, FTP, file, and jar protocols by default. This means you should never run Druid under the `root` account, because a user can use the file protocol to access any files on the local disk.
For more information about security best practices, see [Security overview](../operations/security-overview.md#best-practices).
The HTTP input source is _splittable_ and can be used by the [Parallel task](./native-batch.md),
where each worker task of `index_parallel` will read only one file. This input source does not support Split Hint Spec.
Sample specs:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "http",
"uris": ["http://example.com/uri1", "http://example2.com/uri2"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
Example with authentication fields using the DefaultPassword provider (this requires the password to be in the ingestion spec):
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "http",
"uris": ["http://example.com/uri1", "http://example2.com/uri2"],
"httpAuthenticationUsername": "username",
"httpAuthenticationPassword": "password123"
},
"inputFormat": {
"type": "json"
},
...
},
...
```
You can also use the other existing Druid PasswordProviders. Here is an example using the EnvironmentVariablePasswordProvider:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "http",
"uris": ["http://example.com/uri1", "http://example2.com/uri2"],
"httpAuthenticationUsername": "username",
"httpAuthenticationPassword": {
"type": "environment",
"variable": "HTTP_INPUT_SOURCE_PW"
}
},
"inputFormat": {
"type": "json"
},
...
},
...
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `http`|None|yes|
|uris|URIs of the input files. See below for the protocols allowed for URIs.|None|yes|
|httpAuthenticationUsername|Username to use for authentication with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no|
|httpAuthenticationPassword|PasswordProvider to use with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no|
You can only use protocols listed in the `druid.ingestion.http.allowedProtocols` property as HTTP input sources.
The `http` and `https` protocols are allowed by default. See [HTTP input source security configuration](../configuration/index.md#http-input-source) for more details.
## Inline Input Source
The Inline input source can be used to read the data inlined in its own spec.
It can be used for demos or for quickly testing out parsing and schema.
Sample spec:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "inline",
"data": "0,values,formatted\n1,as,CSV"
},
"inputFormat": {
"type": "csv"
},
...
},
...
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "inline".|yes|
|data|Inlined data to ingest.|yes|
## Local Input Source
The Local input source reads files directly from local storage
and is mainly intended for proof-of-concept testing.
The Local input source is _splittable_ and can be used by the [Parallel task](./native-batch.md),
where each worker task of `index_parallel` reads one or multiple files.
Sample spec:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "local",
"filter" : "*.csv",
"baseDir": "/data/directory",
"files": ["/bar/foo", "/foo/bar"]
},
"inputFormat": {
"type": "csv"
},
...
},
...
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "local".|yes|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information.|yes if `baseDir` is specified|
|baseDir|Directory to search recursively for files to be ingested. Empty files under the `baseDir` will be skipped.|At least one of `baseDir` or `files` should be specified|
|files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. Empty files will be skipped.|At least one of `baseDir` or `files` should be specified|
## Druid Input Source
The Druid input source reads data directly from existing Druid segments,
potentially using a new schema and changing the name, dimensions, metrics, rollup, etc. of the segment.
The Druid input source is _splittable_ and can be used by the [Parallel task](./native-batch.md).
This input source has a fixed input format for reading from Druid segments;
no `inputFormat` field needs to be specified in the ingestion spec when using this input source.
|property|description|required?|
|--------|-----------|---------|
|type|This should be "druid".|yes|
|dataSource|A String defining the Druid datasource to fetch rows from|yes|
|interval|A String representing an ISO-8601 interval, which defines the time range to fetch the data over.|yes|
|filter| See [Filters](../querying/filters.md). Only rows that match the filter, if specified, will be returned.|no|
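As a sketch of how the properties above fit together, the following `inputSource` reads one day of a datasource and returns only rows that match a selector filter. The datasource name, dimension name, and value are hypothetical.

```json
{
  "type": "druid",
  "dataSource": "wikipedia_raw",
  "interval": "2016-06-27/P1D",
  "filter": {
    "type": "selector",
    "dimension": "countryName",
    "value": "France"
  }
}
```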
The Druid input source can be used for a variety of purposes, including:
- Creating new datasources that are rolled-up copies of existing datasources.
- Changing the [partitioning or sorting](./partitioning.md) of a datasource to improve performance.
- Updating or removing rows using a [`transformSpec`](./ingestion-spec.md#transformspec).
When using the Druid input source, the timestamp column shows up as a numeric field named `__time` set to the number
of milliseconds since the epoch (January 1, 1970 00:00:00 UTC). It is common to use this in the timestampSpec, if you
want the output timestamp to be equivalent to the input timestamp. In this case, set the timestamp column to `__time`
and the format to `auto` or `millis`.
It is OK for the input and output datasources to be the same. In this case, newly generated data will overwrite the
previous data for the intervals specified in the `granularitySpec`. Generally, if you are going to do this, it is a
good idea to test out your reindexing by writing to a separate datasource before overwriting your main one.
Alternatively, if your goals can be satisfied by [compaction](compaction.md), consider that instead as a simpler
approach.
An example task spec is shown below. It reads from a hypothetical raw datasource `wikipedia_raw` and creates a new
rolled-up datasource `wikipedia_rollup` by grouping on hour, "countryName", and "page".
```json
{
"type": "index_parallel",
"spec": {
"dataSchema": {
"dataSource": "wikipedia_rollup",
"timestampSpec": {
"column": "__time",
"format": "millis"
},
"dimensionsSpec": {
"dimensions": [
"countryName",
"page"
]
},
"metricsSpec": [
{
"type": "count",
"name": "cnt"
}
],
"granularitySpec": {
"type": "uniform",
"queryGranularity": "HOUR",
"segmentGranularity": "DAY",
"intervals": ["2016-06-27/P1D"],
"rollup": true
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "druid",
"dataSource": "wikipedia_raw",
"interval": "2016-06-27/P1D"
}
},
"tuningConfig": {
"type": "index_parallel",
"partitionsSpec": {
"type": "hashed"
},
"forceGuaranteedRollup": true,
"maxNumConcurrentSubTasks": 1
}
}
}
```
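Removing rows during reindexing, one of the purposes listed earlier, relies on a `transformSpec` in the `dataSchema`. The fragment below is a minimal sketch that keeps only rows whose `countryName` is not `France`. The dimension and value are hypothetical, and the rest of the `dataSchema` is omitted.

```json
{
  "transformSpec": {
    "filter": {
      "type": "not",
      "field": {
        "type": "selector",
        "dimension": "countryName",
        "value": "France"
      }
    }
  }
}
```

When the input and output datasources are the same, rows excluded by the filter are absent from the newly written segments for the specified intervals.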
> Note: Older versions (0.19 and earlier) did not respect the timestampSpec when using the Druid input source. If you
> have ingestion specs that rely on this and cannot rewrite them, set
> [`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`](../configuration/index.md#indexer-general-configuration)
> to `true` to enable a compatibility mode where the timestampSpec is ignored.
## SQL Input Source
The SQL input source is used to read data directly from an RDBMS.
The SQL input source is _splittable_ and can be used by the [Parallel task](./native-batch.md), where each worker task will read from one SQL query from the list of queries.
This input source does not support Split Hint Spec.
Since this input source has a fixed input format for reading events, no `inputFormat` field needs to be specified in the ingestion spec when using this input source.
Please refer to the Recommended practices section below before using this input source.
|property|description|required?|
|--------|-----------|---------|
|type|This should be "sql".|Yes|
|database|Specifies the database connection details. The database type corresponds to the extension that supplies the `connectorConfig` support. The specified extension must be loaded into Druid:<br/><br/><ul><li>[mysql-metadata-storage](../development/extensions-core/mysql.md) for `mysql`</li><li> [postgresql-metadata-storage](../development/extensions-core/postgresql.md) extension for `postgresql`.</li></ul><br/><br/>You can selectively allow JDBC properties in `connectURI`. See [JDBC connections security config](../configuration/index.md#jdbc-connections-to-external-databases) for more details.|Yes|
|foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|No|
|sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.|Yes|
An example SqlInputSource spec is shown below:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "sql",
"database": {
"type": "mysql",
"connectorConfig": {
"connectURI": "jdbc:mysql://host:port/schema",
"user": "user",
"password": "password"
}
},
"sqls": ["SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 23:59:59'", "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 23:59:59'"]
}
},
...
```
The spec above reads all events from two separate SQL queries for the interval `2013-01-01/2013-01-02`.
Each SQL query runs in its own sub-task, so the above example results in two sub-tasks.
**Recommended practices**
Compared to the other native batch input sources, the SQL input source reads input data differently, so consider the following points before using it in a production environment:
* During indexing, each sub-task executes one of the SQL queries and stores the results locally on disk. The sub-tasks then read the data from these local input files and generate segments. Presently, there isn't any restriction on the size of the generated files, so the MiddleManagers or Indexers must have sufficient disk capacity for the volume of data being indexed.
* Filtering the SQL queries based on the intervals specified in the `granularitySpec` can avoid unwanted data being retrieved and stored locally by the indexing sub-tasks. For example, if the `intervals` specified in the `granularitySpec` is `["2013-01-01/2013-01-02"]` and the SQL query is `SELECT * FROM table1`, `SqlInputSource` will read all the data for `table1` based on the query, even though only data between the intervals specified will be indexed into Druid.
* Pagination may be used on the SQL queries to ensure that each query pulls a similar amount of data, thereby improving the efficiency of the sub-tasks.
* Similar to file-based input formats, any updates to existing data will replace the data in segments specific to the intervals specified in the `granularitySpec`.
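The interval filtering and pagination recommendations above can be sketched together as follows. This assumes a MySQL database, a hypothetical `id` column that gives a stable ordering, and an arbitrary page size of 100000 rows. Each query runs in its own sub-task.

```json
{
  "type": "sql",
  "database": {
    "type": "mysql",
    "connectorConfig": {
      "connectURI": "jdbc:mysql://host:port/schema",
      "user": "user",
      "password": "password"
    }
  },
  "sqls": [
    "SELECT * FROM table1 WHERE timestamp >= '2013-01-01 00:00:00' AND timestamp < '2013-01-02 00:00:00' ORDER BY id LIMIT 100000 OFFSET 0",
    "SELECT * FROM table1 WHERE timestamp >= '2013-01-01 00:00:00' AND timestamp < '2013-01-02 00:00:00' ORDER BY id LIMIT 100000 OFFSET 100000"
  ]
}
```

The `WHERE` clause matches the `intervals` of `["2013-01-01/2013-01-02"]` in the `granularitySpec`, so the sub-tasks do not fetch rows that would be discarded during indexing.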
## Combining input sources
The Combining input source is used to read data from multiple input sources. Use this input source only if all the delegate input sources are
_splittable_ and can be used by the [Parallel task](./native-batch.md). This input source identifies the splits from its delegates, and a worker task processes each split. Similar to other input sources, this input source supports a single `inputFormat`. Therefore, all delegate input sources that require an `inputFormat` must use the same format for their input data.
|property|description|required?|
|--------|-----------|---------|
|type|This should be "combining".|Yes|
|delegates|List of _splittable_ InputSources to read data from.|Yes|
Sample spec:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "combining",
"delegates" : [
{
"type": "local",
"filter" : "*.csv",
"baseDir": "/data/directory",
"files": ["/bar/foo", "/foo/bar"]
},
{
"type": "druid",
"dataSource": "wikipedia",
"interval": "2013-01-01/2013-01-02"
}
]
},
"inputFormat": {
"type": "csv"
},
...
},
...
```

View File

@ -0,0 +1,185 @@
---
id: native-batch-simple-task
title: "Native batch simple task indexing"
sidebar_label: "Simple task indexing"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
The simple task (type `index`) is designed to ingest small data sets into Apache Druid. The task executes within the indexing service. For general information on native batch indexing and parallel task indexing, see [Native batch ingestion](./native-batch.md).
## Task syntax
A sample task is shown below:
```json
{
"type" : "index",
"spec" : {
"dataSchema" : {
"dataSource" : "wikipedia",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["country", "page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","region","city"],
"dimensionExclusions" : []
},
"metricsSpec" : [
{
"type" : "count",
"name" : "count"
},
{
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
},
{
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
},
{
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}
],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE",
"intervals" : [ "2013-08-31/2013-09-01" ]
}
},
"ioConfig" : {
"type" : "index",
"inputSource" : {
"type" : "local",
"baseDir" : "examples/indexing/",
"filter" : "wikipedia_data.json"
},
"inputFormat": {
"type": "json"
}
},
"tuningConfig" : {
"type" : "index",
"partitionsSpec": {
"type": "single_dim",
"partitionDimension": "country",
"targetRowsPerSegment": 5000000
}
}
}
}
```
|property|description|required?|
|--------|-----------|---------|
|type|The task type. This should always be `index`.|yes|
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using task type, data source name, interval, and date-time stamp. |no|
|spec|The ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details. |yes|
|context|Context containing various task configuration parameters. See below for more details.|no|
### `dataSchema`
This field is required.
See the [`dataSchema`](./ingestion-spec.md#dataschema) section of the ingestion docs for details.
If you do not specify `intervals` explicitly in your dataSchema's granularitySpec, the simple task makes an extra
pass over the data when it starts up to determine the range to lock. If you specify `intervals` explicitly, the task
discards any rows outside the specified intervals. We recommend setting `intervals` explicitly if you know the time
range of the data. Doing so lets the task skip the extra pass and prevents you from accidentally replacing data outside
that range if there is stray data with unexpected timestamps.
### `ioConfig`
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|The task type. This should always be `index`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If `true`, `appendToExisting` is `false`, and the `granularitySpec` contains an `interval`, then the ingestion task drops (marks unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no|
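As an illustration of the properties above, the following sketch shows an `ioConfig` object that replaces existing data rather than appending to it. The input source values are copied from the sample task; the combination of flags is an assumed example, not a recommended default. For `dropExisting` to take effect, the `granularitySpec` must also specify the interval.

```json
{
  "type" : "index",
  "inputSource" : {
    "type" : "local",
    "baseDir" : "examples/indexing/",
    "filter" : "wikipedia_data.json"
  },
  "inputFormat" : {
    "type" : "json"
  },
  "appendToExisting" : false,
  "dropExisting" : true
}
```

Setting `appendToExisting` to `true` in this object would turn `dropExisting` into a no-op, as described in the table.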
### `tuningConfig`
The tuningConfig is optional. If you do not specify a tuningConfig, Druid uses the default values listed in the following table.
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|The task type. This should always be `index`.|none|yes|
|maxRowsPerSegment|Deprecated. Use `partitionsSpec` instead. Used in sharding. Determines how many rows are in each segment.|5000000|no|
|maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally you do not need to set this. However, if your rows are short in terms of bytes, you may not want to store a million rows in memory, in which case you should set this value.|1000000|no|
|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and you do not need to set it. This value represents the number of bytes to aggregate in heap memory before persisting. It is based on a rough estimate of memory usage, not actual usage. The maximum heap memory usage for indexing is `maxBytesInMemory * (2 + maxPendingPersists)`. Note that `maxBytesInMemory` also includes heap usage of artifacts created from intermediary persists. This means that after every persist, the amount of `maxBytesInMemory` available until the next persist decreases, and the task fails when the sum of bytes of all intermediary persisted artifacts exceeds `maxBytesInMemory`.|1/6 of max JVM memory|no|
|maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting to be pushed. Used in determining when intermediate pushing should occur.|20000000|no|
|numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
|partitionDimensions|Deprecated. Use `partitionsSpec` instead. The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no|
|partitionsSpec|Defines how to partition data in each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic` if `forceGuaranteedRollup` = false, `hashed` if `forceGuaranteedRollup` = true|no|
|indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](ingestion-spec.md#indexspec)|null|no|
|indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](ingestion-spec.md#indexspec) for possible values.|same as indexSpec|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|forceGuaranteedRollup|Forces [perfect rollup](rollup.md). Perfect rollup optimizes the total size of generated segments and query time at the cost of increased indexing time. If this is set to true, the index task reads the entire input data twice: once to find the optimal number of partitions per time chunk and once to generate segments. Note that the resulting segments are hash-partitioned. You cannot use this flag with `appendToExisting` in the IOConfig. For more details, see the __Segment pushing modes__ section below.|false|no|
|reportParseExceptions|DEPRECATED. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|false|no|
|pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
|segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentwriteoutmediumfactory).|Not specified, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used|no|
|logParseExceptions|If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred.|false|no|
|maxParseExceptions|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if `reportParseExceptions` is set.|unlimited|no|
|maxSavedParseExceptions|When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](tasks.md#task-reports). Overridden if `reportParseExceptions` is set.|0|no|
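To make the memory and parse-exception settings concrete, here is a minimal `tuningConfig` sketch. All numeric values other than the documented defaults are assumptions chosen for illustration, not tuning advice.

```json
{
  "type" : "index",
  "maxRowsInMemory" : 1000000,
  "maxBytesInMemory" : 100000000,
  "maxPendingPersists" : 0,
  "logParseExceptions" : true,
  "maxParseExceptions" : 100,
  "maxSavedParseExceptions" : 10
}
```

With these assumed values, the heap bound from the table works out to 100000000 * (2 + 0) = 200000000 bytes. The task would fail after more than 100 parse exceptions, and the task completion report would keep the 10 most recent ones.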
### `partitionsSpec`
The partitionsSpec describes the secondary partitioning method.
Use a different partitionsSpec depending on the [rollup mode](rollup.md) you want.
For perfect rollup, use `hashed`.
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should always be `hashed`|none|yes|
|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no|
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
|partitionFunction|A function to compute hash of partition dimensions. See [Hash partition function](./native-batch.md#hash-partition-function)|`murmur3_32_abs`|no|
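The following sketch shows how a `hashed` partitionsSpec might sit inside the `tuningConfig` for perfect rollup. The shard count and the choice of partition dimensions (taken from the sample task's dimension list) are assumptions for illustration only.

```json
{
  "type" : "index",
  "forceGuaranteedRollup" : true,
  "partitionsSpec" : {
    "type" : "hashed",
    "numShards" : 4,
    "partitionDimensions" : ["country", "city"]
  }
}
```

Because `numShards` is set, `maxRowsPerSegment` is omitted; the two cannot be specified together. `partitionFunction` is left out, so the default `murmur3_32_abs` applies.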
For best-effort rollup, use `dynamic`.
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should always be `dynamic`|none|yes|
|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no|
|maxTotalRows|Total number of rows in segments waiting to be pushed.|20000000|no|
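For comparison, a `dynamic` partitionsSpec for best-effort rollup could look like the sketch below. The values shown are simply the documented defaults written out explicitly.

```json
{
  "type" : "index",
  "partitionsSpec" : {
    "type" : "dynamic",
    "maxRowsPerSegment" : 5000000,
    "maxTotalRows" : 20000000
  }
}
```

This is also the partitioning type to use when `appendToExisting` is `true` in the `ioConfig`.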
### `segmentWriteOutMediumFactory`
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|See [Additional Peon Configuration: SegmentWriteOutMediumFactory](../configuration/index.md#segmentwriteoutmediumfactory) for explanation and available options.|yes|
### Segment pushing modes
While ingesting data using the simple task, Druid creates segments from the input data and pushes them. The simple task
supports the following segment pushing modes, depending on your type of [rollup](./rollup.md):
- Bulk pushing mode: Used for perfect rollup. Druid pushes every segment at the very end of the index task. Until then, Druid stores created segments in memory and local storage of the service running the index task. This mode can cause problems if you have limited storage capacity, and is not recommended for production use.
To enable bulk pushing mode, set `forceGuaranteedRollup` in your TuningConfig. You cannot use bulk pushing with `appendToExisting` in your IOConfig.
- Incremental pushing mode: Used for best-effort rollup. Druid pushes segments incrementally during the course of the indexing task. The index task collects data and stores created segments in the memory and disks of the services running the task until the total number of collected rows exceeds `maxTotalRows`. At that point the index task immediately pushes all segments created up until that moment, cleans up pushed segments, and continues to ingest the remaining data.

File diff suppressed because it is too large

View File

@ -45,7 +45,7 @@ the Overlord APIs.
A report containing information about the number of rows ingested, and any parse exceptions that occurred is available for both completed tasks and running tasks.
The reporting feature is supported by the [simple native batch task](../ingestion/native-batch.md#simple-task), the Hadoop batch task, and Kafka and Kinesis ingestion tasks.
The reporting feature is supported by the [simple native batch task](../ingestion/native-batch-simple-task.md), the Hadoop batch task, and Kafka and Kinesis ingestion tasks.
### Completion report
@ -184,7 +184,7 @@ The `errorMsg` field shows a message describing the error that caused a task to
### Row stats
The non-parallel [simple native batch task](../ingestion/native-batch.md#simple-task), the Hadoop batch task, and Kafka and Kinesis ingestion tasks support retrieval of row stats while the task is running.
The non-parallel [simple native batch task](./native-batch-simple-task.md), the Hadoop batch task, and Kafka and Kinesis ingestion tasks support retrieval of row stats while the task is running.
The live report can be accessed with a GET to the following URL on a Peon running a task:
@ -396,11 +396,11 @@ You can configure retention periods for logs in milliseconds by setting `druid.i
### `index`
See [Native batch ingestion (simple task)](native-batch.md#simple-task).
See [Native batch ingestion (simple task)](./native-batch-simple-task.md).
### `index_parallel`
See [Native batch ingestion (parallel task)](native-batch.md#parallel-task).
See [Native batch ingestion (parallel task)](native-batch.md).
### `index_sub`

View File

@ -270,7 +270,7 @@ If you are only using [Hadoop-based batch ingestion](../ingestion/hadoop.md) wit
###### Parallel native ingestion
If you are using [parallel native batch ingestion](../ingestion/native-batch.md#parallel-task), allocating more available task slots is a good idea and will allow greater ingestion concurrency.
If you are using [parallel native batch ingestion](../ingestion/native-batch.md), allocating more available task slots is a good idea and will allow greater ingestion concurrency.
### Coordinator

View File

@ -1089,6 +1089,7 @@ appendToExisting
baseDir
chatHandlerNumRetries
chatHandlerTimeout
cityName
connectorConfig
countryName
dataSchema's
@ -1118,6 +1119,39 @@ totalNumMergeTasks
StaticS3Firehose
prefetchTriggerBytes
awaitSegmentAvailabilityTimeoutMillis
- ../docs/ingestion/native-batch-firehose.md
LocalFirehose
baseDir
HttpFirehose
httpAuthenticationUsername
DefaultPassword
PasswordProviders
EnvironmentVariablePasswordProvider
ingestSegment
maxInputSegmentBytesPerTask
150MB
foldCase
sqls
connectorConfig
InlineFirehose
CombiningFirehose
httpAuthenticationPassword
- ../docs/ingestion/native-batch-input-source.md
accessKeyId
secretAccessKey
accessKeyId
httpAuthenticationPassword
countryName
- ../docs/ingestion/native-batch-simple-task.md
dataSchema's
appendToExisting
dropExisting
timeChunk
PartitionsSpec
forceGuaranteedRollup
reportParseExceptions
pushTimeout
segmentWriteOutMediumFactory
- ../docs/ingestion/schema-design.md
product_category
product_id

View File

@ -55,6 +55,9 @@
"label": "Batch ingestion",
"ids": [
"ingestion/native-batch",
"ingestion/native-batch-simple-task",
"ingestion/native-batch-input-sources",
"ingestion/native-batch-firehose",
"ingestion/hadoop"
]
},