This commit is contained in:
Clebert Suconic 2018-02-12 16:01:09 -05:00
commit 1efae00e7c
5 changed files with 96 additions and 43 deletions
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal

View File

@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -64,6 +65,10 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
@Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
public class PrintData extends DBOption {
@Option(name = "--safe", description = "It will print your data structure without showing your data")
private boolean safe = false;
private static final String BINDINGS_BANNER = "B I N D I N G S J O U R N A L";
private static final String MESSAGES_BANNER = "M E S S A G E S J O U R N A L";
static {
@ -80,7 +85,7 @@ public class PrintData extends DBOption {
if (configuration.isJDBC()) {
printDataJDBC(configuration, context.out);
} else {
printData(new File(getBinding()), new File(getJournal()), new File(getPaging()), context.out);
printData(new File(getBinding()), new File(getJournal()), new File(getPaging()), context.out, safe);
}
} catch (Exception e) {
treatError(e, "data", "print");
@ -96,23 +101,26 @@ public class PrintData extends DBOption {
printBanner(out, BINDINGS_BANNER);
DescribeJournal.printSurvivingRecords(storageManager.getBindingsJournal(), out);
DescribeJournal.printSurvivingRecords(storageManager.getBindingsJournal(), out, safe);
printBanner(out, MESSAGES_BANNER);
DescribeJournal describeJournal = DescribeJournal.printSurvivingRecords(storageManager.getMessageJournal(), out);
DescribeJournal describeJournal = DescribeJournal.printSurvivingRecords(storageManager.getMessageJournal(), out, safe);
printPages(describeJournal, storageManager, pagingmanager, out);
printPages(describeJournal, storageManager, pagingmanager, out, safe);
cleanup();
}
public static void printData(File bindingsDirectory, File messagesDirectory, File pagingDirectory) throws Exception {
printData(bindingsDirectory, messagesDirectory, pagingDirectory, System.out);
printData(bindingsDirectory, messagesDirectory, pagingDirectory, false);
}
public static void printData(File bindingsDirectory, File messagesDirectory, File pagingDirectory, PrintStream out) throws Exception {
public static void printData(File bindingsDirectory, File messagesDirectory, File pagingDirectory, boolean secret) throws Exception {
printData(bindingsDirectory, messagesDirectory, pagingDirectory, System.out, secret);
}
public static void printData(File bindingsDirectory, File messagesDirectory, File pagingDirectory, PrintStream out, boolean safe) throws Exception {
// Having the version on the data report is an information very useful to understand what happened
// When debugging stuff
Artemis.printBanner(out);
@ -133,7 +141,7 @@ public class PrintData extends DBOption {
printBanner(out, BINDINGS_BANNER);
try {
DescribeJournal.describeBindingsJournal(bindingsDirectory, out);
DescribeJournal.describeBindingsJournal(bindingsDirectory, out, safe);
} catch (Exception e) {
e.printStackTrace();
}
@ -142,7 +150,7 @@ public class PrintData extends DBOption {
DescribeJournal describeJournal = null;
try {
describeJournal = DescribeJournal.describeMessagesJournal(messagesDirectory, out);
describeJournal = DescribeJournal.describeMessagesJournal(messagesDirectory, out, safe);
} catch (Exception e) {
e.printStackTrace();
return;
@ -151,7 +159,7 @@ public class PrintData extends DBOption {
try {
printBanner(out, "P A G I N G");
printPages(pagingDirectory, describeJournal, out);
printPages(pagingDirectory, describeJournal, out, safe);
} catch (Exception e) {
e.printStackTrace();
return;
@ -166,7 +174,7 @@ public class PrintData extends DBOption {
out.println("********************************************");
}
private static void printPages(File pageDirectory, DescribeJournal describeJournal, PrintStream out) {
private static void printPages(File pageDirectory, DescribeJournal describeJournal, PrintStream out, boolean safe) {
try {
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
@ -183,7 +191,7 @@ public class PrintData extends DBOption {
addressSettingsRepository.setDefault(new AddressSettings());
PagingManager manager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
printPages(describeJournal, sm, manager, out);
printPages(describeJournal, sm, manager, out, safe);
} catch (Exception e) {
e.printStackTrace();
}
@ -192,7 +200,8 @@ public class PrintData extends DBOption {
private static void printPages(DescribeJournal describeJournal,
StorageManager sm,
PagingManager manager,
PrintStream out) throws Exception {
PrintStream out,
boolean safe) throws Exception {
PageCursorsInfo cursorACKs = calculateCursorsInfo(describeJournal.getRecords());
Set<Long> pgTXs = cursorACKs.getPgTXs();
@ -222,7 +231,15 @@ public class PrintData extends DBOption {
for (PagedMessage msg : msgs) {
msg.initMessage(sm);
out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ",userMessageID=" + (msg.getMessage().getUserID() != null ? msg.getMessage().getUserID() : "") + ", msg=" + msg.getMessage());
if (safe) {
try {
out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ", msg=" + msg.getMessage().getClass().getSimpleName() + "(safe data, size=" + msg.getMessage().getPersistentSize() + ")");
} catch (Exception e) {
out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ", msg=" + msg.getMessage().getClass().getSimpleName() + "(safe data)");
}
} else {
out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() + ",userMessageID=" + (msg.getMessage().getUserID() != null ? msg.getMessage().getUserID() : "") + ", msg=" + msg.getMessage());
}
out.print(",Queues = ");
long[] q = msg.getQueueIDs();
for (int i = 0; i < q.length; i++) {

View File

@ -1128,7 +1128,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
checkProperties();
return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() +
", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties + "]@" + System.identityHashCode(this);
", durable=" + durable + ", address=" + getAddress() + ",size=" + getPersistentSize() + ",properties=" + properties + "]@" + System.identityHashCode(this);
} catch (Throwable e) {
logger.warn("Error creating String for message: ", e);
return "ServerMessage[messageID=" + messageID + "]";

View File

@ -1171,6 +1171,7 @@ public class AMQPMessage extends RefCountMessage {
return "AMQPMessage [durable=" + isDurable() +
", messageID=" + getMessageID() +
", address=" + getAddress() +
", size=" + getEncodeSize() +
"]";
}

View File

@ -75,6 +75,8 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCount
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
@ -107,22 +109,22 @@ public final class DescribeJournal {
}
public static void describeBindingsJournal(final File bindingsDir) throws Exception {
describeBindingsJournal(bindingsDir, System.out);
describeBindingsJournal(bindingsDir, System.out, false);
}
public static void describeBindingsJournal(final File bindingsDir, PrintStream out) throws Exception {
public static void describeBindingsJournal(final File bindingsDir, PrintStream out, boolean safe) throws Exception {
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, null, 1);
JournalImpl bindings = new JournalImpl(1024 * 1024, 2, 2, -1, 0, bindingsFF, "activemq-bindings", "bindings", 1);
describeJournal(bindingsFF, bindings, bindingsDir, out);
describeJournal(bindingsFF, bindings, bindingsDir, out, safe);
}
public static DescribeJournal describeMessagesJournal(final File messagesDir) throws Exception {
return describeMessagesJournal(messagesDir, System.out);
return describeMessagesJournal(messagesDir, System.out, false);
}
public static DescribeJournal describeMessagesJournal(final File messagesDir, PrintStream out) throws Exception {
public static DescribeJournal describeMessagesJournal(final File messagesDir, PrintStream out, boolean safe) throws Exception {
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir, null, 1);
// Will use only default values. The load function should adapt to anything different
@ -130,7 +132,7 @@ public final class DescribeJournal {
JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(), defaultValues.getJournalMinFiles(), defaultValues.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
return describeJournal(messagesFF, messagesJournal, messagesDir, out);
return describeJournal(messagesFF, messagesJournal, messagesDir, out, safe);
}
/**
@ -141,7 +143,8 @@ public final class DescribeJournal {
private static DescribeJournal describeJournal(SequentialFileFactory fileFactory,
JournalImpl journal,
final File path,
PrintStream out) throws Exception {
PrintStream out,
boolean safe) throws Exception {
List<JournalFile> files = journal.orderFiles();
final Map<Long, PageSubscriptionCounterImpl> counters = new HashMap<>();
@ -155,13 +158,13 @@ public final class DescribeJournal {
@Override
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {
out.println("operation@UpdateTX;txID=" + transactionID + "," + describeRecord(recordInfo));
out.println("operation@UpdateTX;txID=" + transactionID + "," + describeRecord(recordInfo, safe));
checkRecordCounter(recordInfo);
}
@Override
public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception {
out.println("operation@Update;" + describeRecord(recordInfo));
out.println("operation@Update;" + describeRecord(recordInfo, safe));
checkRecordCounter(recordInfo);
}
@ -180,7 +183,7 @@ public final class DescribeJournal {
@Override
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {
out.println("operation@DeleteRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo));
out.println("operation@DeleteRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo, safe));
}
@Override
@ -195,12 +198,12 @@ public final class DescribeJournal {
@Override
public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception {
out.println("operation@AddRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo));
out.println("operation@AddRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo, safe));
}
@Override
public void onReadAddRecord(final RecordInfo recordInfo) throws Exception {
out.println("operation@AddRecord;" + describeRecord(recordInfo));
out.println("operation@AddRecord;" + describeRecord(recordInfo, safe));
}
@Override
@ -254,11 +257,12 @@ public final class DescribeJournal {
printCounters(out, counters);
}
return printSurvivingRecords(journal, out);
return printSurvivingRecords(journal, out, safe);
}
public static DescribeJournal printSurvivingRecords(Journal journal,
PrintStream out) throws Exception {
PrintStream out,
boolean safe) throws Exception {
final Map<Long, PageSubscriptionCounterImpl> counters = new HashMap<>();
out.println("### Surviving Records Summary ###");
@ -282,11 +286,11 @@ public final class DescribeJournal {
List<RecordInfo> recordsToDelete) {
bufferFailingTransactions.append("Transaction " + transactionID + " failed with these records:\n");
for (RecordInfo info : records1) {
bufferFailingTransactions.append("- " + describeRecord(info) + "\n");
bufferFailingTransactions.append("- " + describeRecord(info, safe) + "\n");
}
for (RecordInfo info : recordsToDelete) {
bufferFailingTransactions.append("- " + describeRecord(info) + " <marked to delete>\n");
bufferFailingTransactions.append("- " + describeRecord(info, safe) + " <marked to delete>\n");
}
}
@ -334,7 +338,7 @@ public final class DescribeJournal {
subsCounter.processReload();
}
out.println(describeRecord(info, o));
out.println(describeRecord(info, o, safe));
if (subsCounter != null) {
out.println("##SubsCounter for queue=" + queueIDForCounter + ", value=" + subsCounter.getValue());
@ -354,7 +358,7 @@ public final class DescribeJournal {
out.println(tx.getId());
for (RecordInfo info : tx.getRecords()) {
Object o = newObjectEncoding(info);
out.println("- " + describeRecord(info, o));
out.println("- " + describeRecord(info, o, safe));
if (info.getUserRecordType() == 31) {
preparedMessageCount++;
} else if (info.getUserRecordType() == 32) {
@ -370,7 +374,7 @@ public final class DescribeJournal {
}
for (RecordInfo info : tx.getRecordsToDelete()) {
out.println("- " + describeRecord(info) + " <marked to delete>");
out.println("- " + describeRecord(info, safe) + " <marked to delete>");
}
}
@ -387,13 +391,13 @@ public final class DescribeJournal {
out.println("message count=" + messageCount);
out.println("message reference count");
for (Map.Entry<Long, Integer> longIntegerEntry : messageRefCounts.entrySet()) {
System.out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
}
out.println("prepared message count=" + preparedMessageCount);
for (Map.Entry<Long, Integer> longIntegerEntry : preparedMessageRefCount.entrySet()) {
System.out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
}
journal.stop();
@ -418,12 +422,40 @@ public final class DescribeJournal {
return subsCounter;
}
private static String describeRecord(RecordInfo info) {
return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";compactCount=" + info.compactCount + ";" + newObjectEncoding(info);
private static boolean isSafe(Object obj) {
// these classes will have user's data and not considered safe
return !(obj instanceof PersistentAddressBindingEncoding ||
obj instanceof MessageDescribe ||
obj instanceof PersistentQueueBindingEncoding);
}
private static String describeRecord(RecordInfo info, Object o) {
return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";compactCount=" + info.compactCount + ";" + o;
private static String toString(Object obj, boolean safe) {
if (obj == null) {
return "** null **";
}
if (safe && !isSafe(obj)) {
if (obj instanceof MessageDescribe) {
MessageDescribe describe = (MessageDescribe)obj;
try {
return describe.getMsg().getClass().getSimpleName() + "(safe data, size=" + describe.getMsg().getPersistentSize() + ")";
} catch (Throwable e) {
e.printStackTrace();
return describe.getMsg().getClass().getSimpleName() + "(safe data)";
}
} else {
return obj.getClass().getSimpleName() + "(safe data)";
}
} else {
return obj.toString();
}
}
private static String describeRecord(RecordInfo info, boolean safe) {
return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";compactCount=" + info.compactCount + ";" + toString(newObjectEncoding(info), safe);
}
private static String describeRecord(RecordInfo info, Object o, boolean safe) {
return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";compactCount=" + info.compactCount + ";" + toString(o, safe);
}
private static String encode(final byte[] data) {

View File

@ -354,9 +354,12 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
}
@Override
public String toString() {
return "LargeServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() +
", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
", durable=" + durable + ", address=" + getAddress() + ",properties=" + (properties != null ? properties.toString() : "") + "]@" + System.identityHashCode(this);
try {
return "LargeServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + ", durable=" + durable + ", address=" + getAddress() + ",size=" + getPersistentSize() + ",properties=" + (properties != null ? properties.toString() : "") + "]@" + System.identityHashCode(this);
} catch (Exception e) {
e.printStackTrace();
return "LargeServerMessage[messageID=" + messageID + "]";
}
}
@Override