ARTEMIS-249 - Improving LibaioContext shutdown
This will be avoiding scenarios where you could crash the VM during the LibaioContext.close, if done outside of the proper order.
This commit is contained in:
parent
a391f327d2
commit
c6d045b330
|
@ -724,7 +724,7 @@ public class Create extends InputAbstract {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean supportsLibaio() {
|
public boolean supportsLibaio() {
|
||||||
if (forceLibaio) {
|
if (forceLibaio) {
|
||||||
// forcing libaio
|
// forcing libaio
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.cli.Artemis;
|
import org.apache.activemq.artemis.cli.Artemis;
|
||||||
import org.apache.activemq.artemis.cli.commands.Configurable;
|
import org.apache.activemq.artemis.cli.commands.Configurable;
|
||||||
|
import org.apache.activemq.artemis.cli.commands.Create;
|
||||||
import org.apache.activemq.artemis.cli.commands.Run;
|
import org.apache.activemq.artemis.cli.commands.Run;
|
||||||
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
|
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
||||||
|
@ -77,6 +78,13 @@ public class ArtemisTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSupportsLibaio() throws Exception {
|
||||||
|
Create x = new Create();
|
||||||
|
x.setInstance(new File("/tmp/foo"));
|
||||||
|
x.supportsLibaio();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSync() throws Exception {
|
public void testSync() throws Exception {
|
||||||
int writes = 20;
|
int writes = 20;
|
||||||
|
|
Binary file not shown.
Binary file not shown.
|
@ -32,6 +32,7 @@
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
#include <limits.h>
|
||||||
#include "org_apache_activemq_artemis_jlibaio_LibaioContext.h"
|
#include "org_apache_activemq_artemis_jlibaio_LibaioContext.h"
|
||||||
#include "exception_helper.h"
|
#include "exception_helper.h"
|
||||||
|
|
||||||
|
@ -53,8 +54,16 @@ struct io_control {
|
||||||
int iocbPut;
|
int iocbPut;
|
||||||
int iocbGet;
|
int iocbGet;
|
||||||
int used;
|
int used;
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// We need a fast and reliable way to stop the blocked poller
|
||||||
|
// for that we need a dumb file,
|
||||||
|
// We are using a temporary file for this.
|
||||||
|
int dumbWriteHandler = 0;
|
||||||
|
char dumbPath[PATH_MAX];
|
||||||
|
|
||||||
|
|
||||||
jclass submitClass = NULL;
|
jclass submitClass = NULL;
|
||||||
jmethodID errorMethod = NULL;
|
jmethodID errorMethod = NULL;
|
||||||
jmethodID doneMethod = NULL;
|
jmethodID doneMethod = NULL;
|
||||||
|
@ -112,6 +121,20 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
|
||||||
if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
|
if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
|
||||||
return JNI_ERR;
|
return JNI_ERR;
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
|
sprintf (dumbPath, "%s/artemisJLHandler_XXXXXX", P_tmpdir);
|
||||||
|
dumbWriteHandler = mkstemp (dumbPath);
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
fprintf (stdout, "Creating temp file %s for dumb writes\n", dumbPath);
|
||||||
|
fflush(stdout);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (dumbWriteHandler < 0) {
|
||||||
|
fprintf (stderr, "couldn't create stop file handler %s\n", dumbPath);
|
||||||
|
return JNI_ERR;
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// Accordingly to previous experiences we must hold Global Refs on Classes
|
// Accordingly to previous experiences we must hold Global Refs on Classes
|
||||||
// And
|
// And
|
||||||
|
@ -177,12 +200,25 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline void closeDumbHandlers() {
|
||||||
|
if (dumbWriteHandler != 0) {
|
||||||
|
#ifdef DEBUG
|
||||||
|
fprintf (stdout, "Closing and removing dump handler %s\n", dumbPath);
|
||||||
|
#endif
|
||||||
|
dumbWriteHandler = 0;
|
||||||
|
close(dumbWriteHandler);
|
||||||
|
unlink(dumbPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void JNI_OnUnload(JavaVM* vm, void* reserved) {
|
void JNI_OnUnload(JavaVM* vm, void* reserved) {
|
||||||
JNIEnv* env;
|
JNIEnv* env;
|
||||||
if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
|
if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
|
||||||
// Something is wrong but nothing we can do about this :(
|
// Something is wrong but nothing we can do about this :(
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
|
closeDumbHandlers();
|
||||||
|
|
||||||
// delete global references so the GC can collect them
|
// delete global references so the GC can collect them
|
||||||
if (runtimeExceptionClass != NULL) {
|
if (runtimeExceptionClass != NULL) {
|
||||||
(*env)->DeleteGlobalRef(env, runtimeExceptionClass);
|
(*env)->DeleteGlobalRef(env, runtimeExceptionClass);
|
||||||
|
@ -201,6 +237,12 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_shutdownHook
|
||||||
|
(JNIEnv * env, jclass clazz) {
|
||||||
|
closeDumbHandlers();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) {
|
static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) {
|
||||||
struct io_control * ioControl = (struct io_control *) (*env)->GetDirectBufferAddress(env, pointer);
|
struct io_control * ioControl = (struct io_control *) (*env)->GetDirectBufferAddress(env, pointer);
|
||||||
if (ioControl == NULL) {
|
if (ioControl == NULL) {
|
||||||
|
@ -252,6 +294,23 @@ static inline void putIOCB(struct io_control * control, struct iocb * iocbBack)
|
||||||
pthread_mutex_unlock(&(control->iocbLock));
|
pthread_mutex_unlock(&(control->iocbLock));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline short submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) {
|
||||||
|
int result = io_submit(theControl->ioContext, 1, &iocb);
|
||||||
|
|
||||||
|
if (result < 0) {
|
||||||
|
// Putting the Global Ref and IOCB back in case of a failure
|
||||||
|
if (iocb->data != NULL && iocb->data != (void *) -1) {
|
||||||
|
(*env)->DeleteGlobalRef(env, (jobject)iocb->data);
|
||||||
|
}
|
||||||
|
putIOCB(theControl, iocb);
|
||||||
|
|
||||||
|
throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
static inline void * getBuffer(JNIEnv* env, jobject pointer) {
|
static inline void * getBuffer(JNIEnv* env, jobject pointer) {
|
||||||
return (*env)->GetDirectBufferAddress(env, pointer);
|
return (*env)->GetDirectBufferAddress(env, pointer);
|
||||||
}
|
}
|
||||||
|
@ -342,12 +401,34 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_de
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
io_queue_release(theControl->ioContext);
|
struct iocb * iocb = getIOCB(theControl);
|
||||||
|
|
||||||
|
if (iocb == NULL) {
|
||||||
|
throwIOException(env, "Not enough space in libaio queue");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Submitting a dumb write so the loop finishes
|
||||||
|
io_prep_pwrite(iocb, dumbWriteHandler, 0, 0, 0);
|
||||||
|
iocb->data = (void *) -1;
|
||||||
|
if (!submit(env, theControl, iocb)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// to make sure the poll has finished
|
// to make sure the poll has finished
|
||||||
pthread_mutex_lock(&(theControl->pollLock));
|
pthread_mutex_lock(&(theControl->pollLock));
|
||||||
pthread_mutex_unlock(&(theControl->pollLock));
|
pthread_mutex_unlock(&(theControl->pollLock));
|
||||||
|
|
||||||
|
// To return any pending IOCBs
|
||||||
|
int result = io_getevents(theControl->ioContext, 0, 1, theControl->events, 0);
|
||||||
|
for (i = 0; i < result; i++) {
|
||||||
|
struct io_event * event = &(theControl->events[i]);
|
||||||
|
struct iocb * iocbp = event->obj;
|
||||||
|
putIOCB(theControl, iocbp);
|
||||||
|
}
|
||||||
|
|
||||||
|
io_queue_release(theControl->ioContext);
|
||||||
|
|
||||||
pthread_mutex_destroy(&(theControl->pollLock));
|
pthread_mutex_destroy(&(theControl->pollLock));
|
||||||
pthread_mutex_destroy(&(theControl->iocbLock));
|
pthread_mutex_destroy(&(theControl->iocbLock));
|
||||||
|
|
||||||
|
@ -389,20 +470,6 @@ JNIEXPORT int JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_ope
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) {
|
|
||||||
int result = io_submit(theControl->ioContext, 1, &iocb);
|
|
||||||
|
|
||||||
if (result < 0) {
|
|
||||||
// Putting the Global Ref and IOCB back in case of a failure
|
|
||||||
(*env)->DeleteGlobalRef(env, (jobject)iocb->data);
|
|
||||||
putIOCB(theControl, iocb);
|
|
||||||
|
|
||||||
throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitWrite
|
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitWrite
|
||||||
(JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferWrite, jobject callback) {
|
(JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferWrite, jobject callback) {
|
||||||
struct io_control * theControl = getIOControl(env, contextPointer);
|
struct io_control * theControl = getIOControl(env, contextPointer);
|
||||||
|
@ -429,7 +496,7 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_su
|
||||||
// also as the real intention is to hold the reference until the life cycle is complete
|
// also as the real intention is to hold the reference until the life cycle is complete
|
||||||
iocb->data = (void *) (*env)->NewGlobalRef(env, callback);
|
iocb->data = (void *) (*env)->NewGlobalRef(env, callback);
|
||||||
|
|
||||||
return submit(env, theControl, iocb);
|
submit(env, theControl, iocb);
|
||||||
}
|
}
|
||||||
|
|
||||||
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitRead
|
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitRead
|
||||||
|
@ -454,14 +521,15 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_su
|
||||||
// also as the real intention is to hold the reference until the life cycle is complete
|
// also as the real intention is to hold the reference until the life cycle is complete
|
||||||
iocb->data = (void *) (*env)->NewGlobalRef(env, callback);
|
iocb->data = (void *) (*env)->NewGlobalRef(env, callback);
|
||||||
|
|
||||||
return submit(env, theControl, iocb);
|
submit(env, theControl, iocb);
|
||||||
}
|
}
|
||||||
|
|
||||||
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll
|
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll
|
||||||
(JNIEnv * env, jobject thisObject, jobject contextPointer) {
|
(JNIEnv * env, jobject thisObject, jobject contextPointer) {
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
fprintf (stdout, "Running blockedPoll");
|
fprintf (stdout, "Running blockedPoll\n");
|
||||||
|
fflush(stdout);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int i;
|
int i;
|
||||||
|
@ -472,37 +540,53 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
|
||||||
int max = theControl->queueSize;
|
int max = theControl->queueSize;
|
||||||
pthread_mutex_lock(&(theControl->pollLock));
|
pthread_mutex_lock(&(theControl->pollLock));
|
||||||
|
|
||||||
for (;;) {
|
short running = 1;
|
||||||
|
|
||||||
|
while (running) {
|
||||||
|
|
||||||
int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0);
|
int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0);
|
||||||
|
|
||||||
if (result < 0)
|
if (result < 0)
|
||||||
{
|
{
|
||||||
#ifdef DEBUG
|
throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result);
|
||||||
fprintf (stdout, "finished blockedPoll rutine with result=%d\n", result);
|
|
||||||
#endif
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
fprintf (stdout, "blockedPoll returned %d events\n", result);
|
fprintf (stdout, "blockedPoll returned %d events\n", result);
|
||||||
|
fflush(stdout);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
for (i = 0; i < result; i++)
|
for (i = 0; i < result; i++)
|
||||||
{
|
{
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
fprintf (stdout, "blockedPoll treading event %d\n", i);
|
fprintf (stdout, "blockedPoll treating event %d\n", i);
|
||||||
|
fflush(stdout);
|
||||||
#endif
|
#endif
|
||||||
struct io_event * event = &(theControl->events[i]);
|
struct io_event * event = &(theControl->events[i]);
|
||||||
struct iocb * iocbp = event->obj;
|
struct iocb * iocbp = event->obj;
|
||||||
|
|
||||||
|
if (iocbp->aio_fildes == dumbWriteHandler) {
|
||||||
|
#ifdef DEBUG
|
||||||
|
fprintf (stdout, "Dumb write arrived, giving up the loop\n");
|
||||||
|
fflush(stdout);
|
||||||
|
#endif
|
||||||
|
putIOCB(theControl, iocbp);
|
||||||
|
running = 0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int eventResult = (int)event->res;
|
int eventResult = (int)event->res;
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result);
|
fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result);
|
||||||
|
fflush (stdout);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (eventResult < 0) {
|
if (eventResult < 0) {
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
fprintf (stdout, "Error: %s\n", strerror(-eventResult));
|
fprintf (stdout, "Error: %s\n", strerror(-eventResult));
|
||||||
|
fflush (stdout);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult));
|
jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult));
|
||||||
|
@ -554,14 +638,15 @@ JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_po
|
||||||
fprintf (stdout, "Error: %s\n", strerror(-eventResult));
|
fprintf (stdout, "Error: %s\n", strerror(-eventResult));
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
if (iocbp->data != NULL && iocbp->data != (void *) -1) {
|
||||||
jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult));
|
jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult));
|
||||||
|
|
||||||
(*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError);
|
(*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iocbp->data != NULL && iocbp->data != (void *) -1) {
|
||||||
(*env)->SetObjectArrayElement(env, callbacks, i, (jobject)iocbp->data);
|
(*env)->SetObjectArrayElement(env, callbacks, i, (jobject)iocbp->data);
|
||||||
|
|
||||||
if (iocbp->data != NULL) {
|
|
||||||
// We delete the globalRef after the completion of the callback
|
// We delete the globalRef after the completion of the callback
|
||||||
(*env)->DeleteGlobalRef(env, (jobject)iocbp->data);
|
(*env)->DeleteGlobalRef(env, (jobject)iocbp->data);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,9 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||||
|
@ -49,10 +51,14 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
||||||
* <br>
|
* <br>
|
||||||
* Or else the native module won't be loaded because of version mismatches
|
* Or else the native module won't be loaded because of version mismatches
|
||||||
*/
|
*/
|
||||||
private static final int EXPECTED_NATIVE_VERSION = 2;
|
private static final int EXPECTED_NATIVE_VERSION = 3;
|
||||||
|
|
||||||
private static boolean loaded = false;
|
private static boolean loaded = false;
|
||||||
|
|
||||||
|
private static final AtomicBoolean shuttingDown = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
private static final AtomicInteger contexts = new AtomicInteger(0);
|
||||||
|
|
||||||
public static boolean isLoaded() {
|
public static boolean isLoaded() {
|
||||||
return loaded;
|
return loaded;
|
||||||
}
|
}
|
||||||
|
@ -81,6 +87,12 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
||||||
for (String library : libraries) {
|
for (String library : libraries) {
|
||||||
if (loadLibrary(library)) {
|
if (loadLibrary(library)) {
|
||||||
loaded = true;
|
loaded = true;
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||||
|
public void run() {
|
||||||
|
shuttingDown.set(true);
|
||||||
|
checkShutdown();
|
||||||
|
}
|
||||||
|
});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -93,6 +105,14 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void checkShutdown() {
|
||||||
|
if (contexts.get() == 0 && shuttingDown.get()) {
|
||||||
|
shutdownHook();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static native void shutdownHook();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is used to validate leaks on tests.
|
* This is used to validate leaks on tests.
|
||||||
*
|
*
|
||||||
|
@ -140,6 +160,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
||||||
*/
|
*/
|
||||||
public LibaioContext(int queueSize, boolean useSemaphore) {
|
public LibaioContext(int queueSize, boolean useSemaphore) {
|
||||||
try {
|
try {
|
||||||
|
contexts.incrementAndGet();
|
||||||
this.ioContext = newContext(queueSize);
|
this.ioContext = newContext(queueSize);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
@ -170,6 +191,9 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
||||||
int size,
|
int size,
|
||||||
ByteBuffer bufferWrite,
|
ByteBuffer bufferWrite,
|
||||||
Callback callback) throws IOException {
|
Callback callback) throws IOException {
|
||||||
|
if (closed.get()) {
|
||||||
|
throw new IOException("Libaio Context is closed!");
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
if (ioSpace != null) {
|
if (ioSpace != null) {
|
||||||
ioSpace.acquire();
|
ioSpace.acquire();
|
||||||
|
@ -187,6 +211,9 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
||||||
int size,
|
int size,
|
||||||
ByteBuffer bufferWrite,
|
ByteBuffer bufferWrite,
|
||||||
Callback callback) throws IOException {
|
Callback callback) throws IOException {
|
||||||
|
if (closed.get()) {
|
||||||
|
throw new IOException("Libaio Context is closed!");
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
if (ioSpace != null) {
|
if (ioSpace != null) {
|
||||||
ioSpace.acquire();
|
ioSpace.acquire();
|
||||||
|
@ -208,11 +235,22 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
if (!closed.getAndSet(true)) {
|
if (!closed.getAndSet(true)) {
|
||||||
|
|
||||||
|
if (ioSpace != null) {
|
||||||
|
try {
|
||||||
|
ioSpace.tryAcquire(queueSize, 10, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
NativeLogger.LOGGER.error(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
totalMaxIO.addAndGet(-queueSize);
|
totalMaxIO.addAndGet(-queueSize);
|
||||||
|
|
||||||
if (ioContext != null) {
|
if (ioContext != null) {
|
||||||
deleteContext(ioContext);
|
deleteContext(ioContext);
|
||||||
}
|
}
|
||||||
|
contexts.decrementAndGet();
|
||||||
|
checkShutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,17 +346,19 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
||||||
* {@link SubmitInfo#done()} are called.
|
* {@link SubmitInfo#done()} are called.
|
||||||
*/
|
*/
|
||||||
public void poll() {
|
public void poll() {
|
||||||
|
if (!closed.get()) {
|
||||||
blockedPoll(ioContext);
|
blockedPoll(ioContext);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called from the native layer
|
* Called from the native layer
|
||||||
*/
|
*/
|
||||||
private void done(SubmitInfo info) {
|
private void done(SubmitInfo info) {
|
||||||
|
info.done();
|
||||||
if (ioSpace != null) {
|
if (ioSpace != null) {
|
||||||
ioSpace.release();
|
ioSpace.release();
|
||||||
}
|
}
|
||||||
info.done();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -657,19 +657,6 @@ public class LibaioTest {
|
||||||
latch.await();
|
latch.await();
|
||||||
|
|
||||||
System.out.println("time = " + (end - start) + " writes/second=" + NUMBER_OF_BLOCKS * 1000L / (end - start));
|
System.out.println("time = " + (end - start) + " writes/second=" + NUMBER_OF_BLOCKS * 1000L / (end - start));
|
||||||
//
|
|
||||||
// MultiThreadAsynchronousFileTest.debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
|
|
||||||
// MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS *
|
|
||||||
// MultiThreadAsynchronousFileTest.NUMBER_OF_LINES *
|
|
||||||
// 1000 /
|
|
||||||
// (endTime - startTime) +
|
|
||||||
// " total time = " +
|
|
||||||
// (endTime - startTime) +
|
|
||||||
// " total number of records = " +
|
|
||||||
// MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS *
|
|
||||||
// MultiThreadAsynchronousFileTest.NUMBER_OF_LINES);
|
|
||||||
|
|
||||||
Thread.sleep(100);
|
|
||||||
|
|
||||||
blockedContext.close();
|
blockedContext.close();
|
||||||
t.join();
|
t.join();
|
||||||
|
|
|
@ -0,0 +1,181 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.jlibaio.test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||||
|
import org.apache.activemq.artemis.jlibaio.LibaioFile;
|
||||||
|
import org.apache.activemq.artemis.jlibaio.SubmitInfo;
|
||||||
|
import org.junit.Assume;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
public class OpenCloseContextTest {
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void testAIO() {
|
||||||
|
Assume.assumeTrue(LibaioContext.isLoaded());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TemporaryFolder folder;
|
||||||
|
|
||||||
|
public OpenCloseContextTest() {
|
||||||
|
folder = new TemporaryFolder(new File("./target"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRepeatOpenCloseContext() throws Exception {
|
||||||
|
ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512);
|
||||||
|
for (int i = 0; i < 512; i++) buffer.put((byte)'x');
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
System.out.println("#test " + i);
|
||||||
|
final LibaioContext control = new LibaioContext<>(5, true);
|
||||||
|
Thread t = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
control.poll();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
t.start();
|
||||||
|
LibaioFile file = control.openFile(folder.newFile(), true);
|
||||||
|
file.fill(4 * 1024);
|
||||||
|
final CountDownLatch insideMethod = new CountDownLatch(1);
|
||||||
|
final CountDownLatch awaitInside = new CountDownLatch(1);
|
||||||
|
file.write(0, 512, buffer, new SubmitInfo() {
|
||||||
|
@Override
|
||||||
|
public void onError(int errno, String message) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void done() {
|
||||||
|
insideMethod.countDown();
|
||||||
|
try {
|
||||||
|
awaitInside.await();
|
||||||
|
}
|
||||||
|
catch (Throwable e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
System.out.println("done");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
insideMethod.await();
|
||||||
|
|
||||||
|
file.write(512, 512, buffer, new SubmitInfo() {
|
||||||
|
@Override
|
||||||
|
public void onError(int errno, String message) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void done() {
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
awaitInside.countDown();
|
||||||
|
control.close();
|
||||||
|
|
||||||
|
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRepeatOpenCloseContext2() throws Exception {
|
||||||
|
ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512);
|
||||||
|
for (int i = 0; i < 512; i++) buffer.put((byte)'x');
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
System.out.println("#test " + i);
|
||||||
|
final LibaioContext control = new LibaioContext<>(5, true);
|
||||||
|
Thread t = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
control.poll();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
t.start();
|
||||||
|
LibaioFile file = control.openFile(folder.newFile(), true);
|
||||||
|
file.fill(4 * 1024);
|
||||||
|
final CountDownLatch insideMethod = new CountDownLatch(1);
|
||||||
|
final CountDownLatch awaitInside = new CountDownLatch(1);
|
||||||
|
file.write(0, 512, buffer, new SubmitInfo() {
|
||||||
|
@Override
|
||||||
|
public void onError(int errno, String message) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void done() {
|
||||||
|
insideMethod.countDown();
|
||||||
|
try {
|
||||||
|
awaitInside.await(100, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
catch (Throwable e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
System.out.println("done");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
insideMethod.await();
|
||||||
|
|
||||||
|
file.write(512, 512, buffer, new SubmitInfo() {
|
||||||
|
@Override
|
||||||
|
public void onError(int errno, String message) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void done() {
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
awaitInside.countDown();
|
||||||
|
|
||||||
|
control.close();
|
||||||
|
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCloseAndStart() throws Exception {
|
||||||
|
final LibaioContext control2 = new LibaioContext<>(5, true);
|
||||||
|
|
||||||
|
final LibaioContext control = new LibaioContext<>(5, true);
|
||||||
|
control.close();
|
||||||
|
control.poll();
|
||||||
|
|
||||||
|
|
||||||
|
control2.close();
|
||||||
|
control2.poll();
|
||||||
|
|
||||||
|
System.out.println("Hello!!");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue