From c48aa74a301a11f49b0d6ba6bde4283bdab7f699 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Fri, 13 Jul 2018 17:14:57 -0700
Subject: [PATCH] Fix NPE while handling CheckpointNotice in KafkaSupervisor (#5996)

* Fix NPE while handling CheckpointNotice

* fix code style

* Fix test

* fix test

* add a log for creating a new taskGroup

* fix backward compatibility in KafkaIOConfig
---
 .../MaterializedViewSupervisor.java           |   7 +-
 ...ementalPublishingKafkaIndexTaskRunner.java |   7 +-
 .../druid/indexing/kafka/KafkaIOConfig.java   |  15 +-
 .../kafka/supervisor/KafkaSupervisor.java     | 185 +++++-----
 .../indexing/kafka/KafkaIOConfigTest.java     |   9 +
 .../indexing/kafka/KafkaIndexTaskTest.java    | 106 ++++--
 .../kafka/supervisor/KafkaSupervisorTest.java | 344 ++++++++++++++----
 .../CheckPointDataSourceMetadataAction.java   |  27 +-
 .../supervisor/SupervisorManager.java         |   8 +-
 .../java/util/emitter/EmittingLogger.java     |   4 +
 .../supervisor/NoopSupervisorSpec.java        |   6 +-
 .../overlord/supervisor/Supervisor.java       |   9 +-
 .../ExceptionCapturingServiceEmitter.java     |  71 ++++
 13 files changed, 579 insertions(+), 219 deletions(-)
 create mode 100644 server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java

diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
index eba35a4faff..fedda092c4d 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -53,7 +53,6 @@ import io.druid.timeline.DataSegment;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -240,11 +239,7 @@ public class MaterializedViewSupervisor implements Supervisor
   }
 
   @Override
-  public void checkpoint(
-      @Nullable String sequenceName,
-      @Nullable DataSourceMetadata previousCheckPoint,
-      @Nullable DataSourceMetadata currentCheckPoint
-  )
+  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint)
   {
     // do nothing
   }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 3a440b14fcb..a93fde611c5 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -600,12 +600,13 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
               sequences
           );
           requestPause();
-          if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(
+          final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
               task.getDataSource(),
-              ioConfig.getBaseSequenceName(),
+              ioConfig.getTaskGroupId(),
               new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
               new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
-          ))) {
+          );
+          if (!toolbox.getTaskActionClient().submit(checkpointAction)) {
             throw new ISE("Checkpoint request with offsets [%s] failed, dying", nextOffsets);
           }
         }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
index 4dd3aaf3885..b6c1d765c95 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
 import io.druid.segment.indexing.IOConfig;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 
 public class KafkaIOConfig implements IOConfig
@@ -33,6 +34,8 @@ public class KafkaIOConfig implements IOConfig
   private static final boolean DEFAULT_USE_TRANSACTION = true;
   private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;
 
+  @Nullable
+  private final Integer taskGroupId;
   private final String baseSequenceName;
   private final KafkaPartitions startPartitions;
   private final KafkaPartitions endPartitions;
@@ -44,6 +47,7 @@ public class KafkaIOConfig implements IOConfig
 
   @JsonCreator
   public KafkaIOConfig(
+      @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compatibility
      @JsonProperty("baseSequenceName") String baseSequenceName,
      @JsonProperty("startPartitions") KafkaPartitions startPartitions,
      @JsonProperty("endPartitions") KafkaPartitions endPartitions,
@@ -54,6 +58,7 @@ public class KafkaIOConfig implements IOConfig
      @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
   )
   {
+    this.taskGroupId = taskGroupId;
     this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
     this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions");
     this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
@@ -83,6 +88,13 @@ public class KafkaIOConfig implements IOConfig
     }
   }
 
+  @Nullable
+  @JsonProperty
+  public Integer getTaskGroupId()
+  {
+    return taskGroupId;
+  }
+
   @JsonProperty
   public String getBaseSequenceName()
   {
@@ -135,7 +147,8 @@ public class KafkaIOConfig implements IOConfig
   public String toString()
   {
     return "KafkaIOConfig{" +
-           "baseSequenceName='" + baseSequenceName + '\'' +
+           "taskGroupId=" + taskGroupId +
+           ", baseSequenceName='" + baseSequenceName + '\'' +
            ", startPartitions=" + startPartitions +
            ", endPartitions=" + endPartitions +
            ", consumerProperties=" + consumerProperties +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 8ea13efad0a..ed287fa0591 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -143,8 +143,7 @@ public class KafkaSupervisor implements Supervisor
    * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
    * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
   */
-  @VisibleForTesting
-  static class TaskGroup
+  private static class TaskGroup
   {
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -159,7 +158,7 @@ public class KafkaSupervisor implements Supervisor
     DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
     final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
 
-    public TaskGroup(
+    TaskGroup(
         ImmutableMap<Integer, Long> partitionOffsets,
         Optional<DateTime> minimumMessageTime,
         Optional<DateTime> maximumMessageTime
@@ -171,7 +170,7 @@ public class KafkaSupervisor implements Supervisor
       this.sequenceOffsets.put(0, partitionOffsets);
     }
 
-    public int addNewCheckpoint(Map<Integer, Long> checkpoint)
+    int addNewCheckpoint(Map<Integer, Long> checkpoint)
     {
       sequenceOffsets.put(sequenceOffsets.lastKey() + 1, checkpoint);
       return sequenceOffsets.lastKey();
@@ -212,9 +211,6 @@ public class KafkaSupervisor implements Supervisor
   private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups = new ConcurrentHashMap<>();
   // --------------------------------------------------------
 
-  // BaseSequenceName -> TaskGroup
-  private final ConcurrentHashMap<String, TaskGroup> sequenceTaskGroup = new ConcurrentHashMap<>();
-
   private final TaskStorage taskStorage;
   private final TaskMaster taskMaster;
   private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@@ -513,13 +509,9 @@ public class KafkaSupervisor implements Supervisor
   }
 
   @Override
-  public void checkpoint(
-      String sequenceName,
-      DataSourceMetadata previousCheckpoint,
-      DataSourceMetadata currentCheckpoint
-  )
+  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
   {
-    Preconditions.checkNotNull(sequenceName, "Cannot checkpoint without a sequence name");
+    Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
     Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
     Preconditions.checkArgument(
         ioConfig.getTopic()
@@ -530,12 +522,14 @@ public class KafkaSupervisor implements Supervisor
         ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
     );
 
-    log.info("Checkpointing [%s] for sequence [%s]", currentCheckpoint, sequenceName);
-    notices.add(new CheckpointNotice(
-        sequenceName,
-        (KafkaDataSourceMetadata) previousCheckpoint,
-        (KafkaDataSourceMetadata) currentCheckpoint
-    ));
+    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId);
+    notices.add(
+        new CheckpointNotice(
+            taskGroupId,
+            (KafkaDataSourceMetadata) previousCheckpoint,
+            (KafkaDataSourceMetadata) currentCheckpoint
+        )
+    );
   }
 
   public void possiblyRegisterListener()
@@ -637,17 +631,17 @@ public class KafkaSupervisor implements Supervisor
 
   private class CheckpointNotice implements Notice
   {
-    final String sequenceName;
+    final int taskGroupId;
     final KafkaDataSourceMetadata previousCheckpoint;
     final KafkaDataSourceMetadata currentCheckpoint;
 
     CheckpointNotice(
-        String sequenceName,
+        int taskGroupId,
         KafkaDataSourceMetadata previousCheckpoint,
         KafkaDataSourceMetadata currentCheckpoint
     )
     {
-      this.sequenceName = sequenceName;
+      this.taskGroupId = taskGroupId;
       this.previousCheckpoint = previousCheckpoint;
       this.currentCheckpoint = currentCheckpoint;
     }
@@ -658,17 +652,12 @@ public class KafkaSupervisor implements Supervisor
 
       // check for consistency
      // if already received request for this sequenceName and dataSourceMetadata combination then return
-      Preconditions.checkNotNull(
-          sequenceTaskGroup.get(sequenceName),
-          "WTH?! cannot find task group for this sequence [%s], sequencesTaskGroup map [%s], taskGroups [%s]",
-          sequenceName,
-          sequenceTaskGroup,
-          taskGroups
-      );
-      final TreeMap<Integer, Map<Integer, Long>> checkpoints = sequenceTaskGroup.get(sequenceName).sequenceOffsets;
+      final TaskGroup taskGroup = taskGroups.get(taskGroupId);
 
-      // check validity of previousCheckpoint if it is not null
-      if (previousCheckpoint != null) {
+      if (isValidTaskGroup(taskGroup)) {
+        final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;
+
+        // check validity of previousCheckpoint
         int index = checkpoints.size();
         for (int sequenceId : checkpoints.descendingKeySet()) {
           Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
@@ -685,26 +674,39 @@ public class KafkaSupervisor implements Supervisor
           log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
           return;
         }
-      } else {
-        // There cannot be more than one checkpoint when previous checkpoint is null
-        // as when the task starts they are sent existing checkpoints
-        Preconditions.checkState(
-            checkpoints.size() <= 1,
-            "Got checkpoint request with null as previous check point, however found more than one checkpoints"
+        final int taskGroupId = getTaskGroupIdForPartition(
+            currentCheckpoint.getKafkaPartitions()
+                             .getPartitionOffsetMap()
+                             .keySet()
+                             .iterator()
+                             .next()
         );
-        if (checkpoints.size() == 1) {
-          log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0));
-          return;
+        final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
+        taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
+        log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
+      }
+    }
+
+    private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
+    {
+      if (taskGroup == null) {
+        // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
+        if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
+          log.warn(
+              "Ignoring checkpoint request because taskGroup[%d] has already stopped indexing and is waiting for "
+              + "publishing segments",
+              taskGroupId
+          );
+          return false;
+        } else if (partitionGroups.containsKey(taskGroupId)) {
+          log.warn("Ignoring checkpoint request because taskGroup[%d] is inactive", taskGroupId);
+          return false;
+        } else {
+          throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups [%s]", taskGroupId, taskGroups);
         }
       }
-      final int taskGroupId = getTaskGroupIdForPartition(currentCheckpoint.getKafkaPartitions()
-                                                                          .getPartitionOffsetMap()
-                                                                          .keySet()
-                                                                          .iterator()
-                                                                          .next());
-      final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
-      sequenceTaskGroup.get(sequenceName).addNewCheckpoint(newCheckpoint);
-      log.info("Handled checkpoint notice, new checkpoint is [%s] for sequence [%s]", newCheckpoint, sequenceName);
+
+      return true;
     }
   }
@@ -718,7 +720,6 @@ public class KafkaSupervisor implements Supervisor
       taskGroups.values().forEach(this::killTasksInGroup);
       taskGroups.clear();
       partitionGroups.clear();
-      sequenceTaskGroup.clear();
     } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
       throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass());
     } else {
@@ -778,8 +779,7 @@ public class KafkaSupervisor implements Supervisor
       resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
         final int groupId = getTaskGroupIdForPartition(partition);
         killTaskGroupForPartitions(ImmutableSet.of(partition));
-        final TaskGroup removedGroup = taskGroups.remove(groupId);
-        sequenceTaskGroup.remove(generateSequenceName(removedGroup));
+        taskGroups.remove(groupId);
         partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
       });
     } else {
@@ -955,9 +955,10 @@ public class KafkaSupervisor implements Supervisor
 
     for (int partition = 0; partition < numPartitions; partition++) {
       int taskGroupId = getTaskGroupIdForPartition(partition);
-      partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap<Integer, Long>());
-
-      ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.get(taskGroupId);
+      ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.computeIfAbsent(
+          taskGroupId,
+          k -> new ConcurrentHashMap<>()
+      );
 
       // The starting offset for a new partition in [partitionGroups] is initially set to NOT_SET; when a new task group
       // is created and is assigned partitions, if the offset in [partitionGroups] is NOT_SET it will take the starting
@@ -1087,23 +1088,21 @@ public class KafkaSupervisor implements Supervisor
         }
         return false;
       } else {
-        final TaskGroup taskGroup = new TaskGroup(
-            ImmutableMap.copyOf(
-                kafkaTask.getIOConfig()
-                         .getStartPartitions()
-                         .getPartitionOffsetMap()
-            ), kafkaTask.getIOConfig().getMinimumMessageTime(),
-            kafkaTask.getIOConfig().getMaximumMessageTime()
-        );
-        if (taskGroups.putIfAbsent(
+        final TaskGroup taskGroup = taskGroups.computeIfAbsent(
             taskGroupId,
-            taskGroup
-        ) == null) {
-          sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId));
-          log.info("Created new task group [%d]", taskGroupId);
-        }
+            k -> {
+              log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
+              return new TaskGroup(
+                  ImmutableMap.copyOf(
+                      kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
+                  ),
+                  kafkaTask.getIOConfig().getMinimumMessageTime(),
+                  kafkaTask.getIOConfig().getMaximumMessageTime()
+              );
+            }
+        );
         taskGroupsToVerify.add(taskGroupId);
-        taskGroups.get(taskGroupId).tasks.putIfAbsent(taskId, new TaskData());
+        taskGroup.tasks.putIfAbsent(taskId, new TaskData());
       }
     }
     return true;
   }
@@ -1256,7 +1255,6 @@ public class KafkaSupervisor implements Supervisor
           // killing all tasks or no task left in the group ?
           // clear state about the taskgroup so that get latest offset information is fetched from metadata store
           log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId);
-          sequenceTaskGroup.remove(generateSequenceName(taskGroup));
           taskGroups.remove(groupId);
           partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
         }
@@ -1281,9 +1279,10 @@ public class KafkaSupervisor implements Supervisor
       Map<Integer, Long> startingPartitions
   )
   {
-    pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.newCopyOnWriteArrayList());
-
-    CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.get(groupId);
+    final CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.computeIfAbsent(
+        groupId,
+        k -> new CopyOnWriteArrayList<>()
+    );
     for (TaskGroup taskGroup : taskGroupList) {
       if (taskGroup.partitionOffsets.equals(startingPartitions)) {
         if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
@@ -1411,8 +1410,7 @@ public class KafkaSupervisor implements Supervisor
       if (endOffsets != null) {
         // set a timeout and put this group in pendingCompletionTaskGroups so that it can be monitored for completion
         group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-        pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.newCopyOnWriteArrayList());
-        pendingCompletionTaskGroups.get(groupId).add(group);
+        pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);
 
         // set endOffsets as the next startOffsets
         for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
@@ -1432,7 +1430,6 @@ public class KafkaSupervisor implements Supervisor
           partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
         }
 
-        sequenceTaskGroup.remove(generateSequenceName(group));
         // remove this task group from the list of current task groups now that it has been handled
         taskGroups.remove(groupId);
       }
@@ -1456,7 +1453,8 @@ public class KafkaSupervisor implements Supervisor
     // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
     // failed and we need to re-ingest)
     return Futures.transform(
-        stopTasksInGroup(taskGroup), new Function<Object, Map<Integer, Long>>()
+        stopTasksInGroup(taskGroup),
+        new Function<Object, Map<Integer, Long>>()
         {
           @Nullable
           @Override
@@ -1625,15 +1623,15 @@ public class KafkaSupervisor implements Supervisor
           log.warn("All tasks in group [%d] failed to publish, killing all tasks for these partitions", groupId);
         } else {
           log.makeAlert(
-              "No task in [%s] succeeded before the completion timeout elapsed [%s]!",
+              "No task in [%s] for taskGroup [%d] succeeded before the completion timeout elapsed [%s]!",
               group.taskIds(),
+              groupId,
               ioConfig.getCompletionTimeout()
           ).emit();
         }
 
         // reset partitions offsets for this task group so that they will be re-read from metadata storage
         partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
-        sequenceTaskGroup.remove(generateSequenceName(group));
         // kill all the tasks in this pending completion group
         killTasksInGroup(group);
         // set a flag so the other pending completion groups for this set of partitions will also stop
@@ -1693,7 +1691,6 @@ public class KafkaSupervisor implements Supervisor
           // be recreated with the next set of offsets
           if (taskData.status.isSuccess()) {
             futures.add(stopTasksInGroup(taskGroup));
-            sequenceTaskGroup.remove(generateSequenceName(taskGroup));
             iTaskGroups.remove();
             break;
           }
@@ -1735,7 +1732,6 @@ public class KafkaSupervisor implements Supervisor
             groupId,
             taskGroup
         );
-        sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId));
       }
     }
@@ -1778,6 +1774,7 @@ public class KafkaSupervisor implements Supervisor
     DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull();
 
     KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
+        groupId,
         sequenceName,
         new KafkaPartitions(ioConfig.getTopic(), startPartitions),
         new KafkaPartitions(ioConfig.getTopic(), endPartitions),
@@ -1944,7 +1941,7 @@ public class KafkaSupervisor implements Supervisor
     }
   }
 
-  private ListenableFuture<Void> stopTasksInGroup(TaskGroup taskGroup)
+  private ListenableFuture<Void> stopTasksInGroup(@Nullable TaskGroup taskGroup)
   {
     if (taskGroup == null) {
       return Futures.immediateFuture(null);
@@ -2289,6 +2286,28 @@ public class KafkaSupervisor implements Supervisor
     return allStats;
   }
 
+  @VisibleForTesting
+  @Nullable
+  TaskGroup removeTaskGroup(int taskGroupId)
+  {
+    return taskGroups.remove(taskGroupId);
+  }
+
+  @VisibleForTesting
+  void moveTaskGroupToPendingCompletion(int taskGroupId)
+  {
+    final TaskGroup taskGroup = taskGroups.remove(taskGroupId);
+    if (taskGroup != null) {
+      pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, k -> new CopyOnWriteArrayList<>()).add(taskGroup);
+    }
+  }
+
+  @VisibleForTesting
+  int getNoticesQueueSize()
+  {
+    return notices.size();
+  }
+
   private static class StatsFromTaskResult
   {
     private final String groupId;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
index 3bc55e277cb..050dba753b8 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -50,6 +50,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -82,6 +83,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -118,6 +120,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -137,6 +140,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -156,6 +160,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -175,6 +180,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -194,6 +200,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -214,6 +221,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n"
@@ -234,6 +242,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n"
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 15f4bb07775..411fff9168e 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -262,21 +262,21 @@ public class KafkaIndexTaskTest
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
   {
     return ImmutableList.of(
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "e", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, StringUtils.toUtf8("unparseable")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, null),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2013", "f", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f", "y", "notanumber", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f", "y", "10", "notanumber", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f", "y", "10", "20.0", "notanumber")),
-        new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2012", "g", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2011", "h", "y", "10", "20.0", "1.0"))
+        new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
+        new
ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")), + new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")), + new ProducerRecord<>(topic, 0, null, null), + new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "notanumber", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", "notanumber", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", "20.0", "notanumber")), + new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", "10", "20.0", "1.0")) ); } @@ -377,6 +377,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -418,6 +419,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -493,6 +495,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, baseSequenceName, startPartitions, endPartitions, @@ -514,14 +517,16 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(1, checkpointRequestsHash.size()); - Assert.assertTrue(checkpointRequestsHash.contains( - Objects.hash( - DATA_SCHEMA.getDataSource(), - baseSequenceName, - new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)) + Assert.assertTrue( + checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + 0, + new KafkaDataSourceMetadata(startPartitions), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)) + ) ) - )); + ); // Check metrics Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed()); @@ -581,6 +586,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, baseSequenceName, startPartitions, endPartitions, @@ -603,14 +609,16 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(1, checkpointRequestsHash.size()); - Assert.assertTrue(checkpointRequestsHash.contains( - Objects.hash( - DATA_SCHEMA.getDataSource(), - baseSequenceName, - new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap())) + Assert.assertTrue( + checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + 0, + new KafkaDataSourceMetadata(startPartitions), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap())) + ) ) - )); + ); // Check metrics Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getProcessed()); @@ 
-637,6 +645,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -690,6 +699,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -753,6 +763,7 @@ public class KafkaIndexTaskTest ) ), new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -812,6 +823,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), @@ -852,6 +864,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -903,6 +916,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -957,6 +971,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 7L)), @@ -1000,6 +1015,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 13L)), @@ -1081,6 +1097,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 10L)), @@ -1140,6 +1157,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1153,6 +1171,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task2 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1206,6 +1225,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1219,6 +1239,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task2 = createTask( null, new KafkaIOConfig( + 1, "sequence1", new KafkaPartitions(topic, ImmutableMap.of(0, 3L)), new KafkaPartitions(topic, ImmutableMap.of(0, 10L)), @@ -1273,6 +1294,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1286,6 +1308,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task2 = createTask( null, new KafkaIOConfig( + 1, "sequence1", new KafkaPartitions(topic, ImmutableMap.of(0, 3L)), new KafkaPartitions(topic, 
ImmutableMap.of(0, 10L)), @@ -1345,6 +1368,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)), @@ -1409,6 +1433,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1422,6 +1447,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task2 = createTask( null, new KafkaIOConfig( + 1, "sequence1", new KafkaPartitions(topic, ImmutableMap.of(1, 0L)), new KafkaPartitions(topic, ImmutableMap.of(1, 1L)), @@ -1477,6 +1503,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1513,6 +1540,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task2 = createTask( task1.getId(), new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1564,6 +1592,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1647,6 +1676,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1685,6 +1715,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 200L)), new KafkaPartitions(topic, ImmutableMap.of(0, 500L)), @@ -1737,6 +1768,7 @@ public class KafkaIndexTaskTest final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", // task should ignore these and use sequence info sent in the context new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), @@ -2026,18 +2058,20 @@ public class KafkaIndexTaskTest @Override public boolean checkPointDataSourceMetadata( String supervisorId, - @Nullable String sequenceName, + int taskGroupId, @Nullable DataSourceMetadata previousDataSourceMetadata, @Nullable DataSourceMetadata currentDataSourceMetadata ) { log.info("Adding checkpoint hash to the set"); - checkpointRequestsHash.add(Objects.hash( - supervisorId, - sequenceName, - previousDataSourceMetadata, - currentDataSourceMetadata - )); + checkpointRequestsHash.add( + Objects.hash( + supervisorId, + taskGroupId, + previousDataSourceMetadata, + currentDataSourceMetadata + ) + ); return true; } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 86648fef282..f0f6033eea3 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -19,6 +19,7 @@ package io.druid.indexing.kafka.supervisor; +import com.fasterxml.jackson.core.JsonProcessingException; import 
com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -61,6 +62,7 @@ import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.parsers.JSONPathFieldSpec; import io.druid.java.util.common.parsers.JSONPathSpec; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.TestHelper; @@ -70,6 +72,7 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.FireDepartment; import io.druid.server.metrics.DruidMonitorSchedulerConfig; import io.druid.server.metrics.NoopServiceEmitter; +import io.druid.server.metrics.ExceptionCapturingServiceEmitter; import org.apache.curator.test.TestingCluster; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -99,7 +102,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; import static org.easymock.EasyMock.anyBoolean; import static org.easymock.EasyMock.anyObject; @@ -141,6 +146,7 @@ public class KafkaSupervisorTest extends EasyMockSupport private TaskQueue taskQueue; private String topic; private RowIngestionMetersFactory rowIngestionMetersFactory; + private ExceptionCapturingServiceEmitter serviceEmitter; private static String getTopic() { @@ -213,6 +219,8 @@ public class KafkaSupervisorTest extends EasyMockSupport topic = getTopic(); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); + serviceEmitter = new ExceptionCapturingServiceEmitter(); + EmittingLogger.registerEmitter(serviceEmitter); } @After @@ -553,7 +561,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "index_kafka_testDS__some_other_sequenceName", + 1, new KafkaPartitions("topic", ImmutableMap.of(0, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, 10L)), null, @@ -564,7 +572,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 333L)), null, @@ -575,7 +583,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "index_kafka_testDS__some_other_sequenceName", + 1, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L)), new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 330L)), null, @@ -586,7 +594,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id4 = createKafkaIndexTask( "id4", "other-datasource", - "index_kafka_testDS_d927edff33c4b3f", + 2, new KafkaPartitions("topic", ImmutableMap.of(0, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, 10L)), null, @@ -634,7 +642,9 @@ public class KafkaSupervisorTest extends EasyMockSupport TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2); + 
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(2); replayAll(); @@ -652,7 +662,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -661,7 +671,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-1", + 1, new KafkaPartitions("topic", ImmutableMap.of(1, 0L)), new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE)), null, @@ -670,7 +680,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -679,7 +689,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id4 = createKafkaIndexTask( "id4", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE)), null, @@ -688,7 +698,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id5 = createKafkaIndexTask( "id5", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -727,8 +737,12 @@ public class KafkaSupervisorTest extends EasyMockSupport checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(1); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); taskQueue.shutdown("id4"); @@ -765,10 +779,12 @@ public class KafkaSupervisorTest extends EasyMockSupport checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .anyTimes(); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .anyTimes(); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .anyTimes(); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .anyTimes(); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ 
-830,7 +846,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), now, @@ -857,7 +873,9 @@ public class KafkaSupervisorTest extends EasyMockSupport TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 0L, 2, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(2); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -878,9 +896,12 @@ public class KafkaSupervisorTest extends EasyMockSupport reset(taskClient); // for the newly created replica task - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes(); expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId()))); @@ -953,10 +974,12 @@ public class KafkaSupervisorTest extends EasyMockSupport TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); // there would be 4 tasks, 2 for each task group - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); for (Task task : tasks) { @@ -1063,10 +1086,12 @@ public class KafkaSupervisorTest extends EasyMockSupport checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + 
.andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); replay(taskStorage, taskRunner, taskClient, taskQueue); @@ -1100,7 +1125,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task task = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1192,7 +1217,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task task = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1282,7 +1307,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1292,7 +1317,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1330,7 +1355,9 @@ public class KafkaSupervisorTest extends EasyMockSupport // since id1 is publishing, so getCheckpoints wouldn't be called for it TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); replayAll(); @@ -1404,10 +1431,12 @@ public class KafkaSupervisorTest extends EasyMockSupport checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); for (Task task : tasks) { @@ -1463,10 +1492,12 @@ public class KafkaSupervisorTest extends EasyMockSupport checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + 
.andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); captured = Capture.newInstance(CaptureType.ALL); expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); @@ -1540,10 +1571,12 @@ public class KafkaSupervisorTest extends EasyMockSupport checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); captured = Capture.newInstance(CaptureType.ALL); expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); @@ -1622,7 +1655,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1632,7 +1665,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1642,7 +1675,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1678,8 +1711,12 @@ public class KafkaSupervisorTest extends EasyMockSupport // getCheckpoints will not be called for id1 as it is in publishing state TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -1824,7 +1861,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)), null, @@ -1834,7 +1871,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1844,7 +1881,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1879,8 +1916,12 @@ public class KafkaSupervisorTest extends EasyMockSupport TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -1908,7 +1949,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1918,7 +1959,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1928,7 +1969,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1958,9 +1999,15 @@ public class KafkaSupervisorTest extends EasyMockSupport TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + 
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);

     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -1980,6 +2027,172 @@ public class KafkaSupervisorTest extends EasyMockSupport
     verifyAll();
   }

+  @Test(timeout = 60_000L)
+  public void testCheckpointForInactiveTaskGroup()
+      throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
+  {
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    //not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(
+        indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+    expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+
+    final DateTime startTime = DateTimes.nowUtc();
+    expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
+
+    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+
+    final Map<Integer, Long> fakeCheckpoints = Collections.emptyMap();
+    supervisor.moveTaskGroupToPendingCompletion(0);
+    supervisor.checkpoint(
+        0,
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+
+    Assert.assertNull(serviceEmitter.getStackTrace());
+    Assert.assertNull(serviceEmitter.getExceptionMessage());
+    Assert.assertNull(serviceEmitter.getExceptionClass());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testCheckpointForUnknownTaskGroup() throws InterruptedException
+  {
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    //not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(
+        indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+
+    supervisor.checkpoint(
+        0,
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap()))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+
+    Assert.assertNotNull(serviceEmitter.getStackTrace());
+    Assert.assertEquals(
+        "WTH?! cannot find taskGroup [0] among all taskGroups [{}]",
+        serviceEmitter.getExceptionMessage()
+    );
+    Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
+  }
+
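Judging from these two tests, the KafkaSupervisor hunks (listed elsewhere in this patch) distinguish a checkpoint notice for a task group that has merely moved on to publishing from one for a group the supervisor has never owned: the former is dropped quietly, the latter raises the alert asserted above. A minimal sketch of such a guard, with names and structure assumed rather than copied from the patch:

    // Sketch only: a null-guard of the shape these tests exercise (names assumed).
    private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup)
    {
      if (taskGroup == null) {
        if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
          // The group already stopped reading and is publishing; ignore the stale notice.
          log.warn("Ignoring checkpoint request for taskGroup [%d]; it has already stopped indexing", taskGroupId);
          return false;
        }
        // A notice for a group the supervisor never owned indicates a bug, so alert loudly.
        throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups [%s]", taskGroupId, taskGroups);
      }
      return true;
    }

Before this change, looking up an unknown or inactive group produced the NullPointerException named in the commit subject; with the guard, the unknown case surfaces as the ISE captured by the emitter instead.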
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
@@ -2106,7 +2319,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   private KafkaIndexTask createKafkaIndexTask(
       String id,
       String dataSource,
-      String sequenceName,
+      int taskGroupId,
       KafkaPartitions startPartitions,
       KafkaPartitions endPartitions,
       DateTime minimumMessageTime,
@@ -2119,7 +2332,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         getDataSchema(dataSource),
         tuningConfig,
         new KafkaIOConfig(
-            sequenceName,
+            taskGroupId,
+            "sequenceName-" + taskGroupId,
             startPartitions,
             endPartitions,
             ImmutableMap.of(),
@@ -2128,7 +2342,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
             maximumMessageTime,
             false
         ),
-        ImmutableMap.of(),
+        Collections.emptyMap(),
         null,
         null,
         rowIngestionMetersFactory
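The argument order above (the numeric taskGroupId first, then the derived "sequenceName-" + taskGroupId) matches the KafkaIOConfig hunks earlier in this patch, and the commit notes say the new field had to stay backward compatible with specs written before it existed. The usual Jackson pattern for that is a nullable boxed field; a sketch with the class name and fields invented purely for illustration:

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.google.common.base.Preconditions;
    import javax.annotation.Nullable;

    // Illustrative only: not the patch's KafkaIOConfig.
    public class ExampleIOConfig
    {
      private final Integer taskGroupId;      // nullable: pre-upgrade specs never wrote it
      private final String baseSequenceName;

      @JsonCreator
      public ExampleIOConfig(
          @JsonProperty("taskGroupId") @Nullable Integer taskGroupId,
          @JsonProperty("baseSequenceName") String baseSequenceName
      )
      {
        this.taskGroupId = taskGroupId;
        this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
      }

      @Nullable
      @JsonProperty
      public Integer getTaskGroupId()
      {
        return taskGroupId;
      }
    }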
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
index 8265f87c7b9..f1d11deb4ea 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
@@ -21,27 +21,28 @@ package io.druid.indexing.common.actions;

 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.DataSourceMetadata;

 public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
 {
   private final String supervisorId;
-  private final String sequenceName;
+  private final int taskGroupId;
   private final DataSourceMetadata previousCheckPoint;
   private final DataSourceMetadata currentCheckPoint;

   public CheckPointDataSourceMetadataAction(
       @JsonProperty("supervisorId") String supervisorId,
-      @JsonProperty("sequenceName") String sequenceName,
+      @JsonProperty("taskGroupId") Integer taskGroupId,
       @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint,
       @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
   )
   {
-    this.supervisorId = supervisorId;
-    this.sequenceName = sequenceName;
-    this.previousCheckPoint = previousCheckPoint;
-    this.currentCheckPoint = currentCheckPoint;
+    this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
+    this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
+    this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
+    this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
   }

   @JsonProperty
@@ -51,9 +52,9 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
   }

   @JsonProperty
-  public String getSequenceName()
+  public int getTaskGroupId()
   {
-    return sequenceName;
+    return taskGroupId;
   }

   @JsonProperty
@@ -81,8 +82,12 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
       Task task,
       TaskActionToolbox toolbox
   )
   {
-    return toolbox.getSupervisorManager()
-                  .checkPointDataSourceMetadata(supervisorId, sequenceName, previousCheckPoint, currentCheckPoint);
+    return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
+        supervisorId,
+        taskGroupId,
+        previousCheckPoint,
+        currentCheckPoint
+    );
   }

   @Override
@@ -96,7 +101,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
   {
     return "CheckPointDataSourceMetadataAction{" +
            "supervisorId='" + supervisorId + '\'' +
-           ", sequenceName='" + sequenceName + '\'' +
+           ", taskGroupId='" + taskGroupId + '\'' +
            ", previousCheckPoint=" + previousCheckPoint +
            ", currentCheckPoint=" + currentCheckPoint +
            '}';
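Note that while the stored field is a primitive int, the JSON constructor takes a boxed Integer: a request from a pre-upgrade task that lacks "taskGroupId" deserializes to null, and Preconditions.checkNotNull then fails with an explicit message instead of the opaque NullPointerException that auto-unboxing would throw. A minimal round-trip sketch (mapper choice assumed; a real mapper also needs the Kafka extension's Jackson modules registered for the metadata subtypes):

    // Sketch: serialize and re-read the action to confirm the new field survives.
    ObjectMapper mapper = new DefaultObjectMapper();
    CheckPointDataSourceMetadataAction action = new CheckPointDataSourceMetadataAction(
        "supervisor-1",
        0,
        new KafkaDataSourceMetadata(new KafkaPartitions("topic", ImmutableMap.of(0, 10L))),
        new KafkaDataSourceMetadata(new KafkaPartitions("topic", ImmutableMap.of(0, 20L)))
    );
    String json = mapper.writeValueAsString(action);
    CheckPointDataSourceMetadataAction roundTripped =
        mapper.readValue(json, CheckPointDataSourceMetadataAction.class);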
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index dcdd014c95c..f9a55644432 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -165,9 +165,9 @@ public class SupervisorManager

   public boolean checkPointDataSourceMetadata(
       String supervisorId,
-      @Nullable String sequenceName,
-      @Nullable DataSourceMetadata previousDataSourceMetadata,
-      @Nullable DataSourceMetadata currentDataSourceMetadata
+      int taskGroupId,
+      DataSourceMetadata previousDataSourceMetadata,
+      DataSourceMetadata currentDataSourceMetadata
   )
   {
     try {
@@ -178,7 +178,7 @@ public class SupervisorManager

       Preconditions.checkNotNull(supervisor, "supervisor could not be found");

-      supervisor.lhs.checkpoint(sequenceName, previousDataSourceMetadata, currentDataSourceMetadata);
+      supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata);
       return true;
     }
     catch (Exception e) {
diff --git a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
index 0020cf1c79f..661ed17d3b9 100644
--- a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
+++ b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
@@ -35,6 +35,10 @@ import java.io.StringWriter;
  */
 public class EmittingLogger extends Logger
 {
+  public static final String EXCEPTION_TYPE_KEY = "exceptionType";
+  public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage";
+  public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace";
+
   private static volatile ServiceEmitter emitter = null;

   private final String className;
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 26ed99cd543..0408104cde8 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -83,9 +83,9 @@ public class NoopSupervisorSpec implements SupervisorSpec

       @Override
       public void checkpoint(
-          @Nullable String sequenceName,
-          @Nullable DataSourceMetadata previousCheckPoint,
-          @Nullable DataSourceMetadata currentCheckPoint
+          int taskGroupId,
+          DataSourceMetadata previousCheckPoint,
+          DataSourceMetadata currentCheckPoint
       )
       {

diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index 5afe9122991..04afac7aea6 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -22,7 +22,6 @@ package io.druid.indexing.overlord.supervisor;

 import com.google.common.collect.ImmutableMap;
 import io.druid.indexing.overlord.DataSourceMetadata;

-import javax.annotation.Nullable;
 import java.util.Map;

 public interface Supervisor
@@ -52,13 +51,9 @@ public interface Supervisor
    * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data
    * represented by {@param currentCheckpoint} DataSourceMetadata
    *
-   * @param sequenceName unique Identifier to figure out for which sequence to do checkpointing
+   * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing
    * @param previousCheckPoint DataSourceMetadata checkpointed in previous call
    * @param currentCheckPoint current DataSourceMetadata to be checkpointed
    */
-  void checkpoint(
-      @Nullable String sequenceName,
-      @Nullable DataSourceMetadata previousCheckPoint,
-      @Nullable DataSourceMetadata currentCheckPoint
-  );
+  void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint);
 }
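The three constants added to EmittingLogger name the entries an alert event's "data" map carries for a logged exception, presumably matching the literal keys the alert path already writes; the test emitter introduced below reads them back by those names. A sketch of the producing side (payload shape assumed, and an emitter must have been registered via EmittingLogger.registerEmitter beforehand):

    import io.druid.java.util.emitter.EmittingLogger;

    public class AlertExample
    {
      private static final EmittingLogger log = new EmittingLogger(AlertExample.class);

      public static void run()
      {
        try {
          throw new IllegalStateException("boom");
        }
        catch (Exception e) {
          // The resulting alert event's "data" map is assumed to carry, roughly:
          //   exceptionType       -> e.getClass()
          //   exceptionMessage    -> e.getMessage()
          //   exceptionStackTrace -> the printed stack trace
          log.makeAlert(e, "Exception in run loop").emit();
        }
      }
    }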
diff --git a/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
new file mode 100644
index 00000000000..cc217c6f5b0
--- /dev/null
+++ b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.server.metrics;
+
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.core.Event;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class ExceptionCapturingServiceEmitter extends ServiceEmitter
+{
+  private volatile Class exceptionClass;
+  private volatile String exceptionMessage;
+  private volatile String stackTrace;
+
+  public ExceptionCapturingServiceEmitter()
+  {
+    super("", "", null);
+  }
+
+  @Override
+  public void emit(Event event)
+  {
+    //noinspection unchecked
+    final Map<String, Object> dataMap = (Map<String, Object>) event.toMap().get("data");
+    final Class exceptionClass = (Class) dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY);
+    if (exceptionClass != null) {
+      final String exceptionMessage = (String) dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY);
+      final String stackTrace = (String) dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY);
+      this.exceptionClass = exceptionClass;
+      this.exceptionMessage = exceptionMessage;
+      this.stackTrace = stackTrace;
+    }
+  }
+
+  @Nullable
+  public Class getExceptionClass()
+  {
+    return exceptionClass;
+  }
+
+  @Nullable
+  public String getExceptionMessage()
+  {
+    return exceptionMessage;
+  }
+
+  @Nullable
+  public String getStackTrace()
+  {
+    return stackTrace;
+  }
+}
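For context, the new supervisor tests can only assert on getExceptionClass() and friends if alerts raised through EmittingLogger are routed into this emitter; the test setup presumably wires it up along these lines (a sketch, field names assumed):

    // Test-setup sketch: capture alerted exceptions for later assertions.
    ExceptionCapturingServiceEmitter serviceEmitter = new ExceptionCapturingServiceEmitter();
    EmittingLogger.registerEmitter(serviceEmitter);
    serviceEmitter.start();

    // ... exercise the supervisor, then:
    Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());

Because emit() only records the latest event that carries an exceptionType entry, a test that expects no alert can simply assert all three getters return null, exactly as testCheckpointForInactiveTaskGroup does above.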