Jetty9 - First take at HTTP client implementation.

This commit is contained in:
Simone Bordet 2012-09-04 19:20:29 +02:00
parent 4de5b0ad63
commit b18ab0e76a
42 changed files with 2754 additions and 885 deletions

View File

@ -0,0 +1,77 @@
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.client.api.Response;
public class BufferingResponseListener extends Response.Listener.Adapter
{
private final CountDownLatch latch = new CountDownLatch(1);
private final int maxCapacity;
private Response response;
private Throwable failure;
private byte[] buffer = new byte[0];
public BufferingResponseListener()
{
this(16 * 1024 * 1024);
}
public BufferingResponseListener(int maxCapacity)
{
this.maxCapacity = maxCapacity;
}
@Override
public void onContent(Response response, ByteBuffer content)
{
long newLength = buffer.length + content.remaining();
if (newLength > maxCapacity)
throw new IllegalStateException("Buffering capacity exceeded");
byte[] newBuffer = new byte[(int)newLength];
System.arraycopy(buffer, 0, newBuffer, 0, buffer.length);
content.get(newBuffer, buffer.length, content.remaining());
buffer = newBuffer;
}
@Override
public void onSuccess(Response response)
{
this.response = response;
latch.countDown();
}
@Override
public void onFailure(Response response, Throwable failure)
{
this.response = response;
this.failure = failure;
latch.countDown();
}
public Response await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
boolean expired = !latch.await(timeout, unit);
if (failure != null)
throw new ExecutionException(failure);
if (expired)
throw new TimeoutException();
return response;
}
public byte[] content()
{
return buffer;
}
public String contentAsString(String encoding)
{
return new String(content(), Charset.forName(encoding));
}
}

View File

@ -0,0 +1,32 @@
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import org.eclipse.jetty.client.api.ContentProvider;
public class ByteBufferContentProvider implements ContentProvider
{
private final ByteBuffer[] buffers;
public ByteBufferContentProvider(ByteBuffer... buffers)
{
this.buffers = buffers;
}
@Override
public long length()
{
int length = 0;
for (ByteBuffer buffer : buffers)
length += buffer.remaining();
return length;
}
@Override
public Iterator<ByteBuffer> iterator()
{
return Arrays.asList(buffers).iterator();
}
}

View File

@ -1,294 +0,0 @@
//========================================================================
//Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.sun.jndi.toolkit.url.Uri;
import org.eclipse.jetty.client.api.Address;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
/**
* <p>{@link HTTPClient} provides an asynchronous non-blocking implementation to perform HTTP requests to a server.</p>
* <p>{@link HTTPClient} provides easy-to-use methods such as {@link #GET(String)} that allow to perform HTTP
* requests in a one-liner, but also gives the ability to fine tune the configuration of requests via
* {@link Request.Builder}.</p>
* <p>{@link HTTPClient} acts as a central configuration point for network parameters (such as idle timeouts) and
* HTTP parameters (such as whether to follow redirects).</p>
* <p>{@link HTTPClient} transparently pools connections to servers, but allows direct control of connections for
* cases where this is needed.</p>
* <p>Typical usage:</p>
* <pre>
* // One liner:
* new HTTPClient().GET("http://localhost:8080/").get().getStatus();
*
* // Using the builder with a timeout
* HTTPClient client = new HTTPClient();
* Response response = client.builder("http://localhost:8080/").build().send().get(5, TimeUnit.SECONDS);
* int status = response.getStatus();
*
* // Asynchronously
* HTTPClient client = new HTTPClient();
* client.builder("http://localhost:8080/").build().send(new Response.Listener.Adapter()
* {
* &#64;Override
* public void onComplete(Response response)
* {
* ...
* }
* });
* </pre>
*/
public class HTTPClient extends AggregateLifeCycle
{
private final ConcurrentMap<String, Destination> destinations = new ConcurrentHashMap<>();
private volatile String agent = "Jetty/" + Jetty.VERSION;
private volatile boolean followRedirects = true;
private volatile Executor executor;
private volatile int maxConnectionsPerAddress = Integer.MAX_VALUE;
private volatile int maxQueueSizePerAddress = Integer.MAX_VALUE;
private volatile SocketAddress bindAddress;
private volatile SelectorManager selectorManager;
private volatile long idleTimeout;
@Override
protected void doStart() throws Exception
{
selectorManager = newSelectorManager();
addBean(selectorManager);
super.doStart();
}
protected SelectorManager newSelectorManager()
{
ClientSelectorManager result = new ClientSelectorManager();
result.setMaxIdleTime(getIdleTimeout());
return result;
}
@Override
protected void doStop() throws Exception
{
super.doStop();
}
public long getIdleTimeout()
{
return idleTimeout;
}
public void setIdleTimeout(long idleTimeout)
{
this.idleTimeout = idleTimeout;
}
/**
* @return the address to bind socket channels to
* @see #setBindAddress(SocketAddress)
*/
public SocketAddress getBindAddress()
{
return bindAddress;
}
/**
* @param bindAddress the address to bind socket channels to
* @see #getBindAddress()
*/
public void setBindAddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
}
public Future<Response> GET(String uri)
{
return GET(URI.create(uri));
}
public Future<Response> GET(URI uri)
{
return builder(uri)
.method("GET")
// Add decoder, cookies, agent, default headers, etc.
.agent(getUserAgent())
.followRedirects(isFollowRedirects())
.build()
.send();
}
public Request.Builder builder(String uri)
{
return builder(URI.create(uri));
}
public Request.Builder builder(URI uri)
{
return new StandardRequest(this, uri);
}
public Request.Builder builder(Request prototype)
{
return null;
}
public Destination getDestination(String address)
{
Destination destination = destinations.get(address);
if (destination == null)
{
destination = new StandardDestination(this, address);
Destination existing = destinations.putIfAbsent(address, destination);
if (existing != null)
destination = existing;
}
return destination;
}
public String getUserAgent()
{
return agent;
}
public void setUserAgent(String agent)
{
this.agent = agent;
}
public boolean isFollowRedirects()
{
return followRedirects;
}
public void setFollowRedirects(boolean follow)
{
this.followRedirects = follow;
}
public void join()
{
}
public void join(long timeout, TimeUnit unit)
{
}
public Future<Response> send(Request request, Response.Listener listener)
{
URI uri = request.uri();
String scheme = uri.getScheme();
if (!Arrays.asList("http", "https").contains(scheme.toLowerCase()))
throw new IllegalArgumentException("Invalid protocol " + scheme);
String key = scheme.toLowerCase() + "://" + uri.getHost().toLowerCase();
int port = uri.getPort();
if (port < 0)
key += "https".equalsIgnoreCase(scheme) ? ":443" : ":80";
return getDestination(key).send(request, listener);
}
public Executor getExecutor()
{
return executor;
}
public int getMaxConnectionsPerAddress()
{
return maxConnectionsPerAddress;
}
public void setMaxConnectionsPerAddress(int maxConnectionsPerAddress)
{
this.maxConnectionsPerAddress = maxConnectionsPerAddress;
}
public int getMaxQueueSizePerAddress()
{
return maxQueueSizePerAddress;
}
public void setMaxQueueSizePerAddress(int maxQueueSizePerAddress)
{
this.maxQueueSizePerAddress = maxQueueSizePerAddress;
}
protected Future<Connection> newConnection(Destination destination) throws IOException
{
SocketChannel channel = SocketChannel.open();
SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
channel.socket().setTcpNoDelay(true);
channel.connect(destination.address().toSocketAddress());
FutureCallback<Connection> result = new FutureCallback<>();
selectorManager.connect(channel, result);
return result;
}
protected class ClientSelectorManager extends SelectorManager
{
public ClientSelectorManager()
{
this(1);
}
public ClientSelectorManager(int selectors)
{
super(selectors);
}
@Override
protected Selectable newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey sKey) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, sKey, getMaxIdleTime());
}
@Override
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
{
// TODO: SSL
return new StandardConnection(channel, endpoint, )
}
@Override
protected void execute(Runnable task)
{
getExecutor().execute(task);
}
}
}

View File

@ -0,0 +1,430 @@
//========================================================================
//Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
/**
* <p>{@link HttpClient} provides an efficient, asynchronous, non-blocking implementation
* to perform HTTP requests to a server through a simple API that offers also blocking semantic.</p>
* <p>{@link HttpClient} provides easy-to-use methods such as {@link #GET(String)} that allow to perform HTTP
* requests in a one-liner, but also gives the ability to fine tune the configuration of requests via
* {@link HttpClient#newRequest(URI)}.</p>
* <p>{@link HttpClient} acts as a central configuration point for network parameters (such as idle timeouts)
* and HTTP parameters (such as whether to follow redirects).</p>
* <p>{@link HttpClient} transparently pools connections to servers, but allows direct control of connections
* for cases where this is needed.</p>
* <p>Typical usage:</p>
* <pre>
* // One liner:
* new HTTPClient().GET("http://localhost:8080/").get().status();
*
* // Building a request with a timeout
* HTTPClient client = new HTTPClient();
* Response response = client.newRequest("localhost:8080").send().get(5, TimeUnit.SECONDS);
* int status = response.status();
*
* // Asynchronously
* HTTPClient client = new HTTPClient();
* client.newRequest("localhost:8080").send(new Response.Listener.Adapter()
* {
* &#64;Override
* public void onSuccess(Response response)
* {
* ...
* }
* });
* </pre>
*/
public class HttpClient extends AggregateLifeCycle
{
private static final Logger LOG = Log.getLogger(HttpClient.class);
private final ConcurrentMap<String, Destination> destinations = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, HttpConversation> conversations = new ConcurrentHashMap<>();
private volatile Executor executor;
private volatile ByteBufferPool byteBufferPool;
private volatile ScheduledExecutorService scheduler;
private volatile SelectorManager selectorManager;
private volatile SslContextFactory sslContextFactory;
private volatile String agent = "Jetty/" + Jetty.VERSION;
private volatile boolean followRedirects = true;
private volatile int maxConnectionsPerAddress = 8;
private volatile int maxQueueSizePerAddress = 1024;
private volatile int requestBufferSize = 4096;
private volatile int responseBufferSize = 4096;
private volatile SocketAddress bindAddress;
private volatile long idleTimeout;
public ByteBufferPool getByteBufferPool()
{
return byteBufferPool;
}
public SslContextFactory getSslContextFactory()
{
return sslContextFactory;
}
@Override
protected void doStart() throws Exception
{
if (executor == null)
executor = new QueuedThreadPool();
addBean(executor);
if (byteBufferPool == null)
byteBufferPool = new MappedByteBufferPool();
addBean(byteBufferPool);
if (scheduler == null)
scheduler = Executors.newSingleThreadScheduledExecutor();
addBean(scheduler);
selectorManager = newSelectorManager();
addBean(selectorManager);
super.doStart();
}
protected SelectorManager newSelectorManager()
{
return new ClientSelectorManager();
}
@Override
protected void doStop() throws Exception
{
super.doStop();
}
public long getIdleTimeout()
{
return idleTimeout;
}
public void setIdleTimeout(long idleTimeout)
{
this.idleTimeout = idleTimeout;
}
/**
* @return the address to bind socket channels to
* @see #setBindAddress(SocketAddress)
*/
public SocketAddress getBindAddress()
{
return bindAddress;
}
/**
* @param bindAddress the address to bind socket channels to
* @see #getBindAddress()
*/
public void setBindAddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
}
public Future<Response> GET(String uri)
{
return GET(URI.create(uri));
}
public Future<Response> GET(URI uri)
{
// TODO: Add decoder, cookies, agent, default headers, etc.
return newRequest(uri)
.method(HttpMethod.GET)
.version(HttpVersion.HTTP_1_1)
.agent(getUserAgent())
.idleTimeout(getIdleTimeout())
.followRedirects(isFollowRedirects())
.send();
}
public Request newRequest(String host, int port)
{
return newRequest(URI.create(address("http", host, port)));
}
public Request newRequest(URI uri)
{
return new HttpRequest(this, uri);
}
protected Request newRequest(long id, URI uri)
{
return new HttpRequest(this, id, uri);
}
private String address(String scheme, String host, int port)
{
return scheme + "://" + host + ":" + port;
}
public Destination getDestination(String scheme, String host, int port)
{
String address = address(scheme, host, port);
Destination destination = destinations.get(address);
if (destination == null)
{
destination = new HttpDestination(this, scheme, host, port);
Destination existing = destinations.putIfAbsent(address, destination);
if (existing != null)
destination = existing;
}
return destination;
}
public String getUserAgent()
{
return agent;
}
public void setUserAgent(String agent)
{
this.agent = agent;
}
public boolean isFollowRedirects()
{
return followRedirects;
}
public void setFollowRedirects(boolean follow)
{
this.followRedirects = follow;
}
public void send(Request request, Response.Listener listener)
{
String scheme = request.scheme().toLowerCase();
if (!Arrays.asList("http", "https").contains(scheme))
throw new IllegalArgumentException("Invalid protocol " + scheme);
String host = request.host().toLowerCase();
int port = request.port();
if (port < 0)
port = "https".equals(scheme) ? 443 : 80;
getDestination(scheme, host, port).send(request, listener);
}
public Executor getExecutor()
{
return executor;
}
public int getMaxConnectionsPerAddress()
{
return maxConnectionsPerAddress;
}
public void setMaxConnectionsPerAddress(int maxConnectionsPerAddress)
{
this.maxConnectionsPerAddress = maxConnectionsPerAddress;
}
public int getMaxQueueSizePerAddress()
{
return maxQueueSizePerAddress;
}
public void setMaxQueueSizePerAddress(int maxQueueSizePerAddress)
{
this.maxQueueSizePerAddress = maxQueueSizePerAddress;
}
public int getRequestBufferSize()
{
return requestBufferSize;
}
public void setRequestBufferSize(int requestBufferSize)
{
this.requestBufferSize = requestBufferSize;
}
public int getResponseBufferSize()
{
return responseBufferSize;
}
public void setResponseBufferSize(int responseBufferSize)
{
this.responseBufferSize = responseBufferSize;
}
protected void newConnection(Destination destination, Callback<Connection> callback)
{
SocketChannel channel = null;
try
{
channel = SocketChannel.open();
SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
channel.socket().setTcpNoDelay(true);
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(destination.host(), destination.port()));
Future<Connection> result = new ConnectionCallback(destination, callback);
selectorManager.connect(channel, result);
}
catch (IOException x)
{
if (channel != null)
close(channel);
callback.failed(null, x);
}
}
private void close(SocketChannel channel)
{
try
{
channel.close();
}
catch (IOException x)
{
LOG.ignore(x);
}
}
public HttpConversation conversationFor(Request request)
{
long id = request.id();
HttpConversation conversation = conversations.get(id);
if (conversation == null)
{
conversation = new HttpConversation();
HttpConversation existing = conversations.putIfAbsent(id, conversation);
if (existing != null)
conversation = existing;
}
return conversation;
}
protected class ClientSelectorManager extends SelectorManager
{
public ClientSelectorManager()
{
this(1);
}
public ClientSelectorManager(int selectors)
{
super(selectors);
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key)
{
return new SelectChannelEndPoint(channel, selector, key, scheduler, getIdleTimeout());
}
@Override
public org.eclipse.jetty.io.Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
ConnectionCallback callback = (ConnectionCallback)attachment;
Destination destination = callback.destination;
SslContextFactory sslContextFactory = getSslContextFactory();
if ("https".equals(destination.scheme()))
{
if (sslContextFactory == null)
{
IOException failure = new ConnectException("Missing " + SslContextFactory.class.getSimpleName() + " for " + destination.scheme() + " requests");
callback.failed(null, failure);
throw failure;
}
else
{
SSLEngine engine = sslContextFactory.newSSLEngine(endPoint.getRemoteAddress());
engine.setUseClientMode(false);
SslConnection sslConnection = new SslConnection(getByteBufferPool(), getExecutor(), endPoint, engine);
EndPoint appEndPoint = sslConnection.getDecryptedEndPoint();
HttpConnection connection = new HttpConnection(HttpClient.this, appEndPoint);
appEndPoint.setConnection(connection);
callback.callback.completed(connection);
connection.onOpen();
return sslConnection;
}
}
else
{
HttpConnection connection = new HttpConnection(HttpClient.this, endPoint);
callback.callback.completed(connection);
return connection;
}
}
@Override
protected void execute(Runnable task)
{
getExecutor().execute(task);
}
}
private class ConnectionCallback extends FutureCallback<Connection>
{
private final Destination destination;
private final Callback<Connection> callback;
private ConnectionCallback(Destination destination, Callback<Connection> callback)
{
this.destination = destination;
this.callback = callback;
}
}
}

View File

@ -0,0 +1,106 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpConnection extends AbstractConnection implements Connection
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
private final HttpClient client;
private volatile HttpConversation conversation;
public HttpConnection(HttpClient client, EndPoint endPoint)
{
super(endPoint, client.getExecutor());
this.client = client;
}
public HttpClient getHttpClient()
{
return client;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
protected boolean onReadTimeout()
{
HttpConversation conversation = this.conversation;
if (conversation != null)
conversation.idleTimeout();
return true;
}
@Override
public void send(Request request, Response.Listener listener)
{
normalizeRequest(request);
HttpConversation conversation = client.conversationFor(request);
this.conversation = conversation;
conversation.prepare(this, request, listener);
conversation.send();
}
private void normalizeRequest(Request request)
{
HttpVersion version = request.version();
HttpFields headers = request.headers();
ContentProvider content = request.content();
// Make sure the path is there
String path = request.path();
if (path.matches("\\s*"))
request.path("/");
// Add content headers
if (content != null)
{
long contentLength = content.length();
if (contentLength >= 0)
{
if (!headers.containsKey(HttpHeader.CONTENT_LENGTH.asString()))
headers.put(HttpHeader.CONTENT_LENGTH, String.valueOf(contentLength));
}
else
{
if (!headers.containsKey(HttpHeader.TRANSFER_ENCODING.asString()))
headers.put(HttpHeader.TRANSFER_ENCODING, "chunked");
}
}
// TODO: decoder headers
// If we are HTTP 1.1, add the Host header
if (version.getVersion() > 10)
{
if (!headers.containsKey(HttpHeader.HOST.asString()))
headers.put(HttpHeader.HOST, request.host() + ":" + request.port());
}
}
@Override
public void onFillable()
{
HttpConversation conversation = this.conversation;
if (conversation != null)
conversation.receive();
else
// TODO test sending white space... we want to consume it but throw if it's not whitespace
LOG.warn("Ready to read response, but no receiver");
}
}

View File

@ -0,0 +1,77 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
public class HttpConversation
{
private final HttpSender sender;
private final HttpReceiver receiver;
private HttpConnection connection;
private Request request;
private Response.Listener listener;
private HttpResponse response;
public HttpConversation()
{
sender = new HttpSender();
receiver = new HttpReceiver();
}
public void prepare(HttpConnection connection, Request request, Response.Listener listener)
{
if (this.connection != null)
throw new IllegalStateException();
this.connection = connection;
this.request = request;
this.listener = listener;
this.response = new HttpResponse(request, listener);
}
public void done()
{
reset();
}
private void reset()
{
connection = null;
request = null;
listener = null;
}
public HttpConnection connection()
{
return connection;
}
public Request request()
{
return request;
}
public Response.Listener listener()
{
return listener;
}
public HttpResponse response()
{
return response;
}
public void send()
{
sender.send(this);
}
public void idleTimeout()
{
receiver.idleTimeout();
}
public void receive()
{
receiver.receive(this);
}
}

View File

@ -0,0 +1,200 @@
//========================================================================
//Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.client;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
public class HttpDestination implements Destination
{
private final AtomicInteger connectionCount = new AtomicInteger();
private final HttpClient client;
private final String scheme;
private final String host;
private final int port;
private final Queue<Response> requests;
private final Queue<Connection> idleConnections;
private final Queue<Connection> activeConnections;
public HttpDestination(HttpClient client, String scheme, String host, int port)
{
this.client = client;
this.scheme = scheme;
this.host = host;
this.port = port;
this.requests = new ArrayBlockingQueue<>(client.getMaxQueueSizePerAddress());
this.idleConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
this.activeConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
}
@Override
public String scheme()
{
return scheme;
}
@Override
public String host()
{
return host;
}
@Override
public int port()
{
return port;
}
@Override
public void send(Request request, Response.Listener listener)
{
if (!scheme.equals(request.scheme()))
throw new IllegalArgumentException("Invalid request scheme " + request.scheme() + " for destination " + this);
if (!host.equals(request.host()))
throw new IllegalArgumentException("Invalid request host " + request.host() + " for destination " + this);
if (port != request.port())
throw new IllegalArgumentException("Invalid request port " + request.port() + " for destination " + this);
HttpResponse response = new HttpResponse(request, listener);
if (client.isRunning())
{
if (requests.offer(response))
{
if (!client.isRunning() && requests.remove(response))
{
throw new RejectedExecutionException(HttpClient.class.getSimpleName() + " is shutting down");
}
else
{
Request.Listener requestListener = request.listener();
notifyRequestQueued(requestListener, request);
ensureConnection();
}
}
else
{
throw new RejectedExecutionException("Max requests per address " + client.getMaxQueueSizePerAddress() + " exceeded");
}
}
}
private void notifyRequestQueued(Request.Listener listener, Request request)
{
try
{
if (listener != null)
listener.onQueued(request);
}
catch (Exception x)
{
// TODO: log or abort request send ?
}
}
private void ensureConnection()
{
int maxConnections = client.getMaxConnectionsPerAddress();
while (true)
{
int count = connectionCount.get();
if (count >= maxConnections)
break;
if (connectionCount.compareAndSet(count, count + 1))
{
newConnection(new Callback<Connection>()
{
@Override
public void completed(Connection connection)
{
dispatch(connection);
}
@Override
public void failed(Connection connection, Throwable x)
{
// TODO: what here ?
}
});
break;
}
}
}
public Future<Connection> newConnection()
{
FutureCallback<Connection> result = new FutureCallback<>();
newConnection(result);
return result;
}
private void newConnection(Callback<Connection> callback)
{
client.newConnection(this, callback);
}
/**
* Responsibility of this method is to dequeue a request, associate it to the given {@code connection}
* and dispatch a thread to execute the request.
*
* This can be done in several ways: one could be to
* @param connection
*/
protected void dispatch(final Connection connection)
{
final Response response = requests.poll();
if (response == null)
{
idleConnections.offer(connection);
}
else
{
activeConnections.offer(connection);
client.getExecutor().execute(new Runnable()
{
@Override
public void run()
{
connection.send(response.request(), response.listener());
}
});
}
}
// TODO: 1. We must do queuing of requests in any case, because we cannot do blocking connect
// TODO: 2. We must be non-blocking connect, therefore we need to queue
// Connections should compete for the queue of requests in separate threads
// that poses a problem of thread pool size: if < maxConnections we're starving
//
// conn1 is executed, takes on the queue => I need at least one thread per destination
// we need to queue the request, pick an idle connection, then execute { conn.send(request, listener) }
// if I create manually the connection, then I call send(request, listener)
// Other ways ?
}

View File

@ -0,0 +1,208 @@
package org.eclipse.jetty.client;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
{
private static final Logger LOG = Log.getLogger(HttpReceiver.class);
private final HttpParser parser = new HttpParser(this);
private HttpConversation conversation;
public void receive(HttpConversation conversation)
{
if (this.conversation != null)
throw new IllegalStateException();
this.conversation = conversation;
HttpConnection connection = conversation.connection();
HttpClient client = connection.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
EndPoint endPoint = connection.getEndPoint();
try
{
while (true)
{
int read = endPoint.fill(buffer);
if (read > 0)
{
parser.parseNext(buffer);
// TODO: response done, reset ?
}
else if (read == 0)
{
connection.fillInterested();
break;
}
else
{
parser.shutdownInput();
break;
}
}
}
catch (IOException x)
{
LOG.debug(x);
bufferPool.release(buffer);
fail(x);
}
}
@Override
public boolean startResponse(HttpVersion version, int status, String reason)
{
// Probe the protocol listeners
// HttpClient client = connection.getHttpClient();
// listener = client.find(status); // TODO
// listener = new RedirectionListener(connection);
// if (listener == null)
// listener = applicationListener;
HttpResponse response = conversation.response();
response.version(version).status(status).reason(reason);
notifyBegin(conversation.listener(), response);
return false;
}
@Override
public boolean parsedHeader(HttpHeader header, String name, String value)
{
conversation.response().headers().put(name, value);
return false;
}
@Override
public boolean headerComplete()
{
notifyHeaders(conversation.listener(), conversation.response());
return false;
}
@Override
public boolean content(ByteBuffer buffer)
{
notifyContent(conversation.listener(), conversation.response(), buffer);
return false;
}
@Override
public boolean messageComplete(long contentLength)
{
success();
return false;
}
protected void success()
{
Response.Listener listener = conversation.listener();
Response response = conversation.response();
conversation.done();
notifySuccess(listener, response);
}
protected void fail(Throwable failure)
{
Response.Listener listener = conversation.listener();
Response response = conversation.response();
conversation.done();
notifyFailure(listener, response, failure);
}
@Override
public boolean earlyEOF()
{
fail(new EOFException());
return false;
}
@Override
public void badMessage(int status, String reason)
{
conversation.response().status(status).reason(reason);
fail(new HttpResponseException());
}
private void notifyBegin(Response.Listener listener, HttpResponse response)
{
try
{
if (listener != null)
listener.onBegin(response);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyHeaders(Response.Listener listener, HttpResponse response)
{
try
{
if (listener != null)
listener.onHeaders(response);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyContent(Response.Listener listener, HttpResponse response, ByteBuffer buffer)
{
try
{
if (listener != null)
listener.onContent(response, buffer);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifySuccess(Response.Listener listener, Response response)
{
try
{
if (listener != null)
listener.onSuccess(response);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyFailure(Response.Listener listener, Response response, Throwable failure)
{
try
{
if (listener != null)
listener.onFailure(response, failure);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void idleTimeout()
{
fail(new TimeoutException());
}
}

View File

@ -0,0 +1,259 @@
//========================================================================
//Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.client;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.api.ContentDecoder;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.FutureCallback;
public class HttpRequest implements Request
{
private static final AtomicLong ids = new AtomicLong();
private final HttpClient client;
private final long id;
private String scheme;
private final String host;
private final int port;
private String path;
private HttpMethod method;
private HttpVersion version;
private String agent;
private long idleTimeout;
private Response response;
private Listener listener;
private ContentProvider content;
private final HttpFields headers = new HttpFields();
public HttpRequest(HttpClient client, URI uri)
{
this(client, ids.incrementAndGet(), uri);
}
protected HttpRequest(HttpClient client, long id, URI uri)
{
this.client = client;
this.id = id;
scheme(uri.getScheme());
host = uri.getHost();
port = uri.getPort();
path(uri.getPath());
// TODO: query params
}
@Override
public long id()
{
return id;
}
@Override
public String scheme()
{
return scheme;
}
@Override
public Request scheme(String scheme)
{
this.scheme = scheme;
return this;
}
@Override
public String host()
{
return host;
}
@Override
public int port()
{
return port;
}
@Override
public HttpMethod method()
{
return method;
}
@Override
public Request method(HttpMethod method)
{
this.method = method;
return this;
}
@Override
public String path()
{
return path;
}
@Override
public Request path(String path)
{
this.path = path;
return this;
}
@Override
public HttpVersion version()
{
return version;
}
@Override
public Request version(HttpVersion version)
{
this.version = version;
return this;
}
@Override
public Request param(String name, String value)
{
return this;
}
@Override
public Map<String, String> params()
{
return null;
}
@Override
public String agent()
{
return agent;
}
@Override
public Request agent(String userAgent)
{
this.agent = userAgent;
return this;
}
@Override
public Request header(String name, String value)
{
headers.add(name, value);
return this;
}
@Override
public HttpFields headers()
{
return headers;
}
@Override
public Listener listener()
{
return listener;
}
@Override
public Request listener(Request.Listener listener)
{
this.listener = listener;
return this;
}
@Override
public ContentProvider content()
{
return content;
}
@Override
public Request content(ContentProvider content)
{
this.content = content;
return this;
}
@Override
public Request decoder(ContentDecoder decoder)
{
return this;
}
@Override
public Request cookie(String key, String value)
{
return this;
}
@Override
public Request followRedirects(boolean follow)
{
return this;
}
@Override
public long idleTimeout()
{
return idleTimeout;
}
@Override
public Request idleTimeout(long timeout)
{
this.idleTimeout = timeout;
return this;
}
@Override
public Future<Response> send()
{
final FutureCallback<Response> result = new FutureCallback<>();
BufferingResponseListener listener = new BufferingResponseListener()
{
@Override
public void onSuccess(Response response)
{
super.onSuccess(response);
result.completed(response);
}
@Override
public void onFailure(Response response, Throwable failure)
{
super.onFailure(response, failure);
result.failed(response, failure);
}
};
send(listener);
return result;
}
@Override
public void send(final Response.Listener listener)
{
client.send(this, listener);
}
}

View File

@ -13,58 +13,82 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.io.InputStream;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Headers;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
public class StandardResponse extends FutureCallback<Response> implements Response public class HttpResponse extends FutureCallback<Response> implements Response
{ {
private final HttpFields headers = new HttpFields();
private final Request request; private final Request request;
private final Listener listener; private final Listener listener;
private HttpVersion version;
private int status;
private String reason;
public StandardResponse(Request request, Response.Listener listener) public HttpResponse(Request request, Response.Listener listener)
{ {
this.request = request; this.request = request;
this.listener = listener; this.listener = listener;
} }
@Override public HttpVersion version()
public int getStatus()
{ {
return 0; return version;
}
public HttpResponse version(HttpVersion version)
{
this.version = version;
return this;
} }
@Override @Override
public Headers getHeaders() public int status()
{ {
return null; return status;
}
public HttpResponse status(int status)
{
this.status = status;
return this;
}
public String reason()
{
return reason;
}
public HttpResponse reason(String reason)
{
this.reason = reason;
return this;
} }
@Override @Override
public Request getRequest() public HttpFields headers()
{
return headers;
}
@Override
public Request request()
{ {
return request; return request;
} }
@Override @Override
public ContentProvider content() public Listener listener()
{ {
return null; return listener;
}
@Override
public InputStream contentAsStream()
{
return null;
} }
@Override @Override
public void abort() public void abort()
{ {
request.abort(); // request.abort();
} }
} }

View File

@ -0,0 +1,28 @@
package org.eclipse.jetty.client;
public class HttpResponseException extends RuntimeException
{
public HttpResponseException()
{
}
public HttpResponseException(String message)
{
super(message);
}
public HttpResponseException(String message, Throwable cause)
{
super(message, cause);
}
public HttpResponseException(Throwable cause)
{
super(cause);
}
public HttpResponseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace)
{
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,319 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
class HttpSender
{
private static final Logger LOG = Log.getLogger(HttpSender.class);
private final HttpGenerator generator = new HttpGenerator();
private HttpConversation conversation;
private long contentLength;
private Iterator<ByteBuffer> contentChunks;
private ByteBuffer header;
private ByteBuffer chunk;
private boolean requestHeadersComplete;
public void send(HttpConversation conversation)
{
this.conversation = conversation;
ContentProvider content = conversation.request().content();
this.contentLength = content == null ? -1 : content.length();
this.contentChunks = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
send();
}
private void send()
{
try
{
HttpConnection connection = conversation.connection();
EndPoint endPoint = connection.getEndPoint();
HttpClient client = connection.getHttpClient();
ByteBufferPool byteBufferPool = client.getByteBufferPool();
HttpGenerator.RequestInfo info = null;
ByteBuffer content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
boolean lastContent = !contentChunks.hasNext();
while (true)
{
HttpGenerator.Result result = generator.generateRequest(info, header, chunk, content, lastContent);
switch (result)
{
case NEED_INFO:
{
Request request = conversation.request();
info = new HttpGenerator.RequestInfo(request.version(), request.headers(), contentLength, request.method().asString(), request.path());
break;
}
case NEED_HEADER:
{
header = byteBufferPool.acquire(client.getRequestBufferSize(), false);
break;
}
case NEED_CHUNK:
{
chunk = byteBufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
break;
}
case FLUSH:
{
StatefulExecutorCallback callback = new StatefulExecutorCallback(client.getExecutor())
{
@Override
protected void pendingCompleted()
{
notifyRequestHeadersComplete();
send();
}
@Override
protected void failed(Throwable x)
{
fail(x);
}
};
if (header == null)
header = BufferUtil.EMPTY_BUFFER;
if (chunk == null)
chunk = BufferUtil.EMPTY_BUFFER;
if (content == null)
content = BufferUtil.EMPTY_BUFFER;
endPoint.write(null, callback, header, chunk, content);
if (callback.pending())
return;
if (callback.completed())
{
if (!requestHeadersComplete)
{
requestHeadersComplete = true;
notifyRequestHeadersComplete();
}
releaseBuffers();
content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
lastContent = !contentChunks.hasNext();
}
break;
}
case SHUTDOWN_OUT:
{
endPoint.shutdownOutput();
break;
}
case CONTINUE:
{
break;
}
case DONE:
{
if (generator.isEnd())
success();
return;
}
default:
{
throw new IllegalStateException("Unknown result " + result);
}
}
}
}
catch (IOException x)
{
LOG.debug(x);
fail(x);
}
finally
{
releaseBuffers();
}
}
protected void success()
{
notifyRequestSuccess();
}
protected void fail(Throwable x)
{
BufferUtil.clear(header);
BufferUtil.clear(chunk);
releaseBuffers();
notifyRequestFailure(x);
notifyResponseFailure(x);
conversation.connection().getEndPoint().shutdownOutput();
generator.abort();
}
private void releaseBuffers()
{
ByteBufferPool byteBufferPool = conversation.connection().getHttpClient().getByteBufferPool();
if (!BufferUtil.hasContent(header))
{
byteBufferPool.release(header);
header = null;
}
if (!BufferUtil.hasContent(chunk))
{
byteBufferPool.release(chunk);
chunk = null;
}
}
private void notifyRequestHeadersComplete()
{
Request request = conversation.request();
Request.Listener listener = request.listener();
try
{
if (listener != null)
listener.onHeaders(request);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyRequestSuccess()
{
Request request = conversation.request();
Request.Listener listener = request.listener();
try
{
if (listener != null)
listener.onSuccess(request);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyRequestFailure(Throwable x)
{
Request request = conversation.request();
Request.Listener listener = request.listener();
try
{
if (listener != null)
listener.onFailure(request, x);
}
catch (Exception xx)
{
LOG.info("Exception while notifying listener " + listener, xx);
}
}
private void notifyResponseFailure(Throwable x)
{
Response.Listener listener = conversation.listener();
try
{
if (listener != null)
listener.onFailure(null, x);
}
catch (Exception xx)
{
LOG.info("Exception while notifying listener " + listener, xx);
}
}
private static abstract class StatefulExecutorCallback implements Callback<Void>, Runnable
{
private final AtomicReference<State> state = new AtomicReference<>(State.INCOMPLETE);
private final Executor executor;
private StatefulExecutorCallback(Executor executor)
{
this.executor = executor;
}
@Override
public final void completed(final Void context)
{
State previous = state.get();
while (true)
{
if (state.compareAndSet(previous, State.COMPLETE))
break;
previous = state.get();
}
if (previous == State.PENDING)
executor.execute(this);
}
@Override
public final void run()
{
pendingCompleted();
}
protected abstract void pendingCompleted();
@Override
public final void failed(Void context, final Throwable x)
{
State previous = state.get();
while (true)
{
if (state.compareAndSet(previous, State.FAILED))
break;
previous = state.get();
}
if (previous == State.PENDING)
{
executor.execute(new Runnable()
{
@Override
public void run()
{
failed(x);
}
});
}
else
{
failed(x);
}
}
protected abstract void failed(Throwable x);
public boolean pending()
{
return state.compareAndSet(State.INCOMPLETE, State.PENDING);
}
public boolean completed()
{
return state.get() == State.COMPLETE;
}
public boolean failed()
{
return state.get() == State.FAILED;
}
private enum State
{
INCOMPLETE, PENDING, COMPLETE, FAILED
}
}
}

View File

@ -0,0 +1,91 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.eclipse.jetty.client.api.ContentProvider;
public class PathContentProvider implements ContentProvider
{
private final Path filePath;
private final long fileSize;
private final int bufferSize;
public PathContentProvider(Path filePath) throws IOException
{
this(filePath, 4096);
}
public PathContentProvider(Path filePath, int bufferSize) throws IOException
{
if (!Files.isRegularFile(filePath))
throw new NoSuchFileException(filePath.toString());
if (!Files.isReadable(filePath))
throw new AccessDeniedException(filePath.toString());
this.filePath = filePath;
this.fileSize = Files.size(filePath);
this.bufferSize = bufferSize;
}
@Override
public long length()
{
return fileSize;
}
@Override
public Iterator<ByteBuffer> iterator()
{
return new LazyIterator();
}
private class LazyIterator implements Iterator<ByteBuffer>
{
private final ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
private SeekableByteChannel channel;
private long position;
@Override
public boolean hasNext()
{
return position < length();
}
@Override
public ByteBuffer next()
{
try
{
if (channel == null)
channel = Files.newByteChannel(filePath, StandardOpenOption.READ);
buffer.clear();
int read = channel.read(buffer);
if (read < 0)
throw new NoSuchElementException();
position += read;
buffer.flip();
return buffer;
}
catch (IOException x)
{
throw (NoSuchElementException)new NoSuchElementException().initCause(x);
}
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
}
}

View File

@ -0,0 +1,42 @@
package org.eclipse.jetty.client;
import java.net.URI;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
public class RedirectionListener extends Response.Listener.Adapter
{
private final HttpConnection connection;
public RedirectionListener(HttpConnection connection)
{
this.connection = connection;
}
@Override
public void onSuccess(Response response)
{
switch (response.status())
{
case 301: // GET or HEAD only allowed, keep the method
{
break;
}
case 302:
case 303: // use GET for next request
{
String location = response.headers().get("location");
HttpClient httpClient = connection.getHttpClient();
Request redirect = httpClient.newRequest(response.request().id(), URI.create(location));
redirect.send(this);
}
}
}
@Override
public void onFailure(Response response, Throwable failure)
{
// TODO
}
}

View File

@ -1,98 +0,0 @@
//========================================================================
//Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.client;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Address;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
public class StandardDestination implements Destination
{
private final HTTPClient client;
private final Address address;
private final Queue<Response> requests;
private final Queue<Connection> connections;
public StandardDestination(HTTPClient client, Address address)
{
this.client = client;
this.address = address;
this.requests = new ArrayBlockingQueue<>(client.getMaxQueueSizePerAddress());
this.connections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
}
@Override
public Address address()
{
return address;
}
@Override
public Connection connect(long timeout, TimeUnit unit)
{
return null;
}
@Override
public Future<Response> send(Request request, Response.Listener listener)
{
if (!address.equals(request.address()))
throw new IllegalArgumentException("Invalid request address " + request.address() + " for destination " + this);
StandardResponse response = new StandardResponse(request, listener);
Connection connection = connections.poll();
if (connection == null)
newConnection();
if (!requests.offer(response))
throw new RejectedExecutionException("Max requests per address " + client.getMaxQueueSizePerAddress() + " exceeded");
return response;
}
protected Future<Connection> newConnection()
{
return client.newConnection(this);
}
protected Connection getConnection()
{
Connection connection = connections.poll();
if (connection == null)
{
client.
}
return connection;
}
// TODO: 1. We must do queuing of requests in any case, because we cannot do blocking connect
// TODO: 2. We must be non-blocking connect, therefore we need to queue
// Connections should compete for the queue of requests in separate threads
// that poses a problem of thread pool size: if < maxConnections we're starving
//
/**
* I need a Future<Connection> connect(), and a void connect(Callback<Connection>)
*/
}

View File

@ -1,126 +0,0 @@
//========================================================================
//Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.client;
import java.io.File;
import java.net.URI;
import java.util.concurrent.Future;
import org.eclipse.jetty.client.api.Address;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.ContentDecoder;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
public class StandardRequest implements Request, Request.Builder
{
private final HTTPClient client;
private final URI uri;
private String method;
private String agent;
private Response response;
public StandardRequest(HTTPClient client, URI uri)
{
this.client = client;
this.uri = uri;
}
@Override
public Request.Builder method(String method)
{
this.method = method;
return this;
}
@Override
public Request.Builder agent(String userAgent)
{
this.agent = userAgent;
return this;
}
@Override
public Request.Builder header(String name, String value)
{
return this;
}
@Override
public Request.Builder listener(Request.Listener listener)
{
return this;
}
@Override
public Request.Builder file(File file)
{
return this;
}
@Override
public Request.Builder content(ContentProvider buffer)
{
return this;
}
@Override
public Request.Builder decoder(ContentDecoder decoder)
{
return this;
}
@Override
public Request.Builder param(String name, String value)
{
return this;
}
@Override
public Request.Builder cookie(String key, String value)
{
return this;
}
@Override
public Request.Builder authentication(Authentication authentication)
{
return this;
}
@Override
public Request.Builder followRedirects(boolean follow)
{
return this;
}
@Override
public Request build()
{
return this;
}
@Override
public Future<Response> send()
{
return send(null);
}
@Override
public Future<Response> send(final Response.Listener listener)
{
return client.send(this, listener);
}
}

View File

@ -19,7 +19,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
public class StreamResponseListener extends Response.Listener.Adapter public class StreamingResponseListener extends Response.Listener.Adapter
{ {
public Response get(long timeout, TimeUnit seconds) public Response get(long timeout, TimeUnit seconds)
{ {

View File

@ -13,9 +13,7 @@
package org.eclipse.jetty.client.api; package org.eclipse.jetty.client.api;
import java.util.concurrent.Future;
public interface Connection extends AutoCloseable public interface Connection extends AutoCloseable
{ {
Future<Response> send(Request request, Response.Listener listener); void send(Request request, Response.Listener listener);
} }

View File

@ -13,6 +13,9 @@
package org.eclipse.jetty.client.api; package org.eclipse.jetty.client.api;
public interface ContentProvider import java.nio.ByteBuffer;
public interface ContentProvider extends Iterable<ByteBuffer>
{ {
long length();
} }

View File

@ -14,13 +14,16 @@
package org.eclipse.jetty.client.api; package org.eclipse.jetty.client.api;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public interface Destination public interface Destination
{ {
Connection connect(long timeout, TimeUnit unit); String scheme();
Future<Response> send(Request request, Response.Listener listener); String host();
Address address(); int port();
Future<Connection> newConnection();
void send(Request request, Response.Listener listener);
} }

View File

@ -1,30 +0,0 @@
//========================================================================
//Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.client.api;
public class Headers
{
public Header get(String name)
{
return null;
}
public static class Header
{
public int valueAsInt()
{
return 0;
}
}
}

View File

@ -13,53 +13,74 @@
package org.eclipse.jetty.client.api; package org.eclipse.jetty.client.api;
import java.io.File; import java.util.Map;
import java.net.URI;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
public interface Request public interface Request
{ {
long id();
String scheme();
Request scheme(String scheme);
String host();
int port();
HttpMethod method();
Request method(HttpMethod method);
String path();
Request path(String path);
HttpVersion version();
Request version(HttpVersion version);
Map<String, String> params();
Request param(String name, String value);
HttpFields headers();
Request header(String name, String value);
ContentProvider content();
Request content(ContentProvider buffer);
Request decoder(ContentDecoder decoder);
Request cookie(String key, String value);
String agent();
Request agent(String userAgent);
long idleTimeout();
Request idleTimeout(long timeout);
Request followRedirects(boolean follow);
Listener listener();
Request listener(Listener listener);
Future<Response> send(); Future<Response> send();
Future<Response> send(Response.Listener listener); void send(Response.Listener listener);
URI uri();
void abort();
/**
* <p>A builder for requests</p>.
*/
public interface Builder
{
Builder method(String method);
Builder header(String name, String value);
Builder listener(Request.Listener listener);
Builder file(File file);
Builder content(ContentProvider buffer);
Builder decoder(ContentDecoder decoder);
Builder param(String name, String value);
Builder cookie(String key, String value);
Builder authentication(Authentication authentication);
Builder agent(String userAgent);
Builder followRedirects(boolean follow);
Request build();
}
public interface Listener public interface Listener
{ {
public void onQueue(Request request); public void onQueued(Request request);
public void onBegin(Request request); public void onBegin(Request request);
@ -67,16 +88,14 @@ public interface Request
public void onFlush(Request request, int bytes); public void onFlush(Request request, int bytes);
public void onComplete(Request request); public void onSuccess(Request request);
public void onException(Request request, Exception exception); public void onFailure(Request request, Throwable failure);
public void onEnd(Request request);
public static class Adapter implements Listener public static class Adapter implements Listener
{ {
@Override @Override
public void onQueue(Request request) public void onQueued(Request request)
{ {
} }
@ -96,17 +115,12 @@ public interface Request
} }
@Override @Override
public void onComplete(Request request) public void onSuccess(Request request)
{ {
} }
@Override @Override
public void onException(Request request, Exception exception) public void onFailure(Request request, Throwable failure)
{
}
@Override
public void onEnd(Request request)
{ {
} }
} }

View File

@ -13,85 +13,63 @@
package org.eclipse.jetty.client.api; package org.eclipse.jetty.client.api;
import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
public interface Response public interface Response
{ {
int getStatus(); Request request();
Headers getHeaders(); Listener listener();
Request getRequest(); HttpVersion version();
ContentProvider content(); int status();
InputStream contentAsStream(); String reason();
HttpFields headers();
void abort(); void abort();
public interface Listener public interface Listener
{ {
public boolean onBegin(Response response, String version, int code, String message); public void onBegin(Response response);
public boolean onHeader(Response response, String name, String value); public void onHeaders(Response response);
public boolean onHeaders(Response response); public void onContent(Response response, ByteBuffer content);
public boolean onContent(Response response, ByteBuffer content); public void onSuccess(Response response);
public boolean onTrailer(Response response, String name, String value); public void onFailure(Response response, Throwable failure);
public void onComplete(Response response);
public void onException(Response response, Exception exception);
public void onEnd(Response response);
public static class Adapter implements Listener public static class Adapter implements Listener
{ {
@Override @Override
public boolean onBegin(Response response, String version, int code, String message) public void onBegin(Response response)
{
return false;
}
@Override
public boolean onHeader(Response response, String name, String value)
{
return false;
}
@Override
public boolean onHeaders(Response response)
{
return false;
}
@Override
public boolean onContent(Response response, ByteBuffer content)
{
return false;
}
@Override
public boolean onTrailer(Response response, String name, String value)
{
return false;
}
@Override
public void onComplete(Response response)
{ {
} }
@Override @Override
public void onException(Response response, Exception exception) public void onHeaders(Response response)
{ {
} }
@Override @Override
public void onEnd(Response response) public void onContent(Response response, ByteBuffer content)
{
}
@Override
public void onSuccess(Response response)
{
}
@Override
public void onFailure(Response response, Throwable failure)
{ {
} }
} }

View File

@ -0,0 +1,35 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.server.Server;
import org.junit.After;
public class AbstractHttpClientTest
{
protected Server server;
protected HttpClient client;
protected NetworkConnector connector;
public void start(Handler handler) throws Exception
{
server = new Server();
connector = new SelectChannelConnector(server);
server.addConnector(connector);
server.setHandler(handler);
server.start();
client = new HttpClient();
client.start();
}
@After
public void destroy() throws Exception
{
if (client != null)
client.stop();
if (server != null)
server.stop();
}
}

View File

@ -21,43 +21,12 @@ import javax.servlet.http.HttpServletResponse;
import junit.framework.Assert; import junit.framework.Assert;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
public class HTTPClientTest public class HttpClientTest extends AbstractHttpClientTest
{ {
private Server server;
private HTTPClient client;
private Connector.NetConnector connector;
public void start(Handler handler) throws Exception
{
server = new Server();
connector = new SelectChannelConnector();
server.addConnector(connector);
server.setHandler(handler);
server.start();
client = new HTTPClient();
client.start();
}
@After
public void destroy() throws Exception
{
client.stop();
client.join(5, TimeUnit.SECONDS);
server.stop();
server.join();
}
@Test @Test
public void testGETNoResponseContent() throws Exception public void testGETNoResponseContent() throws Exception
{ {
@ -73,6 +42,6 @@ public class HTTPClientTest
Response response = client.GET("http://localhost:" + connector.getLocalPort()).get(5, TimeUnit.SECONDS); Response response = client.GET("http://localhost:" + connector.getLocalPort()).get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response); Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(200, response.status());
} }
} }

View File

@ -0,0 +1,160 @@
package org.eclipse.jetty.client;
public class HttpReceiverTest
{
// private HttpClient client;
//
// @Before
// public void init() throws Exception
// {
// client = new HttpClient();
// client.start();
// }
//
// @After
// public void destroy() throws Exception
// {
// client.stop();
// }
//
// @Test
// public void test_Receive_NoResponseContent() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpConnection connection = new HttpConnection(client, endPoint);
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: 0\r\n" +
// "\r\n");
// final AtomicReference<Response> responseRef = new AtomicReference<>();
// final CountDownLatch latch = new CountDownLatch(1);
// HttpReceiver receiver = new HttpReceiver(connection, null, new Response.Listener.Adapter()
// {
// @Override
// public void onSuccess(Response response)
// {
// responseRef.set(response);
// latch.countDown();
// }
// });
// receiver.receive(connection);
//
// Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Response response = responseRef.get();
// Assert.assertNotNull(response);
// Assert.assertEquals(200, response.status());
// Assert.assertEquals("OK", response.reason());
// Assert.assertSame(HttpVersion.HTTP_1_1, response.version());
// HttpFields headers = response.headers();
// Assert.assertNotNull(headers);
// Assert.assertEquals(1, headers.size());
// Assert.assertEquals("0", headers.get(HttpHeader.CONTENT_LENGTH));
// }
//
// @Test
// public void test_Receive_ResponseContent() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpConnection connection = new HttpConnection(client, endPoint);
// String content = "0123456789ABCDEF";
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: " + content.length() + "\r\n" +
// "\r\n" +
// content);
// BufferingResponseListener listener = new BufferingResponseListener();
// HttpReceiver receiver = new HttpReceiver(connection, null, listener);
// receiver.receive(connection);
//
// Response response = listener.await(5, TimeUnit.SECONDS);
// Assert.assertNotNull(response);
// Assert.assertEquals(200, response.status());
// Assert.assertEquals("OK", response.reason());
// Assert.assertSame(HttpVersion.HTTP_1_1, response.version());
// HttpFields headers = response.headers();
// Assert.assertNotNull(headers);
// Assert.assertEquals(1, headers.size());
// Assert.assertEquals(String.valueOf(content.length()), headers.get(HttpHeader.CONTENT_LENGTH));
// String received = listener.contentAsString("UTF-8");
// Assert.assertEquals(content, received);
// }
//
// @Test
// public void test_Receive_ResponseContent_EarlyEOF() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpConnection connection = new HttpConnection(client, endPoint);
// String content1 = "0123456789";
// String content2 = "ABCDEF";
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: " + (content1.length() + content2.length()) + "\r\n" +
// "\r\n" +
// content1);
// BufferingResponseListener listener = new BufferingResponseListener();
// HttpReceiver receiver = new HttpReceiver(connection, null, listener);
// receiver.receive(connection);
// endPoint.setInputEOF();
// receiver.receive(connection);
//
// try
// {
// listener.await(5, TimeUnit.SECONDS);
// Assert.fail();
// }
// catch (ExecutionException e)
// {
// Assert.assertTrue(e.getCause() instanceof EOFException);
// }
// }
//
// @Test
// public void test_Receive_ResponseContent_IdleTimeout() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpConnection connection = new HttpConnection(client, endPoint);
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: 1\r\n" +
// "\r\n");
// BufferingResponseListener listener = new BufferingResponseListener();
// HttpReceiver receiver = new HttpReceiver(connection, null, listener);
// receiver.receive(connection);
// // Simulate an idle timeout
// receiver.idleTimeout();
//
// try
// {
// listener.await(5, TimeUnit.SECONDS);
// Assert.fail();
// }
// catch (ExecutionException e)
// {
// Assert.assertTrue(e.getCause() instanceof TimeoutException);
// }
// }
//
// @Test
// public void test_Receive_BadResponse() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpConnection connection = new HttpConnection(client, endPoint);
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: A\r\n" +
// "\r\n");
// BufferingResponseListener listener = new BufferingResponseListener();
// HttpReceiver receiver = new HttpReceiver(connection, null, listener);
// receiver.receive(connection);
//
// try
// {
// listener.await(5, TimeUnit.SECONDS);
// Assert.fail();
// }
// catch (ExecutionException e)
// {
// Assert.assertTrue(e.getCause() instanceof HttpResponseException);
// }
// }
}

View File

@ -0,0 +1,266 @@
package org.eclipse.jetty.client;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class HttpSenderTest
{
private HttpClient client;
@Before
public void init() throws Exception
{
client = new HttpClient();
client.start();
}
@After
public void destroy() throws Exception
{
client.stop();
}
@Test
public void test_Send_NoRequestContent() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
httpRequest.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
});
connection.send(httpRequest, null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n"));
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_NoRequestContent_IncompleteFlush() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
final CountDownLatch headersLatch = new CountDownLatch(1);
httpRequest.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
});
connection.send(httpRequest, null);
// This take will free space in the buffer and allow for the write to complete
StringBuilder request = new StringBuilder(endPoint.takeOutputString());
// Wait for the write to complete
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
request.append(endPoint.takeOutputString());
String requestString = request.toString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n"));
}
@Test
public void test_Send_NoRequestContent_Exception() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// Shutdown output to trigger the exception on write
endPoint.shutdownOutput();
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
httpRequest.listener(new Request.Listener.Adapter()
{
@Override
public void onFailure(Request request, Throwable x)
{
failureLatch.countDown();
}
});
connection.send(httpRequest, new Response.Listener.Adapter()
{
@Override
public void onFailure(Response response, Throwable failure)
{
failureLatch.countDown();
}
});
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_NoRequestContent_IncompleteFlush_Exception() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
httpRequest.listener(new Request.Listener.Adapter()
{
@Override
public void onFailure(Request request, Throwable x)
{
failureLatch.countDown();
}
});
connection.send(httpRequest, new Response.Listener.Adapter()
{
@Override
public void onFailure(Response response, Throwable failure)
{
failureLatch.countDown();
}
});
// Shutdown output to trigger the exception on write
endPoint.shutdownOutput();
// This take will free space in the buffer and allow for the write to complete
// although it will fail because we shut down the output
endPoint.takeOutputString();
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_SmallRequestContent_InOneBuffer() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
String content = "abcdef";
httpRequest.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes("UTF-8"))));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
httpRequest.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
});
connection.send(httpRequest, null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content));
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_SmallRequestContent_InTwoBuffers() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
String content1 = "0123456789";
String content2 = "abcdef";
httpRequest.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8"))));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
httpRequest.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
});
connection.send(httpRequest, null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content1 + content2));
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_SmallRequestContent_Chunked_InTwoChunks() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpConnection connection = new HttpConnection(client, endPoint);
Request httpRequest = new HttpRequest(client, URI.create("http://localhost/"));
String content1 = "0123456789";
String content2 = "ABCDEF";
httpRequest.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8")))
{
@Override
public long length()
{
return -1;
}
});
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
httpRequest.listener(new Request.Listener.Adapter()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
});
connection.send(httpRequest, null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
String content = Integer.toHexString(content1.length()).toUpperCase() + "\r\n" + content1 + "\r\n";
content += Integer.toHexString(content2.length()).toUpperCase() + "\r\n" + content2 + "\r\n";
content += "0\r\n\r\n";
Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content));
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -0,0 +1,58 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class RedirectionTest extends AbstractHttpClientTest
{
@Before
public void init() throws Exception
{
start(new RedirectHandler());
}
@Test
public void test303() throws Exception
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/303/done")
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
}
private class RedirectHandler extends AbstractHandler
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
String[] paths = target.split("/", 3);
int status = Integer.parseInt(paths[1]);
response.setStatus(status);
response.setHeader("Location", request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + "/" + paths[2]);
}
catch (NumberFormatException x)
{
response.setStatus(200);
}
finally
{
baseRequest.setHandled(true);
}
}
}
}

View File

@ -15,8 +15,10 @@ package org.eclipse.jetty.client.api;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.impl.client.DefaultHttpClient;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@Ignore
public class ApacheUsage public class ApacheUsage
{ {
@Test @Test

View File

@ -22,8 +22,10 @@ import com.ning.http.client.Cookie;
import com.ning.http.client.Realm; import com.ning.http.client.Realm;
import com.ning.http.client.Request; import com.ning.http.client.Request;
import com.ning.http.client.Response; import com.ning.http.client.Response;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@Ignore
public class NingUsage public class NingUsage
{ {
@Test @Test

View File

@ -13,14 +13,21 @@
package org.eclipse.jetty.client.api; package org.eclipse.jetty.client.api;
import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.InputStream; import java.io.InputStream;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.HTTPClient; import org.eclipse.jetty.client.BufferingResponseListener;
import org.eclipse.jetty.client.StreamResponseListener; import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.PathContentProvider;
import org.eclipse.jetty.client.StreamingResponseListener;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.junit.Assert;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -28,122 +35,134 @@ import org.junit.Test;
public class Usage public class Usage
{ {
@Test @Test
public void testSimpleBlockingGET() throws Exception public void testGETBlocking_ShortAPI() throws Exception
{ {
HTTPClient client = new HTTPClient(); HttpClient client = new HttpClient();
Future<Response> responseFuture = client.GET("http://localhost:8080/foo"); Future<Response> responseFuture = client.GET("http://localhost:8080/foo");
Response response = responseFuture.get(); Response response = responseFuture.get();
response.getStatus(); // 200 Assert.assertEquals(200, response.status());
// Headers abstraction needed for: // Headers abstraction needed for:
// 1. case insensitivity // 1. case insensitivity
// 2. multi values // 2. multi values
// 3. value conversion // 3. value conversion
// Reuse SPDY's ? // Reuse SPDY's ?
response.getHeaders().get("Content-Length").valueAsInt(); response.headers().get("Content-Length");
} }
@Test @Test
public void testBlockingGET() throws Exception public void testGETBlocking() throws Exception
{ {
HTTPClient client = new HTTPClient(); HttpClient client = new HttpClient();
// Address must be provided, it's the only thing non defaultable // Address must be provided, it's the only thing non defaultable
Request.Builder builder = client.builder("localhost:8080"); Request request = client.newRequest("localhost", 8080)
Future<Response> responseFuture = builder.method("GET").path("/").header("Origin", "localhost").build().send(); .scheme("https")
responseFuture.get(); .method(HttpMethod.GET)
.path("/uri")
.version(HttpVersion.HTTP_1_1)
.param("a", "b")
.header("X-Header", "Y-value")
.agent("Jetty HTTP Client")
.cookie("cookie1", "value1")
.decoder(null)
.content(null)
.idleTimeout(5000L);
Future<Response> responseFuture = request.send();
Response response = responseFuture.get();
Assert.assertEquals(200, response.status());
} }
@Test @Test
public void testSimpleAsyncGET() throws Exception public void testGETAsync() throws Exception
{ {
HTTPClient client = new HTTPClient(); HttpClient client = new HttpClient();
client.builder("localhost:8080").method("GET").path("/").header("Origin", "localhost").build().send(new Response.Listener.Adapter() final AtomicReference<Response> responseRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", 8080).send(new Response.Listener.Adapter()
{ {
@Override @Override
public void onEnd(Response response) public void onSuccess(Response response)
{ {
responseRef.set(response);
latch.countDown();
} }
}); });
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Response response = responseRef.get();
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
} }
@Test @Test
public void testRequestListener() throws Exception public void testRequestListener() throws Exception
{ {
HTTPClient client = new HTTPClient(); HttpClient client = new HttpClient();
Response response = client.builder("localhost:8080") Response response = client.newRequest("localhost", 8080)
.method("GET")
.path("/")
.listener(new Request.Listener.Adapter() .listener(new Request.Listener.Adapter()
{ {
@Override @Override
public void onEnd(Request request) public void onSuccess(Request request)
{ {
} }
}) }).send().get(5, TimeUnit.SECONDS);
.build().send(new Response.Listener.Adapter() Assert.assertEquals(200, response.status());
{
@Override
public void onEnd(Response response)
{
}
}).get();
response.getStatus();
} }
@Test @Test
public void testRequestWithExplicitConnectionControl() throws Exception public void testRequestWithExplicitConnectionControl() throws Exception
{ {
HTTPClient client = new HTTPClient(); HttpClient client = new HttpClient();
try (Connection connection = client.getDestination(Address.from("localhost:8080")).connect(5, TimeUnit.SECONDS)) try (Connection connection = client.getDestination("http", "localhost", 8080).newConnection().get(5, TimeUnit.SECONDS))
{ {
Request.Builder builder = client.builder("localhost:8080"); Request request = client.newRequest("localhost", 8080);
Request request = builder.method("GET").path("/").header("Origin", "localhost").build(); BufferingResponseListener listener = new BufferingResponseListener();
connection.send(request, listener);
Future<Response> response = connection.send(request, new Response.Listener.Adapter()); Response response = listener.await(5, TimeUnit.SECONDS);
response.get().getStatus(); Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
} }
} }
@Test @Test
public void testFileUpload() throws Exception public void testFileUpload() throws Exception
{ {
HTTPClient client = new HTTPClient(); HttpClient client = new HttpClient();
Response response = client.builder("localhost:8080") Response response = client.newRequest("localhost", 8080)
.method("GET").path("/").file(new File("")).build().send().get(); .content(new PathContentProvider(Paths.get(""))).send().get();
response.getStatus(); Assert.assertEquals(200, response.status());
} }
@Test @Test
public void testCookie() throws Exception public void testCookie() throws Exception
{ {
HTTPClient client = new HTTPClient(); HttpClient client = new HttpClient();
client.builder("localhost:8080").cookie("key", "value").build().send().get().getStatus(); // 200 Response response = client.newRequest("localhost", 8080).cookie("key", "value").send().get();
Assert.assertEquals(200, response.status());
} }
@Test // @Test
public void testAuthentication() throws Exception // public void testAuthentication() throws Exception
{ // {
HTTPClient client = new HTTPClient(); // HTTPClient client = new HTTPClient();
client.builder("localhost:8080").authentication(new Authentication.Kerberos()).build().send().get().getStatus(); // 200 // client.newRequest("localhost", 8080).authentication(new Authentication.Kerberos()).build().send().get().status(); // 200
} // }
@Test @Test
public void testFollowRedirects() throws Exception public void testFollowRedirects() throws Exception
{ {
HTTPClient client = new HTTPClient(); HttpClient client = new HttpClient();
client.setFollowRedirects(false); client.setFollowRedirects(false);
client.builder("localhost:8080").followRedirects(true).build().send().get().getStatus(); // 200 client.newRequest("localhost", 8080).followRedirects(true).send().get().status(); // 200
} }
@Test @Test
public void testResponseStream() throws Exception public void testResponseStream() throws Exception
{ {
HTTPClient client = new HTTPClient(); HttpClient client = new HttpClient();
StreamResponseListener listener = new StreamResponseListener(); StreamingResponseListener listener = new StreamingResponseListener();
client.builder("localhost:8080").build().send(listener); client.newRequest("localhost", 8080).send(listener);
// Call to get() blocks until the headers arrived // Call to get() blocks until the headers arrived
Response response = listener.get(5, TimeUnit.SECONDS); Response response = listener.get(5, TimeUnit.SECONDS);
if (response.getStatus() == 200) if (response.status() == 200)
{ {
// Solution 1: use input stream // Solution 1: use input stream
byte[] buffer = new byte[256]; byte[] buffer = new byte[256];

View File

@ -0,0 +1,2 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=DEBUG

View File

@ -921,7 +921,8 @@ public class HttpFields implements Iterable<HttpFields.Field>
/* -------------------------------------------------------------- */ /* -------------------------------------------------------------- */
@Override @Override
public String toString() public String
toString()
{ {
try try
{ {

View File

@ -1141,7 +1141,7 @@ public class HttpParser
} }
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
public void inputShutdown() public void shutdownInput()
{ {
// was this unexpected? // was this unexpected?
switch(_state) switch(_state)

View File

@ -324,7 +324,7 @@ public class HttpParserTest
parser.reset(); parser.reset();
init(); init();
parser.parseNext(buffer); parser.parseNext(buffer);
parser.inputShutdown(); parser.shutdownInput();
assertEquals("PUT", _methodOrVersion); assertEquals("PUT", _methodOrVersion);
assertEquals("/doodle", _uriOrStatus); assertEquals("/doodle", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason); assertEquals("HTTP/1.0", _versionOrReason);
@ -400,7 +400,7 @@ public class HttpParserTest
init(); init();
parser.parseNext(buffer); parser.parseNext(buffer);
parser.inputShutdown(); parser.shutdownInput();
assertEquals("HTTP/1.1", _methodOrVersion); assertEquals("HTTP/1.1", _methodOrVersion);
assertEquals("200", _uriOrStatus); assertEquals("200", _uriOrStatus);
assertEquals("Correct", _versionOrReason); assertEquals("Correct", _versionOrReason);

View File

@ -177,7 +177,16 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/ */
public String getOutputString() public String getOutputString()
{ {
return BufferUtil.toString(_out,StringUtil.__UTF8_CHARSET); return getOutputString(StringUtil.__UTF8_CHARSET);
}
/* ------------------------------------------------------------ */
/**
* @return Returns the out.
*/
public String getOutputString(Charset charset)
{
return BufferUtil.toString(_out,charset);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -198,17 +207,17 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/ */
public String takeOutputString() public String takeOutputString()
{ {
ByteBuffer buffer=takeOutput(); return takeOutputString(StringUtil.__UTF8_CHARSET);
return BufferUtil.toString(buffer,StringUtil.__UTF8_CHARSET);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @return Returns the out. * @return Returns the out.
*/ */
public String getOutputString(Charset charset) public String takeOutputString(Charset charset)
{ {
return BufferUtil.toString(_out,charset); ByteBuffer buffer=takeOutput();
return BufferUtil.toString(buffer,charset);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */

View File

@ -47,7 +47,7 @@ public class ChannelEndPoint extends AbstractEndPoint
private volatile boolean _ishut; private volatile boolean _ishut;
private volatile boolean _oshut; private volatile boolean _oshut;
public ChannelEndPoint(ScheduledExecutorService scheduler,SocketChannel channel) throws IOException public ChannelEndPoint(ScheduledExecutorService scheduler,SocketChannel channel)
{ {
super(scheduler, super(scheduler,
(InetSocketAddress)channel.socket().getLocalSocketAddress(), (InetSocketAddress)channel.socket().getLocalSocketAddress(),

View File

@ -66,6 +66,9 @@ public class MappedByteBufferPool implements ByteBufferPool
public void release(ByteBuffer buffer) public void release(ByteBuffer buffer)
{ {
if (buffer == null)
return;
int bucket = bucketFor(buffer.capacity()); int bucket = bucketFor(buffer.capacity());
ConcurrentMap<Integer, Queue<ByteBuffer>> buffers = buffersFor(buffer.isDirect()); ConcurrentMap<Integer, Queue<ByteBuffer>> buffers = buffersFor(buffer.isDirect());

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
@ -75,7 +74,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
*/ */
private volatile int _interestOps; private volatile int _interestOps;
public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, ScheduledExecutorService scheduler, long idleTimeout) throws IOException public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, ScheduledExecutorService scheduler, long idleTimeout)
{ {
super(scheduler,channel); super(scheduler,channel);
_selector = selector; _selector = selector;

View File

@ -302,7 +302,7 @@ abstract public class WriteFlusher
// Are we complete? // Are we complete?
for (ByteBuffer b : buffers) for (ByteBuffer b : buffers)
{ {
if (b.hasRemaining()) if (BufferUtil.hasContent(b))
{ {
PendingState<?> pending=new PendingState<>(buffers, context, callback); PendingState<?> pending=new PendingState<>(buffers, context, callback);
if (updateState(__WRITING,pending)) if (updateState(__WRITING,pending))
@ -362,7 +362,7 @@ abstract public class WriteFlusher
// Are we complete? // Are we complete?
for (ByteBuffer b : buffers) for (ByteBuffer b : buffers)
{ {
if (b.hasRemaining()) if (BufferUtil.hasContent(b))
{ {
if (updateState(__COMPLETING,pending)) if (updateState(__COMPLETING,pending))
onIncompleteFlushed(); onIncompleteFlushed();

View File

@ -190,7 +190,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
else if (filled < 0) else if (filled < 0)
{ {
_parser.inputShutdown(); _parser.shutdownInput();
// We were only filling if fully consumed, so if we have // We were only filling if fully consumed, so if we have
// read -1 then we have nothing to parse and thus nothing that // read -1 then we have nothing to parse and thus nothing that
// will generate a response. If we had a suspended request pending // will generate a response. If we had a suspended request pending
@ -517,7 +517,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// If no more input // If no more input
if (getEndPoint().isInputShutdown()) if (getEndPoint().isInputShutdown())
{ {
_parser.inputShutdown(); _parser.shutdownInput();
return; return;
} }
@ -536,7 +536,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
LOG.debug("{} block filled {}",this,filled); LOG.debug("{} block filled {}",this,filled);
if (filled<0) if (filled<0)
{ {
_parser.inputShutdown(); _parser.shutdownInput();
return; return;
} }
} }

View File

@ -117,10 +117,13 @@ public class BufferUtil
* @param buffer The buffer to clear. * @param buffer The buffer to clear.
*/ */
public static void clearToFill(ByteBuffer buffer) public static void clearToFill(ByteBuffer buffer)
{
if (buffer!=null)
{ {
buffer.position(0); buffer.position(0);
buffer.limit(buffer.capacity()); buffer.limit(buffer.capacity());
} }
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Flip the buffer to fill mode. /** Flip the buffer to fill mode.