From c4b513e5990bd8e112a499be7a48ffddd74244ba Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Thu, 24 Oct 2024 12:07:18 -0700
Subject: [PATCH] SeekableStreamSupervisor: Don't await task futures in workerExec. (#17403)

Following #17394, workerExec can deadlock with itself: it blocks awaiting
task futures while also serving as the connectExec for the task client, so
the futures it waits on can only complete on threads it has already tied
up. The fix is to never await task futures on workerExec.

There are two specific changes: in "verifyAndMergeCheckpoints" and
"checkpointTaskGroup", the two "coalesceAndAwait" calls that formerly
blocked inside workerExec are replaced with Futures.transform, using a
callback that runs on workerExec once the futures resolve.

Because this removes a source of blocking, it may also improve supervisor
responsiveness at high task counts, but that is incidental. The primary
goal is to fix the bug introduced by #17394.
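To illustrate the hazard, here is a minimal, self-contained Java sketch
(hypothetical names, not the supervisor code itself) of why blocking inside
a pool on futures that need that same pool deadlocks, and why chaining with
Futures.transform does not. It assumes only Guava; a single-thread pool
stands in for workerExec, and while the real pool has more threads, the
same starvation occurs once every thread is parked on get():

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import java.util.concurrent.Executors;

    class WorkerExecSketch
    {
      public static void main(String[] args) throws Exception
      {
        final ListeningExecutorService workerExec =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

        // Deadlocks: the outer job holds the only worker thread while it
        // awaits an inner future that can only run on that same thread.
        //   workerExec.submit(() -> workerExec.submit(() -> "checkpoints").get()).get();

        // Safe: chain a continuation instead of blocking. The callback is
        // scheduled on workerExec only after the input future completes,
        // so no worker thread ever parks waiting for another.
        final ListenableFuture<String> checkpoints = workerExec.submit(() -> "checkpoints");
        final ListenableFuture<Void> verified = Futures.transform(
            checkpoints,
            result -> {
              // the verify-and-merge work would happen here
              return null;
            },
            workerExec
        );
        verified.get(); // fine here: the main thread is not a workerExec thread
        workerExec.shutdown();
      }
    }

The same constraint applies to the task client's connectExec callbacks:
workerExec may run continuations, but nothing submitted to workerExec may
block waiting for them.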
---
 .../supervisor/SeekableStreamSupervisor.java  | 161 ++++++++++--------
 1 file changed, 89 insertions(+), 72 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 4cbf0ccfa68..8b4845b08d0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
@@ -852,7 +853,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetT
     final List<ListenableFuture<Void>> futures = new ArrayList<>();
     for (TaskGroup taskGroup : taskGroupsToVerify) {
       //noinspection unchecked
-      futures.add((ListenableFuture<Void>) workerExec.submit(() -> verifyAndMergeCheckpoints(taskGroup)));
+      futures.add((ListenableFuture<Void>) verifyAndMergeCheckpoints(taskGroup));
     }
 
     try {
@@ -2327,16 +2328,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetT
-  private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
+  private ListenableFuture<Void> verifyAndMergeCheckpoints(final TaskGroup taskGroup)
   {
-    final int groupId = taskGroup.groupId;
-    final List<Pair<String, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>> taskSequences = new ArrayList<>();
     final List<ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>> futures = new ArrayList<>();
     final List<String> taskIds = new ArrayList<>();
@@ -2350,30 +2346,48 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetT
       }
     }
 
-    try {
-      final List<Either<Throwable, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>> futuresResult =
-          coalesceAndAwait(futures);
+    return Futures.transform(
+        FutureUtils.coalesce(futures),
+        futuresResult -> {
+          verifyAndMergeCheckpoints(taskGroup, taskIds, futuresResult);
+          return null;
+        },
+        workerExec
+    );
+  }
 
-      for (int i = 0; i < futuresResult.size(); i++) {
-        final Either<Throwable, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> futureResult =
-            futuresResult.get(i);
-        final String taskId = taskIds.get(i);
-        if (futureResult.isError()) {
-          final Throwable e = new RuntimeException(futureResult.error());
-          stateManager.recordThrowableEvent(e);
-          log.error(e, "Problem while getting checkpoints for task [%s], killing the task", taskId);
-          killTask(taskId, "Exception[%s] while getting checkpoints", e.getClass());
-          taskGroup.tasks.remove(taskId);
-        } else if (futureResult.valueOrThrow().isEmpty()) {
-          log.warn("Ignoring task [%s], as probably it is not started running yet", taskId);
-        } else {
-          taskSequences.add(new Pair<>(taskId, futureResult.valueOrThrow()));
-        }
+  /**
+   * This method does two things in {@link #workerExec}:
+   * 1. Makes sure the checkpoint information in the taskGroup is consistent with that of the tasks; if not, kills the
+   * inconsistent tasks.
+   * 2. Truncates the checkpoints in the taskGroup whose corresponding segments have already been published, so that
+   * any newly created tasks for the taskGroup start indexing from after the latest published sequences.
+   */
+  private void verifyAndMergeCheckpoints(
+      final TaskGroup taskGroup,
+      final List<String> taskIds,
+      final List<Either<Throwable, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>> checkpointResults
+  )
+  {
+    final int groupId = taskGroup.groupId;
+    final List<Pair<String, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>> taskSequences = new ArrayList<>();
+
+    for (int i = 0; i < checkpointResults.size(); i++) {
+      final Either<Throwable, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> checkpointResult =
+          checkpointResults.get(i);
+      final String taskId = taskIds.get(i);
+      if (checkpointResult.isError()) {
+        final Throwable e = new RuntimeException(checkpointResult.error());
+        stateManager.recordThrowableEvent(e);
+        log.error(e, "Problem while getting checkpoints for task [%s], killing the task", taskId);
+        killTask(taskId, "Exception[%s] while getting checkpoints", e.getClass());
+        taskGroup.tasks.remove(taskId);
+      } else if (checkpointResult.valueOrThrow().isEmpty()) {
+        log.warn("Ignoring task [%s], as probably it is not started running yet", taskId);
+      } else {
+        taskSequences.add(new Pair<>(taskId, checkpointResult.valueOrThrow()));
       }
     }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
 
     final DataSourceMetadata rawDataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(
         dataSource);
@@ -3361,13 +3375,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetT
-        new Function<List<Map<PartitionIdType, SequenceOffsetType>>, Map<PartitionIdType, SequenceOffsetType>>()
+        new AsyncFunction<List<Map<PartitionIdType, SequenceOffsetType>>, Map<PartitionIdType, SequenceOffsetType>>()
         {
-          @Nullable
           @Override
-          public Map<PartitionIdType, SequenceOffsetType> apply(List<Map<PartitionIdType, SequenceOffsetType>> input)
+          public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> apply(List<Map<PartitionIdType, SequenceOffsetType>> input)
           {
             // 3) Build a map of the highest sequence read by any task in the group for each partition
             final Map<PartitionIdType, SequenceOffsetType> endOffsets = new HashMap<>();
@@ -3408,50 +3421,54 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetT
-              try {
-                final List<Either<Throwable, Boolean>> results = coalesceAndAwait(setEndOffsetFutures);
-                for (int i = 0; i < results.size(); i++) {
-                  if (results.get(i).isValue() && Boolean.valueOf(true).equals(results.get(i).valueOrThrow())) {
-                    log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i));
-                  } else {
-                    String taskId = setEndOffsetTaskIds.get(i);
-                    killTask(taskId, "Failed to set end offsets, killing task");
-                    taskGroup.tasks.remove(taskId);
-                  }
-                }
-              }
-              catch (Exception e) {
-                log.error("An exception occurred while setting end offsets: [%s]", e.getMessage());
-                throw new RuntimeException(e);
-              }
 
-              if (taskGroup.tasks.isEmpty()) {
-                log.info("All tasks in taskGroup[%d] have failed, tasks will be re-created", taskGroup.groupId);
-                return null;
+            log.info(
+                "Setting endOffsets for tasks in taskGroup[%d] to [%s]",
+                taskGroup.groupId, endOffsets
+            );
+            for (final String taskId : setEndOffsetTaskIds) {
+              setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
             }
 
-              return endOffsets;
+            return Futures.transform(
+                FutureUtils.coalesce(setEndOffsetFutures),
+                results -> {
+                  try {
+                    for (int i = 0; i < results.size(); i++) {
+                      if (results.get(i).isValue() && Boolean.valueOf(true).equals(results.get(i).valueOrThrow())) {
+                        log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i));
+                      } else {
+                        String taskId = setEndOffsetTaskIds.get(i);
+                        killTask(taskId, "Failed to set end offsets, killing task");
+                        taskGroup.tasks.remove(taskId);
+                      }
+                    }
+                  }
+                  catch (Exception e) {
+                    log.error("An exception occurred while setting end offsets: [%s]", e.getMessage());
+                    throw new RuntimeException(e);
+                  }
+
+                  if (taskGroup.tasks.isEmpty()) {
+                    log.info("All tasks in taskGroup[%d] have failed, tasks will be re-created", taskGroup.groupId);
+                    return null;
+                  }
+
+                  return endOffsets;
+                },
+                workerExec
+            );
           }
         },
         workerExec