From c3e74e055c724c52c61ea9a2450743e30d6cbd3f Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Wed, 15 Jan 2020 19:24:15 +0100 Subject: [PATCH] ARTEMIS-2599 DescribeJournal isn't correctly counting surviving msg --- .../impl/journal/DescribeJournal.java | 95 +++++++++++++------ 1 file changed, 65 insertions(+), 30 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index e579704152..910b0c1260 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -310,10 +310,40 @@ public final class DescribeJournal { final StringBuffer bufferFailingTransactions = new StringBuffer(); - int messageCount = 0; - Map messageRefCounts = new HashMap<>(); - int preparedMessageCount = 0; - Map preparedMessageRefCount = new HashMap<>(); + final class Count { + + int value; + + Count(int v) { + value = v; + } + + @Override + public String toString() { + return Integer.toString(value); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + Count count = (Count) o; + return value == count.value; + } + + @Override + public int hashCode() { + return Integer.hashCode(value); + } + } + long messageCount = 0; + long largeMessageCount = 0; + Map messageRefCounts = new HashMap<>(); + long preparedMessageCount = 0; + long preparedLargeMessageCount = 0; + Map preparedMessageRefCount = new HashMap<>(); journal.load(records, preparedTransactions, new TransactionFailureCallback() { @Override @@ -337,26 +367,28 @@ public final class DescribeJournal { long queueIDForCounter = 0; Object o = newObjectEncoding(info); - if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE) { + final byte userRecordType = info.getUserRecordType(); + if (userRecordType == ADD_MESSAGE || userRecordType == ADD_MESSAGE_PROTOCOL) { messageCount++; - } else if (info.getUserRecordType() == JournalRecordIds.ADD_REF) { + } else if (userRecordType == ADD_LARGE_MESSAGE) { + largeMessageCount++; + } else if (userRecordType == JournalRecordIds.ADD_REF) { ReferenceDescribe ref = (ReferenceDescribe) o; - Integer count = messageRefCounts.get(ref.refEncoding.queueID); + Count count = messageRefCounts.get(ref.refEncoding.queueID); if (count == null) { - count = 1; + count = new Count(0); messageRefCounts.put(ref.refEncoding.queueID, count); - } else { - messageRefCounts.put(ref.refEncoding.queueID, count + 1); } - } else if (info.getUserRecordType() == JournalRecordIds.ACKNOWLEDGE_REF) { + count.value++; + } else if (userRecordType == JournalRecordIds.ACKNOWLEDGE_REF) { AckDescribe ref = (AckDescribe) o; - Integer count = messageRefCounts.get(ref.refEncoding.queueID); + Count count = messageRefCounts.get(ref.refEncoding.queueID); if (count == null) { - messageRefCounts.put(ref.refEncoding.queueID, 0); + messageRefCounts.put(ref.refEncoding.queueID, new Count(0)); } else { - messageRefCounts.put(ref.refEncoding.queueID, count - 1); + count.value--; } - } else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) { + } else if (userRecordType == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) { PageCountRecord encoding = (PageCountRecord) o; queueIDForCounter = encoding.getQueueID(); @@ -364,7 +396,7 @@ public final class DescribeJournal { subsCounter.loadValue(info.id, encoding.getValue(), encoding.getPersistentSize()); subsCounter.processReload(); - } else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) { + } else if (userRecordType == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) { PageCountRecordInc encoding = (PageCountRecordInc) o; queueIDForCounter = encoding.getQueueID(); @@ -395,17 +427,19 @@ public final class DescribeJournal { for (RecordInfo info : tx.getRecords()) { Object o = newObjectEncoding(info); out.println("- " + describeRecord(info, o, safe)); - if (info.getUserRecordType() == 31) { + final byte userRecordType = info.getUserRecordType(); + if (userRecordType == ADD_MESSAGE || userRecordType == ADD_MESSAGE_PROTOCOL) { preparedMessageCount++; - } else if (info.getUserRecordType() == 32) { + } else if (userRecordType == ADD_LARGE_MESSAGE) { + preparedLargeMessageCount++; + } else if (userRecordType == ADD_REF) { ReferenceDescribe ref = (ReferenceDescribe) o; - Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID); + Count count = preparedMessageRefCount.get(ref.refEncoding.queueID); if (count == null) { - count = 1; + count = new Count(0); preparedMessageRefCount.put(ref.refEncoding.queueID, count); - } else { - preparedMessageRefCount.put(ref.refEncoding.queueID, count + 1); } + count.value++; } } @@ -425,16 +459,17 @@ public final class DescribeJournal { out.println("### Message Counts ###"); out.println("message count=" + messageCount); + out.println("large message count=" + largeMessageCount); out.println("message reference count"); - for (Map.Entry longIntegerEntry : messageRefCounts.entrySet()) { - out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue()); - } - + messageRefCounts.forEach((queueId, count) -> { + out.println("queue id " + queueId + ",count=" + count); + }); out.println("prepared message count=" + preparedMessageCount); - - for (Map.Entry longIntegerEntry : preparedMessageRefCount.entrySet()) { - out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue()); - } + out.println("prepared large message count=" + preparedLargeMessageCount); + out.println("prepared message reference count"); + preparedMessageRefCount.forEach((queueId, count) -> { + out.println("queue id " + queueId + ",count=" + count); + }); journal.stop();