https://issues.apache.org/jira/browse/AMQ-5542 fix (via revert below) and test case applied with thanks.

Revert "resolve https://issues.apache.org/activemq/browse/AMQ-2736, logic issue in code that keeps data files with acks around pending message file gc. thanks jgenender - test case to follow"

This reverts commit dd68c61e65.

resolves: https://issues.apache.org/jira/browse/AMQ-5542 and applies test case that nicely demonstrates the defect, thanks
This commit is contained in:
gtully 2015-01-30 14:45:26 +00:00 committed by Hadrian Zbarcea
parent b997bd480d
commit e828dc791f
2 changed files with 52 additions and 2 deletions

View File

@ -1672,14 +1672,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("gc candidates: " + gcCandidateSet); LOG.trace("gc candidates: " + gcCandidateSet);
} }
final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
Iterator<Integer> candidates = gcCandidateSet.iterator(); Iterator<Integer> candidates = gcCandidateSet.iterator();
while (candidates.hasNext()) { while (candidates.hasNext()) {
Integer candidate = candidates.next(); Integer candidate = candidates.next();
Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate); Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
if (referencedFileIds != null) { if (referencedFileIds != null) {
for (Integer referencedFileId : referencedFileIds) { for (Integer referencedFileId : referencedFileIds) {
if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) { if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
// active file that is not targeted for deletion is referenced so don't delete // active file that is not targeted for deletion is referenced so don't delete
candidates.remove(); candidates.remove();
break; break;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.bugs; package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -117,7 +118,57 @@ public class AMQ2832Test {
} }
} }
/**
* Scenario:
* db-1.log has an unacknowledged message,
* db-2.log contains acks for the messages from db-1.log,
* db-3.log contains acks for the messages from db-2.log
*
* Expected behavior: since db-1.log is blocked, db-2.log and db-3.log should not be removed during the cleanup.
* Current situation on 5.10.0, 5.10.1 is that db-3.log is removed causing all messages from db-2.log, whose acks were in db-3.log, to be replayed.
*
* @throws Exception
*/
@Test @Test
public void testAckChain() throws Exception {
startBroker();
StagedConsumer consumer = new StagedConsumer();
// file #1
produceMessagesToConsumeMultipleDataFiles(5);
// acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log
consumer.receive(3);
// send messages by consuming and acknowledging every message right after sent in order to get KahadbAdd and Remove command to be saved together
// this is necessary in order to get KahaAddMessageCommand to be saved in one db file and the corresponding KahaRemoveMessageCommand in the next one
produceAndConsumeImmediately(20, consumer);
consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed
// now we have 3 files written and started with #4
consumer.close();
broker.stop();
broker.waitUntilStopped();
recoverBroker();
consumer = new StagedConsumer();
Message message = consumer.receive(1);
assertNotNull("One message stays unacked from db-1.log", message);
message.acknowledge();
message = consumer.receive(1);
assertNull("There should not be any unconsumed messages any more", message);
consumer.close();
}
private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception {
for (int i = 0; i < numOfMsgs; i++) {
produceMessagesToConsumeMultipleDataFiles(1);
consumer.receive(1).acknowledge();
}
}
@Test
public void testAckRemovedMessageReplayedAfterRecovery() throws Exception { public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
startBroker(); startBroker();