ARTEMIS-3084 Avoiding Race condition on async close
This commit is contained in:
parent
e5a5ce1218
commit
873c2bcc18
|
@ -134,7 +134,9 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
@Override
|
||||
public void waitNotPending() {
|
||||
try {
|
||||
do {
|
||||
pendingCallbacks.await();
|
||||
} while(pendingClose);
|
||||
} catch (InterruptedException e) {
|
||||
// nothing to be done here, other than log it and forward it
|
||||
logger.warn(e.getMessage(), e);
|
||||
|
@ -184,6 +186,8 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
@Override
|
||||
public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException {
|
||||
// in case we are opening a file that was just closed, we need to wait previous executions to be done
|
||||
waitNotPending();
|
||||
if (opened) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -19,12 +19,16 @@ package org.apache.activemq.artemis.core.io.aio;
|
|||
|
||||
import java.io.File;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.util.FileIOUtil;
|
||||
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
|
||||
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
@ -40,6 +44,55 @@ public class FileIOUtilTest {
|
|||
temporaryFolder = new TemporaryFolder(parent);
|
||||
}
|
||||
|
||||
/** Since the introduction of asynchronous close on AsyncIO journal
|
||||
there was a situation that if you open a file while it was pending to close
|
||||
you could have many issues with file not open, NPEs
|
||||
this is to capture and fix that race
|
||||
*/
|
||||
@Test
|
||||
public void testOpenClose() throws Exception {
|
||||
Assume.assumeTrue(LibaioContext.isLoaded());
|
||||
AtomicInteger errors = new AtomicInteger(0);
|
||||
|
||||
SequentialFileFactory factory = new AIOSequentialFileFactory(temporaryFolder.getRoot(), (Throwable error, String message, SequentialFile file) -> errors.incrementAndGet(), 4 * 1024);
|
||||
factory.start();
|
||||
|
||||
SequentialFile file = factory.createSequentialFile("fileAIO.bin");
|
||||
file.open(1024, true);
|
||||
|
||||
final int WRITES = 10;
|
||||
final int RECORD_SIZE = 4 * 1024;
|
||||
final int OPEN_TIMES = 250;
|
||||
|
||||
|
||||
file.fill(WRITES * RECORD_SIZE);
|
||||
|
||||
|
||||
try {
|
||||
|
||||
file.close(true, false);
|
||||
|
||||
for (int nclose = 0; nclose < OPEN_TIMES; nclose++) {
|
||||
file.open(1024, true);
|
||||
for (int i = 0; i < WRITES; i++) {
|
||||
ByteBuffer buffer = factory.newBuffer(RECORD_SIZE);
|
||||
for (int s = 0; s < RECORD_SIZE; s++) {
|
||||
buffer.put((byte) 'j');
|
||||
}
|
||||
file.position(i * RECORD_SIZE);
|
||||
file.writeDirect(buffer, false);
|
||||
}
|
||||
file.close(true, false);
|
||||
}
|
||||
} finally {
|
||||
file.waitNotPending();
|
||||
factory.stop();
|
||||
}
|
||||
|
||||
Assert.assertEquals(0, errors.get());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCopy() throws Exception {
|
||||
SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
|
||||
|
|
Loading…
Reference in New Issue