diff --git a/jetty-server/src/main/config/etc/jetty-stats.xml b/jetty-server/src/main/config/etc/jetty-stats.xml index aade7341986..fa00a88899c 100644 --- a/jetty-server/src/main/config/etc/jetty-stats.xml +++ b/jetty-server/src/main/config/etc/jetty-stats.xml @@ -5,7 +5,9 @@ - + + + diff --git a/jetty-server/src/main/config/modules/stats.mod b/jetty-server/src/main/config/modules/stats.mod index 8bf06451c8f..a6289c4174b 100644 --- a/jetty-server/src/main/config/modules/stats.mod +++ b/jetty-server/src/main/config/modules/stats.mod @@ -15,3 +15,8 @@ etc/jetty-stats.xml [ini] jetty.webapp.addServerClasses+=,-org.eclipse.jetty.servlet.StatisticsServlet + +[ini-template] + +## If the Graceful shutdown should wait for async requests as well as the currently dispatched ones. +# jetty.statistics.gracefulShutdownWaitsForRequests=true diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/DebugHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/DebugHandler.java index 2f91bb174d6..fc110b0f6a2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/DebugHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/DebugHandler.java @@ -61,7 +61,6 @@ public class DebugHandler extends HandlerWrapper implements Connection.Listener final Thread thread = Thread.currentThread(); final String old_name = thread.getName(); - boolean suspend = false; boolean retry = false; String name = (String)request.getAttribute("org.eclipse.jetty.thread.name"); if (name == null) @@ -103,11 +102,10 @@ public class DebugHandler extends HandlerWrapper implements Connection.Listener finally { thread.setName(old_name); - suspend = baseRequest.getHttpChannelState().isSuspended(); - if (suspend) + if (baseRequest.getHttpChannelState().isAsyncStarted()) { request.setAttribute("org.eclipse.jetty.thread.name", name); - print(name, "SUSPEND"); + print(name, "ASYNC"); } else print(name, "RESPONSE " + base_response.getStatus() + (ex == null ? "" : ("/" + ex)) + " " + base_response.getContentType()); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java index b918a95ab7a..96589551295 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java @@ -20,7 +20,6 @@ package org.eclipse.jetty.server.handler; import java.io.IOException; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import javax.servlet.AsyncEvent; @@ -59,6 +58,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful private final LongAdder _asyncDispatches = new LongAdder(); private final LongAdder _expires = new LongAdder(); + private final LongAdder _errors = new LongAdder(); private final LongAdder _responses1xx = new LongAdder(); private final LongAdder _responses2xx = new LongAdder(); @@ -67,6 +67,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful private final LongAdder _responses5xx = new LongAdder(); private final LongAdder _responsesTotalBytes = new LongAdder(); + private boolean _gracefulShutdownWaitsForRequests = true; + private final Graceful.Shutdown _shutdown = new Graceful.Shutdown() { @Override @@ -76,44 +78,42 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful } }; - private final AtomicBoolean _wrapWarning = new AtomicBoolean(); - private final AsyncListener _onCompletion = new AsyncListener() { @Override - public void onTimeout(AsyncEvent event) throws IOException - { - _expires.increment(); - } - - @Override - public void onStartAsync(AsyncEvent event) throws IOException + public void onStartAsync(AsyncEvent event) { event.getAsyncContext().addListener(this); } @Override - public void onError(AsyncEvent event) throws IOException + public void onTimeout(AsyncEvent event) { + _expires.increment(); } @Override - public void onComplete(AsyncEvent event) throws IOException + public void onError(AsyncEvent event) + { + _errors.increment(); + } + + @Override + public void onComplete(AsyncEvent event) { HttpChannelState state = ((AsyncContextEvent)event).getHttpChannelState(); Request request = state.getBaseRequest(); final long elapsed = System.currentTimeMillis() - request.getTimeStamp(); - long d = _requestStats.decrement(); + long numRequests = _requestStats.decrement(); _requestTimeStats.record(elapsed); updateResponse(request); _asyncWaitStats.decrement(); - // If we have no more dispatches, should we signal shutdown? - if (d == 0) + if (numRequests == 0 && _gracefulShutdownWaitsForRequests) { FutureCallback shutdown = _shutdown.get(); if (shutdown != null) @@ -149,6 +149,14 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful @Override public void handle(String path, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + Handler handler = getHandler(); + if (handler == null || !isStarted() || isShutdown()) + { + if (!baseRequest.getResponse().isCommitted()) + response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503); + return; + } + _dispatchedStats.increment(); final long start; @@ -168,51 +176,39 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful try { - Handler handler = getHandler(); - if (handler != null && !_shutdown.isShutdown() && isStarted()) - handler.handle(path, baseRequest, request, response); - else - { - if (!baseRequest.isHandled()) - baseRequest.setHandled(true); - else if (_wrapWarning.compareAndSet(false, true)) - LOG.warn("Bad statistics configuration. Latencies will be incorrect in {}", this); - if (!baseRequest.getResponse().isCommitted()) - response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503); - } + handler.handle(path, baseRequest, request, response); } finally { final long now = System.currentTimeMillis(); final long dispatched = now - start; - _dispatchedStats.decrement(); + long numRequests = -1; + long numDispatches = _dispatchedStats.decrement(); _dispatchedTimeStats.record(dispatched); - if (state.isSuspended()) + if (state.isInitial()) { - if (state.isInitial()) + if (state.isAsyncStarted()) { state.addListener(_onCompletion); _asyncWaitStats.increment(); } - } - else if (state.isInitial()) - { - long d = _requestStats.decrement(); - _requestTimeStats.record(dispatched); - updateResponse(baseRequest); - - // If we have no more dispatches, should we signal shutdown? - FutureCallback shutdown = _shutdown.get(); - if (shutdown != null) + else { - response.flushBuffer(); - if (d == 0) - shutdown.succeeded(); + numRequests = _requestStats.decrement(); + _requestTimeStats.record(dispatched); + updateResponse(baseRequest); } } - // else onCompletion will handle it. + + FutureCallback shutdown = _shutdown.get(); + if (shutdown != null) + { + response.flushBuffer(); + if (_gracefulShutdownWaitsForRequests ? (numRequests == 0) : (numDispatches == 0)) + shutdown.succeeded(); + } } } @@ -251,6 +247,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful @Override protected void doStart() throws Exception { + if (getHandler() == null) + throw new IllegalStateException("StatisticsHandler has no Wrapped Handler"); _shutdown.cancel(); super.doStart(); statsReset(); @@ -263,6 +261,29 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful super.doStop(); } + /** + * Set whether the graceful shutdown should wait for all requests to complete including + * async requests which are not currently dispatched, or whether it should only wait for all the + * actively dispatched requests to complete. + * @param gracefulShutdownWaitsForRequests true to wait for async requests on graceful shutdown. + */ + public void setGracefulShutdownWaitsForRequests(boolean gracefulShutdownWaitsForRequests) + { + _gracefulShutdownWaitsForRequests = gracefulShutdownWaitsForRequests; + } + + /** + * @return whether the graceful shutdown will wait for all requests to complete including + * async requests which are not currently dispatched, or whether it will only wait for all the + * actively dispatched requests to complete. + * @see #getAsyncDispatches() + */ + @ManagedAttribute("if graceful shutdown will wait for all requests") + public boolean getGracefulShutdownWaitsForRequests() + { + return _gracefulShutdownWaitsForRequests; + } + /** * @return the number of requests handled by this handler * since {@link #statsReset()} was last called, excluding @@ -467,6 +488,16 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful return _expires.intValue(); } + /** + * @return the number of async errors that occurred. + * @see #getAsyncDispatches() + */ + @ManagedAttribute("number of async errors that occurred") + public int getErrors() + { + return _errors.intValue(); + } + /** * @return the number of responses with a 1xx status returned by this context * since {@link #statsReset()} was last called. diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java index 6cd9a8b7548..902c15a7228 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java @@ -330,8 +330,9 @@ public class GracefulStopTest assertThat(response, containsString(" 200 OK")); assertThat(response, containsString("read 10/10")); - assertThat(stats.getRequests(), is(2)); - assertThat(stats.getResponses5xx(), is(1)); + // The StatisticsHandler was shutdown when it received the second request so does not contribute to the stats. + assertThat(stats.getRequests(), is(1)); + assertThat(stats.getResponses4xx(), is(0)); } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/StatisticsHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/StatisticsHandlerTest.java index 8216ec21551..7ce2b5b0718 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/StatisticsHandlerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/StatisticsHandlerTest.java @@ -19,10 +19,12 @@ package org.eclipse.jetty.server.handler; import java.io.IOException; +import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; @@ -45,6 +47,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class StatisticsHandlerTest @@ -363,6 +366,173 @@ public class StatisticsHandlerTest assertTrue(_statsHandler.getDispatchedTimeMax() + dispatchTime <= _statsHandler.getDispatchedTimeTotal()); } + @Test + public void asyncDispatchTest() throws Exception + { + final AtomicReference asyncHolder = new AtomicReference<>(); + final CyclicBarrier[] barrier = {new CyclicBarrier(2), new CyclicBarrier(2), new CyclicBarrier(2), new CyclicBarrier(2)}; + _statsHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException + { + request.setHandled(true); + try + { + if (asyncHolder.get() == null) + { + barrier[0].await(); + barrier[1].await(); + AsyncContext asyncContext = request.startAsync(); + asyncHolder.set(asyncContext); + asyncContext.dispatch(); + } + else + { + barrier[2].await(); + barrier[3].await(); + } + } + catch (Exception x) + { + throw new ServletException(x); + } + } + }); + _server.start(); + + String request = "GET / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "\r\n"; + _connector.executeRequest(request); + + // Before we have started async we have one active request. + barrier[0].await(); + assertEquals(1, _statistics.getConnections()); + assertEquals(1, _statsHandler.getRequests()); + assertEquals(1, _statsHandler.getRequestsActive()); + assertEquals(1, _statsHandler.getDispatched()); + assertEquals(1, _statsHandler.getDispatchedActive()); + barrier[1].await(); + + // After we are async the same request should still be active even though we have async dispatched. + barrier[2].await(); + assertEquals(1, _statistics.getConnections()); + assertEquals(1, _statsHandler.getRequests()); + assertEquals(1, _statsHandler.getRequestsActive()); + assertEquals(2, _statsHandler.getDispatched()); + assertEquals(1, _statsHandler.getDispatchedActive()); + barrier[3].await(); + } + + @Test + public void waitForSuspendedRequestTest() throws Exception + { + CyclicBarrier barrier = new CyclicBarrier(3); + final AtomicReference asyncHolder = new AtomicReference<>(); + final CountDownLatch dispatched = new CountDownLatch(1); + _statsHandler.setGracefulShutdownWaitsForRequests(true); + _statsHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException + { + request.setHandled(true); + + try + { + if (path.contains("async")) + { + asyncHolder.set(request.startAsync()); + barrier.await(); + } + else + { + barrier.await(); + dispatched.await(); + } + } + catch (Exception e) + { + throw new ServletException(e); + } + } + }); + _server.start(); + + // One request to block while dispatched other will go async. + _connector.executeRequest("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"); + _connector.executeRequest("GET /async HTTP/1.1\r\nHost: localhost\r\n\r\n"); + + // Ensure the requests have been dispatched and async started. + barrier.await(); + AsyncContext asyncContext = Objects.requireNonNull(asyncHolder.get()); + + // Shutdown should timeout as there are two active requests. + Future shutdown = _statsHandler.shutdown(); + assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS)); + + // When the dispatched thread exits we should still be waiting on the async request. + dispatched.countDown(); + assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS)); + + // Shutdown should complete only now the AsyncContext is completed. + asyncContext.complete(); + shutdown.get(5, TimeUnit.MILLISECONDS); + } + + @Test + public void doNotWaitForSuspendedRequestTest() throws Exception + { + CyclicBarrier barrier = new CyclicBarrier(3); + final AtomicReference asyncHolder = new AtomicReference<>(); + final CountDownLatch dispatched = new CountDownLatch(1); + _statsHandler.setGracefulShutdownWaitsForRequests(false); + _statsHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException + { + request.setHandled(true); + + try + { + if (path.contains("async")) + { + asyncHolder.set(request.startAsync()); + barrier.await(); + } + else + { + barrier.await(); + dispatched.await(); + } + } + catch (Exception e) + { + throw new ServletException(e); + } + } + }); + _server.start(); + + // One request to block while dispatched other will go async. + _connector.executeRequest("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"); + _connector.executeRequest("GET /async HTTP/1.1\r\nHost: localhost\r\n\r\n"); + + // Ensure the requests have been dispatched and async started. + barrier.await(); + assertNotNull(asyncHolder.get()); + + // Shutdown should timeout as there is a request dispatched. + Future shutdown = _statsHandler.shutdown(); + assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS)); + + // When the dispatched thread exits we should shutdown even though we have a waiting async request. + dispatched.countDown(); + shutdown.get(5, TimeUnit.MILLISECONDS); + } + @Test public void testSuspendExpire() throws Exception {