Fixing durable sync over a network bridge so that network subscriptions
that are no longer permissible are also cleaned up

(cherry picked from commit a038655605)
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-09-08 08:27:49 -04:00
parent 4dbe61dd56
commit 5956bdc1f5
4 changed files with 170 additions and 8 deletions

View File

@ -652,14 +652,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
this.brokerService.getBrokerName(), subInfo.getBrokerName()); this.brokerService.getBrokerName(), subInfo.getBrokerName());
if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions() if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
&& !configuration.isDynamicOnly() && subInfo.getSubscriptionInfos() != null) { && !configuration.isDynamicOnly()) {
if (started.get()) { if (started.get()) {
if (subInfo.getSubscriptionInfos() != null) {
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) { for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) && if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
matchesDynamicallyIncludedDestinations(info.getDestination())) { matchesDynamicallyIncludedDestinations(info.getDestination())) {
serviceRemoteConsumerAdvisory(info); serviceRemoteConsumerAdvisory(info);
} }
} }
}
//After re-added, clean up any empty durables //After re-added, clean up any empty durables
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {

View File

@ -26,6 +26,7 @@ import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.TypeConversionSupport; import org.apache.activemq.util.TypeConversionSupport;
@ -88,6 +89,30 @@ public class DurableConduitBridge extends ConduitBridge {
LOG.error("Failed to add static destination {}", dest, e); LOG.error("Failed to add static destination {}", dest, e);
} }
LOG.trace("Forwarding messages for durable destination: {}", dest); LOG.trace("Forwarding messages for durable destination: {}", dest);
} else if (configuration.isSyncDurableSubs() && !isPermissableDestination(dest)) {
if (dest.isTopic()) {
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
String candidateSubName = getSubscriberName(dest);
for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
String subName = subscription.getConsumerInfo().getSubscriptionName();
if (subName != null && subName.equals(candidateSubName)) {
try {
// remove the NC subscription as it is no longer for a permissable dest
RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
sending.setClientId(localClientId);
sending.setSubscriptionName(subName);
sending.setConnectionId(this.localConnectionInfo.getConnectionId());
localBroker.oneway(sending);
} catch (IOException e) {
LOG.debug("Exception removing NC durable subscription: {}", subName, e);
serviceRemoteException(e);
}
break;
}
}
}
} }
} }
} }

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
@ -56,14 +57,18 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class); protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class);
protected String staticIncludeTopics = "include.static.test";
protected String includedTopics = "include.test.>";
protected String testTopicName2 = "include.test.bar2"; protected String testTopicName2 = "include.test.bar2";
private boolean dynamicOnly = false; private boolean dynamicOnly = false;
private boolean forceDurable = false;
private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION; private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
public static enum FLOW {FORWARD, REVERSE}; public static enum FLOW {FORWARD, REVERSE};
private BrokerService broker1; private BrokerService broker1;
private BrokerService broker2; private BrokerService broker2;
private Session session1; private Session session1;
private Session session2;
private final FLOW flow; private final FLOW flow;
@Rule @Rule
@ -98,7 +103,10 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
includedTopics = "include.test.>";
staticIncludeTopics = "include.static.test";
dynamicOnly = false; dynamicOnly = false;
forceDurable = false;
remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION; remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder()); doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder());
} }
@ -135,6 +143,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertNCDurableSubsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
restartBrokers(true); restartBrokers(true);
assertBridgeStarted();
assertSubscriptionsCount(broker1, topic, 1); assertSubscriptionsCount(broker1, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
@ -157,6 +166,43 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
doTearDown(); doTearDown();
restartBroker(broker1, false); restartBroker(broker1, false);
restartBroker(broker2, false);
//Send some messages to the NC sub and make sure it can still be deleted
MessageProducer prod = session2.createProducer(topic);
for (int i = 0; i < 10; i++) {
prod.send(session2.createTextMessage("test"));
}
assertSubscriptionsCount(broker1, topic, 1);
removeSubscription(broker1, topic, subName);
assertSubscriptionsCount(broker1, topic, 0);
doTearDown();
//Test that on successful reconnection of the bridge that
//the NC sub will be removed
restartBroker(broker2, true);
assertNCDurableSubsCount(broker2, topic, 1);
restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0);
}
@Test
public void testRemoveSubscriptionWithBridgeOfflineIncludedChanged() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName);
sub1.close();
assertSubscriptionsCount(broker1, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
doTearDown();
//change the included topics to make sure we still cleanup non-matching NC durables
includedTopics = "different.topic";
restartBroker(broker1, false);
assertSubscriptionsCount(broker1, topic, 1); assertSubscriptionsCount(broker1, topic, 1);
removeSubscription(broker1, topic, subName); removeSubscription(broker1, topic, subName);
assertSubscriptionsCount(broker1, topic, 0); assertSubscriptionsCount(broker1, topic, 0);
@ -166,10 +212,76 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
restartBroker(broker2, true); restartBroker(broker2, true);
assertNCDurableSubsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
restartBroker(broker1, true); restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
} }
@Test
public void testSubscriptionRemovedAfterIncludedChanged() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName);
sub1.close();
assertSubscriptionsCount(broker1, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
doTearDown();
//change the included topics to make sure we still cleanup non-matching NC durables
includedTopics = "different.topic";
restartBroker(broker1, false);
assertSubscriptionsCount(broker1, topic, 1);
//Test that on successful reconnection of the bridge that
//the NC sub will be removed because even though the local subscription exists,
//it no longer matches the included filter
restartBroker(broker2, true);
assertNCDurableSubsCount(broker2, topic, 1);
restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0);
assertSubscriptionsCount(broker1, topic, 1);
}
@Test
public void testSubscriptionRemovedAfterStaticChanged() throws Exception {
forceDurable = true;
this.restartBrokers(true);
final ActiveMQTopic topic = new ActiveMQTopic(this.staticIncludeTopics);
MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName);
sub1.close();
assertSubscriptionsCount(broker1, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
doTearDown();
//change the included topics to make sure we still cleanup non-matching NC durables
staticIncludeTopics = "different.topic";
this.restartBrokers(false);
assertSubscriptionsCount(broker1, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
//Send some messages to the NC sub and make sure it can still be deleted
MessageProducer prod = session2.createProducer(topic);
for (int i = 0; i < 10; i++) {
prod.send(session2.createTextMessage("test"));
}
//Test that on successful reconnection of the bridge that
//the NC sub will be removed because even though the local subscription exists,
//it no longer matches the included static filter
restartBroker(broker2, true);
assertNCDurableSubsCount(broker2, topic, 1);
restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0);
assertSubscriptionsCount(broker1, topic, 1);
}
@Test @Test
public void testAddAndRemoveSubscriptionWithBridgeOfflineMultiTopics() throws Exception { public void testAddAndRemoveSubscriptionWithBridgeOfflineMultiTopics() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
@ -199,9 +311,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
//After sync, remove old NC and create one for topic 2 //After sync, remove old NC and create one for topic 2
restartBroker(broker1, true); restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
assertNCDurableSubsCount(broker2, topic2, 1); assertNCDurableSubsCount(broker2, topic2, 1);
} }
@Test @Test
@ -225,6 +337,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic2, 1); assertSubscriptionsCount(broker1, topic2, 1);
restartBrokers(true); restartBrokers(true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic2, 1); assertNCDurableSubsCount(broker2, topic2, 1);
assertNCDurableSubsCount(broker2, excludeTopic, 0); assertNCDurableSubsCount(broker2, excludeTopic, 0);
@ -265,6 +378,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 1); assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 1);
} }
assertBridgeStarted();
} }
@ -291,6 +405,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
//not be added //not be added
restartBrokers(true); restartBrokers(true);
assertNCDurableSubsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
assertBridgeStarted();
} }
@ -312,6 +427,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
restartBrokers(true); restartBrokers(true);
assertNCDurableSubsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0);
assertBridgeStarted();
} }
@Test @Test
@ -335,6 +451,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
//bring online again //bring online again
session1.createDurableSubscriber(topic, subName); session1.createDurableSubscriber(topic, subName);
assertNCDurableSubsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
assertBridgeStarted();
} }
@ -358,6 +475,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
restartBrokers(true); restartBrokers(true);
assertNCDurableSubsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, excludeTopic, 0); assertNCDurableSubsCount(broker2, excludeTopic, 0);
assertBridgeStarted();
} }
@ -389,6 +507,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
//between the sync command and the online durables that are added over //between the sync command and the online durables that are added over
//the consumer advisory //the consumer advisory
restartBrokers(true); restartBrokers(true);
assertBridgeStarted();
//Re-create //Re-create
session1.createDurableSubscriber(topic, subName); session1.createDurableSubscriber(topic, subName);
@ -460,7 +579,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1; return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
} }
}, 10000, 500); }, 5000, 500);
} }
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -469,6 +588,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
session1 = localSession; session1 = localSession;
} else { } else {
broker2 = localBroker; broker2 = localBroker;
session2 = localSession;
} }
} }
@ -486,6 +606,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
if (flow.equals(FLOW.FORWARD)) { if (flow.equals(FLOW.FORWARD)) {
broker2 = remoteBroker; broker2 = remoteBroker;
session2 = remoteSession;
} else { } else {
broker1 = remoteBroker; broker1 = remoteBroker;
session1 = remoteSession; session1 = remoteSession;
@ -524,8 +645,10 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
connector.setDuplex(true); connector.setDuplex(true);
connector.setStaticBridge(false); connector.setStaticBridge(false);
connector.setSyncDurableSubs(true); connector.setSyncDurableSubs(true);
connector.setStaticallyIncludedDestinations(
Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(staticIncludeTopics + "?forceDurable=" + forceDurable)));
connector.setDynamicallyIncludedDestinations( connector.setDynamicallyIncludedDestinations(
Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic("include.test.>"))); Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(includedTopics)));
connector.setExcludedDestinations( connector.setExcludedDestinations(
Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(excludeTopicName))); Lists.<ActiveMQDestination>newArrayList(new ActiveMQTopic(excludeTopicName)));
return connector; return connector;

View File

@ -88,6 +88,16 @@ public abstract class DynamicNetworkTestSupport {
} }
} }
protected void assertBridgeStarted() throws Exception {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
}
}, 10000, 500));
}
protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context, protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context,
final BrokerService brokerService) throws Exception { final BrokerService brokerService) throws Exception {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
@ -181,6 +191,7 @@ public abstract class DynamicNetworkTestSupport {
destination = (Topic) d; destination = (Topic) d;
} }
for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) { for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) { if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key); DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
@ -189,6 +200,7 @@ public abstract class DynamicNetworkTestSupport {
} }
} }
} }
return subs; return subs;
} }