jetty-9 - HTTP client: implemented correct shutdown.

This commit is contained in:
Simone Bordet 2012-09-07 00:01:27 +02:00
parent 8b7e1463a1
commit 1da6b61854
10 changed files with 155 additions and 41 deletions

View File

@ -93,7 +93,7 @@ public class HttpClient extends AggregateLifeCycle
{ {
private static final Logger LOG = Log.getLogger(HttpClient.class); private static final Logger LOG = Log.getLogger(HttpClient.class);
private final ConcurrentMap<String, Destination> destinations = new ConcurrentHashMap<>(); private final ConcurrentMap<String, HttpDestination> destinations = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, HttpConversation> conversations = new ConcurrentHashMap<>(); private final ConcurrentMap<Long, HttpConversation> conversations = new ConcurrentHashMap<>();
private final CookieStore cookieStore = new HttpCookieStore(); private final CookieStore cookieStore = new HttpCookieStore();
private volatile Executor executor; private volatile Executor executor;
@ -153,6 +153,15 @@ public class HttpClient extends AggregateLifeCycle
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
LOG.debug("Stopping {}", this); LOG.debug("Stopping {}", this);
for (HttpDestination destination : destinations.values())
destination.close();
destinations.clear();
conversations.clear();
cookieStore.clear();
super.doStop(); super.doStop();
LOG.info("Stopped {}", this); LOG.info("Stopped {}", this);
} }
@ -223,22 +232,28 @@ public class HttpClient extends AggregateLifeCycle
public Destination getDestination(String scheme, String host, int port) public Destination getDestination(String scheme, String host, int port)
{ {
String address = address(scheme, host, port); String address = address(scheme, host, port);
Destination destination = destinations.get(address); HttpDestination destination = destinations.get(address);
if (destination == null) if (destination == null)
{ {
destination = new HttpDestination(this, scheme, host, port); destination = new HttpDestination(this, scheme, host, port);
Destination existing = destinations.putIfAbsent(address, destination); if (isRunning())
{
HttpDestination existing = destinations.putIfAbsent(address, destination);
if (existing != null) if (existing != null)
destination = existing; destination = existing;
else else
LOG.debug("Created {}", destination); LOG.debug("Created {}", destination);
if (!isRunning())
destinations.remove(address);
}
} }
return destination; return destination;
} }
public List<Destination> getDestinations() public List<Destination> getDestinations()
{ {
return new ArrayList<>(destinations.values()); return new ArrayList<Destination>(destinations.values());
} }
public String getUserAgent() public String getUserAgent()
@ -376,7 +391,6 @@ public class HttpClient extends AggregateLifeCycle
{ {
conversations.remove(conversation.id()); conversations.remove(conversation.id());
LOG.debug("Removed {}", conversation); LOG.debug("Removed {}", conversation);
} }
public Response.Listener lookup(int status) public Response.Listener lookup(int status)

View File

@ -224,6 +224,13 @@ public class HttpConnection extends AbstractConnection implements Connection
} }
} }
@Override
public void close()
{
super.close();
LOG.debug("Closed {}", this);
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -21,8 +21,9 @@ package org.eclipse.jetty.client;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.client.api.CookieStore; import org.eclipse.jetty.client.api.CookieStore;
@ -31,7 +32,7 @@ import org.eclipse.jetty.http.HttpCookie;
public class HttpCookieStore implements CookieStore public class HttpCookieStore implements CookieStore
{ {
private final ConcurrentMap<String, Map<String, HttpCookie>> allCookies = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Queue<HttpCookie>> allCookies = new ConcurrentHashMap<>();
@Override @Override
public List<HttpCookie> getCookies(Destination destination, String path) public List<HttpCookie> getCookies(Destination destination, String path)
@ -43,7 +44,7 @@ public class HttpCookieStore implements CookieStore
String key = host + ":" + port + path; String key = host + ":" + port + path;
// First lookup: direct hit // First lookup: direct hit
Map<String, HttpCookie> cookies = allCookies.get(key); Queue<HttpCookie> cookies = allCookies.get(key);
if (cookies != null) if (cookies != null)
accumulateCookies(destination, cookies, result); accumulateCookies(destination, cookies, result);
@ -71,9 +72,9 @@ public class HttpCookieStore implements CookieStore
return result; return result;
} }
private void accumulateCookies(Destination destination, Map<String, HttpCookie> cookies, List<HttpCookie> result) private void accumulateCookies(Destination destination, Queue<HttpCookie> cookies, List<HttpCookie> result)
{ {
for (Iterator<HttpCookie> iterator = cookies.values().iterator(); iterator.hasNext(); ) for (Iterator<HttpCookie> iterator = cookies.iterator(); iterator.hasNext(); )
{ {
HttpCookie cookie = iterator.next(); HttpCookie cookie = iterator.next();
if (cookie.isExpired(System.nanoTime())) if (cookie.isExpired(System.nanoTime()))
@ -113,15 +114,21 @@ public class HttpCookieStore implements CookieStore
path = "/"; path = "/";
String key = destination.host() + ":" + destination.port() + path; String key = destination.host() + ":" + destination.port() + path;
Map<String, HttpCookie> cookies = allCookies.get(key); Queue<HttpCookie> cookies = allCookies.get(key);
if (cookies == null) if (cookies == null)
{ {
cookies = new ConcurrentHashMap<>(); cookies = new ConcurrentLinkedQueue<>();
Map<String, HttpCookie> existing = allCookies.putIfAbsent(key, cookies); Queue<HttpCookie> existing = allCookies.putIfAbsent(key, cookies);
if (existing != null) if (existing != null)
cookies = existing; cookies = existing;
} }
cookies.put(path, cookie); cookies.add(cookie);
return true; return true;
} }
@Override
public void clear()
{
allCookies.clear();
}
} }

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.nio.channels.AsynchronousCloseException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -34,7 +35,7 @@ import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
public class HttpDestination implements Destination public class HttpDestination implements Destination, AutoCloseable
{ {
private static final Logger LOG = Log.getLogger(HttpDestination.class); private static final Logger LOG = Log.getLogger(HttpDestination.class);
@ -58,12 +59,12 @@ public class HttpDestination implements Destination
this.activeConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress()); this.activeConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
} }
protected BlockingQueue<Connection> idleConnections() protected BlockingQueue<Connection> getIdleConnections()
{ {
return idleConnections; return idleConnections;
} }
protected BlockingQueue<Connection> activeConnections() protected BlockingQueue<Connection> getActiveConnections()
{ {
return activeConnections; return activeConnections;
} }
@ -103,7 +104,7 @@ public class HttpDestination implements Destination
{ {
if (!client.isRunning() && requests.remove(requestPair)) if (!client.isRunning() && requests.remove(requestPair))
{ {
throw new RejectedExecutionException(HttpClient.class.getSimpleName() + " is stopping"); throw new RejectedExecutionException(client + " is stopping");
} }
else else
{ {
@ -121,7 +122,7 @@ public class HttpDestination implements Destination
} }
else else
{ {
throw new RejectedExecutionException(HttpClient.class.getSimpleName() + " is stopped"); throw new RejectedExecutionException(client + " is stopped");
} }
} }
@ -269,8 +270,22 @@ public class HttpDestination implements Destination
public void release(Connection connection) public void release(Connection connection)
{ {
LOG.debug("Connection {} released", connection); LOG.debug("Connection {} released", connection);
if (client.isRunning())
{
activeConnections.remove(connection); activeConnections.remove(connection);
idleConnections.offer(connection); idleConnections.offer(connection);
if (!client.isRunning())
{
LOG.debug("{} is stopping", client);
idleConnections.remove(connection);
connection.close();
}
}
else
{
LOG.debug("{} is stopped", client);
connection.close();
}
} }
public void remove(Connection connection) public void remove(Connection connection)
@ -281,6 +296,30 @@ public class HttpDestination implements Destination
idleConnections.remove(connection); idleConnections.remove(connection);
} }
public void close()
{
for (Connection connection : idleConnections)
connection.close();
idleConnections.clear();
// A bit drastic, but we cannot wait for all requests to complete
for (Connection connection : activeConnections)
connection.close();
activeConnections.clear();
AsynchronousCloseException failure = new AsynchronousCloseException();
RequestPair pair;
while ((pair = requests.poll()) != null)
{
notifyRequestFailure(pair.request, failure);
notifyResponseFailure(pair.listener, failure);
}
connectionCount.set(0);
LOG.debug("Closed {}", this);
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -78,6 +79,11 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
} }
} }
} }
catch (EofException x)
{
LOG.ignore(x);
fail(x);
}
catch (IOException x) catch (IOException x)
{ {
LOG.debug(x); LOG.debug(x);

View File

@ -27,4 +27,6 @@ public interface CookieStore
List<HttpCookie> getCookies(Destination destination, String path); List<HttpCookie> getCookies(Destination destination, String path);
boolean addCookie(Destination destination, HttpCookie cookie); boolean addCookie(Destination destination, HttpCookie cookie);
void clear();
} }

View File

@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert; import org.junit.Assert;
@ -37,6 +38,33 @@ import org.junit.Test;
public class HttpClientTest extends AbstractHttpClientServerTest public class HttpClientTest extends AbstractHttpClientServerTest
{ {
@Test
public void testStoppingClosesConnections() throws Exception
{
start(new EmptyHandler());
String scheme = "http";
String host = "localhost";
int port = connector.getLocalPort();
String path = "/";
Response response = client.GET(scheme + "://" + host + ":" + port + path).get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.status());
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
HttpConnection connection = (HttpConnection)destination.getIdleConnections().peek();
Assert.assertNotNull(connection);
client.getCookieStore().addCookie(destination, new HttpCookie("foo", "bar", null, path));
client.stop();
Assert.assertEquals(0, client.getDestinations().size());
Assert.assertEquals(0, destination.getIdleConnections().size());
Assert.assertEquals(0, destination.getActiveConnections().size());
Assert.assertEquals(0, client.getCookieStore().getCookies(destination, path).size());
Assert.assertFalse(connection.getEndPoint().isOpen());
}
@Test @Test
public void test_DestinationCount() throws Exception public void test_DestinationCount() throws Exception
{ {

View File

@ -41,10 +41,10 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
int port = connector.getLocalPort(); int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port); HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
final BlockingQueue<Connection> idleConnections = destination.idleConnections(); final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
Assert.assertEquals(0, idleConnections.size()); Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.activeConnections(); final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
Assert.assertEquals(0, activeConnections.size()); Assert.assertEquals(0, activeConnections.size());
final CountDownLatch headersLatch = new CountDownLatch(1); final CountDownLatch headersLatch = new CountDownLatch(1);
@ -92,10 +92,10 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
int port = connector.getLocalPort(); int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port); HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
final BlockingQueue<Connection> idleConnections = destination.idleConnections(); final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
Assert.assertEquals(0, idleConnections.size()); Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.activeConnections(); final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
Assert.assertEquals(0, activeConnections.size()); Assert.assertEquals(0, activeConnections.size());
final CountDownLatch headersLatch = new CountDownLatch(1); final CountDownLatch headersLatch = new CountDownLatch(1);
@ -142,10 +142,10 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
int port = connector.getLocalPort(); int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port); HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
final BlockingQueue<Connection> idleConnections = destination.idleConnections(); final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
Assert.assertEquals(0, idleConnections.size()); Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.activeConnections(); final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
Assert.assertEquals(0, activeConnections.size()); Assert.assertEquals(0, activeConnections.size());
final CountDownLatch successLatch = new CountDownLatch(2); final CountDownLatch successLatch = new CountDownLatch(2);
@ -190,10 +190,10 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
int port = connector.getLocalPort(); int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port); HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
final BlockingQueue<Connection> idleConnections = destination.idleConnections(); final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
Assert.assertEquals(0, idleConnections.size()); Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.activeConnections(); final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
Assert.assertEquals(0, activeConnections.size()); Assert.assertEquals(0, activeConnections.size());
server.stop(); server.stop();

View File

@ -130,10 +130,21 @@ public class HttpCookieStoreTest
{ {
CookieStore cookies = new HttpCookieStore(); CookieStore cookies = new HttpCookieStore();
Destination destination = new HttpDestination(client, "http", "localhost.org", 80); Destination destination = new HttpDestination(client, "http", "localhost.org", 80);
Assert.assertTrue(cookies.addCookie(destination, new HttpCookie("a", "1", null, "/", 0, false, true))); Assert.assertTrue(cookies.addCookie(destination, new HttpCookie("a", "1", null, "/", -1, false, true)));
List<HttpCookie> result = cookies.getCookies(destination, "/"); List<HttpCookie> result = cookies.getCookies(destination, "/");
Assert.assertNotNull(result); Assert.assertNotNull(result);
Assert.assertEquals(0, result.size()); Assert.assertEquals(0, result.size());
} }
@Test
public void testCookiesAreCleared() throws Exception
{
CookieStore cookies = new HttpCookieStore();
Destination destination = new HttpDestination(client, "http", "localhost.org", 80);
Assert.assertTrue(cookies.addCookie(destination, new HttpCookie("a", "1", null, "/", -1, false, true)));
cookies.clear();
Assert.assertEquals(0, cookies.getCookies(destination, "/").size());
}
} }

View File

@ -43,7 +43,7 @@ public class HttpDestinationTest extends AbstractHttpClientServerTest
if (connection == null) if (connection == null)
{ {
// There are no queued requests, so the newly created connection will be idle // There are no queued requests, so the newly created connection will be idle
connection = destination.idleConnections().poll(5, TimeUnit.SECONDS); connection = destination.getIdleConnections().poll(5, TimeUnit.SECONDS);
} }
Assert.assertNotNull(connection); Assert.assertNotNull(connection);
} }
@ -59,7 +59,7 @@ public class HttpDestinationTest extends AbstractHttpClientServerTest
long start = System.nanoTime(); long start = System.nanoTime();
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{ {
connection1 = destination.idleConnections().peek(); connection1 = destination.getIdleConnections().peek();
TimeUnit.MILLISECONDS.sleep(50); TimeUnit.MILLISECONDS.sleep(50);
} }
Assert.assertNotNull(connection1); Assert.assertNotNull(connection1);
@ -101,9 +101,9 @@ public class HttpDestinationTest extends AbstractHttpClientServerTest
latch.countDown(); latch.countDown();
// There must be 2 idle connections // There must be 2 idle connections
Connection connection = destination.idleConnections().poll(5, TimeUnit.SECONDS); Connection connection = destination.getIdleConnections().poll(5, TimeUnit.SECONDS);
Assert.assertNotNull(connection); Assert.assertNotNull(connection);
connection = destination.idleConnections().poll(5, TimeUnit.SECONDS); connection = destination.getIdleConnections().poll(5, TimeUnit.SECONDS);
Assert.assertNotNull(connection); Assert.assertNotNull(connection);
} }
@ -113,7 +113,7 @@ public class HttpDestinationTest extends AbstractHttpClientServerTest
HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort()); HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort());
Connection connection1 = destination.acquire(); Connection connection1 = destination.acquire();
if (connection1 == null) if (connection1 == null)
connection1 = destination.idleConnections().poll(5, TimeUnit.SECONDS); connection1 = destination.getIdleConnections().poll(5, TimeUnit.SECONDS);
Assert.assertNotNull(connection1); Assert.assertNotNull(connection1);
destination.release(connection1); destination.release(connection1);
@ -137,14 +137,14 @@ public class HttpDestinationTest extends AbstractHttpClientServerTest
long start = System.nanoTime(); long start = System.nanoTime();
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{ {
connection1 = destination.idleConnections().peek(); connection1 = destination.getIdleConnections().peek();
TimeUnit.MILLISECONDS.sleep(50); TimeUnit.MILLISECONDS.sleep(50);
} }
Assert.assertNotNull(connection1); Assert.assertNotNull(connection1);
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout); TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
connection1 = destination.idleConnections().poll(); connection1 = destination.getIdleConnections().poll();
Assert.assertNull(connection1); Assert.assertNull(connection1);
} }
} }