From 332c35ca18fba9316480160d4d294abda4e680bc Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Wed, 23 Nov 2011 17:44:52 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3384 - destinationFilter config out of dynamically included destinations git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1205508 13f79535-47bb-0310-9956-ffa450edef68 --- .../DemandForwardingBridgeSupport.java | 61 ++++++------------- .../network/NetworkBridgeConfiguration.java | 34 ++++++++++- .../network/NetworkDestinationFilterTest.java | 45 ++++++++++++++ .../network/multicast/localBroker.xml | 7 ++- .../network/multicast/remoteBroker.xml | 7 ++- 5 files changed, 105 insertions(+), 49 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index fb9a20dc1d..a70fd854e2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -16,17 +16,6 @@ */ package org.apache.activemq.network; -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.security.cert.X509Certificate; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import javax.management.ObjectName; import org.apache.activemq.Service; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerService; @@ -37,47 +26,31 @@ import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTempDestination; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.BrokerId; -import org.apache.activemq.command.BrokerInfo; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.ConnectionError; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.DataStructure; -import org.apache.activemq.command.DestinationInfo; -import org.apache.activemq.command.ExceptionResponse; -import org.apache.activemq.command.KeepAliveInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.NetworkBridgeFilter; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.RemoveInfo; -import org.apache.activemq.command.Response; -import org.apache.activemq.command.SessionInfo; -import org.apache.activemq.command.ShutdownInfo; -import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.command.*; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.TaskRunnerFactory; -import org.apache.activemq.transport.DefaultTransportListener; -import org.apache.activemq.transport.FutureResponse; -import org.apache.activemq.transport.ResponseCallback; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportDisposedIOException; -import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.transport.*; import org.apache.activemq.transport.tcp.SslTransport; import org.apache.activemq.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.management.ObjectName; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.cert.X509Certificate; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + /** * A useful base class for implementing demand forwarding bridges. * @@ -311,7 +284,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // determine demand. demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync()); - String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + configuration.getDestinationFilter(); + String advisoryTopic = configuration.getDestinationFilter(); if (configuration.isBridgeTempDestinations()) { advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index 92eb9b8aef..726fc0796b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -16,10 +16,12 @@ */ package org.apache.activemq.network; -import java.util.List; +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; +import java.util.List; + /** * Configuration for a NetworkBridge * @@ -39,7 +41,7 @@ public class NetworkBridgeConfiguration { private String brokerURL = ""; private String userName; private String password; - private String destinationFilter = ">"; + private String destinationFilter = null; private String name = "NC"; private List excludedDestinations; @@ -211,7 +213,33 @@ public class NetworkBridgeConfiguration { * @return the destinationFilter */ public String getDestinationFilter() { - return this.destinationFilter; + if (this.destinationFilter == null) { + if (dynamicallyIncludedDestinations != null && !dynamicallyIncludedDestinations.isEmpty()) { + StringBuffer filter = new StringBuffer(); + String delimiter = ""; + for (ActiveMQDestination destination : dynamicallyIncludedDestinations) { + if (!destination.isTemporary()) { + filter.append(delimiter); + filter.append(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX); + filter.append(destination.getDestinationTypeAsString()); + filter.append("."); + filter.append(destination.getPhysicalName()); + delimiter = ","; + } + } + return filter.toString(); + } else { + return AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">"; + } + } else { + // prepend consumer advisory prefix + // to keep backward compatibility + if (!this.destinationFilter.startsWith(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX)) { + return AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + this.destinationFilter; + } else { + return this.destinationFilter; + } + } } /** diff --git a/activemq-core/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java b/activemq-core/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java new file mode 100644 index 0000000000..80c585500a --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import junit.framework.TestCase; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTopic; + +import java.util.ArrayList; +import java.util.List; + +public class NetworkDestinationFilterTest extends TestCase { + + public void testFilter() throws Exception { + NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); + assertEquals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">", config.getDestinationFilter()); + List dests = new ArrayList(); + config.setDynamicallyIncludedDestinations(dests); + assertEquals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">", config.getDestinationFilter()); + dests.add(new ActiveMQQueue("TEST.>")); + dests.add(new ActiveMQTopic("TEST.>")); + dests.add(new ActiveMQTempQueue("TEST.>")); + String prefix = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX; + assertEquals(prefix + "Queue.TEST.>," + prefix + "Topic.TEST.>", config.getDestinationFilter()); + } + + +} diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/multicast/localBroker.xml b/activemq-core/src/test/resources/org/apache/activemq/network/multicast/localBroker.xml index 6310b679e3..a8e9c8fc1d 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/network/multicast/localBroker.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/network/multicast/localBroker.xml @@ -15,7 +15,12 @@ See the License for the specific language governing permissions and limitations under the License. --> - + diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/multicast/remoteBroker.xml b/activemq-core/src/test/resources/org/apache/activemq/network/multicast/remoteBroker.xml index c8e5facaa2..6eede505c9 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/network/multicast/remoteBroker.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/network/multicast/remoteBroker.xml @@ -15,7 +15,12 @@ See the License for the specific language governing permissions and limitations under the License. --> - +