From 04f6424928a3d9cbc26f6b26f7c2e9b7f7bf7869 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 20 Mar 2024 10:09:26 -0400 Subject: [PATCH] ARTEMIS-4694 Redistribution issues with Almost Large Header Redistribution would add data to the record which would then in turn make the record too large to redistribute. The Redistributor and Bridges should not be removed. Also a warning should be added to warn users about the situation. --- .../jdbc/store/journal/JDBCJournalImpl.java | 5 + .../artemis/core/journal/Journal.java | 2 + .../core/journal/impl/FileWrapperJournal.java | 4 + .../core/journal/impl/JournalImpl.java | 36 ++- .../journal/ActiveMQJournalLogger.java | 6 + .../core/persistence/StorageManager.java | 5 + .../AbstractJournalStorageManager.java | 6 +- .../impl/journal/JournalStorageManager.java | 1 + .../core/replication/ReplicatedJournal.java | 5 + .../core/server/ActiveMQServerLogger.java | 2 +- .../server/cluster/impl/Redistributor.java | 19 +- .../LargeHeadersClusterTest.java | 292 ++++++++++++++++++ .../replication/ReplicationTest.java | 5 + 13 files changed, 366 insertions(+), 22 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/LargeHeadersClusterTest.java diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 88782dad6c..9f48ab3d4f 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -154,6 +154,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { return sqlProvider.getMaxBlobSize(); } + @Override + public long getWarningRecordSize() { + return sqlProvider.getMaxBlobSize() - 2048; + } + @Override protected void createSchema() throws SQLException { createTable(sqlProvider.getCreateJournalTableSQL()); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index 43d602a492..67915fc9da 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -388,4 +388,6 @@ public interface Journal extends ActiveMQComponent { * @return */ long getMaxRecordSize(); + + long getWarningRecordSize(); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 0c2d9dc2b7..798260572b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -131,6 +131,10 @@ public final class FileWrapperJournal extends JournalBase { return journal.getMaxRecordSize(); } + @Override + public long getWarningRecordSize() { + return journal.getWarningRecordSize(); + } /** * Write the record to the current file. */ diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 6a3abac2f8..a76f06950d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -54,6 +54,7 @@ import io.netty.util.collection.ByteObjectHashMap; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQShutdownException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.io.DummyCallback; @@ -923,14 +924,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal logger.trace("scheduling appendAddRecord::id={}, userRecordType={}, record = {}", id, recordType, record); } - final long maxRecordSize = getMaxRecordSize(); final JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record); final int addRecordEncodeSize = addRecord.getEncodeSize(); - if (addRecordEncodeSize > maxRecordSize) { - //The record size should be larger than max record size only on the large messages case. - throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize); - } + checkRecordSize(addRecordEncodeSize, record); final SimpleFuture result = newSyncAndCallbackResult(sync, callback); appendExecutor.execute(new Runnable() { @@ -977,14 +974,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal logger.trace("scheduling appendAddEvent::id={}, userRecordType={}, record = {}", id, recordType, record); } - final long maxRecordSize = getMaxRecordSize(); final JournalInternalRecord addRecord = new JournalAddRecord(JournalImpl.EVENT_RECORD, id, recordType, persister, record); - final int addRecordEncodeSize = addRecord.getEncodeSize(); - if (addRecordEncodeSize > maxRecordSize) { - //The record size should be larger than max record size only on the large messages case. - throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize); - } + checkRecordSize(addRecord.getEncodeSize(), record); final SimpleFuture result = newSyncAndCallbackResult(sync, callback); appendExecutor.execute(() -> { @@ -1012,6 +1004,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal result.get(); } + private void checkRecordSize(int addRecordEncodeSize, Object record) throws ActiveMQIOErrorException { + if (addRecordEncodeSize > getWarningRecordSize()) { + long maxRecordSize = getMaxRecordSize(); + ActiveMQJournalLogger.LOGGER.largeHeaderWarning(addRecordEncodeSize, maxRecordSize, record); + + if (addRecordEncodeSize > maxRecordSize) { + //The record size should be larger than max record size only on the large messages case. + throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize); + } + } + } + @Override public void appendUpdateRecord(final long id, final byte recordType, @@ -1271,10 +1275,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record); int encodeSize = addRecord.getEncodeSize(); - if (encodeSize > getMaxRecordSize()) { - //The record size should be larger than max record size only on the large messages case. - throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(encodeSize, getMaxRecordSize()); - } + checkRecordSize(encodeSize, record); appendExecutor.execute(new Runnable() { @@ -2749,6 +2750,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + @Override + public long getWarningRecordSize() { + return getMaxRecordSize() - 2048; + } + private void flushExecutor(Executor executor) throws InterruptedException { if (executor != null) { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java index c1af466a4b..7b770cbb58 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java @@ -203,4 +203,10 @@ public interface ActiveMQJournalLogger { // same as criticalIO but with the FileName associated (if there's a file available) @LogMessage(id = 144011, value = "Critical IO Exception happened: {} on {}", level = LogMessage.Level.WARN) void criticalIOFile(String message, String fileName, Throwable error); + + @LogMessage(id = 144012, value = "Journal Record sized at {}, which is too close to the max record Size at {}. Record = {}. Internal broker operations such as redistribution and DLQ may be compromised. Move large headers into the body of messages.", level = LogMessage.Level.WARN) + void largeHeaderWarning(long recordSize, long maxRecordSize, Object originalData); + + + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 4644d6832d..76d94eac05 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -83,6 +83,11 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { return Long.MAX_VALUE; } + default long getWarningRecordSize() { + /** Null journal is pretty much memory */ + return Long.MAX_VALUE; + } + default void recoverLargeMessagesOnFolder(Set files) throws Exception { } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 61b2db6150..b63b88a9e7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -276,12 +276,16 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, this); } - @Override public long getMaxRecordSize() { return messageJournal.getMaxRecordSize(); } + @Override + public long getWarningRecordSize() { + return messageJournal.getWarningRecordSize(); + } + /** * Called during initialization. Used by implementations to setup Journals, Stores etc... diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 85c1b4bd2d..665f13b0ff 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -539,6 +539,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { ActiveMQServerLogger.LOGGER.messageWithHeaderTooLarge(largeMessage.getMessageID(), logger.getName()); logger.debug("Message header too large for {}", largeMessage); + new Exception("Trace").printStackTrace(); throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(messageEncodeSize, maxRecordSize); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java index 32b3262d05..e8038ca8f4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java @@ -659,4 +659,9 @@ public class ReplicatedJournal implements Journal { public long getMaxRecordSize() { return localJournal.getMaxRecordSize(); } + + @Override + public long getWarningRecordSize() { + return localJournal.getWarningRecordSize(); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 9d97f2d408..19e7facfe3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1201,7 +1201,7 @@ public interface ActiveMQServerLogger { void failedToDealWithObjectProperty(SimpleString property, String exceptionMessage); @LogMessage(id = 222303, value = "Redistribution by {} of messageID = {} failed", level = LogMessage.Level.WARN) - void errorRedistributing(String queueName, long m, Throwable t); + void errorRedistributing(String queueName, String m, Throwable t); @LogMessage(id = 222304, value = "Unable to load message from journal", level = LogMessage.Level.WARN) void unableToLoadMessageFromJournal(Throwable t); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index 8d223a5ba6..222ad1db01 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; @@ -127,14 +128,22 @@ public class Redistributor implements Consumer { RoutingContext context = routingInfo.getA(); Message message = routingInfo.getB(); - postOffice.processRoute(message, context, false); + try { + postOffice.processRoute(message, context, false); - if (RefCountMessage.isRefTraceEnabled()) { - RefCountMessage.deferredDebug(reference.getMessage(), "redistributing"); + if (RefCountMessage.isRefTraceEnabled()) { + RefCountMessage.deferredDebug(reference.getMessage(), "redistributing"); + } + + ackRedistribution(reference, context.getTransaction()); + } catch (Throwable e) { + if (context.getTransaction() != null) { + context.getTransaction().setAsync(true).rollback(); + } + ActiveMQServerLogger.LOGGER.errorRedistributing(String.valueOf(this.queue.getName()), String.valueOf(message), e); + return HandleStatus.NO_MATCH; } - ackRedistribution(reference, context.getTransaction()); - return HandleStatus.HANDLED; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/LargeHeadersClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/LargeHeadersClusterTest.java new file mode 100644 index 0000000000..b8192fa762 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/LargeHeadersClusterTest.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.lang.invoke.MethodHandles; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(value = Parameterized.class) +public class LargeHeadersClusterTest extends ClusterTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final SimpleString queueName = SimpleString.toSimpleString("queues.0"); + + // I'm taking any number that /2 = Odd + // to avoid perfect roundings and making sure messages are evenly distributed + private static final int NUMBER_OF_MESSAGES = 77 * 2; + + @Parameterized.Parameters(name = "protocol={0}") + public static Collection getParameters() { + return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}, {"OPENWIRE"}}); + } + + @Parameterized.Parameter(0) + public String protocol; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + } + private void startServers(MessageLoadBalancingType loadBalancingType) throws Exception { + setupServers(); + + setRedistributionDelay(0); + + setupCluster(loadBalancingType); + + AddressSettings as = new AddressSettings().setRedistributionDelay(0).setExpiryAddress(SimpleString.toSimpleString("queues.expiry")); + + getServer(0).getAddressSettingsRepository().addMatch("queues.*", as); + getServer(1).getAddressSettingsRepository().addMatch("queues.*", as); + + startServers(0); + startServers(1); + + createQueue(SimpleString.toSimpleString("queues.expiry")); + createQueue(queueName); + } + + private void createQueue(SimpleString queueName) throws Exception { + QueueConfiguration queueConfiguration = new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST); + servers[0].createQueue(queueConfiguration); + servers[1].createQueue(queueConfiguration); + } + + protected boolean isNetty() { + return true; + } + + private ConnectionFactory getJmsConnectionFactory(int node) { + if (protocol.equals("AMQP")) { + return new JmsConnectionFactory("amqp://localhost:" + (61616 + node)); + } else if (protocol.equals("OPENWIRE")) { + return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:" + (61616 + node)); + } else if (protocol.equals("CORE")) { + return new ActiveMQConnectionFactory("tcp://localhost:" + (61616 + node)); + } else { + Assert.fail("Protocol " + protocol + " unknown"); + return null; + } + } + + @Test + public void testGrowingHeaders() throws Exception { + startServers(MessageLoadBalancingType.ON_DEMAND); + + ConnectionFactory cf0 = getJmsConnectionFactory(0); + ConnectionFactory cf1 = getJmsConnectionFactory(1); + try (Connection cn = cf0.createConnection()) { + Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString())); + + StringBuffer bufferString = new StringBuffer(); + for (int i = 0; i < 9_500; i++) { + bufferString.append("-"); + } + + int i = 0; + + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + try { + for (i = 0; i < 1_000; i++) { + if (i % 100 == 0) { + logger.info("Sent {} messages", i); + } + TextMessage message = sn.createTextMessage("hello " + i); + message.setStringProperty("large", bufferString.toString()); + message.setBooleanProperty("newSender", false); + // we need to send two, one for each server to exercise the load balancing + pd.send(message); + pd.send(message); + bufferString.append("-"); // growing the header + } + } catch (Throwable e) { + logger.warn("error at {}", i, e); + } + if (!protocol.equals("AMQP")) { + Assert.assertTrue(loggerHandler.findText("AMQ144012")); + } + } + } + + try (Connection connection1 = cf1.createConnection()) { + Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue("queues.0")); + connection1.start(); + receiveAllMessages(consumer, 1, m -> logger.debug("received {}", m)); + } + + try (Connection cn = cf0.createConnection()) { + Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString())); + + try { + for (int i = 0; i < 1_000; i++) { + if (i % 100 == 0) { + logger.info("Sent {} messages", i); + } + TextMessage message = sn.createTextMessage("newSender " + i); + message.setBooleanProperty("newSender", true); + // we need to send two, one for each server to exercise the load balancing + pd.send(message); + pd.send(message); + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + } + + AtomicBoolean newSenderFound = new AtomicBoolean(false); + + try (Connection connection1 = cf1.createConnection()) { + Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue("queues.0")); + connection1.start(); + + receiveAllMessages(consumer, 1000, m -> { + try { + if (m.getBooleanProperty("newSender")) { + newSenderFound.set(true); + } + } catch (Exception ignored) { + } + }); + + } + + + // messages should still flow + Assert.assertTrue(newSenderFound.get()); + } + + + private int receiveAllMessages(MessageConsumer messageConsume, int minMessages, Consumer messageProcessor) throws JMSException { + + int msg = 0; + + for (;;) { + Message message; + + if (msg < minMessages) { + message = messageConsume.receive(10_000); + } else { + message = messageConsume.receive(1000); + } + if (message == null) { + break; + } + + msg++; + + if (messageProcessor != null) { + messageProcessor.accept(message); + } + } + + return msg; + } + + protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception { + setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1); + + setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0); + } + + protected void setRedistributionDelay(final long delay) { + } + + protected void setupServers() throws Exception { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + + servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory()); + servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory()); + servers[0].addProtocolManagerFactory(new OpenWireProtocolManagerFactory()); + servers[1].addProtocolManagerFactory(new OpenWireProtocolManagerFactory()); + + servers[0].getConfiguration().setJournalBufferSize_NIO(20 * 1024); + servers[0].getConfiguration().setJournalBufferSize_AIO(20 * 1024); + servers[1].getConfiguration().setJournalBufferSize_NIO(20 * 1024); + servers[1].getConfiguration().setJournalBufferSize_AIO(20 * 1024); + + servers[0].getConfiguration().getAddressSettings().clear(); + servers[0].getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(10)); + + servers[1].getConfiguration().getAddressSettings().clear(); + servers[1].getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(10)); + } + + protected void stopServers() throws Exception { + closeAllConsumers(); + + closeAllSessionFactories(); + + closeAllServerLocatorsFactories(); + + stopServers(0, 1); + + clearServer(0, 1); + } + + /** + * @param serverID + * @return + * @throws Exception + */ + @Override + protected ConfigurationImpl createBasicConfig(final int serverID) { + ConfigurationImpl configuration = super.createBasicConfig(serverID); + configuration.setMessageExpiryScanPeriod(100); + + return configuration; + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index dc23fa232c..3d2bb82ed3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -1068,6 +1068,11 @@ public final class ReplicationTest extends ActiveMQTestBase { public void replicationSyncFinished() { // no-op } + + @Override + public long getWarningRecordSize() { + return getMaxRecordSize() - 2048; + } } private interface ExtraConfigurer {