Fix Kafka Indexing task pause forever if no events in taskDuration (#5656) (#5899)

* Fix Kafka Indexing task pause forever (#5656)

* Fix NullPointerException in the overlord if taskGroups does not contain the groupId
* If the endOffset is the same as the startOffset, still let the task resume instead of returning
   endOffsets early, which causes the tasks to pause forever and ultimately fail on timeout

* Address PR comment

* Remove the null check and do not return null from generateSequenceName
Authored by Surekha on 2018-06-25 19:29:36 -07:00, committed by Gian Merlino
parent 7649742943
commit 0f429298cf
2 changed files with 83 additions and 3 deletions
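
Why the early return mattered, in miniature: a paused Kafka indexing task blocks until its supervisor sets end offsets and resumes it. The sketch below is illustrative only (the class and method names are ours, not Druid's); it models that handshake under a simplified assumption and shows why a supervisor that bails out early leaves the task waiting until the completion timeout kills it.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

// Illustrative only: a minimal model (not Druid's actual classes) of the
// pause/resume handshake between a Kafka indexing task and its supervisor.
class PausedTaskSketch
{
  private final Map<Integer, Long> endOffsets = new HashMap<>();
  private boolean resumeRequested = false;

  // Task side: after pausing, the task blocks here until the supervisor sets
  // end offsets and resumes it. If the supervisor returns early and never
  // calls setEndOffsetsAndResume, this wait only ends at the timeout.
  synchronized void awaitResume(long timeoutMillis) throws InterruptedException, TimeoutException
  {
    final long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!resumeRequested) {
      final long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0) {
        throw new TimeoutException("paused task timed out"); // task ultimately fails
      }
      wait(remaining);
    }
  }

  // Supervisor side: the code path the fix guarantees is always reached, even
  // when the new end offsets equal the latest checkpointed start offsets.
  synchronized void setEndOffsetsAndResume(Map<Integer, Long> offsets)
  {
    endOffsets.putAll(offsets);
    resumeRequested = true;
    notifyAll();
  }
}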

extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java

@@ -43,8 +43,8 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.indexer.TaskLocation;
-import io.druid.indexing.common.TaskInfoProvider;
 import io.druid.indexer.TaskStatus;
+import io.druid.indexing.common.TaskInfoProvider;
 import io.druid.indexing.common.stats.RowIngestionMetersFactory;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.common.task.TaskResource;
@@ -1524,12 +1524,11 @@ public class KafkaSupervisor implements Supervisor
     if (endOffsets.equals(taskGroup.sequenceOffsets.lastEntry().getValue())) {
       log.warn(
-          "Not adding checkpoint [%s] as its same as the start offsets [%s] of latest sequence for the task group [%d]",
+          "Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]",
           endOffsets,
           taskGroup.sequenceOffsets.lastEntry().getValue(),
           groupId
       );
-      return endOffsets;
     }
     log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);

extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java

@@ -1905,6 +1905,87 @@ public class KafkaSupervisorTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testNoDataIngestionTasks() throws Exception
+  {
+    final DateTime startTime = DateTimes.nowUtc();
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    //not adding any events
+    Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        "sequenceName-0",
+        new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        "sequenceName-0",
+        new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        "sequenceName-0",
+        new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
+
+    TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    reset(taskQueue, indexerMetadataStorageCoordinator);
+    expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
+    taskQueue.shutdown("id1");
+    taskQueue.shutdown("id2");
+    taskQueue.shutdown("id3");
+    replay(taskQueue, indexerMetadataStorageCoordinator);
+
+    supervisor.resetInternal(null);
+    verifyAll();
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
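
The new test pins down exactly the no-data case: each task reports a single checkpoint whose start offsets are also what the supervisor would propose as end offsets after the one-second taskDuration with nothing ingested. A standalone illustration of that equality, using the same offset values as the test (the class name is ours, for illustration only):

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.TreeMap;

public class NoDataCheckpointIllustration
{
  public static void main(String[] args)
  {
    // Checkpoints as a task reports them: sequence id -> (partition -> offset).
    TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
    checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));

    // With no events ingested, the supervisor's proposed end offsets are
    // identical to the latest sequence's start offsets...
    Map<Integer, Long> proposedEndOffsets = ImmutableMap.of(0, 10L, 1, 20L, 2, 30L);

    // ...so this equality holds; before the fix it triggered the early return
    // that left the paused tasks waiting forever.
    System.out.println(proposedEndOffsets.equals(checkpoints.lastEntry().getValue())); // true
  }
}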