Jetty9 - Eight take at HTTP client implementation: supporting conversation over multiple destinations.

This commit is contained in:
Simone Bordet 2012-09-05 22:53:20 +02:00
parent b4115c0101
commit c8b76fa759
7 changed files with 76 additions and 55 deletions

View File

@ -344,7 +344,7 @@ public class HttpClient extends AggregateLifeCycle
}
}
public HttpConversation conversationFor(Request request)
public HttpConversation getConversation(Request request)
{
long id = request.id();
HttpConversation conversation = conversations.get(id);
@ -354,10 +354,19 @@ public class HttpClient extends AggregateLifeCycle
HttpConversation existing = conversations.putIfAbsent(id, conversation);
if (existing != null)
conversation = existing;
else
LOG.debug("Created {}", conversation);
}
return conversation;
}
public void removeConversation(HttpConversation conversation)
{
conversations.remove(conversation.id());
LOG.debug("Removed {}", conversation);
}
public Response.Listener lookup(int status)
{
// TODO

View File

@ -69,7 +69,7 @@ public class HttpConnection extends AbstractConnection implements Connection
public void send(Request request, Response.Listener listener)
{
normalizeRequest(request);
HttpConversation conversation = client.conversationFor(request);
HttpConversation conversation = client.getConversation(request);
HttpExchange exchange = new HttpExchange(conversation, this, request, listener);
setExchange(exchange);
conversation.add(exchange);

View File

@ -1,67 +1,53 @@
package org.eclipse.jetty.client;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
public class HttpConversation
{
private final List<HttpExchange> exchanges = new ArrayList<>();
private final Queue<HttpExchange> exchanges = new ConcurrentLinkedQueue<>();
private final AtomicReference<Response.Listener> listener = new AtomicReference<>();
private final HttpClient client;
private final long id;
private HttpConnection connection;
private Request request;
private Response.Listener listener;
public HttpConversation(HttpClient client, long id)
{
this.client = client;
this.id = id;
}
public void done()
public long id()
{
reset();
}
private void reset()
{
connection = null;
request = null;
listener = null;
}
public HttpConnection connection()
{
return connection;
}
public Request request()
{
return request;
return id;
}
public Response.Listener listener()
{
return listener;
return listener.get();
}
public void listener(Response.Listener listener)
{
this.listener = listener;
this.listener.set(listener);
}
public void add(HttpExchange exchange)
{
exchanges.add(exchange);
exchanges.offer(exchange);
}
public HttpExchange first()
{
return exchanges.get(0);
return exchanges.peek();
}
public void complete()
{
listener.set(null);
client.removeConversation(this);
}
@Override

View File

@ -52,12 +52,12 @@ public class HttpExchange
connection.receive();
}
public void requestDone(boolean success)
public void requestComplete(boolean success)
{
done(success, REQUEST_SUCCESS);
}
public void responseDone(boolean success)
public void responseComplete(boolean success)
{
done(success, RESPONSE_SUCCESS);
}

View File

@ -4,7 +4,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpHeader;
@ -19,10 +19,10 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
{
private static final Logger LOG = Log.getLogger(HttpReceiver.class);
private final AtomicBoolean complete = new AtomicBoolean();
private final HttpParser parser = new HttpParser(this);
private final AtomicReference<Response.Listener> listener = new AtomicReference<>();
private final HttpConnection connection;
private boolean failed;
private volatile boolean failed;
public HttpReceiver(HttpConnection connection)
{
@ -72,14 +72,18 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
public boolean startResponse(HttpVersion version, int status, String reason)
{
HttpExchange exchange = connection.getExchange();
HttpConversation conversation = exchange.conversation();
// Probe the protocol listeners
HttpClient client = connection.getHttpClient();
HttpResponse response = exchange.response();
Response.Listener listener = client.lookup(status);
if (listener == null)
listener = exchange.conversation().first().listener();
this.listener.set(listener);
{
listener = conversation.first().listener();
complete.set(true);
}
conversation.listener(listener);
response.version(version).status(status).reason(reason);
LOG.debug("Receiving {}", response);
@ -100,9 +104,10 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
public boolean headerComplete()
{
HttpExchange exchange = connection.getExchange();
HttpConversation conversation = exchange.conversation();
HttpResponse response = exchange.response();
LOG.debug("Headers {}", response);
notifyHeaders(listener.get(), response);
notifyHeaders(conversation.listener(), response);
return false;
}
@ -110,9 +115,10 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
public boolean content(ByteBuffer buffer)
{
HttpExchange exchange = connection.getExchange();
HttpConversation conversation = exchange.conversation();
HttpResponse response = exchange.response();
LOG.debug("Content {}: {} bytes", response, buffer.remaining());
notifyContent(listener.get(), response, buffer);
notifyContent(conversation.listener(), response, buffer);
return false;
}
@ -127,13 +133,18 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
protected void success()
{
HttpExchange exchange = connection.getExchange();
HttpConversation conversation = exchange.conversation();
HttpResponse response = exchange.response();
LOG.debug("Received {}", response);
parser.reset();
failed = false;
boolean complete = this.complete.getAndSet(false);
exchange.responseDone(true);
notifySuccess(listener.get(), response);
exchange.responseComplete(true);
notifySuccess(conversation.listener(), response);
if (complete)
conversation.complete();
}
protected void fail(Throwable failure)
@ -146,14 +157,17 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
if (exchange == null)
return;
HttpConversation conversation = exchange.conversation();
HttpResponse response = exchange.response();
LOG.debug("Failed {} {}", response, failure);
failed = true;
parser.reset();
failed = true;
complete.set(false);
exchange.responseDone(false);
notifyFailure(listener.get(), response, failure);
exchange.responseComplete(false);
notifyFailure(conversation.listener(), response, failure);
conversation.complete();
}
@Override

View File

@ -166,7 +166,7 @@ public class HttpSender
// Notify after
HttpExchange exchange = this.exchange.getAndSet(null);
LOG.debug("Sent {}", exchange.request());
exchange.requestDone(true);
exchange.requestComplete(true);
// It is important to notify *after* we reset because
// the notification may trigger another request/response
@ -186,7 +186,7 @@ public class HttpSender
// Notify after
HttpExchange exchange = this.exchange.getAndSet(null);
LOG.debug("Failed {}", exchange.request());
exchange.requestDone(false);
exchange.requestComplete(false);
notifyRequestFailure(exchange.request(), failure);
notifyResponseFailure(exchange.listener(), failure);

View File

@ -26,7 +26,7 @@ public class RedirectionTest extends AbstractHttpClientServerTest
public void test_303() throws Exception
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/303/done")
.path("/localhost/303/localhost/done")
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
@ -37,8 +37,19 @@ public class RedirectionTest extends AbstractHttpClientServerTest
public void test_303_302() throws Exception
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/303/302/done")
.send().get(500, TimeUnit.SECONDS);
.path("/localhost/303/localhost/302/localhost/done")
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
}
@Test
public void test_303_302_OnDifferentDestinations() throws Exception
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/127.0.0.1/303/localhost/302/localhost/done")
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
@ -51,10 +62,11 @@ public class RedirectionTest extends AbstractHttpClientServerTest
{
try
{
String[] paths = target.split("/", 3);
int status = Integer.parseInt(paths[1]);
String[] paths = target.split("/", 4);
String host = paths[1];
int status = Integer.parseInt(paths[2]);
response.setStatus(status);
response.setHeader("Location", request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + "/" + paths[2]);
response.setHeader("Location", request.getScheme() + "://" + host + ":" + request.getServerPort() + "/" + paths[3]);
}
catch (NumberFormatException x)
{