From 929defa465d82b66a6951f8b7f155c4fbbbffe6b Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 30 Jul 2012 16:01:58 +0200 Subject: [PATCH 1/3] Jetty9 - Removed assert with side effect. --- .../jetty/io/SelectChannelEndPointTest.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index da9f3e51354..35498e73e01 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -592,10 +592,8 @@ public class SelectChannelEndPointTest for (int i = 0; i < _writeCount; i++) { if (i % 1000 == 0) - { - //System.out.println(i); TimeUnit.MILLISECONDS.sleep(200); - } + // Verify echo server to client for (int j = 0; j < data.length(); j++) { @@ -604,18 +602,20 @@ public class SelectChannelEndPointTest assertTrue(b > 0); assertEquals("test-" + i + "/" + j, c, (char)b); } + if (i == 0) _lastEndp.setIdleTimeout(60000); } - client.close(); - int i = 0; - while (server.isOpen()) + for (int i = 0; i < 10; ++i) { - assert (i++ < 10); - Thread.sleep(10); + if (server.isOpen()) + Thread.sleep(10); + else + break; } + Assert.assertFalse(server.isOpen()); } } From aa24693870b87ee19eb1d3f8c467aba23a3e88c1 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 30 Jul 2012 16:44:18 +0200 Subject: [PATCH 2/3] Jetty9 - Cleaned up and made test more reliable. --- .../io/SelectChannelEndPointSslTest.java | 15 ++-- .../jetty/io/SelectChannelEndPointTest.java | 83 ++++++++++--------- 2 files changed, 51 insertions(+), 47 deletions(-) diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java index 71f8c8615da..39030aca8cd 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java @@ -6,6 +6,7 @@ import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; @@ -189,15 +190,15 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest @Test @Override - public void testWriteBlock() throws Exception + public void testWriteBlocked() throws Exception { - super.testWriteBlock(); + super.testWriteBlocked(); } @Override - public void testBlockRead() throws Exception + public void testReadBlocked() throws Exception { - super.testBlockRead(); + super.testReadBlocked(); } @Override @@ -230,8 +231,8 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest assertEquals(c,(char)b); } - // Set Max idle - _lastEndp.setIdleTimeout(500); + assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); + _lastEndPoint.setIdleTimeout(500); // Write 8 and cause block waiting for 10 _blockAt=10; @@ -248,7 +249,7 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest Thread.sleep(1000); - assertFalse(_lastEndp.isOpen()); + assertFalse(_lastEndPoint.isOpen()); } @Test diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 35498e73e01..67d0f6e6a3d 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -50,7 +50,8 @@ import static org.junit.Assert.assertTrue; public class SelectChannelEndPointTest { - protected volatile AsyncEndPoint _lastEndp; + protected CountDownLatch _lastEndPointLatch; + protected volatile AsyncEndPoint _lastEndPoint; protected ServerSocketChannel _connector; protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor(); @@ -72,7 +73,8 @@ public class SelectChannelEndPointTest protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException { SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, _scheduler, 60000); - _lastEndp = endp; + _lastEndPoint = endp; + _lastEndPointLatch.countDown(); return endp; } }; @@ -85,7 +87,8 @@ public class SelectChannelEndPointTest public void startManager() throws Exception { _writeCount = 1; - _lastEndp = null; + _lastEndPoint = null; + _lastEndPointLatch = new CountDownLatch(1); _connector = ServerSocketChannel.open(); _connector.socket().bind(null); _threadPool.start(); @@ -253,13 +256,14 @@ public class SelectChannelEndPointTest } client.close(); - int i = 0; - while (server.isOpen()) + for (int i = 0; i < 10; ++i) { - Thread.sleep(10); - if (++i == 10) - Assert.fail(); + if (server.isOpen()) + Thread.sleep(10); + else + break; } + assertFalse(server.isOpen()); } @Test @@ -301,7 +305,6 @@ public class SelectChannelEndPointTest client.getOutputStream().write("Goodbye Cruel TLS".getBytes("UTF-8")); client.shutdownOutput(); - // Verify echo server to client for (char c : "Goodbye Cruel TLS".toCharArray()) { @@ -315,7 +318,7 @@ public class SelectChannelEndPointTest } @Test - public void testBlockRead() throws Exception + public void testReadBlocked() throws Exception { Socket client = newClient(); @@ -335,11 +338,8 @@ public class SelectChannelEndPointTest clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.flush(); - long wait = System.currentTimeMillis() + 1000; - while (_lastEndp == null && System.currentTimeMillis() < wait) - Thread.yield(); - - _lastEndp.setIdleTimeout(10 * specifiedTimeout); + Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); + _lastEndPoint.setIdleTimeout(10 * specifiedTimeout); Thread.sleep((11 * specifiedTimeout) / 10); long start = System.currentTimeMillis(); @@ -390,26 +390,29 @@ public class SelectChannelEndPointTest assertEquals(c, (char)b); } - // Set Max idle - _lastEndp.setIdleTimeout(500); + Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); + int idleTimeout = 500; + _lastEndPoint.setIdleTimeout(idleTimeout); // read until idle shutdown received long start = System.currentTimeMillis(); int b = client.getInputStream().read(); assertEquals(-1, b); long idle = System.currentTimeMillis() - start; - assertTrue(idle > 400); - assertTrue(idle < 2000); + assertTrue(idle > idleTimeout / 2); + assertTrue(idle < idleTimeout * 2); // But endpoint may still be open for a little bit. - if (_lastEndp.isOpen()) - Thread.sleep(2000); - - // endpoint is closed. - assertFalse(_lastEndp.isOpen()); + for (int i = 0; i < 10; ++i) + { + if (_lastEndPoint.isOpen()) + Thread.sleep(2 * idleTimeout / 10); + else + break; + } + assertFalse(_lastEndPoint.isOpen()); } - @Test public void testBlockedReadIdle() throws Exception { @@ -434,8 +437,9 @@ public class SelectChannelEndPointTest assertEquals(c, (char)b); } - // Set Max idle - _lastEndp.setIdleTimeout(500); + Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); + int idleTimeout = 500; + _lastEndPoint.setIdleTimeout(idleTimeout); // Write 8 and cause block waiting for 10 _blockAt = 10; @@ -447,8 +451,8 @@ public class SelectChannelEndPointTest int b = client.getInputStream().read(); assertEquals('E', b); long idle = System.currentTimeMillis() - start; - assertTrue(idle > 400); - assertTrue(idle < 2000); + assertTrue(idle > idleTimeout / 2); + assertTrue(idle < idleTimeout * 2); for (char c : "E: 12345678".toCharArray()) { @@ -458,13 +462,13 @@ public class SelectChannelEndPointTest } // But endpoint is still open. - assertTrue(_lastEndp.isOpen()); + assertTrue(_lastEndPoint.isOpen()); // Wait for another idle callback - Thread.sleep(2000); + Thread.sleep(idleTimeout * 2); // endpoint is closed. - assertFalse(_lastEndp.isOpen()); + assertFalse(_lastEndPoint.isOpen()); } @Test @@ -489,9 +493,8 @@ public class SelectChannelEndPointTest out.write(count); out.flush(); - while (_lastEndp == null) - Thread.sleep(10); - _lastEndp.setIdleTimeout(5000); + Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); + _lastEndPoint.setIdleTimeout(5000); new Thread() { @@ -536,8 +539,8 @@ public class SelectChannelEndPointTest System.err.println("latch=" + latch.getCount()); System.err.println("time=" + (now - start)); System.err.println("last=" + (now - last)); - System.err.println("endp=" + _lastEndp); - System.err.println("conn=" + _lastEndp.getAsyncConnection()); + System.err.println("endp=" + _lastEndPoint); + System.err.println("conn=" + _lastEndPoint.getAsyncConnection()); e.printStackTrace(); } @@ -572,7 +575,7 @@ public class SelectChannelEndPointTest } @Test - public void testWriteBlock() throws Exception + public void testWriteBlocked() throws Exception { Socket client = newClient(); @@ -604,7 +607,7 @@ public class SelectChannelEndPointTest } if (i == 0) - _lastEndp.setIdleTimeout(60000); + _lastEndPoint.setIdleTimeout(60000); } client.close(); @@ -616,6 +619,6 @@ public class SelectChannelEndPointTest else break; } - Assert.assertFalse(server.isOpen()); + assertFalse(server.isOpen()); } } From 0c1ca16e864172bd7b06fa883b3fd999d4b2c662 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 30 Jul 2012 19:43:13 +0200 Subject: [PATCH 3/3] Jetty9 - Better handling for I/O interests. When the SelectChannelEndPoint is selected, it should only remove interests for what has been selected, so that other interests are kept unchanged. --- .../jetty/io/SelectChannelEndPoint.java | 36 ++++++++++++++----- .../org/eclipse/jetty/io/SelectorManager.java | 5 ++- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 2bdff94e790..f3731633d95 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -54,8 +54,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, @Override protected boolean needsFill() { - updateKey(SelectionKey.OP_READ, true); - return false; + return SelectChannelEndPoint.this.needsFill(); } }; private final WriteFlusher _writeFlusher = new WriteFlusher(this) @@ -63,7 +62,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, @Override protected void onIncompleteFlushed() { - updateKey(SelectionKey.OP_WRITE, true); + SelectChannelEndPoint.this.onIncompleteFlush(); } }; private final SelectorManager.ManagedSelector _selector; @@ -91,6 +90,17 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, scheduleIdleTimeout(idleTimeout); } + protected boolean needsFill() + { + updateLocalInterests(SelectionKey.OP_READ, true); + return false; + } + + protected void onIncompleteFlush() + { + updateLocalInterests(SelectionKey.OP_WRITE, true); + } + private void scheduleIdleTimeout(long delay) { Future newTimeout = null; @@ -138,7 +148,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, @Override public void onSelected() { - _interestOps = 0; + int oldInterestOps = _key.interestOps(); + int readyOps = _key.readyOps(); + int newInterestOps = oldInterestOps & ~readyOps; + setKeyInterests(oldInterestOps, newInterestOps); + updateLocalInterests(readyOps, false); if (_key.isReadable()) _readInterest.readable(); if (_key.isWritable()) @@ -178,7 +192,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, } } - private void updateKey(int operation, boolean add) + private void updateLocalInterests(int operation, boolean add) { int oldInterestOps = _interestOps; int newInterestOps; @@ -195,12 +209,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, if (newInterestOps != oldInterestOps) { _interestOps = newInterestOps; - LOG.debug("Key update {} -> {} for {}", oldInterestOps, newInterestOps, this); + LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this); _selector.submit(this); } else { - LOG.debug("Ignoring key update {} -> {} for {}", oldInterestOps, newInterestOps, this); + LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this); } } @@ -214,7 +228,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, int oldInterestOps = _key.interestOps(); int newInterestOps = _interestOps; if (newInterestOps != oldInterestOps) - _key.interestOps(newInterestOps); + setKeyInterests(oldInterestOps, newInterestOps); } } catch (CancelledKeyException x) @@ -229,6 +243,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, } } + private void setKeyInterests(int oldInterestOps, int newInterestOps) + { + LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps); + _key.interestOps(newInterestOps); + } + @Override public void close() { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index e043779ec3c..d74c589ce18 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -218,13 +218,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * or {@link #accept(SocketChannel)}.

* * @param channel the channel associated to the endpoint - * @param selectSet the selector the channel is registered to + * @param selector the selector the channel is registered to * @param selectionKey the selection key * @return a new endpoint * @throws IOException if the endPoint cannot be created * @see #newConnection(SocketChannel, AsyncEndPoint, Object) */ - protected abstract AsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey selectionKey) throws IOException; + protected abstract AsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException; /** *

Factory method to create {@link AsyncConnection}.

@@ -414,7 +414,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { if (attachment instanceof SelectableAsyncEndPoint) { - key.interestOps(0); ((SelectableAsyncEndPoint)attachment).onSelected(); } else if (key.isConnectable())