This commit is contained in:
Clebert Suconic 2018-10-17 20:49:51 -04:00
commit 69447ca1f8

View File

@ -56,6 +56,8 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
private ActiveMQBuffer writingChannel; private ActiveMQBuffer writingChannel;
private ByteBuffer bufferWrite;
private final ConcurrentLongHashSet recordsSnapshot; private final ConcurrentLongHashSet recordsSnapshot;
protected final List<JournalFile> newDataFiles = new ArrayList<>(); protected final List<JournalFile> newDataFiles = new ArrayList<>();
@ -214,11 +216,17 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
// To Fix the size of the file // To Fix the size of the file
writingChannel.writerIndex(writingChannel.capacity()); writingChannel.writerIndex(writingChannel.capacity());
sequentialFile.writeDirect(writingChannel.toByteBuffer(), true); bufferWrite.clear()
.position(writingChannel.readerIndex())
.limit(writingChannel.readableBytes());
sequentialFile.writeDirect(bufferWrite, true);
sequentialFile.close(); sequentialFile.close();
newDataFiles.add(currentFile); newDataFiles.add(currentFile);
} }
bufferWrite = null;
writingChannel = null; writingChannel = null;
} }
@ -237,7 +245,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
protected void openFile() throws Exception { protected void openFile() throws Exception {
flush(); flush();
ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize()); bufferWrite = fileFactory.newBuffer(journal.getFileSize());
writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite); writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);