ARTEMIS-3273 Recover tool and PrintData enhancements

This commit is contained in:
Clebert Suconic 2021-04-30 12:44:13 -04:00 committed by clebertsuconic
parent d2676e77f8
commit b0f8f515c5
10 changed files with 491 additions and 57 deletions

View File

@ -54,6 +54,7 @@ import org.apache.activemq.artemis.cli.commands.queue.PurgeQueue;
import org.apache.activemq.artemis.cli.commands.queue.UpdateQueue;
import org.apache.activemq.artemis.cli.commands.tools.HelpData;
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.cli.commands.tools.RecoverMessages;
import org.apache.activemq.artemis.cli.commands.tools.journal.CompactJournal;
import org.apache.activemq.artemis.cli.commands.tools.journal.DecodeJournal;
import org.apache.activemq.artemis.cli.commands.tools.journal.EncodeJournal;
@ -171,7 +172,7 @@ public class Artemis {
if (instance != null) {
builder.withGroup("data").withDescription("data tools group (print|imp|exp|encode|decode|compact) (example ./artemis data print)").
withDefaultCommand(HelpData.class).withCommands(PrintData.class, XmlDataExporter.class, XmlDataImporter.class, DecodeJournal.class, EncodeJournal.class, CompactJournal.class);
withDefaultCommand(HelpData.class).withCommands(RecoverMessages.class, PrintData.class, XmlDataExporter.class, XmlDataImporter.class, DecodeJournal.class, EncodeJournal.class, CompactJournal.class);
builder.withGroup("user").withDescription("default file-based user management (add|rm|list|reset) (example ./artemis user list)").
withDefaultCommand(HelpUser.class).withCommands(ListUser.class, AddUser.class, RemoveUser.class, ResetUser.class);
builder = builder.withCommands(Run.class, Stop.class, Kill.class, PerfJournal.class);

View File

@ -69,6 +69,10 @@ public class PrintData extends DBOption {
@Option(name = "--safe", description = "It will print your data structure without showing your data")
private boolean safe = false;
@Option(name = "--reclaimed", description = "This option will try to print as many records as possible from reclaimed files")
private boolean reclaimed = false;
private static final String BINDINGS_BANNER = "B I N D I N G S J O U R N A L";
private static final String MESSAGES_BANNER = "M E S S A G E S J O U R N A L";
static {
@ -85,7 +89,7 @@ public class PrintData extends DBOption {
if (configuration.isJDBC()) {
printDataJDBC(configuration, context.out);
} else {
printData(new File(getBinding()), new File(getJournal()), new File(getPaging()), context.out, safe);
printData(new File(getBinding()), new File(getJournal()), new File(getPaging()), context.out, safe, reclaimed);
}
} catch (Exception e) {
treatError(e, "data", "print");
@ -117,10 +121,10 @@ public class PrintData extends DBOption {
}
public static void printData(File bindingsDirectory, File messagesDirectory, File pagingDirectory, boolean secret) throws Exception {
printData(bindingsDirectory, messagesDirectory, pagingDirectory, System.out, secret);
printData(bindingsDirectory, messagesDirectory, pagingDirectory, System.out, secret, false);
}
public static void printData(File bindingsDirectory, File messagesDirectory, File pagingDirectory, PrintStream out, boolean safe) throws Exception {
public static void printData(File bindingsDirectory, File messagesDirectory, File pagingDirectory, PrintStream out, boolean safe, boolean reclaimed) throws Exception {
// Having the version on the data report is an information very useful to understand what happened
// When debugging stuff
Artemis.printBanner(out);
@ -140,21 +144,14 @@ public class PrintData extends DBOption {
printBanner(out, BINDINGS_BANNER);
try {
DescribeJournal.describeBindingsJournal(bindingsDirectory, out, safe);
} catch (Exception e) {
e.printStackTrace();
}
printBindings(bindingsDirectory, out, safe, true, true, reclaimed);
printBanner(out, MESSAGES_BANNER);
DescribeJournal describeJournal = null;
try {
describeJournal = DescribeJournal.describeMessagesJournal(messagesDirectory, out, safe);
} catch (Exception e) {
e.printStackTrace();
describeJournal = printMessages(messagesDirectory, out, safe, true, true, reclaimed);
if (describeJournal == null)
return;
}
try {
printBanner(out, "P A G I N G");
@ -167,6 +164,25 @@ public class PrintData extends DBOption {
}
public static DescribeJournal printMessages(File messagesDirectory, PrintStream out, boolean safe, boolean printRecords, boolean printSurving, boolean reclaimed) {
DescribeJournal describeJournal;
try {
describeJournal = DescribeJournal.describeMessagesJournal(messagesDirectory, out, safe, printRecords, printSurving, reclaimed);
} catch (Exception e) {
e.printStackTrace();
return null;
}
return describeJournal;
}
public static void printBindings(File bindingsDirectory, PrintStream out, boolean safe, boolean printRecords, boolean printSurviving, boolean reclaimed) {
try {
DescribeJournal.describeBindingsJournal(bindingsDirectory, out, safe, printRecords, printSurviving, reclaimed);
} catch (Exception e) {
e.printStackTrace();
}
}
protected static void printBanner(PrintStream out, String x2) {
out.println();
out.println("********************************************");

View File

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.tools;
import java.io.File;
import java.util.HashSet;
import java.util.List;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.utils.ByteUtil;
@Command(name = "recover", description = "Recover (undelete) every message on the journal by creating a new output journal. Rolled backed and acked messages will be sent out to the output as much as possible.")
public class RecoverMessages extends DBOption {
static {
MessagePersister.registerPersister(CoreMessagePersister.getInstance());
}
@Option(name = "--reclaimed", description = "This option will try to recover as many records as possible from reclaimed files")
private boolean reclaimed = false;
@Option(name = "--target", description = "Output folder container the new journal with all the generated messages", required = true)
private String outputJournal;
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
Configuration configuration = getParameterConfiguration();
File journalOutput = new File(outputJournal);
journalOutput.mkdirs();
if (!journalOutput.isDirectory()) {
throw new IllegalStateException(outputJournal + " is not a directory");
}
try {
if (configuration.isJDBC()) {
throw new IllegalAccessException("JDBC Not supported on recover");
} else {
recover(configuration, journalOutput, reclaimed);
}
} catch (Exception e) {
treatError(e, "data", "print");
}
return null;
}
public static void recover(Configuration configuration, File journalOutput, boolean reclaimed) throws Exception {
File journal = configuration.getJournalLocation();
journalOutput.mkdirs();
SequentialFileFactory outputFF = new NIOSequentialFileFactory(journalOutput, null, 1);
outputFF.setDatasync(false);
JournalImpl targetJournal = new JournalImpl(configuration.getJournalFileSize(), 2, 2, 0, 0, outputFF, "activemq-data", "amq", 1);
targetJournal.start();
targetJournal.loadInternalOnly();
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journal, null, 1);
// Will use only default values. The load function should adapt to anything different
JournalImpl messagesJournal = new JournalImpl(configuration.getJournalFileSize(), configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
List<JournalFile> files = messagesJournal.orderFiles();
HashSet<Byte> userRecordsOfInterest = new HashSet<>();
userRecordsOfInterest.add(JournalRecordIds.ADD_LARGE_MESSAGE);
userRecordsOfInterest.add(JournalRecordIds.ADD_MESSAGE);
userRecordsOfInterest.add(JournalRecordIds.ADD_MESSAGE_PROTOCOL);
userRecordsOfInterest.add(JournalRecordIds.ADD_REF);
userRecordsOfInterest.add(JournalRecordIds.PAGE_TRANSACTION);
for (JournalFile file : files) {
System.out.println("Recovering messages from file " + file);
HashSet<Pair<Long, Long>> routeBindigns = new HashSet<>();
JournalImpl.readJournalFile(messagesFF, file, new JournalReaderCallback() {
@Override
public void onReadAddRecord(RecordInfo info) throws Exception {
if (userRecordsOfInterest.contains(info.getUserRecordType())) {
if (targetJournal.getRecords().get(info.id) != null) {
System.out.println("RecordID " + info.id + " would been duplicated, ignoring it");
return;
}
try {
targetJournal.appendAddRecord(info.id, info.userRecordType, info.data, true);
} catch (Exception e) {
System.out.println("Cannot append record for " + info.id + "->" + e.getMessage());
}
}
}
@Override
public void onReadUpdateRecord(RecordInfo info) throws Exception {
if (userRecordsOfInterest.contains(info.getUserRecordType())) {
if (info.getUserRecordType() == JournalRecordIds.ADD_REF) {
long queue = ByteUtil.bytesToLong(info.data);
Pair<Long, Long> pairQueue = new Pair<>(info.id, queue);
if (routeBindigns.contains(pairQueue)) {
System.out.println("AddRef on " + info.id + " / queue=" + queue + " has already been recorded, ignoring it");
return;
}
routeBindigns.add(pairQueue);
}
try {
targetJournal.appendUpdateRecord(info.id, info.userRecordType, info.data, true);
} catch (Exception e) {
System.out.println("Cannot update record " + info.id + "-> " + e.getMessage());
}
}
}
@Override
public void onReadDeleteRecord(long recordID) throws Exception {
}
@Override
public void onReadAddRecordTX(long transactionID, RecordInfo info) throws Exception {
onReadAddRecord(info);
}
@Override
public void onReadUpdateRecordTX(long transactionID, RecordInfo info) throws Exception {
onReadUpdateRecord(info);
}
@Override
public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception {
}
@Override
public void onReadPrepareRecord(long transactionID,
byte[] extraData,
int numberOfRecords) throws Exception {
}
@Override
public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception {
}
@Override
public void onReadRollbackRecord(long transactionID) throws Exception {
}
@Override
public void markAsDataFile(JournalFile file) {
}
}, null, reclaimed);
}
targetJournal.flush();
targetJournal.stop();
outputFF.stop();
}
}

View File

@ -205,6 +205,17 @@ public class ByteUtil {
| ((int) b[0] & 0xff) << 24;
}
public static long bytesToLong(byte[] b) {
return ((long) b[7] & 0xff)
| ((long) b[6] & 0xff) << 8
| ((long) b[5] & 0xff) << 16
| ((long) b[4] & 0xff) << 24
| ((long) b[3] & 0xff) << 32
| ((long) b[2] & 0xff) << 40
| ((long) b[1] & 0xff) << 48
| ((long) b[0] & 0xff) << 56;
}
public static byte[] longToBytes(long value) {
byte[] output = new byte[8];
longToBytes(value, output, 0);
@ -212,13 +223,13 @@ public class ByteUtil {
}
public static void longToBytes(long x, byte[] output, int offset) {
output[offset] = (byte)(x >> 56);
output[offset + 1] = (byte)(x >> 48);
output[offset + 2] = (byte)(x >> 40);
output[offset + 3] = (byte)(x >> 32);
output[offset + 4] = (byte)(x >> 24);
output[offset + 5] = (byte)(x >> 16);
output[offset + 6] = (byte)(x >> 8);
output[offset] = (byte)(x >>> 56);
output[offset + 1] = (byte)(x >>> 48);
output[offset + 2] = (byte)(x >>> 40);
output[offset + 3] = (byte)(x >>> 32);
output[offset + 4] = (byte)(x >>> 24);
output[offset + 5] = (byte)(x >>> 16);
output[offset + 6] = (byte)(x >>> 8);
output[offset + 7] = (byte)(x);
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
public class StringPrintStream {
final ByteArrayOutputStream byteOuptut = new ByteArrayOutputStream();
public PrintStream newStream() throws IOException {
return new PrintStream(byteOuptut, true, StandardCharsets.UTF_8.name());
}
@Override
public String toString() {
try {
return byteOuptut.toString(StandardCharsets.UTF_8.name());
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}

View File

@ -530,10 +530,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return buffer;
}
static int readJournalFile(final SequentialFileFactory fileFactory,
public static int readJournalFile(final SequentialFileFactory fileFactory,
final JournalFile file,
final JournalReaderCallback reader,
final AtomicReference<ByteBuffer> wholeFileBufferReference) throws Exception {
return readJournalFile(fileFactory, file, reader, wholeFileBufferReference, false);
}
public static int readJournalFile(final SequentialFileFactory fileFactory,
final JournalFile file,
final JournalReaderCallback reader,
final AtomicReference<ByteBuffer> wholeFileBufferReference,
boolean reclaimed) throws Exception {
file.getFile().open(1, false);
ByteBuffer wholeFileBuffer = null;
try {
@ -582,7 +590,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// This record is from a previous file-usage. The file was
// reused and we need to ignore this record
if (readFileId != file.getRecordID()) {
if (readFileId != file.getRecordID() && !reclaimed) {
wholeFileBuffer.position(pos + 1);
continue;
}

View File

@ -18,6 +18,8 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
import javax.transaction.xa.Xid;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.HashMap;
@ -148,31 +150,43 @@ public final class DescribeJournal {
}
public static void describeBindingsJournal(final File bindingsDir) throws Exception {
describeBindingsJournal(bindingsDir, System.out, false);
describeBindingsJournal(bindingsDir, System.out, false, true, true);
}
public static void describeBindingsJournal(final File bindingsDir, PrintStream out, boolean safe) throws Exception {
public static void describeBindingsJournal(final File bindingsDir, PrintStream out, boolean safe, boolean printRecords, boolean printSurviving) throws Exception {
describeBindingsJournal(bindingsDir, out, safe, printRecords, printSurviving, false);
}
public static void describeBindingsJournal(final File bindingsDir, PrintStream out, boolean safe, boolean printRecords, boolean printSurviving, boolean reclaimed) throws Exception {
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, null, 1);
JournalImpl bindings = new JournalImpl(1024 * 1024, 2, 2, -1, 0, bindingsFF, "activemq-bindings", "bindings", 1);
describeJournal(bindingsFF, bindings, bindingsDir, out, safe);
describeJournal(bindingsFF, bindings, bindingsDir, out, safe, printRecords, printSurviving, reclaimed);
}
public static DescribeJournal describeMessagesJournal(final File messagesDir) throws Exception {
return describeMessagesJournal(messagesDir, System.out, false);
return describeMessagesJournal(messagesDir, System.out, false, true, true, false);
}
public static DescribeJournal describeMessagesJournal(final File messagesDir, PrintStream out, boolean safe) throws Exception {
public static DescribeJournal describeMessagesJournal(final File messagesDir, PrintStream out, boolean safe, boolean printRecords, boolean printSurviving, boolean reclaimed) throws Exception {
Configuration configuration = getConfiguration();
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir, null, 1);
// Will use only default values. The load function should adapt to anything different
JournalImpl messagesJournal = new JournalImpl(configuration.getJournalFileSize(), configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
return describeJournal(messagesFF, messagesJournal, messagesDir, out, safe);
return describeJournal(messagesFF, messagesJournal, messagesDir, out, safe, printRecords, printSurviving, reclaimed);
}
private static final PrintStream nullPrintStream = new PrintStream(new OutputStream() {
@Override
public void write(int b) throws IOException {
}
});
/**
* @param fileFactory
* @param journal
@ -182,66 +196,72 @@ public final class DescribeJournal {
JournalImpl journal,
final File path,
PrintStream out,
boolean safe) throws Exception {
boolean safe,
boolean printRecords,
boolean printSurving,
boolean reclaimed) throws Exception {
List<JournalFile> files = journal.orderFiles();
final Map<Long, PageSubscriptionCounterImpl> counters = new HashMap<>();
out.println("Journal path: " + path);
PrintStream recordsPrintStream = printRecords ? out : nullPrintStream;
PrintStream survivingPrintStrea = printSurving ? out : nullPrintStream;
recordsPrintStream.println("Journal path: " + path);
for (JournalFile file : files) {
out.println("#" + file + " (size=" + file.getFile().size() + ")");
recordsPrintStream.println("#" + file + " (size=" + file.getFile().size() + ")");
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback() {
@Override
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {
out.println("operation@UpdateTX;txID=" + transactionID + "," + describeRecord(recordInfo, safe));
recordsPrintStream.println("operation@UpdateTX;txID=" + transactionID + "," + describeRecord(recordInfo, safe));
checkRecordCounter(recordInfo);
}
@Override
public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception {
out.println("operation@Update;" + describeRecord(recordInfo, safe));
recordsPrintStream.println("operation@Update;" + describeRecord(recordInfo, safe));
checkRecordCounter(recordInfo);
}
@Override
public void onReadRollbackRecord(final long transactionID) throws Exception {
out.println("operation@Rollback;txID=" + transactionID);
recordsPrintStream.println("operation@Rollback;txID=" + transactionID);
}
@Override
public void onReadPrepareRecord(final long transactionID,
final byte[] extraData,
final int numberOfRecords) throws Exception {
out.println("operation@Prepare,txID=" + transactionID + ",numberOfRecords=" + numberOfRecords +
recordsPrintStream.println("operation@Prepare,txID=" + transactionID + ",numberOfRecords=" + numberOfRecords +
",extraData=" + encode(extraData) + ", xid=" + toXid(extraData));
}
@Override
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {
out.println("operation@DeleteRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo, safe));
recordsPrintStream.println("operation@DeleteRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo, safe));
}
@Override
public void onReadDeleteRecord(final long recordID) throws Exception {
out.println("operation@DeleteRecord;recordID=" + recordID);
recordsPrintStream.println("operation@DeleteRecord;recordID=" + recordID);
}
@Override
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
out.println("operation@Commit;txID=" + transactionID + ",numberOfRecords=" + numberOfRecords);
recordsPrintStream.println("operation@Commit;txID=" + transactionID + ",numberOfRecords=" + numberOfRecords);
}
@Override
public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {
out.println("operation@AddRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo, safe));
recordsPrintStream.println("operation@AddRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo, safe));
}
@Override
public void onReadAddRecord(final RecordInfo recordInfo) throws Exception {
out.println("operation@AddRecord;" + describeRecord(recordInfo, safe));
recordsPrintStream.println("operation@AddRecord;" + describeRecord(recordInfo, safe));
}
@Override
@ -256,18 +276,18 @@ public final class DescribeJournal {
PageSubscriptionCounterImpl subsCounter = lookupCounter(counters, queueIDForCounter);
if (subsCounter.getValue() != 0 && subsCounter.getValue() != encoding.getValue()) {
out.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.getValue());
recordsPrintStream.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.getValue());
}
subsCounter.loadValue(info.id, encoding.getValue(), encoding.getPersistentSize());
subsCounter.processReload();
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " persistentSize=" + subsCounter.getPersistentSize() + ", result=" + subsCounter.getValue());
recordsPrintStream.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " persistentSize=" + subsCounter.getPersistentSize() + ", result=" + subsCounter.getValue());
if (subsCounter.getValue() < 0) {
out.println(" #NegativeCounter!!!!");
recordsPrintStream.println(" #NegativeCounter!!!!");
} else {
out.println();
recordsPrintStream.println();
}
out.println();
recordsPrintStream.println();
} else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
PageCountRecordInc encoding = (PageCountRecordInc) newObjectEncoding(info);
long queueIDForCounter = encoding.getQueueID();
@ -276,26 +296,26 @@ public final class DescribeJournal {
subsCounter.loadInc(info.id, encoding.getValue(), encoding.getPersistentSize());
subsCounter.processReload();
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " persistentSize=" + subsCounter.getPersistentSize() + " increased by " + encoding.getValue());
recordsPrintStream.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " persistentSize=" + subsCounter.getPersistentSize() + " increased by " + encoding.getValue());
if (subsCounter.getValue() < 0) {
out.println(" #NegativeCounter!!!!");
recordsPrintStream.println(" #NegativeCounter!!!!");
} else {
out.println();
recordsPrintStream.println();
}
out.println();
recordsPrintStream.println();
}
}
});
}, null, reclaimed);
}
out.println();
recordsPrintStream.println();
if (counters.size() != 0) {
out.println("#Counters during initial load:");
printCounters(out, counters);
recordsPrintStream.println("#Counters during initial load:");
printCounters(recordsPrintStream, counters);
}
return printSurvivingRecords(journal, out, safe);
return printSurvivingRecords(journal, survivingPrintStrea, safe);
}
public static DescribeJournal printSurvivingRecords(Journal journal,

View File

@ -64,7 +64,7 @@ public class AMQPPrintDataTest extends ActiveMQTestBase {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
PrintStream printStream = new PrintStream(byteArrayOutputStream, true, StandardCharsets.UTF_8.name());
PrintData.printData(server.getConfiguration().getBindingsLocation().getAbsoluteFile(), server.getConfiguration().getJournalLocation().getAbsoluteFile(), server.getConfiguration().getPagingLocation().getAbsoluteFile(),
printStream, false);
printStream, false, false);
Assert.assertTrue(byteArrayOutputStream.toString().contains(random));

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cli;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.io.File;
import org.apache.activemq.artemis.cli.commands.tools.RecoverMessages;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Assert;
import org.junit.Test;
public class RecoverTest extends JMSTestBase {
@Override
protected boolean usePersistence() {
return true;
}
@Test
public void testRecoverCoreNoTx() throws Exception {
testRecover(false, "CORE");
}
@Test
public void testRecoverCORETx() throws Exception {
testRecover(true, "CORE");
}
@Test
public void testRecoverAMQPNoTx() throws Exception {
testRecover(false, "AMQP");
}
@Test
public void testRecoverAMQPTx() throws Exception {
testRecover(true, "AMQP");
}
@Test
public void testRecoverOpenWireNoTx() throws Exception {
testRecover(false, "OPENWIRE");
}
@Test
public void testRecoverOpenWireTx() throws Exception {
testRecover(true, "OPENWIRE");
}
public void testRecover(boolean useTX, String protocol) throws Exception {
createQueue(true, "TestQueue");
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(useTX, useTX ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("TestQueue");
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 1000; i++) {
producer.send(session.createTextMessage("test1"));
}
if (useTX) {
session.commit();
}
// Using compacting here, will kind of duplicate all the records into reclaimed files
// this might cause extra challenges on recovering the data
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(60_000);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < 1000; i++) {
Assert.assertNotNull(consumer.receive(5000));
}
if (useTX) {
session.commit();
}
connection.close();
server.stop();
File newJournalLocation = new File(server.getConfiguration().getJournalLocation().getParentFile(), "recovered");
RecoverMessages.recover(server.getConfiguration(), newJournalLocation, true);
server.getConfiguration().setJournalDirectory(newJournalLocation.getAbsolutePath());
server.start();
factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
connection = factory.createConnection();
session = connection.createSession(useTX, useTX ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
connection.start();
consumer = session.createConsumer(queue);
for (int i = 0; i < 1000; i++) {
Assert.assertNotNull(consumer.receive(5000));
}
Assert.assertNull(consumer.receiveNoWait());
if (useTX) {
session.commit();
}
connection.close();
}
}

View File

@ -63,7 +63,7 @@ public class JournalDataPrintTest extends ActiveMQTestBase {
public void write(int b) throws IOException {
// dev/null
}
}), false);
}), false, false);
// list journal file
File dirFile = server.getConfiguration().getJournalLocation().getAbsoluteFile();