diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java index c01b6cd6de..3b7fde1ac7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java @@ -82,18 +82,14 @@ public class DurableConduitBridge extends ConduitBridge { } //add our original id to ourselves info.addNetworkConsumerId(info.getConsumerId()); - // not matched so create a new one - // but first, if it's durable - changed set the - // ConsumerId here - so it won't be removed if the - // durable subscriber goes away on the other end - if (info.isDurable() || (info.getDestination().isQueue() && !info.getDestination().isTemporary())) { - info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator - .getNextSequenceId())); - } + if (info.isDurable()) { // set the subscriber name to something reproducible - info.setSubscriptionName(getSubscriberName(info.getDestination())); + // and override the consumerId with something unique so that it won't + // be removed if the durable subscriber (at the other end) goes away + info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator + .getNextSequenceId())); } info.setSelector(null); return doCreateDemandSubscription(info); diff --git a/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java b/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java new file mode 100644 index 0000000000..854dcdfbf3 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.net.MalformedURLException; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class NetworkBrokerDetachTest extends TestCase { + + private final static String BROKER_NAME = "broker"; + private final static String REM_BROKER_NAME = "networkedBroker"; + private final static String QUEUE_NAME = "testQ"; + private final static int NUM_CONSUMERS = 1; + + protected static final Log LOG = LogFactory.getLog(NetworkBrokerDetachTest.class); + protected final int numRestarts = 3; + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName(BROKER_NAME); + broker.addConnector("tcp://localhost:61617"); + NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false"); + networkConnector.setDuplex(false); + return broker; + } + + protected BrokerService createNetworkedBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName(REM_BROKER_NAME); + broker.addConnector("tcp://localhost:62617"); + return broker; + } + + public void testNetworkedBrokerDetach() throws Exception { + BrokerService broker = createBroker(); + broker.start(); + + BrokerService networkedBroker = createNetworkedBroker(); + networkedBroker.start(); + + LOG.info("Creating Consumer on the networked broker ..."); + // Create a consumer on the networked broker + ConnectionFactory consFactory = createConnectionFactory(networkedBroker); + Connection consConn = consFactory.createConnection(); + Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + for(int i=0; i all = mbsc.queryMBeans(null, null); +// LOG.info("Total MBean count=" + all.size()); +// for (Object o : all) { +// ObjectInstance bean = (ObjectInstance)o; +// LOG.info(bean.getObjectName()); +// } + } catch (Exception ignored) { + } + return mbsc; + } + + private Object getAttribute(MBeanServerConnection mbsc, String type, String pattern, String attrName) throws Exception { + Object obj = mbsc.getAttribute(getObjectName(BROKER_NAME, type, pattern), attrName); + return obj; + } + + private ObjectName getObjectName(String brokerName, String type, String pattern) throws Exception { + ObjectName beanName = new ObjectName( + "org.apache.activemq:BrokerName=" + brokerName + ",Type=" + type +"," + pattern + ); + + return beanName; + } +}