ARTEMIS-1117 Improving IO Resilience Part II
https://issues.apache.org/jira/browse/ARTEMIS-1117
This commit is contained in:
parent
23ba3e27d9
commit
0a0955d0cc
|
@ -90,6 +90,7 @@ public class JournalFilesRepository {
|
||||||
pushOpenedFile();
|
pushOpenedFile();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ActiveMQJournalLogger.LOGGER.errorPushingFile(e);
|
ActiveMQJournalLogger.LOGGER.errorPushingFile(e);
|
||||||
|
fileFactory.onIOError(e, "unable to open ", null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -412,21 +413,35 @@ public class JournalFilesRepository {
|
||||||
logger.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
|
logger.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (openFilesExecutor == null) {
|
// First try to get an open file, that's prepared and already open
|
||||||
pushOpenRunnable.run();
|
JournalFile nextFile = openedFiles.poll();
|
||||||
} else {
|
|
||||||
openFilesExecutor.execute(pushOpenRunnable);
|
|
||||||
}
|
|
||||||
|
|
||||||
JournalFile nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
|
|
||||||
if (nextFile == null) {
|
if (nextFile == null) {
|
||||||
fileFactory.onIOError(ActiveMQJournalBundle.BUNDLE.fileNotOpened(), "unable to open ", null);
|
// if there's none, push to open
|
||||||
// We need to reconnect the current file with the timed buffer as we were not able to roll the file forward
|
|
||||||
// If you don't do this you will get a NPE in TimedBuffer::checkSize where it uses the bufferobserver
|
pushOpen();
|
||||||
fileFactory.activateBuffer(journal.getCurrentFile().getFile());
|
|
||||||
throw ActiveMQJournalBundle.BUNDLE.fileNotOpened();
|
nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (openedFiles.isEmpty()) {
|
||||||
|
// if empty, push to open one.
|
||||||
|
pushOpen();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nextFile == null) {
|
||||||
|
|
||||||
|
logger.debug("Could not get a file in 5 seconds, it will retry directly, without an executor");
|
||||||
|
try {
|
||||||
|
nextFile = takeFile(true, true, true, false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
fileFactory.onIOError(e, "unable to open ", null);
|
||||||
|
// We need to reconnect the current file with the timed buffer as we were not able to roll the file forward
|
||||||
|
// If you don't do this you will get a NPE in TimedBuffer::checkSize where it uses the bufferobserver
|
||||||
|
fileFactory.activateBuffer(journal.getCurrentFile().getFile());
|
||||||
|
throw ActiveMQJournalBundle.BUNDLE.fileNotOpened();
|
||||||
|
}
|
||||||
|
}
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Returning file " + nextFile);
|
logger.trace("Returning file " + nextFile);
|
||||||
}
|
}
|
||||||
|
@ -434,6 +449,14 @@ public class JournalFilesRepository {
|
||||||
return nextFile;
|
return nextFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void pushOpen() {
|
||||||
|
if (openFilesExecutor == null) {
|
||||||
|
pushOpenRunnable.run();
|
||||||
|
} else {
|
||||||
|
openFilesExecutor.execute(pushOpenRunnable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Open a file and place it into the openedFiles queue
|
* Open a file and place it into the openedFiles queue
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue