diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index 0c558f8010..1de78d96f2 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -724,7 +724,7 @@ public class Create extends InputAbstract { } } - private boolean supportsLibaio() { + public boolean supportsLibaio() { if (forceLibaio) { // forcing libaio return true; diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java index df2ec38e27..c0817907a7 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.cli.Artemis; import org.apache.activemq.artemis.cli.commands.Configurable; +import org.apache.activemq.artemis.cli.commands.Create; import org.apache.activemq.artemis.cli.commands.Run; import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; @@ -77,6 +78,13 @@ public class ArtemisTest { } } + @Test + public void testSupportsLibaio() throws Exception { + Create x = new Create(); + x.setInstance(new File("/tmp/foo")); + x.supportsLibaio(); + } + @Test public void testSync() throws Exception { int writes = 20; diff --git a/artemis-native/bin/libartemis-native-32.so b/artemis-native/bin/libartemis-native-32.so index 15c3d6644a..540da4b73c 100755 Binary files a/artemis-native/bin/libartemis-native-32.so and b/artemis-native/bin/libartemis-native-32.so differ diff --git a/artemis-native/bin/libartemis-native-64.so b/artemis-native/bin/libartemis-native-64.so index 5c7ddac319..33b9b1c7b3 100755 Binary files a/artemis-native/bin/libartemis-native-64.so and b/artemis-native/bin/libartemis-native-64.so differ diff --git a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c index 3beb1bff79..1e08041c5d 100644 --- a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c +++ b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c @@ -32,6 +32,7 @@ #include #include #include +#include #include "org_apache_activemq_artemis_jlibaio_LibaioContext.h" #include "exception_helper.h" @@ -53,8 +54,16 @@ struct io_control { int iocbPut; int iocbGet; int used; + }; +// We need a fast and reliable way to stop the blocked poller +// for that we need a dumb file, +// We are using a temporary file for this. +int dumbWriteHandler = 0; +char dumbPath[PATH_MAX]; + + jclass submitClass = NULL; jmethodID errorMethod = NULL; jmethodID doneMethod = NULL; @@ -112,6 +121,20 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) { return JNI_ERR; } else { + + sprintf (dumbPath, "%s/artemisJLHandler_XXXXXX", P_tmpdir); + dumbWriteHandler = mkstemp (dumbPath); + + #ifdef DEBUG + fprintf (stdout, "Creating temp file %s for dumb writes\n", dumbPath); + fflush(stdout); + #endif + + if (dumbWriteHandler < 0) { + fprintf (stderr, "couldn't create stop file handler %s\n", dumbPath); + return JNI_ERR; + } + // // Accordingly to previous experiences we must hold Global Refs on Classes // And @@ -177,12 +200,25 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { } } +inline void closeDumbHandlers() { + if (dumbWriteHandler != 0) { + #ifdef DEBUG + fprintf (stdout, "Closing and removing dump handler %s\n", dumbPath); + #endif + dumbWriteHandler = 0; + close(dumbWriteHandler); + unlink(dumbPath); + } +} + void JNI_OnUnload(JavaVM* vm, void* reserved) { JNIEnv* env; if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) { // Something is wrong but nothing we can do about this :( return; } else { + closeDumbHandlers(); + // delete global references so the GC can collect them if (runtimeExceptionClass != NULL) { (*env)->DeleteGlobalRef(env, runtimeExceptionClass); @@ -201,6 +237,12 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { } } +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_shutdownHook + (JNIEnv * env, jclass clazz) { + closeDumbHandlers(); +} + + static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) { struct io_control * ioControl = (struct io_control *) (*env)->GetDirectBufferAddress(env, pointer); if (ioControl == NULL) { @@ -252,6 +294,23 @@ static inline void putIOCB(struct io_control * control, struct iocb * iocbBack) pthread_mutex_unlock(&(control->iocbLock)); } +static inline short submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) { + int result = io_submit(theControl->ioContext, 1, &iocb); + + if (result < 0) { + // Putting the Global Ref and IOCB back in case of a failure + if (iocb->data != NULL && iocb->data != (void *) -1) { + (*env)->DeleteGlobalRef(env, (jobject)iocb->data); + } + putIOCB(theControl, iocb); + + throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result); + return 0; + } + + return 1; +} + static inline void * getBuffer(JNIEnv* env, jobject pointer) { return (*env)->GetDirectBufferAddress(env, pointer); } @@ -342,12 +401,34 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_de return; } - io_queue_release(theControl->ioContext); + struct iocb * iocb = getIOCB(theControl); + + if (iocb == NULL) { + throwIOException(env, "Not enough space in libaio queue"); + return; + } + + // Submitting a dumb write so the loop finishes + io_prep_pwrite(iocb, dumbWriteHandler, 0, 0, 0); + iocb->data = (void *) -1; + if (!submit(env, theControl, iocb)) { + return; + } // to make sure the poll has finished pthread_mutex_lock(&(theControl->pollLock)); pthread_mutex_unlock(&(theControl->pollLock)); + // To return any pending IOCBs + int result = io_getevents(theControl->ioContext, 0, 1, theControl->events, 0); + for (i = 0; i < result; i++) { + struct io_event * event = &(theControl->events[i]); + struct iocb * iocbp = event->obj; + putIOCB(theControl, iocbp); + } + + io_queue_release(theControl->ioContext); + pthread_mutex_destroy(&(theControl->pollLock)); pthread_mutex_destroy(&(theControl->iocbLock)); @@ -389,20 +470,6 @@ JNIEXPORT int JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_ope return res; } -static inline void submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) { - int result = io_submit(theControl->ioContext, 1, &iocb); - - if (result < 0) { - // Putting the Global Ref and IOCB back in case of a failure - (*env)->DeleteGlobalRef(env, (jobject)iocb->data); - putIOCB(theControl, iocb); - - throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result); - } - - return; -} - JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitWrite (JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferWrite, jobject callback) { struct io_control * theControl = getIOControl(env, contextPointer); @@ -429,7 +496,7 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_su // also as the real intention is to hold the reference until the life cycle is complete iocb->data = (void *) (*env)->NewGlobalRef(env, callback); - return submit(env, theControl, iocb); + submit(env, theControl, iocb); } JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitRead @@ -454,14 +521,15 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_su // also as the real intention is to hold the reference until the life cycle is complete iocb->data = (void *) (*env)->NewGlobalRef(env, callback); - return submit(env, theControl, iocb); + submit(env, theControl, iocb); } JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll (JNIEnv * env, jobject thisObject, jobject contextPointer) { #ifdef DEBUG - fprintf (stdout, "Running blockedPoll"); + fprintf (stdout, "Running blockedPoll\n"); + fflush(stdout); #endif int i; @@ -472,37 +540,53 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl int max = theControl->queueSize; pthread_mutex_lock(&(theControl->pollLock)); - for (;;) { + short running = 1; + + while (running) { int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0); if (result < 0) { - #ifdef DEBUG - fprintf (stdout, "finished blockedPoll rutine with result=%d\n", result); - #endif + throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result); break; } #ifdef DEBUG fprintf (stdout, "blockedPoll returned %d events\n", result); + fflush(stdout); #endif for (i = 0; i < result; i++) { #ifdef DEBUG - fprintf (stdout, "blockedPoll treading event %d\n", i); + fprintf (stdout, "blockedPoll treating event %d\n", i); + fflush(stdout); #endif struct io_event * event = &(theControl->events[i]); struct iocb * iocbp = event->obj; + + if (iocbp->aio_fildes == dumbWriteHandler) { + #ifdef DEBUG + fprintf (stdout, "Dumb write arrived, giving up the loop\n"); + fflush(stdout); + #endif + putIOCB(theControl, iocbp); + running = 0; + break; + } + + int eventResult = (int)event->res; #ifdef DEBUG fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result); + fflush (stdout); #endif if (eventResult < 0) { #ifdef DEBUG fprintf (stdout, "Error: %s\n", strerror(-eventResult)); + fflush (stdout); #endif jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult)); @@ -554,14 +638,15 @@ JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_po fprintf (stdout, "Error: %s\n", strerror(-eventResult)); #endif - jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult)); + if (iocbp->data != NULL && iocbp->data != (void *) -1) { + jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult)); - (*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError); + (*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError); + } } - (*env)->SetObjectArrayElement(env, callbacks, i, (jobject)iocbp->data); - - if (iocbp->data != NULL) { + if (iocbp->data != NULL && iocbp->data != (void *) -1) { + (*env)->SetObjectArrayElement(env, callbacks, i, (jobject)iocbp->data); // We delete the globalRef after the completion of the callback (*env)->DeleteGlobalRef(env, (jobject)iocbp->data); } diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java index 62bd98ffb4..9e86047cc8 100644 --- a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java +++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java @@ -21,7 +21,9 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import static io.netty.util.internal.ObjectUtil.checkNotNull; @@ -49,10 +51,14 @@ public class LibaioContext implements Closeable { *
* Or else the native module won't be loaded because of version mismatches */ - private static final int EXPECTED_NATIVE_VERSION = 2; + private static final int EXPECTED_NATIVE_VERSION = 3; private static boolean loaded = false; + private static final AtomicBoolean shuttingDown = new AtomicBoolean(false); + + private static final AtomicInteger contexts = new AtomicInteger(0); + public static boolean isLoaded() { return loaded; } @@ -81,6 +87,12 @@ public class LibaioContext implements Closeable { for (String library : libraries) { if (loadLibrary(library)) { loaded = true; + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + shuttingDown.set(true); + checkShutdown(); + } + }); break; } else { @@ -93,6 +105,14 @@ public class LibaioContext implements Closeable { } } + private static void checkShutdown() { + if (contexts.get() == 0 && shuttingDown.get()) { + shutdownHook(); + } + } + + private static native void shutdownHook(); + /** * This is used to validate leaks on tests. * @@ -140,6 +160,7 @@ public class LibaioContext implements Closeable { */ public LibaioContext(int queueSize, boolean useSemaphore) { try { + contexts.incrementAndGet(); this.ioContext = newContext(queueSize); } catch (Exception e) { @@ -170,6 +191,9 @@ public class LibaioContext implements Closeable { int size, ByteBuffer bufferWrite, Callback callback) throws IOException { + if (closed.get()) { + throw new IOException("Libaio Context is closed!"); + } try { if (ioSpace != null) { ioSpace.acquire(); @@ -187,6 +211,9 @@ public class LibaioContext implements Closeable { int size, ByteBuffer bufferWrite, Callback callback) throws IOException { + if (closed.get()) { + throw new IOException("Libaio Context is closed!"); + } try { if (ioSpace != null) { ioSpace.acquire(); @@ -208,11 +235,22 @@ public class LibaioContext implements Closeable { @Override public void close() { if (!closed.getAndSet(true)) { + + if (ioSpace != null) { + try { + ioSpace.tryAcquire(queueSize, 10, TimeUnit.SECONDS); + } + catch (Exception e) { + NativeLogger.LOGGER.error(e); + } + } totalMaxIO.addAndGet(-queueSize); if (ioContext != null) { deleteContext(ioContext); } + contexts.decrementAndGet(); + checkShutdown(); } } @@ -308,17 +346,19 @@ public class LibaioContext implements Closeable { * {@link SubmitInfo#done()} are called. */ public void poll() { - blockedPoll(ioContext); + if (!closed.get()) { + blockedPoll(ioContext); + } } /** * Called from the native layer */ private void done(SubmitInfo info) { + info.done(); if (ioSpace != null) { ioSpace.release(); } - info.done(); } /** diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java index c93670ad85..f9a1aeec02 100644 --- a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java +++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java @@ -657,19 +657,6 @@ public class LibaioTest { latch.await(); System.out.println("time = " + (end - start) + " writes/second=" + NUMBER_OF_BLOCKS * 1000L / (end - start)); - // - // MultiThreadAsynchronousFileTest.debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " + - // MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS * - // MultiThreadAsynchronousFileTest.NUMBER_OF_LINES * - // 1000 / - // (endTime - startTime) + - // " total time = " + - // (endTime - startTime) + - // " total number of records = " + - // MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS * - // MultiThreadAsynchronousFileTest.NUMBER_OF_LINES); - - Thread.sleep(100); blockedContext.close(); t.join(); diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java new file mode 100644 index 0000000000..432c05f014 --- /dev/null +++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.jlibaio.test; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.jlibaio.LibaioContext; +import org.apache.activemq.artemis.jlibaio.LibaioFile; +import org.apache.activemq.artemis.jlibaio.SubmitInfo; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class OpenCloseContextTest { + + @BeforeClass + public static void testAIO() { + Assume.assumeTrue(LibaioContext.isLoaded()); + } + + @Rule + public TemporaryFolder folder; + + public OpenCloseContextTest() { + folder = new TemporaryFolder(new File("./target")); + } + + @Test + public void testRepeatOpenCloseContext() throws Exception { + ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512); + for (int i = 0; i < 512; i++) buffer.put((byte)'x'); + + for (int i = 0; i < 10; i++) { + System.out.println("#test " + i); + final LibaioContext control = new LibaioContext<>(5, true); + Thread t = new Thread() { + public void run() { + control.poll(); + } + }; + t.start(); + LibaioFile file = control.openFile(folder.newFile(), true); + file.fill(4 * 1024); + final CountDownLatch insideMethod = new CountDownLatch(1); + final CountDownLatch awaitInside = new CountDownLatch(1); + file.write(0, 512, buffer, new SubmitInfo() { + @Override + public void onError(int errno, String message) { + + } + + @Override + public void done() { + insideMethod.countDown(); + try { + awaitInside.await(); + } + catch (Throwable e) { + e.printStackTrace(); + } + System.out.println("done"); + } + }); + + insideMethod.await(); + + file.write(512, 512, buffer, new SubmitInfo() { + @Override + public void onError(int errno, String message) { + } + + @Override + public void done() { + } + }); + + awaitInside.countDown(); + control.close(); + + + t.join(); + } + + + } + + @Test + public void testRepeatOpenCloseContext2() throws Exception { + ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512); + for (int i = 0; i < 512; i++) buffer.put((byte)'x'); + + for (int i = 0; i < 10; i++) { + System.out.println("#test " + i); + final LibaioContext control = new LibaioContext<>(5, true); + Thread t = new Thread() { + public void run() { + control.poll(); + } + }; + t.start(); + LibaioFile file = control.openFile(folder.newFile(), true); + file.fill(4 * 1024); + final CountDownLatch insideMethod = new CountDownLatch(1); + final CountDownLatch awaitInside = new CountDownLatch(1); + file.write(0, 512, buffer, new SubmitInfo() { + @Override + public void onError(int errno, String message) { + + } + + @Override + public void done() { + insideMethod.countDown(); + try { + awaitInside.await(100, TimeUnit.MILLISECONDS); + } + catch (Throwable e) { + e.printStackTrace(); + } + System.out.println("done"); + } + }); + + insideMethod.await(); + + file.write(512, 512, buffer, new SubmitInfo() { + @Override + public void onError(int errno, String message) { + } + + @Override + public void done() { + } + }); + + awaitInside.countDown(); + + control.close(); + + t.join(); + } + + + } + + @Test + public void testCloseAndStart() throws Exception { + final LibaioContext control2 = new LibaioContext<>(5, true); + + final LibaioContext control = new LibaioContext<>(5, true); + control.close(); + control.poll(); + + + control2.close(); + control2.poll(); + + System.out.println("Hello!!"); + } + +}