diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java index 44db231e98..fbc8dc312c 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.ByteUtil; import org.jboss.logging.Logger; public abstract class AbstractSequentialFile implements SequentialFile { @@ -256,12 +257,11 @@ public abstract class AbstractSequentialFile implements SequentialFile { return file; } - protected ByteBuffer newBuffer(int size, int limit) { - size = factory.calculateBlockSize(size); - limit = factory.calculateBlockSize(limit); + protected ByteBuffer newBuffer(int requiredCapacity, boolean zeroed) { + final int alignedRequiredCapacity = factory.calculateBlockSize(requiredCapacity); - ByteBuffer buffer = factory.newBuffer(size); - buffer.limit(limit); + ByteBuffer buffer = factory.newBuffer(alignedRequiredCapacity, zeroed); + buffer.limit(alignedRequiredCapacity); return buffer; } @@ -271,9 +271,15 @@ public abstract class AbstractSequentialFile implements SequentialFile { public void flushBuffer(final ByteBuf byteBuf, final boolean requestedSync, final List callbacks) { final int bytes = byteBuf.readableBytes(); if (bytes > 0) { - final ByteBuffer buffer = newBuffer(byteBuf.capacity(), bytes); + final ByteBuffer buffer = newBuffer(bytes, false); + final int alignedLimit = buffer.limit(); + assert alignedLimit == factory.calculateBlockSize(bytes); buffer.limit(bytes); byteBuf.getBytes(byteBuf.readerIndex(), buffer); + final int missingNonZeroedBytes = alignedLimit - bytes; + if (missingNonZeroedBytes > 0) { + ByteUtil.zeros(buffer, bytes, missingNonZeroedBytes); + } buffer.flip(); writeDirect(buffer, requestedSync, new DelegateCallback(callbacks)); } else { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java index a724ad342f..c1817fbfde 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java @@ -74,6 +74,18 @@ public interface SequentialFileFactory { */ ByteBuffer newBuffer(int size); + /** + * Note: You need to release the buffer if is used for reading operations. You don't need to do + * it if using writing operations (AIO Buffer Lister will take of writing operations) + * + * @param size + * @param zeroed if {@code true} the returned {@link ByteBuffer} must be zeroed, otherwise it tries to save zeroing it. + * @return the allocated ByteBuffer + */ + default ByteBuffer newBuffer(int size, boolean zeroed) { + return newBuffer(size); + } + void releaseBuffer(ByteBuffer buffer); void activateBuffer(SequentialFile file); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index 015510cb5b..fda7ad0035 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -337,19 +337,6 @@ public class AIOSequentialFile extends AbstractSequentialFile { return "AIOSequentialFile:" + getFile().getAbsolutePath(); } - // Protected methods - // ----------------------------------------------------------------------------------------------------- - - @Override - protected ByteBuffer newBuffer(int size, int limit) { - size = factory.calculateBlockSize(size); - limit = factory.calculateBlockSize(limit); - - ByteBuffer buffer = factory.newBuffer(size); - buffer.limit(limit); - return buffer; - } - // Private methods // ----------------------------------------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index cc3c91c48b..c89122d98b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.ArtemisConstants; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -30,11 +31,12 @@ import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext; import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile; import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo; import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.PowerOf2Util; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.jboss.logging.Logger; @@ -181,14 +183,25 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public ByteBuffer newBuffer(int size) { + return newBuffer(size, true); + } + + @Override + public ByteBuffer newBuffer(int size, boolean zeroed) { final int alignedSize = calculateBlockSize(size); - return buffersControl.newBuffer(alignedSize, true); + return buffersControl.newBuffer(alignedSize, zeroed); } @Override public void clearBuffer(final ByteBuffer directByteBuffer) { directByteBuffer.position(0); - libaioContext.memsetBuffer(directByteBuffer); + if (PlatformDependent.hasUnsafe()) { + // that's the same semantic of libaioContext.memsetBuffer: it hasn't any JNI cost + ByteUtil.zeros(directByteBuffer, 0, directByteBuffer.limit()); + } else { + // JNI cost + libaioContext.memsetBuffer(directByteBuffer); + } } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java index 1d7d6ba11f..cc7f680965 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java @@ -100,10 +100,15 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac @Override public ByteBuffer newBuffer(int size) { + return newBuffer(size, true); + } + + @Override + public ByteBuffer newBuffer(int size, boolean zeroed) { if (!this.bufferPooling) { return allocateDirectBuffer(size); } else { - return bytesPool.borrow(size, true); + return bytesPool.borrow(size, zeroed); } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java index 0d644ccd6e..7c61a70a3c 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java @@ -261,7 +261,7 @@ final class TimedSequentialFile implements SequentialFile { } else { //perform the copy on buffer releaseBuffer = true; - buffer = factory.newBuffer(byteBuf.capacity()); + buffer = factory.newBuffer(byteBuf.capacity(), false); buffer.limit(bytes); byteBuf.getBytes(byteBuf.readerIndex(), buffer); buffer.flip(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index 4ec79f2b37..a318503ac5 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -387,15 +387,6 @@ public class NIOSequentialFile extends AbstractSequentialFile { internalWrite(bytes, sync, null, releaseBuffer); } - @Override - protected ByteBuffer newBuffer(int size, final int limit) { - // For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO - - size = limit; - - return super.newBuffer(size, limit); - } - private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOCallback callback, diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java index f8f5971ab9..d3c7172fe7 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java @@ -139,10 +139,15 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { @Override public ByteBuffer newBuffer(int size) { + return newBuffer(size, true); + } + + @Override + public ByteBuffer newBuffer(int size, boolean zeroed) { if (!this.bufferPooling) { return allocateDirectBuffer(size); } else { - return bytesPool.borrow(size, true); + return bytesPool.borrow(size, zeroed); } }