Fix unrealistic test variables in KafkaSupervisorTest and tidy up unused variable in checkpointing process (#7319)

* Fix unrealistic test arguments in KafkaSupervisorTest

* remove currentCheckpoint from checkpoint action

* rename variable
This commit is contained in:
Jihoon Son 2019-08-21 10:58:22 -07:00 committed by GitHub
parent d007477742
commit 22d6384d36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 85 additions and 158 deletions

View File

@ -280,8 +280,7 @@ public class MaterializedViewSupervisor implements Supervisor
public void checkpoint( public void checkpoint(
@Nullable Integer taskGroupId, @Nullable Integer taskGroupId,
String baseSequenceName, String baseSequenceName,
DataSourceMetadata previousCheckPoint, DataSourceMetadata checkpointMetadata
DataSourceMetadata currentCheckPoint
) )
{ {
// do nothing // do nothing

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@ -83,6 +84,10 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
this.configMapper = configMapper; this.configMapper = configMapper;
this.ioConfig = ioConfig; this.ioConfig = ioConfig;
Preconditions.checkArgument(
ioConfig.getStartSequenceNumbers().getExclusivePartitions().isEmpty(),
"All startSequenceNumbers must be inclusive"
);
} }
long getPollRetryMs() long getPollRetryMs()

View File

@ -567,10 +567,7 @@ public class KafkaIndexTaskTest
Objects.hash( Objects.hash(
DATA_SCHEMA.getDataSource(), DATA_SCHEMA.getDataSource(),
0, 0,
new KafkaDataSourceMetadata(startPartitions), new KafkaDataSourceMetadata(startPartitions)
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, currentOffsets)
)
) )
) )
); );
@ -700,10 +697,7 @@ public class KafkaIndexTaskTest
Objects.hash( Objects.hash(
DATA_SCHEMA.getDataSource(), DATA_SCHEMA.getDataSource(),
0, 0,
new KafkaDataSourceMetadata(startPartitions), new KafkaDataSourceMetadata(startPartitions)
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, currentOffsets)
)
) )
) )
); );
@ -714,8 +708,7 @@ public class KafkaIndexTaskTest
0, 0,
new KafkaDataSourceMetadata( new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, currentOffsets, ImmutableSet.of()) new SeekableStreamStartSequenceNumbers<>(topic, currentOffsets, ImmutableSet.of())
), )
new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, nextOffsets))
) )
) )
); );
@ -820,10 +813,7 @@ public class KafkaIndexTaskTest
Objects.hash( Objects.hash(
DATA_SCHEMA.getDataSource(), DATA_SCHEMA.getDataSource(),
0, 0,
new KafkaDataSourceMetadata(startPartitions), new KafkaDataSourceMetadata(startPartitions)
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, checkpoint.getPartitionSequenceNumberMap())
)
) )
) )
); );
@ -2653,8 +2643,7 @@ public class KafkaIndexTaskTest
String supervisorId, String supervisorId,
@Nullable Integer taskGroupId, @Nullable Integer taskGroupId,
String baseSequenceName, String baseSequenceName,
@Nullable DataSourceMetadata previousDataSourceMetadata, @Nullable DataSourceMetadata previousDataSourceMetadata
@Nullable DataSourceMetadata currentDataSourceMetadata
) )
{ {
log.info("Adding checkpoint hash to the set"); log.info("Adding checkpoint hash to the set");
@ -2662,8 +2651,7 @@ public class KafkaIndexTaskTest
Objects.hash( Objects.hash(
supervisorId, supervisorId,
taskGroupId, taskGroupId,
previousDataSourceMetadata, previousDataSourceMetadata
currentDataSourceMetadata
) )
); );
return true; return true;

View File

@ -628,11 +628,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata( new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>( new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of())
topic,
ImmutableMap.of(0, 10L, 1, 20L, 2, 30L),
ImmutableSet.of(0, 1, 2)
)
) )
).anyTimes(); ).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
@ -669,11 +665,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata( new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>( new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of())
topic,
ImmutableMap.of(0, 10L, 1, 20L, 2, 30L),
ImmutableSet.of(0, 1, 2)
)
) )
).anyTimes(); ).anyTimes();
replayAll(); replayAll();
@ -2348,21 +2340,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.start(); supervisor.start();
supervisor.runInternal(); supervisor.runInternal();
final Map<Integer, Long> fakeCheckpoints = Collections.emptyMap();
supervisor.moveTaskGroupToPendingCompletion(0); supervisor.moveTaskGroupToPendingCompletion(0);
supervisor.checkpoint( supervisor.checkpoint(
0, 0,
id1.getIOConfig().getBaseSequenceName(), id1.getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( new KafkaDataSourceMetadata(
topic, new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())
checkpoints.get(0), )
ImmutableSet.of()
)),
new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
topic,
fakeCheckpoints,
fakeCheckpoints.keySet()
))
); );
while (supervisor.getNoticesQueueSize() > 0) { while (supervisor.getNoticesQueueSize() > 0) {
@ -2441,16 +2425,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.checkpoint( supervisor.checkpoint(
0, 0,
id1.getIOConfig().getBaseSequenceName(), id1.getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( new KafkaDataSourceMetadata(
topic, new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of())
Collections.emptyMap(), )
ImmutableSet.of()
)),
new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
topic,
Collections.emptyMap(),
ImmutableSet.of()
))
); );
while (supervisor.getNoticesQueueSize() > 0) { while (supervisor.getNoticesQueueSize() > 0) {
@ -2552,18 +2529,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.runInternal(); supervisor.runInternal();
final TreeMap<Integer, Map<Integer, Long>> newCheckpoints = new TreeMap<>();
newCheckpoints.put(0, ImmutableMap.of(0, 10L));
supervisor.checkpoint( supervisor.checkpoint(
null, null,
id1.getIOConfig().getBaseSequenceName(), id1.getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
topic,
checkpoints.get(0),
ImmutableSet.of()
)),
new KafkaDataSourceMetadata( new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, newCheckpoints.get(0), newCheckpoints.get(0).keySet()) new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())
) )
); );

View File

@ -619,8 +619,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Objects.hash( Objects.hash(
DATA_SCHEMA.getDataSource(), DATA_SCHEMA.getDataSource(),
0, 0,
new KinesisDataSourceMetadata(startPartitions), new KinesisDataSourceMetadata(startPartitions)
new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, currentOffsets))
) )
) )
); );
@ -758,8 +757,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Objects.hash( Objects.hash(
DATA_SCHEMA.getDataSource(), DATA_SCHEMA.getDataSource(),
0, 0,
new KinesisDataSourceMetadata(startPartitions), new KinesisDataSourceMetadata(startPartitions)
new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, currentOffsets))
) )
) )
); );
@ -769,9 +767,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
DATA_SCHEMA.getDataSource(), DATA_SCHEMA.getDataSource(),
0, 0,
new KinesisDataSourceMetadata( new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(STREAM, currentOffsets, currentOffsets.keySet()) new SeekableStreamStartSequenceNumbers<>(STREAM, currentOffsets, currentOffsets.keySet()))
),
new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, nextOffsets))
) )
) )
); );
@ -2832,8 +2828,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
String supervisorId, String supervisorId,
@Nullable Integer taskGroupId, @Nullable Integer taskGroupId,
String baseSequenceName, String baseSequenceName,
@Nullable DataSourceMetadata previousDataSourceMetadata, @Nullable DataSourceMetadata checkpointMetadata
@Nullable DataSourceMetadata currentDataSourceMetadata
) )
{ {
LOG.info("Adding checkpoint hash to the set"); LOG.info("Adding checkpoint hash to the set");
@ -2841,8 +2836,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Objects.hash( Objects.hash(
supervisorId, supervisorId,
taskGroupId, taskGroupId,
previousDataSourceMetadata, checkpointMetadata
currentDataSourceMetadata
) )
); );
return true; return true;

View File

@ -2907,19 +2907,13 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.start(); supervisor.start();
supervisor.runInternal(); supervisor.runInternal();
final Map<String, String> fakeCheckpoints = Collections.emptyMap();
supervisor.moveTaskGroupToPendingCompletion(0); supervisor.moveTaskGroupToPendingCompletion(0);
supervisor.checkpoint( supervisor.checkpoint(
0, 0,
id1.getIOConfig().getBaseSequenceName(), id1.getIOConfig().getBaseSequenceName(),
new KinesisDataSourceMetadata( new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), checkpoints.get(0).keySet()) new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), checkpoints.get(0).keySet())
), )
new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
stream,
fakeCheckpoints,
ImmutableSet.of()
))
); );
while (supervisor.getNoticesQueueSize() > 0) { while (supervisor.getNoticesQueueSize() > 0) {
@ -3044,16 +3038,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.checkpoint( supervisor.checkpoint(
0, 0,
id1.getIOConfig().getBaseSequenceName(), id1.getIOConfig().getBaseSequenceName(),
new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( new KinesisDataSourceMetadata(
stream, new SeekableStreamStartSequenceNumbers<>(stream, Collections.emptyMap(), ImmutableSet.of())
Collections.emptyMap(), )
ImmutableSet.of()
)),
new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
stream,
Collections.emptyMap(),
ImmutableSet.of()
))
); );
while (supervisor.getNoticesQueueSize() > 0) { while (supervisor.getNoticesQueueSize() > 0) {
@ -3164,21 +3151,12 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.runInternal(); supervisor.runInternal();
final TreeMap<Integer, Map<String, String>> newCheckpoints = new TreeMap<>();
newCheckpoints.put(0, ImmutableMap.of(shardId1, "10"));
supervisor.checkpoint( supervisor.checkpoint(
null, null,
id1.getIOConfig().getBaseSequenceName(), id1.getIOConfig().getBaseSequenceName(),
new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( new KinesisDataSourceMetadata(
stream, new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), ImmutableSet.of())
checkpoints.get(0), )
ImmutableSet.of()
)),
new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
stream,
newCheckpoints.get(0),
ImmutableSet.of()
))
); );
while (supervisor.getNoticesQueueSize() > 0) { while (supervisor.getNoticesQueueSize() > 0) {

View File

@ -23,7 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -34,22 +35,28 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
private final Integer taskGroupId; private final Integer taskGroupId;
@Deprecated @Deprecated
private final String baseSequenceName; private final String baseSequenceName;
private final DataSourceMetadata previousCheckPoint; private final SeekableStreamDataSourceMetadata checkpointMetadata;
private final DataSourceMetadata currentCheckPoint;
public CheckPointDataSourceMetadataAction( public CheckPointDataSourceMetadataAction(
@JsonProperty("supervisorId") String supervisorId, @JsonProperty("supervisorId") String supervisorId,
@JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility, @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility,
@JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this
@JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint, @JsonProperty("previousCheckPoint") @Nullable @Deprecated SeekableStreamDataSourceMetadata previousCheckPoint,
@JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint @JsonProperty("checkpointMetadata") @Nullable SeekableStreamDataSourceMetadata checkpointMetadata
) )
{ {
this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId"); this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
this.taskGroupId = taskGroupId; this.taskGroupId = taskGroupId;
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName"); this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName");
this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint"); this.checkpointMetadata = checkpointMetadata == null ? previousCheckPoint : checkpointMetadata;
this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
Preconditions.checkNotNull(this.checkpointMetadata, "checkpointMetadata");
// checkpointMetadata must be SeekableStreamStartSequenceNumbers because it's the start sequence numbers of the
// sequence currently being checkpointed
Preconditions.checkArgument(
this.checkpointMetadata.getSeekableStreamSequenceNumbers() instanceof SeekableStreamStartSequenceNumbers,
"checkpointMetadata must be SeekableStreamStartSequenceNumbers"
);
} }
@JsonProperty @JsonProperty
@ -72,16 +79,18 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
return taskGroupId; return taskGroupId;
} }
// For backwards compatibility
@Deprecated
@JsonProperty @JsonProperty
public DataSourceMetadata getPreviousCheckPoint() public SeekableStreamDataSourceMetadata getPreviousCheckPoint()
{ {
return previousCheckPoint; return checkpointMetadata;
} }
@JsonProperty @JsonProperty
public DataSourceMetadata getCurrentCheckPoint() public SeekableStreamDataSourceMetadata getCheckpointMetadata()
{ {
return currentCheckPoint; return checkpointMetadata;
} }
@Override @Override
@ -99,8 +108,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
supervisorId, supervisorId,
taskGroupId, taskGroupId,
baseSequenceName, baseSequenceName,
previousCheckPoint, checkpointMetadata
currentCheckPoint
); );
} }
@ -117,8 +125,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
"supervisorId='" + supervisorId + '\'' + "supervisorId='" + supervisorId + '\'' +
", baseSequenceName='" + baseSequenceName + '\'' + ", baseSequenceName='" + baseSequenceName + '\'' +
", taskGroupId='" + taskGroupId + '\'' + ", taskGroupId='" + taskGroupId + '\'' +
", previousCheckPoint=" + previousCheckPoint + ", checkpointMetadata=" + checkpointMetadata +
", currentCheckPoint=" + currentCheckPoint +
'}'; '}';
} }
} }

View File

@ -194,8 +194,7 @@ public class SupervisorManager
String supervisorId, String supervisorId,
@Nullable Integer taskGroupId, @Nullable Integer taskGroupId,
String baseSequenceName, String baseSequenceName,
DataSourceMetadata previousDataSourceMetadata, DataSourceMetadata previousDataSourceMetadata
DataSourceMetadata currentDataSourceMetadata
) )
{ {
try { try {
@ -206,7 +205,7 @@ public class SupervisorManager
Preconditions.checkNotNull(supervisor, "supervisor could not be found"); Preconditions.checkNotNull(supervisor, "supervisor could not be found");
supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata, currentDataSourceMetadata); supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata);
return true; return true;
} }
catch (Exception e) { catch (Exception e) {

View File

@ -724,18 +724,13 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
task.getDataSource(), task.getDataSource(),
ioConfig.getTaskGroupId(), ioConfig.getTaskGroupId(),
task.getIOConfig().getBaseSequenceName(), task.getIOConfig().getBaseSequenceName(),
null,
createDataSourceMetadata( createDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>( new SeekableStreamStartSequenceNumbers<>(
stream, stream,
sequenceToCheckpoint.getStartOffsets(), sequenceToCheckpoint.getStartOffsets(),
sequenceToCheckpoint.getExclusiveStartPartitions() sequenceToCheckpoint.getExclusiveStartPartitions()
) )
),
createDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
stream,
currOffsets
)
) )
); );
if (!toolbox.getTaskActionClient().submit(checkpointAction)) { if (!toolbox.getTaskActionClient().submit(checkpointAction)) {

View File

@ -63,6 +63,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamException;
@ -339,20 +340,17 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private final Integer nullableTaskGroupId; private final Integer nullableTaskGroupId;
@Deprecated @Deprecated
private final String baseSequenceName; private final String baseSequenceName;
private final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> previousCheckpoint; private final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> checkpointMetadata;
private final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentCheckpoint;
public CheckpointNotice( CheckpointNotice(
@Nullable Integer nullableTaskGroupId, @Nullable Integer nullableTaskGroupId,
@Deprecated String baseSequenceName, @Deprecated String baseSequenceName,
SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> previousCheckpoint, SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> checkpointMetadata
SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentCheckpoint
) )
{ {
this.baseSequenceName = baseSequenceName; this.baseSequenceName = baseSequenceName;
this.nullableTaskGroupId = nullableTaskGroupId; this.nullableTaskGroupId = nullableTaskGroupId;
this.previousCheckpoint = previousCheckpoint; this.checkpointMetadata = checkpointMetadata;
this.currentCheckpoint = currentCheckpoint;
} }
@Override @Override
@ -404,7 +402,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
Map<PartitionIdType, SequenceOffsetType> checkpoint = checkpoints.get(sequenceId); Map<PartitionIdType, SequenceOffsetType> checkpoint = checkpoints.get(sequenceId);
// We have already verified the stream of the current checkpoint is same with that in ioConfig. // We have already verified the stream of the current checkpoint is same with that in ioConfig.
// See checkpoint(). // See checkpoint().
if (checkpoint.equals(previousCheckpoint.getSeekableStreamSequenceNumbers() if (checkpoint.equals(checkpointMetadata.getSeekableStreamSequenceNumbers()
.getPartitionSequenceNumberMap() .getPartitionSequenceNumberMap()
)) { )) {
break; break;
@ -412,7 +410,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
index--; index--;
} }
if (index == 0) { if (index == 0) {
throw new ISE("No such previous checkpoint [%s] found", previousCheckpoint); throw new ISE("No such previous checkpoint [%s] found", checkpointMetadata);
} else if (index < checkpoints.size()) { } else if (index < checkpoints.size()) {
// if the found checkpoint is not the latest one then already checkpointed by a replica // if the found checkpoint is not the latest one then already checkpointed by a replica
Preconditions.checkState(index == checkpoints.size() - 1, "checkpoint consistency failure"); Preconditions.checkState(index == checkpoints.size() - 1, "checkpoint consistency failure");
@ -2772,31 +2770,28 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
public void checkpoint( public void checkpoint(
@Nullable Integer taskGroupId, @Nullable Integer taskGroupId,
@Deprecated String baseSequenceName, @Deprecated String baseSequenceName,
DataSourceMetadata previousCheckPoint, DataSourceMetadata checkpointMetadata
DataSourceMetadata currentCheckPoint
) )
{ {
Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint"); Preconditions.checkNotNull(checkpointMetadata, "checkpointMetadata");
Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot be null");
//noinspection unchecked
final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> seekableMetadata =
(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) checkpointMetadata;
Preconditions.checkArgument( Preconditions.checkArgument(
spec.getIoConfig() spec.getIoConfig().getStream().equals(seekableMetadata.getSeekableStreamSequenceNumbers().getStream()),
.getStream()
.equals(((SeekableStreamDataSourceMetadata) currentCheckPoint).getSeekableStreamSequenceNumbers()
.getStream()),
"Supervisor stream [%s] and stream in checkpoint [%s] does not match", "Supervisor stream [%s] and stream in checkpoint [%s] does not match",
spec.getIoConfig().getStream(), spec.getIoConfig().getStream(),
((SeekableStreamDataSourceMetadata) currentCheckPoint).getSeekableStreamSequenceNumbers().getStream() seekableMetadata.getSeekableStreamSequenceNumbers().getStream()
);
Preconditions.checkArgument(
seekableMetadata.getSeekableStreamSequenceNumbers() instanceof SeekableStreamStartSequenceNumbers,
"checkpointMetadata must be SeekableStreamStartSequenceNumbers"
); );
log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId); log.info("Checkpointing [%s] for taskGroup [%s]", checkpointMetadata, taskGroupId);
addNotice( addNotice(new CheckpointNotice(taskGroupId, baseSequenceName, seekableMetadata));
new CheckpointNotice(
taskGroupId,
baseSequenceName,
(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) previousCheckPoint,
(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) currentCheckPoint
)
);
} }
@VisibleForTesting @VisibleForTesting

View File

@ -128,8 +128,7 @@ public class NoopSupervisorSpec implements SupervisorSpec
public void checkpoint( public void checkpoint(
@Nullable Integer taskGroupId, @Nullable Integer taskGroupId,
String baseSequenceName, String baseSequenceName,
DataSourceMetadata previousCheckPoint, DataSourceMetadata checkpointMetadata
DataSourceMetadata currentCheckPoint
) )
{ {

View File

@ -57,18 +57,16 @@ public interface Supervisor
/** /**
* The definition of checkpoint is not very strict as currently it does not affect data or control path. * The definition of checkpoint is not very strict as currently it does not affect data or control path.
* On this call Supervisor can potentially checkpoint data processed so far to some durable storage * On this call Supervisor can potentially checkpoint data processed so far to some durable storage
* for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data * for example - Kafka/Kinesis Supervisor uses this to merge and handoff segments containing at least the data
* represented by {@param currentCheckpoint} DataSourceMetadata * represented by {@param currentCheckpoint} DataSourceMetadata
* *
* @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing
* @param baseSequenceName baseSequenceName * @param baseSequenceName baseSequenceName
* @param previousCheckPoint DataSourceMetadata checkpointed in previous call * @param checkpointMetadata metadata for the sequence to currently checkpoint
* @param currentCheckPoint current DataSourceMetadata to be checkpointed
*/ */
void checkpoint( void checkpoint(
@Nullable Integer taskGroupId, @Nullable Integer taskGroupId,
@Deprecated String baseSequenceName, @Deprecated String baseSequenceName,
DataSourceMetadata previousCheckPoint, DataSourceMetadata checkpointMetadata
DataSourceMetadata currentCheckPoint
); );
} }