Fix handling of InterruptedException in WorkerTaskMonitor's mainLoop.

I believe this will fix #2664.
This commit is contained in:
Gian Merlino 2016-03-25 12:17:33 -07:00
parent ef0bc5d64f
commit ee4bb96855
2 changed files with 42 additions and 49 deletions

View File

@ -20,9 +20,7 @@
package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
@ -37,9 +35,9 @@ import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.server.initialization.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
@ -167,24 +165,23 @@ public class WorkerCuratorCoordinator
return worker;
}
public void removeTaskRunZnode(String taskId)
public void removeTaskRunZnode(String taskId) throws Exception
{
try {
curatorFramework.delete().guaranteed().forPath(getTaskPathForId(taskId));
}
catch (Exception e) {
catch (KeeperException e) {
log.warn(e, "Could not delete task path for task[%s]", taskId);
}
}
public void updateTaskStatusAnnouncement(TaskAnnouncement announcement)
public void updateTaskStatusAnnouncement(TaskAnnouncement announcement) throws Exception
{
synchronized (lock) {
if (!started) {
return;
}
try {
CuratorUtils.createOrSet(
curatorFramework,
getStatusPathForId(announcement.getTaskStatus().getId()),
@ -193,38 +190,22 @@ public class WorkerCuratorCoordinator
config.getMaxZnodeBytes()
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
public List<TaskAnnouncement> getAnnouncements()
public List<TaskAnnouncement> getAnnouncements() throws Exception
{
try {
return Lists.transform(
curatorFramework.getChildren().forPath(getStatusPathForWorker()), new Function<String, TaskAnnouncement>()
{
@Nullable
@Override
public TaskAnnouncement apply(String input)
{
try {
return jsonMapper.readValue(
curatorFramework.getData().forPath(getStatusPathForId(input)),
final List<TaskAnnouncement> announcements = Lists.newArrayList();
for (String id : curatorFramework.getChildren().forPath(getStatusPathForWorker())) {
announcements.add(
jsonMapper.readValue(
curatorFramework.getData().forPath(getStatusPathForId(id)),
TaskAnnouncement.class
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return announcements;
}
public void updateWorkerAnnouncement(Worker newWorker) throws Exception

View File

@ -21,7 +21,6 @@ package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -98,7 +97,7 @@ public class WorkerTaskMonitor
* started the task. When the task is complete, the worker node updates the status.
*/
@LifecycleStart
public void start()
public void start() throws Exception
{
synchronized (lifecycleLock) {
Preconditions.checkState(!started, "already started");
@ -125,10 +124,13 @@ public class WorkerTaskMonitor
log.info("Started WorkerTaskMonitor.");
started = true;
}
catch (InterruptedException e) {
throw e;
}
catch (Exception e) {
log.makeAlert(e, "Exception starting WorkerTaskMonitor")
.emit();
throw Throwables.propagate(e);
throw e;
}
}
}
@ -142,6 +144,10 @@ public class WorkerTaskMonitor
try {
notice.handle();
}
catch (InterruptedException e) {
// Will be caught and logged in the outer try block
throw e;
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle notice")
.addData("noticeClass", notice.getClass().getSimpleName())
@ -166,7 +172,7 @@ public class WorkerTaskMonitor
}
}
private void cleanupStaleAnnouncements()
private void cleanupStaleAnnouncements() throws Exception
{
// cleanup any old running task announcements which are invalid after restart
for (TaskAnnouncement announcement : workerCuratorCoordinator.getAnnouncements()) {
@ -292,7 +298,7 @@ public class WorkerTaskMonitor
{
String getTaskId();
void handle();
void handle() throws Exception;
}
private class RunNotice implements Notice
@ -311,7 +317,7 @@ public class WorkerTaskMonitor
}
@Override
public void handle()
public void handle() throws Exception
{
if (running.containsKey(task.getId())) {
log.warn(
@ -357,7 +363,7 @@ public class WorkerTaskMonitor
}
@Override
public void handle()
public void handle() throws Exception
{
final TaskDetails details = running.get(task.getId());
@ -391,6 +397,9 @@ public class WorkerTaskMonitor
status.getStatusCode()
);
}
catch (InterruptedException e) {
throw e;
}
catch (Exception e) {
log.makeAlert(e, "Failed to update task announcement")
.addData("task", task.getId())
@ -420,7 +429,7 @@ public class WorkerTaskMonitor
}
@Override
public void handle()
public void handle() throws InterruptedException
{
final TaskDetails details = running.get(taskId);
@ -442,6 +451,9 @@ public class WorkerTaskMonitor
)
);
}
catch (InterruptedException e) {
throw e;
}
catch (Exception e) {
log.makeAlert(e, "Failed to update task announcement")
.addData("task", taskId)