Made the test case more robust.
The call to EndPoint.blockReadable() must be enclosed in a while loop, since the EndPoint may be woken up spuriously or by write readyness, and no further bytes may be read.
This commit is contained in:
parent
7cc526b9dd
commit
34fbbd5e28
|
@ -1,11 +1,9 @@
|
|||
package org.eclipse.jetty.io.nio;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
|
@ -24,10 +22,13 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class SelectChannelEndPointTest
|
||||
{
|
||||
protected ServerSocketChannel _connector;
|
||||
protected ServerSocketChannel __serverSocket;
|
||||
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
|
||||
protected SelectorManager _manager = new SelectorManager()
|
||||
{
|
||||
|
@ -49,7 +50,7 @@ public class SelectChannelEndPointTest
|
|||
|
||||
@Override
|
||||
protected void endPointUpgraded(ConnectedEndPoint endpoint, org.eclipse.jetty.io.Connection oldConnection)
|
||||
{
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -64,12 +65,12 @@ public class SelectChannelEndPointTest
|
|||
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000);
|
||||
endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
|
||||
return endp;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
boolean _echo=true;
|
||||
int _blockAt=0;
|
||||
|
||||
// Must be volatile or the test may fail spuriously
|
||||
private volatile int _blockAt=0;
|
||||
|
||||
@Before
|
||||
public void startManager() throws Exception
|
||||
{
|
||||
|
@ -78,7 +79,7 @@ public class SelectChannelEndPointTest
|
|||
_threadPool.start();
|
||||
_manager.start();
|
||||
}
|
||||
|
||||
|
||||
@After
|
||||
public void stopManager() throws Exception
|
||||
{
|
||||
|
@ -86,22 +87,22 @@ public class SelectChannelEndPointTest
|
|||
_threadPool.stop();
|
||||
_connector.close();
|
||||
}
|
||||
|
||||
|
||||
protected Socket newClient() throws IOException
|
||||
{
|
||||
return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort());
|
||||
}
|
||||
|
||||
|
||||
protected AsyncConnection newConnection(SocketChannel channel, EndPoint endpoint)
|
||||
{
|
||||
return new TestConnection(endpoint);
|
||||
}
|
||||
|
||||
|
||||
public class TestConnection extends AbstractConnection implements AsyncConnection
|
||||
{
|
||||
NIOBuffer _in = new IndirectNIOBuffer(32*1024);
|
||||
NIOBuffer _out = new IndirectNIOBuffer(32*1024);
|
||||
|
||||
|
||||
public TestConnection(EndPoint endp)
|
||||
{
|
||||
super(endp);
|
||||
|
@ -119,22 +120,22 @@ public class SelectChannelEndPointTest
|
|||
progress=true;
|
||||
((AsyncEndPoint)_endp).cancelIdle();
|
||||
}
|
||||
|
||||
if (_blockAt>0 && _in.length()>0 && _in.length()<_blockAt)
|
||||
|
||||
while (_blockAt>0 && _in.length()>0 && _in.length()<_blockAt)
|
||||
{
|
||||
_endp.blockReadable(10000);
|
||||
if (_in.space()>0 && _endp.fill(_in)>0)
|
||||
progress=true;
|
||||
}
|
||||
|
||||
if (_echo && _in.hasContent() && _in.skip(_out.put(_in))>0)
|
||||
|
||||
if (_in.hasContent() && _in.skip(_out.put(_in))>0)
|
||||
progress=true;
|
||||
|
||||
|
||||
if (_out.hasContent() && _endp.flush(_out)>0)
|
||||
progress=true;
|
||||
|
||||
|
||||
_out.compact();
|
||||
|
||||
|
||||
if (!_out.hasContent() && _endp.isInputShutdown())
|
||||
_endp.shutdownOutput();
|
||||
}
|
||||
|
@ -160,24 +161,24 @@ public class SelectChannelEndPointTest
|
|||
{
|
||||
System.err.println("onInputShutdown");
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testEcho() throws Exception
|
||||
{
|
||||
Socket client = newClient();
|
||||
|
||||
|
||||
client.setSoTimeout(500);
|
||||
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
|
||||
_manager.register(server);
|
||||
|
||||
|
||||
// Write client to server
|
||||
client.getOutputStream().write("HelloWorld".getBytes("UTF-8"));
|
||||
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "HelloWorld".toCharArray())
|
||||
{
|
||||
|
@ -185,7 +186,7 @@ public class SelectChannelEndPointTest
|
|||
assertTrue(b>0);
|
||||
assertEquals(c,(char)b);
|
||||
}
|
||||
|
||||
|
||||
// wait for read timeout
|
||||
long start=System.currentTimeMillis();
|
||||
try
|
||||
|
@ -197,7 +198,7 @@ public class SelectChannelEndPointTest
|
|||
{
|
||||
assertTrue(System.currentTimeMillis()-start>=400);
|
||||
}
|
||||
|
||||
|
||||
// write then shutdown
|
||||
client.getOutputStream().write("Goodbye Cruel TLS".getBytes("UTF-8"));
|
||||
|
||||
|
@ -208,27 +209,27 @@ public class SelectChannelEndPointTest
|
|||
assertTrue(b>0);
|
||||
assertEquals(c,(char)b);
|
||||
}
|
||||
|
||||
|
||||
client.close();
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testShutdown() throws Exception
|
||||
{
|
||||
Socket client = newClient();
|
||||
|
||||
|
||||
client.setSoTimeout(500);
|
||||
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
|
||||
_manager.register(server);
|
||||
|
||||
|
||||
// Write client to server
|
||||
client.getOutputStream().write("HelloWorld".getBytes("UTF-8"));
|
||||
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "HelloWorld".toCharArray())
|
||||
{
|
||||
|
@ -236,7 +237,7 @@ public class SelectChannelEndPointTest
|
|||
assertTrue(b>0);
|
||||
assertEquals(c,(char)b);
|
||||
}
|
||||
|
||||
|
||||
// wait for read timeout
|
||||
long start=System.currentTimeMillis();
|
||||
try
|
||||
|
@ -248,11 +249,11 @@ public class SelectChannelEndPointTest
|
|||
{
|
||||
assertTrue(System.currentTimeMillis()-start>=400);
|
||||
}
|
||||
|
||||
|
||||
// write then shutdown
|
||||
client.getOutputStream().write("Goodbye Cruel TLS".getBytes("UTF-8"));
|
||||
client.shutdownOutput();
|
||||
|
||||
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "Goodbye Cruel TLS".toCharArray())
|
||||
|
@ -261,54 +262,59 @@ public class SelectChannelEndPointTest
|
|||
assertTrue(b>0);
|
||||
assertEquals(c,(char)b);
|
||||
}
|
||||
|
||||
|
||||
// Read close
|
||||
assertEquals(-1,client.getInputStream().read());
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testBlockIn() throws Exception
|
||||
{
|
||||
Socket client = newClient();
|
||||
|
||||
client.setSoTimeout(200);
|
||||
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
|
||||
_manager.register(server);
|
||||
|
||||
|
||||
OutputStream clientOutputStream = client.getOutputStream();
|
||||
InputStream clientInputStream = client.getInputStream();
|
||||
|
||||
int specifiedTimeout = 200;
|
||||
client.setSoTimeout(specifiedTimeout);
|
||||
|
||||
// Write 8 and cause block for 10
|
||||
_blockAt=10;
|
||||
client.getOutputStream().write("12345678".getBytes("UTF-8"));
|
||||
|
||||
long specifiedTimeout = 200L;
|
||||
Thread.sleep(specifiedTimeout);
|
||||
clientOutputStream.write("12345678".getBytes("UTF-8"));
|
||||
clientOutputStream.flush();
|
||||
|
||||
Thread.sleep(2 * specifiedTimeout);
|
||||
|
||||
// No echo as blocking for 10
|
||||
long start=System.currentTimeMillis();
|
||||
try
|
||||
{
|
||||
int b= client.getInputStream().read();
|
||||
int b = clientInputStream.read();
|
||||
Assert.fail("Should have timed out waiting for a response, but read "+b);
|
||||
}
|
||||
catch(SocketTimeoutException e)
|
||||
{
|
||||
System.err.println("blocked "+(System.currentTimeMillis()-start));
|
||||
Assert.assertThat("Expected timeout", System.currentTimeMillis()-start, greaterThanOrEqualTo(specifiedTimeout));
|
||||
int elapsed = Long.valueOf(System.currentTimeMillis() - start).intValue();
|
||||
System.err.println("blocked " + elapsed);
|
||||
Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(specifiedTimeout));
|
||||
}
|
||||
|
||||
// write remaining characters
|
||||
client.getOutputStream().write("90ABCDEF".getBytes("UTF-8"));
|
||||
|
||||
|
||||
clientOutputStream.write("90ABCDEF".getBytes("UTF-8"));
|
||||
clientOutputStream.flush();
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "1234567890ABCDEF".toCharArray())
|
||||
{
|
||||
int b = client.getInputStream().read();
|
||||
int b = clientInputStream.read();
|
||||
assertTrue(b>0);
|
||||
assertEquals(c,(char)b);
|
||||
}
|
||||
|
@ -319,16 +325,16 @@ public class SelectChannelEndPointTest
|
|||
{
|
||||
Socket client = newClient();
|
||||
client.setSoTimeout(10000);
|
||||
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
|
||||
_manager.register(server);
|
||||
int writes = 1000000;
|
||||
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(writes);
|
||||
final InputStream in = new BufferedInputStream(client.getInputStream());
|
||||
|
||||
|
||||
new Thread()
|
||||
{
|
||||
public void run()
|
||||
|
@ -353,14 +359,14 @@ public class SelectChannelEndPointTest
|
|||
}
|
||||
}
|
||||
}.start();
|
||||
|
||||
|
||||
byte[] bytes="HelloWorld".getBytes("UTF-8");
|
||||
|
||||
|
||||
// Write client to server
|
||||
for (int i=0;i<writes;i++)
|
||||
client.getOutputStream().write(bytes);
|
||||
|
||||
|
||||
assertTrue(latch.await(100,TimeUnit.SECONDS));
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue