More Kinesis resharding adjustments (#8671)

* More Kinesis resharding adjustments

* Fix TC inspection

* Fix comment

* Adjust comment, small refactor

* Make repartition transition time configurable

* Add spellcheck exclusion

* Spelling fix
Jonathan Wei 2019-10-15 23:19:17 -07:00 committed by GitHub
parent 4046c86d62
commit 89ce6384f5
15 changed files with 448 additions and 144 deletions

View File

@ -157,8 +157,9 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
| `intermediateHandoffPeriod` | ISO8601 Period | How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier. | no (default == P2147483647D) |
| `logParseExceptions` | Boolean | If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred. | no, default == false |
| `maxParseExceptions` | Integer | The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if `reportParseExceptions` is set. | no, unlimited default |
| `maxSavedParseExceptions` | Integer | When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#reports). Overridden if `reportParseExceptions` is set. | no, default == 0 |
| `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1)) | no, default == 100 |
| `maxSavedParseExceptions` | Integer | When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#reports). Overridden if `reportParseExceptions` is set. | no, default == 0 |
| `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))` | no, default == 100 |
| `repartitionTransitionDuration` | ISO8601 Period | When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/incubator-druid/issues/7600. | no (default == PT2M) |
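For example, a minimal `tuningConfig` sketch that widens this transition window to five minutes (the value shown is illustrative, not a recommendation; only the relevant property is included):

```json
{
  "type": "kinesis",
  "repartitionTransitionDuration": "PT5M"
}
```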
#### IndexSpec
@ -316,9 +317,9 @@ may cause some Kinesis messages to be skipped or to be read twice.
`POST /druid/indexer/v1/supervisor/<supervisorId>/terminate` terminates a supervisor and causes all associated indexing
tasks managed by this supervisor to immediately stop and begin
publishing their segments. This supervisor will still exist in the metadata store and its history may be retrieved
with the supervisor history api, but will not be listed in the 'get supervisors' api response nor can it's configuration
with the supervisor history API, but will not be listed in the 'get supervisors' API response nor can its configuration
or status report be retrieved. The only way this supervisor can start again is by submitting a functioning supervisor
spec to the create api.
spec to the create API.
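For example, a terminate request might look like the following (the supervisor ID and the Overlord address `localhost:8090` are illustrative):

```
curl -X POST http://localhost:8090/druid/indexer/v1/supervisor/my_kinesis_supervisor/terminate
```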
### Capacity Planning

View File

@ -361,6 +361,12 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
return false;
}
@Override
protected boolean isShardExpirationMarker(Long seqNum)
{
return false;
}
@Override
protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
{

View File

@ -140,6 +140,17 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
return shutdownTimeout;
}
@Override
public Duration getRepartitionTransitionDuration()
{
// Stopping tasks early for Kafka ingestion on partition set change is not supported yet,
// just return a default for now.
return SeekableStreamSupervisorTuningConfig.defaultDuration(
null,
SeekableStreamSupervisorTuningConfig.DEFAULT_REPARTITION_TRANSITION_DURATION
);
}
@JsonProperty
public Duration getOffsetFetchPeriod()
{

View File

@ -636,9 +636,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
// for simplicity in testing the offset availability check, we use negative stored offsets in metadata here,
// because the stream's earliest offset is 0, although that would not happen in real usage.
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of())
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, -10L, 1, -20L, 2, -30L), ImmutableSet.of())
)
).anyTimes();
replayAll();
@ -2041,10 +2043,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.reset(indexerMetadataStorageCoordinator);
// unknown DataSourceMetadata in metadata store
// for simplicity in testing the offset availability check, we use negative stored offsets in metadata here,
// because the stream's earliest offset is 0, although that would not happen in real usage.
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE))
.andReturn(
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(1, 100L, 2, 200L))
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(1, -100L, 2, 200L))
)
).times(4);
// getOffsetFromStorageForPartition() throws an exception when the offsets are automatically reset.

View File

@ -42,6 +42,10 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber<String>
// to ingest data until taskDuration has elapsed or the task was stopped or paused or killed
public static final String NO_END_SEQUENCE_NUMBER = "NO_END_SEQUENCE_NUMBER";
// this special marker is used by the KinesisSupervisor to mark that a shard has been expired
// (i.e., closed and then the retention period has passed)
public static final String EXPIRED_MARKER = "EXPIRED";
// this flag is used to indicate either END_OF_SHARD_MARKER
// or NO_END_SEQUENCE_NUMBER so that they can be properly compared
// with other sequence numbers

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.druid.common.aws.AWSCredentialsConfig;
@ -60,7 +59,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.joda.time.DateTime;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -68,6 +66,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
/**
@ -234,13 +233,38 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
partitionIds.add(partitionId);
}
return Math.abs(getHashIntFromShardId(partitionId) % spec.getIoConfig().getTaskCount());
return getTaskGroupIdForPartitionWithProvidedList(partitionId, partitionIds);
}
private int getHashIntFromShardId(String shardId)
private int getTaskGroupIdForPartitionWithProvidedList(String partitionId, List<String> availablePartitions)
{
HashCode hashCode = HASH_FUNCTION.hashString(shardId, StandardCharsets.UTF_8);
return hashCode.asInt();
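// For illustration (shard names hypothetical): with availablePartitions = [shard-A, shard-B, shard-C]
// and taskCount = 2, shard-A maps to task group 0, shard-B to group 1, and shard-C back to group 0.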
return availablePartitions.indexOf(partitionId) % spec.getIoConfig().getTaskCount();
}
@Override
protected Map<Integer, ConcurrentHashMap<String, String>> recomputePartitionGroupsForExpiration(
Set<String> availablePartitions
)
{
List<String> availablePartitionsList = new ArrayList<>(availablePartitions);
Map<Integer, ConcurrentHashMap<String, String>> newPartitionGroups = new HashMap<>();
for (ConcurrentHashMap<String, String> oldGroup : partitionGroups.values()) {
for (Map.Entry<String, String> partitionOffsetMapping : oldGroup.entrySet()) {
String partitionId = partitionOffsetMapping.getKey();
if (availablePartitions.contains(partitionId)) {
int newTaskGroupId = getTaskGroupIdForPartitionWithProvidedList(partitionId, availablePartitionsList);
ConcurrentHashMap<String, String> partitionMap = newPartitionGroups.computeIfAbsent(
newTaskGroupId,
k -> new ConcurrentHashMap<>()
);
partitionMap.put(partitionId, partitionOffsetMapping.getValue());
}
}
}
return newPartitionGroups;
}
@Override
@ -330,6 +354,12 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
return KinesisSequenceNumber.END_OF_SHARD_MARKER.equals(seqNum);
}
@Override
protected boolean isShardExpirationMarker(String seqNum)
{
return KinesisSequenceNumber.EXPIRED_MARKER.equals(seqNum);
}
@Override
protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
{
@ -359,12 +389,11 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
}
@Override
protected KinesisDataSourceMetadata createDataSourceMetadataWithoutExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata,
Set<String> expiredPartitionIds
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> expiredPartitionIds
)
{
log.info("Cleaning up dead shards: " + expiredPartitionIds);
log.info("Marking expired shards in metadata: " + expiredPartitionIds);
final KinesisDataSourceMetadata dataSourceMetadata = (KinesisDataSourceMetadata) currentMetadata;
@ -375,12 +404,16 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
for (Map.Entry<String, String> entry : oldPartitionSequenceNumberMap.entrySet()) {
if (!expiredPartitionIds.contains(entry.getKey())) {
newPartitionSequenceNumberMap.put(entry.getKey(), entry.getValue());
} else {
newPartitionSequenceNumberMap.put(entry.getKey(), KinesisSequenceNumber.EXPIRED_MARKER);
}
}
Set<String> oldExclusiveStartPartitions;
Set<String> newExclusiveStartPartitions = null;
SeekableStreamSequenceNumbers<String, String> newSequences;
if (old instanceof SeekableStreamStartSequenceNumbers) {
Set<String> oldExclusiveStartPartitions;
Set<String> newExclusiveStartPartitions;
newExclusiveStartPartitions = new HashSet<>();
oldExclusiveStartPartitions = ((SeekableStreamStartSequenceNumbers<String, String>) old).getExclusivePartitions();
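// carry over exclusive-start markers only for partitions that have not expired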
for (String partitionId : oldExclusiveStartPartitions) {
@ -388,10 +421,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
newExclusiveStartPartitions.add(partitionId);
}
}
}
SeekableStreamSequenceNumbers<String, String> newSequences;
if (old instanceof SeekableStreamStartSequenceNumbers) {
newSequences = new SeekableStreamStartSequenceNumbers<String, String>(
old.getStream(),
null,

View File

@ -97,6 +97,7 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
null,
null,
null,
null,
null
),
ioConfig,

View File

@ -38,6 +38,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
private final Long chatRetries;
private final Duration httpTimeout;
private final Duration shutdownTimeout;
private final Duration repartitionTransitionDuration;
public KinesisSupervisorTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@ -69,7 +70,8 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
@JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration
)
{
super(
@ -108,6 +110,10 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
shutdownTimeout,
DEFAULT_SHUTDOWN_TIMEOUT
);
this.repartitionTransitionDuration = SeekableStreamSupervisorTuningConfig.defaultDuration(
repartitionTransitionDuration,
DEFAULT_REPARTITION_TRANSITION_DURATION
);
}
@Override
@ -145,6 +151,12 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
return shutdownTimeout;
}
@Override
public Duration getRepartitionTransitionDuration()
{
return repartitionTransitionDuration;
}
@Override
public String toString()
{
@ -177,6 +189,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", maxRecordsPerPoll=" + getMaxRecordsPerPoll() +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", repartitionTransitionDuration=" + getRepartitionTransitionDuration() +
'}';
}

View File

@ -311,6 +311,7 @@ public class KinesisIndexTaskTuningConfigTest
null,
null,
null,
null,
null
);
KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();

View File

@ -120,11 +120,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
private static final String SHARD_ID0 = "shardId-000000000000";
private static final String SHARD_ID1 = "shardId-000000000001";
private static final String SHARD_ID2 = "shardId-000000000002";
private static final String SHARD_ID3 = "shardId-000000000003";
private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID0);
private static final StreamPartition<String> SHARD1_PARTITION = StreamPartition.of(STREAM, SHARD_ID1);
private static final StreamPartition<String> SHARD2_PARTITION = StreamPartition.of(STREAM, SHARD_ID2);
private static final StreamPartition<String> SHARD3_PARTITION = StreamPartition.of(STREAM, SHARD_ID3);
private static DataSchema dataSchema;
private KinesisRecordSupplier supervisorRecordSupplier;
@ -194,6 +192,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
null,
null
);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
@ -2460,8 +2459,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
.anyTimes();
supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("300").anyTimes();
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("400").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
@ -2478,7 +2477,22 @@ public class KinesisSupervisorTest extends EasyMockSupport
new KinesisDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "100", SHARD_ID2, "200"))
)
).times(3);
).times(2);
// Since shard 2 was in metadata before but is not in the list of shards returned by the record supplier,
// it gets marked as expired in metadata (it is an expired shard)
EasyMock.expect(
indexerMetadataStorageCoordinator.resetDataSourceMetadata(
DATASOURCE,
new KinesisDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
STREAM,
ImmutableMap.of(SHARD_ID1, "100", SHARD_ID2, KinesisSequenceNumber.EXPIRED_MARKER)
)
)
)
).andReturn(true).times(1);
// getOffsetFromStorageForPartition() throws an exception when the offsets are automatically reset.
// Since getOffsetFromStorageForPartition() is called per partition, all partitions can't be reset at the same time.
// Instead, subsequent partitions will be reset in the following supervisor runs.
@ -2487,10 +2501,18 @@ public class KinesisSupervisorTest extends EasyMockSupport
DATASOURCE,
new KinesisDataSourceMetadata(
// Only one partition is reset in a single supervisor run.
new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID2, "200"))
new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of())
)
)
).andReturn(true);
).andReturn(true).times(1);
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE))
.andReturn(
new KinesisDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "100"))
)
).times(2);
replayAll();
supervisor.start();
@ -3718,6 +3740,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
42, // This property is different from tuningConfig
null,
null
);
@ -3885,8 +3908,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
SHARD_ID0,
"0"
));
// there would be 4 tasks, 2 for each task group
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean()))
// there would be 1 task, since there is only 1 shard
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints1))
.times(1);
@ -3932,13 +3955,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
)
).anyTimes();
// Normally the split would result in 0 -> 1,2, but we use shard ID 3 instead to ensure that each
// task group gets one shard after the split (due to hashing behavior)
EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
.andReturn(ImmutableSet.of(SHARD_ID0, SHARD_ID1, SHARD_ID3))
.andReturn(ImmutableSet.of(SHARD_ID0, SHARD_ID1, SHARD_ID2))
.anyTimes();
EasyMock.expect(supervisorRecordSupplier.getAssignment())
.andReturn(ImmutableSet.of(SHARD0_PARTITION, SHARD1_PARTITION, SHARD3_PARTITION))
.andReturn(ImmutableSet.of(SHARD0_PARTITION, SHARD1_PARTITION, SHARD2_PARTITION))
.anyTimes();
supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
@ -3948,7 +3969,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
.andReturn(KinesisSequenceNumber.END_OF_SHARD_MARKER).anyTimes();
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID1)))
.andReturn("100").anyTimes();
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID3)))
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2)))
.andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
@ -3989,12 +4010,12 @@ public class KinesisSupervisorTest extends EasyMockSupport
.anyTimes();
TreeMap<Integer, Map<String, String>> checkpointsGroup0 = new TreeMap<>();
checkpointsGroup0.put(0, ImmutableMap.of(
SHARD_ID1, "0",
SHARD_ID2, "0",
SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER
));
TreeMap<Integer, Map<String, String>> checkpointsGroup1 = new TreeMap<>();
checkpointsGroup1.put(1, ImmutableMap.of(
SHARD_ID3, "0"
SHARD_ID1, "0"
));
// there would be 2 tasks, 1 for each task group
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
@ -4023,7 +4044,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>(
STREAM,
ImmutableMap.of(
SHARD_ID1, "0"
SHARD_ID2, "0"
),
ImmutableSet.of()
);
@ -4032,7 +4053,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new SeekableStreamEndSequenceNumbers<>(
STREAM,
ImmutableMap.of(
SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
)
);
@ -4040,7 +4061,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>(
STREAM,
ImmutableMap.of(
SHARD_ID3, "0"
SHARD_ID1, "0"
),
ImmutableSet.of()
);
@ -4049,7 +4070,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
new SeekableStreamEndSequenceNumbers<>(
STREAM,
ImmutableMap.of(
SHARD_ID3, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
)
);
@ -4090,7 +4111,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
ImmutableMap.of(
SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER,
SHARD_ID1, "100",
SHARD_ID3, "100"
SHARD_ID2, "100"
)
)
)
@ -4103,8 +4124,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
new SeekableStreamEndSequenceNumbers<String, String>(
STREAM,
ImmutableMap.of(
SHARD_ID0, KinesisSequenceNumber.EXPIRED_MARKER,
SHARD_ID1, "100",
SHARD_ID3, "100"
SHARD_ID2, "100"
)
)
)
@ -4112,10 +4134,10 @@ public class KinesisSupervisorTest extends EasyMockSupport
).andReturn(true).anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
.andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID3))
.andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID2))
.anyTimes();
EasyMock.expect(supervisorRecordSupplier.getAssignment())
.andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD3_PARTITION))
.andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD2_PARTITION))
.anyTimes();
supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
@ -4123,7 +4145,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID1)))
.andReturn("200").anyTimes();
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID3)))
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2)))
.andReturn("200").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
@ -4171,11 +4193,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
.anyTimes();
TreeMap<Integer, Map<String, String>> checkpointsGroup0 = new TreeMap<>();
checkpointsGroup0.put(0, ImmutableMap.of(
SHARD_ID1, "100"
SHARD_ID2, "100"
));
TreeMap<Integer, Map<String, String>> checkpointsGroup1 = new TreeMap<>();
checkpointsGroup1.put(1, ImmutableMap.of(
SHARD_ID3, "100"
SHARD_ID1, "100"
));
// there would be 2 tasks, 1 for each task group
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
@ -4222,16 +4244,16 @@ public class KinesisSupervisorTest extends EasyMockSupport
new SeekableStreamStartSequenceNumbers<>(
STREAM,
ImmutableMap.of(
SHARD_ID3, "100"
SHARD_ID2, "100"
),
ImmutableSet.of(SHARD_ID3)
ImmutableSet.of(SHARD_ID2)
);
SeekableStreamEndSequenceNumbers<String, String> group1ExpectedEndSequenceNumbers =
new SeekableStreamEndSequenceNumbers<>(
STREAM,
ImmutableMap.of(
SHARD_ID3, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
)
);
@ -4247,7 +4269,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
Map<Integer, Map<String, String>> expectedPartitionGroups = ImmutableMap.of(
0, ImmutableMap.of(SHARD_ID1, "-1"),
1, ImmutableMap.of(SHARD_ID3, "-1")
1, ImmutableMap.of(SHARD_ID2, "-1")
);
Assert.assertEquals(expectedPartitionGroups, supervisor.getPartitionGroups());
}
@ -4530,6 +4552,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
new SeekableStreamEndSequenceNumbers<String, String>(
STREAM,
ImmutableMap.of(
SHARD_ID0, KinesisSequenceNumber.EXPIRED_MARKER,
SHARD_ID1, KinesisSequenceNumber.EXPIRED_MARKER,
SHARD_ID2, "100"
)
)
@ -4748,6 +4772,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
null,
null
);

View File

@ -71,6 +71,7 @@ public class KinesisSupervisorTuningConfigTest
Assert.assertEquals(8L, (long) config.getChatRetries());
Assert.assertEquals(Duration.standardSeconds(10), config.getHttpTimeout());
Assert.assertEquals(Duration.standardSeconds(80), config.getShutdownTimeout());
Assert.assertEquals(Duration.standardSeconds(120), config.getRepartitionTransitionDuration());
}
@Test
@ -90,7 +91,8 @@ public class KinesisSupervisorTuningConfigTest
+ " \"chatThreads\": 13,\n"
+ " \"chatRetries\": 14,\n"
+ " \"httpTimeout\": \"PT15S\",\n"
+ " \"shutdownTimeout\": \"PT95S\"\n"
+ " \"shutdownTimeout\": \"PT95S\",\n"
+ " \"repartitionTransitionDuration\": \"PT500S\"\n"
+ "}";
KinesisSupervisorTuningConfig config = (KinesisSupervisorTuningConfig) mapper.readValue(
@ -116,5 +118,6 @@ public class KinesisSupervisorTuningConfigTest
Assert.assertEquals(14L, (long) config.getChatRetries());
Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout());
Assert.assertEquals(Duration.standardSeconds(95), config.getShutdownTimeout());
Assert.assertEquals(Duration.standardSeconds(500), config.getRepartitionTransitionDuration());
}
}

View File

@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -515,6 +516,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private long lastRunTime;
private int initRetryCounter = 0;
private volatile DateTime firstRunTime;
private volatile DateTime earlyStopTime = null;
private volatile RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier;
private volatile boolean started = false;
private volatile boolean stopped = false;
@ -1852,46 +1854,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return false;
}
/**
*
* Some seekable stream systems such as Kinesis allow partitions to expire. When this occurs, the supervisor should
* remove the expired partitions from saved metadata and from the partition groups stored in memory.
*
* @param currentMetadata The current DataSourceMetadata from metadata storage
* @param expiredPartitionIds The set of expired partition IDs.
* @return currentMetadata but with any expired partitions removed.
*/
protected SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadataWithoutExpiredPartitions(
SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata,
Set<PartitionIdType> expiredPartitionIds
)
{
throw new UnsupportedOperationException("This supervisor type does not support partition expiration.");
}
/**
* Removes a set of expired partition IDs from partitionIds and partitionGroups. This is called after
* successfully removing expired partitions from metadata, for supervisor types that support partition expiration.
*
* @param expiredPartitionIds Set of expired partition IDs.
*/
private void removeExpiredPartitionsFromMemory(Set<PartitionIdType> expiredPartitionIds)
{
partitionIds.removeAll(expiredPartitionIds);
for (ConcurrentHashMap<PartitionIdType, SequenceOffsetType> partitionGroup : partitionGroups.values()) {
for (PartitionIdType expiredShard : expiredPartitionIds) {
partitionGroup.remove(expiredShard);
}
}
}
private boolean updatePartitionDataFromStream()
{
Set<PartitionIdType> partitionIds;
List<PartitionIdType> previousPartitionIds = new ArrayList<>(partitionIds);
Set<PartitionIdType> partitionIdsFromSupplier;
try {
synchronized (recordSupplierLock) {
partitionIds = recordSupplier.getPartitionIds(ioConfig.getStream());
partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream());
}
}
catch (Exception e) {
@ -1901,24 +1870,55 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return false;
}
if (partitionIds == null || partitionIds.size() == 0) {
if (partitionIdsFromSupplier == null || partitionIdsFromSupplier.size() == 0) {
String errMsg = StringUtils.format("No partitions found for stream [%s]", ioConfig.getStream());
stateManager.recordThrowableEvent(new StreamException(new ISE(errMsg)));
log.warn(errMsg);
return false;
}
log.debug("Found [%d] partitions for stream [%s]", partitionIds.size(), ioConfig.getStream());
log.debug("Found [%d] partitions for stream [%s]", partitionIdsFromSupplier.size(), ioConfig.getStream());
Set<PartitionIdType> closedPartitions = getOffsetsFromMetadataStorage()
Map<PartitionIdType, SequenceOffsetType> storedMetadata = getOffsetsFromMetadataStorage();
Set<PartitionIdType> storedPartitions = storedMetadata.keySet();
Set<PartitionIdType> closedPartitions = storedMetadata
.entrySet()
.stream()
.filter(x -> isEndOfShard(x.getValue()))
.map(Entry::getKey)
.collect(Collectors.toSet());
Set<PartitionIdType> previouslyExpiredPartitions = storedMetadata
.entrySet()
.stream()
.filter(x -> isShardExpirationMarker(x.getValue()))
.map(Entry::getKey)
.collect(Collectors.toSet());
Set<PartitionIdType> partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions = Sets.difference(
partitionIdsFromSupplier,
previouslyExpiredPartitions
);
if (partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions.size() != partitionIdsFromSupplier.size()) {
// this should never happen, but we check for it and exclude the expired partitions if they somehow reappear
log.warn(
"Previously expired partitions [%s] were present in the current list [%s] from the record supplier.",
previouslyExpiredPartitions,
partitionIdsFromSupplier
);
}
if (partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions.size() == 0) {
String errMsg = StringUtils.format(
"No partitions found for stream [%s] after removing previously expired partitions",
ioConfig.getStream()
);
stateManager.recordThrowableEvent(new StreamException(new ISE(errMsg)));
log.warn(errMsg);
return false;
}
boolean initialPartitionDiscovery = this.partitionIds.isEmpty();
for (PartitionIdType partitionId : partitionIds) {
for (PartitionIdType partitionId : partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions) {
if (closedPartitions.contains(partitionId)) {
log.info("partition [%s] is closed and has no more data, skipping.", partitionId);
continue;
@ -1926,16 +1926,26 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (!initialPartitionDiscovery && !this.partitionIds.contains(partitionId)) {
subsequentlyDiscoveredPartitions.add(partitionId);
// should check for earlyPublishTime (Kinesis) here, not supported yet
}
}
// When partitions expire, we need to recompute the task group assignments, considering only non-expired partitions,
// to ensure that we have even distribution of readable partitions across tasks.
if (supportsPartitionExpiration()) {
cleanupExpiredPartitions(
storedPartitions,
previouslyExpiredPartitions,
partitionIdsFromSupplier
);
}
for (PartitionIdType partitionId : partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions) {
int taskGroupId = getTaskGroupIdForPartition(partitionId);
ConcurrentHashMap<PartitionIdType, SequenceOffsetType> partitionMap = partitionGroups.computeIfAbsent(
taskGroupId,
k -> new ConcurrentHashMap<>()
);
if (partitionMap.putIfAbsent(partitionId, getNotSetMarker()) == null) {
log.info(
"New partition [%s] discovered for stream [%s], added to task group [%d]",
@ -1946,35 +1956,26 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
if (supportsPartitionExpiration()) {
// Look for expired shards and remove them from metadata storage and the partition groups
Set<PartitionIdType> expiredPartitions = new HashSet<>();
for (PartitionIdType partitionTd : closedPartitions) {
if (!partitionIds.contains(partitionTd)) {
expiredPartitions.add(partitionTd);
}
}
if (expiredPartitions.size() > 0) {
@SuppressWarnings("unchecked")
SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata =
(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) indexerMetadataStorageCoordinator.getDataSourceMetadata(
dataSource);
SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> cleanedMetadata =
createDataSourceMetadataWithoutExpiredPartitions(currentMetadata, expiredPartitions);
validateMetadataPartitionExpiration(currentMetadata, cleanedMetadata);
try {
boolean success = indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, cleanedMetadata);
if (success) {
removeExpiredPartitionsFromMemory(expiredPartitions);
} else {
log.error("Failed to update datasource metadata[%s] with expired partitions removed", cleanedMetadata);
}
}
catch (IOException ioe) {
throw new RuntimeException(ioe);
if (!partitionIds.equals(previousPartitionIds)) {
// the set of partition IDs has changed, have any running tasks stop early so that we can adjust to the
// repartitioning quickly by creating new tasks
for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
if (!taskGroup.taskIds().isEmpty()) {
// Partitions have changed and we are managing active tasks - set an early publish time
// at the current time + repartitionTransitionDuration.
// This allows time for the stream to start writing to the new partitions after repartitioning.
// For Kinesis ingestion, this cooldown time is particularly useful, lowering the possibility of
// the new shards being empty, which can currently cause issues
// (see https://github.com/apache/incubator-druid/issues/7600)
earlyStopTime = DateTimes.nowUtc().plus(tuningConfig.getRepartitionTransitionDuration());
log.info(
"Previous partition set [%s] has changed to [%s] - requesting that tasks stop after [%s] at [%s]",
previousPartitionIds,
partitionIds,
tuningConfig.getRepartitionTransitionDuration(),
earlyStopTime
);
break;
}
}
}
@ -1982,17 +1983,130 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return true;
}
/**
* This method determines the set of expired partitions from the set of partitions currently returned by
* the record supplier and the set of partitions previously tracked in the metadata.
*
* It will mark the expired partitions in metadata and recompute the partition->task group mappings, updating
* the metadata, the partitionIds list, and the partitionGroups mappings.
*
* Note that partition IDs that were newly discovered (appear in the record supplier set but not in the metadata set)
* are not added to the recomputed partition groups here. This is handled later in
* {@link #updatePartitionDataFromStream} after this method is called.
*
* @param storedPartitions Set of partitions previously tracked, from the metadata store.
* @param previouslyExpiredPartitions Set of partitions previously marked as expired in the metadata store.
* @param partitionIdsFromSupplier Set of partitions currently returned by the record supplier.
*/
private void cleanupExpiredPartitions(
Set<PartitionIdType> storedPartitions,
Set<PartitionIdType> previouslyExpiredPartitions,
Set<PartitionIdType> partitionIdsFromSupplier
)
{
// If a partition was previously known (stored in metadata) but no longer appears in the list of partitions
// provided by the record supplier, it has expired.
Set<PartitionIdType> newlyExpiredPartitions = Sets.difference(storedPartitions, previouslyExpiredPartitions);
newlyExpiredPartitions = Sets.difference(newlyExpiredPartitions, partitionIdsFromSupplier);
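// e.g. (IDs illustrative): stored = {s1, s2, s3}, previouslyExpired = {s3}, supplier = {s1} => newlyExpired = {s2}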
if (newlyExpiredPartitions.size() > 0) {
log.info("Detected newly expired partitions: " + newlyExpiredPartitions);
// Mark partitions as expired in metadata
@SuppressWarnings("unchecked")
SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata =
(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) indexerMetadataStorageCoordinator.getDataSourceMetadata(
dataSource);
SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> cleanedMetadata =
createDataSourceMetadataWithExpiredPartitions(currentMetadata, newlyExpiredPartitions);
log.info("New metadata after partition expiration: " + cleanedMetadata);
validateMetadataPartitionExpiration(newlyExpiredPartitions, currentMetadata, cleanedMetadata);
// Compute new partition groups, only including partitions that are
// still in partitionIdsFromSupplier
Map<Integer, ConcurrentHashMap<PartitionIdType, SequenceOffsetType>> newPartitionGroups =
recomputePartitionGroupsForExpiration(partitionIdsFromSupplier);
validatePartitionGroupReassignments(newPartitionGroups);
log.info("New partition groups after partition expiration: " + newPartitionGroups);
try {
boolean success = indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, cleanedMetadata);
if (success) {
partitionIds.clear();
partitionIds.addAll(partitionIdsFromSupplier);
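// swap in the recomputed groups; groups that no longer own any partitions are reset to empty maps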
for (Integer groupId : partitionGroups.keySet()) {
if (newPartitionGroups.containsKey(groupId)) {
partitionGroups.put(groupId, newPartitionGroups.get(groupId));
} else {
partitionGroups.put(groupId, new ConcurrentHashMap<>());
}
}
} else {
log.error("Failed to update datasource metadata[%s] with expired partitions removed", cleanedMetadata);
}
}
catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
}
/**
* When partitions are removed due to expiration it may be necessary to recompute the partitionID -> groupID
* mappings to ensure balanced distribution of partitions.
*
* This function should return a copy of partitionGroups, using the provided availablePartitions as the list of
* active partitions, reassigning partitions to different groups if necessary.
*
* If a partition is not in availablePartitions, it should be filtered out of the new partition groups returned
* by this method.
*
* @param availablePartitions the set of partitions that are still active; partitions not in this set are filtered out
* @return a remapped copy of partitionGroups, containing only the partitions in availablePartitions
*/
protected Map<Integer, ConcurrentHashMap<PartitionIdType, SequenceOffsetType>> recomputePartitionGroupsForExpiration(
Set<PartitionIdType> availablePartitions
)
{
throw new UnsupportedOperationException("This supervisor type does not support partition expiration.");
}
/**
*
* Some seekable stream systems such as Kinesis allow partitions to expire. When this occurs, the supervisor should
* mark the expired partitions in the saved metadata. This method returns a copy of the current metadata
* with any expired partitions marked with an implementation-specific offset value that represents the expired state.
*
* @param currentMetadata The current DataSourceMetadata from metadata storage
* @param expiredPartitionIds The set of expired partition IDs.
* @return a copy of currentMetadata with any expired partitions marked as expired.
*/
protected SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadataWithExpiredPartitions(
SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata,
Set<PartitionIdType> expiredPartitionIds
)
{
throw new UnsupportedOperationException("This supervisor type does not support partition expiration.");
}
/**
* Perform a sanity check on the datasource metadata returned by
* {@link #createDataSourceMetadataWithoutExpiredPartitions}.
* {@link #createDataSourceMetadataWithExpiredPartitions}.
*
* Specifically, we check that the cleaned metadata's partitions are a subset of the original metadata's partitions,
* and that none of the offsets for the non-expired partitions have changed.
* that newly expired partitions are marked as expired, and that none of the offsets for the non-expired partitions
* have changed.
*
* @param newlyExpiredPartitions set of partitions newly detected as expired in this run.
* @param oldMetadata metadata prior to expiration handling, containing the expired partitions.
* @param cleanedMetadata new metadata with the expired partitions marked, generated by the subclass
*/
private void validateMetadataPartitionExpiration(
Set<PartitionIdType> newlyExpiredPartitions,
SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> oldMetadata,
SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> cleanedMetadata
)
@ -2015,7 +2129,18 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
SequenceOffsetType oldOffset = oldPartitionSeqNos.get(cleanedPartitionSeqNo.getKey());
if (!oldOffset.equals(cleanedPartitionSeqNo.getValue())) {
if (newlyExpiredPartitions.contains(cleanedPartitionSeqNo.getKey())) {
// this is a newly expired partition, check that we did actually mark it as expired
if (!isShardExpirationMarker(cleanedPartitionSeqNo.getValue())) {
throw new IAE(
"Newly expired partition [%] was not marked as expired in the cleaned partition map [%s], original partition map: [%s]",
cleanedPartitionSeqNo.getKey(),
cleanedPartitionSeqNos,
oldPartitionSeqNos
);
}
} else if (!oldOffset.equals(cleanedPartitionSeqNo.getValue())) {
// this is not an expired shard, check that the offset did not change
throw new IAE(
"Cleaned partition map [%s] has offset mismatch for partition ID [%s], original partition map: [%s]",
cleanedPartitionSeqNos,
@ -2026,6 +2151,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
/**
* Perform a sanity check on the new partition groups returned by
* {@link #recomputePartitionGroupsForExpiration}.
*
* Specifically, we check that the new partition groups' partitions are a subset of the original groups' partitions,
* and that none of the offsets for the non-expired partitions have changed.
*
* @param newPartitionGroups new partition groups with expired partitions removed, generated by the subclass
*/
private void validatePartitionGroupReassignments(
Map<Integer, ConcurrentHashMap<PartitionIdType, SequenceOffsetType>> newPartitionGroups
)
{
Map<PartitionIdType, SequenceOffsetType> oldPartitionMappings = new HashMap<>();
for (ConcurrentHashMap<PartitionIdType, SequenceOffsetType> oldGroup : partitionGroups.values()) {
// we don't care about old task group mappings, only the partition-offset mappings
oldPartitionMappings.putAll(oldGroup);
}
for (ConcurrentHashMap<PartitionIdType, SequenceOffsetType> newGroup : newPartitionGroups.values()) {
for (Entry<PartitionIdType, SequenceOffsetType> newPartitionMapping : newGroup.entrySet()) {
if (!oldPartitionMappings.containsKey(newPartitionMapping.getKey())) {
// recomputing the groups without the expired partitions added an unknown partition somehow
throw new IAE(
"Recomputed partition groups [%s] contains unexpected partition ID [%s], old partition groups: [%s]",
newPartitionGroups,
newPartitionMapping.getKey(),
partitionGroups
);
}
SequenceOffsetType oldOffset = oldPartitionMappings.get(newPartitionMapping.getKey());
if (!oldOffset.equals(newPartitionMapping.getValue())) {
throw new IAE(
"Recomputed partition groups [%s] has offset mismatch for partition ID [%s], original partition map: [%s]",
newPartitionGroups,
newPartitionMapping.getKey(),
partitionGroups
);
}
}
}
}
private void updateTaskStatus() throws ExecutionException, InterruptedException, TimeoutException
{
final List<ListenableFuture<Boolean>> futures = new ArrayList<>();
@ -2110,8 +2279,18 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
boolean stopTasksEarly = false;
if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || earlyStopTime.isEqualNow())) {
log.info("Early stop requested - signalling tasks to complete");
earlyStopTime = null;
stopTasksEarly = true;
}
// if this task has run longer than the configured duration, signal all tasks in the group to persist
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || stopTasksEarly) {
log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
@ -2507,7 +2686,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* should be removed from the starting offsets sent to the tasks.
*
* @param startingOffsets
* @return
* @return startingOffsets with entries for expired partitions removed
*/
protected Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> filterExpiredPartitionsFromStartingOffsets(
Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> startingOffsets
@ -2688,7 +2867,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (sequence != null) {
log.debug("Getting sequence [%s] from metadata storage for partition [%s]", sequence, partition);
if (!taskTuningConfig.isSkipSequenceNumberAvailabilityCheck()) {
if (!checkSequenceAvailability(partition, sequence)) {
if (!checkOffsetAvailability(partition, sequence)) {
if (taskTuningConfig.isResetOffsetAutomatically()) {
resetInternal(
createDataSourceMetaDataForReset(ioConfig.getStream(), ImmutableMap.of(partition, sequence))
@ -3113,24 +3292,20 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
/**
* checks if sequence from metadata storage is still valid
* checks if offset from metadata storage is still valid
*
* @return true if still valid else false
*/
private boolean checkSequenceAvailability(
private boolean checkOffsetAvailability(
@NotNull PartitionIdType partition,
@NotNull SequenceOffsetType sequenceFromMetadata
@NotNull SequenceOffsetType offsetFromMetadata
)
{
SequenceOffsetType earliestSequence = getOffsetFromStreamForPartition(partition, true);
SequenceOffsetType latestSequence = getOffsetFromStreamForPartition(partition, false);
return (earliestSequence == null
|| makeSequenceNumber(earliestSequence).compareTo(makeSequenceNumber(sequenceFromMetadata)) <= 0)
&& (latestSequence == null
|| makeSequenceNumber(latestSequence).compareTo(makeSequenceNumber(sequenceFromMetadata)) >= 0);
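// the stored offset is considered available only if the stream's earliest retained offset is at or before it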
final SequenceOffsetType earliestOffset = getOffsetFromStreamForPartition(partition, true);
return earliestOffset != null
&& makeSequenceNumber(earliestOffset).compareTo(makeSequenceNumber(offsetFromMetadata)) <= 0;
}
/**
* a special sequence number that is used to indicate that the sequence offset
* for a particular partition has not yet been calculated by the supervisor. When
@ -3151,10 +3326,15 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected abstract SequenceOffsetType getEndOfPartitionMarker();
/**
* checks if seqNum marks the end of a Kinesis shard. Used by Kinesis only.
* checks if seqNum marks the end of a Kinesis shard. This indicates that the shard is closed. Used by Kinesis only.
*/
protected abstract boolean isEndOfShard(SequenceOffsetType seqNum);
/**
* checks if seqNum marks an expired Kinesis shard. Used by Kinesis only.
*/
protected abstract boolean isShardExpirationMarker(SequenceOffsetType seqNum);
/**
* Returns true if the start sequence number should be exclusive for the non-first sequences for the whole partition.
* For example, in Kinesis, the start offsets are inclusive for the first sequence, but exclusive for following

View File

@ -30,7 +30,7 @@ public interface SeekableStreamSupervisorTuningConfig
int DEFAULT_CHAT_RETRIES = 8;
String DEFAULT_HTTP_TIMEOUT = "PT10S";
String DEFAULT_SHUTDOWN_TIMEOUT = "PT80S";
String DEFAULT_REPARTITION_TRANSITION_DURATION = "PT2M";
static Duration defaultDuration(final Period period, final String theDefault)
{
@ -52,5 +52,8 @@ public interface SeekableStreamSupervisorTuningConfig
@JsonProperty
Duration getShutdownTimeout();
@JsonProperty
Duration getRepartitionTransitionDuration();
SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig();
}

View File

@ -640,6 +640,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
return new Period("PT1S").toStandardDuration();
}
@Override
public Duration getRepartitionTransitionDuration()
{
return new Period("PT2M").toStandardDuration();
}
@Override
public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
{
@ -913,6 +919,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
return false;
}
@Override
protected boolean isShardExpirationMarker(String seqNum)
{
return false;
}
@Override
protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
{

View File

@ -671,6 +671,7 @@
numKinesisShards
numProcessors
q.size
repartitionTransitionDuration
replicas
taskCount
taskCount
reset
useEarliestSequenceNumber
POST
resume
POST
@ -683,6 +684,15 @@
taskDuration
startDelay
period
useEarliestSequenceNumber
completionTimeout
taskDuration
lateMessageRejectionPeriod
PT1H
earlyMessageRejectionPeriod
PT1H
PT1H
recordsPerFetch
fetchDelayMillis
awsAssumedRoleArn
awsExternalId
deaggregate
GET
terminate
POST
terminate
druid.worker.capacity
taskDuration
completionTimeout
replicas
taskCount
replicas
PT2M
kinesis.us
amazonaws.com
PT6H
GetRecords
KCL
signalled
ProvisionedThroughputExceededException
Deaggregation
- ../docs/development/extensions-core/lookups-cached-global.md
baz
customJson