mirror of https://github.com/apache/druid.git
Merge pull request #1862 from metamx/indexingServiceMMGone
Add timeout to shutdown request to middle manager for indexing service
This commit is contained in:
commit
7df7370935
|
@ -81,6 +81,7 @@ The following configs only apply if the overlord is running in remote mode:
|
|||
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the overlord should expect middle managers to compress Znodes.|true|
|
||||
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|
||||
|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a middle manager is disconnected from Zookeeper.|PT15M|
|
||||
|`druid.indexer.runner.taskShutdownLinkTimeout`|How long to wait on a shutdown request to a middle manager before timing out|PT1M|
|
||||
|
||||
There are additional configs for autoscaling (if it is enabled):
|
||||
|
||||
|
|
|
@ -37,6 +37,7 @@ import com.google.common.util.concurrent.Futures;
|
|||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.RE;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
|
@ -66,12 +67,12 @@ import org.apache.zookeeper.KeeperException;
|
|||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -110,6 +111,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final RemoteTaskRunnerConfig config;
|
||||
private final Duration shutdownTimeout;
|
||||
private final IndexerZkConfig indexerZkConfig;
|
||||
private final CuratorFramework cf;
|
||||
private final PathChildrenCacheFactory pathChildrenCacheFactory;
|
||||
|
@ -155,6 +157,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.config = config;
|
||||
this.shutdownTimeout = config.getTaskShutdownLinkTimeout().toStandardDuration(); // Fail fast
|
||||
this.indexerZkConfig = indexerZkConfig;
|
||||
this.cf = cf;
|
||||
this.pathChildrenCacheFactory = pathChildrenCacheFactory;
|
||||
|
@ -252,7 +255,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
List<String> workers;
|
||||
try {
|
||||
workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
}
|
||||
catch (KeeperException.NoNodeException e) {
|
||||
// statusPath doesn't exist yet; can occur if no middleManagers have started.
|
||||
workers = ImmutableList.of();
|
||||
}
|
||||
|
@ -386,12 +390,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
log.info("Can't shutdown! No worker running task %s", taskId);
|
||||
return;
|
||||
}
|
||||
|
||||
URL url = null;
|
||||
try {
|
||||
final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId));
|
||||
url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId));
|
||||
final StatusResponseHolder response = httpClient.go(
|
||||
new Request(HttpMethod.POST, url),
|
||||
RESPONSE_HANDLER
|
||||
RESPONSE_HANDLER,
|
||||
shutdownTimeout
|
||||
).get();
|
||||
|
||||
log.info(
|
||||
|
@ -405,8 +410,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
log.error("Shutdown failed for %s! Are you sure the task was running?", taskId);
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RE(e, "Interrupted posting shutdown to [%s] for task [%s]", url, taskId);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
throw new RE(e, "Error in handling post to [%s] for task [%s]", zkWorker.getWorker().getHost(), taskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -911,7 +920,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
catch (Exception e) {
|
||||
log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
|
||||
throw Throwables.propagate(e);
|
||||
} finally {
|
||||
}
|
||||
finally {
|
||||
removedWorkerCleanups.remove(worker);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,6 +42,9 @@ public class RemoteTaskRunnerConfig
|
|||
@Min(10 * 1024)
|
||||
private long maxZnodeBytes = 512 * 1024;
|
||||
|
||||
@JsonProperty
|
||||
private Period taskShutdownLinkTimeout = new Period("PT1M");
|
||||
|
||||
public Period getTaskAssignmentTimeout()
|
||||
{
|
||||
return taskAssignmentTimeout;
|
||||
|
@ -61,4 +64,9 @@ public class RemoteTaskRunnerConfig
|
|||
{
|
||||
return maxZnodeBytes;
|
||||
}
|
||||
|
||||
public Period getTaskShutdownLinkTimeout()
|
||||
{
|
||||
return taskShutdownLinkTimeout;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,12 @@ public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
|
|||
return 10 * 1024;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Period getTaskShutdownLinkTimeout()
|
||||
{
|
||||
return timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMinWorkerVersion()
|
||||
{
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -126,7 +126,7 @@
|
|||
<dependency>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>http-client</artifactId>
|
||||
<version>1.0.2</version>
|
||||
<version>1.0.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.metamx</groupId>
|
||||
|
|
|
@ -158,7 +158,7 @@ public class CoordinatorRuleManager
|
|||
).get();
|
||||
|
||||
if (response.getStatus().equals(HttpResponseStatus.FOUND)) {
|
||||
url = response.getResponse().getHeader("Location");
|
||||
url = response.getResponse().headers().get("Location");
|
||||
log.info("Redirecting rule request to [%s]", url);
|
||||
response = httpClient.go(
|
||||
new Request(
|
||||
|
|
Loading…
Reference in New Issue