#7157 add missing callback calls in H2 reset codepath

This commit is contained in:
Ludovic Orban 2021-11-23 11:47:12 +01:00
parent 27a08770a7
commit 08724d8e70
3 changed files with 122 additions and 7 deletions

View File

@ -152,14 +152,23 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
@Override
public void reset(ResetFrame frame, Callback callback)
{
Throwable resetFailure = null;
try (AutoLock l = lock.lock())
{
if (isReset())
return;
localReset = true;
failure = new EOFException("reset");
{
resetFailure = failure;
}
else
{
localReset = true;
failure = new EOFException("reset");
}
}
((HTTP2Session)session).reset(this, frame, callback);
if (resetFailure != null)
callback.failed(resetFailure);
else
((HTTP2Session)session).reset(this, frame, callback);
}
private boolean startWrite(Callback callback)
@ -541,6 +550,8 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
close();
if (session.removeStream(this))
notifyReset(this, frame, callback);
else
callback.succeeded();
}
private void onPush(PushPromiseFrame frame, Callback callback)
@ -565,6 +576,8 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
close();
if (session.removeStream(this))
notifyFailure(this, frame, callback);
else
callback.succeeded();
}
@Override

View File

@ -106,7 +106,10 @@ public class HttpChannelOverHTTP2 extends HttpChannel
public void release()
{
setStream(null);
if (connection.release(this))
boolean released = connection.release(this);
if (LOG.isDebugEnabled())
LOG.debug("released channel? {} {}", released, this);
if (released)
getHttpDestination().release(getHttpConnection());
}
@ -114,13 +117,15 @@ public class HttpChannelOverHTTP2 extends HttpChannel
public void exchangeTerminated(HttpExchange exchange, Result result)
{
super.exchangeTerminated(exchange, result);
Stream stream = getStream();
if (LOG.isDebugEnabled())
LOG.debug("exchange terminated {} {}", result, stream);
if (result.isSucceeded())
{
release();
}
else
{
Stream stream = getStream();
if (stream != null)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), new ReleaseCallback());
else

View File

@ -13,6 +13,9 @@
package org.eclipse.jetty.http2.client.http;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@ -43,6 +46,7 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -58,10 +62,17 @@ public class MultiplexedConnectionPoolTest
private HttpClient client;
private void startServer(Handler handler) throws Exception
{
startServer(handler, MAX_MULTIPLEX, -1L);
}
private void startServer(Handler handler, int maxConcurrentStreams, long streamIdleTimeout) throws Exception
{
server = new Server();
HTTP2ServerConnectionFactory http2ServerConnectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration());
http2ServerConnectionFactory.setMaxConcurrentStreams(MAX_MULTIPLEX);
http2ServerConnectionFactory.setMaxConcurrentStreams(maxConcurrentStreams);
if (streamIdleTimeout > 0)
http2ServerConnectionFactory.setStreamIdleTimeout(streamIdleTimeout);
connector = new ServerConnector(server, 1, 1, http2ServerConnectionFactory);
server.addConnector(connector);
server.setHandler(handler);
@ -205,6 +216,92 @@ public class MultiplexedConnectionPoolTest
});
}
@Test
public void testStreamIdleTimeout() throws Exception
{
AtomicInteger poolCreateCounter = new AtomicInteger();
AtomicInteger poolRemoveCounter = new AtomicInteger();
AtomicReference<Pool<Connection>> poolRef = new AtomicReference<>();
ConnectionPoolFactory factory = new ConnectionPoolFactory("StreamIdleTimeout", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
MultiplexConnectionPool pool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, 10)
{
@Override
protected void onCreated(Connection connection)
{
poolCreateCounter.incrementAndGet();
}
@Override
protected void removed(Connection connection)
{
poolRemoveCounter.incrementAndGet();
}
};
poolRef.set(pool.getBean(Pool.class));
return pool;
});
startServer(new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
int req = Integer.parseInt(target.substring(1));
try
{
response.getWriter().println("req " + req + " executed");
response.getWriter().flush();
}
catch (Exception e)
{
throw new ServletException(e);
}
}
}, 64, 1L);
ClientConnector clientConnector = new ClientConnector();
HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client(clientConnector));
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport);
client.start();
List<CompletableFuture<Void>> futures = new ArrayList<>();
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 100; i++)
{
CompletableFuture<Void> cf = new CompletableFuture<>();
client.newRequest("localhost", connector.getLocalPort())
.path("/" + i)
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
counter.incrementAndGet();
cf.complete(null);
});
futures.add(cf);
}
// Wait for all requests to complete.
for (CompletableFuture<Void> cf : futures)
{
cf.get(5, TimeUnit.SECONDS);
}
assertThat(counter.get(), is(100));
// All remaining pooled connections should be in IDLE state.
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
for (Pool<Connection>.Entry value : poolRef.get().values())
{
if (!value.isIdle())
return false;
}
return true;
});
}
@Test
public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception
{