ARTEMIS-824: Add management routines for connector services

This commit is contained in:
Ulf Lilleengen 2016-10-25 15:40:37 +02:00
parent 2dfd14421f
commit 1b7033a20e
8 changed files with 196 additions and 49 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.api.core.management; package org.apache.activemq.artemis.api.core.management;
import javax.management.MBeanOperationInfo; import javax.management.MBeanOperationInfo;
import java.util.Map;
/** /**
* An ActiveMQServerControl is used to manage ActiveMQ Artemis servers. * An ActiveMQServerControl is used to manage ActiveMQ Artemis servers.
@ -850,6 +851,17 @@ public interface ActiveMQServerControl {
@Operation(desc = "Destroy a bridge", impact = MBeanOperationInfo.ACTION) @Operation(desc = "Destroy a bridge", impact = MBeanOperationInfo.ACTION)
void destroyBridge(@Parameter(name = "name", desc = "Name of the bridge") String name) throws Exception; void destroyBridge(@Parameter(name = "name", desc = "Name of the bridge") String name) throws Exception;
@Operation(desc = "Create a connector service", impact = MBeanOperationInfo.ACTION)
void createConnectorService(@Parameter(name = "name", desc = "Name of the connector service") String name,
@Parameter(name = "factoryClass", desc = "Class name of the connector service factory") String factoryClass,
@Parameter(name = "parameters", desc = "Parameter specific to the connector service") Map<String, Object> parameters) throws Exception;
@Operation(desc = "Destroy a connector service", impact = MBeanOperationInfo.ACTION)
void destroyConnectorService(@Parameter(name = "name", desc = "Name of the connector service") String name) throws Exception;
@Attribute(desc = "names of the connector services on this server")
String[] getConnectorServices();
@Operation(desc = "force the server to stop and notify clients to failover", impact = MBeanOperationInfo.UNKNOWN) @Operation(desc = "force the server to stop and notify clients to failover", impact = MBeanOperationInfo.UNKNOWN)
void forceFailover() throws Exception; void forceFailover() throws Exception;

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager; import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
@ -68,6 +69,7 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
@ -1850,6 +1852,47 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
} }
} }
@Override
public void createConnectorService(final String name, final String factoryClass, final Map<String, Object> parameters) throws Exception {
checkStarted();
clearIO();
try {
final ConnectorServiceConfiguration config = new ConnectorServiceConfiguration().setName(name).setFactoryClassName(factoryClass).setParams(parameters);
ConnectorServiceFactory factory = server.getServiceRegistry().getConnectorService(config);
server.getConnectorsService().createService(config, factory);
} finally {
blockOnIO();
}
}
@Override
public void destroyConnectorService(final String name) throws Exception {
checkStarted();
clearIO();
try {
server.getConnectorsService().destroyService(name);
} finally {
blockOnIO();
}
}
@Override
public String[] getConnectorServices() {
checkStarted();
clearIO();
try {
return server.getConnectorsService().getConnectors().keySet().toArray(new String[0]);
} finally {
blockOnIO();
}
}
@Override @Override
public void forceFailover() throws Exception { public void forceFailover() throws Exception {
checkStarted(); checkStarted();

View File

@ -54,6 +54,14 @@ public interface ServiceRegistry {
*/ */
Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices(List<ConnectorServiceConfiguration> configs); Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices(List<ConnectorServiceConfiguration> configs);
/**
* Get connector service for a given configuration.
*
* @param configuration The connector service configuration.
* @return an instance of the connector service factory.
*/
ConnectorServiceFactory getConnectorService(ConnectorServiceConfiguration configuration);
void addIncomingInterceptor(BaseInterceptor interceptor); void addIncomingInterceptor(BaseInterceptor interceptor);
/** /**

View File

@ -17,10 +17,13 @@
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration; import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
@ -52,7 +55,7 @@ public final class ConnectorsService implements ActiveMQComponent {
private final Configuration configuration; private final Configuration configuration;
private final Set<ConnectorService> connectors = new HashSet<>(); private final Map<String, ConnectorService> connectors = new HashMap<>();
private final ServiceRegistry serviceRegistry; private final ServiceRegistry serviceRegistry;
@ -69,51 +72,61 @@ public final class ConnectorsService implements ActiveMQComponent {
} }
@Override @Override
public void start() throws Exception { public synchronized void start() throws Exception {
Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServiceFactories = serviceRegistry.getConnectorServices(configuration.getConnectorServiceConfigurations()); Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServiceFactories = serviceRegistry.getConnectorServices(configuration.getConnectorServiceConfigurations());
for (Pair<ConnectorServiceFactory, ConnectorServiceConfiguration> pair : connectorServiceFactories) { for (Pair<ConnectorServiceFactory, ConnectorServiceConfiguration> pair : connectorServiceFactories) {
createService(pair.getB(), pair.getA());
}
for (ConnectorService connector : connectors) {
try { try {
connector.start(); createService(pair.getB(), pair.getA());
} catch (Throwable e) { } catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorStartingConnectorService(e, connector.getName()); ActiveMQServerLogger.LOGGER.errorStartingConnectorService(e, pair.getB().getConnectorName());
} }
} }
isStarted = true; isStarted = true;
} }
public void createService(ConnectorServiceConfiguration info, ConnectorServiceFactory factory) { public synchronized void createService(ConnectorServiceConfiguration info, ConnectorServiceFactory factory) throws Exception {
if (connectors.containsKey(info.getConnectorName())) {
throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Connector service " + info.getConnectorName() + " already created");
}
if (info.getParams() != null) { if (info.getParams() != null) {
Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), info.getParams().keySet()); Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), info.getParams().keySet());
if (!invalid.isEmpty()) { if (!invalid.isEmpty()) {
ActiveMQServerLogger.LOGGER.connectorKeysInvalid(ConfigurationHelper.stringSetToCommaListString(invalid)); throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Invalid connector keys for connector service " + info.getConnectorName() + ": " + ConfigurationHelper.stringSetToCommaListString(invalid));
return;
} }
} }
Set<String> invalid = ConfigurationHelper.checkKeysExist(factory.getRequiredProperties(), info.getParams().keySet()); Set<String> invalid = ConfigurationHelper.checkKeysExist(factory.getRequiredProperties(), info.getParams().keySet());
if (!invalid.isEmpty()) { if (!invalid.isEmpty()) {
ActiveMQServerLogger.LOGGER.connectorKeysMissing(ConfigurationHelper.stringSetToCommaListString(invalid)); throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Missing connector keys for connector service " + info.getConnectorName() + ": " + ConfigurationHelper.stringSetToCommaListString(invalid));
return;
} }
ConnectorService connectorService = factory.createConnectorService(info.getConnectorName(), info.getParams(), storageManager, postOffice, scheduledPool); ConnectorService connectorService = factory.createConnectorService(info.getConnectorName(), info.getParams(), storageManager, postOffice, scheduledPool);
connectors.add(connectorService); connectorService.start();
connectors.put(info.getConnectorName(), connectorService);
}
public synchronized void destroyService(String name) throws Exception {
if (!connectors.containsKey(name)) {
throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Connector service " + name + " does not exist");
}
ConnectorService connectorService = connectors.get(name);
connectorService.stop();
connectors.remove(name);
} }
@Override @Override
public void stop() throws Exception { public synchronized void stop() throws Exception {
if (!isStarted) { if (!isStarted) {
return; return;
} }
for (ConnectorService connector : connectors) { for (Map.Entry<String, ConnectorService> connector : connectors.entrySet()) {
try { try {
connector.stop(); connector.getValue().stop();
} catch (Throwable e) { } catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorStoppingConnectorService(e, connector.getName()); ActiveMQServerLogger.LOGGER.errorStoppingConnectorService(e, connector.getKey());
} }
} }
connectors.clear(); connectors.clear();
@ -121,11 +134,11 @@ public final class ConnectorsService implements ActiveMQComponent {
} }
@Override @Override
public boolean isStarted() { public synchronized boolean isStarted() {
return isStarted; return isStarted;
} }
public Set<ConnectorService> getConnectors() { public synchronized Map<String, ConnectorService> getConnectors() {
return connectors; return Collections.unmodifiableMap(connectors);
} }
} }

View File

@ -103,12 +103,7 @@ public class ServiceRegistryImpl implements ServiceRegistry {
if (configs != null) { if (configs != null) {
for (final ConnectorServiceConfiguration config : configs) { for (final ConnectorServiceConfiguration config : configs) {
if (connectorServices.get(config.getConnectorName()) == null) { if (connectorServices.get(config.getConnectorName()) == null) {
ConnectorServiceFactory factory = AccessController.doPrivileged(new PrivilegedAction<ConnectorServiceFactory>() { ConnectorServiceFactory factory = loadClass(config.getFactoryClassName());
@Override
public ConnectorServiceFactory run() {
return (ConnectorServiceFactory) ClassloadingUtil.newInstanceFromClassLoader(config.getFactoryClassName());
}
});
addConnectorService(factory, config); addConnectorService(factory, config);
} }
} }
@ -117,6 +112,11 @@ public class ServiceRegistryImpl implements ServiceRegistry {
return connectorServices.values(); return connectorServices.values();
} }
@Override
public ConnectorServiceFactory getConnectorService(ConnectorServiceConfiguration configuration) {
return loadClass(configuration.getFactoryClassName());
}
@Override @Override
public void addIncomingInterceptor(BaseInterceptor interceptor) { public void addIncomingInterceptor(BaseInterceptor interceptor) {
incomingInterceptors.add(interceptor); incomingInterceptors.add(interceptor);
@ -184,13 +184,7 @@ public class ServiceRegistryImpl implements ServiceRegistry {
AcceptorFactory factory = acceptorFactories.get(name); AcceptorFactory factory = acceptorFactories.get(name);
if (factory == null && className != null) { if (factory == null && className != null) {
factory = AccessController.doPrivileged(new PrivilegedAction<AcceptorFactory>() { factory = loadClass(className);
@Override
public AcceptorFactory run() {
return (AcceptorFactory) ClassloadingUtil.newInstanceFromClassLoader(className);
}
});
addAcceptorFactory(name, factory); addAcceptorFactory(name, factory);
} }
@ -202,17 +196,21 @@ public class ServiceRegistryImpl implements ServiceRegistry {
acceptorFactories.put(name, acceptorFactory); acceptorFactories.put(name, acceptorFactory);
} }
public <T> T loadClass(final String className) {
return AccessController.doPrivileged(new PrivilegedAction<T>() {
@Override
public T run() {
return (T) ClassloadingUtil.newInstanceFromClassLoader(className);
}
});
}
private Transformer instantiateTransformer(final String className) { private Transformer instantiateTransformer(final String className) {
Transformer transformer = null; Transformer transformer = null;
if (className != null) { if (className != null) {
try { try {
transformer = AccessController.doPrivileged(new PrivilegedAction<Transformer>() { transformer = loadClass(className);
@Override
public Transformer run() {
return (Transformer) ClassloadingUtil.newInstanceFromClassLoader(className);
}
});
} catch (Exception e) { } catch (Exception e) {
throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, className); throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, className);
} }
@ -223,13 +221,7 @@ public class ServiceRegistryImpl implements ServiceRegistry {
private void instantiateInterceptors(List<String> classNames, List<BaseInterceptor> interceptors) { private void instantiateInterceptors(List<String> classNames, List<BaseInterceptor> interceptors) {
if (classNames != null) { if (classNames != null) {
for (final String className : classNames) { for (final String className : classNames) {
BaseInterceptor interceptor = AccessController.doPrivileged(new PrivilegedAction<BaseInterceptor>() { BaseInterceptor interceptor = loadClass(className);
@Override
public BaseInterceptor run() {
return (BaseInterceptor) ClassloadingUtil.newInstanceFromClassLoader(className);
}
});
interceptors.add(interceptor); interceptors.add(interceptor);
} }
} }

View File

@ -58,6 +58,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.unit.core.config.impl.fakes.FakeConnectorServiceFactory;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert; import org.junit.Assert;
@ -1327,6 +1328,21 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
Assert.assertEquals(1, second.getJsonNumber("consumerCount").longValue()); Assert.assertEquals(1, second.getJsonNumber("consumerCount").longValue());
} }
@Test
public void testConnectorServiceManagement() throws Exception {
ActiveMQServerControl managementControl = createManagementControl();
managementControl.createConnectorService("myconn", FakeConnectorServiceFactory.class.getCanonicalName(), new HashMap<String, Object>());
Assert.assertEquals(1, server.getConnectorsService().getConnectors().size());
managementControl.createConnectorService("myconn2", FakeConnectorServiceFactory.class.getCanonicalName(), new HashMap<String, Object>());
Assert.assertEquals(2, server.getConnectorsService().getConnectors().size());
managementControl.destroyConnectorService("myconn");
Assert.assertEquals(1, server.getConnectorsService().getConnectors().size());
Assert.assertEquals("myconn2", managementControl.getConnectorServices()[0]);
}
protected void scaleDown(ScaleDownHandler handler) throws Exception { protected void scaleDown(ScaleDownHandler handler) throws Exception {
SimpleString address = new SimpleString("testQueue"); SimpleString address = new SimpleString("testQueue");
HashMap<String, Object> params = new HashMap<>(); HashMap<String, Object> params = new HashMap<>();

View File

@ -20,6 +20,8 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.Parameter; import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
import java.util.Map;
public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTest { public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTest {
// Constants ----------------------------------------------------- // Constants -----------------------------------------------------
@ -627,6 +629,21 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
} }
@Override
public void createConnectorService(String name, String factoryClass, Map<String, Object> parameters) throws Exception {
proxy.invokeOperation("createConnectorService", name, factoryClass, parameters);
}
@Override
public void destroyConnectorService(String name) throws Exception {
proxy.invokeOperation("destroyConnectorService", name);
}
@Override
public String[] getConnectorServices() {
return ActiveMQServerControlUsingCoreTest.toStringArray((Object[]) proxy.retrieveAttributeValue("connectorServices"));
}
@Override @Override
public void forceFailover() throws Exception { public void forceFailover() throws Exception {
proxy.invokeOperation("forceFailover"); proxy.invokeOperation("forceFailover");

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.unit.core.config.impl; package org.apache.activemq.artemis.tests.unit.core.config.impl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -65,7 +66,7 @@ public class ConnectorsServiceTest extends ActiveMQTestBase {
connectorsService.start(); connectorsService.start();
assertTrue(connectorsService.getConnectors().size() == 1); assertTrue(connectorsService.getConnectors().size() == 1);
assertTrue(connectorsService.getConnectors().contains(connectorServiceFactory.getConnectorService())); assertTrue(connectorsService.getConnectors().values().contains(connectorServiceFactory.getConnectorService()));
} }
/** /**
@ -86,4 +87,49 @@ public class ConnectorsServiceTest extends ActiveMQTestBase {
assertTrue(connectorsService.getConnectors().size() == 1); assertTrue(connectorsService.getConnectors().size() == 1);
} }
/**
* Test that connectors can be created and destroyed directly.
*
* @throws Exception
*/
@Test
public void testConnectorServiceUsedDirectly() throws Exception {
// Initial setup with existing connector service
ConnectorServiceConfiguration connectorServiceConfiguration = new ConnectorServiceConfiguration().setFactoryClassName(FakeConnectorServiceFactory.class.getCanonicalName()).setParams(new HashMap<String, Object>()).setName("myfact");
configuration.setConnectorServiceConfigurations(Arrays.asList(connectorServiceConfiguration));
ConnectorsService connectorsService = new ConnectorsService(configuration, null, null, null, serviceRegistry);
connectorsService.start();
assertEquals(1, connectorsService.getConnectors().size());
// Add with same name
FakeConnectorServiceFactory connectorServiceFactory = new FakeConnectorServiceFactory();
try {
connectorsService.createService(connectorServiceConfiguration, connectorServiceFactory);
assertTrue("Expected exception when creating service with same name", false);
} catch (Exception e) {
}
// Add unique with same factory
ConnectorServiceConfiguration additionalServiceConfiguration = new ConnectorServiceConfiguration().setFactoryClassName(FakeConnectorServiceFactory.class.getCanonicalName()).setParams(new HashMap<String, Object>()).setName("myfact2");
connectorsService.createService(additionalServiceConfiguration, connectorServiceFactory);
assertEquals(2, connectorsService.getConnectors().size());
// Destroy existing connector services
connectorsService.destroyService("myfact");
assertEquals(1, connectorsService.getConnectors().size());
connectorsService.destroyService("myfact2");
assertEquals(0, connectorsService.getConnectors().size());
// Destroy non-existing connector service
try {
connectorsService.destroyService("myfact");
assertTrue("Expected exception when destroying non-existing service", false);
} catch (Exception e) {
}
}
} }