From 2d07d0d84419698c0bc6e5c87a228e6b3ac9f26f Mon Sep 17 00:00:00 2001 From: Domenico Francesco Bruscino Date: Thu, 15 Jul 2021 10:58:28 +0200 Subject: [PATCH] ARTEMIS-3384 Fix bridge duplicate messages after reconnection --- .../impl/InMemoryDuplicateIDCache.java | 19 +-- .../impl/PersistentDuplicateIDCache.java | 19 +-- .../cluster/bridge/BridgeReconnectTest.java | 133 ++++++++++++++++++ 3 files changed, 155 insertions(+), 16 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java index f1f86142df..0105186d5e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java @@ -75,13 +75,15 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache { @Override public void deleteFromCache(byte[] duplicateID) { + deleteFromCache(new ByteArray(duplicateID)); + } + + private void deleteFromCache(final ByteArray duplicateID) { if (LOGGER.isTraceEnabled()) { - LOGGER.tracef("deleting id = %s", describeID(duplicateID)); + LOGGER.tracef("deleting id = %s", describeID(duplicateID.bytes)); } - ByteArray bah = new ByteArray(duplicateID); - - Integer posUsed = cache.remove(bah); + Integer posUsed = cache.remove(duplicateID); if (posUsed != null) { ByteArray id; @@ -90,10 +92,10 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache { final int index = posUsed.intValue(); id = ids.get(index); - if (id.equals(bah)) { + if (id.equals(duplicateID)) { ids.set(index, null); if (LOGGER.isTraceEnabled()) { - LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID)); + LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID.bytes)); } } } @@ -158,6 +160,7 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache { } if (instantAdd) { + addToCacheInMemory(holder); tx.addOperation(new AddDuplicateIDOperation(holder, false)); } else { // For a tx, it's important that the entry is not added to the cache until commit @@ -262,9 +265,9 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache { } @Override - public void beforeCommit(Transaction tx) throws Exception { + public void beforeRollback(Transaction tx) throws Exception { if (!afterCommit) { - process(); + deleteFromCache(id); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java index 3e3758d3c9..cfdb972071 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java @@ -143,24 +143,26 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache { @Override public void deleteFromCache(byte[] duplicateID) throws Exception { + deleteFromCache(new ByteArray(duplicateID)); + } + + private void deleteFromCache(final ByteArray duplicateID) throws Exception { if (LOGGER.isTraceEnabled()) { - LOGGER.tracef("deleting id = %s", describeID(duplicateID)); + LOGGER.tracef("deleting id = %s", describeID(duplicateID.bytes)); } - final ByteArray bah = new ByteArray(duplicateID); - - final Integer posUsed = cache.remove(bah); + final Integer posUsed = cache.remove(duplicateID); if (posUsed != null) { synchronized (this) { final ObjLongPair id = ids.get(posUsed.intValue()); - if (id.getA().equals(bah)) { + if (id.getA().equals(duplicateID)) { final long recordID = id.getB(); id.setA(null); id.setB(NIL); if (LOGGER.isTraceEnabled()) { - LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID, id.getB())); + LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID.bytes, id.getB())); } storageManager.deleteDuplicateID(recordID); } @@ -240,6 +242,7 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache { } if (instantAdd) { + addToCacheInMemory(holder, recordID); tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false)); } else { // For a tx, it's important that the entry is not added to the cache until commit @@ -379,9 +382,9 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache { } @Override - public void beforeCommit(Transaction tx) throws Exception { + public void beforeRollback(Transaction tx) throws Exception { if (!afterCommit) { - process(); + deleteFromCache(holder); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java index 49680f2f38..1d9c5669cd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java @@ -17,13 +17,19 @@ package org.apache.activemq.artemis.tests.integration.cluster.bridge; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -39,12 +45,18 @@ import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.HandleStatus; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.core.server.management.ManagementService; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.Wait; import org.jboss.logging.Logger; @@ -399,6 +411,127 @@ public class BridgeReconnectTest extends BridgeTestBase { assertNoMoreConnections(); } + // Fail bridge and reconnect same node, no backup specified + @Test + public void testReconnectSameNodeAfterDelivery() throws Exception { + server0 = createActiveMQServer(0, isNetty(), server0Params); + + TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc"); + + server0.getConfiguration().setConnectorConfigurations(connectors); + server1.getConfiguration().setConnectorConfigurations(connectors); + + BridgeConfiguration bridgeConfiguration = createBridgeConfig(); + + List bridgeConfigs = new ArrayList<>(); + bridgeConfigs.add(bridgeConfiguration); + server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); + + QueueConfiguration queueConfig0 = new QueueConfiguration(queueName).setAddress(testAddress); + List queueConfigs0 = new ArrayList<>(); + queueConfigs0.add(queueConfig0); + server0.getConfiguration().setQueueConfigs(queueConfigs0); + + QueueConfiguration queueConfig1 = new QueueConfiguration(queueName).setAddress(forwardAddress); + List queueConfigs1 = new ArrayList<>(); + queueConfigs1.add(queueConfig1); + server1.getConfiguration().setQueueConfigs(queueConfigs1); + + startServers(); + + locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(server0tc, server1tc)); + ClientSessionFactory csf0 = locator.createSessionFactory(server0tc); + session0 = csf0.createSession(false, true, true); + + ClientSessionFactory csf1 = locator.createSessionFactory(server1tc); + session1 = csf1.createSession(false, true, true); + + ClientProducer prod0 = session0.createProducer(testAddress); + + ClientConsumer cons1 = session1.createConsumer(queueName); + + session1.start(); + + final ManagementService managementService = server0.getManagementService(); + QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.QUEUE + queueName); + assertEquals(0, coreQueueControl.getDeliveringCount()); + + final int numMessages = NUM_MESSAGES; + + SimpleString propKey = new SimpleString("propkey"); + + CyclicBarrier routingBarrier = new CyclicBarrier(2); + CountDownLatch deliveryBeforeFailureLatch = new CountDownLatch(numMessages); + CountDownLatch deliveryAfterFailureLatch = new CountDownLatch(2 * numMessages); + List sendingMessages = Collections.synchronizedList(new ArrayList<>()); + Map clientMessages = new ConcurrentHashMap<>(); + + server0.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() { + @Override + public void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException { + ActiveMQServerPlugin.super.afterDeliverBridge(bridge, ref, status); + + deliveryBeforeFailureLatch.countDown(); + deliveryAfterFailureLatch.countDown(); + } + }); + + + server1.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() { + @Override + public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException { + sendingMessages.add(message); + try { + // Simulate CPU load until bridge delivery after failure + deliveryAfterFailureLatch.await(); + } catch (InterruptedException e) { + log.debug(e); + } + } + + @Override + public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException { + if (sendingMessages.contains(message)) { + try { + // Force duplicateID atomicVerify of messages delivered again by the bridge after failure + // before routing messages delivered by bridge before failure + routingBarrier.await(); + } catch (InterruptedException e) { + log.debug(e); + } catch (BrokenBarrierException e) { + log.debug(e); + } + } + } + }); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session0.createMessage(false); + message.putIntProperty(propKey, i); + + prod0.send(message); + } + + deliveryBeforeFailureLatch.await(); + + assertEquals(numMessages, coreQueueControl.getDeliveringCount()); + + // Now we will simulate a failure of the bridge connection between server0 and server1 + Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName); + assertNotNull(bridge); + RemotingConnection forwardingConnection = getForwardingConnection(bridge); + forwardingConnection.fail(new ActiveMQNotConnectedException()); + + for (int i = 0; i < numMessages; i++) { + ClientMessage r1 = cons1.receive(1500); + assertNotNull(r1); + assertNull(clientMessages.putIfAbsent(r1.getIntProperty(propKey), r1)); + } + closeServers(); + + assertNoMoreConnections(); + } + // We test that we can pause more than client failure check period (to prompt the pinger to failing) // before reconnecting @Test