From 5aaec6e23f6b5e9dcc418c6121203ffb26c60c85 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 30 Jan 2020 17:05:03 +0100 Subject: [PATCH] Issue #4321 Refactored Graceful shutdown (#4482) * Issue #4321 Refactored Graceful shutdown removed stopTimeout from all abstractLifeCycles. It is on Graceful.LifeCycle, which is only implemented by components that can start a graceful shutdown (eg Server, ContextHandler and QueuedThreadPool) Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown cleanup after review Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown reinstate other stop tests (more work to do). Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown Fixes for stop test by improving LocalConnector shutdown handling Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown Removed broken test on LocalConnector that is already tested in GracefulStopTest Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown Fixed all stop tests Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown fixed checkstyle Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown No stopTimeout JMX attribute Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown Dump stopTimeout test with default stopTimeout Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown USe sendError for 503 Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown minor cleanups Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown Simplifications after review Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown after review Signed-off-by: Greg Wilkins * Issue #4321 Refactored Graceful shutdown after review Signed-off-by: Greg Wilkins --- .../jetty/deploy/test/XmlConfiguredJetty.java | 1 - .../org/eclipse/jetty/io/ManagedSelector.java | 1 - .../eclipse/jetty/jmx/ObjectMBeanTest.java | 2 +- ...sLoadingTestingServletContextListener.java | 2 +- .../src/main/java/org/olamy/App.java | 6 +- .../maven/plugin/MavenServerConnector.java | 4 +- .../jetty/server/AbstractConnector.java | 83 +- .../server/AbstractNetworkConnector.java | 4 +- .../eclipse/jetty/server/LocalConnector.java | 4 +- .../java/org/eclipse/jetty/server/Server.java | 53 +- .../handler/AbstractHandlerContainer.java | 64 - .../jetty/server/handler/ContextHandler.java | 26 +- .../server/handler/StatisticsHandler.java | 43 +- .../jetty/server/GracefulStopTest.java | 1031 +++++------------ .../org/eclipse/jetty/server/StopTest.java | 548 +++++++++ .../eclipse/jetty/util/MultiException.java | 2 +- .../util/component/AbstractLifeCycle.java | 12 - .../util/component/ContainerLifeCycle.java | 16 - .../jetty/util/component/Graceful.java | 99 +- .../jetty/util/thread/ExecutorThreadPool.java | 2 +- .../jetty/util/thread/QueuedThreadPool.java | 31 +- .../thread/ReservedThreadExecutorTest.java | 1 - .../jetty/test/FailedSelectorTest.java | 1 - 23 files changed, 1071 insertions(+), 965 deletions(-) create mode 100644 jetty-server/src/test/java/org/eclipse/jetty/server/StopTest.java diff --git a/jetty-deploy/src/test/java/org/eclipse/jetty/deploy/test/XmlConfiguredJetty.java b/jetty-deploy/src/test/java/org/eclipse/jetty/deploy/test/XmlConfiguredJetty.java index ff1580bff89..a917a411d1a 100644 --- a/jetty-deploy/src/test/java/org/eclipse/jetty/deploy/test/XmlConfiguredJetty.java +++ b/jetty-deploy/src/test/java/org/eclipse/jetty/deploy/test/XmlConfiguredJetty.java @@ -366,7 +366,6 @@ public class XmlConfiguredJetty assertEquals(1, serverCount, "Server load count"); this._server = foundServer; - this._server.setStopTimeout(10); } public void removeWebapp(String name) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 5fa18a2633e..e8b1122c7ba 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -95,7 +95,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable Executor executor = selectorManager.getExecutor(); _strategy = new EatWhatYouKill(producer, executor); addBean(_strategy, true); - setStopTimeout(5000); } public Selector getSelector() diff --git a/jetty-jmx/src/test/java/org/eclipse/jetty/jmx/ObjectMBeanTest.java b/jetty-jmx/src/test/java/org/eclipse/jetty/jmx/ObjectMBeanTest.java index 45f93185c6f..c79226e6d81 100644 --- a/jetty-jmx/src/test/java/org/eclipse/jetty/jmx/ObjectMBeanTest.java +++ b/jetty-jmx/src/test/java/org/eclipse/jetty/jmx/ObjectMBeanTest.java @@ -123,7 +123,7 @@ public class ObjectMBeanTest assertEquals("com.acme.Derived", derivedInfo.getClassName(), "name does not match"); assertEquals("Test the mbean stuff", derivedInfo.getDescription(), "description does not match"); - assertEquals(6, derivedInfo.getAttributes().length, "attribute count does not match"); + assertEquals(5, derivedInfo.getAttributes().length, "attribute count does not match"); assertEquals("Full Name", derivedMBean.getAttribute("fname"), "attribute values does not match"); derivedMBean.setAttribute(new Attribute("fname", "Fuller Name")); diff --git a/jetty-maven-plugin/src/it/jetty-maven-plugin-provided-module-dep/web/src/main/java/test/ClassLoadingTestingServletContextListener.java b/jetty-maven-plugin/src/it/jetty-maven-plugin-provided-module-dep/web/src/main/java/test/ClassLoadingTestingServletContextListener.java index 3d4454cdddd..47ed26e235a 100755 --- a/jetty-maven-plugin/src/it/jetty-maven-plugin-provided-module-dep/web/src/main/java/test/ClassLoadingTestingServletContextListener.java +++ b/jetty-maven-plugin/src/it/jetty-maven-plugin-provided-module-dep/web/src/main/java/test/ClassLoadingTestingServletContextListener.java @@ -61,7 +61,7 @@ public class ClassLoadingTestingServletContextListener { } - private void printURLs (URLClassLoader l) + private void printURLs(URLClassLoader l) { if (l == null) return; diff --git a/jetty-maven-plugin/src/it/jetty-start-gwt-it/beer-client/src/main/java/org/olamy/App.java b/jetty-maven-plugin/src/it/jetty-start-gwt-it/beer-client/src/main/java/org/olamy/App.java index f6d7d601c78..19d099dc4c9 100644 --- a/jetty-maven-plugin/src/it/jetty-start-gwt-it/beer-client/src/main/java/org/olamy/App.java +++ b/jetty-maven-plugin/src/it/jetty-start-gwt-it/beer-client/src/main/java/org/olamy/App.java @@ -44,9 +44,9 @@ public class App implements EntryPoint * The message displayed to the user when the server cannot be reached or * returns an error. */ - private static final String SERVER_ERROR = "An error occurred while " - + "attempting to contact the server. Please check your network " - + "connection and try again."; + private static final String SERVER_ERROR = "An error occurred while " + + "attempting to contact the server. Please check your network " + + "connection and try again."; /** * Create a remote service proxy to talk to the server-side Greeting service. diff --git a/jetty-maven-plugin/src/main/java/org/eclipse/jetty/maven/plugin/MavenServerConnector.java b/jetty-maven-plugin/src/main/java/org/eclipse/jetty/maven/plugin/MavenServerConnector.java index 9784de0aebc..a886de76abc 100644 --- a/jetty-maven-plugin/src/main/java/org/eclipse/jetty/maven/plugin/MavenServerConnector.java +++ b/jetty-maven-plugin/src/main/java/org/eclipse/jetty/maven/plugin/MavenServerConnector.java @@ -20,8 +20,8 @@ package org.eclipse.jetty.maven.plugin; import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; @@ -121,7 +121,7 @@ public class MavenServerConnector extends ContainerLifeCycle implements Connecto } @Override - public Future shutdown() + public CompletableFuture shutdown() { return checkDelegate().shutdown(); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index 9b434113909..56a225803a8 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -30,11 +30,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import org.eclipse.jetty.io.ArrayByteBufferPool; @@ -154,10 +152,10 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co private final Thread[] _acceptors; private final Set _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set _immutableEndPoints = Collections.unmodifiableSet(_endpoints); - private final Graceful.Shutdown _shutdown = new Graceful.Shutdown(); + private Shutdown _shutdown; private HttpChannel.Listener _httpChannelListeners = HttpChannel.NOOP_LISTENER; - private CountDownLatch _stopping; private long _idleTimeout = 30000; + private long _shutdownIdleTimeout = 1000L; private String _defaultProtocol; private ConnectionFactory _defaultConnectionFactory; private String _name; @@ -286,6 +284,20 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co public void setIdleTimeout(long idleTimeout) { _idleTimeout = idleTimeout; + if (_idleTimeout == 0) + _shutdownIdleTimeout = 0; + else if (_idleTimeout < _shutdownIdleTimeout) + _shutdownIdleTimeout = Math.min(1000L, _idleTimeout); + } + + public void setShutdownIdleTimeout(long idle) + { + _shutdownIdleTimeout = idle; + } + + public long getShutdownIdleTimeout() + { + return _shutdownIdleTimeout; } /** @@ -300,7 +312,21 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co @Override protected void doStart() throws Exception { - _shutdown.cancel(); + _shutdown = new Graceful.Shutdown(this) + { + @Override + public boolean isShutdownDone() + { + if (!_endpoints.isEmpty()) + return false; + + for (Thread a : _acceptors) + if (a != null) + return false; + + return true; + } + }; if (_defaultProtocol == null) throw new IllegalStateException("No default protocol for " + this); @@ -319,7 +345,6 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co _lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _acceptors.length); super.doStart(); - _stopping = new CountDownLatch(_acceptors.length); for (int i = 0; i < _acceptors.length; i++) { Acceptor a = new Acceptor(i); @@ -343,15 +368,29 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co } @Override - public Future shutdown() + public CompletableFuture shutdown() { - return _shutdown.shutdown(); + Shutdown shutdown = _shutdown; + if (shutdown == null) + return CompletableFuture.completedFuture(null); + + // Signal for the acceptors to stop + CompletableFuture done = shutdown.shutdown(); + interruptAcceptors(); + + // Reduce the idle timeout of existing connections + for (EndPoint ep : _endpoints) + ep.setIdleTimeout(_shutdownIdleTimeout); + + // Return Future that waits for no acceptors and no connections. + return done; } @Override public boolean isShutdown() { - return _shutdown.isShutdown(); + Shutdown shutdown = _shutdown; + return shutdown == null || shutdown.isShutdown(); } @Override @@ -362,20 +401,11 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co // Tell the acceptors we are stopping interruptAcceptors(); - - // If we have a stop timeout - long stopTimeout = getStopTimeout(); - CountDownLatch stopping = _stopping; - if (stopTimeout > 0 && stopping != null && getAcceptors() > 0) - stopping.await(stopTimeout, TimeUnit.MILLISECONDS); - _stopping = null; - super.doStop(); - for (Acceptor a : getBeans(Acceptor.class)) - { removeBean(a); - } + + _shutdown = null; LOG.info("Stopped {}", this); } @@ -681,7 +711,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co try { - while (isRunning()) + while (isRunning() && !_shutdown.isShutdown()) { try (AutoLock lock = _lock.lock()) { @@ -717,9 +747,9 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co { _acceptors[_id] = null; } - CountDownLatch stopping = _stopping; - if (stopping != null) - stopping.countDown(); + Shutdown shutdown = _shutdown; + if (shutdown != null) + shutdown.check(); } } @@ -747,6 +777,9 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co protected void onEndPointClosed(EndPoint endp) { _endpoints.remove(endp); + Shutdown shutdown = _shutdown; + if (shutdown != null) + shutdown.check(); } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractNetworkConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractNetworkConnector.java index 034baf2f5fc..d7e7c5d5131 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractNetworkConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractNetworkConnector.java @@ -19,8 +19,8 @@ package org.eclipse.jetty.server; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.annotation.ManagedAttribute; @@ -99,7 +99,7 @@ public abstract class AbstractNetworkConnector extends AbstractConnector impleme } @Override - public Future shutdown() + public CompletableFuture shutdown() { close(); return super.shutdown(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index aedd7c970fe..fe8c0073703 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -98,6 +98,8 @@ public class LocalConnector extends AbstractConnector { if (!isStarted()) throw new IllegalStateException("!STARTED"); + if (isShutdown()) + throw new IllegalStateException("Shutdown"); LocalEndPoint endp = new LocalEndPoint(); endp.addInput(rawRequest); _connects.add(endp); @@ -412,7 +414,7 @@ public class LocalConnector extends AbstractConnector else { chunk = waitForOutput(time, unit); - if (BufferUtil.isEmpty(chunk) && (!isOpen() || isOutputShutdown())) + if (BufferUtil.isEmpty(chunk) && (!isOpen() || isOutputShutdown() || isShutdown())) { parser.atEOF(); parser.parseNext(BufferUtil.EMPTY_BUFFER); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java index d1fe91d02a2..bc0063c8e5f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java @@ -27,7 +27,7 @@ import java.util.Arrays; import java.util.Enumeration; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -44,7 +44,6 @@ import org.eclipse.jetty.http.PreEncodedHttpField; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ErrorHandler; import org.eclipse.jetty.server.handler.HandlerWrapper; -import org.eclipse.jetty.server.handler.StatisticsHandler; import org.eclipse.jetty.util.Attributes; import org.eclipse.jetty.util.Jetty; import org.eclipse.jetty.util.MultiException; @@ -54,6 +53,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.component.AttributeContainerMap; +import org.eclipse.jetty.util.component.Graceful; import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -86,6 +86,7 @@ public class Server extends HandlerWrapper implements Attributes private boolean _dryRun; private final AutoLock _dateLock = new AutoLock(); private volatile DateField _dateField; + private long _stopTimeout; public Server() { @@ -173,24 +174,21 @@ public class Server extends HandlerWrapper implements Attributes return Jetty.VERSION; } + public void setStopTimeout(long stopTimeout) + { + _stopTimeout = stopTimeout; + } + + public long getStopTimeout() + { + return _stopTimeout; + } + public boolean getStopAtShutdown() { return _stopAtShutdown; } - /** - * Set a graceful stop time. - * The {@link StatisticsHandler} must be configured so that open connections can - * be tracked for a graceful shutdown. - * - * @see org.eclipse.jetty.util.component.ContainerLifeCycle#setStopTimeout(long) - */ - @Override - public void setStopTimeout(long stopTimeout) - { - super.setStopTimeout(stopTimeout); - } - /** * Set stop server at shutdown behaviour. * @@ -466,21 +464,20 @@ public class Server extends HandlerWrapper implements Attributes MultiException mex = new MultiException(); - try + if (getStopTimeout() > 0) { - // list if graceful futures - List> futures = new ArrayList<>(); - // First shutdown the network connectors to stop accepting new connections - for (Connector connector : _connectors) + long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(getStopTimeout()); + try { - futures.add(connector.shutdown()); + Graceful.shutdown(this).get(getStopTimeout(), TimeUnit.MILLISECONDS); } - // then shutdown all graceful handlers - doShutdown(futures); - } - catch (Throwable e) - { - mex.add(e); + catch (Throwable e) + { + mex.add(e); + } + QueuedThreadPool qtp = getBean(QueuedThreadPool.class); + if (qtp != null) + qtp.setStopTimeout(Math.max(1000L, TimeUnit.NANOSECONDS.toMillis(end - System.nanoTime()))); } // Now stop the connectors (this will close existing connections) @@ -708,7 +705,7 @@ public class Server extends HandlerWrapper implements Attributes @Override public String toString() { - return String.format("%s[%s]", super.toString(), getVersion()); + return String.format("%s[%s,sto=%d]", super.toString(), getVersion(), getStopTimeout()); } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/AbstractHandlerContainer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/AbstractHandlerContainer.java index 9cf3ca852f3..6e7f126f39f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/AbstractHandlerContainer.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/AbstractHandlerContainer.java @@ -20,17 +20,11 @@ package org.eclipse.jetty.server.handler; import java.util.ArrayList; import java.util.Arrays; -import java.util.Date; import java.util.List; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HandlerContainer; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.MultiException; -import org.eclipse.jetty.util.component.Graceful; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -138,62 +132,4 @@ public abstract class AbstractHandlerContainer extends AbstractHandler implement h.setServer(server); } } - - /** - * Shutdown nested Gracefule handlers - * - * @param futures A list of Futures which must also be waited on for the shutdown (or null) - * returns A MultiException to which any failures are added or null - */ - protected void doShutdown(List> futures) throws MultiException - { - MultiException mex = null; - - // tell the graceful handlers that we are shutting down - Handler[] gracefuls = getChildHandlersByClass(Graceful.class); - if (futures == null) - futures = new ArrayList<>(gracefuls.length); - for (Handler graceful : gracefuls) - { - futures.add(((Graceful)graceful).shutdown()); - } - - // Wait for all futures with a reducing time budget - long stopTimeout = getStopTimeout(); - if (stopTimeout > 0) - { - long stopBy = System.currentTimeMillis() + stopTimeout; - if (LOG.isDebugEnabled()) - LOG.debug("Graceful shutdown {} by ", this, new Date(stopBy)); - - // Wait for shutdowns - for (Future future : futures) - { - try - { - if (!future.isDone()) - future.get(Math.max(1L, stopBy - System.currentTimeMillis()), TimeUnit.MILLISECONDS); - } - catch (Exception e) - { - // If the future is also a callback, fail it here (rather than cancel) so we can capture the exception - if (future instanceof Callback && !future.isDone()) - ((Callback)future).failed(e); - if (mex == null) - mex = new MultiException(); - mex.add(e); - } - } - } - - // Cancel any shutdowns not done - for (Future future : futures) - { - if (!future.isDone()) - future.cancel(true); - } - - if (mex != null) - mex.ifExceptionThrowMulti(); - } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java index 8be5b3f42a3..4f3e2050d4e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java @@ -39,8 +39,8 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Future; import javax.servlet.DispatcherType; import javax.servlet.Filter; import javax.servlet.FilterRegistration; @@ -77,7 +77,6 @@ import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.Attributes; import org.eclipse.jetty.util.AttributesMap; -import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.Loader; import org.eclipse.jetty.util.MultiException; import org.eclipse.jetty.util.StringUtil; @@ -181,19 +180,16 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu private String _defaultResponseCharacterEncoding; private String _contextPath = "/"; private String _contextPathEncoded = "/"; - private String _displayName; - + private long _stopTimeout; private Resource _baseResource; private MimeTypes _mimeTypes; private Map _localeEncodingMap; private String[] _welcomeFiles; private ErrorHandler _errorHandler; - private String[] _vhosts; // Host name portion, matching _vconnectors array private boolean[] _vhostswildcard; private String[] _vconnectors; // connector portion, matching _vhosts array - private Logger _logger; private boolean _allowNullPathInfo; private int _maxFormKeys = Integer.getInteger(MAX_FORM_KEYS_KEY, DEFAULT_MAX_FORM_KEYS); @@ -709,10 +705,12 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu * requests can complete, but no new requests are accepted. */ @Override - public Future shutdown() + public CompletableFuture shutdown() { _availability = isRunning() ? Availability.SHUTDOWN : Availability.UNAVAILABLE; - return new FutureCallback(true); + CompletableFuture shutdown = new CompletableFuture(); + shutdown.complete(null); + return shutdown; } /** @@ -915,18 +913,6 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu // Should we attempt a graceful shutdown? MultiException mex = null; - if (getStopTimeout() > 0) - { - try - { - doShutdown(null); - } - catch (MultiException e) - { - mex = e; - } - } - _availability = Availability.UNAVAILABLE; ClassLoader oldClassloader = null; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java index 8b7a3d8edeb..9c85d0054df 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java @@ -19,7 +19,7 @@ package org.eclipse.jetty.server.handler; import java.io.IOException; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; @@ -35,7 +35,6 @@ import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpChannelState; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; -import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedOperation; @@ -50,6 +49,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful { private static final Logger LOG = Log.getLogger(StatisticsHandler.class); private final AtomicLong _statsStartedAt = new AtomicLong(); + private volatile Shutdown _shutdown; private final CounterStatistic _requestStats = new CounterStatistic(); private final SampleStatistic _requestTimeStats = new SampleStatistic(); @@ -67,15 +67,6 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful private final LongAdder _responses5xx = new LongAdder(); private final LongAdder _responsesTotalBytes = new LongAdder(); - private final Graceful.Shutdown _shutdown = new Graceful.Shutdown() - { - @Override - protected FutureCallback newShutdownCallback() - { - return new FutureCallback(_requestStats.getCurrent() == 0); - } - }; - private final AtomicBoolean _wrapWarning = new AtomicBoolean(); private final AsyncListener _onCompletion = new AsyncListener() @@ -115,9 +106,9 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful // If we have no more dispatches, should we signal shutdown? if (d == 0) { - FutureCallback shutdown = _shutdown.get(); + Shutdown shutdown = _shutdown; if (shutdown != null) - shutdown.succeeded(); + shutdown.check(); } } }; @@ -204,12 +195,12 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful updateResponse(baseRequest); // If we have no more dispatches, should we signal shutdown? - FutureCallback shutdown = _shutdown.get(); + Shutdown shutdown = _shutdown; if (shutdown != null) { response.flushBuffer(); if (d == 0) - shutdown.succeeded(); + shutdown.check(); } } // else onCompletion will handle it. @@ -251,7 +242,14 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful @Override protected void doStart() throws Exception { - _shutdown.cancel(); + _shutdown = new Shutdown(this) + { + @Override + public boolean isShutdownDone() + { + return _requestStats.getCurrent() == 0; + } + }; super.doStart(); statsReset(); } @@ -259,8 +257,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful @Override protected void doStop() throws Exception { - _shutdown.cancel(); super.doStop(); + _shutdown = null; } /** @@ -576,15 +574,19 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful } @Override - public Future shutdown() + public CompletableFuture shutdown() { - return _shutdown.shutdown(); + Shutdown shutdown = _shutdown; + if (shutdown == null) + return CompletableFuture.completedFuture(null); + return shutdown.shutdown(); } @Override public boolean isShutdown() { - return _shutdown.isShutdown(); + Shutdown shutdown = _shutdown; + return shutdown == null || shutdown.isShutdown(); } @Override @@ -592,4 +594,5 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful { return String.format("%s@%x{%s,r=%d,d=%d}", getClass().getSimpleName(), hashCode(), getState(), _requestStats.getCurrent(), _dispatchedStats.getCurrent()); } + } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java index 7368b5ea419..d8b18a4684f 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java @@ -18,845 +18,408 @@ package org.eclipse.jetty.server; -import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.ConnectException; import java.net.Socket; -import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Exchanger; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.io.Connection; -import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.server.LocalConnector.LocalEndPoint; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.tools.HttpTester; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.server.handler.HandlerList; import org.eclipse.jetty.server.handler.StatisticsHandler; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.component.Graceful; import org.eclipse.jetty.util.component.LifeCycle; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.hamcrest.Matcher; import org.hamcrest.Matchers; -import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledOnOs; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.startsWith; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; -import static org.junit.jupiter.api.condition.OS.WINDOWS; public class GracefulStopTest { - /** - * Test of standard graceful timeout mechanism when a block request does - * not complete - * - * @throws Exception on test failure - */ - @Test - public void testGracefulNoWaiter() throws Exception - { - Server server = new Server(); - server.setStopTimeout(1000); - - ServerConnector connector = new ServerConnector(server); - connector.setPort(0); - server.addConnector(connector); - - TestHandler handler = new TestHandler(); - server.setHandler(handler); - - server.start(); - final int port = connector.getLocalPort(); - Socket client = new Socket("127.0.0.1", port); - client.getOutputStream().write(( - "POST / HTTP/1.0\r\n" + - "Host: localhost:" + port + "\r\n" + + static byte[] POST_12345 = ("POST / HTTP/1.1\r\n" + + "Host: localhost\r\n" + "Content-Type: plain/text\r\n" + "Content-Length: 10\r\n" + "\r\n" + - "12345" - ).getBytes()); - client.getOutputStream().flush(); - handler.latch.await(); + "12345").getBytes(StandardCharsets.ISO_8859_1); + static byte[] POST_A_12345 = ("POST /a/ HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Content-Type: plain/text\r\n" + + "Content-Length: 10\r\n" + + "\r\n" + + "12345").getBytes(StandardCharsets.ISO_8859_1); + + static byte[] POST_B_12345 = ("POST /b/ HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Content-Type: plain/text\r\n" + + "Content-Length: 10\r\n" + + "\r\n" + + "12345").getBytes(StandardCharsets.ISO_8859_1); + + static byte[] POST_12345_C = ("POST /?commit=true HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Content-Type: plain/text\r\n" + + "Content-Length: 10\r\n" + + "\r\n" + + "12345").getBytes(StandardCharsets.ISO_8859_1); + + static byte[] POST_A_12345_C = ("POST /a/?commit=true HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Content-Type: plain/text\r\n" + + "Content-Length: 10\r\n" + + "\r\n" + + "12345").getBytes(StandardCharsets.ISO_8859_1); + + static byte[] POST_B_12345_C = ("POST /b/?commit=true HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Content-Type: plain/text\r\n" + + "Content-Length: 10\r\n" + + "\r\n" + + "12345").getBytes(StandardCharsets.ISO_8859_1); + + static byte[] BODY_67890 = "67890".getBytes(StandardCharsets.ISO_8859_1); + + Server server = new Server(); + ServerConnector connector = new ServerConnector(server); + HandlerList handlers = new HandlerList(); + ContextHandlerCollection contexts = new ContextHandlerCollection(); + ContextHandler contextA = new ContextHandler(contexts, "/a"); + TestHandler handlerA = new TestHandler(); + ContextHandler contextB = new ContextHandler(contexts, "/b"); + StatisticsHandler statsB = new StatisticsHandler(); + TestHandler handlerB = new TestHandler(); + TestHandler handler = new TestHandler(); + + @BeforeEach + public void beforeEach() throws Exception + { + connector.setIdleTimeout(10000); + connector.setShutdownIdleTimeout(1000); + connector.setPort(0); + server.addConnector(connector); + + server.setHandler(handlers); + handlers.addHandler(contexts); + handlers.addHandler(handler); + + contextA.setHandler(handlerA); + + contextB.setHandler(statsB); + statsB.setHandler(handlerB); + + server.setStopTimeout(10000); + + server.start(); + } + + Socket newClientBusy(byte[] post, TestHandler handler) throws Exception + { + handler.latch = new CountDownLatch(1); + final int port = connector.getLocalPort(); + Socket client = new Socket("127.0.0.1", port); + client.getOutputStream().write(post); + client.getOutputStream().flush(); + assertTrue(handler.latch.await(5, TimeUnit.SECONDS)); + return client; + } + + Socket newClientIdle(byte[] post, TestHandler handler) throws Exception + { + handler.latch = new CountDownLatch(1); + final int port = connector.getLocalPort(); + Socket client = new Socket("127.0.0.1", port); + client.getOutputStream().write(post); + client.getOutputStream().write(BODY_67890); + client.getOutputStream().flush(); + assertTrue(handler.latch.await(5, TimeUnit.SECONDS)); + + HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), is("read 10/10\n")); + assertThat(response.get(HttpHeader.CONNECTION), nullValue()); + + return client; + } + + void assertAvailable(Socket client, byte[] post, TestHandler handler) throws Exception + { + handler.latch = new CountDownLatch(1); + client.getOutputStream().write(post); + client.getOutputStream().write(BODY_67890); + client.getOutputStream().flush(); + assertTrue(handler.latch.await(5, TimeUnit.SECONDS)); + + HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); + assertThat(response.getStatus(), is(200)); + assertThat(response.getContent(), is("read 10/10\n")); + assertThat(response.get(HttpHeader.CONNECTION), nullValue()); + } + + Future backgroundUnavailable(Socket client, byte[] post, ContextHandler context, TestHandler handler) throws Exception + { + FuturePromise future = new FuturePromise<>(); + long start = System.nanoTime(); + new Thread(() -> + { + try + { + while (context.isAvailable()) + { + assertThat(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), lessThan(5000L)); + Thread.sleep(100); + } + + client.getOutputStream().write(post); + client.getOutputStream().write(BODY_67890); + client.getOutputStream().flush(); + HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); + + future.succeeded(response.getStatus()); + } + catch (Exception e) + { + future.failed(e); + } + }).start(); + + return future; + } + + void assertQuickStop() throws Exception + { long start = System.nanoTime(); server.stop(); long stop = System.nanoTime(); - - // No Graceful waiters - assertThat(TimeUnit.NANOSECONDS.toMillis(stop - start), lessThan(900L)); - - assertThat(client.getInputStream().read(), is(-1)); - assertThat(handler.handling.get(), is(false)); - assertThat(handler.thrown.get(), Matchers.notNullValue()); - client.close(); + long duration = TimeUnit.NANOSECONDS.toMillis(stop - start); + assertThat(duration, lessThan(2000L)); } - /** - * Test of standard graceful timeout mechanism when a block request does - * not complete - * - * @throws Exception on test failure - */ - @Test - public void testGracefulTimeout() throws Exception + void assertGracefulStop(LifeCycle lifecycle) throws Exception { - Server server = new Server(); - server.setStopTimeout(1000); - - ServerConnector connector = new ServerConnector(server); - connector.setPort(0); - server.addConnector(connector); - - TestHandler handler = new TestHandler(); - StatisticsHandler stats = new StatisticsHandler(); - server.setHandler(stats); - stats.setHandler(handler); - - server.start(); - final int port = connector.getLocalPort(); - Socket client = new Socket("127.0.0.1", port); - client.getOutputStream().write(( - "POST / HTTP/1.0\r\n" + - "Host: localhost:" + port + "\r\n" + - "Content-Type: plain/text\r\n" + - "Content-Length: 10\r\n" + - "\r\n" + - "12345" - ).getBytes()); - client.getOutputStream().flush(); - handler.latch.await(); - long start = System.nanoTime(); - - assertThrows(TimeoutException.class, () -> server.stop()); - + lifecycle.stop(); long stop = System.nanoTime(); - // No Graceful waiters - assertThat(TimeUnit.NANOSECONDS.toMillis(stop - start), greaterThan(900L)); - - assertThat(client.getInputStream().read(), is(-1)); - - assertThat(handler.handling.get(), is(false)); - assertThat(handler.thrown.get(), instanceOf(ClosedChannelException.class)); - - client.close(); + long duration = TimeUnit.NANOSECONDS.toMillis(stop - start); + assertThat(duration, greaterThan(50L)); + assertThat(duration, lessThan(5000L)); } - /** - * Test completed writes during shutdown do not close output - * - * @throws Exception on test failure - */ - @Test - public void testWriteDuringShutdown() throws Exception + void assertResponse(Socket client, boolean close) throws Exception { - Server server = new Server(); - server.setStopTimeout(1000); - - ServerConnector connector = new ServerConnector(server); - connector.setPort(0); - server.addConnector(connector); - - ABHandler handler = new ABHandler(); - StatisticsHandler stats = new StatisticsHandler(); - server.setHandler(stats); - stats.setHandler(handler); - - server.start(); - - Thread stopper = new Thread(() -> - { - try - { - handler.latchA.await(); - server.stop(); - } - catch (Exception e) - { - e.printStackTrace(); - } - }); - stopper.start(); - - final int port = connector.getLocalPort(); - try (Socket client = new Socket("127.0.0.1", port)) - { - client.getOutputStream().write(( - "GET / HTTP/1.1\r\n" + - "Host: localhost:" + port + "\r\n" + - "\r\n" - ).getBytes()); - client.getOutputStream().flush(); - - while (!connector.isShutdown()) - { - Thread.sleep(10); - } - - handler.latchB.countDown(); - - String response = IO.toString(client.getInputStream()); - assertThat(response, startsWith("HTTP/1.1 200 ")); - assertThat(response, containsString("Content-Length: 2")); - assertThat(response, containsString("Connection: close")); - assertThat(response, endsWith("ab")); - } - stopper.join(); + HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); + assertThat(response.getStatus(), is(200)); + if (close) + assertThat(response.get(HttpHeader.CONNECTION), is("close")); + else + assertThat(response.get(HttpHeader.CONNECTION), nullValue()); + assertThat(response.getContent(), is("read 10/10\n")); } - /** - * Test of standard graceful timeout mechanism when a block request does - * complete. Note that even though the request completes after 100ms, the - * stop always takes 1000ms - * - * @throws Exception on test failure - */ - @Test - @DisabledOnOs(WINDOWS) // TODO: needs more investigation - public void testGracefulComplete() throws Exception + void assert500Response(Socket client) throws Exception { - Server server = new Server(); - server.setStopTimeout(10000); - - ServerConnector connector = new ServerConnector(server); - connector.setPort(0); - server.addConnector(connector); - - TestHandler handler = new TestHandler(); - StatisticsHandler stats = new StatisticsHandler(); - server.setHandler(stats); - stats.setHandler(handler); - - server.start(); - final int port = connector.getLocalPort(); - - try (final Socket client1 = new Socket("127.0.0.1", port); - final Socket client2 = new Socket("127.0.0.1", port)) + HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); + if (response != null) { - client1.getOutputStream().write(( - "POST / HTTP/1.0\r\n" + - "Host: localhost:" + port + "\r\n" + - "Content-Type: plain/text\r\n" + - "Content-Length: 10\r\n" + - "\r\n" + - "12345" - ).getBytes()); - client1.getOutputStream().flush(); - handler.latch.await(); - - new Thread() - { - @Override - public void run() - { - long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - long end = now + 500; - - try - { - Thread.sleep(100); - - // Try creating a new connection - try - { - try (Socket s = new Socket("127.0.0.1", port)) - { - // no op - } - throw new IllegalStateException(); - } - catch (ConnectException e) - { - // no op - } - - // Try another request on existing connection - - client2.getOutputStream().write(( - "GET / HTTP/1.0\r\n" + - "Host: localhost:" + port + "\r\n" + - "\r\n" - ).getBytes()); - client2.getOutputStream().flush(); - String response2 = IO.toString(client2.getInputStream()); - assertThat(response2, containsString(" 503 ")); - - now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - Thread.sleep(Math.max(1, end - now)); - client1.getOutputStream().write("567890".getBytes()); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - }.start(); - - long start = System.nanoTime(); - server.stop(); - long stop = System.nanoTime(); - assertThat(TimeUnit.NANOSECONDS.toMillis(stop - start), greaterThan(490L)); - assertThat(TimeUnit.NANOSECONDS.toMillis(stop - start), lessThan(10000L)); - - String response = IO.toString(client1.getInputStream()); - - assertThat(handler.handling.get(), is(false)); - assertThat(response, containsString(" 200 OK")); - assertThat(response, containsString("read 10/10")); - - assertThat(stats.getRequests(), is(2)); - assertThat(stats.getResponses5xx(), is(1)); + assertThat(response.getStatus(), is(500)); + assertThat(response.get(HttpHeader.CONNECTION), is("close")); } } - public void testSlowClose(long stopTimeout, long closeWait, Matcher stopTimeMatcher) throws Exception + void assertQuickClose(Socket client) throws Exception { - Server server = new Server(); - server.setStopTimeout(stopTimeout); - - CountDownLatch closed = new CountDownLatch(1); - ServerConnector connector = new ServerConnector(server, 2, 2, new HttpConnectionFactory() - { - - @Override - public Connection newConnection(Connector con, EndPoint endPoint) - { - // Slow closing connection - HttpConnection conn = new HttpConnection(getHttpConfiguration(), con, endPoint, isRecordHttpComplianceViolations()) - { - @Override - public void close() - { - try - { - new Thread(() -> - { - try - { - Thread.sleep(closeWait); - } - catch (InterruptedException e) - { - // no op - } - finally - { - super.close(); - } - }).start(); - } - catch (Exception e) - { - // e.printStackTrace(); - } - finally - { - closed.countDown(); - } - } - }; - return configure(conn, con, endPoint); - } - }); - connector.setPort(0); - server.addConnector(connector); - - NoopHandler handler = new NoopHandler(); - server.setHandler(handler); - - server.start(); - final int port = connector.getLocalPort(); - Socket client = new Socket("127.0.0.1", port); - client.setSoTimeout(10000); - client.getOutputStream().write(( - "GET / HTTP/1.1\r\n" + - "Host: localhost:" + port + "\r\n" + - "Content-Type: plain/text\r\n" + - "\r\n" - ).getBytes()); - client.getOutputStream().flush(); - handler.latch.await(); - - // look for a response - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream(), StandardCharsets.ISO_8859_1)); - while (true) - { - String line = in.readLine(); - assertThat("Line should not be null", line, is(notNullValue())); - if (line.length() == 0) - break; - } - long start = System.nanoTime(); - try - { - server.stop(); - assertTrue(stopTimeout == 0 || stopTimeout > closeWait); - } - catch (Exception e) - { - assertTrue(stopTimeout > 0 && stopTimeout < closeWait); - } + assertThat(client.getInputStream().read(), is(-1)); long stop = System.nanoTime(); - - // Check stop time was correct - assertThat(TimeUnit.NANOSECONDS.toMillis(stop - start), stopTimeMatcher); - - // Connection closed - while (true) - { - int r = client.getInputStream().read(); - if (r == -1) - break; - } - - // onClose Thread interrupted or completed - if (stopTimeout > 0) - assertTrue(closed.await(1000, TimeUnit.MILLISECONDS)); - - if (!client.isClosed()) - client.close(); + long duration = TimeUnit.NANOSECONDS.toMillis(stop - start); + assertThat(duration, lessThan(2000L)); } - /** - * Test of non graceful stop when a connection close is slow - * - * @throws Exception on test failure - */ - @Test - public void testSlowCloseNotGraceful() throws Exception + void assertHandled(TestHandler handler, boolean error) { - Log.getLogger(QueuedThreadPool.class).info("Expect some threads can't be stopped"); - testSlowClose(0, 5000, lessThan(750L)); + assertThat(handler.handling.get(), is(false)); + if (error) + assertThat(handler.thrown.get(), Matchers.notNullValue()); + else + assertThat(handler.thrown.get(), Matchers.nullValue()); } - /** - * Test of graceful stop when close is slower than timeout - * - * @throws Exception on test failure - */ - @Test - @Disabled // TODO disable while #2046 is fixed - public void testSlowCloseTinyGraceful() throws Exception + void backgroundComplete(Socket client, TestHandler handler) throws Exception { - Log.getLogger(QueuedThreadPool.class).info("Expect some threads can't be stopped"); - testSlowClose(1, 5000, lessThan(1500L)); - } - - /** - * Test of graceful stop when close is faster than timeout; - * - * @throws Exception on test failure - */ - @Test - @Disabled // TODO disable while #2046 is fixed - public void testSlowCloseGraceful() throws Exception - { - testSlowClose(5000, 1000, Matchers.allOf(greaterThan(750L), lessThan(4999L))); - } - - @Test - public void testResponsesAreClosed() throws Exception - { - Server server = new Server(); - - LocalConnector connector = new LocalConnector(server); - server.addConnector(connector); - - StatisticsHandler stats = new StatisticsHandler(); - server.setHandler(stats); - - ContextHandler context = new ContextHandler(stats, "/"); - - Exchanger exchanger0 = new Exchanger<>(); - Exchanger exchanger1 = new Exchanger<>(); - context.setHandler(new AbstractHandler() - { - @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException - { - baseRequest.setHandled(true); - response.setStatus(200); - response.setContentLength(13); - response.flushBuffer(); - - try - { - exchanger0.exchange(null); - exchanger1.exchange(null); - } - catch (Throwable x) - { - throw new ServletException(x); - } - - response.getOutputStream().print("The Response\n"); - } - }); - - server.setStopTimeout(1000); - server.start(); - - LocalEndPoint endp = connector.executeRequest("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"); - - exchanger0.exchange(null); - exchanger1.exchange(null); - - String response = endp.getResponse(); - assertThat(response, containsString("200 OK")); - - endp.addInputAndExecute(BufferUtil.toBuffer("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n")); - - exchanger0.exchange(null); - - server.getConnectors()[0].shutdown().get(); - - // Check completed 200 does not have close - exchanger1.exchange(null); - response = endp.getResponse(); - assertThat(response, containsString("200 OK")); - assertThat(response, Matchers.not(containsString("Connection: close"))); - - // But endpoint is still closes soon after - long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(1); - while (endp.isOpen() && System.nanoTime() < end) - { - Thread.sleep(10); - } - assertFalse(endp.isOpen()); - } - - @Test - public void testCommittedResponsesAreClosed() throws Exception - { - Server server = new Server(); - - LocalConnector connector = new LocalConnector(server); - server.addConnector(connector); - - StatisticsHandler stats = new StatisticsHandler(); - server.setHandler(stats); - - ContextHandler context = new ContextHandler(stats, "/"); - - Exchanger exchanger0 = new Exchanger<>(); - Exchanger exchanger1 = new Exchanger<>(); - context.setHandler(new AbstractHandler() - { - @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException - { - try - { - exchanger0.exchange(null); - exchanger1.exchange(null); - } - catch (Throwable x) - { - throw new ServletException(x); - } - - baseRequest.setHandled(true); - response.setStatus(200); - response.getWriter().println("The Response"); - response.getWriter().close(); - } - }); - - server.setStopTimeout(1000); - server.start(); - - LocalEndPoint endp = connector.executeRequest( - "GET / HTTP/1.1\r\n" + - "Host: localhost\r\n" + - "\r\n" - ); - - exchanger0.exchange(null); - exchanger1.exchange(null); - - String response = endp.getResponse(); - assertThat(response, containsString("200 OK")); - assertThat(response, Matchers.not(containsString("Connection: close"))); - - endp.addInputAndExecute(BufferUtil.toBuffer("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n")); - - exchanger0.exchange(null); - - CountDownLatch latch = new CountDownLatch(1); + long start = System.nanoTime(); new Thread(() -> { try { - server.stop(); - latch.countDown(); + handler.latch.await(); + long now = System.nanoTime(); + Thread.sleep(100 - TimeUnit.NANOSECONDS.toMillis(now - start)); + client.getOutputStream().write(BODY_67890); } catch (Exception e) { e.printStackTrace(); } }).start(); - while (server.isStarted()) - { - Thread.sleep(10); - } - - // Check new connections rejected! - String unavailable = connector.getResponse("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n"); - assertThat(unavailable, containsString(" 503 Service Unavailable")); - assertThat(unavailable, Matchers.containsString("Connection: close")); - - // Check completed 200 has close - exchanger1.exchange(null); - response = endp.getResponse(); - assertThat(response, containsString("200 OK")); - assertThat(response, Matchers.containsString("Connection: close")); - assertTrue(latch.await(10, TimeUnit.SECONDS)); } @Test - public void testContextStop() throws Exception + public void testNotGraceful() throws Exception { - Server server = new Server(); + server.setStopTimeout(0); - LocalConnector connector = new LocalConnector(server); - server.addConnector(connector); - - ContextHandler context = new ContextHandler(server, "/"); - - StatisticsHandler stats = new StatisticsHandler(); - context.setHandler(stats); - - Exchanger exchanger0 = new Exchanger<>(); - Exchanger exchanger1 = new Exchanger<>(); - stats.setHandler(new AbstractHandler() - { - @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException - { - try - { - exchanger0.exchange(null); - exchanger1.exchange(null); - } - catch (Throwable x) - { - throw new ServletException(x); - } - - baseRequest.setHandled(true); - response.setStatus(200); - response.getWriter().println("The Response"); - response.getWriter().close(); - } - }); - - context.setStopTimeout(1000); server.start(); + Socket client0 = newClientBusy(POST_12345, handler); + Socket client1 = newClientIdle(POST_12345, handler); - LocalEndPoint endp = connector.executeRequest( - "GET / HTTP/1.1\r\n" + - "Host: localhost\r\n" + - "\r\n" - ); - - exchanger0.exchange(null); - exchanger1.exchange(null); - - String response = endp.getResponse(); - assertThat(response, containsString("200 OK")); - assertThat(response, Matchers.not(containsString("Connection: close"))); - - endp.addInputAndExecute(BufferUtil.toBuffer("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n")); - exchanger0.exchange(null); - - CountDownLatch latch = new CountDownLatch(1); - new Thread(() -> - { - try - { - context.stop(); - latch.countDown(); - } - catch (Exception e) - { - e.printStackTrace(); - } - }).start(); - while (context.isStarted()) - { - Thread.sleep(10); - } - - // Check new connections accepted, but don't find context! - String unavailable = connector.getResponse("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n"); - assertThat(unavailable, containsString(" 404 Not Found")); - - // Check completed 200 does not have close - exchanger1.exchange(null); - response = endp.getResponse(); - assertThat(response, containsString("200 OK")); - assertThat(response, Matchers.not(Matchers.containsString("Connection: close"))); - assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertQuickStop(); + assertQuickClose(client0); + assertQuickClose(client1); + assertHandled(handler, true); } @Test - public void testFailedStart() + public void testGracefulConnection() throws Exception { - Server server = new Server(); + Socket client0 = newClientBusy(POST_12345, handler); + Socket client1 = newClientBusy(POST_12345_C, handler); + Socket client2 = newClientIdle(POST_12345, handler); - LocalConnector connector = new LocalConnector(server); - server.addConnector(connector); + backgroundComplete(client0, handler); + backgroundComplete(client1, handler); - ContextHandlerCollection contexts = new ContextHandlerCollection(); - server.setHandler(contexts); - AtomicBoolean context0Started = new AtomicBoolean(false); - ContextHandler context0 = new ContextHandler("/zero") - { - @Override - protected void doStart() throws Exception - { - context0Started.set(true); - } - }; - ContextHandler context1 = new ContextHandler("/one") - { - @Override - protected void doStart() throws Exception - { - throw new Exception("Test start failure"); - } - }; - AtomicBoolean context2Started = new AtomicBoolean(false); - ContextHandler context2 = new ContextHandler("/two") - { - @Override - protected void doStart() throws Exception - { - context2Started.set(true); - } - }; - contexts.setHandlers(new Handler[]{context0, context1, context2}); + assertGracefulStop(server); - try - { - server.start(); - fail(); - } - catch (Exception e) - { - assertThat(e.getMessage(), is("Test start failure")); - } - - assertTrue(server.getContainedBeans(LifeCycle.class).stream().noneMatch(LifeCycle::isRunning)); - assertTrue(server.getContainedBeans(LifeCycle.class).stream().anyMatch(LifeCycle::isFailed)); - assertTrue(context0Started.get()); - assertFalse(context2Started.get()); + assertResponse(client0, true); + assertResponse(client1, false); + assertQuickClose(client0); + assertQuickClose(client1); + assertQuickClose(client2); + assertHandled(handler, false); } - static class NoopHandler extends AbstractHandler + @Test + public void testGracefulConnectionNotComplete() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); + server.setStopTimeout(3000L); + Socket client0 = newClientBusy(POST_12345, handler); + Socket client1 = newClientBusy(POST_12345_C, handler); + Socket client2 = newClientIdle(POST_12345, handler); - NoopHandler() - { - } + assertGracefulStop(server); + + assert500Response(client0); + assert500Response(client1); + assertQuickClose(client0); + assertQuickClose(client1); + assertQuickClose(client2); + assertHandled(handler, true); + } + + @Test + public void testGracefulWithContext() throws Exception + { + Socket client0 = newClientBusy(POST_A_12345, handlerA); + Socket client1 = newClientBusy(POST_A_12345_C, handlerA); + Socket client2 = newClientIdle(POST_A_12345, handlerA); + + backgroundComplete(client0, handlerA); + backgroundComplete(client1, handlerA); + Future status2 = backgroundUnavailable(client2, POST_A_12345, contextA, handlerA); + + assertGracefulStop(server); + + assertResponse(client0, true); + assertResponse(client1, false); + assertThat(status2.get(), is(503)); + + assertQuickClose(client0); + assertQuickClose(client1); + assertQuickClose(client2); + assertHandled(handlerA, false); + } + + @Test + public void testGracefulContext() throws Exception + { + Socket client0 = newClientBusy(POST_B_12345, handlerB); + Socket client1 = newClientBusy(POST_B_12345_C, handlerB); + Socket client2 = newClientIdle(POST_B_12345, handlerB); + + backgroundComplete(client0, handlerB); + backgroundComplete(client1, handlerB); + Future status2 = backgroundUnavailable(client2, POST_B_12345, contextB, handlerB); + + Graceful.shutdown(contextB).orTimeout(10, TimeUnit.SECONDS).get(); + + assertResponse(client0, false); + assertResponse(client1, false); + assertThat(status2.get(), is(503)); + + assertAvailable(client0, POST_A_12345, handlerA); + assertAvailable(client1, POST_A_12345_C, handlerA); + assertAvailable(client2, POST_A_12345, handlerA); + + assertHandled(handlerA, false); + assertHandled(handlerB, false); + } + + static class TestHandler extends AbstractHandler + { + final AtomicReference thrown = new AtomicReference(); + final AtomicBoolean handling = new AtomicBoolean(false); + volatile CountDownLatch latch; @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - baseRequest.setHandled(true); - latch.countDown(); - } - } - - static class ABHandler extends AbstractHandler - { - final CountDownLatch latchA = new CountDownLatch(1); - final CountDownLatch latchB = new CountDownLatch(1); - - @Override - public void handle(String s, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException - { - response.setContentLength(2); - response.getOutputStream().write("a".getBytes()); - try - { - latchA.countDown(); - latchB.await(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - response.flushBuffer(); - response.getOutputStream().write("b".getBytes()); - } - } - - static class TestHandler extends AbstractHandler - { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference thrown = new AtomicReference(); - final AtomicBoolean handling = new AtomicBoolean(false); - - @Override - public void handle(String target, Request baseRequest, - HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException - { + // Log.getRootLogger().info("Handle {} / {} ? {}", request.getContextPath(), request.getPathInfo(), request.getQueryString()); handling.set(true); - latch.countDown(); + baseRequest.setHandled(true); + response.setStatus(200); + if ("true".equals(request.getParameter("commit"))) + response.flushBuffer(); + CountDownLatch l = latch; + if (l != null) + l.countDown(); int c = 0; try { int contentLength = request.getContentLength(); - InputStream in = request.getInputStream(); - - while (true) + if (contentLength > 0) { - if (in.read() < 0) - break; - c++; + InputStream in = request.getInputStream(); + while (in.read() >= 0) + { + c++; + } } - baseRequest.setHandled(true); - response.setStatus(200); response.getWriter().printf("read %d/%d%n", c, contentLength); } catch (Throwable th) { thrown.set(th); + throw th; } finally { diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/StopTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/StopTest.java new file mode 100644 index 00000000000..618398b0667 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/StopTest.java @@ -0,0 +1,548 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.server.LocalConnector.LocalEndPoint; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.server.handler.StatisticsHandler; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.FutureCallback; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class StopTest +{ + /** + * Test completed writes during shutdown do not close output + * + * @throws Exception on test failure + */ + @Test + public void testWriteDuringShutdown() throws Exception + { + Server server = new Server(); + server.setStopTimeout(1000); + + ServerConnector connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + ABHandler handler = new ABHandler(); + StatisticsHandler stats = new StatisticsHandler(); + server.setHandler(stats); + stats.setHandler(handler); + + server.start(); + + Thread stopper = new Thread(() -> + { + try + { + handler.latchA.await(); + server.stop(); + } + catch (Exception e) + { + e.printStackTrace(); + } + }); + stopper.start(); + + final int port = connector.getLocalPort(); + try (Socket client = new Socket("127.0.0.1", port)) + { + client.getOutputStream().write(( + "GET / HTTP/1.1\r\n" + + "Host: localhost:" + port + "\r\n" + + "\r\n" + ).getBytes()); + client.getOutputStream().flush(); + + while (!connector.isShutdown()) + { + Thread.sleep(10); + } + + handler.latchB.countDown(); + + String response = IO.toString(client.getInputStream()); + assertThat(response, startsWith("HTTP/1.1 200 ")); + assertThat(response, containsString("Content-Length: 2")); + assertThat(response, containsString("Connection: close")); + assertThat(response, endsWith("ab")); + } + stopper.join(); + } + + public void testSlowClose(long stopTimeout, long closeWait, Matcher stopTimeMatcher) throws Exception + { + Server server = new Server(); + server.setStopTimeout(stopTimeout); + + FutureCallback closed = new FutureCallback(); + ServerConnector connector = new ServerConnector(server, 2, 2, new HttpConnectionFactory() + { + + @Override + public Connection newConnection(Connector con, EndPoint endPoint) + { + // Slow closing connection + HttpConnection conn = new HttpConnection(getHttpConfiguration(), con, endPoint, isRecordHttpComplianceViolations()) + { + @Override + public void onClose(Throwable cause) + { + try + { + super.onClose(cause); + } + finally + { + if (cause == null) + closed.succeeded(); + else + closed.failed(cause); + } + } + + @Override + public void close() + { + long start = System.nanoTime(); + new Thread(() -> + { + try + { + Thread.sleep(closeWait - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + } + catch (Throwable e) + { + // no op + } + finally + { + super.close(); + } + }).start(); + } + }; + return configure(conn, con, endPoint); + } + }); + connector.setPort(0); + server.addConnector(connector); + + NoopHandler handler = new NoopHandler(); + server.setHandler(handler); + + server.start(); + final int port = connector.getLocalPort(); + Socket client = new Socket("127.0.0.1", port); + client.setSoTimeout(10000); + client.getOutputStream().write(( + "GET / HTTP/1.1\r\n" + + "Host: localhost:" + port + "\r\n" + + "Content-Type: plain/text\r\n" + + "\r\n" + ).getBytes()); + client.getOutputStream().flush(); + handler.latch.await(); + + // look for a response + BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream(), StandardCharsets.ISO_8859_1)); + while (true) + { + String line = in.readLine(); + assertThat("Line should not be null", line, is(notNullValue())); + if (line.length() == 0) + break; + } + + long start = System.nanoTime(); + try + { + server.stop(); + assertTrue(stopTimeout == 0 || stopTimeout > closeWait); + } + catch (Exception e) + { + assertTrue(stopTimeout > 0 && stopTimeout < closeWait); + } + long stop = System.nanoTime(); + + // Check stop time was correct + assertThat(TimeUnit.NANOSECONDS.toMillis(stop - start), stopTimeMatcher); + + // Connection closed + while (true) + { + int r = client.getInputStream().read(); + if (r == -1) + break; + assertThat(TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start), lessThan(10L)); + } + + // onClose Thread interrupted or completed + closed.get(Math.max(closeWait, stopTimeout) + 1000, TimeUnit.MILLISECONDS); + + if (!client.isClosed()) + client.close(); + } + + /** + * Test of non graceful stop when a connection close is slow + * + * @throws Exception on test failure + */ + @Test + public void testSlowCloseNotGraceful() throws Exception + { + Log.getLogger(QueuedThreadPool.class).info("Expect some threads can't be stopped"); + testSlowClose(0, 5000, lessThan(750L)); + } + + /** + * Test of graceful stop when close is slower than timeout + * + * @throws Exception on test failure + */ + @Test + public void testSlowCloseTinyGraceful() throws Exception + { + Log.getLogger(QueuedThreadPool.class).info("Expect some threads can't be stopped"); + testSlowClose(1, 5000, lessThan(1500L)); + } + + /** + * Test of graceful stop when close is faster than timeout; + * + * @throws Exception on test failure + */ + @Test + public void testSlowCloseGraceful() throws Exception + { + testSlowClose(5000, 1000, Matchers.allOf(greaterThan(750L), lessThan(4999L))); + } + + @Test + public void testCommittedResponsesAreClosed() throws Exception + { + Server server = new Server(); + + LocalConnector connector = new LocalConnector(server); + server.addConnector(connector); + + StatisticsHandler stats = new StatisticsHandler(); + server.setHandler(stats); + + ContextHandler context = new ContextHandler(stats, "/"); + + Exchanger exchanger0 = new Exchanger<>(); + Exchanger exchanger1 = new Exchanger<>(); + context.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException + { + try + { + exchanger0.exchange(null); + exchanger1.exchange(null); + } + catch (Throwable x) + { + throw new ServletException(x); + } + + baseRequest.setHandled(true); + response.setStatus(200); + response.getWriter().println("The Response"); + response.getWriter().close(); + } + }); + + server.setStopTimeout(1000); + server.start(); + + LocalEndPoint endp = connector.executeRequest( + "GET / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "\r\n" + ); + + exchanger0.exchange(null); + exchanger1.exchange(null); + + String response = endp.getResponse(); + assertThat(response, containsString("200 OK")); + assertThat(response, Matchers.not(containsString("Connection: close"))); + + endp.addInputAndExecute(BufferUtil.toBuffer("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n")); + + exchanger0.exchange(null); + + FutureCallback stopped = new FutureCallback(); + new Thread(() -> + { + try + { + server.stop(); + stopped.succeeded(); + } + catch (Throwable e) + { + stopped.failed(e); + } + }).start(); + + long start = System.nanoTime(); + while (!connector.isShutdown()) + { + assertThat(TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start), lessThan(10L)); + Thread.sleep(10); + } + + // Check new connections rejected! + assertThrows(IllegalStateException.class, () -> connector.getResponse("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n")); + + // Check completed 200 has close + exchanger1.exchange(null); + response = endp.getResponse(); + assertThat(response, containsString("200 OK")); + assertThat(response, Matchers.containsString("Connection: close")); + stopped.get(10, TimeUnit.SECONDS); + } + + @Test + public void testContextStop() throws Exception + { + Server server = new Server(); + + LocalConnector connector = new LocalConnector(server); + server.addConnector(connector); + + ContextHandler context = new ContextHandler(server, "/"); + + StatisticsHandler stats = new StatisticsHandler(); + context.setHandler(stats); + + Exchanger exchanger0 = new Exchanger<>(); + Exchanger exchanger1 = new Exchanger<>(); + stats.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException + { + try + { + exchanger0.exchange(null); + exchanger1.exchange(null); + } + catch (Throwable x) + { + throw new ServletException(x); + } + + baseRequest.setHandled(true); + response.setStatus(200); + response.getWriter().println("The Response"); + response.getWriter().close(); + } + }); + + server.start(); + + LocalEndPoint endp = connector.executeRequest( + "GET / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "\r\n" + ); + + exchanger0.exchange(null); + exchanger1.exchange(null); + + String response = endp.getResponse(); + assertThat(response, containsString("200 OK")); + assertThat(response, Matchers.not(containsString("Connection: close"))); + + endp.addInputAndExecute(BufferUtil.toBuffer("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n")); + exchanger0.exchange(null); + + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> + { + try + { + context.stop(); + latch.countDown(); + } + catch (Exception e) + { + e.printStackTrace(); + } + }).start(); + while (context.isStarted()) + { + Thread.sleep(10); + } + + // Check new connections accepted, but don't find context! + String unavailable = connector.getResponse("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n"); + assertThat(unavailable, containsString(" 404 Not Found")); + + // Check completed 200 does not have close + exchanger1.exchange(null); + response = endp.getResponse(); + assertThat(response, containsString("200 OK")); + assertThat(response, Matchers.not(Matchers.containsString("Connection: close"))); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } + + @Test + public void testFailedStart() + { + Server server = new Server(); + + LocalConnector connector = new LocalConnector(server); + server.addConnector(connector); + + ContextHandlerCollection contexts = new ContextHandlerCollection(); + server.setHandler(contexts); + AtomicBoolean context0Started = new AtomicBoolean(false); + ContextHandler context0 = new ContextHandler("/zero") + { + @Override + protected void doStart() throws Exception + { + context0Started.set(true); + } + }; + ContextHandler context1 = new ContextHandler("/one") + { + @Override + protected void doStart() throws Exception + { + throw new Exception("Test start failure"); + } + }; + AtomicBoolean context2Started = new AtomicBoolean(false); + ContextHandler context2 = new ContextHandler("/two") + { + @Override + protected void doStart() throws Exception + { + context2Started.set(true); + } + }; + contexts.setHandlers(new Handler[]{context0, context1, context2}); + + try + { + server.start(); + fail(); + } + catch (Exception e) + { + assertThat(e.getMessage(), is("Test start failure")); + } + + assertTrue(server.getContainedBeans(LifeCycle.class).stream().noneMatch(LifeCycle::isRunning)); + assertTrue(server.getContainedBeans(LifeCycle.class).stream().anyMatch(LifeCycle::isFailed)); + assertTrue(context0Started.get()); + assertFalse(context2Started.get()); + } + + static class NoopHandler extends AbstractHandler + { + final CountDownLatch latch = new CountDownLatch(1); + + NoopHandler() + { + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException + { + baseRequest.setHandled(true); + latch.countDown(); + } + } + + static class ABHandler extends AbstractHandler + { + final CountDownLatch latchA = new CountDownLatch(1); + final CountDownLatch latchB = new CountDownLatch(1); + + @Override + public void handle(String s, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.setContentLength(2); + response.getOutputStream().write("a".getBytes()); + try + { + latchA.countDown(); + latchB.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + response.flushBuffer(); + response.getOutputStream().write("b".getBytes()); + } + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/MultiException.java b/jetty-util/src/main/java/org/eclipse/jetty/util/MultiException.java index cb62c556e78..8dc3db0d2c4 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/MultiException.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/MultiException.java @@ -60,7 +60,7 @@ public class MultiException extends Exception for (Throwable t : nested) { - if (t != this) + if (t != this && t != getCause()) addSuppressed(t); } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/AbstractLifeCycle.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/AbstractLifeCycle.java index 7e4fd7dd2ba..39dca100a4a 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/AbstractLifeCycle.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/AbstractLifeCycle.java @@ -55,7 +55,6 @@ public abstract class AbstractLifeCycle implements LifeCycle private final List _eventListener = new CopyOnWriteArrayList<>(); private final Object _lock = new Object(); private volatile State _state = State.STOPPED; - private long _stopTimeout = 30000; /** * Method to override to start the lifecycle @@ -303,17 +302,6 @@ public abstract class AbstractLifeCycle implements LifeCycle } } - @ManagedAttribute(value = "The stop timeout in milliseconds") - public long getStopTimeout() - { - return _stopTimeout; - } - - public void setStopTimeout(long stopTimeout) - { - this._stopTimeout = stopTimeout; - } - public abstract static class AbstractLifeCycleListener implements LifeCycle.Listener { @Override diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java index 3ce0f842d55..82c5691f8be 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java @@ -549,11 +549,6 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container, } } } - - if (bean._bean instanceof AbstractLifeCycle) - { - ((AbstractLifeCycle)bean._bean).setStopTimeout(getStopTimeout()); - } } } @@ -701,17 +696,6 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container, return false; } - @Override - public void setStopTimeout(long stopTimeout) - { - super.setStopTimeout(stopTimeout); - for (Bean bean : _beans) - { - if (bean.isManaged() && bean._bean instanceof AbstractLifeCycle) - ((AbstractLifeCycle)bean._bean).setStopTimeout(stopTimeout); - } - } - /** * Dumps to {@link System#err}. * diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/Graceful.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/Graceful.java index 2f73f18aef0..c02a18eba6f 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/Graceful.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/Graceful.java @@ -18,11 +18,14 @@ package org.eclipse.jetty.util.component; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.FutureCallback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; /** *

Jetty components that wish to be part of a Graceful shutdown implement this interface so that @@ -34,7 +37,8 @@ import org.eclipse.jetty.util.FutureCallback; *

  • Waiting for existing load to complete (eg waiting for active request count to reduce to 0)
  • *
  • Performing cleanup operations that may take time (eg closing an SSL connection)
  • * - *

    The {@link Future} returned by the the shutdown call will be completed to indicate the shutdown operation is completed. + *

    The {@link CompletableFuture} returned by the the shutdown call will be completed to indicate the + * shutdown operation is completed. * Some shutdown operations may be instantaneous and always return a completed future. *

    * Graceful shutdown is typically orchestrated by the doStop methods of Server or ContextHandler (for a full or partial @@ -43,47 +47,96 @@ import org.eclipse.jetty.util.FutureCallback; */ public interface Graceful { - Future shutdown(); + /** + * Shutdown the component. When this method returns, the component should not accept any new load. + * @return A future that is completed once all load on the component is completed + */ + CompletableFuture shutdown(); + /** + * @return True if {@link #shutdown()} has been called. + */ boolean isShutdown(); /** - * A utility Graceful that uses a {@link FutureCallback} to indicate if shutdown is completed. - * By default the {@link FutureCallback} is returned as already completed, but the {@link #newShutdownCallback()} method - * can be overloaded to return a non-completed callback that will require a {@link Callback#succeeded()} or - * {@link Callback#failed(Throwable)} call to be completed. + * A utility class to assist implementing the Graceful interface. + * The {@link #isShutdownDone()} method should be implemented to check if the {@link CompletableFuture} + * returned by {@link #shutdown()} should be completed or not. The {@link #check()} + * method should be called when any state is changed which may complete the shutdown. */ - class Shutdown implements Graceful + abstract class Shutdown implements Graceful { - private final AtomicReference _shutdown = new AtomicReference<>(); + final Object _component; + final AtomicReference> _done = new AtomicReference<>(); - protected FutureCallback newShutdownCallback() + protected Shutdown(Object component) { - return FutureCallback.SUCCEEDED; + _component = component; } @Override - public Future shutdown() + public CompletableFuture shutdown() { - return _shutdown.updateAndGet(fcb -> fcb == null ? newShutdownCallback() : fcb); + if (_done.get() == null) + _done.compareAndSet(null, new CompletableFuture() + { + @Override + public String toString() + { + return String.format("Shutdown<%s>@%x", _component, hashCode()); + } + }); + CompletableFuture done = _done.get(); + check(); + return done; } @Override public boolean isShutdown() { - return _shutdown.get() != null; + return _done.get() != null; } - public void cancel() + /** + * This method should be called whenever the components state has been updated. + * If {@link #shutdown()} has been called, then {@link #isShutdownDone()} is called + * by this method and if it returns true then the {@link Future} returned by + * {@link #shutdown()} is completed. + */ + public void check() { - FutureCallback shutdown = _shutdown.getAndSet(null); - if (shutdown != null && !shutdown.isDone()) - shutdown.cancel(true); + CompletableFuture done = _done.get(); + if (done != null && isShutdownDone()) + done.complete(null); } - public FutureCallback get() - { - return _shutdown.get(); - } + /** + * @return True if the component is shutdown and has no remaining load. + */ + public abstract boolean isShutdownDone(); + } + + /** + * Utility method to shutdown all Gracefuls within a container. + * @param component The container in which to look for {@link Graceful}s + * @return A {@link CompletableFuture } that is complete once all returns from {@link Graceful#shutdown()} + * of the contained {@link Graceful}s are complete. + */ + static CompletableFuture shutdown(Container component) + { + Logger log = Log.getLogger(component.getClass()); + + log.info("Shutdown {}", component); + + // tell the graceful handlers that we are shutting down + List gracefuls = new ArrayList<>(); + if (component instanceof Graceful) + gracefuls.add((Graceful)component); + gracefuls.addAll(component.getContainedBeans(Graceful.class)); + + if (log.isDebugEnabled()) + gracefuls.forEach(g -> log.debug("graceful {}", g)); + + return CompletableFuture.allOf(gracefuls.stream().map(Graceful::shutdown).toArray(CompletableFuture[]::new)); } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java index e8a9a571f34..dd337439a3a 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java @@ -302,7 +302,7 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool @Override public void join() throws InterruptedException { - _executor.awaitTermination(getStopTimeout(), TimeUnit.MILLISECONDS); + _executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } @Override diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 4e042dbd108..07e2fbcafbc 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -40,7 +40,6 @@ import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.DumpableCollection; -import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; @@ -80,6 +79,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor private boolean _detailedDump = false; private int _lowThreadsThreshold = 1; private ThreadPoolBudget _budget; + private long _stopTimeout; public QueuedThreadPool() { @@ -159,6 +159,16 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor _budget = budget; } + public void setStopTimeout(long stopTimeout) + { + _stopTimeout = stopTimeout; + } + + public long getStopTimeout() + { + return _stopTimeout; + } + @Override protected void doStart() throws Exception { @@ -555,7 +565,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor } /** - * Blocks until the thread pool is {@link LifeCycle#stop stopped}. + * Blocks until the thread pool is {@link org.eclipse.jetty.util.component.LifeCycle} stopped. */ @Override public void join() throws InterruptedException @@ -675,10 +685,17 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor int threads = AtomicBiInteger.getHi(encoded); int idle = AtomicBiInteger.getLo(encoded); if (threads == Integer.MIN_VALUE) // This is a marker that the pool is stopped. - return false; - long update = AtomicBiInteger.encode(threads + deltaThreads, idle + deltaIdle); - if (_counts.compareAndSet(encoded, update)) - return true; + { + long update = AtomicBiInteger.encode(threads, idle + deltaIdle); + if (_counts.compareAndSet(encoded, update)) + return false; + } + else + { + long update = AtomicBiInteger.encode(threads + deltaThreads, idle + deltaIdle); + if (_counts.compareAndSet(encoded, update)) + return true; + } } } @@ -880,10 +897,10 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor // If we had a job, if (job != null) { + idle = true; // signal that we are idle again if (!addCounts(0, 1)) break; - idle = true; } // else check we are still running else if (_counts.getHi() == Integer.MIN_VALUE) diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java index 7135da0079c..63a86bf5dd7 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java @@ -244,7 +244,6 @@ public class ReservedThreadExecutorTest public void stressTest() throws Exception { QueuedThreadPool pool = new QueuedThreadPool(20); - pool.setStopTimeout(10000); pool.start(); ReservedThreadExecutor reserved = new ReservedThreadExecutor(pool, 10); reserved.setIdleTimeout(0, null); diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/FailedSelectorTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/FailedSelectorTest.java index 51842aab22d..f44460d2f26 100644 --- a/tests/test-integration/src/test/java/org/eclipse/jetty/test/FailedSelectorTest.java +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/FailedSelectorTest.java @@ -89,7 +89,6 @@ public class FailedSelectorTest HttpClientTransport transport = new HttpClientTransportOverHTTP(1); QueuedThreadPool qtp = new QueuedThreadPool(); qtp.setName("Client"); - qtp.setStopTimeout(1000); client = new HttpClient(transport); client.setExecutor(qtp);