jetty-9 - HTTP client: implemented correctly execution of queued requests.

This commit is contained in:
Simone Bordet 2012-09-07 13:36:05 +02:00
parent 67670c22bd
commit 03a92fb966
5 changed files with 167 additions and 45 deletions

View File

@ -214,6 +214,11 @@ public class HttpClient extends AggregateLifeCycle
return newRequest(URI.create(address("http", host, port)));
}
public Request newRequest(String uri)
{
return newRequest(URI.create(uri));
}
public Request newRequest(URI uri)
{
return new HttpRequest(this, uri);

View File

@ -74,13 +74,13 @@ public class HttpConnection extends AbstractConnection implements Connection
@Override
protected boolean onReadTimeout()
{
LOG.debug("{} idle timeout", this);
HttpExchange exchange = this.exchange.get();
if (exchange != null)
idleTimeout();
// We will be closing the connection, so remove it
LOG.debug("Connection {} idle timeout", this);
destination.remove(this);
else
destination.remove(this);
return true;
}
@ -218,9 +218,12 @@ public class HttpConnection extends AbstractConnection implements Connection
}
else
{
destination.remove(this);
close();
throw new IllegalStateException();
// It is possible that the exchange has already been disassociated,
// for example if the connection idle timeouts: this will fail
// the response, but the request may still be under processing.
// Eventually the request will also fail as the connection is closed
// and will arrive here without an exchange being present.
// We just ignore this fact, as the exchange has already been processed
}
}
@ -228,7 +231,7 @@ public class HttpConnection extends AbstractConnection implements Connection
public void close()
{
super.close();
LOG.debug("Closed {}", this);
LOG.debug("{} closed", this);
}
@Override

View File

@ -193,26 +193,27 @@ public class HttpDestination implements Destination, AutoCloseable
if (next > maxConnections)
{
LOG.debug("Max connections reached {}: {}", this, current);
LOG.debug("Max connections {} reached for {}", current, this);
// Try again the idle connections
return idleConnections.poll();
}
if (connectionCount.compareAndSet(current, next))
{
LOG.debug("Creating connection {}/{} for {}", next, maxConnections, this);
newConnection(new Callback<Connection>()
{
@Override
public void completed(Connection connection)
{
LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, this);
LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, HttpDestination.this);
process(connection);
}
@Override
public void failed(Connection connection, final Throwable x)
{
LOG.debug("Connection failed {} for {}", x, this);
LOG.debug("Connection failed {} for {}", x, HttpDestination.this);
connectionCount.decrementAndGet();
client.getExecutor().execute(new Runnable()
{
@ -249,12 +250,18 @@ public class HttpDestination implements Destination, AutoCloseable
final RequestPair requestPair = requests.poll();
if (requestPair == null)
{
LOG.debug("Connection {} idle", connection);
LOG.debug("{} idle", connection);
idleConnections.offer(connection);
if (!client.isRunning())
{
LOG.debug("{} is stopping", client);
remove(connection);
connection.close();
}
}
else
{
LOG.debug("Connection {} active", connection);
LOG.debug("{} active", connection);
activeConnections.offer(connection);
client.getExecutor().execute(new Runnable()
{
@ -269,31 +276,36 @@ public class HttpDestination implements Destination, AutoCloseable
public void release(Connection connection)
{
LOG.debug("Connection {} released", connection);
LOG.debug("{} released", connection);
if (client.isRunning())
{
activeConnections.remove(connection);
idleConnections.offer(connection);
if (!client.isRunning())
{
LOG.debug("{} is stopping", client);
idleConnections.remove(connection);
connection.close();
}
process(connection);
}
else
{
LOG.debug("{} is stopped", client);
remove(connection);
connection.close();
}
}
public void remove(Connection connection)
{
LOG.debug("Connection {} removed", connection);
LOG.debug("{} removed", connection);
connectionCount.decrementAndGet();
activeConnections.remove(connection);
idleConnections.remove(connection);
// We need to executed queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries
if (!requests.isEmpty())
{
connection = acquire();
if (connection != null)
process(connection);
}
}
public void close()

View File

@ -23,24 +23,4 @@ public class HttpResponseException extends RuntimeException
public HttpResponseException()
{
}
public HttpResponseException(String message)
{
super(message);
}
public HttpResponseException(String message, Throwable cause)
{
super(message, cause);
}
public HttpResponseException(Throwable cause)
{
super(cause);
}
public HttpResponseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace)
{
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
@ -33,6 +34,7 @@ import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.junit.Assert;
import org.junit.Test;
@ -51,7 +53,14 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertEquals(200, response.status());
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
HttpConnection connection = (HttpConnection)destination.getIdleConnections().peek();
long start = System.nanoTime();
HttpConnection connection = null;
while (connection == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
connection = (HttpConnection)destination.getIdleConnections().peek();
TimeUnit.MILLISECONDS.sleep(10);
}
Assert.assertNotNull(connection);
client.getCookieStore().addCookie(destination, new HttpCookie("foo", "bar", null, path));
@ -86,7 +95,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
}
@Test
public void test_GET_ResponseWithContent() throws Exception
public void test_GET_ResponseWithoutContent() throws Exception
{
start(new EmptyHandler());
@ -97,7 +106,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
}
@Test
public void test_GET_ResponseWithoutContent() throws Exception
public void test_GET_ResponseWithContent() throws Exception
{
final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
start(new AbstractHandler()
@ -185,4 +194,117 @@ public class HttpClientTest extends AbstractHttpClientServerTest
String content = new String(response.content(), "UTF-8");
Assert.assertEquals(value11 + value12 + value2, content);
}
@Test
public void test_QueuedRequest_IsSent_WhenPreviousRequestSucceeded() throws Exception
{
start(new EmptyHandler());
client.setMaxConnectionsPerAddress(1);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(2);
client.newRequest("http://localhost:" + connector.getLocalPort())
.listener(new org.eclipse.jetty.client.api.Request.Listener.Adapter()
{
@Override
public void onBegin(org.eclipse.jetty.client.api.Request request)
{
try
{
latch.await();
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}
})
.send(new Response.Listener.Adapter()
{
@Override
public void onSuccess(Response response)
{
Assert.assertEquals(200, response.status());
successLatch.countDown();
}
});
client.newRequest("http://localhost:" + connector.getLocalPort())
.listener(new org.eclipse.jetty.client.api.Request.Listener.Adapter()
{
@Override
public void onQueued(org.eclipse.jetty.client.api.Request request)
{
latch.countDown();
}
})
.send(new Response.Listener.Adapter()
{
@Override
public void onSuccess(Response response)
{
Assert.assertEquals(200, response.status());
successLatch.countDown();
}
});
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Slow
@Test
public void test_QueuedRequest_IsSent_WhenPreviousRequestClosedConnection() throws Exception
{
start(new EmptyHandler());
client.setMaxConnectionsPerAddress(1);
final long idleTimeout = 1000;
client.setIdleTimeout(idleTimeout);
final CountDownLatch latch = new CountDownLatch(3);
client.newRequest("http://localhost:" + connector.getLocalPort())
.listener(new org.eclipse.jetty.client.api.Request.Listener.Adapter()
{
@Override
public void onBegin(org.eclipse.jetty.client.api.Request request)
{
try
{
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}
@Override
public void onFailure(org.eclipse.jetty.client.api.Request request, Throwable failure)
{
latch.countDown();
}
})
.send(new Response.Listener.Adapter()
{
@Override
public void onFailure(Response response, Throwable failure)
{
latch.countDown();
}
});
client.newRequest("http://localhost:" + connector.getLocalPort())
.send(new Response.Listener.Adapter()
{
@Override
public void onSuccess(Response response)
{
Assert.assertEquals(200, response.status());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
}
}