Jetty9 - Third take at HTTP client implementation.
This commit is contained in:
parent
250865f51d
commit
2b5ec13003
|
@ -20,7 +20,9 @@ import java.net.SocketAddress;
|
|||
import java.net.URI;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
@ -30,6 +32,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import javax.net.ssl.SSLEngine;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Destination;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
|
@ -85,6 +88,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
|||
public class HttpClient extends AggregateLifeCycle
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpClient.class);
|
||||
|
||||
private final ConcurrentMap<String, Destination> destinations = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<Long, HttpConversation> conversations = new ConcurrentHashMap<>();
|
||||
private volatile Executor executor;
|
||||
|
@ -131,6 +135,8 @@ public class HttpClient extends AggregateLifeCycle
|
|||
addBean(selectorManager);
|
||||
|
||||
super.doStart();
|
||||
|
||||
LOG.info("Started {}", this);
|
||||
}
|
||||
|
||||
protected SelectorManager newSelectorManager()
|
||||
|
@ -172,12 +178,12 @@ public class HttpClient extends AggregateLifeCycle
|
|||
this.bindAddress = bindAddress;
|
||||
}
|
||||
|
||||
public Future<Response> GET(String uri)
|
||||
public Future<ContentResponse> GET(String uri)
|
||||
{
|
||||
return GET(URI.create(uri));
|
||||
}
|
||||
|
||||
public Future<Response> GET(URI uri)
|
||||
public Future<ContentResponse> GET(URI uri)
|
||||
{
|
||||
return newRequest(uri).send();
|
||||
}
|
||||
|
@ -226,10 +232,17 @@ public class HttpClient extends AggregateLifeCycle
|
|||
Destination existing = destinations.putIfAbsent(address, destination);
|
||||
if (existing != null)
|
||||
destination = existing;
|
||||
else
|
||||
LOG.debug("Created {}", destination);
|
||||
}
|
||||
return destination;
|
||||
}
|
||||
|
||||
public List<Destination> getDestinations()
|
||||
{
|
||||
return new ArrayList<>(destinations.values());
|
||||
}
|
||||
|
||||
public String getUserAgent()
|
||||
{
|
||||
return agent;
|
||||
|
@ -345,13 +358,13 @@ public class HttpClient extends AggregateLifeCycle
|
|||
}
|
||||
}
|
||||
|
||||
public HttpConversation conversationFor(Request request, Response.Listener listener)
|
||||
public HttpConversation conversationFor(Request request)
|
||||
{
|
||||
long id = request.id();
|
||||
HttpConversation conversation = conversations.get(id);
|
||||
if (conversation == null)
|
||||
{
|
||||
conversation = new HttpConversation(this, listener);
|
||||
conversation = new HttpConversation(this, id);
|
||||
HttpConversation existing = conversations.putIfAbsent(id, conversation);
|
||||
if (existing != null)
|
||||
conversation = existing;
|
||||
|
@ -364,6 +377,7 @@ public class HttpClient extends AggregateLifeCycle
|
|||
// TODO
|
||||
switch (status)
|
||||
{
|
||||
case 302:
|
||||
case 303:
|
||||
return new RedirectionListener(this);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.ContentProvider;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
|
@ -16,13 +18,17 @@ public class HttpConnection extends AbstractConnection implements Connection
|
|||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpConnection.class);
|
||||
|
||||
private final AtomicReference<HttpExchange> exchange = new AtomicReference<>();
|
||||
private final HttpClient client;
|
||||
private volatile HttpConversation conversation;
|
||||
private final HttpSender sender;
|
||||
private final HttpReceiver receiver;
|
||||
|
||||
public HttpConnection(HttpClient client, EndPoint endPoint)
|
||||
{
|
||||
super(endPoint, client.getExecutor());
|
||||
this.client = client;
|
||||
this.sender = new HttpSender(this);
|
||||
this.receiver = new HttpReceiver(this);
|
||||
}
|
||||
|
||||
public HttpClient getHttpClient()
|
||||
|
@ -40,9 +46,9 @@ public class HttpConnection extends AbstractConnection implements Connection
|
|||
@Override
|
||||
protected boolean onReadTimeout()
|
||||
{
|
||||
HttpConversation conversation = this.conversation;
|
||||
if (conversation != null)
|
||||
conversation.idleTimeout();
|
||||
HttpExchange exchange = this.exchange.get();
|
||||
if (exchange != null)
|
||||
exchange.idleTimeout();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -50,10 +56,18 @@ public class HttpConnection extends AbstractConnection implements Connection
|
|||
public void send(Request request, Response.Listener listener)
|
||||
{
|
||||
normalizeRequest(request);
|
||||
HttpConversation conversation = client.conversationFor(request, listener);
|
||||
this.conversation = conversation;
|
||||
conversation.prepare(this, request, listener);
|
||||
conversation.send();
|
||||
HttpConversation conversation = client.conversationFor(request);
|
||||
HttpExchange exchange = new HttpExchange(conversation, sender, receiver, request, listener);
|
||||
if (this.exchange.compareAndSet(null, exchange))
|
||||
{
|
||||
conversation.add(exchange);
|
||||
LOG.debug("{}({})", request, conversation);
|
||||
exchange.send();
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new UnsupportedOperationException("Pipelined requests not supported");
|
||||
}
|
||||
}
|
||||
|
||||
private void normalizeRequest(Request request)
|
||||
|
@ -96,11 +110,10 @@ public class HttpConnection extends AbstractConnection implements Connection
|
|||
@Override
|
||||
public void onFillable()
|
||||
{
|
||||
HttpConversation conversation = this.conversation;
|
||||
if (conversation != null)
|
||||
conversation.receive();
|
||||
HttpExchange exchange = this.exchange.get();
|
||||
if (exchange != null)
|
||||
exchange.receive();
|
||||
else
|
||||
// TODO test sending white space... we want to consume it but throw if it's not whitespace
|
||||
LOG.warn("Ready to read response, but no receiver");
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
package org.eclipse.jetty.client;
|
||||
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
|
||||
public class HttpContentResponse implements ContentResponse
|
||||
{
|
||||
private final Response response;
|
||||
private final BufferingResponseListener listener;
|
||||
|
||||
public HttpContentResponse(Response response, BufferingResponseListener listener)
|
||||
{
|
||||
this.response = response;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Request request()
|
||||
{
|
||||
return response.request();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Listener listener()
|
||||
{
|
||||
return response.listener();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpVersion version()
|
||||
{
|
||||
return response.version();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int status()
|
||||
{
|
||||
return response.status();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String reason()
|
||||
{
|
||||
return response.reason();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpFields headers()
|
||||
{
|
||||
return response.headers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort()
|
||||
{
|
||||
response.abort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] content()
|
||||
{
|
||||
return listener.content();
|
||||
}
|
||||
}
|
|
@ -1,38 +1,25 @@
|
|||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
|
||||
public class HttpConversation
|
||||
{
|
||||
private final Response.Listener applicationListener;
|
||||
private final HttpSender sender;
|
||||
private final HttpReceiver receiver;
|
||||
private final List<HttpExchange> exchanges = new ArrayList<>();
|
||||
private final HttpClient client;
|
||||
private final long id;
|
||||
|
||||
private HttpConnection connection;
|
||||
private Request request;
|
||||
private Response.Listener listener;
|
||||
private HttpResponse response;
|
||||
|
||||
public HttpConversation(HttpClient client, Response.Listener listener)
|
||||
public HttpConversation(HttpClient client, long id)
|
||||
{
|
||||
applicationListener = listener;
|
||||
sender = new HttpSender(client);
|
||||
receiver = new HttpReceiver();
|
||||
}
|
||||
|
||||
public Response.Listener applicationListener()
|
||||
{
|
||||
return applicationListener;
|
||||
}
|
||||
|
||||
public void prepare(HttpConnection connection, Request request, Response.Listener listener)
|
||||
{
|
||||
if (this.connection != null)
|
||||
throw new IllegalStateException();
|
||||
this.connection = connection;
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
this.response = new HttpResponse(request, listener);
|
||||
this.client = client;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public void done()
|
||||
|
@ -67,23 +54,19 @@ public class HttpConversation
|
|||
this.listener = listener;
|
||||
}
|
||||
|
||||
public HttpResponse response()
|
||||
public void add(HttpExchange exchange)
|
||||
{
|
||||
return response;
|
||||
exchanges.add(exchange);
|
||||
}
|
||||
|
||||
public void send()
|
||||
public HttpExchange first()
|
||||
{
|
||||
sender.send(this);
|
||||
return exchanges.get(0);
|
||||
}
|
||||
|
||||
public void idleTimeout()
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
receiver.idleTimeout();
|
||||
}
|
||||
|
||||
public void receive()
|
||||
{
|
||||
receiver.receive(this);
|
||||
return String.format("%s[%d]", HttpConversation.class.getSimpleName(), id);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,15 +25,19 @@ import org.eclipse.jetty.client.api.Request;
|
|||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
public class HttpDestination implements Destination
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpDestination.class);
|
||||
|
||||
private final AtomicInteger connectionCount = new AtomicInteger();
|
||||
private final HttpClient client;
|
||||
private final String scheme;
|
||||
private final String host;
|
||||
private final int port;
|
||||
private final Queue<Response> requests;
|
||||
private final Queue<RequestPair> requests;
|
||||
private final Queue<Connection> idleConnections;
|
||||
private final Queue<Connection> activeConnections;
|
||||
|
||||
|
@ -76,21 +80,20 @@ public class HttpDestination implements Destination
|
|||
if (port != request.port())
|
||||
throw new IllegalArgumentException("Invalid request port " + request.port() + " for destination " + this);
|
||||
|
||||
HttpResponse response = new HttpResponse(request, listener);
|
||||
|
||||
RequestPair requestPair = new RequestPair(request, listener);
|
||||
if (client.isRunning())
|
||||
{
|
||||
if (requests.offer(response))
|
||||
if (requests.offer(requestPair))
|
||||
{
|
||||
if (!client.isRunning() && requests.remove(response))
|
||||
if (!client.isRunning() && requests.remove(requestPair))
|
||||
{
|
||||
throw new RejectedExecutionException(HttpClient.class.getSimpleName() + " is shutting down");
|
||||
throw new RejectedExecutionException(HttpClient.class.getSimpleName() + " is stopping");
|
||||
}
|
||||
else
|
||||
{
|
||||
Request.Listener requestListener = request.listener();
|
||||
notifyRequestQueued(requestListener, request);
|
||||
ensureConnection();
|
||||
LOG.debug("Queued {}", request);
|
||||
notifyRequestQueued(request.listener(), request);
|
||||
ensureConnection(); // TODO: improve and test this
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -98,6 +101,10 @@ public class HttpDestination implements Destination
|
|||
throw new RejectedExecutionException("Max requests per address " + client.getMaxQueueSizePerAddress() + " exceeded");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new RejectedExecutionException(HttpClient.class.getSimpleName() + " is stopped");
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyRequestQueued(Request.Listener listener, Request request)
|
||||
|
@ -109,28 +116,33 @@ public class HttpDestination implements Destination
|
|||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
// TODO: log or abort request send ?
|
||||
LOG.info("Exception while notifying listener " + listener, x);
|
||||
}
|
||||
}
|
||||
|
||||
private void ensureConnection()
|
||||
{
|
||||
int maxConnections = client.getMaxConnectionsPerAddress();
|
||||
final int maxConnections = client.getMaxConnectionsPerAddress();
|
||||
while (true)
|
||||
{
|
||||
int count = connectionCount.get();
|
||||
int current = connectionCount.get();
|
||||
final int next = current + 1;
|
||||
|
||||
if (count >= maxConnections)
|
||||
if (next > maxConnections)
|
||||
{
|
||||
LOG.debug("Max connections reached {}: {}", this, current);
|
||||
break;
|
||||
}
|
||||
|
||||
if (connectionCount.compareAndSet(count, count + 1))
|
||||
if (connectionCount.compareAndSet(current, next))
|
||||
{
|
||||
newConnection(new Callback<Connection>()
|
||||
{
|
||||
@Override
|
||||
public void completed(Connection connection)
|
||||
{
|
||||
dispatch(connection);
|
||||
LOG.debug("Created connection {}/{} for {}", next, maxConnections, this);
|
||||
process(connection);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -157,16 +169,18 @@ public class HttpDestination implements Destination
|
|||
}
|
||||
|
||||
/**
|
||||
* Responsibility of this method is to dequeue a request, associate it to the given {@code connection}
|
||||
* and dispatch a thread to execute the request.
|
||||
* <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
|
||||
* <p>A new connection is created when a request needs to be executed; it is possible that the request that
|
||||
* triggered the request creation is executed by another connection that was just released, so the new connection
|
||||
* may become idle.</p>
|
||||
* <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
|
||||
*
|
||||
* This can be done in several ways: one could be to
|
||||
* @param connection
|
||||
* @param connection the new connection
|
||||
*/
|
||||
protected void dispatch(final Connection connection)
|
||||
protected void process(final Connection connection)
|
||||
{
|
||||
final Response response = requests.poll();
|
||||
if (response == null)
|
||||
final RequestPair requestPair = requests.poll();
|
||||
if (requestPair == null)
|
||||
{
|
||||
idleConnections.offer(connection);
|
||||
}
|
||||
|
@ -178,7 +192,7 @@ public class HttpDestination implements Destination
|
|||
@Override
|
||||
public void run()
|
||||
{
|
||||
connection.send(response.request(), response.listener());
|
||||
connection.send(requestPair.request, requestPair.listener);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -197,4 +211,23 @@ public class HttpDestination implements Destination
|
|||
// if I create manually the connection, then I call send(request, listener)
|
||||
|
||||
// Other ways ?
|
||||
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s(%s://%s:%d)", HttpDestination.class.getSimpleName(), scheme(), host(), port());
|
||||
}
|
||||
|
||||
private static class RequestPair
|
||||
{
|
||||
private final Request request;
|
||||
private final Response.Listener listener;
|
||||
|
||||
public RequestPair(Request request, Response.Listener listener)
|
||||
{
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
package org.eclipse.jetty.client;
|
||||
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
|
||||
public class HttpExchange
|
||||
{
|
||||
private final HttpConversation conversation;
|
||||
private final HttpSender sender;
|
||||
private final HttpReceiver receiver;
|
||||
private final Request request;
|
||||
private final Response.Listener listener;
|
||||
private final HttpResponse response;
|
||||
|
||||
public HttpExchange(HttpConversation conversation, HttpSender sender, HttpReceiver receiver, Request request, Response.Listener listener)
|
||||
{
|
||||
this.conversation = conversation;
|
||||
this.sender = sender;
|
||||
this.receiver = receiver;
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
this.response = new HttpResponse(request, listener);
|
||||
}
|
||||
|
||||
public HttpConversation conversation()
|
||||
{
|
||||
return conversation;
|
||||
}
|
||||
|
||||
public Request request()
|
||||
{
|
||||
return request;
|
||||
}
|
||||
|
||||
public Response.Listener listener()
|
||||
{
|
||||
return listener;
|
||||
}
|
||||
|
||||
public HttpResponse response()
|
||||
{
|
||||
return response;
|
||||
}
|
||||
|
||||
public void send()
|
||||
{
|
||||
sender.send(this);
|
||||
}
|
||||
|
||||
public void idleTimeout()
|
||||
{
|
||||
receiver.idleTimeout();
|
||||
}
|
||||
|
||||
public void receive()
|
||||
{
|
||||
receiver.receive(this);
|
||||
}
|
||||
|
||||
public void requestDone()
|
||||
{
|
||||
// TODO
|
||||
}
|
||||
|
||||
public void responseDone()
|
||||
{
|
||||
// TODO
|
||||
}
|
||||
}
|
|
@ -4,6 +4,7 @@ import java.io.EOFException;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
|
@ -14,20 +15,24 @@ import org.eclipse.jetty.io.EndPoint;
|
|||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
|
||||
public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpReceiver.class);
|
||||
|
||||
private final HttpParser parser = new HttpParser(this);
|
||||
private HttpConversation conversation;
|
||||
private final AtomicReference<HttpExchange> exchange = new AtomicReference<>();
|
||||
private final AtomicReference<Response.Listener> listener = new AtomicReference<>();
|
||||
private final HttpConnection connection;
|
||||
|
||||
public void receive(HttpConversation conversation)
|
||||
public HttpReceiver(HttpConnection connection)
|
||||
{
|
||||
if (this.conversation != null)
|
||||
throw new IllegalStateException();
|
||||
this.conversation = conversation;
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
public void receive(HttpExchange exchange)
|
||||
{
|
||||
this.exchange.set(exchange);
|
||||
|
||||
HttpConnection connection = conversation.connection();
|
||||
HttpClient client = connection.getHttpClient();
|
||||
ByteBufferPool bufferPool = client.getByteBufferPool();
|
||||
ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
|
||||
|
@ -40,7 +45,6 @@ class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
|
|||
if (read > 0)
|
||||
{
|
||||
parser.parseNext(buffer);
|
||||
// TODO: response done, reset ?
|
||||
}
|
||||
else if (read == 0)
|
||||
{
|
||||
|
@ -65,39 +69,44 @@ class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
|
|||
@Override
|
||||
public boolean startResponse(HttpVersion version, int status, String reason)
|
||||
{
|
||||
HttpResponse response = conversation.response();
|
||||
response.version(version).status(status).reason(reason);
|
||||
HttpExchange exchange = this.exchange.get();
|
||||
|
||||
// Probe the protocol listeners
|
||||
HttpClient client = conversation.connection().getHttpClient();
|
||||
HttpClient client = connection.getHttpClient();
|
||||
HttpResponse response = exchange.response();
|
||||
Response.Listener listener = client.lookup(status);
|
||||
if (listener != null)
|
||||
conversation.listener(listener);
|
||||
else
|
||||
conversation.listener(conversation.applicationListener());
|
||||
if (listener == null)
|
||||
listener = exchange.conversation().first().listener();
|
||||
this.listener.set(listener);
|
||||
|
||||
notifyBegin(conversation.listener(), response);
|
||||
response.version(version).status(status).reason(reason);
|
||||
LOG.debug("{}", response);
|
||||
|
||||
notifyBegin(listener, response);
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean parsedHeader(HttpHeader header, String name, String value)
|
||||
{
|
||||
conversation.response().headers().put(name, value);
|
||||
HttpExchange exchange = this.exchange.get();
|
||||
exchange.response().headers().put(name, value);
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean headerComplete()
|
||||
{
|
||||
notifyHeaders(conversation.listener(), conversation.response());
|
||||
HttpExchange exchange = this.exchange.get();
|
||||
notifyHeaders(listener.get(), exchange.response());
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean content(ByteBuffer buffer)
|
||||
{
|
||||
notifyContent(conversation.listener(), conversation.response(), buffer);
|
||||
HttpExchange exchange = this.exchange.get();
|
||||
notifyContent(listener.get(), exchange.response(), buffer);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -110,21 +119,16 @@ class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
|
|||
|
||||
protected void success()
|
||||
{
|
||||
HttpConversation conversation = this.conversation;
|
||||
this.conversation = null;
|
||||
Response.Listener listener = conversation.listener();
|
||||
Response response = conversation.response();
|
||||
conversation.done();
|
||||
notifySuccess(listener, response);
|
||||
HttpExchange exchange = this.exchange.getAndSet(null);
|
||||
exchange.responseDone();
|
||||
notifySuccess(listener.get(), exchange.response());
|
||||
}
|
||||
|
||||
protected void fail(Throwable failure)
|
||||
{
|
||||
Response.Listener listener = conversation.listener();
|
||||
Response response = conversation.response();
|
||||
conversation.done();
|
||||
conversation = null;
|
||||
notifyFailure(listener, response, failure);
|
||||
HttpExchange exchange = this.exchange.getAndSet(null);
|
||||
exchange.responseDone();
|
||||
notifyFailure(listener.get(), exchange.response(), failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -137,7 +141,7 @@ class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
|
|||
@Override
|
||||
public void badMessage(int status, String reason)
|
||||
{
|
||||
conversation.response().status(status).reason(reason);
|
||||
exchange.get().response().status(status).reason(reason);
|
||||
fail(new HttpResponseException());
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
|
||||
import org.eclipse.jetty.client.api.ContentDecoder;
|
||||
import org.eclipse.jetty.client.api.ContentProvider;
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
|
@ -41,7 +42,6 @@ public class HttpRequest implements Request
|
|||
private HttpVersion version;
|
||||
private String agent;
|
||||
private long idleTimeout;
|
||||
private Response response;
|
||||
private Listener listener;
|
||||
private ContentProvider content;
|
||||
private final HttpFields headers = new HttpFields();
|
||||
|
@ -228,23 +228,23 @@ public class HttpRequest implements Request
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<Response> send()
|
||||
public Future<ContentResponse> send()
|
||||
{
|
||||
final FutureCallback<Response> result = new FutureCallback<>();
|
||||
final FutureCallback<ContentResponse> result = new FutureCallback<>();
|
||||
BufferingResponseListener listener = new BufferingResponseListener()
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(Response response)
|
||||
{
|
||||
super.onSuccess(response);
|
||||
result.completed(response);
|
||||
result.completed(new HttpContentResponse(response, this));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Response response, Throwable failure)
|
||||
{
|
||||
super.onFailure(response, failure);
|
||||
result.failed(response, failure);
|
||||
result.failed(new HttpContentResponse(response, this), failure);
|
||||
}
|
||||
};
|
||||
send(listener);
|
||||
|
@ -256,4 +256,10 @@ public class HttpRequest implements Request
|
|||
{
|
||||
client.send(this, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[%s %s %s]@%x", HttpRequest.class.getSimpleName(), method(), path(), version(), hashCode());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,9 +17,8 @@ import org.eclipse.jetty.client.api.Request;
|
|||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
|
||||
public class HttpResponse extends FutureCallback<Response> implements Response
|
||||
public class HttpResponse implements Response
|
||||
{
|
||||
private final HttpFields headers = new HttpFields();
|
||||
private final Request request;
|
||||
|
@ -91,4 +90,10 @@ public class HttpResponse extends FutureCallback<Response> implements Response
|
|||
{
|
||||
// request.abort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[%s %d %s]", HttpResponse.class.getSimpleName(), version(), status(), reason());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,40 +18,48 @@ import org.eclipse.jetty.util.Callback;
|
|||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
class HttpSender
|
||||
public class HttpSender
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpSender.class);
|
||||
|
||||
private final HttpGenerator generator = new HttpGenerator();
|
||||
private final HttpClient client;
|
||||
private HttpConversation conversation;
|
||||
private final AtomicReference<HttpExchange> exchange = new AtomicReference<>();
|
||||
private final HttpConnection connection;
|
||||
|
||||
private long contentLength;
|
||||
private Iterator<ByteBuffer> contentChunks;
|
||||
private ByteBuffer header;
|
||||
private ByteBuffer chunk;
|
||||
private boolean requestHeadersComplete;
|
||||
|
||||
HttpSender(HttpClient client)
|
||||
public HttpSender(HttpConnection connection)
|
||||
{
|
||||
this.client = client;
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
public void send(HttpConversation conversation)
|
||||
public void send(HttpExchange exchange)
|
||||
{
|
||||
this.conversation = conversation;
|
||||
ContentProvider content = conversation.request().content();
|
||||
this.contentLength = content == null ? -1 : content.length();
|
||||
this.contentChunks = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
|
||||
send();
|
||||
if (this.exchange.compareAndSet(null, exchange))
|
||||
{
|
||||
ContentProvider content = exchange.request().content();
|
||||
this.contentLength = content == null ? -1 : content.length();
|
||||
this.contentChunks = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
|
||||
send();
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
private void send()
|
||||
{
|
||||
try
|
||||
{
|
||||
HttpConnection connection = conversation.connection();
|
||||
HttpClient client = connection.getHttpClient();
|
||||
EndPoint endPoint = connection.getEndPoint();
|
||||
ByteBufferPool byteBufferPool = client.getByteBufferPool();
|
||||
final Request request = exchange.get().request();
|
||||
HttpGenerator.RequestInfo info = null;
|
||||
ByteBuffer content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
|
||||
boolean lastContent = !contentChunks.hasNext();
|
||||
|
@ -62,7 +70,6 @@ class HttpSender
|
|||
{
|
||||
case NEED_INFO:
|
||||
{
|
||||
Request request = conversation.request();
|
||||
info = new HttpGenerator.RequestInfo(request.version(), request.headers(), contentLength, request.method().asString(), request.path());
|
||||
break;
|
||||
}
|
||||
|
@ -83,7 +90,7 @@ class HttpSender
|
|||
@Override
|
||||
protected void pendingCompleted()
|
||||
{
|
||||
notifyRequestHeadersComplete(conversation.request());
|
||||
notifyRequestHeadersComplete(request);
|
||||
send();
|
||||
}
|
||||
|
||||
|
@ -108,7 +115,7 @@ class HttpSender
|
|||
if (!requestHeadersComplete)
|
||||
{
|
||||
requestHeadersComplete = true;
|
||||
notifyRequestHeadersComplete(conversation.request());
|
||||
notifyRequestHeadersComplete(request);
|
||||
}
|
||||
releaseBuffers();
|
||||
content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
|
||||
|
@ -151,13 +158,13 @@ class HttpSender
|
|||
|
||||
protected void success()
|
||||
{
|
||||
Request request = conversation.request();
|
||||
conversation = null;
|
||||
HttpExchange exchange = this.exchange.getAndSet(null);
|
||||
exchange.requestDone();
|
||||
generator.reset();
|
||||
requestHeadersComplete = false;
|
||||
// It is important to notify *after* we reset because
|
||||
// the notification may trigger another request/response
|
||||
notifyRequestSuccess(request);
|
||||
notifyRequestSuccess(exchange.request());
|
||||
}
|
||||
|
||||
protected void fail(Throwable failure)
|
||||
|
@ -165,15 +172,17 @@ class HttpSender
|
|||
BufferUtil.clear(header);
|
||||
BufferUtil.clear(chunk);
|
||||
releaseBuffers();
|
||||
conversation.connection().getEndPoint().shutdownOutput();
|
||||
connection.getEndPoint().shutdownOutput();
|
||||
generator.abort();
|
||||
notifyRequestFailure(conversation.request(), failure);
|
||||
notifyResponseFailure(conversation.listener(), failure);
|
||||
HttpExchange exchange = this.exchange.getAndSet(null);
|
||||
exchange.requestDone();
|
||||
notifyRequestFailure(exchange.request(), failure);
|
||||
notifyResponseFailure(exchange.listener(), failure);
|
||||
}
|
||||
|
||||
private void releaseBuffers()
|
||||
{
|
||||
ByteBufferPool bufferPool = client.getByteBufferPool();
|
||||
ByteBufferPool bufferPool = connection.getHttpClient().getByteBufferPool();
|
||||
if (!BufferUtil.hasContent(header))
|
||||
{
|
||||
bufferPool.release(header);
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
package org.eclipse.jetty.client.api;
|
||||
|
||||
public interface ContentResponse extends Response
|
||||
{
|
||||
byte[] content();
|
||||
}
|
|
@ -74,7 +74,7 @@ public interface Request
|
|||
|
||||
Request listener(Listener listener);
|
||||
|
||||
Future<Response> send();
|
||||
Future<ContentResponse> send();
|
||||
|
||||
void send(Response.Listener listener);
|
||||
|
||||
|
|
|
@ -14,21 +14,24 @@
|
|||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Destination;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class HttpClientTest extends AbstractHttpClientTest
|
||||
{
|
||||
@Test
|
||||
public void testGETNoResponseContent() throws Exception
|
||||
public void test_GET_NoResponseContent() throws Exception
|
||||
{
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
|
@ -44,4 +47,35 @@ public class HttpClientTest extends AbstractHttpClientTest
|
|||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(200, response.status());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_GET_ResponseContent() throws Exception
|
||||
{
|
||||
final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
response.getOutputStream().write(data);
|
||||
baseRequest.setHandled(true);
|
||||
}
|
||||
});
|
||||
|
||||
ContentResponse response = client.GET("http://localhost:" + connector.getLocalPort()).get(5, TimeUnit.SECONDS);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(200, response.status());
|
||||
byte[] content = response.content();
|
||||
Assert.assertArrayEquals(data, content);
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
List<Destination> destinations = client.getDestinations();
|
||||
Assert.assertNotNull(destinations);
|
||||
Assert.assertEquals(1, destinations.size());
|
||||
Destination destination = destinations.get(0);
|
||||
Assert.assertNotNull(destination);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,11 +23,22 @@ public class RedirectionTest extends AbstractHttpClientTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void test303() throws Exception
|
||||
public void test_303() throws Exception
|
||||
{
|
||||
Response response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/303/done")
|
||||
.send().get(500, TimeUnit.SECONDS);
|
||||
.send().get(5, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(200, response.status());
|
||||
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_303_302() throws Exception
|
||||
{
|
||||
Response response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/303/302/done")
|
||||
.send().get(5, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(200, response.status());
|
||||
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
|
||||
|
|
|
@ -38,7 +38,7 @@ public class Usage
|
|||
public void testGETBlocking_ShortAPI() throws Exception
|
||||
{
|
||||
HttpClient client = new HttpClient();
|
||||
Future<Response> responseFuture = client.GET("http://localhost:8080/foo");
|
||||
Future<ContentResponse> responseFuture = client.GET("http://localhost:8080/foo");
|
||||
Response response = responseFuture.get();
|
||||
Assert.assertEquals(200, response.status());
|
||||
// Headers abstraction needed for:
|
||||
|
@ -66,7 +66,7 @@ public class Usage
|
|||
.decoder(null)
|
||||
.content(null)
|
||||
.idleTimeout(5000L);
|
||||
Future<Response> responseFuture = request.send();
|
||||
Future<ContentResponse> responseFuture = request.send();
|
||||
Response response = responseFuture.get();
|
||||
Assert.assertEquals(200, response.status());
|
||||
}
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
||||
org.eclipse.jetty.LEVEL=DEBUG
|
||||
org.eclipse.jetty.client.LEVEL=DEBUG
|
||||
|
|
Loading…
Reference in New Issue