From 2efb74ff1e4a9921e236a21475978822be57fe11 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Thu, 10 Mar 2022 05:47:12 +0530 Subject: [PATCH] fix supervisor auto scaler config serde bug (#12317) --- .../supervisor/KafkaSupervisorIOConfig.java | 2 +- .../KafkaSupervisorIOConfigTest.java | 57 +++ .../supervisor/KinesisSupervisorIOConfig.java | 2 +- .../supervisor/KinesisSupervisorTest.java | 4 +- .../supervisor/SeekableStreamSupervisor.java | 2 +- .../SeekableStreamSupervisorIOConfig.java | 2 +- .../SeekableStreamSupervisorSpec.java | 2 +- .../SeekableStreamSupervisorSpecTest.java | 465 ++++++++++-------- 8 files changed, 326 insertions(+), 210 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index 87b689e6e64..0625ead0973 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -121,7 +121,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig ", taskCount=" + getTaskCount() + ", taskDuration=" + getTaskDuration() + ", consumerProperties=" + consumerProperties + - ", autoScalerConfig=" + getAutoscalerConfig() + + ", autoScalerConfig=" + getAutoScalerConfig() + ", pollTimeout=" + pollTimeout + ", startDelay=" + getStartDelay() + ", period=" + getPeriod() + diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 50866890b0f..e503d4fd7c7 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -19,21 +19,27 @@ package org.apache.druid.indexing.kafka.supervisor; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.indexing.kafka.KafkaRecordSupplier; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.hamcrest.CoreMatchers; import org.joda.time.Duration; +import org.joda.time.Period; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; public class KafkaSupervisorIOConfigTest @@ -260,4 +266,55 @@ public class KafkaSupervisorIOConfigTest ), KafkaSupervisorIOConfig.class ); } + + @Test + public void testAutoScalingConfigSerde() throws JsonProcessingException + { + HashMap autoScalerConfig = new HashMap<>(); + autoScalerConfig.put("enableTaskAutoScaler", true); + autoScalerConfig.put("lagCollectionIntervalMillis", 500); + autoScalerConfig.put("lagCollectionRangeMillis", 500); + autoScalerConfig.put("scaleOutThreshold", 0); + autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0); + autoScalerConfig.put("scaleInThreshold", 1000000); + autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8); + autoScalerConfig.put("scaleActionStartDelayMillis", 0); + autoScalerConfig.put("scaleActionPeriodMillis", 100); + autoScalerConfig.put("taskCountMax", 2); + autoScalerConfig.put("taskCountMin", 1); + autoScalerConfig.put("scaleInStep", 1); + autoScalerConfig.put("scaleOutStep", 2); + autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000); + + final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(); + consumerProperties.put("bootstrap.servers", "localhost:8082"); + + KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( + "test", + null, + 1, + 1, + new Period("PT1H"), + consumerProperties, + mapper.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + new Period("P1D"), + new Period("PT30S"), + true, + new Period("PT30M"), + null, + null, + null + ); + String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); + KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class); + Assert.assertNotNull(kafkaSupervisorIOConfig1.getAutoScalerConfig()); + Assert.assertTrue(kafkaSupervisorIOConfig1.getAutoScalerConfig().getEnableTaskAutoScaler()); + Assert.assertEquals(1, kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMin()); + Assert.assertEquals(2, kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMax()); + Assert.assertEquals( + 1200000, + kafkaSupervisorIOConfig1.getAutoScalerConfig().getMinTriggerScaleActionFrequencyMillis() + ); + } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java index 41ae8767b2e..220c22fcf47 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java @@ -151,7 +151,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig ", endpoint='" + endpoint + '\'' + ", replicas=" + getReplicas() + ", taskCount=" + getTaskCount() + - ", autoScalerConfig=" + getAutoscalerConfig() + + ", autoScalerConfig=" + getAutoScalerConfig() + ", taskDuration=" + getTaskDuration() + ", startDelay=" + getStartDelay() + ", period=" + getPeriod() + diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 02b5bd20b97..3bfe6dd8aa2 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -531,7 +531,7 @@ public class KinesisSupervisorTest extends EasyMockSupport false ); - AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoscalerConfig(); + AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoScalerConfig(); Assert.assertNull(autoscalerConfigNull); // create KinesisSupervisorIOConfig with autoScalerConfig Empty @@ -558,7 +558,7 @@ public class KinesisSupervisorTest extends EasyMockSupport false ); - AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoscalerConfig(); + AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoScalerConfig(); Assert.assertNotNull(autoscalerConfig); Assert.assertTrue(autoscalerConfig instanceof LagBasedAutoScalerConfig); Assert.assertFalse(autoscalerConfig.getEnableTaskAutoScaler()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 86579a02b12..e4445f20a2e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -765,7 +765,7 @@ public abstract class SeekableStreamSupervisor startPartitions, - Map endPartitions, - String baseSequenceName, - DateTime minimumMessageTime, - DateTime maximumMessageTime, - Set exclusiveStartSequenceNumberPartitions, - SeekableStreamSupervisorIOConfig ioConfig + int groupId, + Map startPartitions, + Map endPartitions, + String baseSequenceName, + DateTime minimumMessageTime, + DateTime maximumMessageTime, + Set exclusiveStartSequenceNumberPartitions, + SeekableStreamSupervisorIOConfig ioConfig ) { return new SeekableStreamIndexTaskIOConfig( - groupId, - baseSequenceName, - new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions), - new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions), - true, - minimumMessageTime, - maximumMessageTime, - ioConfig.getInputFormat() + groupId, + baseSequenceName, + new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions), + new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions), + true, + minimumMessageTime, + maximumMessageTime, + ioConfig.getInputFormat() ) { }; @@ -199,13 +199,13 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport @Override protected List> createIndexTasks( - int replicas, - String baseSequenceName, - ObjectMapper sortingMapper, - TreeMap> sequenceOffsets, - SeekableStreamIndexTaskIOConfig taskIoConfig, - SeekableStreamIndexTaskTuningConfig taskTuningConfig, - RowIngestionMetersFactory rowIngestionMetersFactory + int replicas, + String baseSequenceName, + ObjectMapper sortingMapper, + TreeMap> sequenceOffsets, + SeekableStreamIndexTaskIOConfig taskIoConfig, + SeekableStreamIndexTaskTuningConfig taskTuningConfig, + RowIngestionMetersFactory rowIngestionMetersFactory ) { return null; @@ -231,8 +231,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport @Override protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( - String stream, - Map map + String stream, + Map map ) { return null; @@ -271,27 +271,27 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport @Override protected SeekableStreamSupervisorReportPayload createReportPayload( - int numPartitions, - boolean includeOffsets + int numPartitions, + boolean includeOffsets ) { return new SeekableStreamSupervisorReportPayload( - DATASOURCE, - STREAM, - 1, - 1, - 1L, - null, - null, - null, - null, - null, - null, - false, - true, - null, - null, - null + DATASOURCE, + STREAM, + 1, + 1, + 1L, + null, + null, + null, + null, + null, + null, + false, + true, + null, + null, + null ) { }; @@ -340,7 +340,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport @Override protected void scheduleReporting(ScheduledExecutorService reportingExec) { - // do nothing + // do nothing } @Override @@ -362,34 +362,37 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport private SeekableStreamSupervisor supervisor; private String id; - public TestSeekableStreamSupervisorSpec(SeekableStreamSupervisorIngestionSpec ingestionSchema, - @Nullable Map context, - Boolean suspended, - TaskStorage taskStorage, - TaskMaster taskMaster, - IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, - SeekableStreamIndexTaskClientFactory indexTaskClientFactory, - ObjectMapper mapper, - ServiceEmitter emitter, - DruidMonitorSchedulerConfig monitorSchedulerConfig, - RowIngestionMetersFactory rowIngestionMetersFactory, - SupervisorStateManagerConfig supervisorStateManagerConfig, - SeekableStreamSupervisor supervisor, - String id) + public TestSeekableStreamSupervisorSpec( + SeekableStreamSupervisorIngestionSpec ingestionSchema, + @Nullable Map context, + Boolean suspended, + TaskStorage taskStorage, + TaskMaster taskMaster, + IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, + SeekableStreamIndexTaskClientFactory indexTaskClientFactory, + ObjectMapper mapper, + ServiceEmitter emitter, + DruidMonitorSchedulerConfig monitorSchedulerConfig, + RowIngestionMetersFactory rowIngestionMetersFactory, + SupervisorStateManagerConfig supervisorStateManagerConfig, + SeekableStreamSupervisor supervisor, + String id + ) { super( - ingestionSchema, - context, - suspended, - taskStorage, - taskMaster, - indexerMetadataStorageCoordinator, - indexTaskClientFactory, - mapper, - emitter, - monitorSchedulerConfig, - rowIngestionMetersFactory, - supervisorStateManagerConfig); + ingestionSchema, + context, + suspended, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig + ); this.supervisor = supervisor; this.id = id; @@ -482,26 +485,26 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() { return new SeekableStreamIndexTaskTuningConfig( - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null ) { @Override @@ -530,17 +533,30 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport AutoScalerConfig autoScalerConfigNull = mapper.convertValue(null, AutoScalerConfig.class); Assert.assertNull(autoScalerConfigNull); - AutoScalerConfig autoScalerConfigDefault = mapper.convertValue(ImmutableMap.of("autoScalerStrategy", "lagBased"), AutoScalerConfig.class); + AutoScalerConfig autoScalerConfigDefault = mapper.convertValue( + ImmutableMap.of("autoScalerStrategy", "lagBased"), + AutoScalerConfig.class + ); Assert.assertTrue(autoScalerConfigDefault instanceof LagBasedAutoScalerConfig); - AutoScalerConfig autoScalerConfigValue = mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1"), AutoScalerConfig.class); + AutoScalerConfig autoScalerConfigValue = mapper.convertValue( + ImmutableMap.of("lagCollectionIntervalMillis", "1"), + AutoScalerConfig.class + ); Assert.assertTrue(autoScalerConfigValue instanceof LagBasedAutoScalerConfig); LagBasedAutoScalerConfig lagBasedAutoScalerConfig = (LagBasedAutoScalerConfig) autoScalerConfigValue; Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 1); Exception e = null; try { - AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true", "taskCountMax", "1", "taskCountMin", "4"), AutoScalerConfig.class); + AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of( + "enableTaskAutoScaler", + "true", + "taskCountMax", + "1", + "taskCountMin", + "4" + ), AutoScalerConfig.class); } catch (RuntimeException ex) { e = ex; @@ -550,14 +566,15 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport e = null; try { // taskCountMax and taskCountMin couldn't be ignored. - AutoScalerConfig autoScalerError2 = mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true"), AutoScalerConfig.class); + AutoScalerConfig autoScalerError2 = mapper.convertValue( + ImmutableMap.of("enableTaskAutoScaler", "true"), + AutoScalerConfig.class + ); } catch (RuntimeException ex) { e = ex; } Assert.assertNotNull(e); - - } @Test @@ -584,51 +601,60 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); EasyMock.replay(ingestionSchema); - EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes(); + EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig()) + .andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); EasyMock.replay(seekableStreamSupervisorIOConfig); EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes(); EasyMock.replay(supervisor4); - TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(ingestionSchema, - null, - false, - taskStorage, - taskMaster, - indexerMetadataStorageCoordinator, - indexTaskClientFactory, - mapper, - emitter, - monitorSchedulerConfig, - rowIngestionMetersFactory, - supervisorStateManagerConfig, - supervisor4, - "id1"); + TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec( + ingestionSchema, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ); SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4); Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler); EasyMock.reset(seekableStreamSupervisorIOConfig); autoScalerConfig.put("enableTaskAutoScaler", false); - EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes(); + EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig()) + .andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); EasyMock.replay(seekableStreamSupervisorIOConfig); SupervisorTaskAutoScaler autoscaler2 = spec.createAutoscaler(supervisor4); Assert.assertTrue(autoscaler2 instanceof NoopTaskAutoScaler); EasyMock.reset(seekableStreamSupervisorIOConfig); autoScalerConfig.remove("enableTaskAutoScaler"); - EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes(); + EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig()) + .andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); EasyMock.replay(seekableStreamSupervisorIOConfig); SupervisorTaskAutoScaler autoscaler3 = spec.createAutoscaler(supervisor4); Assert.assertTrue(autoscaler3 instanceof NoopTaskAutoScaler); EasyMock.reset(seekableStreamSupervisorIOConfig); autoScalerConfig.clear(); - EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes(); + EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig()) + .andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); EasyMock.replay(seekableStreamSupervisorIOConfig); Assert.assertTrue(autoScalerConfig.isEmpty()); SupervisorTaskAutoScaler autoscaler4 = spec.createAutoscaler(supervisor4); Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler); - } @Test @@ -639,26 +665,38 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); EasyMock.replay(ingestionSchema); - EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1", "enableTaskAutoScaler", true, "taskCountMax", "4", "taskCountMin", "1"), AutoScalerConfig.class)).anyTimes(); + EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig()) + .andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", + "1", + "enableTaskAutoScaler", + true, + "taskCountMax", + "4", + "taskCountMin", + "1" + ), AutoScalerConfig.class)) + .anyTimes(); EasyMock.replay(seekableStreamSupervisorIOConfig); EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes(); EasyMock.replay(supervisor4); - TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(ingestionSchema, - null, - false, - taskStorage, - taskMaster, - indexerMetadataStorageCoordinator, - indexTaskClientFactory, - mapper, - emitter, - monitorSchedulerConfig, - rowIngestionMetersFactory, - supervisorStateManagerConfig, - supervisor4, - "id1"); + TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec( + ingestionSchema, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ); SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4); Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler); LagBasedAutoScaler lagBasedAutoScaler = (LagBasedAutoScaler) autoscaler; @@ -679,7 +717,6 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport @Test public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedException { - EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); @@ -700,27 +737,31 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3); - LagStats lagStats = supervisor.computeLagStats(); - - LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleOutProperties(2), LagBasedAutoScalerConfig.class), spec); + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( + supervisor, + DATASOURCE, + mapper.convertValue( + getScaleOutProperties(2), + LagBasedAutoScalerConfig.class + ), + spec + ); supervisor.start(); autoScaler.start(); supervisor.runInternal(); int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountBeforeScaleOut); - Thread.sleep(1 * 1000); + Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScaleOut); autoScaler.reset(); autoScaler.stop(); - } @Test public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws InterruptedException { - EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); @@ -740,28 +781,31 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport EasyMock.replay(taskMaster); TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(2); - - LagStats lagStats = supervisor.computeLagStats(); - - LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleOutProperties(3), LagBasedAutoScalerConfig.class), spec); + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( + supervisor, + DATASOURCE, + mapper.convertValue( + getScaleOutProperties(3), + LagBasedAutoScalerConfig.class + ), + spec + ); supervisor.start(); autoScaler.start(); supervisor.runInternal(); int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountBeforeScaleOut); - Thread.sleep(1 * 1000); + Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScaleOut); autoScaler.reset(); autoScaler.stop(); - } @Test public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedException { - EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); @@ -781,7 +825,15 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport EasyMock.replay(taskMaster); TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3); - LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleInProperties(), LagBasedAutoScalerConfig.class), spec); + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( + supervisor, + DATASOURCE, + mapper.convertValue( + getScaleInProperties(), + LagBasedAutoScalerConfig.class + ), + spec + ); // enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin. Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); @@ -791,7 +843,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport supervisor.runInternal(); int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountBeforeScaleOut); - Thread.sleep(1 * 1000); + Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountAfterScaleOut); @@ -802,20 +854,21 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport @Test public void testSeekableStreamSupervisorSpecWithScaleDisable() throws InterruptedException { - SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new SeekableStreamSupervisorIOConfig( - "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), - 1, - 1, - new Period("PT1H"), - new Period("P1D"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, null, null - ) {}; + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + 1, + 1, + new Period("PT1H"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, null, null + ) + { + }; EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); @@ -856,16 +909,16 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport dimensions.add(StringDimensionSchema.create("dim2")); return new DataSchema( - DATASOURCE, - new TimestampSpec("timestamp", "iso", null), - new DimensionsSpec(dimensions), - new AggregatorFactory[]{new CountAggregatorFactory("rows")}, - new UniformGranularitySpec( - Granularities.HOUR, - Granularities.NONE, - ImmutableList.of() - ), - null + DATASOURCE, + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(dimensions), + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.NONE, + ImmutableList.of() + ), + null ); } @@ -873,32 +926,38 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport { if (scaleOut) { return new SeekableStreamSupervisorIOConfig( - "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), - 1, - taskCount, - new Period("PT1H"), - new Period("P1D"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), null - ) {}; + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + 1, + taskCount, + new Period("PT1H"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), null + ) + { + }; } else { return new SeekableStreamSupervisorIOConfig( - "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), - 1, - taskCount, - new Period("PT1H"), - new Period("P1D"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), null - ) {}; + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + 1, + taskCount, + new Period("PT1H"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), null + ) + { + }; } }