ARTEMIS-1811 NIO Seq File should use RandomAccessFile with heap buffers

It use RandomAccessFile to allow using heap buffers without additional
copies and/or leaks of direct buffers, as performed by FileChannel JDK
implementation (see https://bugs.openjdk.java.net/browse/JDK-8147468)
This commit is contained in:
Francesco Nigro 2018-09-03 13:51:23 +02:00 committed by Clebert Suconic
parent 77376e5d70
commit f51c799ac0
2 changed files with 217 additions and 9 deletions

View File

@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.io.nio;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -42,8 +42,20 @@ import org.apache.activemq.artemis.utils.Env;
public class NIOSequentialFile extends AbstractSequentialFile { public class NIOSequentialFile extends AbstractSequentialFile {
/* This value has been tuned just to reduce the memory footprint
of read/write of the whole file size: given that this value
is > 8192, RandomAccessFile JNI code will use malloc/free instead
of using a copy on the stack, but it has been proven to NOT be
a bottleneck.
Instead of reading the whole content in a single operation, this will read in smaller chunks.
*/
private static final int CHUNK_SIZE = 2 * 1024 * 1024;
private FileChannel channel; private FileChannel channel;
private RandomAccessFile rfile;
private final int maxIO; private final int maxIO;
public NIOSequentialFile(final SequentialFileFactory factory, public NIOSequentialFile(final SequentialFileFactory factory,
@ -82,7 +94,9 @@ public class NIOSequentialFile extends AbstractSequentialFile {
@Override @Override
public void open(final int maxIO, final boolean useExecutor) throws IOException { public void open(final int maxIO, final boolean useExecutor) throws IOException {
try { try {
channel = FileChannel.open(getFile().toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ); rfile = new RandomAccessFile(getFile(), "rw");
channel = rfile.getChannel();
fileSize = channel.size(); fileSize = channel.size();
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
@ -139,18 +153,27 @@ public class NIOSequentialFile extends AbstractSequentialFile {
super.close(); super.close();
try { try {
if (channel != null) { try {
if (waitSync && factory.isDatasync()) if (channel != null) {
channel.force(false); if (waitSync && factory.isDatasync())
channel.close(); channel.force(false);
channel.close();
}
} finally {
if (rfile != null) {
rfile.close();
}
} }
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
throw e; throw e;
} catch (IOException e) { } catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e; throw e;
} finally {
channel = null;
rfile = null;
} }
channel = null;
notifyAll(); notifyAll();
} }
@ -160,6 +183,37 @@ public class NIOSequentialFile extends AbstractSequentialFile {
return read(bytes, null); return read(bytes, null);
} }
private static int readRafInChunks(RandomAccessFile raf, byte[] b, int off, int len) throws IOException {
int remaining = len;
int offset = off;
while (remaining > 0) {
final int chunkSize = Math.min(CHUNK_SIZE, remaining);
final int read = raf.read(b, offset, chunkSize);
assert read != 0;
if (read == -1) {
if (len == remaining) {
return -1;
}
break;
}
offset += read;
remaining -= read;
}
return len - remaining;
}
private static void writeRafInChunks(RandomAccessFile raf, byte[] b, int off, int len) throws IOException {
int remaining = len;
int offset = off;
while (remaining > 0) {
final int chunkSize = Math.min(CHUNK_SIZE, remaining);
raf.write(b, offset, chunkSize);
offset += chunkSize;
remaining -= chunkSize;
}
}
@Override @Override
public synchronized int read(final ByteBuffer bytes, public synchronized int read(final ByteBuffer bytes,
final IOCallback callback) throws IOException, ActiveMQIllegalStateException { final IOCallback callback) throws IOException, ActiveMQIllegalStateException {
@ -167,7 +221,19 @@ public class NIOSequentialFile extends AbstractSequentialFile {
if (channel == null) { if (channel == null) {
throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel"); throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel");
} }
int bytesRead = channel.read(bytes); final int bytesRead;
if (bytes.hasArray()) {
if (bytes.remaining() > CHUNK_SIZE) {
bytesRead = readRafInChunks(rfile, bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
} else {
bytesRead = rfile.read(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
}
if (bytesRead > 0) {
bytes.position(bytes.position() + bytesRead);
}
} else {
bytesRead = channel.read(bytes);
}
if (callback != null) { if (callback != null) {
callback.done(); callback.done();
@ -310,7 +376,16 @@ public class NIOSequentialFile extends AbstractSequentialFile {
final IOCallback callback, final IOCallback callback,
boolean releaseBuffer) throws IOException { boolean releaseBuffer) throws IOException {
try { try {
channel.write(bytes); if (bytes.hasArray()) {
if (bytes.remaining() > CHUNK_SIZE) {
writeRafInChunks(rfile, bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
} else {
rfile.write(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
}
bytes.position(bytes.limit());
} else {
channel.write(bytes);
}
if (sync) { if (sync) {
sync(); sync();

View File

@ -17,10 +17,15 @@
package org.apache.activemq.artemis.tests.integration.journal; package org.apache.activemq.artemis.tests.integration.journal;
import java.io.File; import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase; import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase;
import org.junit.Assert;
import org.junit.Test;
public class NIONonBufferedSequentialFileFactoryTest extends SequentialFileFactoryTestBase { public class NIONonBufferedSequentialFileFactoryTest extends SequentialFileFactoryTestBase {
@ -29,4 +34,132 @@ public class NIONonBufferedSequentialFileFactoryTest extends SequentialFileFacto
return new NIOSequentialFileFactory(new File(folder), false, 1); return new NIOSequentialFileFactory(new File(folder), false, 1);
} }
@Test
public void writeHeapBufferNotFromBeginningAndReadWithDirectBuffer() throws Exception {
writeHeapBufferNotFromBeginningAndRead(false);
}
@Test
public void writeHeapBufferNotFromBeginningAndReadWithHeapBuffer() throws Exception {
writeHeapBufferNotFromBeginningAndRead(true);
}
private void writeHeapBufferNotFromBeginningAndRead(boolean useHeapByteBufferToRead) throws Exception {
final SequentialFile file = factory.createSequentialFile("write.amq");
file.open();
Assert.assertEquals(0, file.size());
Assert.assertEquals(0, file.position());
try {
final String data = "writeDirectArray";
final byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
file.position(factory.calculateBlockSize(bytes.length));
file.writeDirect(ByteBuffer.wrap(bytes), false);
final ByteBuffer readBuffer;
if (!useHeapByteBufferToRead) {
readBuffer = factory.newBuffer(bytes.length);
} else {
readBuffer = ByteBuffer.allocate(bytes.length);
}
try {
file.position(factory.calculateBlockSize(bytes.length));
Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer));
Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString());
} finally {
if (!useHeapByteBufferToRead) {
factory.releaseBuffer(readBuffer);
}
}
} finally {
file.close();
file.delete();
}
}
@Test
public void writeHeapBufferAndReadWithDirectBuffer() throws Exception {
writeHeapBufferAndRead(false);
}
@Test
public void writeHeapBufferAndReadWithHeapBuffer() throws Exception {
writeHeapBufferAndRead(true);
}
private void writeHeapBufferAndRead(boolean useHeapByteBufferToRead) throws Exception {
final SequentialFile file = factory.createSequentialFile("write.amq");
file.open();
Assert.assertEquals(0, file.size());
Assert.assertEquals(0, file.position());
try {
final String data = "writeDirectArray";
final byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
file.writeDirect(ByteBuffer.wrap(bytes), false);
final ByteBuffer readBuffer;
if (!useHeapByteBufferToRead) {
readBuffer = factory.newBuffer(bytes.length);
} else {
readBuffer = ByteBuffer.allocate(bytes.length);
}
try {
file.position(0);
Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer));
Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString());
} finally {
if (!useHeapByteBufferToRead) {
factory.releaseBuffer(readBuffer);
}
}
} finally {
file.close();
file.delete();
}
}
@Test
public void writeHeapAndDirectBufferAndReadWithDirectBuffer() throws Exception {
writeHeapAndDirectBufferAndRead(false);
}
@Test
public void writeHeapAndDirectBufferAndReadWithHeapBuffer() throws Exception {
writeHeapAndDirectBufferAndRead(true);
}
private void writeHeapAndDirectBufferAndRead(boolean useHeapByteBufferToRead) throws Exception {
final SequentialFile file = factory.createSequentialFile("write.amq");
file.open();
Assert.assertEquals(0, file.size());
Assert.assertEquals(0, file.position());
try {
final String data = "writeDirectArray";
final byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
file.writeDirect(ByteBuffer.wrap(bytes), false);
final ByteBuffer byteBuffer = factory.newBuffer(bytes.length);
byteBuffer.put(bytes);
byteBuffer.flip();
file.writeDirect(byteBuffer, false);
final ByteBuffer readBuffer;
if (!useHeapByteBufferToRead) {
readBuffer = factory.newBuffer(bytes.length);
} else {
readBuffer = ByteBuffer.allocate(bytes.length);
}
try {
file.position(0);
Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer));
Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString());
readBuffer.flip();
Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer));
Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString());
} finally {
if (!useHeapByteBufferToRead) {
factory.releaseBuffer(readBuffer);
}
}
} finally {
file.close();
file.delete();
}
}
} }