This closes #39 on ServiceRegistry classloading
This commit is contained in:
commit
455f0e394e
|
@ -16,6 +16,28 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.jms.bridge.impl;
|
package org.apache.activemq.artemis.jms.bridge.impl;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
|
||||||
|
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
|
||||||
|
import org.apache.activemq.artemis.jms.bridge.ActiveMQJMSBridgeLogger;
|
||||||
|
import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
|
||||||
|
import org.apache.activemq.artemis.jms.bridge.DestinationFactory;
|
||||||
|
import org.apache.activemq.artemis.jms.bridge.JMSBridge;
|
||||||
|
import org.apache.activemq.artemis.jms.bridge.JMSBridgeControl;
|
||||||
|
import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
||||||
|
import org.apache.activemq.artemis.jms.server.ActiveMQJMSServerBundle;
|
||||||
|
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
|
||||||
|
import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistry;
|
||||||
|
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
|
||||||
|
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
|
||||||
|
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
|
||||||
|
import org.apache.activemq.artemis.utils.SensitiveDataCodec;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
@ -36,8 +58,6 @@ import javax.transaction.Transaction;
|
||||||
import javax.transaction.TransactionManager;
|
import javax.transaction.TransactionManager;
|
||||||
import javax.transaction.TransactionRolledbackException;
|
import javax.transaction.TransactionRolledbackException;
|
||||||
import javax.transaction.xa.XAResource;
|
import javax.transaction.xa.XAResource;
|
||||||
import java.security.AccessController;
|
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.Enumeration;
|
import java.util.Enumeration;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -49,29 +69,6 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
|
|
||||||
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
|
|
||||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
|
|
||||||
import org.apache.activemq.artemis.jms.bridge.ActiveMQJMSBridgeLogger;
|
|
||||||
import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
|
|
||||||
import org.apache.activemq.artemis.jms.bridge.DestinationFactory;
|
|
||||||
import org.apache.activemq.artemis.jms.bridge.JMSBridge;
|
|
||||||
import org.apache.activemq.artemis.jms.bridge.JMSBridgeControl;
|
|
||||||
import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
|
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
|
||||||
import org.apache.activemq.artemis.jms.server.ActiveMQJMSServerBundle;
|
|
||||||
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
|
|
||||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistry;
|
|
||||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
|
|
||||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
|
||||||
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
|
|
||||||
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
|
|
||||||
import org.apache.activemq.artemis.utils.SensitiveDataCodec;
|
|
||||||
|
|
||||||
public final class JMSBridgeImpl implements JMSBridge
|
public final class JMSBridgeImpl implements JMSBridge
|
||||||
{
|
{
|
||||||
private static final String[] RESOURCE_RECOVERY_CLASS_NAMES = new String[]{"org.jboss.as.messaging.jms.AS7RecoveryRegistry"};
|
private static final String[] RESOURCE_RECOVERY_CLASS_NAMES = new String[]{"org.jboss.as.messaging.jms.AS7RecoveryRegistry"};
|
||||||
|
@ -2268,22 +2265,6 @@ public final class JMSBridgeImpl implements JMSBridge
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a
|
|
||||||
* utility class, as it would be a door to load anything you like in a safe VM.
|
|
||||||
* For that reason any class trying to do a privileged block should do with the AccessController directly.
|
|
||||||
*/
|
|
||||||
private static Object safeInitNewInstance(final String className)
|
|
||||||
{
|
|
||||||
return AccessController.doPrivileged(new PrivilegedAction<Object>()
|
|
||||||
{
|
|
||||||
public Object run()
|
|
||||||
{
|
|
||||||
return ClassloadingUtil.newInstanceFromClassLoader(className);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isUseMaskedPassword()
|
public boolean isUseMaskedPassword()
|
||||||
{
|
{
|
||||||
return useMaskedPassword;
|
return useMaskedPassword;
|
||||||
|
|
|
@ -16,19 +16,16 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.ra.recovery;
|
package org.apache.activemq.artemis.ra.recovery;
|
||||||
|
|
||||||
import java.security.AccessController;
|
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.ServiceLoader;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.ra.ActiveMQRALogger;
|
import org.apache.activemq.artemis.ra.ActiveMQRALogger;
|
||||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistry;
|
import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistry;
|
||||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistryImpl;
|
import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistryImpl;
|
||||||
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
|
import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
|
||||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
|
||||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||||
|
|
||||||
|
import java.util.ServiceLoader;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public final class RecoveryManager
|
public final class RecoveryManager
|
||||||
{
|
{
|
||||||
private ActiveMQRegistry registry;
|
private ActiveMQRegistry registry;
|
||||||
|
@ -121,21 +118,6 @@ public final class RecoveryManager
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a
|
|
||||||
* utility class, as it would be a door to load anything you like in a safe VM.
|
|
||||||
* For that reason any class trying to do a privileged block should do with the AccessController directly.
|
|
||||||
*/
|
|
||||||
private static Object safeInitNewInstance(final String className)
|
|
||||||
{
|
|
||||||
return AccessController.doPrivileged(new PrivilegedAction<Object>()
|
|
||||||
{
|
|
||||||
public Object run()
|
|
||||||
{
|
|
||||||
return ClassloadingUtil.newInstanceFromClassLoader(className);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public Set<XARecoveryConfig> getResources()
|
public Set<XARecoveryConfig> getResources()
|
||||||
{
|
{
|
||||||
return resources;
|
return resources;
|
||||||
|
|
|
@ -16,8 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.remoting.server;
|
package org.apache.activemq.artemis.core.remoting.server;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||||
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
|
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
|
||||||
|
@ -43,8 +45,12 @@ public interface RemotingService
|
||||||
|
|
||||||
void addIncomingInterceptor(Interceptor interceptor);
|
void addIncomingInterceptor(Interceptor interceptor);
|
||||||
|
|
||||||
|
List<BaseInterceptor> getIncomingInterceptors();
|
||||||
|
|
||||||
void addOutgoingInterceptor(Interceptor interceptor);
|
void addOutgoingInterceptor(Interceptor interceptor);
|
||||||
|
|
||||||
|
List<BaseInterceptor> getOutgoinInterceptors();
|
||||||
|
|
||||||
boolean removeIncomingInterceptor(Interceptor interceptor);
|
boolean removeIncomingInterceptor(Interceptor interceptor);
|
||||||
|
|
||||||
boolean removeOutgoingInterceptor(Interceptor interceptor);
|
boolean removeOutgoingInterceptor(Interceptor interceptor);
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.remoting.server.impl;
|
||||||
|
|
||||||
import java.security.AccessController;
|
import java.security.AccessController;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -67,7 +68,6 @@ import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
|
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
|
||||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
|
|
||||||
|
@ -187,19 +187,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
|
||||||
|
|
||||||
private void setInterceptors(Configuration configuration)
|
private void setInterceptors(Configuration configuration)
|
||||||
{
|
{
|
||||||
addReflectivelyInstantiatedInterceptors(configuration.getIncomingInterceptorClassNames(), incomingInterceptors);
|
incomingInterceptors.addAll(serviceRegistry.getIncomingInterceptors(configuration.getIncomingInterceptorClassNames()));
|
||||||
addReflectivelyInstantiatedInterceptors(configuration.getOutgoingInterceptorClassNames(), outgoingInterceptors);
|
outgoingInterceptors.addAll(serviceRegistry.getOutgoingInterceptors(configuration.getOutgoingInterceptorClassNames()));
|
||||||
incomingInterceptors.addAll(serviceRegistry.getIncomingInterceptors());
|
|
||||||
outgoingInterceptors.addAll(serviceRegistry.getOutgoingInterceptors());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void addReflectivelyInstantiatedInterceptors(List<String> classNames, List<BaseInterceptor> interceptors)
|
|
||||||
{
|
|
||||||
for (String className : classNames)
|
|
||||||
{
|
|
||||||
BaseInterceptor interceptor = ((BaseInterceptor) safeInitNewInstance(className));
|
|
||||||
interceptors.add(interceptor);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void start() throws Exception
|
public synchronized void start() throws Exception
|
||||||
|
@ -623,6 +612,12 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
|
||||||
updateProtocols();
|
updateProtocols();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<BaseInterceptor> getIncomingInterceptors()
|
||||||
|
{
|
||||||
|
return Collections.unmodifiableList(incomingInterceptors);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean removeIncomingInterceptor(final Interceptor interceptor)
|
public boolean removeIncomingInterceptor(final Interceptor interceptor)
|
||||||
{
|
{
|
||||||
|
@ -644,6 +639,12 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
|
||||||
updateProtocols();
|
updateProtocols();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<BaseInterceptor> getOutgoinInterceptors()
|
||||||
|
{
|
||||||
|
return Collections.unmodifiableList(outgoingInterceptors);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean removeOutgoingInterceptor(final Interceptor interceptor)
|
public boolean removeOutgoingInterceptor(final Interceptor interceptor)
|
||||||
{
|
{
|
||||||
|
@ -810,17 +811,6 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Object safeInitNewInstance(final String className)
|
|
||||||
{
|
|
||||||
return AccessController.doPrivileged(new PrivilegedAction<Object>()
|
|
||||||
{
|
|
||||||
public Object run()
|
|
||||||
{
|
|
||||||
return ClassloadingUtil.newInstanceFromClassLoader(className);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void updateProtocols()
|
protected void updateProtocols()
|
||||||
{
|
{
|
||||||
for (ProtocolManager<?> protocolManager : this.protocolMap.values())
|
for (ProtocolManager<?> protocolManager : this.protocolMap.values())
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.server;
|
package org.apache.activemq.artemis.core.server;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
||||||
|
@ -23,6 +24,7 @@ import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
|
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
|
@ -43,32 +45,64 @@ public interface ServiceRegistry
|
||||||
|
|
||||||
void removeConnectorService(ConnectorServiceConfiguration configuration);
|
void removeConnectorService(ConnectorServiceConfiguration configuration);
|
||||||
|
|
||||||
Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices();
|
/**
|
||||||
|
* Get a collection of paired org.apache.activemq.artemis.core.server.ConnectorServiceFactory and
|
||||||
|
* org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration instances.
|
||||||
|
*
|
||||||
|
* @param configs
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices(List<ConnectorServiceConfiguration> configs);
|
||||||
|
|
||||||
void addIncomingInterceptor(String name, Interceptor interceptor);
|
void addIncomingInterceptor(Interceptor interceptor);
|
||||||
|
|
||||||
void removeIncomingInterceptor(String name);
|
/**
|
||||||
|
* Get a list of org.apache.activemq.artemis.api.core.BaseInterceptor instances
|
||||||
|
*
|
||||||
|
* @param classNames
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
List<BaseInterceptor> getIncomingInterceptors(List<String> classNames);
|
||||||
|
|
||||||
Collection<Interceptor> getIncomingInterceptors();
|
void addOutgoingInterceptor(Interceptor interceptor);
|
||||||
|
|
||||||
Interceptor getIncomingInterceptor(String name);
|
/**
|
||||||
|
* Get a list of org.apache.activemq.artemis.api.core.BaseInterceptor instances
|
||||||
|
*
|
||||||
|
* @param classNames
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
List<BaseInterceptor> getOutgoingInterceptors(List<String> classNames);
|
||||||
|
|
||||||
void addOutgoingInterceptor(String name, Interceptor interceptor);
|
/**
|
||||||
|
* Get an instance of org.apache.activemq.artemis.core.server.cluster.Transformer for a divert
|
||||||
Interceptor getOutgoingInterceptor(String name);
|
*
|
||||||
|
* @param name the name of divert for which the transformer will be used
|
||||||
void removeOutgoingInterceptor(String name);
|
* @param className the fully qualified name of the transformer implementation (can be null)
|
||||||
|
* @return
|
||||||
Collection<Interceptor> getOutgoingInterceptors();
|
*/
|
||||||
|
Transformer getDivertTransformer(String name, String className);
|
||||||
Transformer getDivertTransformer(String name);
|
|
||||||
|
|
||||||
void addDivertTransformer(String name, Transformer transformer);
|
void addDivertTransformer(String name, Transformer transformer);
|
||||||
|
|
||||||
Transformer getBridgeTransformer(String name);
|
/**
|
||||||
|
* Get an instance of org.apache.activemq.artemis.core.server.cluster.Transformer for a bridge
|
||||||
|
*
|
||||||
|
* @param name the name of bridge for which the transformer will be used
|
||||||
|
* @param className the fully qualified name of the transformer implementation (can be null)
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
Transformer getBridgeTransformer(String name, String className);
|
||||||
|
|
||||||
void addBridgeTransformer(String name, Transformer transformer);
|
void addBridgeTransformer(String name, Transformer transformer);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get an instance of org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory
|
||||||
|
*
|
||||||
|
* @param name the name of acceptor for which the factory will be used
|
||||||
|
* @param className the fully qualified name of the factory implementation (can be null)
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
AcceptorFactory getAcceptorFactory(String name, String className);
|
AcceptorFactory getAcceptorFactory(String name, String className);
|
||||||
|
|
||||||
void addAcceptorFactory(String name, AcceptorFactory acceptorFactory);
|
void addAcceptorFactory(String name, AcceptorFactory acceptorFactory);
|
||||||
|
|
|
@ -16,18 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.server.cluster;
|
package org.apache.activemq.artemis.core.server.cluster;
|
||||||
|
|
||||||
import java.io.PrintWriter;
|
|
||||||
import java.io.StringWriter;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
|
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
|
||||||
|
@ -49,7 +37,6 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||||
|
@ -67,6 +54,18 @@ import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||||
|
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.io.StringWriter;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A ClusterManager manages {@link ClusterConnection}s, {@link BroadcastGroup}s and {@link Bridge}s.
|
* A ClusterManager manages {@link ClusterConnection}s, {@link BroadcastGroup}s and {@link Bridge}s.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -458,12 +457,7 @@ public final class ClusterManager implements ActiveMQComponent
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Transformer transformer = server.getServiceRegistry().getBridgeTransformer(config.getName());
|
Transformer transformer = server.getServiceRegistry().getBridgeTransformer(config.getName(), config.getTransformerClassName());
|
||||||
|
|
||||||
if (transformer == null)
|
|
||||||
{
|
|
||||||
transformer = instantiateTransformer(config.getTransformerClassName());
|
|
||||||
}
|
|
||||||
|
|
||||||
Binding binding = postOffice.getBinding(new SimpleString(config.getQueueName()));
|
Binding binding = postOffice.getBinding(new SimpleString(config.getQueueName()));
|
||||||
|
|
||||||
|
@ -816,26 +810,6 @@ public final class ClusterManager implements ActiveMQComponent
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Transformer instantiateTransformer(final String transformerClassName)
|
|
||||||
{
|
|
||||||
Transformer transformer = null;
|
|
||||||
|
|
||||||
if (transformerClassName != null)
|
|
||||||
{
|
|
||||||
ClassLoader loader = Thread.currentThread().getContextClassLoader();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
Class<?> clz = loader.loadClass(transformerClassName);
|
|
||||||
transformer = (Transformer) clz.newInstance();
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, transformerClassName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return transformer;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
|
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -16,30 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.server.impl;
|
package org.apache.activemq.artemis.core.server.impl;
|
||||||
|
|
||||||
import javax.management.MBeanServer;
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.FilenameFilter;
|
|
||||||
import java.io.PrintWriter;
|
|
||||||
import java.io.StringWriter;
|
|
||||||
import java.lang.management.ManagementFactory;
|
|
||||||
import java.security.AccessController;
|
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
@ -125,7 +101,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
|
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
|
||||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
||||||
|
@ -133,6 +108,30 @@ import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
import org.apache.activemq.artemis.utils.SecurityFormatter;
|
import org.apache.activemq.artemis.utils.SecurityFormatter;
|
||||||
import org.apache.activemq.artemis.utils.VersionLoader;
|
import org.apache.activemq.artemis.utils.VersionLoader;
|
||||||
|
|
||||||
|
import javax.management.MBeanServer;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FilenameFilter;
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.io.StringWriter;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.security.AccessController;
|
||||||
|
import java.security.PrivilegedAction;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The ActiveMQ Artemis server implementation
|
* The ActiveMQ Artemis server implementation
|
||||||
*/
|
*/
|
||||||
|
@ -1516,12 +1515,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
||||||
|
|
||||||
SimpleString sAddress = new SimpleString(config.getAddress());
|
SimpleString sAddress = new SimpleString(config.getAddress());
|
||||||
|
|
||||||
Transformer transformer = getServiceRegistry().getDivertTransformer(config.getName());
|
Transformer transformer = getServiceRegistry().getDivertTransformer(config.getName(), config.getTransformerClassName());
|
||||||
|
|
||||||
if (transformer == null)
|
|
||||||
{
|
|
||||||
transformer = instantiateTransformer(config.getTransformerClassName());
|
|
||||||
}
|
|
||||||
|
|
||||||
Filter filter = FilterImpl.createFilter(config.getFilterString());
|
Filter filter = FilterImpl.createFilter(config.getFilterString());
|
||||||
|
|
||||||
|
@ -2206,23 +2200,6 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Transformer instantiateTransformer(final String transformerClassName)
|
|
||||||
{
|
|
||||||
Transformer transformer = null;
|
|
||||||
|
|
||||||
if (transformerClassName != null)
|
|
||||||
{
|
|
||||||
transformer = (Transformer) instantiateInstance(transformerClassName);
|
|
||||||
}
|
|
||||||
|
|
||||||
return transformer;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Object instantiateInstance(final String className)
|
|
||||||
{
|
|
||||||
return safeInitNewInstance(className);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ClassLoader getThisClassLoader()
|
private static ClassLoader getThisClassLoader()
|
||||||
{
|
{
|
||||||
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
|
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
|
||||||
|
@ -2278,23 +2255,6 @@ public class ActiveMQServerImpl implements ActiveMQServer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a
|
|
||||||
* utility class, as it would be a door to load anything you like in a safe VM.
|
|
||||||
* For that reason any class trying to do a privileged block should do with the AccessController directly.
|
|
||||||
*/
|
|
||||||
private static Object safeInitNewInstance(final String className)
|
|
||||||
{
|
|
||||||
return AccessController.doPrivileged(new PrivilegedAction<Object>()
|
|
||||||
{
|
|
||||||
public Object run()
|
|
||||||
{
|
|
||||||
return ClassloadingUtil.newInstanceFromClassLoader(className);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addProtocolManagerFactory(ProtocolManagerFactory factory)
|
public void addProtocolManagerFactory(ProtocolManagerFactory factory)
|
||||||
{
|
{
|
||||||
protocolManagerFactories.add(factory);
|
protocolManagerFactories.add(factory);
|
||||||
|
|
|
@ -16,25 +16,23 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.server.impl;
|
package org.apache.activemq.artemis.core.server.impl;
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
||||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.ConnectorService;
|
import org.apache.activemq.artemis.core.server.ConnectorService;
|
||||||
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
|
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
|
||||||
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
||||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
|
||||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ConnectorsService will pool some resource for updates, e.g. Twitter, then the changes are picked
|
* ConnectorsService will pool some resource for updates, e.g. Twitter, then the changes are picked
|
||||||
* and converted into a ServerMessage for a given destination (queue).
|
* and converted into a ServerMessage for a given destination (queue).
|
||||||
|
@ -73,21 +71,13 @@ public final class ConnectorsService implements ActiveMQComponent
|
||||||
|
|
||||||
public void start() throws Exception
|
public void start() throws Exception
|
||||||
{
|
{
|
||||||
List<ConnectorServiceConfiguration> configurationList = configuration.getConnectorServiceConfigurations();
|
Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServiceFactories = serviceRegistry.getConnectorServices(configuration.getConnectorServiceConfigurations());
|
||||||
|
|
||||||
Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServiceFactories = serviceRegistry.getConnectorServices();
|
|
||||||
|
|
||||||
for (Pair<ConnectorServiceFactory, ConnectorServiceConfiguration> pair : connectorServiceFactories)
|
for (Pair<ConnectorServiceFactory, ConnectorServiceConfiguration> pair : connectorServiceFactories)
|
||||||
{
|
{
|
||||||
createService(pair.getB(), pair.getA());
|
createService(pair.getB(), pair.getA());
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ConnectorServiceConfiguration info : configurationList)
|
|
||||||
{
|
|
||||||
ConnectorServiceFactory factory = (ConnectorServiceFactory) ClassloadingUtil.newInstanceFromClassLoader(info.getFactoryClassName());
|
|
||||||
createService(info, factory);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (ConnectorService connector : connectors)
|
for (ConnectorService connector : connectors)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
|
|
@ -16,24 +16,28 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.server.impl;
|
package org.apache.activemq.artemis.core.server.impl;
|
||||||
|
|
||||||
import java.security.AccessController;
|
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
|
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
|
||||||
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
|
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
|
||||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||||
|
|
||||||
|
import java.security.AccessController;
|
||||||
|
import java.security.PrivilegedAction;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
public class ServiceRegistryImpl implements ServiceRegistry
|
public class ServiceRegistryImpl implements ServiceRegistry
|
||||||
{
|
{
|
||||||
private ExecutorService executorService;
|
private ExecutorService executorService;
|
||||||
|
@ -43,9 +47,9 @@ public class ServiceRegistryImpl implements ServiceRegistry
|
||||||
/* We are using a List rather than HashMap here as ActiveMQ Artemis allows multiple instances of the same class to be added
|
/* We are using a List rather than HashMap here as ActiveMQ Artemis allows multiple instances of the same class to be added
|
||||||
* to the interceptor list
|
* to the interceptor list
|
||||||
*/
|
*/
|
||||||
private Map<String, Interceptor> incomingInterceptors;
|
private List<BaseInterceptor> incomingInterceptors;
|
||||||
|
|
||||||
private Map<String, Interceptor> outgoingInterceptors;
|
private List<BaseInterceptor> outgoingInterceptors;
|
||||||
|
|
||||||
private Map<String, Transformer> divertTransformers;
|
private Map<String, Transformer> divertTransformers;
|
||||||
|
|
||||||
|
@ -57,87 +61,104 @@ public class ServiceRegistryImpl implements ServiceRegistry
|
||||||
|
|
||||||
public ServiceRegistryImpl()
|
public ServiceRegistryImpl()
|
||||||
{
|
{
|
||||||
this.incomingInterceptors = new ConcurrentHashMap<>();
|
this.incomingInterceptors = Collections.synchronizedList(new ArrayList<BaseInterceptor>());
|
||||||
this.outgoingInterceptors = new ConcurrentHashMap<>();
|
this.outgoingInterceptors = Collections.synchronizedList(new ArrayList<BaseInterceptor>());
|
||||||
this.connectorServices = new ConcurrentHashMap<>();
|
this.connectorServices = new ConcurrentHashMap<>();
|
||||||
this.divertTransformers = new ConcurrentHashMap<>();
|
this.divertTransformers = new ConcurrentHashMap<>();
|
||||||
this.bridgeTransformers = new ConcurrentHashMap<>();
|
this.bridgeTransformers = new ConcurrentHashMap<>();
|
||||||
this.acceptorFactories = new ConcurrentHashMap<>();
|
this.acceptorFactories = new ConcurrentHashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ExecutorService getExecutorService()
|
public ExecutorService getExecutorService()
|
||||||
{
|
{
|
||||||
return executorService;
|
return executorService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setExecutorService(ExecutorService executorService)
|
public void setExecutorService(ExecutorService executorService)
|
||||||
{
|
{
|
||||||
this.executorService = executorService;
|
this.executorService = executorService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ScheduledExecutorService getScheduledExecutorService()
|
public ScheduledExecutorService getScheduledExecutorService()
|
||||||
{
|
{
|
||||||
return scheduledExecutorService;
|
return scheduledExecutorService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService)
|
public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService)
|
||||||
{
|
{
|
||||||
this.scheduledExecutorService = scheduledExecutorService;
|
this.scheduledExecutorService = scheduledExecutorService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void addConnectorService(ConnectorServiceFactory connectorServiceFactory, ConnectorServiceConfiguration configuration)
|
public void addConnectorService(ConnectorServiceFactory connectorServiceFactory, ConnectorServiceConfiguration configuration)
|
||||||
{
|
{
|
||||||
connectorServices.put(configuration.getConnectorName(), new Pair<>(connectorServiceFactory, configuration));
|
connectorServices.put(configuration.getConnectorName(), new Pair<>(connectorServiceFactory, configuration));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void removeConnectorService(ConnectorServiceConfiguration configuration)
|
public void removeConnectorService(ConnectorServiceConfiguration configuration)
|
||||||
{
|
{
|
||||||
connectorServices.remove(configuration.getConnectorName());
|
connectorServices.remove(configuration.getConnectorName());
|
||||||
}
|
}
|
||||||
|
|
||||||
public Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices()
|
@Override
|
||||||
|
public Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices(List<ConnectorServiceConfiguration> configs)
|
||||||
{
|
{
|
||||||
|
if (configs != null)
|
||||||
|
{
|
||||||
|
for (final ConnectorServiceConfiguration config : configs)
|
||||||
|
{
|
||||||
|
if (connectorServices.get(config.getConnectorName()) == null)
|
||||||
|
{
|
||||||
|
ConnectorServiceFactory factory = AccessController.doPrivileged(new PrivilegedAction<ConnectorServiceFactory>()
|
||||||
|
{
|
||||||
|
public ConnectorServiceFactory run()
|
||||||
|
{
|
||||||
|
return (ConnectorServiceFactory) ClassloadingUtil.newInstanceFromClassLoader(config.getFactoryClassName());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
addConnectorService(factory, config);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return connectorServices.values();
|
return connectorServices.values();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addIncomingInterceptor(String name, Interceptor interceptor)
|
@Override
|
||||||
|
public void addIncomingInterceptor(Interceptor interceptor)
|
||||||
{
|
{
|
||||||
incomingInterceptors.put(name, interceptor);
|
incomingInterceptors.add(interceptor);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeIncomingInterceptor(String name)
|
@Override
|
||||||
|
public List<BaseInterceptor> getIncomingInterceptors(List<String> classNames)
|
||||||
{
|
{
|
||||||
incomingInterceptors.remove(name);
|
List<BaseInterceptor> interceptors = new ArrayList<>(incomingInterceptors);
|
||||||
|
|
||||||
|
instantiateInterceptors(classNames, interceptors);
|
||||||
|
|
||||||
|
return interceptors;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Collection<Interceptor> getIncomingInterceptors()
|
@Override
|
||||||
|
public void addOutgoingInterceptor(Interceptor interceptor)
|
||||||
{
|
{
|
||||||
return Collections.unmodifiableCollection(incomingInterceptors.values());
|
outgoingInterceptors.add(interceptor);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Interceptor getIncomingInterceptor(String name)
|
@Override
|
||||||
|
public List<BaseInterceptor> getOutgoingInterceptors(List<String> classNames)
|
||||||
{
|
{
|
||||||
return incomingInterceptors.get(name);
|
List<BaseInterceptor> interceptors = new ArrayList<>(outgoingInterceptors);
|
||||||
}
|
|
||||||
|
|
||||||
public void addOutgoingInterceptor(String name, Interceptor interceptor)
|
instantiateInterceptors(classNames, interceptors);
|
||||||
{
|
|
||||||
outgoingInterceptors.put(name, interceptor);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Interceptor getOutgoingInterceptor(String name)
|
return interceptors;
|
||||||
{
|
|
||||||
return outgoingInterceptors.get(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void removeOutgoingInterceptor(String name)
|
|
||||||
{
|
|
||||||
outgoingInterceptors.remove(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Collection<Interceptor> getOutgoingInterceptors()
|
|
||||||
{
|
|
||||||
return Collections.unmodifiableCollection(outgoingInterceptors.values());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -147,9 +168,17 @@ public class ServiceRegistryImpl implements ServiceRegistry
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Transformer getDivertTransformer(String name)
|
public Transformer getDivertTransformer(String name, String className)
|
||||||
{
|
{
|
||||||
return divertTransformers.get(name);
|
Transformer transformer = divertTransformers.get(name);
|
||||||
|
|
||||||
|
if (transformer == null && className != null)
|
||||||
|
{
|
||||||
|
transformer = instantiateTransformer(className);
|
||||||
|
addDivertTransformer(name, transformer);
|
||||||
|
}
|
||||||
|
|
||||||
|
return transformer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -159,9 +188,17 @@ public class ServiceRegistryImpl implements ServiceRegistry
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Transformer getBridgeTransformer(String name)
|
public Transformer getBridgeTransformer(String name, String className)
|
||||||
{
|
{
|
||||||
return bridgeTransformers.get(name);
|
Transformer transformer = bridgeTransformers.get(name);
|
||||||
|
|
||||||
|
if (transformer == null && className != null)
|
||||||
|
{
|
||||||
|
transformer = instantiateTransformer(className);
|
||||||
|
addBridgeTransformer(name, transformer);
|
||||||
|
}
|
||||||
|
|
||||||
|
return transformer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -169,7 +206,7 @@ public class ServiceRegistryImpl implements ServiceRegistry
|
||||||
{
|
{
|
||||||
AcceptorFactory factory = acceptorFactories.get(name);
|
AcceptorFactory factory = acceptorFactories.get(name);
|
||||||
|
|
||||||
if (factory == null)
|
if (factory == null && className != null)
|
||||||
{
|
{
|
||||||
factory = AccessController.doPrivileged(new PrivilegedAction<AcceptorFactory>()
|
factory = AccessController.doPrivileged(new PrivilegedAction<AcceptorFactory>()
|
||||||
{
|
{
|
||||||
|
@ -190,4 +227,47 @@ public class ServiceRegistryImpl implements ServiceRegistry
|
||||||
{
|
{
|
||||||
acceptorFactories.put(name, acceptorFactory);
|
acceptorFactories.put(name, acceptorFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Transformer instantiateTransformer(final String className)
|
||||||
|
{
|
||||||
|
Transformer transformer = null;
|
||||||
|
|
||||||
|
if (className != null)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
transformer = AccessController.doPrivileged(new PrivilegedAction<Transformer>()
|
||||||
|
{
|
||||||
|
public Transformer run()
|
||||||
|
{
|
||||||
|
return (Transformer) ClassloadingUtil.newInstanceFromClassLoader(className);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, className);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return transformer;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void instantiateInterceptors(List<String> classNames, List<BaseInterceptor> interceptors)
|
||||||
|
{
|
||||||
|
if (classNames != null)
|
||||||
|
{
|
||||||
|
for (final String className : classNames)
|
||||||
|
{
|
||||||
|
BaseInterceptor interceptor = AccessController.doPrivileged(new PrivilegedAction<BaseInterceptor>()
|
||||||
|
{
|
||||||
|
public BaseInterceptor run()
|
||||||
|
{
|
||||||
|
return (BaseInterceptor) ClassloadingUtil.newInstanceFromClassLoader(className);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
interceptors.add(interceptor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,14 +22,13 @@ import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
|
||||||
import org.apache.activemq.artemis.tests.unit.core.remoting.server.impl.fake.FakeInterceptor;
|
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||||
import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
|
import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
|
||||||
|
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
|
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
|
||||||
|
import org.apache.activemq.artemis.tests.unit.core.remoting.server.impl.fake.FakeInterceptor;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -53,27 +52,21 @@ public class RemotingServiceImplTest
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests that the method addReflectivelyInstantiatedInterceptors creates new instances of interceptors and adds
|
* Tests that the service registry gets propaged into remotingService.
|
||||||
* them to the provided list.
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testAddReflectivelyInstantiatedInterceptorsAddsNewInstancesToList() throws Exception
|
public void testPropagatingInterceptors() throws Exception
|
||||||
{
|
{
|
||||||
Method method = RemotingServiceImpl.class.getDeclaredMethod("addReflectivelyInstantiatedInterceptors",
|
|
||||||
List.class,
|
|
||||||
List.class);
|
|
||||||
method.setAccessible(true);
|
|
||||||
List<String> interceptorClassNames = new ArrayList<String>();
|
|
||||||
for (int i = 0; i < 5; i++)
|
for (int i = 0; i < 5; i++)
|
||||||
{
|
{
|
||||||
interceptorClassNames.add(FakeInterceptor.class.getCanonicalName());
|
serviceRegistry.addIncomingInterceptor(new FakeInterceptor());
|
||||||
}
|
}
|
||||||
List<Interceptor> interceptors = new ArrayList<Interceptor>();
|
|
||||||
method.invoke(remotingService, interceptorClassNames, interceptors);
|
|
||||||
|
|
||||||
assertTrue(interceptors.size() == 5);
|
remotingService = new RemotingServiceImpl(null, configuration, null, null, null, null, null, serviceRegistry);
|
||||||
assertTrue(interceptors.get(0) instanceof FakeInterceptor);
|
|
||||||
assertTrue(interceptors.get(0) != interceptors.get(1));
|
assertTrue(remotingService.getIncomingInterceptors().size() == 5);
|
||||||
|
assertTrue(remotingService.getIncomingInterceptors().get(0) instanceof FakeInterceptor);
|
||||||
|
assertTrue(remotingService.getIncomingInterceptors().get(0) != remotingService.getIncomingInterceptors().get(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -83,8 +76,7 @@ public class RemotingServiceImplTest
|
||||||
@Test
|
@Test
|
||||||
public void testSetInterceptorsAddsBothInterceptorsFromConfigAndServiceRegistry() throws Exception
|
public void testSetInterceptorsAddsBothInterceptorsFromConfigAndServiceRegistry() throws Exception
|
||||||
{
|
{
|
||||||
Method method = RemotingServiceImpl.class.getDeclaredMethod("setInterceptors",
|
Method method = RemotingServiceImpl.class.getDeclaredMethod("setInterceptors", Configuration.class);
|
||||||
Configuration.class);
|
|
||||||
Field incomingInterceptors = RemotingServiceImpl.class.getDeclaredField("incomingInterceptors");
|
Field incomingInterceptors = RemotingServiceImpl.class.getDeclaredField("incomingInterceptors");
|
||||||
Field outgoingInterceptors = RemotingServiceImpl.class.getDeclaredField("outgoingInterceptors");
|
Field outgoingInterceptors = RemotingServiceImpl.class.getDeclaredField("outgoingInterceptors");
|
||||||
|
|
||||||
|
@ -92,10 +84,10 @@ public class RemotingServiceImplTest
|
||||||
incomingInterceptors.setAccessible(true);
|
incomingInterceptors.setAccessible(true);
|
||||||
outgoingInterceptors.setAccessible(true);
|
outgoingInterceptors.setAccessible(true);
|
||||||
|
|
||||||
serviceRegistry.addIncomingInterceptor("Foo", new FakeInterceptor());
|
serviceRegistry.addIncomingInterceptor(new FakeInterceptor());
|
||||||
serviceRegistry.addOutgoingInterceptor("Bar", new FakeInterceptor());
|
serviceRegistry.addOutgoingInterceptor(new FakeInterceptor());
|
||||||
|
|
||||||
List<String> interceptorClassNames = new ArrayList<String>();
|
List<String> interceptorClassNames = new ArrayList<>();
|
||||||
interceptorClassNames.add(FakeInterceptor.class.getCanonicalName());
|
interceptorClassNames.add(FakeInterceptor.class.getCanonicalName());
|
||||||
configuration.setIncomingInterceptorClassNames(interceptorClassNames);
|
configuration.setIncomingInterceptorClassNames(interceptorClassNames);
|
||||||
configuration.setOutgoingInterceptorClassNames(interceptorClassNames);
|
configuration.setOutgoingInterceptorClassNames(interceptorClassNames);
|
||||||
|
@ -104,8 +96,8 @@ public class RemotingServiceImplTest
|
||||||
|
|
||||||
assertTrue(((List) incomingInterceptors.get(remotingService)).size() == 2 );
|
assertTrue(((List) incomingInterceptors.get(remotingService)).size() == 2 );
|
||||||
assertTrue(((List) outgoingInterceptors.get(remotingService)).size() == 2 );
|
assertTrue(((List) outgoingInterceptors.get(remotingService)).size() == 2 );
|
||||||
assertTrue(((List) incomingInterceptors.get(remotingService)).contains(serviceRegistry.getIncomingInterceptor("Foo")));
|
assertTrue(((List) incomingInterceptors.get(remotingService)).contains(serviceRegistry.getIncomingInterceptors(null).get(0)));
|
||||||
assertTrue(((List) outgoingInterceptors.get(remotingService)).contains(serviceRegistry.getOutgoingInterceptor("Bar")));
|
assertTrue(((List) outgoingInterceptors.get(remotingService)).contains(serviceRegistry.getOutgoingInterceptors(null).get(0)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -115,19 +107,16 @@ public class RemotingServiceImplTest
|
||||||
@Test
|
@Test
|
||||||
public void testInterceptorsAreAddedOnCreationOfServiceRegistry() throws Exception
|
public void testInterceptorsAreAddedOnCreationOfServiceRegistry() throws Exception
|
||||||
{
|
{
|
||||||
Method method = RemotingServiceImpl.class.getDeclaredMethod("setInterceptors",
|
|
||||||
Configuration.class);
|
|
||||||
Field incomingInterceptors = RemotingServiceImpl.class.getDeclaredField("incomingInterceptors");
|
Field incomingInterceptors = RemotingServiceImpl.class.getDeclaredField("incomingInterceptors");
|
||||||
Field outgoingInterceptors = RemotingServiceImpl.class.getDeclaredField("outgoingInterceptors");
|
Field outgoingInterceptors = RemotingServiceImpl.class.getDeclaredField("outgoingInterceptors");
|
||||||
|
|
||||||
method.setAccessible(true);
|
|
||||||
incomingInterceptors.setAccessible(true);
|
incomingInterceptors.setAccessible(true);
|
||||||
outgoingInterceptors.setAccessible(true);
|
outgoingInterceptors.setAccessible(true);
|
||||||
|
|
||||||
serviceRegistry.addIncomingInterceptor("Foo", new FakeInterceptor());
|
serviceRegistry.addIncomingInterceptor(new FakeInterceptor());
|
||||||
serviceRegistry.addOutgoingInterceptor("Bar", new FakeInterceptor());
|
serviceRegistry.addOutgoingInterceptor(new FakeInterceptor());
|
||||||
|
|
||||||
List<String> interceptorClassNames = new ArrayList<String>();
|
List<String> interceptorClassNames = new ArrayList<>();
|
||||||
interceptorClassNames.add(FakeInterceptor.class.getCanonicalName());
|
interceptorClassNames.add(FakeInterceptor.class.getCanonicalName());
|
||||||
configuration.setIncomingInterceptorClassNames(interceptorClassNames);
|
configuration.setIncomingInterceptorClassNames(interceptorClassNames);
|
||||||
configuration.setOutgoingInterceptorClassNames(interceptorClassNames);
|
configuration.setOutgoingInterceptorClassNames(interceptorClassNames);
|
||||||
|
@ -136,7 +125,7 @@ public class RemotingServiceImplTest
|
||||||
|
|
||||||
assertTrue(((List) incomingInterceptors.get(remotingService)).size() == 2 );
|
assertTrue(((List) incomingInterceptors.get(remotingService)).size() == 2 );
|
||||||
assertTrue(((List) outgoingInterceptors.get(remotingService)).size() == 2 );
|
assertTrue(((List) outgoingInterceptors.get(remotingService)).size() == 2 );
|
||||||
assertTrue(((List) incomingInterceptors.get(remotingService)).contains(serviceRegistry.getIncomingInterceptor("Foo")));
|
assertTrue(((List) incomingInterceptors.get(remotingService)).contains(serviceRegistry.getIncomingInterceptors(null).get(0)));
|
||||||
assertTrue(((List) outgoingInterceptors.get(remotingService)).contains(serviceRegistry.getOutgoingInterceptor("Bar")));
|
assertTrue(((List) outgoingInterceptors.get(remotingService)).contains(serviceRegistry.getOutgoingInterceptors(null).get(0)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue