https://issues.apache.org/jira/browse/AMQ-5656 - add support for selective mbean suppression based on objectName attribute match. Avoids mbeanserver contention on systems where producer/consumers/dests are dynamic

This commit is contained in:
gtully 2015-06-04 14:26:51 +01:00
parent 73d2810c60
commit 928e815a02
2 changed files with 209 additions and 2 deletions

View File

@ -22,6 +22,7 @@ import java.rmi.NoSuchObjectException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -93,6 +94,8 @@ public class ManagementContext implements Service {
private final Map<ObjectName, ObjectName> registeredMBeanNames = new ConcurrentHashMap<ObjectName, ObjectName>();
private boolean allowRemoteAddressInMBeanNames = true;
private String brokerName;
private String suppressMBean;
private List<Map.Entry<String,String>> suppressMBeanList;
public ManagementContext() {
this(null);
@ -107,6 +110,8 @@ public class ManagementContext implements Service {
// lets force the MBeanServer to be created if needed
if (started.compareAndSet(false, true)) {
populateMBeanSuppressionMap();
// fallback and use localhost
if (connectorHost == null) {
connectorHost = "localhost";
@ -163,6 +168,31 @@ public class ManagementContext implements Service {
}
}
private void populateMBeanSuppressionMap() {
if (suppressMBean != null) {
suppressMBeanList = new LinkedList<>();
for (String pair : suppressMBean.split(",")) {
final String[] keyValue = pair.split("=");
suppressMBeanList.add(new Map.Entry<String, String>() {
@Override
public String getKey() {
return keyValue[0];
}
@Override
public String getValue() {
return keyValue[1];
}
@Override
public String setValue(String value) {
return null;
}
});
}
}
}
@Override
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
@ -389,8 +419,24 @@ public class ManagementContext implements Service {
}
public ObjectInstance registerMBean(Object bean, ObjectName name) throws Exception{
ObjectInstance result = getMBeanServer().registerMBean(bean, name);
this.registeredMBeanNames.put(name, result.getObjectName());
ObjectInstance result = null;
if (isAllowedToRegister(name)) {
result = getMBeanServer().registerMBean(bean, name);
this.registeredMBeanNames.put(name, result.getObjectName());
}
return result;
}
private boolean isAllowedToRegister(ObjectName name) {
boolean result = true;
if (suppressMBean != null && suppressMBeanList != null) {
for (Map.Entry<String,String> attr : suppressMBeanList) {
if (attr.getValue().equals(name.getKeyProperty(attr.getKey()))) {
result = false;
break;
}
}
}
return result;
}
@ -618,4 +664,19 @@ public class ManagementContext implements Service {
public void setAllowRemoteAddressInMBeanNames(boolean allowRemoteAddressInMBeanNames) {
this.allowRemoteAddressInMBeanNames = allowRemoteAddressInMBeanNames;
}
/**
* Allow selective MBeans registration to be suppressed. Any Mbean ObjectName that matches any
* of the supplied attribute values will not be registered with the MBeanServer.
* eg: "endpoint=dynamicProducer,endpoint=Consumer" will suppress the registration of *all* dynamic producer and consumer mbeans.
*
* @param commaListOfAttributeKeyValuePairs the comma separated list of attribute key=value pairs to match.
*/
public void setSuppressMBean(String commaListOfAttributeKeyValuePairs) {
this.suppressMBean = commaListOfAttributeKeyValuePairs;
}
public String getSuppressMBean() {
return suppressMBean;
}
}

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class SelectiveMBeanRegistrationTest {
private static final Logger LOG = LoggerFactory.getLogger(SelectiveMBeanRegistrationTest.class);
BrokerService brokerService;
protected MBeanServer mbeanServer;
protected String domain = "org.apache.activemq";
protected ConnectionFactory connectionFactory;
protected Connection connection;
protected boolean transacted;
@Before
public void createBroker() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setUseJmx(true);
ManagementContext managementContext = new ManagementContext();
managementContext.setCreateConnector(false);
managementContext.setSuppressMBean("endpoint=dynamicProducer,endpoint=Consumer");
brokerService.setManagementContext(managementContext);
brokerService.start();
connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
mbeanServer = managementContext.getMBeanServer();
}
@Test
public void testSuppression() throws Exception {
connection = connectionFactory.createConnection("admin", "admin");
connection.setClientID("MBeanTest");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue("AQueue");
session.createConsumer(queue);
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
final BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
// mbean exists
assertTrue("one sub", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getQueueSubscribers().length == 1;
}
}));
// but it is not registered
assertFalse(mbeanServer.isRegistered(broker.getQueueSubscribers()[0]));
// verify dynamicProducer suppressed
session.createProducer(null);
// mbean exists
assertTrue("one sub", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getDynamicDestinationProducers().length == 1;
}
}));
// but it is not registered
ObjectName query = new ObjectName(domain + ":type=Broker,brokerName=localhost,endpoint=dynamicProducer,*");
Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(query, null);
assertEquals(0, mbeans.size());
}
@After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
connection = null;
}
if (brokerService != null) {
brokerService.stop();
}
}
protected ObjectName assertRegisteredObjectName(String name) throws Exception {
final ObjectName objectName = new ObjectName(name);
final AtomicBoolean result = new AtomicBoolean(false);
assertTrue("Bean registered: " + objectName, Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
try {
result.set(mbeanServer.isRegistered(objectName));
} catch (Exception ignored) {
LOG.debug(ignored.toString());
}
return result.get();
}
}));
return objectName;
}
}