* Support Framing for Window Aggregations
This adds support for framing over ROWS
for window aggregations.
Still not implemented as yet:
1. RANGE frames
2. Multiple different frames in the same query
3. Frames on last/first functions
This commit adds a new class `InputStats` to track the total bytes processed by a task.
The field `processedBytes` is published in task reports along with other row stats.
Major changes:
- Add class `InputStats` to track processed bytes
- Add method `InputSourceReader.read(InputStats)` to read input rows while counting bytes.
> Since we need to count the bytes, we could not just have a wrapper around `InputSourceReader` or `InputEntityReader` (the way `CountableInputSourceReader` does) because the `InputSourceReader` only deals with `InputRow`s and the byte information is already lost.
- Classic batch: Use the new `InputSourceReader.read(inputStats)` in `AbstractBatchIndexTask`
- Streaming: Increment `processedBytes` in `StreamChunkParser`. This does not use the new `InputSourceReader.read(inputStats)` method.
- Extend `InputStats` with `RowIngestionMeters` so that bytes can be exposed in task reports
Other changes:
- Update tests to verify the value of `processedBytes`
- Rename `MutableRowIngestionMeters` to `SimpleRowIngestionMeters` and remove duplicate class
- Replace `CacheTestSegmentCacheManager` with `NoopSegmentCacheManager`
- Refactor `KafkaIndexTaskTest` and `KinesisIndexTaskTest`
Refactor DataSource to have a getAnalysis method()
This removes various parts of the code where while loops and instanceof
checks were being used to walk through the structure of DataSource objects
in order to build a DataSourceAnalysis. Instead we just ask the DataSource
for its analysis and allow the stack to rebuild whatever structure existed.
* Zero-copy local deep storage.
This is useful for local deep storage, since it reduces disk usage and
makes Historicals able to load segments instantaneously.
Two changes:
1) Introduce "druid.storage.zip" parameter for local storage, which defaults
to false. This changes default behavior from writing an index.zip to writing
a regular directory. This is safe to do even during a rolling update, because
the older code actually already handled unzipped directories being present
on local deep storage.
2) In LocalDataSegmentPuller and LocalDataSegmentPusher, use hard links
instead of copies when possible. (Generally this is possible when the
source and destination directory are on the same filesystem.)
Changes:
- Limit max batch size in `SegmentAllocationQueue` to 500
- Rename `batchAllocationMaxWaitTime` to `batchAllocationWaitTime` since the actual
wait time may exceed this configured value.
- Replace usage of `SegmentInsertAction` in `TaskToolbox` with `SegmentTransactionalInsertAction`
In a cluster with a large number of streaming tasks (~1000), SegmentAllocateActions
on the overlord can often take very long intervals of time to finish thus causing spikes
in the `task/action/run/time`. This may result in lag building up while a task waits for a
segment to get allocated.
The root causes are:
- large number of metadata calls made to the segments and pending segments tables
- `giant` lock held in `TaskLockbox.tryLock()` to acquire task locks and allocate segments
Since the contention typically arises when several tasks of the same datasource try
to allocate segments for the same interval/granularity, the allocation run times can be
improved by batching the requests together.
Changes
- Add flags
- `druid.indexer.tasklock.batchSegmentAllocation` (default `false`)
- `druid.indexer.tasklock.batchAllocationMaxWaitTime` (in millis) (default `1000`)
- Add methods `canPerformAsync` and `performAsync` to `TaskAction`
- Submit each allocate action to a `SegmentAllocationQueue`, and add to correct batch
- Process batch after `batchAllocationMaxWaitTime`
- Acquire `giant` lock just once per batch in `TaskLockbox`
- Reduce metadata calls by batching statements together and updating query filters
- Except for batching, retain the whole behaviour (order of steps, retries, etc.)
- Respond to leadership changes and fail items in queue when not leader
- Emit batch and request level metrics
* fixes BlockLayoutColumnarLongs close method to nullify internal buffer.
* fixes other BlockLayoutColumnar supplier close methods to nullify internal buffers.
* fix spotbugs
Main changes:
1) Convert SeekableStreamIndexTaskClient to an interface, move old code
to SeekableStreamIndexTaskClientSyncImpl, and add new implementation
SeekableStreamIndexTaskClientAsyncImpl that uses ServiceClient.
2) Add "chatAsync" parameter to seekable stream supervisors that causes
the supervisor to use an async task client.
3) In SeekableStreamSupervisor.discoverTasks, adjust logic to avoid making
blocking RPC calls in workerExec threads.
4) In SeekableStreamSupervisor generally, switch from Futures.successfulAsList
to FutureUtils.coalesce, so we can better capture the errors that occurred
with contacting individual tasks.
Other, related changes:
1) Add ServiceRetryPolicy.retryNotAvailable, which controls whether
ServiceClient retries unavailable services. Useful since we do not
want to retry calls unavailable tasks within the service client. (The
supervisor does its own higher-level retries.)
2) Add FutureUtils.transformAsync, a more lambda friendly version of
Futures.transform(f, AsyncFunction).
3) Add FutureUtils.coalesce. Similar to Futures.successfulAsList, but
returns Either instead of using null on error.
4) Add JacksonUtils.readValue overloads for JavaType and TypeReference.
Currently, a shared lock is acquired only when all other locks are also shared locks.
This commit updates the behaviour and acquires a shared lock only if all locks
of equal or higher priority are either shared locks or are already revoked.
The lock type of locks with lower priority does not matter as they can be revoked.
Eliminates two common sources of noise with Kafka supervisors that have
large numbers of tasks and partitions:
1) Log the report at DEBUG rather than INFO level at each run cycle.
It can get quite large, and can be retrieved via API when needed.
2) Use log4j2.xml to quiet down the org.apache.kafka.clients.consumer.internals
package. Avoids a log message per-partition per-minute as part of seeking
to the latest offset in the reporting thread. In the tasks, where this
sort of logging might be more useful, we have another log message with
the same information: "Seeking partition[%s] to[%s]".
* SeekableStreamSupervisor: Don't enqueue duplicate notices.
Similar goal to #12018, but more aggressive. Don't enqueue a notice at
all if it is equal to one currently in the queue.
* Adjustments from review.
* Update indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/NoticesQueueTest.java
Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
Changes:
- Add a metric for partition-wise kafka/kinesis lag for streaming ingestion.
- Emit lag metrics for streaming ingestion when supervisor is not suspended and state is in {RUNNING, IDLE, UNHEALTHY_TASKS, UNHEALTHY_SUPERVISOR}
- Document metrics
* Compaction: Fetch segments one at a time on main task; skip when possible.
Compact tasks include the ability to fetch existing segments and determine
reasonable defaults for granularitySpec, dimensionsSpec, and metricsSpec.
This is a useful feature that makes compact tasks work well even when the
user running the compaction does not have a clear idea of what they want
the compacted segments to be like.
However, this comes at a cost: it takes time, and disk space, to do all
of these fetches. This patch improves the situation in two ways:
1) When segments do need to be fetched, download them one at a time and
delete them when we're done. This still takes time, but minimizes the
required disk space.
2) Don't fetch segments on the main compact task when they aren't needed.
If the user provides a full granularitySpec, dimensionsSpec, and
metricsSpec, we can skip it.
* Adjustments.
* Changes from code review.
* Fix logic for determining rollup.
* Support for middle manager less druid, tasks launch as k8s jobs
* Fixing forking task runner test
* Test cleanup, dependency cleanup, intellij inspections cleanup
* Changes per PR review
Add configuration option to disable http/https proxy for the k8s client
Update the docs to provide more detail about sidecar support
* Removing un-needed log lines
* Small changes per PR review
* Upon task completion we callback to the overlord to update the status / locaiton, for slower k8s clusters, this reduces locking time significantly
* Merge conflict fix
* Fixing tests and docs
* update tiny-cluster.yaml
changed `enableTaskLevelLogPush` to `encapsulatedTask`
* Apply suggestions from code review
Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
* Minor changes per PR request
* Cleanup, adding test to AbstractTask
* Add comment in peon.sh
* Bumping code coverage
* More tests to make code coverage happy
* Doh a duplicate dependnecy
* Integration test setup is weird for k8s, will do this in a different PR
* Reverting back all integration test changes, will do in anotbher PR
* use StringUtils.base64 instead of Base64
* Jdk is nasty, if i compress in jdk 11 in jdk 17 the decompressed result is different
Co-authored-by: Rahul Gidwani <r_gidwani@apple.com>
Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
In clusters with a large number of segments, the duty `MarkAsUnusedOvershadowedSegments`
can take a long very long time to finish. This is because of the costly invocation of
`timeline.isOvershadowed` which is done for every used segment in every coordinator run.
Changes
- Use `DataSourceSnapshot.getOvershadowedSegments` to get all overshadowed segments
- Iterate over this set instead of all used segments to identify segments that can be marked as unused
- Mark segments as unused in the DB in batches rather than one at a time
- Refactor: Add class `SegmentTimeline` for ease of use and readability while using a
`VersionedIntervalTimeline` of segments.
* Remove basePersistDirectory from tuning configs.
Since the removal of CliRealtime, it serves no purpose, since it is
always overridden in production using withBasePersistDirectory given
some subdirectory of the task work directory.
Removing this from the tuning config has a benefit beyond removing
no-longer-needed logic: it also avoids the side effect of empty
"druid-realtime-persist" directories getting created in the systemwide
temp directory.
* Test adjustments to appropriately set basePersistDirectory.
* Remove unused import.
* Fix RATC constructor.
Overlord leader election can sometimes fail due to task lock re-acquisition issues.
This commit solves the issue by failing such tasks and clearing all their locks.
This commit fixes issues with delayed supervisor termination during certain transient states.
Tasks can be created during supervisor termination and left behind since the cleanup may
not consider these newly added tasks.
#12178 added a lock for the entire process of task creation to prevent such dangling tasks.
But it also introduced a deadlock scenario as follows:
- An invocation of `runInternal` is in progress.
- A `stop` request comes, acquires `stateChangeLock` and submit a `ShutdownNotice`
- `runInternal` keeps waiting to acquire the `stateChangeLock`
- `ShutdownNotice` remains stuck in the notice queue because `runInternal` is still running
- After some timeout, the supervisor goes through a forced termination
Fix:
* `SeekableStreamSupervisor.runInternal` - do not try to acquire lock if supervisor is already stopping
* `SupervisorStateManager.maybeSetState` - do not allow transitions from STOPPING state
* Fixing RACE in HTTP remote task Runner
* Changes in the interface
* Updating documentation
* Adding test cases to SwitchingTaskLogStreamer
* Adding more tests
* Fix serialization in TaskReportFileWriters.
For some reason, serializing a Map<String, TaskReport> would omit the
"type" field. Explicitly sending each value through the ObjectMapper
fixes this, because the type information does not get lost.
* Fixes for static analysis.