From 60daddedf8b0a9faf1edeb4603eb4b8e86fd3dc2 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 22 Oct 2024 20:21:21 -0700 Subject: [PATCH] SeekableStreamSupervisor: Use workerExec as the client connectExec. (#17394) * SeekableStreamSupervisor: Use workerExec as the client connectExec. This patch uses the already-existing per-supervisor workerExec as the connectExec for task clients, rather than using the process-wide default ServiceClientFactory pool. This helps prevent callbacks from backlogging on the process-wide pool. It's especially useful for retries, where callbacks may need to establish new TCP connections or perform TLS handshakes. * Fix compilation, tests. * Fix style. --- .../RabbitStreamIndexTaskClientFactory.java | 6 +- .../kafka/KafkaIndexTaskClientFactory.java | 6 +- .../kafka/supervisor/KafkaSupervisorTest.java | 975 +++++++++++++----- .../KinesisIndexTaskClientFactory.java | 6 +- .../supervisor/KinesisSupervisorTest.java | 24 +- .../SeekableStreamIndexTaskClientFactory.java | 27 +- .../supervisor/SeekableStreamSupervisor.java | 36 +- .../SeekableStreamSupervisorStateTest.java | 2 +- 8 files changed, 773 insertions(+), 309 deletions(-) diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskClientFactory.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskClientFactory.java index 83cdde8191f..088dc6b77d5 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskClientFactory.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskClientFactory.java @@ -25,17 +25,17 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Json; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; -import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.java.util.http.client.HttpClient; @LazySingleton public class RabbitStreamIndexTaskClientFactory extends SeekableStreamIndexTaskClientFactory { @Inject public RabbitStreamIndexTaskClientFactory( - @EscalatedGlobal ServiceClientFactory serviceClientFactory, + @EscalatedGlobal HttpClient httpClient, @Json ObjectMapper mapper) { - super(serviceClientFactory, mapper); + super(httpClient, mapper); } @Override diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientFactory.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientFactory.java index 73a880b524b..10b57dd4757 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientFactory.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientFactory.java @@ -26,18 +26,18 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Json; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; -import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.java.util.http.client.HttpClient; @LazySingleton public class KafkaIndexTaskClientFactory extends SeekableStreamIndexTaskClientFactory { @Inject public KafkaIndexTaskClientFactory( - @EscalatedGlobal ServiceClientFactory serviceClientFactory, + @EscalatedGlobal HttpClient httpClient, @Json ObjectMapper mapper ) { - super(serviceClientFactory, mapper); + super(httpClient, mapper); } @Override diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index e436b8cd56a..7926a0568fd 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -127,6 +127,7 @@ import java.util.Map; import java.util.Properties; import java.util.TreeMap; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; @RunWith(Parameterized.class) public class KafkaSupervisorTest extends EasyMockSupport @@ -202,10 +203,11 @@ public class KafkaSupervisorTest extends EasyMockSupport zkServer.getConnectString(), null, 1, - ImmutableMap.of("num.partitions", - String.valueOf(NUM_PARTITIONS), - "auto.create.topics.enable", - String.valueOf(false) + ImmutableMap.of( + "num.partitions", + String.valueOf(NUM_PARTITIONS), + "auto.create.topics.enable", + String.valueOf(false) ) ); kafkaServer.start(); @@ -267,11 +269,10 @@ public class KafkaSupervisorTest extends EasyMockSupport public SeekableStreamIndexTaskClient build( final String dataSource, final TaskInfoProvider taskInfoProvider, - final int maxNumTasks, - final SeekableStreamSupervisorTuningConfig tuningConfig + final SeekableStreamSupervisorTuningConfig tuningConfig, + final ScheduledExecutorService connectExec ) { - Assert.assertEquals(replicas * taskCountMax, maxNumTasks); Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), tuningConfig.getHttpTimeout()); Assert.assertEquals(TEST_CHAT_RETRIES, (long) tuningConfig.getChatRetries()); return taskClient; @@ -299,53 +300,53 @@ public class KafkaSupervisorTest extends EasyMockSupport consumerProperties.put("bootstrap.servers", kafkaHost); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( - topic, - null, - INPUT_FORMAT, - replicas, - 1, - new Period("PT1H"), - consumerProperties, - OBJECT_MAPPER.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class), - KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, - new Period("P1D"), - new Period("PT30S"), - true, - new Period("PT30M"), - null, - null, - null, - null, - new IdleConfig(true, 1000L), - 1 + topic, + null, + INPUT_FORMAT, + replicas, + 1, + new Period("PT1H"), + consumerProperties, + OBJECT_MAPPER.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + new Period("P1D"), + new Period("PT30S"), + true, + new Period("PT30M"), + null, + null, + null, + null, + new IdleConfig(true, 1000L), + 1 ); final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig( - null, - 1000, - null, - null, - 50000, - null, - new Period("P1Y"), - null, - null, - null, - false, - null, - false, - null, - numThreads, - TEST_CHAT_RETRIES, - TEST_HTTP_TIMEOUT, - TEST_SHUTDOWN_TIMEOUT, - null, - null, - null, - null, - null, - null, - null + null, + 1000, + null, + null, + 50000, + null, + new Period("P1Y"), + null, + null, + null, + false, + null, + false, + null, + numThreads, + TEST_CHAT_RETRIES, + TEST_HTTP_TIMEOUT, + TEST_SHUTDOWN_TIMEOUT, + null, + null, + null, + null, + null, + null, + null ); EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes(); @@ -354,31 +355,31 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.replay(ingestionSchema); SeekableStreamSupervisorSpec testableSupervisorSpec = new KafkaSupervisorSpec( - ingestionSchema, - dataSchema, - tuningConfigOri, - kafkaSupervisorIOConfig, - null, - false, - taskStorage, - taskMaster, - indexerMetadataStorageCoordinator, - taskClientFactory, - OBJECT_MAPPER, - new NoopServiceEmitter(), - new DruidMonitorSchedulerConfig(), - rowIngestionMetersFactory, - new SupervisorStateManagerConfig() + ingestionSchema, + dataSchema, + tuningConfigOri, + kafkaSupervisorIOConfig, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + new NoopServiceEmitter(), + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory, + new SupervisorStateManagerConfig() ); supervisor = new TestableKafkaSupervisor( - taskStorage, - taskMaster, - indexerMetadataStorageCoordinator, - taskClientFactory, - OBJECT_MAPPER, - (KafkaSupervisorSpec) testableSupervisorSpec, - rowIngestionMetersFactory + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + (KafkaSupervisorSpec) testableSupervisorSpec, + rowIngestionMetersFactory ); SupervisorTaskAutoScaler autoscaler = testableSupervisorSpec.createAutoscaler(supervisor); @@ -393,9 +394,9 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( - new KafkaDataSourceMetadata( - null - ) + new KafkaDataSourceMetadata( + null + ) ).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).anyTimes(); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); @@ -425,22 +426,43 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent()); Assert.assertEquals(topic, taskConfig.getStartSequenceNumbers().getStream()); - Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0))); - Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1))); - Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2))); + Assert.assertEquals( + 0L, + (long) taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + ); + Assert.assertEquals( + 0L, + (long) taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + ); + Assert.assertEquals( + 0L, + (long) taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + ); Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream()); Assert.assertEquals( - Long.MAX_VALUE, - (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)) + Long.MAX_VALUE, + (long) taskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) ); Assert.assertEquals( - Long.MAX_VALUE, - (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)) + Long.MAX_VALUE, + (long) taskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) ); Assert.assertEquals( - Long.MAX_VALUE, - (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)) + Long.MAX_VALUE, + (long) taskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) ); Assert.assertEquals( Collections.singleton(new ResourceAction( @@ -543,22 +565,43 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent()); Assert.assertEquals(topic, taskConfig.getStartSequenceNumbers().getStream()); - Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0))); - Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1))); - Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2))); + Assert.assertEquals( + 0L, + (long) taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + ); + Assert.assertEquals( + 0L, + (long) taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + ); + Assert.assertEquals( + 0L, + (long) taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + ); Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream()); Assert.assertEquals( Long.MAX_VALUE, - (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)) + (long) taskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) ); Assert.assertEquals( Long.MAX_VALUE, - (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)) + (long) taskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) ); Assert.assertEquals( Long.MAX_VALUE, - (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)) + (long) taskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) ); } @@ -613,19 +656,35 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals(2, task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size()); Assert.assertEquals( 0L, - task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + task1.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( Long.MAX_VALUE, - task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + task1.getIOConfig() + .getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( 0L, - task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + task1.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); Assert.assertEquals( Long.MAX_VALUE, - task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + task1.getIOConfig() + .getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); KafkaIndexTask task2 = captured.getValues().get(1); @@ -633,11 +692,19 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals(1, task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size()); Assert.assertEquals( 0L, - task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + task2.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( Long.MAX_VALUE, - task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + task2.getIOConfig() + .getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); } @@ -668,15 +735,27 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals(3, task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size()); Assert.assertEquals( 0L, - task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + task1.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( 0L, - task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + task1.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( 0L, - task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + task1.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); KafkaIndexTask task2 = captured.getValues().get(1); @@ -684,15 +763,27 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals(3, task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size()); Assert.assertEquals( 0L, - task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + task2.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( 0L, - task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + task2.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( 0L, - task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + task2.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); } @@ -802,15 +893,27 @@ public class KafkaSupervisorTest extends EasyMockSupport KafkaIndexTask task = captured.getValue(); Assert.assertEquals( 1101L, - task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + task.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( 1101L, - task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + task.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( 1101L, - task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + task.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); } @@ -867,15 +970,27 @@ public class KafkaSupervisorTest extends EasyMockSupport KafkaIndexTask task = captured.getValue(); Assert.assertEquals( 10, - task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + task.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( 10, - task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + task.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( 10, - task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + task.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); addMoreEvents(9, 6); @@ -899,15 +1014,27 @@ public class KafkaSupervisorTest extends EasyMockSupport task = newcaptured.getValue(); Assert.assertEquals( 0, - task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 3)).longValue() + task.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 3)) + .longValue() ); Assert.assertEquals( 0, - task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 4)).longValue() + task.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 4)) + .longValue() ); Assert.assertEquals( 0, - task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 5)).longValue() + task.getIOConfig() + .getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 5)) + .longValue() ); } @@ -928,7 +1055,11 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()) + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ) ) ).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); @@ -943,15 +1074,24 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); Assert.assertEquals( 10L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( 20L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( 30L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); } @@ -991,15 +1131,24 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); Assert.assertEquals( 10L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(true, topic, 0)) + .longValue() ); Assert.assertEquals( 20L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(true, topic, 1)) + .longValue() ); Assert.assertEquals( 30L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(true, topic, 2)) + .longValue() ); Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); } @@ -1040,15 +1189,24 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); Assert.assertEquals( 0L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(true, topic, 0)) + .longValue() ); Assert.assertEquals( 0L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(true, topic, 1)) + .longValue() ); Assert.assertEquals( 0L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(true, topic, 2)) + .longValue() ); Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); } @@ -1074,7 +1232,11 @@ public class KafkaSupervisorTest extends EasyMockSupport partitionSequenceNumberMap.putAll(multiTopicPartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L)); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(topicPattern, partitionSequenceNumberMap.build(), ImmutableSet.of()) + new SeekableStreamStartSequenceNumbers<>( + topicPattern, + partitionSequenceNumberMap.build(), + ImmutableSet.of() + ) ) ).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); @@ -1089,15 +1251,24 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); Assert.assertEquals( 10L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( 20L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( 30L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); } @@ -1139,15 +1310,24 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); Assert.assertEquals( 10L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(true, topic, 0)) + .longValue() ); Assert.assertEquals( 20L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(true, topic, 1)) + .longValue() ); Assert.assertEquals( 30L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(true, topic, 2)) + .longValue() ); Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); } @@ -1239,7 +1419,10 @@ public class KafkaSupervisorTest extends EasyMockSupport DATASOURCE, 0, new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 0L, 2, 0L), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>("topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + new SeekableStreamEndSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE) + ), null, null, tuningConfig @@ -1258,7 +1441,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id3", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -1272,7 +1459,10 @@ public class KafkaSupervisorTest extends EasyMockSupport DATASOURCE, 0, new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 0L, 1, 0L), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>("topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE)), + new SeekableStreamEndSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE) + ), null, null, tuningConfig @@ -1282,7 +1472,10 @@ public class KafkaSupervisorTest extends EasyMockSupport DATASOURCE, 0, new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 1, 0L, 2, 0L), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>("topic", singlePartitionMap(topic, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + new SeekableStreamEndSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) + ), null, null, tuningConfig @@ -1443,7 +1636,10 @@ public class KafkaSupervisorTest extends EasyMockSupport DATASOURCE, 0, new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 0L, 2, 0L), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>("topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + new SeekableStreamEndSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE) + ), now, maxi, supervisor.getTuningConfig() @@ -1726,8 +1922,18 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); Assert.assertEquals(topic, taskConfig.getStartSequenceNumbers().getStream()); - Assert.assertEquals(10L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0))); - Assert.assertEquals(35L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2))); + Assert.assertEquals( + 10L, + (long) taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + ); + Assert.assertEquals( + 35L, + (long) taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + ); } } @@ -1744,7 +1950,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -1825,29 +2035,47 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals(topic, capturedTaskConfig.getStartSequenceNumbers().getStream()); Assert.assertEquals( 10L, - capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + capturedTaskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( 20L, - capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + capturedTaskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( 30L, - capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + capturedTaskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); Assert.assertEquals(topic, capturedTaskConfig.getEndSequenceNumbers().getStream()); Assert.assertEquals( Long.MAX_VALUE, - capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + capturedTaskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( Long.MAX_VALUE, - capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + capturedTaskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( Long.MAX_VALUE, - capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + capturedTaskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); } @@ -1865,7 +2093,10 @@ public class KafkaSupervisorTest extends EasyMockSupport DATASOURCE, 0, new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 0L, 2, 0L), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>("topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + new SeekableStreamEndSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE) + ), null, null, supervisor.getTuningConfig() @@ -1936,29 +2167,47 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals(topic, capturedTaskConfig.getStartSequenceNumbers().getStream()); Assert.assertEquals( 10L, - capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + capturedTaskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( 0L, - capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + capturedTaskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( 30L, - capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + capturedTaskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); Assert.assertEquals(topic, capturedTaskConfig.getEndSequenceNumbers().getStream()); Assert.assertEquals( Long.MAX_VALUE, - capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + capturedTaskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( Long.MAX_VALUE, - capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + capturedTaskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( Long.MAX_VALUE, - capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + capturedTaskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); } @@ -1977,7 +2226,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -1991,7 +2244,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id2", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 1L, 1, 2L, 2, 3L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 1L, 1, 2L, 2, 3L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -2407,7 +2664,17 @@ public class KafkaSupervisorTest extends EasyMockSupport taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( - new SeekableStreamEndSequenceNumbers(topic, singlePartitionMap(topic, 0, 2L, 1, 2L, 2, 2L)) + new SeekableStreamEndSequenceNumbers( + topic, + singlePartitionMap(topic, + 0, + 2L, + 1, + 2L, + 2, + 2L + ) + ) ) ).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); @@ -2455,7 +2722,17 @@ public class KafkaSupervisorTest extends EasyMockSupport taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( - new SeekableStreamEndSequenceNumbers(topic, singlePartitionMap(topic, 0, 2L, 1, 2L, 2, 2L)) + new SeekableStreamEndSequenceNumbers( + topic, + singlePartitionMap(topic, + 0, + 2L, + 1, + 2L, + 2, + 2L + ) + ) ) ).anyTimes(); @@ -2484,7 +2761,8 @@ public class KafkaSupervisorTest extends EasyMockSupport public void testSupervisorIsIdleIfStreamInactiveWhenSuspended() throws Exception { Map config = ImmutableMap.of("idleConfig.enabled", "false", - "idleConfig.inactiveAfterMillis", "200"); + "idleConfig.inactiveAfterMillis", "200" + ); supervisorConfig = OBJECT_MAPPER.convertValue(config, SupervisorStateManagerConfig.class); supervisor = getTestableSupervisorForIdleBehaviour( 1, @@ -2504,7 +2782,17 @@ public class KafkaSupervisorTest extends EasyMockSupport taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( - new SeekableStreamEndSequenceNumbers(topic, singlePartitionMap(topic, 0, 2L, 1, 2L, 2, 2L)) + new SeekableStreamEndSequenceNumbers( + topic, + singlePartitionMap(topic, + 0, + 2L, + 1, + 2L, + 2, + 2L + ) + ) ) ).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); @@ -2836,8 +3124,18 @@ public class KafkaSupervisorTest extends EasyMockSupport for (Task task : captured.getValues()) { KafkaIndexTaskIOConfig taskConfig = ((KafkaIndexTask) task).getIOConfig(); - Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0))); - Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2))); + Assert.assertEquals( + 0L, + (long) taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + ); + Assert.assertEquals( + 0L, + (long) taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + ); } } @@ -2930,8 +3228,18 @@ public class KafkaSupervisorTest extends EasyMockSupport for (Task task : captured.getValues()) { KafkaIndexTaskIOConfig taskConfig = ((KafkaIndexTask) task).getIOConfig(); - Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0))); - Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2))); + Assert.assertEquals( + 0L, + (long) taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + ); + Assert.assertEquals( + 0L, + (long) taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + ); } } @@ -2972,7 +3280,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -2986,7 +3298,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id2", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3000,7 +3316,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id3", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3132,7 +3452,11 @@ public class KafkaSupervisorTest extends EasyMockSupport ); KafkaDataSourceMetadata resetMetadata = new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 1, 1000L, 2, 1000L), ImmutableSet.of()) + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 1, 1000L, 2, 1000L), + ImmutableSet.of() + ) ); KafkaDataSourceMetadata expectedMetadata = new KafkaDataSourceMetadata( @@ -3249,7 +3573,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3263,7 +3591,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id2", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3277,7 +3609,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id3", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3355,7 +3691,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3369,7 +3709,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id2", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3383,7 +3727,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id3", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3459,7 +3807,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( topic, singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3473,7 +3825,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id2", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( topic, singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3487,7 +3843,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id3", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( topic, singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3517,8 +3877,10 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); EasyMock.expect( - indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null) - ).anyTimes(); + indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) + .andReturn(new KafkaDataSourceMetadata(null) + ) + .anyTimes(); EasyMock.expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(Status.READING)); EasyMock.expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(Status.READING)); EasyMock.expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(Status.READING)); @@ -3574,7 +3936,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( topic, singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3588,7 +3954,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id2", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( topic, singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3602,7 +3972,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id3", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( topic, singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3624,8 +3998,10 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); EasyMock.expect( - indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null) - ).anyTimes(); + indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) + .andReturn(new KafkaDataSourceMetadata(null) + ) + .anyTimes(); replayAll(); @@ -3703,7 +4079,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id1", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3717,7 +4097,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id2", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3731,7 +4115,11 @@ public class KafkaSupervisorTest extends EasyMockSupport "id3", DATASOURCE, 0, - new SeekableStreamStartSequenceNumbers<>("topic", singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()), + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), + ImmutableSet.of() + ), new SeekableStreamEndSequenceNumbers<>( "topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE) @@ -3909,29 +4297,47 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals(topic, taskConfig.getStartSequenceNumbers().getStream()); Assert.assertEquals( 0L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( 0L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( 0L, - taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + taskConfig.getStartSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream()); Assert.assertEquals( Long.MAX_VALUE, - taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + taskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 0)) + .longValue() ); Assert.assertEquals( Long.MAX_VALUE, - taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + taskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 1)) + .longValue() ); Assert.assertEquals( Long.MAX_VALUE, - taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + taskConfig.getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + .get(new KafkaTopicPartition(false, topic, 2)) + .longValue() ); } @@ -4147,7 +4553,10 @@ public class KafkaSupervisorTest extends EasyMockSupport singlePartitionMap(topic, 0, 0L, 2, 0L), ImmutableSet.of() ), - new SeekableStreamEndSequenceNumbers<>("topic", singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + new SeekableStreamEndSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE) + ), null, null, supervisor.getTuningConfig() @@ -4485,67 +4894,80 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.reset(taskClient); addSomeEvents(100); - KafkaIndexTask readingTask = createKafkaIndexTask("readingTask", - DATASOURCE, - 0, - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 0L), Collections.emptySet()), - new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 0, Long.MAX_VALUE)), - null, - null, - supervisor.getTuningConfig() + KafkaIndexTask readingTask = createKafkaIndexTask( + "readingTask", + DATASOURCE, + 0, + new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 0L), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 0, Long.MAX_VALUE)), + null, + null, + supervisor.getTuningConfig() ); - KafkaIndexTask publishingTask = createKafkaIndexTask("publishingTask", - DATASOURCE, - 1, - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 0L), Collections.emptySet()), - new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 0, Long.MAX_VALUE)), - null, - null, - supervisor.getTuningConfig() + KafkaIndexTask publishingTask = createKafkaIndexTask( + "publishingTask", + DATASOURCE, + 1, + new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 0L), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 0, Long.MAX_VALUE)), + null, + null, + supervisor.getTuningConfig() ); - KafkaIndexTask pausedTask = createKafkaIndexTask("pausedTask", - DATASOURCE, - 1, - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 1, 0L), Collections.emptySet()), - new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 1, Long.MAX_VALUE)), - null, - null, - supervisor.getTuningConfig() + KafkaIndexTask pausedTask = createKafkaIndexTask( + "pausedTask", + DATASOURCE, + 1, + new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 1, 0L), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 1, Long.MAX_VALUE)), + null, + null, + supervisor.getTuningConfig() ); - KafkaIndexTask failsToResumePausedTask = createKafkaIndexTask("failsToResumePausedTask", - DATASOURCE, - 1, - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 1, 0L), Collections.emptySet()), - new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 1, Long.MAX_VALUE)), - null, - null, - supervisor.getTuningConfig() + KafkaIndexTask failsToResumePausedTask = createKafkaIndexTask( + "failsToResumePausedTask", + DATASOURCE, + 1, + new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 1, 0L), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 1, Long.MAX_VALUE)), + null, + null, + supervisor.getTuningConfig() ); - KafkaIndexTask waitingTask = createKafkaIndexTask("waitingTask", - DATASOURCE, - 2, - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 2, 0L), Collections.emptySet()), - new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 2, Long.MAX_VALUE)), - null, - null, - supervisor.getTuningConfig() + KafkaIndexTask waitingTask = createKafkaIndexTask( + "waitingTask", + DATASOURCE, + 2, + new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 2, 0L), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 2, Long.MAX_VALUE)), + null, + null, + supervisor.getTuningConfig() ); - KafkaIndexTask pendingTask = createKafkaIndexTask("pendingTask", - DATASOURCE, - 2, - new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 2, 0L), Collections.emptySet()), - new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 2, Long.MAX_VALUE)), - null, - null, - supervisor.getTuningConfig() + KafkaIndexTask pendingTask = createKafkaIndexTask( + "pendingTask", + DATASOURCE, + 2, + new SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 2, 0L), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 2, Long.MAX_VALUE)), + null, + null, + supervisor.getTuningConfig() ); - List tasks = ImmutableList.of(readingTask, publishingTask, pausedTask, failsToResumePausedTask, waitingTask, pendingTask); + List tasks = ImmutableList.of( + readingTask, + publishingTask, + pausedTask, + failsToResumePausedTask, + waitingTask, + pendingTask + ); Collection taskRunnerWorkItems = ImmutableList.of( new TestTaskRunnerWorkItem(readingTask, null, TaskLocation.create("testHost", 1001, -1)), new TestTaskRunnerWorkItem(publishingTask, null, TaskLocation.create("testHost", 1002, -1)), @@ -4641,7 +5063,8 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskClient.getStartTimeAsync(pausedTask.getId())).andReturn(Futures.immediateFuture(startTime)); EasyMock.expect(taskClient.resumeAsync(pausedTask.getId())).andReturn(Futures.immediateFuture(true)); - EasyMock.expect(taskClient.getStartTimeAsync(failsToResumePausedTask.getId())).andReturn(Futures.immediateFuture(startTime)); + EasyMock.expect(taskClient.getStartTimeAsync(failsToResumePausedTask.getId())) + .andReturn(Futures.immediateFuture(startTime)); EasyMock.expect(taskClient.resumeAsync(failsToResumePausedTask.getId())).andReturn(Futures.immediateFuture(false)); Capture shutdownTaskId = EasyMock.newCapture(); @@ -4856,11 +5279,10 @@ public class KafkaSupervisorTest extends EasyMockSupport public SeekableStreamIndexTaskClient build( String dataSource, TaskInfoProvider taskInfoProvider, - int maxNumTasks, - SeekableStreamSupervisorTuningConfig tuningConfig + SeekableStreamSupervisorTuningConfig tuningConfig, + ScheduledExecutorService connectExec ) { - Assert.assertEquals(replicas * taskCount, maxNumTasks); Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), tuningConfig.getHttpTimeout()); Assert.assertEquals(TEST_CHAT_RETRIES, (long) tuningConfig.getChatRetries()); return taskClient; @@ -4971,11 +5393,10 @@ public class KafkaSupervisorTest extends EasyMockSupport public SeekableStreamIndexTaskClient build( String dataSource, TaskInfoProvider taskInfoProvider, - int maxNumTasks, - SeekableStreamSupervisorTuningConfig tuningConfig + SeekableStreamSupervisorTuningConfig tuningConfig, + ScheduledExecutorService connectExec ) { - Assert.assertEquals(replicas * taskCount, maxNumTasks); Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), tuningConfig.getHttpTimeout()); Assert.assertEquals(TEST_CHAT_RETRIES, (long) tuningConfig.getChatRetries()); return taskClient; @@ -5089,11 +5510,10 @@ public class KafkaSupervisorTest extends EasyMockSupport public SeekableStreamIndexTaskClient build( String dataSource, TaskInfoProvider taskInfoProvider, - int maxNumTasks, - SeekableStreamSupervisorTuningConfig tuningConfig + SeekableStreamSupervisorTuningConfig tuningConfig, + ScheduledExecutorService connectExec ) { - Assert.assertEquals(replicas * taskCount, maxNumTasks); Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), tuningConfig.getHttpTimeout()); Assert.assertEquals(TEST_CHAT_RETRIES, (long) tuningConfig.getChatRetries()); return taskClient; @@ -5227,30 +5647,63 @@ public class KafkaSupervisorTest extends EasyMockSupport OBJECT_MAPPER ); } - + private static ImmutableMap singlePartitionMap(String topic, int partition, long offset) { return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition), offset); } - private static ImmutableMap singlePartitionMap(String topic, int partition1, long offset1, int partition2, long offset2) + private static ImmutableMap singlePartitionMap( + String topic, + int partition1, + long offset1, + int partition2, + long offset2 + ) { - return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition1), offset1, new KafkaTopicPartition(false, topic, partition2), - offset2); + return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition1), + offset1, + new KafkaTopicPartition(false, topic, partition2), + offset2 + ); } - private static ImmutableMap singlePartitionMap(String topic, int partition1, long offset1, - int partition2, long offset2, int partition3, long offset3) + private static ImmutableMap singlePartitionMap( + String topic, + int partition1, + long offset1, + int partition2, + long offset2, + int partition3, + long offset3 + ) { - return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition1), offset1, new KafkaTopicPartition(false, topic, partition2), - offset2, new KafkaTopicPartition(false, topic, partition3), offset3); + return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition1), + offset1, + new KafkaTopicPartition(false, topic, partition2), + offset2, + new KafkaTopicPartition(false, topic, partition3), + offset3 + ); } - private static ImmutableMap multiTopicPartitionMap(String topic, int partition1, long offset1, - int partition2, long offset2, int partition3, long offset3) + private static ImmutableMap multiTopicPartitionMap( + String topic, + int partition1, + long offset1, + int partition2, + long offset2, + int partition3, + long offset3 + ) { - return ImmutableMap.of(new KafkaTopicPartition(true, topic, partition1), offset1, new KafkaTopicPartition(true, topic, partition2), - offset2, new KafkaTopicPartition(true, topic, partition3), offset3); + return ImmutableMap.of(new KafkaTopicPartition(true, topic, partition1), + offset1, + new KafkaTopicPartition(true, topic, partition2), + offset2, + new KafkaTopicPartition(true, topic, partition3), + offset3 + ); } private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientFactory.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientFactory.java index 2399008688c..599c32b2158 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientFactory.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientFactory.java @@ -25,18 +25,18 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Json; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; -import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.java.util.http.client.HttpClient; @LazySingleton public class KinesisIndexTaskClientFactory extends SeekableStreamIndexTaskClientFactory { @Inject public KinesisIndexTaskClientFactory( - @EscalatedGlobal ServiceClientFactory serviceClientFactory, + @EscalatedGlobal HttpClient httpClient, @Json ObjectMapper mapper ) { - super(serviceClientFactory, mapper); + super(httpClient, mapper); } @Override diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index fe851a183a2..122a8e1c5ae 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -111,6 +111,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; public class KinesisSupervisorTest extends EasyMockSupport { @@ -5117,11 +5118,10 @@ public class KinesisSupervisorTest extends EasyMockSupport public SeekableStreamIndexTaskClient build( String dataSource, TaskInfoProvider taskInfoProvider, - int maxNumTasks, - SeekableStreamSupervisorTuningConfig tuningConfig + SeekableStreamSupervisorTuningConfig tuningConfig, + ScheduledExecutorService connectExec ) { - Assert.assertEquals(replicas * taskCount, maxNumTasks); Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), tuningConfig.getHttpTimeout()); Assert.assertEquals(TEST_CHAT_RETRIES, (long) tuningConfig.getChatRetries()); return taskClient; @@ -5259,14 +5259,10 @@ public class KinesisSupervisorTest extends EasyMockSupport public SeekableStreamIndexTaskClient build( String dataSource, TaskInfoProvider taskInfoProvider, - int maxNumTasks, - SeekableStreamSupervisorTuningConfig tuningConfig + SeekableStreamSupervisorTuningConfig tuningConfig, + ScheduledExecutorService connectExec ) { - Assert.assertEquals( - replicas * (autoScalerConfig != null ? autoScalerConfig.getTaskCountMax() : taskCount), - maxNumTasks - ); Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), tuningConfig.getHttpTimeout()); Assert.assertEquals(TEST_CHAT_RETRIES, (long) tuningConfig.getChatRetries()); return taskClient; @@ -5348,11 +5344,10 @@ public class KinesisSupervisorTest extends EasyMockSupport public SeekableStreamIndexTaskClient build( String dataSource, TaskInfoProvider taskInfoProvider, - int maxNumTasks, - SeekableStreamSupervisorTuningConfig tuningConfig + SeekableStreamSupervisorTuningConfig tuningConfig, + ScheduledExecutorService connectExec ) { - Assert.assertEquals(replicas * taskCount, maxNumTasks); Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), tuningConfig.getHttpTimeout()); Assert.assertEquals(TEST_CHAT_RETRIES, (long) tuningConfig.getChatRetries()); return taskClient; @@ -5436,11 +5431,10 @@ public class KinesisSupervisorTest extends EasyMockSupport public SeekableStreamIndexTaskClient build( String dataSource, TaskInfoProvider taskInfoProvider, - int maxNumTasks, - SeekableStreamSupervisorTuningConfig tuningConfig + SeekableStreamSupervisorTuningConfig tuningConfig, + ScheduledExecutorService connectExec ) { - Assert.assertEquals(replicas * taskCount, maxNumTasks); Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), tuningConfig.getHttpTimeout()); Assert.assertEquals(TEST_CHAT_RETRIES, (long) tuningConfig.getChatRetries()); return taskClient; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientFactory.java index 5bdf8aaac39..c3b4e69f832 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientFactory.java @@ -21,31 +21,46 @@ package org.apache.druid.indexing.seekablestream; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.common.TaskInfoProvider; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceClientFactoryImpl; + +import java.util.concurrent.ScheduledExecutorService; public abstract class SeekableStreamIndexTaskClientFactory { private static final Logger log = new Logger(SeekableStreamIndexTaskClientFactory.class); - private final ServiceClientFactory serviceClientFactory; + private final HttpClient httpClient; private final ObjectMapper jsonMapper; protected SeekableStreamIndexTaskClientFactory( - final ServiceClientFactory serviceClientFactory, + final HttpClient httpClient, final ObjectMapper jsonMapper ) { - this.serviceClientFactory = serviceClientFactory; + this.httpClient = httpClient; this.jsonMapper = jsonMapper; } + /** + * Creates a task client for a specific supervisor. + * + * @param dataSource task datasource + * @param taskInfoProvider task locator + * @param tuningConfig from {@link SeekableStreamSupervisor#tuningConfig} + * @param connectExec should generally be {@link SeekableStreamSupervisor#workerExec}. This is preferable to + * the global pool for the default {@link ServiceClientFactory}, to prevent callbacks from + * different supervisors from backlogging each other. + */ public SeekableStreamIndexTaskClient build( final String dataSource, final TaskInfoProvider taskInfoProvider, - final int maxNumTasks, - final SeekableStreamSupervisorTuningConfig tuningConfig + final SeekableStreamSupervisorTuningConfig tuningConfig, + final ScheduledExecutorService connectExec ) { log.info( @@ -57,7 +72,7 @@ public abstract class SeekableStreamIndexTaskClientFactory( dataSource, - serviceClientFactory, + new ServiceClientFactoryImpl(httpClient, connectExec), taskInfoProvider, jsonMapper, tuningConfig.getHttpTimeout(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 86c4ba385bd..4cbf0ccfa68 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -33,7 +33,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; @@ -65,7 +65,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientAsyncImpl; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; @@ -86,6 +85,7 @@ import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; @@ -837,12 +837,25 @@ public abstract class SeekableStreamSupervisor notices = new NoticesQueue<>(); private final Object stopLock = new Object(); private final Object stateChangeLock = new Object(); @@ -903,21 +916,16 @@ public abstract class SeekableStreamSupervisor