ARTEMIS-3261 Remove need to lookup replaceableRecords on the hot path
We known it's a replaceable record as part of the logic, no need to lookup the record type unless it's a reload from the system.
This commit is contained in:
parent
eb4723cdc1
commit
05498c350e
|
@ -239,7 +239,7 @@ public class RecoverMessages extends DBOption {
|
|||
public void markAsDataFile(JournalFile file) {
|
||||
|
||||
}
|
||||
}, null, reclaimed);
|
||||
}, null, reclaimed, null);
|
||||
}
|
||||
|
||||
targetJournal.flush();
|
||||
|
|
|
@ -234,7 +234,7 @@ public class DecodeJournal extends LockAbstract {
|
|||
byte userRecordType = parseByte("userRecordType", properties);
|
||||
boolean isUpdate = parseBoolean("isUpdate", properties);
|
||||
byte[] data = parseEncoding("data", properties);
|
||||
return new RecordInfo(id, userRecordType, data, isUpdate, (short) 0);
|
||||
return new RecordInfo(id, userRecordType, data, isUpdate, false, (short) 0);
|
||||
}
|
||||
|
||||
private static byte[] parseEncoding(final String name, final Properties properties) throws Exception {
|
||||
|
|
|
@ -412,7 +412,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
|
||||
// We actually only need the record ID in this instance.
|
||||
if (record.isTransactional()) {
|
||||
RecordInfo info = new RecordInfo(record.getId(), record.getRecordType(), new byte[0], record.isUpdate(), record.getCompactCount());
|
||||
RecordInfo info = new RecordInfo(record.getId(), record.getRecordType(), new byte[0], record.isUpdate(), false, record.getCompactCount());
|
||||
if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) {
|
||||
txHolder.recordsToDelete.add(info);
|
||||
} else {
|
||||
|
@ -498,7 +498,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
|
||||
public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableRecord) throws Exception {
|
||||
appendUpdateRecord(id, recordType, record, sync);
|
||||
}
|
||||
|
||||
|
@ -518,7 +518,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
|
||||
public void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableUpdate) throws Exception {
|
||||
appendUpdateRecord(id, recordType, persister, record, sync);
|
||||
}
|
||||
|
||||
|
@ -552,6 +552,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
Persister persister,
|
||||
Object record,
|
||||
boolean sync,
|
||||
boolean replaceableUpdate,
|
||||
JournalUpdateCallback updateCallback,
|
||||
IOCompletion completionCallback) throws Exception {
|
||||
appendUpdateRecord(id, recordType, persister, record, sync, completionCallback);
|
||||
|
|
|
@ -296,7 +296,7 @@ class JDBCJournalRecord {
|
|||
}
|
||||
|
||||
RecordInfo toRecordInfo() throws IOException {
|
||||
return new RecordInfo(getId(), getUserRecordType(), getRecordData(), isUpdate(), getCompactCount());
|
||||
return new RecordInfo(getId(), getUserRecordType(), getRecordData(), isUpdate(), false, getCompactCount());
|
||||
}
|
||||
|
||||
public boolean isTransactional() {
|
||||
|
|
|
@ -45,7 +45,7 @@ public class JDBCJournalLoaderCallbackTest {
|
|||
|
||||
JDBCJournalLoaderCallback cb = new JDBCJournalLoaderCallback(committedRecords, preparedTransactions, failureCallback, fixBadTX);
|
||||
|
||||
RecordInfo record = new RecordInfo(42, (byte) 0, null, false, (short) 0);
|
||||
RecordInfo record = new RecordInfo(42, (byte) 0, null, false, false, (short) 0);
|
||||
cb.addRecord(record);
|
||||
assertEquals(1, committedRecords.size());
|
||||
assertTrue(committedRecords.contains(record));
|
||||
|
|
|
@ -110,19 +110,19 @@ public interface Journal extends ActiveMQComponent {
|
|||
|
||||
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
|
||||
|
||||
void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception;
|
||||
void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableRecord) throws Exception;
|
||||
|
||||
default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
|
||||
appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync);
|
||||
}
|
||||
|
||||
default void tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
|
||||
tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, updateCallback, sync);
|
||||
default void tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableRecord) throws Exception {
|
||||
tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, updateCallback, sync, replaceableRecord);
|
||||
}
|
||||
|
||||
void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
|
||||
|
||||
void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync) throws Exception;
|
||||
void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableUpdate) throws Exception;
|
||||
|
||||
default void appendUpdateRecord(long id,
|
||||
byte recordType,
|
||||
|
@ -136,9 +136,10 @@ public interface Journal extends ActiveMQComponent {
|
|||
byte recordType,
|
||||
EncodingSupport record,
|
||||
boolean sync,
|
||||
boolean replaceableUpdate,
|
||||
JournalUpdateCallback updateCallback,
|
||||
IOCompletion completionCallback) throws Exception {
|
||||
tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, updateCallback, completionCallback);
|
||||
tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, replaceableUpdate, updateCallback, completionCallback);
|
||||
}
|
||||
|
||||
void appendUpdateRecord(long id,
|
||||
|
@ -153,6 +154,7 @@ public interface Journal extends ActiveMQComponent {
|
|||
Persister persister,
|
||||
Object record,
|
||||
boolean sync,
|
||||
boolean replaceableUpdate,
|
||||
JournalUpdateCallback updateCallback,
|
||||
IOCompletion callback) throws Exception;
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ public class RecordInfo {
|
|||
final byte userRecordType,
|
||||
final byte[] data,
|
||||
final boolean isUpdate,
|
||||
final boolean replaceableUpdate,
|
||||
final short compactCount) {
|
||||
this.id = id;
|
||||
|
||||
|
@ -31,6 +32,8 @@ public class RecordInfo {
|
|||
|
||||
this.isUpdate = isUpdate;
|
||||
|
||||
this.replaceableUpdate = replaceableUpdate;
|
||||
|
||||
this.compactCount = compactCount;
|
||||
}
|
||||
|
||||
|
@ -49,6 +52,8 @@ public class RecordInfo {
|
|||
|
||||
public boolean isUpdate;
|
||||
|
||||
public boolean replaceableUpdate;
|
||||
|
||||
public byte getUserRecordType() {
|
||||
return userRecordType;
|
||||
}
|
||||
|
|
|
@ -170,7 +170,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
|||
public void onReadAddRecord(final RecordInfo info) throws Exception {
|
||||
records.add(info);
|
||||
}
|
||||
}, wholeFileBufferRef);
|
||||
}, wholeFileBufferRef, false, null);
|
||||
|
||||
if (records.size() == 0) {
|
||||
// the record is damaged
|
||||
|
|
|
@ -228,6 +228,7 @@ public final class FileWrapperJournal extends JournalBase {
|
|||
Persister persister,
|
||||
Object record,
|
||||
boolean sync,
|
||||
boolean replaceableUpdate,
|
||||
JournalUpdateCallback updateCallback,
|
||||
IOCompletion callback) throws Exception {
|
||||
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record);
|
||||
|
|
|
@ -94,8 +94,9 @@ abstract class JournalBase implements Journal {
|
|||
final byte recordType,
|
||||
final byte[] record,
|
||||
JournalUpdateCallback updateCallback,
|
||||
final boolean sync) throws Exception {
|
||||
tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync);
|
||||
final boolean sync,
|
||||
final boolean replaceableRecord) throws Exception {
|
||||
tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync, replaceableRecord);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -163,10 +164,11 @@ abstract class JournalBase implements Journal {
|
|||
final Persister persister,
|
||||
final Object record,
|
||||
final JournalUpdateCallback updateCallback,
|
||||
final boolean sync) throws Exception {
|
||||
final boolean sync,
|
||||
final boolean replaceableUpdate) throws Exception {
|
||||
SyncIOCompletion callback = getSyncCallback(sync);
|
||||
|
||||
tryAppendUpdateRecord(id, recordType, persister, record, sync, updateCallback, callback);
|
||||
tryAppendUpdateRecord(id, recordType, persister, record, sync, replaceableUpdate, updateCallback, callback);
|
||||
|
||||
if (callback != null) {
|
||||
callback.waitCompletion();
|
||||
|
|
|
@ -144,20 +144,15 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
|
|||
pendingCommands.add(new DeleteCompactCommand(id, usedFile));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReplaceableRecord(byte recordType) {
|
||||
return journal.isReplaceableRecord(recordType);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param id
|
||||
* @param usedFile
|
||||
*/
|
||||
public void addCommandUpdate(final long id, final JournalFile usedFile, final int size, byte userRecordType) {
|
||||
public void addCommandUpdate(final long id, final JournalFile usedFile, final int size, final boolean replaceableUpdate) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("addCommandUpdate id " + id + " usedFile " + usedFile + " size " + size);
|
||||
}
|
||||
pendingCommands.add(new UpdateCompactCommand(id, usedFile, size, userRecordType));
|
||||
pendingCommands.add(new UpdateCompactCommand(id, usedFile, size, replaceableUpdate));
|
||||
}
|
||||
|
||||
private void checkSize(final int size) throws Exception {
|
||||
|
@ -278,7 +273,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
|
|||
|
||||
checkSize(record.getEncodeSize(), info.compactCount);
|
||||
|
||||
newTransaction.addPositive(currentFile, info.id, record.getEncodeSize(), info.userRecordType);
|
||||
newTransaction.addPositive(currentFile, info.id, record.getEncodeSize(), info.replaceableUpdate);
|
||||
|
||||
writeEncoder(record);
|
||||
}
|
||||
|
@ -472,7 +467,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
|
|||
if (newRecord == null) {
|
||||
ActiveMQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id);
|
||||
} else {
|
||||
newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize(), journal.isReplaceableRecord(info.userRecordType));
|
||||
newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize(), info.replaceableUpdate);
|
||||
}
|
||||
|
||||
writeEncoder(updateRecord);
|
||||
|
@ -502,7 +497,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
|
|||
|
||||
writeEncoder(updateRecordTX);
|
||||
|
||||
newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize(), info.userRecordType);
|
||||
newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize(), info.replaceableUpdate);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -566,17 +561,17 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
|
|||
|
||||
private final long id;
|
||||
|
||||
private final byte userRecordType;
|
||||
|
||||
private final JournalFile usedFile;
|
||||
|
||||
private final int size;
|
||||
|
||||
private UpdateCompactCommand(final long id, final JournalFile usedFile, final int size, byte userRecordType) {
|
||||
private final boolean replaceableUpdate;
|
||||
|
||||
private UpdateCompactCommand(final long id, final JournalFile usedFile, final int size, boolean replaceableUpdate) {
|
||||
this.id = id;
|
||||
this.usedFile = usedFile;
|
||||
this.size = size;
|
||||
this.userRecordType = userRecordType;
|
||||
this.replaceableUpdate = replaceableUpdate;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -585,7 +580,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
|
|||
if (updateRecord == null) {
|
||||
ActiveMQJournalLogger.LOGGER.noRecordDuringCompactReplay(id);
|
||||
} else {
|
||||
updateRecord.addUpdateFile(usedFile, size, journal.isReplaceableRecord(userRecordType));
|
||||
updateRecord.addUpdateFile(usedFile, size, replaceableUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -339,12 +339,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
replaceableRecords.put(recordType, Boolean.TRUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReplaceableRecord(byte recordType) {
|
||||
return replaceableRecords != null && replaceableRecords.containsKey(recordType);
|
||||
}
|
||||
|
||||
|
||||
private volatile JournalFile currentFile;
|
||||
|
||||
private volatile JournalState state = JournalState.STOPPED;
|
||||
|
@ -569,11 +563,19 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
return readJournalFile(fileFactory, file, reader, wholeFileBufferReference, false);
|
||||
}
|
||||
|
||||
public static int readJournalFile(final SequentialFileFactory fileFactory,
|
||||
final JournalFile file,
|
||||
final JournalReaderCallback reader,
|
||||
final AtomicReference<ByteBuffer> wholeFileBufferReference,
|
||||
boolean reclaimed) throws Exception {
|
||||
return readJournalFile(fileFactory, file, reader, wholeFileBufferReference, reclaimed, null);
|
||||
}
|
||||
|
||||
public static int readJournalFile(final SequentialFileFactory fileFactory,
|
||||
final JournalFile file,
|
||||
final JournalReaderCallback reader,
|
||||
final AtomicReference<ByteBuffer> wholeFileBufferReference,
|
||||
boolean reclaimed) throws Exception {
|
||||
boolean reclaimed, ByteObjectHashMap<Boolean> replaceableRecords) throws Exception {
|
||||
file.getFile().open(1, false);
|
||||
ByteBuffer wholeFileBuffer = null;
|
||||
try {
|
||||
|
@ -801,19 +803,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
logger.trace("reading " + recordID + ", userRecordType=" + userRecordType + ", compactCount=" + compactCount);
|
||||
}
|
||||
|
||||
boolean replaceableUpdate = replaceableRecords != null ? replaceableRecords.containsKey(userRecordType) : false;
|
||||
|
||||
switch (recordType) {
|
||||
case EVENT_RECORD: {
|
||||
reader.onReadEventRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
|
||||
reader.onReadEventRecord(new RecordInfo(recordID, userRecordType, record, false, replaceableUpdate, compactCount));
|
||||
break;
|
||||
}
|
||||
|
||||
case ADD_RECORD: {
|
||||
reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
|
||||
reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, false, compactCount));
|
||||
break;
|
||||
}
|
||||
|
||||
case UPDATE_RECORD: {
|
||||
reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true, compactCount));
|
||||
reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true, replaceableUpdate, compactCount));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -823,17 +827,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
}
|
||||
|
||||
case ADD_RECORD_TX: {
|
||||
reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, compactCount));
|
||||
reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, false, compactCount));
|
||||
break;
|
||||
}
|
||||
|
||||
case UPDATE_RECORD_TX: {
|
||||
reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, compactCount));
|
||||
reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, replaceableUpdate, compactCount));
|
||||
break;
|
||||
}
|
||||
|
||||
case DELETE_RECORD_TX: {
|
||||
reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte) 0, record, true, compactCount));
|
||||
reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte) 0, record, true, false, compactCount));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -899,7 +903,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
public static int readJournalFile(final SequentialFileFactory fileFactory,
|
||||
final JournalFile file,
|
||||
final JournalReaderCallback reader) throws Exception {
|
||||
return readJournalFile(fileFactory, file, reader, null);
|
||||
return readJournalFile(fileFactory, file, reader, null, false, null);
|
||||
}
|
||||
|
||||
// Journal implementation
|
||||
|
@ -1040,7 +1044,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
SimpleFuture<Boolean> future = new SimpleFutureImpl<>();
|
||||
|
||||
internalAppendUpdateRecord(id, recordType, persister, record, sync, (t, v) -> future.set(v), callback);
|
||||
internalAppendUpdateRecord(id, recordType, persister, record, sync, false, (t, v) -> future.set(v), callback);
|
||||
|
||||
if (!future.get()) {
|
||||
throw new IllegalStateException("Cannot find add info " + id);
|
||||
|
@ -1054,6 +1058,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
final Persister persister,
|
||||
final Object record,
|
||||
final boolean sync,
|
||||
final boolean replaceableUpdate,
|
||||
JournalUpdateCallback updateCallback,
|
||||
final IOCompletion callback) throws Exception {
|
||||
checkJournalIsLoaded();
|
||||
|
@ -1066,7 +1071,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
}
|
||||
|
||||
|
||||
internalAppendUpdateRecord(id, recordType, persister, record, sync, updateCallback, callback);
|
||||
internalAppendUpdateRecord(id, recordType, persister, record, sync, replaceableUpdate, updateCallback, callback);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1075,6 +1080,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
Persister persister,
|
||||
Object record,
|
||||
boolean sync,
|
||||
boolean replaceableUpdate,
|
||||
JournalUpdateCallback updateCallback,
|
||||
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
|
||||
appendExecutor.execute(new Runnable() {
|
||||
|
@ -1116,10 +1122,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
// computing the delete should be done after compacting is done
|
||||
if (jrnRecord == null) {
|
||||
if (compactor != null) {
|
||||
compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize(), recordType);
|
||||
compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize(), replaceableUpdate);
|
||||
}
|
||||
} else {
|
||||
jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize(), isReplaceableRecord(recordType));
|
||||
jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize(), replaceableUpdate);
|
||||
}
|
||||
|
||||
if (updateCallback != null) {
|
||||
|
@ -1298,7 +1304,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
usedFile);
|
||||
}
|
||||
|
||||
tx.addPositive(usedFile, id, encodeSize, recordType);
|
||||
tx.addPositive(usedFile, id, encodeSize, false);
|
||||
} catch (Throwable e) {
|
||||
logger.error("appendAddRecordTransactional:" + e, e);
|
||||
setErrorCondition(null, tx, e);
|
||||
|
@ -1361,7 +1367,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
usedFile );
|
||||
}
|
||||
|
||||
tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize(), recordType);
|
||||
tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize(), false);
|
||||
} catch (Throwable e ) {
|
||||
logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e );
|
||||
setErrorCondition(null, tx, e );
|
||||
|
@ -1845,7 +1851,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
try {
|
||||
for (final JournalFile file : dataFilesToProcess) {
|
||||
try {
|
||||
JournalImpl.readJournalFile(fileFactory, file, compactor, wholeFileBufferRef);
|
||||
JournalImpl.readJournalFile(fileFactory, file, compactor, wholeFileBufferRef, false, this.replaceableRecords);
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.compactReadError(file);
|
||||
throw new Exception("Error on reading compacting for " + file, e);
|
||||
|
@ -2132,7 +2138,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
// have been deleted
|
||||
// just leaving some updates in this file
|
||||
|
||||
posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1, isReplaceableRecord(info.userRecordType)); // +1 = compact
|
||||
posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1, info.replaceableUpdate); // +1 = compact
|
||||
// count
|
||||
}
|
||||
}
|
||||
|
@ -2180,7 +2186,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
transactions.put(transactionID, tnp);
|
||||
}
|
||||
|
||||
tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1, info.userRecordType); // +1 = compact
|
||||
tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1, info.replaceableUpdate); // +1 = compact
|
||||
// count
|
||||
}
|
||||
|
||||
|
@ -2320,7 +2326,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
hasData.lazySet(true);
|
||||
}
|
||||
|
||||
}, wholeFileBufferRef);
|
||||
}, wholeFileBufferRef, false, this.replaceableRecords);
|
||||
|
||||
if (hasData.get()) {
|
||||
lastDataPos = resultLastPost;
|
||||
|
|
|
@ -30,6 +30,4 @@ public interface JournalRecordProvider {
|
|||
JournalCompactor getCompactor();
|
||||
|
||||
ConcurrentLongHashMap<JournalRecord> getRecords();
|
||||
|
||||
boolean isReplaceableRecord(byte recordType);
|
||||
}
|
||||
|
|
|
@ -202,7 +202,7 @@ public class JournalTransaction {
|
|||
}
|
||||
}
|
||||
|
||||
public void addPositive(final JournalFile file, final long id, final int size, final byte userRecordType) {
|
||||
public void addPositive(final JournalFile file, final long id, final int size, final boolean replaceableRecord) {
|
||||
incCounter(file);
|
||||
|
||||
addFile(file);
|
||||
|
@ -211,7 +211,7 @@ public class JournalTransaction {
|
|||
pos = new ArrayList<>();
|
||||
}
|
||||
|
||||
pos.add(new JournalUpdate(file, id, size, userRecordType));
|
||||
pos.add(new JournalUpdate(file, id, size, replaceableRecord));
|
||||
}
|
||||
|
||||
public void addNegative(final JournalFile file, final long id) {
|
||||
|
@ -223,7 +223,7 @@ public class JournalTransaction {
|
|||
neg = new ArrayList<>();
|
||||
}
|
||||
|
||||
neg.add(new JournalUpdate(file, id, 0, (byte)0));
|
||||
neg.add(new JournalUpdate(file, id, 0, false));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -254,13 +254,13 @@ public class JournalTransaction {
|
|||
// This is a case where the transaction was opened after compacting was started,
|
||||
// but the commit arrived while compacting was working
|
||||
// We need to cache the counter update, so compacting will take the correct files when it is done
|
||||
compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size, trUpdate.userRecordType);
|
||||
compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size, trUpdate.replaceableUpdate);
|
||||
} else if (posFiles == null) {
|
||||
posFiles = new JournalRecord(trUpdate.file, trUpdate.size);
|
||||
|
||||
journal.getRecords().put(trUpdate.id, posFiles);
|
||||
} else {
|
||||
posFiles.addUpdateFile(trUpdate.file, trUpdate.size, journal.isReplaceableRecord(trUpdate.userRecordType));
|
||||
posFiles.addUpdateFile(trUpdate.file, trUpdate.size, trUpdate.replaceableUpdate);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -397,19 +397,19 @@ public class JournalTransaction {
|
|||
|
||||
int size;
|
||||
|
||||
final byte userRecordType;
|
||||
final boolean replaceableUpdate;
|
||||
|
||||
/**
|
||||
* @param file
|
||||
* @param id
|
||||
* @param size
|
||||
*/
|
||||
private JournalUpdate(final JournalFile file, final long id, final int size, final byte userRecordType) {
|
||||
private JournalUpdate(final JournalFile file, final long id, final int size, final boolean replaceableUpdate) {
|
||||
super();
|
||||
this.file = file;
|
||||
this.id = id;
|
||||
this.size = size;
|
||||
this.userRecordType = userRecordType;
|
||||
this.replaceableUpdate = replaceableUpdate;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -385,7 +385,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
@Override
|
||||
public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception {
|
||||
try (ArtemisCloseable lock = closeableReadLock()) {
|
||||
messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, this::messageUpdateCallback, getContext(last && syncNonTransactional));
|
||||
messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, false, this::messageUpdateCallback, getContext(last && syncNonTransactional));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -428,7 +428,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
@Override
|
||||
public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
|
||||
try (ArtemisCloseable lock = closeableReadLock()) {
|
||||
messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, this::messageUpdateCallback, getContext(syncNonTransactional));
|
||||
messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, false, this::messageUpdateCallback, getContext(syncNonTransactional));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -470,7 +470,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
|
||||
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID());
|
||||
try (ArtemisCloseable lock = closeableReadLock()) {
|
||||
messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, this::recordNotFoundCallback, getContext(syncNonTransactional));
|
||||
messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, true, this::recordNotFoundCallback, getContext(syncNonTransactional));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -702,7 +702,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());
|
||||
|
||||
try (ArtemisCloseable lock = closeableReadLock()) {
|
||||
messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, this::messageUpdateCallback, getContext(syncNonTransactional));
|
||||
messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, true, this::messageUpdateCallback, getContext(syncNonTransactional));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -309,7 +309,7 @@ public final class DescribeJournal {
|
|||
recordsPrintStream.println();
|
||||
}
|
||||
}
|
||||
}, null, reclaimed);
|
||||
}, null, reclaimed, null);
|
||||
}
|
||||
|
||||
recordsPrintStream.println();
|
||||
|
|
|
@ -401,9 +401,10 @@ public class ReplicatedJournal implements Journal {
|
|||
final byte recordType,
|
||||
final byte[] record,
|
||||
final JournalUpdateCallback updateCallback,
|
||||
final boolean sync) throws Exception {
|
||||
final boolean sync,
|
||||
final boolean replaceableRecord) throws Exception {
|
||||
|
||||
this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync);
|
||||
this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync, replaceableRecord);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -433,12 +434,12 @@ public class ReplicatedJournal implements Journal {
|
|||
final Persister persister,
|
||||
final Object record,
|
||||
final JournalUpdateCallback updateCallback,
|
||||
final boolean sync) throws Exception {
|
||||
final boolean sync, final boolean replaceable) throws Exception {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
|
||||
}
|
||||
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
|
||||
localJournal.tryAppendUpdateRecord(id, recordType, persister, record, updateCallback, sync);
|
||||
localJournal.tryAppendUpdateRecord(id, recordType, persister, record, updateCallback, sync, replaceable);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -461,13 +462,14 @@ public class ReplicatedJournal implements Journal {
|
|||
final Persister persister,
|
||||
final Object record,
|
||||
final boolean sync,
|
||||
final boolean replaceableUpdate,
|
||||
final JournalUpdateCallback updateCallback,
|
||||
final IOCompletion completionCallback) throws Exception {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType);
|
||||
}
|
||||
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
|
||||
localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, updateCallback, completionCallback);
|
||||
localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, replaceableUpdate, updateCallback, completionCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -701,7 +701,8 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
|||
byte recordType,
|
||||
Persister persister,
|
||||
Object record, JournalUpdateCallback updateCallback,
|
||||
boolean sync) throws Exception {
|
||||
boolean sync,
|
||||
boolean repalceableUpdate) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -719,7 +720,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
|||
byte recordType,
|
||||
Persister persister,
|
||||
Object record,
|
||||
boolean sync, JournalUpdateCallback updateCallback,
|
||||
boolean sync, boolean replaceableUpdate, JournalUpdateCallback updateCallback,
|
||||
IOCompletion callback) throws Exception {
|
||||
}
|
||||
|
||||
|
@ -844,7 +845,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception {
|
||||
public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceable) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -44,10 +44,12 @@ public class InfiniteRedeliverySmokeTest extends SmokeTestBase {
|
|||
|
||||
public static final String SERVER_NAME_0 = "infinite-redelivery";
|
||||
|
||||
Process serverProcess;
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
cleanupData(SERVER_NAME_0);
|
||||
startServer(SERVER_NAME_0, 0, 30000);
|
||||
serverProcess = startServer(SERVER_NAME_0, 0, 30000);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -89,6 +91,17 @@ public class InfiniteRedeliverySmokeTest extends SmokeTestBase {
|
|||
// as the real test I'm after here is the broker should clean itself up
|
||||
Wait.assertTrue("there are too many files created", () -> fileFactory.listFiles("amq").size() <= 20);
|
||||
|
||||
if (i % 100 == 0 && i > 0) {
|
||||
connection.close();
|
||||
serverProcess.destroyForcibly();
|
||||
Thread.sleep(1000);
|
||||
serverProcess = startServer(SERVER_NAME_0, 0, 3000);
|
||||
connection = factory.createConnection();
|
||||
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
consumer = session.createConsumer(queue);
|
||||
connection.start();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -409,7 +409,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
|
|||
|
||||
journal.appendAddRecord(element, (byte) 0, record, sync);
|
||||
|
||||
records.add(new RecordInfo(element, (byte) 0, record, false, (short) 0));
|
||||
records.add(new RecordInfo(element, (byte) 0, record, false, false, (short) 0));
|
||||
}
|
||||
|
||||
journal.debugWait();
|
||||
|
@ -422,11 +422,11 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
|
|||
|
||||
SimpleFutureImpl<Boolean> future = new SimpleFutureImpl();
|
||||
|
||||
journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, (r, b) -> future.set(b), sync);
|
||||
journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, (r, b) -> future.set(b), sync, false);
|
||||
|
||||
if (future.get()) {
|
||||
Assert.fail();
|
||||
records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, (short) 0));
|
||||
records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, false, (short) 0));
|
||||
}
|
||||
|
||||
return future.get();
|
||||
|
@ -440,7 +440,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
|
|||
|
||||
journal.appendUpdateRecord(element, (byte) 0, updateRecord, sync);
|
||||
|
||||
records.add(new RecordInfo(element, (byte) 0, updateRecord, true, (short) 0));
|
||||
records.add(new RecordInfo(element, (byte) 0, updateRecord, true, false, (short) 0));
|
||||
}
|
||||
|
||||
journal.debugWait();
|
||||
|
@ -485,7 +485,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
|
|||
|
||||
journal.appendAddRecordTransactional(txID, element, (byte) 0, record);
|
||||
|
||||
tx.records.add(new RecordInfo(element, (byte) 0, record, false, (short) 0));
|
||||
tx.records.add(new RecordInfo(element, (byte) 0, record, false, false, (short) 0));
|
||||
|
||||
}
|
||||
|
||||
|
@ -502,7 +502,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
|
|||
|
||||
journal.appendUpdateRecordTransactional(txID, element, (byte) 0, updateRecord);
|
||||
|
||||
tx.records.add(new RecordInfo(element, (byte) 0, updateRecord, true, (short) 0));
|
||||
tx.records.add(new RecordInfo(element, (byte) 0, updateRecord, true, false, (short) 0));
|
||||
}
|
||||
journal.debugWait();
|
||||
}
|
||||
|
@ -515,7 +515,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
|
|||
|
||||
journal.appendDeleteRecordTransactional(txID, element);
|
||||
|
||||
tx.deletes.add(new RecordInfo(element, (byte) 0, null, true, (short) 0));
|
||||
tx.deletes.add(new RecordInfo(element, (byte) 0, null, true, false, (short) 0));
|
||||
}
|
||||
|
||||
journal.debugWait();
|
||||
|
|
|
@ -2333,7 +2333,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
|
|||
|
||||
journal.appendAddRecord(i, (byte) 0, record, false);
|
||||
|
||||
records.add(new RecordInfo(i, (byte) 0, record, false, (short) 0));
|
||||
records.add(new RecordInfo(i, (byte) 0, record, false, false, (short) 0));
|
||||
}
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
|
@ -2341,7 +2341,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
|
|||
|
||||
journal.appendUpdateRecord(i, (byte) 0, record, false);
|
||||
|
||||
records.add(new RecordInfo(i, (byte) 0, record, true, (short) 0));
|
||||
records.add(new RecordInfo(i, (byte) 0, record, true, false, (short) 0));
|
||||
}
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
|
|
Loading…
Reference in New Issue