Remove deprecated parameter for Checkpoint request (#8707)

* Remove deprecated parameter for Checkpoint request

* fix wrong doc
This commit is contained in:
Jihoon Son 2019-10-23 16:51:16 -07:00 committed by Fangjin Yang
parent d9c9aef3d1
commit 2518478b20
11 changed files with 15 additions and 296 deletions

View File

@ -52,7 +52,6 @@ import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@ -278,11 +277,7 @@ public class MaterializedViewSupervisor implements Supervisor
}
@Override
public void checkpoint(
@Nullable Integer taskGroupId,
String baseSequenceName,
DataSourceMetadata checkpointMetadata
)
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
// do nothing
}

View File

@ -2705,8 +2705,7 @@ public class KafkaIndexTaskTest
@Override
public boolean checkPointDataSourceMetadata(
String supervisorId,
@Nullable Integer taskGroupId,
String baseSequenceName,
int taskGroupId,
@Nullable DataSourceMetadata previousDataSourceMetadata
)
{

View File

@ -2378,7 +2378,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.moveTaskGroupToPendingCompletion(0);
supervisor.checkpoint(
0,
id1.getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())
)
@ -2463,7 +2462,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.checkpoint(
0,
id1.getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of())
)
@ -2489,104 +2487,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
}
@Test(timeout = 60_000L)
public void testCheckpointWithNullTaskGroupId()
throws InterruptedException
{
supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
supervisor.getStateManager().markRunFinished();
//not adding any events
final KafkaIndexTask id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
null,
null,
tuningConfig
);
final Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
null,
null,
tuningConfig
);
final Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
null,
null,
tuningConfig
);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
EasyMock.expect(
indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 0L));
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.anyString(), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(3);
EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
.anyTimes();
EasyMock.expect(taskClient.pauseAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L)))
.anyTimes();
EasyMock.expect(taskClient.setEndOffsetsAsync(
EasyMock.anyString(),
EasyMock.eq(ImmutableMap.of(0, 10L)),
EasyMock.anyBoolean()
))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
replayAll();
supervisor.start();
supervisor.runInternal();
supervisor.checkpoint(
null,
id1.getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())
)
);
while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}
verifyAll();
}
@Test
public void testSuspendedNoRunningTasks() throws Exception
{

View File

@ -2912,8 +2912,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
@Override
public boolean checkPointDataSourceMetadata(
String supervisorId,
@Nullable Integer taskGroupId,
String baseSequenceName,
int taskGroupId,
@Nullable DataSourceMetadata checkpointMetadata
)
{

View File

@ -2971,7 +2971,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.moveTaskGroupToPendingCompletion(0);
supervisor.checkpoint(
0,
id1.getIOConfig().getBaseSequenceName(),
new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), checkpoints.get(0).keySet())
)
@ -3098,7 +3097,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.checkpoint(
0,
id1.getIOConfig().getBaseSequenceName(),
new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.emptyMap(), ImmutableSet.of())
)
@ -3123,111 +3121,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
}
@Test(timeout = 60_000L)
public void testCheckpointWithNullTaskGroupId() throws InterruptedException
{
supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null, false);
supervisor.getStateManager().markRunFinished();
//not adding any events
final KinesisIndexTask id1 = createKinesisIndexTask(
"id1",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER)
),
null,
null
);
final Task id2 = createKinesisIndexTask(
"id2",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "0"), ImmutableSet.of(SHARD_ID1)),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER)
),
null,
null
);
final Task id3 = createKinesisIndexTask(
"id3",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "0"), ImmutableSet.of(SHARD_ID1)),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER)
),
null,
null
);
EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
EasyMock.expect(
indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KinesisDataSourceMetadata(
null)
).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();
final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(SHARD_ID1, "0"));
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.anyString(), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(3);
EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
.anyTimes();
EasyMock.expect(taskClient.pauseAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(SHARD_ID1, "10")))
.anyTimes();
EasyMock.expect(taskClient.setEndOffsetsAsync(
EasyMock.anyString(),
EasyMock.eq(ImmutableMap.of(SHARD_ID1, "10")),
EasyMock.anyBoolean()
))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
replayAll();
supervisor.start();
supervisor.runInternal();
supervisor.checkpoint(
null,
id1.getIOConfig().getBaseSequenceName(),
new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), ImmutableSet.of())
)
);
while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}
verifyAll();
}
@Test
public void testSuspendedNoRunningTasks() throws Exception
{

View File

@ -31,23 +31,18 @@ import javax.annotation.Nullable;
public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
{
private final String supervisorId;
@Nullable
private final Integer taskGroupId;
@Deprecated
private final String baseSequenceName;
private final int taskGroupId;
private final SeekableStreamDataSourceMetadata checkpointMetadata;
public CheckPointDataSourceMetadataAction(
@JsonProperty("supervisorId") String supervisorId,
@JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility,
@JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this
@JsonProperty("taskGroupId") Integer taskGroupId,
@JsonProperty("previousCheckPoint") @Nullable @Deprecated SeekableStreamDataSourceMetadata previousCheckPoint,
@JsonProperty("checkpointMetadata") @Nullable SeekableStreamDataSourceMetadata checkpointMetadata
)
{
this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
this.taskGroupId = taskGroupId;
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName");
this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
this.checkpointMetadata = checkpointMetadata == null ? previousCheckPoint : checkpointMetadata;
Preconditions.checkNotNull(this.checkpointMetadata, "checkpointMetadata");
@ -65,13 +60,6 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
return supervisorId;
}
@Deprecated
@JsonProperty("sequenceName")
public String getBaseSequenceName()
{
return baseSequenceName;
}
@Nullable
@JsonProperty
public Integer getTaskGroupId()
@ -107,7 +95,6 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
supervisorId,
taskGroupId,
baseSequenceName,
checkpointMetadata
);
}
@ -123,7 +110,6 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
{
return "CheckPointDataSourceMetadataAction{" +
"supervisorId='" + supervisorId + '\'' +
", baseSequenceName='" + baseSequenceName + '\'' +
", taskGroupId='" + taskGroupId + '\'' +
", checkpointMetadata=" + checkpointMetadata +
'}';

View File

@ -192,8 +192,7 @@ public class SupervisorManager
public boolean checkPointDataSourceMetadata(
String supervisorId,
@Nullable Integer taskGroupId,
String baseSequenceName,
int taskGroupId,
DataSourceMetadata previousDataSourceMetadata
)
{
@ -205,7 +204,7 @@ public class SupervisorManager
Preconditions.checkNotNull(supervisor, "supervisor could not be found");
supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata);
supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata);
return true;
}
catch (Exception e) {

View File

@ -724,7 +724,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
task.getDataSource(),
ioConfig.getTaskGroupId(),
task.getIOConfig().getBaseSequenceName(),
null,
createDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(

View File

@ -349,59 +349,21 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected class CheckpointNotice implements Notice
{
@Nullable
private final Integer nullableTaskGroupId;
@Deprecated
private final String baseSequenceName;
private final int taskGroupId;
private final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> checkpointMetadata;
CheckpointNotice(
@Nullable Integer nullableTaskGroupId,
@Deprecated String baseSequenceName,
int taskGroupId,
SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> checkpointMetadata
)
{
this.baseSequenceName = baseSequenceName;
this.nullableTaskGroupId = nullableTaskGroupId;
this.taskGroupId = taskGroupId;
this.checkpointMetadata = checkpointMetadata;
}
@Override
public void handle() throws ExecutionException, InterruptedException
{
// Find taskGroupId using taskId if it's null. It can be null while rolling update.
final int taskGroupId;
if (nullableTaskGroupId == null) {
// We search taskId in activelyReadingTaskGroups and pendingCompletionTaskGroups sequentially. This should be fine because
// 1) a taskGroup can be moved from activelyReadingTaskGroups to pendingCompletionTaskGroups in RunNotice
// (see checkTaskDuration()).
// 2) Notices are proceesed by a single thread. So, CheckpointNotice and RunNotice cannot be processed at the
// same time.
final java.util.Optional<Integer> maybeGroupId = activelyReadingTaskGroups
.entrySet()
.stream()
.filter(entry -> {
final TaskGroup taskGroup = entry.getValue();
return taskGroup.baseSequenceName.equals(baseSequenceName);
})
.findAny()
.map(Entry::getKey);
taskGroupId = maybeGroupId.orElseGet(() -> pendingCompletionTaskGroups
.entrySet()
.stream()
.filter(entry -> {
final List<TaskGroup> taskGroups = entry.getValue();
return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
})
.findAny()
.orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
.getKey());
} else {
taskGroupId = nullableTaskGroupId;
}
// check for consistency
// if already received request for this sequenceName and dataSourceMetadata combination then return
final TaskGroup taskGroup = activelyReadingTaskGroups.get(taskGroupId);
@ -3129,11 +3091,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
@Override
public void checkpoint(
@Nullable Integer taskGroupId,
@Deprecated String baseSequenceName,
DataSourceMetadata checkpointMetadata
)
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
Preconditions.checkNotNull(checkpointMetadata, "checkpointMetadata");
@ -3153,7 +3111,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
log.info("Checkpointing [%s] for taskGroup [%s]", checkpointMetadata, taskGroupId);
addNotice(new CheckpointNotice(taskGroupId, baseSequenceName, seekableMetadata));
addNotice(new CheckpointNotice(taskGroupId, seekableMetadata));
}
@VisibleForTesting

View File

@ -150,11 +150,7 @@ public class NoopSupervisorSpec implements SupervisorSpec
}
@Override
public void checkpoint(
@Nullable Integer taskGroupId,
String baseSequenceName,
DataSourceMetadata checkpointMetadata
)
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
}

View File

@ -61,12 +61,7 @@ public interface Supervisor
* represented by {@param currentCheckpoint} DataSourceMetadata
*
* @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing
* @param baseSequenceName baseSequenceName
* @param checkpointMetadata metadata for the sequence to currently checkpoint
*/
void checkpoint(
@Nullable Integer taskGroupId,
@Deprecated String baseSequenceName,
DataSourceMetadata checkpointMetadata
);
void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
}