Jetty9 - Second take at HTTP client implementation.

This commit is contained in:
Simone Bordet 2012-09-04 21:03:04 +02:00
parent b18ab0e76a
commit 250865f51d
7 changed files with 97 additions and 56 deletions

View File

@ -179,14 +179,7 @@ public class HttpClient extends AggregateLifeCycle
public Future<Response> GET(URI uri)
{
// TODO: Add decoder, cookies, agent, default headers, etc.
return newRequest(uri)
.method(HttpMethod.GET)
.version(HttpVersion.HTTP_1_1)
.agent(getUserAgent())
.idleTimeout(getIdleTimeout())
.followRedirects(isFollowRedirects())
.send();
return newRequest(uri).send();
}
public Request newRequest(String host, int port)
@ -196,12 +189,26 @@ public class HttpClient extends AggregateLifeCycle
public Request newRequest(URI uri)
{
return new HttpRequest(this, uri);
HttpRequest request = new HttpRequest(this, uri);
normalizeRequest(request);
return request;
}
protected Request newRequest(long id, URI uri)
{
return new HttpRequest(this, id, uri);
HttpRequest request = new HttpRequest(this, id, uri);
normalizeRequest(request);
return request;
}
protected void normalizeRequest(Request request)
{
// TODO: Add decoder, cookies, agent, default headers, etc.
request.method(HttpMethod.GET)
.version(HttpVersion.HTTP_1_1)
.agent(getUserAgent())
.idleTimeout(getIdleTimeout())
.followRedirects(isFollowRedirects());
}
private String address(String scheme, String host, int port)
@ -338,13 +345,13 @@ public class HttpClient extends AggregateLifeCycle
}
}
public HttpConversation conversationFor(Request request)
public HttpConversation conversationFor(Request request, Response.Listener listener)
{
long id = request.id();
HttpConversation conversation = conversations.get(id);
if (conversation == null)
{
conversation = new HttpConversation();
conversation = new HttpConversation(this, listener);
HttpConversation existing = conversations.putIfAbsent(id, conversation);
if (existing != null)
conversation = existing;
@ -352,6 +359,18 @@ public class HttpClient extends AggregateLifeCycle
return conversation;
}
public Response.Listener lookup(int status)
{
// TODO
switch (status)
{
case 303:
return new RedirectionListener(this);
}
return null;
}
protected class ClientSelectorManager extends SelectorManager
{
public ClientSelectorManager()

View File

@ -50,7 +50,7 @@ public class HttpConnection extends AbstractConnection implements Connection
public void send(Request request, Response.Listener listener)
{
normalizeRequest(request);
HttpConversation conversation = client.conversationFor(request);
HttpConversation conversation = client.conversationFor(request, listener);
this.conversation = conversation;
conversation.prepare(this, request, listener);
conversation.send();

View File

@ -5,6 +5,7 @@ import org.eclipse.jetty.client.api.Response;
public class HttpConversation
{
private final Response.Listener applicationListener;
private final HttpSender sender;
private final HttpReceiver receiver;
private HttpConnection connection;
@ -12,12 +13,18 @@ public class HttpConversation
private Response.Listener listener;
private HttpResponse response;
public HttpConversation()
public HttpConversation(HttpClient client, Response.Listener listener)
{
sender = new HttpSender();
applicationListener = listener;
sender = new HttpSender(client);
receiver = new HttpReceiver();
}
public Response.Listener applicationListener()
{
return applicationListener;
}
public void prepare(HttpConnection connection, Request request, Response.Listener listener)
{
if (this.connection != null)
@ -55,6 +62,11 @@ public class HttpConversation
return listener;
}
public void listener(Response.Listener listener)
{
this.listener = listener;
}
public HttpResponse response()
{
return response;

View File

@ -65,15 +65,17 @@ class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
@Override
public boolean startResponse(HttpVersion version, int status, String reason)
{
// Probe the protocol listeners
// HttpClient client = connection.getHttpClient();
// listener = client.find(status); // TODO
// listener = new RedirectionListener(connection);
// if (listener == null)
// listener = applicationListener;
HttpResponse response = conversation.response();
response.version(version).status(status).reason(reason);
// Probe the protocol listeners
HttpClient client = conversation.connection().getHttpClient();
Response.Listener listener = client.lookup(status);
if (listener != null)
conversation.listener(listener);
else
conversation.listener(conversation.applicationListener());
notifyBegin(conversation.listener(), response);
return false;
}
@ -108,6 +110,8 @@ class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
protected void success()
{
HttpConversation conversation = this.conversation;
this.conversation = null;
Response.Listener listener = conversation.listener();
Response response = conversation.response();
conversation.done();
@ -119,6 +123,7 @@ class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
Response.Listener listener = conversation.listener();
Response response = conversation.response();
conversation.done();
conversation = null;
notifyFailure(listener, response, failure);
}
@ -136,7 +141,7 @@ class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
fail(new HttpResponseException());
}
private void notifyBegin(Response.Listener listener, HttpResponse response)
private void notifyBegin(Response.Listener listener, Response response)
{
try
{
@ -149,7 +154,7 @@ class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
}
}
private void notifyHeaders(Response.Listener listener, HttpResponse response)
private void notifyHeaders(Response.Listener listener, Response response)
{
try
{
@ -162,7 +167,7 @@ class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
}
}
private void notifyContent(Response.Listener listener, HttpResponse response, ByteBuffer buffer)
private void notifyContent(Response.Listener listener, Response response, ByteBuffer buffer)
{
try
{

View File

@ -23,6 +23,7 @@ class HttpSender
private static final Logger LOG = Log.getLogger(HttpSender.class);
private final HttpGenerator generator = new HttpGenerator();
private final HttpClient client;
private HttpConversation conversation;
private long contentLength;
private Iterator<ByteBuffer> contentChunks;
@ -30,6 +31,11 @@ class HttpSender
private ByteBuffer chunk;
private boolean requestHeadersComplete;
HttpSender(HttpClient client)
{
this.client = client;
}
public void send(HttpConversation conversation)
{
this.conversation = conversation;
@ -45,7 +51,6 @@ class HttpSender
{
HttpConnection connection = conversation.connection();
EndPoint endPoint = connection.getEndPoint();
HttpClient client = connection.getHttpClient();
ByteBufferPool byteBufferPool = client.getByteBufferPool();
HttpGenerator.RequestInfo info = null;
ByteBuffer content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
@ -78,7 +83,7 @@ class HttpSender
@Override
protected void pendingCompleted()
{
notifyRequestHeadersComplete();
notifyRequestHeadersComplete(conversation.request());
send();
}
@ -103,7 +108,7 @@ class HttpSender
if (!requestHeadersComplete)
{
requestHeadersComplete = true;
notifyRequestHeadersComplete();
notifyRequestHeadersComplete(conversation.request());
}
releaseBuffers();
content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
@ -146,38 +151,43 @@ class HttpSender
protected void success()
{
notifyRequestSuccess();
Request request = conversation.request();
conversation = null;
generator.reset();
requestHeadersComplete = false;
// It is important to notify *after* we reset because
// the notification may trigger another request/response
notifyRequestSuccess(request);
}
protected void fail(Throwable x)
protected void fail(Throwable failure)
{
BufferUtil.clear(header);
BufferUtil.clear(chunk);
releaseBuffers();
notifyRequestFailure(x);
notifyResponseFailure(x);
conversation.connection().getEndPoint().shutdownOutput();
generator.abort();
notifyRequestFailure(conversation.request(), failure);
notifyResponseFailure(conversation.listener(), failure);
}
private void releaseBuffers()
{
ByteBufferPool byteBufferPool = conversation.connection().getHttpClient().getByteBufferPool();
ByteBufferPool bufferPool = client.getByteBufferPool();
if (!BufferUtil.hasContent(header))
{
byteBufferPool.release(header);
bufferPool.release(header);
header = null;
}
if (!BufferUtil.hasContent(chunk))
{
byteBufferPool.release(chunk);
bufferPool.release(chunk);
chunk = null;
}
}
private void notifyRequestHeadersComplete()
private void notifyRequestHeadersComplete(Request request)
{
Request request = conversation.request();
Request.Listener listener = request.listener();
try
{
@ -190,9 +200,8 @@ class HttpSender
}
}
private void notifyRequestSuccess()
private void notifyRequestSuccess(Request request)
{
Request request = conversation.request();
Request.Listener listener = request.listener();
try
{
@ -205,33 +214,30 @@ class HttpSender
}
}
private void notifyRequestFailure(Throwable x)
private void notifyRequestFailure(Request request, Throwable failure)
{
Request request = conversation.request();
Request.Listener listener = request.listener();
try
{
if (listener != null)
listener.onFailure(request, x);
listener.onFailure(request, failure);
}
catch (Exception xx)
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, xx);
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyResponseFailure(Throwable x)
private void notifyResponseFailure(Response.Listener listener, Throwable failure)
{
Response.Listener listener = conversation.listener();
try
{
if (listener != null)
listener.onFailure(null, x);
listener.onFailure(null, failure);
}
catch (Exception xx)
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, xx);
LOG.info("Exception while notifying listener " + listener, x);
}
}

View File

@ -7,11 +7,11 @@ import org.eclipse.jetty.client.api.Response;
public class RedirectionListener extends Response.Listener.Adapter
{
private final HttpConnection connection;
private final HttpClient client;
public RedirectionListener(HttpConnection connection)
public RedirectionListener(HttpClient client)
{
this.connection = connection;
this.client = client;
}
@Override
@ -27,8 +27,7 @@ public class RedirectionListener extends Response.Listener.Adapter
case 303: // use GET for next request
{
String location = response.headers().get("location");
HttpClient httpClient = connection.getHttpClient();
Request redirect = httpClient.newRequest(response.request().id(), URI.create(location));
Request redirect = client.newRequest(response.request().id(), URI.create(location));
redirect.send(this);
}
}

View File

@ -27,7 +27,7 @@ public class RedirectionTest extends AbstractHttpClientTest
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/303/done")
.send().get(5, TimeUnit.SECONDS);
.send().get(500, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));