More changes on the reclaimer for better performance
This commit is contained in:
parent
98fa433bd1
commit
b8d6a374a0
|
@ -42,12 +42,18 @@ public interface JournalFile {
|
||||||
int getTotalNegativeToOthers();
|
int getTotalNegativeToOthers();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether this file's contents can deleted and the file reused.
|
* Whether this file additions all have a delete in some other file
|
||||||
*
|
|
||||||
* @param canDelete if {@code true} then this file's contents are unimportant and may be deleted
|
|
||||||
* at any time.
|
|
||||||
*/
|
*/
|
||||||
void setCanReclaim(boolean canDelete);
|
boolean isPosReclaimCriteria();
|
||||||
|
|
||||||
|
void setPosReclaimCriteria();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether this file deletes are on files that are either marked for reclaim or have already been reclaimed
|
||||||
|
*/
|
||||||
|
boolean isNegReclaimCriteria();
|
||||||
|
|
||||||
|
void setNegReclaimCriteria();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether this file's contents can deleted and the file reused.
|
* Whether this file's contents can deleted and the file reused.
|
||||||
|
|
|
@ -16,9 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.journal.impl;
|
package org.apache.activemq.artemis.core.journal.impl;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
|
@ -37,13 +37,15 @@ public class JournalFileImpl implements JournalFile {
|
||||||
|
|
||||||
private final AtomicInteger liveBytes = new AtomicInteger(0);
|
private final AtomicInteger liveBytes = new AtomicInteger(0);
|
||||||
|
|
||||||
private boolean canReclaim;
|
// Flags to be used by determine if the journal file can be reclaimed
|
||||||
|
private boolean posReclaimCriteria = false;
|
||||||
|
private boolean negReclaimCriteria = false;
|
||||||
|
|
||||||
private final AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
|
private final AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
|
||||||
|
|
||||||
private final int version;
|
private final int version;
|
||||||
|
|
||||||
private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<>();
|
private final ConcurrentMap<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public JournalFileImpl(final SequentialFile file, final long fileID, final int version) {
|
public JournalFileImpl(final SequentialFile file, final long fileID, final int version) {
|
||||||
this.file = file;
|
this.file = file;
|
||||||
|
@ -61,13 +63,28 @@ public class JournalFileImpl implements JournalFile {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isCanReclaim() {
|
public boolean isPosReclaimCriteria() {
|
||||||
return canReclaim;
|
return posReclaimCriteria;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setCanReclaim(final boolean canReclaim) {
|
public void setPosReclaimCriteria() {
|
||||||
this.canReclaim = canReclaim;
|
this.posReclaimCriteria = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isNegReclaimCriteria() {
|
||||||
|
return negReclaimCriteria;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNegReclaimCriteria() {
|
||||||
|
this.negReclaimCriteria = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isCanReclaim() {
|
||||||
|
return posReclaimCriteria && negReclaimCriteria;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -75,7 +92,10 @@ public class JournalFileImpl implements JournalFile {
|
||||||
if (file != this) {
|
if (file != this) {
|
||||||
totalNegativeToOthers.incrementAndGet();
|
totalNegativeToOthers.incrementAndGet();
|
||||||
}
|
}
|
||||||
getOrCreateNegCount(file).incrementAndGet();
|
AtomicInteger previous = negCounts.putIfAbsent(file, new AtomicInteger(1));
|
||||||
|
if (previous != null) {
|
||||||
|
previous.incrementAndGet();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -152,17 +172,6 @@ public class JournalFileImpl implements JournalFile {
|
||||||
return builder.toString();
|
return builder.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized AtomicInteger getOrCreateNegCount(final JournalFile file) {
|
|
||||||
AtomicInteger count = negCounts.get(file);
|
|
||||||
|
|
||||||
if (count == null) {
|
|
||||||
count = new AtomicInteger();
|
|
||||||
negCounts.put(file, count);
|
|
||||||
}
|
|
||||||
|
|
||||||
return count;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addSize(final int bytes) {
|
public void addSize(final int bytes) {
|
||||||
liveBytes.addAndGet(bytes);
|
liveBytes.addAndGet(bytes);
|
||||||
|
|
|
@ -36,25 +36,36 @@ public class Reclaimer {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(Reclaimer.class);
|
private static final Logger logger = Logger.getLogger(Reclaimer.class);
|
||||||
|
|
||||||
|
// The files are scanned in two stages. First we only check for 2) and do so while that criteria is not met.
|
||||||
|
// When 2) is met, set the first reclaim flag in the journal. After that point only check for 1)
|
||||||
|
// until that criteria is met as well. When 1) is met we set the second flag and the file can be reclaimed.
|
||||||
|
|
||||||
public void scan(final JournalFile[] files) {
|
public void scan(final JournalFile[] files) {
|
||||||
for (int i = 0; i < files.length; i++) {
|
for (int i = 0; i < files.length; i++) {
|
||||||
JournalFile currentFile = files[i];
|
JournalFile currentFile = files[i];
|
||||||
currentFile.setCanReclaim(true);
|
|
||||||
|
|
||||||
// First we evaluate criterion 2)
|
// criterion 2) --- this file deletes are from pos on files marked for reclaim or reclaimed
|
||||||
for (int j = i - 1; j >= 0; j--) {
|
if (!currentFile.isNegReclaimCriteria()) {
|
||||||
|
boolean outstandingNeg = false;
|
||||||
|
|
||||||
|
for (int j = i - 1; j >= 0 && !outstandingNeg; j--) {
|
||||||
JournalFile file = files[j];
|
JournalFile file = files[j];
|
||||||
if (currentFile.getNegCount(file) != 0 && !file.isCanReclaim()) {
|
if (!file.isCanReclaim() && currentFile.getNegCount(file) != 0) {
|
||||||
logger.tracef("%s can't be reclaimed because %s has negative values", currentFile, file);
|
logger.tracef("%s can't be reclaimed because %s has negative values", currentFile, file);
|
||||||
currentFile.setCanReclaim(false);
|
outstandingNeg = true;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!currentFile.isCanReclaim()) {
|
|
||||||
|
if (outstandingNeg) {
|
||||||
continue; // Move to next file as we already know that this file can't be reclaimed because criterion 2)
|
continue; // Move to next file as we already know that this file can't be reclaimed because criterion 2)
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
currentFile.setNegReclaimCriteria();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Now we evaluate criterion 1)
|
// criterion 1) --- this files additions all have matching deletes
|
||||||
|
if (!currentFile.isPosReclaimCriteria()) {
|
||||||
int negCount = 0, posCount = currentFile.getPosCount();
|
int negCount = 0, posCount = currentFile.getPosCount();
|
||||||
logger.tracef("posCount on %s = %d", currentFile, posCount);
|
logger.tracef("posCount on %s = %d", currentFile, posCount);
|
||||||
|
|
||||||
|
@ -69,10 +80,11 @@ public class Reclaimer {
|
||||||
|
|
||||||
if (negCount < posCount ) {
|
if (negCount < posCount ) {
|
||||||
logger.tracef("%s can't be reclaimed because there are not enough negatives %d", currentFile, negCount);
|
logger.tracef("%s can't be reclaimed because there are not enough negatives %d", currentFile, negCount);
|
||||||
currentFile.setCanReclaim(false);
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
logger.tracef("%s can be reclaimed", currentFile);
|
logger.tracef("%s can be reclaimed", currentFile);
|
||||||
|
currentFile.setPosReclaimCriteria();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,22 +16,16 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.unit.core.journal.impl;
|
package org.apache.activemq.artemis.tests.unit.core.journal.impl;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
|
||||||
import org.junit.Before;
|
|
||||||
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
||||||
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||||
import org.apache.activemq.artemis.core.journal.impl.Reclaimer;
|
import org.apache.activemq.artemis.core.journal.impl.Reclaimer;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
public class ReclaimerTest extends ActiveMQTestBase {
|
public class ReclaimerTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
@ -722,42 +716,25 @@ public class ReclaimerTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
static final class MockJournalFile implements JournalFile {
|
static final class MockJournalFile implements JournalFile {
|
||||||
|
|
||||||
private final Set<Long> transactionIDs = new HashSet<>();
|
|
||||||
|
|
||||||
private final Set<Long> transactionTerminationIDs = new HashSet<>();
|
|
||||||
|
|
||||||
private final Set<Long> transactionPrepareIDs = new HashSet<>();
|
|
||||||
|
|
||||||
private final Map<JournalFile, Integer> negCounts = new HashMap<>();
|
private final Map<JournalFile, Integer> negCounts = new HashMap<>();
|
||||||
|
|
||||||
private int posCount;
|
private int posCount;
|
||||||
|
|
||||||
private boolean canDelete;
|
|
||||||
|
|
||||||
private boolean needCleanup;
|
|
||||||
|
|
||||||
private int totalDep;
|
private int totalDep;
|
||||||
|
|
||||||
public void extendOffset(final int delta) {
|
private boolean posReclaimCriteria;
|
||||||
}
|
private boolean negReclaimCriteria;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SequentialFile getFile() {
|
public SequentialFile getFile() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getOffset() {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getFileID() {
|
public long getFileID() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setOffset(final long offset) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getNegCount(final JournalFile file) {
|
public int getNegCount(final JournalFile file) {
|
||||||
Integer count = negCounts.get(file);
|
Integer count = negCounts.get(file);
|
||||||
|
@ -797,75 +774,28 @@ public class ReclaimerTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isCanReclaim() {
|
public boolean isPosReclaimCriteria() {
|
||||||
return canDelete;
|
return posReclaimCriteria;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setCanReclaim(final boolean canDelete) {
|
public void setPosReclaimCriteria() {
|
||||||
this.canDelete = canDelete;
|
this.posReclaimCriteria = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addTransactionID(final long id) {
|
@Override
|
||||||
transactionIDs.add(id);
|
public boolean isNegReclaimCriteria() {
|
||||||
|
return negReclaimCriteria;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addTransactionPrepareID(final long id) {
|
@Override
|
||||||
transactionPrepareIDs.add(id);
|
public void setNegReclaimCriteria() {
|
||||||
|
this.negReclaimCriteria = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addTransactionTerminationID(final long id) {
|
@Override
|
||||||
transactionTerminationIDs.add(id);
|
public boolean isCanReclaim() {
|
||||||
}
|
return posReclaimCriteria && negReclaimCriteria;
|
||||||
|
|
||||||
public boolean containsTransactionID(final long id) {
|
|
||||||
return transactionIDs.contains(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean containsTransactionPrepareID(final long id) {
|
|
||||||
return transactionPrepareIDs.contains(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean containsTransactionTerminationID(final long id) {
|
|
||||||
return transactionTerminationIDs.contains(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Set<Long> getTranactionTerminationIDs() {
|
|
||||||
return transactionTerminationIDs;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Set<Long> getTransactionPrepareIDs() {
|
|
||||||
return transactionPrepareIDs;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Set<Long> getTransactionsIDs() {
|
|
||||||
return transactionIDs;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#decPendingTransaction()
|
|
||||||
*/
|
|
||||||
public void decPendingTransaction() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getPendingTransactions()
|
|
||||||
*/
|
|
||||||
public int getPendingTransactions() {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#incPendingTransaction()
|
|
||||||
*/
|
|
||||||
public void incPendingTransaction() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getOrderingID()
|
|
||||||
*/
|
|
||||||
public int getOrderingID() {
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -876,74 +806,25 @@ public class ReclaimerTest extends ActiveMQTestBase {
|
||||||
public void decSize(final int bytes) {
|
public void decSize(final int bytes) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getSize()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public int getLiveSize() {
|
public int getLiveSize() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#isNeedCleanup()
|
|
||||||
*/
|
|
||||||
public boolean isNeedCleanup() {
|
|
||||||
return needCleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#resetNegCount(org.apache.activemq.artemis.core.journal.impl.JournalFile)
|
|
||||||
*/
|
|
||||||
public boolean resetNegCount(final JournalFile file) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#setNeedCleanup(boolean)
|
|
||||||
*/
|
|
||||||
public void setNeedCleanup(final boolean needCleanup) {
|
|
||||||
this.needCleanup = needCleanup;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getRecordID()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public int getRecordID() {
|
public int getRecordID() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getTotalNegativeToOthers()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public int getTotalNegativeToOthers() {
|
public int getTotalNegativeToOthers() {
|
||||||
return totalDep;
|
return totalDep;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getJournalVersion()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public int getJournalVersion() {
|
public int getJournalVersion() {
|
||||||
return JournalImpl.FORMAT_VERSION;
|
return JournalImpl.FORMAT_VERSION;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#getTotNeg()
|
|
||||||
*/
|
|
||||||
public int getTotNeg() {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.artemis.core.journal.impl.JournalFile#setTotNeg(int)
|
|
||||||
*/
|
|
||||||
public void setTotNeg(int totNeg) {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue