* Merge hydrant runners flatly for realtime queries.
Prior to this patch, we have two layers of mergeRunners for realtime
queries: one for each Sink (a logical segment) and one across all
Sinks. This is done because to keep metrics and results grouped by Sink,
given that each FireHydrant within a Sink has its own separate storage
adapter.
However, it costs extra memory usage due to the extra layer of
materialization. This is especially pronounced for groupBy queries,
which only use their merge buffers at the top layer of merging. The
lower layer of merging materializes ResultRows directly into the heap,
which can cause heap exhaustion if there are enough ResultRows.
This patch changes to a single layer of merging when bySegment: false,
just like Historicals. To accommodate that, segment metrics like
query/segment/time are now per-FireHydrant instead of per-Sink.
Two layers of merging are retained when bySegment: true. This isn't
common, because it's typically only used when segment level caching
is enabled on the Broker, which is off by default.
* Use SinkQueryRunners.
* Remove unused method.
* Possibly stabilize intellij-inspections
* remove `integration-tests-ex/cases` from excluded projects from initial build
* enable ErrorProne's `CheckedExceptionNotThrown` to get earlier errors than intellij-inspections
* fix ddsketch pom.xml
* fix spellcheck
* New: Add DDSketch-Druid extension
- Based off of http://www.vldb.org/pvldb/vol12/p2195-masson.pdf and uses
the corresponding https://github.com/DataDog/sketches-java library
- contains tests for post building and using aggregation/post
aggregation.
- New aggregator: `ddSketch`
- New post aggregators: `quantileFromDDSketch` and
`quantilesFromDDSketch`
* Fixing easy CodeQL warnings/errors
* Fixing docs, and dependencies
Also moved aggregator ids to AggregatorUtil and PostAggregatorIds
* Adding more Docs and better null/empty handling for aggregators
* Fixing docs, and pom version
* DDSketch documentation format and wording
A low value of inSubQueryThreshold can cause queries with IN filter to plan as joins more commonly. However, some of these join queries may not get planned as IN filter on data nodes and causes significant perf regression.
### Description
Our Kinesis consumer works by using the [GetRecords API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) in some number of `fetchThreads`, each fetching some number of records (`recordsPerFetch`) and each inserting into a shared buffer that can hold a `recordBufferSize` number of records. The logic is described in our documentation at: https://druid.apache.org/docs/27.0.0/development/extensions-core/kinesis-ingestion/#determine-fetch-settings
There is a problem with the logic that this pr fixes: the memory limits rely on a hard-coded “estimated record size” that is `10 KB` if `deaggregate: false` and `1 MB` if `deaggregate: true`. There have been cases where a supervisor had `deaggregate: true` set even though it wasn’t needed, leading to under-utilization of memory and poor ingestion performance.
Users don’t always know if their records are aggregated or not. Also, even if they could figure it out, it’s better to not have to. So we’d like to eliminate the `deaggregate` parameter, which means we need to do memory management more adaptively based on the actual record sizes.
We take advantage of the fact that GetRecords doesn’t return more than 10MB (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html ):
This pr:
eliminates `recordsPerFetch`, always use the max limit of 10000 records (the default limit if not set)
eliminate `deaggregate`, always have it true
cap `fetchThreads` to ensure that if each fetch returns the max (`10MB`) then we don't exceed our budget (`100MB` or `5% of heap`). In practice this means `fetchThreads` will never be more than `10`. Tasks usually don't have that many processors available to them anyway, so in practice I don't think this will change the number of threads for too many deployments
add `recordBufferSizeBytes` as a bytes-based limit rather than records-based limit for the shared queue. We do know the byte size of kinesis records by at this point. Default should be `100MB` or `10% of heap`, whichever is smaller.
add `maxBytesPerPoll` as a bytes-based limit for how much data we poll from shared buffer at a time. Default is `1000000` bytes.
deprecate `recordBufferSize`, use `recordBufferSizeBytes` instead. Warning is logged if `recordBufferSize` is specified
deprecate `maxRecordsPerPoll`, use `maxBytesPerPoll` instead. Warning is logged if maxRecordsPerPoll` is specified
Fixed issue that when the record buffer is full, the fetchRecords logic throws away the rest of the GetRecords result after `recordBufferOfferTimeout` and starts a new shard iterator. This seems excessively churny. Instead, wait an unbounded amount of time for queue to stop being full. If the queue remains full, we’ll end up right back waiting for it after the restarted fetch.
There was also a call to `newQ::offer` without check in `filterBufferAndResetBackgroundFetch`, which seemed like it could cause data loss. Now checking return value here, and failing if false.
### Release Note
Kinesis ingestion memory tuning config has been greatly simplified, and a more adaptive approach is now taken for the configuration. Here is a summary of the changes made:
eliminates `recordsPerFetch`, always use the max limit of 10000 records (the default limit if not set)
eliminate `deaggregate`, always have it true
cap `fetchThreads` to ensure that if each fetch returns the max (`10MB`) then we don't exceed our budget (`100MB` or `5% of heap`). In practice this means `fetchThreads` will never be more than `10`. Tasks usually don't have that many processors available to them anyway, so in practice I don't think this will change the number of threads for too many deployments
add `recordBufferSizeBytes` as a bytes-based limit rather than records-based limit for the shared queue. We do know the byte size of kinesis records by at this point. Default should be `100MB` or `10% of heap`, whichever is smaller.
add `maxBytesPerPoll` as a bytes-based limit for how much data we poll from shared buffer at a time. Default is `1000000` bytes.
deprecate `recordBufferSize`, use `recordBufferSizeBytes` instead. Warning is logged if `recordBufferSize` is specified
deprecate `maxRecordsPerPoll`, use `maxBytesPerPoll` instead. Warning is logged if maxRecordsPerPoll` is specified
* Kill tasks should honor the buffer period of unused segments.
- The coordinator duty KillUnusedSegments determines an umbrella interval
for each datasource to determine the kill interval. There can be multiple unused
segments in an umbrella interval with different used_status_last_updated timestamps.
For example, consider an unused segment that is 30 days old and one that is 1 hour old. Currently
the kill task after the 30-day mark would kill both the unused segments and not retain the 1-hour
old one.
- However, when a kill task is instantiated with this umbrella interval, it’d kill
all the unused segments regardless of the last updated timestamp. We need kill
tasks and RetrieveUnusedSegmentsAction to honor the bufferPeriod to avoid killing
unused segments in the kill interval prematurely.
* Clarify default behavior in docs.
* test comments
* fix canDutyRun()
* small updates.
* checkstyle
* forbidden api fix
* doc fix, unused import, codeql scan error, and cleanup logs.
* Address review comments
* Rename maxUsedFlagLastUpdatedTime to maxUsedStatusLastUpdatedTime
This is consistent with the column name `used_status_last_updated`.
* Apply suggestions from code review
Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
* Make period Duration type
* Remove older variants of runKilLTask() in OverlordClient interface
* Test can now run without waiting for canDutyRun().
* Remove previous variants of retrieveUnusedSegments from internal metadata storage coordinator interface.
Removes the following interface methods in favor of a new method added:
- retrieveUnusedSegmentsForInterval(String, Interval)
- retrieveUnusedSegmentsForInterval(String, Interval, Integer)
* Chain stream operations
* cleanup
* Pass in the lastUpdatedTime to markUnused test function and remove sleep.
---------
Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
* Undocument unused segments retrieval API.
* Mark API deprecated and unstable. Note that it'll be removed.
* Cleanup .spelling entries
* Remove the Unstable annotation
This change intelligently provisions the correct number of threads per scheduled task. 1 for each event type, and 1 for logging the lost events.
This is a change to make this work. But in the future it would be worthwhile to make each task not be greedy and share threads so there isn't a need of a thread per task.
* IncrementalIndex#add is no longer thread-safe.
Following #14866, there is no longer a reason for IncrementalIndex#add
to be thread-safe.
It turns out it already was not using its selectors in a thread-safe way,
as exposed by #15615 making `testMultithreadAddFactsUsingExpressionAndJavaScript`
in `IncrementalIndexIngestionTest` flaky. Note that this problem isn't
new: Strings have been stored in the dimension selectors for some time,
but we didn't have a test that checked for that case; we only have
this test that checks for concurrent adds involving numeric selectors.
At any rate, this patch changes OnheapIncrementalIndex to no longer try
to offer a thread-safe "add" method. It also improves performance a bit
by adding a row ID supplier to the selectors it uses to read InputRows,
meaning that it can get the benefit of caching values inside the selectors.
This patch also:
1) Adds synchronization to HyperUniquesAggregator and CardinalityAggregator,
which the similar datasketches versions already have. This is done to
help them adhere to the contract of Aggregator: concurrent calls to
"aggregate" and "get" must be thread-safe.
2) Updates OnHeapIncrementalIndexBenchmark to use JMH and moves it to the
druid-benchmarks module.
* Spelling.
* Changes from static analysis.
* Fix javadoc.
* Clear "lineSplittable" for JSON when using KafkaInputFormat.
JsonInputFormat has a "withLineSplittable" method that can be used to
control whether JSON is read line-by-line, or as a whole. The intent
is that in streaming ingestion, "lineSplittable" is false (although it
can be overridden by "assumeNewlineDelimited"), and in batch ingestion,
lineSplittable is true.
When a "json" format is wrapped by a "kafka" format, this isn't set
properly. This patch updates KafkaInputFormat to set this on an
underlying "json" format.
The tests for KafkaInputFormat were overriding the "lineSplittable"
parameter explicitly, which wasn't really fair, because that made them
unrealistic to what happens in production. Now they omit the parameter
and get the production behavior.
* Add test.
* Fix test coverage.
* Faster parsing: reduce String usage, list-based input rows.
Three changes:
1) Reworked FastLineIterator to optionally avoid generating Strings
entirely, and reduce copying somewhat. Benefits the line-oriented
JSON, CSV, delimited (TSV), and regex formats.
2) In the delimited (TSV) format, when the delimiter is a single byte,
split on UTF-8 bytes directly.
3) In CSV and delimited (TSV) formats, use list-based input rows when
the column list is provided upfront by the user.
* Fix style.
* Fix inspections.
* Restore validation.
* Remove fastutil-extra.
* Exception type.
* Fixes for error messages.
* Fixes for null handling.
This PR fixes the summary iterator to add aggregators in the correct position. The summary iterator is used when dims are not present, therefore the new change is identical to the old one, but seems more correct while reading.
MSQ now allows empty ingest queries by default. For such queries that don't generate any output rows, the query counters in the async status result object/task report don't contain numTotalRows and totalSizeInBytes. These properties when not set/undefined can be confusing to API clients. For example, the web-console treats it as unknown values.
This patch fixes the counters by explicitly reporting them as 0 instead of null for empty ingest queries.
* support groups windowing mode; which is a close relative of ranges (but not in the standard)
* all windows with range expressions will be executed wit it groups
* it will be 100% correct in case for both bounds its true that: isCurrentRow() || isUnBounded()
* this covers OVER ( ORDER BY COL )
* for other cases it will have some chances of getting correct results...
Changes:
- Add new task context flag useConcurrentLocks.
- This can be set for an individual task or at a cluster level using `druid.indexer.task.default.context`.
- When set to true, any appending task would use an APPEND lock and any other
ingestion task would use a REPLACE lock when using time chunk locking.
- If false (default), we fall back on the context flag taskLockType and then useSharedLock.
* Cache value selectors in RowBasedColumnSelectorFactory.
There was already caching for dimension selectors. This patch adds caching
for value (object and number) selectors. It's helpful when the same field is
read multiple times during processing of a single row (for example, by being
an input to both MIN and MAX aggregations).
* Fix typing.
* Fix logic.
Changes:
- Handle exception in deletePendingSegments API and map to correct HTTP status code
- Clean up exception message using `DruidException`
- Add unit tests
* Add SpectatorHistogram extension
* Clarify documentation
Cleanup comments
* Use ColumnValueSelector directly
so that we support being queried as a Number using longSum or doubleSum aggregators as well as a histogram.
When queried as a Number, we're returning the count of entries in the histogram.
* Apply suggestions from code review
Co-authored-by: Victoria Lim <vtlim@users.noreply.github.com>
* Fix references
* Fix spelling
* Update docs/development/extensions-contrib/spectator-histogram.md
Co-authored-by: Victoria Lim <vtlim@users.noreply.github.com>
---------
Co-authored-by: Victoria Lim <vtlim@users.noreply.github.com>
* Add ImmutableLookupMap for static lookups.
This patch adds a new ImmutableLookupMap, which comes with an
ImmutableLookupExtractor. It uses a fastutil open hashmap plus two
lists to store its data in such a way that forward and reverse
lookups can both be done quickly. I also observed footprint to be
somewhat smaller than Java HashMap + MapLookupExtractor for a 1 million
row lookup.
The main advantage, though, is that reverse lookups can be done much
more quickly than MapLookupExtractor (which iterates the entire map
for each call to unapplyAll). This speeds up the recently added
ReverseLookupRule (#15626) during SQL planning with very large lookups.
* Use in one more test.
* Fix benchmark.
* Object2ObjectOpenHashMap
* Fixes, and LookupExtractor interface update to have asMap.
* Remove commented-out code.
* Fix style.
* Fix import order.
* Add fastutil.
* Avoid storing Map entries.
* Fix k8sAndWorker mode in a zookeeper-less environment
* add unit test
* code reformat
* minor refine
* change to inject Provider
* correct style
* bind HttpRemoteTaskRunnerFactory as provider
* change to bind on TaskRunnerFactory
* fix styling
* Reverse, pull up lookups in the SQL planner.
Adds two new rules:
1) ReverseLookupRule, which eliminates calls to LOOKUP by doing
reverse lookups.
2) AggregatePullUpLookupRule, which pulls up calls to LOOKUP above
GROUP BY, when the lookup is injective.
Adds configs `sqlReverseLookup` and `sqlPullUpLookup` to control whether
these rules fire. Both are enabled by default.
To minimize the chance of performance problems due to many keys mapping to
the same value, ReverseLookupRule refrains from reversing a lookup if there
are more keys than `inSubQueryThreshold`. The rationale for using this setting
is that reversal works by generating an IN, and the `inSubQueryThreshold`
describes the largest IN the user wants the planner to create.
* Add additional line.
* Style.
* Remove commented-out lines.
* Fix tests.
* Add test.
* Fix doc link.
* Fix docs.
* Add one more test.
* Fix tests.
* Logic, test updates.
* - Make FilterDecomposeConcatRule more flexible.
- Make CalciteRulesManager apply reduction rules til fixpoint.
* Additional tests, simplify code.
* CONCAT flattening, filter decomposition.
Flattening: CONCAT(CONCAT(x, y), z) is flattened to CONCAT(x, y, z). This
is especially useful for the || operator, which is a binary operator and
leads to non-flat CONCAT calls.
Filter decomposition: transforms CONCAT(x, '-', y) = 'a-b' into
x = 'a' AND y = 'b'.
* One more test.
* Fix two tests.
* Adjustments from review.
* Fix empty string problem, add tests.
* Faster k-way merging using tournament trees, 8-byte key strides.
Two speedups for FrameChannelMerger (which does k-way merging in MSQ):
1) Replace the priority queue with a tournament tree, which does fewer
comparisons.
2) Compare keys using 8-byte strides, rather than 1 byte at a time.
* Adjust comments.
* Fix style.
* Adjust benchmark and test.
* Add eight-list test (power of two).
* Update execution-submit-dialog for file input support
Modified the execution-submit-dialog to support file inputs instead of text inputs for better usability. Users can now submit their queries by selecting a JSON file directly or dragging the file into the dialog. Made appropriate UI adjustments to accommodate this change in execution-submit-dialog styles file.
* Update web-console/src/views/workbench-view/execution-submit-dialog/execution-submit-dialog.tsx
Co-authored-by: Charles Smith <techdocsmith@gmail.com>
* Update web-console/src/views/workbench-view/execution-submit-dialog/execution-submit-dialog.tsx
Co-authored-by: Charles Smith <techdocsmith@gmail.com>
* Update web-console/src/views/workbench-view/execution-submit-dialog/execution-submit-dialog.tsx
Co-authored-by: Charles Smith <techdocsmith@gmail.com>
* Update drag-and-drop instructions in execution-submit-dialog
* Add snapshot tests for ExecutionSubmitDialog
* prettify
---------
Co-authored-by: Charles Smith <techdocsmith@gmail.com>
I was looking into adding a rule to do this, and found that it was already
happening as part of Calcite's RexSimplify. So this patch simply adds some
tests to ensure that it continues to happen.