ARTEMIS-3701 Do no block libaio on compacting or closing

I am adding a test showing it is safe to not wait pending callbacks before closing a file.
With this I can just close the file and let the kernel to deal with sending the completions.
This commit is contained in:
Clebert Suconic 2022-03-04 10:41:45 -05:00 committed by clebertsuconic
parent e0ca92d783
commit e949e3843b
8 changed files with 154 additions and 98 deletions

View File

@ -31,14 +31,6 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
public interface SequentialFile {
default boolean isPending() {
return false;
}
default void waitNotPending() {
return;
}
boolean isOpen();
boolean exists();

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.PriorityQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -33,8 +32,6 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.AutomaticLatch;
import org.apache.activemq.artemis.utils.Waiter;
import org.jboss.logging.Logger;
/** This class is implementing Runnable to reuse a callback to close it. */
@ -44,14 +41,10 @@ public class AIOSequentialFile extends AbstractSequentialFile {
private boolean opened = false;
private volatile boolean pendingClose = false;
private LibaioFile aioFile;
private final AIOSequentialFileFactory aioFactory;
private final AutomaticLatch pendingCallbacks = new AutomaticLatch();
/**
* Used to determine the next writing sequence
*/
@ -106,49 +99,9 @@ public class AIOSequentialFile extends AbstractSequentialFile {
close(true, true);
}
private void actualClose() {
try {
aioFile.close();
} catch (Throwable e) {
// an exception here would means a double
logger.debug("Exeption while closing file - " + e.getMessage(), e);
} finally {
aioFile = null;
pendingClose = false;
aioFactory.afterClose();
}
}
@Override
public boolean isPending() {
return pendingClose;
}
@Override
public void waitNotPending() {
try {
for (short retryPending = 0; pendingClose && retryPending < 60; retryPending++) {
if (pendingCallbacks.await(1, TimeUnit.SECONDS)) {
break;
}
}
if (pendingClose) {
if (!Waiter.waitFor(() -> !pendingClose, TimeUnit.SECONDS, 60, TimeUnit.NANOSECONDS, 1000)) {
AIOSequentialFileFactory.threadDump("File " + getFileName() + " still has pending IO before closing it");
}
}
} catch (InterruptedException e) {
// nothing to be done here, other than log it and forward it
logger.warn(e.getMessage(), e);
Thread.currentThread().interrupt();
}
}
@Override
public synchronized void close(boolean waitSync, boolean blockOnWait) throws IOException, InterruptedException, ActiveMQException {
// a double call on close, should result on it waitingNotPending before another close is called
waitNotPending();
if (!opened) {
return;
}
@ -157,16 +110,16 @@ public class AIOSequentialFile extends AbstractSequentialFile {
super.close();
opened = false;
pendingClose = true;
this.timedBuffer = null;
if (waitSync) {
pendingCallbacks.afterCompletion(this::actualClose);
if (blockOnWait) {
pendingCallbacks.await();
}
} else {
actualClose();
try {
aioFile.close();
} catch (Throwable e) {
// an exception here would means a double
logger.debug("Exeption while closing file - " + e.getMessage(), e);
} finally {
aioFile = null;
aioFactory.afterClose();
}
}
@ -190,7 +143,6 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override
public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException {
// in case we are opening a file that was just closed, we need to wait previous executions to be done
waitNotPending();
if (opened) {
return;
}
@ -322,13 +274,10 @@ public class AIOSequentialFile extends AbstractSequentialFile {
boolean releaseBuffer) {
AIOSequentialFileFactory.AIOSequentialCallback callback = aioFactory.getCallback();
callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer, releaseBuffer);
pendingCallbacks.countUp();
return callback;
}
void done(AIOSequentialFileFactory.AIOSequentialCallback callback) {
pendingCallbacks.countDown();
if (callback.writeSequence == -1) {
callback.sequentialDone();
}
@ -373,7 +322,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override
public String toString() {
return "AIOSequentialFile{" + getFileName() + ", opened=" + opened + ", pendingClose=" + pendingClose + ", pendingCallbacks=" + pendingCallbacks + '}';
return "AIOSequentialFile{" + getFileName() + ", opened=" + opened + '}';
}
private void checkOpened() {

View File

@ -23,7 +23,6 @@ import java.lang.management.ThreadInfo;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.util.internal.PlatformDependent;
@ -41,7 +40,6 @@ import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
import org.jctools.queues.MpmcArrayQueue;
@ -64,8 +62,6 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
}
}
private final ReusableLatch pendingClose = new ReusableLatch(0);
private final ReuseBuffersController buffersControl = new ReuseBuffersController();
private volatile boolean reuseBuffers = true;
@ -81,11 +77,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
private static final String AIO_TEST_FILE = ".aio-test";
public void beforeClose() {
pendingClose.countUp();
}
public void afterClose() {
pendingClose.countDown();
}
public AIOSequentialFileFactory(final File journalDir, int maxIO) {
@ -305,15 +299,6 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
if (this.running.compareAndSet(true, false)) {
buffersControl.stop();
try {
// if we stop libaioContext before we finish this, we will never get confirmation on items previously sent
if (!pendingClose.await(1, TimeUnit.MINUTES)) {
threadDump("Timeout on waiting for asynchronous close");
}
} catch (Throwable throwableToLog) {
logger.warn(throwableToLog.getMessage(), throwableToLog);
}
libaioContext.close();
libaioContext = null;

View File

@ -115,7 +115,7 @@ public class JournalFileImpl implements JournalFile {
@Override
public boolean isCanReclaim() {
return reclaimable && posReclaimCriteria && negReclaimCriteria && !file.isPending();
return reclaimable && posReclaimCriteria && negReclaimCriteria;
}
@Override

View File

@ -148,17 +148,8 @@ public class JournalFilesRepository {
}
public void clear() throws Exception {
for (JournalFile file : dataFiles) {
file.getFile().waitNotPending();
}
dataFiles.clear();
for (JournalFile file : freeFiles) {
file.getFile().waitNotPending();
}
freeFiles.clear();
freeFilesCount.set(0);

View File

@ -2050,10 +2050,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
processBackup();
for (JournalFile file : dataFilesToProcess) {
file.getFile().waitNotPending();
}
return dataFilesToProcess;
}

View File

@ -84,7 +84,6 @@ public class FileIOUtilTest {
file.close(true, false);
}
} finally {
file.waitNotPending();
factory.stop();
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.journal;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
public class AsyncOpenCloseTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(AsyncOpenCloseTest.class);
@Test
public void testCloseOnSubmit() throws Exception {
Assume.assumeTrue(LibaioContext.isLoaded());
AtomicInteger errors = new AtomicInteger(0);
SequentialFileFactory factory = new AIOSequentialFileFactory(temporaryFolder.getRoot(), (Throwable error, String message, SequentialFile file) -> errors.incrementAndGet(), 4 * 1024);
factory.start();
SequentialFile file = factory.createSequentialFile("fileAIO.bin");
file.open(1024, true);
final int WRITES = 100;
final int RECORD_SIZE = 4 * 1024;
final int OPEN_TIMES = 25;
file.fill(WRITES * RECORD_SIZE);
ByteBuffer buffer = factory.newBuffer(RECORD_SIZE);
ActiveMQBuffer buffer2 = ActiveMQBuffers.wrappedBuffer(buffer);
try {
file.close(true, false);
AtomicInteger submit = new AtomicInteger(0);
ReusableLatch valve = new ReusableLatch(0);
byte writtenByte = (byte) 'a';
for (int nclose = 0; nclose < OPEN_TIMES; nclose++) {
logger.debug("************************************************** test " + nclose);
writtenByte++;
if (writtenByte >= (byte) 'z') {
writtenByte = (byte) 'a';
}
buffer2.setIndex(0, 0);
for (int s = 0; s < RECORD_SIZE; s++) {
buffer2.writeByte(writtenByte);
}
file.open(1024, true);
CyclicBarrier blocked = new CyclicBarrier(2);
for (int i = 0; i < WRITES; i++) {
if (i == 10) {
valve.countUp();
}
file.position(i * RECORD_SIZE);
submit.incrementAndGet();
buffer2.setIndex(0, RECORD_SIZE);
file.write(buffer2, true, new IOCallback() {
@Override
public void done() {
try {
if (!valve.await(1, TimeUnit.MILLISECONDS)) {
logger.debug("blocking");
blocked.await();
valve.await(10, TimeUnit.SECONDS);
logger.debug("unblocking");
}
} catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
}
submit.decrementAndGet();
}
@Override
public void onError(int errorCode, String errorMessage) {
errors.incrementAndGet();
}
});
}
blocked.await();
logger.debug("Closing");
file.close(false, false);
// even though the callback is blocked, the content of the file should already be good as written
validateFile(file, (byte) writtenByte);
valve.countDown();
Wait.assertEquals(0, submit::get, 5000, 10);
}
Wait.assertEquals(0, submit::get);
} finally {
factory.releaseBuffer(buffer);
factory.stop();
}
Assert.assertEquals(0, errors.get());
}
private void validateFile(SequentialFile file, byte writtenByte) throws IOException {
FileInputStream fileInputStream = new FileInputStream(file.getJavaFile());
byte[] wholeFile = fileInputStream.readAllBytes();
for (int i = 0; i < wholeFile.length; i++) {
Assert.assertEquals(writtenByte, (byte) wholeFile[i]);
}
fileInputStream.close();
}
}