Merge branch 'jetty-9' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project into jetty-9

This commit is contained in:
Joakim Erdfelt 2012-07-30 12:40:52 -07:00
commit e75e0e9a04
4 changed files with 89 additions and 66 deletions

View File

@ -54,8 +54,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
@Override @Override
protected boolean needsFill() protected boolean needsFill()
{ {
updateKey(SelectionKey.OP_READ, true); return SelectChannelEndPoint.this.needsFill();
return false;
} }
}; };
private final WriteFlusher _writeFlusher = new WriteFlusher(this) private final WriteFlusher _writeFlusher = new WriteFlusher(this)
@ -63,7 +62,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
@Override @Override
protected void onIncompleteFlushed() protected void onIncompleteFlushed()
{ {
updateKey(SelectionKey.OP_WRITE, true); SelectChannelEndPoint.this.onIncompleteFlush();
} }
}; };
private final SelectorManager.ManagedSelector _selector; private final SelectorManager.ManagedSelector _selector;
@ -91,6 +90,17 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
scheduleIdleTimeout(idleTimeout); scheduleIdleTimeout(idleTimeout);
} }
protected boolean needsFill()
{
updateLocalInterests(SelectionKey.OP_READ, true);
return false;
}
protected void onIncompleteFlush()
{
updateLocalInterests(SelectionKey.OP_WRITE, true);
}
private void scheduleIdleTimeout(long delay) private void scheduleIdleTimeout(long delay)
{ {
Future<?> newTimeout = null; Future<?> newTimeout = null;
@ -138,7 +148,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
@Override @Override
public void onSelected() public void onSelected()
{ {
_interestOps = 0; int oldInterestOps = _key.interestOps();
int readyOps = _key.readyOps();
int newInterestOps = oldInterestOps & ~readyOps;
setKeyInterests(oldInterestOps, newInterestOps);
updateLocalInterests(readyOps, false);
if (_key.isReadable()) if (_key.isReadable())
_readInterest.readable(); _readInterest.readable();
if (_key.isWritable()) if (_key.isWritable())
@ -178,7 +192,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
} }
} }
private void updateKey(int operation, boolean add) private void updateLocalInterests(int operation, boolean add)
{ {
int oldInterestOps = _interestOps; int oldInterestOps = _interestOps;
int newInterestOps; int newInterestOps;
@ -195,12 +209,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
if (newInterestOps != oldInterestOps) if (newInterestOps != oldInterestOps)
{ {
_interestOps = newInterestOps; _interestOps = newInterestOps;
LOG.debug("Key update {} -> {} for {}", oldInterestOps, newInterestOps, this); LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
_selector.submit(this); _selector.submit(this);
} }
else else
{ {
LOG.debug("Ignoring key update {} -> {} for {}", oldInterestOps, newInterestOps, this); LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
} }
} }
@ -214,7 +228,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
int oldInterestOps = _key.interestOps(); int oldInterestOps = _key.interestOps();
int newInterestOps = _interestOps; int newInterestOps = _interestOps;
if (newInterestOps != oldInterestOps) if (newInterestOps != oldInterestOps)
_key.interestOps(newInterestOps); setKeyInterests(oldInterestOps, newInterestOps);
} }
} }
catch (CancelledKeyException x) catch (CancelledKeyException x)
@ -229,6 +243,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
} }
} }
private void setKeyInterests(int oldInterestOps, int newInterestOps)
{
LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps);
_key.interestOps(newInterestOps);
}
@Override @Override
public void close() public void close()
{ {

View File

@ -218,13 +218,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* or {@link #accept(SocketChannel)}.</p> * or {@link #accept(SocketChannel)}.</p>
* *
* @param channel the channel associated to the endpoint * @param channel the channel associated to the endpoint
* @param selectSet the selector the channel is registered to * @param selector the selector the channel is registered to
* @param selectionKey the selection key * @param selectionKey the selection key
* @return a new endpoint * @return a new endpoint
* @throws IOException if the endPoint cannot be created * @throws IOException if the endPoint cannot be created
* @see #newConnection(SocketChannel, AsyncEndPoint, Object) * @see #newConnection(SocketChannel, AsyncEndPoint, Object)
*/ */
protected abstract AsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey selectionKey) throws IOException; protected abstract AsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
/** /**
* <p>Factory method to create {@link AsyncConnection}.</p> * <p>Factory method to create {@link AsyncConnection}.</p>
@ -414,7 +414,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{ {
if (attachment instanceof SelectableAsyncEndPoint) if (attachment instanceof SelectableAsyncEndPoint)
{ {
key.interestOps(0);
((SelectableAsyncEndPoint)attachment).onSelected(); ((SelectableAsyncEndPoint)attachment).onSelected();
} }
else if (key.isConnectable()) else if (key.isConnectable())

View File

@ -6,6 +6,7 @@ import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.HandshakeStatus;
@ -189,15 +190,15 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
@Test @Test
@Override @Override
public void testWriteBlock() throws Exception public void testWriteBlocked() throws Exception
{ {
super.testWriteBlock(); super.testWriteBlocked();
} }
@Override @Override
public void testBlockRead() throws Exception public void testReadBlocked() throws Exception
{ {
super.testBlockRead(); super.testReadBlocked();
} }
@Override @Override
@ -230,8 +231,8 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
assertEquals(c,(char)b); assertEquals(c,(char)b);
} }
// Set Max idle assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
_lastEndp.setIdleTimeout(500); _lastEndPoint.setIdleTimeout(500);
// Write 8 and cause block waiting for 10 // Write 8 and cause block waiting for 10
_blockAt=10; _blockAt=10;
@ -248,7 +249,7 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
Thread.sleep(1000); Thread.sleep(1000);
assertFalse(_lastEndp.isOpen()); assertFalse(_lastEndPoint.isOpen());
} }
@Test @Test

View File

@ -50,7 +50,8 @@ import static org.junit.Assert.assertTrue;
public class SelectChannelEndPointTest public class SelectChannelEndPointTest
{ {
protected volatile AsyncEndPoint _lastEndp; protected CountDownLatch _lastEndPointLatch;
protected volatile AsyncEndPoint _lastEndPoint;
protected ServerSocketChannel _connector; protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor(); protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor();
@ -72,7 +73,8 @@ public class SelectChannelEndPointTest
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{ {
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, _scheduler, 60000); SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, _scheduler, 60000);
_lastEndp = endp; _lastEndPoint = endp;
_lastEndPointLatch.countDown();
return endp; return endp;
} }
}; };
@ -85,7 +87,8 @@ public class SelectChannelEndPointTest
public void startManager() throws Exception public void startManager() throws Exception
{ {
_writeCount = 1; _writeCount = 1;
_lastEndp = null; _lastEndPoint = null;
_lastEndPointLatch = new CountDownLatch(1);
_connector = ServerSocketChannel.open(); _connector = ServerSocketChannel.open();
_connector.socket().bind(null); _connector.socket().bind(null);
_threadPool.start(); _threadPool.start();
@ -253,13 +256,14 @@ public class SelectChannelEndPointTest
} }
client.close(); client.close();
int i = 0; for (int i = 0; i < 10; ++i)
while (server.isOpen())
{ {
if (server.isOpen())
Thread.sleep(10); Thread.sleep(10);
if (++i == 10) else
Assert.fail(); break;
} }
assertFalse(server.isOpen());
} }
@Test @Test
@ -301,7 +305,6 @@ public class SelectChannelEndPointTest
client.getOutputStream().write("Goodbye Cruel TLS".getBytes("UTF-8")); client.getOutputStream().write("Goodbye Cruel TLS".getBytes("UTF-8"));
client.shutdownOutput(); client.shutdownOutput();
// Verify echo server to client // Verify echo server to client
for (char c : "Goodbye Cruel TLS".toCharArray()) for (char c : "Goodbye Cruel TLS".toCharArray())
{ {
@ -315,7 +318,7 @@ public class SelectChannelEndPointTest
} }
@Test @Test
public void testBlockRead() throws Exception public void testReadBlocked() throws Exception
{ {
Socket client = newClient(); Socket client = newClient();
@ -335,11 +338,8 @@ public class SelectChannelEndPointTest
clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.write("12345678".getBytes("UTF-8"));
clientOutputStream.flush(); clientOutputStream.flush();
long wait = System.currentTimeMillis() + 1000; Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
while (_lastEndp == null && System.currentTimeMillis() < wait) _lastEndPoint.setIdleTimeout(10 * specifiedTimeout);
Thread.yield();
_lastEndp.setIdleTimeout(10 * specifiedTimeout);
Thread.sleep((11 * specifiedTimeout) / 10); Thread.sleep((11 * specifiedTimeout) / 10);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
@ -390,25 +390,28 @@ public class SelectChannelEndPointTest
assertEquals(c, (char)b); assertEquals(c, (char)b);
} }
// Set Max idle Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
_lastEndp.setIdleTimeout(500); int idleTimeout = 500;
_lastEndPoint.setIdleTimeout(idleTimeout);
// read until idle shutdown received // read until idle shutdown received
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
int b = client.getInputStream().read(); int b = client.getInputStream().read();
assertEquals(-1, b); assertEquals(-1, b);
long idle = System.currentTimeMillis() - start; long idle = System.currentTimeMillis() - start;
assertTrue(idle > 400); assertTrue(idle > idleTimeout / 2);
assertTrue(idle < 2000); assertTrue(idle < idleTimeout * 2);
// But endpoint may still be open for a little bit. // But endpoint may still be open for a little bit.
if (_lastEndp.isOpen()) for (int i = 0; i < 10; ++i)
Thread.sleep(2000); {
if (_lastEndPoint.isOpen())
// endpoint is closed. Thread.sleep(2 * idleTimeout / 10);
assertFalse(_lastEndp.isOpen()); else
break;
}
assertFalse(_lastEndPoint.isOpen());
} }
@Test @Test
public void testBlockedReadIdle() throws Exception public void testBlockedReadIdle() throws Exception
@ -434,8 +437,9 @@ public class SelectChannelEndPointTest
assertEquals(c, (char)b); assertEquals(c, (char)b);
} }
// Set Max idle Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
_lastEndp.setIdleTimeout(500); int idleTimeout = 500;
_lastEndPoint.setIdleTimeout(idleTimeout);
// Write 8 and cause block waiting for 10 // Write 8 and cause block waiting for 10
_blockAt = 10; _blockAt = 10;
@ -447,8 +451,8 @@ public class SelectChannelEndPointTest
int b = client.getInputStream().read(); int b = client.getInputStream().read();
assertEquals('E', b); assertEquals('E', b);
long idle = System.currentTimeMillis() - start; long idle = System.currentTimeMillis() - start;
assertTrue(idle > 400); assertTrue(idle > idleTimeout / 2);
assertTrue(idle < 2000); assertTrue(idle < idleTimeout * 2);
for (char c : "E: 12345678".toCharArray()) for (char c : "E: 12345678".toCharArray())
{ {
@ -458,13 +462,13 @@ public class SelectChannelEndPointTest
} }
// But endpoint is still open. // But endpoint is still open.
assertTrue(_lastEndp.isOpen()); assertTrue(_lastEndPoint.isOpen());
// Wait for another idle callback // Wait for another idle callback
Thread.sleep(2000); Thread.sleep(idleTimeout * 2);
// endpoint is closed. // endpoint is closed.
assertFalse(_lastEndp.isOpen()); assertFalse(_lastEndPoint.isOpen());
} }
@Test @Test
@ -489,9 +493,8 @@ public class SelectChannelEndPointTest
out.write(count); out.write(count);
out.flush(); out.flush();
while (_lastEndp == null) Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
Thread.sleep(10); _lastEndPoint.setIdleTimeout(5000);
_lastEndp.setIdleTimeout(5000);
new Thread() new Thread()
{ {
@ -536,8 +539,8 @@ public class SelectChannelEndPointTest
System.err.println("latch=" + latch.getCount()); System.err.println("latch=" + latch.getCount());
System.err.println("time=" + (now - start)); System.err.println("time=" + (now - start));
System.err.println("last=" + (now - last)); System.err.println("last=" + (now - last));
System.err.println("endp=" + _lastEndp); System.err.println("endp=" + _lastEndPoint);
System.err.println("conn=" + _lastEndp.getAsyncConnection()); System.err.println("conn=" + _lastEndPoint.getAsyncConnection());
e.printStackTrace(); e.printStackTrace();
} }
@ -572,7 +575,7 @@ public class SelectChannelEndPointTest
} }
@Test @Test
public void testWriteBlock() throws Exception public void testWriteBlocked() throws Exception
{ {
Socket client = newClient(); Socket client = newClient();
@ -592,10 +595,8 @@ public class SelectChannelEndPointTest
for (int i = 0; i < _writeCount; i++) for (int i = 0; i < _writeCount; i++)
{ {
if (i % 1000 == 0) if (i % 1000 == 0)
{
//System.out.println(i);
TimeUnit.MILLISECONDS.sleep(200); TimeUnit.MILLISECONDS.sleep(200);
}
// Verify echo server to client // Verify echo server to client
for (int j = 0; j < data.length(); j++) for (int j = 0; j < data.length(); j++)
{ {
@ -604,18 +605,20 @@ public class SelectChannelEndPointTest
assertTrue(b > 0); assertTrue(b > 0);
assertEquals("test-" + i + "/" + j, c, (char)b); assertEquals("test-" + i + "/" + j, c, (char)b);
} }
if (i == 0)
_lastEndp.setIdleTimeout(60000);
}
if (i == 0)
_lastEndPoint.setIdleTimeout(60000);
}
client.close(); client.close();
int i = 0; for (int i = 0; i < 10; ++i)
while (server.isOpen())
{ {
assert (i++ < 10); if (server.isOpen())
Thread.sleep(10); Thread.sleep(10);
else
break;
} }
assertFalse(server.isOpen());
} }
} }