|hadoopDependencyCoordinates|A JSON array of Hadoop dependency coordinates that Druid will use, this property will override the default Hadoop coordinates. Once specified, Druid will look for those Hadoop dependencies from the location specified by `druid.extensions.hadoopDependenciesDir`|no|
Also note that Druid automatically computes the classpath for Hadoop job containers that run in the Hadoop cluster. But in case of conflicts between Hadoop and Druid's dependencies, you can manually specify the classpath by setting `druid.extensions.hadoopContainerDruidClasspath` property. See the extensions config in [base druid configuration](../configuration/index.html#extensions).
|segmentOutputPath|String|The path to dump segments into.|Only used by the [Command-line Hadoop indexer](#cli). This field must be null otherwise.|
|metadataUpdateSpec|Object|A specification of how to update the metadata for the druid cluster these segments belong to.|Only used by the [Command-line Hadoop indexer](#cli). This field must be null otherwise.|
You can also read from cloud storage such as AWS S3 or Google Cloud Storage.
To do so, you need to install the necessary library under Druid's classpath in _all MiddleManager or Indexer processes_.
For S3, you can run the below command to install the [Hadoop AWS module](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html).
For Google Cloud Storage, you need to install [GCS connector jar](https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md)
under `${DRUID_HOME}/hadoop-dependencies` in _all MiddleManager or Indexer processes_.
Once you install the GCS Connector jar in all MiddleManager and Indexer processes, you can put
your Google Cloud Storage paths in the inputSpec with the below job properties.
For more configurations, see the [instructions to configure Hadoop](https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md#configure-hadoop),
A type of inputSpec that expects data to be organized in directories according to datetime using the path format: `y=XXXX/m=XX/d=XX/H=XX/M=XX/S=XX` (where date is represented by lowercase and time is represented by uppercase).
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|dataGranularity|String|Specifies the granularity to expect the data at, e.g. hour means to expect directories `y=XXXX/m=XX/d=XX/H=XX`.|yes|
|inputFormat|String|Specifies the Hadoop InputFormat class to use. e.g. `org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat` |no|
|inputPath|String|Base path to append the datetime path to.|yes|
|filePattern|String|Pattern that files should match to be included.|yes|
|pathFormat|String|Joda datetime format for each directory. Default value is `"'y'=yyyy/'m'=MM/'d'=dd/'H'=HH"`, or see [Joda documentation](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html)|no|
For example, if the sample config were run with the interval 2012-06-01/2012-06-02, it would expect data at the paths:
This is a type of `inputSpec` that reads data already stored inside Druid. This is used to allow "re-indexing" data and for "delta-ingestion" described later in `multi` type inputSpec.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String.|This should always be 'dataSource'.|yes|
|ingestionSpec|JSON object.|Specification of Druid segments to be loaded. See below.|yes|
|maxSplitSize|Number|Enables combining multiple segments into single Hadoop InputSplit according to size of segments. With -1, druid calculates max split size based on user specified number of map task(mapred.map.tasks or mapreduce.job.maps). By default, one split is made for one segment. maxSplitSize is specified in bytes.|no|
|useNewAggs|Boolean|If "false", then list of aggregators in "metricsSpec" of hadoop indexing task must be same as that used in original indexing task while ingesting raw data. Default value is "false". This field can be set to "true" when "inputSpec" type is "dataSource" and not "multi" to enable arbitrary aggregators while reindexing. See below for "multi" type support for delta-ingestion.|no|
Here is what goes inside `ingestionSpec`:
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|dataSource|String|Druid dataSource name from which you are loading the data.|yes|
|intervals|List|A list of strings representing ISO-8601 Intervals.|yes|
|segments|List|List of segments from which to read data from, by default it is obtained automatically. You can obtain list of segments to put here by making a POST query to Coordinator at url /druid/coordinator/v1/metadata/datasources/segments?full with list of intervals specified in the request payload, e.g. ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]. You may want to provide this list manually in order to ensure that segments read are exactly same as they were at the time of task submission, task would fail if the list provided by the user does not match with state of database when the task actually runs.|no|
|dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have an explicit list of dimensions then all the dimension columns present in stored data will be read.|no|
|metrics|Array of String|Name of metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no|
|ignoreWhenNoSegments|boolean|Whether to ignore this ingestionSpec if no segments were found. Default behavior is to throw error when no segments were found.|no|
This is a composing inputSpec to combine other inputSpecs. This inputSpec is used for delta ingestion. You can also use a `multi` inputSpec to combine data from multiple dataSources. However, each particular dataSource can only be specified one time.
Note that, "useNewAggs" must be set to default value false to support delta-ingestion.
It is STRONGLY RECOMMENDED to provide list of segments in `dataSource` inputSpec explicitly so that your delta ingestion task is idempotent. You can obtain that list of segments by making following call to the Coordinator.
POST `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments?full`
Request Body: [interval1, interval2,...] for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
|workingPath|String|The working path to use for intermediate results (results between Hadoop jobs).|Only used by the [Command-line Hadoop indexer](#cli). The default is '/tmp/druid-indexing'. This field must be null otherwise.|
|version|String|The version of created segments. Ignored for HadoopIndexTask unless useExplicitVersion is set to true|no (default == datetime that indexing starts at)|
|partitionsSpec|Object|A specification of how to partition each time bucket into segments. Absence of this property means no partitioning will occur. See [`partitionsSpec`](#partitionsspec) below.|no (default == 'hashed')|
|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. Note that this is the number of post-aggregation rows which may not be equal to the number of input events due to roll-up. This is used to manage the required JVM heap size. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|no (default == 1000000)|
|maxBytesInMemory|Long|The number of bytes to aggregate in heap memory before persisting. Normally this is computed internally and user does not need to set it. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|no (default == One-sixth of max JVM memory)|
|leaveIntermediate|Boolean|Leave behind intermediate files (for debugging) in the workingPath when a job completes, whether it passes or fails.|no (default == false)|
|cleanupOnFailure|Boolean|Clean up intermediate files when a job fails (unless leaveIntermediate is on).|no (default == true)|
|overwriteFiles|Boolean|Override existing files found during indexing.|no (default == false)|
|ignoreInvalidRows|Boolean|DEPRECATED. Ignore rows found to have problems. If false, any exception encountered during parsing will be thrown and will halt ingestion; if true, unparseable rows and fields will be skipped. If `maxParseExceptions` is defined, this property is ignored.|no (default == false)|
|combineText|Boolean|Use CombineTextInputFormat to combine multiple files into a file split. This can speed up Hadoop jobs when processing a large number of small files.|no (default == false)|
|useCombiner|Boolean|Use Hadoop combiner to merge rows at mapper if possible.|no (default == false)|
|jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)|
|indexSpec|Object|Tune how data is indexed. See [`indexSpec`](index.md#indexspec) on the main ingestion page for more information.|no|
|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [`indexSpec`](index.md#indexspec) for possible values.|no (default = same as indexSpec)|
|numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and CPU usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)|
|forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Hash-based partitioning always uses an extendable shardSpec. For single-dimension partitioning, this option should be set to true to use an extendable shardSpec. For partitioning, please check [Partitioning specification](#partitionsspec). This option can be useful when you need to append more data to existing dataSource.|no (default = false)|
|logParseExceptions|Boolean|If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred.|false|no|
|maxParseExceptions|Integer|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overrides `ignoreInvalidRows` if `maxParseExceptions` is defined.|unlimited|no|
|useYarnRMJobStatusFallback|Boolean|If the Hadoop jobs created by the indexing task are unable to retrieve their completion status from the JobHistory server, and this parameter is true, the indexing task will try to fetch the application status from `http://<yarn-rm-address>/ws/v1/cluster/apps/<application-id>`, where `<yarn-rm-address>` is the value of `yarn.resourcemanager.webapp.address` in your Hadoop configuration. This flag is intended as a fallback for cases where an indexing task's jobs succeed, but the JobHistory server is unavailable, causing the indexing task to fail because it cannot determine the job statuses.|no (default = true)|
Hadoop's [MapReduce documentation](https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml) lists the possible configuration parameters.
With some Hadoop distributions, it may be necessary to set `mapreduce.job.classpath` or `mapreduce.job.user.classpath.first`
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB. Defaults to 5000000 if `numShards` is not set.|either this or `numShards`|
|targetPartitionSize|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`|
|maxRowsPerSegment|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`|
|numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or `maxRowsPerSegment`|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `numShards`, will be ignored when `targetRowsPerSegment` is set.|no|
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|yes|
|targetPartitionSize|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|no|
|maxPartitionSize|Deprecated. Use `maxRowsPerSegment` instead. Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetPartitionSize`.|no|
|partitionDimension|The dimension to partition on. Leave blank to select a dimension automatically.|no|
|assumeGrouped|Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated.|no|
If you have a remote Hadoop cluster, make sure to include the folder holding your configuration `*.xml` files in your Druid `_common` configuration folder.
If you are having dependency problems with your version of Hadoop and the version compiled with Druid, please see [these docs](../operations/other-hadoop.md).
- "--coordinate" - provide a version of Apache Hadoop to use. This property will override the default Hadoop coordinates. Once specified, Apache Druid will look for those Hadoop dependencies from the location specified by `druid.extensions.hadoopDependenciesDir`.
- "--no-default-hadoop" - don't pull down the default hadoop version
### Spec file
The spec file needs to contain a JSON object where the contents are the same as the "spec" field in the Hadoop index task. See [Hadoop Batch Ingestion](../ingestion/hadoop.md) for details on the spec format.
In addition, a `metadataUpdateSpec` and `segmentOutputPath` field needs to be added to the ioConfig:
and a `workingPath` field needs to be added to the tuningConfig:
```
"tuningConfig" : {
...
"workingPath": "/tmp",
...
}
```
#### Metadata Update Job Spec
This is a specification of the properties that tell the job how to update metadata such that the Druid cluster will see the output segments and load them.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|"metadata" is the only value available.|yes|
|connectURI|String|A valid JDBC url to metadata storage.|yes|
|user|String|Username for db.|yes|
|password|String|password for db.|yes|
|segmentTable|String|Table to use in DB.|yes|
These properties should parrot what you have configured for your [Coordinator](../design/coordinator.md).
#### segmentOutputPath Config
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|segmentOutputPath|String|the path to dump segments into.|yes|
#### workingPath Config
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|workingPath|String|the working path to use for intermediate results (results between Hadoop jobs).|no (default == '/tmp/druid-indexing')|
Please note that the command line Hadoop indexer doesn't have the locking capabilities of the indexing service, so if you choose to use it,
you have to take caution to not override segments created by real-time processing (if you that a real-time pipeline set up).