https://issues.apache.org/jira/browse/AMQ-2327 - resolve. key is not to conduit proxy/proxy consumers b/c the dependencies cannot be easily resolved without more network traffic on additions to demand subs. Maintaining the order of consumer advisories fixes duplicate suppression. thanks for the easymock test. while brittle it did help focus on checkpaths which was key

This commit is contained in:
gtully 2013-09-06 22:45:35 +01:00
parent 5515b9be3f
commit 6c5732bc5c
9 changed files with 550 additions and 65 deletions

View File

@ -16,10 +16,12 @@
*/
package org.apache.activemq.advisory;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
@ -52,7 +54,34 @@ public class AdvisoryBroker extends BrokerFilter {
private static final IdGenerator ID_GENERATOR = new IdGenerator();
protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId, ConsumerInfo>();
class ConsumerIdKey {
final ConsumerId delegate;
final long creationTime = System.currentTimeMillis();
ConsumerIdKey(ConsumerId id) {
delegate = id;
}
@Override
public boolean equals(Object other) {
return delegate.equals(other);
}
@Override
public int hashCode() {
return delegate.hashCode();
}
}
// replay consumer advisory messages in the order in which they arrive - allows duplicate suppression in
// mesh networks with ttl>1
protected final Map<ConsumerIdKey, ConsumerInfo> consumers = new ConcurrentSkipListMap<ConsumerIdKey, ConsumerInfo>(
new Comparator<ConsumerIdKey>() {
@Override
public int compare(ConsumerIdKey o1, ConsumerIdKey o2) {
return (o1.creationTime < o2.creationTime ? -1 : (o1.delegate==o2.delegate ? 0 : 1));
}
}
);
protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
@ -84,7 +113,7 @@ public class AdvisoryBroker extends BrokerFilter {
// Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
consumers.put(info.getConsumerId(), info);
consumers.put(new ConsumerIdKey(info.getConsumerId()), info);
fireConsumerAdvisory(context, info.getDestination(), topic, info);
} else {
// We need to replay all the previously collected state objects
@ -247,7 +276,7 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQDestination dest = info.getDestination();
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
consumers.remove(info.getConsumerId());
consumers.remove(new ConsumerIdKey(info.getConsumerId()));
if (!dest.isTemporary() || destinations.containsKey(dest)) {
fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
}
@ -575,7 +604,7 @@ public class AdvisoryBroker extends BrokerFilter {
return connections;
}
public Map<ConsumerId, ConsumerInfo> getAdvisoryConsumers() {
public Map<ConsumerIdKey, ConsumerInfo> getAdvisoryConsumers() {
return consumers;
}

View File

@ -57,38 +57,24 @@ public class ConduitBridge extends DemandForwardingBridge {
return doCreateDemandSubscription(info);
}
protected boolean checkPaths(BrokerId[] first, BrokerId[] second) {
if (first == null || second == null) {
return true;
}
if (Arrays.equals(first, second)) {
return true;
}
if (first[0].equals(second[0]) && first[first.length - 1].equals(second[second.length - 1])) {
return false;
} else {
return true;
}
}
protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
// search through existing subscriptions and see if we have a match
if (info.isNetworkSubscription()) {
return false;
}
boolean matched = false;
for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
if (filter.matches(info.getDestination())) {
if (!ds.getRemoteInfo().isNetworkSubscription() && filter.matches(info.getDestination())) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " " + info + " with ids" + info.getNetworkConsumerIds() + " matched (add interest) " + ds);
}
// add the interest in the subscription
if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
if (!info.isDurable()) {
ds.add(info.getConsumerId());
} else {
ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
}
if (!info.isDurable()) {
ds.add(info.getConsumerId());
} else {
ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
}
matched = true;
// continue - we want interest to any existing DemandSubscriptions

View File

@ -75,7 +75,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
protected final Transport localBroker;
protected final Transport remoteBroker;
protected final IdGenerator idGenerator = new IdGenerator();
protected IdGenerator idGenerator;
protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
protected ConnectionInfo localConnectionInfo;
protected ConnectionInfo remoteConnectionInfo;
@ -354,6 +354,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// Fill in the remote broker's information now.
remoteBrokerPath[0] = remoteBrokerId;
remoteBrokerName = remoteBrokerInfo.getBrokerName();
if (configuration.isUseBrokerNamesAsIdSeed()) {
idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
} else {
idGenerator = new IdGenerator();
}
} catch (Throwable e) {
serviceLocalException(e);
}

View File

@ -33,7 +33,7 @@ public class DurableConduitBridge extends ConduitBridge {
private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
public String toString() {
return "DurableConduitBridge";
return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName();
}
/**
* Constructor

View File

@ -61,6 +61,7 @@ public class NetworkBridgeConfiguration {
private boolean staticBridge = false;
private boolean useCompression = false;
private boolean advisoryForFailedForward = false;
private boolean useBrokerNamesAsIdSeed = true;
/**
* @return the conduitSubscriptions
@ -415,4 +416,12 @@ public class NetworkBridgeConfiguration {
public int getMessageTTL() {
return messageTTL;
}
public boolean isUseBrokerNamesAsIdSeed() {
return useBrokerNamesAsIdSeed;
}
public void setUseBrokerNameAsIdSees(boolean val) {
useBrokerNamesAsIdSeed = val;
}
}

View File

@ -288,6 +288,12 @@
<artifactId>jmock-legacy</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ftpserver</groupId>
<artifactId>ftpserver-core</artifactId>

View File

@ -0,0 +1,346 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IMocksControl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class NetworkRouteTest {
private IMocksControl control;
private BrokerService brokerService;
private Transport localBroker;
private Transport remoteBroker;
private TransportListener localListener;
private TransportListener remoteListener;
private MessageDispatch msgDispatch;
private ActiveMQMessage path1Msg;
private ActiveMQMessage path2Msg;
private ActiveMQMessage removePath1Msg;
private ActiveMQMessage removePath2Msg;
// this sort of mockery is very brittle but it is fast!
@Test
public void verifyNoRemoveOnOneConduitRemove() throws Exception {
EasyMock.expect(localBroker.request(EasyMock.isA(ConsumerInfo.class))).andReturn(null);
control.replay();
remoteListener.onCommand(path2Msg);
remoteListener.onCommand(path1Msg);
remoteListener.onCommand(removePath2Msg);
control.verify();
}
@Test
public void addAndRemoveOppositeOrder() throws Exception {
// from (1)
localBroker.request(EasyMock.isA(ConsumerInfo.class));
ArgHolder localConsumer = ArgHolder.holdArgsForLastObjectCall();
// from (2a)
remoteBroker.asyncRequest(EasyMock.isA(ActiveMQMessage.class), EasyMock.isA(ResponseCallback.class));
ArgHolder firstMessageFuture = ArgHolder.holdArgsForLastFutureRequestCall();
localBroker.oneway(EasyMock.isA(MessageAck.class));
// from (2b)
remoteBroker.asyncRequest(EasyMock.isA(ActiveMQMessage.class), EasyMock.isA(ResponseCallback.class));
ArgHolder secondMessageFuture = ArgHolder.holdArgsForLastFutureRequestCall();
localBroker.oneway(EasyMock.isA(MessageAck.class));
// from (3)
localBroker.oneway(EasyMock.isA(RemoveInfo.class));
ExpectationWaiter waitForRemove = ExpectationWaiter.waiterForLastVoidCall();
control.replay();
// (1) send advisory of path 1
remoteListener.onCommand(path1Msg);
msgDispatch.setConsumerId(((ConsumerInfo) localConsumer.arguments[0]).getConsumerId());
// send advisory of path 2, doesn't send a ConsumerInfo to localBroker
remoteListener.onCommand(path2Msg);
// (2a) send a message
localListener.onCommand(msgDispatch);
ResponseCallback callback = (ResponseCallback) firstMessageFuture.arguments[1];
FutureResponse response = new FutureResponse(callback);
response.set(new Response());
// send advisory of path 2 remove, doesn't send a RemoveInfo to localBroker
remoteListener.onCommand(removePath2Msg);
// (2b) send a message
localListener.onCommand(msgDispatch);
callback = (ResponseCallback) secondMessageFuture.arguments[1];
response = new FutureResponse(callback);
response.set(new Response());
// (3) send advisory of path 1 remove, sends a RemoveInfo to localBroker
remoteListener.onCommand(removePath1Msg);
waitForRemove.assertHappens(5, TimeUnit.SECONDS);
// send a message, does not send message as in 2a and 2b
localListener.onCommand(msgDispatch);
control.verify();
}
@Test
public void addAndRemoveSameOrder() throws Exception {
// from (1)
localBroker.request(EasyMock.isA(ConsumerInfo.class));
ArgHolder localConsumer = ArgHolder.holdArgsForLastObjectCall();
// from (2a)
remoteBroker.asyncRequest(EasyMock.isA(ActiveMQMessage.class), EasyMock.isA(ResponseCallback.class));
ArgHolder firstMessageFuture = ArgHolder.holdArgsForLastFutureRequestCall();
localBroker.oneway(EasyMock.isA(MessageAck.class));
// from (2b)
remoteBroker.asyncRequest(EasyMock.isA(ActiveMQMessage.class), EasyMock.isA(ResponseCallback.class));
ArgHolder secondMessageFuture = ArgHolder.holdArgsForLastFutureRequestCall();
localBroker.oneway(EasyMock.isA(MessageAck.class));
// from (3)
localBroker.oneway(EasyMock.isA(RemoveInfo.class));
ExpectationWaiter waitForRemove = ExpectationWaiter.waiterForLastVoidCall();
control.replay();
// (1) send advisory of path 1
remoteListener.onCommand(path1Msg);
msgDispatch.setConsumerId(((ConsumerInfo) localConsumer.arguments[0]).getConsumerId());
// send advisory of path 2, doesn't send a ConsumerInfo to localBroker
remoteListener.onCommand(path2Msg);
// (2a) send a message
localListener.onCommand(msgDispatch);
ResponseCallback callback = (ResponseCallback) firstMessageFuture.arguments[1];
FutureResponse response = new FutureResponse(callback);
response.set(new Response());
// send advisory of path 1 remove, shouldn't send a RemoveInfo to localBroker
remoteListener.onCommand(removePath1Msg);
// (2b) send a message, should send the message as in 2a
localListener.onCommand(msgDispatch);
callback = (ResponseCallback) secondMessageFuture.arguments[1];
response = new FutureResponse(callback);
response.set(new Response());
// (3) send advisory of path 1 remove, should send a RemoveInfo to localBroker
remoteListener.onCommand(removePath2Msg);
waitForRemove.assertHappens(5, TimeUnit.SECONDS);
// send a message, does not send message as in 2a
localListener.onCommand(msgDispatch);
control.verify();
}
@Before
public void before() throws Exception {
control = EasyMock.createControl();
localBroker = control.createMock(Transport.class);
remoteBroker = control.createMock(Transport.class);
NetworkBridgeConfiguration configuration = new NetworkBridgeConfiguration();
brokerService = new BrokerService();
BrokerInfo remoteBrokerInfo = new BrokerInfo();
configuration.setDuplex(true);
configuration.setNetworkTTL(5);
brokerService.setBrokerId("broker-1");
brokerService.setPersistent(false);
brokerService.setUseJmx(false);
brokerService.start();
brokerService.waitUntilStarted();
remoteBrokerInfo.setBrokerId(new BrokerId("remote-broker-id"));
remoteBrokerInfo.setBrokerName("remote-broker-name");
localBroker.setTransportListener(EasyMock.isA(TransportListener.class));
ArgHolder localListenerRef = ArgHolder.holdArgsForLastVoidCall();
remoteBroker.setTransportListener(EasyMock.isA(TransportListener.class));
ArgHolder remoteListenerRef = ArgHolder.holdArgsForLastVoidCall();
localBroker.start();
remoteBroker.start();
remoteBroker.oneway(EasyMock.isA(Object.class));
EasyMock.expectLastCall().times(4);
remoteBroker.oneway(EasyMock.isA(Object.class));
ExpectationWaiter remoteInitWaiter = ExpectationWaiter.waiterForLastVoidCall();
localBroker.oneway(remoteBrokerInfo);
EasyMock.expect(localBroker.request(EasyMock.isA(Object.class)))
.andReturn(null);
localBroker.oneway(EasyMock.isA(Object.class));
ExpectationWaiter localInitWaiter = ExpectationWaiter.waiterForLastVoidCall();
control.replay();
DurableConduitBridge bridge = new DurableConduitBridge(configuration, localBroker, remoteBroker);
bridge.setBrokerService(brokerService);
bridge.start();
localListener = (TransportListener) localListenerRef.getArguments()[0];
Assert.assertNotNull(localListener);
remoteListener = (TransportListener) remoteListenerRef.getArguments()[0];
Assert.assertNotNull(remoteListener);
remoteListener.onCommand(remoteBrokerInfo);
remoteInitWaiter.assertHappens(5, TimeUnit.SECONDS);
localInitWaiter.assertHappens(5, TimeUnit.SECONDS);
control.verify();
control.reset();
ActiveMQMessage msg = new ActiveMQMessage();
msg.setDestination(new ActiveMQTopic("test"));
msgDispatch = new MessageDispatch();
msgDispatch.setMessage(msg);
ConsumerInfo path1 = new ConsumerInfo();
path1.setDestination(msg.getDestination());
path1.setConsumerId(new ConsumerId(new SessionId(new ConnectionId("conn-id-1"), 1), 3));
path1.setBrokerPath(new BrokerId[]{
new BrokerId("remote-broker-id"),
new BrokerId("server(1)-broker-id"),
});
path1Msg = new ActiveMQMessage();
path1Msg.setDestination(AdvisorySupport.getConsumerAdvisoryTopic(path1.getDestination()));
path1Msg.setDataStructure(path1);
ConsumerInfo path2 = new ConsumerInfo();
path2.setDestination(path1.getDestination());
path2.setConsumerId(new ConsumerId(new SessionId(new ConnectionId("conn-id-2"), 2), 4));
path2.setBrokerPath(new BrokerId[]{
new BrokerId("remote-broker-id"),
new BrokerId("server(2)-broker-id"),
new BrokerId("server(1)-broker-id"),
});
path2Msg = new ActiveMQMessage();
path2Msg.setDestination(path1Msg.getDestination());
path2Msg.setDataStructure(path2);
RemoveInfo removePath1 = new RemoveInfo(path1.getConsumerId());
RemoveInfo removePath2 = new RemoveInfo(path2.getConsumerId());
removePath1Msg = new ActiveMQMessage();
removePath1Msg.setDestination(path1Msg.getDestination());
removePath1Msg.setDataStructure(removePath1);
removePath2Msg = new ActiveMQMessage();
removePath2Msg.setDestination(path1Msg.getDestination());
removePath2Msg.setDataStructure(removePath2);
}
@After
public void after() throws Exception {
control.reset();
brokerService.stop();
brokerService.waitUntilStopped();
}
private static class ArgHolder {
public Object[] arguments;
public static ArgHolder holdArgsForLastVoidCall() {
final ArgHolder holder = new ArgHolder();
EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
Object[] args = EasyMock.getCurrentArguments();
holder.arguments = Arrays.copyOf(args, args.length);
return null;
}
});
return holder;
}
public static ArgHolder holdArgsForLastObjectCall() {
final ArgHolder holder = new ArgHolder();
EasyMock.expect(new Object()).andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
Object[] args = EasyMock.getCurrentArguments();
holder.arguments = Arrays.copyOf(args, args.length);
return null;
}
});
return holder;
}
public static ArgHolder holdArgsForLastFutureRequestCall() {
final ArgHolder holder = new ArgHolder();
EasyMock.expect(new FutureResponse(null)).andAnswer(new IAnswer<FutureResponse>() {
@Override
public FutureResponse answer() throws Throwable {
Object[] args = EasyMock.getCurrentArguments();
holder.arguments = Arrays.copyOf(args, args.length);
return null;
}
});
return holder;
}
public Object[] getArguments() {
Assert.assertNotNull(arguments);
return arguments;
}
}
private static class ExpectationWaiter {
private CountDownLatch latch = new CountDownLatch(1);
public static ExpectationWaiter waiterForLastVoidCall() {
final ExpectationWaiter waiter = new ExpectationWaiter();
EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
waiter.latch.countDown();
return null;
}
});
return waiter;
}
public void assertHappens(long timeout, TimeUnit unit) throws InterruptedException {
Assert.assertTrue(latch.await(timeout, unit));
}
}
}

View File

@ -571,8 +571,8 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
public void testDuplicateQueueSubs() throws Exception {
createBroker("BrokerD");
configureBroker(createBroker("BrokerD"));
bridgeAllBrokers("default", 3, false);
startAllBrokers();
@ -596,7 +596,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
if (!brokerName.equals(broker.getBrokerName())) {
verifyConsumerCount(broker, 3, dest);
verifyConsumerCount(broker, 5, dest);
verifyConsumePriority(broker, ConsumerInfo.NORMAL_PRIORITY, dest);
}
}
@ -605,7 +605,14 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
// wait for advisories
Thread.sleep(2000);
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
if (!brokerName.equals(broker.getBrokerName())) {
logConsumerCount(broker, 0, dest);
}
}
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker;
verifyConsumerCount(broker, 0, dest);
@ -620,9 +627,21 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
}
});
Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
LOG.info("Verify: consumer count on " + broker.getBrokerName() + " matches:" + count + ", val:" + internalQueue.getConsumers().size());
assertEquals("consumer count on " + broker.getBrokerName() + " matches for q: " + internalQueue, count, internalQueue.getConsumers().size());
}
private void logConsumerCount(BrokerService broker, int count, final Destination dest) throws Exception {
final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
waitFor(new Condition() {
public boolean isSatisified() throws Exception {
return !regionBroker.getDestinations(ActiveMQDestination.transform(dest)).isEmpty();
}
});
Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
LOG.info("Verify: consumer count on " + broker.getBrokerName() + " matches:" + count + ", val:" + internalQueue.getConsumers().size());
}
private void verifyConsumePriority(BrokerService broker, byte expectedPriority, Destination dest) throws Exception {
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next();
@ -630,7 +649,12 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
assertEquals("consumer on " + broker.getBrokerName() + " matches priority: " + internalQueue, expectedPriority, consumer.getConsumerInfo().getPriority());
}
}
@Override
public void configureBroker(BrokerService brokerService) {
brokerService.setBrokerId(brokerService.getBrokerName());
}
@Override
public void setUp() throws Exception {
super.setAutoFail(true);

View File

@ -18,14 +18,16 @@ package org.apache.activemq.usecases;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.management.ObjectName;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
@ -39,26 +41,24 @@ import org.slf4j.LoggerFactory;
public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler {
public static final int BROKER_COUNT = 3;
public static final int CONSUMER_COUNT = 1;
public static final int CONSUMER_COUNT = 5;
public static final int MESSAGE_COUNT = 0;
public static final boolean DUPLEX = false;
public static final boolean CONDUIT = true;
// NETWORK_TTL=4 is problematic for consumer/demand propagation
// needs setConsumerTTL=1 to override
public static final int NETWORK_TTL = 4;
public static final int NETWORK_TTL = 6;
private static final Logger LOG = LoggerFactory.getLogger(VerifyNetworkConsumersDisconnectTest.class);
public static final int TIMEOUT = 30000;
protected Map<String, MessageConsumer> consumerMap;
Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
Map<Thread, Throwable> unhandledExceptions = new HashMap<Thread, Throwable>();
private void assertNoUnhandeledExceptions() {
for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) {
private void assertNoUnhandledExceptions() {
for( Entry<Thread, Throwable> e: unhandledExceptions.entrySet()) {
LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue());
}
assertTrue("There are no unhandelled exceptions, see: log for detail on: " + unhandeledExceptions,
unhandeledExceptions.isEmpty());
assertTrue("There are no unhandled exceptions, see: log for detail on: " + unhandledExceptions,
unhandledExceptions.isEmpty());
}
public NetworkConnector bridge(String from, String to) throws Exception {
@ -66,14 +66,17 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
networkConnector.setSuppressDuplicateQueueSubscriptions(true);
networkConnector.setDecreaseNetworkConsumerPriority(true);
networkConnector.setDuplex(DUPLEX);
// infinite ttl for messages in a mesh
networkConnector.setMessageTTL(-1);
// one hop for consumers in a mesh
networkConnector.setConsumerTTL(1);
return networkConnector;
}
public void testQueueAllConnected() throws Exception {
/*why conduit proxy proxy consumers gets us in a knot w.r.t removal
DC-7 for CA-9, add DB-15, remove CA-9, add CB-8
CB-8 add DC-7
CB-8 - why not dead?
CB-8 for BA-6, add BD-15, remove BA-6
BD-15 for DA-11, add DC-7
*/
public void testConsumerOnEachBroker() throws Exception {
bridge("Broker0", "Broker1");
if (!DUPLEX) bridge("Broker1", "Broker0");
@ -81,7 +84,10 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
if (!DUPLEX) bridge("Broker2", "Broker1");
startAllBrokers();
this.waitForBridgeFormation();
waitForBridgeFormation(brokers.get("Broker0").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker2").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker1").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker1").broker, 1, 1);
Destination dest = createDestination("TEST.FOO", false);
@ -90,14 +96,14 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, dest));
}
assertExactConsumersConnect("Broker0", dest, 2, TIMEOUT);
assertExactConsumersConnect("Broker2", dest, 2, TIMEOUT);
assertExactConsumersConnect("Broker0", 3, 1, TIMEOUT);
assertExactConsumersConnect("Broker2", 3, 1, TIMEOUT);
// piggy in the middle
assertExactConsumersConnect("Broker1", dest, 3, TIMEOUT);
assertExactConsumersConnect("Broker1", 3, 1, TIMEOUT);
assertNoUnhandeledExceptions();
assertNoUnhandledExceptions();
LOG.info("Complate the mesh - 0->2");
LOG.info("Complete the mesh - 0->2");
// shorter route
NetworkConnector nc = bridge("Broker0", "Broker2");
@ -106,7 +112,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
if (!DUPLEX) {
LOG.info("... complate the mesh - 2->0");
LOG.info("... complete the mesh - 2->0");
nc = bridge("Broker2", "Broker0");
nc.setBrokerName("Broker2");
nc.start();
@ -114,7 +120,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
// wait for consumers to get propagated
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, dest, 3, TIMEOUT);
assertExactConsumersConnect("Broker" + i, 3, 1, TIMEOUT);
}
// reverse order close
@ -126,24 +132,98 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
LOG.info("Check for no consumers..");
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, dest, 0, TIMEOUT);
assertExactConsumersConnect("Broker" + i, 0, 0, TIMEOUT);
}
}
protected void assertExactConsumersConnect(final String brokerName, Destination destination, final int count, long timeout) throws Exception {
public void testXConsumerOnEachBroker() throws Exception {
bridge("Broker0", "Broker1");
if (!DUPLEX) bridge("Broker1", "Broker0");
bridge("Broker1", "Broker2");
if (!DUPLEX) bridge("Broker2", "Broker1");
startAllBrokers();
waitForBridgeFormation(brokers.get("Broker0").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker2").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker1").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker1").broker, 1, 1);
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
for (int i = 0; i < BROKER_COUNT; i++) {
for (int j=0; j< CONSUMER_COUNT; j++)
consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
}
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, CONSUMER_COUNT + (BROKER_COUNT -1), 1, TIMEOUT);
}
assertNoUnhandledExceptions();
LOG.info("Complete the mesh - 0->2");
// shorter route
NetworkConnector nc = bridge("Broker0", "Broker2");
nc.setBrokerName("Broker0");
nc.start();
waitForBridgeFormation(brokers.get("Broker0").broker, 1, 1);
if (!DUPLEX) {
LOG.info("... complete the mesh - 2->0");
nc = bridge("Broker2", "Broker0");
nc.setBrokerName("Broker2");
nc.start();
}
waitForBridgeFormation(brokers.get("Broker2").broker, 1, 1);
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, CONSUMER_COUNT + (BROKER_COUNT -1), 1, TIMEOUT);
}
// reverse order close
for (int i=0; i<CONSUMER_COUNT; i++) {
consumerMap.get("Consumer:" + 2 + ":" + i).close();
TimeUnit.SECONDS.sleep(1);
consumerMap.get("Consumer:" + 1 + ":" + i).close();
TimeUnit.SECONDS.sleep(1);
consumerMap.get("Consumer:" + 0 + ":" + i).close();
}
LOG.info("Check for no consumers..");
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, 0, 0, TIMEOUT);
}
}
protected void assertExactConsumersConnect(final String brokerName, final int count, final int numChecks, long timeout) throws Exception {
final ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
assertTrue("Excepected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
final AtomicInteger stability = new AtomicInteger(0);
assertTrue("Expected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
try {
QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
long currentCount = queueViewMBean.getConsumerCount();
LOG.info("On " + brokerName + " current consumer count for " + queueViewMBean + ", " + currentCount);
if (count != currentCount) {
LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
LinkedList<String> consumerIds = new LinkedList<String>();
for (ObjectName objectName : queueViewMBean.getSubscriptions()) {
consumerIds.add(objectName.getKeyProperty("consumerId"));
}
return currentCount == count;
LOG.info("Sub IDs: " + consumerIds);
if (currentCount == count) {
stability.incrementAndGet();
} else {
stability.set(0);
}
return stability.get() > numChecks;
} catch (Exception e) {
LOG.warn(": ", e);
return false;
@ -156,7 +236,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
super.setAutoFail(true);
super.setUp();
unhandeledExceptions.clear();
unhandledExceptions.clear();
Thread.setDefaultUncaughtExceptionHandler(this);
// Setup n brokers
@ -177,8 +257,8 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
}
public void uncaughtException(Thread t, Throwable e) {
synchronized(unhandeledExceptions) {
unhandeledExceptions.put(t,e);
synchronized(unhandledExceptions) {
unhandledExceptions.put(t, e);
}
}
}