Jetty9 - Sixth take at HTTP client implementation: all tests are passing.

This commit is contained in:
Simone Bordet 2012-09-05 19:43:42 +02:00
parent 9bd395835e
commit 4ec7ef2800
13 changed files with 347 additions and 81 deletions

View File

@ -51,30 +51,29 @@ public class HttpConnection extends AbstractConnection implements Connection
{
HttpExchange exchange = this.exchange.get();
if (exchange != null)
exchange.idleTimeout();
idleTimeout();
// We will be closing the connection, so remove it
LOG.debug("Connection {} idle timeout", this);
destination.remove(this);
return true;
}
protected void idleTimeout()
{
receiver.idleTimeout();
}
@Override
public void send(Request request, Response.Listener listener)
{
normalizeRequest(request);
HttpConversation conversation = client.conversationFor(request);
HttpExchange exchange = new HttpExchange(conversation, sender, receiver, request, listener);
if (this.exchange.compareAndSet(null, exchange))
{
conversation.add(exchange);
LOG.debug("{}({})", request, conversation);
exchange.send();
}
else
{
throw new UnsupportedOperationException("Pipelined requests not supported");
}
HttpExchange exchange = new HttpExchange(conversation, this, request, listener);
setExchange(exchange);
conversation.add(exchange);
sender.send(exchange);
}
private void normalizeRequest(Request request)
@ -135,16 +134,57 @@ public class HttpConnection extends AbstractConnection implements Connection
}
}
public HttpExchange getExchange()
{
return exchange.get();
}
protected void setExchange(HttpExchange exchange)
{
if (!this.exchange.compareAndSet(null, exchange))
throw new UnsupportedOperationException("Pipelined requests not supported");
else
LOG.debug("{} associated to {}", exchange, this);
}
@Override
public void onFillable()
{
HttpExchange exchange = this.exchange.get();
HttpExchange exchange = getExchange();
if (exchange != null)
exchange.receive();
else
throw new IllegalStateException();
}
protected void receive()
{
receiver.receive();
}
public void completed(HttpExchange exchange, boolean success)
{
if (this.exchange.compareAndSet(exchange, null))
{
LOG.debug("{} disassociated from {}", exchange, this);
if (success)
{
destination.release(this);
}
else
{
destination.remove(this);
close();
}
}
else
{
destination.remove(this);
close();
throw new IllegalStateException();
}
}
@Override
public String toString()
{

View File

@ -58,6 +58,11 @@ public class HttpDestination implements Destination
return idleConnections;
}
protected BlockingQueue<Connection> activeConnections()
{
return activeConnections;
}
@Override
public String scheme()
{
@ -201,6 +206,7 @@ public class HttpDestination implements Destination
}
else
{
LOG.debug("Connection {} active", connection);
activeConnections.offer(connection);
client.getExecutor().execute(new Runnable()
{
@ -215,12 +221,14 @@ public class HttpDestination implements Destination
public void release(Connection connection)
{
LOG.debug("Connection {} released", connection);
activeConnections.remove(connection);
idleConnections.offer(connection);
}
public void remove(Connection connection)
{
LOG.debug("Connection {} removed", connection);
connectionCount.decrementAndGet();
activeConnections.remove(connection);
idleConnections.remove(connection);

View File

@ -1,22 +1,27 @@
package org.eclipse.jetty.client;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
public class HttpExchange
{
private static final int REQUEST_SUCCESS = 1;
private static final int RESPONSE_SUCCESS = 2;
private static final int REQUEST_RESPONSE_SUCCESS = REQUEST_SUCCESS + RESPONSE_SUCCESS;
private final AtomicInteger done = new AtomicInteger();
private final HttpConversation conversation;
private final HttpSender sender;
private final HttpReceiver receiver;
private final HttpConnection connection;
private final Request request;
private final Response.Listener listener;
private final HttpResponse response;
public HttpExchange(HttpConversation conversation, HttpSender sender, HttpReceiver receiver, Request request, Response.Listener listener)
public HttpExchange(HttpConversation conversation, HttpConnection connection, Request request, Response.Listener listener)
{
this.conversation = conversation;
this.sender = sender;
this.receiver = receiver;
this.connection = connection;
this.request = request;
this.listener = listener;
this.response = new HttpResponse(request, listener);
@ -42,28 +47,39 @@ public class HttpExchange
return response;
}
public void send()
{
sender.send(this);
}
public void idleTimeout()
{
receiver.idleTimeout();
}
public void receive()
{
receiver.receive(this);
connection.receive();
}
public void requestDone(boolean success)
{
// TODO
done(success, REQUEST_SUCCESS);
}
public void responseDone(boolean success)
{
// TODO
done(success, RESPONSE_SUCCESS);
}
private void done(boolean success, int kind)
{
if (success)
{
if (done.addAndGet(kind) == REQUEST_RESPONSE_SUCCESS)
{
connection.completed(this, true);
}
}
else
{
connection.completed(this, false);
}
}
@Override
public String toString()
{
return String.format("%s@%x", HttpExchange.class.getSimpleName(), hashCode());
}
}

View File

@ -20,7 +20,6 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
private static final Logger LOG = Log.getLogger(HttpReceiver.class);
private final HttpParser parser = new HttpParser(this);
private final AtomicReference<HttpExchange> exchange = new AtomicReference<>();
private final AtomicReference<Response.Listener> listener = new AtomicReference<>();
private final HttpConnection connection;
private boolean failed;
@ -30,19 +29,18 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
this.connection = connection;
}
public void receive(HttpExchange exchange)
public void receive()
{
this.exchange.set(exchange);
EndPoint endPoint = connection.getEndPoint();
HttpClient client = connection.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
EndPoint endPoint = connection.getEndPoint();
try
{
while (true)
{
int read = endPoint.fill(buffer);
LOG.debug("Read {} bytes", read);
if (read > 0)
{
parser.parseNext(buffer);
@ -62,15 +60,18 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
catch (IOException x)
{
LOG.debug(x);
bufferPool.release(buffer);
fail(x);
}
finally
{
bufferPool.release(buffer);
}
}
@Override
public boolean startResponse(HttpVersion version, int status, String reason)
{
HttpExchange exchange = this.exchange.get();
HttpExchange exchange = connection.getExchange();
// Probe the protocol listeners
HttpClient client = connection.getHttpClient();
@ -81,7 +82,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
this.listener.set(listener);
response.version(version).status(status).reason(reason);
LOG.debug("{}", response);
LOG.debug("Receiving {}", response);
notifyBegin(listener, response);
return false;
@ -90,7 +91,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
@Override
public boolean parsedHeader(HttpHeader header, String name, String value)
{
HttpExchange exchange = this.exchange.get();
HttpExchange exchange = connection.getExchange();
exchange.response().headers().put(name, value);
return false;
}
@ -98,16 +99,20 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
@Override
public boolean headerComplete()
{
HttpExchange exchange = this.exchange.get();
notifyHeaders(listener.get(), exchange.response());
HttpExchange exchange = connection.getExchange();
HttpResponse response = exchange.response();
LOG.debug("Headers {}", response);
notifyHeaders(listener.get(), response);
return false;
}
@Override
public boolean content(ByteBuffer buffer)
{
HttpExchange exchange = this.exchange.get();
notifyContent(listener.get(), exchange.response(), buffer);
HttpExchange exchange = connection.getExchange();
HttpResponse response = exchange.response();
LOG.debug("Content {}: {} bytes", response, buffer.remaining());
notifyContent(listener.get(), response, buffer);
return false;
}
@ -116,22 +121,39 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
{
if (!failed)
success();
return false;
return true;
}
protected void success()
{
HttpExchange exchange = this.exchange.getAndSet(null);
HttpExchange exchange = connection.getExchange();
HttpResponse response = exchange.response();
LOG.debug("Received {}", response);
parser.reset();
exchange.responseDone(true);
notifySuccess(listener.get(), exchange.response());
notifySuccess(listener.get(), response);
}
protected void fail(Throwable failure)
{
HttpExchange exchange = connection.getExchange();
// In case of a response error, the failure has already been notified
// and it is possible that a further attempt to read in the receive
// loop throws an exception that reenters here but without exchange
if (exchange == null)
return;
HttpResponse response = exchange.response();
LOG.debug("Failed {} {}", response, failure);
failed = true;
HttpExchange exchange = this.exchange.getAndSet(null);
parser.reset();
exchange.responseDone(false);
notifyFailure(listener.get(), exchange.response(), failure);
notifyFailure(listener.get(), response, failure);
}
@Override
@ -144,7 +166,8 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
@Override
public void badMessage(int status, String reason)
{
exchange.get().response().status(status).reason(reason);
HttpExchange exchange = connection.getExchange();
exchange.response().status(status).reason(reason);
fail(new HttpResponseException());
}

View File

@ -161,7 +161,10 @@ public class HttpRequest implements Request
@Override
public Request header(String name, String value)
{
headers.add(name, value);
if (value == null)
headers.remove(name);
else
headers.add(name, value);
return this;
}

View File

@ -42,6 +42,8 @@ public class HttpSender
{
if (this.exchange.compareAndSet(null, exchange))
{
LOG.debug("Sending {}", exchange.request());
notifyRequestBegin(exchange.request());
ContentProvider content = exchange.request().content();
this.contentLength = content == null ? -1 : content.length();
this.contentChunks = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
@ -91,7 +93,7 @@ public class HttpSender
@Override
protected void pendingCompleted()
{
notifyRequestHeadersComplete(request);
notifyRequestHeaders(request);
send();
}
@ -114,7 +116,7 @@ public class HttpSender
if (!headersComplete)
{
headersComplete = true;
notifyRequestHeadersComplete(request);
notifyRequestHeaders(request);
}
releaseBuffers();
content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
@ -163,7 +165,7 @@ public class HttpSender
// Notify after
HttpExchange exchange = this.exchange.getAndSet(null);
LOG.debug("{} succeeded", exchange.request());
LOG.debug("Sent {}", exchange.request());
exchange.requestDone(true);
// It is important to notify *after* we reset because
@ -183,6 +185,7 @@ public class HttpSender
// Notify after
HttpExchange exchange = this.exchange.getAndSet(null);
LOG.debug("Failed {}", exchange.request());
exchange.requestDone(false);
notifyRequestFailure(exchange.request(), failure);
@ -204,7 +207,21 @@ public class HttpSender
}
}
private void notifyRequestHeadersComplete(Request request)
private void notifyRequestBegin(Request request)
{
Request.Listener listener = request.listener();
try
{
if (listener != null)
listener.onBegin(request);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyRequestHeaders(Request request)
{
Request.Listener listener = request.listener();
try

View File

@ -16,4 +16,7 @@ package org.eclipse.jetty.client.api;
public interface Connection extends AutoCloseable
{
void send(Request request, Response.Listener listener);
@Override
void close();
}

View File

@ -0,0 +1,153 @@
package org.eclipse.jetty.client;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpHeader;
import org.junit.Assert;
import org.junit.Test;
public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
{
@Test
public void test_SuccessfulRequest_ReturnsConnection() throws Exception
{
start(new EmptyHandler());
String scheme = "http";
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
final BlockingQueue<Connection> idleConnections = destination.idleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.activeConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
client.newRequest(host, port).send(new Response.Listener.Adapter()
{
@Override
public void onHeaders(Response response)
{
Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(1, activeConnections.size());
headersLatch.countDown();
}
@Override
public void onSuccess(Response response)
{
Assert.assertEquals(1, idleConnections.size());
Assert.assertEquals(0, activeConnections.size());
successLatch.countDown();
}
});
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(1, idleConnections.size());
Assert.assertEquals(0, activeConnections.size());
}
@Test
public void test_FailedRequest_RemovesConnection() throws Exception
{
start(new EmptyHandler());
String scheme = "http";
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
final BlockingQueue<Connection> idleConnections = destination.idleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.activeConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch failureLatch = new CountDownLatch(2);
client.newRequest(host, port).listener(new Request.Listener.Adapter()
{
@Override
public void onBegin(Request request)
{
activeConnections.peek().close();
headersLatch.countDown();
}
@Override
public void onFailure(Request request, Throwable failure)
{
failureLatch.countDown();
}
}).send(new Response.Listener.Adapter()
{
@Override
public void onFailure(Response response, Throwable failure)
{
Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(0, activeConnections.size());
failureLatch.countDown();
}
});
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(0, activeConnections.size());
}
@Test
public void test_BadRequest_ReturnsConnection() throws Exception
{
start(new EmptyHandler());
String scheme = "http";
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
final BlockingQueue<Connection> idleConnections = destination.idleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.activeConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch successLatch = new CountDownLatch(1);
client.newRequest(host, port)
.listener(new Request.Listener.Adapter()
{
@Override
public void onBegin(Request request)
{
// Remove the host header, this will make the request invalid
request.header(HttpHeader.HOST.asString(), null);
}
})
.send(new Response.Listener.Adapter()
{
@Override
public void onSuccess(Response response)
{
Assert.assertEquals(1, idleConnections.size());
Assert.assertEquals(0, activeConnections.size());
successLatch.countDown();
}
});
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(1, idleConnections.size());
Assert.assertEquals(0, activeConnections.size());
}
}

View File

@ -139,5 +139,4 @@ public class HttpDestinationTest extends AbstractHttpClientServerTest
connection1 = destination.idleConnections().poll();
Assert.assertNull(connection1);
}
}

View File

@ -23,8 +23,6 @@ public class HttpReceiverTest
private HttpDestination destination;
private ByteArrayEndPoint endPoint;
private HttpConnection connection;
private HttpSender sender;
private HttpReceiver receiver;
private HttpConversation conversation;
@Before
@ -35,8 +33,6 @@ public class HttpReceiverTest
destination = new HttpDestination(client, "http", "localhost", 8080);
endPoint = new ByteArrayEndPoint();
connection = new HttpConnection(client, endPoint, destination);
sender = new HttpSender(connection);
receiver = new HttpReceiver(connection);
conversation = new HttpConversation(client, 1);
}
@ -48,14 +44,19 @@ public class HttpReceiverTest
protected HttpExchange newExchange(Response.Listener listener)
{
HttpExchange exchange = new HttpExchange(conversation, sender, receiver, null, listener);
HttpExchange exchange = new HttpExchange(conversation, connection, null, listener);
conversation.add(exchange);
connection.setExchange(exchange);
return exchange;
}
@Test
public void test_Receive_NoResponseContent() throws Exception
{
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-length: 0\r\n" +
"\r\n");
final AtomicReference<Response> responseRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
HttpExchange exchange = newExchange(new Response.Listener.Adapter()
@ -67,12 +68,7 @@ public class HttpReceiverTest
latch.countDown();
}
});
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-length: 0\r\n" +
"\r\n");
receiver.receive(exchange);
exchange.receive();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Response response = responseRef.get();
@ -97,7 +93,7 @@ public class HttpReceiverTest
content);
BufferingResponseListener listener = new BufferingResponseListener();
HttpExchange exchange = newExchange(listener);
receiver.receive(exchange);
exchange.receive();
Response response = listener.await(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
@ -124,9 +120,9 @@ public class HttpReceiverTest
content1);
BufferingResponseListener listener = new BufferingResponseListener();
HttpExchange exchange = newExchange(listener);
receiver.receive(exchange);
exchange.receive();
endPoint.setInputEOF();
receiver.receive(exchange);
exchange.receive();
try
{
@ -148,9 +144,9 @@ public class HttpReceiverTest
"\r\n");
BufferingResponseListener listener = new BufferingResponseListener();
HttpExchange exchange = newExchange(listener);
receiver.receive(exchange);
exchange.receive();
// Simulate an idle timeout
receiver.idleTimeout();
connection.idleTimeout();
try
{
@ -172,7 +168,7 @@ public class HttpReceiverTest
"\r\n");
BufferingResponseListener listener = new BufferingResponseListener();
HttpExchange exchange = newExchange(listener);
receiver.receive(exchange);
exchange.receive();
try
{

View File

@ -35,7 +35,8 @@ public class HttpSenderTest
public void test_Send_NoRequestContent() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpConnection connection = new HttpConnection(client, endPoint, null);
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
@ -67,7 +68,8 @@ public class HttpSenderTest
public void test_Send_NoRequestContent_IncompleteFlush() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpConnection connection = new HttpConnection(client, endPoint, null);
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
connection.send(request, null);
@ -95,7 +97,8 @@ public class HttpSenderTest
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// Shutdown output to trigger the exception on write
endPoint.shutdownOutput();
HttpConnection connection = new HttpConnection(client, endPoint, null);
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
request.listener(new Request.Listener.Adapter()
@ -122,7 +125,8 @@ public class HttpSenderTest
public void test_Send_NoRequestContent_IncompleteFlush_Exception() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpConnection connection = new HttpConnection(client, endPoint, null);
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
request.listener(new Request.Listener.Adapter()
@ -155,7 +159,8 @@ public class HttpSenderTest
public void test_Send_SmallRequestContent_InOneBuffer() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpConnection connection = new HttpConnection(client, endPoint, null);
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
String content = "abcdef";
request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes("UTF-8"))));
@ -188,7 +193,8 @@ public class HttpSenderTest
public void test_Send_SmallRequestContent_InTwoBuffers() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpConnection connection = new HttpConnection(client, endPoint, null);
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
String content1 = "0123456789";
String content2 = "abcdef";
@ -222,7 +228,8 @@ public class HttpSenderTest
public void test_Send_SmallRequestContent_Chunked_InTwoChunks() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpConnection connection = new HttpConnection(client, endPoint, null);
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
String content1 = "0123456789";
String content2 = "ABCDEF";

View File

@ -38,7 +38,7 @@ public class RedirectionTest extends AbstractHttpClientServerTest
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/303/302/done")
.send().get(5, TimeUnit.SECONDS);
.send().get(500, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));

View File

@ -1146,6 +1146,7 @@ public class HttpParser
// was this unexpected?
switch(_state)
{
case START:
case END:
break;