Fix NPE while handling CheckpointNotice in KafkaSupervisor (#5996)

* Fix NPE while handling CheckpointNotice

* fix code style

* Fix test

* fix test

* add a log for creating a new taskGroup

* fix backward compatibility in KafkaIOConfig
Authored by Jihoon Son, 2018-07-13 17:14:57 -07:00; committed by Jonathan Wei
parent 31c2179fe1
commit c48aa74a30
13 changed files with 579 additions and 219 deletions
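The heart of the fix, condensed: CheckpointNotice previously carried a sequenceName and resolved its task group through the separate sequenceTaskGroup map; once the group had moved on to publishing, that lookup returned null and the Preconditions.checkNotNull killed the notice handler with the NPE this PR is named for. The notice now carries the taskGroupId directly, and a guard classifies a missing group instead of dereferencing it. Below is a self-contained sketch of the new control flow; the types are stand-ins, not the real supervisor classes.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Stand-ins for the supervisor's bookkeeping; names mirror the diffs below.
public class CheckpointNoticeSketch
{
  static class TaskGroup { /* partition offsets, tasks, sequenceOffsets, ... */ }

  private final Map<Integer, TaskGroup> taskGroups = new ConcurrentHashMap<>();
  private final Map<Integer, CopyOnWriteArrayList<TaskGroup>> pendingCompletionTaskGroups =
      new ConcurrentHashMap<>();
  private final Map<Integer, Map<Integer, Long>> partitionGroups = new ConcurrentHashMap<>();

  // Before: the group was looked up via sequenceTaskGroup.get(sequenceName), and a
  // null result crashed the notice-handling thread. After: the notice carries the
  // taskGroupId, and a null group is classified rather than dereferenced.
  void handleCheckpointNotice(int taskGroupId)
  {
    final TaskGroup taskGroup = taskGroups.get(taskGroupId);
    if (taskGroup == null) {
      if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
        return; // group stopped reading and is publishing; ignore the stale request
      } else if (partitionGroups.containsKey(taskGroupId)) {
        return; // group is known but currently inactive; ignore
      } else {
        throw new IllegalStateException("cannot find taskGroup " + taskGroupId);
      }
    }
    // ... otherwise validate previousCheckpoint against taskGroup.sequenceOffsets
    // and record the new checkpoint, as in the KafkaSupervisor diff below.
  }

  public static void main(String[] args)
  {
    CheckpointNoticeSketch sketch = new CheckpointNoticeSketch();
    sketch.pendingCompletionTaskGroups.put(0, new CopyOnWriteArrayList<>());
    sketch.handleCheckpointNotice(0); // now ignored instead of throwing
  }
}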

MaterializedViewSupervisor.java

@ -53,7 +53,6 @@ import io.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -240,11 +239,7 @@ public class MaterializedViewSupervisor implements Supervisor
}
@Override
public void checkpoint(
@Nullable String sequenceName,
@Nullable DataSourceMetadata previousCheckPoint,
@Nullable DataSourceMetadata currentCheckPoint
)
public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint)
{
// do nothing
}

IncrementalPublishingKafkaIndexTaskRunner.java

@ -600,12 +600,13 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
sequences
);
requestPause();
if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(
final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
task.getDataSource(),
ioConfig.getBaseSequenceName(),
ioConfig.getTaskGroupId(),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
))) {
);
if (!toolbox.getTaskActionClient().submit(checkpointAction)) {
throw new ISE("Checkpoint request with offsets [%s] failed, dying", nextOffsets);
}
}

KafkaIOConfig.java

@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import io.druid.segment.indexing.IOConfig;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Map;
public class KafkaIOConfig implements IOConfig
@ -33,6 +34,8 @@ public class KafkaIOConfig implements IOConfig
private static final boolean DEFAULT_USE_TRANSACTION = true;
private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;
@Nullable
private final Integer taskGroupId;
private final String baseSequenceName;
private final KafkaPartitions startPartitions;
private final KafkaPartitions endPartitions;
@ -44,6 +47,7 @@ public class KafkaIOConfig implements IOConfig
@JsonCreator
public KafkaIOConfig(
@JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compabitility
@JsonProperty("baseSequenceName") String baseSequenceName,
@JsonProperty("startPartitions") KafkaPartitions startPartitions,
@JsonProperty("endPartitions") KafkaPartitions endPartitions,
@ -54,6 +58,7 @@ public class KafkaIOConfig implements IOConfig
@JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
)
{
this.taskGroupId = taskGroupId;
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions");
this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
@ -83,6 +88,13 @@ public class KafkaIOConfig implements IOConfig
}
}
@Nullable
@JsonProperty
public Integer getTaskGroupId()
{
return taskGroupId;
}
@JsonProperty
public String getBaseSequenceName()
{
@ -135,7 +147,8 @@ public class KafkaIOConfig implements IOConfig
public String toString()
{
return "KafkaIOConfig{" +
"baseSequenceName='" + baseSequenceName + '\'' +
"taskGroupId=" + taskGroupId +
", baseSequenceName='" + baseSequenceName + '\'' +
", startPartitions=" + startPartitions +
", endPartitions=" + endPartitions +
", consumerProperties=" + consumerProperties +

KafkaSupervisor.java

@ -143,8 +143,7 @@ public class KafkaSupervisor implements Supervisor
* time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
* map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
*/
@VisibleForTesting
static class TaskGroup
private static class TaskGroup
{
// This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
// in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@ -159,7 +158,7 @@ public class KafkaSupervisor implements Supervisor
DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
public TaskGroup(
TaskGroup(
ImmutableMap<Integer, Long> partitionOffsets,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime
@ -171,7 +170,7 @@ public class KafkaSupervisor implements Supervisor
this.sequenceOffsets.put(0, partitionOffsets);
}
public int addNewCheckpoint(Map<Integer, Long> checkpoint)
int addNewCheckpoint(Map<Integer, Long> checkpoint)
{
sequenceOffsets.put(sequenceOffsets.lastKey() + 1, checkpoint);
return sequenceOffsets.lastKey();
@ -212,9 +211,6 @@ public class KafkaSupervisor implements Supervisor
private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups = new ConcurrentHashMap<>();
// --------------------------------------------------------
// BaseSequenceName -> TaskGroup
private final ConcurrentHashMap<String, TaskGroup> sequenceTaskGroup = new ConcurrentHashMap<>();
private final TaskStorage taskStorage;
private final TaskMaster taskMaster;
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@ -513,13 +509,9 @@ public class KafkaSupervisor implements Supervisor
}
@Override
public void checkpoint(
String sequenceName,
DataSourceMetadata previousCheckpoint,
DataSourceMetadata currentCheckpoint
)
public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
{
Preconditions.checkNotNull(sequenceName, "Cannot checkpoint without a sequence name");
Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
Preconditions.checkArgument(
ioConfig.getTopic()
@ -530,12 +522,14 @@ public class KafkaSupervisor implements Supervisor
((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
);
log.info("Checkpointing [%s] for sequence [%s]", currentCheckpoint, sequenceName);
notices.add(new CheckpointNotice(
sequenceName,
(KafkaDataSourceMetadata) previousCheckpoint,
(KafkaDataSourceMetadata) currentCheckpoint
));
log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId);
notices.add(
new CheckpointNotice(
taskGroupId,
(KafkaDataSourceMetadata) previousCheckpoint,
(KafkaDataSourceMetadata) currentCheckpoint
)
);
}
public void possiblyRegisterListener()
@ -637,17 +631,17 @@ public class KafkaSupervisor implements Supervisor
private class CheckpointNotice implements Notice
{
final String sequenceName;
final int taskGroupId;
final KafkaDataSourceMetadata previousCheckpoint;
final KafkaDataSourceMetadata currentCheckpoint;
CheckpointNotice(
String sequenceName,
int taskGroupId,
KafkaDataSourceMetadata previousCheckpoint,
KafkaDataSourceMetadata currentCheckpoint
)
{
this.sequenceName = sequenceName;
this.taskGroupId = taskGroupId;
this.previousCheckpoint = previousCheckpoint;
this.currentCheckpoint = currentCheckpoint;
}
@ -658,17 +652,12 @@ public class KafkaSupervisor implements Supervisor
// check for consistency
// if already received request for this sequenceName and dataSourceMetadata combination then return
Preconditions.checkNotNull(
sequenceTaskGroup.get(sequenceName),
"WTH?! cannot find task group for this sequence [%s], sequencesTaskGroup map [%s], taskGroups [%s]",
sequenceName,
sequenceTaskGroup,
taskGroups
);
final TreeMap<Integer, Map<Integer, Long>> checkpoints = sequenceTaskGroup.get(sequenceName).sequenceOffsets;
final TaskGroup taskGroup = taskGroups.get(taskGroupId);
// check validity of previousCheckpoint if it is not null
if (previousCheckpoint != null) {
if (isValidTaskGroup(taskGroup)) {
final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;
// check validity of previousCheckpoint
int index = checkpoints.size();
for (int sequenceId : checkpoints.descendingKeySet()) {
Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
@ -685,26 +674,39 @@ public class KafkaSupervisor implements Supervisor
log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
return;
}
} else {
// There cannot be more than one checkpoint when previous checkpoint is null
// as when the task starts they are sent existing checkpoints
Preconditions.checkState(
checkpoints.size() <= 1,
"Got checkpoint request with null as previous check point, however found more than one checkpoints"
final int taskGroupId = getTaskGroupIdForPartition(
currentCheckpoint.getKafkaPartitions()
.getPartitionOffsetMap()
.keySet()
.iterator()
.next()
);
if (checkpoints.size() == 1) {
log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0));
return;
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
}
}
private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
{
if (taskGroup == null) {
// taskGroup might be in pendingCompletionTaskGroups or partitionGroups
if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
log.warn(
"Ignoring checkpoint request because taskGroup[%d] has already stopped indexing and is waiting for "
+ "publishing segments",
taskGroupId
);
return false;
} else if (partitionGroups.containsKey(taskGroupId)) {
log.warn("Ignoring checkpoint request because taskGroup[%d] is inactive", taskGroupId);
return false;
} else {
throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups [%s]", taskGroupId, taskGroups);
}
}
final int taskGroupId = getTaskGroupIdForPartition(currentCheckpoint.getKafkaPartitions()
.getPartitionOffsetMap()
.keySet()
.iterator()
.next());
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
sequenceTaskGroup.get(sequenceName).addNewCheckpoint(newCheckpoint);
log.info("Handled checkpoint notice, new checkpoint is [%s] for sequence [%s]", newCheckpoint, sequenceName);
return true;
}
}
@ -718,7 +720,6 @@ public class KafkaSupervisor implements Supervisor
taskGroups.values().forEach(this::killTasksInGroup);
taskGroups.clear();
partitionGroups.clear();
sequenceTaskGroup.clear();
} else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass());
} else {
@ -778,8 +779,7 @@ public class KafkaSupervisor implements Supervisor
resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
final int groupId = getTaskGroupIdForPartition(partition);
killTaskGroupForPartitions(ImmutableSet.of(partition));
final TaskGroup removedGroup = taskGroups.remove(groupId);
sequenceTaskGroup.remove(generateSequenceName(removedGroup));
taskGroups.remove(groupId);
partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
});
} else {
@ -955,9 +955,10 @@ public class KafkaSupervisor implements Supervisor
for (int partition = 0; partition < numPartitions; partition++) {
int taskGroupId = getTaskGroupIdForPartition(partition);
partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap<Integer, Long>());
ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.get(taskGroupId);
ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.computeIfAbsent(
taskGroupId,
k -> new ConcurrentHashMap<>()
);
// The starting offset for a new partition in [partitionGroups] is initially set to NOT_SET; when a new task group
// is created and is assigned partitions, if the offset in [partitionGroups] is NOT_SET it will take the starting
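
Several hunks in this file (here, and again for pendingCompletionTaskGroups below) also swap the putIfAbsent / get pair for computeIfAbsent, which performs the lookup-or-create in a single atomic step and only allocates the new map on a miss. A standalone sketch of the pattern, with illustrative names:

import java.util.concurrent.ConcurrentHashMap;

public class ComputeIfAbsentSketch
{
  public static void main(String[] args)
  {
    final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups =
        new ConcurrentHashMap<>();
    final int taskGroupId = 0;

    // Before (two steps; allocates a fresh map even when the key already exists):
    //   partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap<Integer, Long>());
    //   ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.get(taskGroupId);

    // After (one atomic step; the factory lambda runs only on a miss):
    ConcurrentHashMap<Integer, Long> partitionMap =
        partitionGroups.computeIfAbsent(taskGroupId, k -> new ConcurrentHashMap<>());
    partitionMap.put(0, 42L);
    System.out.println(partitionGroups); // {0={0=42}}
  }
}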
@ -1087,23 +1088,21 @@ public class KafkaSupervisor implements Supervisor
}
return false;
} else {
final TaskGroup taskGroup = new TaskGroup(
ImmutableMap.copyOf(
kafkaTask.getIOConfig()
.getStartPartitions()
.getPartitionOffsetMap()
), kafkaTask.getIOConfig().getMinimumMessageTime(),
kafkaTask.getIOConfig().getMaximumMessageTime()
);
if (taskGroups.putIfAbsent(
final TaskGroup taskGroup = taskGroups.computeIfAbsent(
taskGroupId,
taskGroup
) == null) {
sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId));
log.info("Created new task group [%d]", taskGroupId);
}
k -> {
log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
return new TaskGroup(
ImmutableMap.copyOf(
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
),
kafkaTask.getIOConfig().getMinimumMessageTime(),
kafkaTask.getIOConfig().getMaximumMessageTime()
);
}
);
taskGroupsToVerify.add(taskGroupId);
taskGroups.get(taskGroupId).tasks.putIfAbsent(taskId, new TaskData());
taskGroup.tasks.putIfAbsent(taskId, new TaskData());
}
}
return true;
@ -1256,7 +1255,6 @@ public class KafkaSupervisor implements Supervisor
// killing all tasks, or no task left in the group?
// clear state about the taskgroup so that get latest offset information is fetched from metadata store
log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId);
sequenceTaskGroup.remove(generateSequenceName(taskGroup));
taskGroups.remove(groupId);
partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
}
@ -1281,9 +1279,10 @@ public class KafkaSupervisor implements Supervisor
Map<Integer, Long> startingPartitions
)
{
pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.get(groupId);
final CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.computeIfAbsent(
groupId,
k -> new CopyOnWriteArrayList<>()
);
for (TaskGroup taskGroup : taskGroupList) {
if (taskGroup.partitionOffsets.equals(startingPartitions)) {
if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
@ -1411,8 +1410,7 @@ public class KafkaSupervisor implements Supervisor
if (endOffsets != null) {
// set a timeout and put this group in pendingCompletionTaskGroups so that it can be monitored for completion
group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
pendingCompletionTaskGroups.get(groupId).add(group);
pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);
// set endOffsets as the next startOffsets
for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
@ -1432,7 +1430,6 @@ public class KafkaSupervisor implements Supervisor
partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
}
sequenceTaskGroup.remove(generateSequenceName(group));
// remove this task group from the list of current task groups now that it has been handled
taskGroups.remove(groupId);
}
@ -1456,7 +1453,8 @@ public class KafkaSupervisor implements Supervisor
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
// failed and we need to re-ingest)
return Futures.transform(
stopTasksInGroup(taskGroup), new Function<Object, Map<Integer, Long>>()
stopTasksInGroup(taskGroup),
new Function<Object, Map<Integer, Long>>()
{
@Nullable
@Override
@ -1625,15 +1623,15 @@ public class KafkaSupervisor implements Supervisor
log.warn("All tasks in group [%d] failed to publish, killing all tasks for these partitions", groupId);
} else {
log.makeAlert(
"No task in [%s] succeeded before the completion timeout elapsed [%s]!",
"No task in [%s] for taskGroup [%d] succeeded before the completion timeout elapsed [%s]!",
group.taskIds(),
groupId,
ioConfig.getCompletionTimeout()
).emit();
}
// reset partitions offsets for this task group so that they will be re-read from metadata storage
partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
sequenceTaskGroup.remove(generateSequenceName(group));
// kill all the tasks in this pending completion group
killTasksInGroup(group);
// set a flag so the other pending completion groups for this set of partitions will also stop
@ -1693,7 +1691,6 @@ public class KafkaSupervisor implements Supervisor
// be recreated with the next set of offsets
if (taskData.status.isSuccess()) {
futures.add(stopTasksInGroup(taskGroup));
sequenceTaskGroup.remove(generateSequenceName(taskGroup));
iTaskGroups.remove();
break;
}
@ -1735,7 +1732,6 @@ public class KafkaSupervisor implements Supervisor
groupId,
taskGroup
);
sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId));
}
}
@ -1778,6 +1774,7 @@ public class KafkaSupervisor implements Supervisor
DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull();
KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
groupId,
sequenceName,
new KafkaPartitions(ioConfig.getTopic(), startPartitions),
new KafkaPartitions(ioConfig.getTopic(), endPartitions),
@ -1944,7 +1941,7 @@ public class KafkaSupervisor implements Supervisor
}
}
private ListenableFuture<?> stopTasksInGroup(TaskGroup taskGroup)
private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)
{
if (taskGroup == null) {
return Futures.immediateFuture(null);
@ -2289,6 +2286,28 @@ public class KafkaSupervisor implements Supervisor
return allStats;
}
@VisibleForTesting
@Nullable
TaskGroup removeTaskGroup(int taskGroupId)
{
return taskGroups.remove(taskGroupId);
}
@VisibleForTesting
void moveTaskGroupToPendingCompletion(int taskGroupId)
{
final TaskGroup taskGroup = taskGroups.remove(taskGroupId);
if (taskGroup != null) {
pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, k -> new CopyOnWriteArrayList<>()).add(taskGroup);
}
}
@VisibleForTesting
int getNoticesQueueSize()
{
return notices.size();
}
private static class StatsFromTaskResult
{
private final String groupId;

KafkaIOConfigTest.java

@ -50,6 +50,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@ -82,6 +83,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@ -118,6 +120,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"taskGroupId\": 0,\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
@ -137,6 +140,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
@ -156,6 +160,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
@ -175,6 +180,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@ -194,6 +200,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@ -214,6 +221,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n"
@ -234,6 +242,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n"

KafkaIndexTaskTest.java

@ -262,21 +262,21 @@ public class KafkaIndexTaskTest
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
{
return ImmutableList.of(
new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "e", "y", "10", "20.0", "1.0")),
new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
new ProducerRecord<byte[], byte[]>(topic, 0, null, StringUtils.toUtf8("unparseable")),
new ProducerRecord<byte[], byte[]>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
new ProducerRecord<byte[], byte[]>(topic, 0, null, null),
new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2013", "f", "y", "10", "20.0", "1.0")),
new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f", "y", "notanumber", "20.0", "1.0")),
new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f", "y", "10", "notanumber", "1.0")),
new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f", "y", "10", "20.0", "notanumber")),
new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2012", "g", "y", "10", "20.0", "1.0")),
new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2011", "h", "y", "10", "20.0", "1.0"))
new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
new ProducerRecord<>(topic, 0, null, null),
new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "notanumber", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", "notanumber", "1.0")),
new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", "20.0", "notanumber")),
new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", "10", "20.0", "1.0"))
);
}
@ -377,6 +377,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -418,6 +419,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -493,6 +495,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
baseSequenceName,
startPartitions,
endPartitions,
@ -514,14 +517,16 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
Assert.assertTrue(checkpointRequestsHash.contains(
Objects.hash(
DATA_SCHEMA.getDataSource(),
baseSequenceName,
new KafkaDataSourceMetadata(startPartitions),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets))
Assert.assertTrue(
checkpointRequestsHash.contains(
Objects.hash(
DATA_SCHEMA.getDataSource(),
0,
new KafkaDataSourceMetadata(startPartitions),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets))
)
)
));
);
// Check metrics
Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed());
@ -581,6 +586,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
baseSequenceName,
startPartitions,
endPartitions,
@ -603,14 +609,16 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
Assert.assertTrue(checkpointRequestsHash.contains(
Objects.hash(
DATA_SCHEMA.getDataSource(),
baseSequenceName,
new KafkaDataSourceMetadata(startPartitions),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap()))
Assert.assertTrue(
checkpointRequestsHash.contains(
Objects.hash(
DATA_SCHEMA.getDataSource(),
0,
new KafkaDataSourceMetadata(startPartitions),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap()))
)
)
));
);
// Check metrics
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getProcessed());
@ -637,6 +645,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -690,6 +699,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -753,6 +763,7 @@ public class KafkaIndexTaskTest
)
),
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -812,6 +823,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
@ -852,6 +864,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -903,6 +916,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -957,6 +971,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 7L)),
@ -1000,6 +1015,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 13L)),
@ -1081,6 +1097,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@ -1140,6 +1157,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -1153,6 +1171,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task2 = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -1206,6 +1225,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -1219,6 +1239,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task2 = createTask(
null,
new KafkaIOConfig(
1,
"sequence1",
new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@ -1273,6 +1294,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -1286,6 +1308,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task2 = createTask(
null,
new KafkaIOConfig(
1,
"sequence1",
new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@ -1345,6 +1368,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)),
@ -1409,6 +1433,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -1422,6 +1447,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task2 = createTask(
null,
new KafkaIOConfig(
1,
"sequence1",
new KafkaPartitions(topic, ImmutableMap.of(1, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(1, 1L)),
@ -1477,6 +1503,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -1513,6 +1540,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task2 = createTask(
task1.getId(),
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -1564,6 +1592,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -1647,6 +1676,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@ -1685,6 +1715,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 200L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 500L)),
@ -1737,6 +1768,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
"sequence0",
// task should ignore these and use sequence info sent in the context
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
@ -2026,18 +2058,20 @@ public class KafkaIndexTaskTest
@Override
public boolean checkPointDataSourceMetadata(
String supervisorId,
@Nullable String sequenceName,
int taskGroupId,
@Nullable DataSourceMetadata previousDataSourceMetadata,
@Nullable DataSourceMetadata currentDataSourceMetadata
)
{
log.info("Adding checkpoint hash to the set");
checkpointRequestsHash.add(Objects.hash(
supervisorId,
sequenceName,
previousDataSourceMetadata,
currentDataSourceMetadata
));
checkpointRequestsHash.add(
Objects.hash(
supervisorId,
taskGroupId,
previousDataSourceMetadata,
currentDataSourceMetadata
)
);
return true;
}
}

KafkaSupervisorTest.java

@ -19,6 +19,7 @@
package io.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@ -61,6 +62,7 @@ import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.parsers.JSONPathFieldSpec;
import io.druid.java.util.common.parsers.JSONPathSpec;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.TestHelper;
@ -70,6 +72,7 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireDepartment;
import io.druid.server.metrics.DruidMonitorSchedulerConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.curator.test.TestingCluster;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -99,7 +102,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import static org.easymock.EasyMock.anyBoolean;
import static org.easymock.EasyMock.anyObject;
@ -141,6 +146,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private TaskQueue taskQueue;
private String topic;
private RowIngestionMetersFactory rowIngestionMetersFactory;
private ExceptionCapturingServiceEmitter serviceEmitter;
private static String getTopic()
{
@ -213,6 +219,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
topic = getTopic();
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
serviceEmitter = new ExceptionCapturingServiceEmitter();
EmittingLogger.registerEmitter(serviceEmitter);
}
@After
@ -553,7 +561,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
"index_kafka_testDS__some_other_sequenceName",
1,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
null,
@ -564,7 +572,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 333L)),
null,
@ -575,7 +583,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
"index_kafka_testDS__some_other_sequenceName",
1,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 330L)),
null,
@ -586,7 +594,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id4 = createKafkaIndexTask(
"id4",
"other-datasource",
"index_kafka_testDS_d927edff33c4b3f",
2,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
null,
@ -634,7 +642,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(2);
replayAll();
@ -652,7 +662,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -661,7 +671,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
"sequenceName-1",
1,
new KafkaPartitions("topic", ImmutableMap.of(1, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE)),
null,
@ -670,7 +680,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -679,7 +689,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id4 = createKafkaIndexTask(
"id4",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE)),
null,
@ -688,7 +698,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id5 = createKafkaIndexTask(
"id5",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -727,8 +737,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints1))
.times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints2))
.times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
taskQueue.shutdown("id4");
@ -765,10 +779,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
.anyTimes();
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
.anyTimes();
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints1))
.anyTimes();
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints2))
.anyTimes();
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@ -830,7 +846,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
now,
@ -857,7 +873,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 0L, 2, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(2);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@ -878,9 +896,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
reset(taskClient);
// for the newly created replica task
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(1);
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
@ -953,10 +974,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
// there would be 4 tasks, 2 for each task group
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
@ -1063,10 +1086,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
replay(taskStorage, taskRunner, taskClient, taskQueue);
@ -1100,7 +1125,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task task = createKafkaIndexTask(
"id1",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1192,7 +1217,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task task = createKafkaIndexTask(
"id1",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1282,7 +1307,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1292,7 +1317,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1330,7 +1355,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
// since id1 is publishing, getCheckpoints wouldn't be called for it
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 1L, 1, 2L, 2, 3L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(1);
replayAll();
@ -1404,10 +1431,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
@ -1463,10 +1492,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
captured = Capture.newInstance(CaptureType.ALL);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
@ -1540,10 +1571,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints1))
.times(2);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints2))
.times(2);
captured = Capture.newInstance(CaptureType.ALL);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
@ -1622,7 +1655,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1632,7 +1665,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1642,7 +1675,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1678,8 +1711,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
// getCheckpoints will not be called for id1 as it is in publishing state
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@ -1824,7 +1861,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1834,7 +1871,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1844,7 +1881,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1879,8 +1916,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@ -1908,7 +1949,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1918,7 +1959,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1928,7 +1969,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
"sequenceName-0",
0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@ -1958,9 +1999,15 @@ public class KafkaSupervisorTest extends EasyMockSupport
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@ -1980,6 +2027,172 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
}
@Test(timeout = 60_000L)
public void testCheckpointForInactiveTaskGroup()
throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
{
supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
// not adding any events
final Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
0,
new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
null
);
final Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
0,
new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
null
);
final Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
0,
new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
null
);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
expect(
indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
).anyTimes();
expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
final DateTime startTime = DateTimes.nowUtc();
expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(1);
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
supervisor.runInternal();
final Map<Integer, Long> fakeCheckpoints = Collections.emptyMap();
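// Move task group 0 out of the actively-reading set first; a checkpoint for a
// pending-completion group should be dropped quietly (no alert, per the null assertions below).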
supervisor.moveTaskGroupToPendingCompletion(0);
supervisor.checkpoint(
0,
new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints))
);
while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}
verifyAll();
Assert.assertNull(serviceEmitter.getStackTrace());
Assert.assertNull(serviceEmitter.getExceptionMessage());
Assert.assertNull(serviceEmitter.getExceptionClass());
}
@Test(timeout = 60_000L)
public void testCheckpointForUnknownTaskGroup() throws InterruptedException
{
supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
// not adding any events
final Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
0,
new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
null
);
final Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
0,
new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
null
);
final Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
0,
new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
null
);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE))
.andReturn(new KafkaDataSourceMetadata(null))
.anyTimes();
replayAll();
supervisor.start();
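// No task group was ever created for id 0 here, so this checkpoint should
// surface a clear ISE alert rather than the NPE this commit fixes.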
supervisor.checkpoint(
0,
new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap()))
);
while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}
verifyAll();
Assert.assertNotNull(serviceEmitter.getStackTrace());
Assert.assertEquals(
"WTH?! cannot find taskGroup [0] among all taskGroups [{}]",
serviceEmitter.getExceptionMessage()
);
Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
}
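The message asserted above comes from the supervisor-side guard this commit adds: a checkpoint notice naming a task group the supervisor does not track now fails fast with a descriptive exception instead of an NPE. A minimal standalone sketch of that pattern follows; the class and field names are illustrative, not the actual KafkaSupervisor internals, and the real code throws Druid's ISE rather than a bare IllegalStateException.
import java.util.concurrent.ConcurrentHashMap;
// Illustrative guard: look the task group up once and fail with a descriptive
// exception when it is absent, rather than dereferencing a null entry later.
public class CheckpointGuardSketch
{
private final ConcurrentHashMap<Integer, Object> activelyReadingTaskGroups = new ConcurrentHashMap<>();
public void handleCheckpointNotice(int taskGroupId)
{
final Object taskGroup = activelyReadingTaskGroups.get(taskGroupId);
if (taskGroup == null) {
// Mirrors the message asserted in testCheckpointForUnknownTaskGroup.
throw new IllegalStateException(
String.format("WTH?! cannot find taskGroup [%d] among all taskGroups [%s]", taskGroupId, activelyReadingTaskGroups)
);
}
// ... checkpoint the located task group ...
}
}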
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
@ -2106,7 +2319,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private KafkaIndexTask createKafkaIndexTask(
String id,
String dataSource,
String sequenceName,
int taskGroupId,
KafkaPartitions startPartitions,
KafkaPartitions endPartitions,
DateTime minimumMessageTime,
@ -2119,7 +2332,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
getDataSchema(dataSource),
tuningConfig,
new KafkaIOConfig(
sequenceName,
taskGroupId,
"sequenceName-" + taskGroupId,
startPartitions,
endPartitions,
ImmutableMap.<String, String>of(),
@ -2128,7 +2342,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
maximumMessageTime,
false
),
ImmutableMap.<String, Object>of(),
Collections.emptyMap(),
null,
null,
rowIngestionMetersFactory
@ -21,27 +21,28 @@ package io.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.DataSourceMetadata;
public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
{
private final String supervisorId;
private final String sequenceName;
private final int taskGroupId;
private final DataSourceMetadata previousCheckPoint;
private final DataSourceMetadata currentCheckPoint;
public CheckPointDataSourceMetadataAction(
@JsonProperty("supervisorId") String supervisorId,
@JsonProperty("sequenceName") String sequenceName,
@JsonProperty("taskGroupId") Integer taskGroupId,
@JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint,
@JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
)
{
this.supervisorId = supervisorId;
this.sequenceName = sequenceName;
this.previousCheckPoint = previousCheckPoint;
this.currentCheckPoint = currentCheckPoint;
this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
}
@JsonProperty
@ -51,9 +52,9 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
}
@JsonProperty
public String getSequenceName()
public int getTaskGroupId()
{
return sequenceName;
return taskGroupId;
}
@JsonProperty
@ -81,8 +82,12 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
Task task, TaskActionToolbox toolbox
)
{
return toolbox.getSupervisorManager()
.checkPointDataSourceMetadata(supervisorId, sequenceName, previousCheckPoint, currentCheckPoint);
return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
supervisorId,
taskGroupId,
previousCheckPoint,
currentCheckPoint
);
}
@Override
@ -96,7 +101,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
{
return "CheckPointDataSourceMetadataAction{" +
"supervisorId='" + supervisorId + '\'' +
", sequenceName='" + sequenceName + '\'' +
", taskGroupId='" + taskGroupId + '\'' +
", previousCheckPoint=" + previousCheckPoint +
", currentCheckPoint=" + currentCheckPoint +
'}';
@ -165,9 +165,9 @@ public class SupervisorManager
public boolean checkPointDataSourceMetadata(
String supervisorId,
@Nullable String sequenceName,
@Nullable DataSourceMetadata previousDataSourceMetadata,
@Nullable DataSourceMetadata currentDataSourceMetadata
int taskGroupId,
DataSourceMetadata previousDataSourceMetadata,
DataSourceMetadata currentDataSourceMetadata
)
{
try {
@ -178,7 +178,7 @@ public class SupervisorManager
Preconditions.checkNotNull(supervisor, "supervisor could not be found");
supervisor.lhs.checkpoint(sequenceName, previousDataSourceMetadata, currentDataSourceMetadata);
supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata);
return true;
}
catch (Exception e) {
@ -35,6 +35,10 @@ import java.io.StringWriter;
*/
public class EmittingLogger extends Logger
{
public static final String EXCEPTION_TYPE_KEY = "exceptionType";
public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage";
public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace";
private static volatile ServiceEmitter emitter = null;
private final String className;
@ -83,9 +83,9 @@ public class NoopSupervisorSpec implements SupervisorSpec
@Override
public void checkpoint(
@Nullable String sequenceName,
@Nullable DataSourceMetadata previousCheckPoint,
@Nullable DataSourceMetadata currentCheckPoint
int taskGroupId,
DataSourceMetadata previousCheckPoint,
DataSourceMetadata currentCheckPoint
)
{
@ -22,7 +22,6 @@ package io.druid.indexing.overlord.supervisor;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.overlord.DataSourceMetadata;
import javax.annotation.Nullable;
import java.util.Map;
public interface Supervisor
@ -52,13 +51,9 @@ public interface Supervisor
* for example - Kafka Supervisor uses this to merge and hand off segments containing at least the data
* represented by the {@code currentCheckPoint} DataSourceMetadata
*
* @param sequenceName unique Identifier to figure out for which sequence to do checkpointing
* @param taskGroupId unique identifier of the task group for which to do checkpointing
* @param previousCheckPoint DataSourceMetadata checkpointed in previous call
* @param currentCheckPoint current DataSourceMetadata to be checkpointed
*/
void checkpoint(
@Nullable String sequenceName,
@Nullable DataSourceMetadata previousCheckPoint,
@Nullable DataSourceMetadata currentCheckPoint
);
void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint);
}
@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.metrics;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.core.Event;
import io.druid.java.util.emitter.service.ServiceEmitter;
import javax.annotation.Nullable;
import java.util.Map;
public class ExceptionCapturingServiceEmitter extends ServiceEmitter
{
private volatile Class<?> exceptionClass;
private volatile String exceptionMessage;
private volatile String stackTrace;
public ExceptionCapturingServiceEmitter()
{
super("", "", null);
}
@Override
public void emit(Event event)
{
//noinspection unchecked
final Map<String, Object> dataMap = (Map<String, Object>) event.toMap().get("data");
final Class<?> exceptionClass = (Class<?>) dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY);
if (exceptionClass != null) {
final String exceptionMessage = (String) dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY);
final String stackTrace = (String) dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY);
this.exceptionClass = exceptionClass;
this.exceptionMessage = exceptionMessage;
this.stackTrace = stackTrace;
}
}
@Nullable
public Class<?> getExceptionClass()
{
return exceptionClass;
}
@Nullable
public String getExceptionMessage()
{
return exceptionMessage;
}
@Nullable
public String getStackTrace()
{
return stackTrace;
}
}
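To close the loop, a hedged sketch of how a test can wire this emitter in. EmittingLogger.registerEmitter is the registration hook; the harness class and method here are illustrative, not part of the commit.
import io.druid.java.util.emitter.EmittingLogger;
// Illustrative test wiring, mirroring the assertions in the supervisor tests
// above: register the capturing emitter, run the code under test, then check
// what (if anything) was captured.
public class EmitterWiringSketch
{
public void runAndVerify(Runnable codeUnderTest)
{
final ExceptionCapturingServiceEmitter serviceEmitter = new ExceptionCapturingServiceEmitter();
EmittingLogger.registerEmitter(serviceEmitter);
codeUnderTest.run();
// A clean run should leave nothing captured; tests expecting an alert
// assert on the captured class and message instead.
if (serviceEmitter.getExceptionClass() != null) {
throw new AssertionError("unexpected alert: " + serviceEmitter.getExceptionMessage());
}
}
}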