From 0edf599adca9365993579a96d717ed2f3c0d0af9 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 4 Jun 2021 18:15:58 -0400 Subject: [PATCH] ARTEMIS-3261 Updating logic to use only replaceable records on compacting verification --- .../tools/journal/CompactJournal.java | 4 +-- .../artemis/core/journal/Journal.java | 2 +- .../core/journal/impl/FileWrapperJournal.java | 2 +- .../core/journal/impl/JournalCompactor.java | 24 ++++++++----- .../core/journal/impl/JournalFile.java | 4 +++ .../core/journal/impl/JournalFileImpl.java | 12 +++++++ .../core/journal/impl/JournalImpl.java | 36 +++++++++++-------- .../core/journal/impl/JournalRecord.java | 5 ++- .../journal/impl/JournalRecordProvider.java | 2 ++ .../core/journal/impl/JournalTransaction.java | 15 ++++---- .../unit/core/journal/impl/ReclaimerTest.java | 10 ++++++ 11 files changed, 83 insertions(+), 33 deletions(-) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java index 32aef5ef7c..2b8dc4db82 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/CompactJournal.java @@ -60,14 +60,14 @@ public final class CompactJournal extends LockAbstract { final int poolFiles, final int fileSize, final IOCriticalErrorListener listener, - int... replaceableRecords) throws Exception { + byte... replaceableRecords) throws Exception { NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1); JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); if (historyFolder != null) { journal.setHistoryFolder(historyFolder, -1, -1); } - for (int i : replaceableRecords) { + for (byte i : replaceableRecords) { journal.replaceableRecord(i); } journal.setRemoveExtraFilesOnLoad(true); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index 6618e8d90f..28aa2d23a0 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -105,7 +105,7 @@ public interface Journal extends ActiveMQComponent { appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); } - default void replaceableRecord(int recordType) { + default void replaceableRecord(byte recordType) { } void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 401797e4c4..325e6161e8 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -60,7 +60,7 @@ public final class FileWrapperJournal extends JournalBase { protected volatile JournalFile currentFile; @Override - public void replaceableRecord(int recordType) { + public void replaceableRecord(byte recordType) { journal.replaceableRecord(recordType); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java index 8b70325705..0108bf9828 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java @@ -144,15 +144,20 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ pendingCommands.add(new DeleteCompactCommand(id, usedFile)); } + @Override + public boolean isReplaceableRecord(byte recordType) { + return journal.isReplaceableRecord(recordType); + } + /** * @param id * @param usedFile */ - public void addCommandUpdate(final long id, final JournalFile usedFile, final int size) { + public void addCommandUpdate(final long id, final JournalFile usedFile, final int size, byte userRecordType) { if (logger.isTraceEnabled()) { logger.trace("addCommandUpdate id " + id + " usedFile " + usedFile + " size " + size); } - pendingCommands.add(new UpdateCompactCommand(id, usedFile, size)); + pendingCommands.add(new UpdateCompactCommand(id, usedFile, size, userRecordType)); } private void checkSize(final int size) throws Exception { @@ -273,7 +278,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ checkSize(record.getEncodeSize(), info.compactCount); - newTransaction.addPositive(currentFile, info.id, record.getEncodeSize()); + newTransaction.addPositive(currentFile, info.id, record.getEncodeSize(), info.userRecordType); writeEncoder(record); } @@ -433,7 +438,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ newTransaction.rollback(currentFile); } - public void replaceableRecord(int recordType) { + public void replaceableRecord(byte recordType) { LongObjectHashMap longmap = new LongObjectHashMap(); pendingUpdates.put(recordType, longmap); } @@ -467,7 +472,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ if (newRecord == null) { ActiveMQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id); } else { - newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize()); + newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize(), journal.isReplaceableRecord(info.userRecordType)); } writeEncoder(updateRecord); @@ -497,7 +502,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ writeEncoder(updateRecordTX); - newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize()); + newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize(), info.userRecordType); } /** @@ -561,14 +566,17 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ private final long id; + private final byte userRecordType; + private final JournalFile usedFile; private final int size; - private UpdateCompactCommand(final long id, final JournalFile usedFile, final int size) { + private UpdateCompactCommand(final long id, final JournalFile usedFile, final int size, byte userRecordType) { this.id = id; this.usedFile = usedFile; this.size = size; + this.userRecordType = userRecordType; } @Override @@ -577,7 +585,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ if (updateRecord == null) { ActiveMQJournalLogger.LOGGER.noRecordDuringCompactReplay(id); } else { - updateRecord.addUpdateFile(usedFile, size); + updateRecord.addUpdateFile(usedFile, size, journal.isReplaceableRecord(userRecordType)); } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java index 9bd2b50933..264962697b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java @@ -39,6 +39,10 @@ public interface JournalFile { void decPosCount(); + int getReplaceableCount(); + + void incReplaceableCount(); + void incAddRecord(); int getAddRecord(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java index 8c99f29bb8..2e4a072177 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java @@ -42,10 +42,12 @@ public class JournalFileImpl implements JournalFile { this.reclaimable = reclaimable; } + private static final AtomicIntegerFieldUpdater replaceableCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "replaceableCountField"); private static final AtomicIntegerFieldUpdater posCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "posCountField"); private static final AtomicIntegerFieldUpdater addRecordUpdate = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "addRecordField"); private static final AtomicIntegerFieldUpdater liveBytesUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "liveBytesField"); + private volatile int replaceableCountField = 0; private volatile int posCountField = 0; private volatile int addRecordField = 0; private volatile int liveBytesField = 0; @@ -77,6 +79,16 @@ public class JournalFileImpl implements JournalFile { return posCountUpdater.get(this); } + @Override + public int getReplaceableCount() { + return replaceableCountUpdater.get(this); + } + + @Override + public void incReplaceableCount() { + replaceableCountUpdater.incrementAndGet(this); + } + @Override public boolean isPosReclaimCriteria() { return posReclaimCriteria; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index aac1554237..383f2e9013 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -31,7 +31,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.GregorianCalendar; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -51,6 +50,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; +import io.netty.util.collection.ByteObjectHashMap; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -114,6 +114,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal * * To update this value, define a System Property org.apache.activemq.artemis.core.journal.impl.JournalImpl.UPDATE_FACTOR=YOUR VALUE * + * We only calculate this against replaceable updates, on this case for redelivery counts and rescheduled redelivery in artemis server + * * */ public static final double UPDATE_FACTOR; private static final String BKP_EXTENSION = "bkp"; @@ -125,7 +127,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal double value; try { if (UPDATE_FACTOR_STR == null) { - value = 100; + value = 10; } else { value = Double.parseDouble(UPDATE_FACTOR_STR); } @@ -323,20 +325,26 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private final ReadWriteLock journalLock = new ReentrantReadWriteLock(); private final ReadWriteLock compactorLock = new ReentrantReadWriteLock(); - HashSet replaceableRecords; + ByteObjectHashMap replaceableRecords; /** This will declare a record type as being replaceable on updates. * Certain update records only need the last value, and they could be replaceable during compacting. * */ @Override - public void replaceableRecord(int recordType) { + public void replaceableRecord(byte recordType) { if (replaceableRecords == null) { - replaceableRecords = new HashSet<>(); + replaceableRecords = new ByteObjectHashMap<>(); } - replaceableRecords.add(recordType); + replaceableRecords.put(recordType, Boolean.TRUE); } + @Override + public boolean isReplaceableRecord(byte recordType) { + return replaceableRecords != null && replaceableRecords.containsKey(recordType); + } + + private volatile JournalFile currentFile; private volatile JournalState state = JournalState.STOPPED; @@ -1108,10 +1116,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // computing the delete should be done after compacting is done if (jrnRecord == null) { if (compactor != null) { - compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize()); + compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize(), recordType); } } else { - jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize()); + jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize(), isReplaceableRecord(recordType)); } if (updateCallback != null) { @@ -1290,7 +1298,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal usedFile); } - tx.addPositive(usedFile, id, encodeSize); + tx.addPositive(usedFile, id, encodeSize, recordType); } catch (Throwable e) { logger.error("appendAddRecordTransactional:" + e, e); setErrorCondition(null, tx, e); @@ -1353,7 +1361,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal usedFile ); } - tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() ); + tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize(), recordType); } catch (Throwable e ) { logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e ); setErrorCondition(null, tx, e ); @@ -1987,7 +1995,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID()); if (replaceableRecords != null) { - replaceableRecords.forEach((i) -> compactor.replaceableRecord(i)); + replaceableRecords.forEach((k, v) -> compactor.replaceableRecord(k)); } transactions.forEach((id, pendingTransaction) -> { @@ -2124,7 +2132,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // have been deleted // just leaving some updates in this file - posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact + posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1, isReplaceableRecord(info.userRecordType)); // +1 = compact // count } } @@ -2172,7 +2180,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal transactions.put(transactionID, tnp); } - tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact + tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1, info.userRecordType); // +1 = compact // count } @@ -2620,7 +2628,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal for (JournalFile file : dataFiles) { totalLiveSize += file.getLiveSize(); - updateCount += file.getPosCount(); + updateCount += file.getReplaceableCount(); addRecord += file.getAddRecord(); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java index c1d47963ec..0be90c2015 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecord.java @@ -46,7 +46,7 @@ public class JournalRecord { addFile.incAddRecord(); } - void addUpdateFile(final JournalFile updateFile, final int bytes) { + void addUpdateFile(final JournalFile updateFile, final int bytes, boolean replaceableUpdate) { checkNotDeleted(); if (bytes == 0) { return; @@ -66,6 +66,9 @@ public class JournalRecord { fileUpdates.add(updateFile, bytes, 1); updateFile.incPosCount(); updateFile.addSize(bytes); + if (replaceableUpdate) { + updateFile.incReplaceableCount(); + } } void delete(final JournalFile file) { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java index c9c92f4407..6f9b40ca67 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java @@ -30,4 +30,6 @@ public interface JournalRecordProvider { JournalCompactor getCompactor(); ConcurrentLongHashMap getRecords(); + + boolean isReplaceableRecord(byte recordType); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java index ffc016a3a5..0cfd3697e3 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java @@ -202,7 +202,7 @@ public class JournalTransaction { } } - public void addPositive(final JournalFile file, final long id, final int size) { + public void addPositive(final JournalFile file, final long id, final int size, final byte userRecordType) { incCounter(file); addFile(file); @@ -211,7 +211,7 @@ public class JournalTransaction { pos = new ArrayList<>(); } - pos.add(new JournalUpdate(file, id, size)); + pos.add(new JournalUpdate(file, id, size, userRecordType)); } public void addNegative(final JournalFile file, final long id) { @@ -223,7 +223,7 @@ public class JournalTransaction { neg = new ArrayList<>(); } - neg.add(new JournalUpdate(file, id, 0)); + neg.add(new JournalUpdate(file, id, 0, (byte)0)); } /** @@ -254,13 +254,13 @@ public class JournalTransaction { // This is a case where the transaction was opened after compacting was started, // but the commit arrived while compacting was working // We need to cache the counter update, so compacting will take the correct files when it is done - compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size); + compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size, trUpdate.userRecordType); } else if (posFiles == null) { posFiles = new JournalRecord(trUpdate.file, trUpdate.size); journal.getRecords().put(trUpdate.id, posFiles); } else { - posFiles.addUpdateFile(trUpdate.file, trUpdate.size); + posFiles.addUpdateFile(trUpdate.file, trUpdate.size, journal.isReplaceableRecord(trUpdate.userRecordType)); } } } @@ -397,16 +397,19 @@ public class JournalTransaction { int size; + final byte userRecordType; + /** * @param file * @param id * @param size */ - private JournalUpdate(final JournalFile file, final long id, final int size) { + private JournalUpdate(final JournalFile file, final long id, final int size, final byte userRecordType) { super(); this.file = file; this.id = id; this.size = size; + this.userRecordType = userRecordType; } /** diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java index 54f83da387..a2b09975ab 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java @@ -744,6 +744,16 @@ public class ReclaimerTest extends ActiveMQTestBase { } } + @Override + public int getReplaceableCount() { + return 0; + } + + @Override + public void incReplaceableCount() { + + } + @Override public void incAddRecord() {