Issue #4321 Refactored Graceful shutdown (#4482)

* Issue #4321 Refactored Graceful shutdown

removed stopTimeout from all abstractLifeCycles.  It is on Graceful.LifeCycle, which is only implemented by components that can start a graceful shutdown (eg Server, ContextHandler and QueuedThreadPool)

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

cleanup after review

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

reinstate other stop tests (more work to do).

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

Fixes for stop test by improving LocalConnector shutdown handling

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

Removed broken test on LocalConnector that is already tested in GracefulStopTest

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

Fixed all stop tests

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

fixed checkstyle

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

No stopTimeout JMX attribute

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

Dump stopTimeout
test with default stopTimeout

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

USe sendError for 503

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

minor cleanups

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

Simplifications after review

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

after review

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4321 Refactored Graceful shutdown

after review

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2020-01-30 17:05:03 +01:00 committed by GitHub
parent 7592d36c4b
commit 5aaec6e23f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1071 additions and 965 deletions

View File

@ -366,7 +366,6 @@ public class XmlConfiguredJetty
assertEquals(1, serverCount, "Server load count");
this._server = foundServer;
this._server.setStopTimeout(10);
}
public void removeWebapp(String name)

View File

@ -95,7 +95,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
Executor executor = selectorManager.getExecutor();
_strategy = new EatWhatYouKill(producer, executor);
addBean(_strategy, true);
setStopTimeout(5000);
}
public Selector getSelector()

View File

@ -123,7 +123,7 @@ public class ObjectMBeanTest
assertEquals("com.acme.Derived", derivedInfo.getClassName(), "name does not match");
assertEquals("Test the mbean stuff", derivedInfo.getDescription(), "description does not match");
assertEquals(6, derivedInfo.getAttributes().length, "attribute count does not match");
assertEquals(5, derivedInfo.getAttributes().length, "attribute count does not match");
assertEquals("Full Name", derivedMBean.getAttribute("fname"), "attribute values does not match");
derivedMBean.setAttribute(new Attribute("fname", "Fuller Name"));

View File

@ -61,7 +61,7 @@ public class ClassLoadingTestingServletContextListener
{
}
private void printURLs (URLClassLoader l)
private void printURLs(URLClassLoader l)
{
if (l == null)
return;

View File

@ -44,9 +44,9 @@ public class App implements EntryPoint
* The message displayed to the user when the server cannot be reached or
* returns an error.
*/
private static final String SERVER_ERROR = "An error occurred while "
+ "attempting to contact the server. Please check your network "
+ "connection and try again.";
private static final String SERVER_ERROR = "An error occurred while " +
"attempting to contact the server. Please check your network " +
"connection and try again.";
/**
* Create a remote service proxy to talk to the server-side Greeting service.

View File

@ -20,8 +20,8 @@ package org.eclipse.jetty.maven.plugin;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
@ -121,7 +121,7 @@ public class MavenServerConnector extends ContainerLifeCycle implements Connecto
}
@Override
public Future<Void> shutdown()
public CompletableFuture<Void> shutdown()
{
return checkDelegate().shutdown();
}

View File

@ -30,11 +30,9 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.eclipse.jetty.io.ArrayByteBufferPool;
@ -154,10 +152,10 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
private final Thread[] _acceptors;
private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
private final Graceful.Shutdown _shutdown = new Graceful.Shutdown();
private Shutdown _shutdown;
private HttpChannel.Listener _httpChannelListeners = HttpChannel.NOOP_LISTENER;
private CountDownLatch _stopping;
private long _idleTimeout = 30000;
private long _shutdownIdleTimeout = 1000L;
private String _defaultProtocol;
private ConnectionFactory _defaultConnectionFactory;
private String _name;
@ -286,6 +284,20 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
public void setIdleTimeout(long idleTimeout)
{
_idleTimeout = idleTimeout;
if (_idleTimeout == 0)
_shutdownIdleTimeout = 0;
else if (_idleTimeout < _shutdownIdleTimeout)
_shutdownIdleTimeout = Math.min(1000L, _idleTimeout);
}
public void setShutdownIdleTimeout(long idle)
{
_shutdownIdleTimeout = idle;
}
public long getShutdownIdleTimeout()
{
return _shutdownIdleTimeout;
}
/**
@ -300,7 +312,21 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
@Override
protected void doStart() throws Exception
{
_shutdown.cancel();
_shutdown = new Graceful.Shutdown(this)
{
@Override
public boolean isShutdownDone()
{
if (!_endpoints.isEmpty())
return false;
for (Thread a : _acceptors)
if (a != null)
return false;
return true;
}
};
if (_defaultProtocol == null)
throw new IllegalStateException("No default protocol for " + this);
@ -319,7 +345,6 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
_lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _acceptors.length);
super.doStart();
_stopping = new CountDownLatch(_acceptors.length);
for (int i = 0; i < _acceptors.length; i++)
{
Acceptor a = new Acceptor(i);
@ -343,15 +368,29 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
}
@Override
public Future<Void> shutdown()
public CompletableFuture<Void> shutdown()
{
return _shutdown.shutdown();
Shutdown shutdown = _shutdown;
if (shutdown == null)
return CompletableFuture.completedFuture(null);
// Signal for the acceptors to stop
CompletableFuture<Void> done = shutdown.shutdown();
interruptAcceptors();
// Reduce the idle timeout of existing connections
for (EndPoint ep : _endpoints)
ep.setIdleTimeout(_shutdownIdleTimeout);
// Return Future that waits for no acceptors and no connections.
return done;
}
@Override
public boolean isShutdown()
{
return _shutdown.isShutdown();
Shutdown shutdown = _shutdown;
return shutdown == null || shutdown.isShutdown();
}
@Override
@ -362,20 +401,11 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
// Tell the acceptors we are stopping
interruptAcceptors();
// If we have a stop timeout
long stopTimeout = getStopTimeout();
CountDownLatch stopping = _stopping;
if (stopTimeout > 0 && stopping != null && getAcceptors() > 0)
stopping.await(stopTimeout, TimeUnit.MILLISECONDS);
_stopping = null;
super.doStop();
for (Acceptor a : getBeans(Acceptor.class))
{
removeBean(a);
}
_shutdown = null;
LOG.info("Stopped {}", this);
}
@ -681,7 +711,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
try
{
while (isRunning())
while (isRunning() && !_shutdown.isShutdown())
{
try (AutoLock lock = _lock.lock())
{
@ -717,9 +747,9 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
{
_acceptors[_id] = null;
}
CountDownLatch stopping = _stopping;
if (stopping != null)
stopping.countDown();
Shutdown shutdown = _shutdown;
if (shutdown != null)
shutdown.check();
}
}
@ -747,6 +777,9 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
protected void onEndPointClosed(EndPoint endp)
{
_endpoints.remove(endp);
Shutdown shutdown = _shutdown;
if (shutdown != null)
shutdown.check();
}
@Override

View File

@ -19,8 +19,8 @@
package org.eclipse.jetty.server;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
@ -99,7 +99,7 @@ public abstract class AbstractNetworkConnector extends AbstractConnector impleme
}
@Override
public Future<Void> shutdown()
public CompletableFuture<Void> shutdown()
{
close();
return super.shutdown();

View File

@ -98,6 +98,8 @@ public class LocalConnector extends AbstractConnector
{
if (!isStarted())
throw new IllegalStateException("!STARTED");
if (isShutdown())
throw new IllegalStateException("Shutdown");
LocalEndPoint endp = new LocalEndPoint();
endp.addInput(rawRequest);
_connects.add(endp);
@ -412,7 +414,7 @@ public class LocalConnector extends AbstractConnector
else
{
chunk = waitForOutput(time, unit);
if (BufferUtil.isEmpty(chunk) && (!isOpen() || isOutputShutdown()))
if (BufferUtil.isEmpty(chunk) && (!isOpen() || isOutputShutdown() || isShutdown()))
{
parser.atEOF();
parser.parseNext(BufferUtil.EMPTY_BUFFER);

View File

@ -27,7 +27,7 @@ import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@ -44,7 +44,6 @@ import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.MultiException;
@ -54,6 +53,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.AttributeContainerMap;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -86,6 +86,7 @@ public class Server extends HandlerWrapper implements Attributes
private boolean _dryRun;
private final AutoLock _dateLock = new AutoLock();
private volatile DateField _dateField;
private long _stopTimeout;
public Server()
{
@ -173,24 +174,21 @@ public class Server extends HandlerWrapper implements Attributes
return Jetty.VERSION;
}
public void setStopTimeout(long stopTimeout)
{
_stopTimeout = stopTimeout;
}
public long getStopTimeout()
{
return _stopTimeout;
}
public boolean getStopAtShutdown()
{
return _stopAtShutdown;
}
/**
* Set a graceful stop time.
* The {@link StatisticsHandler} must be configured so that open connections can
* be tracked for a graceful shutdown.
*
* @see org.eclipse.jetty.util.component.ContainerLifeCycle#setStopTimeout(long)
*/
@Override
public void setStopTimeout(long stopTimeout)
{
super.setStopTimeout(stopTimeout);
}
/**
* Set stop server at shutdown behaviour.
*
@ -466,21 +464,20 @@ public class Server extends HandlerWrapper implements Attributes
MultiException mex = new MultiException();
try
if (getStopTimeout() > 0)
{
// list if graceful futures
List<Future<Void>> futures = new ArrayList<>();
// First shutdown the network connectors to stop accepting new connections
for (Connector connector : _connectors)
long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(getStopTimeout());
try
{
futures.add(connector.shutdown());
Graceful.shutdown(this).get(getStopTimeout(), TimeUnit.MILLISECONDS);
}
// then shutdown all graceful handlers
doShutdown(futures);
}
catch (Throwable e)
{
mex.add(e);
catch (Throwable e)
{
mex.add(e);
}
QueuedThreadPool qtp = getBean(QueuedThreadPool.class);
if (qtp != null)
qtp.setStopTimeout(Math.max(1000L, TimeUnit.NANOSECONDS.toMillis(end - System.nanoTime())));
}
// Now stop the connectors (this will close existing connections)
@ -708,7 +705,7 @@ public class Server extends HandlerWrapper implements Attributes
@Override
public String toString()
{
return String.format("%s[%s]", super.toString(), getVersion());
return String.format("%s[%s,sto=%d]", super.toString(), getVersion(), getStopTimeout());
}
@Override

View File

@ -20,17 +20,11 @@ package org.eclipse.jetty.server.handler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HandlerContainer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.MultiException;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -138,62 +132,4 @@ public abstract class AbstractHandlerContainer extends AbstractHandler implement
h.setServer(server);
}
}
/**
* Shutdown nested Gracefule handlers
*
* @param futures A list of Futures which must also be waited on for the shutdown (or null)
* returns A MultiException to which any failures are added or null
*/
protected void doShutdown(List<Future<Void>> futures) throws MultiException
{
MultiException mex = null;
// tell the graceful handlers that we are shutting down
Handler[] gracefuls = getChildHandlersByClass(Graceful.class);
if (futures == null)
futures = new ArrayList<>(gracefuls.length);
for (Handler graceful : gracefuls)
{
futures.add(((Graceful)graceful).shutdown());
}
// Wait for all futures with a reducing time budget
long stopTimeout = getStopTimeout();
if (stopTimeout > 0)
{
long stopBy = System.currentTimeMillis() + stopTimeout;
if (LOG.isDebugEnabled())
LOG.debug("Graceful shutdown {} by ", this, new Date(stopBy));
// Wait for shutdowns
for (Future<Void> future : futures)
{
try
{
if (!future.isDone())
future.get(Math.max(1L, stopBy - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
}
catch (Exception e)
{
// If the future is also a callback, fail it here (rather than cancel) so we can capture the exception
if (future instanceof Callback && !future.isDone())
((Callback)future).failed(e);
if (mex == null)
mex = new MultiException();
mex.add(e);
}
}
}
// Cancel any shutdowns not done
for (Future<Void> future : futures)
{
if (!future.isDone())
future.cancel(true);
}
if (mex != null)
mex.ifExceptionThrowMulti();
}
}

View File

@ -39,8 +39,8 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import javax.servlet.DispatcherType;
import javax.servlet.Filter;
import javax.servlet.FilterRegistration;
@ -77,7 +77,6 @@ import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.AttributesMap;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.Loader;
import org.eclipse.jetty.util.MultiException;
import org.eclipse.jetty.util.StringUtil;
@ -181,19 +180,16 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
private String _defaultResponseCharacterEncoding;
private String _contextPath = "/";
private String _contextPathEncoded = "/";
private String _displayName;
private long _stopTimeout;
private Resource _baseResource;
private MimeTypes _mimeTypes;
private Map<String, String> _localeEncodingMap;
private String[] _welcomeFiles;
private ErrorHandler _errorHandler;
private String[] _vhosts; // Host name portion, matching _vconnectors array
private boolean[] _vhostswildcard;
private String[] _vconnectors; // connector portion, matching _vhosts array
private Logger _logger;
private boolean _allowNullPathInfo;
private int _maxFormKeys = Integer.getInteger(MAX_FORM_KEYS_KEY, DEFAULT_MAX_FORM_KEYS);
@ -709,10 +705,12 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
* requests can complete, but no new requests are accepted.
*/
@Override
public Future<Void> shutdown()
public CompletableFuture<Void> shutdown()
{
_availability = isRunning() ? Availability.SHUTDOWN : Availability.UNAVAILABLE;
return new FutureCallback(true);
CompletableFuture<Void> shutdown = new CompletableFuture<Void>();
shutdown.complete(null);
return shutdown;
}
/**
@ -915,18 +913,6 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
// Should we attempt a graceful shutdown?
MultiException mex = null;
if (getStopTimeout() > 0)
{
try
{
doShutdown(null);
}
catch (MultiException e)
{
mex = e;
}
}
_availability = Availability.UNAVAILABLE;
ClassLoader oldClassloader = null;

View File

@ -19,7 +19,7 @@
package org.eclipse.jetty.server.handler;
import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
@ -35,7 +35,6 @@ import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
@ -50,6 +49,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
{
private static final Logger LOG = Log.getLogger(StatisticsHandler.class);
private final AtomicLong _statsStartedAt = new AtomicLong();
private volatile Shutdown _shutdown;
private final CounterStatistic _requestStats = new CounterStatistic();
private final SampleStatistic _requestTimeStats = new SampleStatistic();
@ -67,15 +67,6 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
private final LongAdder _responses5xx = new LongAdder();
private final LongAdder _responsesTotalBytes = new LongAdder();
private final Graceful.Shutdown _shutdown = new Graceful.Shutdown()
{
@Override
protected FutureCallback newShutdownCallback()
{
return new FutureCallback(_requestStats.getCurrent() == 0);
}
};
private final AtomicBoolean _wrapWarning = new AtomicBoolean();
private final AsyncListener _onCompletion = new AsyncListener()
@ -115,9 +106,9 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
// If we have no more dispatches, should we signal shutdown?
if (d == 0)
{
FutureCallback shutdown = _shutdown.get();
Shutdown shutdown = _shutdown;
if (shutdown != null)
shutdown.succeeded();
shutdown.check();
}
}
};
@ -204,12 +195,12 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
updateResponse(baseRequest);
// If we have no more dispatches, should we signal shutdown?
FutureCallback shutdown = _shutdown.get();
Shutdown shutdown = _shutdown;
if (shutdown != null)
{
response.flushBuffer();
if (d == 0)
shutdown.succeeded();
shutdown.check();
}
}
// else onCompletion will handle it.
@ -251,7 +242,14 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@Override
protected void doStart() throws Exception
{
_shutdown.cancel();
_shutdown = new Shutdown(this)
{
@Override
public boolean isShutdownDone()
{
return _requestStats.getCurrent() == 0;
}
};
super.doStart();
statsReset();
}
@ -259,8 +257,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@Override
protected void doStop() throws Exception
{
_shutdown.cancel();
super.doStop();
_shutdown = null;
}
/**
@ -576,15 +574,19 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
}
@Override
public Future<Void> shutdown()
public CompletableFuture<Void> shutdown()
{
return _shutdown.shutdown();
Shutdown shutdown = _shutdown;
if (shutdown == null)
return CompletableFuture.completedFuture(null);
return shutdown.shutdown();
}
@Override
public boolean isShutdown()
{
return _shutdown.isShutdown();
Shutdown shutdown = _shutdown;
return shutdown == null || shutdown.isShutdown();
}
@Override
@ -592,4 +594,5 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
{
return String.format("%s@%x{%s,r=%d,d=%d}", getClass().getSimpleName(), hashCode(), getState(), _requestStats.getCurrent(), _dispatchedStats.getCurrent());
}
}

View File

@ -0,0 +1,548 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.LocalConnector.LocalEndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class StopTest
{
/**
* Test completed writes during shutdown do not close output
*
* @throws Exception on test failure
*/
@Test
public void testWriteDuringShutdown() throws Exception
{
Server server = new Server();
server.setStopTimeout(1000);
ServerConnector connector = new ServerConnector(server);
connector.setPort(0);
server.addConnector(connector);
ABHandler handler = new ABHandler();
StatisticsHandler stats = new StatisticsHandler();
server.setHandler(stats);
stats.setHandler(handler);
server.start();
Thread stopper = new Thread(() ->
{
try
{
handler.latchA.await();
server.stop();
}
catch (Exception e)
{
e.printStackTrace();
}
});
stopper.start();
final int port = connector.getLocalPort();
try (Socket client = new Socket("127.0.0.1", port))
{
client.getOutputStream().write((
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + port + "\r\n" +
"\r\n"
).getBytes());
client.getOutputStream().flush();
while (!connector.isShutdown())
{
Thread.sleep(10);
}
handler.latchB.countDown();
String response = IO.toString(client.getInputStream());
assertThat(response, startsWith("HTTP/1.1 200 "));
assertThat(response, containsString("Content-Length: 2"));
assertThat(response, containsString("Connection: close"));
assertThat(response, endsWith("ab"));
}
stopper.join();
}
public void testSlowClose(long stopTimeout, long closeWait, Matcher<Long> stopTimeMatcher) throws Exception
{
Server server = new Server();
server.setStopTimeout(stopTimeout);
FutureCallback closed = new FutureCallback();
ServerConnector connector = new ServerConnector(server, 2, 2, new HttpConnectionFactory()
{
@Override
public Connection newConnection(Connector con, EndPoint endPoint)
{
// Slow closing connection
HttpConnection conn = new HttpConnection(getHttpConfiguration(), con, endPoint, isRecordHttpComplianceViolations())
{
@Override
public void onClose(Throwable cause)
{
try
{
super.onClose(cause);
}
finally
{
if (cause == null)
closed.succeeded();
else
closed.failed(cause);
}
}
@Override
public void close()
{
long start = System.nanoTime();
new Thread(() ->
{
try
{
Thread.sleep(closeWait - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
catch (Throwable e)
{
// no op
}
finally
{
super.close();
}
}).start();
}
};
return configure(conn, con, endPoint);
}
});
connector.setPort(0);
server.addConnector(connector);
NoopHandler handler = new NoopHandler();
server.setHandler(handler);
server.start();
final int port = connector.getLocalPort();
Socket client = new Socket("127.0.0.1", port);
client.setSoTimeout(10000);
client.getOutputStream().write((
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + port + "\r\n" +
"Content-Type: plain/text\r\n" +
"\r\n"
).getBytes());
client.getOutputStream().flush();
handler.latch.await();
// look for a response
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream(), StandardCharsets.ISO_8859_1));
while (true)
{
String line = in.readLine();
assertThat("Line should not be null", line, is(notNullValue()));
if (line.length() == 0)
break;
}
long start = System.nanoTime();
try
{
server.stop();
assertTrue(stopTimeout == 0 || stopTimeout > closeWait);
}
catch (Exception e)
{
assertTrue(stopTimeout > 0 && stopTimeout < closeWait);
}
long stop = System.nanoTime();
// Check stop time was correct
assertThat(TimeUnit.NANOSECONDS.toMillis(stop - start), stopTimeMatcher);
// Connection closed
while (true)
{
int r = client.getInputStream().read();
if (r == -1)
break;
assertThat(TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start), lessThan(10L));
}
// onClose Thread interrupted or completed
closed.get(Math.max(closeWait, stopTimeout) + 1000, TimeUnit.MILLISECONDS);
if (!client.isClosed())
client.close();
}
/**
* Test of non graceful stop when a connection close is slow
*
* @throws Exception on test failure
*/
@Test
public void testSlowCloseNotGraceful() throws Exception
{
Log.getLogger(QueuedThreadPool.class).info("Expect some threads can't be stopped");
testSlowClose(0, 5000, lessThan(750L));
}
/**
* Test of graceful stop when close is slower than timeout
*
* @throws Exception on test failure
*/
@Test
public void testSlowCloseTinyGraceful() throws Exception
{
Log.getLogger(QueuedThreadPool.class).info("Expect some threads can't be stopped");
testSlowClose(1, 5000, lessThan(1500L));
}
/**
* Test of graceful stop when close is faster than timeout;
*
* @throws Exception on test failure
*/
@Test
public void testSlowCloseGraceful() throws Exception
{
testSlowClose(5000, 1000, Matchers.allOf(greaterThan(750L), lessThan(4999L)));
}
@Test
public void testCommittedResponsesAreClosed() throws Exception
{
Server server = new Server();
LocalConnector connector = new LocalConnector(server);
server.addConnector(connector);
StatisticsHandler stats = new StatisticsHandler();
server.setHandler(stats);
ContextHandler context = new ContextHandler(stats, "/");
Exchanger<Void> exchanger0 = new Exchanger<>();
Exchanger<Void> exchanger1 = new Exchanger<>();
context.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException
{
try
{
exchanger0.exchange(null);
exchanger1.exchange(null);
}
catch (Throwable x)
{
throw new ServletException(x);
}
baseRequest.setHandled(true);
response.setStatus(200);
response.getWriter().println("The Response");
response.getWriter().close();
}
});
server.setStopTimeout(1000);
server.start();
LocalEndPoint endp = connector.executeRequest(
"GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"\r\n"
);
exchanger0.exchange(null);
exchanger1.exchange(null);
String response = endp.getResponse();
assertThat(response, containsString("200 OK"));
assertThat(response, Matchers.not(containsString("Connection: close")));
endp.addInputAndExecute(BufferUtil.toBuffer("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n"));
exchanger0.exchange(null);
FutureCallback stopped = new FutureCallback();
new Thread(() ->
{
try
{
server.stop();
stopped.succeeded();
}
catch (Throwable e)
{
stopped.failed(e);
}
}).start();
long start = System.nanoTime();
while (!connector.isShutdown())
{
assertThat(TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start), lessThan(10L));
Thread.sleep(10);
}
// Check new connections rejected!
assertThrows(IllegalStateException.class, () -> connector.getResponse("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n"));
// Check completed 200 has close
exchanger1.exchange(null);
response = endp.getResponse();
assertThat(response, containsString("200 OK"));
assertThat(response, Matchers.containsString("Connection: close"));
stopped.get(10, TimeUnit.SECONDS);
}
@Test
public void testContextStop() throws Exception
{
Server server = new Server();
LocalConnector connector = new LocalConnector(server);
server.addConnector(connector);
ContextHandler context = new ContextHandler(server, "/");
StatisticsHandler stats = new StatisticsHandler();
context.setHandler(stats);
Exchanger<Void> exchanger0 = new Exchanger<>();
Exchanger<Void> exchanger1 = new Exchanger<>();
stats.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException
{
try
{
exchanger0.exchange(null);
exchanger1.exchange(null);
}
catch (Throwable x)
{
throw new ServletException(x);
}
baseRequest.setHandled(true);
response.setStatus(200);
response.getWriter().println("The Response");
response.getWriter().close();
}
});
server.start();
LocalEndPoint endp = connector.executeRequest(
"GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"\r\n"
);
exchanger0.exchange(null);
exchanger1.exchange(null);
String response = endp.getResponse();
assertThat(response, containsString("200 OK"));
assertThat(response, Matchers.not(containsString("Connection: close")));
endp.addInputAndExecute(BufferUtil.toBuffer("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n"));
exchanger0.exchange(null);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() ->
{
try
{
context.stop();
latch.countDown();
}
catch (Exception e)
{
e.printStackTrace();
}
}).start();
while (context.isStarted())
{
Thread.sleep(10);
}
// Check new connections accepted, but don't find context!
String unavailable = connector.getResponse("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n");
assertThat(unavailable, containsString(" 404 Not Found"));
// Check completed 200 does not have close
exchanger1.exchange(null);
response = endp.getResponse();
assertThat(response, containsString("200 OK"));
assertThat(response, Matchers.not(Matchers.containsString("Connection: close")));
assertTrue(latch.await(10, TimeUnit.SECONDS));
}
@Test
public void testFailedStart()
{
Server server = new Server();
LocalConnector connector = new LocalConnector(server);
server.addConnector(connector);
ContextHandlerCollection contexts = new ContextHandlerCollection();
server.setHandler(contexts);
AtomicBoolean context0Started = new AtomicBoolean(false);
ContextHandler context0 = new ContextHandler("/zero")
{
@Override
protected void doStart() throws Exception
{
context0Started.set(true);
}
};
ContextHandler context1 = new ContextHandler("/one")
{
@Override
protected void doStart() throws Exception
{
throw new Exception("Test start failure");
}
};
AtomicBoolean context2Started = new AtomicBoolean(false);
ContextHandler context2 = new ContextHandler("/two")
{
@Override
protected void doStart() throws Exception
{
context2Started.set(true);
}
};
contexts.setHandlers(new Handler[]{context0, context1, context2});
try
{
server.start();
fail();
}
catch (Exception e)
{
assertThat(e.getMessage(), is("Test start failure"));
}
assertTrue(server.getContainedBeans(LifeCycle.class).stream().noneMatch(LifeCycle::isRunning));
assertTrue(server.getContainedBeans(LifeCycle.class).stream().anyMatch(LifeCycle::isFailed));
assertTrue(context0Started.get());
assertFalse(context2Started.get());
}
static class NoopHandler extends AbstractHandler
{
final CountDownLatch latch = new CountDownLatch(1);
NoopHandler()
{
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException
{
baseRequest.setHandled(true);
latch.countDown();
}
}
static class ABHandler extends AbstractHandler
{
final CountDownLatch latchA = new CountDownLatch(1);
final CountDownLatch latchB = new CountDownLatch(1);
@Override
public void handle(String s, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setContentLength(2);
response.getOutputStream().write("a".getBytes());
try
{
latchA.countDown();
latchB.await();
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
response.flushBuffer();
response.getOutputStream().write("b".getBytes());
}
}
}

View File

@ -60,7 +60,7 @@ public class MultiException extends Exception
for (Throwable t : nested)
{
if (t != this)
if (t != this && t != getCause())
addSuppressed(t);
}
}

View File

@ -55,7 +55,6 @@ public abstract class AbstractLifeCycle implements LifeCycle
private final List<EventListener> _eventListener = new CopyOnWriteArrayList<>();
private final Object _lock = new Object();
private volatile State _state = State.STOPPED;
private long _stopTimeout = 30000;
/**
* Method to override to start the lifecycle
@ -303,17 +302,6 @@ public abstract class AbstractLifeCycle implements LifeCycle
}
}
@ManagedAttribute(value = "The stop timeout in milliseconds")
public long getStopTimeout()
{
return _stopTimeout;
}
public void setStopTimeout(long stopTimeout)
{
this._stopTimeout = stopTimeout;
}
public abstract static class AbstractLifeCycleListener implements LifeCycle.Listener
{
@Override

View File

@ -549,11 +549,6 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
}
}
}
if (bean._bean instanceof AbstractLifeCycle)
{
((AbstractLifeCycle)bean._bean).setStopTimeout(getStopTimeout());
}
}
}
@ -701,17 +696,6 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
return false;
}
@Override
public void setStopTimeout(long stopTimeout)
{
super.setStopTimeout(stopTimeout);
for (Bean bean : _beans)
{
if (bean.isManaged() && bean._bean instanceof AbstractLifeCycle)
((AbstractLifeCycle)bean._bean).setStopTimeout(stopTimeout);
}
}
/**
* Dumps to {@link System#err}.
*

View File

@ -18,11 +18,14 @@
package org.eclipse.jetty.util.component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>Jetty components that wish to be part of a Graceful shutdown implement this interface so that
@ -34,7 +37,8 @@ import org.eclipse.jetty.util.FutureCallback;
* <li>Waiting for existing load to complete (eg waiting for active request count to reduce to 0)</li>
* <li>Performing cleanup operations that may take time (eg closing an SSL connection)</li>
* </ul>
* <p>The {@link Future} returned by the the shutdown call will be completed to indicate the shutdown operation is completed.
* <p>The {@link CompletableFuture} returned by the the shutdown call will be completed to indicate the
* shutdown operation is completed.
* Some shutdown operations may be instantaneous and always return a completed future.
* </p><p>
* Graceful shutdown is typically orchestrated by the doStop methods of Server or ContextHandler (for a full or partial
@ -43,47 +47,96 @@ import org.eclipse.jetty.util.FutureCallback;
*/
public interface Graceful
{
Future<Void> shutdown();
/**
* Shutdown the component. When this method returns, the component should not accept any new load.
* @return A future that is completed once all load on the component is completed
*/
CompletableFuture<Void> shutdown();
/**
* @return True if {@link #shutdown()} has been called.
*/
boolean isShutdown();
/**
* A utility Graceful that uses a {@link FutureCallback} to indicate if shutdown is completed.
* By default the {@link FutureCallback} is returned as already completed, but the {@link #newShutdownCallback()} method
* can be overloaded to return a non-completed callback that will require a {@link Callback#succeeded()} or
* {@link Callback#failed(Throwable)} call to be completed.
* A utility class to assist implementing the Graceful interface.
* The {@link #isShutdownDone()} method should be implemented to check if the {@link CompletableFuture}
* returned by {@link #shutdown()} should be completed or not. The {@link #check()}
* method should be called when any state is changed which may complete the shutdown.
*/
class Shutdown implements Graceful
abstract class Shutdown implements Graceful
{
private final AtomicReference<FutureCallback> _shutdown = new AtomicReference<>();
final Object _component;
final AtomicReference<CompletableFuture<Void>> _done = new AtomicReference<>();
protected FutureCallback newShutdownCallback()
protected Shutdown(Object component)
{
return FutureCallback.SUCCEEDED;
_component = component;
}
@Override
public Future<Void> shutdown()
public CompletableFuture<Void> shutdown()
{
return _shutdown.updateAndGet(fcb -> fcb == null ? newShutdownCallback() : fcb);
if (_done.get() == null)
_done.compareAndSet(null, new CompletableFuture<Void>()
{
@Override
public String toString()
{
return String.format("Shutdown<%s>@%x", _component, hashCode());
}
});
CompletableFuture<Void> done = _done.get();
check();
return done;
}
@Override
public boolean isShutdown()
{
return _shutdown.get() != null;
return _done.get() != null;
}
public void cancel()
/**
* This method should be called whenever the components state has been updated.
* If {@link #shutdown()} has been called, then {@link #isShutdownDone()} is called
* by this method and if it returns true then the {@link Future} returned by
* {@link #shutdown()} is completed.
*/
public void check()
{
FutureCallback shutdown = _shutdown.getAndSet(null);
if (shutdown != null && !shutdown.isDone())
shutdown.cancel(true);
CompletableFuture<Void> done = _done.get();
if (done != null && isShutdownDone())
done.complete(null);
}
public FutureCallback get()
{
return _shutdown.get();
}
/**
* @return True if the component is shutdown and has no remaining load.
*/
public abstract boolean isShutdownDone();
}
/**
* Utility method to shutdown all Gracefuls within a container.
* @param component The container in which to look for {@link Graceful}s
* @return A {@link CompletableFuture } that is complete once all returns from {@link Graceful#shutdown()}
* of the contained {@link Graceful}s are complete.
*/
static CompletableFuture<Void> shutdown(Container component)
{
Logger log = Log.getLogger(component.getClass());
log.info("Shutdown {}", component);
// tell the graceful handlers that we are shutting down
List<Graceful> gracefuls = new ArrayList<>();
if (component instanceof Graceful)
gracefuls.add((Graceful)component);
gracefuls.addAll(component.getContainedBeans(Graceful.class));
if (log.isDebugEnabled())
gracefuls.forEach(g -> log.debug("graceful {}", g));
return CompletableFuture.allOf(gracefuls.stream().map(Graceful::shutdown).toArray(CompletableFuture[]::new));
}
}

View File

@ -302,7 +302,7 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool
@Override
public void join() throws InterruptedException
{
_executor.awaitTermination(getStopTimeout(), TimeUnit.MILLISECONDS);
_executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
@Override

View File

@ -40,7 +40,6 @@ import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
@ -80,6 +79,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
private boolean _detailedDump = false;
private int _lowThreadsThreshold = 1;
private ThreadPoolBudget _budget;
private long _stopTimeout;
public QueuedThreadPool()
{
@ -159,6 +159,16 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
_budget = budget;
}
public void setStopTimeout(long stopTimeout)
{
_stopTimeout = stopTimeout;
}
public long getStopTimeout()
{
return _stopTimeout;
}
@Override
protected void doStart() throws Exception
{
@ -555,7 +565,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
/**
* Blocks until the thread pool is {@link LifeCycle#stop stopped}.
* Blocks until the thread pool is {@link org.eclipse.jetty.util.component.LifeCycle} stopped.
*/
@Override
public void join() throws InterruptedException
@ -675,10 +685,17 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
int threads = AtomicBiInteger.getHi(encoded);
int idle = AtomicBiInteger.getLo(encoded);
if (threads == Integer.MIN_VALUE) // This is a marker that the pool is stopped.
return false;
long update = AtomicBiInteger.encode(threads + deltaThreads, idle + deltaIdle);
if (_counts.compareAndSet(encoded, update))
return true;
{
long update = AtomicBiInteger.encode(threads, idle + deltaIdle);
if (_counts.compareAndSet(encoded, update))
return false;
}
else
{
long update = AtomicBiInteger.encode(threads + deltaThreads, idle + deltaIdle);
if (_counts.compareAndSet(encoded, update))
return true;
}
}
}
@ -880,10 +897,10 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
// If we had a job,
if (job != null)
{
idle = true;
// signal that we are idle again
if (!addCounts(0, 1))
break;
idle = true;
}
// else check we are still running
else if (_counts.getHi() == Integer.MIN_VALUE)

View File

@ -244,7 +244,6 @@ public class ReservedThreadExecutorTest
public void stressTest() throws Exception
{
QueuedThreadPool pool = new QueuedThreadPool(20);
pool.setStopTimeout(10000);
pool.start();
ReservedThreadExecutor reserved = new ReservedThreadExecutor(pool, 10);
reserved.setIdleTimeout(0, null);

View File

@ -89,7 +89,6 @@ public class FailedSelectorTest
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
QueuedThreadPool qtp = new QueuedThreadPool();
qtp.setName("Client");
qtp.setStopTimeout(1000);
client = new HttpClient(transport);
client.setExecutor(qtp);