From 738cbfdccc1b4c07eea7dd954130af7da7c90401 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 27 Oct 2011 16:37:07 +1100 Subject: [PATCH] refactored client to use upgradeable endpoint. Instert SslConnection when needed --- .../jetty/client/AbstractHttpConnection.java | 4 +- .../eclipse/jetty/client/HttpDestination.java | 8 +- .../eclipse/jetty/client/SelectConnector.java | 277 +++++++++--------- .../client/AsyncSslHttpExchangeTest.java | 10 +- .../eclipse/jetty/io/AbstractConnection.java | 4 +- .../org/eclipse/jetty/io/AsyncEndPoint.java | 7 + .../jetty/io/nio/SelectChannelEndPoint.java | 7 +- .../eclipse/jetty/io/nio/SelectorManager.java | 3 +- .../eclipse/jetty/io/nio/SslConnection.java | 26 +- .../io/nio/SelectChannelEndPointTest.java | 9 +- .../jetty/server/AsyncHttpConnection.java | 2 +- .../eclipse/jetty/server/LocalConnector.java | 3 +- .../jetty/server/bio/SocketConnector.java | 2 +- .../jetty/server/handler/ConnectHandler.java | 14 +- .../NetworkTrafficSelectChannelConnector.java | 1 + .../server/nio/SelectChannelConnector.java | 6 +- .../server/BusySelectChannelServerTest.java | 6 +- .../websocket/WebSocketClientFactory.java | 15 +- 18 files changed, 213 insertions(+), 191 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java index 0acd2b3d337..2f65309c54d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java @@ -13,7 +13,6 @@ package org.eclipse.jetty.client; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.util.Collections; @@ -354,7 +353,8 @@ public abstract class AbstractHttpConnection extends AbstractConnection implemen @Override public String toString() { - return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort(); + return "HttpConnection@" + hashCode() + "//" + + (_destination==null?"?.?.?.?:??":(_destination.getAddress().getHost() + ":" + _destination.getAddress().getPort())); } public String toDetailString() diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 310854026af..63f8390d128 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -352,9 +352,9 @@ public class HttpDestination implements Dumpable else { EndPoint endPoint = connection.getEndPoint(); - if (isProxied() && endPoint instanceof SelectConnector.ProxySelectChannelEndPoint) + if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint) { - SelectConnector.ProxySelectChannelEndPoint proxyEndPoint = (SelectConnector.ProxySelectChannelEndPoint)endPoint; + SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint; HttpExchange exchange = _queue.get(0); ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange); connect.setAddress(getProxy()); @@ -668,10 +668,10 @@ public class HttpDestination implements Dumpable private class ConnectExchange extends ContentExchange { - private final SelectConnector.ProxySelectChannelEndPoint proxyEndPoint; + private final SelectConnector.UpgradableEndPoint proxyEndPoint; private final HttpExchange exchange; - public ConnectExchange(Address serverAddress, SelectConnector.ProxySelectChannelEndPoint proxyEndPoint, HttpExchange exchange) + public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint, HttpExchange exchange) { this.proxyEndPoint = proxyEndPoint; this.exchange = exchange; diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java index 92558d4e1eb..c9aff5a10f1 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java @@ -24,23 +24,23 @@ import java.util.concurrent.ConcurrentHashMap; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLSession; -import org.eclipse.jetty.http.HttpGenerator; -import org.eclipse.jetty.http.HttpParser; +import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.Buffers.Type; import org.eclipse.jetty.io.BuffersFactory; import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; -import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.nio.AsyncConnection; import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.io.nio.SelectorManager; +import org.eclipse.jetty.io.nio.SslConnection; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.Timeout; +import org.eclipse.jetty.util.thread.Timeout.Task; class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector { @@ -49,8 +49,7 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector private final HttpClient _httpClient; private final Manager _selectorManager=new Manager(); private final Map _connectingChannels = new ConcurrentHashMap(); - private Buffers _sslBuffers; - + /** * @param httpClient the HttpClient this connector is associated to */ @@ -65,16 +64,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector { super.doStart(); - - final boolean direct=_httpClient.getUseDirectBuffers(); - - SSLEngine sslEngine=_selectorManager.newSslEngine(null); - final SSLSession ssl_session=sslEngine.getSession(); - _sslBuffers = BuffersFactory.newBuffers( - direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(), - direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(), - direct?Type.DIRECT:Type.INDIRECT,1024); - _selectorManager.start(); } @@ -129,6 +118,8 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector /* ------------------------------------------------------------ */ class Manager extends SelectorManager { + Logger LOG = SelectConnector.LOG; + @Override public boolean dispatch(Runnable task) { @@ -151,11 +142,8 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector } @Override - protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint) + public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) { - if (endpoint instanceof SslSelectChannelEndPoint) - return new AsyncHttpConnection(_sslBuffers,_sslBuffers,endpoint); - return new AsyncHttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint); } @@ -172,32 +160,29 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector // key should have destination at this point (will be replaced by endpoint after this call) HttpDestination dest=(HttpDestination)key.attachment(); - SelectChannelEndPoint ep=null; + AsyncEndPoint ep=null; + SelectChannelEndPoint scep= new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout()); + ep = scep; + if (dest.isSecure()) { - if (dest.isProxied()) - { - SSLEngine engine=newSslEngine(channel); - ep = new ProxySelectChannelEndPoint(channel, selectSet, key, _sslBuffers, engine, (int)_httpClient.getIdleTimeout()); - } - else - { - SSLEngine engine=newSslEngine(channel); - SslSelectChannelEndPoint sslEp = new SslSelectChannelEndPoint(_sslBuffers, channel, selectSet, key, engine, (int)_httpClient.getIdleTimeout()); - sslEp.setAllowRenegotiate(_httpClient.getSslContextFactory().isAllowRenegotiate()); - ep = sslEp; - } + LOG.debug("secure to {}, proxied={}",channel,dest.isProxied()); + ep = new UpgradableEndPoint(ep,newSslEngine(channel)); } - else - { - ep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout()); - } - - AbstractHttpConnection connection=(AbstractHttpConnection)ep.getConnection(); - connection.setDestination(dest); - dest.onNewConnection(connection); - return ep; + + AsyncConnection connection = selectSet.getManager().newConnection(channel,ep, key.attachment()); + ep.setConnection(connection); + + AbstractHttpConnection httpConnection=(AbstractHttpConnection)connection; + httpConnection.setDestination(dest); + + if (dest.isSecure() && !dest.isProxied()) + ((UpgradableEndPoint)ep).upgrade(); + + dest.onNewConnection(httpConnection); + + return scep; } private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException @@ -268,204 +253,206 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector } } } - - /** - * An endpoint that is able to "upgrade" from a normal endpoint to a SSL endpoint. - * Since {@link HttpParser} and {@link HttpGenerator} only depend on the {@link EndPoint} - * interface, this class overrides all methods of {@link EndPoint} to provide the right - * behavior depending on the fact that it has been upgraded or not. - */ - public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint + + public static class UpgradableEndPoint implements AsyncEndPoint { - private final SelectChannelEndPoint plainEndPoint; - private volatile boolean upgraded = false; - - public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine, int maxIdleTimeout) throws IOException + AsyncEndPoint _endp; + SSLEngine _engine; + + public UpgradableEndPoint(AsyncEndPoint endp, SSLEngine engine) throws IOException { - super(sslBuffers, channel, selectSet, key, engine, maxIdleTimeout); - this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTimeout); + _engine=engine; + _endp=endp; } public void upgrade() { - upgraded = true; + AsyncHttpConnection connection = (AsyncHttpConnection) ((SelectChannelEndPoint)_endp).getConnection(); + + SslConnection sslConnection = new SslConnection(_engine,_endp); + ((SelectChannelEndPoint)_endp).setConnection(sslConnection); + + _endp=sslConnection.getSslEndPoint(); + sslConnection.setConnection(connection); + + LOG.debug("upgrade {} to {} for {}",this,sslConnection,connection); + } + + + public Connection getConnection() + { + return _endp.getConnection(); + } + + public void setConnection(Connection connection) + { + _endp.setConnection(connection); } public void shutdownOutput() throws IOException { - if (upgraded) - super.shutdownOutput(); - else - plainEndPoint.shutdownOutput(); + _endp.shutdownOutput(); + } + + public void asyncDispatch() + { + _endp.asyncDispatch(); + } + + public boolean isOutputShutdown() + { + return _endp.isOutputShutdown(); + } + + public void shutdownInput() throws IOException + { + _endp.shutdownInput(); + } + + public void scheduleWrite() + { + _endp.scheduleWrite(); + } + + public boolean isInputShutdown() + { + return _endp.isInputShutdown(); } public void close() throws IOException { - if (upgraded) - super.close(); - else - plainEndPoint.close(); + _endp.close(); + } + + public void scheduleIdle() + { + _endp.scheduleIdle(); } public int fill(Buffer buffer) throws IOException { - if (upgraded) - return super.fill(buffer); - else - return plainEndPoint.fill(buffer); + return _endp.fill(buffer); + } + + public void cancelIdle() + { + _endp.cancelIdle(); + } + + public boolean isWritable() + { + return _endp.isWritable(); + } + + public boolean hasProgressed() + { + return _endp.hasProgressed(); } public int flush(Buffer buffer) throws IOException { - if (upgraded) - return super.flush(buffer); - else - return plainEndPoint.flush(buffer); + return _endp.flush(buffer); + } + + public void scheduleTimeout(Task task, long timeoutMs) + { + _endp.scheduleTimeout(task,timeoutMs); + } + + public void cancelTimeout(Task task) + { + _endp.cancelTimeout(task); } public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException { - if (upgraded) - return super.flush(header, buffer, trailer); - else - return plainEndPoint.flush(header, buffer, trailer); + return _endp.flush(header,buffer,trailer); } public String getLocalAddr() { - if (upgraded) - return super.getLocalAddr(); - else - return plainEndPoint.getLocalAddr(); + return _endp.getLocalAddr(); } public String getLocalHost() { - if (upgraded) - return super.getLocalHost(); - else - return plainEndPoint.getLocalHost(); + return _endp.getLocalHost(); } public int getLocalPort() { - if (upgraded) - return super.getLocalPort(); - else - return plainEndPoint.getLocalPort(); + return _endp.getLocalPort(); } public String getRemoteAddr() { - if (upgraded) - return super.getRemoteAddr(); - else - return plainEndPoint.getRemoteAddr(); + return _endp.getRemoteAddr(); } public String getRemoteHost() { - if (upgraded) - return super.getRemoteHost(); - else - return plainEndPoint.getRemoteHost(); + return _endp.getRemoteHost(); } public int getRemotePort() { - if (upgraded) - return super.getRemotePort(); - else - return plainEndPoint.getRemotePort(); + return _endp.getRemotePort(); } public boolean isBlocking() { - if (upgraded) - return super.isBlocking(); - else - return plainEndPoint.isBlocking(); + return _endp.isBlocking(); } public boolean isBufferred() { - if (upgraded) - return super.isBufferred(); - else - return plainEndPoint.isBufferred(); + return _endp.isBufferred(); } public boolean blockReadable(long millisecs) throws IOException { - if (upgraded) - return super.blockReadable(millisecs); - else - return plainEndPoint.blockReadable(millisecs); + return _endp.blockReadable(millisecs); } public boolean blockWritable(long millisecs) throws IOException { - if (upgraded) - return super.blockWritable(millisecs); - else - return plainEndPoint.blockWritable(millisecs); + return _endp.blockWritable(millisecs); } public boolean isOpen() { - if (upgraded) - return super.isOpen(); - else - return plainEndPoint.isOpen(); + return _endp.isOpen(); } public Object getTransport() { - if (upgraded) - return super.getTransport(); - else - return plainEndPoint.getTransport(); + return _endp.getTransport(); } public boolean isBufferingInput() { - if (upgraded) - return super.isBufferingInput(); - else - return plainEndPoint.isBufferingInput(); + return _endp.isBufferingInput(); } public boolean isBufferingOutput() { - if (upgraded) - return super.isBufferingOutput(); - else - return plainEndPoint.isBufferingOutput(); + return _endp.isBufferingOutput(); } public void flush() throws IOException { - if (upgraded) - super.flush(); - else - plainEndPoint.flush(); - + _endp.flush(); } public int getMaxIdleTime() { - if (upgraded) - return super.getMaxIdleTime(); - else - return plainEndPoint.getMaxIdleTime(); + return _endp.getMaxIdleTime(); } public void setMaxIdleTime(int timeMs) throws IOException { - if (upgraded) - super.setMaxIdleTime(timeMs); - else - plainEndPoint.setMaxIdleTime(timeMs); + _endp.setMaxIdleTime(timeMs); } + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/AsyncSslHttpExchangeTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/AsyncSslHttpExchangeTest.java index ce739426f35..661879d79d9 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/AsyncSslHttpExchangeTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/AsyncSslHttpExchangeTest.java @@ -30,11 +30,11 @@ public class AsyncSslHttpExchangeTest extends SslHttpExchangeTest _port = _server.getConnectors()[0].getLocalPort(); } - @Override - public void testPerf() throws Exception - { - sender(10,true); - } + @Override + public void testGetWithContentExchange() throws Exception + { + super.testGetWithContentExchange(); + } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index 1062b7df487..088102b9b65 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -11,7 +11,7 @@ public abstract class AbstractConnection implements Connection private static final Logger LOG = Log.getLogger(AbstractConnection.class); private final long _timeStamp; - public final EndPoint _endp; // TODO make private + protected final EndPoint _endp; public AbstractConnection(EndPoint endp) { @@ -59,6 +59,6 @@ public abstract class AbstractConnection implements Connection public String toString() { - return super.toString()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort(); + return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort(); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java index 0910d0fe349..1571ed9dafa 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java @@ -51,6 +51,13 @@ public interface AsyncEndPoint extends EndPoint public boolean hasProgressed(); + /* ------------------------------------------------------------ */ + public Connection getConnection(); + + /* ------------------------------------------------------------ */ + public void setConnection(Connection connection); + + /* ------------------------------------------------------------ */ /** */ diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java index ef6a7f56cdf..da2fc0a21f2 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @@ -94,10 +94,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo _open=true; _key = key; - _connection = _manager.newConnection(channel,this); - scheduleIdle(); - } /* ------------------------------------------------------------ */ @@ -124,10 +121,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo /* ------------------------------------------------------------ */ public void setConnection(Connection connection) { - // TODO Only needed for local connection Connection old=_connection; _connection=(AsyncConnection)connection; - _manager.endPointUpgraded(this,old); + if (old!=null && old!=_connection) + _manager.endPointUpgraded(this,old); } /* ------------------------------------------------------------ */ diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java index baef1ee1327..cffc20ae791 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; @@ -337,7 +338,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection); /* ------------------------------------------------------------------------------- */ - protected abstract AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint); + public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); /* ------------------------------------------------------------ */ /** diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java index 499b8218eaa..f6b5e508de5 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java @@ -59,12 +59,14 @@ public class SslConnection extends AbstractConnection implements AsyncConnection private boolean _allowRenegotiate=true; private boolean _handshook; - + + /* ------------------------------------------------------------ */ public SslConnection(SSLEngine engine,EndPoint endp) { this(engine,endp,System.currentTimeMillis()); } - + + /* ------------------------------------------------------------ */ public SslConnection(SSLEngine engine,EndPoint endp, long timeStamp) { super(endp,timeStamp); @@ -72,12 +74,14 @@ public class SslConnection extends AbstractConnection implements AsyncConnection _session=_engine.getSession(); _aEndp=(AsyncEndPoint)endp; } - + + /* ------------------------------------------------------------ */ public synchronized void setConnection(AsyncConnection connection) { _connection=connection; } - + + /* ------------------------------------------------------------ */ public synchronized AsyncConnection getConnection() { return _connection; @@ -478,8 +482,8 @@ public class SslConnection extends AbstractConnection implements AsyncConnection throw new IOException(result.toString()); } - if (LOG.isDebugEnabled() && result.bytesProduced()>0) - LOG.debug("{} unwrapped '{}'",_session,buffer); + //if (LOG.isDebugEnabled() && result.bytesProduced()>0) + // LOG.debug("{} unwrapped '{}'",_session,buffer); return result.bytesConsumed()>0 || result.bytesProduced()>0; } @@ -721,6 +725,16 @@ public class SslConnection extends AbstractConnection implements AsyncConnection _aEndp.setMaxIdleTime(timeMs); } + public Connection getConnection() + { + return _connection; + } + + public void setConnection(Connection connection) + { + _connection=(AsyncConnection)connection; + } + } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointTest.java index 6bb97fbbcb9..9c0319071f9 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointTest.java @@ -11,6 +11,7 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.thread.QueuedThreadPool; @@ -48,15 +49,17 @@ public class SelectChannelEndPointTest } @Override - protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint) + public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) { return SelectChannelEndPointTest.this.newConnection(channel,endpoint); } @Override - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException + protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException { - return new SelectChannelEndPoint(channel,selectSet,sKey,2000); + SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000); + endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); + return endp; } }; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncHttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncHttpConnection.java index 20aacec2957..e6e5608c37c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncHttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncHttpConnection.java @@ -26,7 +26,7 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async super(connector,endpoint,server); _asyncEndp=(AsyncEndPoint)endpoint; } - + public Connection handle() throws IOException { Connection connection = this; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index 4840351ba89..149b1bfbc19 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -110,7 +110,8 @@ public class LocalConnector extends AbstractConnector @Override public void setConnection(Connection connection) { - connectionUpgraded(getConnection(),connection); + if (getConnection()!=null && connection!=getConnection()) + connectionUpgraded(getConnection(),connection); super.setConnection(connection); } }; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/bio/SocketConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/bio/SocketConnector.java index 6695a85807c..5066606567a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/bio/SocketConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/bio/SocketConnector.java @@ -193,7 +193,7 @@ public class SocketConnector extends AbstractConnector public void setConnection(Connection connection) { - if (_connection!=connection) + if (_connection!=connection && _connection!=null) connectionUpgraded(_connection,connection); _connection=connection; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java index 92bbcf46cbd..e6144f81465 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java @@ -17,6 +17,7 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpMethods; import org.eclipse.jetty.http.HttpParser; +import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; @@ -422,17 +423,18 @@ public class ConnectHandler extends HandlerWrapper private class Manager extends SelectorManager { @Override - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException + protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException { - SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, channel.socket().getSoTimeout()); + SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, key, channel.socket().getSoTimeout()); + endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); endp.setMaxIdleTime(_writeTimeout); return endp; } @Override - protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint) + public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) { - ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment(); + ProxyToServerConnection proxyToServer = (ProxyToServerConnection)attachment; proxyToServer.setTimeStamp(System.currentTimeMillis()); proxyToServer.setEndPoint(endpoint); return proxyToServer; @@ -472,7 +474,7 @@ public class ConnectHandler extends HandlerWrapper private volatile Buffer _data; private volatile ClientToProxyConnection _toClient; private volatile long _timestamp; - private volatile SelectChannelEndPoint _endPoint; + private volatile AsyncEndPoint _endPoint; public ProxyToServerConnection(ConcurrentMap context, Buffer data) { @@ -589,7 +591,7 @@ public class ConnectHandler extends HandlerWrapper _timestamp = timestamp; } - public void setEndPoint(SelectChannelEndPoint endpoint) + public void setEndPoint(AsyncEndPoint endpoint) { _endPoint = endpoint; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java index efea044745b..072de4cf778 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java @@ -54,6 +54,7 @@ public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key) throws IOException { NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, _maxIdleTime, listeners); + endPoint.setConnection(selectSet.getManager().newConnection(channel,endPoint, key.attachment())); endPoint.notifyOpened(); return endPoint; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java index 887d773e929..bebe450a8ae 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java @@ -278,7 +278,9 @@ public class SelectChannelConnector extends AbstractNIOConnector /* ------------------------------------------------------------ */ protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException { - return new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime); + SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime); + endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); + return endp; } /* ------------------------------------------------------------------------------- */ @@ -353,7 +355,7 @@ public class SelectChannelConnector extends AbstractNIOConnector } @Override - protected AsyncConnection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint) + public AsyncConnection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment) { return SelectChannelConnector.this.newConnection(channel,endpoint); } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BusySelectChannelServerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BusySelectChannelServerTest.java index 1f6c6852a8f..b280387283d 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/BusySelectChannelServerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BusySelectChannelServerTest.java @@ -16,8 +16,10 @@ import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.View; +import org.eclipse.jetty.io.nio.AsyncConnection; import org.eclipse.jetty.io.nio.IndirectNIOBuffer; import org.eclipse.jetty.io.nio.NIOBuffer; import org.eclipse.jetty.io.nio.SelectChannelEndPoint; @@ -38,7 +40,7 @@ public class BusySelectChannelServerTest extends HttpServerTestBase @Override protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException { - return new SelectChannelEndPoint(channel,selectSet,key, _maxIdleTime) + SelectChannelEndPoint endp=new SelectChannelEndPoint(channel,selectSet,key, _maxIdleTime) { int write; int read; @@ -134,6 +136,8 @@ public class BusySelectChannelServerTest extends HttpServerTestBase return super.fill(buffer); } }; + endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); + return endp; } }; connector.setAcceptors(1); diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketClientFactory.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketClientFactory.java index 699d35f4cbe..1293b92c006 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketClientFactory.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketClientFactory.java @@ -10,6 +10,7 @@ import java.util.Random; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.ByteArrayBuffer; @@ -206,15 +207,17 @@ public class WebSocketClientFactory extends AggregateLifeCycle } @Override - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey sKey) throws IOException + protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException { - return new SelectChannelEndPoint(channel,selectSet,sKey,channel.socket().getSoTimeout()); + SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key,channel.socket().getSoTimeout()); + endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); + return endp; } @Override - protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint) + public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) { - WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture) endpoint.getSelectionKey().attachment(); + WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture) attachment; return new HandshakeConnection(endpoint,holder); } @@ -258,14 +261,14 @@ public class WebSocketClientFactory extends AggregateLifeCycle */ class HandshakeConnection extends AbstractConnection implements AsyncConnection { - private final SelectChannelEndPoint _endp; + private final AsyncEndPoint _endp; private final WebSocketClient.WebSocketFuture _future; private final String _key; private final HttpParser _parser; private String _accept; private String _error; - public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketClient.WebSocketFuture future) + public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future) { super(endpoint,System.currentTimeMillis()); _endp=endpoint;