ARTEMIS-3384 Adding tests around duplicate detection

This commit is contained in:
Clebert Suconic 2021-07-15 22:51:43 -04:00
parent 277aa3706a
commit c479cb558a
3 changed files with 128 additions and 2 deletions

View File

@ -20,6 +20,9 @@ import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
@ -42,11 +45,27 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class DuplicateDetectionTest extends ActiveMQTestBase {
@Parameterized.Parameters(name = "persistentCache={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true}, {false}
});
}
@Parameterized.Parameter(0)
public boolean persistCache;
private final Logger log = Logger.getLogger(this.getClass());
private ActiveMQServer server;
@ -217,6 +236,7 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
// we would eventually have a higher number of caches while we couldn't have time to clear previous ones
@Test
public void testShrinkCache() throws Exception {
Assume.assumeTrue("This test would restart the server", persistCache);
server.stop();
server.getConfiguration().setIDCacheSize(150);
server.start();
@ -1454,6 +1474,7 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
@Test
public void testPersistTransactional() throws Exception {
Assume.assumeTrue("This test would restart the server", persistCache);
ClientSession session = sf.createSession(false, false, false);
session.start();
@ -1709,6 +1730,8 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
@Test
public void testPersistXA1() throws Exception {
Assume.assumeTrue("This test would restart the server", persistCache);
ClientSession session = addClientSession(sf.createSession(true, false, false));
Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
@ -1802,7 +1825,7 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
config = createDefaultInVMConfig().setIDCacheSize(cacheSize);
config = createDefaultInVMConfig().setIDCacheSize(cacheSize).setPersistIDCache(persistCache);
server = createServer(true, config);

View File

@ -22,6 +22,8 @@ import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -45,12 +47,30 @@ import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Test broker behavior when creating AMQP senders
*/
@RunWith(Parameterized.class)
public class AmqpSenderTest extends AmqpClientTestSupport {
@Parameterized.Parameters(name = "persistentCache={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true}, {false}
});
}
@Override
protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setPersistIDCache(persistCache);
}
@Parameterized.Parameter(0)
public boolean persistCache;
@Override
protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
}
@ -252,6 +272,48 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testDuplicateDetectionRollback() throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
try (Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
javax.jms.Queue producerQueue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(producerQueue);
javax.jms.Message message = session.createTextMessage("test");
message.setStringProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString(), "123");
producer.send(message);
session.rollback();
producer.send(message);
session.commit();
connection.start();
MessageConsumer consumer = session.createConsumer(producerQueue);
Assert.assertNotNull(consumer.receive(5000));
Assert.assertNull(consumer.receiveNoWait());
session.commit();
Queue serverQueue = server.locateQueue(getQueueName());
Wait.assertEquals(0, serverQueue::getMessageCount);
message = session.createTextMessage("test");
message.setStringProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString(), "123");
producer.send(message);
boolean error = false;
try {
session.commit();
} catch (Exception e) {
error = true;
}
Assert.assertTrue(error);
}
}
@Test(timeout = 60000)
public void testSenderCreditReplenishment() throws Exception {
AtomicInteger counter = new AtomicInteger();

View File

@ -17,6 +17,8 @@
package org.apache.activemq.artemis.tests.integration.cluster.bridge;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -25,6 +27,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -43,6 +46,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.HandleStatus;
@ -60,11 +64,26 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class BridgeReconnectTest extends BridgeTestBase {
@Parameterized.Parameters(name = "persistentCache={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true}, {false}
});
}
@Parameterized.Parameter(0)
public boolean persistCache;
private static final Logger log = Logger.getLogger(BridgeReconnectTest.class);
private static final int NUM_MESSAGES = 100;
@ -412,8 +431,9 @@ public class BridgeReconnectTest extends BridgeTestBase {
}
// Fail bridge and reconnect same node, no backup specified
// It will keep a send blocking as if CPU was making it creep
@Test
public void testReconnectSameNodeAfterDelivery() throws Exception {
public void testReconnectSameNodeAfterDeliveryWithBlocking() throws Exception {
server0 = createActiveMQServer(0, isNetty(), server0Params);
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
@ -530,6 +550,15 @@ public class BridgeReconnectTest extends BridgeTestBase {
closeServers();
assertNoMoreConnections();
HashMap<Integer, AtomicInteger> counts = countJournal(server1.getConfiguration());
if (persistCache) {
// There should be one record per message
Assert.assertEquals(numMessages, counts.get(new Integer(JournalRecordIds.DUPLICATE_ID)).intValue());
} else {
// no cache means there shouldn't be an id anywhere
Assert.assertNull(counts.get(new Integer(JournalRecordIds.DUPLICATE_ID)));
}
}
// We test that we can pause more than client failure check period (to prompt the pinger to failing)
@ -545,6 +574,7 @@ public class BridgeReconnectTest extends BridgeTestBase {
}
private void testShutdownServerCleanlyAndReconnectSameNode(final boolean sleep) throws Exception {
Assume.assumeTrue(persistCache);
server0 = createActiveMQServer(0, isNetty(), server0Params);
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
@ -842,4 +872,15 @@ public class BridgeReconnectTest extends BridgeTestBase {
throw new IllegalStateException("Failed to get forwarding connection");
}
@Override
protected ActiveMQServer createActiveMQServer(final int id,
final Map<String, Object> params,
final boolean netty,
final NodeManager nodeManager) throws Exception {
ActiveMQServer server = super.createActiveMQServer(id, params, netty, nodeManager);
server.getConfiguration().setPersistIDCache(persistCache);
return server;
}
}