diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java index 75bfc7986c..045fde9ecf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java @@ -20,6 +20,9 @@ import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.util.Arrays; +import java.util.Collection; + import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; @@ -42,11 +45,27 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.jboss.logging.Logger; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class DuplicateDetectionTest extends ActiveMQTestBase { + @Parameterized.Parameters(name = "persistentCache={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {true}, {false} + }); + } + + @Parameterized.Parameter(0) + public boolean persistCache; + + + private final Logger log = Logger.getLogger(this.getClass()); private ActiveMQServer server; @@ -217,6 +236,7 @@ public class DuplicateDetectionTest extends ActiveMQTestBase { // we would eventually have a higher number of caches while we couldn't have time to clear previous ones @Test public void testShrinkCache() throws Exception { + Assume.assumeTrue("This test would restart the server", persistCache); server.stop(); server.getConfiguration().setIDCacheSize(150); server.start(); @@ -1454,6 +1474,7 @@ public class DuplicateDetectionTest extends ActiveMQTestBase { @Test public void testPersistTransactional() throws Exception { + Assume.assumeTrue("This test would restart the server", persistCache); ClientSession session = sf.createSession(false, false, false); session.start(); @@ -1709,6 +1730,8 @@ public class DuplicateDetectionTest extends ActiveMQTestBase { @Test public void testPersistXA1() throws Exception { + Assume.assumeTrue("This test would restart the server", persistCache); + ClientSession session = addClientSession(sf.createSession(true, false, false)); Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); @@ -1802,7 +1825,7 @@ public class DuplicateDetectionTest extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); - config = createDefaultInVMConfig().setIDCacheSize(cacheSize); + config = createDefaultInVMConfig().setIDCacheSize(cacheSize).setPersistIDCache(persistCache); server = createServer(true, config); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java index f556bdf36e..62b9d4d885 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java @@ -22,6 +22,8 @@ import javax.jms.DeliveryMode; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -45,12 +47,30 @@ import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Sender; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Test broker behavior when creating AMQP senders */ +@RunWith(Parameterized.class) public class AmqpSenderTest extends AmqpClientTestSupport { + @Parameterized.Parameters(name = "persistentCache={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {true}, {false} + }); + } + + @Override + protected void addConfiguration(ActiveMQServer server) { + server.getConfiguration().setPersistIDCache(persistCache); + } + + @Parameterized.Parameter(0) + public boolean persistCache; + @Override protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { } @@ -252,6 +272,48 @@ public class AmqpSenderTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testDuplicateDetectionRollback() throws Exception { + + ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672"); + try (Connection connection = factory.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) { + javax.jms.Queue producerQueue = session.createQueue(getQueueName()); + + MessageProducer producer = session.createProducer(producerQueue); + javax.jms.Message message = session.createTextMessage("test"); + message.setStringProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString(), "123"); + producer.send(message); + session.rollback(); + + producer.send(message); + session.commit(); + + connection.start(); + + MessageConsumer consumer = session.createConsumer(producerQueue); + Assert.assertNotNull(consumer.receive(5000)); + Assert.assertNull(consumer.receiveNoWait()); + session.commit(); + + Queue serverQueue = server.locateQueue(getQueueName()); + Wait.assertEquals(0, serverQueue::getMessageCount); + + message = session.createTextMessage("test"); + message.setStringProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString(), "123"); + producer.send(message); + boolean error = false; + try { + session.commit(); + } catch (Exception e) { + error = true; + } + Assert.assertTrue(error); + + + } + } + @Test(timeout = 60000) public void testSenderCreditReplenishment() throws Exception { AtomicInteger counter = new AtomicInteger(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java index 1d9c5669cd..f7333568a5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.tests.integration.cluster.bridge; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -25,6 +27,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -43,6 +46,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.HandleStatus; @@ -60,11 +64,26 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.Wait; import org.jboss.logging.Logger; +import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class BridgeReconnectTest extends BridgeTestBase { + @Parameterized.Parameters(name = "persistentCache={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {true}, {false} + }); + } + + @Parameterized.Parameter(0) + public boolean persistCache; + private static final Logger log = Logger.getLogger(BridgeReconnectTest.class); private static final int NUM_MESSAGES = 100; @@ -412,8 +431,9 @@ public class BridgeReconnectTest extends BridgeTestBase { } // Fail bridge and reconnect same node, no backup specified + // It will keep a send blocking as if CPU was making it creep @Test - public void testReconnectSameNodeAfterDelivery() throws Exception { + public void testReconnectSameNodeAfterDeliveryWithBlocking() throws Exception { server0 = createActiveMQServer(0, isNetty(), server0Params); TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc"); @@ -530,6 +550,15 @@ public class BridgeReconnectTest extends BridgeTestBase { closeServers(); assertNoMoreConnections(); + + HashMap counts = countJournal(server1.getConfiguration()); + if (persistCache) { + // There should be one record per message + Assert.assertEquals(numMessages, counts.get(new Integer(JournalRecordIds.DUPLICATE_ID)).intValue()); + } else { + // no cache means there shouldn't be an id anywhere + Assert.assertNull(counts.get(new Integer(JournalRecordIds.DUPLICATE_ID))); + } } // We test that we can pause more than client failure check period (to prompt the pinger to failing) @@ -545,6 +574,7 @@ public class BridgeReconnectTest extends BridgeTestBase { } private void testShutdownServerCleanlyAndReconnectSameNode(final boolean sleep) throws Exception { + Assume.assumeTrue(persistCache); server0 = createActiveMQServer(0, isNetty(), server0Params); TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc"); @@ -842,4 +872,15 @@ public class BridgeReconnectTest extends BridgeTestBase { throw new IllegalStateException("Failed to get forwarding connection"); } + + @Override + protected ActiveMQServer createActiveMQServer(final int id, + final Map params, + final boolean netty, + final NodeManager nodeManager) throws Exception { + ActiveMQServer server = super.createActiveMQServer(id, params, netty, nodeManager); + server.getConfiguration().setPersistIDCache(persistCache); + return server; + } + } diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml index 7fb05df653..59b7f17922 100644 --- a/tests/smoke-tests/pom.xml +++ b/tests/smoke-tests/pom.xml @@ -848,6 +848,38 @@ ${basedir}/target/classes/servers/brokerConnect/qdr + + test-compile + create-bridge-transfer-A + + create + + + amq + artemis + artemis + true + true + ${basedir}/target/bridgeTransfer/serverA + ${basedir}/target/classes/servers/bridgeTransfer/serverA + + + + test-compile + create-bridge-transfer-B + + create + + + amq + artemis + artemis + true + true + ${basedir}/target/bridgeTransfer/serverB + ${basedir}/target/classes/servers/bridgeTransfer/serverB + + diff --git a/tests/smoke-tests/src/main/resources/servers/bridgeTransfer/serverA/broker.xml b/tests/smoke-tests/src/main/resources/servers/bridgeTransfer/serverA/broker.xml new file mode 100644 index 0000000000..b43b5b57a8 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/bridgeTransfer/serverA/broker.xml @@ -0,0 +1,247 @@ + + + + + + + + 0.0.0.0 + + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + + + true + + 2 + + 10 + + 4096 + + 10M + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + true + + 120000 + + 60000 + + HALT + + + + + + + + + + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true + + + + + tcp://localhost:61617 + + + + + + bridgeQueue + 100 + -1 + + other-side + + + + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + +
+ + + +
+
+ + + +
+ +
+ + + +
+ +
+ + + + +
+
diff --git a/tests/smoke-tests/src/main/resources/servers/bridgeTransfer/serverB/broker.xml b/tests/smoke-tests/src/main/resources/servers/bridgeTransfer/serverB/broker.xml new file mode 100644 index 0000000000..8920f419f7 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/bridgeTransfer/serverB/broker.xml @@ -0,0 +1,229 @@ + + + + + + + + 0.0.0.0 + + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + + + true + + 2 + + 10 + + 4096 + + 10M + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + true + + 120000 + + 60000 + + HALT + + + + + + + + + + + + + + + + + + tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + +
+ + + +
+
+ + + +
+ +
+ + + +
+ +
+ + + + +
+
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/bridgeTransfer/BridgeTransferingTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/bridgeTransfer/BridgeTransferingTest.java new file mode 100644 index 0000000000..5b58857b8e --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/bridgeTransfer/BridgeTransferingTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.bridgeTransfer; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.jboss.logging.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class BridgeTransferingTest extends SmokeTestBase { + + public static final String SERVER_NAME_0 = "bridgeTransfer/serverA"; + public static final String SERVER_NAME_1 = "bridgeTransfer/serverB"; + private static final Logger logger = Logger.getLogger(BridgeTransferingTest.class); + private static final String JMX_SERVER_HOSTNAME = "localhost"; + private static final int JMX_SERVER_PORT = 11099; + + final String theprotocol; + // As the produces sends messages, a client will be killed every X messages. This is it! + final int killServerInterval; + final int numberOfMessages; + final int commitInterval; + final int messageSize; + final boolean killBothServers; + final int minlargeMessageSize; + Process serverProcess; + Process serverProcess2; + + public BridgeTransferingTest(String protocol, int commitInterval, int killServerInterval, int numberOfMessages, int messageSize, int minlargeMessageSize, boolean killBothServers) { + this.theprotocol = protocol; + this.killServerInterval = killServerInterval; + this.messageSize = messageSize; + this.commitInterval = commitInterval; + this.numberOfMessages = numberOfMessages; + this.killBothServers = killBothServers; + this.minlargeMessageSize = minlargeMessageSize; + } + + @Parameterized.Parameters(name = "protocol={0}, commitInterval={1}, killInterval={2}, numberOfMessages={3}, messageSize={4}, minLargeMessageSize={5}, KillBothServers={6}") + public static Collection parameters() { + return Arrays.asList(new Object[][]{{"CORE", 200, 1000, 10000, 15_000, 5000, true}, {"CORE", 200, 1000, 10000, 15_000, 5000, false}}); + } + + public static ConnectionFactory createConnectionFactory(String protocol, String uri) { + if (protocol.toUpperCase().equals("OPENWIRE")) { + return new org.apache.activemq.ActiveMQConnectionFactory(uri); + } else if (protocol.toUpperCase().equals("AMQP")) { + + if (uri.startsWith("tcp://")) { + // replacing tcp:// by amqp:// + uri = "amqp" + uri.substring(3); + } + return new JmsConnectionFactory(uri); + } else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) { + return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri); + } else { + throw new IllegalStateException("Unkown:" + protocol); + } + } + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + cleanupData(SERVER_NAME_1); + disableCheckThread(); + serverProcess = startServer(SERVER_NAME_0, 0, 30000); + serverProcess2 = startServer(SERVER_NAME_1, 1, 30000); + } + + @After + public void stopServers() throws Exception { + serverProcess2.destroyForcibly(); + serverProcess.destroyForcibly(); + } + + @Test + public void testTransfer() throws Exception { + ConnectionFactory cf = createConnectionFactory(theprotocol, "tcp://localhost:61616"); + ((ActiveMQConnectionFactory) cf).setMinLargeMessageSize(minlargeMessageSize); + + String body; + + { + StringBuffer buffer = new StringBuffer(); + while (buffer.length() < messageSize) { + buffer.append(" "); + } + body = buffer.toString(); + } + + { + Connection connection = cf.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("bridgeQueue"); + MessageProducer producer = session.createProducer(queue); + + int txElement = 0; + int killElement = 0; + + for (int i = 0; i < numberOfMessages; i++) { + producer.send(session.createTextMessage(body + " " + i)); + + if (++txElement == commitInterval) { + logger.debug("Sent " + (i + 1) + " messages"); + txElement = 0; + session.commit(); + } + + if (++killElement == killServerInterval) { + logger.debug("Killing server at " + (i + 1)); + killElement = 0; + if (killBothServers) { + serverProcess.destroyForcibly(); + Wait.assertFalse(serverProcess::isAlive); + } + serverProcess2.destroyForcibly(); + Wait.assertFalse(serverProcess2::isAlive); + serverProcess2 = startServer(SERVER_NAME_1, 1, 30000); + if (killBothServers) { + serverProcess = startServer(SERVER_NAME_0, 0, 30000); + } + if (killBothServers) { + connection.close(); + connection = cf.createConnection(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + queue = session.createQueue("bridgeQueue"); + producer = session.createProducer(queue); + } + } + } + + if (txElement > 0) { + session.commit(); + } + } + ConnectionFactory cf2 = createConnectionFactory(theprotocol, "tcp://localhost:61617"); + try (Connection connection = cf2.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("bridgeQueue"); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + for (int i = 0; i < numberOfMessages; i++) { + if (i % 100 == 0) { + logger.debug("consuming " + i); + } + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(body + " " + i, message.getText()); + } + + Assert.assertNull(consumer.receiveNoWait()); + } + + } + +}