From 0bca1974b799e3a85e459b0f212bb5b9faf35e93 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 19 Jun 2013 19:43:49 +0200 Subject: [PATCH] 411135 - HttpClient may send proxied https requests to the proxy instead of the target server. Made sure to always tunnel the connection if needs to be tunnelled. --- .../eclipse/jetty/client/HttpDestination.java | 154 +++++++++--------- .../jetty/client/ProxyTunnellingTest.java | 93 ++++++++++- 2 files changed, 169 insertions(+), 78 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 1181eb80592..73c85d23161 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -52,10 +52,10 @@ public class HttpDestination implements Dumpable { private static final Logger LOG = Log.getLogger(HttpDestination.class); - private final List _queue = new LinkedList(); + private final List _exchanges = new LinkedList(); private final List _connections = new LinkedList(); - private final BlockingQueue _newQueue = new ArrayBlockingQueue(10, true); - private final List _idle = new ArrayList(); + private final BlockingQueue _reservedConnections = new ArrayBlockingQueue(10, true); + private final List _idleConnections = new ArrayList(); private final HttpClient _client; private final Address _address; private final boolean _ssl; @@ -63,14 +63,12 @@ public class HttpDestination implements Dumpable private volatile int _maxConnections; private volatile int _maxQueueSize; private int _pendingConnections = 0; - private int _newConnection = 0; + private int _pendingReservedConnections = 0; private volatile Address _proxy; private Authentication _proxyAuthentication; private PathMap _authorizations; private List _cookies; - - HttpDestination(HttpClient client, Address address, boolean ssl) { _client = client; @@ -136,7 +134,7 @@ public class HttpDestination implements Dumpable { synchronized (this) { - return _idle.size(); + return _idleConnections.size(); } } @@ -185,7 +183,7 @@ public class HttpDestination implements Dumpable int totalConnections = _connections.size() + _pendingConnections; if (totalConnections < _maxConnections) { - _newConnection++; + _pendingReservedConnections++; startConnection = true; } } @@ -195,7 +193,7 @@ public class HttpDestination implements Dumpable startNewConnection(); try { - Object o = _newQueue.take(); + Object o = _reservedConnections.take(); if (o instanceof AbstractHttpConnection) { connection = (AbstractHttpConnection)o; @@ -246,15 +244,15 @@ public class HttpDestination implements Dumpable connection.close(); connection = null; } - if (_idle.size() > 0) - connection = _idle.remove(_idle.size() - 1); + if (_idleConnections.size() > 0) + connection = _idleConnections.remove(_idleConnections.size() - 1); } if (connection == null) { return null; } - + // Check if the connection was idle, // but it expired just a moment ago if (connection.cancelIdleTimeout()) @@ -291,20 +289,20 @@ public class HttpDestination implements Dumpable synchronized (this) { _pendingConnections--; - if (_newConnection > 0) + if (_pendingReservedConnections > 0) { connect_failure = throwable; - _newConnection--; + _pendingReservedConnections--; } - else if (_queue.size() > 0) + else if (_exchanges.size() > 0) { - HttpExchange ex = _queue.remove(0); + HttpExchange ex = _exchanges.remove(0); if (ex.setStatus(HttpExchange.STATUS_EXCEPTED)) ex.getEventListener().onConnectionFailed(throwable); // Since an existing connection had failed, we need to create a // connection if the queue is not empty and client is running. - if (!_queue.isEmpty() && _client.isStarted()) + if (!_exchanges.isEmpty() && _client.isStarted()) startConnection = true; } } @@ -316,7 +314,7 @@ public class HttpDestination implements Dumpable { try { - _newQueue.put(connect_failure); + _reservedConnections.put(connect_failure); } catch (InterruptedException e) { @@ -330,10 +328,10 @@ public class HttpDestination implements Dumpable synchronized (this) { _pendingConnections--; - if (_queue.size() > 0) + if (_exchanges.size() > 0) { - HttpExchange ex = _queue.remove(0); - if(ex.setStatus(HttpExchange.STATUS_EXCEPTED)) + HttpExchange ex = _exchanges.remove(0); + if (ex.setStatus(HttpExchange.STATUS_EXCEPTED)) ex.getEventListener().onException(throwable); } } @@ -341,47 +339,53 @@ public class HttpDestination implements Dumpable public void onNewConnection(final AbstractHttpConnection connection) throws IOException { - Connection q_connection = null; + Connection reservedConnection = null; synchronized (this) { _pendingConnections--; _connections.add(connection); - if (_newConnection > 0) + if (_pendingReservedConnections > 0) { - q_connection = connection; - _newConnection--; - } - else if (_queue.size() == 0) - { - connection.setIdleTimeout(); - _idle.add(connection); + reservedConnection = connection; + _pendingReservedConnections--; } else { + // Establish the tunnel if needed EndPoint endPoint = connection.getEndPoint(); if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint) { SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint; - HttpExchange exchange = _queue.get(0); - ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange); + ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint); connect.setAddress(getProxy()); + LOG.debug("Establishing tunnel to {} via {}", getAddress(), getProxy()); send(connection, connect); } else { - HttpExchange exchange = _queue.remove(0); - send(connection, exchange); + // Another connection stole the exchange that caused the creation of this connection ? + if (_exchanges.size() == 0) + { + LOG.debug("No exchanges for new connection {}", connection); + connection.setIdleTimeout(); + _idleConnections.add(connection); + } + else + { + HttpExchange exchange = _exchanges.remove(0); + send(connection, exchange); + } } } } - if (q_connection != null) + if (reservedConnection != null) { try { - _newQueue.put(q_connection); + _reservedConnections.put(reservedConnection); } catch (InterruptedException e) { @@ -414,14 +418,14 @@ public class HttpDestination implements Dumpable { synchronized (this) { - if (_queue.size() == 0) + if (_exchanges.size() == 0) { connection.setIdleTimeout(); - _idle.add(connection); + _idleConnections.add(connection); } else { - HttpExchange ex = _queue.remove(0); + HttpExchange ex = _exchanges.remove(0); send(connection, ex); } this.notifyAll(); @@ -433,7 +437,7 @@ public class HttpDestination implements Dumpable synchronized (this) { _connections.remove(connection); - if (!_queue.isEmpty()) + if (!_exchanges.isEmpty()) startConnection = true; } @@ -445,16 +449,16 @@ public class HttpDestination implements Dumpable public void returnIdleConnection(AbstractHttpConnection connection) { // TODO work out the real idle time; - long idleForMs=connection!=null&&connection.getEndPoint()!=null?connection.getEndPoint().getMaxIdleTime():-1; + long idleForMs = connection.getEndPoint() != null ? connection.getEndPoint().getMaxIdleTime() : -1; connection.onIdleExpired(idleForMs); boolean startConnection = false; synchronized (this) { - _idle.remove(connection); + _idleConnections.remove(connection); _connections.remove(connection); - if (!_queue.isEmpty() && _client.isStarted()) + if (!_exchanges.isEmpty() && _client.isStarted()) startConnection = true; } @@ -472,10 +476,9 @@ public class HttpDestination implements Dumpable for (int i = listeners.size(); i > 0; --i) { String listenerClass = listeners.get(i - 1); - try { - Class listener = Class.forName(listenerClass); + Class listener = Class.forName(listenerClass); Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class); HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex); ex.setEventListener(elistener); @@ -484,7 +487,9 @@ public class HttpDestination implements Dumpable { throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass) { - {initCause(e);} + { + initCause(e); + } }; } } @@ -549,10 +554,10 @@ public class HttpDestination implements Dumpable boolean startConnection = false; synchronized (this) { - if (_queue.size() == _maxQueueSize) + if (_exchanges.size() == _maxQueueSize) throw new RejectedExecutionException("Queue full for address " + _address); - _queue.add(ex); + _exchanges.add(ex); if (_connections.size() + _pendingConnections < _maxConnections) startConnection = true; } @@ -568,7 +573,7 @@ public class HttpDestination implements Dumpable // destination queue, make sure it is removed synchronized (this) { - _queue.remove(exchange); + _exchanges.remove(exchange); } } @@ -581,7 +586,7 @@ public class HttpDestination implements Dumpable if (!connection.send(exchange)) { if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION) - _queue.add(0, exchange); + _exchanges.add(0, exchange); returnIdleConnection(connection); } } @@ -590,7 +595,7 @@ public class HttpDestination implements Dumpable @Override public synchronized String toString() { - return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n",hashCode(),_address.getHost(),_address.getPort(),_connections.size(),_maxConnections,_idle.size(),_queue.size(),_maxQueueSize); + return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n", hashCode(), _address.getHost(), _address.getPort(), _connections.size(), _maxConnections, _idleConnections.size(), _exchanges.size(), _maxQueueSize); } public synchronized String toDetailString() @@ -603,7 +608,7 @@ public class HttpDestination implements Dumpable for (AbstractHttpConnection connection : _connections) { b.append(connection.toDetailString()); - if (_idle.contains(connection)) + if (_idleConnections.contains(connection)) b.append(" IDLE"); b.append('\n'); } @@ -650,39 +655,33 @@ public class HttpDestination implements Dumpable } } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.util.component.Dumpable#dump() - */ public String dump() { return AggregateLifeCycle.dump(this); } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.util.component.Dumpable#dump(java.lang.Appendable, java.lang.String) - */ public void dump(Appendable out, String indent) throws IOException { synchronized (this) { - out.append(String.valueOf(this)+"idle="+_idle.size()+" pending="+_pendingConnections).append("\n"); - AggregateLifeCycle.dump(out,indent,_connections); + out.append(String.valueOf(this)); + out.append("idle="); + out.append(String.valueOf(_idleConnections.size())); + out.append(" pending="); + out.append(String.valueOf(_pendingConnections)); + out.append("\n"); + AggregateLifeCycle.dump(out, indent, _connections); } } private class ConnectExchange extends ContentExchange { private final SelectConnector.UpgradableEndPoint proxyEndPoint; - private final HttpExchange exchange; - public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint, HttpExchange exchange) + public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint) { this.proxyEndPoint = proxyEndPoint; - this.exchange = exchange; setMethod(HttpMethods.CONNECT); - setVersion(exchange.getVersion()); String serverHostAndPort = serverAddress.toString(); setRequestURI(serverHostAndPort); addRequestHeader(HttpHeaders.HOST, serverHostAndPort); @@ -698,13 +697,13 @@ public class HttpDestination implements Dumpable { proxyEndPoint.upgrade(); } - else if(responseStatus == HttpStatus.GATEWAY_TIMEOUT_504) + else if (responseStatus == HttpStatus.GATEWAY_TIMEOUT_504) { onExpire(); } else { - onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() +":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus + " while trying to request: " + exchange.getAddress().toString())); + onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() + ":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus)); } } @@ -717,18 +716,27 @@ public class HttpDestination implements Dumpable @Override protected void onException(Throwable x) { - _queue.remove(exchange); - if (exchange.setStatus(STATUS_EXCEPTED)) + HttpExchange exchange = null; + synchronized (HttpDestination.this) + { + if (!_exchanges.isEmpty()) + exchange = _exchanges.remove(0); + } + if (exchange != null && exchange.setStatus(STATUS_EXCEPTED)) exchange.getEventListener().onException(x); } @Override protected void onExpire() { - _queue.remove(exchange); - if (exchange.setStatus(STATUS_EXPIRED)) + HttpExchange exchange = null; + synchronized (HttpDestination.this) + { + if (!_exchanges.isEmpty()) + exchange = _exchanges.remove(0); + } + if (exchange != null && exchange.setStatus(STATUS_EXPIRED)) exchange.getEventListener().onExpire(); } - } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ProxyTunnellingTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ProxyTunnellingTest.java index 521710c057c..550c5418ea0 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ProxyTunnellingTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ProxyTunnellingTest.java @@ -18,13 +18,12 @@ package org.eclipse.jetty.client; -import static org.junit.Assert.*; - import java.io.IOException; +import java.io.InterruptedIOException; import java.net.URLEncoder; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; @@ -32,6 +31,7 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpHeaders; import org.eclipse.jetty.http.HttpMethods; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MimeTypes; import org.eclipse.jetty.io.ByteArrayBuffer; import org.eclipse.jetty.server.Connector; @@ -45,8 +45,12 @@ import org.eclipse.jetty.server.ssl.SslSelectChannelConnector; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.After; +import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class ProxyTunnellingTest { private Server server; @@ -113,7 +117,7 @@ public class ProxyTunnellingTest } @Test - public void testOneMessageSSL() throws Exception + public void testOneExchangeViaSSL() throws Exception { startSSLServer(new ServerHandler()); startProxy(); @@ -141,7 +145,7 @@ public class ProxyTunnellingTest } @Test - public void testTwoMessagesSSL() throws Exception + public void testTwoExchangesViaSSL() throws Exception { startSSLServer(new ServerHandler()); startProxy(); @@ -181,6 +185,85 @@ public class ProxyTunnellingTest } } + @Test + public void testTwoConcurrentExchangesViaSSL() throws Exception + { + startSSLServer(new ServerHandler()); + startProxy(); + + final HttpClient httpClient = new HttpClient(); + httpClient.setProxy(new Address("localhost", proxyPort())); + httpClient.start(); + + try + { + final AtomicReference connection = new AtomicReference(); + final CountDownLatch connectionLatch = new CountDownLatch(1); + ContentExchange exchange1 = new ContentExchange(true) + { + @Override + protected void onRequestCommitted() throws IOException + { + // Simulate the concurrent send of a second exchange which + // triggers the opening of a second connection but then + // it's "stolen" by the first connection, so that the + // second connection is put into the idle connections. + + HttpDestination destination = httpClient.getDestination(new Address("localhost", serverConnector.getLocalPort()), true); + destination.startNewConnection(); + + // Wait until we have the new connection + AbstractHttpConnection httpConnection = null; + while (httpConnection == null) + { + try + { + Thread.sleep(10); + httpConnection = destination.getIdleConnection(); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + + connection.set(httpConnection); + connectionLatch.countDown(); + } + }; + exchange1.setMethod(HttpMethods.GET); + String body1 = "BODY"; + exchange1.setURL("https://localhost:" + serverConnector.getLocalPort() + "/echo?body=" + URLEncoder.encode(body1, "UTF-8")); + + httpClient.send(exchange1); + assertEquals(HttpExchange.STATUS_COMPLETED, exchange1.waitForDone()); + assertEquals(HttpStatus.OK_200, exchange1.getResponseStatus()); + String content1 = exchange1.getResponseContent(); + assertEquals(body1, content1); + + Assert.assertTrue(connectionLatch.await(5, TimeUnit.SECONDS)); + + ContentExchange exchange2 = new ContentExchange(true); + exchange2.setMethod(HttpMethods.POST); + exchange2.setURL("https://localhost:" + serverConnector.getLocalPort() + "/echo"); + exchange2.setRequestHeader(HttpHeaders.CONTENT_TYPE, MimeTypes.FORM_ENCODED); + String body2 = "body=" + body1; + exchange2.setRequestHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(body2.length())); + exchange2.setRequestContent(new ByteArrayBuffer(body2, "UTF-8")); + + // Make sure the second connection can send the exchange via the tunnel + connection.get().send(exchange2); + assertEquals(HttpExchange.STATUS_COMPLETED, exchange2.waitForDone()); + assertEquals(HttpStatus.OK_200, exchange2.getResponseStatus()); + String content2 = exchange2.getResponseContent(); + assertEquals(body1, content2); + } + finally + { + httpClient.stop(); + } + } + @Test public void testProxyDown() throws Exception {