Merge pull request #5187 from eclipse/jetty-9.4.x-5147-h2_round_robin_max_usage

Issue #5147 - HTTP2 RoundRobinConnectionPool with maxUsage
This commit is contained in:
Simone Bordet 2020-08-25 15:54:32 +02:00 committed by GitHub
commit 0dd8274f9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 85 additions and 42 deletions

View File

@ -440,9 +440,14 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
{
// Trigger the next request after releasing the connection.
if (connectionPool.release(connection))
{
send(false);
}
else
{
connection.close();
send(true);
}
}
else
{

View File

@ -18,22 +18,22 @@
package org.eclipse.jetty.client;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
@ManagedObject
public class RoundRobinConnectionPool extends MultiplexConnectionPool
{
private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class);
private final AtomicInteger offset = new AtomicInteger();
private final Locker lock = new Locker();
private final Pool<Connection> pool;
private int offset;
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
@ -47,26 +47,32 @@ public class RoundRobinConnectionPool extends MultiplexConnectionPool
}
@Override
protected Connection activate()
protected Connection acquire(boolean create)
{
int offset = this.offset.get();
Connection connection = activate(offset);
if (connection != null)
this.offset.getAndIncrement();
return connection;
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
// those that were closed to process queued requests.
return super.acquire(true);
}
private Connection activate(int offset)
@Override
protected Connection activate()
{
Pool<Connection>.Entry entry = pool.acquireAt(Math.abs(offset % pool.getMaxEntries()));
if (LOG.isDebugEnabled())
LOG.debug("activated '{}'", entry);
if (entry != null)
Pool<Connection>.Entry entry;
try (Locker.Lock l = lock.lock())
{
Connection connection = entry.getPooled();
acquired(connection);
return connection;
int index = Math.abs(offset % pool.getMaxEntries());
entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activated at index={} entry={}", index, entry);
if (entry != null)
++offset;
}
return null;
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
}

View File

@ -25,7 +25,6 @@ import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.ResetFrame;
@ -99,12 +98,9 @@ public class HttpChannelOverHTTP2 extends HttpChannel
@Override
public void release()
{
setStream(null);
connection.release(this);
}
void onStreamClosed(IStream stream)
{
connection.onStreamClosed(stream, this);
getHttpDestination().release(getHttpConnection());
}
@Override

View File

@ -35,7 +35,6 @@ import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
@ -119,16 +118,6 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
}
}
void onStreamClosed(IStream stream, HttpChannelOverHTTP2 channel)
{
if (LOG.isDebugEnabled())
LOG.debug("{} closed for {}", stream, channel);
channel.setStream(null);
// Only non-push channels are released.
if (stream.isLocal())
getHttpDestination().release(this);
}
@Override
public boolean onIdleTimeout(long idleTimeout)
{

View File

@ -38,7 +38,6 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
@ -201,12 +200,6 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
callback.succeeded();
}
@Override
public void onClosed(Stream stream)
{
getHttpChannel().onStreamClosed((IStream)stream);
}
private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
{
contentNotifier.offer(exchange, frame, callback);

View File

@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -180,4 +182,56 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate)));
}
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testMultiplexWithMaxUsage(Transport transport) throws Exception
{
init(transport);
int multiplex = 1;
if (scenario.transport.isHttp2Based())
multiplex = 2;
int maxMultiplex = multiplex;
int maxUsage = 2;
int maxConnections = 2;
int count = maxConnections * maxMultiplex * maxUsage;
List<Integer> remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
remotePorts.add(request.getRemotePort());
}
});
scenario.client.getTransport().setConnectionPoolFactory(destination ->
{
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex);
pool.setMaxUsageCount(maxUsage);
return pool;
});
CountDownLatch clientLatch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
scenario.client.newRequest(scenario.newURI())
.path("/" + i)
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.OK_200)
clientLatch.countDown();
});
}
assertTrue(clientLatch.await(count, TimeUnit.SECONDS));
assertEquals(count, remotePorts.size());
Map<Integer, Long> results = remotePorts.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
assertEquals(count / maxUsage, results.size(), remotePorts.toString());
assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString());
}
}