Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2020-07-03 10:46:24 +02:00
commit 42d5db3208
5 changed files with 434 additions and 119 deletions

View File

@ -199,6 +199,7 @@ public abstract class HttpReceiver
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN)) if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
return true; return true;
dispose();
terminateResponse(exchange); terminateResponse(exchange);
return false; return false;
} }
@ -217,25 +218,19 @@ public abstract class HttpReceiver
*/ */
protected boolean responseHeader(HttpExchange exchange, HttpField field) protected boolean responseHeader(HttpExchange exchange, HttpField field)
{ {
out:
while (true) while (true)
{ {
ResponseState current = responseState.get(); ResponseState current = responseState.get();
switch (current) if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
case BEGIN:
case HEADER:
{ {
if (updateResponseState(current, ResponseState.TRANSIENT)) if (updateResponseState(current, ResponseState.TRANSIENT))
break out;
break; break;
} }
default: else
{ {
return false; return false;
} }
} }
}
HttpResponse response = exchange.getResponse(); HttpResponse response = exchange.getResponse();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
@ -267,6 +262,7 @@ public abstract class HttpReceiver
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER)) if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
return true; return true;
dispose();
terminateResponse(exchange); terminateResponse(exchange);
return false; return false;
} }
@ -334,7 +330,7 @@ public abstract class HttpReceiver
{ {
if (factory.getEncoding().equalsIgnoreCase(encoding)) if (factory.getEncoding().equalsIgnoreCase(encoding))
{ {
decoder = new Decoder(response, factory.newContentDecoder()); decoder = new Decoder(exchange, factory.newContentDecoder());
break; break;
} }
} }
@ -350,6 +346,7 @@ public abstract class HttpReceiver
return hasDemand; return hasDemand;
} }
dispose();
terminateResponse(exchange); terminateResponse(exchange);
return false; return false;
} }
@ -393,20 +390,15 @@ public abstract class HttpReceiver
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer)); LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (contentListeners.isEmpty())
ContentListeners listeners = this.contentListeners;
if (listeners != null)
{
if (listeners.isEmpty())
{ {
callback.succeeded(); callback.succeeded();
} }
else else
{ {
Decoder decoder = this.decoder;
if (decoder == null) if (decoder == null)
{ {
listeners.notifyContent(response, buffer, callback); contentListeners.notifyContent(response, buffer, callback);
} }
else else
{ {
@ -422,12 +414,6 @@ public abstract class HttpReceiver
} }
} }
} }
else
{
// May happen in case of concurrent abort.
proceed = false;
}
}
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{ {
@ -444,6 +430,7 @@ public abstract class HttpReceiver
} }
} }
dispose();
terminateResponse(exchange); terminateResponse(exchange);
return false; return false;
} }
@ -567,6 +554,7 @@ public abstract class HttpReceiver
*/ */
protected void dispose() protected void dispose()
{ {
assert responseState.get() != ResponseState.TRANSIENT;
cleanup(); cleanup();
} }
@ -598,6 +586,7 @@ public abstract class HttpReceiver
this.failure = failure; this.failure = failure;
if (terminate)
dispose(); dispose();
HttpResponse response = exchange.getResponse(); HttpResponse response = exchange.getResponse();
@ -776,14 +765,14 @@ public abstract class HttpReceiver
*/ */
private class Decoder implements Destroyable private class Decoder implements Destroyable
{ {
private final HttpResponse response; private final HttpExchange exchange;
private final ContentDecoder decoder; private final ContentDecoder decoder;
private ByteBuffer encoded; private ByteBuffer encoded;
private Callback callback; private Callback callback;
private Decoder(HttpResponse response, ContentDecoder decoder) private Decoder(HttpExchange exchange, ContentDecoder decoder)
{ {
this.response = response; this.exchange = exchange;
this.decoder = Objects.requireNonNull(decoder); this.decoder = Objects.requireNonNull(decoder);
} }
@ -814,13 +803,13 @@ public abstract class HttpReceiver
} }
ByteBuffer decoded = buffer; ByteBuffer decoded = buffer;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded)); LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded));
contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
boolean hasDemand = hasDemandOrStall(); boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Response content decoded {}, hasDemand={}", response, hasDemand); LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand);
if (!hasDemand) if (!hasDemand)
return false; return false;
} }
@ -829,9 +818,50 @@ public abstract class HttpReceiver
private void resume() private void resume()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Response content resuming decoding {}", response); LOG.debug("Response content resuming decoding {}", exchange);
if (decode())
// The content and callback may be null
// if there is no initial content demand.
if (callback == null)
{
receive(); receive();
return;
}
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return;
}
}
boolean decoded = false;
try
{
decoded = decode();
}
catch (Throwable x)
{
callback.failed(x);
}
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (decoded)
receive();
return;
}
dispose();
terminateResponse(exchange);
} }
@Override @Override

View File

@ -100,7 +100,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
RetainableByteBuffer currentBuffer = networkBuffer; RetainableByteBuffer currentBuffer = networkBuffer;
if (currentBuffer == null) if (currentBuffer == null)
throw new IllegalStateException(); throw new IllegalStateException();
if (currentBuffer.hasRemaining()) if (currentBuffer.hasRemaining())
throw new IllegalStateException(); throw new IllegalStateException();
@ -121,9 +120,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
private void releaseNetworkBuffer() private void releaseNetworkBuffer()
{ {
if (networkBuffer == null) if (networkBuffer == null)
throw new IllegalStateException(); return;
if (networkBuffer.hasRemaining())
throw new IllegalStateException();
networkBuffer.release(); networkBuffer.release();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Released {}", networkBuffer); LOG.debug("Released {}", networkBuffer);
@ -153,24 +150,27 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
while (true) while (true)
{ {
// Always parse even empty buffers to advance the parser. // Always parse even empty buffers to advance the parser.
boolean stopProcessing = parse(); if (parse())
{
// Return immediately, as this thread may be in a race
// with e.g. another thread demanding more content.
return;
}
// Connection may be closed or upgraded in a parser callback. // Connection may be closed or upgraded in a parser callback.
boolean upgraded = connection != endPoint.getConnection(); boolean upgraded = connection != endPoint.getConnection();
if (connection.isClosed() || upgraded) if (connection.isClosed() || upgraded)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} {}", connection, upgraded ? "upgraded" : "closed"); LOG.debug("{} {}", upgraded ? "Upgraded" : "Closed", connection);
releaseNetworkBuffer(); releaseNetworkBuffer();
return; return;
} }
if (stopProcessing)
return;
if (networkBuffer.getReferences() > 1) if (networkBuffer.getReferences() > 1)
reacquireNetworkBuffer(); reacquireNetworkBuffer();
// The networkBuffer may have been reacquired.
int read = endPoint.fill(networkBuffer.getBuffer()); int read = endPoint.fill(networkBuffer.getBuffer());
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Read {} bytes in {} from {}", read, networkBuffer, endPoint); LOG.debug("Read {} bytes in {} from {}", read, networkBuffer, endPoint);
@ -196,8 +196,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
catch (Throwable x) catch (Throwable x)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Unable to fill from endpoint {}", endPoint, x); LOG.debug("Error processing {}", endPoint, x);
networkBuffer.clear();
releaseNetworkBuffer(); releaseNetworkBuffer();
failAndClose(x); failAndClose(x);
} }
@ -213,14 +212,24 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
while (true) while (true)
{ {
boolean handle = parser.parseNext(networkBuffer.getBuffer()); boolean handle = parser.parseNext(networkBuffer.getBuffer());
boolean failed = isFailed();
if (LOG.isDebugEnabled())
LOG.debug("Parse result={}, failed={}", handle, failed);
// When failed, it's safe to close the parser because there
// will be no races with other threads demanding more content.
if (failed)
parser.close();
if (handle)
return !failed;
boolean complete = this.complete; boolean complete = this.complete;
this.complete = false; this.complete = false;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Parsed {}, remaining {} {}", handle, networkBuffer.remaining(), parser); LOG.debug("Parse complete={}, remaining {} {}", complete, networkBuffer.remaining(), parser);
if (handle)
return true;
if (networkBuffer.isEmpty()) if (networkBuffer.isEmpty())
return false; return false;
if (complete) if (complete)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -301,8 +310,13 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (exchange == null) if (exchange == null)
return false; return false;
RetainableByteBuffer networkBuffer = this.networkBuffer;
networkBuffer.retain(); networkBuffer.retain();
return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, this::failAndClose)); return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, failure ->
{
networkBuffer.release();
failAndClose(failure);
}));
} }
@Override @Override
@ -333,17 +347,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (status != HttpStatus.CONTINUE_100) if (status != HttpStatus.CONTINUE_100)
complete = true; complete = true;
boolean proceed = responseSuccess(exchange); return !responseSuccess(exchange);
if (!proceed)
return true;
if (status == HttpStatus.SWITCHING_PROTOCOLS_101)
return true;
if (HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) && status == HttpStatus.OK_200)
return true;
return false;
} }
@Override @Override
@ -376,13 +380,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
parser.reset(); parser.reset();
} }
@Override
protected void dispose()
{
super.dispose();
parser.close();
}
private void failAndClose(Throwable failure) private void failAndClose(Throwable failure)
{ {
if (responseFailure(failure)) if (responseFailure(failure))

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.client;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
@ -45,13 +46,13 @@ public abstract class AbstractHttpClientServerTest
protected HttpClient client; protected HttpClient client;
protected ServerConnector connector; protected ServerConnector connector;
public void start(final Scenario scenario, Handler handler) throws Exception public void start(Scenario scenario, Handler handler) throws Exception
{ {
startServer(scenario, handler); startServer(scenario, handler);
startClient(scenario); startClient(scenario);
} }
protected void startServer(final Scenario scenario, Handler handler) throws Exception protected void startServer(Scenario scenario, Handler handler) throws Exception
{ {
if (server == null) if (server == null)
{ {
@ -66,23 +67,27 @@ public abstract class AbstractHttpClientServerTest
server.start(); server.start();
} }
protected void startClient(final Scenario scenario) throws Exception protected void startClient(Scenario scenario) throws Exception
{ {
startClient(scenario, null); startClient(scenario, null);
} }
protected void startClient(final Scenario scenario, Consumer<HttpClient> config) throws Exception protected void startClient(Scenario scenario, Consumer<HttpClient> config) throws Exception
{
startClient(scenario, HttpClientTransportOverHTTP::new, config);
}
protected void startClient(Scenario scenario, Function<ClientConnector, HttpClientTransportOverHTTP> transport, Consumer<HttpClient> config) throws Exception
{ {
ClientConnector clientConnector = new ClientConnector(); ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1); clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(scenario.newClientSslContextFactory()); clientConnector.setSslContextFactory(scenario.newClientSslContextFactory());
HttpClientTransport transport = new HttpClientTransportOverHTTP(clientConnector);
QueuedThreadPool executor = new QueuedThreadPool(); QueuedThreadPool executor = new QueuedThreadPool();
executor.setName("client"); executor.setName("client");
clientConnector.setExecutor(executor); clientConnector.setExecutor(executor);
Scheduler scheduler = new ScheduledExecutorScheduler("client-scheduler", false); Scheduler scheduler = new ScheduledExecutorScheduler("client-scheduler", false);
clientConnector.setScheduler(scheduler); clientConnector.setScheduler(scheduler);
client = newHttpClient(transport); client = newHttpClient(transport.apply(clientConnector));
client.setSocketAddressResolver(new SocketAddressResolver.Sync()); client.setSocketAddressResolver(new SocketAddressResolver.Sync());
if (config != null) if (config != null)
config.accept(client); config.accept(client);

View File

@ -18,21 +18,30 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.zip.GZIPOutputStream;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.http.HttpChannelOverHTTP;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpReceiverOverHTTP;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.ArgumentsSource;
@ -46,10 +55,10 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
@ArgumentsSource(ScenarioProvider.class) @ArgumentsSource(ScenarioProvider.class)
public void testSmallAsyncContent(Scenario scenario) throws Exception public void testSmallAsyncContent(Scenario scenario) throws Exception
{ {
start(scenario, new AbstractHandler() start(scenario, new EmptyServerHandler()
{ {
@Override @Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{ {
ServletOutputStream output = response.getOutputStream(); ServletOutputStream output = response.getOutputStream();
output.write(65); output.write(65);
@ -58,30 +67,19 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
} }
}); });
final AtomicInteger contentCount = new AtomicInteger(); AtomicInteger contentCount = new AtomicInteger();
final AtomicReference<Callback> callbackRef = new AtomicReference<>(); AtomicReference<Callback> callbackRef = new AtomicReference<>();
final AtomicReference<CountDownLatch> contentLatch = new AtomicReference<>(new CountDownLatch(1)); AtomicReference<CountDownLatch> contentLatch = new AtomicReference<>(new CountDownLatch(1));
final CountDownLatch completeLatch = new CountDownLatch(1); CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme()) .scheme(scenario.getScheme())
.onResponseContentAsync(new Response.AsyncContentListener() .onResponseContentAsync((response, content, callback) ->
{
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{ {
contentCount.incrementAndGet(); contentCount.incrementAndGet();
callbackRef.set(callback); callbackRef.set(callback);
contentLatch.get().countDown(); contentLatch.get().countDown();
}
}) })
.send(new Response.CompleteListener() .send(result -> completeLatch.countDown());
{
@Override
public void onComplete(Result result)
{
completeLatch.countDown();
}
});
assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS)); assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS));
Callback callback = callbackRef.get(); Callback callback = callbackRef.get();
@ -113,4 +111,294 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
assertEquals(2, contentCount.get()); assertEquals(2, contentCount.get());
} }
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testConcurrentAsyncContent(Scenario scenario) throws Exception
{
AtomicReference<AsyncContext> asyncContextRef = new AtomicReference<>();
startServer(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
ServletOutputStream output = response.getOutputStream();
output.write(new byte[1024]);
output.flush();
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
asyncContextRef.set(asyncContext);
}
});
AtomicReference<LongConsumer> demandRef = new AtomicReference<>();
startClient(scenario, clientConnector -> new HttpClientTransportOverHTTP(clientConnector)
{
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
return customize(new HttpConnectionOverHTTP(endPoint, context)
{
@Override
protected HttpChannelOverHTTP newHttpChannel()
{
return new HttpChannelOverHTTP(this)
{
@Override
protected HttpReceiverOverHTTP newHttpReceiver()
{
return new HttpReceiverOverHTTP(this)
{
@Override
public boolean content(ByteBuffer buffer)
{
try
{
boolean result = super.content(buffer);
// The content has been notified, but the listener has not demanded.
// Simulate an asynchronous demand from otherThread.
// There is no further content, so otherThread will fill 0,
// set the fill interest, and release the network buffer.
CountDownLatch latch = new CountDownLatch(1);
Thread otherThread = new Thread(() ->
{
demandRef.get().accept(1);
latch.countDown();
});
otherThread.start();
// Wait for otherThread to finish, then let this thread continue.
assertTrue(latch.await(5, TimeUnit.SECONDS));
return result;
}
catch (InterruptedException x)
{
throw new RuntimeException(x);
}
}
};
}
};
}
}, context);
}
}, null);
CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.onResponseContentDemanded((response, demand, content, callback) ->
{
demandRef.set(demand);
// Don't demand and don't succeed the callback.
})
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
});
// Wait for the threads to finish their processing.
Thread.sleep(1000);
// Complete the response.
asyncContextRef.get().complete();
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testAsyncContentAbort(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.getOutputStream().write(new byte[1024]);
}
});
CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.onResponseContentDemanded((response, demand, content, callback) -> response.abort(new Throwable()))
.send(result ->
{
if (result.isFailed())
latch.countDown();
});
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testAsyncGzipContentAbortThenDemand(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setHeader("Content-Encoding", "gzip");
GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream());
gzip.write(new byte[1024]);
gzip.finish();
}
});
CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.onResponseContentDemanded((response, demand, content, callback) ->
{
response.abort(new Throwable());
demand.accept(1);
})
.send(result ->
{
if (result.isFailed())
latch.countDown();
});
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testAsyncGzipContentDelayedDemand(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
response.setHeader("Content-Encoding", "gzip");
try (GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream()))
{
gzip.write(new byte[1024]);
}
}
});
AtomicReference<LongConsumer> demandRef = new AtomicReference<>();
CountDownLatch headersLatch = new CountDownLatch(1);
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.onResponseContentDemanded(new Response.DemandedContentListener()
{
@Override
public void onBeforeContent(Response response, LongConsumer demand)
{
// Don't demand yet.
demandRef.set(demand);
headersLatch.countDown();
}
@Override
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
{
demand.accept(1);
}
})
.send(result ->
{
if (result.isSucceeded())
resultLatch.countDown();
});
assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
// Wait to make sure the demand is really delayed.
Thread.sleep(500);
demandRef.get().accept(1);
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testAsyncGzipContentAbortWhileDecodingWithDelayedDemand(Scenario scenario) throws Exception
{
// Use a large content so that the gzip decoding is done in multiple passes.
byte[] bytes = new byte[8 * 1024 * 1024];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(baos))
{
gzip.write(bytes);
}
byte[] gzipBytes = baos.toByteArray();
int half = gzipBytes.length / 2;
byte[] gzip1 = Arrays.copyOfRange(gzipBytes, 0, half);
byte[] gzip2 = Arrays.copyOfRange(gzipBytes, half, gzipBytes.length);
AtomicReference<AsyncContext> asyncContextRef = new AtomicReference<>();
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
asyncContextRef.set(asyncContext);
response.setHeader("Content-Encoding", "gzip");
ServletOutputStream output = response.getOutputStream();
output.write(gzip1);
output.flush();
}
});
AtomicReference<LongConsumer> demandRef = new AtomicReference<>();
CountDownLatch firstChunkLatch = new CountDownLatch(1);
CountDownLatch secondChunkLatch = new CountDownLatch(1);
CountDownLatch resultLatch = new CountDownLatch(1);
AtomicInteger chunks = new AtomicInteger();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.onResponseContentDemanded((response, demand, content, callback) ->
{
if (chunks.incrementAndGet() == 1)
{
try
{
// Don't demand, but make the server write the second chunk.
AsyncContext asyncContext = asyncContextRef.get();
asyncContext.getResponse().getOutputStream().write(gzip2);
asyncContext.complete();
demandRef.set(demand);
firstChunkLatch.countDown();
}
catch (IOException x)
{
throw new RuntimeException(x);
}
}
else
{
response.abort(new Throwable());
demandRef.set(demand);
secondChunkLatch.countDown();
}
})
.send(result ->
{
if (result.isFailed())
resultLatch.countDown();
});
assertTrue(firstChunkLatch.await(5, TimeUnit.SECONDS));
// Wait to make sure the demand is really delayed.
Thread.sleep(500);
demandRef.get().accept(1);
assertTrue(secondChunkLatch.await(5, TimeUnit.SECONDS));
// Wait to make sure the demand is really delayed.
Thread.sleep(500);
demandRef.get().accept(1);
assertTrue(resultLatch.await(555, TimeUnit.SECONDS));
}
} }

View File

@ -1582,11 +1582,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest
ContentResponse response = listener.get(5, TimeUnit.SECONDS); ContentResponse response = listener.get(5, TimeUnit.SECONDS);
assertEquals(200, response.getStatus()); assertEquals(200, response.getStatus());
// Because the tunnel was successful, this connection will be
// upgraded to an SslConnection, so it will not be fill interested.
// This test doesn't upgrade, so it needs to restore the fill interest.
((AbstractConnection)connection).fillInterested();
// Test that I can send another request on the same connection. // Test that I can send another request on the same connection.
request = client.newRequest(host, port); request = client.newRequest(host, port);
listener = new FutureResponseListener(request); listener = new FutureResponseListener(request);