ARTEMIS-2602 load surviving records into SparseArrayLinkedList

Francesco Nigro 2020-01-17 15:21:11 +01:00 committed by Clebert Suconic
parent 4cc6464ddd
commit b10d765139
10 changed files with 359 additions and 356 deletions
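This commit replaces the ArrayList<RecordInfo> that buffered every surviving journal record during reload with a SparseArrayLinkedList<RecordInfo>, and threads a java.util.function.Consumer<RecordInfo> through the Journal.load(...) API. Surviving records can then be consumed and released chunk by chunk while the journal is still being read, instead of being held and indexed in one large list. A minimal sketch of the resulting pattern, using the Artemis types touched below; the journal, preparedTransactions and failureCallback variables and handleRecord(...) are illustrative stand-ins, not code from this commit:

   SparseArrayLinkedList<RecordInfo> records = new SparseArrayLinkedList<>();
   // the new overload takes a Consumer<RecordInfo>, so a method reference works:
   journal.load(records::add, preparedTransactions, failureCallback);
   // clear(Consumer) drains the list and nulls out each sparse chunk as it goes,
   // letting processed records become garbage-collectable during the load itself
   records.clear(record -> handleRecord(record));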


@@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -895,6 +896,17 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
callback.storeLineUp();
}
@Override
public synchronized JournalLoadInformation load(final Consumer<RecordInfo> onCommittedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,
final boolean fixBadTX) throws Exception {
final List<RecordInfo> committedRecords = new ArrayList<>();
final JournalLoadInformation journalLoadInformation = load(committedRecords, preparedTransactions, failureCallback, fixBadTX);
committedRecords.forEach(onCommittedRecords);
return journalLoadInformation;
}
@Override
public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
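The JDBC journal keeps its existing List-based loading path and satisfies the new Consumer-based contract with a small adapter, as the hunk above shows: records are buffered into a temporary ArrayList and then replayed with List.forEach, whose parameter type is exactly the Consumer<RecordInfo> the new overload receives. Reduced to its shape (loadIntoList is a hypothetical stand-in for the pre-existing List-based load):

   List<RecordInfo> buffered = new ArrayList<>();
   JournalLoadInformation info = loadIntoList(buffered); // the List-based load(...)
   buffered.forEach(onCommittedRecords);                 // replay into the caller's Consumer
   return info;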


@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.journal;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
@@ -195,12 +196,25 @@ public interface Journal extends ActiveMQComponent {
void lineUpContext(IOCompletion callback);
default JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure) throws Exception {
return load(committedRecords == null ? null : committedRecords::add, preparedTransactions, transactionFailure, true);
}
default JournalLoadInformation load(Consumer<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure) throws Exception {
return load(committedRecords, preparedTransactions, transactionFailure, true);
}
JournalLoadInformation load(List<RecordInfo> committedRecords,
default JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure,
boolean fixBadTx) throws Exception {
return load(committedRecords == null ? null : committedRecords::add, preparedTransactions, transactionFailure, fixBadTx);
}
JournalLoadInformation load(Consumer<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure,
boolean fixBadTx) throws Exception;
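The default methods above bridge the old List-based signature onto the new Consumer-based primitive with a bound method reference: List.add matches Consumer.accept structurally (the boolean return value is simply discarded), so committedRecords::add is a valid Consumer<RecordInfo>, and the ternary preserves callers that pass a null list. In isolation (committed and someRecord are illustrative):

   List<RecordInfo> committed = new ArrayList<>();
   Consumer<RecordInfo> sink = committed::add; // bound method reference implements accept(...)
   sink.accept(someRecord);                    // equivalent to committed.add(someRecord)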


@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQUnsupportedPacketException;
@@ -267,7 +268,7 @@ public final class FileWrapperJournal extends JournalBase {
}
@Override
public JournalLoadInformation load(List<RecordInfo> committedRecords,
public JournalLoadInformation load(Consumer<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure,
boolean fixbadtx) throws Exception {


@@ -1445,7 +1445,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* @see JournalImpl#load(LoaderCallback)
*/
@Override
public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
public synchronized JournalLoadInformation load(final Consumer<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,
final boolean fixBadTX) throws Exception {
@@ -1514,7 +1514,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final Consumer<RecordInfo> fillCommittedRecord = record -> {
if (!recordsToDelete.contains(record.id)) {
committedRecords.add(record);
committedRecords.accept(record);
}
};
// it helps GC by cleaning up each SparseArray too
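With fillCommittedRecord now forwarding to the caller's Consumer, JournalImpl pushes each record that survives the delete-set filter out as it is produced, so a caller can observe records without materializing them. For example (illustrative only, not part of this commit), counting survivors during a load:

   AtomicLong survivors = new AtomicLong(); // java.util.concurrent.atomic
   journal.load(record -> survivors.incrementAndGet(),
                preparedTransactions, failureCallback, true);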


@@ -23,11 +23,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
import java.io.File;
import java.io.FileInputStream;
import java.security.DigestInputStream;
import java.security.InvalidParameterException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -111,10 +107,10 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.jboss.logging.Logger;
@@ -258,25 +254,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
OperationContextImpl.clearContext();
}
public static String md5(File file) {
try {
byte[] buffer = new byte[1 << 4];
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] digest;
try (FileInputStream is = new FileInputStream(file);
DigestInputStream is2 = new DigestInputStream(is, md)) {
while (is2.read(buffer) > 0) {
continue;
}
digest = md.digest();
}
return Base64.encodeBytes(digest);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public IDGenerator getIDGenerator() {
return idGenerator;
}
@@ -853,17 +830,17 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
final Set<Pair<Long, Long>> pendingLargeMessages,
List<PageCountPending> pendingNonTXPageCounter,
final JournalLoader journalLoader) throws Exception {
List<RecordInfo> records = new ArrayList<>();
SparseArrayLinkedList<RecordInfo> records = new SparseArrayLinkedList<>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
Set<PageTransactionInfo> invalidPageTransactions = null;
Set<PageTransactionInfo> invalidPageTransactions = new HashSet<>();
Map<Long, Message> messages = new HashMap<>();
readLock();
try {
JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this, messages));
JournalLoadInformation info = messageJournal.load(records::add, preparedTransactions, new LargeMessageTXFailureCallback(this));
ArrayList<LargeServerMessage> largeMessages = new ArrayList<>();
@@ -871,317 +848,321 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
Map<Long, PageSubscription> pageSubscriptions = new HashMap<>();
final int totalSize = records.size();
final long totalSize = records.size();
for (int reccount = 0; reccount < totalSize; reccount++) {
// It will show log.info only with large journals (more than 1 million records)
if (reccount > 0 && reccount % 1000000 == 0) {
long percent = (long) ((((double) reccount) / ((double) totalSize)) * 100f);
ActiveMQServerLogger.LOGGER.percentLoaded(percent);
}
RecordInfo record = records.get(reccount);
byte[] data = record.data;
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
byte recordType = record.getUserRecordType();
switch (recordType) {
case JournalRecordIds.ADD_LARGE_MESSAGE_PENDING: {
PendingLargeMessageEncoding pending = new PendingLargeMessageEncoding();
pending.decode(buff);
if (pendingLargeMessages != null) {
// it could be null on tests, and we don't need anything on that case
pendingLargeMessages.add(new Pair<>(record.id, pending.largeMessageID));
}
break;
}
case JournalRecordIds.ADD_LARGE_MESSAGE: {
LargeServerMessage largeMessage = parseLargeMessage(messages, buff);
messages.put(record.id, largeMessage);
largeMessages.add(largeMessage);
break;
}
case JournalRecordIds.ADD_MESSAGE: {
throw new IllegalStateException("This is using old journal data, export your data and import at the correct version");
}
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
Message message = MessagePersister.getInstance().decode(buff, null);
messages.put(record.id, message);
break;
}
case JournalRecordIds.ADD_REF: {
long messageID = record.id;
RefEncoding encoding = new RefEncoding();
encoding.decode(buff);
Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID);
if (queueMessages == null) {
queueMessages = new LinkedHashMap<>();
queueMap.put(encoding.queueID, queueMessages);
}
Message message = messages.get(messageID);
if (message == null) {
ActiveMQServerLogger.LOGGER.cannotFindMessage(record.id);
} else {
queueMessages.put(messageID, new AddMessageRecord(message));
}
break;
}
case JournalRecordIds.ACKNOWLEDGE_REF: {
long messageID = record.id;
RefEncoding encoding = new RefEncoding();
encoding.decode(buff);
Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID);
if (queueMessages == null) {
ActiveMQServerLogger.LOGGER.journalCannotFindQueue(encoding.queueID, messageID);
} else {
AddMessageRecord rec = queueMessages.remove(messageID);
if (rec == null) {
ActiveMQServerLogger.LOGGER.cannotFindMessage(messageID);
}
}
break;
}
case JournalRecordIds.UPDATE_DELIVERY_COUNT: {
long messageID = record.id;
DeliveryCountUpdateEncoding encoding = new DeliveryCountUpdateEncoding();
encoding.decode(buff);
Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID);
if (queueMessages == null) {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueDelCount(encoding.queueID);
} else {
AddMessageRecord rec = queueMessages.get(messageID);
if (rec == null) {
ActiveMQServerLogger.LOGGER.journalCannotFindMessageDelCount(messageID);
} else {
rec.setDeliveryCount(encoding.count);
}
}
break;
}
case JournalRecordIds.PAGE_TRANSACTION: {
PageTransactionInfo invalidPGTx = null;
if (record.isUpdate) {
PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
pageUpdate.decode(buff);
PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
if (pageTX == null) {
ActiveMQServerLogger.LOGGER.journalCannotFindPageTX(pageUpdate.pageTX);
} else {
if (!pageTX.onUpdate(pageUpdate.recods, null, null)) {
invalidPGTx = pageTX;
}
}
} else {
PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
pageTransactionInfo.decode(buff);
pageTransactionInfo.setRecordID(record.id);
pagingManager.addTransaction(pageTransactionInfo);
if (!pageTransactionInfo.checkSize(null, null)) {
invalidPGTx = pageTransactionInfo;
}
}
if (invalidPGTx != null) {
if (invalidPageTransactions == null) {
invalidPageTransactions = new HashSet<>();
}
invalidPageTransactions.add(invalidPGTx);
}
break;
}
case JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME: {
long messageID = record.id;
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding();
encoding.decode(buff);
Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID);
if (queueMessages == null) {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueScheduled(encoding.queueID, messageID);
} else {
AddMessageRecord rec = queueMessages.get(messageID);
if (rec == null) {
ActiveMQServerLogger.LOGGER.cannotFindMessage(messageID);
} else {
rec.setScheduledDeliveryTime(encoding.scheduledDeliveryTime);
}
}
break;
}
case JournalRecordIds.DUPLICATE_ID: {
DuplicateIDEncoding encoding = new DuplicateIDEncoding();
encoding.decode(buff);
List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
if (ids == null) {
ids = new ArrayList<>();
duplicateIDMap.put(encoding.address, ids);
}
ids.add(new Pair<>(encoding.duplID, record.id));
break;
}
case JournalRecordIds.HEURISTIC_COMPLETION: {
HeuristicCompletionEncoding encoding = new HeuristicCompletionEncoding();
encoding.decode(buff);
resourceManager.putHeuristicCompletion(record.id, encoding.xid, encoding.isCommit);
break;
}
case JournalRecordIds.ACKNOWLEDGE_CURSOR: {
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
encoding.position.setRecordID(record.id);
PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
sub.reloadACK(encoding.position);
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloading(encoding.queueID);
messageJournal.appendDeleteRecord(record.id, false);
}
break;
}
case JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE: {
PageCountRecord encoding = new PageCountRecord();
encoding.decode(buff);
PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize());
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
messageJournal.appendDeleteRecord(record.id, false);
}
break;
}
case JournalRecordIds.PAGE_CURSOR_COUNTER_INC: {
PageCountRecordInc encoding = new PageCountRecordInc();
encoding.decode(buff);
PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
sub.getCounter().loadInc(record.id, encoding.getValue(), encoding.getPersistentSize());
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID());
messageJournal.appendDeleteRecord(record.id, false);
}
break;
}
case JournalRecordIds.PAGE_CURSOR_COMPLETE: {
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
encoding.position.setRecordID(record.id);
PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
if (!sub.reloadPageCompletion(encoding.position)) {
if (logger.isDebugEnabled()) {
logger.debug("Complete page " + encoding.position.getPageNr() + " doesn't exist on page manager " + sub.getPagingStore().getAddress());
}
messageJournal.appendDeleteRecord(record.id, false);
}
} else {
ActiveMQServerLogger.LOGGER.cantFindQueueOnPageComplete(encoding.queueID);
messageJournal.appendDeleteRecord(record.id, false);
}
break;
}
case JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER: {
PageCountPendingImpl pendingCountEncoding = new PageCountPendingImpl();
pendingCountEncoding.decode(buff);
pendingCountEncoding.setID(record.id);
PageSubscription sub = locateSubscription(pendingCountEncoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
sub.notEmpty();
}
// This can be null on testcases not interested on this outcome
if (pendingNonTXPageCounter != null) {
pendingNonTXPageCounter.add(pendingCountEncoding);
}
break;
}
default: {
throw new IllegalStateException("Invalid record type " + recordType);
}
}
// This will free up memory sooner. The record is not needed any more
// and its byte array would consume memory during the load process even though it's not necessary any longer
// what would delay processing time during load
records.set(reccount, null);
}
final class MutableLong {
long value;
}
final MutableLong recordNumber = new MutableLong();
// This will free up memory sooner while reading the records
records.clear(record -> {
try {
// It will show log.info only with large journals (more than 1 million records)
if (recordNumber.value > 0 && recordNumber.value % 1000000 == 0) {
long percent = (long) ((((double) recordNumber.value) / ((double) totalSize)) * 100f);
ActiveMQServerLogger.LOGGER.percentLoaded(percent);
}
recordNumber.value++;
byte[] data = record.data;
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
byte recordType = record.getUserRecordType();
switch (recordType) {
case JournalRecordIds.ADD_LARGE_MESSAGE_PENDING: {
PendingLargeMessageEncoding pending = new PendingLargeMessageEncoding();
pending.decode(buff);
if (pendingLargeMessages != null) {
// it could be null on tests, and we don't need anything on that case
pendingLargeMessages.add(new Pair<>(record.id, pending.largeMessageID));
}
break;
}
case JournalRecordIds.ADD_LARGE_MESSAGE: {
LargeServerMessage largeMessage = parseLargeMessage(buff);
messages.put(record.id, largeMessage);
largeMessages.add(largeMessage);
break;
}
case JournalRecordIds.ADD_MESSAGE: {
throw new IllegalStateException("This is using old journal data, export your data and import at the correct version");
}
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
Message message = MessagePersister.getInstance().decode(buff, null);
messages.put(record.id, message);
break;
}
case JournalRecordIds.ADD_REF: {
long messageID = record.id;
RefEncoding encoding = new RefEncoding();
encoding.decode(buff);
Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID);
if (queueMessages == null) {
queueMessages = new LinkedHashMap<>();
queueMap.put(encoding.queueID, queueMessages);
}
Message message = messages.get(messageID);
if (message == null) {
ActiveMQServerLogger.LOGGER.cannotFindMessage(record.id);
} else {
queueMessages.put(messageID, new AddMessageRecord(message));
}
break;
}
case JournalRecordIds.ACKNOWLEDGE_REF: {
long messageID = record.id;
RefEncoding encoding = new RefEncoding();
encoding.decode(buff);
Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID);
if (queueMessages == null) {
ActiveMQServerLogger.LOGGER.journalCannotFindQueue(encoding.queueID, messageID);
} else {
AddMessageRecord rec = queueMessages.remove(messageID);
if (rec == null) {
ActiveMQServerLogger.LOGGER.cannotFindMessage(messageID);
}
}
break;
}
case JournalRecordIds.UPDATE_DELIVERY_COUNT: {
long messageID = record.id;
DeliveryCountUpdateEncoding encoding = new DeliveryCountUpdateEncoding();
encoding.decode(buff);
Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID);
if (queueMessages == null) {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueDelCount(encoding.queueID);
} else {
AddMessageRecord rec = queueMessages.get(messageID);
if (rec == null) {
ActiveMQServerLogger.LOGGER.journalCannotFindMessageDelCount(messageID);
} else {
rec.setDeliveryCount(encoding.count);
}
}
break;
}
case JournalRecordIds.PAGE_TRANSACTION: {
PageTransactionInfo invalidPGTx = null;
if (record.isUpdate) {
PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
pageUpdate.decode(buff);
PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
if (pageTX == null) {
ActiveMQServerLogger.LOGGER.journalCannotFindPageTX(pageUpdate.pageTX);
} else {
if (!pageTX.onUpdate(pageUpdate.recods, null, null)) {
invalidPGTx = pageTX;
}
}
} else {
PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
pageTransactionInfo.decode(buff);
pageTransactionInfo.setRecordID(record.id);
pagingManager.addTransaction(pageTransactionInfo);
if (!pageTransactionInfo.checkSize(null, null)) {
invalidPGTx = pageTransactionInfo;
}
}
if (invalidPGTx != null) {
invalidPageTransactions.add(invalidPGTx);
}
break;
}
case JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME: {
long messageID = record.id;
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding();
encoding.decode(buff);
Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID);
if (queueMessages == null) {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueScheduled(encoding.queueID, messageID);
} else {
AddMessageRecord rec = queueMessages.get(messageID);
if (rec == null) {
ActiveMQServerLogger.LOGGER.cannotFindMessage(messageID);
} else {
rec.setScheduledDeliveryTime(encoding.scheduledDeliveryTime);
}
}
break;
}
case JournalRecordIds.DUPLICATE_ID: {
DuplicateIDEncoding encoding = new DuplicateIDEncoding();
encoding.decode(buff);
List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
if (ids == null) {
ids = new ArrayList<>();
duplicateIDMap.put(encoding.address, ids);
}
ids.add(new Pair<>(encoding.duplID, record.id));
break;
}
case JournalRecordIds.HEURISTIC_COMPLETION: {
HeuristicCompletionEncoding encoding = new HeuristicCompletionEncoding();
encoding.decode(buff);
resourceManager.putHeuristicCompletion(record.id, encoding.xid, encoding.isCommit);
break;
}
case JournalRecordIds.ACKNOWLEDGE_CURSOR: {
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
encoding.position.setRecordID(record.id);
PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
sub.reloadACK(encoding.position);
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloading(encoding.queueID);
messageJournal.appendDeleteRecord(record.id, false);
}
break;
}
case JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE: {
PageCountRecord encoding = new PageCountRecord();
encoding.decode(buff);
PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize());
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
messageJournal.appendDeleteRecord(record.id, false);
}
break;
}
case JournalRecordIds.PAGE_CURSOR_COUNTER_INC: {
PageCountRecordInc encoding = new PageCountRecordInc();
encoding.decode(buff);
PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
sub.getCounter().loadInc(record.id, encoding.getValue(), encoding.getPersistentSize());
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID());
messageJournal.appendDeleteRecord(record.id, false);
}
break;
}
case JournalRecordIds.PAGE_CURSOR_COMPLETE: {
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
encoding.position.setRecordID(record.id);
PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
if (!sub.reloadPageCompletion(encoding.position)) {
if (logger.isDebugEnabled()) {
logger.debug("Complete page " + encoding.position.getPageNr() + " doesn't exist on page manager " + sub.getPagingStore().getAddress());
}
messageJournal.appendDeleteRecord(record.id, false);
}
} else {
ActiveMQServerLogger.LOGGER.cantFindQueueOnPageComplete(encoding.queueID);
messageJournal.appendDeleteRecord(record.id, false);
}
break;
}
case JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER: {
PageCountPendingImpl pendingCountEncoding = new PageCountPendingImpl();
pendingCountEncoding.decode(buff);
pendingCountEncoding.setID(record.id);
PageSubscription sub = locateSubscription(pendingCountEncoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
if (sub != null) {
sub.notEmpty();
}
// This can be null on testcases not interested on this outcome
if (pendingNonTXPageCounter != null) {
pendingNonTXPageCounter.add(pendingCountEncoding);
}
break;
}
default: {
throw new IllegalStateException("Invalid record type " + recordType);
}
}
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// Release the memory as soon as not needed any longer
records.clear();
records = null;
journalLoader.handleAddMessage(queueMap);
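The MutableLong holder introduced in this hunk exists because the progress logging moved from a for loop into the lambda passed to records.clear(...): a Java lambda may only capture effectively final locals, so a plain long counter cannot be incremented inside it, while a final reference to a small mutable box can. The idiom in isolation (JDK only; an AtomicLong would also work, presumably avoided here to skip atomic-operation overhead on a single-threaded path):

   final class MutableLong {
      long value;
   }
   final MutableLong counter = new MutableLong();
   Arrays.asList("a", "b", "c").forEach(s -> counter.value++); // legal: counter itself is final
   // counter.value is now 3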
@@ -1221,7 +1202,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
public void checkInvalidPageTransactions(PagingManager pagingManager,
Set<PageTransactionInfo> invalidPageTransactions) {
if (invalidPageTransactions != null) {
if (invalidPageTransactions != null && !invalidPageTransactions.isEmpty()) {
for (PageTransactionInfo pginfo : invalidPageTransactions) {
pginfo.checkSize(this, pagingManager);
}
@@ -1713,8 +1694,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
}
protected abstract LargeServerMessage parseLargeMessage(Map<Long, Message> messages,
ActiveMQBuffer buff) throws Exception;
protected abstract LargeServerMessage parseLargeMessage(ActiveMQBuffer buff) throws Exception;
private void loadPreparedTransactions(final PostOffice postOffice,
final PagingManager pagingManager,
@@ -1750,7 +1730,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
switch (recordType) {
case JournalRecordIds.ADD_LARGE_MESSAGE: {
messages.put(record.id, parseLargeMessage(messages, buff));
messages.put(record.id, parseLargeMessage(buff));
break;
}


@@ -358,12 +358,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
@Override
/**
* @param messages
* @param buff
* @return
* @throws Exception
*/ protected LargeServerMessage parseLargeMessage(final Map<Long, Message> messages,
final ActiveMQBuffer buff) throws Exception {
*/
protected LargeServerMessage parseLargeMessage(final ActiveMQBuffer buff) throws Exception {
LargeServerMessage largeMessage = createLargeMessage();
LargeMessagePersister.getInstance().decode(buff, largeMessage);


@@ -17,11 +17,9 @@
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -32,13 +30,10 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
public class LargeMessageTXFailureCallback implements TransactionFailureCallback {
private AbstractJournalStorageManager journalStorageManager;
private final Map<Long, Message> messages;
public LargeMessageTXFailureCallback(AbstractJournalStorageManager journalStorageManager,
final Map<Long, Message> messages) {
public LargeMessageTXFailureCallback(AbstractJournalStorageManager journalStorageManager) {
super();
this.journalStorageManager = journalStorageManager;
this.messages = messages;
}
@Override
@@ -52,7 +47,7 @@ public class LargeMessageTXFailureCallback implements TransactionFailureCallback
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
try {
LargeServerMessage serverMessage = journalStorageManager.parseLargeMessage(messages, buff);
LargeServerMessage serverMessage = journalStorageManager.parseLargeMessage(buff);
serverMessage.decrementDelayDeletionCount();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.journalError(e);
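Since parseLargeMessage no longer consults a shared messages map, the callback's constructor drops that parameter as well; a sketch of the before/after construction (storageManager stands for any AbstractJournalStorageManager):

   // before: new LargeMessageTXFailureCallback(storageManager, messages)
   TransactionFailureCallback failureCallback = new LargeMessageTXFailureCallback(storageManager);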


@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.replication;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
@@ -424,7 +425,7 @@ public class ReplicatedJournal implements Journal {
* @see org.apache.activemq.artemis.core.journal.Journal#load(java.util.List, java.util.List, org.apache.activemq.artemis.core.journal.TransactionFailureCallback)
*/
@Override
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
public JournalLoadInformation load(final Consumer<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback transactionFailure,
final boolean fixbadTX) throws Exception {


@@ -29,6 +29,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -811,7 +812,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
}
@Override
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
public JournalLoadInformation load(final Consumer<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback transactionFailure,
final boolean fixbadtx) throws Exception {


@@ -360,7 +360,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
}
protected void load() throws Exception {
journal.load(null, null, null);
journal.load((List<RecordInfo>) null, null, null);
}
protected void beforeJournalOperation() throws Exception {
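The cast in load() above is forced by the new API: with both load(List<RecordInfo>, ...) and load(Consumer<RecordInfo>, ...) in scope, a bare null argument matches both overloads and no longer compiles, so the cast pins the List variant. A self-contained illustration of the rule (OverloadDemo is hypothetical):

   import java.util.List;
   import java.util.function.Consumer;

   class OverloadDemo {
      static void load(List<String> records) { }
      static void load(Consumer<String> records) { }

      public static void main(String[] args) {
         // load(null);              // does not compile: reference to load is ambiguous
         load((List<String>) null);  // compiles: the cast selects the List overload
      }
   }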