From e48ff70d4b43b6b29a3c238b2aa835da89e1472c Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Tue, 17 Feb 2009 12:04:57 +0000 Subject: [PATCH] resolve AMQ-2111, only the network subscription that causes the initial subscription needs to be tracked as this is the one that will be propagated via an advisory to the rest of the network. The existing update was making a mod to a possibly inflight advisory message git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@744983 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/network/DemandSubscription.java | 10 +----- .../MultiBrokersMultiClientsTest.java | 33 +++++++++++++++++-- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java index 7486a83002..02d7edb32a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java @@ -37,8 +37,6 @@ public class DemandSubscription { DemandSubscription(ConsumerInfo info) { remoteInfo = info; localInfo = info.copy(); - localInfo.setSelector(info.getSelector()); - localInfo.setBrokerPath(info.getBrokerPath()); localInfo.setNetworkSubscription(true); remoteSubsIds.add(info.getConsumerId()); } @@ -49,10 +47,7 @@ public class DemandSubscription { * @param id * @return true if added */ - public boolean add(ConsumerId id) { - if (localInfo != null) { - localInfo.addNetworkConsumerId(id); - } + public boolean add(ConsumerId id) { return remoteSubsIds.add(id); } @@ -63,9 +58,6 @@ public class DemandSubscription { * @return true if removed */ public boolean remove(ConsumerId id) { - if (localInfo != null) { - localInfo.removeNetworkConsumerId(id); - } return remoteSubsIds.remove(id); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java index 5f9323b770..9dd59384cf 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java @@ -16,9 +16,11 @@ */ package org.apache.activemq.usecases; +import java.lang.Thread.UncaughtExceptionHandler; import java.net.URI; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -27,17 +29,22 @@ import javax.jms.MessageConsumer; import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.apache.activemq.util.MessageIdList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * @version $Revision: 1.1.1.1 $ */ -public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport { - public static final int BROKER_COUNT = 2; // number of brokers to network - public static final int CONSUMER_COUNT = 3; // consumers per broker +public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler { + public static final int BROKER_COUNT = 6; // number of brokers to network + public static final int CONSUMER_COUNT = 25; // consumers per broker public static final int PRODUCER_COUNT = 3; // producers per broker public static final int MESSAGE_COUNT = 20; // messages per producer + private static final Log LOG = LogFactory.getLog(MultiBrokersMultiClientsTest.class); + protected Map consumerMap; + Map unhandeledExceptions = new HashMap(); public void testTopicAllConnected() throws Exception { bridgeAllBrokers(); @@ -78,6 +85,15 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport } } + assertNoUnhandeledExceptions(); + } + + private void assertNoUnhandeledExceptions() { + for( Entry e: unhandeledExceptions.entrySet()) { + LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue()); + } + assertTrue("There are no unhandelled exceptions, see: log for detail on: " + unhandeledExceptions, + unhandeledExceptions.isEmpty()); } public void testQueueAllConnected() throws Exception { @@ -121,12 +137,17 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport } } assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, totalMsg); + + assertNoUnhandeledExceptions(); } public void setUp() throws Exception { super.setAutoFail(true); super.setUp(); + unhandeledExceptions.clear(); + Thread.setDefaultUncaughtExceptionHandler(this); + // Setup n brokers for (int i = 1; i <= BROKER_COUNT; i++) { createBroker(new URI("broker:()/Broker" + i + "?persistent=false&useJmx=false")); @@ -134,4 +155,10 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport consumerMap = new HashMap(); } + + public void uncaughtException(Thread t, Throwable e) { + synchronized(unhandeledExceptions) { + unhandeledExceptions.put(t,e); + } + } }