apply patch from AMQ-2109, with thanks

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@747384 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-02-24 14:05:28 +00:00
parent 2de531cb52
commit 9d9d4cd69d
2 changed files with 167 additions and 9 deletions

View File

@ -82,18 +82,14 @@ public class DurableConduitBridge extends ConduitBridge {
} }
//add our original id to ourselves //add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId()); info.addNetworkConsumerId(info.getConsumerId());
// not matched so create a new one
// but first, if it's durable - changed set the
// ConsumerId here - so it won't be removed if the
// durable subscriber goes away on the other end
if (info.isDurable() || (info.getDestination().isQueue() && !info.getDestination().isTemporary())) {
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator
.getNextSequenceId()));
}
if (info.isDurable()) { if (info.isDurable()) {
// set the subscriber name to something reproducible // set the subscriber name to something reproducible
info.setSubscriptionName(getSubscriberName(info.getDestination())); info.setSubscriptionName(getSubscriberName(info.getDestination()));
// and override the consumerId with something unique so that it won't
// be removed if the durable subscriber (at the other end) goes away
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator
.getNextSequenceId()));
} }
info.setSelector(null); info.setSelector(null);
return doCreateDemandSubscription(info); return doCreateDemandSubscription(info);

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.net.MalformedURLException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class NetworkBrokerDetachTest extends TestCase {
private final static String BROKER_NAME = "broker";
private final static String REM_BROKER_NAME = "networkedBroker";
private final static String QUEUE_NAME = "testQ";
private final static int NUM_CONSUMERS = 1;
protected static final Log LOG = LogFactory.getLog(NetworkBrokerDetachTest.class);
protected final int numRestarts = 3;
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName(BROKER_NAME);
broker.addConnector("tcp://localhost:61617");
NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
networkConnector.setDuplex(false);
return broker;
}
protected BrokerService createNetworkedBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName(REM_BROKER_NAME);
broker.addConnector("tcp://localhost:62617");
return broker;
}
public void testNetworkedBrokerDetach() throws Exception {
BrokerService broker = createBroker();
broker.start();
BrokerService networkedBroker = createNetworkedBroker();
networkedBroker.start();
LOG.info("Creating Consumer on the networked broker ...");
// Create a consumer on the networked broker
ConnectionFactory consFactory = createConnectionFactory(networkedBroker);
Connection consConn = consFactory.createConnection();
Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
for(int i=0; i<NUM_CONSUMERS; i++) {
MessageConsumer consumer = consSession.createConsumer(consSession.createQueue(QUEUE_NAME));
}
Thread.sleep(5000);
MBeanServerConnection mbsc = getMBeanServerConnection();
// We should have 1 consumer for the queue on the local broker
Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
assertEquals(1L, ((Long)consumers).longValue());
LOG.info("Stopping Consumer on the networked broker ...");
// Closing the connection will also close the consumer
consConn.close();
Thread.sleep(5000);
// We should have 0 consumer for the queue on the local broker
consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
assertEquals(0L, ((Long)consumers).longValue());
networkedBroker.stop();
networkedBroker.waitUntilStopped();
broker.stop();
broker.waitUntilStopped();
}
protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connectionFactory.setOptimizedMessageDispatch(true);
connectionFactory.setCopyMessageOnSend(false);
connectionFactory.setUseCompression(false);
connectionFactory.setDispatchAsync(false);
connectionFactory.setUseAsyncSend(false);
connectionFactory.setOptimizeAcknowledge(false);
connectionFactory.setWatchTopicAdvisories(true);
ActiveMQPrefetchPolicy qPrefetchPolicy= new ActiveMQPrefetchPolicy();
qPrefetchPolicy.setQueuePrefetch(100);
qPrefetchPolicy.setTopicPrefetch(1000);
connectionFactory.setPrefetchPolicy(qPrefetchPolicy);
connectionFactory.setAlwaysSyncSend(true);
return connectionFactory;
}
// JMX Helper Methods
private MBeanServerConnection getMBeanServerConnection() throws MalformedURLException {
final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
MBeanServerConnection mbsc = null;
try {
JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
mbsc = jmxc.getMBeanServerConnection();
// // trace all existing MBeans
// Set<?> all = mbsc.queryMBeans(null, null);
// LOG.info("Total MBean count=" + all.size());
// for (Object o : all) {
// ObjectInstance bean = (ObjectInstance)o;
// LOG.info(bean.getObjectName());
// }
} catch (Exception ignored) {
}
return mbsc;
}
private Object getAttribute(MBeanServerConnection mbsc, String type, String pattern, String attrName) throws Exception {
Object obj = mbsc.getAttribute(getObjectName(BROKER_NAME, type, pattern), attrName);
return obj;
}
private ObjectName getObjectName(String brokerName, String type, String pattern) throws Exception {
ObjectName beanName = new ObjectName(
"org.apache.activemq:BrokerName=" + brokerName + ",Type=" + type +"," + pattern
);
return beanName;
}
}