mirror of https://github.com/apache/activemq.git
AMQ-6861 Allow customisation of network bridge creation logic.
This commit is contained in:
parent
ab2711abb1
commit
a8a032af09
|
@ -1442,7 +1442,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
}
|
}
|
||||||
MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
|
MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
|
||||||
listener.setCreatedByDuplex(true);
|
listener.setCreatedByDuplex(true);
|
||||||
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
|
duplexBridge = config.getBridgeFactory().createNetworkBridge(config, localTransport, remoteBridgeTransport, listener);
|
||||||
duplexBridge.setBrokerService(brokerService);
|
duplexBridge.setBrokerService(brokerService);
|
||||||
//Need to set durableDestinations to properly restart subs when dynamicOnly=false
|
//Need to set durableDestinations to properly restart subs when dynamicOnly=false
|
||||||
duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations(
|
duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations(
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
package org.apache.activemq.network;
|
||||||
|
|
||||||
|
import org.apache.activemq.transport.Transport;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulation of bridge creation logic.
|
||||||
|
*
|
||||||
|
* This SPI interface is intended to customize or decorate existing bridge implementations.
|
||||||
|
*/
|
||||||
|
public interface BridgeFactory {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a network bridge between two specified transports.
|
||||||
|
*
|
||||||
|
* @param configuration Bridge configuration.
|
||||||
|
* @param localTransport Local side of bridge.
|
||||||
|
* @param remoteTransport Remote side of bridge.
|
||||||
|
* @param listener Bridge listener.
|
||||||
|
* @return the NetworkBridge
|
||||||
|
*/
|
||||||
|
DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, final NetworkBridgeListener listener);
|
||||||
|
|
||||||
|
}
|
|
@ -256,7 +256,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
|
||||||
}
|
}
|
||||||
NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName());
|
NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName());
|
||||||
|
|
||||||
DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener);
|
DemandForwardingBridge result = getBridgeFactory().createNetworkBridge(this, localTransport, remoteTransport, listener);
|
||||||
result.setBrokerService(getBrokerService());
|
result.setBrokerService(getBrokerService());
|
||||||
return configureBridge(result);
|
return configureBridge(result);
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,6 +77,11 @@ public class NetworkBridgeConfiguration {
|
||||||
private long gcSweepTime = 60 * 1000;
|
private long gcSweepTime = 60 * 1000;
|
||||||
private boolean checkDuplicateMessagesOnDuplex = false;
|
private boolean checkDuplicateMessagesOnDuplex = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bridge factory implementation - by default backed by static factory, which is default implementation and will rely change.
|
||||||
|
*/
|
||||||
|
private BridgeFactory bridgeFactory = NetworkBridgeFactory.INSTANCE;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the conduitSubscriptions
|
* @return the conduitSubscriptions
|
||||||
*/
|
*/
|
||||||
|
@ -541,6 +546,14 @@ public class NetworkBridgeConfiguration {
|
||||||
return useVirtualDestSubs;
|
return useVirtualDestSubs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public BridgeFactory getBridgeFactory() {
|
||||||
|
return bridgeFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBridgeFactory(BridgeFactory bridgeFactory) {
|
||||||
|
this.bridgeFactory = bridgeFactory;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This was a typo, so this is deprecated as of 5.13.1
|
* This was a typo, so this is deprecated as of 5.13.1
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -18,7 +18,10 @@ package org.apache.activemq.network;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import org.apache.activemq.broker.Broker;
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.ServiceLoader;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.TransportFactory;
|
import org.apache.activemq.transport.TransportFactory;
|
||||||
import org.apache.activemq.util.URISupport;
|
import org.apache.activemq.util.URISupport;
|
||||||
|
@ -28,13 +31,32 @@ import org.apache.activemq.util.URISupport;
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public final class NetworkBridgeFactory {
|
public final class NetworkBridgeFactory implements BridgeFactory {
|
||||||
|
|
||||||
|
public final static BridgeFactory INSTANCE = new NetworkBridgeFactory();
|
||||||
|
|
||||||
private NetworkBridgeFactory() {
|
private NetworkBridgeFactory() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, NetworkBridgeListener listener) {
|
||||||
|
if (configuration.isConduitSubscriptions()) {
|
||||||
|
// dynamicOnly determines whether durables are auto bridged
|
||||||
|
return attachListener(new DurableConduitBridge(configuration, localTransport, remoteTransport), listener);
|
||||||
|
}
|
||||||
|
return attachListener(new DemandForwardingBridge(configuration, localTransport, remoteTransport), listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
private DemandForwardingBridge attachListener(DemandForwardingBridge bridge, NetworkBridgeListener listener) {
|
||||||
|
if (listener != null) {
|
||||||
|
bridge.setNetworkBridgeListener(listener);
|
||||||
|
}
|
||||||
|
return bridge;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* create a network bridge
|
* Create a network bridge
|
||||||
*
|
*
|
||||||
* @param configuration
|
* @param configuration
|
||||||
* @param localTransport
|
* @param localTransport
|
||||||
|
@ -42,20 +64,11 @@ public final class NetworkBridgeFactory {
|
||||||
* @param listener
|
* @param listener
|
||||||
* @return the NetworkBridge
|
* @return the NetworkBridge
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,
|
public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,
|
||||||
Transport localTransport, Transport remoteTransport,
|
Transport localTransport, Transport remoteTransport,
|
||||||
final NetworkBridgeListener listener) {
|
final NetworkBridgeListener listener) {
|
||||||
DemandForwardingBridge result = null;
|
return INSTANCE.createNetworkBridge(configuration, localTransport, remoteTransport, listener);
|
||||||
if (configuration.isConduitSubscriptions()) {
|
|
||||||
// dynamicOnly determines whether durables are auto bridged
|
|
||||||
result = new DurableConduitBridge(configuration, localTransport, remoteTransport);
|
|
||||||
} else {
|
|
||||||
result = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
|
|
||||||
}
|
|
||||||
if (listener != null) {
|
|
||||||
result.setNetworkBridgeListener(listener);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Transport createLocalTransport(NetworkBridgeConfiguration configuration, URI uri) throws Exception {
|
public static Transport createLocalTransport(NetworkBridgeConfiguration configuration, URI uri) throws Exception {
|
||||||
|
@ -74,4 +87,5 @@ public final class NetworkBridgeFactory {
|
||||||
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
|
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
|
||||||
return TransportFactory.connect(uri);
|
return TransportFactory.connect(uri);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,96 @@
|
||||||
|
package org.apache.activemq.network;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.xbean.BrokerFactoryBean;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
import org.springframework.core.io.Resource;
|
||||||
|
|
||||||
|
public class BaseNetworkTest {
|
||||||
|
|
||||||
|
protected final Logger LOG = LoggerFactory.getLogger(getClass());
|
||||||
|
|
||||||
|
protected Connection localConnection;
|
||||||
|
protected Connection remoteConnection;
|
||||||
|
protected BrokerService localBroker;
|
||||||
|
protected BrokerService remoteBroker;
|
||||||
|
protected Session localSession;
|
||||||
|
protected Session remoteSession;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public final void setUp() throws Exception {
|
||||||
|
doSetUp(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public final void tearDown() throws Exception {
|
||||||
|
doTearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void doTearDown() throws Exception {
|
||||||
|
localConnection.close();
|
||||||
|
remoteConnection.close();
|
||||||
|
localBroker.stop();
|
||||||
|
remoteBroker.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void doSetUp(boolean deleteAllMessages) throws Exception {
|
||||||
|
remoteBroker = createRemoteBroker();
|
||||||
|
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||||
|
remoteBroker.start();
|
||||||
|
remoteBroker.waitUntilStarted();
|
||||||
|
localBroker = createLocalBroker();
|
||||||
|
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||||
|
localBroker.start();
|
||||||
|
localBroker.waitUntilStarted();
|
||||||
|
URI localURI = localBroker.getVmConnectorURI();
|
||||||
|
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
|
||||||
|
fac.setAlwaysSyncSend(true);
|
||||||
|
fac.setDispatchAsync(false);
|
||||||
|
localConnection = fac.createConnection();
|
||||||
|
localConnection.setClientID("clientId");
|
||||||
|
localConnection.start();
|
||||||
|
URI remoteURI = remoteBroker.getVmConnectorURI();
|
||||||
|
fac = new ActiveMQConnectionFactory(remoteURI);
|
||||||
|
remoteConnection = fac.createConnection();
|
||||||
|
remoteConnection.setClientID("clientId");
|
||||||
|
remoteConnection.start();
|
||||||
|
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getRemoteBrokerURI() {
|
||||||
|
return "org/apache/activemq/network/remoteBroker.xml";
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getLocalBrokerURI() {
|
||||||
|
return "org/apache/activemq/network/localBroker.xml";
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService createBroker(String uri) throws Exception {
|
||||||
|
Resource resource = new ClassPathResource(uri);
|
||||||
|
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
|
||||||
|
resource = new ClassPathResource(uri);
|
||||||
|
factory = new BrokerFactoryBean(resource);
|
||||||
|
factory.afterPropertiesSet();
|
||||||
|
BrokerService result = factory.getBroker();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService createLocalBroker() throws Exception {
|
||||||
|
return createBroker(getLocalBrokerURI());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService createRemoteBroker() throws Exception {
|
||||||
|
return createBroker(getRemoteBrokerURI());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,217 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.network;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.transport.Transport;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Basic test which verify if custom bridge factory receives any interactions when configured.
|
||||||
|
*/
|
||||||
|
public class CustomBridgeFactoryTest extends BaseNetworkTest {
|
||||||
|
|
||||||
|
private ActiveMQQueue outgoing = new ActiveMQQueue("outgoing");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verification of outgoing communication - from local broker (with customized bridge configured) to remote one.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void verifyOutgoingCommunication() throws JMSException {
|
||||||
|
CustomNetworkBridgeFactory bridgeFactory = getCustomNetworkBridgeFactory();
|
||||||
|
NetworkBridgeListener listener = bridgeFactory.getListener();
|
||||||
|
|
||||||
|
verify(listener).onStart(any(NetworkBridge.class));
|
||||||
|
verifyNoMoreInteractions(listener);
|
||||||
|
|
||||||
|
send(localSession, outgoing, localSession.createTextMessage("test message"));
|
||||||
|
assertNotNull("Message didn't arrive", receive(remoteSession, outgoing));
|
||||||
|
|
||||||
|
verify(listener).onOutboundMessage(any(NetworkBridge.class), any(Message.class));
|
||||||
|
verifyNoMoreInteractions(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Additional test which makes sure that custom bridge receives notification about broker shutdown.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void verifyBrokerShutdown() {
|
||||||
|
shutdownTest(() -> {
|
||||||
|
try {
|
||||||
|
localBroker.stop();
|
||||||
|
} catch (Exception e) {
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verification of network connector shutdown.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void verifyConnectorShutdown() {
|
||||||
|
shutdownTest(() -> {
|
||||||
|
try {
|
||||||
|
getLocalConnector(0).stop();
|
||||||
|
} catch (Exception e) {
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void shutdownTest(Supplier<Throwable> callback) {
|
||||||
|
CustomNetworkBridgeFactory bridgeFactory = getCustomNetworkBridgeFactory();
|
||||||
|
NetworkBridgeListener listener = bridgeFactory.getListener();
|
||||||
|
|
||||||
|
verify(listener).onStart(any(NetworkBridge.class));
|
||||||
|
verifyNoMoreInteractions(listener);
|
||||||
|
|
||||||
|
Throwable throwable = callback.get();
|
||||||
|
assertNull("Unexpected error", throwable);
|
||||||
|
|
||||||
|
verify(listener).onStop(any(NetworkBridge.class));
|
||||||
|
verifyNoMoreInteractions(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
// helper methods
|
||||||
|
private void send(Session session, ActiveMQQueue destination, TextMessage message) throws JMSException {
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
try {
|
||||||
|
producer.send(message);
|
||||||
|
} finally {
|
||||||
|
producer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private javax.jms.Message receive(Session session, ActiveMQQueue destination) throws JMSException {
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
try {
|
||||||
|
return consumer.receive(TimeUnit.SECONDS.toMillis(5));
|
||||||
|
} finally {
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// infrastructure operations digging for connectors in running broker
|
||||||
|
private CustomNetworkBridgeFactory getCustomNetworkBridgeFactory() {
|
||||||
|
NetworkConnector connector = getLocalConnector(0);
|
||||||
|
|
||||||
|
assertTrue(connector.getBridgeFactory() instanceof CustomNetworkBridgeFactory);
|
||||||
|
|
||||||
|
return (CustomNetworkBridgeFactory) connector.getBridgeFactory();
|
||||||
|
}
|
||||||
|
|
||||||
|
private NetworkConnector getLocalConnector(int index) {
|
||||||
|
return localBroker.getNetworkConnectors().get(index);
|
||||||
|
}
|
||||||
|
|
||||||
|
// customizations
|
||||||
|
protected String getLocalBrokerURI() {
|
||||||
|
return "org/apache/activemq/network/localBroker-custom-factory.xml";
|
||||||
|
}
|
||||||
|
|
||||||
|
// test classes
|
||||||
|
static class CustomNetworkBridgeFactory implements BridgeFactory {
|
||||||
|
|
||||||
|
private final NetworkBridgeListener listener;
|
||||||
|
|
||||||
|
CustomNetworkBridgeFactory() {
|
||||||
|
this(Mockito.mock(NetworkBridgeListener.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
CustomNetworkBridgeFactory(NetworkBridgeListener listener) {
|
||||||
|
this.listener = listener;
|
||||||
|
}
|
||||||
|
|
||||||
|
public NetworkBridgeListener getListener() {
|
||||||
|
return listener;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, NetworkBridgeListener listener) {
|
||||||
|
DemandForwardingBridge bridge = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
|
||||||
|
bridge.setNetworkBridgeListener(new CompositeNetworkBridgeListener(this.listener, listener));
|
||||||
|
return bridge;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static class CompositeNetworkBridgeListener implements NetworkBridgeListener {
|
||||||
|
|
||||||
|
private final List<NetworkBridgeListener> listeners;
|
||||||
|
|
||||||
|
public CompositeNetworkBridgeListener(NetworkBridgeListener ... wrapped) {
|
||||||
|
this.listeners = Arrays.asList(wrapped);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void bridgeFailed() {
|
||||||
|
for (NetworkBridgeListener listener : listeners) {
|
||||||
|
listener.bridgeFailed();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onStart(NetworkBridge bridge) {
|
||||||
|
for (NetworkBridgeListener listener : listeners) {
|
||||||
|
listener.onStart(bridge);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onStop(NetworkBridge bridge) {
|
||||||
|
for (NetworkBridgeListener listener : listeners) {
|
||||||
|
listener.onStop(bridge);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onOutboundMessage(NetworkBridge bridge, Message message) {
|
||||||
|
for (NetworkBridgeListener listener : listeners) {
|
||||||
|
listener.onOutboundMessage(bridge, message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onInboundMessage(NetworkBridge bridge, Message message) {
|
||||||
|
for (NetworkBridgeListener listener : listeners) {
|
||||||
|
listener.onInboundMessage(bridge, message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -21,11 +21,9 @@ import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.net.URI;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
@ -33,13 +31,11 @@ import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageListener;
|
import javax.jms.MessageListener;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import javax.jms.TopicRequestor;
|
import javax.jms.TopicRequestor;
|
||||||
import javax.jms.TopicSession;
|
import javax.jms.TopicSession;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
@ -48,33 +44,27 @@ import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.command.ConsumerId;
|
import org.apache.activemq.command.ConsumerId;
|
||||||
import org.apache.activemq.util.Wait;
|
import org.apache.activemq.util.Wait;
|
||||||
import org.apache.activemq.util.Wait.Condition;
|
import org.apache.activemq.util.Wait.Condition;
|
||||||
import org.apache.activemq.xbean.BrokerFactoryBean;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.context.support.AbstractApplicationContext;
|
import org.springframework.context.support.AbstractApplicationContext;
|
||||||
import org.springframework.core.io.ClassPathResource;
|
|
||||||
import org.springframework.core.io.Resource;
|
|
||||||
|
|
||||||
public class SimpleNetworkTest {
|
public class SimpleNetworkTest extends BaseNetworkTest {
|
||||||
|
|
||||||
protected static final int MESSAGE_COUNT = 10;
|
protected static final int MESSAGE_COUNT = 10;
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(SimpleNetworkTest.class);
|
|
||||||
|
|
||||||
protected AbstractApplicationContext context;
|
protected AbstractApplicationContext context;
|
||||||
protected Connection localConnection;
|
|
||||||
protected Connection remoteConnection;
|
|
||||||
protected BrokerService localBroker;
|
|
||||||
protected BrokerService remoteBroker;
|
|
||||||
protected Session localSession;
|
|
||||||
protected Session remoteSession;
|
|
||||||
protected ActiveMQTopic included;
|
protected ActiveMQTopic included;
|
||||||
protected ActiveMQTopic excluded;
|
protected ActiveMQTopic excluded;
|
||||||
protected String consumerName = "durableSubs";
|
protected String consumerName = "durableSubs";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doSetUp(boolean deleteAllMessages) throws Exception {
|
||||||
|
super.doSetUp(deleteAllMessages);
|
||||||
|
|
||||||
|
included = new ActiveMQTopic("include.test.bar");
|
||||||
|
excluded = new ActiveMQTopic("exclude.test.bar");
|
||||||
|
}
|
||||||
|
|
||||||
// works b/c of non marshaling vm transport, the connection
|
// works b/c of non marshaling vm transport, the connection
|
||||||
// ref from the client is used during the forward
|
// ref from the client is used during the forward
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
|
@ -364,76 +354,6 @@ public class SimpleNetworkTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
doSetUp(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() throws Exception {
|
|
||||||
doTearDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void doTearDown() throws Exception {
|
|
||||||
localConnection.close();
|
|
||||||
remoteConnection.close();
|
|
||||||
localBroker.stop();
|
|
||||||
remoteBroker.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void doSetUp(boolean deleteAllMessages) throws Exception {
|
|
||||||
remoteBroker = createRemoteBroker();
|
|
||||||
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
|
||||||
remoteBroker.start();
|
|
||||||
remoteBroker.waitUntilStarted();
|
|
||||||
localBroker = createLocalBroker();
|
|
||||||
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
|
||||||
localBroker.start();
|
|
||||||
localBroker.waitUntilStarted();
|
|
||||||
URI localURI = localBroker.getVmConnectorURI();
|
|
||||||
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
|
|
||||||
fac.setAlwaysSyncSend(true);
|
|
||||||
fac.setDispatchAsync(false);
|
|
||||||
localConnection = fac.createConnection();
|
|
||||||
localConnection.setClientID("clientId");
|
|
||||||
localConnection.start();
|
|
||||||
URI remoteURI = remoteBroker.getVmConnectorURI();
|
|
||||||
fac = new ActiveMQConnectionFactory(remoteURI);
|
|
||||||
remoteConnection = fac.createConnection();
|
|
||||||
remoteConnection.setClientID("clientId");
|
|
||||||
remoteConnection.start();
|
|
||||||
included = new ActiveMQTopic("include.test.bar");
|
|
||||||
excluded = new ActiveMQTopic("exclude.test.bar");
|
|
||||||
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
||||||
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String getRemoteBrokerURI() {
|
|
||||||
return "org/apache/activemq/network/remoteBroker.xml";
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String getLocalBrokerURI() {
|
|
||||||
return "org/apache/activemq/network/localBroker.xml";
|
|
||||||
}
|
|
||||||
|
|
||||||
protected BrokerService createBroker(String uri) throws Exception {
|
|
||||||
Resource resource = new ClassPathResource(uri);
|
|
||||||
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
|
|
||||||
resource = new ClassPathResource(uri);
|
|
||||||
factory = new BrokerFactoryBean(resource);
|
|
||||||
factory.afterPropertiesSet();
|
|
||||||
BrokerService result = factory.getBroker();
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected BrokerService createLocalBroker() throws Exception {
|
|
||||||
return createBroker(getLocalBrokerURI());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected BrokerService createRemoteBroker() throws Exception {
|
|
||||||
return createBroker(getRemoteBrokerURI());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
|
protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
|
||||||
|
|
||||||
final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
|
final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<beans
|
||||||
|
xmlns="http://www.springframework.org/schema/beans"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
|
||||||
|
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
|
||||||
|
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
|
||||||
|
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||||
|
|
||||||
|
<broker brokerName="localBroker" start="false" deleteAllMessagesOnStartup="true" persistent="true" useShutdownHook="false" monitorConnectionSplits="true" xmlns="http://activemq.apache.org/schema/core">
|
||||||
|
<networkConnectors>
|
||||||
|
<networkConnector uri="static:(tcp://localhost:61617)" name="networkConnector">
|
||||||
|
<bridgeFactory>
|
||||||
|
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.network.CustomBridgeFactoryTest.CustomNetworkBridgeFactory" />
|
||||||
|
</bridgeFactory>
|
||||||
|
</networkConnector>
|
||||||
|
</networkConnectors>
|
||||||
|
|
||||||
|
<transportConnectors>
|
||||||
|
<transportConnector uri="tcp://localhost:61616"/>
|
||||||
|
</transportConnectors>
|
||||||
|
|
||||||
|
</broker>
|
||||||
|
</beans>
|
Loading…
Reference in New Issue