From 44a2b204df6f976ec1740e6c89932c88f6206326 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Mon, 26 Oct 2015 11:49:20 -0700 Subject: [PATCH] Add timeout to shutdown request to middle manager for indexing service --- .../content/configuration/indexing-service.md | 1 + .../indexing/overlord/RemoteTaskRunner.java | 24 +++++++++++++------ .../config/RemoteTaskRunnerConfig.java | 8 +++++++ .../overlord/TestRemoteTaskRunnerConfig.java | 6 +++++ pom.xml | 2 +- 5 files changed, 33 insertions(+), 8 deletions(-) diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index 8f9ee327f0a..68d6612f41d 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -81,6 +81,7 @@ The following configs only apply if the overlord is running in remote mode: |`druid.indexer.runner.compressZnodes`|Indicates whether or not the overlord should expect middle managers to compress Znodes.|true| |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288| |`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a middle manager is disconnected from Zookeeper.|PT15M| +|`druid.indexer.runner.taskShutdownLinkTimeout`|How long to wait on a shutdown request to a middle manager before timing out.|PT1M| There are additional configs for autoscaling (if it is enabled): diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index a54d9abb8cf..008729d8969 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -37,6 +37,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import 
com.google.common.util.concurrent.SettableFuture; import com.metamx.common.ISE; +import com.metamx.common.RE; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; @@ -66,12 +67,12 @@ import org.apache.zookeeper.KeeperException; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; +import org.joda.time.Duration; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; -import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -110,6 +111,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer private final ObjectMapper jsonMapper; private final RemoteTaskRunnerConfig config; + private final Duration shutdownTimeout; private final IndexerZkConfig indexerZkConfig; private final CuratorFramework cf; private final PathChildrenCacheFactory pathChildrenCacheFactory; @@ -155,6 +157,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer { this.jsonMapper = jsonMapper; this.config = config; + this.shutdownTimeout = config.getTaskShutdownLinkTimeout().toStandardDuration(); // Fail fast this.indexerZkConfig = indexerZkConfig; this.cf = cf; this.pathChildrenCacheFactory = pathChildrenCacheFactory; @@ -252,7 +255,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer List workers; try { workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath()); - } catch (KeeperException.NoNodeException e) { + } + catch (KeeperException.NoNodeException e) { // statusPath doesn't exist yet; can occur if no middleManagers have started. workers = ImmutableList.of(); } @@ -386,12 +390,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer log.info("Can't shutdown! 
No worker running task %s", taskId); return; } - + URL url = null; try { - final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId)); + url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId)); final StatusResponseHolder response = httpClient.go( new Request(HttpMethod.POST, url), - RESPONSE_HANDLER + RESPONSE_HANDLER, + shutdownTimeout ).get(); log.info( @@ -405,8 +410,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer log.error("Shutdown failed for %s! Are you sure the task was running?", taskId); } } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RE(e, "Interrupted posting shutdown to [%s] for task [%s]", url, taskId); + } catch (Exception e) { - throw Throwables.propagate(e); + throw new RE(e, "Error in handling post to [%s] for task [%s]", zkWorker.getWorker().getHost(), taskId); } } } @@ -911,7 +920,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer catch (Exception e) { log.makeAlert("Exception while cleaning up worker[%s]", worker).emit(); throw Throwables.propagate(e); - } finally { + } + finally { removedWorkerCleanups.remove(worker); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java index 6291cec1213..5f6f34edb5e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java @@ -42,6 +42,9 @@ public class RemoteTaskRunnerConfig @Min(10 * 1024) private long maxZnodeBytes = 512 * 1024; + @JsonProperty + private Period taskShutdownLinkTimeout = new Period("PT1M"); + public Period getTaskAssignmentTimeout() { return taskAssignmentTimeout; @@ -61,4 +64,9 @@ public class RemoteTaskRunnerConfig { return maxZnodeBytes; } 
+ + public Period getTaskShutdownLinkTimeout() + { + return taskShutdownLinkTimeout; + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TestRemoteTaskRunnerConfig.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TestRemoteTaskRunnerConfig.java index 24bd272b717..b94fc5906f9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TestRemoteTaskRunnerConfig.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TestRemoteTaskRunnerConfig.java @@ -50,6 +50,12 @@ public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig return 10 * 1024; } + @Override + public Period getTaskShutdownLinkTimeout() + { + return timeout; + } + @Override public String getMinWorkerVersion() { diff --git a/pom.xml b/pom.xml index 816be54d14c..f21bbf34095 100644 --- a/pom.xml +++ b/pom.xml @@ -127,7 +127,7 @@ com.metamx http-client - 1.0.2 + 1.0.4 com.metamx