| **How it works** | Druid reads directly from Apache Kafka. | Druid reads directly from Amazon Kinesis. | Tranquility, a library that ships separately from Druid, is used to push data into Druid. |
| **Can ingest late data?** | Yes | Yes | No (late data is dropped based on the `windowPeriod` config) |
When doing batch loads from files, you should use one-time [tasks](tasks.md), and you have three options: `index_parallel` (native batch; parallel), `index_hadoop` (Hadoop-based),
| **Parallel?** | Yes, if `inputFormat` is splittable and `maxNumConcurrentSubTasks` > 1 in `tuningConfig`. See [data format documentation](./data-formats.md) for details. | Yes, always. | No. Each task is single-threaded. |
| **Input locations** | Any [`inputSource`](./native-batch.md#input-sources). | Any Hadoop FileSystem or Druid datasource. | Any [`inputSource`](./native-batch.md#input-sources). |
| **File formats** | Any [`inputFormat`](./data-formats.md#input-format). | Any Hadoop InputFormat. | Any [`inputFormat`](./data-formats.md#input-format). |
| **[Rollup modes](#rollup)** | Perfect if `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig). | Always perfect. | Perfect if `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig). |
| **Partitioning options** | Dynamic, hash-based, and range-based partitioning methods are available. See [Partitions Spec](./native-batch.md#partitionsspec) for details. | Hash-based or range-based partitioning via [`partitionsSpec`](hadoop.md#partitionsspec). | Dynamic and hash-based partitioning methods are available. See [Partitions Spec](./native-batch.md#partitionsspec) for details. |
improve locality, meaning that rows with the same value for that dimension are stored together and can be accessed
quickly.
You will usually get the best performance and smallest overall footprint by partitioning your data on some "natural"
dimension that you often filter by, if one exists. This will often improve compression - users have reported threefold
storage size decreases - and it also tends to improve query performance.
> Partitioning and sorting are best friends! If you do have a "natural" partitioning dimension, you should also consider
> placing it first in the `dimensions` list of your `dimensionsSpec`, which tells Druid to sort rows within each segment
> by that column. This will often improve compression even more, beyond the improvement gained by partitioning alone.
>
> However, note that currently, Druid always sorts rows within a segment by timestamp first, even before the first
> dimension listed in your `dimensionsSpec`. This can prevent dimension sorting from being maximally effective. If
> necessary, you can work around this limitation by setting `queryGranularity` equal to `segmentGranularity` in your
> [`granularitySpec`](#granularityspec), which will set all timestamps within the segment to the same value, and by saving
> your "real" timestamp as a [secondary timestamp](schema-design.md#secondary-timestamps). This limitation may be removed
> in a future version of Druid.
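
As a rough sketch of the workaround described in the note above, the fragment below lists a hypothetical "natural" dimension (`customer_id`) first and sets `queryGranularity` equal to `segmentGranularity`. The field names `customer_id`, `event_time_millis`, and `page` are assumptions made for illustration; `event_time_millis` stands in for a copy of the original event time that the input records are assumed to carry, kept as a secondary timestamp.

```json
{
  "dimensionsSpec": {
    "dimensions": [
      "customer_id",
      { "name": "event_time_millis", "type": "long" },
      "page"
    ]
  },
  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "hour",
    "queryGranularity": "hour"
  }
}
```

With this layout every row in a segment shares the same `__time` value, so the sort order within the segment is effectively driven by `customer_id`.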
### How to set up partitioning
Not all ingestion methods support an explicit partitioning configuration, and not all have equivalent levels of
flexibility. As of current Druid versions, if you are doing initial ingestion through a less-flexible method (like
Kafka) then you can use [reindexing techniques](data-management.html#compaction-and-reindexing) to repartition your data after it
is initially ingested. This is a powerful technique: you can use it to ensure that any data older than a certain
threshold is optimally partitioned, even as you continuously add new data from a stream.
The following table shows how each ingestion method handles partitioning:
|Method|How it works|
|------|------------|
|[Native batch](native-batch.html)|Configured using [`partitionsSpec`](native-batch.html#partitionsspec) inside the `tuningConfig`.|
|[Hadoop](hadoop.html)|Configured using [`partitionsSpec`](hadoop.html#partitionsspec) inside the `tuningConfig`.|
|[Kafka indexing service](../development/extensions-core/kafka-ingestion.md)|Partitioning in Druid is guided by how your Kafka topic is partitioned. You can also [reindex](data-management.html#compaction-and-reindexing) to repartition after initial ingestion.|
|[Kinesis indexing service](../development/extensions-core/kinesis-ingestion.md)|Partitioning in Druid is guided by how your Kinesis stream is sharded. You can also [reindex](data-management.html#compaction-and-reindexing) to repartition after initial ingestion.|
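
For native batch, a range-partitioned `tuningConfig` might look like the sketch below. The dimension name `customer_id` and the row target are illustrative assumptions, and the exact set of supported fields should be checked against the linked `partitionsSpec` documentation for your Druid version.

```json
{
  "type": "index_parallel",
  "forceGuaranteedRollup": true,
  "partitionsSpec": {
    "type": "single_dim",
    "partitionDimension": "customer_id",
    "targetRowsPerSegment": 5000000
  }
}
```
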
> Note that, of course, one way to partition data is to load it into separate datasources. This is a perfectly viable
> approach and works very well when the number of datasources does not lead to excessive per-datasource overheads. If
> you go with this approach, then you can ignore this section, since it is describing how to set up partitioning
> _within a single datasource_.
>
> For more details on splitting data up into separate datasources, and potential operational considerations, refer
> to the [Multitenancy considerations](../querying/multitenancy.md) page.
<a name="spec"></a>
## Ingestion specs
No matter what ingestion method you use, data is loaded into Druid using either one-time [tasks](tasks.html) or
ongoing "supervisors" (which run and supervise a set of tasks over time). In any case, part of the task or supervisor
definition is an _ingestion spec_.
Ingestion specs consist of three main components:
> and finally [`dimensionsSpec`](#dimensionsspec) and [`metricsSpec`](#metricsspec). Keep this in mind when writing
> your ingestion spec.
A `timestampSpec` can have the following components:
|Field|Description|Default|
|-----|-----------|-------|
|column|Input row field to read the primary timestamp from.<br><br>Regardless of the name of this input field, the primary timestamp will always be stored as a column named `__time` in your Druid datasource.|timestamp|
|format|Timestamp format. Options are: <ul><li>`iso`: ISO8601 with 'T' separator, like "2000-01-01T01:02:03.456"</li><li>`posix`: seconds since epoch</li><li>`millis`: milliseconds since epoch</li><li>`micro`: microseconds since epoch</li><li>`nano`: nanoseconds since epoch</li><li>`auto`: automatically detects ISO (either 'T' or space separator) or millis format</li><li>any [Joda DateTimeFormat string](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html)</li></ul>|auto|
|missingValue|Timestamp to use for input records that have a null or missing timestamp `column`. Should be in ISO8601 format, like `"2000-01-01T01:02:03.456"`, even if you have specified something else for `format`. Since Druid requires a primary timestamp, this setting can be useful for ingesting datasets that do not have any per-record timestamps at all. |none|
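
A minimal `timestampSpec` using the fields in the table above could look like this. The input field name `event_time` is a hypothetical example, and the `missingValue` shown is an arbitrary placeholder.

```json
{
  "column": "event_time",
  "format": "iso",
  "missingValue": "2000-01-01T00:00:00.000"
}
```
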
| dimensions | A list of [dimension names or objects](#dimension-objects). Cannot have the same column in both `dimensions` and `dimensionExclusions`.<br><br>If this is an empty array, Druid will treat all non-timestamp, non-metric columns that do not appear in `dimensionExclusions` as String-typed dimension columns (see [inclusions and exclusions](#inclusions-and-exclusions) below). | `[]` |
| dimensionExclusions | The names of dimensions to exclude from ingestion. Only names are supported here, not objects. Cannot have the same column in both `dimensions` and `dimensionExclusions`.| `[]` |
| spatialDimensions | An array of [spatial dimensions](../development/geo.md). | `[]` |
#### Dimension objects
Each dimension in the `dimensions` list can either be a name or an object. Providing a name is equivalent to providing
a `string` type dimension object with the given name, e.g. `"page"` is equivalent to `{"name": "page", "type": "string"}`.
Dimension objects can have the following components:
| Field | Description | Default |
|-------|-------------|---------|
| type | Either `string`, `long`, `float`, or `double`. | `string` |
| name | The name of the dimension. This will be used as the field name to read from input records, as well as the column name stored in generated segments.<br><br>Note that you can use a [`transformSpec`](#transformspec) if you want to rename columns during ingestion time. | none (required) |
| createBitmapIndex | For `string` typed dimensions, whether or not bitmap indexes should be created for the column in generated segments. Creating a bitmap index requires more storage, but speeds up certain kinds of filtering (especially equality and prefix filtering). Only supported for `string` typed dimensions. | `true` |
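
As an illustration, the following `dimensionsSpec` mixes a plain name with dimension objects. The column names `page`, `userId`, and `comment` are hypothetical; the sketch assumes `comment` is rarely filtered on, so its bitmap index is turned off to save storage.

```json
{
  "dimensions": [
    "page",
    { "name": "userId", "type": "long" },
    { "name": "comment", "type": "string", "createBitmapIndex": false }
  ]
}
```
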
#### Inclusions and exclusions
Druid will interpret a `dimensionsSpec` in two possible ways: _normal_ or _schemaless_.
Normal interpretation occurs when either `dimensions` or `spatialDimensions` is non-empty. In this case, the combination of the two lists will be taken as the set of dimensions to be ingested.
Schemaless interpretation occurs when both `dimensions` and `spatialDimensions` are empty or null. In this case, the set of dimensions is determined in the following way:
1. First, start from the set of all input fields from the [`inputFormat`](./data-formats.md) (or the [`flattenSpec`](./data-formats.md#flattenspec), if one is being used).
| segmentGranularity | [Time chunking](../design/architecture.html#datasources-and-segments) granularity for this datasource. Multiple segments can be created per time chunk. For example, when set to `day`, the events of the same day fall into the same time chunk which can be optionally further partitioned into multiple segments based on other configurations and input size. Any [granularity](../querying/granularities.md) can be provided here. Note that all segments in the same time chunk should have the same segment granularity.<br><br>Ignored if `type` is set to `arbitrary`.| `day` |
| queryGranularity | The resolution of timestamp storage within each segment. This must be equal to, or finer than, `segmentGranularity`. This will be the finest granularity that you can query at and still receive sensible results, but note that you can still query at anything coarser than this granularity. E.g., a value of `minute` will mean that records will be stored at minutely granularity, and can be sensibly queried at any multiple of minutes (including minutely, 5-minutely, hourly, etc).<br><br>Any [granularity](../querying/granularities.md) can be provided here. Use `none` to store timestamps as-is, without any truncation. Note that `rollup` will be applied if it is set even when the `queryGranularity` is set to `none`. | `none` |
| rollup | Whether to use ingestion-time [rollup](#rollup) or not. Note that rollup is still effective even when `queryGranularity` is set to `none`. In that case, rows are rolled up only if they have exactly the same timestamp. | `true` |
| intervals | A list of intervals describing what time chunks of segments should be created. If `type` is set to `uniform`, this list will be broken up and rounded-off based on the `segmentGranularity`. If `type` is set to `arbitrary`, this list will be used as-is.<br><br>If `null` or not provided, batch ingestion tasks will generally determine which time chunks to output based on what timestamps are found in the input data.<br><br>If specified, batch ingestion tasks may be able to skip a determining-partitions phase, which can result in faster ingestion. Batch ingestion tasks may also be able to request all their locks up-front instead of one by one. Batch ingestion tasks will throw away any records with timestamps outside of the specified intervals.<br><br>Ignored for any form of streaming ingestion. | `null` |
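
Putting the fields above together, a `granularitySpec` for a batch task might look like the sketch below. The interval is an arbitrary example; with these settings, data is chunked by day, timestamps are truncated to the minute, and records outside the listed interval are discarded.

```json
{
  "type": "uniform",
  "segmentGranularity": "day",
  "queryGranularity": "minute",
  "rollup": true,
  "intervals": ["2020-01-01/2020-01-08"]
}
```
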
### `transformSpec`
The `transformSpec` is located in `dataSchema` → `transformSpec` and is responsible for transforming and filtering
records during ingestion time. It is optional. An example `transformSpec` is:
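
The sketch below assumes a hypothetical input field named `country`. It adds an upper-cased copy of that field through an expression transform and keeps only the records that match a selector filter; the field name and filter value are illustrative only.

```json
{
  "transforms": [
    { "type": "expression", "name": "countryUpper", "expression": "upper(country)" }
  ],
  "filter": {
    "type": "selector",
    "dimension": "country",
    "value": "San Serriffe"
  }
}
```
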
|type|Each ingestion method has its own tuning type code. You must specify the type code that matches your ingestion method. Common options are `index`, `hadoop`, `kafka`, and `kinesis`.||
|maxRowsInMemory|The maximum number of records to store in memory before persisting to disk. Note that this is the number of rows post-rollup, and so it may not be equal to the number of input records. Ingested records will be persisted to disk when either `maxRowsInMemory` or `maxBytesInMemory` is reached (whichever happens first).|`1000000`|
|maxBytesInMemory|The maximum aggregate size of records, in bytes, to store in the JVM heap before persisting. This is based on a rough estimate of memory usage. Ingested records will be persisted to disk when either `maxRowsInMemory` or `maxBytesInMemory` is reached (whichever happens first).<br/><br/>Setting `maxBytesInMemory` to -1 disables this check, meaning Druid will rely entirely on `maxRowsInMemory` to control memory usage. Setting it to zero means the default value will be used (one-sixth of JVM heap size).<br/><br/>Note that the estimate of memory usage is designed to be an overestimate, and can be especially high when using complex ingest-time aggregators, including sketches. If this causes your indexing workloads to persist to disk too often, you can set `maxBytesInMemory` to -1 and rely on `maxRowsInMemory` instead.|One-sixth of max JVM heap size|
|indexSpec|Tune how data is indexed. See below for more information.|See table below|
|Other properties|Each ingestion method has its own list of additional tuning properties. See the documentation for each method for a full list: [Kafka indexing service](../development/extensions-core/kafka-ingestion.md#tuningconfig), [Kinesis indexing service](../development/extensions-core/kinesis-ingestion.md#tuningconfig), [Native batch](native-batch.md#tuningconfig), and [Hadoop-based](hadoop.md#tuningconfig).||
#### `indexSpec`
The `indexSpec` object can include the following properties:
|bitmap|Compression format for bitmap indexes. Should be a JSON object with `type` set to `roaring` or `concise`. For type `roaring`, the boolean property `compressRunOnSerialization` (defaults to true) controls whether or not run-length encoding will be used when it is determined to be more space-efficient.|`{"type": "concise"}`|
|metricCompression|Compression format for primitive type metric columns. Options are `lz4`, `lzf`, `uncompressed`, or `none` (which is more efficient than `uncompressed`, but not supported by older versions of Druid).|`lz4`|
|longEncoding|Encoding format for long-typed columns. Applies regardless of whether they are dimensions or metrics. Options are `auto` or `longs`. `auto` encodes the values using offset or lookup table depending on column cardinality, and stores them with variable size. `longs` stores the value as-is with 8 bytes each.|`longs`|
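
As a sketch of how these settings fit together, the `tuningConfig` below (using the `kafka` type code as an example) disables the byte-based persist check and overrides the `indexSpec` defaults. The specific values are illustrative, not recommendations.

```json
{
  "type": "kafka",
  "maxRowsInMemory": 500000,
  "maxBytesInMemory": -1,
  "indexSpec": {
    "bitmap": { "type": "roaring", "compressRunOnSerialization": true },
    "metricCompression": "lz4",
    "longEncoding": "auto"
  }
}
```
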
Beyond these properties, each ingestion method has its own specific tuning properties. See the documentation for each
[ingestion method](#ingestion-methods) for details.