diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 01a8d7450a..075a5ef8e6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.api.core.management; import javax.management.MBeanOperationInfo; +import java.util.Map; /** * An ActiveMQServerControl is used to manage ActiveMQ Artemis servers. @@ -850,6 +851,17 @@ public interface ActiveMQServerControl { @Operation(desc = "Destroy a bridge", impact = MBeanOperationInfo.ACTION) void destroyBridge(@Parameter(name = "name", desc = "Name of the bridge") String name) throws Exception; + @Operation(desc = "Create a connector service", impact = MBeanOperationInfo.ACTION) + void createConnectorService(@Parameter(name = "name", desc = "Name of the connector service") String name, + @Parameter(name = "factoryClass", desc = "Class name of the connector service factory") String factoryClass, + @Parameter(name = "parameters", desc = "Parameter specific to the connector service") Map parameters) throws Exception; + + @Operation(desc = "Destroy a connector service", impact = MBeanOperationInfo.ACTION) + void destroyConnectorService(@Parameter(name = "name", desc = "Name of the connector service") String name) throws Exception; + + @Attribute(desc = "names of the connector services on this server") + String[] getConnectorServices(); + @Operation(desc = "force the server to stop and notify clients to failover", impact = MBeanOperationInfo.UNKNOWN) void forceFailover() throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 5918ec42a1..fb7deee5d7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -52,6 +52,7 @@ import org.apache.activemq.artemis.api.core.management.DivertControl; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl; @@ -68,6 +69,7 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.ConnectorServiceFactory; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.Queue; @@ -1850,6 +1852,47 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } } + @Override + public void createConnectorService(final String name, final String factoryClass, final Map parameters) throws Exception { + checkStarted(); + + clearIO(); + + try { + final ConnectorServiceConfiguration config = new ConnectorServiceConfiguration().setName(name).setFactoryClassName(factoryClass).setParams(parameters); + ConnectorServiceFactory factory = server.getServiceRegistry().getConnectorService(config); + server.getConnectorsService().createService(config, factory); + } finally { + blockOnIO(); + } + } + + @Override + public void destroyConnectorService(final String name) throws Exception { + checkStarted(); + + clearIO(); + + try { + server.getConnectorsService().destroyService(name); + } finally { + blockOnIO(); + } + } + + @Override + public String[] getConnectorServices() { + checkStarted(); + + clearIO(); + + try { + return server.getConnectorsService().getConnectors().keySet().toArray(new String[0]); + } finally { + blockOnIO(); + } + } + @Override public void forceFailover() throws Exception { checkStarted(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java index 4f2ef9d667..b0fa65804a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java @@ -54,6 +54,14 @@ public interface ServiceRegistry { */ Collection> getConnectorServices(List configs); + /** + * Get connector service for a given configuration. + * + * @param configuration The connector service configuration. + * @return an instance of the connector service factory. + */ + ConnectorServiceFactory getConnectorService(ConnectorServiceConfiguration configuration); + void addIncomingInterceptor(BaseInterceptor interceptor); /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ConnectorsService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ConnectorsService.java index 1397070768..897d27cdce 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ConnectorsService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ConnectorsService.java @@ -17,10 +17,13 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.Collection; -import java.util.HashSet; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration; @@ -52,7 +55,7 @@ public final class ConnectorsService implements ActiveMQComponent { private final Configuration configuration; - private final Set connectors = new HashSet<>(); + private final Map connectors = new HashMap<>(); private final ServiceRegistry serviceRegistry; @@ -69,51 +72,61 @@ public final class ConnectorsService implements ActiveMQComponent { } @Override - public void start() throws Exception { + public synchronized void start() throws Exception { Collection> connectorServiceFactories = serviceRegistry.getConnectorServices(configuration.getConnectorServiceConfigurations()); for (Pair pair : connectorServiceFactories) { - createService(pair.getB(), pair.getA()); - } - - for (ConnectorService connector : connectors) { try { - connector.start(); + createService(pair.getB(), pair.getA()); } catch (Throwable e) { - ActiveMQServerLogger.LOGGER.errorStartingConnectorService(e, connector.getName()); + ActiveMQServerLogger.LOGGER.errorStartingConnectorService(e, pair.getB().getConnectorName()); } } + isStarted = true; } - public void createService(ConnectorServiceConfiguration info, ConnectorServiceFactory factory) { + public synchronized void createService(ConnectorServiceConfiguration info, ConnectorServiceFactory factory) throws Exception { + if (connectors.containsKey(info.getConnectorName())) { + throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Connector service " + info.getConnectorName() + " already created"); + } + if (info.getParams() != null) { Set invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), info.getParams().keySet()); if (!invalid.isEmpty()) { - ActiveMQServerLogger.LOGGER.connectorKeysInvalid(ConfigurationHelper.stringSetToCommaListString(invalid)); - return; + throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Invalid connector keys for connector service " + info.getConnectorName() + ": " + ConfigurationHelper.stringSetToCommaListString(invalid)); } } Set invalid = ConfigurationHelper.checkKeysExist(factory.getRequiredProperties(), info.getParams().keySet()); if (!invalid.isEmpty()) { - ActiveMQServerLogger.LOGGER.connectorKeysMissing(ConfigurationHelper.stringSetToCommaListString(invalid)); - return; + throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Missing connector keys for connector service " + info.getConnectorName() + ": " + ConfigurationHelper.stringSetToCommaListString(invalid)); } ConnectorService connectorService = factory.createConnectorService(info.getConnectorName(), info.getParams(), storageManager, postOffice, scheduledPool); - connectors.add(connectorService); + connectorService.start(); + + connectors.put(info.getConnectorName(), connectorService); + } + + public synchronized void destroyService(String name) throws Exception { + if (!connectors.containsKey(name)) { + throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Connector service " + name + " does not exist"); + } + ConnectorService connectorService = connectors.get(name); + connectorService.stop(); + connectors.remove(name); } @Override - public void stop() throws Exception { + public synchronized void stop() throws Exception { if (!isStarted) { return; } - for (ConnectorService connector : connectors) { + for (Map.Entry connector : connectors.entrySet()) { try { - connector.stop(); + connector.getValue().stop(); } catch (Throwable e) { - ActiveMQServerLogger.LOGGER.errorStoppingConnectorService(e, connector.getName()); + ActiveMQServerLogger.LOGGER.errorStoppingConnectorService(e, connector.getKey()); } } connectors.clear(); @@ -121,11 +134,11 @@ public final class ConnectorsService implements ActiveMQComponent { } @Override - public boolean isStarted() { + public synchronized boolean isStarted() { return isStarted; } - public Set getConnectors() { - return connectors; + public synchronized Map getConnectors() { + return Collections.unmodifiableMap(connectors); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java index d2d66a4798..4add7b53b4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java @@ -103,12 +103,7 @@ public class ServiceRegistryImpl implements ServiceRegistry { if (configs != null) { for (final ConnectorServiceConfiguration config : configs) { if (connectorServices.get(config.getConnectorName()) == null) { - ConnectorServiceFactory factory = AccessController.doPrivileged(new PrivilegedAction() { - @Override - public ConnectorServiceFactory run() { - return (ConnectorServiceFactory) ClassloadingUtil.newInstanceFromClassLoader(config.getFactoryClassName()); - } - }); + ConnectorServiceFactory factory = loadClass(config.getFactoryClassName()); addConnectorService(factory, config); } } @@ -117,6 +112,11 @@ public class ServiceRegistryImpl implements ServiceRegistry { return connectorServices.values(); } + @Override + public ConnectorServiceFactory getConnectorService(ConnectorServiceConfiguration configuration) { + return loadClass(configuration.getFactoryClassName()); + } + @Override public void addIncomingInterceptor(BaseInterceptor interceptor) { incomingInterceptors.add(interceptor); @@ -184,13 +184,7 @@ public class ServiceRegistryImpl implements ServiceRegistry { AcceptorFactory factory = acceptorFactories.get(name); if (factory == null && className != null) { - factory = AccessController.doPrivileged(new PrivilegedAction() { - @Override - public AcceptorFactory run() { - return (AcceptorFactory) ClassloadingUtil.newInstanceFromClassLoader(className); - } - }); - + factory = loadClass(className); addAcceptorFactory(name, factory); } @@ -202,17 +196,21 @@ public class ServiceRegistryImpl implements ServiceRegistry { acceptorFactories.put(name, acceptorFactory); } + public T loadClass(final String className) { + return AccessController.doPrivileged(new PrivilegedAction() { + @Override + public T run() { + return (T) ClassloadingUtil.newInstanceFromClassLoader(className); + } + }); + } + private Transformer instantiateTransformer(final String className) { Transformer transformer = null; if (className != null) { try { - transformer = AccessController.doPrivileged(new PrivilegedAction() { - @Override - public Transformer run() { - return (Transformer) ClassloadingUtil.newInstanceFromClassLoader(className); - } - }); + transformer = loadClass(className); } catch (Exception e) { throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, className); } @@ -223,13 +221,7 @@ public class ServiceRegistryImpl implements ServiceRegistry { private void instantiateInterceptors(List classNames, List interceptors) { if (classNames != null) { for (final String className : classNames) { - BaseInterceptor interceptor = AccessController.doPrivileged(new PrivilegedAction() { - @Override - public BaseInterceptor run() { - return (BaseInterceptor) ClassloadingUtil.newInstanceFromClassLoader(className); - } - }); - + BaseInterceptor interceptor = loadClass(className); interceptors.add(interceptor); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index d040b8a3ab..7dd2d0b153 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -58,6 +58,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.unit.core.config.impl.fakes.FakeConnectorServiceFactory; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.junit.Assert; @@ -1327,6 +1328,21 @@ public class ActiveMQServerControlTest extends ManagementTestBase { Assert.assertEquals(1, second.getJsonNumber("consumerCount").longValue()); } + @Test + public void testConnectorServiceManagement() throws Exception { + ActiveMQServerControl managementControl = createManagementControl(); + managementControl.createConnectorService("myconn", FakeConnectorServiceFactory.class.getCanonicalName(), new HashMap()); + + Assert.assertEquals(1, server.getConnectorsService().getConnectors().size()); + + managementControl.createConnectorService("myconn2", FakeConnectorServiceFactory.class.getCanonicalName(), new HashMap()); + Assert.assertEquals(2, server.getConnectorsService().getConnectors().size()); + + managementControl.destroyConnectorService("myconn"); + Assert.assertEquals(1, server.getConnectorsService().getConnectors().size()); + Assert.assertEquals("myconn2", managementControl.getConnectorServices()[0]); + } + protected void scaleDown(ScaleDownHandler handler) throws Exception { SimpleString address = new SimpleString("testQueue"); HashMap params = new HashMap<>(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 777ddd2f1c..60187f083c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -20,6 +20,8 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.Parameter; import org.apache.activemq.artemis.api.core.management.ResourceNames; +import java.util.Map; + public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTest { // Constants ----------------------------------------------------- @@ -627,6 +629,21 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes } + @Override + public void createConnectorService(String name, String factoryClass, Map parameters) throws Exception { + proxy.invokeOperation("createConnectorService", name, factoryClass, parameters); + } + + @Override + public void destroyConnectorService(String name) throws Exception { + proxy.invokeOperation("destroyConnectorService", name); + } + + @Override + public String[] getConnectorServices() { + return ActiveMQServerControlUsingCoreTest.toStringArray((Object[]) proxy.retrieveAttributeValue("connectorServices")); + } + @Override public void forceFailover() throws Exception { proxy.invokeOperation("forceFailover"); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConnectorsServiceTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConnectorsServiceTest.java index aee7a71c6c..00f0663895 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConnectorsServiceTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConnectorsServiceTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.unit.core.config.impl; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -65,7 +66,7 @@ public class ConnectorsServiceTest extends ActiveMQTestBase { connectorsService.start(); assertTrue(connectorsService.getConnectors().size() == 1); - assertTrue(connectorsService.getConnectors().contains(connectorServiceFactory.getConnectorService())); + assertTrue(connectorsService.getConnectors().values().contains(connectorServiceFactory.getConnectorService())); } /** @@ -86,4 +87,49 @@ public class ConnectorsServiceTest extends ActiveMQTestBase { assertTrue(connectorsService.getConnectors().size() == 1); } + + /** + * Test that connectors can be created and destroyed directly. + * + * @throws Exception + */ + @Test + public void testConnectorServiceUsedDirectly() throws Exception { + // Initial setup with existing connector service + ConnectorServiceConfiguration connectorServiceConfiguration = new ConnectorServiceConfiguration().setFactoryClassName(FakeConnectorServiceFactory.class.getCanonicalName()).setParams(new HashMap()).setName("myfact"); + configuration.setConnectorServiceConfigurations(Arrays.asList(connectorServiceConfiguration)); + + ConnectorsService connectorsService = new ConnectorsService(configuration, null, null, null, serviceRegistry); + connectorsService.start(); + assertEquals(1, connectorsService.getConnectors().size()); + + + // Add with same name + FakeConnectorServiceFactory connectorServiceFactory = new FakeConnectorServiceFactory(); + try { + connectorsService.createService(connectorServiceConfiguration, connectorServiceFactory); + assertTrue("Expected exception when creating service with same name", false); + } catch (Exception e) { + } + + + // Add unique with same factory + ConnectorServiceConfiguration additionalServiceConfiguration = new ConnectorServiceConfiguration().setFactoryClassName(FakeConnectorServiceFactory.class.getCanonicalName()).setParams(new HashMap()).setName("myfact2"); + connectorsService.createService(additionalServiceConfiguration, connectorServiceFactory); + assertEquals(2, connectorsService.getConnectors().size()); + + // Destroy existing connector services + connectorsService.destroyService("myfact"); + assertEquals(1, connectorsService.getConnectors().size()); + + connectorsService.destroyService("myfact2"); + assertEquals(0, connectorsService.getConnectors().size()); + + // Destroy non-existing connector service + try { + connectorsService.destroyService("myfact"); + assertTrue("Expected exception when destroying non-existing service", false); + } catch (Exception e) { + } + } }