This fixes a race where, if there is no output at all, setAllDoneIfPossible
could be called twice (once when the output partitions future resolves, and
once when the batcher finishes). If the calls happen in that order, it would
try to create nil output channels both times, resulting in a "Channel already set"
error.
Stages can be instructed to exit before they finish, especially when a
downstream stage includes a "LIMIT". This patch has improvements related
to early-exiting stages.
Bug fix:
- WorkerStageKernel: Don't allow fail() to set an exception if the stage is
already in a terminal state (FINISHED or FAILED). If fail() is called while
in a terminal state, log the exception, then throw it away. If it's a
cancellation exception, don't even log it. This fixes a bug where a stage
that exited early could transition to FINISHED and then to FAILED, causing
the overall query to fail.
Performance:
- DartWorkerManager previously sent stopWorker commands to workers
even when "interrupt" was false. Now it only sends those commands when
"interrupt" is true. The method javadoc already claimed this is what the
method did, but the implementation did not match the javadoc. This reduces
the number of RPCs by 1 per worker per query.
Quieter logging:
- In ReadableByteChunksFrameChannel, skip logging exception from setError if
the channel has been closed. Channels are closed when readers are done with
them, so at that point, we wouldn't be interested in the errors.
- In RunWorkOrder, skip calling notifyListener on failure of the main work,
in the case when stop() has already been called. The stop() method will
set its own error using CanceledFault. This enables callers to detect
when a stage was canceled vs. failed for some other reason.
- In WorkerStageKernel, skip logging cancellation errors in fail(). This is
made possible by the previous change in RunWorkOrder.
* adds support for `UNNEST` expressions
* introduces `LogicalUnnestRule` to transform a `Correlate` doing UNNEST into a `LogicalUnnest`
* `UnnestInputCleanupRule` could move the final unnested expr into the `LogicalUnnest` itself (usually its an `mv_to_array` expression)
* enhanced source unwrapping to utilize `FilteredDataSource` if it looks right
This patch adds a profile of MSQ named "Dart" that runs on Brokers and
Historicals, and which is compatible with the standard SQL query API.
For more high-level description, and notes on future work, refer to #17139.
This patch contains the following changes, grouped into packages.
Controller (org.apache.druid.msq.dart.controller):
The controller runs on Brokers. Main classes are,
- DartSqlResource, which serves /druid/v2/sql/dart/.
- DartSqlEngine and DartQueryMaker, the entry points from SQL that actually
run the MSQ controller code.
- DartControllerContext, which configures the MSQ controller.
- DartMessageRelays, which sets up relays (see "message relays" below) to read
messages from workers' DartControllerClients.
- DartTableInputSpecSlicer, which assigns work based on a TimelineServerView.
Worker (org.apache.druid.msq.dart.worker)
The worker runs on Historicals. Main classes are,
- DartWorkerResource, which supplies the regular MSQ WorkerResource, plus
Dart-specific APIs.
- DartWorkerRunner, which runs MSQ worker code.
- DartWorkerContext, which configures the MSQ worker.
- DartProcessingBuffersProvider, which provides processing buffers from
sliced-up merge buffers.
- DartDataSegmentProvider, which provides segments from the Historical's
local cache.
Message relays (org.apache.druid.messages):
To avoid the need for Historicals to contact Brokers during a query, which
would create opportunities for queries to get stuck, all connections are
opened from Broker to Historical. This is made possible by a message relay
system, where the relay server (worker) has an outbox of messages.
The relay client (controller) connects to the outbox and retrieves messages.
Code for this system lives in the "server" package to keep it separate from
the MSQ extension and make it easier to maintain. The worker-to-controller
ControllerClient is implemented using message relays.
Other changes:
- Controller: Added the method "hasWorker". Used by the ControllerMessageListener
to notify the appropriate controllers when a worker fails.
- WorkerResource: No longer tries to respond more than once in the
"httpGetChannelData" API. This comes up when a response due to resolved future
is ready at about the same time as a timeout occurs.
- MSQTaskQueryMaker: Refactor to separate out some useful functions for reuse
in DartQueryMaker.
- SqlEngine: Add "queryContext" to "resultTypeForSelect" and "resultTypeForInsert".
This allows the DartSqlEngine to modify result format based on whether a "fullReport"
context parameter is set.
- LimitedOutputStream: New utility class. Used when in "fullReport" mode.
- TimelineServerView: Add getDruidServerMetadata as a performance optimization.
- CliHistorical: Add SegmentWrangler, so it can query inline data, lookups, etc.
- ServiceLocation: Add "fromUri" method, relocating some code from ServiceClientImpl.
- FixedServiceLocator: New locator for a fixed set of service locations. Useful for
URI locations.
Fixes a mistake introduced in #16533 which can result in CursorGranularizer incorrectly trying to get values from a selector after calling cursor.advance because of a missing check for cursor.isDone
changes:
* filter index processing is now automatically ordered based on estimated 'cost', which is approximated based on how many expected bitmap operations are required to construct the bitmap used for the 'offset'
* cursorAutoArrangeFilters context flag now defaults to true, but can be set to false to disable cost based filter index sorting
Extracting a few miscellaneous non-functional changes from the batch supervisor branch:
- Replace anonymous inner classes with lambda expressions in the SQL supervisor manager layer
- Add explicit @Nullable annotations in DynamicConfigProviderUtils to make IDE happy
- Small variable renames (copy-paste error perhaps) and fix typos
- Add table name for this exception message: Delete the supervisor from the table[%s] in the database...
- Prefer CollectionUtils.isEmptyOrNull() over list == null || list.size() > 0. We can change the Precondition checks to throwing DruidException separately for a batch of APIs at a time.
* update docs for kafka lookup extension to specify correct extension ordering
* fix first line
* test with extension dependencies
* save work on dependency management
* working dependency graph
* working pull
* fix style
* fix style
* remove name
* load extension dependencies recursively
* generate depenencies on classloader creation
* add check for circular dependencies
* fix style
* revert style changes
* remove mutable class loader
* clean up class heirarchy
* extensions loader test working
* add unit tests
* pr comments
* fix unit tests
changes:
* add `ApplyFunction` support to vectorization fallback, allowing many of the remaining expressions to be vectorized
* add `CastToObjectVectorProcessor` so that vector engine can correctly cast any type
* add support for array and complex vector constants
* reduce number of cases which can block vectorization in expression planner to be unknown inputs (such as unknown multi-valuedness)
* fix array constructor expression, apply map expression to make actual evaluated type match the output type inference
* fix bug in array_contains where something like array_contains([null], 'hello') would return true if the array was a numeric array since the non-null string value would cast to a null numeric
* fix isNull/isNotNull to correctly handle any type of input argument
Text-based input formats like csv and tsv currently parse inputs only as strings, following the RFC4180Parser spec).
To workaround this, the web-console and other tools need to further inspect the sample data returned to sample data returned by the Druid sampler API to parse them as numbers.
This patch introduces a new optional config, tryParseNumbers, for the csv and tsv input formats. If enabled, any numbers present in the input will be parsed in the following manner -- long data type for integer types and double for floating-point numbers, and if parsing fails for whatever reason, the input is treated as a string. By default, this configuration is set to false, so numeric strings will be treated as strings.
* Use the whole frame when writing rows.
This patch makes the following adjustments to enable writing larger
single rows to frames:
1) RowBasedFrameWriter: Max out allocation size on the final doubling.
i.e., if the final allocation "naturally" would be 1 MiB but the
max frame size is 900 KiB, use 900 KiB rather than failing the 1 MiB
allocation.
2) AppendableMemory: In reserveAdditional, release the last block if it
is empty. This eliminates waste when a frame writer uses a
successive-doubling approach to find the right allocation size.
3) ArenaMemoryAllocator: Reclaim memory from the last allocation when
the last allocation is closed.
Prior to these changes, a single row could be much smaller than the
frame size and still fail to be added to the frame.
* Style.
* Fix test.
Currently, TaskDataSegmentProvider fetches the DataSegment from the Coordinator while loading the segment, but just discards it later. This PR refactors this to also return the DataSegment so that it can be used by workers without a separate fetch.
Previously, the processor used "remainingChannels" to track the number of
non-null entries of currentFrame. Now, "remainingChannels" tracks the
number of channels that are unfinished.
The difference is subtle. In the previous code, when an input channel
was blocked upon exiting nextFrame(), the "currentFrames" entry would be
null, and therefore the "remainingChannels" variable would be decremented.
After the next await and call to populateCurrentFramesAndTournamentTree(),
"remainingChannels" would be incremented if the channel had become
unblocked after awaiting.
This means that finished(), which returned true if remainingChannels was
zero, would not be reliable if called between nextFrame() and the
next await + populateCurrentFramesAndTournamentTree().
This patch changes things such that finished() is always reliable. This
fixes a regression introduced in PR #16911, which added a call to
finished() that was, at that time, unsafe.
Register a Ser-De for RowsAndColumns so that the window operator query running on leaf operators would be transferred properly on the wire. Would fix the empty response given by window queries without group by on the native engine.
* Speed up FrameFileTest, SuperSorterTest.
These are two heavily parameterized tests that, together, account for
about 60% of runtime in the test suite.
FrameFileTest changes:
1) Cache frame files in a static, rather than building the frame file
for each parameterization of the test.
2) Adjust TestArrayCursorFactory to cache the signature, rather than
re-creating it on each call to getColumnCapabilities.
SuperSorterTest changes:
1) Dramatically reduce the number of tests that run with
"maxRowsPerFrame" = 1. These are particularly slow due to writing so
many small files. Some still run, since it's useful to test edge cases,
but much fewer than before.
2) Reduce the "maxActiveProcessors" axis of the test from [1, 2, 4] to
[1, 3]. The aim is to reduce the number of cases while still getting
good coverage of the feature.
3) Reduce the "maxChannelsPerProcessor" axis of the test from [2, 3, 8]
to [2, 7]. The aim is to reduce the number of cases while still getting
good coverage of the feature.
4) Use in-memory input channels rather than file channels.
5) Defer formatting of assertion failure messages until they are needed.
6) Cache the cursor factory and its signature in a static.
7) Cache sorted test rows (used for verification) in a static.
* It helps to include the file.
* Style.
* abstract `IncrementalIndex` cursor stuff to prepare to allow for possibility of using different "views" of the data based on the cursor build spec
changes:
* introduce `IncrementalIndexRowSelector` interface to capture how `IncrementalIndexCursor` and `IncrementalIndexColumnSelectorFactory` read data
* `IncrementalIndex` implements `IncrementalIndexRowSelector`
* move `FactsHolder` interface to separate file
* other minor refactorings
* BaseWorkerClientImpl: Don't attempt to recover from a closed channel.
This patch introduces an exception type "ChannelClosedForWritesException",
which allows the BaseWorkerClientImpl to avoid retrying when the local
channel has been closed. This can happen in cases of cancellation.
* Add some test coverage.
* wip
* Add test coverage.
* Style.
* MSQ: Improved worker cancellation.
Four changes:
1) FrameProcessorExecutor now requires that cancellationIds be registered
with "registerCancellationId" prior to being used in "runFully" or "runAllFully".
2) FrameProcessorExecutor gains an "asExecutor" method, which allows that
executor to be used as an executor for future callbacks in such a way
that respects cancellationId.
3) RunWorkOrder gains a "stop" method, which cancels the current
cancellationId and closes the current FrameContext. It blocks until
both operations are complete.
4) Fixes a bug in RunAllFullyWidget where "processorManager.result()" was
called outside "runAllFullyLock", which could cause it to be called
out-of-order with "cleanup()" in case of cancellation or other error.
Together, these changes help ensure cancellation does not have races.
Once "cancel" is called for a given cancellationId, all existing processors
and running callbacks are canceled and exit in an orderly manner. Future
processors and callbacks with the same cancellationId are rejected
before being executed.
* Fix test.
* Use execute, which doesn't return, to avoid errorprone complaints.
* Fix some style stuff.
* Further enhancements.
* Fix style.
* MSQ: Rework memory management.
This patch reworks memory management to better support multi-threaded
workers running in shared JVMs. There are two main changes.
First, processing buffers and threads are moved from a per-JVM model to
a per-worker model. This enables queries to hold processing buffers
without blocking other concurrently-running queries. Changes:
- Introduce ProcessingBuffersSet and ProcessingBuffers to hold the
per-worker and per-work-order processing buffers (respectively). On Peons,
this is the JVM-wide processing pool. On Indexers, this is a per-worker
pool of on-heap buffers. (This change fixes a bug on Indexers where
excessive processing buffers could be used if MSQ tasks ran concurrently
with realtime tasks.)
- Add "bufferPool" argument to GroupingEngine#process so a per-worker pool
can be passed in.
- Add "druid.msq.task.memory.maxThreads" property, which controls the
maximum number of processing threads to use per task. This allows usage of
multiple processing buffers per task if admins desire.
- IndexerWorkerContext acquires processingBuffers when creating the FrameContext
for a work order, and releases them when closing the FrameContext.
- Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know
how many sets of processing buffers are needed to run a given query.
Second, adjustments to how WorkerMemoryParameters slices up bundles, to
favor more memory for sorting and segment generation. Changes:
- Instead of using same-sized bundles for processing and for sorting,
workers now use minimally-sized processing bundles (just enough to read
inputs plus a little overhead). The rest is devoted to broadcast data
buffering, sorting, and segment-building.
- Segment-building is now limited to 1 concurrent segment per work order.
This allows each segment-building action to use more memory. Note that
segment-building is internally multi-threaded to a degree. (Build and
persist can run concurrently.)
- Simplify frame size calculations by removing the distinction between
"standard" and "large" frames. The new default frame size is the same
as the old "standard" frames, 1 MB. The original goal of of the large
frames was to reduce the number of temporary files during sorting, but
I think we can achieve the same thing by simply merging a larger number
of standard frames at once.
- Remove the small worker adjustment that was added in #14117 to account
for an extra frame involved in writing to durable storage. Instead,
account for the extra frame whenever we are actually using durable storage.
- Cap super-sorter parallelism using the number of output partitions, rather
than using a hard coded cap at 4. Note that in practice, so far, this cap
has not been relevant for tasks because they have only been using a single
processing thread anyway.
* Remove unused import.
* Fix errorprone annotation.
* Fixes for javadocs and inspections.
* Additional test coverage.
* Fix test.
changes:
* CursorHolder.isPreAggregated method indicates that a cursor has pre-aggregated data for all AggregatorFactory specified in a CursorBuildSpec. If true, engines should rewrite the query to use AggregatorFactory.getCombiningAggreggator, and column selector factories will provide selectors with the aggregator interediate type for the aggregator factory name
* Added groupby, timeseries, and topN support for CursorHolder.isPreAggregated
* Added synthetic test since no CursorHolder implementations support isPreAggregated at this point in time
If the GroupByMergingQueryRunner gets scheduled after the query timeout, it fails to clean up the processing tasks that have been scheduled. This can lead to unnecessary processing being done for the tasks whos results won't get consumed.
Tasks that do not support querying or query processing i.e. supportsQueries = false do not require processing threads, processing buffers, and merge buffers.
* transition away from StorageAdapter
changes:
* CursorHolderFactory has been renamed to CursorFactory and moved off of StorageAdapter, instead fetched directly from the segment via 'asCursorFactory'. The previous deprecated CursorFactory interface has been merged into StorageAdapter
* StorageAdapter is no longer used by any engines or tests and has been marked as deprecated with default implementations of all methods that throw exceptions indicating the new methods to call instead
* StorageAdapter methods not covered by CursorFactory (CursorHolderFactory prior to this change) have been moved into interfaces which are retrieved by Segment.as, the primary classes are the previously existing Metadata, as well as new interfaces PhysicalSegmentInspector and TopNOptimizationInspector
* added UnnestSegment and FilteredSegment that extend WrappedSegmentReference since their StorageAdapter implementations were previously provided by WrappedSegmentReference
* added PhysicalSegmentInspector which covers some of the previous StorageAdapter functionality which was primarily used for segment metadata queries and other metadata uses, and is implemented for QueryableIndexSegment and IncrementalIndexSegment
* added TopNOptimizationInspector to cover the oddly specific StorageAdapter.hasBuiltInFilters implementation, which is implemented for HashJoinSegment, UnnestSegment, and FilteredSegment
* Updated all engines and tests to no longer use StorageAdapter
This commit aims to reject MVDs in window processing as we do not support them.
Earlier to this commit, query running a window aggregate partitioned by an MVD column would fail with ClassCastException
This patch adds "TypeCastSelectors", which is used when writing frames to
perform two coercions:
- When a numeric type is desired and the underlying type is non-numeric or
unknown, the underlying selector is wrapped, "getObject" is called and the
result is coerced using "ExprEval.ofType". This differs from the prior
behavior where the primitive methods like "getLong", "getDouble", etc, would
be called directly. This fixes an issue where a column would be read as
all-zeroes when its SQL type is numeric and its physical type is string, which
can happen when evolving a column's type from string to number.
- When an array type is desired, the underlying selector is wrapped,
"getObject" is called, and the result is coerced to Object[]. This coercion
replaces some earlier logic from #15917.
Description:
#16768 introduces new compaction APIs on the Overlord `/compact/status` and `/compact/progress`.
But the corresponding `OverlordClient` methods do not return an object compatible with the actual
endpoints defined in `OverlordCompactionResource`.
This patch ensures that the objects are compatible.
Changes:
- Add `CompactionStatusResponse` and `CompactionProgressResponse`
- Use these as the return type in `OverlordClient` methods and as the response entity in `OverlordCompactionResource`
- Add `SupervisorCleanupModule` bound on the Coordinator to perform cleanup of supervisors.
Without this module, Coordinator cannot deserialize compaction supervisors.
* MSQ: Add limitHint to global-sort shuffles.
This allows pushing down limits into the SuperSorter.
* Test fixes.
* Add limitSpec to ScanQueryKit. Fix SuperSorter tracking.
Description
-----------
Auto-compaction currently poses several challenges as it:
1. may get stuck on a failing interval.
2. may get stuck on the latest interval if more data keeps coming into it.
3. always picks the latest interval regardless of the level of compaction in it.
4. may never pick a datasource if its intervals are not very recent.
5. requires setting an explicit period which does not cater to the changing needs of a Druid cluster.
This PR introduces various improvements to compaction scheduling to tackle the above problems.
Change Summary
--------------
1. Run compaction for a datasource as a supervisor of type `autocompact` on Overlord.
2. Make compaction policy extensible and configurable.
3. Track status of recently submitted compaction tasks and pass this info to policy.
4. Add `/simulate` API on both Coordinator and Overlord to run compaction simulations.
5. Redirect compaction status APIs to the Overlord when compaction supervisors are enabled.
* Make IntelliJ's MethodIsIdenticalToSuperMethod an error
* Change codebase to follow new IntelliJ inspection
* Restore non-short-circuit boolean expressions to pass tests
* MSQ: Add CPU and thread usage counters.
The main change adds "cpu" and "wall" counters. The "cpu" counter measures
CPU time (using JvmUtils.getCurrentThreadCpuTime) taken up by processors
in processing threads. The "wall" counter measures the amount of wall time
taken up by processors in those same processing threads. Both counters are
broken down by type of processor.
This patch also includes changes to support adding new counters. Due to an
oversight in the original design, older deserializers are not forwards-compatible;
they throw errors when encountering an unknown counter type. To manage this,
the following changes are made:
1) The defaultImpl NilQueryCounterSnapshot is added to QueryCounterSnapshot's
deserialization configuration. This means that any unrecognized counter types
will be read as "nil" by deserializers. Going forward, once all servers are
on the latest code, this is enough to enable easily adding new counters.
2) A new context parameter "includeAllCounters" is added, which defaults to "false".
When this parameter is set "false", only legacy counters are included. When set
to "true", all counters are included. This is currently undocumented. In a future
version, we should set the default to "true", and at that time, include a release
note that people updating from versions prior to Druid 31 should set this to
"false" until their upgrade is complete.
* Style, coverage.
* Fix.
Changes:
- Simplify exception handling in `CryptoService` by just catching a `Exception`
- Throw a `DruidException` as the exception is user facing
- Log the exception for easier debugging
- Add a test to verify thrown exception
Currently, if we have a query with window function having PARTITION BY xyz, and we have a million unique values for xyz each having 1 row, we'd end up creating a million individual RACs for processing, each having a single row. This is unnecessary, and we can batch the PARTITION BY keys together for processing, and process them only when we can't batch further rows to adhere to maxRowsMaterialized config.
The previous iteration of this PR was simplifying WindowOperatorQueryFrameProcessor to run all operators on all the rows instead of creating smaller RACs per partition by key. That approach was discarded in favor of the batching approach, and the details are summarized here: #16823 (comment).
changes:
* Adds new `CompressedComplexColumn`, `CompressedComplexColumnSerializer`, `CompressedComplexColumnSupplier` based on `CompressedVariableSizedBlobColumn` used by JSON columns
* Adds `IndexSpec.complexMetricCompression` which can be used to specify compression for the generic compressed complex column. Defaults to uncompressed because compressed columns are not backwards compatible.
* Adds new definition of `ComplexMetricSerde.getSerializer` which accepts an `IndexSpec` argument when creating a serializer. The old signature has been marked `@Deprecated` and has a default implementation that returns `null`, but it will be used by the default implementation of the new version if it is implemented to return a non-null value. The default implementation of the new method will use a `CompressedComplexColumnSerializer` if `IndexSpec.complexMetricCompression` is not null/none/uncompressed, or will use `LargeColumnSupportedComplexColumnSerializer` otherwise.
* Removed all duplicate generic implementations of `ComplexMetricSerde.getSerializer` and `ComplexMetricSerde.deserializeColumn` into default implementations `ComplexMetricSerde` instead of being copied all over the place. The default implementation of `deserializeColumn` will check if the first byte indicates that the new compression was used, otherwise will use the `GenericIndexed` based supplier.
* Complex columns with custom serializers/deserializers are unaffected and may continue doing whatever it is they do, either with specialized compression or whatever else, this new stuff is just to provide generic implementations built around `ObjectStrategy`.
* add ObjectStrategy.readRetainsBufferReference so CompressedComplexColumn only copies on read if required
* add copyValueOnRead flag down to CompressedBlockReader to avoid buffer duplicate if the value needs copied anyway
* MSQ: Fix validation of time position in collations.
It is possible for the collation to refer to a field that isn't mapped,
such as when the DML includes "CLUSTERED BY some_function(some_field)".
In this case, the collation refers to a projected column that is not
part of the field mappings. Prior to this patch, that would lead to an
out of bounds list access on fieldMappings.
This patch fixes the problem by identifying the position of __time in
the fieldMappings first, rather than retrieving each collation field
from fieldMappings.
Fixes a bug introduced in #16849.
* Fix test. Better warning message.
* Place __time in signatures according to sort order.
Updates a variety of places to put __time in row signatures according
to its position in the sort order, rather than always first, including:
- InputSourceSampler.
- ScanQueryEngine (in the default signature when "columns" is empty).
- Various StorageAdapters, which also have the effect of reordering
the column order in segmentMetadata queries, and therefore in SQL
schemas as well.
Follow-up to #16849.
* Fix compilation.
* Additional fixes.
* Fix.
* Fix style.
* Omit nonexistent columns from the row signature.
* Fix tests.
* Segments primarily sorted by non-time columns.
Currently, segments are always sorted by __time, followed by the sort
order provided by the user via dimensionsSpec or CLUSTERED BY. Sorting
by __time enables efficient execution of queries involving time-ordering
or granularity. Time-ordering is a simple matter of reading the rows in
stored order, and granular cursors can be generated in streaming fashion.
However, for various workloads, it's better for storage footprint and
query performance to sort by arbitrary orders that do not start with __time.
With this patch, users can sort segments by such orders.
For spec-based ingestion, users add "useExplicitSegmentSortOrder: true" to
dimensionsSpec. The "dimensions" list determines the sort order. To
define a sort order that includes "__time", users explicitly
include a dimension named "__time".
For SQL-based ingestion, users set the context parameter
"useExplicitSegmentSortOrder: true". The CLUSTERED BY clause is then
used as the explicit segment sort order.
In both cases, when the new "useExplicitSegmentSortOrder" parameter is
false (the default), __time is implicitly prepended to the sort order,
as it always was prior to this patch.
The new parameter is experimental for two main reasons. First, such
segments can cause errors when loaded by older servers, due to violating
their expectations that timestamps are always monotonically increasing.
Second, even on newer servers, not all queries can run on non-time-sorted
segments. Scan queries involving time-ordering and any query involving
granularity will not run. (To partially mitigate this, a currently-undocumented
SQL feature "sqlUseGranularity" is provided. When set to false the SQL planner
avoids using "granularity".)
Changes on the write path:
1) DimensionsSpec can now optionally contain a __time dimension, which
controls the placement of __time in the sort order. If not present,
__time is considered to be first in the sort order, as it has always
been.
2) IncrementalIndex and IndexMerger are updated to sort facts more
flexibly; not always by time first.
3) Metadata (stored in metadata.drd) gains a "sortOrder" field.
4) MSQ can generate range-based shard specs even when not all columns are
singly-valued strings. It merely stops accepting new clustering key
fields when it encounters the first one that isn't a singly-valued
string. This is useful because it enables range shard specs on
"someDim" to be created for clauses like "CLUSTERED BY someDim, __time".
Changes on the read path:
1) Add StorageAdapter#getSortOrder so query engines can tell how a
segment is sorted.
2) Update QueryableIndexStorageAdapter, IncrementalIndexStorageAdapter,
and VectorCursorGranularizer to throw errors when using granularities
on non-time-ordered segments.
3) Update ScanQueryEngine to throw an error when using the time-ordering
"order" parameter on non-time-ordered segments.
4) Update TimeBoundaryQueryRunnerFactory to perform a segment scan when
running on a non-time-ordered segment.
5) Add "sqlUseGranularity" context parameter that causes the SQL planner
to avoid using granularities other than ALL.
Other changes:
1) Rename DimensionsSpec "hasCustomDimensions" to "hasFixedDimensions"
and change the meaning subtly: it now returns true if the DimensionsSpec
represents an unchanging list of dimensions, or false if there is
some discovery happening. This is what call sites had expected anyway.
* Fixups from CI.
* Fixes.
* Fix missing arg.
* Additional changes.
* Fix logic.
* Fixes.
* Fix test.
* Adjust test.
* Remove throws.
* Fix styles.
* Fix javadocs.
* Cleanup.
* Smoother handling of null ordering.
* Fix tests.
* Missed a spot on the merge.
* Fixups.
* Avoid needless Filters.and.
* Add timeBoundaryInspector to test.
* Fix tests.
* Fix FrameStorageAdapterTest.
* Fix various tests.
* Use forceSegmentSortByTime instead of useExplicitSegmentSortOrder.
* Pom fix.
* Fix doc.
This PR generally improves the working of WriteOutBytes and WriteOutMedium. Some analysis of usage of TmpFileSegmentWriteOutMedium shows that they periodically get used for very small things. The overhead of creating a tmp file is actually very large. To improve the performance in these cases, this PR modifies TmpFileSegmentWriteOutMedium to return a heap-based WriteOutBytes that falls back to making a tmp file when it actually fills up.
---------
Co-authored-by: imply-cheddar <eric.tschetter@imply.io>
* Add type coercion and null check to left, right, repeat exprs.
These exprs shouldn't validate types; they should coerce types. Coercion
is typical behavior for functions because it enables schema evolution.
The functions are also modified to check isNumericNull on the right-hand
argument. This was missing previously, which would erroneously cause
nulls to be treated as zeroes.
* Fix tests.
The specific error on a truncated file can vary based on how the final
frame of the truncated file is written. This patch loosens the check so
it passes regardless of how the truncated file is written.
changes:
* Added `CursorBuildSpec` which captures all of the 'interesting' stuff that goes into producing a cursor as a replacement for the method arguments of `CursorFactory.canVectorize`, `CursorFactory.makeCursor`, and `CursorFactory.makeVectorCursor`
* added new interface `CursorHolder` and new interface `CursorHolderFactory` as a replacement for `CursorFactory`, with method `makeCursorHolder`, which takes a `CursorBuildSpec` as an argument and replaces `CursorFactory.canVectorize`, `CursorFactory.makeCursor`, and `CursorFactory.makeVectorCursor`
* `CursorFactory.makeCursors` previously returned a `Sequence<Cursor>` corresponding to the query granularity buckets, with a separate `Cursor` per bucket. `CursorHolder.asCursor` instead returns a single `Cursor` (equivalent to 'ALL' granularity), and a new `CursorGranularizer` has been added for query engines to iterate over the cursor and divide into granularity buckets. This makes the non-vectorized engine behave the same way as the vectorized query engine (with its `VectorCursorGranularizer`), and simplifies a lot of stuff that has to read segments particularly if it does not care about bucketing the results into granularities.
* Deprecated `CursorFactory`, `CursorFactory.canVectorize`, `CursorFactory.makeCursors`, and `CursorFactory.makeVectorCursor`
* updated all `StorageAdapter` implementations to implement `makeCursorHolder`, transitioned direct `CursorFactory` implementations to instead implement `CursorMakerFactory`. `StorageAdapter` being a `CursorMakerFactory` is intended to be a transitional thing, ideally will not be released in favor of moving `CursorMakerFactory` to be fetched directly from `Segment`, however this PR was already large enough so this will be done in a follow-up.
* updated all query engines to use `makeCursorHolder`, granularity based engines to use `CursorGranularizer`.
Two performance enhancements:
1) Direct merging of input frames to output channels, without any
temporary files, if all input frames fit in memory.
2) When doing multi-level merging (now called "external mode"),
improve parallelism by boosting up the number of mergers in the
penultimate level.
To support direct merging, FrameChannelMerger is enhanced such that the
output partition min/max values are used to filter input frames. This
is necessary because all direct mergers read all input frames, but only
rows corresponding to a single output partition.
Some general refactors across Druid.
Switch to DruidExceptions
Add javadocs
Fix a bug in IntArrayColumns
Add a class for LongArrayColumns
Remove wireTransferable since it would never be called
Refactor DictionaryWriter to return the index written as a return value from write.
Refactors the SemanticCreator annotation.
Moves the interface to the semantic package.
Create a SemanticUtils to hold logic for storing semantic maps.
Add FrameMaker interface.
This PR fixes query correctness issues for MSQ window functions when using more than 1 worker (that is, maxNumTasks > 2).
Currently, we were keeping the shuffle spec of the previous stage when we didn't have any partition columns for window stage. This PR changes it to override the shuffle spec of the previous stage to MixShuffleSpec (if we have a window function with empty over clause) so that the window stage gets a single partition to work on.
A test has been added for a query which returned incorrect results prior to this change when using more than 1 workers.
This patch introduces an optional cluster configuration, druid.indexing.formats.stringMultiValueHandlingMode, allowing operators to override the default mode SORTED_SET for string dimensions. The possible values for the config are SORTED_SET, SORTED_ARRAY, or ARRAY (SORTED_SET is the default). Case insensitive values are allowed.
While this cluster property allows users to manage the multi-value handling mode for string dimension types, it's recommended to migrate to using real array types instead of MVDs.
This fixes a long-standing issue where compaction will honor the configured cluster wide property instead of rewriting it as the default SORTED_ARRAY always, even if the data was originally ingested with ARRAY or SORTED_SET.
* MSQ worker: Support in-memory shuffles.
This patch is a follow-up to #16168, adding worker-side support for
in-memory shuffles. Changes include:
1) Worker-side code now respects the same context parameter "maxConcurrentStages"
that was added to the controller in #16168. The parameter remains undocumented
for now, to give us a chance to more fully develop and test this functionality.
1) WorkerImpl is broken up into WorkerImpl, RunWorkOrder, and RunWorkOrderListener
to improve readability.
2) WorkerImpl has a new StageOutputHolder + StageOutputReader concept, which
abstract over memory-based or file-based stage results.
3) RunWorkOrder is updated to create in-memory stage output channels when
instructed to.
4) ControllerResource is updated to add /doneReadingInput/, so the controller
can tell when workers that sort, but do not gather statistics, are done reading
their inputs.
5) WorkerMemoryParameters is updated to consider maxConcurrentStages.
Additionally, WorkerChatHandler is split into WorkerResource, so as to match
ControllerChatHandler and ControllerResource.
* Updates for static checks, test coverage.
* Fixes.
* Remove exception.
* Changes from review.
* Address static check.
* Changes from review.
* Improvements to docs and method names.
* Update comments, add test.
* Additional javadocs.
* Fix throws.
* Fix worker stopping in tests.
* Fix stuck test.
* Round-robin iterator for datasources to kill.
Currently there's a fairness problem in the KillUnusedSegments duty
where the duty consistently selects the same set of datasources as discovered
from the metadata store or dynamic config params. This is a problem especially
when there are multiple unused. In a medium to large cluster, while we can increase
the task slots to increase the likelihood of broader coverage. This patch adds a simple
round-robin iterator to select datasources and has the following properties:
1. Starts with an initial random cursor position in an ordered list of candidates.
2. Consecutive {@code next()} iterations from {@link #getIterator()} are guaranteed to be deterministic
unless the set of candidates change when {@link #updateCandidates(Set)} is called.
3. Guarantees that no duplicate candidates are returned in two consecutive {@code next()} iterations.
* Renames in RoundRobinIteratorTest.
* Address review comments.
1. Clarify javadocs on the ordered list. Also flesh out the details a bit more.
2. Rename the test hooks to make intent clearer and fix typo.
3. Add NotThreadSafe annotation.
4. Remove one potentially noisy log that's in the path of iteration.
* Add null check to input candidates.
* More commentary.
* Addres review feedback: downgrade some new info logs to debug; invert condition.
Remove redundant comments.
Remove rendundant variable tracking.
* CircularList adjustments.
* Updates to CircularList and cleanup RoundRobinInterator.
* One more case and add more tests.
* Make advanceCursor private for now.
* Review comments.
* Coerce COMPLEX to number in numeric aggregators.
PR #15371 eliminated ObjectColumnSelector's built-in implementations of
numeric methods, which had been marked deprecated.
However, some complex types, like SpectatorHistogram, can be successfully coerced
to number. The documentation for spectator histograms encourages taking advantage of
this by aggregating complex columns with doubleSum and longSum. Currently, this
doesn't work properly for IncrementalIndex, where the behavior relied on those
deprecated ObjectColumnSelector methods.
This patch fixes the behavior by making two changes:
1) SimpleXYZAggregatorFactory (XYZ = type; base class for simple numeric aggregators;
all of these extend NullableNumericAggregatorFactory) use getObject for STRING
and COMPLEX. Previously, getObject was only used for STRING.
2) NullableNumericAggregatorFactory (base class for simple numeric aggregators)
has a new protected method "useGetObject". This allows the base class to
correctly check for null (using getObject or isNull).
The patch also adds a test for SpectatorHistogram + doubleSum + IncrementalIndex.
* Fix tests.
* Remove the special ColumnValueSelector.
* Add test.
* HashJoinEngine: Check for interruptions while walking left cursor.
Previously, the engine only checked for interruptions between emitting
joined rows. In scenarios where large numbers of left rows are skipped
completely (such as a highly selective INNER JOIN) this led to the
join cursor being insufficiently responsive to cancellation.
* Coverage.
Changes the WindowFrame internals / representation a bit; introduces dedicated frametypes for rows and groups which corresponds to the implemented processing methods
* more aggressive cancellation of broker parallel merge, more chill blocking queue timeouts
* wire parallel merge into query cancellation system
* oops
* style
* adjust metrics initialization
* fix timeout, fix cleanup to not block
* javadocs to clarify why cancellation future and gizmo are split
* cancelled -> canceled, simplify QueuePusher since it always takes a ResultBatch, non-static terminal marker to make stuff stop complaining about types, specialize tryOffer to be tryOfferTerminal so it wont be misused, add comments to clarify reason for non-blocking offers that might fail
For aggregators like StringFirst/Last, whose intermediate type isn't the same as the final type, using them in GroupBy, TopN or Timeseries subqueries causes a fallback when maxSubqueryBytes is set. This is because we assume that the finalization is not known, due to which the row signature cannot determine whether to use the intermediate or the final type, and it puts it as null. This PR figures out the finalization from the query context and uses the intermediate or the final type appropriately.
changes:
* moves value column serializer initialization, call to `writeValue` method to `GlobalDictionaryEncodedFieldColumnWriter.writeTo` instead of during `GlobalDictionaryEncodedFieldColumnWriter.addValue`. This shift means these numeric value columns are now done in the per field section that happens after serializing the nested column raw data, so only a single compression buffer and temp file will be needed at a time instead of the total number of nested literal fields present in the column. This should be especially helpful for complicated nested structures with thousands of columns as even those 64k compression buffers can add up pretty quickly to a sizeable chunk of direct memory.
changes:
* removes `druid.indexer.task.batchProcessingMode` in favor of always using `CLOSED_SEGMENT_SINKS` which uses `BatchAppenderator`. This was intended to become the default for native batch, but that was missed so `CLOSED_SEGMENTS` was the default (using `AppenderatorImpl`), however MSQ has been exclusively using `BatchAppenderator` with no problems so it seems safe to just roll it out as the only option for batch ingestion everywhere.
* with `batchProcessingMode` gone, there is no use for `AppenderatorImpl` so it has been removed
* implify `Appenderator` construction since there are only separate stream and batch versions now
* simplify tests since `batchProcessingMode` is gone
This PR aims to check if the complex column being queried aligns with the supported types in the aggregator and aggregator factories, and throws a user-friendly error message if they don't.
* Throw exception if DISTINCT used with window functions aggregate call
* Improve error message when unsupported aggregations are used with window functions
changes:
* removed `Firehose` and `FirehoseFactory` and remaining implementations which were mostly no longer used after #16602
* Moved `IngestSegmentFirehose` which was still used internally by Hadoop ingestion to `DatasourceRecordReader.SegmentReader`
* Rename `SQLFirehoseFactoryDatabaseConnector` to `SQLInputSourceDatabaseConnector` and similar renames for sub-classes
* Moved anything remaining in a 'firehose' package somewhere else
* Clean up docs on firehose stuff