diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 6bae2a9a03..ecc48e33d6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -160,7 +160,7 @@ public class PagingStoreImpl implements PagingStore { this.storeName = storeName; - this.size = new SizeAwareMetric(maxSize, maxSize, -1, -1). + this.size = new SizeAwareMetric(maxSize, maxSize, maxMessages, maxMessages). setUnderCallback(this::underSized).setOverCallback(this::overSized). setOnSizeCallback(pagingManager::addSize); @@ -865,15 +865,15 @@ public class PagingStoreImpl implements PagingStore { return false; } - if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) { + if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || maxMessages != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) { if (isFull()) { if (runOnFailure && runWhenAvailable != null) { onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable)); } return false; } - } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) { - if (pagingManager.isDiskFull() || maxSize > 0 && this.full || pagingManager.isGlobalFull()) { + } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxMessages != -1 || maxSize != -1 || usingGlobalMaxSize)) { + if (pagingManager.isDiskFull() || this.full || pagingManager.isGlobalFull()) { if (runWhenBlocking != null) { runWhenBlocking.run(); } @@ -885,7 +885,7 @@ public class PagingStoreImpl implements PagingStore { // has been added, but the check to execute was done before the element was added // NOTE! We do not fix this race by locking the whole thing, doing this check provides // MUCH better performance in a highly concurrent environment - if (!pagingManager.isGlobalFull() && (!full || maxSize < 0)) { + if (!pagingManager.isGlobalFull() && !full) { // run it now atomicRunWhenAvailable.run(); } else { @@ -942,7 +942,7 @@ public class PagingStoreImpl implements PagingStore { @Override public boolean checkReleasedMemory() { - if (!blockedViaAddressControl && !pagingManager.isGlobalFull() && (!full || maxSize < 0)) { + if (!blockedViaAddressControl && !pagingManager.isGlobalFull() && !full) { if (!onMemoryFreedRunnables.isEmpty()) { executor.execute(this::memoryReleased); if (blocking) { @@ -1308,7 +1308,7 @@ public class PagingStoreImpl implements PagingStore { // To be used on isDropMessagesWhenFull @Override public boolean isFull() { - return maxSize > 0 && getAddressSize() >= maxSize || pagingManager.isGlobalFull(); + return full || pagingManager.isGlobalFull(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java index 25bbffc79b..816bfc0876 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java @@ -16,7 +16,21 @@ */ package org.apache.activemq.artemis.tests.integration.paging; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -28,16 +42,22 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.SizeAwareMetric; +import org.jboss.logging.Logger; import org.junit.Assert; import org.junit.Test; public class MaxMessagesPagingTest extends ActiveMQTestBase { + private static final Logger logger = Logger.getLogger(MaxMessagesPagingTest.class); + protected static final int PAGE_MAX = 100 * 1024; protected static final int PAGE_SIZE = 10 * 1024; protected ActiveMQServer server; @@ -301,4 +321,317 @@ public class MaxMessagesPagingTest extends ActiveMQTestBase { } } + @Test + public void testFailMaxMessage() throws Exception { + internalFailMaxMessge(false); + } + + @Test + public void testFailMaxMessageGlobal() throws Exception { + internalFailMaxMessge(true); + } + + private void internalFailMaxMessge(boolean global) throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultConfig(true); + + if (global) { + config.setGlobalMaxMessages(10); + } + + server = createServer(true, config, 1024, 5 * 1024, new HashMap<>()); + + server.start(); + + internalFailMaxMessages("CORE", server, global); + internalFailMaxMessages("AMQP", server, global); + internalFailMaxMessages("OPENWIRE", server, global); + } + + private void internalFailMaxMessages(String protocol, ActiveMQServer server, boolean global) throws Exception { + + final String ADDRESS = "FAIL_MAX_MESSAGES_" + protocol; + final int MESSAGE_COUNT = 10; + + AddressSettings set = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); + if (global) { + set.setMaxSizeBytes(-1).setMaxSizeMessages(-1); + } else { + set.setMaxSizeBytes(-1).setMaxSizeMessages(MESSAGE_COUNT); + } + + server.getAddressSettingsRepository().addMatch(ADDRESS, set); + + + server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST)); + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + + Connection conn = factory.createConnection(); + + runAfter(conn::close); // not using closeable because OPENWIRE might not support it depending on the version + + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + + Queue queue = server.locateQueue(ADDRESS); + + AssertionLoggerHandler.startCapture(); + runAfter(() -> AssertionLoggerHandler.stopCapture()); + + for (int repeat = 0; repeat < 5; repeat++) { + boolean durable = repeat % 2 == 0; + + MessageProducer producer = session.createProducer(session.createQueue(ADDRESS)); + + // Mixing persistent and non persistent just to challenge counters a bit more + // in case there's a different counter for persistent and non persistent on the server's impl + producer.setDeliveryMode(durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + for (int i = 0; i < MESSAGE_COUNT; i++) { + producer.send(session.createTextMessage("OK")); + } + + Wait.assertEquals(MESSAGE_COUNT, queue::getMessageCount); + + AssertionLoggerHandler.clear(); + + try { + producer.send(session.createTextMessage("should fail")); + if (durable) { + Assert.fail(("supposed to fail")); + } else { + // in case of async send, the exception will not propagate to the client, and we should still check the logger on that case + Wait.assertTrue(() -> AssertionLoggerHandler.findText("is full")); // my intention was to assert for "AMQ229102" howerver openwire is not using the code here + } + } catch (Exception expected) { + } + + MessageConsumer consumer = session.createConsumer(session.createQueue(ADDRESS)); + + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals("OK", message.getText()); + } + + Assert.assertNull(consumer.receiveNoWait()); + + consumer.close(); + producer.close(); + } + + conn.close(); + } + + + @Test + public void testBlockMaxMessage() throws Exception { + internalBlockMaxMessge(false); + } + + @Test + public void testBlockMaxMessageGlobal() throws Exception { + internalBlockMaxMessge(true); + } + + private void internalBlockMaxMessge(boolean global) throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultConfig(true); + + if (global) { + config.setGlobalMaxMessages(10); + } + + server = createServer(true, config, 1024, 5 * 1024, new HashMap<>()); + + server.start(); + + internalBlockMaxMessages("AMQP", "CORE", server, global); + internalBlockMaxMessages("AMQP", "OPENWIRE", server, global); + internalBlockMaxMessages("AMQP", "AMQP", server, global); + internalBlockMaxMessages("CORE", "CORE", server, global); + internalBlockMaxMessages("CORE", "AMQP", server, global); + internalBlockMaxMessages("CORE", "OPENWIRE", server, global); + internalBlockMaxMessages("OPENWIRE", "OPENWIRE", server, global); + internalBlockMaxMessages("OPENWIRE", "AMQP", server, global); + internalBlockMaxMessages("OPENWIRE", "CORE", server, global); + } + + private void internalBlockMaxMessages(String protocolSend, String protocolReceive, ActiveMQServer server, boolean global) throws Exception { + + final int MESSAGES = 1000; + + logger.info("\n********************************************************************************\nSending " + protocolSend + ", Receiving " + protocolReceive + + "\n********************************************************************************"); + + final String ADDRESS = "FAIL_MAX_MESSAGES_" + protocolSend + "_" + protocolReceive; + + AddressSettings set = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + if (global) { + set.setMaxSizeBytes(-1).setMaxSizeMessages(-1); + } else { + set.setMaxSizeBytes(-1).setMaxSizeMessages(10); + } + + server.getAddressSettingsRepository().addMatch(ADDRESS, set); + + server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST)); + + ConnectionFactory factorySend = CFUtil.createConnectionFactory(protocolSend, "tcp://localhost:61616"); + Connection connSend = factorySend.createConnection(); + + ConnectionFactory factoryReceive = CFUtil.createConnectionFactory(protocolReceive, "tcp://localhost:61616"); + Connection connReceive = factoryReceive.createConnection(); + connReceive.start(); + + runAfter(connSend::close); // not using closeable because OPENWIRE might not support it depending on the version + runAfter(connReceive::close); // not using closeable because OPENWIRE might not support it depending on the version + + AssertionLoggerHandler.startCapture(); + runAfter(() -> AssertionLoggerHandler.stopCapture()); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + runAfter(executorService::shutdownNow); + + CountDownLatch done = new CountDownLatch(1); + executorService.execute(() -> { + try { + Session session = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(ADDRESS)); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < MESSAGES; i++) { + producer.send(session.createTextMessage("OK!" + i)); + } + session.close(); + } catch (Exception e) { + e.printStackTrace(); + } + + done.countDown(); + }); + + Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ222183"), 5000, 10); //unblock + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ221046")); // should not been unblocked + + AssertionLoggerHandler.clear(); + + Assert.assertFalse(done.await(100, TimeUnit.MILLISECONDS)); + + Session sessionReceive = connReceive.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = sessionReceive.createConsumer(sessionReceive.createQueue(ADDRESS)); + for (int i = 0; i < MESSAGES; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("OK!" + i, message.getText()); + } + sessionReceive.close(); + + Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ221046"), 5000, 10); // unblock + } + + + + @Test + public void testDropMaxMessage() throws Exception { + internalDropMaxMessge(false); + } + + @Test + public void testDropMaxMessageGlobal() throws Exception { + internalDropMaxMessge(true); + } + + private void internalDropMaxMessge(boolean global) throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultConfig(true); + + if (global) { + config.setGlobalMaxMessages(10); + } + + server = createServer(true, config, 1024, 5 * 1024, new HashMap<>()); + + server.start(); + + internalDropMaxMessages("AMQP", "CORE", server, global); + internalDropMaxMessages("AMQP", "OPENWIRE", server, global); + internalDropMaxMessages("AMQP", "AMQP", server, global); + internalDropMaxMessages("CORE", "CORE", server, global); + internalDropMaxMessages("CORE", "AMQP", server, global); + internalDropMaxMessages("CORE", "OPENWIRE", server, global); + internalDropMaxMessages("OPENWIRE", "OPENWIRE", server, global); + internalDropMaxMessages("OPENWIRE", "AMQP", server, global); + internalDropMaxMessages("OPENWIRE", "CORE", server, global); + } + + private void internalDropMaxMessages(String protocolSend, String protocolReceive, ActiveMQServer server, boolean global) throws Exception { + + final int MESSAGES = 20; + + logger.info("\n********************************************************************************\nSending " + protocolSend + ", Receiving " + protocolReceive + + "\n********************************************************************************"); + + final String ADDRESS = "FAIL_MAX_MESSAGES_" + protocolSend + "_" + protocolReceive; + + AddressSettings set = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP); + if (global) { + set.setMaxSizeBytes(-1).setMaxSizeMessages(-1); + } else { + set.setMaxSizeBytes(-1).setMaxSizeMessages(10); + } + + server.getAddressSettingsRepository().addMatch(ADDRESS, set); + + server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST)); + + ConnectionFactory factorySend = CFUtil.createConnectionFactory(protocolSend, "tcp://localhost:61616"); + Connection connSend = factorySend.createConnection(); + + ConnectionFactory factoryReceive = CFUtil.createConnectionFactory(protocolReceive, "tcp://localhost:61616"); + Connection connReceive = factoryReceive.createConnection(); + connReceive.start(); + + runAfter(connSend::close); // not using closeable because OPENWIRE might not support it depending on the version + runAfter(connReceive::close); // not using closeable because OPENWIRE might not support it depending on the version + + AssertionLoggerHandler.startCapture(); + runAfter(() -> AssertionLoggerHandler.stopCapture()); + + for (int repeat = 0; repeat < 5; repeat++) { + AssertionLoggerHandler.clear(); + { + Session session = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(ADDRESS)); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < MESSAGES; i++) { + producer.send(session.createTextMessage("OK!" + i)); + } + session.close(); + } + + if (repeat == 0) { + // the server will only log it on the first repeat as expected + Assert.assertTrue(AssertionLoggerHandler.findText("AMQ222039")); // dropped messages + } + + { + Session sessionReceive = connReceive.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = sessionReceive.createConsumer(sessionReceive.createQueue(ADDRESS)); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + } + Assert.assertNull(consumer.receiveNoWait()); + sessionReceive.close(); + } + } + + } + } \ No newline at end of file