
---
id: partitioning
title: Partitioning
sidebar_label: Partitioning
description: Describes time chunk and secondary partitioning in Druid. Provides guidance to choose a secondary partition dimension.
---

You can use segment partitioning and sorting within your Druid datasources to reduce the size of your data and increase performance.

One way to partition is to load data into separate datasources. This is a perfectly viable approach that works very well when the number of datasources does not lead to excessive per-datasource overheads.

This topic describes how to set up partitions within a single datasource. It does not cover how to use multiple datasources. See Multitenancy considerations for more details on splitting data into separate datasources and potential operational considerations.

## Time chunk partitioning

Druid always partitions datasources by time into time chunks. Each time chunk contains one or more segments. This partitioning happens for all ingestion methods, based on the `segmentGranularity` parameter of your ingestion spec's `dataSchema` object.

Partitioning by time is important for three reasons:

1. Queries that filter by `__time` (SQL) or `intervals` (native) are able to use time partitioning to prune the set of segments to consider.
2. Certain data management operations, such as overwriting and compacting existing data, acquire exclusive write locks on time partitions.
3. Each segment file is wholly contained within a time partition. Too-fine-grained partitioning may cause a large number of small segments, which leads to poor performance.

The most common choices to balance these considerations are `hour` and `day`. For streaming ingestion, `hour` is especially common, because it allows compaction to follow ingestion with less of a time delay.

The following table describes how to configure time chunk partitioning.

|Method|Configuration|
|------|-------------|
|SQL|`PARTITIONED BY`|
|Kafka or Kinesis|`segmentGranularity` inside the `granularitySpec`|
|Native batch or Hadoop|`segmentGranularity` inside the `granularitySpec`|
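
For example, in spec-based ingestion, day-sized time chunks might be configured with a `granularitySpec` like the following minimal sketch (the `queryGranularity` and `rollup` values are illustrative, not required for time chunk partitioning):

```json
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "day",
  "queryGranularity": "none",
  "rollup": false
}
```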

## Secondary partitioning

Druid further partitions each time chunk into immutable segments. Secondary partitioning on a particular dimension improves locality. This means that rows with the same value for that dimension are stored together, decreasing access time.

To achieve the best performance and smallest overall footprint, partition your data on a "natural" dimension that you often use as a filter, or that achieves some alignment within your data. Such partitioning can improve compression and query performance by significant multiples.

The following table describes how to configure secondary partitioning.

|Method|Configuration|
|------|-------------|
|SQL|`CLUSTERED BY`|
|Kafka or Kinesis|Upstream partitioning defines how Druid partitions the datasource. You can also alter clustering using `REPLACE` (with `CLUSTERED BY`) or compaction after initial ingestion.|
|Native batch or Hadoop|`partitionsSpec` inside the `tuningConfig`|
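
For example, with native batch ingestion, range partitioning on a hypothetical `countryName` dimension might look like the following sketch. This assumes `forceGuaranteedRollup` is enabled, as native batch range partitioning requires; the row target shown here is illustrative:

```json
"tuningConfig": {
  "type": "index_parallel",
  "forceGuaranteedRollup": true,
  "partitionsSpec": {
    "type": "range",
    "partitionDimensions": ["countryName"],
    "targetRowsPerSegment": 5000000
  }
}
```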

## Sorting

Each segment is internally sorted to promote compression and locality.

Partitioning and sorting work well together. If you do have a "natural" partitioning dimension, consider placing it first in your sort order as well. This way, Druid sorts rows within each segment by that column. This sorting configuration frequently improves compression and performance more than using partitioning alone.

The following table describes how to configure sorting.

|Method|Configuration|
|------|-------------|
|SQL|Uses order of fields in `CLUSTERED BY` or `segmentSortOrder` in the query context|
|Kafka or Kinesis|Uses order of fields in `dimensionsSpec`|
|Native batch or Hadoop|Uses order of fields in `dimensionsSpec`|
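
For example, to sort rows within each segment by a hypothetical `countryName` dimension before other columns, a spec-based ingestion can order `dimensionsSpec` accordingly, as in this sketch:

```json
"dimensionsSpec": {
  "dimensions": ["countryName", "channel", "page"]
}
```

By default, Druid still sorts by `__time` before the listed dimensions, so the effective sort order here is `__time`, then `countryName`, `channel`, and `page`. See the note below on `forceSegmentSortByTime` for how to change this.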

:::info
Druid implicitly sorts rows within a segment by `__time` first, before any dimensions or `CLUSTERED BY` fields, unless you set `forceSegmentSortByTime` to `false` in your query context (for SQL) or in your `dimensionsSpec` (for other ingestion forms).

Setting `forceSegmentSortByTime` to `false` is an experimental feature. Segments created with sort orders that do not start with `__time` can only be read by Druid 31 or later. Additionally, at this time, certain queries are not supported on such segments, including:

- Native queries with `granularity` other than `all`.
- Native scan queries with ascending or descending time order.
- SQL queries that plan into an unsupported native query.
:::
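
For example, the following sketch (with hypothetical dimension names) places `__time` second in the sort order by listing it explicitly as a `long`-typed dimension:

```json
"dimensionsSpec": {
  "forceSegmentSortByTime": false,
  "dimensions": [
    "countryName",
    { "name": "__time", "type": "long" },
    "channel"
  ]
}
```

Segments created this way are sorted by `countryName` first, then `__time`, then `channel`.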

## Learn more

See the following topics for more information: