From 5c8086961fb9e052e30a5846e6b738267de7c4a0 Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 27 Aug 2020 16:31:50 +0100 Subject: [PATCH] AMQ-8023 - rework fix to deal with addSub interleaved with removeDestination advisory processing, serialise add/remove dest such that add is not lost and new sub resubscribes ok, extra verifications in the test --- .../DemandForwardingBridgeSupport.java | 55 ++++++------------- .../MQTTVirtualTopicSubscriptionsTest.java | 12 ++++ 2 files changed, 29 insertions(+), 38 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 6441c92cca..602e6eb56a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -33,7 +33,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -979,7 +978,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } } else if (data.getClass() == DestinationInfo.class) { - // It's a destination info - we want to pass up information about temporary destinations final DestinationInfo destInfo = (DestinationInfo) data; BrokerId[] path = destInfo.getBrokerPath(); if (path != null && networkTTL > -1 && path.length >= networkTTL) { @@ -1003,21 +1001,21 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo }); if (destInfo.isRemoveOperation()) { - // Serialize with removeSub operations such that all removeSub advisories - // are generated - serialExecutor.execute(new Runnable() { - @Override - public void run() { - try { - localBroker.oneway(destInfo); - } catch (IOException e) { - LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e); - } - } - }); - } else { - localBroker.oneway(destInfo); + // not synced with addSubs so we will need to ignore any potential new subs with a timeout!=0 + destInfo.setTimeout(1); } + // Serialize both add/remove dest with removeSub operations such that all removeSub advisories are generated + serialExecutor.execute(new Runnable() { + @Override + public void run() { + try { + localBroker.oneway(destInfo); + } catch (IOException e) { + LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e); + } + } + }); + } else if (data.getClass() == RemoveInfo.class) { ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); removeDemandSubscription(id); @@ -1149,28 +1147,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this; } - protected void addSubscription(final DemandSubscription sub) throws IOException { + protected void addSubscription(DemandSubscription sub) throws IOException { if (sub != null) { - // Serialize with remove operations such that new sub does not cause remove/purge to fail - // remain synchronous b/c duplicate suppression depends on add completion - FutureTask syncTask = new FutureTask(new Runnable() { - @Override - public void run() { - try { - localBroker.oneway(sub.getLocalInfo()); - } catch (IOException e) { - LOG.warn("failed to deliver add sub command: {}, cause: {}", sub.getLocalInfo(), e); - LOG.debug("detail", e); - } - } - }, null); - try { - serialExecutor.execute(syncTask); - syncTask.get(); - } catch (Exception e) { - LOG.warn("failed to execute add sub command: {}, cause: {}", sub.getLocalInfo(), e); - LOG.debug("detail", e); - } + localBroker.oneway(sub.getLocalInfo()); } } @@ -1182,7 +1161,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); - // continue removal in separate thread to free up tshis thread for outstanding responses + // continue removal in separate thread to free up this thread for outstanding responses // Serialize with removeDestination operations so that removeSubs are serialized with // removeDestinations such that all removeSub advisories are generated serialExecutor.execute(new Runnable() { diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java index 4eedc1cbc5..c601a8e530 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java @@ -27,6 +27,7 @@ import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.jmx.BrokerView; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.network.NetworkBridge; @@ -330,6 +331,17 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest { // release bridge remove ops *after* new/re subscription removeOp.countDown(); + assertTrue("All destinations and subs recreated and consumers connected on brokerTwo via network", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + BrokerView brokerView = brokerTwo.getAdminView(); + int numQueues = brokerView.getQueues().length; + int numSubscriptions = brokerView.getQueueSubscribers().length; + + LOG.info("#Queues: " + numQueues + ", #Subs: " + numSubscriptions); + return numQueues == numDests && numSubscriptions == numDests; + } + })); Message msg = notClean.receive(500, TimeUnit.MILLISECONDS); assertNull(msg); notClean.disconnect();