The code in the groupBy engine and the topN engine assume that the dimensions are comparable and can call dimA.compareTo(dimB) to sort the dimensions and group them together.
This works well for the primitive dimensions, because they are Comparable, however falls apart when the dimensions can be arrays (or in future scenarios complex columns). In cases when the dimensions are not comparable, Druid resorts to having a wrapper type ComparableStringArray and ComparableList, which is a Comparable, based on the list comparator.
Changes:
- Add visibility into number of records processed by each streaming task per partition
- Add field `recordsProcessed` to `IngestionStatsAndErrorsTaskReportData`
- Populate number of records processed per partition in `SeekableStreamIndexTaskRunner`
This PR contains a portion of the changes from the inactive draft PR for integrating the catalog with the Calcite planner https://github.com/apache/druid/pull/13686 from @paul-rogers, Refactoring the IngestHandler and subclasses to produce a validated SqlInsert instance node instead of the previous Insert source node. The SqlInsert node is then validated in the calcite validator. The validation that is implemented as part of this pr, is only that for the source node, and some of the validation that was previously done in the ingest handlers. As part of this change, the partitionedBy clause can be supplied by the table catalog metadata if it exists, and can be omitted from the ingest time query in this case.
Apache Druid brings the dependency json-path which is affected by CVE-2023-51074.
Its latest version 2.9.0 fixes the above CVE.
Append function has been added to json-path and so the unit test to check for the append function not present has been updated.
---------
Co-authored-by: Xavier Léauté <xvrl@apache.org>
* Rework ExprMacro base classes to simplify implementations.
This patch removes BaseScalarUnivariateMacroFunctionExpr, adds
BaseMacroFunctionExpr at the top of the hierarchy (a suitable base class
for ExprMacros that take either arrays or scalars), and adds an
implementation for "visit" to BaseMacroFunctionExpr.
The effect on implementations is generally cleaner code:
- Exprs no longer need to implement "visit".
- Exprs no longer need to implement "stringify", even if they don't
use all of their args at runtime, because BaseMacroFunctionExpr has
access to even unused args.
- Exprs that accept arrays can extend BaseMacroFunctionExpr and
inherit a bunch of useful methods. The only one they need to
implement themselves that scalar exprs don't is "supplyAnalyzeInputs".
* Make StringDecodeBase64UTFExpression a static class.
* Remove unused import.
* Formatting, annotation changes.
* Fix HllSketchHolderObjectStrategy#isSafeToConvertToNullSketch.
The prior code from #15162 was reading only the low-order byte of an int
representing the size of a coupon set. As a result, it would erroneously
believe that a coupon set with a multiple of 256 elements was empty.
During ingestion, incremental segments are created in memory for the different time chunks and persisted to disk when certain thresholds are reached (max number of rows, max memory, incremental persist period etc). In the case where there are a lot of dimension and metrics (1000+) it was observed that the creation/serialization of incremental segment file format for persistence and persisting the file took a while and it was blocking ingestion of new data. This affected the real-time ingestion. This serialization and persistence can be parallelized across the different time chunks. This update aims to do that.
The patch adds a simple configuration parameter to the ingestion tuning configuration to specify number of persistence threads. The default value is 1 if it not specified which makes it the same as it is today.
PassthroughAggregatorFactory overrides a deprecated method in the AggregatorFactory, on which it relies on for serializing one of its fields complexTypeName. This was accidentally removed, leading to a bug in the factory, where the type name doesn't get serialized properly, and places null in the type name. This PR revives that method with a different name and adds tests for the same.
- After upgrading the pac4j version in: https://github.com/apache/druid/pull/15522. We were not able to access the druid ui.
- Upgraded the Nimbus libraries version to a compatible version to pac4j.
- In the older pac4j version, when we return RedirectAction there we also update the webcontext Response status code and add the authentication URL to the header. But in the newer pac4j version, we just simply return the RedirectAction. So that's why it was not getting redirected to the generated authentication URL.
- To fix the above, I have updated the NOOP_HTTP_ACTION_ADAPTER to JEE_HTTP_ACTION_ADAPTER and it updates the HTTP Response in context as per the HTTP Action.
As part of becoming FIPS compliance, we are seeing this error: salt must be at least 128 bits when we run the Druid code against FIPS Compliant cryptographic security providers.
This PR fixes the salt size used in Pac4jSessionStore.java
### Description
Our Kinesis consumer works by using the [GetRecords API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) in some number of `fetchThreads`, each fetching some number of records (`recordsPerFetch`) and each inserting into a shared buffer that can hold a `recordBufferSize` number of records. The logic is described in our documentation at: https://druid.apache.org/docs/27.0.0/development/extensions-core/kinesis-ingestion/#determine-fetch-settings
There is a problem with the logic that this pr fixes: the memory limits rely on a hard-coded “estimated record size” that is `10 KB` if `deaggregate: false` and `1 MB` if `deaggregate: true`. There have been cases where a supervisor had `deaggregate: true` set even though it wasn’t needed, leading to under-utilization of memory and poor ingestion performance.
Users don’t always know if their records are aggregated or not. Also, even if they could figure it out, it’s better to not have to. So we’d like to eliminate the `deaggregate` parameter, which means we need to do memory management more adaptively based on the actual record sizes.
We take advantage of the fact that GetRecords doesn’t return more than 10MB (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html ):
This pr:
eliminates `recordsPerFetch`, always use the max limit of 10000 records (the default limit if not set)
eliminate `deaggregate`, always have it true
cap `fetchThreads` to ensure that if each fetch returns the max (`10MB`) then we don't exceed our budget (`100MB` or `5% of heap`). In practice this means `fetchThreads` will never be more than `10`. Tasks usually don't have that many processors available to them anyway, so in practice I don't think this will change the number of threads for too many deployments
add `recordBufferSizeBytes` as a bytes-based limit rather than records-based limit for the shared queue. We do know the byte size of kinesis records by at this point. Default should be `100MB` or `10% of heap`, whichever is smaller.
add `maxBytesPerPoll` as a bytes-based limit for how much data we poll from shared buffer at a time. Default is `1000000` bytes.
deprecate `recordBufferSize`, use `recordBufferSizeBytes` instead. Warning is logged if `recordBufferSize` is specified
deprecate `maxRecordsPerPoll`, use `maxBytesPerPoll` instead. Warning is logged if maxRecordsPerPoll` is specified
Fixed issue that when the record buffer is full, the fetchRecords logic throws away the rest of the GetRecords result after `recordBufferOfferTimeout` and starts a new shard iterator. This seems excessively churny. Instead, wait an unbounded amount of time for queue to stop being full. If the queue remains full, we’ll end up right back waiting for it after the restarted fetch.
There was also a call to `newQ::offer` without check in `filterBufferAndResetBackgroundFetch`, which seemed like it could cause data loss. Now checking return value here, and failing if false.
### Release Note
Kinesis ingestion memory tuning config has been greatly simplified, and a more adaptive approach is now taken for the configuration. Here is a summary of the changes made:
eliminates `recordsPerFetch`, always use the max limit of 10000 records (the default limit if not set)
eliminate `deaggregate`, always have it true
cap `fetchThreads` to ensure that if each fetch returns the max (`10MB`) then we don't exceed our budget (`100MB` or `5% of heap`). In practice this means `fetchThreads` will never be more than `10`. Tasks usually don't have that many processors available to them anyway, so in practice I don't think this will change the number of threads for too many deployments
add `recordBufferSizeBytes` as a bytes-based limit rather than records-based limit for the shared queue. We do know the byte size of kinesis records by at this point. Default should be `100MB` or `10% of heap`, whichever is smaller.
add `maxBytesPerPoll` as a bytes-based limit for how much data we poll from shared buffer at a time. Default is `1000000` bytes.
deprecate `recordBufferSize`, use `recordBufferSizeBytes` instead. Warning is logged if `recordBufferSize` is specified
deprecate `maxRecordsPerPoll`, use `maxBytesPerPoll` instead. Warning is logged if maxRecordsPerPoll` is specified
* Clear "lineSplittable" for JSON when using KafkaInputFormat.
JsonInputFormat has a "withLineSplittable" method that can be used to
control whether JSON is read line-by-line, or as a whole. The intent
is that in streaming ingestion, "lineSplittable" is false (although it
can be overridden by "assumeNewlineDelimited"), and in batch ingestion,
lineSplittable is true.
When a "json" format is wrapped by a "kafka" format, this isn't set
properly. This patch updates KafkaInputFormat to set this on an
underlying "json" format.
The tests for KafkaInputFormat were overriding the "lineSplittable"
parameter explicitly, which wasn't really fair, because that made them
unrealistic to what happens in production. Now they omit the parameter
and get the production behavior.
* Add test.
* Fix test coverage.
* Faster parsing: reduce String usage, list-based input rows.
Three changes:
1) Reworked FastLineIterator to optionally avoid generating Strings
entirely, and reduce copying somewhat. Benefits the line-oriented
JSON, CSV, delimited (TSV), and regex formats.
2) In the delimited (TSV) format, when the delimiter is a single byte,
split on UTF-8 bytes directly.
3) In CSV and delimited (TSV) formats, use list-based input rows when
the column list is provided upfront by the user.
* Fix style.
* Fix inspections.
* Restore validation.
* Remove fastutil-extra.
* Exception type.
* Fixes for error messages.
* Fixes for null handling.
MSQ now allows empty ingest queries by default. For such queries that don't generate any output rows, the query counters in the async status result object/task report don't contain numTotalRows and totalSizeInBytes. These properties when not set/undefined can be confusing to API clients. For example, the web-console treats it as unknown values.
This patch fixes the counters by explicitly reporting them as 0 instead of null for empty ingest queries.
* support groups windowing mode; which is a close relative of ranges (but not in the standard)
* all windows with range expressions will be executed wit it groups
* it will be 100% correct in case for both bounds its true that: isCurrentRow() || isUnBounded()
* this covers OVER ( ORDER BY COL )
* for other cases it will have some chances of getting correct results...
Changes:
- Add new task context flag useConcurrentLocks.
- This can be set for an individual task or at a cluster level using `druid.indexer.task.default.context`.
- When set to true, any appending task would use an APPEND lock and any other
ingestion task would use a REPLACE lock when using time chunk locking.
- If false (default), we fall back on the context flag taskLockType and then useSharedLock.
* Add ImmutableLookupMap for static lookups.
This patch adds a new ImmutableLookupMap, which comes with an
ImmutableLookupExtractor. It uses a fastutil open hashmap plus two
lists to store its data in such a way that forward and reverse
lookups can both be done quickly. I also observed footprint to be
somewhat smaller than Java HashMap + MapLookupExtractor for a 1 million
row lookup.
The main advantage, though, is that reverse lookups can be done much
more quickly than MapLookupExtractor (which iterates the entire map
for each call to unapplyAll). This speeds up the recently added
ReverseLookupRule (#15626) during SQL planning with very large lookups.
* Use in one more test.
* Fix benchmark.
* Object2ObjectOpenHashMap
* Fixes, and LookupExtractor interface update to have asMap.
* Remove commented-out code.
* Fix style.
* Fix import order.
* Add fastutil.
* Avoid storing Map entries.
* Faster k-way merging using tournament trees, 8-byte key strides.
Two speedups for FrameChannelMerger (which does k-way merging in MSQ):
1) Replace the priority queue with a tournament tree, which does fewer
comparisons.
2) Compare keys using 8-byte strides, rather than 1 byte at a time.
* Adjust comments.
* Fix style.
* Adjust benchmark and test.
* Add eight-list test (power of two).
Add class PasswordHashGenerator. Move hashing logic from BasicAuthUtils to this new class.
Add cache in the hash generator to contain the computed hash of passwords and boost validator performance
Cache has max size 1000 and expiry 1 hour
Key of the cache is an SHA-256 hash of the (password + random salt generated on service startup)
Currently, If 2 tasks are consuming from the same partitions, try to publish the segment and update the metadata, the second task can fail because the end offset stored in the metadata store doesn't match with the start offset of the second task. We can fix this by retrying instead of failing.
AFAIK apart from the above issue, the metadata mismatch can happen in 2 scenarios:
- when we update the input topic name for the data source
- when we run 2 replicas of ingestion tasks(1 replica will publish and 1 will fail as the first replica has already updated the metadata).
Implemented the comparable function to compare the last committed end offset and new Sequence start offset. And return a specific error msg for this.
Add retry logic on indexers to retry for this specific error msg.
Updated the existing test case.
Added support for Azure Government storage in Druid Azure-Extensions. This enhancement allows the Azure-Extensions to be compatible with different Azure storage types by updating the endpoint suffix from a hardcoded value to a configurable one.
* overhaul DruidPredicateFactory to better handle 3VL
fixes some bugs caused by some limitations of the original design of how DruidPredicateFactory interacts with 3-value logic. The primary impacted area was with how filters on values transformed with expressions or extractionFn which turn non-null values into nulls, which were not possible to be modelled with the 'isNullInputUnknown' method
changes:
* adds DruidObjectPredicate to specialize string, array, and object based predicates instead of using guava Predicate
* DruidPredicateFactory now uses DruidObjectPredicate
* introduces DruidPredicateMatch enum, which all predicates returned from DruidPredicateFactory now use instead of booleans to indicate match. This means DruidLongPredicate, DruidFloatPredicate, DruidDoublePredicate, and the newly added DruidObjectPredicate apply methods all now return DruidPredicateMatch. This allows matchers and indexes
* isNullInputUnknown has been removed from DruidPredicateFactory
* rename, fix test
* adjust
* style
* npe
* more test
* fix default value mode to not match new test
FILTER_INTO_JOIN is mainly run along with the other rules with the Volcano planner; however if the query starts highly underdefined (join conditions in the where clauses) that generic query could give a lot of room for the other rules to play around with only enabled it for when the join uses subqueries for its inputs.
PROJECT_FILTER rule is not that useful. and could increase planning times by providing new plans. This problem worsened after we started supporting inner joins with arbitrary join conditions in https://github.com/apache/druid/pull/15302
* Reverse lookup fixes and enhancements.
1) Add a "mayIncludeUnknown" parameter to DimFilter#optimize. This is important
because otherwise the reverse-lookup optimization is done improperly when
the "in" filter appears under a "not", and the lookup extractionFn may return
null for some possible values of the filtered column. The "includeUnknown" test
cases in InDimFilterTest illustrate the difference in behavior.
2) Enhance InDimFilter#optimizeLookup to handle "mayIncludeUnknown", and to be able
to do a reverse lookup in a wider variety of cases.
3) Make "unapply" protected in LookupExtractor, and move callers to "unapplyAll".
The main reason is that MapLookupExtractor, a common implementation, lacks a
reverse mapping and therefore does a scan of the map for each call to "unapply".
For performance sake these calls need to be batched.
* Remove optimize call from BloomDimFilter.
* Follow the law.
* Fix tests.
* Fix imports.
* Switch function.
* Fix tests.
* More tests.
* Allow empty inserts and replace.
- Introduce a new query context failOnEmptyInsert which defaults to false.
- When this context is false (default), MSQE will now allow empty inserts and replaces.
- When this context is true, MSQE will throw the existing InsertCannotBeEmpty MSQ fault.
- For REPLACE ALL over an ALL grain segment, the query will generate a tombstone spanning eternity
which will be removed eventually be the coordinator.
- Add unit tests in MSQInsertTest, MSQReplaceTest to test the new default behavior (i.e., when failOnEmptyInsert = false)
- Update unit tests in MSQFaultsTest to test the non-default behavior (i.e., when failOnEmptyInsert = true)
* Ignore test to see if it's the culprit for OOM
* Add heap dump config
* Bump up -Xmx from 1500 MB to 2048 MB
* Add steps to tarball and collect hprof dump to GHA action
* put back mx to 1500MB to trigger the failure
* add the step to reusable unit test workflow as well
* Revert the temp heap dump & @Ignore changes since max heap size is increased
* Minor updates
* Review comments
1. Doc suggestions
2. Add tests for empty insert and replace queries with ALL grain and limit in the
default failOnEmptyInsert mode (=false). Add similar tests to MSQFaultsTest with
failOnEmptyInsert = true, so the query does fail with an InsertCannotBeEmpty fault.
3. Nullable annotation and javadocs
* Add comment
replace_limit.patch
Changes
- Add `log` implementation for `AuditManager` alongwith `SQLAuditManager`
- `LoggingAuditManager` simply logs the audit event. Thus, it returns empty for
all `fetchAuditHistory` calls.
- Add new config `druid.audit.manager.type` which can take values `log`, `sql` (default)
- Add new config `druid.audit.manager.logLevel` which can take values `DEBUG`, `INFO`, `WARN`.
This gets activated only if `type` is `log`.
- Remove usage of `ConfigSerde` from `AuditManager` as audit is not just limited to configs
- Add `AuditSerdeHelper` for a single implementation of serialization/deserialization of
audit payload and other utility methods.