Optimize metadata calls in SeekableStreamSupervisor (#13328)

* Optimize metadata calls

* Modify isTaskCurrent

* Fix tests

* Refactoring
This commit is contained in:
AmatyaAvadhanula 2022-11-10 07:22:51 +05:30 committed by GitHub
parent 0040042863
commit 0512ae4922
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 98 additions and 47 deletions

View File

@ -2833,7 +2833,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(1, -100L, 2, 200L))
)
).times(4);
).times(3);
// getOffsetFromStorageForPartition() throws an exception when the offsets are automatically reset.
// Since getOffsetFromStorageForPartition() is called per partition, all partitions can't be reset at the same time.
// Instead, subsequent partitions will be reset in the following supervisor runs.
@ -3879,6 +3879,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
null
);
KafkaIndexTask completedTaskFromStorage = createKafkaIndexTask(
"id0",
0,
new SeekableStreamStartSequenceNumbers<>(
"topic",
ImmutableMap.of(0, 0L, 2, 0L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
"topic",
ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
minMessageTime,
maxMessageTime,
dataSchema,
supervisor.getTuningConfig()
);
// Expect metadata call only for tasks that are not active
EasyMock.expect(taskStorage.getTask("id0")).andReturn(Optional.of(completedTaskFromStorage));
KafkaIndexTask taskFromStorage = createKafkaIndexTask(
"id1",
0,
@ -3951,25 +3972,21 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.getTuningConfig()
);
EasyMock.expect(taskStorage.getTask("id1"))
.andReturn(Optional.of(taskFromStorage))
.once();
EasyMock.expect(taskStorage.getTask("id2"))
.andReturn(Optional.of(taskFromStorageMismatchedDataSchema))
.once();
EasyMock.expect(taskStorage.getTask("id3"))
.andReturn(Optional.of(taskFromStorageMismatchedTuningConfig))
.once();
EasyMock.expect(taskStorage.getTask("id4"))
.andReturn(Optional.of(taskFromStorageMismatchedPartitionsWithTaskGroup))
.once();
Map<String, Task> taskMap = ImmutableMap.of(
taskFromStorage.getId(), taskFromStorage,
taskFromStorageMismatchedDataSchema.getId(), taskFromStorageMismatchedDataSchema,
taskFromStorageMismatchedTuningConfig.getId(), taskFromStorageMismatchedTuningConfig,
taskFromStorageMismatchedPartitionsWithTaskGroup.getId(), taskFromStorageMismatchedPartitionsWithTaskGroup
);
replayAll();
Assert.assertTrue(supervisor.isTaskCurrent(42, "id1"));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id2"));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id3"));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id4"));
Assert.assertTrue(supervisor.isTaskCurrent(42, "id0", taskMap));
Assert.assertTrue(supervisor.isTaskCurrent(42, "id1", taskMap));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id2", taskMap));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id3", taskMap));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id4", taskMap));
verifyAll();
}
@ -4795,7 +4812,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
@Override
public boolean isTaskCurrent(int taskGroupId, String taskId)
public boolean isTaskCurrent(int taskGroupId, String taskId, Map<String, Task> taskMap)
{
return isTaskCurrentReturn;
}

View File

@ -4039,25 +4039,19 @@ public class KinesisSupervisorTest extends EasyMockSupport
dataSchema
);
EasyMock.expect(taskStorage.getTask("id1"))
.andReturn(Optional.of(taskFromStorage))
.once();
EasyMock.expect(taskStorage.getTask("id2"))
.andReturn(Optional.of(taskFromStorageMismatchedDataSchema))
.once();
EasyMock.expect(taskStorage.getTask("id3"))
.andReturn(Optional.of(taskFromStorageMismatchedTuningConfig))
.once();
EasyMock.expect(taskStorage.getTask("id4"))
.andReturn(Optional.of(taskFromStorageMismatchedPartitionsWithTaskGroup))
.once();
Map<String, Task> taskMap = ImmutableMap.of(
taskFromStorage.getId(), taskFromStorage,
taskFromStorageMismatchedDataSchema.getId(), taskFromStorageMismatchedDataSchema,
taskFromStorageMismatchedTuningConfig.getId(), taskFromStorageMismatchedTuningConfig,
taskFromStorageMismatchedPartitionsWithTaskGroup.getId(), taskFromStorageMismatchedPartitionsWithTaskGroup
);
replayAll();
Assert.assertTrue(supervisor.isTaskCurrent(42, "id1"));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id2"));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id3"));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id4"));
Assert.assertTrue(supervisor.isTaskCurrent(42, "id1", taskMap));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id2", taskMap));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id3", taskMap));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id4", taskMap));
verifyAll();
}
@ -5571,7 +5565,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
@Override
public boolean isTaskCurrent(int taskGroupId, String taskId)
public boolean isTaskCurrent(int taskGroupId, String taskId, Map<String, Task> taskMap)
{
return isTaskCurrentReturn;
}

View File

@ -1744,11 +1744,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
int taskCount = 0;
List<String> futureTaskIds = new ArrayList<>();
List<ListenableFuture<Boolean>> futures = new ArrayList<>();
List<Task> tasks = taskStorage.getActiveTasksByDatasource(dataSource);
final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>();
for (Task task : tasks) {
final Map<String, Task> activeTaskMap = getActiveTaskMap();
for (Task task : activeTaskMap.values()) {
if (!doesTaskTypeMatchSupervisor(task)) {
continue;
}
@ -1892,7 +1893,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
// make sure the task's io and tuning configs match with the supervisor config
// if it is current then only create corresponding taskGroup if it does not exist
if (!isTaskCurrent(taskGroupId, taskId)) {
if (!isTaskCurrent(taskGroupId, taskId, activeTaskMap)) {
log.info(
"Stopping task [%s] which does not match the expected parameters and ingestion spec",
taskId
@ -2295,17 +2296,33 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
}
/**
* Determines whether a given task was created by the current version of the supervisor.
* Uses the Task object mapped to this taskId in the {@code activeTaskMap}.
* If not found in the map, fetch it from the metadata store.
* @param taskGroupId task group id
* @param taskId task id
* @param activeTaskMap Set of active tasks that were pre-fetched
* @return true if the task was created by the current supervisor
*/
@VisibleForTesting
public boolean isTaskCurrent(int taskGroupId, String taskId)
public boolean isTaskCurrent(int taskGroupId, String taskId, Map<String, Task> activeTaskMap)
{
Optional<Task> taskOptional = taskStorage.getTask(taskId);
if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
final Task genericTask;
if (activeTaskMap != null && activeTaskMap.containsKey(taskId)) {
genericTask = activeTaskMap.get(taskId);
} else {
genericTask = taskStorage.getTask(taskId).orNull();
}
if (genericTask == null || !doesTaskTypeMatchSupervisor(genericTask)) {
return false;
}
@SuppressWarnings("unchecked")
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task =
(SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType>) taskOptional.get();
(SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType>) genericTask;
// We recompute the sequence name hash for the supervisor's own configuration and compare this to the hash created
// by rehashing the task's sequence name using the most up-to-date class definitions of tuning config and
@ -3255,6 +3272,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private void checkCurrentTaskState() throws ExecutionException, InterruptedException, TimeoutException
{
Map<String, Task> activeTaskMap = getActiveTaskMap();
List<ListenableFuture<?>> futures = new ArrayList<>();
Iterator<Entry<Integer, TaskGroup>> iTaskGroups = activelyReadingTaskGroups.entrySet().iterator();
while (iTaskGroups.hasNext()) {
@ -3277,7 +3296,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
TaskData taskData = task.getValue();
// stop and remove bad tasks from the task group
if (!isTaskCurrent(groupId, taskId)) {
if (!isTaskCurrent(groupId, taskId, activeTaskMap)) {
log.info("Stopping task [%s] which does not match the expected sequence range and ingestion spec", taskId);
futures.add(stopTask(taskId, false));
iTasks.remove();
@ -3508,6 +3527,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
)
{
ImmutableMap.Builder<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> builder = ImmutableMap.builder();
final Map<PartitionIdType, SequenceOffsetType> metadataOffsets = getOffsetsFromMetadataStorage();
for (PartitionIdType partitionId : partitionGroups.get(groupId)) {
SequenceOffsetType sequence = partitionOffsets.get(partitionId);
@ -3519,7 +3539,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
} else {
// if we don't have a startingOffset (first run or we had some previous failures and reset the sequences) then
// get the sequence from metadata storage (if available) or Kafka/Kinesis (otherwise)
OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage = getOffsetFromStorageForPartition(partitionId);
OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage = getOffsetFromStorageForPartition(
partitionId,
metadataOffsets
);
if (offsetFromStorage != null) {
builder.put(partitionId, offsetFromStorage);
@ -3534,9 +3557,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* doesn't find any data, it will retrieve the latest or earliest Kafka/Kinesis sequence depending on the
* {@link SeekableStreamSupervisorIOConfig#useEarliestSequenceNumber}.
*/
private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartition(PartitionIdType partition)
private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartition(
PartitionIdType partition,
final Map<PartitionIdType, SequenceOffsetType> metadataOffsets
)
{
final Map<PartitionIdType, SequenceOffsetType> metadataOffsets = getOffsetsFromMetadataStorage();
SequenceOffsetType sequence = metadataOffsets.get(partition);
if (sequence != null) {
log.debug("Getting sequence [%s] from metadata storage for partition [%s]", sequence, partition);
@ -3881,6 +3906,21 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
partitionIds.addAll(partitionIdsForTests);
}
/**
* Get all active tasks from metadata storage
* @return map from taskId to Task
*/
private Map<String, Task> getActiveTaskMap()
{
ImmutableMap.Builder activeTaskMap = ImmutableMap.builder();
List<Task> tasks = taskStorage.getActiveTasksByDatasource(dataSource);
for (Task task : tasks) {
activeTaskMap.put(task.getId(), task);
}
return activeTaskMap.build();
}
/**
* creates a specific task IOConfig instance for Kafka/Kinesis
*

View File

@ -395,7 +395,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).times(3);
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).times(6);
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();