diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index a06ed7f8f3b..7b2cb35c8eb 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -127,6 +127,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`chatThreads`|Integer|The number of threads that will be used for communicating with indexing tasks.|no (default == min(10, taskCount * replicas))| |`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.|no (default == 8)| |`httpTimeout`|ISO8601 Period|How long to wait for a HTTP response from an indexing task.|no (default == PT10S)| +|`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)| #### IndexSpec diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 8009fc93294..0c1b58eb474 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -101,7 +101,6 @@ public class KafkaSupervisor implements Supervisor private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class); private static final Random RANDOM = new Random(); private static final long MAX_RUN_FREQUENCY_MILLIS = 1000; // prevent us from running too often in response to events - private static final int SHUTDOWN_TIMEOUT_MILLIS = 15000; private static final long NOT_SET = -1; // Internal data structures @@ -365,11 +364,12 @@ public class KafkaSupervisor implements Supervisor notices.add(new ShutdownNotice()); } - long endTime = System.currentTimeMillis() + SHUTDOWN_TIMEOUT_MILLIS; + long shutdownTimeoutMillis = tuningConfig.getShutdownTimeout().getMillis(); + long endTime = System.currentTimeMillis() + shutdownTimeoutMillis; while (!stopped) { long sleepTime = endTime - System.currentTimeMillis(); if (sleepTime <= 0) { - log.info("Timed out while waiting for shutdown"); + log.info("Timed out while waiting for shutdown (timeout [%,dms])", shutdownTimeoutMillis); stopped = true; break; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 06ace520e9f..d4e0f2055c1 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -73,6 +73,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec null, null, null, + null, null ); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 88feaf8b5a8..62d29e848b5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -33,6 +33,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig private final Integer chatThreads; private final Long chatRetries; private final Duration httpTimeout; + private final Duration shutdownTimeout; public KafkaSupervisorTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @@ -47,7 +48,8 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig @JsonProperty("workerThreads") Integer workerThreads, @JsonProperty("chatThreads") Integer chatThreads, @JsonProperty("chatRetries") Long chatRetries, - @JsonProperty("httpTimeout") Period httpTimeout + @JsonProperty("httpTimeout") Period httpTimeout, + @JsonProperty("shutdownTimeout") Period shutdownTimeout ) { super( @@ -66,6 +68,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig this.chatThreads = chatThreads; this.chatRetries = (chatRetries != null ? chatRetries : 8); this.httpTimeout = defaultDuration(httpTimeout, "PT10S"); + this.shutdownTimeout = defaultDuration(shutdownTimeout, "PT80S"); } @JsonProperty @@ -92,6 +95,12 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig return httpTimeout; } + @JsonProperty + public Duration getShutdownTimeout() + { + return shutdownTimeout; + } + @Override public String toString() { @@ -109,6 +118,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig ", chatThreads=" + chatThreads + ", chatRetries=" + chatRetries + ", httpTimeout=" + httpTimeout + + ", shutdownTimeout=" + shutdownTimeout + '}'; } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index ec0565ac701..a7819788722 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -112,6 +112,7 @@ public class KafkaSupervisorTest extends EasyMockSupport private static final int TEST_CHAT_THREADS = 3; private static final long TEST_CHAT_RETRIES = 9L; private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S"); + private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S"); private int numThreads; private TestingCluster zkServer; @@ -177,7 +178,8 @@ public class KafkaSupervisorTest extends EasyMockSupport numThreads, TEST_CHAT_THREADS, TEST_CHAT_RETRIES, - TEST_HTTP_TIMEOUT + TEST_HTTP_TIMEOUT, + TEST_SHUTDOWN_TIMEOUT ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java index 6b6ba4b9d80..bfe08ec9fc6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java @@ -70,6 +70,7 @@ public class KafkaSupervisorTuningConfigTest Assert.assertNull(config.getChatThreads()); Assert.assertEquals(8L, (long) config.getChatRetries()); Assert.assertEquals(Duration.standardSeconds(10), config.getHttpTimeout()); + Assert.assertEquals(Duration.standardSeconds(80), config.getShutdownTimeout()); } @Test @@ -88,7 +89,8 @@ public class KafkaSupervisorTuningConfigTest + " \"workerThreads\": 12,\n" + " \"chatThreads\": 13,\n" + " \"chatRetries\": 14,\n" - + " \"httpTimeout\": \"PT15S\"\n" + + " \"httpTimeout\": \"PT15S\",\n" + + " \"shutdownTimeout\": \"PT95S\"\n" + "}"; KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue( @@ -113,5 +115,6 @@ public class KafkaSupervisorTuningConfigTest Assert.assertEquals(13, (int) config.getChatThreads()); Assert.assertEquals(14L, (long) config.getChatRetries()); Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout()); + Assert.assertEquals(Duration.standardSeconds(95), config.getShutdownTimeout()); } }