AMQ-8023 - serialize sub add with destination removal advisory processing to avoid resub blocking a necessary purge via removal, fix and test via mqtt clean session scenario

This commit is contained in:
gtully 2020-08-25 16:56:50 +01:00
parent 213f381c95
commit 0e2b24be36
5 changed files with 141 additions and 4 deletions

View File

@ -267,7 +267,7 @@ public abstract class AbstractRegion implements Region {
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
Subscription sub = iter.next();
if (sub.matches(destination) ) {
throw new JMSException("Destination still has an active subscription: " + destination);
throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
}
}
}

View File

@ -1148,9 +1148,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
}
protected void addSubscription(DemandSubscription sub) throws IOException {
protected void addSubscription(final DemandSubscription sub) throws IOException {
if (sub != null) {
// Serialize with remove operations such that new sub does not cause remove/purge to fail
serialExecutor.execute(new Runnable() {
@Override
public void run() {
try {
localBroker.oneway(sub.getLocalInfo());
} catch (IOException e) {
LOG.warn("failed to deliver add sub command: {}, cause: {}", sub.getLocalInfo(), e);
LOG.debug("detail", e);
}
}
});
}
}

View File

@ -64,6 +64,7 @@ public class MQTTTestSupport {
protected String protocolConfig;
protected String protocolScheme;
protected boolean useSSL;
protected boolean advisorySupport;
public static final int AT_MOST_ONCE = 0;
public static final int AT_LEAST_ONCE = 1;
@ -145,7 +146,7 @@ public class MQTTTestSupport {
kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
brokerService.setPersistenceAdapter(kaha);
}
brokerService.setAdvisorySupport(false);
brokerService.setAdvisorySupport(advisorySupport);
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateConnector(false);
brokerService.setSchedulerSupport(isSchedulerSupportEnabled());

View File

@ -17,11 +17,21 @@
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkBridgeListener;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
@ -37,7 +47,9 @@ import org.slf4j.LoggerFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Run the basic tests with the NIO Transport.
@ -217,4 +229,112 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
connection.disconnect();
}
@Test(timeout = 6000 * 1000)
public void testCleanSessionWithNetworkOfBrokersRemoveAddRace() throws Exception {
stopBroker();
advisorySupport = true;
startBroker();
BrokerService brokerTwo = createBroker(false);
brokerTwo.setBrokerName("BrokerTwo");
final NetworkConnector networkConnector = brokerTwo.addNetworkConnector("static:" + jmsUri);
networkConnector.setDestinationFilter("ActiveMQ.Advisory.Consumer.Queue.>,ActiveMQ.Advisory.Queue");
// let remove Ops backup on the executor
final CountDownLatch removeOp = new CountDownLatch(1);
brokerTwo.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() {
@Override
public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
// delay remove ops till subscription is renewed such that the single thread executor backs up
removeOp.await(50, TimeUnit.SECONDS);
super.removeDestinationInfo(context, destInfo);
}
}});
brokerTwo.start();
assertTrue("Bridge created", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return !networkConnector.activeBridges().isEmpty();
}
}));
// track an error on the network bridge
final AtomicBoolean failed = new AtomicBoolean();
NetworkBridgeListener listener = new NetworkBridgeListener() {
@Override
public void bridgeFailed() {
failed.set(true);
}
@Override
public void onStart(NetworkBridge bridge) {
}
@Override
public void onStop(NetworkBridge bridge) {
}
@Override
public void onOutboundMessage(NetworkBridge bridge, org.apache.activemq.command.Message message) {
}
@Override
public void onInboundMessage(NetworkBridge bridge, org.apache.activemq.command.Message message) {
}
};
for (NetworkBridge bridge : networkConnector.activeBridges()) {
bridge.setNetworkBridgeListener(listener);
}
final int numDests = 100;
// subscribe with durability
final String CLIENTID = "clean-session";
final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false);
BlockingConnection notClean = mqttNotClean.blockingConnection();
notClean.connect();
for (int i=0; i<numDests; i++) {
final String TOPIC = "TopicA-" + i;
notClean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
}
notClean.disconnect();
// whack any old state with reconnect clean
final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
final BlockingConnection clean = mqttClean.blockingConnection();
clean.connect();
for (int i=0; i<numDests; i++) {
final String TOPIC = "TopicA-" + i;
clean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
}
clean.disconnect();
// subscribe again with durability
notClean = mqttNotClean.blockingConnection();
notClean.connect();
for (int i=0; i<numDests; i++) {
final String TOPIC = "TopicA-" + i;
notClean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
}
// release bridge remove ops *after* new/re subscription
removeOp.countDown();
Message msg = notClean.receive(500, TimeUnit.MILLISECONDS);
assertNull(msg);
notClean.disconnect();
assertFalse("bridge did not fail", failed.get());
brokerTwo.stop();
}
}

View File

@ -94,6 +94,7 @@ public class NetworkRouteTest {
// (1) send advisory of path 1
remoteListener.onCommand(path1Msg);
localConsumer.latch.await(5, TimeUnit.SECONDS);
msgDispatch.setConsumerId(((ConsumerInfo) localConsumer.arguments[0]).getConsumerId());
// send advisory of path 2, doesn't send a ConsumerInfo to localBroker
remoteListener.onCommand(path2Msg);
@ -145,6 +146,7 @@ public class NetworkRouteTest {
// (1) send advisory of path 1
remoteListener.onCommand(path1Msg);
localConsumer.latch.await(5, TimeUnit.SECONDS);
msgDispatch.setConsumerId(((ConsumerInfo) localConsumer.arguments[0]).getConsumerId());
// send advisory of path 2, doesn't send a ConsumerInfo to localBroker
remoteListener.onCommand(path2Msg);
@ -280,6 +282,7 @@ public class NetworkRouteTest {
private static class ArgHolder {
public Object[] arguments;
final CountDownLatch latch = new CountDownLatch(1);
public static ArgHolder holdArgsForLastVoidCall() {
final ArgHolder holder = new ArgHolder();
@ -295,12 +298,14 @@ public class NetworkRouteTest {
}
public static ArgHolder holdArgsForLastObjectCall() {
final ArgHolder holder = new ArgHolder();
EasyMock.expect(new Object()).andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
Object[] args = EasyMock.getCurrentArguments();
holder.arguments = Arrays.copyOf(args, args.length);
holder.latch.countDown();
return null;
}
});