https://issues.apache.org/jira/browse/AMQ-4209 - NetworkConnector and NetworkBridgeConfiguration have same named private variables for excludedDestination
https://issues.apache.org/jira/browse/AMQ-4210 -DynamicallyIncludedDestinations is not enforced for the other end of duplex bridge

excludedDestinations
dynamicallyIncludedDestinations
staticallyIncludedDestinations
Updated some tests: request-reply with temp dest is not supported with dynamicallyAddedDestinations ATM

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1418373 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Christian Posta 2012-12-07 16:13:49 +00:00
parent 76963e6adf
commit 25d396b589
8 changed files with 252 additions and 86 deletions

View File

@ -16,16 +16,21 @@
*/
package org.apache.activemq.network;
import java.util.List;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Configuration for a NetworkBridge
*/
public class NetworkBridgeConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeConfiguration.class);
private boolean conduitSubscriptions = true;
private boolean dynamicOnly;
private boolean dispatchAsync = true;
@ -42,9 +47,9 @@ public class NetworkBridgeConfiguration {
private String destinationFilter = null;
private String name = "NC";
private List<ActiveMQDestination> excludedDestinations;
private List<ActiveMQDestination> dynamicallyIncludedDestinations;
private List<ActiveMQDestination> staticallyIncludedDestinations;
protected List<ActiveMQDestination> excludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
protected List<ActiveMQDestination> dynamicallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
protected List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
private boolean suppressDuplicateQueueSubscriptions = false;
private boolean suppressDuplicateTopicSubscriptions = true;

View File

@ -67,9 +67,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
};
private Set<ActiveMQDestination> durableDestinations;
private List<ActiveMQDestination> excludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
private List<ActiveMQDestination> dynamicallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
private BrokerService brokerService;
private ObjectName objectName;
@ -102,57 +100,16 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
this.durableDestinations = durableDestinations;
}
/**
* @return Returns the excludedDestinations.
*/
public List<ActiveMQDestination> getExcludedDestinations() {
return excludedDestinations;
}
/**
* @param excludedDestinations The excludedDestinations to set.
*/
public void setExcludedDestinations(List<ActiveMQDestination> excludedDestinations) {
this.excludedDestinations = excludedDestinations;
}
public void addExcludedDestination(ActiveMQDestination destiantion) {
this.excludedDestinations.add(destiantion);
}
/**
* @return Returns the staticallyIncludedDestinations.
*/
public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
return staticallyIncludedDestinations;
}
/**
* @param staticallyIncludedDestinations The staticallyIncludedDestinations
* to set.
*/
public void setStaticallyIncludedDestinations(List<ActiveMQDestination> staticallyIncludedDestinations) {
this.staticallyIncludedDestinations = staticallyIncludedDestinations;
}
public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
this.staticallyIncludedDestinations.add(destiantion);
}
/**
* @return Returns the dynamicallyIncludedDestinations.
*/
public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
return dynamicallyIncludedDestinations;
}
/**
* @param dynamicallyIncludedDestinations The
* dynamicallyIncludedDestinations to set.
*/
public void setDynamicallyIncludedDestinations(List<ActiveMQDestination> dynamicallyIncludedDestinations) {
this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
}
public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
this.dynamicallyIncludedDestinations.add(destiantion);

View File

@ -26,6 +26,8 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import java.util.Arrays;
public class DemandForwardingBridgeFilterTest extends NetworkTestSupport {
@ -41,36 +43,44 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport {
public void testWildcardOnExcludedDestination() throws Exception {
bridge.setExcludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination("OTHER.>",
ActiveMQDestination.TOPIC_TYPE) });
bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination(
"TEST", ActiveMQDestination.QUEUE_TYPE) });
bridge.start();
NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration();
configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>",
ActiveMQDestination.TOPIC_TYPE)));
configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination(
"TEST", ActiveMQDestination.QUEUE_TYPE)));
configureAndStartBridge(configuration);
assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE);
assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE);
}
public void testWildcardOnTwoExcludedDestination() throws Exception {
NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration();
bridge.setExcludedDestinations(new ActiveMQDestination[] {
ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE),
ActiveMQDestination.createDestination("TEST.X1", ActiveMQDestination.QUEUE_TYPE) });
bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination(
"TEST.X2", ActiveMQDestination.QUEUE_TYPE) });
bridge.start();
configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE),
ActiveMQDestination.createDestination("TEST.X1", ActiveMQDestination.QUEUE_TYPE)));
configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination(
"TEST.X2", ActiveMQDestination.QUEUE_TYPE)));
configureAndStartBridge(configuration);
assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
assertReceiveNoMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE);
assertReceiveNoMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
}
public void testWildcardOnDynamicallyIncludedDestination() throws Exception {
bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] {
ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE),
ActiveMQDestination.createDestination("TEST.X2", ActiveMQDestination.QUEUE_TYPE) });
bridge.start();
NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration();
configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.QUEUE_TYPE),
ActiveMQDestination.createDestination("TEST.X2", ActiveMQDestination.QUEUE_TYPE)));
configureAndStartBridge(configuration);
assertReceiveMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE);
assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
@ -78,11 +88,14 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport {
public void testDistinctTopicAndQueue() throws Exception {
bridge.setExcludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination(">",
ActiveMQDestination.TOPIC_TYPE) });
bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination(
">", ActiveMQDestination.QUEUE_TYPE) });
bridge.start();
NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration();
configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination(">",
ActiveMQDestination.TOPIC_TYPE)));
configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination(
">", ActiveMQDestination.QUEUE_TYPE)));
configureAndStartBridge(configuration);
assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE);
assertReceiveNoMessageOn("TEST", ActiveMQDestination.TOPIC_TYPE);
@ -90,14 +103,14 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport {
public void testListOfExcludedDestinationWithWildcard() throws Exception {
bridge.setExcludedDestinations(new ActiveMQDestination[] {
ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.TOPIC_TYPE),
ActiveMQDestination.createDestination("TEST.*", ActiveMQDestination.TOPIC_TYPE) });
NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration();
bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { ActiveMQDestination.createDestination(
"TEST.X1", ActiveMQDestination.QUEUE_TYPE) });
configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("OTHER.>", ActiveMQDestination.TOPIC_TYPE),
ActiveMQDestination.createDestination("TEST.*", ActiveMQDestination.TOPIC_TYPE)));
configuration.setDynamicallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination(
"TEST.X1", ActiveMQDestination.QUEUE_TYPE)));
bridge.start();
configureAndStartBridge(configuration);
assertReceiveMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE);
@ -143,11 +156,7 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport {
protected void setUp() throws Exception {
super.setUp();
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setBrokerName("local");
config.setDispatchAsync(false);
bridge = new DemandForwardingBridge(config, createTransport(), createRemoteTransport());
bridge.setBrokerService(broker);
producerConnection = createConnection();
ConnectionInfo producerConnectionInfo = createConnectionInfo();
@ -177,4 +186,26 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport {
junit.textui.TestRunner.run(suite());
}
public NetworkBridgeConfiguration getDefaultBridgeConfiguration() {
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setBrokerName("local");
config.setDispatchAsync(false);
return config;
}
private void configureAndStartBridge(NetworkBridgeConfiguration configuration) throws Exception {
bridge = new DemandForwardingBridge(configuration, createTransport(), createRemoteTransport());
bridge.setBrokerService(broker);
bridge.setDynamicallyIncludedDestinations(configuration.getDynamicallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]
));
bridge.setExcludedDestinations(configuration.getExcludedDestinations().toArray(
new ActiveMQDestination[configuration.getExcludedDestinations().size()]
));
bridge.setStaticallyIncludedDestinations(configuration.getStaticallyIncludedDestinations().toArray(
new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]
));
bridge.start();
}
}

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnection;
import org.junit.Test;
import javax.jms.MessageProducer;
import javax.jms.TemporaryQueue;
import java.lang.reflect.Field;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertNotNull;
/**
* @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
*/
public class DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetworkTest {
private static final int REMOTE_BROKER_TCP_PORT = 61617;
@Override
protected String getLocalBrokerURI() {
return "org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml";
}
@Override
protected BrokerService createRemoteBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("remoteBroker");
broker.addConnector("tcp://localhost:" + REMOTE_BROKER_TCP_PORT);
return broker;
}
// we have to override this, because with dynamicallyIncludedDestinations working properly
// (see https://issues.apache.org/jira/browse/AMQ-4209) you can't get request/response
// with temps working (there is no wild card like there is for staticallyIncludedDest)
//
@Override
public void testRequestReply() throws Exception {
}
@Test
public void testTempQueues() throws Exception {
TemporaryQueue temp = localSession.createTemporaryQueue();
MessageProducer producer = localSession.createProducer(temp);
producer.send(localSession.createTextMessage("test"));
Thread.sleep(100);
assertEquals("Destination not created", 1, remoteBroker.getAdminView().getTemporaryQueues().length);
temp.delete();
Thread.sleep(100);
assertEquals("Destination not deleted", 0, remoteBroker.getAdminView().getTemporaryQueues().length);
}
@Test
public void testDynamicallyIncludedDestinationsForDuplex() throws Exception{
// Once the bridge is set up, we should see the filter used for the duplex end of the bridge
// only subscribe to the specific destinations included in the <dynamicallyIncludedDestinations> list
// so let's test that the filter is correct, let's also test the subscription on the localbroker
// is correct
// the bridge on the remote broker has the correct filter
TransportConnection bridgeConnection = getDuplexBridgeConnectionFromRemote();
assertNotNull(bridgeConnection);
DemandForwardingBridge duplexBridge = getDuplexBridgeFromConnection(bridgeConnection);
assertNotNull(duplexBridge);
NetworkBridgeConfiguration configuration = getConfigurationFromNetworkBridge(duplexBridge);
assertNotNull(configuration);
assertFalse("This destinationFilter does not include ONLY the destinations specified in dynamicallyIncludedDestinations",
configuration.getDestinationFilter().equals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">"));
assertEquals("There are other patterns in the destinationFilter that shouldn't be there",
"ActiveMQ.Advisory.Consumer.Queue.include.test.foo,ActiveMQ.Advisory.Consumer.Topic.include.test.bar",
configuration.getDestinationFilter());
}
private NetworkBridgeConfiguration getConfigurationFromNetworkBridge(DemandForwardingBridgeSupport duplexBridge) throws NoSuchFieldException, IllegalAccessException {
Field f = DemandForwardingBridgeSupport.class.getDeclaredField("configuration");
f.setAccessible(true);
NetworkBridgeConfiguration configuration = (NetworkBridgeConfiguration) f.get(duplexBridge);
return configuration;
}
private DemandForwardingBridge getDuplexBridgeFromConnection(TransportConnection bridgeConnection) throws NoSuchFieldException, IllegalAccessException {
Field f = TransportConnection.class.getDeclaredField("duplexBridge");
f.setAccessible(true);
DemandForwardingBridge bridge = (DemandForwardingBridge) f.get(bridgeConnection);
return bridge;
}
public TransportConnection getDuplexBridgeConnectionFromRemote() {
TransportConnection duplexBridgeConnectionFromRemote =
remoteBroker.getTransportConnectorByName("tcp://localhost:" + REMOTE_BROKER_TCP_PORT)
.getConnections().get(0);
return duplexBridgeConnectionFromRemote;
}
}

View File

@ -28,4 +28,11 @@ public class MulticastNetworkTest extends SimpleNetworkTest {
protected String getLocalBrokerURI() {
return "org/apache/activemq/network/multicast/localBroker.xml";
}
// blocked out for multi cast because temp dest request reply isn't supported
// with dynamicallyAddedDestinations
@Override
public void testRequestReply() throws Exception {
}
}

View File

@ -62,7 +62,9 @@ public class NetworkConnectionsCleanedupTest extends TestCase {
protected ActiveMQTopic excluded;
protected String consumerName = "durableSubs";
public void testNetworkConnections() throws Exception {
// skip this test. it runs for an hour, doesn't assert anything, and could probably
// just be removed (seems like a throwaway impl for https://issues.apache.org/activemq/browse/AMQ-1202)
public void skipTestNetworkConnections() throws Exception {
String uri = "static:(tcp://localhost:61617)?initialReconnectDelay=100";
List<ActiveMQDestination> list = new ArrayList<ActiveMQDestination>();
for (int i =0;i < 100;i++){

View File

@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker brokerName="localBroker" start="false" persistent="true" useShutdownHook="false" xmlns="http://activemq.apache.org/schema/core">
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61617)"
duplex="true"
dynamicOnly = "false"
conduitSubscriptions = "true"
decreaseNetworkConsumerPriority = "false">
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
<excludedDestinations>
<queue physicalName="exclude.test.foo"/>
<topic physicalName="exclude.test.bar"/>
</excludedDestinations>
</networkConnector>
</networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
</broker>
</beans>

View File

@ -32,11 +32,6 @@
conduitSubscriptions = "true"
decreaseNetworkConsumerPriority = "false">
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
<excludedDestinations>
<queue physicalName="exclude.test.foo"/>
<topic physicalName="exclude.test.bar"/>