ARTEMIS-1418 AIO Shutdown on IOError and logging
(cherry picked from commit 520a40b1a1
)
This commit is contained in:
parent
f51023506e
commit
ee4692d5ca
|
@ -35,9 +35,12 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
|||
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public abstract class AbstractSequentialFile implements SequentialFile {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(AbstractSequentialFile.class);
|
||||
|
||||
private File file;
|
||||
|
||||
protected final File directory;
|
||||
|
@ -267,6 +270,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {
|
|||
|
||||
@Override
|
||||
public void onError(final int errorCode, final String errorMessage) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("onError" + " code: " + errorCode + " message: " + errorMessage);
|
||||
}
|
||||
|
||||
final int size = delegates.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
try {
|
||||
|
|
|
@ -33,12 +33,15 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
|||
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories
|
||||
*/
|
||||
public abstract class AbstractSequentialFileFactory implements SequentialFileFactory {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(AbstractSequentialFileFactory.class);
|
||||
|
||||
// Timeout used to wait executors to shutdown
|
||||
protected static final int EXECUTOR_TIMEOUT = 60;
|
||||
|
||||
|
@ -161,6 +164,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
|
|||
public void onIOError(Exception exception, String message, SequentialFile file) {
|
||||
if (critialErrorListener != null) {
|
||||
critialErrorListener.onIOException(exception, message, file);
|
||||
} else {
|
||||
logger.warn("Critical IO Error Called. No Critical IO Error Handler Registered");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,9 +34,12 @@ import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
|
|||
import org.apache.activemq.artemis.jlibaio.LibaioFile;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class AIOSequentialFile extends AbstractSequentialFile {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(AIOSequentialFileFactory.class);
|
||||
|
||||
private boolean opened = false;
|
||||
|
||||
private LibaioFile aioFile;
|
||||
|
@ -114,6 +117,10 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
@Override
|
||||
public synchronized void fill(final int size) throws Exception {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Filling file: " + getFileName());
|
||||
}
|
||||
|
||||
checkOpened();
|
||||
aioFile.fill(size);
|
||||
|
||||
|
@ -129,9 +136,14 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException {
|
||||
opened = true;
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Opening file: " + getFileName());
|
||||
}
|
||||
|
||||
try {
|
||||
aioFile = aioFactory.libaioContext.openFile(getFile(), factory.isDatasync());
|
||||
} catch (IOException e) {
|
||||
logger.error("Error opening file: " + getFileName());
|
||||
factory.onIOError(e, e.getMessage(), this);
|
||||
throw new ActiveMQNativeIOError(e.getMessage(), e);
|
||||
}
|
||||
|
@ -156,6 +168,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
// Sending it through the callback would make it released
|
||||
aioFile.read(positionToRead, bytesToRead, bytes, getCallback(callback, null));
|
||||
} catch (IOException e) {
|
||||
logger.error("IOError reading file: " + getFileName(), e);
|
||||
factory.onIOError(e, e.getMessage(), this);
|
||||
throw new ActiveMQNativeIOError(e.getMessage(), e);
|
||||
}
|
||||
|
@ -176,6 +189,10 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
@Override
|
||||
public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Write Direct, Sync: " + sync + " File: " + getFileName());
|
||||
}
|
||||
|
||||
if (sync) {
|
||||
SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
|
||||
|
||||
|
|
|
@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.activemq.artemis.ArtemisConstants;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
|
@ -77,6 +79,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
final IOCriticalErrorListener listener) {
|
||||
super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener);
|
||||
callbackPool = new CallbackCache<>(maxIO);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("New AIO File Created");
|
||||
}
|
||||
}
|
||||
|
||||
public AIOSequentialCallback getCallback() {
|
||||
|
@ -304,7 +309,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
try {
|
||||
libaioFile.write(position, bytes, buffer, this);
|
||||
} catch (IOException e) {
|
||||
callback.onError(-1, e.getMessage());
|
||||
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
|
||||
onIOError(e, "Failed to write to file", sequentialFile);
|
||||
}
|
||||
}
|
||||
|
@ -337,6 +342,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
|
||||
@Override
|
||||
public void onError(int errno, String message) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.trace("AIO on error issued. Error(code: " + errno + " msg: " + message + ")");
|
||||
}
|
||||
this.error = true;
|
||||
this.errorCode = errno;
|
||||
this.errorMessage = message;
|
||||
|
@ -357,6 +365,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
|
||||
if (error) {
|
||||
callback.onError(errorCode, errorMessage);
|
||||
onIOError(new ActiveMQException(errorCode, errorMessage), errorMessage, null);
|
||||
errorMessage = null;
|
||||
} else {
|
||||
if (callback != null) {
|
||||
|
@ -385,6 +394,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
libaioContext.poll();
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
|
||||
onIOError(new ActiveMQException("Error on libaio poll"), e.getMessage(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.concurrent.Executor;
|
|||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
|
@ -45,14 +46,17 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
|
|||
private final Set<FileStore> stores = new HashSet<>();
|
||||
private double maxUsage;
|
||||
private final Object monitorLock = new Object();
|
||||
private final IOCriticalErrorListener ioCriticalErrorListener;
|
||||
|
||||
public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService,
|
||||
Executor executor,
|
||||
long checkPeriod,
|
||||
TimeUnit timeUnit,
|
||||
double maxUsage) {
|
||||
double maxUsage,
|
||||
IOCriticalErrorListener ioCriticalErrorListener) {
|
||||
super(scheduledExecutorService, executor, checkPeriod, timeUnit, false);
|
||||
this.maxUsage = maxUsage;
|
||||
this.ioCriticalErrorListener = ioCriticalErrorListener;
|
||||
}
|
||||
|
||||
public FileStoreMonitor addCallback(Callback callback) {
|
||||
|
@ -99,6 +103,8 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
|
|||
if (over) {
|
||||
break;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
ioCriticalErrorListener.onIOException(ioe, "IO Error while calculating disk usage", null);
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
|
|
|
@ -2127,7 +2127,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
|
||||
try {
|
||||
injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f));
|
||||
injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, shutdownOnCriticalIO));
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
|
|
|
@ -96,7 +96,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
|
|||
};
|
||||
|
||||
final AtomicBoolean fakeReturn = new AtomicBoolean(false);
|
||||
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999) {
|
||||
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999, null) {
|
||||
@Override
|
||||
protected double calculateUsage(FileStore store) throws IOException {
|
||||
if (fakeReturn.get()) {
|
||||
|
@ -127,7 +127,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase {
|
|||
@Test
|
||||
public void testScheduler() throws Exception {
|
||||
|
||||
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9);
|
||||
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9, null);
|
||||
|
||||
final ReusableLatch latch = new ReusableLatch(5);
|
||||
storeMonitor.addStore(getTestDirfile());
|
||||
|
|
Loading…
Reference in New Issue