ARTEMIS-1266 Mapped Journal refactoring
The MAPPED journal refactoring include: - simplified lifecycle and logic (eg fixed file size with single mmap memory region) - supports for the TimedBuffer to coalesce msyncs (via Decorator pattern) - TLAB pooling of direct ByteBuffer like the NIO journal - remove of old benchmarks and benchmark dependencies
This commit is contained in:
parent
43dcc572e1
commit
7075e2e457
|
@ -24,7 +24,6 @@ import java.text.DecimalFormat;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||
|
@ -190,13 +189,9 @@ public class SyncCalculation {
|
|||
((AIOSequentialFileFactory) factory).disableBufferReuse();
|
||||
return factory;
|
||||
case MAPPED:
|
||||
factory = new MappedSequentialFileFactory(datafolder, new IOCriticalErrorListener() {
|
||||
@Override
|
||||
public void onIOException(Throwable code, String message, SequentialFile file) {
|
||||
|
||||
}
|
||||
}, true).chunkBytes(fileSize).overlapBytes(0).setDatasync(datasync);
|
||||
|
||||
factory = MappedSequentialFileFactory.unbuffered(datafolder, fileSize, null)
|
||||
.setDatasync(datasync)
|
||||
.disableBufferReuse();
|
||||
factory.start();
|
||||
return factory;
|
||||
default:
|
||||
|
|
|
@ -29,8 +29,32 @@ final class BytesUtils {
|
|||
return (value + (alignment - 1)) & ~(alignment - 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is a value a positive power of two.
|
||||
*
|
||||
* @param value to be checked.
|
||||
* @return true if the number is a positive power of two otherwise false.
|
||||
*/
|
||||
public static boolean isPowOf2(final int value) {
|
||||
return Integer.bitCount(value) == 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if a value is pow2alignment-aligned.
|
||||
*
|
||||
* @param value to be tested.
|
||||
* @param pow2alignment boundary the address is tested against.
|
||||
* @return true if the address is on the aligned boundary otherwise false.
|
||||
* @throws IllegalArgumentException if the alignment is not a power of 2
|
||||
*/
|
||||
public static boolean isAligned(final long value, final int pow2alignment) {
|
||||
if (!isPowOf2(pow2alignment)) {
|
||||
throw new IllegalArgumentException("Alignment must be a power of 2");
|
||||
}
|
||||
return (value & (pow2alignment - 1)) == 0;
|
||||
}
|
||||
|
||||
public static void zerosDirect(final ByteBuffer buffer) {
|
||||
//TODO When PlatformDependent will be replaced by VarHandle or Unsafe, replace with safepoint-fixed setMemory
|
||||
//DANGEROUS!! erases bound-checking using directly addresses -> safe only if it use counted loops
|
||||
int remaining = buffer.capacity();
|
||||
long address = PlatformDependent.directBufferAddress(buffer);
|
||||
|
|
|
@ -1,224 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.io.mapped;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.MappedByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.FileLock;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
final class MappedByteBufferCache implements AutoCloseable {
|
||||
|
||||
public static final int PAGE_SIZE = Integer.parseInt(System.getProperty("os_page_size", "4096"));
|
||||
private static final Object FILE_LOCK = new Object();
|
||||
private final RandomAccessFile raf;
|
||||
private final FileChannel fileChannel;
|
||||
private final long chunkBytes;
|
||||
private final long overlapBytes;
|
||||
private final ArrayList<WeakReference<MappedByteBuffer>> byteBuffers;
|
||||
private final File file;
|
||||
private final long mappedSize;
|
||||
private boolean closed;
|
||||
|
||||
private MappedByteBufferCache(File file, RandomAccessFile raf, long chunkBytes, long overlapBytes, long alignment) {
|
||||
this.byteBuffers = new ArrayList<>();
|
||||
this.file = file;
|
||||
this.raf = raf;
|
||||
this.fileChannel = raf.getChannel();
|
||||
this.chunkBytes = BytesUtils.align(chunkBytes, alignment);
|
||||
this.overlapBytes = BytesUtils.align(overlapBytes, alignment);
|
||||
this.closed = false;
|
||||
this.mappedSize = this.chunkBytes + this.overlapBytes;
|
||||
}
|
||||
|
||||
public static MappedByteBufferCache of(File file, long chunkSize, long overlapSize) throws FileNotFoundException {
|
||||
final RandomAccessFile raf = new RandomAccessFile(file, "rw");
|
||||
return new MappedByteBufferCache(file, raf, chunkSize, overlapSize, PAGE_SIZE);
|
||||
}
|
||||
|
||||
public static boolean inside(long position, long mappedPosition, long mappedLimit) {
|
||||
return mappedPosition <= position && position < mappedLimit;
|
||||
}
|
||||
|
||||
public File file() {
|
||||
return file;
|
||||
}
|
||||
|
||||
public long chunkBytes() {
|
||||
return chunkBytes;
|
||||
}
|
||||
|
||||
public long overlapBytes() {
|
||||
return overlapBytes;
|
||||
}
|
||||
|
||||
public int indexFor(long position) {
|
||||
final int chunk = (int) (position / chunkBytes);
|
||||
return chunk;
|
||||
}
|
||||
|
||||
public long mappedPositionFor(int index) {
|
||||
return index * chunkBytes;
|
||||
}
|
||||
|
||||
public long mappedLimitFor(long mappedPosition) {
|
||||
return mappedPosition + chunkBytes;
|
||||
}
|
||||
|
||||
public MappedByteBuffer acquireMappedByteBuffer(final int index) throws IOException, IllegalArgumentException, IllegalStateException {
|
||||
if (closed)
|
||||
throw new IOException("Closed");
|
||||
if (index < 0)
|
||||
throw new IOException("Attempt to access a negative index: " + index);
|
||||
while (byteBuffers.size() <= index) {
|
||||
byteBuffers.add(null);
|
||||
}
|
||||
final WeakReference<MappedByteBuffer> mbbRef = byteBuffers.get(index);
|
||||
if (mbbRef != null) {
|
||||
final MappedByteBuffer mbb = mbbRef.get();
|
||||
if (mbb != null) {
|
||||
return mbb;
|
||||
}
|
||||
}
|
||||
return mapAndAcquire(index);
|
||||
}
|
||||
|
||||
//METHOD BUILT TO SEPARATE THE SLOW PATH TO ENSURE INLINING OF THE MOST OCCURRING CASE
|
||||
private MappedByteBuffer mapAndAcquire(final int index) throws IOException {
|
||||
final long chunkStartPosition = mappedPositionFor(index);
|
||||
final long minSize = chunkStartPosition + mappedSize;
|
||||
if (fileChannel.size() < minSize) {
|
||||
try {
|
||||
synchronized (FILE_LOCK) {
|
||||
try (FileLock lock = fileChannel.lock()) {
|
||||
final long size = fileChannel.size();
|
||||
if (size < minSize) {
|
||||
raf.setLength(minSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
throw new IOException("Failed to resize to " + minSize, ioe);
|
||||
}
|
||||
}
|
||||
|
||||
final MappedByteBuffer mbb = fileChannel.map(FileChannel.MapMode.READ_WRITE, chunkStartPosition, mappedSize);
|
||||
mbb.order(ByteOrder.nativeOrder());
|
||||
byteBuffers.set(index, new WeakReference<>(mbb));
|
||||
return mbb;
|
||||
}
|
||||
|
||||
public long fileSize() throws IOException {
|
||||
if (closed)
|
||||
throw new IllegalStateException("Closed");
|
||||
return fileChannel.size();
|
||||
}
|
||||
|
||||
public void closeAndResize(long length) {
|
||||
if (!closed) {
|
||||
//TO_FIX: unmap in this way is not portable BUT required on Windows that can't resize a memmory mapped file!
|
||||
for (final WeakReference<MappedByteBuffer> mbbRef : this.byteBuffers) {
|
||||
if (mbbRef != null) {
|
||||
final MappedByteBuffer mbb = mbbRef.get();
|
||||
if (mbb != null) {
|
||||
try {
|
||||
PlatformDependent.freeDirectBuffer(mbb);
|
||||
} catch (Throwable t) {
|
||||
//TO_FIX: force releasing of the other buffers
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
this.byteBuffers.clear();
|
||||
try {
|
||||
if (fileChannel.size() != length) {
|
||||
try {
|
||||
synchronized (FILE_LOCK) {
|
||||
try (FileLock lock = fileChannel.lock()) {
|
||||
final long size = fileChannel.size();
|
||||
if (size != length) {
|
||||
raf.setLength(length);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
throw new IllegalStateException("Failed to resize to " + length, ioe);
|
||||
}
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
throw new IllegalStateException("Failed to get size", ex);
|
||||
} finally {
|
||||
try {
|
||||
fileChannel.close();
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("Failed to close channel", e);
|
||||
} finally {
|
||||
try {
|
||||
raf.close();
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("Failed to close RandomAccessFile", e);
|
||||
}
|
||||
}
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isClosed() {
|
||||
return closed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (!closed) {
|
||||
//TO_FIX: unmap in this way is not portable BUT required on Windows that can't resize a memory mapped file!
|
||||
for (final WeakReference<MappedByteBuffer> mbbRef : this.byteBuffers) {
|
||||
if (mbbRef != null) {
|
||||
final MappedByteBuffer mbb = mbbRef.get();
|
||||
if (mbb != null) {
|
||||
try {
|
||||
PlatformDependent.freeDirectBuffer(mbb);
|
||||
} catch (Throwable t) {
|
||||
//TO_FIX: force releasing of the other buffers
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
this.byteBuffers.clear();
|
||||
try {
|
||||
fileChannel.close();
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("Failed to close channel", e);
|
||||
} finally {
|
||||
try {
|
||||
raf.close();
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("Failed to close RandomAccessFile", e);
|
||||
}
|
||||
}
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,79 +18,66 @@ package org.apache.activemq.artemis.core.io.mapped;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.BufferUnderflowException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.MappedByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.UnpooledUnsafeDirectByteBufWrapper;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.utils.Env;
|
||||
|
||||
final class MappedFile implements AutoCloseable {
|
||||
|
||||
private final MappedByteBufferCache cache;
|
||||
private static final int OS_PAGE_SIZE = Env.osPageSize();
|
||||
private final MappedByteBuffer buffer;
|
||||
private final FileChannel channel;
|
||||
private final long address;
|
||||
private final UnpooledUnsafeDirectByteBufWrapper byteBufWrapper;
|
||||
private final ChannelBufferWrapper channelBufferWrapper;
|
||||
private MappedByteBuffer lastMapped;
|
||||
private long lastMappedStart;
|
||||
private long lastMappedLimit;
|
||||
private long position;
|
||||
private long length;
|
||||
private int position;
|
||||
private int length;
|
||||
|
||||
private MappedFile(MappedByteBufferCache cache) throws IOException {
|
||||
this.cache = cache;
|
||||
this.lastMapped = null;
|
||||
this.lastMappedStart = -1;
|
||||
this.lastMappedLimit = -1;
|
||||
this.position = 0;
|
||||
this.length = this.cache.fileSize();
|
||||
private MappedFile(FileChannel channel, MappedByteBuffer byteBuffer, int position, int length) throws IOException {
|
||||
this.channel = channel;
|
||||
this.buffer = byteBuffer;
|
||||
this.position = position;
|
||||
this.length = length;
|
||||
this.byteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper();
|
||||
this.channelBufferWrapper = new ChannelBufferWrapper(this.byteBufWrapper, false);
|
||||
this.address = PlatformDependent.directBufferAddress(buffer);
|
||||
}
|
||||
|
||||
public static MappedFile of(File file, long chunckSize, long overlapSize) throws IOException {
|
||||
return new MappedFile(MappedByteBufferCache.of(file, chunckSize, overlapSize));
|
||||
}
|
||||
|
||||
public MappedByteBufferCache cache() {
|
||||
return cache;
|
||||
}
|
||||
|
||||
private int checkOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
|
||||
if (!MappedByteBufferCache.inside(offset, lastMappedStart, lastMappedLimit)) {
|
||||
return updateOffset(offset, bytes);
|
||||
} else {
|
||||
final int bufferPosition = (int) (offset - lastMappedStart);
|
||||
return bufferPosition;
|
||||
public static MappedFile of(File file, int position, int capacity) throws IOException {
|
||||
final MappedByteBuffer buffer;
|
||||
final int length;
|
||||
final FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
|
||||
length = (int) channel.size();
|
||||
if (length != capacity && length != 0) {
|
||||
channel.close();
|
||||
throw new IllegalStateException("the file is not " + capacity + " bytes long!");
|
||||
}
|
||||
buffer = channel.map(FileChannel.MapMode.READ_WRITE, position, capacity);
|
||||
return new MappedFile(channel, buffer, 0, length);
|
||||
}
|
||||
|
||||
private int updateOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
|
||||
try {
|
||||
final int index = cache.indexFor(offset);
|
||||
final long mappedPosition = cache.mappedPositionFor(index);
|
||||
final long mappedLimit = cache.mappedLimitFor(mappedPosition);
|
||||
if (offset + bytes > mappedLimit) {
|
||||
throw new IOException("mapping overflow!");
|
||||
}
|
||||
lastMapped = cache.acquireMappedByteBuffer(index);
|
||||
lastMappedStart = mappedPosition;
|
||||
lastMappedLimit = mappedLimit;
|
||||
final int bufferPosition = (int) (offset - mappedPosition);
|
||||
return bufferPosition;
|
||||
} catch (IllegalStateException e) {
|
||||
throw new IOException(e);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new BufferUnderflowException();
|
||||
}
|
||||
public FileChannel channel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
public MappedByteBuffer mapped() {
|
||||
return buffer;
|
||||
}
|
||||
|
||||
public long address() {
|
||||
return this.address;
|
||||
}
|
||||
|
||||
public void force() {
|
||||
if (lastMapped != null) {
|
||||
lastMapped.force();
|
||||
}
|
||||
this.buffer.force();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -98,9 +85,8 @@ final class MappedFile implements AutoCloseable {
|
|||
* <p>
|
||||
* <p> Bytes are read starting at this file's specified position.
|
||||
*/
|
||||
public int read(long position, ByteBuf dst, int dstStart, int dstLength) throws IOException {
|
||||
final int bufferPosition = checkOffset(position, dstLength);
|
||||
final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||
public int read(int position, ByteBuf dst, int dstStart, int dstLength) throws IOException {
|
||||
final long srcAddress = this.address + position;
|
||||
if (dst.hasMemoryAddress()) {
|
||||
final long dstAddress = dst.memoryAddress() + dstStart;
|
||||
PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength);
|
||||
|
@ -122,9 +108,8 @@ final class MappedFile implements AutoCloseable {
|
|||
* <p>
|
||||
* <p> Bytes are read starting at this file's specified position.
|
||||
*/
|
||||
public int read(long position, ByteBuffer dst, int dstStart, int dstLength) throws IOException {
|
||||
final int bufferPosition = checkOffset(position, dstLength);
|
||||
final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||
public int read(int position, ByteBuffer dst, int dstStart, int dstLength) throws IOException {
|
||||
final long srcAddress = this.address + position;
|
||||
if (dst.isDirect()) {
|
||||
final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart;
|
||||
PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength);
|
||||
|
@ -146,10 +131,9 @@ final class MappedFile implements AutoCloseable {
|
|||
* then the position is updated with the number of bytes actually read.
|
||||
*/
|
||||
public int read(ByteBuf dst, int dstStart, int dstLength) throws IOException {
|
||||
final int remaining = (int) Math.min(this.length - this.position, Integer.MAX_VALUE);
|
||||
final int remaining = this.length - this.position;
|
||||
final int read = Math.min(remaining, dstLength);
|
||||
final int bufferPosition = checkOffset(position, read);
|
||||
final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||
final long srcAddress = this.address + position;
|
||||
if (dst.hasMemoryAddress()) {
|
||||
final long dstAddress = dst.memoryAddress() + dstStart;
|
||||
PlatformDependent.copyMemory(srcAddress, dstAddress, read);
|
||||
|
@ -170,10 +154,9 @@ final class MappedFile implements AutoCloseable {
|
|||
* then the position is updated with the number of bytes actually read.
|
||||
*/
|
||||
public int read(ByteBuffer dst, int dstStart, int dstLength) throws IOException {
|
||||
final int remaining = (int) Math.min(this.length - this.position, Integer.MAX_VALUE);
|
||||
final int remaining = this.length - this.position;
|
||||
final int read = Math.min(remaining, dstLength);
|
||||
final int bufferPosition = checkOffset(position, read);
|
||||
final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||
final long srcAddress = this.address + position;
|
||||
if (dst.isDirect()) {
|
||||
final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart;
|
||||
PlatformDependent.copyMemory(srcAddress, dstAddress, read);
|
||||
|
@ -192,8 +175,7 @@ final class MappedFile implements AutoCloseable {
|
|||
*/
|
||||
public void write(EncodingSupport encodingSupport) throws IOException {
|
||||
final int encodedSize = encodingSupport.getEncodeSize();
|
||||
final int bufferPosition = checkOffset(position, encodedSize);
|
||||
this.byteBufWrapper.wrap(this.lastMapped, bufferPosition, encodedSize);
|
||||
this.byteBufWrapper.wrap(this.buffer, this.position, encodedSize);
|
||||
try {
|
||||
encodingSupport.encode(this.channelBufferWrapper);
|
||||
} finally {
|
||||
|
@ -211,8 +193,7 @@ final class MappedFile implements AutoCloseable {
|
|||
* <p> Bytes are written starting at this file's current position,
|
||||
*/
|
||||
public void write(ByteBuf src, int srcStart, int srcLength) throws IOException {
|
||||
final int bufferPosition = checkOffset(position, srcLength);
|
||||
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||
final long destAddress = this.address + position;
|
||||
if (src.hasMemoryAddress()) {
|
||||
final long srcAddress = src.memoryAddress() + srcStart;
|
||||
PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
|
||||
|
@ -234,8 +215,7 @@ final class MappedFile implements AutoCloseable {
|
|||
* <p> Bytes are written starting at this file's current position,
|
||||
*/
|
||||
public void write(ByteBuffer src, int srcStart, int srcLength) throws IOException {
|
||||
final int bufferPosition = checkOffset(position, srcLength);
|
||||
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||
final long destAddress = this.address + position;
|
||||
if (src.isDirect()) {
|
||||
final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart;
|
||||
PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
|
||||
|
@ -254,9 +234,8 @@ final class MappedFile implements AutoCloseable {
|
|||
* <p>
|
||||
* <p> Bytes are written starting at this file's specified position,
|
||||
*/
|
||||
public void write(long position, ByteBuf src, int srcStart, int srcLength) throws IOException {
|
||||
final int bufferPosition = checkOffset(position, srcLength);
|
||||
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||
public void write(int position, ByteBuf src, int srcStart, int srcLength) throws IOException {
|
||||
final long destAddress = this.address + position;
|
||||
if (src.hasMemoryAddress()) {
|
||||
final long srcAddress = src.memoryAddress() + srcStart;
|
||||
PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
|
||||
|
@ -277,9 +256,8 @@ final class MappedFile implements AutoCloseable {
|
|||
* <p>
|
||||
* <p> Bytes are written starting at this file's specified position,
|
||||
*/
|
||||
public void write(long position, ByteBuffer src, int srcStart, int srcLength) throws IOException {
|
||||
final int bufferPosition = checkOffset(position, srcLength);
|
||||
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||
public void write(int position, ByteBuffer src, int srcStart, int srcLength) throws IOException {
|
||||
final long destAddress = this.address + position;
|
||||
if (src.isDirect()) {
|
||||
final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart;
|
||||
PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
|
||||
|
@ -298,32 +276,47 @@ final class MappedFile implements AutoCloseable {
|
|||
* <p>
|
||||
* <p> Bytes are written starting at this file's current position,
|
||||
*/
|
||||
public void zeros(long offset, int count) throws IOException {
|
||||
while (count > 0) {
|
||||
//do not need to validate the bytes count
|
||||
final int bufferPosition = checkOffset(offset, 0);
|
||||
final int endZerosPosition = (int)Math.min((long)bufferPosition + count, lastMapped.capacity());
|
||||
final int zeros = endZerosPosition - bufferPosition;
|
||||
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||
PlatformDependent.setMemory(destAddress, zeros, (byte) 0);
|
||||
offset += zeros;
|
||||
count -= zeros;
|
||||
//TODO need to call force on each write?
|
||||
//this.force();
|
||||
public void zeros(int position, final int count) throws IOException {
|
||||
//zeroes memory in reverse direction in OS_PAGE_SIZE batches
|
||||
//to gain sympathy by the page cache LRU policy
|
||||
final long start = this.address + position;
|
||||
final long end = start + count;
|
||||
int toZeros = count;
|
||||
final long lastGap = (int) (end & (OS_PAGE_SIZE - 1));
|
||||
final long lastStartPage = end - lastGap;
|
||||
long lastZeroed = end;
|
||||
if (start <= lastStartPage) {
|
||||
if (lastGap > 0) {
|
||||
PlatformDependent.setMemory(lastStartPage, lastGap, (byte) 0);
|
||||
lastZeroed = lastStartPage;
|
||||
toZeros -= lastGap;
|
||||
}
|
||||
}
|
||||
if (offset > this.length) {
|
||||
this.length = offset;
|
||||
//any that will enter has lastZeroed OS page aligned
|
||||
while (toZeros >= OS_PAGE_SIZE) {
|
||||
assert BytesUtils.isAligned(lastZeroed, OS_PAGE_SIZE);/**/
|
||||
final long startPage = lastZeroed - OS_PAGE_SIZE;
|
||||
PlatformDependent.setMemory(startPage, OS_PAGE_SIZE, (byte) 0);
|
||||
lastZeroed = startPage;
|
||||
toZeros -= OS_PAGE_SIZE;
|
||||
}
|
||||
//there is anything left in the first OS page?
|
||||
if (toZeros > 0) {
|
||||
PlatformDependent.setMemory(start, toZeros, (byte) 0);
|
||||
}
|
||||
|
||||
position += count;
|
||||
if (position > this.length) {
|
||||
this.length = position;
|
||||
}
|
||||
}
|
||||
|
||||
public long position() {
|
||||
public int position() {
|
||||
return position;
|
||||
}
|
||||
|
||||
public long position(long newPosition) {
|
||||
final long oldPosition = this.position;
|
||||
this.position = newPosition;
|
||||
return oldPosition;
|
||||
public void position(int position) {
|
||||
this.position = position;
|
||||
}
|
||||
|
||||
public long length() {
|
||||
|
@ -332,10 +325,13 @@ final class MappedFile implements AutoCloseable {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
cache.close();
|
||||
}
|
||||
|
||||
public void closeAndResize(long length) {
|
||||
cache.closeAndResize(length);
|
||||
try {
|
||||
channel.close();
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException(e);
|
||||
} finally {
|
||||
//unmap in a deterministic way: do not rely on GC to do it
|
||||
PlatformDependent.freeDirectBuffer(this.buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,32 +38,37 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
|||
final class MappedSequentialFile implements SequentialFile {
|
||||
|
||||
private final File directory;
|
||||
private final long chunkBytes;
|
||||
private final long overlapBytes;
|
||||
private final IOCriticalErrorListener criticalErrorListener;
|
||||
private final MappedSequentialFileFactory factory;
|
||||
private File file;
|
||||
private File absoluteFile;
|
||||
private String fileName;
|
||||
private MappedFile mappedFile;
|
||||
private int capacity;
|
||||
|
||||
MappedSequentialFile(MappedSequentialFileFactory factory,
|
||||
final File directory,
|
||||
final File file,
|
||||
final long chunkBytes,
|
||||
final long overlapBytes,
|
||||
final int capacity,
|
||||
final IOCriticalErrorListener criticalErrorListener) {
|
||||
this.factory = factory;
|
||||
this.directory = directory;
|
||||
this.file = file;
|
||||
this.absoluteFile = null;
|
||||
this.fileName = null;
|
||||
this.chunkBytes = chunkBytes;
|
||||
this.overlapBytes = overlapBytes;
|
||||
this.capacity = capacity;
|
||||
this.mappedFile = null;
|
||||
this.criticalErrorListener = criticalErrorListener;
|
||||
}
|
||||
|
||||
public MappedFile mappedFile() {
|
||||
return mappedFile;
|
||||
}
|
||||
|
||||
public int capacity() {
|
||||
return this.capacity;
|
||||
}
|
||||
|
||||
private void checkIsOpen() {
|
||||
if (!isOpen()) {
|
||||
throw new IllegalStateException("File not opened!");
|
||||
|
@ -95,7 +100,7 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
@Override
|
||||
public void open() throws IOException {
|
||||
if (this.mappedFile == null) {
|
||||
this.mappedFile = MappedFile.of(file, chunkBytes, overlapBytes);
|
||||
this.mappedFile = MappedFile.of(this.file, 0, this.capacity);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -129,7 +134,11 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
@Override
|
||||
public void fill(int size) throws IOException {
|
||||
checkIsOpen();
|
||||
//the fill will give a big performance hit when done in parallel of other writings!
|
||||
this.mappedFile.zeros(this.mappedFile.position(), size);
|
||||
if (factory.isDatasync()) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -214,11 +223,11 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
|
||||
@Override
|
||||
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
|
||||
if (callback == null) {
|
||||
throw new NullPointerException("callback parameter need to be set");
|
||||
}
|
||||
checkIsOpen(callback);
|
||||
try {
|
||||
if (callback == null) {
|
||||
throw new NullPointerException("callback parameter need to be set");
|
||||
}
|
||||
checkIsOpen(callback);
|
||||
final int position = bytes.position();
|
||||
final int limit = bytes.limit();
|
||||
final int remaining = limit - position;
|
||||
|
@ -237,22 +246,28 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
}
|
||||
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
this.factory.releaseBuffer(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeDirect(ByteBuffer bytes, boolean sync) throws IOException {
|
||||
checkIsOpen();
|
||||
final int position = bytes.position();
|
||||
final int limit = bytes.limit();
|
||||
final int remaining = limit - position;
|
||||
if (remaining > 0) {
|
||||
this.mappedFile.write(bytes, position, remaining);
|
||||
final int newPosition = position + remaining;
|
||||
bytes.position(newPosition);
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
try {
|
||||
checkIsOpen();
|
||||
final int position = bytes.position();
|
||||
final int limit = bytes.limit();
|
||||
final int remaining = limit - position;
|
||||
if (remaining > 0) {
|
||||
this.mappedFile.write(bytes, position, remaining);
|
||||
final int newPosition = position + remaining;
|
||||
bytes.position(newPosition);
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.factory.releaseBuffer(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -304,8 +319,11 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
|
||||
@Override
|
||||
public void position(long pos) {
|
||||
if (pos > Integer.MAX_VALUE) {
|
||||
throw new IllegalArgumentException("pos must be < " + Integer.MAX_VALUE);
|
||||
}
|
||||
checkIsOpen();
|
||||
this.mappedFile.position(pos);
|
||||
this.mappedFile.position((int) pos);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -317,7 +335,7 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
@Override
|
||||
public void close() {
|
||||
if (this.mappedFile != null) {
|
||||
this.mappedFile.closeAndResize(this.mappedFile.length());
|
||||
this.mappedFile.close();
|
||||
this.mappedFile = null;
|
||||
}
|
||||
}
|
||||
|
@ -325,7 +343,9 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
@Override
|
||||
public void sync() throws IOException {
|
||||
checkIsOpen();
|
||||
this.mappedFile.force();
|
||||
if (factory.isDatasync()) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -363,9 +383,9 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
}
|
||||
|
||||
@Override
|
||||
public SequentialFile cloneFile() {
|
||||
public MappedSequentialFile cloneFile() {
|
||||
checkIsNotOpen();
|
||||
return new MappedSequentialFile(factory, this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener);
|
||||
return new MappedSequentialFile(this.factory, this.directory, this.file, this.capacity, this.criticalErrorListener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -404,4 +424,4 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
}
|
||||
return this.absoluteFile;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -29,62 +29,64 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
|||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
|
||||
import org.apache.activemq.artemis.utils.Env;
|
||||
|
||||
public final class MappedSequentialFileFactory implements SequentialFileFactory {
|
||||
|
||||
private static long DEFAULT_BLOCK_SIZE = 64L << 20;
|
||||
private final File directory;
|
||||
private int capacity;
|
||||
private final IOCriticalErrorListener criticalErrorListener;
|
||||
private final TimedBuffer timedBuffer;
|
||||
private long chunkBytes;
|
||||
private long overlapBytes;
|
||||
private boolean useDataSync;
|
||||
private boolean supportCallbacks;
|
||||
private boolean bufferPooling;
|
||||
//pools only the biggest one -> optimized for the common case
|
||||
private final ThreadLocal<ByteBuffer> bytesPool;
|
||||
|
||||
protected volatile int alignment = -1;
|
||||
|
||||
public MappedSequentialFileFactory(File directory,
|
||||
IOCriticalErrorListener criticalErrorListener,
|
||||
boolean supportCallbacks) {
|
||||
private MappedSequentialFileFactory(File directory,
|
||||
int capacity,
|
||||
final boolean buffered,
|
||||
final int bufferSize,
|
||||
final int bufferTimeout,
|
||||
IOCriticalErrorListener criticalErrorListener) {
|
||||
this.directory = directory;
|
||||
this.capacity = capacity;
|
||||
this.criticalErrorListener = criticalErrorListener;
|
||||
this.chunkBytes = DEFAULT_BLOCK_SIZE;
|
||||
this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
|
||||
this.useDataSync = true;
|
||||
this.timedBuffer = null;
|
||||
this.supportCallbacks = supportCallbacks;
|
||||
if (buffered && bufferTimeout > 0 && bufferSize > 0) {
|
||||
timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, false);
|
||||
} else {
|
||||
timedBuffer = null;
|
||||
}
|
||||
this.bufferPooling = true;
|
||||
this.bytesPool = new ThreadLocal<>();
|
||||
}
|
||||
|
||||
public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
|
||||
this(directory, criticalErrorListener, false);
|
||||
}
|
||||
|
||||
public MappedSequentialFileFactory(File directory) {
|
||||
this(directory, null);
|
||||
}
|
||||
|
||||
|
||||
public long chunkBytes() {
|
||||
return chunkBytes;
|
||||
}
|
||||
|
||||
public MappedSequentialFileFactory chunkBytes(long chunkBytes) {
|
||||
this.chunkBytes = chunkBytes;
|
||||
public MappedSequentialFileFactory capacity(int capacity) {
|
||||
this.capacity = capacity;
|
||||
return this;
|
||||
}
|
||||
|
||||
public long overlapBytes() {
|
||||
return overlapBytes;
|
||||
public int capacity() {
|
||||
return capacity;
|
||||
}
|
||||
|
||||
public MappedSequentialFileFactory overlapBytes(long overlapBytes) {
|
||||
this.overlapBytes = overlapBytes;
|
||||
return this;
|
||||
public static MappedSequentialFileFactory buffered(File directory,
|
||||
int capacity,
|
||||
final int bufferSize,
|
||||
final int bufferTimeout,
|
||||
IOCriticalErrorListener criticalErrorListener) {
|
||||
return new MappedSequentialFileFactory(directory, capacity, true, bufferSize, bufferTimeout, criticalErrorListener);
|
||||
}
|
||||
|
||||
public static MappedSequentialFileFactory unbuffered(File directory,
|
||||
int capacity,
|
||||
IOCriticalErrorListener criticalErrorListener) {
|
||||
return new MappedSequentialFileFactory(directory, capacity, false, 0, 0, criticalErrorListener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SequentialFile createSequentialFile(String fileName) {
|
||||
final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
|
||||
final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), capacity, criticalErrorListener);
|
||||
if (this.timedBuffer == null) {
|
||||
return mappedSequentialFile;
|
||||
} else {
|
||||
|
@ -93,7 +95,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
public SequentialFileFactory setDatasync(boolean enabled) {
|
||||
public MappedSequentialFileFactory setDatasync(boolean enabled) {
|
||||
this.useDataSync = enabled;
|
||||
return this;
|
||||
}
|
||||
|
@ -120,7 +122,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
|
||||
@Override
|
||||
public boolean isSupportsCallbacks() {
|
||||
return this.supportCallbacks;
|
||||
return timedBuffer != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -132,23 +134,65 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
|
||||
@Override
|
||||
public ByteBuffer allocateDirectBuffer(final int size) {
|
||||
return ByteBuffer.allocateDirect(size);
|
||||
final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize());
|
||||
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
|
||||
byteBuffer.limit(size);
|
||||
return byteBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void releaseDirectBuffer(final ByteBuffer buffer) {
|
||||
public void releaseDirectBuffer(ByteBuffer buffer) {
|
||||
PlatformDependent.freeDirectBuffer(buffer);
|
||||
}
|
||||
|
||||
public MappedSequentialFileFactory enableBufferReuse() {
|
||||
this.bufferPooling = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MappedSequentialFileFactory disableBufferReuse() {
|
||||
this.bufferPooling = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer newBuffer(final int size) {
|
||||
return ByteBuffer.allocate(size);
|
||||
if (!this.bufferPooling) {
|
||||
return allocateDirectBuffer(size);
|
||||
} else {
|
||||
final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize());
|
||||
ByteBuffer byteBuffer = bytesPool.get();
|
||||
if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
|
||||
//do not free the old one (if any) until the new one will be released into the pool!
|
||||
byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
|
||||
} else {
|
||||
bytesPool.set(null);
|
||||
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0);
|
||||
byteBuffer.clear();
|
||||
}
|
||||
byteBuffer.limit(size);
|
||||
return byteBuffer;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void releaseBuffer(ByteBuffer buffer) {
|
||||
if (buffer.isDirect()) {
|
||||
PlatformDependent.freeDirectBuffer(buffer);
|
||||
if (this.bufferPooling) {
|
||||
if (buffer.isDirect()) {
|
||||
final ByteBuffer byteBuffer = bytesPool.get();
|
||||
if (byteBuffer != buffer) {
|
||||
//replace with the current pooled only if greater or null
|
||||
if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) {
|
||||
if (byteBuffer != null) {
|
||||
//free the smaller one
|
||||
PlatformDependent.freeDirectBuffer(byteBuffer);
|
||||
}
|
||||
bytesPool.set(buffer);
|
||||
} else {
|
||||
PlatformDependent.freeDirectBuffer(buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -179,9 +223,9 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public MappedSequentialFileFactory setAlignment(int alignment) {
|
||||
this.alignment = alignment;
|
||||
return this;
|
||||
throw new UnsupportedOperationException("alignment can't be changed!");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -203,7 +247,6 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
//SIMD OPTIMIZATION
|
||||
Arrays.fill(array, (byte) 0);
|
||||
} else {
|
||||
//TODO VERIFY IF IT COULD HAPPENS
|
||||
final int capacity = buffer.capacity();
|
||||
for (int i = 0; i < capacity; i++) {
|
||||
buffer.put(i, (byte) 0);
|
||||
|
|
|
@ -20,14 +20,10 @@ package org.apache.activemq.artemis.core.io.mapped;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.core.io.DummyCallback;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
|
@ -35,6 +31,7 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
|||
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
|
||||
import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
|
||||
final class TimedSequentialFile implements SequentialFile {
|
||||
|
@ -42,14 +39,12 @@ final class TimedSequentialFile implements SequentialFile {
|
|||
private final SequentialFileFactory factory;
|
||||
private final SequentialFile sequentialFile;
|
||||
private final LocalBufferObserver observer;
|
||||
private final ThreadLocal<ResettableIOCallback> callbackPool;
|
||||
private TimedBuffer timedBuffer;
|
||||
|
||||
TimedSequentialFile(SequentialFileFactory factory, SequentialFile sequentialFile) {
|
||||
this.sequentialFile = sequentialFile;
|
||||
this.factory = factory;
|
||||
this.observer = new LocalBufferObserver();
|
||||
this.callbackPool = ThreadLocal.withInitial(ResettableIOCallback::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -114,13 +109,10 @@ final class TimedSequentialFile implements SequentialFile {
|
|||
public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
|
||||
if (sync) {
|
||||
if (this.timedBuffer != null) {
|
||||
final ResettableIOCallback callback = callbackPool.get();
|
||||
try {
|
||||
this.timedBuffer.addBytes(bytes, true, callback);
|
||||
callback.waitCompletion();
|
||||
} finally {
|
||||
callback.reset();
|
||||
}
|
||||
//the only way to avoid allocations is by using a lock-free pooled callback -> CyclicBarrier allocates on each new Generation!!!
|
||||
final SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
|
||||
this.timedBuffer.addBytes(bytes, true, callback);
|
||||
callback.waitCompletion();
|
||||
} else {
|
||||
this.sequentialFile.write(bytes, true);
|
||||
}
|
||||
|
@ -146,13 +138,10 @@ final class TimedSequentialFile implements SequentialFile {
|
|||
public void write(EncodingSupport bytes, boolean sync) throws Exception {
|
||||
if (sync) {
|
||||
if (this.timedBuffer != null) {
|
||||
final ResettableIOCallback callback = callbackPool.get();
|
||||
try {
|
||||
this.timedBuffer.addBytes(bytes, true, callback);
|
||||
callback.waitCompletion();
|
||||
} finally {
|
||||
callback.reset();
|
||||
}
|
||||
//the only way to avoid allocations is by using a lock-free pooled callback -> CyclicBarrier allocates on each new Generation!!!
|
||||
final SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
|
||||
this.timedBuffer.addBytes(bytes, true, callback);
|
||||
callback.waitCompletion();
|
||||
} else {
|
||||
this.sequentialFile.write(bytes, true);
|
||||
}
|
||||
|
@ -197,7 +186,11 @@ final class TimedSequentialFile implements SequentialFile {
|
|||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
this.sequentialFile.close();
|
||||
try {
|
||||
this.sequentialFile.close();
|
||||
} finally {
|
||||
this.timedBuffer = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -241,128 +234,99 @@ final class TimedSequentialFile implements SequentialFile {
|
|||
return this.sequentialFile.getJavaFile();
|
||||
}
|
||||
|
||||
private static final class ResettableIOCallback implements IOCallback {
|
||||
|
||||
private final CyclicBarrier cyclicBarrier;
|
||||
private int errorCode;
|
||||
private String errorMessage;
|
||||
|
||||
ResettableIOCallback() {
|
||||
this.cyclicBarrier = new CyclicBarrier(2);
|
||||
}
|
||||
|
||||
public void waitCompletion() throws InterruptedException, ActiveMQException, BrokenBarrierException {
|
||||
this.cyclicBarrier.await();
|
||||
if (this.errorMessage != null) {
|
||||
throw ActiveMQExceptionType.createException(this.errorCode, this.errorMessage);
|
||||
private static void invokeDoneOn(List<? extends IOCallback> callbacks) {
|
||||
final int size = callbacks.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
try {
|
||||
final IOCallback callback = callbacks.get(i);
|
||||
callback.done();
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
this.errorCode = 0;
|
||||
this.errorMessage = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void done() {
|
||||
private static void invokeOnErrorOn(final int errorCode,
|
||||
final String errorMessage,
|
||||
List<? extends IOCallback> callbacks) {
|
||||
final int size = callbacks.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
try {
|
||||
this.cyclicBarrier.await();
|
||||
} catch (BrokenBarrierException | InterruptedException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
try {
|
||||
this.errorCode = errorCode;
|
||||
this.errorMessage = errorMessage;
|
||||
this.cyclicBarrier.await();
|
||||
} catch (BrokenBarrierException | InterruptedException e) {
|
||||
throw new IllegalStateException(e);
|
||||
final IOCallback callback = callbacks.get(i);
|
||||
callback.onError(errorCode, errorMessage);
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final class DelegateCallback implements IOCallback {
|
||||
|
||||
final List<IOCallback> delegates;
|
||||
List<IOCallback> delegates;
|
||||
|
||||
private DelegateCallback() {
|
||||
this.delegates = new ArrayList<>();
|
||||
}
|
||||
|
||||
public List<IOCallback> delegates() {
|
||||
return this.delegates;
|
||||
this.delegates = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void done() {
|
||||
final int size = delegates.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
try {
|
||||
final IOCallback callback = delegates.get(i);
|
||||
callback.done();
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
|
||||
}
|
||||
}
|
||||
invokeDoneOn(delegates);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final int errorCode, final String errorMessage) {
|
||||
for (IOCallback callback : delegates) {
|
||||
try {
|
||||
callback.onError(errorCode, errorMessage);
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
|
||||
}
|
||||
}
|
||||
invokeOnErrorOn(errorCode, errorMessage, delegates);
|
||||
}
|
||||
}
|
||||
|
||||
private final class LocalBufferObserver implements TimedBufferObserver {
|
||||
|
||||
private final ThreadLocal<DelegateCallback> callbacksPool = ThreadLocal.withInitial(DelegateCallback::new);
|
||||
private final DelegateCallback delegateCallback = new DelegateCallback();
|
||||
|
||||
@Override
|
||||
public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) {
|
||||
buffer.flip();
|
||||
|
||||
if (buffer.limit() == 0) {
|
||||
//if there are no bytes to flush, can release the callbacks
|
||||
final int size = callbacks.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
callbacks.get(i).done();
|
||||
try {
|
||||
invokeDoneOn(callbacks);
|
||||
} finally {
|
||||
factory.releaseBuffer(buffer);
|
||||
}
|
||||
} else {
|
||||
final DelegateCallback delegateCallback = callbacksPool.get();
|
||||
final int size = callbacks.size();
|
||||
final List<IOCallback> delegates = delegateCallback.delegates();
|
||||
for (int i = 0; i < size; i++) {
|
||||
delegates.add(callbacks.get(i));
|
||||
}
|
||||
try {
|
||||
sequentialFile.writeDirect(buffer, requestedSync, delegateCallback);
|
||||
} finally {
|
||||
delegates.clear();
|
||||
if (callbacks.isEmpty()) {
|
||||
try {
|
||||
sequentialFile.writeDirect(buffer, requestedSync);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
} else {
|
||||
delegateCallback.delegates = callbacks;
|
||||
try {
|
||||
sequentialFile.writeDirect(buffer, requestedSync, delegateCallback);
|
||||
} finally {
|
||||
delegateCallback.delegates = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer newBuffer(final int size, final int limit) {
|
||||
final int alignedSize = factory.calculateBlockSize(size);
|
||||
final int alignedLimit = factory.calculateBlockSize(limit);
|
||||
final ByteBuffer buffer = factory.newBuffer(alignedSize);
|
||||
buffer.limit(alignedLimit);
|
||||
return buffer;
|
||||
return factory.newBuffer(limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRemainingBytes() {
|
||||
try {
|
||||
final int remaining = (int) Math.min(sequentialFile.size() - sequentialFile.position(), Integer.MAX_VALUE);
|
||||
return remaining;
|
||||
final long position = sequentialFile.position();
|
||||
final long size = sequentialFile.size();
|
||||
final long remaining = size - position;
|
||||
if (remaining > Integer.MAX_VALUE) {
|
||||
return Integer.MAX_VALUE;
|
||||
} else {
|
||||
return (int) remaining;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
@ -370,7 +334,7 @@ final class TimedSequentialFile implements SequentialFile {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TimedBufferObserver on file (" + getFileName() + ")";
|
||||
return "TimedBufferObserver on file (" + sequentialFile.getFileName() + ")";
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
|||
import org.apache.activemq.artemis.core.journal.Journal;
|
||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||
|
||||
/**
|
||||
|
@ -47,12 +46,12 @@ public class JournalTptBenchmark {
|
|||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final boolean useDefaultIoExecutor = true;
|
||||
final int fileSize = 1024 * 1024;
|
||||
final boolean dataSync = true;
|
||||
final int fileSize = 10 * 1024 * 1024;
|
||||
final boolean dataSync = false;
|
||||
final Type type = Type.Mapped;
|
||||
final int tests = 5;
|
||||
final int tests = 10;
|
||||
final int warmup = 20_000;
|
||||
final int measurements = 20_000;
|
||||
final int measurements = 100_000;
|
||||
final int msgSize = 100;
|
||||
final byte[] msgContent = new byte[msgSize];
|
||||
Arrays.fill(msgContent, (byte) 1);
|
||||
|
@ -63,8 +62,8 @@ public class JournalTptBenchmark {
|
|||
switch (type) {
|
||||
|
||||
case Mapped:
|
||||
final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true);
|
||||
factory = mappedFactory.chunkBytes(fileSize).overlapBytes(0).setDatasync(dataSync);
|
||||
factory = MappedSequentialFileFactory.buffered(tmpDirectory, fileSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null)
|
||||
.setDatasync(dataSync);
|
||||
break;
|
||||
case Nio:
|
||||
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
|
||||
|
@ -195,9 +194,7 @@ public class JournalTptBenchmark {
|
|||
|
||||
private static void write(long id, Journal journal, EncodingSupport encodingSupport) throws Exception {
|
||||
journal.appendAddRecord(id, (byte) 1, encodingSupport, false);
|
||||
final SimpleWaitIOCallback ioCallback = new SimpleWaitIOCallback();
|
||||
journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true, ioCallback);
|
||||
ioCallback.waitCompletion();
|
||||
journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true);
|
||||
}
|
||||
|
||||
private enum Type {
|
||||
|
|
|
@ -21,7 +21,6 @@ import java.io.File;
|
|||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
|
||||
import org.apache.activemq.artemis.ArtemisConstants;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
|
@ -41,12 +40,12 @@ public class SequentialFileTptBenchmark {
|
|||
private static final FastWaitIOCallback CALLBACK = new FastWaitIOCallback();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final boolean dataSync = true;
|
||||
final boolean dataSync = false;
|
||||
final boolean writeSync = true;
|
||||
final Type type = Type.Mapped;
|
||||
final int tests = 10;
|
||||
final int warmup = 20_000;
|
||||
final int measurements = 20_000;
|
||||
final int measurements = 100_000;
|
||||
final int msgSize = 100;
|
||||
final byte[] msgContent = new byte[msgSize];
|
||||
Arrays.fill(msgContent, (byte) 1);
|
||||
|
@ -56,10 +55,8 @@ public class SequentialFileTptBenchmark {
|
|||
switch (type) {
|
||||
|
||||
case Mapped:
|
||||
final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true);
|
||||
final int alignedMessageSize = mappedFactory.calculateBlockSize(msgSize);
|
||||
final int totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup);
|
||||
factory = mappedFactory.chunkBytes(totalFileSize).overlapBytes(0).setDatasync(dataSync);
|
||||
final int fileSize = Math.max(msgSize * measurements, msgSize * warmup);
|
||||
factory = MappedSequentialFileFactory.buffered(tmpDirectory, fileSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync);
|
||||
break;
|
||||
case Nio:
|
||||
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
|
||||
|
@ -147,9 +144,9 @@ public class SequentialFileTptBenchmark {
|
|||
boolean sync) throws Exception {
|
||||
//this pattern is necessary to ensure that NIO's TimedBuffer fill flush the buffer and know the real size of it
|
||||
if (sequentialFile.fits(encodingSupport.getEncodeSize())) {
|
||||
final FastWaitIOCallback ioCallback = CALLBACK.reset();
|
||||
sequentialFile.write(encodingSupport, sync, ioCallback);
|
||||
ioCallback.waitCompletion();
|
||||
CALLBACK.reset();
|
||||
sequentialFile.write(encodingSupport, sync, CALLBACK);
|
||||
CALLBACK.waitCompletion();
|
||||
} else {
|
||||
throw new IllegalStateException("can't happen!");
|
||||
}
|
||||
|
@ -189,11 +186,7 @@ public class SequentialFileTptBenchmark {
|
|||
}
|
||||
|
||||
public void waitCompletion() throws InterruptedException, ActiveMQException {
|
||||
final Thread currentThread = Thread.currentThread();
|
||||
while (!done.get()) {
|
||||
LockSupport.parkNanos(1L);
|
||||
if (currentThread.isInterrupted())
|
||||
throw new InterruptedException();
|
||||
}
|
||||
if (errorMessage != null) {
|
||||
throw ActiveMQExceptionType.createException(errorCode, errorMessage);
|
||||
|
|
|
@ -137,8 +137,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
|||
break;
|
||||
case MAPPED:
|
||||
ActiveMQServerLogger.LOGGER.journalUseMAPPED();
|
||||
//the mapped version do not need buffering by default
|
||||
journalFF = new MappedSequentialFileFactory(config.getJournalLocation(), criticalErrorListener, true).chunkBytes(config.getJournalFileSize()).overlapBytes(0);
|
||||
journalFF = MappedSequentialFileFactory.buffered(config.getJournalLocation(), config.getJournalFileSize(), config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), criticalErrorListener);
|
||||
break;
|
||||
default:
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
|
||||
|
|
|
@ -223,34 +223,6 @@
|
|||
<artifactId>jbossjts-jacorb</artifactId>
|
||||
<version>4.17.13.Final</version>
|
||||
</dependency>
|
||||
|
||||
<!-- ### Benchmark Tools -->
|
||||
<!-- ### Java Latency Benchmarking Harness -->
|
||||
<dependency>
|
||||
<groupId>net.openhft</groupId>
|
||||
<artifactId>chronicle-core</artifactId>
|
||||
<version>${openhft.core.version}</version>
|
||||
<!-- License: Apache 2.0 -->
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.openhft</groupId>
|
||||
<artifactId>affinity</artifactId>
|
||||
<version>${openhft.affinity.version}</version>
|
||||
<!-- License: LGPLv3-->
|
||||
</dependency>
|
||||
<!-- ### Java Microbenchmark Harness -->
|
||||
<dependency>
|
||||
<groupId>org.openjdk.jmh</groupId>
|
||||
<artifactId>jmh-core</artifactId>
|
||||
<version>${openjdk.jmh.version}</version>
|
||||
<!-- License: GPLv2-->
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.openjdk.jmh</groupId>
|
||||
<artifactId>jmh-generator-annprocess</artifactId>
|
||||
<version>${openjdk.jmh.version}</version>
|
||||
<!-- License: GPLv2-->
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -1,153 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.extras.benchmarks.journal;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import net.openhft.chronicle.core.jlbh.JLBH;
|
||||
import net.openhft.chronicle.core.jlbh.JLBHOptions;
|
||||
import net.openhft.chronicle.core.jlbh.JLBHTask;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.journal.Journal;
|
||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||
|
||||
public class JournalImplLatencyBench implements JLBHTask {
|
||||
|
||||
private static final int FILE_SIZE = 1024 * 1024 * 1024;
|
||||
private static final JournalType JOURNAL_TYPE = JournalType.MAPPED;
|
||||
private static final int ITERATIONS = 100_000;
|
||||
private static final int WARMUP_ITERATIONS = 20_000;
|
||||
private static final int TARGET_THROUGHPUT = 50_000;
|
||||
private static final int TESTS = 5;
|
||||
private static int TOTAL_MESSAGES = (ITERATIONS * TESTS + WARMUP_ITERATIONS);
|
||||
private static int ENCODED_SIZE = 8;
|
||||
private static int CHUNK_BYTES = FILE_SIZE;
|
||||
private static int OVERLAP_BYTES = CHUNK_BYTES / 4;
|
||||
private final SequentialFileFactory sequentialFileFactory;
|
||||
private Journal journal;
|
||||
private EncodingSupport encodingSupport;
|
||||
private JLBH jlbh;
|
||||
private long id;
|
||||
|
||||
public JournalImplLatencyBench(SequentialFileFactory sequentialFileFactory) {
|
||||
this.sequentialFileFactory = sequentialFileFactory;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
final File journalDir = Files.createTempDirectory("seq_files").toFile();
|
||||
journalDir.deleteOnExit();
|
||||
final boolean buffered = false;
|
||||
final int bufferSize = 4096;
|
||||
final int bufferTimeout = 0;
|
||||
final int maxIO = -1;
|
||||
final boolean logRates = false;
|
||||
final IOCriticalErrorListener criticalErrorListener = null;
|
||||
final SequentialFileFactory sequentialFileFactory;
|
||||
switch (JOURNAL_TYPE) {
|
||||
case MAPPED:
|
||||
sequentialFileFactory = new MappedSequentialFileFactory(journalDir, criticalErrorListener).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES);
|
||||
break;
|
||||
case NIO:
|
||||
sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new AssertionError("!?");
|
||||
}
|
||||
final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new JournalImplLatencyBench(sequentialFileFactory));
|
||||
new JLBH(lth).start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(JLBH jlbh) {
|
||||
id = 0;
|
||||
this.jlbh = jlbh;
|
||||
int numFiles = (int) ((TOTAL_MESSAGES * 1024 + 512) / FILE_SIZE * 1.3);
|
||||
if (numFiles < 2) {
|
||||
numFiles = 2;
|
||||
}
|
||||
this.journal = new JournalImpl(FILE_SIZE, numFiles, numFiles, 0, 0, sequentialFileFactory, "activemq-data", "amq", Integer.MAX_VALUE);
|
||||
this.encodingSupport = NilEncodingSupport.Instance;
|
||||
try {
|
||||
journal.start();
|
||||
journal.load(new ArrayList<RecordInfo>(), null, null);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(long startTimeNS) {
|
||||
id++;
|
||||
try {
|
||||
journal.appendAddRecord(id, (byte) 0, encodingSupport, false);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
jlbh.sample(System.nanoTime() - startTimeNS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete() {
|
||||
try {
|
||||
journal.stop();
|
||||
for (File journalFile : sequentialFileFactory.getDirectory().listFiles()) {
|
||||
journalFile.deleteOnExit();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private enum JournalType {
|
||||
MAPPED,
|
||||
NIO
|
||||
}
|
||||
|
||||
private enum NilEncodingSupport implements EncodingSupport {
|
||||
Instance;
|
||||
|
||||
@Override
|
||||
public int getEncodeSize() {
|
||||
return ENCODED_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(ActiveMQBuffer buffer) {
|
||||
final int writerIndex = buffer.writerIndex();
|
||||
for (int i = 0; i < ENCODED_SIZE; i++) {
|
||||
buffer.writeByte((byte) 0);
|
||||
}
|
||||
buffer.writerIndex(writerIndex + ENCODED_SIZE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ActiveMQBuffer buffer) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,105 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
/**
|
||||
* IT IS NOT A FLYWEIGHT BUT AN ENCODER: NEED TO RESPECT THE SEQUENCE OF WRITE:
|
||||
* FileId<CompactCount<Id<RecordType<RecordBytes
|
||||
*/
|
||||
final class AddJournalRecordEncoder {
|
||||
|
||||
private static final int FILE_ID_OFFSET = 0;
|
||||
private static final int COMPACT_COUNT_OFFSET = FILE_ID_OFFSET + 4;
|
||||
private static final int ID_OFFSET = COMPACT_COUNT_OFFSET + 4;
|
||||
private static final int RECORD_TYPE_OFFSET = ID_OFFSET + 8;
|
||||
public static final int BLOCK_SIZE = RECORD_TYPE_OFFSET + 4;
|
||||
|
||||
private ByteBuffer bytes;
|
||||
private int offset;
|
||||
private int limit;
|
||||
|
||||
public static int expectedSize(int recordBytes) {
|
||||
return BLOCK_SIZE + 4 + recordBytes;
|
||||
}
|
||||
|
||||
public ByteBuffer bytes() {
|
||||
return bytes;
|
||||
}
|
||||
|
||||
public int offset() {
|
||||
return this.offset;
|
||||
}
|
||||
|
||||
public int limit() {
|
||||
return this.limit;
|
||||
}
|
||||
|
||||
public void limit(int limit) {
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
public AddJournalRecordEncoder on(ByteBuffer bytes, int offset) {
|
||||
this.bytes = bytes;
|
||||
this.offset = offset;
|
||||
this.limit = offset + BLOCK_SIZE;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AddJournalRecordEncoder fileId(int value) {
|
||||
this.bytes.putInt(offset + FILE_ID_OFFSET, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
public AddJournalRecordEncoder compactCount(int value) {
|
||||
this.bytes.putInt(offset + COMPACT_COUNT_OFFSET, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
public AddJournalRecordEncoder id(long value) {
|
||||
this.bytes.putLong(offset + ID_OFFSET, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
public AddJournalRecordEncoder recordType(int value) {
|
||||
this.bytes.putLong(offset + RECORD_TYPE_OFFSET, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
public AddJournalRecordEncoder noRecord() {
|
||||
this.bytes.putInt(this.limit, 0);
|
||||
this.limit += 4;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AddJournalRecordEncoder record(final ByteBuffer recordBytes, final int recordOffset, final int recordLength) {
|
||||
this.bytes.putInt(this.limit, recordLength);
|
||||
final long dstAddr = PlatformDependent.directBufferAddress(bytes) + this.limit + 4;
|
||||
final long srcAddr = PlatformDependent.directBufferAddress(recordBytes) + recordOffset;
|
||||
PlatformDependent.copyMemory(srcAddr, dstAddr, recordLength);
|
||||
this.limit += (4 + recordLength);
|
||||
return this;
|
||||
}
|
||||
|
||||
public int encodedLength() {
|
||||
return this.limit - this.offset;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,114 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.buffer.Unpooled;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||
import org.apache.activemq.artemis.core.journal.EncoderPersister;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
|
||||
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Mode;
|
||||
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
import org.openjdk.jmh.profile.GCProfiler;
|
||||
import org.openjdk.jmh.runner.Runner;
|
||||
import org.openjdk.jmh.runner.RunnerException;
|
||||
import org.openjdk.jmh.runner.options.Options;
|
||||
import org.openjdk.jmh.runner.options.OptionsBuilder;
|
||||
|
||||
@State(Scope.Thread)
|
||||
@BenchmarkMode(value = {Mode.Throughput, Mode.SampleTime})
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
public class EncodersBench {
|
||||
|
||||
private static final int expectedEncoderSize = JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(0);
|
||||
private JournalInternalRecord record;
|
||||
private ByteBuffer byteBuffer;
|
||||
private AddJournalRecordEncoder addJournalRecordEncoder;
|
||||
private ActiveMQBuffer outBuffer;
|
||||
|
||||
public static void main(String[] args) throws RunnerException {
|
||||
final Options opt = new OptionsBuilder().include(EncodersBench.class.getSimpleName()).addProfiler(GCProfiler.class).warmupIterations(5).measurementIterations(5).forks(1).build();
|
||||
new Runner(opt).run();
|
||||
}
|
||||
|
||||
@Setup
|
||||
public void init() {
|
||||
this.byteBuffer = ByteBuffer.allocateDirect(expectedEncoderSize);
|
||||
this.byteBuffer.order(ByteOrder.nativeOrder());
|
||||
this.addJournalRecordEncoder = new AddJournalRecordEncoder();
|
||||
|
||||
this.record = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance);
|
||||
this.record.setFileID(1);
|
||||
this.record.setCompactCount((short) 1);
|
||||
this.outBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(this.record.getEncodeSize(), this.record.getEncodeSize()).order(ByteOrder.nativeOrder()));
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public int encodeAligned() {
|
||||
//Header
|
||||
final long header = JournalRecordHeader.makeHeader(JournalRecordTypes.ADD_JOURNAL, expectedEncoderSize);
|
||||
this.byteBuffer.putLong(0, header);
|
||||
//FileId<CompactCount<Id<RecordType<RecordBytes
|
||||
return addJournalRecordEncoder.on(byteBuffer, JournalRecordHeader.BYTES).fileId(1).compactCount(1).id(1L).recordType(1).noRecord().encodedLength();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public int encodeUnaligned() {
|
||||
outBuffer.clear();
|
||||
record.encode(outBuffer);
|
||||
return record.getEncodeSize();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public int encodeUnalignedWithGarbage() {
|
||||
outBuffer.clear();
|
||||
final JournalAddRecord addRecord = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance);
|
||||
addRecord.setFileID(1);
|
||||
addRecord.setCompactCount((short) 1);
|
||||
addRecord.encode(outBuffer);
|
||||
return addRecord.getEncodeSize();
|
||||
}
|
||||
|
||||
public enum ZeroEncodingSupport implements EncodingSupport {
|
||||
Instance;
|
||||
|
||||
@Override
|
||||
public int getEncodeSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(ActiveMQBuffer buffer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ActiveMQBuffer buffer) {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,80 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||
|
||||
final class GcFreeJournal extends JournalImpl {
|
||||
|
||||
private final AddJournalRecordEncoder addJournalRecordEncoder = new AddJournalRecordEncoder();
|
||||
//TODO replace with thread local pools if not single threaded!
|
||||
private ByteBuffer journalRecordBytes = null;
|
||||
|
||||
GcFreeJournal(final int fileSize,
|
||||
final int minFiles,
|
||||
final int poolSize,
|
||||
final int compactMinFiles,
|
||||
final int compactPercentage,
|
||||
final SequentialFileFactory fileFactory,
|
||||
final String filePrefix,
|
||||
final String fileExtension,
|
||||
final int maxAIO) {
|
||||
super(fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
|
||||
}
|
||||
|
||||
public static int align(final int value, final int alignment) {
|
||||
return (value + (alignment - 1)) & ~(alignment - 1);
|
||||
}
|
||||
|
||||
public void appendAddRecord(final long id,
|
||||
final int recordType,
|
||||
final ByteBuffer encodedRecord,
|
||||
final int offset,
|
||||
final int length,
|
||||
final boolean sync) throws Exception {
|
||||
final int expectedLength = JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(length);
|
||||
final int alignedLength = align(expectedLength, 8);
|
||||
switchFileIfNecessary(alignedLength);
|
||||
final JournalFile currentFile = getCurrentFile();
|
||||
final int fileId = currentFile.getRecordID();
|
||||
if (this.journalRecordBytes == null || this.journalRecordBytes.capacity() < alignedLength) {
|
||||
final int newPooledLength = align(alignedLength, 4096);
|
||||
//TODO ADD LIMITS OR WARNS IN CASE OF TOO MUCH BIGGER SIZE
|
||||
this.journalRecordBytes = ByteBuffer.allocateDirect(newPooledLength);
|
||||
this.journalRecordBytes.order(ByteOrder.nativeOrder());
|
||||
}
|
||||
final long journalRecordHeader = JournalRecordHeader.makeHeader(JournalRecordTypes.ADD_JOURNAL, expectedLength);
|
||||
this.journalRecordBytes.putLong(0, journalRecordHeader);
|
||||
//use natural stride while encoding: FileId<CompactCount<Id<RecordType<RecordBytes
|
||||
this.addJournalRecordEncoder.on(this.journalRecordBytes, JournalRecordHeader.BYTES).fileId(fileId).compactCount(0).id(id).recordType(recordType).record(encodedRecord, offset, length);
|
||||
final SequentialFile sequentialFile = currentFile.getFile();
|
||||
try {
|
||||
this.journalRecordBytes.limit(alignedLength);
|
||||
sequentialFile.writeDirect(this.journalRecordBytes, sync);
|
||||
} finally {
|
||||
this.journalRecordBytes.clear();
|
||||
}
|
||||
//TODO AVOID INDEXING WITH CONCURRENT MAP!
|
||||
}
|
||||
|
||||
}
|
|
@ -1,131 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import net.openhft.chronicle.core.jlbh.JLBH;
|
||||
import net.openhft.chronicle.core.jlbh.JLBHOptions;
|
||||
import net.openhft.chronicle.core.jlbh.JLBHTask;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||
|
||||
public class GcFreeJournalLatencyBench implements JLBHTask {
|
||||
|
||||
private static final int FILE_SIZE = JournalImpl.SIZE_HEADER + (1024 * 1024 * 1024);
|
||||
private static final JournalType JOURNAL_TYPE = JournalType.MAPPED;
|
||||
private static final int ITERATIONS = 100_000;
|
||||
private static final int WARMUP_ITERATIONS = 20_000;
|
||||
private static final int TARGET_THROUGHPUT = 500_000;
|
||||
private static final int TESTS = 5;
|
||||
private static int TOTAL_MESSAGES = (ITERATIONS * TESTS + WARMUP_ITERATIONS);
|
||||
private static int ENCODED_SIZE = 8;
|
||||
private static int CHUNK_BYTES = FILE_SIZE;
|
||||
private static int OVERLAP_BYTES = CHUNK_BYTES / 4;
|
||||
private final SequentialFileFactory sequentialFileFactory;
|
||||
private GcFreeJournal journal;
|
||||
private JLBH jlbh;
|
||||
private long id;
|
||||
private ByteBuffer encodedRecord;
|
||||
|
||||
public GcFreeJournalLatencyBench(SequentialFileFactory sequentialFileFactory) {
|
||||
this.sequentialFileFactory = sequentialFileFactory;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
final File journalDir = Files.createTempDirectory("seq_files").toFile();
|
||||
journalDir.deleteOnExit();
|
||||
final boolean buffered = false;
|
||||
final int bufferSize = 4096;
|
||||
final int bufferTimeout = 0;
|
||||
final int maxIO = -1;
|
||||
final boolean logRates = false;
|
||||
final IOCriticalErrorListener criticalErrorListener = null;
|
||||
final SequentialFileFactory sequentialFileFactory;
|
||||
switch (JOURNAL_TYPE) {
|
||||
case MAPPED:
|
||||
sequentialFileFactory = new MappedSequentialFileFactory(journalDir, criticalErrorListener).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES);
|
||||
break;
|
||||
case NIO:
|
||||
sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new AssertionError("!?");
|
||||
}
|
||||
final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new GcFreeJournalLatencyBench(sequentialFileFactory));
|
||||
new JLBH(lth).start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(JLBH jlbh) {
|
||||
id = 0;
|
||||
this.jlbh = jlbh;
|
||||
final int expectedMaxSize = GcFreeJournal.align(JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(ENCODED_SIZE), 8);
|
||||
int numFiles = (int) ((TOTAL_MESSAGES * expectedMaxSize + 512) / FILE_SIZE * 1.3);
|
||||
if (numFiles < 2) {
|
||||
numFiles = 2;
|
||||
}
|
||||
this.encodedRecord = ByteBuffer.allocateDirect(ENCODED_SIZE);
|
||||
this.encodedRecord.order(ByteOrder.nativeOrder());
|
||||
this.journal = new GcFreeJournal(FILE_SIZE, numFiles, numFiles, 0, 0, sequentialFileFactory, "activemq-data", "amq", Integer.MAX_VALUE);
|
||||
try {
|
||||
journal.start();
|
||||
journal.load(new ArrayList<RecordInfo>(), null, null);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(long startTimeNS) {
|
||||
id++;
|
||||
try {
|
||||
journal.appendAddRecord(id, (byte) 0, encodedRecord, 0, ENCODED_SIZE, false);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
jlbh.sample(System.nanoTime() - startTimeNS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete() {
|
||||
try {
|
||||
journal.stop();
|
||||
for (File journalFile : sequentialFileFactory.getDirectory().listFiles()) {
|
||||
journalFile.deleteOnExit();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private enum JournalType {
|
||||
MAPPED,
|
||||
NIO
|
||||
}
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
|
||||
|
||||
final class JournalRecordHeader {
|
||||
|
||||
public static final int BYTES = 8;
|
||||
|
||||
public static long makeHeader(final int journalRecordTypeId, final int length) {
|
||||
return ((journalRecordTypeId & 0xFFFF_FFFFL) << 32) | (length & 0xFFFF_FFFFL);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,29 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
|
||||
|
||||
/**
|
||||
* Created by developer on 18/06/16.
|
||||
*/
|
||||
final class JournalRecordTypes {
|
||||
|
||||
public static final int ADD_JOURNAL = 11;
|
||||
|
||||
private JournalRecordTypes() {
|
||||
|
||||
}
|
||||
}
|
|
@ -1,128 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.extras.benchmarks.sequentialfile;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import net.openhft.chronicle.core.jlbh.JLBH;
|
||||
import net.openhft.chronicle.core.jlbh.JLBHOptions;
|
||||
import net.openhft.chronicle.core.jlbh.JLBHTask;
|
||||
import org.apache.activemq.artemis.core.io.DummyCallback;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
|
||||
public final class SequentialFileLatencyBench implements JLBHTask {
|
||||
|
||||
private static final JournalType JOURNAL_TYPE = JournalType.MAPPED;
|
||||
//NOTE: SUPPORTED ONLY ON *NIX
|
||||
private static final boolean SHM = false;
|
||||
private static final int JOURNAL_RECORD_SIZE = 8;
|
||||
private static final int ITERATIONS = 100_000;
|
||||
private static final int WARMUP_ITERATIONS = 20_000;
|
||||
private static final int TARGET_THROUGHPUT = 500_000;
|
||||
private static final int TESTS = 5;
|
||||
private static int CHUNK_BYTES = 4096 * 1024 * 16;
|
||||
private static int OVERLAP_BYTES = CHUNK_BYTES / 4;
|
||||
private final SequentialFileFactory sequentialFileFactory;
|
||||
private SequentialFile sequentialFile;
|
||||
private ByteBuffer message;
|
||||
private JLBH jlbh;
|
||||
|
||||
public SequentialFileLatencyBench(SequentialFileFactory sequentialFileFactory) {
|
||||
this.sequentialFileFactory = sequentialFileFactory;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
final File journalDir;
|
||||
if (SHM) {
|
||||
journalDir = Files.createDirectory(Paths.get("/dev/shm/seq_files")).toFile();
|
||||
} else {
|
||||
journalDir = Files.createTempDirectory("seq_files").toFile();
|
||||
}
|
||||
journalDir.deleteOnExit();
|
||||
final boolean buffered = false;
|
||||
final int bufferSize = 4096;
|
||||
final int bufferTimeout = 0;
|
||||
final int maxIO = -1;
|
||||
final boolean logRates = false;
|
||||
final IOCriticalErrorListener criticalErrorListener = null;
|
||||
final SequentialFileFactory sequentialFileFactory;
|
||||
switch (JOURNAL_TYPE) {
|
||||
case MAPPED:
|
||||
sequentialFileFactory = new MappedSequentialFileFactory(journalDir).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES);
|
||||
break;
|
||||
case NIO:
|
||||
sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener);
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("!?");
|
||||
}
|
||||
final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new SequentialFileLatencyBench(sequentialFileFactory));
|
||||
new JLBH(lth).start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(JLBH jlbh) {
|
||||
this.jlbh = jlbh;
|
||||
this.sequentialFile = this.sequentialFileFactory.createSequentialFile(Long.toString(System.nanoTime()));
|
||||
try {
|
||||
this.sequentialFile.open(-1, false);
|
||||
final File file = this.sequentialFile.getJavaFile();
|
||||
file.deleteOnExit();
|
||||
System.out.println("sequentialFile: " + file);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
this.message = this.sequentialFileFactory.allocateDirectBuffer(JOURNAL_RECORD_SIZE).order(ByteOrder.nativeOrder());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(long startTimeNS) {
|
||||
message.position(0);
|
||||
try {
|
||||
sequentialFile.writeDirect(message, false, DummyCallback.getInstance());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
jlbh.sample(System.nanoTime() - startTimeNS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete() {
|
||||
sequentialFileFactory.releaseDirectBuffer(message);
|
||||
try {
|
||||
sequentialFile.close();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private enum JournalType {
|
||||
MAPPED,
|
||||
NIO
|
||||
}
|
||||
}
|
|
@ -22,9 +22,27 @@ import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
|||
|
||||
public class MappedImportExportTest extends NIOImportExportTest {
|
||||
|
||||
@Override
|
||||
protected void setup(int minFreeFiles, int fileSize, boolean sync) {
|
||||
super.setup(minFreeFiles, fileSize, sync);
|
||||
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setup(int minFreeFiles, int fileSize, boolean sync, int maxAIO) {
|
||||
super.setup(minFreeFiles, fileSize, sync, maxAIO);
|
||||
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setup(int minFreeFiles, int poolSize, int fileSize, boolean sync, int maxAIO) {
|
||||
super.setup(minFreeFiles, poolSize, fileSize, sync, maxAIO);
|
||||
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SequentialFileFactory getFileFactory() throws Exception {
|
||||
return new MappedSequentialFileFactory(getTestDirfile());
|
||||
return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 10 * 4096, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,24 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
|||
|
||||
public class MappedJournalCompactTest extends NIOJournalCompactTest {
|
||||
|
||||
@Override
|
||||
protected void setup(int minFreeFiles, int fileSize, boolean sync) {
|
||||
super.setup(minFreeFiles, fileSize, sync);
|
||||
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setup(int minFreeFiles, int fileSize, boolean sync, int maxAIO) {
|
||||
super.setup(minFreeFiles, fileSize, sync, maxAIO);
|
||||
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setup(int minFreeFiles, int poolSize, int fileSize, boolean sync, int maxAIO) {
|
||||
super.setup(minFreeFiles, poolSize, fileSize, sync, maxAIO);
|
||||
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SequentialFileFactory getFileFactory() throws Exception {
|
||||
File file = new File(getTestDir());
|
||||
|
@ -32,6 +50,6 @@ public class MappedJournalCompactTest extends NIOJournalCompactTest {
|
|||
|
||||
file.mkdir();
|
||||
|
||||
return new MappedSequentialFileFactory(getTestDirfile());
|
||||
return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 60 * 1024, null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,24 @@ import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestU
|
|||
|
||||
public class MappedJournalImplTest extends JournalImplTestUnit {
|
||||
|
||||
@Override
|
||||
protected void setup(int minFreeFiles, int fileSize, boolean sync) {
|
||||
super.setup(minFreeFiles, fileSize, sync);
|
||||
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setup(int minFreeFiles, int fileSize, boolean sync, int maxAIO) {
|
||||
super.setup(minFreeFiles, fileSize, sync, maxAIO);
|
||||
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setup(int minFreeFiles, int poolSize, int fileSize, boolean sync, int maxAIO) {
|
||||
super.setup(minFreeFiles, poolSize, fileSize, sync, maxAIO);
|
||||
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SequentialFileFactory getFileFactory() throws Exception {
|
||||
File file = new File(getTestDir());
|
||||
|
@ -32,7 +50,7 @@ public class MappedJournalImplTest extends JournalImplTestUnit {
|
|||
|
||||
file.mkdir();
|
||||
|
||||
return new MappedSequentialFileFactory(getTestDirfile());
|
||||
return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 10 * 1024, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -34,7 +34,7 @@ public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBa
|
|||
|
||||
@Override
|
||||
protected SequentialFileFactory createFactory(String folder) {
|
||||
return new MappedSequentialFileFactory(new File(folder));
|
||||
return MappedSequentialFileFactory.unbuffered(new File(folder), 2048, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -58,7 +58,7 @@ public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBa
|
|||
};
|
||||
|
||||
final AtomicInteger calls = new AtomicInteger(0);
|
||||
final MappedSequentialFileFactory factory = new MappedSequentialFileFactory(new File(getTestDir()), (code, message, file) -> {
|
||||
final MappedSequentialFileFactory factory = MappedSequentialFileFactory.unbuffered(new File(getTestDir()), fakeEncoding.getEncodeSize(), (code, message, file) -> {
|
||||
new Exception("shutdown").printStackTrace();
|
||||
calls.incrementAndGet();
|
||||
});
|
||||
|
|
|
@ -22,8 +22,6 @@ import java.util.List;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.artemis.ArtemisConstants;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||
|
@ -354,12 +352,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
|
|||
} else if (factoryType.equals("nio2")) {
|
||||
return new NIOSequentialFileFactory(new File(directory), true, 1);
|
||||
} else if (factoryType.equals("mmap")) {
|
||||
return new MappedSequentialFileFactory(new File(directory), new IOCriticalErrorListener() {
|
||||
@Override
|
||||
public void onIOException(Throwable code, String message, SequentialFile file) {
|
||||
code.printStackTrace();
|
||||
}
|
||||
}, true).chunkBytes(fileSize).overlapBytes(0);
|
||||
return MappedSequentialFileFactory.unbuffered(new File(directory), fileSize, null);
|
||||
} else {
|
||||
return new NIOSequentialFileFactory(new File(directory), false, 1);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue