some additional tests for AMQ-2198|https://issues.apache.org/activemq/browse/AMQ-2198, also additional logging

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@763983 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-04-10 17:31:40 +00:00
parent bcbb1ec1de
commit 3da13732ec
7 changed files with 213 additions and 17 deletions

View File

@ -113,7 +113,7 @@ public abstract class AbstractRegion implements Region {
} }
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
LOG.debug("Adding destination: " + destination); LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
synchronized (destinationsMutex) { synchronized (destinationsMutex) {
Destination dest = destinations.get(destination); Destination dest = destinations.get(destination);
if (dest == null) { if (dest == null) {
@ -216,7 +216,7 @@ public abstract class AbstractRegion implements Region {
} }
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug("Adding consumer: " + info.getConsumerId()); LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: " + info.getDestination());
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
if (destination != null && !destination.isPattern() && !destination.isComposite()) { if (destination != null && !destination.isPattern() && !destination.isComposite()) {
// lets auto-create the destination // lets auto-create the destination
@ -308,7 +308,7 @@ public abstract class AbstractRegion implements Region {
} }
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug("Removing consumer: " + info.getConsumerId()); LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: " + info.getDestination());
Subscription sub = subscriptions.remove(info.getConsumerId()); Subscription sub = subscriptions.remove(info.getConsumerId());
//The sub could be removed elsewhere - see ConnectionSplitBroker //The sub could be removed elsewhere - see ConnectionSplitBroker

View File

@ -17,6 +17,7 @@
package org.apache.activemq.command; package org.apache.activemq.command;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -75,7 +76,9 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
if (contains(message.getBrokerPath(), networkBrokerId)) { if (contains(message.getBrokerPath(), networkBrokerId)) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Message all ready routed once through this broker - ignoring: " + message); LOG.trace("Message all ready routed once through this broker ("
+ networkBrokerId + "), path: "
+ Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
} }
return false; return false;
} }

View File

@ -64,7 +64,7 @@ public class ConduitBridge extends DemandForwardingBridge {
DemandSubscription ds = (DemandSubscription)i.next(); DemandSubscription ds = (DemandSubscription)i.next();
if (filter.matches(ds.getLocalInfo().getDestination())) { if (filter.matches(ds.getLocalInfo().getDestination())) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " matched exsting sub (add interest) for : " + ds.getRemoteInfo() LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo()
+ " with sub: " + info); + " with sub: " + info);
} }
// add the interest in the subscription // add the interest in the subscription
@ -84,18 +84,17 @@ public class ConduitBridge extends DemandForwardingBridge {
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) { for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
DemandSubscription ds = (DemandSubscription)i.next(); DemandSubscription ds = (DemandSubscription)i.next();
ds.remove(id); if (ds.remove(id)) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id + " existing matched sub: " + ds.getRemoteInfo());
}
}
if (ds.isEmpty()) { if (ds.isEmpty()) {
tmpList.add(ds); tmpList.add(ds);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo());
}
} }
} }
for (Iterator<DemandSubscription> i = tmpList.iterator(); i.hasNext();) { for (Iterator<DemandSubscription> i = tmpList.iterator(); i.hasNext();) {
DemandSubscription ds = i.next(); DemandSubscription ds = i.next();
subscriptionMapByLocalId.remove(ds.getRemoteInfo().getConsumerId());
removeSubscription(ds); removeSubscription(ds);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo()); LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo());

View File

@ -617,6 +617,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void removeSubscription(DemandSubscription sub) throws IOException { protected void removeSubscription(DemandSubscription sub) throws IOException {
if (sub != null) { if (sub != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
}
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
} }

View File

@ -113,12 +113,16 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
} }
// This will interconnect all brokes using multicast // This will interconnect all brokers using multicast
protected void bridgeAllBrokers() throws Exception { protected void bridgeAllBrokers() throws Exception {
bridgeAllBrokers("default", 1, false); bridgeAllBrokers("default", 1, false, false);
} }
protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception { protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception {
bridgeAllBrokers(groupName, ttl, suppressduplicateQueueSubs, false);
}
protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs, boolean decreasePriority) throws Exception {
Collection<BrokerItem> brokerList = brokers.values(); Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) { for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker; BrokerService broker = i.next().broker;
@ -134,6 +138,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
NetworkConnector nc = broker.addNetworkConnector("multicast://default?group=" + groupName); NetworkConnector nc = broker.addNetworkConnector("multicast://default?group=" + groupName);
nc.setNetworkTTL(ttl); nc.setNetworkTTL(ttl);
nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs); nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs);
nc.setDecreaseNetworkConsumerPriority(decreasePriority);
} }
// Multicasting may take longer to setup // Multicasting may take longer to setup

View File

@ -20,6 +20,8 @@ import java.net.URI;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
@ -30,11 +32,14 @@ import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList; import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** /**
* @version $Revision: 1.1.1.1 $ * @version $Revision: 1.1.1.1 $
*/ */
public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
private static final Log LOG = LogFactory.getLog(ThreeBrokerQueueNetworkTest.class);
protected static final int MESSAGE_COUNT = 100; protected static final int MESSAGE_COUNT = 100;
/** /**
@ -243,9 +248,6 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()); assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
} }
/**
* BrokerA <-> BrokerB <-> BrokerC
*/
public void testAllConnectedUsingMulticast() throws Exception { public void testAllConnectedUsingMulticast() throws Exception {
// Setup broker networks // Setup broker networks
bridgeAllBrokers(); bridgeAllBrokers();
@ -276,6 +278,156 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()); assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount());
} }
public void testAllConnectedUsingMulticastProducerConsumerOnA() throws Exception {
bridgeAllBrokers("default", 3, false);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
int messageCount = 2000;
CountDownLatch messagesReceived = new CountDownLatch(messageCount);
MessageConsumer clientA = createConsumer("BrokerA", dest, messagesReceived);
// Let's try to wait for advisory percolation.
Thread.sleep(1000);
// Send messages
sendMessages("BrokerA", dest, messageCount);
assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
assertEquals(messageCount, msgsA.getMessageCount());
}
public void testAllConnectedWithSpare() throws Exception {
bridgeAllBrokers("default", 3, false);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
int messageCount = 2000;
CountDownLatch messagesReceived = new CountDownLatch(messageCount);
MessageConsumer clientA = createConsumer("BrokerA", dest, messagesReceived);
// ensure advisory percolation.
Thread.sleep(1000);
// Send messages
sendMessages("BrokerB", dest, messageCount);
assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
assertEquals(messageCount, msgsA.getMessageCount());
}
public void testMigrateConsumerStuckMessages() throws Exception {
boolean suppressQueueDuplicateSubscriptions = false;
bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
LOG.info("Consumer on A");
MessageConsumer clientA = createConsumer("BrokerA", dest);
// ensure advisors have percolated
Thread.sleep(500);
LOG.info("Consumer on B");
int messageCount = 2000;
// will only get half of the messages
CountDownLatch messagesReceived = new CountDownLatch(messageCount/2);
MessageConsumer clientB = createConsumer("BrokerB", dest, messagesReceived);
// ensure advisors have percolated
Thread.sleep(500);
LOG.info("Close consumer on A");
clientA.close();
// ensure advisors have percolated
Thread.sleep(500);
LOG.info("Send to B");
sendMessages("BrokerB", dest, messageCount);
// Let's try to wait for any messages.
assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
// Get message count
MessageIdList msgs = getConsumerMessages("BrokerB", clientB);
// see will any more arrive
Thread.sleep(500);
assertEquals(messageCount/2, msgs.getMessageCount());
// pick up the stuck messages
messagesReceived = new CountDownLatch(messageCount/2);
clientA = createConsumer("BrokerA", dest, messagesReceived);
// Let's try to wait for any messages.
assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
msgs = getConsumerMessages("BrokerA", clientA);
assertEquals(messageCount/2, msgs.getMessageCount());
}
// use case: for maintenance, migrate consumers and producers from one
// node in the network to another so node can be replaced/updated
public void testMigrateConsumer() throws Exception {
boolean suppressQueueDuplicateSubscriptions = true;
boolean decreaseNetworkConsumerPriority = true;
bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
LOG.info("Consumer on A");
MessageConsumer clientA = createConsumer("BrokerA", dest);
// ensure advisors have percolated
Thread.sleep(500);
LOG.info("Consumer on B");
int messageCount = 2000;
CountDownLatch messagesReceived = new CountDownLatch(messageCount);
MessageConsumer clientB = createConsumer("BrokerB", dest, messagesReceived);
// make the consumer slow so that any network consumer has a chance, even
// if it has a lower priority
MessageIdList msgs = getConsumerMessages("BrokerB", clientB);
msgs.setProcessingDelay(10);
// ensure advisors have percolated
Thread.sleep(500);
LOG.info("Close consumer on A");
clientA.close();
// ensure advisors have percolated
Thread.sleep(500);
LOG.info("Send to B");
sendMessages("BrokerB", dest, messageCount);
// Let's try to wait for any messages.
assertTrue(messagesReceived.await(30, TimeUnit.SECONDS));
assertEquals(messageCount, msgs.getMessageCount());
}
public void testNoDuplicateQueueSubs() throws Exception { public void testNoDuplicateQueueSubs() throws Exception {
bridgeAllBrokers("default", 3, true); bridgeAllBrokers("default", 3, true);

View File

@ -99,6 +99,40 @@ public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultip
} }
/**
* BrokerA -> BrokerB && BrokerB -> BrokerA
*/
public void testDuplexStaticRemoteBrokerHasNoConsumer() throws Exception {
// Setup broker networks
boolean dynamicOnly = true;
int networkTTL = 2;
boolean conduit = true;
bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduit);
bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduit);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
MessageConsumer clientA = createConsumer("BrokerA", dest);
Thread.sleep(2*1000);
int messageCount = 2000;
// Send messages
sendMessages("BrokerA", dest, messageCount);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
msgsA.waitForMessagesToArrive(messageCount);
assertEquals(messageCount, msgsA.getMessageCount());
}
public void setUp() throws Exception { public void setUp() throws Exception {
super.setAutoFail(true); super.setAutoFail(true);
super.setUp(); super.setUp();