Jetty 9.4.x 4976 httpclient fix null network buffer (#5010)

Fixes #4976  HttpClient async content throws NPE in DEBUG log.

Reworked handling of asynchronous content by immediately exiting
HttpReceiverOverHTTP.process(), so that there is no race with
other threads that have been scheduled to resume the processing.

The call to HttpReceiver.dispose() that could be triggered by
an asynchronous failure is now performed either by the failing
thread (if the HttpReceiver is not processing) or by an I/O
thread (if the HttpReceiver is processing) similarly to what
happens when terminating the response.

The content decoding has been reworked to perform the required
state changes similarly to what non-decoded content is doing,
as this was completely lacking before (it was actually a side
bug that is now fixed).

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Simone Bordet 2020-07-03 09:30:15 +02:00 committed by GitHub
parent 5510d7a9b1
commit e0955192b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 423 additions and 111 deletions

View File

@ -199,6 +199,7 @@ public abstract class HttpReceiver
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
return true;
dispose();
terminateResponse(exchange);
return false;
}
@ -217,23 +218,17 @@ public abstract class HttpReceiver
*/
protected boolean responseHeader(HttpExchange exchange, HttpField field)
{
out:
while (true)
{
ResponseState current = responseState.get();
switch (current)
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
case BEGIN:
case HEADER:
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break out;
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
default:
{
return false;
}
}
else
{
return false;
}
}
@ -267,6 +262,7 @@ public abstract class HttpReceiver
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
return true;
dispose();
terminateResponse(exchange);
return false;
}
@ -334,7 +330,7 @@ public abstract class HttpReceiver
{
if (factory.getEncoding().equalsIgnoreCase(encoding))
{
decoder = new Decoder(response, factory.newContentDecoder());
decoder = new Decoder(exchange, factory.newContentDecoder());
break;
}
}
@ -350,6 +346,7 @@ public abstract class HttpReceiver
return hasDemand;
}
dispose();
terminateResponse(exchange);
return false;
}
@ -393,39 +390,28 @@ public abstract class HttpReceiver
{
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
ContentListeners listeners = this.contentListeners;
if (listeners != null)
if (contentListeners.isEmpty())
{
if (listeners.isEmpty())
{
callback.succeeded();
}
else
{
Decoder decoder = this.decoder;
if (decoder == null)
{
listeners.notifyContent(response, buffer, callback);
}
else
{
try
{
proceed = decoder.decode(buffer, callback);
}
catch (Throwable x)
{
callback.failed(x);
proceed = false;
}
}
}
callback.succeeded();
}
else
{
// May happen in case of concurrent abort.
proceed = false;
if (decoder == null)
{
contentListeners.notifyContent(response, buffer, callback);
}
else
{
try
{
proceed = decoder.decode(buffer, callback);
}
catch (Throwable x)
{
callback.failed(x);
proceed = false;
}
}
}
}
@ -444,6 +430,7 @@ public abstract class HttpReceiver
}
}
dispose();
terminateResponse(exchange);
return false;
}
@ -567,6 +554,7 @@ public abstract class HttpReceiver
*/
protected void dispose()
{
assert responseState.get() != ResponseState.TRANSIENT;
cleanup();
}
@ -598,7 +586,8 @@ public abstract class HttpReceiver
this.failure = failure;
dispose();
if (terminate)
dispose();
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
@ -776,14 +765,14 @@ public abstract class HttpReceiver
*/
private class Decoder implements Destroyable
{
private final HttpResponse response;
private final HttpExchange exchange;
private final ContentDecoder decoder;
private ByteBuffer encoded;
private Callback callback;
private Decoder(HttpResponse response, ContentDecoder decoder)
private Decoder(HttpExchange exchange, ContentDecoder decoder)
{
this.response = response;
this.exchange = exchange;
this.decoder = Objects.requireNonNull(decoder);
}
@ -814,13 +803,13 @@ public abstract class HttpReceiver
}
ByteBuffer decoded = buffer;
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded));
contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded {}, hasDemand={}", response, hasDemand);
LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand);
if (!hasDemand)
return false;
}
@ -829,9 +818,50 @@ public abstract class HttpReceiver
private void resume()
{
if (LOG.isDebugEnabled())
LOG.debug("Response content resuming decoding {}", response);
if (decode())
LOG.debug("Response content resuming decoding {}", exchange);
// The content and callback may be null
// if there is no initial content demand.
if (callback == null)
{
receive();
return;
}
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return;
}
}
boolean decoded = false;
try
{
decoded = decode();
}
catch (Throwable x)
{
callback.failed(x);
}
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (decoded)
receive();
return;
}
dispose();
terminateResponse(exchange);
}
@Override

View File

@ -87,7 +87,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
RetainableByteBuffer currentBuffer = networkBuffer;
if (currentBuffer == null)
throw new IllegalStateException();
if (currentBuffer.hasRemaining())
throw new IllegalStateException();
@ -107,9 +106,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
private void releaseNetworkBuffer()
{
if (networkBuffer == null)
throw new IllegalStateException();
if (networkBuffer.hasRemaining())
throw new IllegalStateException();
return;
networkBuffer.release();
if (LOG.isDebugEnabled())
LOG.debug("Released {}", networkBuffer);
@ -138,24 +135,27 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
while (true)
{
// Always parse even empty buffers to advance the parser.
boolean stopProcessing = parse();
if (parse())
{
// Return immediately, as this thread may be in a race
// with e.g. another thread demanding more content.
return;
}
// Connection may be closed or upgraded in a parser callback.
boolean upgraded = connection != endPoint.getConnection();
if (connection.isClosed() || upgraded)
{
if (LOG.isDebugEnabled())
LOG.debug("{} {}", connection, upgraded ? "upgraded" : "closed");
LOG.debug("{} {}", upgraded ? "Upgraded" : "Closed", connection);
releaseNetworkBuffer();
return;
}
if (stopProcessing)
return;
if (networkBuffer.getReferences() > 1)
reacquireNetworkBuffer();
// The networkBuffer may have been reacquired.
int read = endPoint.fill(networkBuffer.getBuffer());
if (LOG.isDebugEnabled())
LOG.debug("Read {} bytes in {} from {}", read, networkBuffer, endPoint);
@ -182,7 +182,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
if (LOG.isDebugEnabled())
LOG.debug(x);
networkBuffer.clear();
releaseNetworkBuffer();
failAndClose(x);
}
@ -198,14 +197,24 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
while (true)
{
boolean handle = parser.parseNext(networkBuffer.getBuffer());
boolean failed = isFailed();
if (LOG.isDebugEnabled())
LOG.debug("Parse result={}, failed={}", handle, failed);
// When failed, it's safe to close the parser because there
// will be no races with other threads demanding more content.
if (failed)
parser.close();
if (handle)
return !failed;
boolean complete = this.complete;
this.complete = false;
if (LOG.isDebugEnabled())
LOG.debug("Parsed {}, remaining {} {}", handle, networkBuffer.remaining(), parser);
if (handle)
return true;
LOG.debug("Parse complete={}, remaining {} {}", complete, networkBuffer.remaining(), parser);
if (networkBuffer.isEmpty())
return false;
if (complete)
{
if (LOG.isDebugEnabled())
@ -291,8 +300,13 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (exchange == null)
return false;
RetainableByteBuffer networkBuffer = this.networkBuffer;
networkBuffer.retain();
return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, this::failAndClose));
return !responseContent(exchange, buffer, Callback.from(networkBuffer::release, failure ->
{
networkBuffer.release();
failAndClose(failure);
}));
}
@Override
@ -323,15 +337,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (status != HttpStatus.CONTINUE_100)
complete = true;
boolean proceed = responseSuccess(exchange);
if (!proceed)
return true;
if (status == HttpStatus.SWITCHING_PROTOCOLS_101)
return true;
return HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) &&
status == HttpStatus.OK_200;
return !responseSuccess(exchange);
}
@Override
@ -364,13 +370,6 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
parser.reset();
}
@Override
protected void dispose()
{
super.dispose();
parser.close();
}
private void failAndClose(Throwable failure)
{
if (responseFailure(failure))

View File

@ -18,22 +18,32 @@
package org.eclipse.jetty.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.zip.GZIPOutputStream;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpChannelOverHTTP;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpReceiverOverHTTP;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -46,10 +56,10 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
@ArgumentsSource(ScenarioProvider.class)
public void testSmallAsyncContent(Scenario scenario) throws Exception
{
start(scenario, new AbstractHandler()
start(scenario, new EmptyServerHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
ServletOutputStream output = response.getOutputStream();
output.write(65);
@ -58,30 +68,19 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
}
});
final AtomicInteger contentCount = new AtomicInteger();
final AtomicReference<Callback> callbackRef = new AtomicReference<>();
final AtomicReference<CountDownLatch> contentLatch = new AtomicReference<>(new CountDownLatch(1));
final CountDownLatch completeLatch = new CountDownLatch(1);
AtomicInteger contentCount = new AtomicInteger();
AtomicReference<Callback> callbackRef = new AtomicReference<>();
AtomicReference<CountDownLatch> contentLatch = new AtomicReference<>(new CountDownLatch(1));
CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.onResponseContentAsync(new Response.AsyncContentListener()
.onResponseContentAsync((response, content, callback) ->
{
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
contentCount.incrementAndGet();
callbackRef.set(callback);
contentLatch.get().countDown();
}
contentCount.incrementAndGet();
callbackRef.set(callback);
contentLatch.get().countDown();
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
completeLatch.countDown();
}
});
.send(result -> completeLatch.countDown());
assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS));
Callback callback = callbackRef.get();
@ -113,4 +112,294 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
assertEquals(2, contentCount.get());
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testConcurrentAsyncContent(Scenario scenario) throws Exception
{
AtomicReference<AsyncContext> asyncContextRef = new AtomicReference<>();
startServer(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
ServletOutputStream output = response.getOutputStream();
output.write(new byte[1024]);
output.flush();
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
asyncContextRef.set(asyncContext);
}
});
AtomicReference<LongConsumer> demandRef = new AtomicReference<>();
startClient(scenario, new HttpClientTransportOverHTTP()
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise)
{
@Override
protected HttpChannelOverHTTP newHttpChannel()
{
return new HttpChannelOverHTTP(this)
{
@Override
protected HttpReceiverOverHTTP newHttpReceiver()
{
return new HttpReceiverOverHTTP(this)
{
@Override
public boolean content(ByteBuffer buffer)
{
try
{
boolean result = super.content(buffer);
// The content has been notified, but the listener has not demanded.
// Simulate an asynchronous demand from otherThread.
// There is no further content, so otherThread will fill 0,
// set the fill interest, and release the network buffer.
CountDownLatch latch = new CountDownLatch(1);
Thread otherThread = new Thread(() ->
{
demandRef.get().accept(1);
latch.countDown();
});
otherThread.start();
// Wait for otherThread to finish, then let this thread continue.
assertTrue(latch.await(5, TimeUnit.SECONDS));
return result;
}
catch (InterruptedException x)
{
throw new RuntimeException(x);
}
}
};
}
};
}
};
}
}, httpClient -> {});
CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.onResponseContentDemanded((response, demand, content, callback) ->
{
demandRef.set(demand);
// Don't demand and don't succeed the callback.
})
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
});
// Wait for the threads to finish their processing.
Thread.sleep(1000);
// Complete the response.
asyncContextRef.get().complete();
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testAsyncContentAbort(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.getOutputStream().write(new byte[1024]);
}
});
CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.onResponseContentDemanded((response, demand, content, callback) -> response.abort(new Throwable()))
.send(result ->
{
if (result.isFailed())
latch.countDown();
});
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testAsyncGzipContentAbortThenDemand(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setHeader("Content-Encoding", "gzip");
GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream());
gzip.write(new byte[1024]);
gzip.finish();
}
});
CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.onResponseContentDemanded((response, demand, content, callback) ->
{
response.abort(new Throwable());
demand.accept(1);
})
.send(result ->
{
if (result.isFailed())
latch.countDown();
});
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testAsyncGzipContentDelayedDemand(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
response.setHeader("Content-Encoding", "gzip");
try (GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream()))
{
gzip.write(new byte[1024]);
}
}
});
AtomicReference<LongConsumer> demandRef = new AtomicReference<>();
CountDownLatch headersLatch = new CountDownLatch(1);
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.onResponseContentDemanded(new Response.DemandedContentListener()
{
@Override
public void onBeforeContent(Response response, LongConsumer demand)
{
// Don't demand yet.
demandRef.set(demand);
headersLatch.countDown();
}
@Override
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
{
demand.accept(1);
}
})
.send(result ->
{
if (result.isSucceeded())
resultLatch.countDown();
});
assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
// Wait to make sure the demand is really delayed.
Thread.sleep(500);
demandRef.get().accept(1);
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testAsyncGzipContentAbortWhileDecodingWithDelayedDemand(Scenario scenario) throws Exception
{
// Use a large content so that the gzip decoding is done in multiple passes.
byte[] bytes = new byte[8 * 1024 * 1024];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(baos))
{
gzip.write(bytes);
}
byte[] gzipBytes = baos.toByteArray();
int half = gzipBytes.length / 2;
byte[] gzip1 = Arrays.copyOfRange(gzipBytes, 0, half);
byte[] gzip2 = Arrays.copyOfRange(gzipBytes, half, gzipBytes.length);
AtomicReference<AsyncContext> asyncContextRef = new AtomicReference<>();
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
asyncContextRef.set(asyncContext);
response.setHeader("Content-Encoding", "gzip");
ServletOutputStream output = response.getOutputStream();
output.write(gzip1);
output.flush();
}
});
AtomicReference<LongConsumer> demandRef = new AtomicReference<>();
CountDownLatch firstChunkLatch = new CountDownLatch(1);
CountDownLatch secondChunkLatch = new CountDownLatch(1);
CountDownLatch resultLatch = new CountDownLatch(1);
AtomicInteger chunks = new AtomicInteger();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.onResponseContentDemanded((response, demand, content, callback) ->
{
if (chunks.incrementAndGet() == 1)
{
try
{
// Don't demand, but make the server write the second chunk.
AsyncContext asyncContext = asyncContextRef.get();
asyncContext.getResponse().getOutputStream().write(gzip2);
asyncContext.complete();
demandRef.set(demand);
firstChunkLatch.countDown();
}
catch (IOException x)
{
throw new RuntimeException(x);
}
}
else
{
response.abort(new Throwable());
demandRef.set(demand);
secondChunkLatch.countDown();
}
})
.send(result ->
{
if (result.isFailed())
resultLatch.countDown();
});
assertTrue(firstChunkLatch.await(5, TimeUnit.SECONDS));
// Wait to make sure the demand is really delayed.
Thread.sleep(500);
demandRef.get().accept(1);
assertTrue(secondChunkLatch.await(5, TimeUnit.SECONDS));
// Wait to make sure the demand is really delayed.
Thread.sleep(500);
demandRef.get().accept(1);
assertTrue(resultLatch.await(555, TimeUnit.SECONDS));
}
}

View File

@ -79,7 +79,6 @@ import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.Net;
@ -1603,11 +1602,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
assertEquals(200, response.getStatus());
// Because the tunnel was successful, this connection will be
// upgraded to an SslConnection, so it will not be fill interested.
// This test doesn't upgrade, so it needs to restore the fill interest.
((AbstractConnection)connection).fillInterested();
// Test that I can send another request on the same connection.
request = client.newRequest(host, port);
listener = new FutureResponseListener(request);