diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index 894d274675d..b5fe83706d7 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -157,8 +157,9 @@ The tuningConfig is optional and default parameters will be used if no tuningCon | `intermediateHandoffPeriod` | ISO8601 Period | How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier. | no (default == P2147483647D) | | `logParseExceptions` | Boolean | If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred. | no, default == false | | `maxParseExceptions` | Integer | The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if `reportParseExceptions` is set. | no, unlimited default | -| `maxSavedParseExceptions` | Integer | When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#reports). Overridden if `reportParseExceptions` is set. | no, default == 0 | -| `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1)) | no, default == 100 | +| `maxSavedParseExceptions` | Integer | When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#reports). Overridden if `reportParseExceptions` is set. | no, default == 0 | +| `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))` | no, default == 100 | +| `repartitionTransitionDuration` | ISO8601 Period | When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/incubator-druid/issues/7600. | no (default == PT2M) | #### IndexSpec @@ -316,9 +317,9 @@ may cause some Kinesis messages to be skipped or to be read twice. `POST /druid/indexer/v1/supervisor/<supervisorId>/terminate` terminates a supervisor and causes all associated indexing tasks managed by this supervisor to immediately stop and begin publishing their segments.
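For illustration only (not part of this patch): the terminate call is a plain HTTP POST, so it can be issued from any client. A minimal sketch using Java 11's built-in HttpClient, assuming a hypothetical Overlord at http://localhost:8090 and a hypothetical supervisor named my_kinesis_supervisor:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TerminateSupervisorExample
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical Overlord address and supervisor ID; adjust for your cluster.
    String overlord = "http://localhost:8090";
    String supervisorId = "my_kinesis_supervisor";

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(overlord + "/druid/indexer/v1/supervisor/" + supervisorId + "/terminate"))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();

    // On success, the response body echoes the ID of the terminated supervisor.
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}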
This supervisor will still exist in the metadata store and it's history may be retrieved -with the supervisor history api, but will not be listed in the 'get supervisors' api response nor can it's configuration +with the supervisor history API, but will not be listed in the 'get supervisors' API response nor can its configuration or status report be retrieved. The only way this supervisor can start again is by submitting a functioning supervisor -spec to the create api. +spec to the create API. ### Capacity Planning diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 60406397b3c..e47ef2be7a4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -361,6 +361,12 @@ public class KafkaSupervisor extends SeekableStreamSupervisor return false; } + @Override + protected boolean isShardExpirationMarker(Long seqNum) + { + return false; + } + @Override protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 7e24835056c..5d5592afacf 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -140,6 +140,17 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig return shutdownTimeout; } + @Override + public Duration getRepartitionTransitionDuration() + { + // Stopping tasks early for Kafka ingestion on partition set change is not supported yet, + // just return a default for now. + return SeekableStreamSupervisorTuningConfig.defaultDuration( + null, + SeekableStreamSupervisorTuningConfig.DEFAULT_REPARTITION_TRANSITION_DURATION + ); + } + @JsonProperty public Duration getOffsetFetchPeriod() { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 759182feac2..2aec61040e3 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -636,9 +636,11 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + // for simplicity in testing the offset availability check, we use negative stored offsets in metadata here, + // because the stream's earliest offset is 0, although that would not happen in real usage.
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()) + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, -10L, 1, -20L, 2, -30L), ImmutableSet.of()) ) ).anyTimes(); replayAll(); @@ -2041,10 +2043,12 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.reset(indexerMetadataStorageCoordinator); // unknown DataSourceMetadata in metadata store + // for simplicity in testing the offset availability check, we use negative stored offsets in metadata here, + // because the stream's earliest offset is 0, although that would not happen in real usage. EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)) .andReturn( new KafkaDataSourceMetadata( - new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(1, 100L, 2, 200L)) + new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(1, -100L, 2, 200L)) ) ).times(4); // getOffsetFromStorageForPartition() throws an exception when the offsets are automatically reset. diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java index 6cce1444b99..4bd8fe8d487 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java @@ -42,6 +42,10 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber // to ingest data until taskDuration has elapsed or the task was stopped or paused or killed public static final String NO_END_SEQUENCE_NUMBER = "NO_END_SEQUENCE_NUMBER"; + // this special marker is used by the KinesisSupervisor to mark that a shard has been expired + // (i.e., closed and then the retention period has passed) + public static final String EXPIRED_MARKER = "EXPIRED"; + // this flag is used to indicate either END_OF_SHARD_MARKER // or NO_END_SEQUENCE_NUMBER so that they can be properly compared // with other sequence numbers diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index fc7c6ee4c40..8e2b6084b63 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; -import com.google.common.hash.HashCode; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import org.apache.druid.common.aws.AWSCredentialsConfig; @@ -60,7 +59,6 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.joda.time.DateTime; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import 
java.util.HashSet; @@ -68,6 +66,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; /** @@ -234,13 +233,38 @@ public class KinesisSupervisor extends SeekableStreamSupervisor partitionIds.add(partitionId); } - return Math.abs(getHashIntFromShardId(partitionId) % spec.getIoConfig().getTaskCount()); + return getTaskGroupIdForPartitionWithProvidedList(partitionId, partitionIds); } - private int getHashIntFromShardId(String shardId) + private int getTaskGroupIdForPartitionWithProvidedList(String partitionId, List availablePartitions) { - HashCode hashCode = HASH_FUNCTION.hashString(shardId, StandardCharsets.UTF_8); - return hashCode.asInt(); + return availablePartitions.indexOf(partitionId) % spec.getIoConfig().getTaskCount(); + } + + @Override + protected Map> recomputePartitionGroupsForExpiration( + Set availablePartitions + ) + { + List availablePartitionsList = new ArrayList<>(availablePartitions); + + Map> newPartitionGroups = new HashMap<>(); + + for (ConcurrentHashMap oldGroup : partitionGroups.values()) { + for (Map.Entry partitionOffsetMapping : oldGroup.entrySet()) { + String partitionId = partitionOffsetMapping.getKey(); + if (availablePartitions.contains(partitionId)) { + int newTaskGroupId = getTaskGroupIdForPartitionWithProvidedList(partitionId, availablePartitionsList); + ConcurrentHashMap partitionMap = newPartitionGroups.computeIfAbsent( + newTaskGroupId, + k -> new ConcurrentHashMap<>() + ); + partitionMap.put(partitionId, partitionOffsetMapping.getValue()); + } + } + } + + return newPartitionGroups; } @Override @@ -330,6 +354,12 @@ public class KinesisSupervisor extends SeekableStreamSupervisor return KinesisSequenceNumber.END_OF_SHARD_MARKER.equals(seqNum); } + @Override + protected boolean isShardExpirationMarker(String seqNum) + { + return KinesisSequenceNumber.EXPIRED_MARKER.equals(seqNum); + } + @Override protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() { @@ -359,12 +389,11 @@ public class KinesisSupervisor extends SeekableStreamSupervisor } @Override - protected KinesisDataSourceMetadata createDataSourceMetadataWithoutExpiredPartitions( - SeekableStreamDataSourceMetadata currentMetadata, - Set expiredPartitionIds + protected SeekableStreamDataSourceMetadata createDataSourceMetadataWithExpiredPartitions( + SeekableStreamDataSourceMetadata currentMetadata, Set expiredPartitionIds ) { - log.info("Cleaning up dead shards: " + expiredPartitionIds); + log.info("Marking expired shards in metadata: " + expiredPartitionIds); final KinesisDataSourceMetadata dataSourceMetadata = (KinesisDataSourceMetadata) currentMetadata; @@ -375,12 +404,16 @@ public class KinesisSupervisor extends SeekableStreamSupervisor for (Map.Entry entry : oldPartitionSequenceNumberMap.entrySet()) { if (!expiredPartitionIds.contains(entry.getKey())) { newPartitionSequenceNumberMap.put(entry.getKey(), entry.getValue()); + } else { + newPartitionSequenceNumberMap.put(entry.getKey(), KinesisSequenceNumber.EXPIRED_MARKER); } } - Set oldExclusiveStartPartitions; - Set newExclusiveStartPartitions = null; + SeekableStreamSequenceNumbers newSequences; if (old instanceof SeekableStreamStartSequenceNumbers) { + Set oldExclusiveStartPartitions; + Set newExclusiveStartPartitions; + newExclusiveStartPartitions = new HashSet<>(); oldExclusiveStartPartitions = ((SeekableStreamStartSequenceNumbers) old).getExclusivePartitions(); for (String partitionId : 
oldExclusiveStartPartitions) { @@ -388,10 +421,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor newExclusiveStartPartitions.add(partitionId); } } - } - SeekableStreamSequenceNumbers newSequences; - if (old instanceof SeekableStreamStartSequenceNumbers) { newSequences = new SeekableStreamStartSequenceNumbers( old.getStream(), null, diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index e921fc98f8a..ef067980570 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -97,6 +97,7 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec null, null, null, + null, null ), ioConfig, diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 4af88dc44a1..91da145b1ad 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -38,6 +38,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig private final Long chatRetries; private final Duration httpTimeout; private final Duration shutdownTimeout; + private final Duration repartitionTransitionDuration; public KinesisSupervisorTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @@ -69,7 +70,8 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, - @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod + @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, + @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration ) { super( @@ -108,6 +110,10 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig shutdownTimeout, DEFAULT_SHUTDOWN_TIMEOUT ); + this.repartitionTransitionDuration = SeekableStreamSupervisorTuningConfig.defaultDuration( + repartitionTransitionDuration, + DEFAULT_REPARTITION_TRANSITION_DURATION + ); } @Override @@ -145,6 +151,12 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig return shutdownTimeout; } + @Override + public Duration getRepartitionTransitionDuration() + { + return repartitionTransitionDuration; + } + @Override public String toString() { @@ -177,6 +189,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + ", maxRecordsPerPoll=" + getMaxRecordsPerPoll() + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + + ", repartitionTransitionDuration=" + 
getRepartitionTransitionDuration() + '}'; } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 918cc2dd3b1..13cfd74c722 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -311,6 +311,7 @@ public class KinesisIndexTaskTuningConfigTest null, null, null, + null, null ); KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 6a8764aee78..6b87e4b1c52 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -120,11 +120,9 @@ public class KinesisSupervisorTest extends EasyMockSupport private static final String SHARD_ID0 = "shardId-000000000000"; private static final String SHARD_ID1 = "shardId-000000000001"; private static final String SHARD_ID2 = "shardId-000000000002"; - private static final String SHARD_ID3 = "shardId-000000000003"; private static final StreamPartition SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID0); private static final StreamPartition SHARD1_PARTITION = StreamPartition.of(STREAM, SHARD_ID1); private static final StreamPartition SHARD2_PARTITION = StreamPartition.of(STREAM, SHARD_ID2); - private static final StreamPartition SHARD3_PARTITION = StreamPartition.of(STREAM, SHARD_ID3); private static DataSchema dataSchema; private KinesisRecordSupplier supervisorRecordSupplier; @@ -194,6 +192,7 @@ public class KinesisSupervisorTest extends EasyMockSupport null, null, null, + null, null ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -2460,8 +2459,8 @@ public class KinesisSupervisorTest extends EasyMockSupport .anyTimes(); supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("300").anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("400").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); @@ -2478,7 +2477,22 @@ public class KinesisSupervisorTest extends EasyMockSupport new KinesisDataSourceMetadata( new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "100", SHARD_ID2, "200")) ) - ).times(3); + ).times(2); + + // Since shard 2 was in metadata before but is not in the list of shards returned by the record supplier, + // it gets marked as expired
in metadata (it is an expired shard) + EasyMock.expect( + indexerMetadataStorageCoordinator.resetDataSourceMetadata( + DATASOURCE, + new KinesisDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + STREAM, + ImmutableMap.of(SHARD_ID1, "100", SHARD_ID2, KinesisSequenceNumber.EXPIRED_MARKER) + ) + ) + ) + ).andReturn(true).times(1); + // getOffsetFromStorageForPartition() throws an exception when the offsets are automatically reset. // Since getOffsetFromStorageForPartition() is called per partition, all partitions can't be reset at the same time. // Instead, subsequent partitions will be reset in the following supervisor runs. @@ -2487,10 +2501,18 @@ ... DATASOURCE, new KinesisDataSourceMetadata( // Only one partition is reset in a single supervisor run. - new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID2, "200")) + new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of()) ) ) - ).andReturn(true); + ).andReturn(true).times(1); + + EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)) + .andReturn( + new KinesisDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "100")) + ) + ).times(2); + replayAll(); supervisor.start(); @@ -3718,6 +3740,7 @@ public class KinesisSupervisorTest extends EasyMockSupport null, null, 42, // This property is different from tuningConfig + null, null ); @@ -3885,8 +3908,8 @@ public class KinesisSupervisorTest extends EasyMockSupport SHARD_ID0, "0" )); - // there would be 4 tasks, 2 for each task group - EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean())) + // there would be 1 task, since there is only 1 shard + EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) .andReturn(Futures.immediateFuture(checkpoints1)) .times(1); @@ -3932,13 +3955,11 @@ ... ) ).anyTimes(); - // Normally the split would result in 0 -> 1,2, but we use shard ID 3 instead to ensure that each - // task group gets one shard after the split (due to hashing behavior) EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) - .andReturn(ImmutableSet.of(SHARD_ID0, SHARD_ID1, SHARD_ID3)) + .andReturn(ImmutableSet.of(SHARD_ID0, SHARD_ID1, SHARD_ID2)) .anyTimes(); EasyMock.expect(supervisorRecordSupplier.getAssignment()) - .andReturn(ImmutableSet.of(SHARD0_PARTITION, SHARD1_PARTITION, SHARD3_PARTITION)) + .andReturn(ImmutableSet.of(SHARD0_PARTITION, SHARD1_PARTITION, SHARD2_PARTITION)) .anyTimes(); supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); @@ -3948,7 +3969,7 @@ ... .andReturn(KinesisSequenceNumber.END_OF_SHARD_MARKER).anyTimes(); EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID1))) .andReturn("100").anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID3))) + EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2))) .andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); @@ -3989,12 +4010,12 @@ ... .anyTimes(); TreeMap> checkpointsGroup0 = new TreeMap<>(); checkpointsGroup0.put(0, ImmutableMap.of( - SHARD_ID1, "0", +
SHARD_ID2, "0", SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER )); TreeMap> checkpointsGroup1 = new TreeMap<>(); checkpointsGroup1.put(1, ImmutableMap.of( - SHARD_ID3, "0" + SHARD_ID1, "0" )); // there would be 2 tasks, 1 for each task group EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) @@ -4023,7 +4044,7 @@ public class KinesisSupervisorTest extends EasyMockSupport new SeekableStreamStartSequenceNumbers<>( STREAM, ImmutableMap.of( - SHARD_ID1, "0" + SHARD_ID2, "0" ), ImmutableSet.of() ); @@ -4032,7 +4053,7 @@ public class KinesisSupervisorTest extends EasyMockSupport new SeekableStreamEndSequenceNumbers<>( STREAM, ImmutableMap.of( - SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER + SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER ) ); @@ -4040,7 +4061,7 @@ public class KinesisSupervisorTest extends EasyMockSupport new SeekableStreamStartSequenceNumbers<>( STREAM, ImmutableMap.of( - SHARD_ID3, "0" + SHARD_ID1, "0" ), ImmutableSet.of() ); @@ -4049,7 +4070,7 @@ public class KinesisSupervisorTest extends EasyMockSupport new SeekableStreamEndSequenceNumbers<>( STREAM, ImmutableMap.of( - SHARD_ID3, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER + SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER ) ); @@ -4090,7 +4111,7 @@ public class KinesisSupervisorTest extends EasyMockSupport ImmutableMap.of( SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER, SHARD_ID1, "100", - SHARD_ID3, "100" + SHARD_ID2, "100" ) ) ) @@ -4103,8 +4124,9 @@ public class KinesisSupervisorTest extends EasyMockSupport new SeekableStreamEndSequenceNumbers( STREAM, ImmutableMap.of( + SHARD_ID0, KinesisSequenceNumber.EXPIRED_MARKER, SHARD_ID1, "100", - SHARD_ID3, "100" + SHARD_ID2, "100" ) ) ) @@ -4112,10 +4134,10 @@ public class KinesisSupervisorTest extends EasyMockSupport ).andReturn(true).anyTimes(); EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) - .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID3)) + .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID2)) .anyTimes(); EasyMock.expect(supervisorRecordSupplier.getAssignment()) - .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD3_PARTITION)) + .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD2_PARTITION)) .anyTimes(); supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); @@ -4123,7 +4145,7 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID1))) .andReturn("200").anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID3))) + EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2))) .andReturn("200").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); @@ -4171,11 +4193,11 @@ public class KinesisSupervisorTest extends EasyMockSupport .anyTimes(); TreeMap> checkpointsGroup0 = new TreeMap<>(); checkpointsGroup0.put(0, ImmutableMap.of( - SHARD_ID1, "100" + SHARD_ID2, "100" )); TreeMap> checkpointsGroup1 = new TreeMap<>(); checkpointsGroup1.put(1, ImmutableMap.of( - SHARD_ID3, "100" + SHARD_ID1, "100" )); // there would be 2 tasks, 1 for each task group EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean())) @@ -4222,16 +4244,16 @@ public class 
KinesisSupervisorTest extends EasyMockSupport new SeekableStreamStartSequenceNumbers<>( STREAM, ImmutableMap.of( - SHARD_ID3, "100" + SHARD_ID2, "100" ), - ImmutableSet.of(SHARD_ID3) + ImmutableSet.of(SHARD_ID2) ); SeekableStreamEndSequenceNumbers group1ExpectedEndSequenceNumbers = new SeekableStreamEndSequenceNumbers<>( STREAM, ImmutableMap.of( - SHARD_ID3, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER + SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER ) ); @@ -4247,7 +4269,7 @@ public class KinesisSupervisorTest extends EasyMockSupport Map> expectedPartitionGroups = ImmutableMap.of( 0, ImmutableMap.of(SHARD_ID1, "-1"), - 1, ImmutableMap.of(SHARD_ID3, "-1") + 1, ImmutableMap.of(SHARD_ID2, "-1") ); Assert.assertEquals(expectedPartitionGroups, supervisor.getPartitionGroups()); } @@ -4530,6 +4552,8 @@ public class KinesisSupervisorTest extends EasyMockSupport new SeekableStreamEndSequenceNumbers( STREAM, ImmutableMap.of( + SHARD_ID0, KinesisSequenceNumber.EXPIRED_MARKER, + SHARD_ID1, KinesisSequenceNumber.EXPIRED_MARKER, SHARD_ID2, "100" ) ) @@ -4748,6 +4772,7 @@ public class KinesisSupervisorTest extends EasyMockSupport null, null, null, + null, null ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java index 45151b51bdb..d50321043ce 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java @@ -71,6 +71,7 @@ public class KinesisSupervisorTuningConfigTest Assert.assertEquals(8L, (long) config.getChatRetries()); Assert.assertEquals(Duration.standardSeconds(10), config.getHttpTimeout()); Assert.assertEquals(Duration.standardSeconds(80), config.getShutdownTimeout()); + Assert.assertEquals(Duration.standardSeconds(120), config.getRepartitionTransitionDuration()); } @Test @@ -90,7 +91,8 @@ public class KinesisSupervisorTuningConfigTest + " \"chatThreads\": 13,\n" + " \"chatRetries\": 14,\n" + " \"httpTimeout\": \"PT15S\",\n" - + " \"shutdownTimeout\": \"PT95S\"\n" + + " \"shutdownTimeout\": \"PT95S\",\n" + + " \"repartitionTransitionDuration\": \"PT500S\"\n" + "}"; KinesisSupervisorTuningConfig config = (KinesisSupervisorTuningConfig) mapper.readValue( @@ -116,5 +118,6 @@ public class KinesisSupervisorTuningConfigTest Assert.assertEquals(14L, (long) config.getChatRetries()); Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout()); Assert.assertEquals(Duration.standardSeconds(95), config.getShutdownTimeout()); + Assert.assertEquals(Duration.standardSeconds(500), config.getRepartitionTransitionDuration()); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index c25807f6c02..5b1fad9a3c4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -515,6 +516,7 @@ public abstract class SeekableStreamSupervisor recordSupplier; + private volatile DateTime earlyStopTime = null; private volatile boolean started = false; private volatile boolean stopped = false; @@ -1852,46 +1854,13 @@ public abstract class SeekableStreamSupervisor createDataSourceMetadataWithoutExpiredPartitions( - SeekableStreamDataSourceMetadata currentMetadata, - Set expiredPartitionIds - ) - { - throw new UnsupportedOperationException("This supervisor type does not support partition expiration."); - } - - /** - * Removes a set of expired partition IDs from partitionIds and partitionGroups. This is called after - * successfully removing expired partitions from metadata, for supervisor types that support partition expiration. - * - * @param expiredPartitionIds Set of expired partition IDs. - */ - private void removeExpiredPartitionsFromMemory(Set expiredPartitionIds) - { - partitionIds.removeAll(expiredPartitionIds); - - for (ConcurrentHashMap partitionGroup : partitionGroups.values()) { - for (PartitionIdType expiredShard : expiredPartitionIds) { - partitionGroup.remove(expiredShard); - } - } - } - private boolean updatePartitionDataFromStream() { - Set partitionIds; + List previousPartitionIds = new ArrayList<>(partitionIds); + Set partitionIdsFromSupplier; try { synchronized (recordSupplierLock) { - partitionIds = recordSupplier.getPartitionIds(ioConfig.getStream()); + partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream()); } } catch (Exception e) { @@ -1901,24 +1870,55 @@ public abstract class SeekableStreamSupervisor closedPartitions = getOffsetsFromMetadataStorage() + Map storedMetadata = getOffsetsFromMetadataStorage(); + Set storedPartitions = storedMetadata.keySet(); + Set closedPartitions = storedMetadata .entrySet() .stream() .filter(x -> isEndOfShard(x.getValue())) .map(Entry::getKey) .collect(Collectors.toSet()); + Set previouslyExpiredPartitions = storedMetadata + .entrySet() + .stream() + .filter(x -> isShardExpirationMarker(x.getValue())) + .map(Entry::getKey) + .collect(Collectors.toSet()); + + Set partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions = Sets.difference( + partitionIdsFromSupplier, + previouslyExpiredPartitions + ); + + if (partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions.size() != partitionIdsFromSupplier.size()) { + // this should never happen, but we check for it and exclude the expired partitions if they somehow reappear + log.warn( + "Previously expired partitions [%s] were present in the current list [%s] from the record supplier.", + previouslyExpiredPartitions, + partitionIdsFromSupplier + ); + } + if (partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions.size() == 0) { + String errMsg = StringUtils.format( + "No partitions found for stream [%s] after removing previously expired partitions", + ioConfig.getStream() + ); + stateManager.recordThrowableEvent(new StreamException(new ISE(errMsg))); + log.warn(errMsg); + return false; + } boolean initialPartitionDiscovery = this.partitionIds.isEmpty(); - for (PartitionIdType partitionId : partitionIds) { + for (PartitionIdType partitionId : partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions) { if (closedPartitions.contains(partitionId)) {
log.info("partition [%s] is closed and has no more data, skipping.", partitionId); continue; @@ -1926,16 +1926,26 @@ public abstract class SeekableStreamSupervisor partitionMap = partitionGroups.computeIfAbsent( taskGroupId, k -> new ConcurrentHashMap<>() ); - if (partitionMap.putIfAbsent(partitionId, getNotSetMarker()) == null) { log.info( "New partition [%s] discovered for stream [%s], added to task group [%d]", @@ -1946,35 +1956,26 @@ public abstract class SeekableStreamSupervisor expiredPartitions = new HashSet<>(); - for (PartitionIdType partitionTd : closedPartitions) { - if (!partitionIds.contains(partitionTd)) { - expiredPartitions.add(partitionTd); - } - } - - if (expiredPartitions.size() > 0) { - @SuppressWarnings("unchecked") - SeekableStreamDataSourceMetadata currentMetadata = - (SeekableStreamDataSourceMetadata) indexerMetadataStorageCoordinator.getDataSourceMetadata( - dataSource); - SeekableStreamDataSourceMetadata cleanedMetadata = - createDataSourceMetadataWithoutExpiredPartitions(currentMetadata, expiredPartitions); - - validateMetadataPartitionExpiration(currentMetadata, cleanedMetadata); - - try { - boolean success = indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, cleanedMetadata); - if (success) { - removeExpiredPartitionsFromMemory(expiredPartitions); - } else { - log.error("Failed to update datasource metadata[%s] with expired partitions removed", cleanedMetadata); - } - } - catch (IOException ioe) { - throw new RuntimeException(ioe); + if (!partitionIds.equals(previousPartitionIds)) { + // the set of partition IDs has changed, have any running tasks stop early so that we can adjust to the + // repartitioning quickly by creating new tasks + for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { + if (!taskGroup.taskIds().isEmpty()) { + // Partitions have changed and we are managing active tasks - set an early publish time + // at the current time + repartitionTransitionDuration. + // This allows time for the stream to start writing to the new partitions after repartitioning. + // For Kinesis ingestion, this cooldown time is particularly useful, lowering the possibility of + // the new shards being empty, which can cause issues presently + // (see https://github.com/apache/incubator-druid/issues/7600) + earlyStopTime = DateTimes.nowUtc().plus(tuningConfig.getRepartitionTransitionDuration()); + log.info( + "Previous partition set [%s] has changed to [%s] - requesting that tasks stop after [%s] at [%s]", + previousPartitionIds, + partitionIds, + tuningConfig.getRepartitionTransitionDuration(), + earlyStopTime + ); + break; } } } @@ -1982,17 +1983,130 @@ public abstract class SeekableStreamSupervisortask group mappings, updating + * the metadata, the partitionIds list, and the partitionGroups mappings. + * + * Note that partition IDs that were newly discovered (appears in record supplier set but not in metadata set) + * are not added to the recomputed partition groups here. This is handled later in + * {@link #updatePartitionDataFromStream} after this method is called. + * + * @param storedPartitions Set of partitions previously tracked, from the metadata store + * @param partitionIdsFromSupplier Set of partitions currently returned by the record supplier. 
+ */ + private void cleanupExpiredPartitions( + Set storedPartitions, + Set previouslyExpiredPartitions, + Set partitionIdsFromSupplier + ) + { + // If a partition was previously known (stored in metadata) but no longer appears in the list of partitions + // provided by the record supplier, it has expired. + Set newlyExpiredPartitions = Sets.difference(storedPartitions, previouslyExpiredPartitions); + newlyExpiredPartitions = Sets.difference(newlyExpiredPartitions, partitionIdsFromSupplier); + + if (newlyExpiredPartitions.size() > 0) { + log.info("Detected newly expired partitions: " + newlyExpiredPartitions); + + // Mark partitions as expired in metadata + @SuppressWarnings("unchecked") + SeekableStreamDataSourceMetadata currentMetadata = + (SeekableStreamDataSourceMetadata) indexerMetadataStorageCoordinator.getDataSourceMetadata( + dataSource); + + SeekableStreamDataSourceMetadata cleanedMetadata = + createDataSourceMetadataWithExpiredPartitions(currentMetadata, newlyExpiredPartitions); + + log.info("New metadata after partition expiration: " + cleanedMetadata); + + validateMetadataPartitionExpiration(newlyExpiredPartitions, currentMetadata, cleanedMetadata); + + // Compute new partition groups, only including partitions that are + // still in partitionIdsFromSupplier + Map> newPartitionGroups = + recomputePartitionGroupsForExpiration(partitionIdsFromSupplier); + + validatePartitionGroupReassignments(newPartitionGroups); + + log.info("New partition groups after partition expiration: " + newPartitionGroups); + + try { + boolean success = indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, cleanedMetadata); + if (success) { + partitionIds.clear(); + partitionIds.addAll(partitionIdsFromSupplier); + + for (Integer groupId : partitionGroups.keySet()) { + if (newPartitionGroups.containsKey(groupId)) { + partitionGroups.put(groupId, newPartitionGroups.get(groupId)); + } else { + partitionGroups.put(groupId, new ConcurrentHashMap<>()); + } + } + } else { + log.error("Failed to update datasource metadata[%s] with expired partitions removed", cleanedMetadata); + } + } + catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + } + + /** + * When partitions are removed due to expiration, it may be necessary to recompute the partitionID -> groupID + * mappings to ensure balanced distribution of partitions. + * + * This function should return a copy of partitionGroups, using the provided availablePartitions as the list of + * active partitions, reassigning partitions to different groups if necessary. + * + * If a partition is not in availablePartitions, it should be filtered out of the new partition groups returned + * by this method. + * + * @param availablePartitions the set of partitions currently returned by the record supplier + * @return a remapped copy of partitionGroups, containing only the partitions in availablePartitions + */ + protected Map> recomputePartitionGroupsForExpiration( + Set availablePartitions + ) + { + throw new UnsupportedOperationException("This supervisor type does not support partition expiration."); + } + + /** + * Some seekable stream systems such as Kinesis allow partitions to expire. When this occurs, the supervisor should + * mark the expired partitions in the saved metadata. This method returns a copy of the current metadata + * with any expired partitions marked with an implementation-specific offset value that represents the expired state. + * + * @param currentMetadata The current DataSourceMetadata from metadata storage + * @param expiredPartitionIds The set of expired partition IDs.
+ * @return currentMetadata but with any expired partitions marked as expired. + */ + protected SeekableStreamDataSourceMetadata createDataSourceMetadataWithExpiredPartitions( + SeekableStreamDataSourceMetadata currentMetadata, + Set expiredPartitionIds + ) + { + throw new UnsupportedOperationException("This supervisor type does not support partition expiration."); + } + /** * Perform a sanity check on the datasource metadata returned by - * {@link #createDataSourceMetadataWithoutExpiredPartitions}. + * {@link #createDataSourceMetadataWithExpiredPartitions}. * * Specifically, we check that the cleaned metadata's partitions are a subset of the original metadata's partitions, - * and that none of the offsets for the non-expired partitions have changed. + * that newly expired partitions are marked as expired, and that none of the offsets for the non-expired partitions + * have changed. * + * @param newlyExpiredPartitions set of partitions that have newly expired * @param oldMetadata metadata containing expired partitions. - * @param cleanedMetadata new metadata without expired partitions, generated by the subclass + * @param cleanedMetadata new metadata with expired partitions marked, generated by the subclass */ private void validateMetadataPartitionExpiration( + Set newlyExpiredPartitions, SeekableStreamDataSourceMetadata oldMetadata, SeekableStreamDataSourceMetadata cleanedMetadata ) @@ -2015,7 +2129,18 @@ public abstract class SeekableStreamSupervisor + /** + * Perform a sanity check on the new partition groups returned by + * {@link #recomputePartitionGroupsForExpiration}. + */ + private void validatePartitionGroupReassignments( + Map> newPartitionGroups + ) + { + Map oldPartitionMappings = new HashMap<>(); + for (ConcurrentHashMap oldGroup : partitionGroups.values()) { + // we don't care about old task group mappings, only the partition-offset mappings + oldPartitionMappings.putAll(oldGroup); + } + + for (ConcurrentHashMap newGroup : newPartitionGroups.values()) { + for (Entry newPartitionMapping : newGroup.entrySet()) { + if (!oldPartitionMappings.containsKey(newPartitionMapping.getKey())) { + // recomputing the groups without the expired partitions added an unknown partition somehow + throw new IAE( + "Recomputed partition groups [%s] contains unexpected partition ID [%s], old partition groups: [%s]", + newPartitionGroups, + newPartitionMapping.getKey(), + partitionGroups + ); + } + + SequenceOffsetType oldOffset = oldPartitionMappings.get(newPartitionMapping.getKey()); + if (!oldOffset.equals(newPartitionMapping.getValue())) { + throw new IAE( + "Recomputed partition groups [%s] has offset mismatch for partition ID [%s], original partition map: [%s]", + newPartitionGroups, + newPartitionMapping.getKey(), + partitionGroups + ); + } + } + } + } + private void updateTaskStatus() throws ExecutionException, InterruptedException, TimeoutException { final List> futures = new ArrayList<>(); @@ -2110,8 +2279,18 @@ public abstract class SeekableStreamSupervisor> filterExpiredPartitionsFromStartingOffsets( Map> startingOffsets @@ -2688,7 +2867,7 @@ public abstract class SeekableStreamSupervisor= 0); + final SequenceOffsetType earliestOffset = getOffsetFromStreamForPartition(partition, true); + return earliestOffset != null + && makeSequenceNumber(earliestOffset).compareTo(makeSequenceNumber(offsetFromMetadata)) <= 0; } - /** * a special sequence number that is used to indicate that the sequence offset * for a particular partition has not yet been calculated by the supervisor. When @@ -3151,10 +3326,15 @@ public abstract class SeekableStreamSupervisor
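The offset availability check modified above boils down to: a sequence number stored in metadata is still readable only if the stream's earliest retained sequence number has not moved past it. A minimal standalone sketch of that comparison, using hypothetical names and plain long offsets in place of OrderedSequenceNumber:

public class OffsetAvailabilitySketch
{
  // Mirrors the patched check: the stored offset is usable only if the earliest
  // offset still retained by the stream is <= the offset recorded in metadata.
  static boolean isStoredOffsetAvailable(Long earliestRetainedOffset, long storedOffset)
  {
    return earliestRetainedOffset != null && earliestRetainedOffset <= storedOffset;
  }

  public static void main(String[] args)
  {
    System.out.println(isStoredOffsetAvailable(0L, 10L));    // true: offset 10 is still retained
    System.out.println(isStoredOffsetAvailable(300L, 100L)); // false: retention passed offset 100, triggering a reset
    System.out.println(isStoredOffsetAvailable(null, 100L)); // false: no earliest offset could be fetched
  }
}

This is why the modified Kafka test stores negative offsets against an earliest offset of 0, and why the modified Kinesis test raises the earliest sequence number to "300" against stored offsets of "100" and "200": the check fails and the automatic reset path is exercised.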
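Similarly, the Kinesis supervisor's new assignment scheme (getTaskGroupIdForPartitionWithProvidedList and recomputePartitionGroupsForExpiration) replaces shard-ID hashing with list position: a shard's task group is its index in the list of active shards, modulo taskCount. A standalone sketch under assumed inputs (hypothetical shard IDs, a taskCount of 2, and an illustrative tracking order):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShardGroupRemapSketch
{
  public static void main(String[] args)
  {
    int taskCount = 2;
    // Active (non-expired) shards, in the order the supervisor tracks them.
    List<String> activeShards = new ArrayList<>(List.of(
        "shardId-000000000001", "shardId-000000000002", "shardId-000000000003"
    ));

    // Assign each shard to a task group by its list position, as in the patch.
    Map<Integer, List<String>> groups = new HashMap<>();
    for (String shard : activeShards) {
      int groupId = activeShards.indexOf(shard) % taskCount;
      groups.computeIfAbsent(groupId, k -> new ArrayList<>()).add(shard);
    }

    // Prints {0=[shardId-000000000001, shardId-000000000003], 1=[shardId-000000000002]}
    System.out.println(groups);
  }
}

When shards expire, the same position-based assignment is recomputed over the surviving shard list, which is why validatePartitionGroupReassignments only verifies that partitions and their offsets are preserved, not that group IDs stay the same.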