diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java index 347bd4b33f..d8c339c173 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java @@ -34,9 +34,9 @@ public final class CompactJournal extends LockAbstract { super.execute(context); try { Configuration configuration = getFileConfiguration(); - compactJournal(new File(getJournal()), "activemq-data", "amq", configuration.getJournalMinFiles(), configuration.getJournalFileSize(), null); + compactJournal(new File(getJournal()), "activemq-data", "amq", configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), configuration.getJournalFileSize(), null); System.out.println("Compactation succeeded for " + getJournal()); - compactJournal(new File(getBinding()), "activemq-bindings", "bindings", 2, 1048576, null); + compactJournal(new File(getBinding()), "activemq-bindings", "bindings", 2, 2, 1048576, null); System.out.println("Compactation succeeded for " + getBinding()); } catch (Exception e) { @@ -49,11 +49,12 @@ public final class CompactJournal extends LockAbstract { final String journalPrefix, final String journalSuffix, final int minFiles, + final int poolFiles, final int fileSize, final IOCriticalErrorListener listener) throws Exception { NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1); - JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); + JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); journal.start(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index 473ea1c3f2..a9299aaf83 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -83,6 +83,9 @@ public interface Journal extends ActiveMQComponent { appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); } + default void replaceableRecord(int recordType) { + } + void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index c76ed68d32..536c7e720d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -58,6 +58,11 @@ public final class FileWrapperJournal extends JournalBase { private final JournalImpl journal; protected volatile JournalFile currentFile; + @Override + public void replaceableRecord(int recordType) { + journal.replaceableRecord(recordType); + } + /** * @param journal * @throws Exception diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java index 10d8a666bf..8b70325705 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java @@ -17,9 +17,12 @@ package org.apache.activemq.artemis.core.journal.impl; import java.util.Arrays; +import java.util.Collection; import java.util.LinkedList; import java.util.List; +import io.netty.util.collection.IntObjectHashMap; +import io.netty.util.collection.LongObjectHashMap; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -32,6 +35,7 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.RunnableEx; import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet; import org.jboss.logging.Logger; @@ -40,6 +44,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ private static final Logger logger = Logger.getLogger(JournalCompactor.class); + LongObjectHashMap> pendingWritesOnTX = new LongObjectHashMap<>(); + IntObjectHashMap> pendingUpdates = new IntObjectHashMap<>(); + // We try to separate old record from new ones when doing the compacting // this is a split line // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE @@ -214,6 +221,18 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ pendingCommands.clear(); } + public void flushUpdates() throws Exception { + Collection> recordsUpdate = this.pendingUpdates.values(); + for (LongObjectHashMap recordMap : recordsUpdate) { + for (RunnableEx ex : recordMap.values()) { + ex.run(); + } + // a little hand for GC + recordMap.clear(); + } + // a little hand for GC + recordsUpdate.clear(); + } // JournalReaderCallback implementation ------------------------------------------- @Override @@ -238,21 +257,27 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ if (logger.isTraceEnabled()) { logger.trace("Read Add Record TX " + transactionID + " info " + info); } - if (pendingTransactions.get(transactionID) != null || containsRecord(info.id)) { - JournalTransaction newTransaction = getNewJournalTransaction(transactionID); - - JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), EncoderPersister.getInstance(),new ByteArrayEncoding(info.data)); - - record.setCompactCount((short) (info.compactCount + 1)); - - checkSize(record.getEncodeSize(), info.compactCount); - - newTransaction.addPositive(currentFile, info.id, record.getEncodeSize()); - - writeEncoder(record); + if (pendingTransactions.get(transactionID) != null) { + produceAddRecordTX(transactionID, info); + } else if (containsRecord(info.id)) { + addTX(transactionID, () -> produceAddRecordTX(transactionID, info)); } } + private void produceAddRecordTX(long transactionID, RecordInfo info) throws Exception { + JournalTransaction newTransaction = getNewJournalTransaction(transactionID); + + JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); + + record.setCompactCount((short) (info.compactCount + 1)); + + checkSize(record.getEncodeSize(), info.compactCount); + + newTransaction.addPositive(currentFile, info.id, record.getEncodeSize()); + + writeEncoder(record); + } + @Override public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception { @@ -264,6 +289,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ // Sanity check, this should never happen ActiveMQJournalLogger.LOGGER.inconsistencyDuringCompacting(transactionID); } else { + flushTX(transactionID); JournalTransaction newTransaction = newTransactions.remove(transactionID); if (newTransaction != null) { JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, transactionID, null); @@ -297,24 +323,57 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } if (pendingTransactions.get(transactionID) != null) { - JournalTransaction newTransaction = getNewJournalTransaction(transactionID); - - JournalInternalRecord record = new JournalDeleteRecordTX(transactionID, info.id, new ByteArrayEncoding(info.data)); - - checkSize(record.getEncodeSize()); - - writeEncoder(record); - - newTransaction.addNegative(currentFile, info.id); + produceDeleteRecordTX(transactionID, info); } // else.. nothing to be done } + private void produceDeleteRecordTX(long transactionID, RecordInfo info) throws Exception { + JournalTransaction newTransaction = getNewJournalTransaction(transactionID); + + JournalInternalRecord record = new JournalDeleteRecordTX(transactionID, info.id, new ByteArrayEncoding(info.data)); + + checkSize(record.getEncodeSize()); + + writeEncoder(record); + + newTransaction.addNegative(currentFile, info.id); + } + @Override public void markAsDataFile(final JournalFile file) { // nothing to be done here } + private void addTX(long tx, RunnableEx runnable) { + LinkedList runnables = pendingWritesOnTX.get(tx); + if (runnables == null) { + runnables = new LinkedList<>(); + pendingWritesOnTX.put(tx, runnables); + } + runnables.add(runnable); + } + + private void flushTX(long tx) throws Exception { + LinkedList runnables = pendingWritesOnTX.remove(tx); + if (runnables != null) { + for (RunnableEx runnableEx : runnables) { + runnableEx.run(); + } + // give a hand to GC... + runnables.clear(); + } + } + + private void dropTX(long tx) { + LinkedList objects = pendingWritesOnTX.remove(tx); + if (objects != null) { + // a little hand to GC + objects.clear(); + } + } + + @Override public void onReadPrepareRecord(final long transactionID, final byte[] extraData, @@ -324,20 +383,23 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } if (pendingTransactions.get(transactionID) != null) { - - JournalTransaction newTransaction = getNewJournalTransaction(transactionID); - - JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, transactionID, new ByteArrayEncoding(extraData)); - - checkSize(prepareRecord.getEncodeSize()); - - writeEncoder(prepareRecord, newTransaction.getCounter(currentFile)); - - newTransaction.prepare(currentFile); - + flushTX(transactionID); + producePrepareRecord(transactionID, extraData); } } + private void producePrepareRecord(long transactionID, byte[] extraData) throws Exception { + JournalTransaction newTransaction = getNewJournalTransaction(transactionID); + + JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, transactionID, new ByteArrayEncoding(extraData)); + + checkSize(prepareRecord.getEncodeSize()); + + writeEncoder(prepareRecord, newTransaction.getCounter(currentFile)); + + newTransaction.prepare(currentFile); + } + @Override public void onReadRollbackRecord(final long transactionID) throws Exception { if (logger.isTraceEnabled()) { @@ -351,19 +413,31 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } else { JournalTransaction newTransaction = newTransactions.remove(transactionID); if (newTransaction != null) { - - JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(transactionID); - - checkSize(rollbackRecord.getEncodeSize()); - - writeEncoder(rollbackRecord); - - newTransaction.rollback(currentFile); + flushTX(transactionID); // ths should be a moot operation, but just in case + // we should do this only if there's a prepare record + produceRollbackRecord(transactionID, newTransaction); + } else { + dropTX(transactionID); } } } + private void produceRollbackRecord(long transactionID, JournalTransaction newTransaction) throws Exception { + JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(transactionID); + + checkSize(rollbackRecord.getEncodeSize()); + + writeEncoder(rollbackRecord); + + newTransaction.rollback(currentFile); + } + + public void replaceableRecord(int recordType) { + LongObjectHashMap longmap = new LongObjectHashMap(); + pendingUpdates.put(recordType, longmap); + } + @Override public void onReadUpdateRecord(final RecordInfo info) throws Exception { if (logger.isTraceEnabled()) { @@ -371,47 +445,61 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } if (containsRecord(info.id)) { - JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); - - updateRecord.setCompactCount((short) (info.compactCount + 1)); - - checkSize(updateRecord.getEncodeSize(), info.compactCount); - - JournalRecord newRecord = newRecords.get(info.id); - - if (newRecord == null) { - ActiveMQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id); + LongObjectHashMap longmap = pendingUpdates.get(info.userRecordType); + if (longmap == null) { + // if not replaceable, we have to always produce the update + produceUpdateRecord(info); } else { - newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize()); + longmap.put(info.id, () -> produceUpdateRecord(info)); } - - writeEncoder(updateRecord); } } + private void produceUpdateRecord(RecordInfo info) throws Exception { + JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); + + updateRecord.setCompactCount((short) (info.compactCount + 1)); + + checkSize(updateRecord.getEncodeSize(), info.compactCount); + + JournalRecord newRecord = newRecords.get(info.id); + + if (newRecord == null) { + ActiveMQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id); + } else { + newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize()); + } + + writeEncoder(updateRecord); + } + @Override public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception { if (logger.isTraceEnabled()) { logger.trace("onReadUpdateRecordTX " + info); } - if (pendingTransactions.get(transactionID) != null || containsRecord(info.id)) { - JournalTransaction newTransaction = getNewJournalTransaction(transactionID); - - JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); - - updateRecordTX.setCompactCount((short) (info.compactCount + 1)); - - checkSize(updateRecordTX.getEncodeSize(), info.compactCount); - - writeEncoder(updateRecordTX); - - newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize()); - } else { - onReadUpdateRecord(info); + if (pendingTransactions.get(transactionID) != null) { + produceUpdateRecordTX(transactionID, info); + } else if (containsRecord(info.id)) { + addTX(transactionID, () -> produceUpdateRecordTX(transactionID, info)); } } + private void produceUpdateRecordTX(long transactionID, RecordInfo info) throws Exception { + JournalTransaction newTransaction = getNewJournalTransaction(transactionID); + + JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); + + updateRecordTX.setCompactCount((short) (info.compactCount + 1)); + + checkSize(updateRecordTX.getEncodeSize(), info.compactCount); + + writeEncoder(updateRecordTX); + + newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize()); + } + /** * @param transactionID * @return diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java index 34853f2700..258ab1d14d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java @@ -376,6 +376,9 @@ public class JournalFilesRepository { public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp, final boolean checkDelete) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Adding free file " + file + ", renameTMP=" + renameTmp + ", checkDelete=" + checkDelete); + } long calculatedSize = 0; try { calculatedSize = file.getFile().size(); @@ -388,7 +391,9 @@ public class JournalFilesRepository { // Re-initialise it if (logger.isTraceEnabled()) { - logger.trace("Adding free file " + file); + logger.trace("Re-initializing file " + file + " as checkDelete=" + checkDelete + + ", freeFilesCount=" + freeFilesCount + ", dataFiles.size=" + dataFiles.size() + + ", openedFiles=" + openedFiles + ", poolSize=" + poolSize); } JournalFile jf = reinitializeFile(file); @@ -400,6 +405,9 @@ public class JournalFilesRepository { freeFiles.add(jf); freeFilesCount.getAndIncrement(); } else { + if (logger.isDebugEnabled()) { + logger.debug("Deleting file " + file.getFile()); + } if (logger.isTraceEnabled()) { logger.trace("DataFiles.size() = " + dataFiles.size()); logger.trace("openedFiles.size() = " + openedFiles.size()); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index fcb69e4e0e..68d07dd349 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -209,6 +210,20 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private final ReadWriteLock journalLock = new ReentrantReadWriteLock(); private final ReadWriteLock compactorLock = new ReentrantReadWriteLock(); + HashSet replaceableRecords; + + + /** This will declare a record type as being replaceable on updates. + * Certain update records only need the last value, and they could be replaceable during compacting. + * */ + @Override + public void replaceableRecord(int recordType) { + if (replaceableRecords == null) { + replaceableRecords = new HashSet<>(); + } + replaceableRecords.add(recordType); + } + private volatile JournalFile currentFile; private volatile JournalState state = JournalState.STOPPED; @@ -1712,6 +1727,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } + compactor.flushUpdates(); compactor.flush(); // pointcut for tests @@ -1847,6 +1863,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID()); + if (replaceableRecords != null) { + replaceableRecords.forEach((i) -> compactor.replaceableRecord(i)); + } + transactions.forEach((id, pendingTransaction) -> { compactor.addPendingTransaction(id, pendingTransaction.getPositiveArray()); pendingTransaction.setCompacting(); @@ -2175,7 +2195,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } else { if (changeData) { // Empty dataFiles with no data - filesRepository.addFreeFile(file, false, false); + filesRepository.addFreeFile(file, false, true); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index b21e86831a..c01b411ba9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -170,6 +170,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager { Journal localMessage = createMessageJournal(config, criticalErrorListener, fileSize); messageJournal = localMessage; + messageJournal.replaceableRecord(JournalRecordIds.UPDATE_DELIVERY_COUNT); + messageJournal.replaceableRecord(JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME); + originalMessageJournal = localMessage; largeMessagesDirectory = config.getLargeMessagesDirectory(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InfiniteRedeliveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InfiniteRedeliveryTest.java new file mode 100644 index 0000000000..69b597765f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InfiniteRedeliveryTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; +import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer; +import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils; +import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.apache.activemq.artemis.utils.Wait; +import org.jboss.logging.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(value = Parameterized.class) +public class InfiniteRedeliveryTest extends ActiveMQTestBase { + + private static final Logger logger = Logger.getLogger(InfiniteRedeliveryTest.class); + + @Parameterized.Parameters(name = "protocol={0}") + public static Collection getParameters() { + return Arrays.asList(new Object[][]{{"CORE"}, {"AMQP"}, {"OPENWIRE"}}); + } + + public InfiniteRedeliveryTest(String protocol) { + this.protocol = protocol; + } + + + String protocol; + + TestableServer liveServer; + TestableServer backupServer; + + Configuration backupConfig; + Configuration liveConfig; + NodeManager nodeManager; + + protected TestableServer createTestableServer(Configuration config, NodeManager nodeManager) throws Exception { + boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration; + return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1)); + } + + // I am using a replicated config to make sure the replica will also configured replaceable records + protected void createReplicatedConfigs() throws Exception { + final TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0); + final TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0); + final TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0); + + backupConfig = createDefaultConfig(0, true); + liveConfig = createDefaultConfig(0, true); + + ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null); + + backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false); + + ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(-1).setAllowFailBack(true); + ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false); + + nodeManager = new InVMNodeManager(true, backupConfig.getJournalLocation()); + + backupServer = createTestableServer(backupConfig, nodeManager); + + liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(TransportConfigurationUtils.getNettyAcceptor(true, 0)); + + liveServer = createTestableServer(liveConfig, nodeManager); + } + + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + } + + protected void startServer(boolean reschedule) throws Exception { + createReplicatedConfigs(); + Configuration configuration = liveServer.getServer().getConfiguration(); + configuration.getAddressesSettings().clear(); + if (reschedule) { + AddressSettings settings = new AddressSettings().setMaxDeliveryAttempts(Integer.MAX_VALUE).setRedeliveryDelay(1); + configuration.getAddressesSettings().put("#", settings); + } else { + AddressSettings settings = new AddressSettings().setMaxDeliveryAttempts(Integer.MAX_VALUE).setRedeliveryDelay(0); + configuration.getAddressesSettings().put("#", settings); + } + liveServer.start(); + backupServer.start(); + Wait.waitFor(liveServer.getServer()::isReplicaSync); + } + + @Test + public void testInifinteRedeliveryWithScheduling() throws Exception { + testInifinteRedeliveryWithScheduling(true); + } + + @Test + public void testInifinteRedeliveryWithoutScheduling() throws Exception { + testInifinteRedeliveryWithScheduling(false); + } + + public void testInifinteRedeliveryWithScheduling(boolean reschedule) throws Exception { + startServer(reschedule); + liveServer.getServer().addAddressInfo(new AddressInfo("test").setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); + liveServer.getServer().createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST).setAddress("test").setDurable(true)); + + ConnectionFactory factory; + + if (protocol.toUpperCase().equals("OPENWIRE")) { + factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=100&jms.redeliveryPolicy.redeliveryDelay=0"); + } else { + factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + } + + Connection connection = factory.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("test"); + Assert.assertNotNull(queue); + MessageProducer producer = session.createProducer(queue); + + producer.send(session.createTextMessage("hello")); + session.commit(); + + + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + for (int i = 0; i < 100; i++) { + Message message = consumer.receive(10000); + Assert.assertNotNull(message); + session.rollback(); + } + connection.close(); + + liveServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000); + backupServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000); + + HashMap counts = countJournal(liveServer.getServer().getConfiguration()); + counts.forEach((k, v) -> logger.debug(k + "=" + v)); + counts.forEach((k, v) -> Assert.assertTrue("Record type " + k + " has a lot of records:" + v, v.intValue() < 20)); + + backupServer.stop(); + + HashMap backupCounts = countJournal(backupServer.getServer().getConfiguration()); + Assert.assertTrue(backupCounts.size() > 0); + backupCounts.forEach((k, v) -> logger.debug("On Backup:" + k + "=" + v)); + backupCounts.forEach((k, v) -> Assert.assertTrue("Backup Record type " + k + " has a lot of records:" + v, v.intValue() < 10)); + + + } +}