ARTEMIS-3261 Updating logic to use only replaceable records on compacting verification

This commit is contained in:
Clebert Suconic 2021-06-04 18:15:58 -04:00 committed by clebertsuconic
parent 686a61dd65
commit 0edf599adc
11 changed files with 83 additions and 33 deletions

View File

@ -60,14 +60,14 @@ public final class CompactJournal extends LockAbstract {
final int poolFiles, final int poolFiles,
final int fileSize, final int fileSize,
final IOCriticalErrorListener listener, final IOCriticalErrorListener listener,
int... replaceableRecords) throws Exception { byte... replaceableRecords) throws Exception {
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1); NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
if (historyFolder != null) { if (historyFolder != null) {
journal.setHistoryFolder(historyFolder, -1, -1); journal.setHistoryFolder(historyFolder, -1, -1);
} }
for (int i : replaceableRecords) { for (byte i : replaceableRecords) {
journal.replaceableRecord(i); journal.replaceableRecord(i);
} }
journal.setRemoveExtraFilesOnLoad(true); journal.setRemoveExtraFilesOnLoad(true);

View File

@ -105,7 +105,7 @@ public interface Journal extends ActiveMQComponent {
appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback);
} }
default void replaceableRecord(int recordType) { default void replaceableRecord(byte recordType) {
} }
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;

View File

@ -60,7 +60,7 @@ public final class FileWrapperJournal extends JournalBase {
protected volatile JournalFile currentFile; protected volatile JournalFile currentFile;
@Override @Override
public void replaceableRecord(int recordType) { public void replaceableRecord(byte recordType) {
journal.replaceableRecord(recordType); journal.replaceableRecord(recordType);
} }

View File

@ -144,15 +144,20 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
pendingCommands.add(new DeleteCompactCommand(id, usedFile)); pendingCommands.add(new DeleteCompactCommand(id, usedFile));
} }
@Override
public boolean isReplaceableRecord(byte recordType) {
return journal.isReplaceableRecord(recordType);
}
/** /**
* @param id * @param id
* @param usedFile * @param usedFile
*/ */
public void addCommandUpdate(final long id, final JournalFile usedFile, final int size) { public void addCommandUpdate(final long id, final JournalFile usedFile, final int size, byte userRecordType) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("addCommandUpdate id " + id + " usedFile " + usedFile + " size " + size); logger.trace("addCommandUpdate id " + id + " usedFile " + usedFile + " size " + size);
} }
pendingCommands.add(new UpdateCompactCommand(id, usedFile, size)); pendingCommands.add(new UpdateCompactCommand(id, usedFile, size, userRecordType));
} }
private void checkSize(final int size) throws Exception { private void checkSize(final int size) throws Exception {
@ -273,7 +278,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
checkSize(record.getEncodeSize(), info.compactCount); checkSize(record.getEncodeSize(), info.compactCount);
newTransaction.addPositive(currentFile, info.id, record.getEncodeSize()); newTransaction.addPositive(currentFile, info.id, record.getEncodeSize(), info.userRecordType);
writeEncoder(record); writeEncoder(record);
} }
@ -433,7 +438,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
newTransaction.rollback(currentFile); newTransaction.rollback(currentFile);
} }
public void replaceableRecord(int recordType) { public void replaceableRecord(byte recordType) {
LongObjectHashMap<RunnableEx> longmap = new LongObjectHashMap(); LongObjectHashMap<RunnableEx> longmap = new LongObjectHashMap();
pendingUpdates.put(recordType, longmap); pendingUpdates.put(recordType, longmap);
} }
@ -467,7 +472,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
if (newRecord == null) { if (newRecord == null) {
ActiveMQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id); ActiveMQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id);
} else { } else {
newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize()); newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize(), journal.isReplaceableRecord(info.userRecordType));
} }
writeEncoder(updateRecord); writeEncoder(updateRecord);
@ -497,7 +502,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
writeEncoder(updateRecordTX); writeEncoder(updateRecordTX);
newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize()); newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize(), info.userRecordType);
} }
/** /**
@ -561,14 +566,17 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
private final long id; private final long id;
private final byte userRecordType;
private final JournalFile usedFile; private final JournalFile usedFile;
private final int size; private final int size;
private UpdateCompactCommand(final long id, final JournalFile usedFile, final int size) { private UpdateCompactCommand(final long id, final JournalFile usedFile, final int size, byte userRecordType) {
this.id = id; this.id = id;
this.usedFile = usedFile; this.usedFile = usedFile;
this.size = size; this.size = size;
this.userRecordType = userRecordType;
} }
@Override @Override
@ -577,7 +585,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
if (updateRecord == null) { if (updateRecord == null) {
ActiveMQJournalLogger.LOGGER.noRecordDuringCompactReplay(id); ActiveMQJournalLogger.LOGGER.noRecordDuringCompactReplay(id);
} else { } else {
updateRecord.addUpdateFile(usedFile, size); updateRecord.addUpdateFile(usedFile, size, journal.isReplaceableRecord(userRecordType));
} }
} }

View File

@ -39,6 +39,10 @@ public interface JournalFile {
void decPosCount(); void decPosCount();
int getReplaceableCount();
void incReplaceableCount();
void incAddRecord(); void incAddRecord();
int getAddRecord(); int getAddRecord();

View File

@ -42,10 +42,12 @@ public class JournalFileImpl implements JournalFile {
this.reclaimable = reclaimable; this.reclaimable = reclaimable;
} }
private static final AtomicIntegerFieldUpdater<JournalFileImpl> replaceableCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "replaceableCountField");
private static final AtomicIntegerFieldUpdater<JournalFileImpl> posCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "posCountField"); private static final AtomicIntegerFieldUpdater<JournalFileImpl> posCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "posCountField");
private static final AtomicIntegerFieldUpdater<JournalFileImpl> addRecordUpdate = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "addRecordField"); private static final AtomicIntegerFieldUpdater<JournalFileImpl> addRecordUpdate = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "addRecordField");
private static final AtomicIntegerFieldUpdater<JournalFileImpl> liveBytesUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "liveBytesField"); private static final AtomicIntegerFieldUpdater<JournalFileImpl> liveBytesUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "liveBytesField");
private volatile int replaceableCountField = 0;
private volatile int posCountField = 0; private volatile int posCountField = 0;
private volatile int addRecordField = 0; private volatile int addRecordField = 0;
private volatile int liveBytesField = 0; private volatile int liveBytesField = 0;
@ -77,6 +79,16 @@ public class JournalFileImpl implements JournalFile {
return posCountUpdater.get(this); return posCountUpdater.get(this);
} }
@Override
public int getReplaceableCount() {
return replaceableCountUpdater.get(this);
}
@Override
public void incReplaceableCount() {
replaceableCountUpdater.incrementAndGet(this);
}
@Override @Override
public boolean isPosReclaimCriteria() { public boolean isPosReclaimCriteria() {
return posReclaimCriteria; return posReclaimCriteria;

View File

@ -31,7 +31,6 @@ import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.GregorianCalendar; import java.util.GregorianCalendar;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -51,6 +50,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate; import java.util.function.Predicate;
import io.netty.util.collection.ByteObjectHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -114,6 +114,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* *
* To update this value, define a System Property org.apache.activemq.artemis.core.journal.impl.JournalImpl.UPDATE_FACTOR=YOUR VALUE * To update this value, define a System Property org.apache.activemq.artemis.core.journal.impl.JournalImpl.UPDATE_FACTOR=YOUR VALUE
* *
* We only calculate this against replaceable updates, on this case for redelivery counts and rescheduled redelivery in artemis server
*
* */ * */
public static final double UPDATE_FACTOR; public static final double UPDATE_FACTOR;
private static final String BKP_EXTENSION = "bkp"; private static final String BKP_EXTENSION = "bkp";
@ -125,7 +127,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
double value; double value;
try { try {
if (UPDATE_FACTOR_STR == null) { if (UPDATE_FACTOR_STR == null) {
value = 100; value = 10;
} else { } else {
value = Double.parseDouble(UPDATE_FACTOR_STR); value = Double.parseDouble(UPDATE_FACTOR_STR);
} }
@ -323,20 +325,26 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private final ReadWriteLock journalLock = new ReentrantReadWriteLock(); private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
private final ReadWriteLock compactorLock = new ReentrantReadWriteLock(); private final ReadWriteLock compactorLock = new ReentrantReadWriteLock();
HashSet<Integer> replaceableRecords; ByteObjectHashMap<Boolean> replaceableRecords;
/** This will declare a record type as being replaceable on updates. /** This will declare a record type as being replaceable on updates.
* Certain update records only need the last value, and they could be replaceable during compacting. * Certain update records only need the last value, and they could be replaceable during compacting.
* */ * */
@Override @Override
public void replaceableRecord(int recordType) { public void replaceableRecord(byte recordType) {
if (replaceableRecords == null) { if (replaceableRecords == null) {
replaceableRecords = new HashSet<>(); replaceableRecords = new ByteObjectHashMap<>();
} }
replaceableRecords.add(recordType); replaceableRecords.put(recordType, Boolean.TRUE);
} }
@Override
public boolean isReplaceableRecord(byte recordType) {
return replaceableRecords != null && replaceableRecords.containsKey(recordType);
}
private volatile JournalFile currentFile; private volatile JournalFile currentFile;
private volatile JournalState state = JournalState.STOPPED; private volatile JournalState state = JournalState.STOPPED;
@ -1108,10 +1116,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// computing the delete should be done after compacting is done // computing the delete should be done after compacting is done
if (jrnRecord == null) { if (jrnRecord == null) {
if (compactor != null) { if (compactor != null) {
compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize()); compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize(), recordType);
} }
} else { } else {
jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize()); jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize(), isReplaceableRecord(recordType));
} }
if (updateCallback != null) { if (updateCallback != null) {
@ -1290,7 +1298,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
usedFile); usedFile);
} }
tx.addPositive(usedFile, id, encodeSize); tx.addPositive(usedFile, id, encodeSize, recordType);
} catch (Throwable e) { } catch (Throwable e) {
logger.error("appendAddRecordTransactional:" + e, e); logger.error("appendAddRecordTransactional:" + e, e);
setErrorCondition(null, tx, e); setErrorCondition(null, tx, e);
@ -1353,7 +1361,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
usedFile ); usedFile );
} }
tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() ); tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize(), recordType);
} catch (Throwable e ) { } catch (Throwable e ) {
logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e ); logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e );
setErrorCondition(null, tx, e ); setErrorCondition(null, tx, e );
@ -1987,7 +1995,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID()); compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID());
if (replaceableRecords != null) { if (replaceableRecords != null) {
replaceableRecords.forEach((i) -> compactor.replaceableRecord(i)); replaceableRecords.forEach((k, v) -> compactor.replaceableRecord(k));
} }
transactions.forEach((id, pendingTransaction) -> { transactions.forEach((id, pendingTransaction) -> {
@ -2124,7 +2132,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// have been deleted // have been deleted
// just leaving some updates in this file // just leaving some updates in this file
posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1, isReplaceableRecord(info.userRecordType)); // +1 = compact
// count // count
} }
} }
@ -2172,7 +2180,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
transactions.put(transactionID, tnp); transactions.put(transactionID, tnp);
} }
tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1, info.userRecordType); // +1 = compact
// count // count
} }
@ -2620,7 +2628,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
for (JournalFile file : dataFiles) { for (JournalFile file : dataFiles) {
totalLiveSize += file.getLiveSize(); totalLiveSize += file.getLiveSize();
updateCount += file.getPosCount(); updateCount += file.getReplaceableCount();
addRecord += file.getAddRecord(); addRecord += file.getAddRecord();
} }

View File

@ -46,7 +46,7 @@ public class JournalRecord {
addFile.incAddRecord(); addFile.incAddRecord();
} }
void addUpdateFile(final JournalFile updateFile, final int bytes) { void addUpdateFile(final JournalFile updateFile, final int bytes, boolean replaceableUpdate) {
checkNotDeleted(); checkNotDeleted();
if (bytes == 0) { if (bytes == 0) {
return; return;
@ -66,6 +66,9 @@ public class JournalRecord {
fileUpdates.add(updateFile, bytes, 1); fileUpdates.add(updateFile, bytes, 1);
updateFile.incPosCount(); updateFile.incPosCount();
updateFile.addSize(bytes); updateFile.addSize(bytes);
if (replaceableUpdate) {
updateFile.incReplaceableCount();
}
} }
void delete(final JournalFile file) { void delete(final JournalFile file) {

View File

@ -30,4 +30,6 @@ public interface JournalRecordProvider {
JournalCompactor getCompactor(); JournalCompactor getCompactor();
ConcurrentLongHashMap<JournalRecord> getRecords(); ConcurrentLongHashMap<JournalRecord> getRecords();
boolean isReplaceableRecord(byte recordType);
} }

View File

@ -202,7 +202,7 @@ public class JournalTransaction {
} }
} }
public void addPositive(final JournalFile file, final long id, final int size) { public void addPositive(final JournalFile file, final long id, final int size, final byte userRecordType) {
incCounter(file); incCounter(file);
addFile(file); addFile(file);
@ -211,7 +211,7 @@ public class JournalTransaction {
pos = new ArrayList<>(); pos = new ArrayList<>();
} }
pos.add(new JournalUpdate(file, id, size)); pos.add(new JournalUpdate(file, id, size, userRecordType));
} }
public void addNegative(final JournalFile file, final long id) { public void addNegative(final JournalFile file, final long id) {
@ -223,7 +223,7 @@ public class JournalTransaction {
neg = new ArrayList<>(); neg = new ArrayList<>();
} }
neg.add(new JournalUpdate(file, id, 0)); neg.add(new JournalUpdate(file, id, 0, (byte)0));
} }
/** /**
@ -254,13 +254,13 @@ public class JournalTransaction {
// This is a case where the transaction was opened after compacting was started, // This is a case where the transaction was opened after compacting was started,
// but the commit arrived while compacting was working // but the commit arrived while compacting was working
// We need to cache the counter update, so compacting will take the correct files when it is done // We need to cache the counter update, so compacting will take the correct files when it is done
compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size); compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size, trUpdate.userRecordType);
} else if (posFiles == null) { } else if (posFiles == null) {
posFiles = new JournalRecord(trUpdate.file, trUpdate.size); posFiles = new JournalRecord(trUpdate.file, trUpdate.size);
journal.getRecords().put(trUpdate.id, posFiles); journal.getRecords().put(trUpdate.id, posFiles);
} else { } else {
posFiles.addUpdateFile(trUpdate.file, trUpdate.size); posFiles.addUpdateFile(trUpdate.file, trUpdate.size, journal.isReplaceableRecord(trUpdate.userRecordType));
} }
} }
} }
@ -397,16 +397,19 @@ public class JournalTransaction {
int size; int size;
final byte userRecordType;
/** /**
* @param file * @param file
* @param id * @param id
* @param size * @param size
*/ */
private JournalUpdate(final JournalFile file, final long id, final int size) { private JournalUpdate(final JournalFile file, final long id, final int size, final byte userRecordType) {
super(); super();
this.file = file; this.file = file;
this.id = id; this.id = id;
this.size = size; this.size = size;
this.userRecordType = userRecordType;
} }
/** /**

View File

@ -744,6 +744,16 @@ public class ReclaimerTest extends ActiveMQTestBase {
} }
} }
@Override
public int getReplaceableCount() {
return 0;
}
@Override
public void incReplaceableCount() {
}
@Override @Override
public void incAddRecord() { public void incAddRecord() {