Merge pull request #5449 from eclipse/jetty-9.4.x-5409-invalid_response_state_transient

Fixes #5409 - HttpClient fails intermittently with "Invalid response …
This commit is contained in:
Simone Bordet 2020-10-14 15:55:20 +02:00 committed by GitHub
commit 7bfca258d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 184 additions and 136 deletions

View File

@ -290,11 +290,11 @@ public class HttpExchange
{
synchronized (this)
{
return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h",
return String.format("%s@%x{req=%s[%s/%s] res=%s[%s/%s]}",
HttpExchange.class.getSimpleName(),
hashCode(),
requestState, requestFailure, requestFailure,
responseState, responseFailure, responseFailure);
request, requestState, requestFailure,
response, responseState, responseFailure);
}
}

View File

@ -187,7 +187,7 @@ public abstract class HttpReceiver
{
handlerListener = protocolHandler.getResponseListener();
if (LOG.isDebugEnabled())
LOG.debug("Found protocol handler {}", protocolHandler);
LOG.debug("Response {} found protocol handler {}", response, protocolHandler);
}
exchange.getConversation().updateResponseListeners(handlerListener);
@ -218,19 +218,8 @@ public abstract class HttpReceiver
*/
protected boolean responseHeader(HttpExchange exchange, HttpField field)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT))
return false;
}
}
HttpResponse response = exchange.getResponse();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
@ -296,19 +285,8 @@ public abstract class HttpReceiver
*/
protected boolean responseHeaders(HttpExchange exchange)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT))
return false;
}
}
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
@ -342,7 +320,7 @@ public abstract class HttpReceiver
{
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response headers {}, hasDemand={}", response, hasDemand);
LOG.debug("Response headers hasDemand={} {}", hasDemand, response);
return hasDemand;
}
@ -363,78 +341,51 @@ public abstract class HttpReceiver
*/
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return false;
}
}
boolean proceed = true;
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", exchange.getResponse(), System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (demand() <= 0)
{
callback.failed(new IllegalStateException("No demand for response content"));
proceed = false;
return false;
}
if (decoder == null)
return plainResponseContent(exchange, buffer, callback);
else
return decodeResponseContent(buffer, callback);
}
private boolean plainResponseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
{
if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT))
{
callback.failed(new IllegalStateException("Invalid response state " + responseState));
return false;
}
HttpResponse response = exchange.getResponse();
if (proceed)
{
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (contentListeners.isEmpty())
{
callback.succeeded();
}
else
{
if (decoder == null)
{
contentListeners.notifyContent(response, buffer, callback);
}
else
{
try
{
proceed = decoder.decode(buffer, callback);
}
catch (Throwable x)
{
callback.failed(x);
proceed = false;
}
}
}
}
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (proceed)
{
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
return hasDemand;
}
else
{
return false;
}
}
dispose();
terminateResponse(exchange);
return false;
}
private boolean decodeResponseContent(ByteBuffer buffer, Callback callback)
{
return decoder.decode(buffer, callback);
}
/**
* Method to be invoked when the response is successful.
* <p>
@ -614,15 +565,42 @@ public abstract class HttpReceiver
}
}
private boolean updateResponseState(ResponseState from, ResponseState to)
private boolean updateResponseState(ResponseState from1, ResponseState from2, ResponseState to)
{
boolean updated = responseState.compareAndSet(from, to);
if (!updated)
while (true)
{
ResponseState current = responseState.get();
if (current == from1 || current == from2)
{
if (updateResponseState(current, to))
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
LOG.debug("State update failed: [{},{}] -> {}: {}", from1, from2, to, current);
return false;
}
}
}
private boolean updateResponseState(ResponseState from, ResponseState to)
{
while (true)
{
ResponseState current = responseState.get();
if (current == from)
{
if (responseState.compareAndSet(current, to))
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: {} -> {}: {}", from, to, current);
return false;
}
}
return updated;
}
@Override
@ -778,14 +756,62 @@ public abstract class HttpReceiver
private boolean decode(ByteBuffer encoded, Callback callback)
{
// Store the buffer to decode in case the
// decoding produces multiple decoded buffers.
this.encoded = encoded;
this.callback = callback;
return decode();
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoding {} with {}{}{}", response, decoder, System.lineSeparator(), BufferUtil.toDetailString(encoded));
boolean needInput = decode();
if (!needInput)
return false;
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded, hasDemand={} {}", hasDemand, response);
return hasDemand;
}
private boolean decode()
{
while (true)
{
if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT))
{
callback.failed(new IllegalStateException("Invalid response state " + responseState));
return false;
}
DecodeResult result = decodeChunk();
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (result == DecodeResult.NEED_INPUT)
return true;
if (result == DecodeResult.ABORT)
return false;
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded chunk, hasDemand={} {}", hasDemand, exchange.getResponse());
if (hasDemand)
continue;
else
return false;
}
dispose();
terminateResponse(exchange);
return false;
}
}
private DecodeResult decodeChunk()
{
try
{
ByteBuffer buffer;
while (true)
@ -798,27 +824,30 @@ public abstract class HttpReceiver
callback.succeeded();
encoded = null;
callback = null;
return true;
return DecodeResult.NEED_INPUT;
}
}
ByteBuffer decoded = buffer;
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded));
LOG.debug("Response content decoded chunk {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand);
if (!hasDemand)
return false;
return DecodeResult.DECODE;
}
catch (Throwable x)
{
callback.failed(x);
return DecodeResult.ABORT;
}
}
private void resume()
{
if (LOG.isDebugEnabled())
LOG.debug("Response content resuming decoding {}", exchange);
LOG.debug("Response content resume decoding {} with {}", exchange.getResponse(), decoder);
// The content and callback may be null
// if there is no initial content demand.
@ -828,40 +857,9 @@ public abstract class HttpReceiver
return;
}
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return;
}
}
boolean decoded = false;
try
{
decoded = decode();
}
catch (Throwable x)
{
callback.failed(x);
}
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (decoded)
boolean needInput = decode();
if (needInput)
receive();
return;
}
dispose();
terminateResponse(exchange);
}
@Override
@ -871,4 +869,9 @@ public abstract class HttpReceiver
((Destroyable)decoder).destroy();
}
}
private enum DecodeResult
{
DECODE, NEED_INPUT, ABORT
}
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@ -32,11 +33,14 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.IO;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -266,6 +270,47 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest
assertThat(heapMemory, lessThan((long)content.length));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testLargeGZIPContentAsync(Scenario scenario) throws Exception
{
String digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
Random random = new Random();
byte[] content = new byte[32 * 1024 * 1024];
for (int i = 0; i < content.length; ++i)
{
content[i] = (byte)digits.charAt(random.nextInt(digits.length()));
}
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setContentType("text/plain;charset=" + StandardCharsets.US_ASCII.name());
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream());
gzip.write(content);
gzip.finish();
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.timeout(5, TimeUnit.SECONDS)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response.getStatus());
ByteArrayOutputStream output = new ByteArrayOutputStream();
try (InputStream input = listener.getInputStream())
{
IO.copy(input, output);
}
assertArrayEquals(content, output.toByteArray());
}
private static void sleep(long ms) throws IOException
{
try