From 31d5758885214ff65aebfdeb6024f7b275397bff Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 13 Jul 2017 17:02:33 -0400 Subject: [PATCH] ARTEMIS-1288 crash during compact control files issues The system would become irresponsive if crashed right at the control file writing. --- .../impl/AbstractJournalUpdateTask.java | 57 +++++++- .../core/journal/impl/JournalCompactor.java | 55 -------- .../core/journal/impl/JournalImpl.java | 20 ++- .../journal/CrashOnCompactTest.java | 131 ++++++++++++++++++ 4 files changed, 205 insertions(+), 58 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/CrashOnCompactTest.java diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java index 8de3da6400..9d983c7676 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncoderPersister; +import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; @@ -39,7 +40,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback // Constants ----------------------------------------------------- // Attributes ---------------------------------------------------- - protected static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr"; + public static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr"; protected final JournalImpl journal; @@ -149,6 +150,60 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback } } + public static SequentialFile readControlFile(final SequentialFileFactory fileFactory, + final List dataFiles, + final List newFiles, + final List> renameFile) throws Exception { + SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL); + + if (controlFile.exists()) { + JournalFile file = new JournalFileImpl(controlFile, 0, JournalImpl.FORMAT_VERSION); + + final ArrayList records = new ArrayList<>(); + + + JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract() { + @Override + public void onReadAddRecord(final RecordInfo info) throws Exception { + records.add(info); + } + }); + + if (records.size() == 0) { + // the record is damaged + controlFile.delete(); + return null; + } else { + ActiveMQBuffer input = ActiveMQBuffers.wrappedBuffer(records.get(0).data); + + int numberDataFiles = input.readInt(); + + for (int i = 0; i < numberDataFiles; i++) { + dataFiles.add(input.readUTF()); + } + + int numberNewFiles = input.readInt(); + + for (int i = 0; i < numberNewFiles; i++) { + newFiles.add(input.readUTF()); + } + + int numberRenames = input.readInt(); + for (int i = 0; i < numberRenames; i++) { + String from = input.readUTF(); + String to = input.readUTF(); + renameFile.add(new Pair<>(from, to)); + } + + } + + return controlFile; + } else { + return null; + } + } + + /** * Write pending output into file */ diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java index e3e1e7b469..95c35d0b2e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java @@ -16,15 +16,10 @@ */ package org.apache.activemq.artemis.core.journal.impl; -import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -64,56 +59,6 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ */ private final LinkedList pendingCommands = new LinkedList<>(); - public static SequentialFile readControlFile(final SequentialFileFactory fileFactory, - final List dataFiles, - final List newFiles, - final List> renameFile) throws Exception { - SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL); - - if (controlFile.exists()) { - JournalFile file = new JournalFileImpl(controlFile, 0, JournalImpl.FORMAT_VERSION); - - final ArrayList records = new ArrayList<>(); - - JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract() { - @Override - public void onReadAddRecord(final RecordInfo info) throws Exception { - records.add(info); - } - }); - - if (records.size() == 0) { - return null; - } else { - ActiveMQBuffer input = ActiveMQBuffers.wrappedBuffer(records.get(0).data); - - int numberDataFiles = input.readInt(); - - for (int i = 0; i < numberDataFiles; i++) { - dataFiles.add(input.readUTF()); - } - - int numberNewFiles = input.readInt(); - - for (int i = 0; i < numberNewFiles; i++) { - newFiles.add(input.readUTF()); - } - - int numberRenames = input.readInt(); - for (int i = 0; i < numberRenames; i++) { - String from = input.readUTF(); - String to = input.readUTF(); - renameFile.add(new Pair<>(from, to)); - } - - } - - return controlFile; - } else { - return null; - } - } - public List getNewDataFiles() { return newDataFiles; } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index bb557e4fb1..1b7ba266a7 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -73,9 +73,9 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.SimpleFuture; import org.apache.activemq.artemis.utils.SimpleFutureImpl; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet; @@ -435,6 +435,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal try { final int filesize = (int) file.getFile().size(); + if (filesize < JournalImpl.SIZE_HEADER) { + // the file is damaged or the system crash before it was able to write + return -1; + } + wholeFileBuffer = fileFactory.newBuffer(filesize); final int journalFileSize = file.getFile().read(wholeFileBuffer); @@ -2362,9 +2367,20 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal cleanupList = new ArrayList<>(); cleanupList.add(cleanupRename); } - return AbstractJournalUpdateTask.writeControlFile(fileFactory, files, newFiles, cleanupList); + return writeControlFile(fileFactory, files, newFiles, cleanupList); } + + protected SequentialFile writeControlFile(final SequentialFileFactory fileFactory, + final List files, + final List newFiles, + final List> renames) throws Exception { + + return JournalCompactor.writeControlFile(fileFactory, files, newFiles, renames); + } + + + protected void deleteControlFile(final SequentialFile controlFile) throws Exception { controlFile.delete(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/CrashOnCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/CrashOnCompactTest.java new file mode 100644 index 0000000000..46ca025076 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/CrashOnCompactTest.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.journal; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; +import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; +import org.apache.activemq.artemis.core.journal.impl.AbstractJournalUpdateTask; +import org.apache.activemq.artemis.core.journal.impl.JournalCompactor; +import org.apache.activemq.artemis.core.journal.impl.JournalFile; +import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.SpawnedVMSupport; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class CrashOnCompactTest extends ActiveMQTestBase { + + static int OK = 2; + static int NOT_OK = 3; + + @Before + public void setup() throws Exception { + } + + @Test + public void testCrashCompact() throws Exception { + Process process = SpawnedVMSupport.spawnVM(CrashOnCompactTest.class.getCanonicalName(), getTestDirfile().getAbsolutePath()); + Assert.assertEquals(OK, process.waitFor()); + checkJournalSize(); + } + + @Test + public void testAddJournal() throws Exception { + addJournal(getTestDirfile(), false); + checkJournalSize(); + } + + private void checkJournalSize() throws Exception { + JournalImpl journal = createJournal(getTestDirfile(), false); + ArrayList info = new ArrayList<>(); + ArrayList txInfo = new ArrayList<>(); + journal.load(info, txInfo, new TransactionFailureCallback() { + @Override + public void failedTransaction(long transactionID, List records, List recordsToDelete) { + + } + }); + + Assert.assertEquals(900, info.size()); + } + + private static void addJournal(File folder, boolean crash) throws Exception { + JournalImpl journal = createJournal(folder, crash); + journal.loadInternalOnly(); + for (int i = 0; i < 1000; i++) { + journal.appendAddRecord(i, (byte) 1, new byte[5], true); + } + + for (int i = 0; i < 100; i++) { + journal.appendDeleteRecord(i, true); + } + + journal.compact(); + journal.stop(); + } + + public static void main(String[] arg) { + + try { + addJournal(new File(arg[0]), true); + } catch (Exception e) { + e.printStackTrace(); + } + System.exit(NOT_OK); + + } + + private static JournalImpl createJournal(File folder, boolean crash) throws Exception { + NIOSequentialFileFactory factory = new NIOSequentialFileFactory(folder, 10); + JournalImpl journal = new JournalImpl(100 * 1024, 2, 2, 0, 0, factory, "jrntest", "jrn", 512) { + + @Override + protected SequentialFile writeControlFile(final SequentialFileFactory fileFactory, + final List files, + final List newFiles, + final List> renames) throws Exception { + + if (crash) { + SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL); + controlFile.open(); + controlFile.close(); + System.err.println("crashing after creation of control file"); + System.exit(OK); + } + return JournalCompactor.writeControlFile(fileFactory, files, newFiles, renames); + } + + }; + journal.setAutoReclaim(false); + + journal.start(); + return journal; + + } + +}