diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index b823ca5d18..be95bf918b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -63,11 +63,11 @@ public class NIOSequentialFile extends AbstractSequentialFile { */ private static final int CHUNK_SIZE = 2 * 1024 * 1024; - private FileChannel channel; + protected volatile FileChannel channel; - private RandomAccessFile rfile; + protected volatile RandomAccessFile rfile; - private final int maxIO; + protected final int maxIO; public NIOSequentialFile(final SequentialFileFactory factory, final File directory, @@ -317,18 +317,27 @@ public class NIOSequentialFile extends AbstractSequentialFile { @Override public void sync() throws IOException { - if (factory.isDatasync() && channel != null) { + FileChannel channel1 = channel; + if (factory.isDatasync() && channel1 != null && channel1.isOpen()) { try { - channel.force(false); - } catch (ClosedChannelException e) { - throw e; + syncChannel(channel1); } catch (IOException e) { - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - throw e; + if (e instanceof ClosedChannelException) { + // ClosedChannelException here means the file was closed after TimedBuffer issued a sync + // we are performing the sync away from locks to ensure scalability and this is an expected cost + logger.debug("ClosedChannelException for file {}", file, e); + } else { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); + throw e; + } } } } + protected void syncChannel(FileChannel syncChannel) throws IOException { + syncChannel.force(false); + } + @Override public long size() throws IOException { if (channel == null) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java index cb42f67dc2..e7054416e8 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java @@ -19,7 +19,9 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -31,8 +33,11 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.core.io.DummyCallback; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFile; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.Env; @@ -211,9 +216,76 @@ public class TimedBufferTest extends ActiveMQTestBase { } finally { timedBuffer.stop(); } + } + @Test + public void testSyncOnNIOClosed() throws Exception { + TimedBuffer timedBuffer = new TimedBuffer(null, 100, 1, false); + timedBuffer.start(); + runAfterEx(timedBuffer::stop); + NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1) { + @Override + public SequentialFile createSequentialFile(final String fileName) { + return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor) { + @Override + protected void syncChannel(FileChannel channel) throws IOException { + try { + close(true, true); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + super.syncChannel(channel); + } + }; + } + }; + assertTrue(factory.isDatasync()); + AtomicInteger errors = new AtomicInteger(0); + + int x = 0; + + byte[] bytes = new byte[10]; + for (int j = 0; j < 10; j++) { + bytes[j] = ActiveMQTestBase.getSamplebyte(x++); + } + + ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes); + + ReusableLatch done = new ReusableLatch(1); + IOCallback callback = new IOCallback() { + @Override + public void done() { + done.countDown(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + logger.warn("error= {} / {}", errorCode, errorMessage); + errors.incrementAndGet(); + } + }; + + try { + for (int i = 0; i < 100; i++) { + if (i % 10 == 0) { + logger.info("i {}", i); + } + done.setCount(1); + SequentialFile closeOnSyncFile = factory.createSequentialFile("test.txt", 1); + closeOnSyncFile.open(100, false); + closeOnSyncFile.position(0); + closeOnSyncFile.setTimedBuffer(timedBuffer); + timedBuffer.addBytes(buff, true, callback); + assertTrue(done.await(1, TimeUnit.MINUTES)); + assertEquals(0, errors.get()); + closeOnSyncFile.close(true, true); + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + throw e; + } } private static void spinSleep(long timeout) {