diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
index 29635e6163..cd42f808d2 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
@@ -216,6 +216,9 @@ public class Create extends InputAbstract {
@Option(name = "--nio", description = "sets the journal as nio.")
boolean nio;
+ @Option(name = "--mapped", description = "Sets the journal as mapped.")
+ boolean mapped;
+
// this is used by the setupJournalType method
private JournalType journalType;
@@ -797,7 +800,7 @@ public class Create extends InputAbstract {
}
private void setupJournalType() {
- int countJournalTypes = countBoolean(aio, nio);
+ int countJournalTypes = countBoolean(aio, nio, mapped);
if (countJournalTypes > 1) {
throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped).");
}
@@ -814,6 +817,8 @@ public class Create extends InputAbstract {
journalType = JournalType.ASYNCIO;
} else if (nio) {
journalType = JournalType.NIO;
+ } else if (mapped) {
+ journalType = JournalType.MAPPED;
}
}
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
index ae7a8ca389..656327862e 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
@@ -24,9 +24,11 @@ import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.JournalType;
@@ -186,6 +188,16 @@ public class SyncCalculation {
factory.start();
((AIOSequentialFileFactory) factory).disableBufferReuse();
return factory;
+ case MAPPED:
+ factory = new MappedSequentialFileFactory(datafolder, new IOCriticalErrorListener() {
+ @Override
+ public void onIOException(Throwable code, String message, SequentialFile file) {
+
+ }
+ }, true).chunkBytes(fileSize).overlapBytes(0).setDatasync(datasync);
+
+ factory.start();
+ return factory;
default:
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(journalType);
}
diff --git a/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
new file mode 100644
index 0000000000..0da33c68ea
--- /dev/null
+++ b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * A NIO direct {@link ByteBuffer} wrapper.
+ * Only ByteBuffer's manipulation operations are supported.
+ * Is best suited only for encoding/decoding purposes.
+ */
+public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceCountedByteBuf {
+
+ private ByteBuffer buffer;
+ private long memoryAddress;
+
+ /**
+ * Creates a new direct buffer by wrapping the specified initial buffer.
+ */
+ public UnpooledUnsafeDirectByteBufWrapper() {
+ super(0);
+ this.buffer = null;
+ this.memoryAddress = 0L;
+ }
+
+ public void wrap(ByteBuffer buffer, int srcIndex, int length) {
+ if (buffer != null) {
+ this.buffer = buffer;
+ this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex;
+ clear();
+ maxCapacity(length);
+ } else {
+ reset();
+ }
+ }
+
+ public void reset() {
+ this.buffer = null;
+ this.memoryAddress = 0L;
+ clear();
+ maxCapacity(0);
+ }
+
+ @Override
+ public boolean isDirect() {
+ return true;
+ }
+
+ @Override
+ public int capacity() {
+ return maxCapacity();
+ }
+
+ @Override
+ public ByteBuf capacity(int newCapacity) {
+ if (newCapacity != maxCapacity()) {
+ throw new IllegalArgumentException("can't set a capacity different from the max allowed one");
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return null;
+ }
+
+ @Override
+ public ByteOrder order() {
+ return ByteOrder.BIG_ENDIAN;
+ }
+
+ @Override
+ public boolean hasArray() {
+ return false;
+ }
+
+ @Override
+ public byte[] array() {
+ throw new UnsupportedOperationException("direct buffer");
+ }
+
+ @Override
+ public int arrayOffset() {
+ throw new UnsupportedOperationException("direct buffer");
+ }
+
+ @Override
+ public boolean hasMemoryAddress() {
+ return true;
+ }
+
+ @Override
+ public long memoryAddress() {
+ return memoryAddress;
+ }
+
+ @Override
+ protected byte _getByte(int index) {
+ return UnsafeByteBufUtil.getByte(addr(index));
+ }
+
+ @Override
+ protected short _getShort(int index) {
+ return UnsafeByteBufUtil.getShort(addr(index));
+ }
+
+ @Override
+ protected short _getShortLE(int index) {
+ return UnsafeByteBufUtil.getShortLE(addr(index));
+ }
+
+ @Override
+ protected int _getUnsignedMedium(int index) {
+ return UnsafeByteBufUtil.getUnsignedMedium(addr(index));
+ }
+
+ @Override
+ protected int _getUnsignedMediumLE(int index) {
+ return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index));
+ }
+
+ @Override
+ protected int _getInt(int index) {
+ return UnsafeByteBufUtil.getInt(addr(index));
+ }
+
+ @Override
+ protected int _getIntLE(int index) {
+ return UnsafeByteBufUtil.getIntLE(addr(index));
+ }
+
+ @Override
+ protected long _getLong(int index) {
+ return UnsafeByteBufUtil.getLong(addr(index));
+ }
+
+ @Override
+ protected long _getLongLE(int index) {
+ return UnsafeByteBufUtil.getLongLE(addr(index));
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+ UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+ UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuffer dst) {
+ UnsafeByteBufUtil.getBytes(this, addr(index), index, dst);
+ return this;
+ }
+
+ @Override
+ public ByteBuf readBytes(ByteBuffer dst) {
+ int length = dst.remaining();
+ checkReadableBytes(length);
+ getBytes(readerIndex, dst);
+ readerIndex += length;
+ return this;
+ }
+
+ @Override
+ protected void _setByte(int index, int value) {
+ UnsafeByteBufUtil.setByte(addr(index), value);
+ }
+
+ @Override
+ protected void _setShort(int index, int value) {
+ UnsafeByteBufUtil.setShort(addr(index), value);
+ }
+
+ @Override
+ protected void _setShortLE(int index, int value) {
+ UnsafeByteBufUtil.setShortLE(addr(index), value);
+ }
+
+ @Override
+ protected void _setMedium(int index, int value) {
+ UnsafeByteBufUtil.setMedium(addr(index), value);
+ }
+
+ @Override
+ protected void _setMediumLE(int index, int value) {
+ UnsafeByteBufUtil.setMediumLE(addr(index), value);
+ }
+
+ @Override
+ protected void _setInt(int index, int value) {
+ UnsafeByteBufUtil.setInt(addr(index), value);
+ }
+
+ @Override
+ protected void _setIntLE(int index, int value) {
+ UnsafeByteBufUtil.setIntLE(addr(index), value);
+ }
+
+ @Override
+ protected void _setLong(int index, long value) {
+ UnsafeByteBufUtil.setLong(addr(index), value);
+ }
+
+ @Override
+ protected void _setLongLE(int index, long value) {
+ UnsafeByteBufUtil.setLongLE(addr(index), value);
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+ UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+ UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuffer src) {
+ UnsafeByteBufUtil.setBytes(this, addr(index), index, src);
+ return this;
+ }
+
+ @Override
+ @Deprecated
+ public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+ throw new UnsupportedOperationException("unsupported!");
+ }
+
+ @Override
+ @Deprecated
+ public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+ throw new UnsupportedOperationException("unsupported!");
+ }
+
+ @Override
+ @Deprecated
+ public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+ throw new UnsupportedOperationException("unsupported!");
+ }
+
+ @Override
+ @Deprecated
+ public int readBytes(GatheringByteChannel out, int length) throws IOException {
+ throw new UnsupportedOperationException("unsupported!");
+ }
+
+ @Override
+ @Deprecated
+ public int readBytes(FileChannel out, long position, int length) throws IOException {
+ throw new UnsupportedOperationException("unsupported!");
+ }
+
+ @Override
+ @Deprecated
+ public int setBytes(int index, InputStream in, int length) throws IOException {
+ throw new UnsupportedOperationException("unsupported!");
+ }
+
+ @Override
+ @Deprecated
+ public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+ throw new UnsupportedOperationException("unsupported!");
+ }
+
+ @Override
+ @Deprecated
+ public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+ throw new UnsupportedOperationException("unsupported!");
+ }
+
+ @Override
+ public int nioBufferCount() {
+ return 1;
+ }
+
+ @Override
+ @Deprecated
+ public ByteBuffer[] nioBuffers(int index, int length) {
+ throw new UnsupportedOperationException("unsupported!");
+ }
+
+ @Override
+ @Deprecated
+ public ByteBuf copy(int index, int length) {
+
+ throw new UnsupportedOperationException("unsupported!");
+
+ }
+
+ @Override
+ @Deprecated
+ public ByteBuffer internalNioBuffer(int index, int length) {
+ throw new UnsupportedOperationException("cannot access directly the wrapped buffer!");
+ }
+
+ @Override
+ @Deprecated
+ public ByteBuffer nioBuffer(int index, int length) {
+ throw new UnsupportedOperationException("unsupported!");
+ }
+
+ @Override
+ @Deprecated
+ protected void deallocate() {
+ //NO_OP
+ }
+
+ @Override
+ public ByteBuf unwrap() {
+ return null;
+ }
+
+ private long addr(int index) {
+ return memoryAddress + index;
+ }
+
+ @Override
+ @Deprecated
+ protected SwappedByteBuf newSwappedByteBuf() {
+ throw new UnsupportedOperationException("unsupported!");
+ }
+
+ @Override
+ public ByteBuf setZero(int index, int length) {
+ UnsafeByteBufUtil.setZero(this, addr(index), index, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeZero(int length) {
+ ensureWritable(length);
+ int wIndex = writerIndex;
+ setZero(wIndex, length);
+ writerIndex = wIndex + length;
+ return this;
+ }
+}
+
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
index 0aa98669a2..adfc4fef7b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
@@ -20,18 +20,19 @@ import java.io.File;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledUnsafeDirectByteBufWrapper;
import io.netty.util.internal.PlatformDependent;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
final class MappedFile implements AutoCloseable {
- private static final ByteBuffer ZERO_PAGE = ByteBuffer.allocateDirect(MappedByteBufferCache.PAGE_SIZE).order(ByteOrder.nativeOrder());
-
private final MappedByteBufferCache cache;
- private final int zerosMaxPage;
+ private final UnpooledUnsafeDirectByteBufWrapper byteBufWrapper;
+ private final ChannelBufferWrapper channelBufferWrapper;
private MappedByteBuffer lastMapped;
private long lastMappedStart;
private long lastMappedLimit;
@@ -45,7 +46,8 @@ final class MappedFile implements AutoCloseable {
this.lastMappedLimit = -1;
this.position = 0;
this.length = this.cache.fileSize();
- this.zerosMaxPage = Math.min(ZERO_PAGE.capacity(), (int) Math.min(Integer.MAX_VALUE, cache.overlapBytes()));
+ this.byteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper();
+ this.channelBufferWrapper = new ChannelBufferWrapper(this.byteBufWrapper, false);
}
public static MappedFile of(File file, long chunckSize, long overlapSize) throws IOException {
@@ -58,29 +60,33 @@ final class MappedFile implements AutoCloseable {
private int checkOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
if (!MappedByteBufferCache.inside(offset, lastMappedStart, lastMappedLimit)) {
- try {
- final int index = cache.indexFor(offset);
- final long mappedPosition = cache.mappedPositionFor(index);
- final long mappedLimit = cache.mappedLimitFor(mappedPosition);
- if (offset + bytes > mappedLimit) {
- throw new IOException("mapping overflow!");
- }
- lastMapped = cache.acquireMappedByteBuffer(index);
- lastMappedStart = mappedPosition;
- lastMappedLimit = mappedLimit;
- final int bufferPosition = (int) (offset - mappedPosition);
- return bufferPosition;
- } catch (IllegalStateException e) {
- throw new IOException(e);
- } catch (IllegalArgumentException e) {
- throw new BufferUnderflowException();
- }
+ return updateOffset(offset, bytes);
} else {
final int bufferPosition = (int) (offset - lastMappedStart);
return bufferPosition;
}
}
+ private int updateOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
+ try {
+ final int index = cache.indexFor(offset);
+ final long mappedPosition = cache.mappedPositionFor(index);
+ final long mappedLimit = cache.mappedLimitFor(mappedPosition);
+ if (offset + bytes > mappedLimit) {
+ throw new IOException("mapping overflow!");
+ }
+ lastMapped = cache.acquireMappedByteBuffer(index);
+ lastMappedStart = mappedPosition;
+ lastMappedLimit = mappedLimit;
+ final int bufferPosition = (int) (offset - mappedPosition);
+ return bufferPosition;
+ } catch (IllegalStateException e) {
+ throw new IOException(e);
+ } catch (IllegalArgumentException e) {
+ throw new BufferUnderflowException();
+ }
+ }
+
public void force() {
if (lastMapped != null) {
lastMapped.force();
@@ -179,6 +185,26 @@ final class MappedFile implements AutoCloseable {
return read;
}
+ /**
+ * Writes an encoded sequence of bytes to this file from the given buffer.
+ *
+ *
Bytes are written starting at this file's current position,
+ */
+ public void write(EncodingSupport encodingSupport) throws IOException {
+ final int encodedSize = encodingSupport.getEncodeSize();
+ final int bufferPosition = checkOffset(position, encodedSize);
+ this.byteBufWrapper.wrap(this.lastMapped, bufferPosition, encodedSize);
+ try {
+ encodingSupport.encode(this.channelBufferWrapper);
+ } finally {
+ this.byteBufWrapper.reset();
+ }
+ position += encodedSize;
+ if (position > this.length) {
+ this.length = position;
+ }
+ }
+
/**
* Writes a sequence of bytes to this file from the given buffer.
*
@@ -273,21 +299,20 @@ final class MappedFile implements AutoCloseable {
*
Bytes are written starting at this file's current position,
*/
public void zeros(long offset, int count) throws IOException {
- final long targetOffset = offset + count;
- final int zerosBulkCopies = count / zerosMaxPage;
- final long srcAddress = PlatformDependent.directBufferAddress(ZERO_PAGE);
- for (int i = 0; i < zerosBulkCopies; i++) {
- final int bufferPosition = checkOffset(offset, zerosMaxPage);
+ while (count > 0) {
+ //do not need to validate the bytes count
+ final int bufferPosition = checkOffset(offset, 0);
+ final int endZerosPosition = (int)Math.min((long)bufferPosition + count, lastMapped.capacity());
+ final int zeros = endZerosPosition - bufferPosition;
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
- PlatformDependent.copyMemory(srcAddress, destAddress, zerosMaxPage);
- offset += zerosMaxPage;
+ PlatformDependent.setMemory(destAddress, zeros, (byte) 0);
+ offset += zeros;
+ count -= zeros;
+ //TODO need to call force on each write?
+ //this.force();
}
- final int remainingToBeZeroes = (int) (targetOffset - offset);
- final int bufferPosition = checkOffset(offset, remainingToBeZeroes);
- final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
- PlatformDependent.copyMemory(srcAddress, destAddress, remainingToBeZeroes);
- if (targetOffset > this.length) {
- this.length = targetOffset;
+ if (offset > this.length) {
+ this.length = offset;
}
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
index a9591136ae..12e359cfc3 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
@@ -20,16 +20,13 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -44,12 +41,11 @@ final class MappedSequentialFile implements SequentialFile {
private final long chunkBytes;
private final long overlapBytes;
private final IOCriticalErrorListener criticalErrorListener;
+ private final MappedSequentialFileFactory factory;
private File file;
private File absoluteFile;
private String fileName;
private MappedFile mappedFile;
- private ActiveMQBuffer pooledActiveMQBuffer;
- private final MappedSequentialFileFactory factory;
MappedSequentialFile(MappedSequentialFileFactory factory,
final File directory,
@@ -65,19 +61,24 @@ final class MappedSequentialFile implements SequentialFile {
this.chunkBytes = chunkBytes;
this.overlapBytes = overlapBytes;
this.mappedFile = null;
- this.pooledActiveMQBuffer = null;
this.criticalErrorListener = criticalErrorListener;
}
private void checkIsOpen() {
if (!isOpen()) {
- throw new IllegalStateException("must be open!");
+ throw new IllegalStateException("File not opened!");
+ }
+ }
+
+ private void checkIsOpen(IOCallback callback) {
+ if (!isOpen()) {
+ callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
}
}
private void checkIsNotOpen() {
if (isOpen()) {
- throw new IllegalStateException("must be closed!");
+ throw new IllegalStateException("File opened!");
}
}
@@ -101,7 +102,6 @@ final class MappedSequentialFile implements SequentialFile {
@Override
public void open(int maxIO, boolean useExecutor) throws IOException {
//ignore maxIO e useExecutor
- ActiveMQJournalLogger.LOGGER.warn("ignoring maxIO and useExecutor unsupported parameters!");
this.open();
}
@@ -134,7 +134,7 @@ final class MappedSequentialFile implements SequentialFile {
@Override
public void delete() {
- checkIsNotOpen();
+ close();
if (file.exists() && !file.delete()) {
ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
}
@@ -142,10 +142,10 @@ final class MappedSequentialFile implements SequentialFile {
@Override
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws IOException {
- checkIsOpen();
if (callback == null) {
throw new NullPointerException("callback parameter need to be set");
}
+ checkIsOpen(callback);
try {
final ByteBuf byteBuf = bytes.byteBuf();
final int writerIndex = byteBuf.writerIndex();
@@ -182,34 +182,16 @@ final class MappedSequentialFile implements SequentialFile {
}
}
- private ActiveMQBuffer acquiresActiveMQBufferWithAtLeast(int size) {
- if (this.pooledActiveMQBuffer == null || this.pooledActiveMQBuffer.capacity() < size) {
- this.pooledActiveMQBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(size, size).order(ByteOrder.nativeOrder()));
- } else {
- this.pooledActiveMQBuffer.clear();
- }
- return pooledActiveMQBuffer;
- }
-
@Override
public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws IOException {
- checkIsOpen();
if (callback == null) {
throw new NullPointerException("callback parameter need to be set");
}
+ checkIsOpen(callback);
try {
- final int encodedSize = bytes.getEncodeSize();
- final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize);
- bytes.encode(outBuffer);
- final ByteBuf byteBuf = outBuffer.byteBuf();
- final int writerIndex = byteBuf.writerIndex();
- final int readerIndex = byteBuf.readerIndex();
- final int readableBytes = writerIndex - readerIndex;
- if (readableBytes > 0) {
- this.mappedFile.write(byteBuf, readerIndex, readableBytes);
- if (factory.isDatasync() && sync) {
- this.mappedFile.force();
- }
+ this.mappedFile.write(bytes);
+ if (factory.isDatasync() && sync) {
+ this.mappedFile.force();
}
callback.done();
} catch (IOException e) {
@@ -224,33 +206,26 @@ final class MappedSequentialFile implements SequentialFile {
@Override
public void write(EncodingSupport bytes, boolean sync) throws IOException {
checkIsOpen();
- final int encodedSize = bytes.getEncodeSize();
- final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize);
- bytes.encode(outBuffer);
- final ByteBuf byteBuf = outBuffer.byteBuf();
- final int writerIndex = byteBuf.writerIndex();
- final int readerIndex = byteBuf.readerIndex();
- final int readableBytes = writerIndex - readerIndex;
- if (readableBytes > 0) {
- this.mappedFile.write(byteBuf, readerIndex, readableBytes);
- if (factory.isDatasync() && sync) {
- this.mappedFile.force();
- }
+ this.mappedFile.write(bytes);
+ if (factory.isDatasync() && sync) {
+ this.mappedFile.force();
}
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
- checkIsOpen();
if (callback == null) {
throw new NullPointerException("callback parameter need to be set");
}
+ checkIsOpen(callback);
try {
final int position = bytes.position();
final int limit = bytes.limit();
final int remaining = limit - position;
if (remaining > 0) {
this.mappedFile.write(bytes, position, remaining);
+ final int newPosition = position + remaining;
+ bytes.position(newPosition);
if (factory.isDatasync() && sync) {
this.mappedFile.force();
}
@@ -273,6 +248,8 @@ final class MappedSequentialFile implements SequentialFile {
final int remaining = limit - position;
if (remaining > 0) {
this.mappedFile.write(bytes, position, remaining);
+ final int newPosition = position + remaining;
+ bytes.position(newPosition);
if (factory.isDatasync() && sync) {
this.mappedFile.force();
}
@@ -281,10 +258,10 @@ final class MappedSequentialFile implements SequentialFile {
@Override
public int read(ByteBuffer bytes, IOCallback callback) throws IOException {
- checkIsOpen();
if (callback == null) {
throw new NullPointerException("callback parameter need to be set");
}
+ checkIsOpen(callback);
try {
final int position = bytes.position();
final int limit = bytes.limit();
@@ -296,8 +273,10 @@ final class MappedSequentialFile implements SequentialFile {
bytes.flip();
callback.done();
return bytesRead;
+ } else {
+ callback.done();
+ return 0;
}
- return 0;
} catch (IOException e) {
if (this.criticalErrorListener != null) {
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
@@ -360,7 +339,14 @@ final class MappedSequentialFile implements SequentialFile {
@Override
public void renameTo(String newFileName) throws Exception {
- checkIsNotOpen();
+ try {
+ close();
+ } catch (Exception e) {
+ if (e instanceof IOException) {
+ factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+ }
+ throw e;
+ }
if (this.fileName == null) {
this.fileName = this.file.getName();
}
@@ -388,14 +374,10 @@ final class MappedSequentialFile implements SequentialFile {
if (dstFile.isOpen()) {
throw new IllegalArgumentException("dstFile must be closed too");
}
- try (RandomAccessFile src = new RandomAccessFile(file, "rw");
- FileChannel srcChannel = src.getChannel();
- FileLock srcLock = srcChannel.lock()) {
+ try (RandomAccessFile src = new RandomAccessFile(file, "rw"); FileChannel srcChannel = src.getChannel(); FileLock srcLock = srcChannel.lock()) {
final long readableBytes = srcChannel.size();
if (readableBytes > 0) {
- try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw");
- FileChannel dstChannel = dst.getChannel();
- FileLock dstLock = dstChannel.lock()) {
+ try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw"); FileChannel dstChannel = dst.getChannel(); FileLock dstLock = dstChannel.lock()) {
final long oldLength = dst.length();
final long newLength = oldLength + readableBytes;
dst.setLength(newLength);
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
index 55bb2bf72c..c4b7d30c3d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
@@ -20,7 +20,6 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -29,30 +28,42 @@ import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
public final class MappedSequentialFileFactory implements SequentialFileFactory {
private static long DEFAULT_BLOCK_SIZE = 64L << 20;
private final File directory;
private final IOCriticalErrorListener criticalErrorListener;
+ private final TimedBuffer timedBuffer;
private long chunkBytes;
private long overlapBytes;
private boolean useDataSync;
+ private boolean supportCallbacks;
- public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
+ protected volatile int alignment = -1;
+
+ public MappedSequentialFileFactory(File directory,
+ IOCriticalErrorListener criticalErrorListener,
+ boolean supportCallbacks) {
this.directory = directory;
this.criticalErrorListener = criticalErrorListener;
this.chunkBytes = DEFAULT_BLOCK_SIZE;
this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
+ this.useDataSync = true;
+ this.timedBuffer = null;
+ this.supportCallbacks = supportCallbacks;
+ }
+
+ public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
+ this(directory, criticalErrorListener, false);
}
public MappedSequentialFileFactory(File directory) {
- this.directory = directory;
- this.criticalErrorListener = null;
- this.chunkBytes = DEFAULT_BLOCK_SIZE;
- this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
+ this(directory, null);
}
+
public long chunkBytes() {
return chunkBytes;
}
@@ -73,7 +84,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override
public SequentialFile createSequentialFile(String fileName) {
- return new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
+ final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
+ if (this.timedBuffer == null) {
+ return mappedSequentialFile;
+ } else {
+ return new TimedSequentialFile(this, mappedSequentialFile);
+ }
}
@Override
@@ -89,17 +105,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override
public int getMaxIO() {
- return 0;
+ return 1;
}
@Override
public List listFiles(final String extension) throws Exception {
- final FilenameFilter extensionFilter = new FilenameFilter() {
- @Override
- public boolean accept(final File file, final String name) {
- return name.endsWith("." + extension);
- }
- };
+ final FilenameFilter extensionFilter = (file, name) -> name.endsWith("." + extension);
final String[] fileNames = directory.list(extensionFilter);
if (fileNames == null) {
return Collections.EMPTY_LIST;
@@ -109,7 +120,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override
public boolean isSupportsCallbacks() {
- return false;
+ return this.supportCallbacks;
}
@Override
@@ -121,7 +132,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
- return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
+ return ByteBuffer.allocateDirect(size);
}
@Override
@@ -131,7 +142,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override
public ByteBuffer newBuffer(final int size) {
- return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
+ return ByteBuffer.allocate(size);
}
@Override
@@ -143,17 +154,23 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override
public void activateBuffer(SequentialFile file) {
-
+ if (timedBuffer != null) {
+ file.setTimedBuffer(timedBuffer);
+ }
}
@Override
public void deactivateBuffer() {
-
+ if (timedBuffer != null) {
+ // When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer
+ timedBuffer.flush();
+ timedBuffer.setObserver(null);
+ }
}
@Override
public ByteBuffer wrapBuffer(final byte[] bytes) {
- return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+ return ByteBuffer.wrap(bytes);
}
@Override
@@ -162,8 +179,8 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
}
@Override
- public SequentialFileFactory setAlignment(int alignment) {
- // no op
+ public MappedSequentialFileFactory setAlignment(int alignment) {
+ this.alignment = alignment;
return this;
}
@@ -179,7 +196,6 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override
public void clearBuffer(final ByteBuffer buffer) {
- buffer.clear();
if (buffer.isDirect()) {
BytesUtils.zerosDirect(buffer);
} else if (buffer.hasArray()) {
@@ -193,16 +209,21 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
buffer.put(i, (byte) 0);
}
}
+ buffer.rewind();
}
@Override
public void start() {
-
+ if (timedBuffer != null) {
+ timedBuffer.start();
+ }
}
@Override
public void stop() {
-
+ if (timedBuffer != null) {
+ timedBuffer.stop();
+ }
}
@Override
@@ -215,6 +236,8 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override
public void flush() {
-
+ if (timedBuffer != null) {
+ timedBuffer.flush();
+ }
}
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
new file mode 100644
index 0000000000..d376d7df65
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io.mapped;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.core.io.DummyCallback;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
+import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+
+final class TimedSequentialFile implements SequentialFile {
+
+ private final SequentialFileFactory factory;
+ private final SequentialFile sequentialFile;
+ private final LocalBufferObserver observer;
+ private final ThreadLocal callbackPool;
+ private TimedBuffer timedBuffer;
+
+ TimedSequentialFile(SequentialFileFactory factory, SequentialFile sequentialFile) {
+ this.sequentialFile = sequentialFile;
+ this.factory = factory;
+ this.observer = new LocalBufferObserver();
+ this.callbackPool = ThreadLocal.withInitial(ResettableIOCallback::new);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return this.sequentialFile.isOpen();
+ }
+
+ @Override
+ public boolean exists() {
+ return this.sequentialFile.exists();
+ }
+
+ @Override
+ public void open() throws Exception {
+ this.sequentialFile.open();
+ }
+
+ @Override
+ public void open(int maxIO, boolean useExecutor) throws Exception {
+ this.sequentialFile.open(maxIO, useExecutor);
+ }
+
+ @Override
+ public boolean fits(int size) {
+ if (timedBuffer == null) {
+ return this.sequentialFile.fits(size);
+ } else {
+ return timedBuffer.checkSize(size);
+ }
+ }
+
+ @Override
+ public int calculateBlockStart(int position) throws Exception {
+ return this.sequentialFile.calculateBlockStart(position);
+ }
+
+ @Override
+ public String getFileName() {
+ return this.sequentialFile.getFileName();
+ }
+
+ @Override
+ public void fill(int size) throws Exception {
+ this.sequentialFile.fill(size);
+ }
+
+ @Override
+ public void delete() throws IOException, InterruptedException, ActiveMQException {
+ this.sequentialFile.delete();
+ }
+
+ @Override
+ public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
+ if (this.timedBuffer != null) {
+ this.timedBuffer.addBytes(bytes, sync, callback);
+ } else {
+ this.sequentialFile.write(bytes, sync, callback);
+ }
+ }
+
+ @Override
+ public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
+ if (sync) {
+ if (this.timedBuffer != null) {
+ final ResettableIOCallback callback = callbackPool.get();
+ try {
+ this.timedBuffer.addBytes(bytes, true, callback);
+ callback.waitCompletion();
+ } finally {
+ callback.reset();
+ }
+ } else {
+ this.sequentialFile.write(bytes, true);
+ }
+ } else {
+ if (this.timedBuffer != null) {
+ this.timedBuffer.addBytes(bytes, false, DummyCallback.getInstance());
+ } else {
+ this.sequentialFile.write(bytes, false);
+ }
+ }
+ }
+
+ @Override
+ public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
+ if (this.timedBuffer != null) {
+ this.timedBuffer.addBytes(bytes, sync, callback);
+ } else {
+ this.sequentialFile.write(bytes, sync, callback);
+ }
+ }
+
+ @Override
+ public void write(EncodingSupport bytes, boolean sync) throws Exception {
+ if (sync) {
+ if (this.timedBuffer != null) {
+ final ResettableIOCallback callback = callbackPool.get();
+ try {
+ this.timedBuffer.addBytes(bytes, true, callback);
+ callback.waitCompletion();
+ } finally {
+ callback.reset();
+ }
+ } else {
+ this.sequentialFile.write(bytes, true);
+ }
+ } else {
+ if (this.timedBuffer != null) {
+ this.timedBuffer.addBytes(bytes, false, DummyCallback.getInstance());
+ } else {
+ this.sequentialFile.write(bytes, false);
+ }
+ }
+ }
+
+ @Override
+ public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
+ this.sequentialFile.writeDirect(bytes, sync, callback);
+ }
+
+ @Override
+ public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
+ this.sequentialFile.writeDirect(bytes, sync);
+ }
+
+ @Override
+ public int read(ByteBuffer bytes, IOCallback callback) throws Exception {
+ return this.sequentialFile.read(bytes, callback);
+ }
+
+ @Override
+ public int read(ByteBuffer bytes) throws Exception {
+ return this.sequentialFile.read(bytes);
+ }
+
+ @Override
+ public void position(long pos) throws IOException {
+ this.sequentialFile.position(pos);
+ }
+
+ @Override
+ public long position() {
+ return this.sequentialFile.position();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.sequentialFile.close();
+ }
+
+ @Override
+ public void sync() throws IOException {
+ this.sequentialFile.sync();
+ }
+
+ @Override
+ public long size() throws Exception {
+ return this.sequentialFile.size();
+ }
+
+ @Override
+ public void renameTo(String newFileName) throws Exception {
+ this.sequentialFile.renameTo(newFileName);
+ }
+
+ @Override
+ public SequentialFile cloneFile() {
+ return new TimedSequentialFile(factory, this.sequentialFile.cloneFile());
+ }
+
+ @Override
+ public void copyTo(SequentialFile newFileName) throws Exception {
+ this.sequentialFile.copyTo(newFileName);
+ }
+
+ @Override
+ public void setTimedBuffer(TimedBuffer buffer) {
+ if (this.timedBuffer != null) {
+ this.timedBuffer.setObserver(null);
+ }
+ this.timedBuffer = buffer;
+ if (buffer != null) {
+ buffer.setObserver(this.observer);
+ }
+ }
+
+ @Override
+ public File getJavaFile() {
+ return this.sequentialFile.getJavaFile();
+ }
+
+ private static final class ResettableIOCallback implements IOCallback {
+
+ private final CyclicBarrier cyclicBarrier;
+ private int errorCode;
+ private String errorMessage;
+
+ ResettableIOCallback() {
+ this.cyclicBarrier = new CyclicBarrier(2);
+ }
+
+ public void waitCompletion() throws InterruptedException, ActiveMQException, BrokenBarrierException {
+ this.cyclicBarrier.await();
+ if (this.errorMessage != null) {
+ throw ActiveMQExceptionType.createException(this.errorCode, this.errorMessage);
+ }
+ }
+
+ public void reset() {
+ this.errorCode = 0;
+ this.errorMessage = null;
+ }
+
+ @Override
+ public void done() {
+ try {
+ this.cyclicBarrier.await();
+ } catch (BrokenBarrierException | InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ try {
+ this.errorCode = errorCode;
+ this.errorMessage = errorMessage;
+ this.cyclicBarrier.await();
+ } catch (BrokenBarrierException | InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ private static final class DelegateCallback implements IOCallback {
+
+ final List delegates;
+
+ private DelegateCallback() {
+ this.delegates = new ArrayList<>();
+ }
+
+ public List delegates() {
+ return this.delegates;
+ }
+
+ @Override
+ public void done() {
+ final int size = delegates.size();
+ for (int i = 0; i < size; i++) {
+ try {
+ final IOCallback callback = delegates.get(i);
+ callback.done();
+ } catch (Throwable e) {
+ ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
+ }
+ }
+ }
+
+ @Override
+ public void onError(final int errorCode, final String errorMessage) {
+ for (IOCallback callback : delegates) {
+ try {
+ callback.onError(errorCode, errorMessage);
+ } catch (Throwable e) {
+ ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
+ }
+ }
+ }
+ }
+
+ private final class LocalBufferObserver implements TimedBufferObserver {
+
+ private final ThreadLocal callbacksPool = ThreadLocal.withInitial(DelegateCallback::new);
+
+ @Override
+ public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List callbacks) {
+ buffer.flip();
+ if (buffer.limit() == 0) {
+ //if there are no bytes to flush, can release the callbacks
+ final int size = callbacks.size();
+ for (int i = 0; i < size; i++) {
+ callbacks.get(i).done();
+ }
+ } else {
+ final DelegateCallback delegateCallback = callbacksPool.get();
+ final int size = callbacks.size();
+ final List delegates = delegateCallback.delegates();
+ for (int i = 0; i < size; i++) {
+ delegates.add(callbacks.get(i));
+ }
+ try {
+ sequentialFile.writeDirect(buffer, requestedSync, delegateCallback);
+ } finally {
+ delegates.clear();
+ }
+ }
+ }
+
+ @Override
+ public ByteBuffer newBuffer(final int size, final int limit) {
+ final int alignedSize = factory.calculateBlockSize(size);
+ final int alignedLimit = factory.calculateBlockSize(limit);
+ final ByteBuffer buffer = factory.newBuffer(alignedSize);
+ buffer.limit(alignedLimit);
+ return buffer;
+ }
+
+ @Override
+ public int getRemainingBytes() {
+ try {
+ final int remaining = (int) Math.min(sequentialFile.size() - sequentialFile.position(), Integer.MAX_VALUE);
+ return remaining;
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "TimedBufferObserver on file (" + getFileName() + ")";
+ }
+
+ }
+}
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
new file mode 100644
index 0000000000..b426219551
--- /dev/null
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Stream;
+
+import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue;
+import org.apache.activemq.artemis.ArtemisConstants;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+
+/**
+ * To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
+ */
+public class JournalTptBenchmark {
+
+ public static void main(String[] args) throws Exception {
+ final boolean useDefaultIoExecutor = true;
+ final int fileSize = 1024 * 1024;
+ final boolean dataSync = true;
+ final Type type = Type.Mapped;
+ final int tests = 5;
+ final int warmup = 20_000;
+ final int measurements = 20_000;
+ final int msgSize = 100;
+ final byte[] msgContent = new byte[msgSize];
+ Arrays.fill(msgContent, (byte) 1);
+ final int totalMessages = (measurements * tests + warmup);
+ final File tmpDirectory = new File("./");
+ //using the default configuration when the broker starts!
+ final SequentialFileFactory factory;
+ switch (type) {
+
+ case Mapped:
+ final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true);
+ factory = mappedFactory.chunkBytes(fileSize).overlapBytes(0).setDatasync(dataSync);
+ break;
+ case Nio:
+ factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
+ break;
+ case Aio:
+ factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync);
+ //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
+ if (!LibaioContext.isLoaded()) {
+ throw new IllegalStateException("lib AIO not loaded!");
+ }
+ break;
+ default:
+ throw new AssertionError("unsupported case");
+ }
+
+ int numFiles = (int) (totalMessages * factory.calculateBlockSize(msgSize)) / fileSize;
+ if (numFiles < 2) {
+ numFiles = 2;
+ }
+ ExecutorService service = null;
+ final Journal journal;
+ if (useDefaultIoExecutor) {
+ journal = new JournalImpl(fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO());
+ journal.start();
+ } else {
+ final ArrayList> tasks = new ArrayList<>();
+ service = Executors.newSingleThreadExecutor();
+ journal = new JournalImpl(() -> new Executor() {
+
+ private final MpscArrayQueue taskQueue = new MpscArrayQueue<>(1024);
+
+ {
+ tasks.add(taskQueue);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ while (!taskQueue.offer(command)) {
+ LockSupport.parkNanos(1L);
+ }
+ }
+ }, fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO(), 0);
+ journal.start();
+ service.execute(() -> {
+ final int size = tasks.size();
+ final int capacity = 1024;
+ while (!Thread.currentThread().isInterrupted()) {
+ for (int i = 0; i < size; i++) {
+ final MpscArrayQueue runnables = tasks.get(i);
+ for (int j = 0; j < capacity; j++) {
+ final Runnable task = runnables.poll();
+ if (task == null) {
+ break;
+ }
+ try {
+ task.run();
+ } catch (Throwable t) {
+ System.err.println(t);
+ }
+ }
+ }
+ }
+
+ });
+ }
+ try {
+ journal.load(new ArrayList(), null, null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ final EncodingSupport encodingSupport = new EncodingSupport() {
+ @Override
+ public int getEncodeSize() {
+ return msgSize;
+ }
+
+ @Override
+ public void encode(ActiveMQBuffer buffer) {
+ final int writerIndex = buffer.writerIndex();
+ buffer.setBytes(writerIndex, msgContent);
+ buffer.writerIndex(writerIndex + msgSize);
+ }
+
+ @Override
+ public void decode(ActiveMQBuffer buffer) {
+
+ }
+ };
+ long id = 1;
+ {
+ final long elapsed = writeMeasurements(id, journal, encodingSupport, warmup);
+ id += warmup;
+ System.out.println("warmup:" + (measurements * 1000_000_000L) / elapsed + " ops/sec");
+ }
+ for (int t = 0; t < tests; t++) {
+ final long elapsed = writeMeasurements(id, journal, encodingSupport, measurements);
+ System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec");
+ id += warmup;
+ }
+
+ } finally {
+ journal.stop();
+ if (service != null) {
+ service.shutdown();
+ }
+ final File[] fileToDeletes = tmpDirectory.listFiles();
+ System.out.println("Files to deletes" + Arrays.toString(fileToDeletes));
+ Stream.of(fileToDeletes).forEach(File::delete);
+ }
+ }
+
+ private static long writeMeasurements(long id,
+ Journal journal,
+ EncodingSupport encodingSupport,
+ int measurements) throws Exception {
+ System.gc();
+ TimeUnit.SECONDS.sleep(2);
+
+ final long start = System.nanoTime();
+ for (int i = 0; i < measurements; i++) {
+ write(id, journal, encodingSupport);
+ id++;
+ }
+ final long elapsed = System.nanoTime() - start;
+ return elapsed;
+ }
+
+ private static void write(long id, Journal journal, EncodingSupport encodingSupport) throws Exception {
+ journal.appendAddRecord(id, (byte) 1, encodingSupport, false);
+ final SimpleWaitIOCallback ioCallback = new SimpleWaitIOCallback();
+ journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true, ioCallback);
+ ioCallback.waitCompletion();
+ }
+
+ private enum Type {
+
+ Mapped, Nio, Aio
+
+ }
+}
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
new file mode 100644
index 0000000000..7756a064cd
--- /dev/null
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+
+import org.apache.activemq.artemis.ArtemisConstants;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+
+/**
+ * To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
+ */
+public class SequentialFileTptBenchmark {
+
+ private static final FastWaitIOCallback CALLBACK = new FastWaitIOCallback();
+
+ public static void main(String[] args) throws Exception {
+ final boolean dataSync = true;
+ final boolean writeSync = true;
+ final Type type = Type.Mapped;
+ final int tests = 10;
+ final int warmup = 20_000;
+ final int measurements = 20_000;
+ final int msgSize = 100;
+ final byte[] msgContent = new byte[msgSize];
+ Arrays.fill(msgContent, (byte) 1);
+ final File tmpDirectory = new File("./");
+ //using the default configuration when the broker starts!
+ final SequentialFileFactory factory;
+ switch (type) {
+
+ case Mapped:
+ final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true);
+ final int alignedMessageSize = mappedFactory.calculateBlockSize(msgSize);
+ final int totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup);
+ factory = mappedFactory.chunkBytes(totalFileSize).overlapBytes(0).setDatasync(dataSync);
+ break;
+ case Nio:
+ factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
+ break;
+ case Aio:
+ factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync);
+ //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
+ if (!LibaioContext.isLoaded()) {
+ throw new IllegalStateException("lib AIO not loaded!");
+ }
+ break;
+ default:
+ throw new AssertionError("unsupported case");
+ }
+ factory.start();
+ try {
+ final EncodingSupport encodingSupport = new EncodingSupport() {
+ @Override
+ public int getEncodeSize() {
+ return msgSize;
+ }
+
+ @Override
+ public void encode(ActiveMQBuffer buffer) {
+ final int writerIndex = buffer.writerIndex();
+ buffer.setBytes(writerIndex, msgContent);
+ buffer.writerIndex(writerIndex + msgSize);
+ }
+
+ @Override
+ public void decode(ActiveMQBuffer buffer) {
+
+ }
+ };
+ final int alignedMessageSize = factory.calculateBlockSize(msgSize);
+ final long totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup);
+ if (totalFileSize > Integer.MAX_VALUE)
+ throw new IllegalArgumentException("reduce measurements/warmup");
+ final int fileSize = (int) totalFileSize;
+ final SequentialFile sequentialFile = factory.createSequentialFile("seq.dat");
+ sequentialFile.getJavaFile().delete();
+ sequentialFile.getJavaFile().deleteOnExit();
+ sequentialFile.open();
+ final long startZeros = System.nanoTime();
+ sequentialFile.fill(fileSize);
+ final long elapsedZeros = System.nanoTime() - startZeros;
+ System.out.println("Zeroed " + fileSize + " bytes in " + TimeUnit.NANOSECONDS.toMicros(elapsedZeros) + " us");
+ try {
+ {
+ final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, warmup, writeSync);
+ System.out.println("warmup:" + (measurements * 1000_000_000L) / elapsed + " ops/sec");
+ }
+ for (int t = 0; t < tests; t++) {
+ final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, measurements, writeSync);
+ System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec");
+ }
+ } finally {
+ sequentialFile.close();
+ }
+ } finally {
+ factory.stop();
+ }
+ }
+
+ private static long writeMeasurements(SequentialFileFactory sequentialFileFactory,
+ SequentialFile sequentialFile,
+ EncodingSupport encodingSupport,
+ int measurements,
+ boolean writeSync) throws Exception {
+ //System.gc();
+ TimeUnit.SECONDS.sleep(2);
+ sequentialFileFactory.activateBuffer(sequentialFile);
+ sequentialFile.position(0);
+ final long start = System.nanoTime();
+ for (int i = 0; i < measurements; i++) {
+ write(sequentialFile, encodingSupport, writeSync);
+ }
+ sequentialFileFactory.deactivateBuffer();
+ final long elapsed = System.nanoTime() - start;
+ return elapsed;
+ }
+
+ private static void write(SequentialFile sequentialFile,
+ EncodingSupport encodingSupport,
+ boolean sync) throws Exception {
+ //this pattern is necessary to ensure that NIO's TimedBuffer fill flush the buffer and know the real size of it
+ if (sequentialFile.fits(encodingSupport.getEncodeSize())) {
+ final FastWaitIOCallback ioCallback = CALLBACK.reset();
+ sequentialFile.write(encodingSupport, sync, ioCallback);
+ ioCallback.waitCompletion();
+ } else {
+ throw new IllegalStateException("can't happen!");
+ }
+ }
+
+ private enum Type {
+
+ Mapped, Nio, Aio
+
+ }
+
+ private static final class FastWaitIOCallback implements IOCallback {
+
+ private final AtomicBoolean done = new AtomicBoolean(false);
+ private int errorCode = 0;
+ private String errorMessage = null;
+
+ public FastWaitIOCallback reset() {
+ errorCode = 0;
+ errorMessage = null;
+ done.lazySet(false);
+ return this;
+ }
+
+ @Override
+ public void done() {
+ errorCode = 0;
+ errorMessage = null;
+ done.lazySet(true);
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ this.errorCode = errorCode;
+ this.errorMessage = errorMessage;
+ done.lazySet(true);
+ }
+
+ public void waitCompletion() throws InterruptedException, ActiveMQException {
+ final Thread currentThread = Thread.currentThread();
+ while (!done.get()) {
+ LockSupport.parkNanos(1L);
+ if (currentThread.isInterrupted())
+ throw new InterruptedException();
+ }
+ if (errorMessage != null) {
+ throw ActiveMQExceptionType.createException(errorCode, errorMessage);
+ }
+ }
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 7c0a6510db..51fd6cc1f5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
@@ -136,6 +137,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
ActiveMQServerLogger.LOGGER.journalUseAIO();
journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener);
break;
+ case MAPPED:
+ ActiveMQServerLogger.LOGGER.journalUseMAPPED();
+ //the mapped version do not need buffering by default
+ journalFF = new MappedSequentialFileFactory(config.getJournalLocation(), criticalErrorListener, true).chunkBytes(config.getJournalFileSize()).overlapBytes(0);
+ break;
default:
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index c365b7d326..7d5822c28a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1558,8 +1558,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT)
void invalidMessageCounterPeriod(long value);
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 224073, value = "Using MAPPED Journal", format = Message.Format.MESSAGE_FORMAT)
+ void journalUseMAPPED();
+
@LogMessage(level = Logger.Level.ERROR)
- @Message(id = 224073, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 224074, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT)
void failedToPurgeQueue(@Cause Exception e, SimpleString bindingName);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java
index 2716a384e3..df60e9beef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java
@@ -40,6 +40,7 @@ public enum JournalType {
switch (type) {
case "NIO": return NIO;
case "ASYNCIO" : return ASYNCIO;
+ case "MAPPED" : return MAPPED;
default: throw new IllegalStateException("Invalid JournalType:" + type + " valid Types: " + validValues);
}
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 66739fed70..bc9363f252 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -584,6 +584,7 @@
+
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index e538ff0596..2676e19c3b 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -576,6 +576,7 @@
+
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java
index a220ab659b..b0d19b3a46 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java
@@ -59,7 +59,8 @@ public class AIOJournalImplTest extends JournalImplTestUnit {
file.mkdir();
- return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 1000000, 10, false);
+ // forcing the alignment to be 512, as this test was hard coded around this size.
+ return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 1000000, 10, false).setAlignment(512);
}
@Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java
new file mode 100644
index 0000000000..5d540601c6
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+
+public class MappedImportExportTest extends NIOImportExportTest {
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception {
+ return new MappedSequentialFileFactory(getTestDirfile());
+ }
+}
+
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java
new file mode 100644
index 0000000000..32b4b8f96c
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import java.io.File;
+
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+
+public class MappedJournalCompactTest extends NIOJournalCompactTest {
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception {
+ File file = new File(getTestDir());
+
+ ActiveMQTestBase.deleteDirectory(file);
+
+ file.mkdir();
+
+ return new MappedSequentialFileFactory(getTestDirfile());
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java
new file mode 100644
index 0000000000..940c8a69d8
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import java.io.File;
+
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
+
+public class MappedJournalImplTest extends JournalImplTestUnit {
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception {
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new MappedSequentialFileFactory(getTestDirfile());
+ }
+
+ @Override
+ protected int getAlignment() {
+ return fileFactory.getAlignment();
+ }
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java
new file mode 100644
index 0000000000..cf87cdede9
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBase {
+
+ @Override
+ protected SequentialFileFactory createFactory(String folder) {
+ return new MappedSequentialFileFactory(new File(folder));
+ }
+
+ @Test
+ public void testInterrupts() throws Throwable {
+
+ final EncodingSupport fakeEncoding = new EncodingSupport() {
+ @Override
+ public int getEncodeSize() {
+ return 10;
+ }
+
+ @Override
+ public void encode(ActiveMQBuffer buffer) {
+ buffer.writeBytes(new byte[10]);
+ }
+
+ @Override
+ public void decode(ActiveMQBuffer buffer) {
+
+ }
+ };
+
+ final AtomicInteger calls = new AtomicInteger(0);
+ final MappedSequentialFileFactory factory = new MappedSequentialFileFactory(new File(getTestDir()), (code, message, file) -> {
+ new Exception("shutdown").printStackTrace();
+ calls.incrementAndGet();
+ });
+
+ Thread threadOpen = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.currentThread().interrupt();
+ SequentialFile file = factory.createSequentialFile("file.txt");
+ file.open();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ threadOpen.start();
+ threadOpen.join();
+
+ Thread threadClose = new Thread() {
+ @Override
+ public void run() {
+ try {
+ SequentialFile file = factory.createSequentialFile("file.txt");
+ file.open();
+ file.write(fakeEncoding, true);
+ Thread.currentThread().interrupt();
+ file.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ threadClose.start();
+ threadClose.join();
+
+ Thread threadWrite = new Thread() {
+ @Override
+ public void run() {
+ try {
+ SequentialFile file = factory.createSequentialFile("file.txt");
+ file.open();
+ Thread.currentThread().interrupt();
+ file.write(fakeEncoding, true);
+ file.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ threadWrite.start();
+ threadWrite.join();
+
+ Thread threadFill = new Thread() {
+ @Override
+ public void run() {
+ try {
+ SequentialFile file = factory.createSequentialFile("file.txt");
+ file.open();
+ Thread.currentThread().interrupt();
+ file.fill(1024);
+ file.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ threadFill.start();
+ threadFill.join();
+
+ Thread threadWriteDirect = new Thread() {
+ @Override
+ public void run() {
+ try {
+ SequentialFile file = factory.createSequentialFile("file.txt");
+ file.open();
+ ByteBuffer buffer = ByteBuffer.allocate(10);
+ buffer.put(new byte[10]);
+ Thread.currentThread().interrupt();
+ file.writeDirect(buffer, true);
+ file.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ threadWriteDirect.start();
+ threadWriteDirect.join();
+
+ Thread threadRead = new Thread() {
+ @Override
+ public void run() {
+ try {
+ SequentialFile file = factory.createSequentialFile("file.txt");
+ file.open();
+ file.write(fakeEncoding, true);
+ file.position(0);
+ ByteBuffer readBytes = ByteBuffer.allocate(fakeEncoding.getEncodeSize());
+ Thread.currentThread().interrupt();
+ file.read(readBytes);
+ file.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ threadRead.start();
+ threadRead.join();
+
+ // An interrupt exception shouldn't issue a shutdown
+ Assert.assertEquals(0, calls.get());
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
index 8f15c4854a..d2ffd6fac3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java
@@ -22,8 +22,11 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.ArtemisConstants;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@@ -102,6 +105,27 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
internalTest("nio2", getTestDir(), 10000, 0, true, true, 1);
}
+ @Test
+ public void testMMap() throws Exception {
+ internalTest("mmap", getTestDir(), 10000, 100, true, true, 1);
+ }
+
+ @Test
+ public void testMMAPHugeTransaction() throws Exception {
+ internalTest("mmap", getTestDir(), 10000, 10000, true, true, 1);
+ }
+
+ @Test
+ public void testMMAPOMultiThread() throws Exception {
+ internalTest("mmap", getTestDir(), 1000, 100, true, true, 10);
+ }
+
+ @Test
+ public void testMMAPNonTransactional() throws Exception {
+ internalTest("mmap", getTestDir(), 10000, 0, true, true, 1);
+ }
+
+
// Package protected ---------------------------------------------
private void internalTest(final String type,
@@ -234,7 +258,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
if (args.length != 5) {
System.err.println("Use: java -cp " + ValidateTransactionHealthTest.class.getCanonicalName() +
- " aio|nio ");
+ " aio|nio|mmap ");
System.exit(-1);
}
System.out.println("Running");
@@ -320,15 +344,22 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
}
public static JournalImpl createJournal(final String journalType, final String journalDir) {
- JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir), "journaltst", "tst", 500);
+ JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir, 10485760), "journaltst", "tst", 500);
return journal;
}
- public static SequentialFileFactory getFactory(final String factoryType, final String directory) {
+ public static SequentialFileFactory getFactory(final String factoryType, final String directory, int fileSize) {
if (factoryType.equals("aio")) {
return new AIOSequentialFileFactory(new File(directory), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 10, false);
} else if (factoryType.equals("nio2")) {
return new NIOSequentialFileFactory(new File(directory), true, 1);
+ } else if (factoryType.equals("mmap")) {
+ return new MappedSequentialFileFactory(new File(directory), new IOCriticalErrorListener() {
+ @Override
+ public void onIOException(Throwable code, String message, SequentialFile file) {
+ code.printStackTrace();
+ }
+ }, true).chunkBytes(fileSize).overlapBytes(0);
} else {
return new NIOSequentialFileFactory(new File(directory), false, 1);
}