ARTEMIS-1996 MappedSequentialFileFactory may cause DirectByteBuffer off-heap memory leaks
Compaction is now reusing direct ByteBuffers on both reading and writing with explicit and deterministic release to avoid high peak of native memory utilisation after compaction.
This commit is contained in:
parent
ef7ff38de9
commit
2967df6a99
|
@ -250,6 +250,11 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) {
|
||||
writeDirect(bytes, sync, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
|
||||
writeDirect(bytes, sync, null);
|
||||
|
|
|
@ -77,6 +77,20 @@ public interface SequentialFile {
|
|||
*/
|
||||
void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
|
||||
|
||||
/**
|
||||
* Write directly to the file without using any intermediate buffer and wait completion.<br>
|
||||
* If {@code releaseBuffer} is {@code true} the provided {@code bytes} should be released
|
||||
* through {@link SequentialFileFactory#releaseBuffer(ByteBuffer)}, if supported.
|
||||
*
|
||||
* @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
|
||||
* NIO). If {@code releaseBuffer} is {@code true} use a buffer from
|
||||
* {@link SequentialFileFactory#newBuffer(int)}, {@link SequentialFileFactory#allocateDirectBuffer(int)}
|
||||
* otherwise.
|
||||
* @param sync if {@code true} will durable flush the written data on the file, {@code false} otherwise
|
||||
* @param releaseBuffer if {@code true} will release the buffer, {@code false} otherwise
|
||||
*/
|
||||
void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception;
|
||||
|
||||
/**
|
||||
* @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
|
||||
* NIO). To be safe, use a buffer from the corresponding
|
||||
|
|
|
@ -233,6 +233,33 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Write Direct, Sync: true File: " + getFileName());
|
||||
}
|
||||
|
||||
final SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
|
||||
|
||||
try {
|
||||
checkOpened();
|
||||
} catch (Exception e) {
|
||||
ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
|
||||
completion.onError(-1, e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
|
||||
|
||||
final long positionToWrite = position.getAndAdd(bytesToWrite);
|
||||
|
||||
final AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(completion, bytes, releaseBuffer);
|
||||
runnableCallback.initWrite(positionToWrite, bytesToWrite);
|
||||
runnableCallback.run();
|
||||
|
||||
completion.waitCompletion();
|
||||
}
|
||||
|
||||
/**
|
||||
* Note: Parameter sync is not used on AIO
|
||||
*/
|
||||
|
@ -256,8 +283,14 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
}
|
||||
|
||||
AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) {
|
||||
return getCallback(originalCallback, buffer, true);
|
||||
}
|
||||
|
||||
AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback,
|
||||
ByteBuffer buffer,
|
||||
boolean releaseBuffer) {
|
||||
AIOSequentialFileFactory.AIOSequentialCallback callback = aioFactory.getCallback();
|
||||
callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer);
|
||||
callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer, releaseBuffer);
|
||||
pendingCallbacks.countUp();
|
||||
return callback;
|
||||
}
|
||||
|
|
|
@ -286,7 +286,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
String errorMessage;
|
||||
int errorCode = -1;
|
||||
long writeSequence;
|
||||
|
||||
boolean releaseBuffer;
|
||||
long position;
|
||||
int bytes;
|
||||
|
||||
|
@ -297,6 +297,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
", errorMessage='" + errorMessage + '\'' +
|
||||
", errorCode=" + errorCode +
|
||||
", writeSequence=" + writeSequence +
|
||||
", releaseBuffer=" + releaseBuffer +
|
||||
", position=" + position +
|
||||
'}';
|
||||
}
|
||||
|
@ -332,7 +333,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
IOCallback IOCallback,
|
||||
LibaioFile libaioFile,
|
||||
AIOSequentialFile sequentialFile,
|
||||
ByteBuffer usedBuffer) {
|
||||
ByteBuffer usedBuffer,
|
||||
boolean releaseBuffer) {
|
||||
this.callback = IOCallback;
|
||||
this.sequentialFile = sequentialFile;
|
||||
this.error = false;
|
||||
|
@ -340,6 +342,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
this.libaioFile = libaioFile;
|
||||
this.writeSequence = writeSequence;
|
||||
this.errorMessage = null;
|
||||
this.releaseBuffer = releaseBuffer;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -375,7 +378,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
callback.done();
|
||||
}
|
||||
|
||||
if (buffer != null && reuseBuffers) {
|
||||
if (buffer != null && reuseBuffers && releaseBuffer) {
|
||||
buffersControl.bufferDone(buffer);
|
||||
}
|
||||
|
||||
|
|
|
@ -273,6 +273,28 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
|
||||
try {
|
||||
checkIsOpen();
|
||||
final int position = bytes.position();
|
||||
final int limit = bytes.limit();
|
||||
final int remaining = limit - position;
|
||||
if (remaining > 0) {
|
||||
this.mappedFile.write(bytes, position, remaining);
|
||||
final int newPosition = position + remaining;
|
||||
bytes.position(newPosition);
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (releaseBuffer) {
|
||||
this.factory.releaseBuffer(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(ByteBuffer bytes, IOCallback callback) throws IOException {
|
||||
if (callback == null) {
|
||||
|
|
|
@ -96,6 +96,11 @@ final class TimedSequentialFile implements SequentialFile {
|
|||
this.sequentialFile.delete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
|
||||
this.sequentialFile.blockingWriteDirect(bytes, sync, releaseBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
|
||||
if (this.timedBuffer != null) {
|
||||
|
|
|
@ -244,7 +244,7 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
}
|
||||
|
||||
try {
|
||||
internalWrite(bytes, sync, callback);
|
||||
internalWrite(bytes, sync, callback, true);
|
||||
} catch (Exception e) {
|
||||
callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage());
|
||||
}
|
||||
|
@ -252,7 +252,12 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
@Override
|
||||
public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception {
|
||||
internalWrite(bytes, sync, null);
|
||||
internalWrite(bytes, sync, null, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
|
||||
internalWrite(bytes, sync, null, releaseBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -266,7 +271,8 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
private void internalWrite(final ByteBuffer bytes,
|
||||
final boolean sync,
|
||||
final IOCallback callback) throws IOException, ActiveMQIOErrorException, InterruptedException {
|
||||
final IOCallback callback,
|
||||
boolean releaseBuffer) throws IOException, ActiveMQIOErrorException, InterruptedException {
|
||||
if (!isOpen()) {
|
||||
if (callback != null) {
|
||||
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
|
||||
|
@ -279,7 +285,7 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
position.addAndGet(bytes.limit());
|
||||
|
||||
try {
|
||||
doInternalWrite(bytes, sync, callback);
|
||||
doInternalWrite(bytes, sync, callback, releaseBuffer);
|
||||
} catch (ClosedChannelException e) {
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
|
@ -296,7 +302,8 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
*/
|
||||
private void doInternalWrite(final ByteBuffer bytes,
|
||||
final boolean sync,
|
||||
final IOCallback callback) throws IOException {
|
||||
final IOCallback callback,
|
||||
boolean releaseBuffer) throws IOException {
|
||||
try {
|
||||
channel.write(bytes);
|
||||
|
||||
|
@ -308,8 +315,10 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
callback.done();
|
||||
}
|
||||
} finally {
|
||||
//release it to recycle the write buffer if big enough
|
||||
this.factory.releaseBuffer(bytes);
|
||||
if (releaseBuffer) {
|
||||
//release it to recycle the write buffer if big enough
|
||||
this.factory.releaseBuffer(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.journal.impl;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
|
@ -152,10 +153,11 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
|||
}
|
||||
}
|
||||
|
||||
public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
|
||||
static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
|
||||
final List<String> dataFiles,
|
||||
final List<String> newFiles,
|
||||
final List<Pair<String, String>> renameFile) throws Exception {
|
||||
final List<Pair<String, String>> renameFile,
|
||||
final AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
|
||||
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
|
||||
|
||||
if (controlFile.exists()) {
|
||||
|
@ -163,13 +165,12 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
|||
|
||||
final ArrayList<RecordInfo> records = new ArrayList<>();
|
||||
|
||||
|
||||
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract() {
|
||||
@Override
|
||||
public void onReadAddRecord(final RecordInfo info) throws Exception {
|
||||
records.add(info);
|
||||
}
|
||||
});
|
||||
}, wholeFileBufferRef);
|
||||
|
||||
if (records.size() == 0) {
|
||||
// the record is damaged
|
||||
|
@ -205,29 +206,48 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
|||
}
|
||||
}
|
||||
|
||||
public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
|
||||
final List<String> dataFiles,
|
||||
final List<String> newFiles,
|
||||
final List<Pair<String, String>> renameFile) throws Exception {
|
||||
return readControlFile(fileFactory, dataFiles, newFiles, renameFile, null);
|
||||
}
|
||||
|
||||
private void flush(boolean releaseWritingBuffer) throws Exception {
|
||||
if (writingChannel != null) {
|
||||
try {
|
||||
if (sequentialFile.isOpen()) {
|
||||
try {
|
||||
sequentialFile.position(0);
|
||||
|
||||
// To Fix the size of the file
|
||||
writingChannel.writerIndex(writingChannel.capacity());
|
||||
|
||||
final ByteBuffer byteBuffer = bufferWrite;
|
||||
final int readerIndex = writingChannel.readerIndex();
|
||||
byteBuffer.clear().position(readerIndex).limit(readerIndex + writingChannel.readableBytes());
|
||||
sequentialFile.blockingWriteDirect(byteBuffer, true, false);
|
||||
} finally {
|
||||
sequentialFile.close();
|
||||
newDataFiles.add(currentFile);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (releaseWritingBuffer) {
|
||||
//deterministic release of native resources
|
||||
fileFactory.releaseDirectBuffer(bufferWrite);
|
||||
writingChannel = null;
|
||||
bufferWrite = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write pending output into file
|
||||
*/
|
||||
public void flush() throws Exception {
|
||||
if (writingChannel != null) {
|
||||
sequentialFile.position(0);
|
||||
|
||||
// To Fix the size of the file
|
||||
writingChannel.writerIndex(writingChannel.capacity());
|
||||
|
||||
bufferWrite.clear()
|
||||
.position(writingChannel.readerIndex())
|
||||
.limit(writingChannel.readableBytes());
|
||||
|
||||
sequentialFile.writeDirect(bufferWrite, true);
|
||||
sequentialFile.close();
|
||||
newDataFiles.add(currentFile);
|
||||
}
|
||||
|
||||
bufferWrite = null;
|
||||
|
||||
writingChannel = null;
|
||||
flush(true);
|
||||
}
|
||||
|
||||
public boolean containsRecord(final long id) {
|
||||
|
@ -243,11 +263,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
|||
*/
|
||||
|
||||
protected void openFile() throws Exception {
|
||||
flush();
|
||||
|
||||
bufferWrite = fileFactory.newBuffer(journal.getFileSize());
|
||||
|
||||
writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
|
||||
flush(false);
|
||||
|
||||
currentFile = filesRepository.openFileCMP();
|
||||
|
||||
|
@ -257,6 +273,21 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
|||
|
||||
currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
|
||||
|
||||
final int fileSize = journal.getFileSize();
|
||||
if (bufferWrite != null && bufferWrite.capacity() < fileSize) {
|
||||
fileFactory.releaseDirectBuffer(bufferWrite);
|
||||
bufferWrite = null;
|
||||
writingChannel = null;
|
||||
}
|
||||
if (bufferWrite == null) {
|
||||
final ByteBuffer bufferWrite = fileFactory.allocateDirectBuffer(fileSize);
|
||||
this.bufferWrite = bufferWrite;
|
||||
writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
|
||||
} else {
|
||||
writingChannel.clear();
|
||||
bufferWrite.clear();
|
||||
}
|
||||
|
||||
JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.activemq.artemis.core.journal.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
|
@ -42,6 +41,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
|
@ -461,17 +461,34 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
// Now order them by ordering id - we can't use the file name for ordering
|
||||
// since we can re-use dataFiles
|
||||
|
||||
Collections.sort(orderedFiles, new JournalFileComparator());
|
||||
Collections.sort(orderedFiles, JOURNAL_FILE_COMPARATOR);
|
||||
|
||||
return orderedFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* this method is used internally only however tools may use it to maintenance.
|
||||
*/
|
||||
public static int readJournalFile(final SequentialFileFactory fileFactory,
|
||||
final JournalFile file,
|
||||
final JournalReaderCallback reader) throws Exception {
|
||||
private static ByteBuffer allocateDirectBufferIfNeeded(final SequentialFileFactory fileFactory,
|
||||
final int requiredCapacity,
|
||||
final AtomicReference<ByteBuffer> bufferRef) {
|
||||
ByteBuffer buffer = bufferRef != null ? bufferRef.get() : null;
|
||||
if (buffer != null && buffer.capacity() < requiredCapacity) {
|
||||
fileFactory.releaseDirectBuffer(buffer);
|
||||
buffer = null;
|
||||
}
|
||||
if (buffer == null) {
|
||||
buffer = fileFactory.allocateDirectBuffer(requiredCapacity);
|
||||
} else {
|
||||
buffer.clear().limit(requiredCapacity);
|
||||
}
|
||||
if (bufferRef != null) {
|
||||
bufferRef.lazySet(buffer);
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
|
||||
static int readJournalFile(final SequentialFileFactory fileFactory,
|
||||
final JournalFile file,
|
||||
final JournalReaderCallback reader,
|
||||
final AtomicReference<ByteBuffer> wholeFileBufferReference) throws Exception {
|
||||
file.getFile().open(1, false);
|
||||
ByteBuffer wholeFileBuffer = null;
|
||||
try {
|
||||
|
@ -481,8 +498,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
// the file is damaged or the system crash before it was able to write
|
||||
return -1;
|
||||
}
|
||||
|
||||
wholeFileBuffer = fileFactory.newBuffer(filesize);
|
||||
wholeFileBuffer = allocateDirectBufferIfNeeded(fileFactory, filesize, wholeFileBufferReference);
|
||||
|
||||
final int journalFileSize = file.getFile().read(wholeFileBuffer);
|
||||
|
||||
|
@ -771,16 +787,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
lastDataPos = wholeFileBuffer.position();
|
||||
|
||||
}
|
||||
|
||||
return lastDataPos;
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorReadingFile(e);
|
||||
throw new Exception(e.getMessage(), e);
|
||||
} finally {
|
||||
if (wholeFileBuffer != null) {
|
||||
fileFactory.releaseBuffer(wholeFileBuffer);
|
||||
if (wholeFileBufferReference == null && wholeFileBuffer != null) {
|
||||
fileFactory.releaseDirectBuffer(wholeFileBuffer);
|
||||
}
|
||||
|
||||
try {
|
||||
file.getFile().close();
|
||||
} catch (Throwable ignored) {
|
||||
|
@ -788,6 +802,15 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* this method is used internally only however tools may use it to maintenance.
|
||||
*/
|
||||
public static int readJournalFile(final SequentialFileFactory fileFactory,
|
||||
final JournalFile file,
|
||||
final JournalReaderCallback reader) throws Exception {
|
||||
return readJournalFile(fileFactory, file, reader, null);
|
||||
}
|
||||
|
||||
// Journal implementation
|
||||
// ----------------------------------------------------------------
|
||||
|
||||
|
@ -1594,19 +1617,28 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
journalLock.writeLock().unlock();
|
||||
}
|
||||
|
||||
Collections.sort(dataFilesToProcess, new JournalFileComparator());
|
||||
Collections.sort(dataFilesToProcess, JOURNAL_FILE_COMPARATOR);
|
||||
|
||||
// This is where most of the work is done, taking most of the time of the compacting routine.
|
||||
// Notice there are no locks while this is being done.
|
||||
|
||||
// Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
|
||||
// well
|
||||
for (final JournalFile file : dataFilesToProcess) {
|
||||
try {
|
||||
JournalImpl.readJournalFile(fileFactory, file, compactor);
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.compactReadError(file);
|
||||
throw new Exception("Error on reading compacting for " + file, e);
|
||||
// this AtomicReference is not used for thread-safety, but just as a reference
|
||||
final AtomicReference<ByteBuffer> wholeFileBufferRef = dataFilesToProcess.isEmpty() ? null : new AtomicReference<>();
|
||||
try {
|
||||
for (final JournalFile file : dataFilesToProcess) {
|
||||
try {
|
||||
JournalImpl.readJournalFile(fileFactory, file, compactor, wholeFileBufferRef);
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.compactReadError(file);
|
||||
throw new Exception("Error on reading compacting for " + file, e);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
ByteBuffer wholeFileBuffer;
|
||||
if (wholeFileBufferRef != null && (wholeFileBuffer = wholeFileBufferRef.get()) != null) {
|
||||
fileFactory.releaseDirectBuffer(wholeFileBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1758,16 +1790,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
*/
|
||||
private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
|
||||
final boolean changeData,
|
||||
final JournalState replicationSync) throws Exception {
|
||||
if (state == JournalState.STOPPED || state == JournalState.LOADED) {
|
||||
throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
|
||||
state);
|
||||
}
|
||||
if (state == replicationSync) {
|
||||
throw new IllegalStateException("Journal cannot be in state " + JournalState.STARTED);
|
||||
}
|
||||
final JournalState replicationSync,
|
||||
final AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
|
||||
JournalState state;
|
||||
assert (state = this.state) != JournalState.STOPPED &&
|
||||
state != JournalState.LOADED &&
|
||||
state != replicationSync;
|
||||
|
||||
checkControlFile();
|
||||
checkControlFile(wholeFileBufferRef);
|
||||
|
||||
records.clear();
|
||||
|
||||
|
@ -1796,7 +1826,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
private void checkID(final long id) {
|
||||
if (id > maxID.longValue()) {
|
||||
maxID.set(id);
|
||||
maxID.lazySet(id);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1804,7 +1834,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
public void onReadAddRecord(final RecordInfo info) throws Exception {
|
||||
checkID(info.id);
|
||||
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
|
||||
loadManager.addRecord(info);
|
||||
|
||||
|
@ -1815,7 +1845,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
public void onReadUpdateRecord(final RecordInfo info) throws Exception {
|
||||
checkID(info.id);
|
||||
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
|
||||
loadManager.updateRecord(info);
|
||||
|
||||
|
@ -1833,7 +1863,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
@Override
|
||||
public void onReadDeleteRecord(final long recordID) throws Exception {
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
|
||||
loadManager.deleteRecord(recordID);
|
||||
|
||||
|
@ -1854,7 +1884,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
checkID(info.id);
|
||||
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
|
||||
TransactionHolder tx = loadTransactions.get(transactionID);
|
||||
|
||||
|
@ -1880,7 +1910,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
@Override
|
||||
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
|
||||
TransactionHolder tx = loadTransactions.get(transactionID);
|
||||
|
||||
|
@ -1908,7 +1938,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
public void onReadPrepareRecord(final long transactionID,
|
||||
final byte[] extraData,
|
||||
final int numberOfRecords) throws Exception {
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
|
||||
TransactionHolder tx = loadTransactions.get(transactionID);
|
||||
|
||||
|
@ -1981,7 +2011,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
journalTransaction.forget();
|
||||
}
|
||||
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2005,16 +2035,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
// Rollbacks.. We will ignore the data anyway.
|
||||
tnp.rollback(file);
|
||||
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void markAsDataFile(final JournalFile file) {
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
}
|
||||
|
||||
});
|
||||
}, wholeFileBufferRef);
|
||||
|
||||
if (hasData.get()) {
|
||||
lastDataPos = resultLastPost;
|
||||
|
@ -2050,7 +2080,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
} else {
|
||||
for (RecordInfo info : transaction.recordInfos) {
|
||||
if (info.id > maxID.get()) {
|
||||
maxID.set(info.id);
|
||||
maxID.lazySet(info.id);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2069,6 +2099,30 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
return new JournalLoadInformation(records.size(), maxID.longValue());
|
||||
}
|
||||
|
||||
private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
|
||||
final boolean changeData,
|
||||
final JournalState replicationSync) throws Exception {
|
||||
final JournalState state = this.state;
|
||||
if (state == JournalState.STOPPED || state == JournalState.LOADED) {
|
||||
throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
|
||||
state);
|
||||
}
|
||||
if (state == replicationSync) {
|
||||
throw new IllegalStateException("Journal cannot be in state " + JournalState.STARTED);
|
||||
}
|
||||
// AtomicReference is used only as a reference, not as an Atomic value
|
||||
final AtomicReference<ByteBuffer> wholeFileBufferRef = new AtomicReference<>();
|
||||
try {
|
||||
return load(loadManager, changeData, replicationSync, wholeFileBufferRef);
|
||||
} finally {
|
||||
final ByteBuffer wholeFileBuffer = wholeFileBufferRef.get();
|
||||
if (wholeFileBuffer != null) {
|
||||
fileFactory.releaseDirectBuffer(wholeFileBuffer);
|
||||
wholeFileBufferRef.lazySet(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if cleanup was called
|
||||
*/
|
||||
|
@ -2808,12 +2862,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
/**
|
||||
* @throws Exception
|
||||
*/
|
||||
private void checkControlFile() throws Exception {
|
||||
private void checkControlFile(AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
|
||||
ArrayList<String> dataFiles = new ArrayList<>();
|
||||
ArrayList<String> newFiles = new ArrayList<>();
|
||||
ArrayList<Pair<String, String>> renames = new ArrayList<>();
|
||||
|
||||
SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames);
|
||||
SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames, wholeFileBufferRef);
|
||||
if (controlFile != null) {
|
||||
for (String dataFile : dataFiles) {
|
||||
SequentialFile file = fileFactory.createSequentialFile(dataFile);
|
||||
|
@ -2904,18 +2958,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
}
|
||||
|
||||
private static final class JournalFileComparator implements Comparator<JournalFile>, Serializable {
|
||||
|
||||
private static final long serialVersionUID = -6264728973604070321L;
|
||||
|
||||
@Override
|
||||
public int compare(final JournalFile f1, final JournalFile f2) {
|
||||
long id1 = f1.getFileID();
|
||||
long id2 = f2.getFileID();
|
||||
|
||||
return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
|
||||
}
|
||||
}
|
||||
private static final Comparator<JournalFile> JOURNAL_FILE_COMPARATOR = Comparator.comparingLong(JournalFile::getFileID);
|
||||
|
||||
@Override
|
||||
public final void synchronizationLock() {
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
|
|||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
||||
|
||||
public class FakeSequentialFileFactory implements SequentialFileFactory {
|
||||
|
||||
|
@ -417,6 +418,18 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void blockingWriteDirect(ByteBuffer bytes,
|
||||
boolean sync,
|
||||
boolean releaseBuffer) throws Exception {
|
||||
SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
|
||||
try {
|
||||
writeDirect(bytes, sync, callback);
|
||||
} finally {
|
||||
callback.waitCompletion();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
if (supportsCallback) {
|
||||
|
|
Loading…
Reference in New Issue