From 2bfe1b6a5a666f925ea1cc1781ea55bafaeeb3c5 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 17 Aug 2018 10:15:45 -0700 Subject: [PATCH] Fix NPE for taskGroupId when rolling update (#6168) * Fix NPE for taskGroupId * missing changes * fix wrong annotation * fix potential race * keep baseSequenceName * make deprecated old param --- .../MaterializedViewSupervisor.java | 8 +- ...ementalPublishingKafkaIndexTaskRunner.java | 1 + .../kafka/supervisor/KafkaSupervisor.java | 108 +++++++++++------- .../indexing/kafka/KafkaIndexTaskTest.java | 3 +- .../kafka/supervisor/KafkaSupervisorTest.java | 91 ++++++++++++++- .../CheckPointDataSourceMetadataAction.java | 25 +++- .../supervisor/SupervisorManager.java | 5 +- .../supervisor/NoopSupervisorSpec.java | 3 +- .../overlord/supervisor/Supervisor.java | 9 +- 9 files changed, 202 insertions(+), 51 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java index fedda092c4d..d499b4f0e8d 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -53,6 +53,7 @@ import io.druid.timeline.DataSegment; import org.joda.time.Duration; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -239,7 +240,12 @@ public class MaterializedViewSupervisor implements Supervisor } @Override - public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint) + public void checkpoint( + @Nullable Integer taskGroupId, + String baseSequenceName, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint + ) { // do nothing } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index f8d2e10c338..6d424163a27 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -603,6 +603,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction( task.getDataSource(), ioConfig.getTaskGroupId(), + task.getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets)) ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index e511b00dcd7..8c9bb599ada 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -92,6 +92,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Random; import java.util.Set; @@ -143,7 +144,7 @@ public class KafkaSupervisor implements Supervisor * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups] * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]). */ - private static class TaskGroup + private class TaskGroup { // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in @@ -157,6 +158,7 @@ public class KafkaSupervisor implements Supervisor final Optional maximumMessageTime; DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action final TreeMap> sequenceOffsets = new TreeMap<>(); + final String baseSequenceName; TaskGroup( ImmutableMap partitionOffsets, @@ -168,6 +170,7 @@ public class KafkaSupervisor implements Supervisor this.minimumMessageTime = minimumMessageTime; this.maximumMessageTime = maximumMessageTime; this.sequenceOffsets.put(0, partitionOffsets); + this.baseSequenceName = generateSequenceName(partitionOffsets, minimumMessageTime, maximumMessageTime); } int addNewCheckpoint(Map checkpoint) @@ -509,23 +512,29 @@ public class KafkaSupervisor implements Supervisor } @Override - public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint) + public void checkpoint( + @Nullable Integer taskGroupId, + @Deprecated String baseSequenceName, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint + ) { - Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint"); - Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null"); + Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint"); + Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot be null"); Preconditions.checkArgument( - ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()), + ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()), "Supervisor topic [%s] and topic in checkpoint [%s] does not match", ioConfig.getTopic(), - ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic() + ((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic() ); - log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId); + log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId); notices.add( new CheckpointNotice( taskGroupId, - (KafkaDataSourceMetadata) previousCheckpoint, - (KafkaDataSourceMetadata) currentCheckpoint + baseSequenceName, + (KafkaDataSourceMetadata) previousCheckPoint, + (KafkaDataSourceMetadata) currentCheckPoint ) ); } @@ -629,17 +638,20 @@ public class KafkaSupervisor implements Supervisor private class CheckpointNotice implements Notice { - final int taskGroupId; - final KafkaDataSourceMetadata previousCheckpoint; - final KafkaDataSourceMetadata currentCheckpoint; + @Nullable private final Integer nullableTaskGroupId; + @Deprecated private final String baseSequenceName; + private final KafkaDataSourceMetadata previousCheckpoint; + private final KafkaDataSourceMetadata currentCheckpoint; CheckpointNotice( - int taskGroupId, + @Nullable Integer nullableTaskGroupId, + @Deprecated String baseSequenceName, KafkaDataSourceMetadata previousCheckpoint, KafkaDataSourceMetadata currentCheckpoint ) { - this.taskGroupId = taskGroupId; + this.baseSequenceName = baseSequenceName; + this.nullableTaskGroupId = nullableTaskGroupId; this.previousCheckpoint = previousCheckpoint; this.currentCheckpoint = currentCheckpoint; } @@ -647,12 +659,44 @@ public class KafkaSupervisor implements Supervisor @Override public void handle() throws ExecutionException, InterruptedException { + // Find taskGroupId using taskId if it's null. It can be null while rolling update. + final int taskGroupId; + if (nullableTaskGroupId == null) { + // We search taskId in taskGroups and pendingCompletionTaskGroups sequentially. This should be fine because + // 1) a taskGroup can be moved from taskGroups to pendingCompletionTaskGroups in RunNotice + // (see checkTaskDuration()). + // 2) Notices are proceesed by a single thread. So, CheckpointNotice and RunNotice cannot be processed at the + // same time. + final java.util.Optional maybeGroupId = taskGroups + .entrySet() + .stream() + .filter(entry -> { + final TaskGroup taskGroup = entry.getValue(); + return taskGroup.baseSequenceName.equals(baseSequenceName); + }) + .findAny() + .map(Entry::getKey); + taskGroupId = maybeGroupId.orElse( + pendingCompletionTaskGroups + .entrySet() + .stream() + .filter(entry -> { + final List taskGroups = entry.getValue(); + return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName)); + }) + .findAny() + .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName)) + .getKey() + ); + } else { + taskGroupId = nullableTaskGroupId; + } + // check for consistency // if already received request for this sequenceName and dataSourceMetadata combination then return - final TaskGroup taskGroup = taskGroups.get(taskGroupId); - if (isValidTaskGroup(taskGroup)) { + if (isValidTaskGroup(taskGroupId, taskGroup)) { final TreeMap> checkpoints = taskGroup.sequenceOffsets; // check validity of previousCheckpoint @@ -674,20 +718,13 @@ public class KafkaSupervisor implements Supervisor log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue()); return; } - final int taskGroupId = getTaskGroupIdForPartition( - currentCheckpoint.getKafkaPartitions() - .getPartitionOffsetMap() - .keySet() - .iterator() - .next() - ); final Map newCheckpoint = checkpointTaskGroup(taskGroupId, false).get(); taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint); log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId); } } - private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup) + private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup) { if (taskGroup == null) { // taskGroup might be in pendingCompletionTaskGroups or partitionGroups @@ -886,17 +923,6 @@ public class KafkaSupervisor implements Supervisor return Joiner.on("_").join("index_kafka", dataSource, hashCode); } - @VisibleForTesting - String generateSequenceName(TaskGroup taskGroup) - { - Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null"); - return generateSequenceName( - taskGroup.partitionOffsets, - taskGroup.minimumMessageTime, - taskGroup.maximumMessageTime - ); - } - private static String getRandomId() { final StringBuilder suffix = new StringBuilder(8); @@ -1774,7 +1800,6 @@ public class KafkaSupervisor implements Supervisor endPartitions.put(partition, Long.MAX_VALUE); } TaskGroup group = taskGroups.get(groupId); - String sequenceName = generateSequenceName(group); Map consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties()); DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull(); @@ -1782,7 +1807,7 @@ public class KafkaSupervisor implements Supervisor KafkaIOConfig kafkaIOConfig = new KafkaIOConfig( groupId, - sequenceName, + group.baseSequenceName, new KafkaPartitions(ioConfig.getTopic(), startPartitions), new KafkaPartitions(ioConfig.getTopic(), endPartitions), consumerProperties, @@ -1803,10 +1828,10 @@ public class KafkaSupervisor implements Supervisor .putAll(spec.getContext()) .build(); for (int i = 0; i < replicas; i++) { - String taskId = Joiner.on("_").join(sequenceName, getRandomId()); + String taskId = Joiner.on("_").join(group.baseSequenceName, getRandomId()); KafkaIndexTask indexTask = new KafkaIndexTask( taskId, - new TaskResource(sequenceName, 1), + new TaskResource(group.baseSequenceName, 1), spec.getDataSchema(), taskTuningConfig, kafkaIOConfig, @@ -1936,7 +1961,10 @@ public class KafkaSupervisor implements Supervisor String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName(); if (taskGroups.get(taskGroupId) != null) { - return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName); + return Preconditions + .checkNotNull(taskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId) + .baseSequenceName + .equals(taskSequenceName); } else { return generateSequenceName( ((KafkaIndexTask) taskOptional.get()).getIOConfig() diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 1929bb45132..c3cf1153671 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2060,7 +2060,8 @@ public class KafkaIndexTaskTest @Override public boolean checkPointDataSourceMetadata( String supervisorId, - int taskGroupId, + @Nullable Integer taskGroupId, + String baseSequenceName, @Nullable DataSourceMetadata previousDataSourceMetadata, @Nullable DataSourceMetadata currentDataSourceMetadata ) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 36f3d733d6f..83e253ff9f1 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2104,6 +2104,7 @@ public class KafkaSupervisorTest extends EasyMockSupport supervisor.moveTaskGroupToPendingCompletion(0); supervisor.checkpoint( 0, + ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))), new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints)) ); @@ -2173,6 +2174,7 @@ public class KafkaSupervisorTest extends EasyMockSupport supervisor.checkpoint( 0, + ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())) ); @@ -2195,13 +2197,100 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); } + @Test(timeout = 60_000L) + public void testCheckpointWithNullTaskGroupId() + throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException + { + supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false); + //not adding any events + final Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)), + null, + null + ); + + final Task id2 = createKafkaIndexTask( + "id2", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)), + null, + null + ); + + final Task id3 = createKafkaIndexTask( + "id3", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)), + null, + null + ); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); + expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); + expect( + indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null) + ).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + expect(taskClient.getStatusAsync(anyString())) + .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)) + .anyTimes(); + final TreeMap> checkpoints = new TreeMap<>(); + checkpoints.put(0, ImmutableMap.of(0, 0L)); + expect(taskClient.getCheckpointsAsync(anyString(), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(3); + expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes(); + expect(taskClient.pauseAsync(anyString())) + .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L))) + .anyTimes(); + expect(taskClient.setEndOffsetsAsync(anyString(), EasyMock.eq(ImmutableMap.of(0, 10L)), anyBoolean())) + .andReturn(Futures.immediateFuture(true)) + .anyTimes(); + + replayAll(); + + supervisor.start(); + + supervisor.runInternal(); + + final TreeMap> newCheckpoints = new TreeMap<>(); + newCheckpoints.put(0, ImmutableMap.of(0, 10L)); + supervisor.checkpoint( + null, + ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, newCheckpoints.get(0))) + ); + + while (supervisor.getNoticesQueueSize() > 0) { + Thread.sleep(100); + } + + verifyAll(); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { for (int i = 0; i < NUM_PARTITIONS; i++) { for (int j = 0; j < numEventsPerPartition; j++) { kafkaProducer.send( - new ProducerRecord( + new ProducerRecord<>( topic, i, null, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index f1d11deb4ea..433af987be3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -25,22 +25,29 @@ import com.google.common.base.Preconditions; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.DataSourceMetadata; +import javax.annotation.Nullable; + public class CheckPointDataSourceMetadataAction implements TaskAction { private final String supervisorId; - private final int taskGroupId; + @Nullable + private final Integer taskGroupId; + @Deprecated + private final String baseSequenceName; private final DataSourceMetadata previousCheckPoint; private final DataSourceMetadata currentCheckPoint; public CheckPointDataSourceMetadataAction( @JsonProperty("supervisorId") String supervisorId, - @JsonProperty("taskGroupId") Integer taskGroupId, + @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility, + @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint, @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint ) { this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId"); - this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId"); + this.taskGroupId = taskGroupId; + this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName"); this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint"); this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint"); } @@ -51,8 +58,16 @@ public class CheckPointDataSourceMetadataAction implements TaskAction return supervisorId; } + @Deprecated + @JsonProperty("sequenceName") + public String getBaseSequenceName() + { + return baseSequenceName; + } + + @Nullable @JsonProperty - public int getTaskGroupId() + public Integer getTaskGroupId() { return taskGroupId; } @@ -85,6 +100,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction return toolbox.getSupervisorManager().checkPointDataSourceMetadata( supervisorId, taskGroupId, + baseSequenceName, previousCheckPoint, currentCheckPoint ); @@ -101,6 +117,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction { return "CheckPointDataSourceMetadataAction{" + "supervisorId='" + supervisorId + '\'' + + ", baseSequenceName='" + baseSequenceName + '\'' + ", taskGroupId='" + taskGroupId + '\'' + ", previousCheckPoint=" + previousCheckPoint + ", currentCheckPoint=" + currentCheckPoint + diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 355465cec5d..03cc96f5b0b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -165,7 +165,8 @@ public class SupervisorManager public boolean checkPointDataSourceMetadata( String supervisorId, - int taskGroupId, + @Nullable Integer taskGroupId, + String baseSequenceName, DataSourceMetadata previousDataSourceMetadata, DataSourceMetadata currentDataSourceMetadata ) @@ -178,7 +179,7 @@ public class SupervisorManager Preconditions.checkNotNull(supervisor, "supervisor could not be found"); - supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata); + supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata, currentDataSourceMetadata); return true; } catch (Exception e) { diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 0408104cde8..ccf695e41a5 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -83,7 +83,8 @@ public class NoopSupervisorSpec implements SupervisorSpec @Override public void checkpoint( - int taskGroupId, + @Nullable Integer taskGroupId, + String baseSequenceName, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint ) diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java index 04afac7aea6..6f3c05003a8 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java @@ -22,6 +22,7 @@ package io.druid.indexing.overlord.supervisor; import com.google.common.collect.ImmutableMap; import io.druid.indexing.overlord.DataSourceMetadata; +import javax.annotation.Nullable; import java.util.Map; public interface Supervisor @@ -52,8 +53,14 @@ public interface Supervisor * represented by {@param currentCheckpoint} DataSourceMetadata * * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing + * @param baseSequenceName baseSequenceName * @param previousCheckPoint DataSourceMetadata checkpointed in previous call * @param currentCheckPoint current DataSourceMetadata to be checkpointed */ - void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint); + void checkpoint( + @Nullable Integer taskGroupId, + @Deprecated String baseSequenceName, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint + ); }