diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java index 57373d06c6..1330ed07f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java @@ -164,4 +164,9 @@ public class DivertBinding implements Binding { } + public Divert getDivert() + { + return divert; + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Divert.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Divert.java index 82ec7e514f..9ae98c79b1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Divert.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Divert.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.server.cluster.Transformer; public interface Divert extends Bindable { @@ -28,4 +29,6 @@ public interface Divert extends Bindable SimpleString getUniqueName(); SimpleString getRoutingName(); + + Transformer getTransformer(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java index c1710b69ab..41dd06d36a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration; +import org.apache.activemq.artemis.core.server.cluster.Transformer; import java.util.Collection; import java.util.concurrent.ExecutorService; @@ -58,4 +59,8 @@ public interface ServiceRegistry void removeOutgoingInterceptor(String name); Collection getOutgoingInterceptors(); + + Transformer getDivertTransformer(String name); + + Transformer getBridgeTransformer(String name); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java index e3ad79fd0a..40f7302fe5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java @@ -458,7 +458,12 @@ public final class ClusterManager implements ActiveMQComponent return; } - Transformer transformer = instantiateTransformer(config.getTransformerClassName()); + Transformer transformer = server.getServiceRegistry().getBridgeTransformer(config.getName()); + + if (transformer == null) + { + transformer = instantiateTransformer(config.getTransformerClassName()); + } Binding binding = postOffice.getBinding(new SimpleString(config.getQueueName())); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index d88ff4a0c9..243bc71207 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -806,6 +806,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled { return (ClientSessionFactoryImpl) csf; } + + public Transformer getTransformer() + { + return transformer; + } protected void fail(final boolean permanently) { ActiveMQServerLogger.LOGGER.debug(this + "\n\t::fail being called, permanently=" + permanently); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 11caee4998..d3c1722ee0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1516,7 +1516,12 @@ public class ActiveMQServerImpl implements ActiveMQServer SimpleString sAddress = new SimpleString(config.getAddress()); - Transformer transformer = instantiateTransformer(config.getTransformerClassName()); + Transformer transformer = getServiceRegistry().getDivertTransformer(config.getName()); + + if (transformer == null) + { + transformer = instantiateTransformer(config.getTransformerClassName()); + } Filter filter = FilterImpl.createFilter(config.getFilterString()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index e4106431aa..c6db703d21 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -140,6 +140,11 @@ public class DivertImpl implements Divert return filter; } + public Transformer getTransformer() + { + return transformer; + } + /* (non-Javadoc) * @see java.lang.Object#toString() */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java index 5a58b41537..f0825dcdfe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration; import org.apache.activemq.artemis.core.server.ConnectorServiceFactory; import org.apache.activemq.artemis.core.server.ServiceRegistry; +import org.apache.activemq.artemis.core.server.cluster.Transformer; public class ServiceRegistryImpl implements ServiceRegistry { @@ -42,6 +43,10 @@ public class ServiceRegistryImpl implements ServiceRegistry private Map outgoingInterceptors; + private Map divertTransformers; + + private Map bridgeTransformers; + private Map> connectorServices; public ServiceRegistryImpl() @@ -49,6 +54,8 @@ public class ServiceRegistryImpl implements ServiceRegistry this.incomingInterceptors = new ConcurrentHashMap<>(); this.outgoingInterceptors = new ConcurrentHashMap<>(); this.connectorServices = new ConcurrentHashMap<>(); + this.divertTransformers = new ConcurrentHashMap<>(); + this.bridgeTransformers = new ConcurrentHashMap<>(); } public ExecutorService getExecutorService() @@ -125,4 +132,25 @@ public class ServiceRegistryImpl implements ServiceRegistry { return Collections.unmodifiableCollection(outgoingInterceptors.values()); } + + public void addDivertTransformer(String name, Transformer transformer) + { + divertTransformers.put(name, transformer); + } + + public Transformer getDivertTransformer(String name) + { + return divertTransformers.get(name); + } + + public void addBridgeTransformer(String name, Transformer transformer) + { + bridgeTransformers.put(name, transformer); + } + + @Override + public Transformer getBridgeTransformer(String name) + { + return bridgeTransformers.get(name); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index a82c2f1e8f..5fea52d875 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -47,7 +48,12 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.cluster.Bridge; +import org.apache.activemq.artemis.core.server.cluster.Transformer; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; @@ -1876,6 +1882,40 @@ public class BridgeTest extends ActiveMQTestBase closeFields(); } + @Test + public void testInjectedTransformer() throws Exception + { + final SimpleString ADDRESS = new SimpleString("myAddress"); + final SimpleString QUEUE = new SimpleString("myQueue"); + final String BRIDGE = "myBridge"; + + ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl(); + Transformer transformer = new Transformer() + { + @Override + public ServerMessage transform(ServerMessage message) + { + return null; + } + }; + serviceRegistry.addBridgeTransformer(BRIDGE, transformer); + Configuration config = createDefaultInVMConfig().addConnectorConfiguration("in-vm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + ActiveMQServer server = addServer(new ActiveMQServerImpl(config, null, null, null, serviceRegistry)); + server.start(); + server.waitForActivation(100, TimeUnit.MILLISECONDS); + server.deployQueue(ADDRESS, QUEUE, null, false, false); + List connectors = new ArrayList<>(); + connectors.add("in-vm"); + server.deployBridge(new BridgeConfiguration() + .setName(BRIDGE) + .setQueueName(QUEUE.toString()) + .setForwardingAddress(ADDRESS.toString()) + .setStaticConnectors(connectors)); + Bridge bridge = server.getClusterManager().getBridges().get(BRIDGE); + assertNotNull(bridge); + assertEquals(transformer, ((BridgeImpl)bridge).getTransformer()); + } + /** * It will inspect the journal directly and determine if there are queues on this journal, * diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java index 0681928e5c..23c01910b4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java @@ -26,13 +26,23 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.Divert; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.cluster.Transformer; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; import org.junit.Test; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + public class DivertTest extends ActiveMQTestBase { private static final int TIMEOUT = 500; @@ -1334,4 +1344,41 @@ public class DivertTest extends ActiveMQTestBase Assert.assertNull(consumer4.receiveImmediate()); } + @Test + public void testInjectedTransformer() throws Exception + { + final SimpleString ADDRESS = new SimpleString("myAddress"); + final String DIVERT = "myDivert"; + + ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl(); + Transformer transformer = new Transformer() + { + @Override + public ServerMessage transform(ServerMessage message) + { + return null; + } + }; + serviceRegistry.addDivertTransformer(DIVERT, transformer); + + ActiveMQServer server = addServer(new ActiveMQServerImpl(null, null, null, null, serviceRegistry)); + server.start(); + server.waitForActivation(100, TimeUnit.MILLISECONDS); + server.deployQueue(ADDRESS, SimpleString.toSimpleString("myQueue"), null, false, false); + server.deployDivert(new DivertConfiguration() + .setName(DIVERT) + .setAddress(ADDRESS.toString()) + .setForwardingAddress(ADDRESS.toString())); + Collection bindings = server.getPostOffice().getBindingsForAddress(ADDRESS).getBindings(); + Divert divert = null; + for (Binding binding : bindings) + { + if (binding instanceof DivertBinding) + { + divert = ((DivertBinding)binding).getDivert(); + } + } + assertNotNull(divert); + assertEquals(transformer, divert.getTransformer()); + } }