From 2b78e69ac992ac69222a61c1447124b7bccff86b Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Mon, 7 May 2012 13:37:23 +0200 Subject: [PATCH] jetty-9 work in progress --- .../jetty/io/AbstractAsyncConnection.java | 2 +- .../eclipse/jetty/io/AbstractEndPoint.java | 2 +- .../jetty/io/AsyncByteArrayEndPoint.java | 2 +- .../org/eclipse/jetty/io/AsyncEndPoint.java | 27 +++++---- .../org/eclipse/jetty/io/ChannelEndPoint.java | 17 ++---- .../eclipse/jetty/io/DispatchedIOFuture.java | 2 +- .../eclipse/jetty/io/RunnableIOFuture.java | 56 ------------------- .../jetty/io/SelectChannelEndPoint.java | 44 +++++++++------ .../org/eclipse/jetty/io/SslConnection.java | 29 +++------- .../jetty/io/SelectChannelEndPointTest.java | 13 +++-- 10 files changed, 65 insertions(+), 129 deletions(-) delete mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/RunnableIOFuture.java diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java index 78c1df64d55..33ccc44c817 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractAsyncConnection.java @@ -59,7 +59,7 @@ public abstract class AbstractAsyncConnection implements AsyncConnection @Override public IOFuture scheduleOnReadable() { - IOFuture read=getEndPoint().read(); + IOFuture read=getEndPoint().readable(); read.setCallback(_readCallback); return read; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 946db2bbe50..4050cd4d88c 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -9,7 +9,7 @@ public abstract class AbstractEndPoint implements EndPoint private final long _created=System.currentTimeMillis(); private final InetSocketAddress _local; private final InetSocketAddress _remote; - private int _maxIdleTime; + private volatile int _maxIdleTime; private volatile long _idleTimestamp; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java index aab7e2f8094..5ee4fdd31e2 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java @@ -47,7 +47,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn } @Override - public IOFuture read() throws IllegalStateException + public IOFuture readable() throws IllegalStateException { _lock.lock(); try diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java index 343399722e7..20aacabf6a5 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java @@ -19,44 +19,44 @@ import java.util.concurrent.Future; * ; in a callback mode (like {@link CompletionHandler} mode; or blocking mod;e or a hybrid mode *

Blocking read

*
- * endpoint.read().complete();
+ * endpoint.readable().block();
  * endpoint.fill(buffer);
  * 
*

Polling read

*
- * IOFuture read = endpoint.read();
+ * IOFuture read = endpoint.readable();
  * ...
- * if (read.isReady())
- *   endpoint.fill(buffer);
+ * while (!read.isReady()) 
+ *   Thread.sleep(10);
+ * endpoint.fill(buffer);
  * 
*

Callback read

*
- * endpoint.read().setHandler(new IOCallback()
+ * endpoint.readable().setCallback(new IOCallback()
  * {
  *   public void onReady() { endpoint.fill(buffer); ... }
  *   public void onFail(IOException e) { ... }
- *   public void onTimeout() { ... }
  * }
  * 
* *

Blocking write

*
- * endpoint.write(buffer).complete();
+ * endpoint.write(buffer).block();
  * 
*

Polling write

*
  * IOFuture write = endpoint.write(buffer);
  * ...
- * if (write.isReady())
- *   // do next write
+ * while (!write.isReady())
+ *   Thread.sleep(10);
+ * 
  * 
*

Callback write

*
- * endpoint.write(buffer).setHandler(new IOCallback()
+ * endpoint.write(buffer0,buffer1).setCallback(new IOCallback()
  * {
  *   public void onReady() { ... }
  *   public void onFail(IOException e) { ... }
- *   public void onTimeout() { ... }
  * }
  * 
*

Hybrid write

@@ -70,14 +70,13 @@ import java.util.concurrent.Future; * { * public void onReady() { ... } * public void onFail(IOException e) { ... } - * public void onTimeout() { ... } * }); * ... * * *

Compatibility Notes

* Some Async IO APIs have the concept of setting read interest. With this - * API calling {@link #read()} is equivalent to setting read interest to true + * API calling {@link #readable()} is equivalent to setting read interest to true * and calling {@link IOFuture#cancel()} is equivalent to setting read interest * to false. */ @@ -95,7 +94,7 @@ public interface AsyncEndPoint extends EndPoint * return immediately with data without blocking. * @throws IllegalStateException if another read operation has been scheduled and has not timedout, been cancelled or is ready. */ - IOFuture read() throws IllegalStateException; + IOFuture readable() throws IllegalStateException; /* ------------------------------------------------------------ */ /** Schedule a write operation. diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index c81a32664d9..505cf9ae38f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -69,8 +69,12 @@ public class ChannelEndPoint extends AbstractEndPoint if (_oshut) close(); } - - protected final void shutdownChannelOutput() throws IOException + + /* (non-Javadoc) + * @see org.eclipse.io.EndPoint#close() + */ + @Override + public void shutdownOutput() throws IOException { LOG.debug("oshut {}",this); _oshut = true; @@ -101,15 +105,6 @@ public class ChannelEndPoint extends AbstractEndPoint } } - /* (non-Javadoc) - * @see org.eclipse.io.EndPoint#close() - */ - @Override - public void shutdownOutput() throws IOException - { - shutdownChannelOutput(); - } - @Override public boolean isOutputShutdown() { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/DispatchedIOFuture.java b/jetty-io/src/main/java/org/eclipse/jetty/io/DispatchedIOFuture.java index e8dd4c93d35..86bbc315774 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/DispatchedIOFuture.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/DispatchedIOFuture.java @@ -209,7 +209,7 @@ public class DispatchedIOFuture implements IOFuture protected void dispatch(Runnable callback) { - callback.run(); + new Thread(callback).run(); } private void dispatchReady() diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/RunnableIOFuture.java b/jetty-io/src/main/java/org/eclipse/jetty/io/RunnableIOFuture.java deleted file mode 100644 index dcb7910a398..00000000000 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/RunnableIOFuture.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.eclipse.jetty.io; - -import java.util.concurrent.locks.Lock; - -/* ------------------------------------------------------------ */ -/** A Dispatched IOFuture that retains the Runnable. - * This IOFuture captures a dispatched task as a runnable that can be executed later. - * This is often used when the {@link #ready()} or {@link #fail(Throwable)} method is - * called holding locks that should not be held during the execution of the runnable. - */ -final class RunnableIOFuture extends DispatchedIOFuture -{ - private volatile Runnable _task; - - RunnableIOFuture(boolean ready, Lock lock) - { - super(ready,lock); - } - - RunnableIOFuture(Lock lock) - { - super(lock); - } - - RunnableIOFuture() - { - } - - @Override - protected void dispatch(Runnable callback) - { - if (_task!=null) - throw new IllegalStateException(); - _task=callback; - } - - public Runnable takeTask() - { - Runnable t=_task; - _task=null; - return t; - } - - public void run() - { - Runnable task=takeTask(); - if (task!=null) - task.run(); - } - - public boolean isDispatched() - { - return _task!=null; - } - -} \ No newline at end of file diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index e9b749763dd..4cfe34a304f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -124,8 +124,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo return; } - boolean can_read=(_key.isReadable() && (_key.interestOps()|SelectionKey.OP_READ)!=0); - boolean can_write=(_key.isWritable() && (_key.interestOps()|SelectionKey.OP_WRITE)!=0); + //TODO do we need to test interest here ??? + boolean can_read=(_key.isReadable() && (_key.interestOps()&SelectionKey.OP_READ)==SelectionKey.OP_READ); + boolean can_write=(_key.isWritable() && (_key.interestOps()&SelectionKey.OP_WRITE)==SelectionKey.OP_WRITE); _interestOps=0; if (can_read && !_readFuture.isComplete()) @@ -217,7 +218,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo /* ------------------------------------------------------------ */ @Override - public IOFuture read() throws IllegalStateException + public IOFuture readable() throws IllegalStateException { _lock.lock(); try @@ -321,27 +322,36 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo */ private void updateKey() { - if (!_selected) + if (!_lock.tryLock()) + throw new IllegalStateException(); + try { - int current_ops=-1; - if (getChannel().isOpen()) + if (!_selected) { - try + int current_ops=-1; + if (getChannel().isOpen()) { - current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); + try + { + current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); + } + catch(Exception e) + { + _key=null; + LOG.ignore(e); + } } - catch(Exception e) + if (_interestOps!=current_ops && !_changing) { - _key=null; - LOG.ignore(e); + _changing=true; + _selectSet.addChange(this); + _selectSet.wakeup(); } } - if (_interestOps!=current_ops && !_changing) - { - _changing=true; - _selectSet.addChange(this); - _selectSet.wakeup(); - } + } + finally + { + _lock.unlock(); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java index 428171316cf..07cfc16f7f6 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SslConnection.java @@ -49,8 +49,8 @@ public class SslConnection extends AbstractAsyncConnection private final Lock _lock = new ReentrantLock(); private final IOFuture.Callback _netWriteCallback = new NetWriteCallback(); - private RunnableIOFuture _appReadFuture = new RunnableIOFuture(true,_lock); - private RunnableIOFuture _appWriteFuture = new RunnableIOFuture(true,_lock); + private DispatchedIOFuture _appReadFuture = new DispatchedIOFuture(true,_lock); + private DispatchedIOFuture _appWriteFuture = new DispatchedIOFuture(true,_lock); private final SSLEngine _engine; private final SSLSession _session; @@ -83,10 +83,7 @@ public class SslConnection extends AbstractAsyncConnection { LOG.debug("write FAILED",cause); if (!_appWriteFuture.isComplete()) - { _appWriteFuture.fail(cause); - _appWriteFuture.run(); - } else LOG.warn("write FAILED",cause); } @@ -228,7 +225,6 @@ public class SslConnection extends AbstractAsyncConnection @Override public void onClose() { - // TODO is this right? _appConnection.onClose(); } @@ -256,7 +252,7 @@ public class SslConnection extends AbstractAsyncConnection allocateBuffers(); boolean progress=true; - while(progress && (_appReadFuture==null || !_appReadFuture.isDispatched())) + while(progress) { progress=false; @@ -287,11 +283,6 @@ public class SslConnection extends AbstractAsyncConnection LOG.debug("!onReadable {} {}",this,_netReadFuture); _lock.unlock(); - - // Run any ready callback from _appReadFuture in this thread. - _appReadFuture.run(); - _appWriteFuture.run(); - } } @@ -600,9 +591,6 @@ public class SslConnection extends AbstractAsyncConnection finally { _lock.unlock(); - _appReadFuture.run(); - _appWriteFuture.run(); - } } @@ -764,7 +752,7 @@ public class SslConnection extends AbstractAsyncConnection } @Override - public IOFuture read() throws IllegalStateException + public IOFuture readable() throws IllegalStateException { LOG.debug("{} sslEndp.read()",_session); _lock.lock(); @@ -775,7 +763,7 @@ public class SslConnection extends AbstractAsyncConnection return COMPLETE; // No, we need to schedule a network read - _appReadFuture=new RunnableIOFuture(_lock); + _appReadFuture=new DispatchedIOFuture(_lock); if (_netReadFuture==null) _netReadFuture=scheduleOnReadable(); return _appReadFuture; @@ -803,7 +791,7 @@ public class SslConnection extends AbstractAsyncConnection if (b.hasRemaining()) { _writeBuffers=buffers; - _appWriteFuture=new RunnableIOFuture(_lock); + _appWriteFuture=new DispatchedIOFuture(_lock); return _appWriteFuture; } } @@ -816,8 +804,6 @@ public class SslConnection extends AbstractAsyncConnection finally { _lock.unlock(); - _appReadFuture.run(); - _appWriteFuture.run(); } } @@ -843,9 +829,8 @@ public class SslConnection extends AbstractAsyncConnection finally { _lock.unlock(); - _appReadFuture.run(); - _appWriteFuture.run(); } } } + } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index a08e29d8074..e7f83956b0d 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -150,7 +150,7 @@ public class SelectChannelEndPointTest // If the tests wants to block, then block while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt) { - _endp.read().block(); + _endp.readable().block(); filled=_endp.fill(_in); progress|=filled>0; } @@ -179,6 +179,7 @@ public class SelectChannelEndPointTest // Timeout does not close, so echo exception then shutdown try { + System.err.println(e); _endp.write(BufferUtil.toBuffer("EE: "+BufferUtil.toString(_in))).block(); _endp.shutdownOutput(); } @@ -208,11 +209,13 @@ public class SelectChannelEndPointTest @Override public void onIdleExpired(long idleForMs) { - /*System.err.println("IDLE "+idleForMs); + /* + System.err.println("IDLE "+idleForMs); System.err.println("last "+(System.currentTimeMillis()-_last)); System.err.println("ENDP "+_endp); System.err.println("tran "+_endp.getTransport()); - System.err.println();*/ + System.err.println(); + */ super.onIdleExpired(idleForMs); } @@ -511,7 +514,7 @@ public class SelectChannelEndPointTest server.configureBlocking(false); _manager.register(server); - int writes = 10000; + int writes = 100000; final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET); byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET); @@ -542,7 +545,7 @@ public class SelectChannelEndPointTest for (byte b0 : bytes) { int b = in.read(); - assertTrue(b>0); + Assert.assertThat(b,greaterThan(0)); assertEquals(0xff&b0,b); }