ARTEMIS-351 throw an exception if we get an IOException
This commit is contained in:
parent
54222d8667
commit
7b164e45c3
|
@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
|
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
|
||||||
|
@ -418,8 +419,10 @@ public class JournalFilesRepository {
|
||||||
* <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
|
* <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
|
||||||
* <p>In case there are no cached opened files, this method will block until the file was opened,
|
* <p>In case there are no cached opened files, this method will block until the file was opened,
|
||||||
* what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as ActiveMQ).</p>
|
* what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as ActiveMQ).</p>
|
||||||
|
*
|
||||||
|
* @throws ActiveMQIOErrorException In case the file could not be opened
|
||||||
*/
|
*/
|
||||||
public JournalFile openFile() throws InterruptedException {
|
public JournalFile openFile() throws InterruptedException, ActiveMQIOErrorException {
|
||||||
if (JournalFilesRepository.trace) {
|
if (JournalFilesRepository.trace) {
|
||||||
JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
|
JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
|
||||||
}
|
}
|
||||||
|
@ -431,13 +434,13 @@ public class JournalFilesRepository {
|
||||||
openFilesExecutor.execute(pushOpenRunnable);
|
openFilesExecutor.execute(pushOpenRunnable);
|
||||||
}
|
}
|
||||||
|
|
||||||
JournalFile nextFile = null;
|
JournalFile nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
|
||||||
|
if (nextFile == null) {
|
||||||
while (nextFile == null) {
|
fileFactory.onIOError(ActiveMQJournalBundle.BUNDLE.fileNotOpened(), "unable to open ", null);
|
||||||
nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
|
// We need to reconnect the current file with the timed buffer as we were not able to roll the file forward
|
||||||
if (nextFile == null) {
|
// If you don't do this you will get a NPE in TimedBuffer::checkSize where it uses the bufferobserver
|
||||||
fileFactory.onIOError(ActiveMQJournalBundle.BUNDLE.fileNotOpened(), "unable to open ", null);
|
fileFactory.activateBuffer(journal.getCurrentFile().getFile());
|
||||||
}
|
throw ActiveMQJournalBundle.BUNDLE.fileNotOpened();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (JournalFilesRepository.trace) {
|
if (JournalFilesRepository.trace) {
|
||||||
|
@ -465,7 +468,19 @@ public class JournalFilesRepository {
|
||||||
public void closeFile(final JournalFile file) throws Exception {
|
public void closeFile(final JournalFile file) throws Exception {
|
||||||
fileFactory.deactivateBuffer();
|
fileFactory.deactivateBuffer();
|
||||||
file.getFile().close();
|
file.getFile().close();
|
||||||
dataFiles.add(file);
|
if (!dataFiles.contains(file)) {
|
||||||
|
// This is not a retry from openFile
|
||||||
|
// If you don't check this then retries keep adding the same file into
|
||||||
|
// dataFiles list and the compactor then re-adds multiple copies of the
|
||||||
|
// same file into freeFiles.
|
||||||
|
// The consequence of that is that you can end up with the same file
|
||||||
|
// twice in a row in the list of openedFiles
|
||||||
|
// The consequence of that is that JournalImpl::switchFileIfNecessary
|
||||||
|
// will throw throw new IllegalStateException("Invalid logic on buffer allocation")
|
||||||
|
// because the file will be checked effectively twice and the buffer will
|
||||||
|
// not fit in it
|
||||||
|
dataFiles.add(file);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue