mirror of https://github.com/apache/activemq.git
AMQ-7102 - don't track objectNames that have not been registered due to suppressMBean filter, fix and test
This commit is contained in:
parent
50d1fe9f8e
commit
9cb680c0ba
|
@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import javax.management.MBeanException;
|
||||
import javax.management.NotCompliantMBeanException;
|
||||
import javax.management.ObjectInstance;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.ReflectionException;
|
||||
|
||||
|
@ -52,7 +53,7 @@ public class AsyncAnnotatedMBean extends AnnotatedMBean {
|
|||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public static void registerMBean(ExecutorService executor, long timeout, ManagementContext context, Object object, ObjectName objectName) throws Exception {
|
||||
public static ObjectInstance registerMBean(ExecutorService executor, long timeout, ManagementContext context, Object object, ObjectName objectName) throws Exception {
|
||||
|
||||
if (timeout < 0 && executor != null) {
|
||||
throw new IllegalArgumentException("async timeout cannot be negative.");
|
||||
|
@ -67,15 +68,14 @@ public class AsyncAnnotatedMBean extends AnnotatedMBean {
|
|||
for (Class c : object.getClass().getInterfaces()) {
|
||||
if (mbeanName.equals(c.getName())) {
|
||||
if (timeout == 0) {
|
||||
context.registerMBean(new AnnotatedMBean(object, c, objectName), objectName);
|
||||
return context.registerMBean(new AnnotatedMBean(object, c, objectName), objectName);
|
||||
} else {
|
||||
context.registerMBean(new AsyncAnnotatedMBean(executor, timeout, object, c, objectName), objectName);
|
||||
return context.registerMBean(new AsyncAnnotatedMBean(executor, timeout, object, c, objectName), objectName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
context.registerMBean(object, objectName);
|
||||
return context.registerMBean(object, objectName);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -324,8 +324,9 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
}
|
||||
}
|
||||
try {
|
||||
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
|
||||
registeredMBeans.add(key);
|
||||
if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) {
|
||||
registeredMBeans.add(key);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Failed to register MBean {}", key);
|
||||
LOG.debug("Failure reason: ", e);
|
||||
|
@ -380,8 +381,9 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
}
|
||||
|
||||
try {
|
||||
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
|
||||
registeredMBeans.add(key);
|
||||
if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) {
|
||||
registeredMBeans.add(key);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Failed to register MBean {}", key);
|
||||
LOG.debug("Failure reason: ", e);
|
||||
|
@ -444,8 +446,9 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
}
|
||||
|
||||
try {
|
||||
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
|
||||
registeredMBeans.add(key);
|
||||
if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) {
|
||||
registeredMBeans.add(key);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Failed to register MBean {}", key);
|
||||
LOG.debug("Failure reason: ", e);
|
||||
|
@ -520,8 +523,9 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription);
|
||||
|
||||
try {
|
||||
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
|
||||
registeredMBeans.add(objectName);
|
||||
if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName) != null) {
|
||||
registeredMBeans.add(objectName);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Failed to register MBean {}", key);
|
||||
LOG.debug("Failure reason: ", e);
|
||||
|
@ -770,8 +774,9 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
view = new AbortSlowConsumerStrategyView(this, strategy);
|
||||
}
|
||||
|
||||
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
|
||||
registeredMBeans.add(objectName);
|
||||
if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName) != null) {
|
||||
registeredMBeans.add(objectName);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to register MBean {}", strategy);
|
||||
|
@ -785,8 +790,9 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
|
||||
if (!registeredMBeans.contains(objectName)) {
|
||||
RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
|
||||
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
|
||||
registeredMBeans.add(objectName);
|
||||
if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName) != null) {
|
||||
registeredMBeans.add(objectName);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to register prepared transaction MBean {}", transaction);
|
||||
|
@ -837,4 +843,8 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
ObjectName objName = BrokerMBeanSupport.createDestinationName(brokerObjectName.toString(), "Queue", queueName);
|
||||
return queues.get(objName);
|
||||
}
|
||||
|
||||
public Set<ObjectName> getRegisteredMbeans() {
|
||||
return registeredMBeans;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,6 +74,11 @@ public class SelectiveMBeanRegistrationTest {
|
|||
|
||||
session.createConsumer(queue);
|
||||
|
||||
// create a plain topic
|
||||
Destination topic = session.createTopic("ATopic");
|
||||
session.createConsumer(topic);
|
||||
|
||||
|
||||
final ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) brokerService.getBroker().getAdaptor(ManagedRegionBroker.class);
|
||||
|
||||
// mbean exists
|
||||
|
@ -87,6 +92,10 @@ public class SelectiveMBeanRegistrationTest {
|
|||
// but it is not registered
|
||||
assertFalse(mbeanServer.isRegistered(managedRegionBroker.getQueueSubscribers()[0]));
|
||||
|
||||
// and is not tracked
|
||||
assertFalse("not tracked as registered", managedRegionBroker.getRegisteredMbeans().contains(managedRegionBroker.getQueueSubscribers()[0]));
|
||||
|
||||
|
||||
// verify dynamicProducer suppressed
|
||||
session.createProducer(null);
|
||||
|
||||
|
@ -105,9 +114,22 @@ public class SelectiveMBeanRegistrationTest {
|
|||
Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(query, null);
|
||||
assertEquals(0, mbeans.size());
|
||||
|
||||
assertFalse("producer not tracked as registered", managedRegionBroker.getRegisteredMbeans().contains(managedRegionBroker.getDynamicDestinationProducers()[0]));
|
||||
|
||||
|
||||
query = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationName=ActiveMQ.Advisory.*,*");
|
||||
mbeans = mbeanServer.queryMBeans(query, null);
|
||||
assertEquals(0, mbeans.size());
|
||||
|
||||
ObjectName[] topicNames = managedRegionBroker.getTopics();
|
||||
assertTrue("Some topics registered", topicNames.length > 0);
|
||||
for (ObjectName objectName : topicNames) {
|
||||
if (objectName.getKeyProperty("destinationName").contains("Advisory")) {
|
||||
assertFalse("advisory topic not tracked as registered: " + objectName, managedRegionBroker.getRegisteredMbeans().contains(objectName));
|
||||
} else {
|
||||
assertTrue("topic tracked as registered: " + objectName, managedRegionBroker.getRegisteredMbeans().contains(objectName));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue