ARTEMIS-4443 support broker plugin config via broker properties
This commit is contained in:
parent
7c9a15e9b4
commit
cca027b1cc
|
@ -138,6 +138,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
|
|
||||||
public static final JournalType DEFAULT_JOURNAL_TYPE = JournalType.ASYNCIO;
|
public static final JournalType DEFAULT_JOURNAL_TYPE = JournalType.ASYNCIO;
|
||||||
|
|
||||||
|
public static final String DOT_CLASS = ".class";
|
||||||
|
|
||||||
private static final int DEFAULT_JMS_MESSAGE_SIZE = 1864;
|
private static final int DEFAULT_JMS_MESSAGE_SIZE = 1864;
|
||||||
|
|
||||||
private static final int RANGE_SIZE_MIN = 0;
|
private static final int RANGE_SIZE_MIN = 0;
|
||||||
|
@ -811,6 +813,21 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
}
|
}
|
||||||
}, java.util.Set.class);
|
}, java.util.Set.class);
|
||||||
|
|
||||||
|
beanUtils.getConvertUtils().register(new Converter() {
|
||||||
|
@Override
|
||||||
|
public <T> T convert(Class<T> type, Object value) {
|
||||||
|
Map convertedValue = new HashMap();
|
||||||
|
for (String entry : value.toString().split(",")) {
|
||||||
|
String[] kv = entry.split("=");
|
||||||
|
if (2 != kv.length) {
|
||||||
|
throw new IllegalArgumentException("map value " + value + " not in k=v format");
|
||||||
|
}
|
||||||
|
convertedValue.put(kv[0], kv[1]);
|
||||||
|
}
|
||||||
|
return (T) convertedValue;
|
||||||
|
}
|
||||||
|
}, java.util.Map.class);
|
||||||
|
|
||||||
// support 25K or 25m etc like xml config
|
// support 25K or 25m etc like xml config
|
||||||
beanUtils.getConvertUtils().register(new Converter() {
|
beanUtils.getConvertUtils().register(new Converter() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -2139,41 +2156,44 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void registerBrokerPlugin(final ActiveMQServerBasePlugin plugin) {
|
public void registerBrokerPlugin(final ActiveMQServerBasePlugin plugin) {
|
||||||
brokerPlugins.add(plugin);
|
// programmatic call can be a duplicate if used before server start
|
||||||
if (plugin instanceof ActiveMQServerConnectionPlugin) {
|
if (!brokerPlugins.contains(plugin)) {
|
||||||
|
brokerPlugins.add(plugin);
|
||||||
|
}
|
||||||
|
if (plugin instanceof ActiveMQServerConnectionPlugin && !brokerConnectionPlugins.contains(plugin)) {
|
||||||
brokerConnectionPlugins.add((ActiveMQServerConnectionPlugin) plugin);
|
brokerConnectionPlugins.add((ActiveMQServerConnectionPlugin) plugin);
|
||||||
}
|
}
|
||||||
if (plugin instanceof ActiveMQServerSessionPlugin) {
|
if (plugin instanceof ActiveMQServerSessionPlugin && !brokerSessionPlugins.contains(plugin)) {
|
||||||
brokerSessionPlugins.add((ActiveMQServerSessionPlugin) plugin);
|
brokerSessionPlugins.add((ActiveMQServerSessionPlugin) plugin);
|
||||||
}
|
}
|
||||||
if (plugin instanceof ActiveMQServerConsumerPlugin) {
|
if (plugin instanceof ActiveMQServerConsumerPlugin && !brokerConsumerPlugins.contains(plugin)) {
|
||||||
brokerConsumerPlugins.add((ActiveMQServerConsumerPlugin) plugin);
|
brokerConsumerPlugins.add((ActiveMQServerConsumerPlugin) plugin);
|
||||||
}
|
}
|
||||||
if (plugin instanceof ActiveMQServerAddressPlugin) {
|
if (plugin instanceof ActiveMQServerAddressPlugin && !brokerAddressPlugins.contains(plugin)) {
|
||||||
brokerAddressPlugins.add((ActiveMQServerAddressPlugin) plugin);
|
brokerAddressPlugins.add((ActiveMQServerAddressPlugin) plugin);
|
||||||
}
|
}
|
||||||
if (plugin instanceof ActiveMQServerQueuePlugin) {
|
if (plugin instanceof ActiveMQServerQueuePlugin && !brokerQueuePlugins.contains(plugin)) {
|
||||||
brokerQueuePlugins.add((ActiveMQServerQueuePlugin) plugin);
|
brokerQueuePlugins.add((ActiveMQServerQueuePlugin) plugin);
|
||||||
}
|
}
|
||||||
if (plugin instanceof ActiveMQServerBindingPlugin) {
|
if (plugin instanceof ActiveMQServerBindingPlugin && !brokerBindingPlugins.contains(plugin)) {
|
||||||
brokerBindingPlugins.add((ActiveMQServerBindingPlugin) plugin);
|
brokerBindingPlugins.add((ActiveMQServerBindingPlugin) plugin);
|
||||||
}
|
}
|
||||||
if (plugin instanceof ActiveMQServerMessagePlugin) {
|
if (plugin instanceof ActiveMQServerMessagePlugin && !brokerMessagePlugins.contains(plugin)) {
|
||||||
brokerMessagePlugins.add((ActiveMQServerMessagePlugin) plugin);
|
brokerMessagePlugins.add((ActiveMQServerMessagePlugin) plugin);
|
||||||
}
|
}
|
||||||
if (plugin instanceof ActiveMQServerBridgePlugin) {
|
if (plugin instanceof ActiveMQServerBridgePlugin && !brokerBridgePlugins.contains(plugin)) {
|
||||||
brokerBridgePlugins.add((ActiveMQServerBridgePlugin) plugin);
|
brokerBridgePlugins.add((ActiveMQServerBridgePlugin) plugin);
|
||||||
}
|
}
|
||||||
if (plugin instanceof ActiveMQServerCriticalPlugin) {
|
if (plugin instanceof ActiveMQServerCriticalPlugin && !brokerCriticalPlugins.contains(plugin)) {
|
||||||
brokerCriticalPlugins.add((ActiveMQServerCriticalPlugin) plugin);
|
brokerCriticalPlugins.add((ActiveMQServerCriticalPlugin) plugin);
|
||||||
}
|
}
|
||||||
if (plugin instanceof ActiveMQServerFederationPlugin) {
|
if (plugin instanceof ActiveMQServerFederationPlugin && !brokerFederationPlugins.contains(plugin)) {
|
||||||
brokerFederationPlugins.add((ActiveMQServerFederationPlugin) plugin);
|
brokerFederationPlugins.add((ActiveMQServerFederationPlugin) plugin);
|
||||||
}
|
}
|
||||||
if (plugin instanceof AMQPFederationBrokerPlugin) {
|
if (plugin instanceof AMQPFederationBrokerPlugin && !brokerAMQPFederationPlugins.contains(plugin)) {
|
||||||
brokerAMQPFederationPlugins.add((AMQPFederationBrokerPlugin) plugin);
|
brokerAMQPFederationPlugins.add((AMQPFederationBrokerPlugin) plugin);
|
||||||
}
|
}
|
||||||
if (plugin instanceof ActiveMQServerResourcePlugin) {
|
if (plugin instanceof ActiveMQServerResourcePlugin && !brokerResourcePlugins.contains(plugin)) {
|
||||||
brokerResourcePlugins.add((ActiveMQServerResourcePlugin) plugin);
|
brokerResourcePlugins.add((ActiveMQServerResourcePlugin) plugin);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3341,24 +3361,28 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// we don't know the type, infer from add method add(X x) or add(String key, X x)
|
Object instance = null;
|
||||||
final String addPropertyName = addPropertyNameBuilder.toString();
|
|
||||||
final Method[] methods = hostingBean.getClass().getMethods();
|
|
||||||
final Method candidate = Arrays.stream(methods).filter(method -> method.getName().equals(addPropertyName) &&
|
|
||||||
((method.getParameterCount() == 1) || (method.getParameterCount() == 2
|
|
||||||
// has a String key
|
|
||||||
&& String.class.equals(method.getParameterTypes()[0])
|
|
||||||
// but not initialised from a String form (eg: uri)
|
|
||||||
&& !String.class.equals(method.getParameterTypes()[1]))))
|
|
||||||
.sorted((method1, method2) -> method2.getParameterCount() - method1.getParameterCount()).findFirst().orElse(null);
|
|
||||||
|
|
||||||
if (candidate == null) {
|
|
||||||
throw new IllegalArgumentException("failed to locate add method for collection property " + addPropertyName);
|
|
||||||
}
|
|
||||||
|
|
||||||
// create one and initialise with name
|
|
||||||
try {
|
try {
|
||||||
Object instance = candidate.getParameterTypes()[candidate.getParameterCount() - 1].getDeclaredConstructor().newInstance();
|
if (name.indexOf(DOT_CLASS) > 0) {
|
||||||
|
final String clazzName = name.substring(0, name.length() - DOT_CLASS.length());
|
||||||
|
instance = this.getClass().getClassLoader().loadClass(clazzName).getDeclaredConstructor().newInstance();
|
||||||
|
} else {
|
||||||
|
// we don't know the type, infer from add method add(X x) or add(String key, X x)
|
||||||
|
final String addPropertyName = addPropertyNameBuilder.toString();
|
||||||
|
final Method[] methods = hostingBean.getClass().getMethods();
|
||||||
|
final Method candidate = Arrays.stream(methods).filter(method -> method.getName().equals(addPropertyName) && ((method.getParameterCount() == 1) || (method.getParameterCount() == 2
|
||||||
|
// has a String key
|
||||||
|
&& String.class.equals(method.getParameterTypes()[0])
|
||||||
|
// but not initialised from a String form (eg: uri)
|
||||||
|
&& !String.class.equals(method.getParameterTypes()[1])))).sorted((method1, method2) -> method2.getParameterCount() - method1.getParameterCount()).findFirst().orElse(null);
|
||||||
|
|
||||||
|
if (candidate == null) {
|
||||||
|
throw new IllegalArgumentException("failed to locate add method for collection property " + addPropertyName);
|
||||||
|
}
|
||||||
|
|
||||||
|
instance = candidate.getParameterTypes()[candidate.getParameterCount() - 1].getDeclaredConstructor().newInstance();
|
||||||
|
}
|
||||||
|
// initialise with name
|
||||||
|
|
||||||
try {
|
try {
|
||||||
beanUtilsBean.setProperty(instance, "name", name);
|
beanUtilsBean.setProperty(instance, "name", name);
|
||||||
|
@ -3374,7 +3398,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Failed to add entry for {} with method: {}", name, candidate, e);
|
logger.debug("Failed to add entry for {} to collection: {}", name, hostingBean, e);
|
||||||
}
|
}
|
||||||
throw new IllegalArgumentException("failed to add entry for collection key " + name, e);
|
throw new IllegalArgumentException("failed to add entry for collection key " + name, e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3364,7 +3364,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasBrokerPlugins()) {
|
if (hasBrokerPlugins()) {
|
||||||
callBrokerPlugins(plugin -> plugin.registered(this));
|
registerBrokerPlugins(getBrokerPlugins());
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -19,12 +19,15 @@ package org.apache.activemq.artemis.core.server.plugin;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public interface ActiveMQServerBasePlugin {
|
public interface ActiveMQServerBasePlugin {
|
||||||
|
|
||||||
|
default void setInit(Map<String, String> props) {
|
||||||
|
init(props);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* used to pass configured properties to Plugin
|
* used to pass configured properties to Plugin
|
||||||
*
|
*
|
||||||
|
|
|
@ -543,6 +543,8 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
conf.registerBrokerPlugin(new LoggingActiveMQServerPlugin());
|
conf.registerBrokerPlugin(new LoggingActiveMQServerPlugin());
|
||||||
Assert.assertEquals("ensure one plugin registered", 1, conf.getBrokerPlugins().size());
|
Assert.assertEquals("ensure one plugin registered", 1, conf.getBrokerPlugins().size());
|
||||||
|
Assert.assertEquals("ensure one connection plugin registered", 1, conf.getBrokerConnectionPlugins().size());
|
||||||
|
|
||||||
|
|
||||||
// This will use serialization to perform a deep copy of the object
|
// This will use serialization to perform a deep copy of the object
|
||||||
Configuration conf2 = conf.copy();
|
Configuration conf2 = conf.copy();
|
||||||
|
@ -2110,6 +2112,42 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
|
||||||
assertTrue(jsonStatus.contains("alder32"));
|
assertTrue(jsonStatus.contains("alder32"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPlugin() throws Exception {
|
||||||
|
|
||||||
|
final ConfigurationImpl configuration = new ConfigurationImpl();
|
||||||
|
|
||||||
|
Properties insertionOrderedProperties = new ConfigurationImpl.InsertionOrderedProperties();
|
||||||
|
|
||||||
|
insertionOrderedProperties.put("brokerPlugins.\"org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin.class\".init", "LOG_ALL_EVENTS=true,LOG_SESSION_EVENTS=false");
|
||||||
|
|
||||||
|
configuration.parsePrefixedProperties(insertionOrderedProperties, null);
|
||||||
|
|
||||||
|
Assert.assertEquals(1, configuration.getBrokerPlugins().size());
|
||||||
|
Assert.assertTrue(((LoggingActiveMQServerPlugin)(configuration.getBrokerPlugins().get(0))).isLogAll());
|
||||||
|
Assert.assertFalse(((LoggingActiveMQServerPlugin)(configuration.getBrokerPlugins().get(0))).isLogSessionEvents());
|
||||||
|
|
||||||
|
// mimic server initialisePart1
|
||||||
|
configuration.registerBrokerPlugins(configuration.getBrokerPlugins());
|
||||||
|
|
||||||
|
Assert.assertEquals(1, configuration.getBrokerPlugins().size());
|
||||||
|
Assert.assertEquals(1, configuration.getBrokerMessagePlugins().size());
|
||||||
|
Assert.assertEquals(1, configuration.getBrokerConnectionPlugins().size());
|
||||||
|
|
||||||
|
Assert.assertTrue(configuration.getStatus().contains("\"errors\":[]"));
|
||||||
|
|
||||||
|
// verify invalid map errors out
|
||||||
|
insertionOrderedProperties = new ConfigurationImpl.InsertionOrderedProperties();
|
||||||
|
|
||||||
|
// possible to change any attribute, but plugins only registered on start
|
||||||
|
insertionOrderedProperties.put("brokerPlugins.\"org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin.class\".init", "LOG_ALL_EVENTS");
|
||||||
|
|
||||||
|
configuration.parsePrefixedProperties(insertionOrderedProperties, null);
|
||||||
|
|
||||||
|
Assert.assertFalse(configuration.getStatus().contains("\"errors\":[]"));
|
||||||
|
Assert.assertTrue(configuration.getStatus().contains("LOG_ALL_EVENTS"));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* To test ARTEMIS-926
|
* To test ARTEMIS-926
|
||||||
* @throws Throwable
|
* @throws Throwable
|
||||||
|
|
Loading…
Reference in New Issue