476170 - Support servers that close connections without sending Connection: close header.

Removed previous implementation in favor of a customized
ConnectionPool, that gives more flexibility on the actual logic to
validate connections.
This commit is contained in:
Simone Bordet 2015-09-01 15:44:30 +02:00
parent 60136c6825
commit b23c2bd309
10 changed files with 492 additions and 260 deletions

View File

@ -44,7 +44,7 @@ import org.eclipse.jetty.util.thread.Sweeper;
@ManagedObject("The connection pool")
public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
protected static final Logger LOG = Log.getLogger(ConnectionPool.class);
private static final Logger LOG = Log.getLogger(ConnectionPool.class);
private final AtomicInteger connectionCount = new AtomicInteger();
private final ReentrantLock lock = new ReentrantLock();
@ -129,7 +129,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
idleCreated(connection);
requester.succeeded();
proceed();
}
@Override
@ -150,11 +150,15 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
}
}
protected void proceed()
{
requester.succeeded();
}
protected void idleCreated(Connection connection)
{
boolean idle;
final ReentrantLock lock = this.lock;
lock.lock();
lock();
try
{
// Use "cold" new connections as last.
@ -162,7 +166,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
}
finally
{
lock.unlock();
unlock();
}
idle(connection, idle);
@ -172,8 +176,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
boolean acquired;
Connection connection;
final ReentrantLock lock = this.lock;
lock.lock();
lock();
try
{
connection = idleConnections.pollFirst();
@ -183,7 +186,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
}
finally
{
lock.unlock();
unlock();
}
if (acquired)
@ -209,24 +212,28 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
public boolean release(Connection connection)
{
boolean idle;
final ReentrantLock lock = this.lock;
lock.lock();
lock();
try
{
if (!activeConnections.remove(connection))
return false;
// Make sure we use "hot" connections first.
idle = idleConnections.offerFirst(connection);
idle = offerIdle(connection);
}
finally
{
lock.unlock();
unlock();
}
released(connection);
return idle(connection, idle);
}
protected boolean offerIdle(Connection connection)
{
return idleConnections.offerFirst(connection);
}
protected boolean idle(Connection connection, boolean idle)
{
if (idle)
@ -249,11 +256,15 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
}
public boolean remove(Connection connection)
{
return remove(connection, false);
}
protected boolean remove(Connection connection, boolean force)
{
boolean activeRemoved;
boolean idleRemoved;
final ReentrantLock lock = this.lock;
lock.lock();
lock();
try
{
activeRemoved = activeConnections.remove(connection);
@ -261,12 +272,12 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
}
finally
{
lock.unlock();
unlock();
}
if (activeRemoved)
released(connection);
boolean removed = activeRemoved || idleRemoved;
boolean removed = activeRemoved || idleRemoved || force;
if (removed)
{
int pooled = connectionCount.decrementAndGet();
@ -278,29 +289,27 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
public boolean isActive(Connection connection)
{
final ReentrantLock lock = this.lock;
lock.lock();
lock();
try
{
return activeConnections.contains(connection);
}
finally
{
lock.unlock();
unlock();
}
}
public boolean isIdle(Connection connection)
{
final ReentrantLock lock = this.lock;
lock.lock();
lock();
try
{
return idleConnections.contains(connection);
}
finally
{
lock.unlock();
unlock();
}
}
@ -313,8 +322,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
List<Connection> idles = new ArrayList<>();
List<Connection> actives = new ArrayList<>();
final ReentrantLock lock = this.lock;
lock.lock();
lock();
try
{
idles.addAll(idleConnections);
@ -324,7 +332,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
}
finally
{
lock.unlock();
unlock();
}
connectionCount.set(0);
@ -348,8 +356,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
{
List<Connection> actives = new ArrayList<>();
List<Connection> idles = new ArrayList<>();
final ReentrantLock lock = this.lock;
lock.lock();
lock();
try
{
actives.addAll(activeConnections);
@ -357,7 +364,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
}
finally
{
lock.unlock();
unlock();
}
ContainerLifeCycle.dumpObject(out, this);
@ -368,8 +375,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
public boolean sweep()
{
List<Sweeper.Sweepable> toSweep = new ArrayList<>();
final ReentrantLock lock = this.lock;
lock.lock();
lock();
try
{
for (Connection connection : getActiveConnections())
@ -380,7 +386,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
}
finally
{
lock.unlock();
unlock();
}
for (Sweeper.Sweepable candidate : toSweep)
@ -400,13 +406,22 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
return false;
}
protected void lock()
{
lock.lock();
}
protected void unlock()
{
lock.unlock();
}
@Override
public String toString()
{
int activeSize;
int idleSize;
final ReentrantLock lock = this.lock;
lock.lock();
lock();
try
{
activeSize = activeConnections.size();
@ -414,7 +429,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
}
finally
{
lock.unlock();
unlock();
}
return String.format("%s[c=%d/%d,a=%d,i=%d]",

View File

@ -144,7 +144,6 @@ public class HttpClient extends ContainerLifeCycle
private volatile HttpField encodingField;
private volatile boolean removeIdleDestinations = false;
private volatile boolean connectBlocking = false;
private volatile boolean validateConnections = false;
/**
* Creates a {@link HttpClient} instance that can perform requests to non-TLS destinations only
@ -1026,32 +1025,6 @@ public class HttpClient extends ContainerLifeCycle
this.connectBlocking = connectBlocking;
}
/**
* @return whether connections are validated before sending a request
* @see #setValidateConnections(boolean)
*/
public boolean isValidateConnections()
{
return validateConnections;
}
/**
* <p>Whether connections are validated before sending a request.</p>
* <p>This is a best-effort attempt to validate that the connection
* onto which requests are sent is valid before sending the request.</p>
* <p>The validation is performed only if the underlying transport
* implementation supports validating connections before their use
* (some transport implementation may not be able to perform such
* check).</p>
*
* @param validateConnections whether connections are validated before sending a request.
* @see #isValidateConnections()
*/
public void setValidateConnections(boolean validateConnections)
{
this.validateConnections = validateConnections;
}
/**
* @return the forward proxy configuration
*/

View File

@ -22,9 +22,13 @@ import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class LeakTrackingConnectionPool extends ConnectionPool
{
private static final Logger LOG = Log.getLogger(LeakTrackingConnectionPool.class);
private final LeakDetector<Connection> leakDetector = new LeakDetector<Connection>()
{
@Override

View File

@ -71,34 +71,22 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
{
if (getHttpExchanges().isEmpty())
return;
process();
}
@SuppressWarnings("unchecked")
public C acquire()
{
return (C)connectionPool.acquire();
}
private void process()
{
C connection = acquire();
if (connection != null)
process(connection);
}
public C acquire()
{
while (true)
{
@SuppressWarnings("unchecked")
C c = (C)connectionPool.acquire();
if (isValid(c))
return c;
else
c.close();
}
}
private boolean isValid(Connection c)
{
if (getHttpClient().isValidateConnections())
{
if (c instanceof Validateable)
return ((Validateable)c).validate();
}
return true;
}
/**
* <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
* <p>A new connection is created when a request needs to be executed; it is possible that the request that
@ -154,21 +142,21 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
@SuppressWarnings("unchecked")
C connection = (C)c;
if (LOG.isDebugEnabled())
LOG.debug("{} released", connection);
LOG.debug("Released {}", connection);
HttpClient client = getHttpClient();
if (client.isRunning())
{
if (connectionPool.isActive(connection))
{
if (isValid(connection))
process(connection);
if (connectionPool.release(connection))
send();
else
connection.close();
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{} explicit", connection);
LOG.debug("Released explicit {}", connection);
}
}
else
@ -204,9 +192,7 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
// We need to execute queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries.
C newConnection = acquire();
if (newConnection != null)
process(newConnection);
process();
}
}

View File

@ -1,24 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
public interface Validateable
{
boolean validate();
}

View File

@ -0,0 +1,209 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* <p>A {@link ConnectionPool} that validates connections before
* making them available for use.</p>
* <p>Connections that have just been opened are not validated.
* Connections that are {@link #release(Connection) released} will
* be validated.</p>
* <p>Validation by reading from the EndPoint is not reliable,
* since the TCP FIN may arrive just after the validation read.</p>
* <p>This class validates connections by putting them in a
* "quarantine" for a configurable timeout, where they cannot
* be used to send requests. When the timeout expires, the
* quarantined connection is made idle and therefore available
* to send requests.</p>
* <p>The existing HttpClient mechanism to detect server closes
* will trigger and close quarantined connections, before they
* are made idle (and reusable) again.</p>
* <p>There still is a small chance that the timeout expires,
* the connection is made idle and available again, it is used
* to send a request exactly when the server decides to close.
* This case is however unavoidable and may be mitigated by
* tuning the idle timeout of the servers to be larger than
* that of the client.</p>
*/
public class ValidatingConnectionPool extends ConnectionPool
{
private static final Logger LOG = Log.getLogger(ValidatingConnectionPool.class);
private final Scheduler scheduler;
private final long timeout;
private final Map<Connection, Holder> quarantine;
public ValidatingConnectionPool(Destination destination, int maxConnections, Callback requester, Scheduler scheduler, long timeout)
{
super(destination, maxConnections, requester);
this.scheduler = scheduler;
this.timeout = timeout;
this.quarantine = new HashMap<>(maxConnections);
}
@ManagedAttribute(value = "The number of validating connections", readonly = true)
public int getValidatingConnectionCount()
{
return quarantine.size();
}
@Override
public boolean release(Connection connection)
{
lock();
try
{
if (!getActiveConnections().remove(connection))
return false;
Holder holder = new Holder(connection);
holder.task = scheduler.schedule(holder, timeout, TimeUnit.MILLISECONDS);
quarantine.put(connection, holder);
if (LOG.isDebugEnabled())
LOG.debug("Validating for {}ms {}", timeout, connection);
}
finally
{
unlock();
}
released(connection);
return true;
}
@Override
public boolean remove(Connection connection)
{
Holder holder;
lock();
try
{
holder = quarantine.remove(connection);
}
finally
{
unlock();
}
if (holder == null)
return super.remove(connection);
if (LOG.isDebugEnabled())
LOG.debug("Removed while validating {}", connection);
boolean cancelled = holder.cancel();
if (cancelled)
return remove(connection, true);
return super.remove(connection);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
super.dump(out, indent);
ContainerLifeCycle.dump(out, indent, quarantine.values());
}
@Override
public String toString()
{
int size;
lock();
try
{
size = quarantine.size();
}
finally
{
unlock();
}
return String.format("%s[v=%d]", super.toString(), size);
}
private class Holder implements Runnable
{
private final long timestamp = System.nanoTime();
private final AtomicBoolean latch = new AtomicBoolean();
private final Connection connection;
public Scheduler.Task task;
public Holder(Connection connection)
{
this.connection = connection;
}
@Override
public void run()
{
if (latch.compareAndSet(false, true))
{
boolean idle;
lock();
try
{
quarantine.remove(connection);
idle = offerIdle(connection);
if (LOG.isDebugEnabled())
LOG.debug("Validated {}", connection);
}
finally
{
unlock();
}
if (idle(connection, idle))
proceed();
}
}
public boolean cancel()
{
if (latch.compareAndSet(false, true))
{
task.cancel();
return true;
}
return false;
}
@Override
public String toString()
{
return String.format("%s[validationLeft=%dms]",
connection,
timeout - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - timestamp)
);
}
}
}

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.client.http;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -27,19 +26,17 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.Validateable;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Sweeper;
public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, Sweeper.Sweepable, Validateable
public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, Sweeper.Sweepable
{
private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
@ -144,10 +141,10 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
getHttpDestination().close(this);
getEndPoint().shutdownOutput();
if (LOG.isDebugEnabled())
LOG.debug("{} oshut", this);
LOG.debug("Shutdown {}", this);
getEndPoint().close();
if (LOG.isDebugEnabled())
LOG.debug("{} closed", this);
LOG.debug("Closed {}", this);
abort(failure);
}
@ -174,32 +171,6 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
return true;
}
@Override
public boolean validate()
{
ByteBufferPool byteBufferPool = getHttpDestination().getHttpClient().getByteBufferPool();
ByteBuffer buffer = byteBufferPool.acquire(1, true);
try
{
EndPoint endPoint = getEndPoint();
int filled = endPoint.fill(buffer);
if (LOG.isDebugEnabled())
LOG.debug("Validated {} {}", filled, this);
// Invalid if we read -1 or garbage bytes.
return filled == 0;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not validate connection " + this, x);
return false;
}
finally
{
byteBufferPool.release(buffer);
}
}
@Override
public String toString()
{

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.client;
import java.util.Arrays;
import java.util.Collection;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.NetworkConnector;
@ -84,10 +85,15 @@ public abstract class AbstractHttpClientServerTest
}
protected void startClient() throws Exception
{
startClient(new HttpClientTransportOverHTTP(1));
}
protected void startClient(HttpClientTransport transport) throws Exception
{
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client = new HttpClient(sslContextFactory);
client = new HttpClient(transport, sslContextFactory);
client.setExecutor(clientThreads);
client.start();
}

View File

@ -74,9 +74,7 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.TestingDir;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
@ -1541,116 +1539,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest
}
}
@Test
public void testServerClosesConnectionAfterRedirect() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
if (target.endsWith("/redirect"))
{
response.setStatus(HttpStatus.TEMPORARY_REDIRECT_307);
response.setContentLength(0);
response.setHeader(HttpHeader.LOCATION.asString(), scheme + "://localhost:" + connector.getLocalPort() + "/");
response.flushBuffer();
baseRequest.getHttpChannel().getEndPoint().shutdownOutput();
}
else
{
response.setStatus(HttpStatus.OK_200);
response.setContentLength(0);
response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString());
}
}
});
client.setValidateConnections(true);
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/redirect")
.send();
Assert.assertEquals(200, response.getStatus());
}
@Test
public void testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnectionsWithConnectionCloseHeader() throws Exception
{
testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnections(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setStatus(HttpStatus.OK_200);
response.setContentLength(0);
response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString());
}
});
}
@Test
public void testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnectionsWithoutConnectionCloseHeader() throws Exception
{
testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnections(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setStatus(HttpStatus.OK_200);
response.setContentLength(0);
response.flushBuffer();
baseRequest.getHttpChannel().getEndPoint().shutdownOutput();
}
});
}
private void testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnections(Handler handler) throws Exception
{
start(handler);
client.setValidateConnections(true);
client.setMaxConnectionsPerDestination(1);
final CountDownLatch latch = new CountDownLatch(1);
Request request1 = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/one")
.onRequestBegin(r ->
{
try
{
latch.await();
}
catch (InterruptedException x)
{
r.abort(x);
}
});
FutureResponseListener listener1 = new FutureResponseListener(request1);
request1.send(listener1);
Request request2 = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/two");
FutureResponseListener listener2 = new FutureResponseListener(request2);
request2.send(listener2);
// Now we have one request about to be sent, and one queued.
latch.countDown();
ContentResponse response1 = listener1.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response1.getStatus());
ContentResponse response2 = listener2.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response2.getStatus());
}
private void consume(InputStream input, boolean eof) throws IOException
{
int crlfs = 0;

View File

@ -0,0 +1,204 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
public class ValidatingConnectionPoolTest extends AbstractHttpClientServerTest
{
public ValidatingConnectionPoolTest(SslContextFactory sslContextFactory)
{
super(sslContextFactory);
}
@Override
protected void startClient() throws Exception
{
startClient(new ValidatingHttpClientTransportOverHTTP(1000));
}
@Test
public void testRequestAfterValidation() throws Exception
{
start(new EmptyServerHandler());
client.setMaxConnectionsPerDestination(1);
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send();
Assert.assertEquals(200, response.getStatus());
// The second request should be sent after the validating timeout.
response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send();
Assert.assertEquals(200, response.getStatus());
}
@Test
public void testServerClosesConnectionAfterRedirectWithoutConnectionCloseHeader() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
if (target.endsWith("/redirect"))
{
response.setStatus(HttpStatus.TEMPORARY_REDIRECT_307);
response.setContentLength(0);
response.setHeader(HttpHeader.LOCATION.asString(), scheme + "://localhost:" + connector.getLocalPort() + "/");
response.flushBuffer();
baseRequest.getHttpChannel().getEndPoint().shutdownOutput();
}
else
{
response.setStatus(HttpStatus.OK_200);
response.setContentLength(0);
response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString());
}
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/redirect")
.send();
Assert.assertEquals(200, response.getStatus());
}
@Test
public void testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnectionsWithConnectionCloseHeader() throws Exception
{
testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnections(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setStatus(HttpStatus.OK_200);
response.setContentLength(0);
response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString());
}
});
}
@Test
public void testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnectionsWithoutConnectionCloseHeader() throws Exception
{
testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnections(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setStatus(HttpStatus.OK_200);
response.setContentLength(0);
response.flushBuffer();
baseRequest.getHttpChannel().getEndPoint().shutdownOutput();
}
});
}
private void testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnections(Handler handler) throws Exception
{
start(handler);
client.setMaxConnectionsPerDestination(1);
final CountDownLatch latch = new CountDownLatch(1);
Request request1 = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/one")
.onRequestBegin(r ->
{
try
{
latch.await();
}
catch (InterruptedException x)
{
r.abort(x);
}
});
FutureResponseListener listener1 = new FutureResponseListener(request1);
request1.send(listener1);
Request request2 = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/two");
FutureResponseListener listener2 = new FutureResponseListener(request2);
request2.send(listener2);
// Now we have one request about to be sent, and one queued.
latch.countDown();
ContentResponse response1 = listener1.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response1.getStatus());
ContentResponse response2 = listener2.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response2.getStatus());
}
private static class ValidatingHttpClientTransportOverHTTP extends HttpClientTransportOverHTTP
{
private final long timeout;
public ValidatingHttpClientTransportOverHTTP(long timeout)
{
super(1);
this.timeout = timeout;
}
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new HttpDestinationOverHTTP(getHttpClient(), origin)
{
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new ValidatingConnectionPool(this, client.getMaxConnectionsPerDestination(), this, client.getScheduler(), timeout);
}
};
}
}
}