Make sure there are Topic consumers online before starting to send

otherwise they can miss a message and the test fails when it shouldn't
This commit is contained in:
Timothy Bish 2013-12-16 14:37:06 -05:00
parent 257710ba1a
commit a6d05daba6
1 changed files with 32 additions and 22 deletions

View File

@ -16,11 +16,13 @@
*/
package org.apache.activemq.transport.amqp;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@ -31,21 +33,22 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQ4920Test extends AmqpTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(AMQ4920Test.class);
private static final Integer ITERATIONS = 1 * 1000;
private static final Integer CONSUMER_COUNT = 4; // At least 2 consumers are required to reproduce the original issue
public static final String TEXT_MESSAGE = "TextMessage: ";
private CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
private final CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
private final CountDownLatch initLatch = new CountDownLatch(CONSUMER_COUNT);
@Override
@Before
public void setUp() throws Exception {
super.setUp();
@ -54,7 +57,7 @@ public class AMQ4920Test extends AmqpTestSupport {
@Test(timeout = 5 * 60 * 1000)
public void testSendWithMultipleConsumers() throws Exception {
ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin");
ConnectionFactory connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String destinationName = "topic://AMQ4920Test" + System.currentTimeMillis();
@ -63,11 +66,15 @@ public class AMQ4920Test extends AmqpTestSupport {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i=0; i < CONSUMER_COUNT; i++) {
AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(destinationName, port, "Consumer-" + i, latch, ITERATIONS);
AMQ4930ConsumerTask consumerTask =
new AMQ4930ConsumerTask(initLatch, destinationName, port, "Consumer-" + i, latch, ITERATIONS);
executor.submit(consumerTask);
}
connection.start();
// Make sure at least Topic consumers are subscribed before the first send.
initLatch.await();
LOG.debug("At start latch is " + latch.getCount());
sendMessages(connection, destination, ITERATIONS, 10);
LOG.debug("After send latch is " + latch.getCount());
@ -97,16 +104,17 @@ public class AMQ4920Test extends AmqpTestSupport {
}
}
class AMQ4930ConsumerTask implements Callable<Boolean> {
protected static final Logger LOG = LoggerFactory.getLogger(AMQ4930ConsumerTask.class);
private String destinationName;
private String consumerName;
private CountDownLatch messagesReceived;
private int port;
private int expectedMessageCount;
private final String destinationName;
private final String consumerName;
private final CountDownLatch messagesReceived;
private final int port;
private final int expectedMessageCount;
private final CountDownLatch started;
public AMQ4930ConsumerTask (String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) {
public AMQ4930ConsumerTask (CountDownLatch started, String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) {
this.started = started;
this.destinationName = destinationName;
this.port = port;
this.consumerName = consumerName;
@ -119,13 +127,15 @@ class AMQ4930ConsumerTask implements Callable<Boolean> {
LOG.debug(consumerName + " starting");
Connection connection=null;
try {
ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin");
ConnectionFactory connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin");
connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
connection.start();
started.countDown();
int receivedCount = 0;
while(receivedCount < expectedMessageCount) {
Message message = consumer.receive(5 * 1000);