diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index b18881d21f..967e977a47 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -1442,7 +1442,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName)); listener.setCreatedByDuplex(true); - duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); + duplexBridge = config.getBridgeFactory().createNetworkBridge(config, localTransport, remoteBridgeTransport, listener); duplexBridge.setBrokerService(brokerService); //Need to set durableDestinations to properly restart subs when dynamicOnly=false duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations( diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/BridgeFactory.java b/activemq-broker/src/main/java/org/apache/activemq/network/BridgeFactory.java new file mode 100644 index 0000000000..af6a6db27f --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/network/BridgeFactory.java @@ -0,0 +1,23 @@ +package org.apache.activemq.network; + +import org.apache.activemq.transport.Transport; + +/** + * Encapsulation of bridge creation logic. + * + * This SPI interface is intended to customize or decorate existing bridge implementations. + */ +public interface BridgeFactory { + + /** + * Create a network bridge between two specified transports. + * + * @param configuration Bridge configuration. + * @param localTransport Local side of bridge. + * @param remoteTransport Remote side of bridge. + * @param listener Bridge listener. + * @return the NetworkBridge + */ + DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, final NetworkBridgeListener listener); + +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java index e05c42cae7..3850da5979 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -256,7 +256,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco } NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName()); - DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener); + DemandForwardingBridge result = getBridgeFactory().createNetworkBridge(this, localTransport, remoteTransport, listener); result.setBrokerService(getBrokerService()); return configureBridge(result); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index 3c64758ddd..1adff09768 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -77,6 +77,11 @@ public class NetworkBridgeConfiguration { private long gcSweepTime = 60 * 1000; private boolean checkDuplicateMessagesOnDuplex = false; + /** + * Bridge factory implementation - by default backed by static factory, which is default implementation and will rely change. + */ + private BridgeFactory bridgeFactory = NetworkBridgeFactory.INSTANCE; + /** * @return the conduitSubscriptions */ @@ -541,6 +546,14 @@ public class NetworkBridgeConfiguration { return useVirtualDestSubs; } + public BridgeFactory getBridgeFactory() { + return bridgeFactory; + } + + public void setBridgeFactory(BridgeFactory bridgeFactory) { + this.bridgeFactory = bridgeFactory; + } + /** * This was a typo, so this is deprecated as of 5.13.1 */ diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java index 32a78537ac..0ac56dd9e5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java @@ -18,7 +18,10 @@ package org.apache.activemq.network; import java.net.URI; import java.util.HashMap; -import org.apache.activemq.broker.Broker; +import java.util.LinkedHashSet; +import java.util.ServiceLoader; +import java.util.Set; + import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.util.URISupport; @@ -28,13 +31,32 @@ import org.apache.activemq.util.URISupport; * * */ -public final class NetworkBridgeFactory { +public final class NetworkBridgeFactory implements BridgeFactory { + + public final static BridgeFactory INSTANCE = new NetworkBridgeFactory(); private NetworkBridgeFactory() { + + } + + @Override + public DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, NetworkBridgeListener listener) { + if (configuration.isConduitSubscriptions()) { + // dynamicOnly determines whether durables are auto bridged + return attachListener(new DurableConduitBridge(configuration, localTransport, remoteTransport), listener); + } + return attachListener(new DemandForwardingBridge(configuration, localTransport, remoteTransport), listener); + } + + private DemandForwardingBridge attachListener(DemandForwardingBridge bridge, NetworkBridgeListener listener) { + if (listener != null) { + bridge.setNetworkBridgeListener(listener); + } + return bridge; } /** - * create a network bridge + * Create a network bridge * * @param configuration * @param localTransport @@ -42,20 +64,11 @@ public final class NetworkBridgeFactory { * @param listener * @return the NetworkBridge */ + @Deprecated public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, final NetworkBridgeListener listener) { - DemandForwardingBridge result = null; - if (configuration.isConduitSubscriptions()) { - // dynamicOnly determines whether durables are auto bridged - result = new DurableConduitBridge(configuration, localTransport, remoteTransport); - } else { - result = new DemandForwardingBridge(configuration, localTransport, remoteTransport); - } - if (listener != null) { - result.setNetworkBridgeListener(listener); - } - return result; + return INSTANCE.createNetworkBridge(configuration, localTransport, remoteTransport, listener); } public static Transport createLocalTransport(NetworkBridgeConfiguration configuration, URI uri) throws Exception { @@ -74,4 +87,5 @@ public final class NetworkBridgeFactory { uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); return TransportFactory.connect(uri); } + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java new file mode 100644 index 0000000000..6bb9384bd4 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java @@ -0,0 +1,96 @@ +package org.apache.activemq.network; + +import java.net.URI; + +import javax.jms.Connection; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.xbean.BrokerFactoryBean; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; + +public class BaseNetworkTest { + + protected final Logger LOG = LoggerFactory.getLogger(getClass()); + + protected Connection localConnection; + protected Connection remoteConnection; + protected BrokerService localBroker; + protected BrokerService remoteBroker; + protected Session localSession; + protected Session remoteSession; + + @Before + public final void setUp() throws Exception { + doSetUp(true); + } + + @After + public final void tearDown() throws Exception { + doTearDown(); + } + + protected void doTearDown() throws Exception { + localConnection.close(); + remoteConnection.close(); + localBroker.stop(); + remoteBroker.stop(); + } + + protected void doSetUp(boolean deleteAllMessages) throws Exception { + remoteBroker = createRemoteBroker(); + remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + localBroker = createLocalBroker(); + localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + localBroker.start(); + localBroker.waitUntilStarted(); + URI localURI = localBroker.getVmConnectorURI(); + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); + fac.setAlwaysSyncSend(true); + fac.setDispatchAsync(false); + localConnection = fac.createConnection(); + localConnection.setClientID("clientId"); + localConnection.start(); + URI remoteURI = remoteBroker.getVmConnectorURI(); + fac = new ActiveMQConnectionFactory(remoteURI); + remoteConnection = fac.createConnection(); + remoteConnection.setClientID("clientId"); + remoteConnection.start(); + localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + protected String getRemoteBrokerURI() { + return "org/apache/activemq/network/remoteBroker.xml"; + } + + protected String getLocalBrokerURI() { + return "org/apache/activemq/network/localBroker.xml"; + } + + protected BrokerService createBroker(String uri) throws Exception { + Resource resource = new ClassPathResource(uri); + BrokerFactoryBean factory = new BrokerFactoryBean(resource); + resource = new ClassPathResource(uri); + factory = new BrokerFactoryBean(resource); + factory.afterPropertiesSet(); + BrokerService result = factory.getBroker(); + return result; + } + + protected BrokerService createLocalBroker() throws Exception { + return createBroker(getLocalBrokerURI()); + } + + protected BrokerService createRemoteBroker() throws Exception { + return createBroker(getRemoteBrokerURI()); + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CustomBridgeFactoryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CustomBridgeFactoryTest.java new file mode 100644 index 0000000000..3340584974 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CustomBridgeFactoryTest.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.Message; +import org.apache.activemq.transport.Transport; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * Basic test which verify if custom bridge factory receives any interactions when configured. + */ +public class CustomBridgeFactoryTest extends BaseNetworkTest { + + private ActiveMQQueue outgoing = new ActiveMQQueue("outgoing"); + + /** + * Verification of outgoing communication - from local broker (with customized bridge configured) to remote one. + */ + @Test + public void verifyOutgoingCommunication() throws JMSException { + CustomNetworkBridgeFactory bridgeFactory = getCustomNetworkBridgeFactory(); + NetworkBridgeListener listener = bridgeFactory.getListener(); + + verify(listener).onStart(any(NetworkBridge.class)); + verifyNoMoreInteractions(listener); + + send(localSession, outgoing, localSession.createTextMessage("test message")); + assertNotNull("Message didn't arrive", receive(remoteSession, outgoing)); + + verify(listener).onOutboundMessage(any(NetworkBridge.class), any(Message.class)); + verifyNoMoreInteractions(listener); + } + + /** + * Additional test which makes sure that custom bridge receives notification about broker shutdown. + */ + @Test + public void verifyBrokerShutdown() { + shutdownTest(() -> { + try { + localBroker.stop(); + } catch (Exception e) { + return e; + } + return null; + }); + } + + /** + * Verification of network connector shutdown. + */ + @Test + public void verifyConnectorShutdown() { + shutdownTest(() -> { + try { + getLocalConnector(0).stop(); + } catch (Exception e) { + return e; + } + return null; + }); + } + + private void shutdownTest(Supplier callback) { + CustomNetworkBridgeFactory bridgeFactory = getCustomNetworkBridgeFactory(); + NetworkBridgeListener listener = bridgeFactory.getListener(); + + verify(listener).onStart(any(NetworkBridge.class)); + verifyNoMoreInteractions(listener); + + Throwable throwable = callback.get(); + assertNull("Unexpected error", throwable); + + verify(listener).onStop(any(NetworkBridge.class)); + verifyNoMoreInteractions(listener); + } + + // helper methods + private void send(Session session, ActiveMQQueue destination, TextMessage message) throws JMSException { + MessageProducer producer = session.createProducer(destination); + try { + producer.send(message); + } finally { + producer.close(); + } + } + + private javax.jms.Message receive(Session session, ActiveMQQueue destination) throws JMSException { + MessageConsumer consumer = session.createConsumer(destination); + try { + return consumer.receive(TimeUnit.SECONDS.toMillis(5)); + } finally { + consumer.close(); + } + } + + // infrastructure operations digging for connectors in running broker + private CustomNetworkBridgeFactory getCustomNetworkBridgeFactory() { + NetworkConnector connector = getLocalConnector(0); + + assertTrue(connector.getBridgeFactory() instanceof CustomNetworkBridgeFactory); + + return (CustomNetworkBridgeFactory) connector.getBridgeFactory(); + } + + private NetworkConnector getLocalConnector(int index) { + return localBroker.getNetworkConnectors().get(index); + } + + // customizations + protected String getLocalBrokerURI() { + return "org/apache/activemq/network/localBroker-custom-factory.xml"; + } + + // test classes + static class CustomNetworkBridgeFactory implements BridgeFactory { + + private final NetworkBridgeListener listener; + + CustomNetworkBridgeFactory() { + this(Mockito.mock(NetworkBridgeListener.class)); + } + + CustomNetworkBridgeFactory(NetworkBridgeListener listener) { + this.listener = listener; + } + + public NetworkBridgeListener getListener() { + return listener; + } + + @Override + public DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, NetworkBridgeListener listener) { + DemandForwardingBridge bridge = new DemandForwardingBridge(configuration, localTransport, remoteTransport); + bridge.setNetworkBridgeListener(new CompositeNetworkBridgeListener(this.listener, listener)); + return bridge; + } + + } + + static class CompositeNetworkBridgeListener implements NetworkBridgeListener { + + private final List listeners; + + public CompositeNetworkBridgeListener(NetworkBridgeListener ... wrapped) { + this.listeners = Arrays.asList(wrapped); + } + + @Override + public void bridgeFailed() { + for (NetworkBridgeListener listener : listeners) { + listener.bridgeFailed(); + } + } + + @Override + public void onStart(NetworkBridge bridge) { + for (NetworkBridgeListener listener : listeners) { + listener.onStart(bridge); + } + } + + @Override + public void onStop(NetworkBridge bridge) { + for (NetworkBridgeListener listener : listeners) { + listener.onStop(bridge); + } + } + + @Override + public void onOutboundMessage(NetworkBridge bridge, Message message) { + for (NetworkBridgeListener listener : listeners) { + listener.onOutboundMessage(bridge, message); + } + } + + @Override + public void onInboundMessage(NetworkBridge bridge, Message message) { + for (NetworkBridgeListener listener : listeners) { + listener.onInboundMessage(bridge, message); + } + } + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java index 2df48ddef6..c030db5659 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java @@ -21,11 +21,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.net.URI; import java.util.Arrays; import java.util.concurrent.ConcurrentMap; -import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; @@ -33,13 +31,11 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; -import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TopicRequestor; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -48,33 +44,27 @@ import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait.Condition; -import org.apache.activemq.xbean.BrokerFactoryBean; -import org.junit.After; -import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.context.support.AbstractApplicationContext; -import org.springframework.core.io.ClassPathResource; -import org.springframework.core.io.Resource; -public class SimpleNetworkTest { +public class SimpleNetworkTest extends BaseNetworkTest { protected static final int MESSAGE_COUNT = 10; - private static final Logger LOG = LoggerFactory.getLogger(SimpleNetworkTest.class); protected AbstractApplicationContext context; - protected Connection localConnection; - protected Connection remoteConnection; - protected BrokerService localBroker; - protected BrokerService remoteBroker; - protected Session localSession; - protected Session remoteSession; protected ActiveMQTopic included; protected ActiveMQTopic excluded; protected String consumerName = "durableSubs"; + @Override + protected void doSetUp(boolean deleteAllMessages) throws Exception { + super.doSetUp(deleteAllMessages); + + included = new ActiveMQTopic("include.test.bar"); + excluded = new ActiveMQTopic("exclude.test.bar"); + } + // works b/c of non marshaling vm transport, the connection // ref from the client is used during the forward @Test(timeout = 60 * 1000) @@ -364,76 +354,6 @@ public class SimpleNetworkTest { } } - @Before - public void setUp() throws Exception { - doSetUp(true); - } - - @After - public void tearDown() throws Exception { - doTearDown(); - } - - protected void doTearDown() throws Exception { - localConnection.close(); - remoteConnection.close(); - localBroker.stop(); - remoteBroker.stop(); - } - - protected void doSetUp(boolean deleteAllMessages) throws Exception { - remoteBroker = createRemoteBroker(); - remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); - remoteBroker.start(); - remoteBroker.waitUntilStarted(); - localBroker = createLocalBroker(); - localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); - localBroker.start(); - localBroker.waitUntilStarted(); - URI localURI = localBroker.getVmConnectorURI(); - ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); - fac.setAlwaysSyncSend(true); - fac.setDispatchAsync(false); - localConnection = fac.createConnection(); - localConnection.setClientID("clientId"); - localConnection.start(); - URI remoteURI = remoteBroker.getVmConnectorURI(); - fac = new ActiveMQConnectionFactory(remoteURI); - remoteConnection = fac.createConnection(); - remoteConnection.setClientID("clientId"); - remoteConnection.start(); - included = new ActiveMQTopic("include.test.bar"); - excluded = new ActiveMQTopic("exclude.test.bar"); - localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - protected String getRemoteBrokerURI() { - return "org/apache/activemq/network/remoteBroker.xml"; - } - - protected String getLocalBrokerURI() { - return "org/apache/activemq/network/localBroker.xml"; - } - - protected BrokerService createBroker(String uri) throws Exception { - Resource resource = new ClassPathResource(uri); - BrokerFactoryBean factory = new BrokerFactoryBean(resource); - resource = new ClassPathResource(uri); - factory = new BrokerFactoryBean(resource); - factory.afterPropertiesSet(); - BrokerService result = factory.getBroker(); - return result; - } - - protected BrokerService createLocalBroker() throws Exception { - return createBroker(getLocalBrokerURI()); - } - - protected BrokerService createRemoteBroker() throws Exception { - return createBroker(getRemoteBrokerURI()); - } - protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception { final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-custom-factory.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-custom-factory.xml new file mode 100644 index 0000000000..9dc8c61beb --- /dev/null +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-custom-factory.xml @@ -0,0 +1,40 @@ + + + + + + + + + + + + + + + + + + + + \ No newline at end of file