mirror of https://github.com/apache/activemq.git
Be more carefull about how long we wait for the test to timeout
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@386092 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7bc618b2ae
commit
3c09f0cba5
|
@ -25,10 +25,8 @@ import javax.jms.Session;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
|
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author rnewson
|
* @author rnewson
|
||||||
|
@ -41,9 +39,9 @@ public final class LargeStreamletTest extends TestCase {
|
||||||
|
|
||||||
private static final int MESSAGE_COUNT = 1024*1024;
|
private static final int MESSAGE_COUNT = 1024*1024;
|
||||||
|
|
||||||
private int totalRead;
|
private AtomicInteger totalRead = new AtomicInteger();
|
||||||
|
|
||||||
private int totalWritten;
|
private AtomicInteger totalWritten = new AtomicInteger();
|
||||||
|
|
||||||
private AtomicBoolean stopThreads = new AtomicBoolean(false);
|
private AtomicBoolean stopThreads = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
@ -66,7 +64,7 @@ public final class LargeStreamletTest extends TestCase {
|
||||||
final Thread readerThread = new Thread(new Runnable() {
|
final Thread readerThread = new Thread(new Runnable() {
|
||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
totalRead = 0;
|
totalRead.set(0);
|
||||||
try {
|
try {
|
||||||
final InputStream inputStream = connection
|
final InputStream inputStream = connection
|
||||||
.createInputStream(destination);
|
.createInputStream(destination);
|
||||||
|
@ -75,7 +73,7 @@ public final class LargeStreamletTest extends TestCase {
|
||||||
final byte[] buf = new byte[BUFFER_SIZE];
|
final byte[] buf = new byte[BUFFER_SIZE];
|
||||||
while (!stopThreads.get()
|
while (!stopThreads.get()
|
||||||
&& (read = inputStream.read(buf)) != -1) {
|
&& (read = inputStream.read(buf)) != -1) {
|
||||||
totalRead += read;
|
totalRead.addAndGet(read);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
inputStream.close();
|
inputStream.close();
|
||||||
|
@ -93,7 +91,7 @@ public final class LargeStreamletTest extends TestCase {
|
||||||
final Thread writerThread = new Thread(new Runnable() {
|
final Thread writerThread = new Thread(new Runnable() {
|
||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
totalWritten = 0;
|
totalWritten.set(0);
|
||||||
int count = MESSAGE_COUNT;
|
int count = MESSAGE_COUNT;
|
||||||
try {
|
try {
|
||||||
final OutputStream outputStream = connection
|
final OutputStream outputStream = connection
|
||||||
|
@ -103,7 +101,7 @@ public final class LargeStreamletTest extends TestCase {
|
||||||
new Random().nextBytes(buf);
|
new Random().nextBytes(buf);
|
||||||
while (count > 0 && !stopThreads.get()) {
|
while (count > 0 && !stopThreads.get()) {
|
||||||
outputStream.write(buf);
|
outputStream.write(buf);
|
||||||
totalWritten += buf.length;
|
totalWritten.addAndGet(buf.length);
|
||||||
count--;
|
count--;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -122,8 +120,19 @@ public final class LargeStreamletTest extends TestCase {
|
||||||
readerThread.start();
|
readerThread.start();
|
||||||
writerThread.start();
|
writerThread.start();
|
||||||
|
|
||||||
writerThread.join(60 * 1000);
|
|
||||||
readerThread.join(60 * 1000);
|
// Wait till reader is has finished receiving all the messages or he has stopped
|
||||||
|
// receiving messages.
|
||||||
|
Thread.sleep(1000);
|
||||||
|
int lastRead = totalRead.get();
|
||||||
|
while( readerThread.isAlive() ) {
|
||||||
|
readerThread.join(1000);
|
||||||
|
// No progress?? then stop waiting..
|
||||||
|
if( lastRead == totalRead.get() ) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
lastRead = totalRead.get();
|
||||||
|
}
|
||||||
|
|
||||||
stopThreads.set(true);
|
stopThreads.set(true);
|
||||||
|
|
||||||
|
@ -131,7 +140,7 @@ public final class LargeStreamletTest extends TestCase {
|
||||||
assertTrue("Should not have received a writer exception", writerException == null);
|
assertTrue("Should not have received a writer exception", writerException == null);
|
||||||
|
|
||||||
Assert.assertEquals("Not all messages accounted for",
|
Assert.assertEquals("Not all messages accounted for",
|
||||||
totalWritten, totalRead);
|
totalWritten.get(), totalRead.get());
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
session.close();
|
session.close();
|
||||||
|
|
Loading…
Reference in New Issue