From dfab993fcbd97deb523468bcd513bfecaacedac3 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 4 Apr 2012 13:47:46 +1000 Subject: [PATCH] jetty-9 SSL partially working --- .../eclipse/jetty/io/AbstractConnection.java | 70 --- .../io/AbstractSelectableConnection.java | 226 ++++++++ .../eclipse/jetty/io/ByteArrayEndPoint.java | 41 +- .../jetty/io/{nio => }/ChannelEndPoint.java | 70 +-- .../jetty/io/{nio => }/Connection.java | 27 +- .../java/org/eclipse/jetty/io/EndPoint.java | 24 - .../NetworkTrafficSelectChannelEndPoint.java | 3 +- .../io/{nio => }/SelectChannelEndPoint.java | 230 +++++--- .../jetty/io/SelectableConnection.java | 28 + .../eclipse/jetty/io/SelectableEndPoint.java | 38 ++ .../jetty/io/{nio => }/SelectorManager.java | 14 +- .../jetty/io/{nio => }/SslConnection.java | 548 +++++++++--------- .../jetty/io/{bio => }/StringEndPoint.java | 3 +- .../jetty/io/{nio => }/BufferUtilTest.java | 2 +- .../io/{nio => }/ChannelEndPointTest.java | 4 +- .../eclipse/jetty/io/{nio => }/NIOTest.java | 2 +- .../SelectChannelEndPointSslTest.java | 12 +- .../{nio => }/SelectChannelEndPointTest.java | 136 +++-- .../jetty/server/AbstractConnector.java | 6 +- .../eclipse/jetty/server/HttpConnection.java | 6 +- .../eclipse/jetty/server/LocalConnector.java | 2 +- .../jetty/server/handler/ConnectHandler.java | 7 +- .../NetworkTrafficSelectChannelConnector.java | 9 +- .../server/nio/SelectChannelConnector.java | 13 +- .../server/ssl/SslSelectChannelConnector.java | 8 +- .../jetty/server/ConnectorTimeoutTest.java | 6 +- .../server/SelectChannelAsyncContextTest.java | 8 +- .../org/eclipse/jetty/util/BufferUtil.java | 14 +- 28 files changed, 906 insertions(+), 651 deletions(-) delete mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/AbstractSelectableConnection.java rename jetty-io/src/main/java/org/eclipse/jetty/io/{nio => }/ChannelEndPoint.java (90%) rename jetty-io/src/main/java/org/eclipse/jetty/io/{nio => }/Connection.java (63%) rename jetty-io/src/main/java/org/eclipse/jetty/io/{nio => }/NetworkTrafficSelectChannelEndPoint.java (97%) rename jetty-io/src/main/java/org/eclipse/jetty/io/{nio => }/SelectChannelEndPoint.java (66%) create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/SelectableEndPoint.java rename jetty-io/src/main/java/org/eclipse/jetty/io/{nio => }/SelectorManager.java (98%) rename jetty-io/src/main/java/org/eclipse/jetty/io/{nio => }/SslConnection.java (58%) rename jetty-io/src/main/java/org/eclipse/jetty/io/{bio => }/StringEndPoint.java (96%) rename jetty-io/src/test/java/org/eclipse/jetty/io/{nio => }/BufferUtilTest.java (97%) rename jetty-io/src/test/java/org/eclipse/jetty/io/{nio => }/ChannelEndPointTest.java (93%) rename jetty-io/src/test/java/org/eclipse/jetty/io/{nio => }/NIOTest.java (99%) rename jetty-io/src/test/java/org/eclipse/jetty/io/{nio => }/SelectChannelEndPointSslTest.java (93%) rename jetty-io/src/test/java/org/eclipse/jetty/io/{nio => }/SelectChannelEndPointTest.java (81%) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java deleted file mode 100644 index 336d78a0033..00000000000 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ /dev/null @@ -1,70 +0,0 @@ -package org.eclipse.jetty.io; - -import java.io.IOException; - -import org.eclipse.jetty.io.nio.Connection; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; - - -public abstract class AbstractConnection implements Connection -{ - private static final Logger LOG = Log.getLogger(AbstractConnection.class); - - private final long _timeStamp; - protected final EndPoint _endp; - - public AbstractConnection(EndPoint endp) - { - _endp=endp; - _timeStamp = System.currentTimeMillis(); - } - - public AbstractConnection(EndPoint endp,long timestamp) - { - _endp=endp; - _timeStamp = timestamp; - } - - - @Override - public EndPoint getEndPoint() - { - return _endp; - } - - public long getTimeStamp() - { - return _timeStamp; - } - - public void onIdleExpired(long idleForMs) - { - try - { - LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp); - if (_endp.isInputShutdown() || _endp.isOutputShutdown()) - _endp.close(); - else - _endp.shutdownOutput(); - } - catch(IOException e) - { - LOG.ignore(e); - - try - { - _endp.close(); - } - catch(IOException e2) - { - LOG.ignore(e2); - } - } - } - - public String toString() - { - return String.format("%s@%x", getClass().getSimpleName(), hashCode()); - } -} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractSelectableConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractSelectableConnection.java new file mode 100644 index 00000000000..4dc1a189f6f --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractSelectableConnection.java @@ -0,0 +1,226 @@ +package org.eclipse.jetty.io; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + + +public abstract class AbstractSelectableConnection implements SelectableConnection +{ + private static final Logger LOG = Log.getLogger(AbstractSelectableConnection.class); + + protected final SelectableEndPoint _endp; + private final long _createdTimeStamp; + private final Lock _lock=new ReentrantLock(); + private final Condition _readable=_lock.newCondition(); + private final Condition _writeable=_lock.newCondition(); + private boolean _readBlocked; + private boolean _writeBlocked; + + private final Runnable _reader=new Runnable() + { + @Override + public void run() + { + try + { + doRead(); + } + catch(Throwable th) + { + LOG.warn(th); + } + } + }; + private final Runnable _writer=new Runnable() + { + @Override + public void run() + { + try + { + doWrite(); + } + catch(Throwable th) + { + LOG.warn(th); + } + } + }; + + private volatile int _maxIdleTime=-1; + + public AbstractSelectableConnection(SelectableEndPoint endp) + { + _endp=endp; + _createdTimeStamp = System.currentTimeMillis(); + } + + @Override + public EndPoint getEndPoint() + { + return _endp; + } + + @Override + public SelectableEndPoint getSelectableEndPoint() + { + return _endp; + } + + @Override + public long getCreatedTimeStamp() + { + return _createdTimeStamp; + } + + @Override + public Runnable onReadable() + { + _lock.lock(); + try + { + if (_readBlocked) + _readable.signalAll(); + else + return _reader; + } + finally + { + _lock.unlock(); + } + return null; + } + + @Override + public Runnable onWriteable() + { + _lock.lock(); + try + { + if (_writeBlocked) + _writeable.signalAll(); + else + return _writer; + } + finally + { + _lock.unlock(); + } + return null; + } + + @Override + public boolean blockReadable() + { + _lock.lock(); + boolean readable=false; + try + { + if (_readBlocked) + throw new IllegalStateException(); + _readBlocked=true; + _endp.setReadInterested(true); + readable=_readable.await(getMaxIdleTime(),TimeUnit.SECONDS); + } + catch(InterruptedException e) + { + LOG.ignore(e); + } + finally + { + if (!readable) + _endp.setReadInterested(false); + _readBlocked=false; + _lock.unlock(); + } + return readable; + } + + @Override + public boolean blockWriteable() + { + _lock.lock(); + boolean writeable=false; + try + { + if (_writeBlocked) + throw new IllegalStateException(); + _writeBlocked=true; + _endp.setWriteInterested(true); + writeable=_writeable.await(getMaxIdleTime(),TimeUnit.SECONDS); + } + catch(InterruptedException e) + { + LOG.ignore(e); + } + finally + { + if (!writeable) + _endp.setWriteInterested(false); + _writeBlocked=false; + _lock.unlock(); + } + return writeable; + } + + @Override + public void onIdleExpired(long idleForMs) + { + try + { + LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp); + if (_endp.isInputShutdown() || _endp.isOutputShutdown()) + _endp.close(); + else + _endp.shutdownOutput(); + } + catch(IOException e) + { + LOG.ignore(e); + + try + { + _endp.close(); + } + catch(IOException e2) + { + LOG.ignore(e2); + } + } + } + + protected void doRead() + { + throw new IllegalStateException(); + } + + protected void doWrite() + { + throw new IllegalStateException(); + } + + + @Override + public int getMaxIdleTime() + { + int max=_maxIdleTime; + return (max==-1)?_endp.getMaxIdleTime():max; + } + + public void setMaxIdleTime(int max) + { + _maxIdleTime=max; + } + + @Override + public String toString() + { + return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index a45cc47a97f..5147668895f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -17,7 +17,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import org.eclipse.jetty.io.nio.Connection; import org.eclipse.jetty.util.BufferUtil; @@ -36,7 +35,6 @@ public class ByteArrayEndPoint implements EndPoint protected boolean _growOutput; protected int _maxIdleTime; protected Connection _connection; - private boolean _idleCheck; /* ------------------------------------------------------------ */ /** @@ -97,6 +95,7 @@ public class ByteArrayEndPoint implements EndPoint /* * @see org.eclipse.io.EndPoint#isOpen() */ + @Override public boolean isOpen() { return !_closed; @@ -106,6 +105,7 @@ public class ByteArrayEndPoint implements EndPoint /* * @see org.eclipse.jetty.io.EndPoint#isInputShutdown() */ + @Override public boolean isInputShutdown() { return _closed; @@ -115,6 +115,7 @@ public class ByteArrayEndPoint implements EndPoint /* * @see org.eclipse.jetty.io.EndPoint#isOutputShutdown() */ + @Override public boolean isOutputShutdown() { return _closed; @@ -124,6 +125,7 @@ public class ByteArrayEndPoint implements EndPoint /* * @see org.eclipse.io.EndPoint#shutdownOutput() */ + @Override public void shutdownOutput() throws IOException { close(); @@ -133,6 +135,7 @@ public class ByteArrayEndPoint implements EndPoint /* * @see org.eclipse.io.EndPoint#shutdownInput() */ + @Override public void shutdownInput() throws IOException { close(); @@ -142,6 +145,7 @@ public class ByteArrayEndPoint implements EndPoint /* * @see org.eclipse.io.EndPoint#close() */ + @Override public void close() throws IOException { _closed=true; @@ -151,6 +155,7 @@ public class ByteArrayEndPoint implements EndPoint /* * @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer) */ + @Override public int fill(ByteBuffer buffer) throws IOException { if (_closed) @@ -165,6 +170,7 @@ public class ByteArrayEndPoint implements EndPoint /* * @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer) */ + @Override public int flush(ByteBuffer... buffers) throws IOException { if (_closed) @@ -230,6 +236,7 @@ public class ByteArrayEndPoint implements EndPoint /* * @see org.eclipse.io.EndPoint#getConnection() */ + @Override public Object getTransport() { return _inBytes; @@ -257,6 +264,7 @@ public class ByteArrayEndPoint implements EndPoint /** * @see org.eclipse.jetty.io.EndPoint#getMaxIdleTime() */ + @Override public int getMaxIdleTime() { return _maxIdleTime; @@ -266,39 +274,12 @@ public class ByteArrayEndPoint implements EndPoint /** * @see org.eclipse.jetty.io.EndPoint#setMaxIdleTime(int) */ + @Override public void setMaxIdleTime(int timeMs) throws IOException { _maxIdleTime=timeMs; } - @Override - public Connection getConnection() - { - return _connection; - } - - @Override - public void setConnection(Connection connection) - { - _connection=connection; - } - - @Override - public void onIdleExpired(long idleForMs) - { - } - - @Override - public void setCheckForIdle(boolean check) - { - _idleCheck=check; - } - - @Override - public boolean isCheckForIdle() - { - return _idleCheck; - } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java similarity index 90% rename from jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java rename to jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index 61bb3c59d61..c15c94758e3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -11,7 +11,7 @@ // You may elect to redistribute this code under either of these licenses. // ======================================================================== -package org.eclipse.jetty.io.nio; +package org.eclipse.jetty.io; import java.io.IOException; import java.net.InetSocketAddress; @@ -23,7 +23,7 @@ import java.nio.channels.GatheringByteChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SocketChannel; -import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -43,9 +43,6 @@ public class ChannelEndPoint implements EndPoint private volatile int _maxIdleTime; private volatile boolean _ishut; private volatile boolean _oshut; - private Connection _connection; - private boolean _idleCheck; - public ChannelEndPoint(ByteChannel channel) throws IOException { @@ -90,6 +87,7 @@ public class ChannelEndPoint implements EndPoint /* * @see org.eclipse.io.EndPoint#isOpen() */ + @Override public boolean isOpen() { return _channel.isOpen(); @@ -133,6 +131,7 @@ public class ChannelEndPoint implements EndPoint /* (non-Javadoc) * @see org.eclipse.io.EndPoint#close() */ + @Override public void shutdownInput() throws IOException { shutdownChannelInput(); @@ -172,16 +171,19 @@ public class ChannelEndPoint implements EndPoint /* (non-Javadoc) * @see org.eclipse.io.EndPoint#close() */ + @Override public void shutdownOutput() throws IOException { shutdownChannelOutput(); } + @Override public boolean isOutputShutdown() { return _oshut || !_channel.isOpen() || _socket != null && _socket.isOutputShutdown(); } + @Override public boolean isInputShutdown() { return _ishut || !_channel.isOpen() || _socket != null && _socket.isInputShutdown(); @@ -190,6 +192,7 @@ public class ChannelEndPoint implements EndPoint /* (non-Javadoc) * @see org.eclipse.io.EndPoint#close() */ + @Override public void close() throws IOException { LOG.debug("close {}",this); @@ -199,17 +202,15 @@ public class ChannelEndPoint implements EndPoint /* (non-Javadoc) * @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer) */ + @Override public int fill(ByteBuffer buffer) throws IOException { if (_ishut) return -1; - int pos=buffer.position(); + int pos=BufferUtil.flipToFill(buffer); try { - buffer.position(buffer.limit()); - buffer.limit(buffer.capacity()); - int filled = _channel.read(buffer); if (filled==-1) @@ -217,23 +218,29 @@ public class ChannelEndPoint implements EndPoint return filled; } + catch(IOException e) + { + LOG.debug(e); + shutdownInput(); + return -1; + } finally { - buffer.limit(buffer.position()); - buffer.position(pos); + BufferUtil.flipToFlush(buffer,pos); } } /* (non-Javadoc) * @see org.eclipse.io.EndPoint#flush(org.eclipse.io.Buffer, org.eclipse.io.Buffer, org.eclipse.io.Buffer) */ + @Override public int flush(ByteBuffer... buffers) throws IOException { int len=0; - if (_channel instanceof GatheringByteChannel) - { - len= (int)((GatheringByteChannel)_channel).write(buffers,0,2); - } + if (buffers.length==1) + len=_channel.write(buffers[0]); + else if (buffers.length>1 && _channel instanceof GatheringByteChannel) + len= (int)((GatheringByteChannel)_channel).write(buffers,0,buffers.length); else { for (ByteBuffer b : buffers) @@ -275,6 +282,7 @@ public class ChannelEndPoint implements EndPoint } /* ------------------------------------------------------------ */ + @Override public Object getTransport() { return _channel; @@ -287,6 +295,7 @@ public class ChannelEndPoint implements EndPoint } /* ------------------------------------------------------------ */ + @Override public int getMaxIdleTime() { return _maxIdleTime; @@ -296,6 +305,7 @@ public class ChannelEndPoint implements EndPoint /** * @see org.eclipse.jetty.io.bio.StreamEndPoint#setMaxIdleTime(int) */ + @Override public void setMaxIdleTime(int timeMs) throws IOException { //if (_socket!=null && timeMs!=_maxIdleTime) @@ -303,34 +313,4 @@ public class ChannelEndPoint implements EndPoint _maxIdleTime=timeMs; } - - @Override - public Connection getConnection() - { - return _connection; - } - - @Override - public void setConnection(Connection connection) - { - _connection=connection; - } - - @Override - public void onIdleExpired(long idleForMs) - { - } - - @Override - public void setCheckForIdle(boolean check) - { - _idleCheck=check; - } - - @Override - public boolean isCheckForIdle() - { - return _idleCheck; - } - } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/Connection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java similarity index 63% rename from jetty-io/src/main/java/org/eclipse/jetty/io/nio/Connection.java rename to jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java index 8b3c7e871c4..c4de0328ed0 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/Connection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java @@ -11,42 +11,21 @@ // You may elect to redistribute this code under either of these licenses. // ======================================================================== -package org.eclipse.jetty.io.nio; +package org.eclipse.jetty.io; -import java.io.IOException; -import org.eclipse.jetty.io.EndPoint; public interface Connection { - EndPoint getEndPoint(); - void canRead(); - void canWrite(); - - boolean isReadInterested(); - boolean isWriteInterested(); - - - void onInputShutdown() throws IOException; - - /** - * Called when the connection is closed - */ - void onClose(); - - /** - * Called when the connection idle timeout expires - * @param idleForMs TODO - */ - void onIdleExpired(long idleForMs); + int getMaxIdleTime(); /** * @return the timestamp at which the connection was created */ - long getTimeStamp(); + long getCreatedTimeStamp(); boolean isIdle(); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index f075a5f7016..b1019fdfb70 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -17,7 +17,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import org.eclipse.jetty.io.nio.Connection; /** @@ -113,28 +112,5 @@ public interface EndPoint void setMaxIdleTime(int timeMs) throws IOException; - /* ------------------------------------------------------------ */ - Connection getConnection(); - /* ------------------------------------------------------------ */ - void setConnection(Connection connection); - - - /* ------------------------------------------------------------ */ - /** Callback when idle. - *

An endpoint is idle if there has been no IO activity for - * {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true. - * @param idleForMs TODO - */ - public void onIdleExpired(long idleForMs); - - /* ------------------------------------------------------------ */ - /** Set if the endpoint should be checked for idleness - */ - public void setCheckForIdle(boolean check); - - /* ------------------------------------------------------------ */ - /** Get if the endpoint should be checked for idleness - */ - public boolean isCheckForIdle(); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/NetworkTrafficSelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java similarity index 97% rename from jetty-io/src/main/java/org/eclipse/jetty/io/nio/NetworkTrafficSelectChannelEndPoint.java rename to jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java index 09c34cc661d..46b3a2a144e 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/NetworkTrafficSelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java @@ -11,7 +11,7 @@ // You may elect to redistribute this code under either of these licenses. // ======================================================================== -package org.eclipse.jetty.io.nio; +package org.eclipse.jetty.io; import java.io.IOException; import java.nio.ByteBuffer; @@ -19,7 +19,6 @@ import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.List; -import org.eclipse.jetty.io.NetworkTrafficListener; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java similarity index 66% rename from jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java rename to jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 8693a84f771..9dd159a0c71 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -11,17 +11,17 @@ // You may elect to redistribute this code under either of these licenses. // ======================================================================== -package org.eclipse.jetty.io.nio; +package org.eclipse.jetty.io; import java.io.IOException; -import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; -import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.nio.SelectorManager.SelectSet; +import org.eclipse.jetty.io.SelectorManager.SelectSet; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Timeout.Task; @@ -30,22 +30,29 @@ import org.eclipse.jetty.util.thread.Timeout.Task; /** * An Endpoint that can be scheduled by {@link SelectorManager}. */ -public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint +public class SelectChannelEndPoint extends ChannelEndPoint implements SelectableEndPoint { public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio"); + private final Lock _lock = new ReentrantLock(); private final SelectorManager.SelectSet _selectSet; private final SelectorManager _manager; private SelectionKey _key; + private boolean _selected; + private boolean _changing; + /** The desired value for {@link SelectionKey#interestOps()} */ private int _interestOps; + private boolean _ishutCalled; + /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ private boolean _open; - private volatile long _idleTimestamp; - private volatile Connection _connection; + private volatile boolean _idlecheck; + private volatile long _lastNotIdleTimestamp; + private volatile SelectableConnection _connection; /* ------------------------------------------------------------ */ public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) @@ -78,18 +85,19 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint /* ------------------------------------------------------------ */ - public void setConnection(Connection connection) + public void setSelectableConnection(SelectableConnection connection) { - Connection old=getConnection(); + Connection old=getSelectableConnection(); _connection=connection; if (old!=null && old!=connection) - _manager.endPointUpgraded(this,(Connection)old); + _manager.endPointUpgraded(this,old); } - + /* ------------------------------------------------------------ */ - public long getIdleTimestamp() + @Override + public long getLastNotIdleTimestamp() { - return _idleTimestamp; + return _lastNotIdleTimestamp; } /* ------------------------------------------------------------ */ @@ -98,9 +106,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint */ public void selected() { - final boolean can_read; - final boolean can_write; - synchronized (this) + _lock.lock(); + _selected=true; + try { // If there is no key, then do nothing if (_key == null || !_key.isValid()) @@ -108,19 +116,103 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint this.notifyAll(); return; } - - can_read=(_key.isReadable() && (_key.interestOps()|SelectionKey.OP_READ)!=0); - can_write=(_key.isWritable() && (_key.interestOps()|SelectionKey.OP_WRITE)!=0); + + boolean can_read=(_key.isReadable() && (_key.interestOps()|SelectionKey.OP_READ)!=0); + boolean can_write=(_key.isWritable() && (_key.interestOps()|SelectionKey.OP_WRITE)!=0); _interestOps=0; - _key.interestOps(0); + + if (can_read) + { + Runnable task=getSelectableConnection().onReadable(); + if (task!=null) + _manager.dispatch(task); + } + if (can_write) + { + Runnable task=getSelectableConnection().onWriteable(); + if (task!=null) + _manager.dispatch(task); + } + + if (isInputShutdown() && !_ishutCalled) + { + _ishutCalled=true; + getSelectableConnection().onInputShutdown(); + } + } + finally + { + doUpdateKey(); + _selected=false; + _lock.unlock(); } - - if (can_read) - getConnection().canRead(); - if (can_write) - getConnection().canWrite(); } + /* ------------------------------------------------------------ */ + @Override + public boolean isReadInterested() + { + _lock.lock(); + try + { + return (_interestOps&SelectionKey.OP_READ)!=0; + } + finally + { + _lock.unlock(); + } + } + + /* ------------------------------------------------------------ */ + @Override + public void setReadInterested(boolean interested) + { + _lock.lock(); + try + { + _interestOps=interested?(_interestOps|SelectionKey.OP_READ):(_interestOps&~SelectionKey.OP_READ); + if (!_selected) + updateKey(); + } + finally + { + _lock.unlock(); + } + } + + /* ------------------------------------------------------------ */ + @Override + public boolean isWriteInterested() + { + _lock.lock(); + try + { + return (_interestOps&SelectionKey.OP_READ)!=0; + } + finally + { + _lock.unlock(); + } + } + + /* ------------------------------------------------------------ */ + @Override + public void setWriteInterested(boolean interested) + { + _lock.lock(); + try + { + _interestOps=interested?(_interestOps|SelectionKey.OP_WRITE):(_interestOps&~SelectionKey.OP_WRITE); + if (!_selected) + updateKey(); + } + finally + { + _lock.unlock(); + } + } + + /* ------------------------------------------------------------ */ public void cancelTimeout(Task task) { @@ -134,46 +226,51 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint } /* ------------------------------------------------------------ */ + @Override public void setCheckForIdle(boolean check) { - _idleTimestamp=check?System.currentTimeMillis():0; + _idlecheck=true; } /* ------------------------------------------------------------ */ + @Override public boolean isCheckForIdle() { - return _idleTimestamp!=0; + return _idlecheck; } /* ------------------------------------------------------------ */ protected void notIdle() { - if (_idleTimestamp!=0) - _idleTimestamp=System.currentTimeMillis(); + _lastNotIdleTimestamp=System.currentTimeMillis(); } /* ------------------------------------------------------------ */ - public void checkIdleTimestamp(long now) + public void checkForIdle(long now) { - long idleTimestamp=_idleTimestamp; - long max_idle_time=getMaxIdleTime(); - - if (idleTimestamp!=0 && max_idle_time>0) + if (_idlecheck) { - long idleForMs=now-idleTimestamp; + long idleTimestamp=_lastNotIdleTimestamp; + long max_idle_time=getMaxIdleTime(); - if (idleForMs>max_idle_time) + if (idleTimestamp!=0 && max_idle_time>0) { - onIdleExpired(idleForMs); - _idleTimestamp=now; + long idleForMs=now-idleTimestamp; + + if (idleForMs>max_idle_time) + { + onIdleExpired(idleForMs); + _lastNotIdleTimestamp=now; + } } } } /* ------------------------------------------------------------ */ + @Override public void onIdleExpired(long idleForMs) { - getConnection().onIdleExpired(idleForMs); + getSelectableConnection().onIdleExpired(idleForMs); } /* ------------------------------------------------------------ */ @@ -199,38 +296,26 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint /* ------------------------------------------------------------ */ /** - * Updates selection key. Adds operations types to the selection key as needed. No operations - * are removed as this is only done during dispatch. This method records the new key and - * schedules a call to doUpdateKey to do the keyChange + * Updates selection key. This method schedules a call to doUpdateKey to do the keyChange */ - public void updateKey() + private void updateKey() { - final boolean changed; - synchronized (this) + int current_ops=-1; + if (getChannel().isOpen()) { - int current_ops=-1; - if (getChannel().isOpen()) + try { - Socket socket = getSocket(); - boolean read_interest = getConnection().isReadInterested() && !socket.isInputShutdown(); - boolean write_interest= getConnection().isWriteInterested() && !socket.isOutputShutdown(); - - _interestOps = (read_interest?SelectionKey.OP_READ:0)|(write_interest?SelectionKey.OP_WRITE:0); - try - { - current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); - } - catch(Exception e) - { - _key=null; - LOG.ignore(e); - } + current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); + } + catch(Exception e) + { + _key=null; + LOG.ignore(e); } - changed=_interestOps!=current_ops; } - - if(changed) + if (_interestOps!=current_ops && !_changing) { + _changing=true; _selectSet.addChange(this); _selectSet.wakeup(); } @@ -243,8 +328,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint */ void doUpdateKey() { - synchronized (this) + _lock.lock(); + try { + _changing=false; if (getChannel().isOpen()) { if (_interestOps>0) @@ -305,6 +392,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint _key = null; } } + finally + { + _lock.unlock(); + } } @@ -328,7 +419,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint updateKey(); } } - + /* ------------------------------------------------------------ */ @Override public String toString() @@ -367,7 +458,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint isOutputShutdown(), _interestOps, keyString, - getConnection()); + getSelectableConnection()); } /* ------------------------------------------------------------ */ @@ -375,12 +466,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements EndPoint { return _selectSet; } - + /* ------------------------------------------------------------ */ @Override - public Connection getConnection() + public SelectableConnection getSelectableConnection() { return _connection; } + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java new file mode 100644 index 00000000000..26362565db2 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java @@ -0,0 +1,28 @@ +package org.eclipse.jetty.io; + +public interface SelectableConnection extends Connection +{ + SelectableEndPoint getSelectableEndPoint(); + + Runnable onReadable(); + Runnable onWriteable(); + + public boolean blockReadable(); + + public boolean blockWriteable(); + + /** + * Called when the connection idle timeout expires + * @param idleForMs TODO + */ + void onIdleExpired(long idleForMs); + + void onInputShutdown(); + + /** + * Called when the connection is closed + */ + void onClose(); + + +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableEndPoint.java new file mode 100644 index 00000000000..fcdc3cb85e1 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableEndPoint.java @@ -0,0 +1,38 @@ +package org.eclipse.jetty.io; + +public interface SelectableEndPoint extends EndPoint +{ + public abstract void setWriteInterested(boolean interested); + + public abstract boolean isWriteInterested(); + + public abstract void setReadInterested(boolean interested); + + public abstract boolean isReadInterested(); + + /* ------------------------------------------------------------ */ + SelectableConnection getSelectableConnection(); + + /* ------------------------------------------------------------ */ + /** Callback when idle. + *

An endpoint is idle if there has been no IO activity for + * {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true. + * @param idleForMs TODO + */ + public void onIdleExpired(long idleForMs); + + /* ------------------------------------------------------------ */ + /** Set if the endpoint should be checked for idleness + */ + public void setCheckForIdle(boolean check); + + /* ------------------------------------------------------------ */ + /** Get if the endpoint should be checked for idleness + */ + public boolean isCheckForIdle(); + + public long getLastNotIdleTimestamp(); + + public void checkForIdle(long now); + +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java similarity index 98% rename from jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java rename to jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index b15abb4a6ce..6523505d672 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -11,7 +11,7 @@ // You may elect to redistribute this code under either of these licenses. // ======================================================================== -package org.eclipse.jetty.io.nio; +package org.eclipse.jetty.io; import java.io.IOException; import java.nio.channels.CancelledKeyException; @@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle; @@ -338,10 +337,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa protected abstract void endPointOpened(SelectChannelEndPoint endpoint); /* ------------------------------------------------------------ */ - protected abstract void endPointUpgraded(EndPoint endpoint,Connection oldConnection); + protected abstract void endPointUpgraded(SelectChannelEndPoint endpoint,Connection oldConnection); /* ------------------------------------------------------------------------------- */ - public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment); + public abstract SelectableConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment); /* ------------------------------------------------------------ */ /** @@ -472,7 +471,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa key = channel.register(selector,SelectionKey.OP_READ,att); SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); key.attach(endpoint); - endpoint.selected(); } else if (channel.isOpen()) { @@ -487,7 +485,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa key = channel.register(selector,SelectionKey.OP_READ,null); SelectChannelEndPoint endpoint = createEndPoint(channel,key); key.attach(endpoint); - endpoint.selected(); } else if (change instanceof ChangeTask) { @@ -703,7 +700,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { for (SelectChannelEndPoint endp:_endPoints.keySet()) { - endp.checkIdleTimestamp(idle_now); + endp.checkForIdle(idle_now); } } public String toString() {return "Idle-"+super.toString();} @@ -842,6 +839,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { LOG.debug("destroyEndPoint {}",endp); _endPoints.remove(endp); + SelectableConnection connection=endp.getSelectableConnection(); + if (connection!=null) + connection.onClose(); endPointClosed(endp); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java similarity index 58% rename from jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java rename to jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java index d20d85240ed..61aadcb5ac1 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java @@ -11,11 +11,12 @@ // You may elect to redistribute this code under either of these licenses. // ======================================================================== -package org.eclipse.jetty.io.nio; +package org.eclipse.jetty.io; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLEngine; @@ -24,8 +25,6 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLException; import javax.net.ssl.SSLSession; -import org.eclipse.jetty.io.AbstractConnection; -import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -35,27 +34,27 @@ import org.eclipse.jetty.util.log.Logger; * An AysyncConnection that acts as an interceptor between and EndPoint and another * Connection, that implements TLS encryption using an {@link SSLEngine}. *

- * The connector uses an {@link AsyncEndPoint} (like {@link SelectChannelEndPoint}) as - * it's source/sink of encrypted data. It then provides {@link #getSslEndPoint()} to + * The connector uses an {@link EndPoint} (like {@link SelectChannelEndPoint}) as + * it's source/sink of encrypted data. It then provides {@link #getAppEndPoint()} to * expose a source/sink of unencrypted data to another connection (eg HttpConnection). */ -public class SslConnection extends AbstractConnection +public class SslConnection extends AbstractSelectableConnection { - private final Logger _logger = Log.getLogger("org.eclipse.jetty.io.nio.ssl"); + private static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.nio.ssl"); private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0); private static final ThreadLocal __buffers = new ThreadLocal(); private final SSLEngine _engine; private final SSLSession _session; - private Connection _connection; - private final SslEndPoint _sslEndPoint; + private SelectableConnection _appConnection; + private final AppEndPoint _appEndPoint; private int _allocations; private SslBuffers _buffers; - private ByteBuffer _inbound; - private ByteBuffer _unwrapBuf; - private ByteBuffer _outbound; - private AsyncEndPoint _aEndp; + private ByteBuffer _inNet; + private ByteBuffer _inApp; + private ByteBuffer _outNet; + private SelectableEndPoint _endp; private boolean _allowRenegotiate=true; private boolean _handshook; private boolean _ishut; @@ -67,44 +66,46 @@ public class SslConnection extends AbstractConnection */ private static class SslBuffers { - final ByteBuffer _in; - final ByteBuffer _out; - final ByteBuffer _unwrap; + final ByteBuffer _inNet; + final ByteBuffer _outNet; + final ByteBuffer _inApp; SslBuffers(int packetSize, int appSize) { - _in=BufferUtil.allocateDirect(packetSize); - _out=BufferUtil.allocateDirect(packetSize); - _unwrap=BufferUtil.allocate(appSize); + _inNet=BufferUtil.allocateDirect(packetSize); + _outNet=BufferUtil.allocateDirect(packetSize); + _inApp=BufferUtil.allocate(appSize); } + } - + /* ------------------------------------------------------------ */ - public SslConnection(SSLEngine engine,AsyncEndPoint endp) + public SslConnection(SSLEngine engine,SelectableEndPoint endp) { this(engine,endp,System.currentTimeMillis()); } /* ------------------------------------------------------------ */ - public SslConnection(SSLEngine engine,AsyncEndPoint endp, long timeStamp) + public SslConnection(SSLEngine engine,SelectableEndPoint endp, long timeStamp) { - super(endp,timeStamp); + super(endp); _engine=engine; _session=_engine.getSession(); - _aEndp=(AsyncEndPoint)endp; - _sslEndPoint = newSslEndPoint(); + _endp=endp; + _appEndPoint = newAppEndPoint(); } - /* ------------------------------------------------------------ */ - protected SslEndPoint newSslEndPoint() - { - return new SslEndPoint(); - } /* ------------------------------------------------------------ */ - public EndPoint getEndPoint() + public void setAppConnection(SelectableConnection connection) + { + _appConnection=connection; + } + + /* ------------------------------------------------------------ */ + protected AppEndPoint newAppEndPoint() { - return _aEndp; + return new AppEndPoint(); } /* ------------------------------------------------------------ */ @@ -143,10 +144,10 @@ public class SslConnection extends AbstractConnection { _buffers=__buffers.get(); if (_buffers==null) - _buffers=new SslBuffers(_session.getPacketBufferSize()*2,_session.getApplicationBufferSize()*2); - _inbound=_buffers._in; - _outbound=_buffers._out; - _unwrapBuf=_buffers._unwrap; + _buffers=new SslBuffers(_session.getPacketBufferSize(),_session.getApplicationBufferSize()); + _inNet=_buffers._inNet; + _outNet=_buffers._outNet; + _inApp=_buffers._inApp; __buffers.set(null); } } @@ -161,16 +162,16 @@ public class SslConnection extends AbstractConnection if (--_allocations==0) { if (_buffers!=null && - _inbound.remaining()==0 && - _outbound.remaining()==0 && - _unwrapBuf.remaining()==0) + _inNet.remaining()==0 && + _outNet.remaining()==0 && + _inApp.remaining()==0) { - _inbound=null; - _outbound=null; - _unwrapBuf=null; - _buffers._in.clear().limit(0); - _buffers._out.clear().limit(0); - _buffers._unwrap.clear().limit(0); + _inNet=null; + _outNet=null; + _inApp=null; + _buffers._inNet.clear().limit(0); + _buffers._outNet.clear().limit(0); + _buffers._inApp.clear().limit(0); __buffers.set(_buffers); _buffers=null; @@ -178,75 +179,18 @@ public class SslConnection extends AbstractConnection } } } - - /* ------------------------------------------------------------ */ - public void canRead() throws IOException - { - try - { - allocateBuffers(); - - boolean progress=true; - - while (progress) - { - progress=false; - - // If we are handshook let the delegate connection - if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING) - progress=process(null,null); - - // handle the delegate connection - _connection.canRead(); - - _logger.debug("{} handle {} progress={}", _session, this, progress); - } - } - finally - { - releaseBuffers(); - - if (!_ishut && _sslEndPoint.isInputShutdown() && _sslEndPoint.isOpen()) - { - _ishut=true; - try - { - _connection.onInputShutdown(); - } - catch(Throwable x) - { - _logger.warn("onInputShutdown failed", x); - try{_sslEndPoint.close();} - catch(IOException e2){ - _logger.ignore(e2);} - } - } - } - } - - /* ------------------------------------------------------------ */ - public void canWrite() throws IOException - { - // TODO - } - - /* ------------------------------------------------------------ */ + @Override public boolean isIdle() { - return _connection.isIdle(); - } - - /* ------------------------------------------------------------ */ - public boolean isReadInterested() - { - return _connection.isReadInterested(); + return _appConnection.isIdle(); } /* ------------------------------------------------------------ */ + @Override public void onClose() { - _connection.onClose(); + _appConnection.onClose(); } /* ------------------------------------------------------------ */ @@ -255,71 +199,98 @@ public class SslConnection extends AbstractConnection { try { - _logger.debug("onIdleExpired {}ms on {}",idleForMs,this); + LOG.debug("onIdleExpired {}ms on {}",idleForMs,this); if (_endp.isOutputShutdown()) - _sslEndPoint.close(); + _appEndPoint.close(); else - _sslEndPoint.shutdownOutput(); + _appEndPoint.shutdownOutput(); } catch (IOException e) { - _logger.warn(e); + LOG.warn(e); super.onIdleExpired(idleForMs); } } - /* ------------------------------------------------------------ */ - public void onInputShutdown() throws IOException - { + /* ------------------------------------------------------------ */ + @Override + public void doRead() + { + try + { + allocateBuffers(); + + boolean progress=true; + while(progress) + { + progress=false; + + // Fill the input buffer with everything available + if (!BufferUtil.isFull(_inNet)) + progress|=_endp.fill(_inNet)>0; + + progress|=process(null); + + if (BufferUtil.hasContent(_inApp) && _appEndPoint.isReadInterested()) + { + progress=true; + Runnable task =_appConnection.onReadable(); + if (task!=null) + task.run(); + } + } + } + catch(IOException e) + { + LOG.warn(e); + } + finally + { + releaseBuffers(); + _endp.setReadInterested(_appEndPoint.isReadInterested()); + _endp.setWriteInterested(BufferUtil.hasContent(_outNet)); + } + } + + /* ------------------------------------------------------------ */ + @Override + public void doWrite() + { + try + { + while (BufferUtil.hasContent(_outNet)) + { + int written = _endp.flush(_outNet); + + if (written>0 && _appEndPoint.isWriteInterested()) + { + Runnable task =_appConnection.onWriteable(); + if (task!=null) + task.run(); + } + } + } + catch(IOException e) + { + LOG.warn(e); + } + finally + { + if (BufferUtil.hasContent(_outNet)) + _endp.setWriteInterested(true); + } } /* ------------------------------------------------------------ */ - private synchronized boolean process(ByteBuffer toFill, ByteBuffer toFlush) throws IOException + private synchronized boolean process(ByteBuffer appOut) throws IOException { boolean some_progress=false; try { - // We need buffers to progress - allocateBuffers(); - - // if we don't have a buffer to put received data into - if (toFill==null) - { - // use the unwrapbuffer to hold received data. - _unwrapBuf.compact().flip(); - toFill=_unwrapBuf; - } - // Else if the fill buffer is too small for the SSL session - else if (toFill.capacity()<_session.getApplicationBufferSize()) - { - // fill to the temporary unwrapBuffer - boolean progress=process(null,toFlush); - - // if we received any data, - if (BufferUtil.hasContent(_unwrapBuf)) - { - // transfer from temp buffer to fill buffer - BufferUtil.flipPutFlip(_unwrapBuf,toFill); - return true; - } - else - // return progress from recursive call - return progress; - } - // Else if there is some temporary data - else if (BufferUtil.hasContent(_unwrapBuf)) - { - // transfer from temp buffer to fill buffer - BufferUtil.flipPutFlip(_unwrapBuf,toFill); - return true; - } - - // If we are here, we have a buffer ready into which we can put some read data. - // If we have no data to flush, flush the empty buffer - if (toFlush==null) - toFlush=__ZERO_BUFFER; + if (appOut==null) + appOut=__ZERO_BUFFER; // While we are making progress processing SSL engine boolean progress=true; @@ -327,34 +298,6 @@ public class SslConnection extends AbstractConnection { progress=false; - // Do any real IO - int filled=0,flushed=0; - try - { - // Read any available data - if (!BufferUtil.isFull(_inbound) && (filled=_endp.fill(_inbound))>0) - progress = true; - else - _inbound.compact().flip(); - - // flush any output data - if (BufferUtil.hasContent(_outbound) && (flushed=_endp.flush(_outbound))>0) - { - progress = true; - _outbound.compact().flip(); - } - - } - catch (IOException e) - { - _endp.close(); - throw e; - } - finally - { - _logger.debug("{} {} {} filled={}/{} flushed={}/{}",_session,this,_engine.getHandshakeStatus(),filled,_inbound.remaining(),flushed,_outbound.remaining()); - } - // handle the current hand share status switch(_engine.getHandshakeStatus()) { @@ -364,11 +307,11 @@ public class SslConnection extends AbstractConnection case NOT_HANDSHAKING: { // Try unwrapping some application data - if (!BufferUtil.isFull(toFill) && BufferUtil.hasContent(_inbound) && unwrap(toFill)) + if (!BufferUtil.isFull(_inApp) && BufferUtil.hasContent(_inNet) && unwrap()) progress=true; // Try wrapping some application data - if (BufferUtil.hasContent(toFlush) && !BufferUtil.isFull(_outbound) && wrap(toFlush)) + if (BufferUtil.hasContent(appOut) && !BufferUtil.isFull(_outNet) && wrap(appOut)) progress=true; } break; @@ -382,7 +325,6 @@ public class SslConnection extends AbstractConnection progress=true; task.run(); } - } break; @@ -391,7 +333,7 @@ public class SslConnection extends AbstractConnection // The SSL needs to send some handshake data to the other side if (_handshook && !_allowRenegotiate) _endp.close(); - else if (wrap(toFlush)) + else if (wrap(appOut)) progress=true; } break; @@ -401,55 +343,54 @@ public class SslConnection extends AbstractConnection // The SSL needs to receive some handshake data from the other side if (_handshook && !_allowRenegotiate) _endp.close(); - else if (BufferUtil.isEmpty(_inbound)&&filled==-1) - { - // No more input coming - _endp.shutdownInput(); - } - else if (unwrap(toFill)) + else if (BufferUtil.isEmpty(_inNet) && _endp.isInputShutdown()) + _endp.close(); + else if (unwrap()) progress=true; } break; } // pass on ishut/oshut state - if (_endp.isOpen() && _endp.isInputShutdown() && BufferUtil.isEmpty(_inbound)) + if (_endp.isOpen() && _endp.isInputShutdown() && BufferUtil.isEmpty(_inNet)) _engine.closeInbound(); - if (_endp.isOpen() && _engine.isOutboundDone() && BufferUtil.isEmpty(_outbound)) + if (_endp.isOpen() && _engine.isOutboundDone() && BufferUtil.isEmpty(_outNet)) _endp.shutdownOutput(); // remember if any progress has been made some_progress|=progress; } - - // If we are reading into the temp buffer and it has some content, then we should be dispatched. - if (toFill==_unwrapBuf && BufferUtil.hasContent(_unwrapBuf)) - _aEndp.asyncDispatch(); } finally { - releaseBuffers(); if (some_progress) _progressed.set(true); } return some_progress; } - private synchronized boolean wrap(final ByteBuffer buffer) throws IOException + private synchronized boolean wrap(final ByteBuffer outApp) throws IOException { final SSLEngineResult result; - _outbound.compact(); - result=_engine.wrap(buffer,_outbound); - if (_logger.isDebugEnabled()) - _logger.debug("{} wrap {} {} consumed={} produced={}", + int pos=BufferUtil.flipToFill(_outNet); + try + { + result=_engine.wrap(outApp,_outNet); + } + finally + { + BufferUtil.flipToFlush(_outNet,pos); + } + + if (LOG.isDebugEnabled()) + LOG.debug("{} wrap {} {} consumed={} produced={}", _session, result.getStatus(), result.getHandshakeStatus(), result.bytesConsumed(), result.bytesProduced()); - _outbound.flip(); switch(result.getStatus()) { @@ -465,58 +406,64 @@ public class SslConnection extends AbstractConnection break; case CLOSED: - _logger.debug("wrap CLOSE {} {}",this,result); + LOG.debug("wrap CLOSE {} {}",this,result); if (result.getHandshakeStatus()==HandshakeStatus.FINISHED) _endp.close(); break; default: - _logger.debug("{} wrap default {}",_session,result); + LOG.debug("{} wrap default {}",_session,result); throw new IOException(result.toString()); } - return result.bytesConsumed()>0 || result.bytesProduced()>0; + int flushed = _endp.flush(_outNet); + + return result.bytesConsumed()>0 || result.bytesProduced()>0 || flushed>0; } - private synchronized boolean unwrap(final ByteBuffer buffer) throws IOException + private synchronized boolean unwrap() throws IOException { - if (BufferUtil.isEmpty(_inbound)) + if (BufferUtil.isEmpty(_inNet)) return false; final SSLEngineResult result; + int pos = BufferUtil.flipToFill(_inApp); try { - buffer.compact(); - result=_engine.unwrap(_inbound,buffer); - buffer.flip(); - - if (_logger.isDebugEnabled()) - _logger.debug("{} unwrap {} {} consumed={} produced={}", - _session, - result.getStatus(), - result.getHandshakeStatus(), - result.bytesConsumed(), - result.bytesProduced()); - + result=_engine.unwrap(_inNet,_inApp); } catch(SSLException e) { - _logger.debug(String.valueOf(_endp), e); + LOG.debug(String.valueOf(_endp), e); _endp.close(); throw e; } - + finally + { + BufferUtil.flipToFlush(_inApp,pos); + } + + if (LOG.isDebugEnabled()) + LOG.debug("{} unwrap {} {} consumed={} produced={}", + _session, + result.getStatus(), + result.getHandshakeStatus(), + result.bytesConsumed(), + result.bytesProduced()); + switch(result.getStatus()) { case BUFFER_UNDERFLOW: - _inbound.compact().flip(); + // need to wait for more net data + _inNet.compact().flip(); if (_endp.isInputShutdown()) - _inbound.clear().limit(0); + _inNet.clear().limit(0); break; case BUFFER_OVERFLOW: - _logger.debug("{} unwrap {} {}->{}",_session,result.getStatus(),_inbound,buffer); + // need to wait until more app data has been consumed. + LOG.debug("{} unwrap {} {}->{}",_session,result.getStatus(),_inNet,_inApp); break; case OK: @@ -525,13 +472,13 @@ public class SslConnection extends AbstractConnection break; case CLOSED: - _logger.debug("unwrap CLOSE {} {}",this,result); + LOG.debug("unwrap CLOSE {} {}",this,result); if (result.getHandshakeStatus()==HandshakeStatus.FINISHED) _endp.close(); break; default: - _logger.debug("{} wrap default {}",_session,result); + LOG.debug("{} wrap default {}",_session,result); throw new IOException(result.toString()); } @@ -542,42 +489,54 @@ public class SslConnection extends AbstractConnection } /* ------------------------------------------------------------ */ - public AsyncEndPoint getSslEndPoint() + @Override + public void onInputShutdown() + { + } + + /* ------------------------------------------------------------ */ + public SelectableEndPoint getAppEndPoint() { - return _sslEndPoint; + return _appEndPoint; } /* ------------------------------------------------------------ */ + @Override public String toString() { - return String.format("%s %s", super.toString(), _sslEndPoint); + return String.format("%s %s", super.toString(), _appEndPoint); } /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ - public class SslEndPoint implements AsyncEndPoint + public class AppEndPoint implements SelectableEndPoint { + boolean _readInterested=true; + boolean _writeInterested; + public SSLEngine getSslEngine() { return _engine; } - public AsyncEndPoint getIoEndPoint() + public EndPoint getIoEndPoint() { - return _aEndp; + return _endp; } + @Override public void shutdownOutput() throws IOException { synchronized (SslConnection.this) { - _logger.debug("{} ssl endp.oshut {}",_session,this); + LOG.debug("{} ssl endp.oshut {}",_session,this); _engine.closeOutbound(); _oshut=true; } flush(); } + @Override public boolean isOutputShutdown() { synchronized (SslConnection.this) @@ -586,34 +545,44 @@ public class SslConnection extends AbstractConnection } } + @Override public void shutdownInput() throws IOException { - _logger.debug("{} ssl endp.ishut!",_session); + LOG.debug("{} ssl endp.ishut!",_session); // We do not do a closeInput here, as SSL does not support half close. // isInputShutdown works it out itself from buffer state and underlying endpoint state. } + @Override public boolean isInputShutdown() { synchronized (SslConnection.this) { return _endp.isInputShutdown() && - !(_unwrapBuf!=null&&BufferUtil.hasContent(_unwrapBuf)) && - !(_inbound!=null&&BufferUtil.hasContent(_inbound)); + !(_inApp!=null&&BufferUtil.hasContent(_inApp)) && + !(_inNet!=null&&BufferUtil.hasContent(_inNet)); } } + @Override public void close() throws IOException { - _logger.debug("{} ssl endp.close",_session); + LOG.debug("{} ssl endp.close",_session); _endp.close(); } + @Override public int fill(ByteBuffer buffer) throws IOException { int size=buffer.remaining(); - process(buffer, null); + synchronized (this) + { + if (!BufferUtil.hasContent(_inApp)) + process(null); + if (BufferUtil.hasContent(_inApp)) + BufferUtil.flipPutFlip(_inApp,buffer); + } int filled=buffer.remaining()-size; if (filled==0 && isInputShutdown()) @@ -621,31 +590,35 @@ public class SslConnection extends AbstractConnection return filled; } + @Override public int flush(ByteBuffer... buffers) throws IOException { int len=0; - for (ByteBuffer b : buffers) + bufloop: for (ByteBuffer b : buffers) { - if (b.hasRemaining()) + while (b.hasRemaining()) { int l = b.remaining(); - process(null, b); + if (!process(b)) + break bufloop; l=l-b.remaining(); if (l>0) len+=l; else - break; + break bufloop; } } return len; } + @Override public boolean isOpen() { return _endp.isOpen(); } + @Override public Object getTransport() { return _endp; @@ -653,67 +626,71 @@ public class SslConnection extends AbstractConnection public void flush() throws IOException { - process(null, null); + process(null); } + @Override public void onIdleExpired(long idleForMs) { - _aEndp.onIdleExpired(idleForMs); + _endp.onIdleExpired(idleForMs); } + @Override public void setCheckForIdle(boolean check) { - _aEndp.setCheckForIdle(check); + _endp.setCheckForIdle(check); } + @Override public boolean isCheckForIdle() { - return _aEndp.isCheckForIdle(); + return _endp.isCheckForIdle(); } + @Override public InetSocketAddress getLocalAddress() { - return _aEndp.getLocalAddress(); + return _endp.getLocalAddress(); } + @Override public InetSocketAddress getRemoteAddress() { - return _aEndp.getRemoteAddress(); - } - - public boolean isBlocking() - { - return false; + return _endp.getRemoteAddress(); } + @Override public int getMaxIdleTime() { - return _aEndp.getMaxIdleTime(); + return _endp.getMaxIdleTime(); } + @Override public void setMaxIdleTime(int timeMs) throws IOException { - _aEndp.setMaxIdleTime(timeMs); - } - - public Connection getConnection() - { - return _connection; + _endp.setMaxIdleTime(timeMs); } - public void setConnection(Connection connection) + @Override + public SelectableConnection getSelectableConnection() { - _connection=(Connection)connection; + return _appConnection; } + public void setSelectableConnection(SelectableConnection connection) + { + _appConnection=(AbstractSelectableConnection)connection; + } + + @Override public String toString() { // Do NOT use synchronized (SslConnection.this) // because it's very easy to deadlock when debugging is enabled. // We do a best effort to print the right toString() and that's it. - ByteBuffer inbound = _inbound; - ByteBuffer outbound = _outbound; - ByteBuffer unwrap = _unwrapBuf; + ByteBuffer inbound = _inNet; + ByteBuffer outbound = _outNet; + ByteBuffer unwrap = _inApp; int i = inbound == null? -1 : inbound.remaining(); int o = outbound == null ? -1 : outbound.remaining(); int u = unwrap == null ? -1 : unwrap.remaining(); @@ -721,7 +698,42 @@ public class SslConnection extends AbstractConnection _engine.getHandshakeStatus(), i, o, u, _ishut, _oshut, - _connection); + _appConnection); + } + + @Override + public void setWriteInterested(boolean interested) + { + _writeInterested=interested; + } + + @Override + public boolean isWriteInterested() + { + return _writeInterested; + } + + @Override + public void setReadInterested(boolean interested) + { + _readInterested=interested; + } + + @Override + public boolean isReadInterested() + { + return _readInterested; + } + + @Override + public long getLastNotIdleTimestamp() + { + return _endp.getLastNotIdleTimestamp(); + } + + @Override + public void checkForIdle(long now) + { } } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/bio/StringEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/StringEndPoint.java similarity index 96% rename from jetty-io/src/main/java/org/eclipse/jetty/io/bio/StringEndPoint.java rename to jetty-io/src/main/java/org/eclipse/jetty/io/StringEndPoint.java index ea5f4fc717d..cda32506825 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/bio/StringEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/StringEndPoint.java @@ -11,12 +11,11 @@ // You may elect to redistribute this code under either of these licenses. // ======================================================================== -package org.eclipse.jetty.io.bio; +package org.eclipse.jetty.io; import java.nio.ByteBuffer; import java.nio.charset.Charset; -import org.eclipse.jetty.io.ByteArrayEndPoint; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/BufferUtilTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java similarity index 97% rename from jetty-io/src/test/java/org/eclipse/jetty/io/nio/BufferUtilTest.java rename to jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java index 33062eec964..ae02dc36296 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/BufferUtilTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java @@ -1,4 +1,4 @@ -package org.eclipse.jetty.io.nio; +package org.eclipse.jetty.io; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/ChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ChannelEndPointTest.java similarity index 93% rename from jetty-io/src/test/java/org/eclipse/jetty/io/nio/ChannelEndPointTest.java rename to jetty-io/src/test/java/org/eclipse/jetty/io/ChannelEndPointTest.java index 1d8d5b2c7eb..6c849ca0a09 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/ChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ChannelEndPointTest.java @@ -1,9 +1,9 @@ -package org.eclipse.jetty.io.nio; +package org.eclipse.jetty.io; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import org.eclipse.jetty.io.EndPointTest; +import org.eclipse.jetty.io.ChannelEndPoint; import org.junit.AfterClass; import org.junit.BeforeClass; diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/NIOTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/NIOTest.java similarity index 99% rename from jetty-io/src/test/java/org/eclipse/jetty/io/nio/NIOTest.java rename to jetty-io/src/test/java/org/eclipse/jetty/io/NIOTest.java index 9366d8ce843..6ff25feb2aa 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/NIOTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/NIOTest.java @@ -11,7 +11,7 @@ // You may elect to redistribute this code under either of these licenses. // ======================================================================== -package org.eclipse.jetty.io.nio; +package org.eclipse.jetty.io; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertFalse; diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointSslTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java similarity index 93% rename from jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointSslTest.java rename to jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java index 66dbff1f51c..919f9f1e08e 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointSslTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java @@ -1,4 +1,4 @@ -package org.eclipse.jetty.io.nio; +package org.eclipse.jetty.io; import java.io.File; import java.io.IOException; @@ -11,6 +11,10 @@ import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLSocket; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.io.SslConnection; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.Assert; @@ -41,14 +45,14 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest } @Override - protected Connection newConnection(SocketChannel channel, AsyncEndPoint endpoint) + protected SelectableConnection newConnection(SocketChannel channel, SelectableEndPoint endpoint) { SSLEngine engine = __sslCtxFactory.newSslEngine(); engine.setUseClientMode(false); SslConnection connection = new SslConnection(engine,endpoint); - Connection delegate = super.newConnection(channel,connection.getSslEndPoint()); - connection.getSslEndPoint().setConnection(delegate); + SelectableConnection delegate = super.newConnection(channel,connection.getAppEndPoint()); + connection.setAppConnection(delegate); return connection; } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java similarity index 81% rename from jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointTest.java rename to jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 9b1f5a3bba0..a4672478860 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/nio/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -1,4 +1,4 @@ -package org.eclipse.jetty.io.nio; +package org.eclipse.jetty.io; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -20,8 +20,11 @@ import java.nio.channels.SocketChannel; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.AbstractSelectableConnection; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.SelectChannelEndPoint; +import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.thread.QueuedThreadPool; @@ -32,7 +35,7 @@ import org.junit.Test; public class SelectChannelEndPointTest { - protected SelectChannelEndPoint _lastEndp; + protected SelectableEndPoint _lastEndp; protected ServerSocketChannel _connector; protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected SelectorManager _manager = new SelectorManager() @@ -54,12 +57,12 @@ public class SelectChannelEndPointTest } @Override - protected void endPointUpgraded(EndPoint endpoint, Connection oldConnection) + protected void endPointUpgraded(SelectChannelEndPoint endpoint, Connection oldConnection) { } @Override - public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) + public SelectableConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment) { return SelectChannelEndPointTest.this.newConnection(channel,endpoint); } @@ -68,7 +71,8 @@ public class SelectChannelEndPointTest protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException { SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000); - endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); + endp.setSelectableConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); + endp.setReadInterested(true); _lastEndp=endp; return endp; } @@ -99,88 +103,94 @@ public class SelectChannelEndPointTest return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort()); } - protected Connection newConnection(SocketChannel channel, EndPoint endpoint) + protected SelectableConnection newConnection(SocketChannel channel, SelectableEndPoint endpoint) { return new TestConnection(endpoint); } - public class TestConnection extends AbstractConnection implements Connection + public class TestConnection extends AbstractSelectableConnection { ByteBuffer _in = BufferUtil.allocate(32*1024); ByteBuffer _out = BufferUtil.allocate(32*1024); - public TestConnection(EndPoint endp) + public TestConnection(SelectableEndPoint endp) { super(endp); } - public void canRead() + @Override + public void doRead() { - boolean progress=true; - while(progress) + try { - progress=false; - _in.compact().flip(); - if (!BufferUtil.isFull(_in) && _endp.fill(_in)>0) + boolean progress=true; + while(progress) { - progress=true; - } - - - while (_blockAt>0 && _in.remaining()>0 && _in.remaining()<_blockAt) - { - // ((AsyncEndPoint)_endp).blockReadable(10000); - if (!BufferUtil.isFull(_in) && _endp.fill(_in)>0) + progress=false; + + // Fill the input buffer with everything available + if (!BufferUtil.isFull(_in)) + progress|=_endp.fill(_in)>0; + // If the tests wants to block, then block + while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt && blockReadable()) + progress|=_endp.fill(_in)>0; + + // Copy to the out buffer + if (BufferUtil.hasContent(_in) && BufferUtil.flipPutFlip(_in,_out)>0) progress=true; + + // Try non blocking write + if (BufferUtil.hasContent(_out) && _endp.flush(_out)>0) + + // Try blocking write + while (!_endp.isOutputShutdown() && BufferUtil.hasContent(_out)) + { + blockWriteable(); + if (_endp.flush(_out)>0) + progress=true; + } } - - if (BufferUtil.hasContent(_in) && BufferUtil.flipPutFlip(_in,_out)>0) - progress=true; - - if (BufferUtil.hasContent(_out) && _endp.flush(_out)>0) - progress=true; - - _out.compact().flip(); - - if (BufferUtil.isEmpty(_out) && _endp.isInputShutdown()) - _endp.shutdownOutput(); + } + catch(IOException e) + { + e.printStackTrace(); + } + finally + { + _endp.setReadInterested(true); } } - - public void canWrite() + + + @Override + public void onInputShutdown() { - + try + { + if (BufferUtil.isEmpty(_out)) + _endp.shutdownOutput(); + } + catch(IOException e) + { + e.printStackTrace(); + } } - + + @Override + public void onClose() + { + } + + @Override public boolean isIdle() { return false; } - - @Override - public boolean isReadInterested() - { - return true; - } - - @Override - public EndPoint getEndPoint() - { - return _endp; - } - - public void onClose() - { - // System.err.println("onClose"); - } - - public void onInputShutdown() throws IOException - { - // System.err.println("onInputShutdown"); - } - + + } + @Test public void testEcho() throws Exception { @@ -310,7 +320,7 @@ public class SelectChannelEndPointTest int specifiedTimeout = 400; client.setSoTimeout(specifiedTimeout); - // Write 8 and cause block for 10 + // Write 8 and cause block waiting for 10 _blockAt=10; clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.flush(); @@ -327,7 +337,7 @@ public class SelectChannelEndPointTest catch(SocketTimeoutException e) { int elapsed = Long.valueOf(System.currentTimeMillis() - start).intValue(); - System.err.println("blocked for " + elapsed+ "ms"); + // System.err.println("blocked for " + elapsed+ "ms"); Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3*specifiedTimeout/4)); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index e0d528221e5..b33ba39a3b7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -28,9 +28,9 @@ import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.Buffers.Type; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; -import org.eclipse.jetty.io.nio.Connection; import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; @@ -1142,12 +1142,10 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Ht /* ------------------------------------------------------------ */ protected void connectionClosed(Connection connection) { - connection.onClose(); - if (_statsStartedAt.get() == -1) return; - long duration = System.currentTimeMillis() - connection.getTimeStamp(); + long duration = System.currentTimeMillis() - connection.getCreatedTimeStamp(); int requests = (connection instanceof HttpConnection)?((HttpConnection)connection).getHttpChannel().getRequests():0; _requestStats.set(requests); _connectionStats.decrement(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 220d83b0d5b..54b011aef79 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -27,17 +27,17 @@ import org.eclipse.jetty.http.HttpGenerator; import org.eclipse.jetty.http.HttpGenerator.Action; import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.AbstractSelectableConnection; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; -import org.eclipse.jetty.io.nio.Connection; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; /** */ -public abstract class HttpConnection extends AbstractConnection +public abstract class HttpConnection extends AbstractSelectableConnection { private static final Logger LOG = Log.getLogger(HttpConnection.class); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index 6b795adc127..c85b347dd12 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -20,7 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.io.ByteArrayEndPoint; -import org.eclipse.jetty.io.nio.Connection; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java index fec8221972d..c9c25ab7561 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java @@ -20,8 +20,9 @@ import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.nio.SelectChannelEndPoint; -import org.eclipse.jetty.io.nio.SelectorManager; +import org.eclipse.jetty.io.SelectChannelEndPoint; +import org.eclipse.jetty.io.SelectableEndPoint; +import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; @@ -449,7 +450,7 @@ public class ConnectHandler extends HandlerWrapper } @Override - protected void endPointClosed(SelectChannelEndPoint endpoint) + protected void endPointClosed(SelectableEndPoint endpoint) { } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java index 072de4cf778..8129a9ddda2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java @@ -21,9 +21,10 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import org.eclipse.jetty.io.NetworkTrafficListener; -import org.eclipse.jetty.io.nio.NetworkTrafficSelectChannelEndPoint; -import org.eclipse.jetty.io.nio.SelectChannelEndPoint; -import org.eclipse.jetty.io.nio.SelectorManager; +import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint; +import org.eclipse.jetty.io.SelectChannelEndPoint; +import org.eclipse.jetty.io.SelectableEndPoint; +import org.eclipse.jetty.io.SelectorManager; /** *

A specialized version of {@link SelectChannelConnector} that supports {@link NetworkTrafficListener}s.

@@ -60,7 +61,7 @@ public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector } @Override - protected void endPointClosed(SelectChannelEndPoint endpoint) + protected void endPointClosed(SelectableEndPoint endpoint) { super.endPointClosed(endpoint); ((NetworkTrafficSelectChannelEndPoint)endpoint).notifyClosed(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java index 9c620c54fc5..2dda923bae4 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java @@ -21,11 +21,12 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import org.eclipse.jetty.continuation.Continuation; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.nio.Connection; -import org.eclipse.jetty.io.nio.SelectChannelEndPoint; -import org.eclipse.jetty.io.nio.SelectorManager; -import org.eclipse.jetty.io.nio.SelectorManager.SelectSet; +import org.eclipse.jetty.io.SelectChannelEndPoint; +import org.eclipse.jetty.io.SelectableEndPoint; +import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.io.SelectorManager.SelectSet; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.util.thread.ThreadPool; @@ -255,7 +256,7 @@ public class SelectChannelConnector extends AbstractNIOConnector } /* ------------------------------------------------------------------------------- */ - protected void endPointClosed(SelectChannelEndPoint endpoint) + protected void endPointClosed(SelectableEndPoint endpoint) { connectionClosed(endpoint.getConnection()); } @@ -282,7 +283,7 @@ public class SelectChannelConnector extends AbstractNIOConnector } @Override - protected void endPointClosed(final SelectChannelEndPoint endpoint) + protected void endPointClosed(final SelectableEndPoint endpoint) { SelectChannelConnector.this.endPointClosed(endpoint); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSelectChannelConnector.java index 3029fb9b6a2..5fc717b3c36 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSelectChannelConnector.java @@ -28,7 +28,7 @@ import org.eclipse.jetty.io.Buffers.Type; import org.eclipse.jetty.io.BuffersFactory; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.RuntimeIOException; -import org.eclipse.jetty.io.nio.SslConnection; +import org.eclipse.jetty.io.SslConnection; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.util.component.AggregateLifeCycle; @@ -98,7 +98,7 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements request.setScheme(HttpScheme.HTTPS); super.customize(endpoint,request); - SslConnection.SslEndPoint sslEndpoint=(SslConnection.SslEndPoint)endpoint; + SslConnection.AppEndPoint sslEndpoint=(SslConnection.AppEndPoint)endpoint; SSLEngine sslEngine=sslEndpoint.getSslEngine(); SSLSession sslSession=sslEngine.getSession(); @@ -548,8 +548,8 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements { SSLEngine engine = createSSLEngine(channel); SslConnection connection = newSslConnection(endpoint, engine); - Connection delegate = newPlainConnection(channel, connection.getSslEndPoint()); - connection.getSslEndPoint().setConnection(delegate); + Connection delegate = newPlainConnection(channel, connection.getAppEndPoint()); + connection.getAppEndPoint().setConnection(delegate); connection.setAllowRenegotiate(_sslContextFactory.isAllowRenegotiate()); return connection; } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java index cf0b18df910..e85c20a474f 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java @@ -31,7 +31,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.nio.SslConnection; +import org.eclipse.jetty.io.SslConnection; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.IO; import org.junit.Assert; @@ -149,8 +149,8 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture // Get the server side endpoint EndPoint endp = endpoint.exchange(null,10,TimeUnit.SECONDS); - if (endp instanceof SslConnection.SslEndPoint) - endp=((SslConnection.SslEndPoint)endp).getEndpoint(); + if (endp instanceof SslConnection.AppEndPoint) + endp=((SslConnection.AppEndPoint)endp).getEndpoint(); // read the response String result=IO.toString(is); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelAsyncContextTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelAsyncContextTest.java index 3bd3cf3920e..091a2c325b6 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelAsyncContextTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelAsyncContextTest.java @@ -6,14 +6,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.nio.SelectChannelEndPoint; +import org.eclipse.jetty.io.SelectableEndPoint; import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.util.IO; import org.junit.Test; public class SelectChannelAsyncContextTest extends LocalAsyncContextTest { - volatile SelectChannelEndPoint _endp; + volatile SelectableEndPoint _endp; @Override protected Connector initConnector() @@ -24,7 +24,7 @@ public class SelectChannelAsyncContextTest extends LocalAsyncContextTest public void customize(EndPoint endpoint, Request request) throws IOException { super.customize(endpoint,request); - _endp=(SelectChannelEndPoint)endpoint; + _endp=(SelectableEndPoint)endpoint; } }; @@ -54,7 +54,7 @@ public class SelectChannelAsyncContextTest extends LocalAsyncContextTest try { TimeUnit.MILLISECONDS.sleep(200); - SelectChannelEndPoint endp=_endp; + SelectableEndPoint endp=_endp; if (endp!=null && endp.isOpen()) endp.asyncDispatch(); } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java index 89ff23650db..c6aedaaacc2 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java @@ -268,7 +268,7 @@ public class BufferUtil /** * Put data from one buffer into another, avoiding over/under flows * @param from Buffer to take bytes from in flush mode - * @param to Buffer to put bytes to in flush mode. The buffer is flipped before and after the put. + * @param to Buffer to put bytes to in flush mode. The buffer is flipToFill before the put and flipToFlush after. * @return number of bytes moved */ public static int flipPutFlip(ByteBuffer from, ByteBuffer to) @@ -649,12 +649,12 @@ public class BufferUtil for (int i=0;i=' ') + if (c>=' ' && c<=127) buf.append(c); else if (c=='\r'||c=='\n') buf.append('|'); else - buf.append('?'); + buf.append('\ufffd'); if (i==16&&buffer.position()>32) { buf.append("..."); @@ -665,12 +665,12 @@ public class BufferUtil for (int i=buffer.position();i=' ') + if (c>=' ' && c<=127) buf.append(c); else if (c=='\r'||c=='\n') buf.append('|'); else - buf.append('?'); + buf.append('\ufffd'); if (i==buffer.position()+16&&buffer.limit()>buffer.position()+32) { buf.append("..."); @@ -683,12 +683,12 @@ public class BufferUtil for (int i=limit;i=' ') + if (c>=' ' && c<=127) buf.append(c); else if (c=='\r'||c=='\n') buf.append('|'); else - buf.append('?'); + buf.append('\ufffd'); if (i==limit+16&&buffer.capacity()>limit+32) { buf.append("...");