Fixes #560 - Jetty Client Proxy Authentication does not work with HTTP Proxy tunneling.

The issue was related to the fact that the proxy responded 407 with a
Connection: close header.

Because the endPoint underlying the original connection was closed,
it should not have been used as a tunnel.
Rather, the endPoint of the new CONNECT attempt (with the proxy
credentials) must be used for the tunnel.

Also partially backported the fix for #408.
This commit is contained in:
Simone Bordet 2016-05-12 17:19:04 +02:00
parent 2d3d71ea23
commit f3675dbad7
8 changed files with 367 additions and 192 deletions

View File

@ -26,12 +26,14 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -114,12 +116,12 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
Authentication authentication = null;
Authentication.HeaderInfo headerInfo = null;
URI uri = getAuthenticationURI(request);
if (uri != null)
URI authURI = getAuthenticationURI(request);
if (authURI != null)
{
for (Authentication.HeaderInfo element : headerInfos)
{
authentication = client.getAuthenticationStore().findAuthentication(element.getType(), uri, element.getRealm());
authentication = client.getAuthenticationStore().findAuthentication(element.getType(), authURI, element.getRealm());
if (authentication != null)
{
headerInfo = element;
@ -148,8 +150,23 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
conversation.setAttribute(AUTHENTICATION_ATTRIBUTE, true);
Request newRequest = client.copyRequest(request, request.getURI());
URI requestURI = request.getURI();
String path = null;
if (HttpMethod.CONNECT.is(request.getMethod()))
{
String uri = request.getScheme() + "://" + request.getHost();
int port = request.getPort();
if (port > 0)
uri += ":" + port;
requestURI = URI.create(uri);
path = request.getPath();
}
Request newRequest = client.copyRequest(request, requestURI);
if (path != null)
newRequest.path(path);
authnResult.apply(newRequest);
newRequest.onResponseSuccess(new Response.SuccessListener()
{
@Override
@ -157,7 +174,13 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
{
client.getAuthenticationStore().addAuthenticationResult(authnResult);
}
}).send(null);
});
Connection connection = (Connection)request.getAttributes().get(Connection.class.getName());
if (connection != null)
connection.send(newRequest, null);
else
newRequest.send(null);
}
catch (Throwable x)
{

View File

@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
@ -31,6 +32,7 @@ import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
@ -41,6 +43,8 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HttpProxy extends ProxyConfiguration.Proxy
{
private static final Logger LOG = Log.getLogger(HttpProxy.class);
public HttpProxy(String host, int port)
{
this(new Origin.Address(host, port), false);
@ -64,12 +68,11 @@ public class HttpProxy extends ProxyConfiguration.Proxy
return URI.create(new Origin(scheme, getAddress()).asString());
}
public static class HttpProxyClientConnectionFactory implements ClientConnectionFactory
private class HttpProxyClientConnectionFactory implements ClientConnectionFactory
{
private static final Logger LOG = Log.getLogger(HttpProxyClientConnectionFactory.class);
private final ClientConnectionFactory connectionFactory;
public HttpProxyClientConnectionFactory(ClientConnectionFactory connectionFactory)
private HttpProxyClientConnectionFactory(ClientConnectionFactory connectionFactory)
{
this.connectionFactory = connectionFactory;
}
@ -77,129 +80,221 @@ public class HttpProxy extends ProxyConfiguration.Proxy
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
final ProxyPromise proxyPromise = new ProxyPromise(endPoint, promise, context);
// Replace the promise with the proxy one
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, proxyPromise);
return connectionFactory.newConnection(endPoint, context);
}
/**
* Decides whether to establish a proxy tunnel using HTTP CONNECT.
* It is implemented as a promise because it needs to establish the
* tunnel after the TCP connection is succeeded, and needs to notify
* the nested promise when the tunnel is established (or failed).
*/
private class ProxyPromise implements Promise<Connection>
{
private final EndPoint endPoint;
private final Promise<Connection> promise;
private final Map<String, Object> context;
private ProxyPromise(EndPoint endPoint, Promise<Connection> promise, Map<String, Object> context)
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
boolean secure = HttpScheme.HTTPS.is(destination.getScheme());
SslContextFactory sslContextFactory = destination.getHttpClient().getSslContextFactory();
if (secure)
{
this.endPoint = endPoint;
this.promise = promise;
this.context = context;
}
@Override
public void succeeded(Connection connection)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
if (HttpScheme.HTTPS.is(destination.getScheme()))
if (sslContextFactory != null)
{
SslContextFactory sslContextFactory = destination.getHttpClient().getSslContextFactory();
if (sslContextFactory != null)
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
if (promise instanceof TunnelPromise)
{
tunnel(destination, connection);
((TunnelPromise)promise).setEndPoint(endPoint);
return connectionFactory.newConnection(endPoint, context);
}
else
{
String message = String.format("Cannot perform requests over SSL, no %s in %s",
SslContextFactory.class.getSimpleName(), HttpClient.class.getSimpleName());
promise.failed(new IllegalStateException(message));
// Replace the promise with the proxy promise that creates the tunnel to the server.
CreateTunnelPromise tunnelPromise = new CreateTunnelPromise(connectionFactory, endPoint, promise, context);
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, tunnelPromise);
return connectionFactory.newConnection(endPoint, context);
}
}
else
{
promise.succeeded(connection);
throw new IOException("Cannot tunnel request, missing " +
SslContextFactory.class.getName() + " in " + HttpClient.class.getName());
}
}
@Override
public void failed(Throwable x)
else
{
promise.failed(x);
}
private void tunnel(HttpDestination destination, final Connection connection)
{
String target = destination.getOrigin().getAddress().asString();
Origin.Address proxyAddress = destination.getConnectAddress();
HttpClient httpClient = destination.getHttpClient();
Request connect = httpClient.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
.scheme(HttpScheme.HTTP.asString())
.method(HttpMethod.CONNECT)
.path(target)
.header(HttpHeader.HOST, target)
.timeout(httpClient.getConnectTimeout(), TimeUnit.MILLISECONDS);
connection.send(connect, new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
{
tunnelFailed(result.getFailure());
}
else
{
Response response = result.getResponse();
if (response.getStatus() == 200)
{
tunnelSucceeded();
}
else
{
tunnelFailed(new HttpResponseException("Received " + response + " for " + result.getRequest(), response));
}
}
}
});
}
private void tunnelSucceeded()
{
try
{
// Replace the promise back with the original
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise);
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
ClientConnectionFactory sslConnectionFactory = new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
HttpConnectionOverHTTP oldConnection = (HttpConnectionOverHTTP)endPoint.getConnection();
org.eclipse.jetty.io.Connection newConnection = sslConnectionFactory.newConnection(endPoint, context);
Helper.replaceConnection(oldConnection, newConnection);
// Avoid setting fill interest in the old Connection,
// without closing the underlying EndPoint.
oldConnection.softClose();
if (LOG.isDebugEnabled())
LOG.debug("HTTP tunnel established: {} over {}", oldConnection, newConnection);
}
catch (Throwable x)
{
tunnelFailed(x);
}
}
private void tunnelFailed(Throwable failure)
{
endPoint.close();
failed(failure);
return connectionFactory.newConnection(endPoint, context);
}
}
}
/**
* <p>Creates a tunnel using HTTP CONNECT.</p>
* <p>It is implemented as a promise because it needs to establish the
* tunnel after the TCP connection is succeeded, and needs to notify
* the nested promise when the tunnel is established (or failed).</p>
*/
private class CreateTunnelPromise implements Promise<Connection>
{
private final ClientConnectionFactory connectionFactory;
private final EndPoint endPoint;
private final Promise<Connection> promise;
private final Map<String, Object> context;
private CreateTunnelPromise(ClientConnectionFactory connectionFactory, EndPoint endPoint, Promise<Connection> promise, Map<String, Object> context)
{
this.connectionFactory = connectionFactory;
this.endPoint = endPoint;
this.promise = promise;
this.context = context;
}
@Override
public void succeeded(Connection connection)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
tunnel(destination, connection);
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
private void tunnel(HttpDestination destination, Connection connection)
{
String target = destination.getOrigin().getAddress().asString();
Origin.Address proxyAddress = destination.getConnectAddress();
HttpClient httpClient = destination.getHttpClient();
Request connect = httpClient.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
.scheme(HttpScheme.HTTP.asString())
.method(HttpMethod.CONNECT)
.path(target)
.header(HttpHeader.HOST, target)
.timeout(httpClient.getConnectTimeout(), TimeUnit.MILLISECONDS);
final HttpConversation conversation = ((HttpRequest)connect).getConversation();
conversation.setAttribute(EndPoint.class.getName(), endPoint);
connect.attribute(Connection.class.getName(), new ProxyConnection(destination, connection, promise));
connection.send(connect, new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
// The EndPoint may have changed during the conversation, get the latest.
EndPoint endPoint = (EndPoint)conversation.getAttribute(EndPoint.class.getName());
if (result.isSucceeded())
{
Response response = result.getResponse();
if (response.getStatus() == HttpStatus.OK_200)
{
tunnelSucceeded(endPoint);
}
else
{
HttpResponseException failure = new HttpResponseException("Unexpected " + response +
" for " + result.getRequest(), response);
tunnelFailed(endPoint, failure);
}
}
else
{
tunnelFailed(endPoint, result.getFailure());
}
}
});
}
private void tunnelSucceeded(EndPoint endPoint)
{
try
{
// Replace the promise back with the original
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise);
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();
ClientConnectionFactory sslConnectionFactory =
new SslClientConnectionFactory(client.getSslContextFactory(), client.getByteBufferPool(), client.getExecutor(), connectionFactory);
HttpConnectionOverHTTP oldConnection = (HttpConnectionOverHTTP)endPoint.getConnection();
org.eclipse.jetty.io.Connection newConnection = sslConnectionFactory.newConnection(endPoint, context);
ClientConnectionFactory.Helper.replaceConnection(oldConnection, newConnection);
// Avoid setting fill interest in the old Connection,
// without closing the underlying EndPoint.
oldConnection.softClose();
if (LOG.isDebugEnabled())
LOG.debug("HTTP tunnel established: {} over {}", oldConnection, newConnection);
}
catch (Throwable x)
{
tunnelFailed(endPoint, x);
}
}
private void tunnelFailed(EndPoint endPoint, Throwable failure)
{
endPoint.close();
failed(failure);
}
}
private class ProxyConnection implements Connection
{
private final Destination destination;
private final Connection connection;
private final Promise<Connection> promise;
private ProxyConnection(Destination destination, Connection connection, Promise<Connection> promise)
{
this.destination = destination;
this.connection = connection;
this.promise = promise;
}
@Override
public void send(Request request, Response.CompleteListener listener)
{
if (connection.isClosed())
{
destination.newConnection(new TunnelPromise(request, listener, promise));
}
else
{
connection.send(request, listener);
}
}
@Override
public void close()
{
connection.close();
}
@Override
public boolean isClosed()
{
return connection.isClosed();
}
}
private class TunnelPromise implements Promise<Connection>
{
private final Request request;
private final Response.CompleteListener listener;
private final Promise<Connection> promise;
private TunnelPromise(Request request, Response.CompleteListener listener, Promise<Connection> promise)
{
this.request = request;
this.listener = listener;
this.promise = promise;
}
@Override
public void succeeded(Connection connection)
{
connection.send(request, listener);
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
private void setEndPoint(EndPoint endPoint)
{
HttpConversation conversation = ((HttpRequest)request).getConversation();
conversation.setAttribute(EndPoint.class.getName(), endPoint);
}
}
}

View File

@ -191,7 +191,7 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
{
super.close(oldConnection);
connectionPool.remove(oldConnection);
boolean removed = connectionPool.remove(oldConnection);
if (getHttpExchanges().isEmpty())
{
@ -206,7 +206,7 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
getHttpClient().removeDestination(this);
}
}
else
else if (removed)
{
// We need to execute queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually

View File

@ -43,4 +43,10 @@ public interface Connection extends Closeable
@Override
void close();
/**
* @return whether this connection has been closed
* @see #close()
*/
boolean isClosed();
}

View File

@ -99,6 +99,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
promise.succeeded(this);
}
@Override
public boolean isClosed()
{
return closed.get();
@ -226,6 +227,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
HttpConnectionOverHTTP.this.close();
}
@Override
public boolean isClosed()
{
return HttpConnectionOverHTTP.this.isClosed();
}
@Override
public String toString()
{

View File

@ -224,6 +224,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
}
}
@Override
public boolean isClosed()
{
return closed.get();
}
protected boolean closeByHTTP(HttpFields fields)
{
if (multiplexed)
@ -312,6 +318,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
HttpConnectionOverFCGI.this.close();
}
@Override
public boolean isClosed()
{
return HttpConnectionOverFCGI.this.isClosed();
}
@Override
public String toString()
{

View File

@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
@ -63,10 +64,6 @@ import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import static org.eclipse.jetty.http.HttpHeader.PROXY_AUTHENTICATE;
import static org.eclipse.jetty.http.HttpHeader.PROXY_AUTHORIZATION;
import static org.junit.Assert.assertEquals;
public class ProxyTunnellingTest
{
@Rule
@ -160,9 +157,9 @@ public class ProxyTunnellingTest
.path("/echo?body=" + URLEncoder.encode(body, "UTF-8"))
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
String content = response.getContentAsString();
assertEquals(body, content);
Assert.assertEquals(body, content);
}
finally
{
@ -189,9 +186,9 @@ public class ProxyTunnellingTest
.path("/echo?body=" + URLEncoder.encode(body, "UTF-8"))
.send();
assertEquals(HttpStatus.OK_200, response1.getStatus());
Assert.assertEquals(HttpStatus.OK_200, response1.getStatus());
String content = response1.getContentAsString();
assertEquals(body, content);
Assert.assertEquals(body, content);
content = "body=" + body;
ContentResponse response2 = httpClient.newRequest("localhost", serverConnector.getLocalPort())
@ -203,9 +200,9 @@ public class ProxyTunnellingTest
.content(new StringContentProvider(content))
.send();
assertEquals(HttpStatus.OK_200, response2.getStatus());
Assert.assertEquals(HttpStatus.OK_200, response2.getStatus());
content = response2.getContentAsString();
assertEquals(body, content);
Assert.assertEquals(body, content);
}
finally
{
@ -251,9 +248,9 @@ public class ProxyTunnellingTest
})
.send();
assertEquals(HttpStatus.OK_200, response1.getStatus());
Assert.assertEquals(HttpStatus.OK_200, response1.getStatus());
String content = response1.getContentAsString();
assertEquals(body1, content);
Assert.assertEquals(body1, content);
Assert.assertTrue(connectionLatch.await(5, TimeUnit.SECONDS));
@ -271,62 +268,9 @@ public class ProxyTunnellingTest
connection.get().send(request2, listener2);
ContentResponse response2 = listener2.get(5, TimeUnit.SECONDS);
assertEquals(HttpStatus.OK_200, response2.getStatus());
Assert.assertEquals(HttpStatus.OK_200, response2.getStatus());
String content2 = response1.getContentAsString();
assertEquals(body1, content2);
}
finally
{
httpClient.stop();
}
}
@Test
public void testProxyAuth() throws Exception
{
startSSLServer(new ServerHandler() {
@Override
public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException {
super.handle(target, request, httpRequest, httpResponse);
}
});
startProxy(new ConnectHandler() {
@Override
protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address) {
// validate proxy-authentication header
final String header = request.getHeader(PROXY_AUTHORIZATION.toString());
if (header == null || !header.startsWith("Basic ")) {
LOG.warn("Missing header " + PROXY_AUTHORIZATION);
// ask for authentication header
response.setHeader(PROXY_AUTHENTICATE.toString(), String.format("Basic realm=\"%s\"", "test-realm"));
return false;
} else {
LOG.info("Request contains required header " + PROXY_AUTHORIZATION);
return true;
}
}
});
HttpClient httpClient = new HttpClient(sslContextFactory);
httpClient.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyPort()));
httpClient.getAuthenticationStore().addAuthentication(
new BasicAuthentication(URI.create("http://localhost:" + proxyPort()), "test-realm", "user", "password"));
httpClient.start();
try
{
// Use a numeric host to test the URI of the CONNECT request.
String host = "127.0.0.1";
String body = "BODY";
ContentResponse response = httpClient.newRequest(host, serverConnector.getLocalPort())
.scheme(HttpScheme.HTTPS.asString())
.method(HttpMethod.GET)
.path("/echo?body=" + URLEncoder.encode(body, "UTF-8"))
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
String content = response.getContentAsString();
assertEquals(body, content);
Assert.assertEquals(body1, content2);
}
finally
{
@ -432,6 +376,83 @@ public class ProxyTunnellingTest
}
}
@Test
public void testProxyAuthentication() throws Exception
{
final String realm = "test-realm";
testProxyAuthentication(realm, new ConnectHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
String proxyAuth = request.getHeader(HttpHeader.PROXY_AUTHORIZATION.asString());
if (proxyAuth == null)
{
baseRequest.setHandled(true);
response.setStatus(HttpStatus.PROXY_AUTHENTICATION_REQUIRED_407);
response.setHeader(HttpHeader.PROXY_AUTHENTICATE.asString(), "Basic realm=\"" + realm + "\"");
return;
}
super.handle(target, baseRequest, request, response);
}
});
}
@Test
public void testProxyAuthenticationClosesConnection() throws Exception
{
final String realm = "test-realm";
testProxyAuthentication(realm, new ConnectHandler()
{
@Override
protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address)
{
final String header = request.getHeader(HttpHeader.PROXY_AUTHORIZATION.toString());
if (header == null || !header.startsWith("Basic "))
{
response.setHeader(HttpHeader.PROXY_AUTHENTICATE.toString(), "Basic realm=\"" + realm + "\"");
// Returning false adds Connection: close to the 407 response.
return false;
}
else
{
return true;
}
}
});
}
private void testProxyAuthentication(String realm, ConnectHandler connectHandler) throws Exception
{
startSSLServer(new ServerHandler());
startProxy(connectHandler);
HttpClient httpClient = new HttpClient(sslContextFactory);
httpClient.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyPort()));
httpClient.getAuthenticationStore().addAuthentication(
new BasicAuthentication(URI.create("http://localhost:" + proxyPort()), realm, "proxyUser", "proxyPassword"));
httpClient.start();
try
{
String host = "localhost";
String body = "BODY";
ContentResponse response = httpClient.newRequest(host, serverConnector.getLocalPort())
.scheme(HttpScheme.HTTPS.asString())
.method(HttpMethod.GET)
.path("/echo?body=" + URLEncoder.encode(body, "UTF-8"))
.send();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
String content = response.getContentAsString();
Assert.assertEquals(body, content);
}
finally
{
httpClient.stop();
}
}
@Test
@Ignore("External Proxy Server no longer stable enough for testing")
public void testExternalProxy() throws Exception
@ -461,7 +482,7 @@ public class ProxyTunnellingTest
// Use a longer timeout, sometimes the proxy takes a while to answer
.timeout(20, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
}
finally
{

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.spdy.client.http;
import java.nio.channels.AsynchronousCloseException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
@ -33,6 +34,7 @@ import org.eclipse.jetty.util.ConcurrentHashSet;
public class HttpConnectionOverSPDY extends HttpConnection
{
private final Set<HttpChannel> channels = new ConcurrentHashSet<>();
private final AtomicBoolean closed = new AtomicBoolean();
private final Session session;
public HttpConnectionOverSPDY(HttpDestination destination, Session session)
@ -62,11 +64,20 @@ public class HttpConnectionOverSPDY extends HttpConnection
@Override
public void close()
{
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
getHttpDestination().close(this);
session.goAway(new GoAwayInfo(), Callback.Adapter.INSTANCE);
abort(new AsynchronousCloseException());
if (closed.compareAndSet(false, true))
{
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
getHttpDestination().close(this);
session.goAway(new GoAwayInfo(), Callback.Adapter.INSTANCE);
abort(new AsynchronousCloseException());
}
}
@Override
public boolean isClosed()
{
return closed.get();
}
private void abort(Throwable failure)