This closes #2250
This commit is contained in:
commit
5269df8525
|
@ -250,6 +250,11 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) {
|
||||
writeDirect(bytes, sync, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
|
||||
writeDirect(bytes, sync, null);
|
||||
|
|
|
@ -77,6 +77,20 @@ public interface SequentialFile {
|
|||
*/
|
||||
void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
|
||||
|
||||
/**
|
||||
* Write directly to the file without using any intermediate buffer and wait completion.<br>
|
||||
* If {@code releaseBuffer} is {@code true} the provided {@code bytes} should be released
|
||||
* through {@link SequentialFileFactory#releaseBuffer(ByteBuffer)}, if supported.
|
||||
*
|
||||
* @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
|
||||
* NIO). If {@code releaseBuffer} is {@code true} use a buffer from
|
||||
* {@link SequentialFileFactory#newBuffer(int)}, {@link SequentialFileFactory#allocateDirectBuffer(int)}
|
||||
* otherwise.
|
||||
* @param sync if {@code true} will durable flush the written data on the file, {@code false} otherwise
|
||||
* @param releaseBuffer if {@code true} will release the buffer, {@code false} otherwise
|
||||
*/
|
||||
void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception;
|
||||
|
||||
/**
|
||||
* @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
|
||||
* NIO). To be safe, use a buffer from the corresponding
|
||||
|
|
|
@ -233,6 +233,33 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Write Direct, Sync: true File: " + getFileName());
|
||||
}
|
||||
|
||||
final SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
|
||||
|
||||
try {
|
||||
checkOpened();
|
||||
} catch (Exception e) {
|
||||
ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
|
||||
completion.onError(-1, e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
|
||||
|
||||
final long positionToWrite = position.getAndAdd(bytesToWrite);
|
||||
|
||||
final AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(completion, bytes, releaseBuffer);
|
||||
runnableCallback.initWrite(positionToWrite, bytesToWrite);
|
||||
runnableCallback.run();
|
||||
|
||||
completion.waitCompletion();
|
||||
}
|
||||
|
||||
/**
|
||||
* Note: Parameter sync is not used on AIO
|
||||
*/
|
||||
|
@ -256,8 +283,14 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
}
|
||||
|
||||
AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) {
|
||||
return getCallback(originalCallback, buffer, true);
|
||||
}
|
||||
|
||||
AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback,
|
||||
ByteBuffer buffer,
|
||||
boolean releaseBuffer) {
|
||||
AIOSequentialFileFactory.AIOSequentialCallback callback = aioFactory.getCallback();
|
||||
callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer);
|
||||
callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer, releaseBuffer);
|
||||
pendingCallbacks.countUp();
|
||||
return callback;
|
||||
}
|
||||
|
|
|
@ -286,7 +286,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
String errorMessage;
|
||||
int errorCode = -1;
|
||||
long writeSequence;
|
||||
|
||||
boolean releaseBuffer;
|
||||
long position;
|
||||
int bytes;
|
||||
|
||||
|
@ -297,6 +297,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
", errorMessage='" + errorMessage + '\'' +
|
||||
", errorCode=" + errorCode +
|
||||
", writeSequence=" + writeSequence +
|
||||
", releaseBuffer=" + releaseBuffer +
|
||||
", position=" + position +
|
||||
'}';
|
||||
}
|
||||
|
@ -332,7 +333,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
IOCallback IOCallback,
|
||||
LibaioFile libaioFile,
|
||||
AIOSequentialFile sequentialFile,
|
||||
ByteBuffer usedBuffer) {
|
||||
ByteBuffer usedBuffer,
|
||||
boolean releaseBuffer) {
|
||||
this.callback = IOCallback;
|
||||
this.sequentialFile = sequentialFile;
|
||||
this.error = false;
|
||||
|
@ -340,6 +342,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
this.libaioFile = libaioFile;
|
||||
this.writeSequence = writeSequence;
|
||||
this.errorMessage = null;
|
||||
this.releaseBuffer = releaseBuffer;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -375,7 +378,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
callback.done();
|
||||
}
|
||||
|
||||
if (buffer != null && reuseBuffers) {
|
||||
if (buffer != null && reuseBuffers && releaseBuffer) {
|
||||
buffersControl.bufferDone(buffer);
|
||||
}
|
||||
|
||||
|
|
|
@ -273,6 +273,28 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
|
||||
try {
|
||||
checkIsOpen();
|
||||
final int position = bytes.position();
|
||||
final int limit = bytes.limit();
|
||||
final int remaining = limit - position;
|
||||
if (remaining > 0) {
|
||||
this.mappedFile.write(bytes, position, remaining);
|
||||
final int newPosition = position + remaining;
|
||||
bytes.position(newPosition);
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (releaseBuffer) {
|
||||
this.factory.releaseBuffer(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(ByteBuffer bytes, IOCallback callback) throws IOException {
|
||||
if (callback == null) {
|
||||
|
|
|
@ -96,6 +96,11 @@ final class TimedSequentialFile implements SequentialFile {
|
|||
this.sequentialFile.delete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
|
||||
this.sequentialFile.blockingWriteDirect(bytes, sync, releaseBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
|
||||
if (this.timedBuffer != null) {
|
||||
|
|
|
@ -244,7 +244,7 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
}
|
||||
|
||||
try {
|
||||
internalWrite(bytes, sync, callback);
|
||||
internalWrite(bytes, sync, callback, true);
|
||||
} catch (Exception e) {
|
||||
callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage());
|
||||
}
|
||||
|
@ -252,7 +252,12 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
@Override
|
||||
public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception {
|
||||
internalWrite(bytes, sync, null);
|
||||
internalWrite(bytes, sync, null, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception {
|
||||
internalWrite(bytes, sync, null, releaseBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -266,7 +271,8 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
private void internalWrite(final ByteBuffer bytes,
|
||||
final boolean sync,
|
||||
final IOCallback callback) throws IOException, ActiveMQIOErrorException, InterruptedException {
|
||||
final IOCallback callback,
|
||||
boolean releaseBuffer) throws IOException, ActiveMQIOErrorException, InterruptedException {
|
||||
if (!isOpen()) {
|
||||
if (callback != null) {
|
||||
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
|
||||
|
@ -279,7 +285,7 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
position.addAndGet(bytes.limit());
|
||||
|
||||
try {
|
||||
doInternalWrite(bytes, sync, callback);
|
||||
doInternalWrite(bytes, sync, callback, releaseBuffer);
|
||||
} catch (ClosedChannelException e) {
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
|
@ -296,7 +302,8 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
*/
|
||||
private void doInternalWrite(final ByteBuffer bytes,
|
||||
final boolean sync,
|
||||
final IOCallback callback) throws IOException {
|
||||
final IOCallback callback,
|
||||
boolean releaseBuffer) throws IOException {
|
||||
try {
|
||||
channel.write(bytes);
|
||||
|
||||
|
@ -308,8 +315,10 @@ public class NIOSequentialFile extends AbstractSequentialFile {
|
|||
callback.done();
|
||||
}
|
||||
} finally {
|
||||
//release it to recycle the write buffer if big enough
|
||||
this.factory.releaseBuffer(bytes);
|
||||
if (releaseBuffer) {
|
||||
//release it to recycle the write buffer if big enough
|
||||
this.factory.releaseBuffer(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.journal.impl;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
|
@ -152,10 +153,11 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
|||
}
|
||||
}
|
||||
|
||||
public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
|
||||
static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
|
||||
final List<String> dataFiles,
|
||||
final List<String> newFiles,
|
||||
final List<Pair<String, String>> renameFile) throws Exception {
|
||||
final List<Pair<String, String>> renameFile,
|
||||
final AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
|
||||
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
|
||||
|
||||
if (controlFile.exists()) {
|
||||
|
@ -163,13 +165,12 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
|||
|
||||
final ArrayList<RecordInfo> records = new ArrayList<>();
|
||||
|
||||
|
||||
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract() {
|
||||
@Override
|
||||
public void onReadAddRecord(final RecordInfo info) throws Exception {
|
||||
records.add(info);
|
||||
}
|
||||
});
|
||||
}, wholeFileBufferRef);
|
||||
|
||||
if (records.size() == 0) {
|
||||
// the record is damaged
|
||||
|
@ -205,29 +206,48 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
|||
}
|
||||
}
|
||||
|
||||
public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
|
||||
final List<String> dataFiles,
|
||||
final List<String> newFiles,
|
||||
final List<Pair<String, String>> renameFile) throws Exception {
|
||||
return readControlFile(fileFactory, dataFiles, newFiles, renameFile, null);
|
||||
}
|
||||
|
||||
private void flush(boolean releaseWritingBuffer) throws Exception {
|
||||
if (writingChannel != null) {
|
||||
try {
|
||||
if (sequentialFile.isOpen()) {
|
||||
try {
|
||||
sequentialFile.position(0);
|
||||
|
||||
// To Fix the size of the file
|
||||
writingChannel.writerIndex(writingChannel.capacity());
|
||||
|
||||
final ByteBuffer byteBuffer = bufferWrite;
|
||||
final int readerIndex = writingChannel.readerIndex();
|
||||
byteBuffer.clear().position(readerIndex).limit(readerIndex + writingChannel.readableBytes());
|
||||
sequentialFile.blockingWriteDirect(byteBuffer, true, false);
|
||||
} finally {
|
||||
sequentialFile.close();
|
||||
newDataFiles.add(currentFile);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (releaseWritingBuffer) {
|
||||
//deterministic release of native resources
|
||||
fileFactory.releaseDirectBuffer(bufferWrite);
|
||||
writingChannel = null;
|
||||
bufferWrite = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write pending output into file
|
||||
*/
|
||||
public void flush() throws Exception {
|
||||
if (writingChannel != null) {
|
||||
sequentialFile.position(0);
|
||||
|
||||
// To Fix the size of the file
|
||||
writingChannel.writerIndex(writingChannel.capacity());
|
||||
|
||||
bufferWrite.clear()
|
||||
.position(writingChannel.readerIndex())
|
||||
.limit(writingChannel.readableBytes());
|
||||
|
||||
sequentialFile.writeDirect(bufferWrite, true);
|
||||
sequentialFile.close();
|
||||
newDataFiles.add(currentFile);
|
||||
}
|
||||
|
||||
bufferWrite = null;
|
||||
|
||||
writingChannel = null;
|
||||
flush(true);
|
||||
}
|
||||
|
||||
public boolean containsRecord(final long id) {
|
||||
|
@ -243,11 +263,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
|||
*/
|
||||
|
||||
protected void openFile() throws Exception {
|
||||
flush();
|
||||
|
||||
bufferWrite = fileFactory.newBuffer(journal.getFileSize());
|
||||
|
||||
writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
|
||||
flush(false);
|
||||
|
||||
currentFile = filesRepository.openFileCMP();
|
||||
|
||||
|
@ -257,6 +273,21 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
|||
|
||||
currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
|
||||
|
||||
final int fileSize = journal.getFileSize();
|
||||
if (bufferWrite != null && bufferWrite.capacity() < fileSize) {
|
||||
fileFactory.releaseDirectBuffer(bufferWrite);
|
||||
bufferWrite = null;
|
||||
writingChannel = null;
|
||||
}
|
||||
if (bufferWrite == null) {
|
||||
final ByteBuffer bufferWrite = fileFactory.allocateDirectBuffer(fileSize);
|
||||
this.bufferWrite = bufferWrite;
|
||||
writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
|
||||
} else {
|
||||
writingChannel.clear();
|
||||
bufferWrite.clear();
|
||||
}
|
||||
|
||||
JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.activemq.artemis.core.journal.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
|
@ -42,6 +41,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
|
@ -461,17 +461,34 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
// Now order them by ordering id - we can't use the file name for ordering
|
||||
// since we can re-use dataFiles
|
||||
|
||||
Collections.sort(orderedFiles, new JournalFileComparator());
|
||||
Collections.sort(orderedFiles, JOURNAL_FILE_COMPARATOR);
|
||||
|
||||
return orderedFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* this method is used internally only however tools may use it to maintenance.
|
||||
*/
|
||||
public static int readJournalFile(final SequentialFileFactory fileFactory,
|
||||
final JournalFile file,
|
||||
final JournalReaderCallback reader) throws Exception {
|
||||
private static ByteBuffer allocateDirectBufferIfNeeded(final SequentialFileFactory fileFactory,
|
||||
final int requiredCapacity,
|
||||
final AtomicReference<ByteBuffer> bufferRef) {
|
||||
ByteBuffer buffer = bufferRef != null ? bufferRef.get() : null;
|
||||
if (buffer != null && buffer.capacity() < requiredCapacity) {
|
||||
fileFactory.releaseDirectBuffer(buffer);
|
||||
buffer = null;
|
||||
}
|
||||
if (buffer == null) {
|
||||
buffer = fileFactory.allocateDirectBuffer(requiredCapacity);
|
||||
} else {
|
||||
buffer.clear().limit(requiredCapacity);
|
||||
}
|
||||
if (bufferRef != null) {
|
||||
bufferRef.lazySet(buffer);
|
||||
}
|
||||
return buffer;
|
||||
}
|
||||
|
||||
static int readJournalFile(final SequentialFileFactory fileFactory,
|
||||
final JournalFile file,
|
||||
final JournalReaderCallback reader,
|
||||
final AtomicReference<ByteBuffer> wholeFileBufferReference) throws Exception {
|
||||
file.getFile().open(1, false);
|
||||
ByteBuffer wholeFileBuffer = null;
|
||||
try {
|
||||
|
@ -481,8 +498,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
// the file is damaged or the system crash before it was able to write
|
||||
return -1;
|
||||
}
|
||||
|
||||
wholeFileBuffer = fileFactory.newBuffer(filesize);
|
||||
wholeFileBuffer = allocateDirectBufferIfNeeded(fileFactory, filesize, wholeFileBufferReference);
|
||||
|
||||
final int journalFileSize = file.getFile().read(wholeFileBuffer);
|
||||
|
||||
|
@ -771,16 +787,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
lastDataPos = wholeFileBuffer.position();
|
||||
|
||||
}
|
||||
|
||||
return lastDataPos;
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorReadingFile(e);
|
||||
throw new Exception(e.getMessage(), e);
|
||||
} finally {
|
||||
if (wholeFileBuffer != null) {
|
||||
fileFactory.releaseBuffer(wholeFileBuffer);
|
||||
if (wholeFileBufferReference == null && wholeFileBuffer != null) {
|
||||
fileFactory.releaseDirectBuffer(wholeFileBuffer);
|
||||
}
|
||||
|
||||
try {
|
||||
file.getFile().close();
|
||||
} catch (Throwable ignored) {
|
||||
|
@ -788,6 +802,15 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* this method is used internally only however tools may use it to maintenance.
|
||||
*/
|
||||
public static int readJournalFile(final SequentialFileFactory fileFactory,
|
||||
final JournalFile file,
|
||||
final JournalReaderCallback reader) throws Exception {
|
||||
return readJournalFile(fileFactory, file, reader, null);
|
||||
}
|
||||
|
||||
// Journal implementation
|
||||
// ----------------------------------------------------------------
|
||||
|
||||
|
@ -1594,19 +1617,28 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
journalLock.writeLock().unlock();
|
||||
}
|
||||
|
||||
Collections.sort(dataFilesToProcess, new JournalFileComparator());
|
||||
Collections.sort(dataFilesToProcess, JOURNAL_FILE_COMPARATOR);
|
||||
|
||||
// This is where most of the work is done, taking most of the time of the compacting routine.
|
||||
// Notice there are no locks while this is being done.
|
||||
|
||||
// Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
|
||||
// well
|
||||
for (final JournalFile file : dataFilesToProcess) {
|
||||
try {
|
||||
JournalImpl.readJournalFile(fileFactory, file, compactor);
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.compactReadError(file);
|
||||
throw new Exception("Error on reading compacting for " + file, e);
|
||||
// this AtomicReference is not used for thread-safety, but just as a reference
|
||||
final AtomicReference<ByteBuffer> wholeFileBufferRef = dataFilesToProcess.isEmpty() ? null : new AtomicReference<>();
|
||||
try {
|
||||
for (final JournalFile file : dataFilesToProcess) {
|
||||
try {
|
||||
JournalImpl.readJournalFile(fileFactory, file, compactor, wholeFileBufferRef);
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.compactReadError(file);
|
||||
throw new Exception("Error on reading compacting for " + file, e);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
ByteBuffer wholeFileBuffer;
|
||||
if (wholeFileBufferRef != null && (wholeFileBuffer = wholeFileBufferRef.get()) != null) {
|
||||
fileFactory.releaseDirectBuffer(wholeFileBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1758,16 +1790,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
*/
|
||||
private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
|
||||
final boolean changeData,
|
||||
final JournalState replicationSync) throws Exception {
|
||||
if (state == JournalState.STOPPED || state == JournalState.LOADED) {
|
||||
throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
|
||||
state);
|
||||
}
|
||||
if (state == replicationSync) {
|
||||
throw new IllegalStateException("Journal cannot be in state " + JournalState.STARTED);
|
||||
}
|
||||
final JournalState replicationSync,
|
||||
final AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
|
||||
JournalState state;
|
||||
assert (state = this.state) != JournalState.STOPPED &&
|
||||
state != JournalState.LOADED &&
|
||||
state != replicationSync;
|
||||
|
||||
checkControlFile();
|
||||
checkControlFile(wholeFileBufferRef);
|
||||
|
||||
records.clear();
|
||||
|
||||
|
@ -1796,7 +1826,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
private void checkID(final long id) {
|
||||
if (id > maxID.longValue()) {
|
||||
maxID.set(id);
|
||||
maxID.lazySet(id);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1804,7 +1834,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
public void onReadAddRecord(final RecordInfo info) throws Exception {
|
||||
checkID(info.id);
|
||||
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
|
||||
loadManager.addRecord(info);
|
||||
|
||||
|
@ -1815,7 +1845,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
public void onReadUpdateRecord(final RecordInfo info) throws Exception {
|
||||
checkID(info.id);
|
||||
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
|
||||
loadManager.updateRecord(info);
|
||||
|
||||
|
@ -1833,7 +1863,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
@Override
|
||||
public void onReadDeleteRecord(final long recordID) throws Exception {
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
|
||||
loadManager.deleteRecord(recordID);
|
||||
|
||||
|
@ -1854,7 +1884,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
checkID(info.id);
|
||||
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
|
||||
TransactionHolder tx = loadTransactions.get(transactionID);
|
||||
|
||||
|
@ -1880,7 +1910,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
@Override
|
||||
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
|
||||
TransactionHolder tx = loadTransactions.get(transactionID);
|
||||
|
||||
|
@ -1908,7 +1938,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
public void onReadPrepareRecord(final long transactionID,
|
||||
final byte[] extraData,
|
||||
final int numberOfRecords) throws Exception {
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
|
||||
TransactionHolder tx = loadTransactions.get(transactionID);
|
||||
|
||||
|
@ -1981,7 +2011,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
journalTransaction.forget();
|
||||
}
|
||||
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2005,16 +2035,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
// Rollbacks.. We will ignore the data anyway.
|
||||
tnp.rollback(file);
|
||||
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void markAsDataFile(final JournalFile file) {
|
||||
hasData.set(true);
|
||||
hasData.lazySet(true);
|
||||
}
|
||||
|
||||
});
|
||||
}, wholeFileBufferRef);
|
||||
|
||||
if (hasData.get()) {
|
||||
lastDataPos = resultLastPost;
|
||||
|
@ -2050,7 +2080,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
} else {
|
||||
for (RecordInfo info : transaction.recordInfos) {
|
||||
if (info.id > maxID.get()) {
|
||||
maxID.set(info.id);
|
||||
maxID.lazySet(info.id);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2069,6 +2099,30 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
return new JournalLoadInformation(records.size(), maxID.longValue());
|
||||
}
|
||||
|
||||
private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
|
||||
final boolean changeData,
|
||||
final JournalState replicationSync) throws Exception {
|
||||
final JournalState state = this.state;
|
||||
if (state == JournalState.STOPPED || state == JournalState.LOADED) {
|
||||
throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
|
||||
state);
|
||||
}
|
||||
if (state == replicationSync) {
|
||||
throw new IllegalStateException("Journal cannot be in state " + JournalState.STARTED);
|
||||
}
|
||||
// AtomicReference is used only as a reference, not as an Atomic value
|
||||
final AtomicReference<ByteBuffer> wholeFileBufferRef = new AtomicReference<>();
|
||||
try {
|
||||
return load(loadManager, changeData, replicationSync, wholeFileBufferRef);
|
||||
} finally {
|
||||
final ByteBuffer wholeFileBuffer = wholeFileBufferRef.get();
|
||||
if (wholeFileBuffer != null) {
|
||||
fileFactory.releaseDirectBuffer(wholeFileBuffer);
|
||||
wholeFileBufferRef.lazySet(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if cleanup was called
|
||||
*/
|
||||
|
@ -2808,12 +2862,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
/**
|
||||
* @throws Exception
|
||||
*/
|
||||
private void checkControlFile() throws Exception {
|
||||
private void checkControlFile(AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
|
||||
ArrayList<String> dataFiles = new ArrayList<>();
|
||||
ArrayList<String> newFiles = new ArrayList<>();
|
||||
ArrayList<Pair<String, String>> renames = new ArrayList<>();
|
||||
|
||||
SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames);
|
||||
SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames, wholeFileBufferRef);
|
||||
if (controlFile != null) {
|
||||
for (String dataFile : dataFiles) {
|
||||
SequentialFile file = fileFactory.createSequentialFile(dataFile);
|
||||
|
@ -2904,18 +2958,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
}
|
||||
|
||||
private static final class JournalFileComparator implements Comparator<JournalFile>, Serializable {
|
||||
|
||||
private static final long serialVersionUID = -6264728973604070321L;
|
||||
|
||||
@Override
|
||||
public int compare(final JournalFile f1, final JournalFile f2) {
|
||||
long id1 = f1.getFileID();
|
||||
long id2 = f2.getFileID();
|
||||
|
||||
return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
|
||||
}
|
||||
}
|
||||
private static final Comparator<JournalFile> JOURNAL_FILE_COMPARATOR = Comparator.comparingLong(JournalFile::getFileID);
|
||||
|
||||
@Override
|
||||
public final void synchronizationLock() {
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
|
|||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
||||
|
||||
public class FakeSequentialFileFactory implements SequentialFileFactory {
|
||||
|
||||
|
@ -417,6 +418,18 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void blockingWriteDirect(ByteBuffer bytes,
|
||||
boolean sync,
|
||||
boolean releaseBuffer) throws Exception {
|
||||
SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
|
||||
try {
|
||||
writeDirect(bytes, sync, callback);
|
||||
} finally {
|
||||
callback.waitCompletion();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
if (supportsCallback) {
|
||||
|
|
Loading…
Reference in New Issue