Durable sync over a network bridge will now also sync non-durable
subscriptions proplrly if the consumer belongs to a destination that is
configured to force network durable subscriptions.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-10-20 13:46:27 -04:00
parent 52ab6ba09b
commit d206621a73
6 changed files with 344 additions and 93 deletions

View File

@ -893,6 +893,10 @@ public class AdvisoryBroker extends BrokerFilter {
return destinations;
}
public ConcurrentMap<ConsumerInfo, VirtualDestination> getVirtualDestinationConsumers() {
return virtualDestinationConsumers;
}
private class VirtualConsumerPair {
private final VirtualDestination virtualDestination;

View File

@ -20,14 +20,15 @@ import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -39,10 +40,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.XAResource;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
@ -103,7 +106,7 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
import org.apache.activemq.util.NetworkBridgeUtils;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -1378,23 +1381,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.pendingStop = pendingStop;
}
public static BrokerSubscriptionInfo getBrokerSubscriptionInfo(final BrokerService brokerService) {
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
List<ConsumerInfo> subscriptionInfos = new ArrayList<>();
for (SubscriptionKey key : topicRegion.getDurableSubscriptions().keySet()) {
DurableTopicSubscription sub = topicRegion.getDurableSubscriptions().get(key);
if (sub != null) {
ConsumerInfo ci = sub.getConsumerInfo().copy();
ci.setClientId(key.getClientId());
subscriptionInfos.add(ci);
}
}
BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo(brokerService.getBrokerName());
bsi.setSubscriptionInfos(subscriptionInfos.toArray(new ConsumerInfo[0]));
return bsi;
}
private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {
Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
Map<String, String> props = createMap(properties);
@ -1412,7 +1398,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService()));
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
}
} catch (Exception e) {
LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
@ -1425,9 +1411,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
config.setBrokerName(broker.getBrokerName());
if (config.isSyncDurableSubs() && protocolVersion.get() >= 12) {
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService()));
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
}
// check for existing duplex connection hanging about

View File

@ -99,6 +99,7 @@ import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.NetworkBridgeUtils;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
@ -575,7 +576,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
remoteBroker.oneway(brokerInfo);
if (configuration.isSyncDurableSubs() &&
remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
remoteBroker.oneway(TransportConnection.getBrokerSubscriptionInfo(brokerService));
remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
configuration));
}
}
if (remoteConnectionInfo != null) {
@ -656,8 +658,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (started.get()) {
if (subInfo.getSubscriptionInfos() != null) {
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
matchesDynamicallyIncludedDestinations(info.getDestination())) {
//re-add any process any non-NC consumers that match the
//dynamicallyIncludedDestinations list
if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
serviceRemoteConsumerAdvisory(info);
}
}
@ -666,7 +670,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
//After re-added, clean up any empty durables
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
if (matchesDynamicallyIncludedDestinations(ds.getLocalInfo().getDestination())) {
if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
cleanupDurableSub(ds, i);
}
}
@ -907,7 +911,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
Iterator<DemandSubscription> i) throws IOException {
if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
&& ds.getForcedDurableConsumersSize() == 0) {
// deactivate subscriber
RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
localBroker.oneway(removeInfo);
@ -1245,33 +1248,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return true;
}
private boolean matchesDynamicallyIncludedDestinations(ActiveMQDestination destination) {
ActiveMQDestination[] dests = dynamicallyIncludedDestinations;
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return true;
}
}
}
return false;
}
protected ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) {
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return dest;
}
}
}
return null;
}
/**
* Subscriptions for these destinations are always created
*/

View File

@ -17,9 +17,7 @@
package org.apache.activemq.network;
import java.io.IOException;
import java.util.Map;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
@ -29,7 +27,7 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.TypeConversionSupport;
import org.apache.activemq.util.NetworkBridgeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -120,7 +118,8 @@ public class DurableConduitBridge extends ConduitBridge {
@Override
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
boolean isForcedDurable = isForcedDurable(info);
boolean isForcedDurable = NetworkBridgeUtils.isForcedDurable(info,
dynamicallyIncludedDestinations, staticallyIncludedDestinations);
if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
return null; // don't want this subscription added
@ -146,40 +145,6 @@ public class DurableConduitBridge extends ConduitBridge {
return demandSubscription;
}
private boolean isForcedDurable(ConsumerInfo info) {
if (info.isDurable()) {
return false;
}
ActiveMQDestination destination = info.getDestination();
if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() ||
destination.isQueue()) {
return false;
}
ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
if (matching != null) {
return isDestForcedDurable(matching);
}
matching = findMatchingDestination(staticallyIncludedDestinations, destination);
if (matching != null) {
return isDestForcedDurable(matching);
}
return false;
}
private boolean isDestForcedDurable(ActiveMQDestination destination) {
final Map<String, String> options = destination.getOptions();
boolean isForceDurable = false;
if (options != null) {
isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class);
}
return isForceDurable;
}
protected String getSubscriberName(ActiveMQDestination dest) {
String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
return subscriberName;

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerSubscriptionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NetworkBridgeUtils {
private static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeUtils.class);
/**
* Generate the BrokerSubscriptionInfo which is used to tell the broker on the other
* side of the network bridge which NC durable subscriptions are still needed for demand.
* @param brokerService
* @param config
* @return
*/
public static BrokerSubscriptionInfo getBrokerSubscriptionInfo(final BrokerService brokerService,
final NetworkBridgeConfiguration config) {
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
Set<ConsumerInfo> subscriptionInfos = new HashSet<>();
//Add all durable subscriptions to the set that match the network config
//which currently is just the dynamicallyIncludedDestinations list
for (SubscriptionKey key : topicRegion.getDurableSubscriptions().keySet()) {
DurableTopicSubscription sub = topicRegion.getDurableSubscriptions().get(key);
if (sub != null && NetworkBridgeUtils.matchesNetworkConfig(config, sub.getConsumerInfo().getDestination())) {
ConsumerInfo ci = sub.getConsumerInfo().copy();
ci.setClientId(key.getClientId());
subscriptionInfos.add(ci);
}
}
//We also need to iterate over all normal subscriptions and check if they are part of
//any dynamicallyIncludedDestination that is configured with forceDurable to be true
//over the network bridge. If forceDurable is true then we want to add the consumer to the set
for (Subscription sub : topicRegion.getSubscriptions().values()) {
if (sub != null && NetworkBridgeUtils.isForcedDurable(sub.getConsumerInfo(),
config.getDynamicallyIncludedDestinations())) {
subscriptionInfos.add(sub.getConsumerInfo().copy());
}
}
try {
//Lastly, if isUseVirtualDestSubs is configured on this broker (to fire advisories) and
//configured on the network connector (to listen to advisories) then also add any virtual
//dest subscription to the set if forceDurable is true for its destination
AdvisoryBroker ab = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
if (ab != null && brokerService.isUseVirtualDestSubs() && config.isUseVirtualDestSubs()) {
for (ConsumerInfo info : ab.getVirtualDestinationConsumers().keySet()) {
if (NetworkBridgeUtils.isForcedDurable(info, config.getDynamicallyIncludedDestinations())) {
subscriptionInfos.add(info.copy());
}
}
}
} catch (Exception e) {
LOG.warn("Error processing virtualDestinationSubs for BrokerSubscriptionInfo");
LOG.debug("Error processing virtualDestinationSubs for BrokerSubscriptionInfo", e);
}
BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo(brokerService.getBrokerName());
bsi.setSubscriptionInfos(subscriptionInfos.toArray(new ConsumerInfo[0]));
return bsi;
}
public static boolean isForcedDurable(final ConsumerInfo info,
final List<ActiveMQDestination> dynamicallyIncludedDestinations) {
return dynamicallyIncludedDestinations != null
? isForcedDurable(info,
dynamicallyIncludedDestinations.toArray(new ActiveMQDestination[0]), null) : false;
}
public static boolean isForcedDurable(final ConsumerInfo info,
final ActiveMQDestination[] dynamicallyIncludedDestinations,
final ActiveMQDestination[] staticallyIncludedDestinations) {
if (info.isDurable() || info.getDestination().isQueue()) {
return false;
}
ActiveMQDestination destination = info.getDestination();
if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() ||
destination.isQueue()) {
return false;
}
ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
if (matching != null) {
return isDestForcedDurable(matching);
}
matching = findMatchingDestination(staticallyIncludedDestinations, destination);
if (matching != null) {
return isDestForcedDurable(matching);
}
return false;
}
public static boolean matchesNetworkConfig(final NetworkBridgeConfiguration config,
ActiveMQDestination destination) {
List<ActiveMQDestination> includedDests = config.getDynamicallyIncludedDestinations();
if (includedDests != null && includedDests.size() > 0) {
for (ActiveMQDestination dest : includedDests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return true;
}
}
}
return false;
}
public static boolean matchesDestinations(ActiveMQDestination[] dests, final ActiveMQDestination destination) {
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return true;
}
}
}
return false;
}
public static ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) {
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return dest;
}
}
}
return null;
}
public static boolean isDestForcedDurable(final ActiveMQDestination destination) {
boolean isForceDurable = false;
if (destination != null) {
final Map<String, String> options = destination.getOptions();
if (options != null) {
isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class);
}
}
return isForceDurable;
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.network;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.net.URI;
import java.util.Arrays;
@ -23,17 +26,25 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.util.Wait;
@ -57,11 +68,13 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class);
protected JavaRuntimeConfigurationBroker remoteRuntimeBroker;
protected String staticIncludeTopics = "include.static.test";
protected String includedTopics = "include.test.>";
protected String testTopicName2 = "include.test.bar2";
private boolean dynamicOnly = false;
private boolean forceDurable = false;
private boolean useVirtualDestSubs = false;
private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
public static enum FLOW {FORWARD, REVERSE};
@ -107,6 +120,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
staticIncludeTopics = "include.static.test";
dynamicOnly = false;
forceDurable = false;
useVirtualDestSubs = false;
remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder());
}
@ -521,6 +535,116 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
}
@Test(timeout = 60 * 1000)
public void testVirtualDestSubForceDurableSync() throws Exception {
Assume.assumeTrue(flow == FLOW.FORWARD);
forceDurable = true;
useVirtualDestSubs = true;
this.restartBrokers(true);
//configure a virtual destination that forwards messages from topic testQueueName
CompositeTopic compositeTopic = createCompositeTopic(testTopicName,
new ActiveMQQueue("include.test.bar.bridge"));
remoteRuntimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true);
MessageProducer includedProducer = localSession.createProducer(included);
Message test = localSession.createTextMessage("test");
final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics();
final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination(
new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
//Make sure that the NC durable is created because of the compositeTopic
waitForConsumerCount(destinationStatistics, 1);
assertNCDurableSubsCount(localBroker, included, 1);
//Send message and make sure it is dispatched across the bridge
includedProducer.send(test);
waitForDispatchFromLocalBroker(destinationStatistics, 1);
assertLocalBrokerStatistics(destinationStatistics, 1);
assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount());
//Stop the remote broker so the bridge stops and then send 500 messages so
//the messages build up on the NC durable
this.stopRemoteBroker();
for (int i = 0; i < 500; i++) {
includedProducer.send(test);
}
this.stopLocalBroker();
//Restart the brokers
this.restartRemoteBroker();
remoteRuntimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true);
this.restartLocalBroker(true);
//We now need to verify that 501 messages made it to the queue on the remote side
//which means that the NC durable was not deleted and recreated during the sync
final DestinationStatistics remoteDestStatistics2 = remoteBroker.getDestination(
new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return remoteDestStatistics2.getMessages().getCount() == 501;
}
}));
}
@Test(timeout = 60 * 1000)
public void testForceDurableTopicSubSync() throws Exception {
Assume.assumeTrue(flow == FLOW.FORWARD);
forceDurable = true;
this.restartBrokers(true);
//configure a virtual destination that forwards messages from topic testQueueName
remoteSession.createConsumer(included);
MessageProducer includedProducer = localSession.createProducer(included);
Message test = localSession.createTextMessage("test");
final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics();
//Make sure that the NC durable is created because of the compositeTopic
waitForConsumerCount(destinationStatistics, 1);
assertNCDurableSubsCount(localBroker, included, 1);
//Send message and make sure it is dispatched across the bridge
includedProducer.send(test);
waitForDispatchFromLocalBroker(destinationStatistics, 1);
assertLocalBrokerStatistics(destinationStatistics, 1);
//Stop the network connector and send messages to the local broker so they build
//up on the durable
this.localBroker.getNetworkConnectorByName("networkConnector").stop();
for (int i = 0; i < 500; i++) {
includedProducer.send(test);
}
//restart the local broker and bridge
this.stopLocalBroker();
this.restartLocalBroker(true);
//We now need to verify that the 500 messages on the NC durable are dispatched
//on bridge sync which shows that the durable wasn't destroyed/recreated
final DestinationStatistics destinationStatistics2 =
localBroker.getDestination(included).getDestinationStatistics();
waitForDispatchFromLocalBroker(destinationStatistics2, 500);
assertLocalBrokerStatistics(destinationStatistics2, 500);
}
protected CompositeTopic createCompositeTopic(String name, ActiveMQDestination...forwardTo) {
CompositeTopic compositeTopic = new CompositeTopic();
compositeTopic.setName(name);
compositeTopic.setForwardOnly(true);
compositeTopic.setForwardTo( Lists.newArrayList(forwardTo));
return compositeTopic;
}
protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception {
if (broker.getBrokerName().equals("localBroker")) {
restartLocalBroker(startNetworkConnector);
@ -607,12 +731,15 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
if (flow.equals(FLOW.FORWARD)) {
broker2 = remoteBroker;
session2 = remoteSession;
remoteRuntimeBroker = (JavaRuntimeConfigurationBroker)
remoteBroker.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
} else {
broker1 = remoteBroker;
session1 = remoteSession;
}
}
protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setMonitorConnectionSplits(true);
@ -622,6 +749,8 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
adapter.setDirectory(dataDir);
adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
brokerService.setPersistenceAdapter(adapter);
brokerService.setUseVirtualDestSubs(useVirtualDestSubs);
brokerService.setUseVirtualDestSubsOnCreation(useVirtualDestSubs);
if (startNetworkConnector) {
brokerService.addNetworkConnector(configureLocalNetworkConnector());
@ -645,10 +774,11 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
connector.setDuplex(true);
connector.setStaticBridge(false);
connector.setSyncDurableSubs(true);
connector.setUseVirtualDestSubs(useVirtualDestSubs);
connector.setStaticallyIncludedDestinations(
Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(staticIncludeTopics + "?forceDurable=" + forceDurable)));
connector.setDynamicallyIncludedDestinations(
Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(includedTopics)));
Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(includedTopics + "?forceDurable=" + forceDurable)));
connector.setExcludedDestinations(
Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(excludeTopicName)));
return connector;
@ -665,6 +795,12 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
adapter.setDirectory(dataDir);
adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
brokerService.setPersistenceAdapter(adapter);
brokerService.setUseVirtualDestSubs(useVirtualDestSubs);
brokerService.setUseVirtualDestSubsOnCreation(useVirtualDestSubs);
if (useVirtualDestSubs) {
brokerService.setPlugins(new BrokerPlugin[] {new JavaRuntimeConfigurationPlugin()});
}
remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);