[ARTEMIS-92] Transformer instance injection

This commit is contained in:
jbertram 2015-06-16 09:01:09 -05:00
parent ce7d87eb52
commit adb9ccd013
10 changed files with 150 additions and 2 deletions

View File

@ -164,4 +164,9 @@ public class DivertBinding implements Binding
{
}
public Divert getDivert()
{
return divert;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
public interface Divert extends Bindable
{
@ -28,4 +29,6 @@ public interface Divert extends Bindable
SimpleString getUniqueName();
SimpleString getRoutingName();
Transformer getTransformer();
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
@ -58,4 +59,8 @@ public interface ServiceRegistry
void removeOutgoingInterceptor(String name);
Collection<Interceptor> getOutgoingInterceptors();
Transformer getDivertTransformer(String name);
Transformer getBridgeTransformer(String name);
}

View File

@ -458,7 +458,12 @@ public final class ClusterManager implements ActiveMQComponent
return;
}
Transformer transformer = instantiateTransformer(config.getTransformerClassName());
Transformer transformer = server.getServiceRegistry().getBridgeTransformer(config.getName());
if (transformer == null)
{
transformer = instantiateTransformer(config.getTransformerClassName());
}
Binding binding = postOffice.getBinding(new SimpleString(config.getQueueName()));

View File

@ -806,6 +806,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
{
return (ClientSessionFactoryImpl) csf;
}
public Transformer getTransformer()
{
return transformer;
}
protected void fail(final boolean permanently)
{
ActiveMQServerLogger.LOGGER.debug(this + "\n\t::fail being called, permanently=" + permanently);

View File

@ -1516,7 +1516,12 @@ public class ActiveMQServerImpl implements ActiveMQServer
SimpleString sAddress = new SimpleString(config.getAddress());
Transformer transformer = instantiateTransformer(config.getTransformerClassName());
Transformer transformer = getServiceRegistry().getDivertTransformer(config.getName());
if (transformer == null)
{
transformer = instantiateTransformer(config.getTransformerClassName());
}
Filter filter = FilterImpl.createFilter(config.getFilterString());

View File

@ -140,6 +140,11 @@ public class DivertImpl implements Divert
return filter;
}
public Transformer getTransformer()
{
return transformer;
}
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
public class ServiceRegistryImpl implements ServiceRegistry
{
@ -42,6 +43,10 @@ public class ServiceRegistryImpl implements ServiceRegistry
private Map<String, Interceptor> outgoingInterceptors;
private Map<String, Transformer> divertTransformers;
private Map<String, Transformer> bridgeTransformers;
private Map<String, Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServices;
public ServiceRegistryImpl()
@ -49,6 +54,8 @@ public class ServiceRegistryImpl implements ServiceRegistry
this.incomingInterceptors = new ConcurrentHashMap<>();
this.outgoingInterceptors = new ConcurrentHashMap<>();
this.connectorServices = new ConcurrentHashMap<>();
this.divertTransformers = new ConcurrentHashMap<>();
this.bridgeTransformers = new ConcurrentHashMap<>();
}
public ExecutorService getExecutorService()
@ -125,4 +132,25 @@ public class ServiceRegistryImpl implements ServiceRegistry
{
return Collections.unmodifiableCollection(outgoingInterceptors.values());
}
public void addDivertTransformer(String name, Transformer transformer)
{
divertTransformers.put(name, transformer);
}
public Transformer getDivertTransformer(String name)
{
return divertTransformers.get(name);
}
public void addBridgeTransformer(String name, Transformer transformer)
{
bridgeTransformers.put(name, transformer);
}
@Override
public Transformer getBridgeTransformer(String name)
{
return bridgeTransformers.get(name);
}
}

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
@ -47,7 +48,12 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
@ -1876,6 +1882,40 @@ public class BridgeTest extends ActiveMQTestBase
closeFields();
}
@Test
public void testInjectedTransformer() throws Exception
{
final SimpleString ADDRESS = new SimpleString("myAddress");
final SimpleString QUEUE = new SimpleString("myQueue");
final String BRIDGE = "myBridge";
ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl();
Transformer transformer = new Transformer()
{
@Override
public ServerMessage transform(ServerMessage message)
{
return null;
}
};
serviceRegistry.addBridgeTransformer(BRIDGE, transformer);
Configuration config = createDefaultInVMConfig().addConnectorConfiguration("in-vm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
ActiveMQServer server = addServer(new ActiveMQServerImpl(config, null, null, null, serviceRegistry));
server.start();
server.waitForActivation(100, TimeUnit.MILLISECONDS);
server.deployQueue(ADDRESS, QUEUE, null, false, false);
List<String> connectors = new ArrayList<>();
connectors.add("in-vm");
server.deployBridge(new BridgeConfiguration()
.setName(BRIDGE)
.setQueueName(QUEUE.toString())
.setForwardingAddress(ADDRESS.toString())
.setStaticConnectors(connectors));
Bridge bridge = server.getClusterManager().getBridges().get(BRIDGE);
assertNotNull(bridge);
assertEquals(transformer, ((BridgeImpl)bridge).getTransformer());
}
/**
* It will inspect the journal directly and determine if there are queues on this journal,
*

View File

@ -26,13 +26,23 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
public class DivertTest extends ActiveMQTestBase
{
private static final int TIMEOUT = 500;
@ -1334,4 +1344,41 @@ public class DivertTest extends ActiveMQTestBase
Assert.assertNull(consumer4.receiveImmediate());
}
@Test
public void testInjectedTransformer() throws Exception
{
final SimpleString ADDRESS = new SimpleString("myAddress");
final String DIVERT = "myDivert";
ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl();
Transformer transformer = new Transformer()
{
@Override
public ServerMessage transform(ServerMessage message)
{
return null;
}
};
serviceRegistry.addDivertTransformer(DIVERT, transformer);
ActiveMQServer server = addServer(new ActiveMQServerImpl(null, null, null, null, serviceRegistry));
server.start();
server.waitForActivation(100, TimeUnit.MILLISECONDS);
server.deployQueue(ADDRESS, SimpleString.toSimpleString("myQueue"), null, false, false);
server.deployDivert(new DivertConfiguration()
.setName(DIVERT)
.setAddress(ADDRESS.toString())
.setForwardingAddress(ADDRESS.toString()));
Collection<Binding> bindings = server.getPostOffice().getBindingsForAddress(ADDRESS).getBindings();
Divert divert = null;
for (Binding binding : bindings)
{
if (binding instanceof DivertBinding)
{
divert = ((DivertBinding)binding).getDivert();
}
}
assertNotNull(divert);
assertEquals(transformer, divert.getTransformer());
}
}