mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3120 - KahaDB error: "Could not locate data file"
Test failure caused by over-eager cleanup: a journal write done as part of the cleanup was incrementing the last update location, leaving the current journal file as a candidate for cleanup prematurely. This could occur between a destination store journal write and the corresponding index update, so the file was not yet present in the index. Capturing the last update exclusive of the checkpoint write resolves this.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1070484 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent a1d3245182
commit 6b643dc2c3
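The failure mode and the fix can be pictured with a small, self-contained sketch (illustrative Java, not the KahaDB code; the file ids and the gcCandidates helper are made up for the example):

import java.util.TreeSet;

public class LastUpdateCaptureSketch {

    // Simplified stand-in for the candidate pruning: the data file holding the last
    // update, and every later file, must survive; only strictly older files may be GC'd.
    // (The real cleanup additionally keeps any file still referenced by the index.)
    static TreeSet<Integer> gcCandidates(TreeSet<Integer> journalFiles, int lastUpdateFileId) {
        return new TreeSet<Integer>(journalFiles.headSet(lastUpdateFileId));
    }

    public static void main(String[] args) {
        TreeSet<Integer> journalFiles = new TreeSet<Integer>();
        for (int id = 1; id <= 4; id++) {
            journalFiles.add(id);
        }

        // A destination store write landed in file 3, but its index update is still
        // pending, so nothing in the index references file 3 yet.
        int lastUpdateBeforeCheckpoint = 3;

        // The checkpoint's own journal write (e.g. the producer audit) rolled into
        // file 4 and advanced the last update location.
        int lastUpdateAfterCheckpointWrite = 4;

        // Buggy order: reading the last update after the checkpoint write leaves
        // file 3 in the candidate set, so it can be deleted before its index entry
        // exists -- the "Could not locate data file" error.
        System.out.println(gcCandidates(journalFiles, lastUpdateAfterCheckpointWrite)); // [1, 2, 3]

        // Fixed order: capture the last update before the checkpoint write, and
        // file 3 stays protected.
        System.out.println(gcCandidates(journalFiles, lastUpdateBeforeCheckpoint));     // [1, 2]
    }
}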
@@ -1142,7 +1142,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAware
      */
     void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
         LOG.debug("Checkpoint started.");
 
+        // reflect last update exclusive of current checkpoint
+        Location firstTxLocation = metadata.lastUpdate;
+
         metadata.state = OPEN_STATE;
         metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
         metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
@@ -1153,16 +1156,19 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAware
             final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
             final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
 
+            LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
+
             // Don't GC files under replication
             if( journalFilesBeingReplicated!=null ) {
                 gcCandidateSet.removeAll(journalFilesBeingReplicated);
             }
 
-            // Don't GC files after the first in progress tx
-            Location firstTxLocation = metadata.lastUpdate;
-
             // Don't GC files after the first in progress tx
             if( metadata.firstInProgressTransactionLocation!=null ) {
-                firstTxLocation = metadata.firstInProgressTransactionLocation;
+                if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) {
+                    firstTxLocation = metadata.firstInProgressTransactionLocation;
+                };
             }
 
             if( firstTxLocation!=null ) {
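The change in the second hunk means the first in-progress transaction can only move the protection point back to an older data file; it can no longer push it forward past the last update captured above. A minimal sketch of that selection, with a hypothetical Loc stand-in for the journal Location class (assuming a non-null last update):

// Loc is a made-up stand-in for Location, reduced to the data file id the cleanup cares about.
class Loc {
    final int dataFileId;
    Loc(int dataFileId) { this.dataFileId = dataFileId; }
}

public class ProtectionPointSketch {

    // Mirrors the patched logic: start from the last update captured before the
    // checkpoint, and only switch to the first in-progress transaction when it
    // lives in an older data file.
    static Loc protectionPoint(Loc lastUpdate, Loc firstInProgressTx) {
        Loc firstTxLocation = lastUpdate;
        if (firstInProgressTx != null
                && firstInProgressTx.dataFileId < firstTxLocation.dataFileId) {
            firstTxLocation = firstInProgressTx;
        }
        return firstTxLocation;
    }

    public static void main(String[] args) {
        System.out.println(protectionPoint(new Loc(7), new Loc(5)).dataFileId); // 5: older tx wins
        System.out.println(protectionPoint(new Loc(7), new Loc(9)).dataFileId); // 7: never move forward
        System.out.println(protectionPoint(new Loc(7), null).dataFileId);       // 7: no in-progress tx
    }
}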
@@ -109,8 +109,8 @@ public class AMQ3120Test {
     @Test
     public void testCleanupOfFiles() throws Exception {
 
-        startBroker(false);
         final int messageCount = 500;
+        startBroker(true);
         int fileCount = getFileCount(kahaDbDir);
         assertEquals(4, fileCount);
 
@@ -126,9 +126,11 @@ public class AMQ3120Test {
                 return sess.createTextMessage(payload + "::" + i);
             }
         };
-        producer.setSleep(1500);
+        producer.setSleep(650);
         producer.setMessageCount(messageCount);
         ConsumerThread consumer = new ConsumerThread(consumerSess, destination);
+        consumer.setBreakOnNull(false);
+        consumer.setMessageCount(messageCount);
 
         producer.start();
         consumer.start();
@@ -136,6 +138,7 @@ public class AMQ3120Test {
         producer.join();
         consumer.join();
 
+        assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived());
 
         broker.stop();
         broker.waitUntilStopped();
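The test's startBroker helper is outside this diff. As a rough sketch of the kind of setup a journal-cleanup test like this relies on (the directory, journal file length, and intervals below are assumptions, not the values AMQ3120Test actually uses):

import java.io.File;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

public class BrokerSetupSketch {

    // startBroker(deleteAllMessages) style helper: a small journal file length forces
    // frequent data file rollover, and short cleanup/checkpoint intervals make the GC
    // path run while the producer and consumer are still active.
    static BrokerService startBroker(boolean deleteAllMessages) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseJmx(false);
        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);

        KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
        kahaDB.setDirectory(new File("target/kahadb-amq3120")); // assumed location
        kahaDB.setJournalMaxFileLength(16 * 1024);              // tiny files, frequent rollover
        kahaDB.setCleanupInterval(500);                         // run cleanup often (ms)
        kahaDB.setCheckpointInterval(500);                      // checkpoint often (ms)
        broker.setPersistenceAdapter(kahaDB);

        broker.addConnector("tcp://localhost:0");
        broker.start();
        broker.waitUntilStarted();
        return broker;
    }
}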