diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java index 83b3cf8b03..ade4da85d7 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java @@ -42,12 +42,18 @@ public interface JournalFile { int getTotalNegativeToOthers(); /** - * Whether this file's contents can deleted and the file reused. - * - * @param canDelete if {@code true} then this file's contents are unimportant and may be deleted - * at any time. + * Whether this file additions all have a delete in some other file */ - void setCanReclaim(boolean canDelete); + boolean isPosReclaimCriteria(); + + void setPosReclaimCriteria(); + + /** + * Whether this file deletes are on files that are either marked for reclaim or have already been reclaimed + */ + boolean isNegReclaimCriteria(); + + void setNegReclaimCriteria(); /** * Whether this file's contents can deleted and the file reused. diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java index 01d8000071..210fcd6606 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.artemis.core.journal.impl; -import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -37,13 +37,15 @@ public class JournalFileImpl implements JournalFile { private final AtomicInteger liveBytes = new AtomicInteger(0); - private boolean canReclaim; + // Flags to be used by determine if the journal file can be reclaimed + private boolean posReclaimCriteria = false; + private boolean negReclaimCriteria = false; private final AtomicInteger totalNegativeToOthers = new AtomicInteger(0); private final int version; - private final Map negCounts = new ConcurrentHashMap<>(); + private final ConcurrentMap negCounts = new ConcurrentHashMap<>(); public JournalFileImpl(final SequentialFile file, final long fileID, final int version) { this.file = file; @@ -61,13 +63,28 @@ public class JournalFileImpl implements JournalFile { } @Override - public boolean isCanReclaim() { - return canReclaim; + public boolean isPosReclaimCriteria() { + return posReclaimCriteria; } @Override - public void setCanReclaim(final boolean canReclaim) { - this.canReclaim = canReclaim; + public void setPosReclaimCriteria() { + this.posReclaimCriteria = true; + } + + @Override + public boolean isNegReclaimCriteria() { + return negReclaimCriteria; + } + + @Override + public void setNegReclaimCriteria() { + this.negReclaimCriteria = true; + } + + @Override + public boolean isCanReclaim() { + return posReclaimCriteria && negReclaimCriteria; } @Override @@ -75,7 +92,10 @@ public class JournalFileImpl implements JournalFile { if (file != this) { totalNegativeToOthers.incrementAndGet(); } - getOrCreateNegCount(file).incrementAndGet(); + AtomicInteger previous = negCounts.putIfAbsent(file, new AtomicInteger(1)); + if (previous != null) { + previous.incrementAndGet(); + } } @Override @@ -152,17 +172,6 @@ public class JournalFileImpl implements JournalFile { return builder.toString(); } - private synchronized AtomicInteger getOrCreateNegCount(final JournalFile file) { - AtomicInteger count = negCounts.get(file); - - if (count == null) { - count = new AtomicInteger(); - negCounts.put(file, count); - } - - return count; - } - @Override public void addSize(final int bytes) { liveBytes.addAndGet(bytes); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/Reclaimer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/Reclaimer.java index c83e2afa7f..accbe80ef4 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/Reclaimer.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/Reclaimer.java @@ -36,43 +36,55 @@ public class Reclaimer { private static final Logger logger = Logger.getLogger(Reclaimer.class); + // The files are scanned in two stages. First we only check for 2) and do so while that criteria is not met. + // When 2) is met, set the first reclaim flag in the journal. After that point only check for 1) + // until that criteria is met as well. When 1) is met we set the second flag and the file can be reclaimed. + public void scan(final JournalFile[] files) { for (int i = 0; i < files.length; i++) { JournalFile currentFile = files[i]; - currentFile.setCanReclaim(true); - // First we evaluate criterion 2) - for (int j = i - 1; j >= 0; j--) { - JournalFile file = files[j]; - if (currentFile.getNegCount(file) != 0 && !file.isCanReclaim()) { - logger.tracef("%s can't be reclaimed because %s has negative values", currentFile, file); - currentFile.setCanReclaim(false); - break; + // criterion 2) --- this file deletes are from pos on files marked for reclaim or reclaimed + if (!currentFile.isNegReclaimCriteria()) { + boolean outstandingNeg = false; + + for (int j = i - 1; j >= 0 && !outstandingNeg; j--) { + JournalFile file = files[j]; + if (!file.isCanReclaim() && currentFile.getNegCount(file) != 0) { + logger.tracef("%s can't be reclaimed because %s has negative values", currentFile, file); + outstandingNeg = true; + } } - } - if (!currentFile.isCanReclaim()) { - continue; // Move to next file as we already know that this file can't be reclaimed because criterion 2) - } - // Now we evaluate criterion 1) - int negCount = 0, posCount = currentFile.getPosCount(); - logger.tracef("posCount on %s = %d", currentFile, posCount); - - for (int j = i; j < files.length && negCount < posCount; j++) { - int toNeg = files[j].getNegCount(currentFile); - negCount += toNeg; - - if (logger.isTraceEnabled() && toNeg != 0) { - logger.tracef("Negative from %s into %s = %d", files[j], currentFile, toNeg); + if (outstandingNeg) { + continue; // Move to next file as we already know that this file can't be reclaimed because criterion 2) + } + else { + currentFile.setNegReclaimCriteria(); } } - if (negCount < posCount ) { - logger.tracef("%s can't be reclaimed because there are not enough negatives %d", currentFile, negCount); - currentFile.setCanReclaim(false); - } - else { - logger.tracef("%s can be reclaimed", currentFile); + // criterion 1) --- this files additions all have matching deletes + if (!currentFile.isPosReclaimCriteria()) { + int negCount = 0, posCount = currentFile.getPosCount(); + logger.tracef("posCount on %s = %d", currentFile, posCount); + + for (int j = i; j < files.length && negCount < posCount; j++) { + int toNeg = files[j].getNegCount(currentFile); + negCount += toNeg; + + if (logger.isTraceEnabled() && toNeg != 0) { + logger.tracef("Negative from %s into %s = %d", files[j], currentFile, toNeg); + } + } + + if (negCount < posCount ) { + logger.tracef("%s can't be reclaimed because there are not enough negatives %d", currentFile, negCount); + } + else { + logger.tracef("%s can be reclaimed", currentFile); + currentFile.setPosReclaimCriteria(); + } } } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java index 39132a5dc1..8216800740 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java @@ -16,22 +16,16 @@ */ package org.apache.activemq.artemis.tests.unit.core.journal.impl; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.junit.Before; - -import org.junit.Test; - import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; - -import org.junit.Assert; - import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.Reclaimer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; public class ReclaimerTest extends ActiveMQTestBase { @@ -722,42 +716,25 @@ public class ReclaimerTest extends ActiveMQTestBase { static final class MockJournalFile implements JournalFile { - private final Set transactionIDs = new HashSet<>(); - - private final Set transactionTerminationIDs = new HashSet<>(); - - private final Set transactionPrepareIDs = new HashSet<>(); - private final Map negCounts = new HashMap<>(); private int posCount; - private boolean canDelete; - - private boolean needCleanup; - private int totalDep; - public void extendOffset(final int delta) { - } + private boolean posReclaimCriteria; + private boolean negReclaimCriteria; @Override public SequentialFile getFile() { return null; } - public long getOffset() { - return 0; - } - @Override public long getFileID() { return 0; } - public void setOffset(final long offset) { - } - @Override public int getNegCount(final JournalFile file) { Integer count = negCounts.get(file); @@ -797,75 +774,28 @@ public class ReclaimerTest extends ActiveMQTestBase { } @Override - public boolean isCanReclaim() { - return canDelete; + public boolean isPosReclaimCriteria() { + return posReclaimCriteria; } @Override - public void setCanReclaim(final boolean canDelete) { - this.canDelete = canDelete; + public void setPosReclaimCriteria() { + this.posReclaimCriteria = true; } - public void addTransactionID(final long id) { - transactionIDs.add(id); + @Override + public boolean isNegReclaimCriteria() { + return negReclaimCriteria; } - public void addTransactionPrepareID(final long id) { - transactionPrepareIDs.add(id); + @Override + public void setNegReclaimCriteria() { + this.negReclaimCriteria = true; } - public void addTransactionTerminationID(final long id) { - transactionTerminationIDs.add(id); - } - - public boolean containsTransactionID(final long id) { - return transactionIDs.contains(id); - } - - public boolean containsTransactionPrepareID(final long id) { - return transactionPrepareIDs.contains(id); - } - - public boolean containsTransactionTerminationID(final long id) { - return transactionTerminationIDs.contains(id); - } - - public Set getTranactionTerminationIDs() { - return transactionTerminationIDs; - } - - public Set getTransactionPrepareIDs() { - return transactionPrepareIDs; - } - - public Set getTransactionsIDs() { - return transactionIDs; - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#decPendingTransaction() - */ - public void decPendingTransaction() { - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getPendingTransactions() - */ - public int getPendingTransactions() { - return 0; - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#incPendingTransaction() - */ - public void incPendingTransaction() { - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getOrderingID() - */ - public int getOrderingID() { - return 0; + @Override + public boolean isCanReclaim() { + return posReclaimCriteria && negReclaimCriteria; } @Override @@ -876,74 +806,25 @@ public class ReclaimerTest extends ActiveMQTestBase { public void decSize(final int bytes) { } - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getSize() - */ @Override public int getLiveSize() { return 0; } - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#isNeedCleanup() - */ - public boolean isNeedCleanup() { - return needCleanup; - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#resetNegCount(org.apache.activemq.artemis.core.journal.impl.JournalFile) - */ - public boolean resetNegCount(final JournalFile file) { - return false; - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#setNeedCleanup(boolean) - */ - public void setNeedCleanup(final boolean needCleanup) { - this.needCleanup = needCleanup; - - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getRecordID() - */ @Override public int getRecordID() { return 0; } - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getTotalNegativeToOthers() - */ @Override public int getTotalNegativeToOthers() { return totalDep; } - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getJournalVersion() - */ @Override public int getJournalVersion() { return JournalImpl.FORMAT_VERSION; } - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getTotNeg() - */ - public int getTotNeg() { - // TODO Auto-generated method stub - return 0; - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.impl.JournalFile#setTotNeg(int) - */ - public void setTotNeg(int totNeg) { - // TODO Auto-generated method stub - - } } }