diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractSelectableConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractSelectableConnection.java deleted file mode 100644 index 4dc1a189f6f..00000000000 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractSelectableConnection.java +++ /dev/null @@ -1,226 +0,0 @@ -package org.eclipse.jetty.io; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; - - -public abstract class AbstractSelectableConnection implements SelectableConnection -{ - private static final Logger LOG = Log.getLogger(AbstractSelectableConnection.class); - - protected final SelectableEndPoint _endp; - private final long _createdTimeStamp; - private final Lock _lock=new ReentrantLock(); - private final Condition _readable=_lock.newCondition(); - private final Condition _writeable=_lock.newCondition(); - private boolean _readBlocked; - private boolean _writeBlocked; - - private final Runnable _reader=new Runnable() - { - @Override - public void run() - { - try - { - doRead(); - } - catch(Throwable th) - { - LOG.warn(th); - } - } - }; - private final Runnable _writer=new Runnable() - { - @Override - public void run() - { - try - { - doWrite(); - } - catch(Throwable th) - { - LOG.warn(th); - } - } - }; - - private volatile int _maxIdleTime=-1; - - public AbstractSelectableConnection(SelectableEndPoint endp) - { - _endp=endp; - _createdTimeStamp = System.currentTimeMillis(); - } - - @Override - public EndPoint getEndPoint() - { - return _endp; - } - - @Override - public SelectableEndPoint getSelectableEndPoint() - { - return _endp; - } - - @Override - public long getCreatedTimeStamp() - { - return _createdTimeStamp; - } - - @Override - public Runnable onReadable() - { - _lock.lock(); - try - { - if (_readBlocked) - _readable.signalAll(); - else - return _reader; - } - finally - { - _lock.unlock(); - } - return null; - } - - @Override - public Runnable onWriteable() - { - _lock.lock(); - try - { - if (_writeBlocked) - _writeable.signalAll(); - else - return _writer; - } - finally - { - _lock.unlock(); - } - return null; - } - - @Override - public boolean blockReadable() - { - _lock.lock(); - boolean readable=false; - try - { - if (_readBlocked) - throw new IllegalStateException(); - _readBlocked=true; - _endp.setReadInterested(true); - readable=_readable.await(getMaxIdleTime(),TimeUnit.SECONDS); - } - catch(InterruptedException e) - { - LOG.ignore(e); - } - finally - { - if (!readable) - _endp.setReadInterested(false); - _readBlocked=false; - _lock.unlock(); - } - return readable; - } - - @Override - public boolean blockWriteable() - { - _lock.lock(); - boolean writeable=false; - try - { - if (_writeBlocked) - throw new IllegalStateException(); - _writeBlocked=true; - _endp.setWriteInterested(true); - writeable=_writeable.await(getMaxIdleTime(),TimeUnit.SECONDS); - } - catch(InterruptedException e) - { - LOG.ignore(e); - } - finally - { - if (!writeable) - _endp.setWriteInterested(false); - _writeBlocked=false; - _lock.unlock(); - } - return writeable; - } - - @Override - public void onIdleExpired(long idleForMs) - { - try - { - LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp); - if (_endp.isInputShutdown() || _endp.isOutputShutdown()) - _endp.close(); - else - _endp.shutdownOutput(); - } - catch(IOException e) - { - LOG.ignore(e); - - try - { - _endp.close(); - } - catch(IOException e2) - { - LOG.ignore(e2); - } - } - } - - protected void doRead() - { - throw new IllegalStateException(); - } - - protected void doWrite() - { - throw new IllegalStateException(); - } - - - @Override - public int getMaxIdleTime() - { - int max=_maxIdleTime; - return (max==-1)?_endp.getMaxIdleTime():max; - } - - public void setMaxIdleTime(int max) - { - _maxIdleTime=max; - } - - @Override - public String toString() - { - return String.format("%s@%x", getClass().getSimpleName(), hashCode()); - } -} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 9dd159a0c71..86d832cdea8 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -104,7 +104,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Selectable /** Called by selectSet to schedule handling * */ - public void selected() + public void selected() throws IOException { _lock.lock(); _selected=true; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java index 26362565db2..c1b1a5ed895 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableConnection.java @@ -1,28 +1,227 @@ package org.eclipse.jetty.io; -public interface SelectableConnection extends Connection +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + + +public abstract class SelectableConnection implements Connection { - SelectableEndPoint getSelectableEndPoint(); + private static final Logger LOG = Log.getLogger(SelectableConnection.class); - Runnable onReadable(); - Runnable onWriteable(); - - public boolean blockReadable(); - - public boolean blockWriteable(); + protected final SelectableEndPoint _endp; + private final long _createdTimeStamp; + private final Lock _lock=new ReentrantLock(); + private final Condition _readable=_lock.newCondition(); + private final Condition _writeable=_lock.newCondition(); + private boolean _readBlocked; + private boolean _writeBlocked; - /** - * Called when the connection idle timeout expires - * @param idleForMs TODO - */ - void onIdleExpired(long idleForMs); + private final Runnable _reader=new Runnable() + { + @Override + public void run() + { + try + { + doRead(); + } + catch(Throwable th) + { + LOG.warn(th); + } + } + }; + private final Runnable _writer=new Runnable() + { + @Override + public void run() + { + try + { + doWrite(); + } + catch(Throwable th) + { + LOG.warn(th); + } + } + }; - void onInputShutdown(); + private volatile int _maxIdleTime=-1; + + public SelectableConnection(SelectableEndPoint endp) + { + _endp=endp; + _createdTimeStamp = System.currentTimeMillis(); + } - /** - * Called when the connection is closed - */ - void onClose(); + @Override + public EndPoint getEndPoint() + { + return _endp; + } + public SelectableEndPoint getSelectableEndPoint() + { + return _endp; + } + + @Override + public long getCreatedTimeStamp() + { + return _createdTimeStamp; + } + public Runnable onReadable() + { + _lock.lock(); + try + { + if (_readBlocked) + _readable.signalAll(); + else + return _reader; + } + finally + { + _lock.unlock(); + } + return null; + } + + public Runnable onWriteable() + { + _lock.lock(); + try + { + if (_writeBlocked) + _writeable.signalAll(); + else + return _writer; + } + finally + { + _lock.unlock(); + } + return null; + } + + public boolean blockReadable() + { + _lock.lock(); + boolean readable=false; + try + { + if (_readBlocked) + throw new IllegalStateException(); + _readBlocked=true; + _endp.setReadInterested(true); + readable=_readable.await(getMaxIdleTime(),TimeUnit.SECONDS); + } + catch(InterruptedException e) + { + LOG.ignore(e); + } + finally + { + if (!readable) + _endp.setReadInterested(false); + _readBlocked=false; + _lock.unlock(); + } + return readable; + } + + public boolean blockWriteable() + { + _lock.lock(); + boolean writeable=false; + try + { + if (_writeBlocked) + throw new IllegalStateException(); + _writeBlocked=true; + _endp.setWriteInterested(true); + writeable=_writeable.await(getMaxIdleTime(),TimeUnit.SECONDS); + } + catch(InterruptedException e) + { + LOG.ignore(e); + } + finally + { + if (!writeable) + _endp.setWriteInterested(false); + _writeBlocked=false; + _lock.unlock(); + } + return writeable; + } + + public void onIdleExpired(long idleForMs) + { + try + { + LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp); + if (_endp.isInputShutdown() || _endp.isOutputShutdown()) + _endp.close(); + else + _endp.shutdownOutput(); + } + catch(IOException e) + { + LOG.ignore(e); + + try + { + _endp.close(); + } + catch(IOException e2) + { + LOG.ignore(e2); + } + } + } + + protected void doRead() + { + throw new IllegalStateException(); + } + + protected void doWrite() + { + throw new IllegalStateException(); + } + + @Override + public int getMaxIdleTime() + { + int max=_maxIdleTime; + return (max==-1)?_endp.getMaxIdleTime():max; + } + + public void setMaxIdleTime(int max) + { + _maxIdleTime=max; + } + + @Override + public String toString() + { + return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + } + + public void onInputShutdown() throws IOException + { + } + + public void onClose() + { + } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java index 61aadcb5ac1..968519d7f15 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java @@ -38,7 +38,7 @@ import org.eclipse.jetty.util.log.Logger; * it's source/sink of encrypted data. It then provides {@link #getAppEndPoint()} to * expose a source/sink of unencrypted data to another connection (eg HttpConnection). */ -public class SslConnection extends AbstractSelectableConnection +public class SslConnection extends SelectableConnection { private static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.nio.ssl"); @@ -679,7 +679,7 @@ public class SslConnection extends AbstractSelectableConnection public void setSelectableConnection(SelectableConnection connection) { - _appConnection=(AbstractSelectableConnection)connection; + _appConnection=(SelectableConnection)connection; } @Override diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index a4672478860..095be5c5e7f 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -21,7 +21,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.io.Connection; -import org.eclipse.jetty.io.AbstractSelectableConnection; +import org.eclipse.jetty.io.SelectableConnection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectorManager; @@ -108,7 +108,7 @@ public class SelectChannelEndPointTest return new TestConnection(endpoint); } - public class TestConnection extends AbstractSelectableConnection + public class TestConnection extends SelectableConnection { ByteBuffer _in = BufferUtil.allocate(32*1024); ByteBuffer _out = BufferUtil.allocate(32*1024); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 54b011aef79..e157a4665e5 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -27,17 +27,18 @@ import org.eclipse.jetty.http.HttpGenerator; import org.eclipse.jetty.http.HttpGenerator.Action; import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.io.AbstractSelectableConnection; +import org.eclipse.jetty.io.SelectableConnection; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.SelectableEndPoint; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; /** */ -public abstract class HttpConnection extends AbstractSelectableConnection +public abstract class HttpConnection extends SelectableConnection { private static final Logger LOG = Log.getLogger(HttpConnection.class); @@ -74,7 +75,7 @@ public abstract class HttpConnection extends AbstractSelectableConnection /** Constructor * */ - public HttpConnection(Connector connector, EndPoint endpoint, Server server) + public HttpConnection(Connector connector, SelectableEndPoint endpoint, Server server) { super(endpoint); _connector = connector; @@ -149,6 +150,7 @@ public abstract class HttpConnection extends AbstractSelectableConnection } /* ------------------------------------------------------------ */ + @Override public boolean isIdle() { return _parser.isIdle() && _generator.isIdle(); @@ -161,6 +163,7 @@ public abstract class HttpConnection extends AbstractSelectableConnection } /* ------------------------------------------------------------ */ + @Override public int getMaxIdleTime() { if (_connector.isLowResources() && _endp.getMaxIdleTime()==_connector.getMaxIdleTime()) @@ -171,6 +174,7 @@ public abstract class HttpConnection extends AbstractSelectableConnection } /* ------------------------------------------------------------ */ + @Override public String toString() { return String.format("%s,g=%s,p=%s", @@ -179,21 +183,6 @@ public abstract class HttpConnection extends AbstractSelectableConnection _parser); } - /* ------------------------------------------------------------ */ - @Override - public void canRead() - { - - - } - - /* ------------------------------------------------------------ */ - @Override - public void canWrite() - { - - - } /* ------------------------------------------------------------ */ public void processInput() @@ -206,7 +195,7 @@ public abstract class HttpConnection extends AbstractSelectableConnection setCurrentConnection(this); // don't check for idle while dispatched (unless blocking IO is done). - getEndPoint().setCheckForIdle(false); + getSelectableEndPoint().setCheckForIdle(false); // While progress and the connection has not changed @@ -277,7 +266,7 @@ public abstract class HttpConnection extends AbstractSelectableConnection if (!_processor.getRequest().getAsyncContinuation().isAsyncStarted()) { // reenable idle checking unless request is suspended - getEndPoint().setCheckForIdle(true); + getSelectableEndPoint().setCheckForIdle(true); } } } @@ -416,18 +405,20 @@ public abstract class HttpConnection extends AbstractSelectableConnection break; if (_toFlush>0) - _endp.blockWritable(getMaxIdleTime()); + blockWriteable(); } } /* ------------------------------------------------------------ */ + @Override public void onClose() { _processor.onClose(); } /* ------------------------------------------------------------ */ + @Override public void onInputShutdown() throws IOException { // If we don't have a committed response and we are not suspended