druid/docs/content/ingestion/stream-pull.md

377 lines
22 KiB
Markdown
Raw Normal View History

---
layout: doc_page
title: "Stream Pull Ingestion"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
2018-09-04 15:54:41 -04:00
<div class="note info">
2019-02-28 21:10:39 -05:00
NOTE: Realtime processes are deprecated. Please use the <a href="../development/extensions-core/kafka-ingestion.html">Kafka Indexing Service</a> for stream pull use cases instead.
2018-09-04 15:54:41 -04:00
</div>
# Stream Pull Ingestion
2016-01-06 00:27:52 -05:00
If you have an external service that you want to pull data from, you have two options. The simplest
option is to set up a "copying" service that reads from the data source and writes to Apache Druid (incubating) using
2016-01-06 00:27:52 -05:00
the [stream push method](stream-push.html).
2019-02-28 21:10:39 -05:00
Another option is *stream pull*. With this approach, a Druid Realtime Process ingests data from a
2016-01-06 00:27:52 -05:00
[Firehose](../ingestion/firehose.html) connected to the data you want to
2019-02-28 21:10:39 -05:00
read. The Druid quickstart and tutorials do not include information about how to set up standalone realtime processes, but
they can be used in place for Tranquility server and the indexing service. Please note that Realtime processes have different properties and roles than the indexing service.
2019-02-28 21:10:39 -05:00
## Realtime Process Ingestion
2019-02-28 21:10:39 -05:00
Much of the configuration governing Realtime processes and the ingestion of data is set in the Realtime spec file, discussed on this page.
2019-02-28 21:10:39 -05:00
For general Real-time Process information, see [here](../design/realtime.html).
2016-01-06 00:27:52 -05:00
2019-02-28 21:10:39 -05:00
For Real-time Process Configuration, see [Realtime Configuration](../configuration/realtime.html).
2016-01-06 00:27:52 -05:00
2019-02-28 21:10:39 -05:00
For writing your own plugins to the real-time process, see [Firehose](../ingestion/firehose.html).
2016-01-06 00:27:52 -05:00
## Realtime "specFile"
The property `druid.realtime.specFile` has the path of a file (absolute or relative path and file name) with realtime specifications in it. This "specFile" should be a JSON Array of JSON objects like the following:
```json
[
{
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
}
},
"metricsSpec" : [{
"type" : "count",
"name" : "count"
}, {
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
}, {
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
}, {
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE"
}
},
"ioConfig" : {
"type" : "realtime",
"firehose": {
"type": "kafka-0.8",
"consumerProps": {
"zookeeper.connect": "localhost:2181",
"zookeeper.connection.timeout.ms" : "15000",
"zookeeper.session.timeout.ms" : "15000",
"zookeeper.sync.time.ms" : "5000",
"group.id": "druid-example",
"fetch.message.max.bytes" : "1048586",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"feed": "wikipedia"
},
"plumber": {
"type": "realtime"
}
},
"tuningConfig": {
"type" : "realtime",
'maxBytesInMemory' tuningConfig introduced for ingestion tasks (#5583) * This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks Currently a config called 'maxRowsInMemory' is present which affects how much memory gets used for indexing.If this value is not optimal for your JVM heap size, it could lead to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might be bad for query performance and a higher value will limit number of persists but require more jvm heap space and could lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes kept in memory before persisting. * The default value is 1/3(Runtime.maxMemory()) * To maintain the current behaviour set 'maxBytesInMemory' to -1 * If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist * Fix check style and remove a comment * Add overlord unsecured paths to coordinator when using combined service (#5579) * Add overlord unsecured paths to coordinator when using combined service * PR comment * More error reporting and stats for ingestion tasks (#5418) * Add more indexing task status and error reporting * PR comments, add support in AppenderatorDriverRealtimeIndexTask * Use TaskReport instead of metrics/context * Fix tests * Use TaskReport uploads * Refactor fire department metrics retrieval * Refactor input row serde in hadoop task * Refactor hadoop task loader names * Truncate error message in TaskStatus, add errorMsg to task report * PR comments * Allow getDomain to return disjointed intervals (#5570) * Allow getDomain to return disjointed intervals * Indentation issues * Adding feature thetaSketchConstant to do some set operation in PostAgg (#5551) * Adding feature thetaSketchConstant to do some set operation in PostAggregator * Updated review comments for PR #5551 - Adding thetaSketchConstant * Fixed CI build issue * Updated review comments 2 for PR #5551 - Adding thetaSketchConstant * Fix taskDuration docs for KafkaIndexingService (#5572) * With incremental handoff the changed line is no longer true. * Add doc for automatic pendingSegments (#5565) * Add missing doc for automatic pendingSegments * address comments * Fix indexTask to respect forceExtendableShardSpecs (#5509) * Fix indexTask to respect forceExtendableShardSpecs * add comments * Deprecate spark2 profile in pom.xml (#5581) Deprecated due to https://github.com/druid-io/druid/pull/5382 * CompressionUtils: Add support for decompressing xz, bz2, zip. (#5586) Also switch various firehoses to the new method. Fixes #5585. * This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks Currently a config called 'maxRowsInMemory' is present which affects how much memory gets used for indexing.If this value is not optimal for your JVM heap size, it could lead to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might be bad for query performance and a higher value will limit number of persists but require more jvm heap space and could lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes kept in memory before persisting. * The default value is 1/3(Runtime.maxMemory()) * To maintain the current behaviour set 'maxBytesInMemory' to -1 * If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist * Address code review comments * Fix the coding style according to druid conventions * Add more javadocs * Rename some variables/methods * Other minor issues * Address more code review comments * Some refactoring to put defaults in IndexTaskUtils * Added check for maxBytesInMemory in AppenderatorImpl * Decrement bytes in abandonSegment * Test unit test for multiple sinks in single appenderator * Fix some merge conflicts after rebase * Fix some style checks * Merge conflicts * Fix failing tests Add back check for 0 maxBytesInMemory in OnHeapIncrementalIndex * Address PR comments * Put defaults for maxRows and maxBytes in TuningConfig * Change/add javadocs * Refactoring and renaming some variables/methods * Fix TeamCity inspection warnings * Added maxBytesInMemory config to HadoopTuningConfig * Updated the docs and examples * Added maxBytesInMemory config in docs * Removed references to maxRowsInMemory under tuningConfig in examples * Set maxBytesInMemory to 0 until used Set the maxBytesInMemory to 0 if user does not set it as part of tuningConfing and set to part of max jvm memory when ingestion task starts * Update toString in KafkaSupervisorTuningConfig * Use correct maxBytesInMemory value in AppenderatorImpl * Update DEFAULT_MAX_BYTES_IN_MEMORY to 1/6 max jvm memory Experimenting with various defaults, 1/3 jvm memory causes OOM * Update docs to correct maxBytesInMemory default value * Minor to rename and add comment * Add more details in docs * Address new PR comments * Address PR comments * Fix spelling typo
2018-05-03 19:25:58 -04:00
"maxRowsInMemory": 1000000,
"intermediatePersistPeriod": "PT10M",
"windowPeriod": "PT10M",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "serverTime"
}
}
}
]
```
2019-02-28 21:10:39 -05:00
This is a JSON Array so you can give more than one realtime stream to a given process. The number you can put in the same process depends on the exact configuration. In general, it is best to think of each realtime stream handler as requiring 2-threads: 1 thread for data consumption and aggregation, 1 thread for incremental persists and other background tasks.
There are three parts to a realtime stream specification, `dataSchema`, `IOConfig`, and `tuningConfig` which we will go into here.
### DataSchema
This field is required.
See [Ingestion](../ingestion/index.html)
### IOConfig
This field is required.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'realtime'.|yes|
|firehose|JSON Object|Where the data is coming from. Described in detail below.|yes|
|plumber|JSON Object|Where the data is going. Described in detail below.|yes|
#### Firehose
2016-01-06 00:27:52 -05:00
See [Firehose](../ingestion/firehose.html) for more information on various firehoses.
#### Plumber
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'realtime'.|no|
### TuningConfig
The tuningConfig is optional and default parameters will be used if no tuningConfig is specified.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'realtime'.|no|
'maxBytesInMemory' tuningConfig introduced for ingestion tasks (#5583) * This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks Currently a config called 'maxRowsInMemory' is present which affects how much memory gets used for indexing.If this value is not optimal for your JVM heap size, it could lead to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might be bad for query performance and a higher value will limit number of persists but require more jvm heap space and could lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes kept in memory before persisting. * The default value is 1/3(Runtime.maxMemory()) * To maintain the current behaviour set 'maxBytesInMemory' to -1 * If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist * Fix check style and remove a comment * Add overlord unsecured paths to coordinator when using combined service (#5579) * Add overlord unsecured paths to coordinator when using combined service * PR comment * More error reporting and stats for ingestion tasks (#5418) * Add more indexing task status and error reporting * PR comments, add support in AppenderatorDriverRealtimeIndexTask * Use TaskReport instead of metrics/context * Fix tests * Use TaskReport uploads * Refactor fire department metrics retrieval * Refactor input row serde in hadoop task * Refactor hadoop task loader names * Truncate error message in TaskStatus, add errorMsg to task report * PR comments * Allow getDomain to return disjointed intervals (#5570) * Allow getDomain to return disjointed intervals * Indentation issues * Adding feature thetaSketchConstant to do some set operation in PostAgg (#5551) * Adding feature thetaSketchConstant to do some set operation in PostAggregator * Updated review comments for PR #5551 - Adding thetaSketchConstant * Fixed CI build issue * Updated review comments 2 for PR #5551 - Adding thetaSketchConstant * Fix taskDuration docs for KafkaIndexingService (#5572) * With incremental handoff the changed line is no longer true. * Add doc for automatic pendingSegments (#5565) * Add missing doc for automatic pendingSegments * address comments * Fix indexTask to respect forceExtendableShardSpecs (#5509) * Fix indexTask to respect forceExtendableShardSpecs * add comments * Deprecate spark2 profile in pom.xml (#5581) Deprecated due to https://github.com/druid-io/druid/pull/5382 * CompressionUtils: Add support for decompressing xz, bz2, zip. (#5586) Also switch various firehoses to the new method. Fixes #5585. * This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks Currently a config called 'maxRowsInMemory' is present which affects how much memory gets used for indexing.If this value is not optimal for your JVM heap size, it could lead to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might be bad for query performance and a higher value will limit number of persists but require more jvm heap space and could lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes kept in memory before persisting. * The default value is 1/3(Runtime.maxMemory()) * To maintain the current behaviour set 'maxBytesInMemory' to -1 * If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist * Address code review comments * Fix the coding style according to druid conventions * Add more javadocs * Rename some variables/methods * Other minor issues * Address more code review comments * Some refactoring to put defaults in IndexTaskUtils * Added check for maxBytesInMemory in AppenderatorImpl * Decrement bytes in abandonSegment * Test unit test for multiple sinks in single appenderator * Fix some merge conflicts after rebase * Fix some style checks * Merge conflicts * Fix failing tests Add back check for 0 maxBytesInMemory in OnHeapIncrementalIndex * Address PR comments * Put defaults for maxRows and maxBytes in TuningConfig * Change/add javadocs * Refactoring and renaming some variables/methods * Fix TeamCity inspection warnings * Added maxBytesInMemory config to HadoopTuningConfig * Updated the docs and examples * Added maxBytesInMemory config in docs * Removed references to maxRowsInMemory under tuningConfig in examples * Set maxBytesInMemory to 0 until used Set the maxBytesInMemory to 0 if user does not set it as part of tuningConfing and set to part of max jvm memory when ingestion task starts * Update toString in KafkaSupervisorTuningConfig * Use correct maxBytesInMemory value in AppenderatorImpl * Update DEFAULT_MAX_BYTES_IN_MEMORY to 1/6 max jvm memory Experimenting with various defaults, 1/3 jvm memory causes OOM * Update docs to correct maxBytesInMemory default value * Minor to rename and add comment * Add more details in docs * Address new PR comments * Address PR comments * Fix spelling typo
2018-05-03 19:25:58 -04:00
|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 1000000)|
|maxBytesInMemory|Long|The maximum number of bytes to keep in memory to aggregate before persisting. This is used to manage the required JVM heap size.|no (default == One-sixth of max JVM memory)|
|windowPeriod|ISO 8601 Period String|The amount of lag time to allow events. This is configured with a 10 minute window, meaning that any event more than 10 minutes ago will be thrown away and not included in the segment generated by the realtime server.|no (default == PT10M)|
|intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|no (default == PT10M)|
|basePersistDirectory|String|The directory to put things that need persistence. The plumber is responsible for the actual intermediate persists and this tells it where to store those persists.|no (default == java tmp dir)|
|versioningPolicy|Object|How to version segments.|no (default == based on segment start time)|
2016-04-28 18:34:58 -04:00
|rejectionPolicy|Object|Controls how data sets the data acceptance policy for creating and handing off segments. More on this below.|no (default == 'serverTime')|
|maxPendingPersists|Integer|Maximum number of persists that can be pending, but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0; meaning one persist can be running concurrently with ingestion, and none can be queued up)|
2019-02-28 21:10:39 -05:00
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime processes indexing the same data stream in a [sharded fashion](#sharding).|no (default == 'NoneShardSpec')|
2016-04-28 18:34:58 -04:00
|persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default == 0; inherit and do not override)|
|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default == 0; inherit and do not override)|
|reportParseExceptions|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion. If false, unparseable rows and fields will be skipped. If an entire row is skipped, the "unparseable" counter will be incremented. If some fields in a row were parseable and some were not, the parseable fields will be indexed and the "unparseable" counter will not be incremented.|no (default == false)|
|handoffConditionTimeout|long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|alertTimeout|long|Milliseconds timeout after which an alert is created if the task isn't finished by then. This allows users to monitor tasks that are failing to finish and give up the worker slot for any unexpected errors.|no (default == 0)|
|segmentWriteOutMediumFactory|Object|Segment write-out medium to use when creating segments. See below for more information.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used)|
|dedupColumn|String|the column to judge whether this row is already in this segment, if so, throw away this row. If it is String type column, to reduce heap cost, use long type hashcode of this column's value to judge whether this row is already ingested, so there maybe very small chance to throw away a row that is not ingested before.|no (default == null)|
|indexSpec|Object|Tune how data is indexed. See below for more information.|no|
Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/apache/incubator-druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`.
#### Rejection Policy
The following policies are available:
* `serverTime` &ndash; The recommended policy for "current time" data, it is optimal for current data that is generated and ingested in real time. Uses `windowPeriod` to accept only those events that are inside the window looking forward and back.
* `messageTime` &ndash; Can be used for non-"current time" as long as that data is relatively in sequence. Events are rejected if they are less than `windowPeriod` from the event with the latest timestamp. Hand off only occurs if an event is seen after the segmentGranularity and `windowPeriod` (hand off will not periodically occur unless you have a constant stream of data).
* `none` &ndash; All events are accepted. Never hands off data unless shutdown() is called on the configured firehose.
#### SegmentWriteOutMediumFactory
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|See [Additional Peon Configuration: SegmentWriteOutMediumFactory](../configuration/index.html#segmentwriteoutmediumfactory) for explanation and available options.|yes|
#### IndexSpec
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|bitmap|Object|Compression format for bitmap indexes. Should be a JSON object; see below for options.|no (defaults to Concise)|
|dimensionCompression|String|Compression format for dimension columns. Choose from `LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)|
|metricCompression|String|Compression format for metric columns. Choose from `LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)|
##### Bitmap types
For Concise bitmaps:
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|Must be `concise`.|yes|
For Roaring bitmaps:
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|Must be `roaring`.|yes|
|compressRunOnSerialization|Boolean|Use a run-length encoding where it is estimated as more space efficient.|no (default == `true`)|
2016-02-04 14:53:09 -05:00
#### Sharding
2016-01-06 00:27:52 -05:00
Druid uses shards, or segments with partition numbers, to more efficiently handle large amounts of incoming data. In Druid, shards represent the segments that together cover a time interval based on the value of `segmentGranularity`. If, for example, `segmentGranularity` is set to "hour", then a number of shards may be used to store the data for that hour. Sharding along dimensions may also occur to optimize efficiency.
Segments are identified by datasource, time interval, and version. With sharding, a segment is also identified by a partition number. Typically, each shard will have the same version but a different partition number to uniquely identify it.
In small-data scenarios, sharding is unnecessary and can be set to none (the default):
```json
"shardSpec": {"type": "none"}
```
2019-02-28 21:10:39 -05:00
However, in scenarios with multiple realtime processes, `none` is less useful as it cannot help with scaling data volume (see below). Note that for the batch indexing service, no explicit configuration is required; sharding is provided automatically.
Druid uses sharding based on the `shardSpec` setting you configure. The recommended choices, `linear` and `numbered`, are discussed below; other types have been useful for internal Druid development but are not appropriate for production setups.
Keep in mind, that sharding configuration has nothing to do with configured firehose. For example, if you set partition number to 0, it doesn't mean that Kafka firehose will consume only from 0 topic partition.
##### Linear
2016-01-06 00:27:52 -05:00
This strategy provides following advantages:
2019-02-28 21:10:39 -05:00
* There is no need to update the fileSpec configurations of existing processes when adding new processes.
* All unique shards are queried, regardless of whether the partition numbering is sequential or not (it allows querying of partitions 0 and 2, even if partition 1 is missing).
Configure `linear` under `schema`:
```json
"shardSpec": {
"type": "linear",
"partitionNum": 0
}
```
##### Numbered
2016-01-06 00:27:52 -05:00
This strategy is similar to `linear` except that it does not tolerate non-sequential partition numbering (it will *not* allow querying of partitions 0 and 2 if partition 1 is missing). It also requires explicitly setting the total number of partitions.
Configure `numbered` under `schema`:
```json
"shardSpec": {
"type": "numbered",
"partitionNum": 0,
"partitions": 2
}
```
##### Scale and Redundancy
2016-01-06 00:27:52 -05:00
2019-02-28 21:10:39 -05:00
The `shardSpec` configuration can be used to create redundancy by having the same `partitionNum` values on different processes.
2019-02-28 21:10:39 -05:00
For example, if RealTimeProcess1 has:
```json
"shardSpec": {
"type": "linear",
"partitionNum": 0
}
```
2019-02-28 21:10:39 -05:00
and RealTimeProcess2 has:
```json
"shardSpec": {
"type": "linear",
"partitionNum": 0
}
```
2019-02-28 21:10:39 -05:00
then two realtime processes can store segments with the same datasource, version, time interval, and partition number. Brokers that query for data in such segments will assume that they hold the same data, and the query will target only one of the segments.
2019-02-28 21:10:39 -05:00
`shardSpec` can also help achieve scale. For this, add processes with a different `partionNum`. Continuing with the example, if RealTimeProcess3 has:
```json
"shardSpec": {
"type": "linear",
"partitionNum": 1
}
```
2019-02-28 21:10:39 -05:00
then it can store segments with the same datasource, time interval, and version as in the first two processes, but with a different partition number. Brokers that query for data in such segments will assume that a segment from RealTimeProcess3 holds *different* data, and the query will target it along with a segment from the first two processes.
You can use type `numbered` similarly. Note that type `none` is essentially type `linear` with all shards having a fixed `partitionNum` of 0.
## Constraints
The following table summarizes constraints between settings in the spec file for the Realtime subsystem.
|Name|Effect|Minimum|Recommended|
|----|------|-------|-----------|
|windowPeriod| When reading a row, events with timestamp older than now minus this window are discarded | time jitter tolerance | use this to reject outliers |
|segmentGranularity| Time granularity (minute, hour, day, week, month) for loading data at query time | equal to indexGranularity| more than queryGranularity|
|queryGranularity| Time granularity (minute, hour, day, week, month) for rollup | less than segmentGranularity| minute, hour, day, week, month |
|intermediatePersistPeriod| The max time (ISO8601 Period) between flushes of ingested rows from memory to disk | avoid excessive flushing | number of un-persisted rows in memory also constrained by maxRowsInMemory |
'maxBytesInMemory' tuningConfig introduced for ingestion tasks (#5583) * This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks Currently a config called 'maxRowsInMemory' is present which affects how much memory gets used for indexing.If this value is not optimal for your JVM heap size, it could lead to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might be bad for query performance and a higher value will limit number of persists but require more jvm heap space and could lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes kept in memory before persisting. * The default value is 1/3(Runtime.maxMemory()) * To maintain the current behaviour set 'maxBytesInMemory' to -1 * If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist * Fix check style and remove a comment * Add overlord unsecured paths to coordinator when using combined service (#5579) * Add overlord unsecured paths to coordinator when using combined service * PR comment * More error reporting and stats for ingestion tasks (#5418) * Add more indexing task status and error reporting * PR comments, add support in AppenderatorDriverRealtimeIndexTask * Use TaskReport instead of metrics/context * Fix tests * Use TaskReport uploads * Refactor fire department metrics retrieval * Refactor input row serde in hadoop task * Refactor hadoop task loader names * Truncate error message in TaskStatus, add errorMsg to task report * PR comments * Allow getDomain to return disjointed intervals (#5570) * Allow getDomain to return disjointed intervals * Indentation issues * Adding feature thetaSketchConstant to do some set operation in PostAgg (#5551) * Adding feature thetaSketchConstant to do some set operation in PostAggregator * Updated review comments for PR #5551 - Adding thetaSketchConstant * Fixed CI build issue * Updated review comments 2 for PR #5551 - Adding thetaSketchConstant * Fix taskDuration docs for KafkaIndexingService (#5572) * With incremental handoff the changed line is no longer true. * Add doc for automatic pendingSegments (#5565) * Add missing doc for automatic pendingSegments * address comments * Fix indexTask to respect forceExtendableShardSpecs (#5509) * Fix indexTask to respect forceExtendableShardSpecs * add comments * Deprecate spark2 profile in pom.xml (#5581) Deprecated due to https://github.com/druid-io/druid/pull/5382 * CompressionUtils: Add support for decompressing xz, bz2, zip. (#5586) Also switch various firehoses to the new method. Fixes #5585. * This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks Currently a config called 'maxRowsInMemory' is present which affects how much memory gets used for indexing.If this value is not optimal for your JVM heap size, it could lead to OutOfMemoryError sometimes. A lower value will lead to frequent persists which might be bad for query performance and a higher value will limit number of persists but require more jvm heap space and could lead to OOM. 'maxBytesInMemory' is an attempt to solve this problem. It limits the total number of bytes kept in memory before persisting. * The default value is 1/3(Runtime.maxMemory()) * To maintain the current behaviour set 'maxBytesInMemory' to -1 * If both 'maxRowsInMemory' and 'maxBytesInMemory' are present, both of them will be respected i.e. the first one to go above threshold will trigger persist * Address code review comments * Fix the coding style according to druid conventions * Add more javadocs * Rename some variables/methods * Other minor issues * Address more code review comments * Some refactoring to put defaults in IndexTaskUtils * Added check for maxBytesInMemory in AppenderatorImpl * Decrement bytes in abandonSegment * Test unit test for multiple sinks in single appenderator * Fix some merge conflicts after rebase * Fix some style checks * Merge conflicts * Fix failing tests Add back check for 0 maxBytesInMemory in OnHeapIncrementalIndex * Address PR comments * Put defaults for maxRows and maxBytes in TuningConfig * Change/add javadocs * Refactoring and renaming some variables/methods * Fix TeamCity inspection warnings * Added maxBytesInMemory config to HadoopTuningConfig * Updated the docs and examples * Added maxBytesInMemory config in docs * Removed references to maxRowsInMemory under tuningConfig in examples * Set maxBytesInMemory to 0 until used Set the maxBytesInMemory to 0 if user does not set it as part of tuningConfing and set to part of max jvm memory when ingestion task starts * Update toString in KafkaSupervisorTuningConfig * Use correct maxBytesInMemory value in AppenderatorImpl * Update DEFAULT_MAX_BYTES_IN_MEMORY to 1/6 max jvm memory Experimenting with various defaults, 1/3 jvm memory causes OOM * Update docs to correct maxBytesInMemory default value * Minor to rename and add comment * Add more details in docs * Address new PR comments * Address PR comments * Fix spelling typo
2018-05-03 19:25:58 -04:00
|maxRowsInMemory| The max number of ingested rows to hold in memory before a flush to disk. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set| number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
|maxBytesInMemory| The number of bytes to keep in memory before a flush to disk. Normally this is computed internally and user does not need to set it. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)| number of un-persisted post-aggregation bytes in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
The normal, expected use cases have the following overall constraints: `intermediatePersistPeriod ≤ windowPeriod < segmentGranularity` and `queryGranularity ≤ segmentGranularity`
2016-01-06 00:27:52 -05:00
## Limitations
### Kafka
2019-02-28 21:10:39 -05:00
Standalone realtime processes use the Kafka high level consumer, which imposes a few restrictions.
2016-01-06 00:27:52 -05:00
2019-02-28 21:10:39 -05:00
Druid replicates segment such that logically equivalent data segments are concurrently hosted on N processes. If N1 processes go down,
the data will still be available for querying. On real-time processes, this process depends on maintaining logically equivalent
data segments on each of the N processes, which is not possible with standard Kafka consumer groups if your Kafka topic requires more than one consumer
2016-01-06 00:27:52 -05:00
(because consumers in different consumer groups will split up the data differently).
2019-02-28 21:10:39 -05:00
For example, let's say your topic is split across Kafka partitions 1, 2, & 3 and you have 2 real-time processes with linear shard specs 1 & 2.
Both of the real-time processes are in the same consumer group. Real-time process 1 may consume data from partitions 1 & 3, and real-time process 2 may consume data from partition 2.
Querying for your data through the Broker will yield correct results.
2016-01-06 00:27:52 -05:00
2019-02-28 21:10:39 -05:00
The problem arises if you want to replicate your data by creating real-time processes 3 & 4. These new real-time processes also
have linear shard specs 1 & 2, and they will consume data from Kafka using a different consumer group. In this case,
2019-02-28 21:10:39 -05:00
real-time process 3 may consume data from partitions 1 & 2, and real-time process 4 may consume data from partition 2.
From Druid's perspective, the segments hosted by real-time processes 1 and 3 are the same, and the data hosted by real-time processes
2 and 4 are the same, although they are reading from different Kafka partitions. Querying for the data will yield inconsistent
2016-01-06 00:27:52 -05:00
results.
Is this always a problem? No. If your data is small enough to fit on a single Kafka partition, you can replicate without issues.
2019-02-28 21:10:39 -05:00
Otherwise, you can run real-time processes without replication.
2016-01-06 00:27:52 -05:00
2016-03-03 16:15:49 -05:00
Please note that druid will skip over event that failed its checksum and it is corrupt.
2016-01-06 00:27:52 -05:00
### Locking
2019-02-28 21:10:39 -05:00
Using stream pull ingestion with Realtime processes together batch ingestion may introduce data override issues. For example, if you
are generating hourly segments for the current day, and run a daily batch job for the current day's data, the segments created by
the batch job will have a more recent version than most of the segments generated by realtime ingestion. If your batch job is indexing
data that isn't yet complete for the day, the daily segment created by the batch job can override recent segments created by
2019-02-28 21:10:39 -05:00
realtime processes. A portion of data will appear to be lost in this case.
2016-01-06 00:27:52 -05:00
### Schema changes
2019-02-28 21:10:39 -05:00
Standalone realtime processes require stopping a process to update a schema, and starting it up again for the schema to take effect.
2016-01-06 00:27:52 -05:00
This can be difficult to manage at scale, especially with multiple partitions.
### Log management
2019-02-28 21:10:39 -05:00
Each standalone realtime process has its own set of logs. Diagnosing errors across many partitions across many servers may be
2016-01-06 00:27:52 -05:00
difficult to manage and track at scale.
## Deployment Notes
Stream ingestion may generate a large number of small segments because it's difficult to optimize the segment size at
ingestion time. The number of segments will increase over time, and this might cause the query performance issue.
Details on how to optimize the segment size can be found on [Segment size optimization](../operations/segment-optimization.html).