ARTEMIS-4041 Improve critical IO reporting

This commit is contained in:
Clebert Suconic 2022-10-11 14:09:04 -04:00 committed by clebertsuconic
parent b604545a3c
commit 49d33470f9
19 changed files with 62 additions and 52 deletions

View File

@ -152,7 +152,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
}
@Override
public void onIOError(Exception exception, String message, SequentialFile file) {
public void onIOError(Throwable exception, String message, String file) {
criticalErrorListener.onIOException(exception, message, file);
}
@ -236,7 +236,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
try {
file.sync();
} catch (Exception e) {
criticalErrorListener.onIOException(e, "Error during JDBC file sync.", file);
criticalErrorListener.onIOException(e, "Error during JDBC file sync.", file.getFileName());
}
}
}

View File

@ -99,7 +99,7 @@ public class JDBCSequentialFileFactoryTest {
String jdbcDatasourceClass = ActiveMQDefaultConfiguration.getDefaultDataSourceClassName();
factory = new JDBCSequentialFileFactory(new JDBCConnectionProvider(JDBCDataSourceUtils.getDataSource(jdbcDatasourceClass, dataSourceProperties)), JDBCUtils.getSQLProvider(dataSourceProperties, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
public void onIOException(Throwable code, String message, String file) {
}
});
factory.start();

View File

@ -126,7 +126,7 @@ public abstract class AbstractSequentialFile implements SequentialFile {
} catch (ClosedChannelException e) {
throw e;
} catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this.file.getName());
throw e;
}
}

View File

@ -180,8 +180,13 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
}
@Override
public void onIOError(Exception exception, String message, SequentialFile file) {
ActiveMQJournalLogger.LOGGER.criticalIO(message, exception);
public void onIOError(Throwable exception, String message, String file) {
if (file != null) {
ActiveMQJournalLogger.LOGGER.criticalIOFile(message, file, exception);
} else {
ActiveMQJournalLogger.LOGGER.criticalIO(message, exception);
}
if (critialErrorListener != null) {
critialErrorListener.onIOException(exception, message, file);
}
@ -222,7 +227,7 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
boolean ok = journalDir.mkdirs();
if (!ok && !journalDir.exists()) {
IOException e = new IOException("Unable to create directory: " + journalDir);
onIOError(e, e.getMessage(), null);
onIOError(e, e.getMessage());
throw e;
}
}

View File

@ -21,5 +21,5 @@ package org.apache.activemq.artemis.core.io;
*/
public interface IOCriticalErrorListener {
void onIOException(Throwable code, String message, SequentialFile file);
void onIOException(Throwable code, String message, String file);
}

View File

@ -55,7 +55,15 @@ public interface SequentialFileFactory {
/**
* The SequentialFile will call this method when a disk IO Error happens during the live phase.
*/
void onIOError(Exception exception, String message, SequentialFile file);
void onIOError(Throwable exception, String message, String file);
default void onIOError(Throwable exception, String message, SequentialFile file) {
onIOError(exception, message, file.getFileName());
}
default void onIOError(Throwable exception, String message) {
onIOError(exception, message, (String) null);
}
/**
* used for cases where you need direct buffer outside of the journal context.

View File

@ -418,7 +418,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
if (callback != null) {
callback.onError(errorCode, errorMessage);
}
onIOError(new ActiveMQException(errorCode, errorMessage), errorMessage, null);
onIOError(new ActiveMQException(errorCode, errorMessage), errorMessage);
errorMessage = null;
} else {
if (callback != null) {
@ -447,7 +447,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
libaioContext.poll();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
onIOError(new ActiveMQException("Error on libaio poll"), e.getMessage(), null);
onIOError(new ActiveMQException("Error on libaio poll"), e.getMessage());
}
}
}

View File

@ -168,7 +168,7 @@ final class MappedSequentialFile implements SequentialFile {
callback.done();
} catch (IOException e) {
if (this.criticalErrorListener != null) {
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this.getFileName());
}
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
throw e;
@ -204,7 +204,7 @@ final class MappedSequentialFile implements SequentialFile {
callback.done();
} catch (IOException e) {
if (this.criticalErrorListener != null) {
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this.getFileName());
}
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
throw e;
@ -241,7 +241,7 @@ final class MappedSequentialFile implements SequentialFile {
callback.done();
} catch (IOException e) {
if (this.criticalErrorListener != null) {
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this.getFileName());
}
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
throw new RuntimeException(e);
@ -315,7 +315,7 @@ final class MappedSequentialFile implements SequentialFile {
}
} catch (IOException e) {
if (this.criticalErrorListener != null) {
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this.getFileName());
}
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
throw e;

View File

@ -102,7 +102,7 @@ public class JournalFilesRepository {
pushOpenedFile();
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.errorPushingFile(e);
fileFactory.onIOError(e, "unable to open ", null);
fileFactory.onIOError(e, "unable to open ");
}
}
};
@ -495,7 +495,7 @@ public class JournalFilesRepository {
try {
nextFile = takeFile(true, true, true, false);
} catch (Exception e) {
fileFactory.onIOError(e, "unable to open ", null);
fileFactory.onIOError(e, "unable to open ");
// We need to reconnect the current file with the timed buffer as we were not able to roll the file forward
// If you don't do this you will get a NPE in TimedBuffer::checkSize where it uses the bufferobserver
fileFactory.activateBuffer(journal.getCurrentFile().getFile());

View File

@ -1888,11 +1888,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
} catch (Throwable e) {
try {
criticalIO(e);
} catch (Throwable ignored) {
logger.warn(ignored.getMessage(), ignored);
}
fileFactory.onIOError(e, e.getMessage());
return;
} finally {
journalLock.writeLock().unlock();
@ -1909,11 +1905,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
logger.debug("Finished compacting on journal {}", this);
} catch (Throwable e) {
try {
criticalIO(e);
} catch (Throwable ignored) {
logger.warn(ignored.getMessage(), ignored);
}
fileFactory.onIOError(e, e.getMessage());
}
} finally {
compactorLock.writeLock().unlock();
@ -2478,11 +2470,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
try {
Files.copy(copyFrom.toPath(), copyTo.toPath(), StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
try {
criticalIO(e);
} catch (Exception ignored) {
}
fileFactory.onIOError(e, e.getMessage(), copyFrom.getName());
}
try {
@ -2490,7 +2478,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (criticalErrorListener != null) {
criticalErrorListener.onIOException(e, e.getMessage(), fileToCopy.getFile());
criticalErrorListener.onIOException(e, e.getMessage(), fileToCopy.getFile().getFileName());
}
}
@ -3507,21 +3495,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// The same check needs to be done at the new file also
if (!currentFile.getFile().fits(size)) {
// Sanity check, this should never happen
throw new IllegalStateException("Invalid logic on buffer allocation");
// The exception will be thrown by criticalIO
Exception reportingException = ActiveMQJournalBundle.BUNDLE.unexpectedFileSize(currentFile.getFile().getFileName(), size, currentFile.getFile().size());
fileFactory.onIOError(reportingException, reportingException.getMessage());
return null;
}
}
return currentFile;
} catch (Throwable e) {
criticalIO(e);
criticalIO(e, null);
return null; // this will never happen, the method will call throw
}
}
private void criticalIO(Throwable e) throws Exception {
if (criticalErrorListener != null) {
criticalErrorListener.onIOException(e, e.getMessage(), currentFile == null ? null : currentFile.getFile());
}
private void criticalIO(Throwable e, SequentialFile file) throws Exception {
fileFactory.onIOError(e, e.getMessage(), file);
if (e instanceof Exception) {
throw (Exception) e;
} else if (e instanceof IllegalStateException) {

View File

@ -46,4 +46,7 @@ public interface ActiveMQJournalBundle {
@Message(id = 149005, value = "Message of {} bytes is bigger than the max record size of {} bytes. You should try to move large application properties to the message body.")
ActiveMQIOErrorException recordLargerThanStoreMax(long recordSize, long maxRecordSize);
@Message(id = 149006, value = "The file system returned a file {} with unexpected file size. The broker requested a file sized as {} but the system returned a file sized as {}")
ActiveMQIOErrorException unexpectedFileSize(String fileName, long expectedSize, long returnedSize);
}

View File

@ -198,6 +198,9 @@ public interface ActiveMQJournalLogger {
void cantOpenFileTimeout(long timeout);
@LogMessage(id = 144010, value = "Critical IO Exception happened: {}", level = LogMessage.Level.WARN)
void criticalIO(String message, Exception error);
void criticalIO(String message, Throwable error);
// same as criticalIO but with the FileName associated (if there's a file available)
@LogMessage(id = 144011, value = "Critical IO Exception happened: {} on {}", level = LogMessage.Level.WARN)
void criticalIOFile(String message, String fileName, Throwable error);
}

View File

@ -53,7 +53,7 @@ public class FileIOUtilTest {
Assume.assumeTrue(LibaioContext.isLoaded());
AtomicInteger errors = new AtomicInteger(0);
SequentialFileFactory factory = new AIOSequentialFileFactory(temporaryFolder.getRoot(), (Throwable error, String message, SequentialFile file) -> errors.incrementAndGet(), 4 * 1024);
SequentialFileFactory factory = new AIOSequentialFileFactory(temporaryFolder.getRoot(), (Throwable error, String message, String file) -> errors.incrementAndGet(), 4 * 1024);
factory.start();
SequentialFile file = factory.createSequentialFile("fileAIO.bin");

View File

@ -93,7 +93,7 @@ public class NullStorageManager implements StorageManager {
public NullStorageManager() {
this(new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
public void onIOException(Throwable code, String message, String file) {
code.printStackTrace();
}
});

View File

@ -78,7 +78,6 @@ import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
@ -4230,7 +4229,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private final AtomicBoolean failedAlready = new AtomicBoolean();
@Override
public synchronized void onIOException(Throwable cause, String message, SequentialFile file) {
public synchronized void onIOException(Throwable cause, String message, String file) {
if (!failedAlready.compareAndSet(false, true)) {
return;
}
@ -4238,7 +4237,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (file == null) {
ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause);
} else {
ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file, cause);
}
stopTheServer(true);

View File

@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
@ -128,7 +127,7 @@ public class JDBCJournalTest extends ActiveMQTestBase {
executorService = Executors.newSingleThreadExecutor();
journal = new JDBCJournalImpl(dbConf.getConnectionProvider(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
public void onIOException(Throwable code, String message, String file) {
}
}, 5);

View File

@ -50,7 +50,7 @@ public class AsyncOpenCloseTest extends ActiveMQTestBase {
Assume.assumeTrue(LibaioContext.isLoaded());
AtomicInteger errors = new AtomicInteger(0);
SequentialFileFactory factory = new AIOSequentialFileFactory(temporaryFolder.getRoot(), (Throwable error, String message, SequentialFile file) -> errors.incrementAndGet(), 4 * 1024);
SequentialFileFactory factory = new AIOSequentialFileFactory(temporaryFolder.getRoot(), (Throwable error, String message, String file) -> errors.incrementAndGet(), 4 * 1024);
factory.start();
SequentialFile file = factory.createSequentialFile("fileAIO.bin");

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.journal;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
@ -29,9 +30,13 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
protected SequentialFileFactory createFactory(String folder) {
return new NIOSequentialFileFactory(new File(folder), true, 1);
@ -60,8 +65,8 @@ public class NIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase
final AtomicInteger calls = new AtomicInteger(0);
final NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getTestDir()), new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
new Exception("shutdown").printStackTrace();
public void onIOException(Throwable code, String message, String file) {
logger.debug("IOException happening", code);
calls.incrementAndGet();
}
}, 1);

View File

@ -662,7 +662,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
}
@Override
public void onIOError(Exception exception, String message, SequentialFile file) {
public void onIOError(Throwable exception, String message, String file) {
}
@Override