Merged branch 'jetty-9.3.x' into 'jetty-9.4.x'.

This commit is contained in:
Simone Bordet 2016-05-12 19:30:50 +02:00
commit 922319c3a3
12 changed files with 307 additions and 139 deletions

View File

@ -173,7 +173,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
newRequest.onResponseSuccess(r -> client.getAuthenticationStore().addAuthenticationResult(authnResult));
Connection connection = (Connection)request.getAttributes().get(HttpRequest.CONNECTION_ATTRIBUTE);
Connection connection = (Connection)request.getAttributes().get(Connection.class.getName());
if (connection != null)
connection.send(newRequest, null);
else

View File

@ -556,12 +556,12 @@ public class HttpClient extends ContainerLifeCycle
private void connect(List<InetSocketAddress> socketAddresses, int index, Map<String, Object> context)
{
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, new Promise<Connection>()
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<Connection>(promise)
{
@Override
public void succeeded(Connection result)
{
promise.succeeded(result);
getPromise().succeeded(result);
}
@Override
@ -569,7 +569,7 @@ public class HttpClient extends ContainerLifeCycle
{
int nextIndex = index + 1;
if (nextIndex == socketAddresses.size())
promise.failed(x);
getPromise().failed(x);
else
connect(socketAddresses, nextIndex, context);
}

View File

@ -24,12 +24,15 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
@ -40,6 +43,8 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HttpProxy extends ProxyConfiguration.Proxy
{
private static final Logger LOG = Log.getLogger(HttpProxy.class);
public HttpProxy(String host, int port)
{
this(new Origin.Address(host, port), false);
@ -63,12 +68,11 @@ public class HttpProxy extends ProxyConfiguration.Proxy
return URI.create(new Origin(scheme, getAddress()).asString());
}
private static class HttpProxyClientConnectionFactory implements ClientConnectionFactory
private class HttpProxyClientConnectionFactory implements ClientConnectionFactory
{
private static final Logger LOG = Log.getLogger(HttpProxyClientConnectionFactory.class);
private final ClientConnectionFactory connectionFactory;
public HttpProxyClientConnectionFactory(ClientConnectionFactory connectionFactory)
private HttpProxyClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
this.connectionFactory = connectionFactory;
}
@ -76,130 +80,217 @@ public class HttpProxy extends ProxyConfiguration.Proxy
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
final ProxyPromise proxyPromise = new ProxyPromise(endPoint, promise, context);
// Replace the promise with the proxy one
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, proxyPromise);
return connectionFactory.newConnection(endPoint, context);
}
/**
* Decides whether to establish a proxy tunnel using HTTP CONNECT.
* It is implemented as a promise because it needs to establish the
* tunnel after the TCP connection is succeeded, and needs to notify
* the nested promise when the tunnel is established (or failed).
*/
private class ProxyPromise implements Promise<Connection>
{
private final EndPoint endPoint;
private final Promise<Connection> promise;
private final Map<String, Object> context;
private ProxyPromise(EndPoint endPoint, Promise<Connection> promise, Map<String, Object> context)
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
SslContextFactory sslContextFactory = destination.getHttpClient().getSslContextFactory();
if (destination.isSecure())
{
this.endPoint = endPoint;
this.promise = promise;
this.context = context;
}
@Override
public void succeeded(Connection connection)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
if (destination.isSecure())
if (sslContextFactory != null)
{
SslContextFactory sslContextFactory = destination.getHttpClient().getSslContextFactory();
if (sslContextFactory != null)
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
Promise<Connection> wrapped = promise;
if (promise instanceof Promise.Wrapper)
wrapped = ((Promise.Wrapper<Connection>)promise).unwrap();
if (wrapped instanceof TunnelPromise)
{
tunnel(destination, connection);
((TunnelPromise)wrapped).setEndPoint(endPoint);
return connectionFactory.newConnection(endPoint, context);
}
else
{
String message = String.format("Cannot perform requests over SSL, no %s in %s",
SslContextFactory.class.getSimpleName(), HttpClient.class.getSimpleName());
tunnelFailed(new IllegalStateException(message));
// Replace the promise with the proxy promise that creates the tunnel to the server.
CreateTunnelPromise tunnelPromise = new CreateTunnelPromise(connectionFactory, endPoint, promise, context);
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, tunnelPromise);
return connectionFactory.newConnection(endPoint, context);
}
}
else
{
promise.succeeded(connection);
throw new IOException("Cannot tunnel request, missing " +
SslContextFactory.class.getName() + " in " + HttpClient.class.getName());
}
}
@Override
public void failed(Throwable x)
else
{
tunnelFailed(x);
}
private void tunnel(HttpDestination destination, final Connection connection)
{
String target = destination.getOrigin().getAddress().asString();
Origin.Address proxyAddress = destination.getConnectAddress();
HttpClient httpClient = destination.getHttpClient();
long connectTimeout = httpClient.getConnectTimeout();
Request connect = httpClient.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
.method(HttpMethod.CONNECT)
.path(target)
.header(HttpHeader.HOST, target)
.idleTimeout(2 * connectTimeout, TimeUnit.MILLISECONDS)
.timeout(connectTimeout, TimeUnit.MILLISECONDS);
// In case the proxy replies with a 407, we want
// to use the same connection for resending the
// request (this time with the Proxy-Authorization
// header), so we save it as an attribute to be
// used to send the next request.
connect.attribute(HttpRequest.CONNECTION_ATTRIBUTE, connection);
connection.send(connect, result ->
{
if (result.isFailed())
{
tunnelFailed(result.getFailure());
}
else
{
Response response = result.getResponse();
if (response.getStatus() == 200)
{
tunnelSucceeded();
}
else
{
tunnelFailed(new HttpResponseException("Received " + response + " for " + result.getRequest(), response));
}
}
});
}
private void tunnelSucceeded()
{
try
{
// Replace the promise back with the original
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise);
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
ClientConnectionFactory sslConnectionFactory = new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
HttpConnectionOverHTTP oldConnection = (HttpConnectionOverHTTP)endPoint.getConnection();
org.eclipse.jetty.io.Connection newConnection = sslConnectionFactory.newConnection(endPoint, context);
endPoint.upgrade(newConnection);
if (LOG.isDebugEnabled())
LOG.debug("HTTP tunnel established: {} over {}", oldConnection, newConnection);
}
catch (Throwable x)
{
tunnelFailed(x);
}
}
private void tunnelFailed(Throwable failure)
{
endPoint.close();
promise.failed(failure);
return connectionFactory.newConnection(endPoint, context);
}
}
}
/**
* <p>Creates a tunnel using HTTP CONNECT.</p>
* <p>It is implemented as a promise because it needs to establish the
* tunnel after the TCP connection is succeeded, and needs to notify
* the nested promise when the tunnel is established (or failed).</p>
*/
private class CreateTunnelPromise implements Promise<Connection>
{
private final ClientConnectionFactory connectionFactory;
private final EndPoint endPoint;
private final Promise<Connection> promise;
private final Map<String, Object> context;
private CreateTunnelPromise(ClientConnectionFactory connectionFactory, EndPoint endPoint, Promise<Connection> promise, Map<String, Object> context)
{
this.connectionFactory = connectionFactory;
this.endPoint = endPoint;
this.promise = promise;
this.context = context;
}
@Override
public void succeeded(Connection connection)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
tunnel(destination, connection);
}
@Override
public void failed(Throwable x)
{
tunnelFailed(endPoint, x);
}
private void tunnel(HttpDestination destination, Connection connection)
{
String target = destination.getOrigin().getAddress().asString();
Origin.Address proxyAddress = destination.getConnectAddress();
HttpClient httpClient = destination.getHttpClient();
long connectTimeout = httpClient.getConnectTimeout();
Request connect = httpClient.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
.method(HttpMethod.CONNECT)
.path(target)
.header(HttpHeader.HOST, target)
.idleTimeout(2 * connectTimeout, TimeUnit.MILLISECONDS)
.timeout(connectTimeout, TimeUnit.MILLISECONDS);
final HttpConversation conversation = ((HttpRequest)connect).getConversation();
conversation.setAttribute(EndPoint.class.getName(), endPoint);
connect.attribute(Connection.class.getName(), new ProxyConnection(destination, connection, promise));
connection.send(connect, result ->
{
// The EndPoint may have changed during the conversation, get the latest.
EndPoint endPoint1 = (EndPoint)conversation.getAttribute(EndPoint.class.getName());
if (result.isSucceeded())
{
Response response = result.getResponse();
if (response.getStatus() == HttpStatus.OK_200)
{
tunnelSucceeded(endPoint1);
}
else
{
HttpResponseException failure = new HttpResponseException("Unexpected " + response +
" for " + result.getRequest(), response);
tunnelFailed(endPoint1, failure);
}
}
else
{
tunnelFailed(endPoint1, result.getFailure());
}
});
}
private void tunnelSucceeded(EndPoint endPoint)
{
try
{
// Replace the promise back with the original
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise);
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
ClientConnectionFactory sslConnectionFactory =
new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
HttpConnectionOverHTTP oldConnection = (HttpConnectionOverHTTP)endPoint.getConnection();
org.eclipse.jetty.io.Connection newConnection = sslConnectionFactory.newConnection(endPoint, context);
endPoint.upgrade(newConnection);
if (LOG.isDebugEnabled())
LOG.debug("HTTP tunnel established: {} over {}", oldConnection, newConnection);
}
catch (Throwable x)
{
tunnelFailed(endPoint, x);
}
}
private void tunnelFailed(EndPoint endPoint, Throwable failure)
{
endPoint.close();
promise.failed(failure);
}
}
private class ProxyConnection implements Connection
{
private final Destination destination;
private final Connection connection;
private final Promise<Connection> promise;
private ProxyConnection(Destination destination, Connection connection, Promise<Connection> promise)
{
this.destination = destination;
this.connection = connection;
this.promise = promise;
}
@Override
public void send(Request request, Response.CompleteListener listener)
{
if (connection.isClosed())
{
destination.newConnection(new TunnelPromise(request, listener, promise));
}
else
{
connection.send(request, listener);
}
}
@Override
public void close()
{
connection.close();
}
@Override
public boolean isClosed()
{
return connection.isClosed();
}
}
private class TunnelPromise implements Promise<Connection>
{
private final Request request;
private final Response.CompleteListener listener;
private final Promise<Connection> promise;
private TunnelPromise(Request request, Response.CompleteListener listener, Promise<Connection> promise)
{
this.request = request;
this.listener = listener;
this.promise = promise;
}
@Override
public void succeeded(Connection connection)
{
connection.send(request, listener);
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
private void setEndPoint(EndPoint endPoint)
{
HttpConversation conversation = ((HttpRequest)request).getConversation();
conversation.setAttribute(EndPoint.class.getName(), endPoint);
}
}
}

View File

@ -59,7 +59,6 @@ import org.eclipse.jetty.util.Fields;
public class HttpRequest implements Request
{
private static final URI NULL_URI = URI.create("null:0");
static final String CONNECTION_ATTRIBUTE = HttpRequest.class.getName() + ".connection";
private final HttpFields headers = new HttpFields();
private final Fields params = new Fields(true);

View File

@ -45,4 +45,10 @@ public interface Connection extends Closeable
@Override
void close();
/**
* @return whether this connection has been closed
* @see #close()
*/
boolean isClosed();
}

View File

@ -91,6 +91,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
promise.succeeded(this);
}
@Override
public boolean isClosed()
{
return closed.get();
@ -220,6 +221,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
HttpConnectionOverHTTP.this.close();
}
@Override
public boolean isClosed()
{
return HttpConnectionOverHTTP.this.isClosed();
}
@Override
public String toString()
{

View File

@ -61,7 +61,7 @@ public abstract class AbstractAuthentication implements Authentication
public static boolean matchesURI(URI uri1, URI uri2)
{
String scheme = uri1.getScheme();
if (uri1.getScheme().equalsIgnoreCase(scheme))
if (scheme.equalsIgnoreCase(uri2.getScheme()))
{
if (uri1.getHost().equalsIgnoreCase(uri2.getHost()))
{

View File

@ -208,11 +208,6 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
destination.release(this);
}
public boolean isClosed()
{
return closed.get();
}
@Override
public void close()
{
@ -236,6 +231,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
}
}
@Override
public boolean isClosed()
{
return closed.get();
}
protected boolean closeByHTTP(HttpFields fields)
{
if (multiplexed)
@ -332,6 +333,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
HttpConnectionOverFCGI.this.close(failure);
}
@Override
public boolean isClosed()
{
return HttpConnectionOverFCGI.this.isClosed();
}
@Override
public String toString()
{

View File

@ -87,11 +87,6 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
return false;
}
public boolean isClosed()
{
return closed.get();
}
@Override
public void close()
{
@ -110,6 +105,12 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
}
}
@Override
public boolean isClosed()
{
return closed.get();
}
private void abort(Throwable failure)
{
for (HttpChannel channel : channels)

View File

@ -454,9 +454,8 @@ public class ForwardProxyTLSServerTest
@Test
public void testProxyAuthentication() throws Exception
{
startTLSServer(new ServerHandler());
String proxyRealm = "ProxyRealm";
startProxy(new ConnectHandler()
final String realm = "test-realm";
testProxyAuthentication(realm, new ConnectHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -466,27 +465,57 @@ public class ForwardProxyTLSServerTest
{
baseRequest.setHandled(true);
response.setStatus(HttpStatus.PROXY_AUTHENTICATION_REQUIRED_407);
response.setHeader(HttpHeader.PROXY_AUTHENTICATE.asString(), "Basic realm=\"" + proxyRealm + "\"");
response.setHeader(HttpHeader.PROXY_AUTHENTICATE.asString(), "Basic realm=\"" + realm + "\"");
return;
}
super.handle(target, baseRequest, request, response);
}
});
}
@Test
public void testProxyAuthenticationClosesConnection() throws Exception
{
final String realm = "test-realm";
testProxyAuthentication(realm, new ConnectHandler()
{
@Override
protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address)
{
final String header = request.getHeader(HttpHeader.PROXY_AUTHORIZATION.toString());
if (header == null || !header.startsWith("Basic "))
{
response.setHeader(HttpHeader.PROXY_AUTHENTICATE.toString(), "Basic realm=\"" + realm + "\"");
// Returning false adds Connection: close to the 407 response.
return false;
}
else
{
return true;
}
}
});
}
private void testProxyAuthentication(String realm, ConnectHandler connectHandler) throws Exception
{
startTLSServer(new ServerHandler());
startProxy(connectHandler);
HttpClient httpClient = new HttpClient(newSslContextFactory());
httpClient.getProxyConfiguration().getProxies().add(newHttpProxy());
URI proxyURI = URI.create("https://localhost:" + proxyConnector.getLocalPort());
httpClient.getAuthenticationStore().addAuthentication(new BasicAuthentication(proxyURI, proxyRealm, "proxyUser", "proxyPassword"));
URI uri = URI.create((proxySslContextFactory == null ? "http" : "https") + "://localhost:" + proxyConnector.getLocalPort());
httpClient.getAuthenticationStore().addAuthentication(new BasicAuthentication(uri, realm, "proxyUser", "proxyPassword"));
httpClient.start();
try
{
String host = "localhost";
String body = "BODY";
ContentResponse response = httpClient.newRequest("localhost", serverConnector.getLocalPort())
ContentResponse response = httpClient.newRequest(host, serverConnector.getLocalPort())
.scheme(HttpScheme.HTTPS.asString())
.method(HttpMethod.GET)
.path("/echo")
.param("body", body)
.path("/echo?body=" + URLEncoder.encode(body, "UTF-8"))
.send();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());

View File

@ -118,4 +118,32 @@ public interface Promise<C>
completeExceptionally(x);
}
}
public static abstract class Wrapper<W> implements Promise<W>
{
private final Promise<W> promise;
public Wrapper(Promise<W> promise)
{
this.promise = promise;
}
public Promise<W> getPromise()
{
return promise;
}
public Promise<W> unwrap()
{
Promise<W> result = promise;
while (true)
{
if (result instanceof Wrapper)
result = ((Wrapper<W>)result).unwrap();
else
break;
}
return result;
}
}
}

View File

@ -439,7 +439,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
* <p>Returns whether this thread pool is low on threads.</p>
* <p>The current formula is:</p>
* <pre>
* maxThreads - threads + idleThreads - queueSize <= lowThreadsThreshold
* maxThreads - threads + idleThreads - queueSize &lt;= lowThreadsThreshold
* </pre>
*
* @return whether the pool is low on threads