diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index 0859665d600..59df3058e20 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -20,6 +20,7 @@ package org.apache.druid.indexing.kafka;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
@@ -60,10 +61,20 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
       Map<String, Object> consumerProperties,
       ObjectMapper sortingMapper
   )
+  {
+    this(consumerProperties, sortingMapper, getKafkaConsumer(sortingMapper, consumerProperties));
+  }
+
+  @VisibleForTesting
+  public KafkaRecordSupplier(
+      Map<String, Object> consumerProperties,
+      ObjectMapper sortingMapper,
+      KafkaConsumer<byte[], byte[]> consumer
+  )
   {
     this.consumerProperties = consumerProperties;
     this.sortingMapper = sortingMapper;
-    this.consumer = getKafkaConsumer();
+    this.consumer = consumer;
   }
 
   @Override
@@ -200,20 +211,25 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
       }
     }
   }
-  
-  private Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)
+
+  private static Deserializer getKafkaDeserializer(Properties properties, String kafkaConfigKey)
   {
     Deserializer deserializerObject;
     try {
-      Class deserializerClass = Class.forName(properties.getProperty(kafkaConfigKey, ByteArrayDeserializer.class.getTypeName()));
+      Class deserializerClass = Class.forName(properties.getProperty(
+          kafkaConfigKey,
+          ByteArrayDeserializer.class.getTypeName()
+      ));
       Method deserializerMethod = deserializerClass.getMethod("deserialize", String.class, byte[].class);
-      
+
       Type deserializerReturnType = deserializerMethod.getGenericReturnType();
-      
+
       if (deserializerReturnType == byte[].class) {
         deserializerObject = (Deserializer) deserializerClass.getConstructor().newInstance();
       } else {
-        throw new IllegalArgumentException("Kafka deserializers must return a byte array (byte[]), " + deserializerClass.getName() + " returns " + deserializerReturnType.getTypeName());
+        throw new IllegalArgumentException("Kafka deserializers must return a byte array (byte[]), " +
+                                           deserializerClass.getName() + " returns " +
+                                           deserializerReturnType.getTypeName());
       }
     }
     catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
@@ -221,8 +237,8 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
     }
     return deserializerObject;
   }
-  
-  private KafkaConsumer<byte[], byte[]> getKafkaConsumer()
+
+  private static KafkaConsumer<byte[], byte[]> getKafkaConsumer(ObjectMapper sortingMapper, Map<String, Object> consumerProperties)
   {
     final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
     final Properties props = new Properties();
@@ -231,10 +247,10 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
 
     ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
     try {
-      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      Thread.currentThread().setContextClassLoader(KafkaRecordSupplier.class.getClassLoader());
       Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer");
       Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer");
-      
+
       return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
     }
     finally {
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index e47ef2be7a4..21e845f7187 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -156,9 +156,13 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
 
   @Override
-  protected int getTaskGroupIdForPartition(Integer partition)
+  protected int getTaskGroupIdForPartition(Integer partitionId)
   {
-    return partition % spec.getIoConfig().getTaskCount();
+    // Record the partition id so that the supervisor knows when a new partition is discovered.
+    if (!partitionIds.contains(partitionId)) {
+      partitionIds.add(partitionId);
+    }
+    return partitionId % spec.getIoConfig().getTaskCount();
   }
 
   @Override
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 89ec2b6541c..0662e6e4ef4 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import kafka.admin.AdminUtils;
+import kafka.admin.BrokerMetadata;
 import kafka.admin.RackAwareMode;
 import kafka.utils.ZkUtils;
 import org.apache.curator.test.TestingCluster;
@@ -51,6 +52,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskClient;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
+import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
 import org.apache.druid.indexing.kafka.test.TestBroker;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -67,6 +69,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
 import org.apache.druid.java.util.common.DateTimes;
@@ -86,9 +89,12 @@ import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
@@ -105,6 +111,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.Option;
+import scala.collection.Seq;
 
 import java.io.File;
 import java.io.IOException;
@@ -585,6 +593,103 @@ public class KafkaSupervisorTest extends EasyMockSupport
     );
   }
 
+  /**
+   * Test that the partitionIds list gets updated as partitions are discovered.
+   */
+  @Test
+  public void testPartitionIdsUpdates() throws Exception
+  {
+    supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null);
+    addSomeEvents(1100);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    Assert.assertFalse(supervisor.isPartitionIdsEmpty());
+  }
+
+  @Test
+  public void testAlwaysUsesEarliestOffsetForNewlyDiscoveredPartitions() throws Exception
+  {
+    supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null);
+    addSomeEvents(9);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    KafkaIndexTask task = captured.getValue();
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(0).longValue()
+    );
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(1).longValue()
+    );
+    Assert.assertEquals(
+        10,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(2).longValue()
+    );
+
+    // Grow the topic from NUM_PARTITIONS to six partitions and write events to the new ones.
+    addMoreEvents(9, 6);
+    EasyMock.reset(taskQueue, taskStorage);
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    Capture<KafkaIndexTask> tmp = Capture.newInstance();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(tmp))).andReturn(true);
+    EasyMock.replay(taskStorage, taskQueue);
+    supervisor.runInternal();
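+    // The run cycle above lets the supervisor's consumer (metadata.max.age.ms is set to 1 in
+    // TestableKafkaSupervisor.setupRecordSupplier()) refresh its topic metadata and notice the
+    // newly added partitions 3, 4, and 5.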
+    verifyAll();
+
+    EasyMock.reset(taskQueue, taskStorage);
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    Capture<KafkaIndexTask> newCaptured = Capture.newInstance();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(newCaptured))).andReturn(true);
+    EasyMock.replay(taskStorage, taskQueue);
+    supervisor.runInternal();
+    verifyAll();
+
+    // Check that tasks for the newly discovered partitions start from the earliest offset.
+    task = newCaptured.getValue();
+    Assert.assertEquals(
+        0,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(3).longValue()
+    );
+    Assert.assertEquals(
+        0,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(4).longValue()
+    );
+    Assert.assertEquals(
+        0,
+        task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(5).longValue()
+    );
+  }
+
   /**
    * Test generating the starting offsets from the partition data stored in druid_dataSource which contains the
    * offsets of the last built segments.
@@ -640,7 +745,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
     // because the stream's earliest offset is 0, although that would not happen in real usage.
     EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
         new KafkaDataSourceMetadata(
-            new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, -10L, 1, -20L, 2, -30L), ImmutableSet.of())
+            new SeekableStreamStartSequenceNumbers<>(
+                topic,
+                ImmutableMap.of(0, -10L, 1, -20L, 2, -30L),
+                ImmutableSet.of()
+            )
         )
     ).anyTimes();
     replayAll();
@@ -973,7 +1082,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
             .andReturn(Futures.immediateFuture(checkpoints))
             .times(1);
 
-    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+            .andReturn(ImmutableList.of(captured.getValue()))
+            .anyTimes();
     EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId()))
             .andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
     EasyMock.expect(taskStorage.getStatus(runningTaskId))
@@ -1477,7 +1588,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
-    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2)).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+            .andReturn(ImmutableList.of(id1, id2))
+            .anyTimes();
     EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
     EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
@@ -1865,7 +1978,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
-    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+            .andReturn(ImmutableList.of(id1, id2, id3))
+            .anyTimes();
     EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
@@ -2130,7 +2245,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
-    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+            .andReturn(ImmutableList.of(id1, id2, id3))
+            .anyTimes();
     EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
@@ -2228,7 +2345,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
-    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+            .andReturn(ImmutableList.of(id1, id2, id3))
+            .anyTimes();
     EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
@@ -2338,7 +2457,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
     EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
-    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+            .andReturn(ImmutableList.of(id1, id2, id3))
+            .anyTimes();
     EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
@@ -2445,7 +2566,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
-    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+            .andReturn(ImmutableList.of(id1, id2, id3))
+            .anyTimes();
     EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
@@ -2576,7 +2699,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
     EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
-    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+            .andReturn(ImmutableList.of(id1, id2, id3))
+            .anyTimes();
     EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
     EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
@@ -3135,6 +3260,46 @@ public class KafkaSupervisorTest extends EasyMockSupport
     }
   }
 
+  // Repartitions the topic to numPartitions via the Kafka admin utilities, then writes
+  // numEventsPerPartition events to each of the newly added partitions.
+  private void addMoreEvents(int numEventsPerPartition, int numPartitions) throws Exception
+  {
+    Seq<BrokerMetadata> brokerList = AdminUtils.getBrokerMetadatas(
+        zkUtils,
+        RackAwareMode.Enforced$.MODULE$,
+        Option.apply(zkUtils.getSortedBrokerList())
+    );
+    scala.collection.Map<Object, Seq<Object>> replicaAssignment = AdminUtils.assignReplicasToBrokers(
+        brokerList,
+        numPartitions,
+        1, 0, 0
+    );
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(
+        zkUtils,
+        topic,
+        replicaAssignment,
+        new Properties(),
+        true
+    );
+
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (int i = NUM_PARTITIONS; i < numPartitions; i++) {
+        for (int j = 0; j < numEventsPerPartition; j++) {
+          kafkaProducer.send(
+              new ProducerRecord<>(
+                  topic,
+                  i,
+                  null,
+                  StringUtils.toUtf8(StringUtils.format("event-%d", j))
+              )
+          ).get();
+        }
+      }
+      kafkaProducer.commitTransaction();
+    }
+  }
+
   private TestableKafkaSupervisor getTestableSupervisor(
       int replicas,
       int taskCount,
@@ -3652,6 +3817,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
   private static class TestableKafkaSupervisor extends KafkaSupervisor
   {
+    private final Map<String, Object> consumerProperties;
+
     public TestableKafkaSupervisor(
         TaskStorage taskStorage,
         TaskMaster taskMaster,
@@ -3671,6 +3838,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
           spec,
           rowIngestionMetersFactory
       );
+      this.consumerProperties = spec.getIoConfig().getConsumerProperties();
+    }
+
+    @Override
+    protected RecordSupplier<Integer, Long> setupRecordSupplier()
+    {
+      final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
+      consumerConfigs.put("metadata.max.age.ms", "1");
+      final Properties props = new Properties();
+      KafkaRecordSupplier.addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+      props.putAll(consumerConfigs);
+      Deserializer<byte[]> keyDeserializerObject = new ByteArrayDeserializer();
+      Deserializer<byte[]> valueDeserializerObject = new ByteArrayDeserializer();
+      return new KafkaRecordSupplier(
+          consumerProperties,
+          sortingMapper,
+          new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject)
+      );
     }
 
     @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index b6d9ccc0980..17b75c887ff 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -511,7 +511,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
    * @return A map of taskId->task row stats
-   *
    * @throws InterruptedException
    * @throws ExecutionException
    * @throws TimeoutException
@@ -1885,7 +1887,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
    * It will mark the expired partitions in metadata and recompute the partition->task group mappings, updating
    * the metadata, the partitionIds list, and the partitionGroups mappings.
-   *
+   * <p>
    * Note that partition IDs that were newly discovered (appears in record supplier set but not in metadata set)
    * are not added to the recomputed partition groups here. This is handled later in
    * {@link #updatePartitionDataFromStream} after this method is called.
    *
-   * @param storedPartitions Set of partitions previously tracked, from the metadata store
+   * @param storedPartitions         Set of partitions previously tracked, from the metadata store
    * @param partitionIdsFromSupplier Set of partitions currently returned by the record supplier.
    */
   private void cleanupExpiredPartitions(
@@ -2021,10 +2022,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
    * When partitions are removed due to expiration it may be necessary to recompute the partitionID -> groupID
    * mappings to ensure balanced distribution of partitions.
-   *
+   * <p>
    * This function should return a copy of partitionGroups, using the provided availablePartitions as the list of
    * active partitions, reassigning partitions to different groups if necessary.
-   *
+   * <p>
    * If a partition is not in availablePartitions, it should be filtered out of the new partition groups returned
    * by this method.
    *
@@ -2039,12 +2040,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
    * Specifically, we check that the cleaned metadata's partitions are a subset of the original metadata's partitions,
    * that newly expired partitions are marked as expired, and that none of the offsets for the non-expired partitions
    * have changed.
    *
-   * @param oldMetadata metadata containing expired partitions.
+   * @param oldMetadata     metadata containing expired partitions.
    * @param cleanedMetadata new metadata without expired partitions, generated by the subclass
    */
   private void validateMetadataPartitionExpiration(
@@ -2074,10 +2074,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
     Map<PartitionIdType, SequenceOffsetType> oldPartitionSeqNos = oldMetadata.getSeekableStreamSequenceNumbers()
-                                                                             .getPartitionSequenceNumberMap();
+                                                                            .getPartitionSequenceNumberMap();
     Map<PartitionIdType, SequenceOffsetType> cleanedPartitionSeqNos = cleanedMetadata.getSeekableStreamSequenceNumbers()
-                                                                                     .getPartitionSequenceNumberMap();
+                                                                                    .getPartitionSequenceNumberMap();
 
     for (Entry<PartitionIdType, SequenceOffsetType> cleanedPartitionSeqNo : cleanedPartitionSeqNos.entrySet()) {
       if (!oldPartitionSeqNos.containsKey(cleanedPartitionSeqNo.getKey())) {
@@ -2116,7 +2116,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
    * Specifically, we check that the new partition groups' partitions are a subset of the original groups' partitions,
    * and that none of the offsets for the non-expired partitions have changed.
    *
@@ -3134,6 +3134,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
+  @VisibleForTesting
+  public boolean isPartitionIdsEmpty()
+  {
+    return this.partitionIds.isEmpty();
+  }
+
   protected abstract List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>> createIndexTasks(
@@ -3173,7 +3178,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
    * creates a specific instance of {@link SeekableStreamDataSourceMetadata} from a map of partition id -> sequence
-   *
    * @return specific instance of datasource metadata
    */
   protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetaDataForReset(
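
A minimal sketch for reviewers, not part of the patch: the new @VisibleForTesting constructor lets a caller hand KafkaRecordSupplier a pre-configured consumer, which is what TestableKafkaSupervisor.setupRecordSupplier() above uses to speed up partition discovery. The bootstrap address, class name, and mapper below are illustrative assumptions; the merge of Druid's default consumer config via KafkaConsumerConfigs is omitted here.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class InjectedConsumerExample
    {
      public static KafkaRecordSupplier makeSupplier(ObjectMapper sortingMapper)
      {
        // Placeholder consumer properties; in a supervisor these come from the ioConfig.
        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put("bootstrap.servers", "localhost:9092");

        // Resolve the user-supplied properties into a Properties object, as the patch does.
        Properties props = new Properties();
        KafkaRecordSupplier.addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);

        // Tests can tune the consumer before injecting it, e.g. refresh topic metadata
        // aggressively so newly added partitions are discovered almost immediately.
        props.put("metadata.max.age.ms", "1");

        KafkaConsumer<byte[], byte[]> consumer =
            new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());

        // New in this patch: inject the consumer instead of letting the supplier build its own.
        return new KafkaRecordSupplier(consumerProperties, sortingMapper, consumer);
      }
    }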