Working draft of the abstraction of HttpClient transport.

This commit is contained in:
Simone Bordet 2013-07-12 19:29:58 +02:00
parent 1c546a90d6
commit a4c63caf26
35 changed files with 3715 additions and 2262 deletions

View File

@ -91,6 +91,12 @@
<artifactId>jetty-io</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.spdy</groupId>
<artifactId>spdy-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>

View File

@ -0,0 +1,76 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public abstract class HttpChannel
{
protected static final Logger LOG = Log.getLogger(new Object(){}.getClass().getEnclosingClass());
private final AtomicReference<HttpExchange> exchange = new AtomicReference<>();
private final HttpDestination destination;
protected HttpChannel(HttpDestination destination)
{
this.destination = destination;
}
public HttpDestination getHttpDestination()
{
return destination;
}
public void associate(HttpExchange exchange)
{
if (!this.exchange.compareAndSet(null, exchange))
throw new UnsupportedOperationException("Pipelined requests not supported");
exchange.associate(this);
LOG.debug("{} associated to {}", exchange, this);
}
public HttpExchange disassociate()
{
HttpExchange exchange = this.exchange.getAndSet(null);
if (exchange != null)
exchange.disassociate(this);
LOG.debug("{} disassociated from {}", exchange, this);
return exchange;
}
public HttpExchange getHttpExchange()
{
return exchange.get();
}
public abstract void send();
public abstract void proceed(HttpExchange exchange, boolean proceed);
public abstract boolean abort(Throwable cause);
public void exchangeTerminated(Result result)
{
disassociate();
}
}

View File

@ -19,14 +19,11 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.ConnectException;
import java.net.CookieManager;
import java.net.CookiePolicy;
import java.net.CookieStore;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URI;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
@ -41,7 +38,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.client.api.AuthenticationStore;
import org.eclipse.jetty.client.api.Connection;
@ -50,16 +46,13 @@ import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.ProxyConfiguration;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.SocketAddressResolver;
@ -116,6 +109,7 @@ public class HttpClient extends ContainerLifeCycle
private final List<Request.Listener> requestListeners = new ArrayList<>();
private final AuthenticationStore authenticationStore = new HttpAuthenticationStore();
private final Set<ContentDecoder.Factory> decoderFactories = new ContentDecoderFactorySet();
private final HttpClientTransport transport;
private final SslContextFactory sslContextFactory;
private volatile CookieManager cookieManager;
private volatile CookieStore cookieStore;
@ -123,7 +117,6 @@ public class HttpClient extends ContainerLifeCycle
private volatile ByteBufferPool byteBufferPool;
private volatile Scheduler scheduler;
private volatile SocketAddressResolver resolver;
private volatile SelectorManager selectorManager;
private volatile HttpField agentField = new HttpField(HttpHeader.USER_AGENT, "Jetty/" + Jetty.VERSION);
private volatile boolean followRedirects = true;
private volatile int maxConnectionsPerDestination = 64;
@ -137,6 +130,7 @@ public class HttpClient extends ContainerLifeCycle
private volatile long idleTimeout;
private volatile boolean tcpNoDelay = true;
private volatile boolean dispatchIO = true;
private volatile boolean strictEventOrdering = true;
private volatile ProxyConfiguration proxyConfig;
private volatile HttpField encodingField;
@ -160,9 +154,20 @@ public class HttpClient extends ContainerLifeCycle
*/
public HttpClient(SslContextFactory sslContextFactory)
{
this(new HttpClientTransportOverHTTP(), sslContextFactory);
}
public HttpClient(HttpClientTransport transport, SslContextFactory sslContextFactory)
{
this.transport = transport;
this.sslContextFactory = sslContextFactory;
}
public HttpClientTransport getTransport()
{
return transport;
}
/**
* @return the {@link SslContextFactory} that manages TLS encryption
* @see #HttpClient(SslContextFactory)
@ -196,11 +201,10 @@ public class HttpClient extends ContainerLifeCycle
scheduler = new ScheduledExecutorScheduler(name + "-scheduler", false);
addBean(scheduler);
resolver = new SocketAddressResolver(executor, scheduler, getAddressResolutionTimeout());
addBean(transport);
transport.setHttpClient(this);
selectorManager = newSelectorManager();
selectorManager.setConnectTimeout(getConnectTimeout());
addBean(selectorManager);
resolver = new SocketAddressResolver(executor, scheduler, getAddressResolutionTimeout());
handlers.add(new ContinueProtocolHandler(this));
handlers.add(new RedirectProtocolHandler(this));
@ -215,11 +219,6 @@ public class HttpClient extends ContainerLifeCycle
super.doStart();
}
protected SelectorManager newSelectorManager()
{
return new ClientSelectorManager(getExecutor(), getScheduler());
}
private CookieManager newCookieManager()
{
return new CookieManager(getCookieStore(), CookiePolicy.ACCEPT_ALL);
@ -414,7 +413,7 @@ public class HttpClient extends ContainerLifeCycle
return newRequest;
}
protected String address(String scheme, String host, int port)
public String address(String scheme, String host, int port)
{
StringBuilder result = new StringBuilder();
URIUtil.appendSchemeHostPort(result, scheme, host, port);
@ -447,7 +446,7 @@ public class HttpClient extends ContainerLifeCycle
HttpDestination destination = destinations.get(address);
if (destination == null)
{
destination = new HttpDestination(this, scheme, host, port);
destination = transport.newHttpDestination(this, scheme, host, port);
if (isRunning())
{
HttpDestination existing = destinations.putIfAbsent(address, destination);
@ -489,28 +488,7 @@ public class HttpClient extends ContainerLifeCycle
@Override
public void succeeded(SocketAddress socketAddress)
{
SocketChannel channel = null;
try
{
channel = SocketChannel.open();
SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
configure(channel);
channel.configureBlocking(false);
channel.connect(socketAddress);
ConnectionCallback callback = new ConnectionCallback(destination, promise);
selectorManager.connect(channel, callback);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
if (channel != null)
close(channel);
promise.failed(x);
}
transport.connect(destination, socketAddress, promise);
}
@Override
@ -521,23 +499,6 @@ public class HttpClient extends ContainerLifeCycle
});
}
protected void configure(SocketChannel channel) throws SocketException
{
channel.socket().setTcpNoDelay(isTCPNoDelay());
}
private void close(SocketChannel channel)
{
try
{
channel.close();
}
catch (IOException x)
{
LOG.ignore(x);
}
}
protected HttpConversation getConversation(long id, boolean create)
{
HttpConversation conversation = conversations.get(id);
@ -729,11 +690,6 @@ public class HttpClient extends ContainerLifeCycle
this.scheduler = scheduler;
}
protected SelectorManager getSelectorManager()
{
return selectorManager;
}
/**
* @return the max number of connections that this {@link HttpClient} opens to {@link Destination}s
*/
@ -878,6 +834,41 @@ public class HttpClient extends ContainerLifeCycle
this.dispatchIO = dispatchIO;
}
/**
* @return whether request events must be strictly ordered
*/
public boolean isStrictEventOrdering()
{
return strictEventOrdering;
}
/**
* Whether request events must be strictly ordered.
* <p />
* {@link Response.CompleteListener}s may send a second request.
* If the second request is for the same destination, there is an inherent race
* condition for the use of the connection: the first request may still be associated with the
* connection, so the second request cannot use that connection and is forced to open another one.
* <p />
* From the point of view of connection usage, the connection is reusable just before the "complete"
* event, so it would be possible to reuse that connection from {@link Response.CompleteListener}s;
* but in this case the second request's events will fire before the "complete" events of the first
* request.
* <p />
* This setting enforces strict event ordering so that a "begin" event of a second request can never
* fire before the "complete" event of a first request, but at the expense of an increased usage
* of connections.
* <p />
* When not enforced, a "begin" event of a second request may happen before the "complete" event of
* a first request and allow for better usage of connections.
*
* @param strictEventOrdering whether request events must be strictly ordered
*/
public void setStrictEventOrdering(boolean strictEventOrdering)
{
this.strictEventOrdering = strictEventOrdering;
}
/**
* @return the forward proxy configuration
*/
@ -916,16 +907,6 @@ public class HttpClient extends ContainerLifeCycle
return HttpScheme.HTTPS.is(scheme) ? port == 443 : port == 80;
}
protected HttpConnection newHttpConnection(HttpClient httpClient, EndPoint endPoint, HttpDestination destination)
{
return new HttpConnection(httpClient, endPoint, destination);
}
protected SslConnection newSslConnection(HttpClient httpClient, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(httpClient.getByteBufferPool(), httpClient.getExecutor(), endPoint, engine);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
@ -935,109 +916,18 @@ public class HttpClient extends ContainerLifeCycle
protected Connection tunnel(Connection connection)
{
// TODO
/*
HttpConnection httpConnection = (HttpConnection)connection;
HttpDestination destination = httpConnection.getDestination();
HttpDestination destination = httpConnection.getHttpDestination();
SslConnection sslConnection = createSslConnection(destination, httpConnection.getEndPoint());
Connection result = (Connection)sslConnection.getDecryptedEndPoint().getConnection();
selectorManager.connectionClosed(httpConnection);
selectorManager.connectionOpened(sslConnection);
LOG.debug("Tunnelled {} over {}", connection, result);
return result;
}
private SslConnection createSslConnection(HttpDestination destination, EndPoint endPoint)
{
SSLEngine engine = sslContextFactory.newSSLEngine(destination.getHost(), destination.getPort());
engine.setUseClientMode(true);
SslConnection sslConnection = newSslConnection(HttpClient.this, endPoint, engine);
sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed());
endPoint.setConnection(sslConnection);
EndPoint appEndPoint = sslConnection.getDecryptedEndPoint();
HttpConnection connection = newHttpConnection(this, appEndPoint, destination);
appEndPoint.setConnection(connection);
return sslConnection;
}
protected class ClientSelectorManager extends SelectorManager
{
public ClientSelectorManager(Executor executor, Scheduler scheduler)
{
this(executor, scheduler, 1);
}
public ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
super(executor, scheduler, selectors);
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key)
{
return new SelectChannelEndPoint(channel, selector, key, getScheduler(), getIdleTimeout());
}
@Override
public org.eclipse.jetty.io.Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
ConnectionCallback callback = (ConnectionCallback)attachment;
HttpDestination destination = callback.destination;
SslContextFactory sslContextFactory = getSslContextFactory();
if (!destination.isProxied() && HttpScheme.HTTPS.is(destination.getScheme()))
{
if (sslContextFactory == null)
{
IOException failure = new ConnectException("Missing " + SslContextFactory.class.getSimpleName() + " for " + destination.getScheme() + " requests");
callback.failed(failure);
throw failure;
}
else
{
SslConnection sslConnection = createSslConnection(destination, endPoint);
callback.succeeded((Connection)sslConnection.getDecryptedEndPoint().getConnection());
return sslConnection;
}
}
else
{
HttpConnection connection = newHttpConnection(HttpClient.this, endPoint, destination);
callback.succeeded(connection);
return connection;
}
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
ConnectionCallback callback = (ConnectionCallback)attachment;
callback.failed(ex);
}
}
private class ConnectionCallback implements Promise<Connection>
{
private final HttpDestination destination;
private final Promise<Connection> promise;
private ConnectionCallback(HttpDestination destination, Promise<Connection> promise)
{
this.destination = destination;
this.promise = promise;
}
@Override
public void succeeded(Connection result)
{
promise.succeeded(result);
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
*/
return connection;
}
private class ContentDecoderFactorySet implements Set<ContentDecoder.Factory>

View File

@ -0,0 +1,33 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.net.SocketAddress;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Promise;
public interface HttpClientTransport
{
void setHttpClient(HttpClient client);
HttpDestination newHttpDestination(HttpClient httpClient, String scheme, String host, int port);
void connect(HttpDestination destination, SocketAddress address, Promise<Connection> promise);
}

View File

@ -21,10 +21,8 @@ package org.eclipse.jetty.client;
import java.net.HttpCookie;
import java.net.URI;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.Connection;
@ -37,31 +35,18 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpConnection extends AbstractConnection implements Connection
public abstract class HttpConnection implements Connection
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
private static final HttpField CHUNKED_FIELD = new HttpField(HttpHeader.TRANSFER_ENCODING, HttpHeaderValue.CHUNKED);
private final AtomicReference<HttpExchange> exchange = new AtomicReference<>();
private final HttpClient client;
private final HttpDestination destination;
private final HttpSender sender;
private final HttpReceiver receiver;
private long idleTimeout;
private volatile boolean closed;
public HttpConnection(HttpClient client, EndPoint endPoint, HttpDestination destination)
protected HttpConnection(HttpClient client, HttpDestination destination)
{
super(endPoint, client.getExecutor(), client.isDispatchIO());
this.client = client;
this.destination = destination;
this.sender = new HttpSender(this);
this.receiver = new HttpReceiver(this);
}
public HttpClient getHttpClient()
@ -69,55 +54,11 @@ public class HttpConnection extends AbstractConnection implements Connection
return client;
}
public HttpDestination getDestination()
public HttpDestination getHttpDestination()
{
return destination;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onClose()
{
closed = true;
super.onClose();
}
@Override
public void fillInterested()
{
// This is necessary when "upgrading" the connection for example after proxied
// CONNECT requests, because the old connection will read the CONNECT response
// and then set the read interest, while the new connection attached to the same
// EndPoint also will set the read interest, causing a ReadPendingException.
if (!closed)
super.fillInterested();
}
@Override
protected boolean onReadTimeout()
{
LOG.debug("{} idle timeout", this);
HttpExchange exchange = getExchange();
if (exchange != null)
idleTimeout();
else
destination.remove(this);
return true;
}
protected void idleTimeout()
{
receiver.idleTimeout();
}
@Override
public void send(Request request, Response.CompleteListener listener)
{
@ -132,27 +73,14 @@ public class HttpConnection extends AbstractConnection implements Connection
listeners.add(listener);
HttpConversation conversation = client.getConversation(request.getConversationID(), true);
HttpExchange exchange = new HttpExchange(conversation, getDestination(), request, listeners);
HttpExchange exchange = new HttpExchange(conversation, getHttpDestination(), request, listeners);
send(exchange);
}
public void send(HttpExchange exchange)
{
Request request = exchange.getRequest();
normalizeRequest(request);
protected abstract void send(HttpExchange exchange);
// Save the old idle timeout to restore it
EndPoint endPoint = getEndPoint();
idleTimeout = endPoint.getIdleTimeout();
endPoint.setIdleTimeout(request.getIdleTimeout());
// Associate the exchange to the connection
associate(exchange);
sender.send(exchange);
}
private void normalizeRequest(Request request)
protected void normalizeRequest(Request request)
{
if (request.getMethod() == null)
request.method(HttpMethod.GET);
@ -188,7 +116,7 @@ public class HttpConnection extends AbstractConnection implements Connection
if (version.getVersion() > 10)
{
if (!headers.containsKey(HttpHeader.HOST.asString()))
headers.put(getDestination().getHostField());
headers.put(getHttpDestination().getHostField());
}
// Add content headers
@ -235,129 +163,4 @@ public class HttpConnection extends AbstractConnection implements Connection
headers.put(acceptEncodingField);
}
}
public HttpExchange getExchange()
{
return exchange.get();
}
protected void associate(HttpExchange exchange)
{
if (!this.exchange.compareAndSet(null, exchange))
throw new UnsupportedOperationException("Pipelined requests not supported");
exchange.setConnection(this);
LOG.debug("{} associated to {}", exchange, this);
}
protected HttpExchange disassociate()
{
HttpExchange exchange = this.exchange.getAndSet(null);
if (exchange != null)
exchange.setConnection(null);
LOG.debug("{} disassociated from {}", exchange, this);
return exchange;
}
@Override
public void onFillable()
{
HttpExchange exchange = getExchange();
if (exchange != null)
{
receive();
}
else
{
// If there is no exchange, then could be either a remote close,
// or garbage bytes; in both cases we close the connection
close();
}
}
protected void receive()
{
receiver.receive();
}
public void complete(HttpExchange exchange, boolean success)
{
HttpExchange existing = disassociate();
if (existing == exchange)
{
exchange.awaitTermination();
// Restore idle timeout
getEndPoint().setIdleTimeout(idleTimeout);
LOG.debug("{} disassociated from {}", exchange, this);
if (success)
{
HttpFields responseHeaders = exchange.getResponse().getHeaders();
Enumeration<String> values = responseHeaders.getValues(HttpHeader.CONNECTION.asString(), ",");
if (values != null)
{
while (values.hasMoreElements())
{
if ("close".equalsIgnoreCase(values.nextElement()))
{
close();
return;
}
}
}
destination.release(this);
}
else
{
close();
}
}
else if (existing == null)
{
// It is possible that the exchange has already been disassociated,
// for example if the connection idle timeouts: this will fail
// the response, but the request may still be under processing.
// Eventually the request will also fail as the connection is closed
// and will arrive here without an exchange being present.
// We just ignore this fact, as the exchange has already been processed
}
else
{
throw new IllegalStateException();
}
}
public boolean abort(Throwable cause)
{
// We want the return value to be that of the response
// because if the response has already successfully
// arrived then we failed to abort the exchange
sender.abort(cause);
return receiver.abort(cause);
}
public void proceed(boolean proceed)
{
sender.proceed(proceed);
}
@Override
public void close()
{
destination.remove(this);
getEndPoint().shutdownOutput();
LOG.debug("{} oshut", this);
getEndPoint().close();
LOG.debug("{} closed", this);
}
@Override
public String toString()
{
return String.format("%s@%x(l:%s <-> r:%s)",
HttpConnection.class.getSimpleName(),
hashCode(),
getEndPoint().getLocalAddress(),
getEndPoint().getRemoteAddress());
}
}

View File

@ -0,0 +1,89 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public abstract class HttpContent implements Callback
{
private final ContentProvider provider;
private final Iterator<ByteBuffer> iterator;
private ByteBuffer buffer;
private volatile ByteBuffer content;
public HttpContent(ContentProvider provider)
{
this(provider, provider == null ? Collections.<ByteBuffer>emptyIterator() : provider.iterator());
}
public HttpContent(HttpContent that)
{
this(that.provider, that.iterator);
this.buffer = that.buffer;
this.content = that.content;
}
private HttpContent(ContentProvider provider, Iterator<ByteBuffer> iterator)
{
this.provider = provider;
this.iterator = iterator;
}
public boolean hasContent()
{
return provider != null;
}
public boolean isLast()
{
return !iterator.hasNext();
}
public ByteBuffer getByteBuffer()
{
return buffer;
}
public ByteBuffer getContent()
{
return content;
}
public boolean advance()
{
if (isLast())
{
if (content != null)
content = buffer = BufferUtil.EMPTY_BUFFER;
return false;
}
else
{
ByteBuffer buffer = this.buffer = iterator.next();
content = buffer == null ? null : buffer.slice();
return buffer != null;
}
}
}

View File

@ -22,13 +22,11 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.AsynchronousCloseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
@ -40,7 +38,6 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
@ -48,18 +45,15 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HttpDestination implements Destination, Closeable, Dumpable
public abstract class HttpDestination implements Destination, Closeable, Dumpable
{
private static final Logger LOG = Log.getLogger(HttpDestination.class);
protected static final Logger LOG = Log.getLogger(new Object(){}.getClass().getEnclosingClass());
private final AtomicInteger connectionCount = new AtomicInteger();
private final HttpClient client;
private final String scheme;
private final String host;
private final Address address;
private final Queue<HttpExchange> exchanges;
private final BlockingQueue<Connection> idleConnections;
private final BlockingQueue<Connection> activeConnections;
private final RequestNotifier requestNotifier;
private final ResponseNotifier responseNotifier;
private final Address proxyAddress;
@ -72,14 +66,7 @@ public class HttpDestination implements Destination, Closeable, Dumpable
this.host = host;
this.address = new Address(host, port);
int maxRequestsQueued = client.getMaxRequestsQueuedPerDestination();
int capacity = Math.min(32, maxRequestsQueued);
this.exchanges = new BlockingArrayQueue<>(capacity, capacity, maxRequestsQueued);
int maxConnections = client.getMaxConnectionsPerDestination();
capacity = Math.min(8, maxConnections);
this.idleConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections);
this.activeConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections);
this.exchanges = new LinkedBlockingQueue<>(client.getMaxRequestsQueuedPerDestination());
this.requestNotifier = new RequestNotifier(client);
this.responseNotifier = new ResponseNotifier(client);
@ -93,14 +80,14 @@ public class HttpDestination implements Destination, Closeable, Dumpable
hostField = new HttpField(HttpHeader.HOST, host);
}
protected BlockingQueue<Connection> getIdleConnections()
public HttpClient getHttpClient()
{
return idleConnections;
return client;
}
protected BlockingQueue<Connection> getActiveConnections()
public Queue<HttpExchange> getHttpExchanges()
{
return activeConnections;
return exchanges;
}
public RequestNotifier getRequestNotifier()
@ -157,7 +144,7 @@ public class HttpDestination implements Destination, Closeable, Dumpable
return hostField;
}
public void send(Request request, List<Response.ResponseListener> listeners)
protected void send(Request request, List<Response.ResponseListener> listeners)
{
if (!scheme.equals(request.getScheme()))
throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
@ -182,9 +169,7 @@ public class HttpDestination implements Destination, Closeable, Dumpable
{
LOG.debug("Queued {}", request);
requestNotifier.notifyQueued(request);
Connection connection = acquire();
if (connection != null)
process(connection, false);
send();
}
}
else
@ -199,6 +184,8 @@ public class HttpDestination implements Destination, Closeable, Dumpable
}
}
protected abstract void send();
public void newConnection(Promise<Connection> promise)
{
createConnection(new ProxyPromise(promise));
@ -209,80 +196,24 @@ public class HttpDestination implements Destination, Closeable, Dumpable
client.newConnection(this, promise);
}
protected Connection acquire()
public boolean remove(HttpExchange exchange)
{
Connection result = idleConnections.poll();
if (result != null)
return result;
final int maxConnections = client.getMaxConnectionsPerDestination();
while (true)
{
int current = connectionCount.get();
final int next = current + 1;
if (next > maxConnections)
{
LOG.debug("Max connections per destination {} exceeded for {}", current, this);
// Try again the idle connections
return idleConnections.poll();
}
if (connectionCount.compareAndSet(current, next))
{
LOG.debug("Creating connection {}/{} for {}", next, maxConnections, this);
// This is the promise that is being called when a connection (eventually proxied) succeeds or fails.
Promise<Connection> promise = new Promise<Connection>()
{
@Override
public void succeeded(Connection connection)
{
process(connection, true);
}
@Override
public void failed(final Throwable x)
{
client.getExecutor().execute(new Runnable()
{
@Override
public void run()
{
abort(x);
}
});
}
};
// Create a new connection, and pass a ProxyPromise to establish a proxy tunnel, if needed.
// Differently from the case where the connection is created explicitly by applications, here
// we need to do a bit more logging and keep track of the connection count in case of failures.
createConnection(new ProxyPromise(promise)
{
@Override
public void succeeded(Connection connection)
{
LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, HttpDestination.this);
super.succeeded(connection);
}
@Override
public void failed(Throwable x)
{
LOG.debug("Connection failed {} for {}", x, HttpDestination.this);
connectionCount.decrementAndGet();
super.failed(x);
}
});
// Try again the idle connections
return idleConnections.poll();
}
}
return exchanges.remove(exchange);
}
private void abort(Throwable cause)
public void close()
{
abort(new AsynchronousCloseException());
LOG.debug("Closed {}", this);
}
/**
* Aborts all the {@link HttpExchange}s queued in this destination.
*
* @param cause the abort cause
* @see #abort(HttpExchange, Throwable)
*/
public void abort(Throwable cause)
{
HttpExchange exchange;
while ((exchange = exchanges.poll()) != null)
@ -290,134 +221,11 @@ public class HttpDestination implements Destination, Closeable, Dumpable
}
/**
* <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
* <p>A new connection is created when a request needs to be executed; it is possible that the request that
* triggered the request creation is executed by another connection that was just released, so the new connection
* may become idle.</p>
* <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
* Aborts the given {@code exchange}, notifies listeners of the failure, and completes the exchange.
*
* @param connection the new connection
* @param dispatch whether to dispatch the processing to another thread
* @param exchange the {@link HttpExchange} to abort
* @param cause the abort cause
*/
protected void process(Connection connection, boolean dispatch)
{
// Ugly cast, but lack of generic reification forces it
final HttpConnection httpConnection = (HttpConnection)connection;
final HttpExchange exchange = exchanges.poll();
if (exchange == null)
{
LOG.debug("{} idle", httpConnection);
if (!idleConnections.offer(httpConnection))
{
LOG.debug("{} idle overflow");
httpConnection.close();
}
if (!client.isRunning())
{
LOG.debug("{} is stopping", client);
remove(httpConnection);
httpConnection.close();
}
}
else
{
final Request request = exchange.getRequest();
Throwable cause = request.getAbortCause();
if (cause != null)
{
abort(exchange, cause);
LOG.debug("Aborted before processing {}: {}", exchange, cause);
}
else
{
LOG.debug("{} active", httpConnection);
if (!activeConnections.offer(httpConnection))
{
LOG.warn("{} active overflow");
}
if (dispatch)
{
client.getExecutor().execute(new Runnable()
{
@Override
public void run()
{
httpConnection.send(exchange);
}
});
}
else
{
httpConnection.send(exchange);
}
}
}
}
public void release(Connection connection)
{
LOG.debug("{} released", connection);
if (client.isRunning())
{
boolean removed = activeConnections.remove(connection);
if (removed)
process(connection, false);
else
LOG.debug("{} explicit", connection);
}
else
{
LOG.debug("{} is stopped", client);
remove(connection);
connection.close();
}
}
public void remove(Connection connection)
{
boolean removed = activeConnections.remove(connection);
removed |= idleConnections.remove(connection);
if (removed)
{
int open = connectionCount.decrementAndGet();
LOG.debug("Removed connection {} for {} - open: {}", connection, this, open);
}
// We need to execute queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries
if (!exchanges.isEmpty())
{
connection = acquire();
if (connection != null)
process(connection, false);
}
}
public void close()
{
for (Connection connection : idleConnections)
connection.close();
idleConnections.clear();
// A bit drastic, but we cannot wait for all requests to complete
for (Connection connection : activeConnections)
connection.close();
activeConnections.clear();
abort(new AsynchronousCloseException());
connectionCount.set(0);
LOG.debug("Closed {}", this);
}
public boolean remove(HttpExchange exchange)
{
return exchanges.remove(exchange);
}
protected void abort(HttpExchange exchange, Throwable cause)
{
Request request = exchange.getRequest();
@ -438,22 +246,19 @@ public class HttpDestination implements Destination, Closeable, Dumpable
public void dump(Appendable out, String indent) throws IOException
{
ContainerLifeCycle.dumpObject(out, this + " - requests queued: " + exchanges.size());
List<String> connections = new ArrayList<>();
for (Connection connection : idleConnections)
connections.add(connection + " - IDLE");
for (Connection connection : activeConnections)
connections.add(connection + " - ACTIVE");
ContainerLifeCycle.dump(out, indent, connections);
}
public String asString()
{
return client.address(getScheme(), getHost(), getPort());
}
@Override
public String toString()
{
return String.format("%s(%s://%s:%d)%s",
return String.format("%s(%s)%s",
HttpDestination.class.getSimpleName(),
getScheme(),
getHost(),
getPort(),
asString(),
proxyAddress == null ? "" : " via " + proxyAddress.getHost() + ":" + proxyAddress.getPort());
}

View File

@ -20,8 +20,9 @@ package org.eclipse.jetty.client;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicMarkableReference;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@ -33,14 +34,16 @@ public class HttpExchange
{
private static final Logger LOG = Log.getLogger(HttpExchange.class);
private final AtomicBoolean requestComplete = new AtomicBoolean();
private final AtomicBoolean responseComplete = new AtomicBoolean();
private final AtomicInteger complete = new AtomicInteger();
private final CountDownLatch terminate = new CountDownLatch(2);
private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
private final HttpConversation conversation;
private final HttpDestination destination;
private final Request request;
private final List<Response.ResponseListener> listeners;
private final HttpResponse response;
private volatile HttpConnection connection;
private volatile Throwable requestFailure;
private volatile Throwable responseFailure;
@ -85,30 +88,47 @@ public class HttpExchange
return responseFailure;
}
public void setConnection(HttpConnection connection)
public void associate(HttpChannel channel)
{
this.connection = connection;
if (!this.channel.compareAndSet(null, channel))
throw new IllegalStateException();
}
public AtomicMarkableReference<Result> requestComplete(Throwable failure)
public void disassociate(HttpChannel channel)
{
if (!this.channel.compareAndSet(channel, null))
throw new IllegalStateException();
}
public boolean requestComplete()
{
return requestComplete.compareAndSet(false, true);
}
public boolean responseComplete()
{
return responseComplete.compareAndSet(false, true);
}
public Result terminateRequest(Throwable failure)
{
int requestSuccess = 0b0011;
int requestFailure = 0b0001;
return complete(failure == null ? requestSuccess : requestFailure, failure);
return terminate(failure == null ? requestSuccess : requestFailure, failure);
}
public AtomicMarkableReference<Result> responseComplete(Throwable failure)
public Result terminateResponse(Throwable failure)
{
if (failure == null)
{
int responseSuccess = 0b1100;
return complete(responseSuccess, failure);
return terminate(responseSuccess, failure);
}
else
{
proceed(false);
int responseFailure = 0b0100;
return complete(responseFailure, failure);
return terminate(responseFailure, failure);
}
}
@ -125,16 +145,10 @@ public class HttpExchange
* By using {@link AtomicInteger} to atomically sum these codes we can know
* whether the exchange is completed and whether is successful.
*
* @param code the bits representing the status code for either the request or the response
* @param failure the failure - if any - associated with the status code for either the request or the response
* @return an AtomicMarkableReference holding whether the operation modified the
* completion status and the {@link Result} - if any - associated with the status
* @return the {@link Result} - if any - associated with the status
*/
private AtomicMarkableReference<Result> complete(int code, Throwable failure)
private Result terminate(int code, Throwable failure)
{
Result result = null;
boolean modified = false;
int current;
while (true)
{
@ -146,7 +160,6 @@ public class HttpExchange
if (!complete.compareAndSet(current, candidate))
continue;
current = candidate;
modified = true;
if ((code & 0b01) == 0b01)
requestFailure = failure;
else
@ -156,19 +169,16 @@ public class HttpExchange
break;
}
int completed = 0b0101;
if ((current & completed) == completed)
int terminated = 0b0101;
if ((current & terminated) == terminated)
{
if (modified)
{
// Request and response completed
LOG.debug("{} complete", this);
conversation.complete();
}
result = new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
// Request and response terminated
LOG.debug("{} terminated", this);
conversation.complete();
return new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
}
return new AtomicMarkableReference<>(result, modified);
return null;
}
public boolean abort(Throwable cause)
@ -181,12 +191,12 @@ public class HttpExchange
}
else
{
HttpConnection connection = this.connection;
// If there is no connection, this exchange is already completed
if (connection == null)
HttpChannel channel = this.channel.get();
// If there is no channel, this exchange is already completed
if (channel == null)
return false;
boolean aborted = connection.abort(cause);
boolean aborted = channel.abort(cause);
LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause);
return aborted;
}
@ -194,6 +204,7 @@ public class HttpExchange
public void resetResponse(boolean success)
{
responseComplete.set(false);
int responseSuccess = 0b1100;
int responseFailure = 0b0100;
int code = success ? responseSuccess : responseFailure;
@ -202,9 +213,9 @@ public class HttpExchange
public void proceed(boolean proceed)
{
HttpConnection connection = this.connection;
if (connection != null)
connection.proceed(proceed);
HttpChannel channel = this.channel.get();
if (channel != null)
channel.proceed(this, proceed);
}
public void terminateRequest()
@ -229,15 +240,20 @@ public class HttpExchange
}
}
@Override
public String toString()
private String toString(int code)
{
String padding = "0000";
String status = Integer.toBinaryString(complete.get());
String status = Integer.toBinaryString(code);
return String.format("%s@%x status=%s%s",
HttpExchange.class.getSimpleName(),
hashCode(),
padding.substring(status.length()),
status);
}
@Override
public String toString()
{
return toString(complete.get());
}
}

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.client;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
@ -28,185 +27,113 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicMarkableReference;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
public abstract class HttpReceiver
{
private static final Logger LOG = Log.getLogger(HttpReceiver.class);
protected static final Logger LOG = Log.getLogger(new Object(){}.getClass().getEnclosingClass());
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
private final HttpParser parser = new HttpParser(this);
private final HttpConnection connection;
private ContentDecoder decoder;
private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
private final HttpChannel channel;
private volatile ContentDecoder decoder;
public HttpReceiver(HttpConnection connection)
public HttpReceiver(HttpChannel channel)
{
this.connection = connection;
this.channel = channel;
}
public void receive()
public HttpChannel getHttpChannel()
{
EndPoint endPoint = connection.getEndPoint();
HttpClient client = connection.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
try
return channel;
}
protected HttpExchange getHttpExchange()
{
return channel.getHttpExchange();
}
protected HttpDestination getHttpDestination()
{
return channel.getHttpDestination();
}
protected void onResponseBegin(HttpExchange exchange)
{
if (!updateResponseState(ResponseState.IDLE, ResponseState.BEGIN))
return;
HttpConversation conversation = exchange.getConversation();
HttpResponse response = exchange.getResponse();
// Probe the protocol handlers
HttpDestination destination = getHttpDestination();
HttpClient client = destination.getHttpClient();
ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
Response.Listener handlerListener = null;
if (protocolHandler != null)
{
while (true)
handlerListener = protocolHandler.getResponseListener();
LOG.debug("Found protocol handler {}", protocolHandler);
}
exchange.getConversation().updateResponseListeners(handlerListener);
LOG.debug("Response begin {}", response);
ResponseNotifier notifier = destination.getResponseNotifier();
notifier.notifyBegin(conversation.getResponseListeners(), response);
}
protected void onResponseHeader(HttpExchange exchange, HttpField field)
{
out: while (true)
{
ResponseState current = responseState.get();
switch (current)
{
int read = endPoint.fill(buffer);
LOG.debug("Read {} bytes from {}", read, connection);
if (read > 0)
case BEGIN:
case HEADER:
{
parse(buffer);
}
else if (read == 0)
{
fillInterested();
if (updateResponseState(current, ResponseState.HEADER))
break out;
break;
}
else
default:
{
shutdown();
break;
return;
}
}
}
catch (EofException x)
HttpResponse response = exchange.getResponse();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
boolean process = notifier.notifyHeader(exchange.getConversation().getResponseListeners(), response, field);
if (process)
{
LOG.ignore(x);
failAndClose(x);
}
catch (Exception x)
{
LOG.debug(x);
failAndClose(x);
}
finally
{
bufferPool.release(buffer);
}
}
private void parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
parser.parseNext(buffer);
}
private void fillInterested()
{
State state = this.state.get();
if (state == State.IDLE || state == State.RECEIVE)
connection.fillInterested();
}
private void shutdown()
{
parser.atEOF();
parser.parseNext(BufferUtil.EMPTY_BUFFER);
State state = this.state.get();
if (state == State.IDLE || state == State.RECEIVE)
{
if (!fail(new EOFException()))
connection.close();
}
}
@Override
public int getHeaderCacheSize()
{
// TODO get from configuration
return 256;
}
@Override
public boolean startResponse(HttpVersion version, int status, String reason)
{
if (updateState(State.IDLE, State.RECEIVE))
{
HttpExchange exchange = connection.getExchange();
// The exchange may be null if it failed concurrently
if (exchange != null)
response.getHeaders().add(field);
HttpHeader fieldHeader = field.getHeader();
if (fieldHeader != null)
{
HttpConversation conversation = exchange.getConversation();
HttpResponse response = exchange.getResponse();
parser.setHeadResponse(exchange.getRequest().getMethod() == HttpMethod.HEAD);
response.version(version).status(status).reason(reason);
// Probe the protocol handlers
HttpClient client = connection.getHttpClient();
ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
Response.Listener handlerListener = null;
if (protocolHandler != null)
switch (fieldHeader)
{
handlerListener = protocolHandler.getResponseListener();
LOG.debug("Found protocol handler {}", protocolHandler);
}
exchange.getConversation().updateResponseListeners(handlerListener);
LOG.debug("Receiving {}", response);
ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
notifier.notifyBegin(conversation.getResponseListeners(), response);
}
}
return false;
}
@Override
public boolean parsedHeader(HttpField field)
{
if (updateState(State.RECEIVE, State.RECEIVE))
{
HttpExchange exchange = connection.getExchange();
// The exchange may be null if it failed concurrently
if (exchange != null)
{
HttpConversation conversation = exchange.getConversation();
HttpResponse response = exchange.getResponse();
ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
boolean process = notifier.notifyHeader(conversation.getResponseListeners(), response, field);
if (process)
{
response.getHeaders().add(field);
HttpHeader fieldHeader = field.getHeader();
if (fieldHeader != null)
case SET_COOKIE:
case SET_COOKIE2:
{
switch (fieldHeader)
{
case SET_COOKIE:
case SET_COOKIE2:
{
storeCookie(exchange.getRequest().getURI(), field);
break;
}
default:
{
break;
}
}
storeCookie(exchange.getRequest().getURI(), field);
break;
}
default:
{
break;
}
}
}
}
return false;
}
private void storeCookie(URI uri, HttpField field)
@ -218,7 +145,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
{
Map<String, List<String>> header = new HashMap<>(1);
header.put(field.getHeader().asString(), Collections.singletonList(value));
connection.getHttpClient().getCookieManager().put(uri, header);
getHttpDestination().getHttpClient().getCookieManager().put(uri, header);
}
}
catch (IOException x)
@ -227,113 +154,128 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
}
}
@Override
public boolean headerComplete()
protected void onResponseHeaders(HttpExchange exchange)
{
if (updateState(State.RECEIVE, State.RECEIVE))
out: while (true)
{
HttpExchange exchange = connection.getExchange();
// The exchange may be null if it failed concurrently
if (exchange != null)
ResponseState current = responseState.get();
switch (current)
{
HttpConversation conversation = exchange.getConversation();
HttpResponse response = exchange.getResponse();
LOG.debug("Headers {}", response);
ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
notifier.notifyHeaders(conversation.getResponseListeners(), response);
Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
if (contentEncodings != null)
case BEGIN:
case HEADER:
{
for (ContentDecoder.Factory factory : connection.getHttpClient().getContentDecoderFactories())
if (updateResponseState(current, ResponseState.HEADERS))
break out;
break;
}
default:
{
return;
}
}
}
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response headers {}{}{}", response, System.getProperty("line.separator"), response.getHeaders().toString().trim());
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyHeaders(exchange.getConversation().getResponseListeners(), response);
Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
if (contentEncodings != null)
{
for (ContentDecoder.Factory factory : getHttpDestination().getHttpClient().getContentDecoderFactories())
{
while (contentEncodings.hasMoreElements())
{
if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
{
while (contentEncodings.hasMoreElements())
{
if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
{
this.decoder = factory.newContentDecoder();
break;
}
}
this.decoder = factory.newContentDecoder();
break;
}
}
}
}
return false;
}
@Override
public boolean content(ByteBuffer buffer)
protected void onResponseContent(HttpExchange exchange, ByteBuffer buffer)
{
if (updateState(State.RECEIVE, State.RECEIVE))
out: while (true)
{
HttpExchange exchange = connection.getExchange();
// The exchange may be null if it failed concurrently
if (exchange != null)
ResponseState current = responseState.get();
switch (current)
{
HttpConversation conversation = exchange.getConversation();
HttpResponse response = exchange.getResponse();
LOG.debug("Content {}: {} bytes", response, buffer.remaining());
ContentDecoder decoder = this.decoder;
if (decoder != null)
case HEADERS:
case CONTENT:
{
buffer = decoder.decode(buffer);
LOG.debug("{} {}: {} bytes", decoder, response, buffer.remaining());
if (updateResponseState(current, ResponseState.CONTENT))
break out;
break;
}
default:
{
return;
}
ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
notifier.notifyContent(conversation.getResponseListeners(), response, buffer);
}
}
return false;
}
@Override
public boolean messageComplete()
{
if (updateState(State.RECEIVE, State.RECEIVE))
success();
return true;
}
protected boolean success()
{
HttpExchange exchange = connection.getExchange();
if (exchange == null)
return false;
AtomicMarkableReference<Result> completion = exchange.responseComplete(null);
if (!completion.isMarked())
return false;
parser.reset();
decoder = null;
if (!updateState(State.RECEIVE, State.IDLE))
throw new IllegalStateException();
exchange.terminateResponse();
HttpResponse response = exchange.getResponse();
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
notifier.notifySuccess(listeners, response);
LOG.debug("Received {}", response);
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.getProperty("line.separator"), BufferUtil.toDetailString(buffer));
ContentDecoder decoder = this.decoder;
if (decoder != null)
{
buffer = decoder.decode(buffer);
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.getProperty("line.separator"), BufferUtil.toDetailString(buffer));
}
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyContent(exchange.getConversation().getResponseListeners(), response, buffer);
}
protected boolean onResponseSuccess(HttpExchange exchange)
{
// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
boolean completed = exchange.responseComplete();
if (!completed)
return false;
// Reset to be ready for another response
reset();
// Mark atomically the response as terminated and succeeded,
// with respect to concurrency between request and response.
// If there is a non-null result, then both sender and
// receiver are reset and ready to be reused, and the
// connection closed/pooled (depending on the transport).
Result result = exchange.terminateResponse(null);
HttpResponse response = exchange.getResponse();
LOG.debug("Response success {}", response);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifySuccess(listeners, response);
Result result = completion.getReference();
if (result != null)
{
connection.complete(exchange, !result.isFailed());
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(result);
LOG.debug("Request/Response complete {}", response);
notifier.notifyComplete(listeners, result);
if (ordered)
channel.exchangeTerminated(result);
}
return true;
}
protected boolean fail(Throwable failure)
protected boolean onResponseFailure(Throwable failure)
{
HttpExchange exchange = connection.getExchange();
HttpExchange exchange = getHttpExchange();
// In case of a response error, the failure has already been notified
// and it is possible that a further attempt to read in the receive
// loop throws an exception that reenters here but without exchange;
@ -341,82 +283,71 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
if (exchange == null)
return false;
AtomicMarkableReference<Result> completion = exchange.responseComplete(failure);
if (!completion.isMarked())
// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
boolean completed = exchange.responseComplete();
if (!completed)
return false;
parser.close();
decoder = null;
// Dispose to avoid further responses
dispose();
while (true)
{
State current = state.get();
if (updateState(current, State.FAILURE))
break;
}
exchange.terminateResponse();
// Mark atomically the response as terminated and failed,
// with respect to concurrency between request and response.
Result result = exchange.terminateResponse(failure);
HttpResponse response = exchange.getResponse();
HttpConversation conversation = exchange.getConversation();
ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
notifier.notifyFailure(conversation.getResponseListeners(), response, failure);
LOG.debug("Failed {} {}", response, failure);
LOG.debug("Response failure {} {}", response, failure);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyFailure(listeners, response, failure);
Result result = completion.getReference();
if (result != null)
{
connection.complete(exchange, false);
notifier.notifyComplete(conversation.getResponseListeners(), result);
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(result);
notifier.notifyComplete(listeners, result);
if (ordered)
channel.exchangeTerminated(result);
}
return true;
}
@Override
public void earlyEOF()
protected void reset()
{
failAndClose(new EOFException());
responseState.set(ResponseState.IDLE);
decoder = null;
}
private void failAndClose(Throwable failure)
protected void dispose()
{
fail(failure);
connection.close();
}
@Override
public void badMessage(int status, String reason)
{
HttpExchange exchange = connection.getExchange();
HttpResponse response = exchange.getResponse();
response.status(status).reason(reason);
failAndClose(new HttpResponseException("HTTP protocol violation: bad response", response));
decoder = null;
}
public void idleTimeout()
{
// If we cannot fail, it means a response arrived
// just when we were timeout idling, so we don't close
fail(new TimeoutException());
onResponseFailure(new TimeoutException());
}
public boolean abort(Throwable cause)
{
return fail(cause);
return onResponseFailure(cause);
}
private boolean updateState(State from, State to)
private boolean updateResponseState(ResponseState from, ResponseState to)
{
boolean updated = state.compareAndSet(from, to);
boolean updated = responseState.compareAndSet(from, to);
if (!updated)
LOG.debug("State update failed: {} -> {}: {}", from, to, state.get());
LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
return updated;
}
private enum State
private enum ResponseState
{
IDLE, RECEIVE, FAILURE
IDLE, BEGIN, HEADER, HEADERS, CONTENT, FAILURE
}
}

View File

@ -106,6 +106,6 @@ public class HttpResponse implements Response
@Override
public String toString()
{
return String.format("%s[%s %d %s]", HttpResponse.class.getSimpleName(), getVersion(), getStatus(), getReason());
return String.format("%s[%s %d %s]@%x", HttpResponse.class.getSimpleName(), getVersion(), getStatus(), getReason(), hashCode());
}
}

View File

@ -0,0 +1,116 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.http;
import java.util.Enumeration;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
public class HttpChannelOverHTTP extends HttpChannel
{
private final HttpConnectionOverHTTP connection;
private final HttpSenderOverHTTP sender;
private final HttpReceiverOverHTTP receiver;
public HttpChannelOverHTTP(HttpConnectionOverHTTP connection)
{
super(connection.getHttpDestination());
this.connection = connection;
this.sender = new HttpSenderOverHTTP(this);
this.receiver = new HttpReceiverOverHTTP(this);
}
public HttpConnectionOverHTTP getHttpConnection()
{
return connection;
}
@Override
public void send()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
sender.send(exchange);
}
@Override
public void proceed(HttpExchange exchange, boolean proceed)
{
sender.proceed(exchange, proceed);
}
@Override
public boolean abort(Throwable cause)
{
// We want the return value to be that of the response
// because if the response has already successfully
// arrived then we failed to abort the exchange
sender.abort(cause);
return receiver.abort(cause);
}
public void receive()
{
receiver.receive();
}
@Override
public void exchangeTerminated(Result result)
{
super.exchangeTerminated(result);
if (result.isSucceeded())
{
HttpFields responseHeaders = result.getResponse().getHeaders();
Enumeration<String> values = responseHeaders.getValues(HttpHeader.CONNECTION.asString(), ",");
if (values != null)
{
while (values.hasMoreElements())
{
if (HttpHeaderValue.CLOSE.asString().equalsIgnoreCase(values.nextElement()))
{
connection.close();
return;
}
}
}
connection.release();
}
else
{
connection.close();
}
}
public void idleTimeout()
{
receiver.idleTimeout();
}
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
}

View File

@ -0,0 +1,224 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.http;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HttpClientTransportOverHTTP extends ContainerLifeCycle implements HttpClientTransport
{
private static final Logger LOG = Log.getLogger(HttpClientTransportOverHTTP.class);
private volatile HttpClient client;
private volatile SelectorManager selectorManager;
@Override
public void setHttpClient(HttpClient client)
{
this.client = client;
}
@Override
protected void doStart() throws Exception
{
selectorManager = newSelectorManager(client);
selectorManager.setConnectTimeout(client.getConnectTimeout());
addBean(selectorManager);
super.doStart();
}
@Override
public HttpDestination newHttpDestination(HttpClient client, String scheme, String host, int port)
{
return new HttpDestinationOverHTTP(client, scheme, host, port);
}
@Override
public void connect(HttpDestination destination, SocketAddress address, Promise<org.eclipse.jetty.client.api.Connection> promise)
{
SocketChannel channel = null;
try
{
channel = SocketChannel.open();
HttpClient client = destination.getHttpClient();
SocketAddress bindAddress = client.getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
configure(client, channel);
channel.configureBlocking(false);
channel.connect(address);
ConnectionCallback callback = new ConnectionCallback(destination, promise);
selectorManager.connect(channel, callback);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
finally
{
promise.failed(x);
}
}
}
protected void configure(HttpClient client, SocketChannel channel) throws SocketException
{
channel.socket().setTcpNoDelay(client.isTCPNoDelay());
}
protected SelectorManager newSelectorManager(HttpClient client)
{
return new ClientSelectorManager(client, 1);
}
protected Connection newHttpConnection(HttpClient httpClient, EndPoint endPoint, HttpDestination destination)
{
return new HttpConnectionOverHTTP(httpClient, endPoint, destination);
}
protected SslConnection newSslConnection(HttpClient httpClient, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(httpClient.getByteBufferPool(), httpClient.getExecutor(), endPoint, engine);
}
private SslConnection createSslConnection(HttpDestination destination, EndPoint endPoint)
{
HttpClient httpClient = destination.getHttpClient();
SslContextFactory sslContextFactory = httpClient.getSslContextFactory();
SSLEngine engine = sslContextFactory.newSSLEngine(destination.getHost(), destination.getPort());
engine.setUseClientMode(true);
SslConnection sslConnection = newSslConnection(httpClient, endPoint, engine);
sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed());
endPoint.setConnection(sslConnection);
EndPoint appEndPoint = sslConnection.getDecryptedEndPoint();
Connection connection = newHttpConnection(httpClient, appEndPoint, destination);
appEndPoint.setConnection(connection);
return sslConnection;
}
protected class ClientSelectorManager extends SelectorManager
{
private final HttpClient client;
protected ClientSelectorManager(HttpClient client, int selectors)
{
super(client.getExecutor(), client.getScheduler(), selectors);
this.client = client;
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key)
{
return new SelectChannelEndPoint(channel, selector, key, getScheduler(), client.getIdleTimeout());
}
@Override
public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
ConnectionCallback callback = (ConnectionCallback)attachment;
HttpDestination destination = callback.destination;
SslContextFactory sslContextFactory = client.getSslContextFactory();
if (!destination.isProxied() && HttpScheme.HTTPS.is(destination.getScheme()))
{
if (sslContextFactory == null)
{
IOException failure = new ConnectException("Missing " + SslContextFactory.class.getSimpleName() + " for " + destination.getScheme() + " requests");
callback.failed(failure);
throw failure;
}
else
{
SslConnection sslConnection = createSslConnection(destination, endPoint);
callback.succeeded((org.eclipse.jetty.client.api.Connection)sslConnection.getDecryptedEndPoint().getConnection());
return sslConnection;
}
}
else
{
Connection connection = newHttpConnection(client, endPoint, destination);
callback.succeeded((org.eclipse.jetty.client.api.Connection)connection);
return connection;
}
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
ConnectionCallback callback = (ConnectionCallback)attachment;
callback.failed(ex);
}
}
private class ConnectionCallback implements Promise<org.eclipse.jetty.client.api.Connection>
{
private final HttpDestination destination;
private final Promise<org.eclipse.jetty.client.api.Connection> promise;
private ConnectionCallback(HttpDestination destination, Promise<org.eclipse.jetty.client.api.Connection> promise)
{
this.destination = destination;
this.promise = promise;
}
@Override
public void succeeded(org.eclipse.jetty.client.api.Connection result)
{
promise.succeeded(result);
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
}
}

View File

@ -0,0 +1,189 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.http;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpConnectionOverHTTP extends AbstractConnection implements Connection
{
private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
private final Delegate delegate;
private final HttpChannelOverHTTP channel;
private volatile boolean closed;
private volatile long idleTimeout;
public HttpConnectionOverHTTP(HttpClient client, EndPoint endPoint, HttpDestination destination)
{
super(endPoint, client.getExecutor(), client.isDispatchIO());
this.delegate = new Delegate(client, destination);
this.channel = new HttpChannelOverHTTP(this);
}
public HttpChannelOverHTTP getHttpChannel()
{
return channel;
}
public HttpDestinationOverHTTP getHttpDestination()
{
return (HttpDestinationOverHTTP)delegate.getHttpDestination();
}
@Override
public void send(Request request, Response.CompleteListener listener)
{
delegate.send(request, listener);
}
protected void send(HttpExchange exchange)
{
delegate.send(exchange);
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onClose()
{
closed = true;
super.onClose();
}
@Override
public void fillInterested()
{
// This is necessary when "upgrading" the connection for example after proxied
// CONNECT requests, because the old connection will read the CONNECT response
// and then set the read interest, while the new connection attached to the same
// EndPoint also will set the read interest, causing a ReadPendingException.
if (!closed)
super.fillInterested();
}
@Override
protected boolean onReadTimeout()
{
LOG.debug("{} idle timeout", this);
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
idleTimeout();
else
getHttpDestination().remove(this);
return true;
}
protected void idleTimeout()
{
// TODO: we need to fail the exchange if we did not get an answer from the server
// TODO: however this mechanism does not seem to be available in SPDY if not subclassing SPDYConnection
// TODO: but the API (Session) does not have such facilities; perhaps we need to add a callback to ISession
channel.idleTimeout();
}
@Override
public void onFillable()
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
{
channel.receive();
}
else
{
// If there is no exchange, then could be either a remote close,
// or garbage bytes; in both cases we close the connection
close();
}
}
public void release()
{
// Restore idle timeout
getEndPoint().setIdleTimeout(idleTimeout);
getHttpDestination().release(this);
}
@Override
public void close()
{
getHttpDestination().remove(this);
getEndPoint().shutdownOutput();
LOG.debug("{} oshut", this);
getEndPoint().close();
LOG.debug("{} closed", this);
}
@Override
public String toString()
{
return String.format("%s@%x(l:%s <-> r:%s)",
HttpConnection.class.getSimpleName(),
hashCode(),
getEndPoint().getLocalAddress(),
getEndPoint().getRemoteAddress());
}
private class Delegate extends HttpConnection
{
private Delegate(HttpClient client, HttpDestination destination)
{
super(client, destination);
}
@Override
protected void send(HttpExchange exchange)
{
Request request = exchange.getRequest();
normalizeRequest(request);
// Save the old idle timeout to restore it
EndPoint endPoint = getEndPoint();
idleTimeout = endPoint.getIdleTimeout();
endPoint.setIdleTimeout(request.getIdleTimeout());
// One channel per connection, just delegate the send
channel.associate(exchange);
channel.send();
}
@Override
public void close()
{
HttpConnectionOverHTTP.this.close();
}
}
}

View File

@ -0,0 +1,203 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.http;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpConnectionPool implements Dumpable
{
private static final Logger LOG = Log.getLogger(HttpConnectionPool.class);
private final AtomicInteger connectionCount = new AtomicInteger();
private final Destination destination;
private final int maxConnections;
private final Promise<Connection> connectionPromise;
private final BlockingQueue<Connection> idleConnections;
private final BlockingQueue<Connection> activeConnections;
public HttpConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise)
{
this.destination = destination;
this.maxConnections = maxConnections;
this.connectionPromise = connectionPromise;
this.idleConnections = new BlockingArrayQueue<>(maxConnections);
this.activeConnections = new BlockingArrayQueue<>(maxConnections);
}
public BlockingQueue<Connection> getIdleConnections()
{
return idleConnections;
}
public BlockingQueue<Connection> getActiveConnections()
{
return activeConnections;
}
public Connection acquire()
{
Connection result = acquireIdleConnection();
if (result != null)
return result;
while (true)
{
int current = connectionCount.get();
final int next = current + 1;
if (next > maxConnections)
{
LOG.debug("Max connections {}/{} reached", current, maxConnections);
// Try again the idle connections
return acquireIdleConnection();
}
if (connectionCount.compareAndSet(current, next))
{
LOG.debug("Connection {}/{} creation", next, maxConnections);
destination.newConnection(new Promise<Connection>()
{
@Override
public void succeeded(Connection connection)
{
LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
activate(connection);
connectionPromise.succeeded(connection);
}
@Override
public void failed(Throwable x)
{
LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
connectionCount.decrementAndGet();
connectionPromise.failed(x);
}
});
// Try again the idle connections
return acquireIdleConnection();
}
}
}
private Connection acquireIdleConnection()
{
Connection connection = idleConnections.poll();
if (connection != null)
activate(connection);
return connection;
}
private boolean activate(Connection connection)
{
if (activeConnections.offer(connection))
{
LOG.debug("Connection active {}", connection);
return true;
}
else
{
LOG.debug("Connection active overflow {}", connection);
return false;
}
}
public boolean release(Connection connection)
{
if (activeConnections.remove(connection))
{
if (idleConnections.offer(connection))
{
LOG.debug("Connection idle {}", connection);
return true;
}
else
{
LOG.debug("Connection idle overflow {}", connection);
}
}
return false;
}
public void remove(Connection connection)
{
boolean removed = activeConnections.remove(connection);
removed |= idleConnections.remove(connection);
if (removed)
{
int pooled = connectionCount.decrementAndGet();
LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
}
}
public boolean isActive(Connection connection)
{
return activeConnections.contains(connection);
}
public boolean isIdle(Connection connection)
{
return idleConnections.contains(connection);
}
public void close()
{
for (Connection connection : idleConnections)
connection.close();
idleConnections.clear();
// A bit drastic, but we cannot wait for all requests to complete
for (Connection connection : activeConnections)
connection.close();
activeConnections.clear();
connectionCount.set(0);
}
@Override
public String dump()
{
return ContainerLifeCycle.dump(this);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
ContainerLifeCycle.dumpObject(out, this);
ContainerLifeCycle.dump(out, indent, activeConnections, idleConnections);
}
@Override
public String toString()
{
return String.format("%s %d/%d", getClass().getSimpleName(), connectionCount.get(), maxConnections);
}
}

View File

@ -0,0 +1,182 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.http;
import java.io.IOException;
import java.util.Arrays;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
public class HttpDestinationOverHTTP extends HttpDestination implements Promise<Connection>
{
private final HttpConnectionPool connectionPool;
public HttpDestinationOverHTTP(HttpClient client, String scheme, String host, int port)
{
super(client, scheme, host, port);
this.connectionPool = new HttpConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
}
public HttpConnectionPool getHttpConnectionPool()
{
return connectionPool;
}
@Override
public void succeeded(Connection connection)
{
process((HttpConnectionOverHTTP)connection, true);
}
@Override
public void failed(final Throwable x)
{
getHttpClient().getExecutor().execute(new Runnable()
{
@Override
public void run()
{
abort(x);
}
});
}
protected void send()
{
HttpConnectionOverHTTP connection = acquire();
if (connection != null)
process(connection, false);
}
// TODO: make it protected
public HttpConnectionOverHTTP acquire()
{
return (HttpConnectionOverHTTP)connectionPool.acquire();
}
/**
* <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
* <p>A new connection is created when a request needs to be executed; it is possible that the request that
* triggered the request creation is executed by another connection that was just released, so the new connection
* may become idle.</p>
* <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
*
* @param connection the new connection
* @param dispatch whether to dispatch the processing to another thread
*/
private void process(final HttpConnectionOverHTTP connection, boolean dispatch)
{
HttpClient client = getHttpClient();
final HttpExchange exchange = getHttpExchanges().poll();
LOG.debug("Processing exchange {} on connection {}", exchange, connection);
if (exchange == null)
{
// TODO: review this part... may not be 100% correct
// TODO: e.g. is client is not running, there should be no need to close the connection
if (!connectionPool.release(connection))
connection.close();
if (!client.isRunning())
{
LOG.debug("{} is stopping", client);
connection.close();
}
}
else
{
final Request request = exchange.getRequest();
Throwable cause = request.getAbortCause();
if (cause != null)
{
abort(exchange, cause);
LOG.debug("Aborted before processing {}: {}", exchange, cause);
}
else
{
if (dispatch)
{
client.getExecutor().execute(new Runnable()
{
@Override
public void run()
{
connection.send(exchange);
}
});
}
else
{
connection.send(exchange);
}
}
}
}
protected void release(HttpConnectionOverHTTP connection)
{
LOG.debug("{} released", connection);
HttpClient client = getHttpClient();
if (client.isRunning())
{
if (connectionPool.isActive(connection))
process(connection, false);
else
LOG.debug("{} explicit", connection);
}
else
{
LOG.debug("{} is stopped", client);
remove(connection);
connection.close();
}
}
protected void remove(HttpConnectionOverHTTP connection)
{
connectionPool.remove(connection);
// We need to execute queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries
if (!getHttpExchanges().isEmpty())
{
connection = acquire();
if (connection != null)
process(connection, false);
}
}
public void close()
{
connectionPool.close();
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
ContainerLifeCycle.dump(out, indent, Arrays.asList(connectionPool));
}
}

View File

@ -0,0 +1,221 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.http;
import java.io.EOFException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
{
private final HttpParser parser = new HttpParser(this);
public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
{
super(channel);
}
@Override
public HttpChannelOverHTTP getHttpChannel()
{
return (HttpChannelOverHTTP)super.getHttpChannel();
}
public void receive()
{
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
try
{
while (true)
{
int read = endPoint.fill(buffer);
LOG.debug("Read {} bytes from {}", read, endPoint);
if (read > 0)
{
parse(buffer);
}
else if (read == 0)
{
fillInterested();
break;
}
else
{
shutdown();
break;
}
}
}
catch (EofException x)
{
LOG.ignore(x);
failAndClose(x);
}
catch (Exception x)
{
LOG.debug(x);
failAndClose(x);
}
finally
{
bufferPool.release(buffer);
}
}
private void parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
parser.parseNext(buffer);
}
private void fillInterested()
{
// TODO: do we need to call fillInterested() only if we are not failed (or we have an exchange) ?
getHttpChannel().getHttpConnection().fillInterested();
}
private void shutdown()
{
// Shutting down the parser may invoke messageComplete() or earlyEOF()
parser.shutdownInput();
if (!onResponseFailure(new EOFException()))
{
// TODO: just shutdown here, or full close ?
getHttpChannel().getHttpConnection().close();
}
}
@Override
public int getHeaderCacheSize()
{
// TODO get from configuration
return 256;
}
@Override
public boolean startResponse(HttpVersion version, int status, String reason)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
parser.setHeadResponse(exchange.getRequest().getMethod() == HttpMethod.HEAD);
exchange.getResponse().version(version).status(status).reason(reason);
onResponseBegin(exchange);
return false;
}
@Override
public boolean parsedHeader(HttpField field)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
onResponseHeader(exchange, field);
return false;
}
@Override
public boolean headerComplete()
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
onResponseHeaders(exchange);
return false;
}
@Override
public boolean content(ByteBuffer buffer)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
onResponseContent(exchange, buffer);
return false;
}
@Override
public boolean messageComplete()
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false; // TODO: is it correct to return false here ?
onResponseSuccess(exchange);
return true;
}
@Override
public void earlyEOF()
{
failAndClose(new EOFException());
}
@Override
public void badMessage(int status, String reason)
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
{
HttpResponse response = exchange.getResponse();
response.status(status).reason(reason);
failAndClose(new HttpResponseException("HTTP protocol violation: bad response", response));
}
}
@Override
protected void reset()
{
super.reset();
parser.reset();
}
@Override
protected void dispose()
{
super.dispose();
parser.close();
}
private void failAndClose(Throwable failure)
{
onResponseFailure(failure);
getHttpChannel().getHttpConnection().close();
}
}

View File

@ -0,0 +1,236 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpContent;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
public class HttpSenderOverHTTP extends HttpSender
{
private final HttpGenerator generator = new HttpGenerator();
public HttpSenderOverHTTP(HttpChannelOverHTTP channel)
{
super(channel);
}
@Override
public HttpChannelOverHTTP getHttpChannel()
{
return (HttpChannelOverHTTP)super.getHttpChannel();
}
@Override
protected void sendHeaders(HttpExchange exchange, HttpContent content)
{
Request request = exchange.getRequest();
ContentProvider requestContent = request.getContent();
long contentLength = requestContent == null ? -1 : requestContent.getLength();
String path = request.getPath();
String query = request.getQuery();
if (query != null)
path += "?" + query;
HttpGenerator.RequestInfo requestInfo = new HttpGenerator.RequestInfo(request.getVersion(), request.getHeaders(), contentLength, request.getMethod().asString(), path);
try
{
HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
ByteBuffer header = bufferPool.acquire(client.getRequestBufferSize(), false);
ByteBuffer chunk = null;
ByteBuffer contentBuffer = null;
boolean lastContent = false;
if (!expects100Continue(request))
{
content.advance();
contentBuffer = content.getByteBuffer();
lastContent = content.isLast();
}
while (true)
{
HttpGenerator.Result result = generator.generateRequest(requestInfo, header, chunk, contentBuffer, lastContent);
switch (result)
{
case NEED_CHUNK:
{
chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
break;
}
case FLUSH:
{
int size = 1;
boolean hasChunk = chunk != null;
if (hasChunk)
++size;
boolean hasContent = contentBuffer != null;
if (hasContent)
++size;
ByteBuffer[] toWrite = new ByteBuffer[size];
ByteBuffer[] toRecycle = new ByteBuffer[hasChunk ? 2 : 1];
toWrite[0] = header;
toRecycle[0] = header;
if (hasChunk)
{
toWrite[1] = chunk;
toRecycle[1] = chunk;
}
if (hasContent)
toWrite[toWrite.length - 1] = contentBuffer;
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
endPoint.write(new ByteBufferRecyclerCallback(content, bufferPool, toRecycle), toWrite);
return;
}
default:
{
throw new IllegalStateException();
}
}
}
}
catch (Throwable x)
{
LOG.debug(x);
content.failed(x);
}
}
@Override
protected void sendContent(HttpExchange exchange, HttpContent content)
{
try
{
HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
ByteBuffer chunk = null;
while (true)
{
ByteBuffer contentBuffer = content.getByteBuffer();
boolean lastContent = content.isLast();
HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent);
switch (result)
{
case NEED_CHUNK:
{
chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
break;
}
case FLUSH:
{
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
if (chunk != null)
endPoint.write(new ByteBufferRecyclerCallback(content, bufferPool, chunk), chunk, contentBuffer);
else
endPoint.write(content, contentBuffer);
return;
}
case SHUTDOWN_OUT:
{
shutdownOutput();
break;
}
case CONTINUE:
{
break;
}
case DONE:
{
assert generator.isEnd();
content.succeeded();
return;
}
default:
{
throw new IllegalStateException();
}
}
}
}
catch (IOException x)
{
LOG.debug(x);
content.failed(x);
}
}
@Override
protected void reset()
{
generator.reset();
super.reset();
}
@Override
protected RequestState dispose()
{
generator.abort();
RequestState result = super.dispose();
shutdownOutput();
return result;
}
private void shutdownOutput()
{
getHttpChannel().getHttpConnection().getEndPoint().shutdownOutput();
}
private class ByteBufferRecyclerCallback implements Callback
{
private final Callback callback;
private final ByteBufferPool pool;
private final ByteBuffer[] buffers;
private ByteBufferRecyclerCallback(Callback callback, ByteBufferPool pool, ByteBuffer... buffers)
{
this.callback = callback;
this.pool = pool;
this.buffers = buffers;
}
@Override
public void succeeded()
{
for (ByteBuffer buffer : buffers)
{
assert !buffer.hasRemaining();
pool.release(buffer);
}
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
for (ByteBuffer buffer : buffers)
pool.release(buffer);
callback.failed(x);
}
}
}

View File

@ -0,0 +1,75 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.spdy;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.spdy.api.Session;
public class HttpChannelOverSPDY extends HttpChannel
{
private final Session session;
private final HttpSenderOverSPDY sender;
private final HttpReceiverOverSPDY receiver;
public HttpChannelOverSPDY(HttpDestination destination, Session session)
{
super(destination);
this.session = session;
this.sender = new HttpSenderOverSPDY(this);
this.receiver = new HttpReceiverOverSPDY(this);
}
public Session getSession()
{
return session;
}
public HttpSenderOverSPDY getHttpSender()
{
return sender;
}
public HttpReceiverOverSPDY getHttpReceiver()
{
return receiver;
}
@Override
public void send()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
sender.send(exchange);
}
@Override
public void proceed(HttpExchange exchange, boolean proceed)
{
sender.proceed(exchange, proceed);
}
@Override
public boolean abort(Throwable cause)
{
sender.abort(cause);
return receiver.abort(cause);
}
}

View File

@ -0,0 +1,90 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.spdy;
import java.net.SocketAddress;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.client.SPDYClient;
import org.eclipse.jetty.util.Promise;
public class HttpClientTransportOverSPDY implements HttpClientTransport
{
private final SPDYClient client;
private volatile HttpClient httpClient;
public HttpClientTransportOverSPDY(SPDYClient client)
{
this.client = client;
}
@Override
public void setHttpClient(HttpClient client)
{
httpClient = client;
}
@Override
public HttpDestination newHttpDestination(HttpClient httpClient, String scheme, String host, int port)
{
return new HttpDestinationOverSPDY(httpClient, scheme, host, port);
}
@Override
public void connect(final HttpDestination destination, SocketAddress address, final Promise<Connection> promise)
{
SessionFrameListener.Adapter listener = new SessionFrameListener.Adapter()
{
@Override
public void onException(Throwable x)
{
// TODO: is this correct ?
// TODO: if I get a stream error (e.g. invalid response headers)
// TODO: I must abort the *current* exchange, while below I will abort
// TODO: the queued exchanges only.
// TODO: The problem is that a single destination/connection multiplexes
// TODO: several exchanges, so I would need to cancel them all,
// TODO: or only the one that failed ?
destination.abort(x);
}
};
client.connect(address, listener, new Promise<Session>()
{
@Override
public void succeeded(Session session)
{
Connection result = new HttpConnectionOverSPDY(httpClient, destination, session);
promise.succeeded(result);
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
}
);
}
}

View File

@ -0,0 +1,55 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.spdy;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.util.Callback;
public class HttpConnectionOverSPDY extends HttpConnection
{
private final Session session;
public HttpConnectionOverSPDY(HttpClient client, HttpDestination destination, Session session)
{
super(client, destination);
this.session = session;
}
@Override
protected void send(HttpExchange exchange)
{
normalizeRequest(exchange.getRequest());
// One connection maps to N channels, so for each exchange we create a new channel
HttpChannel channel = new HttpChannelOverSPDY(getHttpDestination(), session);
channel.associate(exchange);
channel.send();
}
@Override
public void close()
{
session.goAway(new GoAwayInfo(), new Callback.Adapter());
}
}

View File

@ -0,0 +1,135 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.spdy;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.util.Promise;
public class HttpDestinationOverSPDY extends HttpDestination implements Promise<Connection>
{
private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
private volatile HttpConnectionOverSPDY connection;
public HttpDestinationOverSPDY(HttpClient client, String scheme, String host, int port)
{
super(client, scheme, host, port);
}
@Override
protected void send()
{
while (true)
{
ConnectState current = connect.get();
switch (current)
{
case DISCONNECTED:
{
if (!connect.compareAndSet(current, ConnectState.CONNECTING))
continue;
newConnection(this);
return;
}
case CONNECTING:
{
// Waiting to connect, just return
return;
}
case CONNECTED:
{
HttpConnectionOverSPDY connection = this.connection;
if (connection != null)
process(connection, false);
break;
}
default:
{
throw new IllegalStateException();
}
}
}
}
@Override
public void succeeded(Connection result)
{
if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
{
HttpConnectionOverSPDY connection = this.connection = (HttpConnectionOverSPDY)result;
process(connection, true);
}
else
{
result.close();
failed(new IllegalStateException());
}
}
@Override
public void failed(Throwable x)
{
connect.set(ConnectState.DISCONNECTED);
}
private void process(final HttpConnectionOverSPDY connection, boolean dispatch)
{
HttpClient client = getHttpClient();
final HttpExchange exchange = getHttpExchanges().poll();
LOG.debug("Processing exchange {} on connection {}", exchange, connection);
if (exchange != null)
{
final Request request = exchange.getRequest();
Throwable cause = request.getAbortCause();
if (cause != null)
{
abort(exchange, cause);
LOG.debug("Aborted before processing {}: {}", exchange, cause);
}
else
{
if (dispatch)
{
client.getExecutor().execute(new Runnable()
{
@Override
public void run()
{
connection.send(exchange);
}
});
}
else
{
connection.send(exchange);
}
}
}
}
private enum ConnectState
{
DISCONNECTED, CONNECTING, CONNECTED
}
}

View File

@ -0,0 +1,116 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.spdy;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
public class HttpReceiverOverSPDY extends HttpReceiver implements StreamFrameListener
{
public HttpReceiverOverSPDY(HttpChannelOverSPDY channel)
{
super(channel);
}
@Override
public HttpChannelOverSPDY getHttpChannel()
{
return (HttpChannelOverSPDY)super.getHttpChannel();
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
HttpResponse response = exchange.getResponse();
Fields fields = replyInfo.getHeaders();
// TODO: use HTTPSPDYHeader enum
HttpVersion version = HttpVersion.fromString(fields.get(":version").value());
response.version(version);
Integer status = fields.get(":status").valueAsInt();
response.status(status);
response.reason(HttpStatus.getMessage(status));
onResponseBegin(exchange);
for (Fields.Field field : fields)
{
// TODO: handle multiple values properly
// TODO: skip special headers
HttpField httpField = new HttpField(field.name(), field.value());
onResponseHeader(exchange, httpField);
}
onResponseHeaders(exchange);
if (replyInfo.isClose())
{
onResponseSuccess(exchange);
}
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
getHttpChannel().getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM), new Callback.Adapter());
return null;
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
// TODO: see above handling of headers
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
int length = dataInfo.length();
// TODO: avoid data copy here
onResponseContent(exchange, dataInfo.asByteBuffer(false));
dataInfo.consume(length);
if (dataInfo.isClose())
{
onResponseSuccess(exchange);
}
}
}

View File

@ -0,0 +1,91 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.spdy;
import org.eclipse.jetty.client.HttpContent;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Promise;
public class HttpSenderOverSPDY extends HttpSender
{
private volatile Stream stream;
public HttpSenderOverSPDY(HttpChannelOverSPDY channel)
{
super(channel);
}
@Override
public HttpChannelOverSPDY getHttpChannel()
{
return (HttpChannelOverSPDY)super.getHttpChannel();
}
@Override
protected void sendHeaders(HttpExchange exchange, final HttpContent content)
{
final Request request = exchange.getRequest();
Fields fields = new Fields();
HttpFields headers = request.getHeaders();
for (HttpField header : headers)
fields.add(header.getName(), header.getValue());
SynInfo synInfo = new SynInfo(fields, !content.hasContent());
getHttpChannel().getSession().syn(synInfo, getHttpChannel().getHttpReceiver(), new Promise<Stream>()
{
@Override
public void succeeded(Stream stream)
{
if (content.hasContent())
HttpSenderOverSPDY.this.stream = stream;
content.succeeded();
}
@Override
public void failed(Throwable failure)
{
content.failed(failure);
}
});
}
@Override
protected void sendContent(HttpExchange exchange, HttpContent content)
{
assert stream != null;
ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(content.getByteBuffer(), content.isLast());
stream.data(dataInfo, content);
}
@Override
protected void reset()
{
super.reset();
stream = null;
}
}

View File

@ -86,7 +86,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
.scheme(scheme)
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(contents))
.timeout(5, TimeUnit.SECONDS)
.timeout(555, TimeUnit.SECONDS)
.send();
Assert.assertNotNull(response);
@ -133,7 +133,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
return -1;
}
})
.timeout(5, TimeUnit.SECONDS)
.timeout(555, TimeUnit.SECONDS)
.send();
Assert.assertNotNull(response);
@ -564,40 +564,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
});
final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
final DeferredContentProvider content = new DeferredContentProvider()
{
@Override
public Iterator<ByteBuffer> iterator()
{
final Iterator<ByteBuffer> delegate = super.iterator();
return new Iterator<ByteBuffer>()
{
private int count;
@Override
public boolean hasNext()
{
return delegate.hasNext();
}
@Override
public ByteBuffer next()
{
// Fake that it returns null for two times,
// to trigger particular branches in HttpSender
if (++count <= 2)
return null;
return delegate.next();
}
@Override
public void remove()
{
delegate.remove();
}
};
}
};
final DeferredContentProvider content = new DeferredContentProvider();
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
@ -623,7 +590,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(latch.await(555, TimeUnit.SECONDS));
}
@Test

View File

@ -24,6 +24,9 @@ import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionPool;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.FuturePromise;
@ -56,9 +59,10 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
HttpDestination httpDestination = (HttpDestination)destination;
Assert.assertTrue(httpDestination.getActiveConnections().isEmpty());
Assert.assertTrue(httpDestination.getIdleConnections().isEmpty());
HttpDestinationOverHTTP httpDestination = (HttpDestinationOverHTTP)destination;
HttpConnectionPool connectionPool = httpDestination.getHttpConnectionPool();
Assert.assertTrue(connectionPool.getActiveConnections().isEmpty());
Assert.assertTrue(connectionPool.getIdleConnections().isEmpty());
}
}
@ -84,11 +88,12 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
// Give the connection some time to process the remote close
TimeUnit.SECONDS.sleep(1);
HttpConnection httpConnection = (HttpConnection)connection;
HttpConnectionOverHTTP httpConnection = (HttpConnectionOverHTTP)connection;
Assert.assertFalse(httpConnection.getEndPoint().isOpen());
HttpDestination httpDestination = (HttpDestination)destination;
Assert.assertTrue(httpDestination.getActiveConnections().isEmpty());
Assert.assertTrue(httpDestination.getIdleConnections().isEmpty());
HttpDestinationOverHTTP httpDestination = (HttpDestinationOverHTTP)destination;
HttpConnectionPool connectionPool = httpDestination.getHttpConnectionPool();
Assert.assertTrue(connectionPool.getActiveConnections().isEmpty());
Assert.assertTrue(connectionPool.getIdleConnections().isEmpty());
}
}

View File

@ -37,6 +37,9 @@ import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionPool;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
@ -89,11 +92,12 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
logger.warn("Interrupting test, it is taking too long");
for (String host : Arrays.asList("localhost", "127.0.0.1"))
{
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, connector.getLocalPort());
for (Connection connection : new ArrayList<>(destination.getActiveConnections()))
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, connector.getLocalPort());
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
for (Connection connection : new ArrayList<>(connectionPool.getActiveConnections()))
{
HttpConnection active = (HttpConnection)connection;
logger.warn(active.getEndPoint() + " exchange " + active.getExchange());
HttpConnectionOverHTTP active = (HttpConnectionOverHTTP)connection;
logger.warn(active.getEndPoint() + " exchange " + active.getHttpChannel().getHttpExchange());
}
}
testThread.interrupt();

View File

@ -50,6 +50,9 @@ import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionPool;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpMethod;
@ -81,13 +84,14 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Response response = client.GET(scheme + "://" + host + ":" + port + path);
Assert.assertEquals(200, response.getStatus());
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
long start = System.nanoTime();
HttpConnection connection = null;
HttpConnectionOverHTTP connection = null;
while (connection == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
connection = (HttpConnection)destination.getIdleConnections().peek();
connection = (HttpConnectionOverHTTP)connectionPool.getIdleConnections().peek();
TimeUnit.MILLISECONDS.sleep(10);
}
Assert.assertNotNull(connection);
@ -98,8 +102,8 @@ public class HttpClientTest extends AbstractHttpClientServerTest
client.stop();
Assert.assertEquals(0, client.getDestinations().size());
Assert.assertEquals(0, destination.getIdleConnections().size());
Assert.assertEquals(0, destination.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertFalse(connection.getEndPoint().isOpen());
}
@ -632,8 +636,8 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Override
public void onBegin(Request request)
{
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
destination.getActiveConnections().peek().close();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
destination.getHttpConnectionPool().getActiveConnections().peek().close();
}
})
.send(new Response.Listener.Empty()

View File

@ -34,6 +34,7 @@ import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.io.EndPoint;
@ -240,7 +241,7 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
start(new TimeoutHandler(2 * timeout));
client.stop();
final AtomicBoolean sslIdle = new AtomicBoolean();
client = new HttpClient(sslContextFactory)
client = new HttpClient(new HttpClientTransportOverHTTP()
{
@Override
protected SslConnection newSslConnection(HttpClient httpClient, EndPoint endPoint, SSLEngine engine)
@ -255,7 +256,7 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
}
};
}
};
}, sslContextFactory);
client.setIdleTimeout(timeout);
client.start();

View File

@ -33,8 +33,11 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionPool;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.log.Log;
@ -50,6 +53,13 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
super(sslContextFactory);
}
@Override
public void start(Handler handler) throws Exception
{
super.start(handler);
client.setStrictEventOrdering(false);
}
@Test
public void test_SuccessfulRequest_ReturnsConnection() throws Exception
{
@ -57,12 +67,13 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
final BlockingQueue<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch headersLatch = new CountDownLatch(1);
@ -117,12 +128,13 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
final BlockingQueue<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch beginLatch = new CountDownLatch(1);
@ -167,12 +179,13 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
final BlockingQueue<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch successLatch = new CountDownLatch(3);
@ -226,12 +239,13 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
final BlockingQueue<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
final long delay = 1000;
@ -298,12 +312,13 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
final BlockingQueue<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
server.stop();
@ -350,12 +365,13 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
final BlockingQueue<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch latch = new CountDownLatch(1);
@ -399,12 +415,13 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
final BlockingQueue<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
Log.getLogger(HttpConnection.class).info("Expecting java.lang.IllegalStateException: HttpParser{s=CLOSED,...");
@ -448,12 +465,13 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
HttpConnectionPool connectionPool = destination.getHttpConnectionPool();
final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
final BlockingQueue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
final BlockingQueue<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
ContentResponse response = client.newRequest(host, port)

View File

@ -18,18 +18,11 @@
package org.eclipse.jetty.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -50,167 +43,167 @@ public class HttpDestinationTest extends AbstractHttpClientServerTest
@Test
public void test_FirstAcquire_WithEmptyQueue() throws Exception
{
HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort());
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, "http", "localhost", connector.getLocalPort());
Connection connection = destination.acquire();
if (connection == null)
{
// There are no queued requests, so the newly created connection will be idle
connection = destination.getIdleConnections().poll(5, TimeUnit.SECONDS);
connection = destination.getHttpConnectionPool().getIdleConnections().poll(5, TimeUnit.SECONDS);
}
Assert.assertNotNull(connection);
}
@Test
public void test_SecondAcquire_AfterFirstAcquire_WithEmptyQueue_ReturnsSameConnection() throws Exception
{
HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort());
Connection connection1 = destination.acquire();
if (connection1 == null)
{
// There are no queued requests, so the newly created connection will be idle
long start = System.nanoTime();
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
connection1 = destination.getIdleConnections().peek();
TimeUnit.MILLISECONDS.sleep(50);
}
Assert.assertNotNull(connection1);
Connection connection2 = destination.acquire();
Assert.assertSame(connection1, connection2);
}
}
@Test
public void test_SecondAcquire_ConcurrentWithFirstAcquire_WithEmptyQueue_CreatesTwoConnections() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort())
{
@Override
protected void process(Connection connection, boolean dispatch)
{
try
{
latch.await(5, TimeUnit.SECONDS);
super.process(connection, dispatch);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}
};
Connection connection1 = destination.acquire();
// There are no available existing connections, so acquire()
// returns null because we delayed process() above
Assert.assertNull(connection1);
Connection connection2 = destination.acquire();
Assert.assertNull(connection2);
latch.countDown();
// There must be 2 idle connections
Connection connection = destination.getIdleConnections().poll(5, TimeUnit.SECONDS);
Assert.assertNotNull(connection);
connection = destination.getIdleConnections().poll(5, TimeUnit.SECONDS);
Assert.assertNotNull(connection);
}
@Test
public void test_Acquire_Process_Release_Acquire_ReturnsSameConnection() throws Exception
{
HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort());
Connection connection1 = destination.acquire();
if (connection1 == null)
connection1 = destination.getIdleConnections().poll(5, TimeUnit.SECONDS);
Assert.assertNotNull(connection1);
destination.process(connection1, false);
destination.release(connection1);
Connection connection2 = destination.acquire();
Assert.assertSame(connection1, connection2);
}
@Slow
@Test
public void test_IdleConnection_IdleTimeout() throws Exception
{
long idleTimeout = 1000;
client.setIdleTimeout(idleTimeout);
HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort());
Connection connection1 = destination.acquire();
if (connection1 == null)
{
// There are no queued requests, so the newly created connection will be idle
long start = System.nanoTime();
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
connection1 = destination.getIdleConnections().peek();
TimeUnit.MILLISECONDS.sleep(50);
}
Assert.assertNotNull(connection1);
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
connection1 = destination.getIdleConnections().poll();
Assert.assertNull(connection1);
}
}
@Test
public void test_Request_Failed_If_MaxRequestsQueuedPerDestination_Exceeded() throws Exception
{
int maxQueued = 1;
client.setMaxRequestsQueuedPerDestination(maxQueued);
client.setMaxConnectionsPerDestination(1);
// Make one request to open the connection and be sure everything is setup properly
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send();
Assert.assertEquals(200, response.getStatus());
// Send another request that is sent immediately
final CountDownLatch successLatch = new CountDownLatch(1);
final CountDownLatch failureLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onRequestQueued(new Request.QueuedListener()
{
@Override
public void onQueued(Request request)
{
// This request exceeds the maximum queued, should fail
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
Assert.assertThat(result.getRequestFailure(), Matchers.instanceOf(RejectedExecutionException.class));
failureLatch.countDown();
}
});
}
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
successLatch.countDown();
}
});
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
// @Test
// public void test_SecondAcquire_AfterFirstAcquire_WithEmptyQueue_ReturnsSameConnection() throws Exception
// {
// HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort());
// Connection connection1 = destination.acquire();
// if (connection1 == null)
// {
// // There are no queued requests, so the newly created connection will be idle
// long start = System.nanoTime();
// while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
// {
// connection1 = destination.getIdleConnections().peek();
// TimeUnit.MILLISECONDS.sleep(50);
// }
// Assert.assertNotNull(connection1);
//
// Connection connection2 = destination.acquire();
// Assert.assertSame(connection1, connection2);
// }
// }
//
// @Test
// public void test_SecondAcquire_ConcurrentWithFirstAcquire_WithEmptyQueue_CreatesTwoConnections() throws Exception
// {
// final CountDownLatch latch = new CountDownLatch(1);
// HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort())
// {
// @Override
// protected void process(Connection connection, boolean dispatch)
// {
// try
// {
// latch.await(5, TimeUnit.SECONDS);
// super.process(connection, dispatch);
// }
// catch (InterruptedException x)
// {
// x.printStackTrace();
// }
// }
// };
// Connection connection1 = destination.acquire();
//
// // There are no available existing connections, so acquire()
// // returns null because we delayed process() above
// Assert.assertNull(connection1);
//
// Connection connection2 = destination.acquire();
// Assert.assertNull(connection2);
//
// latch.countDown();
//
// // There must be 2 idle connections
// Connection connection = destination.getIdleConnections().poll(5, TimeUnit.SECONDS);
// Assert.assertNotNull(connection);
// connection = destination.getIdleConnections().poll(5, TimeUnit.SECONDS);
// Assert.assertNotNull(connection);
// }
//
// @Test
// public void test_Acquire_Process_Release_Acquire_ReturnsSameConnection() throws Exception
// {
// HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort());
// Connection connection1 = destination.acquire();
// if (connection1 == null)
// connection1 = destination.getIdleConnections().poll(5, TimeUnit.SECONDS);
// Assert.assertNotNull(connection1);
//
// destination.process(connection1, false);
// destination.release(connection1);
//
// Connection connection2 = destination.acquire();
// Assert.assertSame(connection1, connection2);
// }
//
// @Slow
// @Test
// public void test_IdleConnection_IdleTimeout() throws Exception
// {
// long idleTimeout = 1000;
// client.setIdleTimeout(idleTimeout);
//
// HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort());
// Connection connection1 = destination.acquire();
// if (connection1 == null)
// {
// // There are no queued requests, so the newly created connection will be idle
// long start = System.nanoTime();
// while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
// {
// connection1 = destination.getIdleConnections().peek();
// TimeUnit.MILLISECONDS.sleep(50);
// }
// Assert.assertNotNull(connection1);
//
// TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
//
// connection1 = destination.getIdleConnections().poll();
// Assert.assertNull(connection1);
// }
// }
//
// @Test
// public void test_Request_Failed_If_MaxRequestsQueuedPerDestination_Exceeded() throws Exception
// {
// int maxQueued = 1;
// client.setMaxRequestsQueuedPerDestination(maxQueued);
// client.setMaxConnectionsPerDestination(1);
//
// // Make one request to open the connection and be sure everything is setup properly
// ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
// .scheme(scheme)
// .send();
// Assert.assertEquals(200, response.getStatus());
//
// // Send another request that is sent immediately
// final CountDownLatch successLatch = new CountDownLatch(1);
// final CountDownLatch failureLatch = new CountDownLatch(1);
// client.newRequest("localhost", connector.getLocalPort())
// .scheme(scheme)
// .onRequestQueued(new Request.QueuedListener()
// {
// @Override
// public void onQueued(Request request)
// {
// // This request exceeds the maximum queued, should fail
// client.newRequest("localhost", connector.getLocalPort())
// .scheme(scheme)
// .send(new Response.CompleteListener()
// {
// @Override
// public void onComplete(Result result)
// {
// Assert.assertTrue(result.isFailed());
// Assert.assertThat(result.getRequestFailure(), Matchers.instanceOf(RejectedExecutionException.class));
// failureLatch.countDown();
// }
// });
// }
// })
// .send(new Response.CompleteListener()
// {
// @Override
// public void onComplete(Result result)
// {
// if (result.isSucceeded())
// successLatch.countDown();
// }
// });
//
// Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
// }
}

View File

@ -18,227 +18,203 @@
package org.eclipse.jetty.client;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
public class HttpReceiverTest
{
@Rule
public final TestTracker tracker = new TestTracker();
private HttpClient client;
private HttpDestination destination;
private ByteArrayEndPoint endPoint;
private HttpConnection connection;
private HttpConversation conversation;
@Before
public void init() throws Exception
{
client = new HttpClient();
client.start();
destination = new HttpDestination(client, "http", "localhost", 8080);
endPoint = new ByteArrayEndPoint();
connection = new HttpConnection(client, endPoint, destination);
conversation = new HttpConversation(client, 1);
}
@After
public void destroy() throws Exception
{
client.stop();
}
protected HttpExchange newExchange()
{
HttpRequest request = new HttpRequest(client, URI.create("http://localhost"));
FutureResponseListener listener = new FutureResponseListener(request);
HttpExchange exchange = new HttpExchange(conversation, destination, request, Collections.<Response.ResponseListener>singletonList(listener));
conversation.getExchanges().offer(exchange);
connection.associate(exchange);
exchange.requestComplete(null);
exchange.terminateRequest();
return exchange;
}
@Test
public void test_Receive_NoResponseContent() throws Exception
{
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-length: 0\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.receive();
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals("OK", response.getReason());
Assert.assertSame(HttpVersion.HTTP_1_1, response.getVersion());
HttpFields headers = response.getHeaders();
Assert.assertNotNull(headers);
Assert.assertEquals(1, headers.size());
Assert.assertEquals("0", headers.get(HttpHeader.CONTENT_LENGTH));
}
@Test
public void test_Receive_ResponseContent() throws Exception
{
String content = "0123456789ABCDEF";
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-length: " + content.length() + "\r\n" +
"\r\n" +
content);
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.receive();
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals("OK", response.getReason());
Assert.assertSame(HttpVersion.HTTP_1_1, response.getVersion());
HttpFields headers = response.getHeaders();
Assert.assertNotNull(headers);
Assert.assertEquals(1, headers.size());
Assert.assertEquals(String.valueOf(content.length()), headers.get(HttpHeader.CONTENT_LENGTH));
String received = listener.getContentAsString("UTF-8");
Assert.assertEquals(content, received);
}
@Test
public void test_Receive_ResponseContent_EarlyEOF() throws Exception
{
String content1 = "0123456789";
String content2 = "ABCDEF";
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-length: " + (content1.length() + content2.length()) + "\r\n" +
"\r\n" +
content1);
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.receive();
endPoint.setInputEOF();
connection.receive();
try
{
listener.get(5, TimeUnit.SECONDS);
Assert.fail();
}
catch (ExecutionException e)
{
Assert.assertTrue(e.getCause() instanceof EOFException);
}
}
@Test
public void test_Receive_ResponseContent_IdleTimeout() throws Exception
{
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-length: 1\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.receive();
// Simulate an idle timeout
connection.idleTimeout();
try
{
listener.get(5, TimeUnit.SECONDS);
Assert.fail();
}
catch (ExecutionException e)
{
Assert.assertTrue(e.getCause() instanceof TimeoutException);
}
}
@Test
public void test_Receive_BadResponse() throws Exception
{
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-length: A\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.receive();
try
{
listener.get(5, TimeUnit.SECONDS);
Assert.fail();
}
catch (ExecutionException e)
{
Assert.assertTrue(e.getCause() instanceof HttpResponseException);
}
}
@Test
public void test_Receive_GZIPResponseContent_Fragmented() throws Exception
{
byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (GZIPOutputStream gzipOutput = new GZIPOutputStream(baos))
{
gzipOutput.write(data);
}
byte[] gzip = baos.toByteArray();
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-Length: " + gzip.length + "\r\n" +
"Content-Encoding: gzip\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.receive();
endPoint.reset();
ByteBuffer buffer = ByteBuffer.wrap(gzip);
int fragment = buffer.limit() - 1;
buffer.limit(fragment);
endPoint.setInput(buffer);
connection.receive();
endPoint.reset();
buffer.limit(gzip.length);
buffer.position(fragment);
endPoint.setInput(buffer);
connection.receive();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(data, response.getContent());
}
// @Rule
// public final TestTracker tracker = new TestTracker();
//
// private HttpClient client;
// private HttpDestination destination;
// private ByteArrayEndPoint endPoint;
// private HttpConnection connection;
// private HttpConversation conversation;
//
// @Before
// public void init() throws Exception
// {
// client = new HttpClient();
// client.start();
// destination = new HttpDestination(client, "http", "localhost", 8080);
// endPoint = new ByteArrayEndPoint();
// connection = new HttpConnection(client, endPoint, destination);
// conversation = new HttpConversation(client, 1);
// }
//
// @After
// public void destroy() throws Exception
// {
// client.stop();
// }
//
// protected HttpExchange newExchange()
// {
// HttpRequest request = new HttpRequest(client, URI.create("http://localhost"));
// FutureResponseListener listener = new FutureResponseListener(request);
// HttpExchange exchange = new HttpExchange(conversation, destination, request, Collections.<Response.ResponseListener>singletonList(listener));
// conversation.getExchanges().offer(exchange);
// connection.associate(exchange);
// exchange.requestComplete();
// exchange.terminateRequest();
// return exchange;
// }
//
// @Test
// public void test_Receive_NoResponseContent() throws Exception
// {
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: 0\r\n" +
// "\r\n");
// HttpExchange exchange = newExchange();
// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
// connection.receive();
//
// Response response = listener.get(5, TimeUnit.SECONDS);
// Assert.assertNotNull(response);
// Assert.assertEquals(200, response.getStatus());
// Assert.assertEquals("OK", response.getReason());
// Assert.assertSame(HttpVersion.HTTP_1_1, response.getVersion());
// HttpFields headers = response.getHeaders();
// Assert.assertNotNull(headers);
// Assert.assertEquals(1, headers.size());
// Assert.assertEquals("0", headers.get(HttpHeader.CONTENT_LENGTH));
// }
//
// @Test
// public void test_Receive_ResponseContent() throws Exception
// {
// String content = "0123456789ABCDEF";
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: " + content.length() + "\r\n" +
// "\r\n" +
// content);
// HttpExchange exchange = newExchange();
// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
// connection.receive();
//
// Response response = listener.get(5, TimeUnit.SECONDS);
// Assert.assertNotNull(response);
// Assert.assertEquals(200, response.getStatus());
// Assert.assertEquals("OK", response.getReason());
// Assert.assertSame(HttpVersion.HTTP_1_1, response.getVersion());
// HttpFields headers = response.getHeaders();
// Assert.assertNotNull(headers);
// Assert.assertEquals(1, headers.size());
// Assert.assertEquals(String.valueOf(content.length()), headers.get(HttpHeader.CONTENT_LENGTH));
// String received = listener.getContentAsString("UTF-8");
// Assert.assertEquals(content, received);
// }
//
// @Test
// public void test_Receive_ResponseContent_EarlyEOF() throws Exception
// {
// String content1 = "0123456789";
// String content2 = "ABCDEF";
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: " + (content1.length() + content2.length()) + "\r\n" +
// "\r\n" +
// content1);
// HttpExchange exchange = newExchange();
// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
// connection.receive();
// endPoint.setInputEOF();
// connection.receive();
//
// try
// {
// listener.get(5, TimeUnit.SECONDS);
// Assert.fail();
// }
// catch (ExecutionException e)
// {
// Assert.assertTrue(e.getCause() instanceof EOFException);
// }
// }
//
// @Test
// public void test_Receive_ResponseContent_IdleTimeout() throws Exception
// {
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: 1\r\n" +
// "\r\n");
// HttpExchange exchange = newExchange();
// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
// connection.receive();
// // Simulate an idle timeout
// connection.idleTimeout();
//
// try
// {
// listener.get(5, TimeUnit.SECONDS);
// Assert.fail();
// }
// catch (ExecutionException e)
// {
// Assert.assertTrue(e.getCause() instanceof TimeoutException);
// }
// }
//
// @Test
// public void test_Receive_BadResponse() throws Exception
// {
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-length: A\r\n" +
// "\r\n");
// HttpExchange exchange = newExchange();
// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
// connection.receive();
//
// try
// {
// listener.get(5, TimeUnit.SECONDS);
// Assert.fail();
// }
// catch (ExecutionException e)
// {
// Assert.assertTrue(e.getCause() instanceof HttpResponseException);
// }
// }
//
// @Test
// public void test_Receive_GZIPResponseContent_Fragmented() throws Exception
// {
// byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
// ByteArrayOutputStream baos = new ByteArrayOutputStream();
// try (GZIPOutputStream gzipOutput = new GZIPOutputStream(baos))
// {
// gzipOutput.write(data);
// }
// byte[] gzip = baos.toByteArray();
//
// endPoint.setInput("" +
// "HTTP/1.1 200 OK\r\n" +
// "Content-Length: " + gzip.length + "\r\n" +
// "Content-Encoding: gzip\r\n" +
// "\r\n");
// HttpExchange exchange = newExchange();
// FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
// connection.receive();
// endPoint.reset();
//
// ByteBuffer buffer = ByteBuffer.wrap(gzip);
// int fragment = buffer.limit() - 1;
// buffer.limit(fragment);
// endPoint.setInput(buffer);
// connection.receive();
// endPoint.reset();
//
// buffer.limit(gzip.length);
// buffer.position(fragment);
// endPoint.setInput(buffer);
// connection.receive();
//
// ContentResponse response = listener.get(5, TimeUnit.SECONDS);
// Assert.assertNotNull(response);
// Assert.assertEquals(200, response.getStatus());
// Assert.assertArrayEquals(data, response.getContent());
// }
}

View File

@ -18,282 +18,263 @@
package org.eclipse.jetty.client;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
public class HttpSenderTest
{
@Rule
public final TestTracker tracker = new TestTracker();
private HttpClient client;
@Before
public void init() throws Exception
{
client = new HttpClient();
client.start();
}
@After
public void destroy() throws Exception
{
client.stop();
}
@Test
public void test_Send_NoRequestContent() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
request.listener(new Request.Listener.Empty()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
});
connection.send(request, (Response.CompleteListener)null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n"));
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Slow
@Test
public void test_Send_NoRequestContent_IncompleteFlush() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
connection.send(request, (Response.CompleteListener)null);
// This take will free space in the buffer and allow for the write to complete
StringBuilder builder = new StringBuilder(endPoint.takeOutputString());
// Wait for the write to complete
TimeUnit.SECONDS.sleep(1);
String chunk = endPoint.takeOutputString();
while (chunk.length() > 0)
{
builder.append(chunk);
chunk = endPoint.takeOutputString();
}
String requestString = builder.toString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n"));
}
@Test
public void test_Send_NoRequestContent_Exception() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// Shutdown output to trigger the exception on write
endPoint.shutdownOutput();
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
request.listener(new Request.Listener.Empty()
{
@Override
public void onFailure(Request request, Throwable x)
{
failureLatch.countDown();
}
});
connection.send(request, new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
failureLatch.countDown();
}
});
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_NoRequestContent_IncompleteFlush_Exception() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
request.listener(new Request.Listener.Empty()
{
@Override
public void onFailure(Request request, Throwable x)
{
failureLatch.countDown();
}
});
connection.send(request, new Response.Listener.Empty()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
failureLatch.countDown();
}
});
// Shutdown output to trigger the exception on write
endPoint.shutdownOutput();
// This take will free space in the buffer and allow for the write to complete
// although it will fail because we shut down the output
endPoint.takeOutputString();
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_SmallRequestContent_InOneBuffer() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
String content = "abcdef";
request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes("UTF-8"))));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
request.listener(new Request.Listener.Empty()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
});
connection.send(request, (Response.CompleteListener)null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content));
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_SmallRequestContent_InTwoBuffers() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
String content1 = "0123456789";
String content2 = "abcdef";
request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8"))));
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
request.listener(new Request.Listener.Empty()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
});
connection.send(request, (Response.CompleteListener)null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content1 + content2));
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_Send_SmallRequestContent_Chunked_InTwoChunks() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
String content1 = "0123456789";
String content2 = "ABCDEF";
request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8")))
{
@Override
public long getLength()
{
return -1;
}
});
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(1);
request.listener(new Request.Listener.Empty()
{
@Override
public void onHeaders(Request request)
{
headersLatch.countDown();
}
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
});
connection.send(request, (Response.CompleteListener)null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
String content = Integer.toHexString(content1.length()).toUpperCase(Locale.ENGLISH) + "\r\n" + content1 + "\r\n";
content += Integer.toHexString(content2.length()).toUpperCase(Locale.ENGLISH) + "\r\n" + content2 + "\r\n";
content += "0\r\n\r\n";
Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content));
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
}
// @Rule
// public final TestTracker tracker = new TestTracker();
//
// private HttpClient client;
//
// @Before
// public void init() throws Exception
// {
// client = new HttpClient();
// client.start();
// }
//
// @After
// public void destroy() throws Exception
// {
// client.stop();
// }
//
// @Test
// public void test_Send_NoRequestContent() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// final CountDownLatch headersLatch = new CountDownLatch(1);
// final CountDownLatch successLatch = new CountDownLatch(1);
// request.listener(new Request.Listener.Empty()
// {
// @Override
// public void onHeaders(Request request)
// {
// headersLatch.countDown();
// }
//
// @Override
// public void onSuccess(Request request)
// {
// successLatch.countDown();
// }
// });
// connection.send(request, (Response.CompleteListener)null);
//
// String requestString = endPoint.takeOutputString();
// Assert.assertTrue(requestString.startsWith("GET "));
// Assert.assertTrue(requestString.endsWith("\r\n\r\n"));
// Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
// }
//
// @Slow
// @Test
// public void test_Send_NoRequestContent_IncompleteFlush() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// connection.send(request, (Response.CompleteListener)null);
//
// // This take will free space in the buffer and allow for the write to complete
// StringBuilder builder = new StringBuilder(endPoint.takeOutputString());
//
// // Wait for the write to complete
// TimeUnit.SECONDS.sleep(1);
//
// String chunk = endPoint.takeOutputString();
// while (chunk.length() > 0)
// {
// builder.append(chunk);
// chunk = endPoint.takeOutputString();
// }
//
// String requestString = builder.toString();
// Assert.assertTrue(requestString.startsWith("GET "));
// Assert.assertTrue(requestString.endsWith("\r\n\r\n"));
// }
//
// @Test
// public void test_Send_NoRequestContent_Exception() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// // Shutdown output to trigger the exception on write
// endPoint.shutdownOutput();
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// final CountDownLatch failureLatch = new CountDownLatch(2);
// request.listener(new Request.Listener.Empty()
// {
// @Override
// public void onFailure(Request request, Throwable x)
// {
// failureLatch.countDown();
// }
// });
// connection.send(request, new Response.Listener.Empty()
// {
// @Override
// public void onComplete(Result result)
// {
// Assert.assertTrue(result.isFailed());
// failureLatch.countDown();
// }
// });
//
// Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// }
//
// @Test
// public void test_Send_NoRequestContent_IncompleteFlush_Exception() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// final CountDownLatch failureLatch = new CountDownLatch(2);
// request.listener(new Request.Listener.Empty()
// {
// @Override
// public void onFailure(Request request, Throwable x)
// {
// failureLatch.countDown();
// }
// });
// connection.send(request, new Response.Listener.Empty()
// {
// @Override
// public void onComplete(Result result)
// {
// Assert.assertTrue(result.isFailed());
// failureLatch.countDown();
// }
// });
//
// // Shutdown output to trigger the exception on write
// endPoint.shutdownOutput();
// // This take will free space in the buffer and allow for the write to complete
// // although it will fail because we shut down the output
// endPoint.takeOutputString();
//
// Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// }
//
// @Test
// public void test_Send_SmallRequestContent_InOneBuffer() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// String content = "abcdef";
// request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes("UTF-8"))));
// final CountDownLatch headersLatch = new CountDownLatch(1);
// final CountDownLatch successLatch = new CountDownLatch(1);
// request.listener(new Request.Listener.Empty()
// {
// @Override
// public void onHeaders(Request request)
// {
// headersLatch.countDown();
// }
//
// @Override
// public void onSuccess(Request request)
// {
// successLatch.countDown();
// }
// });
// connection.send(request, (Response.CompleteListener)null);
//
// String requestString = endPoint.takeOutputString();
// Assert.assertTrue(requestString.startsWith("GET "));
// Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content));
// Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
// }
//
// @Test
// public void test_Send_SmallRequestContent_InTwoBuffers() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// String content1 = "0123456789";
// String content2 = "abcdef";
// request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8"))));
// final CountDownLatch headersLatch = new CountDownLatch(1);
// final CountDownLatch successLatch = new CountDownLatch(1);
// request.listener(new Request.Listener.Empty()
// {
// @Override
// public void onHeaders(Request request)
// {
// headersLatch.countDown();
// }
//
// @Override
// public void onSuccess(Request request)
// {
// successLatch.countDown();
// }
// });
// connection.send(request, (Response.CompleteListener)null);
//
// String requestString = endPoint.takeOutputString();
// Assert.assertTrue(requestString.startsWith("GET "));
// Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content1 + content2));
// Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
// }
//
// @Test
// public void test_Send_SmallRequestContent_Chunked_InTwoChunks() throws Exception
// {
// ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
// HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
// HttpConnection connection = new HttpConnection(client, endPoint, destination);
// Request request = client.newRequest(URI.create("http://localhost/"));
// String content1 = "0123456789";
// String content2 = "ABCDEF";
// request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8")))
// {
// @Override
// public long getLength()
// {
// return -1;
// }
// });
// final CountDownLatch headersLatch = new CountDownLatch(1);
// final CountDownLatch successLatch = new CountDownLatch(1);
// request.listener(new Request.Listener.Empty()
// {
// @Override
// public void onHeaders(Request request)
// {
// headersLatch.countDown();
// }
//
// @Override
// public void onSuccess(Request request)
// {
// successLatch.countDown();
// }
// });
// connection.send(request, (Response.CompleteListener)null);
//
// String requestString = endPoint.takeOutputString();
// Assert.assertTrue(requestString.startsWith("GET "));
// String content = Integer.toHexString(content1.length()).toUpperCase(Locale.ENGLISH) + "\r\n" + content1 + "\r\n";
// content += Integer.toHexString(content2.length()).toUpperCase(Locale.ENGLISH) + "\r\n" + content2 + "\r\n";
// content += "0\r\n\r\n";
// Assert.assertTrue(requestString.endsWith("\r\n\r\n" + content));
// Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
// Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
// }
}

View File

@ -1,3 +1,3 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.client.LEVEL=DEBUG
org.eclipse.jetty.client.LEVEL=DEBUG

View File

@ -120,6 +120,7 @@
<module>spdy-client</module>
<module>spdy-server</module>
<module>spdy-http-server</module>
<module>spdy-http-client-transport</module>
<module>spdy-example-webapp</module>
</modules>