Remove the dependency on the fixed 61616 port number

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1174734 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-09-23 13:13:33 +00:00
parent d314b7f57d
commit 2db7cbf436
1 changed files with 49 additions and 45 deletions

View File

@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
/**
* This is a test case for the issue reported at:
* https://issues.apache.org/activemq/browse/AMQ-2021
* https://issues.apache.org/activemq/browse/AMQ-2021
* Bug is modification of inflight message properties so the failure can manifest itself in a bunch
* or ways, from message receipt with null properties to marshall errors
*/
@ -53,25 +53,26 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
BrokerService brokerService;
ArrayList<Thread> threads = new ArrayList<Thread>();
Vector<Throwable> exceptions;
AMQ2021Test testCase;
String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616";
String ACTIVEMQ_BROKER_URL = ACTIVEMQ_BROKER_BIND + "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
private int numMessages = 1000;
private int numConsumers = 2;
private int dlqMessages = numMessages/2;
CountDownLatch receivedLatch;
private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
private String CONSUMER_BROKER_URL = "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
private String PRODUCER_BROKER_URL;
private final int numMessages = 1000;
private final int numConsumers = 2;
private final int dlqMessages = numMessages/2;
private CountDownLatch receivedLatch;
private ActiveMQTopic destination;
public CountDownLatch started;
private CountDownLatch started;
@Override
protected void setUp() throws Exception {
Thread.setDefaultUncaughtExceptionHandler(this);
testCase = this;
// Start an embedded broker up.
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
@ -79,18 +80,21 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
brokerService.start();
destination = new ActiveMQTopic(getName());
exceptions = new Vector<Throwable>();
receivedLatch =
CONSUMER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString() + CONSUMER_BROKER_URL;
PRODUCER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
receivedLatch =
new CountDownLatch(numConsumers * (numMessages + dlqMessages));
started = new CountDownLatch(1);
}
@Override
protected void tearDown() throws Exception {
for (Thread t : threads) {
t.interrupt();
t.join();
}
}
brokerService.stop();
}
@ -101,9 +105,9 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
threads.add(c1);
c1.start();
}
assertTrue(started.await(10, TimeUnit.SECONDS));
Thread producer = new Thread() {
@Override
public void run() {
@ -115,34 +119,34 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
};
threads.add(producer);
producer.start();
boolean allGood = receivedLatch.await(30, TimeUnit.SECONDS);
boolean allGood = receivedLatch.await(90, TimeUnit.SECONDS);
for (Throwable t: exceptions) {
log.error("failing test with first exception", t);
fail("exception during test : " + t);
}
}
assertTrue("excepted messages received within time limit", allGood);
assertEquals(0, exceptions.size());
for (int i=0; i<numConsumers; i++) {
// last recovery sends message to deq so is not received again
assertEquals(dlqMessages*2, ((ConsumerThread)threads.get(i)).recoveries);
assertEquals(numMessages + dlqMessages, ((ConsumerThread)threads.get(i)).counter);
}
// half of the messages for each consumer should go to the dlq but duplicates will
// be suppressed
consumeFromDLQ(dlqMessages);
}
}
private void consumeFromDLQ( int messageCount) throws Exception {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URL);
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
int count = 0;
@ -158,19 +162,19 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
public void produce(int count) throws Exception {
Connection connection=null;
try {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(PRODUCER_BROKER_URL);
connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
producer.setTimeToLive(0);
connection.start();
for (int i=0 ; i< count; i++) {
int id = i+1;
TextMessage message = session.createTextMessage(getName()+" Message "+ id);
message.setIntProperty("MsgNumber", id);
producer.send(message);
if (id % 500 == 0) {
log.info("sent " + id + ", ith " + message);
}
@ -187,7 +191,7 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
}
}
}
public class ConsumerThread extends Thread implements MessageListener {
public long counter = 0;
public long recoveries = 0;
@ -199,24 +203,24 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
public void run() {
try {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URL);
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(testCase);
connection.setClientID(getName());
connection.setClientID(getName());
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(destination, getName());
MessageConsumer consumer = session.createDurableSubscriber(destination, getName());
consumer.setMessageListener(this);
connection.start();
started .countDown();
} catch (JMSException exception) {
log.error("unexpected ex in consumer run", exception);
exceptions.add(exception);
}
}
public void onMessage(Message message) {
try {
counter++;
@ -226,8 +230,8 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
recoveries++;
} else {
message.acknowledge();
}
}
if (counter % 200 == 0) {
log.info("recoveries:" + recoveries + ", Received " + counter + ", counter'th " + message);
}
@ -237,7 +241,7 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
exceptions.add(e);
}
}
}
public void onException(JMSException exception) {