Fixes #5409 - HttpClient fails intermittently with "Invalid response state TRANSIENT".

The problem was a race condition during content decoding.
Since decoding needs to be done in a loop, the condition to loop is to
check whether there is demand for the next chunk of decoded content.

Checking for demand also sets the stalled flag, and this must be done
only after the response state has been set back to CONTENT.
Unfortunately this was not done in the decoding loop.

The fix is to always update the response state in the decoding loop.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-10-13 23:18:26 +02:00
parent c37c2c59ab
commit c5df807b6d
3 changed files with 184 additions and 136 deletions

View File

@ -290,11 +290,11 @@ public class HttpExchange
{
synchronized (this)
{
return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h",
return String.format("%s@%x{req=%s[%s/%s] res=%s[%s/%s]}",
HttpExchange.class.getSimpleName(),
hashCode(),
requestState, requestFailure, requestFailure,
responseState, responseFailure, responseFailure);
request, requestState, requestFailure,
response, responseState, responseFailure);
}
}

View File

@ -187,7 +187,7 @@ public abstract class HttpReceiver
{
handlerListener = protocolHandler.getResponseListener();
if (LOG.isDebugEnabled())
LOG.debug("Found protocol handler {}", protocolHandler);
LOG.debug("Response {} found protocol handler {}", response, protocolHandler);
}
exchange.getConversation().updateResponseListeners(handlerListener);
@ -218,19 +218,8 @@ public abstract class HttpReceiver
*/
protected boolean responseHeader(HttpExchange exchange, HttpField field)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
return false;
}
}
if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT))
return false;
HttpResponse response = exchange.getResponse();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
@ -296,19 +285,8 @@ public abstract class HttpReceiver
*/
protected boolean responseHeaders(HttpExchange exchange)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
return false;
}
}
if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT))
return false;
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
@ -342,7 +320,7 @@ public abstract class HttpReceiver
{
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response headers {}, hasDemand={}", response, hasDemand);
LOG.debug("Response headers hasDemand={} {}", hasDemand, response);
return hasDemand;
}
@ -363,71 +341,39 @@ public abstract class HttpReceiver
*/
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return false;
}
}
boolean proceed = true;
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", exchange.getResponse(), System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (demand() <= 0)
{
callback.failed(new IllegalStateException("No demand for response content"));
proceed = false;
return false;
}
if (decoder == null)
return plainResponseContent(exchange, buffer, callback);
else
return decodeResponseContent(buffer, callback);
}
private boolean plainResponseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
{
if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT))
{
callback.failed(new IllegalStateException("Invalid response state " + responseState));
return false;
}
HttpResponse response = exchange.getResponse();
if (proceed)
{
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (contentListeners.isEmpty())
{
callback.succeeded();
}
else
{
if (decoder == null)
{
contentListeners.notifyContent(response, buffer, callback);
}
else
{
try
{
proceed = decoder.decode(buffer, callback);
}
catch (Throwable x)
{
callback.failed(x);
proceed = false;
}
}
}
}
if (contentListeners.isEmpty())
callback.succeeded();
else
contentListeners.notifyContent(response, buffer, callback);
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (proceed)
{
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
return hasDemand;
}
else
{
return false;
}
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
return hasDemand;
}
dispose();
@ -435,6 +381,11 @@ public abstract class HttpReceiver
return false;
}
private boolean decodeResponseContent(ByteBuffer buffer, Callback callback)
{
return decoder.decode(buffer, callback);
}
/**
* Method to be invoked when the response is successful.
* <p>
@ -614,15 +565,42 @@ public abstract class HttpReceiver
}
}
private boolean updateResponseState(ResponseState from1, ResponseState from2, ResponseState to)
{
while (true)
{
ResponseState current = responseState.get();
if (current == from1 || current == from2)
{
if (updateResponseState(current, to))
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: [{},{}] -> {}: {}", from1, from2, to, current);
return false;
}
}
}
private boolean updateResponseState(ResponseState from, ResponseState to)
{
boolean updated = responseState.compareAndSet(from, to);
if (!updated)
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
ResponseState current = responseState.get();
if (current == from)
{
if (responseState.compareAndSet(current, to))
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: {} -> {}: {}", from, to, current);
return false;
}
}
return updated;
}
@Override
@ -778,14 +756,62 @@ public abstract class HttpReceiver
private boolean decode(ByteBuffer encoded, Callback callback)
{
// Store the buffer to decode in case the
// decoding produces multiple decoded buffers.
this.encoded = encoded;
this.callback = callback;
return decode();
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoding {} with {}{}{}", response, decoder, System.lineSeparator(), BufferUtil.toDetailString(encoded));
boolean needInput = decode();
if (!needInput)
return false;
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded, hasDemand={} {}", hasDemand, response);
return hasDemand;
}
private boolean decode()
{
while (true)
{
if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT))
{
callback.failed(new IllegalStateException("Invalid response state " + responseState));
return false;
}
DecodeResult result = decodeChunk();
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (result == DecodeResult.NEED_INPUT)
return true;
if (result == DecodeResult.ABORT)
return false;
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded chunk, hasDemand={} {}", hasDemand, exchange.getResponse());
if (hasDemand)
continue;
else
return false;
}
dispose();
terminateResponse(exchange);
return false;
}
}
private DecodeResult decodeChunk()
{
try
{
ByteBuffer buffer;
while (true)
@ -798,27 +824,30 @@ public abstract class HttpReceiver
callback.succeeded();
encoded = null;
callback = null;
return true;
return DecodeResult.NEED_INPUT;
}
}
ByteBuffer decoded = buffer;
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded));
LOG.debug("Response content decoded chunk {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand);
if (!hasDemand)
return false;
return DecodeResult.DECODE;
}
catch (Throwable x)
{
callback.failed(x);
return DecodeResult.ABORT;
}
}
private void resume()
{
if (LOG.isDebugEnabled())
LOG.debug("Response content resuming decoding {}", exchange);
LOG.debug("Response content resume decoding {} with {}", exchange.getResponse(), decoder);
// The content and callback may be null
// if there is no initial content demand.
@ -828,40 +857,9 @@ public abstract class HttpReceiver
return;
}
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return;
}
}
boolean decoded = false;
try
{
decoded = decode();
}
catch (Throwable x)
{
callback.failed(x);
}
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (decoded)
receive();
return;
}
dispose();
terminateResponse(exchange);
boolean needInput = decode();
if (needInput)
receive();
}
@Override
@ -871,4 +869,9 @@ public abstract class HttpReceiver
((Destroyable)decoder).destroy();
}
}
private enum DecodeResult
{
DECODE, NEED_INPUT, ABORT
}
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@ -32,11 +33,14 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.IO;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -266,6 +270,47 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest
assertThat(heapMemory, lessThan((long)content.length));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testLargeGZIPContentAsync(Scenario scenario) throws Exception
{
String digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
Random random = new Random();
byte[] content = new byte[32 * 1024 * 1024];
for (int i = 0; i < content.length; ++i)
{
content[i] = (byte)digits.charAt(random.nextInt(digits.length()));
}
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setContentType("text/plain;charset=" + StandardCharsets.US_ASCII.name());
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream());
gzip.write(content);
gzip.finish();
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.timeout(5, TimeUnit.SECONDS)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
ByteArrayOutputStream output = new ByteArrayOutputStream();
try (InputStream input = listener.getInputStream())
{
IO.copy(input, output);
}
assertArrayEquals(content, output.toByteArray());
}
private static void sleep(long ms) throws IOException
{
try