Merged branch 'jetty-9.4.x' into 'master'.
This commit is contained in:
commit
01a5415796
|
@ -27,11 +27,13 @@ import org.eclipse.jetty.client.api.Destination;
|
|||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
@ManagedObject
|
||||
public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);
|
||||
|
|
|
@ -40,7 +40,7 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.Sweeper;
|
||||
|
||||
@ManagedObject("The connection pool")
|
||||
@ManagedObject
|
||||
public class DuplexConnectionPool extends AbstractConnectionPool implements Sweeper.Sweepable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(DuplexConnectionPool.class);
|
||||
|
|
|
@ -21,16 +21,21 @@ package org.eclipse.jetty.client;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Destination;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
|
||||
@ManagedObject
|
||||
public class RoundRobinConnectionPool extends AbstractConnectionPool
|
||||
{
|
||||
private final List<Entry> entries;
|
||||
private int index;
|
||||
private long liveTimeout;
|
||||
|
||||
public RoundRobinConnectionPool(Destination destination, int maxConnections, Callback requester)
|
||||
{
|
||||
|
@ -40,6 +45,17 @@ public class RoundRobinConnectionPool extends AbstractConnectionPool
|
|||
entries.add(new Entry());
|
||||
}
|
||||
|
||||
@ManagedAttribute("The timeout, in milliseconds, after which a live connection may be closed")
|
||||
public long getLiveTimeout()
|
||||
{
|
||||
return liveTimeout;
|
||||
}
|
||||
|
||||
public void setLiveTimeout(long liveTimeout)
|
||||
{
|
||||
this.liveTimeout = liveTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCreated(Connection connection)
|
||||
{
|
||||
|
@ -50,6 +66,7 @@ public class RoundRobinConnectionPool extends AbstractConnectionPool
|
|||
if (entry.connection == null)
|
||||
{
|
||||
entry.connection = connection;
|
||||
entry.createdTime = System.nanoTime();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -110,7 +127,8 @@ public class RoundRobinConnectionPool extends AbstractConnectionPool
|
|||
@Override
|
||||
public boolean release(Connection connection)
|
||||
{
|
||||
boolean released = false;
|
||||
boolean active = false;
|
||||
boolean removed = false;
|
||||
synchronized (this)
|
||||
{
|
||||
for (Entry entry : entries)
|
||||
|
@ -118,21 +136,38 @@ public class RoundRobinConnectionPool extends AbstractConnectionPool
|
|||
if (entry.connection == connection)
|
||||
{
|
||||
entry.active = false;
|
||||
released = true;
|
||||
active = true;
|
||||
long timeout = getLiveTimeout();
|
||||
if (timeout > 0)
|
||||
{
|
||||
long live = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - entry.createdTime);
|
||||
if (live >= timeout)
|
||||
{
|
||||
entry.reset();
|
||||
removed = true;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (released)
|
||||
if (active)
|
||||
{
|
||||
released(connection);
|
||||
if (removed)
|
||||
{
|
||||
removed(connection);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return idle(connection, isClosed());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Connection connection)
|
||||
{
|
||||
boolean removed = false;
|
||||
boolean active = false;
|
||||
boolean removed = false;
|
||||
synchronized (this)
|
||||
{
|
||||
for (Entry entry : entries)
|
||||
|
@ -140,9 +175,7 @@ public class RoundRobinConnectionPool extends AbstractConnectionPool
|
|||
if (entry.connection == connection)
|
||||
{
|
||||
active = entry.active;
|
||||
entry.connection = null;
|
||||
entry.active = false;
|
||||
entry.used = 0;
|
||||
entry.reset();
|
||||
removed = true;
|
||||
break;
|
||||
}
|
||||
|
@ -198,6 +231,15 @@ public class RoundRobinConnectionPool extends AbstractConnectionPool
|
|||
private Connection connection;
|
||||
private boolean active;
|
||||
private long used;
|
||||
private long createdTime;
|
||||
|
||||
private void reset()
|
||||
{
|
||||
connection = null;
|
||||
active = false;
|
||||
used = 0;
|
||||
createdTime = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class RoundRobinConnectionPoolTest extends AbstractHttpClientServerTest
|
||||
{
|
||||
public RoundRobinConnectionPoolTest(SslContextFactory sslContextFactory)
|
||||
{
|
||||
super(sslContextFactory);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLiveTimeout() throws Exception
|
||||
{
|
||||
List<Integer> remotePorts = new ArrayList<>();
|
||||
start(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
remotePorts.add(request.getRemotePort());
|
||||
if (target.equals("/wait"))
|
||||
{
|
||||
long time = Long.parseLong(request.getParameter("time"));
|
||||
try
|
||||
{
|
||||
Thread.sleep(time);
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
long liveTimeout = 1000;
|
||||
client.getTransport().setConnectionPoolFactory(destination ->
|
||||
{
|
||||
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, 1, destination);
|
||||
pool.setLiveTimeout(liveTimeout);
|
||||
return pool;
|
||||
});
|
||||
|
||||
// Make a request to create the initial connection.
|
||||
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
|
||||
// Wait a bit.
|
||||
Thread.sleep(liveTimeout / 2);
|
||||
|
||||
// Live timeout will expire during this request.
|
||||
response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.path("/wait")
|
||||
.param("time", String.valueOf(liveTimeout))
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
|
||||
// Make another request to be sure another connection will be used.
|
||||
response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
|
||||
Assert.assertThat(remotePorts.size(), Matchers.equalTo(3));
|
||||
Assert.assertThat(remotePorts.get(0), Matchers.equalTo(remotePorts.get(1)));
|
||||
// Third request must be on a different connection.
|
||||
Assert.assertThat(remotePorts.get(1), Matchers.not(Matchers.equalTo(remotePorts.get(2))));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue