From 6407322d9c3f7d65ac32e8e56d6fe8b0e28fe542 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 2 Jul 2020 11:15:34 +1000 Subject: [PATCH 1/2] Issue #5012 - Remove unneeded dependency for test-jaas-webapp Signed-off-by: Lachlan Roberts --- tests/test-webapps/test-jaas-webapp/pom.xml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/test-webapps/test-jaas-webapp/pom.xml b/tests/test-webapps/test-jaas-webapp/pom.xml index 837c3c7b723..c7ee8037da4 100644 --- a/tests/test-webapps/test-jaas-webapp/pom.xml +++ b/tests/test-webapps/test-jaas-webapp/pom.xml @@ -50,13 +50,6 @@ - - - mysql - mysql-connector-java - 5.1.19 - - org.apache.maven.plugins From e0955192b8e6aae6a20614388279ad5c4f893866 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 3 Jul 2020 09:30:15 +0200 Subject: [PATCH 2/2] Jetty 9.4.x 4976 httpclient fix null network buffer (#5010) Fixes #4976 HttpClient async content throws NPE in DEBUG log. Reworked handling of asynchronous content by immediately exiting HttpReceiverOverHTTP.process(), so that there is no race with other threads that have been scheduled to resume the processing. The call to HttpReceiver.dispose() that could be triggered by an asynchronous failure is now performed either by the failing thread (if the HttpReceiver is not processing) or by an I/O thread (if the HttpReceiver is processing) similarly to what happens when terminating the response. The content decoding has been reworked to perform the required state changes similarly to what non-decoded content is doing, as this was completely lacking before (it was actually a side bug that is now fixed). Signed-off-by: Simone Bordet Co-authored-by: Ludovic Orban --- .../eclipse/jetty/client/HttpReceiver.java | 132 ++++--- .../client/http/HttpReceiverOverHTTP.java | 59 ++- .../client/HttpClientAsyncContentTest.java | 337 ++++++++++++++++-- .../eclipse/jetty/client/HttpClientTest.java | 6 - 4 files changed, 423 insertions(+), 111 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index f14eda67106..27127388768 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -199,6 +199,7 @@ public abstract class HttpReceiver if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN)) return true; + dispose(); terminateResponse(exchange); return false; } @@ -217,23 +218,17 @@ public abstract class HttpReceiver */ protected boolean responseHeader(HttpExchange exchange, HttpField field) { - out: while (true) { ResponseState current = responseState.get(); - switch (current) + if (current == ResponseState.BEGIN || current == ResponseState.HEADER) { - case BEGIN: - case HEADER: - { - if (updateResponseState(current, ResponseState.TRANSIENT)) - break out; + if (updateResponseState(current, ResponseState.TRANSIENT)) break; - } - default: - { - return false; - } + } + else + { + return false; } } @@ -267,6 +262,7 @@ public abstract class HttpReceiver if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER)) return true; + dispose(); terminateResponse(exchange); return false; } @@ -334,7 +330,7 @@ public abstract class HttpReceiver { if (factory.getEncoding().equalsIgnoreCase(encoding)) { - decoder = new Decoder(response, factory.newContentDecoder()); + decoder = new Decoder(exchange, factory.newContentDecoder()); break; } } @@ -350,6 +346,7 @@ public abstract class HttpReceiver return hasDemand; } + dispose(); terminateResponse(exchange); return false; } @@ -393,39 +390,28 @@ public abstract class HttpReceiver { if (LOG.isDebugEnabled()) LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer)); - - ContentListeners listeners = this.contentListeners; - if (listeners != null) + if (contentListeners.isEmpty()) { - if (listeners.isEmpty()) - { - callback.succeeded(); - } - else - { - Decoder decoder = this.decoder; - if (decoder == null) - { - listeners.notifyContent(response, buffer, callback); - } - else - { - try - { - proceed = decoder.decode(buffer, callback); - } - catch (Throwable x) - { - callback.failed(x); - proceed = false; - } - } - } + callback.succeeded(); } else { - // May happen in case of concurrent abort. - proceed = false; + if (decoder == null) + { + contentListeners.notifyContent(response, buffer, callback); + } + else + { + try + { + proceed = decoder.decode(buffer, callback); + } + catch (Throwable x) + { + callback.failed(x); + proceed = false; + } + } } } @@ -444,6 +430,7 @@ public abstract class HttpReceiver } } + dispose(); terminateResponse(exchange); return false; } @@ -567,6 +554,7 @@ public abstract class HttpReceiver */ protected void dispose() { + assert responseState.get() != ResponseState.TRANSIENT; cleanup(); } @@ -598,7 +586,8 @@ public abstract class HttpReceiver this.failure = failure; - dispose(); + if (terminate) + dispose(); HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) @@ -776,14 +765,14 @@ public abstract class HttpReceiver */ private class Decoder implements Destroyable { - private final HttpResponse response; + private final HttpExchange exchange; private final ContentDecoder decoder; private ByteBuffer encoded; private Callback callback; - private Decoder(HttpResponse response, ContentDecoder decoder) + private Decoder(HttpExchange exchange, ContentDecoder decoder) { - this.response = response; + this.exchange = exchange; this.decoder = Objects.requireNonNull(decoder); } @@ -814,13 +803,13 @@ public abstract class HttpReceiver } ByteBuffer decoded = buffer; if (LOG.isDebugEnabled()) - LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded)); + LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded)); - contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); + contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); boolean hasDemand = hasDemandOrStall(); if (LOG.isDebugEnabled()) - LOG.debug("Response content decoded {}, hasDemand={}", response, hasDemand); + LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand); if (!hasDemand) return false; } @@ -829,9 +818,50 @@ public abstract class HttpReceiver private void resume() { if (LOG.isDebugEnabled()) - LOG.debug("Response content resuming decoding {}", response); - if (decode()) + LOG.debug("Response content resuming decoding {}", exchange); + + // The content and callback may be null + // if there is no initial content demand. + if (callback == null) + { receive(); + return; + } + + while (true) + { + ResponseState current = responseState.get(); + if (current == ResponseState.HEADERS || current == ResponseState.CONTENT) + { + if (updateResponseState(current, ResponseState.TRANSIENT)) + break; + } + else + { + callback.failed(new IllegalStateException("Invalid response state " + current)); + return; + } + } + + boolean decoded = false; + try + { + decoded = decode(); + } + catch (Throwable x) + { + callback.failed(x); + } + + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) + { + if (decoded) + receive(); + return; + } + + dispose(); + terminateResponse(exchange); } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java index e032e70ab7e..e5bf9eec51b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java @@ -87,7 +87,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res RetainableByteBuffer currentBuffer = networkBuffer; if (currentBuffer == null) throw new IllegalStateException(); - if (currentBuffer.hasRemaining()) throw new IllegalStateException(); @@ -107,9 +106,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res private void releaseNetworkBuffer() { if (networkBuffer == null) - throw new IllegalStateException(); - if (networkBuffer.hasRemaining()) - throw new IllegalStateException(); + return; networkBuffer.release(); if (LOG.isDebugEnabled()) LOG.debug("Released {}", networkBuffer); @@ -138,24 +135,27 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res while (true) { // Always parse even empty buffers to advance the parser. - boolean stopProcessing = parse(); + if (parse()) + { + // Return immediately, as this thread may be in a race + // with e.g. another thread demanding more content. + return; + } // Connection may be closed or upgraded in a parser callback. boolean upgraded = connection != endPoint.getConnection(); if (connection.isClosed() || upgraded) { if (LOG.isDebugEnabled()) - LOG.debug("{} {}", connection, upgraded ? "upgraded" : "closed"); + LOG.debug("{} {}", upgraded ? "Upgraded" : "Closed", connection); releaseNetworkBuffer(); return; } - if (stopProcessing) - return; - if (networkBuffer.getReferences() > 1) reacquireNetworkBuffer(); + // The networkBuffer may have been reacquired. int read = endPoint.fill(networkBuffer.getBuffer()); if (LOG.isDebugEnabled()) LOG.debug("Read {} bytes in {} from {}", read, networkBuffer, endPoint); @@ -182,7 +182,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res { if (LOG.isDebugEnabled()) LOG.debug(x); - networkBuffer.clear(); releaseNetworkBuffer(); failAndClose(x); } @@ -198,14 +197,24 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res while (true) { boolean handle = parser.parseNext(networkBuffer.getBuffer()); + boolean failed = isFailed(); + if (LOG.isDebugEnabled()) + LOG.debug("Parse result={}, failed={}", handle, failed); + // When failed, it's safe to close the parser because there + // will be no races with other threads demanding more content. + if (failed) + parser.close(); + if (handle) + return !failed; + boolean complete = this.complete; this.complete = false; if (LOG.isDebugEnabled()) - LOG.debug("Parsed {}, remaining {} {}", handle, networkBuffer.remaining(), parser); - if (handle) - return true; + LOG.debug("Parse complete={}, remaining {} {}", complete, networkBuffer.remaining(), parser); + if (networkBuffer.isEmpty()) return false; + if (complete) { if (LOG.isDebugEnabled()) @@ -291,8 +300,13 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res if (exchange == null) return false; + RetainableByteBuffer networkBuffer = this.networkBuffer; networkBuffer.retain(); - return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, this::failAndClose)); + return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, failure -> + { + networkBuffer.release(); + failAndClose(failure); + })); } @Override @@ -323,15 +337,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res if (status != HttpStatus.CONTINUE_100) complete = true; - boolean proceed = responseSuccess(exchange); - if (!proceed) - return true; - - if (status == HttpStatus.SWITCHING_PROTOCOLS_101) - return true; - - return HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) && - status == HttpStatus.OK_200; + return !responseSuccess(exchange); } @Override @@ -364,13 +370,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res parser.reset(); } - @Override - protected void dispose() - { - super.dispose(); - parser.close(); - } - private void failAndClose(Throwable failure) { if (responseFailure(failure)) diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java index 3dfec04e1e2..9902c27fe99 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java @@ -18,22 +18,32 @@ package org.eclipse.jetty.client; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongConsumer; +import java.util.zip.GZIPOutputStream; +import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.http.HttpChannelOverHTTP; +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; +import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; +import org.eclipse.jetty.client.http.HttpReceiverOverHTTP; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; @@ -46,10 +56,10 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest @ArgumentsSource(ScenarioProvider.class) public void testSmallAsyncContent(Scenario scenario) throws Exception { - start(scenario, new AbstractHandler() + start(scenario, new EmptyServerHandler() { @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { ServletOutputStream output = response.getOutputStream(); output.write(65); @@ -58,30 +68,19 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest } }); - final AtomicInteger contentCount = new AtomicInteger(); - final AtomicReference callbackRef = new AtomicReference<>(); - final AtomicReference contentLatch = new AtomicReference<>(new CountDownLatch(1)); - final CountDownLatch completeLatch = new CountDownLatch(1); + AtomicInteger contentCount = new AtomicInteger(); + AtomicReference callbackRef = new AtomicReference<>(); + AtomicReference contentLatch = new AtomicReference<>(new CountDownLatch(1)); + CountDownLatch completeLatch = new CountDownLatch(1); client.newRequest("localhost", connector.getLocalPort()) .scheme(scenario.getScheme()) - .onResponseContentAsync(new Response.AsyncContentListener() + .onResponseContentAsync((response, content, callback) -> { - @Override - public void onContent(Response response, ByteBuffer content, Callback callback) - { - contentCount.incrementAndGet(); - callbackRef.set(callback); - contentLatch.get().countDown(); - } + contentCount.incrementAndGet(); + callbackRef.set(callback); + contentLatch.get().countDown(); }) - .send(new Response.CompleteListener() - { - @Override - public void onComplete(Result result) - { - completeLatch.countDown(); - } - }); + .send(result -> completeLatch.countDown()); assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS)); Callback callback = callbackRef.get(); @@ -113,4 +112,294 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); assertEquals(2, contentCount.get()); } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testConcurrentAsyncContent(Scenario scenario) throws Exception + { + AtomicReference asyncContextRef = new AtomicReference<>(); + startServer(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + ServletOutputStream output = response.getOutputStream(); + output.write(new byte[1024]); + output.flush(); + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + asyncContextRef.set(asyncContext); + } + }); + AtomicReference demandRef = new AtomicReference<>(); + startClient(scenario, new HttpClientTransportOverHTTP() + { + @Override + protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise promise) + { + return new HttpConnectionOverHTTP(endPoint, destination, promise) + { + @Override + protected HttpChannelOverHTTP newHttpChannel() + { + return new HttpChannelOverHTTP(this) + { + @Override + protected HttpReceiverOverHTTP newHttpReceiver() + { + return new HttpReceiverOverHTTP(this) + { + @Override + public boolean content(ByteBuffer buffer) + { + try + { + boolean result = super.content(buffer); + // The content has been notified, but the listener has not demanded. + + // Simulate an asynchronous demand from otherThread. + // There is no further content, so otherThread will fill 0, + // set the fill interest, and release the network buffer. + CountDownLatch latch = new CountDownLatch(1); + Thread otherThread = new Thread(() -> + { + demandRef.get().accept(1); + latch.countDown(); + }); + otherThread.start(); + // Wait for otherThread to finish, then let this thread continue. + assertTrue(latch.await(5, TimeUnit.SECONDS)); + + return result; + } + catch (InterruptedException x) + { + throw new RuntimeException(x); + } + } + }; + } + }; + } + }; + } + }, httpClient -> {}); + + CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded((response, demand, content, callback) -> + { + demandRef.set(demand); + // Don't demand and don't succeed the callback. + }) + .send(result -> + { + if (result.isSucceeded()) + latch.countDown(); + }); + + // Wait for the threads to finish their processing. + Thread.sleep(1000); + + // Complete the response. + asyncContextRef.get().complete(); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAsyncContentAbort(Scenario scenario) throws Exception + { + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.getOutputStream().write(new byte[1024]); + } + }); + + CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded((response, demand, content, callback) -> response.abort(new Throwable())) + .send(result -> + { + if (result.isFailed()) + latch.countDown(); + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAsyncGzipContentAbortThenDemand(Scenario scenario) throws Exception + { + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.setHeader("Content-Encoding", "gzip"); + GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream()); + gzip.write(new byte[1024]); + gzip.finish(); + } + }); + + CountDownLatch latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded((response, demand, content, callback) -> + { + response.abort(new Throwable()); + demand.accept(1); + }) + .send(result -> + { + if (result.isFailed()) + latch.countDown(); + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAsyncGzipContentDelayedDemand(Scenario scenario) throws Exception + { + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + response.setHeader("Content-Encoding", "gzip"); + try (GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream())) + { + gzip.write(new byte[1024]); + } + } + }); + + AtomicReference demandRef = new AtomicReference<>(); + CountDownLatch headersLatch = new CountDownLatch(1); + CountDownLatch resultLatch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded(new Response.DemandedContentListener() + { + @Override + public void onBeforeContent(Response response, LongConsumer demand) + { + // Don't demand yet. + demandRef.set(demand); + headersLatch.countDown(); + } + + @Override + public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) + { + demand.accept(1); + } + }) + .send(result -> + { + if (result.isSucceeded()) + resultLatch.countDown(); + }); + + assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); + // Wait to make sure the demand is really delayed. + Thread.sleep(500); + demandRef.get().accept(1); + + assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testAsyncGzipContentAbortWhileDecodingWithDelayedDemand(Scenario scenario) throws Exception + { + // Use a large content so that the gzip decoding is done in multiple passes. + byte[] bytes = new byte[8 * 1024 * 1024]; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) + { + gzip.write(bytes); + } + byte[] gzipBytes = baos.toByteArray(); + int half = gzipBytes.length / 2; + byte[] gzip1 = Arrays.copyOfRange(gzipBytes, 0, half); + byte[] gzip2 = Arrays.copyOfRange(gzipBytes, half, gzipBytes.length); + + AtomicReference asyncContextRef = new AtomicReference<>(); + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + asyncContextRef.set(asyncContext); + + response.setHeader("Content-Encoding", "gzip"); + ServletOutputStream output = response.getOutputStream(); + output.write(gzip1); + output.flush(); + } + }); + + AtomicReference demandRef = new AtomicReference<>(); + CountDownLatch firstChunkLatch = new CountDownLatch(1); + CountDownLatch secondChunkLatch = new CountDownLatch(1); + CountDownLatch resultLatch = new CountDownLatch(1); + AtomicInteger chunks = new AtomicInteger(); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .onResponseContentDemanded((response, demand, content, callback) -> + { + if (chunks.incrementAndGet() == 1) + { + try + { + // Don't demand, but make the server write the second chunk. + AsyncContext asyncContext = asyncContextRef.get(); + asyncContext.getResponse().getOutputStream().write(gzip2); + asyncContext.complete(); + demandRef.set(demand); + firstChunkLatch.countDown(); + } + catch (IOException x) + { + throw new RuntimeException(x); + } + } + else + { + response.abort(new Throwable()); + demandRef.set(demand); + secondChunkLatch.countDown(); + } + }) + .send(result -> + { + if (result.isFailed()) + resultLatch.countDown(); + }); + + assertTrue(firstChunkLatch.await(5, TimeUnit.SECONDS)); + // Wait to make sure the demand is really delayed. + Thread.sleep(500); + demandRef.get().accept(1); + + assertTrue(secondChunkLatch.await(5, TimeUnit.SECONDS)); + // Wait to make sure the demand is really delayed. + Thread.sleep(500); + demandRef.get().accept(1); + + assertTrue(resultLatch.await(555, TimeUnit.SECONDS)); + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index b082a9753c9..aef8d691562 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -79,7 +79,6 @@ import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpVersion; -import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.Net; @@ -1603,11 +1602,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest ContentResponse response = listener.get(5, TimeUnit.SECONDS); assertEquals(200, response.getStatus()); - // Because the tunnel was successful, this connection will be - // upgraded to an SslConnection, so it will not be fill interested. - // This test doesn't upgrade, so it needs to restore the fill interest. - ((AbstractConnection)connection).fillInterested(); - // Test that I can send another request on the same connection. request = client.newRequest(host, port); listener = new FutureResponseListener(request);