+ Introduced Request.content(ContentProvider content, String contentType)

+ Introduced a new renamings to clarify concepts
+ Vastly improved Javadocs.
This commit is contained in:
Simone Bordet 2012-11-30 03:15:20 +01:00
parent 6bc507c3af
commit b2e878a7e8
26 changed files with 671 additions and 71 deletions

View File

@ -89,10 +89,10 @@ import org.eclipse.jetty.util.thread.TimerScheduler;
*
* // Asynchronously
* HttpClient client = new HttpClient();
* client.newRequest("http://localhost:8080").send(new Response.Listener.Empty()
* client.newRequest("http://localhost:8080").send(new Response.CompleteListener()
* {
* @Override
* public void onSuccess(Response response)
* public void onComplete(Result result)
* {
* ...
* }
@ -117,8 +117,8 @@ public class HttpClient extends ContainerLifeCycle
private volatile SelectorManager selectorManager;
private volatile String agent = "Jetty/" + Jetty.VERSION;
private volatile boolean followRedirects = true;
private volatile int maxConnectionsPerAddress = 8;
private volatile int maxQueueSizePerAddress = 1024;
private volatile int maxConnectionsPerDestination = 8;
private volatile int maxRequestsQueuedPerDestination = 1024;
private volatile int requestBufferSize = 4096;
private volatile int responseBufferSize = 4096;
private volatile int maxRedirects = 8;
@ -129,16 +129,33 @@ public class HttpClient extends ContainerLifeCycle
private volatile boolean dispatchIO = true;
private volatile ProxyConfiguration proxyConfig;
/**
* Creates a {@link HttpClient} instance that can perform requests to non-TLS destinations only
* (that is, requests with the "http" scheme only, and not "https").
*
* @see #HttpClient(SslContextFactory) to perform requests to TLS destinations.
*/
public HttpClient()
{
this(null);
}
/**
* Creates a {@link HttpClient} instance that can perform requests to non-TLS and TLS destinations
* (that is, both requests with the "http" scheme and with the "https" scheme).
*
* @param sslContextFactory the {@link SslContextFactory} that manages TLS encryption
* @see #getSslContextFactory()
*/
public HttpClient(SslContextFactory sslContextFactory)
{
this.sslContextFactory = sslContextFactory;
}
/**
* @return the {@link SslContextFactory} that manages TLS encryption
* @see #HttpClient(SslContextFactory)
*/
public SslContextFactory getSslContextFactory()
{
return sslContextFactory;
@ -214,56 +231,114 @@ public class HttpClient extends ContainerLifeCycle
LOG.info("Stopped {}", this);
}
/**
* @return a list of {@link Request.Listener} that can be used to add and remove listeners
*/
public List<Request.Listener> getRequestListeners()
{
return requestListeners;
}
/**
* @return the cookie store associated with this instance
*/
public CookieStore getCookieStore()
{
return cookieStore;
}
/**
* @return the authentication store associated with this instance
*/
public AuthenticationStore getAuthenticationStore()
{
return authenticationStore;
}
/**
* @return a set of {@link ContentDecoder.Factory} that can be used to add and remove content decoder factories
*/
public Set<ContentDecoder.Factory> getContentDecoderFactories()
{
return decoderFactories;
}
/**
* Performs a GET request to the specified URI.
*
* @param uri the URI to GET
* @return a future for a {@link ContentResponse}
* @see #GET(URI)
*/
public Future<ContentResponse> GET(String uri)
{
return GET(URI.create(uri));
}
/**
* Performs a GET request to the specified URI.
*
* @param uri the URI to GET
* @return a future for a {@link ContentResponse}
* @see #newRequest(URI)
*/
public Future<ContentResponse> GET(URI uri)
{
return newRequest(uri).send();
}
/**
* Creates a POST request to the specified URI.
*
* @param uri the URI to POST to
* @return the POST request
* @see #POST(URI)
*/
public Request POST(String uri)
{
return POST(URI.create(uri));
}
/**
* Creates a POST request to the specified URI.
*
* @param uri the URI to POST to
* @return the POST request
*/
public Request POST(URI uri)
{
return newRequest(uri).method(HttpMethod.POST);
}
/**
* Creates a new request with the "http" scheme and the specified host and port
*
* @param host the request host
* @param port the request port
* @return the request just created
*/
public Request newRequest(String host, int port)
{
return newRequest(URI.create(address("http", host, port)));
}
/**
* Creates a new request with the specified URI.
*
* @param uri the URI to request
* @return the request just created
*/
public Request newRequest(String uri)
{
return newRequest(URI.create(uri));
}
/**
* Creates a new request with the specified URI.
*
* @param uri the URI to request
* @return the request just created
*/
public Request newRequest(URI uri)
{
return new HttpRequest(this, uri);
@ -291,6 +366,19 @@ public class HttpClient extends ContainerLifeCycle
return scheme + "://" + host + ":" + port;
}
/**
* Returns a {@link Destination} for the given scheme, host and port.
* Applications may use {@link Destination}s to create {@link Connection}s
* that will be outside {@link HttpClient}'s pooling mechanism, to explicitly
* control the connection lifecycle (in particular their termination with
* {@link Connection#close()}).
*
* @param scheme the destination scheme
* @param host the destination host
* @param port the destination port
* @return the destination
* @see #getDestinations()
*/
public Destination getDestination(String scheme, String host, int port)
{
return provideDestination(scheme, host, port);
@ -321,6 +409,9 @@ public class HttpClient extends ContainerLifeCycle
return destination;
}
/**
* @return the list of destinations known to this {@link HttpClient}.
*/
public List<Destination> getDestinations()
{
return new ArrayList<Destination>(destinations.values());
@ -417,31 +508,50 @@ public class HttpClient extends ContainerLifeCycle
return null;
}
/**
* @return the {@link ByteBufferPool} of this {@link HttpClient}
*/
public ByteBufferPool getByteBufferPool()
{
return byteBufferPool;
}
/**
* @param byteBufferPool the {@link ByteBufferPool} of this {@link HttpClient}
*/
public void setByteBufferPool(ByteBufferPool byteBufferPool)
{
this.byteBufferPool = byteBufferPool;
}
/**
* @return the max time a connection can take to connect to destinations
*/
public long getConnectTimeout()
{
return connectTimeout;
}
/**
* @param connectTimeout the max time a connection can take to connect to destinations
* @see java.net.Socket#connect(SocketAddress, int)
*/
public void setConnectTimeout(long connectTimeout)
{
this.connectTimeout = connectTimeout;
}
/**
* @return the max time a connection can be idle (that is, without traffic of bytes in either direction)
*/
public long getIdleTimeout()
{
return idleTimeout;
}
/**
* @param idleTimeout the max time a connection can be idle (that is, without traffic of bytes in either direction)
*/
public void setIdleTimeout(long idleTimeout)
{
this.idleTimeout = idleTimeout;
@ -459,112 +569,196 @@ public class HttpClient extends ContainerLifeCycle
/**
* @param bindAddress the address to bind socket channels to
* @see #getBindAddress()
* @see SocketChannel#bind(SocketAddress)
*/
public void setBindAddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
}
/**
* @return the "User-Agent" HTTP header string of this {@link HttpClient}
*/
public String getUserAgent()
{
return agent;
}
/**
* @param agent the "User-Agent" HTTP header string of this {@link HttpClient}
*/
public void setUserAgent(String agent)
{
this.agent = agent;
}
/**
* @return whether this {@link HttpClient} follows HTTP redirects
* @see Request#isFollowRedirects()
*/
public boolean isFollowRedirects()
{
return followRedirects;
}
/**
* @param follow whether this {@link HttpClient} follows HTTP redirects
* @see #setMaxRedirects(int)
*/
public void setFollowRedirects(boolean follow)
{
this.followRedirects = follow;
}
/**
* @return the {@link Executor} of this {@link HttpClient}
*/
public Executor getExecutor()
{
return executor;
}
/**
* @param executor the {@link Executor} of this {@link HttpClient}
*/
public void setExecutor(Executor executor)
{
this.executor = executor;
}
/**
* @return the {@link Scheduler} of this {@link HttpClient}
*/
public Scheduler getScheduler()
{
return scheduler;
}
/**
* @param scheduler the {@link Scheduler} of this {@link HttpClient}
*/
public void setScheduler(Scheduler scheduler)
{
this.scheduler = scheduler;
}
public SelectorManager getSelectorManager()
protected SelectorManager getSelectorManager()
{
return selectorManager;
}
public int getMaxConnectionsPerAddress()
/**
* @return the max number of connections that this {@link HttpClient} opens to {@link Destination}s
*/
public int getMaxConnectionsPerDestination()
{
return maxConnectionsPerAddress;
return maxConnectionsPerDestination;
}
public void setMaxConnectionsPerAddress(int maxConnectionsPerAddress)
/**
* Sets the max number of connections to open to each destinations.
* <p />
* RFC 2616 suggests that 2 connections should be opened per each destination,
* but browsers commonly open 6.
* If this {@link HttpClient} is used for load testing, it is common to have only one destination
* (the server to load test), and it is recommended to set this value to a high value (at least as
* much as the threads present in the {@link #getExecutor() executor}).
*
* @param maxConnectionsPerDestination the max number of connections that this {@link HttpClient} opens to {@link Destination}s
*/
public void setMaxConnectionsPerDestination(int maxConnectionsPerDestination)
{
this.maxConnectionsPerAddress = maxConnectionsPerAddress;
this.maxConnectionsPerDestination = maxConnectionsPerDestination;
}
public int getMaxQueueSizePerAddress()
/**
* @return the max number of requests that may be queued to a {@link Destination}.
*/
public int getMaxRequestsQueuedPerDestination()
{
return maxQueueSizePerAddress;
return maxRequestsQueuedPerDestination;
}
public void setMaxQueueSizePerAddress(int maxQueueSizePerAddress)
/**
* Sets the max number of requests that may be queued to a destination.
* <p />
* If this {@link HttpClient} performs a high rate of requests to a destination,
* and all the connections managed by that destination are busy with other requests,
* then new requests will be queued up in the destination.
* This parameter controls how many requests can be queued before starting to reject them.
* If this {@link HttpClient} is used for load testing, it is common to have this parameter
* set to a high value, although this may impact latency (requests sit in the queue for a long
* time before being sent).
*
* @param maxRequestsQueuedPerDestination the max number of requests that may be queued to a {@link Destination}.
*/
public void setMaxRequestsQueuedPerDestination(int maxRequestsQueuedPerDestination)
{
this.maxQueueSizePerAddress = maxQueueSizePerAddress;
this.maxRequestsQueuedPerDestination = maxRequestsQueuedPerDestination;
}
/**
* @return the size of the buffer used to write requests
*/
public int getRequestBufferSize()
{
return requestBufferSize;
}
/**
* @param requestBufferSize the size of the buffer used to write requests
*/
public void setRequestBufferSize(int requestBufferSize)
{
this.requestBufferSize = requestBufferSize;
}
/**
* @return the size of the buffer used to read responses
*/
public int getResponseBufferSize()
{
return responseBufferSize;
}
/**
* @param responseBufferSize the size of the buffer used to read responses
*/
public void setResponseBufferSize(int responseBufferSize)
{
this.responseBufferSize = responseBufferSize;
}
/**
* @return the max number of HTTP redirects that are followed
* @see #setMaxRedirects(int)
*/
public int getMaxRedirects()
{
return maxRedirects;
}
/**
* @param maxRedirects the max number of HTTP redirects that are followed
* @see #setFollowRedirects(boolean)
*/
public void setMaxRedirects(int maxRedirects)
{
this.maxRedirects = maxRedirects;
}
/**
* @return whether TCP_NODELAY is enabled
*/
public boolean isTCPNoDelay()
{
return tcpNoDelay;
}
/**
* @param tcpNoDelay whether TCP_NODELAY is enabled
* @see java.net.Socket#setTcpNoDelay(boolean)
*/
public void setTCPNoDelay(boolean tcpNoDelay)
{
this.tcpNoDelay = tcpNoDelay;
@ -585,22 +779,29 @@ public class HttpClient extends ContainerLifeCycle
* This implementation never blocks on I/O operation, but invokes application callbacks that may
* take time to execute or block on other I/O.
* If application callbacks are known to take time or block on I/O, then parameter {@code dispatchIO}
* must be set to true.
* should be set to true.
* If application callbacks are known to be quick and never block on I/O, then parameter {@code dispatchIO}
* may be set to false.
*
* @param dispatchIO true to dispatch I/O operations in a different thread, false to execute them in the selector thread
* @param dispatchIO true to dispatch I/O operations in a different thread,
* false to execute them in the selector thread
*/
public void setDispatchIO(boolean dispatchIO)
{
this.dispatchIO = dispatchIO;
}
/**
* @return the forward proxy configuration
*/
public ProxyConfiguration getProxyConfiguration()
{
return proxyConfig;
}
/**
* @param proxyConfig the forward proxy configuration
*/
public void setProxyConfiguration(ProxyConfiguration proxyConfig)
{
this.proxyConfig = proxyConfig;

View File

@ -68,9 +68,9 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
this.client = client;
this.scheme = scheme;
this.address = new InetSocketAddress(host, port);
this.requests = new ArrayBlockingQueue<>(client.getMaxQueueSizePerAddress());
this.idleConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
this.activeConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
this.requests = new ArrayBlockingQueue<>(client.getMaxRequestsQueuedPerDestination());
this.idleConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerDestination());
this.activeConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerDestination());
this.requestNotifier = new RequestNotifier(client);
this.responseNotifier = new ResponseNotifier(client);
@ -147,7 +147,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
}
else
{
throw new RejectedExecutionException("Max requests per address " + client.getMaxQueueSizePerAddress() + " exceeded");
throw new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded");
}
}
else
@ -174,7 +174,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
if (result != null)
return result;
final int maxConnections = client.getMaxConnectionsPerAddress();
final int maxConnections = client.getMaxConnectionsPerDestination();
while (true)
{
int current = connectionCount.get();

View File

@ -337,6 +337,14 @@ public class HttpRequest implements Request
@Override
public Request content(ContentProvider content)
{
return content(content, null);
}
@Override
public Request content(ContentProvider content, String contentType)
{
if (contentType != null)
header(HttpHeader.CONTENT_TYPE.asString(), contentType);
this.content = content;
return this;
}
@ -355,12 +363,6 @@ public class HttpRequest implements Request
return content(new PathContentProvider(file));
}
// @Override
// public Request decoder(ContentDecoder decoder)
// {
// return this;
// }
@Override
public boolean isFollowRedirects()
{

View File

@ -20,11 +20,11 @@ package org.eclipse.jetty.client.api;
/**
* {@link Connection} represent a connection to a {@link Destination} and allow applications to send
* requests via {@link #send(Request, Response.Listener)}.
* requests via {@link #send(Request, Response.CompleteListener)}.
* <p />
* {@link Connection}s are normally pooled by {@link Destination}s, but unpooled {@link Connection}s
* may be created by applications that want to do their own connection management via
* {@link Destination#newConnection()}.
* {@link Destination#newConnection()} and {@link Connection#close()}.
*/
public interface Connection extends AutoCloseable
{

View File

@ -21,6 +21,14 @@ package org.eclipse.jetty.client.api;
import java.util.HashSet;
import java.util.Set;
/**
* The configuration of the forward proxy to use with {@link org.eclipse.jetty.client.HttpClient}.
* <p />
* Configuration parameters include the host and port of the forward proxy, and a list of
* {@link #getExcludedOrigins() origins} that are excluded from being proxied.
*
* @see org.eclipse.jetty.client.HttpClient#setProxyConfiguration(ProxyConfiguration)
*/
public class ProxyConfiguration
{
private final Set<String> excluded = new HashSet<>();
@ -33,23 +41,40 @@ public class ProxyConfiguration
this.port = port;
}
/**
* @return the host name of the forward proxy
*/
public String getHost()
{
return host;
}
/**
* @return the port of the forward proxy
*/
public int getPort()
{
return port;
}
/**
* Matches the given {@code host} and {@code port} with the list of excluded origins,
* returning true if the origin is to be proxied, false if it is excluded from proxying.
* @param host the host to match
* @param port the port to match
* @return true if the origin made of {@code host} and {@code port} is to be proxied,
* false if it is excluded from proxying.
*/
public boolean matches(String host, int port)
{
String hostPort = host + ":" + port;
return !getExcludedHosts().contains(hostPort);
return !getExcludedOrigins().contains(hostPort);
}
public Set<String> getExcludedHosts()
/**
* @return the list of origins to exclude from proxying, in the form "host:port".
*/
public Set<String> getExcludedOrigins()
{
return excluded;
}

View File

@ -153,6 +153,12 @@ public interface Request
*/
Request content(ContentProvider content);
/**
* @param content the content provider of this request
* @return this request object
*/
Request content(ContentProvider content, String contentType);
/**
* Shortcut method to specify a file as a content for this request, with the default content type of
* "application/octect-stream".
@ -319,10 +325,16 @@ public interface Request
*/
Throwable getAbortCause();
/**
* Common, empty, super-interface for request listeners.
*/
public interface RequestListener extends EventListener
{
}
/**
* Listener for the request queued event.
*/
public interface QueuedListener extends RequestListener
{
/**
@ -333,6 +345,9 @@ public interface Request
public void onQueued(Request request);
}
/**
* Listener for the request begin event.
*/
public interface BeginListener extends RequestListener
{
/**
@ -344,6 +359,9 @@ public interface Request
public void onBegin(Request request);
}
/**
* Listener for the request committed event.
*/
public interface HeadersListener extends RequestListener
{
/**
@ -355,6 +373,9 @@ public interface Request
public void onHeaders(Request request);
}
/**
* Listener for the request succeeded event.
*/
public interface SuccessListener extends RequestListener
{
/**
@ -365,6 +386,9 @@ public interface Request
public void onSuccess(Request request);
}
/**
* Listener for the request failed event.
*/
public interface FailureListener extends RequestListener
{
/**
@ -376,7 +400,7 @@ public interface Request
}
/**
* Listener for all request events
* Listener for all request events.
*/
public interface Listener extends QueuedListener, BeginListener, HeadersListener, SuccessListener, FailureListener
{

View File

@ -76,10 +76,16 @@ public interface Response
*/
boolean abort(Throwable cause);
/**
* Common, empty, super-interface for response listeners
*/
public interface ResponseListener extends EventListener
{
}
/**
* Listener for the response begin event.
*/
public interface BeginListener extends ResponseListener
{
/**
@ -93,6 +99,9 @@ public interface Response
public void onBegin(Response response);
}
/**
* Listener for the response headers event.
*/
public interface HeadersListener extends ResponseListener
{
/**
@ -103,6 +112,9 @@ public interface Response
public void onHeaders(Response response);
}
/**
* Listener for the response content events.
*/
public interface ContentListener extends ResponseListener
{
/**
@ -116,6 +128,9 @@ public interface Response
public void onContent(Response response, ByteBuffer content);
}
/**
* Listener for the response succeeded event.
*/
public interface SuccessListener extends ResponseListener
{
/**
@ -126,6 +141,9 @@ public interface Response
public void onSuccess(Response response);
}
/**
* Listener for the response failure event.
*/
public interface FailureListener extends ResponseListener
{
/**
@ -137,6 +155,9 @@ public interface Response
public void onFailure(Response response, Throwable failure);
}
/**
* Listener for the request and response completed event.
*/
public interface CompleteListener extends ResponseListener
{
/**
@ -157,7 +178,7 @@ public interface Response
}
/**
* Listener for response events
* Listener for all response events.
*/
public interface Listener extends BeginListener, HeadersListener, ContentListener, SuccessListener, FailureListener, CompleteListener
{

View File

@ -84,12 +84,20 @@ public class Result
return responseFailure;
}
/**
* @return whether both the request and the response succeeded
*/
public boolean isSucceeded()
{
return getFailure() == null;
}
/**
* @return whether either the response or the request failed
*/
public boolean isFailed()
{
return getFailure() != null;
return !isSucceeded();
}
/**

View File

@ -17,11 +17,11 @@
//
/**
* This package provides APIs, utility classes and implementation class of an asynchronous HTTP client
* This package provides APIs, utility classes and an implementation of an asynchronous HTTP client.
* <p />
* The core class is {@link HttpClient}, which acts as a central configuration object (for example
* for {@link HttpClient#setIdleTimeout(long) idle timeouts}, {@link HttpClient#setMaxConnectionsPerAddress(int)
* max connections per domain}, etc.) and as a factory for {@link Request} objects.
* for {@link HttpClient#setIdleTimeout(long) idle timeouts}, {@link HttpClient#setMaxConnectionsPerDestination(int)
* max connections per destination}, etc.) and as a factory for {@link Request} objects.
* <p />
* The HTTP protocol is based on the request/response paradigm, a unit that in this implementation is called
* <em>exchange</em> and is represented by {@link HttpExchange}.
@ -36,7 +36,7 @@
* <p />
* When a request is sent, its exchange is associated to a connection, either taken from an idle queue or created
* anew, and when both the request and response are completed, the exchange is disassociated from the connection.
* A conversation may span multiple connections on different destinations, and therefore are maintained at the
* Conversations may span multiple connections on different destinations, and therefore are maintained at the
* {@link HttpClient} level.
* <p />
* Applications may decide to send the request and wait for the response in a blocking way, using

View File

@ -18,7 +18,9 @@
package org.eclipse.jetty.client.util;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.AuthenticationStore;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
@ -26,6 +28,13 @@ import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil;
/**
* Implementation of the HTTP "Basic" authentication defined in RFC 2617.
* <p />
* Applications should create objects of this class and add them to the
* {@link AuthenticationStore} retrieved from the {@link HttpClient}
* via {@link HttpClient#getAuthenticationStore()}.
*/
public class BasicAuthentication implements Authentication
{
private final String uri;
@ -33,6 +42,12 @@ public class BasicAuthentication implements Authentication
private final String user;
private final String password;
/**
* @param uri the URI to match for the authentication
* @param realm the realm to match for the authentication
* @param user the user that wants to authenticate
* @param password the password of the user
*/
public BasicAuthentication(String uri, String realm, String user, String password)
{
this.uri = uri;

View File

@ -30,6 +30,19 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Result;
/**
* A {@link BufferingResponseListener} that is also a {@link Future}, to allow applications
* to block (indefinitely or for a timeout) until {@link #onComplete(Result)} is called,
* or to {@link #cancel(boolean) abort} the request/response conversation.
* <p />
* Typical usage is:
* <pre>
* Request request = httpClient.newRequest(...)...;
* BlockingResponseListener listener = new BlockingResponseListener(request);
* request.send(listener); // Asynchronous send
* ContentResponse response = listener.get(5, TimeUnit.SECONDS); // Timed block
* </pre>
*/
public class BlockingResponseListener extends BufferingResponseListener implements Future<ContentResponse>
{
private final CountDownLatch latch = new CountDownLatch(1);

View File

@ -24,6 +24,13 @@ import java.util.NoSuchElementException;
import org.eclipse.jetty.client.api.ContentProvider;
/**
* A {@link ContentProvider} for {@link ByteBuffer}s.
* <p />
* The position and limit of the {@link ByteBuffer}s passed to the constructor are not modified,
* and each invocation of the {@link #iterator()} method returns a {@link ByteBuffer#slice() slice}
* of the original {@link ByteBuffer}.
*/
public class ByteBufferContentProvider implements ContentProvider
{
private final ByteBuffer[] buffers;

View File

@ -24,6 +24,9 @@ import java.util.NoSuchElementException;
import org.eclipse.jetty.client.api.ContentProvider;
/**
* A {@link ContentProvider} for byte arrays.
*/
public class BytesContentProvider implements ContentProvider
{
private final byte[][] bytes;

View File

@ -32,13 +32,22 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.AuthenticationStore;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.TypeUtil;
/**
* Implementation of the HTTP "Digest" authentication defined in RFC 2617.
* <p />
* Applications should create objects of this class and add them to the
* {@link AuthenticationStore} retrieved from the {@link HttpClient}
* via {@link HttpClient#getAuthenticationStore()}.
*/
public class DigestAuthentication implements Authentication
{
private static final Pattern PARAM_PATTERN = Pattern.compile("([^=]+)=(.*)");
@ -48,6 +57,12 @@ public class DigestAuthentication implements Authentication
private final String user;
private final String password;
/**
* @param uri the URI to match for the authentication
* @param realm the realm to match for the authentication
* @param user the user that wants to authenticate
* @param password the password of the user
*/
public DigestAuthentication(String uri, String realm, String user, String password)
{
this.uri = uri;

View File

@ -26,6 +26,22 @@ import java.util.NoSuchElementException;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.util.BufferUtil;
/**
* A {@link ContentProvider} for an {@link InputStream}.
* <p />
* The input stream is read once and therefore fully consumed.
* Invocations to the {@link #iterator()} method after the first will return an "empty" iterator
* because the stream has been consumed on the first invocation.
* <p />
* It is possible to specify, at the constructor, a buffer size used to read content from the
* stream, by default 4096 bytes.
* <p />
* However, it is possible for subclasses to override {@link #onRead(byte[], int, int)} to copy
* the content read from the stream to another location (for example a file), and be able to
* support multiple invocations of {@link #iterator()}, returning the iterator provided by this
* class on the first invocation, and an iterator on the bytes copied to the other location
* for subsequent invocations.
*/
public class InputStreamContentProvider implements ContentProvider
{
private final InputStream stream;
@ -48,6 +64,20 @@ public class InputStreamContentProvider implements ContentProvider
return -1;
}
/**
* Callback method invoked just after having read from the stream,
* but before returning the iteration element (a {@link ByteBuffer}
* to the caller.
* <p />
* Subclasses may override this method to copy the content read from
* the stream to another location (a file, or in memory if the content
* is known to fit).
*
* @param buffer the byte array containing the bytes read
* @param offset the offset from where bytes should be read
* @param length the length of the bytes read
* @return a {@link ByteBuffer} wrapping the byte array
*/
protected ByteBuffer onRead(byte[] buffer, int offset, int length)
{
if (length <= 0)

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -30,21 +31,54 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* Implementation of {@link Response.Listener} that produces an {@link InputStream}
* that allows applications to read the response content.
* <p />
* Typical usage is:
* <pre>
* InputStreamResponseListener listener = new InputStreamResponseListener();
* client.newRequest(...).send(listener);
*
* // Wait for the response headers to arrive
* Response response = listener.get(5, TimeUnit.SECONDS);
* if (response.getStatus() == 200)
* {
* // Obtain the input stream on the response content
* try (InputStream input = listener.getInputStream())
* {
* // Read the response content
* }
* }
* </pre>
* <p />
* The {@link HttpClient} implementation (the producer) will feed the input stream
* asynchronously while the application (the consumer) is reading from it.
* Chunks of content are maintained in a queue, and it is possible to specify a
* maximum buffer size for the bytes held in the queue, by default 16384 bytes.
* <p />
* If the consumer is faster than the producer, then the consumer will block
* with the typical {@link InputStream#read()} semantic.
* If the consumer is slower than the producer, then the producer will block
* until the client consumes.
*/
public class InputStreamResponseListener extends Response.Listener.Empty
{
public static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
private static final byte[] EOF = new byte[0];
private static final byte[] CLOSE = new byte[0];
private static final byte[] FAILURE = new byte[0];
private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
private final AtomicLong length = new AtomicLong();
private final CountDownLatch responseLatch = new CountDownLatch(1);
private final CountDownLatch resultLatch = new CountDownLatch(1);
private final long capacity;
private final long maxBufferSize;
private Response response;
private Result result;
private volatile Throwable failure;
@ -54,9 +88,9 @@ public class InputStreamResponseListener extends Response.Listener.Empty
this(16 * 1024L);
}
public InputStreamResponseListener(long capacity)
public InputStreamResponseListener(long maxBufferSize)
{
this.capacity = capacity;
this.maxBufferSize = maxBufferSize;
}
@Override
@ -72,17 +106,17 @@ public class InputStreamResponseListener extends Response.Listener.Empty
int remaining = content.remaining();
byte[] bytes = new byte[remaining];
content.get(bytes);
LOG.debug("Queued {}/{} bytes", bytes, bytes.length);
LOG.debug("Queuing {}/{} bytes", bytes, bytes.length);
queue.offer(bytes);
long newLength = length.addAndGet(remaining);
while (newLength >= capacity)
while (newLength >= maxBufferSize)
{
LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, capacity);
LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize);
if (!await())
break;
newLength = length.get();
LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, capacity);
LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize);
}
}
@ -90,16 +124,16 @@ public class InputStreamResponseListener extends Response.Listener.Empty
public void onFailure(Response response, Throwable failure)
{
this.failure = failure;
LOG.debug("Queuing failure {} {}", FAILURE, failure);
queue.offer(FAILURE);
LOG.debug("Queued failure {} {}", FAILURE, failure);
responseLatch.countDown();
}
@Override
public void onSuccess(Response response)
{
LOG.debug("Queuing end of content {}{}", EOF, "");
queue.offer(EOF);
LOG.debug("Queued end of content {}{}", EOF, "");
}
@Override
@ -166,7 +200,24 @@ public class InputStreamResponseListener extends Response.Listener.Empty
{
while (true)
{
if (bytes != null)
if (bytes == EOF)
{
// Mark the fact that we saw -1,
// so that in the close case we don't throw
index = -1;
return -1;
}
else if (bytes == FAILURE)
{
throw failure();
}
else if (bytes == CLOSE)
{
if (index < 0)
return -1;
throw new AsynchronousCloseException();
}
else if (bytes != null)
{
if (index < bytes.length)
return bytes[index++];
@ -174,23 +225,23 @@ public class InputStreamResponseListener extends Response.Listener.Empty
bytes = null;
index = 0;
}
bytes = take();
LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
if (bytes == EOF)
return -1;
if (bytes == FAILURE)
else
{
if (failure instanceof IOException)
throw (IOException)failure;
else
throw new IOException(failure);
bytes = take();
LOG.debug("Dequeued {}/{} bytes", bytes, bytes.length);
signal();
}
signal();
}
}
private IOException failure()
{
if (failure instanceof IOException)
return (IOException)failure;
else
return new IOException(failure);
}
private byte[] take() throws IOException
{
try
@ -202,5 +253,13 @@ public class InputStreamResponseListener extends Response.Listener.Empty
throw new InterruptedIOException();
}
}
@Override
public void close() throws IOException
{
LOG.debug("Queuing close {}{}", CLOSE, "");
queue.offer(CLOSE);
super.close();
}
}
}

View File

@ -31,6 +31,12 @@ import java.util.NoSuchElementException;
import org.eclipse.jetty.client.api.ContentProvider;
/**
* A {@link ContentProvider} for files using JDK 7's {@code java.nio.file} APIs.
* <p />
* It is possible to specify, at the constructor, a buffer size used to read content from the
* stream, by default 4096 bytes.
*/
public class PathContentProvider implements ContentProvider
{
private final Path filePath;

View File

@ -20,6 +20,14 @@ package org.eclipse.jetty.client.util;
import java.nio.charset.Charset;
import org.eclipse.jetty.client.api.ContentProvider;
/**
* A {@link ContentProvider} for strings.
* <p />
* It is possible to specify, at the constructor, an encoding used to convert
* the string into bytes, by default UTF-8.
*/
public class StringContentProvider extends BytesContentProvider
{
public StringContentProvider(String content)

View File

@ -31,6 +31,34 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* Implementation of {@link Response.Listener} that allows to specify a timeout for asynchronous
* operations.
* <p />
* {@link TimedResponseListener} may be used to decorate a delegate {@link Response.CompleteListener}
* provided by the application. Events are forwarded by {@link TimedResponseListener} to the delegate
* listener.
* Alternatively, {@link TimedResponseListener} may be subclassed to override callbacks that are
* interesting to the application, typically {@link #onComplete(Result)}.
* <p />
* If the timeout specified at the constructor elapses, the request is {@link Request#abort(Throwable) aborted}
* with a {@link TimeoutException}.
* <p />
* Typical usage is:
* <pre>
* Request request = httpClient.newRequest(...)...;
* TimedResponseListener listener = new TimedResponseListener(5, TimeUnit.SECONDS, request, new Response.CompleteListener()
* {
* public void onComplete(Result result)
* {
* // Invoked when request/response completes or when timeout elapses
*
* // Your logic here
* }
* });
* request.send(listener); // Asynchronous send
* </pre>
*/
public class TimedResponseListener implements Response.Listener, Schedulable, Runnable
{
private static final Logger LOG = Log.getLogger(TimedResponseListener.class);

View File

@ -64,8 +64,8 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
{
start(new LoadHandler());
client.setMaxConnectionsPerAddress(32768);
client.setMaxQueueSizePerAddress(1024 * 1024);
client.setMaxConnectionsPerDestination(32768);
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
client.setDispatchIO(false);
Random random = new Random();

View File

@ -20,11 +20,14 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.channels.AsynchronousCloseException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException;
@ -189,4 +192,136 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
Assert.assertNotNull(result);
Assert.assertTrue(result.isFailed());
}
@Test(expected = AsynchronousCloseException.class)
public void testDownloadWithCloseBeforeContent() throws Exception
{
final byte[] data = new byte[128 * 1024];
byte value = 3;
Arrays.fill(data, value);
final CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.flushBuffer();
try
{
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
}
response.getOutputStream().write(data);
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
InputStream input = listener.getInputStream();
Assert.assertNotNull(input);
input.close();
latch.countDown();
input.read();
}
@Test(expected = AsynchronousCloseException.class)
public void testDownloadWithCloseMiddleOfContent() throws Exception
{
final byte[] data1 = new byte[1024];
final byte[] data2 = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.getOutputStream().write(data1);
response.flushBuffer();
try
{
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
}
response.getOutputStream().write(data2);
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
InputStream input = listener.getInputStream();
Assert.assertNotNull(input);
for (byte b : data1)
input.read();
input.close();
latch.countDown();
input.read(); // throws
}
@Test
public void testDownloadWithCloseEndOfContent() throws Exception
{
final byte[] data = new byte[1024];
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.getOutputStream().write(data);
response.flushBuffer();
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
InputStream input = listener.getInputStream();
Assert.assertNotNull(input);
for (byte b : data)
input.read();
// Read EOF
Assert.assertEquals(-1, input.read());
input.close();
// Must not throw
Assert.assertEquals(-1, input.read());
}
}

View File

@ -283,7 +283,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
{
start(new EmptyServerHandler());
client.setMaxConnectionsPerAddress(1);
client.setMaxConnectionsPerDestination(1);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(2);
@ -343,7 +343,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
{
start(new EmptyServerHandler());
client.setMaxConnectionsPerAddress(1);
client.setMaxConnectionsPerDestination(1);
final long idleTimeout = 1000;
client.setIdleTimeout(idleTimeout);

View File

@ -90,7 +90,7 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
start(new TimeoutHandler(3 * timeout));
// Only one connection so requests get queued
client.setMaxConnectionsPerAddress(1);
client.setMaxConnectionsPerDestination(1);
// The first request has a long timeout
final CountDownLatch firstLatch = new CountDownLatch(1);

View File

@ -210,7 +210,7 @@ public class ProxyServlet extends HttpServlet
* <tr>
* <td>maxConnections</td>
* <td>32768</td>
* <td>The max number of connection per address, see {@link HttpClient#setMaxConnectionsPerAddress(int)}</td>
* <td>The max number of connections per destination, see {@link HttpClient#setMaxConnectionsPerDestination(int)}</td>
* </tr>
* <tr>
* <td>idleTimeout</td>
@ -260,7 +260,7 @@ public class ProxyServlet extends HttpServlet
value = config.getInitParameter("maxConnections");
if (value == null)
value = "32768";
client.setMaxConnectionsPerAddress(Integer.parseInt(value));
client.setMaxConnectionsPerDestination(Integer.parseInt(value));
value = config.getInitParameter("idleTimeout");
if (value == null)
@ -380,7 +380,7 @@ public class ProxyServlet extends HttpServlet
// Remove hop-by-hop headers
if (HOP_HEADERS.contains(lowerHeaderName))
continue;
if (_hostHeader!=null && lowerHeaderName.equals("host"))
continue;
@ -458,7 +458,7 @@ public class ProxyServlet extends HttpServlet
System.lineSeparator(),
proxyRequest.getHeaders().toString().trim());
}
proxyRequest.send(new TimedResponseListener(getTimeout(), TimeUnit.MILLISECONDS, proxyRequest, new ProxyResponseListener(request, response)));
}

View File

@ -592,7 +592,7 @@ public class ProxyServletTest
}
});
int port = serverConnector.getLocalPort();
client.getProxyConfiguration().getExcludedHosts().add("127.0.0.1:" + port);
client.getProxyConfiguration().getExcludedOrigins().add("127.0.0.1:" + port);
// Try with a proxied host
ContentResponse response = client.newRequest("localhost", port)

View File

@ -86,7 +86,7 @@ public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest
connector.setDefaultProtocol(factory.getProtocol());
HttpClient httpClient = new HttpClient();
// Simulate browsers, that open 6 connection per origin
httpClient.setMaxConnectionsPerAddress(6);
httpClient.setMaxConnectionsPerDestination(6);
httpClient.start();
benchmarkHTTP(httpClient);
httpClient.stop();