diff --git a/jetty-http/src/test/java/org/eclipse/jetty/http/QuotedQualityCSVTest.java b/jetty-http/src/test/java/org/eclipse/jetty/http/QuotedQualityCSVTest.java index 32b909bf5e4..80b4d957450 100644 --- a/jetty-http/src/test/java/org/eclipse/jetty/http/QuotedQualityCSVTest.java +++ b/jetty-http/src/test/java/org/eclipse/jetty/http/QuotedQualityCSVTest.java @@ -18,13 +18,16 @@ package org.eclipse.jetty.http; -import static org.hamcrest.Matchers.contains; -import static org.junit.Assert.assertThat; +import java.util.ArrayList; +import java.util.List; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertThat; + public class QuotedQualityCSVTest { @@ -309,5 +312,46 @@ public class QuotedQualityCSVTest values.addValue("one,two;,three;x=y"); Assert.assertThat(values.getValues(),Matchers.contains("one","two","three;x=y")); } - + + + @Test + public void testQuality() + { + List results = new ArrayList<>(); + + QuotedQualityCSV values = new QuotedQualityCSV() + { + @Override + protected void parsedValue(StringBuffer buffer) + { + results.add("parsedValue: " + buffer.toString()); + + super.parsedValue(buffer); + } + + @Override + protected void parsedParam(StringBuffer buffer, int valueLength, int paramName, int paramValue) + { + String param = buffer.substring(paramName, buffer.length()); + results.add("parsedParam: " + param); + + super.parsedParam(buffer, valueLength, paramName, paramValue); + } + }; + + + // The provided string is not legal according to some RFCs ( not a token because of = and not a parameter because not preceded by ; ) + // The string is legal according to RFC7239 which allows for just parameters (called forwarded-pairs) + values.addValue("p=0.5,q=0.5"); + + + // The QuotedCSV implementation is lenient and adopts the later interpretation and thus sees q=0.5 and p=0.5 both as parameters + assertThat(results,contains("parsedValue: ", "parsedParam: p=0.5", + "parsedValue: ", "parsedParam: q=0.5")); + + + // However the QuotedQualityCSV only handles the q parameter and that is consumed from the parameter string. + assertThat(values,contains("p=0.5", "")); + + } } diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 9875d602526..361ef5bcfe1 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -66,6 +66,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory Generator generator = new Generator(byteBufferPool); FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy(); HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl); + Parser parser = new Parser(byteBufferPool, session, 4096, 8192); HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java index 90906b3ef74..59996074445 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java @@ -20,7 +20,10 @@ package org.eclipse.jetty.http2.client.http; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; import java.util.Map; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicMarkableReference; import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory; import org.eclipse.jetty.client.AbstractHttpClientTransport; @@ -135,7 +138,12 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport if (HttpScheme.HTTPS.is(destination.getScheme())) sslContextFactory = httpClient.getSslContextFactory(); - client.connect(sslContextFactory, address, listenerPromise, listenerPromise, context); + connect(sslContextFactory, address, listenerPromise, listenerPromise, context); + } + + protected void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise promise, Map context) + { + getHTTP2Client().connect(sslContextFactory, address, listener, promise, context); } @Override @@ -164,8 +172,8 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport private class SessionListenerPromise extends Session.Listener.Adapter implements Promise { + private final AtomicMarkableReference connection = new AtomicMarkableReference<>(null, false); private final Map context; - private HttpConnectionOverHTTP2 connection; private SessionListenerPromise(Map context) { @@ -175,14 +183,15 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport @Override public void succeeded(Session session) { - connection = newHttpConnection(destination(), session); - promise().succeeded(connection); + // This method is invoked when the client preface + // is sent, but we want to succeed the nested + // promise when the server preface is received. } @Override public void failed(Throwable failure) { - promise().failed(failure); + failConnectionPromise(failure); } private HttpDestinationOverHTTP2 destination() @@ -191,7 +200,7 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport } @SuppressWarnings("unchecked") - private Promise promise() + private Promise connectionPromise() { return (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY); } @@ -202,26 +211,55 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport Map settings = frame.getSettings(); if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS)) destination().setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS)); + if (!connection.isMarked()) + onServerPreface(session); + } + + private void onServerPreface(Session session) + { + HttpConnectionOverHTTP2 connection = newHttpConnection(destination(), session); + if (this.connection.compareAndSet(null, connection, false, true)) + connectionPromise().succeeded(connection); } @Override public void onClose(Session session, GoAwayFrame frame) { - HttpClientTransportOverHTTP2.this.onClose(connection, frame); + if (failConnectionPromise(new ClosedChannelException())) + return; + HttpConnectionOverHTTP2 connection = this.connection.getReference(); + if (connection != null) + HttpClientTransportOverHTTP2.this.onClose(connection, frame); } @Override public boolean onIdleTimeout(Session session) { - return connection.onIdleTimeout(((HTTP2Session)session).getEndPoint().getIdleTimeout()); + long idleTimeout = ((HTTP2Session)session).getEndPoint().getIdleTimeout(); + if (failConnectionPromise(new TimeoutException("Idle timeout expired: " + idleTimeout + " ms"))) + return true; + HttpConnectionOverHTTP2 connection = this.connection.getReference(); + if (connection != null) + return connection.onIdleTimeout(idleTimeout); + return true; } @Override public void onFailure(Session session, Throwable failure) { - HttpConnectionOverHTTP2 c = connection; - if (c != null) - c.close(failure); + if (failConnectionPromise(failure)) + return; + HttpConnectionOverHTTP2 connection = this.connection.getReference(); + if (connection != null) + connection.close(failure); + } + + private boolean failConnectionPromise(Throwable failure) + { + boolean result = connection.compareAndSet(null, null, false, true); + if (result) + connectionPromise().failed(failure); + return result; } } } diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java index c5f6890654f..e668fc20056 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java @@ -477,22 +477,33 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest ServerParser parser = new ServerParser(byteBufferPool, new ServerParser.Listener.Adapter() { @Override - public void onHeaders(HeadersFrame request) + public void onPreface() { // Server's preface. generator.control(lease, new SettingsFrame(new HashMap<>(), false)); // Reply to client's SETTINGS. generator.control(lease, new SettingsFrame(new HashMap<>(), true)); + writeFrames(); + } + + @Override + public void onHeaders(HeadersFrame request) + { // Response. MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); HeadersFrame response = new HeadersFrame(request.getStreamId(), metaData, null, true); generator.control(lease, response); + writeFrames(); + } + private void writeFrames() + { try { // Write the frames. for (ByteBuffer buffer : lease.getByteBuffers()) output.write(BufferUtil.toArray(buffer)); + lease.recycle(); } catch (Throwable x) { diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java index f3ce3a830ba..3bf5ebc6483 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java @@ -18,33 +18,64 @@ package org.eclipse.jetty.http2.client.http; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.AbstractConnectionPool; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpDestination; +import org.eclipse.jetty.client.HttpResponseException; import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.frames.GoAwayFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.PingFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.Assert; import org.junit.Test; public class MaxConcurrentStreamsTest extends AbstractTest { private void start(int maxConcurrentStreams, Handler handler) throws Exception + { + startServer(maxConcurrentStreams, handler); + prepareClient(); + client.start(); + } + + private void startServer(int maxConcurrentStreams, Handler handler) throws Exception { HTTP2ServerConnectionFactory http2 = new HTTP2ServerConnectionFactory(new HttpConfiguration()); http2.setMaxConcurrentStreams(maxConcurrentStreams); prepareServer(http2); server.setHandler(handler); server.start(); - prepareClient(); - client.start(); } @Test @@ -114,6 +145,85 @@ public class MaxConcurrentStreamsTest extends AbstractTest ); } + @Test + public void testSmallMaxConcurrentStreamsExceededOnClient() throws Exception + { + int maxConcurrentStreams = 1; + startServer(maxConcurrentStreams, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) + { + sleep(1000); + } + }); + + String scheme = "http"; + String host = "localhost"; + int port = connector.getLocalPort(); + + AtomicInteger connections = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(1); + List failures = new ArrayList<>(); + client = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client()) + { + @Override + protected void connect(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise promise, Map context) + { + super.connect(sslContextFactory, address, new Wrapper(listener) + { + @Override + public void onSettings(Session session, SettingsFrame frame) + { + // Send another request to simulate a request being + // sent concurrently with connection establishment. + // Sending this request will trigger the creation of + // another connection since maxConcurrentStream=1. + if (connections.incrementAndGet() == 1) + { + client.newRequest(host, port) + .path("/2") + .send(result -> + { + if (result.isSucceeded()) + { + Response response2 = result.getResponse(); + if (response2.getStatus() == HttpStatus.OK_200) + latch.countDown(); + else + failures.add(new HttpResponseException("", response2)); + } + else + { + failures.add(result.getFailure()); + } + }); + } + super.onSettings(session, frame); + } + }, promise, context); + } + }, null); + QueuedThreadPool clientExecutor = new QueuedThreadPool(); + clientExecutor.setName("client"); + client.setExecutor(clientExecutor); + client.start(); + + // This request will be queued and establish the connection, + // which will trigger the send of the second request. + ContentResponse response1 = client.newRequest(host, port) + .path("/1") + .timeout(5, TimeUnit.SECONDS) + .send(); + + Assert.assertEquals(HttpStatus.OK_200, response1.getStatus()); + Assert.assertTrue(failures.toString(), latch.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(2, connections.get()); + HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port); + AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool(); + Assert.assertEquals(2, connectionPool.getConnectionCount()); + } + @Test public void testTwoConcurrentStreamsThirdWaits() throws Exception { @@ -214,6 +324,49 @@ public class MaxConcurrentStreamsTest extends AbstractTest Assert.assertTrue(latch.await(maxConcurrent * sleep / 2, TimeUnit.MILLISECONDS)); } + @Test + public void testManyConcurrentRequestsWithSmallConcurrentStreams() throws Exception + { + byte[] data = new byte[64 * 1024]; + start(1, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.getOutputStream().write(data); + } + }); + + client.setMaxConnectionsPerDestination(32768); + client.setMaxRequestsQueuedPerDestination(1024 * 1024); + + int parallelism = 16; + int runs = 1; + int iterations = 256; + int total = parallelism * runs * iterations; + CountDownLatch latch = new CountDownLatch(total); + Queue failures = new ConcurrentLinkedQueue<>(); + ForkJoinPool pool = new ForkJoinPool(parallelism); + pool.submit(() -> IntStream.range(0, parallelism).parallel().forEach(i -> + IntStream.range(0, runs).forEach(j -> + { + for (int k = 0; k < iterations; ++k) + { + client.newRequest("localhost", connector.getLocalPort()) + .path("/" + i + "_" + j + "_" + k) + .send(result -> + { + if (result.isFailed()) + failures.offer(result); + latch.countDown(); + }); + } + }))); + + Assert.assertTrue(latch.await(total * 10, TimeUnit.MILLISECONDS)); + Assert.assertTrue(failures.toString(), failures.isEmpty()); + } + private void primeConnection() throws Exception { // Prime the connection so that the maxConcurrentStream setting arrives to the client. @@ -233,4 +386,62 @@ public class MaxConcurrentStreamsTest extends AbstractTest throw new RuntimeException(x); } } + + private static class Wrapper implements Session.Listener + { + private final Session.Listener listener; + + private Wrapper(Session.Listener listener) + { + this.listener = listener; + } + + @Override + public Map onPreface(Session session) + { + return listener.onPreface(session); + } + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + return listener.onNewStream(stream, frame); + } + + @Override + public void onSettings(Session session, SettingsFrame frame) + { + listener.onSettings(session, frame); + } + + @Override + public void onPing(Session session, PingFrame frame) + { + listener.onPing(session, frame); + } + + @Override + public void onReset(Session session, ResetFrame frame) + { + listener.onReset(session, frame); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + listener.onClose(session, frame); + } + + @Override + public boolean onIdleTimeout(Session session) + { + return listener.onIdleTimeout(session); + } + + @Override + public void onFailure(Session session, Throwable failure) + { + listener.onFailure(session, failure); + } + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java index 3bb9f43d34a..38fee0bdc1b 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java @@ -584,7 +584,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful FutureCallback shutdown=new FutureCallback(false); _shutdown.compareAndSet(null,shutdown); shutdown=_shutdown.get(); - if (_dispatchedStats.getCurrent()==0) + if (_requestStats.getCurrent()==0) shutdown.succeeded(); return shutdown; } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/StatisticsHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/StatisticsHandlerTest.java index c29f91f85eb..fd593d62316 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/StatisticsHandlerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/StatisticsHandlerTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.server.handler; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -31,6 +32,7 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.ConnectionStatistics; import org.eclipse.jetty.server.LocalConnector; import org.eclipse.jetty.server.Request; @@ -41,6 +43,7 @@ import org.junit.Test; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -85,7 +88,7 @@ public class StatisticsHandlerTest _statsHandler.setHandler(new AbstractHandler() { @Override - public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException + public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException { request.setHandled(true); try @@ -97,7 +100,7 @@ public class StatisticsHandlerTest catch (Exception x) { Thread.currentThread().interrupt(); - throw (IOException)new IOException().initCause(x); + throw new IOException(x); } } }); @@ -182,7 +185,7 @@ public class StatisticsHandlerTest _statsHandler.setHandler(new AbstractHandler() { @Override - public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException + public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException { request.setHandled(true); try @@ -193,7 +196,7 @@ public class StatisticsHandlerTest catch (Exception x) { Thread.currentThread().interrupt(); - throw (IOException)new IOException().initCause(x); + throw new IOException(x); } } }); @@ -245,7 +248,7 @@ public class StatisticsHandlerTest _statsHandler.setHandler(new AbstractHandler() { @Override - public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException + public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException { request.setHandled(true); try @@ -306,22 +309,22 @@ public class StatisticsHandlerTest asyncHolder.get().addListener(new AsyncListener() { @Override - public void onTimeout(AsyncEvent event) throws IOException + public void onTimeout(AsyncEvent event) { } @Override - public void onStartAsync(AsyncEvent event) throws IOException + public void onStartAsync(AsyncEvent event) { } @Override - public void onError(AsyncEvent event) throws IOException + public void onError(AsyncEvent event) { } @Override - public void onComplete(AsyncEvent event) throws IOException + public void onComplete(AsyncEvent event) { try { @@ -375,7 +378,7 @@ public class StatisticsHandlerTest _statsHandler.setHandler(new AbstractHandler() { @Override - public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException + public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException { request.setHandled(true); try @@ -428,23 +431,23 @@ public class StatisticsHandlerTest asyncHolder.get().addListener(new AsyncListener() { @Override - public void onTimeout(AsyncEvent event) throws IOException + public void onTimeout(AsyncEvent event) { event.getAsyncContext().complete(); } @Override - public void onStartAsync(AsyncEvent event) throws IOException + public void onStartAsync(AsyncEvent event) { } @Override - public void onError(AsyncEvent event) throws IOException + public void onError(AsyncEvent event) { } @Override - public void onComplete(AsyncEvent event) throws IOException + public void onComplete(AsyncEvent event) { try { @@ -491,7 +494,7 @@ public class StatisticsHandlerTest _statsHandler.setHandler(new AbstractHandler() { @Override - public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException + public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException { request.setHandled(true); try @@ -550,22 +553,22 @@ public class StatisticsHandlerTest asyncHolder.get().addListener(new AsyncListener() { @Override - public void onTimeout(AsyncEvent event) throws IOException + public void onTimeout(AsyncEvent event) { } @Override - public void onStartAsync(AsyncEvent event) throws IOException + public void onStartAsync(AsyncEvent event) { } @Override - public void onError(AsyncEvent event) throws IOException + public void onError(AsyncEvent event) { } @Override - public void onComplete(AsyncEvent event) throws IOException + public void onComplete(AsyncEvent event) { try { @@ -601,6 +604,53 @@ public class StatisticsHandlerTest assertEquals(_statsHandler.getDispatchedTimeTotal(), _statsHandler.getDispatchedTimeMean(), 0.01); } + @Test + public void testAsyncRequestWithShutdown() throws Exception + { + long delay = 500; + CountDownLatch serverLatch = new CountDownLatch(1); + _statsHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) + { + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + new Thread(() -> + { + try + { + Thread.sleep(delay); + asyncContext.complete(); + } + catch (InterruptedException e) + { + response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500); + asyncContext.complete(); + } + }).start(); + serverLatch.countDown(); + } + }); + _server.start(); + + String request = "GET / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "\r\n"; + _connector.executeRequest(request); + + assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); + + Future shutdown = _statsHandler.shutdown(); + assertFalse(shutdown.isDone()); + + Thread.sleep(delay / 2); + assertFalse(shutdown.isDone()); + + Thread.sleep(delay); + assertTrue(shutdown.isDone()); + } + /** * This handler is external to the statistics handler and it is used to ensure that statistics handler's * handle() is fully executed before asserting its values in the tests, to avoid race conditions with the