Remove skip ignorable shards (#13221)

* Revert "Improve kinesis task assignment after resharding (#12235)"

This reverts commit 1ec57cb935bd0b04d3123dfbb26a962a984422c7.
This commit is contained in:
AmatyaAvadhanula 2022-10-28 16:19:01 +05:30 committed by GitHub
parent de7ef81dff
commit 9cbda66d96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 10 additions and 223 deletions

View File

@ -76,6 +76,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -742,7 +743,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
}); });
} }
public Set<Shard> getShards(String stream) private Set<Shard> getShards(String stream)
{ {
if (useListShards) { if (useListShards) {
return getShardsUsingListShards(stream); return getShardsUsingListShards(stream);
@ -782,7 +783,8 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
* This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream) * This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
* *
* @param stream name of stream * @param stream name of stream
* @return Immutable set of shards *
* @return Set of Shard ids
*/ */
private Set<Shard> getShardsUsingListShards(String stream) private Set<Shard> getShardsUsingListShards(String stream)
{ {
@ -803,11 +805,11 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
public Set<String> getPartitionIds(String stream) public Set<String> getPartitionIds(String stream)
{ {
return wrapExceptions(() -> { return wrapExceptions(() -> {
ImmutableSet.Builder<String> partitionIds = ImmutableSet.builder(); Set<String> partitionIds = new TreeSet<>();
for (Shard shard : getShards(stream)) { for (Shard shard : getShards(stream)) {
partitionIds.add(shard.getShardId()); partitionIds.add(shard.getShardId());
} }
return partitionIds.build(); return partitionIds;
}); });
} }
@ -870,25 +872,6 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
.anyMatch(fetch -> (fetch != null && !fetch.isDone())); .anyMatch(fetch -> (fetch != null && !fetch.isDone()));
} }
/**
* Fetches records from the specified shard to determine if it is empty.
* @param stream to which shard belongs
* @param shardId of the closed shard
* @return true if the closed shard is empty, false otherwise.
*/
public boolean isClosedShardEmpty(String stream, String shardId)
{
String shardIterator = kinesis.getShardIterator(stream,
shardId,
ShardIteratorType.TRIM_HORIZON.toString())
.getShardIterator();
GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator)
.withLimit(1);
GetRecordsResult shardData = kinesis.getRecords(request);
return shardData.getRecords().isEmpty() && shardData.getNextShardIterator() == null;
}
/** /**
* Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call * Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call
* {@link PartitionResource#seek} to move it to the latest offsets. Note that this method does not restart background * {@link PartitionResource#seek} to move it to the latest offsets. Note that this method does not restart background

View File

@ -19,12 +19,10 @@
package org.apache.druid.indexing.kinesis.supervisor; package org.apache.druid.indexing.kinesis.supervisor;
import com.amazonaws.services.kinesis.model.Shard;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.utils.IdUtils; import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.ByteEntity;
@ -66,7 +64,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -91,11 +88,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
private final AWSCredentialsConfig awsCredentialsConfig; private final AWSCredentialsConfig awsCredentialsConfig;
private volatile Map<String, Long> currentPartitionTimeLag; private volatile Map<String, Long> currentPartitionTimeLag;
// Maintain sets of currently closed shards to find ignorable (closed and empty) shards
// Poll closed shards once and store the result to avoid redundant costly calls to kinesis
private final Set<String> emptyClosedShardIds = new TreeSet<>();
private final Set<String> nonEmptyClosedShardIds = new TreeSet<>();
public KinesisSupervisor( public KinesisSupervisor(
final TaskStorage taskStorage, final TaskStorage taskStorage,
final TaskMaster taskMaster, final TaskMaster taskMaster,
@ -425,52 +417,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
return true; return true;
} }
@Override
protected boolean shouldSkipIgnorablePartitions()
{
return spec.getSpec().getTuningConfig().isSkipIgnorableShards();
}
/**
* A kinesis shard is considered to be an ignorable partition if it is both closed and empty
* @return set of shards ignorable by kinesis ingestion
*/
@Override
protected Set<String> computeIgnorablePartitionIds()
{
updateClosedShardCache();
return ImmutableSet.copyOf(emptyClosedShardIds);
}
private synchronized void updateClosedShardCache()
{
final KinesisRecordSupplier kinesisRecordSupplier = (KinesisRecordSupplier) recordSupplier;
final String stream = spec.getSource();
final Set<Shard> allActiveShards = kinesisRecordSupplier.getShards(stream);
final Set<String> activeClosedShards = allActiveShards.stream()
.filter(shard -> isShardClosed(shard))
.map(Shard::getShardId)
.collect(Collectors.toSet());
// clear stale shards
emptyClosedShardIds.retainAll(activeClosedShards);
nonEmptyClosedShardIds.retainAll(activeClosedShards);
for (String closedShardId : activeClosedShards) {
// Try to utilize cache
if (emptyClosedShardIds.contains(closedShardId) || nonEmptyClosedShardIds.contains(closedShardId)) {
continue;
}
// Check if it is closed using kinesis and add to cache
if (kinesisRecordSupplier.isClosedShardEmpty(stream, closedShardId)) {
emptyClosedShardIds.add(closedShardId);
} else {
nonEmptyClosedShardIds.add(closedShardId);
}
}
}
@Override @Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithExpiredPartitions( protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> expiredPartitionIds SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> expiredPartitionIds
@ -536,15 +482,4 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
return new KinesisDataSourceMetadata(newSequences); return new KinesisDataSourceMetadata(newSequences);
} }
/**
* A shard is considered closed iff it has an ending sequence number.
*
* @param shard to be checked
* @return if shard is closed
*/
private boolean isShardClosed(Shard shard)
{
return shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
}
} }

View File

@ -41,7 +41,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
private final Duration repartitionTransitionDuration; private final Duration repartitionTransitionDuration;
private final Duration offsetFetchPeriod; private final Duration offsetFetchPeriod;
private final boolean useListShards; private final boolean useListShards;
private final boolean skipIgnorableShards;
public static KinesisSupervisorTuningConfig defaultConfig() public static KinesisSupervisorTuningConfig defaultConfig()
{ {
@ -77,7 +76,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
null, null,
null, null,
null, null,
null,
null null
); );
} }
@ -114,8 +112,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
@JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration, @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
@JsonProperty("useListShards") Boolean useListShards, @JsonProperty("useListShards") Boolean useListShards
@JsonProperty("skipIgnorableShards") Boolean skipIgnorableShards
) )
{ {
super( super(
@ -163,7 +160,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
DEFAULT_OFFSET_FETCH_PERIOD DEFAULT_OFFSET_FETCH_PERIOD
); );
this.useListShards = (useListShards != null ? useListShards : false); this.useListShards = (useListShards != null ? useListShards : false);
this.skipIgnorableShards = (skipIgnorableShards != null ? skipIgnorableShards : false);
} }
@Override @Override
@ -220,12 +216,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
return useListShards; return useListShards;
} }
@JsonProperty
public boolean isSkipIgnorableShards()
{
return skipIgnorableShards;
}
@Override @Override
public String toString() public String toString()
{ {
@ -259,7 +249,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", repartitionTransitionDuration=" + getRepartitionTransitionDuration() + ", repartitionTransitionDuration=" + getRepartitionTransitionDuration() +
", useListShards=" + isUseListShards() + ", useListShards=" + isUseListShards() +
", skipIgnorableShards=" + isSkipIgnorableShards() +
'}'; '}';
} }

View File

@ -310,7 +310,6 @@ public class KinesisIndexTaskTuningConfigTest
null, null,
null, null,
null, null,
null,
null null
); );
KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig(); KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();

View File

@ -53,7 +53,6 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -1107,45 +1106,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
verifyAll(); verifyAll();
} }
@Test
public void testIsClosedShardEmpty()
{
AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
recordsPerFetch,
0,
2,
false,
100,
5000,
5000,
5,
true,
false
);
Record record = new Record();
final String shardWithoutRecordsAndNullNextIterator = "0";
setupMockKinesisForShardId(mockKinesis, shardWithoutRecordsAndNullNextIterator, new ArrayList<>(), null);
final String shardWithRecordsAndNullNextIterator = "1";
setupMockKinesisForShardId(mockKinesis, shardWithRecordsAndNullNextIterator, Collections.singletonList(record), null);
final String shardWithoutRecordsAndNonNullNextIterator = "2";
setupMockKinesisForShardId(mockKinesis, shardWithoutRecordsAndNonNullNextIterator, new ArrayList<>(), "nextIterator");
final String shardWithRecordsAndNonNullNextIterator = "3";
setupMockKinesisForShardId(mockKinesis, shardWithRecordsAndNonNullNextIterator, Collections.singletonList(record), "nextIterator");
EasyMock.replay(mockKinesis);
// A closed shard is empty only when the records are empty and the next iterator is null
Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardWithoutRecordsAndNullNextIterator));
Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNullNextIterator));
Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithoutRecordsAndNonNullNextIterator));
Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNonNullNextIterator));
}
@Test @Test
public void testIsOffsetAvailable() public void testIsOffsetAvailable()
{ {

View File

@ -19,8 +19,6 @@
package org.apache.druid.indexing.kinesis.supervisor; package org.apache.druid.indexing.kinesis.supervisor;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
@ -107,7 +105,6 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -208,7 +205,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null, null,
null, null,
null, null,
null,
null null
); );
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
@ -3970,7 +3966,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null, null,
null, null,
null, null,
null,
null null
); );
@ -4924,55 +4919,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets()); Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets());
} }
@Test
public void testGetIgnorablePartitionIds()
{
supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
supervisor.setupRecordSupplier();
supervisor.tryInit();
String stream = supervisor.getKinesisSupervisorSpec().getSource();
SequenceNumberRange openShardRange = new SequenceNumberRange().withEndingSequenceNumber(null);
SequenceNumberRange closedShardRange = new SequenceNumberRange().withEndingSequenceNumber("non-null");
Shard openShard = new Shard().withShardId("openShard")
.withSequenceNumberRange(openShardRange);
Shard emptyClosedShard = new Shard().withShardId("emptyClosedShard")
.withSequenceNumberRange(closedShardRange);
Shard nonEmptyClosedShard = new Shard().withShardId("nonEmptyClosedShard")
.withSequenceNumberRange(closedShardRange);
EasyMock.expect(supervisorRecordSupplier.getShards(stream))
.andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once()
.andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once()
.andReturn(ImmutableSet.of(openShard, emptyClosedShard)).once()
.andReturn(ImmutableSet.of(openShard)).once()
.andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once();
// The following calls happen twice, once during the first call since there was no cache,
// and once during the last since the cache was cleared prior to it
EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, emptyClosedShard.getShardId()))
.andReturn(true).times(2);
EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, nonEmptyClosedShard.getShardId()))
.andReturn(false).times(2);
EasyMock.replay(supervisorRecordSupplier);
// ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed}
// {empty-closed, nonEmpty-closed} added to cache
Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
// ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed}
Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
// ActiveShards = {open, empty-closed}, IgnorableShards = {empty-closed}
// {nonEmpty-closed} removed from cache
Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
// ActiveShards = {open}, IgnorableShards = {}
// {empty-closed} removed from cache
Assert.assertEquals(new HashSet<>(), supervisor.computeIgnorablePartitionIds());
// ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed}
// {empty-closed, nonEmpty-closed} re-added to cache
Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
}
private TestableKinesisSupervisor getTestableSupervisor( private TestableKinesisSupervisor getTestableSupervisor(
int replicas, int replicas,
int taskCount, int taskCount,
@ -5082,7 +5028,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null, null,
null, null,
null, null,
null,
null null
); );

View File

@ -124,11 +124,11 @@ public interface RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType
SequenceOffsetType getPosition(StreamPartition<PartitionIdType> partition); SequenceOffsetType getPosition(StreamPartition<PartitionIdType> partition);
/** /**
* returns the set of all available partitions under the given stream * returns the set of partitions under the given stream
* *
* @param stream name of stream * @param stream name of stream
* *
* @return set of partition ids belonging to the stream * @return set of partitions
*/ */
Set<PartitionIdType> getPartitionIds(String stream); Set<PartitionIdType> getPartitionIds(String stream);

View File

@ -2315,30 +2315,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return false; return false;
} }
protected boolean shouldSkipIgnorablePartitions()
{
return false;
}
/**
* Use this method if skipIgnorablePartitions is true in the spec
*
* These partitions can be safely ignored for both ingestion task assignment and autoscaler limits
*
* @return set of ids of ignorable partitions
*/
protected Set<PartitionIdType> computeIgnorablePartitionIds()
{
return ImmutableSet.of();
}
public int getPartitionCount() public int getPartitionCount()
{ {
int partitionCount = recordSupplier.getPartitionIds(ioConfig.getStream()).size(); return recordSupplier.getPartitionIds(ioConfig.getStream()).size();
if (shouldSkipIgnorablePartitions()) {
partitionCount -= computeIgnorablePartitionIds().size();
}
return partitionCount;
} }
private boolean updatePartitionDataFromStream() private boolean updatePartitionDataFromStream()
@ -2348,9 +2327,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
recordSupplierLock.lock(); recordSupplierLock.lock();
try { try {
partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream()); partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream());
if (shouldSkipIgnorablePartitions()) {
partitionIdsFromSupplier.removeAll(computeIgnorablePartitionIds());
}
} }
catch (Exception e) { catch (Exception e) {
stateManager.recordThrowableEvent(e); stateManager.recordThrowableEvent(e);