ARTEMIS-906 Memory Mapped JournalType
This commit is contained in:
parent
ef8cb60df7
commit
aacddfda61
|
@ -216,6 +216,9 @@ public class Create extends InputAbstract {
|
|||
@Option(name = "--nio", description = "sets the journal as nio.")
|
||||
boolean nio;
|
||||
|
||||
@Option(name = "--mapped", description = "Sets the journal as mapped.")
|
||||
boolean mapped;
|
||||
|
||||
// this is used by the setupJournalType method
|
||||
private JournalType journalType;
|
||||
|
||||
|
@ -797,7 +800,7 @@ public class Create extends InputAbstract {
|
|||
}
|
||||
|
||||
private void setupJournalType() {
|
||||
int countJournalTypes = countBoolean(aio, nio);
|
||||
int countJournalTypes = countBoolean(aio, nio, mapped);
|
||||
if (countJournalTypes > 1) {
|
||||
throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped).");
|
||||
}
|
||||
|
@ -814,6 +817,8 @@ public class Create extends InputAbstract {
|
|||
journalType = JournalType.ASYNCIO;
|
||||
} else if (nio) {
|
||||
journalType = JournalType.NIO;
|
||||
} else if (mapped) {
|
||||
journalType = JournalType.MAPPED;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,9 +24,11 @@ import java.text.DecimalFormat;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
|
@ -186,6 +188,16 @@ public class SyncCalculation {
|
|||
factory.start();
|
||||
((AIOSequentialFileFactory) factory).disableBufferReuse();
|
||||
return factory;
|
||||
case MAPPED:
|
||||
factory = new MappedSequentialFileFactory(datafolder, new IOCriticalErrorListener() {
|
||||
@Override
|
||||
public void onIOException(Throwable code, String message, SequentialFile file) {
|
||||
|
||||
}
|
||||
}, true).chunkBytes(fileSize).overlapBytes(0).setDatasync(datasync);
|
||||
|
||||
factory.start();
|
||||
return factory;
|
||||
default:
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(journalType);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,371 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.netty.buffer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.GatheringByteChannel;
|
||||
import java.nio.channels.ScatteringByteChannel;
|
||||
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
/**
|
||||
* A NIO direct {@link ByteBuffer} wrapper.
|
||||
* Only ByteBuffer's manipulation operations are supported.
|
||||
* Is best suited only for encoding/decoding purposes.
|
||||
*/
|
||||
public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceCountedByteBuf {
|
||||
|
||||
private ByteBuffer buffer;
|
||||
private long memoryAddress;
|
||||
|
||||
/**
|
||||
* Creates a new direct buffer by wrapping the specified initial buffer.
|
||||
*/
|
||||
public UnpooledUnsafeDirectByteBufWrapper() {
|
||||
super(0);
|
||||
this.buffer = null;
|
||||
this.memoryAddress = 0L;
|
||||
}
|
||||
|
||||
public void wrap(ByteBuffer buffer, int srcIndex, int length) {
|
||||
if (buffer != null) {
|
||||
this.buffer = buffer;
|
||||
this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex;
|
||||
clear();
|
||||
maxCapacity(length);
|
||||
} else {
|
||||
reset();
|
||||
}
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
this.buffer = null;
|
||||
this.memoryAddress = 0L;
|
||||
clear();
|
||||
maxCapacity(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDirect() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int capacity() {
|
||||
return maxCapacity();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf capacity(int newCapacity) {
|
||||
if (newCapacity != maxCapacity()) {
|
||||
throw new IllegalArgumentException("can't set a capacity different from the max allowed one");
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBufAllocator alloc() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteOrder order() {
|
||||
return ByteOrder.BIG_ENDIAN;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasArray() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] array() {
|
||||
throw new UnsupportedOperationException("direct buffer");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int arrayOffset() {
|
||||
throw new UnsupportedOperationException("direct buffer");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasMemoryAddress() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long memoryAddress() {
|
||||
return memoryAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected byte _getByte(int index) {
|
||||
return UnsafeByteBufUtil.getByte(addr(index));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected short _getShort(int index) {
|
||||
return UnsafeByteBufUtil.getShort(addr(index));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected short _getShortLE(int index) {
|
||||
return UnsafeByteBufUtil.getShortLE(addr(index));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int _getUnsignedMedium(int index) {
|
||||
return UnsafeByteBufUtil.getUnsignedMedium(addr(index));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int _getUnsignedMediumLE(int index) {
|
||||
return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int _getInt(int index) {
|
||||
return UnsafeByteBufUtil.getInt(addr(index));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int _getIntLE(int index) {
|
||||
return UnsafeByteBufUtil.getIntLE(addr(index));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long _getLong(int index) {
|
||||
return UnsafeByteBufUtil.getLong(addr(index));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long _getLongLE(int index) {
|
||||
return UnsafeByteBufUtil.getLongLE(addr(index));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
|
||||
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
|
||||
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf getBytes(int index, ByteBuffer dst) {
|
||||
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf readBytes(ByteBuffer dst) {
|
||||
int length = dst.remaining();
|
||||
checkReadableBytes(length);
|
||||
getBytes(readerIndex, dst);
|
||||
readerIndex += length;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void _setByte(int index, int value) {
|
||||
UnsafeByteBufUtil.setByte(addr(index), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void _setShort(int index, int value) {
|
||||
UnsafeByteBufUtil.setShort(addr(index), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void _setShortLE(int index, int value) {
|
||||
UnsafeByteBufUtil.setShortLE(addr(index), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void _setMedium(int index, int value) {
|
||||
UnsafeByteBufUtil.setMedium(addr(index), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void _setMediumLE(int index, int value) {
|
||||
UnsafeByteBufUtil.setMediumLE(addr(index), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void _setInt(int index, int value) {
|
||||
UnsafeByteBufUtil.setInt(addr(index), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void _setIntLE(int index, int value) {
|
||||
UnsafeByteBufUtil.setIntLE(addr(index), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void _setLong(int index, long value) {
|
||||
UnsafeByteBufUtil.setLong(addr(index), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void _setLongLE(int index, long value) {
|
||||
UnsafeByteBufUtil.setLongLE(addr(index), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
|
||||
UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
|
||||
UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf setBytes(int index, ByteBuffer src) {
|
||||
UnsafeByteBufUtil.setBytes(this, addr(index), index, src);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
|
||||
throw new UnsupportedOperationException("unsupported!");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
|
||||
throw new UnsupportedOperationException("unsupported!");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
|
||||
throw new UnsupportedOperationException("unsupported!");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public int readBytes(GatheringByteChannel out, int length) throws IOException {
|
||||
throw new UnsupportedOperationException("unsupported!");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public int readBytes(FileChannel out, long position, int length) throws IOException {
|
||||
throw new UnsupportedOperationException("unsupported!");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public int setBytes(int index, InputStream in, int length) throws IOException {
|
||||
throw new UnsupportedOperationException("unsupported!");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
|
||||
throw new UnsupportedOperationException("unsupported!");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
|
||||
throw new UnsupportedOperationException("unsupported!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int nioBufferCount() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public ByteBuffer[] nioBuffers(int index, int length) {
|
||||
throw new UnsupportedOperationException("unsupported!");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public ByteBuf copy(int index, int length) {
|
||||
|
||||
throw new UnsupportedOperationException("unsupported!");
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public ByteBuffer internalNioBuffer(int index, int length) {
|
||||
throw new UnsupportedOperationException("cannot access directly the wrapped buffer!");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public ByteBuffer nioBuffer(int index, int length) {
|
||||
throw new UnsupportedOperationException("unsupported!");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
protected void deallocate() {
|
||||
//NO_OP
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf unwrap() {
|
||||
return null;
|
||||
}
|
||||
|
||||
private long addr(int index) {
|
||||
return memoryAddress + index;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
protected SwappedByteBuf newSwappedByteBuf() {
|
||||
throw new UnsupportedOperationException("unsupported!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf setZero(int index, int length) {
|
||||
UnsafeByteBufUtil.setZero(this, addr(index), index, length);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf writeZero(int length) {
|
||||
ensureWritable(length);
|
||||
int wIndex = writerIndex;
|
||||
setZero(wIndex, length);
|
||||
writerIndex = wIndex + length;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
@ -20,18 +20,19 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.nio.BufferUnderflowException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.MappedByteBuffer;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.UnpooledUnsafeDirectByteBufWrapper;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
|
||||
final class MappedFile implements AutoCloseable {
|
||||
|
||||
private static final ByteBuffer ZERO_PAGE = ByteBuffer.allocateDirect(MappedByteBufferCache.PAGE_SIZE).order(ByteOrder.nativeOrder());
|
||||
|
||||
private final MappedByteBufferCache cache;
|
||||
private final int zerosMaxPage;
|
||||
private final UnpooledUnsafeDirectByteBufWrapper byteBufWrapper;
|
||||
private final ChannelBufferWrapper channelBufferWrapper;
|
||||
private MappedByteBuffer lastMapped;
|
||||
private long lastMappedStart;
|
||||
private long lastMappedLimit;
|
||||
|
@ -45,7 +46,8 @@ final class MappedFile implements AutoCloseable {
|
|||
this.lastMappedLimit = -1;
|
||||
this.position = 0;
|
||||
this.length = this.cache.fileSize();
|
||||
this.zerosMaxPage = Math.min(ZERO_PAGE.capacity(), (int) Math.min(Integer.MAX_VALUE, cache.overlapBytes()));
|
||||
this.byteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper();
|
||||
this.channelBufferWrapper = new ChannelBufferWrapper(this.byteBufWrapper, false);
|
||||
}
|
||||
|
||||
public static MappedFile of(File file, long chunckSize, long overlapSize) throws IOException {
|
||||
|
@ -58,6 +60,14 @@ final class MappedFile implements AutoCloseable {
|
|||
|
||||
private int checkOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
|
||||
if (!MappedByteBufferCache.inside(offset, lastMappedStart, lastMappedLimit)) {
|
||||
return updateOffset(offset, bytes);
|
||||
} else {
|
||||
final int bufferPosition = (int) (offset - lastMappedStart);
|
||||
return bufferPosition;
|
||||
}
|
||||
}
|
||||
|
||||
private int updateOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
|
||||
try {
|
||||
final int index = cache.indexFor(offset);
|
||||
final long mappedPosition = cache.mappedPositionFor(index);
|
||||
|
@ -75,10 +85,6 @@ final class MappedFile implements AutoCloseable {
|
|||
} catch (IllegalArgumentException e) {
|
||||
throw new BufferUnderflowException();
|
||||
}
|
||||
} else {
|
||||
final int bufferPosition = (int) (offset - lastMappedStart);
|
||||
return bufferPosition;
|
||||
}
|
||||
}
|
||||
|
||||
public void force() {
|
||||
|
@ -179,6 +185,26 @@ final class MappedFile implements AutoCloseable {
|
|||
return read;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes an encoded sequence of bytes to this file from the given buffer.
|
||||
* <p>
|
||||
* <p> Bytes are written starting at this file's current position,
|
||||
*/
|
||||
public void write(EncodingSupport encodingSupport) throws IOException {
|
||||
final int encodedSize = encodingSupport.getEncodeSize();
|
||||
final int bufferPosition = checkOffset(position, encodedSize);
|
||||
this.byteBufWrapper.wrap(this.lastMapped, bufferPosition, encodedSize);
|
||||
try {
|
||||
encodingSupport.encode(this.channelBufferWrapper);
|
||||
} finally {
|
||||
this.byteBufWrapper.reset();
|
||||
}
|
||||
position += encodedSize;
|
||||
if (position > this.length) {
|
||||
this.length = position;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a sequence of bytes to this file from the given buffer.
|
||||
* <p>
|
||||
|
@ -273,21 +299,20 @@ final class MappedFile implements AutoCloseable {
|
|||
* <p> Bytes are written starting at this file's current position,
|
||||
*/
|
||||
public void zeros(long offset, int count) throws IOException {
|
||||
final long targetOffset = offset + count;
|
||||
final int zerosBulkCopies = count / zerosMaxPage;
|
||||
final long srcAddress = PlatformDependent.directBufferAddress(ZERO_PAGE);
|
||||
for (int i = 0; i < zerosBulkCopies; i++) {
|
||||
final int bufferPosition = checkOffset(offset, zerosMaxPage);
|
||||
while (count > 0) {
|
||||
//do not need to validate the bytes count
|
||||
final int bufferPosition = checkOffset(offset, 0);
|
||||
final int endZerosPosition = (int)Math.min((long)bufferPosition + count, lastMapped.capacity());
|
||||
final int zeros = endZerosPosition - bufferPosition;
|
||||
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||
PlatformDependent.copyMemory(srcAddress, destAddress, zerosMaxPage);
|
||||
offset += zerosMaxPage;
|
||||
PlatformDependent.setMemory(destAddress, zeros, (byte) 0);
|
||||
offset += zeros;
|
||||
count -= zeros;
|
||||
//TODO need to call force on each write?
|
||||
//this.force();
|
||||
}
|
||||
final int remainingToBeZeroes = (int) (targetOffset - offset);
|
||||
final int bufferPosition = checkOffset(offset, remainingToBeZeroes);
|
||||
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||
PlatformDependent.copyMemory(srcAddress, destAddress, remainingToBeZeroes);
|
||||
if (targetOffset > this.length) {
|
||||
this.length = targetOffset;
|
||||
if (offset > this.length) {
|
||||
this.length = offset;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,16 +20,13 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.FileLock;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
|
@ -44,12 +41,11 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
private final long chunkBytes;
|
||||
private final long overlapBytes;
|
||||
private final IOCriticalErrorListener criticalErrorListener;
|
||||
private final MappedSequentialFileFactory factory;
|
||||
private File file;
|
||||
private File absoluteFile;
|
||||
private String fileName;
|
||||
private MappedFile mappedFile;
|
||||
private ActiveMQBuffer pooledActiveMQBuffer;
|
||||
private final MappedSequentialFileFactory factory;
|
||||
|
||||
MappedSequentialFile(MappedSequentialFileFactory factory,
|
||||
final File directory,
|
||||
|
@ -65,19 +61,24 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
this.chunkBytes = chunkBytes;
|
||||
this.overlapBytes = overlapBytes;
|
||||
this.mappedFile = null;
|
||||
this.pooledActiveMQBuffer = null;
|
||||
this.criticalErrorListener = criticalErrorListener;
|
||||
}
|
||||
|
||||
private void checkIsOpen() {
|
||||
if (!isOpen()) {
|
||||
throw new IllegalStateException("must be open!");
|
||||
throw new IllegalStateException("File not opened!");
|
||||
}
|
||||
}
|
||||
|
||||
private void checkIsOpen(IOCallback callback) {
|
||||
if (!isOpen()) {
|
||||
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
|
||||
}
|
||||
}
|
||||
|
||||
private void checkIsNotOpen() {
|
||||
if (isOpen()) {
|
||||
throw new IllegalStateException("must be closed!");
|
||||
throw new IllegalStateException("File opened!");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,7 +102,6 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
@Override
|
||||
public void open(int maxIO, boolean useExecutor) throws IOException {
|
||||
//ignore maxIO e useExecutor
|
||||
ActiveMQJournalLogger.LOGGER.warn("ignoring maxIO and useExecutor unsupported parameters!");
|
||||
this.open();
|
||||
}
|
||||
|
||||
|
@ -134,7 +134,7 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
|
||||
@Override
|
||||
public void delete() {
|
||||
checkIsNotOpen();
|
||||
close();
|
||||
if (file.exists() && !file.delete()) {
|
||||
ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
|
||||
}
|
||||
|
@ -142,10 +142,10 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
|
||||
@Override
|
||||
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws IOException {
|
||||
checkIsOpen();
|
||||
if (callback == null) {
|
||||
throw new NullPointerException("callback parameter need to be set");
|
||||
}
|
||||
checkIsOpen(callback);
|
||||
try {
|
||||
final ByteBuf byteBuf = bytes.byteBuf();
|
||||
final int writerIndex = byteBuf.writerIndex();
|
||||
|
@ -182,35 +182,17 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
}
|
||||
}
|
||||
|
||||
private ActiveMQBuffer acquiresActiveMQBufferWithAtLeast(int size) {
|
||||
if (this.pooledActiveMQBuffer == null || this.pooledActiveMQBuffer.capacity() < size) {
|
||||
this.pooledActiveMQBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(size, size).order(ByteOrder.nativeOrder()));
|
||||
} else {
|
||||
this.pooledActiveMQBuffer.clear();
|
||||
}
|
||||
return pooledActiveMQBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws IOException {
|
||||
checkIsOpen();
|
||||
if (callback == null) {
|
||||
throw new NullPointerException("callback parameter need to be set");
|
||||
}
|
||||
checkIsOpen(callback);
|
||||
try {
|
||||
final int encodedSize = bytes.getEncodeSize();
|
||||
final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize);
|
||||
bytes.encode(outBuffer);
|
||||
final ByteBuf byteBuf = outBuffer.byteBuf();
|
||||
final int writerIndex = byteBuf.writerIndex();
|
||||
final int readerIndex = byteBuf.readerIndex();
|
||||
final int readableBytes = writerIndex - readerIndex;
|
||||
if (readableBytes > 0) {
|
||||
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
|
||||
this.mappedFile.write(bytes);
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
callback.done();
|
||||
} catch (IOException e) {
|
||||
if (this.criticalErrorListener != null) {
|
||||
|
@ -224,33 +206,26 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
@Override
|
||||
public void write(EncodingSupport bytes, boolean sync) throws IOException {
|
||||
checkIsOpen();
|
||||
final int encodedSize = bytes.getEncodeSize();
|
||||
final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize);
|
||||
bytes.encode(outBuffer);
|
||||
final ByteBuf byteBuf = outBuffer.byteBuf();
|
||||
final int writerIndex = byteBuf.writerIndex();
|
||||
final int readerIndex = byteBuf.readerIndex();
|
||||
final int readableBytes = writerIndex - readerIndex;
|
||||
if (readableBytes > 0) {
|
||||
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
|
||||
this.mappedFile.write(bytes);
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
|
||||
checkIsOpen();
|
||||
if (callback == null) {
|
||||
throw new NullPointerException("callback parameter need to be set");
|
||||
}
|
||||
checkIsOpen(callback);
|
||||
try {
|
||||
final int position = bytes.position();
|
||||
final int limit = bytes.limit();
|
||||
final int remaining = limit - position;
|
||||
if (remaining > 0) {
|
||||
this.mappedFile.write(bytes, position, remaining);
|
||||
final int newPosition = position + remaining;
|
||||
bytes.position(newPosition);
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
|
@ -273,6 +248,8 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
final int remaining = limit - position;
|
||||
if (remaining > 0) {
|
||||
this.mappedFile.write(bytes, position, remaining);
|
||||
final int newPosition = position + remaining;
|
||||
bytes.position(newPosition);
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
|
@ -281,10 +258,10 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
|
||||
@Override
|
||||
public int read(ByteBuffer bytes, IOCallback callback) throws IOException {
|
||||
checkIsOpen();
|
||||
if (callback == null) {
|
||||
throw new NullPointerException("callback parameter need to be set");
|
||||
}
|
||||
checkIsOpen(callback);
|
||||
try {
|
||||
final int position = bytes.position();
|
||||
final int limit = bytes.limit();
|
||||
|
@ -296,8 +273,10 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
bytes.flip();
|
||||
callback.done();
|
||||
return bytesRead;
|
||||
}
|
||||
} else {
|
||||
callback.done();
|
||||
return 0;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (this.criticalErrorListener != null) {
|
||||
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
|
@ -360,7 +339,14 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
|
||||
@Override
|
||||
public void renameTo(String newFileName) throws Exception {
|
||||
checkIsNotOpen();
|
||||
try {
|
||||
close();
|
||||
} catch (Exception e) {
|
||||
if (e instanceof IOException) {
|
||||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
if (this.fileName == null) {
|
||||
this.fileName = this.file.getName();
|
||||
}
|
||||
|
@ -388,14 +374,10 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
if (dstFile.isOpen()) {
|
||||
throw new IllegalArgumentException("dstFile must be closed too");
|
||||
}
|
||||
try (RandomAccessFile src = new RandomAccessFile(file, "rw");
|
||||
FileChannel srcChannel = src.getChannel();
|
||||
FileLock srcLock = srcChannel.lock()) {
|
||||
try (RandomAccessFile src = new RandomAccessFile(file, "rw"); FileChannel srcChannel = src.getChannel(); FileLock srcLock = srcChannel.lock()) {
|
||||
final long readableBytes = srcChannel.size();
|
||||
if (readableBytes > 0) {
|
||||
try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw");
|
||||
FileChannel dstChannel = dst.getChannel();
|
||||
FileLock dstLock = dstChannel.lock()) {
|
||||
try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw"); FileChannel dstChannel = dst.getChannel(); FileLock dstLock = dstChannel.lock()) {
|
||||
final long oldLength = dst.length();
|
||||
final long newLength = oldLength + readableBytes;
|
||||
dst.setLength(newLength);
|
||||
|
|
|
@ -20,7 +20,6 @@ import java.io.File;
|
|||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -29,30 +28,42 @@ import io.netty.util.internal.PlatformDependent;
|
|||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
|
||||
|
||||
public final class MappedSequentialFileFactory implements SequentialFileFactory {
|
||||
|
||||
private static long DEFAULT_BLOCK_SIZE = 64L << 20;
|
||||
private final File directory;
|
||||
private final IOCriticalErrorListener criticalErrorListener;
|
||||
private final TimedBuffer timedBuffer;
|
||||
private long chunkBytes;
|
||||
private long overlapBytes;
|
||||
private boolean useDataSync;
|
||||
private boolean supportCallbacks;
|
||||
|
||||
public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
|
||||
protected volatile int alignment = -1;
|
||||
|
||||
public MappedSequentialFileFactory(File directory,
|
||||
IOCriticalErrorListener criticalErrorListener,
|
||||
boolean supportCallbacks) {
|
||||
this.directory = directory;
|
||||
this.criticalErrorListener = criticalErrorListener;
|
||||
this.chunkBytes = DEFAULT_BLOCK_SIZE;
|
||||
this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
|
||||
this.useDataSync = true;
|
||||
this.timedBuffer = null;
|
||||
this.supportCallbacks = supportCallbacks;
|
||||
}
|
||||
|
||||
public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
|
||||
this(directory, criticalErrorListener, false);
|
||||
}
|
||||
|
||||
public MappedSequentialFileFactory(File directory) {
|
||||
this.directory = directory;
|
||||
this.criticalErrorListener = null;
|
||||
this.chunkBytes = DEFAULT_BLOCK_SIZE;
|
||||
this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
|
||||
this(directory, null);
|
||||
}
|
||||
|
||||
|
||||
public long chunkBytes() {
|
||||
return chunkBytes;
|
||||
}
|
||||
|
@ -73,7 +84,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
|
||||
@Override
|
||||
public SequentialFile createSequentialFile(String fileName) {
|
||||
return new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
|
||||
final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
|
||||
if (this.timedBuffer == null) {
|
||||
return mappedSequentialFile;
|
||||
} else {
|
||||
return new TimedSequentialFile(this, mappedSequentialFile);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -89,17 +105,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
|
||||
@Override
|
||||
public int getMaxIO() {
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listFiles(final String extension) throws Exception {
|
||||
final FilenameFilter extensionFilter = new FilenameFilter() {
|
||||
@Override
|
||||
public boolean accept(final File file, final String name) {
|
||||
return name.endsWith("." + extension);
|
||||
}
|
||||
};
|
||||
final FilenameFilter extensionFilter = (file, name) -> name.endsWith("." + extension);
|
||||
final String[] fileNames = directory.list(extensionFilter);
|
||||
if (fileNames == null) {
|
||||
return Collections.EMPTY_LIST;
|
||||
|
@ -109,7 +120,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
|
||||
@Override
|
||||
public boolean isSupportsCallbacks() {
|
||||
return false;
|
||||
return this.supportCallbacks;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -121,7 +132,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
|
||||
@Override
|
||||
public ByteBuffer allocateDirectBuffer(final int size) {
|
||||
return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
|
||||
return ByteBuffer.allocateDirect(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -131,7 +142,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
|
||||
@Override
|
||||
public ByteBuffer newBuffer(final int size) {
|
||||
return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
|
||||
return ByteBuffer.allocate(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -143,17 +154,23 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
|
||||
@Override
|
||||
public void activateBuffer(SequentialFile file) {
|
||||
|
||||
if (timedBuffer != null) {
|
||||
file.setTimedBuffer(timedBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deactivateBuffer() {
|
||||
|
||||
if (timedBuffer != null) {
|
||||
// When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer
|
||||
timedBuffer.flush();
|
||||
timedBuffer.setObserver(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer wrapBuffer(final byte[] bytes) {
|
||||
return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
|
||||
return ByteBuffer.wrap(bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -162,8 +179,8 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
public SequentialFileFactory setAlignment(int alignment) {
|
||||
// no op
|
||||
public MappedSequentialFileFactory setAlignment(int alignment) {
|
||||
this.alignment = alignment;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -179,7 +196,6 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
|
||||
@Override
|
||||
public void clearBuffer(final ByteBuffer buffer) {
|
||||
buffer.clear();
|
||||
if (buffer.isDirect()) {
|
||||
BytesUtils.zerosDirect(buffer);
|
||||
} else if (buffer.hasArray()) {
|
||||
|
@ -193,16 +209,21 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
buffer.put(i, (byte) 0);
|
||||
}
|
||||
}
|
||||
buffer.rewind();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
if (timedBuffer != null) {
|
||||
timedBuffer.start();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
if (timedBuffer != null) {
|
||||
timedBuffer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -215,6 +236,8 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
|
||||
@Override
|
||||
public void flush() {
|
||||
|
||||
if (timedBuffer != null) {
|
||||
timedBuffer.flush();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,377 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.io.mapped;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.core.io.DummyCallback;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
|
||||
import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
|
||||
final class TimedSequentialFile implements SequentialFile {
|
||||
|
||||
private final SequentialFileFactory factory;
|
||||
private final SequentialFile sequentialFile;
|
||||
private final LocalBufferObserver observer;
|
||||
private final ThreadLocal<ResettableIOCallback> callbackPool;
|
||||
private TimedBuffer timedBuffer;
|
||||
|
||||
TimedSequentialFile(SequentialFileFactory factory, SequentialFile sequentialFile) {
|
||||
this.sequentialFile = sequentialFile;
|
||||
this.factory = factory;
|
||||
this.observer = new LocalBufferObserver();
|
||||
this.callbackPool = ThreadLocal.withInitial(ResettableIOCallback::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return this.sequentialFile.isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean exists() {
|
||||
return this.sequentialFile.exists();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() throws Exception {
|
||||
this.sequentialFile.open();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(int maxIO, boolean useExecutor) throws Exception {
|
||||
this.sequentialFile.open(maxIO, useExecutor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean fits(int size) {
|
||||
if (timedBuffer == null) {
|
||||
return this.sequentialFile.fits(size);
|
||||
} else {
|
||||
return timedBuffer.checkSize(size);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int calculateBlockStart(int position) throws Exception {
|
||||
return this.sequentialFile.calculateBlockStart(position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFileName() {
|
||||
return this.sequentialFile.getFileName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fill(int size) throws Exception {
|
||||
this.sequentialFile.fill(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete() throws IOException, InterruptedException, ActiveMQException {
|
||||
this.sequentialFile.delete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
|
||||
if (this.timedBuffer != null) {
|
||||
this.timedBuffer.addBytes(bytes, sync, callback);
|
||||
} else {
|
||||
this.sequentialFile.write(bytes, sync, callback);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
|
||||
if (sync) {
|
||||
if (this.timedBuffer != null) {
|
||||
final ResettableIOCallback callback = callbackPool.get();
|
||||
try {
|
||||
this.timedBuffer.addBytes(bytes, true, callback);
|
||||
callback.waitCompletion();
|
||||
} finally {
|
||||
callback.reset();
|
||||
}
|
||||
} else {
|
||||
this.sequentialFile.write(bytes, true);
|
||||
}
|
||||
} else {
|
||||
if (this.timedBuffer != null) {
|
||||
this.timedBuffer.addBytes(bytes, false, DummyCallback.getInstance());
|
||||
} else {
|
||||
this.sequentialFile.write(bytes, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
|
||||
if (this.timedBuffer != null) {
|
||||
this.timedBuffer.addBytes(bytes, sync, callback);
|
||||
} else {
|
||||
this.sequentialFile.write(bytes, sync, callback);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(EncodingSupport bytes, boolean sync) throws Exception {
|
||||
if (sync) {
|
||||
if (this.timedBuffer != null) {
|
||||
final ResettableIOCallback callback = callbackPool.get();
|
||||
try {
|
||||
this.timedBuffer.addBytes(bytes, true, callback);
|
||||
callback.waitCompletion();
|
||||
} finally {
|
||||
callback.reset();
|
||||
}
|
||||
} else {
|
||||
this.sequentialFile.write(bytes, true);
|
||||
}
|
||||
} else {
|
||||
if (this.timedBuffer != null) {
|
||||
this.timedBuffer.addBytes(bytes, false, DummyCallback.getInstance());
|
||||
} else {
|
||||
this.sequentialFile.write(bytes, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
|
||||
this.sequentialFile.writeDirect(bytes, sync, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
|
||||
this.sequentialFile.writeDirect(bytes, sync);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(ByteBuffer bytes, IOCallback callback) throws Exception {
|
||||
return this.sequentialFile.read(bytes, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(ByteBuffer bytes) throws Exception {
|
||||
return this.sequentialFile.read(bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void position(long pos) throws IOException {
|
||||
this.sequentialFile.position(pos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long position() {
|
||||
return this.sequentialFile.position();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
this.sequentialFile.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
this.sequentialFile.sync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long size() throws Exception {
|
||||
return this.sequentialFile.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void renameTo(String newFileName) throws Exception {
|
||||
this.sequentialFile.renameTo(newFileName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SequentialFile cloneFile() {
|
||||
return new TimedSequentialFile(factory, this.sequentialFile.cloneFile());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void copyTo(SequentialFile newFileName) throws Exception {
|
||||
this.sequentialFile.copyTo(newFileName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimedBuffer(TimedBuffer buffer) {
|
||||
if (this.timedBuffer != null) {
|
||||
this.timedBuffer.setObserver(null);
|
||||
}
|
||||
this.timedBuffer = buffer;
|
||||
if (buffer != null) {
|
||||
buffer.setObserver(this.observer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public File getJavaFile() {
|
||||
return this.sequentialFile.getJavaFile();
|
||||
}
|
||||
|
||||
private static final class ResettableIOCallback implements IOCallback {
|
||||
|
||||
private final CyclicBarrier cyclicBarrier;
|
||||
private int errorCode;
|
||||
private String errorMessage;
|
||||
|
||||
ResettableIOCallback() {
|
||||
this.cyclicBarrier = new CyclicBarrier(2);
|
||||
}
|
||||
|
||||
public void waitCompletion() throws InterruptedException, ActiveMQException, BrokenBarrierException {
|
||||
this.cyclicBarrier.await();
|
||||
if (this.errorMessage != null) {
|
||||
throw ActiveMQExceptionType.createException(this.errorCode, this.errorMessage);
|
||||
}
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
this.errorCode = 0;
|
||||
this.errorMessage = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void done() {
|
||||
try {
|
||||
this.cyclicBarrier.await();
|
||||
} catch (BrokenBarrierException | InterruptedException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
try {
|
||||
this.errorCode = errorCode;
|
||||
this.errorMessage = errorMessage;
|
||||
this.cyclicBarrier.await();
|
||||
} catch (BrokenBarrierException | InterruptedException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final class DelegateCallback implements IOCallback {
|
||||
|
||||
final List<IOCallback> delegates;
|
||||
|
||||
private DelegateCallback() {
|
||||
this.delegates = new ArrayList<>();
|
||||
}
|
||||
|
||||
public List<IOCallback> delegates() {
|
||||
return this.delegates;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void done() {
|
||||
final int size = delegates.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
try {
|
||||
final IOCallback callback = delegates.get(i);
|
||||
callback.done();
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final int errorCode, final String errorMessage) {
|
||||
for (IOCallback callback : delegates) {
|
||||
try {
|
||||
callback.onError(errorCode, errorMessage);
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class LocalBufferObserver implements TimedBufferObserver {
|
||||
|
||||
private final ThreadLocal<DelegateCallback> callbacksPool = ThreadLocal.withInitial(DelegateCallback::new);
|
||||
|
||||
@Override
|
||||
public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) {
|
||||
buffer.flip();
|
||||
if (buffer.limit() == 0) {
|
||||
//if there are no bytes to flush, can release the callbacks
|
||||
final int size = callbacks.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
callbacks.get(i).done();
|
||||
}
|
||||
} else {
|
||||
final DelegateCallback delegateCallback = callbacksPool.get();
|
||||
final int size = callbacks.size();
|
||||
final List<IOCallback> delegates = delegateCallback.delegates();
|
||||
for (int i = 0; i < size; i++) {
|
||||
delegates.add(callbacks.get(i));
|
||||
}
|
||||
try {
|
||||
sequentialFile.writeDirect(buffer, requestedSync, delegateCallback);
|
||||
} finally {
|
||||
delegates.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer newBuffer(final int size, final int limit) {
|
||||
final int alignedSize = factory.calculateBlockSize(size);
|
||||
final int alignedLimit = factory.calculateBlockSize(limit);
|
||||
final ByteBuffer buffer = factory.newBuffer(alignedSize);
|
||||
buffer.limit(alignedLimit);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRemainingBytes() {
|
||||
try {
|
||||
final int remaining = (int) Math.min(sequentialFile.size() - sequentialFile.position(), Integer.MAX_VALUE);
|
||||
return remaining;
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TimedBufferObserver on file (" + getFileName() + ")";
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,208 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.io;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue;
|
||||
import org.apache.activemq.artemis.ArtemisConstants;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.journal.Journal;
|
||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||
|
||||
/**
|
||||
* To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
|
||||
*/
|
||||
public class JournalTptBenchmark {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final boolean useDefaultIoExecutor = true;
|
||||
final int fileSize = 1024 * 1024;
|
||||
final boolean dataSync = true;
|
||||
final Type type = Type.Mapped;
|
||||
final int tests = 5;
|
||||
final int warmup = 20_000;
|
||||
final int measurements = 20_000;
|
||||
final int msgSize = 100;
|
||||
final byte[] msgContent = new byte[msgSize];
|
||||
Arrays.fill(msgContent, (byte) 1);
|
||||
final int totalMessages = (measurements * tests + warmup);
|
||||
final File tmpDirectory = new File("./");
|
||||
//using the default configuration when the broker starts!
|
||||
final SequentialFileFactory factory;
|
||||
switch (type) {
|
||||
|
||||
case Mapped:
|
||||
final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true);
|
||||
factory = mappedFactory.chunkBytes(fileSize).overlapBytes(0).setDatasync(dataSync);
|
||||
break;
|
||||
case Nio:
|
||||
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
|
||||
break;
|
||||
case Aio:
|
||||
factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync);
|
||||
//disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
|
||||
if (!LibaioContext.isLoaded()) {
|
||||
throw new IllegalStateException("lib AIO not loaded!");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("unsupported case");
|
||||
}
|
||||
|
||||
int numFiles = (int) (totalMessages * factory.calculateBlockSize(msgSize)) / fileSize;
|
||||
if (numFiles < 2) {
|
||||
numFiles = 2;
|
||||
}
|
||||
ExecutorService service = null;
|
||||
final Journal journal;
|
||||
if (useDefaultIoExecutor) {
|
||||
journal = new JournalImpl(fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO());
|
||||
journal.start();
|
||||
} else {
|
||||
final ArrayList<MpscArrayQueue<Runnable>> tasks = new ArrayList<>();
|
||||
service = Executors.newSingleThreadExecutor();
|
||||
journal = new JournalImpl(() -> new Executor() {
|
||||
|
||||
private final MpscArrayQueue<Runnable> taskQueue = new MpscArrayQueue<>(1024);
|
||||
|
||||
{
|
||||
tasks.add(taskQueue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
while (!taskQueue.offer(command)) {
|
||||
LockSupport.parkNanos(1L);
|
||||
}
|
||||
}
|
||||
}, fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO(), 0);
|
||||
journal.start();
|
||||
service.execute(() -> {
|
||||
final int size = tasks.size();
|
||||
final int capacity = 1024;
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
for (int i = 0; i < size; i++) {
|
||||
final MpscArrayQueue<Runnable> runnables = tasks.get(i);
|
||||
for (int j = 0; j < capacity; j++) {
|
||||
final Runnable task = runnables.poll();
|
||||
if (task == null) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
task.run();
|
||||
} catch (Throwable t) {
|
||||
System.err.println(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
try {
|
||||
journal.load(new ArrayList<RecordInfo>(), null, null);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
try {
|
||||
final EncodingSupport encodingSupport = new EncodingSupport() {
|
||||
@Override
|
||||
public int getEncodeSize() {
|
||||
return msgSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(ActiveMQBuffer buffer) {
|
||||
final int writerIndex = buffer.writerIndex();
|
||||
buffer.setBytes(writerIndex, msgContent);
|
||||
buffer.writerIndex(writerIndex + msgSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ActiveMQBuffer buffer) {
|
||||
|
||||
}
|
||||
};
|
||||
long id = 1;
|
||||
{
|
||||
final long elapsed = writeMeasurements(id, journal, encodingSupport, warmup);
|
||||
id += warmup;
|
||||
System.out.println("warmup:" + (measurements * 1000_000_000L) / elapsed + " ops/sec");
|
||||
}
|
||||
for (int t = 0; t < tests; t++) {
|
||||
final long elapsed = writeMeasurements(id, journal, encodingSupport, measurements);
|
||||
System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec");
|
||||
id += warmup;
|
||||
}
|
||||
|
||||
} finally {
|
||||
journal.stop();
|
||||
if (service != null) {
|
||||
service.shutdown();
|
||||
}
|
||||
final File[] fileToDeletes = tmpDirectory.listFiles();
|
||||
System.out.println("Files to deletes" + Arrays.toString(fileToDeletes));
|
||||
Stream.of(fileToDeletes).forEach(File::delete);
|
||||
}
|
||||
}
|
||||
|
||||
private static long writeMeasurements(long id,
|
||||
Journal journal,
|
||||
EncodingSupport encodingSupport,
|
||||
int measurements) throws Exception {
|
||||
System.gc();
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
|
||||
final long start = System.nanoTime();
|
||||
for (int i = 0; i < measurements; i++) {
|
||||
write(id, journal, encodingSupport);
|
||||
id++;
|
||||
}
|
||||
final long elapsed = System.nanoTime() - start;
|
||||
return elapsed;
|
||||
}
|
||||
|
||||
private static void write(long id, Journal journal, EncodingSupport encodingSupport) throws Exception {
|
||||
journal.appendAddRecord(id, (byte) 1, encodingSupport, false);
|
||||
final SimpleWaitIOCallback ioCallback = new SimpleWaitIOCallback();
|
||||
journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true, ioCallback);
|
||||
ioCallback.waitCompletion();
|
||||
}
|
||||
|
||||
private enum Type {
|
||||
|
||||
Mapped, Nio, Aio
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,203 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.io;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
|
||||
import org.apache.activemq.artemis.ArtemisConstants;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||
|
||||
/**
|
||||
* To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
|
||||
*/
|
||||
public class SequentialFileTptBenchmark {
|
||||
|
||||
private static final FastWaitIOCallback CALLBACK = new FastWaitIOCallback();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final boolean dataSync = true;
|
||||
final boolean writeSync = true;
|
||||
final Type type = Type.Mapped;
|
||||
final int tests = 10;
|
||||
final int warmup = 20_000;
|
||||
final int measurements = 20_000;
|
||||
final int msgSize = 100;
|
||||
final byte[] msgContent = new byte[msgSize];
|
||||
Arrays.fill(msgContent, (byte) 1);
|
||||
final File tmpDirectory = new File("./");
|
||||
//using the default configuration when the broker starts!
|
||||
final SequentialFileFactory factory;
|
||||
switch (type) {
|
||||
|
||||
case Mapped:
|
||||
final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true);
|
||||
final int alignedMessageSize = mappedFactory.calculateBlockSize(msgSize);
|
||||
final int totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup);
|
||||
factory = mappedFactory.chunkBytes(totalFileSize).overlapBytes(0).setDatasync(dataSync);
|
||||
break;
|
||||
case Nio:
|
||||
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
|
||||
break;
|
||||
case Aio:
|
||||
factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync);
|
||||
//disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
|
||||
if (!LibaioContext.isLoaded()) {
|
||||
throw new IllegalStateException("lib AIO not loaded!");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("unsupported case");
|
||||
}
|
||||
factory.start();
|
||||
try {
|
||||
final EncodingSupport encodingSupport = new EncodingSupport() {
|
||||
@Override
|
||||
public int getEncodeSize() {
|
||||
return msgSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(ActiveMQBuffer buffer) {
|
||||
final int writerIndex = buffer.writerIndex();
|
||||
buffer.setBytes(writerIndex, msgContent);
|
||||
buffer.writerIndex(writerIndex + msgSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ActiveMQBuffer buffer) {
|
||||
|
||||
}
|
||||
};
|
||||
final int alignedMessageSize = factory.calculateBlockSize(msgSize);
|
||||
final long totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup);
|
||||
if (totalFileSize > Integer.MAX_VALUE)
|
||||
throw new IllegalArgumentException("reduce measurements/warmup");
|
||||
final int fileSize = (int) totalFileSize;
|
||||
final SequentialFile sequentialFile = factory.createSequentialFile("seq.dat");
|
||||
sequentialFile.getJavaFile().delete();
|
||||
sequentialFile.getJavaFile().deleteOnExit();
|
||||
sequentialFile.open();
|
||||
final long startZeros = System.nanoTime();
|
||||
sequentialFile.fill(fileSize);
|
||||
final long elapsedZeros = System.nanoTime() - startZeros;
|
||||
System.out.println("Zeroed " + fileSize + " bytes in " + TimeUnit.NANOSECONDS.toMicros(elapsedZeros) + " us");
|
||||
try {
|
||||
{
|
||||
final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, warmup, writeSync);
|
||||
System.out.println("warmup:" + (measurements * 1000_000_000L) / elapsed + " ops/sec");
|
||||
}
|
||||
for (int t = 0; t < tests; t++) {
|
||||
final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, measurements, writeSync);
|
||||
System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec");
|
||||
}
|
||||
} finally {
|
||||
sequentialFile.close();
|
||||
}
|
||||
} finally {
|
||||
factory.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private static long writeMeasurements(SequentialFileFactory sequentialFileFactory,
|
||||
SequentialFile sequentialFile,
|
||||
EncodingSupport encodingSupport,
|
||||
int measurements,
|
||||
boolean writeSync) throws Exception {
|
||||
//System.gc();
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
sequentialFileFactory.activateBuffer(sequentialFile);
|
||||
sequentialFile.position(0);
|
||||
final long start = System.nanoTime();
|
||||
for (int i = 0; i < measurements; i++) {
|
||||
write(sequentialFile, encodingSupport, writeSync);
|
||||
}
|
||||
sequentialFileFactory.deactivateBuffer();
|
||||
final long elapsed = System.nanoTime() - start;
|
||||
return elapsed;
|
||||
}
|
||||
|
||||
private static void write(SequentialFile sequentialFile,
|
||||
EncodingSupport encodingSupport,
|
||||
boolean sync) throws Exception {
|
||||
//this pattern is necessary to ensure that NIO's TimedBuffer fill flush the buffer and know the real size of it
|
||||
if (sequentialFile.fits(encodingSupport.getEncodeSize())) {
|
||||
final FastWaitIOCallback ioCallback = CALLBACK.reset();
|
||||
sequentialFile.write(encodingSupport, sync, ioCallback);
|
||||
ioCallback.waitCompletion();
|
||||
} else {
|
||||
throw new IllegalStateException("can't happen!");
|
||||
}
|
||||
}
|
||||
|
||||
private enum Type {
|
||||
|
||||
Mapped, Nio, Aio
|
||||
|
||||
}
|
||||
|
||||
private static final class FastWaitIOCallback implements IOCallback {
|
||||
|
||||
private final AtomicBoolean done = new AtomicBoolean(false);
|
||||
private int errorCode = 0;
|
||||
private String errorMessage = null;
|
||||
|
||||
public FastWaitIOCallback reset() {
|
||||
errorCode = 0;
|
||||
errorMessage = null;
|
||||
done.lazySet(false);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void done() {
|
||||
errorCode = 0;
|
||||
errorMessage = null;
|
||||
done.lazySet(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
this.errorCode = errorCode;
|
||||
this.errorMessage = errorMessage;
|
||||
done.lazySet(true);
|
||||
}
|
||||
|
||||
public void waitCompletion() throws InterruptedException, ActiveMQException {
|
||||
final Thread currentThread = Thread.currentThread();
|
||||
while (!done.get()) {
|
||||
LockSupport.parkNanos(1L);
|
||||
if (currentThread.isInterrupted())
|
||||
throw new InterruptedException();
|
||||
}
|
||||
if (errorMessage != null) {
|
||||
throw ActiveMQExceptionType.createException(errorCode, errorMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
|||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.Journal;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
||||
|
@ -136,6 +137,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
|||
ActiveMQServerLogger.LOGGER.journalUseAIO();
|
||||
journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener);
|
||||
break;
|
||||
case MAPPED:
|
||||
ActiveMQServerLogger.LOGGER.journalUseMAPPED();
|
||||
//the mapped version do not need buffering by default
|
||||
journalFF = new MappedSequentialFileFactory(config.getJournalLocation(), criticalErrorListener, true).chunkBytes(config.getJournalFileSize()).overlapBytes(0);
|
||||
break;
|
||||
default:
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
|
||||
}
|
||||
|
|
|
@ -1558,8 +1558,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
@Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void invalidMessageCounterPeriod(long value);
|
||||
|
||||
@LogMessage(level = Logger.Level.INFO)
|
||||
@Message(id = 224073, value = "Using MAPPED Journal", format = Message.Format.MESSAGE_FORMAT)
|
||||
void journalUseMAPPED();
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 224073, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT)
|
||||
@Message(id = 224074, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT)
|
||||
void failedToPurgeQueue(@Cause Exception e, SimpleString bindingName);
|
||||
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ public enum JournalType {
|
|||
switch (type) {
|
||||
case "NIO": return NIO;
|
||||
case "ASYNCIO" : return ASYNCIO;
|
||||
case "MAPPED" : return MAPPED;
|
||||
default: throw new IllegalStateException("Invalid JournalType:" + type + " valid Types: " + validValues);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -584,6 +584,7 @@
|
|||
<xsd:restriction base="xsd:string">
|
||||
<xsd:enumeration value="ASYNCIO"/>
|
||||
<xsd:enumeration value="NIO"/>
|
||||
<xsd:enumeration value="MAPPED"/>
|
||||
</xsd:restriction>
|
||||
</xsd:simpleType>
|
||||
</xsd:element>
|
||||
|
|
|
@ -576,6 +576,7 @@
|
|||
<xsd:restriction base="xsd:string">
|
||||
<xsd:enumeration value="ASYNCIO"/>
|
||||
<xsd:enumeration value="NIO"/>
|
||||
<xsd:enumeration value="MAPPED"/>
|
||||
</xsd:restriction>
|
||||
</xsd:simpleType>
|
||||
</xsd:element>
|
||||
|
|
|
@ -59,7 +59,8 @@ public class AIOJournalImplTest extends JournalImplTestUnit {
|
|||
|
||||
file.mkdir();
|
||||
|
||||
return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 1000000, 10, false);
|
||||
// forcing the alignment to be 512, as this test was hard coded around this size.
|
||||
return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 1000000, 10, false).setAlignment(512);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.journal;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
|
||||
public class MappedImportExportTest extends NIOImportExportTest {
|
||||
|
||||
@Override
|
||||
protected SequentialFileFactory getFileFactory() throws Exception {
|
||||
return new MappedSequentialFileFactory(getTestDirfile());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.journal;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
|
||||
public class MappedJournalCompactTest extends NIOJournalCompactTest {
|
||||
|
||||
@Override
|
||||
protected SequentialFileFactory getFileFactory() throws Exception {
|
||||
File file = new File(getTestDir());
|
||||
|
||||
ActiveMQTestBase.deleteDirectory(file);
|
||||
|
||||
file.mkdir();
|
||||
|
||||
return new MappedSequentialFileFactory(getTestDirfile());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.journal;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
|
||||
|
||||
public class MappedJournalImplTest extends JournalImplTestUnit {
|
||||
|
||||
@Override
|
||||
protected SequentialFileFactory getFileFactory() throws Exception {
|
||||
File file = new File(getTestDir());
|
||||
|
||||
deleteDirectory(file);
|
||||
|
||||
file.mkdir();
|
||||
|
||||
return new MappedSequentialFileFactory(getTestDirfile());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getAlignment() {
|
||||
return fileFactory.getAlignment();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,184 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.journal;
|
||||
|
||||
import java.io.File;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBase {
|
||||
|
||||
@Override
|
||||
protected SequentialFileFactory createFactory(String folder) {
|
||||
return new MappedSequentialFileFactory(new File(folder));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterrupts() throws Throwable {
|
||||
|
||||
final EncodingSupport fakeEncoding = new EncodingSupport() {
|
||||
@Override
|
||||
public int getEncodeSize() {
|
||||
return 10;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(ActiveMQBuffer buffer) {
|
||||
buffer.writeBytes(new byte[10]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ActiveMQBuffer buffer) {
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
final AtomicInteger calls = new AtomicInteger(0);
|
||||
final MappedSequentialFileFactory factory = new MappedSequentialFileFactory(new File(getTestDir()), (code, message, file) -> {
|
||||
new Exception("shutdown").printStackTrace();
|
||||
calls.incrementAndGet();
|
||||
});
|
||||
|
||||
Thread threadOpen = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Thread.currentThread().interrupt();
|
||||
SequentialFile file = factory.createSequentialFile("file.txt");
|
||||
file.open();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
threadOpen.start();
|
||||
threadOpen.join();
|
||||
|
||||
Thread threadClose = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
SequentialFile file = factory.createSequentialFile("file.txt");
|
||||
file.open();
|
||||
file.write(fakeEncoding, true);
|
||||
Thread.currentThread().interrupt();
|
||||
file.close();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
threadClose.start();
|
||||
threadClose.join();
|
||||
|
||||
Thread threadWrite = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
SequentialFile file = factory.createSequentialFile("file.txt");
|
||||
file.open();
|
||||
Thread.currentThread().interrupt();
|
||||
file.write(fakeEncoding, true);
|
||||
file.close();
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
threadWrite.start();
|
||||
threadWrite.join();
|
||||
|
||||
Thread threadFill = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
SequentialFile file = factory.createSequentialFile("file.txt");
|
||||
file.open();
|
||||
Thread.currentThread().interrupt();
|
||||
file.fill(1024);
|
||||
file.close();
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
threadFill.start();
|
||||
threadFill.join();
|
||||
|
||||
Thread threadWriteDirect = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
SequentialFile file = factory.createSequentialFile("file.txt");
|
||||
file.open();
|
||||
ByteBuffer buffer = ByteBuffer.allocate(10);
|
||||
buffer.put(new byte[10]);
|
||||
Thread.currentThread().interrupt();
|
||||
file.writeDirect(buffer, true);
|
||||
file.close();
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
threadWriteDirect.start();
|
||||
threadWriteDirect.join();
|
||||
|
||||
Thread threadRead = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
SequentialFile file = factory.createSequentialFile("file.txt");
|
||||
file.open();
|
||||
file.write(fakeEncoding, true);
|
||||
file.position(0);
|
||||
ByteBuffer readBytes = ByteBuffer.allocate(fakeEncoding.getEncodeSize());
|
||||
Thread.currentThread().interrupt();
|
||||
file.read(readBytes);
|
||||
file.close();
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
threadRead.start();
|
||||
threadRead.join();
|
||||
|
||||
// An interrupt exception shouldn't issue a shutdown
|
||||
Assert.assertEquals(0, calls.get());
|
||||
}
|
||||
}
|
|
@ -22,8 +22,11 @@ import java.util.List;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.artemis.ArtemisConstants;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.LoaderCallback;
|
||||
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
||||
|
@ -102,6 +105,27 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
|
|||
internalTest("nio2", getTestDir(), 10000, 0, true, true, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMMap() throws Exception {
|
||||
internalTest("mmap", getTestDir(), 10000, 100, true, true, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMMAPHugeTransaction() throws Exception {
|
||||
internalTest("mmap", getTestDir(), 10000, 10000, true, true, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMMAPOMultiThread() throws Exception {
|
||||
internalTest("mmap", getTestDir(), 1000, 100, true, true, 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMMAPNonTransactional() throws Exception {
|
||||
internalTest("mmap", getTestDir(), 10000, 0, true, true, 1);
|
||||
}
|
||||
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
||||
private void internalTest(final String type,
|
||||
|
@ -234,7 +258,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
|
|||
|
||||
if (args.length != 5) {
|
||||
System.err.println("Use: java -cp <classpath> " + ValidateTransactionHealthTest.class.getCanonicalName() +
|
||||
" aio|nio <journalDirectory> <NumberOfElements> <TransactionSize> <NumberOfThreads>");
|
||||
" aio|nio|mmap <journalDirectory> <NumberOfElements> <TransactionSize> <NumberOfThreads>");
|
||||
System.exit(-1);
|
||||
}
|
||||
System.out.println("Running");
|
||||
|
@ -320,15 +344,22 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
public static JournalImpl createJournal(final String journalType, final String journalDir) {
|
||||
JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir), "journaltst", "tst", 500);
|
||||
JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir, 10485760), "journaltst", "tst", 500);
|
||||
return journal;
|
||||
}
|
||||
|
||||
public static SequentialFileFactory getFactory(final String factoryType, final String directory) {
|
||||
public static SequentialFileFactory getFactory(final String factoryType, final String directory, int fileSize) {
|
||||
if (factoryType.equals("aio")) {
|
||||
return new AIOSequentialFileFactory(new File(directory), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 10, false);
|
||||
} else if (factoryType.equals("nio2")) {
|
||||
return new NIOSequentialFileFactory(new File(directory), true, 1);
|
||||
} else if (factoryType.equals("mmap")) {
|
||||
return new MappedSequentialFileFactory(new File(directory), new IOCriticalErrorListener() {
|
||||
@Override
|
||||
public void onIOException(Throwable code, String message, SequentialFile file) {
|
||||
code.printStackTrace();
|
||||
}
|
||||
}, true).chunkBytes(fileSize).overlapBytes(0);
|
||||
} else {
|
||||
return new NIOSequentialFileFactory(new File(directory), false, 1);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue