---
id: index
title: "Ingestion"
---
## Overview
All data in Druid is organized into _segments_, which are data files that generally have up to a few million rows each.
Loading data in Druid is called _ingestion_ or _indexing_ and consists of reading data from a source system and creating
segments based on that data.
In most ingestion methods, the work of loading data is done by Druid [MiddleManager](../design/middlemanager.md) processes
(or the [Indexer](../design/indexer.md) processes). One exception is
Hadoop-based ingestion, where this work is instead done using a Hadoop MapReduce job on YARN (although MiddleManager or Indexer
processes are still involved in starting and monitoring the Hadoop jobs). Once segments have been generated and stored
in [deep storage](../dependencies/deep-storage.md), they will be loaded by Historical processes. For more details on
how this works under the hood, see the [Storage design](../design/architecture.md#storage-design) section of Druid's design
documentation.
## How to use this documentation
This **page you are currently reading** provides information about universal Druid ingestion concepts, and about
configurations that are common to all [ingestion methods](#ingestion-methods).
The **individual pages for each ingestion method** provide additional information about concepts and configurations
that are unique to each ingestion method.
We recommend reading (or at least skimming) this universal page first, and then referring to the page for the
ingestion method or methods that you have chosen.
## Ingestion methods
The table below lists Druid's most common data ingestion methods, along with comparisons to help you choose
the best one for your situation. Each ingestion method supports its own set of source systems to pull from. For details
about how each method works, as well as configuration properties specific to that method, check out its documentation
page.
### Streaming
The recommended, and most popular, method of streaming ingestion is the
[Kafka indexing service](../development/extensions-core/kafka-ingestion.md), which reads directly from Kafka. The
[Kinesis indexing service](../development/extensions-core/kinesis-ingestion.md) also works well if you prefer Kinesis.
This table compares the major available options:
| **Method** | [Kafka](../development/extensions-core/kafka-ingestion.md) | [Kinesis](../development/extensions-core/kinesis-ingestion.md) | [Tranquility](tranquility.md) |
|---|-----|--------------|------------|
| **Supervisor type** | `kafka` | `kinesis` | N/A |
| **How it works** | Druid reads directly from Apache Kafka. | Druid reads directly from Amazon Kinesis. | Tranquility, a library that ships separately from Druid, is used to push data into Druid. |
| **Can ingest late data?** | Yes | Yes | No (late data is dropped based on the `windowPeriod` config) |
| **Exactly-once guarantees?** | Yes | Yes | No |
### Batch
When doing batch loads from files, you should use one-time [tasks](tasks.md), and you have three options: `index_parallel` (native batch; parallel), `index_hadoop` (Hadoop-based),
or `index` (native batch; single-task).
In general, we recommend native batch whenever it meets your needs, since the setup is simpler (it does not depend on
an external Hadoop cluster). However, there are still scenarios where Hadoop-based batch ingestion might be a better choice,
for example when you already have a running Hadoop cluster and want to use its resources for batch ingestion.
This table compares the three available options:
| **Method** | [Native batch (parallel)](native-batch.md#parallel-task) | [Hadoop-based](hadoop.md) | [Native batch (simple)](native-batch.md#simple-task) |
|---|-----|--------------|------------|
| **Task type** | `index_parallel` | `index_hadoop` | `index` |
| **Parallel?** | Yes, if `inputFormat` is splittable and `maxNumConcurrentSubTasks` > 1 in `tuningConfig`. See [data format documentation](./data-formats.md) for details. | Yes, always. | No. Each task is single-threaded. |
| **Can append or overwrite?** | Yes, both. | Overwrite only. | Yes, both. |
| **External dependencies** | None. | Hadoop cluster (Druid submits Map/Reduce jobs). | None. |
| **Input locations** | Any [`inputSource`](./native-batch.md#input-sources). | Any Hadoop FileSystem or Druid datasource. | Any [`inputSource`](./native-batch.md#input-sources). |
| **File formats** | Any [`inputFormat`](./data-formats.md#input-format). | Any Hadoop InputFormat. | Any [`inputFormat`](./data-formats.md#input-format). |
| **[Rollup modes](#rollup)** | Perfect if `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig). | Always perfect. | Perfect if `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig). |
| **Partitioning options** | Dynamic, hash-based, and range-based partitioning methods are available. See [Partitions Spec](./native-batch.md#partitionsspec) for details. | Hash-based or range-based partitioning via [`partitionsSpec`](hadoop.md#partitionsspec). | Dynamic and hash-based partitioning methods are available. See [Partitions Spec](./native-batch.md#partitionsspec-1) for details. |
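
As a minimal sketch of the **Parallel?** row above, the following `tuningConfig` fragment lets a native parallel task run several subtasks at once. The value `4` is an arbitrary illustration rather than a recommendation, and the setting only has an effect when the input is splittable.

```json
"tuningConfig": {
  "type": "index_parallel",
  "maxNumConcurrentSubTasks": 4
}
```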
## Druid's data model
### Datasources
Druid data is stored in datasources, which are similar to tables in a traditional RDBMS. Druid
offers a unique data modeling system that bears similarity to both relational and timeseries models.
### Primary timestamp
Druid schemas must always include a primary timestamp. The primary timestamp is used for
[partitioning and sorting](#partitioning) your data. Druid queries are able to rapidly identify and retrieve data
corresponding to time ranges of the primary timestamp column. Druid is also able to use the primary timestamp column
for time-based [data management operations](data-management.md) such as dropping time chunks, overwriting time chunks,
and time-based retention rules.
The primary timestamp is parsed based on the [`timestampSpec`](#timestampspec). In addition, the
[`granularitySpec`](#granularityspec) controls other important operations that are based on the primary timestamp.
Regardless of which input field the primary timestamp is read from, it will always be stored as a column named `__time`
in your Druid datasource.
If you have more than one timestamp column, you can store the others as
[secondary timestamps](schema-design.md#secondary-timestamps).
### Dimensions
Dimensions are columns that are stored as-is and can be used for any purpose. You can group, filter, or apply
aggregators to dimensions at query time in an ad-hoc manner. If you run with [rollup](#rollup) disabled, then the set of
dimensions is simply treated like a set of columns to ingest, and behaves exactly as you would expect from a typical
database that does not support a rollup feature.
Dimensions are configured through the [`dimensionsSpec`](#dimensionsspec).
### Metrics
Metrics are columns that are stored in an aggregated form. They are most useful when [rollup](#rollup) is enabled.
Specifying a metric allows you to choose an aggregation function for Druid to apply to each row during ingestion. This
has two benefits:
1. If [rollup](#rollup) is enabled, multiple rows can be collapsed into one row even while retaining summary
information. In the [rollup tutorial](../tutorials/tutorial-rollup.md), this is used to collapse netflow data to a
single row per `(minute, srcIP, dstIP)` tuple, while retaining aggregate information about total packet and byte counts.
2. Some aggregators, especially approximate ones, can be computed faster at query time even on non-rolled-up data if
they are partially computed at ingestion time.
Metrics are configured through the [`metricsSpec`](#metricsspec).
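
For illustration, a `metricsSpec` for the netflow case mentioned above might look like the sketch below. The input field names `packets` and `bytes` are assumptions made for this example; use the field names that appear in your own data.

```json
"metricsSpec": [
  { "type": "count", "name": "count" },
  { "type": "longSum", "name": "packets", "fieldName": "packets" },
  { "type": "longSum", "name": "bytes", "fieldName": "bytes" }
]
```

The `count` metric records how many input events were collapsed into each stored row, which is useful later when measuring the rollup ratio.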
## Rollup
### What is rollup?
Druid can roll up data as it is ingested to minimize the amount of raw data that needs to be stored. Rollup is
a form of summarization or pre-aggregation. In practice, rolling up data can dramatically reduce the size of data that
needs to be stored, reducing row counts by potentially orders of magnitude. This storage reduction does come at a cost:
as we roll up data, we lose the ability to query individual events.
When rollup is disabled, Druid loads each row as-is without doing any form of pre-aggregation. This mode is similar
to what you would expect from a typical database that does not support a rollup feature.
When rollup is enabled, then any rows that have identical [dimensions](#dimensions) and [timestamp](#primary-timestamp)
to each other (after [`queryGranularity`-based truncation](#granularityspec)) can be collapsed, or _rolled up_, into a
single row in Druid.
By default, rollup is enabled.
### Enabling or disabling rollup
Rollup is controlled by the `rollup` setting in the [`granularitySpec`](#granularityspec). By default, it is `true`
(enabled). Set this to `false` if you want Druid to store each record as-is, without any rollup summarization.
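
As a sketch, a `granularitySpec` with rollup turned off could look like this. The granularity values are placeholders chosen for the example.

```json
"granularitySpec": {
  "segmentGranularity": "day",
  "queryGranularity": "none",
  "rollup": false
}
```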
### Example of rollup
For an example of how to configure rollup, and of how the feature will modify your data, check out the
[rollup tutorial](../tutorials/tutorial-rollup.md).
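
To give a rough sense of the effect, consider the four made-up input events below. This is an illustrative sketch only: the data is invented, and it assumes `queryGranularity` is `minute`, that `srcIP` and `dstIP` are dimensions, and that the metrics are a `count` plus `longSum` aggregators over `packets` and `bytes`.

```json
{"timestamp":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":20,"bytes":9024}
{"timestamp":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":255,"bytes":21133}
{"timestamp":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":11,"bytes":5780}
{"timestamp":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":38,"bytes":6289}
```

Under those assumptions, the first three events share the same truncated timestamp and dimension values, so they collapse into one row, while the fourth stays on its own:

```json
{"__time":"2018-01-01T01:01:00.000Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","count":3,"packets":286,"bytes":35937}
{"__time":"2018-01-01T01:02:00.000Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","count":1,"packets":38,"bytes":6289}
```

Four events are stored as two rows, and the individual events within the 01:01 minute can no longer be queried separately.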
### Maximizing rollup ratio
You can measure the rollup ratio of a datasource by comparing the number of ingested events with the number of rows
stored in Druid. The higher this ratio, the more benefit you are gaining from rollup. One way to do this is with a
[Druid SQL](../querying/sql.md) query like:
```sql
SELECT SUM("cnt") / (COUNT(*) * 1.0) FROM datasource
```
In this query, `cnt` should refer to a "count" type metric specified at ingestion time. See
[Counting the number of ingested events](schema-design.md#counting) on the "Schema design" page for more details about
how counting works when rollup is enabled.
Tips for maximizing rollup:
- Generally, the fewer dimensions you have, and the lower the cardinality of your dimensions, the better rollup ratios
you will achieve.
- Use [sketches](schema-design.md#sketches) to avoid storing high cardinality dimensions, which harm rollup ratios.
- Adjusting `queryGranularity` at ingestion time (for example, using `PT5M` instead of `PT1M`) increases the
likelihood of two rows in Druid having matching timestamps, and can improve your rollup ratios.
- It can be beneficial to load the same data into more than one Druid datasource. Some users choose to create a "full"
datasource that has rollup disabled (or enabled, but with a minimal rollup ratio) and an "abbreviated" datasource that
has fewer dimensions and a higher rollup ratio. When queries only involve dimensions in the "abbreviated" set, using
that datasource leads to much faster query times. This can often be done with just a small increase in storage
footprint, since abbreviated datasources tend to be substantially smaller.
- If you are using a [best-effort rollup](#perfect-rollup-vs-best-effort-rollup) ingestion configuration that does not guarantee perfect
rollup, you can potentially improve your rollup ratio by switching to a guaranteed perfect rollup option, or by
[reindexing](data-management.md#reingesting-data) or [compacting](compaction.md) your data in the background after initial ingestion.
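
As a sketch of the `queryGranularity` tip above, the fragment below truncates timestamps to five-minute buckets using the simple granularity name `five_minute`. The `segmentGranularity` value is a placeholder, and the right bucket size depends on how much time resolution your queries need.

```json
"granularitySpec": {
  "segmentGranularity": "day",
  "queryGranularity": "five_minute",
  "rollup": true
}
```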
### Perfect rollup vs Best-effort rollup
Some Druid ingestion methods guarantee _perfect rollup_, meaning that input data are perfectly aggregated at ingestion
time. Others offer _best-effort rollup_, meaning that input data might not be perfectly aggregated and thus there could
be multiple segments holding rows with the same timestamp and dimension values.
In general, ingestion methods that offer best-effort rollup do this because they are either parallelizing ingestion
without a shuffling step (which would be required for perfect rollup), or because they are finalizing and publishing
segments before all data for a time chunk has been received, which we call _incremental publishing_. In both of these
cases, records that could theoretically be rolled up may end up in different segments. All types of streaming ingestion
run in this mode.
Ingestion methods that guarantee perfect rollup do it with an additional preprocessing step to determine intervals
and partitioning before the actual data ingestion stage. This preprocessing step scans the entire input dataset, which
generally increases the time required for ingestion, but provides information necessary for perfect rollup.
The following table shows how each method handles rollup:
|Method|How it works|
|------|------------|
|[Native batch](native-batch.md)|`index_parallel` and `index` types may be either perfect or best-effort, based on configuration.|
|[Hadoop](hadoop.md)|Always perfect.|
|[Kafka indexing service](../development/extensions-core/kafka-ingestion.md)|Always best-effort.|
|[Kinesis indexing service](../development/extensions-core/kinesis-ingestion.md)|Always best-effort.|
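
For native batch, a configuration aiming at perfect rollup might look like the following sketch. The `numShards` value is an arbitrary illustration; see the native batch documentation for the full set of options and their constraints.

```json
"tuningConfig": {
  "type": "index_parallel",
  "forceGuaranteedRollup": true,
  "partitionsSpec": {
    "type": "hashed",
    "numShards": 4
  }
}
```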
## Partitioning
### Why partition?
Optimal partitioning and sorting of segments within your datasources can have substantial impact on footprint and
performance.
Druid datasources are always partitioned by time into _time chunks_, and each time chunk contains one or more segments.
This partitioning happens for all ingestion methods, and is based on the `segmentGranularity` parameter of your
ingestion spec's `dataSchema`.
The segments within a particular time chunk may also be partitioned further, using options that vary based on the
ingestion type you have chosen. In general, doing this secondary partitioning using a particular dimension will
improve locality, meaning that rows with the same value for that dimension are stored together and can be accessed
quickly.
You will usually get the best performance and smallest overall footprint by partitioning your data on some "natural"
dimension that you often filter by, if one exists. This often improves compression (users have reported threefold
storage size decreases) and also tends to improve query performance.
> Partitioning and sorting are best friends! If you do have a "natural" partitioning dimension, you should also consider
> placing it first in the `dimensions` list of your `dimensionsSpec`, which tells Druid to sort rows within each segment
> by that column. This will often improve compression even more, beyond the improvement gained by partitioning alone.
>
> However, note that currently, Druid always sorts rows within a segment by timestamp first, even before the first
> dimension listed in your `dimensionsSpec`. This can prevent dimension sorting from being maximally effective. If
> necessary, you can work around this limitation by setting `queryGranularity` equal to `segmentGranularity` in your
> [`granularitySpec`](#granularityspec), which will set all timestamps within the segment to the same value, and by saving
> your "real" timestamp as a [secondary timestamp](schema-design.md#secondary-timestamps). This limitation may be removed
> in a future version of Druid.
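
The workaround described in the note above can be sketched as follows. The dimension names `customerId` and `eventTime` are hypothetical: `customerId` stands in for your "natural" dimension, and `eventTime` for an input field that carries the real event time as a secondary timestamp.

```json
"dimensionsSpec": {
  "dimensions": [
    { "type": "string", "name": "customerId" },
    { "type": "string", "name": "page" },
    { "type": "long", "name": "eventTime" }
  ]
},
"granularitySpec": {
  "segmentGranularity": "day",
  "queryGranularity": "day"
}
```

With `queryGranularity` equal to `segmentGranularity`, every row in a segment has the same `__time` value, so the sort order within the segment is effectively determined by `customerId`.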
### How to set up partitioning
Not all ingestion methods support an explicit partitioning configuration, and not all have equivalent levels of
flexibility. As of current Druid versions, if you are doing initial ingestion through a less-flexible method (like
Kafka), you can use [reindexing](data-management.md#reingesting-data) or [compaction](compaction.md) to repartition your data after it
is initially ingested. This is a powerful technique: you can use it to ensure that any data older than a certain
threshold is optimally partitioned, even as you continuously add new data from a stream.
The following table shows how each ingestion method handles partitioning:
|Method|How it works|
|------|------------|
|[Native batch](native-batch.md)|Configured using [`partitionsSpec`](native-batch.md#partitionsspec) inside the `tuningConfig`.|
|[Hadoop](hadoop.md)|Configured using [`partitionsSpec`](hadoop.md#partitionsspec) inside the `tuningConfig`.|
|[Kafka indexing service](../development/extensions-core/kafka-ingestion.md)|Partitioning in Druid is guided by how your Kafka topic is partitioned. You can also [reindex](data-management.md#reingesting-data) or [compact](compaction.md) to repartition after initial ingestion.|
|[Kinesis indexing service](../development/extensions-core/kinesis-ingestion.md)|Partitioning in Druid is guided by how your Kinesis stream is sharded. You can also [reindex](data-management.md#reingesting-data) or [compact](compaction.md) to repartition after initial ingestion.|
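
As an illustration for native batch, a range-based `partitionsSpec` on a single dimension might look like the sketch below. The dimension name `customerId` and the row target are hypothetical values chosen for this example.

```json
"tuningConfig": {
  "type": "index_parallel",
  "forceGuaranteedRollup": true,
  "partitionsSpec": {
    "type": "single_dim",
    "partitionDimension": "customerId",
    "targetRowsPerSegment": 5000000
  }
}
```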
> Note that, of course, one way to partition data is to load it into separate datasources. This is a perfectly viable
> approach and works very well when the number of datasources does not lead to excessive per-datasource overheads. If
> you go with this approach, then you can ignore this section, since it is describing how to set up partitioning
> _within a single datasource_.
>
> For more details on splitting data up into separate datasources, and potential operational considerations, refer
> to the [Multitenancy considerations](../querying/multitenancy.md) page.
## Ingestion specs
No matter what ingestion method you use, data is loaded into Druid using either one-time [tasks](tasks.md) or
ongoing "supervisors" (which run and supervise a set of tasks over time). In any case, part of the task or supervisor
definition is an _ingestion spec_.
Ingestion specs consist of three main components:
- [`dataSchema`](#dataschema), which configures the [datasource name](#datasource),
[primary timestamp](#timestampspec), [dimensions](#dimensionsspec), [metrics](#metricsspec), and [transforms and filters](#transformspec) (if needed).
- [`ioConfig`](#ioconfig), which tells Druid how to connect to the source system and how to parse data. For more information, see the
documentation for each [ingestion method](#ingestion-methods).
- [`tuningConfig`](#tuningconfig), which controls various tuning parameters specific to each
[ingestion method](#ingestion-methods).
Example ingestion spec for task type `index_parallel` (native batch):
```
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "wikipedia",
      "timestampSpec": {
        "column": "timestamp",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          { "type": "string", "name": "page" },
          { "type": "string", "name": "language" },
          { "type": "long", "name": "userId" }
        ]
      },
      "metricsSpec": [
        { "type": "count", "name": "count" },
        { "type": "doubleSum", "name": "bytes_added_sum", "fieldName": "bytes_added" },
        { "type": "doubleSum", "name": "bytes_deleted_sum", "fieldName": "bytes_deleted" }
      ],
      "granularitySpec": {
        "segmentGranularity": "day",
        "queryGranularity": "none",
        "intervals": [
          "2013-08-31/2013-09-01"
        ]
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "local",
        "baseDir": "examples/indexing/",
        "filter": "wikipedia_data.json"
      },
      "inputFormat": {
        "type": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            { "type": "path", "name": "userId", "expr": "$.user.id" }
          ]
        }
      }
    },
    "tuningConfig": {
      "type": "index_parallel"
    }
  }
}
```
The specific options supported by these sections will depend on the [ingestion method](#ingestion-methods) you have chosen.
For more examples, refer to the documentation for each ingestion method.
You can also load data visually, without the need to write an ingestion spec, using the "Load data" functionality
available in Druid's [web console](../operations/druid-console.md). Druid's visual data loader supports
[Kafka](../development/extensions-core/kafka-ingestion.md),
[Kinesis](../development/extensions-core/kinesis-ingestion.md), and
[native batch](native-batch.md) mode.
## `dataSchema`
> The `dataSchema` spec changed in 0.17.0. The new spec is supported by all ingestion methods
> except for _Hadoop_ ingestion. See the [Legacy `dataSchema` spec](#legacy-dataschema-spec) for the old spec.
The `dataSchema` is a holder for the following components:
- [datasource name](#datasource), [primary timestamp](#timestampspec),
[dimensions](#dimensionsspec), [metrics](#metricsspec), and
[transforms and filters](#transformspec) (if needed).
An example `dataSchema` is:
```
"dataSchema": {
  "dataSource": "wikipedia",
  "timestampSpec": {
    "column": "timestamp",
    "format": "auto"
  },
  "dimensionsSpec": {
    "dimensions": [
      { "type": "string", "name": "page" },
      { "type": "string", "name": "language" },
      { "type": "long", "name": "userId" }
    ]
  },
  "metricsSpec": [
    { "type": "count", "name": "count" },
    { "type": "doubleSum", "name": "bytes_added_sum", "fieldName": "bytes_added" },
    { "type": "doubleSum", "name": "bytes_deleted_sum", "fieldName": "bytes_deleted" }
  ],
  "granularitySpec": {
    "segmentGranularity": "day",
    "queryGranularity": "none",
    "intervals": [
      "2013-08-31/2013-09-01"
    ]
  }
}
```
### `dataSource`
The `dataSource` is located in `dataSchema` → `dataSource` and is simply the name of the
[datasource](../design/architecture.md#datasources-and-segments) that data will be written to. An example
`dataSource` is:
```
"dataSource": "my-first-datasource"
```
### `timestampSpec`
The `timestampSpec` is located in `dataSchema` → `timestampSpec` and is responsible for
configuring the [primary timestamp](#primary-timestamp). An example `timestampSpec` is:
```
"timestampSpec": {
"column": "timestamp",
"format": "auto"
}
```
> Conceptually, after input data records are read, Druid applies ingestion spec components in a particular order:
> first [`flattenSpec`](data-formats.md#flattenspec) (if any), then [`timestampSpec`](#timestampspec), then [`transformSpec`](#transformspec),
> and finally [`dimensionsSpec`](#dimensionsspec) and [`metricsSpec`](#metricsspec). Keep this in mind when writing
> your ingestion spec.
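
To make this ordering concrete, here is a sketch in which the timestamp lives in a nested input field. Because the `flattenSpec` is applied first, it can extract the nested value into a top-level field that the `timestampSpec` then reads. The field name `eventTime` and the path `$.event.time` are hypothetical. The first fragment belongs in the `ioConfig`:

```json
"inputFormat": {
  "type": "json",
  "flattenSpec": {
    "fields": [
      { "type": "path", "name": "eventTime", "expr": "$.event.time" }
    ]
  }
}
```

The second fragment belongs in the `dataSchema` and refers to the flattened field by name:

```json
"timestampSpec": {
  "column": "eventTime",
  "format": "iso"
}
```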
A `timestampSpec` can have the following components:
|Field|Description|Default|
|-----|-----------|-------|
|column|Input row field to read the primary timestamp from. Regardless of the name of this input field, the primary timestamp will always be stored as a column named `__time` in your Druid datasource.|timestamp|
|format|Timestamp format. Options are: