mirror of https://github.com/apache/activemq.git
fix test cases after changes in https://issues.apache.org/jira/browse/AMQ-4237 broker the tests queue MBean lookup
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1428029 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
982943550f
commit
7a95e1809b
|
@ -17,11 +17,8 @@
|
|||
package org.apache.activemq.network;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assume.assumeNotNull;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -31,38 +28,35 @@ import javax.jms.Message;
|
|||
import javax.jms.MessageListener;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TopicSubscriber;
|
||||
import javax.management.MBeanServerConnection;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.remote.JMXConnector;
|
||||
import javax.management.remote.JMXConnectorFactory;
|
||||
import javax.management.remote.JMXServiceURL;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQPrefetchPolicy;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.broker.jmx.BrokerView;
|
||||
import org.apache.activemq.broker.jmx.DestinationViewMBean;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class NetworkBrokerDetachTest {
|
||||
|
||||
private final static String BROKER_NAME = "broker";
|
||||
private final static String REM_BROKER_NAME = "networkedBroker";
|
||||
private final static String DESTINATION_NAME = "testQ";
|
||||
private final static int NUM_CONSUMERS = 1;
|
||||
|
||||
private final static String BROKER_NAME = "broker";
|
||||
private final static String REM_BROKER_NAME = "networkedBroker";
|
||||
private final static String DESTINATION_NAME = "testQ";
|
||||
private final static int NUM_CONSUMERS = 1;
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(NetworkBrokerDetachTest.class);
|
||||
protected final int numRestarts = 3;
|
||||
protected final int networkTTL = 2;
|
||||
protected final boolean dynamicOnly = false;
|
||||
|
||||
|
||||
protected BrokerService broker;
|
||||
protected BrokerService networkedBroker;
|
||||
|
||||
|
@ -86,42 +80,36 @@ public class NetworkBrokerDetachTest {
|
|||
configureNetworkConnector(networkConnector);
|
||||
return broker;
|
||||
}
|
||||
|
||||
|
||||
private void configureNetworkConnector(NetworkConnector networkConnector) {
|
||||
networkConnector.setDuplex(false);
|
||||
networkConnector.setNetworkTTL(networkTTL);
|
||||
networkConnector.setDynamicOnly(dynamicOnly);
|
||||
}
|
||||
|
||||
|
||||
// variants for each store....
|
||||
protected void configureBroker(BrokerService broker) throws Exception {
|
||||
//KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
|
||||
//persistenceAdapter.setDirectory(new File("target/activemq-data/kaha/" + broker.getBrokerName() + "/NetworBrokerDetatchTest"));
|
||||
//broker.setPersistenceAdapter(persistenceAdapter);
|
||||
|
||||
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
|
||||
persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/" + broker.getBrokerName() + "NetworBrokerDetatchTest"));
|
||||
broker.setPersistenceAdapter(persistenceAdapter);
|
||||
|
||||
// default AMQ
|
||||
}
|
||||
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
broker = createBroker();
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
broker.start();
|
||||
|
||||
|
||||
networkedBroker = createNetworkedBroker();
|
||||
networkedBroker.setDeleteAllMessagesOnStartup(true);
|
||||
networkedBroker.start();
|
||||
}
|
||||
|
||||
|
||||
@After
|
||||
public void cleanup() throws Exception {
|
||||
networkedBroker.stop();
|
||||
networkedBroker.waitUntilStopped();
|
||||
|
||||
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
}
|
||||
|
@ -129,7 +117,7 @@ public class NetworkBrokerDetachTest {
|
|||
@Test
|
||||
public void testNetworkedBrokerDetach() throws Exception {
|
||||
LOG.info("Creating Consumer on the networked broker ...");
|
||||
// Create a consumer on the networked broker
|
||||
// Create a consumer on the networked broker
|
||||
ConnectionFactory consFactory = createConnectionFactory(networkedBroker);
|
||||
Connection consConn = consFactory.createConnection();
|
||||
Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -137,64 +125,63 @@ public class NetworkBrokerDetachTest {
|
|||
for(int i=0; i<NUM_CONSUMERS; i++) {
|
||||
consSession.createConsumer(destination);
|
||||
}
|
||||
|
||||
assertTrue("got expected consumer count from mbean within time limit",
|
||||
verifyConsumerCount(1, destination, broker));
|
||||
|
||||
|
||||
|
||||
assertTrue("got expected consumer count from mbean within time limit",
|
||||
verifyConsumerCount(1, destination, broker));
|
||||
|
||||
LOG.info("Stopping Consumer on the networked broker ...");
|
||||
// Closing the connection will also close the consumer
|
||||
// Closing the connection will also close the consumer
|
||||
consConn.close();
|
||||
|
||||
|
||||
// We should have 0 consumer for the queue on the local broker
|
||||
assertTrue("got expected 0 count from mbean within time limit", verifyConsumerCount(0, destination, broker));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testNetworkedBrokerDurableSubAfterRestart() throws Exception {
|
||||
|
||||
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
MessageListener counter = new MessageListener() {
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
count.incrementAndGet();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
LOG.info("Creating durable consumer on each broker ...");
|
||||
ActiveMQTopic destination = registerDurableConsumer(networkedBroker, counter);
|
||||
registerDurableConsumer(broker, counter);
|
||||
|
||||
|
||||
assertTrue("got expected consumer count from local broker mbean within time limit",
|
||||
verifyConsumerCount(2, destination, broker));
|
||||
|
||||
|
||||
assertTrue("got expected consumer count from network broker mbean within time limit",
|
||||
verifyConsumerCount(2, destination, networkedBroker));
|
||||
|
||||
|
||||
sendMessageTo(destination, broker);
|
||||
|
||||
|
||||
assertTrue("Got one message on each", verifyMessageCount(2, count));
|
||||
|
||||
|
||||
LOG.info("Stopping brokerTwo...");
|
||||
networkedBroker.stop();
|
||||
networkedBroker.waitUntilStopped();
|
||||
|
||||
networkedBroker.waitUntilStopped();
|
||||
|
||||
LOG.info("restarting broker Two...");
|
||||
networkedBroker = createNetworkedBroker();
|
||||
networkedBroker.start();
|
||||
|
||||
|
||||
LOG.info("Recreating durable Consumer on the broker after restart...");
|
||||
registerDurableConsumer(networkedBroker, counter);
|
||||
|
||||
|
||||
// give advisories a chance to percolate
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
|
||||
|
||||
sendMessageTo(destination, broker);
|
||||
|
||||
|
||||
// expect similar after restart
|
||||
assertTrue("got expected consumer count from local broker mbean within time limit",
|
||||
verifyConsumerCount(2, destination, broker));
|
||||
|
||||
|
||||
// a durable sub is auto bridged on restart unless dynamicOnly=true
|
||||
assertTrue("got expected consumer count from network broker mbean within time limit",
|
||||
verifyConsumerCount(2, destination, networkedBroker));
|
||||
|
@ -209,9 +196,10 @@ public class NetworkBrokerDetachTest {
|
|||
|
||||
private boolean verifyMessageCount(final int i, final AtomicInteger count) throws Exception {
|
||||
return Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return i == count.get();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -237,10 +225,9 @@ public class NetworkBrokerDetachTest {
|
|||
session.createProducer(destination).send(session.createTextMessage("Hi"));
|
||||
conn.close();
|
||||
}
|
||||
|
||||
|
||||
protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
|
||||
|
||||
String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
|
||||
String url = broker.getTransportConnectors().get(0).getServer().getConnectURI().toString();
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
|
||||
connectionFactory.setOptimizedMessageDispatch(true);
|
||||
connectionFactory.setCopyMessageOnSend(false);
|
||||
|
@ -256,70 +243,61 @@ public class NetworkBrokerDetachTest {
|
|||
connectionFactory.setAlwaysSyncSend(true);
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
// JMX Helper Methods
|
||||
|
||||
// JMX Helper Methods
|
||||
private boolean verifyConsumerCount(final long expectedCount, final ActiveMQDestination destination, final BrokerService broker) throws Exception {
|
||||
return Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
boolean result = false;
|
||||
try {
|
||||
|
||||
ObjectName[] destinations;
|
||||
|
||||
if (destination.isQueue()) {
|
||||
destinations = broker.getAdminView().getQueues();
|
||||
} else {
|
||||
destinations = broker.getAdminView().getTopics();
|
||||
}
|
||||
|
||||
// We should have 1 consumer for the queue on the local broker
|
||||
Object consumers = broker.getManagementContext().getAttribute(getObjectName(broker.getBrokerName(), destination.isQueue() ? "Queue" : "Topic", "Destination=" + destination.getPhysicalName()), "ConsumerCount");
|
||||
if (consumers != null) {
|
||||
LOG.info("Consumers for " + destination.getPhysicalName() + " on " + broker + " : " + consumers);
|
||||
if (expectedCount == ((Long)consumers).longValue()) {
|
||||
result = true;
|
||||
for (ObjectName name : destinations) {
|
||||
DestinationViewMBean view = (DestinationViewMBean)
|
||||
broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
|
||||
|
||||
if (view.getName().equals(destination.getPhysicalName())) {
|
||||
LOG.info("Consumers for " + destination.getPhysicalName() + " on " + broker + " : " + view.getConsumerCount());
|
||||
if (expectedCount == view.getConsumerCount()) {
|
||||
result = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception ignoreAndRetry) {
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
private boolean verifyDurableConsumerCount(final long expectedCount, final BrokerService broker) throws Exception {
|
||||
return Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
boolean result = false;
|
||||
MBeanServerConnection mbsc = getMBeanServerConnection();
|
||||
if (mbsc != null) {
|
||||
Set subs = broker.getManagementContext().queryNames(getObjectName(broker.getBrokerName(), "Subscription", "active=false,*"), null);
|
||||
BrokerView view = broker.getAdminView();
|
||||
|
||||
if (view != null) {
|
||||
ObjectName[] subs = broker.getAdminView().getInactiveDurableTopicSubscribers();
|
||||
if (subs != null) {
|
||||
LOG.info("inactive durable subs on " + broker + " : " + subs);
|
||||
if (expectedCount == subs.size()) {
|
||||
if (expectedCount == subs.length) {
|
||||
result = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private MBeanServerConnection getMBeanServerConnection() throws MalformedURLException {
|
||||
final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
|
||||
MBeanServerConnection mbsc = null;
|
||||
try {
|
||||
JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
|
||||
mbsc = jmxc.getMBeanServerConnection();
|
||||
} catch (Exception ignored) {
|
||||
LOG.warn("getMBeanServer ex: " + ignored);
|
||||
}
|
||||
// If port 1099 is in use when the Broker starts, starting the jmx
|
||||
// connector will fail. So, if we have no mbsc to query, skip the
|
||||
// test.
|
||||
assumeNotNull(mbsc);
|
||||
return mbsc;
|
||||
}
|
||||
|
||||
|
||||
private ObjectName getObjectName(String brokerName, String type, String pattern) throws Exception {
|
||||
ObjectName beanName = new ObjectName(
|
||||
"org.apache.activemq:BrokerName=" + brokerName + ",Type=" + type +"," + pattern
|
||||
);
|
||||
|
||||
return beanName;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue