Merge pull request #5175 from eclipse/jetty-9.4.x-5105-StatisticsHandler

Issue #5105 - StatisticsHandler Graceful Shutdown of Async Requests
This commit is contained in:
Lachlan 2020-08-28 11:54:45 +10:00 committed by GitHub
commit 001def4905
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 258 additions and 51 deletions

View File

@ -5,7 +5,9 @@
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<Call name="insertHandler">
<Arg>
<New id="StatsHandler" class="org.eclipse.jetty.server.handler.StatisticsHandler"></New>
<New id="StatsHandler" class="org.eclipse.jetty.server.handler.StatisticsHandler">
<Set name="gracefulShutdownWaitsForRequests"><Property name="jetty.statistics.gracefulShutdownWaitsForRequests" default="true"/></Set>
</New>
</Arg>
</Call>
<Call name="addBeanToAllConnectors">

View File

@ -15,3 +15,8 @@ etc/jetty-stats.xml
[ini]
jetty.webapp.addServerClasses+=,-org.eclipse.jetty.servlet.StatisticsServlet
[ini-template]
## If the Graceful shutdown should wait for async requests as well as the currently dispatched ones.
# jetty.statistics.gracefulShutdownWaitsForRequests=true

View File

@ -61,7 +61,6 @@ public class DebugHandler extends HandlerWrapper implements Connection.Listener
final Thread thread = Thread.currentThread();
final String old_name = thread.getName();
boolean suspend = false;
boolean retry = false;
String name = (String)request.getAttribute("org.eclipse.jetty.thread.name");
if (name == null)
@ -103,11 +102,10 @@ public class DebugHandler extends HandlerWrapper implements Connection.Listener
finally
{
thread.setName(old_name);
suspend = baseRequest.getHttpChannelState().isSuspended();
if (suspend)
if (baseRequest.getHttpChannelState().isAsyncStarted())
{
request.setAttribute("org.eclipse.jetty.thread.name", name);
print(name, "SUSPEND");
print(name, "ASYNC");
}
else
print(name, "RESPONSE " + base_response.getStatus() + (ex == null ? "" : ("/" + ex)) + " " + base_response.getContentType());

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.server.handler;
import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.AsyncEvent;
@ -59,6 +58,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
private final LongAdder _asyncDispatches = new LongAdder();
private final LongAdder _expires = new LongAdder();
private final LongAdder _errors = new LongAdder();
private final LongAdder _responses1xx = new LongAdder();
private final LongAdder _responses2xx = new LongAdder();
@ -67,6 +67,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
private final LongAdder _responses5xx = new LongAdder();
private final LongAdder _responsesTotalBytes = new LongAdder();
private boolean _gracefulShutdownWaitsForRequests = true;
private final Graceful.Shutdown _shutdown = new Graceful.Shutdown()
{
@Override
@ -76,44 +78,42 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
}
};
private final AtomicBoolean _wrapWarning = new AtomicBoolean();
private final AsyncListener _onCompletion = new AsyncListener()
{
@Override
public void onTimeout(AsyncEvent event) throws IOException
{
_expires.increment();
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException
public void onStartAsync(AsyncEvent event)
{
event.getAsyncContext().addListener(this);
}
@Override
public void onError(AsyncEvent event) throws IOException
public void onTimeout(AsyncEvent event)
{
_expires.increment();
}
@Override
public void onComplete(AsyncEvent event) throws IOException
public void onError(AsyncEvent event)
{
_errors.increment();
}
@Override
public void onComplete(AsyncEvent event)
{
HttpChannelState state = ((AsyncContextEvent)event).getHttpChannelState();
Request request = state.getBaseRequest();
final long elapsed = System.currentTimeMillis() - request.getTimeStamp();
long d = _requestStats.decrement();
long numRequests = _requestStats.decrement();
_requestTimeStats.record(elapsed);
updateResponse(request);
_asyncWaitStats.decrement();
// If we have no more dispatches, should we signal shutdown?
if (d == 0)
if (numRequests == 0 && _gracefulShutdownWaitsForRequests)
{
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
@ -149,6 +149,14 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@Override
public void handle(String path, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
Handler handler = getHandler();
if (handler == null || !isStarted() || isShutdown())
{
if (!baseRequest.getResponse().isCommitted())
response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503);
return;
}
_dispatchedStats.increment();
final long start;
@ -168,51 +176,39 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
try
{
Handler handler = getHandler();
if (handler != null && !_shutdown.isShutdown() && isStarted())
handler.handle(path, baseRequest, request, response);
else
{
if (!baseRequest.isHandled())
baseRequest.setHandled(true);
else if (_wrapWarning.compareAndSet(false, true))
LOG.warn("Bad statistics configuration. Latencies will be incorrect in {}", this);
if (!baseRequest.getResponse().isCommitted())
response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503);
}
handler.handle(path, baseRequest, request, response);
}
finally
{
final long now = System.currentTimeMillis();
final long dispatched = now - start;
_dispatchedStats.decrement();
long numRequests = -1;
long numDispatches = _dispatchedStats.decrement();
_dispatchedTimeStats.record(dispatched);
if (state.isSuspended())
if (state.isInitial())
{
if (state.isInitial())
if (state.isAsyncStarted())
{
state.addListener(_onCompletion);
_asyncWaitStats.increment();
}
}
else if (state.isInitial())
{
long d = _requestStats.decrement();
_requestTimeStats.record(dispatched);
updateResponse(baseRequest);
// If we have no more dispatches, should we signal shutdown?
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
else
{
response.flushBuffer();
if (d == 0)
shutdown.succeeded();
numRequests = _requestStats.decrement();
_requestTimeStats.record(dispatched);
updateResponse(baseRequest);
}
}
// else onCompletion will handle it.
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
{
response.flushBuffer();
if (_gracefulShutdownWaitsForRequests ? (numRequests == 0) : (numDispatches == 0))
shutdown.succeeded();
}
}
}
@ -251,6 +247,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
@Override
protected void doStart() throws Exception
{
if (getHandler() == null)
throw new IllegalStateException("StatisticsHandler has no Wrapped Handler");
_shutdown.cancel();
super.doStart();
statsReset();
@ -263,6 +261,29 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
super.doStop();
}
/**
* Set whether the graceful shutdown should wait for all requests to complete including
* async requests which are not currently dispatched, or whether it should only wait for all the
* actively dispatched requests to complete.
* @param gracefulShutdownWaitsForRequests true to wait for async requests on graceful shutdown.
*/
public void setGracefulShutdownWaitsForRequests(boolean gracefulShutdownWaitsForRequests)
{
_gracefulShutdownWaitsForRequests = gracefulShutdownWaitsForRequests;
}
/**
* @return whether the graceful shutdown will wait for all requests to complete including
* async requests which are not currently dispatched, or whether it will only wait for all the
* actively dispatched requests to complete.
* @see #getAsyncDispatches()
*/
@ManagedAttribute("if graceful shutdown will wait for all requests")
public boolean getGracefulShutdownWaitsForRequests()
{
return _gracefulShutdownWaitsForRequests;
}
/**
* @return the number of requests handled by this handler
* since {@link #statsReset()} was last called, excluding
@ -467,6 +488,16 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
return _expires.intValue();
}
/**
* @return the number of async errors that occurred.
* @see #getAsyncDispatches()
*/
@ManagedAttribute("number of async errors that occurred")
public int getErrors()
{
return _errors.intValue();
}
/**
* @return the number of responses with a 1xx status returned by this context
* since {@link #statsReset()} was last called.

View File

@ -330,8 +330,9 @@ public class GracefulStopTest
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("read 10/10"));
assertThat(stats.getRequests(), is(2));
assertThat(stats.getResponses5xx(), is(1));
// The StatisticsHandler was shutdown when it received the second request so does not contribute to the stats.
assertThat(stats.getRequests(), is(1));
assertThat(stats.getResponses4xx(), is(0));
}
}

View File

@ -19,10 +19,12 @@
package org.eclipse.jetty.server.handler;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
@ -45,6 +47,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class StatisticsHandlerTest
@ -363,6 +366,173 @@ public class StatisticsHandlerTest
assertTrue(_statsHandler.getDispatchedTimeMax() + dispatchTime <= _statsHandler.getDispatchedTimeTotal());
}
@Test
public void asyncDispatchTest() throws Exception
{
final AtomicReference<AsyncContext> asyncHolder = new AtomicReference<>();
final CyclicBarrier[] barrier = {new CyclicBarrier(2), new CyclicBarrier(2), new CyclicBarrier(2), new CyclicBarrier(2)};
_statsHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException
{
request.setHandled(true);
try
{
if (asyncHolder.get() == null)
{
barrier[0].await();
barrier[1].await();
AsyncContext asyncContext = request.startAsync();
asyncHolder.set(asyncContext);
asyncContext.dispatch();
}
else
{
barrier[2].await();
barrier[3].await();
}
}
catch (Exception x)
{
throw new ServletException(x);
}
}
});
_server.start();
String request = "GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"\r\n";
_connector.executeRequest(request);
// Before we have started async we have one active request.
barrier[0].await();
assertEquals(1, _statistics.getConnections());
assertEquals(1, _statsHandler.getRequests());
assertEquals(1, _statsHandler.getRequestsActive());
assertEquals(1, _statsHandler.getDispatched());
assertEquals(1, _statsHandler.getDispatchedActive());
barrier[1].await();
// After we are async the same request should still be active even though we have async dispatched.
barrier[2].await();
assertEquals(1, _statistics.getConnections());
assertEquals(1, _statsHandler.getRequests());
assertEquals(1, _statsHandler.getRequestsActive());
assertEquals(2, _statsHandler.getDispatched());
assertEquals(1, _statsHandler.getDispatchedActive());
barrier[3].await();
}
@Test
public void waitForSuspendedRequestTest() throws Exception
{
CyclicBarrier barrier = new CyclicBarrier(3);
final AtomicReference<AsyncContext> asyncHolder = new AtomicReference<>();
final CountDownLatch dispatched = new CountDownLatch(1);
_statsHandler.setGracefulShutdownWaitsForRequests(true);
_statsHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException
{
request.setHandled(true);
try
{
if (path.contains("async"))
{
asyncHolder.set(request.startAsync());
barrier.await();
}
else
{
barrier.await();
dispatched.await();
}
}
catch (Exception e)
{
throw new ServletException(e);
}
}
});
_server.start();
// One request to block while dispatched other will go async.
_connector.executeRequest("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n");
_connector.executeRequest("GET /async HTTP/1.1\r\nHost: localhost\r\n\r\n");
// Ensure the requests have been dispatched and async started.
barrier.await();
AsyncContext asyncContext = Objects.requireNonNull(asyncHolder.get());
// Shutdown should timeout as there are two active requests.
Future<Void> shutdown = _statsHandler.shutdown();
assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS));
// When the dispatched thread exits we should still be waiting on the async request.
dispatched.countDown();
assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS));
// Shutdown should complete only now the AsyncContext is completed.
asyncContext.complete();
shutdown.get(5, TimeUnit.MILLISECONDS);
}
@Test
public void doNotWaitForSuspendedRequestTest() throws Exception
{
CyclicBarrier barrier = new CyclicBarrier(3);
final AtomicReference<AsyncContext> asyncHolder = new AtomicReference<>();
final CountDownLatch dispatched = new CountDownLatch(1);
_statsHandler.setGracefulShutdownWaitsForRequests(false);
_statsHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException
{
request.setHandled(true);
try
{
if (path.contains("async"))
{
asyncHolder.set(request.startAsync());
barrier.await();
}
else
{
barrier.await();
dispatched.await();
}
}
catch (Exception e)
{
throw new ServletException(e);
}
}
});
_server.start();
// One request to block while dispatched other will go async.
_connector.executeRequest("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n");
_connector.executeRequest("GET /async HTTP/1.1\r\nHost: localhost\r\n\r\n");
// Ensure the requests have been dispatched and async started.
barrier.await();
assertNotNull(asyncHolder.get());
// Shutdown should timeout as there is a request dispatched.
Future<Void> shutdown = _statsHandler.shutdown();
assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS));
// When the dispatched thread exits we should shutdown even though we have a waiting async request.
dispatched.countDown();
shutdown.get(5, TimeUnit.MILLISECONDS);
}
@Test
public void testSuspendExpire() throws Exception
{