https://issues.apache.org/jira/browse/AMQ-6373

Adding a new flag that can be specified on a network bridge to allow
forcing of subscriptions to be durable.  Cleaned up some unit tests.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-07-30 10:58:29 -04:00
parent 71bb54f94c
commit e73ab34837
10 changed files with 614 additions and 91 deletions

View File

@ -103,6 +103,7 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

View File

@ -46,7 +46,7 @@ public class ConduitBridge extends DemandForwardingBridge {
@Override @Override
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
if (addToAlreadyInterestedConsumers(info)) { if (addToAlreadyInterestedConsumers(info, false)) {
return null; // don't want this subscription added return null; // don't want this subscription added
} }
//add our original id to ourselves //add our original id to ourselves
@ -55,7 +55,7 @@ public class ConduitBridge extends DemandForwardingBridge {
return doCreateDemandSubscription(info); return doCreateDemandSubscription(info);
} }
protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) { protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info, boolean isForcedDurable) {
// search through existing subscriptions and see if we have a match // search through existing subscriptions and see if we have a match
if (info.isNetworkSubscription()) { if (info.isNetworkSubscription()) {
return false; return false;
@ -71,6 +71,10 @@ public class ConduitBridge extends DemandForwardingBridge {
// add the interest in the subscription // add the interest in the subscription
if (!info.isDurable()) { if (!info.isDurable()) {
ds.add(info.getConsumerId()); ds.add(info.getConsumerId());
if (isForcedDurable) {
forcedDurableRemoteId.add(info.getConsumerId());
ds.addForcedDurableConsumer(info.getConsumerId());
}
} else { } else {
ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
} }

View File

@ -21,9 +21,11 @@ import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -99,6 +101,7 @@ import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -131,6 +134,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected ActiveMQDestination[] durableDestinations; protected ActiveMQDestination[] durableDestinations;
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>(); protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>(); protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
protected final CountDownLatch startedLatch = new CountDownLatch(2); protected final CountDownLatch startedLatch = new CountDownLatch(2);
protected final CountDownLatch localStartedLatch = new CountDownLatch(1); protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
@ -549,6 +553,21 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// set our properties // set our properties
Properties props = new Properties(); Properties props = new Properties();
IntrospectionSupport.getProperties(configuration, props, null); IntrospectionSupport.getProperties(configuration, props, null);
String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations";
String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations";
if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) {
props.put(dynamicallyIncludedDestinationsKey,
StringToListOfActiveMQDestinationConverter.
convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true));
}
if (!configuration.getStaticallyIncludedDestinations().isEmpty()) {
props.put(staticallyIncludedDestinationsKey,
StringToListOfActiveMQDestinationConverter.
convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true));
}
props.remove("networkTTL"); props.remove("networkTTL");
String str = MarshallingSupport.propertiesToString(props); String str = MarshallingSupport.propertiesToString(props);
brokerInfo.setNetworkProperties(str); brokerInfo.setNetworkProperties(str);
@ -858,6 +877,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else if (data.getClass() == RemoveInfo.class) { } else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id); removeDemandSubscription(id);
if (forcedDurableRemoteId.remove(id)) {
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
boolean removed = ds.removeForcedDurableConsumer(id);
if (removed) {
cleanupDurableSub(ds, i);
}
}
}
} else if (data.getClass() == RemoveSubscriptionInfo.class) { } else if (data.getClass() == RemoveSubscriptionInfo.class) {
RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
@ -873,7 +903,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
private void cleanupDurableSub(final DemandSubscription ds, private void cleanupDurableSub(final DemandSubscription ds,
Iterator<DemandSubscription> i) throws IOException { Iterator<DemandSubscription> i) throws IOException {
if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()) { if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
&& ds.getForcedDurableConsumersSize() == 0) {
// deactivate subscriber // deactivate subscriber
RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId()); RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
@ -1196,7 +1227,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
dests = dynamicallyIncludedDestinations; dests = dynamicallyIncludedDestinations;
if (dests != null && dests.length > 0) { if (dests != null && dests.length > 0) {
return matchesDynamicallyIncludedDestinations(destination); for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return true;
}
}
return false;
} }
return true; return true;
@ -1216,6 +1254,19 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return false; return false;
} }
protected ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) {
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return dest;
}
}
}
return null;
}
/** /**
* Subscriptions for these destinations are always created * Subscriptions for these destinations are always created
*/ */

View File

@ -41,6 +41,7 @@ public class DemandSubscription {
private final AtomicInteger dispatched = new AtomicInteger(0); private final AtomicInteger dispatched = new AtomicInteger(0);
private final AtomicBoolean activeWaiter = new AtomicBoolean(); private final AtomicBoolean activeWaiter = new AtomicBoolean();
private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>(); private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>();
private SubscriptionInfo localDurableSubscriber; private SubscriptionInfo localDurableSubscriber;
private NetworkBridgeFilter networkBridgeFilter; private NetworkBridgeFilter networkBridgeFilter;
@ -106,6 +107,18 @@ public class DemandSubscription {
return remoteInfo; return remoteInfo;
} }
public boolean addForcedDurableConsumer(ConsumerId id) {
return forcedDurableConsumers.add(id);
}
public boolean removeForcedDurableConsumer(ConsumerId id) {
return forcedDurableConsumers.remove(id);
}
public int getForcedDurableConsumersSize() {
return forcedDurableConsumers.size();
}
public void waitForCompletion() { public void waitForCompletion() {
if (dispatched.get() > 0) { if (dispatched.get() > 0) {
LOG.debug("Waiting for completion for sub: {}, dispatched: {}", localInfo.getConsumerId(), this.dispatched.get()); LOG.debug("Waiting for completion for sub: {}, dispatched: {}", localInfo.getConsumerId(), this.dispatched.get());

View File

@ -17,7 +17,9 @@
package org.apache.activemq.network; package org.apache.activemq.network;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicRegion;
@ -26,6 +28,7 @@ import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.TypeConversionSupport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -92,13 +95,16 @@ public class DurableConduitBridge extends ConduitBridge {
@Override @Override
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
if (addToAlreadyInterestedConsumers(info)) { boolean isForcedDurable = isForcedDurable(info);
if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
return null; // don't want this subscription added return null; // don't want this subscription added
} }
//add our original id to ourselves //add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId()); info.addNetworkConsumerId(info.getConsumerId());
ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null;
if (info.isDurable()) { if(info.isDurable() || isForcedDurable) {
// set the subscriber name to something reproducible // set the subscriber name to something reproducible
info.setSubscriptionName(getSubscriberName(info.getDestination())); info.setSubscriptionName(getSubscriberName(info.getDestination()));
// and override the consumerId with something unique so that it won't // and override the consumerId with something unique so that it won't
@ -107,7 +113,46 @@ public class DurableConduitBridge extends ConduitBridge {
consumerIdGenerator.getNextSequenceId())); consumerIdGenerator.getNextSequenceId()));
} }
info.setSelector(null); info.setSelector(null);
return doCreateDemandSubscription(info); DemandSubscription demandSubscription = doCreateDemandSubscription(info);
if (forcedDurableId != null) {
demandSubscription.addForcedDurableConsumer(forcedDurableId);
forcedDurableRemoteId.add(forcedDurableId);
}
return demandSubscription;
}
private boolean isForcedDurable(ConsumerInfo info) {
if (info.isDurable()) {
return false;
}
ActiveMQDestination destination = info.getDestination();
if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() ||
destination.isQueue()) {
return false;
}
ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
if (matching != null) {
return isDestForcedDurable(matching);
}
matching = findMatchingDestination(staticallyIncludedDestinations, destination);
if (matching != null) {
return isDestForcedDurable(matching);
}
return false;
}
private boolean isDestForcedDurable(ActiveMQDestination destination) {
final Map<String, String> options = destination.getOptions();
boolean isForceDurable = false;
if (options != null) {
isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class);
}
return isForceDurable;
} }
protected String getSubscriberName(ActiveMQDestination dest) { protected String getSubscriberName(ActiveMQDestination dest) {

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.util; package org.apache.activemq.util;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -59,6 +61,10 @@ public class StringToListOfActiveMQDestinationConverter {
} }
public static String convertFromActiveMQDestination(Object value) { public static String convertFromActiveMQDestination(Object value) {
return convertFromActiveMQDestination(value, false);
}
public static String convertFromActiveMQDestination(Object value, boolean includeOptions) {
if (value == null) { if (value == null) {
return null; return null;
} }
@ -70,7 +76,17 @@ public class StringToListOfActiveMQDestinationConverter {
Object e = list.get(i); Object e = list.get(i);
if (e instanceof ActiveMQDestination) { if (e instanceof ActiveMQDestination) {
ActiveMQDestination destination = (ActiveMQDestination) e; ActiveMQDestination destination = (ActiveMQDestination) e;
sb.append(destination); if (includeOptions && destination.getOptions() != null) {
try {
//Reapply the options as URI parameters
sb.append(destination.toString() + URISupport.applyParameters(
new URI(""), destination.getOptions()));
} catch (URISyntaxException e1) {
sb.append(destination);
}
} else {
sb.append(destination);
}
if (i < list.size() - 1) { if (i < list.size() - 1) {
sb.append(", "); sb.append(", ");
} }

View File

@ -16,8 +16,6 @@
*/ */
package org.apache.activemq.network; package org.apache.activemq.network;
import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.net.URI; import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
@ -30,12 +28,10 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisoryBroker; import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition; import org.apache.activemq.util.Wait.Condition;
import org.junit.After; import org.junit.After;
@ -98,12 +94,12 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
sub1.close(); sub1.close();
assertSubscriptionsCount(broker1, topic, 1); assertSubscriptionsCount(broker1, topic, 1);
assertNCSubscriptionsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName); removeSubscription(broker1, topic, subName);
assertSubscriptionsCount(broker1, topic, 0); assertSubscriptionsCount(broker1, topic, 0);
assertNCSubscriptionsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
} }
@ -114,17 +110,17 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
sub1.close(); sub1.close();
assertSubscriptionsCount(broker1, topic, 1); assertSubscriptionsCount(broker1, topic, 1);
assertNCSubscriptionsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
restartBrokers(true); restartBrokers(true);
assertSubscriptionsCount(broker1, topic, 1); assertSubscriptionsCount(broker1, topic, 1);
assertNCSubscriptionsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName); removeSubscription(broker1, topic, subName);
assertSubscriptionsCount(broker1, topic, 0); assertSubscriptionsCount(broker1, topic, 0);
assertNCSubscriptionsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
} }
@ -135,7 +131,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
sub1.close(); sub1.close();
assertSubscriptionsCount(broker1, topic, 1); assertSubscriptionsCount(broker1, topic, 1);
assertNCSubscriptionsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
doTearDown(); doTearDown();
restartBroker(broker1, false); restartBroker(broker1, false);
@ -146,9 +142,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
//Test that on successful reconnection of the bridge that //Test that on successful reconnection of the bridge that
//the NC sub will be removed //the NC sub will be removed
restartBroker(broker2, true); restartBroker(broker2, true);
assertNCSubscriptionsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
restartBroker(broker1, true); restartBroker(broker1, true);
assertNCSubscriptionsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
} }
@ -160,7 +156,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
sub1.close(); sub1.close();
assertSubscriptionsCount(broker1, topic, 1); assertSubscriptionsCount(broker1, topic, 1);
assertNCSubscriptionsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
doTearDown(); doTearDown();
restartBroker(broker1, false); restartBroker(broker1, false);
@ -176,13 +172,13 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
//before sync, the old NC should exist //before sync, the old NC should exist
restartBroker(broker2, true); restartBroker(broker2, true);
assertNCSubscriptionsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
assertNCSubscriptionsCount(broker2, topic2, 0); assertNCDurableSubsCount(broker2, topic2, 0);
//After sync, remove old NC and create one for topic 2 //After sync, remove old NC and create one for topic 2
restartBroker(broker1, true); restartBroker(broker1, true);
assertNCSubscriptionsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
assertNCSubscriptionsCount(broker2, topic2, 1); assertNCDurableSubsCount(broker2, topic2, 1);
} }
@ -193,7 +189,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName); final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName);
assertSubscriptionsCount(broker1, topic, 0); assertSubscriptionsCount(broker1, topic, 0);
assertNCSubscriptionsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
doTearDown(); doTearDown();
restartBroker(broker1, false); restartBroker(broker1, false);
@ -207,9 +203,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic2, 1); assertSubscriptionsCount(broker1, topic2, 1);
restartBrokers(true); restartBrokers(true);
assertNCSubscriptionsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
assertNCSubscriptionsCount(broker2, topic2, 1); assertNCDurableSubsCount(broker2, topic2, 1);
assertNCSubscriptionsCount(broker2, excludeTopic, 0); assertNCDurableSubsCount(broker2, excludeTopic, 0);
} }
@ -223,7 +219,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
assertSubscriptionsCount(broker1, topic, 0); assertSubscriptionsCount(broker1, topic, 0);
assertNCSubscriptionsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
doTearDown(); doTearDown();
restartBroker(broker1, false); restartBroker(broker1, false);
@ -235,7 +231,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
//Since we are using an old version of openwire, the NC should //Since we are using an old version of openwire, the NC should
//not be added //not be added
restartBrokers(true); restartBrokers(true);
assertNCSubscriptionsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
} }
@ -246,7 +242,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
assertSubscriptionsCount(broker1, topic, 0); assertSubscriptionsCount(broker1, topic, 0);
assertNCSubscriptionsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
doTearDown(); doTearDown();
restartBroker(broker1, false); restartBroker(broker1, false);
@ -256,7 +252,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic, 1); assertSubscriptionsCount(broker1, topic, 1);
restartBrokers(true); restartBrokers(true);
assertNCSubscriptionsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
} }
@Test @Test
@ -266,7 +262,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
assertSubscriptionsCount(broker1, topic, 0); assertSubscriptionsCount(broker1, topic, 0);
assertNCSubscriptionsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
doTearDown(); doTearDown();
restartBroker(broker1, false); restartBroker(broker1, false);
@ -276,10 +272,10 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic, 1); assertSubscriptionsCount(broker1, topic, 1);
restartBrokers(true); restartBrokers(true);
assertNCSubscriptionsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
//bring online again //bring online again
session1.createDurableSubscriber(topic, subName); session1.createDurableSubscriber(topic, subName);
assertNCSubscriptionsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
} }
@ -290,7 +286,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
session1.createDurableSubscriber(topic, subName).close(); session1.createDurableSubscriber(topic, subName).close();
assertSubscriptionsCount(broker1, topic, 1); assertSubscriptionsCount(broker1, topic, 1);
assertNCSubscriptionsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
doTearDown(); doTearDown();
restartBroker(broker1, false); restartBroker(broker1, false);
@ -301,8 +297,8 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic, 1); assertSubscriptionsCount(broker1, topic, 1);
restartBrokers(true); restartBrokers(true);
assertNCSubscriptionsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
assertNCSubscriptionsCount(broker2, excludeTopic, 0); assertNCDurableSubsCount(broker2, excludeTopic, 0);
} }
@ -314,7 +310,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName); final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName);
assertSubscriptionsCount(broker1, topic, 0); assertSubscriptionsCount(broker1, topic, 0);
assertNCSubscriptionsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
doTearDown(); doTearDown();
restartBrokers(false); restartBrokers(false);
@ -342,34 +338,11 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
session1.createDurableSubscriber(excludeTopic, "sub-exclude"); session1.createDurableSubscriber(excludeTopic, "sub-exclude");
Thread.sleep(1000); Thread.sleep(1000);
assertNCSubscriptionsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
assertNCSubscriptionsCount(broker2, excludeTopic, 0); assertNCDurableSubsCount(broker2, excludeTopic, 0);
} }
protected void removeSubscription(final BrokerService brokerService, final ActiveMQTopic topic,
final String subName) throws Exception {
final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(clientId);
info.setSubscriptionName(subName);
final ConnectionContext context = new ConnectionContext();
context.setBroker(brokerService.getBroker());
context.setClientId(clientId);
brokerService.getBroker().removeSubscription(context, info);
}
protected void assertSubscriptionsCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return count == getSubscriptions(brokerService, dest).size();
}
}, 10000, 500));
}
protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception { protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception {
if (broker.getBrokerName().equals("localBroker")) { if (broker.getBrokerName().equals("localBroker")) {
restartLocalBroker(startNetworkConnector); restartLocalBroker(startNetworkConnector);
@ -387,7 +360,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, File localDataDir, protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, File localDataDir,
File remoteDataDir) throws Exception { File remoteDataDir) throws Exception {
included = new ActiveMQTopic(testTopicName); included = new ActiveMQTopic(testTopicName);
doSetUpRemoteBroker(deleteAllMessages, remoteDataDir); doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);
doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir); doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir);
//Give time for advisories to propagate //Give time for advisories to propagate
Thread.sleep(1000); Thread.sleep(1000);
@ -399,8 +372,13 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
} }
protected void restartRemoteBroker() throws Exception { protected void restartRemoteBroker() throws Exception {
int port = 0;
if (remoteBroker != null) {
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
port = transportConnectors.get(0).getConnectUri().getPort();
}
stopRemoteBroker(); stopRemoteBroker();
doSetUpRemoteBroker(false, remoteBroker.getDataDirectoryFile()); doSetUpRemoteBroker(false, remoteBroker.getDataDirectoryFile(), port);
} }
protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector,
@ -438,8 +416,8 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
} }
} }
protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir) throws Exception { protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception {
remoteBroker = createRemoteBroker(dataDir); remoteBroker = createRemoteBroker(dataDir, port);
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.start(); remoteBroker.start();
remoteBroker.waitUntilStarted(); remoteBroker.waitUntilStarted();
@ -494,7 +472,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
protected AdvisoryBroker remoteAdvisoryBroker; protected AdvisoryBroker remoteAdvisoryBroker;
protected BrokerService createRemoteBroker(File dataDir) throws Exception { protected BrokerService createRemoteBroker(File dataDir, int port) throws Exception {
BrokerService brokerService = new BrokerService(); BrokerService brokerService = new BrokerService();
brokerService.setBrokerName("remoteBroker"); brokerService.setBrokerName("remoteBroker");
brokerService.setUseJmx(false); brokerService.setUseJmx(false);
@ -502,7 +480,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
brokerService.addConnector("tcp://localhost:0?wireFormat.version=" + remoteBrokerWireFormatVersion); brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.version=" + remoteBrokerWireFormatVersion);
return brokerService; return brokerService;
} }

View File

@ -30,8 +30,11 @@ import javax.jms.Session;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
@ -96,24 +99,24 @@ public abstract class DynamicNetworkTestSupport {
} }
protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception { protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception {
Wait.waitFor(new Wait.Condition() { assertTrue(Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
//should only be 1 for the composite destination creation //should only be 1 for the composite destination creation
return count == destinationStatistics.getConsumers().getCount(); return count == destinationStatistics.getConsumers().getCount();
} }
}); }));
} }
protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception { protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception {
Wait.waitFor(new Wait.Condition() { assertTrue(Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return count == destinationStatistics.getDequeues().getCount() && return count == destinationStatistics.getDequeues().getCount() &&
count == destinationStatistics.getDispatched().getCount() && count == destinationStatistics.getDispatched().getCount() &&
count == destinationStatistics.getForwards().getCount(); count == destinationStatistics.getForwards().getCount();
} }
}); }));
} }
protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) { protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) {
@ -126,16 +129,32 @@ public abstract class DynamicNetworkTestSupport {
MessageConsumer createConsumer() throws JMSException; MessageConsumer createConsumer() throws JMSException;
} }
protected void assertNCSubscriptionsCount(final BrokerService brokerService, protected void assertNCDurableSubsCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception { final ActiveMQTopic dest, final int count) throws Exception {
assertTrue(Wait.waitFor(new Condition() { assertTrue(Wait.waitFor(new Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return count == getNCSubscriptions(brokerService, dest).size(); return count == getNCDurableSubs(brokerService, dest).size();
} }
}, 10000, 500)); }, 10000, 500));
} }
protected void assertConsumersCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return count == getConsumers(brokerService, dest).size();
}
}, 10000, 500));
}
protected List<Subscription> getConsumers(final BrokerService brokerService,
final ActiveMQTopic dest) throws Exception {
Topic destination = (Topic) brokerService.getDestination(dest);
return destination.getConsumers();
}
protected List<DurableTopicSubscription> getSubscriptions(final BrokerService brokerService, protected List<DurableTopicSubscription> getSubscriptions(final BrokerService brokerService,
final ActiveMQTopic dest) throws Exception { final ActiveMQTopic dest) throws Exception {
List<DurableTopicSubscription> subs = new ArrayList<>(); List<DurableTopicSubscription> subs = new ArrayList<>();
@ -151,10 +170,17 @@ public abstract class DynamicNetworkTestSupport {
return subs; return subs;
} }
protected List<DurableTopicSubscription> getNCSubscriptions(final BrokerService brokerService, protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService,
final ActiveMQTopic dest) throws Exception { final ActiveMQTopic dest) throws Exception {
List<DurableTopicSubscription> subs = new ArrayList<>(); List<DurableTopicSubscription> subs = new ArrayList<>();
Topic destination = (Topic) brokerService.getDestination(dest); Destination d = brokerService.getDestination(dest);
Topic destination = null;
if (d instanceof DestinationFilter){
destination = ((DestinationFilter) d).getAdaptor(Topic.class);
} else {
destination = (Topic) d;
}
for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) { for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) { if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key); DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
@ -166,4 +192,28 @@ public abstract class DynamicNetworkTestSupport {
return subs; return subs;
} }
protected void removeSubscription(final BrokerService brokerService, final ActiveMQTopic topic,
final String subName) throws Exception {
final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(clientId);
info.setSubscriptionName(subName);
final ConnectionContext context = new ConnectionContext();
context.setBroker(brokerService.getBroker());
context.setClientId(clientId);
brokerService.getBroker().removeSubscription(context, info);
}
protected void assertSubscriptionsCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return count == getSubscriptions(brokerService, dest).size();
}
}, 10000, 500));
}
} }

View File

@ -0,0 +1,351 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.io.File;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
@RunWith(Parameterized.class)
public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(ForceDurableNetworkBridgeTest.class);
protected String testTopicName2 = "include.nonforced.bar";
protected String staticTopic = "include.static.bar";
protected String staticTopic2 = "include.static.nonforced.bar";
public static enum FLOW {FORWARD, REVERSE};
private BrokerService broker1;
private BrokerService broker2;
private Session session1;
private final FLOW flow;
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{FLOW.FORWARD},
{FLOW.REVERSE}
});
}
public ForceDurableNetworkBridgeTest(final FLOW flow) {
this.flow = flow;
}
@Before
public void setUp() throws Exception {
doSetUp(true, tempFolder.newFolder(), tempFolder.newFolder());
}
@After
public void tearDown() throws Exception {
doTearDown();
}
@Test
public void testForceDurableSubscriptionStatic() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(staticTopic);
assertNCDurableSubsCount(broker2, topic, 1);
assertConsumersCount(broker2, topic, 1);
//Static so consumers stick around
assertNCDurableSubsCount(broker2, topic, 1);
assertConsumersCount(broker2, topic, 1);
}
@Test
public void testConsumerNotForceDurableSubscriptionStatic() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(staticTopic2);
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 0);
}
@Test
public void testConsumerNotForceDurableSubscription() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName2);
MessageConsumer sub1 = session1.createConsumer(topic);
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 0);
sub1.close();
assertNCDurableSubsCount(broker2, topic, 0);
assertConsumersCount(broker2, topic, 0);
}
@Test
public void testConsumerNotForceDurableWithAnotherDurable() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName2);
TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
session1.createConsumer(topic);
//1 consumer because of conduit
//1 durable sub
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
//Remove the sub
durSub.close();
Thread.sleep(1000);
removeSubscription(broker1, topic, subName);
//The durable should be gone even though there is a consumer left
//since we are not forcing durable subs
assertNCDurableSubsCount(broker2, topic, 0);
//consumers count ends up being 0 here, even though there is a non-durable consumer left,
//because the durable sub is destroyed and it is a conduit subscription
//this is another good reason to want to enable forcing of durables
assertConsumersCount(broker2, topic, 0);
}
@Test
public void testForceDurableSubscription() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
MessageConsumer sub1 = session1.createConsumer(topic);
assertNCDurableSubsCount(broker2, topic, 1);
assertConsumersCount(broker2, topic, 1);
sub1.close();
assertNCDurableSubsCount(broker2, topic, 0);
assertConsumersCount(broker2, topic, 0);
}
@Test
public void testForceDurableMultiSubscriptions() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
MessageConsumer sub1 = session1.createConsumer(topic);
MessageConsumer sub2 = session1.createConsumer(topic);
MessageConsumer sub3 = session1.createConsumer(topic);
assertNCDurableSubsCount(broker2, topic, 1);
assertConsumersCount(broker2, topic, 1);
sub1.close();
sub2.close();
assertNCDurableSubsCount(broker2, topic, 1);
assertConsumersCount(broker2, topic, 1);
sub3.close();
assertNCDurableSubsCount(broker2, topic, 0);
assertConsumersCount(broker2, topic, 0);
}
@Test
public void testForceDurableSubWithDurableCreatedFirst() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
durSub.close();
assertNCDurableSubsCount(broker2, topic, 1);
MessageConsumer sub1 = session1.createConsumer(topic);
Thread.sleep(1000);
assertNCDurableSubsCount(broker2, topic, 1);
sub1.close();
Thread.sleep(1000);
assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
assertNCDurableSubsCount(broker2, topic, 0);
}
@Test
public void testForceDurableSubWithNonDurableCreatedFirst() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
MessageConsumer sub1 = session1.createConsumer(topic);
assertNCDurableSubsCount(broker2, topic, 1);
TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
durSub.close();
Thread.sleep(1000);
assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
Thread.sleep(1000);
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
sub1.close();
assertNCDurableSubsCount(broker2, topic, 0);
}
@Test
public void testDurableSticksAroundOnConsumerClose() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
//Create the normal consumer first
MessageConsumer sub1 = session1.createConsumer(topic);
assertNCDurableSubsCount(broker2, topic, 1);
TopicSubscriber durSub = session1.createDurableSubscriber(topic, subName);
durSub.close();
sub1.close();
Thread.sleep(1000);
//Both consumer and durable are closed but the durable should stick around
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
assertConsumersCount(broker2, topic, 0);
assertNCDurableSubsCount(broker2, topic, 0);
}
protected void restartBrokers() throws Exception {
doTearDown();
doSetUp(false, localBroker.getDataDirectoryFile(), remoteBroker.getDataDirectoryFile());
}
protected void doSetUp(boolean deleteAllMessages, File localDataDir,
File remoteDataDir) throws Exception {
included = new ActiveMQTopic(testTopicName);
doSetUpRemoteBroker(deleteAllMessages, remoteDataDir);
doSetUpLocalBroker(deleteAllMessages, localDataDir);
//Give time for advisories to propagate
Thread.sleep(1000);
}
protected void doSetUpLocalBroker(boolean deleteAllMessages, File dataDir) throws Exception {
localBroker = createLocalBroker(dataDir);
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.start();
localBroker.waitUntilStarted();
URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
fac.setAlwaysSyncSend(true);
fac.setDispatchAsync(false);
localConnection = fac.createConnection();
localConnection.setClientID("clientId");
localConnection.start();
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
}
}, 10000, 500);
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (flow.equals(FLOW.FORWARD)) {
broker1 = localBroker;
session1 = localSession;
} else {
broker2 = localBroker;
}
}
protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir) throws Exception {
remoteBroker = createRemoteBroker(dataDir);
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.start();
remoteBroker.waitUntilStarted();
URI remoteURI = remoteBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(remoteURI);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("clientId");
remoteConnection.start();
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (flow.equals(FLOW.FORWARD)) {
broker2 = remoteBroker;
} else {
broker1 = remoteBroker;
session1 = remoteSession;
}
}
protected BrokerService createLocalBroker(File dataDir) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setMonitorConnectionSplits(true);
brokerService.setDataDirectoryFile(dataDir);
brokerService.setBrokerName("localBroker");
brokerService.addNetworkConnector(configureLocalNetworkConnector());
brokerService.addConnector("tcp://localhost:0");
brokerService.setDestinations(new ActiveMQDestination[] {
new ActiveMQTopic(testTopicName),
new ActiveMQTopic(testTopicName2),
new ActiveMQTopic(excludeTopicName)});
return brokerService;
}
protected NetworkConnector configureLocalNetworkConnector() throws Exception {
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI = transportConnectors.get(0).getConnectUri();
String uri = "static:(" + remoteURI + ")";
NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
connector.setName("networkConnector");
connector.setDynamicOnly(false);
connector.setDecreaseNetworkConsumerPriority(false);
connector.setConduitSubscriptions(true);
connector.setDuplex(true);
connector.setStaticBridge(false);
connector.setStaticallyIncludedDestinations(Lists.<ActiveMQDestination>newArrayList(
new ActiveMQTopic(staticTopic + "?forceDurable=true"),
new ActiveMQTopic(staticTopic2)));
connector.setDynamicallyIncludedDestinations(
Lists.<ActiveMQDestination>newArrayList(
new ActiveMQTopic("include.test.>?forceDurable=true"),
new ActiveMQTopic(testTopicName2)));
connector.setExcludedDestinations(
Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(excludeTopicName)));
return connector;
}
protected BrokerService createRemoteBroker(File dataDir) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setBrokerName("remoteBroker");
brokerService.setUseJmx(false);
brokerService.setDataDirectoryFile(dataDir);
brokerService.addConnector("tcp://localhost:0");
brokerService.setDestinations(new ActiveMQDestination[] {
new ActiveMQTopic(testTopicName),
new ActiveMQTopic(testTopicName2),
new ActiveMQTopic(excludeTopicName)});
return brokerService;
}
}

View File

@ -25,6 +25,7 @@ import java.lang.reflect.Field;
import java.net.URI; import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -39,6 +40,7 @@ import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.virtual.CompositeQueue; import org.apache.activemq.broker.region.virtual.CompositeQueue;
@ -240,9 +242,18 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
*/ */
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)
public void testDynamicFlow() throws Exception { public void testDynamicFlow() throws Exception {
testDynamicFlow(false);
}
@Test(timeout = 60 * 1000)
public void testDynamicFlowForceDurable() throws Exception {
testDynamicFlow(true);
}
protected void testDynamicFlow(boolean forceDurable) throws Exception {
Assume.assumeTrue(isUseVirtualDestSubsOnCreation); Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
doSetUp(true, null); doSetUp(true, null, true, forceDurable);
MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
@ -262,6 +273,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
waitForConsumerCount(destinationStatistics, 1); waitForConsumerCount(destinationStatistics, 1);
assertNCDurableSubsCount(localBroker, included, forceDurable ? 1 : 0);
includedProducer.send(test); includedProducer.send(test);
waitForDispatchFromLocalBroker(destinationStatistics, 1); waitForDispatchFromLocalBroker(destinationStatistics, 1);
@ -272,7 +284,6 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
assertAdvisoryBrokerCounts(1,1,1); assertAdvisoryBrokerCounts(1,1,1);
} }
/** /**
* Test that dynamic flow works for virtual destinations when a second composite * Test that dynamic flow works for virtual destinations when a second composite
* topic is included that forwards to the same queue, but is excluded from * topic is included that forwards to the same queue, but is excluded from
@ -1006,7 +1017,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
CompositeTopic compositeTopic = createCompositeTopic(testTopicName, CompositeTopic compositeTopic = createCompositeTopic(testTopicName,
new ActiveMQQueue("include.test.bar.bridge")); new ActiveMQQueue("include.test.bar.bridge"));
doSetUp(true, new VirtualDestination[] {compositeTopic}, false); doSetUp(true, new VirtualDestination[] {compositeTopic}, false, false);
MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
@ -1034,7 +1045,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
CompositeTopic compositeTopic = createCompositeTopic(testTopicName, CompositeTopic compositeTopic = createCompositeTopic(testTopicName,
new ActiveMQQueue("include.test.bar.bridge")); new ActiveMQQueue("include.test.bar.bridge"));
doSetUp(true, new VirtualDestination[] {compositeTopic}, false); doSetUp(true, new VirtualDestination[] {compositeTopic}, false, false);
MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
@ -1291,16 +1302,16 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
protected void doSetUp(boolean deleteAllMessages, protected void doSetUp(boolean deleteAllMessages,
VirtualDestination[] remoteVirtualDests) throws Exception { VirtualDestination[] remoteVirtualDests) throws Exception {
doSetUp(deleteAllMessages, remoteVirtualDests, true); doSetUp(deleteAllMessages, remoteVirtualDests, true, false);
} }
protected void doSetUp(boolean deleteAllMessages, protected void doSetUp(boolean deleteAllMessages,
VirtualDestination[] remoteVirtualDests, boolean startNetworkConnector) throws Exception { VirtualDestination[] remoteVirtualDests, boolean startNetworkConnector, boolean forceDurable) throws Exception {
remoteBroker = createRemoteBroker(isUseVirtualDestSubsOnCreation, remoteVirtualDests); remoteBroker = createRemoteBroker(isUseVirtualDestSubsOnCreation, remoteVirtualDests);
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.start(); remoteBroker.start();
remoteBroker.waitUntilStarted(); remoteBroker.waitUntilStarted();
localBroker = createLocalBroker(startNetworkConnector); localBroker = createLocalBroker(startNetworkConnector, forceDurable);
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.start(); localBroker.start();
localBroker.waitUntilStarted(); localBroker.waitUntilStarted();
@ -1324,13 +1335,16 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
protected NetworkConnector connector; protected NetworkConnector connector;
protected BrokerService createLocalBroker(boolean startNetworkConnector) throws Exception { protected BrokerService createLocalBroker(boolean startNetworkConnector, boolean forceDurable) throws Exception {
BrokerService brokerService = new BrokerService(); BrokerService brokerService = new BrokerService();
brokerService.setMonitorConnectionSplits(true); brokerService.setMonitorConnectionSplits(true);
brokerService.setDataDirectoryFile(tempFolder.newFolder()); brokerService.setDataDirectoryFile(tempFolder.newFolder());
brokerService.setBrokerName("localBroker"); brokerService.setBrokerName("localBroker");
connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61617)")); List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI = transportConnectors.get(0).getConnectUri();
String uri = "static:(" + remoteURI + ")";
connector = new DiscoveryNetworkConnector(new URI(uri));
connector.setName("networkConnector"); connector.setName("networkConnector");
connector.setDynamicOnly(false); connector.setDynamicOnly(false);
connector.setDecreaseNetworkConsumerPriority(false); connector.setDecreaseNetworkConsumerPriority(false);
@ -1338,7 +1352,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
connector.setDuplex(isDuplex); connector.setDuplex(isDuplex);
connector.setUseVirtualDestSubs(true); connector.setUseVirtualDestSubs(true);
connector.setDynamicallyIncludedDestinations(Lists.newArrayList(new ActiveMQQueue(testQueueName), connector.setDynamicallyIncludedDestinations(Lists.newArrayList(new ActiveMQQueue(testQueueName),
new ActiveMQTopic(testTopicName), new ActiveMQTopic("VirtualTopic.>"))); new ActiveMQTopic(testTopicName + (forceDurable ? "?forceDurable=true" : "")), new ActiveMQTopic("VirtualTopic.>")));
connector.setExcludedDestinations(Lists.newArrayList(new ActiveMQQueue("exclude.test.foo"), connector.setExcludedDestinations(Lists.newArrayList(new ActiveMQQueue("exclude.test.foo"),
new ActiveMQTopic("exclude.test.bar"))); new ActiveMQTopic("exclude.test.bar")));
@ -1346,7 +1360,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
brokerService.addNetworkConnector(connector); brokerService.addNetworkConnector(connector);
} }
brokerService.addConnector("tcp://localhost:61616"); brokerService.addConnector("tcp://localhost:0");
return brokerService; return brokerService;
} }
@ -1374,7 +1388,7 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
remoteAdvisoryBroker = (AdvisoryBroker) remoteAdvisoryBroker = (AdvisoryBroker)
brokerService.getBroker().getAdaptor(AdvisoryBroker.class); brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
brokerService.addConnector("tcp://localhost:61617"); brokerService.addConnector("tcp://localhost:0");
return brokerService; return brokerService;
} }