diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 85ad922225..7cd225fa9b 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -2118,6 +2118,20 @@ public abstract class ActiveMQTestBase extends Assert { return message; } + protected ClientMessage createTextMessage(final ClientSession session, final boolean durable, final int numChars) { + ClientMessage message = session.createMessage(Message.TEXT_TYPE, + durable, + 0, + System.currentTimeMillis(), + (byte)4); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < numChars; i++) { + builder.append('a'); + } + message.getBodyBuffer().writeString(builder.toString()); + return message; + } + protected XidImpl newXID() { return new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java index 36f22eb819..3798b926c3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java @@ -16,11 +16,22 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.distribution; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.junit.Before; import org.junit.Test; +import java.util.Map; + public class TwoWayTwoNodeClusterTest extends ClusterTestBase { private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; @@ -48,6 +59,116 @@ public class TwoWayTwoNodeClusterTest extends ClusterTestBase { return false; } + /* + * This test starts 2 servers and send messages to + * a queue until it enters into paging state. Then + * it changes the max-size to -1, restarts the 2 servers + * and consumes all the messages. If verifies that + * even if the max-size has changed all the paged + * messages will be depaged and consumed. No stuck + * messages after restarting. + */ + @Test(timeout = 60000) + public void testClusterRestartWithConfigChanged() throws Exception { + Configuration config0 = servers[0].getConfiguration(); + Configuration config1 = servers[1].getConfiguration(); + + configureBeforeStart(config0, config1); + startServers(0, 1); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + createQueue(0, "queues", "queue0", null, true); + createQueue(1, "queues", "queue0", null, true); + + waitForBindings(0, "queues", 1, 0, true); + waitForBindings(1, "queues", 1, 0, true); + + waitForBindings(0, "queues", 1, 0, false); + waitForBindings(1, "queues", 1, 0, false); + + ClientSessionFactory sf0 = sfs[0]; + ClientSession session0 = sf0.createSession(false, false); + ClientProducer producer = session0.createProducer("queues"); + final int numSent = 200; + for (int i = 0; i < numSent; i++) { + ClientMessage msg = createTextMessage(session0, true, 5000); + producer.send(msg); + if (i % 50 == 0) { + session0.commit(); + } + } + session0.commit(); + session0.close(); + + while (true) { + long msgCount0 = getMessageCount(servers[0], "queues"); + long msgCount1 = getMessageCount(servers[1], "queues"); + + if (msgCount0 + msgCount1 >= numSent) { + break; + } + Thread.sleep(100); + } + + Queue queue0 = servers[0].locateQueue(new SimpleString("queue0")); + assertTrue(queue0.getPageSubscription().isPaging()); + + closeAllSessionFactories(); + stopServers(0, 1); + + AddressSettings addressSettings0 = config0.getAddressesSettings().get("#"); + AddressSettings addressSettings1 = config1.getAddressesSettings().get("#"); + + addressSettings0.setMaxSizeBytes(-1); + addressSettings1.setMaxSizeBytes(-1); + + startServers(0, 1); + + waitForBindings(0, "queues", 1, 0, true); + waitForBindings(1, "queues", 1, 0, true); + + waitForBindings(0, "queues", 1, 0, false); + waitForBindings(1, "queues", 1, 0, false); + + setupSessionFactory(0, isNetty()); + addConsumer(0, 0, "queue0", null); + + waitForBindings(0, "queues", 1, 1, true); + + for (int i = 0; i < numSent; i++) { + ClientMessage m = consumers[0].consumer.receive(5000); + assertNotNull("failed to receive message " + i, m); + } + } + + private void configureBeforeStart(Configuration... serverConfigs) { + for (Configuration config : serverConfigs) { + config.setPersistenceEnabled(true); + config.setMessageCounterEnabled(true); + config.setJournalFileSize(20971520); + config.setJournalMinFiles(20); + config.setJournalCompactPercentage(50); + + Map addressSettingsMap0 = config.getAddressesSettings(); + AddressSettings addrSettings = addressSettingsMap0.get("#"); + if (addrSettings == null) { + addrSettings = new AddressSettings(); + addressSettingsMap0.put("#", addrSettings); + } + addrSettings.setDeadLetterAddress(new SimpleString("jms.queue.DLQ")); + addrSettings.setExpiryAddress(new SimpleString("jms.queue.ExpiryQueue")); + addrSettings.setRedeliveryDelay(30); + addrSettings.setMaxDeliveryAttempts(5); + addrSettings.setMaxSizeBytes(1048576); + addrSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + addrSettings.setPageSizeBytes(524288); + addrSettings.setMessageCounterHistoryDayLimit(10); + addrSettings.setRedistributionDelay(1000); + } + } + @Test public void testStartStop() throws Exception {