fix test case after changes in https://issues.apache.org/jira/browse/AMQ-4237 broker the tests queue MBean lookup

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1427870 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-02 17:17:50 +00:00
parent 650d2b4d52
commit 2d84bbd992
1 changed files with 52 additions and 20 deletions

View File

@ -16,7 +16,23 @@
*/
package org.apache.activemq.usecases;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
@ -33,20 +49,6 @@ import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesWithNoConsumerTest.class);
@ -126,6 +128,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
final long sendCount = 2000;
final Thread producingThread = new Thread("Producing Thread") {
@Override
public void run() {
try {
int i = 0;
@ -151,6 +154,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
producingThread.start();
assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
producingThread.join(TimeUnit.SECONDS.toMillis(3000));
return !producingThread.isAlive();
@ -158,8 +162,14 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
}));
TimeUnit.SECONDS.sleep(5);
for (ObjectName name : broker.getAdminView().getQueues()) {
LOG.info("Broker Queue: {}", name);
}
final DestinationViewMBean view = createView(destination);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
try {
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
@ -202,6 +212,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
final long sendCount = 2000;
final Thread producingThread = new Thread("Producing Thread") {
@Override
public void run() {
try {
int i = 0;
@ -222,6 +233,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
producingThread.start();
assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
producingThread.join(TimeUnit.SECONDS.toMillis(3000));
return !producingThread.isAlive();
@ -230,6 +242,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
final DestinationViewMBean view = createView(destination);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
@ -264,6 +277,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
LOG.info("Got my message: " + message);
@ -281,6 +295,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
connection.start();
final Thread producingThread = new Thread("Producing Thread") {
@Override
public void run() {
try {
int i = 0;
@ -302,6 +317,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS));
assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
producingThread.join(1000);
return !producingThread.isAlive();
@ -311,11 +327,13 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
final DestinationViewMBean view = createView(destination);
assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return queuePrefetch == view.getDispatchCount();
}
}));
assertTrue("Not all sent have expired ", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return sendCount == view.getExpiredCount();
}
@ -329,6 +347,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
waitCondition.countDown();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == view.getInFlightCount();
}
@ -345,6 +364,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
consumer.close();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == view.getInFlightCount();
}
@ -372,6 +392,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if(LOG.isDebugEnabled()) {
@ -394,6 +415,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
connection.start();
final Thread producingThread = new Thread("Producing Thread") {
@Override
public void run() {
try {
int i = 0;
@ -415,6 +437,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS));
assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
producingThread.join(1000);
return !producingThread.isAlive();
@ -424,11 +447,13 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
final DestinationViewMBean view = createView(destination);
assertTrue("Not all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return queuePrefetch == view.getDispatchCount();
}
}));
assertTrue("All have not sent have expired ", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return sendCount == view.getExpiredCount();
}
@ -442,6 +467,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
waitCondition.countDown();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == view.getInFlightCount();
}
@ -467,7 +493,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
}
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
@Override
public boolean isSatisified() throws Exception {
return received.get() >= sendCount;
}
});
@ -475,6 +502,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
consumer.close();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == view.getInFlightCount();
}
@ -530,7 +558,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
LOG.info("Waiting for messages to arrive");
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
@Override
public boolean isSatisified() throws Exception {
return received.get() >= sendCount;
}
}, 1000);
@ -547,14 +576,17 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
String domain = "org.apache.activemq";
ObjectName name;
if (destination.isQueue()) {
name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test");
} else {
name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test");
}
return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class,
true);
LOG.info("Attempting to find Queue named: {}", name);
return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
}
@Override
protected void tearDown() throws Exception {
connection.stop();
broker.stop();