resolve AMQ-2111, only the network subscription that causes the initial subscription needs to be tracked as this is the one that will be propagated via an advisory to the rest of the network. The existing update was making a mod to a possibly inflight advisory message

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@744983 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-02-17 12:04:57 +00:00
parent 9f8777d3a0
commit e48ff70d4b
2 changed files with 31 additions and 12 deletions

View File

@ -37,8 +37,6 @@ public class DemandSubscription {
DemandSubscription(ConsumerInfo info) { DemandSubscription(ConsumerInfo info) {
remoteInfo = info; remoteInfo = info;
localInfo = info.copy(); localInfo = info.copy();
localInfo.setSelector(info.getSelector());
localInfo.setBrokerPath(info.getBrokerPath());
localInfo.setNetworkSubscription(true); localInfo.setNetworkSubscription(true);
remoteSubsIds.add(info.getConsumerId()); remoteSubsIds.add(info.getConsumerId());
} }
@ -50,9 +48,6 @@ public class DemandSubscription {
* @return true if added * @return true if added
*/ */
public boolean add(ConsumerId id) { public boolean add(ConsumerId id) {
if (localInfo != null) {
localInfo.addNetworkConsumerId(id);
}
return remoteSubsIds.add(id); return remoteSubsIds.add(id);
} }
@ -63,9 +58,6 @@ public class DemandSubscription {
* @return true if removed * @return true if removed
*/ */
public boolean remove(ConsumerId id) { public boolean remove(ConsumerId id) {
if (localInfo != null) {
localInfo.removeNetworkConsumerId(id);
}
return remoteSubsIds.remove(id); return remoteSubsIds.remove(id);
} }

View File

@ -16,9 +16,11 @@
*/ */
package org.apache.activemq.usecases; package org.apache.activemq.usecases;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.URI; import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -27,17 +29,22 @@ import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.util.MessageIdList; import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** /**
* @version $Revision: 1.1.1.1 $ * @version $Revision: 1.1.1.1 $
*/ */
public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport { public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler {
public static final int BROKER_COUNT = 2; // number of brokers to network public static final int BROKER_COUNT = 6; // number of brokers to network
public static final int CONSUMER_COUNT = 3; // consumers per broker public static final int CONSUMER_COUNT = 25; // consumers per broker
public static final int PRODUCER_COUNT = 3; // producers per broker public static final int PRODUCER_COUNT = 3; // producers per broker
public static final int MESSAGE_COUNT = 20; // messages per producer public static final int MESSAGE_COUNT = 20; // messages per producer
private static final Log LOG = LogFactory.getLog(MultiBrokersMultiClientsTest.class);
protected Map consumerMap; protected Map consumerMap;
Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
public void testTopicAllConnected() throws Exception { public void testTopicAllConnected() throws Exception {
bridgeAllBrokers(); bridgeAllBrokers();
@ -78,6 +85,15 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
} }
} }
assertNoUnhandeledExceptions();
}
private void assertNoUnhandeledExceptions() {
for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) {
LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue());
}
assertTrue("There are no unhandelled exceptions, see: log for detail on: " + unhandeledExceptions,
unhandeledExceptions.isEmpty());
} }
public void testQueueAllConnected() throws Exception { public void testQueueAllConnected() throws Exception {
@ -121,12 +137,17 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
} }
} }
assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, totalMsg); assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, totalMsg);
assertNoUnhandeledExceptions();
} }
public void setUp() throws Exception { public void setUp() throws Exception {
super.setAutoFail(true); super.setAutoFail(true);
super.setUp(); super.setUp();
unhandeledExceptions.clear();
Thread.setDefaultUncaughtExceptionHandler(this);
// Setup n brokers // Setup n brokers
for (int i = 1; i <= BROKER_COUNT; i++) { for (int i = 1; i <= BROKER_COUNT; i++) {
createBroker(new URI("broker:()/Broker" + i + "?persistent=false&useJmx=false")); createBroker(new URI("broker:()/Broker" + i + "?persistent=false&useJmx=false"));
@ -134,4 +155,10 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
consumerMap = new HashMap(); consumerMap = new HashMap();
} }
public void uncaughtException(Thread t, Throwable e) {
synchronized(unhandeledExceptions) {
unhandeledExceptions.put(t,e);
}
}
} }