ARTEMIS-2854 Non-durable subscribers stop receiving after failover

In a cluster scenario where non-durable subscribers fail over to a
backup while another live node is forwarding messages to them,
there is a chance that the live node keeps the old remote
bindings for the subscriptions. Messages routed to those
old remote bindings will then result in "binding not found".
This commit is contained in:
Howard Gao 2020-08-03 19:47:12 +08:00 committed by Clebert Suconic
parent 3e557f1070
commit fe5b81fd55
2 changed files with 117 additions and 9 deletions

View File

@ -1283,17 +1283,25 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
RemoteQueueBinding existingBinding = (RemoteQueueBinding) postOffice.getBinding(clusterName);
if (existingBinding != null) {
if (!existingBinding.isConnected()) {
existingBinding.connect();
if (queueID.equals(existingBinding.getRemoteQueueID())) {
if (!existingBinding.isConnected()) {
existingBinding.connect();
return;
}
// Sanity check - this means the binding has already been added via another bridge, probably max
// hops is too high
// or there are multiple cluster connections for the same address
ActiveMQServerLogger.LOGGER.remoteQueueAlreadyBoundOnClusterConnection(this, clusterName);
return;
}
// Sanity check - this means the binding has already been added via another bridge, probably max
// hops is too high
// or there are multiple cluster connections for the same address
ActiveMQServerLogger.LOGGER.remoteQueueAlreadyBoundOnClusterConnection(this, clusterName);
return;
//This can happen during JMS non-durable failover: the queue name doesn't change, but the queue id
//is re-generated on the backup. In that case the old binding is removed and a new remote binding
//is created and put into the map.
if (logger.isTraceEnabled()) {
logger.trace("Removing binding because qid changed " + queueID + " old: " + existingBinding.getRemoteQueueID());
}
removeBinding(clusterName);
}
RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(), queueAddress, clusterName, routingName, queueID, filterString, queue, bridge.getName(), distance + 1, messageLoadBalancingType);

View File

@ -17,8 +17,10 @@
package org.apache.activemq.artemis.tests.integration.cluster.bridge;
import java.util.ArrayList;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
@ -156,6 +158,104 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
}
/**
 * Checks that messages sent on node 1 still reach a consumer on node 0 after the queue on
 * node 0 has been destroyed and recreated under the same name.
 * <p>
 * The queue is destroyed while management notifications on node 0 are disabled, and is then
 * recreated. This simulates a JMS non-durable subscriber failing over to a backup: the queue
 * name stays the same, but the recreated queue is presumably assigned a new id while node 1
 * still holds the remote binding for the old one (NOTE(review): confirm against
 * ClusterConnectionImpl).
 *
 * @throws Exception if server setup, messaging or any assertion fails
 */
@Test
public void testClusterBridgeAddRemoteBinding() throws Exception {
   final String ADDRESS = "queues.testaddress";
   final String QUEUE = UUID.randomUUID().toString();
   final int NUMBER_OF_MESSAGES = 10;

   setupServer(0, isFileStorage(), isNetty());
   setupServer(1, isFileStorage(), isNetty());
   setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
   setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
   startServers(0, 1);
   setupSessionFactory(0, isNetty());
   setupSessionFactory(1, isNetty());

   createQueue(0, ADDRESS, QUEUE, null, false);
   addConsumer(0, 0, QUEUE, null);
   waitForBindings(0, ADDRESS, 1, 1, true);
   waitForBindings(1, ADDRESS, 0, 0, true);
   waitForBindings(0, ADDRESS, 0, 0, false);
   waitForBindings(1, ADDRESS, 1, 1, false);

   // Sessions are closed in finally blocks so a failed assertion does not leak them.
   ClientSession session0 = sfs[0].createSession();
   try {
      ClientSession session1 = sfs[1].createSession();
      try {
         session0.start();
         session1.start();
         ClientProducer producer1 = session1.createProducer(ADDRESS);

         // Send to node1 and receive from node0.
         sendAndCommitEach(session1, producer1, NUMBER_OF_MESSAGES);
         assertEquals(NUMBER_OF_MESSAGES, receiveAllAndAck(0, session0, 1000));

         // The following code simulates the issue where a JMS non-durable subscriber
         // fails over to the backup. In the process the temp queue is recreated
         // on the backup with a new id while its remote binding
         // already exists on the other node.
         removeConsumer(0);
         // Notifications are switched off only around the destroy call.
         servers[0].getManagementService().enableNotifications(false);
         servers[0].destroyQueue(new SimpleString(QUEUE));
         servers[0].getManagementService().enableNotifications(true);

         createQueue(0, ADDRESS, QUEUE, null, false);
         addConsumer(0, 0, QUEUE, null);
         waitForBindings(0, ADDRESS, 1, 1, true);
         waitForBindings(1, ADDRESS, 0, 0, true);
         waitForBindings(0, ADDRESS, 0, 0, false);
         waitForBindings(1, ADDRESS, 1, 1, false);

         // After the queue has been recreated, all messages must still arrive on node0.
         sendAndCommitEach(session1, producer1, NUMBER_OF_MESSAGES);
         assertEquals(NUMBER_OF_MESSAGES, receiveAllAndAck(0, session0, 2000));
      } finally {
         session1.close();
      }
   } finally {
      session0.close();
   }
   stopServers(0, 1);
}

/**
 * Creates and sends {@code count} durable messages through {@code producer}, committing
 * {@code session} after each send.
 */
private void sendAndCommitEach(ClientSession session, ClientProducer producer, int count) throws Exception {
   for (int i = 0; i < count; i++) {
      ClientMessage msg = session.createMessage(true);
      producer.send(msg);
      session.commit();
   }
}

/**
 * Receives from {@code consumers[consumerIndex]} until a receive call returns {@code null}
 * within {@code timeoutMillis}, acknowledging each message and committing {@code session}
 * after each one.
 *
 * @return the number of messages received
 */
private int receiveAllAndAck(int consumerIndex, ClientSession session, long timeoutMillis) throws Exception {
   int received = 0;
   ClientMessage msg;
   while ((msg = consumers[consumerIndex].getConsumer().receive(timeoutMillis)) != null) {
      received++;
      msg.acknowledge();
      // NOTE(review): the consumer was created by addConsumer, presumably on its own session;
      // confirm whether committing this separate session is actually needed.
      session.commit();
   }
   return received;
}
@Override