This closes #867

This commit is contained in:
Clebert Suconic 2016-10-26 13:21:13 -04:00
commit 9e7fe6b011
8 changed files with 196 additions and 49 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.api.core.management;
import javax.management.MBeanOperationInfo;
import java.util.Map;
/**
* An ActiveMQServerControl is used to manage ActiveMQ Artemis servers.
@ -850,6 +851,17 @@ public interface ActiveMQServerControl {
@Operation(desc = "Destroy a bridge", impact = MBeanOperationInfo.ACTION)
void destroyBridge(@Parameter(name = "name", desc = "Name of the bridge") String name) throws Exception;
@Operation(desc = "Create a connector service", impact = MBeanOperationInfo.ACTION)
void createConnectorService(@Parameter(name = "name", desc = "Name of the connector service") String name,
@Parameter(name = "factoryClass", desc = "Class name of the connector service factory") String factoryClass,
@Parameter(name = "parameters", desc = "Parameter specific to the connector service") Map<String, Object> parameters) throws Exception;
@Operation(desc = "Destroy a connector service", impact = MBeanOperationInfo.ACTION)
void destroyConnectorService(@Parameter(name = "name", desc = "Name of the connector service") String name) throws Exception;
@Attribute(desc = "names of the connector services on this server")
String[] getConnectorServices();
@Operation(desc = "force the server to stop and notify clients to failover", impact = MBeanOperationInfo.UNKNOWN)
void forceFailover() throws Exception;

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
@ -68,6 +69,7 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
@ -1850,6 +1852,47 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
@Override
public void createConnectorService(final String name, final String factoryClass, final Map<String, Object> parameters) throws Exception {
checkStarted();
clearIO();
try {
final ConnectorServiceConfiguration config = new ConnectorServiceConfiguration().setName(name).setFactoryClassName(factoryClass).setParams(parameters);
ConnectorServiceFactory factory = server.getServiceRegistry().getConnectorService(config);
server.getConnectorsService().createService(config, factory);
} finally {
blockOnIO();
}
}
@Override
public void destroyConnectorService(final String name) throws Exception {
checkStarted();
clearIO();
try {
server.getConnectorsService().destroyService(name);
} finally {
blockOnIO();
}
}
@Override
public String[] getConnectorServices() {
checkStarted();
clearIO();
try {
return server.getConnectorsService().getConnectors().keySet().toArray(new String[0]);
} finally {
blockOnIO();
}
}
@Override
public void forceFailover() throws Exception {
checkStarted();

View File

@ -54,6 +54,14 @@ public interface ServiceRegistry {
*/
Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices(List<ConnectorServiceConfiguration> configs);
/**
* Get connector service for a given configuration.
*
* @param configuration The connector service configuration.
* @return an instance of the connector service factory.
*/
ConnectorServiceFactory getConnectorService(ConnectorServiceConfiguration configuration);
void addIncomingInterceptor(BaseInterceptor interceptor);
/**

View File

@ -17,10 +17,13 @@
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collection;
import java.util.HashSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
@ -52,7 +55,7 @@ public final class ConnectorsService implements ActiveMQComponent {
private final Configuration configuration;
private final Set<ConnectorService> connectors = new HashSet<>();
private final Map<String, ConnectorService> connectors = new HashMap<>();
private final ServiceRegistry serviceRegistry;
@ -69,51 +72,61 @@ public final class ConnectorsService implements ActiveMQComponent {
}
@Override
public void start() throws Exception {
public synchronized void start() throws Exception {
Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServiceFactories = serviceRegistry.getConnectorServices(configuration.getConnectorServiceConfigurations());
for (Pair<ConnectorServiceFactory, ConnectorServiceConfiguration> pair : connectorServiceFactories) {
createService(pair.getB(), pair.getA());
}
for (ConnectorService connector : connectors) {
try {
connector.start();
createService(pair.getB(), pair.getA());
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorStartingConnectorService(e, connector.getName());
ActiveMQServerLogger.LOGGER.errorStartingConnectorService(e, pair.getB().getConnectorName());
}
}
isStarted = true;
}
public void createService(ConnectorServiceConfiguration info, ConnectorServiceFactory factory) {
public synchronized void createService(ConnectorServiceConfiguration info, ConnectorServiceFactory factory) throws Exception {
if (connectors.containsKey(info.getConnectorName())) {
throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Connector service " + info.getConnectorName() + " already created");
}
if (info.getParams() != null) {
Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), info.getParams().keySet());
if (!invalid.isEmpty()) {
ActiveMQServerLogger.LOGGER.connectorKeysInvalid(ConfigurationHelper.stringSetToCommaListString(invalid));
return;
throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Invalid connector keys for connector service " + info.getConnectorName() + ": " + ConfigurationHelper.stringSetToCommaListString(invalid));
}
}
Set<String> invalid = ConfigurationHelper.checkKeysExist(factory.getRequiredProperties(), info.getParams().keySet());
if (!invalid.isEmpty()) {
ActiveMQServerLogger.LOGGER.connectorKeysMissing(ConfigurationHelper.stringSetToCommaListString(invalid));
return;
throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Missing connector keys for connector service " + info.getConnectorName() + ": " + ConfigurationHelper.stringSetToCommaListString(invalid));
}
ConnectorService connectorService = factory.createConnectorService(info.getConnectorName(), info.getParams(), storageManager, postOffice, scheduledPool);
connectors.add(connectorService);
connectorService.start();
connectors.put(info.getConnectorName(), connectorService);
}
public synchronized void destroyService(String name) throws Exception {
if (!connectors.containsKey(name)) {
throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Connector service " + name + " does not exist");
}
ConnectorService connectorService = connectors.get(name);
connectorService.stop();
connectors.remove(name);
}
@Override
public void stop() throws Exception {
public synchronized void stop() throws Exception {
if (!isStarted) {
return;
}
for (ConnectorService connector : connectors) {
for (Map.Entry<String, ConnectorService> connector : connectors.entrySet()) {
try {
connector.stop();
connector.getValue().stop();
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorStoppingConnectorService(e, connector.getName());
ActiveMQServerLogger.LOGGER.errorStoppingConnectorService(e, connector.getKey());
}
}
connectors.clear();
@ -121,11 +134,11 @@ public final class ConnectorsService implements ActiveMQComponent {
}
@Override
public boolean isStarted() {
public synchronized boolean isStarted() {
return isStarted;
}
public Set<ConnectorService> getConnectors() {
return connectors;
public synchronized Map<String, ConnectorService> getConnectors() {
return Collections.unmodifiableMap(connectors);
}
}

View File

@ -103,12 +103,7 @@ public class ServiceRegistryImpl implements ServiceRegistry {
if (configs != null) {
for (final ConnectorServiceConfiguration config : configs) {
if (connectorServices.get(config.getConnectorName()) == null) {
ConnectorServiceFactory factory = AccessController.doPrivileged(new PrivilegedAction<ConnectorServiceFactory>() {
@Override
public ConnectorServiceFactory run() {
return (ConnectorServiceFactory) ClassloadingUtil.newInstanceFromClassLoader(config.getFactoryClassName());
}
});
ConnectorServiceFactory factory = loadClass(config.getFactoryClassName());
addConnectorService(factory, config);
}
}
@ -117,6 +112,11 @@ public class ServiceRegistryImpl implements ServiceRegistry {
return connectorServices.values();
}
@Override
public ConnectorServiceFactory getConnectorService(ConnectorServiceConfiguration configuration) {
return loadClass(configuration.getFactoryClassName());
}
@Override
public void addIncomingInterceptor(BaseInterceptor interceptor) {
incomingInterceptors.add(interceptor);
@ -184,13 +184,7 @@ public class ServiceRegistryImpl implements ServiceRegistry {
AcceptorFactory factory = acceptorFactories.get(name);
if (factory == null && className != null) {
factory = AccessController.doPrivileged(new PrivilegedAction<AcceptorFactory>() {
@Override
public AcceptorFactory run() {
return (AcceptorFactory) ClassloadingUtil.newInstanceFromClassLoader(className);
}
});
factory = loadClass(className);
addAcceptorFactory(name, factory);
}
@ -202,17 +196,21 @@ public class ServiceRegistryImpl implements ServiceRegistry {
acceptorFactories.put(name, acceptorFactory);
}
public <T> T loadClass(final String className) {
return AccessController.doPrivileged(new PrivilegedAction<T>() {
@Override
public T run() {
return (T) ClassloadingUtil.newInstanceFromClassLoader(className);
}
});
}
private Transformer instantiateTransformer(final String className) {
Transformer transformer = null;
if (className != null) {
try {
transformer = AccessController.doPrivileged(new PrivilegedAction<Transformer>() {
@Override
public Transformer run() {
return (Transformer) ClassloadingUtil.newInstanceFromClassLoader(className);
}
});
transformer = loadClass(className);
} catch (Exception e) {
throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, className);
}
@ -223,13 +221,7 @@ public class ServiceRegistryImpl implements ServiceRegistry {
private void instantiateInterceptors(List<String> classNames, List<BaseInterceptor> interceptors) {
if (classNames != null) {
for (final String className : classNames) {
BaseInterceptor interceptor = AccessController.doPrivileged(new PrivilegedAction<BaseInterceptor>() {
@Override
public BaseInterceptor run() {
return (BaseInterceptor) ClassloadingUtil.newInstanceFromClassLoader(className);
}
});
BaseInterceptor interceptor = loadClass(className);
interceptors.add(interceptor);
}
}

View File

@ -58,6 +58,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.unit.core.config.impl.fakes.FakeConnectorServiceFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert;
@ -1327,6 +1328,21 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
Assert.assertEquals(1, second.getJsonNumber("consumerCount").longValue());
}
@Test
public void testConnectorServiceManagement() throws Exception {
ActiveMQServerControl managementControl = createManagementControl();
managementControl.createConnectorService("myconn", FakeConnectorServiceFactory.class.getCanonicalName(), new HashMap<String, Object>());
Assert.assertEquals(1, server.getConnectorsService().getConnectors().size());
managementControl.createConnectorService("myconn2", FakeConnectorServiceFactory.class.getCanonicalName(), new HashMap<String, Object>());
Assert.assertEquals(2, server.getConnectorsService().getConnectors().size());
managementControl.destroyConnectorService("myconn");
Assert.assertEquals(1, server.getConnectorsService().getConnectors().size());
Assert.assertEquals("myconn2", managementControl.getConnectorServices()[0]);
}
protected void scaleDown(ScaleDownHandler handler) throws Exception {
SimpleString address = new SimpleString("testQueue");
HashMap<String, Object> params = new HashMap<>();

View File

@ -20,6 +20,8 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import java.util.Map;
public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTest {
// Constants -----------------------------------------------------
@ -627,6 +629,21 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
public void createConnectorService(String name, String factoryClass, Map<String, Object> parameters) throws Exception {
proxy.invokeOperation("createConnectorService", name, factoryClass, parameters);
}
@Override
public void destroyConnectorService(String name) throws Exception {
proxy.invokeOperation("destroyConnectorService", name);
}
@Override
public String[] getConnectorServices() {
return ActiveMQServerControlUsingCoreTest.toStringArray((Object[]) proxy.retrieveAttributeValue("connectorServices"));
}
@Override
public void forceFailover() throws Exception {
proxy.invokeOperation("forceFailover");

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.unit.core.config.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@ -65,7 +66,7 @@ public class ConnectorsServiceTest extends ActiveMQTestBase {
connectorsService.start();
assertTrue(connectorsService.getConnectors().size() == 1);
assertTrue(connectorsService.getConnectors().contains(connectorServiceFactory.getConnectorService()));
assertTrue(connectorsService.getConnectors().values().contains(connectorServiceFactory.getConnectorService()));
}
/**
@ -86,4 +87,49 @@ public class ConnectorsServiceTest extends ActiveMQTestBase {
assertTrue(connectorsService.getConnectors().size() == 1);
}
/**
* Test that connectors can be created and destroyed directly.
*
* @throws Exception
*/
@Test
public void testConnectorServiceUsedDirectly() throws Exception {
// Initial setup with existing connector service
ConnectorServiceConfiguration connectorServiceConfiguration = new ConnectorServiceConfiguration().setFactoryClassName(FakeConnectorServiceFactory.class.getCanonicalName()).setParams(new HashMap<String, Object>()).setName("myfact");
configuration.setConnectorServiceConfigurations(Arrays.asList(connectorServiceConfiguration));
ConnectorsService connectorsService = new ConnectorsService(configuration, null, null, null, serviceRegistry);
connectorsService.start();
assertEquals(1, connectorsService.getConnectors().size());
// Add with same name
FakeConnectorServiceFactory connectorServiceFactory = new FakeConnectorServiceFactory();
try {
connectorsService.createService(connectorServiceConfiguration, connectorServiceFactory);
assertTrue("Expected exception when creating service with same name", false);
} catch (Exception e) {
}
// Add unique with same factory
ConnectorServiceConfiguration additionalServiceConfiguration = new ConnectorServiceConfiguration().setFactoryClassName(FakeConnectorServiceFactory.class.getCanonicalName()).setParams(new HashMap<String, Object>()).setName("myfact2");
connectorsService.createService(additionalServiceConfiguration, connectorServiceFactory);
assertEquals(2, connectorsService.getConnectors().size());
// Destroy existing connector services
connectorsService.destroyService("myfact");
assertEquals(1, connectorsService.getConnectors().size());
connectorsService.destroyService("myfact2");
assertEquals(0, connectorsService.getConnectors().size());
// Destroy non-existing connector service
try {
connectorsService.destroyService("myfact");
assertTrue("Expected exception when destroying non-existing service", false);
} catch (Exception e) {
}
}
}