From 22d6384d364a851fd2b5cecafea9d72b004cb03b Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 21 Aug 2019 10:58:22 -0700 Subject: [PATCH] Fix unrealistic test variables in KafkaSupervisorTest and tidy up unused variable in checkpointing process (#7319) * Fix unrealistic test arguments in KafkaSupervisorTest * remove currentCheckpoint from checkpoint action * rename variable --- .../MaterializedViewSupervisor.java | 3 +- .../druid/indexing/kafka/KafkaIndexTask.java | 5 ++ .../indexing/kafka/KafkaIndexTaskTest.java | 24 +++------ .../kafka/supervisor/KafkaSupervisorTest.java | 48 ++++-------------- .../kinesis/KinesisIndexTaskTest.java | 16 ++---- .../supervisor/KinesisSupervisorTest.java | 36 +++----------- .../CheckPointDataSourceMetadataAction.java | 37 ++++++++------ .../supervisor/SupervisorManager.java | 5 +- .../SeekableStreamIndexTaskRunner.java | 9 +--- .../supervisor/SeekableStreamSupervisor.java | 49 +++++++++---------- .../supervisor/NoopSupervisorSpec.java | 3 +- .../overlord/supervisor/Supervisor.java | 8 ++- 12 files changed, 85 insertions(+), 158 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 378ed508755..2dccb547c47 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -280,8 +280,7 @@ public class MaterializedViewSupervisor implements Supervisor public void checkpoint( @Nullable Integer taskGroupId, String baseSequenceName, - DataSourceMetadata previousCheckPoint, - DataSourceMetadata currentCheckPoint + DataSourceMetadata checkpointMetadata ) { // do nothing diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index b55d05b5846..a1b14a07057 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; @@ -83,6 +84,10 @@ public class KafkaIndexTask extends SeekableStreamIndexTask this.configMapper = configMapper; this.ioConfig = ioConfig; + Preconditions.checkArgument( + ioConfig.getStartSequenceNumbers().getExclusivePartitions().isEmpty(), + "All startSequenceNumbers must be inclusive" + ); } long getPollRetryMs() diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index bc019dec0ad..1455d2ab29c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -567,10 +567,7 @@ public class KafkaIndexTaskTest Objects.hash( DATA_SCHEMA.getDataSource(), 0, - new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata( - new SeekableStreamEndSequenceNumbers<>(topic, currentOffsets) - ) + new KafkaDataSourceMetadata(startPartitions) ) ) ); @@ -700,10 +697,7 @@ public class KafkaIndexTaskTest Objects.hash( DATA_SCHEMA.getDataSource(), 0, - new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata( - new SeekableStreamEndSequenceNumbers<>(topic, currentOffsets) - ) + new KafkaDataSourceMetadata(startPartitions) ) ) ); @@ -714,8 +708,7 @@ public class KafkaIndexTaskTest 0, new KafkaDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(topic, currentOffsets, ImmutableSet.of()) - ), - new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, nextOffsets)) + ) ) ) ); @@ -820,10 +813,7 @@ public class KafkaIndexTaskTest Objects.hash( DATA_SCHEMA.getDataSource(), 0, - new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata( - new SeekableStreamEndSequenceNumbers<>(topic, checkpoint.getPartitionSequenceNumberMap()) - ) + new KafkaDataSourceMetadata(startPartitions) ) ) ); @@ -2653,8 +2643,7 @@ public class KafkaIndexTaskTest String supervisorId, @Nullable Integer taskGroupId, String baseSequenceName, - @Nullable DataSourceMetadata previousDataSourceMetadata, - @Nullable DataSourceMetadata currentDataSourceMetadata + @Nullable DataSourceMetadata previousDataSourceMetadata ) { log.info("Adding checkpoint hash to the set"); @@ -2662,8 +2651,7 @@ public class KafkaIndexTaskTest Objects.hash( supervisorId, taskGroupId, - previousDataSourceMetadata, - currentDataSourceMetadata + previousDataSourceMetadata ) ); return true; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 8a9571ff1dc..91b11b3c855 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -628,11 +628,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>( - topic, - ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), - ImmutableSet.of(0, 1, 2) - ) + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()) ) ).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); @@ -669,11 +665,7 @@ public class KafkaSupervisorTest extends EasyMockSupport EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>( - topic, - ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), - ImmutableSet.of(0, 1, 2) - ) + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()) ) ).anyTimes(); replayAll(); @@ -2348,21 +2340,13 @@ public class KafkaSupervisorTest extends EasyMockSupport supervisor.start(); supervisor.runInternal(); - final Map fakeCheckpoints = Collections.emptyMap(); supervisor.moveTaskGroupToPendingCompletion(0); supervisor.checkpoint( 0, id1.getIOConfig().getBaseSequenceName(), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( - topic, - checkpoints.get(0), - ImmutableSet.of() - )), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( - topic, - fakeCheckpoints, - fakeCheckpoints.keySet() - )) + new KafkaDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of()) + ) ); while (supervisor.getNoticesQueueSize() > 0) { @@ -2441,16 +2425,9 @@ public class KafkaSupervisorTest extends EasyMockSupport supervisor.checkpoint( 0, id1.getIOConfig().getBaseSequenceName(), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( - topic, - Collections.emptyMap(), - ImmutableSet.of() - )), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( - topic, - Collections.emptyMap(), - ImmutableSet.of() - )) + new KafkaDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of()) + ) ); while (supervisor.getNoticesQueueSize() > 0) { @@ -2552,18 +2529,11 @@ public class KafkaSupervisorTest extends EasyMockSupport supervisor.runInternal(); - final TreeMap> newCheckpoints = new TreeMap<>(); - newCheckpoints.put(0, ImmutableMap.of(0, 10L)); supervisor.checkpoint( null, id1.getIOConfig().getBaseSequenceName(), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( - topic, - checkpoints.get(0), - ImmutableSet.of() - )), new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(topic, newCheckpoints.get(0), newCheckpoints.get(0).keySet()) + new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of()) ) ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 59fe45a5928..d8b98a272a2 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -619,8 +619,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport Objects.hash( DATA_SCHEMA.getDataSource(), 0, - new KinesisDataSourceMetadata(startPartitions), - new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, currentOffsets)) + new KinesisDataSourceMetadata(startPartitions) ) ) ); @@ -758,8 +757,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport Objects.hash( DATA_SCHEMA.getDataSource(), 0, - new KinesisDataSourceMetadata(startPartitions), - new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, currentOffsets)) + new KinesisDataSourceMetadata(startPartitions) ) ) ); @@ -769,9 +767,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport DATA_SCHEMA.getDataSource(), 0, new KinesisDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(STREAM, currentOffsets, currentOffsets.keySet()) - ), - new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, nextOffsets)) + new SeekableStreamStartSequenceNumbers<>(STREAM, currentOffsets, currentOffsets.keySet())) ) ) ); @@ -2832,8 +2828,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport String supervisorId, @Nullable Integer taskGroupId, String baseSequenceName, - @Nullable DataSourceMetadata previousDataSourceMetadata, - @Nullable DataSourceMetadata currentDataSourceMetadata + @Nullable DataSourceMetadata checkpointMetadata ) { LOG.info("Adding checkpoint hash to the set"); @@ -2841,8 +2836,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport Objects.hash( supervisorId, taskGroupId, - previousDataSourceMetadata, - currentDataSourceMetadata + checkpointMetadata ) ); return true; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 7b5fcf81661..a8d73e67509 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -2907,19 +2907,13 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisor.start(); supervisor.runInternal(); - final Map fakeCheckpoints = Collections.emptyMap(); supervisor.moveTaskGroupToPendingCompletion(0); supervisor.checkpoint( 0, id1.getIOConfig().getBaseSequenceName(), new KinesisDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), checkpoints.get(0).keySet()) - ), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( - stream, - fakeCheckpoints, - ImmutableSet.of() - )) + ) ); while (supervisor.getNoticesQueueSize() > 0) { @@ -3044,16 +3038,9 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisor.checkpoint( 0, id1.getIOConfig().getBaseSequenceName(), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( - stream, - Collections.emptyMap(), - ImmutableSet.of() - )), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( - stream, - Collections.emptyMap(), - ImmutableSet.of() - )) + new KinesisDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>(stream, Collections.emptyMap(), ImmutableSet.of()) + ) ); while (supervisor.getNoticesQueueSize() > 0) { @@ -3164,21 +3151,12 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisor.runInternal(); - final TreeMap> newCheckpoints = new TreeMap<>(); - newCheckpoints.put(0, ImmutableMap.of(shardId1, "10")); supervisor.checkpoint( null, id1.getIOConfig().getBaseSequenceName(), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( - stream, - checkpoints.get(0), - ImmutableSet.of() - )), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>( - stream, - newCheckpoints.get(0), - ImmutableSet.of() - )) + new KinesisDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), ImmutableSet.of()) + ) ); while (supervisor.getNoticesQueueSize() > 0) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index f0ebd49be90..46c90941636 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -23,7 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import javax.annotation.Nullable; @@ -34,22 +35,28 @@ public class CheckPointDataSourceMetadataAction implements TaskAction private final Integer taskGroupId; @Deprecated private final String baseSequenceName; - private final DataSourceMetadata previousCheckPoint; - private final DataSourceMetadata currentCheckPoint; + private final SeekableStreamDataSourceMetadata checkpointMetadata; public CheckPointDataSourceMetadataAction( @JsonProperty("supervisorId") String supervisorId, @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility, @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this - @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint, - @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint + @JsonProperty("previousCheckPoint") @Nullable @Deprecated SeekableStreamDataSourceMetadata previousCheckPoint, + @JsonProperty("checkpointMetadata") @Nullable SeekableStreamDataSourceMetadata checkpointMetadata ) { this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId"); this.taskGroupId = taskGroupId; this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName"); - this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint"); - this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint"); + this.checkpointMetadata = checkpointMetadata == null ? previousCheckPoint : checkpointMetadata; + + Preconditions.checkNotNull(this.checkpointMetadata, "checkpointMetadata"); + // checkpointMetadata must be SeekableStreamStartSequenceNumbers because it's the start sequence numbers of the + // sequence currently being checkpointed + Preconditions.checkArgument( + this.checkpointMetadata.getSeekableStreamSequenceNumbers() instanceof SeekableStreamStartSequenceNumbers, + "checkpointMetadata must be SeekableStreamStartSequenceNumbers" + ); } @JsonProperty @@ -72,16 +79,18 @@ public class CheckPointDataSourceMetadataAction implements TaskAction return taskGroupId; } + // For backwards compatibility + @Deprecated @JsonProperty - public DataSourceMetadata getPreviousCheckPoint() + public SeekableStreamDataSourceMetadata getPreviousCheckPoint() { - return previousCheckPoint; + return checkpointMetadata; } @JsonProperty - public DataSourceMetadata getCurrentCheckPoint() + public SeekableStreamDataSourceMetadata getCheckpointMetadata() { - return currentCheckPoint; + return checkpointMetadata; } @Override @@ -99,8 +108,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction supervisorId, taskGroupId, baseSequenceName, - previousCheckPoint, - currentCheckPoint + checkpointMetadata ); } @@ -117,8 +125,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction "supervisorId='" + supervisorId + '\'' + ", baseSequenceName='" + baseSequenceName + '\'' + ", taskGroupId='" + taskGroupId + '\'' + - ", previousCheckPoint=" + previousCheckPoint + - ", currentCheckPoint=" + currentCheckPoint + + ", checkpointMetadata=" + checkpointMetadata + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 24b4b1d52f3..756b707f760 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -194,8 +194,7 @@ public class SupervisorManager String supervisorId, @Nullable Integer taskGroupId, String baseSequenceName, - DataSourceMetadata previousDataSourceMetadata, - DataSourceMetadata currentDataSourceMetadata + DataSourceMetadata previousDataSourceMetadata ) { try { @@ -206,7 +205,7 @@ public class SupervisorManager Preconditions.checkNotNull(supervisor, "supervisor could not be found"); - supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata, currentDataSourceMetadata); + supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata); return true; } catch (Exception e) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index ec7f8dc32bd..fefc7a8f05d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -724,18 +724,13 @@ public abstract class SeekableStreamIndexTaskRunner( stream, sequenceToCheckpoint.getStartOffsets(), sequenceToCheckpoint.getExclusiveStartPartitions() ) - ), - createDataSourceMetadata( - new SeekableStreamEndSequenceNumbers<>( - stream, - currOffsets - ) ) ); if (!toolbox.getTaskActionClient().submit(checkpointAction)) { @@ -1167,7 +1162,7 @@ public abstract class SeekableStreamIndexTaskRunner getLastSequenceMetadata() { Preconditions.checkState(!sequences.isEmpty(), "Empty sequences"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 78f9e2d868f..654f0ed6043 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -63,6 +63,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamException; @@ -339,20 +340,17 @@ public abstract class SeekableStreamSupervisor previousCheckpoint; - private final SeekableStreamDataSourceMetadata currentCheckpoint; + private final SeekableStreamDataSourceMetadata checkpointMetadata; - public CheckpointNotice( + CheckpointNotice( @Nullable Integer nullableTaskGroupId, @Deprecated String baseSequenceName, - SeekableStreamDataSourceMetadata previousCheckpoint, - SeekableStreamDataSourceMetadata currentCheckpoint + SeekableStreamDataSourceMetadata checkpointMetadata ) { this.baseSequenceName = baseSequenceName; this.nullableTaskGroupId = nullableTaskGroupId; - this.previousCheckpoint = previousCheckpoint; - this.currentCheckpoint = currentCheckpoint; + this.checkpointMetadata = checkpointMetadata; } @Override @@ -404,7 +402,7 @@ public abstract class SeekableStreamSupervisor checkpoint = checkpoints.get(sequenceId); // We have already verified the stream of the current checkpoint is same with that in ioConfig. // See checkpoint(). - if (checkpoint.equals(previousCheckpoint.getSeekableStreamSequenceNumbers() + if (checkpoint.equals(checkpointMetadata.getSeekableStreamSequenceNumbers() .getPartitionSequenceNumberMap() )) { break; @@ -412,7 +410,7 @@ public abstract class SeekableStreamSupervisor seekableMetadata = + (SeekableStreamDataSourceMetadata) checkpointMetadata; + Preconditions.checkArgument( - spec.getIoConfig() - .getStream() - .equals(((SeekableStreamDataSourceMetadata) currentCheckPoint).getSeekableStreamSequenceNumbers() - .getStream()), + spec.getIoConfig().getStream().equals(seekableMetadata.getSeekableStreamSequenceNumbers().getStream()), "Supervisor stream [%s] and stream in checkpoint [%s] does not match", spec.getIoConfig().getStream(), - ((SeekableStreamDataSourceMetadata) currentCheckPoint).getSeekableStreamSequenceNumbers().getStream() + seekableMetadata.getSeekableStreamSequenceNumbers().getStream() + ); + Preconditions.checkArgument( + seekableMetadata.getSeekableStreamSequenceNumbers() instanceof SeekableStreamStartSequenceNumbers, + "checkpointMetadata must be SeekableStreamStartSequenceNumbers" ); - log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId); - addNotice( - new CheckpointNotice( - taskGroupId, - baseSequenceName, - (SeekableStreamDataSourceMetadata) previousCheckPoint, - (SeekableStreamDataSourceMetadata) currentCheckPoint - ) - ); + log.info("Checkpointing [%s] for taskGroup [%s]", checkpointMetadata, taskGroupId); + addNotice(new CheckpointNotice(taskGroupId, baseSequenceName, seekableMetadata)); } @VisibleForTesting diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index e11b017e5dc..1d1a61f58f7 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -128,8 +128,7 @@ public class NoopSupervisorSpec implements SupervisorSpec public void checkpoint( @Nullable Integer taskGroupId, String baseSequenceName, - DataSourceMetadata previousCheckPoint, - DataSourceMetadata currentCheckPoint + DataSourceMetadata checkpointMetadata ) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index c0ecf44c29c..5009345892f 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -57,18 +57,16 @@ public interface Supervisor /** * The definition of checkpoint is not very strict as currently it does not affect data or control path. * On this call Supervisor can potentially checkpoint data processed so far to some durable storage - * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data + * for example - Kafka/Kinesis Supervisor uses this to merge and handoff segments containing at least the data * represented by {@param currentCheckpoint} DataSourceMetadata * * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing * @param baseSequenceName baseSequenceName - * @param previousCheckPoint DataSourceMetadata checkpointed in previous call - * @param currentCheckPoint current DataSourceMetadata to be checkpointed + * @param checkpointMetadata metadata for the sequence to currently checkpoint */ void checkpoint( @Nullable Integer taskGroupId, @Deprecated String baseSequenceName, - DataSourceMetadata previousCheckPoint, - DataSourceMetadata currentCheckPoint + DataSourceMetadata checkpointMetadata ); }