Merged branch 'jetty-9.2.x' into 'jetty-9.3.x'.

This commit is contained in:
Simone Bordet 2016-05-12 18:59:40 +02:00
commit 242b6b5f66
11 changed files with 306 additions and 134 deletions

View File

@ -173,7 +173,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
newRequest.onResponseSuccess(r -> client.getAuthenticationStore().addAuthenticationResult(authnResult)); newRequest.onResponseSuccess(r -> client.getAuthenticationStore().addAuthenticationResult(authnResult));
Connection connection = (Connection)request.getAttributes().get(HttpRequest.CONNECTION_ATTRIBUTE); Connection connection = (Connection)request.getAttributes().get(Connection.class.getName());
if (connection != null) if (connection != null)
connection.send(newRequest, null); connection.send(newRequest, null);
else else

View File

@ -559,12 +559,12 @@ public class HttpClient extends ContainerLifeCycle
private void connect(List<InetSocketAddress> socketAddresses, int index, Map<String, Object> context) private void connect(List<InetSocketAddress> socketAddresses, int index, Map<String, Object> context)
{ {
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, new Promise<Connection>() context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<Connection>(promise)
{ {
@Override @Override
public void succeeded(Connection result) public void succeeded(Connection result)
{ {
promise.succeeded(result); getPromise().succeeded(result);
} }
@Override @Override
@ -572,7 +572,7 @@ public class HttpClient extends ContainerLifeCycle
{ {
int nextIndex = index + 1; int nextIndex = index + 1;
if (nextIndex == socketAddresses.size()) if (nextIndex == socketAddresses.size())
promise.failed(x); getPromise().failed(x);
else else
connect(socketAddresses, nextIndex, context); connect(socketAddresses, nextIndex, context);
} }

View File

@ -24,12 +24,15 @@ import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory; import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
@ -40,6 +43,8 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HttpProxy extends ProxyConfiguration.Proxy public class HttpProxy extends ProxyConfiguration.Proxy
{ {
private static final Logger LOG = Log.getLogger(HttpProxy.class);
public HttpProxy(String host, int port) public HttpProxy(String host, int port)
{ {
this(new Origin.Address(host, port), false); this(new Origin.Address(host, port), false);
@ -63,12 +68,11 @@ public class HttpProxy extends ProxyConfiguration.Proxy
return URI.create(new Origin(scheme, getAddress()).asString()); return URI.create(new Origin(scheme, getAddress()).asString());
} }
private static class HttpProxyClientConnectionFactory implements ClientConnectionFactory private class HttpProxyClientConnectionFactory implements ClientConnectionFactory
{ {
private static final Logger LOG = Log.getLogger(HttpProxyClientConnectionFactory.class);
private final ClientConnectionFactory connectionFactory; private final ClientConnectionFactory connectionFactory;
public HttpProxyClientConnectionFactory(ClientConnectionFactory connectionFactory) private HttpProxyClientConnectionFactory(ClientConnectionFactory connectionFactory)
{ {
this.connectionFactory = connectionFactory; this.connectionFactory = connectionFactory;
} }
@ -76,130 +80,218 @@ public class HttpProxy extends ProxyConfiguration.Proxy
@Override @Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{ {
@SuppressWarnings("unchecked") HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY); boolean secure = HttpScheme.HTTPS.is(destination.getScheme());
final ProxyPromise proxyPromise = new ProxyPromise(endPoint, promise, context); SslContextFactory sslContextFactory = destination.getHttpClient().getSslContextFactory();
// Replace the promise with the proxy one if (secure)
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, proxyPromise);
return connectionFactory.newConnection(endPoint, context);
}
/**
* Decides whether to establish a proxy tunnel using HTTP CONNECT.
* It is implemented as a promise because it needs to establish the
* tunnel after the TCP connection is succeeded, and needs to notify
* the nested promise when the tunnel is established (or failed).
*/
private class ProxyPromise implements Promise<Connection>
{
private final EndPoint endPoint;
private final Promise<Connection> promise;
private final Map<String, Object> context;
private ProxyPromise(EndPoint endPoint, Promise<Connection> promise, Map<String, Object> context)
{ {
this.endPoint = endPoint; if (sslContextFactory != null)
this.promise = promise;
this.context = context;
}
@Override
public void succeeded(Connection connection)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
if (HttpScheme.HTTPS.is(destination.getScheme()))
{ {
SslContextFactory sslContextFactory = destination.getHttpClient().getSslContextFactory(); @SuppressWarnings("unchecked")
if (sslContextFactory != null) Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
Promise<Connection> wrapped = promise;
if (promise instanceof Promise.Wrapper)
wrapped = ((Promise.Wrapper<Connection>)promise).unwrap();
if (wrapped instanceof TunnelPromise)
{ {
tunnel(destination, connection); ((TunnelPromise)wrapped).setEndPoint(endPoint);
return connectionFactory.newConnection(endPoint, context);
} }
else else
{ {
String message = String.format("Cannot perform requests over SSL, no %s in %s", // Replace the promise with the proxy promise that creates the tunnel to the server.
SslContextFactory.class.getSimpleName(), HttpClient.class.getSimpleName()); CreateTunnelPromise tunnelPromise = new CreateTunnelPromise(connectionFactory, endPoint, promise, context);
tunnelFailed(new IllegalStateException(message)); context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, tunnelPromise);
return connectionFactory.newConnection(endPoint, context);
} }
} }
else else
{ {
promise.succeeded(connection); throw new IOException("Cannot tunnel request, missing " +
SslContextFactory.class.getName() + " in " + HttpClient.class.getName());
} }
} }
else
@Override
public void failed(Throwable x)
{ {
tunnelFailed(x); return connectionFactory.newConnection(endPoint, context);
}
private void tunnel(HttpDestination destination, final Connection connection)
{
String target = destination.getOrigin().getAddress().asString();
Origin.Address proxyAddress = destination.getConnectAddress();
HttpClient httpClient = destination.getHttpClient();
long connectTimeout = httpClient.getConnectTimeout();
Request connect = httpClient.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
.method(HttpMethod.CONNECT)
.path(target)
.header(HttpHeader.HOST, target)
.idleTimeout(2 * connectTimeout, TimeUnit.MILLISECONDS)
.timeout(connectTimeout, TimeUnit.MILLISECONDS);
// In case the proxy replies with a 407, we want
// to use the same connection for resending the
// request (this time with the Proxy-Authorization
// header), so we save it as an attribute to be
// used to send the next request.
connect.attribute(HttpRequest.CONNECTION_ATTRIBUTE, connection);
connection.send(connect, result ->
{
if (result.isFailed())
{
tunnelFailed(result.getFailure());
}
else
{
Response response = result.getResponse();
if (response.getStatus() == 200)
{
tunnelSucceeded();
}
else
{
tunnelFailed(new HttpResponseException("Received " + response + " for " + result.getRequest(), response));
}
}
});
}
private void tunnelSucceeded()
{
try
{
// Replace the promise back with the original
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise);
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
ClientConnectionFactory sslConnectionFactory = new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
HttpConnectionOverHTTP oldConnection = (HttpConnectionOverHTTP)endPoint.getConnection();
org.eclipse.jetty.io.Connection newConnection = sslConnectionFactory.newConnection(endPoint, context);
endPoint.upgrade(newConnection);
if (LOG.isDebugEnabled())
LOG.debug("HTTP tunnel established: {} over {}", oldConnection, newConnection);
}
catch (Throwable x)
{
tunnelFailed(x);
}
}
private void tunnelFailed(Throwable failure)
{
endPoint.close();
promise.failed(failure);
} }
} }
} }
/**
* <p>Creates a tunnel using HTTP CONNECT.</p>
* <p>It is implemented as a promise because it needs to establish the
* tunnel after the TCP connection is succeeded, and needs to notify
* the nested promise when the tunnel is established (or failed).</p>
*/
private class CreateTunnelPromise implements Promise<Connection>
{
private final ClientConnectionFactory connectionFactory;
private final EndPoint endPoint;
private final Promise<Connection> promise;
private final Map<String, Object> context;
private CreateTunnelPromise(ClientConnectionFactory connectionFactory, EndPoint endPoint, Promise<Connection> promise, Map<String, Object> context)
{
this.connectionFactory = connectionFactory;
this.endPoint = endPoint;
this.promise = promise;
this.context = context;
}
@Override
public void succeeded(Connection connection)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
tunnel(destination, connection);
}
@Override
public void failed(Throwable x)
{
tunnelFailed(endPoint, x);
}
private void tunnel(HttpDestination destination, Connection connection)
{
String target = destination.getOrigin().getAddress().asString();
Origin.Address proxyAddress = destination.getConnectAddress();
HttpClient httpClient = destination.getHttpClient();
long connectTimeout = httpClient.getConnectTimeout();
Request connect = httpClient.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
.method(HttpMethod.CONNECT)
.path(target)
.header(HttpHeader.HOST, target)
.idleTimeout(2 * connectTimeout, TimeUnit.MILLISECONDS)
.timeout(connectTimeout, TimeUnit.MILLISECONDS);
final HttpConversation conversation = ((HttpRequest)connect).getConversation();
conversation.setAttribute(EndPoint.class.getName(), endPoint);
connect.attribute(Connection.class.getName(), new ProxyConnection(destination, connection, promise));
connection.send(connect, result ->
{
// The EndPoint may have changed during the conversation, get the latest.
EndPoint endPoint1 = (EndPoint)conversation.getAttribute(EndPoint.class.getName());
if (result.isSucceeded())
{
Response response = result.getResponse();
if (response.getStatus() == HttpStatus.OK_200)
{
tunnelSucceeded(endPoint1);
}
else
{
HttpResponseException failure = new HttpResponseException("Unexpected " + response +
" for " + result.getRequest(), response);
tunnelFailed(endPoint1, failure);
}
}
else
{
tunnelFailed(endPoint1, result.getFailure());
}
});
}
private void tunnelSucceeded(EndPoint endPoint)
{
try
{
// Replace the promise back with the original
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise);
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
ClientConnectionFactory sslConnectionFactory =
new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
HttpConnectionOverHTTP oldConnection = (HttpConnectionOverHTTP)endPoint.getConnection();
org.eclipse.jetty.io.Connection newConnection = sslConnectionFactory.newConnection(endPoint, context);
endPoint.upgrade(newConnection);
if (LOG.isDebugEnabled())
LOG.debug("HTTP tunnel established: {} over {}", oldConnection, newConnection);
}
catch (Throwable x)
{
tunnelFailed(endPoint, x);
}
}
private void tunnelFailed(EndPoint endPoint, Throwable failure)
{
endPoint.close();
promise.failed(failure);
}
}
private class ProxyConnection implements Connection
{
private final Destination destination;
private final Connection connection;
private final Promise<Connection> promise;
private ProxyConnection(Destination destination, Connection connection, Promise<Connection> promise)
{
this.destination = destination;
this.connection = connection;
this.promise = promise;
}
@Override
public void send(Request request, Response.CompleteListener listener)
{
if (connection.isClosed())
{
destination.newConnection(new TunnelPromise(request, listener, promise));
}
else
{
connection.send(request, listener);
}
}
@Override
public void close()
{
connection.close();
}
@Override
public boolean isClosed()
{
return connection.isClosed();
}
}
private class TunnelPromise implements Promise<Connection>
{
private final Request request;
private final Response.CompleteListener listener;
private final Promise<Connection> promise;
private TunnelPromise(Request request, Response.CompleteListener listener, Promise<Connection> promise)
{
this.request = request;
this.listener = listener;
this.promise = promise;
}
@Override
public void succeeded(Connection connection)
{
connection.send(request, listener);
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
private void setEndPoint(EndPoint endPoint)
{
HttpConversation conversation = ((HttpRequest)request).getConversation();
conversation.setAttribute(EndPoint.class.getName(), endPoint);
}
}
} }

View File

@ -59,7 +59,6 @@ import org.eclipse.jetty.util.Fields;
public class HttpRequest implements Request public class HttpRequest implements Request
{ {
private static final URI NULL_URI = URI.create("null:0"); private static final URI NULL_URI = URI.create("null:0");
static final String CONNECTION_ATTRIBUTE = HttpRequest.class.getName() + ".connection";
private final HttpFields headers = new HttpFields(); private final HttpFields headers = new HttpFields();
private final Fields params = new Fields(true); private final Fields params = new Fields(true);

View File

@ -45,4 +45,10 @@ public interface Connection extends Closeable
@Override @Override
void close(); void close();
/**
* @return whether this connection has been closed
* @see #close()
*/
boolean isClosed();
} }

View File

@ -90,6 +90,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
promise.succeeded(this); promise.succeeded(this);
} }
@Override
public boolean isClosed() public boolean isClosed()
{ {
return closed.get(); return closed.get();
@ -207,6 +208,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
HttpConnectionOverHTTP.this.close(); HttpConnectionOverHTTP.this.close();
} }
@Override
public boolean isClosed()
{
return HttpConnectionOverHTTP.this.isClosed();
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -61,7 +61,7 @@ public abstract class AbstractAuthentication implements Authentication
public static boolean matchesURI(URI uri1, URI uri2) public static boolean matchesURI(URI uri1, URI uri2)
{ {
String scheme = uri1.getScheme(); String scheme = uri1.getScheme();
if (uri1.getScheme().equalsIgnoreCase(scheme)) if (scheme.equalsIgnoreCase(uri2.getScheme()))
{ {
if (uri1.getHost().equalsIgnoreCase(uri2.getHost())) if (uri1.getHost().equalsIgnoreCase(uri2.getHost()))
{ {

View File

@ -208,11 +208,6 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
destination.release(this); destination.release(this);
} }
public boolean isClosed()
{
return closed.get();
}
@Override @Override
public void close() public void close()
{ {
@ -236,6 +231,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
} }
} }
@Override
public boolean isClosed()
{
return closed.get();
}
protected boolean closeByHTTP(HttpFields fields) protected boolean closeByHTTP(HttpFields fields)
{ {
if (multiplexed) if (multiplexed)
@ -332,6 +333,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
HttpConnectionOverFCGI.this.close(failure); HttpConnectionOverFCGI.this.close(failure);
} }
@Override
public boolean isClosed()
{
return HttpConnectionOverFCGI.this.isClosed();
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -102,6 +102,12 @@ public class HttpConnectionOverHTTP2 extends HttpConnection
} }
} }
@Override
public boolean isClosed()
{
return closed.get();
}
private void abort(Throwable failure) private void abort(Throwable failure)
{ {
for (HttpChannel channel : channels) for (HttpChannel channel : channels)

View File

@ -454,9 +454,8 @@ public class ForwardProxyTLSServerTest
@Test @Test
public void testProxyAuthentication() throws Exception public void testProxyAuthentication() throws Exception
{ {
startTLSServer(new ServerHandler()); final String realm = "test-realm";
String proxyRealm = "ProxyRealm"; testProxyAuthentication(realm, new ConnectHandler()
startProxy(new ConnectHandler()
{ {
@Override @Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
@ -466,27 +465,57 @@ public class ForwardProxyTLSServerTest
{ {
baseRequest.setHandled(true); baseRequest.setHandled(true);
response.setStatus(HttpStatus.PROXY_AUTHENTICATION_REQUIRED_407); response.setStatus(HttpStatus.PROXY_AUTHENTICATION_REQUIRED_407);
response.setHeader(HttpHeader.PROXY_AUTHENTICATE.asString(), "Basic realm=\"" + proxyRealm + "\""); response.setHeader(HttpHeader.PROXY_AUTHENTICATE.asString(), "Basic realm=\"" + realm + "\"");
return; return;
} }
super.handle(target, baseRequest, request, response); super.handle(target, baseRequest, request, response);
} }
}); });
}
@Test
public void testProxyAuthenticationClosesConnection() throws Exception
{
final String realm = "test-realm";
testProxyAuthentication(realm, new ConnectHandler()
{
@Override
protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address)
{
final String header = request.getHeader(HttpHeader.PROXY_AUTHORIZATION.toString());
if (header == null || !header.startsWith("Basic "))
{
response.setHeader(HttpHeader.PROXY_AUTHENTICATE.toString(), "Basic realm=\"" + realm + "\"");
// Returning false adds Connection: close to the 407 response.
return false;
}
else
{
return true;
}
}
});
}
private void testProxyAuthentication(String realm, ConnectHandler connectHandler) throws Exception
{
startTLSServer(new ServerHandler());
startProxy(connectHandler);
HttpClient httpClient = new HttpClient(newSslContextFactory()); HttpClient httpClient = new HttpClient(newSslContextFactory());
httpClient.getProxyConfiguration().getProxies().add(newHttpProxy()); httpClient.getProxyConfiguration().getProxies().add(newHttpProxy());
URI proxyURI = URI.create("https://localhost:" + proxyConnector.getLocalPort()); URI uri = URI.create((proxySslContextFactory == null ? "http" : "https") + "://localhost:" + proxyConnector.getLocalPort());
httpClient.getAuthenticationStore().addAuthentication(new BasicAuthentication(proxyURI, proxyRealm, "proxyUser", "proxyPassword")); httpClient.getAuthenticationStore().addAuthentication(new BasicAuthentication(uri, realm, "proxyUser", "proxyPassword"));
httpClient.start(); httpClient.start();
try try
{ {
String host = "localhost";
String body = "BODY"; String body = "BODY";
ContentResponse response = httpClient.newRequest("localhost", serverConnector.getLocalPort()) ContentResponse response = httpClient.newRequest(host, serverConnector.getLocalPort())
.scheme(HttpScheme.HTTPS.asString()) .scheme(HttpScheme.HTTPS.asString())
.method(HttpMethod.GET) .method(HttpMethod.GET)
.path("/echo") .path("/echo?body=" + URLEncoder.encode(body, "UTF-8"))
.param("body", body)
.send(); .send();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); Assert.assertEquals(HttpStatus.OK_200, response.getStatus());

View File

@ -42,7 +42,6 @@ public interface Promise<C>
*/ */
public void failed(Throwable x); public void failed(Throwable x);
/** /**
* <p>Empty implementation of {@link Promise}</p> * <p>Empty implementation of {@link Promise}</p>
* *
@ -62,4 +61,31 @@ public interface Promise<C>
} }
} }
public static abstract class Wrapper<W> implements Promise<W>
{
private final Promise<W> promise;
public Wrapper(Promise<W> promise)
{
this.promise = promise;
}
public Promise<W> getPromise()
{
return promise;
}
public Promise<W> unwrap()
{
Promise<W> result = promise;
while (true)
{
if (result instanceof Wrapper)
result = ((Wrapper<W>)result).unwrap();
else
break;
}
return result;
}
}
} }