Adds cluster level idleConfig setting for supervisor (#13311)

* adds cluster level idleConfig

* updates docs

* refactoring

* spelling nit

* nit

* nit

* refactoring
This commit is contained in:
Tejaswini Bandlamudi 2022-11-08 14:54:14 +05:30 committed by GitHub
parent 9a684af3c9
commit 594545da55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 132 additions and 16 deletions

View File

@ -1152,6 +1152,10 @@ There are additional configs for autoscaling (if it is enabled):
|`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task failures before the supervisor is considered unhealthy.|3|
|`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor exceptions should be stored and returned by the supervisor `/status` endpoint.|false|
|`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception events that can be returned through the supervisor `/status` endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
|`druid.supervisor.idleConfig.enabled`|If `true`, the supervisor can become idle if there is no data on the input stream/topic for some time.|false|
|`druid.supervisor.idleConfig.inactiveAfterMillis`|The supervisor is marked as idle if all existing data has been read from the input topic and no new data has been published for `inactiveAfterMillis` milliseconds.|`600_000`|
The `druid.supervisor.idleConfig.*` properties specified in the Overlord runtime properties define the default behavior for the entire cluster. See [Idle Configuration in Kafka Supervisor IOConfig](../development/extensions-core/kafka-supervisor-reference.md#kafkasupervisorioconfig) to override them for an individual supervisor.
#### Overlord Dynamic Configuration

View File

@ -87,7 +87,7 @@ This topic contains configuration reference information for the Apache Kafka sup
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `enabled` | If `true`, Kafka supervisor will become idle if there is no data on input stream/topic for some time. | no (default == false) |
| `inactiveAfterMillis` | Supervisor is marked as idle if all existing data has been read from input topic and no new data has been published for `inactiveAfterMillis` milliseconds. | no (default == 600000) |
| `inactiveAfterMillis` | Supervisor is marked as idle if all existing data has been read from input topic and no new data has been published for `inactiveAfterMillis` milliseconds. | no (default == `600_000`) |
The following example demonstrates supervisor spec with `lagBased` autoScaler and idle config enabled:
```json

View File

@ -326,6 +326,7 @@ public class KafkaSupervisorIOConfigTest
{
HashMap<String, Object> idleConfig = new HashMap<>();
idleConfig.put("enabled", true);
idleConfig.put("inactiveAfterMillis", 600000L);
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
consumerProperties.put("bootstrap.servers", "localhost:8082");
@ -354,6 +355,6 @@ public class KafkaSupervisorIOConfigTest
Assert.assertNotNull(kafkaSupervisorIOConfig1.getIdleConfig());
Assert.assertTrue(kafkaSupervisorIOConfig1.getIdleConfig().isEnabled());
Assert.assertEquals(600000L, kafkaSupervisorIOConfig1.getIdleConfig().getInactiveAfterMillis());
Assert.assertEquals(Long.valueOf(600000), kafkaSupervisorIOConfig1.getIdleConfig().getInactiveAfterMillis());
}
}

View File

@ -2112,6 +2112,57 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, supervisor.getState());
}
/**
 * Cluster-level config: idling disabled, {@code inactiveAfterMillis} of 200.
 * Spec-level config: {@code IdleConfig} with idling enabled and no {@code inactiveAfterMillis}.
 * Expects the supervisor state to be {@code IDLE} after the initial run plus three further run
 * cycles spaced 100 ms apart.
 *
 * NOTE(review): the method name says "WhenSuspended" but the assertion expects IDLE; the meaning of
 * the positional flags handed to the helper is not visible from here -- confirm the name is intended.
 */
@Test
public void testSupervisorIsIdleIfStreamInactiveWhenSuspended() throws Exception
{
  final Map<String, String> clusterIdleProperties = ImmutableMap.of(
      "idleConfig.enabled", "false",
      "idleConfig.inactiveAfterMillis", "200"
  );
  supervisorConfig = OBJECT_MAPPER.convertValue(clusterIdleProperties, SupervisorStateManagerConfig.class);

  final IdleConfig specIdleConfig = new IdleConfig(true, null);
  supervisor = getTestableSupervisorForIdleBehaviour(1, 2, true, "PT10S", null, null, false, specIdleConfig);

  addSomeEvents(1);

  // Record mock expectations (same order as before the restyle).
  EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
  EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
  EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
  taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));

  final KafkaDataSourceMetadata committedMetadata = new KafkaDataSourceMetadata(
      new SeekableStreamEndSequenceNumbers<Integer, Long>(topic, ImmutableMap.of(0, 2L, 1, 2L, 2, 2L))
  );
  EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
          .andReturn(committedMetadata)
          .anyTimes();
  EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

  replayAll();

  // Initial cycle; mocks are verified right after it, before any waiting happens.
  supervisor.start();
  supervisor.updateCurrentAndLatestOffsets();
  supervisor.runInternal();
  verifyAll();

  // Three more cycles, 100 ms apart, with no further events added to the topic.
  for (int cycle = 0; cycle < 3; cycle++) {
    Thread.sleep(100);
    supervisor.updateCurrentAndLatestOffsets();
    supervisor.runInternal();
  }

  Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState());
}
@Test
public void testSupervisorIsIdleIfStreamInactiveWhenNoActiveTasksAndFewPendingTasks() throws Exception
{
@ -4314,7 +4365,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
new NoopServiceEmitter(),
new DruidMonitorSchedulerConfig(),
rowIngestionMetersFactory,
new SupervisorStateManagerConfig()
supervisorConfig
),
rowIngestionMetersFactory
);

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
import org.apache.druid.indexing.kinesis.KinesisRegion;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.joda.time.DateTime;
@ -91,7 +92,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime,
null
new IdleConfig(null, null)
);
this.endpoint = endpoint != null

View File

@ -25,10 +25,13 @@ import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
/**
* Defines if and when {@link SeekableStreamSupervisor} can become idle.
*/
public class IdleConfig
{
private final boolean enabled;
private final long inactiveAfterMillis;
private final Long inactiveAfterMillis;
@JsonCreator
public IdleConfig(
@ -36,13 +39,12 @@ public class IdleConfig
@Nullable @JsonProperty("inactiveAfterMillis") Long inactiveAfterMillis
)
{
Preconditions.checkArgument(
inactiveAfterMillis == null || inactiveAfterMillis > 0,
"inactiveAfterMillis should be a postive number"
);
this.enabled = enabled != null && enabled;
this.inactiveAfterMillis = inactiveAfterMillis != null
? inactiveAfterMillis
: 600_000L;
Preconditions.checkArgument(this.inactiveAfterMillis > 0,
"inactiveAfterMillis should be a postive number");
this.inactiveAfterMillis = inactiveAfterMillis;
}
@JsonProperty
@ -52,7 +54,7 @@ public class IdleConfig
}
@JsonProperty
public long getInactiveAfterMillis()
public Long getInactiveAfterMillis()
{
return this.inactiveAfterMillis;
}

View File

@ -746,6 +746,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// snapshots latest sequences from stream to be verified in next run cycle of inactive stream check
private final Map<PartitionIdType, SequenceOffsetType> previousSequencesFromStream = new HashMap<>();
private long lastActiveTimeMillis;
private final IdleConfig idleConfig;
public SeekableStreamSupervisor(
final String supervisorId,
@ -804,6 +805,23 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
: Math.min(10, this.ioConfig.getTaskCount() * this.ioConfig.getReplicas()));
}
IdleConfig specIdleConfig = spec.getIoConfig().getIdleConfig();
if (specIdleConfig != null) {
if (specIdleConfig.getInactiveAfterMillis() != null) {
idleConfig = specIdleConfig;
} else {
idleConfig = new IdleConfig(
specIdleConfig.isEnabled(),
spec.getSupervisorStateManagerConfig().getInactiveAfterMillis()
);
}
} else {
idleConfig = new IdleConfig(
spec.getSupervisorStateManagerConfig().isIdleConfigEnabled(),
spec.getSupervisorStateManagerConfig().getInactiveAfterMillis()
);
}
this.workerExec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(
workerThreads,
@ -3292,8 +3310,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private void checkIfStreamInactiveAndTurnSupervisorIdle()
{
IdleConfig idleConfig = spec.getIoConfig().getIdleConfig();
if ((idleConfig == null || !idleConfig.isEnabled()) || spec.isSuspended()) {
if (!idleConfig.isEnabled() || spec.isSuspended()) {
return;
}

View File

@ -973,7 +973,6 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
};
Assert.assertTrue(Objects.requireNonNull(spec.getIoConfig().getIdleConfig()).isEnabled());
Assert.assertEquals(600000L, spec.getIoConfig().getIdleConfig().getInactiveAfterMillis());
}
private static DataSchema getDataSchema()

View File

@ -46,6 +46,12 @@ public class SupervisorStateManagerConfig
@JsonProperty
private int maxStoredExceptionEvents = Math.max(unhealthinessThreshold, healthinessThreshold);
// Bound to the "idleConfig.enabled" JSON property; defaults to false when the property is absent.
@JsonProperty("idleConfig.enabled")
private boolean idleConfigEnabled = false;
// Bound to the "idleConfig.inactiveAfterMillis" JSON property; defaults to 600_000 when absent.
@JsonProperty("idleConfig.inactiveAfterMillis")
private long inactiveAfterMillis = 600_000L;
public SupervisorStateManagerConfig()
{
@ -85,4 +91,14 @@ public class SupervisorStateManagerConfig
{
return maxStoredExceptionEvents;
}
/**
 * @return the value of the {@code idleConfig.enabled} setting held by this config
 */
public boolean isIdleConfigEnabled()
{
  return this.idleConfigEnabled;
}
/**
 * @return the value of the {@code idleConfig.inactiveAfterMillis} setting held by this config
 */
public long getInactiveAfterMillis()
{
  return this.inactiveAfterMillis;
}
}

View File

@ -19,19 +19,30 @@
package org.apache.druid.indexing.overlord.supervisor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class SupervisorStateManagerTest
{
SupervisorStateManagerConfig stateManagerConfig;
@Test
public void testMarkRunFinishedIfSupervisorIsIdle()
{
stateManagerConfig = new SupervisorStateManagerConfig();
SupervisorStateManager supervisorStateManager = new SupervisorStateManager(
new SupervisorStateManagerConfig(),
stateManagerConfig,
false
);
Assert.assertFalse(stateManagerConfig.isIdleConfigEnabled());
Assert.assertEquals(600000, stateManagerConfig.getInactiveAfterMillis());
supervisorStateManager.markRunFinished();
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisorStateManager.getSupervisorState());
@ -41,4 +52,18 @@ public class SupervisorStateManagerTest
Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisorStateManager.getSupervisorState());
}
/**
 * Converts a map carrying the dotted {@code idleConfig.*} keys into a
 * {@link SupervisorStateManagerConfig} and checks that both values are picked up.
 */
@Test
public void testIdleConfigSerde()
{
  final Map<String, String> idleProperties = ImmutableMap.of(
      "idleConfig.enabled", "true",
      "idleConfig.inactiveAfterMillis", "60000"
  );

  stateManagerConfig = new DefaultObjectMapper().convertValue(
      idleProperties,
      SupervisorStateManagerConfig.class
  );

  Assert.assertTrue(stateManagerConfig.isIdleConfigEnabled());
  Assert.assertEquals(60000, stateManagerConfig.getInactiveAfterMillis());
}
}