Issue #5147 - HTTP2 RoundRobinConnectionPool with maxUsage

Reworked HTTP/2 release after an exchange is terminated.

Previously, the release was bound to 2 events: onStreamClosed(),
introduced for #2796, and exchangeTerminated().
Unfortunately, if the former happens before the latter and
closes the connection, the latter will see the exchange as
aborted, while in fact it was successful, causing what
reported in #5147, an AsynchronousCloseException.

Now, the release is always performed by the exchangeTerminated()
event. With respect to #2796, the stream is always already
closed by the time the exchangeTerminated() event fires (it
was not before).

Reworked the implementation of RoundRobinConnectionPool using
a lock and aggressively trying to open new connections.

A second fix is related to HttpDestination.release(Connection).
If the connection is closed for e.g. overuse, we need to trigger
the processing of queued requests via send(create: true).

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-08-22 22:10:08 +02:00
parent 2d3f0e0c10
commit 0af5f676cd
6 changed files with 85 additions and 42 deletions

View File

@ -440,9 +440,14 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
{
// Trigger the next request after releasing the connection.
if (connectionPool.release(connection))
{
send(false);
}
else
{
connection.close();
send(true);
}
}
else
{

View File

@ -18,22 +18,22 @@
package org.eclipse.jetty.client;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
@ManagedObject
public class RoundRobinConnectionPool extends MultiplexConnectionPool
{
private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class);
private final AtomicInteger offset = new AtomicInteger();
private final Locker lock = new Locker();
private final Pool<Connection> pool;
private int offset;
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
@ -47,26 +47,32 @@ public class RoundRobinConnectionPool extends MultiplexConnectionPool
}
@Override
protected Connection activate()
protected Connection acquire(boolean create)
{
int offset = this.offset.get();
Connection connection = activate(offset);
if (connection != null)
this.offset.getAndIncrement();
return connection;
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
// those that were closed to process queued requests.
return super.acquire(true);
}
private Connection activate(int offset)
@Override
protected Connection activate()
{
Pool<Connection>.Entry entry = pool.acquireAt(Math.abs(offset % pool.getMaxEntries()));
Pool<Connection>.Entry entry;
try (Locker.Lock l = lock.lock())
{
int index = Math.abs(offset % pool.getMaxEntries());
entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activated '{}'", entry);
LOG.debug("activated at index={} entry={}", index, entry);
if (entry != null)
{
++offset;
}
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
return null;
}
}

View File

@ -25,7 +25,6 @@ import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.ResetFrame;
@ -99,12 +98,9 @@ public class HttpChannelOverHTTP2 extends HttpChannel
@Override
public void release()
{
setStream(null);
connection.release(this);
}
void onStreamClosed(IStream stream)
{
connection.onStreamClosed(stream, this);
getHttpDestination().release(getHttpConnection());
}
@Override

View File

@ -35,7 +35,6 @@ import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
@ -119,16 +118,6 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
}
}
void onStreamClosed(IStream stream, HttpChannelOverHTTP2 channel)
{
if (LOG.isDebugEnabled())
LOG.debug("{} closed for {}", stream, channel);
channel.setStream(null);
// Only non-push channels are released.
if (stream.isLocal())
getHttpDestination().release(this);
}
@Override
public boolean onIdleTimeout(long idleTimeout)
{

View File

@ -38,7 +38,6 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
@ -201,12 +200,6 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
callback.succeeded();
}
@Override
public void onClosed(Stream stream)
{
getHttpChannel().onStreamClosed((IStream)stream);
}
private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
{
contentNotifier.offer(exchange, frame, callback);

View File

@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -180,4 +182,56 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate)));
}
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testMultiplexWithMaxUsage(Transport transport) throws Exception
{
init(transport);
int multiplex = 1;
if (scenario.transport.isHttp2Based())
multiplex = 2;
int maxMultiplex = multiplex;
int maxUsage = 2;
int maxConnections = 2;
int count = maxConnections * maxMultiplex * maxUsage;
List<Integer> remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
remotePorts.add(request.getRemotePort());
}
});
scenario.client.getTransport().setConnectionPoolFactory(destination ->
{
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex);
pool.setMaxUsageCount(maxUsage);
return pool;
});
CountDownLatch clientLatch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
scenario.client.newRequest(scenario.newURI())
.path("/" + i)
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.OK_200)
clientLatch.countDown();
});
}
assertTrue(clientLatch.await(count, TimeUnit.SECONDS));
assertEquals(count, remotePorts.size());
Map<Integer, Long> results = remotePorts.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
assertEquals(count / maxUsage, results.size(), remotePorts.toString());
assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString());
}
}