ARTEMIS-1288 crash during compact control files issues
The system would become irresponsive if crashed right at the control file writing.
This commit is contained in:
parent
ac6420038f
commit
31d5758885
|
@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.journal.EncoderPersister;
|
import org.apache.activemq.artemis.core.journal.EncoderPersister;
|
||||||
|
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||||
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
|
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
|
||||||
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
|
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
|
||||||
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
|
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
|
||||||
|
@ -39,7 +40,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
||||||
// Constants -----------------------------------------------------
|
// Constants -----------------------------------------------------
|
||||||
|
|
||||||
// Attributes ----------------------------------------------------
|
// Attributes ----------------------------------------------------
|
||||||
protected static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
|
public static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
|
||||||
|
|
||||||
protected final JournalImpl journal;
|
protected final JournalImpl journal;
|
||||||
|
|
||||||
|
@ -149,6 +150,60 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
|
||||||
|
final List<String> dataFiles,
|
||||||
|
final List<String> newFiles,
|
||||||
|
final List<Pair<String, String>> renameFile) throws Exception {
|
||||||
|
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
|
||||||
|
|
||||||
|
if (controlFile.exists()) {
|
||||||
|
JournalFile file = new JournalFileImpl(controlFile, 0, JournalImpl.FORMAT_VERSION);
|
||||||
|
|
||||||
|
final ArrayList<RecordInfo> records = new ArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
|
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract() {
|
||||||
|
@Override
|
||||||
|
public void onReadAddRecord(final RecordInfo info) throws Exception {
|
||||||
|
records.add(info);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (records.size() == 0) {
|
||||||
|
// the record is damaged
|
||||||
|
controlFile.delete();
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
ActiveMQBuffer input = ActiveMQBuffers.wrappedBuffer(records.get(0).data);
|
||||||
|
|
||||||
|
int numberDataFiles = input.readInt();
|
||||||
|
|
||||||
|
for (int i = 0; i < numberDataFiles; i++) {
|
||||||
|
dataFiles.add(input.readUTF());
|
||||||
|
}
|
||||||
|
|
||||||
|
int numberNewFiles = input.readInt();
|
||||||
|
|
||||||
|
for (int i = 0; i < numberNewFiles; i++) {
|
||||||
|
newFiles.add(input.readUTF());
|
||||||
|
}
|
||||||
|
|
||||||
|
int numberRenames = input.readInt();
|
||||||
|
for (int i = 0; i < numberRenames; i++) {
|
||||||
|
String from = input.readUTF();
|
||||||
|
String to = input.readUTF();
|
||||||
|
renameFile.add(new Pair<>(from, to));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return controlFile;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write pending output into file
|
* Write pending output into file
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -16,15 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.journal.impl;
|
package org.apache.activemq.artemis.core.journal.impl;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.journal.EncoderPersister;
|
import org.apache.activemq.artemis.core.journal.EncoderPersister;
|
||||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||||
|
@ -64,56 +59,6 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
|
||||||
*/
|
*/
|
||||||
private final LinkedList<CompactCommand> pendingCommands = new LinkedList<>();
|
private final LinkedList<CompactCommand> pendingCommands = new LinkedList<>();
|
||||||
|
|
||||||
public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
|
|
||||||
final List<String> dataFiles,
|
|
||||||
final List<String> newFiles,
|
|
||||||
final List<Pair<String, String>> renameFile) throws Exception {
|
|
||||||
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
|
|
||||||
|
|
||||||
if (controlFile.exists()) {
|
|
||||||
JournalFile file = new JournalFileImpl(controlFile, 0, JournalImpl.FORMAT_VERSION);
|
|
||||||
|
|
||||||
final ArrayList<RecordInfo> records = new ArrayList<>();
|
|
||||||
|
|
||||||
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract() {
|
|
||||||
@Override
|
|
||||||
public void onReadAddRecord(final RecordInfo info) throws Exception {
|
|
||||||
records.add(info);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (records.size() == 0) {
|
|
||||||
return null;
|
|
||||||
} else {
|
|
||||||
ActiveMQBuffer input = ActiveMQBuffers.wrappedBuffer(records.get(0).data);
|
|
||||||
|
|
||||||
int numberDataFiles = input.readInt();
|
|
||||||
|
|
||||||
for (int i = 0; i < numberDataFiles; i++) {
|
|
||||||
dataFiles.add(input.readUTF());
|
|
||||||
}
|
|
||||||
|
|
||||||
int numberNewFiles = input.readInt();
|
|
||||||
|
|
||||||
for (int i = 0; i < numberNewFiles; i++) {
|
|
||||||
newFiles.add(input.readUTF());
|
|
||||||
}
|
|
||||||
|
|
||||||
int numberRenames = input.readInt();
|
|
||||||
for (int i = 0; i < numberRenames; i++) {
|
|
||||||
String from = input.readUTF();
|
|
||||||
String to = input.readUTF();
|
|
||||||
renameFile.add(new Pair<>(from, to));
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
return controlFile;
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<JournalFile> getNewDataFiles() {
|
public List<JournalFile> getNewDataFiles() {
|
||||||
return newDataFiles;
|
return newDataFiles;
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,9 +73,9 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.DataConstants;
|
import org.apache.activemq.artemis.utils.DataConstants;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
|
||||||
import org.apache.activemq.artemis.utils.SimpleFuture;
|
import org.apache.activemq.artemis.utils.SimpleFuture;
|
||||||
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
|
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
|
||||||
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
|
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
|
||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
|
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
|
||||||
|
@ -435,6 +435,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
try {
|
try {
|
||||||
final int filesize = (int) file.getFile().size();
|
final int filesize = (int) file.getFile().size();
|
||||||
|
|
||||||
|
if (filesize < JournalImpl.SIZE_HEADER) {
|
||||||
|
// the file is damaged or the system crash before it was able to write
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
wholeFileBuffer = fileFactory.newBuffer(filesize);
|
wholeFileBuffer = fileFactory.newBuffer(filesize);
|
||||||
|
|
||||||
final int journalFileSize = file.getFile().read(wholeFileBuffer);
|
final int journalFileSize = file.getFile().read(wholeFileBuffer);
|
||||||
|
@ -2362,9 +2367,20 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
cleanupList = new ArrayList<>();
|
cleanupList = new ArrayList<>();
|
||||||
cleanupList.add(cleanupRename);
|
cleanupList.add(cleanupRename);
|
||||||
}
|
}
|
||||||
return AbstractJournalUpdateTask.writeControlFile(fileFactory, files, newFiles, cleanupList);
|
return writeControlFile(fileFactory, files, newFiles, cleanupList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected SequentialFile writeControlFile(final SequentialFileFactory fileFactory,
|
||||||
|
final List<JournalFile> files,
|
||||||
|
final List<JournalFile> newFiles,
|
||||||
|
final List<Pair<String, String>> renames) throws Exception {
|
||||||
|
|
||||||
|
return JournalCompactor.writeControlFile(fileFactory, files, newFiles, renames);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
protected void deleteControlFile(final SequentialFile controlFile) throws Exception {
|
protected void deleteControlFile(final SequentialFile controlFile) throws Exception {
|
||||||
controlFile.delete();
|
controlFile.delete();
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,131 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.tests.integration.journal;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
||||||
|
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||||
|
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
|
||||||
|
import org.apache.activemq.artemis.core.journal.impl.AbstractJournalUpdateTask;
|
||||||
|
import org.apache.activemq.artemis.core.journal.impl.JournalCompactor;
|
||||||
|
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
||||||
|
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class CrashOnCompactTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
static int OK = 2;
|
||||||
|
static int NOT_OK = 3;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCrashCompact() throws Exception {
|
||||||
|
Process process = SpawnedVMSupport.spawnVM(CrashOnCompactTest.class.getCanonicalName(), getTestDirfile().getAbsolutePath());
|
||||||
|
Assert.assertEquals(OK, process.waitFor());
|
||||||
|
checkJournalSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddJournal() throws Exception {
|
||||||
|
addJournal(getTestDirfile(), false);
|
||||||
|
checkJournalSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkJournalSize() throws Exception {
|
||||||
|
JournalImpl journal = createJournal(getTestDirfile(), false);
|
||||||
|
ArrayList<RecordInfo> info = new ArrayList<>();
|
||||||
|
ArrayList<PreparedTransactionInfo> txInfo = new ArrayList<>();
|
||||||
|
journal.load(info, txInfo, new TransactionFailureCallback() {
|
||||||
|
@Override
|
||||||
|
public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete) {
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assert.assertEquals(900, info.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void addJournal(File folder, boolean crash) throws Exception {
|
||||||
|
JournalImpl journal = createJournal(folder, crash);
|
||||||
|
journal.loadInternalOnly();
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
journal.appendAddRecord(i, (byte) 1, new byte[5], true);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
journal.appendDeleteRecord(i, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
journal.compact();
|
||||||
|
journal.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] arg) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
addJournal(new File(arg[0]), true);
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
System.exit(NOT_OK);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static JournalImpl createJournal(File folder, boolean crash) throws Exception {
|
||||||
|
NIOSequentialFileFactory factory = new NIOSequentialFileFactory(folder, 10);
|
||||||
|
JournalImpl journal = new JournalImpl(100 * 1024, 2, 2, 0, 0, factory, "jrntest", "jrn", 512) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SequentialFile writeControlFile(final SequentialFileFactory fileFactory,
|
||||||
|
final List<JournalFile> files,
|
||||||
|
final List<JournalFile> newFiles,
|
||||||
|
final List<Pair<String, String>> renames) throws Exception {
|
||||||
|
|
||||||
|
if (crash) {
|
||||||
|
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
|
||||||
|
controlFile.open();
|
||||||
|
controlFile.close();
|
||||||
|
System.err.println("crashing after creation of control file");
|
||||||
|
System.exit(OK);
|
||||||
|
}
|
||||||
|
return JournalCompactor.writeControlFile(fileFactory, files, newFiles, renames);
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
journal.setAutoReclaim(false);
|
||||||
|
|
||||||
|
journal.start();
|
||||||
|
return journal;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue