fix worker thread pool exhaustion bug (#3760)

* fix worker thread pool exhaustion bug

* code review changes

* code review changes
This commit is contained in:
David Lim 2016-12-09 16:23:11 -07:00 committed by Fangjin Yang
parent 7f087cdd3b
commit 0b9dff0bc1
3 changed files with 55 additions and 45 deletions

View File

@ -76,6 +76,9 @@ public class KafkaIndexTaskClient
} }
} }
public static final int MAX_RETRY_WAIT_SECONDS = 10;
private static final int MIN_RETRY_WAIT_SECONDS = 2;
private static final EmittingLogger log = new EmittingLogger(KafkaIndexTaskClient.class); private static final EmittingLogger log = new EmittingLogger(KafkaIndexTaskClient.class);
private static final String BASE_PATH = "/druid/worker/v1/chat"; private static final String BASE_PATH = "/druid/worker/v1/chat";
private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5; private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
@ -437,8 +440,8 @@ public class KafkaIndexTaskClient
// the middle of persisting to disk and doesn't respond immediately. // the middle of persisting to disk and doesn't respond immediately.
return new RetryPolicyFactory( return new RetryPolicyFactory(
new RetryPolicyConfig() new RetryPolicyConfig()
.setMinWait(Period.seconds(2)) .setMinWait(Period.seconds(MIN_RETRY_WAIT_SECONDS))
.setMaxWait(Period.seconds(10)) .setMaxWait(Period.seconds(MAX_RETRY_WAIT_SECONDS))
.setMaxRetryCount(numRetries) .setMaxRetryCount(numRetries)
); );
} }

View File

@ -87,6 +87,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** /**
* Supervisor responsible for managing the KafkaIndexTasks for a single dataSource. At a high level, the class accepts a * Supervisor responsible for managing the KafkaIndexTasks for a single dataSource. At a high level, the class accepts a
@ -102,6 +103,7 @@ public class KafkaSupervisor implements Supervisor
private static final Random RANDOM = new Random(); private static final Random RANDOM = new Random();
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000; // prevent us from running too often in response to events private static final long MAX_RUN_FREQUENCY_MILLIS = 1000; // prevent us from running too often in response to events
private static final long NOT_SET = -1; private static final long NOT_SET = -1;
private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
// Internal data structures // Internal data structures
// -------------------------------------------------------- // --------------------------------------------------------
@ -174,6 +176,7 @@ public class KafkaSupervisor implements Supervisor
private final KafkaTuningConfig taskTuningConfig; private final KafkaTuningConfig taskTuningConfig;
private final String supervisorId; private final String supervisorId;
private final TaskInfoProvider taskInfoProvider; private final TaskInfoProvider taskInfoProvider;
private final long futureTimeoutInSeconds; // how long to wait for async operations to complete
private final ExecutorService exec; private final ExecutorService exec;
private final ScheduledExecutorService scheduledExec; private final ScheduledExecutorService scheduledExec;
@ -255,6 +258,12 @@ public class KafkaSupervisor implements Supervisor
} }
}; };
this.futureTimeoutInSeconds = Math.max(
MINIMUM_FUTURE_TIMEOUT_IN_SECONDS,
tuningConfig.getChatRetries() * (tuningConfig.getHttpTimeout().getStandardSeconds()
+ KafkaIndexTaskClient.MAX_RETRY_WAIT_SECONDS)
);
int chatThreads = (this.tuningConfig.getChatThreads() != null int chatThreads = (this.tuningConfig.getChatThreads() != null
? this.tuningConfig.getChatThreads() ? this.tuningConfig.getChatThreads()
: Math.min(10, this.ioConfig.getTaskCount() * this.ioConfig.getReplicas())); : Math.min(10, this.ioConfig.getTaskCount() * this.ioConfig.getReplicas()));
@ -401,7 +410,8 @@ public class KafkaSupervisor implements Supervisor
} }
@Override @Override
public void reset() { public void reset()
{
log.info("Posting ResetNotice"); log.info("Posting ResetNotice");
notices.add(new ResetNotice()); notices.add(new ResetNotice());
} }
@ -445,13 +455,13 @@ public class KafkaSupervisor implements Supervisor
private interface Notice private interface Notice
{ {
void handle() throws ExecutionException, InterruptedException; void handle() throws ExecutionException, InterruptedException, TimeoutException;
} }
private class RunNotice implements Notice private class RunNotice implements Notice
{ {
@Override @Override
public void handle() throws ExecutionException, InterruptedException public void handle() throws ExecutionException, InterruptedException, TimeoutException
{ {
long nowTime = System.currentTimeMillis(); long nowTime = System.currentTimeMillis();
if (nowTime - lastRunTime < MAX_RUN_FREQUENCY_MILLIS) { if (nowTime - lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {
@ -466,7 +476,7 @@ public class KafkaSupervisor implements Supervisor
private class GracefulShutdownNotice extends ShutdownNotice private class GracefulShutdownNotice extends ShutdownNotice
{ {
@Override @Override
public void handle() throws InterruptedException, ExecutionException public void handle() throws InterruptedException, ExecutionException, TimeoutException
{ {
gracefulShutdownInternal(); gracefulShutdownInternal();
super.handle(); super.handle();
@ -476,7 +486,7 @@ public class KafkaSupervisor implements Supervisor
private class ShutdownNotice implements Notice private class ShutdownNotice implements Notice
{ {
@Override @Override
public void handle() throws InterruptedException, ExecutionException public void handle() throws InterruptedException, ExecutionException, TimeoutException
{ {
consumer.close(); consumer.close();
@ -515,7 +525,7 @@ public class KafkaSupervisor implements Supervisor
} }
@VisibleForTesting @VisibleForTesting
void gracefulShutdownInternal() throws ExecutionException, InterruptedException void gracefulShutdownInternal() throws ExecutionException, InterruptedException, TimeoutException
{ {
// Prepare for shutdown by 1) killing all tasks that haven't been assigned to a worker yet, and 2) causing all // Prepare for shutdown by 1) killing all tasks that haven't been assigned to a worker yet, and 2) causing all
// running tasks to begin publishing by setting their startTime to a very long time ago so that the logic in // running tasks to begin publishing by setting their startTime to a very long time ago so that the logic in
@ -535,7 +545,7 @@ public class KafkaSupervisor implements Supervisor
} }
@VisibleForTesting @VisibleForTesting
void runInternal() throws ExecutionException, InterruptedException void runInternal() throws ExecutionException, InterruptedException, TimeoutException
{ {
possiblyRegisterListener(); possiblyRegisterListener();
updatePartitionDataFromKafka(); updatePartitionDataFromKafka();
@ -659,7 +669,7 @@ public class KafkaSupervisor implements Supervisor
} }
} }
private void discoverTasks() throws ExecutionException, InterruptedException private void discoverTasks() throws ExecutionException, InterruptedException, TimeoutException
{ {
int taskCount = 0; int taskCount = 0;
List<String> futureTaskIds = Lists.newArrayList(); List<String> futureTaskIds = Lists.newArrayList();
@ -740,9 +750,9 @@ public class KafkaSupervisor implements Supervisor
taskId taskId
); );
try { try {
stopTask(taskId, false).get(); stopTask(taskId, false).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
} }
catch (InterruptedException | ExecutionException e) { catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn(e, "Exception while stopping task"); log.warn(e, "Exception while stopping task");
} }
return false; return false;
@ -768,9 +778,9 @@ public class KafkaSupervisor implements Supervisor
taskId taskId
); );
try { try {
stopTask(taskId, false).get(); stopTask(taskId, false).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
} }
catch (InterruptedException | ExecutionException e) { catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn(e, "Exception while stopping task"); log.warn(e, "Exception while stopping task");
} }
return false; return false;
@ -787,7 +797,7 @@ public class KafkaSupervisor implements Supervisor
} }
} }
List<Boolean> results = Futures.successfulAsList(futures).get(); List<Boolean> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
for (int i = 0; i < results.size(); i++) { for (int i = 0; i < results.size(); i++) {
if (results.get(i) == null) { if (results.get(i) == null) {
String taskId = futureTaskIds.get(i); String taskId = futureTaskIds.get(i);
@ -828,7 +838,7 @@ public class KafkaSupervisor implements Supervisor
taskGroupList.add(newTaskGroup); taskGroupList.add(newTaskGroup);
} }
private void updateTaskStatus() throws ExecutionException, InterruptedException private void updateTaskStatus() throws ExecutionException, InterruptedException, TimeoutException
{ {
final List<ListenableFuture<Boolean>> futures = Lists.newArrayList(); final List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
final List<String> futureTaskIds = Lists.newArrayList(); final List<String> futureTaskIds = Lists.newArrayList();
@ -884,7 +894,7 @@ public class KafkaSupervisor implements Supervisor
} }
} }
List<Boolean> results = Futures.successfulAsList(futures).get(); List<Boolean> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
for (int i = 0; i < results.size(); i++) { for (int i = 0; i < results.size(); i++) {
// false means the task hasn't started running yet and that's okay; null means it should be running but the HTTP // false means the task hasn't started running yet and that's okay; null means it should be running but the HTTP
// request threw an exception so kill the task // request threw an exception so kill the task
@ -896,7 +906,7 @@ public class KafkaSupervisor implements Supervisor
} }
} }
private void checkTaskDuration() throws InterruptedException, ExecutionException private void checkTaskDuration() throws InterruptedException, ExecutionException, TimeoutException
{ {
final List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList(); final List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
final List<Integer> futureGroupIds = Lists.newArrayList(); final List<Integer> futureGroupIds = Lists.newArrayList();
@ -921,7 +931,7 @@ public class KafkaSupervisor implements Supervisor
} }
} }
List<Map<Integer, Long>> results = Futures.successfulAsList(futures).get(); List<Map<Integer, Long>> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
for (int j = 0; j < results.size(); j++) { for (int j = 0; j < results.size(); j++) {
Integer groupId = futureGroupIds.get(j); Integer groupId = futureGroupIds.get(j);
TaskGroup group = taskGroups.get(groupId); TaskGroup group = taskGroups.get(groupId);
@ -970,15 +980,15 @@ public class KafkaSupervisor implements Supervisor
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
// failed and we need to re-ingest) // failed and we need to re-ingest)
return Futures.transform( return Futures.transform(
stopTasksInGroup(taskGroup), new Function<Void, Map<Integer, Long>>() stopTasksInGroup(taskGroup), new Function<Object, Map<Integer, Long>>()
{ {
@Nullable @Nullable
@Override @Override
public Map<Integer, Long> apply(@Nullable Void input) public Map<Integer, Long> apply(@Nullable Object input)
{ {
return null; return null;
} }
}, workerExec }
); );
} }
@ -1042,7 +1052,8 @@ public class KafkaSupervisor implements Supervisor
} }
try { try {
List<Boolean> results = Futures.successfulAsList(setEndOffsetFutures).get(); List<Boolean> results = Futures.successfulAsList(setEndOffsetFutures)
.get(futureTimeoutInSeconds, TimeUnit.SECONDS);
for (int i = 0; i < results.size(); i++) { for (int i = 0; i < results.size(); i++) {
if (results.get(i) == null || !results.get(i)) { if (results.get(i) == null || !results.get(i)) {
String taskId = setEndOffsetTaskIds.get(i); String taskId = setEndOffsetTaskIds.get(i);
@ -1075,9 +1086,9 @@ public class KafkaSupervisor implements Supervisor
* starting offset for subsequent task groups is no longer valid, and subsequent tasks would fail as soon as they * starting offset for subsequent task groups is no longer valid, and subsequent tasks would fail as soon as they
* attempted to publish because of the contiguous range consistency check. * attempted to publish because of the contiguous range consistency check.
*/ */
private void checkPendingCompletionTasks() throws ExecutionException, InterruptedException private void checkPendingCompletionTasks() throws ExecutionException, InterruptedException, TimeoutException
{ {
List<ListenableFuture<Void>> futures = Lists.newArrayList(); List<ListenableFuture<?>> futures = Lists.newArrayList();
for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : pendingCompletionTaskGroups.entrySet()) { for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : pendingCompletionTaskGroups.entrySet()) {
@ -1151,12 +1162,13 @@ public class KafkaSupervisor implements Supervisor
taskGroupList.removeAll(toRemove); taskGroupList.removeAll(toRemove);
} }
Futures.successfulAsList(futures).get(); // wait for all task shutdowns to complete before returning // wait for all task shutdowns to complete before returning
Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
} }
private void checkCurrentTaskState() throws ExecutionException, InterruptedException private void checkCurrentTaskState() throws ExecutionException, InterruptedException, TimeoutException
{ {
List<ListenableFuture<Void>> futures = Lists.newArrayList(); List<ListenableFuture<?>> futures = Lists.newArrayList();
Iterator<Map.Entry<Integer, TaskGroup>> iTaskGroups = taskGroups.entrySet().iterator(); Iterator<Map.Entry<Integer, TaskGroup>> iTaskGroups = taskGroups.entrySet().iterator();
while (iTaskGroups.hasNext()) { while (iTaskGroups.hasNext()) {
Map.Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next(); Map.Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
@ -1202,7 +1214,8 @@ public class KafkaSupervisor implements Supervisor
log.debug("Task group [%d] post-pruning: %s", groupId, taskGroup.tasks.keySet()); log.debug("Task group [%d] post-pruning: %s", groupId, taskGroup.tasks.keySet());
} }
Futures.successfulAsList(futures).get(); // wait for all task shutdowns to complete before returning // wait for all task shutdowns to complete before returning
Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
} }
void createNewTasks() void createNewTasks()
@ -1400,7 +1413,7 @@ public class KafkaSupervisor implements Supervisor
return generateSequenceName(taskGroupId).equals(taskSequenceName); return generateSequenceName(taskGroupId).equals(taskSequenceName);
} }
private ListenableFuture<Void> stopTasksInGroup(TaskGroup taskGroup) private ListenableFuture<?> stopTasksInGroup(TaskGroup taskGroup)
{ {
if (taskGroup == null) { if (taskGroup == null) {
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
@ -1413,17 +1426,7 @@ public class KafkaSupervisor implements Supervisor
} }
} }
return Futures.transform( return Futures.successfulAsList(futures);
Futures.successfulAsList(futures), new Function<List<Void>, Void>()
{
@Nullable
@Override
public Void apply(@Nullable List<Void> input)
{
return null;
}
}, workerExec
);
} }
private ListenableFuture<Void> stopTask(final String id, final boolean publish) private ListenableFuture<Void> stopTask(final String id, final boolean publish)
@ -1441,7 +1444,7 @@ public class KafkaSupervisor implements Supervisor
} }
return null; return null;
} }
}, workerExec }
); );
} }
@ -1549,7 +1552,8 @@ public class KafkaSupervisor implements Supervisor
} }
} }
List<Map<Integer, Long>> results = Futures.successfulAsList(futures).get(); List<Map<Integer, Long>> results = Futures.successfulAsList(futures)
.get(futureTimeoutInSeconds, TimeUnit.SECONDS);
for (int i = 0; i < taskReports.size(); i++) { for (int i = 0; i < taskReports.size(); i++) {
TaskReportData reportData = taskReports.get(i); TaskReportData reportData = taskReports.get(i);
if (includeOffsets) { if (includeOffsets) {

View File

@ -513,8 +513,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
null null
) )
).anyTimes(); ).anyTimes();
expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true));
expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(false));
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
taskQueue.shutdown("id1");
taskQueue.shutdown("id3"); taskQueue.shutdown("id3");
expect(taskQueue.add(anyObject(Task.class))).andReturn(true); expect(taskQueue.add(anyObject(Task.class))).andReturn(true);
@ -596,8 +597,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
null null
) )
).anyTimes(); ).anyTimes();
expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(true));
expect(taskClient.stopAsync("id4", false)).andReturn(Futures.immediateFuture(false));
expect(taskClient.stopAsync("id5", false)).andReturn(Futures.immediateFuture((Boolean) null));
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
taskQueue.shutdown("id3");
taskQueue.shutdown("id4"); taskQueue.shutdown("id4");
taskQueue.shutdown("id5"); taskQueue.shutdown("id5");
replayAll(); replayAll();