ARTEMIS-4285 Limit number of redelivery records

This commit is contained in:
Clebert Suconic 2023-05-18 13:27:13 -04:00 committed by clebertsuconic
parent 47d6dfaad2
commit e719622de5
14 changed files with 257 additions and 2 deletions

View File

@ -31,6 +31,10 @@ under the License.
${jdbc}
<persistence-enabled>${persistence-enabled}</persistence-enabled>
<!-- It is recommended to keep this value as 1, maximizing the number of records stored about redeliveries.
However if you must preserve state of individual redeliveries, you may increase this value or set it to -1 (infinite). -->
<max-redelivery-records>1</max-redelivery-records>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files

View File

@ -248,6 +248,9 @@ public final class ActiveMQDefaultConfiguration {
// True means that the delivery count is persisted before delivery. False means that this only happens after a message has been cancelled.
private static boolean DEFAULT_PERSIST_DELIVERY_COUNT_BEFORE_DELIVERY = false;
// Default Maximum number of records we would store for redeliveries
private static int DEFAULT_MAX_REDELIVERY_RECORDS = 10;
// the directory to store paged messages in
private static String DEFAULT_PAGING_DIR = "data/paging";
@ -685,6 +688,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_PERSISTENCE_ENABLED;
}
public static int getDefaultMaxRedeliveryRecords() {
return DEFAULT_MAX_REDELIVERY_RECORDS;
}
public static boolean isDefaultJournalDatasync() {
return DEFAULT_JOURNAL_DATASYNC;
}

View File

@ -138,6 +138,13 @@ public interface Configuration {
*/
Configuration setPersistenceEnabled(boolean enable);
/**
* Maximum number of redelivery records stored on the journal per message reference.
*/
Configuration setMaxRedeliveryRecords(int maxPersistRedelivery);
int getMaxRedeliveryRecords();
/**
* Should use fdatasync on journal files.
*

View File

@ -148,6 +148,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private boolean persistenceEnabled = ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled();
private int maxRedeliveryRecords = ActiveMQDefaultConfiguration.getDefaultMaxRedeliveryRecords();
private boolean journalDatasync = ActiveMQDefaultConfiguration.isDefaultJournalDatasync();
protected long fileDeploymentScanPeriod = ActiveMQDefaultConfiguration.getDefaultFileDeployerScanPeriod();
@ -928,6 +930,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public Configuration setMaxRedeliveryRecords(int max) {
maxRedeliveryRecords = max;
return this;
}
@Override
public int getMaxRedeliveryRecords() {
return maxRedeliveryRecords;
}
@Override
public boolean isJournalDatasync() {
return journalDatasync;

View File

@ -397,6 +397,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setPersistDeliveryCountBeforeDelivery(getBoolean(e, "persist-delivery-count-before-delivery", config.isPersistDeliveryCountBeforeDelivery()));
config.setMaxRedeliveryRecords(getInteger(e, "max-redelivery-records", config.getMaxRedeliveryRecords(), Validators.MINUS_ONE_OR_GE_ZERO));
config.setScheduledThreadPoolMaxSize(getInteger(e, "scheduled-thread-pool-max-size", config.getScheduledThreadPoolMaxSize(), Validators.GT_ZERO));
config.setThreadPoolMaxSize(getInteger(e, "thread-pool-max-size", config.getThreadPoolMaxSize(), Validators.MINUS_ONE_OR_GT_ZERO));

View File

@ -475,6 +475,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
if (config.getMaxRedeliveryRecords() >= 0 && ref.getDeliveryCount() > config.getMaxRedeliveryRecords()) {
return;
}
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID());
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, true, this::recordNotFoundCallback, getContext(syncNonTransactional));
@ -706,6 +709,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return;
}
if (config.getMaxRedeliveryRecords() >= 0 && ref.getDeliveryCount() > config.getMaxRedeliveryRecords()) {
return;
}
ref.setPersistedCount(ref.getDeliveryCount());
DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());

View File

@ -24,7 +24,7 @@ public class ScheduledDeliveryEncoding extends QueueEncoding {
@Override
public String toString() {
return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "]";
return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + ", queueID=" + queueID + "]";
}
public ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID) {

View File

@ -412,6 +412,17 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="max-redelivery-records" type="xsd:long" default="10" maxOccurs="1"
minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The default for this value is 10, however the recommended set for this is 1.
The system will add a store update for every redelivery happening on the system.
It is recommended to keep max-redelivery-records=1 in situations where you are operating with very short redelivery delays as you will be creating unecessary records on the journal.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="populate-validated-user" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -535,6 +535,22 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
Assert.assertEquals(1000, configuration.getGlobalMaxMessages());
}
@Test
public void testConfigurationPersistRedelivery() throws Exception {
StringPrintStream stringPrintStream = new StringPrintStream();
PrintStream stream = stringPrintStream.newStream();
stream.println("<configuration><core>");
stream.println("<max-redelivery-records>0</max-redelivery-records>");
stream.println("</core></configuration>");
ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes());
FileConfigurationParser parser = new FileConfigurationParser();
Configuration configuration = parser.parseMainConfig(inputStream);
Assert.assertEquals(0, configuration.getMaxRedeliveryRecords());
}
@Test
public void testExceptionMaxSize() throws Exception {
StringPrintStream stringPrintStream = new StringPrintStream();

View File

@ -919,7 +919,7 @@ public abstract class ActiveMQTestBase extends Assert {
return "memory:" + getTestDir();
}
private String getTestJDBCConnectionUrl() {
protected final String getTestJDBCConnectionUrl() {
return System.getProperty("jdbc.connection.url", "jdbc:derby:" + getEmbeddedDataBaseName() + ";create=true");
}

View File

@ -77,6 +77,7 @@
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor2</class-name>
</remoting-outgoing-interceptors>
<persist-delivery-count-before-delivery>true</persist-delivery-count-before-delivery>
<max-redelivery-records>7</max-redelivery-records>
<connectors>
<connector name="connector1">
tcp://localhost1:5678?

View File

@ -210,6 +210,7 @@ name | node name; used in topology notifications if set. | n/a
[read-whole-page](paging.md) | If true the whole page would be read, otherwise just seek and read while getting message. | `false`
[paging-directory](paging.md#configuration)| the directory to store paged messages in. | `data/paging`
[persist-delivery-count-before-delivery](undelivered-messages.md#delivery-count-persistence) | True means that the delivery count is persisted before delivery. False means that this only happens after a message has been cancelled. | `false`
[max-redelivery-records](undelivered-messages.md#persist-redelivery) | Maximum number of records the system will store for redeliveries. In most cases this should be set to '1'. | `10`
[persistence-enabled](persistence.md#zero-persistence)| true means that the server will use the file based journal for persistence. | `true`
[persist-id-cache](duplicate-detection.md#configuring-the-duplicate-id-cache) | true means that ID's are persisted to the journal. | `true`
queues | **deprecated** [use addresses](#address-type) | n/a

View File

@ -31,6 +31,12 @@ fail or rollback. Without a delayed redelivery, the system can get into a
and delivery being re-attempted ad infinitum in quick succession,
consuming valuable CPU and network resources.
#Persist Redelivery
Two Journal update records are stored every time a redelivery happens. One for the number of deliveries that happened, and one in case a scheduled redelivery is being used.
It is recommended to keep max-redelivery-records=1 in situations where you are operating with very short redelivery delays as you will be creating unecessary records on the journal.
### Configuring Delayed Redelivery
Delayed redelivery is defined in the address-setting configuration:

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RescheduleJDBCDeliveryTest extends ActiveMQTestBase {
// Set this to true if you're debugging what happened in the journal
private final boolean PRINT_DATA = false;
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Test
public void testRescheduledRedeliveryCORE() throws Exception {
testRescheduledRedelivery("CORE", 0);
}
@Test
public void testRescheduledRedeliveryCORE_1() throws Exception {
testRescheduledRedelivery("CORE", 1);
}
@Test
public void testRescheduledRedeliveryAMQP_1() throws Exception {
testRescheduledRedelivery("AMQP", 1);
}
@Test
public void testRescheduledRedeliveryAMQP() throws Exception {
testRescheduledRedelivery("AMQP", 0);
}
@Test
public void testRescheduledRedeliveryCOREInfinite() throws Exception {
testRescheduledRedelivery("CORE", -1);
}
@Test
public void testRescheduledRedeliveryAMQPInfinite() throws Exception {
testRescheduledRedelivery("AMQP", -1);
}
private void testRescheduledRedelivery(String protocol, int maxRecords) throws Exception {
int maxRedeliveries = 100;
String testQueue = getName();
Configuration configuration = createDefaultJDBCConfig(true);
configuration.setMaxRedeliveryRecords(maxRecords);
configuration.addAddressSetting("#", new AddressSettings().setRedeliveryDelay(1).setMaxDeliveryAttempts(-1).setDeadLetterAddress(SimpleString.toSimpleString("DLQ")));
configuration.addAddressConfiguration(new CoreAddressConfiguration().setName("DLQ").addRoutingType(RoutingType.ANYCAST));
configuration.addQueueConfiguration(new QueueConfiguration("DLQ").setAddress("DLQ").setRoutingType(RoutingType.ANYCAST));
configuration.addAddressConfiguration(new CoreAddressConfiguration().setName(testQueue).addRoutingType(RoutingType.ANYCAST));
configuration.addQueueConfiguration(new QueueConfiguration(testQueue).setAddress(testQueue).setRoutingType(RoutingType.ANYCAST));
ActiveMQServer server = createServer(true, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
server.start();
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(testQueue);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello"));
session.commit();
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < maxRedeliveries; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
logger.debug("received {}", message);
Assert.assertNotNull(message);
session.rollback();
}
}
server.stop();
java.sql.Connection jdbcConnection = DriverManager.getConnection(getTestJDBCConnectionUrl());
runAfter(jdbcConnection::close);
int records = executeQuery(jdbcConnection, "SELECT * FROM MESSAGE WHERE USERRECORDTYPE=36 OR USERRECORDTYPE=34");
// manually set this value to true if you need to understand what's in the journal
if (PRINT_DATA) {
PrintData printData = new PrintData();
printData.printDataJDBC(configuration, System.out);
}
if (maxRecords < 0) {
Assert.assertEquals(maxRedeliveries * 2, records);
} else {
Assert.assertEquals(maxRecords * 2, records);
}
server.start();
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(testQueue);
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
TextMessage message = (TextMessage) consumer.receive(5000);
logger.debug("received {}", message);
Assert.assertNotNull(message);
session.commit();
Assert.assertNull(consumer.receiveNoWait());
}
}
protected int executeQuery(java.sql.Connection connection, String sql) throws Exception {
PreparedStatement statement = connection.prepareStatement(sql);
ResultSet result = statement.executeQuery();
ResultSetMetaData metaData = result.getMetaData();
int columnCount = metaData.getColumnCount();
int records = 0;
while (result.next()) {
if (logger.isDebugEnabled()) {
StringBuffer line = new StringBuffer();
for (int i = 1; i <= columnCount; i++) {
Object value = result.getObject(i);
line.append(metaData.getColumnLabel(i) + " = " + value);
if (i + 1 <= columnCount)
line.append(", ");
}
logger.info(line.toString());
}
records++;
}
return records;
}
}