ARTEMIS-4694 Redistribution issues with Almost Large Header

Redistribution adds data to the record, which in turn can make a record that is already close to the maximum size too large to redistribute.

The Redistributor and Bridges should not be removed.

A warning is also logged to alert users to this situation.
This commit is contained in:
Clebert Suconic 2024-03-20 10:09:26 -04:00 committed by clebertsuconic
parent 774d321012
commit 04f6424928
13 changed files with 366 additions and 22 deletions

View File

@ -154,6 +154,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
return sqlProvider.getMaxBlobSize();
}
/**
 * Size above which a record is considered too close to the maximum: the
 * provider's maximum blob size minus a 2048 byte margin.
 */
@Override
public long getWarningRecordSize() {
   final long maxBlobSize = sqlProvider.getMaxBlobSize();
   return maxBlobSize - 2048;
}
@Override
protected void createSchema() throws SQLException {
createTable(sqlProvider.getCreateJournalTableSQL());

View File

@ -388,4 +388,6 @@ public interface Journal extends ActiveMQComponent {
* @return
*/
long getMaxRecordSize();
/**
 * Record size threshold used for warning purposes. Implementations visible here
 * return the maximum record size minus a fixed margin, and a warning is logged
 * when an add record is encoded larger than this value.
 *
 * @return the record size, in the same unit as {@link #getMaxRecordSize()}, above which a warning is issued
 */
long getWarningRecordSize();
}

View File

@ -131,6 +131,10 @@ public final class FileWrapperJournal extends JournalBase {
return journal.getMaxRecordSize();
}
/**
 * Delegates to the wrapped journal so both report the same warning threshold.
 */
@Override
public long getWarningRecordSize() {
   final long delegateThreshold = journal.getWarningRecordSize();
   return delegateThreshold;
}
/**
* Write the record to the current file.
*/

View File

@ -54,6 +54,7 @@ import io.netty.util.collection.ByteObjectHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.DummyCallback;
@ -923,14 +924,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
logger.trace("scheduling appendAddRecord::id={}, userRecordType={}, record = {}", id, recordType, record);
}
final long maxRecordSize = getMaxRecordSize();
final JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record);
final int addRecordEncodeSize = addRecord.getEncodeSize();
if (addRecordEncodeSize > maxRecordSize) {
//The record size should be larger than max record size only on the large messages case.
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize);
}
checkRecordSize(addRecordEncodeSize, record);
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@ -977,14 +974,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
logger.trace("scheduling appendAddEvent::id={}, userRecordType={}, record = {}", id, recordType, record);
}
final long maxRecordSize = getMaxRecordSize();
final JournalInternalRecord addRecord = new JournalAddRecord(JournalImpl.EVENT_RECORD, id, recordType, persister, record);
final int addRecordEncodeSize = addRecord.getEncodeSize();
if (addRecordEncodeSize > maxRecordSize) {
//The record size should be larger than max record size only on the large messages case.
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize);
}
checkRecordSize(addRecord.getEncodeSize(), record);
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(() -> {
@ -1012,6 +1004,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
result.get();
}
/**
 * Validates the encoded size of a record about to be appended.
 * <p>
 * A warning is logged as soon as the size exceeds {@link #getWarningRecordSize()};
 * if it also exceeds {@link #getMaxRecordSize()} the append is rejected.
 *
 * @param addRecordEncodeSize the encoded size of the record
 * @param record              the original record, only used in the warning message
 * @throws ActiveMQIOErrorException if the encoded size is larger than the maximum record size
 */
private void checkRecordSize(int addRecordEncodeSize, Object record) throws ActiveMQIOErrorException {
   if (addRecordEncodeSize <= getWarningRecordSize()) {
      // comfortably below the limit, nothing to report
      return;
   }

   final long storeMax = getMaxRecordSize();
   ActiveMQJournalLogger.LOGGER.largeHeaderWarning(addRecordEncodeSize, storeMax, record);

   if (addRecordEncodeSize <= storeMax) {
      // close to the limit but still storable
      return;
   }

   //The record size should be larger than max record size only on the large messages case.
   throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, storeMax);
}
@Override
public void appendUpdateRecord(final long id,
final byte recordType,
@ -1271,10 +1275,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
int encodeSize = addRecord.getEncodeSize();
if (encodeSize > getMaxRecordSize()) {
//The record size should be larger than max record size only on the large messages case.
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(encodeSize, getMaxRecordSize());
}
checkRecordSize(encodeSize, record);
appendExecutor.execute(new Runnable() {
@ -2749,6 +2750,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
/**
 * Size above which a record is considered too close to the maximum: the
 * maximum record size minus a 2048 byte margin.
 */
@Override
public long getWarningRecordSize() {
   final long maxRecordSize = getMaxRecordSize();
   return maxRecordSize - 2048;
}
private void flushExecutor(Executor executor) throws InterruptedException {
if (executor != null) {

View File

@ -203,4 +203,10 @@ public interface ActiveMQJournalLogger {
// same as criticalIO but with the FileName associated (if there's a file available)
@LogMessage(id = 144011, value = "Critical IO Exception happened: {} on {}", level = LogMessage.Level.WARN)
void criticalIOFile(String message, String fileName, Throwable error);
// Warns that a journal record is close to the maximum record size; the record itself is included in the message
@LogMessage(id = 144012, value = "Journal Record sized at {}, which is too close to the max record Size at {}. Record = {}. Internal broker operations such as redistribution and DLQ may be compromised. Move large headers into the body of messages.", level = LogMessage.Level.WARN)
void largeHeaderWarning(long recordSize, long maxRecordSize, Object originalData);
}

View File

@ -83,6 +83,11 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
return Long.MAX_VALUE;
}
/**
 * Default warning threshold: effectively unlimited.
 *
 * @return {@link Long#MAX_VALUE}
 */
default long getWarningRecordSize() {
   // Null journal is pretty much memory
   final long unlimited = Long.MAX_VALUE;
   return unlimited;
}
/**
 * Does nothing by default; implementations may override.
 *
 * @param files presumably the IDs of the large message files to recover — TODO confirm against implementations
 * @throws Exception declared for overriding implementations; the default never throws
 */
default void recoverLargeMessagesOnFolder(Set<Long> files) throws Exception {
}

View File

@ -276,12 +276,16 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, this);
}
/**
 * Reports the maximum record size of the message journal.
 */
@Override
public long getMaxRecordSize() {
   final long journalMax = messageJournal.getMaxRecordSize();
   return journalMax;
}
/**
 * Reports the warning record size of the message journal.
 */
@Override
public long getWarningRecordSize() {
   final long journalWarningSize = messageJournal.getWarningRecordSize();
   return journalWarningSize;
}
/**
* Called during initialization. Used by implementations to setup Journals, Stores etc...

View File

@ -539,6 +539,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
// Report the oversized header through the server logger, keep the details at debug level,
// then reject the message. Nothing is printed to stderr: diagnostics go through the logger only.
ActiveMQServerLogger.LOGGER.messageWithHeaderTooLarge(largeMessage.getMessageID(), logger.getName());
logger.debug("Message header too large for {}", largeMessage);
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(messageEncodeSize, maxRecordSize);
}

View File

@ -659,4 +659,9 @@ public class ReplicatedJournal implements Journal {
public long getMaxRecordSize() {
return localJournal.getMaxRecordSize();
}
/**
 * Delegates to the local journal so both report the same warning threshold.
 */
@Override
public long getWarningRecordSize() {
   final long localThreshold = localJournal.getWarningRecordSize();
   return localThreshold;
}
}

View File

@ -1201,7 +1201,7 @@ public interface ActiveMQServerLogger {
void failedToDealWithObjectProperty(SimpleString property, String exceptionMessage);
@LogMessage(id = 222303, value = "Redistribution by {} of messageID = {} failed", level = LogMessage.Level.WARN)
void errorRedistributing(String queueName, long m, Throwable t);
void errorRedistributing(String queueName, String m, Throwable t);
@LogMessage(id = 222304, value = "Unable to load message from journal", level = LogMessage.Level.WARN)
void unableToLoadMessageFromJournal(Throwable t);

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -127,14 +128,22 @@ public class Redistributor implements Consumer {
RoutingContext context = routingInfo.getA();
Message message = routingInfo.getB();
postOffice.processRoute(message, context, false);
try {
postOffice.processRoute(message, context, false);
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(reference.getMessage(), "redistributing");
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(reference.getMessage(), "redistributing");
}
ackRedistribution(reference, context.getTransaction());
} catch (Throwable e) {
if (context.getTransaction() != null) {
context.getTransaction().setAsync(true).rollback();
}
ActiveMQServerLogger.LOGGER.errorRedistributing(String.valueOf(this.queue.getName()), String.valueOf(message), e);
return HandleStatus.NO_MATCH;
}
ackRedistribution(reference, context.getTransaction());
return HandleStatus.HANDLED;
}

View File

@ -0,0 +1,292 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Two-node cluster test, parameterized by client protocol (AMQP, CORE, OPENWIRE).
 * <p>
 * Messages carrying a string property that grows with every iteration are sent to node 0
 * while consumers attach to node 1. After the growing-header phase, plain messages are sent
 * and the test verifies that at least one of them still reaches the consumer on node 1.
 */
@RunWith(value = Parameterized.class)
public class LargeHeadersClusterTest extends ClusterTestBase {

   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

   private static final SimpleString queueName = SimpleString.toSimpleString("queues.0");

   private static final SimpleString expiryQueueName = SimpleString.toSimpleString("queues.expiry");

   @Parameterized.Parameters(name = "protocol={0}")
   public static Collection<Object[]> getParameters() {
      return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}, {"OPENWIRE"}});
   }

   @Parameterized.Parameter(0)
   public String protocol;

   @Override
   @Before
   public void setUp() throws Exception {
      super.setUp();
   }

   /**
    * Configures and starts both servers, then creates the expiry queue and the test queue on each.
    *
    * @param loadBalancingType the load balancing type used by both cluster connections
    */
   private void startServers(MessageLoadBalancingType loadBalancingType) throws Exception {
      setupServers();

      setRedistributionDelay(0);

      setupCluster(loadBalancingType);

      AddressSettings as = new AddressSettings().setRedistributionDelay(0).setExpiryAddress(expiryQueueName);

      getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
      getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);

      startServers(0);
      startServers(1);

      createQueue(expiryQueueName);
      createQueue(queueName);
   }

   /** Creates an ANYCAST queue with the given name on both servers. */
   private void createQueue(SimpleString queueName) throws Exception {
      QueueConfiguration queueConfiguration = new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST);
      servers[0].createQueue(queueConfiguration);
      servers[1].createQueue(queueConfiguration);
   }

   protected boolean isNetty() {
      return true;
   }

   /**
    * Builds a JMS connection factory for the protocol under test.
    *
    * @param node the server index; the port used is {@code 61616 + node}
    * @return the connection factory, or {@code null} after failing the test for an unknown protocol
    */
   private ConnectionFactory getJmsConnectionFactory(int node) {
      final int port = 61616 + node;
      switch (protocol) {
         case "AMQP":
            return new JmsConnectionFactory("amqp://localhost:" + port);
         case "OPENWIRE":
            return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:" + port);
         case "CORE":
            return new ActiveMQConnectionFactory("tcp://localhost:" + port);
         default:
            Assert.fail("Protocol " + protocol + " unknown");
            return null;
      }
   }

   @Test
   public void testGrowingHeaders() throws Exception {
      startServers(MessageLoadBalancingType.ON_DEMAND);

      ConnectionFactory cf0 = getJmsConnectionFactory(0);
      ConnectionFactory cf1 = getJmsConnectionFactory(1);

      // Phase 1: send messages to node 0 whose "large" property grows by one character per iteration
      try (Connection cn = cf0.createConnection()) {
         Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));

         // only used by this thread, so the unsynchronized builder is sufficient
         StringBuilder bufferString = new StringBuilder();
         for (int i = 0; i < 9_500; i++) {
            bufferString.append("-");
         }

         // declared outside the try so the failing iteration can be reported
         int i = 0;
         try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
            try {
               for (i = 0; i < 1_000; i++) {
                  if (i % 100 == 0) {
                     logger.info("Sent {} messages", i);
                  }
                  TextMessage message = sn.createTextMessage("hello " + i);
                  message.setStringProperty("large", bufferString.toString());
                  message.setBooleanProperty("newSender", false);
                  // we need to send two, one for each server to exercise the load balancing
                  pd.send(message);
                  pd.send(message);

                  bufferString.append("-"); // growing the header
               }
            } catch (Throwable e) {
               // a send failure is tolerated here; the test carries on with whatever was sent
               logger.warn("error at {}", i, e);
            }

            // AMQ144012 is the journal's large header warning; it is not asserted for AMQP
            if (!protocol.equals("AMQP")) {
               Assert.assertTrue(loggerHandler.findText("AMQ144012"));
            }
         }
      }

      // Phase 2: drain from node 1 whatever can be delivered there
      try (Connection connection1 = cf1.createConnection()) {
         Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createConsumer(session.createQueue(queueName.toString()));
         connection1.start();

         receiveAllMessages(consumer, 1, m -> logger.debug("received {}", m));
      }

      // Phase 3: send small messages from a new producer on node 0
      try (Connection cn = cf0.createConnection()) {
         Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));

         try {
            for (int i = 0; i < 1_000; i++) {
               if (i % 100 == 0) {
                  logger.info("Sent {} messages", i);
               }
               TextMessage message = sn.createTextMessage("newSender " + i);
               message.setBooleanProperty("newSender", true);
               // we need to send two, one for each server to exercise the load balancing
               pd.send(message);
               pd.send(message);
            }
         } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
         }
      }

      // Phase 4: consume on node 1 and look for at least one message from the new sender
      AtomicBoolean newSenderFound = new AtomicBoolean(false);

      try (Connection connection1 = cf1.createConnection()) {
         Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createConsumer(session.createQueue(queueName.toString()));
         connection1.start();

         receiveAllMessages(consumer, 1000, m -> {
            try {
               if (m.getBooleanProperty("newSender")) {
                  newSenderFound.set(true);
               }
            } catch (Exception e) {
               // a message whose property cannot be read just does not count as a hit
               logger.warn("Could not read newSender property from {}", m, e);
            }
         });
      }

      // messages should still flow
      Assert.assertTrue(newSenderFound.get());
   }

   /**
    * Receives messages until a receive call times out.
    *
    * @param messageConsumer  the consumer to read from
    * @param minMessages      while fewer than this many messages were received, each receive waits up to
    *                         10 seconds; afterwards the wait drops to 1 second
    * @param messageProcessor optional callback invoked for every received message, may be {@code null}
    * @return the number of messages received
    */
   private int receiveAllMessages(MessageConsumer messageConsumer, int minMessages, Consumer<Message> messageProcessor) throws JMSException {
      int received = 0;

      while (true) {
         final long timeout = received < minMessages ? 10_000 : 1000;
         final Message message = messageConsumer.receive(timeout);
         if (message == null) {
            break;
         }
         received++;
         if (messageProcessor != null) {
            messageProcessor.accept(message);
         }
      }

      return received;
   }

   /** Sets up one cluster connection in each direction between server 0 and server 1 for the "queues" address. */
   protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
      setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);

      setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0);
   }

   /** Intentionally a no-op in this test; redistribution delay is set through the address settings instead. */
   protected void setRedistributionDelay(final long delay) {
   }

   /**
    * Prepares both servers: adds the AMQP and OpenWire protocol managers, sets the NIO and AIO
    * journal buffer sizes to 20 KiB and replaces the address settings with a single "#" match
    * using a redistribution delay of 10.
    */
   protected void setupServers() throws Exception {
      setupServer(0, isFileStorage(), isNetty());
      setupServer(1, isFileStorage(), isNetty());

      servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
      servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());

      servers[0].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
      servers[1].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());

      // NOTE(review): presumably the small buffer keeps the max record size low enough for the
      // growing header to reach it — confirm against the journal implementation
      servers[0].getConfiguration().setJournalBufferSize_NIO(20 * 1024);
      servers[0].getConfiguration().setJournalBufferSize_AIO(20 * 1024);
      servers[1].getConfiguration().setJournalBufferSize_NIO(20 * 1024);
      servers[1].getConfiguration().setJournalBufferSize_AIO(20 * 1024);

      servers[0].getConfiguration().getAddressSettings().clear();
      servers[0].getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(10));

      servers[1].getConfiguration().getAddressSettings().clear();
      servers[1].getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(10));
   }

   /** Closes consumers, session factories and locators, then stops and clears both servers. */
   protected void stopServers() throws Exception {
      closeAllConsumers();

      closeAllSessionFactories();

      closeAllServerLocatorsFactories();

      stopServers(0, 1);

      clearServer(0, 1);
   }

   /**
    * Uses the base configuration with the message expiry scan period set to 100.
    *
    * @param serverID the index of the server being configured
    * @return the adjusted configuration
    */
   @Override
   protected ConfigurationImpl createBasicConfig(final int serverID) {
      ConfigurationImpl configuration = super.createBasicConfig(serverID);
      configuration.setMessageExpiryScanPeriod(100);
      return configuration;
   }
}

View File

@ -1068,6 +1068,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
public void replicationSyncFinished() {
// no-op
}
/**
 * Same rule as the real journal: the maximum record size minus a 2048 byte margin.
 */
@Override
public long getWarningRecordSize() {
   final long maxRecordSize = getMaxRecordSize();
   return maxRecordSize - 2048;
}
}
private interface ExtraConfigurer {