Fixes #2796 - Max local stream count exceeded when request fails.
Now releasing the connection only after the stream has been reset, so we are sure that the stream has been closed and its count decremented. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
5ee856c0c1
commit
35541d0c1e
|
@ -101,24 +101,22 @@ public class HttpChannelOverHTTP2 extends HttpChannel
|
|||
connection.release(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean abort(HttpExchange exchange, Throwable requestFailure, Throwable responseFailure)
|
||||
{
|
||||
Stream stream = getStream();
|
||||
boolean aborted = super.abort(exchange, requestFailure, responseFailure);
|
||||
if (aborted)
|
||||
{
|
||||
if (stream != null)
|
||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
||||
}
|
||||
return aborted;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exchangeTerminated(HttpExchange exchange, Result result)
|
||||
{
|
||||
super.exchangeTerminated(exchange, result);
|
||||
release();
|
||||
if (result.isSucceeded())
|
||||
{
|
||||
release();
|
||||
}
|
||||
else
|
||||
{
|
||||
Stream stream = getStream();
|
||||
if (stream != null)
|
||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), new ReleaseCallback());
|
||||
else
|
||||
release();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -129,4 +127,27 @@ public class HttpChannelOverHTTP2 extends HttpChannel
|
|||
sender,
|
||||
receiver);
|
||||
}
|
||||
|
||||
private class ReleaseCallback implements Callback
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug(x);
|
||||
release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InvocationType getInvocationType()
|
||||
{
|
||||
return InvocationType.NON_BLOCKING;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -367,6 +367,39 @@ public class MaxConcurrentStreamsTest extends AbstractTest
|
|||
Assert.assertTrue(failures.toString(), failures.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTwoConcurrentStreamsFirstTimesOut() throws Exception
|
||||
{
|
||||
long timeout = 1000;
|
||||
start(1, new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
|
||||
{
|
||||
if (target.endsWith("/1"))
|
||||
sleep(2 * timeout);
|
||||
}
|
||||
});
|
||||
client.setMaxConnectionsPerDestination(1);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/1")
|
||||
.timeout(timeout, TimeUnit.MILLISECONDS)
|
||||
.send(result ->
|
||||
{
|
||||
if (result.isFailed())
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
ContentResponse response2 = client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/2")
|
||||
.send();
|
||||
|
||||
Assert.assertEquals(HttpStatus.OK_200, response2.getStatus());
|
||||
Assert.assertTrue(latch.await(2 * timeout, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
private void primeConnection() throws Exception
|
||||
{
|
||||
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
|
||||
|
|
Loading…
Reference in New Issue