Fixing testcase failure on blocking read/write during WebSocketClient use

This commit is contained in:
Joakim Erdfelt 2011-11-15 14:47:42 -07:00
parent 67bd6f5868
commit 7fb371a7ad
2 changed files with 40 additions and 38 deletions

View File

@ -41,10 +41,9 @@
<artifactId>jetty-http</artifactId> <artifactId>jetty-http</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>junit</artifactId> <artifactId>jetty-test-helper</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -1,5 +1,7 @@
package org.eclipse.jetty.websocket; package org.eclipse.jetty.websocket;
import static org.hamcrest.Matchers.*;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -518,7 +520,7 @@ public class WebSocketClientTest
Assert.assertTrue(open.get()); Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get()); Assert.assertEquals(0,close.get());
final int messages=20000; final int messages=200000;
final AtomicLong totalB=new AtomicLong(); final AtomicLong totalB=new AtomicLong();
Thread consumer = new Thread() Thread consumer = new Thread()
@ -526,9 +528,10 @@ public class WebSocketClientTest
@Override @Override
public void run() public void run()
{ {
// Thread.sleep is for artificially poor performance reader needed for this testcase.
try try
{ {
Thread.sleep(2000); Thread.sleep(200);
byte[] recv = new byte[32*1024]; byte[] recv = new byte[32*1024];
int len=0; int len=0;
@ -552,26 +555,23 @@ public class WebSocketClientTest
consumer.start(); consumer.start();
// Send lots of messages client to server // Send lots of messages client to server
long max=0;
long start=System.currentTimeMillis(); long start=System.currentTimeMillis();
String mesg="This is a test message to send"; String mesg="This is a test message to send";
for (int i=0;i<messages;i++) for (int i=0;i<messages;i++)
{ {
connection.sendMessage("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"); connection.sendMessage(mesg);
if (i%100==0)
{
long now=System.currentTimeMillis();
long duration=now-start;
start=now;
if (duration>max)
max=duration;
}
} }
// Duration for the write phase
long writeDur = (System.currentTimeMillis() - start);
// wait for consumer to complete // wait for consumer to complete
while (totalB.get()<messages*(mesg.length()+6L)) while (totalB.get()<messages*(mesg.length()+6L))
{
Thread.sleep(10); Thread.sleep(10);
Assert.assertTrue(max>1000); // writing was blocked }
Assert.assertThat("write duration", writeDur, greaterThan(1000L)); // writing was blocked
Assert.assertEquals(messages*(mesg.length()+6L),totalB.get()); Assert.assertEquals(messages*(mesg.length()+6L),totalB.get());
consumer.interrupt(); consumer.interrupt();
@ -640,14 +640,23 @@ public class WebSocketClientTest
{ {
try try
{ {
Thread.sleep(2000); Thread.sleep(200);
while(m.get()<messages) while (m.get() < messages)
{ {
String msg =exchanger.exchange(null); String msg = exchanger.exchange(null);
if ("Hello".equals(msg)) if ("Hello".equals(msg))
{
m.incrementAndGet(); m.incrementAndGet();
}
else else
throw new IllegalStateException("exchanged "+msg); {
throw new IllegalStateException("exchanged " + msg);
}
if (m.get() % 1000 == 0)
{
// Artificially slow reader
Thread.sleep(10);
}
} }
} }
catch(InterruptedException e) catch(InterruptedException e)
@ -662,28 +671,22 @@ public class WebSocketClientTest
}; };
consumer.start(); consumer.start();
long max=0;
long start=System.currentTimeMillis(); long start=System.currentTimeMillis();
for (int i=0;i<messages;i++) for (int i=0;i<messages;i++)
{ {
socket.getOutputStream().write(send,0,send.length); socket.getOutputStream().write(send,0,send.length);
socket.getOutputStream().flush(); socket.getOutputStream().flush();
if (i%100==0)
{
long now=System.currentTimeMillis();
long duration=now-start;
start=now;
if (duration>max)
max=duration;
}
} }
while(consumer.isAlive()) while(consumer.isAlive())
{
Thread.sleep(10); Thread.sleep(10);
}
// Duration of the read operation.
long readDur = (System.currentTimeMillis() - start);
Assert.assertTrue(max>1000); // writing was blocked Assert.assertThat("read duration", readDur, greaterThan(1000L)); // reading was blocked
Assert.assertEquals(m.get(),messages); Assert.assertEquals(m.get(),messages);
// Close with code // Close with code