git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1465814 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-04-08 22:34:04 +00:00
parent 858ab263d4
commit a80c711d78
1 changed files with 19 additions and 9 deletions

View File

@ -81,13 +81,15 @@ public class AMQ4368Test {
abstract class Client implements Runnable {
private final String name;
final AtomicBoolean done = new AtomicBoolean();
CountDownLatch startedLatch;
CountDownLatch doneLatch = new CountDownLatch(1);
Connection connection;
Session session;
final AtomicLong size = new AtomicLong();
Client(String name) {
Client(String name, CountDownLatch startedLatch) {
this.name = name;
this.startedLatch = startedLatch;
}
public void start() {
@ -146,8 +148,8 @@ public class AMQ4368Test {
class ProducingClient extends Client {
ProducingClient(String name) {
super(name);
ProducingClient(String name, CountDownLatch startedLatch) {
super(name, startedLatch);
}
private String createMessage() {
@ -162,6 +164,7 @@ public class AMQ4368Test {
protected void work() throws Exception {
String data = createMessage();
MessageProducer producer = session.createProducer(destination);
startedLatch.countDown();
while (!done.get()) {
producer.send(session.createTextMessage(data));
long i = size.incrementAndGet();
@ -173,14 +176,14 @@ public class AMQ4368Test {
}
class ConsumingClient extends Client {
public ConsumingClient(String name) {
super(name);
public ConsumingClient(String name, CountDownLatch startedLatch) {
super(name, startedLatch);
}
@Override
protected void work() throws Exception {
MessageConsumer consumer = session.createConsumer(destination);
startedLatch.countDown();
while (!done.get()) {
Message msg = consumer.receive(100);
if (msg != null) {
@ -193,16 +196,23 @@ public class AMQ4368Test {
@Test
public void testENTMQ220() throws InterruptedException, JMSException {
LOG.info("Start test.");
CountDownLatch producer1Started = new CountDownLatch(1);
CountDownLatch producer2Started = new CountDownLatch(1);
CountDownLatch listener1Started = new CountDownLatch(1);
ProducingClient producer1 = new ProducingClient("1");
ProducingClient producer2 = new ProducingClient("2");
ConsumingClient listener1 = new ConsumingClient("subscriber-1");
ProducingClient producer1 = new ProducingClient("1", producer1Started);
ProducingClient producer2 = new ProducingClient("2", producer2Started);
ConsumingClient listener1 = new ConsumingClient("subscriber-1", listener1Started);
try {
producer1.start();
producer2.start();
listener1.start();
producer1Started.await(15, TimeUnit.SECONDS);
producer2Started.await(15, TimeUnit.SECONDS);
listener1Started.await(15, TimeUnit.SECONDS);
long lastSize = listener1.size.get();
for (int i = 0; i < 10; i++) {
Thread.sleep(2000);