Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.
This commit is contained in:
commit
30303c7979
|
@ -429,9 +429,14 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
{
|
||||
// Trigger the next request after releasing the connection.
|
||||
if (connectionPool.release(connection))
|
||||
{
|
||||
send(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
connection.close();
|
||||
send(true);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -18,12 +18,11 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Pool;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -32,8 +31,9 @@ public class RoundRobinConnectionPool extends MultiplexConnectionPool
|
|||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RoundRobinConnectionPool.class);
|
||||
|
||||
private final AtomicInteger offset = new AtomicInteger();
|
||||
private final AutoLock lock = new AutoLock();
|
||||
private final Pool<Connection> pool;
|
||||
private int offset;
|
||||
|
||||
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
|
||||
{
|
||||
|
@ -47,26 +47,32 @@ public class RoundRobinConnectionPool extends MultiplexConnectionPool
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Connection activate()
|
||||
public Connection acquire(boolean create)
|
||||
{
|
||||
int offset = this.offset.get();
|
||||
Connection connection = activate(offset);
|
||||
if (connection != null)
|
||||
this.offset.getAndIncrement();
|
||||
return connection;
|
||||
// If there are queued requests and connections get
|
||||
// closed due to idle timeout or overuse, we want to
|
||||
// aggressively try to open new connections to replace
|
||||
// those that were closed to process queued requests.
|
||||
return super.acquire(true);
|
||||
}
|
||||
|
||||
private Connection activate(int offset)
|
||||
@Override
|
||||
protected Connection activate()
|
||||
{
|
||||
Pool<Connection>.Entry entry = pool.acquireAt(Math.abs(offset % pool.getMaxEntries()));
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("activated '{}'", entry);
|
||||
if (entry != null)
|
||||
Pool<Connection>.Entry entry;
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
Connection connection = entry.getPooled();
|
||||
acquired(connection);
|
||||
return connection;
|
||||
int index = Math.abs(offset % pool.getMaxEntries());
|
||||
entry = pool.acquireAt(index);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("activated at index={} entry={}", index, entry);
|
||||
if (entry != null)
|
||||
++offset;
|
||||
}
|
||||
return null;
|
||||
if (entry == null)
|
||||
return null;
|
||||
Connection connection = entry.getPooled();
|
||||
acquired(connection);
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -110,12 +110,9 @@ public class HttpChannelOverHTTP2 extends HttpChannel
|
|||
@Override
|
||||
public void release()
|
||||
{
|
||||
setStream(null);
|
||||
connection.release(this);
|
||||
}
|
||||
|
||||
void onStreamClosed(IStream stream)
|
||||
{
|
||||
connection.onStreamClosed(stream, this);
|
||||
getHttpDestination().release(getHttpConnection());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -215,12 +212,5 @@ public class HttpChannelOverHTTP2 extends HttpChannel
|
|||
HTTP2Channel.Client channel = (HTTP2Channel.Client)((IStream)stream).getAttachment();
|
||||
channel.onFailure(failure, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClosed(Stream stream)
|
||||
{
|
||||
// TODO: needs to call HTTP2Channel?
|
||||
receiver.onClosed(stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -167,16 +167,6 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
|
|||
}
|
||||
}
|
||||
|
||||
void onStreamClosed(IStream stream, HttpChannelOverHTTP2 channel)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} closed for {}", stream, channel);
|
||||
channel.setStream(null);
|
||||
// Only non-push channels are released.
|
||||
if (stream.isLocal())
|
||||
getHttpDestination().release(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleTimeout(long idleTimeout)
|
||||
{
|
||||
|
@ -239,9 +229,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
|
|||
{
|
||||
if (!isClosed())
|
||||
return false;
|
||||
if (sweeps.incrementAndGet() < 4)
|
||||
return false;
|
||||
return true;
|
||||
return sweeps.incrementAndGet() >= 4;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -235,11 +235,6 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
|
|||
callback.succeeded();
|
||||
}
|
||||
|
||||
void onClosed(Stream stream)
|
||||
{
|
||||
getHttpChannel().onStreamClosed((IStream)stream);
|
||||
}
|
||||
|
||||
private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
|
||||
{
|
||||
contentNotifier.offer(exchange, frame, callback);
|
||||
|
|
|
@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
|
@ -180,4 +182,56 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
|
|||
assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate)));
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testMultiplexWithMaxUsage(Transport transport) throws Exception
|
||||
{
|
||||
init(transport);
|
||||
|
||||
int multiplex = 1;
|
||||
if (scenario.transport.isHttp2Based())
|
||||
multiplex = 2;
|
||||
int maxMultiplex = multiplex;
|
||||
|
||||
int maxUsage = 2;
|
||||
int maxConnections = 2;
|
||||
int count = maxConnections * maxMultiplex * maxUsage;
|
||||
|
||||
List<Integer> remotePorts = new CopyOnWriteArrayList<>();
|
||||
scenario.start(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
|
||||
{
|
||||
remotePorts.add(request.getRemotePort());
|
||||
}
|
||||
});
|
||||
scenario.client.getTransport().setConnectionPoolFactory(destination ->
|
||||
{
|
||||
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex);
|
||||
pool.setMaxUsageCount(maxUsage);
|
||||
return pool;
|
||||
});
|
||||
|
||||
CountDownLatch clientLatch = new CountDownLatch(count);
|
||||
for (int i = 0; i < count; ++i)
|
||||
{
|
||||
scenario.client.newRequest(scenario.newURI())
|
||||
.path("/" + i)
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send(result ->
|
||||
{
|
||||
if (result.getResponse().getStatus() == HttpStatus.OK_200)
|
||||
clientLatch.countDown();
|
||||
});
|
||||
}
|
||||
assertTrue(clientLatch.await(count, TimeUnit.SECONDS));
|
||||
assertEquals(count, remotePorts.size());
|
||||
|
||||
Map<Integer, Long> results = remotePorts.stream()
|
||||
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
|
||||
assertEquals(count / maxUsage, results.size(), remotePorts.toString());
|
||||
assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue