ARTEMIS-623/ARTEMIS-622 Added memory mapped impl of Sequential File + benchs.
Added experimental GCFree Journal impl + benchs + Sequentially Encoded Aligned Binary Protocol. https://issues.apache.org/jira/browse/ARTEMIS-622 https://issues.apache.org/jira/browse/ARTEMIS-623
This commit is contained in:
parent
506dbc7fff
commit
36555a10c5
|
@ -0,0 +1,49 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.io.mapped;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
|
||||||
|
final class BytesUtils {
|
||||||
|
|
||||||
|
private BytesUtils() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static long align(final long value, final long alignment) {
|
||||||
|
return (value + (alignment - 1)) & ~(alignment - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void zerosDirect(final ByteBuffer buffer) {
|
||||||
|
//TODO When PlatformDependent will be replaced by VarHandle or Unsafe, replace with safepoint-fixed setMemory
|
||||||
|
//DANGEROUS!! erases bound-checking using directly addresses -> safe only if it use counted loops
|
||||||
|
int remaining = buffer.capacity();
|
||||||
|
long address = PlatformDependent.directBufferAddress(buffer);
|
||||||
|
while (remaining >= 8) {
|
||||||
|
PlatformDependent.putLong(address, 0L);
|
||||||
|
address += 8;
|
||||||
|
remaining -= 8;
|
||||||
|
}
|
||||||
|
while (remaining > 0) {
|
||||||
|
PlatformDependent.putByte(address, (byte) 0);
|
||||||
|
address++;
|
||||||
|
remaining--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,239 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.io.mapped;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.RandomAccessFile;
|
||||||
|
import java.lang.ref.WeakReference;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
|
import java.nio.MappedByteBuffer;
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
|
import java.nio.channels.FileLock;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
|
||||||
|
final class MappedByteBufferCache implements AutoCloseable {
|
||||||
|
|
||||||
|
public static final int PAGE_SIZE = Integer.parseInt(System.getProperty("os_page_size", "4096"));
|
||||||
|
private static final Object FILE_LOCK = new Object();
|
||||||
|
private final RandomAccessFile raf;
|
||||||
|
private final FileChannel fileChannel;
|
||||||
|
private final long chunkBytes;
|
||||||
|
private final long overlapBytes;
|
||||||
|
private final ArrayList<WeakReference<MappedByteBuffer>> byteBuffers;
|
||||||
|
private final File file;
|
||||||
|
private final long mappedSize;
|
||||||
|
private boolean closed;
|
||||||
|
|
||||||
|
private MappedByteBufferCache(File file, RandomAccessFile raf, long chunkBytes, long overlapBytes, long alignment) {
|
||||||
|
this.byteBuffers = new ArrayList<>();
|
||||||
|
this.file = file;
|
||||||
|
this.raf = raf;
|
||||||
|
this.fileChannel = raf.getChannel();
|
||||||
|
this.chunkBytes = BytesUtils.align(chunkBytes, alignment);
|
||||||
|
this.overlapBytes = BytesUtils.align(overlapBytes, alignment);
|
||||||
|
this.closed = false;
|
||||||
|
this.mappedSize = this.chunkBytes + this.overlapBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static MappedByteBufferCache of(File file, long chunkSize, long overlapSize) throws FileNotFoundException {
|
||||||
|
final RandomAccessFile raf = new RandomAccessFile(file, "rw");
|
||||||
|
return new MappedByteBufferCache(file, raf, chunkSize, overlapSize, PAGE_SIZE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean inside(long position, long mappedPosition, long mappedLimit) {
|
||||||
|
return mappedPosition <= position && position < mappedLimit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public File file() {
|
||||||
|
return file;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long chunkBytes() {
|
||||||
|
return chunkBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long overlapBytes() {
|
||||||
|
return overlapBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int indexFor(long position) {
|
||||||
|
final int chunk = (int) (position / chunkBytes);
|
||||||
|
return chunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long mappedPositionFor(int index) {
|
||||||
|
return index * chunkBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long mappedLimitFor(long mappedPosition) {
|
||||||
|
return mappedPosition + chunkBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MappedByteBuffer acquireMappedByteBuffer(final int index) throws IOException, IllegalArgumentException, IllegalStateException {
|
||||||
|
if (closed)
|
||||||
|
throw new IOException("Closed");
|
||||||
|
if (index < 0)
|
||||||
|
throw new IOException("Attempt to access a negative index: " + index);
|
||||||
|
while (byteBuffers.size() <= index) {
|
||||||
|
byteBuffers.add(null);
|
||||||
|
}
|
||||||
|
final WeakReference<MappedByteBuffer> mbbRef = byteBuffers.get(index);
|
||||||
|
if (mbbRef != null) {
|
||||||
|
final MappedByteBuffer mbb = mbbRef.get();
|
||||||
|
if (mbb != null) {
|
||||||
|
return mbb;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return mapAndAcquire(index);
|
||||||
|
}
|
||||||
|
|
||||||
|
//METHOD BUILT TO SEPARATE THE SLOW PATH TO ENSURE INLINING OF THE MOST OCCURRING CASE
|
||||||
|
private MappedByteBuffer mapAndAcquire(final int index) throws IOException {
|
||||||
|
final long chunkStartPosition = mappedPositionFor(index);
|
||||||
|
final long minSize = chunkStartPosition + mappedSize;
|
||||||
|
if (fileChannel.size() < minSize) {
|
||||||
|
try {
|
||||||
|
synchronized (FILE_LOCK) {
|
||||||
|
try (FileLock lock = fileChannel.lock()) {
|
||||||
|
final long size = fileChannel.size();
|
||||||
|
if (size < minSize) {
|
||||||
|
raf.setLength(minSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException ioe) {
|
||||||
|
throw new IOException("Failed to resize to " + minSize, ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final MappedByteBuffer mbb = fileChannel.map(FileChannel.MapMode.READ_WRITE, chunkStartPosition, mappedSize);
|
||||||
|
mbb.order(ByteOrder.nativeOrder());
|
||||||
|
byteBuffers.set(index, new WeakReference<>(mbb));
|
||||||
|
return mbb;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long fileSize() throws IOException {
|
||||||
|
if (closed)
|
||||||
|
throw new IllegalStateException("Closed");
|
||||||
|
return fileChannel.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void closeAndResize(long length) {
|
||||||
|
if (!closed) {
|
||||||
|
//TO_FIX: unmap in this way is not portable BUT required on Windows that can't resize a memmory mapped file!
|
||||||
|
final int mappedBuffers = this.byteBuffers.size();
|
||||||
|
for (int i = 0; i < mappedBuffers; i++) {
|
||||||
|
final WeakReference<MappedByteBuffer> mbbRef = byteBuffers.get(i);
|
||||||
|
if (mbbRef != null) {
|
||||||
|
final MappedByteBuffer mbb = mbbRef.get();
|
||||||
|
if (mbb != null) {
|
||||||
|
try {
|
||||||
|
PlatformDependent.freeDirectBuffer(mbb);
|
||||||
|
}
|
||||||
|
catch (Throwable t) {
|
||||||
|
//TO_FIX: force releasing of the other buffers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.byteBuffers.clear();
|
||||||
|
try {
|
||||||
|
if (fileChannel.size() != length) {
|
||||||
|
try {
|
||||||
|
synchronized (FILE_LOCK) {
|
||||||
|
try (FileLock lock = fileChannel.lock()) {
|
||||||
|
final long size = fileChannel.size();
|
||||||
|
if (size != length) {
|
||||||
|
raf.setLength(length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException ioe) {
|
||||||
|
throw new IllegalStateException("Failed to resize to " + length, ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException ex) {
|
||||||
|
throw new IllegalStateException("Failed to get size", ex);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
try {
|
||||||
|
fileChannel.close();
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new IllegalStateException("Failed to close channel", e);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
try {
|
||||||
|
raf.close();
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new IllegalStateException("Failed to close RandomAccessFile", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isClosed() {
|
||||||
|
return closed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
if (!closed) {
|
||||||
|
//TO_FIX: unmap in this way is not portable BUT required on Windows that can't resize a memory mapped file!
|
||||||
|
final int mappedBuffers = this.byteBuffers.size();
|
||||||
|
for (int i = 0; i < mappedBuffers; i++) {
|
||||||
|
final WeakReference<MappedByteBuffer> mbbRef = byteBuffers.get(i);
|
||||||
|
if (mbbRef != null) {
|
||||||
|
final MappedByteBuffer mbb = mbbRef.get();
|
||||||
|
if (mbb != null) {
|
||||||
|
try {
|
||||||
|
PlatformDependent.freeDirectBuffer(mbb);
|
||||||
|
}
|
||||||
|
catch (Throwable t) {
|
||||||
|
//TO_FIX: force releasing of the other buffers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.byteBuffers.clear();
|
||||||
|
try {
|
||||||
|
fileChannel.close();
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new IllegalStateException("Failed to close channel", e);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
try {
|
||||||
|
raf.close();
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new IllegalStateException("Failed to close RandomAccessFile", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,331 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.io.mapped;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.BufferUnderflowException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
|
import java.nio.MappedByteBuffer;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
|
||||||
|
final class MappedFile implements AutoCloseable {
|
||||||
|
|
||||||
|
private static final ByteBuffer ZERO_PAGE = ByteBuffer.allocateDirect(MappedByteBufferCache.PAGE_SIZE).order(ByteOrder.nativeOrder());
|
||||||
|
|
||||||
|
private final MappedByteBufferCache cache;
|
||||||
|
private final int zerosMaxPage;
|
||||||
|
private MappedByteBuffer lastMapped;
|
||||||
|
private long lastMappedStart;
|
||||||
|
private long lastMappedLimit;
|
||||||
|
private long position;
|
||||||
|
private long length;
|
||||||
|
|
||||||
|
private MappedFile(MappedByteBufferCache cache) throws IOException {
|
||||||
|
this.cache = cache;
|
||||||
|
this.lastMapped = null;
|
||||||
|
this.lastMappedStart = -1;
|
||||||
|
this.lastMappedLimit = -1;
|
||||||
|
this.position = 0;
|
||||||
|
this.length = this.cache.fileSize();
|
||||||
|
this.zerosMaxPage = Math.min(ZERO_PAGE.capacity(), (int) Math.min(Integer.MAX_VALUE, cache.overlapBytes()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static MappedFile of(File file, long chunckSize, long overlapSize) throws IOException {
|
||||||
|
return new MappedFile(MappedByteBufferCache.of(file, chunckSize, overlapSize));
|
||||||
|
}
|
||||||
|
|
||||||
|
public MappedByteBufferCache cache() {
|
||||||
|
return cache;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int checkOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
|
||||||
|
if (!MappedByteBufferCache.inside(offset, lastMappedStart, lastMappedLimit)) {
|
||||||
|
try {
|
||||||
|
final int index = cache.indexFor(offset);
|
||||||
|
final long mappedPosition = cache.mappedPositionFor(index);
|
||||||
|
final long mappedLimit = cache.mappedLimitFor(mappedPosition);
|
||||||
|
if (offset + bytes > mappedLimit) {
|
||||||
|
throw new IOException("mapping overflow!");
|
||||||
|
}
|
||||||
|
lastMapped = cache.acquireMappedByteBuffer(index);
|
||||||
|
lastMappedStart = mappedPosition;
|
||||||
|
lastMappedLimit = mappedLimit;
|
||||||
|
final int bufferPosition = (int) (offset - mappedPosition);
|
||||||
|
return bufferPosition;
|
||||||
|
}
|
||||||
|
catch (IllegalStateException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
catch (IllegalArgumentException e) {
|
||||||
|
throw new BufferUnderflowException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
final int bufferPosition = (int) (offset - lastMappedStart);
|
||||||
|
return bufferPosition;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void force() {
|
||||||
|
if (lastMapped != null) {
|
||||||
|
lastMapped.force();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads a sequence of bytes from this file into the given buffer.
|
||||||
|
* <p>
|
||||||
|
* <p> Bytes are read starting at this file's specified position.
|
||||||
|
*/
|
||||||
|
public int read(long position, ByteBuf dst, int dstStart, int dstLength) throws IOException {
|
||||||
|
final int bufferPosition = checkOffset(position, dstLength);
|
||||||
|
final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||||
|
if (dst.hasMemoryAddress()) {
|
||||||
|
final long dstAddress = dst.memoryAddress() + dstStart;
|
||||||
|
PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength);
|
||||||
|
}
|
||||||
|
else if (dst.hasArray()) {
|
||||||
|
final byte[] dstArray = dst.array();
|
||||||
|
PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, dstLength);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IllegalArgumentException("unsupported byte buffer");
|
||||||
|
}
|
||||||
|
position += dstLength;
|
||||||
|
if (position > this.length) {
|
||||||
|
this.length = position;
|
||||||
|
}
|
||||||
|
return dstLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads a sequence of bytes from this file into the given buffer.
|
||||||
|
* <p>
|
||||||
|
* <p> Bytes are read starting at this file's specified position.
|
||||||
|
*/
|
||||||
|
public int read(long position, ByteBuffer dst, int dstStart, int dstLength) throws IOException {
|
||||||
|
final int bufferPosition = checkOffset(position, dstLength);
|
||||||
|
final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||||
|
if (dst.isDirect()) {
|
||||||
|
final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart;
|
||||||
|
PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
final byte[] dstArray = dst.array();
|
||||||
|
PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, dstLength);
|
||||||
|
}
|
||||||
|
position += dstLength;
|
||||||
|
if (position > this.length) {
|
||||||
|
this.length = position;
|
||||||
|
}
|
||||||
|
return dstLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads a sequence of bytes from this file into the given buffer.
|
||||||
|
* <p>
|
||||||
|
* <p> Bytes are read starting at this file's current position, and
|
||||||
|
* then the position is updated with the number of bytes actually read.
|
||||||
|
*/
|
||||||
|
public int read(ByteBuf dst, int dstStart, int dstLength) throws IOException {
|
||||||
|
final int remaining = (int) Math.min(this.length - this.position, (long) Integer.MAX_VALUE);
|
||||||
|
final int read = Math.min(remaining, dstLength);
|
||||||
|
final int bufferPosition = checkOffset(position, read);
|
||||||
|
final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||||
|
if (dst.hasMemoryAddress()) {
|
||||||
|
final long dstAddress = dst.memoryAddress() + dstStart;
|
||||||
|
PlatformDependent.copyMemory(srcAddress, dstAddress, read);
|
||||||
|
}
|
||||||
|
else if (dst.hasArray()) {
|
||||||
|
final byte[] dstArray = dst.array();
|
||||||
|
PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, read);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IllegalArgumentException("unsupported byte buffer");
|
||||||
|
}
|
||||||
|
position += read;
|
||||||
|
return read;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads a sequence of bytes from this file into the given buffer.
|
||||||
|
* <p>
|
||||||
|
* <p> Bytes are read starting at this file's current position, and
|
||||||
|
* then the position is updated with the number of bytes actually read.
|
||||||
|
*/
|
||||||
|
public int read(ByteBuffer dst, int dstStart, int dstLength) throws IOException {
|
||||||
|
final int remaining = (int) Math.min(this.length - this.position, (long) Integer.MAX_VALUE);
|
||||||
|
final int read = Math.min(remaining, dstLength);
|
||||||
|
final int bufferPosition = checkOffset(position, read);
|
||||||
|
final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||||
|
if (dst.isDirect()) {
|
||||||
|
final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart;
|
||||||
|
PlatformDependent.copyMemory(srcAddress, dstAddress, read);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
final byte[] dstArray = dst.array();
|
||||||
|
PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, read);
|
||||||
|
}
|
||||||
|
position += read;
|
||||||
|
return read;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes a sequence of bytes to this file from the given buffer.
|
||||||
|
* <p>
|
||||||
|
* <p> Bytes are written starting at this file's current position,
|
||||||
|
*/
|
||||||
|
public void write(ByteBuf src, int srcStart, int srcLength) throws IOException {
|
||||||
|
final int bufferPosition = checkOffset(position, srcLength);
|
||||||
|
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||||
|
if (src.hasMemoryAddress()) {
|
||||||
|
final long srcAddress = src.memoryAddress() + srcStart;
|
||||||
|
PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
|
||||||
|
}
|
||||||
|
else if (src.hasArray()) {
|
||||||
|
final byte[] srcArray = src.array();
|
||||||
|
PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IllegalArgumentException("unsupported byte buffer");
|
||||||
|
}
|
||||||
|
position += srcLength;
|
||||||
|
if (position > this.length) {
|
||||||
|
this.length = position;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes a sequence of bytes to this file from the given buffer.
|
||||||
|
* <p>
|
||||||
|
* <p> Bytes are written starting at this file's current position,
|
||||||
|
*/
|
||||||
|
public void write(ByteBuffer src, int srcStart, int srcLength) throws IOException {
|
||||||
|
final int bufferPosition = checkOffset(position, srcLength);
|
||||||
|
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||||
|
if (src.isDirect()) {
|
||||||
|
final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart;
|
||||||
|
PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
final byte[] srcArray = src.array();
|
||||||
|
PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength);
|
||||||
|
}
|
||||||
|
position += srcLength;
|
||||||
|
if (position > this.length) {
|
||||||
|
this.length = position;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes a sequence of bytes to this file from the given buffer.
|
||||||
|
* <p>
|
||||||
|
* <p> Bytes are written starting at this file's specified position,
|
||||||
|
*/
|
||||||
|
public void write(long position, ByteBuf src, int srcStart, int srcLength) throws IOException {
|
||||||
|
final int bufferPosition = checkOffset(position, srcLength);
|
||||||
|
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||||
|
if (src.hasMemoryAddress()) {
|
||||||
|
final long srcAddress = src.memoryAddress() + srcStart;
|
||||||
|
PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
|
||||||
|
}
|
||||||
|
else if (src.hasArray()) {
|
||||||
|
final byte[] srcArray = src.array();
|
||||||
|
PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IllegalArgumentException("unsupported byte buffer");
|
||||||
|
}
|
||||||
|
position += srcLength;
|
||||||
|
if (position > this.length) {
|
||||||
|
this.length = position;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes a sequence of bytes to this file from the given buffer.
|
||||||
|
* <p>
|
||||||
|
* <p> Bytes are written starting at this file's specified position,
|
||||||
|
*/
|
||||||
|
public void write(long position, ByteBuffer src, int srcStart, int srcLength) throws IOException {
|
||||||
|
final int bufferPosition = checkOffset(position, srcLength);
|
||||||
|
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||||
|
if (src.isDirect()) {
|
||||||
|
final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart;
|
||||||
|
PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
final byte[] srcArray = src.array();
|
||||||
|
PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength);
|
||||||
|
}
|
||||||
|
position += srcLength;
|
||||||
|
if (position > this.length) {
|
||||||
|
this.length = position;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes a sequence of bytes to this file from the given buffer.
|
||||||
|
* <p>
|
||||||
|
* <p> Bytes are written starting at this file's current position,
|
||||||
|
*/
|
||||||
|
public void zeros(long offset, int count) throws IOException {
|
||||||
|
final long targetOffset = offset + count;
|
||||||
|
final int zerosBulkCopies = count / zerosMaxPage;
|
||||||
|
final long srcAddress = PlatformDependent.directBufferAddress(ZERO_PAGE);
|
||||||
|
for (int i = 0; i < zerosBulkCopies; i++) {
|
||||||
|
final int bufferPosition = checkOffset(offset, zerosMaxPage);
|
||||||
|
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||||
|
PlatformDependent.copyMemory(srcAddress, destAddress, zerosMaxPage);
|
||||||
|
offset += zerosMaxPage;
|
||||||
|
}
|
||||||
|
final int remainingToBeZeroes = (int) (targetOffset - offset);
|
||||||
|
final int bufferPosition = checkOffset(offset, remainingToBeZeroes);
|
||||||
|
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
|
||||||
|
PlatformDependent.copyMemory(srcAddress, destAddress, remainingToBeZeroes);
|
||||||
|
if (targetOffset > this.length) {
|
||||||
|
this.length = targetOffset;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long position() {
|
||||||
|
return position;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long position(long newPosition) {
|
||||||
|
final long oldPosition = this.position;
|
||||||
|
this.position = newPosition;
|
||||||
|
return oldPosition;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long length() {
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
cache.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void closeAndResize(long length) {
|
||||||
|
cache.closeAndResize(length);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,434 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.io.mapped;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.RandomAccessFile;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
|
import java.nio.channels.FileLock;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
||||||
|
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||||
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
|
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
|
||||||
|
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||||
|
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
|
||||||
|
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||||
|
|
||||||
|
final class MappedSequentialFile implements SequentialFile {
|
||||||
|
|
||||||
|
private final File directory;
|
||||||
|
private final long chunkBytes;
|
||||||
|
private final long overlapBytes;
|
||||||
|
private final IOCriticalErrorListener criticalErrorListener;
|
||||||
|
private File file;
|
||||||
|
private File absoluteFile;
|
||||||
|
private String fileName;
|
||||||
|
private MappedFile mappedFile;
|
||||||
|
private ActiveMQBuffer pooledActiveMQBuffer;
|
||||||
|
|
||||||
|
MappedSequentialFile(final File directory,
|
||||||
|
final File file,
|
||||||
|
final long chunkBytes,
|
||||||
|
final long overlapBytes,
|
||||||
|
final IOCriticalErrorListener criticalErrorListener) {
|
||||||
|
this.directory = directory;
|
||||||
|
this.file = file;
|
||||||
|
this.absoluteFile = null;
|
||||||
|
this.fileName = null;
|
||||||
|
this.chunkBytes = chunkBytes;
|
||||||
|
this.overlapBytes = overlapBytes;
|
||||||
|
this.mappedFile = null;
|
||||||
|
this.pooledActiveMQBuffer = null;
|
||||||
|
this.criticalErrorListener = criticalErrorListener;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkIsOpen() {
|
||||||
|
if (!isOpen()) {
|
||||||
|
throw new IllegalStateException("must be open!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkIsNotOpen() {
|
||||||
|
if (isOpen()) {
|
||||||
|
throw new IllegalStateException("must be closed!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isOpen() {
|
||||||
|
return this.mappedFile != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean exists() {
|
||||||
|
return this.file.exists();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open() throws IOException {
|
||||||
|
if (this.mappedFile == null) {
|
||||||
|
this.mappedFile = MappedFile.of(file, chunkBytes, overlapBytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open(int maxIO, boolean useExecutor) throws IOException {
|
||||||
|
//ignore maxIO e useExecutor
|
||||||
|
ActiveMQJournalLogger.LOGGER.warn("ignoring maxIO and useExecutor unsupported parameters!");
|
||||||
|
this.open();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean fits(int size) {
|
||||||
|
checkIsOpen();
|
||||||
|
final long newPosition = this.mappedFile.position() + size;
|
||||||
|
final boolean hasRemaining = newPosition <= this.mappedFile.length();
|
||||||
|
return hasRemaining;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getAlignment() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int calculateBlockStart(int position) {
|
||||||
|
return position;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getFileName() {
|
||||||
|
if (this.fileName == null) {
|
||||||
|
this.fileName = this.file.getName();
|
||||||
|
}
|
||||||
|
return this.fileName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void fill(int size) throws IOException {
|
||||||
|
checkIsOpen();
|
||||||
|
this.mappedFile.zeros(this.mappedFile.position(), size);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void delete() {
|
||||||
|
checkIsNotOpen();
|
||||||
|
if (file.exists() && !file.delete()) {
|
||||||
|
ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws IOException {
|
||||||
|
checkIsOpen();
|
||||||
|
if (callback == null) {
|
||||||
|
throw new NullPointerException("callback parameter need to be set");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
final ByteBuf byteBuf = bytes.byteBuf();
|
||||||
|
final int writerIndex = byteBuf.writerIndex();
|
||||||
|
final int readerIndex = byteBuf.readerIndex();
|
||||||
|
final int readableBytes = writerIndex - readerIndex;
|
||||||
|
if (readableBytes > 0) {
|
||||||
|
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
|
||||||
|
if (sync) {
|
||||||
|
this.mappedFile.force();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
callback.done();
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
if (this.criticalErrorListener != null) {
|
||||||
|
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||||
|
}
|
||||||
|
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ActiveMQBuffer bytes, boolean sync) throws IOException {
|
||||||
|
checkIsOpen();
|
||||||
|
final ByteBuf byteBuf = bytes.byteBuf();
|
||||||
|
final int writerIndex = byteBuf.writerIndex();
|
||||||
|
final int readerIndex = byteBuf.readerIndex();
|
||||||
|
final int readableBytes = writerIndex - readerIndex;
|
||||||
|
if (readableBytes > 0) {
|
||||||
|
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
|
||||||
|
if (sync) {
|
||||||
|
this.mappedFile.force();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ActiveMQBuffer acquiresActiveMQBufferWithAtLeast(int size) {
|
||||||
|
if (this.pooledActiveMQBuffer == null || this.pooledActiveMQBuffer.capacity() < size) {
|
||||||
|
this.pooledActiveMQBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(size, size).order(ByteOrder.nativeOrder()));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
this.pooledActiveMQBuffer.clear();
|
||||||
|
}
|
||||||
|
return pooledActiveMQBuffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws IOException {
|
||||||
|
checkIsOpen();
|
||||||
|
if (callback == null) {
|
||||||
|
throw new NullPointerException("callback parameter need to be set");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
final int encodedSize = bytes.getEncodeSize();
|
||||||
|
final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize);
|
||||||
|
bytes.encode(outBuffer);
|
||||||
|
final ByteBuf byteBuf = outBuffer.byteBuf();
|
||||||
|
final int writerIndex = byteBuf.writerIndex();
|
||||||
|
final int readerIndex = byteBuf.readerIndex();
|
||||||
|
final int readableBytes = writerIndex - readerIndex;
|
||||||
|
if (readableBytes > 0) {
|
||||||
|
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
|
||||||
|
if (sync) {
|
||||||
|
this.mappedFile.force();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
callback.done();
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
if (this.criticalErrorListener != null) {
|
||||||
|
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||||
|
}
|
||||||
|
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(EncodingSupport bytes, boolean sync) throws IOException {
|
||||||
|
checkIsOpen();
|
||||||
|
final int encodedSize = bytes.getEncodeSize();
|
||||||
|
final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize);
|
||||||
|
bytes.encode(outBuffer);
|
||||||
|
final ByteBuf byteBuf = outBuffer.byteBuf();
|
||||||
|
final int writerIndex = byteBuf.writerIndex();
|
||||||
|
final int readerIndex = byteBuf.readerIndex();
|
||||||
|
final int readableBytes = writerIndex - readerIndex;
|
||||||
|
if (readableBytes > 0) {
|
||||||
|
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
|
||||||
|
if (sync) {
|
||||||
|
this.mappedFile.force();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
|
||||||
|
checkIsOpen();
|
||||||
|
if (callback == null) {
|
||||||
|
throw new NullPointerException("callback parameter need to be set");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
final int position = bytes.position();
|
||||||
|
final int limit = bytes.limit();
|
||||||
|
final int remaining = limit - position;
|
||||||
|
if (remaining > 0) {
|
||||||
|
this.mappedFile.write(bytes, position, remaining);
|
||||||
|
if (sync) {
|
||||||
|
this.mappedFile.force();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
callback.done();
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
if (this.criticalErrorListener != null) {
|
||||||
|
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||||
|
}
|
||||||
|
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeDirect(ByteBuffer bytes, boolean sync) throws IOException {
|
||||||
|
checkIsOpen();
|
||||||
|
final int position = bytes.position();
|
||||||
|
final int limit = bytes.limit();
|
||||||
|
final int remaining = limit - position;
|
||||||
|
if (remaining > 0) {
|
||||||
|
this.mappedFile.write(bytes, position, remaining);
|
||||||
|
if (sync) {
|
||||||
|
this.mappedFile.force();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(ByteBuffer bytes, IOCallback callback) throws IOException {
|
||||||
|
checkIsOpen();
|
||||||
|
if (callback == null) {
|
||||||
|
throw new NullPointerException("callback parameter need to be set");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
final int position = bytes.position();
|
||||||
|
final int limit = bytes.limit();
|
||||||
|
final int remaining = limit - position;
|
||||||
|
if (remaining > 0) {
|
||||||
|
final int bytesRead = this.mappedFile.read(bytes, position, remaining);
|
||||||
|
final int newPosition = position + bytesRead;
|
||||||
|
bytes.position(newPosition);
|
||||||
|
bytes.flip();
|
||||||
|
callback.done();
|
||||||
|
return bytesRead;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
if (this.criticalErrorListener != null) {
|
||||||
|
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||||
|
}
|
||||||
|
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(ByteBuffer bytes) throws IOException {
|
||||||
|
checkIsOpen();
|
||||||
|
final int position = bytes.position();
|
||||||
|
final int limit = bytes.limit();
|
||||||
|
final int remaining = limit - position;
|
||||||
|
if (remaining > 0) {
|
||||||
|
final int bytesRead = this.mappedFile.read(bytes, position, remaining);
|
||||||
|
final int newPosition = position + bytesRead;
|
||||||
|
bytes.position(newPosition);
|
||||||
|
bytes.flip();
|
||||||
|
return bytesRead;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void position(long pos) {
|
||||||
|
checkIsOpen();
|
||||||
|
this.mappedFile.position(pos);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long position() {
|
||||||
|
checkIsOpen();
|
||||||
|
return this.mappedFile.position();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
if (this.mappedFile != null) {
|
||||||
|
this.mappedFile.closeAndResize(this.mappedFile.length());
|
||||||
|
this.mappedFile = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sync() throws IOException {
|
||||||
|
checkIsOpen();
|
||||||
|
this.mappedFile.force();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long size() {
|
||||||
|
if (this.mappedFile != null) {
|
||||||
|
return this.mappedFile.length();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return this.file.length();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void renameTo(String newFileName) throws Exception {
|
||||||
|
checkIsNotOpen();
|
||||||
|
if (this.fileName == null) {
|
||||||
|
this.fileName = this.file.getName();
|
||||||
|
}
|
||||||
|
if (!this.fileName.contentEquals(newFileName)) {
|
||||||
|
final File newFile = new File(this.directory, newFileName);
|
||||||
|
if (!file.renameTo(newFile)) {
|
||||||
|
throw ActiveMQJournalBundle.BUNDLE.ioRenameFileError(file.getName(), newFileName);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
this.file = newFile;
|
||||||
|
this.fileName = newFileName;
|
||||||
|
this.absoluteFile = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SequentialFile cloneFile() {
|
||||||
|
checkIsNotOpen();
|
||||||
|
return new MappedSequentialFile(this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void copyTo(SequentialFile dstFile) throws IOException {
|
||||||
|
checkIsNotOpen();
|
||||||
|
if (dstFile.isOpen()) {
|
||||||
|
throw new IllegalArgumentException("dstFile must be closed too");
|
||||||
|
}
|
||||||
|
try (RandomAccessFile src = new RandomAccessFile(file, "rw");
|
||||||
|
FileChannel srcChannel = src.getChannel();
|
||||||
|
FileLock srcLock = srcChannel.lock()) {
|
||||||
|
final long readableBytes = srcChannel.size();
|
||||||
|
if (readableBytes > 0) {
|
||||||
|
try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw");
|
||||||
|
FileChannel dstChannel = dst.getChannel();
|
||||||
|
FileLock dstLock = dstChannel.lock()) {
|
||||||
|
final long oldLength = dst.length();
|
||||||
|
final long newLength = oldLength + readableBytes;
|
||||||
|
dst.setLength(newLength);
|
||||||
|
final long transferred = dstChannel.transferFrom(srcChannel, oldLength, readableBytes);
|
||||||
|
if (transferred != readableBytes) {
|
||||||
|
dstChannel.truncate(oldLength);
|
||||||
|
throw new IOException("copied less then expected");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Deprecated
|
||||||
|
public void setTimedBuffer(TimedBuffer buffer) {
|
||||||
|
throw new UnsupportedOperationException("the timed buffer is not currently supported");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public File getJavaFile() {
|
||||||
|
if (this.absoluteFile == null) {
|
||||||
|
this.absoluteFile = this.file.getAbsoluteFile();
|
||||||
|
}
|
||||||
|
return this.absoluteFile;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,204 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.io.mapped;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FilenameFilter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
|
|
||||||
|
public final class MappedSequentialFileFactory implements SequentialFileFactory {
|
||||||
|
|
||||||
|
private static long DEFAULT_BLOCK_SIZE = 64L << 20;
|
||||||
|
private final File directory;
|
||||||
|
private final IOCriticalErrorListener criticalErrorListener;
|
||||||
|
private long chunkBytes;
|
||||||
|
private long overlapBytes;
|
||||||
|
|
||||||
|
public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
|
||||||
|
this.directory = directory;
|
||||||
|
this.criticalErrorListener = criticalErrorListener;
|
||||||
|
this.chunkBytes = DEFAULT_BLOCK_SIZE;
|
||||||
|
this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MappedSequentialFileFactory(File directory) {
|
||||||
|
this.directory = directory;
|
||||||
|
this.criticalErrorListener = null;
|
||||||
|
this.chunkBytes = DEFAULT_BLOCK_SIZE;
|
||||||
|
this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long chunkBytes() {
|
||||||
|
return chunkBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MappedSequentialFileFactory chunkBytes(long chunkBytes) {
|
||||||
|
this.chunkBytes = chunkBytes;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long overlapBytes() {
|
||||||
|
return overlapBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MappedSequentialFileFactory overlapBytes(long overlapBytes) {
|
||||||
|
this.overlapBytes = overlapBytes;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SequentialFile createSequentialFile(String fileName) {
|
||||||
|
return new MappedSequentialFile(directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMaxIO() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> listFiles(final String extension) throws Exception {
|
||||||
|
final FilenameFilter extensionFilter = new FilenameFilter() {
|
||||||
|
@Override
|
||||||
|
public boolean accept(final File file, final String name) {
|
||||||
|
return name.endsWith("." + extension);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final String[] fileNames = directory.list(extensionFilter);
|
||||||
|
if (fileNames == null) {
|
||||||
|
return Collections.EMPTY_LIST;
|
||||||
|
}
|
||||||
|
return Arrays.asList(fileNames);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSupportsCallbacks() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onIOError(Exception exception, String message, SequentialFile file) {
|
||||||
|
if (criticalErrorListener != null) {
|
||||||
|
criticalErrorListener.onIOException(exception, message, file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer allocateDirectBuffer(final int size) {
|
||||||
|
return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void releaseDirectBuffer(final ByteBuffer buffer) {
|
||||||
|
PlatformDependent.freeDirectBuffer(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer newBuffer(final int size) {
|
||||||
|
return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void releaseBuffer(ByteBuffer buffer) {
|
||||||
|
if (buffer.isDirect()) {
|
||||||
|
PlatformDependent.freeDirectBuffer(buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void activateBuffer(SequentialFile file) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deactivateBuffer() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer wrapBuffer(final byte[] bytes) {
|
||||||
|
return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getAlignment() {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int calculateBlockSize(int bytes) {
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public File getDirectory() {
|
||||||
|
return this.directory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clearBuffer(final ByteBuffer buffer) {
|
||||||
|
buffer.clear();
|
||||||
|
if (buffer.isDirect()) {
|
||||||
|
BytesUtils.zerosDirect(buffer);
|
||||||
|
}
|
||||||
|
else if (buffer.hasArray()) {
|
||||||
|
final byte[] array = buffer.array();
|
||||||
|
//SIMD OPTIMIZATION
|
||||||
|
Arrays.fill(array, (byte) 0);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
//TODO VERIFY IF IT COULD HAPPENS
|
||||||
|
final int capacity = buffer.capacity();
|
||||||
|
for (int i = 0; i < capacity; i++) {
|
||||||
|
buffer.put(i, (byte) 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void createDirs() throws Exception {
|
||||||
|
boolean ok = directory.mkdirs();
|
||||||
|
if (!ok) {
|
||||||
|
throw new IOException("Failed to create directory " + directory);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flush() {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -38,6 +38,9 @@
|
||||||
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
|
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
|
||||||
<jboss-jts.version>4.17.13.Final</jboss-jts.version>
|
<jboss-jts.version>4.17.13.Final</jboss-jts.version>
|
||||||
<hornetq.version>2.4.7.Final</hornetq.version>
|
<hornetq.version>2.4.7.Final</hornetq.version>
|
||||||
|
<openhft.core.version>1.4.9</openhft.core.version>
|
||||||
|
<openhft.affinity.version>3.0.6</openhft.affinity.version>
|
||||||
|
<openjdk.jmh.version>1.12</openjdk.jmh.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
@ -213,6 +216,34 @@
|
||||||
<artifactId>jbossjts-jacorb</artifactId>
|
<artifactId>jbossjts-jacorb</artifactId>
|
||||||
<version>4.17.13.Final</version>
|
<version>4.17.13.Final</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- ### Benchmark Tools -->
|
||||||
|
<!-- ### Java Latency Benchmarking Harness -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>net.openhft</groupId>
|
||||||
|
<artifactId>chronicle-core</artifactId>
|
||||||
|
<version>${openhft.core.version}</version>
|
||||||
|
<!-- License: Apache 2.0 -->
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>net.openhft</groupId>
|
||||||
|
<artifactId>affinity</artifactId>
|
||||||
|
<version>${openhft.affinity.version}</version>
|
||||||
|
<!-- License: LGPLv3-->
|
||||||
|
</dependency>
|
||||||
|
<!-- ### Java Microbenchmark Harness -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.openjdk.jmh</groupId>
|
||||||
|
<artifactId>jmh-core</artifactId>
|
||||||
|
<version>${openjdk.jmh.version}</version>
|
||||||
|
<!-- License: GPLv2-->
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.openjdk.jmh</groupId>
|
||||||
|
<artifactId>jmh-generator-annprocess</artifactId>
|
||||||
|
<version>${openjdk.jmh.version}</version>
|
||||||
|
<!-- License: GPLv2-->
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -0,0 +1,155 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.extras.benchmarks.journal;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import net.openhft.chronicle.core.jlbh.JLBH;
|
||||||
|
import net.openhft.chronicle.core.jlbh.JLBHOptions;
|
||||||
|
import net.openhft.chronicle.core.jlbh.JLBHTask;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||||
|
import org.apache.activemq.artemis.core.journal.Journal;
|
||||||
|
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||||
|
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||||
|
|
||||||
|
public class JournalImplLatencyBench implements JLBHTask {
|
||||||
|
|
||||||
|
private static final int FILE_SIZE = 1024 * 1024 * 1024;
|
||||||
|
private static final JournalType JOURNAL_TYPE = JournalType.MAPPED;
|
||||||
|
private static final int ITERATIONS = 100_000;
|
||||||
|
private static final int WARMUP_ITERATIONS = 20_000;
|
||||||
|
private static final int TARGET_THROUGHPUT = 50_000;
|
||||||
|
private static final int TESTS = 5;
|
||||||
|
private static int TOTAL_MESSAGES = (ITERATIONS * TESTS + WARMUP_ITERATIONS);
|
||||||
|
private static int ENCODED_SIZE = 8;
|
||||||
|
private static int CHUNK_BYTES = FILE_SIZE;
|
||||||
|
private static int OVERLAP_BYTES = CHUNK_BYTES / 4;
|
||||||
|
private final SequentialFileFactory sequentialFileFactory;
|
||||||
|
private Journal journal;
|
||||||
|
private EncodingSupport encodingSupport;
|
||||||
|
private JLBH jlbh;
|
||||||
|
private long id;
|
||||||
|
public JournalImplLatencyBench(SequentialFileFactory sequentialFileFactory) {
|
||||||
|
this.sequentialFileFactory = sequentialFileFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws IOException {
|
||||||
|
final File journalDir = Files.createTempDirectory("seq_files").toFile();
|
||||||
|
journalDir.deleteOnExit();
|
||||||
|
final boolean buffered = false;
|
||||||
|
final int bufferSize = 4096;
|
||||||
|
final int bufferTimeout = 0;
|
||||||
|
final int maxIO = -1;
|
||||||
|
final boolean logRates = false;
|
||||||
|
final IOCriticalErrorListener criticalErrorListener = null;
|
||||||
|
final SequentialFileFactory sequentialFileFactory;
|
||||||
|
switch (JOURNAL_TYPE) {
|
||||||
|
case MAPPED:
|
||||||
|
sequentialFileFactory = new MappedSequentialFileFactory(journalDir, criticalErrorListener).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES);
|
||||||
|
break;
|
||||||
|
case NIO:
|
||||||
|
sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw new AssertionError("!?");
|
||||||
|
}
|
||||||
|
final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new JournalImplLatencyBench(sequentialFileFactory));
|
||||||
|
new JLBH(lth).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(JLBH jlbh) {
|
||||||
|
id = 0;
|
||||||
|
this.jlbh = jlbh;
|
||||||
|
int numFiles = (int) ((TOTAL_MESSAGES * 1024 + 512) / FILE_SIZE * 1.3);
|
||||||
|
if (numFiles < 2) {
|
||||||
|
numFiles = 2;
|
||||||
|
}
|
||||||
|
this.journal = new JournalImpl(FILE_SIZE, numFiles, numFiles, 0, 0, sequentialFileFactory, "activemq-data", "amq", Integer.MAX_VALUE);
|
||||||
|
this.encodingSupport = NilEncodingSupport.Instance;
|
||||||
|
try {
|
||||||
|
journal.start();
|
||||||
|
journal.load(new ArrayList<RecordInfo>(), null, null);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(long startTimeNS) {
|
||||||
|
id++;
|
||||||
|
try {
|
||||||
|
journal.appendAddRecord(id, (byte) 0, encodingSupport, false);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
jlbh.sample(System.nanoTime() - startTimeNS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void complete() {
|
||||||
|
try {
|
||||||
|
journal.stop();
|
||||||
|
for (File journalFile : sequentialFileFactory.getDirectory().listFiles()) {
|
||||||
|
journalFile.deleteOnExit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum JournalType {
|
||||||
|
MAPPED,
|
||||||
|
NIO
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum NilEncodingSupport implements EncodingSupport {
|
||||||
|
Instance;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getEncodeSize() {
|
||||||
|
return ENCODED_SIZE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void encode(ActiveMQBuffer buffer) {
|
||||||
|
final int writerIndex = buffer.writerIndex();
|
||||||
|
for (int i = 0; i < ENCODED_SIZE; i++) {
|
||||||
|
buffer.writeByte((byte) 0);
|
||||||
|
}
|
||||||
|
buffer.writerIndex(writerIndex + ENCODED_SIZE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void decode(ActiveMQBuffer buffer) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,113 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||||
|
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||||
|
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
|
||||||
|
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
|
||||||
|
import org.openjdk.jmh.annotations.Benchmark;
|
||||||
|
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||||
|
import org.openjdk.jmh.annotations.Mode;
|
||||||
|
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||||
|
import org.openjdk.jmh.annotations.Scope;
|
||||||
|
import org.openjdk.jmh.annotations.Setup;
|
||||||
|
import org.openjdk.jmh.annotations.State;
|
||||||
|
import org.openjdk.jmh.profile.GCProfiler;
|
||||||
|
import org.openjdk.jmh.runner.Runner;
|
||||||
|
import org.openjdk.jmh.runner.RunnerException;
|
||||||
|
import org.openjdk.jmh.runner.options.Options;
|
||||||
|
import org.openjdk.jmh.runner.options.OptionsBuilder;
|
||||||
|
|
||||||
|
@State(Scope.Thread)
|
||||||
|
@BenchmarkMode(value = {Mode.Throughput, Mode.SampleTime})
|
||||||
|
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||||
|
public class EncodersBench {
|
||||||
|
|
||||||
|
private static final int expectedEncoderSize = JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(0);
|
||||||
|
private JournalInternalRecord record;
|
||||||
|
private ByteBuffer byteBuffer;
|
||||||
|
private AddJournalRecordEncoder addJournalRecordEncoder;
|
||||||
|
private ActiveMQBuffer outBuffer;
|
||||||
|
|
||||||
|
public static void main(String[] args) throws RunnerException {
|
||||||
|
final Options opt = new OptionsBuilder().include(EncodersBench.class.getSimpleName()).addProfiler(GCProfiler.class).warmupIterations(5).measurementIterations(5).forks(1).build();
|
||||||
|
new Runner(opt).run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Setup
|
||||||
|
public void init() {
|
||||||
|
this.byteBuffer = ByteBuffer.allocateDirect(expectedEncoderSize);
|
||||||
|
this.byteBuffer.order(ByteOrder.nativeOrder());
|
||||||
|
this.addJournalRecordEncoder = new AddJournalRecordEncoder();
|
||||||
|
|
||||||
|
this.record = new JournalAddRecord(true, 1, (byte) 1, ZeroEncodingSupport.Instance);
|
||||||
|
this.record.setFileID(1);
|
||||||
|
this.record.setCompactCount((short) 1);
|
||||||
|
this.outBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(this.record.getEncodeSize(), this.record.getEncodeSize()).order(ByteOrder.nativeOrder()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
public int encodeAligned() {
|
||||||
|
//Header
|
||||||
|
final long header = JournalRecordHeader.makeHeader(JournalRecordTypes.ADD_JOURNAL, expectedEncoderSize);
|
||||||
|
this.byteBuffer.putLong(0, header);
|
||||||
|
//FileId<CompactCount<Id<RecordType<RecordBytes
|
||||||
|
return addJournalRecordEncoder.on(byteBuffer, JournalRecordHeader.BYTES).fileId(1).compactCount(1).id(1L).recordType(1).noRecord().encodedLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
public int encodeUnaligned() {
|
||||||
|
outBuffer.clear();
|
||||||
|
record.encode(outBuffer);
|
||||||
|
return record.getEncodeSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
public int encodeUnalignedWithGarbage() {
|
||||||
|
outBuffer.clear();
|
||||||
|
final JournalAddRecord addRecord = new JournalAddRecord(true, 1, (byte) 1, ZeroEncodingSupport.Instance);
|
||||||
|
addRecord.setFileID(1);
|
||||||
|
addRecord.setCompactCount((short) 1);
|
||||||
|
addRecord.encode(outBuffer);
|
||||||
|
return addRecord.getEncodeSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
public enum ZeroEncodingSupport implements EncodingSupport {
|
||||||
|
Instance;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getEncodeSize() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void encode(ActiveMQBuffer buffer) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void decode(ActiveMQBuffer buffer) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,81 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
||||||
|
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||||
|
|
||||||
|
final class GcFreeJournal extends JournalImpl {
|
||||||
|
|
||||||
|
private final AddJournalRecordEncoder addJournalRecordEncoder = new AddJournalRecordEncoder();
|
||||||
|
//TODO replace with thread local pools if not single threaded!
|
||||||
|
private ByteBuffer journalRecordBytes = null;
|
||||||
|
|
||||||
|
GcFreeJournal(final int fileSize,
|
||||||
|
final int minFiles,
|
||||||
|
final int poolSize,
|
||||||
|
final int compactMinFiles,
|
||||||
|
final int compactPercentage,
|
||||||
|
final SequentialFileFactory fileFactory,
|
||||||
|
final String filePrefix,
|
||||||
|
final String fileExtension,
|
||||||
|
final int maxAIO) {
|
||||||
|
super(fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int align(final int value, final int alignment) {
|
||||||
|
return (value + (alignment - 1)) & ~(alignment - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void appendAddRecord(final long id,
|
||||||
|
final int recordType,
|
||||||
|
final ByteBuffer encodedRecord,
|
||||||
|
final int offset,
|
||||||
|
final int length,
|
||||||
|
final boolean sync) throws Exception {
|
||||||
|
final int expectedLength = JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(length);
|
||||||
|
final int alignedLength = align(expectedLength, 8);
|
||||||
|
switchFileIfNecessary(alignedLength);
|
||||||
|
final JournalFile currentFile = getCurrentFile();
|
||||||
|
final int fileId = currentFile.getRecordID();
|
||||||
|
if (this.journalRecordBytes == null || this.journalRecordBytes.capacity() < alignedLength) {
|
||||||
|
final int newPooledLength = align(alignedLength, 4096);
|
||||||
|
//TODO ADD LIMITS OR WARNS IN CASE OF TOO MUCH BIGGER SIZE
|
||||||
|
this.journalRecordBytes = ByteBuffer.allocateDirect(newPooledLength);
|
||||||
|
this.journalRecordBytes.order(ByteOrder.nativeOrder());
|
||||||
|
}
|
||||||
|
final long journalRecordHeader = JournalRecordHeader.makeHeader(JournalRecordTypes.ADD_JOURNAL, expectedLength);
|
||||||
|
this.journalRecordBytes.putLong(0, journalRecordHeader);
|
||||||
|
//use natural stride while encoding: FileId<CompactCount<Id<RecordType<RecordBytes
|
||||||
|
this.addJournalRecordEncoder.on(this.journalRecordBytes, JournalRecordHeader.BYTES).fileId(fileId).compactCount(0).id(id).recordType(recordType).record(encodedRecord, offset, length);
|
||||||
|
final SequentialFile sequentialFile = currentFile.getFile();
|
||||||
|
try {
|
||||||
|
this.journalRecordBytes.limit(alignedLength);
|
||||||
|
sequentialFile.writeDirect(this.journalRecordBytes, sync);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
this.journalRecordBytes.clear();
|
||||||
|
}
|
||||||
|
//TODO AVOID INDEXING WITH CONCURRENT MAP!
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,134 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import net.openhft.chronicle.core.jlbh.JLBH;
|
||||||
|
import net.openhft.chronicle.core.jlbh.JLBHOptions;
|
||||||
|
import net.openhft.chronicle.core.jlbh.JLBHTask;
|
||||||
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||||
|
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||||
|
|
||||||
|
public class GcFreeJournalLatencyBench implements JLBHTask {
|
||||||
|
|
||||||
|
private static final int FILE_SIZE = JournalImpl.SIZE_HEADER + (1024 * 1024 * 1024);
|
||||||
|
private static final JournalType JOURNAL_TYPE = JournalType.MAPPED;
|
||||||
|
private static final int ITERATIONS = 100_000;
|
||||||
|
private static final int WARMUP_ITERATIONS = 20_000;
|
||||||
|
private static final int TARGET_THROUGHPUT = 500_000;
|
||||||
|
private static final int TESTS = 5;
|
||||||
|
private static int TOTAL_MESSAGES = (ITERATIONS * TESTS + WARMUP_ITERATIONS);
|
||||||
|
private static int ENCODED_SIZE = 8;
|
||||||
|
private static int CHUNK_BYTES = FILE_SIZE;
|
||||||
|
private static int OVERLAP_BYTES = CHUNK_BYTES / 4;
|
||||||
|
private final SequentialFileFactory sequentialFileFactory;
|
||||||
|
private GcFreeJournal journal;
|
||||||
|
private JLBH jlbh;
|
||||||
|
private long id;
|
||||||
|
private ByteBuffer encodedRecord;
|
||||||
|
|
||||||
|
public GcFreeJournalLatencyBench(SequentialFileFactory sequentialFileFactory) {
|
||||||
|
this.sequentialFileFactory = sequentialFileFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws IOException {
|
||||||
|
final File journalDir = Files.createTempDirectory("seq_files").toFile();
|
||||||
|
journalDir.deleteOnExit();
|
||||||
|
final boolean buffered = false;
|
||||||
|
final int bufferSize = 4096;
|
||||||
|
final int bufferTimeout = 0;
|
||||||
|
final int maxIO = -1;
|
||||||
|
final boolean logRates = false;
|
||||||
|
final IOCriticalErrorListener criticalErrorListener = null;
|
||||||
|
final SequentialFileFactory sequentialFileFactory;
|
||||||
|
switch (JOURNAL_TYPE) {
|
||||||
|
case MAPPED:
|
||||||
|
sequentialFileFactory = new MappedSequentialFileFactory(journalDir, criticalErrorListener).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES);
|
||||||
|
break;
|
||||||
|
case NIO:
|
||||||
|
sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw new AssertionError("!?");
|
||||||
|
}
|
||||||
|
final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new GcFreeJournalLatencyBench(sequentialFileFactory));
|
||||||
|
new JLBH(lth).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(JLBH jlbh) {
|
||||||
|
id = 0;
|
||||||
|
this.jlbh = jlbh;
|
||||||
|
final int expectedMaxSize = GcFreeJournal.align(JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(ENCODED_SIZE), 8);
|
||||||
|
int numFiles = (int) ((TOTAL_MESSAGES * expectedMaxSize + 512) / FILE_SIZE * 1.3);
|
||||||
|
if (numFiles < 2) {
|
||||||
|
numFiles = 2;
|
||||||
|
}
|
||||||
|
this.encodedRecord = ByteBuffer.allocateDirect(ENCODED_SIZE);
|
||||||
|
this.encodedRecord.order(ByteOrder.nativeOrder());
|
||||||
|
this.journal = new GcFreeJournal(FILE_SIZE, numFiles, numFiles, 0, 0, sequentialFileFactory, "activemq-data", "amq", Integer.MAX_VALUE);
|
||||||
|
try {
|
||||||
|
journal.start();
|
||||||
|
journal.load(new ArrayList<RecordInfo>(), null, null);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(long startTimeNS) {
|
||||||
|
id++;
|
||||||
|
try {
|
||||||
|
journal.appendAddRecord(id, (byte) 0, encodedRecord, 0, ENCODED_SIZE, false);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
jlbh.sample(System.nanoTime() - startTimeNS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void complete() {
|
||||||
|
try {
|
||||||
|
journal.stop();
|
||||||
|
for (File journalFile : sequentialFileFactory.getDirectory().listFiles()) {
|
||||||
|
journalFile.deleteOnExit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum JournalType {
|
||||||
|
MAPPED,
|
||||||
|
NIO
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,105 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* IT IS NOT A FLYWEIGHT BUT AN ENCODER: NEED TO RESPECT THE SEQUENCE OF WRITE:
|
||||||
|
* FileId<CompactCount<Id<RecordType<RecordBytes
|
||||||
|
*/
|
||||||
|
final class AddJournalRecordEncoder {
|
||||||
|
|
||||||
|
private static final int FILE_ID_OFFSET = 0;
|
||||||
|
private static final int COMPACT_COUNT_OFFSET = FILE_ID_OFFSET + 4;
|
||||||
|
private static final int ID_OFFSET = COMPACT_COUNT_OFFSET + 4;
|
||||||
|
private static final int RECORD_TYPE_OFFSET = ID_OFFSET + 8;
|
||||||
|
public static final int BLOCK_SIZE = RECORD_TYPE_OFFSET + 4;
|
||||||
|
|
||||||
|
private ByteBuffer bytes;
|
||||||
|
private int offset;
|
||||||
|
private int limit;
|
||||||
|
|
||||||
|
public static int expectedSize(int recordBytes) {
|
||||||
|
return BLOCK_SIZE + 4 + recordBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ByteBuffer bytes() {
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int offset() {
|
||||||
|
return this.offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int limit() {
|
||||||
|
return this.limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void limit(int limit) {
|
||||||
|
this.limit = limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AddJournalRecordEncoder on(ByteBuffer bytes, int offset) {
|
||||||
|
this.bytes = bytes;
|
||||||
|
this.offset = offset;
|
||||||
|
this.limit = offset + BLOCK_SIZE;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AddJournalRecordEncoder fileId(int value) {
|
||||||
|
this.bytes.putInt(offset + FILE_ID_OFFSET, value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AddJournalRecordEncoder compactCount(int value) {
|
||||||
|
this.bytes.putInt(offset + COMPACT_COUNT_OFFSET, value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AddJournalRecordEncoder id(long value) {
|
||||||
|
this.bytes.putLong(offset + ID_OFFSET, value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AddJournalRecordEncoder recordType(int value) {
|
||||||
|
this.bytes.putLong(offset + RECORD_TYPE_OFFSET, value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AddJournalRecordEncoder noRecord() {
|
||||||
|
this.bytes.putInt(this.limit, 0);
|
||||||
|
this.limit += 4;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AddJournalRecordEncoder record(final ByteBuffer recordBytes, final int recordOffset, final int recordLength) {
|
||||||
|
this.bytes.putInt(this.limit, recordLength);
|
||||||
|
final long dstAddr = PlatformDependent.directBufferAddress(bytes) + this.limit + 4;
|
||||||
|
final long srcAddr = PlatformDependent.directBufferAddress(recordBytes) + recordOffset;
|
||||||
|
PlatformDependent.copyMemory(srcAddr, dstAddr, recordLength);
|
||||||
|
this.limit += (4 + recordLength);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int encodedLength() {
|
||||||
|
return this.limit - this.offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
|
||||||
|
|
||||||
|
final class JournalRecordHeader {
|
||||||
|
|
||||||
|
public static final int BYTES = 8;
|
||||||
|
|
||||||
|
public static long makeHeader(final int journalRecordTypeId, final int length) {
|
||||||
|
return ((journalRecordTypeId & 0xFFFF_FFFFL) << 32) | (length & 0xFFFF_FFFFL);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by developer on 18/06/16.
|
||||||
|
*/
|
||||||
|
final class JournalRecordTypes {
|
||||||
|
|
||||||
|
public static final int ADD_JOURNAL = 11;
|
||||||
|
|
||||||
|
private JournalRecordTypes() {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,131 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.tests.extras.benchmarks.sequentialfile;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
|
||||||
|
import net.openhft.chronicle.core.jlbh.JLBH;
|
||||||
|
import net.openhft.chronicle.core.jlbh.JLBHOptions;
|
||||||
|
import net.openhft.chronicle.core.jlbh.JLBHTask;
|
||||||
|
import org.apache.activemq.artemis.core.io.DummyCallback;
|
||||||
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||||
|
|
||||||
|
public final class SequentialFileLatencyBench implements JLBHTask {
|
||||||
|
|
||||||
|
private static final JournalType JOURNAL_TYPE = JournalType.MAPPED;
|
||||||
|
//NOTE: SUPPORTED ONLY ON *NIX
|
||||||
|
private static final boolean SHM = false;
|
||||||
|
private static final int JOURNAL_RECORD_SIZE = 8;
|
||||||
|
private static final int ITERATIONS = 100_000;
|
||||||
|
private static final int WARMUP_ITERATIONS = 20_000;
|
||||||
|
private static final int TARGET_THROUGHPUT = 500_000;
|
||||||
|
private static final int TESTS = 5;
|
||||||
|
private static int CHUNK_BYTES = 4096 * 1024 * 16;
|
||||||
|
private static int OVERLAP_BYTES = CHUNK_BYTES / 4;
|
||||||
|
private final SequentialFileFactory sequentialFileFactory;
|
||||||
|
private SequentialFile sequentialFile;
|
||||||
|
private ByteBuffer message;
|
||||||
|
private JLBH jlbh;
|
||||||
|
public SequentialFileLatencyBench(SequentialFileFactory sequentialFileFactory) {
|
||||||
|
this.sequentialFileFactory = sequentialFileFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws IOException {
|
||||||
|
final File journalDir;
|
||||||
|
if (SHM) {
|
||||||
|
journalDir = Files.createDirectory(Paths.get("/dev/shm/seq_files")).toFile();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
journalDir = Files.createTempDirectory("seq_files").toFile();
|
||||||
|
}
|
||||||
|
journalDir.deleteOnExit();
|
||||||
|
final boolean buffered = false;
|
||||||
|
final int bufferSize = 4096;
|
||||||
|
final int bufferTimeout = 0;
|
||||||
|
final int maxIO = -1;
|
||||||
|
final boolean logRates = false;
|
||||||
|
final IOCriticalErrorListener criticalErrorListener = null;
|
||||||
|
final SequentialFileFactory sequentialFileFactory;
|
||||||
|
switch (JOURNAL_TYPE) {
|
||||||
|
case MAPPED:
|
||||||
|
sequentialFileFactory = new MappedSequentialFileFactory(journalDir).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES);
|
||||||
|
break;
|
||||||
|
case NIO:
|
||||||
|
sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new AssertionError("!?");
|
||||||
|
}
|
||||||
|
final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new SequentialFileLatencyBench(sequentialFileFactory));
|
||||||
|
new JLBH(lth).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(JLBH jlbh) {
|
||||||
|
this.jlbh = jlbh;
|
||||||
|
this.sequentialFile = this.sequentialFileFactory.createSequentialFile(Long.toString(System.nanoTime()));
|
||||||
|
try {
|
||||||
|
this.sequentialFile.open(-1, false);
|
||||||
|
final File file = this.sequentialFile.getJavaFile();
|
||||||
|
file.deleteOnExit();
|
||||||
|
System.out.println("sequentialFile: " + file);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
this.message = this.sequentialFileFactory.allocateDirectBuffer(JOURNAL_RECORD_SIZE).order(ByteOrder.nativeOrder());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(long startTimeNS) {
|
||||||
|
message.position(0);
|
||||||
|
try {
|
||||||
|
sequentialFile.writeDirect(message, false, DummyCallback.getInstance());
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
jlbh.sample(System.nanoTime() - startTimeNS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void complete() {
|
||||||
|
sequentialFileFactory.releaseDirectBuffer(message);
|
||||||
|
try {
|
||||||
|
sequentialFile.close();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum JournalType {
|
||||||
|
MAPPED,
|
||||||
|
NIO
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue