mirror of https://github.com/apache/druid.git
Add graceful shutdown timeout for Jetty (#5429)
* Add graceful shutdown timeout * Handle interruptedException * Incorporate code review comments * Address code review comments * Poll for activeConnections to be zero * Use statistics handler to get active requests * Use native jetty shutdown gracefully * Move log line back to where it was * Add unannounce wait time * Make the default retain prior behavior * Update docs with new config defaults * Make duration handling on jetty shutdown more consistent * StatisticsHandler is a wrapper * Move jetty lifecycle error logging to error
This commit is contained in:
parent
0851f2206c
commit
ef21ce5a64
|
@ -41,6 +41,8 @@ Druid uses Jetty to serve HTTP requests.
|
|||
|`druid.server.http.enableRequestLimit`|If enabled, no requests would be queued in jetty queue and "HTTP 429 Too Many Requests" error response would be sent. |false|
|
||||
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|
||||
|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This is an advance configuration that allows to protect in case broker is under heavy load and not utilizing the data gathered in memory fast enough and leading to OOMs. This limit can be further reduced at query time using `maxScatterGatherBytes` in the context. Note that having large limit is not necessarily bad if broker is never under heavy concurrent load in which case data gathered is processed quickly and freeing up the memory used.|Long.MAX_VALUE|
|
||||
|`druid.server.http.gracefulShutdownTimeout`|The maximum amount of time Jetty waits after receiving shutdown signal. After this timeout the threads will be forcefully shutdown. This allows any queries that are executing to complete.|`PT0s` (do not wait)|
|
||||
|`druid.server.http.unannouncePropogationDelay`|How long to wait for zookeeper unannouncements to propgate before shutting down Jetty. This is a minimum and `druid.server.http.gracefulShutdownTimeout` does not start counting down until after this period elapses.|`PT0s` (do not wait)|
|
||||
|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
|
||||
|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
|
||||
|`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M|
|
||||
|
|
|
@ -54,6 +54,8 @@ Druid uses Jetty to serve HTTP requests.
|
|||
|`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
|
||||
|`druid.server.http.enableRequestLimit`|If enabled, no requests would be queued in jetty queue and "HTTP 429 Too Many Requests" error response would be sent. |false|
|
||||
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|
||||
|`druid.server.http.gracefulShutdownTimeout`|The maximum amount of time Jetty waits after receiving shutdown signal. After this timeout the threads will be forcefully shutdown. This allows any queries that are executing to complete.|`PT0s` (do not wait)|
|
||||
|`druid.server.http.unannouncePropogationDelay`|How long to wait for zookeeper unannouncements to propgate before shutting down Jetty. This is a minimum and `druid.server.http.gracefulShutdownTimeout` does not start counting down until after this period elapses.|`PT0s` (do not wait)|
|
||||
|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](query-context.html) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE|
|
||||
|`druid.server.http.maxRequestHeaderSize`|Maximum size of a request header in bytes. Larger headers consume more memory and can make a server more vulnerable to denial of service attacks.|8 * 1024|
|
||||
|
||||
|
|
|
@ -60,6 +60,14 @@ public class ServerConfig
|
|||
@JsonProperty
|
||||
private int maxRequestHeaderSize = 8 * 1024;
|
||||
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
private Period gracefulShutdownTimeout = Period.ZERO;
|
||||
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
private Period unannouncePropogationDelay = Period.ZERO;
|
||||
|
||||
public int getNumThreads()
|
||||
{
|
||||
return numThreads;
|
||||
|
@ -100,6 +108,16 @@ public class ServerConfig
|
|||
return maxRequestHeaderSize;
|
||||
}
|
||||
|
||||
public Period getGracefulShutdownTimeout()
|
||||
{
|
||||
return gracefulShutdownTimeout;
|
||||
}
|
||||
|
||||
public Period getUnannouncePropogationDelay()
|
||||
{
|
||||
return unannouncePropogationDelay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
@ -115,13 +133,17 @@ public class ServerConfig
|
|||
enableRequestLimit == that.enableRequestLimit &&
|
||||
defaultQueryTimeout == that.defaultQueryTimeout &&
|
||||
maxScatterGatherBytes == that.maxScatterGatherBytes &&
|
||||
maxQueryTimeout == that.maxQueryTimeout &&
|
||||
maxRequestHeaderSize == that.maxRequestHeaderSize &&
|
||||
Objects.equals(maxIdleTime, that.maxIdleTime) &&
|
||||
maxQueryTimeout == that.maxQueryTimeout;
|
||||
Objects.equals(gracefulShutdownTimeout, that.gracefulShutdownTimeout) &&
|
||||
Objects.equals(unannouncePropogationDelay, that.unannouncePropogationDelay);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
|
||||
return Objects.hash(
|
||||
numThreads,
|
||||
queueSize,
|
||||
|
@ -129,7 +151,27 @@ public class ServerConfig
|
|||
maxIdleTime,
|
||||
defaultQueryTimeout,
|
||||
maxScatterGatherBytes,
|
||||
maxQueryTimeout
|
||||
maxQueryTimeout,
|
||||
maxRequestHeaderSize,
|
||||
gracefulShutdownTimeout,
|
||||
unannouncePropogationDelay
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "ServerConfig{" +
|
||||
"numThreads=" + numThreads +
|
||||
", queueSize=" + queueSize +
|
||||
", enableRequestLimit=" + enableRequestLimit +
|
||||
", maxIdleTime=" + maxIdleTime +
|
||||
", defaultQueryTimeout=" + defaultQueryTimeout +
|
||||
", maxScatterGatherBytes=" + maxScatterGatherBytes +
|
||||
", maxQueryTimeout=" + maxQueryTimeout +
|
||||
", maxRequestHeaderSize=" + maxRequestHeaderSize +
|
||||
", gracefulShutdownTimeout=" + gracefulShutdownTimeout +
|
||||
", unannouncePropogationDelay=" + unannouncePropogationDelay +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.eclipse.jetty.server.SecureRequestCustomizer;
|
|||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.SslConnectionFactory;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
|
||||
|
@ -294,6 +295,42 @@ public class JettyServerModule extends JerseyServletModule
|
|||
}
|
||||
|
||||
server.setConnectors(connectors);
|
||||
final long gracefulStop = config.getGracefulShutdownTimeout().toStandardDuration().getMillis();
|
||||
if (gracefulStop > 0) {
|
||||
server.setStopTimeout(gracefulStop);
|
||||
}
|
||||
server.addLifeCycleListener(new LifeCycle.Listener()
|
||||
{
|
||||
@Override
|
||||
public void lifeCycleStarting(LifeCycle event)
|
||||
{
|
||||
log.debug("Jetty lifecycle starting [%s]", event.getClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lifeCycleStarted(LifeCycle event)
|
||||
{
|
||||
log.debug("Jetty lifeycle started [%s]", event.getClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lifeCycleFailure(LifeCycle event, Throwable cause)
|
||||
{
|
||||
log.error(cause, "Jetty lifecycle event failed [%s]", event.getClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lifeCycleStopping(LifeCycle event)
|
||||
{
|
||||
log.debug("Jetty lifecycle stopping [%s]", event.getClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lifeCycleStopped(LifeCycle event)
|
||||
{
|
||||
log.debug("Jetty lifecycle stopped [%s]", event.getClass());
|
||||
}
|
||||
});
|
||||
|
||||
// initialize server
|
||||
JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class);
|
||||
|
@ -339,9 +376,20 @@ public class JettyServerModule extends JerseyServletModule
|
|||
public void stop()
|
||||
{
|
||||
try {
|
||||
final long unannounceDelay = config.getUnannouncePropogationDelay().toStandardDuration().getMillis();
|
||||
if (unannounceDelay > 0) {
|
||||
log.info("Waiting %s ms for unannouncement to propogate.", unannounceDelay);
|
||||
Thread.sleep(unannounceDelay);
|
||||
} else {
|
||||
log.debug("Skipping unannounce wait.");
|
||||
}
|
||||
log.info("Stopping Jetty Server...");
|
||||
server.stop();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RE(e, "Interrupted waiting for jetty shutdown.");
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(e, "Unable to stop Jetty server.");
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import io.druid.server.security.AuthenticatorMapper;
|
|||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
import org.eclipse.jetty.server.handler.StatisticsHandler;
|
||||
import org.eclipse.jetty.servlet.DefaultServlet;
|
||||
import org.eclipse.jetty.servlet.FilterHolder;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
|
@ -111,13 +112,24 @@ public class QueryJettyServerInitializer implements JettyServerInitializer
|
|||
root.addFilter(GuiceFilter.class, "/*", null);
|
||||
|
||||
final HandlerList handlerList = new HandlerList();
|
||||
final Handler[] handlers = new Handler[extensionHandlers.size() + 2];
|
||||
handlers[0] = JettyServerInitUtils.getJettyRequestLogHandler();
|
||||
handlers[handlers.length - 1] = JettyServerInitUtils.wrapWithDefaultGzipHandler(root);
|
||||
for (int i = 0; i < extensionHandlers.size(); i++) {
|
||||
handlers[i + 1] = extensionHandlers.get(i);
|
||||
// Do not change the order of the handlers that have already been added
|
||||
for (Handler handler : server.getHandlers()) {
|
||||
handlerList.addHandler(handler);
|
||||
}
|
||||
handlerList.setHandlers(handlers);
|
||||
server.setHandler(handlerList);
|
||||
|
||||
handlerList.addHandler(JettyServerInitUtils.getJettyRequestLogHandler());
|
||||
|
||||
// Add all extension handlers
|
||||
for (Handler handler : extensionHandlers) {
|
||||
handlerList.addHandler(handler);
|
||||
}
|
||||
|
||||
// Add Gzip handler at the very end
|
||||
handlerList.addHandler(JettyServerInitUtils.wrapWithDefaultGzipHandler(root));
|
||||
|
||||
final StatisticsHandler statisticsHandler = new StatisticsHandler();
|
||||
statisticsHandler.setHandler(handlerList);
|
||||
|
||||
server.setHandler(statisticsHandler);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue