diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 0b88d9ae2f..8bbf7b96fa 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -250,6 +250,11 @@ public class JDBCSequentialFile implements SequentialFile {
}
+ @Override
+ public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) {
+ writeDirect(bytes, sync, null);
+ }
+
@Override
public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
writeDirect(bytes, sync, null);
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
index 49130e6959..b6e49385f7 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java
@@ -77,6 +77,20 @@ public interface SequentialFile {
*/
void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
+ /**
+ * Write directly to the file without using any intermediate buffer and wait completion.
+ * If {@code releaseBuffer} is {@code true} the provided {@code bytes} should be released
+ * through {@link SequentialFileFactory#releaseBuffer(ByteBuffer)}, if supported.
+ *
+ * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
+ * NIO). If {@code releaseBuffer} is {@code true} use a buffer from
+ * {@link SequentialFileFactory#newBuffer(int)}, {@link SequentialFileFactory#allocateDirectBuffer(int)}
+ * otherwise.
+ * @param sync if {@code true} will durable flush the written data on the file, {@code false} otherwise
+ * @param releaseBuffer if {@code true} will release the buffer, {@code false} otherwise
+ */
+ void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception;
+
/**
* @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
* NIO). To be safe, use a buffer from the corresponding
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
index 139b23613b..bc3c40850e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
@@ -233,6 +233,33 @@ public class AIOSequentialFile extends AbstractSequentialFile {
}
}
+ @Override
+ public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Write Direct, Sync: true File: " + getFileName());
+ }
+
+ final SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+ try {
+ checkOpened();
+ } catch (Exception e) {
+ ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
+ completion.onError(-1, e.getMessage());
+ return;
+ }
+
+ final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
+
+ final long positionToWrite = position.getAndAdd(bytesToWrite);
+
+ final AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(completion, bytes, releaseBuffer);
+ runnableCallback.initWrite(positionToWrite, bytesToWrite);
+ runnableCallback.run();
+
+ completion.waitCompletion();
+ }
+
/**
* Note: Parameter sync is not used on AIO
*/
@@ -256,8 +283,14 @@ public class AIOSequentialFile extends AbstractSequentialFile {
}
AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) {
+ return getCallback(originalCallback, buffer, true);
+ }
+
+ AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback,
+ ByteBuffer buffer,
+ boolean releaseBuffer) {
AIOSequentialFileFactory.AIOSequentialCallback callback = aioFactory.getCallback();
- callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer);
+ callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer, releaseBuffer);
pendingCallbacks.countUp();
return callback;
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
index d8288e6ca0..3cdf9fec39 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -286,7 +286,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
String errorMessage;
int errorCode = -1;
long writeSequence;
-
+ boolean releaseBuffer;
long position;
int bytes;
@@ -297,6 +297,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
", errorMessage='" + errorMessage + '\'' +
", errorCode=" + errorCode +
", writeSequence=" + writeSequence +
+ ", releaseBuffer=" + releaseBuffer +
", position=" + position +
'}';
}
@@ -332,7 +333,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
IOCallback IOCallback,
LibaioFile libaioFile,
AIOSequentialFile sequentialFile,
- ByteBuffer usedBuffer) {
+ ByteBuffer usedBuffer,
+ boolean releaseBuffer) {
this.callback = IOCallback;
this.sequentialFile = sequentialFile;
this.error = false;
@@ -340,6 +342,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
this.libaioFile = libaioFile;
this.writeSequence = writeSequence;
this.errorMessage = null;
+ this.releaseBuffer = releaseBuffer;
return this;
}
@@ -375,7 +378,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
callback.done();
}
- if (buffer != null && reuseBuffers) {
+ if (buffer != null && reuseBuffers && releaseBuffer) {
buffersControl.bufferDone(buffer);
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
index 4c5e23a3f8..efce2800cd 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
@@ -273,6 +273,28 @@ final class MappedSequentialFile implements SequentialFile {
}
}
+ @Override
+ public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
+ try {
+ checkIsOpen();
+ final int position = bytes.position();
+ final int limit = bytes.limit();
+ final int remaining = limit - position;
+ if (remaining > 0) {
+ this.mappedFile.write(bytes, position, remaining);
+ final int newPosition = position + remaining;
+ bytes.position(newPosition);
+ if (factory.isDatasync() && sync) {
+ this.mappedFile.force();
+ }
+ }
+ } finally {
+ if (releaseBuffer) {
+ this.factory.releaseBuffer(bytes);
+ }
+ }
+ }
+
@Override
public int read(ByteBuffer bytes, IOCallback callback) throws IOException {
if (callback == null) {
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
index 8436ed501a..e0a877a0a7 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
@@ -96,6 +96,11 @@ final class TimedSequentialFile implements SequentialFile {
this.sequentialFile.delete();
}
+ @Override
+ public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
+ this.sequentialFile.blockingWriteDirect(bytes, sync, releaseBuffer);
+ }
+
@Override
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
if (this.timedBuffer != null) {
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
index 55654b7d5e..5f65b64bba 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -244,7 +244,7 @@ public class NIOSequentialFile extends AbstractSequentialFile {
}
try {
- internalWrite(bytes, sync, callback);
+ internalWrite(bytes, sync, callback, true);
} catch (Exception e) {
callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage());
}
@@ -252,7 +252,12 @@ public class NIOSequentialFile extends AbstractSequentialFile {
@Override
public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception {
- internalWrite(bytes, sync, null);
+ internalWrite(bytes, sync, null, true);
+ }
+
+ @Override
+ public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
+ internalWrite(bytes, sync, null, releaseBuffer);
}
@Override
@@ -266,7 +271,8 @@ public class NIOSequentialFile extends AbstractSequentialFile {
private void internalWrite(final ByteBuffer bytes,
final boolean sync,
- final IOCallback callback) throws IOException, ActiveMQIOErrorException, InterruptedException {
+ final IOCallback callback,
+ boolean releaseBuffer) throws IOException, ActiveMQIOErrorException, InterruptedException {
if (!isOpen()) {
if (callback != null) {
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
@@ -279,7 +285,7 @@ public class NIOSequentialFile extends AbstractSequentialFile {
position.addAndGet(bytes.limit());
try {
- doInternalWrite(bytes, sync, callback);
+ doInternalWrite(bytes, sync, callback, releaseBuffer);
} catch (ClosedChannelException e) {
throw e;
} catch (IOException e) {
@@ -296,7 +302,8 @@ public class NIOSequentialFile extends AbstractSequentialFile {
*/
private void doInternalWrite(final ByteBuffer bytes,
final boolean sync,
- final IOCallback callback) throws IOException {
+ final IOCallback callback,
+ boolean releaseBuffer) throws IOException {
try {
channel.write(bytes);
@@ -308,8 +315,10 @@ public class NIOSequentialFile extends AbstractSequentialFile {
callback.done();
}
} finally {
- //release it to recycle the write buffer if big enough
- this.factory.releaseBuffer(bytes);
+ if (releaseBuffer) {
+ //release it to recycle the write buffer if big enough
+ this.factory.releaseBuffer(bytes);
+ }
}
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index 10f5008f60..8f61f99cfa 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.journal.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@@ -152,10 +153,11 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
}
}
- public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
+ static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
final List dataFiles,
final List newFiles,
- final List> renameFile) throws Exception {
+ final List> renameFile,
+ final AtomicReference wholeFileBufferRef) throws Exception {
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
if (controlFile.exists()) {
@@ -163,13 +165,12 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
final ArrayList records = new ArrayList<>();
-
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract() {
@Override
public void onReadAddRecord(final RecordInfo info) throws Exception {
records.add(info);
}
- });
+ }, wholeFileBufferRef);
if (records.size() == 0) {
// the record is damaged
@@ -205,29 +206,48 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
}
}
+ public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
+ final List dataFiles,
+ final List newFiles,
+ final List> renameFile) throws Exception {
+ return readControlFile(fileFactory, dataFiles, newFiles, renameFile, null);
+ }
+
+ private void flush(boolean releaseWritingBuffer) throws Exception {
+ if (writingChannel != null) {
+ try {
+ if (sequentialFile.isOpen()) {
+ try {
+ sequentialFile.position(0);
+
+ // To Fix the size of the file
+ writingChannel.writerIndex(writingChannel.capacity());
+
+ final ByteBuffer byteBuffer = bufferWrite;
+ final int readerIndex = writingChannel.readerIndex();
+ byteBuffer.clear().position(readerIndex).limit(readerIndex + writingChannel.readableBytes());
+ sequentialFile.blockingWriteDirect(byteBuffer, true, false);
+ } finally {
+ sequentialFile.close();
+ newDataFiles.add(currentFile);
+ }
+ }
+ } finally {
+ if (releaseWritingBuffer) {
+ //deterministic release of native resources
+ fileFactory.releaseDirectBuffer(bufferWrite);
+ writingChannel = null;
+ bufferWrite = null;
+ }
+ }
+ }
+ }
/**
* Write pending output into file
*/
public void flush() throws Exception {
- if (writingChannel != null) {
- sequentialFile.position(0);
-
- // To Fix the size of the file
- writingChannel.writerIndex(writingChannel.capacity());
-
- bufferWrite.clear()
- .position(writingChannel.readerIndex())
- .limit(writingChannel.readableBytes());
-
- sequentialFile.writeDirect(bufferWrite, true);
- sequentialFile.close();
- newDataFiles.add(currentFile);
- }
-
- bufferWrite = null;
-
- writingChannel = null;
+ flush(true);
}
public boolean containsRecord(final long id) {
@@ -243,11 +263,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
*/
protected void openFile() throws Exception {
- flush();
-
- bufferWrite = fileFactory.newBuffer(journal.getFileSize());
-
- writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
+ flush(false);
currentFile = filesRepository.openFileCMP();
@@ -257,6 +273,21 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
+ final int fileSize = journal.getFileSize();
+ if (bufferWrite != null && bufferWrite.capacity() < fileSize) {
+ fileFactory.releaseDirectBuffer(bufferWrite);
+ bufferWrite = null;
+ writingChannel = null;
+ }
+ if (bufferWrite == null) {
+ final ByteBuffer bufferWrite = fileFactory.allocateDirectBuffer(fileSize);
+ this.bufferWrite = bufferWrite;
+ writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
+ } else {
+ writingChannel.clear();
+ bufferWrite.clear();
+ }
+
JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 47bdc5b7c4..fe08ed8ccb 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.journal.impl;
import java.io.IOException;
-import java.io.Serializable;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedAction;
@@ -42,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -461,17 +461,34 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Now order them by ordering id - we can't use the file name for ordering
// since we can re-use dataFiles
- Collections.sort(orderedFiles, new JournalFileComparator());
+ Collections.sort(orderedFiles, JOURNAL_FILE_COMPARATOR);
return orderedFiles;
}
- /**
- * this method is used internally only however tools may use it to maintenance.
- */
- public static int readJournalFile(final SequentialFileFactory fileFactory,
- final JournalFile file,
- final JournalReaderCallback reader) throws Exception {
+ private static ByteBuffer allocateDirectBufferIfNeeded(final SequentialFileFactory fileFactory,
+ final int requiredCapacity,
+ final AtomicReference bufferRef) {
+ ByteBuffer buffer = bufferRef != null ? bufferRef.get() : null;
+ if (buffer != null && buffer.capacity() < requiredCapacity) {
+ fileFactory.releaseDirectBuffer(buffer);
+ buffer = null;
+ }
+ if (buffer == null) {
+ buffer = fileFactory.allocateDirectBuffer(requiredCapacity);
+ } else {
+ buffer.clear().limit(requiredCapacity);
+ }
+ if (bufferRef != null) {
+ bufferRef.lazySet(buffer);
+ }
+ return buffer;
+ }
+
+ static int readJournalFile(final SequentialFileFactory fileFactory,
+ final JournalFile file,
+ final JournalReaderCallback reader,
+ final AtomicReference wholeFileBufferReference) throws Exception {
file.getFile().open(1, false);
ByteBuffer wholeFileBuffer = null;
try {
@@ -481,8 +498,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// the file is damaged or the system crash before it was able to write
return -1;
}
-
- wholeFileBuffer = fileFactory.newBuffer(filesize);
+ wholeFileBuffer = allocateDirectBufferIfNeeded(fileFactory, filesize, wholeFileBufferReference);
final int journalFileSize = file.getFile().read(wholeFileBuffer);
@@ -771,16 +787,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
lastDataPos = wholeFileBuffer.position();
}
-
return lastDataPos;
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorReadingFile(e);
throw new Exception(e.getMessage(), e);
} finally {
- if (wholeFileBuffer != null) {
- fileFactory.releaseBuffer(wholeFileBuffer);
+ if (wholeFileBufferReference == null && wholeFileBuffer != null) {
+ fileFactory.releaseDirectBuffer(wholeFileBuffer);
}
-
try {
file.getFile().close();
} catch (Throwable ignored) {
@@ -788,6 +802,15 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
+ /**
+ * this method is used internally only however tools may use it to maintenance.
+ */
+ public static int readJournalFile(final SequentialFileFactory fileFactory,
+ final JournalFile file,
+ final JournalReaderCallback reader) throws Exception {
+ return readJournalFile(fileFactory, file, reader, null);
+ }
+
// Journal implementation
// ----------------------------------------------------------------
@@ -1594,19 +1617,28 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
journalLock.writeLock().unlock();
}
- Collections.sort(dataFilesToProcess, new JournalFileComparator());
+ Collections.sort(dataFilesToProcess, JOURNAL_FILE_COMPARATOR);
// This is where most of the work is done, taking most of the time of the compacting routine.
// Notice there are no locks while this is being done.
// Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
// well
- for (final JournalFile file : dataFilesToProcess) {
- try {
- JournalImpl.readJournalFile(fileFactory, file, compactor);
- } catch (Throwable e) {
- ActiveMQJournalLogger.LOGGER.compactReadError(file);
- throw new Exception("Error on reading compacting for " + file, e);
+ // this AtomicReference is not used for thread-safety, but just as a reference
+ final AtomicReference wholeFileBufferRef = dataFilesToProcess.isEmpty() ? null : new AtomicReference<>();
+ try {
+ for (final JournalFile file : dataFilesToProcess) {
+ try {
+ JournalImpl.readJournalFile(fileFactory, file, compactor, wholeFileBufferRef);
+ } catch (Throwable e) {
+ ActiveMQJournalLogger.LOGGER.compactReadError(file);
+ throw new Exception("Error on reading compacting for " + file, e);
+ }
+ }
+ } finally {
+ ByteBuffer wholeFileBuffer;
+ if (wholeFileBufferRef != null && (wholeFileBuffer = wholeFileBufferRef.get()) != null) {
+ fileFactory.releaseDirectBuffer(wholeFileBuffer);
}
}
@@ -1758,16 +1790,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
*/
private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
final boolean changeData,
- final JournalState replicationSync) throws Exception {
- if (state == JournalState.STOPPED || state == JournalState.LOADED) {
- throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
- state);
- }
- if (state == replicationSync) {
- throw new IllegalStateException("Journal cannot be in state " + JournalState.STARTED);
- }
+ final JournalState replicationSync,
+ final AtomicReference wholeFileBufferRef) throws Exception {
+ JournalState state;
+ assert (state = this.state) != JournalState.STOPPED &&
+ state != JournalState.LOADED &&
+ state != replicationSync;
- checkControlFile();
+ checkControlFile(wholeFileBufferRef);
records.clear();
@@ -1796,7 +1826,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private void checkID(final long id) {
if (id > maxID.longValue()) {
- maxID.set(id);
+ maxID.lazySet(id);
}
}
@@ -1804,7 +1834,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void onReadAddRecord(final RecordInfo info) throws Exception {
checkID(info.id);
- hasData.set(true);
+ hasData.lazySet(true);
loadManager.addRecord(info);
@@ -1815,7 +1845,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void onReadUpdateRecord(final RecordInfo info) throws Exception {
checkID(info.id);
- hasData.set(true);
+ hasData.lazySet(true);
loadManager.updateRecord(info);
@@ -1833,7 +1863,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public void onReadDeleteRecord(final long recordID) throws Exception {
- hasData.set(true);
+ hasData.lazySet(true);
loadManager.deleteRecord(recordID);
@@ -1854,7 +1884,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
checkID(info.id);
- hasData.set(true);
+ hasData.lazySet(true);
TransactionHolder tx = loadTransactions.get(transactionID);
@@ -1880,7 +1910,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
- hasData.set(true);
+ hasData.lazySet(true);
TransactionHolder tx = loadTransactions.get(transactionID);
@@ -1908,7 +1938,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void onReadPrepareRecord(final long transactionID,
final byte[] extraData,
final int numberOfRecords) throws Exception {
- hasData.set(true);
+ hasData.lazySet(true);
TransactionHolder tx = loadTransactions.get(transactionID);
@@ -1981,7 +2011,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
journalTransaction.forget();
}
- hasData.set(true);
+ hasData.lazySet(true);
}
}
@@ -2005,16 +2035,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Rollbacks.. We will ignore the data anyway.
tnp.rollback(file);
- hasData.set(true);
+ hasData.lazySet(true);
}
}
@Override
public void markAsDataFile(final JournalFile file) {
- hasData.set(true);
+ hasData.lazySet(true);
}
- });
+ }, wholeFileBufferRef);
if (hasData.get()) {
lastDataPos = resultLastPost;
@@ -2050,7 +2080,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} else {
for (RecordInfo info : transaction.recordInfos) {
if (info.id > maxID.get()) {
- maxID.set(info.id);
+ maxID.lazySet(info.id);
}
}
@@ -2069,6 +2099,30 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return new JournalLoadInformation(records.size(), maxID.longValue());
}
+ private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
+ final boolean changeData,
+ final JournalState replicationSync) throws Exception {
+ final JournalState state = this.state;
+ if (state == JournalState.STOPPED || state == JournalState.LOADED) {
+ throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
+ state);
+ }
+ if (state == replicationSync) {
+ throw new IllegalStateException("Journal cannot be in state " + JournalState.STARTED);
+ }
+ // AtomicReference is used only as a reference, not as an Atomic value
+ final AtomicReference wholeFileBufferRef = new AtomicReference<>();
+ try {
+ return load(loadManager, changeData, replicationSync, wholeFileBufferRef);
+ } finally {
+ final ByteBuffer wholeFileBuffer = wholeFileBufferRef.get();
+ if (wholeFileBuffer != null) {
+ fileFactory.releaseDirectBuffer(wholeFileBuffer);
+ wholeFileBufferRef.lazySet(null);
+ }
+ }
+ }
+
/**
* @return true if cleanup was called
*/
@@ -2808,12 +2862,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
/**
* @throws Exception
*/
- private void checkControlFile() throws Exception {
+ private void checkControlFile(AtomicReference wholeFileBufferRef) throws Exception {
ArrayList dataFiles = new ArrayList<>();
ArrayList newFiles = new ArrayList<>();
ArrayList> renames = new ArrayList<>();
- SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames);
+ SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames, wholeFileBufferRef);
if (controlFile != null) {
for (String dataFile : dataFiles) {
SequentialFile file = fileFactory.createSequentialFile(dataFile);
@@ -2904,18 +2958,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
- private static final class JournalFileComparator implements Comparator, Serializable {
-
- private static final long serialVersionUID = -6264728973604070321L;
-
- @Override
- public int compare(final JournalFile f1, final JournalFile f2) {
- long id1 = f1.getFileID();
- long id2 = f2.getFileID();
-
- return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
- }
- }
+ private static final Comparator JOURNAL_FILE_COMPARATOR = Comparator.comparingLong(JournalFile::getFileID);
@Override
public final void synchronizationLock() {
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
index e05b5d0962..155831fe9c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
public class FakeSequentialFileFactory implements SequentialFileFactory {
@@ -417,6 +418,18 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
}
+ @Override
+ public synchronized void blockingWriteDirect(ByteBuffer bytes,
+ boolean sync,
+ boolean releaseBuffer) throws Exception {
+ SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
+ try {
+ writeDirect(bytes, sync, callback);
+ } finally {
+ callback.waitCompletion();
+ }
+ }
+
@Override
public void sync() throws IOException {
if (supportsCallback) {