diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index 8e355874be..688ae08eb0 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -134,7 +134,9 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public void waitNotPending() { try { - pendingCallbacks.await(); + do { + pendingCallbacks.await(); + } while(pendingClose); } catch (InterruptedException e) { // nothing to be done here, other than log it and forward it logger.warn(e.getMessage(), e); @@ -184,6 +186,8 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException { + // in case we are opening a file that was just closed, we need to wait previous executions to be done + waitNotPending(); if (opened) { return; } diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java index 981d119787..4bc4f2b66b 100644 --- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java @@ -19,12 +19,16 @@ package org.apache.activemq.artemis.core.io.aio; import java.io.File; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.util.FileIOUtil; +import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext; +import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile; import org.junit.Assert; +import org.junit.Assume; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -40,6 +44,55 @@ public class FileIOUtilTest { temporaryFolder = new TemporaryFolder(parent); } + /** Since the introduction of asynchronous close on AsyncIO journal + there was a situation that if you open a file while it was pending to close + you could have many issues with file not open, NPEs + this is to capture and fix that race + */ + @Test + public void testOpenClose() throws Exception { + Assume.assumeTrue(LibaioContext.isLoaded()); + AtomicInteger errors = new AtomicInteger(0); + + SequentialFileFactory factory = new AIOSequentialFileFactory(temporaryFolder.getRoot(), (Throwable error, String message, SequentialFile file) -> errors.incrementAndGet(), 4 * 1024); + factory.start(); + + SequentialFile file = factory.createSequentialFile("fileAIO.bin"); + file.open(1024, true); + + final int WRITES = 10; + final int RECORD_SIZE = 4 * 1024; + final int OPEN_TIMES = 250; + + + file.fill(WRITES * RECORD_SIZE); + + + try { + + file.close(true, false); + + for (int nclose = 0; nclose < OPEN_TIMES; nclose++) { + file.open(1024, true); + for (int i = 0; i < WRITES; i++) { + ByteBuffer buffer = factory.newBuffer(RECORD_SIZE); + for (int s = 0; s < RECORD_SIZE; s++) { + buffer.put((byte) 'j'); + } + file.position(i * RECORD_SIZE); + file.writeDirect(buffer, false); + } + file.close(true, false); + } + } finally { + file.waitNotPending(); + factory.stop(); + } + + Assert.assertEquals(0, errors.get()); + + } + @Test public void testCopy() throws Exception { SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);