add some resilience to slow mbean registration

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@832829 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-11-04 18:15:57 +00:00
parent a455eef8eb
commit 9086693c61
2 changed files with 49 additions and 25 deletions

View File

@ -57,7 +57,7 @@ public class DuplexNetworkMBeanTest extends TestCase {
public void testMbeanPresenceOnNetworkBrokerRestart() throws Exception {
BrokerService broker = createBroker();
broker.start();
assertEquals(1, countMbeans(broker, "Connector", 10000));
assertEquals(1, countMbeans(broker, "Connector", 30000));
assertEquals(0, countMbeans(broker, "Connection"));
BrokerService networkedBroker = null;
for (int i=0; i<numRestarts; i++) {
@ -82,7 +82,7 @@ public class DuplexNetworkMBeanTest extends TestCase {
BrokerService networkedBroker = createNetworkedBroker();
networkedBroker.start();
assertEquals(1, countMbeans(networkedBroker, "Connector", 10000));
assertEquals(1, countMbeans(networkedBroker, "Connector", 30000));
assertEquals(0, countMbeans(networkedBroker, "Connection"));
BrokerService broker = null;
@ -147,6 +147,7 @@ public class DuplexNetworkMBeanTest extends TestCase {
LOG.info(bean.getObjectName());
}
} catch (Exception ignored) {
LOG.warn("getMBeanServer ex: " + ignored);
}
return mbsc;
}

View File

@ -22,6 +22,7 @@ import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
@ -34,6 +35,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -77,29 +79,52 @@ public class NetworkBrokerDetachTest extends TestCase {
Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
for(int i=0; i<NUM_CONSUMERS; i++) {
MessageConsumer consumer = consSession.createConsumer(consSession.createQueue(QUEUE_NAME));
consSession.createConsumer(consSession.createQueue(QUEUE_NAME));
}
assertTrue("got expected consumer count from mbean within time limit", Wait.waitFor(new Wait.Condition() {
Thread.sleep(5000);
MBeanServerConnection mbsc = getMBeanServerConnection();
// We should have 1 consumer for the queue on the local broker
Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
assertEquals(1L, ((Long)consumers).longValue());
public boolean isSatisified() throws Exception {
boolean result = false;
MBeanServerConnection mbsc = getMBeanServerConnection();
if (mbsc != null) {
// We should have 1 consumer for the queue on the local broker
Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
if (consumers != null) {
LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
if (1L == ((Long)consumers).longValue()) {
result = true;
}
}
}
return result;
}
}));
LOG.info("Stopping Consumer on the networked broker ...");
// Closing the connection will also close the consumer
consConn.close();
Thread.sleep(5000);
// We should have 0 consumer for the queue on the local broker
consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
assertEquals(0L, ((Long)consumers).longValue());
assertTrue("got expected 0 count from mbean within time limit", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
boolean result = false;
MBeanServerConnection mbsc = getMBeanServerConnection();
if (mbsc != null) {
// We should have 1 consumer for the queue on the local broker
Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
if (consumers != null) {
LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
if (0L == ((Long)consumers).longValue()) {
result = true;
}
}
}
return result;
}
}));
networkedBroker.stop();
networkedBroker.waitUntilStopped();
@ -134,21 +159,19 @@ public class NetworkBrokerDetachTest extends TestCase {
try {
JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
mbsc = jmxc.getMBeanServerConnection();
// // trace all existing MBeans
// Set<?> all = mbsc.queryMBeans(null, null);
// LOG.info("Total MBean count=" + all.size());
// for (Object o : all) {
// ObjectInstance bean = (ObjectInstance)o;
// LOG.info(bean.getObjectName());
// }
} catch (Exception ignored) {
LOG.warn("getMBeanServer ex: " + ignored);
}
return mbsc;
}
private Object getAttribute(MBeanServerConnection mbsc, String type, String pattern, String attrName) throws Exception {
Object obj = mbsc.getAttribute(getObjectName(BROKER_NAME, type, pattern), attrName);
Object obj = null;
try {
obj = mbsc.getAttribute(getObjectName(BROKER_NAME, type, pattern), attrName);
} catch (InstanceNotFoundException ignored) {
LOG.warn("getAttribute ex: " + ignored);
}
return obj;
}