fix supervisor auto scaler config serde bug (#12317)

This commit is contained in:
Parag Jain 2022-03-10 05:47:12 +05:30 committed by GitHub
parent d89d4ff588
commit 2efb74ff1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 326 additions and 210 deletions

View File

@ -121,7 +121,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
", taskCount=" + getTaskCount() +
", taskDuration=" + getTaskDuration() +
", consumerProperties=" + consumerProperties +
", autoScalerConfig=" + getAutoscalerConfig() +
", autoScalerConfig=" + getAutoScalerConfig() +
", pollTimeout=" + pollTimeout +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +

View File

@ -19,21 +19,27 @@
package org.apache.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.hamcrest.CoreMatchers;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class KafkaSupervisorIOConfigTest
@ -260,4 +266,55 @@ public class KafkaSupervisorIOConfigTest
), KafkaSupervisorIOConfig.class
);
}
@Test
public void testAutoScalingConfigSerde() throws JsonProcessingException
{
HashMap<String, Object> autoScalerConfig = new HashMap<>();
autoScalerConfig.put("enableTaskAutoScaler", true);
autoScalerConfig.put("lagCollectionIntervalMillis", 500);
autoScalerConfig.put("lagCollectionRangeMillis", 500);
autoScalerConfig.put("scaleOutThreshold", 0);
autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);
autoScalerConfig.put("scaleInThreshold", 1000000);
autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
autoScalerConfig.put("scaleActionStartDelayMillis", 0);
autoScalerConfig.put("scaleActionPeriodMillis", 100);
autoScalerConfig.put("taskCountMax", 2);
autoScalerConfig.put("taskCountMin", 1);
autoScalerConfig.put("scaleInStep", 1);
autoScalerConfig.put("scaleOutStep", 2);
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
consumerProperties.put("bootstrap.servers", "localhost:8082");
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
"test",
null,
1,
1,
new Period("PT1H"),
consumerProperties,
mapper.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
true,
new Period("PT30M"),
null,
null,
null
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
Assert.assertNotNull(kafkaSupervisorIOConfig1.getAutoScalerConfig());
Assert.assertTrue(kafkaSupervisorIOConfig1.getAutoScalerConfig().getEnableTaskAutoScaler());
Assert.assertEquals(1, kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMin());
Assert.assertEquals(2, kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMax());
Assert.assertEquals(
1200000,
kafkaSupervisorIOConfig1.getAutoScalerConfig().getMinTriggerScaleActionFrequencyMillis()
);
}
}

View File

@ -151,7 +151,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
", endpoint='" + endpoint + '\'' +
", replicas=" + getReplicas() +
", taskCount=" + getTaskCount() +
", autoScalerConfig=" + getAutoscalerConfig() +
", autoScalerConfig=" + getAutoScalerConfig() +
", taskDuration=" + getTaskDuration() +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +

View File

@ -531,7 +531,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
false
);
AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoscalerConfig();
AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoScalerConfig();
Assert.assertNull(autoscalerConfigNull);
// create KinesisSupervisorIOConfig with autoScalerConfig Empty
@ -558,7 +558,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
false
);
AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoscalerConfig();
AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoScalerConfig();
Assert.assertNotNull(autoscalerConfig);
Assert.assertTrue(autoscalerConfig instanceof LagBasedAutoScalerConfig);
Assert.assertFalse(autoscalerConfig.getEnableTaskAutoScaler());

View File

@ -765,7 +765,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
this.autoScalerConfig = ioConfig.getAutoscalerConfig();
this.autoScalerConfig = ioConfig.getAutoScalerConfig();
this.tuningConfig = spec.getTuningConfig();
this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
this.supervisorId = supervisorId;

View File

@ -125,7 +125,7 @@ public abstract class SeekableStreamSupervisorIOConfig
@Nullable
@JsonProperty
public AutoScalerConfig getAutoscalerConfig()
public AutoScalerConfig getAutoScalerConfig()
{
return autoScalerConfig;
}

View File

@ -162,7 +162,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
@Override
public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
{
AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoscalerConfig();
AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoScalerConfig();
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) {
return autoScalerConfig.createAutoScaler(supervisor, this);
}

View File

@ -133,15 +133,15 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
private BaseTestSeekableStreamSupervisor()
{
super(
"testSupervisorId",
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
taskClientFactory,
OBJECT_MAPPER,
spec,
rowIngestionMetersFactory,
false
"testSupervisorId",
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
taskClientFactory,
OBJECT_MAPPER,
spec,
rowIngestionMetersFactory,
false
);
}
@ -154,7 +154,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
@Override
protected void updatePartitionLagFromStream()
{
// do nothing
// do nothing
}
@Nullable
@ -173,25 +173,25 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
@Override
protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
int groupId,
Map<String, String> startPartitions,
Map<String, String> endPartitions,
String baseSequenceName,
DateTime minimumMessageTime,
DateTime maximumMessageTime,
Set<String> exclusiveStartSequenceNumberPartitions,
SeekableStreamSupervisorIOConfig ioConfig
int groupId,
Map<String, String> startPartitions,
Map<String, String> endPartitions,
String baseSequenceName,
DateTime minimumMessageTime,
DateTime maximumMessageTime,
Set<String> exclusiveStartSequenceNumberPartitions,
SeekableStreamSupervisorIOConfig ioConfig
)
{
return new SeekableStreamIndexTaskIOConfig<String, String>(
groupId,
baseSequenceName,
new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions),
new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
true,
minimumMessageTime,
maximumMessageTime,
ioConfig.getInputFormat()
groupId,
baseSequenceName,
new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions),
new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
true,
minimumMessageTime,
maximumMessageTime,
ioConfig.getInputFormat()
)
{
};
@ -199,13 +199,13 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
@Override
protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks(
int replicas,
String baseSequenceName,
ObjectMapper sortingMapper,
TreeMap<Integer, Map<String, String>> sequenceOffsets,
SeekableStreamIndexTaskIOConfig taskIoConfig,
SeekableStreamIndexTaskTuningConfig taskTuningConfig,
RowIngestionMetersFactory rowIngestionMetersFactory
int replicas,
String baseSequenceName,
ObjectMapper sortingMapper,
TreeMap<Integer, Map<String, String>> sequenceOffsets,
SeekableStreamIndexTaskIOConfig taskIoConfig,
SeekableStreamIndexTaskTuningConfig taskTuningConfig,
RowIngestionMetersFactory rowIngestionMetersFactory
)
{
return null;
@ -231,8 +231,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
String stream,
Map<String, String> map
String stream,
Map<String, String> map
)
{
return null;
@ -271,27 +271,27 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
@Override
protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(
int numPartitions,
boolean includeOffsets
int numPartitions,
boolean includeOffsets
)
{
return new SeekableStreamSupervisorReportPayload<String, String>(
DATASOURCE,
STREAM,
1,
1,
1L,
null,
null,
null,
null,
null,
null,
false,
true,
null,
null,
null
DATASOURCE,
STREAM,
1,
1,
1L,
null,
null,
null,
null,
null,
null,
false,
true,
null,
null,
null
)
{
};
@ -340,7 +340,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
@Override
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
// do nothing
// do nothing
}
@Override
@ -362,34 +362,37 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
private SeekableStreamSupervisor supervisor;
private String id;
public TestSeekableStreamSupervisorSpec(SeekableStreamSupervisorIngestionSpec ingestionSchema,
@Nullable Map<String, Object> context,
Boolean suspended,
TaskStorage taskStorage,
TaskMaster taskMaster,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
SeekableStreamIndexTaskClientFactory indexTaskClientFactory,
ObjectMapper mapper,
ServiceEmitter emitter,
DruidMonitorSchedulerConfig monitorSchedulerConfig,
RowIngestionMetersFactory rowIngestionMetersFactory,
SupervisorStateManagerConfig supervisorStateManagerConfig,
SeekableStreamSupervisor supervisor,
String id)
public TestSeekableStreamSupervisorSpec(
SeekableStreamSupervisorIngestionSpec ingestionSchema,
@Nullable Map<String, Object> context,
Boolean suspended,
TaskStorage taskStorage,
TaskMaster taskMaster,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
SeekableStreamIndexTaskClientFactory indexTaskClientFactory,
ObjectMapper mapper,
ServiceEmitter emitter,
DruidMonitorSchedulerConfig monitorSchedulerConfig,
RowIngestionMetersFactory rowIngestionMetersFactory,
SupervisorStateManagerConfig supervisorStateManagerConfig,
SeekableStreamSupervisor supervisor,
String id
)
{
super(
ingestionSchema,
context,
suspended,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig);
ingestionSchema,
context,
suspended,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig
);
this.supervisor = supervisor;
this.id = id;
@ -482,26 +485,26 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
{
return new SeekableStreamIndexTaskTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
{
@Override
@ -530,17 +533,30 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
AutoScalerConfig autoScalerConfigNull = mapper.convertValue(null, AutoScalerConfig.class);
Assert.assertNull(autoScalerConfigNull);
AutoScalerConfig autoScalerConfigDefault = mapper.convertValue(ImmutableMap.of("autoScalerStrategy", "lagBased"), AutoScalerConfig.class);
AutoScalerConfig autoScalerConfigDefault = mapper.convertValue(
ImmutableMap.of("autoScalerStrategy", "lagBased"),
AutoScalerConfig.class
);
Assert.assertTrue(autoScalerConfigDefault instanceof LagBasedAutoScalerConfig);
AutoScalerConfig autoScalerConfigValue = mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1"), AutoScalerConfig.class);
AutoScalerConfig autoScalerConfigValue = mapper.convertValue(
ImmutableMap.of("lagCollectionIntervalMillis", "1"),
AutoScalerConfig.class
);
Assert.assertTrue(autoScalerConfigValue instanceof LagBasedAutoScalerConfig);
LagBasedAutoScalerConfig lagBasedAutoScalerConfig = (LagBasedAutoScalerConfig) autoScalerConfigValue;
Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 1);
Exception e = null;
try {
AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true", "taskCountMax", "1", "taskCountMin", "4"), AutoScalerConfig.class);
AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of(
"enableTaskAutoScaler",
"true",
"taskCountMax",
"1",
"taskCountMin",
"4"
), AutoScalerConfig.class);
}
catch (RuntimeException ex) {
e = ex;
@ -550,14 +566,15 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
e = null;
try {
// taskCountMax and taskCountMin couldn't be ignored.
AutoScalerConfig autoScalerError2 = mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true"), AutoScalerConfig.class);
AutoScalerConfig autoScalerError2 = mapper.convertValue(
ImmutableMap.of("enableTaskAutoScaler", "true"),
AutoScalerConfig.class
);
}
catch (RuntimeException ex) {
e = ex;
}
Assert.assertNotNull(e);
}
@Test
@ -584,51 +601,60 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
.andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class))
.anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
EasyMock.replay(supervisor4);
TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(ingestionSchema,
null,
false,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig,
supervisor4,
"id1");
TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
ingestionSchema,
null,
false,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig,
supervisor4,
"id1"
);
SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);
EasyMock.reset(seekableStreamSupervisorIOConfig);
autoScalerConfig.put("enableTaskAutoScaler", false);
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
.andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class))
.anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
SupervisorTaskAutoScaler autoscaler2 = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler2 instanceof NoopTaskAutoScaler);
EasyMock.reset(seekableStreamSupervisorIOConfig);
autoScalerConfig.remove("enableTaskAutoScaler");
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
.andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class))
.anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
SupervisorTaskAutoScaler autoscaler3 = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler3 instanceof NoopTaskAutoScaler);
EasyMock.reset(seekableStreamSupervisorIOConfig);
autoScalerConfig.clear();
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
.andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class))
.anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
Assert.assertTrue(autoScalerConfig.isEmpty());
SupervisorTaskAutoScaler autoscaler4 = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler);
}
@Test
@ -639,26 +665,38 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1", "enableTaskAutoScaler", true, "taskCountMax", "4", "taskCountMin", "1"), AutoScalerConfig.class)).anyTimes();
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
.andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis",
"1",
"enableTaskAutoScaler",
true,
"taskCountMax",
"4",
"taskCountMin",
"1"
), AutoScalerConfig.class))
.anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
EasyMock.replay(supervisor4);
TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(ingestionSchema,
null,
false,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig,
supervisor4,
"id1");
TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
ingestionSchema,
null,
false,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig,
supervisor4,
"id1"
);
SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);
LagBasedAutoScaler lagBasedAutoScaler = (LagBasedAutoScaler) autoscaler;
@ -679,7 +717,6 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
@Test
public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedException
{
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
@ -700,27 +737,31 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
LagStats lagStats = supervisor.computeLagStats();
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleOutProperties(2), LagBasedAutoScalerConfig.class), spec);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
DATASOURCE,
mapper.convertValue(
getScaleOutProperties(2),
LagBasedAutoScalerConfig.class
),
spec
);
supervisor.start();
autoScaler.start();
supervisor.runInternal();
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountBeforeScaleOut);
Thread.sleep(1 * 1000);
Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScaleOut);
autoScaler.reset();
autoScaler.stop();
}
@Test
public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws InterruptedException
{
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
@ -740,28 +781,31 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
EasyMock.replay(taskMaster);
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(2);
LagStats lagStats = supervisor.computeLagStats();
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleOutProperties(3), LagBasedAutoScalerConfig.class), spec);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
DATASOURCE,
mapper.convertValue(
getScaleOutProperties(3),
LagBasedAutoScalerConfig.class
),
spec
);
supervisor.start();
autoScaler.start();
supervisor.runInternal();
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountBeforeScaleOut);
Thread.sleep(1 * 1000);
Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScaleOut);
autoScaler.reset();
autoScaler.stop();
}
@Test
public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedException
{
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
@ -781,7 +825,15 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
EasyMock.replay(taskMaster);
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleInProperties(), LagBasedAutoScalerConfig.class), spec);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
DATASOURCE,
mapper.convertValue(
getScaleInProperties(),
LagBasedAutoScalerConfig.class
),
spec
);
// enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin.
Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
@ -791,7 +843,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
supervisor.runInternal();
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountBeforeScaleOut);
Thread.sleep(1 * 1000);
Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountAfterScaleOut);
@ -802,20 +854,21 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
@Test
public void testSeekableStreamSupervisorSpecWithScaleDisable() throws InterruptedException
{
SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
1,
1,
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null, null, null
) {};
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
1,
1,
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null, null, null
)
{
};
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
@ -856,16 +909,16 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
dimensions.add(StringDimensionSchema.create("dim2"));
return new DataSchema(
DATASOURCE,
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(dimensions),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
ImmutableList.of()
),
null
DATASOURCE,
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(dimensions),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
ImmutableList.of()
),
null
);
}
@ -873,32 +926,38 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
{
if (scaleOut) {
return new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
1,
taskCount,
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null, mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), null
) {};
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
1,
taskCount,
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), null
)
{
};
} else {
return new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
1,
taskCount,
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null, mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), null
) {};
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
1,
taskCount,
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), null
)
{
};
}
}