ARTEMIS-141 Move classloading into ServiceRegistry
This commit is contained in:
parent
517ca68cb0
commit
aa5ff90807
|
@ -16,6 +16,28 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.jms.bridge.impl;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
|
||||
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
|
||||
import org.apache.activemq.artemis.jms.bridge.ActiveMQJMSBridgeLogger;
|
||||
import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
|
||||
import org.apache.activemq.artemis.jms.bridge.DestinationFactory;
|
||||
import org.apache.activemq.artemis.jms.bridge.JMSBridge;
|
||||
import org.apache.activemq.artemis.jms.bridge.JMSBridgeControl;
|
||||
import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
||||
import org.apache.activemq.artemis.jms.server.ActiveMQJMSServerBundle;
|
||||
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
|
||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistry;
|
||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
|
||||
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
|
||||
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
|
||||
import org.apache.activemq.artemis.utils.SensitiveDataCodec;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
|
@ -36,8 +58,6 @@ import javax.transaction.Transaction;
|
|||
import javax.transaction.TransactionManager;
|
||||
import javax.transaction.TransactionRolledbackException;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
@ -49,29 +69,6 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
|
||||
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
|
||||
import org.apache.activemq.artemis.jms.bridge.ActiveMQJMSBridgeLogger;
|
||||
import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
|
||||
import org.apache.activemq.artemis.jms.bridge.DestinationFactory;
|
||||
import org.apache.activemq.artemis.jms.bridge.JMSBridge;
|
||||
import org.apache.activemq.artemis.jms.bridge.JMSBridgeControl;
|
||||
import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
||||
import org.apache.activemq.artemis.jms.server.ActiveMQJMSServerBundle;
|
||||
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
|
||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistry;
|
||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
|
||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
|
||||
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
|
||||
import org.apache.activemq.artemis.utils.SensitiveDataCodec;
|
||||
|
||||
public final class JMSBridgeImpl implements JMSBridge
|
||||
{
|
||||
private static final String[] RESOURCE_RECOVERY_CLASS_NAMES = new String[]{"org.jboss.as.messaging.jms.AS7RecoveryRegistry"};
|
||||
|
@ -2268,22 +2265,6 @@ public final class JMSBridgeImpl implements JMSBridge
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a
|
||||
* utility class, as it would be a door to load anything you like in a safe VM.
|
||||
* For that reason any class trying to do a privileged block should do with the AccessController directly.
|
||||
*/
|
||||
private static Object safeInitNewInstance(final String className)
|
||||
{
|
||||
return AccessController.doPrivileged(new PrivilegedAction<Object>()
|
||||
{
|
||||
public Object run()
|
||||
{
|
||||
return ClassloadingUtil.newInstanceFromClassLoader(className);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public boolean isUseMaskedPassword()
|
||||
{
|
||||
return useMaskedPassword;
|
||||
|
|
|
@ -16,19 +16,16 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.ra.recovery;
|
||||
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.ra.ActiveMQRALogger;
|
||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistry;
|
||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistryImpl;
|
||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
|
||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.Set;
|
||||
|
||||
public final class RecoveryManager
|
||||
{
|
||||
private ActiveMQRegistry registry;
|
||||
|
@ -121,21 +118,6 @@ public final class RecoveryManager
|
|||
}
|
||||
}
|
||||
|
||||
/** This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a
|
||||
* utility class, as it would be a door to load anything you like in a safe VM.
|
||||
* For that reason any class trying to do a privileged block should do with the AccessController directly.
|
||||
*/
|
||||
private static Object safeInitNewInstance(final String className)
|
||||
{
|
||||
return AccessController.doPrivileged(new PrivilegedAction<Object>()
|
||||
{
|
||||
public Object run()
|
||||
{
|
||||
return ClassloadingUtil.newInstanceFromClassLoader(className);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public Set<XARecoveryConfig> getResources()
|
||||
{
|
||||
return resources;
|
||||
|
|
|
@ -16,25 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.remoting.server.impl;
|
||||
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
|
@ -67,10 +48,28 @@ import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
|||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener
|
||||
{
|
||||
// Constants -----------------------------------------------------
|
||||
|
@ -187,19 +186,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
|
|||
|
||||
private void setInterceptors(Configuration configuration)
|
||||
{
|
||||
addReflectivelyInstantiatedInterceptors(configuration.getIncomingInterceptorClassNames(), incomingInterceptors);
|
||||
addReflectivelyInstantiatedInterceptors(configuration.getOutgoingInterceptorClassNames(), outgoingInterceptors);
|
||||
incomingInterceptors.addAll(serviceRegistry.getIncomingInterceptors());
|
||||
outgoingInterceptors.addAll(serviceRegistry.getOutgoingInterceptors());
|
||||
}
|
||||
|
||||
private void addReflectivelyInstantiatedInterceptors(List<String> classNames, List<BaseInterceptor> interceptors)
|
||||
{
|
||||
for (String className : classNames)
|
||||
{
|
||||
BaseInterceptor interceptor = ((BaseInterceptor) safeInitNewInstance(className));
|
||||
interceptors.add(interceptor);
|
||||
}
|
||||
incomingInterceptors.addAll(serviceRegistry.getIncomingInterceptors(configuration.getIncomingInterceptorClassNames()));
|
||||
outgoingInterceptors.addAll(serviceRegistry.getOutgoingInterceptors(configuration.getOutgoingInterceptorClassNames()));
|
||||
}
|
||||
|
||||
public synchronized void start() throws Exception
|
||||
|
@ -810,17 +798,6 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
|
|||
}
|
||||
}
|
||||
|
||||
private static Object safeInitNewInstance(final String className)
|
||||
{
|
||||
return AccessController.doPrivileged(new PrivilegedAction<Object>()
|
||||
{
|
||||
public Object run()
|
||||
{
|
||||
return ClassloadingUtil.newInstanceFromClassLoader(className);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected void updateProtocols()
|
||||
{
|
||||
for (ProtocolManager<?> protocolManager : this.protocolMap.values())
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.server;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
||||
|
@ -23,6 +24,7 @@ import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
|||
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
|
@ -43,32 +45,64 @@ public interface ServiceRegistry
|
|||
|
||||
void removeConnectorService(ConnectorServiceConfiguration configuration);
|
||||
|
||||
Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices();
|
||||
/**
|
||||
* Get a collection of paired org.apache.activemq.artemis.core.server.ConnectorServiceFactory and
|
||||
* org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration instances.
|
||||
*
|
||||
* @param configs
|
||||
* @return
|
||||
*/
|
||||
Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices(List<ConnectorServiceConfiguration> configs);
|
||||
|
||||
void addIncomingInterceptor(String name, Interceptor interceptor);
|
||||
void addIncomingInterceptor(Interceptor interceptor);
|
||||
|
||||
void removeIncomingInterceptor(String name);
|
||||
/**
|
||||
* Get a list of org.apache.activemq.artemis.api.core.BaseInterceptor instances
|
||||
*
|
||||
* @param classNames
|
||||
* @return
|
||||
*/
|
||||
List<BaseInterceptor> getIncomingInterceptors(List<String> classNames);
|
||||
|
||||
Collection<Interceptor> getIncomingInterceptors();
|
||||
void addOutgoingInterceptor(Interceptor interceptor);
|
||||
|
||||
Interceptor getIncomingInterceptor(String name);
|
||||
/**
|
||||
* Get a list of org.apache.activemq.artemis.api.core.BaseInterceptor instances
|
||||
*
|
||||
* @param classNames
|
||||
* @return
|
||||
*/
|
||||
List<BaseInterceptor> getOutgoingInterceptors(List<String> classNames);
|
||||
|
||||
void addOutgoingInterceptor(String name, Interceptor interceptor);
|
||||
|
||||
Interceptor getOutgoingInterceptor(String name);
|
||||
|
||||
void removeOutgoingInterceptor(String name);
|
||||
|
||||
Collection<Interceptor> getOutgoingInterceptors();
|
||||
|
||||
Transformer getDivertTransformer(String name);
|
||||
/**
|
||||
* Get an instance of org.apache.activemq.artemis.core.server.cluster.Transformer for a divert
|
||||
*
|
||||
* @param name the name of divert for which the transformer will be used
|
||||
* @param className the fully qualified name of the transformer implementation (can be null)
|
||||
* @return
|
||||
*/
|
||||
Transformer getDivertTransformer(String name, String className);
|
||||
|
||||
void addDivertTransformer(String name, Transformer transformer);
|
||||
|
||||
Transformer getBridgeTransformer(String name);
|
||||
/**
|
||||
* Get an instance of org.apache.activemq.artemis.core.server.cluster.Transformer for a bridge
|
||||
*
|
||||
* @param name the name of bridge for which the transformer will be used
|
||||
* @param className the fully qualified name of the transformer implementation (can be null)
|
||||
* @return
|
||||
*/
|
||||
Transformer getBridgeTransformer(String name, String className);
|
||||
|
||||
void addBridgeTransformer(String name, Transformer transformer);
|
||||
|
||||
/**
|
||||
* Get an instance of org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory
|
||||
*
|
||||
* @param name the name of acceptor for which the factory will be used
|
||||
* @param className the fully qualified name of the factory implementation (can be null)
|
||||
* @return
|
||||
*/
|
||||
AcceptorFactory getAcceptorFactory(String name, String className);
|
||||
|
||||
void addAcceptorFactory(String name, AcceptorFactory acceptorFactory);
|
||||
|
|
|
@ -16,18 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.server.cluster;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
|
||||
|
@ -49,7 +37,6 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
|
@ -67,6 +54,18 @@ import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
|||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
/**
|
||||
* A ClusterManager manages {@link ClusterConnection}s, {@link BroadcastGroup}s and {@link Bridge}s.
|
||||
* <p>
|
||||
|
@ -458,12 +457,7 @@ public final class ClusterManager implements ActiveMQComponent
|
|||
return;
|
||||
}
|
||||
|
||||
Transformer transformer = server.getServiceRegistry().getBridgeTransformer(config.getName());
|
||||
|
||||
if (transformer == null)
|
||||
{
|
||||
transformer = instantiateTransformer(config.getTransformerClassName());
|
||||
}
|
||||
Transformer transformer = server.getServiceRegistry().getBridgeTransformer(config.getName(), config.getTransformerClassName());
|
||||
|
||||
Binding binding = postOffice.getBinding(new SimpleString(config.getQueueName()));
|
||||
|
||||
|
@ -816,26 +810,6 @@ public final class ClusterManager implements ActiveMQComponent
|
|||
}
|
||||
}
|
||||
|
||||
private Transformer instantiateTransformer(final String transformerClassName)
|
||||
{
|
||||
Transformer transformer = null;
|
||||
|
||||
if (transformerClassName != null)
|
||||
{
|
||||
ClassLoader loader = Thread.currentThread().getContextClassLoader();
|
||||
try
|
||||
{
|
||||
Class<?> clz = loader.loadClass(transformerClassName);
|
||||
transformer = (Transformer) clz.newInstance();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, transformerClassName);
|
||||
}
|
||||
}
|
||||
return transformer;
|
||||
}
|
||||
|
||||
|
||||
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
|
||||
{
|
||||
|
|
|
@ -16,30 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.server.impl;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -125,7 +101,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
|||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
||||
|
@ -133,6 +108,30 @@ import org.apache.activemq.artemis.utils.ReusableLatch;
|
|||
import org.apache.activemq.artemis.utils.SecurityFormatter;
|
||||
import org.apache.activemq.artemis.utils.VersionLoader;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* The ActiveMQ Artemis server implementation
|
||||
*/
|
||||
|
@ -1516,12 +1515,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
|||
|
||||
SimpleString sAddress = new SimpleString(config.getAddress());
|
||||
|
||||
Transformer transformer = getServiceRegistry().getDivertTransformer(config.getName());
|
||||
|
||||
if (transformer == null)
|
||||
{
|
||||
transformer = instantiateTransformer(config.getTransformerClassName());
|
||||
}
|
||||
Transformer transformer = getServiceRegistry().getDivertTransformer(config.getName(), config.getTransformerClassName());
|
||||
|
||||
Filter filter = FilterImpl.createFilter(config.getFilterString());
|
||||
|
||||
|
@ -2206,23 +2200,6 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
|||
}
|
||||
}
|
||||
|
||||
private Transformer instantiateTransformer(final String transformerClassName)
|
||||
{
|
||||
Transformer transformer = null;
|
||||
|
||||
if (transformerClassName != null)
|
||||
{
|
||||
transformer = (Transformer) instantiateInstance(transformerClassName);
|
||||
}
|
||||
|
||||
return transformer;
|
||||
}
|
||||
|
||||
private Object instantiateInstance(final String className)
|
||||
{
|
||||
return safeInitNewInstance(className);
|
||||
}
|
||||
|
||||
private static ClassLoader getThisClassLoader()
|
||||
{
|
||||
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
|
||||
|
@ -2278,23 +2255,6 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a
|
||||
* utility class, as it would be a door to load anything you like in a safe VM.
|
||||
* For that reason any class trying to do a privileged block should do with the AccessController directly.
|
||||
*/
|
||||
private static Object safeInitNewInstance(final String className)
|
||||
{
|
||||
return AccessController.doPrivileged(new PrivilegedAction<Object>()
|
||||
{
|
||||
public Object run()
|
||||
{
|
||||
return ClassloadingUtil.newInstanceFromClassLoader(className);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void addProtocolManagerFactory(ProtocolManagerFactory factory)
|
||||
{
|
||||
protocolManagerFactories.add(factory);
|
||||
|
|
|
@ -16,25 +16,23 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.server.impl;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.ConnectorService;
|
||||
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
/**
|
||||
* ConnectorsService will pool some resource for updates, e.g. Twitter, then the changes are picked
|
||||
* and converted into a ServerMessage for a given destination (queue).
|
||||
|
@ -73,21 +71,13 @@ public final class ConnectorsService implements ActiveMQComponent
|
|||
|
||||
public void start() throws Exception
|
||||
{
|
||||
List<ConnectorServiceConfiguration> configurationList = configuration.getConnectorServiceConfigurations();
|
||||
|
||||
Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServiceFactories = serviceRegistry.getConnectorServices();
|
||||
Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServiceFactories = serviceRegistry.getConnectorServices(configuration.getConnectorServiceConfigurations());
|
||||
|
||||
for (Pair<ConnectorServiceFactory, ConnectorServiceConfiguration> pair : connectorServiceFactories)
|
||||
{
|
||||
createService(pair.getB(), pair.getA());
|
||||
}
|
||||
|
||||
for (ConnectorServiceConfiguration info : configurationList)
|
||||
{
|
||||
ConnectorServiceFactory factory = (ConnectorServiceFactory) ClassloadingUtil.newInstanceFromClassLoader(info.getFactoryClassName());
|
||||
createService(info, factory);
|
||||
}
|
||||
|
||||
for (ConnectorService connector : connectors)
|
||||
{
|
||||
try
|
||||
|
|
|
@ -16,24 +16,28 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.server.impl;
|
||||
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
|
||||
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
|
||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
public class ServiceRegistryImpl implements ServiceRegistry
|
||||
{
|
||||
private ExecutorService executorService;
|
||||
|
@ -43,9 +47,9 @@ public class ServiceRegistryImpl implements ServiceRegistry
|
|||
/* We are using a List rather than HashMap here as ActiveMQ Artemis allows multiple instances of the same class to be added
|
||||
* to the interceptor list
|
||||
*/
|
||||
private Map<String, Interceptor> incomingInterceptors;
|
||||
private List<BaseInterceptor> incomingInterceptors;
|
||||
|
||||
private Map<String, Interceptor> outgoingInterceptors;
|
||||
private List<BaseInterceptor> outgoingInterceptors;
|
||||
|
||||
private Map<String, Transformer> divertTransformers;
|
||||
|
||||
|
@ -57,87 +61,104 @@ public class ServiceRegistryImpl implements ServiceRegistry
|
|||
|
||||
public ServiceRegistryImpl()
|
||||
{
|
||||
this.incomingInterceptors = new ConcurrentHashMap<>();
|
||||
this.outgoingInterceptors = new ConcurrentHashMap<>();
|
||||
this.incomingInterceptors = Collections.synchronizedList(new ArrayList<BaseInterceptor>());
|
||||
this.outgoingInterceptors = Collections.synchronizedList(new ArrayList<BaseInterceptor>());
|
||||
this.connectorServices = new ConcurrentHashMap<>();
|
||||
this.divertTransformers = new ConcurrentHashMap<>();
|
||||
this.bridgeTransformers = new ConcurrentHashMap<>();
|
||||
this.acceptorFactories = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecutorService getExecutorService()
|
||||
{
|
||||
return executorService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setExecutorService(ExecutorService executorService)
|
||||
{
|
||||
this.executorService = executorService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledExecutorService getScheduledExecutorService()
|
||||
{
|
||||
return scheduledExecutorService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService)
|
||||
{
|
||||
this.scheduledExecutorService = scheduledExecutorService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addConnectorService(ConnectorServiceFactory connectorServiceFactory, ConnectorServiceConfiguration configuration)
|
||||
{
|
||||
connectorServices.put(configuration.getConnectorName(), new Pair<>(connectorServiceFactory, configuration));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeConnectorService(ConnectorServiceConfiguration configuration)
|
||||
{
|
||||
connectorServices.remove(configuration.getConnectorName());
|
||||
}
|
||||
|
||||
public Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices()
|
||||
@Override
|
||||
public Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices(List<ConnectorServiceConfiguration> configs)
|
||||
{
|
||||
if (configs != null)
|
||||
{
|
||||
for (final ConnectorServiceConfiguration config : configs)
|
||||
{
|
||||
if (connectorServices.get(config.getConnectorName()) == null)
|
||||
{
|
||||
ConnectorServiceFactory factory = AccessController.doPrivileged(new PrivilegedAction<ConnectorServiceFactory>()
|
||||
{
|
||||
public ConnectorServiceFactory run()
|
||||
{
|
||||
return (ConnectorServiceFactory) ClassloadingUtil.newInstanceFromClassLoader(config.getFactoryClassName());
|
||||
}
|
||||
});
|
||||
addConnectorService(factory, config);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return connectorServices.values();
|
||||
}
|
||||
|
||||
public void addIncomingInterceptor(String name, Interceptor interceptor)
|
||||
@Override
|
||||
public void addIncomingInterceptor(Interceptor interceptor)
|
||||
{
|
||||
incomingInterceptors.put(name, interceptor);
|
||||
incomingInterceptors.add(interceptor);
|
||||
}
|
||||
|
||||
public void removeIncomingInterceptor(String name)
|
||||
@Override
|
||||
public List<BaseInterceptor> getIncomingInterceptors(List<String> classNames)
|
||||
{
|
||||
incomingInterceptors.remove(name);
|
||||
List<BaseInterceptor> interceptors = new ArrayList<>(incomingInterceptors);
|
||||
|
||||
instantiateInterceptors(classNames, interceptors);
|
||||
|
||||
return interceptors;
|
||||
}
|
||||
|
||||
public Collection<Interceptor> getIncomingInterceptors()
|
||||
@Override
|
||||
public void addOutgoingInterceptor(Interceptor interceptor)
|
||||
{
|
||||
return Collections.unmodifiableCollection(incomingInterceptors.values());
|
||||
outgoingInterceptors.add(interceptor);
|
||||
}
|
||||
|
||||
public Interceptor getIncomingInterceptor(String name)
|
||||
@Override
|
||||
public List<BaseInterceptor> getOutgoingInterceptors(List<String> classNames)
|
||||
{
|
||||
return incomingInterceptors.get(name);
|
||||
}
|
||||
List<BaseInterceptor> interceptors = new ArrayList<>(outgoingInterceptors);
|
||||
|
||||
public void addOutgoingInterceptor(String name, Interceptor interceptor)
|
||||
{
|
||||
outgoingInterceptors.put(name, interceptor);
|
||||
}
|
||||
instantiateInterceptors(classNames, interceptors);
|
||||
|
||||
public Interceptor getOutgoingInterceptor(String name)
|
||||
{
|
||||
return outgoingInterceptors.get(name);
|
||||
}
|
||||
|
||||
public void removeOutgoingInterceptor(String name)
|
||||
{
|
||||
outgoingInterceptors.remove(name);
|
||||
}
|
||||
|
||||
public Collection<Interceptor> getOutgoingInterceptors()
|
||||
{
|
||||
return Collections.unmodifiableCollection(outgoingInterceptors.values());
|
||||
return interceptors;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -147,9 +168,17 @@ public class ServiceRegistryImpl implements ServiceRegistry
|
|||
}
|
||||
|
||||
@Override
|
||||
public Transformer getDivertTransformer(String name)
|
||||
public Transformer getDivertTransformer(String name, String className)
|
||||
{
|
||||
return divertTransformers.get(name);
|
||||
Transformer transformer = divertTransformers.get(name);
|
||||
|
||||
if (transformer == null && className != null)
|
||||
{
|
||||
transformer = instantiateTransformer(className);
|
||||
addDivertTransformer(name, transformer);
|
||||
}
|
||||
|
||||
return transformer;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -159,9 +188,17 @@ public class ServiceRegistryImpl implements ServiceRegistry
|
|||
}
|
||||
|
||||
@Override
|
||||
public Transformer getBridgeTransformer(String name)
|
||||
public Transformer getBridgeTransformer(String name, String className)
|
||||
{
|
||||
return bridgeTransformers.get(name);
|
||||
Transformer transformer = bridgeTransformers.get(name);
|
||||
|
||||
if (transformer == null && className != null)
|
||||
{
|
||||
transformer = instantiateTransformer(className);
|
||||
addBridgeTransformer(name, transformer);
|
||||
}
|
||||
|
||||
return transformer;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -169,7 +206,7 @@ public class ServiceRegistryImpl implements ServiceRegistry
|
|||
{
|
||||
AcceptorFactory factory = acceptorFactories.get(name);
|
||||
|
||||
if (factory == null)
|
||||
if (factory == null && className != null)
|
||||
{
|
||||
factory = AccessController.doPrivileged(new PrivilegedAction<AcceptorFactory>()
|
||||
{
|
||||
|
@ -190,4 +227,47 @@ public class ServiceRegistryImpl implements ServiceRegistry
|
|||
{
|
||||
acceptorFactories.put(name, acceptorFactory);
|
||||
}
|
||||
|
||||
private Transformer instantiateTransformer(final String className)
|
||||
{
|
||||
Transformer transformer = null;
|
||||
|
||||
if (className != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
transformer = AccessController.doPrivileged(new PrivilegedAction<Transformer>()
|
||||
{
|
||||
public Transformer run()
|
||||
{
|
||||
return (Transformer) ClassloadingUtil.newInstanceFromClassLoader(className);
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, className);
|
||||
}
|
||||
}
|
||||
return transformer;
|
||||
}
|
||||
|
||||
private void instantiateInterceptors(List<String> classNames, List<BaseInterceptor> interceptors)
|
||||
{
|
||||
if (classNames != null)
|
||||
{
|
||||
for (final String className : classNames)
|
||||
{
|
||||
BaseInterceptor interceptor = AccessController.doPrivileged(new PrivilegedAction<BaseInterceptor>()
|
||||
{
|
||||
public BaseInterceptor run()
|
||||
{
|
||||
return (BaseInterceptor) ClassloadingUtil.newInstanceFromClassLoader(className);
|
||||
}
|
||||
});
|
||||
|
||||
interceptors.add(interceptor);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -83,8 +83,7 @@ public class RemotingServiceImplTest
|
|||
@Test
|
||||
public void testSetInterceptorsAddsBothInterceptorsFromConfigAndServiceRegistry() throws Exception
|
||||
{
|
||||
Method method = RemotingServiceImpl.class.getDeclaredMethod("setInterceptors",
|
||||
Configuration.class);
|
||||
Method method = RemotingServiceImpl.class.getDeclaredMethod("setInterceptors", Configuration.class);
|
||||
Field incomingInterceptors = RemotingServiceImpl.class.getDeclaredField("incomingInterceptors");
|
||||
Field outgoingInterceptors = RemotingServiceImpl.class.getDeclaredField("outgoingInterceptors");
|
||||
|
||||
|
@ -92,10 +91,10 @@ public class RemotingServiceImplTest
|
|||
incomingInterceptors.setAccessible(true);
|
||||
outgoingInterceptors.setAccessible(true);
|
||||
|
||||
serviceRegistry.addIncomingInterceptor("Foo", new FakeInterceptor());
|
||||
serviceRegistry.addOutgoingInterceptor("Bar", new FakeInterceptor());
|
||||
serviceRegistry.addIncomingInterceptor(new FakeInterceptor());
|
||||
serviceRegistry.addOutgoingInterceptor(new FakeInterceptor());
|
||||
|
||||
List<String> interceptorClassNames = new ArrayList<String>();
|
||||
List<String> interceptorClassNames = new ArrayList<>();
|
||||
interceptorClassNames.add(FakeInterceptor.class.getCanonicalName());
|
||||
configuration.setIncomingInterceptorClassNames(interceptorClassNames);
|
||||
configuration.setOutgoingInterceptorClassNames(interceptorClassNames);
|
||||
|
@ -104,8 +103,8 @@ public class RemotingServiceImplTest
|
|||
|
||||
assertTrue(((List) incomingInterceptors.get(remotingService)).size() == 2 );
|
||||
assertTrue(((List) outgoingInterceptors.get(remotingService)).size() == 2 );
|
||||
assertTrue(((List) incomingInterceptors.get(remotingService)).contains(serviceRegistry.getIncomingInterceptor("Foo")));
|
||||
assertTrue(((List) outgoingInterceptors.get(remotingService)).contains(serviceRegistry.getOutgoingInterceptor("Bar")));
|
||||
assertTrue(((List) incomingInterceptors.get(remotingService)).contains(serviceRegistry.getIncomingInterceptors(null).get(0)));
|
||||
assertTrue(((List) outgoingInterceptors.get(remotingService)).contains(serviceRegistry.getOutgoingInterceptors(null).get(0)));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -115,19 +114,16 @@ public class RemotingServiceImplTest
|
|||
@Test
|
||||
public void testInterceptorsAreAddedOnCreationOfServiceRegistry() throws Exception
|
||||
{
|
||||
Method method = RemotingServiceImpl.class.getDeclaredMethod("setInterceptors",
|
||||
Configuration.class);
|
||||
Field incomingInterceptors = RemotingServiceImpl.class.getDeclaredField("incomingInterceptors");
|
||||
Field outgoingInterceptors = RemotingServiceImpl.class.getDeclaredField("outgoingInterceptors");
|
||||
|
||||
method.setAccessible(true);
|
||||
incomingInterceptors.setAccessible(true);
|
||||
outgoingInterceptors.setAccessible(true);
|
||||
|
||||
serviceRegistry.addIncomingInterceptor("Foo", new FakeInterceptor());
|
||||
serviceRegistry.addOutgoingInterceptor("Bar", new FakeInterceptor());
|
||||
serviceRegistry.addIncomingInterceptor(new FakeInterceptor());
|
||||
serviceRegistry.addOutgoingInterceptor(new FakeInterceptor());
|
||||
|
||||
List<String> interceptorClassNames = new ArrayList<String>();
|
||||
List<String> interceptorClassNames = new ArrayList<>();
|
||||
interceptorClassNames.add(FakeInterceptor.class.getCanonicalName());
|
||||
configuration.setIncomingInterceptorClassNames(interceptorClassNames);
|
||||
configuration.setOutgoingInterceptorClassNames(interceptorClassNames);
|
||||
|
@ -136,7 +132,7 @@ public class RemotingServiceImplTest
|
|||
|
||||
assertTrue(((List) incomingInterceptors.get(remotingService)).size() == 2 );
|
||||
assertTrue(((List) outgoingInterceptors.get(remotingService)).size() == 2 );
|
||||
assertTrue(((List) incomingInterceptors.get(remotingService)).contains(serviceRegistry.getIncomingInterceptor("Foo")));
|
||||
assertTrue(((List) outgoingInterceptors.get(remotingService)).contains(serviceRegistry.getOutgoingInterceptor("Bar")));
|
||||
assertTrue(((List) incomingInterceptors.get(remotingService)).contains(serviceRegistry.getIncomingInterceptors(null).get(0)));
|
||||
assertTrue(((List) outgoingInterceptors.get(remotingService)).contains(serviceRegistry.getOutgoingInterceptors(null).get(0)));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue