This commit is contained in:
Clebert Suconic 2020-01-21 12:14:03 -05:00
commit 5dbce95431
1 changed files with 65 additions and 30 deletions

View File

@ -310,10 +310,40 @@ public final class DescribeJournal {
final StringBuffer bufferFailingTransactions = new StringBuffer();
int messageCount = 0;
Map<Long, Integer> messageRefCounts = new HashMap<>();
int preparedMessageCount = 0;
Map<Long, Integer> preparedMessageRefCount = new HashMap<>();
final class Count {
int value;
Count(int v) {
value = v;
}
@Override
public String toString() {
return Integer.toString(value);
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Count count = (Count) o;
return value == count.value;
}
@Override
public int hashCode() {
return Integer.hashCode(value);
}
}
long messageCount = 0;
long largeMessageCount = 0;
Map<Long, Count> messageRefCounts = new HashMap<>();
long preparedMessageCount = 0;
long preparedLargeMessageCount = 0;
Map<Long, Count> preparedMessageRefCount = new HashMap<>();
journal.load(records, preparedTransactions, new TransactionFailureCallback() {
@Override
@ -337,26 +367,28 @@ public final class DescribeJournal {
long queueIDForCounter = 0;
Object o = newObjectEncoding(info);
if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE) {
final byte userRecordType = info.getUserRecordType();
if (userRecordType == ADD_MESSAGE || userRecordType == ADD_MESSAGE_PROTOCOL) {
messageCount++;
} else if (info.getUserRecordType() == JournalRecordIds.ADD_REF) {
} else if (userRecordType == ADD_LARGE_MESSAGE) {
largeMessageCount++;
} else if (userRecordType == JournalRecordIds.ADD_REF) {
ReferenceDescribe ref = (ReferenceDescribe) o;
Integer count = messageRefCounts.get(ref.refEncoding.queueID);
Count count = messageRefCounts.get(ref.refEncoding.queueID);
if (count == null) {
count = 1;
count = new Count(0);
messageRefCounts.put(ref.refEncoding.queueID, count);
} else {
messageRefCounts.put(ref.refEncoding.queueID, count + 1);
}
} else if (info.getUserRecordType() == JournalRecordIds.ACKNOWLEDGE_REF) {
count.value++;
} else if (userRecordType == JournalRecordIds.ACKNOWLEDGE_REF) {
AckDescribe ref = (AckDescribe) o;
Integer count = messageRefCounts.get(ref.refEncoding.queueID);
Count count = messageRefCounts.get(ref.refEncoding.queueID);
if (count == null) {
messageRefCounts.put(ref.refEncoding.queueID, 0);
messageRefCounts.put(ref.refEncoding.queueID, new Count(0));
} else {
messageRefCounts.put(ref.refEncoding.queueID, count - 1);
count.value--;
}
} else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) {
} else if (userRecordType == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) {
PageCountRecord encoding = (PageCountRecord) o;
queueIDForCounter = encoding.getQueueID();
@ -364,7 +396,7 @@ public final class DescribeJournal {
subsCounter.loadValue(info.id, encoding.getValue(), encoding.getPersistentSize());
subsCounter.processReload();
} else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
} else if (userRecordType == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
PageCountRecordInc encoding = (PageCountRecordInc) o;
queueIDForCounter = encoding.getQueueID();
@ -395,17 +427,19 @@ public final class DescribeJournal {
for (RecordInfo info : tx.getRecords()) {
Object o = newObjectEncoding(info);
out.println("- " + describeRecord(info, o, safe));
if (info.getUserRecordType() == 31) {
final byte userRecordType = info.getUserRecordType();
if (userRecordType == ADD_MESSAGE || userRecordType == ADD_MESSAGE_PROTOCOL) {
preparedMessageCount++;
} else if (info.getUserRecordType() == 32) {
} else if (userRecordType == ADD_LARGE_MESSAGE) {
preparedLargeMessageCount++;
} else if (userRecordType == ADD_REF) {
ReferenceDescribe ref = (ReferenceDescribe) o;
Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID);
Count count = preparedMessageRefCount.get(ref.refEncoding.queueID);
if (count == null) {
count = 1;
count = new Count(0);
preparedMessageRefCount.put(ref.refEncoding.queueID, count);
} else {
preparedMessageRefCount.put(ref.refEncoding.queueID, count + 1);
}
count.value++;
}
}
@ -425,16 +459,17 @@ public final class DescribeJournal {
out.println("### Message Counts ###");
out.println("message count=" + messageCount);
out.println("large message count=" + largeMessageCount);
out.println("message reference count");
for (Map.Entry<Long, Integer> longIntegerEntry : messageRefCounts.entrySet()) {
out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
}
messageRefCounts.forEach((queueId, count) -> {
out.println("queue id " + queueId + ",count=" + count);
});
out.println("prepared message count=" + preparedMessageCount);
for (Map.Entry<Long, Integer> longIntegerEntry : preparedMessageRefCount.entrySet()) {
out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
}
out.println("prepared large message count=" + preparedLargeMessageCount);
out.println("prepared message reference count");
preparedMessageRefCount.forEach((queueId, count) -> {
out.println("queue id " + queueId + ",count=" + count);
});
journal.stop();