This closes #198

This commit is contained in:
Clebert Suconic 2015-10-15 16:03:18 -04:00
commit 46ac41de3f
8 changed files with 346 additions and 45 deletions

View File

@ -724,7 +724,7 @@ public class Create extends InputAbstract {
}
}
private boolean supportsLibaio() {
public boolean supportsLibaio() {
if (forceLibaio) {
// forcing libaio
return true;

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.cli.commands.Configurable;
import org.apache.activemq.artemis.cli.commands.Create;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
@ -77,6 +78,13 @@ public class ArtemisTest {
}
}
@Test
public void testSupportsLibaio() throws Exception {
Create x = new Create();
x.setInstance(new File("/tmp/foo"));
x.supportsLibaio();
}
@Test
public void testSync() throws Exception {
int writes = 20;

View File

@ -32,6 +32,7 @@
#include <fcntl.h>
#include <stdlib.h>
#include <pthread.h>
#include <limits.h>
#include "org_apache_activemq_artemis_jlibaio_LibaioContext.h"
#include "exception_helper.h"
@ -53,8 +54,16 @@ struct io_control {
int iocbPut;
int iocbGet;
int used;
};
// We need a fast and reliable way to stop the blocked poller
// for that we need a dumb file,
// We are using a temporary file for this.
int dumbWriteHandler = 0;
char dumbPath[PATH_MAX];
jclass submitClass = NULL;
jmethodID errorMethod = NULL;
jmethodID doneMethod = NULL;
@ -112,6 +121,20 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
return JNI_ERR;
} else {
sprintf (dumbPath, "%s/artemisJLHandler_XXXXXX", P_tmpdir);
dumbWriteHandler = mkstemp (dumbPath);
#ifdef DEBUG
fprintf (stdout, "Creating temp file %s for dumb writes\n", dumbPath);
fflush(stdout);
#endif
if (dumbWriteHandler < 0) {
fprintf (stderr, "couldn't create stop file handler %s\n", dumbPath);
return JNI_ERR;
}
//
// Accordingly to previous experiences we must hold Global Refs on Classes
// And
@ -177,12 +200,25 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
}
}
inline void closeDumbHandlers() {
if (dumbWriteHandler != 0) {
#ifdef DEBUG
fprintf (stdout, "Closing and removing dump handler %s\n", dumbPath);
#endif
dumbWriteHandler = 0;
close(dumbWriteHandler);
unlink(dumbPath);
}
}
void JNI_OnUnload(JavaVM* vm, void* reserved) {
JNIEnv* env;
if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
// Something is wrong but nothing we can do about this :(
return;
} else {
closeDumbHandlers();
// delete global references so the GC can collect them
if (runtimeExceptionClass != NULL) {
(*env)->DeleteGlobalRef(env, runtimeExceptionClass);
@ -201,6 +237,12 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
}
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_shutdownHook
(JNIEnv * env, jclass clazz) {
closeDumbHandlers();
}
static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) {
struct io_control * ioControl = (struct io_control *) (*env)->GetDirectBufferAddress(env, pointer);
if (ioControl == NULL) {
@ -252,6 +294,23 @@ static inline void putIOCB(struct io_control * control, struct iocb * iocbBack)
pthread_mutex_unlock(&(control->iocbLock));
}
static inline short submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) {
int result = io_submit(theControl->ioContext, 1, &iocb);
if (result < 0) {
// Putting the Global Ref and IOCB back in case of a failure
if (iocb->data != NULL && iocb->data != (void *) -1) {
(*env)->DeleteGlobalRef(env, (jobject)iocb->data);
}
putIOCB(theControl, iocb);
throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result);
return 0;
}
return 1;
}
static inline void * getBuffer(JNIEnv* env, jobject pointer) {
return (*env)->GetDirectBufferAddress(env, pointer);
}
@ -342,12 +401,34 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_de
return;
}
io_queue_release(theControl->ioContext);
struct iocb * iocb = getIOCB(theControl);
if (iocb == NULL) {
throwIOException(env, "Not enough space in libaio queue");
return;
}
// Submitting a dumb write so the loop finishes
io_prep_pwrite(iocb, dumbWriteHandler, 0, 0, 0);
iocb->data = (void *) -1;
if (!submit(env, theControl, iocb)) {
return;
}
// to make sure the poll has finished
pthread_mutex_lock(&(theControl->pollLock));
pthread_mutex_unlock(&(theControl->pollLock));
// To return any pending IOCBs
int result = io_getevents(theControl->ioContext, 0, 1, theControl->events, 0);
for (i = 0; i < result; i++) {
struct io_event * event = &(theControl->events[i]);
struct iocb * iocbp = event->obj;
putIOCB(theControl, iocbp);
}
io_queue_release(theControl->ioContext);
pthread_mutex_destroy(&(theControl->pollLock));
pthread_mutex_destroy(&(theControl->iocbLock));
@ -389,20 +470,6 @@ JNIEXPORT int JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_ope
return res;
}
static inline void submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) {
int result = io_submit(theControl->ioContext, 1, &iocb);
if (result < 0) {
// Putting the Global Ref and IOCB back in case of a failure
(*env)->DeleteGlobalRef(env, (jobject)iocb->data);
putIOCB(theControl, iocb);
throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result);
}
return;
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitWrite
(JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferWrite, jobject callback) {
struct io_control * theControl = getIOControl(env, contextPointer);
@ -429,7 +496,7 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_su
// also as the real intention is to hold the reference until the life cycle is complete
iocb->data = (void *) (*env)->NewGlobalRef(env, callback);
return submit(env, theControl, iocb);
submit(env, theControl, iocb);
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitRead
@ -454,14 +521,15 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_su
// also as the real intention is to hold the reference until the life cycle is complete
iocb->data = (void *) (*env)->NewGlobalRef(env, callback);
return submit(env, theControl, iocb);
submit(env, theControl, iocb);
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll
(JNIEnv * env, jobject thisObject, jobject contextPointer) {
#ifdef DEBUG
fprintf (stdout, "Running blockedPoll");
fprintf (stdout, "Running blockedPoll\n");
fflush(stdout);
#endif
int i;
@ -472,37 +540,53 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
int max = theControl->queueSize;
pthread_mutex_lock(&(theControl->pollLock));
for (;;) {
short running = 1;
while (running) {
int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0);
if (result < 0)
{
#ifdef DEBUG
fprintf (stdout, "finished blockedPoll rutine with result=%d\n", result);
#endif
throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result);
break;
}
#ifdef DEBUG
fprintf (stdout, "blockedPoll returned %d events\n", result);
fflush(stdout);
#endif
for (i = 0; i < result; i++)
{
#ifdef DEBUG
fprintf (stdout, "blockedPoll treading event %d\n", i);
fprintf (stdout, "blockedPoll treating event %d\n", i);
fflush(stdout);
#endif
struct io_event * event = &(theControl->events[i]);
struct iocb * iocbp = event->obj;
if (iocbp->aio_fildes == dumbWriteHandler) {
#ifdef DEBUG
fprintf (stdout, "Dumb write arrived, giving up the loop\n");
fflush(stdout);
#endif
putIOCB(theControl, iocbp);
running = 0;
break;
}
int eventResult = (int)event->res;
#ifdef DEBUG
fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result);
fflush (stdout);
#endif
if (eventResult < 0) {
#ifdef DEBUG
fprintf (stdout, "Error: %s\n", strerror(-eventResult));
fflush (stdout);
#endif
jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult));
@ -554,14 +638,15 @@ JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_po
fprintf (stdout, "Error: %s\n", strerror(-eventResult));
#endif
jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult));
if (iocbp->data != NULL && iocbp->data != (void *) -1) {
jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult));
(*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError);
(*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError);
}
}
(*env)->SetObjectArrayElement(env, callbacks, i, (jobject)iocbp->data);
if (iocbp->data != NULL) {
if (iocbp->data != NULL && iocbp->data != (void *) -1) {
(*env)->SetObjectArrayElement(env, callbacks, i, (jobject)iocbp->data);
// We delete the globalRef after the completion of the callback
(*env)->DeleteGlobalRef(env, (jobject)iocbp->data);
}

View File

@ -21,7 +21,9 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
@ -49,10 +51,14 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
* <br>
* Or else the native module won't be loaded because of version mismatches
*/
private static final int EXPECTED_NATIVE_VERSION = 2;
private static final int EXPECTED_NATIVE_VERSION = 3;
private static boolean loaded = false;
private static final AtomicBoolean shuttingDown = new AtomicBoolean(false);
private static final AtomicInteger contexts = new AtomicInteger(0);
public static boolean isLoaded() {
return loaded;
}
@ -81,6 +87,12 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
for (String library : libraries) {
if (loadLibrary(library)) {
loaded = true;
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
shuttingDown.set(true);
checkShutdown();
}
});
break;
}
else {
@ -93,6 +105,14 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
}
}
private static void checkShutdown() {
if (contexts.get() == 0 && shuttingDown.get()) {
shutdownHook();
}
}
private static native void shutdownHook();
/**
* This is used to validate leaks on tests.
*
@ -140,6 +160,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
*/
public LibaioContext(int queueSize, boolean useSemaphore) {
try {
contexts.incrementAndGet();
this.ioContext = newContext(queueSize);
}
catch (Exception e) {
@ -170,6 +191,9 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
int size,
ByteBuffer bufferWrite,
Callback callback) throws IOException {
if (closed.get()) {
throw new IOException("Libaio Context is closed!");
}
try {
if (ioSpace != null) {
ioSpace.acquire();
@ -187,6 +211,9 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
int size,
ByteBuffer bufferWrite,
Callback callback) throws IOException {
if (closed.get()) {
throw new IOException("Libaio Context is closed!");
}
try {
if (ioSpace != null) {
ioSpace.acquire();
@ -208,11 +235,22 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
@Override
public void close() {
if (!closed.getAndSet(true)) {
if (ioSpace != null) {
try {
ioSpace.tryAcquire(queueSize, 10, TimeUnit.SECONDS);
}
catch (Exception e) {
NativeLogger.LOGGER.error(e);
}
}
totalMaxIO.addAndGet(-queueSize);
if (ioContext != null) {
deleteContext(ioContext);
}
contexts.decrementAndGet();
checkShutdown();
}
}
@ -308,17 +346,19 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
* {@link SubmitInfo#done()} are called.
*/
public void poll() {
blockedPoll(ioContext);
if (!closed.get()) {
blockedPoll(ioContext);
}
}
/**
* Called from the native layer
*/
private void done(SubmitInfo info) {
info.done();
if (ioSpace != null) {
ioSpace.release();
}
info.done();
}
/**

View File

@ -657,19 +657,6 @@ public class LibaioTest {
latch.await();
System.out.println("time = " + (end - start) + " writes/second=" + NUMBER_OF_BLOCKS * 1000L / (end - start));
//
// MultiThreadAsynchronousFileTest.debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
// MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS *
// MultiThreadAsynchronousFileTest.NUMBER_OF_LINES *
// 1000 /
// (endTime - startTime) +
// " total time = " +
// (endTime - startTime) +
// " total number of records = " +
// MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS *
// MultiThreadAsynchronousFileTest.NUMBER_OF_LINES);
Thread.sleep(100);
blockedContext.close();
t.join();

View File

@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jlibaio.test;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jlibaio.LibaioFile;
import org.apache.activemq.artemis.jlibaio.SubmitInfo;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class OpenCloseContextTest {
@BeforeClass
public static void testAIO() {
Assume.assumeTrue(LibaioContext.isLoaded());
}
@Rule
public TemporaryFolder folder;
public OpenCloseContextTest() {
folder = new TemporaryFolder(new File("./target"));
}
@Test
public void testRepeatOpenCloseContext() throws Exception {
ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512);
for (int i = 0; i < 512; i++) buffer.put((byte)'x');
for (int i = 0; i < 10; i++) {
System.out.println("#test " + i);
final LibaioContext control = new LibaioContext<>(5, true);
Thread t = new Thread() {
public void run() {
control.poll();
}
};
t.start();
LibaioFile file = control.openFile(folder.newFile(), true);
file.fill(4 * 1024);
final CountDownLatch insideMethod = new CountDownLatch(1);
final CountDownLatch awaitInside = new CountDownLatch(1);
file.write(0, 512, buffer, new SubmitInfo() {
@Override
public void onError(int errno, String message) {
}
@Override
public void done() {
insideMethod.countDown();
try {
awaitInside.await();
}
catch (Throwable e) {
e.printStackTrace();
}
System.out.println("done");
}
});
insideMethod.await();
file.write(512, 512, buffer, new SubmitInfo() {
@Override
public void onError(int errno, String message) {
}
@Override
public void done() {
}
});
awaitInside.countDown();
control.close();
t.join();
}
}
@Test
public void testRepeatOpenCloseContext2() throws Exception {
ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512);
for (int i = 0; i < 512; i++) buffer.put((byte)'x');
for (int i = 0; i < 10; i++) {
System.out.println("#test " + i);
final LibaioContext control = new LibaioContext<>(5, true);
Thread t = new Thread() {
public void run() {
control.poll();
}
};
t.start();
LibaioFile file = control.openFile(folder.newFile(), true);
file.fill(4 * 1024);
final CountDownLatch insideMethod = new CountDownLatch(1);
final CountDownLatch awaitInside = new CountDownLatch(1);
file.write(0, 512, buffer, new SubmitInfo() {
@Override
public void onError(int errno, String message) {
}
@Override
public void done() {
insideMethod.countDown();
try {
awaitInside.await(100, TimeUnit.MILLISECONDS);
}
catch (Throwable e) {
e.printStackTrace();
}
System.out.println("done");
}
});
insideMethod.await();
file.write(512, 512, buffer, new SubmitInfo() {
@Override
public void onError(int errno, String message) {
}
@Override
public void done() {
}
});
awaitInside.countDown();
control.close();
t.join();
}
}
@Test
public void testCloseAndStart() throws Exception {
final LibaioContext control2 = new LibaioContext<>(5, true);
final LibaioContext control = new LibaioContext<>(5, true);
control.close();
control.poll();
control2.close();
control2.poll();
System.out.println("Hello!!");
}
}