Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-11.0.x

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-11-25 10:40:55 +01:00
commit 3ca48f719c
5 changed files with 137 additions and 11 deletions

View File

@ -152,14 +152,23 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
@Override
public void reset(ResetFrame frame, Callback callback)
{
Throwable resetFailure = null;
try (AutoLock l = lock.lock())
{
if (isReset())
return;
localReset = true;
failure = new EOFException("reset");
{
resetFailure = failure;
}
else
{
localReset = true;
failure = new EOFException("reset");
}
}
((HTTP2Session)session).reset(this, frame, callback);
if (resetFailure != null)
callback.failed(resetFailure);
else
((HTTP2Session)session).reset(this, frame, callback);
}
private boolean startWrite(Callback callback)
@ -541,6 +550,8 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
close();
if (session.removeStream(this))
notifyReset(this, frame, callback);
else
callback.succeeded();
}
private void onPush(PushPromiseFrame frame, Callback callback)
@ -565,6 +576,8 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
close();
if (session.removeStream(this))
notifyFailure(this, frame, callback);
else
callback.succeeded();
}
@Override

View File

@ -106,7 +106,10 @@ public class HttpChannelOverHTTP2 extends HttpChannel
public void release()
{
setStream(null);
if (connection.release(this))
boolean released = connection.release(this);
if (LOG.isDebugEnabled())
LOG.debug("released channel? {} {}", released, this);
if (released)
getHttpDestination().release(getHttpConnection());
}
@ -114,13 +117,15 @@ public class HttpChannelOverHTTP2 extends HttpChannel
public void exchangeTerminated(HttpExchange exchange, Result result)
{
super.exchangeTerminated(exchange, result);
Stream stream = getStream();
if (LOG.isDebugEnabled())
LOG.debug("exchange terminated {} {}", result, stream);
if (result.isSucceeded())
{
release();
}
else
{
Stream stream = getStream();
if (stream != null)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), new ReleaseCallback());
else

View File

@ -13,6 +13,9 @@
package org.eclipse.jetty.http2.client.http;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@ -43,6 +46,7 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -58,10 +62,17 @@ public class MultiplexedConnectionPoolTest
private HttpClient client;
private void startServer(Handler handler) throws Exception
{
startServer(handler, MAX_MULTIPLEX, -1L);
}
private void startServer(Handler handler, int maxConcurrentStreams, long streamIdleTimeout) throws Exception
{
server = new Server();
HTTP2ServerConnectionFactory http2ServerConnectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration());
http2ServerConnectionFactory.setMaxConcurrentStreams(MAX_MULTIPLEX);
http2ServerConnectionFactory.setMaxConcurrentStreams(maxConcurrentStreams);
if (streamIdleTimeout > 0)
http2ServerConnectionFactory.setStreamIdleTimeout(streamIdleTimeout);
connector = new ServerConnector(server, 1, 1, http2ServerConnectionFactory);
server.addConnector(connector);
server.setHandler(handler);
@ -205,6 +216,92 @@ public class MultiplexedConnectionPoolTest
});
}
@Test
public void testStreamIdleTimeout() throws Exception
{
AtomicInteger poolCreateCounter = new AtomicInteger();
AtomicInteger poolRemoveCounter = new AtomicInteger();
AtomicReference<Pool<Connection>> poolRef = new AtomicReference<>();
ConnectionPoolFactory factory = new ConnectionPoolFactory("StreamIdleTimeout", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
MultiplexConnectionPool pool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, 10)
{
@Override
protected void onCreated(Connection connection)
{
poolCreateCounter.incrementAndGet();
}
@Override
protected void removed(Connection connection)
{
poolRemoveCounter.incrementAndGet();
}
};
poolRef.set(pool.getBean(Pool.class));
return pool;
});
startServer(new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
int req = Integer.parseInt(target.substring(1));
try
{
response.getWriter().println("req " + req + " executed");
response.getWriter().flush();
}
catch (Exception e)
{
throw new ServletException(e);
}
}
}, 64, 1L);
ClientConnector clientConnector = new ClientConnector();
HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client(clientConnector));
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport);
client.start();
List<CompletableFuture<Void>> futures = new ArrayList<>();
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 100; i++)
{
CompletableFuture<Void> cf = new CompletableFuture<>();
client.newRequest("localhost", connector.getLocalPort())
.path("/" + i)
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
counter.incrementAndGet();
cf.complete(null);
});
futures.add(cf);
}
// Wait for all requests to complete.
for (CompletableFuture<Void> cf : futures)
{
cf.get(5, TimeUnit.SECONDS);
}
assertThat(counter.get(), is(100));
// All remaining pooled connections should be in IDLE state.
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
for (Pool<Connection>.Entry value : poolRef.get().values())
{
if (!value.isIdle())
return false;
}
return true;
});
}
@Test
public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception
{

View File

@ -33,9 +33,20 @@
<doCheck>false</doCheck>
<doUpdate>false</doUpdate>
<revisionOnScmFailure>${nonCanonicalRevision}</revisionOnScmFailure>
<providerImplementations>
<git>jgit</git>
</providerImplementations>
<scmDirectory>../</scmDirectory>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.maven.scm</groupId>
<artifactId>maven-scm-provider-jgit</artifactId>
<version>1.10.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>

View File

@ -86,10 +86,10 @@
<jmh.version>1.33</jmh.version>
<jna.version>5.10.0</jna.version>
<jnr-constants.version>0.10.3</jnr-constants.version>
<jnr-enxio.version>0.32.10</jnr-enxio.version>
<jnr-ffi.version>2.2.8</jnr-ffi.version>
<jnr-posix.version>3.1.11</jnr-posix.version>
<jnr-unixsocket.version>0.38.12</jnr-unixsocket.version>
<jnr-enxio.version>0.32.11</jnr-enxio.version>
<jnr-ffi.version>2.2.9</jnr-ffi.version>
<jnr-posix.version>3.1.12</jnr-posix.version>
<jnr-unixsocket.version>0.38.13</jnr-unixsocket.version>
<jolokia.version>1.3.3</jolokia.version>
<json-simple.version>1.1.1</json-simple.version>
<json-smart.version>2.4.7</json-smart.version>