diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java index 12444183160..78c1df64d55 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java @@ -15,20 +15,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection private static final Logger LOG = Log.getLogger(AbstractAsyncConnection.class); protected final AsyncEndPoint _endp; - private IOFuture.Callback _readCallback = new IOFuture.Callback() - { - @Override - public void onReady() - { - onReadable(); - } - - @Override - public void onFail(Throwable cause) - { - onReadFail(cause); - } - }; + private final IOFuture.Callback _readCallback = new ReadCallback(); public AbstractAsyncConnection(AsyncEndPoint endp) @@ -89,4 +76,27 @@ public abstract class AbstractAsyncConnection implements AsyncConnection { return String.format("%s@%x", getClass().getSimpleName(), hashCode()); } + + + private class ReadCallback implements IOFuture.Callback + { + @Override + public void onReady() + { + onReadable(); + } + + @Override + public void onFail(Throwable cause) + { + onReadFail(cause); + } + + @Override + public String toString() + { + return String.format("AAC$ReadCB@%x",hashCode()); + } + } + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/CompletedIOFuture.java b/jetty-io/src/main/java/org/eclipse/jetty/io/CompletedIOFuture.java index 4c4cd69c499..85d394bc813 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/CompletedIOFuture.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/CompletedIOFuture.java @@ -81,9 +81,8 @@ public class CompletedIOFuture implements IOFuture @Override public String toString() { - return String.format("CIOF@%x{r=%b,c=%s}", + return String.format("CIOF@%x{%s}", hashCode(), - _ready, - _cause); + _ready?"R":_cause); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/DispatchedIOFuture.java b/jetty-io/src/main/java/org/eclipse/jetty/io/DispatchedIOFuture.java index 6659dc3b4e7..c430b1c393b 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/DispatchedIOFuture.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/DispatchedIOFuture.java @@ -265,11 +265,10 @@ public class DispatchedIOFuture implements IOFuture @Override public String toString() { - return String.format("RIOF@%x{c=%b,r=%b,c=%s}", + return String.format("DIOF@%x{%s,%s}", hashCode(), - _complete, - _ready, - _cause); + _complete?(_ready?"R":_cause):"-", + _callback==null?"-":_callback); } public static void rethrow(ExecutionException e) throws IOException diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/RunnableIOFuture.java b/jetty-io/src/main/java/org/eclipse/jetty/io/RunnableIOFuture.java index 7d8b0662401..25778afd245 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/RunnableIOFuture.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/RunnableIOFuture.java @@ -34,7 +34,9 @@ final class RunnableIOFuture extends DispatchedIOFuture public void run() { - takeTask().run(); + Runnable task=takeTask(); + if (task!=null) + task.run(); } public boolean isDispatched() diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index c506c77d920..6f91a4576f5 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -40,6 +40,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo private final SelectorManager.SelectSet _selectSet; private final SelectorManager _manager; + private final DispatchedIOFuture _readFuture = new InterestedFuture(SelectionKey.OP_READ,true,_lock); + private final DispatchedIOFuture _writeFuture = new InterestedFuture(SelectionKey.OP_WRITE,true,_lock); + private SelectionKey _key; private boolean _selected; @@ -54,56 +57,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo private volatile boolean _idlecheck; private volatile AbstractAsyncConnection _connection; - private DispatchedIOFuture _readFuture = new DispatchedIOFuture(true,_lock) - { - @Override - protected void dispatch(Runnable task) - { - _manager.dispatch(task); - } - - @Override - public void cancel() - { - _lock.lock(); - try - { - _interestOps=_interestOps&~SelectionKey.OP_READ; - updateKey(); - cancelled(); - } - finally - { - _lock.unlock(); - } - } - }; - private ByteBuffer[] _writeBuffers; - private DispatchedIOFuture _writeFuture = new DispatchedIOFuture(true,_lock) - { - @Override - protected void dispatch(Runnable task) - { - _manager.dispatch(task); - } - - @Override - public void cancel() - { - _lock.lock(); - try - { - _interestOps=_interestOps&~SelectionKey.OP_WRITE; - updateKey(); - cancelled(); - } - finally - { - _lock.unlock(); - } - } - }; /* ------------------------------------------------------------ */ public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) @@ -475,6 +429,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo @Override public void close() throws IOException { + _lock.lock(); try { super.close(); @@ -486,6 +441,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo finally { updateKey(); + _lock.unlock(); } } @@ -518,7 +474,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo } - return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s}-{%s}", + return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s,r=%s,w=%s}-{%s}", hashCode(), getRemoteAddress(), getLocalAddress(), @@ -527,6 +483,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo isOutputShutdown(), _interestOps, keyString, + _readFuture, + _writeFuture, getAsyncConnection()); } @@ -536,6 +494,44 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo return _selectSet; } - + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + private class InterestedFuture extends DispatchedIOFuture + { + final int _interest; + private InterestedFuture(int interest,boolean ready, Lock lock) + { + super(ready,lock); + _interest=interest; + } + + @Override + protected void dispatch(Runnable task) + { + if (!_manager.dispatch(task)) + { + LOG.warn("Dispatch failed: i="+_interest); + throw new IllegalStateException(); + } + } + + @Override + public void cancel() + { + _lock.lock(); + try + { + _interestOps=_interestOps&~_interest; + updateKey(); + cancelled(); + } + finally + { + _lock.unlock(); + } + } + } + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java index 47201665221..812ad5a7c3a 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java @@ -45,6 +45,7 @@ import org.eclipse.jetty.util.log.Logger; public class SslConnection extends AbstractAsyncConnection { static final Logger LOG = Log.getLogger("org.eclipse.jetty.io.ssl"); + private static final ByteBuffer __ZERO_BUFFER=BufferUtil.allocate(0); private static final ThreadLocal __buffers = new ThreadLocal(); @@ -238,31 +239,24 @@ public class SslConnection extends AbstractAsyncConnection @Override public void onIdleExpired(long idleForMs) { - try - { - LOG.debug("onIdleExpired {}ms on {}",idleForMs,this); - if (_endp.isOutputShutdown()) - _appEndPoint.close(); - else - _appEndPoint.shutdownOutput(); - } - catch (IOException e) - { - LOG.warn(e); - super.onIdleExpired(idleForMs); - } + System.err.println("LAST "+(System.currentTimeMillis()-_last)); + _appConnection.onIdleExpired(idleForMs); } + long _last; + /* ------------------------------------------------------------ */ @Override public void onReadable() { - LOG.debug("onReadable {}",this); - _lock.lock(); try { + System.err.println("onReadable"); + _last=System.currentTimeMillis(); + LOG.debug("onReadable {}",this); + _netReadFuture=null; allocateBuffers(); @@ -292,17 +286,18 @@ public class SslConnection extends AbstractAsyncConnection finally { releaseBuffers(); - if (!_appReadFuture.isComplete() && _netReadFuture==null) + if (!_appReadFuture.isComplete() && _netReadFuture==null && !BufferUtil.isFull(_inNet)) _netReadFuture=scheduleOnReadable(); LOG.debug("!onReadable {} {}",this,_netReadFuture); - + _lock.unlock(); + + // Run any ready callback from _appReadFuture in this thread. + _appReadFuture.run(); + _appWriteFuture.run(); } - // Run any ready callback from _appReadFuture in this thread. - if (_appReadFuture.isDispatched()) - _appReadFuture.run(); } /* ------------------------------------------------------------ */ @@ -414,6 +409,10 @@ public class SslConnection extends AbstractAsyncConnection } finally { + // Has the net data consumed allowed us to release net backpressure? + if (BufferUtil.compact(_inNet) && !_appReadFuture.isComplete() && _netReadFuture==null) + _netReadFuture=scheduleOnReadable(); + releaseBuffers(); _lock.unlock(); } @@ -497,7 +496,7 @@ public class SslConnection extends AbstractAsyncConnection int pos = BufferUtil.flipToFill(_inApp); try { - result=_engine.unwrap(_inNet,_inApp); + result=_engine.unwrap(_inNet,_inApp); } catch(SSLException e) { @@ -516,7 +515,7 @@ public class SslConnection extends AbstractAsyncConnection result.getStatus(), result.getHandshakeStatus(), result.bytesConsumed(), - result.bytesProduced()); + result.bytesProduced()); switch(result.getStatus()) { @@ -605,6 +604,9 @@ public class SslConnection extends AbstractAsyncConnection finally { _lock.unlock(); + _appReadFuture.run(); + _appWriteFuture.run(); + } } @@ -654,7 +656,10 @@ public class SslConnection extends AbstractAsyncConnection process(null); if (BufferUtil.hasContent(_inApp)) + { BufferUtil.append(_inApp,buffer); + BufferUtil.compact(_inApp); + } } finally { @@ -670,23 +675,31 @@ public class SslConnection extends AbstractAsyncConnection @Override public int flush(ByteBuffer... buffers) throws IOException { - int len=0; - bufloop: for (ByteBuffer b : buffers) + _lock.lock(); + try { - while (b.hasRemaining()) + int len=0; + bufloop: for (ByteBuffer b : buffers) { - int l = b.remaining(); - if (!process(b)) - break bufloop; - l=l-b.remaining(); - - if (l>0) - len+=l; - else - break bufloop; + while (b.hasRemaining()) + { + int l = b.remaining(); + if (!process(b)) + break bufloop; + l=l-b.remaining(); + + if (l>0) + len+=l; + else + break bufloop; + } } + return len; + } + finally + { + _lock.unlock(); } - return len; } @Override @@ -726,11 +739,12 @@ public class SslConnection extends AbstractAsyncConnection int i = inbound == null? -1 : inbound.remaining(); int o = outbound == null ? -1 : outbound.remaining(); int u = unwrap == null ? -1 : unwrap.remaining(); - return String.format("SSL %s %s i/o/u=%d/%d/%d ep.ishut=%b oshut=%b {%s}", + return String.format("SSL%s[%s,i/o/u=%d/%d/%d,ep.ishut=%b,oshut=%b,r=%s,w=%s}-{%s}", super.toString(), _engine.getHandshakeStatus(), i, o, u, _endp.isInputShutdown(), _oshut, + _appReadFuture,_appWriteFuture, _appConnection); } @@ -806,6 +820,8 @@ public class SslConnection extends AbstractAsyncConnection finally { _lock.unlock(); + _appReadFuture.run(); + _appWriteFuture.run(); } } @@ -831,9 +847,8 @@ public class SslConnection extends AbstractAsyncConnection finally { _lock.unlock(); - - if (_appWriteFuture.isDispatched()) - _appWriteFuture.run(); + _appReadFuture.run(); + _appWriteFuture.run(); } } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java index e9bd187a791..e945f4c47fd 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java @@ -194,6 +194,6 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest @Override public void testStress() throws Exception { - // super.testStress(); + super.testStress(); } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index d6c590ae7af..ae57cf65c58 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -7,6 +7,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -21,6 +22,7 @@ import java.nio.channels.SocketChannel; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.AsyncConnection; import org.eclipse.jetty.io.AbstractAsyncConnection; @@ -119,6 +121,7 @@ public class SelectChannelEndPointTest { ByteBuffer _in = BufferUtil.allocate(32*1024); ByteBuffer _out = BufferUtil.allocate(32*1024); + long _last=-1; public TestConnection(AsyncEndPoint endp) { @@ -126,10 +129,12 @@ public class SelectChannelEndPointTest } @Override - public void onReadable() + public synchronized void onReadable() { + System.err.println("APP onReadable"); try { + _last=System.currentTimeMillis(); _endp.setCheckForIdle(false); boolean progress=true; while(progress) @@ -137,18 +142,17 @@ public class SelectChannelEndPointTest progress=false; // Fill the input buffer with everything available - if (!BufferUtil.isFull(_in)) - { - int filled=_endp.fill(_in); - if (filled>0) - progress=true; - } - + if (BufferUtil.isFull(_in)) + throw new IllegalStateException("FULL "+BufferUtil.toDetailString(_in)); + int filled=_endp.fill(_in); + if (filled>0) + progress=true; + // If the tests wants to block, then block while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt) { _endp.read().block(); - int filled=_endp.fill(_in); + filled=_endp.fill(_in); progress|=filled>0; } @@ -162,21 +166,15 @@ public class SelectChannelEndPointTest ByteBuffer out=_out.duplicate(); BufferUtil.clear(_out); for (int i=0;i<_writeCount;i++) - { _endp.write(out.asReadOnlyBuffer()).block(); - } progress=true; } // are we done? - if (BufferUtil.isEmpty(_out) && _endp.isInputShutdown()) + if (_endp.isInputShutdown()) _endp.shutdownOutput(); } } - catch(ClosedChannelException e) - { - // System.err.println(e); - } catch(ExecutionException e) { // Timeout does not close, so echo exception then shutdown @@ -190,10 +188,6 @@ public class SelectChannelEndPointTest e2.printStackTrace(); } } - catch(InterruptedException e) - { - // System.err.println(e); - } catch(Exception e) { e.printStackTrace(); @@ -208,10 +202,28 @@ public class SelectChannelEndPointTest } } + @Override + public void onIdleExpired(long idleForMs) + { + System.err.println("IDLE "+idleForMs); + System.err.println("last "+(System.currentTimeMillis()-_last)); + System.err.println("ENDP "+_endp); + System.err.println("tran "+_endp.getTransport()); + System.err.println(); + super.onIdleExpired(idleForMs); + } + @Override public void onClose() { } + + @Override + public String toString() + { + return String.format("%s{}", + super.toString()); + } } @@ -365,7 +377,6 @@ public class SelectChannelEndPointTest catch(SocketTimeoutException e) { int elapsed = Long.valueOf(System.currentTimeMillis() - start).intValue(); - // System.err.println("blocked for " + elapsed+ "ms"); Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3*specifiedTimeout/4)); } @@ -491,7 +502,7 @@ public class SelectChannelEndPointTest public void testStress() throws Exception { Socket client = newClient(); - client.setSoTimeout(30000); + client.setSoTimeout(60000); SocketChannel server = _connector.accept(); server.configureBlocking(false); @@ -501,17 +512,25 @@ public class SelectChannelEndPointTest final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET); byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET); + BufferedOutputStream out = new BufferedOutputStream(client.getOutputStream()); final CountDownLatch latch = new CountDownLatch(writes); final InputStream in = new BufferedInputStream(client.getInputStream()); final long start = System.currentTimeMillis(); - client.getOutputStream().write(bytes); - client.getOutputStream().write(count); - client.getOutputStream().flush(); + out.write(bytes); + out.write(count); + out.flush(); + + while (_lastEndp==null) + Thread.sleep(10); + _lastEndp.setMaxIdleTime(5000); new Thread() { public void run() { + Thread.currentThread().setPriority(MAX_PRIORITY); + long last=-1; + int count=-1; try { while (latch.getCount()>0) @@ -524,36 +543,42 @@ public class SelectChannelEndPointTest assertEquals(0xff&b0,b); } + count=0; int b=in.read(); while(b>0 && b!='\n') + { + count=count*10+(b-'0'); b=in.read(); + } + last=System.currentTimeMillis(); + latch.countDown(); } } catch(Throwable e) { + long now = System.currentTimeMillis(); + System.err.println("count="+count); System.err.println("latch="+latch.getCount()); - System.err.println("time="+(System.currentTimeMillis()-start)); + System.err.println("time="+(now-start)); + System.err.println("last="+(now-last)); + System.err.println("endp="+_lastEndp); e.printStackTrace(); } } }.start(); - - - PrintStream print = new PrintStream(client.getOutputStream()); // Write client to server for (int i=1;i