From 25d396b58949df63a02aeccd0fef1d1a16f706c9 Mon Sep 17 00:00:00 2001 From: Christian Posta Date: Fri, 7 Dec 2012 16:13:49 +0000 Subject: [PATCH] Fix for: https://issues.apache.org/jira/browse/AMQ-4209 - NetworkConnector and NetworkBridgeConfiguration have same named private variables for excludedDestination https://issues.apache.org/jira/browse/AMQ-4210 -DynamicallyIncludedDestinations is not enforced for the other end of duplex bridge excludedDestinations dynamicallyIncludedDestinations staticallyIncludedDestinations Updated some tests: request-reply with temp dest is not supported with dynamicallyAddedDestinations ATM git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1418373 13f79535-47bb-0310-9956-ffa450edef68 --- .../network/NetworkBridgeConfiguration.java | 15 ++- .../activemq/network/NetworkConnector.java | 45 +------ .../DemandForwardingBridgeFilterTest.java | 93 +++++++++----- ...IncludedDestinationsDuplexNetworkTest.java | 115 ++++++++++++++++++ .../network/MulticastNetworkTest.java | 7 ++ .../NetworkConnectionsCleanedupTest.java | 4 +- .../duplexDynamicIncludedDestLocalBroker.xml | 54 ++++++++ .../activemq/network/duplexLocalBroker.xml | 5 - 8 files changed, 252 insertions(+), 86 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java create mode 100644 activemq-core/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index dde53148d9..eeff55f532 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -16,16 +16,21 @@ */ package org.apache.activemq.network; -import java.util.List; - import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; /** * Configuration for a NetworkBridge */ public class NetworkBridgeConfiguration { + private static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeConfiguration.class); + private boolean conduitSubscriptions = true; private boolean dynamicOnly; private boolean dispatchAsync = true; @@ -42,9 +47,9 @@ public class NetworkBridgeConfiguration { private String destinationFilter = null; private String name = "NC"; - private List excludedDestinations; - private List dynamicallyIncludedDestinations; - private List staticallyIncludedDestinations; + protected List excludedDestinations = new CopyOnWriteArrayList(); + protected List dynamicallyIncludedDestinations = new CopyOnWriteArrayList(); + protected List staticallyIncludedDestinations = new CopyOnWriteArrayList(); private boolean suppressDuplicateQueueSubscriptions = false; private boolean suppressDuplicateTopicSubscriptions = true; diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java index 34e75a112e..aeb8b294de 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java @@ -67,9 +67,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem }; private Set durableDestinations; - private List excludedDestinations = new CopyOnWriteArrayList(); - private List dynamicallyIncludedDestinations = new CopyOnWriteArrayList(); - private List staticallyIncludedDestinations = new CopyOnWriteArrayList(); + private BrokerService brokerService; private ObjectName objectName; @@ -102,57 +100,16 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem this.durableDestinations = durableDestinations; } - /** - * @return Returns the excludedDestinations. - */ - public List getExcludedDestinations() { - return excludedDestinations; - } - - /** - * @param excludedDestinations The excludedDestinations to set. - */ - public void setExcludedDestinations(List excludedDestinations) { - this.excludedDestinations = excludedDestinations; - } public void addExcludedDestination(ActiveMQDestination destiantion) { this.excludedDestinations.add(destiantion); } - /** - * @return Returns the staticallyIncludedDestinations. - */ - public List getStaticallyIncludedDestinations() { - return staticallyIncludedDestinations; - } - - /** - * @param staticallyIncludedDestinations The staticallyIncludedDestinations - * to set. - */ - public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations) { - this.staticallyIncludedDestinations = staticallyIncludedDestinations; - } public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) { this.staticallyIncludedDestinations.add(destiantion); } - /** - * @return Returns the dynamicallyIncludedDestinations. - */ - public List getDynamicallyIncludedDestinations() { - return dynamicallyIncludedDestinations; - } - - /** - * @param dynamicallyIncludedDestinations The - * dynamicallyIncludedDestinations to set. - */ - public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations) { - this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; - } public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) { this.dynamicallyIncludedDestinations.add(destiantion); diff --git a/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java b/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java index 25f508d431..087ddd0a45 100644 --- a/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java @@ -26,6 +26,8 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.SessionInfo; +import java.util.Arrays; + public class DemandForwardingBridgeFilterTest extends NetworkTestSupport { @@ -41,36 +43,44 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport { public void testWildcardOnExcludedDestination() throws Exception { - bridge.setExcludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination("OTHER.>", - ActiveMQDestination.TOPIC_TYPE) }); - bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination( - "TEST", ActiveMQDestination.QUEUE_TYPE) }); - bridge.start(); + NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration(); + + configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>", + ActiveMQDestination.TOPIC_TYPE))); + configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination( + "TEST", ActiveMQDestination.QUEUE_TYPE))); + + configureAndStartBridge(configuration); assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE); assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE); } public void testWildcardOnTwoExcludedDestination() throws Exception { + NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration(); - bridge.setExcludedDestinations(new ActiveMQDestination[] { - ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE), - ActiveMQDestination.createDestination("TEST.X1", ActiveMQDestination.QUEUE_TYPE) }); - bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination( - "TEST.X2", ActiveMQDestination.QUEUE_TYPE) }); - bridge.start(); + configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE), + ActiveMQDestination.createDestination("TEST.X1", ActiveMQDestination.QUEUE_TYPE))); + configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination( + "TEST.X2", ActiveMQDestination.QUEUE_TYPE))); + + configureAndStartBridge(configuration); assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE); assertReceiveNoMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE); assertReceiveNoMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE); } + public void testWildcardOnDynamicallyIncludedDestination() throws Exception { - bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { - ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE), - ActiveMQDestination.createDestination("TEST.X2", ActiveMQDestination.QUEUE_TYPE) }); - bridge.start(); + NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration(); + + configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE), + ActiveMQDestination.createDestination("TEST.X2", ActiveMQDestination.QUEUE_TYPE))); + + configureAndStartBridge(configuration); + assertReceiveMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE); assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE); @@ -78,11 +88,14 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport { public void testDistinctTopicAndQueue() throws Exception { - bridge.setExcludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination(">", - ActiveMQDestination.TOPIC_TYPE) }); - bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination( - ">", ActiveMQDestination.QUEUE_TYPE) }); - bridge.start(); + NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration(); + + configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination(">", + ActiveMQDestination.TOPIC_TYPE))); + configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination( + ">", ActiveMQDestination.QUEUE_TYPE))); + + configureAndStartBridge(configuration); assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE); assertReceiveNoMessageOn("TEST", ActiveMQDestination.TOPIC_TYPE); @@ -90,14 +103,14 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport { public void testListOfExcludedDestinationWithWildcard() throws Exception { - bridge.setExcludedDestinations(new ActiveMQDestination[] { - ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.TOPIC_TYPE), - ActiveMQDestination.createDestination("TEST.*", ActiveMQDestination.TOPIC_TYPE) }); + NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration(); - bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination( - "TEST.X1", ActiveMQDestination.QUEUE_TYPE) }); + configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.TOPIC_TYPE), + ActiveMQDestination.createDestination("TEST.*", ActiveMQDestination.TOPIC_TYPE))); + configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination( + "TEST.X1", ActiveMQDestination.QUEUE_TYPE))); - bridge.start(); + configureAndStartBridge(configuration); assertReceiveMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE); assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE); @@ -143,11 +156,7 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport { protected void setUp() throws Exception { super.setUp(); - NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); - config.setBrokerName("local"); - config.setDispatchAsync(false); - bridge = new DemandForwardingBridge(config, createTransport(), createRemoteTransport()); - bridge.setBrokerService(broker); + producerConnection = createConnection(); ConnectionInfo producerConnectionInfo = createConnectionInfo(); @@ -177,4 +186,26 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport { junit.textui.TestRunner.run(suite()); } + public NetworkBridgeConfiguration getDefaultBridgeConfiguration() { + NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); + config.setBrokerName("local"); + config.setDispatchAsync(false); + return config; + } + + private void configureAndStartBridge(NetworkBridgeConfiguration configuration) throws Exception { + bridge = new DemandForwardingBridge(configuration, createTransport(), createRemoteTransport()); + bridge.setBrokerService(broker); + bridge.setDynamicallyIncludedDestinations(configuration.getDynamicallyIncludedDestinations().toArray( + new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()] + )); + bridge.setExcludedDestinations(configuration.getExcludedDestinations().toArray( + new ActiveMQDestination[configuration.getExcludedDestinations().size()] + )); + bridge.setStaticallyIncludedDestinations(configuration.getStaticallyIncludedDestinations().toArray( + new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()] + )); + bridge.start(); + } + } \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java new file mode 100644 index 0000000000..8036f9ef26 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnection; +import org.junit.Test; + +import javax.jms.MessageProducer; +import javax.jms.TemporaryQueue; +import java.lang.reflect.Field; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertNotNull; + +/** + * @author Christian Posta + */ +public class DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetworkTest { + + private static final int REMOTE_BROKER_TCP_PORT = 61617; + + @Override + protected String getLocalBrokerURI() { + return "org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml"; + } + + @Override + protected BrokerService createRemoteBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("remoteBroker"); + broker.addConnector("tcp://localhost:" + REMOTE_BROKER_TCP_PORT); + return broker; + } + + // we have to override this, because with dynamicallyIncludedDestinations working properly + // (see https://issues.apache.org/jira/browse/AMQ-4209) you can't get request/response + // with temps working (there is no wild card like there is for staticallyIncludedDest) + // + @Override + public void testRequestReply() throws Exception { + + } + + @Test + public void testTempQueues() throws Exception { + TemporaryQueue temp = localSession.createTemporaryQueue(); + MessageProducer producer = localSession.createProducer(temp); + producer.send(localSession.createTextMessage("test")); + Thread.sleep(100); + assertEquals("Destination not created", 1, remoteBroker.getAdminView().getTemporaryQueues().length); + temp.delete(); + Thread.sleep(100); + assertEquals("Destination not deleted", 0, remoteBroker.getAdminView().getTemporaryQueues().length); + } + + @Test + public void testDynamicallyIncludedDestinationsForDuplex() throws Exception{ + // Once the bridge is set up, we should see the filter used for the duplex end of the bridge + // only subscribe to the specific destinations included in the list + // so let's test that the filter is correct, let's also test the subscription on the localbroker + // is correct + + // the bridge on the remote broker has the correct filter + TransportConnection bridgeConnection = getDuplexBridgeConnectionFromRemote(); + assertNotNull(bridgeConnection); + DemandForwardingBridge duplexBridge = getDuplexBridgeFromConnection(bridgeConnection); + assertNotNull(duplexBridge); + NetworkBridgeConfiguration configuration = getConfigurationFromNetworkBridge(duplexBridge); + assertNotNull(configuration); + assertFalse("This destinationFilter does not include ONLY the destinations specified in dynamicallyIncludedDestinations", + configuration.getDestinationFilter().equals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">")); + assertEquals("There are other patterns in the destinationFilter that shouldn't be there", + "ActiveMQ.Advisory.Consumer.Queue.include.test.foo,ActiveMQ.Advisory.Consumer.Topic.include.test.bar", + configuration.getDestinationFilter()); + } + + private NetworkBridgeConfiguration getConfigurationFromNetworkBridge(DemandForwardingBridgeSupport duplexBridge) throws NoSuchFieldException, IllegalAccessException { + Field f = DemandForwardingBridgeSupport.class.getDeclaredField("configuration"); + f.setAccessible(true); + NetworkBridgeConfiguration configuration = (NetworkBridgeConfiguration) f.get(duplexBridge); + return configuration; + } + + private DemandForwardingBridge getDuplexBridgeFromConnection(TransportConnection bridgeConnection) throws NoSuchFieldException, IllegalAccessException { + Field f = TransportConnection.class.getDeclaredField("duplexBridge"); + f.setAccessible(true); + DemandForwardingBridge bridge = (DemandForwardingBridge) f.get(bridgeConnection); + return bridge; + } + + public TransportConnection getDuplexBridgeConnectionFromRemote() { + TransportConnection duplexBridgeConnectionFromRemote = + remoteBroker.getTransportConnectorByName("tcp://localhost:" + REMOTE_BROKER_TCP_PORT) + .getConnections().get(0); + return duplexBridgeConnectionFromRemote; + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java index 9c75b9e099..7813b07635 100644 --- a/activemq-core/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java @@ -28,4 +28,11 @@ public class MulticastNetworkTest extends SimpleNetworkTest { protected String getLocalBrokerURI() { return "org/apache/activemq/network/multicast/localBroker.xml"; } + + // blocked out for multi cast because temp dest request reply isn't supported + // with dynamicallyAddedDestinations + @Override + public void testRequestReply() throws Exception { + + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/network/NetworkConnectionsCleanedupTest.java b/activemq-core/src/test/java/org/apache/activemq/network/NetworkConnectionsCleanedupTest.java index 8348e802fb..ee1bb6bcd0 100755 --- a/activemq-core/src/test/java/org/apache/activemq/network/NetworkConnectionsCleanedupTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/NetworkConnectionsCleanedupTest.java @@ -62,7 +62,9 @@ public class NetworkConnectionsCleanedupTest extends TestCase { protected ActiveMQTopic excluded; protected String consumerName = "durableSubs"; - public void testNetworkConnections() throws Exception { + // skip this test. it runs for an hour, doesn't assert anything, and could probably + // just be removed (seems like a throwaway impl for https://issues.apache.org/activemq/browse/AMQ-1202) + public void skipTestNetworkConnections() throws Exception { String uri = "static:(tcp://localhost:61617)?initialReconnectDelay=100"; List list = new ArrayList(); for (int i =0;i < 100;i++){ diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml b/activemq-core/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml new file mode 100644 index 0000000000..70f6da8215 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml @@ -0,0 +1,54 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/duplexLocalBroker.xml b/activemq-core/src/test/resources/org/apache/activemq/network/duplexLocalBroker.xml index 43a89d14c8..17268c37f1 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/network/duplexLocalBroker.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/network/duplexLocalBroker.xml @@ -32,11 +32,6 @@ conduitSubscriptions = "true" decreaseNetworkConsumerPriority = "false"> - - - - -