This commit is contained in:
Clebert Suconic 2017-07-13 17:50:33 -04:00
commit 7bcde50ae8
4 changed files with 205 additions and 58 deletions

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
@ -39,7 +40,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
protected static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
public static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
protected final JournalImpl journal;
@ -149,6 +150,60 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
}
}
public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
final List<String> dataFiles,
final List<String> newFiles,
final List<Pair<String, String>> renameFile) throws Exception {
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
if (controlFile.exists()) {
JournalFile file = new JournalFileImpl(controlFile, 0, JournalImpl.FORMAT_VERSION);
final ArrayList<RecordInfo> records = new ArrayList<>();
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract() {
@Override
public void onReadAddRecord(final RecordInfo info) throws Exception {
records.add(info);
}
});
if (records.size() == 0) {
// the record is damaged
controlFile.delete();
return null;
} else {
ActiveMQBuffer input = ActiveMQBuffers.wrappedBuffer(records.get(0).data);
int numberDataFiles = input.readInt();
for (int i = 0; i < numberDataFiles; i++) {
dataFiles.add(input.readUTF());
}
int numberNewFiles = input.readInt();
for (int i = 0; i < numberNewFiles; i++) {
newFiles.add(input.readUTF());
}
int numberRenames = input.readInt();
for (int i = 0; i < numberRenames; i++) {
String from = input.readUTF();
String to = input.readUTF();
renameFile.add(new Pair<>(from, to));
}
}
return controlFile;
} else {
return null;
}
}
/**
* Write pending output into file
*/

View File

@ -16,15 +16,10 @@
*/
package org.apache.activemq.artemis.core.journal.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.RecordInfo;
@ -64,56 +59,6 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
*/
private final LinkedList<CompactCommand> pendingCommands = new LinkedList<>();
public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
final List<String> dataFiles,
final List<String> newFiles,
final List<Pair<String, String>> renameFile) throws Exception {
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
if (controlFile.exists()) {
JournalFile file = new JournalFileImpl(controlFile, 0, JournalImpl.FORMAT_VERSION);
final ArrayList<RecordInfo> records = new ArrayList<>();
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract() {
@Override
public void onReadAddRecord(final RecordInfo info) throws Exception {
records.add(info);
}
});
if (records.size() == 0) {
return null;
} else {
ActiveMQBuffer input = ActiveMQBuffers.wrappedBuffer(records.get(0).data);
int numberDataFiles = input.readInt();
for (int i = 0; i < numberDataFiles; i++) {
dataFiles.add(input.readUTF());
}
int numberNewFiles = input.readInt();
for (int i = 0; i < numberNewFiles; i++) {
newFiles.add(input.readUTF());
}
int numberRenames = input.readInt();
for (int i = 0; i < numberRenames; i++) {
String from = input.readUTF();
String to = input.readUTF();
renameFile.add(new Pair<>(from, to));
}
}
return controlFile;
} else {
return null;
}
}
public List<JournalFile> getNewDataFiles() {
return newDataFiles;
}

View File

@ -73,9 +73,9 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.SimpleFuture;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
@ -435,6 +435,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
try {
final int filesize = (int) file.getFile().size();
if (filesize < JournalImpl.SIZE_HEADER) {
// the file is damaged or the system crash before it was able to write
return -1;
}
wholeFileBuffer = fileFactory.newBuffer(filesize);
final int journalFileSize = file.getFile().read(wholeFileBuffer);
@ -2362,9 +2367,20 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
cleanupList = new ArrayList<>();
cleanupList.add(cleanupRename);
}
return AbstractJournalUpdateTask.writeControlFile(fileFactory, files, newFiles, cleanupList);
return writeControlFile(fileFactory, files, newFiles, cleanupList);
}
protected SequentialFile writeControlFile(final SequentialFileFactory fileFactory,
final List<JournalFile> files,
final List<JournalFile> newFiles,
final List<Pair<String, String>> renames) throws Exception {
return JournalCompactor.writeControlFile(fileFactory, files, newFiles, renames);
}
protected void deleteControlFile(final SequentialFile controlFile) throws Exception {
controlFile.delete();
}

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.journal;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.AbstractJournalUpdateTask;
import org.apache.activemq.artemis.core.journal.impl.JournalCompactor;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class CrashOnCompactTest extends ActiveMQTestBase {
static int OK = 2;
static int NOT_OK = 3;
@Before
public void setup() throws Exception {
}
@Test
public void testCrashCompact() throws Exception {
Process process = SpawnedVMSupport.spawnVM(CrashOnCompactTest.class.getCanonicalName(), getTestDirfile().getAbsolutePath());
Assert.assertEquals(OK, process.waitFor());
checkJournalSize();
}
@Test
public void testAddJournal() throws Exception {
addJournal(getTestDirfile(), false);
checkJournalSize();
}
private void checkJournalSize() throws Exception {
JournalImpl journal = createJournal(getTestDirfile(), false);
ArrayList<RecordInfo> info = new ArrayList<>();
ArrayList<PreparedTransactionInfo> txInfo = new ArrayList<>();
journal.load(info, txInfo, new TransactionFailureCallback() {
@Override
public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete) {
}
});
Assert.assertEquals(900, info.size());
}
private static void addJournal(File folder, boolean crash) throws Exception {
JournalImpl journal = createJournal(folder, crash);
journal.loadInternalOnly();
for (int i = 0; i < 1000; i++) {
journal.appendAddRecord(i, (byte) 1, new byte[5], true);
}
for (int i = 0; i < 100; i++) {
journal.appendDeleteRecord(i, true);
}
journal.compact();
journal.stop();
}
public static void main(String[] arg) {
try {
addJournal(new File(arg[0]), true);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(NOT_OK);
}
private static JournalImpl createJournal(File folder, boolean crash) throws Exception {
NIOSequentialFileFactory factory = new NIOSequentialFileFactory(folder, 10);
JournalImpl journal = new JournalImpl(100 * 1024, 2, 2, 0, 0, factory, "jrntest", "jrn", 512) {
@Override
protected SequentialFile writeControlFile(final SequentialFileFactory fileFactory,
final List<JournalFile> files,
final List<JournalFile> newFiles,
final List<Pair<String, String>> renames) throws Exception {
if (crash) {
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
controlFile.open();
controlFile.close();
System.err.println("crashing after creation of control file");
System.exit(OK);
}
return JournalCompactor.writeControlFile(fileFactory, files, newFiles, renames);
}
};
journal.setAutoReclaim(false);
journal.start();
return journal;
}
}