mirror of https://github.com/apache/activemq.git
Fixing an issue with syncDurableSubs that cause a bridge failure when adding multiple bridges between the same brokers
This commit is contained in:
parent
27238b2dd7
commit
25703fbd1f
|
@ -18,6 +18,7 @@ package org.apache.activemq.network;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.region.DurableTopicSubscription;
|
||||||
import org.apache.activemq.broker.region.RegionBroker;
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
import org.apache.activemq.broker.region.Subscription;
|
import org.apache.activemq.broker.region.Subscription;
|
||||||
import org.apache.activemq.broker.region.TopicRegion;
|
import org.apache.activemq.broker.region.TopicRegion;
|
||||||
|
@ -95,14 +96,19 @@ public class DurableConduitBridge extends ConduitBridge {
|
||||||
String candidateSubName = getSubscriberName(dest);
|
String candidateSubName = getSubscriberName(dest);
|
||||||
for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
|
for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
|
||||||
String subName = subscription.getConsumerInfo().getSubscriptionName();
|
String subName = subscription.getConsumerInfo().getSubscriptionName();
|
||||||
if (subName != null && subName.equals(candidateSubName)) {
|
if (subName != null && subName.equals(candidateSubName) &&
|
||||||
|
subscription instanceof DurableTopicSubscription) {
|
||||||
try {
|
try {
|
||||||
// remove the NC subscription as it is no longer for a permissable dest
|
DurableTopicSubscription durableSub = (DurableTopicSubscription) subscription;
|
||||||
|
//check the clientId so we only remove subs for the matching bridge
|
||||||
|
if (durableSub.getSubscriptionKey().getClientId().equals(localClientId)) {
|
||||||
|
// remove the NC subscription as it is no longer for a permissible dest
|
||||||
RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
|
RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
|
||||||
sending.setClientId(localClientId);
|
sending.setClientId(localClientId);
|
||||||
sending.setSubscriptionName(subName);
|
sending.setSubscriptionName(subName);
|
||||||
sending.setConnectionId(this.localConnectionInfo.getConnectionId());
|
sending.setConnectionId(this.localConnectionInfo.getConnectionId());
|
||||||
localBroker.oneway(sending);
|
localBroker.oneway(sending);
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.debug("Exception removing NC durable subscription: {}", subName, e);
|
LOG.debug("Exception removing NC durable subscription: {}", subName, e);
|
||||||
serviceRemoteException(e);
|
serviceRemoteException(e);
|
||||||
|
|
|
@ -76,7 +76,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
|
||||||
private boolean forceDurable = false;
|
private boolean forceDurable = false;
|
||||||
private boolean useVirtualDestSubs = false;
|
private boolean useVirtualDestSubs = false;
|
||||||
private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
|
private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
|
||||||
public static enum FLOW {FORWARD, REVERSE};
|
public static enum FLOW {FORWARD, REVERSE}
|
||||||
|
|
||||||
private BrokerService broker1;
|
private BrokerService broker1;
|
||||||
private BrokerService broker2;
|
private BrokerService broker2;
|
||||||
|
@ -535,6 +535,59 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Test that durable sync works with more than one bridge
|
||||||
|
@Test
|
||||||
|
public void testAddOnlineSubscriptionsTwoBridges() throws Exception {
|
||||||
|
|
||||||
|
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
|
||||||
|
final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName);
|
||||||
|
final ActiveMQTopic topic2 = new ActiveMQTopic("include.new.topic");
|
||||||
|
|
||||||
|
assertSubscriptionsCount(broker1, topic, 0);
|
||||||
|
assertNCDurableSubsCount(broker2, topic, 0);
|
||||||
|
|
||||||
|
//create durable that shouldn't be propagated
|
||||||
|
session1.createDurableSubscriber(excludeTopic, "sub-exclude");
|
||||||
|
|
||||||
|
//Add 3 online subs
|
||||||
|
session1.createDurableSubscriber(topic, subName);
|
||||||
|
session1.createDurableSubscriber(topic, "sub2");
|
||||||
|
session1.createDurableSubscriber(topic, "sub3");
|
||||||
|
//Add sub on second topic/bridge
|
||||||
|
session1.createDurableSubscriber(topic2, "secondTopicSubName");
|
||||||
|
assertSubscriptionsCount(broker1, topic, 3);
|
||||||
|
assertSubscriptionsCount(broker1, topic2, 1);
|
||||||
|
|
||||||
|
//Add the second network connector
|
||||||
|
NetworkConnector secondConnector = configureLocalNetworkConnector();
|
||||||
|
secondConnector.setName("networkConnector2");
|
||||||
|
secondConnector.setDynamicallyIncludedDestinations(
|
||||||
|
Lists.<ActiveMQDestination>newArrayList(
|
||||||
|
new ActiveMQTopic("include.new.topic?forceDurable=" + forceDurable)));
|
||||||
|
localBroker.addNetworkConnector(secondConnector);
|
||||||
|
secondConnector.start();
|
||||||
|
|
||||||
|
//Make sure both bridges are connected
|
||||||
|
assertTrue(Wait.waitFor(new Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
|
||||||
|
localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1;
|
||||||
|
}
|
||||||
|
}, 10000, 500));
|
||||||
|
|
||||||
|
//Make sure NC durables exist for both bridges
|
||||||
|
assertNCDurableSubsCount(broker2, topic2, 1);
|
||||||
|
assertNCDurableSubsCount(broker2, topic, 1);
|
||||||
|
assertNCDurableSubsCount(broker2, excludeTopic, 0);
|
||||||
|
|
||||||
|
//Make sure message can reach remote broker
|
||||||
|
MessageProducer producer = session2.createProducer(topic2);
|
||||||
|
producer.send(session2.createTextMessage("test"));
|
||||||
|
waitForDispatchFromLocalBroker(broker2.getDestination(topic2).getDestinationStatistics(), 1);
|
||||||
|
assertLocalBrokerStatistics(broker2.getDestination(topic2).getDestinationStatistics(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
public void testVirtualDestSubForceDurableSync() throws Exception {
|
public void testVirtualDestSubForceDurableSync() throws Exception {
|
||||||
Assume.assumeTrue(flow == FLOW.FORWARD);
|
Assume.assumeTrue(flow == FLOW.FORWARD);
|
||||||
|
|
Loading…
Reference in New Issue