ARTEMIS-3261 Enhance compact to deal with Rollbacks and update records that can be replaced

Clebert Suconic 2021-04-22 21:59:05 -04:00
parent 2a28a5d42f
commit 1392cb5f0b
8 changed files with 390 additions and 72 deletions


@@ -34,9 +34,9 @@ public final class CompactJournal extends LockAbstract {
      super.execute(context);
      try {
         Configuration configuration = getFileConfiguration();
-         compactJournal(new File(getJournal()), "activemq-data", "amq", configuration.getJournalMinFiles(), configuration.getJournalFileSize(), null);
+         compactJournal(new File(getJournal()), "activemq-data", "amq", configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), configuration.getJournalFileSize(), null);
         System.out.println("Compactation succeeded for " + getJournal());
-         compactJournal(new File(getBinding()), "activemq-bindings", "bindings", 2, 1048576, null);
+         compactJournal(new File(getBinding()), "activemq-bindings", "bindings", 2, 2, 1048576, null);
         System.out.println("Compactation succeeded for " + getBinding());
      } catch (Exception e) {
@@ -49,11 +49,12 @@ public final class CompactJournal extends LockAbstract {
                             final String journalPrefix,
                             final String journalSuffix,
                             final int minFiles,
+                             final int poolFiles,
                             final int fileSize,
                             final IOCriticalErrorListener listener) throws Exception {
      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1);
-      JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+      JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
      journal.start();
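
The command previously passed minFiles twice, so the rebuilt journal ignored the configured file pool size; it now threads the pool-files setting through to the JournalImpl constructor. For orientation, here is a minimal sketch of the load-compact-stop sequence such a tool performs; the directory, prefix/suffix and sizes are placeholder values, not taken from a real broker config:

   import java.io.File;

   import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
   import org.apache.activemq.artemis.core.journal.impl.JournalImpl;

   public class CompactSketch {
      public static void main(String[] args) throws Exception {
         // placeholder directory; a null listener is acceptable here, as in the command above
         NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File("./data/journal"), null, 1);
         // args: fileSize, minFiles (pre-created on start), poolFiles (kept for reuse), ...
         JournalImpl journal = new JournalImpl(10 * 1024 * 1024, 2, 2, 0, 0, nio, "activemq-data", "amq", 1);
         journal.start();
         journal.loadInternalOnly(); // load records without delivering callbacks
         journal.compact();          // rewrite live records into fresh files
         journal.stop();
      }
   }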


@@ -83,6 +83,9 @@ public interface Journal extends ActiveMQComponent {
      appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
   }

+   default void replaceableRecord(int recordType) {
+   }
+
   void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;

   boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;


@@ -58,6 +58,11 @@ public final class FileWrapperJournal extends JournalBase {
   private final JournalImpl journal;

   protected volatile JournalFile currentFile;

+   @Override
+   public void replaceableRecord(int recordType) {
+      journal.replaceableRecord(recordType);
+   }
+
   /**
    * @param journal
    * @throws Exception


@@ -17,9 +17,12 @@
package org.apache.activemq.artemis.core.journal.impl;

import java.util.Arrays;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;

+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.RecordInfo;
@@ -32,6 +35,7 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.apache.activemq.artemis.utils.RunnableEx;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
import org.jboss.logging.Logger;
@@ -40,6 +44,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ

   private static final Logger logger = Logger.getLogger(JournalCompactor.class);

+   LongObjectHashMap<LinkedList<RunnableEx>> pendingWritesOnTX = new LongObjectHashMap<>();
+   IntObjectHashMap<LongObjectHashMap<RunnableEx>> pendingUpdates = new IntObjectHashMap<>();
+
   // We try to separate old record from new ones when doing the compacting
   // this is a split line
   // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE
@@ -214,6 +221,18 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
      pendingCommands.clear();
   }

+   public void flushUpdates() throws Exception {
+      Collection<LongObjectHashMap<RunnableEx>> recordsUpdate = this.pendingUpdates.values();
+      for (LongObjectHashMap<RunnableEx> recordMap : recordsUpdate) {
+         for (RunnableEx ex : recordMap.values()) {
+            ex.run();
+         }
+         // a little hand for GC
+         recordMap.clear();
+      }
+      // a little hand for GC
+      recordsUpdate.clear();
+   }
+
   // JournalReaderCallback implementation -------------------------------------------

   @Override
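
These two maps drive the new behaviour. pendingUpdates keeps, per replaceable record type, only the most recent deferred update for each record id, and flushUpdates() replays whatever survived once compaction has read every file. The following self-contained sketch models that last-write-wins idea with plain JDK types; the names and the simplified Write interface are illustrative, not the compactor's real API:

   import java.util.HashMap;
   import java.util.Map;

   // Minimal model of coalescing replaceable updates during compaction.
   // "Write" stands in for the compactor's deferred RunnableEx encoder calls.
   class UpdateCoalescer {
      interface Write { void run(); }

      // recordType -> (recordId -> latest pending write)
      private final Map<Integer, Map<Long, Write>> pendingUpdates = new HashMap<>();

      void replaceableRecord(int recordType) {
         pendingUpdates.put(recordType, new HashMap<>());
      }

      // Returns true if the update was deferred (replaceable), false if it must be written now.
      boolean onUpdate(int recordType, long recordId, Write write) {
         Map<Long, Write> byId = pendingUpdates.get(recordType);
         if (byId == null) {
            return false;          // not replaceable: caller writes immediately
         }
         byId.put(recordId, write); // replaces any earlier pending update for this id
         return true;
      }

      void flushUpdates() {
         pendingUpdates.values().forEach(byId -> {
            byId.values().forEach(Write::run); // only the last update per id survives
            byId.clear();
         });
      }
   }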
@@ -238,21 +257,27 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
      if (logger.isTraceEnabled()) {
         logger.trace("Read Add Record TX " + transactionID + " info " + info);
      }

-      if (pendingTransactions.get(transactionID) != null || containsRecord(info.id)) {
-         JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
-         JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
-
-         record.setCompactCount((short) (info.compactCount + 1));
-
-         checkSize(record.getEncodeSize(), info.compactCount);
-
-         newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
-
-         writeEncoder(record);
+      if (pendingTransactions.get(transactionID) != null) {
+         produceAddRecordTX(transactionID, info);
+      } else if (containsRecord(info.id)) {
+         addTX(transactionID, () -> produceAddRecordTX(transactionID, info));
      }
   }

+   private void produceAddRecordTX(long transactionID, RecordInfo info) throws Exception {
+      JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+      JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
+      record.setCompactCount((short) (info.compactCount + 1));
+      checkSize(record.getEncodeSize(), info.compactCount);
+      newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
+      writeEncoder(record);
+   }
+
   @Override
   public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
@@ -264,6 +289,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
         // Sanity check, this should never happen
         ActiveMQJournalLogger.LOGGER.inconsistencyDuringCompacting(transactionID);
      } else {
+         flushTX(transactionID);
         JournalTransaction newTransaction = newTransactions.remove(transactionID);
         if (newTransaction != null) {
            JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, transactionID, null);
@@ -297,24 +323,57 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
      }

      if (pendingTransactions.get(transactionID) != null) {
-         JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
-         JournalInternalRecord record = new JournalDeleteRecordTX(transactionID, info.id, new ByteArrayEncoding(info.data));
-
-         checkSize(record.getEncodeSize());
-
-         writeEncoder(record);
-
-         newTransaction.addNegative(currentFile, info.id);
+         produceDeleteRecordTX(transactionID, info);
      }
      // else.. nothing to be done
   }

+   private void produceDeleteRecordTX(long transactionID, RecordInfo info) throws Exception {
+      JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+      JournalInternalRecord record = new JournalDeleteRecordTX(transactionID, info.id, new ByteArrayEncoding(info.data));
+      checkSize(record.getEncodeSize());
+      writeEncoder(record);
+      newTransaction.addNegative(currentFile, info.id);
+   }
+
   @Override
   public void markAsDataFile(final JournalFile file) {
      // nothing to be done here
   }

+   private void addTX(long tx, RunnableEx runnable) {
+      LinkedList<RunnableEx> runnables = pendingWritesOnTX.get(tx);
+      if (runnables == null) {
+         runnables = new LinkedList<>();
+         pendingWritesOnTX.put(tx, runnables);
+      }
+      runnables.add(runnable);
+   }
+
+   private void flushTX(long tx) throws Exception {
+      LinkedList<RunnableEx> runnables = pendingWritesOnTX.remove(tx);
+      if (runnables != null) {
+         for (RunnableEx runnableEx : runnables) {
+            runnableEx.run();
+         }
+         // give a hand to GC...
+         runnables.clear();
+      }
+   }
+
+   private void dropTX(long tx) {
+      LinkedList objects = pendingWritesOnTX.remove(tx);
+      if (objects != null) {
+         // a little hand to GC
+         objects.clear();
+      }
+   }
+
   @Override
   public void onReadPrepareRecord(final long transactionID,
                                   final byte[] extraData,
@@ -324,20 +383,23 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
      }

      if (pendingTransactions.get(transactionID) != null) {
-         JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
-         JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, transactionID, new ByteArrayEncoding(extraData));
-
-         checkSize(prepareRecord.getEncodeSize());
-
-         writeEncoder(prepareRecord, newTransaction.getCounter(currentFile));
-
-         newTransaction.prepare(currentFile);
+         flushTX(transactionID);
+         producePrepareRecord(transactionID, extraData);
      }
   }

+   private void producePrepareRecord(long transactionID, byte[] extraData) throws Exception {
+      JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+      JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, transactionID, new ByteArrayEncoding(extraData));
+      checkSize(prepareRecord.getEncodeSize());
+      writeEncoder(prepareRecord, newTransaction.getCounter(currentFile));
+      newTransaction.prepare(currentFile);
+   }
+
   @Override
   public void onReadRollbackRecord(final long transactionID) throws Exception {
      if (logger.isTraceEnabled()) {
@@ -351,19 +413,31 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
      } else {
         JournalTransaction newTransaction = newTransactions.remove(transactionID);
         if (newTransaction != null) {
-            JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(transactionID);
-
-            checkSize(rollbackRecord.getEncodeSize());
-
-            writeEncoder(rollbackRecord);
-
-            newTransaction.rollback(currentFile);
+            flushTX(transactionID); // ths should be a moot operation, but just in case
+            // we should do this only if there's a prepare record
+            produceRollbackRecord(transactionID, newTransaction);
+         } else {
+            dropTX(transactionID);
         }
      }
   }

+   private void produceRollbackRecord(long transactionID, JournalTransaction newTransaction) throws Exception {
+      JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(transactionID);
+      checkSize(rollbackRecord.getEncodeSize());
+      writeEncoder(rollbackRecord);
+      newTransaction.rollback(currentFile);
+   }
+
+   public void replaceableRecord(int recordType) {
+      LongObjectHashMap<RunnableEx> longmap = new LongObjectHashMap();
+      pendingUpdates.put(recordType, longmap);
+   }
+
   @Override
   public void onReadUpdateRecord(final RecordInfo info) throws Exception {
      if (logger.isTraceEnabled()) {
@@ -371,47 +445,61 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
      }

      if (containsRecord(info.id)) {
-         JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
-
-         updateRecord.setCompactCount((short) (info.compactCount + 1));
-
-         checkSize(updateRecord.getEncodeSize(), info.compactCount);
-
-         JournalRecord newRecord = newRecords.get(info.id);
-
-         if (newRecord == null) {
-            ActiveMQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id);
-         } else {
-            newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize());
-         }
-
-         writeEncoder(updateRecord);
+         LongObjectHashMap<RunnableEx> longmap = pendingUpdates.get(info.userRecordType);
+         if (longmap == null) {
+            // if not replaceable, we have to always produce the update
+            produceUpdateRecord(info);
+         } else {
+            longmap.put(info.id, () -> produceUpdateRecord(info));
+         }
      }
   }

+   private void produceUpdateRecord(RecordInfo info) throws Exception {
+      JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
+      updateRecord.setCompactCount((short) (info.compactCount + 1));
+      checkSize(updateRecord.getEncodeSize(), info.compactCount);
+      JournalRecord newRecord = newRecords.get(info.id);
+      if (newRecord == null) {
+         ActiveMQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id);
+      } else {
+         newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize());
+      }
+      writeEncoder(updateRecord);
+   }
+
   @Override
   public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception {
      if (logger.isTraceEnabled()) {
         logger.trace("onReadUpdateRecordTX " + info);
      }

-      if (pendingTransactions.get(transactionID) != null || containsRecord(info.id)) {
-         JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
-         JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
-
-         updateRecordTX.setCompactCount((short) (info.compactCount + 1));
-
-         checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
-
-         writeEncoder(updateRecordTX);
-
-         newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize());
-      } else {
-         onReadUpdateRecord(info);
+      if (pendingTransactions.get(transactionID) != null) {
+         produceUpdateRecordTX(transactionID, info);
+      } else if (containsRecord(info.id)) {
+         addTX(transactionID, () -> produceUpdateRecordTX(transactionID, info));
      }
   }

+   private void produceUpdateRecordTX(long transactionID, RecordInfo info) throws Exception {
+      JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+      JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
+      updateRecordTX.setCompactCount((short) (info.compactCount + 1));
+      checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
+      writeEncoder(updateRecordTX);
+      newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize());
+   }
+
   /**
    * @param transactionID
    * @return
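
The addTX/flushTX/dropTX trio is what the commit title means by dealing with rollbacks: records read inside a transaction whose outcome is not yet known are captured as closures instead of being rewritten, then replayed on commit or prepare and discarded entirely on rollback. A self-contained sketch of that buffer-until-outcome pattern, again with simplified illustrative types rather than the compactor's real classes:

   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Minimal model of deferring journal writes until a transaction's outcome is known.
   class TxWriteBuffer {
      interface Write { void run() throws Exception; }

      private final Map<Long, List<Write>> pendingWritesOnTX = new HashMap<>();

      void addTX(long txId, Write write) {
         pendingWritesOnTX.computeIfAbsent(txId, k -> new ArrayList<>()).add(write);
      }

      // Commit or prepare: the records are needed after all, so replay them in order.
      void flushTX(long txId) throws Exception {
         List<Write> writes = pendingWritesOnTX.remove(txId);
         if (writes != null) {
            for (Write w : writes) {
               w.run();
            }
         }
      }

      // Rollback of a transaction that left nothing durable: drop the buffered writes,
      // so neither the records nor a rollback record survive compaction.
      void dropTX(long txId) {
         pendingWritesOnTX.remove(txId);
      }
   }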


@@ -376,6 +376,9 @@ public class JournalFilesRepository {
   public synchronized void addFreeFile(final JournalFile file,
                                        final boolean renameTmp,
                                        final boolean checkDelete) throws Exception {
+      if (logger.isDebugEnabled()) {
+         logger.debug("Adding free file " + file + ", renameTMP=" + renameTmp + ", checkDelete=" + checkDelete);
+      }
      long calculatedSize = 0;
      try {
         calculatedSize = file.getFile().size();
@@ -388,7 +391,9 @@ public class JournalFilesRepository {
         // Re-initialise it
         if (logger.isTraceEnabled()) {
-            logger.trace("Adding free file " + file);
+            logger.trace("Re-initializing file " + file + " as checkDelete=" + checkDelete +
+                            ", freeFilesCount=" + freeFilesCount + ", dataFiles.size=" + dataFiles.size() +
+                            ", openedFiles=" + openedFiles + ", poolSize=" + poolSize);
         }

         JournalFile jf = reinitializeFile(file);
@@ -400,6 +405,9 @@ public class JournalFilesRepository {
         freeFiles.add(jf);
         freeFilesCount.getAndIncrement();
      } else {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Deleting file " + file.getFile());
+         }
         if (logger.isTraceEnabled()) {
            logger.trace("DataFiles.size() = " + dataFiles.size());
            logger.trace("openedFiles.size() = " + openedFiles.size());


@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -209,6 +210,20 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
   private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
   private final ReadWriteLock compactorLock = new ReentrantReadWriteLock();

+   HashSet<Integer> replaceableRecords;
+
+   /** This will declare a record type as being replaceable on updates.
+    *  Certain update records only need the last value, and they could be replaceable during compacting.
+    * */
+   @Override
+   public void replaceableRecord(int recordType) {
+      if (replaceableRecords == null) {
+         replaceableRecords = new HashSet<>();
+      }
+      replaceableRecords.add(recordType);
+   }
+
   private volatile JournalFile currentFile;

   private volatile JournalState state = JournalState.STOPPED;
@@ -1712,6 +1727,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
         }
      }

+      compactor.flushUpdates();
      compactor.flush();

      // pointcut for tests
@@ -1847,6 +1863,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
      compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID());

+      if (replaceableRecords != null) {
+         replaceableRecords.forEach((i) -> compactor.replaceableRecord(i));
+      }
+
      transactions.forEach((id, pendingTransaction) -> {
         compactor.addPendingTransaction(id, pendingTransaction.getPositiveArray());
         pendingTransaction.setCompacting();
@@ -2175,7 +2195,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
      } else {
         if (changeData) {
            // Empty dataFiles with no data
-            filesRepository.addFreeFile(file, false, false);
+            filesRepository.addFreeFile(file, false, true);
         }
      }
   }
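
Two behavioural details in this file are easy to miss: compactor.flushUpdates() runs immediately before compactor.flush(), so each coalesced replaceable update is written exactly once at the end of the compact pass, after the last value has won; and the empty-data-file path now calls addFreeFile(file, false, true), which, judging from the addFreeFile branches shown earlier, allows surplus empty files to be deleted rather than unconditionally returned to the pool.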


@@ -170,6 +170,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
      Journal localMessage = createMessageJournal(config, criticalErrorListener, fileSize);

      messageJournal = localMessage;
+      messageJournal.replaceableRecord(JournalRecordIds.UPDATE_DELIVERY_COUNT);
+      messageJournal.replaceableRecord(JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME);
+
      originalMessageJournal = localMessage;

      largeMessagesDirectory = config.getLargeMessagesDirectory();
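
Both record types are natural last-value-wins candidates: each redelivery of a message appends an UPDATE_DELIVERY_COUNT record, and each reschedule appends a SET_SCHEDULED_DELIVERY_TIME record, yet on reload only the most recent value of either matters. As a rough worked example of the payoff, a message rolled back 100 times used to drag roughly 100 delivery-count updates through every compact cycle; with the type registered as replaceable, the compactor retains at most one per message.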


@@ -0,0 +1,190 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.tests.integration.client;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value = Parameterized.class)
public class InfiniteRedeliveryTest extends ActiveMQTestBase {

   private static final Logger logger = Logger.getLogger(InfiniteRedeliveryTest.class);

   @Parameterized.Parameters(name = "protocol={0}")
   public static Collection getParameters() {
      return Arrays.asList(new Object[][]{{"CORE"}, {"AMQP"}, {"OPENWIRE"}});
   }

   public InfiniteRedeliveryTest(String protocol) {
      this.protocol = protocol;
   }

   String protocol;

   TestableServer liveServer;
   TestableServer backupServer;

   Configuration backupConfig;
   Configuration liveConfig;

   NodeManager nodeManager;

   protected TestableServer createTestableServer(Configuration config, NodeManager nodeManager) throws Exception {
      boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration;
      return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1));
   }

   // I am using a replicated config to make sure the replica will also configured replaceable records
   protected void createReplicatedConfigs() throws Exception {
      final TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
      final TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
      final TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);

      backupConfig = createDefaultConfig(0, true);
      liveConfig = createDefaultConfig(0, true);

      ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null);

      backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false);

      ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(-1).setAllowFailBack(true);
      ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false);

      nodeManager = new InVMNodeManager(true, backupConfig.getJournalLocation());

      backupServer = createTestableServer(backupConfig, nodeManager);

      liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(TransportConfigurationUtils.getNettyAcceptor(true, 0));

      liveServer = createTestableServer(liveConfig, nodeManager);
   }

   @Before
   @Override
   public void setUp() throws Exception {
      super.setUp();
   }

   protected void startServer(boolean reschedule) throws Exception {
      createReplicatedConfigs();
      Configuration configuration = liveServer.getServer().getConfiguration();
      configuration.getAddressesSettings().clear();
      if (reschedule) {
         AddressSettings settings = new AddressSettings().setMaxDeliveryAttempts(Integer.MAX_VALUE).setRedeliveryDelay(1);
         configuration.getAddressesSettings().put("#", settings);
      } else {
         AddressSettings settings = new AddressSettings().setMaxDeliveryAttempts(Integer.MAX_VALUE).setRedeliveryDelay(0);
         configuration.getAddressesSettings().put("#", settings);
      }
      liveServer.start();
      backupServer.start();
      Wait.waitFor(liveServer.getServer()::isReplicaSync);
   }

   @Test
   public void testInifinteRedeliveryWithScheduling() throws Exception {
      testInifinteRedeliveryWithScheduling(true);
   }

   @Test
   public void testInifinteRedeliveryWithoutScheduling() throws Exception {
      testInifinteRedeliveryWithScheduling(false);
   }

   public void testInifinteRedeliveryWithScheduling(boolean reschedule) throws Exception {
      startServer(reschedule);

      liveServer.getServer().addAddressInfo(new AddressInfo("test").setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
      liveServer.getServer().createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST).setAddress("test").setDurable(true));

      ConnectionFactory factory;

      if (protocol.toUpperCase().equals("OPENWIRE")) {
         factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=100&jms.redeliveryPolicy.redeliveryDelay=0");
      } else {
         factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
      }

      Connection connection = factory.createConnection();
      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
      Queue queue = session.createQueue("test");
      Assert.assertNotNull(queue);

      MessageProducer producer = session.createProducer(queue);
      producer.send(session.createTextMessage("hello"));
      session.commit();

      MessageConsumer consumer = session.createConsumer(queue);
      connection.start();

      for (int i = 0; i < 100; i++) {
         Message message = consumer.receive(10000);
         Assert.assertNotNull(message);
         session.rollback();
      }

      connection.close();

      liveServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000);
      backupServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000);

      HashMap<Integer, AtomicInteger> counts = countJournal(liveServer.getServer().getConfiguration());
      counts.forEach((k, v) -> logger.debug(k + "=" + v));
      counts.forEach((k, v) -> Assert.assertTrue("Record type " + k + " has a lot of records:" + v, v.intValue() < 20));

      backupServer.stop();
      HashMap<Integer, AtomicInteger> backupCounts = countJournal(backupServer.getServer().getConfiguration());
      Assert.assertTrue(backupCounts.size() > 0);
      backupCounts.forEach((k, v) -> logger.debug("On Backup:" + k + "=" + v));
      backupCounts.forEach((k, v) -> Assert.assertTrue("Backup Record type " + k + " has a lot of records:" + v, v.intValue() < 10));
   }
}
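
For what the assertions are checking: the loop performs 100 transacted receive/rollback cycles against a single message, which without this change would leave on the order of one surviving update record per rollback even after compaction. The per-record-type bounds (fewer than 20 records on the live journal, fewer than 10 on the backup) therefore only hold if the compactor both collapses the replaceable updates and discards writes belonging to rolled-back transactions, on the replica as well as on the live server.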