From e949e3843bac074b7808285e1885fab7f3af630c Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 4 Mar 2022 10:41:45 -0500 Subject: [PATCH] ARTEMIS-3701 Do no block libaio on compacting or closing I am adding a test showing it is safe to not wait pending callbacks before closing a file. With this I can just close the file and let the kernel to deal with sending the completions. --- .../artemis/core/io/SequentialFile.java | 8 - .../core/io/aio/AIOSequentialFile.java | 69 ++------- .../core/io/aio/AIOSequentialFileFactory.java | 15 -- .../core/journal/impl/JournalFileImpl.java | 2 +- .../journal/impl/JournalFilesRepository.java | 9 -- .../core/journal/impl/JournalImpl.java | 4 - .../artemis/core/io/aio/FileIOUtilTest.java | 1 - .../journal/AsyncOpenCloseTest.java | 144 ++++++++++++++++++ 8 files changed, 154 insertions(+), 98 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AsyncOpenCloseTest.java diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java index 328ffe688e..a285d84f84 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java @@ -31,14 +31,6 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport; public interface SequentialFile { - default boolean isPending() { - return false; - } - - default void waitNotPending() { - return; - } - boolean isOpen(); boolean exists(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index 026e2172db..5e46e3d73d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.PriorityQueue; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -33,8 +32,6 @@ import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; -import org.apache.activemq.artemis.utils.AutomaticLatch; -import org.apache.activemq.artemis.utils.Waiter; import org.jboss.logging.Logger; /** This class is implementing Runnable to reuse a callback to close it. */ @@ -44,14 +41,10 @@ public class AIOSequentialFile extends AbstractSequentialFile { private boolean opened = false; - private volatile boolean pendingClose = false; - private LibaioFile aioFile; private final AIOSequentialFileFactory aioFactory; - private final AutomaticLatch pendingCallbacks = new AutomaticLatch(); - /** * Used to determine the next writing sequence */ @@ -106,49 +99,9 @@ public class AIOSequentialFile extends AbstractSequentialFile { close(true, true); } - private void actualClose() { - try { - aioFile.close(); - } catch (Throwable e) { - // an exception here would means a double - logger.debug("Exeption while closing file - " + e.getMessage(), e); - } finally { - aioFile = null; - pendingClose = false; - aioFactory.afterClose(); - } - } - - @Override - public boolean isPending() { - return pendingClose; - } - - @Override - public void waitNotPending() { - try { - for (short retryPending = 0; pendingClose && retryPending < 60; retryPending++) { - if (pendingCallbacks.await(1, TimeUnit.SECONDS)) { - break; - } - } - if (pendingClose) { - if (!Waiter.waitFor(() -> !pendingClose, TimeUnit.SECONDS, 60, TimeUnit.NANOSECONDS, 1000)) { - AIOSequentialFileFactory.threadDump("File " + getFileName() + " still has pending IO before closing it"); - } - } - } catch (InterruptedException e) { - // nothing to be done here, other than log it and forward it - logger.warn(e.getMessage(), e); - Thread.currentThread().interrupt(); - } - } - @Override public synchronized void close(boolean waitSync, boolean blockOnWait) throws IOException, InterruptedException, ActiveMQException { // a double call on close, should result on it waitingNotPending before another close is called - waitNotPending(); - if (!opened) { return; } @@ -157,16 +110,16 @@ public class AIOSequentialFile extends AbstractSequentialFile { super.close(); opened = false; - pendingClose = true; this.timedBuffer = null; - if (waitSync) { - pendingCallbacks.afterCompletion(this::actualClose); - if (blockOnWait) { - pendingCallbacks.await(); - } - } else { - actualClose(); + try { + aioFile.close(); + } catch (Throwable e) { + // an exception here would means a double + logger.debug("Exeption while closing file - " + e.getMessage(), e); + } finally { + aioFile = null; + aioFactory.afterClose(); } } @@ -190,7 +143,6 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException { // in case we are opening a file that was just closed, we need to wait previous executions to be done - waitNotPending(); if (opened) { return; } @@ -322,13 +274,10 @@ public class AIOSequentialFile extends AbstractSequentialFile { boolean releaseBuffer) { AIOSequentialFileFactory.AIOSequentialCallback callback = aioFactory.getCallback(); callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer, releaseBuffer); - pendingCallbacks.countUp(); return callback; } void done(AIOSequentialFileFactory.AIOSequentialCallback callback) { - pendingCallbacks.countDown(); - if (callback.writeSequence == -1) { callback.sequentialDone(); } @@ -373,7 +322,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public String toString() { - return "AIOSequentialFile{" + getFileName() + ", opened=" + opened + ", pendingClose=" + pendingClose + ", pendingCallbacks=" + pendingCallbacks + '}'; + return "AIOSequentialFile{" + getFileName() + ", opened=" + opened + '}'; } private void checkOpened() { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index 0050b163d3..1676242f5d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -23,7 +23,6 @@ import java.lang.management.ThreadInfo; import java.nio.ByteBuffer; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.util.internal.PlatformDependent; @@ -41,7 +40,6 @@ import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile; import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.PowerOf2Util; -import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.jboss.logging.Logger; import org.jctools.queues.MpmcArrayQueue; @@ -64,8 +62,6 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor } } - private final ReusableLatch pendingClose = new ReusableLatch(0); - private final ReuseBuffersController buffersControl = new ReuseBuffersController(); private volatile boolean reuseBuffers = true; @@ -81,11 +77,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor private static final String AIO_TEST_FILE = ".aio-test"; public void beforeClose() { - pendingClose.countUp(); } public void afterClose() { - pendingClose.countDown(); } public AIOSequentialFileFactory(final File journalDir, int maxIO) { @@ -305,15 +299,6 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor if (this.running.compareAndSet(true, false)) { buffersControl.stop(); - try { - // if we stop libaioContext before we finish this, we will never get confirmation on items previously sent - if (!pendingClose.await(1, TimeUnit.MINUTES)) { - threadDump("Timeout on waiting for asynchronous close"); - } - } catch (Throwable throwableToLog) { - logger.warn(throwableToLog.getMessage(), throwableToLog); - } - libaioContext.close(); libaioContext = null; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java index e1055d613e..4b406be090 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java @@ -115,7 +115,7 @@ public class JournalFileImpl implements JournalFile { @Override public boolean isCanReclaim() { - return reclaimable && posReclaimCriteria && negReclaimCriteria && !file.isPending(); + return reclaimable && posReclaimCriteria && negReclaimCriteria; } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java index 2d951c4a53..0c715735a7 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java @@ -148,17 +148,8 @@ public class JournalFilesRepository { } public void clear() throws Exception { - for (JournalFile file : dataFiles) { - file.getFile().waitNotPending(); - } - dataFiles.clear(); - - for (JournalFile file : freeFiles) { - file.getFile().waitNotPending(); - } - freeFiles.clear(); freeFilesCount.set(0); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 0d0a64e76b..5ba981a3ba 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -2050,10 +2050,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } processBackup(); - - for (JournalFile file : dataFilesToProcess) { - file.getFile().waitNotPending(); - } return dataFilesToProcess; } diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java index ad783464bf..99f74c7d02 100644 --- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java @@ -84,7 +84,6 @@ public class FileIOUtilTest { file.close(true, false); } } finally { - file.waitNotPending(); factory.stop(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AsyncOpenCloseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AsyncOpenCloseTest.java new file mode 100644 index 0000000000..4aed037518 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AsyncOpenCloseTest.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.journal; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.ReusableLatch; +import org.jboss.logging.Logger; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +public class AsyncOpenCloseTest extends ActiveMQTestBase { + + private static final Logger logger = Logger.getLogger(AsyncOpenCloseTest.class); + + @Test + public void testCloseOnSubmit() throws Exception { + Assume.assumeTrue(LibaioContext.isLoaded()); + AtomicInteger errors = new AtomicInteger(0); + + SequentialFileFactory factory = new AIOSequentialFileFactory(temporaryFolder.getRoot(), (Throwable error, String message, SequentialFile file) -> errors.incrementAndGet(), 4 * 1024); + factory.start(); + + SequentialFile file = factory.createSequentialFile("fileAIO.bin"); + file.open(1024, true); + + final int WRITES = 100; + final int RECORD_SIZE = 4 * 1024; + final int OPEN_TIMES = 25; + + file.fill(WRITES * RECORD_SIZE); + + ByteBuffer buffer = factory.newBuffer(RECORD_SIZE); + ActiveMQBuffer buffer2 = ActiveMQBuffers.wrappedBuffer(buffer); + + try { + + file.close(true, false); + AtomicInteger submit = new AtomicInteger(0); + + ReusableLatch valve = new ReusableLatch(0); + + byte writtenByte = (byte) 'a'; + for (int nclose = 0; nclose < OPEN_TIMES; nclose++) { + logger.debug("************************************************** test " + nclose); + writtenByte++; + if (writtenByte >= (byte) 'z') { + writtenByte = (byte) 'a'; + } + buffer2.setIndex(0, 0); + for (int s = 0; s < RECORD_SIZE; s++) { + buffer2.writeByte(writtenByte); + } + file.open(1024, true); + CyclicBarrier blocked = new CyclicBarrier(2); + for (int i = 0; i < WRITES; i++) { + if (i == 10) { + valve.countUp(); + } + file.position(i * RECORD_SIZE); + submit.incrementAndGet(); + buffer2.setIndex(0, RECORD_SIZE); + file.write(buffer2, true, new IOCallback() { + @Override + public void done() { + try { + if (!valve.await(1, TimeUnit.MILLISECONDS)) { + logger.debug("blocking"); + blocked.await(); + valve.await(10, TimeUnit.SECONDS); + logger.debug("unblocking"); + } + } catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + submit.decrementAndGet(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + errors.incrementAndGet(); + } + }); + } + blocked.await(); + logger.debug("Closing"); + file.close(false, false); + // even though the callback is blocked, the content of the file should already be good as written + validateFile(file, (byte) writtenByte); + valve.countDown(); + Wait.assertEquals(0, submit::get, 5000, 10); + + } + Wait.assertEquals(0, submit::get); + } finally { + factory.releaseBuffer(buffer); + factory.stop(); + } + + Assert.assertEquals(0, errors.get()); + + } + + private void validateFile(SequentialFile file, byte writtenByte) throws IOException { + FileInputStream fileInputStream = new FileInputStream(file.getJavaFile()); + byte[] wholeFile = fileInputStream.readAllBytes(); + for (int i = 0; i < wholeFile.length; i++) { + Assert.assertEquals(writtenByte, (byte) wholeFile[i]); + } + fileInputStream.close(); + } + +}