Issue #5105 - add optional configuration to not wait for suspended requests in StatisticsHandler
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
32358b1b77
commit
a65f00156d
|
@ -5,7 +5,11 @@
|
|||
<Configure id="Server" class="org.eclipse.jetty.server.Server">
|
||||
<Call name="insertHandler">
|
||||
<Arg>
|
||||
<New id="StatsHandler" class="org.eclipse.jetty.server.handler.StatisticsHandler"></New>
|
||||
<New id="StatsHandler" class="org.eclipse.jetty.server.handler.StatisticsHandler">
|
||||
<Set name="waitForSuspendedRequestsOnShutdown">
|
||||
<Property name="jetty.statistics.waitForSuspendedRequestsOnShutdown" default="true"/>
|
||||
</Set>
|
||||
</New>
|
||||
</Arg>
|
||||
</Call>
|
||||
<Call class="org.eclipse.jetty.server.ServerConnectionStatistics" name="addToAllConnectors">
|
||||
|
|
|
@ -15,3 +15,8 @@ etc/jetty-stats.xml
|
|||
|
||||
[ini]
|
||||
jetty.webapp.addServerClasses+=,-org.eclipse.jetty.servlet.StatisticsServlet
|
||||
|
||||
[ini-template]
|
||||
|
||||
## If the Graceful shutdown should wait for suspended requests as well as dispatched ones.
|
||||
# jetty.statistics.waitForSuspendedRequestsOnShutdown=true
|
||||
|
|
|
@ -66,6 +66,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
|
|||
private final LongAdder _responses5xx = new LongAdder();
|
||||
private final LongAdder _responsesTotalBytes = new LongAdder();
|
||||
|
||||
private boolean waitForSuspendedRequestsOnShutdown = true;
|
||||
|
||||
private final Graceful.Shutdown _shutdown = new Graceful.Shutdown()
|
||||
{
|
||||
@Override
|
||||
|
@ -98,13 +100,12 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
|
|||
@Override
|
||||
public void onComplete(AsyncEvent event) throws IOException
|
||||
{
|
||||
System.err.println("On Async Complete for " + event);
|
||||
HttpChannelState state = ((AsyncContextEvent)event).getHttpChannelState();
|
||||
|
||||
Request request = state.getBaseRequest();
|
||||
final long elapsed = System.currentTimeMillis() - request.getTimeStamp();
|
||||
|
||||
long d = _requestStats.decrement();
|
||||
long numRequests = _requestStats.decrement();
|
||||
_requestTimeStats.record(elapsed);
|
||||
|
||||
updateResponse(request);
|
||||
|
@ -112,7 +113,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
|
|||
_asyncWaitStats.decrement();
|
||||
|
||||
// If we have no more dispatches, should we signal shutdown?
|
||||
if (d == 0)
|
||||
if (numRequests == 0 && waitForSuspendedRequestsOnShutdown)
|
||||
{
|
||||
FutureCallback shutdown = _shutdown.get();
|
||||
if (shutdown != null)
|
||||
|
@ -178,8 +179,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
|
|||
final long now = System.currentTimeMillis();
|
||||
final long dispatched = now - start;
|
||||
|
||||
// TODO: make dispatchedStats optional metric for shutdown
|
||||
_dispatchedStats.decrement();
|
||||
long numRequests = -1;
|
||||
long numDispatches = _dispatchedStats.decrement();
|
||||
_dispatchedTimeStats.record(dispatched);
|
||||
|
||||
if (state.isInitial())
|
||||
|
@ -191,20 +192,21 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
|
|||
}
|
||||
else
|
||||
{
|
||||
long d = _requestStats.decrement();
|
||||
numRequests = _requestStats.decrement();
|
||||
_requestTimeStats.record(dispatched);
|
||||
updateResponse(baseRequest);
|
||||
|
||||
// If we have no more dispatches, should we signal shutdown?
|
||||
FutureCallback shutdown = _shutdown.get();
|
||||
if (shutdown != null)
|
||||
{
|
||||
response.flushBuffer();
|
||||
if (d == 0)
|
||||
shutdown.succeeded();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FutureCallback shutdown = _shutdown.get();
|
||||
if (shutdown != null)
|
||||
{
|
||||
response.flushBuffer();
|
||||
|
||||
// If we either have no more requests or dispatches, we can complete shutdown.
|
||||
if (waitForSuspendedRequestsOnShutdown ? (numRequests == 0) : (numDispatches == 0))
|
||||
shutdown.succeeded();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -257,6 +259,16 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
|
|||
super.doStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set whether the graceful shutdown should wait for all requests to complete (including suspended requests)
|
||||
* or whether it should only wait for all the actively dispatched requests to complete.
|
||||
* @param waitForSuspendedRequests true to wait for suspended requests on graceful shutdown.
|
||||
*/
|
||||
public void waitForSuspendedRequestsOnShutdown(boolean waitForSuspendedRequests)
|
||||
{
|
||||
this.waitForSuspendedRequestsOnShutdown = waitForSuspendedRequests;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of requests handled by this handler
|
||||
* since {@link #statsReset()} was last called, excluding
|
||||
|
|
|
@ -19,10 +19,12 @@
|
|||
package org.eclipse.jetty.server.handler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import javax.servlet.AsyncContext;
|
||||
import javax.servlet.AsyncEvent;
|
||||
|
@ -45,6 +47,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class StatisticsHandlerTest
|
||||
|
@ -422,6 +425,114 @@ public class StatisticsHandlerTest
|
|||
barrier[3].await();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void waitForSuspendedRequestTest() throws Exception
|
||||
{
|
||||
CyclicBarrier barrier = new CyclicBarrier(3);
|
||||
final AtomicReference<AsyncContext> asyncHolder = new AtomicReference<>();
|
||||
final CountDownLatch dispatched = new CountDownLatch(1);
|
||||
_statsHandler.waitForSuspendedRequestsOnShutdown(true);
|
||||
_statsHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException
|
||||
{
|
||||
request.setHandled(true);
|
||||
|
||||
try
|
||||
{
|
||||
if (path.contains("async"))
|
||||
{
|
||||
asyncHolder.set(request.startAsync());
|
||||
barrier.await();
|
||||
}
|
||||
else
|
||||
{
|
||||
barrier.await();
|
||||
dispatched.await();
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
throw new ServletException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
_server.start();
|
||||
|
||||
// One request to block while dispatched other will go async.
|
||||
_connector.executeRequest("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
_connector.executeRequest("GET /async HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
|
||||
// Ensure the requests have been dispatched and async started.
|
||||
barrier.await();
|
||||
AsyncContext asyncContext = Objects.requireNonNull(asyncHolder.get());
|
||||
|
||||
// Shutdown should timeout as there are two active requests.
|
||||
Future<Void> shutdown = _statsHandler.shutdown();
|
||||
assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS));
|
||||
|
||||
// When the dispatched thread exits we should still be waiting on the async request.
|
||||
dispatched.countDown();
|
||||
assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS));
|
||||
|
||||
// Shutdown should complete only now the AsyncContext is completed.
|
||||
asyncContext.complete();
|
||||
shutdown.get(5, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void doNotWaitForSuspendedRequestTest() throws Exception
|
||||
{
|
||||
CyclicBarrier barrier = new CyclicBarrier(3);
|
||||
final AtomicReference<AsyncContext> asyncHolder = new AtomicReference<>();
|
||||
final CountDownLatch dispatched = new CountDownLatch(1);
|
||||
_statsHandler.waitForSuspendedRequestsOnShutdown(false);
|
||||
_statsHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException
|
||||
{
|
||||
request.setHandled(true);
|
||||
|
||||
try
|
||||
{
|
||||
if (path.contains("async"))
|
||||
{
|
||||
asyncHolder.set(request.startAsync());
|
||||
barrier.await();
|
||||
}
|
||||
else
|
||||
{
|
||||
barrier.await();
|
||||
dispatched.await();
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
throw new ServletException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
_server.start();
|
||||
|
||||
// One request to block while dispatched other will go async.
|
||||
_connector.executeRequest("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
_connector.executeRequest("GET /async HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
|
||||
// Ensure the requests have been dispatched and async started.
|
||||
barrier.await();
|
||||
assertNotNull(asyncHolder.get());
|
||||
|
||||
// Shutdown should timeout as there is a request dispatched.
|
||||
Future<Void> shutdown = _statsHandler.shutdown();
|
||||
assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS));
|
||||
|
||||
// When the dispatched thread exits we should shutdown even though we have a waiting async request.
|
||||
dispatched.countDown();
|
||||
shutdown.get(5, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendExpire() throws Exception
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue