In a cluster with a large number of streaming tasks (~1000), SegmentAllocateActions
on the overlord can often take very long intervals of time to finish thus causing spikes
in the `task/action/run/time`. This may result in lag building up while a task waits for a
segment to get allocated.
The root causes are:
- large number of metadata calls made to the segments and pending segments tables
- `giant` lock held in `TaskLockbox.tryLock()` to acquire task locks and allocate segments
Since the contention typically arises when several tasks of the same datasource try
to allocate segments for the same interval/granularity, the allocation run times can be
improved by batching the requests together.
Changes
- Add flags
- `druid.indexer.tasklock.batchSegmentAllocation` (default `false`)
- `druid.indexer.tasklock.batchAllocationMaxWaitTime` (in millis) (default `1000`)
- Add methods `canPerformAsync` and `performAsync` to `TaskAction`
- Submit each allocate action to a `SegmentAllocationQueue`, and add to correct batch
- Process batch after `batchAllocationMaxWaitTime`
- Acquire `giant` lock just once per batch in `TaskLockbox`
- Reduce metadata calls by batching statements together and updating query filters
- Except for batching, retain the whole behaviour (order of steps, retries, etc.)
- Respond to leadership changes and fail items in queue when not leader
- Emit batch and request level metrics
* fixes BlockLayoutColumnarLongs close method to nullify internal buffer.
* fixes other BlockLayoutColumnar supplier close methods to nullify internal buffers.
* fix spotbugs
Main changes:
1) Convert SeekableStreamIndexTaskClient to an interface, move old code
to SeekableStreamIndexTaskClientSyncImpl, and add new implementation
SeekableStreamIndexTaskClientAsyncImpl that uses ServiceClient.
2) Add "chatAsync" parameter to seekable stream supervisors that causes
the supervisor to use an async task client.
3) In SeekableStreamSupervisor.discoverTasks, adjust logic to avoid making
blocking RPC calls in workerExec threads.
4) In SeekableStreamSupervisor generally, switch from Futures.successfulAsList
to FutureUtils.coalesce, so we can better capture the errors that occurred
with contacting individual tasks.
Other, related changes:
1) Add ServiceRetryPolicy.retryNotAvailable, which controls whether
ServiceClient retries unavailable services. Useful since we do not
want to retry calls unavailable tasks within the service client. (The
supervisor does its own higher-level retries.)
2) Add FutureUtils.transformAsync, a more lambda friendly version of
Futures.transform(f, AsyncFunction).
3) Add FutureUtils.coalesce. Similar to Futures.successfulAsList, but
returns Either instead of using null on error.
4) Add JacksonUtils.readValue overloads for JavaType and TypeReference.
Currently, a shared lock is acquired only when all other locks are also shared locks.
This commit updates the behaviour and acquires a shared lock only if all locks
of equal or higher priority are either shared locks or are already revoked.
The lock type of locks with lower priority does not matter as they can be revoked.
Eliminates two common sources of noise with Kafka supervisors that have
large numbers of tasks and partitions:
1) Log the report at DEBUG rather than INFO level at each run cycle.
It can get quite large, and can be retrieved via API when needed.
2) Use log4j2.xml to quiet down the org.apache.kafka.clients.consumer.internals
package. Avoids a log message per-partition per-minute as part of seeking
to the latest offset in the reporting thread. In the tasks, where this
sort of logging might be more useful, we have another log message with
the same information: "Seeking partition[%s] to[%s]".
* SeekableStreamSupervisor: Don't enqueue duplicate notices.
Similar goal to #12018, but more aggressive. Don't enqueue a notice at
all if it is equal to one currently in the queue.
* Adjustments from review.
* Update indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/NoticesQueueTest.java
Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
Changes:
- Add a metric for partition-wise kafka/kinesis lag for streaming ingestion.
- Emit lag metrics for streaming ingestion when supervisor is not suspended and state is in {RUNNING, IDLE, UNHEALTHY_TASKS, UNHEALTHY_SUPERVISOR}
- Document metrics
* Compaction: Fetch segments one at a time on main task; skip when possible.
Compact tasks include the ability to fetch existing segments and determine
reasonable defaults for granularitySpec, dimensionsSpec, and metricsSpec.
This is a useful feature that makes compact tasks work well even when the
user running the compaction does not have a clear idea of what they want
the compacted segments to be like.
However, this comes at a cost: it takes time, and disk space, to do all
of these fetches. This patch improves the situation in two ways:
1) When segments do need to be fetched, download them one at a time and
delete them when we're done. This still takes time, but minimizes the
required disk space.
2) Don't fetch segments on the main compact task when they aren't needed.
If the user provides a full granularitySpec, dimensionsSpec, and
metricsSpec, we can skip it.
* Adjustments.
* Changes from code review.
* Fix logic for determining rollup.
* Support for middle manager less druid, tasks launch as k8s jobs
* Fixing forking task runner test
* Test cleanup, dependency cleanup, intellij inspections cleanup
* Changes per PR review
Add configuration option to disable http/https proxy for the k8s client
Update the docs to provide more detail about sidecar support
* Removing un-needed log lines
* Small changes per PR review
* Upon task completion we callback to the overlord to update the status / locaiton, for slower k8s clusters, this reduces locking time significantly
* Merge conflict fix
* Fixing tests and docs
* update tiny-cluster.yaml
changed `enableTaskLevelLogPush` to `encapsulatedTask`
* Apply suggestions from code review
Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
* Minor changes per PR request
* Cleanup, adding test to AbstractTask
* Add comment in peon.sh
* Bumping code coverage
* More tests to make code coverage happy
* Doh a duplicate dependnecy
* Integration test setup is weird for k8s, will do this in a different PR
* Reverting back all integration test changes, will do in anotbher PR
* use StringUtils.base64 instead of Base64
* Jdk is nasty, if i compress in jdk 11 in jdk 17 the decompressed result is different
Co-authored-by: Rahul Gidwani <r_gidwani@apple.com>
Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
In clusters with a large number of segments, the duty `MarkAsUnusedOvershadowedSegments`
can take a long very long time to finish. This is because of the costly invocation of
`timeline.isOvershadowed` which is done for every used segment in every coordinator run.
Changes
- Use `DataSourceSnapshot.getOvershadowedSegments` to get all overshadowed segments
- Iterate over this set instead of all used segments to identify segments that can be marked as unused
- Mark segments as unused in the DB in batches rather than one at a time
- Refactor: Add class `SegmentTimeline` for ease of use and readability while using a
`VersionedIntervalTimeline` of segments.
* Remove basePersistDirectory from tuning configs.
Since the removal of CliRealtime, it serves no purpose, since it is
always overridden in production using withBasePersistDirectory given
some subdirectory of the task work directory.
Removing this from the tuning config has a benefit beyond removing
no-longer-needed logic: it also avoids the side effect of empty
"druid-realtime-persist" directories getting created in the systemwide
temp directory.
* Test adjustments to appropriately set basePersistDirectory.
* Remove unused import.
* Fix RATC constructor.
Overlord leader election can sometimes fail due to task lock re-acquisition issues.
This commit solves the issue by failing such tasks and clearing all their locks.
This commit fixes issues with delayed supervisor termination during certain transient states.
Tasks can be created during supervisor termination and left behind since the cleanup may
not consider these newly added tasks.
#12178 added a lock for the entire process of task creation to prevent such dangling tasks.
But it also introduced a deadlock scenario as follows:
- An invocation of `runInternal` is in progress.
- A `stop` request comes, acquires `stateChangeLock` and submit a `ShutdownNotice`
- `runInternal` keeps waiting to acquire the `stateChangeLock`
- `ShutdownNotice` remains stuck in the notice queue because `runInternal` is still running
- After some timeout, the supervisor goes through a forced termination
Fix:
* `SeekableStreamSupervisor.runInternal` - do not try to acquire lock if supervisor is already stopping
* `SupervisorStateManager.maybeSetState` - do not allow transitions from STOPPING state
* Fixing RACE in HTTP remote task Runner
* Changes in the interface
* Updating documentation
* Adding test cases to SwitchingTaskLogStreamer
* Adding more tests
* Fix serialization in TaskReportFileWriters.
For some reason, serializing a Map<String, TaskReport> would omit the
"type" field. Explicitly sending each value through the ObjectMapper
fixes this, because the type information does not get lost.
* Fixes for static analysis.
* Fix race in TaskQueue.notifyStatus.
It was possible for manageInternal to relaunch a task while it was
being cleaned up, due to a race that happens when notifyStatus is
called to clean up a successful task:
1) In a critical section, notifyStatus removes the task from "tasks".
2) Outside a critical section, notifyStatus calls taskRunner.shutdown
to let the task runner know it can clear out its data structures.
3) In a critical section, syncFromStorage adds the task back to "tasks",
because it is still present in metadata storage.
4) In a critical section, manageInternalCritical notices that the task
is in "tasks" and is not running in the taskRunner, so it launches
it again.
5) In a (different) critical section, notifyStatus updates the metadata
store to set the task status to SUCCESS.
6) The task continues running even though it should not be.
The possibility for this race was introduced in #12099, which shrunk
the critical section in notifyStatus. Prior to that patch, a single
critical section encompassed (1), (2), and (5), so the ordering above
was not possible.
This patch does the following:
1) Fixes the race by adding a recentlyCompletedTasks set that prevents
the main management loop from doing anything with tasks that are
currently being cleaned up.
2) Switches the order of the critical sections in notifyStatus, so
metadata store updates happen first. This is useful in case of
server failures: it ensures that if the Overlord fails in the midst
of notifyStatus, then completed-task statuses are still available in
ZK or on MMs for the next Overlord. (Those are cleaned up by
taskRunner.shutdown, which formerly ran first.) This isn't related
to the race described above, but is fixed opportunistically as part
of the same patch.
3) Changes the "tasks" list to a map. Many operations require retrieval
or removal of individual tasks; those are now O(1) instead of O(N)
in the number of running tasks.
4) Changes various log messages to use task ID instead of full task
payload, to make the logs more readable.
* Fix format string.
* Update comment.
Kinesis ingestion requires all shards to have at least 1 record at the required position in druid.
Even if this is satisified initially, resharding the stream can lead to empty intermediate shards. A significant delay in writing to newly created shards was also problematic.
Kinesis shard sequence numbers are big integers. Introduce two more custom sequence tokens UNREAD_TRIM_HORIZON and UNREAD_LATEST to indicate that a shard has not been read from and that it needs to be read from the start or the end respectively.
These values can be used to avoid the need to read at least one record to obtain a sequence number for ingesting a newly discovered shard.
If a record cannot be obtained immediately, use a marker to obtain the relevant shardIterator and use this shardIterator to obtain a valid sequence number. As long as a valid sequence number is not obtained, continue storing the token as the offset.
These tokens (UNREAD_TRIM_HORIZON and UNREAD_LATEST) are logically ordered to be earlier than any valid sequence number.
However, the ordering requires a few subtle changes to the existing mechanism for record sequence validation:
The sequence availability check ensures that the current offset is before the earliest available sequence in the shard. However, current token being an UNREAD token indicates that any sequence number in the shard is valid (despite the ordering)
Kinesis sequence numbers are inclusive i.e if current sequence == end sequence, there are more records left to read.
However, the equality check is exclusive when dealing with UNREAD tokens.
* Refactor Guice initialization
Builders for various module collections
Revise the extensions loader
Injector builders for server startup
Move Hadoop init to indexer
Clean up server node role filtering
Calcite test injector builder
* Revisions from review comments
* Build fixes
* Revisions from review comments