ARTEMIS-2602 Reduce number of copies for non JDBC Journal

This commit is contained in:
Francesco Nigro 2020-01-17 21:26:11 +01:00
parent 3b6176438e
commit fa0c187ae6
8 changed files with 134 additions and 86 deletions

View File

@ -30,7 +30,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -52,6 +51,7 @@ import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
import org.jboss.logging.Logger;
public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
@ -897,13 +897,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
@Override
public synchronized JournalLoadInformation load(final Consumer<RecordInfo> onCommittedRecords,
public synchronized JournalLoadInformation load(final SparseArrayLinkedList<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,
final boolean fixBadTX) throws Exception {
final List<RecordInfo> committedRecords = new ArrayList<>();
final JournalLoadInformation journalLoadInformation = load(committedRecords, preparedTransactions, failureCallback, fixBadTX);
committedRecords.forEach(onCommittedRecords);
final List<RecordInfo> records = new ArrayList<>();
final JournalLoadInformation journalLoadInformation = load(records, preparedTransactions, failureCallback, fixBadTX);
records.forEach(committedRecords::add);
return journalLoadInformation;
}

View File

@ -18,12 +18,12 @@ package org.apache.activemq.artemis.core.journal;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
/**
* Most methods on the journal provide a blocking version where you select the sync mode and a non
@ -196,25 +196,23 @@ public interface Journal extends ActiveMQComponent {
void lineUpContext(IOCompletion callback);
default JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure) throws Exception {
return load(committedRecords == null ? null : committedRecords::add, preparedTransactions, transactionFailure, true);
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure) throws Exception {
return load(committedRecords, preparedTransactions, transactionFailure, true);
}
default JournalLoadInformation load(Consumer<RecordInfo> committedRecords,
JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure,
boolean fixBadTx) throws Exception;
default JournalLoadInformation load(SparseArrayLinkedList<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure) throws Exception {
return load(committedRecords, preparedTransactions, transactionFailure, true);
}
default JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure,
boolean fixBadTx) throws Exception {
return load(committedRecords == null ? null : committedRecords::add, preparedTransactions, transactionFailure, fixBadTx);
}
JournalLoadInformation load(Consumer<RecordInfo> committedRecords,
JournalLoadInformation load(SparseArrayLinkedList<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure,
boolean fixBadTx) throws Exception;

View File

@ -20,7 +20,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQUnsupportedPacketException;
@ -43,6 +42,7 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
/**
* Journal used at a replicating backup server during the synchronization of data with the 'live'
@ -268,7 +268,15 @@ public final class FileWrapperJournal extends JournalBase {
}
@Override
public JournalLoadInformation load(Consumer<RecordInfo> committedRecords,
public JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure,
boolean fixbadtx) throws Exception {
throw new ActiveMQUnsupportedPacketException();
}
@Override
public JournalLoadInformation load(SparseArrayLinkedList<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure,
boolean fixbadtx) throws Exception {

View File

@ -40,7 +40,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -1441,19 +1440,35 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return load(DummyLoader.INSTANCE, true, syncState);
}
@Override
public JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure,
boolean fixBadTx) throws Exception {
// suboptimal method: it would perform an additional copy!
// Implementors should override this to provide their optimized version
final SparseArrayLinkedList<RecordInfo> records = new SparseArrayLinkedList<>();
final JournalLoadInformation info = load(records, preparedTransactions, transactionFailure, fixBadTx);
if (committedRecords instanceof ArrayList) {
final long survivedRecordsCount = records.size();
if (survivedRecordsCount <= Integer.MAX_VALUE) {
((ArrayList) committedRecords).ensureCapacity((int) survivedRecordsCount);
}
}
records.clear(committedRecords::add);
return info;
}
/**
* @see JournalImpl#load(LoaderCallback)
*/
@Override
public synchronized JournalLoadInformation load(final Consumer<RecordInfo> committedRecords,
public synchronized JournalLoadInformation load(final SparseArrayLinkedList<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,
final boolean fixBadTX) throws Exception {
final LongHashSet recordsToDelete = new LongHashSet(1024);
final Predicate<RecordInfo> toDeleteFilter = recordInfo -> recordsToDelete.contains(recordInfo.id);
// ArrayList was taking too long to delete elements on checkDeleteSize
// and LinkedList<RecordInfo> creates too many nodes
final SparseArrayLinkedList<RecordInfo> records = new SparseArrayLinkedList<>();
final int DELETE_FLUSH = 20000;
@ -1468,7 +1483,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
// Clean up when the list is too large, or it won't be possible to load large sets of files
// Done as part of JBMESSAGING-1678
final long removed = records.remove(toDeleteFilter);
final long removed = committedRecords.remove(toDeleteFilter);
if (logger.isDebugEnabled()) {
logger.debugf("Removed records during loading = %d", removed);
}
@ -1486,13 +1501,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public void addRecord(final RecordInfo info) {
records.add(info);
committedRecords.add(info);
checkDeleteSize();
}
@Override
public void updateRecord(final RecordInfo info) {
records.add(info);
committedRecords.add(info);
checkDeleteSize();
}
@ -1512,13 +1527,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}, fixBadTX, null);
final Consumer<RecordInfo> fillCommittedRecord = record -> {
if (!recordsToDelete.contains(record.id)) {
committedRecords.accept(record);
}
};
// it helps GC by cleaning up each SparseArray too
records.clear(fillCommittedRecord);
if (!recordsToDelete.isEmpty()) {
committedRecords.remove(toDeleteFilter);
}
return info;
}

View File

@ -840,7 +840,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
readLock();
try {
JournalLoadInformation info = messageJournal.load(records::add, preparedTransactions, new LargeMessageTXFailureCallback(this));
JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this));
ArrayList<LargeServerMessage> largeMessages = new ArrayList<>();
@ -1465,7 +1465,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
final List<GroupingInfo> groupingInfos,
final List<AddressBindingInfo> addressBindingInfos) throws Exception {
List<RecordInfo> records = new ArrayList<>();
SparseArrayLinkedList<RecordInfo> records = new SparseArrayLinkedList<>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
@ -1474,56 +1474,62 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
HashMap<Long, PersistentQueueBindingEncoding> mapBindings = new HashMap<>();
HashMap<Long, PersistentAddressBindingEncoding> mapAddressBindings = new HashMap<>();
for (RecordInfo record : records) {
long id = record.id;
records.clear(record -> {
try {
long id = record.id;
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(record.data);
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(record.data);
byte rec = record.getUserRecordType();
byte rec = record.getUserRecordType();
if (rec == JournalRecordIds.QUEUE_BINDING_RECORD) {
PersistentQueueBindingEncoding bindingEncoding = newQueueBindingEncoding(id, buffer);
mapBindings.put(bindingEncoding.getId(), bindingEncoding);
} else if (rec == JournalRecordIds.ID_COUNTER_RECORD) {
idGenerator.loadState(record.id, buffer);
} else if (rec == JournalRecordIds.ADDRESS_BINDING_RECORD) {
PersistentAddressBindingEncoding bindingEncoding = newAddressBindingEncoding(id, buffer);
addressBindingInfos.add(bindingEncoding);
mapAddressBindings.put(id, bindingEncoding);
} else if (rec == JournalRecordIds.GROUP_RECORD) {
GroupingEncoding encoding = newGroupEncoding(id, buffer);
groupingInfos.add(encoding);
} else if (rec == JournalRecordIds.ADDRESS_SETTING_RECORD) {
PersistedAddressSetting setting = newAddressEncoding(id, buffer);
mapPersistedAddressSettings.put(setting.getAddressMatch(), setting);
} else if (rec == JournalRecordIds.SECURITY_RECORD) {
PersistedRoles roles = newSecurityRecord(id, buffer);
mapPersistedRoles.put(roles.getAddressMatch(), roles);
} else if (rec == JournalRecordIds.QUEUE_STATUS_RECORD) {
QueueStatusEncoding statusEncoding = newQueueStatusEncoding(id, buffer);
PersistentQueueBindingEncoding queueBindingEncoding = mapBindings.get(statusEncoding.queueID);
if (queueBindingEncoding != null) {
queueBindingEncoding.addQueueStatusEncoding(statusEncoding);
if (rec == JournalRecordIds.QUEUE_BINDING_RECORD) {
PersistentQueueBindingEncoding bindingEncoding = newQueueBindingEncoding(id, buffer);
mapBindings.put(bindingEncoding.getId(), bindingEncoding);
} else if (rec == JournalRecordIds.ID_COUNTER_RECORD) {
idGenerator.loadState(record.id, buffer);
} else if (rec == JournalRecordIds.ADDRESS_BINDING_RECORD) {
PersistentAddressBindingEncoding bindingEncoding = newAddressBindingEncoding(id, buffer);
addressBindingInfos.add(bindingEncoding);
mapAddressBindings.put(id, bindingEncoding);
} else if (rec == JournalRecordIds.GROUP_RECORD) {
GroupingEncoding encoding = newGroupEncoding(id, buffer);
groupingInfos.add(encoding);
} else if (rec == JournalRecordIds.ADDRESS_SETTING_RECORD) {
PersistedAddressSetting setting = newAddressEncoding(id, buffer);
mapPersistedAddressSettings.put(setting.getAddressMatch(), setting);
} else if (rec == JournalRecordIds.SECURITY_RECORD) {
PersistedRoles roles = newSecurityRecord(id, buffer);
mapPersistedRoles.put(roles.getAddressMatch(), roles);
} else if (rec == JournalRecordIds.QUEUE_STATUS_RECORD) {
QueueStatusEncoding statusEncoding = newQueueStatusEncoding(id, buffer);
PersistentQueueBindingEncoding queueBindingEncoding = mapBindings.get(statusEncoding.queueID);
if (queueBindingEncoding != null) {
queueBindingEncoding.addQueueStatusEncoding(statusEncoding);
} else {
// unlikely to happen, so I didn't bother about the Logger method
ActiveMQServerLogger.LOGGER.infoNoQueueWithID(statusEncoding.queueID, statusEncoding.getId());
this.deleteQueueStatus(statusEncoding.getId());
}
} else if (rec == JournalRecordIds.ADDRESS_STATUS_RECORD) {
AddressStatusEncoding statusEncoding = newAddressStatusEncoding(id, buffer);
PersistentAddressBindingEncoding addressBindingEncoding = mapAddressBindings.get(statusEncoding.getAddressId());
if (addressBindingEncoding != null) {
addressBindingEncoding.setAddressStatusEncoding(statusEncoding);
} else {
// unlikely to happen, so I didn't bother about the Logger method
ActiveMQServerLogger.LOGGER.infoNoAddressWithID(statusEncoding.getAddressId(), statusEncoding.getId());
this.deleteAddressStatus(statusEncoding.getId());
}
} else {
// unlikely to happen, so I didn't bother about the Logger method
ActiveMQServerLogger.LOGGER.infoNoQueueWithID(statusEncoding.queueID, statusEncoding.getId());
this.deleteQueueStatus(statusEncoding.getId());
// unlikely to happen
ActiveMQServerLogger.LOGGER.invalidRecordType(rec, new Exception("invalid record type " + rec));
}
} else if (rec == JournalRecordIds.ADDRESS_STATUS_RECORD) {
AddressStatusEncoding statusEncoding = newAddressStatusEncoding(id, buffer);
PersistentAddressBindingEncoding addressBindingEncoding = mapAddressBindings.get(statusEncoding.getAddressId());
if (addressBindingEncoding != null) {
addressBindingEncoding.setAddressStatusEncoding(statusEncoding);
} else {
// unlikely to happen, so I didn't bother about the Logger method
ActiveMQServerLogger.LOGGER.infoNoAddressWithID(statusEncoding.getAddressId(), statusEncoding.getId());
this.deleteAddressStatus(statusEncoding.getId());
}
} else {
// unlikely to happen
ActiveMQServerLogger.LOGGER.invalidRecordType(rec, new Exception("invalid record type " + rec));
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
for (PersistentQueueBindingEncoding queue : mapBindings.values()) {
queueBindingInfos.add(queue);

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.replication;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
@ -34,6 +33,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
import org.jboss.logging.Logger;
/**
@ -425,7 +425,22 @@ public class ReplicatedJournal implements Journal {
* @see org.apache.activemq.artemis.core.journal.Journal#load(java.util.List, java.util.List, org.apache.activemq.artemis.core.journal.TransactionFailureCallback)
*/
@Override
public JournalLoadInformation load(final Consumer<RecordInfo> committedRecords,
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback transactionFailure,
final boolean fixbadTX) throws Exception {
return localJournal.load(committedRecords, preparedTransactions, transactionFailure, fixbadTX);
}
/**
* @param committedRecords
* @param preparedTransactions
* @param transactionFailure
* @throws Exception
* @see org.apache.activemq.artemis.core.journal.Journal#load(java.util.List, java.util.List, org.apache.activemq.artemis.core.journal.TransactionFailureCallback)
*/
@Override
public JournalLoadInformation load(final SparseArrayLinkedList<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback transactionFailure,
final boolean fixbadTX) throws Exception {

View File

@ -29,7 +29,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -89,6 +88,7 @@ import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.junit.After;
import org.junit.Assert;
@ -812,7 +812,16 @@ public final class ReplicationTest extends ActiveMQTestBase {
}
@Override
public JournalLoadInformation load(final Consumer<RecordInfo> committedRecords,
public JournalLoadInformation load(final SparseArrayLinkedList<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback transactionFailure,
final boolean fixbadtx) throws Exception {
return new JournalLoadInformation();
}
@Override
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback transactionFailure,
final boolean fixbadtx) throws Exception {

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
@ -360,7 +361,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
}
protected void load() throws Exception {
journal.load((List<RecordInfo>) null, null, null);
journal.load(new SparseArrayLinkedList<>(), null, null);
}
protected void beforeJournalOperation() throws Exception {