diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClient.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClient.java index 803a6bea07f..30f1c89a910 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClient.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClient.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.indexing.common.IndexTaskClient; import org.apache.druid.indexing.common.TaskInfoProvider; +import org.apache.druid.indexing.kafka.KafkaIndexTask.Status; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; @@ -115,31 +116,48 @@ public class KafkaIndexTaskClient extends IndexTaskClient true ); - if (response.getStatus().equals(HttpResponseStatus.OK)) { + final HttpResponseStatus responseStatus = response.getStatus(); + final String responseContent = response.getContent(); + + if (responseStatus.equals(HttpResponseStatus.OK)) { log.info("Task [%s] paused successfully", id); - return deserialize(response.getContent(), new TypeReference>() + return deserialize(responseContent, new TypeReference>() { }); - } + } else if (responseStatus.equals(HttpResponseStatus.ACCEPTED)) { + // The task received the pause request, but its status hasn't been changed yet. + while (true) { + final Status status = getStatus(id); + if (status == KafkaIndexTask.Status.PAUSED) { + return getCurrentOffsets(id, true); + } - while (true) { - if (getStatus(id) == KafkaIndexTask.Status.PAUSED) { - return getCurrentOffsets(id, true); - } - - final Duration delay = newRetryPolicy().getAndIncrementRetryDelay(); - if (delay == null) { - log.error("Task [%s] failed to pause, aborting", id); - throw new ISE("Task [%s] failed to pause, aborting", id); - } else { - final long sleepTime = delay.getMillis(); - log.info( - "Still waiting for task [%s] to pause; will try again in [%s]", - id, - new Duration(sleepTime).toString() - ); - Thread.sleep(sleepTime); + final Duration delay = newRetryPolicy().getAndIncrementRetryDelay(); + if (delay == null) { + throw new ISE( + "Task [%s] failed to change its status from [%s] to [%s], aborting", + id, + status, + Status.PAUSED + ); + } else { + final long sleepTime = delay.getMillis(); + log.info( + "Still waiting for task [%s] to change its status to [%s]; will try again in [%s]", + id, + Status.PAUSED, + new Duration(sleepTime).toString() + ); + Thread.sleep(sleepTime); + } } + } else { + throw new ISE( + "Pause request for task [%s] failed with response [%s] : [%s]", + id, + responseStatus, + responseContent + ); } } catch (NoTaskLocationException e) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 6992b8dcb28..b0c4ba56ec5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1628,13 +1628,39 @@ public class KafkaSupervisor implements Supervisor // 3) Build a map of the highest offset read by any task in the group for each partition final Map endOffsets = new HashMap<>(); for (int i = 0; i < input.size(); i++) { - Map result = input.get(i); + final Map result = input.get(i); + final String taskId = pauseTaskIds.get(i); - if (result == null || result.isEmpty()) { // kill tasks that didn't return a value - String taskId = pauseTaskIds.get(i); - killTask(taskId, "Task [%s] failed to respond to [pause] in a timely manner, killing task", taskId); + if (result == null) { + // Get the exception + final Throwable pauseException; + try { + // The below get should throw ExecutionException since result is null. + final Map pauseResult = pauseFutures.get(i).get(); + throw new ISE( + "WTH? The pause request for task [%s] is supposed to fail, but returned [%s]", + taskId, + pauseResult + ); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + catch (ExecutionException e) { + pauseException = e.getCause() == null ? e : e.getCause(); + } + + killTask( + taskId, + "An exception occured while waiting for task [%s] to pause: [%s]", + taskId, + pauseException + ); taskGroup.tasks.remove(taskId); + } else if (result.isEmpty()) { + killTask(taskId, "Task [%s] returned empty offsets after pause", taskId); + taskGroup.tasks.remove(taskId); } else { // otherwise build a map of the highest offsets seen for (Entry offset : result.entrySet()) { if (!endOffsets.containsKey(offset.getKey()) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java index 19872b7de18..5f0d915e5c8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java @@ -457,9 +457,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport Capture captured = Capture.newInstance(); Capture captured2 = Capture.newInstance(); Capture captured3 = Capture.newInstance(); + // one time in IndexTaskClient.submitRequest() and another in KafkaIndexTaskClient.pause() expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED).times(2) - .andReturn(HttpResponseStatus.OK).times(2); - expect(responseHolder.getContent()).andReturn("\"PAUSED\"") + .andReturn(HttpResponseStatus.OK).anyTimes(); + expect(responseHolder.getContent()).andReturn("\"PAUSED\"").times(2) .andReturn("{\"0\":1, \"1\":10}").anyTimes(); expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn( Futures.immediateFuture(responseHolder) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 9b4a5a6cf4a..f08212aa309 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -1537,8 +1537,9 @@ public class KafkaSupervisorTest extends EasyMockSupport .andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2); taskQueue.shutdown( EasyMock.contains("sequenceName-0"), - EasyMock.eq("Task [%s] failed to respond to [pause] in a timely manner, killing task"), - EasyMock.contains("sequenceName-0") + EasyMock.eq("An exception occured while waiting for task [%s] to pause: [%s]"), + EasyMock.contains("sequenceName-0"), + EasyMock.anyString() ); expectLastCall().times(2); expect(taskQueue.add(capture(captured))).andReturn(true).times(2);