ARTEMIS-4949 TimedBuffer.checkSync could fail on ClosedChannelException

This commit is contained in:
Clebert Suconic 2024-07-22 17:45:51 -04:00 committed by clebertsuconic
parent 10adca5479
commit 7c573db6a1
2 changed files with 90 additions and 9 deletions

View File

@ -63,11 +63,11 @@ public class NIOSequentialFile extends AbstractSequentialFile {
*/
private static final int CHUNK_SIZE = 2 * 1024 * 1024;
private FileChannel channel;
protected volatile FileChannel channel;
private RandomAccessFile rfile;
protected volatile RandomAccessFile rfile;
private final int maxIO;
protected final int maxIO;
public NIOSequentialFile(final SequentialFileFactory factory,
final File directory,
@ -317,18 +317,27 @@ public class NIOSequentialFile extends AbstractSequentialFile {
@Override
public void sync() throws IOException {
if (factory.isDatasync() && channel != null) {
FileChannel channel1 = channel;
if (factory.isDatasync() && channel1 != null && channel1.isOpen()) {
try {
channel.force(false);
} catch (ClosedChannelException e) {
throw e;
syncChannel(channel1);
} catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
if (e instanceof ClosedChannelException) {
// ClosedChannelException here means the file was closed after TimedBuffer issued a sync
// we are performing the sync away from locks to ensure scalability and this is an expected cost
logger.debug("ClosedChannelException for file {}", file, e);
} else {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
}
}
}
}
protected void syncChannel(FileChannel syncChannel) throws IOException {
syncChannel.force(false);
}
@Override
public long size() throws IOException {
if (channel == null) {

View File

@ -19,7 +19,9 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -31,8 +33,11 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFile;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Env;
@ -211,9 +216,76 @@ public class TimedBufferTest extends ActiveMQTestBase {
} finally {
timedBuffer.stop();
}
}
@Test
public void testSyncOnNIOClosed() throws Exception {
TimedBuffer timedBuffer = new TimedBuffer(null, 100, 1, false);
timedBuffer.start();
runAfterEx(timedBuffer::stop);
NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1) {
@Override
public SequentialFile createSequentialFile(final String fileName) {
return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor) {
@Override
protected void syncChannel(FileChannel channel) throws IOException {
try {
close(true, true);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
super.syncChannel(channel);
}
};
}
};
assertTrue(factory.isDatasync());
AtomicInteger errors = new AtomicInteger(0);
int x = 0;
byte[] bytes = new byte[10];
for (int j = 0; j < 10; j++) {
bytes[j] = ActiveMQTestBase.getSamplebyte(x++);
}
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes);
ReusableLatch done = new ReusableLatch(1);
IOCallback callback = new IOCallback() {
@Override
public void done() {
done.countDown();
}
@Override
public void onError(int errorCode, String errorMessage) {
logger.warn("error= {} / {}", errorCode, errorMessage);
errors.incrementAndGet();
}
};
try {
for (int i = 0; i < 100; i++) {
if (i % 10 == 0) {
logger.info("i {}", i);
}
done.setCount(1);
SequentialFile closeOnSyncFile = factory.createSequentialFile("test.txt", 1);
closeOnSyncFile.open(100, false);
closeOnSyncFile.position(0);
closeOnSyncFile.setTimedBuffer(timedBuffer);
timedBuffer.addBytes(buff, true, callback);
assertTrue(done.await(1, TimeUnit.MINUTES));
assertEquals(0, errors.get());
closeOnSyncFile.close(true, true);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
throw e;
}
}
private static void spinSleep(long timeout) {