AMQ-6861 Allow customisation of network bridge creation logic.

This commit is contained in:
Łukasz Dywicki 2017-11-13 15:06:22 +01:00 committed by Hadrian Zbarcea
parent 2f1a6d3b3b
commit 3e9ee71062
9 changed files with 428 additions and 105 deletions

View File

@ -1442,7 +1442,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
listener.setCreatedByDuplex(true);
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
duplexBridge = config.getBridgeFactory().createNetworkBridge(config, localTransport, remoteBridgeTransport, listener);
duplexBridge.setBrokerService(brokerService);
//Need to set durableDestinations to properly restart subs when dynamicOnly=false
duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations(

View File

@ -0,0 +1,23 @@
package org.apache.activemq.network;
import org.apache.activemq.transport.Transport;
/**
* Encapsulation of bridge creation logic.
*
* This SPI interface is intended to customize or decorate existing bridge implementations.
*/
public interface BridgeFactory {
/**
* Create a network bridge between two specified transports.
*
* @param configuration Bridge configuration.
* @param localTransport Local side of bridge.
* @param remoteTransport Remote side of bridge.
* @param listener Bridge listener.
* @return the NetworkBridge
*/
DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, final NetworkBridgeListener listener);
}

View File

@ -256,7 +256,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
}
NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName());
DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener);
DemandForwardingBridge result = getBridgeFactory().createNetworkBridge(this, localTransport, remoteTransport, listener);
result.setBrokerService(getBrokerService());
return configureBridge(result);
}

View File

@ -77,6 +77,11 @@ public class NetworkBridgeConfiguration {
private long gcSweepTime = 60 * 1000;
private boolean checkDuplicateMessagesOnDuplex = false;
/**
* Bridge factory implementation - by default backed by static factory, which is default implementation and will rely change.
*/
private BridgeFactory bridgeFactory = NetworkBridgeFactory.INSTANCE;
/**
* @return the conduitSubscriptions
*/
@ -541,6 +546,14 @@ public class NetworkBridgeConfiguration {
return useVirtualDestSubs;
}
public BridgeFactory getBridgeFactory() {
return bridgeFactory;
}
public void setBridgeFactory(BridgeFactory bridgeFactory) {
this.bridgeFactory = bridgeFactory;
}
/**
* This was a typo, so this is deprecated as of 5.13.1
*/

View File

@ -18,7 +18,10 @@ package org.apache.activemq.network;
import java.net.URI;
import java.util.HashMap;
import org.apache.activemq.broker.Broker;
import java.util.LinkedHashSet;
import java.util.ServiceLoader;
import java.util.Set;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.URISupport;
@ -28,13 +31,32 @@ import org.apache.activemq.util.URISupport;
*
*
*/
public final class NetworkBridgeFactory {
public final class NetworkBridgeFactory implements BridgeFactory {
public final static BridgeFactory INSTANCE = new NetworkBridgeFactory();
private NetworkBridgeFactory() {
}
@Override
public DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, NetworkBridgeListener listener) {
if (configuration.isConduitSubscriptions()) {
// dynamicOnly determines whether durables are auto bridged
return attachListener(new DurableConduitBridge(configuration, localTransport, remoteTransport), listener);
}
return attachListener(new DemandForwardingBridge(configuration, localTransport, remoteTransport), listener);
}
private DemandForwardingBridge attachListener(DemandForwardingBridge bridge, NetworkBridgeListener listener) {
if (listener != null) {
bridge.setNetworkBridgeListener(listener);
}
return bridge;
}
/**
* create a network bridge
* Create a network bridge
*
* @param configuration
* @param localTransport
@ -42,20 +64,11 @@ public final class NetworkBridgeFactory {
* @param listener
* @return the NetworkBridge
*/
@Deprecated
public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,
Transport localTransport, Transport remoteTransport,
final NetworkBridgeListener listener) {
DemandForwardingBridge result = null;
if (configuration.isConduitSubscriptions()) {
// dynamicOnly determines whether durables are auto bridged
result = new DurableConduitBridge(configuration, localTransport, remoteTransport);
} else {
result = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
}
if (listener != null) {
result.setNetworkBridgeListener(listener);
}
return result;
return INSTANCE.createNetworkBridge(configuration, localTransport, remoteTransport, listener);
}
public static Transport createLocalTransport(NetworkBridgeConfiguration configuration, URI uri) throws Exception {
@ -74,4 +87,5 @@ public final class NetworkBridgeFactory {
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
return TransportFactory.connect(uri);
}
}

View File

@ -0,0 +1,96 @@
package org.apache.activemq.network;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
public class BaseNetworkTest {
protected final Logger LOG = LoggerFactory.getLogger(getClass());
protected Connection localConnection;
protected Connection remoteConnection;
protected BrokerService localBroker;
protected BrokerService remoteBroker;
protected Session localSession;
protected Session remoteSession;
@Before
public final void setUp() throws Exception {
doSetUp(true);
}
@After
public final void tearDown() throws Exception {
doTearDown();
}
protected void doTearDown() throws Exception {
localConnection.close();
remoteConnection.close();
localBroker.stop();
remoteBroker.stop();
}
protected void doSetUp(boolean deleteAllMessages) throws Exception {
remoteBroker = createRemoteBroker();
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.start();
remoteBroker.waitUntilStarted();
localBroker = createLocalBroker();
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.start();
localBroker.waitUntilStarted();
URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
fac.setAlwaysSyncSend(true);
fac.setDispatchAsync(false);
localConnection = fac.createConnection();
localConnection.setClientID("clientId");
localConnection.start();
URI remoteURI = remoteBroker.getVmConnectorURI();
fac = new ActiveMQConnectionFactory(remoteURI);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("clientId");
remoteConnection.start();
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
protected String getRemoteBrokerURI() {
return "org/apache/activemq/network/remoteBroker.xml";
}
protected String getLocalBrokerURI() {
return "org/apache/activemq/network/localBroker.xml";
}
protected BrokerService createBroker(String uri) throws Exception {
Resource resource = new ClassPathResource(uri);
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
resource = new ClassPathResource(uri);
factory = new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
BrokerService result = factory.getBroker();
return result;
}
protected BrokerService createLocalBroker() throws Exception {
return createBroker(getLocalBrokerURI());
}
protected BrokerService createRemoteBroker() throws Exception {
return createBroker(getRemoteBrokerURI());
}
}

View File

@ -0,0 +1,217 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
import org.apache.activemq.transport.Transport;
import org.junit.Test;
import org.mockito.Mockito;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* Basic test which verify if custom bridge factory receives any interactions when configured.
*/
public class CustomBridgeFactoryTest extends BaseNetworkTest {
private ActiveMQQueue outgoing = new ActiveMQQueue("outgoing");
/**
* Verification of outgoing communication - from local broker (with customized bridge configured) to remote one.
*/
@Test
public void verifyOutgoingCommunication() throws JMSException {
CustomNetworkBridgeFactory bridgeFactory = getCustomNetworkBridgeFactory();
NetworkBridgeListener listener = bridgeFactory.getListener();
verify(listener).onStart(any(NetworkBridge.class));
verifyNoMoreInteractions(listener);
send(localSession, outgoing, localSession.createTextMessage("test message"));
assertNotNull("Message didn't arrive", receive(remoteSession, outgoing));
verify(listener).onOutboundMessage(any(NetworkBridge.class), any(Message.class));
verifyNoMoreInteractions(listener);
}
/**
* Additional test which makes sure that custom bridge receives notification about broker shutdown.
*/
@Test
public void verifyBrokerShutdown() {
shutdownTest(() -> {
try {
localBroker.stop();
} catch (Exception e) {
return e;
}
return null;
});
}
/**
* Verification of network connector shutdown.
*/
@Test
public void verifyConnectorShutdown() {
shutdownTest(() -> {
try {
getLocalConnector(0).stop();
} catch (Exception e) {
return e;
}
return null;
});
}
private void shutdownTest(Supplier<Throwable> callback) {
CustomNetworkBridgeFactory bridgeFactory = getCustomNetworkBridgeFactory();
NetworkBridgeListener listener = bridgeFactory.getListener();
verify(listener).onStart(any(NetworkBridge.class));
verifyNoMoreInteractions(listener);
Throwable throwable = callback.get();
assertNull("Unexpected error", throwable);
verify(listener).onStop(any(NetworkBridge.class));
verifyNoMoreInteractions(listener);
}
// helper methods
private void send(Session session, ActiveMQQueue destination, TextMessage message) throws JMSException {
MessageProducer producer = session.createProducer(destination);
try {
producer.send(message);
} finally {
producer.close();
}
}
private javax.jms.Message receive(Session session, ActiveMQQueue destination) throws JMSException {
MessageConsumer consumer = session.createConsumer(destination);
try {
return consumer.receive(TimeUnit.SECONDS.toMillis(5));
} finally {
consumer.close();
}
}
// infrastructure operations digging for connectors in running broker
private CustomNetworkBridgeFactory getCustomNetworkBridgeFactory() {
NetworkConnector connector = getLocalConnector(0);
assertTrue(connector.getBridgeFactory() instanceof CustomNetworkBridgeFactory);
return (CustomNetworkBridgeFactory) connector.getBridgeFactory();
}
private NetworkConnector getLocalConnector(int index) {
return localBroker.getNetworkConnectors().get(index);
}
// customizations
protected String getLocalBrokerURI() {
return "org/apache/activemq/network/localBroker-custom-factory.xml";
}
// test classes
static class CustomNetworkBridgeFactory implements BridgeFactory {
private final NetworkBridgeListener listener;
CustomNetworkBridgeFactory() {
this(Mockito.mock(NetworkBridgeListener.class));
}
CustomNetworkBridgeFactory(NetworkBridgeListener listener) {
this.listener = listener;
}
public NetworkBridgeListener getListener() {
return listener;
}
@Override
public DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, NetworkBridgeListener listener) {
DemandForwardingBridge bridge = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
bridge.setNetworkBridgeListener(new CompositeNetworkBridgeListener(this.listener, listener));
return bridge;
}
}
static class CompositeNetworkBridgeListener implements NetworkBridgeListener {
private final List<NetworkBridgeListener> listeners;
public CompositeNetworkBridgeListener(NetworkBridgeListener ... wrapped) {
this.listeners = Arrays.asList(wrapped);
}
@Override
public void bridgeFailed() {
for (NetworkBridgeListener listener : listeners) {
listener.bridgeFailed();
}
}
@Override
public void onStart(NetworkBridge bridge) {
for (NetworkBridgeListener listener : listeners) {
listener.onStart(bridge);
}
}
@Override
public void onStop(NetworkBridge bridge) {
for (NetworkBridgeListener listener : listeners) {
listener.onStop(bridge);
}
}
@Override
public void onOutboundMessage(NetworkBridge bridge, Message message) {
for (NetworkBridgeListener listener : listeners) {
listener.onOutboundMessage(bridge, message);
}
}
@Override
public void onInboundMessage(NetworkBridge bridge, Message message) {
for (NetworkBridgeListener listener : listeners) {
listener.onInboundMessage(bridge, message);
}
}
}
}

View File

@ -21,11 +21,9 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
@ -33,13 +31,11 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicRequestor;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@ -48,33 +44,27 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
public class SimpleNetworkTest {
public class SimpleNetworkTest extends BaseNetworkTest {
protected static final int MESSAGE_COUNT = 10;
private static final Logger LOG = LoggerFactory.getLogger(SimpleNetworkTest.class);
protected AbstractApplicationContext context;
protected Connection localConnection;
protected Connection remoteConnection;
protected BrokerService localBroker;
protected BrokerService remoteBroker;
protected Session localSession;
protected Session remoteSession;
protected ActiveMQTopic included;
protected ActiveMQTopic excluded;
protected String consumerName = "durableSubs";
@Override
protected void doSetUp(boolean deleteAllMessages) throws Exception {
super.doSetUp(deleteAllMessages);
included = new ActiveMQTopic("include.test.bar");
excluded = new ActiveMQTopic("exclude.test.bar");
}
// works b/c of non marshaling vm transport, the connection
// ref from the client is used during the forward
@Test(timeout = 60 * 1000)
@ -364,76 +354,6 @@ public class SimpleNetworkTest {
}
}
@Before
public void setUp() throws Exception {
doSetUp(true);
}
@After
public void tearDown() throws Exception {
doTearDown();
}
protected void doTearDown() throws Exception {
localConnection.close();
remoteConnection.close();
localBroker.stop();
remoteBroker.stop();
}
protected void doSetUp(boolean deleteAllMessages) throws Exception {
remoteBroker = createRemoteBroker();
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.start();
remoteBroker.waitUntilStarted();
localBroker = createLocalBroker();
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.start();
localBroker.waitUntilStarted();
URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
fac.setAlwaysSyncSend(true);
fac.setDispatchAsync(false);
localConnection = fac.createConnection();
localConnection.setClientID("clientId");
localConnection.start();
URI remoteURI = remoteBroker.getVmConnectorURI();
fac = new ActiveMQConnectionFactory(remoteURI);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("clientId");
remoteConnection.start();
included = new ActiveMQTopic("include.test.bar");
excluded = new ActiveMQTopic("exclude.test.bar");
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
protected String getRemoteBrokerURI() {
return "org/apache/activemq/network/remoteBroker.xml";
}
protected String getLocalBrokerURI() {
return "org/apache/activemq/network/localBroker.xml";
}
protected BrokerService createBroker(String uri) throws Exception {
Resource resource = new ClassPathResource(uri);
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
resource = new ClassPathResource(uri);
factory = new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
BrokerService result = factory.getBroker();
return result;
}
protected BrokerService createLocalBroker() throws Exception {
return createBroker(getLocalBrokerURI());
}
protected BrokerService createRemoteBroker() throws Exception {
return createBroker(getRemoteBrokerURI());
}
protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();

View File

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker brokerName="localBroker" start="false" deleteAllMessagesOnStartup="true" persistent="true" useShutdownHook="false" monitorConnectionSplits="true" xmlns="http://activemq.apache.org/schema/core">
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61617)" name="networkConnector">
<bridgeFactory>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.network.CustomBridgeFactoryTest.CustomNetworkBridgeFactory" />
</bridgeFactory>
</networkConnector>
</networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
</broker>
</beans>