Fix NPE in HttpReceiverOverHTTP2.read() when the channel's stream is null (#11443)
* Added null guard. * Avoid executing the event actions in case the response is either complete or (new change) terminated. Signed-off-by: Ludovic Orban <lorban@bitronix.be> Signed-off-by: Simone Bordet <simone.bordet@gmail.com> Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
06d4b6ff13
commit
1e6240e2f4
|
@ -233,6 +233,14 @@ public class HttpExchange implements CyclicTimeouts.Expirable
|
|||
return result;
|
||||
}
|
||||
|
||||
boolean isResponseCompleteOrTerminated()
|
||||
{
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
return responseState == State.COMPLETED || responseState == State.TERMINATED;
|
||||
}
|
||||
}
|
||||
|
||||
public void abort(Throwable failure, Promise<Boolean> promise)
|
||||
{
|
||||
// Atomically change the state of this exchange to be completed.
|
||||
|
|
|
@ -148,8 +148,9 @@ public abstract class HttpReceiver
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Executing responseBegin for {} on {}", exchange, this);
|
||||
|
||||
if (exchange.isResponseComplete())
|
||||
if (exchange.isResponseCompleteOrTerminated())
|
||||
return;
|
||||
|
||||
responseState = ResponseState.BEGIN;
|
||||
HttpResponse response = exchange.getResponse();
|
||||
HttpConversation conversation = exchange.getConversation();
|
||||
|
@ -192,8 +193,9 @@ public abstract class HttpReceiver
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Executing responseHeader on {}", this);
|
||||
|
||||
if (exchange.isResponseComplete())
|
||||
if (exchange.isResponseCompleteOrTerminated())
|
||||
return;
|
||||
|
||||
responseState = ResponseState.HEADER;
|
||||
HttpResponse response = exchange.getResponse();
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -243,7 +245,7 @@ public abstract class HttpReceiver
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Executing responseHeaders on {}", this);
|
||||
|
||||
if (exchange.isResponseComplete())
|
||||
if (exchange.isResponseCompleteOrTerminated())
|
||||
return;
|
||||
|
||||
responseState = ResponseState.HEADERS;
|
||||
|
@ -284,7 +286,7 @@ public abstract class HttpReceiver
|
|||
ResponseListeners responseListeners = exchange.getConversation().getResponseListeners();
|
||||
responseListeners.notifyHeaders(response);
|
||||
|
||||
if (exchange.isResponseComplete())
|
||||
if (exchange.isResponseCompleteOrTerminated())
|
||||
return;
|
||||
|
||||
if (HttpStatus.isInterim(response.getStatus()))
|
||||
|
@ -461,8 +463,7 @@ public abstract class HttpReceiver
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Invoking abort with {} on {}", failure, this);
|
||||
|
||||
// This method should be called only after calling HttpExchange.responseComplete().
|
||||
if (!exchange.isResponseComplete())
|
||||
if (!exchange.isResponseCompleteOrTerminated())
|
||||
throw new IllegalStateException();
|
||||
|
||||
invoker.run(() ->
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
package org.eclipse.jetty.http2.client.transport.internal;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.BiFunction;
|
||||
|
@ -67,6 +68,8 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Reading, fillInterestIfNeeded={} in {}", fillInterestIfNeeded, this);
|
||||
Stream stream = getHttpChannel().getStream();
|
||||
if (stream == null)
|
||||
return Content.Chunk.from(new EOFException("Channel has been released"));
|
||||
Stream.Data data = stream.readData();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Read stream data {} in {}", data, this);
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.eclipse.jetty.client.HttpProxy;
|
|||
import org.eclipse.jetty.client.InputStreamResponseListener;
|
||||
import org.eclipse.jetty.client.Origin;
|
||||
import org.eclipse.jetty.client.Response;
|
||||
import org.eclipse.jetty.client.Result;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
|
@ -81,8 +82,13 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
|||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
@ -736,6 +742,57 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResponseListenerAbortInOnBegin() throws Exception
|
||||
{
|
||||
start(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
AtomicReference<Throwable> onContentSourceErrorRef = new AtomicReference<>();
|
||||
AtomicReference<Result> resultRef = new AtomicReference<>();
|
||||
|
||||
org.eclipse.jetty.client.Request jettyRequest = httpClient.newRequest("localhost", connector.getLocalPort());
|
||||
jettyRequest.send(new Response.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onBegin(org.eclipse.jetty.client.Response response)
|
||||
{
|
||||
response.abort(new ArrayStoreException("nothing is ever going to throw ArrayStoreException in our code"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContentSource(org.eclipse.jetty.client.Response response, Content.Source contentSource)
|
||||
{
|
||||
try
|
||||
{
|
||||
Content.Chunk chunk = contentSource.read();
|
||||
chunk.release();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
onContentSourceErrorRef.set(x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
resultRef.set(result);
|
||||
}
|
||||
});
|
||||
|
||||
await().atMost(5, TimeUnit.SECONDS).until(resultRef::get, not(nullValue()));
|
||||
assertThat(resultRef.get().getFailure(), instanceOf(ArrayStoreException.class));
|
||||
assertThat(onContentSourceErrorRef.get(), is(nullValue()));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Tag("external")
|
||||
public void testExternalServer() throws Exception
|
||||
|
|
Loading…
Reference in New Issue