Adds Idle feature to `SeekableStreamSupervisor` for inactive stream (#13144)

* Idle Seekable stream supervisor changes.

* nit

* nit

* nit

* Adds unit tests

* Supervisor decides its idle state instead of AutoScaler

* docs update

* nit

* nit

* docs update

* Adds Kafka unit test

* Adds Kafka Integration test.

* Updates travis config.

* Updates kafka-indexing-service dependencies.

* updates previous offsets snapshot & doc

* Doesn't act if supervisor is suspended.

* Fixes highest current offsets fetch bug, adds new Kafka UT tests, doc changes.

* Reverts Kinesis Supervisor idle behaviour changes.

* nit

* nit

* Corrects SeekableStreamSupervisorSpec check on idle behaviour config, adds tests.

* Fixes getHighestCurrentOffsets to fetch offsets of publishing tasks too

* Adds Kafka Supervisor UT

* Improves test coverage in druid-server

* Corrects IT override config

* Doc updates and Syntactic changes

* nit

* supervisorSpec.ioConfig.idleConfig changes
Tejaswini Bandlamudi 2022-10-12 18:31:08 +05:30 committed by GitHub
parent 59e2afc566
commit 3e13584e0e
19 changed files with 1128 additions and 47 deletions

View File

@ -56,6 +56,7 @@ The list of `detailedState` values and their corresponding `state` mapping is as
|DISCOVERING_INITIAL_TASKS (first iteration only)|RUNNING|The supervisor is discovering already-running tasks|
|CREATING_TASKS (first iteration only)|RUNNING|The supervisor is creating tasks and discovering state|
|RUNNING|RUNNING|The supervisor has started tasks and is waiting for taskDuration to elapse|
|IDLE|IDLE|The supervisor is not creating tasks because the input stream has not received any new data and all existing data has been read.|
|SUSPENDED|SUSPENDED|The supervisor has been suspended|
|STOPPING|STOPPING|The supervisor is stopping|
@ -68,14 +69,14 @@ On each iteration of the supervisor's run loop, the supervisor completes the fol
4) Handle tasks that have exceeded `taskDuration` and should transition from the reading to publishing state.
5) Handle tasks that have finished publishing and signal redundant replica tasks to stop.
6) Handle tasks that have failed and clean up the supervisor's internal state.
7) Compare the list of healthy tasks to the requested `taskCount` and `replicas` configurations and create additional tasks if required.
7) Compare the list of healthy tasks to the requested `taskCount` and `replicas` configurations and create additional tasks if required, unless the supervisor is idle.
The `detailedState` field will show additional values (those marked with "first iteration only") the first time the
supervisor executes this run loop after startup or after resuming from a suspension. This is intended to surface
initialization-type issues, where the supervisor is unable to reach a stable state (perhaps because it can't connect to
Kafka, it can't read from the Kafka topic, or it can't communicate with existing tasks). Once the supervisor is stable -
that is, once it has completed a full execution without encountering any issues - `detailedState` will show a `RUNNING`
state until it is stopped, suspended, or hits a task failure threshold and transitions to an unhealthy state.
state until it becomes idle, is stopped or suspended, or hits a task failure threshold and transitions to an unhealthy state.
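
Once idle behavior is enabled, the transition can be observed through the supervisor status API, `GET /druid/indexer/v1/supervisor/<supervisorId>/status`. The abbreviated response below is only an illustrative sketch; apart from `state` and `detailedState`, the exact set of payload fields shown is an assumption and may differ by version:

```json
{
  "id": "my-kafka-supervisor",
  "payload": {
    "dataSource": "wikipedia",
    "stream": "wikipedia-topic",
    "suspended": false,
    "healthy": true,
    "state": "IDLE",
    "detailedState": "IDLE"
  }
}
```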
## Getting Supervisor Ingestion Stats Report

View File

@ -52,6 +52,7 @@ This topic contains configuration reference information for the Apache Kafka sup
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no (default == null)|
|`idleConfig`|Object|Defines how and when the Kafka supervisor can become idle. See [Idle Supervisor Configuration](#idle-supervisor-configuration) for more details.|no (default == null)|
## Task Autoscaler Properties
@ -79,7 +80,16 @@ This topic contains configuration reference information for the Apache Kafka sup
| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
The following example demonstrates supervisor spec with `lagBased` autoScaler enabled:
## Idle Supervisor Configuration
> Note that idle state transitioning is currently designated as experimental.
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `enabled` | If `true`, the Kafka supervisor becomes idle if there is no data in the input topic for some time. | no (default == false) |
| `inactiveAfterMillis` | The supervisor is marked as idle if all existing data has been read from the input topic and no new data has been published for `inactiveAfterMillis` milliseconds. | no (default == 600000) |
The following example demonstrates a supervisor spec with the `lagBased` autoScaler and idle config enabled:
```json
{
"type": "kafka",
@ -114,7 +124,11 @@ The following example demonstrates supervisor spec with `lagBased` autoScaler en
},
"taskCount":1,
"replicas":1,
"taskDuration":"PT1H"
"taskDuration":"PT1H",
"idleConfig": {
"enabled": true,
"inactiveAfterMillis": 600000
}
},
"tuningConfig":{
...

View File

@ -62,9 +62,11 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
@ -274,17 +276,19 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, Kaf
@SuppressWarnings("SSBasedInspection")
protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
{
return currentOffsets
if (latestSequenceFromStream == null) {
return Collections.emptyMap();
}
return latestSequenceFromStream
.entrySet()
.stream()
.collect(
Collectors.toMap(
Entry::getKey,
e -> latestSequenceFromStream != null
&& latestSequenceFromStream.get(e.getKey()) != null
&& e.getValue() != null
? latestSequenceFromStream.get(e.getKey()) - e.getValue()
: Integer.MIN_VALUE
e -> e.getValue() != null
? e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
: 0
)
);
}
@ -383,6 +387,12 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, Kaf
}
}
@Override
protected Map<Integer, Long> getLatestSequencesFromStream()
{
return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>();
}
@Override
protected String baseTaskName()
{

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.StringUtils;
@ -63,7 +64,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
@JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
@JsonProperty("configOverrides") KafkaConfigOverrides configOverrides
@JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
@JsonProperty("idleConfig") IdleConfig idleConfig
)
{
super(
@ -79,7 +81,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime
lateMessageRejectionStartDateTime,
idleConfig
);
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
@ -140,6 +143,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() +
", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() +
", configOverrides=" + getConfigOverrides() +
", idleConfig=" + getIdleConfig() +
'}';
}

View File

@ -144,6 +144,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
),
null,
@ -320,6 +321,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
),
null,

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@ -305,6 +306,7 @@ public class KafkaSupervisorIOConfigTest
null,
null,
null,
null,
null
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
@ -318,4 +320,40 @@ public class KafkaSupervisorIOConfigTest
kafkaSupervisorIOConfig1.getAutoScalerConfig().getMinTriggerScaleActionFrequencyMillis()
);
}
@Test
public void testIdleConfigSerde() throws JsonProcessingException
{
HashMap<String, Object> idleConfig = new HashMap<>();
idleConfig.put("enabled", true);
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
consumerProperties.put("bootstrap.servers", "localhost:8082");
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
"test",
null,
1,
1,
new Period("PT1H"),
consumerProperties,
null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
true,
new Period("PT30M"),
null,
null,
null,
null,
mapper.convertValue(idleConfig, IdleConfig.class)
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
Assert.assertNotNull(kafkaSupervisorIOConfig1.getIdleConfig());
Assert.assertTrue(kafkaSupervisorIOConfig1.getIdleConfig().isEnabled());
Assert.assertEquals(600000L, kafkaSupervisorIOConfig1.getIdleConfig().getInactiveAfterMillis());
}
}

View File

@ -67,6 +67,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.St
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
@ -273,7 +274,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
autoScalerConfig.put("scaleInThreshold", 1000000);
autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
autoScalerConfig.put("scaleActionStartDelayMillis", 0);
autoScalerConfig.put("scaleActionPeriodMillis", 100);
autoScalerConfig.put("scaleActionPeriodMillis", 600);
autoScalerConfig.put("taskCountMax", 2);
autoScalerConfig.put("taskCountMin", 1);
autoScalerConfig.put("scaleInStep", 1);
@ -300,7 +301,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
null
null,
new IdleConfig(true, 1000L)
);
final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig(
@ -380,7 +382,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
replayAll();
@ -389,11 +391,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(1, taskCountBeforeScale);
autoscaler.start();
supervisor.runInternal();
Thread.sleep(1 * 1000);
Thread.sleep(1000);
supervisor.runInternal();
verifyAll();
int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScale);
Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState());
KafkaIndexTask task = captured.getValue();
@ -1855,6 +1859,425 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertTrue(payload.getOffsetsLastUpdated().plusMinutes(1).isAfterNow());
}
@Test
public void testSupervisorIsIdleIfStreamInactive() throws Exception
{
supervisor = getTestableSupervisorForIdleBehaviour(
1,
2,
true,
"PT10S",
null,
null,
false,
new IdleConfig(true, 200L)
);
addSomeEvents(100);
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost", 234, -1);
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(
"topic",
ImmutableMap.of(0, 10L, 2, 30L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
"topic",
ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null,
supervisor.getTuningConfig()
);
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(
"topic",
ImmutableMap.of(1, 20L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
"topic",
ImmutableMap.of(1, Long.MAX_VALUE)
),
null,
null,
supervisor.getTuningConfig()
);
List<Task> existingTasks = ImmutableList.of(id1, id2);
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskClient.getStatusAsync("id1"))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
EasyMock.expect(taskClient.getStatusAsync("id2"))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
.anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
replayAll();
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
0,
ImmutableMap.of(0, 0L, 2, 0L),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("id1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
1,
ImmutableMap.of(1, 0L),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("id2"),
ImmutableSet.of()
);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
EasyMock.reset(taskClient);
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 25L, 2, 45L)));
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(1, 45L)));
EasyMock.replay(taskClient);
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
EasyMock.reset(taskClient);
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 25L, 2, 45L)));
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(1, 45L)));
EasyMock.replay(taskClient);
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
Assert.assertNotEquals(supervisor.getState(), SupervisorStateManager.BasicState.IDLE);
EasyMock.reset(taskClient);
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 101L)));
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
EasyMock.replay(taskClient);
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
EasyMock.reset(taskClient);
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 101L)));
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
EasyMock.replay(taskClient);
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState());
}
@Test
public void testSupervisorIsIdleIfStreamInactiveWhenNoActiveTasks() throws Exception
{
supervisor = getTestableSupervisorForIdleBehaviour(
1,
2,
true,
"PT10S",
null,
null,
false,
new IdleConfig(true, 200L)
);
addSomeEvents(1);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<Integer, Long>(topic, ImmutableMap.of(0, 2L, 1, 2L, 2, 2L))
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
replayAll();
supervisor.start();
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
verifyAll();
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState());
}
@Test
public void testSupervisorNotIdleIfStreamInactiveWhenSuspended() throws Exception
{
supervisor = getTestableSupervisorForIdleBehaviour(
1,
2,
true,
"PT10S",
null,
null,
true,
new IdleConfig(true, 200L)
);
addSomeEvents(1);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<Integer, Long>(topic, ImmutableMap.of(0, 2L, 1, 2L, 2, 2L))
)
).anyTimes();
replayAll();
supervisor.start();
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, supervisor.getState());
}
@Test
public void testSupervisorIsIdleIfStreamInactiveWhenNoActiveTasksAndFewPendingTasks() throws Exception
{
supervisor = getTestableSupervisorForIdleBehaviour(
1,
2,
true,
"PT10S",
null,
null,
false,
new IdleConfig(true, 200L)
);
addSomeEvents(100);
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost", 234, -1);
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(
"topic",
ImmutableMap.of(0, 10L, 2, 30L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
"topic",
ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null,
supervisor.getTuningConfig()
);
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(
"topic",
ImmutableMap.of(1, 20L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
"topic",
ImmutableMap.of(1, Long.MAX_VALUE)
),
null,
null,
supervisor.getTuningConfig()
);
List<Task> existingTasks = ImmutableList.of(id1, id2);
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskClient.getStatusAsync("id1"))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
EasyMock.expect(taskClient.getStatusAsync("id2"))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
.anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
replayAll();
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
0,
ImmutableMap.of(0, 0L, 2, 0L),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("id1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
1,
ImmutableMap.of(1, 0L),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("id2"),
ImmutableSet.of()
);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
EasyMock.reset(taskClient);
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 25L, 2, 45L)));
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(1, 45L)));
EasyMock.replay(taskClient);
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
EasyMock.reset(taskClient);
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 101L)));
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
EasyMock.replay(taskClient);
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
EasyMock.reset(taskClient);
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 101L)));
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
EasyMock.replay(taskClient);
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState());
supervisor.moveTaskGroupToPendingCompletion(0);
supervisor.moveTaskGroupToPendingCompletion(1);
Assert.assertEquals(0, supervisor.getActiveTaskGroupsCount());
EasyMock.reset(taskClient);
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 101L)));
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
EasyMock.replay(taskClient);
Thread.sleep(100);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState());
}
@Test
public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
{
@ -3595,7 +4018,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
false,
kafkaHost
kafkaHost,
null
);
}
@ -3619,7 +4043,33 @@ public class KafkaSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
suspended,
kafkaHost
kafkaHost,
null
);
}
private TestableKafkaSupervisor getTestableSupervisorForIdleBehaviour(
int replicas,
int taskCount,
boolean useEarliestOffset,
String duration,
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
boolean suspended,
IdleConfig idleConfig
)
{
return getTestableSupervisor(
replicas,
taskCount,
useEarliestOffset,
false,
duration,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
suspended,
kafkaHost,
idleConfig
);
}
@ -3632,7 +4082,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
boolean suspended,
String kafkaHost
String kafkaHost,
IdleConfig idleConfig
)
{
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
@ -3654,7 +4105,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
null,
null
null,
idleConfig
);
KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
@ -3767,6 +4219,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
null,
null,
null
);
@ -3884,6 +4337,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
null,
null,
null
);

View File

@ -90,7 +90,8 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime
lateMessageRejectionStartDateTime,
null
);
this.endpoint = endpoint != null

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream.supervisor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
public class IdleConfig
{
private final boolean enabled;
private final long inactiveAfterMillis;
@JsonCreator
public IdleConfig(
@Nullable @JsonProperty("enabled") Boolean enabled,
@Nullable @JsonProperty("inactiveAfterMillis") Long inactiveAfterMillis
)
{
this.enabled = enabled != null && enabled;
this.inactiveAfterMillis = inactiveAfterMillis != null
? inactiveAfterMillis
: 600_000L;
Preconditions.checkArgument(this.inactiveAfterMillis > 0,
"inactiveAfterMillis should be a postive number");
}
@JsonProperty
public boolean isEnabled()
{
return this.enabled;
}
@JsonProperty
public long getInactiveAfterMillis()
{
return this.inactiveAfterMillis;
}
@Override
public String toString()
{
return "idleConfig{" +
"enabled=" + enabled +
", inactiveAfterMillis=" + inactiveAfterMillis +
'}';
}
}

View File

@ -743,6 +743,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;
// snapshot of the latest sequences from the stream, compared against the next run cycle's snapshot in the inactive-stream check
private final Map<PartitionIdType, SequenceOffsetType> previousSequencesFromStream = new HashMap<>();
private long lastActiveTimeMillis;
public SeekableStreamSupervisor(
final String supervisorId,
final TaskStorage taskStorage,
@ -1454,6 +1458,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
checkCurrentTaskState();
checkIfStreamInactiveAndTurnSupervisorIdle();
// If supervisor is already stopping, don't contend for stateChangeLock since the block can be skipped
if (stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING)) {
generateAndLogReport();
@ -1466,6 +1472,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING)) {
// if we're already terminating, don't do anything here, the terminate already handles shutdown
log.info("[%s] supervisor is already stopping.", dataSource);
} else if (stateManager.isIdle()) {
log.info("[%s] supervisor is idle.", dataSource);
} else if (!spec.isSuspended()) {
log.info("[%s] supervisor is running.", dataSource);
@ -2502,11 +2510,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return true;
}
/**
* Gets the mapping of stream partitions to their latest offsets.
*/
protected Map<PartitionIdType, SequenceOffsetType> getLatestSequencesFromStream()
{
return new HashMap<>();
}
private void assignRecordSupplierToPartitionIds()
{
recordSupplierLock.lock();
try {
final Set partitions = partitionIds.stream()
final Set<StreamPartition<PartitionIdType>> partitions = partitionIds.stream()
.map(partitionId -> new StreamPartition<>(ioConfig.getStream(), partitionId))
.collect(Collectors.toSet());
if (!recordSupplier.getAssignment().containsAll(partitions)) {
@ -3244,6 +3260,44 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
private void checkIfStreamInactiveAndTurnSupervisorIdle()
{
IdleConfig idleConfig = spec.getIoConfig().getIdleConfig();
if ((idleConfig == null || !idleConfig.isEnabled()) || spec.isSuspended()) {
return;
}
Map<PartitionIdType, SequenceOffsetType> latestSequencesFromStream = getLatestSequencesFromStream();
long nowTime = Instant.now().toEpochMilli();
boolean idle;
long idleTime;
if (lastActiveTimeMillis > 0
&& previousSequencesFromStream.equals(latestSequencesFromStream)
&& computeTotalLag() == 0) {
idleTime = nowTime - lastActiveTimeMillis;
idle = true;
} else {
idleTime = 0L;
lastActiveTimeMillis = nowTime;
idle = false;
}
previousSequencesFromStream.clear();
previousSequencesFromStream.putAll(latestSequencesFromStream);
if (!idle) {
stateManager.maybeSetState(SupervisorStateManager.BasicState.RUNNING);
} else if (!stateManager.isIdle() && idleTime > idleConfig.getInactiveAfterMillis()) {
stateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE);
}
}
private long computeTotalLag()
{
LagStats lagStats = computeLagStats();
return lagStats != null ? lagStats.getTotalLag() : 0;
}
/**
* If the seekable stream system supported by this supervisor allows for partition expiration, expired partitions
* should be removed from the starting offsets sent to the tasks.
@ -3595,7 +3649,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// if we aren't in a steady state, chill out for a bit, don't worry, we'll get called later, but if we aren't
// healthy go ahead and try anyway to try if possible to provide insight into how much time is left to fix the
// issue for cluster operators since this feeds the lag metrics
if (stateManager.isSteadyState() || !stateManager.isHealthy()) {
if (stateManager.isIdle() || stateManager.isSteadyState() || !stateManager.isHealthy()) {
try {
updateCurrentOffsets();
updatePartitionLagFromStream();
@ -3646,27 +3700,41 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
@Nullable
protected abstract Map<PartitionIdType, Long> getPartitionTimeLag();
/**
* Gets the highest current offsets of all tasks (actively reading and publishing) for all partitions of the stream.
* If no task is reading a partition, returns the offset stored in metadata storage for that partition.
* If there are no actively reading or publishing task groups, returns the offsets stored in metadata storage.
* Used to compute lag, by comparing against the latest offsets from the stream, for reporting and for determining idleness.
*/
protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()
{
Map<PartitionIdType, SequenceOffsetType> offsetsFromMetadataStorage = getOffsetsFromMetadataStorage();
if (!spec.isSuspended()) {
if (activelyReadingTaskGroups.size() > 0 || pendingCompletionTaskGroups.size() > 0) {
return activelyReadingTaskGroups
.values()
.stream()
.flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
.flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream())
.collect(Collectors.toMap(
Map<PartitionIdType, SequenceOffsetType> currentOffsets =
Stream.concat(
activelyReadingTaskGroups
.values()
.stream()
.flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
.flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream()),
pendingCompletionTaskGroups
.values()
.stream()
.flatMap(taskGroups -> taskGroups.stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()))
.flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream())
).collect(Collectors.toMap(
Entry::getKey,
Entry::getValue,
(v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
));
partitionIds.forEach(partitionId -> currentOffsets.putIfAbsent(partitionId, offsetsFromMetadataStorage.get(partitionId)));
return currentOffsets;
}
// nothing is running but we are not suspended, so lets just hang out in case we get called while things start up
return ImmutableMap.of();
} else {
// if supervisor is suspended, no tasks are likely running so use offsets in metadata, if exist
return getOffsetsFromMetadataStorage();
}
// if the supervisor is suspended, or is idle and nothing is running, use the offsets in metadata storage, if they exist
return offsetsFromMetadataStorage;
}
private OrderedSequenceNumber<SequenceOffsetType> makeSequenceNumber(SequenceOffsetType seq)
@ -3977,7 +4045,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected void emitLag()
{
if (spec.isSuspended() || !stateManager.isSteadyState()) {
if (spec.isSuspended() || !(stateManager.isSteadyState() || stateManager.isIdle())) {
// don't emit metrics if supervisor is suspended or not in a healthy running state
// (lag should still available in status report)
return;

View File

@ -48,6 +48,7 @@ public abstract class SeekableStreamSupervisorIOConfig
private final Optional<Duration> earlyMessageRejectionPeriod;
private final Optional<DateTime> lateMessageRejectionStartDateTime;
@Nullable private final AutoScalerConfig autoScalerConfig;
@Nullable private final IdleConfig idleConfig;
public SeekableStreamSupervisorIOConfig(
String stream,
@ -62,7 +63,8 @@ public abstract class SeekableStreamSupervisorIOConfig
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
@Nullable AutoScalerConfig autoScalerConfig,
DateTime lateMessageRejectionStartDateTime
DateTime lateMessageRejectionStartDateTime,
@Nullable IdleConfig idleConfig
)
{
this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
@ -97,6 +99,8 @@ public abstract class SeekableStreamSupervisorIOConfig
+ "both properties lateMessageRejectionStartDateTime "
+ "and lateMessageRejectionPeriod.");
}
this.idleConfig = idleConfig;
}
private static Duration defaultDuration(final Period period, final String theDefault)
@ -188,4 +192,11 @@ public abstract class SeekableStreamSupervisorIOConfig
{
return lateMessageRejectionStartDateTime;
}
@Nullable
@JsonProperty
public IdleConfig getIdleConfig()
{
return idleConfig;
}
}

View File

@ -40,6 +40,7 @@ import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
@ -77,6 +78,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
@ -865,7 +867,10 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
false,
new Period("PT30M"),
null,
null, null, null
null,
null,
null,
null
)
{
};
@ -902,6 +907,75 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
autoScaler.stop();
}
@Test
public void testEnablingIdleBehaviourPerSupervisorWithOverlordConfigEnabled()
{
SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
1,
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
null,
new IdleConfig(true, null)
){
};
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.replay(ingestionSchema);
spec = new SeekableStreamSupervisorSpec(
ingestionSchema,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
supervisorStateManagerConfig
)
{
@Override
public Supervisor createSupervisor()
{
return null;
}
@Override
protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
{
return null;
}
@Override
public String getType()
{
return null;
}
@Override
public String getSource()
{
return null;
}
};
Assert.assertTrue(Objects.requireNonNull(spec.getIoConfig().getIdleConfig()).isEnabled());
Assert.assertEquals(600000L, spec.getIoConfig().getIdleConfig().getInactiveAfterMillis());
}
private static DataSchema getDataSchema()
{
List<DimensionSchema> dimensions = new ArrayList<>();
@ -937,7 +1011,9 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
new Period("PT30M"),
null,
null,
mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), null
mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class),
null,
null
)
{
};
@ -954,7 +1030,9 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
new Period("PT30M"),
null,
null,
mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), null
mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class),
null,
null
)
{
};

View File

@ -98,6 +98,43 @@ public class SeekableStreamSupervisorStateManagerTest
Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
}
@Test
public void testIdlePath()
{
Assert.assertEquals(BasicState.PENDING, stateManager.getSupervisorState());
Assert.assertEquals(BasicState.PENDING, stateManager.getSupervisorState().getBasicState());
stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);
Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, stateManager.getSupervisorState());
Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
stateManager.maybeSetState(SeekableStreamState.DISCOVERING_INITIAL_TASKS);
Assert.assertEquals(SeekableStreamState.DISCOVERING_INITIAL_TASKS, stateManager.getSupervisorState());
Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
stateManager.maybeSetState(SeekableStreamState.CREATING_TASKS);
Assert.assertEquals(SeekableStreamState.CREATING_TASKS, stateManager.getSupervisorState());
Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
stateManager.markRunFinished();
Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
// Emulates submitting Idle notice
stateManager.maybeSetState(BasicState.IDLE);
Assert.assertEquals(BasicState.IDLE, stateManager.getSupervisorState());
Assert.assertEquals(BasicState.IDLE, stateManager.getSupervisorState().getBasicState());
// Stay in the idle state across subsequent runs unless it is specifically set to a different state
stateManager.markRunFinished();
Assert.assertEquals(BasicState.IDLE, stateManager.getSupervisorState());
Assert.assertEquals(BasicState.IDLE, stateManager.getSupervisorState().getBasicState());
stateManager.maybeSetState(BasicState.RUNNING);
Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
}
@Test
public void testStoppingPath()
{

View File

@ -465,6 +465,95 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
verifyAll();
}
@Test
public void testIdleStateTransition() throws Exception
{
EasyMock.reset(spec);
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
1,
new Period("PT1H"),
new Period("PT1S"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
null,
new IdleConfig(true, 200L)
)
{
}).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
@Override
public Duration getEmitterPeriod()
{
return new Period("PT1S").toStandardDuration();
}
}).anyTimes();
EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
replayAll();
SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
supervisor.start();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
supervisor.runInternal();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
Thread.sleep(100L);
supervisor.runInternal();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
Thread.sleep(100L);
supervisor.runInternal();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
Thread.sleep(100L);
supervisor.runInternal();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
verifyAll();
}
@Test
public void testCreatingTasksFailRecoveryFail() throws Exception
{
@ -908,6 +997,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
null,
null,
null,
null,
null
)
{
@ -964,7 +1054,10 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
false,
new Period("PT30M"),
null,
null, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class), null
null,
OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class),
null,
null
)
{
};
@ -1339,7 +1432,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
@Override
public LagStats computeLagStats()
{
return null;
return new LagStats(0, 0, 0);
}
}

View File

@ -79,12 +79,16 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
private static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE = "supervisor_with_autoscaler_spec_template.json";
private static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE =
"supervisor_with_idle_behaviour_enabled_spec_template.json";
protected static final String DATA_RESOURCE_ROOT = "/stream/data";
protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE);
protected static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE);
protected static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE);
protected static final String SERIALIZER_SPEC_DIR = "serializer";
protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
@ -332,6 +336,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// Start supervisor
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
LOG.info("Submitted supervisor");
String dataSource = generatedTestConfig.getFullDatasourceName();
// Start generating half of the data
int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
@ -358,7 +363,10 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// wait for autoScaling task numbers from 1 to 2.
ITRetryUtil.retryUntil(
() -> indexer.getRunningTasks().size() == 2,
() -> indexer.getRunningTasks()
.stream()
.filter(taskResponseObject -> taskResponseObject.getId().contains(dataSource))
.count() == 2,
true,
10000,
50,
@ -378,6 +386,79 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
}
}
protected void doTestIndexDataWithIdleConfigEnabled(@Nullable Boolean transactionEnabled) throws Exception
{
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
INPUT_FORMAT,
getResourceAsString(JSON_INPUT_FORMAT_PATH)
);
try (
final Closeable closer = createResourceCloser(generatedTestConfig);
final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
) {
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
.apply(getResourceAsString(SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH));
LOG.info("supervisorSpec: [%s]\n", taskSpec);
// Start supervisor
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
LOG.info("Submitted supervisor");
String dataSource = generatedTestConfig.getFullDatasourceName();
// Start generating half of the data
int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
new JsonEventSerializer(jsonMapper),
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
long numWritten = streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateFirstRound,
FIRST_EVENT_TIME
);
// Verify supervisor is healthy before it becomes idle
ITRetryUtil.retryUntil(
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
true,
10000,
30,
"Waiting for supervisor to be healthy"
);
ITRetryUtil.retryUntil(
() -> SupervisorStateManager.BasicState.IDLE.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
true,
10000,
30,
"Waiting for supervisor to be idle"
);
// wait for no more creation of indexing tasks.
ITRetryUtil.retryUntil(
() -> indexer.getRunningTasks()
.stream()
.noneMatch(taskResponseObject -> taskResponseObject.getId().contains(dataSource)),
true,
10000,
50,
"wait for no more creation of indexing tasks"
);
// Start generating the remaining half of the data
numWritten += streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateRemaining,
FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
);
// Verify that supervisor can catch up with the stream
verifyIngestedData(generatedTestConfig, numWritten);
}
}
protected void doTestTerminatedSupervisorAutoCleanup(@Nullable Boolean transactionEnabled) throws Exception
{
final GeneratedTestConfig generatedTestConfig1 = new GeneratedTestConfig(

View File

@ -62,6 +62,12 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
doTestIndexDataWithAutoscaler(false);
}
@Test
public void testIndexDataWithIdleConfigEnabled() throws Exception
{
doTestIndexDataWithIdleConfigEnabled(false);
}
/**
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
* and supervisor maintained and scoped within this test only

View File

@ -0,0 +1,62 @@
{
"type": "%%STREAM_TYPE%%",
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"parser": %%PARSER%%,
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
"dimensionExclusions": [],
"spatialDimensions": []
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "MINUTE",
"queryGranularity": "NONE"
}
},
"tuningConfig": {
"type": "%%STREAM_TYPE%%",
"intermediatePersistPeriod": "PT30S",
"maxRowsPerSegment": 5000000,
"maxRowsInMemory": 500000
},
"ioConfig": {
"%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
"%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
"autoScalerConfig": null,
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT120S",
"%%USE_EARLIEST_KEY%%": true,
"inputFormat" : %%INPUT_FORMAT%%,
"idleConfig": {
"enabled": true,
"inactiveAfterMillis": 10000
}
}
}

View File

@ -62,6 +62,7 @@ public class SupervisorStateManager
PENDING(true, true),
RUNNING(true, false),
IDLE(true, false),
SUSPENDED(true, false),
STOPPING(true, false);
@ -154,10 +155,11 @@ public class SupervisorStateManager
return;
}
// if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) but haven't had a successful run
// if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) or IDLE state but haven't had a successful run
// yet, refuse to switch and prefer the more specific states used for first run (CONNECTING_TO_STREAM,
// DISCOVERING_INITIAL_TASKS, CREATING_TASKS, etc.)
if (healthySteadyState.equals(proposedState) && !atLeastOneSuccessfulRun) {
if ((healthySteadyState.equals(proposedState) || BasicState.IDLE.equals(proposedState))
&& !atLeastOneSuccessfulRun) {
return;
}
@ -196,11 +198,13 @@ public class SupervisorStateManager
consecutiveSuccessfulRuns = currentRunSuccessful ? consecutiveSuccessfulRuns + 1 : 0;
consecutiveFailedRuns = currentRunSuccessful ? 0 : consecutiveFailedRuns + 1;
// Try to set the state to RUNNING or SUSPENDED. This will be rejected if we haven't had atLeastOneSuccessfulRun
// If the supervisor is not IDLE, try to set the state to RUNNING or SUSPENDED.
// This will be rejected if we haven't had atLeastOneSuccessfulRun
// (in favor of the more specific states for the initial run) and will instead trigger setting the state to an
// unhealthy one if we are now over the error thresholds.
maybeSetState(healthySteadyState);
if (!isIdle()) {
maybeSetState(healthySteadyState);
}
// reset for next run
currentRunSuccessful = true;
}
@ -230,6 +234,11 @@ public class SupervisorStateManager
return atLeastOneSuccessfulRun;
}
public boolean isIdle()
{
return SupervisorStateManager.BasicState.IDLE.equals(supervisorState);
}
protected Deque<ExceptionEvent> getRecentEventsQueue()
{
return recentEventsQueue;

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.supervisor;
import org.junit.Assert;
import org.junit.Test;
public class SupervisorStateManagerTest
{
@Test
public void testMarkRunFinishedIfSupervisorIsIdle()
{
SupervisorStateManager supervisorStateManager = new SupervisorStateManager(
new SupervisorStateManagerConfig(),
false
);
supervisorStateManager.markRunFinished();
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisorStateManager.getSupervisorState());
supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE);
supervisorStateManager.markRunFinished();
Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisorStateManager.getSupervisorState());
}
}