ARTEMIS-3940 Address.maxSizeMessage is ignored on FAIL and Blocking Paging Policy

This commit is contained in:
Clebert Suconic 2022-08-15 12:42:37 -04:00 committed by clebertsuconic
parent 2fbf2fcef0
commit 520088b8c6
2 changed files with 340 additions and 7 deletions

View File

@ -160,7 +160,7 @@ public class PagingStoreImpl implements PagingStore {
this.storeName = storeName;
this.size = new SizeAwareMetric(maxSize, maxSize, -1, -1).
this.size = new SizeAwareMetric(maxSize, maxSize, maxMessages, maxMessages).
setUnderCallback(this::underSized).setOverCallback(this::overSized).
setOnSizeCallback(pagingManager::addSize);
@ -865,15 +865,15 @@ public class PagingStoreImpl implements PagingStore {
return false;
}
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || maxMessages != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
if (isFull()) {
if (runOnFailure && runWhenAvailable != null) {
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
}
return false;
}
} else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
if (pagingManager.isDiskFull() || maxSize > 0 && this.full || pagingManager.isGlobalFull()) {
} else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxMessages != -1 || maxSize != -1 || usingGlobalMaxSize)) {
if (pagingManager.isDiskFull() || this.full || pagingManager.isGlobalFull()) {
if (runWhenBlocking != null) {
runWhenBlocking.run();
}
@ -885,7 +885,7 @@ public class PagingStoreImpl implements PagingStore {
// has been added, but the check to execute was done before the element was added
// NOTE! We do not fix this race by locking the whole thing, doing this check provides
// MUCH better performance in a highly concurrent environment
if (!pagingManager.isGlobalFull() && (!full || maxSize < 0)) {
if (!pagingManager.isGlobalFull() && !full) {
// run it now
atomicRunWhenAvailable.run();
} else {
@ -942,7 +942,7 @@ public class PagingStoreImpl implements PagingStore {
@Override
public boolean checkReleasedMemory() {
if (!blockedViaAddressControl && !pagingManager.isGlobalFull() && (!full || maxSize < 0)) {
if (!blockedViaAddressControl && !pagingManager.isGlobalFull() && !full) {
if (!onMemoryFreedRunnables.isEmpty()) {
executor.execute(this::memoryReleased);
if (blocking) {
@ -1308,7 +1308,7 @@ public class PagingStoreImpl implements PagingStore {
// To be used on isDropMessagesWhenFull
@Override
public boolean isFull() {
return maxSize > 0 && getAddressSize() >= maxSize || pagingManager.isGlobalFull();
return full || pagingManager.isGlobalFull();
}

View File

@ -16,7 +16,21 @@
*/
package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@ -28,16 +42,22 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
public class MaxMessagesPagingTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(MaxMessagesPagingTest.class);
protected static final int PAGE_MAX = 100 * 1024;
protected static final int PAGE_SIZE = 10 * 1024;
protected ActiveMQServer server;
@ -301,4 +321,317 @@ public class MaxMessagesPagingTest extends ActiveMQTestBase {
}
}
@Test
public void testFailMaxMessage() throws Exception {
internalFailMaxMessge(false);
}
@Test
public void testFailMaxMessageGlobal() throws Exception {
internalFailMaxMessge(true);
}
private void internalFailMaxMessge(boolean global) throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultConfig(true);
if (global) {
config.setGlobalMaxMessages(10);
}
server = createServer(true, config, 1024, 5 * 1024, new HashMap<>());
server.start();
internalFailMaxMessages("CORE", server, global);
internalFailMaxMessages("AMQP", server, global);
internalFailMaxMessages("OPENWIRE", server, global);
}
private void internalFailMaxMessages(String protocol, ActiveMQServer server, boolean global) throws Exception {
final String ADDRESS = "FAIL_MAX_MESSAGES_" + protocol;
final int MESSAGE_COUNT = 10;
AddressSettings set = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
if (global) {
set.setMaxSizeBytes(-1).setMaxSizeMessages(-1);
} else {
set.setMaxSizeBytes(-1).setMaxSizeMessages(MESSAGE_COUNT);
}
server.getAddressSettingsRepository().addMatch(ADDRESS, set);
server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
Connection conn = factory.createConnection();
runAfter(conn::close); // not using closeable because OPENWIRE might not support it depending on the version
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
Queue queue = server.locateQueue(ADDRESS);
AssertionLoggerHandler.startCapture();
runAfter(() -> AssertionLoggerHandler.stopCapture());
for (int repeat = 0; repeat < 5; repeat++) {
boolean durable = repeat % 2 == 0;
MessageProducer producer = session.createProducer(session.createQueue(ADDRESS));
// Mixing persistent and non persistent just to challenge counters a bit more
// in case there's a different counter for persistent and non persistent on the server's impl
producer.setDeliveryMode(durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < MESSAGE_COUNT; i++) {
producer.send(session.createTextMessage("OK"));
}
Wait.assertEquals(MESSAGE_COUNT, queue::getMessageCount);
AssertionLoggerHandler.clear();
try {
producer.send(session.createTextMessage("should fail"));
if (durable) {
Assert.fail(("supposed to fail"));
} else {
// in case of async send, the exception will not propagate to the client, and we should still check the logger on that case
Wait.assertTrue(() -> AssertionLoggerHandler.findText("is full")); // my intention was to assert for "AMQ229102" howerver openwire is not using the code here
}
} catch (Exception expected) {
}
MessageConsumer consumer = session.createConsumer(session.createQueue(ADDRESS));
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("OK", message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
consumer.close();
producer.close();
}
conn.close();
}
@Test
public void testBlockMaxMessage() throws Exception {
internalBlockMaxMessge(false);
}
@Test
public void testBlockMaxMessageGlobal() throws Exception {
internalBlockMaxMessge(true);
}
private void internalBlockMaxMessge(boolean global) throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultConfig(true);
if (global) {
config.setGlobalMaxMessages(10);
}
server = createServer(true, config, 1024, 5 * 1024, new HashMap<>());
server.start();
internalBlockMaxMessages("AMQP", "CORE", server, global);
internalBlockMaxMessages("AMQP", "OPENWIRE", server, global);
internalBlockMaxMessages("AMQP", "AMQP", server, global);
internalBlockMaxMessages("CORE", "CORE", server, global);
internalBlockMaxMessages("CORE", "AMQP", server, global);
internalBlockMaxMessages("CORE", "OPENWIRE", server, global);
internalBlockMaxMessages("OPENWIRE", "OPENWIRE", server, global);
internalBlockMaxMessages("OPENWIRE", "AMQP", server, global);
internalBlockMaxMessages("OPENWIRE", "CORE", server, global);
}
private void internalBlockMaxMessages(String protocolSend, String protocolReceive, ActiveMQServer server, boolean global) throws Exception {
final int MESSAGES = 1000;
logger.info("\n********************************************************************************\nSending " + protocolSend + ", Receiving " + protocolReceive +
"\n********************************************************************************");
final String ADDRESS = "FAIL_MAX_MESSAGES_" + protocolSend + "_" + protocolReceive;
AddressSettings set = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
if (global) {
set.setMaxSizeBytes(-1).setMaxSizeMessages(-1);
} else {
set.setMaxSizeBytes(-1).setMaxSizeMessages(10);
}
server.getAddressSettingsRepository().addMatch(ADDRESS, set);
server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
ConnectionFactory factorySend = CFUtil.createConnectionFactory(protocolSend, "tcp://localhost:61616");
Connection connSend = factorySend.createConnection();
ConnectionFactory factoryReceive = CFUtil.createConnectionFactory(protocolReceive, "tcp://localhost:61616");
Connection connReceive = factoryReceive.createConnection();
connReceive.start();
runAfter(connSend::close); // not using closeable because OPENWIRE might not support it depending on the version
runAfter(connReceive::close); // not using closeable because OPENWIRE might not support it depending on the version
AssertionLoggerHandler.startCapture();
runAfter(() -> AssertionLoggerHandler.stopCapture());
ExecutorService executorService = Executors.newSingleThreadExecutor();
runAfter(executorService::shutdownNow);
CountDownLatch done = new CountDownLatch(1);
executorService.execute(() -> {
try {
Session session = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(ADDRESS));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < MESSAGES; i++) {
producer.send(session.createTextMessage("OK!" + i));
}
session.close();
} catch (Exception e) {
e.printStackTrace();
}
done.countDown();
});
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ222183"), 5000, 10); //unblock
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ221046")); // should not been unblocked
AssertionLoggerHandler.clear();
Assert.assertFalse(done.await(100, TimeUnit.MILLISECONDS));
Session sessionReceive = connReceive.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = sessionReceive.createConsumer(sessionReceive.createQueue(ADDRESS));
for (int i = 0; i < MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("OK!" + i, message.getText());
}
sessionReceive.close();
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ221046"), 5000, 10); // unblock
}
@Test
public void testDropMaxMessage() throws Exception {
internalDropMaxMessge(false);
}
@Test
public void testDropMaxMessageGlobal() throws Exception {
internalDropMaxMessge(true);
}
private void internalDropMaxMessge(boolean global) throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultConfig(true);
if (global) {
config.setGlobalMaxMessages(10);
}
server = createServer(true, config, 1024, 5 * 1024, new HashMap<>());
server.start();
internalDropMaxMessages("AMQP", "CORE", server, global);
internalDropMaxMessages("AMQP", "OPENWIRE", server, global);
internalDropMaxMessages("AMQP", "AMQP", server, global);
internalDropMaxMessages("CORE", "CORE", server, global);
internalDropMaxMessages("CORE", "AMQP", server, global);
internalDropMaxMessages("CORE", "OPENWIRE", server, global);
internalDropMaxMessages("OPENWIRE", "OPENWIRE", server, global);
internalDropMaxMessages("OPENWIRE", "AMQP", server, global);
internalDropMaxMessages("OPENWIRE", "CORE", server, global);
}
private void internalDropMaxMessages(String protocolSend, String protocolReceive, ActiveMQServer server, boolean global) throws Exception {
final int MESSAGES = 20;
logger.info("\n********************************************************************************\nSending " + protocolSend + ", Receiving " + protocolReceive +
"\n********************************************************************************");
final String ADDRESS = "FAIL_MAX_MESSAGES_" + protocolSend + "_" + protocolReceive;
AddressSettings set = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
if (global) {
set.setMaxSizeBytes(-1).setMaxSizeMessages(-1);
} else {
set.setMaxSizeBytes(-1).setMaxSizeMessages(10);
}
server.getAddressSettingsRepository().addMatch(ADDRESS, set);
server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
ConnectionFactory factorySend = CFUtil.createConnectionFactory(protocolSend, "tcp://localhost:61616");
Connection connSend = factorySend.createConnection();
ConnectionFactory factoryReceive = CFUtil.createConnectionFactory(protocolReceive, "tcp://localhost:61616");
Connection connReceive = factoryReceive.createConnection();
connReceive.start();
runAfter(connSend::close); // not using closeable because OPENWIRE might not support it depending on the version
runAfter(connReceive::close); // not using closeable because OPENWIRE might not support it depending on the version
AssertionLoggerHandler.startCapture();
runAfter(() -> AssertionLoggerHandler.stopCapture());
for (int repeat = 0; repeat < 5; repeat++) {
AssertionLoggerHandler.clear();
{
Session session = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(ADDRESS));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < MESSAGES; i++) {
producer.send(session.createTextMessage("OK!" + i));
}
session.close();
}
if (repeat == 0) {
// the server will only log it on the first repeat as expected
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ222039")); // dropped messages
}
{
Session sessionReceive = connReceive.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = sessionReceive.createConsumer(sessionReceive.createQueue(ADDRESS));
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
}
Assert.assertNull(consumer.receiveNoWait());
sessionReceive.close();
}
}
}
}