ARTEMIS-618 Using proper block size on Native One Mega Buffer

This buffer is used on fill
This commit is contained in:
Clebert Suconic 2018-04-04 20:37:38 -04:00 committed by Justin Bertram
parent 97ccf96bb4
commit 4dd594f38b
8 changed files with 100 additions and 64 deletions

View File

@ -151,7 +151,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
}
checkOpened();
aioFile.fill(size);
aioFile.fill(aioFactory.getAlignment(), size);
fileSize = aioFile.getSize();
}

View File

@ -65,6 +65,7 @@ char dumbPath[PATH_MAX];
#define ONE_MEGA 1048576l
void * oneMegaBuffer = 0;
pthread_mutex_t oneMegaMutex;
jclass submitClass = NULL;
@ -119,18 +120,39 @@ char* exceptionMessage(char* msg, int error) {
return result;
}
static inline short verifyBuffer(int alignment) {
pthread_mutex_lock(&oneMegaMutex);
if (oneMegaBuffer == 0) {
#ifdef DEBUG
fprintf (stdout, "oneMegaBuffer %ld\n", (long) oneMegaBuffer);
#endif
if (posix_memalign(&oneMegaBuffer, alignment, ONE_MEGA) != 0) {
fprintf(stderr, "Could not allocate the 1 Mega Buffer for initializing files\n");
pthread_mutex_unlock(&oneMegaMutex);
return -1;
}
memset(oneMegaBuffer, 0, ONE_MEGA);
}
pthread_mutex_unlock(&oneMegaMutex);
return 0;
}
jint JNI_OnLoad(JavaVM* vm, void* reserved) {
JNIEnv* env;
if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
return JNI_ERR;
} else {
if (posix_memalign(&oneMegaBuffer, 512, ONE_MEGA) != 0)
{
fprintf(stderr, "Could not allocate the 1 Mega Buffer for initializing files\n");
int res = pthread_mutex_init(&oneMegaMutex, 0);
if (res) {
fprintf(stderr, "could not initialize mutex on on_load, %d", res);
return JNI_ERR;
}
memset(oneMegaBuffer, 0, ONE_MEGA);
sprintf (dumbPath, "%s/artemisJLHandler_XXXXXX", P_tmpdir);
dumbWriteHandler = mkstemp (dumbPath);
@ -228,7 +250,12 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
} else {
closeDumbHandlers();
if (oneMegaBuffer != 0) {
free(oneMegaBuffer);
oneMegaBuffer = 0;
}
pthread_mutex_destroy(&oneMegaMutex);
// delete global references so the GC can collect them
if (runtimeExceptionClass != NULL) {
@ -782,7 +809,7 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fa
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fill
(JNIEnv * env, jclass clazz, jint fd, jlong size)
(JNIEnv * env, jclass clazz, jint fd, jint alignment, jlong size)
{
int i;
@ -790,14 +817,20 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fi
int rest = size % ONE_MEGA;
#ifdef DEBUG
fprintf (stderr, "blocks = %d, rest=%d\n", blocks, rest);
fprintf (stdout, "calling fill ... blocks = %d, rest=%d, alignment=%d\n", blocks, rest, alignment);
#endif
verifyBuffer(alignment);
lseek (fd, 0, SEEK_SET);
for (i = 0; i < blocks; i++)
{
if (write(fd, oneMegaBuffer, ONE_MEGA) < 0)
{
#ifdef DEBUG
fprintf (stdout, "Errno is %d\n", errno);
#endif
throwIOException(env, "Cannot initialize file");
return;
}
@ -807,7 +840,10 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fi
{
if (write(fd, oneMegaBuffer, rest) < 0)
{
throwIOException(env, "Cannot initialize file");
#ifdef DEBUG
fprintf (stdout, "Errno is %d\n", errno);
#endif
throwIOException(env, "Cannot initialize file with final rest");
return;
}
}

View File

@ -49,7 +49,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
* <br>
* Or else the native module won't be loaded because of version mismatches
*/
private static final int EXPECTED_NATIVE_VERSION = 7;
private static final int EXPECTED_NATIVE_VERSION = 8;
private static boolean loaded = false;
@ -460,7 +460,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
static native void fallocate(int fd, long size);
static native void fill(int fd, long size);
static native void fill(int fd, int alignment, long size);
static native void writeInternal(int fd, long position, long size, ByteBuffer bufferWrite) throws IOException;
}

View File

@ -108,7 +108,7 @@ public final class LibaioFile<Callback extends SubmitInfo> implements AutoClosea
* @return the buffer allocated.
*/
public ByteBuffer newBuffer(int size) {
return LibaioContext.newAlignedBuffer(size, 512);
return LibaioContext.newAlignedBuffer(size, 4 * 1024);
}
/**
@ -116,9 +116,9 @@ public final class LibaioFile<Callback extends SubmitInfo> implements AutoClosea
*
* @param size number of bytes to be filled on the file
*/
public void fill(long size) {
public void fill(int alignment, long size) {
try {
LibaioContext.fill(fd, size);
LibaioContext.fill(fd, alignment, size);
} catch (OutOfMemoryError e) {
NativeLogger.LOGGER.debug("Didn't have enough memory to allocate " + size + " bytes in memory, using simple fallocate");
LibaioContext.fallocate(fd, size);

View File

@ -136,7 +136,7 @@ public class LibaioTest {
buffer.position(0);
LibaioFile fileDescriptor2 = control.openFile(temporaryFolder.newFile("test2.bin"), true);
fileDescriptor2.fill(size);
fileDescriptor2.fill(fileDescriptor.getBlockSize(), size);
fileDescriptor2.read(0, size, buffer, new TestInfo());
control.poll(callbacks, 1, 1);
@ -149,12 +149,12 @@ public class LibaioTest {
@Test
public void testInitAndFallocate10K() throws Exception {
testInit(10 * 1024);
testInit(10 * 4096);
}
@Test
public void testInitAndFallocate20K() throws Exception {
testInit(20 * 1024);
testInit(20 * 4096);
}
@Test
@ -168,18 +168,18 @@ public class LibaioTest {
LibaioFile[] fileDescriptor = new LibaioFile[]{control.openFile(file1, true), control.openFile(file2, true)};
Assert.assertEquals((LIBAIO_QUEUE_SIZE / 2) * 512, fileDescriptor[0].getSize());
Assert.assertEquals((LIBAIO_QUEUE_SIZE / 2) * 512, fileDescriptor[1].getSize());
Assert.assertEquals((LIBAIO_QUEUE_SIZE / 2) * 4096, fileDescriptor[0].getSize());
Assert.assertEquals((LIBAIO_QUEUE_SIZE / 2) * 4096, fileDescriptor[1].getSize());
Assert.assertEquals(fileDescriptor[0].getBlockSize(), fileDescriptor[1].getBlockSize());
Assert.assertEquals(LibaioContext.getBlockSize(temporaryFolder.getRoot()), LibaioContext.getBlockSize(file1));
Assert.assertEquals(LibaioContext.getBlockSize(file1), LibaioContext.getBlockSize(file2));
System.out.println("blockSize = " + fileDescriptor[0].getBlockSize());
System.out.println("blockSize /tmp= " + LibaioContext.getBlockSize("/tmp"));
ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512);
ByteBuffer buffer = LibaioContext.newAlignedBuffer(4096, 4096);
try {
for (int i = 0; i < 512; i++) {
for (int i = 0; i < 4096; i++) {
buffer.put((byte) 'a');
}
@ -188,7 +188,7 @@ public class LibaioTest {
for (int i = 0; i < LIBAIO_QUEUE_SIZE / 2; i++) {
for (LibaioFile file : fileDescriptor) {
file.write(i * 512, 512, buffer, callback);
file.write(i * 4096, 4096, buffer, callback);
}
}
@ -199,15 +199,15 @@ public class LibaioTest {
}
for (LibaioFile file : fileDescriptor) {
ByteBuffer bigbuffer = LibaioContext.newAlignedBuffer(512 * 25, 512);
file.read(0, 512 * 25, bigbuffer, callback);
ByteBuffer bigbuffer = LibaioContext.newAlignedBuffer(4096 * 25, 4096);
file.read(0, 4096 * 25, bigbuffer, callback);
Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
for (Object returnedCallback : callbacks) {
Assert.assertSame(returnedCallback, callback);
}
for (int i = 0; i < 512 * 25; i++) {
for (int i = 0; i < 4096 * 25; i++) {
Assert.assertEquals((byte) 'a', bigbuffer.get());
}
@ -228,17 +228,17 @@ public class LibaioTest {
LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true);
// ByteBuffer buffer = ByteBuffer.allocateDirect(512);
ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512);
// ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
ByteBuffer buffer = LibaioContext.newAlignedBuffer(4096, 4096);
try {
for (int i = 0; i < 512; i++) {
for (int i = 0; i < 4096; i++) {
buffer.put((byte) 'a');
}
buffer.rewind();
fileDescriptor.write(0, 512, buffer, callback);
fileDescriptor.write(0, 4096, buffer, callback);
int retValue = control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE);
Assert.assertEquals(1, retValue);
@ -247,23 +247,23 @@ public class LibaioTest {
LibaioContext.freeBuffer(buffer);
buffer = LibaioContext.newAlignedBuffer(512, 512);
buffer = LibaioContext.newAlignedBuffer(4096, 4096);
for (int i = 0; i < 512; i++) {
for (int i = 0; i < 4096; i++) {
buffer.put((byte) 'B');
}
fileDescriptor.write(0, 512, buffer, null);
fileDescriptor.write(0, 4096, buffer, null);
Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
buffer.rewind();
fileDescriptor.read(0, 512, buffer, null);
fileDescriptor.read(0, 4096, buffer, null);
Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
for (int i = 0; i < 512; i++) {
for (int i = 0; i < 4096; i++) {
Assert.assertEquals('B', buffer.get());
}
} finally {
@ -343,9 +343,9 @@ public class LibaioTest {
LibaioFile fileDescriptor = control.openFile(file, true);
ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512);
ByteBuffer buffer = LibaioContext.newAlignedBuffer(4096, 4096);
final int BUFFER_SIZE = 512;
final int BUFFER_SIZE = 4096;
try {
for (int i = 0; i < BUFFER_SIZE; i++) {
buffer.put((byte) '@');
@ -402,20 +402,20 @@ public class LibaioTest {
System.out.println("Error:" + callbacks[0]);
buffer = fileDescriptor.newBuffer(512);
for (int i = 0; i < 512; i++) {
buffer = fileDescriptor.newBuffer(4096);
for (int i = 0; i < 4096; i++) {
buffer.put((byte) 'z');
}
callback = new TestInfo();
fileDescriptor.write(0, 512, buffer, callback);
fileDescriptor.write(0, 4096, buffer, callback);
Assert.assertEquals(1, control.poll(callbacks, 1, 1));
Assert.assertSame(callback, callbacks[0]);
fileDescriptor.write(5, 512, buffer, callback);
fileDescriptor.write(5, 4096, buffer, callback);
Assert.assertEquals(1, control.poll(callbacks, 1, 1));
@ -440,17 +440,17 @@ public class LibaioTest {
LibaioFile<TestInfo> fileDescriptor = control.openFile(file, true);
ByteBuffer bufferWrite = LibaioContext.newAlignedBuffer(512, 512);
ByteBuffer bufferWrite = LibaioContext.newAlignedBuffer(4096, 4096);
try {
for (int i = 0; i < 512; i++) {
for (int i = 0; i < 4096; i++) {
bufferWrite.put((byte) 'B');
}
for (int j = 0; j < LIBAIO_QUEUE_SIZE * 2; j++) {
for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++) {
TestInfo countClass = new TestInfo();
fileDescriptor.write(i * 512, 512, bufferWrite, countClass);
fileDescriptor.write(i * 4096, 4096, bufferWrite, countClass);
}
Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE));
@ -482,7 +482,7 @@ public class LibaioTest {
File file = temporaryFolder.newFile("test.bin");
LibaioFile fileDescriptor = control.openFile(file, true);
fileDescriptor.fill(10 * 1024 * 1024);
fileDescriptor.fill(fileDescriptor.getBlockSize(),10 * 1024 * 1024);
fileDescriptor.close();
}
@ -503,7 +503,7 @@ public class LibaioTest {
@Test
public void testMemset() throws Exception {
ByteBuffer buffer = LibaioContext.newAlignedBuffer(512 * 8, 512);
ByteBuffer buffer = LibaioContext.newAlignedBuffer(4096 * 8, 4096);
for (int i = 0; i < buffer.capacity(); i++) {
buffer.put((byte) 'z');
@ -566,20 +566,20 @@ public class LibaioTest {
fileDescriptor = control.openFile(temporaryFolder.newFile(), true);
ByteBuffer buffer = fileDescriptor.newBuffer(512);
ByteBuffer buffer = fileDescriptor.newBuffer(4096);
try {
for (int i = 0; i < 512; i++) {
for (int i = 0; i < 4096; i++) {
buffer.put((byte) 'a');
}
for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++) {
fileDescriptor.write(i * 512, 512, buffer, new TestInfo());
fileDescriptor.write(i * 4096, 4096, buffer, new TestInfo());
}
boolean ex = false;
try {
fileDescriptor.write(0, 512, buffer, new TestInfo());
fileDescriptor.write(0, 4096, buffer, new TestInfo());
} catch (Exception e) {
ex = true;
}
@ -590,12 +590,12 @@ public class LibaioTest {
Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE));
// it should be possible to write now after queue space being released
fileDescriptor.write(0, 512, buffer, new TestInfo());
fileDescriptor.write(0, 4096, buffer, new TestInfo());
Assert.assertEquals(1, control.poll(callbacks, 1, 100));
TestInfo errorCallback = new TestInfo();
// odd positions will have failures through O_DIRECT
fileDescriptor.read(3, 512, buffer, errorCallback);
fileDescriptor.read(3, 4096, buffer, errorCallback);
Assert.assertEquals(1, control.poll(callbacks, 1, 50));
Assert.assertTrue(callbacks[0].isError());
Assert.assertSame(errorCallback, (callbacks[0]));
@ -608,7 +608,7 @@ public class LibaioTest {
exceptionThrown = false;
try {
LibaioContext.newAlignedBuffer(300, 512);
LibaioContext.newAlignedBuffer(300, 4096);
} catch (RuntimeException e) {
exceptionThrown = true;
}
@ -617,7 +617,7 @@ public class LibaioTest {
exceptionThrown = false;
try {
LibaioContext.newAlignedBuffer(-512, 512);
LibaioContext.newAlignedBuffer(-4096, 4096);
} catch (RuntimeException e) {
exceptionThrown = true;
}
@ -646,7 +646,7 @@ public class LibaioTest {
File file = temporaryFolder.newFile("sub-file.txt");
LibaioFile aioFile = blockedContext.openFile(file, true);
aioFile.fill(NUMBER_OF_BLOCKS * 512);
aioFile.fill(aioFile.getBlockSize(),NUMBER_OF_BLOCKS * 4096);
final AtomicInteger errors = new AtomicInteger(0);
@ -665,16 +665,16 @@ public class LibaioTest {
MyCallback callback = new MyCallback();
ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512);
ByteBuffer buffer = LibaioContext.newAlignedBuffer(4096, 4096);
for (int i = 0; i < 512; i++) {
for (int i = 0; i < 4096; i++) {
buffer.put((byte) 'a');
}
long start = System.currentTimeMillis();
for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
aioFile.write(i * 512, 512, buffer, callback);
aioFile.write(i * 4096, 4096, buffer, callback);
}
long end = System.currentTimeMillis();
@ -689,8 +689,8 @@ public class LibaioTest {
private void fillupFile(File file, int blocks) throws IOException {
FileOutputStream fileOutputStream = new FileOutputStream(file);
byte[] bufferWrite = new byte[512];
for (int i = 0; i < 512; i++) {
byte[] bufferWrite = new byte[4096];
for (int i = 0; i < 4096; i++) {
bufferWrite[i] = (byte) 0;
}

View File

@ -62,7 +62,7 @@ public class OpenCloseContextTest {
};
t.start();
LibaioFile file = control.openFile(folder.newFile(), true);
file.fill(4 * 1024);
file.fill(file.getBlockSize(),4 * 1024);
final CountDownLatch insideMethod = new CountDownLatch(1);
final CountDownLatch awaitInside = new CountDownLatch(1);
file.write(0, 512, buffer, new SubmitInfo() {
@ -120,7 +120,7 @@ public class OpenCloseContextTest {
};
t.start();
LibaioFile file = control.openFile(folder.newFile(), true);
file.fill(4 * 1024);
file.fill(file.getBlockSize(), 4 * 1024);
final CountDownLatch insideMethod = new CountDownLatch(1);
final CountDownLatch awaitInside = new CountDownLatch(1);
file.write(0, 512, buffer, new SubmitInfo() {

View File

@ -68,7 +68,7 @@ public abstract class AIOTestBase extends ActiveMQTestBase {
}
protected void preAlloc(final LibaioFile controller, final long size) throws ActiveMQException {
controller.fill(size);
controller.fill(controller.getBlockSize(), size);
}
protected static class CountDownCallback implements IOCallback {