ARTEMIS-3384 Fix bridge duplicate messages after reconnection
This commit is contained in:
parent
b4aef3fca8
commit
2d07d0d844
|
@ -75,13 +75,15 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
|
|||
|
||||
@Override
|
||||
public void deleteFromCache(byte[] duplicateID) {
|
||||
deleteFromCache(new ByteArray(duplicateID));
|
||||
}
|
||||
|
||||
private void deleteFromCache(final ByteArray duplicateID) {
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.tracef("deleting id = %s", describeID(duplicateID));
|
||||
LOGGER.tracef("deleting id = %s", describeID(duplicateID.bytes));
|
||||
}
|
||||
|
||||
ByteArray bah = new ByteArray(duplicateID);
|
||||
|
||||
Integer posUsed = cache.remove(bah);
|
||||
Integer posUsed = cache.remove(duplicateID);
|
||||
|
||||
if (posUsed != null) {
|
||||
ByteArray id;
|
||||
|
@ -90,10 +92,10 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
|
|||
final int index = posUsed.intValue();
|
||||
id = ids.get(index);
|
||||
|
||||
if (id.equals(bah)) {
|
||||
if (id.equals(duplicateID)) {
|
||||
ids.set(index, null);
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID));
|
||||
LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID.bytes));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -158,6 +160,7 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
|
|||
}
|
||||
|
||||
if (instantAdd) {
|
||||
addToCacheInMemory(holder);
|
||||
tx.addOperation(new AddDuplicateIDOperation(holder, false));
|
||||
} else {
|
||||
// For a tx, it's important that the entry is not added to the cache until commit
|
||||
|
@ -262,9 +265,9 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void beforeCommit(Transaction tx) throws Exception {
|
||||
public void beforeRollback(Transaction tx) throws Exception {
|
||||
if (!afterCommit) {
|
||||
process();
|
||||
deleteFromCache(id);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -143,24 +143,26 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache {
|
|||
|
||||
@Override
|
||||
public void deleteFromCache(byte[] duplicateID) throws Exception {
|
||||
deleteFromCache(new ByteArray(duplicateID));
|
||||
}
|
||||
|
||||
private void deleteFromCache(final ByteArray duplicateID) throws Exception {
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.tracef("deleting id = %s", describeID(duplicateID));
|
||||
LOGGER.tracef("deleting id = %s", describeID(duplicateID.bytes));
|
||||
}
|
||||
|
||||
final ByteArray bah = new ByteArray(duplicateID);
|
||||
|
||||
final Integer posUsed = cache.remove(bah);
|
||||
final Integer posUsed = cache.remove(duplicateID);
|
||||
|
||||
if (posUsed != null) {
|
||||
synchronized (this) {
|
||||
final ObjLongPair<ByteArray> id = ids.get(posUsed.intValue());
|
||||
|
||||
if (id.getA().equals(bah)) {
|
||||
if (id.getA().equals(duplicateID)) {
|
||||
final long recordID = id.getB();
|
||||
id.setA(null);
|
||||
id.setB(NIL);
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID, id.getB()));
|
||||
LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID.bytes, id.getB()));
|
||||
}
|
||||
storageManager.deleteDuplicateID(recordID);
|
||||
}
|
||||
|
@ -240,6 +242,7 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache {
|
|||
}
|
||||
|
||||
if (instantAdd) {
|
||||
addToCacheInMemory(holder, recordID);
|
||||
tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false));
|
||||
} else {
|
||||
// For a tx, it's important that the entry is not added to the cache until commit
|
||||
|
@ -379,9 +382,9 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void beforeCommit(Transaction tx) throws Exception {
|
||||
public void beforeRollback(Transaction tx) throws Exception {
|
||||
if (!afterCommit) {
|
||||
process();
|
||||
deleteFromCache(holder);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,13 +17,19 @@
|
|||
package org.apache.activemq.artemis.tests.integration.cluster.bridge;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
|
@ -39,12 +45,18 @@ import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
|||
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Bridge;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.jboss.logging.Logger;
|
||||
|
@ -399,6 +411,127 @@ public class BridgeReconnectTest extends BridgeTestBase {
|
|||
assertNoMoreConnections();
|
||||
}
|
||||
|
||||
// Fail bridge and reconnect same node, no backup specified
|
||||
@Test
|
||||
public void testReconnectSameNodeAfterDelivery() throws Exception {
|
||||
server0 = createActiveMQServer(0, isNetty(), server0Params);
|
||||
|
||||
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
|
||||
|
||||
server0.getConfiguration().setConnectorConfigurations(connectors);
|
||||
server1.getConfiguration().setConnectorConfigurations(connectors);
|
||||
|
||||
BridgeConfiguration bridgeConfiguration = createBridgeConfig();
|
||||
|
||||
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
|
||||
bridgeConfigs.add(bridgeConfiguration);
|
||||
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
|
||||
|
||||
QueueConfiguration queueConfig0 = new QueueConfiguration(queueName).setAddress(testAddress);
|
||||
List<QueueConfiguration> queueConfigs0 = new ArrayList<>();
|
||||
queueConfigs0.add(queueConfig0);
|
||||
server0.getConfiguration().setQueueConfigs(queueConfigs0);
|
||||
|
||||
QueueConfiguration queueConfig1 = new QueueConfiguration(queueName).setAddress(forwardAddress);
|
||||
List<QueueConfiguration> queueConfigs1 = new ArrayList<>();
|
||||
queueConfigs1.add(queueConfig1);
|
||||
server1.getConfiguration().setQueueConfigs(queueConfigs1);
|
||||
|
||||
startServers();
|
||||
|
||||
locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(server0tc, server1tc));
|
||||
ClientSessionFactory csf0 = locator.createSessionFactory(server0tc);
|
||||
session0 = csf0.createSession(false, true, true);
|
||||
|
||||
ClientSessionFactory csf1 = locator.createSessionFactory(server1tc);
|
||||
session1 = csf1.createSession(false, true, true);
|
||||
|
||||
ClientProducer prod0 = session0.createProducer(testAddress);
|
||||
|
||||
ClientConsumer cons1 = session1.createConsumer(queueName);
|
||||
|
||||
session1.start();
|
||||
|
||||
final ManagementService managementService = server0.getManagementService();
|
||||
QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.QUEUE + queueName);
|
||||
assertEquals(0, coreQueueControl.getDeliveringCount());
|
||||
|
||||
final int numMessages = NUM_MESSAGES;
|
||||
|
||||
SimpleString propKey = new SimpleString("propkey");
|
||||
|
||||
CyclicBarrier routingBarrier = new CyclicBarrier(2);
|
||||
CountDownLatch deliveryBeforeFailureLatch = new CountDownLatch(numMessages);
|
||||
CountDownLatch deliveryAfterFailureLatch = new CountDownLatch(2 * numMessages);
|
||||
List<Message> sendingMessages = Collections.synchronizedList(new ArrayList<>());
|
||||
Map<Integer, ClientMessage> clientMessages = new ConcurrentHashMap<>();
|
||||
|
||||
server0.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() {
|
||||
@Override
|
||||
public void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
|
||||
ActiveMQServerPlugin.super.afterDeliverBridge(bridge, ref, status);
|
||||
|
||||
deliveryBeforeFailureLatch.countDown();
|
||||
deliveryAfterFailureLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
server1.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() {
|
||||
@Override
|
||||
public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
|
||||
sendingMessages.add(message);
|
||||
try {
|
||||
// Simulate CPU load until bridge delivery after failure
|
||||
deliveryAfterFailureLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
log.debug(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
|
||||
if (sendingMessages.contains(message)) {
|
||||
try {
|
||||
// Force duplicateID atomicVerify of messages delivered again by the bridge after failure
|
||||
// before routing messages delivered by bridge before failure
|
||||
routingBarrier.await();
|
||||
} catch (InterruptedException e) {
|
||||
log.debug(e);
|
||||
} catch (BrokenBarrierException e) {
|
||||
log.debug(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = session0.createMessage(false);
|
||||
message.putIntProperty(propKey, i);
|
||||
|
||||
prod0.send(message);
|
||||
}
|
||||
|
||||
deliveryBeforeFailureLatch.await();
|
||||
|
||||
assertEquals(numMessages, coreQueueControl.getDeliveringCount());
|
||||
|
||||
// Now we will simulate a failure of the bridge connection between server0 and server1
|
||||
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
|
||||
assertNotNull(bridge);
|
||||
RemotingConnection forwardingConnection = getForwardingConnection(bridge);
|
||||
forwardingConnection.fail(new ActiveMQNotConnectedException());
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage r1 = cons1.receive(1500);
|
||||
assertNotNull(r1);
|
||||
assertNull(clientMessages.putIfAbsent(r1.getIntProperty(propKey), r1));
|
||||
}
|
||||
closeServers();
|
||||
|
||||
assertNoMoreConnections();
|
||||
}
|
||||
|
||||
// We test that we can pause more than client failure check period (to prompt the pinger to failing)
|
||||
// before reconnecting
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue