AMQ-8023 - rework fix to deal with addSub interleaved with removeDestination advisory processing, serialise add/remove dest such that add is not lost and new sub resubscribes ok, extra verifications in the test

This commit is contained in:
gtully 2020-08-27 16:31:50 +01:00
parent c0e6d47121
commit 5c8086961f
2 changed files with 29 additions and 38 deletions

View File

@ -33,7 +33,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -979,7 +978,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
} else if (data.getClass() == DestinationInfo.class) {
// It's a destination info - we want to pass up information about temporary destinations
final DestinationInfo destInfo = (DestinationInfo) data;
BrokerId[] path = destInfo.getBrokerPath();
if (path != null && networkTTL > -1 && path.length >= networkTTL) {
@ -1003,21 +1001,21 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
});
if (destInfo.isRemoveOperation()) {
// Serialize with removeSub operations such that all removeSub advisories
// are generated
serialExecutor.execute(new Runnable() {
@Override
public void run() {
try {
localBroker.oneway(destInfo);
} catch (IOException e) {
LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
}
}
});
} else {
localBroker.oneway(destInfo);
// not synced with addSubs so we will need to ignore any potential new subs with a timeout!=0
destInfo.setTimeout(1);
}
// Serialize both add/remove dest with removeSub operations such that all removeSub advisories are generated
serialExecutor.execute(new Runnable() {
@Override
public void run() {
try {
localBroker.oneway(destInfo);
} catch (IOException e) {
LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
}
}
});
} else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id);
@ -1149,28 +1147,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
}
protected void addSubscription(final DemandSubscription sub) throws IOException {
protected void addSubscription(DemandSubscription sub) throws IOException {
if (sub != null) {
// Serialize with remove operations such that new sub does not cause remove/purge to fail
// remain synchronous b/c duplicate suppression depends on add completion
FutureTask syncTask = new FutureTask(new Runnable() {
@Override
public void run() {
try {
localBroker.oneway(sub.getLocalInfo());
} catch (IOException e) {
LOG.warn("failed to deliver add sub command: {}, cause: {}", sub.getLocalInfo(), e);
LOG.debug("detail", e);
}
}
}, null);
try {
serialExecutor.execute(syncTask);
syncTask.get();
} catch (Exception e) {
LOG.warn("failed to execute add sub command: {}, cause: {}", sub.getLocalInfo(), e);
LOG.debug("detail", e);
}
localBroker.oneway(sub.getLocalInfo());
}
}
@ -1182,7 +1161,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
// continue removal in separate thread to free up tshis thread for outstanding responses
// continue removal in separate thread to free up this thread for outstanding responses
// Serialize with removeDestination operations so that removeSubs are serialized with
// removeDestinations such that all removeSub advisories are generated
serialExecutor.execute(new Runnable() {

View File

@ -27,6 +27,7 @@ import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.network.NetworkBridge;
@ -330,6 +331,17 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
// release bridge remove ops *after* new/re subscription
removeOp.countDown();
assertTrue("All destinations and subs recreated and consumers connected on brokerTwo via network", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
BrokerView brokerView = brokerTwo.getAdminView();
int numQueues = brokerView.getQueues().length;
int numSubscriptions = brokerView.getQueueSubscribers().length;
LOG.info("#Queues: " + numQueues + ", #Subs: " + numSubscriptions);
return numQueues == numDests && numSubscriptions == numDests;
}
}));
Message msg = notClean.receive(500, TimeUnit.MILLISECONDS);
assertNull(msg);
notClean.disconnect();