diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java index ae04c0cb544..aab7e2f8094 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java @@ -13,6 +13,9 @@ import org.eclipse.jetty.util.log.Logger; public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint { + + + public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class); private final Lock _lock = new ReentrantLock(); @@ -20,54 +23,10 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn private volatile AsyncConnection _connection; - private DispatchedIOFuture _readFuture = new DispatchedIOFuture(true,_lock) - { - @Override - protected void dispatch(Runnable task) - { - AsyncByteArrayEndPoint.this.dispatch(task); - } - - @Override - public void cancel() - { - _lock.lock(); - try - { - // TODO ?? - cancelled(); - } - finally - { - _lock.unlock(); - } - } - }; + private DispatchedIOFuture _readFuture; + private DispatchedIOFuture _writeFuture; private ByteBuffer[] _writeBuffers; - private DispatchedIOFuture _writeFuture = new DispatchedIOFuture(true,_lock) - { - @Override - protected void dispatch(Runnable task) - { - AsyncByteArrayEndPoint.this.dispatch(task); - } - - @Override - public void cancel() - { - _lock.lock(); - try - { - // TODO - cancelled(); - } - finally - { - _lock.unlock(); - } - } - }; @@ -93,10 +52,10 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn _lock.lock(); try { - if (!_readFuture.isComplete()) + if (_readFuture!=null && !_readFuture.isComplete()) throw new IllegalStateException("previous read not complete"); - _readFuture.recycle(); + _readFuture=new ReadFuture(); // TODO @@ -114,7 +73,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn _lock.lock(); try { - if (!_writeFuture.isComplete()) + if (_writeFuture!=null && !_writeFuture.isComplete()) throw new IllegalStateException("previous write not complete"); flush(buffers); @@ -125,7 +84,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn if (b.hasRemaining()) { _writeBuffers=buffers; - _writeFuture.recycle(); + _writeFuture=new WriteFuture(); // TODO return _writeFuture; } @@ -221,5 +180,68 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn } } } + + + private final class WriteFuture extends DispatchedIOFuture + { + private WriteFuture() + { + super(_lock); + } + + @Override + protected void dispatch(Runnable task) + { + AsyncByteArrayEndPoint.this.dispatch(task); + } + + @Override + public void cancel() + { + _lock.lock(); + try + { + // TODO + cancelled(); + } + finally + { + _lock.unlock(); + } + } + } + + + + + private final class ReadFuture extends DispatchedIOFuture + { + private ReadFuture() + { + super(_lock); + } + + @Override + protected void dispatch(Runnable task) + { + AsyncByteArrayEndPoint.this.dispatch(task); + } + + @Override + public void cancel() + { + _lock.lock(); + try + { + // TODO ?? + cancelled(); + } + finally + { + _lock.unlock(); + } + } + } + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/DispatchedIOFuture.java b/jetty-io/src/main/java/org/eclipse/jetty/io/DispatchedIOFuture.java index c58b66be85d..e8dd4c93d35 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/DispatchedIOFuture.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/DispatchedIOFuture.java @@ -109,25 +109,6 @@ public class DispatchedIOFuture implements IOFuture } } - public void recycle() - { - // System.err.println(this);new Throwable().printStackTrace(); - _lock.lock(); - try - { - if (!_complete) - throw new IllegalStateException(); - _ready=false; - _cause=null; - _complete=false; - _callback=null; - } - finally - { - _lock.unlock(); - } - } - @Override public boolean isComplete() { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/RunnableIOFuture.java b/jetty-io/src/main/java/org/eclipse/jetty/io/RunnableIOFuture.java index 25778afd245..dcb7910a398 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/RunnableIOFuture.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/RunnableIOFuture.java @@ -16,6 +16,15 @@ final class RunnableIOFuture extends DispatchedIOFuture { super(ready,lock); } + + RunnableIOFuture(Lock lock) + { + super(lock); + } + + RunnableIOFuture() + { + } @Override protected void dispatch(Runnable callback) @@ -44,11 +53,4 @@ final class RunnableIOFuture extends DispatchedIOFuture return _task!=null; } - @Override - public void recycle() - { - if (_task!=null) - throw new IllegalStateException("unrun task"); - super.recycle(); - } } \ No newline at end of file diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 6f91a4576f5..e9b749763dd 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -40,8 +40,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo private final SelectorManager.SelectSet _selectSet; private final SelectorManager _manager; - private final DispatchedIOFuture _readFuture = new InterestedFuture(SelectionKey.OP_READ,true,_lock); - private final DispatchedIOFuture _writeFuture = new InterestedFuture(SelectionKey.OP_WRITE,true,_lock); + + private DispatchedIOFuture _readFuture=new DispatchedIOFuture(true,_lock); + private DispatchedIOFuture _writeFuture=new DispatchedIOFuture(true,_lock); private SelectionKey _key; @@ -221,10 +222,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo _lock.lock(); try { - if (!_readFuture.isComplete()) + if (_readFuture!=null && !_readFuture.isComplete()) throw new IllegalStateException("previous read not complete"); - _readFuture.recycle(); + _readFuture=new InterestedFuture(SelectionKey.OP_READ); _interestOps=_interestOps|SelectionKey.OP_READ; updateKey(); @@ -244,7 +245,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo _lock.lock(); try { - if (!_writeFuture.isComplete()) + if (_writeFuture!=null && !_writeFuture.isComplete()) throw new IllegalStateException("previous write not complete"); flush(buffers); @@ -255,7 +256,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo if (b.hasRemaining()) { _writeBuffers=buffers; - _writeFuture.recycle(); + _writeFuture=new InterestedFuture(SelectionKey.OP_WRITE); _interestOps=_interestOps|SelectionKey.OP_WRITE; updateKey(); return _writeFuture; @@ -500,9 +501,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo private class InterestedFuture extends DispatchedIOFuture { final int _interest; - private InterestedFuture(int interest,boolean ready, Lock lock) + private InterestedFuture(int interest) { - super(ready,lock); + super(_lock); _interest=interest; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java index d782d283385..428171316cf 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java @@ -16,10 +16,7 @@ package org.eclipse.jetty.io; import static org.eclipse.jetty.io.CompletedIOFuture.COMPLETE; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -50,10 +47,10 @@ public class SslConnection extends AbstractAsyncConnection private static final ThreadLocal __buffers = new ThreadLocal(); private final Lock _lock = new ReentrantLock(); - - private final RunnableIOFuture _appReadFuture = new RunnableIOFuture(true,_lock); - private final RunnableIOFuture _appWriteFuture = new RunnableIOFuture(true,_lock); private final IOFuture.Callback _netWriteCallback = new NetWriteCallback(); + + private RunnableIOFuture _appReadFuture = new RunnableIOFuture(true,_lock); + private RunnableIOFuture _appWriteFuture = new RunnableIOFuture(true,_lock); private final SSLEngine _engine; private final SSLSession _session; @@ -259,7 +256,7 @@ public class SslConnection extends AbstractAsyncConnection allocateBuffers(); boolean progress=true; - while(progress && !_appReadFuture.isDispatched()) + while(progress && (_appReadFuture==null || !_appReadFuture.isDispatched())) { progress=false; @@ -284,7 +281,7 @@ public class SslConnection extends AbstractAsyncConnection finally { releaseBuffers(); - if (!_appReadFuture.isComplete() && _netReadFuture==null && !BufferUtil.isFull(_inNet)) + if (_appReadFuture!=null && !_appReadFuture.isComplete() && _netReadFuture==null && !BufferUtil.isFull(_inNet)) _netReadFuture=scheduleOnReadable(); LOG.debug("!onReadable {} {}",this,_netReadFuture); @@ -778,7 +775,7 @@ public class SslConnection extends AbstractAsyncConnection return COMPLETE; // No, we need to schedule a network read - _appReadFuture.recycle(); + _appReadFuture=new RunnableIOFuture(_lock); if (_netReadFuture==null) _netReadFuture=scheduleOnReadable(); return _appReadFuture; @@ -806,7 +803,7 @@ public class SslConnection extends AbstractAsyncConnection if (b.hasRemaining()) { _writeBuffers=buffers; - _appWriteFuture.recycle(); + _appWriteFuture=new RunnableIOFuture(_lock); return _appWriteFuture; } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/IOFutureTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/IOFutureTest.java index d3243324239..20e313bc128 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/IOFutureTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/IOFutureTest.java @@ -163,7 +163,7 @@ public class IOFutureTest @Test public void testReady() throws Exception { - final DispatchedIOFuture future = new DispatchedIOFuture(); + DispatchedIOFuture future = new DispatchedIOFuture(); assertFalse(future.isComplete()); assertFalse(future.isReady()); @@ -195,13 +195,14 @@ public class IOFutureTest start=System.currentTimeMillis(); + final DispatchedIOFuture f0=future; new Thread() { @Override public void run() { try{TimeUnit.MILLISECONDS.sleep(50);}catch(Exception e){} - future.ready(); + f0.ready(); } }.start(); @@ -215,17 +216,20 @@ public class IOFutureTest assertEquals((Throwable)null,fail.get()); ready.set(false); - future.recycle(); + + + future = new DispatchedIOFuture(); assertFalse(future.isComplete()); assertFalse(future.isReady()); start=System.currentTimeMillis(); + final DispatchedIOFuture f1=future; new Thread() { @Override public void run() { try{TimeUnit.MILLISECONDS.sleep(50);}catch(Exception e){} - future.ready(); + f1.ready(); } }.start(); @@ -236,14 +240,13 @@ public class IOFutureTest assertTrue(future.isReady()); assertFalse(ready.get()); // no callback set assertEquals((Throwable)null,fail.get()); - } @Test public void testFail() throws Exception { - final DispatchedIOFuture future = new DispatchedIOFuture(); + DispatchedIOFuture future = new DispatchedIOFuture(); final Exception ex=new Exception("failed"); assertFalse(future.isComplete()); @@ -275,13 +278,14 @@ public class IOFutureTest assertEquals((Throwable)null,fail.get()); start=System.currentTimeMillis(); + final DispatchedIOFuture f0=future; new Thread() { @Override public void run() { try{TimeUnit.MILLISECONDS.sleep(50);}catch(Exception e){} - future.fail(ex); + f0.fail(ex); } }.start(); @@ -309,20 +313,22 @@ public class IOFutureTest } assertFalse(ready.get()); assertEquals(ex,fail.get()); - + + future=new DispatchedIOFuture(); ready.set(false); fail.set(null); - future.recycle(); + assertFalse(future.isComplete()); assertFalse(future.isReady()); start=System.currentTimeMillis(); + final DispatchedIOFuture f1=future; new Thread() { @Override public void run() { try{TimeUnit.MILLISECONDS.sleep(50);}catch(Exception e){} - future.fail(ex); + f1.fail(ex); } }.start(); diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 2c1281091d3..a08e29d8074 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -187,6 +187,10 @@ public class SelectChannelEndPointTest e2.printStackTrace(); } } + catch(InterruptedException e) + { + // e.printStackTrace(); + } catch(Exception e) { e.printStackTrace(); @@ -204,11 +208,11 @@ public class SelectChannelEndPointTest @Override public void onIdleExpired(long idleForMs) { - System.err.println("IDLE "+idleForMs); + /*System.err.println("IDLE "+idleForMs); System.err.println("last "+(System.currentTimeMillis()-_last)); System.err.println("ENDP "+_endp); System.err.println("tran "+_endp.getTransport()); - System.err.println(); + System.err.println();*/ super.onIdleExpired(idleForMs); } @@ -507,7 +511,7 @@ public class SelectChannelEndPointTest server.configureBlocking(false); _manager.register(server); - int writes = 1000; + int writes = 10000; final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET); byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET);