Fix NPE for taskGroupId during rolling update (#6168)

* Fix NPE for taskGroupId

* add missing changes

* fix wrong annotation

* fix potential race

* keep baseSequenceName

* deprecate the old param
Jihoon Son 2018-08-17 10:15:45 -07:00 committed by Fangjin Yang
parent 78fc5b246c
commit 2bfe1b6a5a
9 changed files with 202 additions and 51 deletions
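For context on the failure mode: during a rolling update, a Kafka indexing task built from the previous release serializes its checkpoint action without a "taskGroupId" field (older versions identified the group by "sequenceName" only). Jackson binds the missing field to a null Integer, and the overlord's non-null check threw. A minimal, hypothetical sketch of that failure, not the actual Druid classes:

import com.google.common.base.Preconditions;

// Sketch only: reproduces the shape of the pre-fix NPE. "taskGroupIdFromJson"
// stands in for the value Jackson binds when the JSON field is absent.
public class CheckpointNpeSketch
{
  public static void main(String[] args)
  {
    Integer taskGroupIdFromJson = null; // field absent in an old task's request
    // Equivalent of the removed constructor line
    //   this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
    int taskGroupId = Preconditions.checkNotNull(taskGroupIdFromJson, "taskGroupId"); // throws NullPointerException
    System.out.println(taskGroupId);
  }
}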


@@ -53,6 +53,7 @@ import io.druid.timeline.DataSegment;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -239,7 +240,12 @@ public class MaterializedViewSupervisor implements Supervisor
   }

   @Override
-  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint)
+  public void checkpoint(
+      @Nullable Integer taskGroupId,
+      String baseSequenceName,
+      DataSourceMetadata previousCheckPoint,
+      DataSourceMetadata currentCheckPoint
+  )
   {
     // do nothing
   }


@@ -603,6 +603,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTaskRunner
         final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
             task.getDataSource(),
             ioConfig.getTaskGroupId(),
+            task.getIOConfig().getBaseSequenceName(),
             new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
             new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
         );


@@ -92,6 +92,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
@@ -143,7 +144,7 @@ public class KafkaSupervisor implements Supervisor
    * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
    * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
    */
-  private static class TaskGroup
+  private class TaskGroup
   {
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -157,6 +158,7 @@ public class KafkaSupervisor implements Supervisor
     final Optional<DateTime> maximumMessageTime;
     DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
     final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
+    final String baseSequenceName;

     TaskGroup(
         ImmutableMap<Integer, Long> partitionOffsets,
@@ -168,6 +170,7 @@ public class KafkaSupervisor implements Supervisor
       this.minimumMessageTime = minimumMessageTime;
       this.maximumMessageTime = maximumMessageTime;
       this.sequenceOffsets.put(0, partitionOffsets);
+      this.baseSequenceName = generateSequenceName(partitionOffsets, minimumMessageTime, maximumMessageTime);
     }

     int addNewCheckpoint(Map<Integer, Long> checkpoint)
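Computing baseSequenceName once in the constructor pins the name for the group's whole lifetime, rather than regenerating it on demand via the generateSequenceName(TaskGroup) overload removed further down; this appears to be the "keep baseSequenceName" and "fix potential race" items from the commit message. The name format itself is visible in the context line kept below (Joiner.on("_").join("index_kafka", dataSource, hashCode)). A rough sketch of what a group's name looks like, with stand-ins for the hash input and function, which this diff does not show:

import com.google.common.base.Joiner;

// Rough sketch of the sequence-name scheme; the datasource and the hash
// input/function here are illustrative stand-ins, not Druid's actual ones.
public class SequenceNameSketch
{
  public static void main(String[] args)
  {
    String dataSource = "metrics";                                   // hypothetical datasource
    String hashInput = "{0=0, 1=5}|minMsgTime=null|maxMsgTime=null"; // hypothetical hash input
    String hashCode = Integer.toHexString(hashInput.hashCode());     // stand-in for the real hash
    System.out.println(Joiner.on("_").join("index_kafka", dataSource, hashCode));
    // prints something like: index_kafka_metrics_5f3a9b21
  }
}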
@@ -509,23 +512,29 @@ public class KafkaSupervisor implements Supervisor
   }

   @Override
-  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
+  public void checkpoint(
+      @Nullable Integer taskGroupId,
+      @Deprecated String baseSequenceName,
+      DataSourceMetadata previousCheckPoint,
+      DataSourceMetadata currentCheckPoint
+  )
   {
-    Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
-    Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
+    Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint");
+    Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot be null");
     Preconditions.checkArgument(
-        ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()),
+        ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()),
         "Supervisor topic [%s] and topic in checkpoint [%s] does not match",
         ioConfig.getTopic(),
-        ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
+        ((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()
     );

-    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId);
+    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId);
     notices.add(
         new CheckpointNotice(
             taskGroupId,
-            (KafkaDataSourceMetadata) previousCheckpoint,
-            (KafkaDataSourceMetadata) currentCheckpoint
+            baseSequenceName,
+            (KafkaDataSourceMetadata) previousCheckPoint,
+            (KafkaDataSourceMetadata) currentCheckPoint
         )
     );
   }
@@ -629,17 +638,20 @@ public class KafkaSupervisor implements Supervisor
   private class CheckpointNotice implements Notice
   {
-    final int taskGroupId;
-    final KafkaDataSourceMetadata previousCheckpoint;
-    final KafkaDataSourceMetadata currentCheckpoint;
+    @Nullable private final Integer nullableTaskGroupId;
+    @Deprecated private final String baseSequenceName;
+    private final KafkaDataSourceMetadata previousCheckpoint;
+    private final KafkaDataSourceMetadata currentCheckpoint;

     CheckpointNotice(
-        int taskGroupId,
+        @Nullable Integer nullableTaskGroupId,
+        @Deprecated String baseSequenceName,
         KafkaDataSourceMetadata previousCheckpoint,
         KafkaDataSourceMetadata currentCheckpoint
     )
     {
-      this.taskGroupId = taskGroupId;
+      this.baseSequenceName = baseSequenceName;
+      this.nullableTaskGroupId = nullableTaskGroupId;
       this.previousCheckpoint = previousCheckpoint;
       this.currentCheckpoint = currentCheckpoint;
     }
@@ -647,12 +659,44 @@ public class KafkaSupervisor implements Supervisor
     @Override
     public void handle() throws ExecutionException, InterruptedException
     {
+      // Find the taskGroupId using the baseSequenceName if it's null. It can be null during a rolling update.
+      final int taskGroupId;
+      if (nullableTaskGroupId == null) {
+        // We search for the baseSequenceName in taskGroups and pendingCompletionTaskGroups sequentially. This should
+        // be fine because
+        // 1) a taskGroup can be moved from taskGroups to pendingCompletionTaskGroups in RunNotice
+        //    (see checkTaskDuration()).
+        // 2) Notices are processed by a single thread. So, CheckpointNotice and RunNotice cannot be processed at the
+        //    same time.
+        final java.util.Optional<Integer> maybeGroupId = taskGroups
+            .entrySet()
+            .stream()
+            .filter(entry -> {
+              final TaskGroup taskGroup = entry.getValue();
+              return taskGroup.baseSequenceName.equals(baseSequenceName);
+            })
+            .findAny()
+            .map(Entry::getKey);
+        taskGroupId = maybeGroupId.orElse(
+            pendingCompletionTaskGroups
+                .entrySet()
+                .stream()
+                .filter(entry -> {
+                  final List<TaskGroup> taskGroups = entry.getValue();
+                  return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
+                })
+                .findAny()
+                .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
+                .getKey()
+        );
+      } else {
+        taskGroupId = nullableTaskGroupId;
+      }
+
       // check for consistency
       // if already received request for this sequenceName and dataSourceMetadata combination then return
       final TaskGroup taskGroup = taskGroups.get(taskGroupId);
-      if (isValidTaskGroup(taskGroup)) {
+      if (isValidTaskGroup(taskGroupId, taskGroup)) {
         final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;

         // check validity of previousCheckpoint
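The same lookup, condensed: resolve the group id by name from the active map first, then from the pending-completion map, failing loudly if neither holds it. A standalone sketch with the supervisor's maps reduced to plain parameters (hypothetical helper, not part of the patch):

import java.util.List;
import java.util.Map;
import java.util.Optional;

// Standalone sketch of the fallback above; the supervisor's two maps are
// reduced to plain name lookups.
class TaskGroupLookupSketch
{
  static int findTaskGroupId(
      String baseSequenceName,
      Map<Integer, String> activeGroupNames,        // cf. taskGroups: groupId -> baseSequenceName
      Map<Integer, List<String>> pendingGroupNames  // cf. pendingCompletionTaskGroups: groupId -> names
  )
  {
    Optional<Integer> active = activeGroupNames.entrySet().stream()
        .filter(e -> e.getValue().equals(baseSequenceName))
        .findAny()
        .map(Map.Entry::getKey);
    return active.orElseGet(
        () -> pendingGroupNames.entrySet().stream()
            .filter(e -> e.getValue().contains(baseSequenceName))
            .findAny()
            .map(Map.Entry::getKey)
            .orElseThrow(() -> new IllegalStateException(
                "Cannot find taskGroup for baseSequenceName[" + baseSequenceName + "]"
            ))
    );
  }
}

As in the real code, correctness rests on notices being handled by a single thread, so a group cannot move between the two maps mid-lookup.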
@@ -674,20 +718,13 @@ public class KafkaSupervisor implements Supervisor
           log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
           return;
         }
-        final int taskGroupId = getTaskGroupIdForPartition(
-            currentCheckpoint.getKafkaPartitions()
-                             .getPartitionOffsetMap()
-                             .keySet()
-                             .iterator()
-                             .next()
-        );
         final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
         taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
         log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
       }
     }

-    private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
+    private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup)
     {
       if (taskGroup == null) {
         // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
@@ -886,17 +923,6 @@ public class KafkaSupervisor implements Supervisor
     return Joiner.on("_").join("index_kafka", dataSource, hashCode);
   }

-  @VisibleForTesting
-  String generateSequenceName(TaskGroup taskGroup)
-  {
-    Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
-    return generateSequenceName(
-        taskGroup.partitionOffsets,
-        taskGroup.minimumMessageTime,
-        taskGroup.maximumMessageTime
-    );
-  }
-
   private static String getRandomId()
   {
     final StringBuilder suffix = new StringBuilder(8);
@@ -1774,7 +1800,6 @@ public class KafkaSupervisor implements Supervisor
       endPartitions.put(partition, Long.MAX_VALUE);
     }
     TaskGroup group = taskGroups.get(groupId);
-    String sequenceName = generateSequenceName(group);

     Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
     DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
@@ -1782,7 +1807,7 @@ public class KafkaSupervisor implements Supervisor
     KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
         groupId,
-        sequenceName,
+        group.baseSequenceName,
         new KafkaPartitions(ioConfig.getTopic(), startPartitions),
         new KafkaPartitions(ioConfig.getTopic(), endPartitions),
         consumerProperties,
@@ -1803,10 +1828,10 @@ public class KafkaSupervisor implements Supervisor
         .putAll(spec.getContext())
         .build();

     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(sequenceName, getRandomId());
+      String taskId = Joiner.on("_").join(group.baseSequenceName, getRandomId());
       KafkaIndexTask indexTask = new KafkaIndexTask(
           taskId,
-          new TaskResource(sequenceName, 1),
+          new TaskResource(group.baseSequenceName, 1),
           spec.getDataSchema(),
           taskTuningConfig,
           kafkaIOConfig,
@@ -1936,7 +1961,10 @@ public class KafkaSupervisor implements Supervisor
     String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName();
     if (taskGroups.get(taskGroupId) != null) {
-      return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName);
+      return Preconditions
+          .checkNotNull(taskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId)
+          .baseSequenceName
+          .equals(taskSequenceName);
     } else {
       return generateSequenceName(
           ((KafkaIndexTask) taskOptional.get()).getIOConfig()


@@ -2060,7 +2060,8 @@ public class KafkaIndexTaskTest
       @Override
       public boolean checkPointDataSourceMetadata(
           String supervisorId,
-          int taskGroupId,
+          @Nullable Integer taskGroupId,
+          String baseSequenceName,
           @Nullable DataSourceMetadata previousDataSourceMetadata,
           @Nullable DataSourceMetadata currentDataSourceMetadata
       )


@@ -2104,6 +2104,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.moveTaskGroupToPendingCompletion(0);
     supervisor.checkpoint(
         0,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints))
     );
@@ -2173,6 +2174,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.checkpoint(
         0,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap()))
     );
@@ -2195,13 +2197,100 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
   }

+  @Test(timeout = 60_000L)
+  public void testCheckpointWithNullTaskGroupId()
+      throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
+  {
+    supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false);
+    // not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(
+        indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+    expect(taskClient.getStatusAsync(anyString()))
+        .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING))
+        .anyTimes();
+    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 0L));
+    expect(taskClient.getCheckpointsAsync(anyString(), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(3);
+    expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes();
+    expect(taskClient.pauseAsync(anyString()))
+        .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L)))
+        .anyTimes();
+    expect(taskClient.setEndOffsetsAsync(anyString(), EasyMock.eq(ImmutableMap.of(0, 10L)), anyBoolean()))
+        .andReturn(Futures.immediateFuture(true))
+        .anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+
+    final TreeMap<Integer, Map<Integer, Long>> newCheckpoints = new TreeMap<>();
+    newCheckpoints.put(0, ImmutableMap.of(0, 10L));
+    supervisor.checkpoint(
+        null,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, newCheckpoints.get(0)))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
       for (int i = 0; i < NUM_PARTITIONS; i++) {
         for (int j = 0; j < numEventsPerPartition; j++) {
           kafkaProducer.send(
-              new ProducerRecord<byte[], byte[]>(
+              new ProducerRecord<>(
                   topic,
                   i,
                   null,


@@ -25,22 +25,29 @@ import com.google.common.base.Preconditions;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.DataSourceMetadata;

+import javax.annotation.Nullable;

 public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
 {
   private final String supervisorId;
-  private final int taskGroupId;
+  @Nullable
+  private final Integer taskGroupId;
+  @Deprecated
+  private final String baseSequenceName;
   private final DataSourceMetadata previousCheckPoint;
   private final DataSourceMetadata currentCheckPoint;

   public CheckPointDataSourceMetadataAction(
       @JsonProperty("supervisorId") String supervisorId,
-      @JsonProperty("taskGroupId") Integer taskGroupId,
+      @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility,
+      @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this
       @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint,
       @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
   )
   {
     this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
-    this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
+    this.taskGroupId = taskGroupId;
+    this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName");
     this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
     this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
   }
@@ -51,8 +58,16 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
     return supervisorId;
   }

+  @Deprecated
+  @JsonProperty("sequenceName")
+  public String getBaseSequenceName()
+  {
+    return baseSequenceName;
+  }
+
+  @Nullable
   @JsonProperty
-  public int getTaskGroupId()
+  public Integer getTaskGroupId()
   {
     return taskGroupId;
   }
@@ -85,6 +100,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
     return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
         supervisorId,
         taskGroupId,
+        baseSequenceName,
         previousCheckPoint,
         currentCheckPoint
     );
@@ -101,6 +117,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
   {
     return "CheckPointDataSourceMetadataAction{" +
            "supervisorId='" + supervisorId + '\'' +
+           ", baseSequenceName='" + baseSequenceName + '\'' +
            ", taskGroupId='" + taskGroupId + '\'' +
            ", previousCheckPoint=" + previousCheckPoint +
            ", currentCheckPoint=" + currentCheckPoint +


@@ -165,7 +165,8 @@ public class SupervisorManager
   public boolean checkPointDataSourceMetadata(
       String supervisorId,
-      int taskGroupId,
+      @Nullable Integer taskGroupId,
+      String baseSequenceName,
       DataSourceMetadata previousDataSourceMetadata,
       DataSourceMetadata currentDataSourceMetadata
   )
@@ -178,7 +179,7 @@ public class SupervisorManager
       Preconditions.checkNotNull(supervisor, "supervisor could not be found");

-      supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata);
+      supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata, currentDataSourceMetadata);
       return true;
     }
     catch (Exception e) {


@@ -83,7 +83,8 @@ public class NoopSupervisorSpec implements SupervisorSpec
       @Override
       public void checkpoint(
-          int taskGroupId,
+          @Nullable Integer taskGroupId,
+          String baseSequenceName,
           DataSourceMetadata previousCheckPoint,
           DataSourceMetadata currentCheckPoint
       )


@@ -22,6 +22,7 @@ package io.druid.indexing.overlord.supervisor;
 import com.google.common.collect.ImmutableMap;
 import io.druid.indexing.overlord.DataSourceMetadata;

+import javax.annotation.Nullable;
 import java.util.Map;

 public interface Supervisor
@@ -52,8 +53,14 @@ public interface Supervisor
    * represented by {@param currentCheckpoint} DataSourceMetadata
    *
    * @param taskGroupId        unique identifier of the sequence to checkpoint
+   * @param baseSequenceName   deprecated; used to find the task group when taskGroupId is null
    * @param previousCheckPoint DataSourceMetadata checkpointed in previous call
    * @param currentCheckPoint  current DataSourceMetadata to be checkpointed
    */
-  void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint);
+  void checkpoint(
+      @Nullable Integer taskGroupId,
+      @Deprecated String baseSequenceName,
+      DataSourceMetadata previousCheckPoint,
+      DataSourceMetadata currentCheckPoint
+  );
 }
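For implementers of this interface, the contract suggested by the Kafka supervisor's handling is: when taskGroupId is null (a request from a pre-upgrade task), resolve it from baseSequenceName before checkpointing. A minimal sketch, assuming hypothetical helpers rather than anything in this patch:

@Override
public void checkpoint(
    @Nullable Integer taskGroupId,
    @Deprecated String baseSequenceName,
    DataSourceMetadata previousCheckPoint,
    DataSourceMetadata currentCheckPoint
)
{
  // Sketch: resolve a null id (request from a pre-upgrade task) before use.
  final int resolvedGroupId = taskGroupId != null
      ? taskGroupId
      : findTaskGroupIdForSequence(baseSequenceName); // hypothetical helper
  doCheckpoint(resolvedGroupId, previousCheckPoint, currentCheckPoint); // hypothetical helper
}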