diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 0f99575b13e..6fb3201d98f 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1152,6 +1152,10 @@ There are additional configs for autoscaling (if it is enabled): |`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task failures before the supervisor is considered unhealthy.|3| |`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor exceptions should be stored and returned by the supervisor `/status` endpoint.|false| |`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception events that can be returned through the supervisor `/status` endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`| +|`druid.supervisor.idleConfig.enabled`|If `true`, supervisor can become idle if there is no data on input stream/topic for some time.|false| +|`druid.supervisor.idleConfig.inactiveAfterMillis`|Supervisor is marked as idle if all existing data has been read from input topic and no new data has been published for `inactiveAfterMillis` milliseconds.|`600_000`| + +The `druid.supervisor.idleConfig.*` specified in the runtime properties of the overlord defines the default behavior for the entire cluster. See [Idle Configuration in Kafka Supervisor IOConfig](../development/extensions-core/kafka-supervisor-reference.md#kafkasupervisorioconfig) to override it for an individual supervisor. #### Overlord Dynamic Configuration diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md index f1153c71120..6f94f90ffc7 100644 --- a/docs/development/extensions-core/kafka-supervisor-reference.md +++ b/docs/development/extensions-core/kafka-supervisor-reference.md @@ -87,7 +87,7 @@ This topic contains configuration reference information for the Apache Kafka sup | Property | Description | Required | | ------------- | ------------- | ------------- | | `enabled` | If `true`, Kafka supervisor will become idle if there is no data on input stream/topic for some time. | no (default == false) | -| `inactiveAfterMillis` | Supervisor is marked as idle if all existing data has been read from input topic and no new data has been published for `inactiveAfterMillis` milliseconds. | no (default == 600000) | +| `inactiveAfterMillis` | Supervisor is marked as idle if all existing data has been read from input topic and no new data has been published for `inactiveAfterMillis` milliseconds. | no (default == `600_000`) | The following example demonstrates supervisor spec with `lagBased` autoScaler and idle config enabled: ```json diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index b6a95769b8b..5a4a15590c7 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -326,6 +326,7 @@ public class KafkaSupervisorIOConfigTest { HashMap idleConfig = new HashMap<>(); idleConfig.put("enabled", true); + idleConfig.put("inactiveAfterMillis", 600000L); final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(); consumerProperties.put("bootstrap.servers", "localhost:8082"); @@ -354,6 +355,6 @@ public class KafkaSupervisorIOConfigTest Assert.assertNotNull(kafkaSupervisorIOConfig1.getIdleConfig()); Assert.assertTrue(kafkaSupervisorIOConfig1.getIdleConfig().isEnabled()); - Assert.assertEquals(600000L, kafkaSupervisorIOConfig1.getIdleConfig().getInactiveAfterMillis()); + Assert.assertEquals(Long.valueOf(600000), kafkaSupervisorIOConfig1.getIdleConfig().getInactiveAfterMillis()); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index bcb0c9ed1c9..f81b62bdce4 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2112,6 +2112,57 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, supervisor.getState()); } + @Test + public void testSupervisorIsIdleIfStreamInactiveWhenSuspended() throws Exception + { + Map config = ImmutableMap.of("idleConfig.enabled", "false", + "idleConfig.inactiveAfterMillis", "200"); + supervisorConfig = OBJECT_MAPPER.convertValue(config, SupervisorStateManagerConfig.class); + supervisor = getTestableSupervisorForIdleBehaviour( + 1, + 2, + true, + "PT10S", + null, + null, + false, + new IdleConfig(true, null) + ); + addSomeEvents(1); + + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + new SeekableStreamEndSequenceNumbers(topic, ImmutableMap.of(0, 2L, 1, 2L, 2, 2L)) + ) + ).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); + + replayAll(); + + supervisor.start(); + supervisor.updateCurrentAndLatestOffsets(); + supervisor.runInternal(); + verifyAll(); + + Thread.sleep(100); + supervisor.updateCurrentAndLatestOffsets(); + supervisor.runInternal(); + + Thread.sleep(100); + supervisor.updateCurrentAndLatestOffsets(); + supervisor.runInternal(); + + Thread.sleep(100); + supervisor.updateCurrentAndLatestOffsets(); + supervisor.runInternal(); + + Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState()); + } + @Test public void testSupervisorIsIdleIfStreamInactiveWhenNoActiveTasksAndFewPendingTasks() throws Exception { @@ -4314,7 +4365,7 @@ public class KafkaSupervisorTest extends EasyMockSupport new NoopServiceEmitter(), new DruidMonitorSchedulerConfig(), rowIngestionMetersFactory, - new SupervisorStateManagerConfig() + supervisorConfig ), rowIngestionMetersFactory ); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java index 38d92fb8805..c31b834bf13 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import org.apache.druid.data.input.InputFormat; import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig; import org.apache.druid.indexing.kinesis.KinesisRegion; +import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.joda.time.DateTime; @@ -91,7 +92,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig earlyMessageRejectionPeriod, autoScalerConfig, lateMessageRejectionStartDateTime, - null + new IdleConfig(null, null) ); this.endpoint = endpoint != null diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java index 6b017399ddd..40734815aa6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java @@ -25,10 +25,13 @@ import com.google.common.base.Preconditions; import javax.annotation.Nullable; +/** + * Defines if and when {@link SeekableStreamSupervisor} can become idle. + */ public class IdleConfig { private final boolean enabled; - private final long inactiveAfterMillis; + private final Long inactiveAfterMillis; @JsonCreator public IdleConfig( @@ -36,13 +39,12 @@ public class IdleConfig @Nullable @JsonProperty("inactiveAfterMillis") Long inactiveAfterMillis ) { + Preconditions.checkArgument( + inactiveAfterMillis == null || inactiveAfterMillis > 0, + "inactiveAfterMillis should be a postive number" + ); this.enabled = enabled != null && enabled; - this.inactiveAfterMillis = inactiveAfterMillis != null - ? inactiveAfterMillis - : 600_000L; - - Preconditions.checkArgument(this.inactiveAfterMillis > 0, - "inactiveAfterMillis should be a postive number"); + this.inactiveAfterMillis = inactiveAfterMillis; } @JsonProperty @@ -52,7 +54,7 @@ public class IdleConfig } @JsonProperty - public long getInactiveAfterMillis() + public Long getInactiveAfterMillis() { return this.inactiveAfterMillis; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 9c90cfe9834..8a79825a5c3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -746,6 +746,7 @@ public abstract class SeekableStreamSupervisor previousSequencesFromStream = new HashMap<>(); private long lastActiveTimeMillis; + private final IdleConfig idleConfig; public SeekableStreamSupervisor( final String supervisorId, @@ -804,6 +805,23 @@ public abstract class SeekableStreamSupervisor config = ImmutableMap.of( + "idleConfig.enabled", "true", + "idleConfig.inactiveAfterMillis", "60000" + ); + stateManagerConfig = mapper.convertValue(config, SupervisorStateManagerConfig.class); + + Assert.assertTrue(stateManagerConfig.isIdleConfigEnabled()); + Assert.assertEquals(60000, stateManagerConfig.getInactiveAfterMillis()); + } }