From e828dc791f59691d2d0ba2ce169082e4c1984139 Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 30 Jan 2015 14:45:26 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5542 fix (via revert below) and test case applied with thanks. Revert "resolve https://issues.apache.org/activemq/browse/AMQ-2736, logic issue in code that keeps data files with acks around pending message file gc. thanks jgenender - test case to follow" This reverts commit dd68c61e65f24b7dc498b36e34960a4bc46ded4b. resolves: https://issues.apache.org/jira/browse/AMQ-5542 and applies test case that nicely demonstrates the defect, thanks --- .../store/kahadb/MessageDatabase.java | 3 +- .../org/apache/activemq/bugs/AMQ2832Test.java | 51 +++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 35a59cec4b..477f42c711 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1672,14 +1672,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (LOG.isTraceEnabled()) { LOG.trace("gc candidates: " + gcCandidateSet); } - final TreeSet gcCandidates = new TreeSet(gcCandidateSet); Iterator candidates = gcCandidateSet.iterator(); while (candidates.hasNext()) { Integer candidate = candidates.next(); Set referencedFileIds = metadata.ackMessageFileMap.get(candidate); if (referencedFileIds != null) { for (Integer referencedFileId : referencedFileIds) { - if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) { + if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) { // active file that is not targeted for deletion is referenced so don't delete candidates.remove(); break; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java index 319fcc2304..22ad6abd4c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java @@ -17,6 +17,7 @@ package org.apache.activemq.bugs; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -117,7 +118,57 @@ public class AMQ2832Test { } } + /** + * Scenario: + * db-1.log has an unacknowledged message, + * db-2.log contains acks for the messages from db-1.log, + * db-3.log contains acks for the messages from db-2.log + * + * Expected behavior: since db-1.log is blocked, db-2.log and db-3.log should not be removed during the cleanup. + * Current situation on 5.10.0, 5.10.1 is that db-3.log is removed causing all messages from db-2.log, whose acks were in db-3.log, to be replayed. + * + * @throws Exception + */ @Test + public void testAckChain() throws Exception { + startBroker(); + + StagedConsumer consumer = new StagedConsumer(); + // file #1 + produceMessagesToConsumeMultipleDataFiles(5); + // acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log + consumer.receive(3); + + // send messages by consuming and acknowledging every message right after sent in order to get KahadbAdd and Remove command to be saved together + // this is necessary in order to get KahaAddMessageCommand to be saved in one db file and the corresponding KahaRemoveMessageCommand in the next one + produceAndConsumeImmediately(20, consumer); + consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed + + // now we have 3 files written and started with #4 + consumer.close(); + + broker.stop(); + broker.waitUntilStopped(); + + recoverBroker(); + + consumer = new StagedConsumer(); + Message message = consumer.receive(1); + assertNotNull("One message stays unacked from db-1.log", message); + message.acknowledge(); + message = consumer.receive(1); + assertNull("There should not be any unconsumed messages any more", message); + consumer.close(); + } + + private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception { + for (int i = 0; i < numOfMsgs; i++) { + produceMessagesToConsumeMultipleDataFiles(1); + consumer.receive(1).acknowledge(); + } + } + + @Test public void testAckRemovedMessageReplayedAfterRecovery() throws Exception { startBroker();