diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java index 066638bde1..fe9d9053c7 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; @@ -418,8 +419,10 @@ public class JournalFilesRepository { *
This method will instantly return the opened file, and schedule opening and reclaiming.
*In case there are no cached opened files, this method will block until the file was opened, * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as ActiveMQ).
+ * + * @throws ActiveMQIOErrorException In case the file could not be opened */ - public JournalFile openFile() throws InterruptedException { + public JournalFile openFile() throws InterruptedException, ActiveMQIOErrorException { if (JournalFilesRepository.trace) { JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size()); } @@ -431,13 +434,13 @@ public class JournalFilesRepository { openFilesExecutor.execute(pushOpenRunnable); } - JournalFile nextFile = null; - - while (nextFile == null) { - nextFile = openedFiles.poll(5, TimeUnit.SECONDS); - if (nextFile == null) { - fileFactory.onIOError(ActiveMQJournalBundle.BUNDLE.fileNotOpened(), "unable to open ", null); - } + JournalFile nextFile = openedFiles.poll(5, TimeUnit.SECONDS); + if (nextFile == null) { + fileFactory.onIOError(ActiveMQJournalBundle.BUNDLE.fileNotOpened(), "unable to open ", null); + // We need to reconnect the current file with the timed buffer as we were not able to roll the file forward + // If you don't do this you will get a NPE in TimedBuffer::checkSize where it uses the bufferobserver + fileFactory.activateBuffer(journal.getCurrentFile().getFile()); + throw ActiveMQJournalBundle.BUNDLE.fileNotOpened(); } if (JournalFilesRepository.trace) { @@ -465,7 +468,19 @@ public class JournalFilesRepository { public void closeFile(final JournalFile file) throws Exception { fileFactory.deactivateBuffer(); file.getFile().close(); - dataFiles.add(file); + if (!dataFiles.contains(file)) { + // This is not a retry from openFile + // If you don't check this then retries keep adding the same file into + // dataFiles list and the compactor then re-adds multiple copies of the + // same file into freeFiles. + // The consequence of that is that you can end up with the same file + // twice in a row in the list of openedFiles + // The consequence of that is that JournalImpl::switchFileIfNecessary + // will throw throw new IllegalStateException("Invalid logic on buffer allocation") + // because the file will be checked effectively twice and the buffer will + // not fit in it + dataFiles.add(file); + } } /**