mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5542 fix (via revert below) and test case applied with thanks.
Revert "resolve https://issues.apache.org/activemq/browse/AMQ-2736, logic issue in code that keeps data files with acks around pending message file gc. thanks jgenender - test case to follow"
This reverts commit dd68c61e65
.
resolves: https://issues.apache.org/jira/browse/AMQ-5542 and applies test case that nicely demonstrates the defect, thanks
This commit is contained in:
parent
b35b8e37f6
commit
b0a1bd833c
|
@ -1672,14 +1672,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("gc candidates: " + gcCandidateSet);
|
||||
}
|
||||
final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
|
||||
Iterator<Integer> candidates = gcCandidateSet.iterator();
|
||||
while (candidates.hasNext()) {
|
||||
Integer candidate = candidates.next();
|
||||
Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
|
||||
if (referencedFileIds != null) {
|
||||
for (Integer referencedFileId : referencedFileIds) {
|
||||
if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
|
||||
if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
|
||||
// active file that is not targeted for deletion is referenced so don't delete
|
||||
candidates.remove();
|
||||
break;
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.bugs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -117,7 +118,57 @@ public class AMQ2832Test {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Scenario:
|
||||
* db-1.log has an unacknowledged message,
|
||||
* db-2.log contains acks for the messages from db-1.log,
|
||||
* db-3.log contains acks for the messages from db-2.log
|
||||
*
|
||||
* Expected behavior: since db-1.log is blocked, db-2.log and db-3.log should not be removed during the cleanup.
|
||||
* Current situation on 5.10.0, 5.10.1 is that db-3.log is removed causing all messages from db-2.log, whose acks were in db-3.log, to be replayed.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testAckChain() throws Exception {
|
||||
startBroker();
|
||||
|
||||
StagedConsumer consumer = new StagedConsumer();
|
||||
// file #1
|
||||
produceMessagesToConsumeMultipleDataFiles(5);
|
||||
// acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log
|
||||
consumer.receive(3);
|
||||
|
||||
// send messages by consuming and acknowledging every message right after sent in order to get KahadbAdd and Remove command to be saved together
|
||||
// this is necessary in order to get KahaAddMessageCommand to be saved in one db file and the corresponding KahaRemoveMessageCommand in the next one
|
||||
produceAndConsumeImmediately(20, consumer);
|
||||
consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed
|
||||
|
||||
// now we have 3 files written and started with #4
|
||||
consumer.close();
|
||||
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
|
||||
recoverBroker();
|
||||
|
||||
consumer = new StagedConsumer();
|
||||
Message message = consumer.receive(1);
|
||||
assertNotNull("One message stays unacked from db-1.log", message);
|
||||
message.acknowledge();
|
||||
message = consumer.receive(1);
|
||||
assertNull("There should not be any unconsumed messages any more", message);
|
||||
consumer.close();
|
||||
}
|
||||
|
||||
private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception {
|
||||
for (int i = 0; i < numOfMsgs; i++) {
|
||||
produceMessagesToConsumeMultipleDataFiles(1);
|
||||
consumer.receive(1).acknowledge();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
|
||||
|
||||
startBroker();
|
||||
|
|
Loading…
Reference in New Issue